import numpy as np
import gymnasium as gym
from opensimplex import OpenSimplex

# Try to import PyFlyt; fall back to a stub base class if it is missing.
try:
    from PyFlyt.gym_envs.quadx_envs.quadx_waypoints_env import QuadXWaypointsEnv
    PYFLYT_AVAILABLE = True
except ImportError:
    PYFLYT_AVAILABLE = False

    # Dummy base class standing in for PyFlyt's QuadXWaypointsEnv.
    class QuadXWaypointsEnv(gym.Env):
        metadata = {"render_modes": ["human", "rgb_array"]}

        def __init__(self, render_mode=None):
            self.render_mode = render_mode


class WindField:
    """Time-varying 3D wind field sampled from 4D simplex noise."""

    def __init__(self, seed=42, scale=0.1, speed=1.0):
        self.noise = OpenSimplex(seed=seed)
        self.scale = scale        # spatial frequency of the noise
        self.speed = speed        # rate at which the field evolves over time
        self.time_offset = 0.0

    def get_wind(self, x, y, z, dt):
        """Sample the wind vector at (x, y, z), advancing the field by dt."""
        self.time_offset += dt * self.speed
        # Offset the noise coordinates per component so u, v, w are
        # decorrelated. (noise4 is the opensimplex >= 0.4 method name.)
        u = self.noise.noise4(x * self.scale, y * self.scale, z * self.scale, self.time_offset)
        v = self.noise.noise4(x * self.scale + 100, y * self.scale + 100, z * self.scale, self.time_offset)
        w = self.noise.noise4(x * self.scale + 200, y * self.scale + 200, z * self.scale, self.time_offset)
        return np.array([u, v, w])


class Drone3DEnv(gym.Env):
    def __init__(self, render_mode=None, wind_scale=10.0, wind_speed=1.0):
        super().__init__()
        self.render_mode = render_mode
        self.wind_field = WindField(scale=0.05, speed=wind_speed)
        self.wind_strength = wind_scale

        # Observation: [x, y, z, roll, pitch, yaw, u, v, w, p, q, r].
        # The target is not part of the observation; it is exposed via info.
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(12,), dtype=np.float32)
        # Action: a real QuadX takes [motor1..motor4] or [thrust, roll, pitch, yaw];
        # this mock abstracts it to [thrust_x, thrust_y, thrust_z, yaw].
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(4,), dtype=np.float32)

        self.state = np.zeros(12)
        self.target = np.zeros(3)  # set for real in reset()
        self.dt = 0.05
        self.step_count = 0
        self.max_steps = 1000

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.state = np.zeros(12)
        self.state[2] = 10.0  # start at 10 m altitude
        self.step_count = 0

        # Randomize the target: x, y in [-5, 5], z in [5, 15]. Use the
        # gymnasium-seeded generator so reset(seed=...) is reproducible.
        self.target = self.np_random.uniform(low=[-5, -5, 5], high=[5, 5, 15])

        # Randomize the wind by re-initializing the noise with a fresh seed.
        new_seed = int(self.np_random.integers(0, 10000))
        self.wind_field = WindField(seed=new_seed, scale=0.05, speed=self.wind_field.speed)

        return self.state.astype(np.float32), {}

    def step(self, action):
        self.step_count += 1

        # Unpack state (views into self.state, so in-place updates persist).
        pos = self.state[0:3]
        vel = self.state[6:9]

        # Sample the wind at the drone's current position.
        raw_wind = self.wind_field.get_wind(pos[0], pos[1], pos[2], self.dt)
        wind_force = raw_wind * self.wind_strength

        # Simple double-integrator kinematics: the action is treated as an
        # acceleration command. The drone needs enough authority to fight
        # gravity (9.81 m/s^2) plus wind, so action [-1, 1] maps to
        # [-15, 15] m/s^2. A real quad's thrust is one-sided (0..max along
        # the body z-axis); a symmetric 3D force is a deliberate
        # simplification for this mock.
        accel = action[:3] * 15.0

        # Gravity. Note there is NO "anti-gravity" offset: if action[2] is 0,
        # accel[2] is 0 and the drone falls. The agent must output positive
        # z acceleration to hover.
        gravity = np.array([0, 0, -9.81])
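        # Worked authority check (illustrative arithmetic, not used by the
        # dynamics): hovering requires accel[2] = 9.81 m/s^2, i.e.
        # action[2] = 9.81 / 15 ≈ 0.654. That leaves about 5.2 m/s^2 of
        # upward headroom to counter gusts before the command saturates at 1.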
        total_accel = accel + wind_force + gravity

        # Integrate (semi-implicit Euler: velocity first, then position).
        vel += total_accel * self.dt
        pos += vel * self.dt

        # Floor collision: clamp to the ground and zero vertical velocity.
        if pos[2] < 0:
            pos[2] = 0
            vel[2] = 0  # crash stop

        # Drag (simple velocity damping).
        vel *= 0.95

        self.state[0:3] = pos
        self.state[6:9] = vel

        # Reward terms.
        dist = np.linalg.norm(pos - self.target)
        vel_mag = np.linalg.norm(vel)

        # 1. Distance reward: negative distance, so closer to the target is better.
        r_dist = -dist
        # 2. Stability penalty: speed is needed to reach the target, so only
        #    extreme speed is lightly penalized to discourage erratic motion.
        r_vel = -0.01 * vel_mag
        # 3. Survival bonus for every step without crashing.
        r_survive = 0.1

        reward = r_dist + r_vel + r_survive

        # No termination: a crashed drone just stays on the floor.
        term = False
        trunc = self.step_count >= self.max_steps

        info = {"wind": wind_force, "target": self.target}
        return self.state.astype(np.float32), reward, term, trunc, info

    def render(self):
        # Rendering is handled in the demo script using matplotlib.
        pass
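# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (an illustrative addition, not the matplotlib
# demo script referenced in render()): rolls out one episode with random
# actions and prints progress. Nothing here depends on PyFlyt being installed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    env = Drone3DEnv(wind_scale=10.0, wind_speed=1.0)
    obs, _ = env.reset(seed=0)
    total_reward = 0.0
    for step in range(200):
        action = env.action_space.sample()  # stand-in for a trained policy
        obs, reward, term, trunc, info = env.step(action)
        total_reward += reward
        if step % 50 == 0:
            dist = np.linalg.norm(obs[0:3] - info["target"])
            print(f"step={step:4d}  pos={np.round(obs[0:3], 2)}  dist_to_target={dist:.2f}")
        if term or trunc:
            break
    print(f"episode return: {total_reward:.2f}")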