Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Scripted Red Alert bot that plays a full game via the OpenEnv client API. | |
| Exercises ALL Sprint 4+5 observation fields and action types: | |
| - Observations: spatial_map, visible_enemy_buildings, unit facing/stance/speed/ | |
| attack_range/experience/passengers, building cell coords/can_produce/power/ | |
| rally/repair/sell_value | |
| - Actions: all 20 types including GUARD, SET_STANCE, ENTER_TRANSPORT, UNLOAD, | |
| SET_RALLY_POINT, REPAIR, SELL, POWER_DOWN, SET_PRIMARY | |
| Usage: | |
| docker run -p 8000:8000 openra-rl | |
| python examples/scripted_bot.py --verbose | |
| """ | |
| import argparse | |
| import asyncio | |
| import base64 | |
| import sys | |
| from typing import List, Optional, Tuple | |
| from openra_env.client import OpenRAEnv | |
| from openra_env.models import ( | |
| ActionType, | |
| BuildingInfoModel, | |
| CommandModel, | |
| OpenRAAction, | |
| OpenRAObservation, | |
| UnitInfoModel, | |
| ) | |
| # Stance constants matching C# AutoTarget.UnitStance enum | |
| STANCE_HOLD_FIRE = 0 | |
| STANCE_RETURN_FIRE = 1 | |
| STANCE_DEFEND = 2 | |
| STANCE_ATTACK_ANYTHING = 3 | |
| STANCE_NAMES = {0: "HoldFire", 1: "ReturnFire", 2: "Defend", 3: "AttackAnything"} | |
| class ScriptedBot: | |
| """State-machine bot with a Red Alert build order exercising all actions. | |
| Phases: | |
| deploy_mcv - Deploy MCV, set stance on starting units | |
| build_base - Build power/barracks/war factory, set rally points | |
| train_army - Train infantry + APC, guard CY, load transport | |
| attack - Attack-move toward enemy buildings, unload APC | |
| sustain - Continuous production, repair, sell damaged buildings | |
| """ | |
| # Build order uses both faction names β bot picks whichever is available | |
| BARRACKS_TYPES = {"tent", "barr"} # Allied / Soviet | |
| WAR_FACTORY_TYPES = {"weap"} | |
| BUILD_PRIORITY = [ | |
| "powr", # Power Plant ($300) β shared | |
| "barracks", # Placeholder: tent (Allied) or barr (Soviet) | |
| "proc", # Ore Refinery ($2000) β needed before war factory | |
| "weap", # War Factory ($2000) β shared | |
| "powr", # Second Power Plant | |
| ] | |
| INFANTRY_TRAIN_TARGET = 6 | |
| GUARD_COUNT = 2 # infantry to guard CY | |
| TRANSPORT_TYPE = "apc" | |
| COMBAT_UNIT_TYPES = {"e1", "e2", "e3", "e4", "1tnk", "2tnk", "3tnk", "arty", "jeep", "apc"} | |
| INFANTRY_TYPES = {"e1", "e2", "e3", "e4"} | |
| VEHICLE_TYPES = {"1tnk", "2tnk", "3tnk", "arty", "jeep"} | |
| def __init__(self, verbose: bool = False): | |
| self.phase = "deploy_mcv" | |
| self.build_index = 0 | |
| self.placement_count = 0 | |
| self.deploy_issued = False | |
| self.verbose = verbose | |
| self._guards_assigned: set[int] = set() # actor IDs guarding CY | |
| self._stances_set: set[int] = set() # actor IDs with stance already set | |
| self._rally_set: set[int] = set() # building actor IDs with rally point set | |
| self._apc_trained = False | |
| self._apc_loaded = False | |
| self._repair_issued: set[int] = set() # building actor IDs being repaired | |
| self._sold: set[int] = set() # building actor IDs sold | |
| self._powered_down: set[int] = set() # building actor IDs powered down | |
| self._primary_set: set[int] = set() # building actor IDs set as primary | |
| def decide(self, obs: OpenRAObservation) -> OpenRAAction: | |
| """Given current observation, return commands for this tick.""" | |
| commands: List[CommandModel] = [] | |
| self._update_phase(obs) | |
| # Priority 1: Place completed buildings | |
| commands.extend(self._handle_placement(obs)) | |
| # Priority 2: Deploy MCV | |
| if self.phase == "deploy_mcv": | |
| cmd = self._handle_deploy(obs) | |
| if cmd: | |
| commands.append(cmd) | |
| # Priority 3: Set rally points on production buildings | |
| commands.extend(self._handle_rally_points(obs)) | |
| # Priority 4: Power management (power down buildings if power negative) | |
| commands.extend(self._handle_power_management(obs)) | |
| # Priority 5: Set primary production buildings | |
| commands.extend(self._handle_set_primary(obs)) | |
| # Priority 6: Repair damaged buildings | |
| commands.extend(self._handle_repairs(obs)) | |
| # Priority 7: Queue production (buildings + units) | |
| commands.extend(self._handle_production(obs)) | |
| # Priority 8: Set stances on new units | |
| commands.extend(self._handle_stances(obs)) | |
| # Priority 9: Assign guards to CY | |
| commands.extend(self._handle_guards(obs)) | |
| # Priority 10: Load infantry into APC | |
| commands.extend(self._handle_transport(obs)) | |
| # Priority 11: Combat β attack + unload | |
| commands.extend(self._handle_combat(obs)) | |
| # Priority 12: Sell heavily damaged buildings | |
| commands.extend(self._handle_sell(obs)) | |
| if not commands: | |
| commands.append(CommandModel(action=ActionType.NO_OP)) | |
| return OpenRAAction(commands=commands) | |
| # ββ Phase transitions ββββββββββββββββββββββββββββββββββββββββββ | |
| def _update_phase(self, obs: OpenRAObservation): | |
| has_cy = any(b.type == "fact" for b in obs.buildings) | |
| has_barracks = any(b.type in self.BARRACKS_TYPES for b in obs.buildings) | |
| combat_units = [u for u in obs.units if u.type in self.COMBAT_UNIT_TYPES] | |
| non_guard_combat = [u for u in combat_units if u.actor_id not in self._guards_assigned] | |
| if self.phase == "deploy_mcv" and has_cy: | |
| self.phase = "build_base" | |
| self._log("Phase β build_base") | |
| elif self.phase == "build_base" and self.build_index >= len(self.BUILD_PRIORITY): | |
| self.phase = "train_army" | |
| self._log("Phase β train_army") | |
| elif self.phase == "train_army" and len(non_guard_combat) >= self.INFANTRY_TRAIN_TARGET: | |
| self.phase = "attack" | |
| self._log(f"Phase β attack ({len(non_guard_combat)} combat units ready)") | |
| elif self.phase == "attack" and has_barracks: | |
| # Stay in attack but also sustain production | |
| pass | |
| # ββ Deploy MCV βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _handle_deploy(self, obs: OpenRAObservation) -> Optional[CommandModel]: | |
| if self.deploy_issued: | |
| return None | |
| mcv = next((u for u in obs.units if u.type == "mcv"), None) | |
| if mcv: | |
| self.deploy_issued = True | |
| self._log(f"Deploying MCV (actor {mcv.actor_id}, facing={mcv.facing})") | |
| return CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id) | |
| return None | |
| # ββ Building placement βββββββββββββββββββββββββββββββββββββββββ | |
| def _handle_placement(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| cy = self._find_building(obs, "fact") | |
| if not cy: | |
| return commands | |
| for prod in obs.production: | |
| if prod.queue_type == "Building" and prod.progress >= 0.99: | |
| x, y = self._placement_offset(cy) | |
| self._log(f"Placing {prod.item} at cell ({x}, {y}) [attempt {self.placement_count}]") | |
| commands.append(CommandModel( | |
| action=ActionType.PLACE_BUILDING, | |
| item_type=prod.item, | |
| target_x=x, | |
| target_y=y, | |
| )) | |
| self.placement_count += 1 | |
| return commands | |
| def _placement_offset(self, cy: BuildingInfoModel) -> Tuple[int, int]: | |
| """Calculate placement position relative to CY using cell coords.""" | |
| # Use pos_x // 1024 as CenterPosition maps to cell more reliably | |
| cx = cy.pos_x // 1024 | |
| cy_y = cy.pos_y // 1024 | |
| # Many offsets to maximize chance of finding valid terrain | |
| offsets = [ | |
| (3, 0), (-3, 0), (0, 3), (0, -3), | |
| (3, 3), (-3, 3), (3, -3), (-3, -3), | |
| (6, 0), (-6, 0), (0, 6), (0, -6), | |
| (2, 0), (-2, 0), (0, 2), (0, -2), | |
| (4, 0), (-4, 0), (0, 4), (0, -4), | |
| ] | |
| idx = self.placement_count % len(offsets) | |
| dx, dy = offsets[idx] | |
| return cx + dx, cy_y + dy | |
| # ββ Rally points (Sprint 4 action) βββββββββββββββββββββββββββββ | |
| def _handle_rally_points(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| cy = self._find_building(obs, "fact") | |
| if not cy: | |
| return commands | |
| # Set rally point on barracks and war factory toward CY | |
| for b in obs.buildings: | |
| if b.type in ("tent", "weap") and b.actor_id not in self._rally_set: | |
| rally_x = cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024 | |
| rally_y = cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024 | |
| self._log(f"Setting rally on {b.type} (actor {b.actor_id}) β ({rally_x}, {rally_y})") | |
| commands.append(CommandModel( | |
| action=ActionType.SET_RALLY_POINT, | |
| actor_id=b.actor_id, | |
| target_x=rally_x, | |
| target_y=rally_y, | |
| )) | |
| self._rally_set.add(b.actor_id) | |
| return commands | |
| # ββ Power management (Sprint 5 action) βββββββββββββββββββββββββ | |
| def _handle_power_management(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| """Power down non-essential buildings when power balance is negative.""" | |
| commands = [] | |
| power_balance = obs.economy.power_provided - obs.economy.power_drained | |
| if power_balance >= 0: | |
| return commands | |
| # Power down radar/tech buildings first (keep production running) | |
| POWER_DOWN_PRIORITY = ["dome", "spen", "syrd", "hpad", "afld", "fix"] | |
| for btype in POWER_DOWN_PRIORITY: | |
| for b in obs.buildings: | |
| if b.type == btype and b.is_powered and b.actor_id not in self._powered_down: | |
| commands.append(CommandModel(action=ActionType.POWER_DOWN, actor_id=b.actor_id)) | |
| self._powered_down.add(b.actor_id) | |
| self._log(f"Powering down {b.type} (actor {b.actor_id}) β power balance: {power_balance}") | |
| return commands # one at a time | |
| return commands | |
| # ββ Set primary building (Sprint 5 action) βββββββββββββββββββ | |
| def _handle_set_primary(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| """Set primary on newest production building of each type.""" | |
| commands = [] | |
| for btype_set in [self.BARRACKS_TYPES, self.WAR_FACTORY_TYPES]: | |
| buildings_of_type = [b for b in obs.buildings if b.type in btype_set] | |
| if len(buildings_of_type) >= 2: | |
| newest = max(buildings_of_type, key=lambda b: b.actor_id) | |
| if newest.actor_id not in self._primary_set: | |
| commands.append(CommandModel(action=ActionType.SET_PRIMARY, actor_id=newest.actor_id)) | |
| self._primary_set.add(newest.actor_id) | |
| self._log(f"Setting primary: {newest.type} (actor {newest.actor_id})") | |
| return commands | |
| # ββ Repair damaged buildings (Sprint 4 observation + existing action) ββ | |
| def _handle_repairs(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| for b in obs.buildings: | |
| if (b.hp_percent < 0.7 | |
| and not b.is_repairing | |
| and b.actor_id not in self._repair_issued | |
| and obs.economy.cash >= 500): | |
| self._log(f"Repairing {b.type} (actor {b.actor_id}, hp={b.hp_percent:.0%})") | |
| commands.append(CommandModel( | |
| action=ActionType.REPAIR, | |
| actor_id=b.actor_id, | |
| )) | |
| self._repair_issued.add(b.actor_id) | |
| return commands | |
| # ββ Production βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _handle_production(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| # Building construction β treat any Building queue item as "in progress" | |
| # (includes completed-but-unplaced buildings that block the queue) | |
| building_in_queue = any( | |
| p.queue_type == "Building" | |
| for p in obs.production | |
| ) | |
| if not building_in_queue and self.build_index < len(self.BUILD_PRIORITY): | |
| item_type = self._resolve_build_item(obs, self.BUILD_PRIORITY[self.build_index]) | |
| if item_type is None: | |
| # Can't resolve this item yet, skip | |
| pass | |
| elif self._has_building_type(obs, item_type, self.build_index): | |
| self.build_index += 1 | |
| elif self._can_produce_item(obs, item_type): | |
| self._log(f"Building {item_type} (#{self.build_index + 1}/{len(self.BUILD_PRIORITY)})") | |
| commands.append(CommandModel(action=ActionType.BUILD, item_type=item_type)) | |
| self.build_index += 1 | |
| # Infantry training | |
| has_barracks = any(b.type in self.BARRACKS_TYPES for b in obs.buildings) | |
| infantry_training = any( | |
| p.queue_type == "Infantry" and p.progress < 0.99 | |
| for p in obs.production | |
| ) | |
| infantry = [u for u in obs.units if u.type in self.INFANTRY_TYPES] | |
| total_target = self.INFANTRY_TRAIN_TARGET + self.GUARD_COUNT | |
| if has_barracks and not infantry_training and len(infantry) < total_target: | |
| if self._can_produce_item(obs, "e1") and obs.economy.cash >= 100: | |
| self._log(f"Training e1 ({len(infantry)}/{total_target})") | |
| commands.append(CommandModel(action=ActionType.TRAIN, item_type="e1")) | |
| # APC from war factory | |
| has_weap = any(b.type == "weap" for b in obs.buildings) | |
| vehicle_training = any( | |
| p.queue_type == "Vehicle" and p.progress < 0.99 | |
| for p in obs.production | |
| ) | |
| if (has_weap and not vehicle_training and not self._apc_trained | |
| and self._can_produce_item(obs, self.TRANSPORT_TYPE) | |
| and obs.economy.cash >= 800): | |
| self._log("Training APC for transport ops") | |
| commands.append(CommandModel(action=ActionType.TRAIN, item_type=self.TRANSPORT_TYPE)) | |
| self._apc_trained = True | |
| # Continuous vehicle production in attack phase | |
| if (self.phase == "attack" and has_weap and not vehicle_training | |
| and obs.economy.cash >= 800): | |
| # Build light tanks if available | |
| if self._can_produce_item(obs, "1tnk"): | |
| self._log("Training 1tnk (continuous production)") | |
| commands.append(CommandModel(action=ActionType.TRAIN, item_type="1tnk")) | |
| return commands | |
| def _can_produce_item(self, obs: OpenRAObservation, item_type: str) -> bool: | |
| """Check if item is buildable using per-building can_produce (Sprint 4).""" | |
| # First check global available_production | |
| if item_type in obs.available_production: | |
| return True | |
| # Also check per-building can_produce lists | |
| for b in obs.buildings: | |
| if item_type in b.can_produce: | |
| return True | |
| return False | |
| # ββ Stances (Sprint 4 action) ββββββββββββββββββββββββββββββββββ | |
| def _handle_stances(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| for u in obs.units: | |
| if u.actor_id in self._stances_set: | |
| continue | |
| if u.type not in self.COMBAT_UNIT_TYPES: | |
| continue | |
| # Guards get Defend stance, attackers get AttackAnything | |
| if u.actor_id in self._guards_assigned: | |
| desired = STANCE_DEFEND | |
| else: | |
| desired = STANCE_ATTACK_ANYTHING | |
| if u.stance != desired: | |
| self._log( | |
| f"Setting {u.type} (actor {u.actor_id}) stance: " | |
| f"{STANCE_NAMES.get(u.stance, '?')} β {STANCE_NAMES[desired]}" | |
| ) | |
| commands.append(CommandModel( | |
| action=ActionType.SET_STANCE, | |
| actor_id=u.actor_id, | |
| target_x=desired, | |
| )) | |
| self._stances_set.add(u.actor_id) | |
| return commands | |
| # ββ Guard CY (Sprint 4 action) ββββββββββββββββββββββββββββββββ | |
| def _handle_guards(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| if len(self._guards_assigned) >= self.GUARD_COUNT: | |
| return commands | |
| cy = self._find_building(obs, "fact") | |
| if not cy: | |
| return commands | |
| # Find idle infantry not yet guarding | |
| for u in obs.units: | |
| if len(self._guards_assigned) >= self.GUARD_COUNT: | |
| break | |
| if (u.type in self.INFANTRY_TYPES | |
| and u.is_idle | |
| and u.actor_id not in self._guards_assigned): | |
| self._log( | |
| f"Assigning {u.type} (actor {u.actor_id}, " | |
| f"range={u.attack_range}) to guard CY" | |
| ) | |
| commands.append(CommandModel( | |
| action=ActionType.GUARD, | |
| actor_id=u.actor_id, | |
| target_actor_id=cy.actor_id, | |
| )) | |
| self._guards_assigned.add(u.actor_id) | |
| return commands | |
| # ββ Transport: load/unload (Sprint 4 actions) βββββββββββββββββ | |
| def _handle_transport(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| if self._apc_loaded: | |
| return commands | |
| apc = next( | |
| (u for u in obs.units | |
| if u.type == self.TRANSPORT_TYPE and u.passenger_count == 0), | |
| None, | |
| ) | |
| if not apc: | |
| return commands | |
| # Load idle infantry (not guards) into the APC | |
| loaded = 0 | |
| for u in obs.units: | |
| if loaded >= 4: # APC capacity | |
| break | |
| if (u.type in self.INFANTRY_TYPES | |
| and u.is_idle | |
| and u.actor_id not in self._guards_assigned): | |
| self._log( | |
| f"Loading {u.type} (actor {u.actor_id}, " | |
| f"speed={u.speed}) into APC {apc.actor_id}" | |
| ) | |
| commands.append(CommandModel( | |
| action=ActionType.ENTER_TRANSPORT, | |
| actor_id=u.actor_id, | |
| target_actor_id=apc.actor_id, | |
| )) | |
| loaded += 1 | |
| if loaded > 0: | |
| self._apc_loaded = True | |
| return commands | |
| # ββ Combat βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _handle_combat(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| if self.phase != "attack": | |
| return commands | |
| # Unload APC near enemy | |
| commands.extend(self._handle_unload(obs)) | |
| # Attack-move idle fighters toward enemy | |
| idle_fighters = [ | |
| u for u in obs.units | |
| if (u.type in self.COMBAT_UNIT_TYPES | |
| and u.is_idle | |
| and u.actor_id not in self._guards_assigned) | |
| ] | |
| if len(idle_fighters) < 2: | |
| return commands | |
| target_x, target_y = self._find_attack_target(obs) | |
| for unit in idle_fighters: | |
| commands.append(CommandModel( | |
| action=ActionType.ATTACK_MOVE, | |
| actor_id=unit.actor_id, | |
| target_x=target_x, | |
| target_y=target_y, | |
| )) | |
| if idle_fighters: | |
| self._log( | |
| f"Attacking with {len(idle_fighters)} units " | |
| f"toward ({target_x}, {target_y})" | |
| ) | |
| return commands | |
| def _handle_unload(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| """Unload APC when near enemies.""" | |
| commands = [] | |
| for u in obs.units: | |
| if u.type != self.TRANSPORT_TYPE or u.passenger_count <= 0: | |
| continue | |
| # Check if any enemy is within ~15 cells | |
| for enemy in obs.visible_enemies: | |
| dx = abs(u.cell_x - enemy.cell_x) | |
| dy = abs(u.cell_y - enemy.cell_y) | |
| if dx + dy < 15: | |
| self._log( | |
| f"Unloading APC (actor {u.actor_id}, " | |
| f"{u.passenger_count} passengers) near enemy" | |
| ) | |
| commands.append(CommandModel( | |
| action=ActionType.UNLOAD, | |
| actor_id=u.actor_id, | |
| )) | |
| break | |
| # Also unload near enemy buildings | |
| for eb in obs.visible_enemy_buildings: | |
| dx = abs(u.cell_x - eb.cell_x) | |
| dy = abs(u.cell_y - eb.cell_y) | |
| if dx + dy < 15: | |
| self._log( | |
| f"Unloading APC near enemy building {eb.type} " | |
| f"(hp={eb.hp_percent:.0%})" | |
| ) | |
| commands.append(CommandModel( | |
| action=ActionType.UNLOAD, | |
| actor_id=u.actor_id, | |
| )) | |
| break | |
| return commands | |
| def _find_attack_target(self, obs: OpenRAObservation) -> Tuple[int, int]: | |
| """Prioritize enemy buildings > enemy units > map center.""" | |
| # Priority 1: visible enemy buildings (Sprint 4 field) | |
| if obs.visible_enemy_buildings: | |
| # Prefer production buildings | |
| prod_buildings = [ | |
| b for b in obs.visible_enemy_buildings | |
| if b.type in ("fact", "tent", "weap", "hpad", "afld") | |
| ] | |
| target = prod_buildings[0] if prod_buildings else obs.visible_enemy_buildings[0] | |
| return target.cell_x, target.cell_y | |
| # Priority 2: visible enemy units | |
| if obs.visible_enemies: | |
| enemy = obs.visible_enemies[0] | |
| return enemy.cell_x, enemy.cell_y | |
| # Fallback: map center | |
| if obs.map_info.width > 0: | |
| return obs.map_info.width // 2, obs.map_info.height // 2 | |
| return 64, 64 | |
| # ββ Sell heavily damaged buildings βββββββββββββββββββββββββββββ | |
| def _handle_sell(self, obs: OpenRAObservation) -> List[CommandModel]: | |
| commands = [] | |
| for b in obs.buildings: | |
| if (b.hp_percent < 0.2 | |
| and b.type != "fact" # never sell CY | |
| and b.actor_id not in self._sold): | |
| self._log( | |
| f"Selling {b.type} (actor {b.actor_id}, hp={b.hp_percent:.0%}, " | |
| f"refund=${b.sell_value})" | |
| ) | |
| commands.append(CommandModel( | |
| action=ActionType.SELL, | |
| actor_id=b.actor_id, | |
| )) | |
| self._sold.add(b.actor_id) | |
| return commands | |
| # ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _resolve_build_item(self, obs: OpenRAObservation, placeholder: str) -> Optional[str]: | |
| """Resolve faction-agnostic build item to actual producible type.""" | |
| if placeholder == "barracks": | |
| # Find which barracks type is available | |
| for btype in self.BARRACKS_TYPES: | |
| if self._can_produce_item(obs, btype): | |
| return btype | |
| return None | |
| return placeholder | |
| def _has_building_type(self, obs: OpenRAObservation, item_type: str, build_index: int) -> bool: | |
| """Check if we already have enough of this building type.""" | |
| already_built = sum(1 for b in obs.buildings if b.type == item_type) | |
| # Count how many times this item appears up to current index | |
| resolved_order = [] | |
| for i, p in enumerate(self.BUILD_PRIORITY[:build_index + 1]): | |
| if p == "barracks": | |
| resolved_order.append(item_type if item_type in self.BARRACKS_TYPES else p) | |
| else: | |
| resolved_order.append(p) | |
| target_count = resolved_order.count(item_type) | |
| return already_built >= target_count | |
| def _find_building(self, obs: OpenRAObservation, btype: str) -> Optional[BuildingInfoModel]: | |
| return next((b for b in obs.buildings if b.type == btype), None) | |
| def _log(self, msg: str): | |
| if self.verbose: | |
| print(f" [Bot] {msg}") | |
| # ββ Status display βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_status(step: int, obs: OpenRAObservation, bot: ScriptedBot): | |
| """Print a rich status line using Sprint 4 observation fields.""" | |
| combat = [u for u in obs.units if u.type in bot.COMBAT_UNIT_TYPES] | |
| buildings = ", ".join(sorted(set(b.type for b in obs.buildings))) or "none" | |
| power_balance = obs.economy.power_provided - obs.economy.power_drained | |
| # Count enemy intel | |
| enemy_units = len(obs.visible_enemies) | |
| enemy_buildings = len(obs.visible_enemy_buildings) | |
| print( | |
| f"Step {step:4d} | Tick {obs.tick:5d} | " | |
| f"${obs.economy.cash:5d} | Pwr:{power_balance:+d} | " | |
| f"Units:{len(obs.units)} (combat:{len(combat)}) | " | |
| f"Enemy:{enemy_units}u/{enemy_buildings}b | " | |
| f"Bldgs:[{buildings}] | {bot.phase}" | |
| ) | |
| def print_detailed_status(obs: OpenRAObservation): | |
| """Print full observation details using all Sprint 4 fields.""" | |
| print("\nββ Detailed Observation ββ") | |
| # Spatial map | |
| if obs.spatial_channels > 0 and obs.spatial_map: | |
| raw_bytes = base64.b64decode(obs.spatial_map) | |
| w, h = obs.map_info.width, obs.map_info.height | |
| expected_bytes = w * h * obs.spatial_channels * 4 | |
| print( | |
| f" Spatial: {w}x{h} map, {obs.spatial_channels} channels, " | |
| f"{len(raw_bytes)} bytes (expected {expected_bytes})" | |
| ) | |
| else: | |
| print(" Spatial: not populated") | |
| # Economy | |
| e = obs.economy | |
| print( | |
| f" Economy: ${e.cash} cash, {e.ore} ore, " | |
| f"power {e.power_provided}/{e.power_drained} " | |
| f"({e.power_provided - e.power_drained:+d}), " | |
| f"{e.harvester_count} harvesters" | |
| ) | |
| # Production queue | |
| if obs.production: | |
| print(f" Production queue ({len(obs.production)}):") | |
| for p in obs.production: | |
| print(f" {p.queue_type}: {p.item} @ {p.progress:.0%} (paused={p.paused})") | |
| if obs.available_production: | |
| print(f" Available production: {', '.join(obs.available_production[:15])}") | |
| else: | |
| print(" Available production: (none)") | |
| # Own buildings with Sprint 4 fields | |
| print(f" Buildings ({len(obs.buildings)}):") | |
| for b in obs.buildings: | |
| extras = [] | |
| if b.power_amount != 0: | |
| extras.append(f"pwr={b.power_amount:+d}") | |
| if b.is_producing: | |
| extras.append(f"producing={b.producing_item}@{b.production_progress:.0%}") | |
| if b.is_repairing: | |
| extras.append("REPAIRING") | |
| if b.rally_x >= 0: | |
| extras.append(f"rally=({b.rally_x},{b.rally_y})") | |
| if b.can_produce: | |
| extras.append(f"can_produce=[{','.join(b.can_produce[:5])}{'...' if len(b.can_produce) > 5 else ''}]") | |
| extra_str = f" ({', '.join(extras)})" if extras else "" | |
| print( | |
| f" {b.type:6s} #{b.actor_id:4d} " | |
| f"cell=({b.cell_x},{b.cell_y}) " | |
| f"hp={b.hp_percent:.0%} " | |
| f"sell=${b.sell_value}{extra_str}" | |
| ) | |
| # Own units with Sprint 4 fields | |
| print(f" Units ({len(obs.units)}):") | |
| for u in obs.units[:10]: # cap at 10 for readability | |
| stance_name = STANCE_NAMES.get(u.stance, f"?{u.stance}") | |
| extras = [] | |
| if u.experience_level > 0: | |
| extras.append(f"vet={u.experience_level}") | |
| if u.passenger_count >= 0: | |
| extras.append(f"cargo={u.passenger_count}") | |
| extra_str = f" ({', '.join(extras)})" if extras else "" | |
| print( | |
| f" {u.type:6s} #{u.actor_id:4d} " | |
| f"cell=({u.cell_x},{u.cell_y}) " | |
| f"hp={u.hp_percent:.0%} " | |
| f"face={u.facing:4d} spd={u.speed:3d} " | |
| f"rng={u.attack_range:5d} " | |
| f"stance={stance_name} " | |
| f"{'IDLE' if u.is_idle else u.current_activity}{extra_str}" | |
| ) | |
| if len(obs.units) > 10: | |
| print(f" ... and {len(obs.units) - 10} more") | |
| # Visible enemies | |
| if obs.visible_enemies: | |
| print(f" Visible enemy units ({len(obs.visible_enemies)}):") | |
| for u in obs.visible_enemies[:5]: | |
| print( | |
| f" {u.type:6s} #{u.actor_id:4d} " | |
| f"cell=({u.cell_x},{u.cell_y}) hp={u.hp_percent:.0%} " | |
| f"spd={u.speed} rng={u.attack_range}" | |
| ) | |
| # Visible enemy buildings (Sprint 4 field) | |
| if obs.visible_enemy_buildings: | |
| print(f" Visible enemy buildings ({len(obs.visible_enemy_buildings)}):") | |
| for b in obs.visible_enemy_buildings[:5]: | |
| print( | |
| f" {b.type:6s} #{b.actor_id:4d} " | |
| f"cell=({b.cell_x},{b.cell_y}) hp={b.hp_percent:.0%} " | |
| f"pwr={b.power_amount:+d}" | |
| ) | |
| # ββ Main loop ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def run_bot(url: str, max_steps: int, verbose: bool): | |
| """Connect to the OpenRA-RL server and play one full game.""" | |
| print(f"Connecting to {url}...") | |
| bot = ScriptedBot(verbose=verbose) | |
| async with OpenRAEnv(base_url=url, message_timeout_s=300.0) as env: | |
| print("Resetting environment...") | |
| result = await env.reset() | |
| obs = result.observation | |
| print(f"Game started! Map: {obs.map_info.map_name} ({obs.map_info.width}x{obs.map_info.height})") | |
| # Print initial detailed status | |
| if verbose: | |
| print_detailed_status(obs) | |
| print_status(0, obs, bot) | |
| step = 0 | |
| total_reward = 0.0 | |
| while not result.done and step < max_steps: | |
| action = bot.decide(result.observation) | |
| result = await env.step(action) | |
| step += 1 | |
| total_reward += result.reward or 0.0 | |
| obs = result.observation | |
| if step % 100 == 0: | |
| print_status(step, obs, bot) | |
| # Detailed dump at key milestones | |
| if verbose and step in (50, 200, 500, 1000): | |
| print_detailed_status(obs) | |
| # Final report | |
| print() | |
| print("=" * 70) | |
| obs = result.observation | |
| if obs.done: | |
| print(f"GAME OVER: {obs.result.upper()} after {step} steps (tick {obs.tick})") | |
| else: | |
| print(f"Reached max steps ({max_steps}) at tick {obs.tick}") | |
| print(f"Total reward: {total_reward:.3f}") | |
| print(f"Final cash: ${obs.economy.cash}") | |
| print(f"Power balance: {obs.economy.power_provided - obs.economy.power_drained:+d}") | |
| print(f"Units killed: {obs.military.units_killed}") | |
| print(f"Units lost: {obs.military.units_lost}") | |
| print(f"Buildings killed: {obs.military.buildings_killed}") | |
| print(f"Buildings lost: {obs.military.buildings_lost}") | |
| print(f"Army value: ${obs.military.army_value}") | |
| print(f"Own buildings: {len(obs.buildings)}") | |
| print(f"Visible enemies: {len(obs.visible_enemies)} units, {len(obs.visible_enemy_buildings)} buildings") | |
| # Spatial map stats | |
| if obs.spatial_channels > 0 and obs.spatial_map: | |
| raw_bytes = base64.b64decode(obs.spatial_map) | |
| n_floats = len(raw_bytes) // 4 | |
| print(f"Spatial map: {n_floats} floats ({obs.spatial_channels} channels)") | |
| else: | |
| print("Spatial map: not populated") | |
| # Show veteran units | |
| vets = [u for u in obs.units if u.experience_level > 0] | |
| if vets: | |
| print(f"Veterans: {', '.join(f'{u.type}#{u.actor_id}(lvl{u.experience_level})' for u in vets)}") | |
| if verbose: | |
| print_detailed_status(obs) | |
| print("=" * 70) | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Scripted Red Alert bot via OpenEnv") | |
| parser.add_argument( | |
| "--url", | |
| default="http://localhost:8000", | |
| help="OpenRA-RL server URL (default: http://localhost:8000)", | |
| ) | |
| parser.add_argument( | |
| "--max-steps", | |
| type=int, | |
| default=5000, | |
| help="Maximum steps before stopping (default: 5000)", | |
| ) | |
| parser.add_argument( | |
| "--verbose", | |
| action="store_true", | |
| help="Print detailed bot decisions and observation dumps", | |
| ) | |
| args = parser.parse_args() | |
| try: | |
| asyncio.run(run_bot(args.url, args.max_steps, args.verbose)) | |
| except KeyboardInterrupt: | |
| print("\nInterrupted by user") | |
| sys.exit(0) | |
| except ConnectionRefusedError: | |
| print(f"\nCould not connect to {args.url}") | |
| print("Is the OpenRA-RL server running?") | |
| print(" docker run -p 8000:8000 openra-rl") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() | |