"""Anonymized agent-trace capture and Hub publishing for Fabella.

What gets captured
------------------

One row per successful (or failed) agent invocation. The row contains the
full LangChain ReAct message trace (system prompt, user prompt, assistant
tool calls, tool responses, final draft) plus the judge's structured
verdict. This is the "Sharing is Caring" merit badge: a public, anonymized
dataset of how the drafter and judge reason about real parent situations,
so other builders can learn from the loop.

Anonymization
-------------

Before anything leaves the Space, the row is run through the
``_anonymize_*`` helpers below:

1. ``child_name`` is dropped from the request; only ``child_name_present``
   and a ``"[name]"`` placeholder are kept. Any ``"[name]"`` placeholder in
   the draft, the user prompt, or the message trace is rewritten to
   ``"[REDACTED_NAME]"``. The drafter was already told to address the child
   as "you" (see ``agent.py::SYSTEM_PROMPT``), so the name is rarely in the
   draft to begin with. Note that only the placeholder is rewritten; the
   literal name string is not searched for.
2. The full situation text is **never** stored. Only its truncated SHA-256
   hash (first 16 hex digits), its first 60 characters, and its length go
   into the public row. The hash lets downstream consumers deduplicate
   across rows; the short prefix is enough to read the topic ("grandma in
   the hospital", "moving to a new house") without publishing the rest of
   what the parent wrote.
3. Freeform history turns are dropped; only their count
   (``history_turns``) is kept. The drafter already takes only the last 6
   turns, but none of their content is published.
4. The drafter system prompt is shipped in full (it's a static string from
   this repo, not user input) so the dataset is self-explanatory.

Opt-out
-------

``FABELLA_SHARE_TRACES=0`` disables capture entirely (this is the default).
To re-enable the public dataset, set ``FABELLA_SHARE_TRACES=1`` on the
Space. Per-request opt-out is also supported via ``share_trace=False`` on
the ``ExplainRequest`` payload.
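
Anonymization rules 2 and 3 above can be sketched as two small pure
functions. This is a minimal illustration, not the module's own code.
``situation_fields`` and ``history_fields`` are hypothetical names, and
``PREFIX_LEN`` is assumed to match ``_SITUATION_PREFIX_LEN``.

```python
import hashlib

PREFIX_LEN = 60  # assumed to match _SITUATION_PREFIX_LEN in this module


def situation_fields(situation: str) -> dict:
    # Hypothetical helper: derive the three public fields from the raw text.
    # The raw text itself is never returned.
    if not situation:
        return {"situation_hash": "", "situation_preview": "", "situation_length": 0}
    digest = hashlib.sha256(situation.encode("utf-8")).hexdigest()[:16]
    s = situation.strip()
    preview = s if len(s) <= PREFIX_LEN else s[:PREFIX_LEN].rstrip() + "..."
    return {
        "situation_hash": "sha256:" + digest,
        "situation_preview": preview,
        "situation_length": len(situation),
    }


def history_fields(history: list) -> dict:
    # Rule 3: only the number of turns survives, never their content.
    return {"history_turns": len(history or [])}


text = "Grandma is in the hospital and my six-year-old keeps asking when she is coming home."
row = {**situation_fields(text), **history_fields([{"role": "user", "content": "hi"}])}
```

Identical situations hash to identical values, so duplicates can be
counted without anyone ever reading the full text.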

Publishing
----------

Captured rows accumulate in an in-memory buffer, then are flushed to the
Hub via ``huggingface_hub.HfApi().create_commit(...)`` with one
``CommitOperationAdd`` per row, each targeting a unique ``data/.json``
path. A background daemon thread flushes the buffer every
``FLUSH_INTERVAL_S`` seconds, and ``submit()`` triggers an extra flush as
soon as the buffer reaches ``FLUSH_BUFFER_SIZE`` rows, whichever comes
first. The HF token is read from the ``HF_TOKEN`` env var (with
``HF_HUB_TOKEN`` as a fallback), which HF Spaces injects automatically
for the owning user.

Why per-row files, not a single JSONL
------------------------------------

The previous version downloaded ``data/train-00000-of-00001.jsonl``,
appended locally, and re-uploaded the whole file. That is a
read-modify-write cycle which (a) loses rows when two Space replicas flush
concurrently, and (b) re-uploads the full file on every push (O(n) bytes
per push, so O(n²) total bandwidth as the dataset grows). The per-row UUID
pattern is race-free across replicas (each row is its own file, so no two
writers ever touch the same path on the Hub) and costs a constant amount
per row. The dataset card's "preview" still works because
``huggingface_hub`` lists ``data/*.json`` in the repo tree.

Startup probe
-------------

On ``start()`` (only when capture is enabled) we first call
``_ensure_repo()`` to create the dataset repo if needed, then ``_probe()``,
which atomically adds and deletes a tiny ``data/.probe.json`` file in a
single ``create_commit`` call. Together they verify that (a) the HF token
is valid, (b) the token can create (or access) the dataset repo, and
(c) the token can commit files to it. The Add and Delete happen in one
commit, so a partial failure can never leave the probe file in the public
dataset. If either step fails we log a single loud ERROR with the
exception class and message and disable capture for the lifetime of the
process, so rows are never dropped silently.
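
The two flush triggers and the per-row path scheme described above can be
sketched without any Hub dependency. ``RowBuffer`` and ``to_paths`` are
hypothetical names. The clock is passed in explicitly so the timer path is
easy to exercise, whereas the real publisher uses a daemon thread and a
``threading.Event``.

```python
import json
import uuid


class RowBuffer:
    # Hypothetical stand-in for the publisher's buffer: it hands back a batch
    # either when it fills up (size trigger) or when the timer fires.
    def __init__(self, max_rows: int, interval_s: float, now: float = 0.0) -> None:
        self.max_rows = max_rows
        self.interval_s = interval_s
        self.rows: list = []
        self.next_tick = now + interval_s

    def _drain(self) -> list:
        batch, self.rows = self.rows, []
        return batch

    def add(self, row: dict) -> list:
        # Size trigger: flush as soon as the buffer reaches max_rows.
        self.rows.append(row)
        return self._drain() if len(self.rows) >= self.max_rows else []

    def tick(self, now: float) -> list:
        # Timer trigger: fires on a fixed schedule, regardless of buffer size.
        if now < self.next_tick:
            return []
        self.next_tick = now + self.interval_s
        return self._drain()


def to_paths(batch: list) -> dict:
    # One file per row, keyed by the row's own UUID, so concurrent writers
    # never target the same path.
    return {f"data/{row['trace_id']}.json": json.dumps(row).encode("utf-8") for row in batch}


buf = RowBuffer(max_rows=5, interval_s=300.0)
flushed = []
for i in range(7):
    flushed += buf.add({"trace_id": str(uuid.uuid4()), "n": i})
late = buf.tick(now=300.0)
```

The first five rows leave through the size trigger. The remaining two wait
for the timer tick.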
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
import sys
import threading
import time
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

log = logging.getLogger("fabella.traces")


# --- Configuration ---------------------------------------------------------

# Where the public dataset lives. The default is the hackathon org's
# namespace (``build-small-hackathon/fabella-traces``) so the live Space
# publishes to the canonical repo attached to the Space's "Datasets" tab.
# For local dev, or wherever HF_TOKEN cannot create repos in the org
# namespace, set ``FABELLA_TRACE_REPO=Kiy-K/fabella-traces`` (the maker's
# personal dataset) to publish to a fallback path.
DATASET_REPO = os.environ.get(
    "FABELLA_TRACE_REPO",
    "build-small-hackathon/fabella-traces",
)

# Buffer flush triggers. Rows are pushed whenever EITHER the buffer hits
# FLUSH_BUFFER_SIZE rows (checked in ``submit()``) OR the background timer
# fires (every FLUSH_INTERVAL_S seconds), whichever comes first.
FLUSH_BUFFER_SIZE = int(os.environ.get("FABELLA_TRACE_FLUSH_SIZE", "5"))
FLUSH_INTERVAL_S = float(os.environ.get("FABELLA_TRACE_FLUSH_INTERVAL_S", "300"))

# Per-row upload path prefix. Each row lives at ``data/.json``,
# keyed by the row's own UUID. This eliminates the read-modify-write race
# that bit the old single-JSONL design when (a) the Space ran >1 replica
# or (b) the timer thread and the size-triggered async flush interleaved.
DATASET_DIR = "data"
PROBE_PATH = f"{DATASET_DIR}/.probe.json"

# Capture is OFF by default. The dataset card and schema are live at
# ``build-small-hackathon/fabella-traces``; the only path to data right
# now is the per-parent "Download my history" self-export button in
# ``app.py::api_history_download``.
# To re-enable publishing for a new deployment, set
# ``FABELLA_SHARE_TRACES=1`` on the Space and the publisher will resume
# writing rows to the org dataset.
SHARE_TRACES = os.environ.get("FABELLA_SHARE_TRACES", "0").lower() in (
    "1",
    "true",
    "yes",
    "on",
)


# --- Anonymization ---------------------------------------------------------

_SITUATION_PREFIX_LEN = 60


def _hash_situation(situation: str) -> str:
    """Stable, short hash so the same situation deduplicates across rows.

    Only the first 16 hex digits (64 bits) of the SHA-256 digest are kept.
    """
    if not situation:
        return ""
    return "sha256:" + hashlib.sha256(situation.encode("utf-8")).hexdigest()[:16]


def _truncate_situation(situation: str) -> str:
    """Keep a short prefix of the situation for at-a-glance browsing.

    The full situation text is never sent to the Hub. We only ship a short
    prefix (60 chars) so a human scanning the dataset can see what *kind*
    of topic each row covers without publishing the rest of what a real
    family wrote. Pair this with ``situation_hash`` (for dedup) and
    ``situation_length`` (for context).
    """
    if not situation:
        return ""
    s = situation.strip()
    if len(s) <= _SITUATION_PREFIX_LEN:
        return s
    return s[:_SITUATION_PREFIX_LEN].rstrip() + "..."


_SITUATION_USER_PROMPT_MARKERS = (
    "The latest thing the parent is asking about:",
    "The latest thing the parent is asking about",
)


def _scrub_situation_from_prompt(prompt: str) -> str:
    """Strip the situation text from the drafter's user prompt.

    The user_prompt is built in ``agent.py::_build_user_prompt`` and embeds
    the raw situation in a sentence like
    ``"The latest thing the parent is asking about: "``. We cut the text
    between that marker and the end of its line, so the prompt's structure
    is preserved in the row but the parent's words are not. The situation's
    length is already recorded separately as ``situation_length``.
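
For illustration, here is a self-contained sketch of that cut on a made-up
prompt. ``scrub_after_marker`` is a hypothetical name and the prompt text
is invented; the real prompt layout lives in ``agent.py``.

```python
MARKER = "The latest thing the parent is asking about:"


def scrub_after_marker(prompt: str, marker: str = MARKER) -> str:
    # Cut the text between the marker and the next newline, keeping the rest.
    idx = prompt.find(marker)
    if idx < 0:
        return prompt
    end = idx + len(marker)
    newline = prompt.find("\n", end)
    if newline < 0:
        return prompt[:end].rstrip() + " "
    return prompt[:end] + " " + prompt[newline:]


prompt = (
    "Child age: 6\n"
    "The latest thing the parent is asking about: grandma is in the hospital\n"
    "Tone: gentle"
)
scrubbed = scrub_after_marker(prompt)
```

The lines before and after the marker survive unchanged. Only the
parent's own sentence disappears.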
    """
    if not prompt:
        return prompt
    out = prompt
    for marker in _SITUATION_USER_PROMPT_MARKERS:
        idx = out.find(marker)
        if idx < 0:
            continue
        prefix_end = idx + len(marker)
        newline = out.find("\n", prefix_end)
        if newline < 0:
            out = out[:prefix_end].rstrip() + " "
        else:
            out = out[:prefix_end] + " " + out[newline:]
        # Stop at the first marker that matches: the colon-less marker is a
        # prefix of the colon form and would otherwise re-match and eat the
        # colon.
        break
    return out


def _scrub_name(value: str) -> str:
    """Rewrite the ``[name]`` placeholder to ``[REDACTED_NAME]``.

    This only normalizes the placeholder. It never sees the real
    ``child_name``, so it cannot remove a literal name from the text.
    """
    if not value:
        return value
    return value.replace("[name]", "[REDACTED_NAME]")


def _anonymize_request(req: dict) -> dict:
    """Strip PII from the request metadata before publishing."""
    child_name = (req.get("child_name") or "").strip()
    situation = req.get("situation") or ""
    history = req.get("history") or []
    return {
        "age": int(req.get("age") or 0),
        "tone": (req.get("tone") or "").strip().lower(),
        "child_name_present": bool(child_name),
        "child_name_redacted": "[name]" if child_name else "",
        "situation_hash": _hash_situation(situation),
        "situation_preview": _truncate_situation(situation),
        "situation_length": len(situation),
        "history_turns": len(history),
    }


def _stringify_message_content(content: Any) -> str:
    """Render a LangChain message ``content`` field as a plain string.

    LangChain 1.x allows ``content`` to be ``str`` (the common case) or a
    list of content blocks (e.g. ``[{"type": "text", "text": "..."}]``).
    List values used to crash the trace publisher because we did
    ``str(content)`` and then string-replaced on the resulting repr. Now we
    extract just the text blocks, which is what a human reading the dataset
    wants anyway.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for block in content:
            if isinstance(block, dict):
                txt = block.get("text")
                if isinstance(txt, str):
                    parts.append(txt)
                else:
                    parts.append(str(block))
            elif isinstance(block, str):
                parts.append(block)
            else:
                parts.append(str(block))
        return "\n".join(parts)
    return str(content)


def _anonymize_messages(messages: list) -> list:
    """Convert LangChain messages into a public-safe, JSON-only shape.

    Drops the message ``id`` field (internal LangChain state), copies each
    tool call's ``id`` / ``name`` / ``args`` into a plain dict, strips the
    embedded situation text (see ``_scrub_situation_from_prompt``), and
    replaces any ``[name]`` placeholder with a generic marker so the dataset
    reads consistently.
    """
    out: list[dict[str, Any]] = []
    for m in messages or []:
        kind = getattr(m, "type", "") or "unknown"
        content = _stringify_message_content(getattr(m, "content", None))
        entry: dict[str, Any] = {
            "role": kind,
            # The human turn repeats the user prompt, which embeds the raw
            # situation, so scrub it here as well as in ``agent.user_prompt``.
            "content": _scrub_situation_from_prompt(_scrub_name(content)),
        }
        tool_calls = getattr(m, "tool_calls", None)
        if tool_calls:
            entry["tool_calls"] = [
                {
                    "id": tc.get("id") if isinstance(tc, dict) else None,
                    "name": tc.get("name") if isinstance(tc, dict) else None,
                    "args": tc.get("args") if isinstance(tc, dict) else {},
                }
                for tc in tool_calls
            ]
        tool_call_id = getattr(m, "tool_call_id", None)
        if tool_call_id:
            entry["tool_call_id"] = tool_call_id
        out.append(entry)
    return out


def _extract_judge_verdict(messages: list) -> dict | None:
    """Pull the latest validate_explanation judge verdict from the message list.

    ``agent.py`` runs ``validate_explanation`` as a tool. The tool returns a
    JSON string with the Pydantic-validated verdict (see ``judge.py``); the
    content of the matching ``ToolMessage`` is the source of truth. We
    surface it on the trace row so the public dataset carries the
    drafter-judge conversation end-to-end.
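
The lookup described above can be sketched with plain objects: keep the
last tool message whose JSON body looks like a verdict. ``SimpleNamespace``
stands in for LangChain messages, and ``latest_verdict`` is a hypothetical
name. Only the ``type`` and ``content`` attributes are assumed, and content
is treated as a plain string.

```python
import json
from types import SimpleNamespace
from typing import Optional

VERDICT_KEYS = ("ok", "verdict", "issues")


def latest_verdict(messages: list) -> Optional[dict]:
    # Keep the last tool message whose content parses to a dict with at least
    # one verdict key; non-JSON bodies and other shapes are skipped.
    found = None
    for m in messages:
        if getattr(m, "type", "") != "tool":
            continue
        try:
            parsed = json.loads(str(getattr(m, "content", "") or "").strip())
        except ValueError:
            continue
        if isinstance(parsed, dict) and any(k in parsed for k in VERDICT_KEYS):
            found = parsed
    return found


messages = [
    SimpleNamespace(type="ai", content='{"ok": false}'),
    SimpleNamespace(type="tool", content="not json"),
    SimpleNamespace(type="tool", content='{"ok": false, "issues": ["too long"]}'),
    SimpleNamespace(type="tool", content='{"ok": true, "score": 0.9}'),
    SimpleNamespace(type="tool", content='{"word_count": 42}'),
]
verdict = latest_verdict(messages)
```

The trailing ``word_count`` message is a tool result too, but it has no
verdict keys, so the earlier verdict wins.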

    Note: LangChain 1.x ``ToolMessage`` does not populate ``name`` from the
    tool's registered name, so we identify the verdict by its JSON shape
    (presence of ``ok`` / ``verdict`` / ``issues``) rather than by ``name``.
    """
    last_verdict_msg = None
    for m in messages or []:
        if getattr(m, "type", "") != "tool":
            continue
        content = _stringify_message_content(getattr(m, "content", None)).strip()
        if not content:
            continue
        try:
            parsed = json.loads(content)
        except (TypeError, ValueError):
            continue
        if isinstance(parsed, dict) and (
            "ok" in parsed or "verdict" in parsed or "issues" in parsed
        ):
            last_verdict_msg = m
    if last_verdict_msg is None:
        return None
    content = _stringify_message_content(
        getattr(last_verdict_msg, "content", None)
    ).strip()
    try:
        parsed = json.loads(content)
    except (TypeError, ValueError):
        return None
    if not isinstance(parsed, dict):
        return None
    return _anonymize_judge(parsed)


def _anonymize_judge(verdict: dict | None) -> dict | None:
    """Copy the judge verdict into a fixed schema, truncating free text."""
    if not verdict:
        return None
    return {
        "ok": bool(verdict.get("ok")),
        "issues": [str(i)[:200] for i in (verdict.get("issues") or [])],
        "score": float(verdict.get("score") or 0.0),
        "verdict": str(verdict.get("verdict") or ""),
        "reasoning": str(verdict.get("reasoning") or "")[:300],
    }


def _anonymize_draft(draft: dict) -> dict:
    """Keep the four draft fields, stripped, with ``[name]`` normalized."""
    out = {}
    for k in ("opener", "body", "closer", "followup"):
        out[k] = _scrub_name((draft.get(k) or "").strip())
    return out


# --- Trace record ----------------------------------------------------------


@dataclass
class TraceRecord:
    """A single anonymized agent trace, ready to publish."""

    trace_id: str
    created_at: str
    model_versions: dict[str, str]
    request: dict
    agent: dict
    judge: dict | None
    latency_ms: int
    tool_calls: int
    app_version: str
    schema_version: int = 1

    def to_json(self) -> str:
        """Render the record as a single JSON object string."""
        return json.dumps(asdict(self), ensure_ascii=False)


# --- Capture API -----------------------------------------------------------


def build_trace_record(
    *,
    req: Any,
    user_prompt: str,
    system_prompt: str,
    messages: list,
    final_draft: dict,
    judge_verdict: dict | None,
    latency_ms: int,
    app_version: str = "fabella-1",
) -> TraceRecord:
    """Assemble a TraceRecord from the agent's inputs and outputs.

    Called from ``agent.py::run_agent`` AFTER the loop completes. The caller
    is responsible for passing the live messages list and the parsed final
    draft dict. ``tool_calls`` counts the AI turns that requested at least
    one tool, not the individual tool calls.
    """
    tool_calls = sum(
        1
        for m in messages
        if getattr(m, "type", "") == "ai" and getattr(m, "tool_calls", None)
    )
    return TraceRecord(
        trace_id=str(uuid.uuid4()),
        created_at=datetime.now(timezone.utc).isoformat(),
        model_versions={
            "drafter": "google/gemma-4-E4B-it",
            "judge": "nvidia/NVIDIA-Nemotron-3-Nano-4B-BF16",
        },
        request=_anonymize_request(
            {
                "age": getattr(req, "age", 0),
                "tone": getattr(req, "tone", ""),
                "child_name": getattr(req, "child_name", ""),
                "situation": getattr(req, "situation", ""),
                "history": getattr(req, "history", []) or [],
            }
        ),
        agent={
            "system_prompt": system_prompt,
            "user_prompt": _scrub_situation_from_prompt(_scrub_name(user_prompt)),
            "messages": _anonymize_messages(messages),
            "final_draft": _anonymize_draft(final_draft),
        },
        judge=(
            _anonymize_judge(judge_verdict)
            if judge_verdict is not None
            else _extract_judge_verdict(messages)
        ),
        latency_ms=int(latency_ms),
        tool_calls=tool_calls,
        app_version=app_version,
    )


# --- Publisher (background buffer + flush) --------------------------------


class TracePublisher:
    """Thread-safe in-memory buffer that flushes trace rows to the Hub.

    The publisher is started once at app import time. ``submit()`` is
    non-blocking (it appends to the buffer and, at most, starts a flush
    thread). The background thread flushes on a timer, and ``submit()``
    kicks off an extra flush when the buffer crosses ``FLUSH_BUFFER_SIZE``,
    so a burst of traffic doesn't sit in memory for the full interval. All
    flushes are serialized through ``_flush_lock`` so two threads can never
    push overlapping Hub revisions.
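
The locking scheme above can be sketched without any Hub dependency: a
short-lived ``_lock`` guards the buffer, ``_flush_lock`` wraps a whole
flush, and a failed batch is re-queued the way ``_flush`` does it.
``MiniPublisher`` is hypothetical, and ``push`` stands in for
``_push_to_hub``.

```python
import threading
from typing import Callable, List


class MiniPublisher:
    # Hypothetical reduction of TracePublisher: same two locks, no Hub calls.
    def __init__(self, push: Callable[[List[str]], None]) -> None:
        self._push = push
        self._buffer: List[str] = []
        self._lock = threading.Lock()        # guards _buffer, held only briefly
        self._flush_lock = threading.Lock()  # serializes whole flushes

    def submit(self, row: str) -> None:
        with self._lock:
            self._buffer.append(row)

    def flush(self) -> bool:
        with self._flush_lock:
            with self._lock:
                rows, self._buffer = self._buffer, []
            if not rows:
                return True
            try:
                self._push(rows)
                return True
            except Exception:
                # Put the batch back at the head so the next flush retries it
                # before anything submitted in the meantime.
                with self._lock:
                    self._buffer = rows + self._buffer
                return False


sent = []
attempts = {"n": 0}


def flaky_push(rows: List[str]) -> None:
    # The first push fails and later pushes succeed.
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("hub unavailable")
    sent.extend(rows)


pub = MiniPublisher(flaky_push)
pub.submit("a")
pub.submit("b")
first_ok = pub.flush()   # fails, so a and b go back to the buffer
pub.submit("c")
second_ok = pub.flush()  # retries a and b, then sends c, in order
```

Because the buffer lock is released before the (slow) push, ``submit()``
never waits on the network.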

    Each row is uploaded as its own ``data/.json`` file via a single
    ``create_commit`` per flush batch, so concurrent flushes (or >1 Space
    replica) cannot lose rows to a read-modify-write race on a shared JSONL
    file.
    """

    def __init__(self) -> None:
        self._buffer: list[TraceRecord] = []
        self._lock = threading.Lock()
        self._flush_lock = threading.Lock()
        self._last_flush = time.monotonic()
        # HF Spaces inject the token as HF_TOKEN. Some runtimes use HF_HUB_TOKEN
        # instead; accept either so a misconfigured Space does not silently
        # disable trace publishing.
        self._hf_token = (
            os.environ.get("HF_TOKEN") or os.environ.get("HF_HUB_TOKEN") or ""
        )
        self._enabled = SHARE_TRACES and bool(self._hf_token)
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        # How many rows we buffer before dropping the oldest ones. Set high
        # enough to ride out a transient Hub failure, low enough to avoid
        # leaking memory if the Hub is permanently unreachable.
        self._max_buffer = max(FLUSH_BUFFER_SIZE * 4, 50)
        # Used to throttle repeated failure logs (see ``_flush``).
        self._consecutive_failures = 0

    @property
    def enabled(self) -> bool:
        return self._enabled

    def start(self) -> None:
        if not SHARE_TRACES:
            log.info("[traces] publishing disabled (FABELLA_SHARE_TRACES=0)")
            return
        if not self._hf_token:
            # Loud at WARNING so the Space logs make the missing-token case
            # obvious. The previous version's INFO-only line made it look
            # like an intentional no-op.
            log.warning(
                "[traces] publishing DISABLED: HF_TOKEN (or HF_HUB_TOKEN) is "
                "not set on this Space. Trace rows will be dropped on the floor. "
                "Set HF_TOKEN to the Space owner's write token to enable capture."
            )
            return
        if self._thread is not None and self._thread.is_alive():
            return
        # Ensure the dataset repo exists, then run a write+delete probe so a
        # misconfigured Space (bad token, read-only token, no create rights)
        # fails LOUDLY at import time instead of silently dropping rows for
        # hours.
        # If either step fails we disable capture for the lifetime of the
        # process; the per-parent self-export path in app.py keeps working
        # regardless.
        if not self._ensure_repo():
            self._enabled = False
            return
        if not self._probe():
            self._enabled = False
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="fabella-trace-flusher", daemon=True
        )
        self._thread.start()
        log.info(
            f"[traces] publisher started, repo={DATASET_REPO} "
            f"flush_size={FLUSH_BUFFER_SIZE} flush_interval_s={FLUSH_INTERVAL_S} "
            f"max_buffer={self._max_buffer}"
        )

    def stop(self) -> None:
        """Signal the flusher thread to exit; buffered rows are not flushed."""
        self._stop.set()

    def submit(self, record: TraceRecord) -> None:
        """Non-blocking enqueue.

        Drops the record if publishing is disabled. If the buffer is full,
        the oldest rows are evicted to make room for the new one.
        """
        if not self._enabled:
            return
        with self._lock:
            if len(self._buffer) >= self._max_buffer:
                # The Hub has been failing for a while and the buffer is
                # full. Drop the oldest rows so a recovered Hub push only
                # loses ancient data, not fresh rows.
                dropped = len(self._buffer) - self._max_buffer + 1
                del self._buffer[:dropped]
                log.warning(
                    f"[traces] buffer full ({self._max_buffer}); dropped "
                    f"{dropped} oldest row(s) while Hub push is failing"
                )
            self._buffer.append(record)
            should_flush = len(self._buffer) >= FLUSH_BUFFER_SIZE
        if should_flush:
            self._flush_async()

    def _run(self) -> None:
        while not self._stop.is_set():
            # Sleep in small slices so stop() returns promptly.
            slept = 0.0
            while slept < FLUSH_INTERVAL_S and not self._stop.is_set():
                self._stop.wait(min(1.0, FLUSH_INTERVAL_S - slept))
                slept = FLUSH_INTERVAL_S if self._stop.is_set() else slept + 1.0
            if self._stop.is_set():
                break
            self._flush()

    def _flush_async(self) -> None:
        threading.Thread(target=self._flush, daemon=True).start()

    def _flush(self) -> None:
        if not self._enabled:
            return
        with self._flush_lock:
            with self._lock:
                if not self._buffer:
                    return
                rows = self._buffer
                self._buffer = []
            try:
                self._push_to_hub(rows)
                self._last_flush = time.monotonic()
                self._consecutive_failures = 0
                log.info(f"[traces] flushed {len(rows)} rows to {DATASET_REPO}")
            except Exception as e:
                self._consecutive_failures += 1
                # Don't lose data on transient failures. Put the rows back
                # at the head of the buffer so the next flush retries. Log
                # at WARNING the first 3 times, then ERROR on every 10th
                # so a stuck Hub does not spam the Space log.
                with self._lock:
                    self._buffer = rows + self._buffer
                if self._consecutive_failures <= 3 or self._consecutive_failures % 10 == 0:
                    log.log(
                        logging.ERROR if self._consecutive_failures > 3 else logging.WARNING,
                        f"[traces] push failed "
                        f"({self._consecutive_failures} consecutive): "
                        f"{type(e).__name__}: {e}; re-queuing {len(rows)} rows. "
                        f"Check that the Space's HF_TOKEN has write access to "
                        f"'{DATASET_REPO}'.",
                    )

    def _commit(self, operations, message, description=None) -> None:
        """Single-call wrapper around ``HfApi.create_commit`` for the dataset repo.

        All Hub writes from the publisher go through this helper so the
        ``commit_message`` / ``commit_description`` format and the
        ``repo_id`` / ``repo_type`` kwargs stay consistent across the per-row
        flush and the startup probe. If we ever need to add e.g.
        ``revision`` or a custom user-agent, we change one place.
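
The single-choke-point idea can be illustrated with a fake client.
``FakeApi``, ``make_committer`` and the repo id below are hypothetical. Only
the keyword names passed to ``create_commit`` are taken from this module,
and nothing talks to the Hub.

```python
from typing import Iterable, Optional


class FakeApi:
    # Hypothetical stand-in for HfApi: records each commit instead of sending it.
    def __init__(self) -> None:
        self.calls: list = []

    def create_commit(self, **kwargs) -> None:
        self.calls.append(kwargs)


def make_committer(api: FakeApi, repo_id: str):
    # Every write goes through this closure, so repo_id and repo_type are set
    # in exactly one place; commit_description is only sent when non-empty.
    def commit(operations: Iterable, message: str, description: Optional[str] = None) -> None:
        extra = {"commit_description": description} if description else {}
        api.create_commit(
            repo_id=repo_id,
            repo_type="dataset",
            operations=list(operations),
            commit_message=message,
            **extra,
        )

    return commit


api = FakeApi()
commit = make_committer(api, "example-org/example-traces")
commit(["add-op"], "append 1 row")
commit(("add-op", "delete-op"), "startup probe", "verify write access")
```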
        """
        from huggingface_hub import HfApi

        kwargs = {"commit_description": description} if description else {}
        HfApi(token=self._hf_token).create_commit(
            repo_id=DATASET_REPO,
            repo_type="dataset",
            operations=list(operations),
            commit_message=message,
            **kwargs,
        )

    def _push_to_hub(self, rows: list[TraceRecord]) -> None:
        """Upload a batch of rows as one commit with one operation per row.

        Each row is its own ``data/.json`` file, so the upload is race-free
        across Space replicas and across the timer-thread vs. size-triggered
        async flush within one process. One ``create_commit`` call wraps the
        whole batch as a single commit on the Hub, so the git history is one
        commit per flush (not one per row).
        """
        from huggingface_hub import CommitOperationAdd

        operations = [
            CommitOperationAdd(
                path_in_repo=f"{DATASET_DIR}/{r.trace_id}.json",
                path_or_fileobj=r.to_json().encode("utf-8"),
            )
            for r in rows
        ]
        self._commit(
            operations,
            message=f"append {len(rows)} fabella trace row(s)",
            description=(
                f"Append {len(rows)} anonymized Fabella trace row(s). "
                f"Schema: fabella.trace.v1. No raw situation, no child name."
            ),
        )

    def _ensure_repo(self) -> bool:
        """Create the dataset repo if it doesn't exist. Return True on success.

        A 401/403 here means the Space's ``HF_TOKEN`` doesn't have
        ``repo.create`` rights in the target namespace (e.g. it can read
        ``Kiy-K/fabella-traces`` but not create it). In that case the Space
        owner must pre-create the dataset and grant the token write access,
        OR override ``FABELLA_TRACE_REPO`` to a namespace where the token can
        create. We return ``False`` so the caller can disable capture loudly
        instead of failing every flush.
        """
        from huggingface_hub import HfApi

        try:
            HfApi(token=self._hf_token).create_repo(
                repo_id=DATASET_REPO,
                repo_type="dataset",
                exist_ok=True,
                private=False,
            )
            return True
        except Exception as e:
            log.error(
                f"[traces] cannot create or access dataset repo "
                f"'{DATASET_REPO}': {type(e).__name__}: {e}. "
                f"Pre-create the dataset on the Hub and grant the Space's "
                f"HF_TOKEN write access, or set FABELLA_TRACE_REPO to a "
                f"namespace where the token can create repos. "
                f"Trace capture DISABLED for this process."
            )
            return False

    def _probe(self) -> bool:
        """One-shot write+delete in a single atomic commit to verify Hub access.

        Catches (a) a bad token and (b) missing repo-write rights at startup,
        so a misconfigured Space doesn't silently drop rows for hours before
        anyone notices. Missing repo-create rights are caught just before
        this, by ``_ensure_repo``.

        The Add and Delete of ``PROBE_PATH`` are bundled into a single
        ``create_commit`` call so the probe is atomic on the Hub: either the
        commit succeeds (probe is added then removed in one revision) or it
        fails (probe is never created). There is no intermediate state where
        a sentinel file sits in the public dataset.

        Returns True if the round-trip succeeded.
        """
        import warnings

        from huggingface_hub import CommitOperationAdd, CommitOperationDelete

        payload = json.dumps(
            {
                "probe": True,
                "ts": datetime.now(timezone.utc).isoformat(),
                "pid": os.getpid(),
            }
        ).encode("utf-8")
        try:
            # The ``huggingface_hub`` SDK emits a ``UserWarning`` when an
            # Add and a Delete target the same path in one commit. We
            # intentionally rely on this pattern (atomic probe), so
            # silence just this warning around the call. The commit
            # itself still executes correctly.
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    message="About to delete a file that have just been updated",
                    category=UserWarning,
                )
                self._commit(
                    [
                        CommitOperationAdd(
                            path_in_repo=PROBE_PATH,
                            path_or_fileobj=payload,
                        ),
                        CommitOperationDelete(path_in_repo=PROBE_PATH),
                    ],
                    message="fabella trace publisher startup probe",
                    description=(
                        "Atomic Add+Delete of .probe.json to verify the HF_TOKEN "
                        "can commit to the dataset repo. The probe file is never "
                        "left in the repo on success."
                    ),
                )
            log.info(f"[traces] probe OK against {DATASET_REPO}")
            return True
        except Exception as e:
            log.error(
                f"[traces] probe FAILED against {DATASET_REPO}: "
                f"{type(e).__name__}: {e}"
            )
            return False


# Module-level singleton. Imported by app.py; ``start()`` is called once at
# import time below.
publisher = TracePublisher()