# world-simulator/src/world_simulator/observability.py
# Last commit: c58d3eb "feat: rename god -> world" (DeltaZN)
from __future__ import annotations
import json
from dataclasses import asdict, is_dataclass
from datetime import UTC, datetime
from pathlib import Path
from threading import Lock
from typing import Any
# Tick buffer that append_record() forwards to; while it is None, records are dropped.
_ACTIVE_TICK: TickLedgerBuffer | None = None
# Guards every read and write of _ACTIVE_TICK.
_ACTIVE_LOCK = Lock()
class RunLedger:
def __init__(self, root: Path = Path("logs")) -> None:
timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
run_dir = root / f"run_{timestamp}"
suffix = 1
while run_dir.exists():
run_dir = root / f"run_{timestamp}_{suffix:02d}"
suffix += 1
run_dir.mkdir(parents=True, exist_ok=False)
self.run_dir = run_dir
self.ledger_path = run_dir / "ledger.jsonl"
self.pretty_path = run_dir / "pretty.log"
self._jsonl = self.ledger_path.open("a", encoding="utf-8")
self._pretty = self.pretty_path.open("a", encoding="utf-8")
self._lock = Lock()
def start_tick(self, tick: int, *, npc_order: list[str]) -> TickLedgerBuffer:
return TickLedgerBuffer(tick=tick, npc_order=npc_order)
def flush_tick(self, buffer: TickLedgerBuffer) -> None:
self.write_records(buffer.sorted_records())
def write_records(self, records: list[dict[str, Any]]) -> None:
if not records:
return
with self._lock:
for record in records:
clean = {key: value for key, value in record.items() if not key.startswith("_")}
self._jsonl.write(json.dumps(_json_safe(clean), ensure_ascii=False, sort_keys=True))
self._jsonl.write("\n")
self._pretty.write(_pretty_line(clean))
self._pretty.write("\n")
self._jsonl.flush()
self._pretty.flush()
class TickLedgerBuffer:
def __init__(self, *, tick: int, npc_order: list[str]) -> None:
self.tick = tick
self._npc_order = {npc_id: index for index, npc_id in enumerate(npc_order)}
self._records: list[dict[str, Any]] = []
self._counter = 0
self._lock = Lock()
def append(self, record: dict[str, Any]) -> None:
with self._lock:
self._counter += 1
enriched = dict(record)
enriched.setdefault("tick", self.tick)
enriched["_insert_order"] = self._counter
enriched["_sort_order"] = self._sort_order(enriched)
self._records.append(enriched)
def sorted_records(self) -> list[dict[str, Any]]:
with self._lock:
return sorted(
(dict(record) for record in self._records),
key=lambda record: (record["_sort_order"], record["_insert_order"]),
)
def _sort_order(self, record: dict[str, Any]) -> float:
phase = record.get("phase")
if phase == "god_command":
return 0
if phase == "directive_expired":
return 0.5
if phase == "overseer_request":
return 1
if phase == "overseer_response":
return 2
if phase in {"npc_request", "npc_response", "npc_fallback"}:
npc_index = self._npc_order.get(str(record.get("npc_id")), 9999)
offset = {"npc_request": 0, "npc_response": 1, "npc_fallback": 2}[phase]
return 10 + (npc_index * 3) + offset
if phase == "engine_events":
return 99999
return 50000
def set_active_tick_buffer(buffer: TickLedgerBuffer | None) -> None:
    """Install *buffer* as the module-wide target of :func:`append_record`.

    Passing ``None`` clears the target, after which appended records are
    dropped until a new buffer is installed.
    """
    global _ACTIVE_TICK
    with _ACTIVE_LOCK:
        _ACTIVE_TICK = buffer
def append_record(record: dict[str, Any]) -> None:
    """Forward *record* to the active tick buffer, or drop it if none is set."""
    # Hold the module lock only long enough to read the reference; the buffer
    # does its own locking inside append().
    with _ACTIVE_LOCK:
        target = _ACTIVE_TICK
    if target is None:
        return
    target.append(record)
def _json_safe(value: Any) -> Any:
if is_dataclass(value):
return _json_safe(asdict(value))
if isinstance(value, dict):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, list | tuple):
return [_json_safe(item) for item in value]
if isinstance(value, Path):
return str(value)
return value
def _pretty_line(record: dict[str, Any]) -> str:
    """Render *record* as one ``tick=… phase=… [npc=…] <json>`` log line.

    The JSON payload is cut to 900 characters (ending in ``...``) when it is
    longer than that.  The ``npc=`` field appears only for a truthy ``npc_id``.
    """
    max_payload = 900
    fields = [
        f"tick={record.get('tick', '?')}",
        f"phase={record.get('phase', 'unknown')}",
    ]
    npc_id = record.get("npc_id")
    if npc_id:
        fields.append(f"npc={npc_id}")

    payload = json.dumps(_json_safe(record), ensure_ascii=False, sort_keys=True)
    if len(payload) > max_payload:
        payload = payload[: max_payload - 3] + "..."
    fields.append(payload)
    return " ".join(fields)