Spaces:
Runtime error
Runtime error
File size: 7,863 Bytes
8043870 217ea4c 8043870 ee354f2 8043870 217ea4c 8043870 ee354f2 8043870 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | """Snapshot persistence for the live world.
The simulation keeps all of its state in memory (:class:`WorldState` plus the
:class:`PlayerManager` registry). On Hugging Face Spaces the process is reset on
rebuilds, inactivity sleep, and crashes, so to survive a restart we dump a full
snapshot to a mounted bucket after every tick and reload it on boot.
Two halves:
* **Dump** is cheap -- everything is already JSON-native dataclasses, so
:meth:`WorldState.to_dict` plus :meth:`PlayerManager.snapshot` is enough.
* **Load** needs reconstruction, which :func:`world_from_dict` provides via a
generic recursive rebuild over dataclass fields and their type hints.
Writes go through :class:`BackgroundSnapshotWriter` so the (slow, network)
bucket write never happens while the runtime lock is held. Slots are written
ping-pong so a torn write can never destroy the most recent good snapshot.
"""
from __future__ import annotations
import json
import os
import threading
import types
import typing
from dataclasses import fields, is_dataclass
from functools import cache
from pathlib import Path
from typing import Any
from world_simulator.domain import WorldState
SNAPSHOT_VERSION = 1
_SLOT_NAMES = ("world_state.0.json", "world_state.1.json")
# -- reconstruction ------------------------------------------------------- #
@cache
def _type_hints(cls: type) -> dict[str, Any]:
return typing.get_type_hints(cls)
def _reconstruct(tp: Any, value: Any) -> Any:
"""Rebuild ``value`` (from JSON) into an instance of declared type ``tp``."""
if value is None:
return None
if is_dataclass(tp) and isinstance(tp, type):
hints = _type_hints(tp)
kwargs = {
field.name: _reconstruct(hints.get(field.name, Any), value[field.name])
for field in fields(tp)
if field.name in value
}
return tp(**kwargs)
origin = typing.get_origin(tp)
if origin in (typing.Union, types.UnionType):
# The model only uses ``X | None``; rebuild as the single concrete arm.
args = [arm for arm in typing.get_args(tp) if arm is not type(None)]
if len(args) == 1:
return _reconstruct(args[0], value)
return value
if origin is list:
list_args = typing.get_args(tp)
elem = list_args[0] if list_args else Any
return [_reconstruct(elem, item) for item in value]
# dict / Literal / primitives / Any are stored verbatim and need no rebuild.
return value
def world_from_dict(data: dict[str, Any]) -> WorldState:
    """Rebuild a :class:`WorldState` from the dict produced by ``to_dict``.

    Raises:
        TypeError: if ``data`` does not rebuild into a ``WorldState`` (for
            example a ``null`` world payload). This is an explicit check
            rather than an ``assert`` so it still fires under ``python -O``.
    """
    world = _reconstruct(WorldState, data)
    if not isinstance(world, WorldState):
        raise TypeError(
            "snapshot world payload did not rebuild into WorldState "
            f"(got {type(world).__name__})"
        )
    return world
def build_snapshot(world: WorldState, players_state: dict[str, Any]) -> dict[str, Any]:
    """Wrap a world and the player registry state into a snapshot envelope.

    The envelope carries ``SNAPSHOT_VERSION``, the world's current tick
    (SnapshotStore.load compares it to pick the newest slot), the world
    serialized via ``world.to_dict()``, and ``players_state`` as given
    (not copied).
    """
    envelope: dict[str, Any] = dict(
        version=SNAPSHOT_VERSION,
        tick=world.tick,
    )
    envelope["world"] = world.to_dict()
    envelope["players"] = players_state
    return envelope
def restore_snapshot(snapshot: dict[str, Any]) -> tuple[WorldState, dict[str, Any]]:
    """Unpack a loaded snapshot envelope into ``(world, players_state)``.

    ``snapshot["world"]`` is required (a missing key raises ``KeyError``) and
    is rebuilt through :func:`world_from_dict`. A missing or falsy
    ``"players"`` entry is replaced by an empty dict.
    """
    world_payload = snapshot["world"]
    world = world_from_dict(world_payload)
    players = snapshot.get("players")
    return world, (players if players else {})
