Spaces:
Sleeping
Sleeping
| """Trace persistence + boot-time world rebuild + optional HF dataset sync. | |
| The single source of durable truth is ``traces/wishes.jsonl``: one JSON line per | |
| granted wish (the WishTrace of ARCHITECTURE.md, extended with a ``features`` array — | |
| the engine-clamped, seeded features the wish produced — so the world can be rebuilt | |
| exactly on boot: genesis + every trace's features, in order). | |
| Dataset sync (both persistence across Space restarts and the published agent-trace | |
| evidence): when ``HF_TOKEN`` and ``GODSEED_DATASET`` are set, wishes.jsonl is | |
| downloaded on boot if the local file is missing and uploaded after each granted | |
| wish. Every network error is swallowed and logged — the world must never die of a | |
| flaky connection. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import threading | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| log = logging.getLogger("godseed.persistence") | |
| TRACES_FILENAME = "wishes.jsonl" | |
def trace_features(trace: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the feature dicts stored under ``trace["features"]``.

    A missing key or a value that is not a list yields ``[]``; entries inside
    the list that are not dicts are dropped.
    """
    stored = trace.get("features")
    if not isinstance(stored, list):
        return []
    return [entry for entry in stored if isinstance(entry, dict)]
def rebuild_features(
    genesis: list[dict[str, Any]], traces: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Build the boot-time feature list: genesis first, then each trace's features.

    Genesis entries are shallow-copied; features taken from traces are appended
    in trace order. A trace that lists ``feature_ids`` but yields no replayable
    features is logged and contributes nothing, so the rebuild never fails on it.
    """
    rebuilt: list[dict[str, Any]] = [dict(seed) for seed in genesis]
    for entry in traces:
        replayable = trace_features(entry)
        if replayable:
            rebuilt.extend(replayable)
            continue
        if entry.get("feature_ids"):
            # A pre-extension line: nothing to replay, so the world is rebuilt
            # without it instead of refusing to boot.
            log.warning(
                "trace %s carries feature_ids but no features; skipped in rebuild",
                entry.get("wish_id"),
            )
    return rebuilt
class TraceStore:
    """Append-only JSONL store under ``traces_dir``.

    Appends are serialized with a lock (uploads run in a worker thread; appends
    may too, depending on the engine).
    """

    def __init__(self, traces_dir: Path | str, filename: str = TRACES_FILENAME) -> None:
        """Create ``traces_dir`` if needed and point the store at ``filename`` in it."""
        self.dir = Path(traces_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.path = self.dir / filename
        self._lock = threading.Lock()

    def append(
        self, trace: dict[str, Any], features: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Write one trace line; return the dict actually written.

        The written object is a shallow copy of ``trace`` with ``features``
        added under the ``"features"`` key when it is not ``None``.
        """
        line_obj = dict(trace)
        if features is not None:
            line_obj["features"] = features
        # Serialize before taking the lock so an unserializable trace raises
        # without touching the file.
        encoded = json.dumps(line_obj, ensure_ascii=False)
        with self._lock:
            with self.path.open("a", encoding="utf-8") as fh:
                fh.write(encoded + "\n")
        return line_obj

    def load(self) -> list[dict[str, Any]]:
        """Return all trace lines, in file order.

        Blank lines and JSON values that are not objects are ignored. Corrupt
        lines are skipped with a warning (a half-written final line after a
        crash must not brick the boot).
        """
        if not self.path.exists():
            return []
        traces: list[dict[str, Any]] = []
        # Lines are written with ensure_ascii=False, so a torn write can end in
        # the middle of a multi-byte UTF-8 character. Reading bytes and decoding
        # per line turns that into one skipped line instead of a
        # UnicodeDecodeError that aborts the whole load.
        with self.path.open("rb") as fh:
            for lineno, raw in enumerate(fh, start=1):
                try:
                    text = raw.decode("utf-8").strip()
                    if not text:
                        continue
                    obj = json.loads(text)
                except (UnicodeDecodeError, json.JSONDecodeError):
                    log.warning("skipping corrupt trace line %d in %s", lineno, self.path)
                    continue
                if isinstance(obj, dict):
                    traces.append(obj)
        return traces
class DatasetSync:
    """Mirror wishes.jsonl to/from an HF dataset repo.

    Inert unless both a token and a dataset name are available (arguments, or
    the ``HF_TOKEN`` / ``GODSEED_DATASET`` environment variables); never raises
    for network problems.
    """

    def __init__(
        self,
        store: "TraceStore",
        token: Optional[str] = None,
        dataset: Optional[str] = None,
    ) -> None:
        """Bind to ``store``; explicit arguments win over the environment."""
        self.store = store
        self.token = token if token is not None else os.environ.get("HF_TOKEN")
        self.dataset = dataset if dataset is not None else os.environ.get("GODSEED_DATASET")

    @property
    def enabled(self) -> bool:
        """True when both a token and a dataset name are configured.

        This must be a property: every reader in this module tests
        ``.enabled`` without calling it, and a bound method is always truthy.
        """
        return bool(self.token and self.dataset)

    def download_if_missing(self) -> bool:
        """Fetch wishes.jsonl from the dataset when the local file doesn't exist
        (fresh Space boot). Returns True only if a file was restored."""
        target = self.store.path
        if not self.enabled or target.exists():
            return False
        # Stage next to the target and rename into place, so a failed copy can
        # never leave a truncated wishes.jsonl that blocks later restores.
        partial = target.with_name(target.name + ".part")
        try:
            from huggingface_hub import hf_hub_download

            fetched = hf_hub_download(
                repo_id=self.dataset,
                repo_type="dataset",
                filename=target.name,
                token=self.token,
            )
            partial.write_bytes(Path(fetched).read_bytes())
            os.replace(partial, target)
            log.info("restored %s from dataset %s", target.name, self.dataset)
            return True
        except Exception as exc:  # noqa: BLE001 — sync must never crash the app
            log.warning("dataset download skipped (%s): %s", self.dataset, exc)
            try:
                partial.unlink(missing_ok=True)
            except OSError as cleanup_exc:
                log.warning("could not remove partial download %s: %s", partial, cleanup_exc)
            return False

    def upload(self) -> bool:
        """Push the current wishes.jsonl to the dataset. Returns success; the next
        granted wish retries naturally on failure."""
        if not self.enabled or not self.store.path.exists():
            return False
        try:
            from huggingface_hub import HfApi

            HfApi(token=self.token).upload_file(
                path_or_fileobj=str(self.store.path),
                path_in_repo=self.store.path.name,
                repo_id=self.dataset,
                repo_type="dataset",
                commit_message="godseed: wish granted",
            )
            return True
        except Exception as exc:  # noqa: BLE001 — sync must never crash the app
            log.warning("dataset upload failed (next wish retries): %s", exc)
            return False
| class PersistenceService: | |
| """What the rest of the app talks to. | |
| Boot: ``features = service.boot(genesis_features)`` (download-if-missing + | |
| rebuild). Afterwards ``service.traces()`` serves the Genesis Log from | |
| memory — no file reads per request. | |
| Wish: ``await service.record(trace, features)`` — append + fire-and-forget | |
| dataset upload off the event loop. | |
| """ | |
| def __init__( | |
| self, traces_dir: Path | str, sync: Optional[DatasetSync] = None | |
| ) -> None: | |
| self.store = TraceStore(traces_dir) | |
| self.sync = sync if sync is not None else DatasetSync(self.store) | |
| self._traces: list[dict[str, Any]] = [] | |
| self._booted = False | |
| self._upload_tasks: set[asyncio.Task[Any]] = set() | |
| def path(self) -> Path: | |
| return self.store.path | |
| def boot(self, genesis: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| """Restore (if configured) + load traces + return the full ordered feature | |
| list (genesis + every trace's features).""" | |
| self.sync.download_if_missing() | |
| self._traces = self.store.load() | |
| self._booted = True | |
| return rebuild_features(genesis, self._traces) | |
| def traces(self) -> list[dict[str, Any]]: | |
| """All granted-wish traces in grant order (loads lazily if boot was skipped).""" | |
| if not self._booted: | |
| self._traces = self.store.load() | |
| self._booted = True | |
| return self._traces | |
| def find(self, wish_id: str) -> Optional[dict[str, Any]]: | |
| for trace in self.traces(): | |
| if trace.get("wish_id") == wish_id: | |
| return trace | |
| return None | |
| async def record( | |
| self, trace: dict[str, Any], features: Optional[list[dict[str, Any]]] = None | |
| ) -> dict[str, Any]: | |
| """Persist one granted wish, then upload to the dataset in the background. | |
| Never raises for sync problems; the append itself is the only hard step.""" | |
| line_obj = self.store.append(trace, features) | |
| self.traces().append(line_obj) | |
| if self.sync.enabled: | |
| task = asyncio.create_task(asyncio.to_thread(self.sync.upload)) | |
| # keep a strong reference so the task isn't garbage-collected mid-flight | |
| self._upload_tasks.add(task) | |
| task.add_done_callback(self._upload_tasks.discard) | |
| return line_obj | |
| async def drain(self) -> None: | |
| """Wait for in-flight uploads (shutdown hygiene).""" | |
| if self._upload_tasks: | |
| await asyncio.gather(*tuple(self._upload_tasks), return_exceptions=True) | |