""" Диагностика и бенчмарк провайдеров туннелей. Использование: from colab_tunnel import benchmark results = benchmark() # все провайдеры results = benchmark(['serveo', 'bore']) # выборочно """ import os import socket import time from dataclasses import dataclass from pathlib import Path from typing import Optional from requests import get as get_url from ._config import test_server_bin, test_server_bin_url from ._logger import logger from ._registry import proxies_functions from ._utils import run, kill_process_by_name, download @dataclass class TunnelBenchmarkResult: """Результат тестирования одного провайдера.""" name: str url: Optional[str] = None tunnel_time: float = 0.0 # Время получения ссылки (сек) latency: Optional[float] = None # RTT до публичного URL (сек) success: bool = False error: Optional[str] = None def find_free_port() -> int: """Находит свободный TCP-порт в системе.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] def _find_test_server_bin() -> Path: """ Ищет бинарник тестового сервера в следующем порядке: 1. Переменная окружения TEST_SERVER_PATH 2. ../test_server/test_server (рядом с пакетом) 3. ./test_server/test_server (текущая директория) Если не нашел - скачивает. """ candidates: list[Path] = [] env = os.environ.get('TEST_SERVER_PATH') env_path = Path(env) if env else None if env: candidates.append(env_path) if test_server_bin.exists(): candidates.append(test_server_bin) candidates += [ Path(__file__).parent.parent / 'test_server' / 'test_server', Path.cwd() / 'test_server' / 'test_server', ] bin_path = None for path in candidates: if path.exists() and path.is_file(): bin_path = path if not bin_path: downloaded = download(test_server_bin_url, save_path=test_server_bin.parent, progress=False) if downloaded and downloaded.exists(): downloaded.chmod(0o755) return downloaded raise FileNotFoundError( 'Бинарник тестового сервера не найден. ' 'Укажите путь через переменную окружения TEST_SERVER_PATH.' ) def _start_test_server(server_bin: Path, port: int) -> None: """Запускает тестовый HTTP-сервер в daemon-режиме.""" server_bin.chmod(0o755) run(f'"{server_bin}" -d -p {port}', timeout=10) time.sleep(1.5) # Даём серверу время на запуск logger.debug(f'Тестовый сервер запущен на порту {port}.') def _stop_test_server(server_bin: Path) -> None: """Останавливает тестовый HTTP-сервер.""" kill_process_by_name(server_bin.name) logger.debug('Тестовый сервер остановлен.') def _check_url_latency( url: str, timeout: float = 10.0, warmup_retries: int = 3, warmup_delay: float = 5.0, ) -> Optional[float]: """ Проверяет доступность публичного URL и возвращает RTT. Некоторые провайдеры (cloudflared, tunnelite) анонсируют URL раньше, чем маршрут фактически распространяется по их сети. cloudflared обычно требует 5–15 секунд после получения URL. Делаем несколько попыток с паузой между ними. Returns: RTT в секундах, или None если URL недоступен после всех попыток. """ # Убираем строку с IPv4-паролем, если провайдер вернул её второй строкой clean_url = url.split('\n')[0].strip() for attempt in range(warmup_retries): if attempt > 0: logger.debug( f'Повтор проверки доступности {attempt + 1}/{warmup_retries}: {clean_url}' ) time.sleep(warmup_delay) try: t = time.time() resp = get_url(clean_url, timeout=timeout, allow_redirects=True) elapsed = round(time.time() - t, 3) if 200 <= resp.status_code < 400: return elapsed logger.debug(f'HTTP {resp.status_code} от {clean_url}') except Exception as e: logger.debug(f'Попытка {attempt + 1}: {e}') return None def _print_report(results: list[TunnelBenchmarkResult]) -> None: """Выводит форматированную таблицу результатов.""" W = {'name': 14, 'status': 10, 'tunnel': 14, 'latency': 12} total_w = sum(W.values()) + 38 # 38 — ширина колонки URL sep = '=' * total_w print(f'\n{sep}') print(f'{"БЕНЧМАРК ПРОВАЙДЕРОВ ТУННЕЛЕЙ":^{total_w}}') print(sep) print( f'{"Провайдер":<{W["name"]}}' f'{"Статус":<{W["status"]}}' f'{"Туннель":<{W["tunnel"]}}' f'{"Задержка":<{W["latency"]}}' f'URL / Ошибка' ) print('-' * total_w) for r in results: status = '✓ OK ' if r.success else '✗ FAIL' t_str = f'{r.tunnel_time:.2f}с' lat_str = f'{r.latency * 1000:.0f}мс' if r.latency else '—' detail = (r.url or r.error or '').split('\n')[0] if len(detail) > 38: detail = detail[:35] + '…' print( f'{r.name:<{W["name"]}}' f'{status:<{W["status"]}}' f'{t_str:<{W["tunnel"]}}' f'{lat_str:<{W["latency"]}}' f'{detail}' ) print(sep) ok = [r for r in results if r.success] print(f'Итого: {len(ok)}/{len(results)} провайдеров работают.') if ok: fastest = min(ok, key=lambda r: r.tunnel_time) print(f'Быстрейший запуск : {fastest.name} ({fastest.tunnel_time:.2f}с)') with_lat = [r for r in ok if r.latency] if with_lat: lowest = min(with_lat, key=lambda r: r.latency) # type: ignore[arg-type] print(f'Минимальная задержка: {lowest.name} ({lowest.latency * 1000:.0f}мс)') # type: ignore[operator] # Пояснение для native — его URL требует сессионные куки Google Colab native_result = next((r for r in results if r.name == 'native'), None) if native_result and native_result.success and not native_result.latency: print('\n ℹ native: URL работает, но требует Google-авторизации —') print(' внешняя проверка без куки браузера всегда будет недоступна.') print() def benchmark( providers: list[str] | None = None, timeout_per_provider: float = 30.0, latency_timeout: float = 10.0, print_report: bool = True, ) -> list[TunnelBenchmarkResult]: """ Последовательно тестирует провайдеров туннелей и собирает метрики. По умолчанию тестирует только активных провайдеров (из proxies_functions). Передать список имён явно — включая отключённых — можно через параметр providers. Схема работы: 1. Запускает тестовый HTTP-сервер. 2. Последовательно запускает каждого провайдера, измеряя время старта. 3. Сразу проверяет RTT полученного URL. 4. После всех провайдеров — второй проход RTT для тех, кто не ответил с первого раза (cloudflared, tunnelite: маршрут распространяется 10–15с). 5. Останавливает тестовый сервер (блок finally). 6. Выводит форматированный отчёт. Args: providers: Имена провайдеров. None — все активные. timeout_per_provider: Таймаут ожидания ссылки от провайдера (сек). latency_timeout: Таймаут проверки RTT (сек). print_report: Вывести таблицу результатов. Returns: Список TunnelBenchmarkResult. """ server_bin = _find_test_server_bin() port = find_free_port() # Если провайдеры переданы явно — ищем в _all_providers (включая отключённых). # Если нет — берём только активных из proxies_functions. if providers is not None: from ._registry import _all_providers unknown = [n for n in providers if n not in _all_providers] if unknown: raise ValueError( f'Неизвестные провайдеры: {unknown}. ' f'Все зарегистрированные: {list(_all_providers)}' ) names = providers func_map = _all_providers else: names = list(proxies_functions.keys()) func_map = proxies_functions print(f'Бенчмарк: {len(names)} провайдеров, порт {port}.') _start_test_server(server_bin, port) print('Тестовый сервер запущен.\n') results: list[TunnelBenchmarkResult] = [] try: # Основной проход: запуск + первая проверка RTT for name in names: func = func_map[name] print(f'[{name:<12}] запуск...', end=' ', flush=True) result = TunnelBenchmarkResult(name=name) t_start = time.time() try: url = func(port) result.tunnel_time = round(time.time() - t_start, 2) result.url = url result.success = True print(f'OK ({result.tunnel_time:.2f}с). Проверка...', end=' ', flush=True) result.latency = _check_url_latency(url, timeout=latency_timeout) if result.latency: print(f'RTT {result.latency * 1000:.0f}мс') else: print('URL недоступен (туннель ещё поднимается?)') except Exception as e: result.tunnel_time = round(time.time() - t_start, 2) result.error = str(e) print(f'ОШИБКА: {str(e)[:100]}') logger.debug(f'[{name}] детали:', exc_info=True) results.append(result) # Второй проход: RTT для провайдеров с warmup-задержкой # К этому моменту прошло достаточно времени (пока тестировались # остальные провайдеры), чтобы маршруты cloudflared/tunnelite # успели распространиться по сети провайдера. needs_recheck = [ r for r in results if r.success and r.latency is None and r.name != 'native' ] if needs_recheck: print(f'\nПовторная проверка RTT ({len(needs_recheck)} провайдеров)...') for r in needs_recheck: r.latency = _check_url_latency(r.url, timeout=latency_timeout) # type: ignore[arg-type] status = f'RTT {r.latency * 1000:.0f}мс' if r.latency else 'всё ещё недоступен' print(f' [{r.name:<12}] {status}') finally: print() _stop_test_server(server_bin) print('Тестовый сервер остановлен.') if print_report: _print_report(results) return results