godseed / server /persistence.py
AndresCarreon's picture
GODSEED v0
0594db3 verified
Raw
History Blame Contribute Delete
8.82 kB
"""Trace persistence + boot-time world rebuild + optional HF dataset sync.
The single source of durable truth is ``traces/wishes.jsonl``: one JSON line per
granted wish (the WishTrace of ARCHITECTURE.md, extended with a ``features`` array —
the engine-clamped, seeded features the wish produced — so the world can be rebuilt
exactly on boot: genesis + every trace's features, in order).
Dataset sync (both persistence across Space restarts and the published agent-trace
evidence): when ``HF_TOKEN`` and ``GODSEED_DATASET`` are set, wishes.jsonl is
downloaded on boot if the local file is missing and uploaded after each granted
wish. Every network error is swallowed and logged — the world must never die of a
flaky connection.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import threading
from pathlib import Path
from typing import Any, Optional
# Module logger: load, rebuild and dataset-sync problems are reported here as warnings.
log = logging.getLogger("godseed.persistence")
# Default trace file name; TraceStore uses it for the local file and DatasetSync
# reuses the local file's name as the path inside the dataset repo.
TRACES_FILENAME = "wishes.jsonl"
def trace_features(trace: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the feature dicts recorded on a trace line.

    Only a ``features`` value that is a list is honoured, and only its dict
    entries are kept; any other shape (missing key, non-list value) yields
    an empty list.
    """
    stored = trace.get("features")
    if not isinstance(stored, list):
        return []
    return list(filter(lambda entry: isinstance(entry, dict), stored))
