Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import gymnasium as gym | |
| from gymnasium import spaces | |
| import pybullet as p | |
| from PyFlyt.core import Aviary | |
| from PyFlyt.core.drones import QuadX | |
| class Drone3DEnv(gym.Env): | |
| metadata = {"render_modes": ["human"]} | |
| def __init__(self, render_mode=None): | |
| super().__init__() | |
| self.render_mode = render_mode | |
| # Constants | |
| self.BOUNDS = np.array([[-5, -5, 0], [5, 5, 10]]) # x_min, y_min, z_min, x_max, y_max, z_max | |
| self.TARGET_BOUNDS = np.array([[-4, -4, 1], [4, 4, 9]]) | |
| self.MAX_STEPS = 1000 | |
| self.WIND_SCALE = 1.0 | |
| # Initialize PyFlyt Aviary | |
| self.start_pos = np.array([[0.0, 0.0, 1.0]]) | |
| self.start_orn = np.array([[0.0, 0.0, 0.0]]) | |
| self.aviary = Aviary( | |
| start_pos=self.start_pos, | |
| start_orn=self.start_orn, | |
| drone_type="quadx", | |
| drone_options=[{ | |
| "model_dir": "/home/ylop/Documents/drone go brr/Drone-go-brrrrr/Drone-go-brrrrr/my_models", | |
| "drone_model": "cf2x_big" | |
| }], | |
| render=self.render_mode is not None | |
| ) | |
| # Action Space: 4 motors (0..1) | |
| self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(4,), dtype=np.float32) | |
| # Observation Space: 12 dim | |
| self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(12,), dtype=np.float32) | |
| self.wind_vector = np.zeros(3) | |
| self.target_pos = np.zeros(3) | |
| self.step_count = 0 | |
| # Visual IDs | |
| self.wind_arrow_ids = [] | |
| self.target_visual_id = None | |
| self.bound_box_ids = [] | |
| self.path_points = [] | |
| self.text_ids = [] | |
| def reset(self, seed=None, options=None): | |
| super().reset(seed=seed) | |
| self.aviary.reset() | |
| self.step_count = 0 | |
| self.path_points = [] | |
| # Clear text | |
| for uid in self.text_ids: | |
| p.removeUserDebugItem(uid) | |
| self.text_ids = [] | |
| # Disable PyBullet GUI sidebars but enable mouse | |
| if self.render_mode == "human": | |
| p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0) | |
| p.configureDebugVisualizer(p.COV_ENABLE_MOUSE_PICKING, 1) | |
| # Zoom out camera | |
| p.resetDebugVisualizerCamera( | |
| cameraDistance=12.0, | |
| cameraYaw=45, | |
| cameraPitch=-30, | |
| cameraTargetPosition=[0, 0, 5] | |
| ) | |
| # Random Target | |
| self.target_pos = np.random.uniform(self.TARGET_BOUNDS[0], self.TARGET_BOUNDS[1]) | |
| # Reset Wind (Random Walk start) | |
| self.wind_vector = np.random.uniform(-1, 1, 3) * self.WIND_SCALE | |
| self.wind_vector[2] = 0 | |
| # Visuals | |
| if self.render_mode == "human": | |
| self._draw_bounds() | |
| self._draw_target() | |
| self._init_wind_field() | |
| return self._get_obs(), {} | |
| def step(self, action): | |
| self.step_count += 1 | |
| # 1. Update Wind (Chaotic) | |
| self.wind_vector += np.random.normal(0, 0.1, 3) | |
| self.wind_vector = np.clip(self.wind_vector, -5, 5) | |
| # 2. Update Target (Moving) | |
| # Random walk for target | |
| target_move = np.random.normal(0, 0.05, 3) | |
| self.target_pos += target_move | |
| self.target_pos = np.clip(self.target_pos, self.TARGET_BOUNDS[0], self.TARGET_BOUNDS[1]) | |
| # 3. Apply Wind Force | |
| drone = self.aviary.drones[0] | |
| p.applyExternalForce( | |
| drone.Id, | |
| -1, # Base link | |
| forceObj=self.wind_vector, | |
| posObj=[0, 0, 0], | |
| flags=p.LINK_FRAME | |
| ) | |
| # 4. Step Physics | |
| motor_command = (action + 1.0) / 2.0 | |
| self.aviary.set_all_setpoints(motor_command.reshape(1, -1)) | |
| self.aviary.step() | |
| # 5. Get State | |
| state = self.aviary.state(0) | |
| pos = state[-1] | |
| orn = state[-3] | |
| lin_vel = state[-2] | |
| ang_vel = state[-4] | |
| # 6. Compute Reward | |
| reward = self._compute_reward(pos, orn, lin_vel, ang_vel) | |
| # 7. Check Termination | |
| terminated = False | |
| # Out of bounds | |
| if (pos < self.BOUNDS[0]).any() or (pos > self.BOUNDS[1]).any(): | |
| terminated = True | |
| reward -= 100.0 # Massive penalty for leaving | |
| # Crash (ground) | |
| if pos[2] < 0.1: | |
| terminated = True | |
| reward -= 100.0 | |
| # Target Reached | |
| dist = np.linalg.norm(pos - self.target_pos) | |
| if dist < 0.5: | |
| terminated = True | |
| reward += 100.0 | |
| truncated = self.step_count >= self.MAX_STEPS | |
| # 8. Update Visuals | |
| if self.render_mode == "human": | |
| self._update_wind_field() | |
| self._draw_target() # Update target position | |
| # Flight Path | |
| self.path_points.append(pos) | |
| if len(self.path_points) > 1: | |
| p.addUserDebugLine(self.path_points[-2], self.path_points[-1], [0, 1, 0], 2, 0) | |
| # Info Overlay | |
| # Clear old text | |
| for uid in self.text_ids: | |
| p.removeUserDebugItem(uid) | |
| self.text_ids = [] | |
| wind_speed = np.linalg.norm(self.wind_vector) | |
| info_text = f"Wind: {wind_speed:.2f} m/s\nDrone: {pos.round(2)}\nTarget: {self.target_pos.round(2)}" | |
| # Draw text near the drone or fixed on screen (PyBullet text is 3D mostly, use text3d) | |
| # Or use addUserDebugText with textPosition | |
| uid = p.addUserDebugText(info_text, [pos[0], pos[1], pos[2] + 0.5], [0, 0, 0], textSize=1.5) | |
| self.text_ids.append(uid) | |
| return self._get_obs(), reward, terminated, truncated, {} | |
| def _get_obs(self): | |
| state = self.aviary.state(0) | |
| pos = state[-1] | |
| orn = state[-3] | |
| lin_vel = state[-2] | |
| ang_vel = state[-4] | |
| # Relative position to target | |
| rel_pos = self.target_pos - pos | |
| # [rel_x, rel_y, rel_z, roll, pitch, yaw, u, v, w, p, q, r] | |
| obs = np.concatenate([rel_pos, orn, lin_vel, ang_vel]) | |
| return obs.astype(np.float32) | |
| def _compute_reward(self, pos, orn, lin_vel, ang_vel): | |
| # 1. Stability (Smoothness) | |
| # Penalize high angular velocity (spinning/shaking) | |
| r_smooth = -0.1 * np.linalg.norm(ang_vel) | |
| # Penalize extreme angles (flipping) | |
| deviation = np.linalg.norm(orn[0:2]) | |
| r_stability = -deviation | |
| # 2. Target | |
| dist = np.linalg.norm(pos - self.target_pos) | |
| r_target = -dist * 0.5 # Stronger distance penalty | |
| # 3. Boundary Safety (Proximity Penalty) | |
| # Calculate distance to nearest wall | |
| d_min = np.min([ | |
| pos[0] - self.BOUNDS[0][0], | |
| self.BOUNDS[1][0] - pos[0], | |
| pos[1] - self.BOUNDS[0][1], | |
| self.BOUNDS[1][1] - pos[1], | |
| pos[2] - self.BOUNDS[0][2], | |
| self.BOUNDS[1][2] - pos[2] | |
| ]) | |
| # Exponential penalty as it gets closer than 1.0m | |
| r_boundary = 0.0 | |
| if d_min < 1.0: | |
| r_boundary = -np.exp(1.0 - d_min) # -1 at 1m, -e at 0m | |
| return r_stability + r_smooth + r_target + r_boundary | |
| def _init_wind_field(self): | |
| # Clear old arrows | |
| for uid, _ in self.wind_arrow_ids: | |
| p.removeUserDebugItem(uid) | |
| self.wind_arrow_ids = [] | |
| # Create grid of arrows | |
| # Sparse grid: 3x3x3 | |
| xs = np.linspace(self.BOUNDS[0][0], self.BOUNDS[1][0], 4) | |
| ys = np.linspace(self.BOUNDS[0][1], self.BOUNDS[1][1], 4) | |
| zs = np.linspace(self.BOUNDS[0][2] + 1, self.BOUNDS[1][2] - 1, 3) | |
| for x in xs: | |
| for y in ys: | |
| for z in zs: | |
| start_pos = [x, y, z] | |
| # Initial draw (will be updated) | |
| # Use a placeholder end_pos | |
| end_pos = [x+0.1, y, z] | |
| uid = p.addUserDebugLine(start_pos, end_pos, [1, 1, 0], 2) | |
| self.wind_arrow_ids.append((uid, start_pos)) | |
| def _update_wind_field(self): | |
| # Update all arrows to point in wind direction | |
| # Scale wind for visual length | |
| scale = 0.5 | |
| wind_end_offset = self.wind_vector * scale | |
| for uid, start_pos in self.wind_arrow_ids: | |
| end_pos = [ | |
| start_pos[0] + wind_end_offset[0], | |
| start_pos[1] + wind_end_offset[1], | |
| start_pos[2] + wind_end_offset[2] | |
| ] | |
| # Color based on intensity? | |
| intensity = np.linalg.norm(self.wind_vector) | |
| # Map 0-5 to Color (Green to Red) | |
| t = np.clip(intensity / 5.0, 0, 1) | |
| color = [t, 1.0 - t, 0] | |
| p.addUserDebugLine( | |
| start_pos, | |
| end_pos, | |
| lineColorRGB=color, | |
| lineWidth=2, | |
| replaceItemUniqueId=uid | |
| ) | |
| def _draw_bounds(self): | |
| min_x, min_y, min_z = self.BOUNDS[0] | |
| max_x, max_y, max_z = self.BOUNDS[1] | |
| corners = [ | |
| [min_x, min_y, min_z], [max_x, min_y, min_z], | |
| [min_x, max_y, min_z], [max_x, max_y, min_z], | |
| [min_x, min_y, max_z], [max_x, min_y, max_z], | |
| [min_x, max_y, max_z], [max_x, max_y, max_z] | |
| ] | |
| lines = [ | |
| (0, 1), (1, 3), (3, 2), (2, 0), # Bottom | |
| (4, 5), (5, 7), (7, 6), (6, 4), # Top | |
| (0, 4), (1, 5), (2, 6), (3, 7) # Vertical | |
| ] | |
| for start, end in lines: | |
| p.addUserDebugLine(corners[start], corners[end], [1, 0, 0], 4) | |
| def _draw_target(self): | |
| if self.target_visual_id is not None: | |
| p.removeBody(self.target_visual_id) | |
| visual_shape = p.createVisualShape(shapeType=p.GEOM_SPHERE, radius=0.3, rgbaColor=[1, 0, 1, 0.7]) | |
| self.target_visual_id = p.createMultiBody(baseVisualShapeIndex=visual_shape, basePosition=self.target_pos) | |
| def close(self): | |
| self.aviary.disconnect() | |