riprap / app /emissions.py
seriffic's picture
deploy(l4): self-contained Riprap mirror
3dbff85
"""Per-query emissions tracker for inference calls.
Records every LLM and ML-inference call made during a single query and
summarizes:
- wallclock duration per call
- prompt + completion tokens (LLM)
- energy in watt-hours, **measured from the L4 GPU when available**
(the inference proxy reports per-call `X-GPU-Power-W` /
`X-GPU-Energy-J` headers from a 100 ms-cadence NVML sampler).
Falls back to a duration × data-sheet-power estimate when the
proxy is unreachable / NVML init failed / call went to a backend
that doesn't surface power readings.
Each call record carries a `measured: bool` flag indicating which path
was used, so the UI can disclose it. `summarize()` aggregates total Wh,
total tokens, by-kind and by-hardware splits — no cloud comparison.
Thread propagation
------------------
The tracker is held in a thread-local. The dispatch layer
(web/main.py) installs one per request; `app/fsm.py:iter_steps`
captures and re-installs it on the FSM runner thread (mirroring the
existing `_captured_token_cb` pattern). Worker threads spawned inside
specialists (prithvi_live, eo_chip_cache) inherit nothing — those calls
are silently dropped, which is acceptable: those specialists do <1 s of
inference each and are off the hot path for the energy story.
"""
from __future__ import annotations
import threading
from typing import Any
# Hardware table keyed by the `hardware` id passed to Tracker.record_*.
# Each value is a (label, fallback_sustained_power_w, source) tuple:
#   - label: human-readable name copied onto every call record;
#   - fallback_sustained_power_w: watts used for the duration × power
#     estimate when the proxy doesn't surface a real measurement (NVML
#     disabled, backend unreachable, local-fallback path);
#   - source: provenance note for the fallback figure.
# The fallback figure is a conservative public-record estimate; the
# `measured: bool` flag on each call record indicates whether the row
# used the fallback. Unknown hardware ids resolve to the "cpu_server"
# entry (see Tracker._record).
HARDWARE: dict[str, tuple[str, float, str]] = {
    "nvidia_l4": (
        "NVIDIA L4",
        60.0,
        "NVIDIA L4 Tensor Core GPU data sheet (72 W TGP, Ada Lovelace, "
        "24 GB); ~60 W sustained during transformer inference. The "
        "active backend for the Riprap inference Space "
        "(msradam/riprap-vllm). When the proxy is reachable and NVML "
        "is initialized, real per-call power is read off the device "
        "via nvmlDeviceGetPowerUsage and this fallback is unused.",
    ),
    "amd_mi300x": (
        "AMD MI300X",
        600.0,
        "AMD Instinct MI300X data sheet (750 W TDP); ~600 W sustained "
        "during vLLM generation. Selected only when an operator deploys "
        "against an MI300X droplet and sets RIPRAP_HARDWARE_LABEL=AMD "
        "MI300X explicitly. The hackathon submission used to run on "
        "this hardware; the droplet was decommissioned 2026-05-06.",
    ),
    "nvidia_t4": (
        "NVIDIA T4",
        50.0,
        "NVIDIA T4 data sheet (70 W max); ~50 W sustained during "
        "transformer inference.",
    ),
    "apple_m": (
        "Apple M-series",
        20.0,
        "ml.energy / community measurements: ~20 W package power "
        "during Granite 4.1 q4_K_M inference on Apple M3/M4 (the "
        "local-dev path, no remote backend configured).",
    ),
    # Also the default entry for hardware ids not listed in this table.
    "cpu_server": (
        "x86 CPU",
        30.0,
        "Typical sustained x86 server-core load (~30 W) for CPU-only "
        "inference fallbacks.",
    ),
}
def _wh(power_w: float, duration_s: float) -> float:
return power_w * max(duration_s, 0.0) / 3600.0
class Tracker:
    """Append-only call ledger for one query. Thread-safe.

    Every `record_llm` / `record_ml` call appends one dict to `calls`
    under a lock; `summarize()` snapshots the ledger under the same lock
    and returns aggregate totals plus a copy of every record.
    """

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []
        self._lock = threading.Lock()

    def _record(self, *, base: dict[str, Any], hardware: str,
                duration_s: float,
                joules_real: float | None,
                power_w_real: float | None) -> None:
        """Shared body of record_llm / record_ml.

        When `joules_real` is provided (NVML-derived from the proxy),
        we use it directly and stamp `measured=True`. Otherwise we
        fall back to the data-sheet sustained-power estimate.

        `base` holds the kind-specific fields and is merged first, so
        the energy fields computed here always win on key collisions.
        Unknown `hardware` ids fall back to the "cpu_server" entry.
        """
        hw_label, fallback_w, _src = HARDWARE.get(hardware,
                                                  HARDWARE["cpu_server"])
        # A negative wallclock delta (clock step) must not be stored raw:
        # the energy estimate already treats it as zero, and a negative
        # row would shrink total_duration_s in summarize().
        duration_s = max(duration_s, 0.0)

        if joules_real is not None and joules_real >= 0:
            joules = float(joules_real)
            wh = joules / 3600.0
            measured = True
            if duration_s > 0:
                avg_w = joules / duration_s
            elif power_w_real is not None:
                # No usable duration: trust the proxy-reported power.
                avg_w = power_w_real
            else:
                avg_w = fallback_w
        else:
            avg_w = fallback_w
            wh = _wh(avg_w, duration_s)
            joules = wh * 3600.0
            measured = False

        record = {
            **base,
            "hardware": hardware,
            "hardware_label": hw_label,
            "power_w": round(avg_w, 2),
            "duration_s": round(duration_s, 3),
            "measured": measured,
            "wh": round(wh, 5),
            "joules": round(joules, 3),
        }
        with self._lock:
            self.calls.append(record)

    def record_llm(self, *, model: str, backend: str, hardware: str,
                   prompt_tokens: int | None,
                   completion_tokens: int | None,
                   duration_s: float,
                   stream: bool = False,
                   joules_real: float | None = None,
                   power_w_real: float | None = None) -> None:
        """Append one LLM call to the ledger.

        `total_tokens` is the sum of whichever token counts are known
        (a missing side counts as 0) and is None only when both are
        None.
        """
        total = None
        if prompt_tokens is not None or completion_tokens is not None:
            total = (prompt_tokens or 0) + (completion_tokens or 0)
        self._record(
            base={
                "kind": "llm",
                "model": model,
                "backend": backend,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": total,
                "stream": stream,
            },
            hardware=hardware,
            duration_s=duration_s,
            joules_real=joules_real,
            power_w_real=power_w_real,
        )

    def record_ml(self, *, endpoint: str, backend: str, hardware: str,
                  duration_s: float,
                  joules_real: float | None = None,
                  power_w_real: float | None = None) -> None:
        """Append one non-LLM ML-inference call to the ledger."""
        self._record(
            base={
                "kind": "ml",
                "endpoint": endpoint,
                "backend": backend,
            },
            hardware=hardware,
            duration_s=duration_s,
            joules_real=joules_real,
            power_w_real=power_w_real,
        )

    @staticmethod
    def _rollup(calls: list[dict[str, Any]], key: str,
                label_key: str | None = None) -> dict[str, dict[str, Any]]:
        """Group `calls` by `call[key]`, totalling wh / n / duration_s.

        When `label_key` is given, each group also carries a leading
        "label" taken from the first call seen in that group.
        """
        groups: dict[str, dict[str, Any]] = {}
        for call in calls:
            slot = groups.get(call[key])
            if slot is None:
                slot = {} if label_key is None else {"label": call[label_key]}
                slot.update(wh=0.0, n=0, duration_s=0.0)
                groups[call[key]] = slot
            slot["wh"] += call["wh"]
            slot["n"] += 1
            slot["duration_s"] += call["duration_s"]
        for slot in groups.values():
            slot["wh"] = round(slot["wh"], 5)
            # mWh is derived from the already-rounded Wh figure.
            slot["mwh"] = round(slot["wh"] * 1000, 2)
            slot["duration_s"] = round(slot["duration_s"], 3)
        return groups

    def summarize(self) -> dict[str, Any]:
        """Return totals, token counts, and per-kind / per-hardware splits.

        Token totals that come out as 0 are reported as None. The
        returned "calls" list holds copies of the ledger records, so
        callers may post-process the summary without altering the
        ledger.
        """
        with self._lock:
            calls = [dict(c) for c in self.calls]

        total_wh = sum(c["wh"] for c in calls)
        total_dur = sum(c["duration_s"] for c in calls)
        n_measured = sum(1 for c in calls if c.get("measured"))

        llm_calls = [c for c in calls if c["kind"] == "llm"]
        prompt = sum((c.get("prompt_tokens") or 0) for c in llm_calls)
        completion = sum((c.get("completion_tokens") or 0)
                         for c in llm_calls)

        return {
            "n_calls": len(calls),
            "n_measured": n_measured,
            "total_wh": round(total_wh, 5),
            "total_mwh": round(total_wh * 1000, 2),
            "total_joules": round(total_wh * 3600, 1),
            "total_duration_s": round(total_dur, 3),
            "tokens": {
                "prompt": prompt or None,
                "completion": completion or None,
                "total": (prompt + completion) or None,
            },
            "by_kind": self._rollup(calls, "kind"),
            "by_hardware": self._rollup(calls, "hardware",
                                        label_key="hardware_label"),
            "calls": calls,
            "method": (
                "Energy is read off the L4 GPU per call via "
                "nvmlDeviceGetPowerUsage on the inference proxy "
                "(X-GPU-Energy-J response header). Calls flagged "
                "measured=false fall back to "
                "(data-sheet sustained_power_w × duration_s ÷ 3600) "
                "— see app/emissions.HARDWARE for sources. Tokens "
                "are reported by the backend (LiteLLM usage) when "
                "available, else estimated from response text length "
                "(~4 chars/token)."
            ),
        }
