Spaces:
Sleeping
Sleeping
| # Copyright (c) 2024 TeeUnit Project | |
| # SPDX-License-Identifier: MIT | |
| """ | |
| TeeUnit Environment Implementation. | |
| An MCP environment that wraps the Teeworlds game for LLM-based RL training. | |
| All interactions happen through MCP tools that translate to game actions. | |
| Supports two modes: | |
| - Simulation mode (default): Uses built-in physics simulation | |
| - Real server mode: Connects to actual Teeworlds 0.7.5 server | |
| MCP Tools: | |
| - `move(direction)`: Move the tee left, right, or none | |
| - `jump()`: Make the tee jump | |
| - `aim(x, y)`: Aim at target coordinates | |
| - `shoot(weapon)`: Fire the specified weapon | |
| - `hook()`: Use the grappling hook | |
| - `get_status()`: Get current game state as text | |
| Example: | |
| >>> from openenv.core.env_server.mcp_types import ListToolsAction, CallToolAction | |
| >>> env = TeeEnvironment() | |
| >>> env.reset() | |
| >>> | |
| >>> # List available tools | |
| >>> obs = env.step(ListToolsAction()) | |
| >>> print([t.name for t in obs.tools]) # ["move", "jump", "aim", "shoot", "hook", "get_status"] | |
| >>> | |
| >>> # Get game state | |
| >>> obs = env.step(CallToolAction(tool_name="get_status", arguments={})) | |
| >>> print(obs.result) | |
| # With real Teeworlds server: | |
| >>> env = TeeEnvironment(use_real_server=True, server_host="127.0.0.1", server_port=8303) | |
| >>> env.reset() # Connects to server | |
| """ | |
| from typing import Any, Optional, Dict, List | |
| from uuid import uuid4 | |
| import random | |
| import math | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| # Try to import real server components | |
| try: | |
| import sys | |
| import os | |
| # Add parent path for teeunit package | |
| _parent = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| if _parent not in sys.path: | |
| sys.path.insert(0, _parent) | |
| from teeunit.server.bot_manager import BotManager, GameState as RealGameState | |
| from teeunit.protocol.objects import PlayerInput, Character | |
| REAL_SERVER_AVAILABLE = True | |
| except ImportError as e: | |
| logger.warning(f"Real server support not available: {e}") | |
| REAL_SERVER_AVAILABLE = False | |
| BotManager = None | |
| PlayerInput = None | |
| Character = None | |
| RealGameState = None | |
| # Support both in-repo and standalone imports | |
try:
    from openenv.core.env_server.mcp_environment import MCPEnvironment
    from openenv.core.env_server.types import Action, Observation, State
except ImportError:
    # Fallback for development/testing: minimal stand-ins exposing only the
    # attributes and constructor keywords this module uses.
    from dataclasses import dataclass, field

    @dataclass
    class State:
        """Episode bookkeeping: the episode ID and the number of steps taken."""

        episode_id: str = ""
        step_count: int = 0

    @dataclass
    class Observation:
        """Result of reset()/step(): done flag, reward and a free-form metadata dict."""

        done: bool = False
        reward: float = 0.0
        # default_factory gives every observation its own dict.
        metadata: dict = field(default_factory=dict)

        def __post_init__(self):
            # An explicit metadata=None is still accepted and normalised.
            if self.metadata is None:
                self.metadata = {}

    class Action:
        """Placeholder base type for actions."""

    class MCPEnvironment:
        """Stub base class: stores the MCP server and returns empty observations."""

        def __init__(self, mcp):
            self._mcp = mcp

        def step(self, action, **kwargs):
            return Observation()
