Spaces:
Runtime error
Runtime error
| # PATH: bot/core/speedtest.py | |
| import asyncio | |
| import os | |
| import time | |
| from typing import Optional | |
| import httpx | |
# Cloudflare speed-test endpoints.
# CF_DOWN is fetched with a ``?bytes=<n>`` query by net_download_test.
CF_DOWN: str = "https://speed.cloudflare.com/__down"
# CF_UP receives the POSTed payload in net_upload_test.
CF_UP: str = "https://speed.cloudflare.com/__up"
def bytes_to_mb(n: float) -> float:
    """Convert a byte count to megabytes, using 1 MB = 1024 * 1024 bytes."""
    bytes_per_mb = 1024.0 * 1024.0
    return float(n) / bytes_per_mb
def bytes_per_sec_to_mb_s(bps: float) -> float:
    """Convert a rate in bytes/second to MB/second (1 MB = 1024 * 1024 bytes)."""
    rate = float(bps)
    return rate / (1024.0 * 1024.0)
def disk_total_free(path: str = "/") -> dict:
    """Report the size and available space of the filesystem holding *path*.

    Args:
        path: Any path on the filesystem to inspect. Defaults to ``"/"``.

    Returns:
        A dict with two integer byte counts:
        ``"total"`` (``f_frsize * f_blocks``) and
        ``"free"`` (``f_frsize * f_bavail``).

    Raises:
        OSError: Propagated from ``os.statvfs`` when *path* cannot be queried.
    """
    st = os.statvfs(path)
    # Both figures are block counts scaled by the fragment size. An earlier
    # intermediate "free" computation was dead code (immediately overwritten)
    # and has been dropped; the result is unchanged.
    block_size = st.f_frsize
    return {
        "total": int(block_size * st.f_blocks),
        "free": int(block_size * st.f_bavail),
    }
async def ping_ms(host: str = "1.1.1.1", port: int = 443, timeout: float = 3.0) -> Optional[float]:
    """Approximate a "ping" by timing a TCP connect (no ICMP is used).

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
        timeout: Maximum number of seconds to wait for the connection; the
            same bound is applied to the best-effort close afterwards.

    Returns:
        The time taken to establish the connection, in milliseconds, or
        ``None`` if the connection fails or times out.
    """
    t0 = time.perf_counter()
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except Exception:
        # Refused, unreachable, DNS failure, timeout: all mean "no measurement".
        return None

    # Stop the clock as soon as the connection is up so that socket teardown
    # below is not counted as part of the latency.
    elapsed_ms = (time.perf_counter() - t0) * 1000.0

    try:
        writer.close()
        if hasattr(writer, "wait_closed"):
            # Bounded so a stalled close cannot hang the caller.
            await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
    except Exception:
        # Closing is best-effort; the measurement is already taken.
        pass
    return elapsed_ms
async def public_ip(timeout: float = 6.0) -> Optional[str]:
    """Look up this machine's public IP address on a best-effort basis.

    The ipify JSON API is queried first; if that yields nothing, the
    Cloudflare ``cdn-cgi/trace`` page is scanned for its ``ip=`` line.
    Any error in either attempt is swallowed so the next source can be tried.

    Args:
        timeout: Timeout passed to each ``httpx.AsyncClient``.

    Returns:
        The IP address as a string, or ``None`` if neither source produced one.
    """
    # Attempt 1: ipify, which answers with a JSON object carrying an "ip" key.
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            resp = await client.get("https://api.ipify.org?format=json")
            if resp.status_code < 400:
                addr = str(resp.json().get("ip") or "").strip()
                if addr:
                    return addr
    except Exception:
        pass

    # Attempt 2: Cloudflare trace, a plain-text list of key=value lines.
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            resp = await client.get("https://cloudflare.com/cdn-cgi/trace")
            if resp.status_code < 400:
                for line in (resp.text or "").splitlines():
                    if not line.startswith("ip="):
                        continue
                    addr = line.split("=", 1)[1].strip()
                    if addr:
                        return addr
    except Exception:
        pass

    return None
async def net_download_test(bytes_target: int = 8 * 1024 * 1024, timeout: float = 15.0) -> dict:
    """Measure download throughput by streaming data from the Cloudflare endpoint.

    Args:
        bytes_target: Number of bytes requested via the ``bytes`` query parameter.
        timeout: Timeout passed to ``httpx.AsyncClient``.

    Returns:
        A dict with ``"bytes"`` (bytes actually received), ``"seconds"``
        (elapsed wall time, floored at 1e-6) and ``"bps"`` (bytes per second).

    Raises:
        Whatever httpx raises on transport failure, or from
        ``raise_for_status()`` on an error response.
    """
    target_url = f"{CF_DOWN}?bytes={int(bytes_target)}"
    started = time.perf_counter()
    received = 0
    pool_limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)

    async with httpx.AsyncClient(
        timeout=timeout, limits=pool_limits, follow_redirects=True
    ) as client, client.stream("GET", target_url) as resp:
        resp.raise_for_status()
        # Only the size of each chunk matters; the data itself is discarded.
        async for piece in resp.aiter_bytes():
            received += len(piece)

    # Floor the duration so the division below can never hit zero.
    elapsed = max(1e-6, time.perf_counter() - started)
    return {
        "bytes": int(received),
        "seconds": float(elapsed),
        "bps": float(received / elapsed),
    }
async def net_upload_test(bytes_target: int = 3 * 1024 * 1024, timeout: float = 20.0) -> dict:
    """Measure upload throughput by POSTing random bytes to the Cloudflare endpoint.

    Args:
        bytes_target: Size of the payload to send; truncated to an integer
            number of bytes.
        timeout: Timeout passed to ``httpx.AsyncClient``.

    Returns:
        A dict with ``"bytes"`` (payload size sent), ``"seconds"`` (elapsed
        wall time, floored at 1e-6) and ``"bps"`` (bytes per second).

    Raises:
        Whatever httpx raises on transport failure, or from
        ``raise_for_status()`` on an error response.
    """
    # The payload is generated before the clock starts so that producing the
    # random bytes is not counted as upload time.
    payload = os.urandom(int(bytes_target))
    sent = len(payload)

    t0 = time.perf_counter()
    limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, follow_redirects=True) as client:
        r = await client.post(
            CF_UP,
            content=payload,
            headers={"content-type": "application/octet-stream"},
        )
        r.raise_for_status()

    # Floor the duration so the division below can never hit zero.
    sec = max(1e-6, time.perf_counter() - t0)
    # Rate is derived from the bytes actually sent, so "bps" always agrees
    # with "bytes" even when bytes_target was not an exact integer.
    bps = sent / sec
    return {"bytes": int(sent), "seconds": float(sec), "bps": float(bps)}