Без названия

2025-03-20 23:14 | Публичная
import aiohttp
import asyncio
import re
from bs4 import BeautifulSoup
import phonenumbers
import logging
import aiofiles

# Настройки
MAX_CONCURRENT_REQUESTS = 150  # Увеличено для повышения параллелизма
REQUEST_DELAY = 0.1  # Уменьшено для ускорения запросов

# Исключённые номера
EXCLUDED_NUMBERS = {
    "+74278607828", "+74282296063", "+74895550089", "+79600380000",
    "+79600360000", "+76765370000", "+76765380000", "+79099990870",
    "+78000000000", "+79600390000", "+76765390000"
}

# Кэш для Telegram-ссылок
telegram_cache = {}
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

# Логирование
logging.basicConfig(level=logging.INFO, filename='errors.log')
logger = logging.getLogger(__name__)

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html'
}

async def fetch_page(session, url, retries=5):
    """Загружает страницу с повторными попытками и адаптивной задержкой."""
    delay = REQUEST_DELAY
    for attempt in range(retries):
        try:
            async with semaphore:
                await asyncio.sleep(delay)
                async with session.get(url, headers=HEADERS, timeout=10) as response:
                    if response.status == 429:
                        delay *= 2  # Увеличиваем задержку при ошибке 429
                        logger.warning(f"429 Too Many Requests для {url}, увеличение задержки до {delay} сек")
                        continue
                    return await response.text()
        except Exception as e:
            logger.error(f"Попытка {attempt + 1} не удалась для {url}: {e}")
            await asyncio.sleep(2)
    logger.warning(f"Не удалось загрузить {url} после {retries} попыток")
    return None

async def check_telegram_link(session, telegram_url):
    """Проверяет, активна ли Telegram-ссылка."""
    if telegram_url in telegram_cache:
        return telegram_cache[telegram_url]
    try:
        async with semaphore:
            await asyncio.sleep(REQUEST_DELAY)
            async with session.get(telegram_url, headers=HEADERS, timeout=5) as response:
                page_source = (await response.text()).upper()
                result = any(phrase in page_source for phrase in [
                    "SEND MESSAGE", "WRITE A MESSAGE", "START CHATTING", "OPEN CHAT", "TELEGRAM"
                ])
                telegram_cache[telegram_url] = result
                return result
    except Exception:
        telegram_cache[telegram_url] = False
        return False

async def check_telegram_links_bulk(session, telegram_links):
    """Пакетная проверка Telegram-ссылок."""
    unique_links = set(telegram_links)  # Удаляем дубликаты перед проверкой
    tasks = [check_telegram_link(session, link) for link in unique_links]
    results = await asyncio.gather(*tasks)
    return {link for link, valid in zip(unique_links, results) if valid}

def extract_telegram_links(soup):
    """Извлекает Telegram-ссылки из HTML."""
    return {link['href'] for link in soup.find_all('a', href=True) if 't.me/' in link['href']}

def find_phone_numbers(text):
    """Находит и валидирует номера телефонов."""
    phone_pattern = r'(?:\+7|8)[\s\-()]*(?:495|800|9\d{2})[\s\-()]*\d{3}[\s\-()]*\d{2}[\s\-()]*\d{2}'
    potential_numbers = re.findall(phone_pattern, text)
    valid_numbers = set()

    for number in potential_numbers:
        cleaned = re.sub(r'[^\d+]', '', number)
        if cleaned.startswith('8'):
            cleaned = "+7" + cleaned[1:]
        if cleaned not in EXCLUDED_NUMBERS:
            try:
                parsed_number = phonenumbers.parse(cleaned, "RU")
                if phonenumbers.is_valid_number(parsed_number):
                    valid_numbers.add(cleaned)
            except:
                continue
    return list(valid_numbers)

async def get_data(youtube_url, session, retries=3):
    """Извлекает Telegram-ссылки и номера телефонов с YouTube-канала."""
    about_url = youtube_url + '/about' if not youtube_url.endswith('/about') else youtube_url
    
    for attempt in range(retries):
        page_source = await fetch_page(session, about_url)
        if page_source:
            break
        logger.warning(f"Попытка {attempt + 1} не удалась для {youtube_url}, повторяю...")
        await asyncio.sleep(3)
    
    if not page_source:
        return {'telegram': [], 'phones': []}
    
    soup = BeautifulSoup(page_source, 'lxml')
    telegram_links = extract_telegram_links(soup)
    active_links = await check_telegram_links_bulk(session, telegram_links)
    numbers = find_phone_numbers(page_source)
    
    return {'telegram': list(active_links), 'phones': numbers}

async def process_channel(youtube_url, session, queue):
    """Обрабатывает один YouTube-канал и отправляет результат в очередь."""
    print(f"Обрабатываю: {youtube_url}")
    data = await get_data(youtube_url, session)
    
    if data['telegram'] or data['phones']:
        details = []
        if data['telegram']:
            details.append(f"TG: {', '.join(data['telegram'])}")
        if data['phones']:
            details.append(f"Phone: {', '.join(data['phones'])}")
        await queue.put(f"{youtube_url} | {' | '.join(details)}")

async def write_results(queue):
    """Асинхронно записывает результаты в файл."""
    async with aiofiles.open('COMBINED_RESULTS.txt', 'w', encoding='utf-8') as f:
        await f.write("=== Parsing Results ===\n")
        while True:
            result = await queue.get()
            if result is None:  # Сигнал завершения
                break
            await f.write(f"{result}\n")
            queue.task_done()
        await f.write("\n=== Parsing Completed ===")

async def main():
    """Основная функция обработки."""
    queue = asyncio.Queue()
    writer_task = asyncio.create_task(write_results(queue))
    
    async with aiohttp.ClientSession() as session:
        with open('CHANNELS_LIST.txt', 'r', encoding='utf-8') as f:
            channels = [line.strip() for line in f if line.strip()]
        
        tasks = [process_channel(channel, session, queue) for channel in channels]
        await asyncio.gather(*tasks)
    
    await queue.put(None)  # Сигнал завершения для writer_task
    await writer_task

if __name__ == "__main__":
    asyncio.run(main())
Вернуться ко Всем Вставкам
Открыть чат
Чат с Send-Code AI Закрыть чат