| from fastmcp import FastMCP | |
# Weapon definitions, keyed by the weapon ID accepted by the `shoot` tool.
#   "name"   - label used in status and result messages
#   "ammo"   - starting ammo copied into every GameAgent; -1 marks a weapon
#              that never consumes ammo (the simulation only decrements ammo
#              when this value is positive)
#   "damage" - health removed from the target per hit in simulation mode
WEAPONS = {
    0: {"name": "hammer", "ammo": -1, "damage": 3},
    1: {"name": "pistol", "ammo": 10, "damage": 1},
    2: {"name": "shotgun", "ammo": 10, "damage": 3},
    3: {"name": "grenade", "ammo": 10, "damage": 6},
    4: {"name": "laser", "ammo": 10, "damage": 5},
    5: {"name": "ninja", "ammo": -1, "damage": 9},
}
class GameAgent:
    """One simulated tee: either the LLM-controlled player or an AI opponent."""

    def __init__(self, agent_id: int):
        self.agent_id = agent_id

        # State that a respawn leaves untouched.
        self.direction = 1  # 1 = right, -1 = left
        self.score = 0
        self.is_grounded = True

        # Position, velocity, health, armor, weapon, ammo and the alive/hook
        # flags are exactly what a respawn assigns, so reuse it for the
        # initial spawn.
        self.respawn()

        # Start out aiming 100 units to the right of the spawn point.
        self.aim_x = self.x + 100
        self.aim_y = self.y

    def respawn(self):
        """Place the agent at a random spawn point with fresh health and ammo."""
        spawn_x = 400.0 + random.uniform(-200, 200)
        spawn_y = 300.0 + random.uniform(-100, 100)
        self.x, self.y = spawn_x, spawn_y
        self.vel_x = 0.0
        self.vel_y = 0.0
        self.health, self.armor = 10, 0
        self.weapon = 1  # pistol
        self.ammo = {weapon_id: spec["ammo"] for weapon_id, spec in WEAPONS.items()}
        self.is_alive = True
        self.is_hooking = False
