Spaces:
Sleeping
Sleeping
| """Export an agent run as a portable, shareable trace (Sharing is Caring). | |
| The activity bus (``server/events.py``) groups every event from one agent run | |
| under a ``run_scope`` id. This module serializes such a run into a small, | |
| self-contained JSON envelope that a user can download and (optionally) publish to | |
| the Hugging Face Hub with ``training/share_trace.py``. | |
| Privacy: the bus is structural by design β every ``emit(...)`` carries counts + | |
| short status strings, never event titles or raw thread text. The *only* free-text | |
| that can carry personal data is the chat-name suffix in the ingest message | |
| (``app.py``: ``"N msg(s) from {chats}"``). With ``redact=True`` (the default) that | |
| tail is dropped. Steps use a fixed key allowlist, so a future payload key can't | |
| silently leak into a shared trace. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import tempfile | |
| from datetime import datetime | |
| from . import events as bus | |
| TRACE_SCHEMA = "imessage-cal-trace" | |
| TRACE_SCHEMA_VERSION = 1 | |
| # Only these keys ever appear in an exported step (allowlist, not denylist). | |
| _STEP_KEYS = ("stage", "level", "ts", "latency_ms", "events", "conflicts", "images", "tokens") | |
| def _scrub_message(stage: str, message: str, redact: bool) -> str: | |
| """All bus messages are structural except the ingest one, which appends | |
| ``" from {chats}"`` (chat names β PII). Drop that tail when redacting.""" | |
| if redact and stage == "ingest": | |
| # "3 msg(s) from 3rd grade chat" -> "3 msg(s)" | |
| return re.sub(r"\s+from\s+.*$", "", message) | |
| return message | |
| def _step(ev: dict, redact: bool) -> dict: | |
| step = {k: ev[k] for k in _STEP_KEYS if k in ev} | |
| step["message"] = _scrub_message(ev.get("stage", ""), ev.get("message", ""), redact) | |
| return step | |
| def export_run(run_id: str | None = None, redact: bool = True) -> dict: | |
| """Serialize one agent run (newest by default) into a shareable envelope. | |
| Returns a valid empty envelope (``steps == []``) when there is no matching | |
| run, so callers don't need to handle exceptions. | |
| """ | |
| runs = bus.recent_runs(n=50) # newest first | |
| evs: list[dict] = [] | |
| rid = run_id | |
| if run_id is None: | |
| if runs: | |
| rid, evs = runs[0] | |
| else: | |
| for r, e in runs: | |
| if r == run_id: | |
| evs = e | |
| break | |
| steps = [_step(e, redact) for e in evs] | |
| summary = { | |
| "steps": len(steps), | |
| "events": sum(s.get("events", 0) for s in steps), | |
| "conflicts": sum(s.get("conflicts", 0) for s in steps), | |
| "images": sum(s.get("images", 0) for s in steps), | |
| "model_calls": sum(1 for s in steps if s.get("latency_ms") is not None), | |
| "total_latency_ms": sum(s.get("latency_ms", 0) for s in steps), | |
| } | |
| return { | |
| "schema": TRACE_SCHEMA, | |
| "version": TRACE_SCHEMA_VERSION, | |
| "exported_at": datetime.now().isoformat(timespec="seconds"), | |
| "run_id": rid, | |
| # run ids look like "12:analyze" β the label is the part after ":". | |
| "run_label": (rid.split(":", 1)[1] if rid and ":" in rid else None), | |
| "redacted": redact, | |
| "steps": steps, | |
| "summary": summary, | |
| } | |
| def write_trace(trace: dict, path: str | None = None) -> str: | |
| """Write a trace envelope to a JSON file and return the path (Gradio download). | |
| Mirrors ``calendar_out.ics.write_ics``. Deliberately does NOT emit a bus event | |
| β that would mutate the very run being exported. | |
| """ | |
| if path is None: | |
| fd, path = tempfile.mkstemp(suffix=".json", prefix="trace_") | |
| os.close(fd) | |
| with open(path, "w", encoding="utf-8") as f: | |
| json.dump(trace, f, indent=2, ensure_ascii=False) | |
| return path | |