Spaces:
Running
Running
| """ | |
| GameEngine — server-side game loop running at 4 ticks/second. | |
| One engine instance per active room. Responsibilities: | |
| - Tick loop (asyncio task) | |
| - Apply parsed voice commands | |
| - Mining, construction, production, movement, combat | |
| - Win-condition check | |
| - State broadcast via Socket.IO | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import math | |
| from typing import TYPE_CHECKING, Optional, Union | |
| from config import TICK_INTERVAL, HARVEST_INTERVAL_TICKS, MINERAL_PER_HARVEST, GAS_PER_HARVEST, MINING_DRILL_TICKS | |
| from .bot import BOT_TICK_INTERVAL, BotPlayer | |
| from .buildings import Building, BuildingDef, BuildingStatus, BuildingType, BUILDING_DEFS | |
| from .commands import ActionResult, ActionType, CommandResult, GameAction, ParsedCommand | |
| from .map import MAP_HEIGHT, MAP_WIDTH, MAP_LANDMARKS, ResourceType | |
| from .pathfinding import find_path, invalidate_path_cache, is_walkable, nearest_walkable_navpoint, snap_to_walkable | |
| from .state import GamePhase, GameState, PlayerState | |
| from .tech_tree import can_build, can_train, get_producer, missing_for_build, missing_for_train | |
| from .units import Unit, UnitDef, UnitStatus, UnitType, UNIT_DEFS | |
| if TYPE_CHECKING: | |
| import socketio | |
log = logging.getLogger(__name__)

# Idle units auto-engage enemies inside this fraction of their weapon range.
AUTO_ATTACK_RANGE_FACTOR = 0.6

# Radius, in tiles, within which units count as "in the zone" for query_units.
ZONE_RADIUS = 10.0

# Hitbox radius of a unit; attack range is measured from centre plus radius.
UNIT_RADIUS = 0.5
| class GameEngine: | |
def __init__(self, state: GameState, sio: "socketio.AsyncServer") -> None:
    """Bind the engine to one room's game state and the Socket.IO server.

    Nothing is started here; ``start()`` launches the tick loop.

    Args:
        state: Mutable game state advanced by this engine.
        sio: Socket.IO server used to emit events to the room.
    """
    # Collaborators handed in by the caller.
    self.sio = sio
    self.state = state
    # Optional bot opponent; None by default.
    self.bot: Optional[BotPlayer] = None
    # Handle of the asyncio task running the tick loop (None until started).
    self._task: Optional[asyncio.Task] = None  # type: ignore[type-arg]
    # Language of the most recently applied command; "fr" by default.
    self._cmd_lang: str = "fr"
    # Fire/death events gathered during a tick, sent in game_update.
    self._sound_events: list[dict] = []
| # ------------------------------------------------------------------ | |
| # Lifecycle | |
| # ------------------------------------------------------------------ | |
def start(self) -> None:
    """Launch the tick loop as a background asyncio task.

    Must be called while an event loop is running. Calling it again while
    the previous loop task is still alive is a no-op, so a double start
    cannot spawn a second loop that would tick the same state twice per
    interval. Once the previous task has finished, a new one is created.
    """
    current = self._task
    if current is not None and not current.done():
        return
    self._task = asyncio.create_task(self._loop())
async def stop(self) -> None:
    """Cancel the tick-loop task, if there is one, and wait for it to unwind."""
    task = self._task
    if not task:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancellation requested just above.
        pass
| # ------------------------------------------------------------------ | |
| # Public command entry point | |
| # ------------------------------------------------------------------ | |
def apply_command(self, player_id: str, parsed: ParsedCommand) -> CommandResult:
    """Execute every action of a parsed command on behalf of one player.

    Actions run in order through ``_dispatch``. An ``ASSIGN_TO_GROUP`` action
    that names no units inherits the unit ids returned by the most recent
    successful ``QUERY_UNITS`` action of the same command.

    Args:
        player_id: Id of the player issuing the command.
        parsed: Parsed command; its optional ``language`` attribute is
            remembered in ``self._cmd_lang`` (falls back to "fr").

    Returns:
        One ``ActionResult`` per action, or an empty result carrying the
        ``game_not_in_progress`` feedback when the player is unknown or the
        game is not in the PLAYING phase.
    """
    self._cmd_lang = getattr(parsed, "language", "fr") or "fr"

    player = self.state.players.get(player_id)
    game_running = self.state.phase == GamePhase.PLAYING
    if not (player and game_running):
        return CommandResult(results=[], feedback_override="game_not_in_progress")

    outcomes: list[ActionResult] = []
    queried_ids: Optional[list[str]] = None
    for action in parsed.actions:
        wants_group = action.type == ActionType.ASSIGN_TO_GROUP
        if wants_group and not action.unit_ids and queried_ids is not None:
            # Feed the preceding query's units into the group assignment.
            action = action.model_copy(update={"unit_ids": queried_ids})
        outcome = self._dispatch(player, action)
        outcomes.append(outcome)
        if outcome.success and action.type == ActionType.QUERY_UNITS:
            queried_ids = outcome.unit_ids

    player.recalculate_supply()
    return CommandResult(results=outcomes)
| # ------------------------------------------------------------------ | |
| # Tick loop | |
| # ------------------------------------------------------------------ | |
async def _loop(self) -> None:
    """Run ticks until the game leaves the PLAYING phase.

    Each iteration sleeps ``TICK_INTERVAL``, advances the simulation, lets
    the bot act every ``BOT_TICK_INTERVAL`` ticks, and broadcasts the state.
    Exceptions raised by the tick or the bot are logged and the loop keeps
    going. When the game is over and has a winner, a ``game_over`` event is
    emitted to the room and the loop ends.
    """
    while self.state.phase == GamePhase.PLAYING:
        await asyncio.sleep(TICK_INTERVAL)
        try:
            self._tick()
            if self.bot:
                if self.state.tick % BOT_TICK_INTERVAL == 0:
                    self.bot.act()
        except Exception:
            log.exception("Uncaught exception in _tick (tick=%d)", self.state.tick)

        await self._broadcast()

        finished = self.state.phase == GamePhase.GAME_OVER and self.state.winner
        if not finished:
            continue

        # Prefer the winner's display name; fall back to the raw id.
        winner_state = self.state.players.get(self.state.winner)
        if winner_state:
            winner_name = winner_state.player_name
        else:
            winner_name = self.state.winner
        await self.sio.emit(
            "game_over",
            {"winner_id": self.state.winner, "winner_name": winner_name},
            room=self.state.room_id,
        )
        break
def _tick(self) -> None:
    """Advance the simulation by one tick.

    Order: per-player construction, production and mining; then global
    movement/combat, crowd pressure, healing and dead-unit removal; then a
    supply recount for every player and the win-condition check.
    """
    self.state.tick += 1
    self._sound_events.clear()

    # Per-player economy steps.
    for owner in self.state.players.values():
        self._tick_construction(owner)
        self._tick_production(owner)
        self._tick_mining(owner)

    # World-wide steps.
    self._tick_movement_and_combat()
    self._apply_crowd_pressure()
    self._tick_healing()
    self._remove_dead()

    for owner in self.state.players.values():
        owner.recalculate_supply()

    winner = self._check_win()
    if not winner:
        return
    self.state.phase = GamePhase.GAME_OVER
    self.state.winner = winner
| # ------------------------------------------------------------------ | |
| # Sub-tick processors | |
| # ------------------------------------------------------------------ | |
def _tick_construction(self, player: PlayerState) -> None:
    """Advance each of the player's buildings that is under construction.

    A site only progresses while a unit whose ``building_target_id`` points
    at it has status BUILDING. On completion the building becomes ACTIVE at
    full HP and the builder is sent to mine the mineral patch nearest to the
    Command Center (or nearest to itself when there is no Command Center);
    it goes idle when no patch is found.
    """
    for site in player.buildings.values():
        if site.status != BuildingStatus.CONSTRUCTING:
            continue

        # Find the builder that has arrived on site, if any.
        builder = None
        for candidate in player.units.values():
            on_this_site = candidate.building_target_id == site.id
            if on_this_site and candidate.status == UnitStatus.BUILDING:
                builder = candidate
                break
        if not builder:
            continue

        site.construction_ticks_remaining -= 1
        # HP grows linearly from 15% to 100% over the build time.
        hp_per_tick = site.max_hp * 0.85 / max(1, site.construction_max_ticks)
        site.hp = min(site.hp + hp_per_tick, float(site.max_hp))
        if site.construction_ticks_remaining > 0:
            continue

        # Construction finished: finalise the building, release the builder.
        site.status = BuildingStatus.ACTIVE
        site.construction_ticks_remaining = 0
        site.hp = float(site.max_hp)
        builder.building_target_id = None
        builder.target_x = builder.target_y = None

        # Reference point for choosing the patch to return to.
        cc = player.command_center()
        if cc:
            ref_x = float(cc.x) + 2.0
            ref_y = float(cc.y) + 1.5
        else:
            ref_x = builder.x
            ref_y = builder.y

        patch = self.state.game_map.nearest_mineral(ref_x, ref_y)
        if not patch:
            builder.status = UnitStatus.IDLE
            continue

        builder.status = UnitStatus.MINING_MINERALS
        builder.assigned_resource_id = patch.id
        builder.harvest_carry = False
        builder.harvest_amount = 0
        builder.harvest_mining_ticks = 0
        patch.assigned_scv_ids.append(builder.id)
        self._set_unit_destination(builder, float(patch.x), float(patch.y), is_flying=False)
def _tick_production(self, player: PlayerState) -> None:
    """Tick each building's production queue and spawn finished units.

    Buildings still under construction are skipped. Destroyed buildings are
    skipped as well: their queue is empty, so the "empty queue -> ACTIVE"
    branch below used to flip a DESTROYED building back to ACTIVE on the
    next tick, making it count as standing again until ``_remove_dead``
    re-destroyed it.

    For every other building the head of the queue loses one tick; when it
    reaches zero the item is removed and the unit is spawned. The status is
    PRODUCING while the queue is non-empty and ACTIVE otherwise.
    """
    for building in player.buildings.values():
        if building.status in (BuildingStatus.CONSTRUCTING, BuildingStatus.DESTROYED):
            continue

        queue = building.production_queue
        if not queue:
            building.status = BuildingStatus.ACTIVE
            continue

        item = queue[0]
        item.ticks_remaining -= 1
        building.status = BuildingStatus.PRODUCING
        if item.ticks_remaining <= 0:
            queue.pop(0)
            building.status = (
                BuildingStatus.PRODUCING if queue else BuildingStatus.ACTIVE
            )
            self._spawn_unit(player, building, UnitType(item.unit_type))
def _spawn_unit(self, player: PlayerState, building: Building, ut: UnitType) -> None:
    """Create a unit of type ``ut`` at ``building``'s spawn point.

    Ground units are placed on the nearest walkable nav point; flying units
    use the raw spawn point. When the building's rally point differs from
    the spawn position the unit is sent there. A new SCV is then assigned to
    the mineral patch nearest to the producing building, provided the player
    has a Command Center.
    """
    spawn_x, spawn_y = building.spawn_point()
    flies = UNIT_DEFS[ut].is_flying
    if not flies:
        footprints = self._building_blocked_rects()
        spawn_x, spawn_y = nearest_walkable_navpoint(spawn_x, spawn_y, blocked_rects=footprints)

    # A missing rally coordinate falls back to the spawn coordinate.
    rally_x = spawn_x if building.rally_x is None else building.rally_x
    rally_y = spawn_y if building.rally_y is None else building.rally_y

    unit = Unit.create(ut, player.player_id, spawn_x, spawn_y)
    if rally_x != spawn_x or rally_y != spawn_y:
        unit.status = UnitStatus.MOVING
        self._set_unit_destination(unit, rally_x, rally_y, is_flying=flies)
    player.units[unit.id] = unit

    if ut != UnitType.SCV:
        return

    # Fresh SCVs go straight to work; this overrides any rally order above.
    cc = player.command_center()
    footprint = BUILDING_DEFS[building.building_type]
    patch = self.state.game_map.nearest_mineral(
        float(building.x) + footprint.width / 2,
        float(building.y) + footprint.height / 2,
    )
    if patch and cc:
        unit.status = UnitStatus.MINING_MINERALS
        unit.assigned_resource_id = patch.id
        unit.harvest_carry = False
        unit.harvest_amount = 0
        unit.harvest_mining_ticks = 0
        patch.assigned_scv_ids.append(unit.id)
        self._set_unit_destination(unit, float(patch.x), float(patch.y), is_flying=False)
def _tick_mining(self, player: PlayerState) -> None:
    """Drive harvester SCVs through resource <-> Command Center round trips.

    Movement is performed by ``_tick_movement_and_combat``; this method only
    reacts to arrivals. At a mineral patch it counts drilling ticks and loads
    the cargo after ``MINING_DRILL_TICKS``; at a gas resource the cargo is
    loaded on arrival. Near the Command Center the cargo is credited to the
    player and the SCV is sent back to its resource. Without a Command Center
    every harvester is reset to idle.

    Fix over the previous version: when a patch runs dry while an SCV is
    still carrying its load, the SCV is re-assigned to the next patch but
    keeps heading for the Command Center. It used to be redirected to the new
    patch with ``harvest_carry`` still set; the carrying branch only checks
    the distance to the Command Center, so the SCV parked at the patch and
    never deposited or drilled again.
    """
    cc = player.command_center()
    if not cc:
        # No drop-off point: every harvester gives up and goes idle.
        for unit in player.units.values():
            if unit.status in (UnitStatus.MINING_MINERALS, UnitStatus.MINING_GAS):
                unit.status = UnitStatus.IDLE
                unit.target_x = unit.target_y = None
                unit.path_waypoints = []
                unit.harvest_carry = False
                unit.harvest_mining_ticks = 0
        return

    mineral_arrive = 1.2  # SCV counts as "at the resource" within this distance
    cc_edge_arrive = 1.2  # deposit triggers within this distance of the CC's visual edge

    for unit in player.units.values():
        if unit.status == UnitStatus.MINING_MINERALS:
            resource = self.state.game_map.get_resource(unit.assigned_resource_id or "")
            if not resource or resource.is_depleted:
                # Drop the old assignment, then try to move on to another patch.
                if resource and unit.id in resource.assigned_scv_ids:
                    resource.assigned_scv_ids.remove(unit.id)
                unit.assigned_resource_id = None
                next_patch = self.state.game_map.nearest_mineral(cc.x, cc.y)
                if next_patch:
                    unit.assigned_resource_id = next_patch.id
                    next_patch.assigned_scv_ids.append(unit.id)
                    unit.harvest_mining_ticks = 0
                    if not unit.harvest_carry:
                        self._set_unit_destination(
                            unit, float(next_patch.x), float(next_patch.y), is_flying=False
                        )
                    # else: keep the current route to the Command Center; the
                    # deposit step below sends the SCV to the new patch afterwards.
                else:
                    unit.status = UnitStatus.IDLE
                    unit.target_x = unit.target_y = None
                    unit.path_waypoints = []
                    unit.harvest_carry = False
                    unit.harvest_mining_ticks = 0
                continue

            rx, ry = float(resource.x), float(resource.y)
            if not unit.harvest_carry:
                if unit.target_x is None:
                    self._set_unit_destination(unit, rx, ry, is_flying=False)
                if unit.dist_to(rx, ry) <= mineral_arrive:
                    unit.harvest_mining_ticks += 1
                    if unit.harvest_mining_ticks >= MINING_DRILL_TICKS:
                        # Drilling done: load up (never more than the patch holds).
                        gathered = min(MINERAL_PER_HARVEST, resource.amount)
                        resource.amount -= gathered
                        unit.harvest_carry = True
                        unit.harvest_amount = gathered
                        unit.harvest_mining_ticks = 0
                        tx, ty = self._nearest_building_entry(unit, cc)
                        self._set_unit_destination(unit, tx, ty, is_flying=False)
            else:
                if unit.target_x is None:
                    tx, ty = self._nearest_building_entry(unit, cc)
                    self._set_unit_destination(unit, tx, ty, is_flying=False)
                if self._dist_unit_to_building(unit, cc) <= cc_edge_arrive:
                    player.minerals += unit.harvest_amount
                    unit.harvest_carry = False
                    unit.harvest_amount = 0
                    self._set_unit_destination(unit, rx, ry, is_flying=False)

        elif unit.status == UnitStatus.MINING_GAS:
            resource = self.state.game_map.get_resource(unit.assigned_resource_id or "")
            if not resource or not resource.has_refinery:
                unit.assigned_resource_id = None
                unit.status = UnitStatus.IDLE
                unit.target_x = unit.target_y = None
                unit.path_waypoints = []
                unit.harvest_carry = False
                continue

            rx, ry = float(resource.x), float(resource.y)
            if not unit.harvest_carry:
                if unit.target_x is None:
                    self._set_unit_destination(unit, rx, ry, is_flying=False)
                if unit.dist_to(rx, ry) <= mineral_arrive:
                    # Gas is loaded on arrival: no drill delay, no amount deducted.
                    unit.harvest_carry = True
                    unit.harvest_amount = GAS_PER_HARVEST
                    tx, ty = self._nearest_building_entry(unit, cc)
                    self._set_unit_destination(unit, tx, ty, is_flying=False)
            else:
                if unit.target_x is None:
                    tx, ty = self._nearest_building_entry(unit, cc)
                    self._set_unit_destination(unit, tx, ty, is_flying=False)
                if self._dist_unit_to_building(unit, cc) <= cc_edge_arrive:
                    player.gas += unit.harvest_amount
                    unit.harvest_carry = False
                    unit.harvest_amount = 0
                    self._set_unit_destination(unit, rx, ry, is_flying=False)
def _tick_movement_and_combat(self) -> None:
    """Move every unit toward its target and resolve its attack for this tick.

    Per unit, in order: sieged units only fire; units with status BUILDING
    stay put; units walking to a build site and harvesters only move; other
    travelling units move, except that a unit in ATTACKING status stops and
    locks on when an enemy is within weapon range; idle units lock on to
    enemies within a fraction of their range; finally any unit with an
    attack target fights. Players without an enemy are skipped entirely.
    """

    def lock_on(attacker: Unit, target: Union[Unit, Building]) -> None:
        # Record exactly one kind of target and clear the other.
        if isinstance(target, Unit):
            attacker.attack_target_id = target.id
            attacker.attack_target_building_id = None
        else:
            attacker.attack_target_building_id = target.id
            attacker.attack_target_id = None

    # Flat lookup across all players: unit id -> (unit, owner id).
    all_units: dict[str, tuple[Unit, str]] = {
        uid: (member, owner_id)
        for owner_id, owner in self.state.players.items()
        for uid, member in owner.units.items()
    }

    move_only = (
        UnitStatus.MOVING_TO_BUILD,
        UnitStatus.MINING_MINERALS,
        UnitStatus.MINING_GAS,
    )
    travelling = (UnitStatus.MOVING, UnitStatus.ATTACKING, UnitStatus.PATROLLING)

    for pid, player in self.state.players.items():
        enemy = self.state.enemy_of(pid)
        if not enemy:
            continue

        for unit in player.units.values():
            defn = UNIT_DEFS[unit.unit_type]

            # Sieged units cannot move; they only shoot.
            if unit.is_sieged:
                self._combat_attack(unit, defn, all_units, player, enemy, sieged=True)
                continue

            # Units busy constructing stay where they are.
            if unit.status == UnitStatus.BUILDING:
                continue

            # Builders en route and harvesters move but never fight.
            if unit.status in move_only:
                if unit.target_x is not None and unit.target_y is not None:
                    self._move_toward(unit, defn, unit.target_x, unit.target_y)
                continue

            # Attack-move: halt and shoot when an enemy is in range.
            if unit.status in travelling:
                dest_x = unit.target_x
                dest_y = unit.target_y
                if dest_x is not None and dest_y is not None:
                    hostile = None
                    if unit.status == UnitStatus.ATTACKING:
                        hostile = self._nearest_enemy_in_range(
                            unit, enemy, defn.attack_range, for_attack_move=True
                        )
                    if hostile is not None:
                        # No movement this tick; combat runs below.
                        lock_on(unit, hostile)
                    else:
                        self._move_toward(unit, defn, dest_x, dest_y)

            # Idle units defend themselves within a reduced range. The status
            # is re-read here because a move above may have ended in IDLE.
            if unit.status == UnitStatus.IDLE:
                auto_range = defn.attack_range * AUTO_ATTACK_RANGE_FACTOR
                hostile = self._nearest_enemy_in_range(
                    unit, enemy, auto_range, for_attack_move=False
                )
                if hostile:
                    lock_on(unit, hostile)

            # Fight whatever is targeted, unit or building.
            if unit.attack_target_id or unit.attack_target_building_id:
                self._combat_attack(unit, defn, all_units, player, enemy, sieged=False)
def _building_blocked_rects(self) -> list[tuple[float, float, float, float]]:
    """Return the collision footprint ``(x, y, w, h)`` of every standing building.

    The footprint is built from the definition's collision half-extents
    (``col_hw`` / ``col_hh``) around the building position. Buildings with
    status DESTROYED are left out. Used for pathfinding and unit collision.
    """
    footprints: list[tuple[float, float, float, float]] = []
    for owner in self.state.players.values():
        standing = (
            b for b in owner.buildings.values()
            if b.status != BuildingStatus.DESTROYED
        )
        for b in standing:
            spec = BUILDING_DEFS[b.building_type]
            half_w, half_h = spec.col_hw(), spec.col_hh()
            footprints.append((b.x - half_w, b.y - half_h, half_w * 2, half_h * 2))
    return footprints
def _set_unit_destination(
    self, unit: Unit, tx: float, ty: float, *, is_flying: bool
) -> None:
    """Point ``unit`` at ``(tx, ty)`` and rebuild its waypoint list.

    Flying units head straight for the destination with no waypoints. Ground
    units get a path from ``find_path``: the first path point becomes the
    immediate target and the rest become ``path_waypoints``. If no path is
    found, one retry is made from the start snapped to walkable ground; if
    that fails too, the target becomes the destination snapped to walkable
    ground. An empty path leaves the raw destination as the target.
    """
    unit.target_x, unit.target_y = tx, ty
    unit.path_waypoints = []
    if is_flying:
        return

    footprints = self._building_blocked_rects()
    origin = (unit.x, unit.y)
    route = find_path(origin[0], origin[1], tx, ty, blocked_rects=footprints)

    if route is None:
        # The start may sit inside a footprint: retry from outside it.
        escaped = snap_to_walkable(origin[0], origin[1], blocked_rects=footprints)
        if escaped != origin:
            route = find_path(escaped[0], escaped[1], tx, ty, blocked_rects=footprints)
        if route is None:
            landing = snap_to_walkable(tx, ty, blocked_rects=footprints)
            unit.target_x, unit.target_y = landing[0], landing[1]
            return

    if not route:
        return

    head = route[0]
    unit.target_x, unit.target_y = head[0], head[1]
    unit.path_waypoints = [[point[0], point[1]] for point in route[1:]]
def _would_overlap(
    self, unit: Unit, new_x: float, new_y: float, *, exclude_unit_id: Optional[str] = None
) -> bool:
    """Tell whether ``unit`` placed at ``(new_x, new_y)`` would collide.

    Collides with any other unit closer than two unit radii, and with any
    building footprint closer than one unit radius. Two exceptions: mining
    units ignore other mining units, and a move is allowed when the pair
    already overlaps and the move does not bring them closer.

    Args:
        unit: The unit being moved.
        new_x: Candidate x position.
        new_y: Candidate y position.
        exclude_unit_id: Optional id of one more unit to ignore.
    """
    mining_states = (UnitStatus.MINING_MINERALS, UnitStatus.MINING_GAS)
    mover_is_miner = unit.status in mining_states
    min_gap = 2 * UNIT_RADIUS

    for owner in self.state.players.values():
        for other in owner.units.values():
            if other.id == unit.id:
                continue
            if exclude_unit_id and other.id == exclude_unit_id:
                continue
            # Soft collision: miners pass through each other.
            if mover_is_miner and other.status in mining_states:
                continue

            gap_after = math.hypot(new_x - other.x, new_y - other.y)
            if gap_after >= min_gap:
                continue

            # Already overlapping: tolerate moves that do not close the gap.
            gap_now = math.hypot(unit.x - other.x, unit.y - other.y)
            separating = gap_now < min_gap and gap_after >= gap_now
            if not separating:
                return True

    for left, top, width, height in self._building_blocked_rects():
        # Closest point of the rectangle to the candidate position.
        near_x = max(left, min(left + width, new_x))
        near_y = max(top, min(top + height, new_y))
        if math.hypot(new_x - near_x, new_y - near_y) < UNIT_RADIUS:
            return True

    return False
def _move_toward(self, unit: Unit, defn: UnitDef, tx: float, ty: float) -> None:
    """Move ``unit`` up to one tick's travel toward ``(tx, ty)``.

    The step budget is ``defn.move_speed * TICK_INTERVAL``. Waypoints that
    can be reached within the budget are consumed one after another. When
    the next point is farther than the remaining budget the unit makes a
    partial move: straight ahead if free, otherwise along the first
    collision-free steering direction (a random jitter, then +/-30, +/-60
    and +/-90 degrees).

    Bookkeeping:
      * ``nav_stall_ticks`` counts partial moves that gained less than a
        quarter step on the current waypoint; at 10 the unit skips up to two
        waypoints ahead.
      * ``stuck_ticks`` counts ticks with no movement at all; at 3 the path
        is recomputed, and if the new path has a single point the unit gives
        up and is treated as arrived.
      * On arrival MOVING/ATTACKING units go idle, MOVING_TO_BUILD units
        start BUILDING, and PATROLLING units swap target and patrol point.

    Fixes over the previous version:
      * A unit standing exactly on its current waypoint advanced to the next
        waypoint and then fell through to the "snap" branch with the stale
        near-zero distance, teleporting onto that next waypoint. The loop
        now restarts so the distance is measured against the new waypoint.
      * Stuck recovery re-pathed to ``(tx, ty)``, which is the current
        intermediate waypoint, discarding the remaining waypoints and with
        them the real destination. It now re-paths to the final waypoint.
      * ``random`` is imported once per call instead of inside the loop.

    Args:
        unit: Unit to move; position, target, waypoints and counters are
            updated in place.
        defn: The unit's definition (``move_speed``, ``is_flying``).
        tx: X of the unit's current target point.
        ty: Y of the unit's current target point.
    """
    import random as _random  # local import: the module does not import random

    step = defn.move_speed * TICK_INTERVAL
    remaining = step
    cur_tx, cur_ty = tx, ty
    moved = False
    arrived = False

    while remaining > 1e-6:
        dx = cur_tx - unit.x
        dy = cur_ty - unit.y
        dist = math.sqrt(dx * dx + dy * dy)

        if dist < 1e-6:
            # Already on this waypoint: switch to the next one, if any.
            if not unit.path_waypoints:
                arrived = True
                break
            nw = unit.path_waypoints.pop(0)
            cur_tx, cur_ty = nw[0], nw[1]
            unit.target_x, unit.target_y = cur_tx, cur_ty
            # Re-measure against the new waypoint instead of snapping onto it.
            continue

        if dist <= remaining:
            # Reachable within the budget: land on it and keep going.
            unit.x = cur_tx
            unit.y = cur_ty
            remaining -= dist
            moved = True
            if not unit.path_waypoints:
                arrived = True
                break
            nw = unit.path_waypoints.pop(0)
            cur_tx, cur_ty = nw[0], nw[1]
            unit.target_x, unit.target_y = cur_tx, cur_ty
            continue

        # Partial move: straight first, then steer around whatever blocks.
        nx = dx / dist
        ny = dy / dist
        dist_before = dist
        new_x = unit.x + nx * remaining
        new_y = unit.y + ny * remaining
        if not self._would_overlap(unit, new_x, new_y):
            unit.x = new_x
            unit.y = new_y
            moved = True
        else:
            # Random jitter first (breaks symmetric deadlocks between units),
            # then deterministic rotations of the heading.
            jitter = _random.uniform(-0.45, 0.45)  # up to ~26 degrees
            cos_j, sin_j = math.cos(jitter), math.sin(jitter)
            cos30, sin30 = 0.866, 0.5
            cos60, sin60 = 0.5, 0.866
            steer_candidates = [
                (nx * cos_j - ny * sin_j, nx * sin_j + ny * cos_j),
                (nx * cos30 - ny * sin30, nx * sin30 + ny * cos30),
                (nx * cos30 + ny * sin30, -nx * sin30 + ny * cos30),
                (nx * cos60 - ny * sin60, nx * sin60 + ny * cos60),
                (nx * cos60 + ny * sin60, -nx * sin60 + ny * cos60),
                (-ny, nx),
                (ny, -nx),
            ]
            for ax, ay in steer_candidates:
                cand_x = unit.x + ax * remaining
                cand_y = unit.y + ay * remaining
                if not self._would_overlap(unit, cand_x, cand_y):
                    unit.x = cand_x
                    unit.y = cand_y
                    moved = True
                    break

        # Oscillation detection: moving without closing in on the waypoint
        # counts as a stall; enough stalls skip ahead along the path.
        if moved:
            new_dist = math.hypot(cur_tx - unit.x, cur_ty - unit.y)
            if new_dist >= dist_before - step * 0.25:
                unit.nav_stall_ticks += 1
                if unit.nav_stall_ticks >= 10:
                    unit.nav_stall_ticks = 0
                    for _ in range(2):
                        if unit.path_waypoints:
                            nw = unit.path_waypoints.pop(0)
                            cur_tx, cur_ty = nw[0], nw[1]
                            unit.target_x, unit.target_y = cur_tx, cur_ty
            else:
                unit.nav_stall_ticks = 0
        # A partial move always ends this tick's movement.
        break

    unit.target_x = cur_tx
    unit.target_y = cur_ty

    if moved or arrived:
        unit.stuck_ticks = 0
        if arrived:
            unit.nav_stall_ticks = 0
    else:
        unit.stuck_ticks += 1
        if unit.stuck_ticks < 3:
            return
        unit.stuck_ticks = 0
        # Re-path from here to the real destination: the last waypoint when
        # a path is pending, otherwise the current target.
        if unit.path_waypoints:
            goal = unit.path_waypoints[-1]
            goal_x, goal_y = goal[0], goal[1]
        else:
            goal_x, goal_y = cur_tx, cur_ty
        self._set_unit_destination(unit, goal_x, goal_y, is_flying=defn.is_flying)
        if unit.path_waypoints:
            nw = unit.path_waypoints.pop(0)
            unit.target_x, unit.target_y = nw[0], nw[1]
        else:
            # Single-point path while blocked: give up and settle here.
            arrived = True

    if not arrived:
        return

    if unit.status in (UnitStatus.MOVING, UnitStatus.ATTACKING):
        unit.status = UnitStatus.IDLE
        unit.target_x = unit.target_y = None
    elif unit.status == UnitStatus.MOVING_TO_BUILD:
        unit.status = UnitStatus.BUILDING
        unit.target_x = unit.target_y = None
    elif unit.status == UnitStatus.PATROLLING:
        # Turn around: swap the current target with the patrol point.
        unit.target_x, unit.patrol_x = unit.patrol_x, unit.target_x
        unit.target_y, unit.patrol_y = unit.patrol_y, unit.target_y
def _combat_attack(
    self,
    unit: Unit,
    defn: UnitDef,
    all_units: dict[str, tuple[Unit, str]],
    player: PlayerState,
    enemy: PlayerState,
    sieged: bool,
) -> None:
    """Resolve one attack attempt of ``unit`` against its current target.

    While the weapon is cooling down the cooldown is decremented and nothing
    else happens. If the stored target (unit or building) is gone or dead, a
    new one is picked with ``_nearest_enemy_in_range``. An out-of-range
    target is chased (sieged units never chase). An in-range target is
    damaged: sieged fire hits every enemy unit within the splash radius of
    the target; normal fire uses air or ground damage depending on whether
    the target unit flies. Buildings take ground damage (siege damage when
    sieged).

    Fix over the previous version: when no target can be acquired and the
    unit is in ATTACKING status with no destination, it returns to IDLE. It
    used to stay ATTACKING forever, a state skipped by movement, by the idle
    auto-attack check and by ally alerts, so a unit that had defended itself
    from a standstill never reacted to enemies again.

    Args:
        unit: The attacking unit.
        defn: The attacker's unit definition.
        all_units: Lookup of every unit in the game, id -> (unit, owner id).
        player: Owner of the attacker (not used here).
        enemy: The opposing player.
        sieged: True when the attacker fires in siege mode.
    """
    if unit.attack_cooldown > 0:
        unit.attack_cooldown -= 1
        return

    rng = defn.siege_range if sieged else defn.attack_range

    target_unit, _ = all_units.get(unit.attack_target_id or "", (None, None))
    target_building = enemy.buildings.get(unit.attack_target_building_id or "")

    has_valid_unit = (
        unit.attack_target_id and target_unit is not None and target_unit.hp > 0
    )
    has_valid_building = (
        unit.attack_target_building_id
        and target_building is not None
        and target_building.status != BuildingStatus.DESTROYED
        and target_building.hp > 0
    )

    if not (has_valid_unit or has_valid_building):
        # The stored target is stale: forget it and look for a new one.
        unit.attack_target_id = None
        unit.attack_target_building_id = None
        target = self._nearest_enemy_in_range(unit, enemy, rng)
        if not target:
            no_destination = unit.target_x is None or unit.target_y is None
            if unit.status == UnitStatus.ATTACKING and no_destination:
                # Nothing to shoot and nowhere to go: stand down.
                unit.status = UnitStatus.IDLE
            return
        if isinstance(target, Unit):
            unit.attack_target_id = target.id
            target_unit = target
        else:
            unit.attack_target_building_id = target.id
            target_building = target

    # --- Target is a unit ---
    if unit.attack_target_id and target_unit and target_unit.hp > 0:
        dist = unit.dist_to(target_unit.x, target_unit.y)
        # Both hitboxes count toward reach: range plus two unit radii.
        if not dist <= rng + 2 * UNIT_RADIUS:
            if not sieged:
                unit.target_x = target_unit.x
                unit.target_y = target_unit.y
                unit.status = UnitStatus.ATTACKING
            return

        if sieged:
            dmg = defn.siege_damage
            splash = defn.siege_splash_radius
            cooldown = defn.siege_cooldown_ticks
            for eu in enemy.units.values():
                if eu.dist_to(target_unit.x, target_unit.y) <= splash:
                    self._apply_damage(eu, dmg)
                    self._alert_allies(victim=eu, attacker=unit, victim_player=enemy)
            self._sound_events.append({"kind": "fire", "unit_type": unit.unit_type.value})
        else:
            target_flying = UNIT_DEFS[target_unit.unit_type].is_flying
            dmg = defn.air_damage if target_flying else defn.ground_damage
            cooldown = defn.attack_cooldown_ticks
            if dmg > 0:
                self._apply_damage(target_unit, dmg)
                self._sound_events.append({"kind": "fire", "unit_type": unit.unit_type.value})
                self._alert_allies(victim=target_unit, attacker=unit, victim_player=enemy)

        unit.attack_cooldown = cooldown
        unit.status = UnitStatus.ATTACKING
        return

    # --- Target is a building ---
    if unit.attack_target_building_id and target_building and target_building.hp > 0:
        dist = self._dist_unit_to_building(unit, target_building)
        # Distance is measured to the building edge, so one radius suffices.
        if not dist <= rng + UNIT_RADIUS:
            if not sieged:
                cx, cy = self._building_center(target_building)
                unit.target_x = cx
                unit.target_y = cy
                unit.status = UnitStatus.ATTACKING
            return

        dmg = defn.siege_damage if sieged else defn.ground_damage
        cooldown = defn.siege_cooldown_ticks if sieged else defn.attack_cooldown_ticks
        if dmg > 0:
            target_building.hp = max(0.0, target_building.hp - dmg)
            self._sound_events.append({"kind": "fire", "unit_type": unit.unit_type.value})
        unit.attack_cooldown = cooldown
        unit.status = UnitStatus.ATTACKING
        return
def _apply_damage(self, target: Unit, raw_dmg: int) -> None:
    """Reduce ``target``'s HP by ``raw_dmg`` minus its armor.

    Neither the effective damage nor the resulting HP goes below zero.
    """
    armor = UNIT_DEFS[target.unit_type].armor
    effective = max(0, raw_dmg - armor)
    remaining_hp = target.hp - effective
    target.hp = max(0.0, remaining_hp)
def _alert_allies(self, victim: Unit, attacker: Unit, victim_player: PlayerState) -> None:
    """Send nearby idle, armed allies of ``victim`` after ``attacker``.

    An ally responds when it is idle, deals ground or air damage, and stands
    within twice its own attack range of the victim. It then targets the
    attacker and heads for the attacker's current position.
    """
    for ally in victim_player.units.values():
        if ally.id == victim.id:
            continue
        if ally.status != UnitStatus.IDLE:
            continue

        stats = UNIT_DEFS[ally.unit_type]
        unarmed = stats.ground_damage == 0 and stats.air_damage == 0
        if unarmed:
            continue

        alert_range = stats.attack_range * 2
        if ally.dist_to(victim.x, victim.y) <= alert_range:
            ally.attack_target_id = attacker.id
            ally.attack_target_building_id = None
            ally.status = UnitStatus.ATTACKING
            ally.target_x = attacker.x
            ally.target_y = attacker.y
def _apply_crowd_pressure(self) -> None:
    """Nudge non-firing ground units out of the way of advancing allies.

    For each eligible unit, the headings of nearby allied units that are
    moving, attacking or patrolling toward it (within a forward cone) are
    summed, weighted by alignment and closeness. The unit is shifted a small
    fixed distance along the resulting direction if that spot is free.
    Flying, sieged, targeting, mining and building units are never nudged.
    """
    push_radius = UNIT_RADIUS * 5   # pushers farther away than this are ignored
    nudge = UNIT_RADIUS * 0.35      # distance moved per tick when pushed
    min_dot = 0.45                  # minimum alignment (cosine) of pusher heading
    busy_states = (
        UnitStatus.MINING_MINERALS,
        UnitStatus.MINING_GAS,
        UnitStatus.BUILDING,
        UnitStatus.MOVING_TO_BUILD,
    )
    advancing_states = (UnitStatus.MOVING, UnitStatus.ATTACKING, UnitStatus.PATROLLING)

    for player in self.state.players.values():
        for unit in player.units.values():
            # Only ground units that are neither shooting nor doing special work.
            if UNIT_DEFS[unit.unit_type].is_flying:
                continue
            if unit.attack_target_id or unit.attack_target_building_id:
                continue
            if unit.is_sieged:
                continue
            if unit.status in busy_states:
                continue

            push_x, push_y = 0.0, 0.0
            for pusher in player.units.values():
                if pusher.id == unit.id:
                    continue
                if pusher.status not in advancing_states:
                    continue
                if pusher.target_x is None or pusher.target_y is None:
                    continue

                # Vector from the pusher to the unit being pushed.
                rel_x = unit.x - pusher.x
                rel_y = unit.y - pusher.y
                gap = math.hypot(rel_x, rel_y)
                if gap > push_radius or gap < 1e-6:
                    continue

                # Unit heading of the pusher toward its own target.
                head_x = pusher.target_x - pusher.x
                head_y = pusher.target_y - pusher.y
                head_len = math.hypot(head_x, head_y)
                if head_len < 1e-6:
                    continue
                dir_x, dir_y = head_x / head_len, head_y / head_len

                # The pushed unit must lie inside the pusher's forward cone.
                alignment = (rel_x / gap) * dir_x + (rel_y / gap) * dir_y
                if alignment < min_dot:
                    continue

                # Stronger when closer and better aligned.
                strength = alignment * (1.0 - gap / push_radius)
                push_x += dir_x * strength
                push_y += dir_y * strength

            magnitude = math.hypot(push_x, push_y)
            if magnitude < 1e-6:
                continue

            cand_x = unit.x + (push_x / magnitude) * nudge
            cand_y = unit.y + (push_y / magnitude) * nudge
            if not self._would_overlap(unit, cand_x, cand_y):
                unit.x = cand_x
                unit.y = cand_y
def _tick_healing(self) -> None:
    """Let each living medic heal the most-injured living ally in range.

    Candidates are the owner's marines and other medics that are alive,
    below full HP and within the medic's ``attack_range``. The one with the
    lowest HP ratio gains ``heal_per_tick`` HP, capped at its maximum.

    Fixes over the previous version:
      * Healing runs before dead units are removed, and units at 0 HP had
        the lowest ratio, so medics preferred - and resurrected - units
        killed this very tick. Dead patients and dead medics are now
        excluded.
      * The medic's status was overwritten with HEALING unconditionally and
        never reset. HEALING is not a state the movement step acts on, so a
        medic under a move/patrol/attack order froze in place. Healing now
        leaves such orders untouched: only an idle medic is marked HEALING,
        and it reverts to IDLE once nobody needs healing.
    """
    for player in self.state.players.values():
        for medic in player.units_of(UnitType.MEDIC):
            if medic.hp <= 0:
                continue
            defn = UNIT_DEFS[UnitType.MEDIC]
            healable = [
                u for u in player.units.values()
                if u.unit_type in (UnitType.MARINE, UnitType.MEDIC)
                and 0 < u.hp < u.max_hp
                and medic.dist_to(u.x, u.y) <= defn.attack_range
                and u.id != medic.id
            ]
            if healable:
                target = min(healable, key=lambda u: u.hp / u.max_hp)
                target.hp = min(float(target.max_hp), target.hp + defn.heal_per_tick)
                if medic.status == UnitStatus.IDLE:
                    medic.status = UnitStatus.HEALING
            elif medic.status == UnitStatus.HEALING:
                medic.status = UnitStatus.IDLE
def _remove_dead(self) -> None:
    """Delete units at or below 0 HP and mark dead buildings as destroyed.

    For every removed unit a "death" sound event is queued and the unit id
    is dropped from its assigned resource and from all control groups. A
    building at or below 0 HP that is not yet DESTROYED gets that status,
    loses its production queue, and the path cache is invalidated.
    """
    for player in self.state.players.values():
        # Collect ids first: the dict cannot be mutated while iterating it.
        casualties = [uid for uid, u in player.units.items() if u.hp <= 0]
        for uid in casualties:
            fallen = player.units[uid]
            self._sound_events.append({"kind": "death", "unit_type": fallen.unit_type.value})

            # Release the resource slot the unit was holding, if any.
            if fallen.assigned_resource_id:
                patch = self.state.game_map.get_resource(fallen.assigned_resource_id)
                if patch and uid in patch.assigned_scv_ids:
                    patch.assigned_scv_ids.remove(uid)

            for members in player.control_groups.values():
                if uid in members:
                    members.remove(uid)

            del player.units[uid]

        for structure in player.buildings.values():
            if structure.hp <= 0 and structure.status != BuildingStatus.DESTROYED:
                structure.status = BuildingStatus.DESTROYED
                structure.production_queue.clear()
                invalidate_path_cache()
| def _check_win(self) -> Optional[str]: | |
| if self.state.is_tutorial: | |
| return None | |
| for player_id, player in self.state.players.items(): | |
| cc = player.command_center() | |
| if cc is None: | |
| enemy = self.state.enemy_of(player_id) | |
| return enemy.player_id if enemy else None | |
| return None | |
| # ------------------------------------------------------------------ | |
| # Helpers | |
| # ------------------------------------------------------------------ | |
| def _building_center(self, b: Building) -> tuple[float, float]: | |
| return (b.x, b.y) | |
def _dist_unit_to_building(self, unit: Unit, building: Building) -> float:
    """Return the distance from the unit's centre to the building's visual box.

    The unit position is clamped onto the full ``width`` x ``height``
    rectangle centred on the building (not the shrunk collision box), so
    attack range and deposit detection are measured from the visible edge.
    A unit standing inside the rectangle is at distance 0.
    """
    footprint = BUILDING_DEFS[building.building_type]
    half_w = footprint.width / 2
    half_h = footprint.height / 2
    # Closest point of the rectangle to the unit
    nearest_x = max(building.x - half_w, min(building.x + half_w, unit.x))
    nearest_y = max(building.y - half_h, min(building.y + half_h, unit.y))
    gap_x = unit.x - nearest_x
    gap_y = unit.y - nearest_y
    return math.sqrt(gap_x ** 2 + gap_y ** 2)
def _nearest_building_entry(self, unit: Unit, building: Building) -> tuple[float, float]:
    """Return an approach point just outside the building edge nearest the unit.

    The unit position is clamped onto the building's visual rectangle and the
    result is pushed ``UNIT_RADIUS + 0.3`` tiles away from that edge point,
    towards the unit.  SCVs can therefore approach from whichever side they
    come from instead of all queuing at one spot.  When the unit is already
    on or inside the rectangle, a point below the bottom edge (larger y) is
    returned instead.
    """
    footprint = BUILDING_DEFS[building.building_type]
    half_w, half_h = footprint.width / 2, footprint.height / 2
    edge_x = max(building.x - half_w, min(building.x + half_w, unit.x))
    edge_y = max(building.y - half_h, min(building.y + half_h, unit.y))
    off_x, off_y = unit.x - edge_x, unit.y - edge_y
    gap = math.hypot(off_x, off_y)
    standoff = UNIT_RADIUS + 0.3
    if not gap > 0:
        # No direction to push along: use a fixed side of the building
        return (building.x, building.y + half_h + standoff)
    return (edge_x + off_x / gap * standoff, edge_y + off_y / gap * standoff)
| def _nearest_enemy_in_range( | |
| self, | |
| unit: Unit, | |
| enemy: PlayerState, | |
| max_range: float, | |
| *, | |
| for_attack_move: bool = False, | |
| ) -> Optional[Union[Unit, Building]]: | |
| """Return the nearest enemy unit or building within max_range. | |
| If for_attack_move: use unit radius and building footprint distance.""" | |
| best: Optional[tuple[float, Union[Unit, Building]]] = None | |
| unit_range = max_range + (2 * UNIT_RADIUS if for_attack_move else 0) | |
| for u in enemy.units.values(): | |
| d = unit.dist_to(u.x, u.y) | |
| if d <= unit_range and (best is None or d < best[0]): | |
| best = (d, u) | |
| build_range = max_range + (UNIT_RADIUS if for_attack_move else 0) | |
| for b in enemy.buildings.values(): | |
| if b.status == BuildingStatus.DESTROYED: | |
| continue | |
| d = self._dist_unit_to_building(unit, b) if for_attack_move else unit.dist_to( | |
| *self._building_center(b) | |
| ) | |
| if d <= build_range and (best is None or d < best[0]): | |
| best = (d, b) | |
| return best[1] if best else None | |
| def _resolve_zone(self, player_id: str, zone: str) -> tuple[float, float]: | |
| import re as _re | |
| player = self.state.players[player_id] | |
| enemy = self.state.enemy_of(player_id) | |
| cc = player.command_center() | |
| base_x = cc.x if cc else float(MAP_WIDTH) / 2 | |
| base_y = cc.y if cc else float(MAP_HEIGHT) / 2 | |
| if zone == "my_base": | |
| return (base_x, base_y) | |
| if zone == "enemy_base" and enemy: | |
| ecc = enemy.command_center() | |
| return (ecc.x, ecc.y) if ecc else (MAP_WIDTH - 5, MAP_HEIGHT - 5) | |
| if zone == "center": | |
| return (MAP_WIDTH / 2, MAP_HEIGHT / 2) | |
| if zone == "top_left": | |
| return (4.0, 4.0) | |
| if zone == "top_right": | |
| return (MAP_WIDTH - 4.0, 4.0) | |
| if zone == "bottom_left": | |
| return (4.0, MAP_HEIGHT - 4.0) | |
| if zone == "bottom_right": | |
| return (MAP_WIDTH - 4.0, MAP_HEIGHT - 4.0) | |
| if zone == "front_line": | |
| military = [ | |
| u for u in player.units.values() | |
| if u.unit_type != UnitType.SCV | |
| ] | |
| if military: | |
| avg_x = sum(u.x for u in military) / len(military) | |
| avg_y = sum(u.y for u in military) / len(military) | |
| return (avg_x, avg_y) | |
| m = _re.match(r'^mineral_(\d+)$', zone) | |
| if m: | |
| idx = int(m.group(1)) - 1 | |
| minerals = [ | |
| r for r in self.state.game_map.resources | |
| if r.resource_type == ResourceType.MINERAL and not r.is_depleted | |
| ] | |
| minerals.sort(key=lambda r: (r.x - base_x) ** 2 + (r.y - base_y) ** 2) | |
| if 0 <= idx < len(minerals): | |
| return (float(minerals[idx].x), float(minerals[idx].y)) | |
| m = _re.match(r'^geyser_(\d+)$', zone) | |
| if m: | |
| idx = int(m.group(1)) - 1 | |
| geysers = [ | |
| r for r in self.state.game_map.resources | |
| if r.resource_type == ResourceType.GEYSER | |
| ] | |
| geysers.sort(key=lambda r: (r.x - base_x) ** 2 + (r.y - base_y) ** 2) | |
| if 0 <= idx < len(geysers): | |
| return (float(geysers[idx].x), float(geysers[idx].y)) | |
| # Named geographic landmark | |
| for lm in MAP_LANDMARKS: | |
| if zone == lm["slug"]: | |
| return (float(lm["x"]), float(lm["y"])) | |
| # Clock-based position: e.g. "3h", "12h", "10h30" | |
| m = _re.match(r'^(\d{1,2})h(?:30)?$', zone) | |
| if m: | |
| hour_str = m.group(0) # full match like "3h" or "10h30" | |
| hour = int(m.group(1)) | |
| half = hour_str.endswith("30") | |
| hour_decimal = hour + (0.5 if half else 0.0) | |
| angle_rad = (hour_decimal / 12.0) * 2.0 * math.pi | |
| dx = math.sin(angle_rad) | |
| dy = -math.cos(angle_rad) # y increases downward | |
| cx_map = MAP_WIDTH / 2.0 | |
| cy_map = MAP_HEIGHT / 2.0 | |
| x = max(4.0, min(MAP_WIDTH - 4.0, cx_map + dx * cx_map * 0.9)) | |
| y = max(4.0, min(MAP_HEIGHT - 4.0, cy_map + dy * cy_map * 0.9)) | |
| return (x, y) | |
| # Fallback: enemy base | |
| if enemy: | |
| ecc = enemy.command_center() | |
| if ecc: | |
| return (float(ecc.x) + 2, float(ecc.y) + 2) | |
| return (MAP_WIDTH / 2, MAP_HEIGHT / 2) | |
| def _resolve_selector(self, player: PlayerState, selector: str, max_count: Optional[int] = None) -> list[Unit]: | |
| s = selector.lower() | |
| if s == "all": | |
| units = list(player.units.values()) | |
| elif s == "all_military": | |
| units = [u for u in player.units.values() if u.unit_type != UnitType.SCV] | |
| elif s == "all_marines": | |
| units = player.units_of(UnitType.MARINE) | |
| elif s == "all_medics": | |
| units = player.units_of(UnitType.MEDIC) | |
| elif s == "all_goliaths": | |
| units = player.units_of(UnitType.GOLIATH) | |
| elif s == "all_tanks": | |
| units = player.units_of(UnitType.TANK) | |
| elif s == "all_wraiths": | |
| units = player.units_of(UnitType.WRAITH) | |
| elif s == "all_scv": | |
| units = player.units_of(UnitType.SCV) | |
| elif s == "idle_scv": | |
| all_scvs = player.units_of(UnitType.SCV) | |
| idle = [u for u in all_scvs if u.status == UnitStatus.IDLE] | |
| if idle: | |
| units = idle | |
| else: | |
| mining = [u for u in all_scvs if u.status in (UnitStatus.MINING_MINERALS, UnitStatus.MINING_GAS)] | |
| units = mining if mining else all_scvs | |
| elif s == "most_damaged": | |
| all_units = list(player.units.values()) | |
| units = [min(all_units, key=lambda u: u.hp / u.max_hp)] if all_units else [] | |
| else: | |
| units = [] | |
| if max_count is not None: | |
| units = units[:max_count] | |
| return units | |
| def _query_unit_ids( | |
| self, player: PlayerState, target_zone: Optional[str] = None, unit_type: Optional[str] = None | |
| ) -> list[str]: | |
| """Return unit IDs matching zone (within ZONE_RADIUS of zone center) and/or unit_type.""" | |
| units: list[Unit] = list(player.units.values()) | |
| if target_zone: | |
| zx, zy = self._resolve_zone(player.player_id, target_zone) | |
| units = [u for u in units if u.dist_to(zx, zy) <= ZONE_RADIUS] | |
| if unit_type: | |
| try: | |
| ut = UnitType(unit_type) | |
| units = [u for u in units if u.unit_type == ut] | |
| except ValueError: | |
| pass | |
| return [u.id for u in units] | |
# Vision radii (in cells) — must match frontend constants
# Units not listed here are looked up with a default of 6.0 by _is_visible.
_UNIT_VISION: dict[UnitType, float] = {
    UnitType.SCV: 6,
    UnitType.MARINE: 6,
    UnitType.MEDIC: 6,
    UnitType.GOLIATH: 8,
    UnitType.TANK: 8,
    UnitType.WRAITH: 9,
}
# Buildings not listed here are looked up with a default of 7.0 by _is_visible.
_BUILDING_VISION: dict[BuildingType, float] = {
    BuildingType.COMMAND_CENTER: 10,
    BuildingType.SUPPLY_DEPOT: 7,
    BuildingType.BARRACKS: 7,
    BuildingType.ENGINEERING_BAY: 7,
    BuildingType.REFINERY: 7,
    BuildingType.FACTORY: 7,
    BuildingType.ARMORY: 7,
    BuildingType.STARPORT: 7,
}
# Default maximum search distance (in tiles) for _find_build_position
_SCV_BUILD_RANGE: float = 6.0
| def _is_visible(self, player: PlayerState, x: float, y: float) -> bool: | |
| """Return True if tile (x, y) is within vision of any own unit or building.""" | |
| for u in player.units.values(): | |
| r = self._UNIT_VISION.get(u.unit_type, 6.0) | |
| if (u.x - x) ** 2 + (u.y - y) ** 2 <= r * r: | |
| return True | |
| for b in player.buildings.values(): | |
| if b.status == BuildingStatus.DESTROYED: | |
| continue | |
| r = self._BUILDING_VISION.get(b.building_type, 7.0) | |
| if (b.x - x) ** 2 + (b.y - y) ** 2 <= r * r: | |
| return True | |
| return False | |
def _find_build_position(
    self,
    player: PlayerState,
    bt: BuildingType,
    near_scv: Unit,
    search_center: Optional[tuple[float, float]] = None,
    search_radius: Optional[float] = None,
) -> Optional[tuple[float, float]]:
    """Return CENTER coordinates for a new building, or None if no valid spot found.

    Candidate top-left tiles are visited in square rings of growing radius
    around the anchor.  A candidate is accepted when the building centre is
    within the search distance of the anchor, the footprint can be placed
    (``_can_place``) and the centre is visible to the player (``_is_visible``).

    Args:
        player: Player who must be able to see the chosen spot.
        bt: Type of building to place.
        near_scv: SCV whose position anchors the search when no
            ``search_center`` is given.
        search_center: Optional anchor that replaces the SCV position.
        search_radius: Optional maximum distance that replaces
            ``_SCV_BUILD_RANGE``.

    Returns:
        ``(x, y)`` of the building centre, or ``None`` if nothing fits.
    """
    defn = BUILDING_DEFS[bt]
    cx, cy = search_center if search_center else (near_scv.x, near_scv.y)
    radius_limit = search_radius if search_radius is not None else self._SCV_BUILD_RANGE

    origin_x, origin_y = int(cx), int(cy)
    half_w, half_h = defn.width / 2.0, defn.height / 2.0
    limit_sq = radius_limit ** 2

    for radius in range(1, int(radius_limit) + 2):
        for dx in range(-radius, radius + 1):
            for dy in range(-radius, radius + 1):
                # Every tile strictly inside this ring was already examined
                # and rejected by the previous pass; only the first pass
                # (radius 1) has to look at its interior tile as well.
                if radius > 1 and max(abs(dx), abs(dy)) < radius:
                    continue
                tl_x, tl_y = origin_x + dx, origin_y + dy
                tile_cx = tl_x + half_w
                tile_cy = tl_y + half_h
                if (tile_cx - cx) ** 2 + (tile_cy - cy) ** 2 > limit_sq:
                    continue
                if not self._can_place(tl_x, tl_y, defn):
                    continue
                if not self._is_visible(player, tile_cx, tile_cy):
                    continue
                return (tile_cx, tile_cy)
    return None
| def _find_expansion_position(self, player: PlayerState) -> Optional[tuple[float, float]]: | |
| """Find a valid position for a new command center near unclaimed resource clusters. | |
| An expansion position is a resource cluster (group of minerals) that has no | |
| existing command center (from any player) within CC_CLAIM_RADIUS tiles. | |
| The closest such cluster to the player's current base is preferred. | |
| """ | |
| from .map import ResourceType | |
| CC_CLAIM_RADIUS = 15.0 | |
| CLUSTER_MERGE_DIST = 10.0 | |
| CC_SEARCH_RADIUS = 14.0 | |
| all_ccs = [ | |
| b | |
| for p in self.state.players.values() | |
| for b in p.buildings.values() | |
| if b.building_type == BuildingType.COMMAND_CENTER | |
| and b.status != BuildingStatus.DESTROYED | |
| ] | |
| minerals = [ | |
| r for r in self.state.game_map.resources | |
| if r.resource_type == ResourceType.MINERAL and not r.is_depleted | |
| ] | |
| if not minerals: | |
| return None | |
| free_minerals = [ | |
| m for m in minerals | |
| if not any( | |
| (m.x - cc.x) ** 2 + (m.y - cc.y) ** 2 <= CC_CLAIM_RADIUS ** 2 | |
| for cc in all_ccs | |
| ) | |
| ] | |
| if not free_minerals: | |
| return None | |
| # Group free minerals into clusters | |
| clusters: list[list] = [] | |
| for m in free_minerals: | |
| placed = False | |
| for cluster in clusters: | |
| ccx = sum(r.x for r in cluster) / len(cluster) | |
| ccy = sum(r.y for r in cluster) / len(cluster) | |
| if (m.x - ccx) ** 2 + (m.y - ccy) ** 2 <= CLUSTER_MERGE_DIST ** 2: | |
| cluster.append(m) | |
| placed = True | |
| break | |
| if not placed: | |
| clusters.append([m]) | |
| if not clusters: | |
| return None | |
| cc = player.command_center() | |
| ref_x = cc.x if cc else float(MAP_WIDTH) / 2 | |
| ref_y = cc.y if cc else float(MAP_HEIGHT) / 2 | |
| def cluster_center(cluster: list) -> tuple[float, float]: | |
| return ( | |
| sum(r.x for r in cluster) / len(cluster), | |
| sum(r.y for r in cluster) / len(cluster), | |
| ) | |
| clusters_by_dist = sorted( | |
| clusters, | |
| key=lambda c: (cluster_center(c)[0] - ref_x) ** 2 + (cluster_center(c)[1] - ref_y) ** 2, | |
| ) | |
| cc_defn = BUILDING_DEFS[BuildingType.COMMAND_CENTER] | |
| for cluster in clusters_by_dist: | |
| ecx, ecy = cluster_center(cluster) | |
| for radius in range(0, int(CC_SEARCH_RADIUS) + 2): | |
| for dx in range(-radius, radius + 1): | |
| for dy in range(-radius, radius + 1): | |
| tl_x, tl_y = int(ecx) + dx, int(ecy) + dy | |
| tile_cx = tl_x + cc_defn.width / 2.0 | |
| tile_cy = tl_y + cc_defn.height / 2.0 | |
| if (tile_cx - ecx) ** 2 + (tile_cy - ecy) ** 2 > CC_SEARCH_RADIUS ** 2: | |
| continue | |
| if not self._can_place(tl_x, tl_y, cc_defn): | |
| continue | |
| return (tile_cx, tile_cy) | |
| return None | |
def _eject_units_from_building(self, building: "Building") -> None:
    """Move ground units out of a building's collision box.

    Every non-flying unit (of any player) whose centre lies inside the
    collision rectangle of ``building`` is snapped to the position returned
    by ``snap_to_walkable``.  Its movement target is set to that same position
    and its waypoints are cleared so it does not walk back inside.
    """
    footprint = BUILDING_DEFS[building.building_type]
    half_w, half_h = footprint.col_hw(), footprint.col_hh()
    left, top = building.x - half_w, building.y - half_h
    right, bottom = building.x + half_w, building.y + half_h
    obstacles = self._building_blocked_rects()

    for owner in self.state.players.values():
        for unit in owner.units.values():
            if UNIT_DEFS[unit.unit_type].is_flying:
                continue
            if not (left <= unit.x <= right and top <= unit.y <= bottom):
                continue
            safe_x, safe_y = snap_to_walkable(unit.x, unit.y, blocked_rects=obstacles)
            unit.x, unit.y = safe_x, safe_y
            # Clear any path that might route back inside
            unit.target_x, unit.target_y = safe_x, safe_y
            unit.path_waypoints = []
| def _can_place(self, tl_x: int, tl_y: int, defn: BuildingDef) -> bool: | |
| """Check if a building with given top-left corner can be placed (no overlap, in bounds).""" | |
| if tl_x < 0 or tl_y < 0 or tl_x + defn.width > MAP_WIDTH or tl_y + defn.height > MAP_HEIGHT: | |
| return False | |
| # Check overlap with existing buildings (stored as center coords) | |
| for player in self.state.players.values(): | |
| for b in player.buildings.values(): | |
| if b.status == BuildingStatus.DESTROYED: | |
| continue | |
| bd = BUILDING_DEFS[b.building_type] | |
| if tl_x < b.x + bd.width / 2 and tl_x + defn.width > b.x - bd.width / 2 \ | |
| and tl_y < b.y + bd.height / 2 and tl_y + defn.height > b.y - bd.height / 2: | |
| return False | |
| # Check overlap with resources | |
| for res in self.state.game_map.resources: | |
| if tl_x <= res.x < tl_x + defn.width and tl_y <= res.y < tl_y + defn.height: | |
| return False | |
| return True | |
| # ------------------------------------------------------------------ | |
| # Command dispatchers | |
| # ------------------------------------------------------------------ | |
def _dispatch(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Route one game action to its ``_cmd_*`` handler and return the result.

    Unknown action types produce a failed result with error
    ``unknown_action``.  Any exception raised while handling the action is
    logged and converted into a failed result with error ``exception`` and
    the exception text in ``detail``.
    """
    try:
        kind = action.type
        # (action type, handler) pairs, tried in order
        routes = (
            (ActionType.BUILD, lambda: self._cmd_build(player, action)),
            (ActionType.TRAIN, lambda: self._cmd_train(player, action)),
            (ActionType.MOVE, lambda: self._cmd_move(player, action)),
            (ActionType.ATTACK, lambda: self._cmd_attack(player, action)),
            (ActionType.SIEGE, lambda: self._cmd_siege(player, action, siege=True)),
            (ActionType.UNSIEGE, lambda: self._cmd_siege(player, action, siege=False)),
            (ActionType.CLOAK, lambda: self._cmd_cloak(player, action, cloak=True)),
            (ActionType.DECLOAK, lambda: self._cmd_cloak(player, action, cloak=False)),
            (ActionType.GATHER, lambda: self._cmd_gather(player, action)),
            (ActionType.STOP, lambda: self._cmd_stop(player, action)),
            (ActionType.PATROL, lambda: self._cmd_patrol(player, action)),
            (ActionType.QUERY, lambda: self._cmd_query(player, action)),
            (ActionType.QUERY_UNITS, lambda: self._cmd_query_units(player, action)),
            (ActionType.ASSIGN_TO_GROUP, lambda: self._cmd_assign_to_group(player, action)),
            (ActionType.DEFEND, lambda: self._cmd_defend(player, action)),
            (ActionType.RESIGN, lambda: self._cmd_resign(player)),
        )
        for candidate, run in routes:
            if kind == candidate:
                return run()
        return ActionResult(
            action_type=kind, success=False, data={"error": "unknown_action"}
        )
    except Exception as exc:
        log.exception("Error applying action %s", action.type)
        return ActionResult(
            action_type=str(action.type), success=False, data={"error": "exception", "detail": str(exc)}
        )
