Смотри, этот код парсит Telegram-контакты и номера телефонов.
Для этого он использует Selenium. Можешь сохранить текущие настройки этого кода, но сделать так, чтобы он использовал другую систему, а именно асинхронный парсер?
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from urllib.parse import unquote, parse_qs, urlparse
import phonenumbers
from phonenumbers import carrier
import time
import os
import re
# NOTE(review): PYTHONWARNINGS is normally read at interpreter start-up, so
# setting it here presumably only affects child processes - confirm.
os.environ['PYTHONWARNINGS'] = 'ignore:Unverified HTTPS request'

# Shared settings

# Not referenced anywhere in the visible part of this file.
# Presumably phone-like numbers that commonly show up on YouTube pages -
# TODO confirm the intended use.
COMMON_YT_NUMBERS = {
    '+71065286052',
    '+71124342058',
    '+71741801509',
    '+71741801753',
    '+74278607828',
    '+74279440147',
    '+74282296063',
    '+74294967295',
    '+76861867695',
    '+77741124305',
    '+78800000000',
}

# E.164 numbers that find_phone_numbers() removes from its results.
EXCLUDED_NUMBERS = {
    '+74278607828',
    '+74282296063',
    '+74895550089',
    '+76765370000',
    '+76765380000',
    '+76765390000',
    '+78000000000',
    '+79099990870',
    '+79600360000',
    '+79600380000',
    '+79600390000',
}
def setup_driver():
    """Build and return a new Chrome WebDriver configured for headless use.

    Returns:
        A ``webdriver.Chrome`` instance created with the command-line flags
        listed below (headless mode, a fixed desktop user-agent, etc.).
    """
    flags = (
        '--headless',
        '--disable-gpu',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--log-level=3',
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    )
    opts = Options()
    for flag in flags:
        opts.add_argument(flag)
    return webdriver.Chrome(options=opts)
# Telegram-парсер
def check_telegram_link(driver, telegram_url):
    """Open a Telegram URL and report whether the page offers to send a message.

    Args:
        driver: WebDriver-like object providing ``get()`` and ``page_source``.
        telegram_url: URL to load.

    Returns:
        True if the upper-cased page source contains the English or the
        Russian "send message" phrase; False otherwise, including when any
        exception is raised while loading the page (the error is printed).
    """
    # Both the English and the Russian wording of the button are accepted.
    markers = ("SEND MESSAGE", "НАПИСАТЬ СООБЩЕНИЕ")
    try:
        driver.get(telegram_url)
        time.sleep(2)  # fixed pause after navigation before reading the source
        html = driver.page_source.upper()
        for marker in markers:
            if marker in html:
                return True
        return False
    except Exception as e:
        print(f"Ошибка проверки Telegram: {str(e)}")
        return False
def extract_telegram_from_redirect(url):
    """Return the t.me target wrapped in a YouTube redirect URL, if any.

    Args:
        url: Candidate URL; expected to look like
            ``https://www.youtube.com/redirect?...&q=<encoded target>``.

    Returns:
        The decoded value of the ``q`` query parameter when ``url`` is a
        YouTube redirect and that value points at ``t.me/...``; otherwise
        None (also for non-string or unparsable input).
    """
    if not isinstance(url, str) or 'youtube.com/redirect' not in url:
        return None
    try:
        params = parse_qs(urlparse(url).query)
    except ValueError:
        # Malformed URL or query string: treat as "no Telegram link".
        return None
    targets = params.get('q')
    if not targets:
        return None
    # NOTE: parse_qs already percent-decodes values; the extra unquote is
    # kept so that existing results do not change.
    redirect_url = unquote(targets[0])
    # Require that "t.me/" is not the tail of a longer host name: a plain
    # substring test would also accept e.g. "about.me/...".
    if re.search(r'(?<![\w.-])t\.me/', redirect_url):
        return redirect_url
    return None
def normalize_telegram_url(url):
    """Return a canonical ``https://<host><path>`` form of ``url``.

    The input is lower-cased, any ``http://`` occurrence is rewritten to
    ``https://``, the query string and fragment are dropped, and trailing
    slashes are removed from the path.

    Args:
        url: URL string to normalize.

    Returns:
        The normalized URL string.
    """
    lowered = url.lower()
    secured = lowered.replace('http://', 'https://')
    parts = urlparse(secured)
    host = parts.netloc
    trimmed_path = parts.path.rstrip('/')
    # Only host and path survive; the scheme is always emitted as https.
    return 'https://' + host + trimmed_path
def find_telegram_links(text):
    """Find Telegram links (t.me, telegram.me, telegram.dog) inside ``text``.

    Args:
        text: Arbitrary text, e.g. a page's HTML source.

    Returns:
        A sorted list of unique links, each passed through
        ``normalize_telegram_url``.
    """
    # The leading lookbehind stops the host from matching as the tail of a
    # longer domain (without it, "about.me/john" is read as "t.me/john").
    pattern = r'(?<![\w.-])(?:https?://)?(?:t\.me|telegram\.(?:me|dog))/[a-zA-Z0-9_-]+'
    found = set()
    for match in re.findall(pattern, text, re.IGNORECASE):
        # Matching is case-insensitive, so the scheme test must be too;
        # otherwise "HTTPS://t.me/x" would get a second scheme prepended.
        if not match.lower().startswith('http'):
            match = f'https://{match}'
        found.add(normalize_telegram_url(match))
    return sorted(found)
