world-simulator / src /world_simulator /api /persistence.py
DeltaZN
add persistance logs
ee354f2
Raw
History Blame Contribute Delete
7.86 kB
"""Snapshot persistence for the live world.
The simulation keeps all of its state in memory (:class:`WorldState` plus the
:class:`PlayerManager` registry). On Hugging Face Spaces the process is reset on
rebuilds, inactivity sleep, and crashes, so to survive a restart we dump a full
snapshot to a mounted bucket after every tick and reload it on boot.
Two halves:
* **Dump** is cheap -- everything is already JSON-native dataclasses, so
:meth:`WorldState.to_dict` plus :meth:`PlayerManager.snapshot` is enough.
* **Load** needs reconstruction, which :func:`world_from_dict` provides via a
generic recursive rebuild over dataclass fields and their type hints.
Writes go through :class:`BackgroundSnapshotWriter` so the (slow, network)
bucket write never happens while the runtime lock is held. Slots are written
ping-pong so a torn write can never destroy the most recent good snapshot.
"""
from __future__ import annotations
import json
import os
import threading
import types
import typing
from dataclasses import fields, is_dataclass
from functools import cache
from pathlib import Path
from typing import Any
from world_simulator.domain import WorldState
# Schema version stamped into every envelope by build_snapshot. Nothing in
# this module checks it on load yet (restore_snapshot ignores it).
SNAPSHOT_VERSION = 1
# The two ping-pong slot files that SnapshotStore alternates between; the
# index into this tuple is the slot number tracked in SnapshotStore._slot_ticks.
_SLOT_NAMES = ("world_state.0.json", "world_state.1.json")
# -- reconstruction ------------------------------------------------------- #
@cache
def _type_hints(cls: type) -> dict[str, Any]:
    """Return the evaluated type hints of ``cls``, memoised per class.

    Annotations may be stored as strings (e.g. under postponed evaluation);
    ``typing.get_type_hints`` resolves them into real type objects. Because
    the same dict object is handed back on every call, callers must treat it
    as read-only.
    """
    resolved = typing.get_type_hints(cls)
    return resolved
def _reconstruct(tp: Any, value: Any) -> Any:
"""Rebuild ``value`` (from JSON) into an instance of declared type ``tp``."""
if value is None:
return None
if is_dataclass(tp) and isinstance(tp, type):
hints = _type_hints(tp)
kwargs = {
field.name: _reconstruct(hints.get(field.name, Any), value[field.name])
for field in fields(tp)
if field.name in value
}
return tp(**kwargs)
origin = typing.get_origin(tp)
if origin in (typing.Union, types.UnionType):
# The model only uses ``X | None``; rebuild as the single concrete arm.
args = [arm for arm in typing.get_args(tp) if arm is not type(None)]
if len(args) == 1:
return _reconstruct(args[0], value)
return value
if origin is list:
list_args = typing.get_args(tp)
elem = list_args[0] if list_args else Any
return [_reconstruct(elem, item) for item in value]
# dict / Literal / primitives / Any are stored verbatim and need no rebuild.
return value
def world_from_dict(data: dict[str, Any]) -> WorldState:
    """Rebuild a :class:`WorldState` from the dict produced by ``to_dict``.

    Args:
        data: The ``"world"`` payload of a snapshot envelope.

    Returns:
        The reconstructed world.

    Raises:
        TypeError: if ``data`` does not rebuild into a ``WorldState``
            (for example when it is ``None``).
    """
    world = _reconstruct(WorldState, data)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not isinstance(world, WorldState):
        raise TypeError(
            f"snapshot world payload rebuilt as {type(world).__name__}, "
            "expected WorldState"
        )
    return world
def build_snapshot(world: WorldState, players_state: dict[str, Any]) -> dict[str, Any]:
    """Wrap the world and player registry state in a versioned envelope.

    The envelope records ``SNAPSHOT_VERSION``, the world's current tick (which
    the snapshot store reads to order its slots), the serialised world from
    ``world.to_dict()`` and ``players_state`` as given.
    """
    envelope: dict[str, Any] = {}
    envelope["version"] = SNAPSHOT_VERSION
    envelope["tick"] = world.tick
    envelope["world"] = world.to_dict()
    envelope["players"] = players_state
    return envelope
def restore_snapshot(snapshot: dict[str, Any]) -> tuple[WorldState, dict[str, Any]]:
    """Unpack a loaded snapshot envelope into ``(world, players_state)``.

    ``snapshot["world"]`` is required (a missing key raises ``KeyError``);
    a missing or falsy ``"players"`` entry is replaced by an empty dict.
    """
    world = world_from_dict(snapshot["world"])
    players = snapshot.get("players")
    return world, (players if players else {})
