"""Python reimplementation of OpenRA's Normal AI (ModularBot@NormalAI). Mirrors the C# modular bot architecture with these managers: - Economy (HarvesterBotModule@normal-turtle) - Base building (BaseBuilderBotModule@normal) - Unit production (UnitBuilderBotModule@normal) - Squads (SquadManagerBotModule@normal) - Repairs (BuildingRepairBotModule) - Power (PowerDownBotModule) All unit weights are taken directly from mods/ra/rules/ai.yaml. """ import base64 import random import struct from typing import List, Optional, Tuple from openra_env.models import ( ActionType, BuildingInfoModel, CommandModel, OpenRAAction, OpenRAObservation, UnitInfoModel, ) STANCE_DEFEND = 2 STANCE_ATTACK_ANYTHING = 3 # --------------------------------------------------------------------------- # Constants from ai.yaml (@normal) # --------------------------------------------------------------------------- UNITS_TO_BUILD: dict[str, int] = { "e1": 65, "e2": 15, "e3": 30, "e4": 15, "e7": 1, "dog": 15, "shok": 15, "harv": 15, "apc": 30, "jeep": 20, "arty": 15, "v2rl": 40, "ftrk": 30, "1tnk": 40, "2tnk": 50, "3tnk": 50, "4tnk": 25, "ttnk": 25, "stnk": 5, "heli": 30, "mh60": 30, "mig": 30, "yak": 30, "ss": 10, "msub": 10, "dd": 10, "ca": 10, "pt": 10, } UNIT_LIMITS: dict[str, int] = {"dog": 4, "harv": 8, "jeep": 4, "ftrk": 4} INFANTRY_TYPES = {"e1", "e2", "e3", "e4", "e7", "shok", "dog"} VEHICLE_TYPES = {"harv", "apc", "jeep", "arty", "v2rl", "ftrk", "1tnk", "2tnk", "3tnk", "4tnk", "ttnk", "stnk", "mcv"} AIRCRAFT_TYPES = {"heli", "mh60", "mig", "yak", "hind"} PLANE_TYPES = {"mig", "yak"} SHIP_TYPES = {"ss", "msub", "dd", "ca", "pt"} COMBAT_TYPES = ( {"e1", "e2", "e3", "e4", "e7", "shok"} | {"apc", "jeep", "arty", "v2rl", "ftrk", "1tnk", "2tnk", "3tnk", "4tnk", "ttnk", "stnk"} | SHIP_TYPES | AIRCRAFT_TYPES ) SQUAD_SIZE = 40 SQUAD_SIZE_RANDOM_BONUS = 30 EXCLUDE_FROM_SQUADS = {"harv", "mcv", "dog", "badr.bomber", "u2"} BARRACKS_TYPES = {"tent", "barr"} WAR_FACTORY_TYPES = {"weap"} PRODUCTION_BUILDING_TYPES = BARRACKS_TYPES | WAR_FACTORY_TYPES TECH_BUILDING_TYPES = {"dome", "atek", "stek", "fix", "afld", "afld.ukraine", "hpad"} POWER_DOWN_TYPES = {"dome", "tsla", "mslo", "agun", "sam"} PROTECTION_TYPES = { "harv", "mcv", "mslo", "gap", "spen", "syrd", "iron", "pdox", "tsla", "agun", "dome", "pbox", "hbox", "gun", "ftur", "sam", "atek", "weap", "fact", "proc", "silo", "hpad", "afld", "afld.ukraine", "powr", "apwr", "stek", "barr", "kenn", "tent", "fix", "fpwr", "tenf", "syrf", "spef", "weaf", "domf", "fixf", "fapw", "atef", "pdof", "mslf", "facf", } UNIT_QUEUE_ORDER: tuple[tuple[str, set[str]], ...] = ( ("Vehicle", VEHICLE_TYPES - {"mcv"}), ("Infantry", INFANTRY_TYPES), ("Plane", PLANE_TYPES), ("Ship", SHIP_TYPES), ("Aircraft", AIRCRAFT_TYPES - PLANE_TYPES), ) STRUCTURE_QUEUE_TYPES = {"Building", "Defense"} DEFENSE_STRUCTURE_TYPES = {"pbox", "hbox", "gun", "ftur", "tsla", "agun", "sam", "gap", "mslo"} ATTACKING_BUILDING_TYPES = {"pbox", "hbox", "gun", "ftur", "tsla", "agun", "sam"} NAVAL_STRUCTURE_TYPES = {"spen", "syrd"} ENEMY_FACING_STRUCTURE_TYPES = {"pbox", "hbox", "gun", "ftur", "tsla", "agun", "sam"} NO_BUILDABLE_AREA_TYPES = NAVAL_STRUCTURE_TYPES | {"silo", "kenn"} BUILDING_VARIANT_CHOICES: dict[str, tuple[str, ...]] = { "barracks": ("tent", "barr"), "afld": ("afld", "afld.ukraine"), } BUILDING_CANONICAL_TYPES: dict[str, str] = {"afld.ukraine": "afld"} BUILDING_DIMENSIONS: dict[str, tuple[int, int]] = { "fact": (3, 4), "powr": (2, 3), "apwr": (3, 3), "proc": (3, 4), "weap": (3, 4), "barr": (2, 3), "tent": (2, 3), "dome": (2, 3), "atek": (2, 3), "hpad": (2, 3), "afld": (3, 2), "afld.ukraine": (3, 2), "fix": (3, 3), "stek": (3, 3), "spen": (3, 3), "syrd": (3, 3), "sam": (2, 1), "mslo": (2, 1), "silo": (2, 1), "kenn": (2, 2), "pbox": (2, 1), "hbox": (2, 1), "gun": (2, 2), "ftur": (2, 2), "tsla": (2, 2), "agun": (2, 2), "gap": (3, 3), } BUILDING_TOPLEFT_OFFSETS: dict[str, tuple[int, int]] = { "fact": (1, 1), "powr": (1, 1), "apwr": (1, 1), "proc": (1, 1), "weap": (1, 1), "barr": (1, 1), "tent": (1, 1), "dome": (1, 1), "atek": (1, 1), "hpad": (1, 1), "afld": (1, 1), "afld.ukraine": (1, 1), "fix": (1, 1), "stek": (1, 1), "spen": (1, 1), "syrd": (1, 1), "sam": (1, 0), "mslo": (1, 0), "silo": (1, 0), "kenn": (1, 1), "pbox": (1, 0), "hbox": (1, 0), "gun": (1, 1), "ftur": (1, 1), "tsla": (1, 1), "agun": (1, 1), "gap": (1, 1), } BUILDING_COSTS: dict[str, int] = { "powr": 300, "apwr": 500, "proc": 1400, "weap": 2000, "barr": 500, "tent": 500, "kenn": 200, "dome": 1500, "hpad": 500, "afld": 500, "afld.ukraine": 500, "fix": 1200, "atek": 1500, "stek": 1500, "silo": 150, "pbox": 600, "hbox": 750, "gun": 800, "ftur": 600, "tsla": 1200, "agun": 800, "sam": 700, "gap": 800, "mslo": 2500, "spen": 800, "syrd": 1000, } BUILDING_LIMITS: dict[str, int] = { "barr": 7, "tent": 7, "dome": 1, "weap": 4, "hpad": 4, "afld": 4, "atek": 1, "stek": 1, "fix": 1, "kenn": 1, "mslo": 1, "spen": 1, "syrd": 1, } BUILDING_FRACTIONS: dict[str, int] = { "powr": 1, "proc": 1, "tent": 3, "barr": 3, "kenn": 1, "dome": 1, "weap": 4, "hpad": 1, "spen": 1, "syrd": 1, "afld": 1, "pbox": 9, "gun": 9, "ftur": 10, "tsla": 5, "gap": 2, "fix": 1, "agun": 5, "sam": 1, "atek": 1, "stek": 1, "mslo": 1, } BUILDING_DELAYS: dict[str, int] = { "dome": 6000, "fix": 3000, "pbox": 1500, "gun": 2000, "ftur": 1500, "tsla": 2800, "kenn": 7000, "atek": 9000, "stek": 9000, "spen": 6000, "syrd": 6000, } UNIT_COMBAT_POWER: dict[str, int] = { "e1": 25, "e2": 20, "e3": 45, "e4": 35, "e7": 30, "shok": 55, "dog": 5, "jeep": 45, "apc": 55, "arty": 110, "v2rl": 120, "ftrk": 70, "1tnk": 90, "2tnk": 120, "3tnk": 145, "4tnk": 135, "ttnk": 130, "stnk": 120, "heli": 95, "mh60": 90, "mig": 100, "yak": 95, "hind": 95, "ss": 90, "msub": 90, "dd": 100, "ca": 120, "pt": 60, } BUILDING_THREAT_POWER: dict[str, int] = { "pbox": 40, "hbox": 45, "gun": 90, "ftur": 110, "tsla": 160, "agun": 120, "sam": 70, "fact": 55, "weap": 60, "proc": 55, "dome": 40, "atek": 45, "stek": 45, "fix": 40, } TARGET_BUILDING_PRIORITY: dict[str, int] = { "fact": 100, "weap": 95, "proc": 90, "dome": 82, "fix": 80, "atek": 78, "stek": 78, "afld": 75, "afld.ukraine": 75, "hpad": 72, "barr": 68, "tent": 68, "powr": 62, "apwr": 62, "silo": 58, "tsla": 56, "agun": 54, "ftur": 52, "gun": 50, "sam": 48, "pbox": 45, "hbox": 42, } TARGET_UNIT_PRIORITY: dict[str, int] = { "mcv": 100, "harv": 95, "v2rl": 90, "arty": 88, "ftrk": 84, "4tnk": 82, "3tnk": 80, "2tnk": 76, "1tnk": 72, "ttnk": 78, "stnk": 78, "apc": 65, "jeep": 62, "shok": 58, "e4": 52, "e3": 50, "e2": 45, "e1": 40, "dog": 10, } # Initial build order — same sequence the C# AI follows in practice. # Uses "barracks" as a placeholder resolved to tent or barr at runtime. BUILD_ORDER = ["powr", "barracks", "proc", "weap", "powr"] STRUCTURE_PRODUCTION_ACTIVE_DELAY = 25 STRUCTURE_PRODUCTION_INACTIVE_DELAY = 125 STRUCTURE_PRODUCTION_RANDOM_BONUS_DELAY = 10 STRUCTURE_PRODUCTION_RESUME_DELAY = 1500 PLACEMENT_ATTEMPT_INTERVAL = 25 PLACEMENT_CONFIRMATION_DELAY = 300 MAX_FAILED_PLACEMENT_ATTEMPTS = 8 BASE_BUILD_MIN_RADIUS = 2 BASE_BUILD_MAX_RADIUS = 20 DEFENSE_BUILD_MIN_RADIUS = 5 DEFENSE_BUILD_MAX_RADIUS = 20 CHECK_FOR_WATER_RADIUS = 8 NAVAL_WATER_SCAN_STRIDE = 1 NAVAL_WATER_SCAN_RADIUS = 12 NAVAL_MIN_OPEN_WATER_WINDOWS = 1 NAVAL_MIN_WATER_SCORE = 20 NAVAL_EARLY_BUILD_WATER_SCORE = 22 NAVAL_EARLY_BUILD_CREDIT_BUFFER = 250 NAVAL_BUILD_MAX_RADIUS = 24 NAVAL_GATE_CACHE_TICKS = 151 RESOURCE_MAP_UPDATE_INTERVAL = 151 RESOURCE_PATCH_LINK_RADIUS = 3 RESOURCE_PATCH_MIN_CELLS = 2 RESOURCE_PATCH_SEARCH_MARGIN = 8 RESOURCE_PATCH_THREAT_RADIUS = 12 RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS = 14 MAX_REFINERIES_PER_PATCH = 2 NAVAL_CANDIDATE_MIN_COUNT = 1 RESOURCE_PATCH_MEMORY_MATCH_RADIUS = 6 RESOURCE_PATCH_MAX_CAPACITY = 6 ATTACK_FORCE_INTERVAL = 75 RUSH_INTERVAL = 600 RUSH_TICKS = 4000 ASSIGN_ROLES_INTERVAL = 50 HARVESTER_SCAN_INTERVAL = 50 UNIT_FEEDBACK_TIME = 30 STALE_TARGET_REACHED_RADIUS = 8 STALE_TARGET_REDIRECT_LIMIT = 3 PRODUCTION_MIN_CASH_REQUIREMENT = 500 # C# UnitBuilderBotModule@normal relies on FeedbackTime plus queue occupancy; # it does not add extra synthetic cooldowns on top of that. QUEUE_PRODUCTION_DELAYS: dict[str, int] = {} UNIT_PRODUCTION_DELAYS: dict[str, int] = {} FOG_CHANNEL = 4 # spatial-map channel: 0=hidden, 0.5=explored, 1=visible FRONTIER_REFRESH_TICKS = 650 LAST_SEEN_ENEMY_TTL_TICKS = 6000 LAST_SEEN_BASE_TTL_TICKS = 18000 # RA NormalAI does not configure IdleBaseUnitsMaximum for UnitBuilderBotModule@normal, # so avoid damping production just because squads are currently idling near base. QUEUE_IDLE_BASE_CAPS: dict[str, int] = {} IDLE_BASE_UNIT_RADIUS = 15 AIRFIELD_PLANE_CAPACITY = 4 HELIPAD_AIRCRAFT_CAPACITY = 1 INITIAL_HARVESTERS = 4 MINIMUM_EXCESS_POWER = 0 MAXIMUM_EXCESS_POWER = 200 EXCESS_POWER_INCREMENT = 40 EXCESS_POWER_INCREASE_THRESHOLD = 4 INITIAL_MIN_REFINERY_COUNT = 0 ADDITIONAL_MIN_REFINERY_COUNT = 2 NEW_PRODUCTION_CASH_THRESHOLD = 8000 NEW_PRODUCTION_CHANCE = 50 SILO_BUILD_THRESHOLD = 0.8 PROTECT_UNIT_SCAN_RADIUS = 15 PROTECTION_SCAN_RADIUS = 12 PROTECTION_TARGET_BACKOFF_TICKS = 4 HOME_BASE_THREAT_RADIUS = BASE_BUILD_MAX_RADIUS + PROTECTION_SCAN_RADIUS REPAIR_ALL_BUILDINGS_COOLDOWN = 107 REPAIR_REACTIVE_HP_THRESHOLD = 0.67 POWER_TOGGLE_INTERVAL = 150 ATTACK_SCAN_RADIUS = 12 REGROUP_RADIUS = 14 LOCAL_FIGHT_RADIUS = 12 RUSH_ATTACK_SCAN_RADIUS = 15 RETREAT_HEALTH_THRESHOLD = 0.42 MINIMUM_CONSTRUCTION_YARD_COUNT = 2 ADDITIONAL_CONSTRUCTION_YARD_COUNT = 0 BUILD_ADDITIONAL_MCV_CASH_AMOUNT = 5000 SCAN_FOR_NEW_MCV_INTERVAL = 20 BUILD_MCV_INTERVAL = 101 MCV_MIN_DEPLOY_RADIUS = 2 MCV_MAX_DEPLOY_RADIUS = 20 MCV_TARGET_REACHED_RADIUS = 2 MCV_TRY_MAINTAIN_RANGE = 8 MCV_FRIENDLY_CONYARD_DISLIKE_RANGE = 14 MCV_FRIENDLY_REFINERY_DISLIKE_RANGE = 14 MCV_DEPLOY_OFFSET = (-1, -1) EXPANSION_TOLERATE_VALUES = (1, 2) FORCE_EXPANSION_TOLERATE_VALUES = (2, 3) # Match the C# McvExpansionManager default MoveConyardTick cadence. CONYARD_UNDEPLOY_COOLDOWN = 5700 MCV_DEPLOY_COMMAND_COOLDOWN = 800 REQUESTED_REFINERY_TTL = 2400 HARVESTER_THREAT_RADIUS = 10 HARVESTER_RETREAT_COOLDOWN = 120 LOW_EFFECT_HARVESTER_SCAN_INTERVAL = 433 RESOURCE_CELLS_PER_HARVESTER = 4 HARVESTER_PATCH_ASSIGN_RADIUS = 12 HARVESTER_REASSIGN_COOLDOWN = 650 HARVESTER_REASSIGN_REFINERY_RADIUS = 14 HARVESTER_LOW_EFFECT_TIMEOUT = 500 HARVESTER_NO_RESOURCE_COOLDOWN = 300 HARVESTER_PROGRESS_MOVE_THRESHOLD = 2 HARVESTER_LOCAL_RESOURCE_MIN = 0.5 BASE_ATTACK_MEMORY_TICKS = 150 BASE_EMERGENCY_VISIBILITY_RADIUS = 5 ATTACK_POINT_MERGE_RADIUS = 4 POST_CONTACT_WINDOW = 2400 RECOVERY_DURATION = 2600 RECOVERY_TRIGGER_PEAK = 24 RECOVERY_DROP_RATIO = 0.6 RECOVERY_MIN_COMBAT = 16 RECOVERY_EXIT_COMBAT = 24 RECOVERY_HARVESTER_CAP = 2 RECOVERY_CLEAR_CONTACT_GAP = 450 RECOVERY_REFINERY_REBUILD_CREDITS = 2000 HOME_GUARD_MIN_RESERVE = 6 HOME_GUARD_MAX_RESERVE = 10 RUSH_SQUAD_MIN_SIZE = 6 AIR_SQUAD_MIN_SIZE = 2 NAVAL_SQUAD_MIN_SIZE = 2 PROTECTION_SQUAD_MIN_SIZE = 4 RESPOND_TO_ATTACK_COOLDOWN = 30 SQUAD_RETREAT_HOLD_TICKS = 180 SQUAD_RECOVER_HOLD_TICKS = 240 RUSH_COMBAT_TYPES = {"e1", "e3", "apc", "jeep", "1tnk", "2tnk", "3tnk", "arty", "v2rl"} FORCE_COMMIT_UNIT_THRESHOLD = 30 ECONOMY_ATTACK_HOLD_FORCE = 60 TECH_ATTACK_HOLD_FORCE = 70 FORCE_COMMIT_REGROUPS = 3 FORCE_COMMIT_COOLDOWN = 400 FORCE_COMMIT_TIME = 1200 # fallback wave timing (ticks) to mirror NormalAI periodic pushes FORCE_COMMIT_MIN_SIZE = 12 FORCE_COMMIT_GLOBAL_INTERVAL = 1200 # hard periodic wave, closer to NormalAI cadence STALE_TARGET_CLEAR_INTERVAL = 2400 # reduce aggressive stale clearing SEARCH_TARGET_STALL_TICKS = 3600 SQUAD_STUCK_TICKS = 63 NAVAL_FAILURE_DISABLE_TICKS = 12000 MCV_EXPANSION_MODES = ("resource", "base", "current") MCV_EXPANSION_MODE_FAILURES = 2 FUZZY_ANY_HEALTH = ("NearDead", "Injured", "Normal") FUZZY_ANY_POWER = ("Weak", "Equal", "Strong") FUZZY_ANY_SPEED = ("Slow", "Equal", "Fast") FUZZY_DEFAULT_RULES = ( (("Normal",), FUZZY_ANY_HEALTH, FUZZY_ANY_POWER, FUZZY_ANY_SPEED, "Attack"), (("Injured",), ("NearDead",), FUZZY_ANY_POWER, FUZZY_ANY_SPEED, "Attack"), (("Injured",), ("Injured", "Normal"), ("Equal", "Strong"), FUZZY_ANY_SPEED, "Attack"), (("Injured",), ("Injured", "Normal"), ("Weak",), ("Slow",), "Attack"), (("Injured",), ("Injured", "Normal"), ("Weak",), ("Equal", "Fast"), "Flee"), (("Injured",), FUZZY_ANY_HEALTH, FUZZY_ANY_POWER, ("Slow",), "Attack"), (("NearDead",), ("NearDead", "Injured"), ("Equal", "Strong"), ("Slow", "Equal"), "Attack"), (("NearDead",), ("NearDead", "Injured"), ("Weak",), ("Equal", "Fast"), "Flee"), (("NearDead",), ("Normal",), ("Weak",), ("Equal", "Fast"), "Flee"), (("NearDead",), ("Normal",), ("Equal", "Strong"), ("Fast",), "Flee"), (("NearDead",), ("Injured",), ("Equal",), ("Fast",), "Flee"), ) FUZZY_RUSH_RULES = ( (("Normal",), FUZZY_ANY_HEALTH, ("Strong",), FUZZY_ANY_SPEED, "Attack"), (("Normal",), FUZZY_ANY_HEALTH, ("Weak", "Equal"), FUZZY_ANY_SPEED, "Flee"), *FUZZY_DEFAULT_RULES[1:], ) class NormalAIBot: """Python reimplementation of OpenRA's Normal AI.""" def __init__(self, verbose: bool = False): self.verbose = verbose self.phase = "deploy_mcv" # Base building self._build_index = 0 self._placement_count = 0 self._deploy_issued = False self._last_build_tick: int = -9999 self._next_build_check_tick = 0 self._placement_fail_counts: dict[str, int] = {} self._placement_pending: dict[str, tuple[str, int, int]] = {} self._placement_backoff_until: dict[str, int] = {} self._placement_backoff_snapshot: dict[str, tuple[int, int]] = {} self._next_placement_attempt_tick: dict[str, int] = {} self._naval_retry_buildable_count = -1 self._naval_disabled_until = -9999 # Rally self._rally_set: set[int] = set() # Squads self._attack_squad: list[int] = [] self._protection_squad: list[int] = [] self._rush_squad: list[int] = [] self._air_squad: list[int] = [] self._naval_squad: list[int] = [] self._idle_ground_units: list[int] = [] self._temporary_defenders: set[int] = set() self._last_attack_tick = 0 self._last_attack_eval_tick = -random.randrange(ATTACK_FORCE_INTERVAL) if ATTACK_FORCE_INTERVAL > 0 else 0 rush_jitter = max(1, RUSH_INTERVAL // 20) self._last_rush_tick = random.randint(-rush_jitter, rush_jitter) self._last_assign_tick = -random.randrange(ASSIGN_ROLES_INTERVAL) if ASSIGN_ROLES_INTERVAL > 0 else 0 self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS self._protect_from_actor_id = 0 self._protect_from_kind = "point" self._protect_from_point: Optional[Tuple[int, int]] = None self._protect_from_until = -9999 self._respond_to_attack_cooldown_until = -9999 self._assault_threshold = self._roll_assault_threshold() self._squad_states: dict[str, str] = { "assault": "assemble", "protection": "assemble", "rush": "assemble", "air": "assemble", "naval": "assemble", } self._squad_state_until: dict[str, int] = {} self._squad_regroup_count: dict[str, int] = {} self._squad_last_commit_tick: dict[str, int] = {} self._squad_target_actor_id: dict[str, int] = {} self._squad_target_kind: dict[str, str] = {} self._squad_target_point: dict[str, Tuple[int, int]] = {} self._enemy_base_pos: Optional[Tuple[int, int]] = None # Bridge only exposes currently visible enemies; keep a short-lived memory. self._last_seen_enemy_pos: Optional[Tuple[int, int]] = None self._last_seen_enemy_tick: int = -9999 self._last_seen_base_pos: Optional[Tuple[int, int]] = None self._last_seen_base_tick: int = -9999 self._rush_target_pos: Optional[Tuple[int, int]] = None self._rush_target_actor_id = 0 self._rush_target_kind = "point" self._stale_attack_target: Optional[Tuple[int, int]] = None self._stale_attack_redirects = 0 self._search_target: Optional[Tuple[int, int]] = None self._search_target_started_tick = -9999 self._squad_last_progress_tick: dict[str, int] = {} self._squad_last_progress_pos: dict[str, Tuple[int, int]] = {} self._squad_last_target_point: dict[str, Tuple[int, int]] = {} self._squad_leader_id: dict[str, int] = {} self._attack_commands_issued = 0 self._attack_move_commands_issued = 0 self._unit_target_events = 0 self._building_target_events = 0 self._unique_unit_targets: set[int] = set() self._unique_building_targets: set[int] = set() # Repair / power self._repair_issued: set[int] = set() self._reactive_repair_targets: set[int] = set() self._last_repair_tick = -9999 self._powered_down: dict[int, int] = {} self._last_power_toggle_tick = -9999 # Economy / production self._last_harvester_scan_tick = -9999 self._last_harvester_reassign_tick = -9999 self._last_unit_tick = -9999 self._current_queue_index = -1 self._unit_requests: list[str] = [] self._queue_delay_until: dict[str, int] = {} self._unit_delay_until: dict[str, int] = {} self._last_mcv_scan_tick = random.randrange(SCAN_FOR_NEW_MCV_INTERVAL) if SCAN_FOR_NEW_MCV_INTERVAL > 0 else 0 self._last_mcv_build_tick = random.randrange(BUILD_MCV_INTERVAL) if BUILD_MCV_INTERVAL > 0 else 0 self._last_conyard_undeploy_tick = random.randrange(CONYARD_UNDEPLOY_COOLDOWN) if CONYARD_UNDEPLOY_COOLDOWN > 0 else 0 self._mcv_targets: dict[int, tuple[int, int]] = {} self._mcv_resource_targets: dict[int, tuple[int, int]] = {} self._requested_refineries: dict[int, tuple[tuple[int, int], tuple[int, int], int]] = {} self._mcv_deploy_until: dict[int, int] = {} self._harvester_retreat_until: dict[int, int] = {} self._harvester_recent_damage_until: dict[int, int] = {} self._harvester_reassign_until: dict[int, int] = {} self._harvester_patch_targets: dict[int, tuple[int, int]] = {} self._harvester_last_cells: dict[int, tuple[int, int]] = {} self._harvester_last_progress_tick: dict[int, int] = {} self._harvester_no_resource_until: dict[int, int] = {} self._expansion_refinery_goal = 0 self._expansion_refinery_until_tick = -9999 self._combat_peak = 0 self._last_contact_tick = -9999 self._recovery_until_tick = -9999 # Map self._cached_map_size: Optional[Tuple[int, int]] = None self._candidate_targets: list[Tuple[int, int]] = [] self._target_index = 0 self._frontier_cache_tick = -9999 self._recent_attack_points: list[tuple[int, int, int]] = [] self._previous_building_hp: dict[int, float] = {} self._previous_unit_hp: dict[int, float] = {} self._spatial_raw: bytes = b"" self._spatial_channels = 0 self._last_spatial_update_tick = -9999 self._resource_patches: list[dict[str, float | int]] = [] self._resource_patch_memory: dict[tuple[int, int], dict[str, float | int]] = {} self._last_naval_gate_tick = -9999 self._cached_naval_gate_ok = False self._mcv_expansion_mode = MCV_EXPANSION_MODES[0] self._mcv_expansion_failures = 0 def decide(self, obs: OpenRAObservation) -> OpenRAAction: commands: List[CommandModel] = [] self._update_map_size(obs) self._update_spatial_analysis(obs) self._update_enemy_memory(obs) self._update_phase(obs) self._cleanup_dead(obs) self._update_damage_memory(obs) self._update_post_contact_state(obs) commands.extend(self._handle_placement(obs)) if self.phase == "deploy_mcv": cmd = self._handle_deploy(obs) if cmd: commands.append(cmd) commands.extend(self._handle_rally_points(obs)) commands.extend(self._manage_power(obs)) commands.extend(self._manage_repairs(obs)) commands.extend(self._manage_economy(obs)) commands.extend(self._manage_expansion(obs)) commands.extend(self._manage_base_building(obs)) commands.extend(self._manage_unit_production(obs)) commands.extend(self._manage_squads(obs)) if not commands: commands.append(CommandModel(action=ActionType.NO_OP)) return OpenRAAction(commands=commands) def _update_enemy_memory(self, obs: OpenRAObservation) -> None: """Update last-seen enemy memory from currently visible enemies/buildings.""" if not (obs.visible_enemies or obs.visible_enemy_buildings): return best = self._pick_priority_target(obs, None, None, local_only=False, squad_name="assault") if best is None: # Fallback to any visible contact if obs.visible_enemy_buildings: b = obs.visible_enemy_buildings[0] self._last_seen_enemy_pos = (b.cell_x, b.cell_y) self._last_seen_enemy_tick = obs.tick self._last_seen_base_pos = (b.cell_x, b.cell_y) self._last_seen_base_tick = obs.tick elif obs.visible_enemies: e = obs.visible_enemies[0] self._last_seen_enemy_pos = (e.cell_x, e.cell_y) self._last_seen_enemy_tick = obs.tick return _, tx, ty, _, kind = best self._last_seen_enemy_pos = (tx, ty) self._last_seen_enemy_tick = obs.tick if kind == "building": self._last_seen_base_pos = (tx, ty) self._last_seen_base_tick = obs.tick # ── Phase ───────────────────────────────────────────────────── def _update_phase(self, obs: OpenRAObservation): has_cy = any(b.type == "fact" for b in obs.buildings) if self.phase == "deploy_mcv" and has_cy: self.phase = "build_base" self._log("Phase -> build_base") elif self.phase == "build_base": has_barracks = any(b.type in BARRACKS_TYPES for b in obs.buildings) if has_barracks: self.phase = "produce" self._log("Phase -> produce") elif self.phase == "produce": combat = [u for u in obs.units if u.type in COMBAT_TYPES] if self._build_index >= len(BUILD_ORDER): self.phase = "active" self._log(f"Phase -> active ({len(combat)} combat units)") # ── Deploy MCV ──────────────────────────────────────────────── def _handle_deploy(self, obs: OpenRAObservation) -> Optional[CommandModel]: if self._deploy_issued: return None mcv = next((u for u in obs.units if u.type == "mcv"), None) if mcv: self._deploy_issued = True self._log(f"Deploying MCV #{mcv.actor_id}") return CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id) return None # ── Building placement ──────────────────────────────────────── def _handle_placement(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] cy = self._find_building(obs, "fact") if not cy: return commands active_queues: set[str] = set() counts = self._building_counts(obs) for prod in obs.production: if self._is_structure_queue(prod.queue_type) and prod.progress >= 0.99: queue_type = prod.queue_type active_queues.add(queue_type) pending = self._placement_pending.get(queue_type) current_count = counts.get(self._canonical_building_type(prod.item), 0) if pending is not None: pending_item, pending_count, pending_tick = pending if pending_item == prod.item: if current_count > pending_count: self._placement_fail_counts[queue_type] = 0 self._placement_pending.pop(queue_type, None) continue if obs.tick < pending_tick + PLACEMENT_CONFIRMATION_DELAY: continue self._placement_fail_counts[queue_type] = self._placement_fail_counts.get(queue_type, 0) + 1 self._placement_pending.pop(queue_type, None) elif pending_item != prod.item: self._placement_pending.pop(queue_type, None) self._placement_fail_counts.pop(queue_type, None) if self._queue_backoff_active(queue_type, obs): if obs.tick >= self._next_placement_attempt_tick.get(queue_type, -9999): commands.append(CommandModel(action=ActionType.CANCEL_PRODUCTION, item_type=prod.item)) self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL continue if self._placement_fail_counts.get(queue_type, 0) >= MAX_FAILED_PLACEMENT_ATTEMPTS: commands.append(CommandModel(action=ActionType.CANCEL_PRODUCTION, item_type=prod.item)) if self._canonical_building_type(prod.item) in NAVAL_STRUCTURE_TYPES: self._naval_retry_buildable_count = self._buildable_area_structure_count(obs) self._naval_disabled_until = obs.tick + NAVAL_FAILURE_DISABLE_TICKS self._cached_naval_gate_ok = False self._last_naval_gate_tick = obs.tick else: self._placement_backoff_until[queue_type] = obs.tick + STRUCTURE_PRODUCTION_RESUME_DELAY self._placement_backoff_snapshot[queue_type] = ( len(obs.buildings), sum(1 for b in obs.buildings if b.type == "fact"), ) self._placement_fail_counts[queue_type] = 0 self._placement_pending.pop(queue_type, None) self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL self._rewind_build_order_after_cancel(obs, prod.item) if self._canonical_building_type(prod.item) in NAVAL_STRUCTURE_TYPES: self._log(f"Canceling {prod.item} after repeated placement failures; disabling naval production") else: self._log(f"Canceling {prod.item} after repeated placement failures; backing off {queue_type} queue") continue if obs.tick < self._next_placement_attempt_tick.get(queue_type, -9999): continue location = self._placement_offset(obs, cy, prod.item) if location is None: self._placement_fail_counts[queue_type] = self._placement_fail_counts.get(queue_type, 0) + 1 self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL continue x, y = location commands.append(CommandModel( action=ActionType.PLACE_BUILDING, item_type=prod.item, target_x=x, target_y=y, )) self._placement_count += 1 self._placement_pending[queue_type] = (prod.item, current_count, obs.tick) self._next_placement_attempt_tick[queue_type] = obs.tick + PLACEMENT_ATTEMPT_INTERVAL for queue_type in list(self._placement_pending): if queue_type not in active_queues: self._placement_pending.pop(queue_type, None) self._placement_fail_counts.pop(queue_type, None) return commands def _placement_offset( self, obs: OpenRAObservation, cy: BuildingInfoModel, item_type: str, ) -> Optional[Tuple[int, int]]: center = self._placement_base_center(obs) if center is None: center = self._building_top_left(cy) center = self._placement_anchor(obs, item_type, center) cx, cy_y = center queue_type = self._structure_queue_type(item_type) retry_index = self._placement_fail_counts.get(queue_type, 0) min_radius = DEFENSE_BUILD_MIN_RADIUS if item_type in ENEMY_FACING_STRUCTURE_TYPES else BASE_BUILD_MIN_RADIUS max_radius = DEFENSE_BUILD_MAX_RADIUS if item_type in ENEMY_FACING_STRUCTURE_TYPES else BASE_BUILD_MAX_RADIUS if item_type not in ENEMY_FACING_STRUCTURE_TYPES: max_radius += min(retry_index * 4, 20) candidates = self._placement_candidates(obs, item_type, cx, cy_y, min_radius, max_radius) if not candidates: return None if item_type == "proc": plan = self._best_refinery_plan(obs) target = plan["target"] if plan is not None else center refineries = [b for b in obs.buildings if b.type == "proc"] candidates.sort( key=lambda p: ( self._resource_amount_at(*p) > 0.0, -self._local_resource_score(p[0], p[1], 4), self._cell_distance(p[0], p[1], target[0], target[1]), -self._nearest_distance_to_buildings(p[0], p[1], refineries), self._cell_distance(p[0], p[1], cx, cy_y), ) ) idx = (self._placement_count + retry_index * 5) % min(len(candidates), 16) if plan is not None and "request_id" in plan: self._consume_requested_refinery(plan["request_id"]) # type: ignore[arg-type] return candidates[idx] if item_type in NAVAL_STRUCTURE_TYPES: candidates.sort( key=lambda p: ( -self._local_water_score(p[0], p[1], 2), self._cell_distance(p[0], p[1], cx, cy_y), ) ) idx = (self._placement_count + retry_index * 5) % min(len(candidates), 16) return candidates[idx] if item_type in ENEMY_FACING_STRUCTURE_TYPES and self._enemy_base_pos is not None: tx, ty = self._enemy_base_pos candidates.sort(key=lambda p: ((p[0] - tx) ** 2 + (p[1] - ty) ** 2, (p[0] - cx) ** 2 + (p[1] - cy_y) ** 2)) idx = (self._placement_count + retry_index * 5) % min(len(candidates), 16) return candidates[idx] idx = (self._placement_count + retry_index * 7) % len(candidates) return candidates[idx] # ── Rally points ────────────────────────────────────────────── def _handle_rally_points(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] cy = self._find_building(obs, "fact") if not cy: return commands rally_x = cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024 rally_y = cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024 for b in obs.buildings: if b.type in ("tent", "barr", "weap") and b.actor_id not in self._rally_set: commands.append(CommandModel( action=ActionType.SET_RALLY_POINT, actor_id=b.actor_id, target_x=rally_x, target_y=rally_y, )) self._rally_set.add(b.actor_id) return commands # ── Base Building (fixed order then dynamic) ────────────────── def _manage_base_building(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] if self.phase == "deploy_mcv": return commands if obs.tick < self._next_build_check_tick: return commands credits = self._available_credits(obs) if credits < PRODUCTION_MIN_CASH_REQUIREMENT: self._schedule_next_build_check(obs, active=False) return commands # Phase 1: follow the fixed build order if self._build_index < len(BUILD_ORDER): item = self._resolve_build_item(obs, BUILD_ORDER[self._build_index]) if item is None: self._schedule_next_build_check(obs, active=False) return commands if not self._structure_queue_available(obs, item): self._schedule_next_build_check(obs, active=False) return commands if self._structure_queue_busy(obs, item): self._schedule_next_build_check(obs, active=False) return commands if self._already_have(obs, item, self._build_index): self._build_index += 1 return commands if self._can_produce(obs, item): cost = self._build_cost(item) if credits >= cost: self._log( f"Building {item} [{self._build_index+1}/{len(BUILD_ORDER)}] " f"({self._credits_str(obs)})" ) commands.append(CommandModel(action=ActionType.BUILD, item_type=item)) self._last_build_tick = obs.tick self._schedule_next_build_check(obs, active=True) self._build_index += 1 else: self._schedule_next_build_check(obs, active=False) else: self._schedule_next_build_check(obs, active=False) return commands # Phase 2: dynamic base building driven by the normal AI priorities. remaining_credits = credits picker = self._choose_recovery_building if self._in_recovery_mode(obs) else self._choose_dynamic_building for queue_type in ("Building", "Defense"): item = picker(obs, queue_type=queue_type) if item is None: continue if not self._structure_queue_available(obs, item): continue if self._structure_queue_busy(obs, item): continue if not self._can_produce(obs, item): continue cost = self._build_cost(item) if remaining_credits < cost: continue self._log(f"Building {item} (dynamic, {self._credits_str(obs)})") commands.append(CommandModel(action=ActionType.BUILD, item_type=item)) remaining_credits -= cost if item == "proc": self._clear_expansion_refinery_need() if commands: self._last_build_tick = obs.tick self._schedule_next_build_check(obs, active=True) else: self._schedule_next_build_check(obs, active=False) return commands def _resolve_build_item(self, obs: OpenRAObservation, placeholder: str) -> Optional[str]: variants = BUILDING_VARIANT_CHOICES.get(placeholder) if variants is not None: for btype in variants: if self._can_produce(obs, btype): return btype return None return placeholder def _already_have(self, obs: OpenRAObservation, item: str, idx: int) -> bool: count = sum(1 for b in obs.buildings if b.type == item) target = sum(1 for i, p in enumerate(BUILD_ORDER[:idx+1]) if self._resolve_build_item(obs, p) == item) return count >= target def _choose_dynamic_building( self, obs: OpenRAObservation, queue_type: str = "Building", ) -> Optional[str]: bldg_counts = self._building_counts(obs) credits = self._available_credits(obs) power_balance = obs.economy.power_provided - obs.economy.power_drained minimum_excess_power = self._minimum_excess_power_target(obs) power_item = self._best_power_building(obs) if queue_type == "Defense": total_buildings = max(1, len(obs.buildings)) candidates = list(BUILDING_FRACTIONS.keys()) random.shuffle(candidates) for item in candidates: if BUILDING_DELAYS.get(item, 0) > obs.tick: continue resolved_item = self._resolve_build_item(obs, item) if resolved_item is None or not self._can_produce(obs, resolved_item): continue canonical_item = self._canonical_building_type(resolved_item) if canonical_item not in DEFENSE_STRUCTURE_TYPES: continue if not self._structure_queue_available(obs, resolved_item): continue count = bldg_counts.get(canonical_item, 0) limit = BUILDING_LIMITS.get(canonical_item) if limit is not None and count >= limit: continue if count * 100 > BUILDING_FRACTIONS[item] * total_buildings: continue return resolved_item return None if power_balance < minimum_excess_power and power_item: return power_item naval_item = self._preferred_early_naval_building(obs, credits) if naval_item is not None: return naval_item if not self._has_adequate_refinery_count(obs): if self._can_produce(obs, "proc"): return "proc" return power_item if ( self._expansion_refinery_pending(obs) and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc") ): return "proc" if ( credits > NEW_PRODUCTION_CASH_THRESHOLD and random.randrange(100) < NEW_PRODUCTION_CHANCE ): production = self._best_production_building(obs) if production: return production if ( credits > NEW_PRODUCTION_CASH_THRESHOLD and random.randrange(100) < NEW_PRODUCTION_CHANCE and self._can_safely_build_naval_structure(obs) ): naval_production = self._best_naval_production_building(obs) if naval_production: return naval_production if ( obs.economy.resource_capacity > 0 and obs.economy.ore >= obs.economy.resource_capacity * SILO_BUILD_THRESHOLD and self._can_produce(obs, "silo") and self._structure_queue_available(obs, "silo") ): return "silo" total_buildings = max(1, len(obs.buildings)) candidates = list(BUILDING_FRACTIONS.keys()) random.shuffle(candidates) for item in candidates: if BUILDING_DELAYS.get(item, 0) > obs.tick: continue resolved_item = self._resolve_build_item(obs, item) if resolved_item is None or not self._can_produce(obs, resolved_item): continue canonical_item = self._canonical_building_type(resolved_item) if canonical_item in NAVAL_STRUCTURE_TYPES or canonical_item in DEFENSE_STRUCTURE_TYPES: continue if not self._structure_queue_available(obs, resolved_item): continue count = bldg_counts.get(canonical_item, 0) limit = BUILDING_LIMITS.get(canonical_item) if limit is not None and count >= limit: continue if count * 100 > BUILDING_FRACTIONS[item] * total_buildings: continue return resolved_item return None def _choose_recovery_building( self, obs: OpenRAObservation, queue_type: str = "Building", ) -> Optional[str]: bldg_counts = self._building_counts(obs) credits = self._available_credits(obs) power_balance = obs.economy.power_provided - obs.economy.power_drained minimum_excess_power = self._minimum_excess_power_target(obs) power_item = self._best_power_building(obs) if queue_type == "Defense": if not self._base_under_pressure(obs): return None defense_count = sum(bldg_counts.get(item, 0) for item in ("ftur", "gun", "pbox")) defense_cap = 1 if self._combat_unit_count(obs) < RECOVERY_EXIT_COMBAT else 2 if defense_count >= defense_cap: return None for item in ("ftur", "gun", "pbox"): if not self._can_produce(obs, item): continue if not self._structure_queue_available(obs, item): continue limit = BUILDING_LIMITS.get(item) if limit is not None and bldg_counts.get(item, 0) >= limit: continue return item return None if power_balance < minimum_excess_power and power_item: return power_item refinery_count = bldg_counts.get("proc", 0) if ( refinery_count == 0 and not self._base_under_pressure(obs) and credits >= RECOVERY_REFINERY_REBUILD_CREDITS and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc") ): return "proc" if not any(b.type in WAR_FACTORY_TYPES for b in obs.buildings): if self._can_produce(obs, "weap") and self._structure_queue_available(obs, "weap"): return "weap" if not any(b.type in BARRACKS_TYPES for b in obs.buildings): barracks = self._resolve_build_item(obs, "barracks") if barracks and self._can_produce(obs, barracks) and self._structure_queue_available(obs, barracks): return barracks if self._base_under_pressure(obs): return power_item if power_balance < 0 and power_item else None return None # ── Unit Production ─────────────────────────────────────────── def _manage_unit_production(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] if self.phase == "deploy_mcv": return commands if obs.tick - self._last_unit_tick < UNIT_FEEDBACK_TIME: return commands credits = self._available_credits(obs) if credits < PRODUCTION_MIN_CASH_REQUIREMENT: return commands requested = self._queue_requested_unit(obs) if requested: self._last_unit_tick = obs.tick commands.append(requested) return commands if not any(self._is_structure_queue(p.queue_type) for p in obs.production): structure_reservation = self._priority_structure_reservation(obs) if structure_reservation > 0 and credits < structure_reservation: return commands self._last_unit_tick = obs.tick for _ in range(len(UNIT_QUEUE_ORDER)): self._current_queue_index = (self._current_queue_index + 1) % len(UNIT_QUEUE_ORDER) queue_type, allowed = UNIT_QUEUE_ORDER[self._current_queue_index] if any(p.queue_type == queue_type for p in obs.production): continue if self._queue_delay_active(obs, queue_type): continue unit = self._pick_unit(obs, allowed) if unit: commands.append(CommandModel(action=ActionType.TRAIN, item_type=unit)) self._mark_unit_trained(obs, unit, queue_type) break return commands def _pick_unit(self, obs: OpenRAObservation, allowed: set[str]) -> Optional[str]: unit_counts: dict[str, int] = {} total_units = 0 for u in obs.units: unit_counts[u.type] = unit_counts.get(u.type, 0) + 1 if u.type in UNITS_TO_BUILD: total_units += 1 for p in obs.production: unit_counts[p.item] = unit_counts.get(p.item, 0) + 1 if p.item in UNITS_TO_BUILD: total_units += 1 for item in self._unit_requests: unit_counts[item] = unit_counts.get(item, 0) + 1 if item in UNITS_TO_BUILD: total_units += 1 desired: Optional[str] = None desired_error = float("inf") candidates = list(allowed) random.shuffle(candidates) for utype in candidates: if utype not in allowed: continue if not self._can_produce(obs, utype): continue if self._unit_at_limit(obs, utype): continue share = self._desired_unit_share(obs, utype, unit_counts) if share <= 0: continue count = unit_counts.get(utype, 0) error = (count * 100 / total_units - share) if total_units > 0 else -1 if error < 0: return utype if error < desired_error: desired_error = error desired = utype return desired # ── Economy ────────────────────────────────────────────────── def _manage_economy(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] if self.phase == "deploy_mcv": return commands if obs.tick - self._last_harvester_scan_tick < HARVESTER_SCAN_INTERVAL: return commands self._last_harvester_scan_tick = obs.tick reassignment_commands, redirected_harvesters = self._reassign_low_effect_harvesters(obs) commands.extend(reassignment_commands) patch_states = self._resource_patch_states(obs) for u in obs.units: if u.type != "harv": continue self._update_harvester_progress(obs, u) no_resource_target = self._harvester_patch_targets.get(u.actor_id) if ( no_resource_target is not None and self._local_resource_score(no_resource_target[0], no_resource_target[1], 2) <= HARVESTER_LOCAL_RESOURCE_MIN ): self._harvester_no_resource_until[u.actor_id] = obs.tick + HARVESTER_NO_RESOURCE_COOLDOWN self._harvester_patch_targets.pop(u.actor_id, None) if u.actor_id in redirected_harvesters: continue recent_damage = self._harvester_recently_damaged(obs, u.actor_id) threat = self._nearest_enemy_to_unit(obs, u, HARVESTER_THREAT_RADIUS) if threat is not None: self._last_contact_tick = obs.tick current_target = self._harvester_patch_targets.get(u.actor_id) if current_target is not None: threatened_state = self._nearest_patch_state( patch_states, current_target[0], current_target[1], HARVESTER_PATCH_ASSIGN_RADIUS, allow_fallback=True, ) if threatened_state is not None and int(threatened_state["threat"]) > 0: self._harvester_patch_targets.pop(u.actor_id, None) if threat is not None or recent_damage: fallback = self._pick_harvester_retreat_point(obs, u, patch_states=patch_states) if fallback is not None and ( threat is not None or self._cell_distance(u.cell_x, u.cell_y, fallback[0], fallback[1]) > 2 ): commands.append(CommandModel( action=ActionType.MOVE, actor_id=u.actor_id, target_x=fallback[0], target_y=fallback[1], )) self._harvester_retreat_until[u.actor_id] = obs.tick + HARVESTER_RETREAT_COOLDOWN self._harvester_reassign_until[u.actor_id] = obs.tick + HARVESTER_REASSIGN_COOLDOWN self._harvester_last_progress_tick[u.actor_id] = obs.tick continue if self._is_low_effect_harvester(obs, u): fallback_target = self._fallback_harvest_target( obs, u, patch_states=patch_states, prefer_safe=recent_damage, exclude_target=self._harvester_patch_targets.get(u.actor_id), ) if fallback_target is not None: commands.append(CommandModel( action=ActionType.HARVEST, actor_id=u.actor_id, target_x=fallback_target[0], target_y=fallback_target[1], )) self._harvester_patch_targets[u.actor_id] = fallback_target self._harvester_reassign_until[u.actor_id] = obs.tick + HARVESTER_REASSIGN_COOLDOWN self._harvester_last_progress_tick[u.actor_id] = obs.tick redirected_harvesters.add(u.actor_id) continue self._harvester_no_resource_until[u.actor_id] = obs.tick + HARVESTER_NO_RESOURCE_COOLDOWN if u.is_idle: target = self._harvester_patch_targets.get(u.actor_id) target_state = None if target is not None: target_state = self._nearest_patch_state( patch_states, target[0], target[1], HARVESTER_PATCH_ASSIGN_RADIUS, allow_fallback=True, ) if ( target is not None and self._local_resource_score(target[0], target[1], 2) > HARVESTER_LOCAL_RESOURCE_MIN and ( target_state is None or (int(target_state["threat"]) == 0 and float(target_state["depletion_ratio"]) < 0.95) ) ): commands.append( CommandModel( action=ActionType.HARVEST, actor_id=u.actor_id, target_x=target[0], target_y=target[1], ) ) self._harvester_last_progress_tick[u.actor_id] = obs.tick else: self._harvester_patch_targets.pop(u.actor_id, None) target = self._fallback_harvest_target( obs, u, patch_states=patch_states, prefer_safe=recent_damage or self._base_under_pressure(obs), ) if target is not None: commands.append( CommandModel( action=ActionType.HARVEST, actor_id=u.actor_id, target_x=target[0], target_y=target[1], ) ) self._harvester_patch_targets[u.actor_id] = target self._harvester_last_progress_tick[u.actor_id] = obs.tick else: commands.append(CommandModel(action=ActionType.HARVEST, actor_id=u.actor_id)) self._harvester_last_progress_tick[u.actor_id] = obs.tick self._ensure_harvester_requests(obs) return commands # ── Expansion ──────────────────────────────────────────────── def _manage_expansion(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] if self.phase == "deploy_mcv" or self._in_recovery_mode(obs): return commands if obs.tick - self._last_mcv_build_tick >= BUILD_MCV_INTERVAL: self._last_mcv_build_tick = obs.tick self._ensure_mcv_requests(obs) if obs.tick - self._last_mcv_scan_tick < SCAN_FOR_NEW_MCV_INTERVAL: return commands self._last_mcv_scan_tick = obs.tick undeploy_command = self._maybe_undeploy_conyard_for_expansion(obs) if undeploy_command is not None: commands.append(undeploy_command) conyards = [b for b in obs.buildings if b.type == "fact"] active_mcvs = [u for u in obs.units if u.type == "mcv"] if len(conyards) >= MINIMUM_CONSTRUCTION_YARD_COUNT and not active_mcvs: return commands for mcv in active_mcvs: if obs.tick < self._mcv_deploy_until.get(mcv.actor_id, -9999): continue target = self._mcv_targets.get(mcv.actor_id) resource_target = self._mcv_resource_targets.get(mcv.actor_id) if target is not None and self._cell_distance(mcv.cell_x, mcv.cell_y, *target) <= MCV_TARGET_REACHED_RADIUS: if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y): self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})") self._remember_requested_refinery( obs, mcv.actor_id, (mcv.cell_x, mcv.cell_y), resource_target or target, ) self._remember_expansion_refinery_need(obs) self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id)) self._mcv_targets.pop(mcv.actor_id, None) self._mcv_resource_targets.pop(mcv.actor_id, None) continue self._mcv_targets.pop(mcv.actor_id, None) self._mcv_resource_targets.pop(mcv.actor_id, None) target = None if not mcv.is_idle: continue if target is None: expansion_target = self._pick_expansion_target(obs) if expansion_target is None: if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y): self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})") self._remember_requested_refinery( obs, mcv.actor_id, (mcv.cell_x, mcv.cell_y), resource_target or (mcv.cell_x, mcv.cell_y), ) self._remember_expansion_refinery_need(obs) self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id)) self._mcv_resource_targets.pop(mcv.actor_id, None) continue target = self._best_mcv_deploy_target(obs, mcv, expansion_target) if target is None: if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y): self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})") self._remember_requested_refinery( obs, mcv.actor_id, (mcv.cell_x, mcv.cell_y), resource_target or expansion_target, ) self._remember_expansion_refinery_need(obs) self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id)) self._mcv_resource_targets.pop(mcv.actor_id, None) continue self._mcv_targets[mcv.actor_id] = target self._mcv_resource_targets[mcv.actor_id] = expansion_target self._log(f"Dispatching MCV #{mcv.actor_id} -> {target} for expansion {expansion_target}") if self._cell_distance(mcv.cell_x, mcv.cell_y, *target) <= MCV_TARGET_REACHED_RADIUS: if self._can_mcv_deploy_at(obs, mcv.cell_x, mcv.cell_y): self._log(f"Deploying expansion MCV #{mcv.actor_id} at ({mcv.cell_x}, {mcv.cell_y})") self._remember_requested_refinery( obs, mcv.actor_id, (mcv.cell_x, mcv.cell_y), resource_target or target, ) self._remember_expansion_refinery_need(obs) self._mcv_deploy_until[mcv.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN commands.append(CommandModel(action=ActionType.DEPLOY, actor_id=mcv.actor_id)) self._mcv_targets.pop(mcv.actor_id, None) self._mcv_resource_targets.pop(mcv.actor_id, None) continue commands.append(CommandModel( action=ActionType.MOVE, actor_id=mcv.actor_id, target_x=target[0], target_y=target[1], )) return commands def _ensure_harvester_requests(self, obs: OpenRAObservation): target = self._harvester_target(obs) current = sum(1 for u in obs.units if u.type == "harv") current += sum(1 for p in obs.production if p.item == "harv") current += self._requested_production_count("harv") if current < target: self._request_unit_production("harv") def _hold_attack_for_economy(self, obs: OpenRAObservation) -> bool: if self._base_under_pressure(obs): return False if self._current_requested_refinery(obs) is not None or self._expansion_refinery_pending(obs): return True if self._tech_anchor_pending(obs) and self._combat_unit_count(obs) < TECH_ATTACK_HOLD_FORCE: return True expanding = bool(self._mcv_targets) or any(u.type == "mcv" for u in obs.units) expanding = expanding or any(p.item == "mcv" for p in obs.production) if expanding and self._combat_unit_count(obs) < ECONOMY_ATTACK_HOLD_FORCE: return True return False def _tech_anchor_pending(self, obs: OpenRAObservation) -> bool: if not self._can_produce(obs, "dome"): return False if any(self._canonical_building_type(b.type) == "dome" for b in obs.buildings): return False if any(p.item == "dome" for p in obs.production): return False return True def _request_unit_production(self, item_type: str): if self._requested_production_count(item_type) == 0: self._unit_requests.append(item_type) self._log(f"Requesting {item_type} production") def _requested_production_count(self, item_type: str) -> int: return sum(1 for item in self._unit_requests if item == item_type) def _queue_requested_unit(self, obs: OpenRAObservation) -> Optional[CommandModel]: while self._unit_requests: item_type = self._unit_requests.pop(0) if item_type == "harv": current = self._current_unit_count(obs, "harv") pending = sum(1 for p in obs.production if p.item == "harv") queued = self._requested_production_count("harv") if current + pending + queued >= self._harvester_target(obs): continue if self._should_delay_harvester_request(obs, current): return None queue_type = self._queue_type_for_unit(item_type) if queue_type is None: continue if item_type not in {"harv", "mcv"} and self._queue_delay_active(obs, queue_type): return None if item_type not in {"harv", "mcv"} and self._unit_delay_active(obs, item_type): return None if any(p.queue_type == queue_type for p in obs.production): return None if not self._can_produce(obs, item_type): return None if not self._production_support_available(obs, item_type): return None if self._unit_at_limit(obs, item_type): continue self._mark_unit_trained(obs, item_type, queue_type) self._log(f"Training {item_type} (requested)") return CommandModel(action=ActionType.TRAIN, item_type=item_type) return None # ── Squads ──────────────────────────────────────────────────── def _manage_squads(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] if self.phase in ("deploy_mcv", "build_base"): return commands self._temporary_defenders = set() if obs.tick - self._last_assign_tick >= ASSIGN_ROLES_INTERVAL: self._last_assign_tick = obs.tick self._assign_squad_roles(obs) commands.extend(self._handle_defense(obs)) if obs.tick - self._last_attack_eval_tick >= ATTACK_FORCE_INTERVAL: self._last_attack_eval_tick = obs.tick commands.extend(self._handle_attack(obs)) commands.extend(self._manage_unit_stances(obs)) return commands def _assign_squad_roles(self, obs: OpenRAObservation): combat_units = [ u for u in obs.units if u.type not in EXCLUDE_FROM_SQUADS and u.type in COMBAT_TYPES ] air_ids = [u.actor_id for u in combat_units if u.type in AIRCRAFT_TYPES] naval_ids = [u.actor_id for u in combat_units if u.type in SHIP_TYPES] ground_units = [u for u in combat_units if u.type not in AIRCRAFT_TYPES | SHIP_TYPES] ground_by_id = {u.actor_id: u for u in ground_units} self._air_squad = air_ids self._naval_squad = naval_ids # Keep squad membership persistent instead of re-slicing ground units every assign tick. self._attack_squad = [uid for uid in self._attack_squad if uid in ground_by_id] self._rush_squad = [uid for uid in self._rush_squad if uid in ground_by_id] self._idle_ground_units = [ uid for uid in self._idle_ground_units if uid in ground_by_id and uid not in set(self._attack_squad) and uid not in set(self._rush_squad) ] self._protection_squad = [ uid for uid in self._protection_squad if uid in ground_by_id and uid not in set(self._attack_squad) and uid not in set(self._rush_squad) ] assigned_ground = set(self._attack_squad) | set(self._rush_squad) | set(self._idle_ground_units) | set(self._protection_squad) unassigned_ground = [u for u in ground_units if u.actor_id not in assigned_ground] base_center = self._base_center(obs) if base_center is not None: unassigned_ground.sort( key=lambda u: self._cell_distance(u.cell_x, u.cell_y, base_center[0], base_center[1]) ) else: unassigned_ground.sort(key=lambda u: u.actor_id) self._idle_ground_units.extend(u.actor_id for u in unassigned_ground) if self.phase != "active": return # OpenRA keeps rush squads separate from assault squads and periodically # moves all idle base units into the rush squad when the rush trigger fires. total_ground_troops = len(self._idle_ground_units) + len(self._attack_squad) + len(self._rush_squad) + len(self._protection_squad) rush_units = [ground_by_id[uid] for uid in self._idle_ground_units if uid in ground_by_id] hold_for_economy = self._hold_attack_for_economy(obs) if ( not hold_for_economy and not self._base_under_pressure(obs) and obs.tick - self._last_rush_tick >= RUSH_INTERVAL and total_ground_troops >= SQUAD_SIZE and rush_units ): rush_target = self._select_rush_target(obs, rush_units) if rush_target is not None: target_x, target_y, target_actor_id, target_kind = rush_target launched = list(self._idle_ground_units) existing_rush = set(self._rush_squad) self._rush_squad.extend(uid for uid in launched if uid not in existing_rush) self._idle_ground_units = [] self._last_rush_tick = obs.tick self._set_squad_target("rush", target_actor_id, target_x, target_y, target_kind) self._enemy_base_pos = (target_x, target_y) self._clear_search_target() self._reset_stale_attack_target() self._log(f"Launching rush wave ({len(launched)} units) -> {(target_x, target_y)}") # Launch a fresh assault wave from idle base units once enough have accumulated. if not self._base_under_pressure(obs): self._assault_threshold = self._roll_assault_threshold() if ( not hold_for_economy and not self._base_under_pressure(obs) and len(self._idle_ground_units) >= self._assault_threshold ): launched = list(self._idle_ground_units) existing_attack = set(self._attack_squad) self._attack_squad.extend(uid for uid in launched if uid not in existing_attack) self._idle_ground_units = [] self._assault_threshold = self._roll_assault_threshold() self._log(f"Launching assault wave ({len(launched)} units)") def _squad_units(self, obs: OpenRAObservation, squad_ids: list[int]) -> list[UnitInfoModel]: alive = {u.actor_id: u for u in obs.units} return [alive[uid] for uid in squad_ids if uid in alive] def _set_squad_state(self, squad_name: str, state: str, until: Optional[int] = None): self._squad_states[squad_name] = state if until is None: self._squad_state_until.pop(squad_name, None) else: self._squad_state_until[squad_name] = until def _current_squad_state(self, obs: OpenRAObservation, squad_name: str) -> str: state = self._squad_states.get(squad_name, "assemble") hold_until = self._squad_state_until.get(squad_name, -9999) if state in {"retreat", "recover"} and hold_until > obs.tick: return state if state in {"retreat", "recover"} and hold_until <= obs.tick: self._set_squad_state(squad_name, "assemble") return "assemble" return state def _assemble_squad_commands( self, obs: OpenRAObservation, squad_name: str, squad_units: list[UnitInfoModel], ) -> List[CommandModel]: if not squad_units or squad_name == "naval": return [] anchor = self._base_center(obs) if anchor is None: leader = self._select_squad_leader(squad_units) anchor = (leader.cell_x, leader.cell_y) commands: list[CommandModel] = [] redirected = 0 for unit in squad_units: if self._cell_distance(unit.cell_x, unit.cell_y, anchor[0], anchor[1]) <= REGROUP_RADIUS: continue commands.append(CommandModel( action=ActionType.ATTACK_MOVE, actor_id=unit.actor_id, target_x=anchor[0], target_y=anchor[1], )) redirected += 1 if redirected: self._log(f"Assembling {squad_name} squad ({redirected}/{len(squad_units)})") return commands def _emergency_defense_units( self, obs: OpenRAObservation, needed: int, threat: Optional[UnitInfoModel] = None, ) -> list[UnitInfoModel]: if needed <= 0: return [] alive = {u.actor_id: u for u in obs.units} reserve_ids = set(self._protection_squad) candidates = [ alive[uid] for uid in self._rush_squad + self._attack_squad if uid in alive and uid not in reserve_ids and alive[uid].can_attack and ( threat is None or self._cell_distance(alive[uid].cell_x, alive[uid].cell_y, threat.cell_x, threat.cell_y) <= PROTECT_UNIT_SCAN_RADIUS ) ] if threat is not None: base_center = self._base_center(obs) or (threat.cell_x, threat.cell_y) candidates.sort( key=lambda u: ( self._cell_distance(u.cell_x, u.cell_y, threat.cell_x, threat.cell_y), self._cell_distance(u.cell_x, u.cell_y, base_center[0], base_center[1]), ) ) else: base_center = self._base_center(obs) if base_center is not None: candidates.sort( key=lambda u: self._cell_distance(u.cell_x, u.cell_y, base_center[0], base_center[1]) ) return candidates[:needed] def _can_cover_protection_threat( self, unit: UnitInfoModel, threat: UnitInfoModel, ) -> bool: return self._cell_distance(unit.cell_x, unit.cell_y, threat.cell_x, threat.cell_y) <= PROTECT_UNIT_SCAN_RADIUS def _manage_unit_stances(self, obs: OpenRAObservation) -> List[CommandModel]: protection_ids = set(self._protection_squad) | set(self._temporary_defenders) commands: list[CommandModel] = [] for unit in obs.units: if unit.type not in COMBAT_TYPES or not unit.can_attack: continue desired_stance = STANCE_DEFEND if unit.actor_id in protection_ids else STANCE_ATTACK_ANYTHING if unit.stance == desired_stance: continue commands.append(CommandModel( action=ActionType.SET_STANCE, actor_id=unit.actor_id, target_x=desired_stance, )) return commands def _select_ground_squad_leader( self, squad_name: str, squad_units: list[UnitInfoModel], force_new: bool = False, ) -> UnitInfoModel: leader_id = self._squad_leader_id.get(squad_name, 0) if not force_new and leader_id > 0: for unit in squad_units: if unit.actor_id == leader_id: return unit leader = self._select_squad_leader(squad_units) self._squad_leader_id[squad_name] = leader.actor_id return leader def _unregister_ground_squad( self, squad_name: str, squad_units: list[UnitInfoModel], ) -> None: unit_ids = [u.actor_id for u in squad_units] if squad_name == "assault": self._attack_squad = [] elif squad_name == "rush": self._rush_squad = [] existing = set(self._idle_ground_units) self._idle_ground_units.extend(uid for uid in unit_ids if uid not in existing) self._squad_leader_id.pop(squad_name, None) self._clear_squad_target(squad_name) self._set_squad_state(squad_name, "assemble") def _ground_flee_commands( self, obs: OpenRAObservation, squad_name: str, squad_units: list[UnitInfoModel], ) -> List[CommandModel]: fallback = self._random_own_building_cell(obs) commands: list[CommandModel] = [] if fallback is not None: tx, ty = fallback for unit in squad_units: commands.append(CommandModel( action=ActionType.MOVE, actor_id=unit.actor_id, target_x=tx, target_y=ty, )) self._unregister_ground_squad(squad_name, squad_units) return commands def _handle_field_squad( self, obs: OpenRAObservation, squad_name: str, squad_units: list[UnitInfoModel], minimum_commitment: int, rush: bool, ) -> List[CommandModel]: commands: list[CommandModel] = [] if not squad_units: self._set_squad_state(squad_name, "assemble") return commands if squad_name not in self._squad_regroup_count: self._squad_regroup_count[squad_name] = 0 if squad_name not in self._squad_last_commit_tick: self._squad_last_commit_tick[squad_name] = -9999 state = self._current_squad_state(obs, squad_name) literal_ground = squad_name in {"assault", "rush"} if literal_ground and state in {"retreat", "recover"}: state = "assemble" self._set_squad_state(squad_name, "assemble") leader = ( self._select_ground_squad_leader(squad_name, squad_units) if literal_ground else self._select_squad_leader(squad_units) ) local_enemy_units = self._visible_enemy_units_near(obs, leader.cell_x, leader.cell_y, LOCAL_FIGHT_RADIUS) local_enemy_buildings = self._visible_enemy_buildings_near(obs, leader.cell_x, leader.cell_y, LOCAL_FIGHT_RADIUS) if local_enemy_units or local_enemy_buildings: self._last_contact_tick = obs.tick self._reset_stale_attack_target() should_flee = not self._should_take_local_fight( squad_units, local_enemy_units, local_enemy_buildings, rush=rush or squad_name in {"air", "naval"}, cautious=state == "recover", squad_name=squad_name, ) if ( should_flee and squad_name in {"assault", "rush"} and state != "assemble" and self._has_own_building_near(obs, leader.cell_x, leader.cell_y, LOCAL_FIGHT_RADIUS) ): should_flee = False if should_flee and literal_ground: return self._ground_flee_commands(obs, squad_name, squad_units) if should_flee: retreat_commands = self._retreat_squad_commands(obs, squad_units, leader) if retreat_commands: self._clear_squad_target(squad_name) self._set_squad_state(squad_name, "retreat", obs.tick + SQUAD_RETREAT_HOLD_TICKS) self._squad_regroup_count[squad_name] = 0 return retreat_commands priority_target = self._pick_priority_target( obs, leader.cell_x, leader.cell_y, local_only=True, squad_name=squad_name, ) if priority_target is not None: self._set_squad_state(squad_name, "commit") self._set_squad_target( squad_name, priority_target[0], priority_target[1], priority_target[2], priority_target[4], ) if not literal_ground: focus_commands = self._focus_fire_commands(squad_units, priority_target) if focus_commands: return focus_commands if state == "retreat": if literal_ground: return self._ground_flee_commands(obs, squad_name, squad_units) retreat_commands = self._retreat_squad_commands(obs, squad_units, leader) if retreat_commands: return retreat_commands self._set_squad_state(squad_name, "recover", obs.tick + SQUAD_RECOVER_HOLD_TICKS) self._squad_regroup_count[squad_name] = 0 if state == "recover" and not (local_enemy_units or local_enemy_buildings): return self._assemble_squad_commands(obs, squad_name, squad_units) # Precompute a global force trigger (unconditional periodic wave) to avoid early exit on minimum_commitment force_global = False force_commit = False if squad_name in {"assault", "rush"} and squad_units: if obs.tick - self._last_attack_tick >= FORCE_COMMIT_GLOBAL_INTERVAL: force_global = True force_commit = True if len(squad_units) < minimum_commitment and not force_global: self._set_squad_state(squad_name, "assemble") if squad_name == "assault": self._assault_threshold = self._roll_assault_threshold() self._squad_regroup_count[squad_name] = 0 return self._assemble_squad_commands(obs, squad_name, squad_units) if squad_name == "naval" and not ( local_enemy_units or local_enemy_buildings or obs.visible_enemies or obs.visible_enemy_buildings ): self._set_squad_state(squad_name, "assemble") return commands # Continue evaluating other force-commit paths (regroup/time-based); retain any global trigger from above if squad_name in {"assault", "rush"}: regroup_attempts = self._squad_regroup_count.get(squad_name, 0) last_commit_tick = self._squad_last_commit_tick.get(squad_name, -9999) time_since_commit = obs.tick - last_commit_tick if ( regroup_attempts >= FORCE_COMMIT_REGROUPS and len(squad_units) >= FORCE_COMMIT_UNIT_THRESHOLD and not local_enemy_units and not local_enemy_buildings and time_since_commit >= FORCE_COMMIT_COOLDOWN and not self._base_under_pressure(obs) ): force_commit = True # Timed fallback push toward known target (mirror NormalAI periodic wave) if ( not force_commit and len(squad_units) >= max(FORCE_COMMIT_MIN_SIZE, minimum_commitment) and time_since_commit >= FORCE_COMMIT_TIME ): force_commit = True # Hard periodic global wave aligned to NormalAI cadence if ( not force_commit # remove size gating to mirror C# periodic pushes and len(squad_units) >= 1 and obs.tick - self._last_attack_tick >= FORCE_COMMIT_GLOBAL_INTERVAL ): force_commit = True force_global = True regroup_commands = self._regroup_squad_commands( squad_units, leader, regroup_radius=max(1, len(squad_units) // 3) if literal_ground else REGROUP_RADIUS, min_close_units=len(squad_units) if literal_ground else None, circular=literal_ground, ) if regroup_commands and not force_global: self._set_squad_state(squad_name, "regroup") self._squad_regroup_count[squad_name] = self._squad_regroup_count.get(squad_name, 0) + 1 return regroup_commands self._set_squad_state(squad_name, "commit") target = self._find_attack_target_info(obs, leader.cell_x, leader.cell_y, squad_name=squad_name) if target is None: fallback = self._enemy_base_pos or (self._get_map_size()[0] // 2, self._get_map_size()[1] // 2) tx, ty = fallback target_actor_id = 0 target_kind = "point" else: tx, ty, target_actor_id, target_kind = target if ( not local_enemy_units and not local_enemy_buildings and self._squad_is_stuck(obs, squad_name, leader, (tx, ty)) ): self._log(f"{squad_name.capitalize()} squad stuck near ({tx}, {ty}); clearing target and reevaluating") self._clear_squad_target(squad_name) if squad_name == "rush" and self._rush_target_pos == (tx, ty): self._rush_target_pos = None self._rush_target_actor_id = 0 self._rush_target_kind = "point" if self._enemy_base_pos == (tx, ty): self._enemy_base_pos = None if target_kind == "search": self._clear_search_target() self._reset_stale_attack_target() tx, ty, target_actor_id, target_kind = self._find_attack_target_info( obs, leader.cell_x, leader.cell_y, squad_name=squad_name, ) if squad_name in {"assault", "rush"}: self._track_stale_attack_target(obs, leader, tx, ty) if (force_commit or force_global) and squad_name == "assault": attackers = squad_units else: attackers = self._attack_wave_units(obs, squad_units) if squad_name == "assault" else squad_units direct_attack = self._target_actor_is_visible(obs, target_actor_id, target_kind) if literal_ground: direct_attack = False for unit in attackers: if direct_attack and (not unit.can_attack or self._busy_attacking(unit)): continue commands.append(CommandModel( action=ActionType.ATTACK if direct_attack else ActionType.ATTACK_MOVE, actor_id=unit.actor_id, target_actor_id=target_actor_id if direct_attack else 0, target_x=tx, target_y=ty, )) if commands and not direct_attack: self._record_attack_issue( direct_attack=False, command_count=len(commands), target_actor_id=target_actor_id, target_kind=target_kind, ) if literal_ground: self._last_attack_tick = obs.tick self._squad_regroup_count[squad_name] = 0 self._squad_last_commit_tick[squad_name] = obs.tick elif commands: self._record_attack_issue( direct_attack=True, command_count=len(commands), target_actor_id=target_actor_id, target_kind=target_kind, ) self._log(f"{squad_name.capitalize()} squad: {len(commands)} units attacking {target_kind} at ({tx}, {ty})") self._last_attack_tick = obs.tick self._squad_regroup_count[squad_name] = 0 self._squad_last_commit_tick[squad_name] = obs.tick return commands def _recruit_protection_units( self, obs: OpenRAObservation, threat: UnitInfoModel, needed: int, ) -> list[UnitInfoModel]: if needed <= 0: return [] alive = {u.actor_id: u for u in obs.units} candidates = [ alive[uid] for uid in self._idle_ground_units if uid in alive and alive[uid].can_attack and alive[uid].type not in AIRCRAFT_TYPES | SHIP_TYPES and self._can_cover_protection_threat(alive[uid], threat) ] candidates.sort( key=lambda u: ( self._cell_distance(u.cell_x, u.cell_y, threat.cell_x, threat.cell_y), self._cell_distance(u.cell_x, u.cell_y, *(self._base_center(obs) or (u.cell_x, u.cell_y))), ) ) selected = candidates[:needed] if not selected: return [] selected_ids = {u.actor_id for u in selected} existing = set(self._protection_squad) self._protection_squad.extend(uid for uid in selected_ids if uid not in existing) self._idle_ground_units = [uid for uid in self._idle_ground_units if uid not in selected_ids] return selected def _random_own_building_cell(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]: if not obs.buildings: return self._base_center(obs) building = random.choice(obs.buildings) return self._actor_cell(building) def _release_protection_squad( self, obs: OpenRAObservation, squad_units: list[UnitInfoModel], reason: str, ) -> List[CommandModel]: fallback = self._random_own_building_cell(obs) commands: list[CommandModel] = [] if fallback is not None: tx, ty = fallback for unit in squad_units: commands.append(CommandModel( action=ActionType.MOVE, actor_id=unit.actor_id, target_x=tx, target_y=ty, )) if squad_units: self._log(f"Releasing protection squad ({reason})") self._protection_squad = [] self._temporary_defenders = set() self._clear_squad_target("protection") self._clear_protection_response() self._set_squad_state("protection", "assemble") self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS return commands def _clear_protection_response(self) -> None: self._protect_from_actor_id = 0 self._protect_from_kind = "point" self._protect_from_point = None self._protect_from_until = -9999 def _best_visible_protection_response( self, obs: OpenRAObservation, x: int, y: int, ) -> Optional[Tuple[int, int, int, str]]: best: Optional[tuple[tuple[int, int, int], Tuple[int, int, int, str]]] = None for enemy in self._visible_enemy_units_near(obs, x, y, PROTECT_UNIT_SCAN_RADIUS): if not enemy.can_attack: continue dist = self._cell_distance(x, y, enemy.cell_x, enemy.cell_y) priority = TARGET_UNIT_PRIORITY.get(enemy.type, 30) key = (dist, -priority, -int((1.0 - enemy.hp_percent) * 100)) candidate = (enemy.cell_x, enemy.cell_y, enemy.actor_id, "unit") if best is None or key < best[0]: best = (key, candidate) for building in self._visible_enemy_buildings_near(obs, x, y, PROTECT_UNIT_SCAN_RADIUS): if not self._building_can_attack(building): continue bx, by = self._actor_cell(building) dist = self._cell_distance(x, y, bx, by) priority = TARGET_BUILDING_PRIORITY.get(building.type, 40) key = (dist, -priority, -int((1.0 - building.hp_percent) * 100)) candidate = (bx, by, building.actor_id, "building") if best is None or key < best[0]: best = (key, candidate) return best[1] if best is not None else None def _remember_protection_response( self, obs: OpenRAObservation, x: int, y: int, ) -> None: if obs.tick < self._respond_to_attack_cooldown_until: return candidate = self._best_visible_protection_response(obs, x, y) if candidate is None: return tx, ty, target_actor_id, target_kind = candidate self._protect_from_actor_id = target_actor_id self._protect_from_kind = target_kind self._protect_from_point = (tx, ty) self._protect_from_until = obs.tick + 1 self._respond_to_attack_cooldown_until = obs.tick + RESPOND_TO_ATTACK_COOLDOWN def _current_protection_response( self, obs: OpenRAObservation, ) -> Optional[Tuple[int, int, int, str]]: if obs.tick > self._protect_from_until: self._clear_protection_response() return None if self._protect_from_actor_id > 0: actor = self._visible_actor_by_target(obs, self._protect_from_actor_id, self._protect_from_kind) if actor is not None: tx, ty = self._actor_cell(actor) self._protect_from_point = (tx, ty) return tx, ty, self._protect_from_actor_id, self._protect_from_kind if self._protect_from_point is None: return None candidate = self._best_visible_protection_response(obs, self._protect_from_point[0], self._protect_from_point[1]) if candidate is not None: tx, ty, target_actor_id, target_kind = candidate self._protect_from_actor_id = target_actor_id self._protect_from_kind = target_kind self._protect_from_point = (tx, ty) return candidate self._clear_protection_response() return None def _retarget_nearby_squads_to_protection( self, obs: OpenRAObservation, tx: int, ty: int, target_actor_id: int, target_kind: str, ) -> None: for squad_name, squad_ids in ( ("assault", self._attack_squad), ("rush", self._rush_squad), ("air", self._air_squad), ("naval", self._naval_squad), ): squad_units = self._squad_units(obs, squad_ids) if not squad_units or not any(u.can_attack for u in squad_units): continue leader = self._select_squad_leader(squad_units) if self._cell_distance(leader.cell_x, leader.cell_y, tx, ty) > PROTECT_UNIT_SCAN_RADIUS: continue self._set_squad_target(squad_name, target_actor_id, tx, ty, target_kind) def _find_closest_protection_target( self, obs: OpenRAObservation, leader: UnitInfoModel, ) -> Optional[Tuple[int, int, int, str]]: best: Optional[Tuple[tuple[int, int, int], Tuple[int, int, int, str]]] = None for enemy in self._visible_enemy_units_near(obs, leader.cell_x, leader.cell_y, PROTECTION_SCAN_RADIUS): dist = self._cell_distance(leader.cell_x, leader.cell_y, enemy.cell_x, enemy.cell_y) priority = TARGET_UNIT_PRIORITY.get(enemy.type, 30 if enemy.can_attack else 10) key = (dist, -priority, -int((1.0 - enemy.hp_percent) * 100)) candidate = (enemy.cell_x, enemy.cell_y, enemy.actor_id, "unit") if best is None or key < best[0]: best = (key, candidate) for building in self._visible_enemy_buildings_near(obs, leader.cell_x, leader.cell_y, PROTECTION_SCAN_RADIUS): dist = self._cell_distance(leader.cell_x, leader.cell_y, building.cell_x, building.cell_y) priority = TARGET_BUILDING_PRIORITY.get(building.type, 40) key = (dist, -priority, -int((1.0 - building.hp_percent) * 100)) candidate = (building.cell_x, building.cell_y, building.actor_id, "building") if best is None or key < best[0]: best = (key, candidate) return best[1] if best is not None else None def _pick_protection_threat( self, obs: OpenRAObservation, threat_enemies: list[UnitInfoModel], ) -> Optional[UnitInfoModel]: if not threat_enemies: return None protected_points = self._protected_points(obs) best: Optional[tuple[tuple[int, int, int, int], UnitInfoModel]] = None for enemy in threat_enemies: priority = TARGET_UNIT_PRIORITY.get(enemy.type, 30 if enemy.can_attack else 10) nearest_protected = min( ( self._cell_distance(enemy.cell_x, enemy.cell_y, px, py) for px, py, _ in protected_points ), default=0, ) key = ( nearest_protected, -int(enemy.can_attack), -priority, -int((1.0 - enemy.hp_percent) * 100), ) if best is None or key < best[0]: best = (key, enemy) return best[1] if best is not None else None def _handle_defense(self, obs: OpenRAObservation) -> List[CommandModel]: response_target = self._current_protection_response(obs) protection_units = self._squad_units(obs, self._protection_squad) threat: Optional[UnitInfoModel] = None if response_target is not None and response_target[3] == "unit": threat = next((enemy for enemy in obs.visible_enemies if enemy.actor_id == response_target[2]), None) if threat is not None and response_target is not None: tx, ty, target_actor_id, target_kind = response_target self._retarget_nearby_squads_to_protection(obs, tx, ty, target_actor_id, target_kind) alive_units = {u.actor_id: u for u in obs.units} idle_defender_count = sum( 1 for uid in self._idle_ground_units if uid in alive_units and alive_units[uid].can_attack and alive_units[uid].type not in AIRCRAFT_TYPES | SHIP_TYPES and self._can_cover_protection_threat(alive_units[uid], threat) ) if idle_defender_count > 0: self._recruit_protection_units(obs, threat, idle_defender_count) protection_units = self._squad_units(obs, self._protection_squad) temporary_support: list[UnitInfoModel] = [] self._temporary_defenders = set() if response_target is None and not protection_units: self._clear_squad_target("protection") self._clear_protection_response() self._set_squad_state("protection", "assemble") self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS return [] if not protection_units and not temporary_support: self._clear_squad_target("protection") self._set_squad_state("protection", "assemble") return [] leader = self._select_squad_leader(protection_units or temporary_support) visible_target = self._get_visible_squad_target_info(obs, "protection") if response_target is not None: tx, ty, target_actor_id, target_kind = response_target self._set_squad_target("protection", target_actor_id, tx, ty, target_kind) self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS elif visible_target is not None: tx, ty, target_actor_id, target_kind = visible_target self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS else: replacement = self._find_closest_protection_target(obs, leader) if replacement is not None: tx, ty, target_actor_id, target_kind = replacement self._set_squad_target("protection", target_actor_id, tx, ty, target_kind) self._protection_backoff = PROTECTION_TARGET_BACKOFF_TICKS else: target_point = self._squad_target_point.get("protection") if target_point is None: return self._release_protection_squad(obs, protection_units, "no target") if self._protection_backoff < 0: return self._release_protection_squad(obs, protection_units, "lost target") tx, ty = target_point target_actor_id = 0 target_kind = "point" self._protection_backoff -= 1 self._last_contact_tick = obs.tick self._set_squad_state("protection", "commit") commands: list[CommandModel] = [] issued_ids: set[int] = set() direct_attack = target_actor_id > 0 and target_kind in {"unit", "building"} for defender in protection_units + temporary_support: if not defender.can_attack or defender.actor_id in issued_ids: continue issued_ids.add(defender.actor_id) commands.append(CommandModel( action=ActionType.ATTACK if direct_attack else ActionType.ATTACK_MOVE, actor_id=defender.actor_id, target_actor_id=target_actor_id if direct_attack else 0, target_x=tx, target_y=ty, )) if commands: self._record_attack_issue( direct_attack=direct_attack, command_count=len(commands), target_actor_id=target_actor_id, target_kind=target_kind, ) return commands def _handle_attack(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] commands.extend( self._handle_field_squad( obs, "assault", [u for u in self._squad_units(obs, self._attack_squad) if u.actor_id not in self._temporary_defenders], 1, rush=False, ) ) commands.extend( self._handle_field_squad( obs, "rush", [u for u in self._squad_units(obs, self._rush_squad) if u.actor_id not in self._temporary_defenders], 1, rush=True, ) ) commands.extend( self._handle_field_squad( obs, "air", [u for u in self._squad_units(obs, self._air_squad) if u.actor_id not in self._temporary_defenders], AIR_SQUAD_MIN_SIZE, rush=False, ) ) commands.extend( self._handle_field_squad( obs, "naval", [u for u in self._squad_units(obs, self._naval_squad) if u.actor_id not in self._temporary_defenders], NAVAL_SQUAD_MIN_SIZE, rush=False, ) ) return commands def _find_attack_target_info( self, obs: OpenRAObservation, leader_x: Optional[int], leader_y: Optional[int], squad_name: str = "assault", ) -> Tuple[int, int, int, str]: current_target = self._get_visible_squad_target_info(obs, squad_name) if current_target is not None: tx, ty, target_actor_id, target_kind = current_target if target_kind == "building": self._enemy_base_pos = (tx, ty) elif self._enemy_base_pos is None: self._enemy_base_pos = (tx, ty) self._clear_search_target() self._reset_stale_attack_target() return current_target target_point = self._squad_target_point.get(squad_name) target_actor_id = self._squad_target_actor_id.get(squad_name, 0) if target_point is not None and target_actor_id > 0: if not self._should_clear_point_target(obs, target_point, leader_x, leader_y): return target_point[0], target_point[1], 0, "point" self._clear_squad_target(squad_name) if squad_name == "rush" and self._rush_target_pos is not None: if self._should_clear_point_target(obs, self._rush_target_pos, leader_x, leader_y): self._log(f"Clearing stale rush target {self._rush_target_pos}") if self._enemy_base_pos == self._rush_target_pos: self._enemy_base_pos = None self._rush_target_pos = None self._rush_target_actor_id = 0 self._rush_target_kind = "point" self._reset_stale_attack_target() else: self._enemy_base_pos = self._rush_target_pos self._clear_search_target() return ( self._rush_target_pos[0], self._rush_target_pos[1], self._rush_target_actor_id, self._rush_target_kind, ) closest = self._pick_closest_visible_target(obs, leader_x, leader_y, squad_name=squad_name) if closest is not None: actor_id, tx, ty, _, kind = closest self._set_squad_target(squad_name, actor_id, tx, ty, kind) if kind == "building": self._enemy_base_pos = (tx, ty) elif self._enemy_base_pos is None: self._enemy_base_pos = (tx, ty) self._clear_search_target() self._reset_stale_attack_target() return tx, ty, actor_id, kind if self._enemy_base_pos and self._should_clear_enemy_base_target(obs, leader_x, leader_y): self._log(f"Clearing stale enemy base target {self._enemy_base_pos}") if self._rush_target_pos == self._enemy_base_pos: self._rush_target_pos = None self._rush_target_actor_id = 0 self._rush_target_kind = "point" self._enemy_base_pos = None self._reset_stale_attack_target() if self._enemy_base_pos: self._clear_search_target() self._set_squad_target(squad_name, 0, self._enemy_base_pos[0], self._enemy_base_pos[1], "point") return self._enemy_base_pos[0], self._enemy_base_pos[1], 0, "point" # If we don't have a persistent base target, prefer last-seen memory before blind exploration. if self._last_seen_base_pos is not None and obs.tick - self._last_seen_base_tick <= LAST_SEEN_BASE_TTL_TICKS: self._clear_search_target() self._set_squad_target(squad_name, 0, self._last_seen_base_pos[0], self._last_seen_base_pos[1], "point") return self._last_seen_base_pos[0], self._last_seen_base_pos[1], 0, "point" if self._last_seen_enemy_pos is not None and obs.tick - self._last_seen_enemy_tick <= LAST_SEEN_ENEMY_TTL_TICKS: self._clear_search_target() self._set_squad_target(squad_name, 0, self._last_seen_enemy_pos[0], self._last_seen_enemy_pos[1], "point") return self._last_seen_enemy_pos[0], self._last_seen_enemy_pos[1], 0, "point" tx, ty = self._select_search_target(obs, leader_x, leader_y) self._set_squad_target(squad_name, 0, tx, ty, "search") return tx, ty, 0, "search" def _find_attack_target( self, obs: OpenRAObservation, leader_x: Optional[int], leader_y: Optional[int], squad_name: str = "assault", ) -> Tuple[int, int]: tx, ty, _, _ = self._find_attack_target_info(obs, leader_x, leader_y, squad_name=squad_name) return tx, ty def _track_stale_attack_target( self, obs: OpenRAObservation, leader: UnitInfoModel, tx: int, ty: int, ) -> None: if obs.visible_enemies or obs.visible_enemy_buildings: self._reset_stale_attack_target() return target = (tx, ty) reached_target = self._cell_distance(leader.cell_x, leader.cell_y, tx, ty) <= STALE_TARGET_REACHED_RADIUS if self._stale_attack_target == target: if reached_target: self._stale_attack_redirects += 1 else: self._stale_attack_target = target self._stale_attack_redirects = 1 if reached_target else 0 def _should_clear_enemy_base_target( self, obs: OpenRAObservation, leader_x: Optional[int], leader_y: Optional[int], ) -> bool: return self._enemy_base_pos is not None and self._should_clear_point_target( obs, self._enemy_base_pos, leader_x, leader_y, ) def _should_clear_point_target( self, obs: OpenRAObservation, target: Tuple[int, int], leader_x: Optional[int], leader_y: Optional[int], ) -> bool: if obs.visible_enemies or obs.visible_enemy_buildings: return False recent_enemy_sighting_tick = max(self._last_seen_enemy_tick, self._last_seen_base_tick) if obs.tick - recent_enemy_sighting_tick < STALE_TARGET_CLEAR_INTERVAL: return False tx, ty = target if ( leader_x is not None and leader_y is not None and self._cell_distance(leader_x, leader_y, tx, ty) <= STALE_TARGET_REACHED_RADIUS ): # Only clear after we have actually explored the area around the point. # Otherwise we can drop targets prematurely and re-sweep the same zones. if self._spatial_raw: return self._area_is_explored(tx, ty, radius=5, threshold=0.5) return True return ( self._stale_attack_target == target and self._stale_attack_redirects >= STALE_TARGET_REDIRECT_LIMIT ) def _squad_is_stuck( self, obs: OpenRAObservation, squad_name: str, leader: UnitInfoModel, target: Tuple[int, int], ) -> bool: current_pos = (leader.cell_x, leader.cell_y) previous_pos = self._squad_last_progress_pos.get(squad_name) previous_target = self._squad_last_target_point.get(squad_name) if previous_pos != current_pos or previous_target != target: self._squad_last_progress_pos[squad_name] = current_pos self._squad_last_target_point[squad_name] = target self._squad_last_progress_tick[squad_name] = obs.tick return False last_tick = self._squad_last_progress_tick.get(squad_name, obs.tick) if obs.tick <= last_tick + SQUAD_STUCK_TICKS: return False self._squad_last_progress_tick[squad_name] = obs.tick return True def _reset_stale_attack_target(self) -> None: self._stale_attack_target = None self._stale_attack_redirects = 0 def _clear_search_target(self) -> None: self._search_target = None self._search_target_started_tick = -9999 def _set_squad_target( self, squad_name: str, target_actor_id: int, tx: int, ty: int, target_kind: str, ) -> None: self._squad_target_point[squad_name] = (tx, ty) if target_actor_id > 0 and target_kind in {"unit", "building"}: self._squad_target_actor_id[squad_name] = target_actor_id self._squad_target_kind[squad_name] = target_kind else: self._squad_target_actor_id.pop(squad_name, None) self._squad_target_kind.pop(squad_name, None) if squad_name == "rush": self._rush_target_pos = (tx, ty) self._rush_target_actor_id = target_actor_id if target_kind in {"unit", "building"} else 0 self._rush_target_kind = target_kind def _clear_squad_target(self, squad_name: str) -> None: self._squad_target_actor_id.pop(squad_name, None) self._squad_target_kind.pop(squad_name, None) self._squad_target_point.pop(squad_name, None) if squad_name == "rush": self._rush_target_pos = None self._rush_target_actor_id = 0 self._rush_target_kind = "point" def _visible_actor_by_target( self, obs: OpenRAObservation, target_actor_id: int, target_kind: str, ): if target_actor_id <= 0: return None if target_kind == "unit": return next((enemy for enemy in obs.visible_enemies if enemy.actor_id == target_actor_id), None) if target_kind == "building": return next((building for building in obs.visible_enemy_buildings if building.actor_id == target_actor_id), None) return None def _get_visible_squad_target_info( self, obs: OpenRAObservation, squad_name: str, ) -> Optional[Tuple[int, int, int, str]]: target_actor_id = self._squad_target_actor_id.get(squad_name, 0) target_kind = self._squad_target_kind.get(squad_name, "point") actor = self._visible_actor_by_target(obs, target_actor_id, target_kind) if actor is None: return None tx, ty = self._actor_cell(actor) self._squad_target_point[squad_name] = (tx, ty) return tx, ty, target_actor_id, target_kind def _advance_search_target(self, obs: OpenRAObservation) -> Tuple[int, int]: if not self._candidate_targets: self._candidate_targets = self._search_grid(obs) self._target_index = 0 target = self._candidate_targets[self._target_index % len(self._candidate_targets)] self._target_index = (self._target_index + 1) % len(self._candidate_targets) self._search_target = target self._search_target_started_tick = obs.tick self._log(f"Search target -> {target}") return target def _select_search_target( self, obs: OpenRAObservation, leader_x: Optional[int], leader_y: Optional[int], ) -> Tuple[int, int]: if self._search_target is None: return self._advance_search_target(obs) if ( leader_x is not None and leader_y is not None and self._cell_distance(leader_x, leader_y, self._search_target[0], self._search_target[1]) <= STALE_TARGET_REACHED_RADIUS ): return self._advance_search_target(obs) if obs.tick - self._search_target_started_tick >= SEARCH_TARGET_STALL_TICKS: self._log(f"Search target stalled -> rotating from {self._search_target}") return self._advance_search_target(obs) return self._search_target def _spatial_fog(self, x: int, y: int) -> float: return self._spatial_value(x, y, FOG_CHANNEL, 0.0) def _area_is_explored(self, x: int, y: int, radius: int = 5, threshold: float = 0.5) -> bool: """True if any cell near (x,y) is explored/visible (fog>=0.5).""" if not self._spatial_raw: return False w, h = self._get_map_size() x0 = max(0, x - radius) y0 = max(0, y - radius) x1 = min(w - 1, x + radius) y1 = min(h - 1, y + radius) for yy in range(y0, y1 + 1): for xx in range(x0, x1 + 1): if self._spatial_fog(xx, yy) >= threshold: return True return False def _search_grid(self, obs: OpenRAObservation) -> list[Tuple[int, int]]: """Exploration candidates. Prefer frontier/unexplored regions when spatial fog exists (channel 4). Otherwise fall back to a coarse far-from-base grid. """ w, h = self._get_map_size() cy = self._find_building(obs, "fact") if not cy: return [(w // 2, h // 2)] bx = cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024 by = cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024 if self._spatial_raw and obs.tick - self._frontier_cache_tick >= FRONTIER_REFRESH_TICKS: self._frontier_cache_tick = obs.tick block = 8 gx_max = max(1, w // block) gy_max = max(1, h // block) scored: list[tuple[float, tuple[int, int]]] = [] for gx in range(gx_max): for gy in range(gy_max): cx = min(w - 1, gx * block + block // 2) cyy = min(h - 1, gy * block + block // 2) if not self._is_passable_cell(cx, cyy): continue unseen = 0 frontier = 0 samples = 0 for sx in (0, block // 2, block - 1): for sy in (0, block // 2, block - 1): x = gx * block + sx y = gy * block + sy if x < 0 or y < 0 or x >= w or y >= h: continue samples += 1 fog = self._spatial_fog(x, y) if fog < 0.25: unseen += 1 # frontier if adjacent to explored/visible for dx, dy in ((1, 0), (-1, 0), (0, 1), (0, -1)): nx, ny = x + dx, y + dy if 0 <= nx < w and 0 <= ny < h and self._spatial_fog(nx, ny) >= 0.5: frontier += 1 break if samples <= 0: continue unseen_frac = unseen / samples frontier_frac = frontier / samples d2 = (cx - bx) * (cx - bx) + (cyy - by) * (cyy - by) score = unseen_frac * 3.0 + frontier_frac * 4.0 + (d2 ** 0.5) * 0.01 if d2 < 20 * 20: score -= 1.5 scored.append((score, (cx, cyy))) scored.sort(key=lambda t: t[0], reverse=True) candidates = [p for _, p in scored[:40]] if candidates: return candidates n = 3 cw, ch = max(1, w // n), max(1, h // n) centers = [(cw * gx + cw // 2, ch * gy + ch // 2) for gx in range(n) for gy in range(n)] min_d2 = (min(w, h) // n) ** 2 far = [p for p in centers if (p[0] - bx) ** 2 + (p[1] - by) ** 2 > min_d2] if not far: far = [(w // 2, h // 2)] far.sort(key=lambda p: (p[0] - bx) ** 2 + (p[1] - by) ** 2, reverse=True) return far # ── Repairs ─────────────────────────────────────────────────── def _manage_repairs(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] for b in obs.buildings: if b.hp_percent >= 0.98: self._repair_issued.discard(b.actor_id) self._reactive_repair_targets.discard(b.actor_id) continue if ( b.actor_id in self._reactive_repair_targets and not b.is_repairing and b.actor_id not in self._repair_issued and self._available_credits(obs) >= 500 ): commands.append(CommandModel(action=ActionType.REPAIR, actor_id=b.actor_id)) self._repair_issued.add(b.actor_id) self._reactive_repair_targets.discard(b.actor_id) if obs.tick - self._last_repair_tick < REPAIR_ALL_BUILDINGS_COOLDOWN: return commands self._last_repair_tick = obs.tick for b in obs.buildings: if (b.hp_percent < 0.75 and not b.is_repairing and b.actor_id not in self._repair_issued and self._available_credits(obs) >= 500): commands.append(CommandModel(action=ActionType.REPAIR, actor_id=b.actor_id)) self._repair_issued.add(b.actor_id) return commands # ── Power ───────────────────────────────────────────────────── def _manage_power(self, obs: OpenRAObservation) -> List[CommandModel]: commands = [] if obs.tick - self._last_power_toggle_tick < POWER_TOGGLE_INTERVAL: return commands self._last_power_toggle_tick = obs.tick bal = obs.economy.power_provided - obs.economy.power_drained buildings_by_id = {b.actor_id: b for b in obs.buildings} self._powered_down = { actor_id: expected_change for actor_id, expected_change in self._powered_down.items() if actor_id in buildings_by_id and not buildings_by_id[actor_id].is_powered } if bal > 0 and self._powered_down: for actor_id, expected_change in sorted( self._powered_down.items(), key=lambda item: item[1], reverse=True, ): if bal - expected_change < 0: continue commands.append(CommandModel(action=ActionType.POWER_DOWN, actor_id=actor_id)) bal -= expected_change for cmd in commands: self._powered_down.pop(cmd.actor_id, None) return commands if bal < 0: candidates: list[tuple[int, int]] = [] for b in obs.buildings: if b.type not in POWER_DOWN_TYPES or not b.is_powered or b.actor_id in self._powered_down: continue expected_change = max(0, -b.power_amount) if expected_change <= 0: continue candidates.append((expected_change, b.actor_id)) for expected_change, actor_id in sorted(candidates): commands.append(CommandModel(action=ActionType.POWER_DOWN, actor_id=actor_id)) self._powered_down[actor_id] = expected_change bal += expected_change if bal >= 0: break return commands # ── Cleanup ─────────────────────────────────────────────────── def _cleanup_dead(self, obs: OpenRAObservation): alive = {u.actor_id for u in obs.units} self._attack_squad = [uid for uid in self._attack_squad if uid in alive] self._protection_squad = [uid for uid in self._protection_squad if uid in alive] self._rush_squad = [uid for uid in self._rush_squad if uid in alive] self._air_squad = [uid for uid in self._air_squad if uid in alive] self._naval_squad = [uid for uid in self._naval_squad if uid in alive] self._idle_ground_units = [uid for uid in self._idle_ground_units if uid in alive] self._temporary_defenders &= alive self._squad_regroup_count = {k: v for k, v in self._squad_regroup_count.items() if k in self._squad_states} self._squad_last_progress_tick = { squad_name: tick for squad_name, tick in self._squad_last_progress_tick.items() if squad_name in self._squad_states } self._squad_last_progress_pos = { squad_name: pos for squad_name, pos in self._squad_last_progress_pos.items() if squad_name in self._squad_states } self._squad_last_target_point = { squad_name: target for squad_name, target in self._squad_last_target_point.items() if squad_name in self._squad_states } self._squad_leader_id = { squad_name: actor_id for squad_name, actor_id in self._squad_leader_id.items() if squad_name in self._squad_states and actor_id in alive } self._squad_target_actor_id = { squad_name: actor_id for squad_name, actor_id in self._squad_target_actor_id.items() if squad_name in self._squad_states } self._squad_target_kind = { squad_name: kind for squad_name, kind in self._squad_target_kind.items() if squad_name in self._squad_states } self._squad_target_point = { squad_name: point for squad_name, point in self._squad_target_point.items() if squad_name in self._squad_states } self._previous_unit_hp = { actor_id: hp for actor_id, hp in self._previous_unit_hp.items() if actor_id in alive } self._mcv_targets = { actor_id: target for actor_id, target in self._mcv_targets.items() if actor_id in alive } self._mcv_resource_targets = { actor_id: target for actor_id, target in self._mcv_resource_targets.items() if actor_id in alive } alive_b = {b.actor_id for b in obs.buildings} self._requested_refineries = { actor_id: request for actor_id, request in self._requested_refineries.items() if request[2] > obs.tick } self._mcv_deploy_until = { actor_id: tick for actor_id, tick in self._mcv_deploy_until.items() if actor_id in alive or actor_id in alive_b if tick > obs.tick } self._repair_issued &= alive_b self._reactive_repair_targets &= alive_b self._rally_set &= alive_b self._previous_building_hp = { actor_id: hp for actor_id, hp in self._previous_building_hp.items() if actor_id in alive_b } self._powered_down = { actor_id: expected_change for actor_id, expected_change in self._powered_down.items() if actor_id in alive_b } self._harvester_retreat_until = { actor_id: tick for actor_id, tick in self._harvester_retreat_until.items() if actor_id in alive } self._harvester_recent_damage_until = { actor_id: tick for actor_id, tick in self._harvester_recent_damage_until.items() if actor_id in alive } self._harvester_reassign_until = { actor_id: tick for actor_id, tick in self._harvester_reassign_until.items() if actor_id in alive } self._harvester_patch_targets = { actor_id: target for actor_id, target in self._harvester_patch_targets.items() if actor_id in alive } self._harvester_last_cells = { actor_id: cell for actor_id, cell in self._harvester_last_cells.items() if actor_id in alive } self._harvester_last_progress_tick = { actor_id: tick for actor_id, tick in self._harvester_last_progress_tick.items() if actor_id in alive } self._harvester_no_resource_until = { actor_id: tick for actor_id, tick in self._harvester_no_resource_until.items() if actor_id in alive } self._recent_attack_points = [ (x, y, tick) for x, y, tick in self._recent_attack_points if obs.tick - tick <= BASE_ATTACK_MEMORY_TICKS ] if not self._rush_squad: self._clear_squad_target("rush") if not self._attack_squad: self._clear_squad_target("assault") def _update_damage_memory(self, obs: OpenRAObservation): self._recent_attack_points = [ (x, y, tick) for x, y, tick in self._recent_attack_points if obs.tick - tick <= BASE_ATTACK_MEMORY_TICKS ] next_building_hp: dict[int, float] = {} for building in obs.buildings: next_building_hp[building.actor_id] = building.hp_percent previous_hp = self._previous_building_hp.get(building.actor_id) if previous_hp is not None and building.hp_percent + 1e-6 < previous_hp: bx, by = self._actor_cell(building) if self._is_home_base_cell(obs, bx, by): self._remember_attack_point(bx, by, obs.tick) if building.type in PROTECTION_TYPES: self._remember_protection_response(obs, bx, by) if previous_hp >= REPAIR_REACTIVE_HP_THRESHOLD and building.hp_percent < REPAIR_REACTIVE_HP_THRESHOLD: self._reactive_repair_targets.add(building.actor_id) self._previous_building_hp = next_building_hp next_unit_hp: dict[int, float] = {} for unit in obs.units: next_unit_hp[unit.actor_id] = unit.hp_percent previous_hp = self._previous_unit_hp.get(unit.actor_id) if previous_hp is not None and unit.hp_percent + 1e-6 < previous_hp: if unit.type == "mcv" or ( unit.type == "harv" and self._is_local_protection_asset(obs, unit.cell_x, unit.cell_y) ): self._remember_attack_point(unit.cell_x, unit.cell_y, obs.tick) self._remember_protection_response(obs, unit.cell_x, unit.cell_y) if unit.type == "harv": self._harvester_recent_damage_until[unit.actor_id] = obs.tick + HARVESTER_RETREAT_COOLDOWN self._previous_unit_hp = next_unit_hp def _remember_attack_point(self, x: int, y: int, tick: int) -> None: for idx, (px, py, _) in enumerate(self._recent_attack_points): if self._cell_distance(x, y, px, py) <= ATTACK_POINT_MERGE_RADIUS: self._recent_attack_points[idx] = ((px + x) // 2, (py + y) // 2, tick) return self._recent_attack_points.append((x, y, tick)) def _update_post_contact_state(self, obs: OpenRAObservation): combat_count = self._combat_unit_count(obs) was_recovering = self._in_recovery_mode(obs) self._combat_peak = max(self._combat_peak, combat_count) if self._base_under_pressure(obs): self._last_contact_tick = obs.tick had_recent_contact = obs.tick - self._last_contact_tick <= POST_CONTACT_WINDOW collapse_threshold = max(RECOVERY_MIN_COMBAT, int(self._combat_peak * RECOVERY_DROP_RATIO)) if had_recent_contact and self._combat_peak >= RECOVERY_TRIGGER_PEAK and combat_count <= collapse_threshold: self._recovery_until_tick = max(self._recovery_until_tick, obs.tick + RECOVERY_DURATION) if ( was_recovering and combat_count >= max(RECOVERY_EXIT_COMBAT, int(self._combat_peak * 0.75)) and obs.tick - self._last_contact_tick >= RECOVERY_CLEAR_CONTACT_GAP and not self._base_under_pressure(obs) ): self._recovery_until_tick = obs.tick self._combat_peak = combat_count elif not had_recent_contact and combat_count < RECOVERY_TRIGGER_PEAK: self._combat_peak = max(combat_count, self._combat_peak - 1) is_recovering = self._in_recovery_mode(obs) if not was_recovering and is_recovering: self._log(f"Recovery mode -> rebuild ({combat_count} combat units, peak {self._combat_peak})") elif was_recovering and not is_recovering: self._log(f"Recovery mode -> cleared ({combat_count} combat units)") # ── Map ─────────────────────────────────────────────────────── def _update_map_size(self, obs: OpenRAObservation): w, h = obs.map_info.width, obs.map_info.height if w > 0 and h > 0: if self._cached_map_size is None: self._cached_map_size = (w, h) else: cw, ch = self._cached_map_size if w < cw or h < ch: self._cached_map_size = (w, h) self._candidate_targets = [] self._target_index = 0 self._clear_search_target() def _get_map_size(self) -> Tuple[int, int]: return self._cached_map_size or (128, 128) def _update_spatial_analysis(self, obs: OpenRAObservation): if not obs.spatial_map or obs.spatial_channels <= 0: return if ( self._spatial_raw and obs.tick >= self._last_spatial_update_tick and obs.tick - self._last_spatial_update_tick < RESOURCE_MAP_UPDATE_INTERVAL ): return try: raw = base64.b64decode(obs.spatial_map) except Exception: return w, h = self._get_map_size() channels = obs.spatial_channels if w <= 0 or h <= 0 or channels <= 0: return self._spatial_raw = raw self._spatial_channels = channels self._last_spatial_update_tick = obs.tick resource_cells: list[tuple[int, int, float]] = [] for y in range(h): for x in range(w): base_idx = (y * w + x) * channels try: resource = struct.unpack_from("f", raw, (base_idx + 2) * 4)[0] except struct.error: continue if resource > 0: resource_cells.append((x, y, resource)) self._resource_patches = self._cluster_resource_patches(resource_cells) self._sync_resource_patch_memory(obs.tick) def _cluster_resource_patches( self, resource_cells: list[tuple[int, int, float]], ) -> list[dict[str, float | int]]: if not resource_cells: return [] density_by_cell = {(x, y): density for x, y, density in resource_cells} unvisited = set(density_by_cell.keys()) patches: list[dict[str, float | int]] = [] while unvisited: start = unvisited.pop() queue = [start] cluster = [(start[0], start[1], density_by_cell[start])] while queue: cx, cy = queue.pop() for dx in range(-RESOURCE_PATCH_LINK_RADIUS, RESOURCE_PATCH_LINK_RADIUS + 1): for dy in range(-RESOURCE_PATCH_LINK_RADIUS, RESOURCE_PATCH_LINK_RADIUS + 1): nx, ny = cx + dx, cy + dy if (nx, ny) not in unvisited: continue unvisited.remove((nx, ny)) queue.append((nx, ny)) cluster.append((nx, ny, density_by_cell[(nx, ny)])) if len(cluster) < RESOURCE_PATCH_MIN_CELLS: continue center_x = sum(c[0] for c in cluster) // len(cluster) center_y = sum(c[1] for c in cluster) // len(cluster) total_density = sum(c[2] for c in cluster) resource_center = min( cluster, key=lambda c: ((c[0] - center_x) ** 2 + (c[1] - center_y) ** 2, -c[2]), ) patches.append( { "center_x": center_x, "center_y": center_y, "resource_center_x": resource_center[0], "resource_center_y": resource_center[1], "cells": len(cluster), "total_density": round(total_density, 1), } ) patches.sort(key=lambda p: (int(p["cells"]), float(p["total_density"])), reverse=True) return patches def _sync_resource_patch_memory(self, tick: int): previous = dict(self._resource_patch_memory) refreshed: dict[tuple[int, int], dict[str, float | int]] = {} for patch in self._resource_patches: target = self._patch_target(patch) match_key: Optional[tuple[int, int]] = None best_dist = RESOURCE_PATCH_MEMORY_MATCH_RADIUS + 1 for key in previous: dist = self._cell_distance(target[0], target[1], key[0], key[1]) if dist <= RESOURCE_PATCH_MEMORY_MATCH_RADIUS and dist < best_dist: match_key = key best_dist = dist memory = previous.pop(match_key) if match_key is not None else {} current_density = float(patch["total_density"]) previous_density = float(memory.get("last_density", current_density)) peak_density = max( current_density, previous_density, float(memory.get("peak_density", current_density)), ) density_drop_ratio = 0.0 if previous_density > 1e-6 and current_density < previous_density: density_drop_ratio = (previous_density - current_density) / previous_density depletion_ratio = 0.0 if peak_density > 1e-6 and current_density < peak_density: depletion_ratio = (peak_density - current_density) / peak_density depletion_trend = float(memory.get("depletion_trend", 0.0)) * 0.7 + density_drop_ratio * 0.3 refreshed[target] = { "last_density": current_density, "peak_density": peak_density, "depletion_ratio": max(0.0, min(1.0, depletion_ratio)), "depletion_trend": max(0.0, min(1.0, depletion_trend)), "last_tick": tick, } self._resource_patch_memory = refreshed def _patch_memory(self, patch: dict[str, float | int]) -> dict[str, float | int]: return self._resource_patch_memory.get(self._patch_target(patch), {}) def _nearest_anchor_distance( self, x: int, y: int, anchors: list[tuple[int, int]], ) -> int: if not anchors: return 0 return min(self._cell_distance(x, y, ax, ay) for ax, ay in anchors) def _resource_patch_capacity( self, total_density: float, cells: int, refinery_count: int, depletion_ratio: float, threat: int, ) -> int: capacity = max(1, cells // RESOURCE_CELLS_PER_HARVESTER) if total_density >= cells * 2.0: capacity += 1 if total_density >= cells * 3.5: capacity += 1 if refinery_count > 0: capacity += 1 capacity = min(RESOURCE_PATCH_MAX_CAPACITY, capacity) floor = 1 if refinery_count > 0 else 0 if depletion_ratio >= 0.55: capacity = max(floor, capacity - 1) if threat > 0: capacity = max(0, capacity - min(threat, 2)) return capacity def _spatial_value(self, x: int, y: int, channel: int, default: float = 0.0) -> float: w, h = self._get_map_size() if ( not self._spatial_raw or self._spatial_channels <= channel or x < 0 or y < 0 or x >= w or y >= h ): return default base_idx = (y * w + x) * self._spatial_channels try: return struct.unpack_from("f", self._spatial_raw, (base_idx + channel) * 4)[0] except struct.error: return default def _resource_amount_at(self, x: int, y: int) -> float: return self._spatial_value(x, y, 2, 0.0) def _terrain_index_at(self, x: int, y: int) -> int: return int(self._spatial_value(x, y, 0, 0.0)) def _is_passable_cell(self, x: int, y: int) -> bool: if not self._spatial_raw: return True return self._spatial_value(x, y, 3, 1.0) > 0.5 def _is_water_candidate_cell(self, x: int, y: int) -> bool: if not self._spatial_raw: return False # Prefer terrain-index water (common case), but fall back to a human-visible cue: # large contiguous impassable regions (water) in the passability channel. if self._terrain_index_at(x, y) in {7, 8}: return True passability = self._spatial_value(x, y, 3, 1.0) if passability > 0.05: return False # Reject isolated impassables (cliffs/rocks) by requiring most neighbors # to also be strongly impassable. imp = 0 for dx in (-1, 0, 1): for dy in (-1, 0, 1): if self._spatial_value(x + dx, y + dy, 3, 1.0) <= 0.05: imp += 1 return imp >= 8 def _is_open_water_cell(self, x: int, y: int) -> bool: w, h = self._get_map_size() if x <= 0 or y <= 0 or x >= w - 1 or y >= h - 1: return False for dx in (-1, 0, 1): for dy in (-1, 0, 1): if not self._is_water_candidate_cell(x + dx, y + dy): return False return True def _local_resource_score(self, x: int, y: int, radius: int) -> float: total = 0.0 for dx in range(-radius, radius + 1): for dy in range(-radius, radius + 1): total += self._resource_amount_at(x + dx, y + dy) return total def _local_water_score(self, x: int, y: int, radius: int) -> int: total = 0 for dx in range(-radius, radius + 1): for dy in range(-radius, radius + 1): if self._is_water_candidate_cell(x + dx, y + dy): total += 1 return total # ── Helpers ─────────────────────────────────────────────────── def _find_building(self, obs: OpenRAObservation, btype: str) -> Optional[BuildingInfoModel]: return next((b for b in obs.buildings if b.type == btype), None) def _is_structure_queue(self, queue_type: str) -> bool: return queue_type in STRUCTURE_QUEUE_TYPES def _structure_queue_busy(self, obs: OpenRAObservation, item_type: str) -> bool: queue_type = self._structure_queue_type(item_type) return any(p.queue_type == queue_type for p in obs.production) def _available_credits(self, obs: OpenRAObservation) -> int: # OpenRA splits spendable funds between liquid cash and stored ore/resources. return obs.economy.cash + obs.economy.ore def _combat_unit_count(self, obs: OpenRAObservation) -> int: return sum(1 for u in obs.units if u.type in COMBAT_TYPES) def _in_recovery_mode(self, obs: OpenRAObservation) -> bool: return obs.tick < self._recovery_until_tick def _base_center(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]: cy = self._find_building(obs, "fact") if cy is not None: return ( cy.cell_x if cy.cell_x > 0 else cy.pos_x // 1024, cy.cell_y if cy.cell_y > 0 else cy.pos_y // 1024, ) if obs.buildings: b = obs.buildings[0] return ( b.cell_x if b.cell_x > 0 else b.pos_x // 1024, b.cell_y if b.cell_y > 0 else b.pos_y // 1024, ) return None def _is_home_base_cell(self, obs: OpenRAObservation, x: int, y: int) -> bool: base_center = self._base_center(obs) return base_center is None or self._cell_distance(x, y, base_center[0], base_center[1]) <= HOME_BASE_THREAT_RADIUS def _is_local_protection_asset(self, obs: OpenRAObservation, x: int, y: int) -> bool: if self._is_home_base_cell(obs, x, y): return True anchors = [ b for b in obs.buildings if self._canonical_building_type(b.type) in {"fact", "proc"} and self._is_home_base_cell( obs, b.cell_x if b.cell_x > 0 else b.pos_x // 1024, b.cell_y if b.cell_y > 0 else b.pos_y // 1024, ) ] if anchors and self._nearest_distance_to_buildings(x, y, anchors) <= PROTECT_UNIT_SCAN_RADIUS: return True return False def _placement_base_center(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]: cy = self._find_building(obs, "fact") if cy is not None: return self._building_top_left(cy) if obs.buildings: return self._building_top_left(obs.buildings[0]) return None def _protected_points(self, obs: OpenRAObservation) -> list[tuple[int, int, int]]: protected_points: list[tuple[int, int, int]] = [] for b in obs.buildings: if b.type in PROTECTION_TYPES: bx = b.cell_x if b.cell_x > 0 else b.pos_x // 1024 by = b.cell_y if b.cell_y > 0 else b.pos_y // 1024 if not self._is_home_base_cell(obs, bx, by): continue protected_points.append((bx, by, PROTECTION_SCAN_RADIUS)) for u in obs.units: if u.type in {"harv", "mcv"} and self._is_local_protection_asset(obs, u.cell_x, u.cell_y): protected_points.append((u.cell_x, u.cell_y, PROTECT_UNIT_SCAN_RADIUS)) return protected_points def _base_threat_enemies(self, obs: OpenRAObservation) -> list[UnitInfoModel]: protected_points = self._protected_points(obs) emergency_points = [ (x, y, BASE_EMERGENCY_VISIBILITY_RADIUS) for x, y, tick in self._recent_attack_points if obs.tick - tick <= BASE_ATTACK_MEMORY_TICKS ] threat_points = protected_points + emergency_points if not threat_points: return [] return [ e for e in obs.visible_enemies if any(self._cell_distance(e.cell_x, e.cell_y, px, py) <= radius for px, py, radius in threat_points) ] def _base_under_pressure(self, obs: OpenRAObservation) -> bool: return bool(self._base_threat_enemies(obs)) def _nearest_enemy_to_unit( self, obs: OpenRAObservation, unit: UnitInfoModel, radius: int, ) -> Optional[UnitInfoModel]: nearby = self._visible_enemy_units_near(obs, unit.cell_x, unit.cell_y, radius) if not nearby: return None return min(nearby, key=lambda e: self._cell_distance(unit.cell_x, unit.cell_y, e.cell_x, e.cell_y)) def _pick_harvester_retreat_point( self, obs: OpenRAObservation, harvester: UnitInfoModel, patch_states: Optional[list[dict[str, object]]] = None, ) -> Optional[Tuple[int, int]]: refineries = [b for b in obs.buildings if b.type == "proc"] if patch_states is None: patch_states = self._resource_patch_states(obs) threatened_patch = self._nearest_patch_state( patch_states, harvester.cell_x, harvester.cell_y, HARVESTER_PATCH_ASSIGN_RADIUS, allow_fallback=True, ) avoid_target: Optional[tuple[int, int]] = None if threatened_patch is not None and int(threatened_patch["threat"]) > 0: avoid_target = threatened_patch["target"] # type: ignore[index] if refineries: scored_refineries = sorted( refineries, key=lambda b: ( 1 if avoid_target is not None and self._cell_distance( b.cell_x if b.cell_x > 0 else b.pos_x // 1024, b.cell_y if b.cell_y > 0 else b.pos_y // 1024, avoid_target[0], avoid_target[1], ) <= HARVESTER_PATCH_ASSIGN_RADIUS else 0, self._cell_distance( harvester.cell_x, harvester.cell_y, b.cell_x if b.cell_x > 0 else b.pos_x // 1024, b.cell_y if b.cell_y > 0 else b.pos_y // 1024, ), ), ) best = scored_refineries[0] return ( best.cell_x if best.cell_x > 0 else best.pos_x // 1024, best.cell_y if best.cell_y > 0 else best.pos_y // 1024, ) return self._base_center(obs) def _harvester_recently_damaged(self, obs: OpenRAObservation, actor_id: int) -> bool: return obs.tick < self._harvester_recent_damage_until.get(actor_id, -9999) def _update_harvester_progress(self, obs: OpenRAObservation, harvester: UnitInfoModel): actor_id = harvester.actor_id current_cell = (harvester.cell_x, harvester.cell_y) previous_cell = self._harvester_last_cells.get(actor_id) patch_target = self._harvester_patch_targets.get(actor_id) if previous_cell is None: self._harvester_last_progress_tick[actor_id] = obs.tick else: moved = self._cell_distance(current_cell[0], current_cell[1], previous_cell[0], previous_cell[1]) if moved >= HARVESTER_PROGRESS_MOVE_THRESHOLD: self._harvester_last_progress_tick[actor_id] = obs.tick if patch_target is not None and self._cell_distance(current_cell[0], current_cell[1], patch_target[0], patch_target[1]) <= 2: self._harvester_last_progress_tick[actor_id] = obs.tick self._harvester_last_cells[actor_id] = current_cell def _is_low_effect_harvester(self, obs: OpenRAObservation, harvester: UnitInfoModel) -> bool: if harvester.is_idle: return False if obs.tick < self._harvester_retreat_until.get(harvester.actor_id, -9999): return False if self._harvester_recently_damaged(obs, harvester.actor_id): return False if obs.tick < self._harvester_no_resource_until.get(harvester.actor_id, -9999): return False last_progress = self._harvester_last_progress_tick.get(harvester.actor_id, obs.tick) if obs.tick - last_progress < HARVESTER_LOW_EFFECT_TIMEOUT: return False patch_target = self._harvester_patch_targets.get(harvester.actor_id) if patch_target is not None and self._local_resource_score(patch_target[0], patch_target[1], 2) <= HARVESTER_LOCAL_RESOURCE_MIN: return True activity = harvester.current_activity.lower() if "harvest" in activity or "move" in activity or "dock" in activity: return True return False def _fallback_harvest_target( self, obs: OpenRAObservation, harvester: UnitInfoModel, patch_states: Optional[list[dict[str, object]]] = None, prefer_safe: bool = False, exclude_target: Optional[tuple[int, int]] = None, ) -> Optional[tuple[int, int]]: if patch_states is None: patch_states = self._resource_patch_states(obs) scan_origin = self._pick_harvester_retreat_point(obs, harvester, patch_states=patch_states) if scan_origin is None: scan_origin = (harvester.cell_x, harvester.cell_y) current_target = self._harvester_patch_targets.get(harvester.actor_id) candidates = [] for state in patch_states: target = state["target"] # type: ignore[index] if exclude_target is not None and target == exclude_target: continue if int(state["capacity"]) <= 0: continue if float(state["depletion_ratio"]) >= 0.95: continue if self._local_resource_score(target[0], target[1], 2) <= HARVESTER_LOCAL_RESOURCE_MIN: continue if prefer_safe and int(state["threat"]) > 0: continue candidates.append(state) if not candidates: return None best = max( candidates, key=lambda state: ( int(state["score"]) + (140 if int(state["refinery_count"]) > 0 else 0) + (80 if current_target == state["target"] and int(state["threat"]) == 0 and not prefer_safe else 0) - int(max(0.0, float(state["saturation"]) - 1.0) * 420) - self._cell_distance( scan_origin[0], scan_origin[1], state["target"][0], # type: ignore[index] state["target"][1], # type: ignore[index] ) * 18 -self._cell_distance( harvester.cell_x, harvester.cell_y, state["target"][0], # type: ignore[index] state["target"][1], # type: ignore[index] ) * 6 - int(state["threat"]) * (480 if prefer_safe else 260), ), ) return best["target"] # type: ignore[return-value] def _pending_build_cost(self, obs: OpenRAObservation) -> int: if self._build_index >= len(BUILD_ORDER): return 0 item = self._resolve_build_item(obs, BUILD_ORDER[self._build_index]) if item is None or self._already_have(obs, item, self._build_index): return 0 if not self._can_produce(obs, item): return 0 if not self._structure_queue_available(obs, item): return 0 return self._build_cost(item) def _building_counts(self, obs: OpenRAObservation) -> dict[str, int]: counts: dict[str, int] = {} for b in obs.buildings: btype = self._canonical_building_type(b.type) counts[btype] = counts.get(btype, 0) + 1 return counts def _canonical_building_type(self, item_type: str) -> str: return BUILDING_CANONICAL_TYPES.get(item_type, item_type) def _build_cost(self, item_type: str) -> int: canonical = self._canonical_building_type(item_type) return BUILDING_COSTS.get(item_type, BUILDING_COSTS.get(canonical, 500)) def _building_dimensions(self, item_type: str) -> tuple[int, int]: canonical = self._canonical_building_type(item_type) return BUILDING_DIMENSIONS.get(item_type, BUILDING_DIMENSIONS.get(canonical, (2, 2))) def _building_top_left(self, building: BuildingInfoModel) -> tuple[int, int]: canonical = self._canonical_building_type(building.type) offset_x, offset_y = BUILDING_TOPLEFT_OFFSETS.get( building.type, BUILDING_TOPLEFT_OFFSETS.get(canonical, (0, 0)), ) return ( (building.cell_x if building.cell_x > 0 else building.pos_x // 1024) - offset_x, (building.cell_y if building.cell_y > 0 else building.pos_y // 1024) - offset_y, ) def _occupied_building_cells(self, obs: OpenRAObservation) -> set[tuple[int, int]]: occupied: set[tuple[int, int]] = set() for building in obs.buildings: bx, by = self._building_top_left(building) width, height = self._building_dimensions(building.type) for dx in range(width): for dy in range(height): occupied.add((bx + dx, by + dy)) return occupied def _buildable_area_cells(self, obs: OpenRAObservation) -> set[tuple[int, int]]: cells: set[tuple[int, int]] = set() for building in obs.buildings: if self._canonical_building_type(building.type) in NO_BUILDABLE_AREA_TYPES: continue bx, by = self._building_top_left(building) width, height = self._building_dimensions(building.type) for dx in range(width): for dy in range(height): cells.add((bx + dx, by + dy)) return cells def _buildable_area_structure_count(self, obs: OpenRAObservation) -> int: return sum( 1 for building in obs.buildings if self._canonical_building_type(building.type) not in NO_BUILDABLE_AREA_TYPES ) def _footprint_close_enough_to_base( self, top_left_x: int, top_left_y: int, width: int, height: int, base_cells: set[tuple[int, int]], radius: int, ) -> bool: if not base_cells: return False max_x = top_left_x + width - 1 max_y = top_left_y + height - 1 for bx, by in base_cells: dx = 0 if top_left_x <= bx <= max_x else min(abs(bx - top_left_x), abs(bx - max_x)) dy = 0 if top_left_y <= by <= max_y else min(abs(by - top_left_y), abs(by - max_y)) if max(dx, dy) <= radius: return True return False def _candidate_fits_building_footprint( self, obs: OpenRAObservation, item_type: str, top_left_x: int, top_left_y: int, occupied: Optional[set[tuple[int, int]]] = None, base_cells: Optional[set[tuple[int, int]]] = None, ) -> bool: width, height = self._building_dimensions(item_type) w, h = self._get_map_size() if top_left_x < 0 or top_left_y < 0 or top_left_x + width > w or top_left_y + height > h: return False occupied = occupied or self._occupied_building_cells(obs) is_naval = self._canonical_building_type(item_type) in NAVAL_STRUCTURE_TYPES if base_cells is None: base_cells = self._buildable_area_cells(obs) if is_naval and not self._footprint_close_enough_to_base( top_left_x, top_left_y, width, height, base_cells, CHECK_FOR_WATER_RADIUS ): return False for dx in range(width): for dy in range(height): cell = (top_left_x + dx, top_left_y + dy) if cell in occupied: return False if is_naval: if not self._is_water_candidate_cell(*cell): return False else: if not self._is_passable_cell(*cell): return False if self._resource_amount_at(*cell) > 0.0: return False return True def _schedule_next_build_check(self, obs: OpenRAObservation, active: bool): delay = STRUCTURE_PRODUCTION_ACTIVE_DELAY if active else STRUCTURE_PRODUCTION_INACTIVE_DELAY random_bonus = random.randrange(STRUCTURE_PRODUCTION_RANDOM_BONUS_DELAY) if STRUCTURE_PRODUCTION_RANDOM_BONUS_DELAY > 0 else 0 self._next_build_check_tick = obs.tick + delay + random_bonus def _structure_queue_type(self, item_type: str) -> str: canonical = self._canonical_building_type(item_type) return "Defense" if canonical in DEFENSE_STRUCTURE_TYPES else "Building" def _clear_queue_backoff(self, queue_type: str): self._placement_backoff_until.pop(queue_type, None) self._placement_backoff_snapshot.pop(queue_type, None) def _queue_backoff_active(self, queue_type: str, obs: OpenRAObservation) -> bool: until = self._placement_backoff_until.get(queue_type, -9999) if until <= obs.tick: self._clear_queue_backoff(queue_type) return False snapshot = self._placement_backoff_snapshot.get(queue_type) if snapshot is not None: prev_buildings, prev_conyards = snapshot current_conyards = sum(1 for b in obs.buildings if b.type == "fact") if len(obs.buildings) < prev_buildings or current_conyards > prev_conyards: self._clear_queue_backoff(queue_type) return False return True def _structure_queue_available(self, obs: OpenRAObservation, item_type: str) -> bool: canonical = self._canonical_building_type(item_type) if canonical in NAVAL_STRUCTURE_TYPES and not self._can_safely_build_naval_structure(obs): return False return not self._queue_backoff_active(self._structure_queue_type(canonical), obs) def _priority_structure_reservation(self, obs: OpenRAObservation) -> int: if self._build_index < len(BUILD_ORDER): return self._pending_build_cost(obs) power_balance = obs.economy.power_provided - obs.economy.power_drained minimum_excess_power = self._minimum_excess_power_target(obs) power_item = self._best_power_building(obs) if power_balance < minimum_excess_power and power_item and self._structure_queue_available(obs, power_item): return self._build_cost(power_item) if not self._has_adequate_refinery_count(obs) and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc"): return self._build_cost("proc") if self._expansion_refinery_pending(obs) and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc"): return self._build_cost("proc") if self._in_recovery_mode(obs): bldg_counts = self._building_counts(obs) if ( bldg_counts.get("proc", 0) == 0 and not self._base_under_pressure(obs) and self._can_produce(obs, "proc") and self._structure_queue_available(obs, "proc") ): return self._build_cost("proc") return 0 def _rewind_build_order_after_cancel(self, obs: OpenRAObservation, item_type: str): canonical = self._canonical_building_type(item_type) for idx, placeholder in enumerate(BUILD_ORDER): resolved = self._resolve_build_item(obs, placeholder) if resolved is None: continue if self._canonical_building_type(resolved) != canonical: continue existing = self._building_counts(obs).get(canonical, 0) required = sum( 1 for p in BUILD_ORDER[: idx + 1] if (rp := self._resolve_build_item(obs, p)) is not None and self._canonical_building_type(rp) == canonical ) if existing < required: self._build_index = min(self._build_index, idx) return def _placement_anchor( self, obs: OpenRAObservation, item_type: str, fallback: Tuple[int, int], ) -> Tuple[int, int]: if item_type == "proc": plan = self._best_refinery_plan(obs) if plan is not None: return plan["anchor"] return fallback def _placement_candidates( self, obs: OpenRAObservation, item_type: str, cx: int, cy: int, min_radius: int, max_radius: int, ) -> list[tuple[int, int]]: if item_type in NAVAL_STRUCTURE_TYPES: naval_candidates = self._naval_build_candidates(obs, item_type) if naval_candidates: return naval_candidates occupied = self._occupied_building_cells(obs) buildable_area = self._buildable_area_cells(obs) candidates: list[tuple[int, int]] = [] w, h = self._get_map_size() for radius in range(min_radius, max_radius + 1): for dx in range(-radius, radius + 1): for dy in range(-radius, radius + 1): if max(abs(dx), abs(dy)) != radius: continue x = cx + dx y = cy + dy if x < 0 or y < 0 or x >= w or y >= h: continue candidates.append((x, y)) fitting = [ c for c in candidates if self._candidate_fits_building_footprint(obs, item_type, c[0], c[1], occupied, buildable_area) ] if fitting: candidates = fitting return candidates def _resource_patch_threat(self, obs: OpenRAObservation, patch: dict[str, float | int]) -> int: px = int(patch.get("resource_center_x", patch["center_x"])) py = int(patch.get("resource_center_y", patch["center_y"])) enemies = 0 for enemy in obs.visible_enemies: if self._cell_distance(enemy.cell_x, enemy.cell_y, px, py) <= RESOURCE_PATCH_THREAT_RADIUS: enemies += 1 for building in obs.visible_enemy_buildings: if self._cell_distance(building.cell_x, building.cell_y, px, py) <= RESOURCE_PATCH_THREAT_RADIUS: enemies += 2 return enemies def _patch_target(self, patch: dict[str, float | int]) -> tuple[int, int]: return ( int(patch.get("resource_center_x", patch["center_x"])), int(patch.get("resource_center_y", patch["center_y"])), ) def _nearest_patch_state( self, patch_states: list[dict[str, object]], x: int, y: int, radius: int, allow_fallback: bool = False, ) -> Optional[dict[str, object]]: best: Optional[dict[str, object]] = None best_dist = radius + 1 for state in patch_states: tx, ty = state["target"] # type: ignore[index] dist = self._cell_distance(x, y, tx, ty) if dist <= radius and dist < best_dist: best = state best_dist = dist if best is None and allow_fallback and patch_states: best = min( patch_states, key=lambda state: self._cell_distance(x, y, state["target"][0], state["target"][1]), # type: ignore[index] ) return best def _resource_patch_states(self, obs: OpenRAObservation) -> list[dict[str, object]]: patch_states: list[dict[str, object]] = [] refineries = [b for b in obs.buildings if b.type == "proc"] conyards = [b for b in obs.buildings if b.type == "fact"] base_center = self._base_center(obs) anchors = [ ( conyard.cell_x if conyard.cell_x > 0 else conyard.pos_x // 1024, conyard.cell_y if conyard.cell_y > 0 else conyard.pos_y // 1024, ) for conyard in conyards ] if not anchors and base_center is not None: anchors = [base_center] for idx, patch in enumerate(self._resource_patches): target = self._patch_target(patch) memory = self._patch_memory(patch) nearest_refinery_distance = self._nearest_distance_to_buildings(target[0], target[1], refineries) nearby_refineries = sum( 1 for refinery in refineries if self._cell_distance( target[0], target[1], refinery.cell_x if refinery.cell_x > 0 else refinery.pos_x // 1024, refinery.cell_y if refinery.cell_y > 0 else refinery.pos_y // 1024, ) <= RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS ) anchor_distance = self._nearest_anchor_distance(target[0], target[1], anchors) base_distance = ( self._cell_distance(target[0], target[1], base_center[0], base_center[1]) if base_center is not None else anchor_distance ) patch_states.append( { "id": idx, "patch": patch, "target": target, "harvesters": [], "harvester_count": 0, "refinery_count": 0, "nearby_refineries": nearby_refineries, "nearest_refinery_distance": nearest_refinery_distance, "anchor_distance": anchor_distance, "base_distance": base_distance, "threat": self._resource_patch_threat(obs, patch), "depletion_ratio": float(memory.get("depletion_ratio", 0.0)), "depletion_trend": float(memory.get("depletion_trend", 0.0)), "travel_cost": 0, "capacity": 0, "saturation": 0.0, "lack": 0, "density_score": 0, "score": 0, "refinery_score": 0, "expansion_score": 0, } ) if not patch_states: return patch_states for refinery in refineries: rx = refinery.cell_x if refinery.cell_x > 0 else refinery.pos_x // 1024 ry = refinery.cell_y if refinery.cell_y > 0 else refinery.pos_y // 1024 state = self._nearest_patch_state( patch_states, rx, ry, HARVESTER_REASSIGN_REFINERY_RADIUS, allow_fallback=True, ) if state is not None: state["refinery_count"] = int(state["refinery_count"]) + 1 state["nearest_refinery_distance"] = min( int(state["nearest_refinery_distance"]), self._cell_distance(rx, ry, state["target"][0], state["target"][1]), # type: ignore[index] ) for harvester in [u for u in obs.units if u.type == "harv"]: state = self._nearest_patch_state( patch_states, harvester.cell_x, harvester.cell_y, HARVESTER_PATCH_ASSIGN_RADIUS, allow_fallback=True, ) if state is None: forced_target = self._harvester_patch_targets.get(harvester.actor_id) if forced_target is not None: state = self._nearest_patch_state( patch_states, forced_target[0], forced_target[1], HARVESTER_PATCH_ASSIGN_RADIUS * 2, allow_fallback=True, ) if state is not None: state["harvesters"].append(harvester) # type: ignore[index] for state in patch_states: patch = state["patch"] # type: ignore[assignment] harvester_count = len(state["harvesters"]) # type: ignore[arg-type] cells = int(patch["cells"]) # type: ignore[index] total_density = float(patch["total_density"]) # type: ignore[index] threat = int(state["threat"]) refinery_count = int(state["refinery_count"]) nearby_refineries = int(state["nearby_refineries"]) nearest_refinery_distance = min( int(state["nearest_refinery_distance"]), RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS * 3, ) anchor_distance = int(state["anchor_distance"]) base_distance = int(state["base_distance"]) depletion_ratio = float(state["depletion_ratio"]) depletion_trend = float(state["depletion_trend"]) capacity = self._resource_patch_capacity( total_density, cells, refinery_count, depletion_ratio, threat, ) if capacity > harvester_count: lack = capacity - harvester_count elif capacity < harvester_count: lack = -(harvester_count - capacity) else: lack = 0 if refinery_count <= 0 and lack > 0: lack = min(lack, 1) if threat > 0 and lack > 0: lack = 0 saturation = harvester_count / max(1, capacity) if capacity > 0 else float(harvester_count) travel_cost = nearest_refinery_distance if refineries else anchor_distance + 8 if refinery_count <= 0 and refineries: travel_cost = min(travel_cost + 4, anchor_distance + 10) density_score = int(total_density * 8) + cells * 24 depletion_penalty = int(depletion_ratio * 400) + int(depletion_trend * 320) support_bonus = refinery_count * 350 if refineries: support_bonus += max( 0, RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS - min(nearest_refinery_distance, RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS), ) * 14 score = density_score + support_bonus score -= threat * 240 score -= travel_cost * 12 score -= depletion_penalty if harvester_count == 0 and refinery_count > 0 and threat == 0: score += 90 if saturation > 1.0: score -= int((saturation - 1.0) * 300) refinery_score = density_score refinery_score -= anchor_distance * 16 refinery_score -= nearby_refineries * 600 refinery_score -= threat * 220 refinery_score -= depletion_penalty if nearby_refineries == 0: refinery_score += 250 if refineries: refinery_score += min(nearest_refinery_distance, RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS) * 20 if threat == 0 and saturation >= 0.75: refinery_score += 80 expansion_score = density_score expansion_score -= base_distance * 6 expansion_score -= threat * 240 expansion_score -= depletion_penalty if nearby_refineries == 0: expansion_score += 120 if anchor_distance > MCV_FRIENDLY_CONYARD_DISLIKE_RANGE: expansion_score += 100 if nearest_refinery_distance > MCV_FRIENDLY_REFINERY_DISLIKE_RANGE: expansion_score += 80 if saturation >= 1.0 and threat == 0: expansion_score += 60 state["harvester_count"] = harvester_count state["capacity"] = capacity state["saturation"] = saturation state["travel_cost"] = travel_cost state["lack"] = lack state["density_score"] = density_score state["score"] = score state["refinery_score"] = refinery_score state["expansion_score"] = expansion_score return patch_states def _can_reassign_harvester(self, obs: OpenRAObservation, harvester: UnitInfoModel) -> bool: if obs.tick < self._harvester_retreat_until.get(harvester.actor_id, -9999): return False if obs.tick < self._harvester_reassign_until.get(harvester.actor_id, -9999): return False if self._harvester_recently_damaged(obs, harvester.actor_id): return False if obs.tick < self._harvester_no_resource_until.get(harvester.actor_id, -9999): return False if self._nearest_enemy_to_unit(obs, harvester, HARVESTER_THREAT_RADIUS) is not None: return False activity = harvester.current_activity.lower() if "dock" in activity: return False return True def _reassign_low_effect_harvesters( self, obs: OpenRAObservation, ) -> tuple[list[CommandModel], set[int]]: if obs.tick - self._last_harvester_reassign_tick < LOW_EFFECT_HARVESTER_SCAN_INTERVAL: return [], set() self._last_harvester_reassign_tick = obs.tick if self._base_under_pressure(obs) or len(self._resource_patches) < 2: return [], set() patch_states = self._resource_patch_states(obs) donors = [state for state in patch_states if int(state["lack"]) < 0] receivers = [ state for state in patch_states if int(state["lack"]) > 0 and int(state["threat"]) == 0 ] if not donors or not receivers: fallback_donors = [ state for state in patch_states if int(state["harvester_count"]) > 1 and int(state["threat"]) == 0 and float(state["saturation"]) >= 1.0 ] if not receivers or not fallback_donors: return [], set() best_receiver = max(receivers, key=lambda state: int(state["score"])) donors = [ state for state in fallback_donors if int(best_receiver["score"]) > int(state["score"]) + 600 ] if not donors: return [], set() for donor in donors: donor["lack"] = min(int(donor["lack"]), -1) donors.sort(key=lambda state: int(state["lack"])) receivers.sort( key=lambda state: ( int(state["score"]), int(state["lack"]), -int(state["travel_cost"]), ), reverse=True, ) commands: list[CommandModel] = [] redirected: set[int] = set() for receiver in receivers: need = int(receiver["lack"]) if int(receiver["refinery_count"]) <= 0: need = min(need, 1) if need <= 0: continue tx, ty = receiver["target"] # type: ignore[index] for donor in donors: if need <= 0 or int(donor["lack"]) >= 0: continue harvesters = sorted( donor["harvesters"], # type: ignore[index] key=lambda u: self._cell_distance(u.cell_x, u.cell_y, tx, ty), ) for harvester in harvesters: if need <= 0 or int(donor["lack"]) >= 0: break if harvester.actor_id in redirected: continue if not self._can_reassign_harvester(obs, harvester): continue if self._harvester_patch_targets.get(harvester.actor_id) == (tx, ty): continue commands.append( CommandModel( action=ActionType.HARVEST, actor_id=harvester.actor_id, target_x=tx, target_y=ty, ) ) redirected.add(harvester.actor_id) donor["lack"] = int(donor["lack"]) + 1 need -= 1 self._harvester_patch_targets[harvester.actor_id] = (tx, ty) self._harvester_reassign_until[harvester.actor_id] = obs.tick + HARVESTER_REASSIGN_COOLDOWN self._harvester_last_progress_tick[harvester.actor_id] = obs.tick self._log( f"Redirecting harv #{harvester.actor_id} -> patch ({tx},{ty}) " f"from overloaded patch {donor['target']}" ) return commands, redirected def _best_refinery_plan( self, obs: OpenRAObservation, ) -> Optional[dict[str, Tuple[int, int]]]: patch_states = self._resource_patch_states(obs) if not patch_states: return None conyards = [b for b in obs.buildings if b.type == "fact"] if not conyards: return None requested = self._current_requested_refinery(obs) if requested is not None: request_id, anchor, target = requested return { "anchor": anchor, "target": target, "request_id": request_id, } # type: ignore[return-value] best: Optional[tuple[int, Tuple[int, int], Tuple[int, int]]] = None for conyard in conyards: anchor = ( conyard.cell_x if conyard.cell_x > 0 else conyard.pos_x // 1024, conyard.cell_y if conyard.cell_y > 0 else conyard.pos_y // 1024, ) for state in patch_states: target = state["target"] # type: ignore[index] dist = self._cell_distance(anchor[0], anchor[1], target[0], target[1]) if dist > BASE_BUILD_MAX_RADIUS + RESOURCE_PATCH_SEARCH_MARGIN: continue if int(state["nearby_refineries"]) >= MAX_REFINERIES_PER_PATCH: continue score = int(state["refinery_score"]) - dist * 8 if int(state["threat"]) == 0 and float(state["depletion_ratio"]) < 0.6: score += 60 if best is None or score > best[0]: best = (score, anchor, target) if best is None: conyard = conyards[0] anchor = ( conyard.cell_x if conyard.cell_x > 0 else conyard.pos_x // 1024, conyard.cell_y if conyard.cell_y > 0 else conyard.pos_y // 1024, ) state = min( patch_states, key=lambda s: self._cell_distance(anchor[0], anchor[1], s["target"][0], s["target"][1]), # type: ignore[index] ) return {"anchor": anchor, "target": state["target"]} # type: ignore[return-value] return {"anchor": best[1], "target": best[2]} def _best_expansion_patch_target(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]: patch_states = self._resource_patch_states(obs) if not patch_states: return None conyards = [b for b in obs.buildings if b.type == "fact"] refineries = [b for b in obs.buildings if b.type == "proc"] active_targets = list(self._mcv_targets.values()) best: Optional[tuple[int, Tuple[int, int]]] = None for state in patch_states: target = state["target"] # type: ignore[index] if conyards and self._nearest_distance_to_buildings(target[0], target[1], conyards) < MCV_FRIENDLY_CONYARD_DISLIKE_RANGE: continue if refineries and self._nearest_distance_to_buildings(target[0], target[1], refineries) < MCV_FRIENDLY_REFINERY_DISLIKE_RANGE: continue if any( self._cell_distance(target[0], target[1], active_target[0], active_target[1]) < MCV_FRIENDLY_CONYARD_DISLIKE_RANGE for active_target in active_targets ): continue if int(state["threat"]) > 0: continue score = int(state["expansion_score"]) if float(state["depletion_ratio"]) >= 0.8: score -= 200 if int(state["harvester_count"]) == 0 and int(state["capacity"]) >= 2: score += 80 if best is None or score > best[0]: best = (score, target) return None if best is None or best[0] <= 0 else best[1] def _naval_build_candidates( self, obs: OpenRAObservation, item_type: str, occupied: Optional[set[tuple[int, int]]] = None, ) -> list[tuple[int, int]]: center = self._placement_base_center(obs) or (0, 0) w, h = self._get_map_size() if not self._spatial_raw: return [] occupied = occupied or self._occupied_building_cells(obs) buildable_area = self._buildable_area_cells(obs) width, height = self._building_dimensions(item_type) candidates: set[tuple[int, int]] = set() origins = [ self._building_top_left(building) for building in obs.buildings if self._canonical_building_type(building.type) not in NO_BUILDABLE_AREA_TYPES ] for ox, oy in origins: for dx in range(-NAVAL_WATER_SCAN_RADIUS, NAVAL_WATER_SCAN_RADIUS + 1, NAVAL_WATER_SCAN_STRIDE): for dy in range(-NAVAL_WATER_SCAN_RADIUS, NAVAL_WATER_SCAN_RADIUS + 1, NAVAL_WATER_SCAN_STRIDE): if dx * dx + dy * dy > NAVAL_WATER_SCAN_RADIUS * NAVAL_WATER_SCAN_RADIUS: continue x = ox + dx y = oy + dy if x < 0 or y < 0 or x + width > w or y + height > h: continue candidate_center = (x + width // 2, y + height // 2) center_radius = max(abs(candidate_center[0] - center[0]), abs(candidate_center[1] - center[1])) if center_radius < BASE_BUILD_MIN_RADIUS or center_radius > NAVAL_BUILD_MAX_RADIUS: continue if not self._candidate_fits_building_footprint( obs, item_type, x, y, occupied=occupied, base_cells=buildable_area, ): continue candidates.add((x, y)) ordered = list(candidates) ordered.sort( key=lambda p: ( -self._naval_anchor_score(item_type, p[0], p[1]), self._cell_distance(p[0], p[1], center[0], center[1]), ) ) return ordered def _naval_anchor_score(self, item_type: str, top_left_x: int, top_left_y: int) -> int: width, height = self._building_dimensions(item_type) center_x = top_left_x + width // 2 center_y = top_left_y + height // 2 return self._local_water_score(center_x, center_y, 2) def _naval_gate_open_water_windows(self, obs: OpenRAObservation) -> int: if not self._spatial_raw: return 0 # Count only footprint-valid naval anchors with enough surrounding water. candidates = self._naval_build_candidates(obs, "spen") if not candidates: return 0 viable = [ candidate for candidate in candidates if self._naval_anchor_score("spen", candidate[0], candidate[1]) >= NAVAL_MIN_WATER_SCORE ] return min(len(viable), NAVAL_MIN_OPEN_WATER_WINDOWS) def _best_naval_anchor(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]: candidates = [ candidate for candidate in self._naval_build_candidates(obs, "spen") if self._naval_anchor_score("spen", candidate[0], candidate[1]) >= NAVAL_MIN_WATER_SCORE ] if len(candidates) < NAVAL_CANDIDATE_MIN_COUNT: return None return candidates[0] def _can_safely_build_naval_structure(self, obs: OpenRAObservation) -> bool: if obs.tick < self._naval_disabled_until: return False if obs.tick - self._last_naval_gate_tick <= NAVAL_GATE_CACHE_TICKS: return self._cached_naval_gate_ok if any(self._canonical_building_type(b.type) in NAVAL_STRUCTURE_TYPES for b in obs.buildings): self._naval_retry_buildable_count = -1 self._cached_naval_gate_ok = True self._last_naval_gate_tick = obs.tick return True elif self._naval_retry_buildable_count >= 0: if self._buildable_area_structure_count(obs) <= self._naval_retry_buildable_count: self._cached_naval_gate_ok = False self._last_naval_gate_tick = obs.tick return False self._naval_retry_buildable_count = -1 # Human-like: only enable naval when there is a strong, footprint-valid # naval anchor near our current buildable area. if self._naval_gate_open_water_windows(obs) < NAVAL_MIN_OPEN_WATER_WINDOWS: self._cached_naval_gate_ok = False self._last_naval_gate_tick = obs.tick return False ok = self._best_naval_anchor(obs) is not None self._cached_naval_gate_ok = ok self._last_naval_gate_tick = obs.tick return ok def _minimum_excess_power_target(self, obs: OpenRAObservation) -> int: bonus = EXCESS_POWER_INCREMENT * (len(obs.buildings) // max(1, EXCESS_POWER_INCREASE_THRESHOLD)) return max(MINIMUM_EXCESS_POWER, min(MAXIMUM_EXCESS_POWER, MINIMUM_EXCESS_POWER + bonus)) def _has_any_production_building(self, obs: OpenRAObservation) -> bool: return any(self._canonical_building_type(b.type) in PRODUCTION_BUILDING_TYPES for b in obs.buildings) def _optimal_refinery_count(self, obs: OpenRAObservation) -> int: target = INITIAL_MIN_REFINERY_COUNT if self._has_any_production_building(obs): target += ADDITIONAL_MIN_REFINERY_COUNT return target def _has_adequate_refinery_count(self, obs: OpenRAObservation) -> bool: refinery_count = sum(1 for b in obs.buildings if b.type == "proc") has_power = any(b.type in {"powr", "apwr"} for b in obs.buildings) has_conyard = any(b.type == "fact" for b in obs.buildings) return ( refinery_count >= self._optimal_refinery_count(obs) or not has_power or not has_conyard ) def _best_power_building(self, obs: OpenRAObservation) -> Optional[str]: for item in ("apwr", "powr"): if self._can_produce(obs, item) and self._structure_queue_available(obs, item): return item return None def _best_production_building(self, obs: OpenRAObservation) -> Optional[str]: candidates = [] counts = self._building_counts(obs) for item in ("weap", "barr", "tent"): if not self._can_produce(obs, item): continue if not self._structure_queue_available(obs, item): continue limit = BUILDING_LIMITS.get(item) if limit is not None and counts.get(item, 0) >= limit: continue candidates.append(item) if not candidates: return None return random.choice(candidates) def _preferred_early_naval_building(self, obs: OpenRAObservation, credits: int) -> Optional[str]: if self._in_recovery_mode(obs): return None if any(self._canonical_building_type(b.type) in NAVAL_STRUCTURE_TYPES for b in obs.buildings): return None if not any(b.type == "proc" for b in obs.buildings): return None if not any(b.type in WAR_FACTORY_TYPES for b in obs.buildings): return None if not self._can_safely_build_naval_structure(obs): return None anchor = self._best_naval_anchor(obs) if anchor is None or self._naval_anchor_score("spen", anchor[0], anchor[1]) < NAVAL_EARLY_BUILD_WATER_SCORE: return None naval_item = self._best_naval_production_building(obs) if naval_item is None: return None if credits < self._build_cost(naval_item) + NAVAL_EARLY_BUILD_CREDIT_BUFFER: return None return naval_item def _best_naval_production_building(self, obs: OpenRAObservation) -> Optional[str]: candidates = [] counts = self._building_counts(obs) for item in NAVAL_STRUCTURE_TYPES: if not self._can_produce(obs, item): continue if not self._structure_queue_available(obs, item): continue limit = BUILDING_LIMITS.get(item) if limit is not None and counts.get(item, 0) >= limit: continue candidates.append(item) if not candidates: return None return random.choice(candidates) def _unit_at_limit(self, obs: OpenRAObservation, item_type: str) -> bool: limit = UNIT_LIMITS.get(item_type) if limit is None: return False current = sum(1 for u in obs.units if u.type == item_type) current += sum(1 for p in obs.production if p.item == item_type) current += self._requested_production_count(item_type) return current >= limit def _current_unit_count(self, obs: OpenRAObservation, item_type: str) -> int: return sum(1 for u in obs.units if u.type == item_type) def _queue_delay_active(self, obs: OpenRAObservation, queue_type: str) -> bool: return obs.tick < self._queue_delay_until.get(queue_type, -9999) def _unit_delay_active(self, obs: OpenRAObservation, item_type: str) -> bool: return obs.tick < self._unit_delay_until.get(item_type, -9999) def _mark_unit_trained(self, obs: OpenRAObservation, item_type: str, queue_type: str): queue_delay = QUEUE_PRODUCTION_DELAYS.get(queue_type, 0) if queue_delay > 0: self._queue_delay_until[queue_type] = max( self._queue_delay_until.get(queue_type, -9999), obs.tick + queue_delay, ) unit_delay = UNIT_PRODUCTION_DELAYS.get(item_type, 0) if unit_delay > 0: self._unit_delay_until[item_type] = max( self._unit_delay_until.get(item_type, -9999), obs.tick + unit_delay, ) def _idle_base_unit_count(self, obs: OpenRAObservation, queue_type: Optional[str] = None) -> int: base_center = self._base_center(obs) if base_center is None: return 0 count = 0 for unit in obs.units: if unit.type in {"harv", "mcv"}: continue if not getattr(unit, "is_idle", False): continue if self._cell_distance(unit.cell_x, unit.cell_y, base_center[0], base_center[1]) > IDLE_BASE_UNIT_RADIUS: continue if queue_type is not None and self._queue_type_for_unit(unit.type) != queue_type: continue count += 1 return count def _production_support_available( self, obs: OpenRAObservation, item_type: str, unit_counts: Optional[dict[str, int]] = None, ) -> bool: if item_type in SHIP_TYPES: return any(self._canonical_building_type(b.type) in NAVAL_STRUCTURE_TYPES for b in obs.buildings) if unit_counts is None: unit_counts = {} for unit in obs.units: unit_counts[unit.type] = unit_counts.get(unit.type, 0) + 1 for prod in obs.production: unit_counts[prod.item] = unit_counts.get(prod.item, 0) + 1 for requested in self._unit_requests: unit_counts[requested] = unit_counts.get(requested, 0) + 1 if item_type in PLANE_TYPES: airfields = sum(1 for b in obs.buildings if self._canonical_building_type(b.type) == "afld") if airfields <= 0: return False plane_count = sum(unit_counts.get(t, 0) for t in PLANE_TYPES) return plane_count < airfields * AIRFIELD_PLANE_CAPACITY if item_type in AIRCRAFT_TYPES - PLANE_TYPES: helipads = sum(1 for b in obs.buildings if self._canonical_building_type(b.type) == "hpad") if helipads <= 0: return False aircraft_count = sum(unit_counts.get(t, 0) for t in AIRCRAFT_TYPES - PLANE_TYPES) return aircraft_count < helipads * HELIPAD_AIRCRAFT_CAPACITY return True def _economy_ready_for_tech(self, obs: OpenRAObservation) -> bool: refinery_count = sum(1 for b in obs.buildings if b.type == "proc") if refinery_count < 2: return False return self._harvester_target(obs) >= max(2, refinery_count) def _harvester_target(self, obs: OpenRAObservation) -> int: refinery_count = sum(1 for b in obs.buildings if b.type == "proc") if refinery_count <= 0: return 0 target = max(INITIAL_HARVESTERS, refinery_count) return min(target, UNIT_LIMITS.get("harv", target)) def _should_delay_harvester_request(self, obs: OpenRAObservation, current_harvesters: int) -> bool: return False def _desired_unit_share( self, obs: OpenRAObservation, item_type: str, unit_counts: dict[str, int], ) -> int: share = UNITS_TO_BUILD.get(item_type, 0) if share <= 0: return 0 queue_type = self._queue_type_for_unit(item_type) if queue_type is not None and self._unit_delay_active(obs, item_type): return 0 if not self._production_support_available(obs, item_type, unit_counts): return 0 if item_type == "harv": return share if unit_counts.get("harv", 0) < self._harvester_target(obs) else 0 return share def _ensure_mcv_requests(self, obs: OpenRAObservation): if not any(b.type in WAR_FACTORY_TYPES for b in obs.buildings): return mcvs = sum(1 for u in obs.units if u.type == "mcv") conyards = sum(1 for b in obs.buildings if b.type == "fact") if (conyards <= 0 and mcvs > 1) or (conyards > 0 and mcvs > 0): return pending = sum(1 for p in obs.production if p.item == "mcv") + self._requested_production_count("mcv") if conyards + mcvs + pending >= self._desired_mcv_count(obs): return if pending > 0: return self._request_unit_production("mcv") def _desired_mcv_count(self, obs: OpenRAObservation) -> int: if self._available_credits(obs) >= BUILD_ADDITIONAL_MCV_CASH_AMOUNT: return MINIMUM_CONSTRUCTION_YARD_COUNT + ADDITIONAL_CONSTRUCTION_YARD_COUNT return MINIMUM_CONSTRUCTION_YARD_COUNT def _expansion_refinery_pending(self, obs: OpenRAObservation) -> bool: if self._expansion_refinery_goal <= 0: return False if obs.tick >= self._expansion_refinery_until_tick: self._clear_expansion_refinery_need() return False current = sum(1 for b in obs.buildings if b.type == "proc") current += sum(1 for p in obs.production if p.item == "proc") if current >= self._expansion_refinery_goal: self._clear_expansion_refinery_need() return False return True def _remember_expansion_refinery_need(self, obs: OpenRAObservation): current = sum(1 for b in obs.buildings if b.type == "proc") current += sum(1 for p in obs.production if p.item == "proc") self._expansion_refinery_goal = max(self._expansion_refinery_goal, current) + 1 self._expansion_refinery_until_tick = max(self._expansion_refinery_until_tick, obs.tick + 2400) def _clear_expansion_refinery_need(self): self._expansion_refinery_goal = 0 self._expansion_refinery_until_tick = -9999 def _remember_requested_refinery( self, obs: OpenRAObservation, actor_id: int, conyard_loc: Tuple[int, int], resource_loc: Tuple[int, int], ): self._requested_refineries[actor_id] = (conyard_loc, resource_loc, obs.tick + REQUESTED_REFINERY_TTL) def _current_requested_refinery( self, obs: OpenRAObservation, ) -> Optional[tuple[int, Tuple[int, int], Tuple[int, int]]]: refineries = [b for b in obs.buildings if b.type == "proc"] for actor_id, (conyard_loc, resource_loc, expiry_tick) in list(self._requested_refineries.items()): if expiry_tick <= obs.tick: self._requested_refineries.pop(actor_id, None) continue if ( refineries and self._nearest_distance_to_buildings(resource_loc[0], resource_loc[1], refineries) < RESOURCE_PATCH_REFINERY_DISLIKE_RADIUS ): self._requested_refineries.pop(actor_id, None) continue return actor_id, conyard_loc, resource_loc return None def _consume_requested_refinery(self, actor_id: int): self._requested_refineries.pop(actor_id, None) def _expansion_pressure(self, obs: OpenRAObservation) -> tuple[bool, bool]: if self._pick_expansion_target(obs) is None: return False, False refinery_count = sum(1 for b in obs.buildings if b.type == "proc") production_count = sum( 1 for b in obs.buildings if self._canonical_building_type(b.type) in PRODUCTION_BUILDING_TYPES ) if refinery_count < INITIAL_MIN_REFINERY_COUNT + ADDITIONAL_MIN_REFINERY_COUNT or production_count <= 0: return False, False tech_count = sum( 1 for b in obs.buildings if self._canonical_building_type(b.type) in TECH_BUILDING_TYPES ) tolerate_on_cash = self._available_credits(obs) // 12000 expand_now = ( production_count + tech_count - random.choice(EXPANSION_TOLERATE_VALUES) - tolerate_on_cash >= refinery_count ) force_undeploy_even_no_base = ( production_count + tech_count - random.choice(FORCE_EXPANSION_TOLERATE_VALUES) - tolerate_on_cash >= refinery_count ) return expand_now, force_undeploy_even_no_base def _maybe_undeploy_conyard_for_expansion(self, obs: OpenRAObservation) -> Optional[CommandModel]: if obs.tick - self._last_conyard_undeploy_tick < CONYARD_UNDEPLOY_COOLDOWN: return None if any(u.type == "mcv" for u in obs.units): return None expand_now, force_undeploy_even_no_base = self._expansion_pressure(obs) if not expand_now: return None expansion_target = self._pick_expansion_target(obs) if expansion_target is None: return None conyards = [b for b in obs.buildings if b.type == "fact"] if len(conyards) <= 1 and not force_undeploy_even_no_base: return None movable_conyards = [ b for b in conyards if not (getattr(b, "is_producing", False) and getattr(b, "producing_item", "") == "proc") and obs.tick >= self._mcv_deploy_until.get(b.actor_id, -9999) ] if not movable_conyards: return None movable_conyards.sort( key=lambda b: ( self._cell_distance( b.cell_x if b.cell_x > 0 else b.pos_x // 1024, b.cell_y if b.cell_y > 0 else b.pos_y // 1024, expansion_target[0], expansion_target[1], ), b.actor_id, ) ) conyard = movable_conyards[0] self._last_conyard_undeploy_tick = obs.tick self._mcv_deploy_until[conyard.actor_id] = obs.tick + MCV_DEPLOY_COMMAND_COOLDOWN self._log(f"Undeploying conyard #{conyard.actor_id} for expansion -> {expansion_target}") return CommandModel(action=ActionType.DEPLOY, actor_id=conyard.actor_id) def _mcv_deploy_top_left(self, cell_x: int, cell_y: int) -> tuple[int, int]: return cell_x + MCV_DEPLOY_OFFSET[0], cell_y + MCV_DEPLOY_OFFSET[1] def _mcv_cell_for_top_left(self, top_left_x: int, top_left_y: int) -> tuple[int, int]: return top_left_x - MCV_DEPLOY_OFFSET[0], top_left_y - MCV_DEPLOY_OFFSET[1] def _can_mcv_deploy_at(self, obs: OpenRAObservation, cell_x: int, cell_y: int) -> bool: top_left_x, top_left_y = self._mcv_deploy_top_left(cell_x, cell_y) return self._candidate_fits_building_footprint(obs, "fact", top_left_x, top_left_y) def _best_mcv_deploy_target( self, obs: OpenRAObservation, mcv: UnitInfoModel, expansion_target: Tuple[int, int], ) -> Optional[Tuple[int, int]]: candidates = self._placement_candidates( obs, "fact", expansion_target[0], expansion_target[1], MCV_MIN_DEPLOY_RADIUS, MCV_MAX_DEPLOY_RADIUS, ) if not candidates: return None source = (mcv.cell_x, mcv.cell_y) deploy_cells = [self._mcv_cell_for_top_left(x, y) for x, y in candidates] deploy_cells.sort( key=lambda cell: ( abs(self._cell_distance(cell[0], cell[1], expansion_target[0], expansion_target[1]) - MCV_TRY_MAINTAIN_RANGE), self._cell_distance(cell[0], cell[1], source[0], source[1]), self._cell_distance(cell[0], cell[1], expansion_target[0], expansion_target[1]), ) ) return deploy_cells[0] def _pick_expansion_target(self, obs: OpenRAObservation) -> Optional[Tuple[int, int]]: patch_target = self._best_expansion_patch_target(obs) if patch_target is not None: return patch_target patch_states = self._resource_patch_states(obs) fallback_patch_states = [ state for state in patch_states if int(state["threat"]) == 0 and float(state["depletion_ratio"]) < 0.9 ] if fallback_patch_states: best_state = max( fallback_patch_states, key=lambda state: ( int(state["expansion_score"]), int(state["capacity"]), -int(state["base_distance"]), ), ) if int(best_state["expansion_score"]) > -200: return best_state["target"] # type: ignore[return-value] conyards = [b for b in obs.buildings if b.type == "fact"] refineries = [b for b in obs.buildings if b.type == "proc"] candidates = self._search_grid(obs) for target in candidates: if self._nearest_distance_to_buildings(target[0], target[1], conyards) < MCV_FRIENDLY_CONYARD_DISLIKE_RANGE: continue if refineries and self._nearest_distance_to_buildings(target[0], target[1], refineries) < MCV_FRIENDLY_REFINERY_DISLIKE_RANGE: continue return target return candidates[0] if candidates else None def _nearest_distance_to_buildings(self, x: int, y: int, buildings: list[BuildingInfoModel]) -> int: if not buildings: return 10**9 return min( self._cell_distance( x, y, b.cell_x if b.cell_x > 0 else b.pos_x // 1024, b.cell_y if b.cell_y > 0 else b.pos_y // 1024, ) for b in buildings ) def _cell_distance(self, ax: int, ay: int, bx: int, by: int) -> int: return abs(ax - bx) + abs(ay - by) def _queue_type_for_unit(self, item_type: str) -> Optional[str]: if item_type == "mcv": return "Vehicle" for queue_type, allowed in UNIT_QUEUE_ORDER: if item_type in allowed: return queue_type return None def _roll_assault_threshold(self) -> int: return SQUAD_SIZE + random.randrange(SQUAD_SIZE_RANDOM_BONUS) def _select_squad_leader(self, squad_units: list[UnitInfoModel]) -> UnitInfoModel: avg_x = sum(u.cell_x for u in squad_units) / len(squad_units) avg_y = sum(u.cell_y for u in squad_units) / len(squad_units) return min(squad_units, key=lambda u: (u.cell_x - avg_x) ** 2 + (u.cell_y - avg_y) ** 2) def _attack_wave_units( self, obs: OpenRAObservation, squad_units: list[UnitInfoModel], ) -> list[UnitInfoModel]: return squad_units def _regroup_squad_commands( self, squad_units: list[UnitInfoModel], leader: UnitInfoModel, regroup_radius: int = REGROUP_RADIUS, min_close_units: Optional[int] = None, circular: bool = False, ) -> List[CommandModel]: def within_radius(unit: UnitInfoModel) -> bool: dx = unit.cell_x - leader.cell_x dy = unit.cell_y - leader.cell_y if circular: return dx * dx + dy * dy <= regroup_radius * regroup_radius return self._cell_distance(unit.cell_x, unit.cell_y, leader.cell_x, leader.cell_y) <= regroup_radius close_units = [ u for u in squad_units if within_radius(u) ] if min_close_units is None: min_close_units = max(2, int(len(squad_units) * 0.4)) if len(close_units) >= min_close_units: return [] commands = [CommandModel(action=ActionType.STOP, actor_id=leader.actor_id)] redirected = 0 for u in squad_units: if u.actor_id == leader.actor_id: continue if not within_radius(u): commands.append(CommandModel( action=ActionType.ATTACK_MOVE, actor_id=u.actor_id, target_x=leader.cell_x, target_y=leader.cell_y, )) redirected += 1 if redirected: self._log(f"Regrouping {redirected}/{len(squad_units)} units around leader") return commands return [] def _visible_enemy_units_near( self, obs: OpenRAObservation, x: int, y: int, radius: int, ) -> list[UnitInfoModel]: return [ e for e in obs.visible_enemies if self._cell_distance(x, y, e.cell_x, e.cell_y) <= radius ] def _visible_enemy_buildings_near( self, obs: OpenRAObservation, x: int, y: int, radius: int, ) -> list[BuildingInfoModel]: return [ b for b in obs.visible_enemy_buildings if self._cell_distance(x, y, b.cell_x, b.cell_y) <= radius ] def _estimate_combat_power(self, actor) -> float: base = UNIT_COMBAT_POWER.get(actor.type, 0) if base == 0: base = BUILDING_THREAT_POWER.get(actor.type, 0) if base == 0: return 0.0 hp = getattr(actor, "hp_percent", 1.0) speed = getattr(actor, "speed", 0) attack_range = getattr(actor, "attack_range", 0) return base * max(0.2, hp) * (1.0 + min(speed / 200.0, 0.25) + min(attack_range / 12000.0, 0.25)) def _building_can_attack(self, building: BuildingInfoModel) -> bool: return ( self._canonical_building_type(building.type) in ATTACKING_BUILDING_TYPES and getattr(building, "is_powered", True) ) def _actor_cell(self, actor) -> Tuple[int, int]: if getattr(actor, "cell_x", 0) > 0 or getattr(actor, "cell_y", 0) > 0: return actor.cell_x, actor.cell_y return actor.pos_x // 1024, actor.pos_y // 1024 def _attack_or_flee_rules(self, rush: bool) -> tuple[tuple[tuple[str, ...], tuple[str, ...], tuple[str, ...], tuple[str, ...], str], ...]: return FUZZY_RUSH_RULES if rush else FUZZY_DEFAULT_RULES def _fuzzy_trapezoid(self, value: float, left: float, left_top: float, right_top: float, right: float) -> float: if value <= left or value >= right: if (value == left == left_top) or (value == right == right_top): return 1.0 return 0.0 if left_top <= value <= right_top: return 1.0 if value < left_top: if left_top == left: return 1.0 return (value - left) / (left_top - left) if right == right_top: return 1.0 return (right - value) / (right - right_top) def _fuzzy_input_membership(self, variable: str, term: str, value: float) -> float: if variable in {"OwnHealth", "EnemyHealth"}: shapes = { "NearDead": (0.0, 0.0, 20.0, 40.0), "Injured": (30.0, 50.0, 50.0, 70.0), "Normal": (50.0, 80.0, 100.0, 100.0), } else: shapes = { "Weak": (0.0, 0.0, 70.0, 90.0), "Equal": (85.0, 100.0, 100.0, 115.0), "Strong": (110.0, 150.0, 150.0, 1000.0), "Slow": (0.0, 0.0, 70.0, 90.0), "Fast": (110.0, 150.0, 150.0, 1000.0), } left, left_top, right_top, right = shapes[term] return self._fuzzy_trapezoid(value, left, left_top, right_top, right) def _fuzzy_output_membership(self, term: str, value: float) -> float: shapes = { "Attack": (0.0, 15.0, 15.0, 30.0), "Flee": (25.0, 35.0, 35.0, 50.0), } left, left_top, right_top, right = shapes[term] return self._fuzzy_trapezoid(value, left, left_top, right_top, right) def _normalized_health(self, actors: list) -> float: if not actors: return 0.0 return max(0.0, min(100.0, 100.0 * sum(max(0.0, getattr(actor, "hp_percent", 1.0)) for actor in actors) / len(actors))) def _attack_power_metric(self, actors: list) -> float: total = 0.0 for actor in actors: if isinstance(actor, UnitInfoModel): if actor.can_attack: total += UNIT_COMBAT_POWER.get(actor.type, 0) elif isinstance(actor, BuildingInfoModel) and self._building_can_attack(actor): total += BUILDING_THREAT_POWER.get(actor.type, 0) return total def _speed_metric(self, actors: list) -> float: speeds = [max(0, getattr(actor, "speed", 0)) for actor in actors if isinstance(actor, UnitInfoModel) and getattr(actor, "speed", 0) > 0] if not speeds: return 0.0 return sum(speeds) / len(speeds) def _relative_metric(self, own_value: float, enemy_value: float) -> float: if enemy_value <= 0: return 999.0 if own_value > 0 else 100.0 if own_value <= 0: return 0.0 return max(0.0, min(999.0, own_value / enemy_value * 100.0)) def _attack_or_flee_score(self, own_actors: list, enemy_actors: list, rush: bool) -> Optional[float]: inputs = { "OwnHealth": self._normalized_health(own_actors), "EnemyHealth": self._normalized_health(enemy_actors), "RelativeAttackPower": self._relative_metric( self._attack_power_metric(own_actors), self._attack_power_metric(enemy_actors), ), "RelativeSpeed": self._relative_metric( self._speed_metric(own_actors), self._speed_metric(enemy_actors), ), } activation = {"Attack": 0.0, "Flee": 0.0} for own_terms, enemy_terms, power_terms, speed_terms, outcome in self._attack_or_flee_rules(rush): degree = min( max(self._fuzzy_input_membership("OwnHealth", term, inputs["OwnHealth"]) for term in own_terms), max(self._fuzzy_input_membership("EnemyHealth", term, inputs["EnemyHealth"]) for term in enemy_terms), max(self._fuzzy_input_membership("RelativeAttackPower", term, inputs["RelativeAttackPower"]) for term in power_terms), max(self._fuzzy_input_membership("RelativeSpeed", term, inputs["RelativeSpeed"]) for term in speed_terms), ) activation[outcome] = max(activation[outcome], degree) if activation["Attack"] <= 0.0 and activation["Flee"] <= 0.0: return None numerator = 0.0 denominator = 0.0 for sample in range(101): value = sample * 0.5 membership = max( min(activation["Attack"], self._fuzzy_output_membership("Attack", value)), min(activation["Flee"], self._fuzzy_output_membership("Flee", value)), ) numerator += value * membership denominator += membership if denominator <= 0.0: return None return numerator / denominator def _attack_or_flee_can_attack(self, own_actors: list, enemy_actors: list, rush: bool) -> bool: score = self._attack_or_flee_score(own_actors, enemy_actors, rush) return score is not None and score < 30.0 def _select_rush_target( self, obs: OpenRAObservation, rush_units: list[UnitInfoModel], ) -> Optional[Tuple[int, int, int, str]]: attackable_units = [ unit for unit in rush_units if unit.can_attack and unit.type not in AIRCRAFT_TYPES | SHIP_TYPES ] if not attackable_units: return None sample_unit = random.choice(attackable_units) conyards = [ building for building in obs.visible_enemy_buildings if self._canonical_building_type(building.type) == "fact" ] conyards.sort(key=lambda building: self._cell_distance(sample_unit.cell_x, sample_unit.cell_y, *self._actor_cell(building))) for conyard in conyards: cx, cy = self._actor_cell(conyard) defenders: list = [ enemy for enemy in obs.visible_enemies if enemy.can_attack and enemy.type not in AIRCRAFT_TYPES | SHIP_TYPES and self._cell_distance(enemy.cell_x, enemy.cell_y, cx, cy) <= RUSH_ATTACK_SCAN_RADIUS ] defenders.extend( building for building in obs.visible_enemy_buildings if building.actor_id != conyard.actor_id and self._building_can_attack(building) and self._cell_distance(*self._actor_cell(building), cx, cy) <= RUSH_ATTACK_SCAN_RADIUS ) if not self._attack_or_flee_can_attack(attackable_units, defenders, rush=True): continue target = random.choice(defenders) if defenders else conyard target_kind = "unit" if isinstance(target, UnitInfoModel) else "building" tx, ty = self._actor_cell(target) return tx, ty, target.actor_id, target_kind return None def _should_take_local_fight( self, squad_units: list[UnitInfoModel], enemy_units: list[UnitInfoModel], enemy_buildings: list[BuildingInfoModel], rush: bool, cautious: bool = False, squad_name: str = "assault", ) -> bool: own_units = [u for u in squad_units if u.can_attack] if not own_units: return False if squad_name in {"assault", "rush"}: enemy_actors = list(enemy_units) + list(enemy_buildings) if cautious and self._normalized_health(own_units) < 55.0: return False return self._attack_or_flee_can_attack(own_units, enemy_actors, rush=rush) own_power = sum(self._estimate_combat_power(u) for u in own_units) enemy_power = ( sum(self._estimate_combat_power(u) for u in enemy_units) + sum(self._estimate_combat_power(b) * 0.7 for b in enemy_buildings) ) own_avg_hp = sum(u.hp_percent for u in own_units) / max(1, len(own_units)) enemy_avg_hp = ( sum(u.hp_percent for u in enemy_units) + sum(b.hp_percent for b in enemy_buildings) ) / max(1, len(enemy_units) + len(enemy_buildings)) own_avg_speed = sum(max(1, getattr(u, "speed", 1)) for u in own_units) / max(1, len(own_units)) enemy_avg_speed = sum(max(1, getattr(u, "speed", 1)) for u in enemy_units) / max(1, len(enemy_units)) if enemy_power <= 1: return True power_ratio = own_power / enemy_power speed_ratio = own_avg_speed / max(1.0, enemy_avg_speed) required_ratio = 1.05 min_hp = 0.48 if squad_name == "protection": required_ratio = 0.92 min_hp = 0.4 elif squad_name == "rush": required_ratio = 1.0 min_hp = 0.55 elif squad_name in {"air", "naval"}: required_ratio = 1.08 min_hp = 0.5 if rush: required_ratio = min(required_ratio, 1.04) if cautious: required_ratio += 0.12 min_hp = max(min_hp, 0.5) if own_avg_hp < RETREAT_HEALTH_THRESHOLD: required_ratio += 0.18 elif own_avg_hp >= enemy_avg_hp: required_ratio -= 0.08 if speed_ratio < 0.9 and squad_name not in {"air", "rush"}: required_ratio += 0.05 if squad_name == "air" and any(b.type in {"sam", "agun", "tsla"} for b in enemy_buildings): required_ratio += 0.15 if squad_name == "naval" and enemy_buildings: required_ratio += 0.08 if squad_name != "protection" and any(b.type in {"tsla", "gun", "ftur", "agun"} for b in enemy_buildings): required_ratio += 0.08 if not enemy_units and not any(b.type in DEFENSE_STRUCTURE_TYPES | {"agun", "sam"} for b in enemy_buildings): required_ratio -= 0.1 required_ratio = max(0.82, required_ratio) return own_avg_hp >= min_hp and power_ratio >= required_ratio def _pick_priority_target( self, obs: OpenRAObservation, x: Optional[int], y: Optional[int], local_only: bool, squad_name: str = "assault", ) -> Optional[Tuple[int, int, int, str, str]]: best: Optional[Tuple[float, Tuple[int, int, int, str, str]]] = None local_radius = LOCAL_FIGHT_RADIUS + (4 if squad_name in {"air", "naval"} else 0) for b in obs.visible_enemy_buildings: if local_only and x is not None and y is not None and self._cell_distance(x, y, b.cell_x, b.cell_y) > local_radius: continue priority = TARGET_BUILDING_PRIORITY.get(b.type, 40) if squad_name == "protection": priority += 8 if b.type in DEFENSE_STRUCTURE_TYPES else -12 elif squad_name == "rush": if b.type in {"proc", "weap", "fact", "powr", "apwr"}: priority += 12 elif squad_name == "air": if b.type in {"proc", "weap", "fact", "powr", "apwr", "hpad", "afld", "afld.ukraine"}: priority += 14 if b.type in {"sam", "agun", "tsla"}: priority -= 25 elif squad_name == "naval": if b.type in NAVAL_STRUCTURE_TYPES | {"proc", "weap"}: priority += 10 if b.type in {"sam", "agun", "tsla"}: priority -= 10 dist = self._cell_distance(x, y, b.cell_x, b.cell_y) if x is not None and y is not None else 0 score = priority * 1000 - dist * 20 + (1.0 - b.hp_percent) * 120 candidate = (b.actor_id, b.cell_x, b.cell_y, b.type, "building") if best is None or score > best[0]: best = (score, candidate) for e in obs.visible_enemies: if local_only and x is not None and y is not None and self._cell_distance(x, y, e.cell_x, e.cell_y) > local_radius: continue if "husk" in e.type: continue if not e.can_attack and e.type not in {"harv", "mcv"} and squad_name not in {"rush", "air"}: continue priority = TARGET_UNIT_PRIORITY.get(e.type, 30 if e.can_attack else 10) if squad_name == "protection": if e.can_attack: priority += 15 if e.type in {"harv", "mcv"}: priority -= 20 elif squad_name == "rush": if e.type in {"harv", "mcv", "arty", "v2rl"}: priority += 10 elif squad_name == "air": if e.type in {"harv", "mcv", "arty", "v2rl", "ftrk"}: priority += 14 elif squad_name == "naval": if e.type in SHIP_TYPES: priority += 18 elif e.type in AIRCRAFT_TYPES: priority -= 10 dist = self._cell_distance(x, y, e.cell_x, e.cell_y) if x is not None and y is not None else 0 score = priority * 1000 - dist * 25 + (1.0 - e.hp_percent) * 150 candidate = (e.actor_id, e.cell_x, e.cell_y, e.type, "unit") if best is None or score > best[0]: best = (score, candidate) return best[1] if best is not None else None def _pick_closest_visible_target( self, obs: OpenRAObservation, x: Optional[int], y: Optional[int], squad_name: str = "assault", ) -> Optional[Tuple[int, int, int, str, str]]: if x is None or y is None: return self._pick_priority_target(obs, x, y, local_only=False, squad_name=squad_name) best: Optional[Tuple[tuple[int, int, int], Tuple[int, int, int, str, str]]] = None for b in obs.visible_enemy_buildings: priority = TARGET_BUILDING_PRIORITY.get(b.type, 40) if squad_name == "protection": priority += 8 if b.type in DEFENSE_STRUCTURE_TYPES else -12 elif squad_name == "rush" and b.type in {"proc", "weap", "fact", "powr", "apwr"}: priority += 12 elif squad_name == "air": if b.type in {"proc", "weap", "fact", "powr", "apwr", "hpad", "afld", "afld.ukraine"}: priority += 14 if b.type in {"sam", "agun", "tsla"}: priority -= 25 elif squad_name == "naval": if b.type in NAVAL_STRUCTURE_TYPES | {"proc", "weap"}: priority += 10 if b.type in {"sam", "agun", "tsla"}: priority -= 10 dist = self._cell_distance(x, y, b.cell_x, b.cell_y) key = (dist, -priority, -int((1.0 - b.hp_percent) * 100)) candidate = (b.actor_id, b.cell_x, b.cell_y, b.type, "building") if best is None or key < best[0]: best = (key, candidate) for e in obs.visible_enemies: if "husk" in e.type: continue if not e.can_attack and e.type not in {"harv", "mcv"} and squad_name not in {"rush", "air"}: continue priority = TARGET_UNIT_PRIORITY.get(e.type, 30 if e.can_attack else 10) if squad_name == "protection": if e.can_attack: priority += 15 if e.type in {"harv", "mcv"}: priority -= 20 elif squad_name == "rush" and e.type in {"harv", "mcv", "arty", "v2rl"}: priority += 10 elif squad_name == "air" and e.type in {"harv", "mcv", "arty", "v2rl", "ftrk"}: priority += 14 elif squad_name == "naval": if e.type in SHIP_TYPES: priority += 18 elif e.type in AIRCRAFT_TYPES: priority -= 10 dist = self._cell_distance(x, y, e.cell_x, e.cell_y) key = (dist, -priority, -int((1.0 - e.hp_percent) * 100)) candidate = (e.actor_id, e.cell_x, e.cell_y, e.type, "unit") if best is None or key < best[0]: best = (key, candidate) return best[1] if best is not None else None def _target_actor_is_visible( self, obs: OpenRAObservation, target_actor_id: int, target_kind: str, ) -> bool: return self._visible_actor_by_target(obs, target_actor_id, target_kind) is not None def _focus_fire_commands( self, squad_units: list[UnitInfoModel], target: Tuple[int, int, int, str, str], ) -> List[CommandModel]: target_actor_id, tx, ty, _, target_kind = target commands = [] for u in squad_units: if not u.can_attack or self._busy_attacking(u): continue commands.append(CommandModel( action=ActionType.ATTACK, actor_id=u.actor_id, target_actor_id=target_actor_id, target_x=tx, target_y=ty, )) if commands: self._record_attack_issue( direct_attack=True, command_count=len(commands), target_actor_id=target_actor_id, target_kind=target_kind, ) return commands def _record_attack_issue( self, direct_attack: bool, command_count: int, target_actor_id: int = 0, target_kind: str = "point", ) -> None: if command_count <= 0: return if direct_attack: self._attack_commands_issued += command_count else: self._attack_move_commands_issued += command_count if target_actor_id <= 0: return if target_kind == "unit": self._unit_target_events += 1 self._unique_unit_targets.add(target_actor_id) elif target_kind == "building": self._building_target_events += 1 self._unique_building_targets.add(target_actor_id) def get_attack_stats(self, obs: Optional[OpenRAObservation] = None) -> dict[str, int]: stats = { "attack_commands": self._attack_commands_issued, "attack_move_commands": self._attack_move_commands_issued, "unit_target_events": self._unit_target_events, "building_target_events": self._building_target_events, "unique_unit_targets": len(self._unique_unit_targets), "unique_building_targets": len(self._unique_building_targets), } if obs is not None: stats["units_killed"] = obs.military.units_killed stats["buildings_killed"] = obs.military.buildings_killed return stats def get_squad_stats(self) -> dict[str, object]: return { "idle_ground": len(self._idle_ground_units), "attack_squad": len(self._attack_squad), "rush_squad": len(self._rush_squad), "protection_squad": len(self._protection_squad), "air_squad": len(self._air_squad), "naval_squad": len(self._naval_squad), "assault_threshold": self._assault_threshold, "states": dict(self._squad_states), "targets": {name: self._squad_target_point[name] for name in self._squad_target_point}, } def _busy_attacking(self, unit: UnitInfoModel) -> bool: if unit.is_idle: return False activity = getattr(unit, "current_activity", "").lower().replace(" ", "") if not activity or "attackmove" in activity: return False return "attack" in activity def _retreat_squad_commands( self, obs: OpenRAObservation, squad_units: list[UnitInfoModel], leader: UnitInfoModel, ) -> List[CommandModel]: fallback = self._pick_retreat_point(obs, leader) if fallback is None: return [] tx, ty = fallback return [ CommandModel( action=ActionType.MOVE, actor_id=u.actor_id, target_x=tx, target_y=ty, ) for u in squad_units ] def _pick_retreat_point( self, obs: OpenRAObservation, leader: UnitInfoModel, ) -> Optional[Tuple[int, int]]: if not obs.buildings: return self._base_center(obs) type_bonus = { "fact": 420, "weap": 280, "proc": 260, "ftur": 220, "gun": 200, "tsla": 220, "pbox": 140, "hbox": 140, "powr": 90, "apwr": 110, } best_score: Optional[int] = None best_pos: Optional[Tuple[int, int]] = None for building in obs.buildings: bx = building.cell_x if building.cell_x > 0 else building.pos_x // 1024 by = building.cell_y if building.cell_y > 0 else building.pos_y // 1024 leader_dist = self._cell_distance(leader.cell_x, leader.cell_y, bx, by) enemy_clearance = min( [self._cell_distance(enemy.cell_x, enemy.cell_y, bx, by) for enemy in obs.visible_enemies] + [LOCAL_FIGHT_RADIUS + 8] ) static_clearance = min( [self._cell_distance(enemy.cell_x, enemy.cell_y, bx, by) for enemy in obs.visible_enemy_buildings] + [LOCAL_FIGHT_RADIUS + 8] ) canonical = self._canonical_building_type(building.type) score = type_bonus.get(canonical, 80) score += min(enemy_clearance, LOCAL_FIGHT_RADIUS + 8) * 18 score += min(static_clearance, LOCAL_FIGHT_RADIUS + 8) * 10 score -= leader_dist * 8 if canonical in DEFENSE_STRUCTURE_TYPES: score += 40 if best_score is None or score > best_score: best_score = score best_pos = (bx, by) return best_pos or self._base_center(obs) def _has_own_building_near(self, obs: OpenRAObservation, x: int, y: int, radius: int) -> bool: return any( self._cell_distance( building.cell_x if building.cell_x > 0 else building.pos_x // 1024, building.cell_y if building.cell_y > 0 else building.pos_y // 1024, x, y, ) <= radius for building in obs.buildings ) def _credits_str(self, obs: OpenRAObservation) -> str: return ( f"${obs.economy.cash} cash + ${obs.economy.ore} ore" f" = ${self._available_credits(obs)}" ) def _can_produce(self, obs: OpenRAObservation, item_type: str) -> bool: if item_type in obs.available_production: return True for b in obs.buildings: if item_type in b.can_produce: return True return False def _log(self, msg: str): if self.verbose: print(f" [NormalAI] {msg}")