openra-rl / examples /scripted_bot.py
github-actions[bot]
Sync from GitHub ac82c3e
02f4a63
#!/usr/bin/env python3
"""Scripted Red Alert bot that plays a full game via the OpenEnv client API.

Exercises ALL Sprint 4+5 observation fields and action types:

- Observations: spatial_map, visible_enemy_buildings, unit facing/stance/speed/
  attack_range/experience/passengers, building cell coords/can_produce/power/
  rally/repair/sell_value
- Actions: all 20 types including GUARD, SET_STANCE, ENTER_TRANSPORT, UNLOAD,
  SET_RALLY_POINT, REPAIR, SELL, POWER_DOWN, SET_PRIMARY

Usage:
    docker run -p 8000:8000 openra-rl
    python examples/scripted_bot.py --verbose
"""
import argparse
import asyncio
import base64
import sys
from typing import List, Optional, Tuple
from openra_env.client import OpenRAEnv
from openra_env.models import (
ActionType,
BuildingInfoModel,
CommandModel,
OpenRAAction,
OpenRAObservation,
UnitInfoModel,
)
# Stance constants matching C# AutoTarget.UnitStance enum
(
    STANCE_HOLD_FIRE,
    STANCE_RETURN_FIRE,
    STANCE_DEFEND,
    STANCE_ATTACK_ANYTHING,
) = range(4)

