| """Per-query emissions tracker for inference calls. |
| |
| Records every LLM and ML-inference call made during a single query and |
| summarizes: |
| - wallclock duration per call |
| - prompt + completion tokens (LLM) |
| - energy in watt-hours, **measured from the L4 GPU when available** |
| (the inference proxy reports per-call `X-GPU-Power-W` / |
| `X-GPU-Energy-J` headers from a 100 ms-cadence NVML sampler). |
| Falls back to a duration × data-sheet-power estimate when the |
| proxy is unreachable / NVML init failed / call went to a backend |
| that doesn't surface power readings. |
| |
| Each call record carries a `measured: bool` flag indicating which path |
| was used, so the UI can disclose whether a figure was measured or |
| estimated. `summarize()` aggregates total Wh, total tokens, and |
| by-kind and by-hardware splits — no cloud comparison. |
| |
| Thread propagation |
| ------------------ |
| The tracker is held in a thread-local. The dispatch layer |
| (web/main.py) installs one per request; `app/fsm.py:iter_steps` |
| captures and re-installs it on the FSM runner thread (mirroring the |
| existing `_captured_token_cb` pattern). Worker threads spawned inside |
| specialists (prithvi_live, eo_chip_cache) inherit nothing — those calls |
| are silently dropped, which is acceptable: those specialists do <1 s of |
| inference each and are off the hot path for the energy story. |
| """ |
| from __future__ import annotations |
|
|
| import threading |
| from typing import Any |
|
|
| |
| |
| |
| |
| |
# Fallback power table, keyed by hardware id.
#
# Each value is a 3-tuple:
#   [0] human-readable label copied into every call record,
#   [1] sustained power draw in watts, used for the duration-based
#       estimate when a call carries no measured energy reading,
#   [2] free-text provenance note for the wattage figure.
#
# "cpu_server" doubles as the default entry for unrecognised hardware ids.
HARDWARE: dict[str, tuple[str, float, str]] = {
    "nvidia_l4": (
        "NVIDIA L4",  # label
        60.0,         # sustained watts
        (
            "NVIDIA L4 Tensor Core GPU data sheet (72 W TGP, Ada Lovelace, "
            "24 GB); ~60 W sustained during transformer inference. The "
            "active backend for the Riprap inference Space "
            "(msradam/riprap-vllm). When the proxy is reachable and NVML "
            "is initialized, real per-call power is read off the device "
            "via nvmlDeviceGetPowerUsage and this fallback is unused."
        ),
    ),
    "amd_mi300x": (
        "AMD MI300X",  # label
        600.0,         # sustained watts
        (
            "AMD Instinct MI300X data sheet (750 W TDP); ~600 W sustained "
            "during vLLM generation. Selected only when an operator deploys "
            "against an MI300X droplet and sets RIPRAP_HARDWARE_LABEL=AMD "
            "MI300X explicitly. The hackathon submission used to run on "
            "this hardware; the droplet was decommissioned 2026-05-06."
        ),
    ),
    "nvidia_t4": (
        "NVIDIA T4",  # label
        50.0,         # sustained watts
        (
            "NVIDIA T4 data sheet (70 W max); ~50 W sustained during "
            "transformer inference."
        ),
    ),
    "apple_m": (
        "Apple M-series",  # label
        20.0,              # sustained watts
        (
            "ml.energy / community measurements: ~20 W package power "
            "during Granite 4.1 q4_K_M inference on Apple M3/M4 (the "
            "local-dev path, no remote backend configured)."
        ),
    ),
    "cpu_server": (
        "x86 CPU",  # label
        30.0,       # sustained watts
        (
            "Typical sustained x86 server-core load (~30 W) for CPU-only "
            "inference fallbacks."
        ),
    ),
}
|
|
|
|
| def _wh(power_w: float, duration_s: float) -> float: |
| return power_w * max(duration_s, 0.0) / 3600.0 |
|
|
|
|
class Tracker:
    """Append-only call ledger for one query. Thread-safe.

    Each `record_*` call appends one plain-dict record to `calls` while
    holding an internal lock. `summarize()` copies the list under the
    same lock and does all aggregation on the copy.
    """

    def __init__(self) -> None:
        """Create an empty ledger and the lock that guards it."""
        self.calls: list[dict[str, Any]] = []
        self._lock = threading.Lock()

    def _record(self, *, base: dict[str, Any], hardware: str,
                duration_s: float,
                joules_real: float | None,
                power_w_real: float | None) -> None:
        """Shared body of record_llm / record_ml.

        When `joules_real` is provided (NVML-derived from the proxy),
        we use it directly and stamp `measured=True`. Otherwise we
        fall back to the data-sheet sustained-power estimate.

        Args:
            base: Kind-specific fields copied verbatim into the record.
            hardware: Key into HARDWARE; unknown keys borrow the
                "cpu_server" label and wattage but keep their own key.
            duration_s: Wallclock seconds; negative values count as 0.
            joules_real: Measured energy in joules, or None. A negative
                value is ignored and the estimate is used instead.
            power_w_real: Measured power in watts, or None. Only
                consulted on the measured path when the duration is
                not positive.
        """
        hw_label, fallback_w, _src = HARDWARE.get(
            hardware, HARDWARE["cpu_server"])
        # The energy estimate already treats a negative duration as
        # zero; clamp here as well so the stored duration (and every
        # total derived from it) agrees with the stored energy.
        duration_s = max(duration_s, 0.0)
        if joules_real is not None and joules_real >= 0:
            joules = float(joules_real)
            wh = joules / 3600.0
            measured = True
            if duration_s > 0:
                avg_w = joules / duration_s
            elif power_w_real is not None:
                avg_w = power_w_real
            else:
                avg_w = fallback_w
        else:
            avg_w = fallback_w
            wh = _wh(avg_w, duration_s)
            joules = wh * 3600.0
            measured = False
        record = {
            **base,
            "hardware": hardware,
            "hardware_label": hw_label,
            "power_w": round(avg_w, 2),
            "duration_s": round(duration_s, 3),
            "measured": measured,
            "wh": round(wh, 5),
            "joules": round(joules, 3),
        }
        with self._lock:
            self.calls.append(record)

    def record_llm(self, *, model: str, backend: str, hardware: str,
                   prompt_tokens: int | None,
                   completion_tokens: int | None,
                   duration_s: float,
                   stream: bool = False,
                   joules_real: float | None = None,
                   power_w_real: float | None = None) -> None:
        """Append one LLM call (kind="llm") to the ledger.

        `total_tokens` is the sum of the two counts, with a missing
        count treated as 0; it stays None when both counts are None.
        """
        total = None
        if prompt_tokens is not None or completion_tokens is not None:
            total = (prompt_tokens or 0) + (completion_tokens or 0)
        self._record(
            base={
                "kind": "llm",
                "model": model,
                "backend": backend,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": total,
                "stream": stream,
            },
            hardware=hardware,
            duration_s=duration_s,
            joules_real=joules_real,
            power_w_real=power_w_real,
        )

    def record_ml(self, *, endpoint: str, backend: str, hardware: str,
                  duration_s: float,
                  joules_real: float | None = None,
                  power_w_real: float | None = None) -> None:
        """Append one ML-inference call (kind="ml") to the ledger."""
        self._record(
            base={
                "kind": "ml",
                "endpoint": endpoint,
                "backend": backend,
            },
            hardware=hardware,
            duration_s=duration_s,
            joules_real=joules_real,
            power_w_real=power_w_real,
        )

    @staticmethod
    def _rollup(calls: list[dict[str, Any]], key: str, *,
                with_label: bool = False) -> dict[str, dict[str, Any]]:
        """Group `calls` by `call[key]` and total wh / n / duration_s.

        With `with_label`, each group also carries the `hardware_label`
        of the first call that opened it. Rounding happens once, after
        accumulation; `mwh` is derived from the rounded `wh`.
        """
        groups: dict[str, dict[str, Any]] = {}
        for call in calls:
            slot = groups.get(call[key])
            if slot is None:
                # Build in this order so "label" stays the first key.
                slot = {"label": call["hardware_label"]} if with_label else {}
                slot.update({"wh": 0.0, "n": 0, "duration_s": 0.0})
                groups[call[key]] = slot
            slot["wh"] += call["wh"]
            slot["n"] += 1
            slot["duration_s"] += call["duration_s"]
        for slot in groups.values():
            slot["wh"] = round(slot["wh"], 5)
            slot["mwh"] = round(slot["wh"] * 1000, 2)
            slot["duration_s"] = round(slot["duration_s"], 3)
        return groups

    def summarize(self) -> dict[str, Any]:
        """Aggregate the ledger into one report dict.

        Returns:
            A dict with call counts (`n_calls`, `n_measured`), energy
            totals (`total_wh`, `total_mwh`, `total_joules`),
            `total_duration_s`, LLM token totals under `tokens` (a
            zero total is reported as None), the `by_kind` and
            `by_hardware` breakdowns, the raw `calls` list, and a
            `method` string describing how energy was obtained.
        """
        with self._lock:
            calls = list(self.calls)
        # Float start values keep an empty ledger reporting 0.0, the
        # same type a non-empty ledger produces.
        total_wh = sum((c["wh"] for c in calls), 0.0)
        total_dur = sum((c["duration_s"] for c in calls), 0.0)
        n_measured = sum(1 for c in calls if c.get("measured"))
        llm_calls = [c for c in calls if c["kind"] == "llm"]
        prompt = sum((c.get("prompt_tokens") or 0) for c in llm_calls)
        completion = sum((c.get("completion_tokens") or 0)
                         for c in llm_calls)

        by_kind = self._rollup(calls, "kind")
        by_hw = self._rollup(calls, "hardware", with_label=True)

        return {
            "n_calls": len(calls),
            "n_measured": n_measured,
            "total_wh": round(total_wh, 5),
            "total_mwh": round(total_wh * 1000, 2),
            "total_joules": round(total_wh * 3600, 1),
            "total_duration_s": round(total_dur, 3),
            "tokens": {
                "prompt": prompt or None,
                "completion": completion or None,
                "total": (prompt + completion) or None,
            },
            "by_kind": by_kind,
            "by_hardware": by_hw,
            "calls": calls,
            "method": (
                "Energy is read off the L4 GPU per call via "
                "nvmlDeviceGetPowerUsage on the inference proxy "
                "(X-GPU-Energy-J response header). Calls flagged "
                "measured=false fall back to "
                "(data-sheet sustained_power_w × duration_s ÷ 3600) "
                "— see app/emissions.HARDWARE for sources. Tokens "
                "are reported by the backend (LiteLLM usage) when "
                "available, else estimated from response text length "
                "(~4 chars/token)."
            ),
        }
