"""
Synthetic data generation for training and testing the FSD model.

Generates realistic simulated scenarios including:
- Camera images (synthetic patterns representing road scenes)
- Ultrasonic distance readings
- Ground truth labels for all perception/planning/control tasks
"""

import math
from typing import Dict, Optional, Tuple

import numpy as np
import torch

from .config import VehicleConfig


class FSDDataGenerator:
    """
    Generates synthetic training data for the FSD model.

    Can produce:
    - Simulated camera images
    - Ultrasonic distance readings with noise
    - Ground truth detection heatmaps
    - Ground truth segmentation maps
    - Ground truth occupancy grids
    - Ground truth waypoints and control commands

    Randomness comes from both the global ``torch`` RNG and the global
    ``numpy.random`` RNG; seed both for reproducible output.
    """

    def __init__(
        self,
        vehicle_config: Optional[VehicleConfig] = None,
        bev_size: int = 200,
        image_size: Tuple[int, int] = (480, 640),  # H, W
    ):
        """
        Args:
            vehicle_config: Vehicle/sensor description. A default
                ``VehicleConfig()`` is created when omitted.
            bev_size: Side length (in cells) of the square BEV label grids.
            image_size: Camera image size as ``(H, W)``.
        """
        if vehicle_config is None:
            vehicle_config = VehicleConfig()
        self.config = vehicle_config
        self.sensor_config = vehicle_config.sensor_config
        self.bev_size = bev_size
        self.image_size = image_size

    def generate_batch(
        self,
        batch_size: int = 4,
        scenario: str = "urban",
        device: str = "cpu",
    ) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
        """
        Generate a batch of synthetic data.

        Args:
            batch_size: Number of samples
            scenario: "urban", "highway", "parking", "intersection".
                Any other value is not rejected; each helper falls back to
                its final ``else`` branch.
            device: torch device the returned tensors are moved to

        Returns:
            inputs: Dict of model inputs
            targets: Dict of ground truth labels
        """
        B = batch_size
        N_cam = self.sensor_config.num_cameras
        N_us = self.sensor_config.num_ultrasonics
        H, W = self.image_size

        # ── Generate Camera Data ──
        camera_images = self._generate_camera_images(B, N_cam, H, W, scenario)
        camera_intrinsics = self._generate_intrinsics(B, N_cam)
        camera_extrinsics = self._generate_extrinsics(B, N_cam)

        # ── Generate Ultrasonic Data ──
        us_distances, us_placements = self._generate_ultrasonic_data(B, N_us, scenario)

        # ── Generate Ego State ──
        ego_state = self._generate_ego_state(B, scenario)

        # ── Generate Navigation Command ──
        nav_command = torch.randint(0, 10, (B,))

        # ── Generate Ground Truth ──
        targets = self._generate_targets(B, scenario)

        # Everything is generated on CPU and moved to the target device once.
        inputs = {
            "camera_images": camera_images.to(device),
            "camera_intrinsics": camera_intrinsics.to(device),
            "camera_extrinsics": camera_extrinsics.to(device),
            "ultrasonic_distances": us_distances.to(device),
            "ultrasonic_placements": us_placements.to(device),
            "ego_state": ego_state.to(device),
            "nav_command": nav_command.to(device),
        }
        targets = {k: v.to(device) for k, v in targets.items()}

        return inputs, targets

    def _generate_camera_images(
        self, B: int, N: int, H: int, W: int, scenario: str
    ) -> torch.Tensor:
        """
        Generate synthetic camera images with road-like patterns.

        Each image has a sky band (top third), a gray road (remaining rows)
        with three bright lane stripes and, for the "urban" and
        "intersection" scenarios, a few random colored rectangles.

        Returns:
            Tensor of shape (B, N, 3, H, W) with values clamped to [0, 1].
        """
        images = torch.zeros(B, N, 3, H, W)
        horizon = H // 3  # row where the sky ends and the road begins
        lane_centers = (W // 4, W // 2, 3 * W // 4)
        with_objects = scenario in ("urban", "intersection")

        for b in range(B):
            for n in range(N):
                # Sky region (top third)
                sky_color = torch.tensor([0.5, 0.7, 0.9]) + torch.randn(3) * 0.05
                images[b, n, :, :horizon, :] = sky_color.view(3, 1, 1)

                # Road region (bottom two thirds)
                road_gray = 0.3 + torch.randn(1) * 0.05
                images[b, n, :, horizon:, :] = road_gray

                # Lane lines (white stripes)
                for lane_x in lane_centers:
                    x_start = max(0, lane_x - 2)
                    x_end = min(W, lane_x + 2)
                    images[b, n, :, horizon:, x_start:x_end] = 0.9

                # Add some random "objects" (colored rectangles)
                if with_objects:
                    num_objects = np.random.randint(1, 5)
                    for _ in range(num_objects):
                        obj_h = np.random.randint(10, 40)
                        obj_w = np.random.randint(10, 30)
                        y_high = H - obj_h
                        x_high = W - obj_w
                        # A rectangle that does not fit would make randint
                        # raise (low >= high); skip it on small images.
                        if y_high <= H // 4 or x_high <= 0:
                            continue
                        obj_y = np.random.randint(H // 4, y_high)
                        obj_x = np.random.randint(0, x_high)
                        color = torch.rand(3)
                        images[
                            b, n, :, obj_y:obj_y + obj_h, obj_x:obj_x + obj_w
                        ] = color.view(3, 1, 1)

                # Add noise
                images[b, n] += torch.randn_like(images[b, n]) * 0.02

        return images.clamp(0, 1)

    def _generate_intrinsics(self, B: int, N: int) -> torch.Tensor:
        """
        Generate camera intrinsic matrices from config.

        Returns:
            Tensor of shape (B, N, 3, 3). Slots beyond the number of
            configured cameras are left as all-zero matrices.
        """
        K = torch.zeros(B, N, 3, 3)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            K[:, i, 0, 0] = cam.fx
            K[:, i, 1, 1] = cam.fy
            K[:, i, 0, 2] = cam.cx
            K[:, i, 1, 2] = cam.cy
            K[:, i, 2, 2] = 1.0
        return K

    def _generate_extrinsics(self, B: int, N: int) -> torch.Tensor:
        """
        Generate camera extrinsic matrices from config.

        Returns:
            Tensor of shape (B, N, 4, 4). Slots beyond the number of
            configured cameras are left as all-zero matrices.
        """
        T = torch.zeros(B, N, 4, 4)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            # NOTE(review): assumes to_transform_matrix() returns a 4x4
            # numpy array -- confirm against the placement class.
            T_np = cam.placement.to_transform_matrix()
            T[:, i] = torch.from_numpy(T_np).float()
        return T

    def _generate_ultrasonic_data(
        self, B: int, N: int, scenario: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Generate ultrasonic distance readings and placements.

        Returns:
            distances: Tensor of shape (B, N, 1), clamped to [0.02, 5.0].
            placements: Tensor of shape (B, N, 6) holding
                [x, y, z, yaw, pitch, roll] per configured sensor.
        """
        placements = torch.zeros(B, N, 6)
        for i, us in enumerate(self.sensor_config.ultrasonics):
            if i >= N:
                break
            p = us.placement
            placements[:, i] = torch.tensor([p.x, p.y, p.z, p.yaw, p.pitch, p.roll])

        # Generate realistic distance readings based on scenario
        if scenario == "parking":
            base_dist = torch.rand(B, N, 1) * 2.0 + 0.5  # 0.5-2.5m close range
        elif scenario == "urban":
            base_dist = torch.rand(B, N, 1) * 3.0 + 1.0  # 1-4m
        elif scenario == "highway":
            base_dist = torch.rand(B, N, 1) * 4.0 + 2.0  # 2-6m (clamped later)
        else:  # intersection
            base_dist = torch.rand(B, N, 1) * 3.5 + 0.5

        # Add realistic noise
        noise = torch.randn_like(base_dist) * 0.01
        distances = (base_dist + noise).clamp(0.02, 5.0)

        return distances, placements

    def _generate_ego_state(self, B: int, scenario: str) -> torch.Tensor:
        """
        Generate ego vehicle state [speed, accel, steer, yaw_rate, x, y].

        Returns:
            Tensor of shape (B, 6).
        """
        states = torch.zeros(B, 6)
        max_speed = self.config.max_speed_ms

        if scenario == "parking":
            states[:, 0] = torch.rand(B) * 2.0  # speed: 0-2 m/s
            states[:, 1] = torch.randn(B) * 0.5  # accel
            states[:, 2] = torch.randn(B) * 0.3  # steer
        elif scenario == "highway":
            states[:, 0] = torch.rand(B) * max_speed * 0.3 + max_speed * 0.7  # 70-100% max
            states[:, 1] = torch.randn(B) * 0.3
            states[:, 2] = torch.randn(B) * 0.05  # minimal steering
        elif scenario == "intersection":
            states[:, 0] = torch.rand(B) * max_speed * 0.5  # 0-50% max
            states[:, 1] = torch.randn(B) * 1.0
            states[:, 2] = torch.randn(B) * 0.2
        else:  # urban
            states[:, 0] = torch.rand(B) * max_speed * 0.7  # 0-70% max
            states[:, 1] = torch.randn(B) * 0.5
            states[:, 2] = torch.randn(B) * 0.15

        states[:, 3] = torch.randn(B) * 0.1  # yaw rate
        states[:, 4] = torch.randn(B) * 10  # x position
        states[:, 5] = torch.randn(B) * 5  # y position

        return states

    def _generate_targets(self, B: int, scenario: str) -> Dict[str, torch.Tensor]:
        """
        Generate all ground truth labels.

        Returns:
            Dict with keys ``gt_heatmap`` (B, 10, bev, bev),
            ``gt_segmentation`` (B, bev, bev) long, ``gt_occupancy``
            (B, 1, bev, bev), ``gt_behavior`` (B,) long, ``gt_waypoints``
            (B, 20, 4), ``gt_steering``, ``gt_throttle`` and ``gt_brake``
            (each of shape (B,)).
        """
        bev = self.bev_size
        targets = {}

        # Object detection heatmap (10 classes)
        heatmap = torch.zeros(B, 10, bev, bev)
        # The coordinate grid is the same for every object, so build it once.
        y, x = torch.meshgrid(torch.arange(bev), torch.arange(bev), indexing='ij')
        # Keep object centers 20 cells from the border; shrink the margin on
        # small grids so randint always has a non-empty range.
        margin = max(0, min(20, (bev - 1) // 2))
        for b in range(B):
            num_obj = np.random.randint(2, 8)
            for _ in range(num_obj):
                cls = np.random.randint(0, 10)
                cx = np.random.randint(margin, bev - margin)
                cy = np.random.randint(margin, bev - margin)
                sigma = np.random.uniform(2, 6)
                gaussian = torch.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
                # Overlapping objects of the same class keep the stronger peak.
                heatmap[b, cls] = torch.max(heatmap[b, cls], gaussian)
        targets["gt_heatmap"] = heatmap

        # Segmentation (7 classes); 0 = background
        seg = torch.zeros(B, bev, bev, dtype=torch.long)
        seg[:, bev//4:3*bev//4, :] = 1  # drivable
        # Lane lines (identical for every sample, so assign batch-wide)
        for lane_x in (bev//4, bev//2, 3*bev//4):
            seg[:, :, max(0, lane_x-1):min(bev, lane_x+1)] = 2
        targets["gt_segmentation"] = seg

        # Occupancy grid
        occ = torch.zeros(B, 1, bev, bev)
        occ[:, :, :bev//4, :] = 1.0  # obstacles at edges
        occ[:, :, 3*bev//4:, :] = 1.0
        targets["gt_occupancy"] = occ

        # Behavior labels
        if scenario == "parking":
            targets["gt_behavior"] = torch.full((B,), 7, dtype=torch.long)  # park
        elif scenario == "highway":
            targets["gt_behavior"] = torch.full((B,), 0, dtype=torch.long)  # keep lane
        else:
            targets["gt_behavior"] = torch.randint(0, 5, (B,))

        # Waypoints (20 waypoints, each with x, y, heading, speed)
        wp = torch.zeros(B, 20, 4)
        for t in range(20):
            wp[:, t, 0] = t * 0.5  # x: forward 0.5m per step
            wp[:, t, 1] = torch.randn(B) * 0.1  # y: slight lateral variation
            wp[:, t, 2] = torch.randn(B) * 0.02  # heading: nearly straight
            wp[:, t, 3] = self.config.max_speed_ms * 0.7  # target speed
        targets["gt_waypoints"] = wp

        # Control commands
        targets["gt_steering"] = torch.randn(B) * 5.0  # degrees
        targets["gt_throttle"] = torch.rand(B) * 0.5 + 0.2
        targets["gt_brake"] = torch.zeros(B)

        return targets

    def __len__(self):
        """Return the virtual dataset size."""
        return 1000  # virtual dataset size

    def __getitem__(self, idx):
        """
        Dataset-style access for DataLoader compatibility.

        A fresh random sample is generated on every call; ``idx`` only
        bounds the virtual dataset and does not select a specific sample.

        Raises:
            IndexError: If ``idx`` is an integer outside
                ``[-len(self), len(self))``. Without this, plain
                ``for sample in generator`` iteration would never stop.
        """
        if isinstance(idx, (int, np.integer)):
            size = len(self)
            if not -size <= idx < size:
                raise IndexError(
                    f"index {idx} is out of range for dataset of size {size}"
                )

        inputs, targets = self.generate_batch(batch_size=1)
        # Squeeze batch dim
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}
        targets = {k: v.squeeze(0) for k, v in targets.items()}
        return inputs, targets