# Human-readable label for each stance value, used in log/status output.
STANCE_NAMES = {
    STANCE_HOLD_FIRE: "HoldFire",
    STANCE_RETURN_FIRE: "ReturnFire",
    STANCE_DEFEND: "Defend",
    STANCE_ATTACK_ANYTHING: "AttackAnything",
}
class ScriptedBot:
    """State-machine bot with a Red Alert build order exercising all actions.

    Phases (stored in ``self.phase``):
        deploy_mcv - Deploy MCV, set stance on starting units
        build_base - Build power/barracks/war factory, set rally points
        train_army - Train infantry + APC, guard CY, load transport
        attack     - Attack-move toward enemy buildings, unload APC

    Sustain duties (production, repair, selling damaged buildings) are
    evaluated on every ``decide`` call rather than in a dedicated phase.
    """

    # Build order uses both faction names — bot picks whichever is available
    BARRACKS_TYPES = {"tent", "barr"}  # Allied / Soviet
    WAR_FACTORY_TYPES = {"weap"}
    BUILD_PRIORITY = [
        "powr",      # Power Plant ($300) — shared
        "barracks",  # Placeholder: tent (Allied) or barr (Soviet)
        "proc",      # Ore Refinery ($2000) — needed before war factory
        "weap",      # War Factory ($2000) — shared
        "powr",      # Second Power Plant
    ]
    INFANTRY_TRAIN_TARGET = 6
    GUARD_COUNT = 2  # infantry to guard CY
    TRANSPORT_TYPE = "apc"
    APC_CAPACITY = 4  # max infantry ordered into the transport
    COMBAT_UNIT_TYPES = {"e1", "e2", "e3", "e4", "1tnk", "2tnk", "3tnk", "arty", "jeep", "apc"}
    INFANTRY_TYPES = {"e1", "e2", "e3", "e4"}
    VEHICLE_TYPES = {"1tnk", "2tnk", "3tnk", "arty", "jeep"}
    # Enemy building types preferred as attack targets (both barracks variants)
    ENEMY_PRODUCTION_TYPES = {"fact", "tent", "barr", "weap", "hpad", "afld"}
    # Radar/tech buildings powered down first (keep production running)
    POWER_DOWN_PRIORITY = ("dome", "spen", "syrd", "hpad", "afld", "fix")
    REPAIR_HP_THRESHOLD = 0.7  # repair buildings below this hp fraction
    SELL_HP_THRESHOLD = 0.2    # sell buildings below this hp fraction
    UNLOAD_RANGE_CELLS = 15    # Manhattan cell distance that triggers an unload

    def __init__(self, verbose: bool = False):
        """Create a bot in the ``deploy_mcv`` phase.

        Args:
            verbose: When true, ``_log`` prints every decision.
        """
        self.phase = "deploy_mcv"
        self.build_index = 0
        self.placement_count = 0
        self.deploy_issued = False
        self.verbose = verbose
        self._guards_assigned: set[int] = set()  # actor IDs guarding CY
        self._stances_set: set[int] = set()  # actor IDs with stance already set
        self._rally_set: set[int] = set()  # building actor IDs with rally point set
        self._apc_trained = False
        self._apc_loaded = False
        self._repair_issued: set[int] = set()  # building actor IDs being repaired
        self._sold: set[int] = set()  # building actor IDs sold
        self._powered_down: set[int] = set()  # building actor IDs powered down
        self._primary_set: set[int] = set()  # building actor IDs set as primary

    def decide(self, obs: OpenRAObservation) -> OpenRAAction:
        """Given current observation, return commands for this tick.

        Handlers run in a fixed priority order and their commands are
        concatenated; if none produces a command, a single NO_OP is sent.
        """
        commands: List[CommandModel] = []
        self._update_phase(obs)

        # Priority 1: Place completed buildings
        commands.extend(self._handle_placement(obs))
        # Priority 2: Deploy MCV
        if self.phase == "deploy_mcv":
            cmd = self._handle_deploy(obs)
            if cmd:
                commands.append(cmd)
        # Priority 3: Set rally points on production buildings
        commands.extend(self._handle_rally_points(obs))
        # Priority 4: Power management (power down buildings if power negative)
        commands.extend(self._handle_power_management(obs))
        # Priority 5: Set primary production buildings
        commands.extend(self._handle_set_primary(obs))
        # Priority 6: Repair damaged buildings
        commands.extend(self._handle_repairs(obs))
        # Priority 7: Queue production (buildings + units)
        commands.extend(self._handle_production(obs))
        # Priority 8: Set stances on new units
        commands.extend(self._handle_stances(obs))
        # Priority 9: Assign guards to CY
        commands.extend(self._handle_guards(obs))
        # Priority 10: Load infantry into APC
        commands.extend(self._handle_transport(obs))
        # Priority 11: Combat — attack + unload
        commands.extend(self._handle_combat(obs))
        # Priority 12: Sell heavily damaged buildings
        commands.extend(self._handle_sell(obs))

        if not commands:
            commands.append(CommandModel(action=ActionType.NO_OP))
        return OpenRAAction(commands=commands)

    # ── Phase transitions ────────────────────────────────────────────────

    def _update_phase(self, obs: OpenRAObservation):
        """Advance ``self.phase`` by at most one step based on ``obs``."""
        has_cy = any(b.type == "fact" for b in obs.buildings)
        # Guards stay home, so they do not count toward the attack force.
        ready = sum(
            1
            for u in obs.units
            if u.type in self.COMBAT_UNIT_TYPES
            and u.actor_id not in self._guards_assigned
        )

        if self.phase == "deploy_mcv" and has_cy:
            self.phase = "build_base"
            self._log("Phase → build_base")
        elif self.phase == "build_base" and self.build_index >= len(self.BUILD_PRIORITY):
            self.phase = "train_army"
            self._log("Phase → train_army")
        elif self.phase == "train_army" and ready >= self.INFANTRY_TRAIN_TARGET:
            self.phase = "attack"
            self._log(f"Phase → attack ({ready} combat units ready)")
        # "attack" is terminal: production keeps running via _handle_production.

    # ── Deploy MCV ───────────────────────────────────────────────────────

    def _handle_deploy(self, obs: OpenRAObservation) -> Optional[CommandModel]:
        """Return a one-time DEPLOY command for the first MCV seen, else None."""
        if self.deploy_issued:
            return None
        mcv = next((u for u in obs.units if u.type == "mcv"), None)
        if mcv:
            self.deploy_issued = True
            self._log(f"Deploying MCV (actor {mcv.actor_id}, facing={mcv.facing})")
            return CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id)
        return None

    # ── Building placement ───────────────────────────────────────────────

    def _handle_placement(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Issue PLACE_BUILDING for every finished item in the Building queue.

        Each attempt uses the next offset around the construction yard, so a
        placement that keeps showing up as finished is retried elsewhere.
        """
        commands = []
        cy = self._find_building(obs, "fact")
        if not cy:
            return commands
        for prod in obs.production:
            if prod.queue_type == "Building" and prod.progress >= 0.99:
                x, y = self._placement_offset(cy)
                self._log(f"Placing {prod.item} at cell ({x}, {y}) [attempt {self.placement_count}]")
                commands.append(CommandModel(
                    action=ActionType.PLACE_BUILDING,
                    item_type=prod.item,
                    target_x=x,
                    target_y=y,
                ))
                self.placement_count += 1
        return commands

    def _placement_offset(self, cy: BuildingInfoModel) -> Tuple[int, int]:
        """Calculate placement position relative to CY using cell coords.

        The offset is chosen by ``placement_count`` modulo the table length.
        """
        # Use pos_x // 1024 as CenterPosition maps to cell more reliably
        cx = cy.pos_x // 1024
        cy_y = cy.pos_y // 1024
        # Many offsets to maximize chance of finding valid terrain
        offsets = [
            (3, 0), (-3, 0), (0, 3), (0, -3),
            (3, 3), (-3, 3), (3, -3), (-3, -3),
            (6, 0), (-6, 0), (0, 6), (0, -6),
            (2, 0), (-2, 0), (0, 2), (0, -2),
            (4, 0), (-4, 0), (0, 4), (0, -4),
        ]
        dx, dy = offsets[self.placement_count % len(offsets)]
        return cx + dx, cy_y + dy

    # ── Rally points (Sprint 4 action) ───────────────────────────────────

    def _handle_rally_points(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Point each barracks / war factory rally at the CY, once per building."""
        commands = []
        cy = self._find_building(obs, "fact")
        if not cy:
            return commands
        # Covers both barracks variants so Soviet "barr" is not skipped.
        production_types = self.BARRACKS_TYPES | self.WAR_FACTORY_TYPES
        rally_x = cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024
        rally_y = cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024
        for b in obs.buildings:
            if b.type in production_types and b.actor_id not in self._rally_set:
                self._log(f"Setting rally on {b.type} (actor {b.actor_id}) → ({rally_x}, {rally_y})")
                commands.append(CommandModel(
                    action=ActionType.SET_RALLY_POINT,
                    actor_id=b.actor_id,
                    target_x=rally_x,
                    target_y=rally_y,
                ))
                self._rally_set.add(b.actor_id)
        return commands

    # ── Power management (Sprint 5 action) ───────────────────────────────

    def _handle_power_management(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Power down non-essential buildings when power balance is negative.

        At most one building is powered down per call.
        """
        commands = []
        power_balance = obs.economy.power_provided - obs.economy.power_drained
        if power_balance >= 0:
            return commands
        for btype in self.POWER_DOWN_PRIORITY:
            for b in obs.buildings:
                if b.type == btype and b.is_powered and b.actor_id not in self._powered_down:
                    commands.append(CommandModel(action=ActionType.POWER_DOWN, actor_id=b.actor_id))
                    self._powered_down.add(b.actor_id)
                    self._log(f"Powering down {b.type} (actor {b.actor_id}) — power balance: {power_balance}")
                    return commands  # one at a time
        return commands

    # ── Set primary building (Sprint 5 action) ───────────────────────────

    def _handle_set_primary(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Set primary on newest production building of each type.

        Only acts when at least two buildings of a kind exist; "newest" is
        the one with the highest actor_id.
        """
        commands = []
        for btype_set in [self.BARRACKS_TYPES, self.WAR_FACTORY_TYPES]:
            buildings_of_type = [b for b in obs.buildings if b.type in btype_set]
            if len(buildings_of_type) < 2:
                continue
            newest = max(buildings_of_type, key=lambda b: b.actor_id)
            if newest.actor_id not in self._primary_set:
                commands.append(CommandModel(action=ActionType.SET_PRIMARY, actor_id=newest.actor_id))
                self._primary_set.add(newest.actor_id)
                self._log(f"Setting primary: {newest.type} (actor {newest.actor_id})")
        return commands

    # ── Repair damaged buildings (Sprint 4 observation + existing action) ──

    def _handle_repairs(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Order repairs for damaged buildings when cash is at least 500.

        A building is ordered once per damage episode: its ID is remembered
        until it is seen healthy and no longer repairing, so a building that
        gets damaged again later can be repaired again.
        """
        commands = []
        for b in obs.buildings:
            if b.hp_percent >= self.REPAIR_HP_THRESHOLD:
                if not b.is_repairing:
                    self._repair_issued.discard(b.actor_id)
                continue
            if (b.is_repairing
                    or b.actor_id in self._repair_issued
                    or obs.economy.cash < 500):
                continue
            self._log(f"Repairing {b.type} (actor {b.actor_id}, hp={b.hp_percent:.0%})")
            commands.append(CommandModel(
                action=ActionType.REPAIR,
                actor_id=b.actor_id,
            ))
            self._repair_issued.add(b.actor_id)
        return commands

    # ── Production ───────────────────────────────────────────────────────

    def _handle_production(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Queue the next building, infantry and vehicle as their queues allow."""
        commands = []

        # Building construction — treat any Building queue item as "in progress"
        # (includes completed-but-unplaced buildings that block the queue)
        building_in_queue = any(p.queue_type == "Building" for p in obs.production)
        if not building_in_queue and self.build_index < len(self.BUILD_PRIORITY):
            item_type = self._resolve_build_item(obs, self.BUILD_PRIORITY[self.build_index])
            # item_type is None when the placeholder can't be resolved yet.
            if item_type is not None:
                if self._has_building_type(obs, item_type, self.build_index):
                    self.build_index += 1
                elif self._can_produce_item(obs, item_type):
                    self._log(f"Building {item_type} (#{self.build_index + 1}/{len(self.BUILD_PRIORITY)})")
                    commands.append(CommandModel(action=ActionType.BUILD, item_type=item_type))
                    self.build_index += 1

        # Infantry training
        has_barracks = any(b.type in self.BARRACKS_TYPES for b in obs.buildings)
        infantry_training = any(
            p.queue_type == "Infantry" and p.progress < 0.99
            for p in obs.production
        )
        infantry = [u for u in obs.units if u.type in self.INFANTRY_TYPES]
        total_target = self.INFANTRY_TRAIN_TARGET + self.GUARD_COUNT
        if has_barracks and not infantry_training and len(infantry) < total_target:
            if self._can_produce_item(obs, "e1") and obs.economy.cash >= 100:
                self._log(f"Training e1 ({len(infantry)}/{total_target})")
                commands.append(CommandModel(action=ActionType.TRAIN, item_type="e1"))

        # Vehicles: at most one order per tick so the APC and a tank are not
        # both queued against the same "queue is idle" observation.
        has_weap = any(b.type in self.WAR_FACTORY_TYPES for b in obs.buildings)
        vehicle_training = any(
            p.queue_type == "Vehicle" and p.progress < 0.99
            for p in obs.production
        )
        if has_weap and not vehicle_training and obs.economy.cash >= 800:
            if not self._apc_trained and self._can_produce_item(obs, self.TRANSPORT_TYPE):
                # APC from war factory
                self._log("Training APC for transport ops")
                commands.append(CommandModel(action=ActionType.TRAIN, item_type=self.TRANSPORT_TYPE))
                self._apc_trained = True
            elif self.phase == "attack" and self._can_produce_item(obs, "1tnk"):
                # Continuous vehicle production in attack phase
                self._log("Training 1tnk (continuous production)")
                commands.append(CommandModel(action=ActionType.TRAIN, item_type="1tnk"))
        return commands

    def _can_produce_item(self, obs: OpenRAObservation, item_type: str) -> bool:
        """Check if item is buildable using per-building can_produce (Sprint 4).

        True when the item is in the global ``available_production`` list or
        in any own building's ``can_produce`` list.
        """
        if item_type in obs.available_production:
            return True
        return any(item_type in b.can_produce for b in obs.buildings)

    # ── Stances (Sprint 4 action) ────────────────────────────────────────

    def _handle_stances(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Set the desired stance once on each combat unit not yet handled.

        The stance value is carried in the command's ``target_x`` field.
        """
        commands = []
        for u in obs.units:
            if u.actor_id in self._stances_set:
                continue
            if u.type not in self.COMBAT_UNIT_TYPES:
                continue
            # Guards get Defend stance, attackers get AttackAnything
            if u.actor_id in self._guards_assigned:
                desired = STANCE_DEFEND
            else:
                desired = STANCE_ATTACK_ANYTHING
            if u.stance != desired:
                self._log(
                    f"Setting {u.type} (actor {u.actor_id}) stance: "
                    f"{STANCE_NAMES.get(u.stance, '?')} → {STANCE_NAMES[desired]}"
                )
                commands.append(CommandModel(
                    action=ActionType.SET_STANCE,
                    actor_id=u.actor_id,
                    target_x=desired,
                ))
            self._stances_set.add(u.actor_id)
        return commands

    # ── Guard CY (Sprint 4 action) ───────────────────────────────────────

    def _handle_guards(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Assign idle infantry to guard the CY until GUARD_COUNT is reached."""
        commands = []
        if len(self._guards_assigned) >= self.GUARD_COUNT:
            return commands
        cy = self._find_building(obs, "fact")
        if not cy:
            return commands
        # Find idle infantry not yet guarding
        for u in obs.units:
            if len(self._guards_assigned) >= self.GUARD_COUNT:
                break
            if (u.type in self.INFANTRY_TYPES
                    and u.is_idle
                    and u.actor_id not in self._guards_assigned):
                self._log(
                    f"Assigning {u.type} (actor {u.actor_id}, "
                    f"range={u.attack_range}) to guard CY"
                )
                commands.append(CommandModel(
                    action=ActionType.GUARD,
                    actor_id=u.actor_id,
                    target_actor_id=cy.actor_id,
                ))
                self._guards_assigned.add(u.actor_id)
                # The stance pass runs before this one and has already marked
                # the unit as handled; forget that so the next pass can
                # switch the new guard to Defend.
                self._stances_set.discard(u.actor_id)
        return commands

    # ── Transport: load/unload (Sprint 4 actions) ────────────────────────

    def _handle_transport(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Order up to APC_CAPACITY idle non-guard infantry into an empty APC.

        Runs until one batch of load orders has been issued, then never again.
        """
        commands = []
        if self._apc_loaded:
            return commands
        apc = next(
            (u for u in obs.units
             if u.type == self.TRANSPORT_TYPE and u.passenger_count == 0),
            None,
        )
        if not apc:
            return commands
        # Load idle infantry (not guards) into the APC
        loaded = 0
        for u in obs.units:
            if loaded >= self.APC_CAPACITY:
                break
            if (u.type in self.INFANTRY_TYPES
                    and u.is_idle
                    and u.actor_id not in self._guards_assigned):
                self._log(
                    f"Loading {u.type} (actor {u.actor_id}, "
                    f"speed={u.speed}) into APC {apc.actor_id}"
                )
                commands.append(CommandModel(
                    action=ActionType.ENTER_TRANSPORT,
                    actor_id=u.actor_id,
                    target_actor_id=apc.actor_id,
                ))
                loaded += 1
        if loaded > 0:
            self._apc_loaded = True
        return commands

    # ── Combat ───────────────────────────────────────────────────────────

    def _handle_combat(self, obs: OpenRAObservation) -> List[CommandModel]:
        """In the attack phase, unload APCs and attack-move idle fighters.

        Fighters are only sent when at least two are idle.
        """
        commands = []
        if self.phase != "attack":
            return commands
        # Unload APC near enemy
        commands.extend(self._handle_unload(obs))
        # Attack-move idle fighters toward enemy
        idle_fighters = [
            u for u in obs.units
            if (u.type in self.COMBAT_UNIT_TYPES
                and u.is_idle
                and u.actor_id not in self._guards_assigned)
        ]
        if len(idle_fighters) < 2:
            return commands
        target_x, target_y = self._find_attack_target(obs)
        for unit in idle_fighters:
            commands.append(CommandModel(
                action=ActionType.ATTACK_MOVE,
                actor_id=unit.actor_id,
                target_x=target_x,
                target_y=target_y,
            ))
        self._log(
            f"Attacking with {len(idle_fighters)} units "
            f"toward ({target_x}, {target_y})"
        )
        return commands

    @staticmethod
    def _cell_distance(a, b) -> int:
        """Return the Manhattan distance in cells between two actors."""
        return abs(a.cell_x - b.cell_x) + abs(a.cell_y - b.cell_y)

    def _handle_unload(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Unload APC when near enemies.

        Each loaded APC gets at most one UNLOAD per call, whether the trigger
        is an enemy unit or an enemy building.
        """
        commands = []
        limit = self.UNLOAD_RANGE_CELLS
        for u in obs.units:
            if u.type != self.TRANSPORT_TYPE or u.passenger_count <= 0:
                continue
            # Check if any enemy is within ~15 cells
            if any(self._cell_distance(u, e) < limit for e in obs.visible_enemies):
                self._log(
                    f"Unloading APC (actor {u.actor_id}, "
                    f"{u.passenger_count} passengers) near enemy"
                )
            else:
                # Also unload near enemy buildings
                eb = next(
                    (b for b in obs.visible_enemy_buildings
                     if self._cell_distance(u, b) < limit),
                    None,
                )
                if eb is None:
                    continue
                self._log(
                    f"Unloading APC near enemy building {eb.type} "
                    f"(hp={eb.hp_percent:.0%})"
                )
            commands.append(CommandModel(
                action=ActionType.UNLOAD,
                actor_id=u.actor_id,
            ))
        return commands

    def _find_attack_target(self, obs: OpenRAObservation) -> Tuple[int, int]:
        """Prioritize enemy buildings > enemy units > map center.

        Returns:
            Target cell ``(x, y)``; ``(64, 64)`` when the map width is unknown.
        """
        # Priority 1: visible enemy buildings (Sprint 4 field)
        if obs.visible_enemy_buildings:
            # Prefer production buildings
            target = next(
                (b for b in obs.visible_enemy_buildings
                 if b.type in self.ENEMY_PRODUCTION_TYPES),
                obs.visible_enemy_buildings[0],
            )
            return target.cell_x, target.cell_y
        # Priority 2: visible enemy units
        if obs.visible_enemies:
            enemy = obs.visible_enemies[0]
            return enemy.cell_x, enemy.cell_y
        # Fallback: map center
        if obs.map_info.width > 0:
            return obs.map_info.width // 2, obs.map_info.height // 2
        return 64, 64

    # ── Sell heavily damaged buildings ───────────────────────────────────

    def _handle_sell(self, obs: OpenRAObservation) -> List[CommandModel]:
        """Sell each non-CY building below SELL_HP_THRESHOLD, once per building."""
        commands = []
        for b in obs.buildings:
            if (b.hp_percent < self.SELL_HP_THRESHOLD
                    and b.type != "fact"  # never sell CY
                    and b.actor_id not in self._sold):
                self._log(
                    f"Selling {b.type} (actor {b.actor_id}, hp={b.hp_percent:.0%}, "
                    f"refund=${b.sell_value})"
                )
                commands.append(CommandModel(
                    action=ActionType.SELL,
                    actor_id=b.actor_id,
                ))
                self._sold.add(b.actor_id)
        return commands

    # ── Helpers ──────────────────────────────────────────────────────────

    def _resolve_build_item(self, obs: OpenRAObservation, placeholder: str) -> Optional[str]:
        """Resolve faction-agnostic build item to actual producible type.

        Returns the placeholder unchanged unless it is ``"barracks"``; for
        that, returns the producible barracks type or None if none is.
        """
        if placeholder != "barracks":
            return placeholder
        # sorted() keeps the choice deterministic (set order is not).
        for btype in sorted(self.BARRACKS_TYPES):
            if self._can_produce_item(obs, btype):
                return btype
        return None

    def _has_building_type(self, obs: OpenRAObservation, item_type: str, build_index: int) -> bool:
        """Check if we already have enough of this building type.

        "Enough" is the number of times ``item_type`` appears in
        BUILD_PRIORITY up to and including ``build_index``, with the
        "barracks" placeholder counted as ``item_type`` when that is a
        barracks type.
        """
        already_built = sum(1 for b in obs.buildings if b.type == item_type)
        is_barracks = item_type in self.BARRACKS_TYPES
        target_count = sum(
            1
            for entry in self.BUILD_PRIORITY[:build_index + 1]
            if entry == item_type or (is_barracks and entry == "barracks")
        )
        return already_built >= target_count

    def _find_building(self, obs: OpenRAObservation, btype: str) -> Optional[BuildingInfoModel]:
        """Return the first own building of type ``btype``, or None."""
        return next((b for b in obs.buildings if b.type == btype), None)

    def _log(self, msg: str):
        """Print ``msg`` with a bot prefix when verbose mode is on."""
        if self.verbose:
            print(f" [Bot] {msg}")
# ── Status display ───────────────────────────────────────────────────────────
def print_status(step: int, obs: OpenRAObservation, bot: ScriptedBot):
    """Emit a single pipe-separated summary line for the given step.

    The line shows step, tick, cash, power balance, unit counts, visible
    enemy counts, the distinct own building types and the bot's phase.
    """
    n_combat = sum(1 for unit in obs.units if unit.type in bot.COMBAT_UNIT_TYPES)
    building_types = sorted({bld.type for bld in obs.buildings})
    building_label = ", ".join(building_types) if building_types else "none"
    economy = obs.economy
    net_power = economy.power_provided - economy.power_drained

    # Each entry is one column of the status line.
    columns = [
        f"Step {step:4d}",
        f"Tick {obs.tick:5d}",
        f"${economy.cash:5d}",
        f"Pwr:{net_power:+d}",
        f"Units:{len(obs.units)} (combat:{n_combat})",
        f"Enemy:{len(obs.visible_enemies)}u/{len(obs.visible_enemy_buildings)}b",
        f"Bldgs:[{building_label}]",
        f"{bot.phase}",
    ]
    print(" | ".join(columns))
def print_detailed_status(obs: OpenRAObservation):
    """Print full observation details using all Sprint 4 fields.

    Sections, in order: spatial map size, economy, production queue,
    available production, own buildings, own units (first 10), visible
    enemy units (first 5) and visible enemy buildings (first 5).
    """
    print("\n── Detailed Observation ──")

    # Spatial map
    if obs.spatial_channels > 0 and obs.spatial_map:
        raw_bytes = base64.b64decode(obs.spatial_map)
        w, h = obs.map_info.width, obs.map_info.height
        # 4 bytes per cell value — presumably float32; confirm with the server
        expected_bytes = w * h * obs.spatial_channels * 4
        print(
            f" Spatial: {w}x{h} map, {obs.spatial_channels} channels, "
            f"{len(raw_bytes)} bytes (expected {expected_bytes})"
        )
    else:
        print(" Spatial: not populated")

    # Economy
    e = obs.economy
    print(
        f" Economy: ${e.cash} cash, {e.ore} ore, "
        f"power {e.power_provided}/{e.power_drained} "
        f"({e.power_provided - e.power_drained:+d}), "
        f"{e.harvester_count} harvesters"
    )

    # Production queue
    if obs.production:
        print(f" Production queue ({len(obs.production)}):")
        for p in obs.production:
            print(f" {p.queue_type}: {p.item} @ {p.progress:.0%} (paused={p.paused})")
    if obs.available_production:
        # Only the first 15 entries are listed
        print(f" Available production: {', '.join(obs.available_production[:15])}")
    else:
        print(" Available production: (none)")

    # Own buildings with Sprint 4 fields
    print(f" Buildings ({len(obs.buildings)}):")
    for b in obs.buildings:
        extras = []
        if b.power_amount != 0:
            extras.append(f"pwr={b.power_amount:+d}")
        if b.is_producing:
            extras.append(f"producing={b.producing_item}@{b.production_progress:.0%}")
        if b.is_repairing:
            extras.append("REPAIRING")
        if b.rally_x >= 0:
            extras.append(f"rally=({b.rally_x},{b.rally_y})")
        if b.can_produce:
            # Show at most five producible items, with an ellipsis if truncated
            extras.append(f"can_produce=[{','.join(b.can_produce[:5])}{'...' if len(b.can_produce) > 5 else ''}]")
        extra_str = f" ({', '.join(extras)})" if extras else ""
        print(
            f" {b.type:6s} #{b.actor_id:4d} "
            f"cell=({b.cell_x},{b.cell_y}) "
            f"hp={b.hp_percent:.0%} "
            f"sell=${b.sell_value}{extra_str}"
        )

    # Own units with Sprint 4 fields
    print(f" Units ({len(obs.units)}):")
    for u in obs.units[:10]:  # cap at 10 for readability
        stance_name = STANCE_NAMES.get(u.stance, f"?{u.stance}")
        extras = []
        if u.experience_level > 0:
            extras.append(f"vet={u.experience_level}")
        if u.passenger_count >= 0:
            extras.append(f"cargo={u.passenger_count}")
        extra_str = f" ({', '.join(extras)})" if extras else ""
        print(
            f" {u.type:6s} #{u.actor_id:4d} "
            f"cell=({u.cell_x},{u.cell_y}) "
            f"hp={u.hp_percent:.0%} "
            f"face={u.facing:4d} spd={u.speed:3d} "
            f"rng={u.attack_range:5d} "
            f"stance={stance_name} "
            f"{'IDLE' if u.is_idle else u.current_activity}{extra_str}"
        )
    if len(obs.units) > 10:
        print(f" ... and {len(obs.units) - 10} more")

    # Visible enemies
    if obs.visible_enemies:
        print(f" Visible enemy units ({len(obs.visible_enemies)}):")
        for u in obs.visible_enemies[:5]:
            print(
                f" {u.type:6s} #{u.actor_id:4d} "
                f"cell=({u.cell_x},{u.cell_y}) hp={u.hp_percent:.0%} "
                f"spd={u.speed} rng={u.attack_range}"
            )

    # Visible enemy buildings (Sprint 4 field)
    if obs.visible_enemy_buildings:
        print(f" Visible enemy buildings ({len(obs.visible_enemy_buildings)}):")
        for b in obs.visible_enemy_buildings[:5]:
            print(
                f" {b.type:6s} #{b.actor_id:4d} "
                f"cell=({b.cell_x},{b.cell_y}) hp={b.hp_percent:.0%} "
                f"pwr={b.power_amount:+d}"
            )
# ── Main loop ────────────────────────────────────────────────────────────────
def _print_final_report(
    obs: OpenRAObservation,
    step: int,
    max_steps: int,
    total_reward: float,
    verbose: bool,
):
    """Print the end-of-run summary block for the last observation."""
    economy = obs.economy
    military = obs.military
    rule = "=" * 70

    print()
    print(rule)
    if obs.done:
        print(f"GAME OVER: {obs.result.upper()} after {step} steps (tick {obs.tick})")
    else:
        print(f"Reached max steps ({max_steps}) at tick {obs.tick}")
    print(f"Total reward: {total_reward:.3f}")
    print(f"Final cash: ${economy.cash}")
    print(f"Power balance: {economy.power_provided - economy.power_drained:+d}")
    print(f"Units killed: {military.units_killed}")
    print(f"Units lost: {military.units_lost}")
    print(f"Buildings killed: {military.buildings_killed}")
    print(f"Buildings lost: {military.buildings_lost}")
    print(f"Army value: ${military.army_value}")
    print(f"Own buildings: {len(obs.buildings)}")
    print(f"Visible enemies: {len(obs.visible_enemies)} units, {len(obs.visible_enemy_buildings)} buildings")

    # Spatial map stats
    if obs.spatial_channels > 0 and obs.spatial_map:
        payload = base64.b64decode(obs.spatial_map)
        n_floats = len(payload) // 4
        print(f"Spatial map: {n_floats} floats ({obs.spatial_channels} channels)")
    else:
        print("Spatial map: not populated")

    # Show veteran units
    veterans = [u for u in obs.units if u.experience_level > 0]
    if veterans:
        vet_list = ", ".join(
            f"{u.type}#{u.actor_id}(lvl{u.experience_level})" for u in veterans
        )
        print(f"Veterans: {vet_list}")

    if verbose:
        print_detailed_status(obs)
    print(rule)


async def run_bot(url: str, max_steps: int, verbose: bool):
    """Connect to the OpenRA-RL server and play one full game.

    Steps the environment with ``ScriptedBot.decide`` until the result is
    done or ``max_steps`` is reached, printing a status line every 100
    steps, then prints a final report.

    Args:
        url: Base URL passed to ``OpenRAEnv``.
        max_steps: Upper bound on the number of environment steps.
        verbose: Enables bot decision logs and detailed observation dumps.
    """
    # Steps at which a detailed dump is printed in verbose mode
    milestone_steps = (50, 200, 500, 1000)

    print(f"Connecting to {url}...")
    bot = ScriptedBot(verbose=verbose)
    async with OpenRAEnv(base_url=url, message_timeout_s=300.0) as env:
        print("Resetting environment...")
        result = await env.reset()
        obs = result.observation
        map_info = obs.map_info
        print(f"Game started! Map: {map_info.map_name} ({map_info.width}x{map_info.height})")

        # Print initial detailed status
        if verbose:
            print_detailed_status(obs)
        print_status(0, obs, bot)

        step = 0
        total_reward = 0.0
        while not result.done and step < max_steps:
            result = await env.step(bot.decide(result.observation))
            step += 1
            total_reward += result.reward or 0.0  # a missing reward counts as 0
            obs = result.observation
            if step % 100 == 0:
                print_status(step, obs, bot)
            # Detailed dump at key milestones
            if verbose and step in milestone_steps:
                print_detailed_status(obs)

        # Final report
        _print_final_report(result.observation, step, max_steps, total_reward, verbose)
def main():
    """Parse command-line options and run the bot until the game ends.

    Exits with status 0 on Ctrl-C and status 1 when the connection to the
    server is refused.
    """
    cli = argparse.ArgumentParser(description="Scripted Red Alert bot via OpenEnv")
    # (flag, keyword arguments) for each supported option
    option_specs = (
        ("--url", {
            "default": "http://localhost:8000",
            "help": "OpenRA-RL server URL (default: http://localhost:8000)",
        }),
        ("--max-steps", {
            "type": int,
            "default": 5000,
            "help": "Maximum steps before stopping (default: 5000)",
        }),
        ("--verbose", {
            "action": "store_true",
            "help": "Print detailed bot decisions and observation dumps",
        }),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    opts = cli.parse_args()

    try:
        asyncio.run(run_bot(opts.url, opts.max_steps, opts.verbose))
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        sys.exit(0)
    except ConnectionRefusedError:
        print(f"\nCould not connect to {opts.url}")
        print("Is the OpenRA-RL server running?")
        print(" docker run -p 8000:8000 openra-rl")
        sys.exit(1)


if __name__ == "__main__":
    main()