| def _cmd_build(self, player: PlayerState, action: GameAction) -> ActionResult: | |
| """Start construction of one or more buildings of the requested type. | |
| The request is rejected with an error code when the building type is missing | |
| or unknown, a tech prerequisite is missing, the player cannot afford a single | |
| building, or no idle/mining SCV is available. The requested count is limited | |
| to 1..5 and then to what minerals, gas and free SCVs allow; one SCV is used | |
| per building. | |
| Placement depends on the type: a command center goes to an expansion position | |
| (falling back to a regular search around the SCV), a refinery goes onto the | |
| nearest geyser without a refinery and must be visible, and everything else is | |
| searched around the command center nearest to the SCV with radius 12. | |
| For every placed building the cost is deducted, the building is created, the | |
| path cache is invalidated, trapped ground units are ejected and the SCV is | |
| sent towards a point just outside the footprint with status MOVING_TO_BUILD. | |
| Returns a successful result with the number of buildings started, or a failed | |
| result with error "build_no_placement" when none could be placed. | |
| """ | |
| raw = action.building_type | |
| if not raw: | |
| return ActionResult(action_type="build", success=False, data={"error": "build_missing_type"}) | |
| # Reject names that are not BuildingType values | |
| try: | |
| bt = BuildingType(raw) | |
| except ValueError: | |
| return ActionResult(action_type="build", success=False, data={"error": "build_unknown", "raw": raw}) | |
| # Tech-tree gate: report the missing prerequisites by name | |
| if not can_build(bt, player): | |
| missing = missing_for_build(bt, player) | |
| names = ", ".join(m.value for m in missing) | |
| return ActionResult( | |
| action_type="build", success=False, data={"error": "build_missing_prereq", "names": names} | |
| ) | |
| defn = BUILDING_DEFS[bt] | |
| # The player must be able to pay for at least one building | |
| if player.minerals < defn.mineral_cost or player.gas < defn.gas_cost: | |
| return ActionResult( | |
| action_type="build", success=False, | |
| data={"error": "build_insufficient_resources", "mineral": defn.mineral_cost, "gas": defn.gas_cost}, | |
| ) | |
| # Clamp count to what resources and SCVs allow | |
| count = max(1, min(action.count or 1, 5)) | |
| if defn.mineral_cost > 0: | |
| count = min(count, player.minerals // defn.mineral_cost) | |
| if defn.gas_cost > 0: | |
| count = min(count, player.gas // defn.gas_cost) | |
| # Pool of free SCVs (idle first, then mining) — one SCV per building | |
| available_scvs = ( | |
| [u for u in player.units_of(UnitType.SCV) if u.status == UnitStatus.IDLE] | |
| + [u for u in player.units_of(UnitType.SCV) | |
| if u.status in (UnitStatus.MINING_MINERALS, UnitStatus.MINING_GAS)] | |
| ) | |
| count = min(count, len(available_scvs)) | |
| if count == 0: | |
| return ActionResult(action_type="build", success=False, data={"error": "build_no_scv"}) | |
| # Number of buildings actually started; a failed placement stops the batch | |
| built = 0 | |
| for i in range(count): | |
| scv = available_scvs[i] | |
| # Determine the command center that should anchor this build | |
| nearest_cc = player.nearest_command_center(scv.x, scv.y) | |
| cc_anchor = (float(nearest_cc.x), float(nearest_cc.y)) if nearest_cc else None | |
| # Find center position for this building | |
| if bt == BuildingType.COMMAND_CENTER: | |
| # New CC must go on an expansion (near unclaimed resources) | |
| pos_opt = self._find_expansion_position(player) | |
| if not pos_opt: | |
| # Fallback: build anywhere the SCV can reach | |
| pos_opt = self._find_build_position(player, bt, scv) | |
| if not pos_opt: | |
| break | |
| pos_cx, pos_cy = pos_opt | |
| elif bt == BuildingType.REFINERY: | |
| # Geyser lookup is anchored on the nearest command center, else on the SCV | |
| cx, cy = cc_anchor if cc_anchor else (scv.x, scv.y) | |
| geyser = self.state.game_map.nearest_geyser_without_refinery(cx, cy) | |
| if not geyser: | |
| break | |
| # Refinery (2×2) centered on geyser tile (+1 from integer geyser coord) | |
| pos_cx: float = geyser.x + 1.0 | |
| pos_cy: float = geyser.y + 1.0 | |
| if not self._is_visible(player, pos_cx, pos_cy): | |
| break | |
| # Mark the geyser as taken so it is not offered again | |
| geyser.has_refinery = True | |
| else: | |
| # Build around the nearest command center with a wider search radius | |
| pos_opt = self._find_build_position( | |
| player, bt, scv, | |
| search_center=cc_anchor, | |
| search_radius=12.0, | |
| ) | |
| if not pos_opt: | |
| break | |
| pos_cx, pos_cy = pos_opt | |
| # Unassign from resource if mining | |
| if scv.assigned_resource_id: | |
| res = self.state.game_map.get_resource(scv.assigned_resource_id) | |
| if res and scv.id in res.assigned_scv_ids: | |
| res.assigned_scv_ids.remove(scv.id) | |
| scv.assigned_resource_id = None | |
| scv.harvest_carry = False | |
| scv.harvest_amount = 0 | |
| scv.harvest_mining_ticks = 0 | |
| # Pay for this building and register it with the player | |
| player.minerals -= defn.mineral_cost | |
| player.gas -= defn.gas_cost | |
| building = Building.create(bt, player.player_id, pos_cx, pos_cy) | |
| player.buildings[building.id] = building | |
| invalidate_path_cache() | |
| # Eject any ground units trapped inside the new building's footprint | |
| self._eject_units_from_building(building) | |
| scv.status = UnitStatus.MOVING_TO_BUILD | |
| scv.building_target_id = building.id | |
| # Navigate to the nearest point just outside the building edge | |
| bw = float(defn.width) | |
| bh = float(defn.height) | |
| # Clamp the SCV position onto the footprint to get the closest edge point | |
| edge_x = max(pos_cx - bw / 2, min(pos_cx + bw / 2, scv.x)) | |
| edge_y = max(pos_cy - bh / 2, min(pos_cy + bh / 2, scv.y)) | |
| dx = scv.x - edge_x | |
| dy = scv.y - edge_y | |
| edge_dist = math.hypot(dx, dy) | |
| approach_margin = UNIT_RADIUS + 0.6 | |
| if edge_dist > 0: | |
| dest_x = edge_x + dx / edge_dist * approach_margin | |
| dest_y = edge_y + dy / edge_dist * approach_margin | |
| else: | |
| # SCV is on or inside the footprint: use a point beyond the +x edge instead | |
| dest_x = pos_cx + bw / 2 + approach_margin | |
| dest_y = pos_cy | |
| self._set_unit_destination(scv, dest_x, dest_y, is_flying=False) | |
| built += 1 | |
| if built == 0: | |
| return ActionResult(action_type="build", success=False, data={"error": "build_no_placement"}) | |
| return ActionResult( | |
| action_type="build", success=True, | |
| data={"built": built, "building": bt.value}, | |
| sound_events=[{"kind": "move_ack", "unit_type": "scv"}], | |
| ) | |
def _cmd_train(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Queue production of ``action.count`` units of the requested type.

    The request fails with an error code when the unit type is missing or
    unknown, a tech prerequisite is missing, the player has no active producer
    building, or the whole batch cannot be paid for or does not fit into the
    supply cap (supply of already queued units is counted too).  The count is
    limited to 1..20.  On success the items are spread round-robin over the
    producers and the total cost is deducted.
    """
    def fail(code: str, **extra) -> ActionResult:
        return ActionResult(action_type="train", success=False, data={"error": code, **extra})

    raw = action.unit_type
    if not raw:
        return fail("train_missing_type")
    try:
        ut = UnitType(raw)
    except ValueError:
        return fail("train_unknown", raw=raw)

    if not can_train(ut, player):
        lacking = missing_for_train(ut, player)
        return fail("train_missing_prereq", names=", ".join(m.value for m in lacking))

    defn = UNIT_DEFS[ut]
    producer_type = get_producer(ut)
    producers = player.active_buildings_of(producer_type)
    if not producers:
        return fail("train_no_producer", producer=producer_type.value)

    count = max(1, min(action.count or 1, 20))
    batch_minerals = defn.mineral_cost * count
    batch_gas = defn.gas_cost * count
    if player.minerals < batch_minerals or player.gas < batch_gas:
        return fail("train_insufficient_resources")

    # Supply already promised to units waiting in any production queue
    queued_supply = sum(
        UNIT_DEFS[UnitType(item.unit_type)].supply_cost
        for b in player.buildings.values()
        for item in b.production_queue
    )
    needed_supply = player.supply_used + queued_supply + defn.supply_cost * count
    if needed_supply > player.supply_max:
        return fail("train_insufficient_supply")

    from .buildings import ProductionItem  # local import to avoid cycle

    for slot in range(count):
        # Round-robin over the available producers
        target = producers[slot % len(producers)]
        target.production_queue.append(
            ProductionItem(
                unit_type=ut.value,
                ticks_remaining=defn.build_time_ticks,
                max_ticks=defn.build_time_ticks,
            )
        )

    player.minerals -= batch_minerals
    player.gas -= batch_gas
    return ActionResult(
        action_type="train", success=True,
        data={"count": count, "unit": ut.value},
    )
def _cmd_move(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Order the selected units to move to a zone.

    Defaults: selector ``all_military``, zone ``center``.  Sieged units are
    left untouched; every other selected unit drops its attack targets, gets
    status ``MOVING`` and is routed to the zone.  Fails with
    ``no_units_selected`` when the selector matches nothing.
    """
    selected = self._resolve_selector(player, action.unit_selector or "all_military", max_count=action.count)
    if not selected:
        return ActionResult(action_type="move", success=False, data={"error": "no_units_selected"})

    zone = action.target_zone or "center"
    dest_x, dest_y = self._resolve_zone(player.player_id, zone)
    for mover in selected:
        if mover.is_sieged:
            continue
        mover.status = UnitStatus.MOVING
        mover.attack_target_id = None
        mover.attack_target_building_id = None
        flies = UNIT_DEFS[mover.unit_type].is_flying
        self._set_unit_destination(mover, dest_x, dest_y, is_flying=flies)

    # Acknowledgement sound is voiced by the first selected unit's type
    ack = [{"kind": "move_ack", "unit_type": selected[0].unit_type.value}]
    return ActionResult(
        action_type="move", success=True,
        data={"n": len(selected), "zone": zone}, sound_events=ack,
    )
def _cmd_attack(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Order the selected units to attack towards a zone.

    Defaults: selector ``all_military``, zone ``enemy_base``.  Sieged units
    are left untouched; every other selected unit has its current attack
    targets cleared, gets status ``ATTACKING`` and is routed to the zone.
    Fails with ``no_units_selected`` when the selector matches nothing.
    """
    selected = self._resolve_selector(player, action.unit_selector or "all_military", max_count=action.count)
    if not selected:
        return ActionResult(action_type="attack", success=False, data={"error": "no_units_selected"})

    zone = action.target_zone or "enemy_base"
    dest_x, dest_y = self._resolve_zone(player.player_id, zone)
    for attacker in selected:
        if attacker.is_sieged:
            continue
        attacker.status = UnitStatus.ATTACKING
        attacker.attack_target_id = None
        attacker.attack_target_building_id = None
        flies = UNIT_DEFS[attacker.unit_type].is_flying
        self._set_unit_destination(attacker, dest_x, dest_y, is_flying=flies)

    # Acknowledgement sound is voiced by the first selected unit's type
    ack = [{"kind": "move_ack", "unit_type": selected[0].unit_type.value}]
    return ActionResult(
        action_type="attack", success=True,
        data={"n": len(selected), "zone": zone}, sound_events=ack,
    )
def _cmd_siege(self, player: PlayerState, action: GameAction, siege: bool) -> ActionResult:
    """Switch the selected tanks into or out of siege mode.

    Args:
        player: Owner of the tanks.
        action: Command carrying the optional selector (default
            ``all_tanks``) and count; non-tank units in the selection are
            ignored.
        siege: ``True`` to siege, ``False`` to return to mobile mode.

    Returns:
        A successful result with the number of tanks and the new mode, or a
        failed result with error ``siege_no_tanks`` when no tank is selected.
    """
    tanks = self._resolve_selector(player, action.unit_selector or "all_tanks", max_count=action.count)
    tanks = [u for u in tanks if u.unit_type == UnitType.TANK]
    if not tanks:
        return ActionResult(action_type="siege", success=False, data={"error": "siege_no_tanks"})
    for tank in tanks:
        tank.is_sieged = siege
        tank.status = UnitStatus.SIEGED if siege else UnitStatus.IDLE
        if siege:
            # A sieged tank must stay put: drop the destination AND the rest
            # of its route, the same way _cmd_stop halts a unit.
            tank.target_x = tank.target_y = None
            tank.path_waypoints = []
    mode = "siege" if siege else "mobile"
    return ActionResult(
        action_type="siege", success=True,
        data={"n": len(tanks), "mode": mode},
    )
def _cmd_cloak(self, player: PlayerState, action: GameAction, cloak: bool) -> ActionResult:
    """Turn cloaking on or off for the selected wraiths.

    The selector defaults to ``all_wraiths``; units in the selection that are
    not wraiths are ignored.  Fails with ``cloak_no_wraiths`` when no wraith
    remains.
    """
    selected = self._resolve_selector(player, action.unit_selector or "all_wraiths", max_count=action.count)
    wraiths = [candidate for candidate in selected if candidate.unit_type == UnitType.WRAITH]
    if not wraiths:
        return ActionResult(action_type="cloak", success=False, data={"error": "cloak_no_wraiths"})

    for wraith in wraiths:
        wraith.is_cloaked = cloak

    if cloak:
        state = "on"
    else:
        state = "off"
    return ActionResult(
        action_type="cloak", success=True,
        data={"n": len(wraiths), "state": state},
    )
def _cmd_gather(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Send SCVs to harvest minerals (default) or gas.

    For gas the default selector is ``all_scv`` so SCVs currently mining
    minerals can be reassigned; for minerals it is ``idle_scv`` to avoid
    disrupting gas workers.  Gas workers are sent to the geyser returned by
    ``nearest_available_geyser`` for the player's command center.  Mineral
    workers are spread over the patches that still have capacity, always
    taking the patch with the fewest assigned SCVs (ties: closest to the
    command center).

    Returns:
        A successful result with the number of SCVs assigned, or a failed
        result with error ``gather_no_scv`` / ``gather_no_resource``.
    """
    resource_type = (action.resource_type or "minerals").lower()
    cc = player.command_center()
    cx, cy = (float(cc.x), float(cc.y)) if cc else (0.0, 0.0)

    default_selector = "all_scv" if resource_type == "gas" else "idle_scv"
    scvs = self._resolve_selector(player, action.unit_selector or default_selector, max_count=action.count)
    scvs = [u for u in scvs if u.unit_type == UnitType.SCV]
    if not scvs:
        return ActionResult(action_type="gather", success=False, data={"error": "gather_no_scv"})

    game_map = self.state.game_map

    def assign(scv: Unit, node, status: UnitStatus) -> None:
        """Move ``scv`` from its current resource (if any) onto ``node``."""
        if scv.assigned_resource_id:
            # get_resource yields a falsy value for ids that no longer exist,
            # so no separate membership scan over all resources is needed.
            old = game_map.get_resource(scv.assigned_resource_id)
            if old and scv.id in old.assigned_scv_ids:
                old.assigned_scv_ids.remove(scv.id)
        scv.status = status
        scv.assigned_resource_id = node.id
        scv.harvest_carry = False
        scv.harvest_amount = 0
        scv.harvest_mining_ticks = 0
        self._set_unit_destination(scv, float(node.x), float(node.y), is_flying=False)
        node.assigned_scv_ids.append(scv.id)

    assigned = 0
    if resource_type == "gas":
        for scv in scvs:
            geyser = game_map.nearest_available_geyser(cx, cy)
            if not geyser:
                break
            assign(scv, geyser, UnitStatus.MINING_GAS)
            assigned += 1
    else:
        # Distribute SCVs across patches evenly: pick the patch with fewest assigned SCVs
        # (avoids funnelling all 5 SCVs to the same nearest patch)
        mineral_patches = [
            r for r in game_map.resources
            if r.resource_type.value == "mineral" and not r.is_depleted and r.has_capacity
        ]
        if not mineral_patches:
            return ActionResult(action_type="gather", success=False, data={"error": "gather_no_resource"})
        for scv in scvs:
            if not mineral_patches:
                break
            # Choose patch with fewest assigned SCVs (tie-break: nearest to CC)
            patch = min(
                mineral_patches,
                key=lambda r: (len(r.assigned_scv_ids), (r.x - cx) ** 2 + (r.y - cy) ** 2),
            )
            assign(scv, patch, UnitStatus.MINING_MINERALS)
            assigned += 1
            # Remove full patches from candidates
            mineral_patches = [r for r in mineral_patches if r.has_capacity]

    if assigned == 0:
        return ActionResult(
            action_type="gather", success=False, data={"error": "gather_no_resource"}
        )
    move_ack = [{"kind": "move_ack", "unit_type": "scv"}]
    return ActionResult(
        action_type="gather", success=True,
        data={"n": assigned, "resource": resource_type}, sound_events=move_ack,
    )
def _cmd_stop(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Halt the selected units (default selector ``all_military``).

    Each unit loses its destination, remaining waypoints and attack targets.
    Its status becomes ``IDLE``, except for sieged tanks, which keep the
    ``SIEGED`` status that ``_cmd_siege`` gave them.  Always succeeds; ``n``
    is the number of units selected, which may be 0.
    """
    units = self._resolve_selector(player, action.unit_selector or "all_military", max_count=action.count)
    for unit in units:
        # Stopping must not leave a tank flagged is_sieged with an IDLE status
        unit.status = UnitStatus.SIEGED if unit.is_sieged else UnitStatus.IDLE
        unit.target_x = unit.target_y = None
        unit.path_waypoints = []
        unit.attack_target_id = None
        unit.attack_target_building_id = None
    return ActionResult(action_type="stop", success=True, data={"n": len(units)})
def _cmd_patrol(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Order the selected units to patrol towards a zone.

    Defaults: selector ``all_military``, zone ``center``.  Sieged units are
    left untouched; every other selected unit records its current position in
    ``patrol_x`` / ``patrol_y``, gets status ``PATROLLING`` and is routed to
    the zone.  Fails with ``no_units_selected`` when nothing is selected.
    """
    selected = self._resolve_selector(player, action.unit_selector or "all_military", max_count=action.count)
    if not selected:
        return ActionResult(action_type="patrol", success=False, data={"error": "no_units_selected"})

    zone = action.target_zone or "center"
    dest_x, dest_y = self._resolve_zone(player.player_id, zone)
    for sentry in selected:
        if sentry.is_sieged:
            continue
        # Remember where the patrol started
        sentry.patrol_x, sentry.patrol_y = sentry.x, sentry.y
        sentry.status = UnitStatus.PATROLLING
        flies = UNIT_DEFS[sentry.unit_type].is_flying
        self._set_unit_destination(sentry, dest_x, dest_y, is_flying=flies)

    # Acknowledgement sound is voiced by the first selected unit's type
    ack = [{"kind": "move_ack", "unit_type": selected[0].unit_type.value}]
    return ActionResult(
        action_type="patrol", success=True,
        data={"n": len(selected), "zone": zone}, sound_events=ack,
    )
def _cmd_query(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Return the player's state summary.

    The text comes from ``player.summary`` called with ``self._cmd_lang``;
    ``action`` is accepted for signature parity with the other command
    handlers and is not read. Always succeeds.
    """
    summary_text = player.summary(self._cmd_lang)
    return ActionResult(
        action_type="query",
        success=True,
        data={"summary": summary_text},
    )
def _cmd_query_units(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Look up the player's units by zone and/or type.

    Blank or missing filters are normalised to ``None`` before being handed
    to ``_query_unit_ids``. The matching IDs are returned in
    ``result.unit_ids`` and their count in ``data["n"]``. Always succeeds.
    """

    def _blank_to_none(raw: Optional[str]) -> Optional[str]:
        # Treat None, "" and whitespace-only strings as "no filter".
        return (raw or "").strip() or None

    zone_filter = _blank_to_none(action.target_zone)
    type_filter = _blank_to_none(action.unit_type)
    matched_ids = self._query_unit_ids(
        player, target_zone=zone_filter, unit_type=type_filter
    )
    return ActionResult(
        action_type="query_units",
        success=True,
        data={"n": len(matched_ids)},
        unit_ids=matched_ids,
    )
def _cmd_assign_to_group(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Replace the contents of control group 1, 2 or 3 with the given unit IDs.

    Only IDs present in ``player.units`` are kept; unknown IDs are dropped
    silently. A missing or out-of-range group index fails with
    ``error="group_invalid"`` and leaves the groups untouched.
    """
    group = action.group_index
    # A None index is not a member of the tuple, so it is rejected here too.
    if group not in (1, 2, 3):
        return ActionResult(
            action_type="assign_to_group",
            success=False,
            data={"error": "group_invalid"},
        )

    requested = action.unit_ids or []
    members = [unit_id for unit_id in requested if unit_id in player.units]
    player.control_groups[group] = members
    return ActionResult(
        action_type="assign_to_group",
        success=True,
        data={"gi": group, "n": len(members)},
    )
def _cmd_defend(self, player: PlayerState, action: GameAction) -> ActionResult:
    """Put the selected military units on patrol around a zone.

    The selector defaults to ``"all_military"`` and the zone to
    ``"my_base"``. Sieged units and SCVs are excluded. The remaining units
    are spaced at equal angles on a circle around the zone centre; each is
    sent to its point on the circle while the diametrically opposite point
    is stored as its patrol return waypoint.

    Returns:
        A failed result with ``error="no_units_selected"`` when no eligible
        unit remains; otherwise a successful result with the unit count,
        the zone name and a ``move_ack`` sound event.
    """
    candidates = self._resolve_selector(
        player, action.unit_selector or "all_military", max_count=action.count
    )
    defenders = [
        u for u in candidates
        if not (u.is_sieged or u.unit_type == UnitType.SCV)
    ]
    if not defenders:
        return ActionResult(
            action_type="defend", success=False, data={"error": "no_units_selected"}
        )

    zone = action.target_zone or "my_base"
    cx, cy = self._resolve_zone(player.player_id, zone)
    orbit_radius = 6.0  # patrol orbit radius around the zone centre
    total = len(defenders)

    for slot, defender in enumerate(defenders):
        angle = (2 * math.pi * slot) / total
        opposite = angle + math.pi
        # Near point: where the unit heads first.
        near_x = cx + orbit_radius * math.cos(angle)
        near_y = cy + orbit_radius * math.sin(angle)
        # Far point: the other end of the same diameter.
        far_x = cx + orbit_radius * math.cos(opposite)
        far_y = cy + orbit_radius * math.sin(opposite)
        flying = UNIT_DEFS[defender.unit_type].is_flying

        # Drop any explicit attack order before switching to patrol.
        defender.attack_target_id = None
        defender.attack_target_building_id = None
        defender.patrol_x = far_x
        defender.patrol_y = far_y
        defender.status = UnitStatus.PATROLLING
        self._set_unit_destination(defender, near_x, near_y, is_flying=flying)

    ack = [{"kind": "move_ack", "unit_type": defenders[0].unit_type.value}]
    return ActionResult(
        action_type="defend", success=True,
        data={"n": len(defenders), "zone": zone}, sound_events=ack,
    )
def _cmd_resign(self, player: PlayerState) -> ActionResult:
    """Forfeit the game on behalf of ``player``.

    The first other player ID found in ``self.state.players`` is recorded as
    the winner and the game phase is switched to GAME_OVER. When no other
    player exists the call fails with ``error="no_opponent"`` and the game
    state is left unchanged.
    """
    for candidate_id in self.state.players:
        if candidate_id != player.player_id:
            opponent_id = candidate_id
            break
    else:
        opponent_id = None

    if not opponent_id:
        return ActionResult(
            action_type="resign", success=False, data={"error": "no_opponent"}
        )

    self.state.phase = GamePhase.GAME_OVER
    self.state.winner = opponent_id
    return ActionResult(action_type="resign", success=True, data={})
def cancel_building_construction(self, player_id: str, building_id: str) -> ActionResult:
    """Cancel a building that is still under construction.

    On success the building is removed from the player, every unit that was
    targeting it is returned to IDLE, the path cache is invalidated and 75%
    of the mineral and gas cost (truncated to an integer) is credited back.
    For a refinery, the geyser found one tile up-left of the building
    position has its ``has_refinery`` flag cleared.

    Returns:
        A failed result with ``error`` set to ``player_not_found``,
        ``building_not_found`` or ``not_under_construction``; otherwise a
        successful result with the building type and the refunded amounts.
    """

    def _failure(code: str) -> ActionResult:
        return ActionResult(
            action_type="cancel_construction", success=False, data={"error": code}
        )

    owner = self.state.players.get(player_id)
    if not owner:
        return _failure("player_not_found")
    target = owner.buildings.get(building_id)
    if not target:
        return _failure("building_not_found")
    if target.status != BuildingStatus.CONSTRUCTING:
        return _failure("not_under_construction")

    definition = BUILDING_DEFS[target.building_type]
    mineral_refund = int(definition.mineral_cost * 0.75)
    gas_refund = int(definition.gas_cost * 0.75)

    # Release every unit that was heading to / working on this building.
    for worker in owner.units.values():
        if worker.building_target_id != building_id:
            continue
        worker.building_target_id = None
        worker.target_x = worker.target_y = None
        worker.path_waypoints = []
        worker.status = UnitStatus.IDLE

    # A cancelled refinery frees the geyser it was being built on.
    if target.building_type == BuildingType.REFINERY:
        geyser_x = target.x - 1.0
        geyser_y = target.y - 1.0
        geyser = next(
            (
                res for res in self.state.game_map.resources
                if res.resource_type == ResourceType.GEYSER
                and abs(res.x - geyser_x) < 0.5
                and abs(res.y - geyser_y) < 0.5
            ),
            None,
        )
        if geyser is not None:
            geyser.has_refinery = False

    del owner.buildings[building_id]
    invalidate_path_cache()

    owner.minerals += mineral_refund
    owner.gas += gas_refund
    return ActionResult(
        action_type="cancel_construction", success=True,
        data={
            "building": target.building_type.value,
            "refund_minerals": mineral_refund,
            "refund_gas": gas_refund,
        },
    )
| # ------------------------------------------------------------------ | |
| # Broadcast | |
| # ------------------------------------------------------------------ | |
async def _broadcast(self) -> None:
    """Emit the current game state to the room as a ``game_update`` event.

    The payload is the JSON-mode dump of ``self.state`` with the engine's
    collected ``_sound_events`` attached under ``"sound_events"``.
    """
    update = {
        **self.state.model_dump(mode="json"),
        "sound_events": self._sound_events,
    }
    await self.sio.emit("game_update", update, room=self.state.room_id)