# Phone-парсер
def find_phone_numbers(text):
    """Extract phone numbers from ``text`` in E.164 form.

    Two passes are combined: the ``phonenumbers`` matcher with "RU" as the
    default region, and two regular expressions for +7/8-prefixed numbers.
    Numbers listed in ``EXCLUDED_NUMBERS`` are dropped.

    Args:
        text: Arbitrary text, e.g. a page's HTML source.

    Returns:
        A sorted list of unique numbers such as ``'+79031234567'``.
    """
    found = set()
    for match in phonenumbers.PhoneNumberMatcher(text, "RU"):
        try:
            # Use the documented package-level validator. The original
            # reached it through the `carrier` submodule, and a failure there
            # would have been hidden by the bare `except`.
            if not phonenumbers.is_valid_number(match.number):
                continue
            formatted_num = phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.E164
            )
        except Exception:
            # Best effort: skip a match that cannot be validated/formatted.
            continue
        if formatted_num not in EXCLUDED_NUMBERS:
            found.add(formatted_num)
    patterns = [
        r'(?:\+7|8)[\s\-()]*(?:495|800|903|9\d{2})[\s\-()]*\d{3}[\s\-()]*\d{2}[\s\-()]*\d{2}',
        r'\b(?:\+7|8)(?:495|800|9\d{2})\d{7}\b'
    ]
    for pattern in patterns:
        for number in re.findall(pattern, text):
            # Keep digits and '+' only, then bring 8XXXXXXXXXX to +7XXXXXXXXXX.
            cleaned = re.sub(r'[^\d+]', '', number)
            if cleaned.startswith('8') and len(cleaned) == 11:
                formatted = f"+7{cleaned[1:]}"
            elif cleaned.startswith('+7') and len(cleaned) == 12:
                formatted = cleaned
            else:
                continue
            if formatted not in EXCLUDED_NUMBERS:
                found.add(formatted)
    # Sorted so that repeated runs over the same text give the same order.
    return sorted(found)
# Общая логика
def process_page(driver, url):
    """Load ``url``, try to expand the description, scroll, and return the HTML.

    Args:
        driver: Selenium WebDriver used for navigation.
        url: Page to open.

    Returns:
        ``driver.page_source`` after the page was loaded, the "expand"
        button (if found) was clicked and the page was scrolled to the
        bottom three times.

    Raises:
        Whatever ``driver.get``, the scroll scripts or ``page_source``
        raise; only the description-expansion step is best effort.
    """
    driver.get(url)
    time.sleep(2)
    try:
        # Wait up to 15 s for the element with id "description".
        WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.ID, "description")))
        show_more_button = driver.find_elements(By.XPATH, '//tp-yt-paper-button[@id="expand"]')
        if show_more_button:
            # Click through JavaScript rather than WebElement.click().
            driver.execute_script("arguments[0].click();", show_more_button[0])
            time.sleep(2)
    except Exception:
        # Best effort: a missing description or button must not abort the
        # scrape. `Exception` (not a bare except) keeps Ctrl-C working.
        pass
    for _ in range(3):
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(1)
    return driver.page_source
def get_data(driver, youtube_url, max_retries=3, timeout=40):
    """Scrape a channel's /about page for active Telegram links and phones.

    Telegram candidates come from ``<a href>`` attributes (direct links and
    YouTube redirect links) and from a text search of the page source; each
    candidate is then opened with ``check_telegram_link`` and kept only if
    that check succeeds. Phone numbers come from ``find_phone_numbers``.

    Args:
        driver: Selenium WebDriver owned by the caller. If an attempt fails
            it is quit here and a replacement from ``setup_driver()`` is
            used for the next attempt.
        youtube_url: Channel URL; ``/about`` is appended when missing.
        max_retries: Maximum number of attempts.
        timeout: Page-load timeout passed to ``set_page_load_timeout``.

    Returns:
        ``{'telegram': [...], 'phones': [...]}``; both lists are empty when
        every attempt failed.
    """
    original_driver = driver
    try:
        for attempt in range(max_retries):
            try:
                about_url = youtube_url if youtube_url.endswith('/about') else youtube_url + '/about'
                driver.set_page_load_timeout(timeout)
                page_source = process_page(driver, about_url)

                # Anchors must be read before check_telegram_link() navigates
                # the driver away from the channel page.
                telegram_links = _collect_anchor_telegram_links(driver)
                telegram_links.update(find_telegram_links(page_source))

                # Normalize first so that equivalent links are checked once.
                candidates = {normalize_telegram_url(link) for link in telegram_links}
                active_links = set()
                for link in candidates:
                    if check_telegram_link(driver, link):
                        active_links.add(link)

                numbers = find_phone_numbers(page_source)
                return {
                    'telegram': list(active_links),
                    'phones': numbers
                }
            except Exception as e:
                print(f"Попытка {attempt+1} ошибка: {e}")
                if attempt == max_retries - 1:
                    return {'telegram': [], 'phones': []}
                driver.quit()
                driver = setup_driver()
        return {'telegram': [], 'phones': []}
    finally:
        # The caller only knows about (and quits) the driver it passed in;
        # a replacement created during a retry has to be closed here or its
        # browser process would be leaked.
        if driver is not original_driver:
            try:
                driver.quit()
            except Exception as e:
                print(f"Ошибка: {e}")


