File size: 11,361 Bytes
4a0777e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22d5ea2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a0777e
22d5ea2
 
 
 
 
 
4a0777e
 
 
 
 
 
22d5ea2
4a0777e
22d5ea2
 
4a0777e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
419fe6e
4a0777e
 
 
 
 
22d5ea2
 
 
 
 
 
4a0777e
 
 
 
b163a55
22d5ea2
4a0777e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b163a55
22d5ea2
4a0777e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
"""Periodic host + CPU-worker-pool telemetry sampler.

V3.1 scope: persistent subprocess pool only. Spawn mode + remote worker
dispatch are deferred — this sampler reads from `cpu_worker_pool.stats()`
directly and records `pool` as null if the pool isn't started.

A background daemon thread wakes every `TELEMETRY_SAMPLE_SECONDS`, reads:
  - container: memory usage/limit/peak and CPU quota/usage (via cgroup v2)
  - host: global CPU% (via psutil) and load_1m (via os.getloadavg)
  - pool: active/free counts, per-worker CPU%/RSS/is_busy/total_jobs
and appends a row to the telemetry Parquet scheduler. The scheduler pushes
every `TELEMETRY_FLUSH_MINUTES`.

Rows are independent of per-request log rows — analysts join by timestamp
window (for each request row in `-logs` dataset, telemetry rows with
`timestamp ∈ [request.timestamp, request.timestamp + request.wall_total_s]`
describe what the host was doing during that run).
"""

from __future__ import annotations

import os
import threading
import time
from datetime import datetime
from typing import Any, Optional


# ----------------------------------------------------------------------------
# psutil lazy init — the first `cpu_percent()` call always returns 0.0 (it
# measures the delta since the previous call). `_get_psutil()` primes the
# system-wide counter on first use so the first telemetry sample reports a
# meaningful number.
# ----------------------------------------------------------------------------

_psutil = None  # cached psutil module; populated on first `_get_psutil()` call

# Per-pid blocking sample interval for `Process.cpu_percent`. The `interval=None`
# path would require priming each pid with a prior call AND enough wall time
# before the next call to accumulate a meaningful delta — neither holds inside
# a single sampler tick. `interval=0.1` blocks 100ms per worker to measure %
# directly. At cap=2 that's ~200ms per sample cycle — well within the 15-30s
# sample interval budget.
_PROC_CPU_SAMPLE_INTERVAL_S = 0.1


def _get_psutil():
    """Return the psutil module, importing and priming it on first use.

    The module is cached in the module-level `_psutil` so the import and the
    priming call happen only once per process.
    """
    global _psutil
    if _psutil is not None:
        return _psutil

    import psutil as _module  # type: ignore
    _psutil = _module
    # The first system-wide cpu_percent() call returns 0.0; calling it here
    # means later calls report the real % since the previous call.
    _psutil.cpu_percent(interval=None)
    return _psutil


# ----------------------------------------------------------------------------
# Sample builders
# ----------------------------------------------------------------------------


def _read_cgroup_int(path: str) -> int | None:
    """Read an integer cgroup file (bytes or usec). Returns None if missing."""
    try:
        with open(path) as f:
            v = f.read().strip()
        return int(v) if v.isdigit() else None
    except Exception:
        return None


# Module state — previous `cpu.stat` usage_usec reading and the monotonic time
# it was taken at, so `_sample_container()` can compute a delta % per sample.
# Both stay None until the first sample has run.
_last_cpu_usec: int | None = None
_last_sample_mono: float | None = None


