from __future__ import annotations import json from dataclasses import asdict, is_dataclass from datetime import UTC, datetime from pathlib import Path from threading import Lock from typing import Any _ACTIVE_TICK: TickLedgerBuffer | None = None _ACTIVE_LOCK = Lock() class RunLedger: def __init__(self, root: Path = Path("logs")) -> None: timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") run_dir = root / f"run_{timestamp}" suffix = 1 while run_dir.exists(): run_dir = root / f"run_{timestamp}_{suffix:02d}" suffix += 1 run_dir.mkdir(parents=True, exist_ok=False) self.run_dir = run_dir self.ledger_path = run_dir / "ledger.jsonl" self.pretty_path = run_dir / "pretty.log" self._jsonl = self.ledger_path.open("a", encoding="utf-8") self._pretty = self.pretty_path.open("a", encoding="utf-8") self._lock = Lock() def start_tick(self, tick: int, *, npc_order: list[str]) -> TickLedgerBuffer: return TickLedgerBuffer(tick=tick, npc_order=npc_order) def flush_tick(self, buffer: TickLedgerBuffer) -> None: self.write_records(buffer.sorted_records()) def write_records(self, records: list[dict[str, Any]]) -> None: if not records: return with self._lock: for record in records: clean = {key: value for key, value in record.items() if not key.startswith("_")} self._jsonl.write(json.dumps(_json_safe(clean), ensure_ascii=False, sort_keys=True)) self._jsonl.write("\n") self._pretty.write(_pretty_line(clean)) self._pretty.write("\n") self._jsonl.flush() self._pretty.flush() class TickLedgerBuffer: def __init__(self, *, tick: int, npc_order: list[str]) -> None: self.tick = tick self._npc_order = {npc_id: index for index, npc_id in enumerate(npc_order)} self._records: list[dict[str, Any]] = [] self._counter = 0 self._lock = Lock() def append(self, record: dict[str, Any]) -> None: with self._lock: self._counter += 1 enriched = dict(record) enriched.setdefault("tick", self.tick) enriched["_insert_order"] = self._counter enriched["_sort_order"] = self._sort_order(enriched) self._records.append(enriched) def sorted_records(self) -> list[dict[str, Any]]: with self._lock: return sorted( (dict(record) for record in self._records), key=lambda record: (record["_sort_order"], record["_insert_order"]), ) def _sort_order(self, record: dict[str, Any]) -> float: phase = record.get("phase") if phase == "god_command": return 0 if phase == "directive_expired": return 0.5 if phase == "overseer_request": return 1 if phase == "overseer_response": return 2 if phase in {"npc_request", "npc_response", "npc_fallback"}: npc_index = self._npc_order.get(str(record.get("npc_id")), 9999) offset = {"npc_request": 0, "npc_response": 1, "npc_fallback": 2}[phase] return 10 + (npc_index * 3) + offset if phase == "engine_events": return 99999 return 50000 def set_active_tick_buffer(buffer: TickLedgerBuffer | None) -> None: global _ACTIVE_TICK with _ACTIVE_LOCK: _ACTIVE_TICK = buffer def append_record(record: dict[str, Any]) -> None: with _ACTIVE_LOCK: buffer = _ACTIVE_TICK if buffer is not None: buffer.append(record) def _json_safe(value: Any) -> Any: if is_dataclass(value): return _json_safe(asdict(value)) if isinstance(value, dict): return {str(key): _json_safe(item) for key, item in value.items()} if isinstance(value, list | tuple): return [_json_safe(item) for item in value] if isinstance(value, Path): return str(value) return value def _pretty_line(record: dict[str, Any]) -> str: tick = record.get("tick", "?") phase = record.get("phase", "unknown") npc = record.get("npc_id") suffix = f" npc={npc}" if npc else "" compact = json.dumps(_json_safe(record), ensure_ascii=False, sort_keys=True) if len(compact) > 900: compact = compact[:897] + "..." return f"tick={tick} phase={phase}{suffix} {compact}"