# PATH: bot/core/speedtest.py
"""Network latency/throughput probes and a disk-capacity helper."""

import asyncio
import logging
import os
import shutil
import time
from typing import Optional

import httpx

logger = logging.getLogger(__name__)

CF_DOWN = "https://speed.cloudflare.com/__down"
CF_UP = "https://speed.cloudflare.com/__up"

# Divisor shared by the conversion helpers. It is 2**20, so the "MB" values
# they return are strictly mebibytes (MiB).
_BYTES_PER_MB = 1024.0 * 1024.0


def bytes_to_mb(n: float) -> float:
    """Convert a byte count to megabytes (1 MB = 1024 * 1024 bytes)."""
    return float(n) / _BYTES_PER_MB


def bytes_per_sec_to_mb_s(bps: float) -> float:
    """Convert a rate in bytes/second to MB/second (1 MB = 1024 * 1024 bytes)."""
    return float(bps) / _BYTES_PER_MB


def disk_total_free(path: str = "/") -> dict:
    """Return total and free capacity of the filesystem containing *path*.

    Args:
        path: Any path on the filesystem to inspect.

    Returns:
        ``{"total": <bytes>, "free": <bytes>}``. ``free`` is the space
        available to the current (unprivileged) user.

    Raises:
        OSError: If *path* does not exist or cannot be queried.
    """
    # shutil.disk_usage yields f_blocks * f_frsize and f_bavail * f_frsize on
    # POSIX (the same figures os.statvfs gave) and also works on Windows,
    # where os.statvfs is unavailable.
    usage = shutil.disk_usage(path)
    return {"total": int(usage.total), "free": int(usage.free)}


async def ping_ms(host: str = "1.1.1.1", port: int = 443, timeout: float = 3.0) -> Optional[float]:
    """Approximate a ping by timing a TCP connect (no ICMP involved).

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait for the connection.

    Returns:
        Connect time in milliseconds, or ``None`` if the connection could not
        be established within *timeout*.
    """
    started = time.perf_counter()
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except Exception as exc:
        # Best-effort probe: any failure simply means "no measurement".
        logger.debug("ping_ms: connect to %s:%s failed: %r", host, port, exc)
        return None

    # Take the reading before closing so that socket teardown is not counted
    # as part of the connect latency.
    elapsed_ms = (time.perf_counter() - started) * 1000.0

    try:
        writer.close()
        if hasattr(writer, "wait_closed"):
            await writer.wait_closed()
    except Exception as exc:
        # The measurement is already taken; a failed close must not lose it.
        logger.debug("ping_ms: closing connection to %s:%s failed: %r", host, port, exc)

    return elapsed_ms


def _ip_from_ipify(resp: httpx.Response) -> str:
    """Extract the address from an ipify ``{"ip": "..."}`` JSON body."""
    return str(resp.json().get("ip") or "").strip()


def _ip_from_cf_trace(resp: httpx.Response) -> str:
    """Extract the address from the ``ip=`` line of a Cloudflare trace body."""
    for line in (resp.text or "").splitlines():
        if line.startswith("ip="):
            ip = line.split("=", 1)[1].strip()
            if ip:
                return ip
    return ""


# Lookup services tried in order by public_ip(): (URL, response parser).
_IP_SOURCES = (
    ("https://api.ipify.org?format=json", _ip_from_ipify),
    ("https://cloudflare.com/cdn-cgi/trace", _ip_from_cf_trace),
)


async def public_ip(timeout: float = 6.0) -> Optional[str]:
    """Best-effort public IP lookup.

    Tries ipify first, then the Cloudflare trace endpoint.

    Args:
        timeout: Timeout in seconds passed to the HTTP client.

    Returns:
        The first non-empty address reported by a source, or ``None`` if
        every source failed.
    """
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            for url, extract in _IP_SOURCES:
                try:
                    resp = await client.get(url)
                    if resp.status_code < 400:
                        ip = extract(resp)
                        if ip:
                            return ip
                except Exception as exc:
                    # One source failing must not prevent trying the next.
                    logger.debug("public_ip: lookup via %s failed: %r", url, exc)
    except Exception as exc:
        logger.debug("public_ip: HTTP client error: %r", exc)
    return None


async def net_download_test(bytes_target: int = 8 * 1024 * 1024, timeout: float = 15.0) -> dict:
    """Download *bytes_target* bytes from the Cloudflare endpoint and time it.

    Args:
        bytes_target: Number of bytes to request.
        timeout: Timeout in seconds passed to the HTTP client.

    Returns:
        ``{"bytes": <bytes received>, "seconds": <elapsed>, "bps": <bytes/s>}``.

    Raises:
        httpx.HTTPError: On transport failure or a 4xx/5xx response.
    """
    url = f"{CF_DOWN}?bytes={int(bytes_target)}"
    limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
    received = 0
    async with httpx.AsyncClient(timeout=timeout, limits=limits, follow_redirects=True) as client:
        # Time only the request itself; building and tearing down the client
        # is local work and would deflate the reported throughput.
        started = time.perf_counter()
        async with client.stream("GET", url) as resp:
            resp.raise_for_status()
            async for chunk in resp.aiter_bytes():
                received += len(chunk)
        # Clamp to avoid a division by zero on a degenerate timer reading.
        elapsed = max(1e-6, time.perf_counter() - started)
    return {"bytes": int(received), "seconds": float(elapsed), "bps": float(received / elapsed)}


async def net_upload_test(bytes_target: int = 3 * 1024 * 1024, timeout: float = 20.0) -> dict:
    """Upload *bytes_target* random bytes to the Cloudflare endpoint and time it.

    Args:
        bytes_target: Number of bytes to send.
        timeout: Timeout in seconds passed to the HTTP client.

    Returns:
        ``{"bytes": <bytes sent>, "seconds": <elapsed>, "bps": <bytes/s>}``.

    Raises:
        httpx.HTTPError: On transport failure or a 4xx/5xx response.
    """
    # Generate the payload before starting the timer.
    payload = os.urandom(int(bytes_target))
    # Use the real payload size so "bytes" and "bps" always agree with what
    # was actually sent.
    sent = len(payload)
    limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, follow_redirects=True) as client:
        # Time only the request itself, not client setup/teardown.
        started = time.perf_counter()
        resp = await client.post(
            CF_UP,
            content=payload,
            headers={"content-type": "application/octet-stream"},
        )
        resp.raise_for_status()
        # Clamp to avoid a division by zero on a degenerate timer reading.
        elapsed = max(1e-6, time.perf_counter() - started)
    return {"bytes": int(sent), "seconds": float(elapsed), "bps": float(sent / elapsed)}