# Copyright (c) 2024 TeeUnit Project
# SPDX-License-Identifier: MIT

"""
TeeUnit Environment Implementation.

A MCP environment that wraps the Teeworlds game for LLM-based RL training.
All interactions happen through MCP tools that translate to game actions.

Supports two modes:
- Simulation mode (default): Uses built-in physics simulation
- Real server mode: Connects to actual Teeworlds 0.7.5 server

MCP Tools:
- `move(direction)`: Move the tee left, right, or none
- `jump()`: Make the tee jump
- `aim(x, y)`: Aim at target coordinates
- `shoot(weapon)`: Fire the specified weapon
- `hook()`: Use the grappling hook
- `get_status()`: Get current game state as text

Example:
    >>> from openenv.core.env_server.mcp_types import ListToolsAction, CallToolAction
    >>> env = TeeEnvironment()
    >>> env.reset()
    >>>
    >>> # List available tools
    >>> obs = env.step(ListToolsAction())
    >>> print([t.name for t in obs.tools])  # ["move", "jump", "aim", "shoot", "hook", "get_status"]
    >>>
    >>> # Get game state
    >>> obs = env.step(CallToolAction(tool_name="get_status", arguments={}))
    >>> print(obs.result)

    # With real Teeworlds server:
    >>> env = TeeEnvironment(use_real_server=True, server_host="127.0.0.1", server_port=8303)
    >>> env.reset()  # Connects to server
"""

import logging
import math
import random
from typing import Any, Dict, List, Optional
from uuid import uuid4

logger = logging.getLogger(__name__)

# Try to import real server components
try:
    import os
    import sys

    # Add parent path for teeunit package
    _parent = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    if _parent not in sys.path:
        sys.path.insert(0, _parent)

    from teeunit.server.bot_manager import BotManager, GameState as RealGameState
    from teeunit.protocol.objects import PlayerInput, Character

    REAL_SERVER_AVAILABLE = True
except ImportError as e:
    logger.warning("Real server support not available: %s", e)
    REAL_SERVER_AVAILABLE = False
    BotManager = None
    PlayerInput = None
    Character = None
    RealGameState = None

# Support both in-repo and standalone imports
try:
    from openenv.core.env_server.mcp_environment import MCPEnvironment
    from openenv.core.env_server.types import Action, Observation, State

    _OPENENV_AVAILABLE = True
except ImportError:
    # Fallback for development/testing
    from dataclasses import dataclass

    _OPENENV_AVAILABLE = False

    @dataclass
    class State:
        episode_id: str = ""
        step_count: int = 0

    @dataclass
    class Observation:
        done: bool = False
        reward: float = 0.0
        metadata: Optional[dict] = None

        def __post_init__(self):
            # Give every instance its own dict (never share a mutable default).
            if self.metadata is None:
                self.metadata = {}

    class Action:
        pass

    class MCPEnvironment:
        def __init__(self, mcp):
            self._mcp = mcp

        def step(self, action, **kwargs):
            return Observation()

try:
    from fastmcp import FastMCP
except ImportError:
    if _OPENENV_AVAILABLE:
        # The real MCPEnvironment needs a real FastMCP server; do not hide
        # the missing dependency in that case.
        raise

    logger.warning("fastmcp not installed: using a minimal in-process tool registry")

    class FastMCP:
        """Minimal stand-in used only together with the fallback MCPEnvironment.

        It records the decorated functions so the environment stays usable
        through ``call_tool_sync`` when neither openenv nor fastmcp is
        installed (development/testing only).
        """

        def __init__(self, name: str):
            self.name = name
            self.tools: Dict[str, Any] = {}

        def tool(self, fn):
            """Register ``fn`` under its own name and return it unchanged."""
            self.tools[fn.__name__] = fn
            return fn


# Weapon definitions ("ammo" of -1 means the weapon never runs out)
WEAPONS = {
    0: {"name": "hammer", "ammo": -1, "damage": 3},
    1: {"name": "pistol", "ammo": 10, "damage": 1},
    2: {"name": "shotgun", "ammo": 10, "damage": 3},
    3: {"name": "grenade", "ammo": 10, "damage": 6},
    4: {"name": "laser", "ammo": 10, "damage": 5},
    5: {"name": "ninja", "ammo": -1, "damage": 9},
}


