Без названия

2025-03-20 22:53 | Публичная
import asyncio
from playwright.async_api import async_playwright
import re
import phonenumbers
from phonenumbers import carrier
from urllib.parse import unquote, parse_qs, urlparse
import os

# Константы
COMMON_YT_NUMBERS = {
    '+71065286052', '+71124342058', '+71741801509', '+74278607828',
    '+74279440147', '+74282296063', '+74294967295', '+76861867695',
    '+77741124305', '+78800000000', '+71741801753'
}

EXCLUDED_NUMBERS = {
    '+74278607828', '+74282296063', '+74895550089', '+79600380000', '+79600360000',
    '+76765370000', '+76765380000', '+79099990870', '+78000000000', '+79600390000',
    '+76765390000'
}

# Telegram-парсер
async def check_telegram_link(page, telegram_url):
    """Проверка активности Telegram-ссылки."""
    try:
        await page.goto(telegram_url, wait_until='domcontentloaded')
        page_source = (await page.content()).upper()
        return any(phrase in page_source for phrase in ["SEND MESSAGE", "НАПИСАТЬ СООБЩЕНИЕ"])
    except Exception as e:
        print(f"Ошибка проверки Telegram: {str(e)}")
        return False

def extract_telegram_from_redirect(url):
    """Извлечение Telegram-ссылки из редиректа YouTube."""
    try:
        parsed = urlparse(url)
        if 'youtube.com/redirect' in url:
            params = parse_qs(parsed.query)
            if 'q' in params:
                redirect_url = unquote(params['q'][0])
                if 't.me/' in redirect_url:
                    return redirect_url
    except:
        pass
    return None

def normalize_telegram_url(url):
    """Нормализация Telegram-URL."""
    url = url.lower().replace('http://', 'https://')
    parsed = urlparse(url)
    path = parsed.path.rstrip('/')
    return f'https://{parsed.netloc}{path}'

def find_telegram_links(text):
    """Поиск Telegram-ссылок в тексте."""
    pattern = r'(?:https?://)?(?:t\.me|telegram\.(?:me|dog))/[a-zA-Z0-9_-]+'
    matches = re.findall(pattern, text, re.IGNORECASE)
    normalized = []
    for match in matches:
        if not match.startswith('http'):
            match = f'https://{match}'
        normalized.append(normalize_telegram_url(match))
    return list(set(normalized))

# Phone-парсер
def find_phone_numbers(text):
    """Поиск номеров телефонов в тексте."""
    matches = []
    for match in phonenumbers.PhoneNumberMatcher(text, "RU"):
        try:
            if carrier.is_valid_number(match.number):
                formatted_num = phonenumbers.format_number(match.number, phonenumbers.PhoneNumberFormat.E164)
                if formatted_num not in EXCLUDED_NUMBERS:
                    matches.append(formatted_num)
        except:
            continue

    patterns = [
        r'(?:\+7|8)[\s\-()]*(?:495|800|903|9\d{2})[\s\-()]*\d{3}[\s\-()]*\d{2}[\s\-()]*\d{2}',
        r'\b(?:\+7|8)(?:495|800|9\d{2})\d{7}\b'
    ]
    
    for pattern in patterns:
        for number in re.findall(pattern, text):
            cleaned = re.sub(r'[^\d+]', '', number)
            if cleaned.startswith('8') and len(cleaned) == 11:
                formatted = f"+7{cleaned[1:]}"
            elif cleaned.startswith('+7') and len(cleaned) == 12:
                formatted = cleaned
            else:
                continue
            if formatted not in EXCLUDED_NUMBERS and formatted not in matches:
                matches.append(formatted)

    return list(set(matches))

# Обработка страницы
async def process_page(page, url):
    """Загрузка страницы и взаимодействие с ней."""
    await page.goto(url, wait_until='domcontentloaded')
    
    try:
        await page.wait_for_selector('#description', timeout=15000)
        show_more_button = await page.query_selector('//tp-yt-paper-button[@id="expand"]')
        if show_more_button:
            await show_more_button.click()
            await page.wait_for_timeout(2000)
    except:
        pass
    
    for _ in range(3):
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await page.wait_for_timeout(1000)
    
    return await page.content()

async def get_data(page, youtube_url, max_retries=3):
    """Получение данных с YouTube-канала."""
    for attempt in range(max_retries):
        try:
            about_url = youtube_url + '/about' if not youtube_url.endswith('/about') else youtube_url
            page_source = await process_page(page, about_url)
            
            # Поиск Telegram
            telegram_links = set()
            links = await page.query_selector_all('a')
            for link in links:
                try:
                    href = await link.get_attribute('href')
                    if href:
                        href = normalize_telegram_url(href)
                        if 't.me/' in href:
                            telegram_links.add(href)
                        elif 'youtube.com/redirect' in href:
                            redirect_url = extract_telegram_from_redirect(href)
                            if redirect_url:
                                telegram_links.add(normalize_telegram_url(redirect_url))
                except:
                    continue
            
            text_links = find_telegram_links(page_source)
            telegram_links.update(text_links)
            
            active_links = set()
            for link in telegram_links:
                normalized_link = normalize_telegram_url(link)
                if await check_telegram_link(page, normalized_link):
                    active_links.add(normalized_link)
            
            # Поиск номеров
            numbers = find_phone_numbers(page_source)
            
            return {
                'telegram': list(active_links),
                'phones': numbers
            }
        except Exception as e:
            print(f"Попытка {attempt+1} ошибка: {str(e)}")
            if attempt == max_retries-1:
                return {'telegram': [], 'phones': []}
    
    return {'telegram': [], 'phones': []}

# Сохранение результатов
def save_results(channel, data):
    """Сохранение результатов в файл."""
    try:
        with open('COMBINED_RESULTS.txt', 'a', encoding='utf-8') as f:
            line = [channel]
            has_tg = bool(data['telegram'])
            
            if has_tg:
                tg_links = ', '.join([f"t.me/{url.split('/')[-1]}" for url in data['telegram']])
                line.append(f"TG: {tg_links}")
            
            if not has_tg and data['phones']:
                phones = ', '.join(data['phones'])
                line.append(f"Phone: {phones}")
            
            if len(line) > 1:
                f.write(' | '.join(line) + '\n')
    except Exception as e:
        print(f"Ошибка сохранения: {str(e)}")

# Обработка канала
async def process_channel(youtube_url, context):
    """Обработка одного YouTube-канала."""
    print(f"\nОбработка: {youtube_url}")
    page = await context.new_page()
    try:
        data = await get_data(page, youtube_url)
        save_results(youtube_url, data)
        return data
    except Exception as e:
        print(f"Ошибка: {str(e)}")
        return {'telegram': [], 'phones': []}
    finally:
        await page.close()

# Главная функция
async def main():
    """Запуск парсинга."""
    with open('COMBINED_RESULTS.txt', 'w', encoding='utf-8') as f:
        f.write("=== Результаты парсинга ===\n\n")
    
    with open('CHANNELS_LIST.txt', 'r', encoding='utf-8') as f:
        channels = [url.strip() for url in f.readlines() if url.strip()]
    
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        )
        
        tasks = [process_channel(channel, context) for channel in channels]
        await asyncio.gather(*tasks)
        
        await browser.close()
    
    with open('COMBINED_RESULTS.txt', 'a', encoding='utf-8') as f:
        f.write("\n=== Парсинг завершен ===")

if __name__ == "__main__":
    asyncio.run(main())
Вернуться ко Всем Вставкам
Открыть чат
Чат с Send-Code AI Закрыть чат