import asyncio
from playwright.async_api import async_playwright
import re
import phonenumbers
from phonenumbers import carrier
from urllib.parse import unquote, parse_qs, urlparse
import os
# Константы
# Set of E.164-formatted number strings.
# NOTE(review): nothing in the visible source reads this set. Judging by the
# name it presumably lists number-like values that commonly show up on YouTube
# pages -- confirm whether it should also be filtered out of phone results.
COMMON_YT_NUMBERS = {
    '+71065286052',
    '+71124342058',
    '+71741801509',
    '+71741801753',
    '+74278607828',
    '+74279440147',
    '+74282296063',
    '+74294967295',
    '+76861867695',
    '+77741124305',
    '+78800000000',
}
# E.164-formatted numbers that find_phone_numbers() drops from its results.
EXCLUDED_NUMBERS = {
    '+74278607828',
    '+74282296063',
    '+74895550089',
    '+76765370000',
    '+76765380000',
    '+76765390000',
    '+78000000000',
    '+79099990870',
    '+79600360000',
    '+79600380000',
    '+79600390000',
}
# Telegram-парсер
async def check_telegram_link(page, telegram_url):
    """Report whether a Telegram page shows a "send message" caption.

    Navigates ``page`` to ``telegram_url`` and looks for the English or
    Russian "send message" caption in the upper-cased page HTML.

    Args:
        page: Playwright page used for the navigation (it is left on
            ``telegram_url`` afterwards).
        telegram_url: Address to open.

    Returns:
        True if either caption is present. False if neither is, or if
        navigating/reading the page raised (the error is printed).
    """
    captions = ("SEND MESSAGE", "НАПИСАТЬ СООБЩЕНИЕ")
    try:
        await page.goto(telegram_url, wait_until='domcontentloaded')
        html = await page.content()
        haystack = html.upper()
        for caption in captions:
            if caption in haystack:
                return True
        return False
    except Exception as e:
        print(f"Ошибка проверки Telegram: {str(e)}")
        return False
def extract_telegram_from_redirect(url):
    """Extract the Telegram target from a YouTube redirect URL.

    A YouTube redirect link carries its destination in the ``q`` query
    parameter. The first ``q`` value is returned when it contains ``t.me/``.

    Args:
        url: Candidate URL, expected to be a string.

    Returns:
        The decoded target URL, or None when ``url`` is not a YouTube
        redirect, has no ``q`` parameter, does not point at ``t.me/``, is not
        a string, or cannot be parsed.
    """
    try:
        if 'youtube.com/redirect' not in url:
            return None
        targets = parse_qs(urlparse(url).query).get('q')
    except (TypeError, ValueError):
        # TypeError: non-string input; ValueError: malformed URL
        # (e.g. an unbalanced IPv6 bracket in the host).
        return None
    if not targets:
        return None
    # parse_qs has already percent-decoded the value. Decoding it a second
    # time would corrupt escapes that belong to the target URL itself.
    target = targets[0]
    return target if 't.me/' in target else None
# Matches a trailing invite hash: ".../+<hash>" or ".../joinchat/<hash>".
_INVITE_HASH_RE = re.compile(r'/(?:\+|joinchat/)([^/]+)$')


def normalize_telegram_url(url):
    """Return a canonical ``https://host/path`` form of a URL.

    The scheme is forced to https, the query string and fragment are dropped,
    trailing slashes are removed and the result is lower-cased. The one
    exception is the hash of an invite link (``/+<hash>`` or
    ``/joinchat/<hash>``): it keeps its original case, because invite hashes
    are case-sensitive and lower-casing them yields a dead link.

    Args:
        url: URL string, with or without a scheme.

    Returns:
        The normalized URL string.
    """
    lowered = urlparse(url.lower().replace('http://', 'https://'))
    location = lowered.netloc + lowered.path.rstrip('/')

    invite = _INVITE_HASH_RE.search(location)
    if invite:
        raw = urlparse(url)
        raw_location = raw.netloc + raw.path.rstrip('/')
        # Only splice the original-case hash back in when both parses line up
        # character for character; otherwise keep the lower-cased form.
        if len(raw_location) == len(location) and raw_location.lower() == location:
            hash_start = invite.start(1)
            location = location[:hash_start] + raw_location[hash_start:]

    return f'https://{location}'
# Optional scheme and "www.", then a Telegram host and a handle.
# The leading look-behind stops the host from matching inside a longer domain
# (e.g. the "t.me" inside "about.me").
_TELEGRAM_LINK_RE = re.compile(
    r'(?<![\w.-])(?:https?://)?(?:www\.)?'
    r'(t\.me|telegram\.(?:me|dog))/([a-zA-Z0-9_-]+)',
    re.IGNORECASE,
)


