OffGridSchedula / server /trace.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
3.72 kB
"""Export an agent run as a portable, shareable trace (Sharing is Caring).
The activity bus (``server/events.py``) groups every event from one agent run
under a ``run_scope`` id. This module serializes such a run into a small,
self-contained JSON envelope that a user can download and (optionally) publish to
the Hugging Face Hub with ``training/share_trace.py``.
Privacy: the bus is structural by design — every ``emit(...)`` carries counts +
short status strings, never event titles or raw thread text. The *only* free-text
that can carry personal data is the chat-name suffix in the ingest message
(``app.py``: ``"N msg(s) from {chats}"``). With ``redact=True`` (the default) that
tail is dropped. Steps use a fixed key allowlist, so a future payload key can't
silently leak into a shared trace.
"""
from __future__ import annotations
import json
import os
import re
import tempfile
from datetime import datetime
from . import events as bus
# Schema name and version written into every envelope built by ``export_run``.
TRACE_SCHEMA = "imessage-cal-trace"
TRACE_SCHEMA_VERSION = 1

# Allowlist (not a denylist): the only event keys that are ever copied into an
# exported step. Anything not named here is left out of the trace.
_STEP_KEYS = (
    "stage",
    "level",
    "ts",
    "latency_ms",
    "events",
    "conflicts",
    "images",
    "tokens",
)
def _scrub_message(stage: str, message: str, redact: bool) -> str:
    """Return ``message`` with the chat-name tail removed from ingest messages.

    All bus messages are structural except the ingest one, which appends
    ``" from {chats}"`` (chat names — PII). When ``redact`` is true and ``stage``
    is ``"ingest"``, everything from the first whitespace-delimited ``from``
    through the end of the string is dropped. Every other message is returned
    unchanged.

    Args:
        stage: Stage name of the event the message belongs to.
        message: The event's status string.
        redact: Whether the chat-name tail should be stripped.

    Returns:
        The scrubbed message, or ``message`` itself when nothing applies.
    """
    if not (redact and stage == "ingest"):
        return message
    # "3 msg(s) from 3rd grade chat" -> "3 msg(s)"
    # DOTALL lets ``.*`` run across line breaks, so a chat name that contains a
    # newline is still removed in full instead of defeating the match (which
    # would leave the names in a trace that claims to be redacted).
    return re.sub(r"\s+from\s+.*", "", message, count=1, flags=re.DOTALL)
def _step(ev: dict, redact: bool) -> dict:
    """Build the exported form of a single bus event.

    Only keys listed in ``_STEP_KEYS`` are copied over (in that order, and only
    when present on the event). A ``"message"`` entry is always added last,
    after passing through ``_scrub_message``.

    Args:
        ev: One event dict from the activity bus.
        redact: Forwarded to ``_scrub_message``.

    Returns:
        A new dict; ``ev`` itself is not modified.
    """
    exported: dict = {}
    for key in _STEP_KEYS:
        if key in ev:
            exported[key] = ev[key]
    stage = ev.get("stage", "")
    text = ev.get("message", "")
    exported["message"] = _scrub_message(stage, text, redact)
    return exported
def export_run(run_id: str | None = None, redact: bool = True) -> dict:
    """Serialize one agent run (newest by default) into a shareable envelope.

    Returns a valid empty envelope (``steps == []``) when there is no matching
    run, so callers don't need to handle exceptions.

    Args:
        run_id: Id of the run to export. ``None`` selects the first run returned
            by ``bus.recent_runs`` (documented there as newest first). Only the
            runs returned by ``bus.recent_runs(n=50)`` are searched.
        redact: Passed to ``_step`` for every event; also recorded in the
            envelope under ``"redacted"``.

    Returns:
        A dict with the keys ``schema``, ``version``, ``exported_at`` (local
        time, second precision), ``run_id``, ``run_label``, ``redacted``,
        ``steps`` and ``summary``.
    """
    runs = bus.recent_runs(n=50)  # newest first
    rid = run_id
    evs: list[dict] = []
    if run_id is None:
        if runs:
            rid, evs = runs[0]
    else:
        # First run whose id matches; stays empty when nothing matches.
        evs = next((e for r, e in runs if r == run_id), [])

    steps = [_step(e, redact) for e in evs]

    def _total(key: str):
        # A key may be absent *or* present with a None value (the model_calls
        # count below already allows for a None latency). Skip both so sum()
        # never sees None and the no-exceptions promise above holds.
        values = (s.get(key) for s in steps)
        return sum(v for v in values if v is not None)

    summary = {
        "steps": len(steps),
        "events": _total("events"),
        "conflicts": _total("conflicts"),
        "images": _total("images"),
        # A step counts as a model call when it reports a latency.
        "model_calls": sum(1 for s in steps if s.get("latency_ms") is not None),
        "total_latency_ms": _total("latency_ms"),
    }
    return {
        "schema": TRACE_SCHEMA,
        "version": TRACE_SCHEMA_VERSION,
        "exported_at": datetime.now().isoformat(timespec="seconds"),
        "run_id": rid,
        # run ids look like "12:analyze" — the label is the part after ":".
        "run_label": (rid.split(":", 1)[1] if rid and ":" in rid else None),
        "redacted": redact,
        "steps": steps,
        "summary": summary,
    }
def write_trace(trace: dict, path: str | None = None) -> str:
    """Dump a trace envelope as indented UTF-8 JSON and return the file path.

    The returned path is what a Gradio download component is handed. Mirrors
    ``calendar_out.ics.write_ics``. No bus event is emitted here on purpose:
    doing so would mutate the very run being exported.

    Args:
        trace: The envelope to serialize.
        path: Destination file. When ``None``, a fresh temporary file named
            ``trace_*.json`` is created and used instead.

    Returns:
        The path that was written.
    """
    target = path
    if target is None:
        # mkstemp hands back an open descriptor as well as the name; only the
        # name is needed, so the descriptor is closed before reopening by path.
        handle, target = tempfile.mkstemp(prefix="trace_", suffix=".json")
        os.close(handle)
    with open(target, mode="w", encoding="utf-8") as out:
        json.dump(trace, out, ensure_ascii=False, indent=2)
    return target