Spaces:
Running on Zero
Running on Zero
File size: 11,361 Bytes
4a0777e 22d5ea2 4a0777e 22d5ea2 4a0777e 22d5ea2 4a0777e 22d5ea2 4a0777e 419fe6e 4a0777e 22d5ea2 4a0777e b163a55 22d5ea2 4a0777e b163a55 22d5ea2 4a0777e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 | """Periodic host + CPU-worker-pool telemetry sampler.
V3.1 scope: persistent subprocess pool only. Spawn mode + remote worker
dispatch are deferred — this sampler reads from `cpu_worker_pool.stats()`
directly and skips the pool section if the pool isn't started.
A background daemon thread wakes every `TELEMETRY_SAMPLE_SECONDS`, reads:
- container: cgroup v2 memory (current/max/peak) and CPU quota/usage
- host: global CPU% (via psutil) and load_1m
- pool: active/free counts, per-worker CPU%/RSS/is_busy/alive/total_jobs
and appends a row to the telemetry Parquet scheduler. The scheduler pushes
every `TELEMETRY_FLUSH_MINUTES`.
Rows are independent of per-request log rows — analysts join by timestamp
window (for each request row in the `-logs` dataset, telemetry rows with
`timestamp ∈ [request.timestamp, request.timestamp + request.wall_total_s]`
describe what the host was doing during that run).
"""
from __future__ import annotations
import os
import threading
import time
from datetime import datetime
from typing import Any, Optional
# ----------------------------------------------------------------------------
# psutil lazy init — the first `cpu_percent()` call always returns 0.0 (it
# measures the delta since the previous call). `_get_psutil()` primes the
# system-wide probe when it first imports psutil, so the first telemetry
# sample reports a meaningful number.
# ----------------------------------------------------------------------------
_psutil = None  # psutil module object; imported lazily by `_get_psutil()`

# Per-pid blocking sample interval for `Process.cpu_percent`. The `interval=None`
# path would require priming each pid with a prior call AND enough wall time
# before the next call to accumulate a meaningful delta — neither holds inside
# a single sampler tick. `interval=0.1` blocks 100ms per worker to measure %
# directly. At cap=2 that's ~200ms per sample cycle — well within the 15-30s
# sample interval budget.
_PROC_CPU_SAMPLE_INTERVAL_S = 0.1
def _get_psutil():
    """Return the psutil module, importing and priming it on first use.

    The module object is cached in the module-level `_psutil`, so the import
    and the priming call happen once. An import failure propagates to the
    caller and leaves the cache empty.
    """
    global _psutil
    if _psutil is not None:
        return _psutil

    import psutil  # type: ignore

    _psutil = psutil
    # The first system-wide cpu_percent() reading is a meaningless 0.0; take
    # it now so later calls report the % since the previous call.
    _psutil.cpu_percent(interval=None)
    return _psutil