# Hosts accepted as Telegram links; same set as the regex in find_telegram_links.
_TELEGRAM_HOSTS = frozenset({'t.me', 'telegram.me', 'telegram.dog'})


def _is_telegram_profile_url(url):
    """Return True if normalized ``url`` is on a Telegram host with a non-empty path."""
    parsed = urlparse(url)
    return parsed.netloc in _TELEGRAM_HOSTS and bool(parsed.path.strip('/'))


def _collect_anchor_telegram_links(driver):
    """Return normalized Telegram URLs taken from the current page's ``<a href>`` values."""
    links = set()
    for anchor in driver.find_elements(By.TAG_NAME, 'a'):
        try:
            href = anchor.get_attribute('href')
            if not href:
                continue
            normalized = normalize_telegram_url(href)
            if _is_telegram_profile_url(normalized):
                links.add(normalized)
            elif 'youtube.com/redirect' in href:
                # Use the raw href here: normalization drops the query
                # string, and the redirect target lives in its `q` parameter.
                target = extract_telegram_from_redirect(href)
                if target:
                    target = normalize_telegram_url(target)
                    if _is_telegram_profile_url(target):
                        links.add(target)
        except Exception:
            # Best effort per anchor (e.g. the element vanished or the href
            # cannot be parsed): skip it and keep going.
            continue
    return links
def save_results(channel, data):
    """Append one result line for ``channel`` to ``COMBINED_RESULTS.txt``.

    The line is ``<channel> | TG: t.me/a, t.me/b`` when Telegram links are
    present; otherwise ``<channel> | Phone: n1, n2`` when phones are
    present; nothing is written when both lists are empty.

    Args:
        channel: Channel URL written as the first field.
        data: Mapping with ``'telegram'`` and ``'phones'`` lists.

    Any exception is printed and swallowed.
    """
    try:
        with open('COMBINED_RESULTS.txt', 'a', encoding='utf-8') as out:
            telegram = data['telegram']
            extra = None
            if telegram:
                # Only the last path segment of each link is kept.
                tg_links = ', '.join(f"t.me/{url.split('/')[-1]}" for url in telegram)
                extra = f"TG: {tg_links}"
            elif data['phones']:
                # Phones are reported only when no Telegram link was found.
                phones = ', '.join(data['phones'])
                extra = f"Phone: {phones}"
            if extra is not None:
                out.write(' | '.join([channel, extra]) + '\n')
    except Exception as e:
        print(f"Ошибка сохранения: {str(e)}")
def process_channel(youtube_url):
    """Scrape one channel with its own WebDriver and persist the result.

    Args:
        youtube_url: Channel URL to process.

    Returns:
        The dict returned by ``get_data``; ``{'telegram': [], 'phones': []}``
        if anything fails, including driver start-up.
    """
    print(f"\nОбработка: {youtube_url}")
    driver = None
    try:
        # Created inside the try so that a start-up failure is reported
        # like any other error instead of escaping the handler.
        driver = setup_driver()
        data = get_data(driver, youtube_url)
        save_results(youtube_url, data)
        return data
    except Exception as e:
        print(f"Ошибка: {e}")
        return {'telegram': [], 'phones': []}
    finally:
        if driver is not None:
            driver.quit()
def main():
    """Process every channel from CHANNELS_LIST.txt with three worker threads.

    ``COMBINED_RESULTS.txt`` is truncated and given a header first; the
    workers append their lines through ``process_channel``; a footer is
    appended once all workers have finished.
    """
    with open('COMBINED_RESULTS.txt', 'w', encoding='utf-8') as f:
        f.write("=== Результаты парсинга ===\n\n")
    with open('CHANNELS_LIST.txt', 'r', encoding='utf-8') as f:
        channels = [line.strip() for line in f if line.strip()]
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(process_channel, channel): channel for channel in channels}
        # Read every result so that an exception raised inside a worker is
        # reported; with an unconsumed executor.map() it would be dropped.
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Ошибка ({futures[future]}): {e}")
    with open('COMBINED_RESULTS.txt', 'a', encoding='utf-8') as f:
        f.write("\n=== Парсинг завершен ===")


if __name__ == "__main__":
    main()