def find_telegram_links(text):
    """Find Telegram links in free text.

    Recognizes ``t.me``, ``telegram.me`` and ``telegram.dog`` links with or
    without a scheme, in any letter case.

    Args:
        text: Text to scan.

    Returns:
        A sorted list of unique links in the form ``https://<host>/<handle>``,
        lower-cased. This is the same canonical form that
        normalize_telegram_url() produces for such links.
    """
    return sorted({
        f'https://{host.lower()}/{handle.lower()}'
        for host, handle in _TELEGRAM_LINK_RE.findall(text)
    })
# Phone-парсер
# Russian number: "+7" or "8", then code 495 / 800 / 9xx, then seven digits,
# with optional spaces, dashes or parentheses between groups. The digit
# look-arounds keep it from matching inside a longer run of digits
# (numeric ids, timestamps).
_RU_PHONE_RE = re.compile(
    r'(?<!\d)(?:\+7|8)[\s\-()]*(?:495|800|9\d{2})'
    r'[\s\-()]*\d{3}[\s\-()]*\d{2}[\s\-()]*\d{2}(?!\d)'
)


def find_phone_numbers(text):
    """Find phone numbers in text and return them in E.164 form.

    Two sources are combined: the ``phonenumbers`` matcher with "RU" as the
    default region, and a regular expression for common Russian notations.
    Numbers listed in EXCLUDED_NUMBERS are removed from the result.

    Args:
        text: Text to scan.

    Returns:
        A sorted list of unique E.164 strings (e.g. ``+79031234567``).
    """
    found = set()

    for match in phonenumbers.PhoneNumberMatcher(text, "RU"):
        try:
            # Use the documented top-level validator.
            # NOTE(review): the original called carrier.is_valid_number; the
            # carrier module is not documented as exposing it, and if it does
            # not, the old bare except silently dropped every library match.
            if not phonenumbers.is_valid_number(match.number):
                continue
            found.add(
                phonenumbers.format_number(
                    match.number, phonenumbers.PhoneNumberFormat.E164
                )
            )
        except Exception as e:
            # Best effort: one bad candidate must not abort the scan.
            print(f"Ошибка обработки номера: {e}")

    for raw in _RU_PHONE_RE.findall(text):
        # The pattern guarantees a "+7"/"8" prefix followed by exactly ten
        # digits, so the national number is always the last ten digits.
        digits = re.sub(r'\D', '', raw)
        found.add(f"+7{digits[-10:]}")

    return sorted(found - EXCLUDED_NUMBERS)
# Обработка страницы
async def process_page(page, url):
    """Load a page, expand its description, scroll it and return the HTML.

    Steps: navigate to ``url``; wait up to 15 s for ``#description``; click
    the ``expand`` button if one is found and pause 2 s; then scroll to the
    bottom three times with a 1 s pause after each scroll.

    Args:
        page: Playwright page to drive.
        url: Address to open.

    Returns:
        The page HTML after scrolling.

    Raises:
        Whatever the initial navigation, scrolling or content read raises.
        Only the description-expansion step is best effort.
    """
    await page.goto(url, wait_until='domcontentloaded')
    try:
        await page.wait_for_selector('#description', timeout=15000)
        show_more_button = await page.query_selector('//tp-yt-paper-button[@id="expand"]')
        if show_more_button:
            await show_more_button.click()
            await page.wait_for_timeout(2000)
    except Exception:
        # Expanding the description is optional, so failures here are ignored.
        # Exception (not a bare except) lets task cancellation and
        # KeyboardInterrupt propagate instead of being swallowed.
        pass
    for _ in range(3):
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await page.wait_for_timeout(1000)
    return await page.content()
async def _collect_anchor_telegram_links(page):
    """Collect normalized Telegram URLs from the ``<a href>`` elements of the current page."""
    found = set()
    for anchor in await page.query_selector_all('a'):
        try:
            href = await anchor.get_attribute('href')
            if not href:
                continue
            # Inspect the RAW href for a redirect first: normalization drops
            # the query string, which is where the redirect target lives.
            redirect_target = extract_telegram_from_redirect(href)
            if redirect_target:
                found.add(normalize_telegram_url(redirect_target))
                continue
            normalized = normalize_telegram_url(href)
            if 't.me/' in normalized:
                found.add(normalized)
        except Exception:
            # Best effort per link: skip anchors that cannot be read or parsed.
            continue
    return found