# -- storage -------------------------------------------------------------- #
class SnapshotStore:
    """Two-slot snapshot file on disk (typically a mounted bucket).

    Writes always overwrite the *older* slot, so the freshest good snapshot is
    never the one being clobbered -- a process killed mid-write loses at most
    the in-flight slot, and :meth:`load` falls back to the other one.

    Each write is serialized up front, written to a per-process temporary
    sibling file and published with :func:`os.replace`. If writing or
    publishing fails, the temporary file is removed before the error
    propagates, so failed writes do not leave stray files in the bucket.
    """

    def __init__(self, directory: Path) -> None:
        self._dir = directory
        # Tick last seen in each slot (-1 = empty/unknown). save() overwrites
        # the slot holding the lower value.
        self._slot_ticks = [-1, -1]

    @staticmethod
    def _tick_of(snapshot: dict[str, Any]) -> int:
        """Return ``snapshot["tick"]`` when it is an int, otherwise -1."""
        tick = snapshot.get("tick", -1)
        return tick if isinstance(tick, int) else -1

    def load(self) -> dict[str, Any] | None:
        """Return the readable snapshot with the highest tick, or ``None``.

        Also refreshes the per-slot tick bookkeeping. A slot that is missing,
        unreadable or not a JSON object is recorded as empty (-1), so the next
        :meth:`save` overwrites it instead of the surviving good slot. On a
        tick tie, slot 0 wins.
        """
        best: tuple[int, dict[str, Any]] | None = None
        for index, name in enumerate(_SLOT_NAMES):
            data = self._read(self._dir / name)
            if data is None:
                self._slot_ticks[index] = -1
                continue
            tick = self._tick_of(data)
            self._slot_ticks[index] = tick
            if best is None or tick > best[0]:
                best = (tick, data)
        return best[1] if best else None

    def clear(self) -> None:
        """Delete both slot files and forget their ticks.

        Missing files are ignored so a clear on a never-persisted world is a
        no-op; other OS errors are printed and the remaining slot is still
        attempted. After this, :meth:`load` returns ``None`` and the next
        :meth:`save` starts the ping-pong from slot 0 again.
        """
        for name in _SLOT_NAMES:
            try:
                (self._dir / name).unlink()
            except FileNotFoundError:
                pass
            except OSError as exc:
                print(f"world snapshot clear failed for {name}: {exc}", flush=True)
        self._slot_ticks = [-1, -1]

    def save(self, snapshot: dict[str, Any]) -> None:
        """Write ``snapshot`` into the older slot via temp file + ``os.replace``.

        Raises whatever ``json.dumps`` or the filesystem raises. In that case
        the slot bookkeeping is left unchanged (the next save retries the same
        slot) and any temporary file that was created is removed.
        """
        target = 0 if self._slot_ticks[0] <= self._slot_ticks[1] else 1
        path = self._dir / _SLOT_NAMES[target]
        # Serialize before touching the filesystem so an unserializable
        # snapshot fails without creating a temp file.
        payload = json.dumps(snapshot)
        self._dir.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(f".tmp.{os.getpid()}")
        try:
            with open(tmp, "w", encoding="utf-8") as handle:
                handle.write(payload)
                handle.flush()
                try:
                    os.fsync(handle.fileno())
                except OSError:
                    # Some FUSE-mounted buckets do not support fsync; the
                    # os.replace below still gives us a best-effort
                    # all-or-nothing publish.
                    pass
            os.replace(tmp, path)
        except BaseException:
            # Don't leave a half-written temp file behind; the error itself
            # still propagates to the caller.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise
        tick = snapshot.get("tick", -1)
        self._slot_ticks[target] = self._tick_of(snapshot)
        try:
            size = path.stat().st_size
        except OSError:
            size = -1
        print(
            f"[persistence] wrote snapshot tick={tick} -> {path} ({size} bytes)",
            flush=True,
        )

    @staticmethod
    def _read(path: Path) -> dict[str, Any] | None:
        """Return the JSON object stored at ``path``, or ``None``.

        ``None`` covers a missing/unreadable file, invalid JSON, and JSON
        whose top level is not an object.
        """
        try:
            with open(path, encoding="utf-8") as handle:
                data = json.load(handle)
        except (OSError, ValueError):
            return None
        return data if isinstance(data, dict) else None
class BackgroundSnapshotWriter:
    """Hands snapshots to a :class:`SnapshotStore` from a daemon thread.

    At most one snapshot waits at a time: a newer :meth:`submit` replaces any
    snapshot the worker has not picked up yet, so only the latest state is
    written. The store's ``save`` runs on the worker thread, never on the
    caller's.
    """

    def __init__(self, store: SnapshotStore) -> None:
        self._store = store
        # Guards _pending/_closed and wakes the worker when either changes.
        self._cond = threading.Condition()
        self._pending: dict[str, Any] | None = None
        self._closed = False
        self._thread = threading.Thread(
            name="world-snapshot-writer",
            target=self._run,
            daemon=True,
        )
        self._thread.start()

    def submit(self, snapshot: dict[str, Any]) -> None:
        """Queue ``snapshot`` for writing, replacing any not-yet-taken one."""
        with self._cond:
            self._pending = snapshot
            self._cond.notify()

    def drop_pending(self) -> None:
        """Forget the queued snapshot, if any.

        Lets a world reset discard a stale snapshot submitted just before it.
        A snapshot the worker has already taken is not cancelled.
        """
        with self._cond:
            self._pending = None

    def _take_next(self) -> dict[str, Any] | None:
        """Block until there is work; return it, or ``None`` once closed and drained."""
        with self._cond:
            self._cond.wait_for(lambda: self._pending is not None or self._closed)
            snapshot, self._pending = self._pending, None
        return snapshot

    def _run(self) -> None:
        """Worker loop: save each taken snapshot; write errors are printed, not raised."""
        while (snapshot := self._take_next()) is not None:
            try:
                self._store.save(snapshot)
            except Exception as exc:
                print(f"[persistence] snapshot write FAILED: {exc!r}", flush=True)

    def close(self, timeout: float = 5.0) -> None:
        """Ask the worker to flush any queued snapshot and stop, waiting up to ``timeout`` seconds."""
        with self._cond:
            self._closed = True
            self._cond.notify()
        self._thread.join(timeout)
|