""" A 2D drone environment with dynamic wind forces for reinforcement learning. The drone can apply discrete thrust actions while being affected by smoothly varying wind. The goal is to navigate and survive within the bounded world. """ import numpy as np import gymnasium as gym from gymnasium import spaces from typing import Optional, Tuple, Dict, Any # Constants DT = 0.1 # Time step MAX_VEL = 2.0 # Maximum velocity magnitude WIND_MAX = 2.0 # Maximum wind magnitude WIND_SMOOTHING = 0.05 # Wind interpolation rate toward target WIND_TARGET_INTERVAL = 50 # Steps between sampling new wind target MAX_STEPS = 500 # Maximum episode length POSITION_MIN = 0.0 # Minimum position (x, y) POSITION_MAX = 1.0 # Maximum position (x, y) THRUST = 0.25 # Thrust magnitude per action (slightly higher for control authority) # Target zone (box) constants TARGET_X_MIN = 0.7 # Target box left edge TARGET_X_MAX = 0.9 # Target box right edge TARGET_Y_MIN = 0.3 # Target box bottom edge TARGET_Y_MAX = 0.7 # Target box top edge TARGET_REWARD = 2.0 # Bonus reward for being in target zone TARGET_SPAWN_DELAY = 50 # Steps before target zone appears (after wind starts) # Stabilization and shaping DRAG_COEFF = 0.3 # Linear velocity drag coefficient SPEED_PENALTY_COEFF = 0.05 # Penalize high speeds to encourage smooth control EDGE_MARGIN = 0.06 # Margin near boundaries where penalty increases EDGE_PENALTY_COEFF = 0.5 # Strength of boundary proximity penalty class DroneWindEnv(gym.Env): """ A 2D drone environment with dynamic wind. Observation: [x, y, vx, vy, wind_x, wind_y] Action: Discrete(5) - 0: no thrust, 1: up, 2: down, 3: left, 4: right """ metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10} def __init__(self): super().__init__() # Observation space: [x, y, vx, vy, wind_x, wind_y] self.observation_space = spaces.Box( low=np.array([POSITION_MIN, POSITION_MIN, -MAX_VEL, -MAX_VEL, -WIND_MAX, -WIND_MAX], dtype=np.float32), high=np.array([POSITION_MAX, POSITION_MAX, MAX_VEL, MAX_VEL, WIND_MAX, WIND_MAX], dtype=np.float32), dtype=np.float32 ) # Action space: 5 discrete thrust directions self.action_space = spaces.Discrete(5) # Internal state self.x: float = 0.0 self.y: float = 0.0 self.vx: float = 0.0 self.vy: float = 0.0 self.wind_x: float = 0.0 self.wind_y: float = 0.0 self.wind_target_x: float = 0.0 self.wind_target_y: float = 0.0 self.step_count: int = 0 def reset( self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None ) -> Tuple[np.ndarray, Dict[str, Any]]: """ Reset the environment to initial state. Args: seed: Optional random seed options: Optional reset options Returns: observation: Initial observation array info: Empty info dict """ # Always call super().reset to ensure seeding and np_random are initialized super().reset(seed=seed) # Initialize state self.x = 0.1 self.y = 0.5 self.vx = 0.0 self.vy = 0.0 self.wind_x = 0.0 self.wind_y = 0.0 self.wind_target_x = 0.0 self.wind_target_y = 0.0 self.step_count = 0 # Build observation obs = self._get_observation() info = {} return obs, info def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]: """ Execute one environment step. Args: action: Discrete action (0-4) Returns: observation: New observation array reward: Reward for this step terminated: Whether episode ended due to boundary crash truncated: Whether episode ended due to max steps info: Info dict with step_count """ # Increment step count self.step_count += 1 # Update wind model self._update_wind() # Apply physics update self._apply_physics(action) # Compute reward base_reward = 1.0 # Survival reward # Check if drone is in target zone (only if target has spawned) target_spawned = self.step_count >= TARGET_SPAWN_DELAY in_target = False if target_spawned: in_target = ( TARGET_X_MIN <= self.x <= TARGET_X_MAX and TARGET_Y_MIN <= self.y <= TARGET_Y_MAX ) target_bonus = TARGET_REWARD if in_target else 0.0 # Speed penalty (discourage excessive velocity) speed_sq = self.vx * self.vx + self.vy * self.vy speed_penalty = -SPEED_PENALTY_COEFF * float(speed_sq) # Boundary proximity penalty (discourage hovering near walls) dist_left = self.x - POSITION_MIN dist_right = POSITION_MAX - self.x dist_bottom = self.y - POSITION_MIN dist_top = POSITION_MAX - self.y min_dist = min(dist_left, dist_right, dist_bottom, dist_top) edge_penalty = 0.0 if min_dist < EDGE_MARGIN: edge_penalty = -EDGE_PENALTY_COEFF * (EDGE_MARGIN - float(min_dist)) / EDGE_MARGIN reward = base_reward + target_bonus + speed_penalty + edge_penalty # Check termination (boundary crash) terminated = ( self.x <= POSITION_MIN or self.x >= POSITION_MAX or self.y <= POSITION_MIN or self.y >= POSITION_MAX ) # Check truncation (max steps) truncated = self.step_count >= MAX_STEPS # Build observation obs = self._get_observation() # Check if in target zone (only if target has spawned) target_spawned = self.step_count >= TARGET_SPAWN_DELAY in_target = False if target_spawned: in_target = ( TARGET_X_MIN <= self.x <= TARGET_X_MAX and TARGET_Y_MIN <= self.y <= TARGET_Y_MAX ) info = {"step_count": self.step_count, "in_target": in_target, "target_spawned": target_spawned} return obs, reward, terminated, truncated, info def _update_wind(self) -> None: """Update wind by smoothly moving toward target, resampling target periodically.""" # Resample wind target every WIND_TARGET_INTERVAL steps if self.step_count % WIND_TARGET_INTERVAL == 0: self.wind_target_x = self.np_random.uniform(-WIND_MAX, WIND_MAX) self.wind_target_y = self.np_random.uniform(-WIND_MAX, WIND_MAX) # Smoothly interpolate wind toward target self.wind_x += WIND_SMOOTHING * (self.wind_target_x - self.wind_x) self.wind_y += WIND_SMOOTHING * (self.wind_target_y - self.wind_y) # Clamp wind to bounds self.wind_x = np.clip(self.wind_x, -WIND_MAX, WIND_MAX) self.wind_y = np.clip(self.wind_y, -WIND_MAX, WIND_MAX) def _apply_physics(self, action: int) -> None: """Apply physics update: convert action to thrust, update velocity and position.""" # Convert action to thrust vector if action == 0: # No thrust ax, ay = 0.0, 0.0 elif action == 1: # Thrust up ax, ay = 0.0, THRUST elif action == 2: # Thrust down ax, ay = 0.0, -THRUST elif action == 3: # Thrust left ax, ay = -THRUST, 0.0 elif action == 4: # Thrust right ax, ay = THRUST, 0.0 else: raise ValueError(f"Invalid action: {action}. Must be in [0, 4]") # Update velocity with thrust and wind self.vx = self.vx + ax + self.wind_x * DT self.vy = self.vy + ay + self.wind_y * DT # Apply linear drag (proportional to velocity) for stability self.vx -= DRAG_COEFF * self.vx * DT self.vy -= DRAG_COEFF * self.vy * DT # Clamp velocity self.vx = np.clip(self.vx, -MAX_VEL, MAX_VEL) self.vy = np.clip(self.vy, -MAX_VEL, MAX_VEL) # Update position self.x = self.x + self.vx * DT self.y = self.y + self.vy * DT # Clamp position to bounds self.x = np.clip(self.x, POSITION_MIN, POSITION_MAX) self.y = np.clip(self.y, POSITION_MIN, POSITION_MAX) def _get_observation(self) -> np.ndarray: """Build observation array from current state.""" return np.array( [self.x, self.y, self.vx, self.vy, self.wind_x, self.wind_y], dtype=np.float32 ) def render(self) -> None: """ Render the environment state (stub implementation for Phase 1). Prints state to stdout. """ print( f"Step {self.step_count}: " f"x={self.x:.2f}, y={self.y:.2f}, " f"vx={self.vx:.2f}, vy={self.vy:.2f}, " f"wind=({self.wind_x:.2f}, {self.wind_y:.2f})" ) def make_drone_env() -> DroneWindEnv: """Helper function to create a DroneWindEnv instance.""" return DroneWindEnv() if __name__ == "__main__": # Manual test block print("Testing DroneWindEnv...") print("=" * 60) env = make_drone_env() obs, info = env.reset(seed=42) print(f"Initial observation: {obs}") print() for t in range(200): action = env.action_space.sample() obs, reward, terminated, truncated, info = env.step(action) env.render() if terminated: print(f"\nEpisode terminated at step {t} (boundary crash)") break if truncated: print(f"\nEpisode truncated at step {t} (max steps reached)") break print("=" * 60) print("Test completed!")