|
|
|
|
| |
| |
# Per-thread storage for the installed tracker. install() writes its
# `tracker` attribute; current() and active() read it back. A thread
# that never called install() has no such attribute.
_tl = threading.local()
|
|
|
|
| class _NullTracker: |
| def record_llm(self, **_kw: Any) -> None: |
| return None |
|
|
| def record_ml(self, **_kw: Any) -> None: |
| return None |
|
|
|
|
| _NULL = _NullTracker() |
|
|
|
|
def install(tracker: Tracker | None) -> None:
    """Bind `tracker` to the calling thread; pass None to clear it."""
    setattr(_tl, "tracker", tracker)
|
|
|
|
def current() -> Tracker | None:
    """Return the tracker bound to the calling thread, or None.

    None is returned both when nothing was ever installed on this
    thread and when None itself was installed.
    """
    try:
        return _tl.tracker
    except AttributeError:
        return None
|
|
|
|
def active() -> Tracker | _NullTracker:
    """Return the installed tracker for this thread, or a no-op stub.
    Always safe to call in instrumentation hot paths."""
    installed = getattr(_tl, "tracker", None)
    # Truthiness test: a missing or None tracker selects the stub.
    return installed if installed else _NULL
|
|
|
|
def estimate_completion_tokens(text: str) -> int:
    """Estimate a token count as one token per four characters.

    Used when the backend doesn't report usage (e.g. streaming through
    Ollama, where LiteLLM's stream wrapper does not always surface a
    final usage block). The result is never below 1, even for empty
    text.
    """
    approx = len(text) // 4
    return approx if approx > 1 else 1
|
|