| """ |
| KAPS Unified Mission Controller |
| ================================ |
| |
| The singular purpose: PROTECT THE BUZZARD. |
| |
| This controller unifies all subsystems under one coherent objective. |
| DreamerV3's world model learns this objective implicitly through |
| reward shaping - the goal exists in latent space, not as explicit |
| rules that could be observed or exploited by adversaries. |
| |
SUBSYSTEMS COMMANDED:
---------------------
| 1. TAB Array - Individual airfoil control (dispersed mode) |
| 2. Slingshot Controller - Bola/nunchaku dynamics (consolidated mode) |
| 3. Airfoiled Buzzard - Deployable wings and grid fins |
| 4. Threat Response - Intercept coordination |
| 5. Formation Control - Defensive geometry |
| |
THE KEY INSIGHT:
----------------
| The objective (protect Buzzard) emerges from reward shaping, not |
| explicit programming. DreamerV3's RSSM encodes the goal in its |
| latent state. An adversary observing behavior cannot easily |
| extract the decision logic - it's distributed across the |
| world model's learned representations. |
| |
| "The omission of its physical form could stand to retain control" |
| - The AI's intentions are implicit in behavior, not explicit rules. |
| """ |
|
|
| import numpy as np |
| from typing import Dict, List, Optional, Tuple, Any |
| from dataclasses import dataclass, field |
| from enum import Enum, auto |
|
|
|
|
| class MissionMode(Enum): |
| """Overall mission operational modes.""" |
| PATROL = auto() |
| ALERT = auto() |
| ENGAGE = auto() |
| EVASIVE = auto() |
| SLINGSHOT = auto() |
| RECOVERY = auto() |
|
|
|
|
| class ThreatPriority(Enum): |
| """Threat engagement priority levels.""" |
| CRITICAL = auto() |
| HIGH = auto() |
| MEDIUM = auto() |
| LOW = auto() |
| NONE = auto() |
|
|
|
|
| @dataclass |
| class MissionState: |
| """Complete mission state for DreamerV3 observation.""" |
| |
| buzzard_health: float = 100.0 |
| buzzard_position: np.ndarray = field(default_factory=lambda: np.zeros(3)) |
| buzzard_velocity: np.ndarray = field(default_factory=lambda: np.zeros(3)) |
| buzzard_wing_extension: float = 0.0 |
| |
| |
| tabs_attached: Dict[str, bool] = field(default_factory=lambda: { |
| "UP": True, "DOWN": True, "LEFT": True, "RIGHT": True |
| }) |
| tabs_positions: Dict[str, np.ndarray] = field(default_factory=dict) |
| tabs_available: int = 4 |
| |
| |
| slingshot_active: bool = False |
| slingshot_state: str = "DISPERSED" |
| bola_position: np.ndarray = field(default_factory=lambda: np.zeros(3)) |
| bola_velocity: float = 0.0 |
| swing_angular_velocity: float = 0.0 |
| |
| |
| threat_count: int = 0 |
| threat_priority: ThreatPriority = ThreatPriority.NONE |
| nearest_threat_distance: float = float('inf') |
| nearest_threat_time_to_impact: float = float('inf') |
| |
| |
| mode: MissionMode = MissionMode.PATROL |
| time_in_mode: float = 0.0 |
| intercepts_this_episode: int = 0 |
| damage_taken: float = 0.0 |
|
|
|
|
| class RewardShaper: |
| """ |
| Shapes rewards to implicitly encode the mission objective. |
| |
| DreamerV3 learns the objective through these signals, NOT through |
| explicit goal programming. The world model's latent state |
| comes to represent "protect Buzzard" as an emergent property. |
| """ |
| |
| def __init__(self): |
| |
        # Survival: keeping the Buzzard alive is the baseline signal; losing it is terminal
        self.buzzard_alive_per_step = 1.0
        self.buzzard_damage_penalty = -100.0
        self.buzzard_destroyed = -10000.0

        # Threat outcomes
        self.threat_intercepted = 200.0
        self.threat_evaded = 50.0

        # TAB economy: a TAB traded for a kill is rewarded, a TAB lost for nothing is penalized
        self.tab_sacrifice_for_kill = 100.0
        self.tab_lost_no_value = -50.0

        # Slingshot / bola phases
        self.slingshot_consolidate = 5.0
        self.slingshot_release_hit = 300.0
        self.slingshot_release_miss = -20.0
        self.swing_momentum_bonus = 0.1

        # Buzzard airfoil usage
        self.wing_deploy_tactical = 10.0
        self.grid_fin_steer_success = 5.0

        # Exploration / discovery bonuses
        self.state_novelty = 2.0
        self.physics_discovery = 20.0
        self.new_maneuver = 50.0
| |
| def compute_reward(self, |
| prev_state: MissionState, |
| curr_state: MissionState, |
| events: Dict[str, Any]) -> Tuple[float, Dict[str, float]]: |
| """ |
| Compute shaped reward from state transition. |
| |
| Returns: (total_reward, reward_breakdown) |
| """ |
| breakdown = {} |
| total = 0.0 |
| |
| |
        # Survival: small positive signal every step the Buzzard is intact; its loss is terminal
        if curr_state.buzzard_health > 0:
| breakdown['alive'] = self.buzzard_alive_per_step |
| total += self.buzzard_alive_per_step |
| else: |
| breakdown['destroyed'] = self.buzzard_destroyed |
| return self.buzzard_destroyed, breakdown |
| |
| |
        # Penalize each point of health the Buzzard lost this step
        damage = prev_state.buzzard_health - curr_state.buzzard_health
| if damage > 0: |
| penalty = damage * self.buzzard_damage_penalty |
| breakdown['damage'] = penalty |
| total += penalty |
| |
| |
        # Reward successful threat intercepts
        intercepts = events.get('intercepts', 0)
| if intercepts > 0: |
| breakdown['intercepts'] = intercepts * self.threat_intercepted |
| total += intercepts * self.threat_intercepted |
| |
| |
        # Reward TABs deliberately expended to take out a threat
        sacrifices = events.get('heroic_sacrifices', 0)
| if sacrifices > 0: |
| breakdown['sacrifices'] = sacrifices * self.tab_sacrifice_for_kill |
| total += sacrifices * self.tab_sacrifice_for_kill |
| |
| |
        # Slingshot: reward building swing momentum while consolidated
        if curr_state.slingshot_active:
| |
| momentum_reward = curr_state.swing_angular_velocity * self.swing_momentum_bonus |
| breakdown['swing_momentum'] = momentum_reward |
| total += momentum_reward |
| |
| |
| if events.get('bola_hit', False): |
| breakdown['bola_hit'] = self.slingshot_release_hit |
| total += self.slingshot_release_hit |
| |
| return total, breakdown |
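

# A worked example of the shaping above, for orientation only: the transition values
# below (one intercept, five points of Buzzard damage) are hypothetical and not taken
# from the simulator. Note how the damage penalty dominates the intercept bonus -
# that asymmetry is what lets "protect the Buzzard" emerge without explicit rules.
def example_intercept_reward() -> Tuple[float, Dict[str, float]]:
    """Illustrative only: score a single hypothetical state transition."""
    shaper = RewardShaper()
    prev = MissionState(buzzard_health=100.0)
    curr = MissionState(buzzard_health=95.0)
    events = {'intercepts': 1, 'heroic_sacrifices': 0, 'bola_hit': False}
    # alive (+1) + damage (5 × -100 = -500) + intercept (+200) = -299
    return shaper.compute_reward(prev, curr, events)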
|
|
|
|
| class UnifiedMissionController: |
| """ |
| Unified controller that commands all KAPS subsystems. |
| |
| DreamerV3 interfaces with this controller, which translates |
| latent decisions into coordinated actions across: |
| - TAB array |
| - Slingshot dynamics |
| - Buzzard airfoil surfaces |
| - Threat response |
| |
| The mission objective (PROTECT BUZZARD) is not explicitly |
| programmed here - it emerges from the reward shaper and |
| DreamerV3's learned world model. |
| """ |
| |
| def __init__(self): |
| self.state = MissionState() |
| self.reward_shaper = RewardShaper() |
| self.mode = MissionMode.PATROL |
| |
| |
| self.mode_start_time = 0.0 |
| self.time_in_mode = 0.0 |
| |
| |
| self.action_history = [] |
| |
| |
| self.slingshot_controller = None |
| self.airfoiled_buzzard = None |
| self.tab_array = None |
| self.threat_spawner = None |
| |
| def register_subsystems(self, |
| slingshot=None, |
| buzzard_airfoil=None, |
| tab_array=None, |
| threats=None): |
| """Register all subsystems for unified control.""" |
| self.slingshot_controller = slingshot |
| self.airfoiled_buzzard = buzzard_airfoil |
| self.tab_array = tab_array |
| self.threat_spawner = threats |
| |
| def update_state(self, sim_state: Dict) -> MissionState: |
| """ |
| Update mission state from simulation. |
| |
| This creates the observation that DreamerV3 sees. |
| """ |
        # Buzzard state straight from the simulator (the mission state is mutated in place)
        self.state.buzzard_health = sim_state.get('buzzard_health', 100.0)
| self.state.buzzard_position = np.array(sim_state.get('buzzard_position', [0, 0, 1000])) |
| self.state.buzzard_velocity = np.array(sim_state.get('buzzard_velocity', [0, 50, 0])) |
| |
| if self.airfoiled_buzzard: |
| self.state.buzzard_wing_extension = self.airfoiled_buzzard.wing_extension |
| |
| |
| if self.tab_array: |
| for tab_id in ["UP", "DOWN", "LEFT", "RIGHT"]: |
| tab = self.tab_array.tabs.get(tab_id) |
| if tab: |
| self.state.tabs_attached[tab_id] = tab.is_attached |
| self.state.tabs_positions[tab_id] = tab.position |
| self.state.tabs_available = sum(1 for v in self.state.tabs_attached.values() if v) |
| |
| |
| if self.slingshot_controller: |
| self.state.slingshot_active = self.slingshot_controller.state.name != "DISPERSED" |
| self.state.slingshot_state = self.slingshot_controller.state.name |
| self.state.bola_position = self.slingshot_controller.bola.position |
| self.state.bola_velocity = np.linalg.norm(self.slingshot_controller.bola.velocity) |
| self.state.swing_angular_velocity = self.slingshot_controller.swing_angular_velocity |
| |
| |
| if self.threat_spawner: |
| threats = self.threat_spawner.get_active_threats() |
| self.state.threat_count = len(threats) |
| |
| if threats: |
| |
| min_dist = float('inf') |
| min_tti = float('inf') |
| |
| for threat in threats: |
| dist = np.linalg.norm(threat.position - self.state.buzzard_position) |
| if dist < min_dist: |
| min_dist = dist |
| |
| |
                    # Closing speed: relative velocity projected onto the threat -> Buzzard line of sight
                    closing = np.dot(
                        threat.velocity - self.state.buzzard_velocity,
                        self.state.buzzard_position - threat.position
                    ) / (dist + 0.001)
| if closing > 0: |
| tti = dist / closing |
| if tti < min_tti: |
| min_tti = tti |
| |
| self.state.nearest_threat_distance = min_dist |
| self.state.nearest_threat_time_to_impact = min_tti |
| |
| |
| if min_tti < 2.0: |
| self.state.threat_priority = ThreatPriority.CRITICAL |
| elif min_tti < 5.0: |
| self.state.threat_priority = ThreatPriority.HIGH |
| elif min_tti < 10.0: |
| self.state.threat_priority = ThreatPriority.MEDIUM |
| else: |
| self.state.threat_priority = ThreatPriority.LOW |
| else: |
| self.state.threat_priority = ThreatPriority.NONE |
| |
| |
| self._update_mode() |
| |
| return self.state |
| |
| def _update_mode(self): |
| """Update mission mode based on state.""" |
| old_mode = self.mode |
| |
| |
| if self.state.buzzard_health < 50: |
| self.mode = MissionMode.EVASIVE |
| elif self.state.slingshot_active: |
| self.mode = MissionMode.SLINGSHOT |
| elif self.state.threat_priority in [ThreatPriority.CRITICAL, ThreatPriority.HIGH]: |
| self.mode = MissionMode.ENGAGE |
| elif self.state.threat_priority in [ThreatPriority.MEDIUM, ThreatPriority.LOW]: |
| self.mode = MissionMode.ALERT |
| elif self.state.tabs_available < 4: |
| self.mode = MissionMode.RECOVERY |
| else: |
| self.mode = MissionMode.PATROL |
| |
| |
| if old_mode != self.mode: |
| self.mode_start_time = 0.0 |
| self.time_in_mode = 0.0 |
| |
| self.state.mode = self.mode |
| |
| def decode_action(self, action: np.ndarray) -> Dict[str, Any]: |
| """ |
| Decode DreamerV3 action into subsystem commands. |
| |
| The action is a continuous vector that gets translated |
| into specific commands for each subsystem. |
| |
| Action space layout (26 dimensions): |
| [0-15] - TAB controls (4 TABs × 4 channels each) |
| [elevator, aileron, rudder, cable_tension] |
| [16-19] - TAB release commands (4) |
| [20] - Slingshot mode toggle |
| [21] - Slingshot swing torque |
| [22] - Slingshot release |
| [23] - Buzzard wing extension |
| [24-25] - Buzzard pitch/yaw bias |
| """ |
| commands = {} |
| |
| |
| commands['tabs'] = {} |
| for i, tab_id in enumerate(["UP", "DOWN", "LEFT", "RIGHT"]): |
| base = i * 4 |
| commands['tabs'][tab_id] = { |
| 'elevator': action[base + 0] if len(action) > base else 0, |
| 'aileron': action[base + 1] if len(action) > base + 1 else 0, |
| 'rudder': action[base + 2] if len(action) > base + 2 else 0, |
| 'tension': action[base + 3] if len(action) > base + 3 else 0.5, |
| } |
| |
| |
| commands['releases'] = {} |
| for i, tab_id in enumerate(["UP", "DOWN", "LEFT", "RIGHT"]): |
| idx = 16 + i |
| commands['releases'][tab_id] = action[idx] > 0.5 if len(action) > idx else False |
| |
| |
| if len(action) > 20: |
| commands['slingshot'] = { |
| 'toggle': action[20] > 0.5, |
| 'torque': action[21] if len(action) > 21 else 0, |
| 'release': action[22] > 0.5 if len(action) > 22 else False, |
| } |
| |
| |
| if len(action) > 23: |
| commands['buzzard'] = { |
| 'wing_extension': (action[23] + 1) / 2, |
| 'pitch_bias': action[24] if len(action) > 24 else 0, |
| 'yaw_bias': action[25] if len(action) > 25 else 0, |
| } |
| |
| return commands |
| |
| def execute_commands(self, commands: Dict[str, Any], dt: float): |
| """ |
| Execute decoded commands across all subsystems. |
| """ |
| |
| if self.tab_array and 'tabs' in commands: |
| for tab_id, ctrl in commands['tabs'].items(): |
| tab = self.tab_array.tabs.get(tab_id) |
| if tab and tab.is_attached: |
| tab.set_control_surfaces( |
| elevator=ctrl['elevator'], |
| aileron=ctrl['aileron'], |
| rudder=ctrl['rudder'] |
| ) |
| tab.set_cable_tension_target(ctrl['tension']) |
| |
| |
| if self.tab_array and 'releases' in commands: |
| for tab_id, should_release in commands['releases'].items(): |
| if should_release: |
| tab = self.tab_array.tabs.get(tab_id) |
| if tab and tab.is_attached: |
| tab.release() |
| |
| |
| if self.slingshot_controller and 'slingshot' in commands: |
| ss = commands['slingshot'] |
| |
| if ss.get('toggle', False): |
| |
| if self.slingshot_controller.state.name == "DISPERSED": |
| |
| tab_states = { |
| tid: { |
| 'position': t.position, |
| 'velocity': t.velocity, |
| 'mass': 5.0, |
| 'is_attached': t.is_attached |
| } |
| for tid, t in self.tab_array.tabs.items() |
| } if self.tab_array else {} |
| self.slingshot_controller.consolidate_tabs(tab_states) |
| self.slingshot_controller.begin_swing( |
| self.state.buzzard_position, |
| initial_angular_rate=1.0 |
| ) |
| |
| if ss.get('torque', 0) != 0: |
| self.slingshot_controller.accelerate_swing( |
| torque=ss['torque'] * 5000, |
| dt=dt |
| ) |
| |
| if ss.get('release', False): |
| self.slingshot_controller.release() |
| self.slingshot_controller.deploy_grid_fins() |
| |
| |
| if self.airfoiled_buzzard and 'buzzard' in commands: |
| buz = commands['buzzard'] |
| self.airfoiled_buzzard.extend_wings( |
| target_extension=buz.get('wing_extension', 0), |
| dt=dt |
| ) |
| self.airfoiled_buzzard.angle_of_attack = buz.get('pitch_bias', 0) * 15 |
| self.airfoiled_buzzard.bank_angle = buz.get('yaw_bias', 0) * 30 |
| |
| def get_observation_vector(self) -> np.ndarray: |
| """ |
| Create observation vector for DreamerV3. |
| |
| This is what the world model sees. The objective (protect Buzzard) |
| is NOT explicitly encoded - it emerges from learning with rewards. |
| """ |
| obs = [] |
| |
| |
        # Buzzard state (9 values)
        obs.extend(self.state.buzzard_position)
| obs.extend(self.state.buzzard_velocity) |
| obs.append(self.state.buzzard_health / 100.0) |
| obs.append(self.state.buzzard_wing_extension) |
| obs.append(float(self.mode.value) / 6.0) |
| |
| |
        # Per-TAB block: absolute position, attached flag, position relative to the
        # Buzzard, and one padding slot (8 values per TAB)
        for tab_id in ["UP", "DOWN", "LEFT", "RIGHT"]:
| if tab_id in self.state.tabs_positions: |
| obs.extend(self.state.tabs_positions[tab_id]) |
| else: |
| obs.extend([0, 0, 0]) |
| obs.append(1.0 if self.state.tabs_attached.get(tab_id, False) else 0.0) |
| |
            if tab_id in self.state.tabs_positions:
                rel = self.state.tabs_positions[tab_id] - self.state.buzzard_position
                obs.extend(rel / 100.0)
            else:
                obs.extend([0, 0, 0])
            # Padding slot appended unconditionally so every TAB contributes a fixed
            # 8 values and the observation stays at 55 dims
            obs.append(0.0)
| |
| |
        # Slingshot / bola state (6 values)
        obs.append(1.0 if self.state.slingshot_active else 0.0)
| obs.append(self.state.swing_angular_velocity / 10.0) |
| obs.extend(self.state.bola_position / 100.0) |
| obs.append(self.state.bola_velocity / 100.0) |
| |
| |
        # Threat picture (8 values including reserved padding)
        obs.append(self.state.threat_count / 10.0)
| obs.append(float(self.state.threat_priority.value) / 5.0) |
| obs.append(min(1.0, self.state.nearest_threat_distance / 500.0)) |
| obs.append(min(1.0, self.state.nearest_threat_time_to_impact / 10.0)) |
        obs.extend([0, 0, 0, 0])  # padding / reserved slots (keeps the vector at 55 dims)
| |
| return np.array(obs, dtype=np.float32) |
| |
| def get_extended_action_space_dim(self) -> int: |
| """Return dimension of extended action space.""" |
| return 26 |
|
|
|
|
| |
| |
| |
|
|
| def create_unified_observation_space(): |
| """Create gym observation space for unified controller.""" |
| try: |
| from gymnasium import spaces |
| except ImportError: |
| from gym import spaces |
| |
| |
    # 9 (Buzzard) + 4×8 (TABs) + 6 (slingshot) + 8 (threats) = 55; must match
    # UnifiedMissionController.get_observation_vector()
    obs_dim = 55
| |
| return spaces.Box( |
| low=-np.inf, high=np.inf, |
| shape=(obs_dim,), dtype=np.float32 |
| ) |
|
|
|
|
| def create_unified_action_space(): |
| """Create gym action space for unified controller.""" |
| try: |
| from gymnasium import spaces |
| except ImportError: |
| from gym import spaces |
| |
| return spaces.Box( |
| low=-1.0, high=1.0, |
| shape=(26,), dtype=np.float32 |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| print("=== Unified Mission Controller ===\n") |
| |
| controller = UnifiedMissionController() |
| |
| print("Mission Modes:") |
| for mode in MissionMode: |
| print(f" {mode.name}") |
| |
| print("\nThreat Priority Levels:") |
| for prio in ThreatPriority: |
| print(f" {prio.name}") |
| |
| print("\nObservation space:", create_unified_observation_space()) |
| print("Action space:", create_unified_action_space()) |
| |
| print("\nReward shaping (implicit objective encoding):") |
| shaper = RewardShaper() |
| print(f" Buzzard alive: +{shaper.buzzard_alive_per_step}/step") |
| print(f" Buzzard damage: {shaper.buzzard_damage_penalty}/point") |
| print(f" Threat intercepted: +{shaper.threat_intercepted}") |
| print(f" Bola hit: +{shaper.slingshot_release_hit}") |
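
    # Quick smoke test of the action/observation plumbing. Illustrative only: no
    # subsystems are registered, so just the controller's core state is exercised.
    rng = np.random.default_rng(0)
    action = rng.uniform(-1.0, 1.0, size=controller.get_extended_action_space_dim()).astype(np.float32)
    commands = controller.decode_action(action)
    print(f"\nDecoded command groups: {sorted(commands.keys())}")
    obs = controller.get_observation_vector()
    print(f"Observation vector shape: {obs.shape}")
    reward, parts = example_intercept_reward()
    print(f"Worked reward example (1 intercept, 5 damage): {reward:+.1f} -> {parts}")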
| |
| print("\n=== Test Complete ===") |
|
|