# Thread-local install. `_tl.tracker` holds the tracker installed for the
# current thread; threads that never called install() have no such
# attribute. Calls made on threads without an installed tracker hit a
# no-op stub — always safe to call active().record_*().
_tl = threading.local()
class _NullTracker:
def record_llm(self, **_kw: Any) -> None:
return None
def record_ml(self, **_kw: Any) -> None:
return None
_NULL = _NullTracker()
def install(tracker: Tracker | None) -> None:
    """Set the calling thread's tracker.

    Passing None stores None, after which current() returns None and
    active() returns the no-op stub on this thread.
    """
    _tl.tracker = tracker
def current() -> Tracker | None:
    """Return the tracker installed on the calling thread, or None.

    None is returned both when nothing was ever installed on this
    thread and when None was installed explicitly.
    """
    try:
        return _tl.tracker
    except AttributeError:
        # install() was never called on this thread.
        return None
def active() -> Tracker | _NullTracker:
    """Return the installed tracker for this thread, or a no-op stub.

    Always safe to call in instrumentation hot paths: the result always
    offers record_llm / record_ml, so callers need no None check.
    """
    installed = getattr(_tl, "tracker", None)
    return installed if installed else _NULL
def estimate_completion_tokens(text: str) -> int:
    """Estimate a token count as one token per four characters.

    Used when the backend doesn't report usage (e.g. streaming through
    Ollama, where LiteLLM's stream wrapper does not always surface a
    final usage block). The result is never below 1, even for empty
    text.
    """
    approx = len(text) // 4
    return approx if approx > 1 else 1