| class TeeEnvironment(MCPEnvironment): | |
| """ | |
| OpenEnv-compatible Teeworlds environment with MCP tool interface. | |
| This environment provides a text-based interface for LLM agents to play | |
| Teeworlds. The LLM receives game state as natural language descriptions | |
| and issues commands through MCP tools. | |
| For hackathon demo, this uses a simplified game simulation. For production, | |
| it can be connected to the real Teeworlds server via bot_manager. | |
| Example: | |
| >>> with TeeEnv(base_url="http://localhost:8000") as env: | |
| ... env.reset() | |
| ... status = env.call_tool("get_status") | |
| ... env.call_tool("move", direction="right") | |
| ... env.call_tool("shoot", weapon=2) | |
| """ | |
def __init__(
    self,
    num_agents: int = 4,
    max_steps: int = 1000,
    use_real_server: bool = False,
    server_host: str = "127.0.0.1",
    server_port: int = 8303,
):
    """
    Initialize the TeeUnit environment.

    Creates the FastMCP server, defines the six game tools as closures over
    this instance, registers them on the server and hands the server to the
    MCPEnvironment base class. The raw tool functions are also kept in
    ``self._tool_fns`` so ``call_tool_sync`` can invoke them directly.

    Args:
        num_agents: Number of agents in the arena
        max_steps: Maximum steps per episode
        use_real_server: If True, connect to real Teeworlds server
        server_host: Teeworlds server host
        server_port: Teeworlds server port

    Raises:
        RuntimeError: If real server mode is requested but the teeunit
            package could not be imported.
    """
    # Validate real server mode
    if use_real_server and not REAL_SERVER_AVAILABLE:
        raise RuntimeError(
            "Real server mode requested but teeunit package not available. "
            "Make sure teeunit is installed or in PYTHONPATH."
        )

    # Create MCP server; the tools defined below are registered on it
    mcp = FastMCP("teeunit_env")

    # Store config
    self._num_agents = num_agents
    self._max_steps = max_steps
    self._use_real_server = use_real_server
    self._server_host = server_host
    self._server_port = server_port

    # Game state (simulation mode)
    self._agents: Dict[int, GameAgent] = {}
    self._tick = 0
    self._kill_events: List[dict] = []
    self._current_agent_id = 0  # LLM controls agent 0

    # Episode state
    self._state = State(episode_id=str(uuid4()), step_count=0)

    # Real server connection
    self._bot_manager: Optional[BotManager] = None
    self._pending_input: Optional[PlayerInput] = PlayerInput() if REAL_SERVER_AVAILABLE else None
    self._fire_counter = 0  # Track fire presses for real server

    def controlled_agent():
        """Return the simulated agent driven by the LLM, or None if it is missing or dead."""
        agent = self._agents.get(self._current_agent_id)
        return agent if agent and agent.is_alive else None

    def describe_kill(killer, victim, wpn_name):
        """Format one kill event from the controlled player's point of view."""
        if killer == self._current_agent_id:
            return f" - You killed Player {victim} with {wpn_name}"
        if victim == self._current_agent_id:
            return f" - Player {killer} killed you with {wpn_name}"
        return f" - Player {killer} killed Player {victim} with {wpn_name}"

    # Define MCP tools

    def move(direction: str) -> str:
        """
        Move the tee horizontally.

        Args:
            direction: "left", "right", or "none"

        Returns:
            Result message describing the action taken
        """
        if self._use_real_server:
            # Real server mode: update pending input
            if direction == "left":
                self._pending_input.direction = -1
                return "Moving left."
            if direction == "right":
                self._pending_input.direction = 1
                return "Moving right."
            self._pending_input.direction = 0
            return "Stopped."

        # Simulation mode
        agent = controlled_agent()
        if agent is None:
            return "Cannot move: agent is dead"
        if direction == "left":
            agent.direction = -1
            agent.vel_x = max(agent.vel_x - 5, -15)
            return f"Moving left. Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
        if direction == "right":
            agent.direction = 1
            agent.vel_x = min(agent.vel_x + 5, 15)
            return f"Moving right. Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
        # Any other value is treated as "none": just bleed off speed
        agent.vel_x *= 0.8  # friction
        return f"Stopped. Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"

    def jump() -> str:
        """
        Make the tee jump. Can double-jump in the air.

        Returns:
            Result message describing the jump
        """
        if self._use_real_server:
            # Real server mode: set jump flag
            self._pending_input.jump = True
            return "Jumping!"

        # Simulation mode
        agent = controlled_agent()
        if agent is None:
            return "Cannot jump: agent is dead"
        if agent.is_grounded:
            agent.vel_y = -12
            agent.is_grounded = False
            return f"Jumped! Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
        # Air jump (weaker)
        agent.vel_y = -8
        return f"Air jumped! Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"

    def aim(x: int, y: int) -> str:
        """
        Aim at target coordinates.

        Args:
            x: Target X coordinate
            y: Target Y coordinate

        Returns:
            Result message confirming aim direction
        """
        if self._use_real_server:
            # Real server mode: set target position (relative to player)
            # In Teeworlds, target is relative to player position
            self._pending_input.target_x = x
            self._pending_input.target_y = y
            angle = math.atan2(y, x) * 180 / math.pi
            return f"Aiming at ({x}, {y}). Angle: {angle:.1f} deg"

        # Simulation mode: (x, y) is an absolute world position
        agent = controlled_agent()
        if agent is None:
            return "Cannot aim: agent is dead"
        agent.aim_x = x
        agent.aim_y = y
        # Calculate angle for display
        dx = x - agent.x
        dy = y - agent.y
        angle = math.atan2(dy, dx) * 180 / math.pi
        distance = math.sqrt(dx * dx + dy * dy)
        return f"Aiming at ({x}, {y}). Angle: {angle:.1f} deg, Distance: {distance:.1f} units"

    def shoot(weapon: int = -1) -> str:
        """
        Fire the current or specified weapon.

        Args:
            weapon: Weapon ID (0=hammer, 1=pistol, 2=shotgun, 3=grenade, 4=laser, 5=ninja).
                Use -1 for current weapon.

        Returns:
            Result message describing the shot and any hits
        """
        if self._use_real_server:
            # Real server mode: increment fire counter and set weapon
            self._fire_counter += 1
            self._pending_input.fire = self._fire_counter
            # Set wanted weapon (Teeworlds uses 1-indexed: 1=hammer, 2=gun, etc.)
            if 0 <= weapon <= 5:
                self._pending_input.wanted_weapon = weapon + 1  # Convert to 1-indexed
                wpn_name = WEAPONS[weapon]["name"]
            else:
                wpn_name = "current weapon"
            return f"Fired {wpn_name}! (fire counter: {self._fire_counter})"

        # Simulation mode
        agent = controlled_agent()
        if agent is None:
            return "Cannot shoot: agent is dead"

        # Switch weapon if specified; out-of-range IDs keep the current weapon
        if 0 <= weapon <= 5:
            agent.weapon = weapon
        wpn = WEAPONS[agent.weapon]
        wpn_name = wpn["name"]

        # Weapons with a positive base ammo count consume one round per shot
        uses_ammo = wpn["ammo"] > 0
        if uses_ammo:
            if agent.ammo[agent.weapon] <= 0:
                return f"Out of ammo for {wpn_name}!"
            agent.ammo[agent.weapon] -= 1

        # The aim vector, range and damage are the same for every target,
        # so compute them once instead of per opponent.
        aim_dx = agent.aim_x - agent.x
        aim_dy = agent.aim_y - agent.y
        aim_dist = math.sqrt(aim_dx * aim_dx + aim_dy * aim_dy)
        hit_range = 400 if agent.weapon != 0 else 50  # hammer short range
        damage = wpn["damage"]

        # Check for hits on other agents
        hits = []
        if aim_dist > 0:  # a zero-length aim vector has no direction
            for other_id, other in self._agents.items():
                if other_id == self._current_agent_id or not other.is_alive:
                    continue
                dx = other.x - agent.x
                dy = other.y - agent.y
                distance = math.sqrt(dx * dx + dy * dy)
                # Cosine of the angle between aim direction and target
                # direction; > 0.8 means roughly in the line of fire.
                dot = (dx * aim_dx + dy * aim_dy) / (aim_dist * max(distance, 1))
                if not (distance < hit_range and dot > 0.8):
                    continue
                # Hit!
                other.health -= damage
                other.armor = max(0, other.armor - damage // 2)
                if other.health <= 0:
                    other.is_alive = False
                    agent.score += 1
                    self._kill_events.append({
                        "killer_id": self._current_agent_id,
                        "victim_id": other_id,
                        "weapon": agent.weapon,
                        "tick": self._tick,
                    })
                    hits.append(f"KILLED Player {other_id} with {wpn_name}!")
                else:
                    hits.append(f"Hit Player {other_id} for {damage} damage ({other.health}HP remaining)")

        ammo_str = f"({agent.ammo[agent.weapon]} ammo)" if uses_ammo else ""
        outcome = " ".join(hits) if hits else "No hits."
        return f"Fired {wpn_name} {ammo_str}. " + outcome

    def hook() -> str:
        """
        Use the grappling hook in the aim direction.
        The hook can grab walls or enemies to pull yourself toward them.

        Returns:
            Result message describing hook action
        """
        if self._use_real_server:
            # Real server mode: toggle hook flag
            self._pending_input.hook = not self._pending_input.hook
            return "Hook deployed!" if self._pending_input.hook else "Hook released."

        # Simulation mode
        agent = controlled_agent()
        if agent is None:
            return "Cannot hook: agent is dead"
        agent.is_hooking = not agent.is_hooking
        if not agent.is_hooking:
            return "Hook released."
        # Pull toward aim point
        dx = agent.aim_x - agent.x
        dy = agent.aim_y - agent.y
        dist = math.sqrt(dx * dx + dy * dy)
        if dist > 0:
            agent.vel_x += (dx / dist) * 3
            agent.vel_y += (dy / dist) * 3
        return f"Hook deployed! Pulling toward ({agent.aim_x}, {agent.aim_y})"

    def get_status() -> str:
        """
        Get the current game state as a text description.

        Returns:
            Detailed text description of current game state including:
            - Your position, health, weapon, ammo
            - Visible enemies with positions and health
            - Recent events (kills, deaths)
        """
        if self._use_real_server:
            # Real server mode: read from BotManager.game_state
            if not self._bot_manager or not self._bot_manager.all_connected:
                return "Not connected to server."
            gs = self._bot_manager.game_state
            my_char = gs.get_character(self._current_agent_id)
            my_info = gs.get_player_info(self._current_agent_id)

            lines = []
            lines.append(f"=== Teeworlds Game State (Tick {gs.tick}) ===")
            lines.append("")
            if my_char is None:
                lines.append("STATUS: DEAD - Waiting for respawn...")
                lines.append("")
            else:
                # Position is in fixed-point (divide by 32 for world units)
                x = my_char.x / 32.0
                y = my_char.y / 32.0
                vel_x = my_char.vel_x / 256.0
                vel_y = my_char.vel_y / 256.0
                lines.append(f"Position: ({x:.0f}, {y:.0f}) | Velocity: ({vel_x:.1f}, {vel_y:.1f})")
                lines.append(f"Health: {my_char.health}/10 | Armor: {my_char.armor}/10")
                # Weapon (0=hammer, 1=gun, etc.)
                wpn_id = my_char.weapon
                wpn_name = WEAPONS.get(wpn_id, {}).get("name", f"weapon_{wpn_id}")
                lines.append(f"Weapon: {wpn_name} ({my_char.ammo_count} ammo)")
                if my_info:
                    lines.append(f"Score: {my_info.score} kills")
                lines.append("")

            # Other players
            enemies = []
            for client_id, char in gs.characters.items():
                if client_id == self._current_agent_id:
                    continue
                other_info = gs.get_player_info(client_id)
                x = char.x / 32.0
                y = char.y / 32.0
                if my_char:
                    dx = x - (my_char.x / 32.0)
                    dy = y - (my_char.y / 32.0)
                    dist = math.sqrt(dx * dx + dy * dy)
                else:
                    # No own position while dead, so no meaningful distance
                    dist = 0
                wpn_name = WEAPONS.get(char.weapon, {}).get("name", "unknown")
                score = other_info.score if other_info else 0
                enemies.append(
                    f" - Player {client_id}: pos({x:.0f}, {y:.0f}), "
                    f"{char.health}HP, {wpn_name}, {dist:.0f} units away, {score} kills"
                )
            if enemies:
                lines.append("OTHER PLAYERS:")
                lines.extend(enemies)
            else:
                lines.append("OTHER PLAYERS: None")
            lines.append("")

            # Recent kills
            recent = gs.kill_events[-5:] if gs.kill_events else []
            if recent:
                lines.append("RECENT EVENTS:")
                for event in recent:
                    wpn_name = WEAPONS.get(event.weapon, {}).get("name", "unknown")
                    lines.append(describe_kill(event.killer_id, event.victim_id, wpn_name))
                lines.append("")

            lines.append("AVAILABLE ACTIONS: move, jump, aim, shoot, hook, get_status")
            return "\n".join(lines)

        # Simulation mode
        agent = self._agents.get(self._current_agent_id)
        lines = []
        lines.append(f"=== Teeworlds Game State (Tick {self._tick}) ===")
        lines.append("")
        if not agent or not agent.is_alive:
            lines.append("STATUS: DEAD - Waiting for respawn...")
            lines.append("")
        else:
            lines.append(f"Position: ({agent.x:.0f}, {agent.y:.0f}) | Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})")
            lines.append(f"Health: {agent.health}/10 | Armor: {agent.armor}/10")
            wpn = WEAPONS[agent.weapon]
            ammo_str = str(agent.ammo[agent.weapon]) if wpn["ammo"] > 0 else "infinite"
            lines.append(f"Weapon: {wpn['name']} ({ammo_str} ammo)")
            lines.append(f"Score: {agent.score} kills")
            lines.append(f"Aim: ({agent.aim_x:.0f}, {agent.aim_y:.0f})")
            lines.append("")

        # Other players
        enemies = []
        for other_id, other in self._agents.items():
            if other_id == self._current_agent_id:
                continue
            if other.is_alive:
                # Distance is measured from the controlled agent's last
                # position, or from the origin if it does not exist.
                dx = other.x - agent.x if agent else other.x
                dy = other.y - agent.y if agent else other.y
                dist = math.sqrt(dx * dx + dy * dy)
                wpn_name = WEAPONS[other.weapon]["name"]
                enemies.append(
                    f" - Player {other_id}: pos({other.x:.0f}, {other.y:.0f}), "
                    f"{other.health}HP, {wpn_name}, {dist:.0f} units away"
                )
            else:
                enemies.append(f" - Player {other_id}: DEAD")
        if enemies:
            lines.append("OTHER PLAYERS:")
            lines.extend(enemies)
        else:
            lines.append("OTHER PLAYERS: None")
        lines.append("")

        # Recent kills
        recent = self._kill_events[-5:] if self._kill_events else []
        if recent:
            lines.append("RECENT EVENTS:")
            for event in recent:
                wpn_name = WEAPONS[event["weapon"]]["name"]
                lines.append(describe_kill(event["killer_id"], event["victim_id"], wpn_name))
            lines.append("")

        lines.append("AVAILABLE ACTIONS: move, jump, aim, shoot, hook, get_status")
        return "\n".join(lines)

    # Store tool functions for direct synchronous access (for Colab/notebooks)
    self._tool_fns = {
        "move": move,
        "jump": jump,
        "aim": aim,
        "shoot": shoot,
        "hook": hook,
        "get_status": get_status,
    }

    # Register every tool on the MCP server so ListToolsAction/CallToolAction
    # handled by the base class can see them. The return value is ignored on
    # purpose: _tool_fns must keep the plain callables, whatever wrapper
    # object the installed FastMCP version returns from its decorator.
    for tool_fn in self._tool_fns.values():
        mcp.tool()(tool_fn)

    # Store MCP reference and pass to base class
    self._mcp = mcp
    super().__init__(mcp)
def reset(
    self,
    seed: Optional[int] = None,
    episode_id: Optional[str] = None,
    **kwargs: Any,
) -> Observation:
    """
    Start a new episode.

    Clears the per-episode counters, then either rebuilds the simulated
    agents or (re)connects the bots to the Teeworlds server.

    Args:
        seed: If given, seeds the module-level ``random`` generator
        episode_id: Episode ID to use; a fresh UUID is generated when omitted
        **kwargs: Additional reset options (not used here)

    Returns:
        Observation whose metadata carries a status text, the episode ID and
        the mode. When the server connection fails, ``done`` is True and the
        metadata status is "error".
    """
    if seed is not None:
        random.seed(seed)

    # Fresh per-episode bookkeeping
    new_episode_id = episode_id or str(uuid4())
    self._state = State(episode_id=new_episode_id, step_count=0)
    self._tick = 0
    self._kill_events = []
    self._fire_counter = 0

    if not self._use_real_server:
        # Simulation mode: spawn a fresh set of agents, IDs 0..num_agents-1
        self._agents = {}
        for agent_id in range(self._num_agents):
            self._agents[agent_id] = GameAgent(agent_id)
        sim_status = self._get_status_text()
        sim_metadata = {
            "status": "ready",
            "message": sim_status,
            "episode_id": self._state.episode_id,
            "mode": "simulation",
        }
        return Observation(done=False, reward=0.0, metadata=sim_metadata)

    # Real server mode: drop any previous connection before reconnecting
    if self._bot_manager:
        self._bot_manager.disconnect()
    self._bot_manager = BotManager(
        host=self._server_host,
        port=self._server_port,
        num_bots=self._num_agents,
        ticks_per_step=10,  # 200ms per step at 50 ticks/sec
        bot_name_prefix="TeeUnit",
    )

    if not self._bot_manager.connect(timeout=10.0):
        failure_metadata = {
            "status": "error",
            "message": f"Failed to connect to Teeworlds server at {self._server_host}:{self._server_port}",
            "episode_id": self._state.episode_id,
        }
        return Observation(done=True, reward=0.0, metadata=failure_metadata)

    # Start from a neutral input, then pull one snapshot so the status text
    # has a game state to describe.
    self._pending_input = PlayerInput()
    self._bot_manager.step()
    server_status = self._get_status_text_real()
    server_metadata = {
        "status": "ready",
        "message": server_status,
        "episode_id": self._state.episode_id,
        "mode": "real_server",
        "server": f"{self._server_host}:{self._server_port}",
    }
    return Observation(done=False, reward=0.0, metadata=server_metadata)
def _get_status_text(self) -> str:
    """Build the short status summary for the controlled agent (simulation mode)."""
    me = self._agents.get(self._current_agent_id)
    header = f"=== Teeworlds Game State (Tick {self._tick}) ==="

    # A missing agent is reported the same way as a dead one.
    if not (me and me.is_alive):
        return "\n".join([header, "STATUS: DEAD"])

    weapon_spec = WEAPONS[me.weapon]
    summary = [
        header,
        f"Position: ({me.x:.0f}, {me.y:.0f})",
        f"Health: {me.health}/10 | Armor: {me.armor}/10",
        f"Weapon: {weapon_spec['name']}",
        f"Score: {me.score} kills",
    ]
    return "\n".join(summary)
def _get_status_text_real(self) -> str:
    """Build the short status summary for the controlled bot (real server mode)."""
    manager = self._bot_manager
    if not manager:
        return "Not connected to server."

    game_state = manager.game_state
    character = game_state.get_character(self._current_agent_id)
    report = [f"=== Teeworlds Game State (Tick {game_state.tick}) ==="]

    # No character in the game state is reported as dead.
    if not character:
        report.append("STATUS: DEAD")
        return "\n".join(report)

    # Raw coordinates are divided by 32 before display.
    pos_x = character.x / 32.0
    pos_y = character.y / 32.0
    report.append(f"Position: ({pos_x:.0f}, {pos_y:.0f})")
    report.append(f"Health: {character.health}/10 | Armor: {character.armor}/10")
    weapon_name = WEAPONS.get(character.weapon, {}).get("name", "unknown")
    report.append(f"Weapon: {weapon_name}")

    player_info = game_state.get_player_info(self._current_agent_id)
    if player_info:
        report.append(f"Score: {player_info.score} kills")
    return "\n".join(report)
def _execute_real_step(self):
    """Send the pending input to the server, advance one step and sync the tick."""
    manager = self._bot_manager
    if not manager:
        return

    # Only the controlled bot receives an input.
    # Execute the step (waits for ticks_per_step game ticks).
    manager.step({self._current_agent_id: self._pending_input})

    # Mirror the server tick locally
    self._tick = manager.game_state.tick

    # Reset one-shot inputs (jump resets automatically in Teeworlds)
    self._pending_input.jump = False
def _simulate_tick(self):
    """Advance the simulation by one tick: physics for living agents, then bot AI."""
    self._tick += 1
    for tee in self._agents.values():
        if tee.is_alive:
            # Gravity first, then integrate velocity into position
            tee.vel_y += 0.5
            tee.x += tee.vel_x
            tee.y += tee.vel_y

            # Floor at y = 500: land and stop falling
            if tee.y > 500:
                tee.y = 500
                tee.vel_y = 0
                tee.is_grounded = True

            # Side walls at x = 50 and x = 750
            tee.x = max(50, min(750, tee.x))

            # Horizontal friction
            tee.vel_x *= 0.95

            # Every agent except the LLM-controlled one is driven by the simple AI
            is_bot = tee.agent_id != self._current_agent_id
            if is_bot:
                self._simple_ai(tee)
def _simple_ai(self, agent: GameAgent):
    """Drive one AI opponent for a tick: wander, hop, track the player, sometimes attack."""
    # 10% chance per tick to nudge horizontal velocity
    if random.random() < 0.1:
        agent.vel_x += random.uniform(-3, 3)

    # 5% chance per tick to hop, only from the ground
    if agent.is_grounded and random.random() < 0.05:
        agent.vel_y = -10
        agent.is_grounded = False

    # Everything below needs a living player to target
    target = self._agents.get(self._current_agent_id)
    if not target or not target.is_alive:
        return
    agent.aim_x, agent.aim_y = target.x, target.y

    # 2% chance per tick to attempt an attack
    if not random.random() < 0.02:
        return
    gap_x = target.x - agent.x
    gap_y = target.y - agent.y
    gap = math.sqrt(gap_x * gap_x + gap_y * gap_y)
    if not gap < 300:
        return

    spec = WEAPONS[agent.weapon]
    # Weapons with negative base ammo never run dry; others need a round left
    can_fire = spec["ammo"] < 0 or agent.ammo[agent.weapon] > 0
    if not can_fire:
        return
    if spec["ammo"] > 0:
        agent.ammo[agent.weapon] -= 1

    # Check hit (simplified): within 200 units, 30% chance to connect
    if not (gap < 200 and random.random() < 0.3):
        return
    target.health -= spec["damage"]
    if target.health <= 0:
        target.is_alive = False
        agent.score += 1
        kill_record = {
            "killer_id": agent.agent_id,
            "victim_id": self._current_agent_id,
            "weapon": agent.weapon,
            "tick": self._tick,
        }
        self._kill_events.append(kill_record)
def _step_impl(
    self,
    action: Action,
    timeout_s: Optional[float] = None,
    **kwargs: Any,
) -> Observation:
    """
    Fallback handler for actions that are not MCP actions.

    Args:
        action: The action that was submitted
        timeout_s: Optional timeout (ignored)
        **kwargs: Additional arguments (ignored)

    Returns:
        A non-terminal, zero-reward Observation whose metadata carries an
        "error" message naming the unsupported action type
    """
    action_type = type(action).__name__
    error_text = (
        f"Unknown action type: {action_type}. "
        "Use ListToolsAction or CallToolAction for MCP interactions."
    )
    return Observation(done=False, reward=0.0, metadata={"error": error_text})
def step(
    self,
    action: Action,
    timeout_s: Optional[float] = None,
    **kwargs: Any,
) -> Observation:
    """
    Execute a step in the environment.

    Order of operations: advance the world by one step, let the base class
    run the MCP action, then compute reward and done. Scoring after the
    action means a kill made by a ``shoot`` tool call is stamped with the
    current tick and counted by ``_calculate_reward`` in the same step, and
    the win condition is detected immediately.

    Args:
        action: The MCP action to execute
        timeout_s: Optional timeout
        **kwargs: Additional arguments

    Returns:
        Observation from the action execution, with ``reward``, ``done`` and
        the "step", "tick" and "mode" metadata entries filled in
    """
    # Increment step count
    self._state.step_count += 1

    # 1. Advance the world
    if self._use_real_server:
        # Sends the pending input and waits for the server ticks.
        # NOTE(review): inputs set by this step's tool call are only sent on
        # the next step - confirm this one-step latency is intended.
        self._execute_real_step()
    else:
        self._simulate_tick()

    # 2. Let the base class handle MCP actions (the tool call runs here)
    obs = super().step(action, timeout_s=timeout_s, **kwargs)

    # 3. Score the step now that the action has been applied
    done = self._state.step_count >= self._max_steps
    if self._use_real_server:
        reward = self._calculate_reward_real()
        if self._bot_manager:
            game_state = self._bot_manager.game_state
            if game_state.get_character(self._current_agent_id) is None:
                # Character not in snapshot = dead
                done = True
                reward -= 5.0
            # NOTE(review): this assumes kill_events only holds events from
            # the latest step; if it accumulates, old kills are rewarded
            # again on every step - verify against BotManager.
            for event in game_state.kill_events:
                if event.killer_id == self._current_agent_id:
                    reward += 1.0  # We killed someone
    else:
        reward = self._calculate_reward()
        # Check if all enemies dead (win condition)
        enemies_alive = sum(
            1
            for a in self._agents.values()
            if a.agent_id != self._current_agent_id and a.is_alive
        )
        if enemies_alive == 0:
            done = True
            reward += 10.0  # Win bonus
        # Check if player dead
        player = self._agents.get(self._current_agent_id)
        if player and not player.is_alive:
            done = True
            reward -= 5.0  # Death penalty

    # Update observation with reward and done
    obs.reward = reward
    obs.done = done
    if obs.metadata is None:
        # Some observation implementations default metadata to None
        obs.metadata = {}
    obs.metadata["step"] = self._state.step_count
    obs.metadata["tick"] = self._tick
    obs.metadata["mode"] = "real_server" if self._use_real_server else "simulation"
    return obs
def _calculate_reward(self) -> float:
    """Score the current tick for the controlled agent (simulation mode).

    Returns 0.0 when the controlled agent does not exist. Otherwise: +0.01
    while it is alive, +1.0 for each kill it made and -0.5 for each time it
    was killed, counting only kill events stamped with the current tick.
    """
    me = self._current_agent_id
    player = self._agents.get(me)
    if not player:
        return 0.0

    total = 0.0
    # Survival bonus
    if player.is_alive:
        total += 0.01

    # Only events stamped with the current tick count
    events_this_tick = (e for e in self._kill_events if e["tick"] == self._tick)
    for event in events_this_tick:
        if event["killer_id"] == me:
            total += 1.0
        elif event["victim_id"] == me:
            total -= 0.5
    return total
def _calculate_reward_real(self) -> float:
    """Return the survival part of the step reward (real server mode).

    Yields 0.01 while the controlled bot has a character in the game state,
    otherwise 0.0 (also when no bot manager exists). Kill and death
    adjustments are applied by step(), not here.
    """
    manager = self._bot_manager
    if not manager:
        return 0.0

    character = manager.game_state.get_character(self._current_agent_id)
    # Survival bonus only; step() adds +1.0 per kill and -5.0 on death
    survival_bonus = 0.01 if character is not None else 0.0
    return 0.0 + survival_bonus
def state(self) -> State:
    """Return the State object tracking the current episode ID and step count."""
    # NOTE(review): this is a plain method here; confirm whether the base
    # class expects `state` to be a property.
    episode_state = self._state
    return episode_state
def call_tool_sync(self, name: str, **kwargs) -> str:
    """
    Invoke one of the game tools directly, bypassing the MCP layer
    (for notebooks/Colab).

    Args:
        name: Tool name (move, jump, aim, shoot, hook, get_status)
        **kwargs: Keyword arguments forwarded to the tool

    Returns:
        The tool's result string, or an "Unknown tool" message when no tool
        with that name exists
    """
    tool = self._tool_fns.get(name)
    if tool is None:
        return f"Unknown tool: {name}"
    return tool(**kwargs)
def close(self):
    """Clean up resources and disconnect from server.

    Safe to call repeatedly, and on an instance whose ``__init__`` raised
    before ``_bot_manager`` was assigned (``__del__`` calls this method, so
    it can run on a half-constructed object).
    """
    manager = getattr(self, "_bot_manager", None)
    if not manager:
        return
    try:
        manager.disconnect()
    finally:
        # Drop the reference even if disconnect() fails, so a later close()
        # does not retry on a broken connection.
        self._bot_manager = None
def __del__(self):
    """Destructor - best-effort cleanup that never raises.

    This can run on a partially initialised instance (``__init__`` raised)
    or during interpreter shutdown. An exception escaping ``__del__`` is only
    printed as an "Exception ignored" warning, so cleanup failures are
    swallowed here. Call close() explicitly to see errors.
    """
    try:
        self.close()
    except Exception:
        pass
def __enter__(self):
    """Enter a ``with`` block; the environment itself is the managed object."""
    env = self
    return env
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave a ``with`` block: close the environment and let exceptions propagate."""
    self.close()
    suppress_exception = False  # never swallow an error raised in the with-body
    return suppress_exception