File size: 11,318 Bytes
58174b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""
Synthetic data generation for training and testing the FSD model.
Generates realistic simulated scenarios including:
- Camera images (synthetic patterns representing road scenes)
- Ultrasonic distance readings
- Ground truth labels for all perception/planning/control tasks
"""

import torch
import numpy as np
from typing import Dict, Optional, Tuple
import math

from .config import VehicleConfig


class FSDDataGenerator:
    """
    Generates synthetic training data for the FSD model.
    Can produce:
    - Simulated camera images
    - Ultrasonic distance readings with noise
    - Ground truth detection heatmaps
    - Ground truth segmentation maps
    - Ground truth occupancy grids
    - Ground truth waypoints and control commands
    """
    
    def __init__(
        self,
        vehicle_config: Optional[VehicleConfig] = None,
        bev_size: int = 200,
        image_size: Tuple[int, int] = (480, 640),  # H, W
    ):
        if vehicle_config is None:
            vehicle_config = VehicleConfig()
        self.config = vehicle_config
        self.sensor_config = vehicle_config.sensor_config
        self.bev_size = bev_size
        self.image_size = image_size
    
    def generate_batch(
        self, 
        batch_size: int = 4, 
        scenario: str = "urban",
        device: str = "cpu",
    ) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
        """
        Generate a batch of synthetic data.
        
        Args:
            batch_size: Number of samples
            scenario: "urban", "highway", "parking", "intersection"
            device: torch device
        
        Returns:
            inputs: Dict of model inputs
            targets: Dict of ground truth labels
        """
        B = batch_size
        N_cam = self.sensor_config.num_cameras
        N_us = self.sensor_config.num_ultrasonics
        H, W = self.image_size
        
        # ── Generate Camera Data ──
        camera_images = self._generate_camera_images(B, N_cam, H, W, scenario)
        camera_intrinsics = self._generate_intrinsics(B, N_cam)
        camera_extrinsics = self._generate_extrinsics(B, N_cam)
        
        # ── Generate Ultrasonic Data ──
        us_distances, us_placements = self._generate_ultrasonic_data(B, N_us, scenario)
        
        # ── Generate Ego State ──
        ego_state = self._generate_ego_state(B, scenario)
        
        # ── Generate Navigation Command ──
        nav_command = torch.randint(0, 10, (B,))
        
        # ── Generate Ground Truth ──
        targets = self._generate_targets(B, scenario)
        
        inputs = {
            "camera_images": camera_images.to(device),
            "camera_intrinsics": camera_intrinsics.to(device),
            "camera_extrinsics": camera_extrinsics.to(device),
            "ultrasonic_distances": us_distances.to(device),
            "ultrasonic_placements": us_placements.to(device),
            "ego_state": ego_state.to(device),
            "nav_command": nav_command.to(device),
        }
        
        targets = {k: v.to(device) for k, v in targets.items()}
        
        return inputs, targets
    
    def _generate_camera_images(
        self, B: int, N: int, H: int, W: int, scenario: str
    ) -> torch.Tensor:
        """Generate synthetic camera images with road-like patterns."""
        images = torch.zeros(B, N, 3, H, W)
        
        for b in range(B):
            for n in range(N):
                # Sky region (top half)
                sky_color = torch.tensor([0.5, 0.7, 0.9]) + torch.randn(3) * 0.05
                images[b, n, :, :H//3, :] = sky_color.view(3, 1, 1)
                
                # Road region (bottom half)
                road_gray = 0.3 + torch.randn(1) * 0.05
                images[b, n, :, H//3:, :] = road_gray
                
                # Lane lines (white stripes)
                lane_y = torch.arange(H//3, H)
                for lane_x in [W//4, W//2, 3*W//4]:
                    x_start = max(0, lane_x - 2)
                    x_end = min(W, lane_x + 2)
                    images[b, n, :, H//3:, x_start:x_end] = 0.9
                
                # Add some random "objects" (colored rectangles)
                if scenario in ["urban", "intersection"]:
                    num_objects = np.random.randint(1, 5)
                    for _ in range(num_objects):
                        obj_h = np.random.randint(10, 40)
                        obj_w = np.random.randint(10, 30)
                        obj_y = np.random.randint(H//4, H - obj_h)
                        obj_x = np.random.randint(0, W - obj_w)
                        color = torch.rand(3)
                        images[b, n, :, obj_y:obj_y+obj_h, obj_x:obj_x+obj_w] = color.view(3, 1, 1)
                
                # Add noise
                images[b, n] += torch.randn_like(images[b, n]) * 0.02
        
        return images.clamp(0, 1)
    
    def _generate_intrinsics(self, B: int, N: int) -> torch.Tensor:
        """Generate camera intrinsic matrices from config."""
        K = torch.zeros(B, N, 3, 3)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            K[:, i, 0, 0] = cam.fx
            K[:, i, 1, 1] = cam.fy
            K[:, i, 0, 2] = cam.cx
            K[:, i, 1, 2] = cam.cy
            K[:, i, 2, 2] = 1.0
        return K
    
    def _generate_extrinsics(self, B: int, N: int) -> torch.Tensor:
        """Generate camera extrinsic matrices from config."""
        T = torch.zeros(B, N, 4, 4)
        for i, cam in enumerate(self.sensor_config.cameras):
            if i >= N:
                break
            T_np = cam.placement.to_transform_matrix()
            T[:, i] = torch.from_numpy(T_np).float()
        return T
    
    def _generate_ultrasonic_data(
        self, B: int, N: int, scenario: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Generate ultrasonic distance readings and placements."""
        placements = torch.zeros(B, N, 6)
        for i, us in enumerate(self.sensor_config.ultrasonics):
            if i >= N:
                break
            p = us.placement
            placements[:, i] = torch.tensor([p.x, p.y, p.z, p.yaw, p.pitch, p.roll])
        
        # Generate realistic distance readings based on scenario
        if scenario == "parking":
            base_dist = torch.rand(B, N, 1) * 2.0 + 0.5  # 0.5-2.5m close range
        elif scenario == "urban":
            base_dist = torch.rand(B, N, 1) * 3.0 + 1.0  # 1-4m
        elif scenario == "highway":
            base_dist = torch.rand(B, N, 1) * 4.0 + 2.0  # 2-6m (clamped later)
        else:  # intersection
            base_dist = torch.rand(B, N, 1) * 3.5 + 0.5
        
        # Add realistic noise
        noise = torch.randn_like(base_dist) * 0.01
        distances = (base_dist + noise).clamp(0.02, 5.0)
        
        return distances, placements
    
    def _generate_ego_state(self, B: int, scenario: str) -> torch.Tensor:
        """Generate ego vehicle state [speed, accel, steer, yaw_rate, x, y]."""
        states = torch.zeros(B, 6)
        
        max_speed = self.config.max_speed_ms
        
        if scenario == "parking":
            states[:, 0] = torch.rand(B) * 2.0              # speed: 0-2 m/s
            states[:, 1] = torch.randn(B) * 0.5              # accel
            states[:, 2] = torch.randn(B) * 0.3              # steer
        elif scenario == "highway":
            states[:, 0] = torch.rand(B) * max_speed * 0.3 + max_speed * 0.7  # 70-100% max
            states[:, 1] = torch.randn(B) * 0.3
            states[:, 2] = torch.randn(B) * 0.05             # minimal steering
        elif scenario == "intersection":
            states[:, 0] = torch.rand(B) * max_speed * 0.5   # 0-50% max
            states[:, 1] = torch.randn(B) * 1.0
            states[:, 2] = torch.randn(B) * 0.2
        else:  # urban
            states[:, 0] = torch.rand(B) * max_speed * 0.7   # 0-70% max
            states[:, 1] = torch.randn(B) * 0.5
            states[:, 2] = torch.randn(B) * 0.15
        
        states[:, 3] = torch.randn(B) * 0.1  # yaw rate
        states[:, 4] = torch.randn(B) * 10    # x position
        states[:, 5] = torch.randn(B) * 5     # y position
        
        return states
    
    def _generate_targets(self, B: int, scenario: str) -> Dict[str, torch.Tensor]:
        """Generate all ground truth labels."""
        bev = self.bev_size
        
        targets = {}
        
        # Object detection heatmap (10 classes)
        heatmap = torch.zeros(B, 10, bev, bev)
        for b in range(B):
            num_obj = np.random.randint(2, 8)
            for _ in range(num_obj):
                cls = np.random.randint(0, 10)
                cx, cy = np.random.randint(20, bev-20), np.random.randint(20, bev-20)
                sigma = np.random.uniform(2, 6)
                y, x = torch.meshgrid(torch.arange(bev), torch.arange(bev), indexing='ij')
                gaussian = torch.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
                heatmap[b, cls] = torch.max(heatmap[b, cls], gaussian)
        targets["gt_heatmap"] = heatmap
        
        # Segmentation (7 classes)
        seg = torch.zeros(B, bev, bev, dtype=torch.long)
        seg[:, :, :] = 0  # background
        seg[:, bev//4:3*bev//4, :] = 1  # drivable
        for b in range(B):
            # Lane lines
            for lane_x in [bev//4, bev//2, 3*bev//4]:
                seg[b, :, max(0,lane_x-1):min(bev,lane_x+1)] = 2
        targets["gt_segmentation"] = seg
        
        # Occupancy grid
        occ = torch.zeros(B, 1, bev, bev)
        occ[:, :, :bev//4, :] = 1.0     # obstacles at edges
        occ[:, :, 3*bev//4:, :] = 1.0
        targets["gt_occupancy"] = occ
        
        # Behavior labels
        if scenario == "parking":
            targets["gt_behavior"] = torch.full((B,), 7, dtype=torch.long)  # park
        elif scenario == "highway":
            targets["gt_behavior"] = torch.full((B,), 0, dtype=torch.long)  # keep lane
        else:
            targets["gt_behavior"] = torch.randint(0, 5, (B,))
        
        # Waypoints (20 waypoints, each with x, y, heading, speed)
        wp = torch.zeros(B, 20, 4)
        for t in range(20):
            wp[:, t, 0] = t * 0.5  # x: forward 0.5m per step
            wp[:, t, 1] = torch.randn(B) * 0.1  # y: slight lateral variation
            wp[:, t, 2] = torch.randn(B) * 0.02  # heading: nearly straight
            wp[:, t, 3] = self.config.max_speed_ms * 0.7  # target speed
        targets["gt_waypoints"] = wp
        
        # Control commands
        targets["gt_steering"] = torch.randn(B) * 5.0  # degrees
        targets["gt_throttle"] = torch.rand(B) * 0.5 + 0.2
        targets["gt_brake"] = torch.zeros(B)
        
        return targets
    
    def __len__(self):
        return 1000  # virtual dataset size
    
    def __getitem__(self, idx):
        """Dataset-style access for DataLoader compatibility."""
        inputs, targets = self.generate_batch(batch_size=1)
        # Squeeze batch dim
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}
        targets = {k: v.squeeze(0) for k, v in targets.items()}
        return inputs, targets