"""Trace persistence + boot-time world rebuild + optional HF dataset sync.

The single source of durable truth is ``traces/wishes.jsonl``: one JSON line per
granted wish (the WishTrace of ARCHITECTURE.md, extended with a ``features``
array — the engine-clamped, seeded features the wish produced — so the world can
be rebuilt exactly on boot: genesis + every trace's features, in order).

Dataset sync (both persistence across Space restarts and the published
agent-trace evidence): when ``HF_TOKEN`` and ``GODSEED_DATASET`` are set,
wishes.jsonl is downloaded on boot if the local file is missing and uploaded
after each granted wish. Every network error is swallowed and logged — the world
must never die of a flaky connection.
"""

from __future__ import annotations

import asyncio
import contextlib
import hashlib
import json
import logging
import os
import threading
from pathlib import Path
from typing import Any, Optional

log = logging.getLogger("godseed.persistence")

TRACES_FILENAME = "wishes.jsonl"


def trace_features(trace: dict[str, Any]) -> list[dict[str, Any]]:
    """The full features stored on a trace line (``[]`` for lines without them).

    Non-dict entries of a ``features`` list are dropped. A ``features`` value that
    is not a list yields ``[]``.
    """
    features = trace.get("features")
    if isinstance(features, list):
        return [f for f in features if isinstance(f, dict)]
    return []


