| from __future__ import annotations |
| import asyncio |
| import json |
| import random |
| import time |
|
|
| from .environment import Environment, GameConfig |
| from .environment.tile import Tile |
| from .environment.loot_tables import generate_chest_loot |
| from .agent import AgentState |
| from .agent.agent import Agent |
| from .agent.state import AbilityInstance |
| from .tools import ALL_TOOLS, TOOL_SCHEMAS |
|
|
|
|
class Engine:
    """Turn-based game driver pairing an ``Environment`` with per-agent brains.

    ``env`` holds the world state (grid, agent states, turn counter) and
    ``agents`` maps agent ids to the ``Agent`` objects that choose tool calls.
    ``last_turn_log`` holds the timing/exec log of the most recent completed
    ``step()`` (``None`` until a full step has run).

    One ``Engine`` instance represents one game: the win bonus is granted at
    most once per instance.
    """

    # Maximum number of tool calls executed (and recorded in the agent's
    # message history) per agent per turn.
    MAX_ACTIONS_PER_TURN = 3

    def __init__(self, config: GameConfig | None = None):
        """Create the environment, falling back to a default ``GameConfig``."""
        self.env = Environment(config=config or GameConfig())
        self.agents: dict[str, Agent] = {}
        self.last_turn_log: dict | None = None
        # Guards against granting the win bonus more than once (step() may be
        # called repeatedly after the game is over, and run_game() also awards).
        self._win_bonus_awarded = False

    def generate_grid(self):
        """Populate every cell of the square grid with a fresh grass tile."""
        gs = self.env.config.grid_size
        for x in range(gs):
            for y in range(gs):
                self.env.grid[(x, y)] = Tile(terrain="grass")

    def _empty_cells(self) -> list[tuple[int, int]]:
        """Return all grid coordinates the environment reports as empty."""
        gs = self.env.config.grid_size
        return [
            (x, y)
            for x in range(gs)
            for y in range(gs)
            if self.env.is_empty(x, y)
        ]

    def scatter_chests(self, count: int | None = None):
        """Place loot chests on randomly chosen empty tiles.

        Args:
            count: Number of chests to place. ``None`` means
                ``config.num_chests``; an explicit ``0`` places nothing.

        The count is clamped to the number of empty tiles, so a crowded or
        small grid yields fewer chests instead of raising ``ValueError``.
        """
        if count is None:
            count = self.env.config.num_chests
        # Sample only from empty tiles so the requested number is actually
        # placed whenever enough free tiles exist.
        open_cells = self._empty_cells()
        how_many = min(max(count, 0), len(open_cells))
        for x, y in random.sample(open_cells, how_many):
            tile = self.env.get_tile(x, y)
            tile.loot = generate_chest_loot()
            tile.loot_spawn_turn = self.env.turn

    def spawn_agent(
        self,
        agent_id: str,
        name: str,
        model: str | None = None,
        provider: str | None = None,
        system_prompt: str | None = None,
    ):
        """Create an agent on a random empty tile and register it.

        Args:
            agent_id: Key under which the state and brain are stored.
            name: Display name used in the start message.
            model: LLM model name; defaults to the ``LLM_MODEL`` environment
                variable, else a provider-specific default.
            provider: LLM provider; defaults to the ``LLM_PROVIDER``
                environment variable, else ``"modal"``.
            system_prompt: Optional prompt passed through to ``Agent``.

        Returns:
            The newly created ``AgentState``.

        Raises:
            RuntimeError: If the grid has no empty tile to spawn on.
        """
        import os

        # Choose among the tiles that are actually free; retrying random
        # coordinates would spin forever on a full grid.
        open_cells = self._empty_cells()
        if not open_cells:
            raise RuntimeError(
                f"cannot spawn agent {agent_id!r}: no empty tile on the grid"
            )
        x, y = random.choice(open_cells)

        state = AgentState(
            id=agent_id,
            name=name,
            pos=[x, y],
            hp=self.env.config.base_hp,
        )

        default_provider = os.getenv("LLM_PROVIDER", "modal")
        default_model = os.getenv(
            "LLM_MODEL",
            "qwen2.5:7b" if default_provider == "local" else "google/gemma-4-26B-A4B-it",
        )
        brain = Agent(
            agent_id=agent_id,
            model=model or default_model,
            provider=provider or default_provider,
            system_prompt=system_prompt,
            config=self.env.config,
        )
        # The conversation starts with the brain's system prompt, followed by
        # the start message appended below.
        state.messages.append({"role": "system", "content": brain.system_prompt})
        self.env.agents[agent_id] = state
        self.agents[agent_id] = brain
        self._send_start_message(agent_id)
        return state

    def _award_win_bonus(self):
        """Grant ``config.win_bonus`` to the sole survivor, at most once."""
        if self._win_bonus_awarded:
            return
        survivors = self.env.alive_agents()
        if len(survivors) == 1:
            survivors[0].score += self.env.config.win_bonus
            self._win_bonus_awarded = True

    async def _collect_decisions(
        self,
    ) -> tuple[dict[str, list[dict]], dict[str, list[dict]]]:
        """Ask every living agent's brain for tool calls, concurrently.

        Returns:
            ``(decisions, turn_log)``, both keyed by agent id. ``decisions``
            holds the tool calls to execute (capped at
            ``MAX_ACTIONS_PER_TURN``); ``turn_log`` holds one ``"llm"`` entry
            per queried agent. Dead or brainless agents get empty lists.
        """
        decisions: dict[str, list[dict]] = {}
        turn_log: dict[str, list[dict]] = {}
        pending = []  # (agent id, awaitable returned by brain.decide)

        for aid, state in self.env.agents.items():
            decisions[aid] = []
            turn_log[aid] = []
            if not state.alive:
                continue
            brain = self.agents.get(aid)
            if brain:
                pending.append((aid, brain.decide(state.messages, TOOL_SCHEMAS)))

        if not pending:
            return decisions, turn_log

        results = await asyncio.gather(*(job for _, job in pending))
        for (aid, _), result in zip(pending, results):
            # Keep only the calls that will really be executed: an assistant
            # message listing tool calls that never receive a "tool" reply
            # leaves the history inconsistent (OpenAI-style chat APIs expect
            # one tool message per recorded tool call).
            calls = (result["calls"] or [])[: self.MAX_ACTIONS_PER_TURN]
            decisions[aid] = calls
            turn_log[aid].append({
                "phase": "llm",
                "turn": self.env.turn,
                "time_ms": result["time_ms"],
                "usage": result.get("raw"),
            })
            if calls:
                self.env.agents[aid].messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [
                        {
                            "id": f"call_{i}",
                            "type": "function",
                            "function": {
                                "name": c["name"],
                                "arguments": json.dumps(c.get("args", {})),
                            },
                        }
                        for i, c in enumerate(calls)
                    ],
                })
        return decisions, turn_log

    def _execute_decisions(
        self,
        decisions: dict[str, list[dict]],
        turn_log: dict[str, list[dict]],
    ):
        """Run the chosen tool calls round-robin, one call per agent per round.

        Agents that are no longer alive when their turn in a round comes up
        are skipped. Each executed call appends a ``"tool"`` message to the
        agent's history and an ``"exec"`` entry to ``turn_log``.
        """
        for rnd in range(self.MAX_ACTIONS_PER_TURN):
            # Snapshot the ids so the loop is unaffected if execution changes
            # the agents mapping.
            for aid in list(self.env.agents):
                state = self.env.agents[aid]
                if not state.alive:
                    continue
                calls = decisions.get(aid, [])
                if rnd >= len(calls):
                    continue
                call = calls[rnd]
                # Mirror the message builder: a call without "args" means
                # "no arguments", not an error.
                args = call.get("args", {})
                exec_result = self.env.execute(aid, call["name"], args)
                state.messages.append({
                    "role": "tool",
                    "content": exec_result["text"],
                    "tool_call_id": f"call_{rnd}",
                })
                turn_log[aid].append({
                    "phase": "exec",
                    "round": rnd,
                    "tool": call["name"],
                    "args": args,
                    "result": exec_result["text"],
                    "time_ms": exec_result["time_ms"],
                })

    def _refresh_chests(self):
        """Expire chests older than ``chest_lifetime`` and respawn on schedule."""
        cfg = self.env.config
        for tile in self.env.grid.values():
            if tile.loot is None or tile.loot_spawn_turn is None:
                continue
            if self.env.turn - tile.loot_spawn_turn >= cfg.chest_lifetime:
                tile.loot = None
                tile.loot_spawn_turn = None

        # A falsy interval (0/None) disables respawning rather than raising
        # ZeroDivisionError on the modulo.
        interval = cfg.chest_respawn_interval
        if interval and self.env.turn % interval == 0:
            self.scatter_chests(count=max(1, cfg.num_chests // 3))

    async def step(self):
        """Advance the game by one turn.

        If at most one agent is alive the game is over: the sole survivor (if
        any) receives the win bonus once, and nothing else happens. Otherwise
        the step gathers decisions from all living agents, executes them,
        increments the turn counter, grants the per-turn survival score to
        agents still alive, refreshes chests, and stores the turn's log in
        ``last_turn_log``.
        """
        started = time.perf_counter()
        if len(self.env.alive_agents()) <= 1:
            self._award_win_bonus()
            return

        decisions, turn_log = await self._collect_decisions()
        self._execute_decisions(decisions, turn_log)

        self.env.turn += 1
        # Timing covers the LLM and execution phases only, not the bookkeeping
        # below.
        step_total = round((time.perf_counter() - started) * 1000)

        for state in self.env.agents.values():
            if state.alive:
                state.score += self.env.config.survival_score_per_turn

        self._refresh_chests()

        self.last_turn_log = {
            "time_ms": step_total,
            "agents": turn_log,
        }

    def _send_start_message(self, aid: str):
        """Append the opening user message describing the game to the agent."""
        agent = self.env.agents[aid]
        agent.messages.append({
            "role": "user",
            "content": (
                f"You are {agent.name}. You have been dropped into a battle royale on a "
                f"{self.env.config.grid_size}x{self.env.config.grid_size} grid. "
                f"Your HP is {agent.hp}. There are {len(self.env.alive_agents())} agents alive. "
                "Use observe() to see your surroundings, move() to explore, "
                "and activate_ability() once you find abilities in loot chests. "
                "Good luck."
            ),
        })

    async def run_game(self) -> dict[str, AgentState]:
        """Generate the world and step until the game ends.

        The loop stops when ``config.max_turns`` is reached or at most one
        agent remains alive. A sole survivor receives the win bonus (once).

        Returns:
            The environment's mapping of agent id to ``AgentState``.
        """
        self.generate_grid()
        self.scatter_chests()

        while self.env.turn < self.env.config.max_turns:
            if len(self.env.alive_agents()) <= 1:
                break
            await self.step()

        self._award_win_bonus()
        return self.env.agents
|
|