def _sample_container() -> dict:
    """Collect this container's CPU + memory figures from cgroup v2 files.

    Files read (all under `/sys/fs/cgroup`):
      - `memory.current` / `memory.max` / `memory.peak`: usage, limit and
        high-water mark in bytes, reported in MB.
      - `cpu.max`: `"<quota_usec> <period_usec>"`; `quota / period` is the
        number of cores the container is entitled to.
      - `cpu.stat`: the `usage_usec` line is a running CPU-microseconds
        counter. Its delta since the previous call, divided by
        `elapsed_seconds * quota_cores`, gives the % of quota used.

    Any file that is missing or unparsable simply leaves its keys out of the
    result. The CPU delta keys appear only from the second call onwards,
    because the previous reading is kept in `_last_cpu_usec` /
    `_last_sample_mono`, which this function overwrites on every call.

    Returns:
        A dict with whichever of `mem_current_mb`, `mem_max_mb`,
        `mem_used_pct`, `mem_peak_mb`, `cpu_quota_cores`,
        `cpu_used_usec_delta`, `cpu_used_pct_of_quota` could be computed.
    """
    global _last_cpu_usec, _last_sample_mono
    import time as _clock

    bytes_per_mb = 1024 * 1024
    sample: dict = {}

    # --- Memory -------------------------------------------------------------
    current_bytes = _read_cgroup_int("/sys/fs/cgroup/memory.current")
    limit_bytes = _read_cgroup_int("/sys/fs/cgroup/memory.max")
    peak_bytes = _read_cgroup_int("/sys/fs/cgroup/memory.peak")
    if current_bytes is not None:
        sample["mem_current_mb"] = round(current_bytes / bytes_per_mb, 1)
        # A non-numeric limit (e.g. "max") reads as None, so no pct is reported.
        if limit_bytes:
            sample["mem_max_mb"] = round(limit_bytes / bytes_per_mb, 1)
            sample["mem_used_pct"] = round(100.0 * current_bytes / limit_bytes, 1)
        if peak_bytes is not None:
            sample["mem_peak_mb"] = round(peak_bytes / bytes_per_mb, 1)

    # --- CPU quota (cores) --------------------------------------------------
    try:
        with open("/sys/fs/cgroup/cpu.max") as fh:
            fields = fh.read().split()
        parsed_quota = int(fields[0]) if fields[0].isdigit() else None
        if len(fields) > 1 and fields[1].isdigit():
            parsed_period = int(fields[1])
        else:
            parsed_period = 100_000
        if parsed_quota:
            sample["cpu_quota_cores"] = round(parsed_quota / parsed_period, 2)
    except Exception:
        # Unreadable or malformed file: treat as "no quota known".
        quota_us, period_us = None, 100_000
    else:
        quota_us, period_us = parsed_quota, parsed_period

    # --- CPU usage counter --------------------------------------------------
    cur_usec = None
    try:
        with open("/sys/fs/cgroup/cpu.stat") as fh:
            usage_line = next(
                (ln for ln in fh if ln.startswith("usage_usec")), None
            )
        if usage_line is not None:
            cur_usec = int(usage_line.split()[1])
    except Exception:
        pass

    now_mono = _clock.monotonic()
    have_previous = _last_cpu_usec is not None and _last_sample_mono is not None
    if cur_usec is not None and have_previous:
        elapsed_s = max(0.001, now_mono - _last_sample_mono)
        # Clamp at 0 so a counter that went backwards never yields a negative delta.
        used_usec = max(0, cur_usec - _last_cpu_usec)
        sample["cpu_used_usec_delta"] = used_usec
        if quota_us:
            cores = quota_us / period_us
            # % of OUR quota used over the sample interval
            sample["cpu_used_pct_of_quota"] = round(
                100.0 * (used_usec / 1e6) / (elapsed_s * cores), 1
            )

    # Remember this reading (even a failed one) for the next call's delta.
    _last_cpu_usec, _last_sample_mono = cur_usec, now_mono
    return sample


def _sample_host() -> dict:
    """Collect machine-wide CPU % and 1-minute load average.

    These figures describe the physical host, which is shared across all
    tenants on the box, NOT this Space — on ZeroGPU the physical host has
    ~192 vCPU / 2 TB RAM shared among many tenants. They are useful only as
    noisy-neighbor context; per-container truth lives in `container`.

    Returns:
        `{"cpu_percent_global": float, "load_1m_global": float | None}`, with
        the load set to None when `os.getloadavg()` is unavailable. On any
        other failure returns `{"error": "<ExceptionType>: <message>"}`.
    """
    try:
        ps = _get_psutil()
        try:
            load_1m = os.getloadavg()[0]
        except (AttributeError, OSError):
            # Platform has no load average, or it could not be obtained.
            load_1m = None
        cpu_global = round(float(ps.cpu_percent(interval=None)), 1)
        load_global = None if load_1m is None else round(load_1m, 2)
        return {
            "cpu_percent_global": cpu_global,
            "load_1m_global":     load_global,
        }
    except Exception as e:
        return {"error": f"{type(e).__name__}: {e}"}


def _sample_pool() -> Optional[dict]:
    """Persistent-pool snapshot. Returns None if pool isn't started."""
    try:
        from .cpu_worker_pool import is_started, stats
    except Exception:
        return None
    if not is_started():
        return None

    try:
        s = stats()
    except Exception as e:
        return {"error": f"stats failed: {type(e).__name__}: {e}"}

    ps = _get_psutil()
    workers = []
    for w in s.get("workers", []):
        pid = w.get("pid")
        cpu_pct = 0.0
        rss_mb = 0.0
        if pid:
            try:
                proc = ps.Process(pid)
                cpu_pct = round(float(proc.cpu_percent(interval=_PROC_CPU_SAMPLE_INTERVAL_S)), 1)
                rss_mb = round(proc.memory_info().rss / (1024 * 1024), 1)
            except Exception:
                pass
        workers.append({
            "worker_id":   w.get("id"),
            "pid":         pid,
            "cpu_percent": cpu_pct,
            "rss_mb":      rss_mb,
            "is_busy":     bool(w.get("is_busy")),
            "alive":       bool(w.get("alive")),
            "total_jobs":  int(w.get("total_jobs", 0)),
        })

    # Semaphore info (capacity + current holders). Best-effort.
    cap = len(s.get("workers", []))
    try:
        from .zero_gpu import _get_subprocess_semaphore
        sem = _get_subprocess_semaphore()
        cap = getattr(sem, "capacity", cap)
    except Exception:
        pass

    return {
        "mode":            "persistent",
        "concurrency_cap": cap,
        "active_workers":  s.get("busy_count", 0),
        "free_workers":    s.get("free_count", 0),
        "n_workers":       s.get("n_workers", 0),
        "respawn_count":   int(s.get("respawn_count", 0)),
        "workers":         workers,
    }


