Spaces:
Runtime error
Runtime error
| # FILE: bot/core/progress.py | |
| # NOTE: Updated so SpeedETA matches handlers.py (returns float speed, has eta_seconds) | |
| from __future__ import annotations | |
| import time | |
| from collections import deque | |
| from typing import Deque, Optional, Tuple | |
class SpeedETA:
    """Sliding-window estimator for transfer speed and remaining time.

    Each call to :meth:`update` records a ``(monotonic time, bytes sent)``
    sample, discards samples older than ``window_seconds`` and recomputes:

    - ``speed_bps``: bytes per second between the oldest and newest sample
      still in the window (``0.0`` when it cannot be computed).
    - ``eta_seconds``: whole seconds left as an ``int``, or ``None`` when the
      total size or the speed is unknown.
    """

    def __init__(self, window_seconds: float = 12.0) -> None:
        """Create an empty tracker averaging over ``window_seconds`` seconds."""
        self.window_seconds = float(window_seconds)
        # (timestamp from time.monotonic(), cumulative bytes sent) pairs.
        self.samples: Deque[Tuple[float, int]] = deque()
        self.speed_bps: float = 0.0
        self.eta_seconds: Optional[int] = None

    def update(self, sent: int, total: int) -> float:
        """Record the latest progress and return the current speed.

        Args:
            sent: Cumulative bytes transferred so far. Values that ``int()``
                rejects are counted as 0.
            total: Expected total size in bytes. Falsy or unconvertible
                values are treated as 0, i.e. "unknown".

        Returns:
            The speed in bytes per second (also stored in ``speed_bps``).
        """
        # Progress callbacks may hand over unexpected types, so coerce
        # defensively instead of letting the callback raise.
        try:
            done = int(sent)
        except Exception:
            done = 0
        try:
            size = int(total) if total else 0
        except Exception:
            size = 0

        stamp = time.monotonic()
        history = self.samples
        history.append((stamp, done))

        # Drop samples that have aged out of the averaging window.
        while history and stamp - history[0][0] > self.window_seconds:
            history.popleft()

        # Speed stays 0.0 unless two samples span a positive time interval
        # and the byte counter did not move backwards.
        rate = 0.0
        if len(history) >= 2:
            first_time, first_bytes = history[0]
            last_time, last_bytes = history[-1]
            elapsed = last_time - first_time
            moved = last_bytes - first_bytes
            if elapsed > 0 and moved >= 0:
                rate = moved / elapsed
        self.speed_bps = rate

        # An ETA needs both a known total and a positive speed.
        if size > 0 and rate > 0:
            self.eta_seconds = int(max(0, size - done) / rate)
        else:
            self.eta_seconds = None
        return rate
def human_bytes(n: float) -> str:
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until its printed magnitude is below 1024
    or the largest unit ("TB") is reached, then rendered with two decimals
    and the unit appended, e.g. ``1536`` -> ``"1.50KB"``.

    Args:
        n: Byte count. Anything ``float()`` cannot convert is treated as 0.

    Returns:
        The scaled value followed directly by its unit, e.g. ``"0.00B"``.
    """
    try:
        n = float(n)
    except Exception:
        # Deliberate best effort: a formatting helper must never break the
        # progress display, so any unconvertible input is shown as zero.
        n = 0.0
    units = ["B", "KB", "MB", "GB", "TB"]
    i = 0
    # Scale on the magnitude as it will be printed (two decimals):
    # - rounding first stops 1048575 from rendering as "1024.00KB"
    # - abs() lets negative values pick a unit as well
    while round(abs(n), 2) >= 1024 and i < len(units) - 1:
        n /= 1024
        i += 1
    return f"{n:.2f}{units[i]}"
def human_eta(seconds: Optional[float]) -> str:
    """Render a duration in seconds as a compact ETA string.

    Args:
        seconds: Remaining time in seconds, or ``None`` when unknown.

    Returns:
        ``"—"`` when ``seconds`` is ``None`` or cannot be converted by
        ``int()``; otherwise ``"<h>h <m>m"``, ``"<m>m <s>s"`` or ``"<s>s"``,
        using the largest non-zero unit. Negative values are shown as ``"0s"``.
    """
    placeholder = "—"
    if seconds is None:
        return placeholder
    try:
        whole = int(seconds)
    except Exception:
        return placeholder

    # Negative durations are clamped to zero.
    whole = max(whole, 0)
    hours, leftover = divmod(whole, 3600)
    minutes, secs = divmod(leftover, 60)

    if hours:
        # Seconds are intentionally omitted once hours are shown.
        return f"{hours}h {minutes}m"
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"