world-simulator / scripts /selfplay_sim.py
kikikita's picture
Enhance survival system with economic mechanics and role-specific directives
ef618e5
Raw
History Blame Contribute Delete
20.2 kB
"""LLM-shaped self-play harness.
Drives the *real* runtime pipeline (RoutingWorldSimulator -> OpenAICompatible
connectors -> ledger) but injects a deterministic, behaviour-rich "model"
instead of calling a remote endpoint. This makes a full 100-300 tick self-play
reproducible offline while still exercising:
* per-NPC npc_request / npc_response ledger records with model attribution;
* the Qwen secondary connector routing (qwen-* NPCs) vs the default connector
(nemotron NPCs), each logged with its own model/model_profile;
* the fallback path (a few "unusable" model outputs) being explicitly logged;
* elections + vote, treasury deposit/steal via transfer, cannon craft via use,
speak, gather, eat/heal — the full action surface.
Usage:
.venv\\Scripts\\python.exe scripts\\selfplay_sim.py --ticks 150
"""
from __future__ import annotations
import argparse
import json
import re
from collections import Counter
from pathlib import Path
import sys
REPO_ROOT = Path(__file__).resolve().parents[1]
SRC_ROOT = REPO_ROOT / "src"
if str(SRC_ROOT) not in sys.path:
sys.path.insert(0, str(SRC_ROOT))
from world_simulator.api.runtime import GameRuntime # noqa: E402
from world_simulator.config import load_game_config # noqa: E402
from world_simulator.simulation.connectors.deterministic import ( # noqa: E402
DeterministicWorldSimulator,
)
from world_simulator.simulation.connectors.openai_compatible import ( # noqa: E402
OpenAICompatibleWorldSimulator,
)
from world_simulator.simulation.connectors.routing import RoutingWorldSimulator # noqa: E402
from world_simulator.simulation.spawning import create_world # noqa: E402
def _tool_response(name: str, arguments: dict[str, object]) -> dict[str, object]:
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_0",
"type": "function",
"function": {
"name": name,
"arguments": json.dumps(arguments),
},
}
],
}
}
]
}
def _unusable_response() -> dict[str, object]:
# Neither tool calls nor JSON content -> connector logs a fallback explicitly.
return {"choices": [{"message": {"role": "assistant", "content": ""}}]}
def _parse_briefing(content: str) -> dict[str, object]:
"""Extract the structured facts a scripted agent needs from the text briefing.
The briefing is plain language for the LLM; this parser doubles as a check
that every coordinate/id needed to act is actually present in it.
"""
facts: dict[str, object] = {}
m = re.search(r"YOU are .*?\((\S+?)\), a (\w+)", content)
facts["npc_id"] = m.group(1) if m else "unknown"
facts["role"] = m.group(2) if m else "gatherer"
m = re.search(r"Turn (\d+)", content)
facts["tick"] = int(m.group(1)) if m else 0
m = re.search(r"Health (\d+)/100", content)
facts["health"] = int(m.group(1)) if m else 100
m = re.search(r"Hunger (\d+)/100", content)
facts["hunger"] = int(m.group(1)) if m else 0
inv: dict[str, int] = {}
m = re.search(r"Carrying: (.+?)\.", content)
if m:
for amount, name in re.findall(r"(\d+) (food|herbs|wood|weapon|coins)", m.group(1)):
inv[name] = int(amount)
facts["inventory"] = inv
m = re.search(r"YOUR POSITION: X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)", content)
facts["pos"] = (float(m.group(1)), float(m.group(2))) if m else (0.0, 0.0)
facts["is_ruler"] = "Ruler: you." in content
m = re.search(
r"enemy treasury \"(\S+?)\" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)", content
)
if m:
facts["enemy_treasury_id"] = m.group(1)
facts["enemy_treasury_pos"] = (float(m.group(2)), float(m.group(3)))
m = re.search(r"enemy treasury.*?Holds (\d+) coins", content, re.S)
facts["enemy_coins"] = int(m.group(1)) if m else 0
m = re.search(
r"Home treasury \"(\S+?)\" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)", content
)
if m:
facts["home_treasury_id"] = m.group(1)
facts["home_treasury_pos"] = (float(m.group(2)), float(m.group(3)))
m = re.search(r"Your cannon \"(\S+?)\".*?operator: (\S+?)\)", content)
if m:
facts["cannon_id"] = m.group(1)
facts["cannon_operator"] = None if m.group(2) == "none" else m.group(2)
facts["voting_open"] = "ELECTION IN PROGRESS" in content and "You have NOT voted" in content
m = re.search(r"Candidates: (.+?)\.", content)
facts["candidates"] = (
[c.strip() for c in m.group(1).split(",")] if m else []
)
facts["threats"] = _bullets_section(content, "THREATS NEAR YOU")
facts["allies"] = _bullets_section(content, "ALLIES NEAR YOU")
facts["resources"] = _bullets_section(content, "RESOURCES NEAR YOU")
# Self-preservation override line printed only when hurt with a threat on you.
facts["in_danger"] = "*** DANGER" in content
# BASE STATUS: which homes need a builder, and where home is (to rest/breed).
repair_targets: list[str] = []
rebuild_targets: list[str] = []
in_base = False
for line in content.splitlines():
if line.startswith("YOUR BASE"):
in_base = True
continue
if in_base:
if line.strip().startswith("- "):
mid = re.search(r'"(\S+?)"', line)
mpos = re.search(
r'"(\S+?)" at X=(-?\d+(?:\.\d+)?) Z=(-?\d+(?:\.\d+)?)', line
)
if mid and "DAMAGED" in line:
repair_targets.append(mid.group(1))
elif mid and "DESTROYED" in line:
rebuild_targets.append(mid.group(1))
if (
mpos
and ("your home" in line or "safe" in line)
and "home_house_pos" not in facts
):
facts["home_house_pos"] = (float(mpos.group(2)), float(mpos.group(3)))
elif line.strip() == "" or not line.startswith(" "):
in_base = False
facts["repair_targets"] = repair_targets
facts["rebuild_targets"] = rebuild_targets
m = re.search(r"ACTIONS YOU CAN TAKE: (.+?)\.", content)
facts["allowed"] = (
{a.strip() for a in m.group(1).split(",")} if m else {"move", "speak"}
)
return facts
def _bullets_section(content: str, header: str) -> list[dict[str, object]]:
"""Parse the " - <id> ..." bullet lines under a section header."""
lines = content.splitlines()
items: list[dict[str, object]] = []
capturing = False
for line in lines:
if line.startswith(header):
capturing = True
continue
if capturing:
if line.strip().startswith("- "):
m = re.search(r"\"(\S+?)\"", line)
if m:
items.append(
{
"id": m.group(1),
"can_attack": "IN ATTACK RANGE" in line,
"gatherable": "close enough to gather" in line,
}
)
elif line.strip() == "" or not line.startswith(" "):
break
return items
# Varied speech pools so NPCs sound alive instead of chanting one war cry. The
# old single hard-coded line was exactly the "zombie raider spam" we are fixing.
_RULER_ORDERS = [
"{ally}, gather food and bank your coins in our treasury.",
"Builders, repair our homes before the beast comes back.",
"Guards, hold the line and keep the families safe.",
"Bring your wages home, {ally} - our nation must grow rich.",
"Stay near the houses tonight; we raise the next generation.",
"{ally}, watch the eastern approach and report what you see.",
]
_CITIZEN_CHATTER = [
"Hauling my coins to the treasury, {ally}.",
"Careful, {ally} - a beast was prowling nearby.",
"I'll gather what I can for us today.",
"Our home could use repairs soon.",
"Stay safe out there, {ally}.",
"Good harvest today - the treasury grows.",
]
def make_fake_completer():
"""Return a thread-safe completer that decides one action from the briefing.
This stands in for a well-prompted model: it follows the role mandates in the
new briefing (rulers govern, builders repair, guards protect, everyone banks
coins and flees lethal danger) so the offline ledger shows a living society.
"""
def _say(pool: list[str], seed: int, ally_id: str | None, target: str | None):
line = pool[seed % len(pool)].format(ally=ally_id or "friend")
args: dict[str, object] = {"message": line, "intent": "social"}
if target is not None:
args["target_id"] = target
return _tool_response("speak", args)
def complete(request: dict[str, object]) -> dict[str, object]:
messages = request.get("messages") or []
user = next((m for m in reversed(messages) if m.get("role") == "user"), None)
if user is None:
return _unusable_response()
f = _parse_briefing(user["content"])
npc_id = str(f["npc_id"])
role = str(f["role"])
tick = int(f["tick"])
allowed = set(f["allowed"]) # type: ignore[arg-type]
inv = dict(f["inventory"]) # type: ignore[arg-type]
threats = list(f["threats"]) # type: ignore[arg-type]
allies = list(f["allies"]) # type: ignore[arg-type]
resources = list(f["resources"]) # type: ignore[arg-type]
coins = int(inv.get("coins", 0))
ally_id = allies[0]["id"] if allies else None
own_x, own_z = f["pos"] # type: ignore[misc]
seed = (hash(npc_id) ^ (tick * 2654435761)) & 0xFFFFFFFF
# Rarely emit an unusable output so the fallback path is visibly logged.
if seed % 47 == 0:
return _unusable_response()
# --- elections: vote when the window is open and we have not voted ---
if "vote" in allowed and f["voting_open"] and f["candidates"]:
candidates = list(f["candidates"]) # type: ignore[arg-type]
return _tool_response("vote", {"candidate_id": candidates[seed % len(candidates)]})
# --- SELF-PRESERVATION first: hurt + a threat on you -> flee or shout ---
if f["in_danger"]:
if seed % 2 == 0:
return _tool_response(
"speak",
{"message": "Help! A beast is on me!", "intent": "help_request"},
)
return _tool_response("move", {"away": True})
# --- survival upkeep ---
if int(f["hunger"]) >= 45 and inv.get("food", 0) > 0:
return _tool_response("use", {"use_type": "eat", "resource_type": "food"})
if int(f["health"]) < 55 and inv.get("herbs", 0) > 0:
return _tool_response("use", {"use_type": "heal", "resource_type": "herbs"})
# --- RULER: govern; never personally raid ---
if f["is_ruler"]:
if "cannon_id" not in f and inv.get("coins", 0) >= 0:
return _tool_response("use", {"use_type": "craft", "params": {"recipe_id": "cannon"}})
if "cannon_id" in f and f.get("cannon_operator") is None and ally_id:
return _tool_response(
"use",
{"use_type": "assign_cannon_operator", "params": {"npc_id": ally_id}},
)
if coins > 0 and "home_treasury_id" in f:
return _tool_response(
"transfer",
{"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
)
# Command the nation: address citizens by name with varied orders.
return _say(_RULER_ORDERS, seed, ally_id, ally_id)
# --- GUARD: protect allies and homes; raid only when home is safe ---
if role == "guard":
if threats:
return _tool_response("attack", {"target_entity_id": threats[0]["id"]})
if coins >= 3 and "home_treasury_id" in f:
return _tool_response(
"transfer",
{"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
)
if (
"enemy_treasury_pos" in f
and int(f["enemy_coins"]) > 0
and not f["repair_targets"]
and not f["rebuild_targets"]
):
ex, ez = f["enemy_treasury_pos"] # type: ignore[misc]
if abs(ex - own_x) + abs(ez - own_z) <= 6 and seed % 3 == 0:
return _tool_response(
"transfer",
{
"target_id": f["enemy_treasury_id"],
"resource_type": "coins",
"amount": 2,
"take": True,
},
)
if seed % 11 == 0:
return _tool_response("move", {"x": ex, "z": ez})
# Otherwise stay home and stand guard / earn.
if resources:
return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
return _tool_response("move", {"x": own_x, "z": own_z})
# --- BUILDER: repair/raise homes before anything else ---
if role == "builder":
if f["repair_targets"]:
return _tool_response("use", {"use_type": "repair"})
if f["rebuild_targets"]:
return _tool_response("use", {"use_type": "build"})
if coins >= 3 and "home_treasury_id" in f:
return _tool_response(
"transfer",
{"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
)
wood_nodes = [r for r in resources if "wood" in str(r.get("id", ""))]
if wood_nodes:
return _tool_response("use", {"use_type": "gather", "resource_id": wood_nodes[0]["id"]})
if inv.get("wood", 0) >= 5:
return _tool_response("use", {"use_type": "build"})
if resources:
return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
# --- bank wages: everyone carries coins home to enrich the nation ---
if coins >= 2 and "home_treasury_id" in f:
return _tool_response(
"transfer",
{"target_id": f["home_treasury_id"], "resource_type": "coins", "amount": coins},
)
# --- deposit other surplus occasionally ---
if "home_treasury_id" in f and seed % 6 == 0:
for res in ("wood", "food", "herbs"):
if inv.get(res, 0) > 1:
return _tool_response(
"transfer",
{"target_id": f["home_treasury_id"], "resource_type": res, "amount": 1},
)
# --- when safe, fed and healthy, rest at home to raise a family ---
if (
int(f["health"]) >= 85
and int(f["hunger"]) <= 35
and not threats
and coins == 0
and "home_house_pos" in f
and seed % 2 == 0
):
hx, hz = f["home_house_pos"] # type: ignore[misc]
return _tool_response("move", {"x": hx, "z": hz})
# --- gather to earn coins, with the occasional friendly word ---
if resources and seed % 7 != 0:
return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
if allies and seed % 5 == 0:
return _say(_CITIZEN_CHATTER, seed, ally_id, ally_id)
if resources:
return _tool_response("use", {"use_type": "gather", "resource_id": resources[0]["id"]})
x = own_x + ((seed % 7) - 3)
z = own_z + ((seed % 5) - 2)
return _tool_response("move", {"x": x, "z": z})
return complete
def build_runtime(config_path: Path) -> GameRuntime:
config = load_game_config(config_path)
world = create_world(config)
completer = make_fake_completer()
deterministic = DeterministicWorldSimulator()
routes = {
connector_id: OpenAICompatibleWorldSimulator(
connector_cfg,
fallback=deterministic,
connector_id_filter=connector_id,
chat_completer=completer,
)
for connector_id, connector_cfg in config.secondary_connectors.items()
if connector_cfg.type == "openai_compatible"
}
default = OpenAICompatibleWorldSimulator(
config.connector,
fallback=deterministic,
connector_id_filter=None,
chat_completer=completer,
)
simulator = RoutingWorldSimulator(routes=routes, default=default)
return GameRuntime(world=world, simulator=simulator, config=config)
def main() -> None:
parser = argparse.ArgumentParser(description="Offline LLM-shaped self-play.")
parser.add_argument("--ticks", type=int, default=150)
parser.add_argument("--config", type=Path, default=REPO_ROOT / "config" / "game.modal.local.json")
args = parser.parse_args()
runtime = build_runtime(args.config)
print(f"SELFPLAY start config={args.config.name} ticks={args.ticks} simulator={runtime.simulator_name}")
for _ in range(args.ticks):
status, _payload = runtime.tick()
if int(status) != 200:
print(f"tick failed: {status}")
break
ledger_path = runtime._ledger.ledger_path # noqa: SLF001 - harness introspection
print(f"SELFPLAY done. ledger={ledger_path}")
_summarize(ledger_path)
def _summarize(ledger_path: Path) -> None:
phases: Counter[str] = Counter()
models_by_npc: dict[str, set[str]] = {}
verdicts: Counter[str] = Counter()
event_types: Counter[str] = Counter()
actions: Counter[str] = Counter()
fallback_reasons: Counter[str] = Counter()
with ledger_path.open("r", encoding="utf-8") as handle:
for line in handle:
if not line.strip():
continue
rec = json.loads(line)
phase = rec.get("phase", "?")
phases[phase] += 1
if phase == "npc_request":
models_by_npc.setdefault(str(rec.get("npc_id")), set()).add(
f"{rec.get('model_profile')}::{rec.get('model')}"
)
if phase == "npc_response":
verdict = rec.get("validator_verdict") or {}
verdicts[str(verdict.get("status"))] += 1
parsed = rec.get("parsed_action") or {}
if isinstance(parsed, dict) and parsed.get("action"):
actions[str(parsed["action"])] += 1
if phase == "npc_fallback":
fallback_reasons[str(rec.get("reason"))] += 1
if phase == "engine_events":
for event in rec.get("events") or []:
event_types[str(event.get("type"))] += 1
print("\n== ledger phases ==")
for phase, count in phases.most_common():
print(f" {phase}: {count}")
print("\n== model per NPC (from npc_request) ==")
for npc_id in sorted(models_by_npc):
print(f" {npc_id}: {sorted(models_by_npc[npc_id])}")
print("\n== validator verdicts (npc_response) ==")
for status, count in verdicts.most_common():
print(f" {status}: {count}")
print("\n== parsed LLM actions ==")
for action, count in actions.most_common():
print(f" {action}: {count}")
print("\n== fallback reasons ==")
for reason, count in fallback_reasons.most_common():
print(f" {reason}: {count}")
print("\n== engine event types ==")
for event_type, count in event_types.most_common():
print(f" {event_type}: {count}")
if __name__ == "__main__":
main()