| """Local real-model play: drive the actual runtime against the configured |
| nemotron (default) + qwen (secondary) endpoints, with a small population so a |
| run is cheap and easy to read. |
| |
| Unlike selfplay_sim.py (which injects a deterministic fake completer), this |
| calls the real OpenAI-compatible endpoints in the config, so it exercises the |
| true prompt -> model -> validator loop. Use it to watch whether NPCs actually |
| talk to each other and coordinate. |
| |
| Usage: |
| .venv\\Scripts\\python.exe scripts\\localplay_sim.py --ticks 20 --count 2 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from collections import Counter |
| from dataclasses import replace |
| from pathlib import Path |
| import sys |
|
|
| REPO_ROOT = Path(__file__).resolve().parents[1] |
| SRC_ROOT = REPO_ROOT / "src" |
| if str(SRC_ROOT) not in sys.path: |
| sys.path.insert(0, str(SRC_ROOT)) |
|
|
| from world_simulator.api.runtime import GameRuntime |
| from world_simulator.config import NpcConfig, load_game_config |
| from world_simulator.domain import Beast, Vec3 |
| from world_simulator.simulation.connectors.deterministic import ( |
| DeterministicWorldSimulator, |
| ) |
| from world_simulator.simulation.connectors.openai_compatible import ( |
| OpenAICompatibleWorldSimulator, |
| ) |
| from world_simulator.simulation.connectors.routing import RoutingWorldSimulator |
| from world_simulator.simulation.spawning import create_world |
|
|
|
|
| def build_runtime(config_path: Path, count: int, *, beast: bool = False) -> GameRuntime: |
| config = load_game_config(config_path) |
| |
| config = replace(config, npcs=NpcConfig(count=count)) |
| world = create_world(config) |
|
|
| if beast: |
| |
| |
| anchor = next( |
| (npc.position for npc in world.npcs if npc.country_id == "nemotron"), |
| Vec3(x=-80.0, y=0.0, z=0.0), |
| ) |
| world.beasts.append( |
| Beast( |
| id="beast_test_1", |
| position=Vec3(x=anchor.x + 6.0, y=0.0, z=anchor.z), |
| health=60.0, |
| damage=9.0, |
| ) |
| ) |
| print(f"INJECTED beast_test_1 near nemotron at x={anchor.x + 6.0:g} z={anchor.z:g}") |
|
|
| deterministic = DeterministicWorldSimulator() |
|
|
| |
| routes = { |
| connector_id: OpenAICompatibleWorldSimulator( |
| connector_cfg, |
| fallback=deterministic, |
| connector_id_filter=connector_id, |
| ) |
| for connector_id, connector_cfg in config.secondary_connectors.items() |
| if connector_cfg.type == "openai_compatible" |
| } |
| default = OpenAICompatibleWorldSimulator( |
| config.connector, |
| fallback=deterministic, |
| connector_id_filter=None, |
| ) |
| simulator = RoutingWorldSimulator(routes=routes, default=default) |
| return GameRuntime(world=world, simulator=simulator, config=config) |
|
|
|
|
| def main() -> None: |
| parser = argparse.ArgumentParser(description="Local real-model play.") |
| parser.add_argument("--ticks", type=int, default=20) |
| parser.add_argument("--count", type=int, default=2, help="NPCs per faction.") |
| parser.add_argument( |
| "--config", type=Path, default=REPO_ROOT / "config" / "game.modal.local.json" |
| ) |
| parser.add_argument("--beast", action="store_true", help="Inject a beast near the nemotron NPCs.") |
| args = parser.parse_args() |
|
|
| runtime = build_runtime(args.config, args.count, beast=args.beast) |
| print( |
| f"LOCALPLAY start config={args.config.name} ticks={args.ticks} " |
| f"count_per_faction={args.count} simulator={runtime.simulator_name}" |
| ) |
|
|
| for _ in range(args.ticks): |
| status, _payload = runtime.tick() |
| if int(status) != 200: |
| print(f"tick failed: {status}") |
| break |
|
|
| ledger_path = runtime._ledger.ledger_path |
| print(f"LOCALPLAY done. ledger={ledger_path}") |
| _summarize(ledger_path) |
|
|
|
|
| |
| |
| DISCONTENT_KEYWORDS = ( |
| "give back", "give it back", "give them back", "return", "give us back", |
| "stole", "stolen", "steal", "theft", "thief", "thieves", "robbed", "rob", |
| "took our", "took from", "our coins", "our food", "our gold", "our wealth", |
| "demand", "owe", "repay", "pay back", "or else", "surrender", "what is ours", |
| "avenge", "revenge", "raid", "you took", "they took", "back or", |
| ) |
|
|
|
|
| def _treasury_country(object_id: object) -> str | None: |
| """Country that owns a treasury id like 'treasury_qwen'.""" |
| if isinstance(object_id, str) and object_id.startswith("treasury_"): |
| return object_id[len("treasury_"):] |
| return None |
|
|
|
|
| def _is_beast(entity_id: object) -> bool: |
| return isinstance(entity_id, str) and entity_id.startswith("beast") |
|
|
|
|
| def _summarize(ledger_path: Path) -> None: |
| verdicts: Counter[str] = Counter() |
| actions: Counter[str] = Counter() |
| models_by_npc: dict[str, set[str]] = {} |
| country_by_npc: dict[str, str] = {} |
| fallback_reasons: Counter[str] = Counter() |
| speech: list[str] = [] |
| |
| timeline: list[tuple[int, str, str]] = [] |
| tick = 0 |
|
|
| with ledger_path.open("r", encoding="utf-8") as handle: |
| for line in handle: |
| if not line.strip(): |
| continue |
| rec = json.loads(line) |
| phase = rec.get("phase", "?") |
| if phase == "npc_request": |
| models_by_npc.setdefault(str(rec.get("npc_id")), set()).add( |
| f"{rec.get('model_profile')}::{rec.get('model')}" |
| ) |
| if rec.get("country_id"): |
| country_by_npc[str(rec.get("npc_id"))] = str(rec.get("country_id")) |
| |
| |
| tick = int(rec.get("tick", tick)) |
| if phase == "npc_response": |
| tick = int(rec.get("tick", tick)) |
| verdict = rec.get("validator_verdict") or {} |
| verdicts[str(verdict.get("status"))] += 1 |
| parsed = rec.get("parsed_action") or {} |
| if isinstance(parsed, dict) and parsed.get("action"): |
| actions[str(parsed["action"])] += 1 |
| actor = str(parsed.get("npc_id")) |
| actor_country = country_by_npc.get(actor) |
| target = parsed.get("target_npc_id") or parsed.get("target_entity_id") |
| target_country = ( |
| country_by_npc.get(str(target)) or _treasury_country(target) |
| ) |
| cross = bool( |
| actor_country and target_country and actor_country != target_country |
| ) |
| if parsed.get("action") == "speak" and parsed.get("message"): |
| message = str(parsed["message"]) |
| speech.append(f" t{tick} {actor}: \"{message}\"") |
| |
| |
| if cross and any(k in message.lower() for k in DISCONTENT_KEYWORDS): |
| timeline.append((tick, "DEMAND", |
| f"{actor}({actor_country}) -> {target}({target_country}): " |
| f"\"{message}\"")) |
| |
| |
| if parsed.get("take") and cross: |
| timeline.append((tick, "intent-steal", |
| f"{actor}({actor_country}) tried to steal from {target}({target_country})")) |
| if parsed.get("action") == "attack" and cross and not _is_beast(target): |
| timeline.append((tick, "intent-attack", |
| f"{actor}({actor_country}) tried to attack {target}({target_country})")) |
| if phase == "engine_events": |
| for event in rec.get("events") or []: |
| etick = int(event.get("tick", rec.get("tick", tick))) |
| etype = event.get("type") |
| |
| if etype == "treasury_stolen": |
| thief = str(event.get("actor_id")) |
| thief_country = country_by_npc.get(thief) |
| victim_country = _treasury_country(event.get("object_id")) |
| if (thief_country and victim_country |
| and thief_country != victim_country): |
| timeline.append((etick, "STEAL", |
| f"{thief}({thief_country}) stole from " |
| f"treasury_{victim_country}: {event.get('summary')}")) |
| |
| elif etype == "npc_attack": |
| attacker = str(event.get("actor_id")) |
| victim = event.get("target_id") |
| a_country = country_by_npc.get(attacker) |
| v_country = country_by_npc.get(str(victim)) |
| if (not _is_beast(victim) and a_country and v_country |
| and a_country != v_country): |
| timeline.append((etick, "ATTACK", |
| f"{attacker}({a_country}) attacked " |
| f"{victim}({v_country}): {event.get('summary')}")) |
| if phase == "npc_fallback": |
| fallback_reasons[str(rec.get("reason"))] += 1 |
|
|
| print("\n== model per NPC ==") |
| for npc_id in sorted(models_by_npc): |
| print(f" {npc_id}: {sorted(models_by_npc[npc_id])}") |
| print("\n== validator verdicts ==") |
| for status, count in verdicts.most_common(): |
| print(f" {status}: {count}") |
| print("\n== parsed LLM actions ==") |
| for action, count in actions.most_common(): |
| print(f" {action}: {count}") |
| print("\n== fallback reasons ==") |
| for reason, count in fallback_reasons.most_common() or [("(none)", 0)]: |
| print(f" {reason}: {count}") |
| print(f"\n== speech ({len(speech)}) ==") |
| for line in speech[-30:]: |
| print(line) |
|
|
| timeline.sort(key=lambda item: item[0]) |
| print(f"\n== INTER-COUNTRY TIMELINE ({len(timeline)}) ==") |
| for tk, kind, text in timeline: |
| print(f" t{tk:03d} [{kind}] {text}") |
| _goal_verdict(timeline) |
|
|
|
|
| def _goal_verdict(timeline: list[tuple[int, str, str]]) -> None: |
| """Goal: a STEAL, then a DEMAND at a rival, then a cross-country ATTACK.""" |
| steal = next((t for t, k, _ in timeline if k == "STEAL"), None) |
| demand = next((t for t, k, _ in timeline if k == "DEMAND" and steal is not None and t >= steal), None) |
| attack = next( |
| (t for t, k, _ in timeline if k == "ATTACK" and demand is not None and t >= demand), None |
| ) |
| print("\n== GOAL CHECK ==") |
| print(f" steal at tick: {steal}") |
| print(f" discontent at: {demand} (after steal)") |
| print(f" attack at tick: {attack} (after discontent)") |
| met = steal is not None and demand is not None and attack is not None |
| print(f" GOAL {'MET' if met else 'NOT met'}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|