| """Snapshot persistence for the live world. |
| |
| The simulation keeps all of its state in memory (:class:`WorldState` plus the |
| :class:`PlayerManager` registry). On Hugging Face Spaces the process is reset on |
| rebuilds, inactivity sleep, and crashes, so to survive a restart we dump a full |
| snapshot to a mounted bucket after every tick and reload it on boot. |
| |
| Two halves: |
| |
| * **Dump** is cheap -- everything is already JSON-native dataclasses, so |
| :meth:`WorldState.to_dict` plus :meth:`PlayerManager.snapshot` is enough. |
| * **Load** needs reconstruction, which :func:`world_from_dict` provides via a |
| generic recursive rebuild over dataclass fields and their type hints. |
| |
| Writes go through :class:`BackgroundSnapshotWriter` so the (slow, network) |
| bucket write never happens while the runtime lock is held. Slots are written |
| ping-pong so a torn write can never destroy the most recent good snapshot. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| import threading |
| import types |
| import typing |
| from dataclasses import fields, is_dataclass |
| from functools import cache |
| from pathlib import Path |
| from typing import Any |
|
|
| from world_simulator.domain import WorldState |
|
|
# Format stamp stored under the "version" key of every snapshot envelope.
SNAPSHOT_VERSION = 1

# File names of the two on-disk slots, indexed by slot number (0 and 1).
_SLOT_NAMES = (
    "world_state.0.json",
    "world_state.1.json",
)
|
|
|
|
| |
|
|
|
|
| @cache |
| def _type_hints(cls: type) -> dict[str, Any]: |
| return typing.get_type_hints(cls) |
|
|
|
|
| def _reconstruct(tp: Any, value: Any) -> Any: |
| """Rebuild ``value`` (from JSON) into an instance of declared type ``tp``.""" |
| if value is None: |
| return None |
| if is_dataclass(tp) and isinstance(tp, type): |
| hints = _type_hints(tp) |
| kwargs = { |
| field.name: _reconstruct(hints.get(field.name, Any), value[field.name]) |
| for field in fields(tp) |
| if field.name in value |
| } |
| return tp(**kwargs) |
| origin = typing.get_origin(tp) |
| if origin in (typing.Union, types.UnionType): |
| |
| args = [arm for arm in typing.get_args(tp) if arm is not type(None)] |
| if len(args) == 1: |
| return _reconstruct(args[0], value) |
| return value |
| if origin is list: |
| list_args = typing.get_args(tp) |
| elem = list_args[0] if list_args else Any |
| return [_reconstruct(elem, item) for item in value] |
| |
| return value |
|
|
|
|
def world_from_dict(data: dict[str, Any]) -> WorldState:
    """Turn the plain dict emitted by ``to_dict`` back into a :class:`WorldState`.

    The heavy lifting is delegated to :func:`_reconstruct`, which walks the
    dataclass fields of ``WorldState`` and rebuilds them from ``data``.
    """
    rebuilt = _reconstruct(WorldState, data)
    # Narrows the ``Any`` coming out of ``_reconstruct`` to the declared
    # return type.
    assert isinstance(rebuilt, WorldState)
    return rebuilt
|
|
|
|
def build_snapshot(world: WorldState, players_state: dict[str, Any]) -> dict[str, Any]:
    """Wrap the world and player state in a snapshot envelope.

    The envelope carries the format version, the world's current tick, the
    world serialised via ``world.to_dict()``, and ``players_state`` stored
    as given (not copied).
    """
    envelope: dict[str, Any] = {
        "version": SNAPSHOT_VERSION,
        "tick": world.tick,
    }
    envelope["world"] = world.to_dict()
    envelope["players"] = players_state
    return envelope
|
|
|
|
def restore_snapshot(snapshot: dict[str, Any]) -> tuple[WorldState, dict[str, Any]]:
    """Unpack a loaded snapshot envelope into ``(world, players_state)``.

    The ``"world"`` entry is required (a missing key raises ``KeyError``) and
    is rebuilt through :func:`world_from_dict`. A missing or empty
    ``"players"`` entry yields a fresh empty dict.
    """
    world = world_from_dict(snapshot["world"])
    players_state = snapshot.get("players")
    if not players_state:
        players_state = {}
    return world, players_state
|
|
|
|
| |
|
|
|
|
class SnapshotStore:
    """Two-slot snapshot file on disk (typically a mounted bucket).

    Writes always overwrite the *older* slot, so the freshest good snapshot is
    never the one being clobbered -- a process killed mid-write loses at most
    the in-flight slot, and :meth:`load` falls back to the other one.

    The class does no locking of its own; callers that share an instance
    between threads must serialise :meth:`save`, :meth:`load` and
    :meth:`clear` themselves.
    """

    def __init__(self, directory: Path) -> None:
        """Remember ``directory``; nothing is read or created until needed."""
        self._dir = directory
        # Tick last known to be stored in each slot; -1 means empty/unknown,
        # which makes that slot the preferred target of the next save.
        self._slot_ticks = [-1, -1]

    def load(self) -> dict[str, Any] | None:
        """Return the readable snapshot with the highest tick, or ``None``.

        Also refreshes the per-slot tick bookkeeping used by :meth:`save`.
        A slot that is missing, unreadable or not a JSON object is recorded
        as empty, so the next save overwrites it rather than the good slot.
        """
        best: tuple[int, dict[str, Any]] | None = None
        for index, name in enumerate(_SLOT_NAMES):
            data = self._read(self._dir / name)
            if data is None:
                # Forget any tick recorded earlier for a slot that has since
                # gone bad; it holds nothing worth protecting.
                self._slot_ticks[index] = -1
                continue
            tick = data.get("tick", -1)
            tick = tick if isinstance(tick, int) else -1
            self._slot_ticks[index] = tick
            if best is None or tick > best[0]:
                best = (tick, data)
        return best[1] if best else None

    def clear(self) -> None:
        """Delete both slot files and forget their ticks.

        Missing files are ignored so a clear on a never-persisted world is a
        no-op. After this, :meth:`load` returns ``None`` and the next
        :meth:`save` starts the ping-pong from slot 0 again.
        """
        for name in _SLOT_NAMES:
            try:
                (self._dir / name).unlink()
            except FileNotFoundError:
                pass
            except OSError as exc:
                print(f"world snapshot clear failed for {name}: {exc}", flush=True)
        self._slot_ticks = [-1, -1]

    def save(self, snapshot: dict[str, Any]) -> None:
        """Write ``snapshot`` into the older slot via a temp file + rename.

        Raises:
            TypeError, ValueError: ``snapshot`` is not JSON-serialisable; in
                that case nothing is written to disk.
            OSError: the directory, temp file or rename failed; the temp
                file is removed (best effort) before the error propagates.
        """
        target = 0 if self._slot_ticks[0] <= self._slot_ticks[1] else 1
        path = self._dir / _SLOT_NAMES[target]
        # Serialise before touching the filesystem so an unserialisable
        # snapshot cannot leave an empty temp file behind.
        payload = json.dumps(snapshot)
        self._dir.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(f".tmp.{os.getpid()}")
        try:
            with open(tmp, "w", encoding="utf-8") as handle:
                handle.write(payload)
                handle.flush()
                try:
                    os.fsync(handle.fileno())
                except OSError:
                    # Best effort: a failing fsync must not fail the save.
                    pass
            os.replace(tmp, path)
        except BaseException:
            # Do not leak the half-written temp file on any failure.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise
        tick = snapshot.get("tick", -1)
        self._slot_ticks[target] = tick if isinstance(tick, int) else -1
        try:
            size = path.stat().st_size
        except OSError:
            size = -1
        print(
            f"[persistence] wrote snapshot tick={tick} -> {path} ({size} bytes)",
            flush=True,
        )

    @staticmethod
    def _read(path: Path) -> dict[str, Any] | None:
        """Return the JSON object stored at ``path``.

        Returns ``None`` when the file cannot be opened, cannot be decoded
        as JSON, or holds something other than a JSON object.
        """
        try:
            with open(path, encoding="utf-8") as handle:
                data = json.load(handle)
        except (OSError, ValueError):
            return None
        return data if isinstance(data, dict) else None
|
|
|
|
class BackgroundSnapshotWriter:
    """Hands snapshots to a store from a dedicated daemon thread.

    Only the most recently submitted snapshot is kept while the thread is
    busy, so a slow store coalesces intermediate snapshots instead of
    queueing them. Callers never wait on the store itself.
    """

    def __init__(self, store: SnapshotStore) -> None:
        """Bind to ``store`` and start the writer thread immediately."""
        self._store = store
        self._cond = threading.Condition()
        self._pending: dict[str, Any] | None = None
        self._closed = False
        worker = threading.Thread(
            target=self._run, name="world-snapshot-writer", daemon=True
        )
        self._thread = worker
        worker.start()

    def submit(self, snapshot: dict[str, Any]) -> None:
        """Queue ``snapshot`` for writing, replacing any still-pending one."""
        with self._cond:
            self._pending = snapshot
            self._cond.notify()

    def drop_pending(self) -> None:
        """Forget the queued snapshot, if any, so a reset cannot be undone by
        a stale snapshot submitted just before it. A write that has already
        been handed to the store is not affected."""
        with self._cond:
            self._pending = None

    def _has_work_or_closed(self) -> bool:
        """Wake-up predicate for the writer thread (call with the lock held)."""
        return self._pending is not None or self._closed

    def _run(self) -> None:
        """Writer loop: take the pending snapshot, save it, repeat.

        Exits once the writer is closed and nothing is pending. Store errors
        are reported on stdout and do not stop the loop.
        """
        while True:
            with self._cond:
                self._cond.wait_for(self._has_work_or_closed)
                job, self._pending = self._pending, None
            if job is None:
                # Woken by close() with an empty queue: shut down.
                return
            try:
                self._store.save(job)
            except Exception as exc:
                print(f"[persistence] snapshot write FAILED: {exc!r}", flush=True)

    def close(self, timeout: float = 5.0) -> None:
        """Ask the thread to stop after draining any pending snapshot, then
        wait up to ``timeout`` seconds for it to finish."""
        with self._cond:
            self._closed = True
            self._cond.notify()
        self._thread.join(timeout)
|
|