| """ |
| Synthetic data generation for training and testing the FSD model. |
| Generates realistic simulated scenarios including: |
| - Camera images (synthetic patterns representing road scenes) |
| - Ultrasonic distance readings |
| - Ground truth labels for all perception/planning/control tasks |
| """ |
|
|
| import torch |
| import numpy as np |
| from typing import Dict, Optional, Tuple |
| import math |
|
|
| from .config import VehicleConfig |
|
|
|
|
class FSDDataGenerator:
    """
    Generates synthetic training data for the FSD model.

    Can produce:
    - Simulated camera images
    - Ultrasonic distance readings with noise
    - Ground truth detection heatmaps
    - Ground truth segmentation maps
    - Ground truth occupancy grids
    - Ground truth waypoints and control commands

    Also behaves as a map-style dataset (``__len__`` / ``__getitem__``) so it
    can be wrapped directly by ``torch.utils.data.DataLoader``.
    """

    def __init__(
        self,
        vehicle_config: Optional["VehicleConfig"] = None,
        bev_size: int = 200,
        image_size: Tuple[int, int] = (480, 640),
    ):
        """
        Args:
            vehicle_config: Vehicle + sensor configuration. A default
                ``VehicleConfig`` is constructed when omitted.
            bev_size: Side length in cells of the square BEV target grids.
            image_size: (height, width) of the generated camera images.
        """
        if vehicle_config is None:
            vehicle_config = VehicleConfig()
        self.config = vehicle_config
        self.sensor_config = vehicle_config.sensor_config
        self.bev_size = bev_size
        self.image_size = image_size

    def generate_batch(
        self,
        batch_size: int = 4,
        scenario: str = "urban",
        device: str = "cpu",
    ) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
        """
        Generate a batch of synthetic data.

        Args:
            batch_size: Number of samples.
            scenario: "urban", "highway", "parking", or "intersection";
                any other value falls through to generic defaults.
            device: torch device the returned tensors are moved to.

        Returns:
            inputs: Dict of model inputs.
            targets: Dict of ground truth labels.
        """
        B = batch_size
        N_cam = self.sensor_config.num_cameras
        N_us = self.sensor_config.num_ultrasonics
        H, W = self.image_size

        # Camera stream: images plus per-camera calibration.
        camera_images = self._generate_camera_images(B, N_cam, H, W, scenario)
        camera_intrinsics = self._generate_intrinsics(B, N_cam)
        camera_extrinsics = self._generate_extrinsics(B, N_cam)

        # Ultrasonic stream: noisy range readings plus sensor poses.
        us_distances, us_placements = self._generate_ultrasonic_data(B, N_us, scenario)

        # Ego kinematic state [speed, accel, steer, yaw_rate, x, y].
        ego_state = self._generate_ego_state(B, scenario)

        # Random navigation command id; assumes the model expects 10
        # command classes -- TODO confirm against the planning head.
        nav_command = torch.randint(0, 10, (B,))

        targets = self._generate_targets(B, scenario)

        inputs = {
            "camera_images": camera_images.to(device),
            "camera_intrinsics": camera_intrinsics.to(device),
            "camera_extrinsics": camera_extrinsics.to(device),
            "ultrasonic_distances": us_distances.to(device),
            "ultrasonic_placements": us_placements.to(device),
            "ego_state": ego_state.to(device),
            "nav_command": nav_command.to(device),
        }

        targets = {k: v.to(device) for k, v in targets.items()}

        return inputs, targets

    def _generate_camera_images(
        self, B: int, N: int, H: int, W: int, scenario: str
    ) -> torch.Tensor:
        """Generate synthetic camera images with road-like patterns.

        Each image is a sky band (top third), a road surface (bottom two
        thirds), three bright vertical lane markings, and -- in urban or
        intersection scenes -- a few random colored rectangles standing in
        for objects. Returns a (B, N, 3, H, W) tensor clamped to [0, 1].
        """
        images = torch.zeros(B, N, 3, H, W)

        for b in range(B):
            for n in range(N):
                # Sky: bluish tone with slight per-image color jitter.
                sky_color = torch.tensor([0.5, 0.7, 0.9]) + torch.randn(3) * 0.05
                images[b, n, :, :H // 3, :] = sky_color.view(3, 1, 1)

                # Road: uniform gray with per-image brightness jitter.
                road_gray = 0.3 + torch.randn(1) * 0.05
                images[b, n, :, H // 3:, :] = road_gray

                # Lane markings: three bright 4-px-wide vertical stripes
                # over the road region.
                for lane_x in [W // 4, W // 2, 3 * W // 4]:
                    x_start = max(0, lane_x - 2)
                    x_end = min(W, lane_x + 2)
                    images[b, n, :, H // 3:, x_start:x_end] = 0.9

                # Foreground "objects" only appear in busy scenes.
                if scenario in ["urban", "intersection"]:
                    num_objects = np.random.randint(1, 5)
                    for _ in range(num_objects):
                        obj_h = np.random.randint(10, 40)
                        obj_w = np.random.randint(10, 30)
                        obj_y = np.random.randint(H // 4, H - obj_h)
                        obj_x = np.random.randint(0, W - obj_w)
                        color = torch.rand(3)
                        images[b, n, :, obj_y:obj_y + obj_h, obj_x:obj_x + obj_w] = color.view(3, 1, 1)

                # Additive sensor noise.
                images[b, n] += torch.randn_like(images[b, n]) * 0.02

        return images.clamp(0, 1)

    def _generate_intrinsics(self, B: int, N: int) -> torch.Tensor:
        """Generate camera intrinsic matrices (B, N, 3, 3) from config.

        Intrinsics are identical across the batch. If the config lists
        fewer cameras than N, the extra slots keep an all-zero K.
        """
        K = torch.zeros(B, N, 3, 3)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:  # config may list more cameras than requested
                break
            K[:, i, 0, 0] = cam.fx
            K[:, i, 1, 1] = cam.fy
            K[:, i, 0, 2] = cam.cx
            K[:, i, 1, 2] = cam.cy
            K[:, i, 2, 2] = 1.0
        return K

    def _generate_extrinsics(self, B: int, N: int) -> torch.Tensor:
        """Generate camera extrinsic 4x4 transforms (B, N, 4, 4) from config.

        Poses come from each camera's fixed mounting, so they are
        identical across the batch.
        """
        T = torch.zeros(B, N, 4, 4)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            T_np = cam.placement.to_transform_matrix()
            T[:, i] = torch.from_numpy(T_np).float()
        return T

    def _generate_ultrasonic_data(
        self, B: int, N: int, scenario: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Generate ultrasonic distance readings and placements.

        Returns:
            distances: (B, N, 1) ranges clamped to the [0.02, 5.0]
                sensing window (presumably meters -- confirm with config).
            placements: (B, N, 6) [x, y, z, yaw, pitch, roll] per sensor,
                identical across the batch (fixed mounting).
        """
        placements = torch.zeros(B, N, 6)
        for i, us in enumerate(self.sensor_config.ultrasonics):
            if i >= N:  # config may list more sensors than requested
                break
            p = us.placement
            placements[:, i] = torch.tensor([p.x, p.y, p.z, p.yaw, p.pitch, p.roll])

        # Scenario-dependent base range: tight when parking, open on highway.
        if scenario == "parking":
            base_dist = torch.rand(B, N, 1) * 2.0 + 0.5
        elif scenario == "urban":
            base_dist = torch.rand(B, N, 1) * 3.0 + 1.0
        elif scenario == "highway":
            base_dist = torch.rand(B, N, 1) * 4.0 + 2.0
        else:
            base_dist = torch.rand(B, N, 1) * 3.5 + 0.5

        # Small additive measurement noise, then clamp to sensor limits.
        noise = torch.randn_like(base_dist) * 0.01
        distances = (base_dist + noise).clamp(0.02, 5.0)

        return distances, placements

    def _generate_ego_state(self, B: int, scenario: str) -> torch.Tensor:
        """Generate ego vehicle state [speed, accel, steer, yaw_rate, x, y].

        Speed/accel/steer scales vary by scenario: slow with large steer
        when parking, fast and smooth on the highway.
        """
        states = torch.zeros(B, 6)

        max_speed = self.config.max_speed_ms

        if scenario == "parking":
            states[:, 0] = torch.rand(B) * 2.0  # creeping speed
            states[:, 1] = torch.randn(B) * 0.5
            states[:, 2] = torch.randn(B) * 0.3  # large steering inputs
        elif scenario == "highway":
            states[:, 0] = torch.rand(B) * max_speed * 0.3 + max_speed * 0.7  # 70-100% of max
            states[:, 1] = torch.randn(B) * 0.3
            states[:, 2] = torch.randn(B) * 0.05
        elif scenario == "intersection":
            states[:, 0] = torch.rand(B) * max_speed * 0.5
            states[:, 1] = torch.randn(B) * 1.0
            states[:, 2] = torch.randn(B) * 0.2
        else:  # "urban" and any unrecognized scenario
            states[:, 0] = torch.rand(B) * max_speed * 0.7
            states[:, 1] = torch.randn(B) * 0.5
            states[:, 2] = torch.randn(B) * 0.15

        states[:, 3] = torch.randn(B) * 0.1  # yaw rate
        states[:, 4] = torch.randn(B) * 10   # world x
        states[:, 5] = torch.randn(B) * 5    # world y

        return states

    def _generate_targets(self, B: int, scenario: str) -> Dict[str, torch.Tensor]:
        """Generate all ground truth labels.

        Returns a dict with:
            gt_heatmap:      (B, 10, bev, bev) per-class center heatmaps
            gt_segmentation: (B, bev, bev) long labels {0 bg, 1 road, 2 lane}
            gt_occupancy:    (B, 1, bev, bev) occupancy in {0, 1}
            gt_behavior:     (B,) long behavior class id
            gt_waypoints:    (B, 20, 4) [x, y, heading, speed] per step
            gt_steering / gt_throttle / gt_brake: (B,) control targets
        """
        bev = self.bev_size

        targets = {}

        # Detection heatmaps: a few Gaussian blobs per sample splatted onto
        # random class channels, combined with element-wise max so
        # overlapping blobs keep the stronger response. The coordinate grid
        # is invariant across batch and objects, so build it once.
        ys, xs = torch.meshgrid(torch.arange(bev), torch.arange(bev), indexing='ij')
        heatmap = torch.zeros(B, 10, bev, bev)
        for b in range(B):
            num_obj = np.random.randint(2, 8)
            for _ in range(num_obj):
                cls = np.random.randint(0, 10)
                cx, cy = np.random.randint(20, bev - 20), np.random.randint(20, bev - 20)
                sigma = np.random.uniform(2, 6)
                gaussian = torch.exp(-((xs - cx) ** 2 + (ys - cy) ** 2) / (2 * sigma ** 2))
                heatmap[b, cls] = torch.max(heatmap[b, cls], gaussian)
        targets["gt_heatmap"] = heatmap

        # BEV semantic segmentation: 0 = background, 1 = road band across
        # the middle half, 2 = lane lines (2-cell-wide vertical stripes).
        # The layout is identical for every sample, so write it batch-wide.
        seg = torch.zeros(B, bev, bev, dtype=torch.long)
        seg[:, bev // 4:3 * bev // 4, :] = 1
        for lane_x in (bev // 4, bev // 2, 3 * bev // 4):
            seg[:, :, max(0, lane_x - 1):min(bev, lane_x + 1)] = 2
        targets["gt_segmentation"] = seg

        # Occupancy grid: top and bottom quarters occupied, middle free.
        occ = torch.zeros(B, 1, bev, bev)
        occ[:, :, :bev // 4, :] = 1.0
        occ[:, :, 3 * bev // 4:, :] = 1.0
        targets["gt_occupancy"] = occ

        # Behavior class: fixed ids for parking (7) and highway (0),
        # otherwise random in [0, 5). Id semantics are defined by the
        # model's behavior head -- not visible here, so do not rely on
        # these meanings without checking that head.
        if scenario == "parking":
            targets["gt_behavior"] = torch.full((B,), 7, dtype=torch.long)
        elif scenario == "highway":
            targets["gt_behavior"] = torch.full((B,), 0, dtype=torch.long)
        else:
            targets["gt_behavior"] = torch.randint(0, 5, (B,))

        # Waypoints: 20 steps of [x, y, heading, speed] -- roughly straight
        # ahead at 0.5 units per step with small lateral/heading noise and
        # a constant 70%-of-max target speed.
        wp = torch.zeros(B, 20, 4)
        for t in range(20):
            wp[:, t, 0] = t * 0.5
            wp[:, t, 1] = torch.randn(B) * 0.1
            wp[:, t, 2] = torch.randn(B) * 0.02
            wp[:, t, 3] = self.config.max_speed_ms * 0.7
        targets["gt_waypoints"] = wp

        # Control targets. The +/-5 steering scale suggests degrees --
        # TODO confirm units against the control head.
        targets["gt_steering"] = torch.randn(B) * 5.0
        targets["gt_throttle"] = torch.rand(B) * 0.5 + 0.2
        targets["gt_brake"] = torch.zeros(B)

        return targets

    def __len__(self):
        # Nominal epoch length: data is synthesized on the fly, so this
        # value is arbitrary and only sizes a DataLoader epoch.
        return 1000

    def __getitem__(self, idx):
        """Dataset-style access for DataLoader compatibility.

        ``idx`` is ignored: every call synthesizes a fresh random sample,
        so indexing is intentionally non-deterministic.
        """
        inputs, targets = self.generate_batch(batch_size=1)
        # Drop the batch dimension so a DataLoader can re-collate.
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}
        targets = {k: v.squeeze(0) for k, v in targets.items()}
        return inputs, targets
|
|