Spaces:
Running on Zero
Running on Zero
| """Periodic host + CPU-worker-pool telemetry sampler. | |
| V3.1 scope: persistent subprocess pool only. Spawn mode + remote worker | |
| dispatch are deferred — this sampler reads from `cpu_worker_pool.stats()` | |
| directly and skips if the pool isn't started. | |
| A background daemon thread wakes every `TELEMETRY_SAMPLE_SECONDS`, reads: | |
| - host: CPU%, load_1m, memory, swap (via psutil) | |
| - pool: active/free counts, per-worker CPU%/RSS/is_busy/total_jobs | |
| and appends a row to the telemetry Parquet scheduler. The scheduler pushes | |
| every `TELEMETRY_FLUSH_MINUTES`. | |
| Rows are independent of per-request log rows — analysts join by timestamp | |
| window (for each request row in `-logs` dataset, telemetry rows with | |
| `timestamp ∈ [request.timestamp, request.timestamp + request.wall_total_s]` | |
| describe what the host was doing during that run). | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import Any, Optional | |
| # ---------------------------------------------------------------------------- | |
| # psutil lazy init — the first `cpu_percent()` call always returns 0.0 (it | |
| # measures the delta since the previous call). Prime process-level probes on | |
| # startup so the first telemetry sample reports a meaningful number. | |
| # ---------------------------------------------------------------------------- | |
| _psutil = None # imported lazily | |
| # Per-pid blocking sample interval for `Process.cpu_percent`. The `interval=None` | |
| # path would require priming each pid with a prior call AND enough wall time | |
| # before the next call to accumulate a meaningful delta — neither holds inside | |
| # a single sampler tick. `interval=0.1` blocks 100ms per worker to measure % | |
| # directly. At cap=2 that's ~200ms per sample cycle — well within the 15-30s | |
| # sample interval budget. | |
| _PROC_CPU_SAMPLE_INTERVAL_S = 0.1 | |
| def _get_psutil(): | |
| global _psutil | |
| if _psutil is None: | |
| import psutil # type: ignore | |
| _psutil = psutil | |
| # Prime system-level cpu_percent — first call returns 0.0, subsequent | |
| # calls return the real % over the interval since last call. | |
| _psutil.cpu_percent(interval=None) | |
| return _psutil | |
| # ---------------------------------------------------------------------------- | |
| # Sample builders | |
| # ---------------------------------------------------------------------------- | |
| def _read_cgroup_int(path: str) -> int | None: | |
| """Read an integer cgroup file (bytes or usec). Returns None if missing.""" | |
| try: | |
| with open(path) as f: | |
| v = f.read().strip() | |
| return int(v) if v.isdigit() else None | |
| except Exception: | |
| return None | |
| # Module state — last cpu.stat usage_usec so we can compute delta % per sample. | |
| _last_cpu_usec: int | None = None | |
| _last_sample_mono: float | None = None | |
| def _sample_container() -> dict: | |
| """Per-container CPU + memory from cgroup v2. Reflects THIS Space only. | |
| On HF Spaces (cgroup v2, unified hierarchy): | |
| - `memory.current` / `memory.max` / `memory.peak` = this container's | |
| memory usage / limit / high-water-mark. | |
| - `cpu.max` = `"<quota_usec> <period_usec>"` — quota is the per-period | |
| CPU budget, so `quota / period` = number of cores we're entitled to. | |
| - `cpu.stat::usage_usec` is a monotonic CPU-microseconds-used counter. | |
| Delta between samples / (sample_interval * quota_cores) = % of quota. | |
| """ | |
| global _last_cpu_usec, _last_sample_mono | |
| out: dict = {} | |
| # Memory (from cgroup v2) | |
| mem_current = _read_cgroup_int("/sys/fs/cgroup/memory.current") | |
| mem_max = _read_cgroup_int("/sys/fs/cgroup/memory.max") | |
| mem_peak = _read_cgroup_int("/sys/fs/cgroup/memory.peak") | |
| if mem_current is not None: | |
| out["mem_current_mb"] = round(mem_current / (1024 * 1024), 1) | |
| if mem_max: | |
| out["mem_max_mb"] = round(mem_max / (1024 * 1024), 1) | |
| out["mem_used_pct"] = round(100.0 * mem_current / mem_max, 1) | |
| if mem_peak is not None: | |
| out["mem_peak_mb"] = round(mem_peak / (1024 * 1024), 1) | |
| # CPU quota (cores) from cpu.max | |
| try: | |
| with open("/sys/fs/cgroup/cpu.max") as f: | |
| parts = f.read().strip().split() | |
| quota_us = int(parts[0]) if parts[0].isdigit() else None | |
| period_us = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else 100_000 | |
| if quota_us: | |
| out["cpu_quota_cores"] = round(quota_us / period_us, 2) | |
| except Exception: | |
| quota_us = None | |
| period_us = 100_000 | |
| # CPU usage delta — requires two samples | |
| cur_usec = None | |
| try: | |
| with open("/sys/fs/cgroup/cpu.stat") as f: | |
| for line in f: | |
| if line.startswith("usage_usec"): | |
| cur_usec = int(line.split()[1]) | |
| break | |
| except Exception: | |
| pass | |
| import time as _t | |
| now_mono = _t.monotonic() | |
| if cur_usec is not None and _last_cpu_usec is not None and _last_sample_mono is not None: | |
| dt_s = max(0.001, now_mono - _last_sample_mono) | |
| d_usec = max(0, cur_usec - _last_cpu_usec) | |
| out["cpu_used_usec_delta"] = d_usec | |
| if quota_us: | |
| quota_cores = quota_us / period_us | |
| # % of OUR quota used over the sample interval | |
| out["cpu_used_pct_of_quota"] = round( | |
| 100.0 * (d_usec / 1e6) / (dt_s * quota_cores), 1 | |
| ) | |
| _last_cpu_usec = cur_usec | |
| _last_sample_mono = now_mono | |
| return out | |
| def _sample_host() -> dict: | |
| """Physical-HOST global stats (shared across all tenants on the box). | |
| These are NOT about this Space — on ZeroGPU the physical host has ~192 | |
| vCPU / 2 TB RAM shared among many tenants. Useful only as noisy-neighbor | |
| context. Per-container truth lives in `container`. | |
| """ | |
| try: | |
| ps = _get_psutil() | |
| load_1m = None | |
| try: | |
| load_1m = os.getloadavg()[0] | |
| except (AttributeError, OSError): | |
| pass | |
| return { | |
| "cpu_percent_global": round(float(ps.cpu_percent(interval=None)), 1), | |
| "load_1m_global": round(load_1m, 2) if load_1m is not None else None, | |
| } | |
| except Exception as e: | |
| return {"error": f"{type(e).__name__}: {e}"} | |
| def _sample_pool() -> Optional[dict]: | |
| """Persistent-pool snapshot. Returns None if pool isn't started.""" | |
| try: | |
| from .cpu_worker_pool import is_started, stats | |
| except Exception: | |
| return None | |
| if not is_started(): | |
| return None | |
| try: | |
| s = stats() | |
| except Exception as e: | |
| return {"error": f"stats failed: {type(e).__name__}: {e}"} | |
| ps = _get_psutil() | |
| workers = [] | |
| for w in s.get("workers", []): | |
| pid = w.get("pid") | |
| cpu_pct = 0.0 | |
| rss_mb = 0.0 | |
| if pid: | |
| try: | |
| proc = ps.Process(pid) | |
| cpu_pct = round(float(proc.cpu_percent(interval=_PROC_CPU_SAMPLE_INTERVAL_S)), 1) | |
| rss_mb = round(proc.memory_info().rss / (1024 * 1024), 1) | |
| except Exception: | |
| pass | |
| workers.append({ | |
| "worker_id": w.get("id"), | |
| "pid": pid, | |
| "cpu_percent": cpu_pct, | |
| "rss_mb": rss_mb, | |
| "is_busy": bool(w.get("is_busy")), | |
| "alive": bool(w.get("alive")), | |
| "total_jobs": int(w.get("total_jobs", 0)), | |
| }) | |
| # Semaphore info (capacity + current holders). Best-effort. | |
| cap = len(s.get("workers", [])) | |
| try: | |
| from .zero_gpu import _get_subprocess_semaphore | |
| sem = _get_subprocess_semaphore() | |
| cap = getattr(sem, "capacity", cap) | |
| except Exception: | |
| pass | |
| return { | |
| "mode": "persistent", | |
| "concurrency_cap": cap, | |
| "active_workers": s.get("busy_count", 0), | |
| "free_workers": s.get("free_count", 0), | |
| "n_workers": s.get("n_workers", 0), | |
| "respawn_count": int(s.get("respawn_count", 0)), | |
| "workers": workers, | |
| } | |
| def build_sample() -> dict: | |
| """Produce one telemetry row dict. Pure function — no side effects. | |
| Side note: `_sample_container()` mutates module state (`_last_cpu_usec`) | |
| to compute CPU delta across samples. It's re-entrant for a single sampler | |
| thread; do not call concurrently. | |
| """ | |
| from config import TELEMETRY_SCHEMA_VERSION | |
| return { | |
| "timestamp": datetime.now().isoformat(timespec="seconds"), | |
| "schema_version": TELEMETRY_SCHEMA_VERSION, | |
| "space": os.environ.get("SPACE_ID", "local"), | |
| "container": _sample_container(), | |
| "host": _sample_host(), | |
| "pool": _sample_pool(), | |
| } | |
| # ---------------------------------------------------------------------------- | |
| # Sampler thread | |
| # ---------------------------------------------------------------------------- | |
| _sampler_thread: Optional[threading.Thread] = None | |
| _sampler_stop = threading.Event() | |
| def _sampler_loop(sample_every_s: int) -> None: | |
| """Daemon loop: sample → serialize → append to scheduler.""" | |
| import json | |
| from .usage_logger import _ensure_schedulers, _telemetry_scheduler # lazy | |
| # Prime host-level cpu_percent so first real sample isn't 0.0 | |
| _get_psutil() | |
| time.sleep(1.0) | |
| while not _sampler_stop.is_set(): | |
| t0 = time.monotonic() | |
| try: | |
| _ensure_schedulers() | |
| row_obj = build_sample() | |
| # Flatten JSON sub-dicts to strings (match the parquet schema) | |
| row = { | |
| "timestamp": row_obj["timestamp"], | |
| "schema_version": row_obj["schema_version"], | |
| "space": row_obj["space"], | |
| "container": json.dumps(row_obj["container"]), | |
| "host": json.dumps(row_obj["host"]), | |
| "pool": json.dumps(row_obj["pool"]) if row_obj["pool"] is not None else None, | |
| } | |
| from .usage_logger import get_telemetry_scheduler | |
| sch = get_telemetry_scheduler() | |
| if sch is not None: | |
| sch.append(row) | |
| except Exception as e: | |
| print(f"[TELEMETRY] sample failed: {type(e).__name__}: {e}") | |
| # Sleep the remainder of the interval (tolerate slow samples) | |
| elapsed = time.monotonic() - t0 | |
| remaining = max(0.0, sample_every_s - elapsed) | |
| _sampler_stop.wait(timeout=remaining) | |
| def start_sampler() -> None: | |
| """Start the telemetry daemon thread. Idempotent.""" | |
| global _sampler_thread | |
| if _sampler_thread is not None and _sampler_thread.is_alive(): | |
| return | |
| try: | |
| from config import TELEMETRY_ENABLED, TELEMETRY_SAMPLE_SECONDS | |
| except Exception as e: | |
| print(f"[TELEMETRY] config import failed: {e}") | |
| return | |
| if not TELEMETRY_ENABLED: | |
| print("[TELEMETRY] disabled via TELEMETRY_ENABLED=0") | |
| return | |
| _sampler_stop.clear() | |
| _sampler_thread = threading.Thread( | |
| target=_sampler_loop, args=(TELEMETRY_SAMPLE_SECONDS,), | |
| name="telemetry-sampler", daemon=True, | |
| ) | |
| _sampler_thread.start() | |
| print(f"[TELEMETRY] sampler started (sample_every={TELEMETRY_SAMPLE_SECONDS}s)") | |
| def stop_sampler() -> None: | |
| """Signal the sampler to stop at its next tick. Best-effort.""" | |
| _sampler_stop.set() | |