# NOTE: the three lines below the shebang position were Hugging Face file-view
# residue ("Add data.py", commit 58174b0, user Reality123b); preserved here as
# a comment so the module parses.
"""
Synthetic data generation for training and testing the FSD model.
Generates realistic simulated scenarios including:
- Camera images (synthetic patterns representing road scenes)
- Ultrasonic distance readings
- Ground truth labels for all perception/planning/control tasks
"""
import torch
import numpy as np
from typing import Dict, Optional, Tuple
import math
from .config import VehicleConfig
class FSDDataGenerator:
    """
    Generates synthetic training data for the FSD model.

    Can produce:
    - Simulated camera images
    - Ultrasonic distance readings with noise
    - Ground truth detection heatmaps
    - Ground truth segmentation maps
    - Ground truth occupancy grids
    - Ground truth waypoints and control commands

    Implements ``__len__`` / ``__getitem__`` so an instance can also be used
    directly as a map-style dataset with ``torch.utils.data.DataLoader``.
    """

    def __init__(
        self,
        vehicle_config: Optional[VehicleConfig] = None,
        bev_size: int = 200,
        image_size: Tuple[int, int] = (480, 640),  # H, W
    ):
        """
        Args:
            vehicle_config: Vehicle + sensor configuration. A default
                ``VehicleConfig()`` is constructed when omitted.
            bev_size: Side length (in cells) of the square BEV grids.
            image_size: Camera resolution as (height, width).
        """
        if vehicle_config is None:
            vehicle_config = VehicleConfig()
        self.config = vehicle_config
        self.sensor_config = vehicle_config.sensor_config
        self.bev_size = bev_size
        self.image_size = image_size

    def generate_batch(
        self,
        batch_size: int = 4,
        scenario: str = "urban",
        device: str = "cpu",
    ) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
        """
        Generate a batch of synthetic data.

        Args:
            batch_size: Number of samples.
            scenario: One of "urban", "highway", "parking", "intersection".
            device: torch device the returned tensors are moved to.

        Returns:
            inputs: Dict of model inputs.
            targets: Dict of ground truth labels.
        """
        B = batch_size
        N_cam = self.sensor_config.num_cameras
        N_us = self.sensor_config.num_ultrasonics
        H, W = self.image_size

        # ── Generate Camera Data ──
        camera_images = self._generate_camera_images(B, N_cam, H, W, scenario)
        camera_intrinsics = self._generate_intrinsics(B, N_cam)
        camera_extrinsics = self._generate_extrinsics(B, N_cam)

        # ── Generate Ultrasonic Data ──
        us_distances, us_placements = self._generate_ultrasonic_data(B, N_us, scenario)

        # ── Generate Ego State ──
        ego_state = self._generate_ego_state(B, scenario)

        # ── Generate Navigation Command (categorical id in [0, 10)) ──
        nav_command = torch.randint(0, 10, (B,))

        # ── Generate Ground Truth ──
        targets = self._generate_targets(B, scenario)

        inputs = {
            "camera_images": camera_images.to(device),
            "camera_intrinsics": camera_intrinsics.to(device),
            "camera_extrinsics": camera_extrinsics.to(device),
            "ultrasonic_distances": us_distances.to(device),
            "ultrasonic_placements": us_placements.to(device),
            "ego_state": ego_state.to(device),
            "nav_command": nav_command.to(device),
        }
        targets = {k: v.to(device) for k, v in targets.items()}
        return inputs, targets

    def _generate_camera_images(
        self, B: int, N: int, H: int, W: int, scenario: str
    ) -> torch.Tensor:
        """Generate synthetic camera images with road-like patterns.

        Layout: sky in the top third, road below, three white vertical lane
        stripes, plus random colored rectangles as "objects" in urban /
        intersection scenes. Returns a (B, N, 3, H, W) tensor clamped to [0, 1].
        """
        images = torch.zeros(B, N, 3, H, W)
        for b in range(B):
            for n in range(N):
                # Sky region (top third of the image)
                sky_color = torch.tensor([0.5, 0.7, 0.9]) + torch.randn(3) * 0.05
                images[b, n, :, :H//3, :] = sky_color.view(3, 1, 1)
                # Road region (bottom two thirds)
                road_gray = 0.3 + torch.randn(1) * 0.05
                images[b, n, :, H//3:, :] = road_gray
                # Lane lines: white stripes ~4 px wide at 1/4, 1/2, 3/4 width
                for lane_x in (W//4, W//2, 3*W//4):
                    x_start = max(0, lane_x - 2)
                    x_end = min(W, lane_x + 2)
                    images[b, n, :, H//3:, x_start:x_end] = 0.9
                # Random "objects" (colored rectangles) in busy scenes
                if scenario in ("urban", "intersection"):
                    num_objects = np.random.randint(1, 5)
                    for _ in range(num_objects):
                        obj_h = np.random.randint(10, 40)
                        obj_w = np.random.randint(10, 30)
                        obj_y = np.random.randint(H//4, H - obj_h)
                        obj_x = np.random.randint(0, W - obj_w)
                        color = torch.rand(3)
                        images[b, n, :, obj_y:obj_y+obj_h, obj_x:obj_x+obj_w] = color.view(3, 1, 1)
                # Pixel-level sensor noise
                images[b, n] += torch.randn_like(images[b, n]) * 0.02
        return images.clamp(0, 1)

    def _generate_intrinsics(self, B: int, N: int) -> torch.Tensor:
        """Generate (B, N, 3, 3) camera intrinsic matrices from config.

        Cameras beyond the first N configured entries are ignored; if fewer
        than N cameras are configured, the remaining slots stay zero.
        """
        K = torch.zeros(B, N, 3, 3)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            K[:, i, 0, 0] = cam.fx
            K[:, i, 1, 1] = cam.fy
            K[:, i, 0, 2] = cam.cx
            K[:, i, 1, 2] = cam.cy
            K[:, i, 2, 2] = 1.0
        return K

    def _generate_extrinsics(self, B: int, N: int) -> torch.Tensor:
        """Generate (B, N, 4, 4) camera extrinsic matrices from config."""
        T = torch.zeros(B, N, 4, 4)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            T_np = cam.placement.to_transform_matrix()
            T[:, i] = torch.from_numpy(T_np).float()
        return T

    def _generate_ultrasonic_data(
        self, B: int, N: int, scenario: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Generate ultrasonic distance readings and sensor placements.

        Returns:
            distances: (B, N, 1) readings in meters, clamped to [0.02, 5.0].
            placements: (B, N, 6) pose [x, y, z, yaw, pitch, roll] per sensor.
        """
        placements = torch.zeros(B, N, 6)
        for i, us in enumerate(self.sensor_config.ultrasonics):
            if i >= N:
                break
            p = us.placement
            placements[:, i] = torch.tensor([p.x, p.y, p.z, p.yaw, p.pitch, p.roll])
        # Scenario-dependent base ranges (highway upper end exceeds the sensor
        # limit on purpose; the clamp below caps it at 5.0 m).
        if scenario == "parking":
            base_dist = torch.rand(B, N, 1) * 2.0 + 0.5  # 0.5-2.5m close range
        elif scenario == "urban":
            base_dist = torch.rand(B, N, 1) * 3.0 + 1.0  # 1-4m
        elif scenario == "highway":
            base_dist = torch.rand(B, N, 1) * 4.0 + 2.0  # 2-6m (clamped later)
        else:  # intersection
            base_dist = torch.rand(B, N, 1) * 3.5 + 0.5
        # Add realistic measurement noise (~1 cm std)
        noise = torch.randn_like(base_dist) * 0.01
        distances = (base_dist + noise).clamp(0.02, 5.0)
        return distances, placements

    def _generate_ego_state(self, B: int, scenario: str) -> torch.Tensor:
        """Generate ego vehicle state (B, 6): [speed, accel, steer, yaw_rate, x, y]."""
        states = torch.zeros(B, 6)
        max_speed = self.config.max_speed_ms
        if scenario == "parking":
            states[:, 0] = torch.rand(B) * 2.0          # speed: 0-2 m/s
            states[:, 1] = torch.randn(B) * 0.5         # accel
            states[:, 2] = torch.randn(B) * 0.3         # steer
        elif scenario == "highway":
            states[:, 0] = torch.rand(B) * max_speed * 0.3 + max_speed * 0.7  # 70-100% max
            states[:, 1] = torch.randn(B) * 0.3
            states[:, 2] = torch.randn(B) * 0.05        # minimal steering
        elif scenario == "intersection":
            states[:, 0] = torch.rand(B) * max_speed * 0.5  # 0-50% max
            states[:, 1] = torch.randn(B) * 1.0
            states[:, 2] = torch.randn(B) * 0.2
        else:  # urban
            states[:, 0] = torch.rand(B) * max_speed * 0.7  # 0-70% max
            states[:, 1] = torch.randn(B) * 0.5
            states[:, 2] = torch.randn(B) * 0.15
        states[:, 3] = torch.randn(B) * 0.1   # yaw rate
        states[:, 4] = torch.randn(B) * 10    # x position
        states[:, 5] = torch.randn(B) * 5     # y position
        return states

    def _generate_targets(self, B: int, scenario: str) -> Dict[str, torch.Tensor]:
        """Generate all ground truth labels (heatmap, segmentation, occupancy,
        behavior, waypoints, control commands)."""
        bev = self.bev_size
        targets = {}

        # Object detection heatmap (10 classes): per-object Gaussians merged
        # with max. Grid coordinates are loop-invariant, so build them once
        # instead of re-creating the meshgrid for every object.
        heatmap = torch.zeros(B, 10, bev, bev)
        ys, xs = torch.meshgrid(torch.arange(bev), torch.arange(bev), indexing='ij')
        for b in range(B):
            num_obj = np.random.randint(2, 8)
            for _ in range(num_obj):
                cls = np.random.randint(0, 10)
                cx, cy = np.random.randint(20, bev-20), np.random.randint(20, bev-20)
                sigma = np.random.uniform(2, 6)
                gaussian = torch.exp(-((xs - cx)**2 + (ys - cy)**2) / (2 * sigma**2))
                heatmap[b, cls] = torch.max(heatmap[b, cls], gaussian)
        targets["gt_heatmap"] = heatmap

        # Segmentation (7 classes). torch.zeros already yields class 0
        # (background); the lane pattern is identical for all batch elements,
        # so paint it once across the batch dimension.
        seg = torch.zeros(B, bev, bev, dtype=torch.long)
        seg[:, bev//4:3*bev//4, :] = 1  # drivable
        for lane_x in (bev//4, bev//2, 3*bev//4):  # lane lines
            seg[:, :, max(0, lane_x-1):min(bev, lane_x+1)] = 2
        targets["gt_segmentation"] = seg

        # Occupancy grid: obstacles along the near/far edges
        occ = torch.zeros(B, 1, bev, bev)
        occ[:, :, :bev//4, :] = 1.0
        occ[:, :, 3*bev//4:, :] = 1.0
        targets["gt_occupancy"] = occ

        # Behavior labels
        if scenario == "parking":
            targets["gt_behavior"] = torch.full((B,), 7, dtype=torch.long)  # park
        elif scenario == "highway":
            targets["gt_behavior"] = torch.full((B,), 0, dtype=torch.long)  # keep lane
        else:
            targets["gt_behavior"] = torch.randint(0, 5, (B,))

        # Waypoints (20 waypoints, each with x, y, heading, speed)
        wp = torch.zeros(B, 20, 4)
        for t in range(20):
            wp[:, t, 0] = t * 0.5                        # x: forward 0.5m per step
            wp[:, t, 1] = torch.randn(B) * 0.1           # y: slight lateral variation
            wp[:, t, 2] = torch.randn(B) * 0.02          # heading: nearly straight
            wp[:, t, 3] = self.config.max_speed_ms * 0.7  # target speed
        targets["gt_waypoints"] = wp

        # Control commands
        targets["gt_steering"] = torch.randn(B) * 5.0  # degrees
        targets["gt_throttle"] = torch.rand(B) * 0.5 + 0.2
        targets["gt_brake"] = torch.zeros(B)
        return targets

    def __len__(self):
        # Virtual dataset size: data is generated on the fly, so any finite
        # length works; 1000 keeps one "epoch" reasonably sized.
        return 1000

    def __getitem__(self, idx):
        """Dataset-style access for DataLoader compatibility.

        NOTE: samples are freshly randomized; ``idx`` is intentionally unused,
        so repeated access to the same index yields different data.
        """
        inputs, targets = self.generate_batch(batch_size=1)
        # Squeeze batch dim
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}
        targets = {k: v.squeeze(0) for k, v in targets.items()}
        return inputs, targets