Spaces:
Runtime error
Runtime error
| """Snapshot persistence for the live world. | |
| The simulation keeps all of its state in memory (:class:`WorldState` plus the | |
| :class:`PlayerManager` registry). On Hugging Face Spaces the process is reset on | |
| rebuilds, inactivity sleep, and crashes, so to survive a restart we dump a full | |
| snapshot to a mounted bucket after every tick and reload it on boot. | |
| Two halves: | |
| * **Dump** is cheap -- everything is already JSON-native dataclasses, so | |
| :meth:`WorldState.to_dict` plus :meth:`PlayerManager.snapshot` is enough. | |
| * **Load** needs reconstruction, which :func:`world_from_dict` provides via a | |
| generic recursive rebuild over dataclass fields and their type hints. | |
| Writes go through :class:`BackgroundSnapshotWriter` so the (slow, network) | |
| bucket write never happens while the runtime lock is held. Slots are written | |
| ping-pong so a torn write can never destroy the most recent good snapshot. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import threading | |
| import types | |
| import typing | |
| from dataclasses import fields, is_dataclass | |
| from functools import cache | |
| from pathlib import Path | |
| from typing import Any | |
| from world_simulator.domain import WorldState | |
# Schema version stamped into every snapshot envelope under the "version" key
# by build_snapshot().
SNAPSHOT_VERSION = 1
# File names of the two on-disk slots, indexed by slot number (0 and 1);
# SnapshotStore alternates its writes between them.
_SLOT_NAMES = ("world_state.0.json", "world_state.1.json")
| # -- reconstruction ------------------------------------------------------- # | |
| def _type_hints(cls: type) -> dict[str, Any]: | |
| return typing.get_type_hints(cls) | |
| def _reconstruct(tp: Any, value: Any) -> Any: | |
| """Rebuild ``value`` (from JSON) into an instance of declared type ``tp``.""" | |
| if value is None: | |
| return None | |
| if is_dataclass(tp) and isinstance(tp, type): | |
| hints = _type_hints(tp) | |
| kwargs = { | |
| field.name: _reconstruct(hints.get(field.name, Any), value[field.name]) | |
| for field in fields(tp) | |
| if field.name in value | |
| } | |
| return tp(**kwargs) | |
| origin = typing.get_origin(tp) | |
| if origin in (typing.Union, types.UnionType): | |
| # The model only uses ``X | None``; rebuild as the single concrete arm. | |
| args = [arm for arm in typing.get_args(tp) if arm is not type(None)] | |
| if len(args) == 1: | |
| return _reconstruct(args[0], value) | |
| return value | |
| if origin is list: | |
| list_args = typing.get_args(tp) | |
| elem = list_args[0] if list_args else Any | |
| return [_reconstruct(elem, item) for item in value] | |
| # dict / Literal / primitives / Any are stored verbatim and need no rebuild. | |
| return value | |
def world_from_dict(data: dict[str, Any]) -> WorldState:
    """Turn the plain-dict form of a world back into a :class:`WorldState`.

    ``data`` is the mapping produced by ``to_dict``; the actual rebuild is
    delegated to :func:`_reconstruct` with :class:`WorldState` as the target
    type.
    """
    rebuilt = _reconstruct(WorldState, data)
    # Narrowing check: _reconstruct is typed as returning Any.
    assert isinstance(rebuilt, WorldState)
    return rebuilt
def build_snapshot(world: WorldState, players_state: dict[str, Any]) -> dict[str, Any]:
    """Wrap the world and player state in a versioned snapshot envelope.

    The envelope carries four keys: ``"version"`` (``SNAPSHOT_VERSION``),
    ``"tick"`` (``world.tick``), ``"world"`` (``world.to_dict()``) and
    ``"players"`` (``players_state``, stored by reference, not copied).
    """
    envelope: dict[str, Any] = {"version": SNAPSHOT_VERSION}
    envelope["tick"] = world.tick
    envelope["world"] = world.to_dict()
    envelope["players"] = players_state
    return envelope
def restore_snapshot(snapshot: dict[str, Any]) -> tuple[WorldState, dict[str, Any]]:
    """Unpack a loaded snapshot envelope into ``(world, players_state)``.

    The world is rebuilt from ``snapshot["world"]`` (a missing key raises
    ``KeyError``). A missing or empty ``"players"`` entry yields a fresh empty
    dict.
    """
    rebuilt_world = world_from_dict(snapshot["world"])
    stored_players = snapshot.get("players")
    if not stored_players:
        stored_players = {}
    return rebuilt_world, stored_players
| # -- storage -------------------------------------------------------------- # | |
class SnapshotStore:
    """Two-slot snapshot file on disk (typically a mounted bucket).

    Writes always overwrite the *older* slot, so the freshest good snapshot is
    never the one being clobbered -- a process killed mid-write loses at most
    the in-flight slot, and :meth:`load` falls back to the other one.
    """

    def __init__(self, directory: Path) -> None:
        """Create a store rooted at ``directory`` (created lazily on save)."""
        self._dir = directory
        # Tick last seen in each slot; -1 means "empty, unreadable or unknown".
        self._slot_ticks = [-1, -1]

    def load(self) -> dict[str, Any] | None:
        """Return the readable snapshot with the highest tick, or ``None``.

        Also refreshes the per-slot tick bookkeeping that :meth:`save` uses to
        pick its target. A snapshot whose ``"tick"`` is missing or not an int is
        ranked as tick -1. On a tie the lower-numbered slot wins.
        """
        best: tuple[int, dict[str, Any]] | None = None
        for index, name in enumerate(_SLOT_NAMES):
            data = self._read(self._dir / name)
            if data is None:
                # Forget any tick remembered from an earlier load/save so the
                # damaged slot is the next one to be overwritten.
                self._slot_ticks[index] = -1
                continue
            tick = data.get("tick", -1)
            tick = tick if isinstance(tick, int) else -1
            self._slot_ticks[index] = tick
            if best is None or tick > best[0]:
                best = (tick, data)
        return best[1] if best else None

    def clear(self) -> None:
        """Delete both slot files and forget their ticks.

        Missing files are ignored so a clear on a never-persisted world is a
        no-op. After this, :meth:`load` returns ``None`` and the next
        :meth:`save` starts the ping-pong from slot 0 again.
        """
        for name in _SLOT_NAMES:
            try:
                (self._dir / name).unlink()
            except FileNotFoundError:
                pass
            except OSError as exc:
                print(f"world snapshot clear failed for {name}: {exc}", flush=True)
        self._slot_ticks = [-1, -1]

    def save(self, snapshot: dict[str, Any]) -> None:
        """Write ``snapshot`` into the slot currently holding the older tick.

        The payload goes to a per-process temp file first and is then moved
        over the slot with ``os.replace``, so readers never see a half-written
        slot. Serialisation and I/O errors propagate to the caller; in that
        case no temp file is left behind and the slot bookkeeping is unchanged.
        """
        # Serialise before touching the filesystem: a snapshot that cannot be
        # encoded must not leave an empty temp file in the bucket.
        payload = json.dumps(snapshot)

        target = 0 if self._slot_ticks[0] <= self._slot_ticks[1] else 1
        path = self._dir / _SLOT_NAMES[target]
        self._dir.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(f".tmp.{os.getpid()}")
        try:
            with open(tmp, "w", encoding="utf-8") as handle:
                handle.write(payload)
                handle.flush()
                try:
                    os.fsync(handle.fileno())
                except OSError:
                    # Some FUSE-mounted buckets do not support fsync; the
                    # os.replace below still gives us a best-effort
                    # all-or-nothing publish.
                    pass
            os.replace(tmp, path)
        except BaseException:
            # Do not accumulate orphaned temp files when a write fails.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise

        tick = snapshot.get("tick", -1)
        self._slot_ticks[target] = tick if isinstance(tick, int) else -1
        try:
            size = path.stat().st_size
        except OSError:
            size = -1
        print(
            f"[persistence] wrote snapshot tick={tick} -> {path} ({size} bytes)",
            flush=True,
        )

    @staticmethod
    def _read(path: Path) -> dict[str, Any] | None:
        """Return the JSON object stored at ``path``, or ``None``.

        ``None`` covers a missing/unreadable file, invalid JSON, and valid JSON
        whose top level is not an object.
        """
        try:
            with open(path, encoding="utf-8") as handle:
                data = json.load(handle)
        except (OSError, ValueError):
            return None
        return data if isinstance(data, dict) else None
class BackgroundSnapshotWriter:
    """Latest-wins snapshot writer that does its I/O on a daemon thread.

    At most one snapshot is pending at any time: a new :meth:`submit` replaces
    whatever has not been picked up yet. The worker calls ``store.save``
    outside the condition's lock, so submitters never wait on the write itself.
    """

    def __init__(self, store: SnapshotStore) -> None:
        """Bind to ``store`` and start the worker thread immediately."""
        self._store = store
        self._cond = threading.Condition()
        self._pending: dict[str, Any] | None = None
        self._closed = False
        worker = threading.Thread(
            target=self._run, name="world-snapshot-writer", daemon=True
        )
        self._thread = worker
        worker.start()

    def submit(self, snapshot: dict[str, Any]) -> None:
        """Make ``snapshot`` the pending one, replacing any unwritten one."""
        with self._cond:
            self._pending = snapshot
            self._cond.notify()

    def drop_pending(self) -> None:
        """Forget the pending snapshot, if any, so that a reset cannot be
        undone by a stale snapshot submitted just before it. A write that the
        worker has already picked up is not affected."""
        with self._cond:
            self._pending = None

    def _take_pending(self) -> dict[str, Any] | None:
        """Block until there is work or the writer is closed.

        Returns the pending snapshot (clearing the slot), or ``None`` when the
        writer was closed with nothing left to write.
        """
        with self._cond:
            self._cond.wait_for(
                lambda: self._pending is not None or self._closed
            )
            snapshot, self._pending = self._pending, None
            return snapshot

    def _run(self) -> None:
        """Worker loop: write snapshots until closed and drained."""
        while (snapshot := self._take_pending()) is not None:
            try:
                self._store.save(snapshot)
            except Exception as exc:
                # A failed write must not kill the worker; later snapshots
                # still get their chance.
                print(f"[persistence] snapshot write FAILED: {exc!r}", flush=True)

    def close(self, timeout: float = 5.0) -> None:
        """Ask the worker to stop and wait up to ``timeout`` seconds for it.

        A snapshot that is still pending at this point is written before the
        worker exits.
        """
        with self._cond:
            self._closed = True
            self._cond.notify()
        self._thread.join(timeout)