Spaces:
Runtime error
Runtime error
| """LLM-shaped self-play harness. | |
| Drives the *real* runtime pipeline (RoutingWorldSimulator -> OpenAICompatible | |
| connectors -> ledger) but injects a deterministic, behaviour-rich "model" | |
| instead of calling a remote endpoint. This makes a full 100-300 tick self-play | |
| reproducible offline while still exercising: | |
| * per-NPC npc_request / npc_response ledger records with model attribution; | |
| * the Qwen secondary connector routing (qwen-* NPCs) vs the default connector | |
| (nemotron NPCs), each logged with its own model/model_profile; | |
| * the fallback path (a few "unusable" model outputs) being explicitly logged; | |
| * elections + vote, treasury deposit/steal via transfer, cannon craft via use, | |
| speak, gather, eat/heal — the full action surface. | |
| Usage: | |
| .venv\\Scripts\\python.exe scripts\\selfplay_sim.py --ticks 150 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import re | |
| from collections import Counter | |
| from pathlib import Path | |
| import sys | |
| REPO_ROOT = Path(__file__).resolve().parents[1] | |
| SRC_ROOT = REPO_ROOT / "src" | |
| if str(SRC_ROOT) not in sys.path: | |
| sys.path.insert(0, str(SRC_ROOT)) | |
| from world_simulator.api.runtime import GameRuntime # noqa: E402 | |
| from world_simulator.config import load_game_config # noqa: E402 | |
| from world_simulator.simulation.connectors.deterministic import ( # noqa: E402 | |
| DeterministicWorldSimulator, | |
| ) | |
| from world_simulator.simulation.connectors.openai_compatible import ( # noqa: E402 | |
| OpenAICompatibleWorldSimulator, | |
| ) | |
| from world_simulator.simulation.connectors.routing import RoutingWorldSimulator # noqa: E402 | |
| from world_simulator.simulation.spawning import create_world # noqa: E402 | |
def _tool_response(name: str, arguments: dict[str, object]) -> dict[str, object]:
    """Build a chat-completion style payload that carries exactly one tool call.

    Args:
        name: Name of the tool/function being invoked.
        arguments: Tool arguments; serialised to a JSON string because the
            payload stores them under ``function.arguments`` as text.

    Returns:
        A dict with a single choice whose assistant message has empty
        ``content`` and one ``tool_calls`` entry with the fixed id ``call_0``.
    """
    function_part = {"name": name, "arguments": json.dumps(arguments)}
    call = {"id": "call_0", "type": "function", "function": function_part}
    message = {"role": "assistant", "content": "", "tool_calls": [call]}
    return {"choices": [{"message": message}]}
def _unusable_response() -> dict[str, object]:
    """Build a payload with no tool calls and empty content.

    With neither tool calls nor JSON content to act on, the connector is
    expected to log a fallback explicitly.
    """
    blank_message = {"role": "assistant", "content": ""}
    return {"choices": [{"message": blank_message}]}
def _parse_briefing(content: str) -> dict[str, object]:
    """Pull the facts a scripted agent needs out of the plain-text briefing.

    The briefing is written for an LLM; parsing it here also acts as a check
    that every id and coordinate required to act is really present in the text.

    Args:
        content: Full text of the briefing (the user message).

    Returns:
        A dict that always contains ``npc_id``, ``role``, ``tick``, ``health``,
        ``hunger``, ``inventory``, ``pos``, ``is_ruler``, ``enemy_coins``,
        ``voting_open``, ``candidates``, ``threats``, ``allies``,
        ``resources``, ``in_danger``, ``repair_targets``, ``rebuild_targets``
        and ``allowed``. The keys ``enemy_treasury_id``/``enemy_treasury_pos``,
        ``home_treasury_id``/``home_treasury_pos``,
        ``cannon_id``/``cannon_operator`` and ``home_house_pos`` are present
        only when the matching text was found.
    """

    def _first_int(pattern: str, default: int) -> int:
        # First capture group of ``pattern`` as an int, or ``default`` if absent.
        found = re.search(pattern, content)
        return int(found.group(1)) if found else default

    facts: dict[str, object] = {}

    # Identity: "(<npc_id>), a <role>"; defaults apply when the line is missing.
    identity = re.search(r"YOU are .*?\((\S+?)\), a (\w+)", content)
    if identity:
        facts["npc_id"] = identity.group(1)
        facts["role"] = identity.group(2)
    else:
        facts["npc_id"] = "unknown"
        facts["role"] = "gatherer"

    facts["tick"] = _first_int(r"Turn (\d+)", 0)
    facts["health"] = _first_int(r"Health (\d+)/100", 100)
    facts["hunger"] = _first_int(r"Hunger (\d+)/100", 0)

    # Inventory: "<amount> <resource>" pairs from the "Carrying:" sentence.
    carrying = re.search(r"Carrying: (.+?)\.", content)
    carried_pairs = (
        re.findall(r"(\d+) (food|herbs|wood|weapon|coins)", carrying.group(1))
        if carrying
        else []
    )
    facts["inventory"] = {item: int(quantity) for quantity, item in carried_pairs}

    here = re.search(r"YOUR POSITION: X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)", content)
    if here:
        facts["pos"] = (float(here.group(1)), float(here.group(2)))
    else:
        facts["pos"] = (0.0, 0.0)

    facts["is_ruler"] = "Ruler: you." in content

    enemy = re.search(
        r"enemy treasury \"(\S+?)\" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)", content
    )
    if enemy:
        facts["enemy_treasury_id"] = enemy.group(1)
        facts["enemy_treasury_pos"] = (float(enemy.group(2)), float(enemy.group(3)))
    # re.S lets the lazy ".*?" run across line breaks up to the first "Holds".
    hoard = re.search(r"enemy treasury.*?Holds (\d+) coins", content, re.S)
    facts["enemy_coins"] = int(hoard.group(1)) if hoard else 0

    home = re.search(
        r"Home treasury \"(\S+?)\" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)", content
    )
    if home:
        facts["home_treasury_id"] = home.group(1)
        facts["home_treasury_pos"] = (float(home.group(2)), float(home.group(3)))

    cannon = re.search(r"Your cannon \"(\S+?)\".*?operator: (\S+?)\)", content)
    if cannon:
        operator = cannon.group(2)
        facts["cannon_id"] = cannon.group(1)
        # The literal word "none" means the cannon has no operator.
        facts["cannon_operator"] = operator if operator != "none" else None

    facts["voting_open"] = "ELECTION IN PROGRESS" in content and "You have NOT voted" in content
    ballot = re.search(r"Candidates: (.+?)\.", content)
    if ballot:
        facts["candidates"] = [name.strip() for name in ballot.group(1).split(",")]
    else:
        facts["candidates"] = []

    facts["threats"] = _bullets_section(content, "THREATS NEAR YOU")
    facts["allies"] = _bullets_section(content, "ALLIES NEAR YOU")
    facts["resources"] = _bullets_section(content, "RESOURCES NEAR YOU")

    # Self-preservation override line printed only when hurt with a threat on you.
    facts["in_danger"] = "*** DANGER" in content

    # BASE STATUS: which homes need a builder, and where home is (to rest/breed).
    needs_repair: list[str] = []
    needs_rebuild: list[str] = []
    inside_base = False
    for raw in content.splitlines():
        if raw.startswith("YOUR BASE"):
            inside_base = True
            continue
        if not inside_base:
            continue
        if raw.strip().startswith("- "):
            quoted = re.search(r'"(\S+?)"', raw)
            located = re.search(
                r'"(\S+?)" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)', raw
            )
            if quoted and "DAMAGED" in raw:
                needs_repair.append(quoted.group(1))
            elif quoted and "DESTROYED" in raw:
                needs_rebuild.append(quoted.group(1))
            mentions_home = "your home" in raw or "safe" in raw
            # Only the first qualifying bullet sets the home position.
            if located and mentions_home and "home_house_pos" not in facts:
                facts["home_house_pos"] = (
                    float(located.group(2)),
                    float(located.group(3)),
                )
        elif raw.strip() == "" or not raw.startswith(" "):
            # A blank or unindented non-bullet line closes the base section.
            inside_base = False
    facts["repair_targets"] = needs_repair
    facts["rebuild_targets"] = needs_rebuild

    permitted = re.search(r"ACTIONS YOU CAN TAKE: (.+?)\.", content)
    if permitted:
        facts["allowed"] = {verb.strip() for verb in permitted.group(1).split(",")}
    else:
        facts["allowed"] = {"move", "speak"}
    return facts
def _bullets_section(content: str, header: str) -> list[dict[str, object]]:
    """Collect the quoted-id bullet lines listed under a section header.

    Capturing starts after the first line that begins with ``header`` and stops
    at the first following non-bullet line that is blank or not indented with
    a space. Bullets without a double-quoted id are skipped.

    Args:
        content: Full briefing text.
        header: Prefix that marks the start of the section.

    Returns:
        One dict per bullet with ``id`` (the first double-quoted token),
        ``can_attack`` (line contains "IN ATTACK RANGE") and ``gatherable``
        (line contains "close enough to gather").
    """
    entries: list[dict[str, object]] = []
    in_section = False
    for row in content.splitlines():
        if row.startswith(header):
            in_section = True
            continue
        if not in_section:
            continue
        stripped = row.strip()
        if stripped.startswith("- "):
            quoted = re.search(r"\"(\S+?)\"", row)
            if quoted is None:
                continue
            entry: dict[str, object] = {
                "id": quoted.group(1),
                "can_attack": "IN ATTACK RANGE" in row,
                "gatherable": "close enough to gather" in row,
            }
            entries.append(entry)
            continue
        if stripped == "" or not row.startswith(" "):
            break
    return entries
# Varied speech pools so NPCs sound alive instead of chanting one war cry. The
# old single hard-coded line was exactly the "zombie raider spam" we are fixing.
# Each entry is a str.format template; the optional "{ally}" placeholder is
# filled by the completer's _say helper with a nearby ally id, or "friend" when
# no ally is known.

# Orders a ruler addresses to citizens.
_RULER_ORDERS: list[str] = [
    "{ally}, gather food and bank your coins in our treasury.",
    "Builders, repair our homes before the beast comes back.",
    "Guards, hold the line and keep the families safe.",
    "Bring your wages home, {ally} - our nation must grow rich.",
    "Stay near the houses tonight; we raise the next generation.",
    "{ally}, watch the eastern approach and report what you see.",
]
# Small talk used by non-ruler NPCs.
_CITIZEN_CHATTER: list[str] = [
    "Hauling my coins to the treasury, {ally}.",
    "Careful, {ally} - a beast was prowling nearby.",
    "I'll gather what I can for us today.",
    "Our home could use repairs soon.",
    "Stay safe out there, {ally}.",
    "Good harvest today - the treasury grows.",
]
def make_fake_completer():
    """Return a completer that decides one action from the text briefing.

    This stands in for a well-prompted model: it follows the role mandates in
    the briefing (rulers govern, builders repair, guards protect, everyone
    banks coins and flees lethal danger) so the offline ledger shows a living
    society.

    The returned callable keeps no mutable state between calls: each decision
    depends only on the request it receives and the read-only speech pools.
    The per-call ``seed`` mixes a CRC-32 of the NPC id with the tick. The
    built-in ``hash()`` is deliberately avoided because string hashes are
    salted per interpreter start, which would make every run pick different
    actions and defeat the reproducibility this harness exists for.

    Returns:
        ``complete(request)``: takes a chat-completions style request dict
        (reads ``request["messages"]``) and returns a response dict holding
        either one tool call or an intentionally unusable empty message.
    """
    # Function-scope import keeps this block self-contained; only crc32 is used.
    import zlib

    def _say(pool: list[str], seed: int, ally_id: str | None, target: str | None):
        """Pick a line from ``pool`` by seed and wrap it in a ``speak`` tool call."""
        line = pool[seed % len(pool)].format(ally=ally_id or "friend")
        args: dict[str, object] = {"message": line, "intent": "social"}
        if target is not None:
            args["target_id"] = target
        return _tool_response("speak", args)

    def complete(request: dict[str, object]) -> dict[str, object]:
        """Choose one action for the NPC described by the latest user message."""
        messages = request.get("messages") or []
        # The briefing is the most recent user message.
        user = next((m for m in reversed(messages) if m.get("role") == "user"), None)
        if user is None:
            return _unusable_response()
        f = _parse_briefing(user["content"])
        npc_id = str(f["npc_id"])
        role = str(f["role"])
        tick = int(f["tick"])
        allowed = set(f["allowed"])  # type: ignore[arg-type]
        inv = dict(f["inventory"])  # type: ignore[arg-type]
        threats = list(f["threats"])  # type: ignore[arg-type]
        allies = list(f["allies"])  # type: ignore[arg-type]
        resources = list(f["resources"])  # type: ignore[arg-type]
        coins = int(inv.get("coins", 0))
        ally_id = allies[0]["id"] if allies else None
        own_x, own_z = f["pos"]  # type: ignore[misc]

        # Stable across processes: CRC-32 of the id, mixed with the tick and
        # truncated to 32 bits.
        id_digest = zlib.crc32(npc_id.encode("utf-8"))
        seed = (id_digest ^ (tick * 2654435761)) & 0xFFFFFFFF

        # Rarely emit an unusable output so the fallback path is visibly logged.
        if seed % 47 == 0:
            return _unusable_response()

        # --- elections: vote when the window is open and we have not voted ---
        if "vote" in allowed and f["voting_open"] and f["candidates"]:
            candidates = list(f["candidates"])  # type: ignore[arg-type]
            return _tool_response("vote", {"candidate_id": candidates[seed % len(candidates)]})

        # --- SELF-PRESERVATION first: hurt + a threat on you -> flee or shout ---
        if f["in_danger"]:
            if seed % 2 == 0:
                return _tool_response(
                    "speak",
                    {"message": "Help! A beast is on me!", "intent": "help_request"},
                )
            return _tool_response("move", {"away": True})

        # --- survival upkeep ---
        if int(f["hunger"]) >= 45 and inv.get("food", 0) > 0:
            return _tool_response("use", {"use_type": "eat", "resource_type": "food"})
        if int(f["health"]) < 55 and inv.get("herbs", 0) > 0:
            return _tool_response("use", {"use_type": "heal", "resource_type": "herbs"})

        # --- RULER: govern; never personally raid ---
        if f["is_ruler"]:
            # NOTE(review): the coin check below is always true (a count is
            # never negative); kept as-is because the intended cannon cost is
            # not visible here - confirm and tighten if a price applies.
            if "cannon_id" not in f and inv.get("coins", 0) >= 0:
                return _tool_response("use", {"use_type": "craft", "params": {"recipe_id": "cannon"}})
            if "cannon_id" in f and f.get("cannon_operator") is None and ally_id:
                return _tool_response(
                    "use",
                    {"use_type": "assign_cannon_operator", "params": {"npc_id": ally_id}},
                )
            if coins > 0 and "home_treasury_id" in f:
                return _tool_response(
                    "transfer",
                    {"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
                )
            # Command the nation: address citizens by name with varied orders.
            return _say(_RULER_ORDERS, seed, ally_id, ally_id)

        # --- GUARD: protect allies and homes; raid only when home is safe ---
        if role == "guard":
            if threats:
                return _tool_response("attack", {"target_entity_id": threats[0]["id"]})
            if coins >= 3 and "home_treasury_id" in f:
                return _tool_response(
                    "transfer",
                    {"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
                )
            if (
                "enemy_treasury_pos" in f
                and int(f["enemy_coins"]) > 0
                and not f["repair_targets"]
                and not f["rebuild_targets"]
            ):
                ex, ez = f["enemy_treasury_pos"]  # type: ignore[misc]
                # Within 6 units (Manhattan distance): occasionally take coins.
                if abs(ex - own_x) + abs(ez - own_z) <= 6 and seed % 3 == 0:
                    return _tool_response(
                        "transfer",
                        {
                            "target_id": f["enemy_treasury_id"],
                            "resource_type": "coins",
                            "amount": 2,
                            "take": True,
                        },
                    )
                # Otherwise only sometimes walk toward the enemy treasury.
                if seed % 11 == 0:
                    return _tool_response("move", {"x": ex, "z": ez})
            # Otherwise stay home and stand guard / earn.
            if resources:
                return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
            return _tool_response("move", {"x": own_x, "z": own_z})

        # --- BUILDER: repair/raise homes before anything else ---
        if role == "builder":
            if f["repair_targets"]:
                return _tool_response("use", {"use_type": "repair"})
            if f["rebuild_targets"]:
                return _tool_response("use", {"use_type": "build"})
            if coins >= 3 and "home_treasury_id" in f:
                return _tool_response(
                    "transfer",
                    {"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
                )
            # Wood nodes are recognised by "wood" appearing in the resource id.
            wood_nodes = [r for r in resources if "wood" in str(r.get("id", ""))]
            if wood_nodes:
                return _tool_response("use", {"use_type": "gather", "resource_id": wood_nodes[0]["id"]})
            if inv.get("wood", 0) >= 5:
                return _tool_response("use", {"use_type": "build"})
            if resources:
                return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
            # A builder with nothing to do falls through to the shared rules.

        # --- bank wages: everyone carries coins home to enrich the nation ---
        if coins >= 2 and "home_treasury_id" in f:
            return _tool_response(
                "transfer",
                {"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
            )

        # --- deposit other surplus occasionally ---
        if "home_treasury_id" in f and seed % 6 == 0:
            for res in ("wood", "food", "herbs"):
                if inv.get(res, 0) > 1:
                    return _tool_response(
                        "transfer",
                        {"target_id": f["home_treasury_id"], "resource_type": res, "amount": 1},
                    )

        # --- when safe, fed and healthy, rest at home to raise a family ---
        if (
            int(f["health"]) >= 85
            and int(f["hunger"]) <= 35
            and not threats
            and coins == 0
            and "home_house_pos" in f
            and seed % 2 == 0
        ):
            hx, hz = f["home_house_pos"]  # type: ignore[misc]
            return _tool_response("move", {"x": hx, "z": hz})

        # --- gather to earn coins, with the occasional friendly word ---
        if resources and seed % 7 != 0:
            return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
        if allies and seed % 5 == 0:
            return _say(_CITIZEN_CHATTER, seed, ally_id, ally_id)
        if resources:
            return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})

        # Nothing better to do: wander a few units away, offset derived from the seed.
        x = own_x + ((seed % 7) - 3)
        z = own_z + ((seed % 5) - 2)
        return _tool_response("move", {"x": x, "z": z})

    return complete
def build_runtime(config_path: Path) -> GameRuntime:
    """Assemble a GameRuntime whose connectors use the scripted fake completer.

    Every ``openai_compatible`` secondary connector in the config gets its own
    OpenAICompatibleWorldSimulator route; all other traffic goes to a default
    one built from ``config.connector``. All of them share one fake completer
    and one DeterministicWorldSimulator as fallback.

    Args:
        config_path: Path to the game configuration file to load.

    Returns:
        The runtime wired to a RoutingWorldSimulator over those connectors.
    """
    config = load_game_config(config_path)
    world = create_world(config)
    fake_model = make_fake_completer()
    scripted_fallback = DeterministicWorldSimulator()

    routes = {}
    for connector_id, connector_cfg in config.secondary_connectors.items():
        # Only OpenAI-compatible secondaries are routed; other types are skipped.
        if connector_cfg.type != "openai_compatible":
            continue
        routes[connector_id] = OpenAICompatibleWorldSimulator(
            connector_cfg,
            fallback=scripted_fallback,
            connector_id_filter=connector_id,
            chat_completer=fake_model,
        )

    default_route = OpenAICompatibleWorldSimulator(
        config.connector,
        fallback=scripted_fallback,
        connector_id_filter=None,
        chat_completer=fake_model,
    )
    router = RoutingWorldSimulator(routes=routes, default=default_route)
    return GameRuntime(world=world, simulator=router, config=config)
def main() -> None:
    """Run the offline self-play from the command line and print a ledger summary.

    Options: ``--ticks`` (int, default 150) and ``--config`` (path, default
    ``config/game.modal.local.json`` under the repository root). Ticking stops
    early at the first tick whose status is not 200; the ledger is summarised
    either way.
    """
    default_config = REPO_ROOT / "config" / "game.modal.local.json"
    cli = argparse.ArgumentParser(description="Offline LLM-shaped self-play.")
    cli.add_argument("--ticks", type=int, default=150)
    cli.add_argument("--config", type=Path, default=default_config)
    args = cli.parse_args()

    runtime = build_runtime(args.config)
    print(f"SELFPLAY start config={args.config.name} ticks={args.ticks} simulator={runtime.simulator_name}")

    for _ in range(args.ticks):
        status, _payload = runtime.tick()
        if int(status) == 200:
            continue
        print(f"tick failed: {status}")
        break

    ledger_path = runtime._ledger.ledger_path  # noqa: SLF001 - harness introspection
    print(f"SELFPLAY done. ledger={ledger_path}")
    _summarize(ledger_path)
def _summarize(ledger_path: Path) -> None:
    """Print per-category counts from a JSON-lines ledger file.

    Each non-blank line is parsed as one JSON record and tallied by its
    ``phase``. Further tallies are taken from ``npc_request`` (model profile
    and model per NPC), ``npc_response`` (validator verdict status and parsed
    action), ``npc_fallback`` (reason) and ``engine_events`` (event type).

    Args:
        ledger_path: Path to the ledger file, read as UTF-8 text.
    """
    phases: Counter[str] = Counter()
    verdicts: Counter[str] = Counter()
    actions: Counter[str] = Counter()
    fallback_reasons: Counter[str] = Counter()
    event_types: Counter[str] = Counter()
    models_by_npc: dict[str, set[str]] = {}

    def _print_tally(title: str, tally: Counter[str]) -> None:
        # One titled section, most frequent entries first.
        print(title)
        for key, count in tally.most_common():
            print(f" {key}: {count}")

    with ledger_path.open("r", encoding="utf-8") as handle:
        for raw in handle:
            if not raw.strip():
                continue
            rec = json.loads(raw)
            phase = rec.get("phase", "?")
            phases[phase] += 1
            # A record has exactly one phase, so the branches are exclusive.
            if phase == "npc_request":
                seen = models_by_npc.setdefault(str(rec.get("npc_id")), set())
                seen.add(f"{rec.get('model_profile')}::{rec.get('model')}")
            elif phase == "npc_response":
                verdict = rec.get("validator_verdict") or {}
                verdicts[str(verdict.get("status"))] += 1
                parsed = rec.get("parsed_action") or {}
                if isinstance(parsed, dict) and parsed.get("action"):
                    actions[str(parsed["action"])] += 1
            elif phase == "npc_fallback":
                fallback_reasons[str(rec.get("reason"))] += 1
            elif phase == "engine_events":
                for event in rec.get("events") or []:
                    event_types[str(event.get("type"))] += 1

    _print_tally("\n== ledger phases ==", phases)
    print("\n== model per NPC (from npc_request) ==")
    for npc_id in sorted(models_by_npc):
        print(f" {npc_id}: {sorted(models_by_npc[npc_id])}")
    _print_tally("\n== validator verdicts (npc_response) ==", verdicts)
    _print_tally("\n== parsed LLM actions ==", actions)
    _print_tally("\n== fallback reasons ==", fallback_reasons)
    _print_tally("\n== engine event types ==", event_types)
# Script entry point: run the self-play only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()