openra-rl-challenge / scripts /normal_ai_bot.py
github-actions[bot]
Sync Space files from 04ee2ab23ee580fa351550303a8efd99a52df7e2
82d84b1
"""Python reimplementation of OpenRA's Normal AI (ModularBot@NormalAI).
Mirrors the C# modular bot architecture with these managers:
- Economy (HarvesterBotModule@normal-turtle)
- Base building (BaseBuilderBotModule@normal)
- Unit production (UnitBuilderBotModule@normal)
- Squads (SquadManagerBotModule@normal)
- Repairs (BuildingRepairBotModule)
- Power (PowerDownBotModule)
All unit weights are taken directly from mods/ra/rules/ai.yaml.
"""
import base64
import random
import struct
from typing import List, Optional, Tuple
from openra_env.models import (
ActionType,
BuildingInfoModel,
CommandModel,
OpenRAAction,
OpenRAObservation,
UnitInfoModel,
)
STANCE_DEFEND = 2
STANCE_ATTACK_ANYTHING = 3
# ---------------------------------------------------------------------------
# Constants from ai.yaml (@normal)
# ---------------------------------------------------------------------------
UNITS_TO_BUILD: dict[str, int] = {
"e1": 65, "e2": 15, "e3": 30, "e4": 15, "e7": 1, "dog": 15,
"shok": 15, "harv": 15, "apc": 30, "jeep": 20, "arty": 15,
"v2rl": 40, "ftrk": 30, "1tnk": 40, "2tnk": 50, "3tnk": 50,
"4tnk": 25, "ttnk": 25, "stnk": 5, "heli": 30, "mh60": 30,
"mig": 30, "yak": 30, "ss": 10, "msub": 10, "dd": 10,
"ca": 10, "pt": 10,
}
UNIT_LIMITS: dict[str, int] = {"dog": 4, "harv": 8, "jeep": 4, "ftrk": 4}
INFANTRY_TYPES = {"e1", "e2", "e3", "e4", "e7", "shok", "dog"}
VEHICLE_TYPES = {"harv", "apc", "jeep", "arty", "v2rl", "ftrk",
"1tnk", "2tnk", "3tnk", "4tnk", "ttnk", "stnk", "mcv"}
AIRCRAFT_TYPES = {"heli", "mh60", "mig", "yak", "hind"}
PLANE_TYPES = {"mig", "yak"}
SHIP_TYPES = {"ss", "msub", "dd", "ca", "pt"}
COMBAT_TYPES = (
{"e1", "e2", "e3", "e4", "e7", "shok"} |
{"apc", "jeep", "arty", "v2rl", "ftrk", "1tnk", "2tnk", "3tnk", "4tnk", "ttnk", "stnk"} |
SHIP_TYPES |
AIRCRAFT_TYPES
)
SQUAD_SIZE = 40
SQUAD_SIZE_RANDOM_BONUS = 30
EXCLUDE_FROM_SQUADS = {"harv", "mcv", "dog", "badr.bomber", "u2"}
BARRACKS_TYPES = {"tent", "barr"}
WAR_FACTORY_TYPES = {"weap"}
PRODUCTION_BUILDING_TYPES = BARRACKS_TYPES | WAR_FACTORY_TYPES
TECH_BUILDING_TYPES = {"dome", "atek", "stek", "fix", "afld", "afld.ukraine", "hpad"}
POWER_DOWN_TYPES = {"dome", "tsla", "mslo", "agun", "sam"}
PROTECTION_TYPES = {
"harv", "mcv", "mslo", "gap", "spen", "syrd", "iron", "pdox", "tsla", "agun",
"dome", "pbox", "hbox", "gun", "ftur", "sam", "atek", "weap", "fact", "proc",
"silo", "hpad", "afld", "afld.ukraine", "powr", "apwr", "stek", "barr", "kenn",
"tent", "fix", "fpwr", "tenf", "syrf", "spef", "weaf", "domf", "fixf", "fapw",
"atef", "pdof", "mslf", "facf",
}
UNIT_QUEUE_ORDER: tuple[tuple[str, set[str]], ...] = (
("Vehicle", VEHICLE_TYPES - {"mcv"}),
("Infantry", INFANTRY_TYPES),
("Plane", PLANE_TYPES),
("Ship", SHIP_TYPES),
("Aircraft", AIRCRAFT_TYPES - PLANE_TYPES),
)
STRUCTURE_QUEUE_TYPES = {"Building", "Defense"}
DEFENSE_STRUCTURE_TYPES = {"pbox", "hbox", "gun", "ftur", "tsla", "agun", "sam", "gap", "mslo"}
ATTACKING_BUILDING_TYPES = {"pbox", "hbox", "gun", "ftur", "tsla", "agun", "sam"}
NAVAL_STRUCTURE_TYPES = {"spen", "syrd"}
ENEMY_FACING_STRUCTURE_TYPES = {"pbox", "hbox", "gun", "ftur", "tsla", "agun", "sam"}
NO_BUILDABLE_AREA_TYPES = NAVAL_STRUCTURE_TYPES | {"silo", "kenn"}
BUILDING_VARIANT_CHOICES: dict[str, tuple[str, ...]] = {
"barracks": ("tent", "barr"),
"afld": ("afld", "afld.ukraine"),
}
BUILDING_CANONICAL_TYPES: dict[str, str] = {"afld.ukraine": "afld"}
BUILDING_DIMENSIONS: dict[str, tuple[int, int]] = {
"fact": (3, 4),
"powr": (2, 3),
"apwr": (3, 3),
"proc": (3, 4),
"weap": (3, 4),
"barr": (2, 3),
"tent": (2, 3),
"dome": (2, 3),
"atek": (2, 3),
"hpad": (2, 3),
"afld": (3, 2),
"afld.ukraine": (3, 2),
"fix": (3, 3),
"stek": (3, 3),
"spen": (3, 3),
"syrd": (3, 3),
"sam": (2, 1),
"mslo": (2, 1),
"silo": (2, 1),
"kenn": (2, 2),
"pbox": (2, 1),
"hbox": (2, 1),
"gun": (2, 2),
"ftur": (2, 2),
"tsla": (2, 2),
"agun": (2, 2),
"gap": (3, 3),
}
BUILDING_TOPLEFT_OFFSETS: dict[str, tuple[int, int]] = {
"fact": (1, 1),
"powr": (1, 1),
"apwr": (1, 1),
"proc": (1, 1),
"weap": (1, 1),
"barr": (1, 1),
"tent": (1, 1),
"dome": (1, 1),
"atek": (1, 1),
"hpad": (1, 1),
"afld": (1, 1),
"afld.ukraine": (1, 1),
"fix": (1, 1),
"stek": (1, 1),
"spen": (1, 1),
"syrd": (1, 1),
"sam": (1, 0),
"mslo": (1, 0),
"silo": (1, 0),
"kenn": (1, 1),
"pbox": (1, 0),
"hbox": (1, 0),
"gun": (1, 1),
"ftur": (1, 1),
"tsla": (1, 1),
"agun": (1, 1),
"gap": (1, 1),
}
BUILDING_COSTS: dict[str, int] = {
"powr": 300, "apwr": 500, "proc": 1400, "weap": 2000,
"barr": 500, "tent": 500, "kenn": 200,
"dome": 1500, "hpad": 500, "afld": 500, "afld.ukraine": 500,
"fix": 1200, "atek": 1500, "stek": 1500, "silo": 150,
"pbox": 600, "hbox": 750, "gun": 800, "ftur": 600,
"tsla": 1200, "agun": 800, "sam": 700,
"gap": 800, "mslo": 2500, "spen": 800, "syrd": 1000,
}
BUILDING_LIMITS: dict[str, int] = {
"barr": 7, "tent": 7, "dome": 1, "weap": 4, "hpad": 4,
"afld": 4, "atek": 1, "stek": 1, "fix": 1, "kenn": 1,
"mslo": 1, "spen": 1, "syrd": 1,
}
BUILDING_FRACTIONS: dict[str, int] = {
"powr": 1, "proc": 1, "tent": 3, "barr": 3, "kenn": 1,
"dome": 1, "weap": 4, "hpad": 1, "spen": 1, "syrd": 1, "afld": 1,
"pbox": 9, "gun": 9, "ftur": 10, "tsla": 5, "gap": 2,
"fix": 1, "agun": 5, "sam": 1, "atek": 1, "stek": 1,
"mslo": 1,
}
BUILDING_DELAYS: dict[str, int] = {
"dome": 6000, "fix": 3000, "pbox": 1500, "gun": 2000,
"ftur": 1500, "tsla": 2800, "kenn": 7000, "atek": 9000,
"stek": 9000, "spen": 6000, "syrd": 6000,
}
UNIT_COMBAT_POWER: dict[str, int] = {
"e1": 25, "e2": 20, "e3": 45, "e4": 35, "e7": 30, "shok": 55,
"dog": 5, "jeep": 45, "apc": 55, "arty": 110, "v2rl": 120,
"ftrk": 70, "1tnk": 90, "2tnk": 120, "3tnk": 145, "4tnk": 135,
"ttnk": 130, "stnk": 120, "heli": 95, "mh60": 90, "mig": 100,
"yak": 95, "hind": 95, "ss": 90, "msub": 90, "dd": 100,
"ca": 120, "pt": 60,
}
BUILDING_THREAT_POWER: dict[str, int] = {
"pbox": 40, "hbox": 45, "gun": 90, "ftur": 110, "tsla": 160,
"agun": 120, "sam": 70, "fact": 55, "weap": 60, "proc": 55,
"dome": 40, "atek": 45, "stek": 45, "fix": 40,
}
TARGET_BUILDING_PRIORITY: dict[str, int] = {
"fact": 100, "weap": 95, "proc": 90, "dome": 82, "fix": 80,
"atek": 78, "stek": 78, "afld": 75, "afld.ukraine": 75, "hpad": 72,
"barr": 68, "tent": 68, "powr": 62, "apwr": 62, "silo": 58,
"tsla": 56, "agun": 54, "ftur": 52, "gun": 50, "sam": 48,
"pbox": 45, "hbox": 42,
}
TARGET_UNIT_PRIORITY: dict[str, int] = {
"mcv": 100, "harv": 95, "v2rl": 90, "arty": 88, "ftrk": 84,
"4tnk": 82, "3tnk": 80, "2tnk": 76, "1tnk": 72, "ttnk": 78,
"stnk": 78, "apc": 65, "jeep": 62, "shok": 58, "e4": 52,
"e3": 50, "e2": 45, "e1": 40, "dog": 10,
}
# Initial build order — same sequence the C# AI follows in practice.
# Uses "barracks" as a placeholder resolved to tent or barr at runtime.
BUILD_ORDER = ["powr", "barracks", "proc", "weap", "powr"]
STRUCTURE_PRODUCTION_ACTIVE_DELAY = 25
STRUCTURE_PRODUCTION_INACTIVE_DELAY = 125
STRUCTURE_PRODUCTION_RANDOM_BONUS_DELAY = 10
STRUCTURE_PRODUCTION_RESUME_DELAY = 1500
PLACEMENT_ATTEMPT_INTERVAL = 25
PLACEMENT_CONFIRMATION_DELAY = 300
MAX_FAILED_PLACEMENT_ATTEMPTS = 8
BASE_BUILD_MIN_RADIUS = 2
BASE_BUILD_MAX_RADIUS = 20
DEFENSE_BUILD_MIN_RADIUS = 5
DEFENSE_BUILD_MAX_RADIUS = 20
CHECK_FOR_WATER_RADIUS = 8
NAVAL_WATER_SCAN_STRIDE = 1
NAVAL_WATER_SCAN_RADIUS = 12
NAVAL_MIN_OPEN_WATER_WINDOWS = 1
NAVAL_MIN_WATER_SCORE = 20
NAVAL_EARLY_BUILD_WATER_SCORE = 22
NAVAL_EARLY_BUILD_CREDIT_BUFFER = 250
NAVAL_BUILD_MAX_RADIUS = 24
NAVAL_GATE_CACHE_TICKS = 151
RESOURCE_MAP_UPDATE_INTERVAL = 151
RESOURCE_PATCH_LINK_RADIUS = 3
RESOURCE_PATCH_MIN_CELLS = 2
RESOURCE_PATCH_SEARCH_MARGIN = 8
RESOURCE_PATCH_THREAT_RADIUS = 12
RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS = 14
MAX_REFINERIES_PER_PATCH = 2
NAVAL_CANDIDATE_MIN_COUNT = 1
RESOURCE_PATCH_MEMORY_MATCH_RADIUS = 6
RESOURCE_PATCH_MAX_CAPACITY = 6
ATTACK_FORCE_INTERVAL = 75
RUSH_INTERVAL = 600
RUSH_TICKS = 4000
ASSIGN_ROLES_INTERVAL = 50
HARVESTER_SCAN_INTERVAL = 50
UNIT_FEEDBACK_TIME = 30
STALE_TARGET_REACHED_RADIUS = 8
STALE_TARGET_REDIRECT_LIMIT = 3
PRODUCTION_MIN_CASH_REQUIREMENT = 500
# C# UnitBuilderBotModule@normal relies on FeedbackTime plus queue occupancy;
# it does not add extra synthetic cooldowns on top of that.
QUEUE_PRODUCTION_DELAYS: dict[str, int] = {}
UNIT_PRODUCTION_DELAYS: dict[str, int] = {}
FOG_CHANNEL = 4 # spatial-map channel: 0=hidden, 0.5=explored, 1=visible
FRONTIER_REFRESH_TICKS = 650
LAST_SEEN_ENEMY_TTL_TICKS = 6000
LAST_SEEN_BASE_TTL_TICKS = 18000
# RA NormalAI does not configure IdleBaseUnitsMaximum for UnitBuilderBotModule@normal,
# so avoid damping production just because squads are currently idling near base.
QUEUE_IDLE_BASE_CAPS: dict[str, int] = {}
IDLE_BASE_UNIT_RADIUS = 15
AIRFIELD_PLANE_CAPACITY = 4
HELIPAD_AIRCRAFT_CAPACITY = 1
INITIAL_HARVESTERS = 4
MINIMUM_EXCESS_POWER = 0
MAXIMUM_EXCESS_POWER = 200
EXCESS_POWER_INCREMENT = 40
EXCESS_POWER_INCREASE_THRESHOLD = 4
INITIAL_MIN_REFINERY_COUNT = 0
ADDITIONAL_MIN_REFINERY_COUNT = 2
NEW_PRODUCTION_CASH_THRESHOLD = 8000
NEW_PRODUCTION_CHANCE = 50
SILO_BUILD_THRESHOLD = 0.8
PROTECT_UNIT_SCAN_RADIUS = 15
PROTECTION_SCAN_RADIUS = 12
PROTECTION_TARGET_BACKOFF_TICKS = 4
HOME_BASE_THREAT_RADIUS = BASE_BUILD_MAX_RADIUS + PROTECTION_SCAN_RADIUS
REPAIR_ALL_BUILDINGS_COOLDOWN = 107
REPAIR_REACTIVE_HP_THRESHOLD = 0.67
POWER_TOGGLE_INTERVAL = 150
ATTACK_SCAN_RADIUS = 12
REGROUP_RADIUS = 14
LOCAL_FIGHT_RADIUS = 12
RUSH_ATTACK_SCAN_RADIUS = 15
RETREAT_HEALTH_THRESHOLD = 0.42
MINIMUM_CONSTRUCTION_YARD_COUNT = 2
ADDITIONAL_CONSTRUCTION_YARD_COUNT = 0
BUILD_ADDITIONAL_MCV_CASH_AMOUNT = 5000
SCAN_FOR_NEW_MCV_INTERVAL = 20
BUILD_MCV_INTERVAL = 101
MCV_MIN_DEPLOY_RADIUS = 2
MCV_MAX_DEPLOY_RADIUS = 20
MCV_TARGET_REACHED_RADIUS = 2
MCV_TRY_MAINTAIN_RANGE = 8
MCV_FRIENDLY_CONYARD_DISLIKE_RANGE = 14
MCV_FRIENDLY_REFINERY_DISLIKE_RANGE = 14
MCV_DEPLOY_OFFSET = (-1, -1)
EXPANSION_TOLERATE_VALUES = (1, 2)
FORCE_EXPANSION_TOLERATE_VALUES = (2, 3)
# Match the C# McvExpansionManager default MoveConyardTick cadence.
CONYARD_UNDEPLOY_COOLDOWN = 5700
MCV_DEPLOY_COMMAND_COOLDOWN = 800
REQUESTED_REFINERY_TTL = 2400
HARVESTER_THREAT_RADIUS = 10
HARVESTER_RETREAT_COOLDOWN = 120
LOW_EFFECT_HARVESTER_SCAN_INTERVAL = 433
RESOURCE_CELLS_PER_HARVESTER = 4
HARVESTER_PATCH_ASSIGN_RADIUS = 12
HARVESTER_REASSIGN_COOLDOWN = 650
HARVESTER_REASSIGN_REFINERY_RADIUS = 14
HARVESTER_LOW_EFFECT_TIMEOUT = 500
HARVESTER_NO_RESOURCE_COOLDOWN = 300
HARVESTER_PROGRESS_MOVE_THRESHOLD = 2
HARVESTER_LOCAL_RESOURCE_MIN = 0.5
BASE_ATTACK_MEMORY_TICKS = 150
BASE_EMERGENCY_VISIBILITY_RADIUS = 5
ATTACK_POINT_MERGE_RADIUS = 4
POST_CONTACT_WINDOW = 2400
RECOVERY_DURATION = 2600
RECOVERY_TRIGGER_PEAK = 24
RECOVERY_DROP_RATIO = 0.6
RECOVERY_MIN_COMBAT = 16
RECOVERY_EXIT_COMBAT = 24
RECOVERY_HARVESTER_CAP = 2
RECOVERY_CLEAR_CONTACT_GAP = 450
RECOVERY_REFINERY_REBUILD_CREDITS = 2000
HOME_GUARD_MIN_RESERVE = 6
HOME_GUARD_MAX_RESERVE = 10
RUSH_SQUAD_MIN_SIZE = 6
AIR_SQUAD_MIN_SIZE = 2
NAVAL_SQUAD_MIN_SIZE = 2
PROTECTION_SQUAD_MIN_SIZE = 4
RESPOND_TO_ATTACK_COOLDOWN = 30
SQUAD_RETREAT_HOLD_TICKS = 180
SQUAD_RECOVER_HOLD_TICKS = 240
RUSH_COMBAT_TYPES = {"e1", "e3", "apc", "jeep", "1tnk", "2tnk", "3tnk", "arty", "v2rl"}
FORCE_COMMIT_UNIT_THRESHOLD = 30
ECONOMY_ATTACK_HOLD_FORCE = 60
TECH_ATTACK_HOLD_FORCE = 70
FORCE_COMMIT_REGROUPS = 3
FORCE_COMMIT_COOLDOWN = 400
FORCE_COMMIT_TIME = 1200 # fallback wave timing (ticks) to mirror NormalAI periodic pushes
FORCE_COMMIT_MIN_SIZE = 12
FORCE_COMMIT_GLOBAL_INTERVAL = 1200 # hard periodic wave, closer to NormalAI cadence
STALE_TARGET_CLEAR_INTERVAL = 2400 # reduce aggressive stale clearing
SEARCH_TARGET_STALL_TICKS = 3600
SQUAD_STUCK_TICKS = 63
NAVAL_FAILURE_DISABLE_TICKS = 12000
MCV_EXPANSION_MODES = ("resource", "base", "current")
MCV_EXPANSION_MODE_FAILURES = 2
FUZZY_ANY_HEALTH = ("NearDead", "Injured", "Normal")
FUZZY_ANY_POWER = ("Weak", "Equal", "Strong")
FUZZY_ANY_SPEED = ("Slow", "Equal", "Fast")
FUZZY_DEFAULT_RULES = (
(("Normal",), FUZZY_ANY_HEALTH, FUZZY_ANY_POWER, FUZZY_ANY_SPEED, "Attack"),
(("Injured",), ("NearDead",), FUZZY_ANY_POWER, FUZZY_ANY_SPEED, "Attack"),
(("Injured",), ("Injured", "Normal"), ("Equal", "Strong"), FUZZY_ANY_SPEED, "Attack"),
(("Injured",), ("Injured", "Normal"), ("Weak",), ("Slow",), "Attack"),
(("Injured",), ("Injured", "Normal"), ("Weak",), ("Equal", "Fast"), "Flee"),
(("Injured",), FUZZY_ANY_HEALTH, FUZZY_ANY_POWER, ("Slow",), "Attack"),
(("NearDead",), ("NearDead", "Injured"), ("Equal", "Strong"), ("Slow", "Equal"), "Attack"),
(("NearDead",), ("NearDead", "Injured"), ("Weak",), ("Equal", "Fast"), "Flee"),
(("NearDead",), ("Normal",), ("Weak",), ("Equal", "Fast"), "Flee"),
(("NearDead",), ("Normal",), ("Equal", "Strong"), ("Fast",), "Flee"),
(("NearDead",), ("Injured",), ("Equal",), ("Fast",), "Flee"),
)
FUZZY_RUSH_RULES = (
(("Normal",), FUZZY_ANY_HEALTH, ("Strong",), FUZZY_ANY_SPEED, "Attack"),
(("Normal",), FUZZY_ANY_HEALTH, ("Weak", "Equal"), FUZZY_ANY_SPEED, "Flee"),
*FUZZY_DEFAULT_RULES[1:],
)
class NormalAIBot:
"""Python reimplementation of OpenRA's Normal AI."""
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.phase = "deploy_mcv"
# Base building
self._build_index = 0
self._placement_count = 0
self._deploy_issued = False
self._last_build_tick: int = -9999
self._next_build_check_tick = 0
self._placement_fail_counts: dict[str, int] = {}
self._placement_pending: dict[str, tuple[str, int, int]] = {}
self._placement_backoff_until: dict[str, int] = {}
self._placement_backoff_snapshot: dict[str, tuple[int, int]] = {}
self._next_placement_attempt_tick: dict[str, int] = {}
self._naval_retry_buildable_count = -1
self._naval_disabled_until = -9999
# Rally
self._rally_set: set[int] = set()
# Squads
self._attack_squad: list[int] = []
self._protection_squad: list[int] = []
self._rush_squad: list[int] = []
self._air_squad: list[int] = []
self._naval_squad: list[int] = []
self._idle_ground_units: list[int] = []
self._temporary_defenders: set[int] = set()
self._last_attack_tick = 0
self._last_attack_eval_tick = -random.randrange(ATTACK_FORCE_INTERVAL) if ATTACK_FORCE_INTERVAL > 0 else 0
rush_jitter = max(1, RUSH_INTERVAL // 20)
self._last_rush_tick = random.randint(-rush_jitter, rush_jitter)
self._last_assign_tick = -random.randrange(ASSIGN_ROLES_INTERVAL) if ASSIGN_ROLES_INTERVAL > 0 else 0
self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS
self._protect_from_actor_id = 0
self._protect_from_kind = "point"
self._protect_from_point: Optional[Tuple[int, int]] = None
self._protect_from_until = -9999
self._respond_to_attack_cooldown_until = -9999
self._assault_threshold = self._roll_assault_threshold()
self._squad_states: dict[str, str] = {
"assault": "assemble",
"protection": "assemble",
"rush": "assemble",
"air": "assemble",
"naval": "assemble",
}
self._squad_state_until: dict[str, int] = {}
self._squad_regroup_count: dict[str, int] = {}
self._squad_last_commit_tick: dict[str, int] = {}
self._squad_target_actor_id: dict[str, int] = {}
self._squad_target_kind: dict[str, str] = {}
self._squad_target_point: dict[str, Tuple[int, int]] = {}
self._enemy_base_pos: Optional[Tuple[int, int]] = None
# Bridge only exposes currently visible enemies; keep a short-lived memory.
self._last_seen_enemy_pos: Optional[Tuple[int, int]] = None
self._last_seen_enemy_tick: int = -9999
self._last_seen_base_pos: Optional[Tuple[int, int]] = None
self._last_seen_base_tick: int = -9999
self._rush_target_pos: Optional[Tuple[int, int]] = None
self._rush_target_actor_id = 0
self._rush_target_kind = "point"
self._stale_attack_target: Optional[Tuple[int, int]] = None
self._stale_attack_redirects = 0
self._search_target: Optional[Tuple[int, int]] = None
self._search_target_started_tick = -9999
self._squad_last_progress_tick: dict[str, int] = {}
self._squad_last_progress_pos: dict[str, Tuple[int, int]] = {}
self._squad_last_target_point: dict[str, Tuple[int, int]] = {}
self._squad_leader_id: dict[str, int] = {}
self._attack_commands_issued = 0
self._attack_move_commands_issued = 0
self._unit_target_events = 0
self._building_target_events = 0
self._unique_unit_targets: set[int] = set()
self._unique_building_targets: set[int] = set()
# Repair / power
self._repair_issued: set[int] = set()
self._reactive_repair_targets: set[int] = set()
self._last_repair_tick = -9999
self._powered_down: dict[int, int] = {}
self._last_power_toggle_tick = -9999
# Economy / production
self._last_harvester_scan_tick = -9999
self._last_harvester_reassign_tick = -9999
self._last_unit_tick = -9999
self._current_queue_index = -1
self._unit_requests: list[str] = []
self._queue_delay_until: dict[str, int] = {}
self._unit_delay_until: dict[str, int] = {}
self._last_mcv_scan_tick = random.randrange(SCAN_FOR_NEW_MCV_INTERVAL) if SCAN_FOR_NEW_MCV_INTERVAL > 0 else 0
self._last_mcv_build_tick = random.randrange(BUILD_MCV_INTERVAL) if BUILD_MCV_INTERVAL > 0 else 0
self._last_conyard_undeploy_tick = random.randrange(CONYARD_UNDEPLOY_COOLDOWN) if CONYARD_UNDEPLOY_COOLDOWN > 0 else 0
self._mcv_targets: dict[int, tuple[int, int]] = {}
self._mcv_resource_targets: dict[int, tuple[int, int]] = {}
self._requested_refineries: dict[int, tuple[tuple[int, int], tuple[int, int], int]] = {}
self._mcv_deploy_until: dict[int, int] = {}
self._harvester_retreat_until: dict[int, int] = {}
self._harvester_recent_damage_until: dict[int, int] = {}
self._harvester_reassign_until: dict[int, int] = {}
self._harvester_patch_targets: dict[int, tuple[int, int]] = {}
self._harvester_last_cells: dict[int, tuple[int, int]] = {}
self._harvester_last_progress_tick: dict[int, int] = {}
self._harvester_no_resource_until: dict[int, int] = {}
self._expansion_refinery_goal = 0
self._expansion_refinery_until_tick = -9999
self._combat_peak = 0
self._last_contact_tick = -9999
self._recovery_until_tick = -9999
# Map
self._cached_map_size: Optional[Tuple[int, int]] = None
self._candidate_targets: list[Tuple[int, int]] = []
self._target_index = 0
self._frontier_cache_tick = -9999
self._recent_attack_points: list[tuple[int, int, int]] = []
self._previous_building_hp: dict[int, float] = {}
self._previous_unit_hp: dict[int, float] = {}
self._spatial_raw: bytes = b""
self._spatial_channels = 0
self._last_spatial_update_tick = -9999
self._resource_patches: list[dict[str, float | int]] = []
self._resource_patch_memory: dict[tuple[int, int], dict[str, float | int]] = {}
self._last_naval_gate_tick = -9999
self._cached_naval_gate_ok = False
self._mcv_expansion_mode = MCV_EXPANSION_MODES[0]
self._mcv_expansion_failures = 0
def decide(self, obs: OpenRAObservation) -> OpenRAAction:
commands: List[CommandModel] = []
self._update_map_size(obs)
self._update_spatial_analysis(obs)
self._update_enemy_memory(obs)
self._update_phase(obs)
self._cleanup_dead(obs)
self._update_damage_memory(obs)
self._update_post_contact_state(obs)
commands.extend(self._handle_placement(obs))
if self.phase == "deploy_mcv":
cmd = self._handle_deploy(obs)
if cmd:
commands.append(cmd)
commands.extend(self._handle_rally_points(obs))
commands.extend(self._manage_power(obs))
commands.extend(self._manage_repairs(obs))
commands.extend(self._manage_economy(obs))
commands.extend(self._manage_expansion(obs))
commands.extend(self._manage_base_building(obs))
commands.extend(self._manage_unit_production(obs))
commands.extend(self._manage_squads(obs))
if not commands:
commands.append(CommandModel(action=ActionType.NO_OP))
return OpenRAAction(commands=commands)
def _update_enemy_memory(self, obs: OpenRAObservation) -> None:
"""Update last-seen enemy memory from currently visible enemies/buildings."""
if not (obs.visible_enemies or obs.visible_enemy_buildings):
return
best = self._pick_priority_target(obs, None, None, local_only=False, squad_name="assault")
if best is None:
# Fallback to any visible contact
if obs.visible_enemy_buildings:
b = obs.visible_enemy_buildings[0]
self._last_seen_enemy_pos = (b.cell_x, b.cell_y)
self._last_seen_enemy_tick = obs.tick
self._last_seen_base_pos = (b.cell_x, b.cell_y)
self._last_seen_base_tick = obs.tick
elif obs.visible_enemies:
e = obs.visible_enemies[0]
self._last_seen_enemy_pos = (e.cell_x, e.cell_y)
self._last_seen_enemy_tick = obs.tick
return
_, tx, ty, _, kind = best
self._last_seen_enemy_pos = (tx, ty)
self._last_seen_enemy_tick = obs.tick
if kind == "building":
self._last_seen_base_pos = (tx, ty)
self._last_seen_base_tick = obs.tick
# ── Phase ─────────────────────────────────────────────────────
def _update_phase(self, obs: OpenRAObservation):
has_cy = any(b.type == "fact" for b in obs.buildings)
if self.phase == "deploy_mcv" and has_cy:
self.phase = "build_base"
self._log("Phase -> build_base")
elif self.phase == "build_base":
has_barracks = any(b.type in BARRACKS_TYPES for b in obs.buildings)
if has_barracks:
self.phase = "produce"
self._log("Phase -> produce")
elif self.phase == "produce":
combat = [u for u in obs.units if u.type in COMBAT_TYPES]
if self._build_index >= len(BUILD_ORDER):
self.phase = "active"
self._log(f"Phase -> active ({len(combat)} combat units)")
# ── Deploy MCV ────────────────────────────────────────────────
def _handle_deploy(self, obs: OpenRAObservation) -> Optional[CommandModel]:
if self._deploy_issued:
return None
mcv = next((u for u in obs.units if u.type == "mcv"), None)
if mcv:
self._deploy_issued = True
self._log(f"Deploying MCV #{mcv.actor_id}")
return CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id)
return None
# ── Building placement ────────────────────────────────────────
def _handle_placement(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
cy = self._find_building(obs, "fact")
if not cy:
return commands
active_queues: set[str] = set()
counts = self._building_counts(obs)
for prod in obs.production:
if self._is_structure_queue(prod.queue_type) and prod.progress >= 0.99:
queue_type = prod.queue_type
active_queues.add(queue_type)
pending = self._placement_pending.get(queue_type)
current_count = counts.get(self._canonical_building_type(prod.item), 0)
if pending is not None:
pending_item, pending_count, pending_tick = pending
if pending_item == prod.item:
if current_count > pending_count:
self._placement_fail_counts[queue_type] = 0
self._placement_pending.pop(queue_type, None)
continue
if obs.tick < pending_tick + PLACEMENT_CONFIRMATION_DELAY:
continue
self._placement_fail_counts[queue_type] = self._placement_fail_counts.get(queue_type, 0) + 1
self._placement_pending.pop(queue_type, None)
elif pending_item != prod.item:
self._placement_pending.pop(queue_type, None)
self._placement_fail_counts.pop(queue_type, None)
if self._queue_backoff_active(queue_type, obs):
if obs.tick >= self._next_placement_attempt_tick.get(queue_type, -9999):
commands.append(CommandModel(action=ActionType.CANCEL_PRODUCTION, item_type=prod.item))
self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL
continue
if self._placement_fail_counts.get(queue_type, 0) >= MAX_FAILED_PLACEMENT_ATTEMPTS:
commands.append(CommandModel(action=ActionType.CANCEL_PRODUCTION, item_type=prod.item))
if self._canonical_building_type(prod.item) in NAVAL_STRUCTURE_TYPES:
self._naval_retry_buildable_count = self._buildable_area_structure_count(obs)
self._naval_disabled_until = obs.tick + NAVAL_FAILURE_DISABLE_TICKS
self._cached_naval_gate_ok = False
self._last_naval_gate_tick = obs.tick
else:
self._placement_backoff_until[queue_type] = obs.tick + STRUCTURE_PRODUCTION_RESUME_DELAY
self._placement_backoff_snapshot[queue_type] = (
len(obs.buildings),
sum(1 for b in obs.buildings if b.type == "fact"),
)
self._placement_fail_counts[queue_type] = 0
self._placement_pending.pop(queue_type, None)
self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL
self._rewind_build_order_after_cancel(obs, prod.item)
if self._canonical_building_type(prod.item) in NAVAL_STRUCTURE_TYPES:
self._log(f"Canceling {prod.item} after repeated placement failures; disabling naval production")
else:
self._log(f"Canceling {prod.item} after repeated placement failures; backing off {queue_type} queue")
continue
if obs.tick < self._next_placement_attempt_tick.get(queue_type, -9999):
continue
location = self._placement_offset(obs, cy, prod.item)
if location is None:
self._placement_fail_counts[queue_type] = self._placement_fail_counts.get(queue_type, 0) + 1
self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL
continue
x, y = location
commands.append(CommandModel(
action=ActionType.PLACE_BUILDING,
item_type=prod.item,
target_x=x, target_y=y,
))
self._placement_count += 1
self._placement_pending[queue_type] = (prod.item, current_count, obs.tick)
self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL
for queue_type in list(self._placement_pending):
if queue_type not in active_queues:
self._placement_pending.pop(queue_type, None)
self._placement_fail_counts.pop(queue_type, None)
return commands
def _placement_offset(
self,
obs: OpenRAObservation,
cy: BuildingInfoModel,
item_type: str,
) -> Optional[Tuple[int, int]]:
center = self._placement_base_center(obs)
if center is None:
center = self._building_top_left(cy)
center = self._placement_anchor(obs, item_type, center)
cx, cy_y = center
queue_type = self._structure_queue_type(item_type)
retry_index = self._placement_fail_counts.get(queue_type, 0)
min_radius = DEFENSE_BUILD_MIN_RADIUS if item_type in ENEMY_FACING_STRUCTURE_TYPES else BASE_BUILD_MIN_RADIUS
max_radius = DEFENSE_BUILD_MAX_RADIUS if item_type in ENEMY_FACING_STRUCTURE_TYPES else BASE_BUILD_MAX_RADIUS
if item_type not in ENEMY_FACING_STRUCTURE_TYPES:
max_radius += min(retry_index * 4, 20)
candidates = self._placement_candidates(obs, item_type, cx, cy_y, min_radius, max_radius)
if not candidates:
return None
if item_type == "proc":
plan = self._best_refinery_plan(obs)
target = plan["target"] if plan is not None else center
refineries = [b for b in obs.buildings if b.type == "proc"]
candidates.sort(
key=lambda p: (
self._resource_amount_at(*p) > 0.0,
-self._local_resource_score(p[0], p[1], 4),
self._cell_distance(p[0], p[1], target[0], target[1]),
-self._nearest_distance_to_buildings(p[0], p[1], refineries),
self._cell_distance(p[0], p[1], cx, cy_y),
)
)
idx = (self._placement_count + retry_index * 5) % min(len(candidates), 16)
if plan is not None and "request_id" in plan:
self._consume_requested_refinery(plan["request_id"]) # type: ignore[arg-type]
return candidates[idx]
if item_type in NAVAL_STRUCTURE_TYPES:
candidates.sort(
key=lambda p: (
-self._local_water_score(p[0], p[1], 2),
self._cell_distance(p[0], p[1], cx, cy_y),
)
)
idx = (self._placement_count + retry_index * 5) % min(len(candidates), 16)
return candidates[idx]
if item_type in ENEMY_FACING_STRUCTURE_TYPES and self._enemy_base_pos is not None:
tx, ty = self._enemy_base_pos
candidates.sort(key=lambda p: ((p[0] - tx) ** 2 + (p[1] - ty) ** 2, (p[0] - cx) ** 2 + (p[1] - cy_y) ** 2))
idx = (self._placement_count + retry_index * 5) % min(len(candidates), 16)
return candidates[idx]
idx = (self._placement_count + retry_index * 7) % len(candidates)
return candidates[idx]
# ── Rally points ──────────────────────────────────────────────
def _handle_rally_points(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
cy = self._find_building(obs, "fact")
if not cy:
return commands
rally_x = cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024
rally_y = cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024
for b in obs.buildings:
if b.type in ("tent", "barr", "weap") and b.actor_id not in self._rally_set:
commands.append(CommandModel(
action=ActionType.SET_RALLY_POINT,
actor_id=b.actor_id,
target_x=rally_x, target_y=rally_y,
))
self._rally_set.add(b.actor_id)
return commands
# ── Base Building (fixed order then dynamic) ──────────────────
def _manage_base_building(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
if self.phase == "deploy_mcv":
return commands
if obs.tick < self._next_build_check_tick:
return commands
credits = self._available_credits(obs)
if credits < PRODUCTION_MIN_CASH_REQUIREMENT:
self._schedule_next_build_check(obs, active=False)
return commands
# Phase 1: follow the fixed build order
if self._build_index < len(BUILD_ORDER):
item = self._resolve_build_item(obs, BUILD_ORDER[self._build_index])
if item is None:
self._schedule_next_build_check(obs, active=False)
return commands
if not self._structure_queue_available(obs, item):
self._schedule_next_build_check(obs, active=False)
return commands
if self._structure_queue_busy(obs, item):
self._schedule_next_build_check(obs, active=False)
return commands
if self._already_have(obs, item, self._build_index):
self._build_index += 1
return commands
if self._can_produce(obs, item):
cost = self._build_cost(item)
if credits >= cost:
self._log(
f"Building {item} [{self._build_index+1}/{len(BUILD_ORDER)}] "
f"({self._credits_str(obs)})"
)
commands.append(CommandModel(action=ActionType.BUILD, item_type=item))
self._last_build_tick = obs.tick
self._schedule_next_build_check(obs, active=True)
self._build_index += 1
else:
self._schedule_next_build_check(obs, active=False)
else:
self._schedule_next_build_check(obs, active=False)
return commands
# Phase 2: dynamic base building driven by the normal AI priorities.
remaining_credits = credits
picker = self._choose_recovery_building if self._in_recovery_mode(obs) else self._choose_dynamic_building
for queue_type in ("Building", "Defense"):
item = picker(obs, queue_type=queue_type)
if item is None:
continue
if not self._structure_queue_available(obs, item):
continue
if self._structure_queue_busy(obs, item):
continue
if not self._can_produce(obs, item):
continue
cost = self._build_cost(item)
if remaining_credits < cost:
continue
self._log(f"Building {item} (dynamic, {self._credits_str(obs)})")
commands.append(CommandModel(action=ActionType.BUILD, item_type=item))
remaining_credits -= cost
if item == "proc":
self._clear_expansion_refinery_need()
if commands:
self._last_build_tick = obs.tick
self._schedule_next_build_check(obs, active=True)
else:
self._schedule_next_build_check(obs, active=False)
return commands
def _resolve_build_item(self, obs: OpenRAObservation, placeholder: str) -> Optional[str]:
variants = BUILDING_VARIANT_CHOICES.get(placeholder)
if variants is not None:
for btype in variants:
if self._can_produce(obs, btype):
return btype
return None
return placeholder
def _already_have(self, obs: OpenRAObservation, item: str, idx: int) -> bool:
count = sum(1 for b in obs.buildings if b.type == item)
target = sum(1 for i, p in enumerate(BUILD_ORDER[:idx+1])
if self._resolve_build_item(obs, p) == item)
return count >= target
def _choose_dynamic_building(
self,
obs: OpenRAObservation,
queue_type: str = "Building",
) -> Optional[str]:
bldg_counts = self._building_counts(obs)
credits = self._available_credits(obs)
power_balance = obs.economy.power_provided - obs.economy.power_drained
minimum_excess_power = self._minimum_excess_power_target(obs)
power_item = self._best_power_building(obs)
if queue_type == "Defense":
total_buildings = max(1, len(obs.buildings))
candidates = list(BUILDING_FRACTIONS.keys())
random.shuffle(candidates)
for item in candidates:
if BUILDING_DELAYS.get(item, 0) > obs.tick:
continue
resolved_item = self._resolve_build_item(obs, item)
if resolved_item is None or not self._can_produce(obs, resolved_item):
continue
canonical_item = self._canonical_building_type(resolved_item)
if canonical_item not in DEFENSE_STRUCTURE_TYPES:
continue
if not self._structure_queue_available(obs, resolved_item):
continue
count = bldg_counts.get(canonical_item, 0)
limit = BUILDING_LIMITS.get(canonical_item)
if limit is not None and count >= limit:
continue
if count * 100 > BUILDING_FRACTIONS[item] * total_buildings:
continue
return resolved_item
return None
if power_balance < minimum_excess_power and power_item:
return power_item
naval_item = self._preferred_early_naval_building(obs, credits)
if naval_item is not None:
return naval_item
if not self._has_adequate_refinery_count(obs):
if self._can_produce(obs, "proc"):
return "proc"
return power_item
if (
self._expansion_refinery_pending(obs)
and self._can_produce(obs, "proc")
and self._structure_queue_available(obs, "proc")
):
return "proc"
if (
credits > NEW_PRODUCTION_CASH_THRESHOLD
and random.randrange(100) < NEW_PRODUCTION_CHANCE
):
production = self._best_production_building(obs)
if production:
return production
if (
credits > NEW_PRODUCTION_CASH_THRESHOLD
and random.randrange(100) < NEW_PRODUCTION_CHANCE
and self._can_safely_build_naval_structure(obs)
):
naval_production = self._best_naval_production_building(obs)
if naval_production:
return naval_production
if (
obs.economy.resource_capacity > 0
and obs.economy.ore >= obs.economy.resource_capacity * SILO_BUILD_THRESHOLD
and self._can_produce(obs, "silo")
and self._structure_queue_available(obs, "silo")
):
return "silo"
total_buildings = max(1, len(obs.buildings))
candidates = list(BUILDING_FRACTIONS.keys())
random.shuffle(candidates)
for item in candidates:
if BUILDING_DELAYS.get(item, 0) > obs.tick:
continue
resolved_item = self._resolve_build_item(obs, item)
if resolved_item is None or not self._can_produce(obs, resolved_item):
continue
canonical_item = self._canonical_building_type(resolved_item)
if canonical_item in NAVAL_STRUCTURE_TYPES or canonical_item in DEFENSE_STRUCTURE_TYPES:
continue
if not self._structure_queue_available(obs, resolved_item):
continue
count = bldg_counts.get(canonical_item, 0)
limit = BUILDING_LIMITS.get(canonical_item)
if limit is not None and count >= limit:
continue
if count * 100 > BUILDING_FRACTIONS[item] * total_buildings:
continue
return resolved_item
return None
def _choose_recovery_building(
self,
obs: OpenRAObservation,
queue_type: str = "Building",
) -> Optional[str]:
bldg_counts = self._building_counts(obs)
credits = self._available_credits(obs)
power_balance = obs.economy.power_provided - obs.economy.power_drained
minimum_excess_power = self._minimum_excess_power_target(obs)
power_item = self._best_power_building(obs)
if queue_type == "Defense":
if not self._base_under_pressure(obs):
return None
defense_count = sum(bldg_counts.get(item, 0) for item in ("ftur", "gun", "pbox"))
defense_cap = 1 if self._combat_unit_count(obs) < RECOVERY_EXIT_COMBAT else 2
if defense_count >= defense_cap:
return None
for item in ("ftur", "gun", "pbox"):
if not self._can_produce(obs, item):
continue
if not self._structure_queue_available(obs, item):
continue
limit = BUILDING_LIMITS.get(item)
if limit is not None and bldg_counts.get(item, 0) >= limit:
continue
return item
return None
if power_balance < minimum_excess_power and power_item:
return power_item
refinery_count = bldg_counts.get("proc", 0)
if (
refinery_count == 0
and not self._base_under_pressure(obs)
and credits >= RECOVERY_REFINERY_REBUILD_CREDITS
and self._can_produce(obs, "proc")
and self._structure_queue_available(obs, "proc")
):
return "proc"
if not any(b.type in WAR_FACTORY_TYPES for b in obs.buildings):
if self._can_produce(obs, "weap") and self._structure_queue_available(obs, "weap"):
return "weap"
if not any(b.type in BARRACKS_TYPES for b in obs.buildings):
barracks = self._resolve_build_item(obs, "barracks")
if barracks and self._can_produce(obs, barracks) and self._structure_queue_available(obs, barracks):
return barracks
if self._base_under_pressure(obs):
return power_item if power_balance < 0 and power_item else None
return None
# ── Unit Production ───────────────────────────────────────────
def _manage_unit_production(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
if self.phase == "deploy_mcv":
return commands
if obs.tick - self._last_unit_tick < UNIT_FEEDBACK_TIME:
return commands
credits = self._available_credits(obs)
if credits < PRODUCTION_MIN_CASH_REQUIREMENT:
return commands
requested = self._queue_requested_unit(obs)
if requested:
self._last_unit_tick = obs.tick
commands.append(requested)
return commands
if not any(self._is_structure_queue(p.queue_type) for p in obs.production):
structure_reservation = self._priority_structure_reservation(obs)
if structure_reservation > 0 and credits < structure_reservation:
return commands
self._last_unit_tick = obs.tick
for _ in range(len(UNIT_QUEUE_ORDER)):
self._current_queue_index = (self._current_queue_index + 1) % len(UNIT_QUEUE_ORDER)
queue_type, allowed = UNIT_QUEUE_ORDER[self._current_queue_index]
if any(p.queue_type == queue_type for p in obs.production):
continue
if self._queue_delay_active(obs, queue_type):
continue
unit = self._pick_unit(obs, allowed)
if unit:
commands.append(CommandModel(action=ActionType.TRAIN, item_type=unit))
self._mark_unit_trained(obs, unit, queue_type)
break
return commands
def _pick_unit(self, obs: OpenRAObservation, allowed: set[str]) -> Optional[str]:
unit_counts: dict[str, int] = {}
total_units = 0
for u in obs.units:
unit_counts[u.type] = unit_counts.get(u.type, 0) + 1
if u.type in UNITS_TO_BUILD:
total_units += 1
for p in obs.production:
unit_counts[p.item] = unit_counts.get(p.item, 0) + 1
if p.item in UNITS_TO_BUILD:
total_units += 1
for item in self._unit_requests:
unit_counts[item] = unit_counts.get(item, 0) + 1
if item in UNITS_TO_BUILD:
total_units += 1
desired: Optional[str] = None
desired_error = float("inf")
candidates = list(allowed)
random.shuffle(candidates)
for utype in candidates:
if utype not in allowed:
continue
if not self._can_produce(obs, utype):
continue
if self._unit_at_limit(obs, utype):
continue
share = self._desired_unit_share(obs, utype, unit_counts)
if share <= 0:
continue
count = unit_counts.get(utype, 0)
error = (count * 100 / total_units - share) if total_units > 0 else -1
if error < 0:
return utype
if error < desired_error:
desired_error = error
desired = utype
return desired
# ── Economy ──────────────────────────────────────────────────
def _manage_economy(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
if self.phase == "deploy_mcv":
return commands
if obs.tick - self._last_harvester_scan_tick < HARVESTER_SCAN_INTERVAL:
return commands
self._last_harvester_scan_tick = obs.tick
reassignment_commands, redirected_harvesters = self._reassign_low_effect_harvesters(obs)
commands.extend(reassignment_commands)
patch_states = self._resource_patch_states(obs)
for u in obs.units:
if u.type != "harv":
continue
self._update_harvester_progress(obs, u)
no_resource_target = self._harvester_patch_targets.get(u.actor_id)
if (
no_resource_target is not None
and self._local_resource_score(no_resource_target[0], no_resource_target[1], 2) <= HARVESTER_LOCAL_RESOURCE_MIN
):
self._harvester_no_resource_until[u.actor_id] = obs.tick + HARVESTER_NO_RESOURCE_COOLDOWN
self._harvester_patch_targets.pop(u.actor_id, None)
if u.actor_id in redirected_harvesters:
continue
recent_damage = self._harvester_recently_damaged(obs, u.actor_id)
threat = self._nearest_enemy_to_unit(obs, u, HARVESTER_THREAT_RADIUS)
if threat is not None:
self._last_contact_tick = obs.tick
current_target = self._harvester_patch_targets.get(u.actor_id)
if current_target is not None:
threatened_state = self._nearest_patch_state(
patch_states,
current_target[0],
current_target[1],
HARVESTER_PATCH_ASSIGN_RADIUS,
allow_fallback=True,
)
if threatened_state is not None and int(threatened_state["threat"]) > 0:
self._harvester_patch_targets.pop(u.actor_id, None)
if threat is not None or recent_damage:
fallback = self._pick_harvester_retreat_point(obs, u, patch_states=patch_states)
if fallback is not None and (
threat is not None
or self._cell_distance(u.cell_x, u.cell_y, fallback[0], fallback[1]) > 2
):
commands.append(CommandModel(
action=ActionType.MOVE,
actor_id=u.actor_id,
target_x=fallback[0],
target_y=fallback[1],
))
self._harvester_retreat_until[u.actor_id] = obs.tick + HARVESTER_RETREAT_COOLDOWN
self._harvester_reassign_until[u.actor_id] = obs.tick + HARVESTER_REASSIGN_COOLDOWN
self._harvester_last_progress_tick[u.actor_id] = obs.tick
continue
if self._is_low_effect_harvester(obs, u):
fallback_target = self._fallback_harvest_target(
obs,
u,
patch_states=patch_states,
prefer_safe=recent_damage,
exclude_target=self._harvester_patch_targets.get(u.actor_id),
)
if fallback_target is not None:
commands.append(CommandModel(
action=ActionType.HARVEST,
actor_id=u.actor_id,
target_x=fallback_target[0],
target_y=fallback_target[1],
))
self._harvester_patch_targets[u.actor_id] = fallback_target
self._harvester_reassign_until[u.actor_id] = obs.tick + HARVESTER_REASSIGN_COOLDOWN
self._harvester_last_progress_tick[u.actor_id] = obs.tick
redirected_harvesters.add(u.actor_id)
continue
self._harvester_no_resource_until[u.actor_id] = obs.tick + HARVESTER_NO_RESOURCE_COOLDOWN
if u.is_idle:
target = self._harvester_patch_targets.get(u.actor_id)
target_state = None
if target is not None:
target_state = self._nearest_patch_state(
patch_states,
target[0],
target[1],
HARVESTER_PATCH_ASSIGN_RADIUS,
allow_fallback=True,
)
if (
target is not None
and self._local_resource_score(target[0], target[1], 2) > HARVESTER_LOCAL_RESOURCE_MIN
and (
target_state is None
or (int(target_state["threat"]) == 0 and float(target_state["depletion_ratio"]) < 0.95)
)
):
commands.append(
CommandModel(
action=ActionType.HARVEST,
actor_id=u.actor_id,
target_x=target[0],
target_y=target[1],
)
)
self._harvester_last_progress_tick[u.actor_id] = obs.tick
else:
self._harvester_patch_targets.pop(u.actor_id, None)
target = self._fallback_harvest_target(
obs,
u,
patch_states=patch_states,
prefer_safe=recent_damage or self._base_under_pressure(obs),
)
if target is not None:
commands.append(
CommandModel(
action=ActionType.HARVEST,
actor_id=u.actor_id,
target_x=target[0],
target_y=target[1],
)
)
self._harvester_patch_targets[u.actor_id] = target
self._harvester_last_progress_tick[u.actor_id] = obs.tick
else:
commands.append(CommandModel(action=ActionType.HARVEST, actor_id=u.actor_id))
self._harvester_last_progress_tick[u.actor_id] = obs.tick
self._ensure_harvester_requests(obs)
return commands
# ── Expansion ────────────────────────────────────────────────
def _manage_expansion(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
if self.phase == "deploy_mcv" or self._in_recovery_mode(obs):
return commands
if obs.tick - self._last_mcv_build_tick >= BUILD_MCV_INTERVAL:
self._last_mcv_build_tick = obs.tick
self._ensure_mcv_requests(obs)
if obs.tick - self._last_mcv_scan_tick < SCAN_FOR_NEW_MCV_INTERVAL:
return commands
self._last_mcv_scan_tick = obs.tick
undeploy_command = self._maybe_undeploy_conyard_for_expansion(obs)
if undeploy_command is not None:
commands.append(undeploy_command)
conyards = [b for b in obs.buildings if b.type == "fact"]
active_mcvs = [u for u in obs.units if u.type == "mcv"]
if len(conyards) >= MINIMUM_CONSTRUCTION_YARD_COUNT and not active_mcvs:
return commands
for mcv in active_mcvs:
if obs.tick < self._mcv_deploy_until.get(mcv.actor_id, -9999):
continue
target = self._mcv_targets.get(mcv.actor_id)
resource_target = self._mcv_resource_targets.get(mcv.actor_id)
if target is not None and self._cell_distance(mcv.cell_x, mcv.cell_y, *target) <= MCV_TARGET_REACHED_RADIUS:
if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y):
self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})")
self._remember_requested_refinery(
obs,
mcv.actor_id,
(mcv.cell_x, mcv.cell_y),
resource_target or target,
)
self._remember_expansion_refinery_need(obs)
self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN
commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id))
self._mcv_targets.pop(mcv.actor_id, None)
self._mcv_resource_targets.pop(mcv.actor_id, None)
continue
self._mcv_targets.pop(mcv.actor_id, None)
self._mcv_resource_targets.pop(mcv.actor_id, None)
target = None
if not mcv.is_idle:
continue
if target is None:
expansion_target = self._pick_expansion_target(obs)
if expansion_target is None:
if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y):
self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})")
self._remember_requested_refinery(
obs,
mcv.actor_id,
(mcv.cell_x, mcv.cell_y),
resource_target or (mcv.cell_x, mcv.cell_y),
)
self._remember_expansion_refinery_need(obs)
self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN
commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id))
self._mcv_resource_targets.pop(mcv.actor_id, None)
continue
target = self._best_mcv_deploy_target(obs, mcv, expansion_target)
if target is None:
if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y):
self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})")
self._remember_requested_refinery(
obs,
mcv.actor_id,
(mcv.cell_x, mcv.cell_y),
resource_target or expansion_target,
)
self._remember_expansion_refinery_need(obs)
self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN
commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id))
self._mcv_resource_targets.pop(mcv.actor_id, None)
continue
self._mcv_targets[mcv.actor_id] = target
self._mcv_resource_targets[mcv.actor_id] = expansion_target
self._log(f"Dispatching MCV #{mcv.actor_id} -> {target} for expansion {expansion_target}")
if self._cell_distance(mcv.cell_x, mcv.cell_y, *target) <= MCV_TARGET_REACHED_RADIUS:
if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y):
self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})")
self._remember_requested_refinery(
obs,
mcv.actor_id,
(mcv.cell_x, mcv.cell_y),
resource_target or target,
)
self._remember_expansion_refinery_need(obs)
self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN
commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id))
self._mcv_targets.pop(mcv.actor_id, None)
self._mcv_resource_targets.pop(mcv.actor_id, None)
continue
commands.append(CommandModel(
action=ActionType.MOVE,
actor_id=mcv.actor_id,
target_x=target[0],
target_y=target[1],
))
return commands
def _ensure_harvester_requests(self, obs: OpenRAObservation):
target = self._harvester_target(obs)
current = sum(1 for u in obs.units if u.type == "harv")
current += sum(1 for p in obs.production if p.item == "harv")
current += self._requested_production_count("harv")
if current < target:
self._request_unit_production("harv")
def _hold_attack_for_economy(self, obs: OpenRAObservation) -> bool:
if self._base_under_pressure(obs):
return False
if self._current_requested_refinery(obs) is not None or self._expansion_refinery_pending(obs):
return True
if self._tech_anchor_pending(obs) and self._combat_unit_count(obs) < TECH_ATTACK_HOLD_FORCE:
return True
expanding = bool(self._mcv_targets) or any(u.type == "mcv" for u in obs.units)
expanding = expanding or any(p.item == "mcv" for p in obs.production)
if expanding and self._combat_unit_count(obs) < ECONOMY_ATTACK_HOLD_FORCE:
return True
return False
def _tech_anchor_pending(self, obs: OpenRAObservation) -> bool:
if not self._can_produce(obs, "dome"):
return False
if any(self._canonical_building_type(b.type) == "dome" for b in obs.buildings):
return False
if any(p.item == "dome" for p in obs.production):
return False
return True
def _request_unit_production(self, item_type: str):
if self._requested_production_count(item_type) == 0:
self._unit_requests.append(item_type)
self._log(f"Requesting {item_type} production")
def _requested_production_count(self, item_type: str) -> int:
return sum(1 for item in self._unit_requests if item == item_type)
def _queue_requested_unit(self, obs: OpenRAObservation) -> Optional[CommandModel]:
while self._unit_requests:
item_type = self._unit_requests.pop(0)
if item_type == "harv":
current = self._current_unit_count(obs, "harv")
pending = sum(1 for p in obs.production if p.item == "harv")
queued = self._requested_production_count("harv")
if current + pending + queued >= self._harvester_target(obs):
continue
if self._should_delay_harvester_request(obs, current):
return None
queue_type = self._queue_type_for_unit(item_type)
if queue_type is None:
continue
if item_type not in {"harv", "mcv"} and self._queue_delay_active(obs, queue_type):
return None
if item_type not in {"harv", "mcv"} and self._unit_delay_active(obs, item_type):
return None
if any(p.queue_type == queue_type for p in obs.production):
return None
if not self._can_produce(obs, item_type):
return None
if not self._production_support_available(obs, item_type):
return None
if self._unit_at_limit(obs, item_type):
continue
self._mark_unit_trained(obs, item_type, queue_type)
self._log(f"Training {item_type} (requested)")
return CommandModel(action=ActionType.TRAIN, item_type=item_type)
return None
# ── Squads ────────────────────────────────────────────────────
def _manage_squads(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
if self.phase in ("deploy_mcv", "build_base"):
return commands
self._temporary_defenders = set()
if obs.tick - self._last_assign_tick >= ASSIGN_ROLES_INTERVAL:
self._last_assign_tick = obs.tick
self._assign_squad_roles(obs)
commands.extend(self._handle_defense(obs))
if obs.tick - self._last_attack_eval_tick >= ATTACK_FORCE_INTERVAL:
self._last_attack_eval_tick = obs.tick
commands.extend(self._handle_attack(obs))
commands.extend(self._manage_unit_stances(obs))
return commands
def _assign_squad_roles(self, obs: OpenRAObservation):
combat_units = [
u
for u in obs.units
if u.type not in EXCLUDE_FROM_SQUADS and u.type in COMBAT_TYPES
]
air_ids = [u.actor_id for u in combat_units if u.type in AIRCRAFT_TYPES]
naval_ids = [u.actor_id for u in combat_units if u.type in SHIP_TYPES]
ground_units = [u for u in combat_units if u.type not in AIRCRAFT_TYPES | SHIP_TYPES]
ground_by_id = {u.actor_id: u for u in ground_units}
self._air_squad = air_ids
self._naval_squad = naval_ids
# Keep squad membership persistent instead of re-slicing ground units every assign tick.
self._attack_squad = [uid for uid in self._attack_squad if uid in ground_by_id]
self._rush_squad = [uid for uid in self._rush_squad if uid in ground_by_id]
self._idle_ground_units = [
uid for uid in self._idle_ground_units
if uid in ground_by_id and uid not in set(self._attack_squad) and uid not in set(self._rush_squad)
]
self._protection_squad = [
uid for uid in self._protection_squad
if uid in ground_by_id and uid not in set(self._attack_squad) and uid not in set(self._rush_squad)
]
assigned_ground = set(self._attack_squad) | set(self._rush_squad) | set(self._idle_ground_units) | set(self._protection_squad)
unassigned_ground = [u for u in ground_units if u.actor_id not in assigned_ground]
base_center = self._base_center(obs)
if base_center is not None:
unassigned_ground.sort(
key=lambda u: self._cell_distance(u.cell_x, u.cell_y, base_center[0], base_center[1])
)
else:
unassigned_ground.sort(key=lambda u: u.actor_id)
self._idle_ground_units.extend(u.actor_id for u in unassigned_ground)
if self.phase != "active":
return
# OpenRA keeps rush squads separate from assault squads and periodically
# moves all idle base units into the rush squad when the rush trigger fires.
total_ground_troops = len(self._idle_ground_units) + len(self._attack_squad) + len(self._rush_squad) + len(self._protection_squad)
rush_units = [ground_by_id[uid] for uid in self._idle_ground_units if uid in ground_by_id]
hold_for_economy = self._hold_attack_for_economy(obs)
if (
not hold_for_economy
and
not self._base_under_pressure(obs)
and obs.tick - self._last_rush_tick >= RUSH_INTERVAL
and total_ground_troops >= SQUAD_SIZE
and rush_units
):
rush_target = self._select_rush_target(obs, rush_units)
if rush_target is not None:
target_x, target_y, target_actor_id, target_kind = rush_target
launched = list(self._idle_ground_units)
existing_rush = set(self._rush_squad)
self._rush_squad.extend(uid for uid in launched if uid not in existing_rush)
self._idle_ground_units = []
self._last_rush_tick = obs.tick
self._set_squad_target("rush", target_actor_id, target_x, target_y, target_kind)
self._enemy_base_pos = (target_x, target_y)
self._clear_search_target()
self._reset_stale_attack_target()
self._log(f"Launching rush wave ({len(launched)} units) -> {(target_x, target_y)}")
# Launch a fresh assault wave from idle base units once enough have accumulated.
if not self._base_under_pressure(obs):
self._assault_threshold = self._roll_assault_threshold()
if (
not hold_for_economy
and
not self._base_under_pressure(obs)
and len(self._idle_ground_units) >= self._assault_threshold
):
launched = list(self._idle_ground_units)
existing_attack = set(self._attack_squad)
self._attack_squad.extend(uid for uid in launched if uid not in existing_attack)
self._idle_ground_units = []
self._assault_threshold = self._roll_assault_threshold()
self._log(f"Launching assault wave ({len(launched)} units)")
def _squad_units(self, obs: OpenRAObservation, squad_ids: list[int]) -> list[UnitInfoModel]:
alive = {u.actor_id: u for u in obs.units}
return [alive[uid] for uid in squad_ids if uid in alive]
def _set_squad_state(self, squad_name: str, state: str, until: Optional[int] = None):
self._squad_states[squad_name] = state
if until is None:
self._squad_state_until.pop(squad_name, None)
else:
self._squad_state_until[squad_name] = until
def _current_squad_state(self, obs: OpenRAObservation, squad_name: str) -> str:
state = self._squad_states.get(squad_name, "assemble")
hold_until = self._squad_state_until.get(squad_name, -9999)
if state in {"retreat", "recover"} and hold_until > obs.tick:
return state
if state in {"retreat", "recover"} and hold_until <= obs.tick:
self._set_squad_state(squad_name, "assemble")
return "assemble"
return state
def _assemble_squad_commands(
self,
obs: OpenRAObservation,
squad_name: str,
squad_units: list[UnitInfoModel],
) -> List[CommandModel]:
if not squad_units or squad_name == "naval":
return []
anchor = self._base_center(obs)
if anchor is None:
leader = self._select_squad_leader(squad_units)
anchor = (leader.cell_x, leader.cell_y)
commands: list[CommandModel] = []
redirected = 0
for unit in squad_units:
if self._cell_distance(unit.cell_x, unit.cell_y, anchor[0], anchor[1]) <= REGROUP_RADIUS:
continue
commands.append(CommandModel(
action=ActionType.ATTACK_MOVE,
actor_id=unit.actor_id,
target_x=anchor[0],
target_y=anchor[1],
))
redirected += 1
if redirected:
self._log(f"Assembling {squad_name} squad ({redirected}/{len(squad_units)})")
return commands
def _emergency_defense_units(
self,
obs: OpenRAObservation,
needed: int,
threat: Optional[UnitInfoModel] = None,
) -> list[UnitInfoModel]:
if needed <= 0:
return []
alive = {u.actor_id: u for u in obs.units}
reserve_ids = set(self._protection_squad)
candidates = [
alive[uid]
for uid in self._rush_squad + self._attack_squad
if uid in alive
and uid not in reserve_ids
and alive[uid].can_attack
and (
threat is None
or self._cell_distance(alive[uid].cell_x, alive[uid].cell_y, threat.cell_x, threat.cell_y)
<= PROTECT_UNIT_SCAN_RADIUS
)
]
if threat is not None:
base_center = self._base_center(obs) or (threat.cell_x, threat.cell_y)
candidates.sort(
key=lambda u: (
self._cell_distance(u.cell_x, u.cell_y, threat.cell_x, threat.cell_y),
self._cell_distance(u.cell_x, u.cell_y, base_center[0], base_center[1]),
)
)
else:
base_center = self._base_center(obs)
if base_center is not None:
candidates.sort(
key=lambda u: self._cell_distance(u.cell_x, u.cell_y, base_center[0], base_center[1])
)
return candidates[:needed]
def _can_cover_protection_threat(
self,
unit: UnitInfoModel,
threat: UnitInfoModel,
) -> bool:
return self._cell_distance(unit.cell_x, unit.cell_y, threat.cell_x, threat.cell_y) <= PROTECT_UNIT_SCAN_RADIUS
def _manage_unit_stances(self, obs: OpenRAObservation) -> List[CommandModel]:
protection_ids = set(self._protection_squad) | set(self._temporary_defenders)
commands: list[CommandModel] = []
for unit in obs.units:
if unit.type not in COMBAT_TYPES or not unit.can_attack:
continue
desired_stance = STANCE_DEFEND if unit.actor_id in protection_ids else STANCE_ATTACK_ANYTHING
if unit.stance == desired_stance:
continue
commands.append(CommandModel(
action=ActionType.SET_STANCE,
actor_id=unit.actor_id,
target_x=desired_stance,
))
return commands
def _select_ground_squad_leader(
self,
squad_name: str,
squad_units: list[UnitInfoModel],
force_new: bool = False,
) -> UnitInfoModel:
leader_id = self._squad_leader_id.get(squad_name, 0)
if not force_new and leader_id > 0:
for unit in squad_units:
if unit.actor_id == leader_id:
return unit
leader = self._select_squad_leader(squad_units)
self._squad_leader_id[squad_name] = leader.actor_id
return leader
def _unregister_ground_squad(
self,
squad_name: str,
squad_units: list[UnitInfoModel],
) -> None:
unit_ids = [u.actor_id for u in squad_units]
if squad_name == "assault":
self._attack_squad = []
elif squad_name == "rush":
self._rush_squad = []
existing = set(self._idle_ground_units)
self._idle_ground_units.extend(uid for uid in unit_ids if uid not in existing)
self._squad_leader_id.pop(squad_name, None)
self._clear_squad_target(squad_name)
self._set_squad_state(squad_name, "assemble")
def _ground_flee_commands(
self,
obs: OpenRAObservation,
squad_name: str,
squad_units: list[UnitInfoModel],
) -> List[CommandModel]:
fallback = self._random_own_building_cell(obs)
commands: list[CommandModel] = []
if fallback is not None:
tx, ty = fallback
for unit in squad_units:
commands.append(CommandModel(
action=ActionType.MOVE,
actor_id=unit.actor_id,
target_x=tx,
target_y=ty,
))
self._unregister_ground_squad(squad_name, squad_units)
return commands
def _handle_field_squad(
self,
obs: OpenRAObservation,
squad_name: str,
squad_units: list[UnitInfoModel],
minimum_commitment: int,
rush: bool,
) -> List[CommandModel]:
commands: list[CommandModel] = []
if not squad_units:
self._set_squad_state(squad_name, "assemble")
return commands
if squad_name not in self._squad_regroup_count:
self._squad_regroup_count[squad_name] = 0
if squad_name not in self._squad_last_commit_tick:
self._squad_last_commit_tick[squad_name] = -9999
state = self._current_squad_state(obs, squad_name)
literal_ground = squad_name in {"assault", "rush"}
if literal_ground and state in {"retreat", "recover"}:
state = "assemble"
self._set_squad_state(squad_name, "assemble")
leader = (
self._select_ground_squad_leader(squad_name, squad_units)
if literal_ground
else self._select_squad_leader(squad_units)
)
local_enemy_units = self._visible_enemy_units_near(obs, leader.cell_x, leader.cell_y, LOCAL_FIGHT_RADIUS)
local_enemy_buildings = self._visible_enemy_buildings_near(obs, leader.cell_x, leader.cell_y, LOCAL_FIGHT_RADIUS)
if local_enemy_units or local_enemy_buildings:
self._last_contact_tick = obs.tick
self._reset_stale_attack_target()
should_flee = not self._should_take_local_fight(
squad_units,
local_enemy_units,
local_enemy_buildings,
rush=rush or squad_name in {"air", "naval"},
cautious=state == "recover",
squad_name=squad_name,
)
if (
should_flee
and squad_name in {"assault", "rush"}
and state != "assemble"
and self._has_own_building_near(obs, leader.cell_x, leader.cell_y, LOCAL_FIGHT_RADIUS)
):
should_flee = False
if should_flee and literal_ground:
return self._ground_flee_commands(obs, squad_name, squad_units)
if should_flee:
retreat_commands = self._retreat_squad_commands(obs, squad_units, leader)
if retreat_commands:
self._clear_squad_target(squad_name)
self._set_squad_state(squad_name, "retreat", obs.tick + SQUAD_RETREAT_HOLD_TICKS)
self._squad_regroup_count[squad_name] = 0
return retreat_commands
priority_target = self._pick_priority_target(
obs,
leader.cell_x,
leader.cell_y,
local_only=True,
squad_name=squad_name,
)
if priority_target is not None:
self._set_squad_state(squad_name, "commit")
self._set_squad_target(
squad_name,
priority_target[0],
priority_target[1],
priority_target[2],
priority_target[4],
)
if not literal_ground:
focus_commands = self._focus_fire_commands(squad_units, priority_target)
if focus_commands:
return focus_commands
if state == "retreat":
if literal_ground:
return self._ground_flee_commands(obs, squad_name, squad_units)
retreat_commands = self._retreat_squad_commands(obs, squad_units, leader)
if retreat_commands:
return retreat_commands
self._set_squad_state(squad_name, "recover", obs.tick + SQUAD_RECOVER_HOLD_TICKS)
self._squad_regroup_count[squad_name] = 0
if state == "recover" and not (local_enemy_units or local_enemy_buildings):
return self._assemble_squad_commands(obs, squad_name, squad_units)
# Precompute a global force trigger (unconditional periodic wave) to avoid early exit on minimum_commitment
force_global = False
force_commit = False
if squad_name in {"assault", "rush"} and squad_units:
if obs.tick - self._last_attack_tick >= FORCE_COMMIT_GLOBAL_INTERVAL:
force_global = True
force_commit = True
if len(squad_units) < minimum_commitment and not force_global:
self._set_squad_state(squad_name, "assemble")
if squad_name == "assault":
self._assault_threshold = self._roll_assault_threshold()
self._squad_regroup_count[squad_name] = 0
return self._assemble_squad_commands(obs, squad_name, squad_units)
if squad_name == "naval" and not (
local_enemy_units or local_enemy_buildings or obs.visible_enemies or obs.visible_enemy_buildings
):
self._set_squad_state(squad_name, "assemble")
return commands
# Continue evaluating other force-commit paths (regroup/time-based); retain any global trigger from above
if squad_name in {"assault", "rush"}:
regroup_attempts = self._squad_regroup_count.get(squad_name, 0)
last_commit_tick = self._squad_last_commit_tick.get(squad_name, -9999)
time_since_commit = obs.tick - last_commit_tick
if (
regroup_attempts >= FORCE_COMMIT_REGROUPS
and len(squad_units) >= FORCE_COMMIT_UNIT_THRESHOLD
and not local_enemy_units
and not local_enemy_buildings
and time_since_commit >= FORCE_COMMIT_COOLDOWN
and not self._base_under_pressure(obs)
):
force_commit = True
# Timed fallback push toward known target (mirror NormalAI periodic wave)
if (
not force_commit
and len(squad_units) >= max(FORCE_COMMIT_MIN_SIZE, minimum_commitment)
and time_since_commit >= FORCE_COMMIT_TIME
):
force_commit = True
# Hard periodic global wave aligned to NormalAI cadence
if (
not force_commit
# remove size gating to mirror C# periodic pushes
and len(squad_units) >= 1
and obs.tick - self._last_attack_tick >= FORCE_COMMIT_GLOBAL_INTERVAL
):
force_commit = True
force_global = True
regroup_commands = self._regroup_squad_commands(
squad_units,
leader,
regroup_radius=max(1, len(squad_units) // 3) if literal_ground else REGROUP_RADIUS,
min_close_units=len(squad_units) if literal_ground else None,
circular=literal_ground,
)
if regroup_commands and not force_global:
self._set_squad_state(squad_name, "regroup")
self._squad_regroup_count[squad_name] = self._squad_regroup_count.get(squad_name, 0) + 1
return regroup_commands
self._set_squad_state(squad_name, "commit")
target = self._find_attack_target_info(obs, leader.cell_x, leader.cell_y, squad_name=squad_name)
if target is None:
fallback = self._enemy_base_pos or (self._get_map_size()[0] // 2, self._get_map_size()[1] // 2)
tx, ty = fallback
target_actor_id = 0
target_kind = "point"
else:
tx, ty, target_actor_id, target_kind = target
if (
not local_enemy_units
and not local_enemy_buildings
and self._squad_is_stuck(obs, squad_name, leader, (tx, ty))
):
self._log(f"{squad_name.capitalize()} squad stuck near ({tx}, {ty}); clearing target and reevaluating")
self._clear_squad_target(squad_name)
if squad_name == "rush" and self._rush_target_pos == (tx, ty):
self._rush_target_pos = None
self._rush_target_actor_id = 0
self._rush_target_kind = "point"
if self._enemy_base_pos == (tx, ty):
self._enemy_base_pos = None
if target_kind == "search":
self._clear_search_target()
self._reset_stale_attack_target()
tx, ty, target_actor_id, target_kind = self._find_attack_target_info(
obs,
leader.cell_x,
leader.cell_y,
squad_name=squad_name,
)
if squad_name in {"assault", "rush"}:
self._track_stale_attack_target(obs, leader, tx, ty)
if (force_commit or force_global) and squad_name == "assault":
attackers = squad_units
else:
attackers = self._attack_wave_units(obs, squad_units) if squad_name == "assault" else squad_units
direct_attack = self._target_actor_is_visible(obs, target_actor_id, target_kind)
if literal_ground:
direct_attack = False
for unit in attackers:
if direct_attack and (not unit.can_attack or self._busy_attacking(unit)):
continue
commands.append(CommandModel(
action=ActionType.ATTACK if direct_attack else ActionType.ATTACK_MOVE,
actor_id=unit.actor_id,
target_actor_id=target_actor_id if direct_attack else 0,
target_x=tx,
target_y=ty,
))
if commands and not direct_attack:
self._record_attack_issue(
direct_attack=False,
command_count=len(commands),
target_actor_id=target_actor_id,
target_kind=target_kind,
)
if literal_ground:
self._last_attack_tick = obs.tick
self._squad_regroup_count[squad_name] = 0
self._squad_last_commit_tick[squad_name] = obs.tick
elif commands:
self._record_attack_issue(
direct_attack=True,
command_count=len(commands),
target_actor_id=target_actor_id,
target_kind=target_kind,
)
self._log(f"{squad_name.capitalize()} squad: {len(commands)} units attacking {target_kind} at ({tx}, {ty})")
self._last_attack_tick = obs.tick
self._squad_regroup_count[squad_name] = 0
self._squad_last_commit_tick[squad_name] = obs.tick
return commands
def _recruit_protection_units(
self,
obs: OpenRAObservation,
threat: UnitInfoModel,
needed: int,
) -> list[UnitInfoModel]:
if needed <= 0:
return []
alive = {u.actor_id: u for u in obs.units}
candidates = [
alive[uid]
for uid in self._idle_ground_units
if uid in alive
and alive[uid].can_attack
and alive[uid].type not in AIRCRAFT_TYPES | SHIP_TYPES
and self._can_cover_protection_threat(alive[uid], threat)
]
candidates.sort(
key=lambda u: (
self._cell_distance(u.cell_x, u.cell_y, threat.cell_x, threat.cell_y),
self._cell_distance(u.cell_x, u.cell_y, *(self._base_center(obs) or (u.cell_x, u.cell_y))),
)
)
selected = candidates[:needed]
if not selected:
return []
selected_ids = {u.actor_id for u in selected}
existing = set(self._protection_squad)
self._protection_squad.extend(uid for uid in selected_ids if uid not in existing)
self._idle_ground_units = [uid for uid in self._idle_ground_units if uid not in selected_ids]
return selected
def _random_own_building_cell(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]:
if not obs.buildings:
return self._base_center(obs)
building = random.choice(obs.buildings)
return self._actor_cell(building)
def _release_protection_squad(
self,
obs: OpenRAObservation,
squad_units: list[UnitInfoModel],
reason: str,
) -> List[CommandModel]:
fallback = self._random_own_building_cell(obs)
commands: list[CommandModel] = []
if fallback is not None:
tx, ty = fallback
for unit in squad_units:
commands.append(CommandModel(
action=ActionType.MOVE,
actor_id=unit.actor_id,
target_x=tx,
target_y=ty,
))
if squad_units:
self._log(f"Releasing protection squad ({reason})")
self._protection_squad = []
self._temporary_defenders = set()
self._clear_squad_target("protection")
self._clear_protection_response()
self._set_squad_state("protection", "assemble")
self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS
return commands
def _clear_protection_response(self) -> None:
self._protect_from_actor_id = 0
self._protect_from_kind = "point"
self._protect_from_point = None
self._protect_from_until = -9999
def _best_visible_protection_response(
self,
obs: OpenRAObservation,
x: int,
y: int,
) -> Optional[Tuple[int, int, int, str]]:
best: Optional[tuple[tuple[int, int, int], Tuple[int, int, int, str]]] = None
for enemy in self._visible_enemy_units_near(obs, x, y, PROTECT_UNIT_SCAN_RADIUS):
if not enemy.can_attack:
continue
dist = self._cell_distance(x, y, enemy.cell_x, enemy.cell_y)
priority = TARGET_UNIT_PRIORITY.get(enemy.type, 30)
key = (dist, -priority, -int((1.0 - enemy.hp_percent) * 100))
candidate = (enemy.cell_x, enemy.cell_y, enemy.actor_id, "unit")
if best is None or key < best[0]:
best = (key, candidate)
for building in self._visible_enemy_buildings_near(obs, x, y, PROTECT_UNIT_SCAN_RADIUS):
if not self._building_can_attack(building):
continue
bx, by = self._actor_cell(building)
dist = self._cell_distance(x, y, bx, by)
priority = TARGET_BUILDING_PRIORITY.get(building.type, 40)
key = (dist, -priority, -int((1.0 - building.hp_percent) * 100))
candidate = (bx, by, building.actor_id, "building")
if best is None or key < best[0]:
best = (key, candidate)
return best[1] if best is not None else None
def _remember_protection_response(
self,
obs: OpenRAObservation,
x: int,
y: int,
) -> None:
if obs.tick < self._respond_to_attack_cooldown_until:
return
candidate = self._best_visible_protection_response(obs, x, y)
if candidate is None:
return
tx, ty, target_actor_id, target_kind = candidate
self._protect_from_actor_id = target_actor_id
self._protect_from_kind = target_kind
self._protect_from_point = (tx, ty)
self._protect_from_until = obs.tick + 1
self._respond_to_attack_cooldown_until = obs.tick + RESPOND_TO_ATTACK_COOLDOWN
def _current_protection_response(
self,
obs: OpenRAObservation,
) -> Optional[Tuple[int, int, int, str]]:
if obs.tick > self._protect_from_until:
self._clear_protection_response()
return None
if self._protect_from_actor_id > 0:
actor = self._visible_actor_by_target(obs, self._protect_from_actor_id, self._protect_from_kind)
if actor is not None:
tx, ty = self._actor_cell(actor)
self._protect_from_point = (tx, ty)
return tx, ty, self._protect_from_actor_id, self._protect_from_kind
if self._protect_from_point is None:
return None
candidate = self._best_visible_protection_response(obs, self._protect_from_point[0], self._protect_from_point[1])
if candidate is not None:
tx, ty, target_actor_id, target_kind = candidate
self._protect_from_actor_id = target_actor_id
self._protect_from_kind = target_kind
self._protect_from_point = (tx, ty)
return candidate
self._clear_protection_response()
return None
def _retarget_nearby_squads_to_protection(
self,
obs: OpenRAObservation,
tx: int,
ty: int,
target_actor_id: int,
target_kind: str,
) -> None:
for squad_name, squad_ids in (
("assault", self._attack_squad),
("rush", self._rush_squad),
("air", self._air_squad),
("naval", self._naval_squad),
):
squad_units = self._squad_units(obs, squad_ids)
if not squad_units or not any(u.can_attack for u in squad_units):
continue
leader = self._select_squad_leader(squad_units)
if self._cell_distance(leader.cell_x, leader.cell_y, tx, ty) > PROTECT_UNIT_SCAN_RADIUS:
continue
self._set_squad_target(squad_name, target_actor_id, tx, ty, target_kind)
def _find_closest_protection_target(
self,
obs: OpenRAObservation,
leader: UnitInfoModel,
) -> Optional[Tuple[int, int, int, str]]:
best: Optional[Tuple[tuple[int, int, int], Tuple[int, int, int, str]]] = None
for enemy in self._visible_enemy_units_near(obs, leader.cell_x, leader.cell_y, PROTECTION_SCAN_RADIUS):
dist = self._cell_distance(leader.cell_x, leader.cell_y, enemy.cell_x, enemy.cell_y)
priority = TARGET_UNIT_PRIORITY.get(enemy.type, 30 if enemy.can_attack else 10)
key = (dist, -priority, -int((1.0 - enemy.hp_percent) * 100))
candidate = (enemy.cell_x, enemy.cell_y, enemy.actor_id, "unit")
if best is None or key < best[0]:
best = (key, candidate)
for building in self._visible_enemy_buildings_near(obs, leader.cell_x, leader.cell_y, PROTECTION_SCAN_RADIUS):
dist = self._cell_distance(leader.cell_x, leader.cell_y, building.cell_x, building.cell_y)
priority = TARGET_BUILDING_PRIORITY.get(building.type, 40)
key = (dist, -priority, -int((1.0 - building.hp_percent) * 100))
candidate = (building.cell_x, building.cell_y, building.actor_id, "building")
if best is None or key < best[0]:
best = (key, candidate)
return best[1] if best is not None else None
def _pick_protection_threat(
self,
obs: OpenRAObservation,
threat_enemies: list[UnitInfoModel],
) -> Optional[UnitInfoModel]:
if not threat_enemies:
return None
protected_points = self._protected_points(obs)
best: Optional[tuple[tuple[int, int, int, int], UnitInfoModel]] = None
for enemy in threat_enemies:
priority = TARGET_UNIT_PRIORITY.get(enemy.type, 30 if enemy.can_attack else 10)
nearest_protected = min(
(
self._cell_distance(enemy.cell_x, enemy.cell_y, px, py)
for px, py, _ in protected_points
),
default=0,
)
key = (
nearest_protected,
-int(enemy.can_attack),
-priority,
-int((1.0 - enemy.hp_percent) * 100),
)
if best is None or key < best[0]:
best = (key, enemy)
return best[1] if best is not None else None
def _handle_defense(self, obs: OpenRAObservation) -> List[CommandModel]:
response_target = self._current_protection_response(obs)
protection_units = self._squad_units(obs, self._protection_squad)
threat: Optional[UnitInfoModel] = None
if response_target is not None and response_target[3] == "unit":
threat = next((enemy for enemy in obs.visible_enemies if enemy.actor_id == response_target[2]), None)
if threat is not None and response_target is not None:
tx, ty, target_actor_id, target_kind = response_target
self._retarget_nearby_squads_to_protection(obs, tx, ty, target_actor_id, target_kind)
alive_units = {u.actor_id: u for u in obs.units}
idle_defender_count = sum(
1
for uid in self._idle_ground_units
if uid in alive_units
and alive_units[uid].can_attack
and alive_units[uid].type not in AIRCRAFT_TYPES | SHIP_TYPES
and self._can_cover_protection_threat(alive_units[uid], threat)
)
if idle_defender_count > 0:
self._recruit_protection_units(obs, threat, idle_defender_count)
protection_units = self._squad_units(obs, self._protection_squad)
temporary_support: list[UnitInfoModel] = []
self._temporary_defenders = set()
if response_target is None and not protection_units:
self._clear_squad_target("protection")
self._clear_protection_response()
self._set_squad_state("protection", "assemble")
self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS
return []
if not protection_units and not temporary_support:
self._clear_squad_target("protection")
self._set_squad_state("protection", "assemble")
return []
leader = self._select_squad_leader(protection_units or temporary_support)
visible_target = self._get_visible_squad_target_info(obs, "protection")
if response_target is not None:
tx, ty, target_actor_id, target_kind = response_target
self._set_squad_target("protection", target_actor_id, tx, ty, target_kind)
self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS
elif visible_target is not None:
tx, ty, target_actor_id, target_kind = visible_target
self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS
else:
replacement = self._find_closest_protection_target(obs, leader)
if replacement is not None:
tx, ty, target_actor_id, target_kind = replacement
self._set_squad_target("protection", target_actor_id, tx, ty, target_kind)
self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS
else:
target_point = self._squad_target_point.get("protection")
if target_point is None:
return self._release_protection_squad(obs, protection_units, "no target")
if self._protection_backoff < 0:
return self._release_protection_squad(obs, protection_units, "lost target")
tx, ty = target_point
target_actor_id = 0
target_kind = "point"
self._protection_backoff -= 1
self._last_contact_tick = obs.tick
self._set_squad_state("protection", "commit")
commands: list[CommandModel] = []
issued_ids: set[int] = set()
direct_attack = target_actor_id > 0 and target_kind in {"unit", "building"}
for defender in protection_units + temporary_support:
if not defender.can_attack or defender.actor_id in issued_ids:
continue
issued_ids.add(defender.actor_id)
commands.append(CommandModel(
action=ActionType.ATTACK if direct_attack else ActionType.ATTACK_MOVE,
actor_id=defender.actor_id,
target_actor_id=target_actor_id if direct_attack else 0,
target_x=tx,
target_y=ty,
))
if commands:
self._record_attack_issue(
direct_attack=direct_attack,
command_count=len(commands),
target_actor_id=target_actor_id,
target_kind=target_kind,
)
return commands
def _handle_attack(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
commands.extend(
self._handle_field_squad(
obs,
"assault",
[u for u in self._squad_units(obs, self._attack_squad) if u.actor_id not in self._temporary_defenders],
1,
rush=False,
)
)
commands.extend(
self._handle_field_squad(
obs,
"rush",
[u for u in self._squad_units(obs, self._rush_squad) if u.actor_id not in self._temporary_defenders],
1,
rush=True,
)
)
commands.extend(
self._handle_field_squad(
obs,
"air",
[u for u in self._squad_units(obs, self._air_squad) if u.actor_id not in self._temporary_defenders],
AIR_SQUAD_MIN_SIZE,
rush=False,
)
)
commands.extend(
self._handle_field_squad(
obs,
"naval",
[u for u in self._squad_units(obs, self._naval_squad) if u.actor_id not in self._temporary_defenders],
NAVAL_SQUAD_MIN_SIZE,
rush=False,
)
)
return commands
def _find_attack_target_info(
self,
obs: OpenRAObservation,
leader_x: Optional[int],
leader_y: Optional[int],
squad_name: str = "assault",
) -> Tuple[int, int, int, str]:
current_target = self._get_visible_squad_target_info(obs, squad_name)
if current_target is not None:
tx, ty, target_actor_id, target_kind = current_target
if target_kind == "building":
self._enemy_base_pos = (tx, ty)
elif self._enemy_base_pos is None:
self._enemy_base_pos = (tx, ty)
self._clear_search_target()
self._reset_stale_attack_target()
return current_target
target_point = self._squad_target_point.get(squad_name)
target_actor_id = self._squad_target_actor_id.get(squad_name, 0)
if target_point is not None and target_actor_id > 0:
if not self._should_clear_point_target(obs, target_point, leader_x, leader_y):
return target_point[0], target_point[1], 0, "point"
self._clear_squad_target(squad_name)
if squad_name == "rush" and self._rush_target_pos is not None:
if self._should_clear_point_target(obs, self._rush_target_pos, leader_x, leader_y):
self._log(f"Clearing stale rush target {self._rush_target_pos}")
if self._enemy_base_pos == self._rush_target_pos:
self._enemy_base_pos = None
self._rush_target_pos = None
self._rush_target_actor_id = 0
self._rush_target_kind = "point"
self._reset_stale_attack_target()
else:
self._enemy_base_pos = self._rush_target_pos
self._clear_search_target()
return (
self._rush_target_pos[0],
self._rush_target_pos[1],
self._rush_target_actor_id,
self._rush_target_kind,
)
closest = self._pick_closest_visible_target(obs, leader_x, leader_y, squad_name=squad_name)
if closest is not None:
actor_id, tx, ty, _, kind = closest
self._set_squad_target(squad_name, actor_id, tx, ty, kind)
if kind == "building":
self._enemy_base_pos = (tx, ty)
elif self._enemy_base_pos is None:
self._enemy_base_pos = (tx, ty)
self._clear_search_target()
self._reset_stale_attack_target()
return tx, ty, actor_id, kind
if self._enemy_base_pos and self._should_clear_enemy_base_target(obs, leader_x, leader_y):
self._log(f"Clearing stale enemy base target {self._enemy_base_pos}")
if self._rush_target_pos == self._enemy_base_pos:
self._rush_target_pos = None
self._rush_target_actor_id = 0
self._rush_target_kind = "point"
self._enemy_base_pos = None
self._reset_stale_attack_target()
if self._enemy_base_pos:
self._clear_search_target()
self._set_squad_target(squad_name, 0, self._enemy_base_pos[0], self._enemy_base_pos[1], "point")
return self._enemy_base_pos[0], self._enemy_base_pos[1], 0, "point"
# If we don't have a persistent base target, prefer last-seen memory before blind exploration.
if self._last_seen_base_pos is not None and obs.tick - self._last_seen_base_tick <= LAST_SEEN_BASE_TTL_TICKS:
self._clear_search_target()
self._set_squad_target(squad_name, 0, self._last_seen_base_pos[0], self._last_seen_base_pos[1], "point")
return self._last_seen_base_pos[0], self._last_seen_base_pos[1], 0, "point"
if self._last_seen_enemy_pos is not None and obs.tick - self._last_seen_enemy_tick <= LAST_SEEN_ENEMY_TTL_TICKS:
self._clear_search_target()
self._set_squad_target(squad_name, 0, self._last_seen_enemy_pos[0], self._last_seen_enemy_pos[1], "point")
return self._last_seen_enemy_pos[0], self._last_seen_enemy_pos[1], 0, "point"
tx, ty = self._select_search_target(obs, leader_x, leader_y)
self._set_squad_target(squad_name, 0, tx, ty, "search")
return tx, ty, 0, "search"
def _find_attack_target(
self,
obs: OpenRAObservation,
leader_x: Optional[int],
leader_y: Optional[int],
squad_name: str = "assault",
) -> Tuple[int, int]:
tx, ty, _, _ = self._find_attack_target_info(obs, leader_x, leader_y, squad_name=squad_name)
return tx, ty
def _track_stale_attack_target(
self,
obs: OpenRAObservation,
leader: UnitInfoModel,
tx: int,
ty: int,
) -> None:
if obs.visible_enemies or obs.visible_enemy_buildings:
self._reset_stale_attack_target()
return
target = (tx, ty)
reached_target = self._cell_distance(leader.cell_x, leader.cell_y, tx, ty) <= STALE_TARGET_REACHED_RADIUS
if self._stale_attack_target == target:
if reached_target:
self._stale_attack_redirects += 1
else:
self._stale_attack_target = target
self._stale_attack_redirects = 1 if reached_target else 0
def _should_clear_enemy_base_target(
self,
obs: OpenRAObservation,
leader_x: Optional[int],
leader_y: Optional[int],
) -> bool:
return self._enemy_base_pos is not None and self._should_clear_point_target(
obs,
self._enemy_base_pos,
leader_x,
leader_y,
)
def _should_clear_point_target(
self,
obs: OpenRAObservation,
target: Tuple[int, int],
leader_x: Optional[int],
leader_y: Optional[int],
) -> bool:
if obs.visible_enemies or obs.visible_enemy_buildings:
return False
recent_enemy_sighting_tick = max(self._last_seen_enemy_tick, self._last_seen_base_tick)
if obs.tick - recent_enemy_sighting_tick < STALE_TARGET_CLEAR_INTERVAL:
return False
tx, ty = target
if (
leader_x is not None
and leader_y is not None
and self._cell_distance(leader_x, leader_y, tx, ty) <= STALE_TARGET_REACHED_RADIUS
):
# Only clear after we have actually explored the area around the point.
# Otherwise we can drop targets prematurely and re-sweep the same zones.
if self._spatial_raw:
return self._area_is_explored(tx, ty, radius=5, threshold=0.5)
return True
return (
self._stale_attack_target == target
and self._stale_attack_redirects >= STALE_TARGET_REDIRECT_LIMIT
)
def _squad_is_stuck(
self,
obs: OpenRAObservation,
squad_name: str,
leader: UnitInfoModel,
target: Tuple[int, int],
) -> bool:
current_pos = (leader.cell_x, leader.cell_y)
previous_pos = self._squad_last_progress_pos.get(squad_name)
previous_target = self._squad_last_target_point.get(squad_name)
if previous_pos != current_pos or previous_target != target:
self._squad_last_progress_pos[squad_name] = current_pos
self._squad_last_target_point[squad_name] = target
self._squad_last_progress_tick[squad_name] = obs.tick
return False
last_tick = self._squad_last_progress_tick.get(squad_name, obs.tick)
if obs.tick <= last_tick + SQUAD_STUCK_TICKS:
return False
self._squad_last_progress_tick[squad_name] = obs.tick
return True
def _reset_stale_attack_target(self) -> None:
self._stale_attack_target = None
self._stale_attack_redirects = 0
def _clear_search_target(self) -> None:
self._search_target = None
self._search_target_started_tick = -9999
def _set_squad_target(
self,
squad_name: str,
target_actor_id: int,
tx: int,
ty: int,
target_kind: str,
) -> None:
self._squad_target_point[squad_name] = (tx, ty)
if target_actor_id > 0 and target_kind in {"unit", "building"}:
self._squad_target_actor_id[squad_name] = target_actor_id
self._squad_target_kind[squad_name] = target_kind
else:
self._squad_target_actor_id.pop(squad_name, None)
self._squad_target_kind.pop(squad_name, None)
if squad_name == "rush":
self._rush_target_pos = (tx, ty)
self._rush_target_actor_id = target_actor_id if target_kind in {"unit", "building"} else 0
self._rush_target_kind = target_kind
def _clear_squad_target(self, squad_name: str) -> None:
self._squad_target_actor_id.pop(squad_name, None)
self._squad_target_kind.pop(squad_name, None)
self._squad_target_point.pop(squad_name, None)
if squad_name == "rush":
self._rush_target_pos = None
self._rush_target_actor_id = 0
self._rush_target_kind = "point"
def _visible_actor_by_target(
self,
obs: OpenRAObservation,
target_actor_id: int,
target_kind: str,
):
if target_actor_id <= 0:
return None
if target_kind == "unit":
return next((enemy for enemy in obs.visible_enemies if enemy.actor_id == target_actor_id), None)
if target_kind == "building":
return next((building for building in obs.visible_enemy_buildings if building.actor_id == target_actor_id), None)
return None
def _get_visible_squad_target_info(
self,
obs: OpenRAObservation,
squad_name: str,
) -> Optional[Tuple[int, int, int, str]]:
target_actor_id = self._squad_target_actor_id.get(squad_name, 0)
target_kind = self._squad_target_kind.get(squad_name, "point")
actor = self._visible_actor_by_target(obs, target_actor_id, target_kind)
if actor is None:
return None
tx, ty = self._actor_cell(actor)
self._squad_target_point[squad_name] = (tx, ty)
return tx, ty, target_actor_id, target_kind
def _advance_search_target(self, obs: OpenRAObservation) -> Tuple[int, int]:
if not self._candidate_targets:
self._candidate_targets = self._search_grid(obs)
self._target_index = 0
target = self._candidate_targets[self._target_index % len(self._candidate_targets)]
self._target_index = (self._target_index + 1) % len(self._candidate_targets)
self._search_target = target
self._search_target_started_tick = obs.tick
self._log(f"Search target -> {target}")
return target
def _select_search_target(
self,
obs: OpenRAObservation,
leader_x: Optional[int],
leader_y: Optional[int],
) -> Tuple[int, int]:
if self._search_target is None:
return self._advance_search_target(obs)
if (
leader_x is not None
and leader_y is not None
and self._cell_distance(leader_x, leader_y, self._search_target[0], self._search_target[1])
<= STALE_TARGET_REACHED_RADIUS
):
return self._advance_search_target(obs)
if obs.tick - self._search_target_started_tick >= SEARCH_TARGET_STALL_TICKS:
self._log(f"Search target stalled -> rotating from {self._search_target}")
return self._advance_search_target(obs)
return self._search_target
def _spatial_fog(self, x: int, y: int) -> float:
return self._spatial_value(x, y, FOG_CHANNEL, 0.0)
def _area_is_explored(self, x: int, y: int, radius: int = 5, threshold: float = 0.5) -> bool:
"""True if any cell near (x,y) is explored/visible (fog>=0.5)."""
if not self._spatial_raw:
return False
w, h = self._get_map_size()
x0 = max(0, x - radius)
y0 = max(0, y - radius)
x1 = min(w - 1, x + radius)
y1 = min(h - 1, y + radius)
for yy in range(y0, y1 + 1):
for xx in range(x0, x1 + 1):
if self._spatial_fog(xx, yy) >= threshold:
return True
return False
def _search_grid(self, obs: OpenRAObservation) -> list[Tuple[int, int]]:
"""Exploration candidates.
Prefer frontier/unexplored regions when spatial fog exists (channel 4).
Otherwise fall back to a coarse far-from-base grid.
"""
w, h = self._get_map_size()
cy = self._find_building(obs, "fact")
if not cy:
return [(w // 2, h // 2)]
bx = cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024
by = cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024
if self._spatial_raw and obs.tick - self._frontier_cache_tick >= FRONTIER_REFRESH_TICKS:
self._frontier_cache_tick = obs.tick
block = 8
gx_max = max(1, w // block)
gy_max = max(1, h // block)
scored: list[tuple[float, tuple[int, int]]] = []
for gx in range(gx_max):
for gy in range(gy_max):
cx = min(w - 1, gx * block + block // 2)
cyy = min(h - 1, gy * block + block // 2)
if not self._is_passable_cell(cx, cyy):
continue
unseen = 0
frontier = 0
samples = 0
for sx in (0, block // 2, block - 1):
for sy in (0, block // 2, block - 1):
x = gx * block + sx
y = gy * block + sy
if x < 0 or y < 0 or x >= w or y >= h:
continue
samples += 1
fog = self._spatial_fog(x, y)
if fog < 0.25:
unseen += 1
# frontier if adjacent to explored/visible
for dx, dy in ((1, 0), (-1, 0), (0, 1), (0, -1)):
nx, ny = x + dx, y + dy
if 0 <= nx < w and 0 <= ny < h and self._spatial_fog(nx, ny) >= 0.5:
frontier += 1
break
if samples <= 0:
continue
unseen_frac = unseen / samples
frontier_frac = frontier / samples
d2 = (cx - bx) * (cx - bx) + (cyy - by) * (cyy - by)
score = unseen_frac * 3.0 + frontier_frac * 4.0 + (d2 ** 0.5) * 0.01
if d2 < 20 * 20:
score -= 1.5
scored.append((score, (cx, cyy)))
scored.sort(key=lambda t: t[0], reverse=True)
candidates = [p for _, p in scored[:40]]
if candidates:
return candidates
n = 3
cw, ch = max(1, w // n), max(1, h // n)
centers = [(cw * gx + cw // 2, ch * gy + ch // 2)
for gx in range(n) for gy in range(n)]
min_d2 = (min(w, h) // n) ** 2
far = [p for p in centers if (p[0] - bx) ** 2 + (p[1] - by) ** 2 > min_d2]
if not far:
far = [(w // 2, h // 2)]
far.sort(key=lambda p: (p[0] - bx) ** 2 + (p[1] - by) ** 2, reverse=True)
return far
# ── Repairs ───────────────────────────────────────────────────
def _manage_repairs(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
for b in obs.buildings:
if b.hp_percent >= 0.98:
self._repair_issued.discard(b.actor_id)
self._reactive_repair_targets.discard(b.actor_id)
continue
if (
b.actor_id in self._reactive_repair_targets
and not b.is_repairing
and b.actor_id not in self._repair_issued
and self._available_credits(obs) >= 500
):
commands.append(CommandModel(action=ActionType.REPAIR, actor_id=b.actor_id))
self._repair_issued.add(b.actor_id)
self._reactive_repair_targets.discard(b.actor_id)
if obs.tick - self._last_repair_tick < REPAIR_ALL_BUILDINGS_COOLDOWN:
return commands
self._last_repair_tick = obs.tick
for b in obs.buildings:
if (b.hp_percent < 0.75 and not b.is_repairing
and b.actor_id not in self._repair_issued
and self._available_credits(obs) >= 500):
commands.append(CommandModel(action=ActionType.REPAIR, actor_id=b.actor_id))
self._repair_issued.add(b.actor_id)
return commands
# ── Power ─────────────────────────────────────────────────────
def _manage_power(self, obs: OpenRAObservation) -> List[CommandModel]:
commands = []
if obs.tick - self._last_power_toggle_tick < POWER_TOGGLE_INTERVAL:
return commands
self._last_power_toggle_tick = obs.tick
bal = obs.economy.power_provided - obs.economy.power_drained
buildings_by_id = {b.actor_id: b for b in obs.buildings}
self._powered_down = {
actor_id: expected_change
for actor_id, expected_change in self._powered_down.items()
if actor_id in buildings_by_id and not buildings_by_id[actor_id].is_powered
}
if bal > 0 and self._powered_down:
for actor_id, expected_change in sorted(
self._powered_down.items(),
key=lambda item: item[1],
reverse=True,
):
if bal - expected_change < 0:
continue
commands.append(CommandModel(action=ActionType.POWER_DOWN, actor_id=actor_id))
bal -= expected_change
for cmd in commands:
self._powered_down.pop(cmd.actor_id, None)
return commands
if bal < 0:
candidates: list[tuple[int, int]] = []
for b in obs.buildings:
if b.type not in POWER_DOWN_TYPES or not b.is_powered or b.actor_id in self._powered_down:
continue
expected_change = max(0, -b.power_amount)
if expected_change <= 0:
continue
candidates.append((expected_change, b.actor_id))
for expected_change, actor_id in sorted(candidates):
commands.append(CommandModel(action=ActionType.POWER_DOWN, actor_id=actor_id))
self._powered_down[actor_id] = expected_change
bal += expected_change
if bal >= 0:
break
return commands
# ── Cleanup ───────────────────────────────────────────────────
def _cleanup_dead(self, obs: OpenRAObservation):
alive = {u.actor_id for u in obs.units}
self._attack_squad = [uid for uid in self._attack_squad if uid in alive]
self._protection_squad = [uid for uid in self._protection_squad if uid in alive]
self._rush_squad = [uid for uid in self._rush_squad if uid in alive]
self._air_squad = [uid for uid in self._air_squad if uid in alive]
self._naval_squad = [uid for uid in self._naval_squad if uid in alive]
self._idle_ground_units = [uid for uid in self._idle_ground_units if uid in alive]
self._temporary_defenders &= alive
self._squad_regroup_count = {k: v for k, v in self._squad_regroup_count.items() if k in self._squad_states}
self._squad_last_progress_tick = {
squad_name: tick
for squad_name, tick in self._squad_last_progress_tick.items()
if squad_name in self._squad_states
}
self._squad_last_progress_pos = {
squad_name: pos
for squad_name, pos in self._squad_last_progress_pos.items()
if squad_name in self._squad_states
}
self._squad_last_target_point = {
squad_name: target
for squad_name, target in self._squad_last_target_point.items()
if squad_name in self._squad_states
}
self._squad_leader_id = {
squad_name: actor_id
for squad_name, actor_id in self._squad_leader_id.items()
if squad_name in self._squad_states and actor_id in alive
}
self._squad_target_actor_id = {
squad_name: actor_id
for squad_name, actor_id in self._squad_target_actor_id.items()
if squad_name in self._squad_states
}
self._squad_target_kind = {
squad_name: kind
for squad_name, kind in self._squad_target_kind.items()
if squad_name in self._squad_states
}
self._squad_target_point = {
squad_name: point
for squad_name, point in self._squad_target_point.items()
if squad_name in self._squad_states
}
self._previous_unit_hp = {
actor_id: hp
for actor_id, hp in self._previous_unit_hp.items()
if actor_id in alive
}
self._mcv_targets = {
actor_id: target
for actor_id, target in self._mcv_targets.items()
if actor_id in alive
}
self._mcv_resource_targets = {
actor_id: target
for actor_id, target in self._mcv_resource_targets.items()
if actor_id in alive
}
alive_b = {b.actor_id for b in obs.buildings}
self._requested_refineries = {
actor_id: request
for actor_id, request in self._requested_refineries.items()
if request[2] > obs.tick
}
self._mcv_deploy_until = {
actor_id: tick
for actor_id, tick in self._mcv_deploy_until.items()
if actor_id in alive or actor_id in alive_b
if tick > obs.tick
}
self._repair_issued &= alive_b
self._reactive_repair_targets &= alive_b
self._rally_set &= alive_b
self._previous_building_hp = {
actor_id: hp
for actor_id, hp in self._previous_building_hp.items()
if actor_id in alive_b
}
self._powered_down = {
actor_id: expected_change
for actor_id, expected_change in self._powered_down.items()
if actor_id in alive_b
}
self._harvester_retreat_until = {
actor_id: tick
for actor_id, tick in self._harvester_retreat_until.items()
if actor_id in alive
}
self._harvester_recent_damage_until = {
actor_id: tick
for actor_id, tick in self._harvester_recent_damage_until.items()
if actor_id in alive
}
self._harvester_reassign_until = {
actor_id: tick
for actor_id, tick in self._harvester_reassign_until.items()
if actor_id in alive
}
self._harvester_patch_targets = {
actor_id: target
for actor_id, target in self._harvester_patch_targets.items()
if actor_id in alive
}
self._harvester_last_cells = {
actor_id: cell
for actor_id, cell in self._harvester_last_cells.items()
if actor_id in alive
}
self._harvester_last_progress_tick = {
actor_id: tick
for actor_id, tick in self._harvester_last_progress_tick.items()
if actor_id in alive
}
self._harvester_no_resource_until = {
actor_id: tick
for actor_id, tick in self._harvester_no_resource_until.items()
if actor_id in alive
}
self._recent_attack_points = [
(x, y, tick)
for x, y, tick in self._recent_attack_points
if obs.tick - tick <= BASE_ATTACK_MEMORY_TICKS
]
if not self._rush_squad:
self._clear_squad_target("rush")
if not self._attack_squad:
self._clear_squad_target("assault")
def _update_damage_memory(self, obs: OpenRAObservation):
self._recent_attack_points = [
(x, y, tick)
for x, y, tick in self._recent_attack_points
if obs.tick - tick <= BASE_ATTACK_MEMORY_TICKS
]
next_building_hp: dict[int, float] = {}
for building in obs.buildings:
next_building_hp[building.actor_id] = building.hp_percent
previous_hp = self._previous_building_hp.get(building.actor_id)
if previous_hp is not None and building.hp_percent + 1e-6 < previous_hp:
bx, by = self._actor_cell(building)
if self._is_home_base_cell(obs, bx, by):
self._remember_attack_point(bx, by, obs.tick)
if building.type in PROTECTION_TYPES:
self._remember_protection_response(obs, bx, by)
if previous_hp >= REPAIR_REACTIVE_HP_THRESHOLD and building.hp_percent < REPAIR_REACTIVE_HP_THRESHOLD:
self._reactive_repair_targets.add(building.actor_id)
self._previous_building_hp = next_building_hp
next_unit_hp: dict[int, float] = {}
for unit in obs.units:
next_unit_hp[unit.actor_id] = unit.hp_percent
previous_hp = self._previous_unit_hp.get(unit.actor_id)
if previous_hp is not None and unit.hp_percent + 1e-6 < previous_hp:
if unit.type == "mcv" or (
unit.type == "harv" and self._is_local_protection_asset(obs, unit.cell_x, unit.cell_y)
):
self._remember_attack_point(unit.cell_x, unit.cell_y, obs.tick)
self._remember_protection_response(obs, unit.cell_x, unit.cell_y)
if unit.type == "harv":
self._harvester_recent_damage_until[unit.actor_id] = obs.tick + HARVESTER_RETREAT_COOLDOWN
self._previous_unit_hp = next_unit_hp
def _remember_attack_point(self, x: int, y: int, tick: int) -> None:
for idx, (px, py, _) in enumerate(self._recent_attack_points):
if self._cell_distance(x, y, px, py) <= ATTACK_POINT_MERGE_RADIUS:
self._recent_attack_points[idx] = ((px + x) // 2, (py + y) // 2, tick)
return
self._recent_attack_points.append((x, y, tick))
def _update_post_contact_state(self, obs: OpenRAObservation):
combat_count = self._combat_unit_count(obs)
was_recovering = self._in_recovery_mode(obs)
self._combat_peak = max(self._combat_peak, combat_count)
if self._base_under_pressure(obs):
self._last_contact_tick = obs.tick
had_recent_contact = obs.tick - self._last_contact_tick <= POST_CONTACT_WINDOW
collapse_threshold = max(RECOVERY_MIN_COMBAT, int(self._combat_peak * RECOVERY_DROP_RATIO))
if had_recent_contact and self._combat_peak >= RECOVERY_TRIGGER_PEAK and combat_count <= collapse_threshold:
self._recovery_until_tick = max(self._recovery_until_tick, obs.tick + RECOVERY_DURATION)
if (
was_recovering
and combat_count >= max(RECOVERY_EXIT_COMBAT, int(self._combat_peak * 0.75))
and obs.tick - self._last_contact_tick >= RECOVERY_CLEAR_CONTACT_GAP
and not self._base_under_pressure(obs)
):
self._recovery_until_tick = obs.tick
self._combat_peak = combat_count
elif not had_recent_contact and combat_count < RECOVERY_TRIGGER_PEAK:
self._combat_peak = max(combat_count, self._combat_peak - 1)
is_recovering = self._in_recovery_mode(obs)
if not was_recovering and is_recovering:
self._log(f"Recovery mode -> rebuild ({combat_count} combat units, peak {self._combat_peak})")
elif was_recovering and not is_recovering:
self._log(f"Recovery mode -> cleared ({combat_count} combat units)")
# ── Map ───────────────────────────────────────────────────────
def _update_map_size(self, obs: OpenRAObservation):
w, h = obs.map_info.width, obs.map_info.height
if w > 0 and h > 0:
if self._cached_map_size is None:
self._cached_map_size = (w, h)
else:
cw, ch = self._cached_map_size
if w < cw or h < ch:
self._cached_map_size = (w, h)
self._candidate_targets = []
self._target_index = 0
self._clear_search_target()
def _get_map_size(self) -> Tuple[int, int]:
return self._cached_map_size or (128, 128)
def _update_spatial_analysis(self, obs: OpenRAObservation):
if not obs.spatial_map or obs.spatial_channels <= 0:
return
if (
self._spatial_raw
and obs.tick >= self._last_spatial_update_tick
and obs.tick - self._last_spatial_update_tick < RESOURCE_MAP_UPDATE_INTERVAL
):
return
try:
raw = base64.b64decode(obs.spatial_map)
except Exception:
return
w, h = self._get_map_size()
channels = obs.spatial_channels
if w <= 0 or h <= 0 or channels <= 0:
return
self._spatial_raw = raw
self._spatial_channels = channels
self._last_spatial_update_tick = obs.tick
resource_cells: list[tuple[int, int, float]] = []
for y in range(h):
for x in range(w):
base_idx = (y * w + x) * channels
try:
resource = struct.unpack_from("f", raw, (base_idx + 2) * 4)[0]
except struct.error:
continue
if resource > 0:
resource_cells.append((x, y, resource))
self._resource_patches = self._cluster_resource_patches(resource_cells)
self._sync_resource_patch_memory(obs.tick)
def _cluster_resource_patches(
self,
resource_cells: list[tuple[int, int, float]],
) -> list[dict[str, float | int]]:
if not resource_cells:
return []
density_by_cell = {(x, y): density for x, y, density in resource_cells}
unvisited = set(density_by_cell.keys())
patches: list[dict[str, float | int]] = []
while unvisited:
start = unvisited.pop()
queue = [start]
cluster = [(start[0], start[1], density_by_cell[start])]
while queue:
cx, cy = queue.pop()
for dx in range(-RESOURCE_PATCH_LINK_RADIUS, RESOURCE_PATCH_LINK_RADIUS + 1):
for dy in range(-RESOURCE_PATCH_LINK_RADIUS, RESOURCE_PATCH_LINK_RADIUS + 1):
nx, ny = cx + dx, cy + dy
if (nx, ny) not in unvisited:
continue
unvisited.remove((nx, ny))
queue.append((nx, ny))
cluster.append((nx, ny, density_by_cell[(nx, ny)]))
if len(cluster) < RESOURCE_PATCH_MIN_CELLS:
continue
center_x = sum(c[0] for c in cluster) // len(cluster)
center_y = sum(c[1] for c in cluster) // len(cluster)
total_density = sum(c[2] for c in cluster)
resource_center = min(
cluster,
key=lambda c: ((c[0] - center_x) ** 2 + (c[1] - center_y) ** 2, -c[2]),
)
patches.append(
{
"center_x": center_x,
"center_y": center_y,
"resource_center_x": resource_center[0],
"resource_center_y": resource_center[1],
"cells": len(cluster),
"total_density": round(total_density, 1),
}
)
patches.sort(key=lambda p: (int(p["cells"]), float(p["total_density"])), reverse=True)
return patches
def _sync_resource_patch_memory(self, tick: int):
previous = dict(self._resource_patch_memory)
refreshed: dict[tuple[int, int], dict[str, float | int]] = {}
for patch in self._resource_patches:
target = self._patch_target(patch)
match_key: Optional[tuple[int, int]] = None
best_dist = RESOURCE_PATCH_MEMORY_MATCH_RADIUS + 1
for key in previous:
dist = self._cell_distance(target[0], target[1], key[0], key[1])
if dist <= RESOURCE_PATCH_MEMORY_MATCH_RADIUS and dist < best_dist:
match_key = key
best_dist = dist
memory = previous.pop(match_key) if match_key is not None else {}
current_density = float(patch["total_density"])
previous_density = float(memory.get("last_density", current_density))
peak_density = max(
current_density,
previous_density,
float(memory.get("peak_density", current_density)),
)
density_drop_ratio = 0.0
if previous_density > 1e-6 and current_density < previous_density:
density_drop_ratio = (previous_density - current_density) / previous_density
depletion_ratio = 0.0
if peak_density > 1e-6 and current_density < peak_density:
depletion_ratio = (peak_density - current_density) / peak_density
depletion_trend = float(memory.get("depletion_trend", 0.0)) * 0.7 + density_drop_ratio * 0.3
refreshed[target] = {
"last_density": current_density,
"peak_density": peak_density,
"depletion_ratio": max(0.0, min(1.0, depletion_ratio)),
"depletion_trend": max(0.0, min(1.0, depletion_trend)),
"last_tick": tick,
}
self._resource_patch_memory = refreshed
def _patch_memory(self, patch: dict[str, float | int]) -> dict[str, float | int]:
return self._resource_patch_memory.get(self._patch_target(patch), {})
def _nearest_anchor_distance(
self,
x: int,
y: int,
anchors: list[tuple[int, int]],
) -> int:
if not anchors:
return 0
return min(self._cell_distance(x, y, ax, ay) for ax, ay in anchors)
def _resource_patch_capacity(
self,
total_density: float,
cells: int,
refinery_count: int,
depletion_ratio: float,
threat: int,
) -> int:
capacity = max(1, cells // RESOURCE_CELLS_PER_HARVESTER)
if total_density >= cells * 2.0:
capacity += 1
if total_density >= cells * 3.5:
capacity += 1
if refinery_count > 0:
capacity += 1
capacity = min(RESOURCE_PATCH_MAX_CAPACITY, capacity)
floor = 1 if refinery_count > 0 else 0
if depletion_ratio >= 0.55:
capacity = max(floor, capacity - 1)
if threat > 0:
capacity = max(0, capacity - min(threat, 2))
return capacity
def _spatial_value(self, x: int, y: int, channel: int, default: float = 0.0) -> float:
w, h = self._get_map_size()
if (
not self._spatial_raw
or self._spatial_channels <= channel
or x < 0
or y < 0
or x >= w
or y >= h
):
return default
base_idx = (y * w + x) * self._spatial_channels
try:
return struct.unpack_from("f", self._spatial_raw, (base_idx + channel) * 4)[0]
except struct.error:
return default
def _resource_amount_at(self, x: int, y: int) -> float:
return self._spatial_value(x, y, 2, 0.0)
def _terrain_index_at(self, x: int, y: int) -> int:
return int(self._spatial_value(x, y, 0, 0.0))
def _is_passable_cell(self, x: int, y: int) -> bool:
if not self._spatial_raw:
return True
return self._spatial_value(x, y, 3, 1.0) > 0.5
def _is_water_candidate_cell(self, x: int, y: int) -> bool:
if not self._spatial_raw:
return False
# Prefer terrain-index water (common case), but fall back to a human-visible cue:
# large contiguous impassable regions (water) in the passability channel.
if self._terrain_index_at(x, y) in {7, 8}:
return True
passability = self._spatial_value(x, y, 3, 1.0)
if passability > 0.05:
return False
# Reject isolated impassables (cliffs/rocks) by requiring most neighbors
# to also be strongly impassable.
imp = 0
for dx in (-1, 0, 1):
for dy in (-1, 0, 1):
if self._spatial_value(x + dx, y + dy, 3, 1.0) <= 0.05:
imp += 1
return imp >= 8
def _is_open_water_cell(self, x: int, y: int) -> bool:
w, h = self._get_map_size()
if x <= 0 or y <= 0 or x >= w - 1 or y >= h - 1:
return False
for dx in (-1, 0, 1):
for dy in (-1, 0, 1):
if not self._is_water_candidate_cell(x + dx, y + dy):
return False
return True
def _local_resource_score(self, x: int, y: int, radius: int) -> float:
total = 0.0
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
total += self._resource_amount_at(x + dx, y + dy)
return total
def _local_water_score(self, x: int, y: int, radius: int) -> int:
total = 0
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
if self._is_water_candidate_cell(x + dx, y + dy):
total += 1
return total
# ── Helpers ───────────────────────────────────────────────────
def _find_building(self, obs: OpenRAObservation, btype: str) -> Optional[BuildingInfoModel]:
return next((b for b in obs.buildings if b.type == btype), None)
def _is_structure_queue(self, queue_type: str) -> bool:
return queue_type in STRUCTURE_QUEUE_TYPES
def _structure_queue_busy(self, obs: OpenRAObservation, item_type: str) -> bool:
queue_type = self._structure_queue_type(item_type)
return any(p.queue_type == queue_type for p in obs.production)
def _available_credits(self, obs: OpenRAObservation) -> int:
# OpenRA splits spendable funds between liquid cash and stored ore/resources.
return obs.economy.cash + obs.economy.ore
def _combat_unit_count(self, obs: OpenRAObservation) -> int:
return sum(1 for u in obs.units if u.type in COMBAT_TYPES)
def _in_recovery_mode(self, obs: OpenRAObservation) -> bool:
return obs.tick < self._recovery_until_tick
def _base_center(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]:
cy = self._find_building(obs, "fact")
if cy is not None:
return (
cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024,
cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024,
)
if obs.buildings:
b = obs.buildings[0]
return (
b.cell_x if b.cell_x > 0 else b.pos_x // 1024,
b.cell_y if b.cell_y > 0 else b.pos_y // 1024,
)
return None
def _is_home_base_cell(self, obs: OpenRAObservation, x: int, y: int) -> bool:
base_center = self._base_center(obs)
return base_center is None or self._cell_distance(x, y, base_center[0], base_center[1]) <= HOME_BASE_THREAT_RADIUS
def _is_local_protection_asset(self, obs: OpenRAObservation, x: int, y: int) -> bool:
if self._is_home_base_cell(obs, x, y):
return True
anchors = [
b
for b in obs.buildings
if self._canonical_building_type(b.type) in {"fact", "proc"}
and self._is_home_base_cell(
obs,
b.cell_x if b.cell_x > 0 else b.pos_x // 1024,
b.cell_y if b.cell_y > 0 else b.pos_y // 1024,
)
]
if anchors and self._nearest_distance_to_buildings(x, y, anchors) <= PROTECT_UNIT_SCAN_RADIUS:
return True
return False
def _placement_base_center(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]:
cy = self._find_building(obs, "fact")
if cy is not None:
return self._building_top_left(cy)
if obs.buildings:
return self._building_top_left(obs.buildings[0])
return None
def _protected_points(self, obs: OpenRAObservation) -> list[tuple[int, int, int]]:
protected_points: list[tuple[int, int, int]] = []
for b in obs.buildings:
if b.type in PROTECTION_TYPES:
bx = b.cell_x if b.cell_x > 0 else b.pos_x // 1024
by = b.cell_y if b.cell_y > 0 else b.pos_y // 1024
if not self._is_home_base_cell(obs, bx, by):
continue
protected_points.append((bx, by, PROTECTION_SCAN_RADIUS))
for u in obs.units:
if u.type in {"harv", "mcv"} and self._is_local_protection_asset(obs, u.cell_x, u.cell_y):
protected_points.append((u.cell_x, u.cell_y, PROTECT_UNIT_SCAN_RADIUS))
return protected_points
def _base_threat_enemies(self, obs: OpenRAObservation) -> list[UnitInfoModel]:
protected_points = self._protected_points(obs)
emergency_points = [
(x, y, BASE_EMERGENCY_VISIBILITY_RADIUS)
for x, y, tick in self._recent_attack_points
if obs.tick - tick <= BASE_ATTACK_MEMORY_TICKS
]
threat_points = protected_points + emergency_points
if not threat_points:
return []
return [
e for e in obs.visible_enemies
if any(self._cell_distance(e.cell_x, e.cell_y, px, py) <= radius for px, py, radius in threat_points)
]
def _base_under_pressure(self, obs: OpenRAObservation) -> bool:
return bool(self._base_threat_enemies(obs))
def _nearest_enemy_to_unit(
self,
obs: OpenRAObservation,
unit: UnitInfoModel,
radius: int,
) -> Optional[UnitInfoModel]:
nearby = self._visible_enemy_units_near(obs, unit.cell_x, unit.cell_y, radius)
if not nearby:
return None
return min(nearby, key=lambda e: self._cell_distance(unit.cell_x, unit.cell_y, e.cell_x, e.cell_y))
def _pick_harvester_retreat_point(
self,
obs: OpenRAObservation,
harvester: UnitInfoModel,
patch_states: Optional[list[dict[str, object]]] = None,
) -> Optional[Tuple[int, int]]:
refineries = [b for b in obs.buildings if b.type == "proc"]
if patch_states is None:
patch_states = self._resource_patch_states(obs)
threatened_patch = self._nearest_patch_state(
patch_states,
harvester.cell_x,
harvester.cell_y,
HARVESTER_PATCH_ASSIGN_RADIUS,
allow_fallback=True,
)
avoid_target: Optional[tuple[int, int]] = None
if threatened_patch is not None and int(threatened_patch["threat"]) > 0:
avoid_target = threatened_patch["target"] # type: ignore[index]
if refineries:
scored_refineries = sorted(
refineries,
key=lambda b: (
1
if avoid_target is not None and self._cell_distance(
b.cell_x if b.cell_x > 0 else b.pos_x // 1024,
b.cell_y if b.cell_y > 0 else b.pos_y // 1024,
avoid_target[0],
avoid_target[1],
) <= HARVESTER_PATCH_ASSIGN_RADIUS
else 0,
self._cell_distance(
harvester.cell_x,
harvester.cell_y,
b.cell_x if b.cell_x > 0 else b.pos_x // 1024,
b.cell_y if b.cell_y > 0 else b.pos_y // 1024,
),
),
)
best = scored_refineries[0]
return (
best.cell_x if best.cell_x > 0 else best.pos_x // 1024,
best.cell_y if best.cell_y > 0 else best.pos_y // 1024,
)
return self._base_center(obs)
def _harvester_recently_damaged(self, obs: OpenRAObservation, actor_id: int) -> bool:
return obs.tick < self._harvester_recent_damage_until.get(actor_id, -9999)
def _update_harvester_progress(self, obs: OpenRAObservation, harvester: UnitInfoModel):
actor_id = harvester.actor_id
current_cell = (harvester.cell_x, harvester.cell_y)
previous_cell = self._harvester_last_cells.get(actor_id)
patch_target = self._harvester_patch_targets.get(actor_id)
if previous_cell is None:
self._harvester_last_progress_tick[actor_id] = obs.tick
else:
moved = self._cell_distance(current_cell[0], current_cell[1], previous_cell[0], previous_cell[1])
if moved >= HARVESTER_PROGRESS_MOVE_THRESHOLD:
self._harvester_last_progress_tick[actor_id] = obs.tick
if patch_target is not None and self._cell_distance(current_cell[0], current_cell[1], patch_target[0], patch_target[1]) <= 2:
self._harvester_last_progress_tick[actor_id] = obs.tick
self._harvester_last_cells[actor_id] = current_cell
def _is_low_effect_harvester(self, obs: OpenRAObservation, harvester: UnitInfoModel) -> bool:
if harvester.is_idle:
return False
if obs.tick < self._harvester_retreat_until.get(harvester.actor_id, -9999):
return False
if self._harvester_recently_damaged(obs, harvester.actor_id):
return False
if obs.tick < self._harvester_no_resource_until.get(harvester.actor_id, -9999):
return False
last_progress = self._harvester_last_progress_tick.get(harvester.actor_id, obs.tick)
if obs.tick - last_progress < HARVESTER_LOW_EFFECT_TIMEOUT:
return False
patch_target = self._harvester_patch_targets.get(harvester.actor_id)
if patch_target is not None and self._local_resource_score(patch_target[0], patch_target[1], 2) <= HARVESTER_LOCAL_RESOURCE_MIN:
return True
activity = harvester.current_activity.lower()
if "harvest" in activity or "move" in activity or "dock" in activity:
return True
return False
def _fallback_harvest_target(
self,
obs: OpenRAObservation,
harvester: UnitInfoModel,
patch_states: Optional[list[dict[str, object]]] = None,
prefer_safe: bool = False,
exclude_target: Optional[tuple[int, int]] = None,
) -> Optional[tuple[int, int]]:
if patch_states is None:
patch_states = self._resource_patch_states(obs)
scan_origin = self._pick_harvester_retreat_point(obs, harvester, patch_states=patch_states)
if scan_origin is None:
scan_origin = (harvester.cell_x, harvester.cell_y)
current_target = self._harvester_patch_targets.get(harvester.actor_id)
candidates = []
for state in patch_states:
target = state["target"] # type: ignore[index]
if exclude_target is not None and target == exclude_target:
continue
if int(state["capacity"]) <= 0:
continue
if float(state["depletion_ratio"]) >= 0.95:
continue
if self._local_resource_score(target[0], target[1], 2) <= HARVESTER_LOCAL_RESOURCE_MIN:
continue
if prefer_safe and int(state["threat"]) > 0:
continue
candidates.append(state)
if not candidates:
return None
best = max(
candidates,
key=lambda state: (
int(state["score"])
+ (140 if int(state["refinery_count"]) > 0 else 0)
+ (80 if current_target == state["target"] and int(state["threat"]) == 0 and not prefer_safe else 0)
- int(max(0.0, float(state["saturation"]) - 1.0) * 420)
- self._cell_distance(
scan_origin[0],
scan_origin[1],
state["target"][0], # type: ignore[index]
state["target"][1], # type: ignore[index]
) * 18
-self._cell_distance(
harvester.cell_x,
harvester.cell_y,
state["target"][0], # type: ignore[index]
state["target"][1], # type: ignore[index]
) * 6
- int(state["threat"]) * (480 if prefer_safe else 260),
),
)
return best["target"] # type: ignore[return-value]
def _pending_build_cost(self, obs: OpenRAObservation) -> int:
if self._build_index >= len(BUILD_ORDER):
return 0
item = self._resolve_build_item(obs, BUILD_ORDER[self._build_index])
if item is None or self._already_have(obs, item, self._build_index):
return 0
if not self._can_produce(obs, item):
return 0
if not self._structure_queue_available(obs, item):
return 0
return self._build_cost(item)
def _building_counts(self, obs: OpenRAObservation) -> dict[str, int]:
counts: dict[str, int] = {}
for b in obs.buildings:
btype = self._canonical_building_type(b.type)
counts[btype] = counts.get(btype, 0) + 1
return counts
def _canonical_building_type(self, item_type: str) -> str:
return BUILDING_CANONICAL_TYPES.get(item_type, item_type)
def _build_cost(self, item_type: str) -> int:
canonical = self._canonical_building_type(item_type)
return BUILDING_COSTS.get(item_type, BUILDING_COSTS.get(canonical, 500))
def _building_dimensions(self, item_type: str) -> tuple[int, int]:
canonical = self._canonical_building_type(item_type)
return BUILDING_DIMENSIONS.get(item_type, BUILDING_DIMENSIONS.get(canonical, (2, 2)))
def _building_top_left(self, building: BuildingInfoModel) -> tuple[int, int]:
canonical = self._canonical_building_type(building.type)
offset_x, offset_y = BUILDING_TOPLEFT_OFFSETS.get(
building.type,
BUILDING_TOPLEFT_OFFSETS.get(canonical, (0, 0)),
)
return (
(building.cell_x if building.cell_x > 0 else building.pos_x // 1024) - offset_x,
(building.cell_y if building.cell_y > 0 else building.pos_y // 1024) - offset_y,
)
def _occupied_building_cells(self, obs: OpenRAObservation) -> set[tuple[int, int]]:
occupied: set[tuple[int, int]] = set()
for building in obs.buildings:
bx, by = self._building_top_left(building)
width, height = self._building_dimensions(building.type)
for dx in range(width):
for dy in range(height):
occupied.add((bx + dx, by + dy))
return occupied
def _buildable_area_cells(self, obs: OpenRAObservation) -> set[tuple[int, int]]:
cells: set[tuple[int, int]] = set()
for building in obs.buildings:
if self._canonical_building_type(building.type) in NO_BUILDABLE_AREA_TYPES:
continue
bx, by = self._building_top_left(building)
width, height = self._building_dimensions(building.type)
for dx in range(width):
for dy in range(height):
cells.add((bx + dx, by + dy))
return cells
def _buildable_area_structure_count(self, obs: OpenRAObservation) -> int:
return sum(
1
for building in obs.buildings
if self._canonical_building_type(building.type) not in NO_BUILDABLE_AREA_TYPES
)
def _footprint_close_enough_to_base(
self,
top_left_x: int,
top_left_y: int,
width: int,
height: int,
base_cells: set[tuple[int, int]],
radius: int,
) -> bool:
if not base_cells:
return False
max_x = top_left_x + width - 1
max_y = top_left_y + height - 1
for bx, by in base_cells:
dx = 0 if top_left_x <= bx <= max_x else min(abs(bx - top_left_x), abs(bx - max_x))
dy = 0 if top_left_y <= by <= max_y else min(abs(by - top_left_y), abs(by - max_y))
if max(dx, dy) <= radius:
return True
return False
def _candidate_fits_building_footprint(
self,
obs: OpenRAObservation,
item_type: str,
top_left_x: int,
top_left_y: int,
occupied: Optional[set[tuple[int, int]]] = None,
base_cells: Optional[set[tuple[int, int]]] = None,
) -> bool:
width, height = self._building_dimensions(item_type)
w, h = self._get_map_size()
if top_left_x < 0 or top_left_y < 0 or top_left_x + width > w or top_left_y + height > h:
return False
occupied = occupied or self._occupied_building_cells(obs)
is_naval = self._canonical_building_type(item_type) in NAVAL_STRUCTURE_TYPES
if base_cells is None:
base_cells = self._buildable_area_cells(obs)
if is_naval and not self._footprint_close_enough_to_base(
top_left_x, top_left_y, width, height, base_cells, CHECK_FOR_WATER_RADIUS
):
return False
for dx in range(width):
for dy in range(height):
cell = (top_left_x + dx, top_left_y + dy)
if cell in occupied:
return False
if is_naval:
if not self._is_water_candidate_cell(*cell):
return False
else:
if not self._is_passable_cell(*cell):
return False
if self._resource_amount_at(*cell) > 0.0:
return False
return True
def _schedule_next_build_check(self, obs: OpenRAObservation, active: bool):
delay = STRUCTURE_PRODUCTION_ACTIVE_DELAY if active else STRUCTURE_PRODUCTION_INACTIVE_DELAY
random_bonus = random.randrange(STRUCTURE_PRODUCTION_RANDOM_BONUS_DELAY) if STRUCTURE_PRODUCTION_RANDOM_BONUS_DELAY > 0 else 0
self._next_build_check_tick = obs.tick + delay + random_bonus
def _structure_queue_type(self, item_type: str) -> str:
canonical = self._canonical_building_type(item_type)
return "Defense" if canonical in DEFENSE_STRUCTURE_TYPES else "Building"
def _clear_queue_backoff(self, queue_type: str):
self._placement_backoff_until.pop(queue_type, None)
self._placement_backoff_snapshot.pop(queue_type, None)
def _queue_backoff_active(self, queue_type: str, obs: OpenRAObservation) -> bool:
until = self._placement_backoff_until.get(queue_type, -9999)
if until <= obs.tick:
self._clear_queue_backoff(queue_type)
return False
snapshot = self._placement_backoff_snapshot.get(queue_type)
if snapshot is not None:
prev_buildings, prev_conyards = snapshot
current_conyards = sum(1 for b in obs.buildings if b.type == "fact")
if len(obs.buildings) < prev_buildings or current_conyards > prev_conyards:
self._clear_queue_backoff(queue_type)
return False
return True
def _structure_queue_available(self, obs: OpenRAObservation, item_type: str) -> bool:
canonical = self._canonical_building_type(item_type)
if canonical in NAVAL_STRUCTURE_TYPES and not self._can_safely_build_naval_structure(obs):
return False
return not self._queue_backoff_active(self._structure_queue_type(canonical), obs)
def _priority_structure_reservation(self, obs: OpenRAObservation) -> int:
if self._build_index < len(BUILD_ORDER):
return self._pending_build_cost(obs)
power_balance = obs.economy.power_provided - obs.economy.power_drained
minimum_excess_power = self._minimum_excess_power_target(obs)
power_item = self._best_power_building(obs)
if power_balance < minimum_excess_power and power_item and self._structure_queue_available(obs, power_item):
return self._build_cost(power_item)
if not self._has_adequate_refinery_count(obs) and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc"):
return self._build_cost("proc")
if self._expansion_refinery_pending(obs) and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc"):
return self._build_cost("proc")
if self._in_recovery_mode(obs):
bldg_counts = self._building_counts(obs)
if (
bldg_counts.get("proc", 0) == 0
and not self._base_under_pressure(obs)
and self._can_produce(obs, "proc")
and self._structure_queue_available(obs, "proc")
):
return self._build_cost("proc")
return 0
def _rewind_build_order_after_cancel(self, obs: OpenRAObservation, item_type: str):
canonical = self._canonical_building_type(item_type)
for idx, placeholder in enumerate(BUILD_ORDER):
resolved = self._resolve_build_item(obs, placeholder)
if resolved is None:
continue
if self._canonical_building_type(resolved) != canonical:
continue
existing = self._building_counts(obs).get(canonical, 0)
required = sum(
1
for p in BUILD_ORDER[: idx + 1]
if (rp := self._resolve_build_item(obs, p)) is not None and self._canonical_building_type(rp) == canonical
)
if existing < required:
self._build_index = min(self._build_index, idx)
return
def _placement_anchor(
self,
obs: OpenRAObservation,
item_type: str,
fallback: Tuple[int, int],
) -> Tuple[int, int]:
if item_type == "proc":
plan = self._best_refinery_plan(obs)
if plan is not None:
return plan["anchor"]
return fallback
def _placement_candidates(
self,
obs: OpenRAObservation,
item_type: str,
cx: int,
cy: int,
min_radius: int,
max_radius: int,
) -> list[tuple[int, int]]:
if item_type in NAVAL_STRUCTURE_TYPES:
naval_candidates = self._naval_build_candidates(obs, item_type)
if naval_candidates:
return naval_candidates
occupied = self._occupied_building_cells(obs)
buildable_area = self._buildable_area_cells(obs)
candidates: list[tuple[int, int]] = []
w, h = self._get_map_size()
for radius in range(min_radius, max_radius + 1):
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
if max(abs(dx), abs(dy)) != radius:
continue
x = cx + dx
y = cy + dy
if x < 0 or y < 0 or x >= w or y >= h:
continue
candidates.append((x, y))
fitting = [
c for c in candidates
if self._candidate_fits_building_footprint(obs, item_type, c[0], c[1], occupied, buildable_area)
]
if fitting:
candidates = fitting
return candidates
def _resource_patch_threat(self, obs: OpenRAObservation, patch: dict[str, float | int]) -> int:
px = int(patch.get("resource_center_x", patch["center_x"]))
py = int(patch.get("resource_center_y", patch["center_y"]))
enemies = 0
for enemy in obs.visible_enemies:
if self._cell_distance(enemy.cell_x, enemy.cell_y, px, py) <= RESOURCE_PATCH_THREAT_RADIUS:
enemies += 1
for building in obs.visible_enemy_buildings:
if self._cell_distance(building.cell_x, building.cell_y, px, py) <= RESOURCE_PATCH_THREAT_RADIUS:
enemies += 2
return enemies
def _patch_target(self, patch: dict[str, float | int]) -> tuple[int, int]:
return (
int(patch.get("resource_center_x", patch["center_x"])),
int(patch.get("resource_center_y", patch["center_y"])),
)
def _nearest_patch_state(
self,
patch_states: list[dict[str, object]],
x: int,
y: int,
radius: int,
allow_fallback: bool = False,
) -> Optional[dict[str, object]]:
best: Optional[dict[str, object]] = None
best_dist = radius + 1
for state in patch_states:
tx, ty = state["target"] # type: ignore[index]
dist = self._cell_distance(x, y, tx, ty)
if dist <= radius and dist < best_dist:
best = state
best_dist = dist
if best is None and allow_fallback and patch_states:
best = min(
patch_states,
key=lambda state: self._cell_distance(x, y, state["target"][0], state["target"][1]), # type: ignore[index]
)
return best
def _resource_patch_states(self, obs: OpenRAObservation) -> list[dict[str, object]]:
patch_states: list[dict[str, object]] = []
refineries = [b for b in obs.buildings if b.type == "proc"]
conyards = [b for b in obs.buildings if b.type == "fact"]
base_center = self._base_center(obs)
anchors = [
(
conyard.cell_x if conyard.cell_x > 0 else conyard.pos_x // 1024,
conyard.cell_y if conyard.cell_y > 0 else conyard.pos_y // 1024,
)
for conyard in conyards
]
if not anchors and base_center is not None:
anchors = [base_center]
for idx, patch in enumerate(self._resource_patches):
target = self._patch_target(patch)
memory = self._patch_memory(patch)
nearest_refinery_distance = self._nearest_distance_to_buildings(target[0], target[1], refineries)
nearby_refineries = sum(
1
for refinery in refineries
if self._cell_distance(
target[0],
target[1],
refinery.cell_x if refinery.cell_x > 0 else refinery.pos_x // 1024,
refinery.cell_y if refinery.cell_y > 0 else refinery.pos_y // 1024,
)
<= RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS
)
anchor_distance = self._nearest_anchor_distance(target[0], target[1], anchors)
base_distance = (
self._cell_distance(target[0], target[1], base_center[0], base_center[1])
if base_center is not None
else anchor_distance
)
patch_states.append(
{
"id": idx,
"patch": patch,
"target": target,
"harvesters": [],
"harvester_count": 0,
"refinery_count": 0,
"nearby_refineries": nearby_refineries,
"nearest_refinery_distance": nearest_refinery_distance,
"anchor_distance": anchor_distance,
"base_distance": base_distance,
"threat": self._resource_patch_threat(obs, patch),
"depletion_ratio": float(memory.get("depletion_ratio", 0.0)),
"depletion_trend": float(memory.get("depletion_trend", 0.0)),
"travel_cost": 0,
"capacity": 0,
"saturation": 0.0,
"lack": 0,
"density_score": 0,
"score": 0,
"refinery_score": 0,
"expansion_score": 0,
}
)
if not patch_states:
return patch_states
for refinery in refineries:
rx = refinery.cell_x if refinery.cell_x > 0 else refinery.pos_x // 1024
ry = refinery.cell_y if refinery.cell_y > 0 else refinery.pos_y // 1024
state = self._nearest_patch_state(
patch_states,
rx,
ry,
HARVESTER_REASSIGN_REFINERY_RADIUS,
allow_fallback=True,
)
if state is not None:
state["refinery_count"] = int(state["refinery_count"]) + 1
state["nearest_refinery_distance"] = min(
int(state["nearest_refinery_distance"]),
self._cell_distance(rx, ry, state["target"][0], state["target"][1]), # type: ignore[index]
)
for harvester in [u for u in obs.units if u.type == "harv"]:
state = self._nearest_patch_state(
patch_states,
harvester.cell_x,
harvester.cell_y,
HARVESTER_PATCH_ASSIGN_RADIUS,
allow_fallback=True,
)
if state is None:
forced_target = self._harvester_patch_targets.get(harvester.actor_id)
if forced_target is not None:
state = self._nearest_patch_state(
patch_states,
forced_target[0],
forced_target[1],
HARVESTER_PATCH_ASSIGN_RADIUS * 2,
allow_fallback=True,
)
if state is not None:
state["harvesters"].append(harvester) # type: ignore[index]
for state in patch_states:
patch = state["patch"] # type: ignore[assignment]
harvester_count = len(state["harvesters"]) # type: ignore[arg-type]
cells = int(patch["cells"]) # type: ignore[index]
total_density = float(patch["total_density"]) # type: ignore[index]
threat = int(state["threat"])
refinery_count = int(state["refinery_count"])
nearby_refineries = int(state["nearby_refineries"])
nearest_refinery_distance = min(
int(state["nearest_refinery_distance"]),
RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS * 3,
)
anchor_distance = int(state["anchor_distance"])
base_distance = int(state["base_distance"])
depletion_ratio = float(state["depletion_ratio"])
depletion_trend = float(state["depletion_trend"])
capacity = self._resource_patch_capacity(
total_density,
cells,
refinery_count,
depletion_ratio,
threat,
)
if capacity > harvester_count:
lack = capacity - harvester_count
elif capacity < harvester_count:
lack = -(harvester_count - capacity)
else:
lack = 0
if refinery_count <= 0 and lack > 0:
lack = min(lack, 1)
if threat > 0 and lack > 0:
lack = 0
saturation = harvester_count / max(1, capacity) if capacity > 0 else float(harvester_count)
travel_cost = nearest_refinery_distance if refineries else anchor_distance + 8
if refinery_count <= 0 and refineries:
travel_cost = min(travel_cost + 4, anchor_distance + 10)
density_score = int(total_density * 8) + cells * 24
depletion_penalty = int(depletion_ratio * 400) + int(depletion_trend * 320)
support_bonus = refinery_count * 350
if refineries:
support_bonus += max(
0,
RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS
- min(nearest_refinery_distance, RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS),
) * 14
score = density_score + support_bonus
score -= threat * 240
score -= travel_cost * 12
score -= depletion_penalty
if harvester_count == 0 and refinery_count > 0 and threat == 0:
score += 90
if saturation > 1.0:
score -= int((saturation - 1.0) * 300)
refinery_score = density_score
refinery_score -= anchor_distance * 16
refinery_score -= nearby_refineries * 600
refinery_score -= threat * 220
refinery_score -= depletion_penalty
if nearby_refineries == 0:
refinery_score += 250
if refineries:
refinery_score += min(nearest_refinery_distance, RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS) * 20
if threat == 0 and saturation >= 0.75:
refinery_score += 80
expansion_score = density_score
expansion_score -= base_distance * 6
expansion_score -= threat * 240
expansion_score -= depletion_penalty
if nearby_refineries == 0:
expansion_score += 120
if anchor_distance > MCV_FRIENDLY_CONYARD_DISLIKE_RANGE:
expansion_score += 100
if nearest_refinery_distance > MCV_FRIENDLY_REFINERY_DISLIKE_RANGE:
expansion_score += 80
if saturation >= 1.0 and threat == 0:
expansion_score += 60
state["harvester_count"] = harvester_count
state["capacity"] = capacity
state["saturation"] = saturation
state["travel_cost"] = travel_cost
state["lack"] = lack
state["density_score"] = density_score
state["score"] = score
state["refinery_score"] = refinery_score
state["expansion_score"] = expansion_score
return patch_states
def _can_reassign_harvester(self, obs: OpenRAObservation, harvester: UnitInfoModel) -> bool:
if obs.tick < self._harvester_retreat_until.get(harvester.actor_id, -9999):
return False
if obs.tick < self._harvester_reassign_until.get(harvester.actor_id, -9999):
return False
if self._harvester_recently_damaged(obs, harvester.actor_id):
return False
if obs.tick < self._harvester_no_resource_until.get(harvester.actor_id, -9999):
return False
if self._nearest_enemy_to_unit(obs, harvester, HARVESTER_THREAT_RADIUS) is not None:
return False
activity = harvester.current_activity.lower()
if "dock" in activity:
return False
return True
def _reassign_low_effect_harvesters(
self,
obs: OpenRAObservation,
) -> tuple[list[CommandModel], set[int]]:
if obs.tick - self._last_harvester_reassign_tick < LOW_EFFECT_HARVESTER_SCAN_INTERVAL:
return [], set()
self._last_harvester_reassign_tick = obs.tick
if self._base_under_pressure(obs) or len(self._resource_patches) < 2:
return [], set()
patch_states = self._resource_patch_states(obs)
donors = [state for state in patch_states if int(state["lack"]) < 0]
receivers = [
state
for state in patch_states
if int(state["lack"]) > 0 and int(state["threat"]) == 0
]
if not donors or not receivers:
fallback_donors = [
state
for state in patch_states
if int(state["harvester_count"]) > 1 and int(state["threat"]) == 0 and float(state["saturation"]) >= 1.0
]
if not receivers or not fallback_donors:
return [], set()
best_receiver = max(receivers, key=lambda state: int(state["score"]))
donors = [
state
for state in fallback_donors
if int(best_receiver["score"]) > int(state["score"]) + 600
]
if not donors:
return [], set()
for donor in donors:
donor["lack"] = min(int(donor["lack"]), -1)
donors.sort(key=lambda state: int(state["lack"]))
receivers.sort(
key=lambda state: (
int(state["score"]),
int(state["lack"]),
-int(state["travel_cost"]),
),
reverse=True,
)
commands: list[CommandModel] = []
redirected: set[int] = set()
for receiver in receivers:
need = int(receiver["lack"])
if int(receiver["refinery_count"]) <= 0:
need = min(need, 1)
if need <= 0:
continue
tx, ty = receiver["target"] # type: ignore[index]
for donor in donors:
if need <= 0 or int(donor["lack"]) >= 0:
continue
harvesters = sorted(
donor["harvesters"], # type: ignore[index]
key=lambda u: self._cell_distance(u.cell_x, u.cell_y, tx, ty),
)
for harvester in harvesters:
if need <= 0 or int(donor["lack"]) >= 0:
break
if harvester.actor_id in redirected:
continue
if not self._can_reassign_harvester(obs, harvester):
continue
if self._harvester_patch_targets.get(harvester.actor_id) == (tx, ty):
continue
commands.append(
CommandModel(
action=ActionType.HARVEST,
actor_id=harvester.actor_id,
target_x=tx,
target_y=ty,
)
)
redirected.add(harvester.actor_id)
donor["lack"] = int(donor["lack"]) + 1
need -= 1
self._harvester_patch_targets[harvester.actor_id] = (tx, ty)
self._harvester_reassign_until[harvester.actor_id] = obs.tick + HARVESTER_REASSIGN_COOLDOWN
self._harvester_last_progress_tick[harvester.actor_id] = obs.tick
self._log(
f"Redirecting harv #{harvester.actor_id} -> patch ({tx},{ty}) "
f"from overloaded patch {donor['target']}"
)
return commands, redirected
def _best_refinery_plan(
self,
obs: OpenRAObservation,
) -> Optional[dict[str, Tuple[int, int]]]:
patch_states = self._resource_patch_states(obs)
if not patch_states:
return None
conyards = [b for b in obs.buildings if b.type == "fact"]
if not conyards:
return None
requested = self._current_requested_refinery(obs)
if requested is not None:
request_id, anchor, target = requested
return {
"anchor": anchor,
"target": target,
"request_id": request_id,
} # type: ignore[return-value]
best: Optional[tuple[int, Tuple[int, int], Tuple[int, int]]] = None
for conyard in conyards:
anchor = (
conyard.cell_x if conyard.cell_x > 0 else conyard.pos_x // 1024,
conyard.cell_y if conyard.cell_y > 0 else conyard.pos_y // 1024,
)
for state in patch_states:
target = state["target"] # type: ignore[index]
dist = self._cell_distance(anchor[0], anchor[1], target[0], target[1])
if dist > BASE_BUILD_MAX_RADIUS + RESOURCE_PATCH_SEARCH_MARGIN:
continue
if int(state["nearby_refineries"]) >= MAX_REFINERIES_PER_PATCH:
continue
score = int(state["refinery_score"]) - dist * 8
if int(state["threat"]) == 0 and float(state["depletion_ratio"]) < 0.6:
score += 60
if best is None or score > best[0]:
best = (score, anchor, target)
if best is None:
conyard = conyards[0]
anchor = (
conyard.cell_x if conyard.cell_x > 0 else conyard.pos_x // 1024,
conyard.cell_y if conyard.cell_y > 0 else conyard.pos_y // 1024,
)
state = min(
patch_states,
key=lambda s: self._cell_distance(anchor[0], anchor[1], s["target"][0], s["target"][1]), # type: ignore[index]
)
return {"anchor": anchor, "target": state["target"]} # type: ignore[return-value]
return {"anchor": best[1], "target": best[2]}
def _best_expansion_patch_target(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]:
patch_states = self._resource_patch_states(obs)
if not patch_states:
return None
conyards = [b for b in obs.buildings if b.type == "fact"]
refineries = [b for b in obs.buildings if b.type == "proc"]
active_targets = list(self._mcv_targets.values())
best: Optional[tuple[int, Tuple[int, int]]] = None
for state in patch_states:
target = state["target"] # type: ignore[index]
if conyards and self._nearest_distance_to_buildings(target[0], target[1], conyards) < MCV_FRIENDLY_CONYARD_DISLIKE_RANGE:
continue
if refineries and self._nearest_distance_to_buildings(target[0], target[1], refineries) < MCV_FRIENDLY_REFINERY_DISLIKE_RANGE:
continue
if any(
self._cell_distance(target[0], target[1], active_target[0], active_target[1]) < MCV_FRIENDLY_CONYARD_DISLIKE_RANGE
for active_target in active_targets
):
continue
if int(state["threat"]) > 0:
continue
score = int(state["expansion_score"])
if float(state["depletion_ratio"]) >= 0.8:
score -= 200
if int(state["harvester_count"]) == 0 and int(state["capacity"]) >= 2:
score += 80
if best is None or score > best[0]:
best = (score, target)
return None if best is None or best[0] <= 0 else best[1]
def _naval_build_candidates(
self,
obs: OpenRAObservation,
item_type: str,
occupied: Optional[set[tuple[int, int]]] = None,
) -> list[tuple[int, int]]:
center = self._placement_base_center(obs) or (0, 0)
w, h = self._get_map_size()
if not self._spatial_raw:
return []
occupied = occupied or self._occupied_building_cells(obs)
buildable_area = self._buildable_area_cells(obs)
width, height = self._building_dimensions(item_type)
candidates: set[tuple[int, int]] = set()
origins = [
self._building_top_left(building)
for building in obs.buildings
if self._canonical_building_type(building.type) not in NO_BUILDABLE_AREA_TYPES
]
for ox, oy in origins:
for dx in range(-NAVAL_WATER_SCAN_RADIUS, NAVAL_WATER_SCAN_RADIUS + 1, NAVAL_WATER_SCAN_STRIDE):
for dy in range(-NAVAL_WATER_SCAN_RADIUS, NAVAL_WATER_SCAN_RADIUS + 1, NAVAL_WATER_SCAN_STRIDE):
if dx * dx + dy * dy > NAVAL_WATER_SCAN_RADIUS * NAVAL_WATER_SCAN_RADIUS:
continue
x = ox + dx
y = oy + dy
if x < 0 or y < 0 or x + width > w or y + height > h:
continue
candidate_center = (x + width // 2, y + height // 2)
center_radius = max(abs(candidate_center[0] - center[0]), abs(candidate_center[1] - center[1]))
if center_radius < BASE_BUILD_MIN_RADIUS or center_radius > NAVAL_BUILD_MAX_RADIUS:
continue
if not self._candidate_fits_building_footprint(
obs,
item_type,
x,
y,
occupied=occupied,
base_cells=buildable_area,
):
continue
candidates.add((x, y))
ordered = list(candidates)
ordered.sort(
key=lambda p: (
-self._naval_anchor_score(item_type, p[0], p[1]),
self._cell_distance(p[0], p[1], center[0], center[1]),
)
)
return ordered
def _naval_anchor_score(self, item_type: str, top_left_x: int, top_left_y: int) -> int:
width, height = self._building_dimensions(item_type)
center_x = top_left_x + width // 2
center_y = top_left_y + height // 2
return self._local_water_score(center_x, center_y, 2)
def _naval_gate_open_water_windows(self, obs: OpenRAObservation) -> int:
if not self._spatial_raw:
return 0
# Count only footprint-valid naval anchors with enough surrounding water.
candidates = self._naval_build_candidates(obs, "spen")
if not candidates:
return 0
viable = [
candidate
for candidate in candidates
if self._naval_anchor_score("spen", candidate[0], candidate[1]) >= NAVAL_MIN_WATER_SCORE
]
return min(len(viable), NAVAL_MIN_OPEN_WATER_WINDOWS)
def _best_naval_anchor(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]:
candidates = [
candidate
for candidate in self._naval_build_candidates(obs, "spen")
if self._naval_anchor_score("spen", candidate[0], candidate[1]) >= NAVAL_MIN_WATER_SCORE
]
if len(candidates) < NAVAL_CANDIDATE_MIN_COUNT:
return None
return candidates[0]
def _can_safely_build_naval_structure(self, obs: OpenRAObservation) -> bool:
if obs.tick < self._naval_disabled_until:
return False
if obs.tick - self._last_naval_gate_tick <= NAVAL_GATE_CACHE_TICKS:
return self._cached_naval_gate_ok
if any(self._canonical_building_type(b.type) in NAVAL_STRUCTURE_TYPES for b in obs.buildings):
self._naval_retry_buildable_count = -1
self._cached_naval_gate_ok = True
self._last_naval_gate_tick = obs.tick
return True
elif self._naval_retry_buildable_count >= 0:
if self._buildable_area_structure_count(obs) <= self._naval_retry_buildable_count:
self._cached_naval_gate_ok = False
self._last_naval_gate_tick = obs.tick
return False
self._naval_retry_buildable_count = -1
# Human-like: only enable naval when there is a strong, footprint-valid
# naval anchor near our current buildable area.
if self._naval_gate_open_water_windows(obs) < NAVAL_MIN_OPEN_WATER_WINDOWS:
self._cached_naval_gate_ok = False
self._last_naval_gate_tick = obs.tick
return False
ok = self._best_naval_anchor(obs) is not None
self._cached_naval_gate_ok = ok
self._last_naval_gate_tick = obs.tick
return ok
def _minimum_excess_power_target(self, obs: OpenRAObservation) -> int:
bonus = EXCESS_POWER_INCREMENT * (len(obs.buildings) // max(1, EXCESS_POWER_INCREASE_THRESHOLD))
return max(MINIMUM_EXCESS_POWER, min(MAXIMUM_EXCESS_POWER, MINIMUM_EXCESS_POWER + bonus))
def _has_any_production_building(self, obs: OpenRAObservation) -> bool:
return any(self._canonical_building_type(b.type) in PRODUCTION_BUILDING_TYPES for b in obs.buildings)
def _optimal_refinery_count(self, obs: OpenRAObservation) -> int:
target = INITIAL_MIN_REFINERY_COUNT
if self._has_any_production_building(obs):
target += ADDITIONAL_MIN_REFINERY_COUNT
return target
def _has_adequate_refinery_count(self, obs: OpenRAObservation) -> bool:
refinery_count = sum(1 for b in obs.buildings if b.type == "proc")
has_power = any(b.type in {"powr", "apwr"} for b in obs.buildings)
has_conyard = any(b.type == "fact" for b in obs.buildings)
return (
refinery_count >= self._optimal_refinery_count(obs)
or not has_power
or not has_conyard
)
def _best_power_building(self, obs: OpenRAObservation) -> Optional[str]:
for item in ("apwr", "powr"):
if self._can_produce(obs, item) and self._structure_queue_available(obs, item):
return item
return None
def _best_production_building(self, obs: OpenRAObservation) -> Optional[str]:
candidates = []
counts = self._building_counts(obs)
for item in ("weap", "barr", "tent"):
if not self._can_produce(obs, item):
continue
if not self._structure_queue_available(obs, item):
continue
limit = BUILDING_LIMITS.get(item)
if limit is not None and counts.get(item, 0) >= limit:
continue
candidates.append(item)
if not candidates:
return None
return random.choice(candidates)
def _preferred_early_naval_building(self, obs: OpenRAObservation, credits: int) -> Optional[str]:
if self._in_recovery_mode(obs):
return None
if any(self._canonical_building_type(b.type) in NAVAL_STRUCTURE_TYPES for b in obs.buildings):
return None
if not any(b.type == "proc" for b in obs.buildings):
return None
if not any(b.type in WAR_FACTORY_TYPES for b in obs.buildings):
return None
if not self._can_safely_build_naval_structure(obs):
return None
anchor = self._best_naval_anchor(obs)
if anchor is None or self._naval_anchor_score("spen", anchor[0], anchor[1]) < NAVAL_EARLY_BUILD_WATER_SCORE:
return None
naval_item = self._best_naval_production_building(obs)
if naval_item is None:
return None
if credits < self._build_cost(naval_item) + NAVAL_EARLY_BUILD_CREDIT_BUFFER:
return None
return naval_item
def _best_naval_production_building(self, obs: OpenRAObservation) -> Optional[str]:
candidates = []
counts = self._building_counts(obs)
for item in NAVAL_STRUCTURE_TYPES:
if not self._can_produce(obs, item):
continue
if not self._structure_queue_available(obs, item):
continue
limit = BUILDING_LIMITS.get(item)
if limit is not None and counts.get(item, 0) >= limit:
continue
candidates.append(item)
if not candidates:
return None
return random.choice(candidates)
def _unit_at_limit(self, obs: OpenRAObservation, item_type: str) -> bool:
limit = UNIT_LIMITS.get(item_type)
if limit is None:
return False
current = sum(1 for u in obs.units if u.type == item_type)
current += sum(1 for p in obs.production if p.item == item_type)
current += self._requested_production_count(item_type)
return current >= limit
def _current_unit_count(self, obs: OpenRAObservation, item_type: str) -> int:
return sum(1 for u in obs.units if u.type == item_type)
def _queue_delay_active(self, obs: OpenRAObservation, queue_type: str) -> bool:
return obs.tick < self._queue_delay_until.get(queue_type, -9999)
def _unit_delay_active(self, obs: OpenRAObservation, item_type: str) -> bool:
return obs.tick < self._unit_delay_until.get(item_type, -9999)
def _mark_unit_trained(self, obs: OpenRAObservation, item_type: str, queue_type: str):
queue_delay = QUEUE_PRODUCTION_DELAYS.get(queue_type, 0)
if queue_delay > 0:
self._queue_delay_until[queue_type] = max(
self._queue_delay_until.get(queue_type, -9999),
obs.tick + queue_delay,
)
unit_delay = UNIT_PRODUCTION_DELAYS.get(item_type, 0)
if unit_delay > 0:
self._unit_delay_until[item_type] = max(
self._unit_delay_until.get(item_type, -9999),
obs.tick + unit_delay,
)
def _idle_base_unit_count(self, obs: OpenRAObservation, queue_type: Optional[str] = None) -> int:
base_center = self._base_center(obs)
if base_center is None:
return 0
count = 0
for unit in obs.units:
if unit.type in {"harv", "mcv"}:
continue
if not getattr(unit, "is_idle", False):
continue
if self._cell_distance(unit.cell_x, unit.cell_y, base_center[0], base_center[1]) > IDLE_BASE_UNIT_RADIUS:
continue
if queue_type is not None and self._queue_type_for_unit(unit.type) != queue_type:
continue
count += 1
return count
def _production_support_available(
self,
obs: OpenRAObservation,
item_type: str,
unit_counts: Optional[dict[str, int]] = None,
) -> bool:
if item_type in SHIP_TYPES:
return any(self._canonical_building_type(b.type) in NAVAL_STRUCTURE_TYPES for b in obs.buildings)
if unit_counts is None:
unit_counts = {}
for unit in obs.units:
unit_counts[unit.type] = unit_counts.get(unit.type, 0) + 1
for prod in obs.production:
unit_counts[prod.item] = unit_counts.get(prod.item, 0) + 1
for requested in self._unit_requests:
unit_counts[requested] = unit_counts.get(requested, 0) + 1
if item_type in PLANE_TYPES:
airfields = sum(1 for b in obs.buildings if self._canonical_building_type(b.type) == "afld")
if airfields <= 0:
return False
plane_count = sum(unit_counts.get(t, 0) for t in PLANE_TYPES)
return plane_count < airfields * AIRFIELD_PLANE_CAPACITY
if item_type in AIRCRAFT_TYPES - PLANE_TYPES:
helipads = sum(1 for b in obs.buildings if self._canonical_building_type(b.type) == "hpad")
if helipads <= 0:
return False
aircraft_count = sum(unit_counts.get(t, 0) for t in AIRCRAFT_TYPES - PLANE_TYPES)
return aircraft_count < helipads * HELIPAD_AIRCRAFT_CAPACITY
return True
def _economy_ready_for_tech(self, obs: OpenRAObservation) -> bool:
refinery_count = sum(1 for b in obs.buildings if b.type == "proc")
if refinery_count < 2:
return False
return self._harvester_target(obs) >= max(2, refinery_count)
def _harvester_target(self, obs: OpenRAObservation) -> int:
refinery_count = sum(1 for b in obs.buildings if b.type == "proc")
if refinery_count <= 0:
return 0
target = max(INITIAL_HARVESTERS, refinery_count)
return min(target, UNIT_LIMITS.get("harv", target))
def _should_delay_harvester_request(self, obs: OpenRAObservation, current_harvesters: int) -> bool:
return False
def _desired_unit_share(
self,
obs: OpenRAObservation,
item_type: str,
unit_counts: dict[str, int],
) -> int:
share = UNITS_TO_BUILD.get(item_type, 0)
if share <= 0:
return 0
queue_type = self._queue_type_for_unit(item_type)
if queue_type is not None and self._unit_delay_active(obs, item_type):
return 0
if not self._production_support_available(obs, item_type, unit_counts):
return 0
if item_type == "harv":
return share if unit_counts.get("harv", 0) < self._harvester_target(obs) else 0
return share
def _ensure_mcv_requests(self, obs: OpenRAObservation):
if not any(b.type in WAR_FACTORY_TYPES for b in obs.buildings):
return
mcvs = sum(1 for u in obs.units if u.type == "mcv")
conyards = sum(1 for b in obs.buildings if b.type == "fact")
if (conyards <= 0 and mcvs > 1) or (conyards > 0 and mcvs > 0):
return
pending = sum(1 for p in obs.production if p.item == "mcv") + self._requested_production_count("mcv")
if conyards + mcvs + pending >= self._desired_mcv_count(obs):
return
if pending > 0:
return
self._request_unit_production("mcv")
def _desired_mcv_count(self, obs: OpenRAObservation) -> int:
if self._available_credits(obs) >= BUILD_ADDITIONAL_MCV_CASH_AMOUNT:
return MINIMUM_CONSTRUCTION_YARD_COUNT + ADDITIONAL_CONSTRUCTION_YARD_COUNT
return MINIMUM_CONSTRUCTION_YARD_COUNT
def _expansion_refinery_pending(self, obs: OpenRAObservation) -> bool:
if self._expansion_refinery_goal <= 0:
return False
if obs.tick >= self._expansion_refinery_until_tick:
self._clear_expansion_refinery_need()
return False
current = sum(1 for b in obs.buildings if b.type == "proc")
current += sum(1 for p in obs.production if p.item == "proc")
if current >= self._expansion_refinery_goal:
self._clear_expansion_refinery_need()
return False
return True
def _remember_expansion_refinery_need(self, obs: OpenRAObservation):
current = sum(1 for b in obs.buildings if b.type == "proc")
current += sum(1 for p in obs.production if p.item == "proc")
self._expansion_refinery_goal = max(self._expansion_refinery_goal, current) + 1
self._expansion_refinery_until_tick = max(self._expansion_refinery_until_tick, obs.tick + 2400)
def _clear_expansion_refinery_need(self):
self._expansion_refinery_goal = 0
self._expansion_refinery_until_tick = -9999
def _remember_requested_refinery(
self,
obs: OpenRAObservation,
actor_id: int,
conyard_loc: Tuple[int, int],
resource_loc: Tuple[int, int],
):
self._requested_refineries[actor_id] = (conyard_loc, resource_loc, obs.tick + REQUESTED_REFINERY_TTL)
def _current_requested_refinery(
self,
obs: OpenRAObservation,
) -> Optional[tuple[int, Tuple[int, int], Tuple[int, int]]]:
refineries = [b for b in obs.buildings if b.type == "proc"]
for actor_id, (conyard_loc, resource_loc, expiry_tick) in list(self._requested_refineries.items()):
if expiry_tick <= obs.tick:
self._requested_refineries.pop(actor_id, None)
continue
if (
refineries
and self._nearest_distance_to_buildings(resource_loc[0], resource_loc[1], refineries)
< RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS
):
self._requested_refineries.pop(actor_id, None)
continue
return actor_id, conyard_loc, resource_loc
return None
def _consume_requested_refinery(self, actor_id: int):
self._requested_refineries.pop(actor_id, None)
def _expansion_pressure(self, obs: OpenRAObservation) -> tuple[bool, bool]:
if self._pick_expansion_target(obs) is None:
return False, False
refinery_count = sum(1 for b in obs.buildings if b.type == "proc")
production_count = sum(
1
for b in obs.buildings
if self._canonical_building_type(b.type) in PRODUCTION_BUILDING_TYPES
)
if refinery_count < INITIAL_MIN_REFINERY_COUNT + ADDITIONAL_MIN_REFINERY_COUNT or production_count <= 0:
return False, False
tech_count = sum(
1
for b in obs.buildings
if self._canonical_building_type(b.type) in TECH_BUILDING_TYPES
)
tolerate_on_cash = self._available_credits(obs) // 12000
expand_now = (
production_count
+ tech_count
- random.choice(EXPANSION_TOLERATE_VALUES)
- tolerate_on_cash
>= refinery_count
)
force_undeploy_even_no_base = (
production_count
+ tech_count
- random.choice(FORCE_EXPANSION_TOLERATE_VALUES)
- tolerate_on_cash
>= refinery_count
)
return expand_now, force_undeploy_even_no_base
def _maybe_undeploy_conyard_for_expansion(self, obs: OpenRAObservation) -> Optional[CommandModel]:
if obs.tick - self._last_conyard_undeploy_tick < CONYARD_UNDEPLOY_COOLDOWN:
return None
if any(u.type == "mcv" for u in obs.units):
return None
expand_now, force_undeploy_even_no_base = self._expansion_pressure(obs)
if not expand_now:
return None
expansion_target = self._pick_expansion_target(obs)
if expansion_target is None:
return None
conyards = [b for b in obs.buildings if b.type == "fact"]
if len(conyards) <= 1 and not force_undeploy_even_no_base:
return None
movable_conyards = [
b for b in conyards
if not (getattr(b, "is_producing", False) and getattr(b, "producing_item", "") == "proc")
and obs.tick >= self._mcv_deploy_until.get(b.actor_id, -9999)
]
if not movable_conyards:
return None
movable_conyards.sort(
key=lambda b: (
self._cell_distance(
b.cell_x if b.cell_x > 0 else b.pos_x // 1024,
b.cell_y if b.cell_y > 0 else b.pos_y // 1024,
expansion_target[0],
expansion_target[1],
),
b.actor_id,
)
)
conyard = movable_conyards[0]
self._last_conyard_undeploy_tick = obs.tick
self._mcv_deploy_until[conyard.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN
self._log(f"Undeploying conyard #{conyard.actor_id} for expansion -> {expansion_target}")
return CommandModel(action=ActionType.DEPLOY, actor_id=conyard.actor_id)
def _mcv_deploy_top_left(self, cell_x: int, cell_y: int) -> tuple[int, int]:
return cell_x + MCV_DEPLOY_OFFSET[0], cell_y + MCV_DEPLOY_OFFSET[1]
def _mcv_cell_for_top_left(self, top_left_x: int, top_left_y: int) -> tuple[int, int]:
return top_left_x - MCV_DEPLOY_OFFSET[0], top_left_y - MCV_DEPLOY_OFFSET[1]
def _can_mcv_deploy_at(self, obs: OpenRAObservation, cell_x: int, cell_y: int) -> bool:
top_left_x, top_left_y = self._mcv_deploy_top_left(cell_x, cell_y)
return self._candidate_fits_building_footprint(obs, "fact", top_left_x, top_left_y)
def _best_mcv_deploy_target(
self,
obs: OpenRAObservation,
mcv: UnitInfoModel,
expansion_target: Tuple[int, int],
) -> Optional[Tuple[int, int]]:
candidates = self._placement_candidates(
obs,
"fact",
expansion_target[0],
expansion_target[1],
MCV_MIN_DEPLOY_RADIUS,
MCV_MAX_DEPLOY_RADIUS,
)
if not candidates:
return None
source = (mcv.cell_x, mcv.cell_y)
deploy_cells = [self._mcv_cell_for_top_left(x, y) for x, y in candidates]
deploy_cells.sort(
key=lambda cell: (
abs(self._cell_distance(cell[0], cell[1], expansion_target[0], expansion_target[1]) - MCV_TRY_MAINTAIN_RANGE),
self._cell_distance(cell[0], cell[1], source[0], source[1]),
self._cell_distance(cell[0], cell[1], expansion_target[0], expansion_target[1]),
)
)
return deploy_cells[0]
def _pick_expansion_target(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]:
patch_target = self._best_expansion_patch_target(obs)
if patch_target is not None:
return patch_target
patch_states = self._resource_patch_states(obs)
fallback_patch_states = [
state
for state in patch_states
if int(state["threat"]) == 0
and float(state["depletion_ratio"]) < 0.9
]
if fallback_patch_states:
best_state = max(
fallback_patch_states,
key=lambda state: (
int(state["expansion_score"]),
int(state["capacity"]),
-int(state["base_distance"]),
),
)
if int(best_state["expansion_score"]) > -200:
return best_state["target"] # type: ignore[return-value]
conyards = [b for b in obs.buildings if b.type == "fact"]
refineries = [b for b in obs.buildings if b.type == "proc"]
candidates = self._search_grid(obs)
for target in candidates:
if self._nearest_distance_to_buildings(target[0], target[1], conyards) < MCV_FRIENDLY_CONYARD_DISLIKE_RANGE:
continue
if refineries and self._nearest_distance_to_buildings(target[0], target[1], refineries) < MCV_FRIENDLY_REFINERY_DISLIKE_RANGE:
continue
return target
return candidates[0] if candidates else None
def _nearest_distance_to_buildings(self, x: int, y: int, buildings: list[BuildingInfoModel]) -> int:
if not buildings:
return 10**9
return min(
self._cell_distance(
x,
y,
b.cell_x if b.cell_x > 0 else b.pos_x // 1024,
b.cell_y if b.cell_y > 0 else b.pos_y // 1024,
)
for b in buildings
)
def _cell_distance(self, ax: int, ay: int, bx: int, by: int) -> int:
return abs(ax - bx) + abs(ay - by)
def _queue_type_for_unit(self, item_type: str) -> Optional[str]:
if item_type == "mcv":
return "Vehicle"
for queue_type, allowed in UNIT_QUEUE_ORDER:
if item_type in allowed:
return queue_type
return None
def _roll_assault_threshold(self) -> int:
return SQUAD_SIZE + random.randrange(SQUAD_SIZE_RANDOM_BONUS)
def _select_squad_leader(self, squad_units: list[UnitInfoModel]) -> UnitInfoModel:
avg_x = sum(u.cell_x for u in squad_units) / len(squad_units)
avg_y = sum(u.cell_y for u in squad_units) / len(squad_units)
return min(squad_units, key=lambda u: (u.cell_x - avg_x) ** 2 + (u.cell_y - avg_y) ** 2)
def _attack_wave_units(
self,
obs: OpenRAObservation,
squad_units: list[UnitInfoModel],
) -> list[UnitInfoModel]:
return squad_units
def _regroup_squad_commands(
self,
squad_units: list[UnitInfoModel],
leader: UnitInfoModel,
regroup_radius: int = REGROUP_RADIUS,
min_close_units: Optional[int] = None,
circular: bool = False,
) -> List[CommandModel]:
def within_radius(unit: UnitInfoModel) -> bool:
dx = unit.cell_x - leader.cell_x
dy = unit.cell_y - leader.cell_y
if circular:
return dx * dx + dy * dy <= regroup_radius * regroup_radius
return self._cell_distance(unit.cell_x, unit.cell_y, leader.cell_x, leader.cell_y) <= regroup_radius
close_units = [
u for u in squad_units
if within_radius(u)
]
if min_close_units is None:
min_close_units = max(2, int(len(squad_units) * 0.4))
if len(close_units) >= min_close_units:
return []
commands = [CommandModel(action=ActionType.STOP, actor_id=leader.actor_id)]
redirected = 0
for u in squad_units:
if u.actor_id == leader.actor_id:
continue
if not within_radius(u):
commands.append(CommandModel(
action=ActionType.ATTACK_MOVE,
actor_id=u.actor_id,
target_x=leader.cell_x,
target_y=leader.cell_y,
))
redirected += 1
if redirected:
self._log(f"Regrouping {redirected}/{len(squad_units)} units around leader")
return commands
return []
def _visible_enemy_units_near(
self,
obs: OpenRAObservation,
x: int,
y: int,
radius: int,
) -> list[UnitInfoModel]:
return [
e for e in obs.visible_enemies
if self._cell_distance(x, y, e.cell_x, e.cell_y) <= radius
]
def _visible_enemy_buildings_near(
self,
obs: OpenRAObservation,
x: int,
y: int,
radius: int,
) -> list[BuildingInfoModel]:
return [
b for b in obs.visible_enemy_buildings
if self._cell_distance(x, y, b.cell_x, b.cell_y) <= radius
]
def _estimate_combat_power(self, actor) -> float:
base = UNIT_COMBAT_POWER.get(actor.type, 0)
if base == 0:
base = BUILDING_THREAT_POWER.get(actor.type, 0)
if base == 0:
return 0.0
hp = getattr(actor, "hp_percent", 1.0)
speed = getattr(actor, "speed", 0)
attack_range = getattr(actor, "attack_range", 0)
return base * max(0.2, hp) * (1.0 + min(speed / 200.0, 0.25) + min(attack_range / 12000.0, 0.25))
def _building_can_attack(self, building: BuildingInfoModel) -> bool:
return (
self._canonical_building_type(building.type) in ATTACKING_BUILDING_TYPES
and getattr(building, "is_powered", True)
)
def _actor_cell(self, actor) -> Tuple[int, int]:
if getattr(actor, "cell_x", 0) > 0 or getattr(actor, "cell_y", 0) > 0:
return actor.cell_x, actor.cell_y
return actor.pos_x // 1024, actor.pos_y // 1024
def _attack_or_flee_rules(self, rush: bool) -> tuple[tuple[tuple[str, ...], tuple[str, ...], tuple[str, ...], tuple[str, ...], str], ...]:
return FUZZY_RUSH_RULES if rush else FUZZY_DEFAULT_RULES
def _fuzzy_trapezoid(self, value: float, left: float, left_top: float, right_top: float, right: float) -> float:
if value <= left or value >= right:
if (value == left == left_top) or (value == right == right_top):
return 1.0
return 0.0
if left_top <= value <= right_top:
return 1.0
if value < left_top:
if left_top == left:
return 1.0
return (value - left) / (left_top - left)
if right == right_top:
return 1.0
return (right - value) / (right - right_top)
def _fuzzy_input_membership(self, variable: str, term: str, value: float) -> float:
if variable in {"OwnHealth", "EnemyHealth"}:
shapes = {
"NearDead": (0.0, 0.0, 20.0, 40.0),
"Injured": (30.0, 50.0, 50.0, 70.0),
"Normal": (50.0, 80.0, 100.0, 100.0),
}
else:
shapes = {
"Weak": (0.0, 0.0, 70.0, 90.0),
"Equal": (85.0, 100.0, 100.0, 115.0),
"Strong": (110.0, 150.0, 150.0, 1000.0),
"Slow": (0.0, 0.0, 70.0, 90.0),
"Fast": (110.0, 150.0, 150.0, 1000.0),
}
left, left_top, right_top, right = shapes[term]
return self._fuzzy_trapezoid(value, left, left_top, right_top, right)
def _fuzzy_output_membership(self, term: str, value: float) -> float:
shapes = {
"Attack": (0.0, 15.0, 15.0, 30.0),
"Flee": (25.0, 35.0, 35.0, 50.0),
}
left, left_top, right_top, right = shapes[term]
return self._fuzzy_trapezoid(value, left, left_top, right_top, right)
def _normalized_health(self, actors: list) -> float:
if not actors:
return 0.0
return max(0.0, min(100.0, 100.0 * sum(max(0.0, getattr(actor, "hp_percent", 1.0)) for actor in actors) / len(actors)))
def _attack_power_metric(self, actors: list) -> float:
total = 0.0
for actor in actors:
if isinstance(actor, UnitInfoModel):
if actor.can_attack:
total += UNIT_COMBAT_POWER.get(actor.type, 0)
elif isinstance(actor, BuildingInfoModel) and self._building_can_attack(actor):
total += BUILDING_THREAT_POWER.get(actor.type, 0)
return total
def _speed_metric(self, actors: list) -> float:
speeds = [max(0, getattr(actor, "speed", 0)) for actor in actors if isinstance(actor, UnitInfoModel) and getattr(actor, "speed", 0) > 0]
if not speeds:
return 0.0
return sum(speeds) / len(speeds)
def _relative_metric(self, own_value: float, enemy_value: float) -> float:
if enemy_value <= 0:
return 999.0 if own_value > 0 else 100.0
if own_value <= 0:
return 0.0
return max(0.0, min(999.0, own_value / enemy_value * 100.0))
def _attack_or_flee_score(self, own_actors: list, enemy_actors: list, rush: bool) -> Optional[float]:
inputs = {
"OwnHealth": self._normalized_health(own_actors),
"EnemyHealth": self._normalized_health(enemy_actors),
"RelativeAttackPower": self._relative_metric(
self._attack_power_metric(own_actors),
self._attack_power_metric(enemy_actors),
),
"RelativeSpeed": self._relative_metric(
self._speed_metric(own_actors),
self._speed_metric(enemy_actors),
),
}
activation = {"Attack": 0.0, "Flee": 0.0}
for own_terms, enemy_terms, power_terms, speed_terms, outcome in self._attack_or_flee_rules(rush):
degree = min(
max(self._fuzzy_input_membership("OwnHealth", term, inputs["OwnHealth"]) for term in own_terms),
max(self._fuzzy_input_membership("EnemyHealth", term, inputs["EnemyHealth"]) for term in enemy_terms),
max(self._fuzzy_input_membership("RelativeAttackPower", term, inputs["RelativeAttackPower"]) for term in power_terms),
max(self._fuzzy_input_membership("RelativeSpeed", term, inputs["RelativeSpeed"]) for term in speed_terms),
)
activation[outcome] = max(activation[outcome], degree)
if activation["Attack"] <= 0.0 and activation["Flee"] <= 0.0:
return None
numerator = 0.0
denominator = 0.0
for sample in range(101):
value = sample * 0.5
membership = max(
min(activation["Attack"], self._fuzzy_output_membership("Attack", value)),
min(activation["Flee"], self._fuzzy_output_membership("Flee", value)),
)
numerator += value * membership
denominator += membership
if denominator <= 0.0:
return None
return numerator / denominator
def _attack_or_flee_can_attack(self, own_actors: list, enemy_actors: list, rush: bool) -> bool:
score = self._attack_or_flee_score(own_actors, enemy_actors, rush)
return score is not None and score < 30.0
def _select_rush_target(
self,
obs: OpenRAObservation,
rush_units: list[UnitInfoModel],
) -> Optional[Tuple[int, int, int, str]]:
attackable_units = [
unit for unit in rush_units
if unit.can_attack and unit.type not in AIRCRAFT_TYPES | SHIP_TYPES
]
if not attackable_units:
return None
sample_unit = random.choice(attackable_units)
conyards = [
building for building in obs.visible_enemy_buildings
if self._canonical_building_type(building.type) == "fact"
]
conyards.sort(key=lambda building: self._cell_distance(sample_unit.cell_x, sample_unit.cell_y, *self._actor_cell(building)))
for conyard in conyards:
cx, cy = self._actor_cell(conyard)
defenders: list = [
enemy
for enemy in obs.visible_enemies
if enemy.can_attack
and enemy.type not in AIRCRAFT_TYPES | SHIP_TYPES
and self._cell_distance(enemy.cell_x, enemy.cell_y, cx, cy) <= RUSH_ATTACK_SCAN_RADIUS
]
defenders.extend(
building
for building in obs.visible_enemy_buildings
if building.actor_id != conyard.actor_id
and self._building_can_attack(building)
and self._cell_distance(*self._actor_cell(building), cx, cy) <= RUSH_ATTACK_SCAN_RADIUS
)
if not self._attack_or_flee_can_attack(attackable_units, defenders, rush=True):
continue
target = random.choice(defenders) if defenders else conyard
target_kind = "unit" if isinstance(target, UnitInfoModel) else "building"
tx, ty = self._actor_cell(target)
return tx, ty, target.actor_id, target_kind
return None
def _should_take_local_fight(
self,
squad_units: list[UnitInfoModel],
enemy_units: list[UnitInfoModel],
enemy_buildings: list[BuildingInfoModel],
rush: bool,
cautious: bool = False,
squad_name: str = "assault",
) -> bool:
own_units = [u for u in squad_units if u.can_attack]
if not own_units:
return False
if squad_name in {"assault", "rush"}:
enemy_actors = list(enemy_units) + list(enemy_buildings)
if cautious and self._normalized_health(own_units) < 55.0:
return False
return self._attack_or_flee_can_attack(own_units, enemy_actors, rush=rush)
own_power = sum(self._estimate_combat_power(u) for u in own_units)
enemy_power = (
sum(self._estimate_combat_power(u) for u in enemy_units)
+ sum(self._estimate_combat_power(b) * 0.7 for b in enemy_buildings)
)
own_avg_hp = sum(u.hp_percent for u in own_units) / max(1, len(own_units))
enemy_avg_hp = (
sum(u.hp_percent for u in enemy_units) + sum(b.hp_percent for b in enemy_buildings)
) / max(1, len(enemy_units) + len(enemy_buildings))
own_avg_speed = sum(max(1, getattr(u, "speed", 1)) for u in own_units) / max(1, len(own_units))
enemy_avg_speed = sum(max(1, getattr(u, "speed", 1)) for u in enemy_units) / max(1, len(enemy_units))
if enemy_power <= 1:
return True
power_ratio = own_power / enemy_power
speed_ratio = own_avg_speed / max(1.0, enemy_avg_speed)
required_ratio = 1.05
min_hp = 0.48
if squad_name == "protection":
required_ratio = 0.92
min_hp = 0.4
elif squad_name == "rush":
required_ratio = 1.0
min_hp = 0.55
elif squad_name in {"air", "naval"}:
required_ratio = 1.08
min_hp = 0.5
if rush:
required_ratio = min(required_ratio, 1.04)
if cautious:
required_ratio += 0.12
min_hp = max(min_hp, 0.5)
if own_avg_hp < RETREAT_HEALTH_THRESHOLD:
required_ratio += 0.18
elif own_avg_hp >= enemy_avg_hp:
required_ratio -= 0.08
if speed_ratio < 0.9 and squad_name not in {"air", "rush"}:
required_ratio += 0.05
if squad_name == "air" and any(b.type in {"sam", "agun", "tsla"} for b in enemy_buildings):
required_ratio += 0.15
if squad_name == "naval" and enemy_buildings:
required_ratio += 0.08
if squad_name != "protection" and any(b.type in {"tsla", "gun", "ftur", "agun"} for b in enemy_buildings):
required_ratio += 0.08
if not enemy_units and not any(b.type in DEFENSE_STRUCTURE_TYPES | {"agun", "sam"} for b in enemy_buildings):
required_ratio -= 0.1
required_ratio = max(0.82, required_ratio)
return own_avg_hp >= min_hp and power_ratio >= required_ratio
def _pick_priority_target(
self,
obs: OpenRAObservation,
x: Optional[int],
y: Optional[int],
local_only: bool,
squad_name: str = "assault",
) -> Optional[Tuple[int, int, int, str, str]]:
best: Optional[Tuple[float, Tuple[int, int, int, str, str]]] = None
local_radius = LOCAL_FIGHT_RADIUS + (4 if squad_name in {"air", "naval"} else 0)
for b in obs.visible_enemy_buildings:
if local_only and x is not None and y is not None and self._cell_distance(x, y, b.cell_x, b.cell_y) > local_radius:
continue
priority = TARGET_BUILDING_PRIORITY.get(b.type, 40)
if squad_name == "protection":
priority += 8 if b.type in DEFENSE_STRUCTURE_TYPES else -12
elif squad_name == "rush":
if b.type in {"proc", "weap", "fact", "powr", "apwr"}:
priority += 12
elif squad_name == "air":
if b.type in {"proc", "weap", "fact", "powr", "apwr", "hpad", "afld", "afld.ukraine"}:
priority += 14
if b.type in {"sam", "agun", "tsla"}:
priority -= 25
elif squad_name == "naval":
if b.type in NAVAL_STRUCTURE_TYPES | {"proc", "weap"}:
priority += 10
if b.type in {"sam", "agun", "tsla"}:
priority -= 10
dist = self._cell_distance(x, y, b.cell_x, b.cell_y) if x is not None and y is not None else 0
score = priority * 1000 - dist * 20 + (1.0 - b.hp_percent) * 120
candidate = (b.actor_id, b.cell_x, b.cell_y, b.type, "building")
if best is None or score > best[0]:
best = (score, candidate)
for e in obs.visible_enemies:
if local_only and x is not None and y is not None and self._cell_distance(x, y, e.cell_x, e.cell_y) > local_radius:
continue
if "husk" in e.type:
continue
if not e.can_attack and e.type not in {"harv", "mcv"} and squad_name not in {"rush", "air"}:
continue
priority = TARGET_UNIT_PRIORITY.get(e.type, 30 if e.can_attack else 10)
if squad_name == "protection":
if e.can_attack:
priority += 15
if e.type in {"harv", "mcv"}:
priority -= 20
elif squad_name == "rush":
if e.type in {"harv", "mcv", "arty", "v2rl"}:
priority += 10
elif squad_name == "air":
if e.type in {"harv", "mcv", "arty", "v2rl", "ftrk"}:
priority += 14
elif squad_name == "naval":
if e.type in SHIP_TYPES:
priority += 18
elif e.type in AIRCRAFT_TYPES:
priority -= 10
dist = self._cell_distance(x, y, e.cell_x, e.cell_y) if x is not None and y is not None else 0
score = priority * 1000 - dist * 25 + (1.0 - e.hp_percent) * 150
candidate = (e.actor_id, e.cell_x, e.cell_y, e.type, "unit")
if best is None or score > best[0]:
best = (score, candidate)
return best[1] if best is not None else None
def _pick_closest_visible_target(
self,
obs: OpenRAObservation,
x: Optional[int],
y: Optional[int],
squad_name: str = "assault",
) -> Optional[Tuple[int, int, int, str, str]]:
if x is None or y is None:
return self._pick_priority_target(obs, x, y, local_only=False, squad_name=squad_name)
best: Optional[Tuple[tuple[int, int, int], Tuple[int, int, int, str, str]]] = None
for b in obs.visible_enemy_buildings:
priority = TARGET_BUILDING_PRIORITY.get(b.type, 40)
if squad_name == "protection":
priority += 8 if b.type in DEFENSE_STRUCTURE_TYPES else -12
elif squad_name == "rush" and b.type in {"proc", "weap", "fact", "powr", "apwr"}:
priority += 12
elif squad_name == "air":
if b.type in {"proc", "weap", "fact", "powr", "apwr", "hpad", "afld", "afld.ukraine"}:
priority += 14
if b.type in {"sam", "agun", "tsla"}:
priority -= 25
elif squad_name == "naval":
if b.type in NAVAL_STRUCTURE_TYPES | {"proc", "weap"}:
priority += 10
if b.type in {"sam", "agun", "tsla"}:
priority -= 10
dist = self._cell_distance(x, y, b.cell_x, b.cell_y)
key = (dist, -priority, -int((1.0 - b.hp_percent) * 100))
candidate = (b.actor_id, b.cell_x, b.cell_y, b.type, "building")
if best is None or key < best[0]:
best = (key, candidate)
for e in obs.visible_enemies:
if "husk" in e.type:
continue
if not e.can_attack and e.type not in {"harv", "mcv"} and squad_name not in {"rush", "air"}:
continue
priority = TARGET_UNIT_PRIORITY.get(e.type, 30 if e.can_attack else 10)
if squad_name == "protection":
if e.can_attack:
priority += 15
if e.type in {"harv", "mcv"}:
priority -= 20
elif squad_name == "rush" and e.type in {"harv", "mcv", "arty", "v2rl"}:
priority += 10
elif squad_name == "air" and e.type in {"harv", "mcv", "arty", "v2rl", "ftrk"}:
priority += 14
elif squad_name == "naval":
if e.type in SHIP_TYPES:
priority += 18
elif e.type in AIRCRAFT_TYPES:
priority -= 10
dist = self._cell_distance(x, y, e.cell_x, e.cell_y)
key = (dist, -priority, -int((1.0 - e.hp_percent) * 100))
candidate = (e.actor_id, e.cell_x, e.cell_y, e.type, "unit")
if best is None or key < best[0]:
best = (key, candidate)
return best[1] if best is not None else None
def _target_actor_is_visible(
self,
obs: OpenRAObservation,
target_actor_id: int,
target_kind: str,
) -> bool:
return self._visible_actor_by_target(obs, target_actor_id, target_kind) is not None
def _focus_fire_commands(
self,
squad_units: list[UnitInfoModel],
target: Tuple[int, int, int, str, str],
) -> List[CommandModel]:
target_actor_id, tx, ty, _, target_kind = target
commands = []
for u in squad_units:
if not u.can_attack or self._busy_attacking(u):
continue
commands.append(CommandModel(
action=ActionType.ATTACK,
actor_id=u.actor_id,
target_actor_id=target_actor_id,
target_x=tx,
target_y=ty,
))
if commands:
self._record_attack_issue(
direct_attack=True,
command_count=len(commands),
target_actor_id=target_actor_id,
target_kind=target_kind,
)
return commands
def _record_attack_issue(
self,
direct_attack: bool,
command_count: int,
target_actor_id: int = 0,
target_kind: str = "point",
) -> None:
if command_count <= 0:
return
if direct_attack:
self._attack_commands_issued += command_count
else:
self._attack_move_commands_issued += command_count
if target_actor_id <= 0:
return
if target_kind == "unit":
self._unit_target_events += 1
self._unique_unit_targets.add(target_actor_id)
elif target_kind == "building":
self._building_target_events += 1
self._unique_building_targets.add(target_actor_id)
def get_attack_stats(self, obs: Optional[OpenRAObservation] = None) -> dict[str, int]:
stats = {
"attack_commands": self._attack_commands_issued,
"attack_move_commands": self._attack_move_commands_issued,
"unit_target_events": self._unit_target_events,
"building_target_events": self._building_target_events,
"unique_unit_targets": len(self._unique_unit_targets),
"unique_building_targets": len(self._unique_building_targets),
}
if obs is not None:
stats["units_killed"] = obs.military.units_killed
stats["buildings_killed"] = obs.military.buildings_killed
return stats
def get_squad_stats(self) -> dict[str, object]:
return {
"idle_ground": len(self._idle_ground_units),
"attack_squad": len(self._attack_squad),
"rush_squad": len(self._rush_squad),
"protection_squad": len(self._protection_squad),
"air_squad": len(self._air_squad),
"naval_squad": len(self._naval_squad),
"assault_threshold": self._assault_threshold,
"states": dict(self._squad_states),
"targets": {name: self._squad_target_point[name] for name in self._squad_target_point},
}
def _busy_attacking(self, unit: UnitInfoModel) -> bool:
if unit.is_idle:
return False
activity = getattr(unit, "current_activity", "").lower().replace(" ", "")
if not activity or "attackmove" in activity:
return False
return "attack" in activity
def _retreat_squad_commands(
self,
obs: OpenRAObservation,
squad_units: list[UnitInfoModel],
leader: UnitInfoModel,
) -> List[CommandModel]:
fallback = self._pick_retreat_point(obs, leader)
if fallback is None:
return []
tx, ty = fallback
return [
CommandModel(
action=ActionType.MOVE,
actor_id=u.actor_id,
target_x=tx,
target_y=ty,
)
for u in squad_units
]
def _pick_retreat_point(
self,
obs: OpenRAObservation,
leader: UnitInfoModel,
) -> Optional[Tuple[int, int]]:
if not obs.buildings:
return self._base_center(obs)
type_bonus = {
"fact": 420,
"weap": 280,
"proc": 260,
"ftur": 220,
"gun": 200,
"tsla": 220,
"pbox": 140,
"hbox": 140,
"powr": 90,
"apwr": 110,
}
best_score: Optional[int] = None
best_pos: Optional[Tuple[int, int]] = None
for building in obs.buildings:
bx = building.cell_x if building.cell_x > 0 else building.pos_x // 1024
by = building.cell_y if building.cell_y > 0 else building.pos_y // 1024
leader_dist = self._cell_distance(leader.cell_x, leader.cell_y, bx, by)
enemy_clearance = min(
[self._cell_distance(enemy.cell_x, enemy.cell_y, bx, by) for enemy in obs.visible_enemies] + [LOCAL_FIGHT_RADIUS + 8]
)
static_clearance = min(
[self._cell_distance(enemy.cell_x, enemy.cell_y, bx, by) for enemy in obs.visible_enemy_buildings] + [LOCAL_FIGHT_RADIUS + 8]
)
canonical = self._canonical_building_type(building.type)
score = type_bonus.get(canonical, 80)
score += min(enemy_clearance, LOCAL_FIGHT_RADIUS + 8) * 18
score += min(static_clearance, LOCAL_FIGHT_RADIUS + 8) * 10
score -= leader_dist * 8
if canonical in DEFENSE_STRUCTURE_TYPES:
score += 40
if best_score is None or score > best_score:
best_score = score
best_pos = (bx, by)
return best_pos or self._base_center(obs)
def _has_own_building_near(self, obs: OpenRAObservation, x: int, y: int, radius: int) -> bool:
return any(
self._cell_distance(
building.cell_x if building.cell_x > 0 else building.pos_x // 1024,
building.cell_y if building.cell_y > 0 else building.pos_y // 1024,
x,
y,
) <= radius
for building in obs.buildings
)
def _credits_str(self, obs: OpenRAObservation) -> str:
return (
f"${obs.economy.cash} cash + ${obs.economy.ore} ore"
f" = ${self._available_credits(obs)}"
)
def _can_produce(self, obs: OpenRAObservation, item_type: str) -> bool:
if item_type in obs.available_production:
return True
for b in obs.buildings:
if item_type in b.can_produce:
return True
return False
def _log(self, msg: str):
if self.verbose:
print(f" [NormalAI] {msg}")