def rebuild_features(
    genesis: list[dict[str, Any]], traces: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Ordered feature list for boot: genesis + each trace's features, in order.

    Every feature dict is shallow-copied, trace features included, so whatever
    consumes the result can mutate it without touching the caller's genesis
    list or the trace dicts themselves (the persistence service passes its
    in-memory traces here, the same objects it later serves as the Genesis Log).

    Args:
        genesis: the world's starting features.
        traces: trace lines in grant order.

    Returns:
        A new list: copies of the genesis features followed by copies of each
        trace's features.
    """
    features: list[dict[str, Any]] = [dict(f) for f in genesis]
    for trace in traces:
        found = trace_features(trace)
        if not found and trace.get("feature_ids"):
            # A pre-extension line: its features can't be replayed; the world simply
            # rebuilds without them rather than refusing to boot.
            log.warning(
                "trace %s carries feature_ids but no features; skipped in rebuild",
                trace.get("wish_id"),
            )
        # Copy like genesis above, so the rebuilt world never aliases trace dicts.
        features.extend(dict(f) for f in found)
    return features
class TraceStore:
    """Append-only JSONL store under ``traces_dir``.

    Appends from one instance are serialised by a ``threading.Lock``, so they
    may be called from worker threads (uploads run in a worker thread; appends
    may too, depending on the engine).
    """

    def __init__(self, traces_dir: Path | str, filename: str = TRACES_FILENAME) -> None:
        self.dir = Path(traces_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.path = self.dir / filename
        self._lock = threading.Lock()

    def _needs_separator(self) -> bool:
        """True when the file is non-empty and its last byte is not ``\\n``.

        That is the state left by a crash mid-write (or by a restored file with
        no trailing newline). Appending directly after it would glue the new
        record onto the torn line, and both would be skipped as corrupt on the
        next load. Caller must hold ``self._lock``.
        """
        try:
            with self.path.open("rb") as fh:
                fh.seek(0, os.SEEK_END)
                if fh.tell() == 0:
                    return False
                fh.seek(-1, os.SEEK_END)
                return fh.read(1) != b"\n"
        except FileNotFoundError:
            return False

    def append(
        self, trace: dict[str, Any], features: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Write one trace line; returns the dict actually written (trace + features).

        If the file currently ends mid-line, a newline is written first so the
        new record always starts on its own line.
        """
        line_obj = dict(trace)
        if features is not None:
            line_obj["features"] = features
        encoded = json.dumps(line_obj, ensure_ascii=False)
        with self._lock:
            prefix = "\n" if self._needs_separator() else ""
            with self.path.open("a", encoding="utf-8") as fh:
                fh.write(prefix + encoded + "\n")
        return line_obj

    def load(self) -> list[dict[str, Any]]:
        """All trace lines, in order.

        Lines that are not valid UTF-8, not valid JSON, or not a JSON object
        are skipped with a warning. A half-written final line after a crash
        (possibly cut inside a multi-byte character, since lines are written
        with ``ensure_ascii=False``) must not brick the boot.
        """
        if not self.path.exists():
            return []
        traces: list[dict[str, Any]] = []
        # Binary read + per-line decode: one torn UTF-8 sequence only costs its
        # own line instead of raising UnicodeDecodeError for the whole file.
        with self.path.open("rb") as fh:
            for lineno, raw_bytes in enumerate(fh, start=1):
                try:
                    raw = raw_bytes.decode("utf-8").strip()
                except UnicodeDecodeError:
                    log.warning("skipping undecodable trace line %d in %s", lineno, self.path)
                    continue
                if not raw:
                    continue
                try:
                    obj = json.loads(raw)
                except json.JSONDecodeError:
                    log.warning("skipping corrupt trace line %d in %s", lineno, self.path)
                    continue
                if isinstance(obj, dict):
                    traces.append(obj)
                else:
                    log.warning("skipping non-object trace line %d in %s", lineno, self.path)
        return traces
class DatasetSync:
    """Mirror wishes.jsonl to/from an HF dataset repo.

    Inert unless both HF_TOKEN and GODSEED_DATASET are set (or passed
    explicitly); never raises for network problems.
    """

    def __init__(
        self,
        store: TraceStore,
        token: Optional[str] = None,
        dataset: Optional[str] = None,
    ) -> None:
        self.store = store
        self.token = token if token is not None else os.environ.get("HF_TOKEN")
        self.dataset = dataset if dataset is not None else os.environ.get("GODSEED_DATASET")
        # Serialises uploads: each upload snapshots the file when it gets the
        # lock, so an older snapshot can never commit after a newer one and
        # leave the dataset missing the latest wish.
        self._upload_lock = threading.Lock()

    @property
    def enabled(self) -> bool:
        """True when both a token and a dataset repo id are configured."""
        return bool(self.token and self.dataset)

    def download_if_missing(self) -> bool:
        """Fetch wishes.jsonl from the dataset when the local file doesn't exist
        (fresh Space boot). Returns True only if a file was restored.

        The bytes are written to a sibling temp file and moved into place with
        ``os.replace``. If the process dies mid-write, the real file therefore
        stays absent and the restore is retried on the next boot. A truncated
        file would otherwise count as "present" and later be uploaded over the
        dataset.
        """
        if not self.enabled or self.store.path.exists():
            return False
        tmp_path = self.store.path.with_name(self.store.path.name + ".download")
        try:
            from huggingface_hub import hf_hub_download

            fetched = hf_hub_download(
                repo_id=self.dataset,
                repo_type="dataset",
                filename=self.store.path.name,
                token=self.token,
            )
            tmp_path.write_bytes(Path(fetched).read_bytes())
            os.replace(tmp_path, self.store.path)
            log.info("restored %s from dataset %s", self.store.path.name, self.dataset)
            return True
        except Exception as exc:  # noqa: BLE001 — sync must never crash the app
            log.warning("dataset download skipped (%s): %s", self.dataset, exc)
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                pass  # best-effort cleanup of a partial temp file
            return False

    def upload(self) -> bool:
        """Push the current wishes.jsonl to the dataset. Returns success; the next
        granted wish retries naturally on failure.

        Uploads run one at a time. The uploaded bytes are read under the
        store's append lock, so they never end in a half-written line.
        """
        if not self.enabled or not self.store.path.exists():
            return False
        with self._upload_lock:
            try:
                from huggingface_hub import HfApi

                # Same-module access to the store's append lock: a consistent snapshot.
                with self.store._lock:
                    payload = self.store.path.read_bytes()
                HfApi(token=self.token).upload_file(
                    path_or_fileobj=payload,
                    path_in_repo=self.store.path.name,
                    repo_id=self.dataset,
                    repo_type="dataset",
                    commit_message="godseed: wish granted",
                )
                return True
            except Exception as exc:  # noqa: BLE001 — sync must never crash the app
                log.warning("dataset upload failed (next wish retries): %s", exc)
                return False
class PersistenceService:
    """What the rest of the app talks to.

    Boot: ``features = service.boot(genesis_features)`` (download-if-missing +
    rebuild). Afterwards ``service.traces()`` serves the Genesis Log from
    memory — no file reads per request.

    Wish: ``await service.record(trace, features)`` — append + fire-and-forget
    dataset upload off the event loop.
    """

    def __init__(
        self, traces_dir: Path | str, sync: Optional[DatasetSync] = None
    ) -> None:
        self.store = TraceStore(traces_dir)
        self.sync = sync if sync is not None else DatasetSync(self.store)
        # In-memory mirror of the trace file, in grant order.
        self._traces: list[dict[str, Any]] = []
        # Whether _traces has been populated from disk yet.
        self._booted = False
        # Strong references to in-flight upload tasks (see record()).
        self._upload_tasks: set[asyncio.Task[Any]] = set()

    @property
    def path(self) -> Path:
        """Location of the trace file on disk."""
        return self.store.path

    def boot(self, genesis: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Restore (if configured) + load traces + return the full ordered feature
        list (genesis + every trace's features)."""
        self.sync.download_if_missing()
        self._traces = self.store.load()
        self._booted = True
        return rebuild_features(genesis, self._traces)

    def traces(self) -> list[dict[str, Any]]:
        """All granted-wish traces in grant order (loads lazily if boot was skipped).

        Returns the internal list itself, not a copy.
        """
        if not self._booted:
            self._traces = self.store.load()
            self._booted = True
        return self._traces

    def find(self, wish_id: str) -> Optional[dict[str, Any]]:
        """First trace whose ``wish_id`` equals *wish_id*, or None."""
        return next(
            (trace for trace in self.traces() if trace.get("wish_id") == wish_id),
            None,
        )

    async def record(
        self, trace: dict[str, Any], features: Optional[list[dict[str, Any]]] = None
    ) -> dict[str, Any]:
        """Persist one granted wish, then upload to the dataset in the background.

        Never raises for sync problems; the append itself is the only hard step.
        Returns the dict written to the trace file.
        """
        # Materialise the in-memory mirror BEFORE appending. If boot() was
        # skipped, a lazy load after the append would already include the new
        # line, and appending it again would duplicate the wish in memory.
        traces = self.traces()
        line_obj = self.store.append(trace, features)
        traces.append(line_obj)
        if self.sync.enabled:
            task = asyncio.create_task(asyncio.to_thread(self.sync.upload))
            # keep a strong reference so the task isn't garbage-collected mid-flight
            self._upload_tasks.add(task)
            task.add_done_callback(self._upload_tasks.discard)
        return line_obj

    async def drain(self) -> None:
        """Wait for in-flight uploads (shutdown hygiene)."""
        if self._upload_tasks:
            await asyncio.gather(*tuple(self._upload_tasks), return_exceptions=True)