File size: 3,807 Bytes
c2f82fe
69a35bd
c37aecc
c2f82fe
69a35bd
 
c2f82fe
 
69a35bd
 
 
c37aecc
69a35bd
 
c2f82fe
 
c37aecc
69a35bd
 
c37aecc
 
69a35bd
 
61b7300
69a35bd
 
c2f82fe
69a35bd
 
 
 
 
 
c2f82fe
 
69a35bd
 
 
 
 
 
 
 
c2f82fe
 
 
 
69a35bd
61b7300
4626cf8
61b7300
 
4626cf8
 
 
61b7300
 
 
 
 
 
4626cf8
61b7300
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4626cf8
 
 
69a35bd
 
 
 
 
c2f82fe
c37aecc
c2f82fe
69a35bd
 
 
 
 
 
c2f82fe
69a35bd
 
 
c2f82fe
 
69a35bd
 
 
 
 
c2f82fe
 
69a35bd
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# PATH: bot/core/speedtest.py
import asyncio
import os
import time
from typing import Optional

import httpx

CF_DOWN = "https://speed.cloudflare.com/__down"
CF_UP = "https://speed.cloudflare.com/__up"


def bytes_to_mb(n: float) -> float:
    return float(n) / (1024.0 * 1024.0)


def bytes_per_sec_to_mb_s(bps: float) -> float:
    return float(bps) / (1024.0 * 1024.0)


def disk_total_free(path: str = "/") -> dict:
    st = os.statvfs(path)
    total = st.f_frsize * st.f_blocks
    free = st.f_frsize * st.f_frsize * (st.f_bavail // st.f_frsize) if st.f_frsize else st.f_frsize * st.f_bavail
    free = st.f_frsize * st.f_bavail
    return {"total": int(total), "free": int(free)}


async def ping_ms(host: str = "1.1.1.1", port: int = 443, timeout: float = 3.0) -> Optional[float]:
    """
    "Ping" approximation via TCP connect time (no ICMP).
    Returns ms or None.
    """
    t0 = time.perf_counter()
    try:
        fut = asyncio.open_connection(host, port)
        reader, writer = await asyncio.wait_for(fut, timeout=timeout)
        try:
            writer.close()
            if hasattr(writer, "wait_closed"):
                await writer.wait_closed()
        except Exception:
            pass
        return (time.perf_counter() - t0) * 1000.0
    except Exception:
        return None


async def public_ip(timeout: float = 6.0) -> Optional[str]:
    """
    Best-effort public IP.
    Tries ipify, then Cloudflare trace.
    """
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
            r = await c.get("https://api.ipify.org?format=json")
            if r.status_code < 400:
                j = r.json()
                ip = str(j.get("ip") or "").strip()
                if ip:
                    return ip
    except Exception:
        pass

    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
            r = await c.get("https://cloudflare.com/cdn-cgi/trace")
            if r.status_code < 400:
                txt = r.text or ""
                for ln in txt.splitlines():
                    if ln.startswith("ip="):
                        ip = ln.split("=", 1)[1].strip()
                        if ip:
                            return ip
    except Exception:
        pass

    return None


async def net_download_test(bytes_target: int = 8 * 1024 * 1024, timeout: float = 15.0) -> dict:
    """
    Downloads bytes_target from Cloudflare endpoint and measures throughput.
    """
    url = f"{CF_DOWN}?bytes={int(bytes_target)}"
    t0 = time.perf_counter()
    got = 0

    limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, follow_redirects=True) as client:
        async with client.stream("GET", url) as r:
            r.raise_for_status()
            async for chunk in r.aiter_bytes():
                got += len(chunk)

    sec = max(1e-6, time.perf_counter() - t0)
    bps = got / sec
    return {"bytes": int(got), "seconds": float(sec), "bps": float(bps)}


async def net_upload_test(bytes_target: int = 3 * 1024 * 1024, timeout: float = 20.0) -> dict:
    """
    Uploads bytes_target to Cloudflare endpoint and measures throughput.
    """
    payload = os.urandom(int(bytes_target))
    t0 = time.perf_counter()

    limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
    async with httpx.AsyncClient(timeout=timeout, limits=limits, follow_redirects=True) as client:
        r = await client.post(CF_UP, content=payload, headers={"content-type": "application/octet-stream"})
        r.raise_for_status()

    sec = max(1e-6, time.perf_counter() - t0)
    bps = bytes_target / sec
    return {"bytes": int(bytes_target), "seconds": float(sec), "bps": float(bps)}