| """Export an agent run as a portable, shareable trace (Sharing is Caring). |
| |
| The activity bus (``server/events.py``) groups every event from one agent run |
| under a ``run_scope`` id. This module serializes such a run into a small, |
| self-contained JSON envelope that a user can download and (optionally) publish to |
| the Hugging Face Hub with ``training/share_trace.py``. |
| |
| Privacy: the bus is structural by design — every ``emit(...)`` carries counts + |
| short status strings, never event titles or raw thread text. The *only* free-text |
| that can carry personal data is the chat-name suffix in the ingest message |
| (``app.py``: ``"N msg(s) from {chats}"``). With ``redact=True`` (the default) that |
| tail is dropped. Steps use a fixed key allowlist, so a future payload key can't |
| silently leak into a shared trace. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
| import re |
| import tempfile |
| from datetime import datetime |
|
|
| from . import events as bus |
|
|
| TRACE_SCHEMA = "imessage-cal-trace" |
| TRACE_SCHEMA_VERSION = 1 |
|
|
| |
| _STEP_KEYS = ("stage", "level", "ts", "latency_ms", "events", "conflicts", "images", "tokens") |
|
|
|
|
| def _scrub_message(stage: str, message: str, redact: bool) -> str: |
| """All bus messages are structural except the ingest one, which appends |
| ``" from {chats}"`` (chat names — PII). Drop that tail when redacting.""" |
| if redact and stage == "ingest": |
| |
| return re.sub(r"\s+from\s+.*$", "", message) |
| return message |
|
|
|
|
| def _step(ev: dict, redact: bool) -> dict: |
| step = {k: ev[k] for k in _STEP_KEYS if k in ev} |
| step["message"] = _scrub_message(ev.get("stage", ""), ev.get("message", ""), redact) |
| return step |
|
|
|
|
| def export_run(run_id: str | None = None, redact: bool = True) -> dict: |
| """Serialize one agent run (newest by default) into a shareable envelope. |
| |
| Returns a valid empty envelope (``steps == []``) when there is no matching |
| run, so callers don't need to handle exceptions. |
| """ |
| runs = bus.recent_runs(n=50) |
| evs: list[dict] = [] |
| rid = run_id |
| if run_id is None: |
| if runs: |
| rid, evs = runs[0] |
| else: |
| for r, e in runs: |
| if r == run_id: |
| evs = e |
| break |
|
|
| steps = [_step(e, redact) for e in evs] |
| summary = { |
| "steps": len(steps), |
| "events": sum(s.get("events", 0) for s in steps), |
| "conflicts": sum(s.get("conflicts", 0) for s in steps), |
| "images": sum(s.get("images", 0) for s in steps), |
| "model_calls": sum(1 for s in steps if s.get("latency_ms") is not None), |
| "total_latency_ms": sum(s.get("latency_ms", 0) for s in steps), |
| } |
| return { |
| "schema": TRACE_SCHEMA, |
| "version": TRACE_SCHEMA_VERSION, |
| "exported_at": datetime.now().isoformat(timespec="seconds"), |
| "run_id": rid, |
| |
| "run_label": (rid.split(":", 1)[1] if rid and ":" in rid else None), |
| "redacted": redact, |
| "steps": steps, |
| "summary": summary, |
| } |
|
|
|
|
| def write_trace(trace: dict, path: str | None = None) -> str: |
| """Write a trace envelope to a JSON file and return the path (Gradio download). |
| |
| Mirrors ``calendar_out.ics.write_ics``. Deliberately does NOT emit a bus event |
| — that would mutate the very run being exported. |
| """ |
| if path is None: |
| fd, path = tempfile.mkstemp(suffix=".json", prefix="trace_") |
| os.close(fd) |
| with open(path, "w", encoding="utf-8") as f: |
| json.dump(trace, f, indent=2, ensure_ascii=False) |
| return path |
|
|