| """LLM-shaped self-play harness. |
| |
| Drives the *real* runtime pipeline (RoutingWorldSimulator -> OpenAICompatible |
| connectors -> ledger) but injects a deterministic, behaviour-rich "model" |
| instead of calling a remote endpoint. This makes a full 100-300 tick self-play |
| reproducible offline while still exercising: |
| |
| * per-NPC npc_request / npc_response ledger records with model attribution; |
| * the Qwen secondary connector routing (qwen-* NPCs) vs the default connector |
| (nemotron NPCs), each logged with its own model/model_profile; |
| * the fallback path (a few "unusable" model outputs) being explicitly logged; |
| * elections + vote, treasury deposit/steal via transfer, cannon craft via use, |
| speak, gather, eat/heal — the full action surface. |
| |
| Usage: |
| .venv\\Scripts\\python.exe scripts\\selfplay_sim.py --ticks 150 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import re |
| from collections import Counter |
| from pathlib import Path |
| import sys |
|
|
# parents[0] is the directory holding this script; parents[1] is one level up.
REPO_ROOT = Path(__file__).resolve().parents[1]
SRC_ROOT = REPO_ROOT.joinpath("src")
# Put the in-repo ``src`` tree at the front of the import path (only once) so
# the ``world_simulator`` imports that follow resolve from this checkout.
if sys.path.count(str(SRC_ROOT)) == 0:
    sys.path.insert(0, str(SRC_ROOT))
|
|
| from world_simulator.api.runtime import GameRuntime |
| from world_simulator.config import load_game_config |
| from world_simulator.simulation.connectors.deterministic import ( |
| DeterministicWorldSimulator, |
| ) |
| from world_simulator.simulation.connectors.openai_compatible import ( |
| OpenAICompatibleWorldSimulator, |
| ) |
| from world_simulator.simulation.connectors.routing import RoutingWorldSimulator |
| from world_simulator.simulation.spawning import create_world |
|
|
|
|
def _tool_response(name: str, arguments: dict[str, object]) -> dict[str, object]:
    """Build a chat-completion style payload that carries exactly one tool call.

    Args:
        name: Name of the function/tool being called.
        arguments: Arguments for the call; they are serialised with
            ``json.dumps`` into the ``arguments`` string of the tool call.

    Returns:
        A dict with a single entry in ``choices`` whose assistant message has
        empty ``content`` and one ``tool_calls`` item with id ``"call_0"``.
    """
    function_call = {"name": name, "arguments": json.dumps(arguments)}
    tool_call = {"id": "call_0", "type": "function", "function": function_call}
    message = {"role": "assistant", "content": "", "tool_calls": [tool_call]}
    return {"choices": [{"message": message}]}
|
|
|
|
def _unusable_response() -> dict[str, object]:
    """Return a completion whose assistant message has no text and no tool call.

    The scripted model hands this back for the occasional "unusable" output
    that is meant to exercise the fallback path described in the module
    docstring.
    """
    empty_message = {"role": "assistant", "content": ""}
    return {"choices": [{"message": empty_message}]}
|
|
|
|
def _parse_briefing(content: str) -> dict[str, object]:
    """Turn the plain-language briefing into the facts a scripted agent acts on.

    The briefing is written for an LLM, so every value is recovered with a
    regex or substring test. Because the scripted agent can only act on what
    this parser finds, it doubles as a check that the ids and coordinates
    needed to act really are present in the text.

    Args:
        content: Full text of the briefing.

    Returns:
        A dict that always holds ``npc_id``, ``role``, ``tick``, ``health``,
        ``hunger``, ``inventory``, ``pos``, ``is_ruler``, ``enemy_coins``,
        ``voting_open``, ``candidates``, ``threats``, ``allies``,
        ``resources``, ``in_danger``, ``repair_targets``, ``rebuild_targets``
        and ``allowed`` (each falls back to a default when the text lacks it).
        ``enemy_treasury_id``/``enemy_treasury_pos``,
        ``home_treasury_id``/``home_treasury_pos``,
        ``cannon_id``/``cannon_operator`` and ``home_house_pos`` are present
        only when the matching sentence is found.
    """

    def search(pattern: str, flags: int = 0):
        # First match of ``pattern`` anywhere in the briefing, or None.
        return re.search(pattern, content, flags)

    def first_int(pattern: str, fallback: int, flags: int = 0) -> int:
        # Integer value of capture group 1, or ``fallback`` without a match.
        found = search(pattern, flags)
        return int(found.group(1)) if found else fallback

    def xz(found, x_group: int, z_group: int) -> tuple[float, float]:
        # (X, Z) coordinate pair taken from two capture groups of a match.
        return float(found.group(x_group)), float(found.group(z_group))

    facts: dict[str, object] = {}

    # Who the NPC is.
    identity = search(r"YOU are .*?\((\S+?)\), a (\w+)")
    facts["npc_id"] = identity.group(1) if identity else "unknown"
    facts["role"] = identity.group(2) if identity else "gatherer"

    # Turn counter and vital stats.
    facts["tick"] = first_int(r"Turn (\d+)", 0)
    facts["health"] = first_int(r"Health (\d+)/100", 100)
    facts["hunger"] = first_int(r"Hunger (\d+)/100", 0)

    # Inventory: "<amount> <resource>" pairs inside the "Carrying:" sentence.
    carrying = search(r"Carrying: (.+?)\.")
    facts["inventory"] = (
        {
            item: int(quantity)
            for quantity, item in re.findall(
                r"(\d+) (food|herbs|wood|weapon|coins)", carrying.group(1)
            )
        }
        if carrying
        else {}
    )

    # Own position; the origin is used when the briefing does not state one.
    here = search(r"YOUR POSITION: X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)")
    facts["pos"] = xz(here, 1, 2) if here else (0.0, 0.0)

    facts["is_ruler"] = "Ruler: you." in content

    # Enemy treasury: id/position are optional, the coin count defaults to 0.
    enemy = search(
        r"enemy treasury \"(\S+?)\" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)"
    )
    if enemy:
        facts["enemy_treasury_id"] = enemy.group(1)
        facts["enemy_treasury_pos"] = xz(enemy, 2, 3)
    facts["enemy_coins"] = first_int(r"enemy treasury.*?Holds (\d+) coins", 0, re.S)

    # Home treasury.
    home = search(
        r"Home treasury \"(\S+?)\" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)"
    )
    if home:
        facts["home_treasury_id"] = home.group(1)
        facts["home_treasury_pos"] = xz(home, 2, 3)

    # Cannon and its operator; the literal "none" is mapped to None.
    cannon = search(r"Your cannon \"(\S+?)\".*?operator: (\S+?)\)")
    if cannon:
        operator = cannon.group(2)
        facts["cannon_id"] = cannon.group(1)
        facts["cannon_operator"] = operator if operator != "none" else None

    # Election state.
    facts["voting_open"] = (
        "ELECTION IN PROGRESS" in content and "You have NOT voted" in content
    )
    candidate_list = search(r"Candidates: (.+?)\.")
    if candidate_list:
        facts["candidates"] = [
            entry.strip() for entry in candidate_list.group(1).split(",")
        ]
    else:
        facts["candidates"] = []

    # Bulleted entity sections.
    facts["threats"] = _bullets_section(content, "THREATS NEAR YOU")
    facts["allies"] = _bullets_section(content, "ALLIES NEAR YOU")
    facts["resources"] = _bullets_section(content, "RESOURCES NEAR YOU")

    facts["in_danger"] = "*** DANGER" in content

    # "YOUR BASE" section: collect damaged/destroyed structure ids and the
    # first bullet that carries a position and mentions "your home" or "safe".
    damaged: list[str] = []
    destroyed: list[str] = []
    inside_base = False
    for raw in content.splitlines():
        if raw.startswith("YOUR BASE"):
            inside_base = True
            continue
        if not inside_base:
            continue
        text = raw.strip()
        if not text.startswith("- "):
            # A blank or unindented non-bullet line closes the section;
            # indented continuation lines are ignored.
            if text == "" or not raw.startswith(" "):
                inside_base = False
            continue
        quoted = re.search(r'"(\S+?)"', raw)
        located = re.search(
            r'"(\S+?)" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)', raw
        )
        if quoted and "DAMAGED" in raw:
            damaged.append(quoted.group(1))
        elif quoted and "DESTROYED" in raw:
            destroyed.append(quoted.group(1))
        mentions_home = "your home" in raw or "safe" in raw
        if located and mentions_home and "home_house_pos" not in facts:
            facts["home_house_pos"] = xz(located, 2, 3)
    facts["repair_targets"] = damaged
    facts["rebuild_targets"] = destroyed

    # Permitted actions; only move/speak are assumed when none are listed.
    permitted = search(r"ACTIONS YOU CAN TAKE: (.+?)\.")
    if permitted:
        facts["allowed"] = {verb.strip() for verb in permitted.group(1).split(",")}
    else:
        facts["allowed"] = {"move", "speak"}
    return facts
|
|
|
|
def _bullets_section(content: str, header: str) -> list[dict[str, object]]:
    """Collect the quoted-id bullet lines listed under ``header``.

    Scanning starts after the first line that begins with ``header`` and stops
    at the first non-bullet line that is blank or not indented. Bullets
    without a double-quoted id are skipped.

    Args:
        content: Full briefing text.
        header: Prefix of the line that opens the section.

    Returns:
        One dict per bullet with ``id`` (the first quoted token),
        ``can_attack`` (line contains "IN ATTACK RANGE") and ``gatherable``
        (line contains "close enough to gather"). Empty when the header is
        absent.
    """
    entries: list[dict[str, object]] = []
    rows = iter(content.splitlines())

    # Advance to the header line; without one there is nothing to collect.
    for row in rows:
        if row.startswith(header):
            break
    else:
        return entries

    for row in rows:
        if row.startswith(header):
            # A repeated header line is skipped rather than ending the section.
            continue
        stripped = row.strip()
        if stripped.startswith("- "):
            quoted = re.search(r"\"(\S+?)\"", row)
            if quoted:
                entries.append(
                    {
                        "id": quoted.group(1),
                        "can_attack": "IN ATTACK RANGE" in row,
                        "gatherable": "close enough to gather" in row,
                    }
                )
        elif stripped == "" or not row.startswith(" "):
            break
    return entries
|
|
|
|
| |
| |
# Speech templates for the scripted model. A line is chosen by index and its
# ``{ally}`` placeholder is filled in with ``str.format`` when it is spoken.

# Lines spoken by an NPC that is currently the ruler.
_RULER_ORDERS: list[str] = [
    "{ally}, gather food and bank your coins in our treasury.",
    "Builders, repair our homes before the beast comes back.",
    "Guards, hold the line and keep the families safe.",
    "Bring your wages home, {ally} - our nation must grow rich.",
    "Stay near the houses tonight; we raise the next generation.",
    "{ally}, watch the eastern approach and report what you see.",
]

# Small talk for every other NPC.
_CITIZEN_CHATTER: list[str] = [
    "Hauling my coins to the treasury, {ally}.",
    "Careful, {ally} - a beast was prowling nearby.",
    "I'll gather what I can for us today.",
    "Our home could use repairs soon.",
    "Stay safe out there, {ally}.",
    "Good harvest today - the treasury grows.",
]
|
|
|
|
def _stable_seed(npc_id: str, tick: int) -> int:
    """Return a 32-bit per-NPC, per-tick seed that is the same in every process.

    The built-in ``hash()`` of a ``str`` is salted per interpreter process, so
    it cannot drive a reproducible run; CRC32 of the UTF-8 id is used instead
    and mixed with the tick number.

    Args:
        npc_id: Identifier of the NPC the decision is for.
        tick: Turn number taken from the briefing.

    Returns:
        An integer in ``[0, 0xFFFFFFFF]``.
    """
    import zlib  # local import: zlib is not among this file's top-level imports

    return (zlib.crc32(npc_id.encode("utf-8")) ^ (tick * 2654435761)) & 0xFFFFFFFF


def make_fake_completer():
    """Return a completer that decides one action from the briefing text.

    This stands in for a well-prompted model: it follows the role mandates in
    the briefing (rulers govern, builders repair, guards protect, everyone
    banks coins and flees lethal danger) so the offline ledger shows a living
    society. The returned callable keeps no mutable state between calls, and
    its choices depend only on the briefing text, so a run is reproducible.

    Returns:
        ``complete(request)``: takes a chat-completion request dict (reads the
        last ``user`` message in ``request["messages"]``) and returns a
        chat-completion response dict holding one tool call, or an "unusable"
        response with no tool call.
    """

    def _say(pool: list[str], seed: int, ally_id: str | None, target: str | None):
        # Pick a template by seed; "{ally}" falls back to "friend" when the
        # speaker has nobody nearby.
        line = pool[seed % len(pool)].format(ally=ally_id or "friend")
        args: dict[str, object] = {"message": line, "intent": "social"}
        if target is not None:
            args["target_id"] = target
        return _tool_response("speak", args)

    def _deposit(treasury_id: object, resource: str, amount: int):
        # Hand ``amount`` of ``resource`` over to the given treasury.
        return _tool_response(
            "transfer",
            {"target_id": treasury_id, "resource_type": resource, "amount": amount},
        )

    def _gather(resource_id: object):
        return _tool_response("use", {"use_type": "gather", "resource_id": resource_id})

    def complete(request: dict[str, object]) -> dict[str, object]:
        messages = request.get("messages") or []
        user = next((m for m in reversed(messages) if m.get("role") == "user"), None)
        if user is None:
            return _unusable_response()
        f = _parse_briefing(user["content"])

        npc_id = str(f["npc_id"])
        role = str(f["role"])
        tick = int(f["tick"])
        allowed = set(f["allowed"])
        inv = dict(f["inventory"])
        threats = list(f["threats"])
        allies = list(f["allies"])
        resources = list(f["resources"])
        coins = int(inv.get("coins", 0))
        ally_id = allies[0]["id"] if allies else None
        own_x, own_z = f["pos"]
        has_home_treasury = "home_treasury_id" in f
        # Process-independent seed (see _stable_seed) so that identical
        # briefings always lead to the identical decision.
        seed = _stable_seed(npc_id, tick)

        # Roughly one reply in 47 is deliberately unusable so that the
        # connector's fallback path is exercised.
        if seed % 47 == 0:
            return _unusable_response()

        # An open election the NPC has not voted in takes priority.
        if "vote" in allowed and f["voting_open"] and f["candidates"]:
            candidates = list(f["candidates"])
            return _tool_response(
                "vote", {"candidate_id": candidates[seed % len(candidates)]}
            )

        # Lethal danger: either call for help or run away.
        if f["in_danger"]:
            if seed % 2 == 0:
                return _tool_response(
                    "speak",
                    {"message": "Help! A beast is on me!", "intent": "help_request"},
                )
            return _tool_response("move", {"away": True})

        # Self-care: eat when hungry, heal when hurt, if supplies are carried.
        if int(f["hunger"]) >= 45 and inv.get("food", 0) > 0:
            return _tool_response("use", {"use_type": "eat", "resource_type": "food"})
        if int(f["health"]) < 55 and inv.get("herbs", 0) > 0:
            return _tool_response("use", {"use_type": "heal", "resource_type": "herbs"})

        # Ruler: obtain a cannon, staff it, bank coins, otherwise give orders.
        if f["is_ruler"]:
            # NOTE(review): ``>= 0`` is always true for a parsed coin count, so
            # the craft is attempted whenever no cannon is listed; any cost
            # check is presumably left to the engine - confirm.
            if "cannon_id" not in f and inv.get("coins", 0) >= 0:
                return _tool_response(
                    "use", {"use_type": "craft", "params": {"recipe_id": "cannon"}}
                )
            if "cannon_id" in f and f.get("cannon_operator") is None and ally_id:
                return _tool_response(
                    "use",
                    {"use_type": "assign_cannon_operator", "params": {"npc_id": ally_id}},
                )
            if coins > 0 and has_home_treasury:
                return _deposit(f["home_treasury_id"], "coins", coins)
            return _say(_RULER_ORDERS, seed, ally_id, ally_id)

        # Guard: fight threats, bank wages, occasionally raid the enemy
        # treasury, otherwise gather or hold position.
        if role == "guard":
            if threats:
                return _tool_response("attack", {"target_entity_id": threats[0]["id"]})
            if coins >= 3 and has_home_treasury:
                return _deposit(f["home_treasury_id"], "coins", coins)
            if (
                "enemy_treasury_pos" in f
                and int(f["enemy_coins"]) > 0
                and not f["repair_targets"]
                and not f["rebuild_targets"]
            ):
                ex, ez = f["enemy_treasury_pos"]
                # Within 6 units (Manhattan distance) the guard may steal.
                if abs(ex - own_x) + abs(ez - own_z) <= 6 and seed % 3 == 0:
                    return _tool_response(
                        "transfer",
                        {
                            "target_id": f["enemy_treasury_id"],
                            "resource_type": "coins",
                            "amount": 2,
                            "take": True,
                        },
                    )
                if seed % 11 == 0:
                    return _tool_response("move", {"x": ex, "z": ez})
            if resources:
                return _gather(resources[0]["id"])
            return _tool_response("move", {"x": own_x, "z": own_z})

        # Builder: repair, rebuild, bank wages, stock up on wood, build.
        # Falls through to the generic behaviour when none of these apply.
        if role == "builder":
            if f["repair_targets"]:
                return _tool_response("use", {"use_type": "repair"})
            if f["rebuild_targets"]:
                return _tool_response("use", {"use_type": "build"})
            if coins >= 3 and has_home_treasury:
                return _deposit(f["home_treasury_id"], "coins", coins)
            wood_nodes = [r for r in resources if "wood" in str(r.get("id", ""))]
            if wood_nodes:
                return _gather(wood_nodes[0]["id"])
            if inv.get("wood", 0) >= 5:
                return _tool_response("use", {"use_type": "build"})
            if resources:
                return _gather(resources[0]["id"])

        # Everyone else (and idle builders): bank coins once two are carried.
        if coins >= 2 and has_home_treasury:
            return _deposit(f["home_treasury_id"], "coins", coins)

        # Now and then donate one unit of a surplus resource to the treasury.
        if has_home_treasury and seed % 6 == 0:
            for res in ("wood", "food", "herbs"):
                if inv.get(res, 0) > 1:
                    return _deposit(f["home_treasury_id"], res, 1)

        # Healthy, fed, safe and with nothing to bank: sometimes head home.
        if (
            int(f["health"]) >= 85
            and int(f["hunger"]) <= 35
            and not threats
            and coins == 0
            and "home_house_pos" in f
            and seed % 2 == 0
        ):
            hx, hz = f["home_house_pos"]
            return _tool_response("move", {"x": hx, "z": hz})

        # Mostly gather; occasionally chat with a nearby ally instead.
        if resources and seed % 7 != 0:
            return _gather(resources[0]["id"])
        if allies and seed % 5 == 0:
            return _say(_CITIZEN_CHATTER, seed, ally_id, ally_id)
        if resources:
            return _gather(resources[0]["id"])

        # Nothing else to do: wander a few units from the current position.
        x = own_x + ((seed % 7) - 3)
        z = own_z + ((seed % 5) - 2)
        return _tool_response("move", {"x": x, "z": z})

    return complete
|
|
|
|
def build_runtime(config_path: Path) -> GameRuntime:
    """Assemble a ``GameRuntime`` whose connectors talk to the scripted model.

    Every secondary connector of type ``"openai_compatible"`` gets its own
    ``OpenAICompatibleWorldSimulator`` route keyed (and filtered) by its
    connector id; the main connector becomes the unfiltered default route.
    All of them share one fake completer and one deterministic fallback.

    Args:
        config_path: Path of the game config file to load.

    Returns:
        A runtime wired to a ``RoutingWorldSimulator`` over those connectors.
    """
    config = load_game_config(config_path)
    world = create_world(config)
    fake_model = make_fake_completer()
    fallback_sim = DeterministicWorldSimulator()

    def _connector(connector_cfg, route_id):
        # Connector that calls the injected completer instead of an endpoint.
        return OpenAICompatibleWorldSimulator(
            connector_cfg,
            fallback=fallback_sim,
            connector_id_filter=route_id,
            chat_completer=fake_model,
        )

    routes = {}
    for connector_id, connector_cfg in config.secondary_connectors.items():
        if connector_cfg.type != "openai_compatible":
            continue
        routes[connector_id] = _connector(connector_cfg, connector_id)

    default_route = _connector(config.connector, None)
    router = RoutingWorldSimulator(routes=routes, default=default_route)
    return GameRuntime(world=world, simulator=router, config=config)
|
|
|
|
def main() -> None:
    """Run the offline self-play from the command line and print a summary.

    Options: ``--ticks`` (number of ticks to run, default 150) and
    ``--config`` (config file, default ``config/game.modal.local.json`` under
    the repository root). The loop stops early at the first tick whose status
    is not 200; the ledger is summarised either way.
    """
    cli = argparse.ArgumentParser(description="Offline LLM-shaped self-play.")
    cli.add_argument("--ticks", type=int, default=150)
    cli.add_argument(
        "--config",
        type=Path,
        default=REPO_ROOT / "config" / "game.modal.local.json",
    )
    options = cli.parse_args()

    runtime = build_runtime(options.config)
    print(f"SELFPLAY start config={options.config.name} ticks={options.ticks} simulator={runtime.simulator_name}")

    completed = 0
    while completed < options.ticks:
        status, _ignored = runtime.tick()
        if int(status) != 200:
            print(f"tick failed: {status}")
            break
        completed += 1

    # The ledger location is read from the runtime's private ledger object.
    ledger_path = runtime._ledger.ledger_path
    print(f"SELFPLAY done. ledger={ledger_path}")
    _summarize(ledger_path)
|
|
|
|
def _summarize(ledger_path: Path) -> None:
    """Print tallies of the JSON-lines ledger stored at ``ledger_path``.

    Each non-blank line is parsed as one JSON record. The report lists, most
    common first: records per ``phase``; the ``model_profile::model`` labels
    seen per NPC in ``npc_request`` records; validator verdict statuses and
    parsed actions from ``npc_response`` records; ``npc_fallback`` reasons;
    and event types from ``engine_events`` records.

    Args:
        ledger_path: Ledger file to read (UTF-8, one JSON object per line).
    """
    phase_counts: Counter[str] = Counter()
    verdict_counts: Counter[str] = Counter()
    action_counts: Counter[str] = Counter()
    fallback_counts: Counter[str] = Counter()
    event_counts: Counter[str] = Counter()
    npc_models: dict[str, set[str]] = {}

    with ledger_path.open("r", encoding="utf-8") as ledger:
        for raw in ledger:
            if raw.strip() == "":
                continue
            record = json.loads(raw)
            phase = record.get("phase", "?")
            phase_counts[phase] += 1

            if phase == "npc_request":
                label = f"{record.get('model_profile')}::{record.get('model')}"
                npc_models.setdefault(str(record.get("npc_id")), set()).add(label)
            elif phase == "npc_response":
                status = (record.get("validator_verdict") or {}).get("status")
                verdict_counts[str(status)] += 1
                parsed = record.get("parsed_action") or {}
                if isinstance(parsed, dict) and parsed.get("action"):
                    action_counts[str(parsed["action"])] += 1
            elif phase == "npc_fallback":
                fallback_counts[str(record.get("reason"))] += 1
            elif phase == "engine_events":
                event_counts.update(
                    str(event.get("type")) for event in record.get("events") or []
                )

    def _report(header: str, tally: Counter) -> None:
        # One header line, then "<label>: <count>" rows, most common first.
        print(header)
        for label, total in tally.most_common():
            print(f" {label}: {total}")

    _report("\n== ledger phases ==", phase_counts)
    print("\n== model per NPC (from npc_request) ==")
    for npc_id in sorted(npc_models):
        print(f" {npc_id}: {sorted(npc_models[npc_id])}")
    _report("\n== validator verdicts (npc_response) ==", verdict_counts)
    _report("\n== parsed LLM actions ==", action_counts)
    _report("\n== fallback reasons ==", fallback_counts)
    _report("\n== engine event types ==", event_counts)
|
|
|
|
# Run the self-play harness only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|