def _kill_line(killer: int, victim: int, wpn_name: str, me: int) -> str:
    """Format one kill event from the point of view of player ``me``."""
    if killer == me:
        return f" - You killed Player {victim} with {wpn_name}"
    if victim == me:
        return f" - Player {killer} killed you with {wpn_name}"
    return f" - Player {killer} killed Player {victim} with {wpn_name}"


class GameAgent:
    """Represents a player/bot in the game."""

    # Number of jumps allowed while airborne before touching the ground again.
    MAX_AIR_JUMPS = 1

    def __init__(self, agent_id: int):
        self.agent_id = agent_id
        self.direction = 1  # 1 = right, -1 = left
        self.score = 0
        self.is_grounded = True
        self._spawn()
        # Initial aim points to the right of the spawn position.
        self.aim_x = self.x + 100
        self.aim_y = self.y

    def _spawn(self):
        """Place the agent at a random position with fresh vitals."""
        self.x = 400.0 + random.uniform(-200, 200)
        self.y = 300.0 + random.uniform(-100, 100)
        self.vel_x = 0.0
        self.vel_y = 0.0
        self.health = 10
        self.armor = 0
        self.weapon = 1  # pistol
        self.ammo = {w: WEAPONS[w]["ammo"] for w in WEAPONS}
        self.is_alive = True
        self.is_hooking = False
        self.air_jumps_left = self.MAX_AIR_JUMPS

    def respawn(self):
        """Respawn at random location (score, facing and aim are kept)."""
        self._spawn()


