| """In-process activity bus: every pipeline stage emits structured events here so |
| the Activity dashboard can show what the LLM and agent are doing in real time. |
| |
A thread-safe ring buffer holds recent events. A context variable (set by
run_scope) tags all events emitted during one agent run with the same run id,
so the dashboard can group them into per-run traces.
| """ |
| from __future__ import annotations |
|
|
| import threading |
| from collections import deque |
| from contextlib import contextmanager |
| from contextvars import ContextVar |
| from datetime import datetime |
| from itertools import count |
|
|
# Ring-buffer capacity: once full, appending a new event drops the oldest one.
MAXLEN = 800

# Pipeline stages in display order; stage_counts() reports exactly these.
STAGES = ["ingest", "vision", "model", "decision", "conflict", "calendar"]

# Shared module state. Every access to _BUF in this module holds _lock.
_lock = threading.Lock()
_BUF: deque[dict] = deque(maxlen=MAXLEN)
# One counter feeds both event ids (emit) and run ids (run_scope).
_seq = count(1)
# Run id of the enclosing run_scope block, or None outside of any run.
_run_var: ContextVar[str | None] = ContextVar("agent_run", default=None)
|
|
|
|
def _now() -> str:
    """Return the current local time as an ISO-8601 string.

    The value is a naive (timezone-less) timestamp truncated to whole
    seconds, e.g. ``2024-05-01T13:37:00``.
    """
    stamp = datetime.now().replace(microsecond=0)
    # With the microseconds zeroed, isoformat() emits no fractional part.
    return stamp.isoformat()
|
|
|
|
def emit(stage: str, message: str, level: str = "info", **payload) -> dict:
    """Record one activity event in the ring buffer and return it.

    Args:
        stage: Name of the pipeline stage that produced the event.
            ``stage_counts()`` tallies the names listed in ``STAGES``.
        message: Human-readable description of what happened.
        level: Severity label; ``metrics()`` counts ``"error"`` events.
        **payload: Extra fields merged into the event, e.g. ``latency_ms``,
            ``events``, ``conflicts``, ``images`` or ``tokens``. Payload keys
            are applied last, so ``id``, ``ts`` or ``run`` passed here
            replace the generated values.

    Returns:
        The stored event dict (the same object that sits in the buffer).
    """
    # The run id belongs to the caller's context; read it before locking.
    run_id = _run_var.get()
    # Allocate the id, take the timestamp and append under one lock so that
    # buffer order always agrees with id order when several threads emit.
    with _lock:
        ev = {
            "id": next(_seq),
            "ts": _now(),
            "stage": stage,
            "level": level,
            "message": message,
            "run": run_id,
            **payload,
        }
        _BUF.append(ev)
    return ev
|
|
|
|
@contextmanager
def run_scope(label: str = ""):
    """Bind a fresh run id to the current context for the ``with`` body.

    Events emitted inside the block pick the id up from the context
    variable. The id is also yielded: it is the next sequence number,
    followed by ``:label`` when a non-empty label is given.
    """
    run_id = str(next(_seq))
    if label:
        run_id = f"{run_id}:{label}"
    token = _run_var.set(run_id)
    try:
        yield run_id
    finally:
        # reset() raises ValueError when the token was created in a different
        # Context than the one we are exiting in; clear the variable instead
        # of letting that error escape from the with-block.
        try:
            _run_var.reset(token)
        except ValueError:
            _run_var.set(None)
|
|
|
|
def recent(n: int = 120) -> list[dict]:
    """Return up to ``n`` of the latest buffered events, newest first.

    Args:
        n: Maximum number of events to return. Zero or a negative value
            yields an empty list.

    Returns:
        A new list holding references to the buffered event dicts.
    """
    # Guard first: ``seq[-0:]`` is the whole sequence and a negative ``n``
    # would slice from the front, so a bare ``[-n:]`` over-returns for n <= 0.
    if n <= 0:
        return []
    with _lock:
        snapshot = list(_BUF)
    return snapshot[-n:][::-1]
|
|
|
|
def current_stage() -> str | None:
    """Return the stage of the most recently buffered event.

    The result is ``None`` while the buffer is empty.
    """
    with _lock:
        if not _BUF:
            return None
        newest = _BUF[-1]
        return newest["stage"]
|
|
|
|
def metrics() -> dict:
    """Aggregate headline counters over the events currently buffered.

    Returns:
        A dict with these keys:

        * ``messages`` - number of ``ingest`` events.
        * ``events_created`` - sum of ``events`` over ``decision`` events.
        * ``conflicts`` - sum of ``conflicts`` over ``conflict`` events.
        * ``images_read`` - sum of ``images`` over all events.
        * ``model_calls`` - number of events that carry a ``latency_ms``.
        * ``avg_latency_ms`` - rounded mean of those latencies, 0 if none.
        * ``errors`` - number of events whose level is ``"error"``.
    """
    with _lock:
        evs = list(_BUF)
    # Test against None rather than truthiness so a call measured at 0 ms
    # still counts as a model call and takes part in the average.
    lat = [e["latency_ms"] for e in evs if e.get("latency_ms") is not None]
    # ``or 0`` lets an event that carries an explicit None for a counter
    # contribute nothing instead of breaking the sum with a TypeError.
    return {
        "messages": sum(1 for e in evs if e["stage"] == "ingest"),
        "events_created": sum(
            e.get("events") or 0 for e in evs if e["stage"] == "decision"
        ),
        "conflicts": sum(
            e.get("conflicts") or 0 for e in evs if e["stage"] == "conflict"
        ),
        "images_read": sum(e.get("images") or 0 for e in evs),
        "model_calls": len(lat),
        "avg_latency_ms": round(sum(lat) / len(lat)) if lat else 0,
        "errors": sum(1 for e in evs if e["level"] == "error"),
    }
|
|
|
|
def stage_counts() -> list[dict]:
    """Tally buffered events per known stage, in ``STAGES`` order.

    The result is a list of ``{"stage": ..., "count": ...}`` rows, ready
    for gr.BarPlot. Events whose stage is not listed in ``STAGES`` are
    ignored.
    """
    with _lock:
        snapshot = list(_BUF)
    tally = dict.fromkeys(STAGES, 0)
    for stage in (ev["stage"] for ev in snapshot):
        if stage in tally:
            tally[stage] += 1
    return [{"stage": name, "count": tally[name]} for name in STAGES]
|
|
|
|
def recent_runs(n: int = 8) -> list[tuple[str, list[dict]]]:
    """Group buffered events by run id and return the newest runs first.

    Runs are ordered by the buffer position of their first event; events
    without a run id are skipped.

    Args:
        n: Maximum number of runs to return. Zero or a negative value
            yields an empty list.

    Returns:
        ``(run_id, events)`` pairs, newest run first, with each run's
        events in buffer order (oldest first).
    """
    # ``seq[-0:]`` is the whole sequence, so without this guard n == 0 would
    # return every run and a negative n would slice from the front.
    if n <= 0:
        return []
    with _lock:
        evs = list(_BUF)
    # Dicts keep insertion order, so the keys already record the order in
    # which runs first appear in the buffer.
    groups: dict[str, list[dict]] = {}
    for e in evs:
        rid = e.get("run")
        if rid:
            groups.setdefault(rid, []).append(e)
    return list(groups.items())[-n:][::-1]
|
|