# -- storage -------------------------------------------------------------- #
class SnapshotStore:
    """Two-slot snapshot file on disk (typically a mounted bucket).

    Writes always overwrite the *older* slot, so the freshest good snapshot is
    never the one being clobbered -- a process killed mid-write loses at most
    the in-flight slot, and :meth:`load` falls back to the other one.
    """

    def __init__(self, directory: Path) -> None:
        self._dir = directory
        # Tick believed to be stored in each slot; -1 means "empty or
        # unreadable", which makes that slot the next write target.
        self._slot_ticks = [-1, -1]

    def load(self) -> dict[str, Any] | None:
        """Return the readable slot with the highest tick, or ``None``.

        Also records each slot's tick so the next :meth:`save` overwrites the
        older (or unreadable) slot rather than the newest good one.
        """
        best: tuple[int, dict[str, Any]] | None = None
        for index, name in enumerate(_SLOT_NAMES):
            data = self._read(self._dir / name)
            if data is None:
                # Missing or corrupt slot: mark it oldest so it is overwritten
                # next, instead of the readable slot that may be our only copy.
                self._slot_ticks[index] = -1
                continue
            tick = self._tick_of(data)
            self._slot_ticks[index] = tick
            if best is None or tick > best[0]:
                best = (tick, data)
        return None if best is None else best[1]

    def clear(self) -> None:
        """Delete both slot files and forget their ticks.

        Missing files are ignored so a clear on a never-persisted world is a
        no-op. After this, :meth:`load` returns ``None`` and the next
        :meth:`save` starts the ping-pong from slot 0 again.
        """
        for name in _SLOT_NAMES:
            try:
                (self._dir / name).unlink()
            except FileNotFoundError:
                pass
            except OSError as exc:
                print(f"world snapshot clear failed for {name}: {exc}", flush=True)
        self._slot_ticks = [-1, -1]

    def save(self, snapshot: dict[str, Any]) -> None:
        """Write ``snapshot`` into the older slot via temp file + ``os.replace``.

        Raises:
            TypeError / ValueError: if ``snapshot`` is not JSON-serialisable.
            OSError: if the write or rename fails. The temp file is removed
                and slot bookkeeping is left unchanged, so the next attempt
                targets the same (older) slot again.
        """
        target = 0 if self._slot_ticks[0] <= self._slot_ticks[1] else 1
        path = self._dir / _SLOT_NAMES[target]
        # Serialise first so a bad payload never creates a stray temp file.
        payload = json.dumps(snapshot)
        self._dir.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(f".tmp.{os.getpid()}")
        try:
            with open(tmp, "w", encoding="utf-8") as handle:
                handle.write(payload)
                handle.flush()
                try:
                    os.fsync(handle.fileno())
                except OSError:
                    # Some FUSE-mounted buckets do not support fsync; the
                    # os.replace below still gives us a best-effort
                    # all-or-nothing publish.
                    pass
            os.replace(tmp, path)
        except BaseException:
            # Don't leave half-written temp files accumulating in the bucket.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise
        tick = snapshot.get("tick", -1)
        self._slot_ticks[target] = self._tick_of(snapshot)
        try:
            size = path.stat().st_size
        except OSError:
            size = -1
        print(
            f"[persistence] wrote snapshot tick={tick} -> {path} ({size} bytes)",
            flush=True,
        )

    @staticmethod
    def _tick_of(data: dict[str, Any]) -> int:
        """Return ``data["tick"]`` when it is an int, otherwise -1."""
        tick = data.get("tick", -1)
        return tick if isinstance(tick, int) else -1

    @staticmethod
    def _read(path: Path) -> dict[str, Any] | None:
        """Parse ``path`` as a JSON object; ``None`` if unreadable or not a dict."""
        try:
            with open(path, encoding="utf-8") as handle:
                data = json.load(handle)
        except (OSError, ValueError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError (torn writes).
            return None
        return data if isinstance(data, dict) else None
class BackgroundSnapshotWriter:
    """Coalescing writer: keeps only the latest pending snapshot and writes it
    off the runtime lock on a daemon thread.

    ``submit`` only swaps the pending slot and never blocks on I/O; snapshots
    submitted while a write is in flight are coalesced so only the newest is
    written next. ``close`` lets the thread drain a pending snapshot first.
    """

    def __init__(self, store: SnapshotStore) -> None:
        self._store = store
        self._cond = threading.Condition()
        self._pending: dict[str, Any] | None = None
        # True while the writer thread is inside ``store.save``; guarded by _cond.
        self._writing = False
        self._closed = False
        self._thread = threading.Thread(
            target=self._run, name="world-snapshot-writer", daemon=True
        )
        self._thread.start()

    def submit(self, snapshot: dict[str, Any]) -> None:
        """Replace any pending snapshot with ``snapshot`` and wake the writer."""
        with self._cond:
            self._pending = snapshot
            self._cond.notify_all()

    def drop_pending(self, *, wait: bool = False) -> None:
        """Discard any not-yet-written snapshot so a reset can't be undone by a
        stale snapshot submitted just before it.

        A snapshot the writer thread has already picked up cannot be recalled.
        With ``wait=True`` this call also blocks until that in-flight write has
        finished, so a following store clear cannot be overwritten by it. Note
        that this may block for the duration of a (slow) bucket write.
        """
        with self._cond:
            self._pending = None
            if wait:
                while self._writing:
                    self._cond.wait()

    def _run(self) -> None:
        """Writer-thread loop: wait for a snapshot, save it, repeat until closed."""
        while True:
            with self._cond:
                while self._pending is None and not self._closed:
                    self._cond.wait()
                if self._pending is None:
                    return  # closed and fully drained
                snapshot, self._pending = self._pending, None
                self._writing = True
            try:
                self._store.save(snapshot)
            except Exception as exc:
                print(f"[persistence] snapshot write FAILED: {exc!r}", flush=True)
            finally:
                with self._cond:
                    self._writing = False
                    # Wake drop_pending(wait=True) callers as well as the loop.
                    self._cond.notify_all()

    def close(self, timeout: float = 5.0) -> None:
        """Stop accepting work, let the thread drain, and join for ``timeout`` s."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()
        self._thread.join(timeout)