OffGridSchedula / server /events.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
3.68 kB
"""In-process activity bus: every pipeline stage emits structured events here so
the Activity dashboard can show what the LLM and agent are doing in real time.
A thread-safe ring buffer holds recent events. A contextvar (run_scope) tags all
events emitted during one agent run with the same run id, so the dashboard can
group them into per-run traces.
"""
from __future__ import annotations
import threading
from collections import deque
from contextlib import contextmanager
from contextvars import ContextVar
from datetime import datetime
from itertools import count
MAXLEN = 800
# Stages of the pipeline, in display order (used by the stepper + chart).
STAGES = ["ingest", "vision", "model", "decision", "conflict", "calendar"]
_BUF: deque[dict] = deque(maxlen=MAXLEN)
_lock = threading.Lock()
_run_var: ContextVar = ContextVar("agent_run", default=None)
_seq = count(1)
def _now() -> str:
return datetime.now().isoformat(timespec="seconds")
def emit(stage: str, message: str, level: str = "info", **payload) -> dict:
"""Record one activity event. ``payload`` may carry latency_ms, events,
conflicts, images, tokens, etc. Returns the event dict."""
ev = {
"id": next(_seq),
"ts": _now(),
"stage": stage,
"level": level,
"message": message,
"run": _run_var.get(),
**payload,
}
with _lock:
_BUF.append(ev)
return ev
@contextmanager
def run_scope(label: str = ""):
"""Tag every event emitted inside the block with a shared run id."""
run_id = f"{next(_seq)}:{label}" if label else str(next(_seq))
token = _run_var.set(run_id)
try:
yield run_id
finally:
# Best-effort: when used inside a streaming generator that the server drives
# across different contexts (e.g. Gradio's queue), reset(token) raises
# "Token was created in a different Context". Clearing is enough either way.
try:
_run_var.reset(token)
except ValueError:
_run_var.set(None)
def recent(n: int = 120) -> list[dict]:
with _lock:
return list(_BUF)[-n:][::-1] # newest first
def current_stage() -> str | None:
with _lock:
return _BUF[-1]["stage"] if _BUF else None
def metrics() -> dict:
with _lock:
evs = list(_BUF)
lat = [e["latency_ms"] for e in evs if e.get("latency_ms")]
return {
"messages": sum(1 for e in evs if e["stage"] == "ingest"),
"events_created": sum(e.get("events", 0) for e in evs if e["stage"] == "decision"),
"conflicts": sum(e.get("conflicts", 0) for e in evs if e["stage"] == "conflict"),
"images_read": sum(e.get("images", 0) for e in evs),
"model_calls": len(lat),
"avg_latency_ms": round(sum(lat) / len(lat)) if lat else 0,
"errors": sum(1 for e in evs if e["level"] == "error"),
}
def stage_counts() -> list[dict]:
"""Counts per stage, ready for gr.BarPlot."""
with _lock:
evs = list(_BUF)
counts = {s: 0 for s in STAGES}
for e in evs:
if e["stage"] in counts:
counts[e["stage"]] += 1
return [{"stage": s, "count": counts[s]} for s in STAGES]
def recent_runs(n: int = 8) -> list[tuple[str, list[dict]]]:
"""Group recent events by run id (newest run first)."""
with _lock:
evs = list(_BUF)
groups: dict[str, list[dict]] = {}
order: list[str] = []
for e in evs:
rid = e.get("run")
if not rid:
continue
if rid not in groups:
groups[rid] = []
order.append(rid)
groups[rid].append(e)
return [(rid, groups[rid]) for rid in order[-n:][::-1]]