File size: 2,360 Bytes
9660ed3
 
 
 
 
28c0b4f
9660ed3
 
 
28c0b4f
 
9660ed3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28c0b4f
 
9660ed3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28c0b4f
 
9660ed3
 
 
 
28c0b4f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# FILE: bot/core/progress.py
# NOTE: Updated so SpeedETA matches handlers.py (returns float speed, has eta_seconds)

from __future__ import annotations

import time
from collections import deque
from typing import Deque, Optional, Tuple


class SpeedETA:
    """Track a moving-average transfer speed and the estimated time remaining.

    Each call to :meth:`update` records a ``(monotonic time, bytes sent)``
    sample. Speed is the byte delta between the oldest retained sample and
    the newest one, divided by the time between them.

    Attributes:
        window_seconds: Age (in seconds) beyond which samples are discarded,
            as long as at least two samples remain.
        samples: Retained ``(timestamp, bytes_sent)`` pairs, oldest first.
        speed_bps: Most recently computed speed in bytes per second.
        eta_seconds: Whole seconds remaining at the current speed, or
            ``None`` when it cannot be estimated.
    """

    def __init__(self, window_seconds: float = 12.0) -> None:
        self.window_seconds = float(window_seconds)
        self.samples: Deque[Tuple[float, int]] = deque()
        self.speed_bps: float = 0.0
        self.eta_seconds: Optional[int] = None

    @staticmethod
    def _to_int(value: object) -> int:
        """Coerce *value* to ``int``; falsy or unconvertible values become 0."""
        try:
            return int(value) if value else 0
        except (TypeError, ValueError, OverflowError):
            return 0

    def update(self, sent: int, total: int) -> float:
        """Record a progress sample and refresh ``speed_bps``/``eta_seconds``.

        Args:
            sent: Bytes transferred so far. Values that cannot be converted
                to ``int`` are treated as 0.
            total: Expected total size in bytes. Falsy or unconvertible
                values are treated as 0 (unknown), which disables the ETA.

        Returns:
            The current speed in bytes per second (0.0 when fewer than two
            samples are available or no time has elapsed).
        """
        # Be defensive: sent/total can be weird types in callbacks.
        sent_bytes = self._to_int(sent)
        total_bytes = self._to_int(total)

        now = time.monotonic()

        # Progress moving backwards means a restart; the old samples would
        # otherwise yield a negative delta (speed 0) until they expired.
        if self.samples and sent_bytes < self.samples[-1][1]:
            self.samples.clear()
        self.samples.append((now, sent_bytes))

        # Drop samples older than the window, but always keep two so that
        # callbacks arriving less often than the window still give a speed.
        while (
            len(self.samples) > 2
            and (now - self.samples[0][0]) > self.window_seconds
        ):
            self.samples.popleft()

        oldest_time, oldest_bytes = self.samples[0]
        elapsed = now - oldest_time
        transferred = sent_bytes - oldest_bytes
        if elapsed > 0 and transferred >= 0:
            self.speed_bps = transferred / elapsed
        else:
            self.speed_bps = 0.0

        if total_bytes > 0 and self.speed_bps > 0:
            remaining = max(0, total_bytes - sent_bytes)
            self.eta_seconds = int(remaining / self.speed_bps)
        else:
            self.eta_seconds = None

        return self.speed_bps


def human_bytes(n: float) -> str:
    """Format a byte count as a short string using 1024-based units.

    Args:
        n: Number of bytes. Values that cannot be converted to ``float``
            are treated as 0. Negative values are scaled by magnitude and
            keep their sign.

    Returns:
        The value with two decimals followed by one of ``B``, ``KB``,
        ``MB``, ``GB`` or ``TB`` (for example ``"1.50KB"``). Values beyond
        the terabyte range stay expressed in ``TB``.
    """
    try:
        size = float(n)
    except (TypeError, ValueError, OverflowError):
        size = 0.0

    units = ["B", "KB", "MB", "GB", "TB"]
    last = len(units) - 1
    idx = 0
    # Compare the magnitude so negative sizes are scaled like positive ones.
    while abs(size) >= 1024 and idx < last:
        size /= 1024
        idx += 1
    return f"{size:.2f}{units[idx]}"


def human_eta(seconds: Optional[float]) -> str:
    """Render a duration in seconds as a compact ``h/m/s`` string.

    Args:
        seconds: Duration to format. ``None`` or a value that cannot be
            converted to ``int`` yields the placeholder dash. Negative
            durations are clamped to zero.

    Returns:
        ``"{h}h {m}m"`` when at least one hour remains, ``"{m}m {s}s"`` when
        at least one minute remains, otherwise ``"{s}s"``.
    """
    placeholder = "—"
    if seconds is None:
        return placeholder

    try:
        total = int(seconds)
    except Exception:
        return placeholder

    total = max(total, 0)
    hours, leftover = divmod(total, 3600)
    minutes, secs = divmod(leftover, 60)

    if hours:
        return f"{hours}h {minutes}m"
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"