# ----------------------------------------------------------------------------
# Sample builders
# ----------------------------------------------------------------------------
def _read_cgroup_int(path: str) -> int | None:
"""Read an integer cgroup file (bytes or usec). Returns None if missing."""
try:
with open(path) as f:
v = f.read().strip()
return int(v) if v.isdigit() else None
except Exception:
return None
# Module state — the previous `cpu.stat` usage_usec reading and the monotonic
# clock value it was taken at, so `_sample_container()` can turn two
# consecutive readings into a CPU delta. None until a reading is recorded.
_last_cpu_usec: int | None = None
_last_sample_mono: float | None = None
def _sample_container() -> dict:
    """Collect this container's memory and CPU figures from cgroup v2 files.

    Files read (all under ``/sys/fs/cgroup``):
      - ``memory.current`` / ``memory.max`` / ``memory.peak`` — byte counts,
        reported in MiB. ``mem_max_mb`` and ``mem_used_pct`` are only emitted
        when the current usage is readable and the limit is a non-zero number.
      - ``cpu.max`` — ``"<quota_usec> <period_usec>"``. ``quota / period`` is
        reported as ``cpu_quota_cores``; a non-numeric quota yields no quota,
        and a missing or non-numeric period falls back to 100000.
      - ``cpu.stat`` — the ``usage_usec`` line, a CPU-microseconds counter.

    CPU usage is a delta against the previous call, so the first call (and
    any call following an unreadable ``cpu.stat``) omits the delta keys.
    ``cpu_used_pct_of_quota`` is delta-seconds / (elapsed * quota_cores).

    Side effect: updates the module globals `_last_cpu_usec` and
    `_last_sample_mono` on every call.

    Returns:
        A dict holding whichever of ``mem_current_mb``, ``mem_max_mb``,
        ``mem_used_pct``, ``mem_peak_mb``, ``cpu_quota_cores``,
        ``cpu_used_usec_delta`` and ``cpu_used_pct_of_quota`` could be
        computed; unreadable files simply leave their keys out.
    """
    global _last_cpu_usec, _last_sample_mono

    mib = 1024 * 1024
    sample: dict = {}

    # --- Memory -------------------------------------------------------------
    current_bytes = _read_cgroup_int("/sys/fs/cgroup/memory.current")
    limit_bytes = _read_cgroup_int("/sys/fs/cgroup/memory.max")
    peak_bytes = _read_cgroup_int("/sys/fs/cgroup/memory.peak")
    if current_bytes is not None:
        sample["mem_current_mb"] = round(current_bytes / mib, 1)
        if limit_bytes:
            sample["mem_max_mb"] = round(limit_bytes / mib, 1)
            sample["mem_used_pct"] = round(100.0 * current_bytes / limit_bytes, 1)
    if peak_bytes is not None:
        sample["mem_peak_mb"] = round(peak_bytes / mib, 1)

    # --- CPU quota (cores) from cpu.max ---------------------------------------
    try:
        with open("/sys/fs/cgroup/cpu.max") as fh:
            fields = fh.read().strip().split()
        quota_us = int(fields[0]) if fields[0].isdigit() else None
        if len(fields) > 1 and fields[1].isdigit():
            period_us = int(fields[1])
        else:
            period_us = 100_000
        if quota_us:
            sample["cpu_quota_cores"] = round(quota_us / period_us, 2)
    except Exception:
        # Any read/parse problem means "no quota known" for this sample.
        quota_us, period_us = None, 100_000

    # --- Cumulative CPU usage from cpu.stat -----------------------------------
    usage_now = None
    try:
        with open("/sys/fs/cgroup/cpu.stat") as fh:
            usage_now = next(
                (int(entry.split()[1]) for entry in fh if entry.startswith("usage_usec")),
                None,
            )
    except Exception:
        pass

    # --- Delta against the previous sample ------------------------------------
    mono_now = time.monotonic()
    have_baseline = _last_cpu_usec is not None and _last_sample_mono is not None
    if usage_now is not None and have_baseline:
        elapsed_s = max(0.001, mono_now - _last_sample_mono)
        used_usec = max(0, usage_now - _last_cpu_usec)
        sample["cpu_used_usec_delta"] = used_usec
        if quota_us:
            cores = quota_us / period_us
            # % of OUR quota used over the sample interval
            sample["cpu_used_pct_of_quota"] = round(
                100.0 * (used_usec / 1e6) / (elapsed_s * cores), 1
            )

    _last_cpu_usec = usage_now
    _last_sample_mono = mono_now
    return sample
