import aiohttp
import asyncio
import re
from bs4 import BeautifulSoup
import phonenumbers
import logging
import aiofiles
# Настройки
MAX_CONCURRENT_REQUESTS = 150 # Увеличено для повышения параллелизма
REQUEST_DELAY = 0.1 # Уменьшено для ускорения запросов
# Исключённые номера
EXCLUDED_NUMBERS = {
"+74278607828", "+74282296063", "+74895550089", "+79600380000",
"+79600360000", "+76765370000", "+76765380000", "+79099990870",
"+78000000000", "+79600390000", "+76765390000"
}
# Кэш для Telegram-ссылок
telegram_cache = {}
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
# Логирование
logging.basicConfig(level=logging.INFO, filename='errors.log')
logger = logging.getLogger(__name__)
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html'
}
async def fetch_page(session, url, retries=5):
"""Загружает страницу с повторными попытками и адаптивной задержкой."""
delay = REQUEST_DELAY
for attempt in range(retries):
try:
async with semaphore:
await asyncio.sleep(delay)
async with session.get(url, headers=HEADERS, timeout=10) as response:
if response.status == 429:
delay *= 2 # Увеличиваем задержку при ошибке 429
logger.warning(f"429 Too Many Requests для {url}, увеличение задержки до {delay} сек")
continue
return await response.text()
except Exception as e:
logger.error(f"Попытка {attempt + 1} не удалась для {url}: {e}")
await asyncio.sleep(2)
logger.warning(f"Не удалось загрузить {url} после {retries} попыток")
return None
async def check_telegram_link(session, telegram_url):
"""Проверяет, активна ли Telegram-ссылка."""
if telegram_url in telegram_cache:
return telegram_cache[telegram_url]
try:
async with semaphore:
await asyncio.sleep(REQUEST_DELAY)
async with session.get(telegram_url, headers=HEADERS, timeout=5) as response:
page_source = (await response.text()).upper()
result = any(phrase in page_source for phrase in [
"SEND MESSAGE", "WRITE A MESSAGE", "START CHATTING", "OPEN CHAT", "TELEGRAM"
])
telegram_cache[telegram_url] = result
return result
except Exception:
telegram_cache[telegram_url] = False
return False
async def check_telegram_links_bulk(session, telegram_links):
"""Пакетная проверка Telegram-ссылок."""
unique_links = set(telegram_links) # Удаляем дубликаты перед проверкой
tasks = [check_telegram_link(session, link) for link in unique_links]
results = await asyncio.gather(*tasks)
return {link for link, valid in zip(unique_links, results) if valid}
def extract_telegram_links(soup):
"""Извлекает Telegram-ссылки из HTML."""
return {link['href'] for link in soup.find_all('a', href=True) if 't.me/' in link['href']}
def find_phone_numbers(text):
"""Находит и валидирует номера телефонов."""
phone_pattern = r'(?:\+7|8)[\s\-()]*(?:495|800|9\d{2})[\s\-()]*\d{3}[\s\-()]*\d{2}[\s\-()]*\d{2}'
potential_numbers = re.findall(phone_pattern, text)
valid_numbers = set()
for number in potential_numbers:
cleaned = re.sub(r'[^\d+]', '', number)
if cleaned.startswith('8'):
cleaned = "+7" + cleaned[1:]
if cleaned not in EXCLUDED_NUMBERS:
try:
parsed_number = phonenumbers.parse(cleaned, "RU")
if phonenumbers.is_valid_number(parsed_number):
valid_numbers.add(cleaned)
except:
continue
return list(valid_numbers)
async def get_data(youtube_url, session, retries=3):
"""Извлекает Telegram-ссылки и номера телефонов с YouTube-канала."""
about_url = youtube_url + '/about' if not youtube_url.endswith('/about') else youtube_url
for attempt in range(retries):
page_source = await fetch_page(session, about_url)
if page_source:
break
logger.warning(f"Попытка {attempt + 1} не удалась для {youtube_url}, повторяю...")
await asyncio.sleep(3)
if not page_source:
return {'telegram': [], 'phones': []}
soup = BeautifulSoup(page_source, 'lxml')
telegram_links = extract_telegram_links(soup)
active_links = await check_telegram_links_bulk(session, telegram_links)
numbers = find_phone_numbers(page_source)
return {'telegram': list(active_links), 'phones': numbers}
async def process_channel(youtube_url, session, queue):
"""Обрабатывает один YouTube-канал и отправляет результат в очередь."""
print(f"Обрабатываю: {youtube_url}")
data = await get_data(youtube_url, session)
if data['telegram'] or data['phones']:
details = []
if data['telegram']:
details.append(f"TG: {', '.join(data['telegram'])}")
if data['phones']:
details.append(f"Phone: {', '.join(data['phones'])}")
await queue.put(f"{youtube_url} | {' | '.join(details)}")
async def write_results(queue):
"""Асинхронно записывает результаты в файл."""
async with aiofiles.open('COMBINED_RESULTS.txt', 'w', encoding='utf-8') as f:
await f.write("=== Parsing Results ===\n")
while True:
result = await queue.get()
if result is None: # Сигнал завершения
break
await f.write(f"{result}\n")
queue.task_done()
await f.write("\n=== Parsing Completed ===")
async def main():
"""Основная функция обработки."""
queue = asyncio.Queue()
writer_task = asyncio.create_task(write_results(queue))
async with aiohttp.ClientSession() as session:
with open('CHANNELS_LIST.txt', 'r', encoding='utf-8') as f:
channels = [line.strip() for line in f if line.strip()]
tasks = [process_channel(channel, session, queue) for channel in channels]
await asyncio.gather(*tasks)
await queue.put(None) # Сигнал завершения для writer_task
await writer_task
if __name__ == "__main__":
asyncio.run(main())