def rebuild_features(
    genesis: list[dict[str, Any]], traces: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Ordered feature list for boot: genesis + each trace's features, in order.

    Every feature dict (genesis and trace alike) is shallow-copied. The caller can
    therefore mutate the result without altering the traces held in memory.
    """
    features: list[dict[str, Any]] = [dict(f) for f in genesis]
    for trace in traces:
        found = trace_features(trace)
        if not found and trace.get("feature_ids"):
            # A pre-extension line: its features can't be replayed; the world simply
            # rebuilds without them rather than refusing to boot.
            log.warning(
                "trace %s carries feature_ids but no features; skipped in rebuild",
                trace.get("wish_id"),
            )
        features.extend(dict(f) for f in found)
    return features


class TraceStore:
    """Append-only JSONL store under ``traces_dir``.

    Appends and snapshots are serialized by an in-process lock. Uploads run in a
    worker thread; appends may too, depending on the engine.
    """

    def __init__(self, traces_dir: Path | str, filename: str = TRACES_FILENAME) -> None:
        self.dir = Path(traces_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.path = self.dir / filename
        self._lock = threading.Lock()

    def _ends_mid_line(self) -> bool:
        """True when the file is non-empty and its last byte is not a newline.

        That state is a half-written line left by a crash. The caller must hold
        ``_lock``.
        """
        try:
            with self.path.open("rb") as fh:
                if fh.seek(0, os.SEEK_END) == 0:
                    return False
                fh.seek(-1, os.SEEK_END)
                return fh.read(1) != b"\n"
        except FileNotFoundError:
            return False

    def append(
        self, trace: dict[str, Any], features: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Write one trace line; returns the dict actually written (trace + features)."""
        line_obj = dict(trace)
        if features is not None:
            # Copy the list so later mutation by the caller can't make the
            # in-memory trace diverge from what was written to disk.
            line_obj["features"] = list(features)
        encoded = json.dumps(line_obj, ensure_ascii=False)
        with self._lock:
            # If a crash left a partial final line, start on a fresh line.
            # Otherwise this valid trace would be glued onto the corrupt one and
            # skipped on the next load.
            prefix = "\n" if self._ends_mid_line() else ""
            with self.path.open("a", encoding="utf-8") as fh:
                fh.write(prefix + encoded + "\n")
        return line_obj

    def read_bytes(self) -> Optional[bytes]:
        """Snapshot of the whole file (``None`` if it doesn't exist).

        The snapshot is taken under the append lock, so it never includes a line
        this process is still writing.
        """
        with self._lock:
            try:
                return self.path.read_bytes()
            except FileNotFoundError:
                return None

    def load(self) -> list[dict[str, Any]]:
        """All trace lines, in order.

        Corrupt lines are skipped with a warning. That covers invalid JSON and
        invalid UTF-8, so a half-written final line after a crash must not brick
        the boot. Valid JSON lines that are not objects are also skipped.
        """
        if not self.path.exists():
            return []
        traces: list[dict[str, Any]] = []
        # Binary mode + per-line decode: a truncated multi-byte character only
        # costs its own line instead of raising out of the whole iteration.
        with self.path.open("rb") as fh:
            for lineno, raw_bytes in enumerate(fh, start=1):
                try:
                    raw = raw_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    log.warning("skipping corrupt trace line %d in %s", lineno, self.path)
                    continue
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    obj = json.loads(raw)
                except json.JSONDecodeError:
                    log.warning("skipping corrupt trace line %d in %s", lineno, self.path)
                    continue
                if isinstance(obj, dict):
                    traces.append(obj)
                else:
                    log.warning("skipping non-object trace line %d in %s", lineno, self.path)
        return traces


class DatasetSync:
    """Mirror wishes.jsonl to/from an HF dataset repo.

    Inert unless both HF_TOKEN and GODSEED_DATASET are set; never raises for
    network problems.
    """

    def __init__(
        self,
        store: TraceStore,
        token: Optional[str] = None,
        dataset: Optional[str] = None,
    ) -> None:
        self.store = store
        self.token = token if token is not None else os.environ.get("HF_TOKEN")
        self.dataset = dataset if dataset is not None else os.environ.get("GODSEED_DATASET")
        # Serializes uploads so they commit in the order their snapshots were taken.
        self._upload_lock = threading.Lock()
        # sha256 of the last successfully uploaded snapshot (None = nothing yet).
        self._last_uploaded_digest: Optional[bytes] = None

    @property
    def enabled(self) -> bool:
        return bool(self.token and self.dataset)

    def download_if_missing(self) -> bool:
        """Fetch wishes.jsonl from the dataset when the local file doesn't exist.

        That happens on a fresh Space boot. Returns True only if a file was
        restored.
        """
        if not self.enabled or self.store.path.exists():
            return False
        tmp = self.store.path.with_name(self.store.path.name + ".download")
        try:
            from huggingface_hub import hf_hub_download

            fetched = hf_hub_download(
                repo_id=self.dataset,
                repo_type="dataset",
                filename=self.store.path.name,
                token=self.token,
            )
            # Write-then-rename: a crash mid-copy must not leave a truncated
            # wishes.jsonl, whose mere existence would disable future restores.
            tmp.write_bytes(Path(fetched).read_bytes())
            os.replace(tmp, self.store.path)
            log.info("restored %s from dataset %s", self.store.path.name, self.dataset)
            return True
        except Exception as exc:  # noqa: BLE001 — sync must never crash the app
            log.warning("dataset download skipped (%s): %s", self.dataset, exc)
            with contextlib.suppress(OSError):
                tmp.unlink(missing_ok=True)
            return False

    def upload(self) -> bool:
        """Push the current wishes.jsonl to the dataset.

        Returns success; the next granted wish retries naturally on failure.
        Uploads are serialized, and each one snapshots the file only after taking
        the upload lock. So a slow, older upload can never commit after (and
        overwrite) a newer one from this process. A snapshot identical to the last
        successful upload is skipped, which collapses a queued burst into one
        upload.
        """
        if not self.enabled:
            return False
        with self._upload_lock:
            payload = self.store.read_bytes()
            if payload is None:
                return False
            digest = hashlib.sha256(payload).digest()
            if digest == self._last_uploaded_digest:
                return True
            try:
                from huggingface_hub import HfApi

                HfApi(token=self.token).upload_file(
                    path_or_fileobj=payload,
                    path_in_repo=self.store.path.name,
                    repo_id=self.dataset,
                    repo_type="dataset",
                    commit_message="godseed: wish granted",
                )
            except Exception as exc:  # noqa: BLE001 — sync must never crash the app
                log.warning("dataset upload failed (next wish retries): %s", exc)
                return False
            self._last_uploaded_digest = digest
            return True


class PersistenceService:
    """What the rest of the app talks to.

    Boot: ``features = service.boot(genesis_features)`` (download-if-missing +
    rebuild). Afterwards ``service.traces()`` serves the Genesis Log from memory —
    no file reads per request.

    Wish: ``await service.record(trace, features)`` — append + fire-and-forget
    dataset upload off the event loop.
    """

    def __init__(
        self, traces_dir: Path | str, sync: Optional[DatasetSync] = None
    ) -> None:
        self.store = TraceStore(traces_dir)
        self.sync = sync if sync is not None else DatasetSync(self.store)
        self._traces: list[dict[str, Any]] = []
        self._booted = False
        self._upload_tasks: set[asyncio.Task[Any]] = set()

    @property
    def path(self) -> Path:
        return self.store.path

    def boot(self, genesis: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Restore (if configured) + load traces.

        Returns the full ordered feature list: genesis + every trace's features.
        """
        self.sync.download_if_missing()
        self._traces = self.store.load()
        self._booted = True
        return rebuild_features(genesis, self._traces)

    def traces(self) -> list[dict[str, Any]]:
        """All granted-wish traces in grant order (loads lazily if boot was skipped)."""
        if not self._booted:
            self._traces = self.store.load()
            self._booted = True
        return self._traces

    def find(self, wish_id: str) -> Optional[dict[str, Any]]:
        """The first trace whose ``wish_id`` matches, or ``None``."""
        for trace in self.traces():
            if trace.get("wish_id") == wish_id:
                return trace
        return None

    async def record(
        self, trace: dict[str, Any], features: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Persist one granted wish, then upload to the dataset in the background.

        Never raises for sync problems; the append itself is the only hard step.
        """
        # Materialize the in-memory list BEFORE appending. If boot was skipped,
        # a lazy load after the append would read the new line back from disk,
        # and appending it again would duplicate it.
        traces = self.traces()
        line_obj = self.store.append(trace, features)
        traces.append(line_obj)
        if self.sync.enabled:
            task = asyncio.create_task(asyncio.to_thread(self.sync.upload))
            # keep a strong reference so the task isn't garbage-collected mid-flight
            self._upload_tasks.add(task)
            task.add_done_callback(self._upload_tasks.discard)
        return line_obj

    async def drain(self) -> None:
        """Wait for in-flight uploads (shutdown hygiene)."""
        if self._upload_tasks:
            await asyncio.gather(*tuple(self._upload_tasks), return_exceptions=True)