def _sample_host() -> dict:
    """Sample machine-wide CPU% and 1-minute load average.

    These describe the physical host, which on ZeroGPU is shared among many
    tenants, so they are only noisy-neighbor context; per-container figures
    come from `_sample_container()`.

    Returns:
        ``{"cpu_percent_global": float, "load_1m_global": float | None}``.
        The load is None where `os.getloadavg` is unavailable or fails. If
        anything else goes wrong the result is ``{"error": "<Type>: <msg>"}``.
    """
    try:
        ps = _get_psutil()

        try:
            load_1m = os.getloadavg()[0]
        except (AttributeError, OSError):
            load_1m = None

        cpu_global = round(float(ps.cpu_percent(interval=None)), 1)
        load_global = None if load_1m is None else round(load_1m, 2)
        return {
            "cpu_percent_global": cpu_global,
            "load_1m_global": load_global,
        }
    except Exception as exc:
        return {"error": f"{type(exc).__name__}: {exc}"}
def _sample_pool() -> Optional[dict]:
    """Snapshot the persistent CPU worker pool.

    Returns:
        None when the pool module cannot be imported or the pool is not
        started; ``{"error": ...}`` when `stats()` raises; otherwise a dict
        with pool-level counters and one entry per worker. A worker's
        ``cpu_percent`` / ``rss_mb`` stay 0.0 when it has no pid or the
        psutil probe fails.
    """
    try:
        from .cpu_worker_pool import is_started, stats
    except Exception:
        return None
    if not is_started():
        return None

    try:
        snapshot = stats()
    except Exception as exc:
        return {"error": f"stats failed: {type(exc).__name__}: {exc}"}

    ps = _get_psutil()
    mib = 1024 * 1024
    raw_workers = snapshot.get("workers", [])

    worker_rows = []
    for entry in raw_workers:
        pid = entry.get("pid")
        cpu_pct, rss_mb = 0.0, 0.0
        if pid:
            try:
                proc = ps.Process(pid)
                # Blocking measurement; see _PROC_CPU_SAMPLE_INTERVAL_S.
                busy = proc.cpu_percent(interval=_PROC_CPU_SAMPLE_INTERVAL_S)
                cpu_pct = round(float(busy), 1)
                rss_mb = round(proc.memory_info().rss / mib, 1)
            except Exception:
                # Best-effort: keep whatever was measured before the failure.
                pass
        worker_rows.append({
            "worker_id": entry.get("id"),
            "pid": pid,
            "cpu_percent": cpu_pct,
            "rss_mb": rss_mb,
            "is_busy": bool(entry.get("is_busy")),
            "alive": bool(entry.get("alive")),
            "total_jobs": int(entry.get("total_jobs", 0)),
        })

    # Semaphore info (capacity + current holders). Best-effort: fall back to
    # the number of workers when the semaphore cannot be inspected.
    concurrency_cap = len(raw_workers)
    try:
        from .zero_gpu import _get_subprocess_semaphore

        semaphore = _get_subprocess_semaphore()
        concurrency_cap = getattr(semaphore, "capacity", concurrency_cap)
    except Exception:
        pass

    return {
        "mode": "persistent",
        "concurrency_cap": concurrency_cap,
        "active_workers": snapshot.get("busy_count", 0),
        "free_workers": snapshot.get("free_count", 0),
        "n_workers": snapshot.get("n_workers", 0),
        "respawn_count": int(snapshot.get("respawn_count", 0)),
        "workers": worker_rows,
    }
def build_sample() -> dict:
    """Assemble one telemetry row as a nested dict.

    Not a pure function: `_sample_container()` updates module-level state
    (`_last_cpu_usec`, `_last_sample_mono`) to compute the CPU delta between
    consecutive samples. Call it from a single sampler thread only; do not
    call concurrently.

    Returns:
        A dict with ``timestamp`` (local, naive ISO-8601 to the second),
        ``schema_version``, ``space`` (``SPACE_ID`` env var, default
        ``"local"``), and the ``container`` / ``host`` / ``pool`` sub-samples
        (``pool`` may be None).
    """
    from config import TELEMETRY_SCHEMA_VERSION

    stamp = datetime.now().isoformat(timespec="seconds")
    schema_version = TELEMETRY_SCHEMA_VERSION
    space_id = os.environ.get("SPACE_ID", "local")
    container = _sample_container()
    host = _sample_host()
    pool = _sample_pool()

    return {
        "timestamp": stamp,
        "schema_version": schema_version,
        "space": space_id,
        "container": container,
        "host": host,
        "pool": pool,
    }