def build_sample() -> dict:
    """Assemble one telemetry row as a nested dict.

    The row carries a local-time ISO timestamp (second resolution), the
    schema version from `config`, the Space id (`SPACE_ID` env var, default
    `"local"`), and the `container`, `host` and `pool` sub-samples. `pool` is
    None when the pool isn't started.

    Note: this is not a pure function. `_sample_container()` updates module
    state (`_last_cpu_usec`) to compute the CPU delta across samples, so call
    this from a single sampler thread only; do not call concurrently.
    """
    from config import TELEMETRY_SCHEMA_VERSION

    stamp = datetime.now().isoformat(timespec="seconds")
    space_id = os.environ.get("SPACE_ID", "local")
    container = _sample_container()
    host = _sample_host()
    pool = _sample_pool()
    return {
        "timestamp":      stamp,
        "schema_version": TELEMETRY_SCHEMA_VERSION,
        "space":          space_id,
        "container":      container,
        "host":           host,
        "pool":           pool,
    }


# ----------------------------------------------------------------------------
# Sampler thread
# ----------------------------------------------------------------------------

# Handle to the running sampler thread; None until `start_sampler()` creates it.
_sampler_thread: Optional[threading.Thread] = None
# Set by `stop_sampler()`; the sampler loop checks it each tick and also waits
# on it between samples, so setting it ends the wait early.
_sampler_stop = threading.Event()


def _sampler_loop(sample_every_s: int) -> None:
    """Daemon loop: sample → serialize → append to scheduler.

    Runs until `_sampler_stop` is set. Each tick builds one sample, JSON-encodes
    the nested `container` / `host` / `pool` sections into strings (to match
    the parquet schema) and appends the row to the telemetry scheduler, if one
    is available. A failure inside a tick is printed and the loop carries on.

    Args:
        sample_every_s: Target number of seconds between the starts of two
            consecutive samples. Time spent sampling is subtracted from the
            wait, so a slow sample shortens (down to zero) the following wait.
    """
    import json
    from .usage_logger import _ensure_schedulers, get_telemetry_scheduler  # lazy

    # Prime host-level cpu_percent so first real sample isn't 0.0, then give
    # the counter a second to accumulate. Waiting on the stop event (rather
    # than sleeping) lets stop_sampler() take effect during this delay too.
    _get_psutil()
    _sampler_stop.wait(timeout=1.0)

    while not _sampler_stop.is_set():
        t0 = time.monotonic()
        try:
            _ensure_schedulers()
            row_obj = build_sample()
            pool = row_obj["pool"]
            # Flatten JSON sub-dicts to strings (match the parquet schema)
            row = {
                "timestamp":      row_obj["timestamp"],
                "schema_version": row_obj["schema_version"],
                "space":          row_obj["space"],
                "container":      json.dumps(row_obj["container"]),
                "host":           json.dumps(row_obj["host"]),
                "pool":           json.dumps(pool) if pool is not None else None,
            }
            sch = get_telemetry_scheduler()
            if sch is not None:
                sch.append(row)
        except Exception as e:
            print(f"[TELEMETRY] sample failed: {type(e).__name__}: {e}")

        # Sleep the remainder of the interval (tolerate slow samples)
        elapsed = time.monotonic() - t0
        remaining = max(0.0, sample_every_s - elapsed)
        _sampler_stop.wait(timeout=remaining)


def start_sampler() -> None:
    """Launch the telemetry sampler as a daemon thread.

    Does nothing when a sampler thread is already alive, when the `config`
    import fails, or when `TELEMETRY_ENABLED` is falsy (the last two cases
    print a `[TELEMETRY]` message). Otherwise clears the stop flag, starts a
    thread named `telemetry-sampler` running `_sampler_loop`, and records it
    in `_sampler_thread`.
    """
    global _sampler_thread

    already_running = _sampler_thread is not None and _sampler_thread.is_alive()
    if already_running:
        return

    try:
        from config import TELEMETRY_ENABLED, TELEMETRY_SAMPLE_SECONDS
    except Exception as e:
        print(f"[TELEMETRY] config import failed: {e}")
        return

    if not TELEMETRY_ENABLED:
        print("[TELEMETRY] disabled via TELEMETRY_ENABLED=0")
        return

    # Reset a stop request left over from an earlier stop_sampler() call.
    _sampler_stop.clear()
    worker = threading.Thread(
        target=_sampler_loop,
        args=(TELEMETRY_SAMPLE_SECONDS,),
        name="telemetry-sampler",
        daemon=True,
    )
    _sampler_thread = worker
    worker.start()
    print(f"[TELEMETRY] sampler started (sample_every={TELEMETRY_SAMPLE_SECONDS}s)")


def stop_sampler() -> None:
    """Ask the sampler thread to exit; returns immediately without joining.

    Sets the `_sampler_stop` event. The sampler loop observes it at its next
    tick, so shutdown is best-effort rather than synchronous.
    """
    _sampler_stop.set()