"""Export an agent run as a portable, shareable trace (Sharing is Caring).

The activity bus (``server/events.py``) groups every event from one agent run
under a ``run_scope`` id. This module serializes such a run into a small,
self-contained JSON envelope that a user can download and (optionally) publish
to the Hugging Face Hub with ``training/share_trace.py``.

Privacy: the bus is structural by design — every ``emit(...)`` carries counts +
short status strings, never event titles or raw thread text. The *only*
free-text that can carry personal data is the chat-name suffix in the ingest
message (``app.py``: ``"N msg(s) from {chats}"``). With ``redact=True`` (the
default) that tail is dropped. Steps use a fixed key allowlist, so a future
payload key can't silently leak into a shared trace.
"""

from __future__ import annotations

import json
import os
import re
import tempfile
from datetime import datetime

from . import events as bus

TRACE_SCHEMA = "imessage-cal-trace"
TRACE_SCHEMA_VERSION = 1

# Only these keys ever appear in an exported step (allowlist, not denylist).
_STEP_KEYS = ("stage", "level", "ts", "latency_ms", "events", "conflicts", "images", "tokens")

# Matches the " from {chats}" tail of an ingest message. DOTALL is required:
# without it "." stops at a newline, so a chat name containing a line break
# would make the pattern fail to match and the name would be exported as-is.
_INGEST_TAIL_RE = re.compile(r"\s+from\s+.*", re.DOTALL)


def _scrub_message(stage: str, message: str, redact: bool) -> str:
    """Return ``message`` with the chat-name tail removed when redacting.

    All bus messages are structural except the ingest one, which appends
    ``" from {chats}"`` (chat names — PII). Everything from the first
    whitespace-delimited ``from`` to the end of the message is dropped.

    Args:
        stage: Stage name of the event; only ``"ingest"`` is scrubbed.
        message: The event's status message.
        redact: When false the message is returned untouched.

    Returns:
        The scrubbed message, or ``message`` unchanged when no scrubbing
        applies (including when it is not a string, e.g. ``None``).
    """
    if not (redact and stage == "ingest" and isinstance(message, str)):
        return message
    # "3 msg(s) from 3rd grade chat" -> "3 msg(s)"
    return _INGEST_TAIL_RE.sub("", message)


def _step(ev: dict, redact: bool) -> dict:
    """Project one bus event onto the exported step shape.

    Only keys listed in ``_STEP_KEYS`` are copied; ``message`` is always set
    and goes through ``_scrub_message``.
    """
    step = {k: ev[k] for k in _STEP_KEYS if k in ev}
    step["message"] = _scrub_message(ev.get("stage", ""), ev.get("message", ""), redact)
    return step


def _sum_field(steps: list[dict], key: str) -> int | float:
    """Sum ``key`` over ``steps``, counting a missing or ``None`` value as 0."""
    return sum(step.get(key) or 0 for step in steps)


def export_run(run_id: str | None = None, redact: bool = True) -> dict:
    """Serialize one agent run (newest by default) into a shareable envelope.

    Args:
        run_id: Id of the run to export. ``None`` selects the first entry
            returned by ``bus.recent_runs`` (expected to be the newest).
        redact: Drop the chat-name tail from ingest messages (default).

    Returns:
        The trace envelope as a dict. When there is no matching run the
        envelope is still valid, with ``steps == []`` and a zeroed summary,
        so callers don't need to handle exceptions.
    """
    # Only the 50 most recent runs are searched; assumes (run_id, events)
    # pairs, newest first.
    runs = bus.recent_runs(n=50)
    evs: list[dict] = []
    rid = run_id
    if run_id is None:
        if runs:
            rid, evs = runs[0]
    else:
        for candidate_id, candidate_events in runs:
            if candidate_id == run_id:
                evs = candidate_events
                break

    steps = [_step(e, redact) for e in evs]
    summary = {
        "steps": len(steps),
        "events": _sum_field(steps, "events"),
        "conflicts": _sum_field(steps, "conflicts"),
        "images": _sum_field(steps, "images"),
        # A step counts as a model call when it reports a latency.
        "model_calls": sum(1 for s in steps if s.get("latency_ms") is not None),
        # latency_ms may be present but None (see model_calls above), so it
        # must not be fed to sum() directly.
        "total_latency_ms": _sum_field(steps, "latency_ms"),
    }
    return {
        "schema": TRACE_SCHEMA,
        "version": TRACE_SCHEMA_VERSION,
        # Naive local time (no UTC offset), second precision.
        "exported_at": datetime.now().isoformat(timespec="seconds"),
        "run_id": rid,
        # run ids look like "12:analyze" — the label is the part after ":".
        "run_label": (rid.split(":", 1)[1] if rid and ":" in rid else None),
        "redacted": redact,
        "steps": steps,
        "summary": summary,
    }


def write_trace(trace: dict, path: str | None = None) -> str:
    """Write a trace envelope to a JSON file and return the path (Gradio download).

    Mirrors ``calendar_out.ics.write_ics``. Deliberately does NOT emit a bus
    event — that would mutate the very run being exported.

    Args:
        trace: Envelope to serialize, e.g. the result of ``export_run``.
        path: Destination file. When ``None`` a new ``trace_*.json`` temp file
            is created; it is not deleted by this function.

    Returns:
        The path the JSON was written to.
    """
    if path is None:
        fd, path = tempfile.mkstemp(suffix=".json", prefix="trace_")
        # mkstemp hands back an open descriptor; close it and reopen by path
        # below so both branches share one text-mode write.
        os.close(fd)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(trace, f, indent=2, ensure_ascii=False)
    return path