"""In-process activity bus for the Activity dashboard.

Every pipeline stage emits structured events here so the dashboard can show
what the LLM and agent are doing in real time. A thread-safe ring buffer holds
the most recent events. A contextvar (set by ``run_scope``) tags every event
emitted during one agent run with the same run id, so the dashboard can group
them into per-run traces.
"""

from __future__ import annotations

import threading
from collections import deque
from contextlib import contextmanager
from contextvars import ContextVar
from datetime import datetime
from itertools import count, islice

MAXLEN = 800

# Stages of the pipeline, in display order (used by the stepper + chart).
STAGES = ["ingest", "vision", "model", "decision", "conflict", "calendar"]

# Ring buffer of event dicts, oldest first. The deque silently drops the
# oldest entry once MAXLEN events are stored. All access goes through _lock.
_BUF: deque[dict] = deque(maxlen=MAXLEN)
_lock = threading.Lock()
# Run id of the enclosing run_scope() block, or None outside any run.
_run_var: ContextVar[str | None] = ContextVar("agent_run", default=None)
# One counter shared by event ids and run ids, so both are unique together.
_seq = count(1)


def _now() -> str:
    """Return the current local wall-clock time as ISO-8601, to the second."""
    return datetime.now().isoformat(timespec="seconds")


def emit(stage: str, message: str, level: str = "info", **payload) -> dict:
    """Record one activity event in the ring buffer and return it.

    Args:
        stage: Pipeline stage name. ``stage_counts()`` only tallies names
            listed in ``STAGES``.
        message: Human-readable description shown on the dashboard.
        level: Severity label. Events with level ``"error"`` are counted by
            ``metrics()``.
        **payload: Extra fields merged into the event, e.g. ``latency_ms``,
            ``events``, ``conflicts``, ``images``, ``tokens``. Because they
            are merged last, payload keys override the built-in fields of the
            same name (``id``, ``ts``, ``stage``, ``level``, ``message``,
            ``run``).

    Returns:
        The event dict that was appended to the buffer.
    """
    ev = {
        "id": next(_seq),
        "ts": _now(),
        "stage": stage,
        "level": level,
        "message": message,
        "run": _run_var.get(),
        **payload,
    }
    with _lock:
        _BUF.append(ev)
    return ev


@contextmanager
def run_scope(label: str = ""):
    """Tag every event emitted inside the block with a shared run id.

    The run id is ``"<seq>:<label>"`` when *label* is given, else ``"<seq>"``.
    It is yielded to the caller.
    """
    run_id = f"{next(_seq)}:{label}" if label else str(next(_seq))
    token = _run_var.set(run_id)
    try:
        yield run_id
    finally:
        # Best-effort: when used inside a streaming generator that the server
        # drives across different contexts (e.g. Gradio's queue), reset(token)
        # raises "Token was created in a different Context". Clearing is
        # enough either way.
        try:
            _run_var.reset(token)
        except ValueError:
            _run_var.set(None)


def recent(n: int = 120) -> list[dict]:
    """Return up to *n* of the most recent events, newest first.

    A non-positive *n* yields an empty list.
    """
    # Guard explicitly: a slice like buf[-0:] means "everything", which is why
    # n == 0 must not reach slicing logic.
    if n <= 0:
        return []
    with _lock:
        # reversed() walks the deque from its newest end; islice stops after n
        # items. list() consumes it while the lock is still held.
        return list(islice(reversed(_BUF), n))


def current_stage() -> str | None:
    """Return the stage of the most recently emitted event, or None if empty."""
    with _lock:
        return _BUF[-1]["stage"] if _BUF else None


def metrics() -> dict:
    """Summarise the buffered events into dashboard counters.

    Returns:
        A dict with:

        - ``messages``: number of ``ingest`` events.
        - ``events_created``: sum of the ``events`` payload on ``decision``
          events.
        - ``conflicts``: sum of the ``conflicts`` payload on ``conflict``
          events.
        - ``images_read``: sum of the ``images`` payload across all events.
        - ``model_calls``: number of events carrying a ``latency_ms`` value.
        - ``avg_latency_ms``: rounded mean of those latencies (0 if none).
        - ``errors``: number of events whose level is ``"error"``.
    """
    with _lock:
        evs = list(_BUF)
    # Compare against None rather than relying on truthiness, so a call
    # reporting latency_ms=0 is still counted as a model call.
    lat = [e["latency_ms"] for e in evs if e.get("latency_ms") is not None]
    return {
        "messages": sum(1 for e in evs if e["stage"] == "ingest"),
        "events_created": sum(e.get("events", 0) for e in evs if e["stage"] == "decision"),
        "conflicts": sum(e.get("conflicts", 0) for e in evs if e["stage"] == "conflict"),
        "images_read": sum(e.get("images", 0) for e in evs),
        "model_calls": len(lat),
        "avg_latency_ms": round(sum(lat) / len(lat)) if lat else 0,
        "errors": sum(1 for e in evs if e["level"] == "error"),
    }


def stage_counts() -> list[dict]:
    """Count buffered events per stage, in STAGES order, ready for gr.BarPlot.

    Events whose stage is not listed in ``STAGES`` are ignored.
    """
    with _lock:
        evs = list(_BUF)
    counts = dict.fromkeys(STAGES, 0)
    for e in evs:
        if e["stage"] in counts:
            counts[e["stage"]] += 1
    return [{"stage": s, "count": counts[s]} for s in STAGES]


def recent_runs(n: int = 8) -> list[tuple[str, list[dict]]]:
    """Group buffered events by run id and return up to *n* runs, newest first.

    Runs are ordered by the position of their first still-buffered event.
    Events within a run keep buffer (oldest-first) order. Events emitted
    outside any ``run_scope`` are skipped. A non-positive *n* yields an empty
    list.
    """
    # Guard explicitly: order[-0:] would return every run.
    if n <= 0:
        return []
    with _lock:
        evs = list(_BUF)
    # dicts preserve insertion order, so keys come out in first-seen order.
    groups: dict[str, list[dict]] = {}
    for e in evs:
        rid = e.get("run")
        if not rid:
            continue
        groups.setdefault(rid, []).append(e)
    return list(groups.items())[-n:][::-1]