# ----------------------------------------------------------------------------
# Sampler thread
# ----------------------------------------------------------------------------
# Handle to the sampler thread; None until `start_sampler()` creates one.
_sampler_thread: Optional[threading.Thread] = None
# Set by `stop_sampler()`; the sampler loop checks it before each sample and
# waits on it between samples.
_sampler_stop = threading.Event()
def _sampler_loop(sample_every_s: int) -> None:
    """Daemon loop: sample → serialize → append to scheduler.

    Runs until `_sampler_stop` is set. Each tick builds a sample, JSON-encodes
    the nested ``container`` / ``host`` / ``pool`` sections, and appends the
    row to the telemetry scheduler if one is available. A failing tick is
    logged and does not end the loop.

    Args:
        sample_every_s: Target seconds between the starts of two consecutive
            samples. A sample that takes longer than this is followed
            immediately by the next one.
    """
    import json

    from .usage_logger import _ensure_schedulers, get_telemetry_scheduler  # lazy

    # Prime host-level cpu_percent so first real sample isn't 0.0
    _get_psutil()
    # Give the primed probe a second to accumulate a delta. Waiting on the
    # stop event (instead of time.sleep) lets stop_sampler() end this early.
    _sampler_stop.wait(timeout=1.0)

    while not _sampler_stop.is_set():
        tick_started = time.monotonic()
        try:
            _ensure_schedulers()
            row_obj = build_sample()
            pool = row_obj["pool"]
            # Flatten JSON sub-dicts to strings (match the parquet schema)
            row = {
                "timestamp": row_obj["timestamp"],
                "schema_version": row_obj["schema_version"],
                "space": row_obj["space"],
                "container": json.dumps(row_obj["container"]),
                "host": json.dumps(row_obj["host"]),
                "pool": json.dumps(pool) if pool is not None else None,
            }
            sch = get_telemetry_scheduler()
            if sch is not None:
                sch.append(row)
        except Exception as e:
            print(f"[TELEMETRY] sample failed: {type(e).__name__}: {e}")

        # Sleep the remainder of the interval (tolerate slow samples)
        elapsed = time.monotonic() - tick_started
        _sampler_stop.wait(timeout=max(0.0, sample_every_s - elapsed))
def start_sampler() -> None:
    """Launch the telemetry sampler as a daemon thread.

    A no-op when a sampler thread is already alive (including one that has
    been asked to stop but has not exited yet), when `config` cannot be
    imported, or when `TELEMETRY_ENABLED` is falsy. Otherwise clears the stop
    event and starts a thread running `_sampler_loop` with
    `TELEMETRY_SAMPLE_SECONDS` as its interval.
    """
    global _sampler_thread

    current = _sampler_thread
    if current is not None and current.is_alive():
        return

    try:
        from config import TELEMETRY_ENABLED, TELEMETRY_SAMPLE_SECONDS
    except Exception as e:
        print(f"[TELEMETRY] config import failed: {e}")
        return

    if not TELEMETRY_ENABLED:
        print("[TELEMETRY] disabled via TELEMETRY_ENABLED=0")
        return

    _sampler_stop.clear()
    worker = threading.Thread(
        target=_sampler_loop,
        args=(TELEMETRY_SAMPLE_SECONDS,),
        name="telemetry-sampler",
        daemon=True,
    )
    _sampler_thread = worker
    worker.start()
    print(f"[TELEMETRY] sampler started (sample_every={TELEMETRY_SAMPLE_SECONDS}s)")
def stop_sampler() -> None:
    """Signal the sampler to stop at its next tick. Best-effort.

    Only sets the stop event; it does not join or otherwise wait for the
    sampler thread to exit.
    """
    _sampler_stop.set()
|