"""Snapshot persistence for the live world. The simulation keeps all of its state in memory (:class:`WorldState` plus the :class:`PlayerManager` registry). On Hugging Face Spaces the process is reset on rebuilds, inactivity sleep, and crashes, so to survive a restart we dump a full snapshot to a mounted bucket after every tick and reload it on boot. Two halves: * **Dump** is cheap -- everything is already JSON-native dataclasses, so :meth:`WorldState.to_dict` plus :meth:`PlayerManager.snapshot` is enough. * **Load** needs reconstruction, which :func:`world_from_dict` provides via a generic recursive rebuild over dataclass fields and their type hints. Writes go through :class:`BackgroundSnapshotWriter` so the (slow, network) bucket write never happens while the runtime lock is held. Slots are written ping-pong so a torn write can never destroy the most recent good snapshot. """ from __future__ import annotations import json import os import threading import types import typing from dataclasses import fields, is_dataclass from functools import cache from pathlib import Path from typing import Any from world_simulator.domain import WorldState SNAPSHOT_VERSION = 1 _SLOT_NAMES = ("world_state.0.json", "world_state.1.json") # -- reconstruction ------------------------------------------------------- # @cache def _type_hints(cls: type) -> dict[str, Any]: return typing.get_type_hints(cls) def _reconstruct(tp: Any, value: Any) -> Any: """Rebuild ``value`` (from JSON) into an instance of declared type ``tp``.""" if value is None: return None if is_dataclass(tp) and isinstance(tp, type): hints = _type_hints(tp) kwargs = { field.name: _reconstruct(hints.get(field.name, Any), value[field.name]) for field in fields(tp) if field.name in value } return tp(**kwargs) origin = typing.get_origin(tp) if origin in (typing.Union, types.UnionType): # The model only uses ``X | None``; rebuild as the single concrete arm. args = [arm for arm in typing.get_args(tp) if arm is not type(None)] if len(args) == 1: return _reconstruct(args[0], value) return value if origin is list: list_args = typing.get_args(tp) elem = list_args[0] if list_args else Any return [_reconstruct(elem, item) for item in value] # dict / Literal / primitives / Any are stored verbatim and need no rebuild. return value def world_from_dict(data: dict[str, Any]) -> WorldState: """Rebuild a :class:`WorldState` from the dict produced by ``to_dict``.""" world = _reconstruct(WorldState, data) assert isinstance(world, WorldState) return world def build_snapshot(world: WorldState, players_state: dict[str, Any]) -> dict[str, Any]: return { "version": SNAPSHOT_VERSION, "tick": world.tick, "world": world.to_dict(), "players": players_state, } def restore_snapshot(snapshot: dict[str, Any]) -> tuple[WorldState, dict[str, Any]]: """Return ``(world, players_state)`` from a loaded snapshot envelope.""" world = world_from_dict(snapshot["world"]) players_state = snapshot.get("players") or {} return world, players_state # -- storage -------------------------------------------------------------- # class SnapshotStore: """Two-slot snapshot file on disk (typically a mounted bucket). Writes always overwrite the *older* slot, so the freshest good snapshot is never the one being clobbered -- a process killed mid-write loses at most the in-flight slot, and :meth:`load` falls back to the other one. """ def __init__(self, directory: Path) -> None: self._dir = directory self._slot_ticks = [-1, -1] def load(self) -> dict[str, Any] | None: best: tuple[int, dict[str, Any]] | None = None for index, name in enumerate(_SLOT_NAMES): data = self._read(self._dir / name) if data is None: continue tick = data.get("tick", -1) tick = tick if isinstance(tick, int) else -1 self._slot_ticks[index] = tick if best is None or tick > best[0]: best = (tick, data) return best[1] if best else None def clear(self) -> None: """Delete both slot files and forget their ticks. Missing files are ignored so a clear on a never-persisted world is a no-op. After this, :meth:`load` returns ``None`` and the next :meth:`save` starts the ping-pong from slot 0 again. """ for name in _SLOT_NAMES: try: (self._dir / name).unlink() except FileNotFoundError: pass except OSError as exc: print(f"world snapshot clear failed for {name}: {exc}", flush=True) self._slot_ticks = [-1, -1] def save(self, snapshot: dict[str, Any]) -> None: target = 0 if self._slot_ticks[0] <= self._slot_ticks[1] else 1 path = self._dir / _SLOT_NAMES[target] self._dir.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(f".tmp.{os.getpid()}") with open(tmp, "w", encoding="utf-8") as handle: handle.write(json.dumps(snapshot)) handle.flush() try: os.fsync(handle.fileno()) except OSError: # Some FUSE-mounted buckets do not support fsync; the os.replace # below still gives us a best-effort all-or-nothing publish. pass os.replace(tmp, path) tick = snapshot.get("tick", -1) self._slot_ticks[target] = tick if isinstance(tick, int) else -1 try: size = path.stat().st_size except OSError: size = -1 print( f"[persistence] wrote snapshot tick={tick} -> {path} ({size} bytes)", flush=True, ) @staticmethod def _read(path: Path) -> dict[str, Any] | None: try: with open(path, encoding="utf-8") as handle: data = json.load(handle) except (OSError, ValueError): return None return data if isinstance(data, dict) else None class BackgroundSnapshotWriter: """Coalescing writer: keeps only the latest pending snapshot and writes it off the runtime lock on a daemon thread.""" def __init__(self, store: SnapshotStore) -> None: self._store = store self._cond = threading.Condition() self._pending: dict[str, Any] | None = None self._closed = False self._thread = threading.Thread( target=self._run, name="world-snapshot-writer", daemon=True ) self._thread.start() def submit(self, snapshot: dict[str, Any]) -> None: with self._cond: self._pending = snapshot self._cond.notify() def drop_pending(self) -> None: """Discard any not-yet-written snapshot so a reset can't be undone by a stale snapshot submitted just before it.""" with self._cond: self._pending = None def _run(self) -> None: while True: with self._cond: while self._pending is None and not self._closed: self._cond.wait() if self._pending is None: return snapshot = self._pending self._pending = None try: self._store.save(snapshot) except Exception as exc: print(f"[persistence] snapshot write FAILED: {exc!r}", flush=True) def close(self, timeout: float = 5.0) -> None: with self._cond: self._closed = True self._cond.notify() self._thread.join(timeout)