class TeeEnvironment(MCPEnvironment):
    """
    OpenEnv-compatible Teeworlds environment with MCP tool interface.

    This environment provides a text-based interface for LLM agents to play
    Teeworlds. The LLM receives game state as natural language descriptions
    and issues commands through MCP tools.

    For hackathon demo, this uses a simplified game simulation.
    For production, it can be connected to the real Teeworlds server via
    bot_manager.

    Example:
        >>> with TeeEnv(base_url="http://localhost:8000") as env:
        ...     env.reset()
        ...     status = env.call_tool("get_status")
        ...     env.call_tool("move", direction="right")
        ...     env.call_tool("shoot", weapon=2)
    """

    def __init__(
        self,
        num_agents: int = 4,
        max_steps: int = 1000,
        use_real_server: bool = False,
        server_host: str = "127.0.0.1",
        server_port: int = 8303,
    ):
        """
        Initialize the TeeUnit environment.

        Args:
            num_agents: Number of agents in the arena
            max_steps: Maximum steps per episode
            use_real_server: If True, connect to real Teeworlds server
            server_host: Teeworlds server host
            server_port: Teeworlds server port

        Raises:
            RuntimeError: If real server mode is requested but the teeunit
                package could not be imported.
        """
        # Validate real server mode
        if use_real_server and not REAL_SERVER_AVAILABLE:
            raise RuntimeError(
                "Real server mode requested but teeunit package not available. "
                "Make sure teeunit is installed or in PYTHONPATH."
            )

        # Create MCP server and define tools inline
        mcp = FastMCP("teeunit_env")

        # Store config
        self._num_agents = num_agents
        self._max_steps = max_steps
        self._use_real_server = use_real_server
        self._server_host = server_host
        self._server_port = server_port

        # Game state (simulation mode)
        self._agents: Dict[int, GameAgent] = {}
        self._tick = 0
        self._kill_events: List[dict] = []
        self._current_agent_id = 0  # LLM controls agent 0

        # Episode state
        self._state = State(episode_id=str(uuid4()), step_count=0)

        # Real server connection
        self._bot_manager: Optional[BotManager] = None
        self._pending_input: Optional[PlayerInput] = PlayerInput() if REAL_SERVER_AVAILABLE else None
        self._fire_counter = 0  # Track fire presses for real server

        # Store tool functions for direct synchronous access
        self._tool_fns = {}

        # Define MCP tools
        @mcp.tool
        def move(direction: str) -> str:
            """
            Move the tee horizontally.

            Args:
                direction: "left", "right", or "none"

            Returns:
                Result message describing the action taken
            """
            if self._use_real_server:
                # Real server mode: update pending input
                if direction == "left":
                    self._pending_input.direction = -1
                    return "Moving left."
                if direction == "right":
                    self._pending_input.direction = 1
                    return "Moving right."
                self._pending_input.direction = 0
                return "Stopped."

            # Simulation mode
            agent = self._agents.get(self._current_agent_id)
            if not agent or not agent.is_alive:
                return "Cannot move: agent is dead"

            if direction == "left":
                agent.direction = -1
                agent.vel_x = max(agent.vel_x - 5, -15)
                return f"Moving left. Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
            if direction == "right":
                agent.direction = 1
                agent.vel_x = min(agent.vel_x + 5, 15)
                return f"Moving right. Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
            agent.vel_x *= 0.8  # friction
            return f"Stopped. Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"

        @mcp.tool
        def jump() -> str:
            """
            Make the tee jump. Can double-jump in the air.

            Returns:
                Result message describing the jump
            """
            if self._use_real_server:
                # Real server mode: set jump flag
                self._pending_input.jump = True
                return "Jumping!"

            # Simulation mode
            agent = self._agents.get(self._current_agent_id)
            if not agent or not agent.is_alive:
                return "Cannot jump: agent is dead"

            if agent.is_grounded:
                agent.vel_y = -12
                agent.is_grounded = False
                return f"Jumped! Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
            if agent.air_jumps_left > 0:
                # Air jump (weaker); only one is available until landing.
                agent.air_jumps_left -= 1
                agent.vel_y = -8
                return f"Air jumped! Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
            return "Cannot jump: no air jumps left until you land"

        @mcp.tool
        def aim(x: int, y: int) -> str:
            """
            Aim at target coordinates.

            Args:
                x: Target X coordinate
                y: Target Y coordinate

            Returns:
                Result message confirming aim direction
            """
            if self._use_real_server:
                # Real server mode: set target position (relative to player)
                # In Teeworlds, target is relative to player position
                self._pending_input.target_x = x
                self._pending_input.target_y = y
                angle = math.degrees(math.atan2(y, x))
                return f"Aiming at ({x}, {y}). Angle: {angle:.1f} deg"

            # Simulation mode
            agent = self._agents.get(self._current_agent_id)
            if not agent or not agent.is_alive:
                return "Cannot aim: agent is dead"

            agent.aim_x = x
            agent.aim_y = y

            # Calculate angle for display
            dx = x - agent.x
            dy = y - agent.y
            angle = math.degrees(math.atan2(dy, dx))
            distance = math.hypot(dx, dy)
            return f"Aiming at ({x}, {y}). Angle: {angle:.1f} deg, Distance: {distance:.1f} units"

        @mcp.tool
        def shoot(weapon: int = -1) -> str:
            """
            Fire the current or specified weapon.

            Args:
                weapon: Weapon ID (0=hammer, 1=pistol, 2=shotgun, 3=grenade, 4=laser, 5=ninja).
                    Use -1 for current weapon.

            Returns:
                Result message describing the shot and any hits
            """
            if self._use_real_server:
                # Real server mode: increment fire counter and set weapon
                self._fire_counter += 1
                self._pending_input.fire = self._fire_counter

                # Set wanted weapon (Teeworlds uses 1-indexed: 1=hammer, 2=gun, etc.)
                if weapon in WEAPONS:
                    self._pending_input.wanted_weapon = weapon + 1  # Convert to 1-indexed
                    wpn_name = WEAPONS[weapon]["name"]
                else:
                    wpn_name = "current weapon"
                return f"Fired {wpn_name}! (fire counter: {self._fire_counter})"

            # Simulation mode
            agent = self._agents.get(self._current_agent_id)
            if not agent or not agent.is_alive:
                return "Cannot shoot: agent is dead"

            # Switch weapon if specified
            if weapon in WEAPONS:
                agent.weapon = weapon

            wpn = WEAPONS[agent.weapon]
            wpn_name = wpn["name"]
            uses_ammo = wpn["ammo"] > 0

            # Check ammo
            if uses_ammo and agent.ammo[agent.weapon] <= 0:
                return f"Out of ammo for {wpn_name}!"

            # Use ammo
            if uses_ammo:
                agent.ammo[agent.weapon] -= 1

            # The aim vector does not depend on the target; compute it once.
            aim_dx = agent.aim_x - agent.x
            aim_dy = agent.aim_y - agent.y
            aim_dist = math.hypot(aim_dx, aim_dy)
            hit_range = 400 if agent.weapon != 0 else 50  # hammer short range

            # Check for hits on other agents
            hits = []
            for other_id, other in self._agents.items():
                if other_id == self._current_agent_id or not other.is_alive:
                    continue
                if not aim_dist > 0:
                    continue

                # Simple hit detection based on aim
                dx = other.x - agent.x
                dy = other.y - agent.y
                distance = math.hypot(dx, dy)

                # Cosine of the angle between aim direction and enemy direction
                dot = (dx * aim_dx + dy * aim_dy) / (aim_dist * max(distance, 1))
                if not (distance < hit_range and dot > 0.8):
                    continue

                # Hit!
                damage = wpn["damage"]
                other.health -= damage
                other.armor = max(0, other.armor - damage // 2)

                if other.health <= 0:
                    other.is_alive = False
                    agent.score += 1
                    self._kill_events.append({
                        "killer_id": self._current_agent_id,
                        "victim_id": other_id,
                        "weapon": agent.weapon,
                        "tick": self._tick,
                    })
                    hits.append(f"KILLED Player {other_id} with {wpn_name}!")
                else:
                    hits.append(f"Hit Player {other_id} for {damage} damage ({other.health}HP remaining)")

            ammo_str = f"({agent.ammo[agent.weapon]} ammo)" if uses_ammo else ""
            if hits:
                return f"Fired {wpn_name} {ammo_str}. " + " ".join(hits)
            return f"Fired {wpn_name} {ammo_str}. No hits."

        @mcp.tool
        def hook() -> str:
            """
            Use the grappling hook in the aim direction.

            The hook can grab walls or enemies to pull yourself toward them.

            Returns:
                Result message describing hook action
            """
            if self._use_real_server:
                # Real server mode: toggle hook flag
                self._pending_input.hook = not self._pending_input.hook
                if self._pending_input.hook:
                    return "Hook deployed!"
                return "Hook released."

            # Simulation mode
            agent = self._agents.get(self._current_agent_id)
            if not agent or not agent.is_alive:
                return "Cannot hook: agent is dead"

            agent.is_hooking = not agent.is_hooking
            if not agent.is_hooking:
                return "Hook released."

            # Pull toward aim point
            dx = agent.aim_x - agent.x
            dy = agent.aim_y - agent.y
            dist = math.hypot(dx, dy)
            if dist > 0:
                agent.vel_x += (dx / dist) * 3
                agent.vel_y += (dy / dist) * 3
            return f"Hook deployed! Pulling toward ({agent.aim_x}, {agent.aim_y})"

        @mcp.tool
        def get_status() -> str:
            """
            Get the current game state as a text description.

            Returns:
                Detailed text description of current game state including:
                - Your position, health, weapon, ammo
                - Visible enemies with positions and health
                - Recent events (kills, deaths)
            """
            me = self._current_agent_id

            if self._use_real_server:
                # Real server mode: read from BotManager.game_state
                if not self._bot_manager or not self._bot_manager.all_connected:
                    return "Not connected to server."

                gs = self._bot_manager.game_state
                my_char = gs.get_character(me)
                my_info = gs.get_player_info(me)

                lines = [f"=== Teeworlds Game State (Tick {gs.tick}) ===", ""]

                if my_char is None:
                    lines.append("STATUS: DEAD - Waiting for respawn...")
                    lines.append("")
                else:
                    # Position is in fixed-point (divide by 32 for world units)
                    x = my_char.x / 32.0
                    y = my_char.y / 32.0
                    vel_x = my_char.vel_x / 256.0
                    vel_y = my_char.vel_y / 256.0

                    lines.append(f"Position: ({x:.0f}, {y:.0f}) | Velocity: ({vel_x:.1f}, {vel_y:.1f})")
                    lines.append(f"Health: {my_char.health}/10 | Armor: {my_char.armor}/10")

                    # Weapon (0=hammer, 1=gun, etc.)
                    wpn_id = my_char.weapon
                    wpn_name = WEAPONS.get(wpn_id, {}).get("name", f"weapon_{wpn_id}")
                    lines.append(f"Weapon: {wpn_name} ({my_char.ammo_count} ammo)")

                    if my_info:
                        lines.append(f"Score: {my_info.score} kills")
                    lines.append("")

                # Other players
                enemies = []
                for client_id, char in gs.characters.items():
                    if client_id == me:
                        continue

                    other_info = gs.get_player_info(client_id)
                    x = char.x / 32.0
                    y = char.y / 32.0

                    if my_char:
                        dist = math.hypot(x - (my_char.x / 32.0), y - (my_char.y / 32.0))
                    else:
                        dist = 0

                    wpn_name = WEAPONS.get(char.weapon, {}).get("name", "unknown")
                    score = other_info.score if other_info else 0

                    enemies.append(
                        f" - Player {client_id}: pos({x:.0f}, {y:.0f}), "
                        f"{char.health}HP, {wpn_name}, {dist:.0f} units away, {score} kills"
                    )

                if enemies:
                    lines.append("OTHER PLAYERS:")
                    lines.extend(enemies)
                else:
                    lines.append("OTHER PLAYERS: None")
                lines.append("")

                # Recent kills
                recent = gs.kill_events[-5:] if gs.kill_events else []
                if recent:
                    lines.append("RECENT EVENTS:")
                    for event in recent:
                        wpn_name = WEAPONS.get(event.weapon, {}).get("name", "unknown")
                        lines.append(_kill_line(event.killer_id, event.victim_id, wpn_name, me))
                    lines.append("")

                lines.append("AVAILABLE ACTIONS: move, jump, aim, shoot, hook, get_status")
                return "\n".join(lines)

            # Simulation mode
            agent = self._agents.get(me)

            lines = [f"=== Teeworlds Game State (Tick {self._tick}) ===", ""]

            if not agent or not agent.is_alive:
                lines.append("STATUS: DEAD - Waiting for respawn...")
                lines.append("")
            else:
                lines.append(
                    f"Position: ({agent.x:.0f}, {agent.y:.0f}) | "
                    f"Velocity: ({agent.vel_x:.1f}, {agent.vel_y:.1f})"
                )
                lines.append(f"Health: {agent.health}/10 | Armor: {agent.armor}/10")
                wpn = WEAPONS[agent.weapon]
                ammo_str = str(agent.ammo[agent.weapon]) if wpn["ammo"] > 0 else "infinite"
                lines.append(f"Weapon: {wpn['name']} ({ammo_str} ammo)")
                lines.append(f"Score: {agent.score} kills")
                lines.append(f"Aim: ({agent.aim_x:.0f}, {agent.aim_y:.0f})")
                lines.append("")

            # Other players
            enemies = []
            for other_id, other in self._agents.items():
                if other_id == me:
                    continue

                if not other.is_alive:
                    enemies.append(f" - Player {other_id}: DEAD")
                    continue

                # Distance is measured from our agent (from the origin if absent).
                dx = other.x - agent.x if agent else other.x
                dy = other.y - agent.y if agent else other.y
                dist = math.hypot(dx, dy)
                wpn_name = WEAPONS[other.weapon]["name"]
                enemies.append(
                    f" - Player {other_id}: pos({other.x:.0f}, {other.y:.0f}), "
                    f"{other.health}HP, {wpn_name}, {dist:.0f} units away"
                )

            if enemies:
                lines.append("OTHER PLAYERS:")
                lines.extend(enemies)
            else:
                lines.append("OTHER PLAYERS: None")
            lines.append("")

            # Recent kills
            recent = self._kill_events[-5:] if self._kill_events else []
            if recent:
                lines.append("RECENT EVENTS:")
                for event in recent:
                    wpn_name = WEAPONS[event["weapon"]]["name"]
                    lines.append(_kill_line(event["killer_id"], event["victim_id"], wpn_name, me))
                lines.append("")

            lines.append("AVAILABLE ACTIONS: move, jump, aim, shoot, hook, get_status")
            return "\n".join(lines)

        # Store tool functions for direct synchronous access (for Colab/notebooks).
        # Some FastMCP releases make ``@mcp.tool`` return a tool object that is
        # not callable and keeps the original function on ``.fn``; unwrap it so
        # call_tool_sync() works regardless of the installed version.
        registered = {
            "move": move,
            "jump": jump,
            "aim": aim,
            "shoot": shoot,
            "hook": hook,
            "get_status": get_status,
        }
        self._tool_fns = {name: getattr(tool, "fn", tool) for name, tool in registered.items()}

        # Store MCP reference and pass to base class
        self._mcp = mcp
        super().__init__(mcp)

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Reset the environment for a new episode.

        Args:
            seed: Optional random seed (seeds the global ``random`` module)
            episode_id: Optional episode ID
            **kwargs: Additional reset options

        Returns:
            Observation indicating the environment is ready, or a terminal
            observation with ``status == "error"`` if the real server could
            not be reached.
        """
        if seed is not None:
            random.seed(seed)

        # Reset episode state
        self._state = State(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )
        self._tick = 0
        self._kill_events = []
        self._fire_counter = 0

        if self._use_real_server:
            # Real server mode: connect via BotManager
            if self._bot_manager:
                # Disconnect existing connection
                self._bot_manager.disconnect()

            # Create new BotManager
            self._bot_manager = BotManager(
                host=self._server_host,
                port=self._server_port,
                num_bots=self._num_agents,
                ticks_per_step=10,  # 200ms per step at 50 ticks/sec
                bot_name_prefix="TeeUnit",
            )

            # Connect to server
            connected = self._bot_manager.connect(timeout=10.0)
            if not connected:
                return Observation(
                    done=True,
                    reward=0.0,
                    metadata={
                        "status": "error",
                        "message": f"Failed to connect to Teeworlds server at {self._server_host}:{self._server_port}",
                        "episode_id": self._state.episode_id,
                    },
                )

            # Initialize pending input
            self._pending_input = PlayerInput()

            # Wait for initial game state
            self._bot_manager.step()  # Get initial snapshot

            status = self._get_status_text_real()
            return Observation(
                done=False,
                reward=0.0,
                metadata={
                    "status": "ready",
                    "message": status,
                    "episode_id": self._state.episode_id,
                    "mode": "real_server",
                    "server": f"{self._server_host}:{self._server_port}",
                },
            )

        # Simulation mode
        self._agents = {i: GameAgent(i) for i in range(self._num_agents)}

        status = self._get_status_text()
        return Observation(
            done=False,
            reward=0.0,
            metadata={
                "status": "ready",
                "message": status,
                "episode_id": self._state.episode_id,
                "mode": "simulation",
            },
        )

    def _get_status_text(self) -> str:
        """Generate current game status text (simulation mode)."""
        agent = self._agents.get(self._current_agent_id)

        lines = [f"=== Teeworlds Game State (Tick {self._tick}) ==="]

        if agent and agent.is_alive:
            lines.append(f"Position: ({agent.x:.0f}, {agent.y:.0f})")
            lines.append(f"Health: {agent.health}/10 | Armor: {agent.armor}/10")
            wpn = WEAPONS[agent.weapon]
            lines.append(f"Weapon: {wpn['name']}")
            lines.append(f"Score: {agent.score} kills")
        else:
            lines.append("STATUS: DEAD")

        return "\n".join(lines)

    def _get_status_text_real(self) -> str:
        """Generate current game status text (real server mode)."""
        if not self._bot_manager:
            return "Not connected to server."

        gs = self._bot_manager.game_state
        my_char = gs.get_character(self._current_agent_id)

        lines = [f"=== Teeworlds Game State (Tick {gs.tick}) ==="]

        if my_char:
            x = my_char.x / 32.0
            y = my_char.y / 32.0
            lines.append(f"Position: ({x:.0f}, {y:.0f})")
            lines.append(f"Health: {my_char.health}/10 | Armor: {my_char.armor}/10")
            wpn_name = WEAPONS.get(my_char.weapon, {}).get("name", "unknown")
            lines.append(f"Weapon: {wpn_name}")

            my_info = gs.get_player_info(self._current_agent_id)
            if my_info:
                lines.append(f"Score: {my_info.score} kills")
        else:
            lines.append("STATUS: DEAD")

        return "\n".join(lines)

    def _execute_real_step(self):
        """Execute one step on the real server."""
        if not self._bot_manager:
            return

        # Send pending input for our controlled bot
        inputs = {self._current_agent_id: self._pending_input}

        # Execute the step (waits for ticks_per_step game ticks)
        self._bot_manager.step(inputs)

        # Update tick from game state
        self._tick = self._bot_manager.game_state.tick

        # Reset one-shot inputs (jump resets automatically in Teeworlds)
        self._pending_input.jump = False

    def _simulate_tick(self):
        """Simulate one game tick (physics, AI, etc.)."""
        self._tick += 1

        for agent in self._agents.values():
            if not agent.is_alive:
                continue

            # Apply gravity
            agent.vel_y += 0.5

            # Apply velocity
            agent.x += agent.vel_x
            agent.y += agent.vel_y

            # Ground collision (simple)
            if agent.y > 500:
                agent.y = 500
                agent.vel_y = 0
                agent.is_grounded = True
                # Landing restores the air jump.
                agent.air_jumps_left = GameAgent.MAX_AIR_JUMPS

            # Wall collision
            agent.x = max(50, min(750, agent.x))

            # Friction
            agent.vel_x *= 0.95

            # Simple AI for non-player agents
            if agent.agent_id != self._current_agent_id:
                self._simple_ai(agent)

    def _simple_ai(self, agent: GameAgent):
        """Simple AI behavior for non-player agents."""
        # Random movement
        if random.random() < 0.1:
            agent.vel_x += random.uniform(-3, 3)

        # Random jump
        if agent.is_grounded and random.random() < 0.05:
            agent.vel_y = -10
            agent.is_grounded = False

        # Aim at player
        player = self._agents.get(self._current_agent_id)
        if not player or not player.is_alive:
            return

        agent.aim_x = player.x
        agent.aim_y = player.y

        # Occasionally shoot
        if not random.random() < 0.02:
            return

        dist = math.hypot(player.x - agent.x, player.y - agent.y)
        if not dist < 300:
            return

        # Attack player
        wpn = WEAPONS[agent.weapon]
        if not (wpn["ammo"] < 0 or agent.ammo[agent.weapon] > 0):
            return
        if wpn["ammo"] > 0:
            agent.ammo[agent.weapon] -= 1

        # Check hit (simplified)
        if dist < 200 and random.random() < 0.3:
            player.health -= wpn["damage"]

            if player.health <= 0:
                player.is_alive = False
                agent.score += 1
                self._kill_events.append({
                    "killer_id": agent.agent_id,
                    "victim_id": self._current_agent_id,
                    "weapon": agent.weapon,
                    "tick": self._tick,
                })

    def _step_impl(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Handle non-MCP actions.

        Args:
            action: The action to execute
            timeout_s: Optional timeout
            **kwargs: Additional arguments

        Returns:
            Observation with error for unknown action types
        """
        return Observation(
            done=False,
            reward=0.0,
            metadata={
                "error": f"Unknown action type: {type(action).__name__}. "
                "Use ListToolsAction or CallToolAction for MCP interactions."
            },
        )

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Execute a step in the environment.

        The world is advanced first, then the MCP action runs, and only then
        are reward and termination evaluated, so that the effects of the
        action itself (e.g. a kill made by ``shoot``, which is stamped with
        the current tick) are reflected in this step's reward and done flag.

        Args:
            action: The MCP action to execute
            timeout_s: Optional timeout
            **kwargs: Additional arguments

        Returns:
            Observation from the action execution, with ``reward``, ``done``
            and the ``step``/``tick``/``mode`` metadata filled in.
        """
        # Increment step count
        self._state.step_count += 1

        # Advance the world by one step
        if self._use_real_server:
            # Real server mode: execute step on actual Teeworlds server
            self._execute_real_step()
        else:
            self._simulate_tick()

        # Let the base class handle MCP actions
        obs = super().step(action, timeout_s=timeout_s, **kwargs)

        # Score the step after the action has been applied
        if self._use_real_server:
            reward, done = self._score_real_step()
        else:
            reward, done = self._score_simulated_step()

        # Update observation with reward and done
        obs.reward = reward
        obs.done = done
        obs.metadata["step"] = self._state.step_count
        obs.metadata["tick"] = self._tick
        obs.metadata["mode"] = "real_server" if self._use_real_server else "simulation"

        return obs

    def _score_simulated_step(self):
        """Return ``(reward, done)`` for the current simulation step."""
        reward = self._calculate_reward()
        done = self._state.step_count >= self._max_steps

        # Check if all enemies dead (win condition)
        enemy_alive = any(
            a.is_alive for a in self._agents.values() if a.agent_id != self._current_agent_id
        )
        if not enemy_alive:
            done = True
            reward += 10.0  # Win bonus

        # Check if player dead
        player = self._agents.get(self._current_agent_id)
        if player and not player.is_alive:
            done = True
            reward -= 5.0  # Death penalty

        return reward, done

    def _score_real_step(self):
        """Return ``(reward, done)`` for the current real-server step."""
        reward = self._calculate_reward_real()
        done = self._state.step_count >= self._max_steps

        if self._bot_manager:
            gs = self._bot_manager.game_state

            # Character not in snapshot = dead
            if gs.get_character(self._current_agent_id) is None:
                done = True
                reward -= 5.0

            # NOTE(review): every event currently in game_state.kill_events is
            # counted on every step. If BotManager accumulates events across
            # steps, a single kill is rewarded repeatedly - confirm against
            # BotManager and track already-rewarded events if so.
            for event in gs.kill_events:
                if event.killer_id == self._current_agent_id:
                    reward += 1.0  # We killed someone

        return reward, done

    def _calculate_reward(self) -> float:
        """Calculate reward for current step (simulation mode)."""
        reward = 0.0

        player = self._agents.get(self._current_agent_id)
        if not player:
            return reward

        # Survival bonus
        if player.is_alive:
            reward += 0.01

        # Kill bonus / death malus for events stamped with the current tick
        for event in self._kill_events:
            if event["tick"] != self._tick:
                continue
            if event["killer_id"] == self._current_agent_id:
                reward += 1.0
            elif event["victim_id"] == self._current_agent_id:
                reward -= 0.5

        return reward

    def _calculate_reward_real(self) -> float:
        """Calculate reward for current step (real server mode)."""
        reward = 0.0

        if not self._bot_manager:
            return reward

        gs = self._bot_manager.game_state
        my_char = gs.get_character(self._current_agent_id)

        # Survival bonus
        if my_char is not None:
            reward += 0.01

        # Kill/death events are handled by the step scoring
        # (+1.0 for kills and -5.0 for death).
        return reward

    @property
    def state(self) -> State:
        """Get the current environment state."""
        return self._state

    def call_tool_sync(self, name: str, **kwargs) -> str:
        """
        Call a tool synchronously (for notebooks/Colab).

        Note that this bypasses ``step()``: the world is not advanced and no
        reward is computed.

        Args:
            name: Tool name (move, jump, aim, shoot, hook, get_status)
            **kwargs: Arguments for the tool

        Returns:
            Tool result as string, or an "Unknown tool" message
        """
        tool_fn = self._tool_fns.get(name)
        if tool_fn is None:
            return f"Unknown tool: {name}"
        return tool_fn(**kwargs)

    def close(self):
        """Clean up resources and disconnect from server."""
        # getattr: __init__ may have raised before the attribute was set
        # (close() is also reached from __del__ on such instances).
        bot_manager = getattr(self, "_bot_manager", None)
        if bot_manager:
            bot_manager.disconnect()
        self._bot_manager = None

    def __del__(self):
        """Destructor - ensure cleanup."""
        self.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.close()
        return False