cl / colab_tunnel /_diagnostics.py
rottenstuff's picture
Upload 13 files
c0e2219 verified
Raw
History Blame Contribute Delete
12.9 kB
"""
Диагностика и бенчмарк провайдеров туннелей.
Использование:
from colab_tunnel import benchmark
results = benchmark() # все провайдеры
results = benchmark(['serveo', 'bore']) # выборочно
"""
import os
import socket
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from requests import get as get_url
from ._config import test_server_bin, test_server_bin_url
from ._logger import logger
from ._registry import proxies_functions
from ._utils import run, kill_process_by_name, download
@dataclass
class TunnelBenchmarkResult:
"""Результат тестирования одного провайдера."""
name: str
url: Optional[str] = None
tunnel_time: float = 0.0 # Время получения ссылки (сек)
latency: Optional[float] = None # RTT до публичного URL (сек)
success: bool = False
error: Optional[str] = None
def find_free_port() -> int:
"""Находит свободный TCP-порт в системе."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
def _find_test_server_bin() -> Path:
"""
Ищет бинарник тестового сервера в следующем порядке:
1. Переменная окружения TEST_SERVER_PATH
2. ../test_server/test_server (рядом с пакетом)
3. ./test_server/test_server (текущая директория)
Если не нашел - скачивает.
"""
candidates: list[Path] = []
env = os.environ.get('TEST_SERVER_PATH')
env_path = Path(env) if env else None
if env:
candidates.append(env_path)
if test_server_bin.exists():
candidates.append(test_server_bin)
candidates += [
Path(__file__).parent.parent / 'test_server' / 'test_server',
Path.cwd() / 'test_server' / 'test_server',
]
bin_path = None
for path in candidates:
if path.exists() and path.is_file():
bin_path = path
if not bin_path:
downloaded = download(test_server_bin_url, save_path=test_server_bin.parent, progress=False)
if downloaded and downloaded.exists():
downloaded.chmod(0o755)
return downloaded
raise FileNotFoundError(
'Бинарник тестового сервера не найден. '
'Укажите путь через переменную окружения TEST_SERVER_PATH.'
)
def _start_test_server(server_bin: Path, port: int) -> None:
"""Запускает тестовый HTTP-сервер в daemon-режиме."""
server_bin.chmod(0o755)
run(f'"{server_bin}" -d -p {port}', timeout=10)
time.sleep(1.5) # Даём серверу время на запуск
logger.debug(f'Тестовый сервер запущен на порту {port}.')
def _stop_test_server(server_bin: Path) -> None:
"""Останавливает тестовый HTTP-сервер."""
kill_process_by_name(server_bin.name)
logger.debug('Тестовый сервер остановлен.')
def _check_url_latency(
url: str,
timeout: float = 10.0,
warmup_retries: int = 3,
warmup_delay: float = 5.0,
) -> Optional[float]:
"""
Проверяет доступность публичного URL и возвращает RTT.
Некоторые провайдеры (cloudflared, tunnelite) анонсируют URL раньше,
чем маршрут фактически распространяется по их сети. cloudflared
обычно требует 5–15 секунд после получения URL.
Делаем несколько попыток с паузой между ними.
Returns:
RTT в секундах, или None если URL недоступен после всех попыток.
"""
# Убираем строку с IPv4-паролем, если провайдер вернул её второй строкой
clean_url = url.split('\n')[0].strip()
for attempt in range(warmup_retries):
if attempt > 0:
logger.debug(
f'Повтор проверки доступности {attempt + 1}/{warmup_retries}: {clean_url}'
)
time.sleep(warmup_delay)
try:
t = time.time()
resp = get_url(clean_url, timeout=timeout, allow_redirects=True)
elapsed = round(time.time() - t, 3)
if 200 <= resp.status_code < 400:
return elapsed
logger.debug(f'HTTP {resp.status_code} от {clean_url}')
except Exception as e:
logger.debug(f'Попытка {attempt + 1}: {e}')
return None
def _print_report(results: list[TunnelBenchmarkResult]) -> None:
"""Выводит форматированную таблицу результатов."""
W = {'name': 14, 'status': 10, 'tunnel': 14, 'latency': 12}
total_w = sum(W.values()) + 38 # 38 — ширина колонки URL
sep = '=' * total_w
print(f'\n{sep}')
print(f'{"БЕНЧМАРК ПРОВАЙДЕРОВ ТУННЕЛЕЙ":^{total_w}}')
print(sep)
print(
f'{"Провайдер":<{W["name"]}}'
f'{"Статус":<{W["status"]}}'
f'{"Туннель":<{W["tunnel"]}}'
f'{"Задержка":<{W["latency"]}}'
f'URL / Ошибка'
)
print('-' * total_w)
for r in results:
status = '✓ OK ' if r.success else '✗ FAIL'
t_str = f'{r.tunnel_time:.2f}с'
lat_str = f'{r.latency * 1000:.0f}мс' if r.latency else '—'
detail = (r.url or r.error or '').split('\n')[0]
if len(detail) > 38:
detail = detail[:35] + '…'
print(
f'{r.name:<{W["name"]}}'
f'{status:<{W["status"]}}'
f'{t_str:<{W["tunnel"]}}'
f'{lat_str:<{W["latency"]}}'
f'{detail}'
)
print(sep)
ok = [r for r in results if r.success]
print(f'Итого: {len(ok)}/{len(results)} провайдеров работают.')
if ok:
fastest = min(ok, key=lambda r: r.tunnel_time)
print(f'Быстрейший запуск : {fastest.name} ({fastest.tunnel_time:.2f}с)')
with_lat = [r for r in ok if r.latency]
if with_lat:
lowest = min(with_lat, key=lambda r: r.latency) # type: ignore[arg-type]
print(f'Минимальная задержка: {lowest.name} ({lowest.latency * 1000:.0f}мс)') # type: ignore[operator]
# Пояснение для native — его URL требует сессионные куки Google Colab
native_result = next((r for r in results if r.name == 'native'), None)
if native_result and native_result.success and not native_result.latency:
print('\n ℹ native: URL работает, но требует Google-авторизации —')
print(' внешняя проверка без куки браузера всегда будет недоступна.')
print()
def benchmark(
providers: list[str] | None = None,
timeout_per_provider: float = 30.0,
latency_timeout: float = 10.0,
print_report: bool = True,
) -> list[TunnelBenchmarkResult]:
"""
Последовательно тестирует провайдеров туннелей и собирает метрики.
По умолчанию тестирует только активных провайдеров (из proxies_functions).
Передать список имён явно — включая отключённых — можно через параметр providers.
Схема работы:
1. Запускает тестовый HTTP-сервер.
2. Последовательно запускает каждого провайдера, измеряя время старта.
3. Сразу проверяет RTT полученного URL.
4. После всех провайдеров — второй проход RTT для тех, кто не ответил
с первого раза (cloudflared, tunnelite: маршрут распространяется 10–15с).
5. Останавливает тестовый сервер (блок finally).
6. Выводит форматированный отчёт.
Args:
providers: Имена провайдеров. None — все активные.
timeout_per_provider: Таймаут ожидания ссылки от провайдера (сек).
latency_timeout: Таймаут проверки RTT (сек).
print_report: Вывести таблицу результатов.
Returns:
Список TunnelBenchmarkResult.
"""
server_bin = _find_test_server_bin()
port = find_free_port()
# Если провайдеры переданы явно — ищем в _all_providers (включая отключённых).
# Если нет — берём только активных из proxies_functions.
if providers is not None:
from ._registry import _all_providers
unknown = [n for n in providers if n not in _all_providers]
if unknown:
raise ValueError(
f'Неизвестные провайдеры: {unknown}. '
f'Все зарегистрированные: {list(_all_providers)}'
)
names = providers
func_map = _all_providers
else:
names = list(proxies_functions.keys())
func_map = proxies_functions
print(f'Бенчмарк: {len(names)} провайдеров, порт {port}.')
_start_test_server(server_bin, port)
print('Тестовый сервер запущен.\n')
results: list[TunnelBenchmarkResult] = []
try:
# Основной проход: запуск + первая проверка RTT
for name in names:
func = func_map[name]
print(f'[{name:<12}] запуск...', end=' ', flush=True)
result = TunnelBenchmarkResult(name=name)
t_start = time.time()
try:
url = func(port)
result.tunnel_time = round(time.time() - t_start, 2)
result.url = url
result.success = True
print(f'OK ({result.tunnel_time:.2f}с). Проверка...', end=' ', flush=True)
result.latency = _check_url_latency(url, timeout=latency_timeout)
if result.latency:
print(f'RTT {result.latency * 1000:.0f}мс')
else:
print('URL недоступен (туннель ещё поднимается?)')
except Exception as e:
result.tunnel_time = round(time.time() - t_start, 2)
result.error = str(e)
print(f'ОШИБКА: {str(e)[:100]}')
logger.debug(f'[{name}] детали:', exc_info=True)
results.append(result)
# Второй проход: RTT для провайдеров с warmup-задержкой
# К этому моменту прошло достаточно времени (пока тестировались
# остальные провайдеры), чтобы маршруты cloudflared/tunnelite
# успели распространиться по сети провайдера.
needs_recheck = [
r for r in results
if r.success and r.latency is None and r.name != 'native'
]
if needs_recheck:
print(f'\nПовторная проверка RTT ({len(needs_recheck)} провайдеров)...')
for r in needs_recheck:
r.latency = _check_url_latency(r.url, timeout=latency_timeout) # type: ignore[arg-type]
status = f'RTT {r.latency * 1000:.0f}мс' if r.latency else 'всё ещё недоступен'
print(f' [{r.name:<12}] {status}')
finally:
print()
_stop_test_server(server_bin)
print('Тестовый сервер остановлен.')
if print_report:
_print_report(results)
return results