Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| from dataclasses import asdict, is_dataclass | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Any | |
| _ACTIVE_TICK: TickLedgerBuffer | None = None | |
| _ACTIVE_LOCK = Lock() | |
| class RunLedger: | |
| def __init__(self, root: Path = Path("logs")) -> None: | |
| timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") | |
| run_dir = root / f"run_{timestamp}" | |
| suffix = 1 | |
| while run_dir.exists(): | |
| run_dir = root / f"run_{timestamp}_{suffix:02d}" | |
| suffix += 1 | |
| run_dir.mkdir(parents=True, exist_ok=False) | |
| self.run_dir = run_dir | |
| self.ledger_path = run_dir / "ledger.jsonl" | |
| self.pretty_path = run_dir / "pretty.log" | |
| self._jsonl = self.ledger_path.open("a", encoding="utf-8") | |
| self._pretty = self.pretty_path.open("a", encoding="utf-8") | |
| self._lock = Lock() | |
| def start_tick(self, tick: int, *, npc_order: list[str]) -> TickLedgerBuffer: | |
| return TickLedgerBuffer(tick=tick, npc_order=npc_order) | |
| def flush_tick(self, buffer: TickLedgerBuffer) -> None: | |
| self.write_records(buffer.sorted_records()) | |
| def write_records(self, records: list[dict[str, Any]]) -> None: | |
| if not records: | |
| return | |
| with self._lock: | |
| for record in records: | |
| clean = {key: value for key, value in record.items() if not key.startswith("_")} | |
| self._jsonl.write(json.dumps(_json_safe(clean), ensure_ascii=False, sort_keys=True)) | |
| self._jsonl.write("\n") | |
| self._pretty.write(_pretty_line(clean)) | |
| self._pretty.write("\n") | |
| self._jsonl.flush() | |
| self._pretty.flush() | |
class TickLedgerBuffer:
    """Accumulates the records of one tick and returns them in output order.

    Records are ranked by phase (NPC phases additionally by the NPC's position
    in ``npc_order``); records with the same rank keep their insertion order.
    Appends and snapshots are serialised by an internal lock.
    """

    # Phases whose rank does not depend on the record's NPC.
    _FIXED_RANKS = {
        "god_command": 0,
        "directive_expired": 0.5,
        "overseer_request": 1,
        "overseer_response": 2,
        "engine_events": 99999,
    }
    # Slot of each NPC phase inside that NPC's group of three ranks.
    _NPC_SLOTS = {"npc_request": 0, "npc_response": 1, "npc_fallback": 2}

    def __init__(self, *, tick: int, npc_order: list[str]) -> None:
        """Start an empty buffer for *tick*; *npc_order* fixes NPC ranking."""
        self.tick = tick
        self._npc_order = {name: position for position, name in enumerate(npc_order)}
        self._records: list[dict[str, Any]] = []
        self._counter = 0
        self._lock = Lock()

    def append(self, record: dict[str, Any]) -> None:
        """Store a copy of *record*, tagged with its tick and ordering keys.

        The caller's dict is left untouched.  ``tick`` is filled in only when
        the record does not already carry one.
        """
        with self._lock:
            self._counter += 1
            entry = dict(record)
            if "tick" not in entry:
                entry["tick"] = self.tick
            entry["_insert_order"] = self._counter
            entry["_sort_order"] = self._sort_order(entry)
            self._records.append(entry)

    def sorted_records(self) -> list[dict[str, Any]]:
        """Return copies of all records ordered by rank, then insertion order."""
        with self._lock:
            snapshot = [dict(entry) for entry in self._records]
        snapshot.sort(key=lambda entry: (entry["_sort_order"], entry["_insert_order"]))
        return snapshot

    def _sort_order(self, record: dict[str, Any]) -> float:
        """Return the rank of *record* based on its ``phase`` (and NPC)."""
        phase = record.get("phase")
        if phase in self._FIXED_RANKS:
            return self._FIXED_RANKS[phase]
        slot = self._NPC_SLOTS.get(phase)
        if slot is None:
            # Unrecognised phases sort after NPC traffic, before engine events.
            return 50000
        # NPCs missing from npc_order share index 9999.
        npc_index = self._npc_order.get(str(record.get("npc_id")), 9999)
        return 10 + (npc_index * 3) + slot
def set_active_tick_buffer(buffer: TickLedgerBuffer | None) -> None:
    """Make *buffer* the target of ``append_record``; ``None`` clears it."""
    global _ACTIVE_TICK
    # Rebind under the lock so concurrent append_record() calls see either
    # the old buffer or the new one.
    with _ACTIVE_LOCK:
        _ACTIVE_TICK = buffer
def append_record(record: dict[str, Any]) -> None:
    """Forward *record* to the active tick buffer, if one is set.

    When no buffer is active the record is silently discarded.
    """
    # Hold the module lock only long enough to read the current buffer; the
    # append itself runs outside it.
    with _ACTIVE_LOCK:
        target = _ACTIVE_TICK
    if target is None:
        return
    target.append(record)
| def _json_safe(value: Any) -> Any: | |
| if is_dataclass(value): | |
| return _json_safe(asdict(value)) | |
| if isinstance(value, dict): | |
| return {str(key): _json_safe(item) for key, item in value.items()} | |
| if isinstance(value, list | tuple): | |
| return [_json_safe(item) for item in value] | |
| if isinstance(value, Path): | |
| return str(value) | |
| return value | |
def _pretty_line(record: dict[str, Any]) -> str:
    """Render *record* as one human-readable log line.

    The line has the form ``tick=T phase=P [npc=N] JSON``.  The ``npc`` part
    appears only when ``npc_id`` is truthy, and the JSON payload is cut to
    900 characters (ending in ``...``) when it is longer than that.
    """
    fields = [
        f"tick={record.get('tick', '?')}",
        f"phase={record.get('phase', 'unknown')}",
    ]
    npc_id = record.get("npc_id")
    if npc_id:
        fields.append(f"npc={npc_id}")
    payload = json.dumps(_json_safe(record), ensure_ascii=False, sort_keys=True)
    if len(payload) > 900:
        payload = f"{payload[:897]}..."
    fields.append(payload)
    return " ".join(fields)