async def get_data(page, youtube_url, max_retries=3):
    """Collect active Telegram links and phone numbers for a YouTube channel.

    Opens the channel's ``/about`` page, gathers Telegram links from anchors
    (including YouTube redirect links) and from the page HTML, keeps only the
    links that check_telegram_link() confirms, and extracts phone numbers
    from the page HTML.

    Args:
        page: Playwright page to drive. It is navigated away from the
            channel while the Telegram links are being checked.
        youtube_url: Channel URL, with or without a trailing ``/about``.
        max_retries: Number of attempts before giving up.

    Returns:
        ``{'telegram': [...], 'phones': [...]}``. Both lists are empty when
        every attempt failed; each failure is printed.
    """
    for attempt in range(max_retries):
        try:
            # Strip a trailing slash so "channel/" does not become "channel//about".
            base_url = youtube_url.rstrip('/')
            about_url = base_url if base_url.endswith('/about') else base_url + '/about'
            page_source = await process_page(page, about_url)

            # Telegram links: anchors first, then plain text in the HTML.
            candidates = await _collect_anchor_telegram_links(page)
            candidates.update(find_telegram_links(page_source))

            active_links = set()
            for link in candidates:
                if await check_telegram_link(page, link):
                    active_links.add(link)

            # Phone numbers
            numbers = find_phone_numbers(page_source)
            return {
                'telegram': sorted(active_links),
                'phones': numbers
            }
        except Exception as e:
            print(f"Попытка {attempt+1} ошибка: {str(e)}")
    return {'telegram': [], 'phones': []}
# Сохранение результатов
def save_results(channel, data):
    """Append one result line for a channel to ``COMBINED_RESULTS.txt``.

    The line is ``<channel> | TG: t.me/<name>, ...`` when Telegram links are
    present; otherwise ``<channel> | Phone: <n1>, ...`` when phones are
    present. Nothing is written when both lists are empty.

    Args:
        channel: Channel identifier written at the start of the line.
        data: Mapping with ``'telegram'`` and ``'phones'`` lists.

    Any exception raised while building or writing the line is printed
    rather than propagated.
    """
    try:
        with open('COMBINED_RESULTS.txt', 'a', encoding='utf-8') as out:
            parts = [channel]
            telegram_urls = data['telegram']
            if telegram_urls:
                # Keep only the last path segment of each URL.
                handles = []
                for url in telegram_urls:
                    handles.append(f"t.me/{url.split('/')[-1]}")
                tg_links = ', '.join(handles)
                parts.append(f"TG: {tg_links}")
            elif data['phones']:
                # Phones are only reported when no Telegram link was found.
                phones = ', '.join(data['phones'])
                parts.append(f"Phone: {phones}")
            if len(parts) >= 2:
                out.write(' | '.join(parts) + '\n')
    except Exception as e:
        print(f"Ошибка сохранения: {str(e)}")
# Обработка канала
async def process_channel(youtube_url, context):
    """Scrape one YouTube channel in its own page and save the result.

    Args:
        youtube_url: Channel URL to process.
        context: Browser context used to open a new page.

    Returns:
        The dict produced by get_data(), or
        ``{'telegram': [], 'phones': []}`` if scraping or saving raised
        (the error is printed). The page is closed in every case.
    """
    print(f"\nОбработка: {youtube_url}")
    page = await context.new_page()
    try:
        data = await get_data(page, youtube_url)
        save_results(youtube_url, data)
    except Exception as e:
        print(f"Ошибка: {str(e)}")
        return {'telegram': [], 'phones': []}
    else:
        return data
    finally:
        await page.close()
# Главная функция
async def main(max_concurrent=5):
    """Run the scraper over every channel listed in ``CHANNELS_LIST.txt``.

    Rewrites ``COMBINED_RESULTS.txt`` with a header, processes the channels
    in a single headless Chromium context, then appends a footer.

    Args:
        max_concurrent: Upper bound on channels processed at the same time.
            Without a bound, one browser page per channel is opened at once,
            which does not scale to long channel lists. Values below 1 are
            treated as 1.
    """
    with open('COMBINED_RESULTS.txt', 'w', encoding='utf-8') as f:
        f.write("=== Результаты парсинга ===\n\n")
    with open('CHANNELS_LIST.txt', 'r', encoding='utf-8') as f:
        channels = [line.strip() for line in f if line.strip()]

    limiter = asyncio.Semaphore(max(1, max_concurrent))

    async def process_limited(channel, context):
        # Hold a slot for the whole lifetime of the channel's page.
        async with limiter:
            return await process_channel(channel, context)

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            context = await browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            )
            await asyncio.gather(
                *(process_limited(channel, context) for channel in channels)
            )
        finally:
            # Close the browser even if context creation or a task fails.
            await browser.close()
    with open('COMBINED_RESULTS.txt', 'a', encoding='utf-8') as f:
        f.write("\n=== Парсинг завершен ===")
# Start the scraper only when the file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())