| """Anonymized agent-trace capture and Hub publishing for Fabella. | |
| What gets captured | |
| ------------------ | |
| One row per successful (or failed) agent invocation. The row contains | |
| the full LangChain ReAct message trace (system prompt, user prompt, | |
| assistant tool calls, tool responses, final draft) plus the judge's | |
| structured verdict. This is the "Sharing is Caring" merit badge: a public, | |
| anonymized dataset of how the drafter and judge reason about real parent | |
| situations, so other builders can learn from the loop. | |
| Anonymization | |
| ------------- | |
| Before anything leaves the Space, the row is run through `_anonymize()`: | |
| 1. ``child_name`` is dropped from the request and replaced with ``"[name]"`` | |
| in the final draft. The drafter was already told to address the child as | |
| "you" (see ``agent.py::SYSTEM_PROMPT``), so the name is rarely in the | |
| draft to begin with, but we strip it defensively. | |
| 2. The raw situation text is **never** stored. Only a truncated SHA-256 hash | |
| (first 16 hex characters), the first 60 characters, and its length go into the public row. The hash lets | |
| downstream consumers deduplicate across rows; the truncated prefix is | |
| enough to read the topic ("grandma in the hospital", "moving to a new | |
| house") without keeping the parent's actual words. | |
| 3. Freeform history turns are replaced with role + length counts. The drafter | |
| already takes only the last 6 turns, but we strip the content anyway. | |
| 4. The drafter system prompt is shipped in full (it's a static string from | |
| this repo, not user input) so the dataset is self-explanatory. | |
| Opt-out | |
| ------- | |
| ``FABELLA_SHARE_TRACES=0`` disables capture entirely (this is the | |
| default). To re-enable the public dataset, set | |
| ``FABELLA_SHARE_TRACES=1`` on the Space. Per-request opt-out is also | |
| supported via ``share_trace=False`` on the ``ExplainRequest`` payload. | |
| Publishing | |
| ---------- | |
| Captured rows accumulate in an in-memory buffer, then are flushed to the | |
| Hub via ``huggingface_hub.HfApi().create_commit(...)`` with one | |
| ``CommitOperationAdd`` per row, each targeting a unique | |
| ``data/<trace_id>.json`` path. A background daemon flushes the buffer every | |
| ``FLUSH_INTERVAL_S`` seconds OR when the buffer hits ``FLUSH_BUFFER_SIZE`` | |
| rows, whichever comes first. The HF token is read from the ``HF_TOKEN`` | |
| env var (falling back to ``HF_HUB_TOKEN``), which HF Spaces injects automatically for the owning user. | |
| Why per-row files, not a single JSONL | |
| ------------------------------------ | |
| The previous version downloaded ``data/train-00000-of-00001.jsonl``, | |
| appended locally, and re-uploaded the whole file. That is a read-modify-write | |
| cycle which (a) loses rows when two Space replicas flush concurrently, and | |
| (b) re-uploads the full file on every push (O(n²) bandwidth as the dataset | |
| grows). The per-row UUID pattern is race-free across replicas (each row is | |
| its own file, no shared mutable state on the Hub) and constant-cost per | |
| push. The dataset card's "preview" still works because ``huggingface_hub`` | |
| lists ``data/*.json`` in the repo tree. | |
| Startup probe | |
| ------------- | |
| On ``start()`` (only when capture is enabled) we call ``_ensure_repo()`` to | |
| confirm the token can create or access the dataset repo, then ``_probe()``, | |
| which atomically Add+Deletes a tiny ``data/.probe.json`` file in a single | |
| ``create_commit`` call to verify that (a) the HF token is valid and (b) the | |
| token can commit files to the repo. The Add+Delete happen in one commit, so a partial failure can | |
| never leave the probe file in the public dataset. If the probe fails we | |
| log a single loud ERROR with the exception class and message and | |
| disable capture for the lifetime of the process — no silent drops. | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| import threading | |
| import time | |
| import uuid | |
| from dataclasses import asdict, dataclass | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| log = logging.getLogger("fabella.traces") | |
| # --- Configuration --------------------------------------------------------- | |
| # Where the public dataset lives. The default is the hackathon org's | |
| # namespace (``build-small-hackathon/fabella-traces``) so the live Space | |
| # publishes to the canonical repo attached to the Space's "Datasets" tab. | |
| # For local dev where the Space's HF_TOKEN cannot create repos in the org | |
| # namespace, set ``FABELLA_TRACE_REPO=Kiy-K/fabella-traces`` (the maker's | |
| # personal dataset) to publish to a fallback path. | |
| DATASET_REPO = os.environ.get( | |
| "FABELLA_TRACE_REPO", | |
| "build-small-hackathon/fabella-traces", | |
| ) | |
| # Buffer flush triggers. The background flusher will push whenever EITHER | |
| # the buffer hits this many rows OR this many seconds have passed since the | |
| # last flush, whichever fires first. | |
| FLUSH_BUFFER_SIZE = int(os.environ.get("FABELLA_TRACE_FLUSH_SIZE", "5")) | |
| FLUSH_INTERVAL_S = float(os.environ.get("FABELLA_TRACE_FLUSH_INTERVAL_S", "300")) | |
| # Per-row upload path prefix. Each row lives at ``data/<trace_id>.json``, | |
| # keyed by the row's own UUID. This eliminates the read-modify-write race | |
| # that bit the old single-JSONL design when (a) the Space ran >1 replica | |
| # or (b) the timer-thread and size-triggered async flush interleaved. | |
| DATASET_DIR = "data" | |
| PROBE_PATH = f"{DATASET_DIR}/.probe.json" | |
| # Capture is OFF by default. The dataset card and schema are live at | |
| # ``build-small-hackathon/fabella-traces``; the only path to data right | |
| # now is the per-parent "Download my history" self-export button in | |
| # ``app.py::api_history_download``. To re-enable publishing for a new | |
| # deployment, set ``FABELLA_SHARE_TRACES=1`` on the Space and the | |
| # publisher will resume writing rows to the org dataset. | |
| SHARE_TRACES = os.environ.get("FABELLA_SHARE_TRACES", "0").lower() in ( | |
| "1", | |
| "true", | |
| "yes", | |
| "on", | |
| ) | |
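The truthy-string check above can be tried in isolation. This is a minimal sketch, assuming a hypothetical helper `env_flag` that takes the environment as a plain dict so it can be exercised without touching `os.environ`; it is not part of the module.

```python
# Hypothetical helper, for illustration only: mirrors the SHARE_TRACES parsing.
TRUTHY = ("1", "true", "yes", "on")


def env_flag(env: dict, name: str, default: str = "0") -> bool:
    """Return True only when the variable holds one of the truthy spellings."""
    return env.get(name, default).lower() in TRUTHY


unset = env_flag({}, "FABELLA_SHARE_TRACES")
shouting = env_flag({"FABELLA_SHARE_TRACES": "TRUE"}, "FABELLA_SHARE_TRACES")
off = env_flag({"FABELLA_SHARE_TRACES": "off"}, "FABELLA_SHARE_TRACES")
```

Anything outside the four truthy spellings, including an unset variable, leaves capture disabled, which matches the off-by-default behavior described above.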
| # --- Anonymization --------------------------------------------------------- | |
| _SITUATION_PREFIX_LEN = 60 | |
| def _hash_situation(situation: str) -> str: | |
| """Stable, short hash so the same situation deduplicates across rows.""" | |
| if not situation: | |
| return "" | |
| return "sha256:" + hashlib.sha256(situation.encode("utf-8")).hexdigest()[:16] | |
| def _truncate_situation(situation: str) -> str: | |
| """Keep a short, generic prefix of the situation for at-a-glance browsing. | |
| The full situation text is never sent to the Hub. We only ship a short | |
| generic prefix (60 chars) so a human scanning the dataset can see what | |
| *kind* of topic each row covers without leaking identifying details | |
| about a real family. Pair this with ``situation_hash`` (for dedup) and | |
| ``situation_length`` (for context). | |
| """ | |
| if not situation: | |
| return "" | |
| s = situation.strip() | |
| if len(s) <= _SITUATION_PREFIX_LEN: | |
| return s | |
| return s[:_SITUATION_PREFIX_LEN].rstrip() + "..." | |
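As a rough illustration of what replaces the raw situation in a public row, the sketch below combines the hash, preview, and length steps. The function name `situation_fields` and the sample text are assumptions made for this example; the constants mirror the values used above.

```python
import hashlib

PREFIX_LEN = 60  # mirrors _SITUATION_PREFIX_LEN


def situation_fields(situation: str) -> dict:
    """Build the three stand-in fields; the raw text itself is not returned."""
    stripped = situation.strip()
    if len(stripped) <= PREFIX_LEN:
        preview = stripped
    else:
        preview = stripped[:PREFIX_LEN].rstrip() + "..."
    # Only the first 16 hex characters of the digest are kept.
    digest = hashlib.sha256(situation.encode("utf-8")).hexdigest()[:16]
    return {
        "situation_hash": f"sha256:{digest}" if situation else "",
        "situation_preview": preview,
        "situation_length": len(situation),
    }


text = "Grandma is in the hospital and we visit her on Sunday. " * 3
fields = situation_fields(text)
```

Note that the preview is still a verbatim prefix of the parent's words, so identifying details in the first 60 characters would be published.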
| _SITUATION_USER_PROMPT_MARKERS = ( | |
| "The latest thing the parent is asking about:", | |
| "The latest thing the parent is asking about", | |
| ) | |
| def _scrub_situation_from_prompt(prompt: str) -> str: | |
| """Strip the situation text from the drafter's user prompt. | |
| The user_prompt is built in ``agent.py::_build_user_prompt`` and | |
| embeds the raw situation in a sentence like ``"The latest thing the | |
| parent is asking about: <raw situation>"``. We replace the embedded | |
| situation with a length-only marker so the prompt's structure is | |
| preserved in the row but the parent's words are not. | |
| """ | |
| if not prompt: | |
| return prompt | |
| out = prompt | |
| for marker in _SITUATION_USER_PROMPT_MARKERS: | |
| idx = out.find(marker) | |
| if idx < 0: | |
| continue | |
| prefix_end = idx + len(marker) | |
| newline = out.find("\n", prefix_end) | |
| if newline < 0: | |
| out = out[:prefix_end].rstrip() + " <redacted, see request.situation_length>" | |
| break | |
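| out = out[:prefix_end] + " <redacted>" + out[newline:] | |
| # Stop after the first match so the colon-less fallback marker does not rewrite the same line again. | |
| break | |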
| return out | |
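A simplified, single-marker sketch of the redaction step may make the intended output easier to picture. The function `redact_after_marker` and the sample prompt are hypothetical; the real prompt layout comes from ``agent.py::_build_user_prompt``, which is not shown here.

```python
MARKER = "The latest thing the parent is asking about:"


def redact_after_marker(prompt: str, marker: str = MARKER) -> str:
    """Replace everything between the marker and the end of its line."""
    idx = prompt.find(marker)
    if idx < 0:
        return prompt  # marker absent: nothing to redact
    end = idx + len(marker)
    newline = prompt.find("\n", end)
    tail = "" if newline < 0 else prompt[newline:]
    return prompt[:end] + " <redacted>" + tail


# Assumed prompt shape, for illustration only.
prompt = (
    "Child age: 6\n"
    "The latest thing the parent is asking about: our dog died\n"
    "Tone: gentle"
)
scrubbed = redact_after_marker(prompt)
```

The surrounding lines survive untouched, so the row keeps the prompt's structure while the parent's words are removed.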
| def _scrub_name(value: str) -> str: | |
| """Rewrite the ``[name]`` placeholder as ``[REDACTED_NAME]``. | |
| This only normalizes the placeholder; it does not search the text for the | |
| child's literal name. | |
| """ | |
| if not value: | |
| return value | |
| return value.replace("[name]", "[REDACTED_NAME]") | |
| def _anonymize_request(req: dict) -> dict: | |
| """Strip PII from the request metadata before publishing.""" | |
| child_name = (req.get("child_name") or "").strip() | |
| situation = req.get("situation") or "" | |
| history = req.get("history") or [] | |
| return { | |
| "age": int(req.get("age") or 0), | |
| "tone": (req.get("tone") or "").strip().lower(), | |
| "child_name_present": bool(child_name), | |
| "child_name_redacted": "[name]" if child_name else "", | |
| "situation_hash": _hash_situation(situation), | |
| "situation_preview": _truncate_situation(situation), | |
| "situation_length": len(situation), | |
| "history_turns": len(history), | |
| } | |
| def _stringify_message_content(content: Any) -> str: | |
| """Render a LangChain message ``content`` field as a plain string. | |
| LangChain 1.x allows ``content`` to be ``str`` (the common case) or a | |
| list of content blocks (e.g. ``[{"type": "text", "text": "..."}]``). | |
| List values used to crash the trace publisher because we did | |
| ``str(content)`` and then string-replaced on the resulting repr. Now | |
| we extract just the text blocks, which is what a human reading the | |
| dataset wants anyway. | |
| """ | |
| if content is None: | |
| return "" | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for block in content: | |
| if isinstance(block, dict): | |
| txt = block.get("text") | |
| if isinstance(txt, str): | |
| parts.append(txt) | |
| else: | |
| parts.append(str(block)) | |
| elif isinstance(block, str): | |
| parts.append(block) | |
| else: | |
| parts.append(str(block)) | |
| return "\n".join(parts) | |
| return str(content) | |
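To show the difference between flattening content blocks and taking the repr of the list, here is a small standalone sketch. `flatten_content` is a hypothetical stand-in written for this example and the sample blocks are made up.

```python
from typing import Any


def flatten_content(content: Any) -> str:
    """Join the text of content blocks instead of stringifying the list."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
            else:
                parts.append(str(block))
        return "\n".join(parts)
    return str(content)


blocks = [{"type": "text", "text": "Hi there."}, "Second part."]
flat = flatten_content(blocks)
naive = str(blocks)  # the repr that string-replacement used to operate on
```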
| def _anonymize_messages(messages: list) -> list: | |
| """Convert LangChain messages into a public-safe, JSON-only shape. | |
| Drops the message ``id`` field (internal LangChain state), serializes | |
| tool-call args to plain JSON, and replaces any ``[name]`` placeholder | |
| with a generic marker so the dataset reads consistently. | |
| """ | |
| out: list[dict[str, Any]] = [] | |
| for m in messages or []: | |
| kind = getattr(m, "type", "") or "unknown" | |
| content = _stringify_message_content(getattr(m, "content", None)) | |
| entry: dict[str, Any] = { | |
| "role": kind, | |
| "content": _scrub_name(content), | |
| } | |
| tool_calls = getattr(m, "tool_calls", None) | |
| if tool_calls: | |
| entry["tool_calls"] = [ | |
| { | |
| "id": tc.get("id") if isinstance(tc, dict) else None, | |
| "name": tc.get("name") if isinstance(tc, dict) else None, | |
| "args": tc.get("args") if isinstance(tc, dict) else {}, | |
| } | |
| for tc in tool_calls | |
| ] | |
| tool_call_id = getattr(m, "tool_call_id", None) | |
| if tool_call_id: | |
| entry["tool_call_id"] = tool_call_id | |
| out.append(entry) | |
| return out | |
| def _extract_judge_verdict(messages: list) -> dict | None: | |
| """Pull the latest validate_explanation judge verdict from the message list. | |
| ``agent.py`` runs ``validate_explanation`` as a tool. The tool returns a | |
| JSON string with the Pydantic-validated verdict (see ``judge.py``); the | |
| content of the matching ``ToolMessage`` is the source of truth. We | |
| surface it on the trace row so the public dataset carries the | |
| drafter-judge conversation end-to-end. | |
| Note: LangChain 1.x ``ToolMessage`` does not populate ``name`` from the | |
| tool's registered name, so we identify the verdict by its JSON shape | |
| (presence of ``ok`` / ``verdict`` / ``issues``) rather than by ``name``. | |
| """ | |
| last_verdict_msg = None | |
| for m in messages or []: | |
| if getattr(m, "type", "") != "tool": | |
| continue | |
| content = _stringify_message_content(getattr(m, "content", None)).strip() | |
| if not content: | |
| continue | |
| try: | |
| parsed = json.loads(content) | |
| except (TypeError, ValueError): | |
| continue | |
| if isinstance(parsed, dict) and ( | |
| "ok" in parsed or "verdict" in parsed or "issues" in parsed | |
| ): | |
| last_verdict_msg = m | |
| if last_verdict_msg is None: | |
| return None | |
| content = _stringify_message_content(getattr(last_verdict_msg, "content", None)).strip() | |
| try: | |
| parsed = json.loads(content) | |
| except (TypeError, ValueError): | |
| return None | |
| if not isinstance(parsed, dict): | |
| return None | |
| return _anonymize_judge(parsed) | |
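The shape-based detection described in the docstring can be sketched with stand-in message objects. `SimpleNamespace` is used here in place of LangChain message classes, and `latest_verdict` is a hypothetical name; only the `type` and `content` attributes are assumed.

```python
import json
from types import SimpleNamespace

VERDICT_KEYS = ("ok", "verdict", "issues")


def latest_verdict(messages):
    """Return the last tool payload that looks like a judge verdict."""
    found = None
    for m in messages:
        if getattr(m, "type", "") != "tool":
            continue
        try:
            parsed = json.loads(m.content)
        except (TypeError, ValueError):
            continue  # not JSON, so not a verdict
        if isinstance(parsed, dict) and any(k in parsed for k in VERDICT_KEYS):
            found = parsed  # keep scanning so the latest one wins
    return found


msgs = [
    SimpleNamespace(type="ai", content="drafting"),
    SimpleNamespace(type="tool", content='{"ok": false, "issues": ["too long"]}'),
    SimpleNamespace(type="tool", content="plain text tool output"),
    SimpleNamespace(type="tool", content='{"ok": true, "issues": []}'),
]
verdict = latest_verdict(msgs)
```

One consequence of matching on shape is that any other tool returning a JSON object with an ``ok`` key would also be picked up.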
| def _anonymize_judge(verdict: dict | None) -> dict | None: | |
| if not verdict: | |
| return None | |
| return { | |
| "ok": bool(verdict.get("ok")), | |
| "issues": [str(i)[:200] for i in (verdict.get("issues") or [])], | |
| "score": float(verdict.get("score") or 0.0), | |
| "verdict": str(verdict.get("verdict") or ""), | |
| "reasoning": str(verdict.get("reasoning") or "")[:300], | |
| } | |
| def _anonymize_draft(draft: dict) -> dict: | |
| """Drop the child's name from the final draft if it slipped in.""" | |
| out = {} | |
| for k in ("opener", "body", "closer", "followup"): | |
| out[k] = _scrub_name((draft.get(k) or "").strip()) | |
| return out | |
| # --- Trace record ---------------------------------------------------------- | |
| @dataclass | |
| class TraceRecord: | |
| """A single anonymized agent trace, ready to publish.""" | |
| trace_id: str | |
| created_at: str | |
| model_versions: dict[str, str] | |
| request: dict | |
| agent: dict | |
| judge: dict | None | |
| latency_ms: int | |
| tool_calls: int | |
| app_version: str | |
| schema_version: int = 1 | |
| def to_json(self) -> str: | |
| """Render the record as a single JSON object string.""" | |
| return json.dumps(asdict(self), ensure_ascii=False) | |
| # --- Capture API ----------------------------------------------------------- | |
| def build_trace_record( | |
| *, | |
| req: Any, | |
| user_prompt: str, | |
| system_prompt: str, | |
| messages: list, | |
| final_draft: dict, | |
| judge_verdict: dict | None, | |
| latency_ms: int, | |
| app_version: str = "fabella-1", | |
| ) -> TraceRecord: | |
| """Assemble a TraceRecord from the agent's inputs and outputs. | |
| Called from ``agent.py::run_agent`` AFTER the loop completes. The | |
| caller is responsible for passing the live messages list and the | |
| parsed final draft dict. | |
| """ | |
| tool_calls = sum( | |
| 1 | |
| for m in messages | |
| if getattr(m, "type", "") == "ai" and getattr(m, "tool_calls", None) | |
| ) | |
| return TraceRecord( | |
| trace_id=str(uuid.uuid4()), | |
| created_at=datetime.now(timezone.utc).isoformat(), | |
| model_versions={ | |
| "drafter": "google/gemma-4-E4B-it", | |
| "judge": "nvidia/NVIDIA-Nemotron-3-Nano-4B-BF16", | |
| }, | |
| request=_anonymize_request( | |
| { | |
| "age": getattr(req, "age", 0), | |
| "tone": getattr(req, "tone", ""), | |
| "child_name": getattr(req, "child_name", ""), | |
| "situation": getattr(req, "situation", ""), | |
| "history": getattr(req, "history", []) or [], | |
| } | |
| ), | |
| agent={ | |
| "system_prompt": system_prompt, | |
| "user_prompt": _scrub_situation_from_prompt(_scrub_name(user_prompt)), | |
| "messages": _anonymize_messages(messages), | |
| "final_draft": _anonymize_draft(final_draft), | |
| }, | |
| judge=( | |
| _anonymize_judge(judge_verdict) | |
| if judge_verdict is not None | |
| else _extract_judge_verdict(messages) | |
| ), | |
| latency_ms=int(latency_ms), | |
| tool_calls=tool_calls, | |
| app_version=app_version, | |
| ) | |
| # --- Publisher (background buffer + flush) -------------------------------- | |
| class TracePublisher: | |
| """Thread-safe in-memory buffer that flushes trace rows to the Hub. | |
| The publisher is started once at app import time. ``submit()`` is | |
| non-blocking (it just appends to the buffer). The background thread | |
| flushes on a timer AND when the buffer crosses ``FLUSH_BUFFER_SIZE``, | |
| so a burst of traffic doesn't sit in memory for the full interval. | |
| All flushes are serialized through ``_flush_lock`` so two threads can | |
| never push overlapping Hub revisions. Each row is uploaded as its own | |
| ``data/<trace_id>.json`` file via a single ``create_commit`` per flush | |
| batch, so concurrent flushes (or >1 Space replica) cannot lose rows | |
| to a read-modify-write race on a shared JSONL file. | |
| """ | |
| def __init__(self) -> None: | |
| self._buffer: list[TraceRecord] = [] | |
| self._lock = threading.Lock() | |
| self._flush_lock = threading.Lock() | |
| self._last_flush = time.monotonic() | |
| # HF Spaces inject the token as HF_TOKEN. Some runtimes use HF_HUB_TOKEN | |
| # instead; accept either so a misconfigured Space does not silently | |
| # disable trace publishing. | |
| self._hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HF_HUB_TOKEN") or "" | |
| self._enabled = SHARE_TRACES and bool(self._hf_token) | |
| self._stop = threading.Event() | |
| self._thread: threading.Thread | None = None | |
| # How many rows we will buffer before evicting the oldest ones. Set | |
| # high enough to ride out a transient Hub failure, low enough to | |
| # avoid leaking memory if the Hub is permanently unreachable. | |
| self._max_buffer = max(FLUSH_BUFFER_SIZE * 4, 50) | |
| # Used to throttle repeated failure logs to one ERROR per N attempts. | |
| self._consecutive_failures = 0 | |
| def enabled(self) -> bool: | |
| return self._enabled | |
| def start(self) -> None: | |
| if not SHARE_TRACES: | |
| log.info("[traces] publishing disabled (FABELLA_SHARE_TRACES=0)") | |
| return | |
| if not self._hf_token: | |
| # Loud at WARNING so the Space logs make the missing-token case | |
| # obvious. The previous version's INFO-only line made it look | |
| # like an intentional no-op. | |
| log.warning( | |
| "[traces] publishing DISABLED: HF_TOKEN (or HF_HUB_TOKEN) is " | |
| "not set on this Space. Trace rows will be dropped on the floor. " | |
| "Set HF_TOKEN to the Space owner's write token to enable capture." | |
| ) | |
| return | |
| if self._thread is not None and self._thread.is_alive(): | |
| return | |
| # Ensure the dataset repo exists, then run a write+delete probe so a | |
| # misconfigured Space (bad token, read-only token, no create rights) | |
| # fails LOUDLY at import time instead of silently dropping rows for | |
| # hours. If either step fails we disable capture for the lifetime of | |
| # the process; the per-parent self-export path in app.py keeps | |
| # working regardless. | |
| if not self._ensure_repo(): | |
| self._enabled = False | |
| return | |
| if not self._probe(): | |
| self._enabled = False | |
| return | |
| self._stop.clear() | |
| self._thread = threading.Thread( | |
| target=self._run, name="fabella-trace-flusher", daemon=True | |
| ) | |
| self._thread.start() | |
| log.info( | |
| f"[traces] publisher started, repo={DATASET_REPO} " | |
| f"flush_size={FLUSH_BUFFER_SIZE} flush_interval_s={FLUSH_INTERVAL_S} " | |
| f"max_buffer={self._max_buffer}" | |
| ) | |
| def stop(self) -> None: | |
| self._stop.set() | |
| def submit(self, record: TraceRecord) -> None: | |
| """Non-blocking. Drops the record if publishing is disabled; if the buffer is full, evicts the oldest rows to make room.""" | |
| if not self._enabled: | |
| return | |
| with self._lock: | |
| if len(self._buffer) >= self._max_buffer: | |
| # The Hub has been failing for a while and the buffer is | |
| # full. Drop the oldest rows so a recovered Hub push only | |
| # loses ancient data, not fresh rows. | |
| dropped = len(self._buffer) - self._max_buffer + 1 | |
| del self._buffer[:dropped] | |
| log.warning( | |
| f"[traces] buffer full ({self._max_buffer}); dropped " | |
| f"{dropped} oldest row(s) while Hub push is failing" | |
| ) | |
| self._buffer.append(record) | |
| should_flush = len(self._buffer) >= FLUSH_BUFFER_SIZE | |
| if should_flush: | |
| self._flush_async() | |
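The drop-oldest policy in ``submit()`` can be looked at on its own. This is a minimal sketch without locking, assuming a hypothetical helper `append_bounded`; the real method performs the same arithmetic while holding ``_lock``.

```python
def append_bounded(buffer: list, item, max_size: int) -> int:
    """Append item, evicting from the front if the buffer is at capacity.

    Returns how many old entries were dropped.
    """
    dropped = 0
    if len(buffer) >= max_size:
        dropped = len(buffer) - max_size + 1
        del buffer[:dropped]
    buffer.append(item)
    return dropped


buf = [1, 2, 3]
dropped = append_bounded(buf, 4, max_size=3)
```

A `collections.deque(maxlen=...)` would give the same eviction behavior, though it would not report how many rows were dropped for the warning log.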
| def _run(self) -> None: | |
| while not self._stop.is_set(): | |
| # Sleep in small slices so stop() returns promptly. | |
| slept = 0.0 | |
| while slept < FLUSH_INTERVAL_S and not self._stop.is_set(): | |
| self._stop.wait(min(1.0, FLUSH_INTERVAL_S - slept)) | |
| slept = FLUSH_INTERVAL_S if self._stop.is_set() else slept + 1.0 | |
| if self._stop.is_set(): | |
| break | |
| self._flush() | |
| def _flush_async(self) -> None: | |
| threading.Thread(target=self._flush, daemon=True).start() | |
| def _flush(self) -> None: | |
| if not self._enabled: | |
| return | |
| with self._flush_lock: | |
| with self._lock: | |
| if not self._buffer: | |
| return | |
| rows = self._buffer | |
| self._buffer = [] | |
| try: | |
| self._push_to_hub(rows) | |
| self._last_flush = time.monotonic() | |
| self._consecutive_failures = 0 | |
| log.info(f"[traces] flushed {len(rows)} rows to {DATASET_REPO}") | |
| except Exception as e: | |
| self._consecutive_failures += 1 | |
| # Don't lose data on transient failures. Put the rows back | |
| # at the head of the buffer so the next flush retries. Log | |
| # at WARNING the first few times, then ERROR on every 10th | |
| # so a stuck Hub does not spam the Space log. | |
| with self._lock: | |
| self._buffer = rows + self._buffer | |
| if self._consecutive_failures <= 3 or self._consecutive_failures % 10 == 0: | |
| log.log( | |
| logging.ERROR if self._consecutive_failures > 3 else logging.WARNING, | |
| f"[traces] push failed " | |
| f"({self._consecutive_failures} consecutive): " | |
| f"{type(e).__name__}: {e}; re-queuing {len(rows)} rows. " | |
| f"Check that the Space's HF_TOKEN has write access to " | |
| f"'{DATASET_REPO}'.", | |
| ) | |
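The swap-then-requeue pattern used by ``_flush()`` is sketched below with a pluggable push function, so a failure can be simulated. `RequeueBuffer` and `flaky_push` are hypothetical names for this example; logging and the enabled check are left out.

```python
import threading


class RequeueBuffer:
    """Buffer that hands rows to push() and puts them back on failure."""

    def __init__(self, push):
        self._push = push
        self._buffer = []
        self._lock = threading.Lock()        # guards the buffer
        self._flush_lock = threading.Lock()  # serializes flushes

    def add(self, row) -> None:
        with self._lock:
            self._buffer.append(row)

    def pending(self) -> list:
        with self._lock:
            return list(self._buffer)

    def flush(self) -> bool:
        with self._flush_lock:
            with self._lock:
                if not self._buffer:
                    return True
                # Swap the buffer out so add() is never blocked by the push.
                rows, self._buffer = self._buffer, []
            try:
                self._push(rows)
                return True
            except Exception:
                with self._lock:
                    # Failed rows go back in front of anything added meanwhile.
                    self._buffer = rows + self._buffer
                return False


attempts = []


def flaky_push(rows):
    attempts.append(list(rows))
    if len(attempts) == 1:
        raise ConnectionError("hub unreachable")  # simulated outage


buf = RequeueBuffer(flaky_push)
buf.add("a")
buf.add("b")
first = buf.flush()
buf.add("c")
second = buf.flush()
```

Holding the buffer lock only for the swap keeps ``add()`` fast, while the separate flush lock ensures two flushes never push at the same time.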
| def _commit(self, operations, message, description=None) -> None: | |
| """Single-call wrapper around ``HfApi.create_commit`` for the dataset repo. | |
| All Hub writes from the publisher go through this helper so the | |
| ``commit_message`` / ``commit_description`` format and the | |
| ``repo_id`` / ``repo_type`` kwargs stay consistent across the | |
| per-row flush and the startup probe. If we ever need to add | |
| e.g. ``revision`` or a custom user-agent, we change one place. | |
| """ | |
| from huggingface_hub import HfApi | |
| kwargs = {"commit_description": description} if description else {} | |
| HfApi(token=self._hf_token).create_commit( | |
| repo_id=DATASET_REPO, | |
| repo_type="dataset", | |
| operations=list(operations), | |
| commit_message=message, | |
| **kwargs, | |
| ) | |
| def _push_to_hub(self, rows: list[TraceRecord]) -> None: | |
| """Upload a batch of rows as one commit with one operation per row. | |
| Each row is its own ``data/<trace_id>.json`` file, so the upload is | |
| race-free across Space replicas and across the timer-thread vs. | |
| size-triggered async flush within one process. One ``create_commit`` | |
| call wraps the whole batch as a single commit on the Hub, so the | |
| git history is one commit per flush (not one per row). | |
| """ | |
| from huggingface_hub import CommitOperationAdd | |
| operations = [ | |
| CommitOperationAdd( | |
| path_in_repo=f"{DATASET_DIR}/{r.trace_id}.json", | |
| path_or_fileobj=r.to_json().encode("utf-8"), | |
| ) | |
| for r in rows | |
| ] | |
| self._commit( | |
| operations, | |
| message=f"append {len(rows)} fabella trace row(s)", | |
| description=( | |
| f"Append {len(rows)} anonymized Fabella trace row(s). " | |
| f"Schema: fabella.trace.v1. No raw situation, no child name." | |
| ), | |
| ) | |
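To illustrate why per-row paths avoid collisions, the sketch below maps a batch to its target files using only the standard library. `batch_to_files` and the sample rows are assumptions; the actual upload wraps each payload in a ``CommitOperationAdd`` as shown above.

```python
import json
import uuid


def batch_to_files(rows: list, directory: str = "data") -> dict:
    """Map each row to its own path, keyed by the row's trace_id."""
    return {
        f"{directory}/{row['trace_id']}.json": json.dumps(
            row, ensure_ascii=False
        ).encode("utf-8")
        for row in rows
    }


# Made-up rows, for illustration only.
rows = [{"trace_id": str(uuid.uuid4()), "latency_ms": 1200} for _ in range(3)]
files = batch_to_files(rows)
```

Because every path is derived from a fresh UUID, two replicas flushing at the same time write disjoint sets of files and neither needs to read the other's state.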
| def _ensure_repo(self) -> bool: | |
| """Create the dataset repo if it doesn't exist. Return True on success. | |
| A 401/403 here means the Space's ``HF_TOKEN`` doesn't have | |
| ``repo.create`` rights in the target namespace (e.g. it can read | |
| ``Kiy-K/fabella-traces`` but not create it). In that case the | |
| Space owner must pre-create the dataset and grant the token write | |
| access, OR override ``FABELLA_TRACE_REPO`` to a namespace where | |
| the token can create. We return ``False`` so the caller can | |
| disable capture loudly instead of failing every flush. | |
| """ | |
| from huggingface_hub import HfApi | |
| try: | |
| HfApi(token=self._hf_token).create_repo( | |
| repo_id=DATASET_REPO, | |
| repo_type="dataset", | |
| exist_ok=True, | |
| private=False, | |
| ) | |
| return True | |
| except Exception as e: | |
| log.error( | |
| f"[traces] cannot create or access dataset repo " | |
| f"'{DATASET_REPO}': {type(e).__name__}: {e}. " | |
| f"Pre-create the dataset on the Hub and grant the Space's " | |
| f"HF_TOKEN write access, or set FABELLA_TRACE_REPO to a " | |
| f"namespace where the token can create repos. " | |
| f"Trace capture DISABLED for this process." | |
| ) | |
| return False | |
| def _probe(self) -> bool: | |
| """One-shot write+delete in a single atomic commit to verify Hub access. | |
| Catches (a) a bad token and (b) missing repo-write rights at import | |
| time, so a misconfigured Space doesn't silently drop rows for hours | |
| before anyone notices. Missing repo-create rights are caught earlier by | |
| ``_ensure_repo()``. | |
| The Add and Delete of ``PROBE_PATH`` are bundled into a single | |
| ``create_commit`` call so the probe is atomic on the Hub: either | |
| the commit succeeds (probe is added then removed in one revision) | |
| or it fails (probe is never created). There is no intermediate | |
| state where a sentinel file sits in the public dataset. | |
| Returns True if the round-trip succeeded. | |
| """ | |
| from huggingface_hub import CommitOperationAdd, CommitOperationDelete | |
| import warnings | |
| payload = json.dumps( | |
| { | |
| "probe": True, | |
| "ts": datetime.now(timezone.utc).isoformat(), | |
| "pid": os.getpid(), | |
| } | |
| ).encode("utf-8") | |
| try: | |
| # The ``huggingface_hub`` SDK emits a ``UserWarning`` when an | |
| # Add and a Delete target the same path in one commit. We | |
| # intentionally rely on this pattern (atomic probe), so | |
| # silence just this warning around the call. The commit | |
| # itself still executes correctly. | |
| with warnings.catch_warnings(): | |
| warnings.filterwarnings( | |
| "ignore", | |
| message="About to delete a file that have just been updated", | |
| category=UserWarning, | |
| ) | |
| self._commit( | |
| [ | |
| CommitOperationAdd( | |
| path_in_repo=PROBE_PATH, | |
| path_or_fileobj=payload, | |
| ), | |
| CommitOperationDelete(path_in_repo=PROBE_PATH), | |
| ], | |
| message="fabella trace publisher startup probe", | |
| description=( | |
| "Atomic Add+Delete of .probe.json to verify the HF_TOKEN " | |
| "can commit to the dataset repo. The probe file is never " | |
| "left in the repo on success." | |
| ), | |
| ) | |
| log.info(f"[traces] probe OK against {DATASET_REPO}") | |
| return True | |
| except Exception as e: | |
| log.error( | |
| f"[traces] probe FAILED against {DATASET_REPO}: " | |
| f"{type(e).__name__}: {e}" | |
| ) | |
| return False | |
| # Module-level singleton. Imported by app.py; ``start()`` is called once at | |
| # import time below. | |
| publisher = TracePublisher() | |