Abs6187 committed on
Commit
806bdda
·
verified ·
1 Parent(s): 84c318c

Upload 11 files

Browse files
.gitattributes CHANGED
@@ -4,3 +4,5 @@
4
  *.mp4 filter=lfs diff=lfs merge=lfs -text
5
  *.avi filter=lfs diff=lfs merge=lfs -text
6
  *.mov filter=lfs diff=lfs merge=lfs -text
 
 
 
4
  *.mp4 filter=lfs diff=lfs merge=lfs -text
5
  *.avi filter=lfs diff=lfs merge=lfs -text
6
  *.mov filter=lfs diff=lfs merge=lfs -text
7
+ data/annotated_frame.png filter=lfs diff=lfs merge=lfs -text
8
+ data/frame.png filter=lfs diff=lfs merge=lfs -text
data/annotated_frame.png ADDED

Git LFS Details

  • SHA256: fd55fbf3b0800d806a901ee2e69e6ede03fd5acf72870362dbcf24a617a7ac33
  • Pointer size: 131 Bytes
  • Size of remote file: 838 kB
data/frame.png ADDED

Git LFS Details

  • SHA256: fdbb2ef02a84a2b235226a46b55b64cc23e752b6c19738c79ed814e4db40b84a
  • Pointer size: 131 Bytes
  • Size of remote file: 685 kB
data/vehicles.mp4 ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:0f2ceb75fcb9be14c74abcc88b79a1de722019992262d65c2cf1fbf5a0ff0fa8
3
+ size 6106564
data/vehicles_output.mp4 ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:b0436bb36acd10a2e41cd9748a33555794b0595ee7730e30b7d6ebceb5c75678
3
+ size 14132171
models/VisDrone_YOLO_x2.pt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:53910cdfafd0065503ffeafdd41a1acecf892551d0ab971cfc2cb869836a4558
3
+ size 10432046
models/yolov8n.pt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:f59b3d833e2ff32e194b5bb8e08d211dc7c5bdf144b90d2c8412c47ccfc83b36
3
+ size 6549796
src/__init__.py ADDED
@@ -0,0 +1,49 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vehicle Detection and Speed Estimation Package
3
+ ===============================================
4
+
5
+ A comprehensive system for vehicle detection, tracking, counting, and speed estimation
6
+ using YOLO object detection and perspective transformation.
7
+
8
+ Authors:
9
+ - Abhay Gupta (0205CC221005)
10
+ - Aditi Lakhera (0205CC221011)
11
+ - Balraj Patel (0205CC221049)
12
+ - Bhumika Patel (0205CC221050)
13
+
14
+ Modules:
15
+ - annotator: Frame annotation and visualization
16
+ - speed_estimator: Vehicle speed calculation
17
+ - view_transformer: Perspective transformation utilities
18
+ - exceptions: Custom exception classes
19
+ """
20
+
21
+ __version__ = "1.0.0"
22
+ __author__ = "Abhay Gupta, Aditi Lakhera, Balraj Patel, Bhumika Patel"
23
+
24
+ # Import main classes for easy access
25
+ from .annotator import FrameAnnotator
26
+ from .speed_estimator import VehicleSpeedEstimator
27
+ from .view_transformer import PerspectiveTransformer
28
+ from .exceptions import (
29
+ VehicleDetectionError,
30
+ VideoProcessingError,
31
+ ModelLoadError,
32
+ ConfigurationError,
33
+ DetectionError,
34
+ TrackingError,
35
+ SpeedEstimationError
36
+ )
37
+
38
+ __all__ = [
39
+ "FrameAnnotator",
40
+ "VehicleSpeedEstimator",
41
+ "PerspectiveTransformer",
42
+ "VehicleDetectionError",
43
+ "VideoProcessingError",
44
+ "ModelLoadError",
45
+ "ConfigurationError",
46
+ "DetectionError",
47
+ "TrackingError",
48
+ "SpeedEstimationError",
49
+ ]
src/annotator.py ADDED
@@ -0,0 +1,237 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Frame Annotation Module
3
+ ========================
4
+
5
+ Provides comprehensive video frame annotation capabilities for vehicle detection
6
+ visualization including bounding boxes, labels, trajectories, and counting zones.
7
+
8
+ Authors:
9
+ - Abhay Gupta (0205CC221005)
10
+ - Aditi Lakhera (0205CC221011)
11
+ - Balraj Patel (0205CC221049)
12
+ - Bhumika Patel (0205CC221050)
13
+ """
14
+
15
+ import numpy as np
16
+ import supervision as sv
17
+ from typing import List, Optional, Tuple
18
+ import logging
19
+
20
+ logger = logging.getLogger(__name__)
21
+
22
+
23
class FrameAnnotator:
    """
    Comprehensive frame annotation system for vehicle detection visualization.

    Manages multiple annotation layers — bounding boxes, labels, object
    trajectories, counting lines, and an optional detection zone — and applies
    whichever layers are currently enabled to each frame.
    """

    def __init__(
        self,
        video_resolution: Tuple[int, int],
        show_boxes: bool = True,
        show_labels: bool = True,
        show_traces: bool = True,
        show_line_zones: bool = True,
        trace_length: int = 20,
        zone_polygon: Optional[np.ndarray] = None
    ):
        """
        Initialize the frame annotator.

        Args:
            video_resolution: Video resolution as (width, height).
            show_boxes: Enable bounding box annotations.
            show_labels: Enable label annotations.
            show_traces: Enable trajectory trace annotations.
            show_line_zones: Enable line zone annotations.
            trace_length: Maximum number of points in a trajectory trace.
            zone_polygon: Optional polygon defining the detection zone.
        """
        self.resolution = video_resolution
        self.show_boxes = show_boxes
        self.show_labels = show_labels
        self.show_traces = show_traces
        self.show_line_zones = show_line_zones
        self.zone_polygon = zone_polygon

        # Remembered so update_settings() can rebuild the annotators later.
        self.trace_length = trace_length

        # Scale line thickness / text size to the video resolution.
        self.line_thickness = sv.calculate_optimal_line_thickness(video_resolution)
        self.text_scale = sv.calculate_optimal_text_scale(video_resolution)

        logger.info(f"Initializing annotator for {video_resolution[0]}x{video_resolution[1]}")
        logger.info(f"Line thickness: {self.line_thickness}, Text scale: {self.text_scale}")

        # Initialize annotation components.
        self._setup_annotators(trace_length)

    def _setup_annotators(self, trace_length: int) -> None:
        """
        (Re)create the individual annotation components to match the current
        show_* flags. A disabled layer's drawer is set to None.

        Args:
            trace_length: Maximum trajectory trace length.
        """
        # Bounding box annotator
        if self.show_boxes:
            self.box_drawer = sv.BoxAnnotator(
                thickness=self.line_thickness,
                color_lookup=sv.ColorLookup.TRACK
            )
            logger.debug("Box annotator initialized")
        else:
            self.box_drawer = None

        # Label annotator
        if self.show_labels:
            self.label_drawer = sv.LabelAnnotator(
                text_thickness=self.line_thickness,
                text_scale=self.text_scale,
                text_position=sv.Position.BOTTOM_CENTER,
                color_lookup=sv.ColorLookup.TRACK
            )
            logger.debug("Label annotator initialized")
        else:
            self.label_drawer = None

        # Trajectory trace annotator
        if self.show_traces:
            self.trace_drawer = sv.TraceAnnotator(
                thickness=self.line_thickness,
                trace_length=trace_length,
                position=sv.Position.BOTTOM_CENTER,
                color_lookup=sv.ColorLookup.TRACK
            )
            logger.debug(f"Trace annotator initialized with length {trace_length}")
        else:
            self.trace_drawer = None

        # Line zone annotator
        if self.show_line_zones:
            self.line_zone_drawer = sv.LineZoneAnnotator(
                thickness=self.line_thickness,
                text_thickness=self.line_thickness,
                text_scale=self.text_scale,
                color=sv.Color.WHITE,
                text_color=sv.Color.BLACK,
                display_in_count=True,
                display_out_count=True
            )
            logger.debug("Line zone annotator initialized")
        else:
            self.line_zone_drawer = None

        # Polygon zone annotator (for detection area visualization)
        if self.zone_polygon is not None:
            try:
                zone = sv.PolygonZone(
                    polygon=self.zone_polygon,
                    frame_resolution_wh=self.resolution
                )
                self.polygon_drawer = sv.PolygonZoneAnnotator(
                    zone=zone,
                    color=sv.Color.GREEN,
                    thickness=self.line_thickness,
                    display_in_zone_count=False
                )
                logger.debug("Polygon zone annotator initialized")
            except Exception as e:
                # Best-effort: a bad polygon disables only this layer.
                logger.warning(f"Failed to initialize polygon zone: {e}")
                self.polygon_drawer = None
        else:
            self.polygon_drawer = None

    def draw_annotations(
        self,
        frame: np.ndarray,
        detections: sv.Detections,
        labels: Optional[List[str]] = None,
        line_zones: Optional[List[sv.LineZone]] = None
    ) -> np.ndarray:
        """
        Apply all enabled annotations to a video frame.

        Args:
            frame: Input video frame (BGR format).
            detections: Detection results to annotate.
            labels: Optional list of labels, one per detection.
            line_zones: Optional list of line zones for counting visualization.

        Returns:
            The annotated frame; the original frame (unmodified) if any
            annotation step raises.
        """
        # Work on a copy so the caller's frame is never mutated.
        annotated = frame.copy()

        try:
            # Polygon zone first so it sits behind the other layers.
            if self.polygon_drawer is not None:
                annotated = self.polygon_drawer.annotate(annotated)

            # Bounding boxes
            if self.box_drawer is not None and len(detections) > 0:
                annotated = self.box_drawer.annotate(
                    scene=annotated,
                    detections=detections
                )

            # Trajectory traces
            if self.trace_drawer is not None and len(detections) > 0:
                annotated = self.trace_drawer.annotate(
                    scene=annotated,
                    detections=detections
                )

            # Labels
            if self.label_drawer is not None and len(detections) > 0:
                annotated = self.label_drawer.annotate(
                    scene=annotated,
                    detections=detections,
                    labels=labels
                )

            # Line zones (in/out counters)
            if self.line_zone_drawer is not None and line_zones:
                for zone in line_zones:
                    annotated = self.line_zone_drawer.annotate(
                        frame=annotated,
                        line_counter=zone
                    )

            return annotated

        except Exception as e:
            logger.error(f"Error during annotation: {e}")
            # Return original frame if annotation fails
            return frame

    def update_settings(
        self,
        show_boxes: Optional[bool] = None,
        show_labels: Optional[bool] = None,
        show_traces: Optional[bool] = None,
        show_line_zones: Optional[bool] = None
    ) -> None:
        """
        Update annotation settings dynamically.

        BUGFIX: draw_annotations() decides what to draw from the drawer
        objects, not from the show_* flags, so merely flipping the flags
        (as the previous version did) had no visible effect. The annotators
        are now rebuilt so toggles take effect immediately. Note that
        re-creating the trace annotator discards accumulated trace history.

        Args:
            show_boxes: Enable/disable bounding boxes.
            show_labels: Enable/disable labels.
            show_traces: Enable/disable traces.
            show_line_zones: Enable/disable line zones.
        """
        if show_boxes is not None:
            self.show_boxes = show_boxes
        if show_labels is not None:
            self.show_labels = show_labels
        if show_traces is not None:
            self.show_traces = show_traces
        if show_line_zones is not None:
            self.show_line_zones = show_line_zones

        # Rebuild the drawers so the new flags actually change behavior.
        self._setup_annotators(self.trace_length)

        logger.info(f"Annotation settings updated: boxes={self.show_boxes}, "
                    f"labels={self.show_labels}, traces={self.show_traces}, "
                    f"zones={self.show_line_zones}")
src/exceptions.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
"""
Custom Exception Classes for Vehicle Detection System
======================================================

A small exception hierarchy rooted at ``VehicleDetectionError``: callers may
catch a specific failure type or the base class to handle any system error.

Authors:
    - Abhay Gupta (0205CC221005)
    - Aditi Lakhera (0205CC221011)
    - Balraj Patel (0205CC221049)
    - Bhumika Patel (0205CC221050)
"""


class VehicleDetectionError(Exception):
    """Base exception for the vehicle detection system."""


class VideoProcessingError(VehicleDetectionError):
    """Video processing failed."""


class ModelLoadError(VehicleDetectionError):
    """Model loading failed."""


class ConfigurationError(VehicleDetectionError):
    """Configuration is invalid."""


class DetectionError(VehicleDetectionError):
    """Object detection failed."""


class TrackingError(VehicleDetectionError):
    """Object tracking failed."""


class SpeedEstimationError(VehicleDetectionError):
    """Speed estimation failed."""
src/speed_estimator.py ADDED
@@ -0,0 +1,236 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vehicle Speed Estimation Module
3
+ ================================
4
+
5
+ Implements speed calculation for tracked vehicles using perspective transformation
6
+ and temporal position tracking with smoothing and outlier detection.
7
+
8
+ Authors:
9
+ - Abhay Gupta (0205CC221005)
10
+ - Aditi Lakhera (0205CC221011)
11
+ - Balraj Patel (0205CC221049)
12
+ - Bhumika Patel (0205CC221050)
13
+
14
+ Technical Approach:
15
+ - Tracks vehicle positions across frames in transformed coordinate space
16
+ - Calculates displacement over time windows
17
+ - Applies smoothing to reduce noise
18
+ - Converts to desired speed units
19
+ """
20
+
21
+ import numpy as np
22
+ import supervision as sv
23
+ from collections import defaultdict, deque
24
+ from typing import Dict, Optional
25
+ import logging
26
+
27
+ from .view_transformer import PerspectiveTransformer
28
+
29
+ logger = logging.getLogger(__name__)
30
+
31
+
32
class VehicleSpeedEstimator:
    """
    Estimates vehicle speeds using perspective-corrected position tracking.

    Maintains a rolling history of each tracked vehicle's position in
    real-world coordinates and calculates speed from displacement over time.
    """

    def __init__(
        self,
        fps: int,
        transformer: "PerspectiveTransformer",
        history_duration: int = 1,
        speed_unit: str = "km/h",
        min_frames_for_speed: Optional[int] = None
    ):
        """
        Initialize the speed estimator.

        Args:
            fps: Video frames per second.
            transformer: Perspective transformation instance mapping image
                coordinates to real-world (metre) coordinates.
            history_duration: Time window for speed calculation (seconds).
            speed_unit: Output speed unit ("km/h", "mph", or "m/s").
            min_frames_for_speed: Minimum frames needed for speed calculation
                (defaults to fps/2, never fewer than 5).

        Raises:
            ValueError: If speed_unit is not a supported unit.
        """
        self.fps = fps
        self.transformer = transformer
        self.history_duration = history_duration
        self.speed_unit = speed_unit

        # Require roughly half a second of samples (at least 5) before
        # trusting a speed estimate.
        self.min_frames = min_frames_for_speed or max(int(fps / 2), 5)

        # Maximum history length in frames.
        max_history_frames = int(fps * history_duration)

        # Per-vehicle position history in real-world coordinates.
        # Key: tracker_id, Value: deque of (x, y) points (metres).
        self.position_history: Dict[int, deque] = defaultdict(
            lambda: deque(maxlen=max_history_frames)
        )

        # Conversion factors from m/s to each supported output unit.
        self.unit_conversions = {
            "km/h": 3.6,
            "mph": 2.23694,
            "m/s": 1.0
        }

        if speed_unit not in self.unit_conversions:
            raise ValueError(f"Invalid speed unit: {speed_unit}")

        self.conversion_factor = self.unit_conversions[speed_unit]

        logger.info(f"Speed estimator initialized: {fps}fps, {history_duration}s history, "
                    f"unit={speed_unit}, min_frames={self.min_frames}")

    def _calculate_speed_for_vehicle(self, tracker_id: int) -> Optional[float]:
        """
        Calculate the current speed of one tracked vehicle.

        Args:
            tracker_id: Unique tracker ID.

        Returns:
            Speed in the configured unit, or None if there is insufficient
            history or the computed value is an outlier.
        """
        positions = self.position_history[tracker_id]

        # Need sufficient position history for a stable estimate.
        if len(positions) < self.min_frames:
            return None

        try:
            # First and last recorded positions (real-world coordinates).
            start_pos = positions[0]
            end_pos = positions[-1]

            # Straight-line displacement in metres.
            displacement = np.linalg.norm(end_pos - start_pos)

            # BUGFIX: N samples span N-1 frame intervals. The previous
            # version used len(positions)/fps, which systematically
            # underestimated every speed by a factor of (N-1)/N.
            frame_intervals = len(positions) - 1
            if frame_intervals <= 0:
                return None
            time_elapsed = frame_intervals / self.fps

            # Speed in m/s, then converted to the configured unit.
            speed_ms = displacement / time_elapsed
            speed = speed_ms * self.conversion_factor

            # Reject outliers faster than 300 km/h, expressed in the output
            # unit. BUGFIX: the previous formula
            # (300 * 3.6 / conversion_factor) inverted the conversion and
            # allowed e.g. 1080 m/s when the unit was "m/s"; the correct
            # bound is 300 km/h -> m/s -> output unit.
            max_speed = (300.0 / self.unit_conversions["km/h"]) * self.conversion_factor
            if speed < 0 or speed > max_speed:
                logger.debug(f"Outlier speed detected for vehicle {tracker_id}: {speed:.1f}")
                return None

            return speed

        except Exception as e:
            logger.warning(f"Error calculating speed for vehicle {tracker_id}: {e}")
            return None

    def _apply_smoothing(self, speeds: np.ndarray, window_size: int = 3) -> np.ndarray:
        """
        Apply moving-average smoothing to an array of speed values.

        Args:
            speeds: Array of speed values.
            window_size: Smoothing window size.

        Returns:
            Smoothed speed array (input returned unchanged if shorter than
            the window).
        """
        if len(speeds) < window_size:
            return speeds

        # Simple moving average via convolution with a uniform kernel.
        smoothed = np.convolve(speeds, np.ones(window_size) / window_size, mode='same')
        return smoothed

    def estimate(self, detections: "sv.Detections") -> "sv.Detections":
        """
        Estimate speeds for all detected vehicles in the current frame.

        Args:
            detections: Detection results for the current frame (must carry
                tracker IDs for speeds to be computed).

        Returns:
            The same detections object with a "speed" array (rounded ints,
            0 where a speed could not be computed) added to its data.
        """
        speeds = []

        # Without tracker IDs we cannot associate positions across frames.
        if not hasattr(detections, 'tracker_id') or detections.tracker_id is None:
            logger.warning("No tracker IDs found in detections")
            detections.data["speed"] = np.zeros(len(detections))
            return detections

        # Anchor each vehicle at the bottom centre of its bounding box
        # (the point closest to the road surface).
        anchor_points = detections.get_anchors_coordinates(
            anchor=sv.Position.BOTTOM_CENTER
        )

        # Map image pixels to real-world coordinates.
        try:
            transformed_points = self.transformer.apply_transformation(anchor_points)
        except Exception as e:
            logger.error(f"Error transforming points: {e}")
            detections.data["speed"] = np.zeros(len(detections))
            return detections

        for tracker_id, point in zip(detections.tracker_id, transformed_points):
            # Record the position, then estimate from the accumulated history.
            self.position_history[tracker_id].append(point)

            speed = self._calculate_speed_for_vehicle(tracker_id)

            # Report 0 while a speed cannot yet (or no longer) be computed.
            speeds.append(speed if speed is not None else 0.0)

        # Integer speeds for display.
        speeds = np.array(speeds)
        speeds = np.round(speeds).astype(int)

        detections.data["speed"] = speeds

        return detections

    def reset(self) -> None:
        """Clear all position history."""
        self.position_history.clear()
        logger.info("Speed estimator history cleared")

    def get_tracked_vehicle_count(self) -> int:
        """
        Get the number of vehicles with recorded position history.

        Returns:
            Number of vehicles currently tracked.
        """
        return len(self.position_history)

    def cleanup_old_tracks(self, active_ids: set) -> None:
        """
        Drop position history for vehicles no longer being tracked, so the
        history map does not grow without bound.

        Args:
            active_ids: Set of currently active tracker IDs.
        """
        inactive_ids = set(self.position_history.keys()) - active_ids
        for tracker_id in inactive_ids:
            del self.position_history[tracker_id]

        if inactive_ids:
            logger.debug(f"Cleaned up {len(inactive_ids)} inactive tracks")
src/view_transformer.py ADDED
@@ -0,0 +1,254 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Perspective Transformation Module
3
+ ==================================
4
+
5
+ Implements perspective transformation for converting camera view coordinates
6
+ to real-world top-down coordinates, essential for accurate speed calculation.
7
+
8
+ Authors:
9
+ - Abhay Gupta (0205CC221005)
10
+ - Aditi Lakhera (0205CC221011)
11
+ - Balraj Patel (0205CC221049)
12
+ - Bhumika Patel (0205CC221050)
13
+
14
+ Mathematical Background:
15
+ Perspective transformation uses a 3x3 homography matrix to map points
16
+ from one plane to another. This is crucial for converting pixel coordinates
17
+ to real-world measurements.
18
+ """
19
+
20
+ import cv2
21
+ import numpy as np
22
+ from typing import Tuple
23
+ import logging
24
+
25
+ logger = logging.getLogger(__name__)
26
+
27
+
28
class PerspectiveTransformer:
    """
    Handles perspective transformation between camera view and real-world coordinates.

    Computes a 3x3 homography from four point correspondences and applies it
    (and its inverse) to convert image coordinates to a top-down view with
    real-world measurements.
    """

    def __init__(
        self,
        source_points: np.ndarray,
        target_points: np.ndarray
    ):
        """
        Initialize the perspective transformer.

        Args:
            source_points: 4 points in the source image (camera view),
                shape (4, 2) as [[x1,y1], [x2,y2], [x3,y3], [x4,y4]].
            target_points: 4 corresponding points in target space
                (real-world), same shape and ordering.

        Raises:
            ValueError: If points are invalid or the transformation cannot
                be computed.
        """
        # Validate inputs before handing them to OpenCV.
        self._validate_points(source_points, "source")
        self._validate_points(target_points, "target")

        # OpenCV's getPerspectiveTransform requires float32.
        self.source_pts = source_points.astype(np.float32)
        self.target_pts = target_points.astype(np.float32)

        # Forward homography (source -> target).
        self.matrix = self._compute_transformation_matrix()

        # Inverse homography (target -> source); may be None on failure.
        self.inverse_matrix = self._compute_inverse_matrix()

        logger.info("Perspective transformer initialized successfully")
        logger.debug(f"Source points:\n{self.source_pts}")
        logger.debug(f"Target points:\n{self.target_pts}")

    def _validate_points(self, points: np.ndarray, name: str) -> None:
        """
        Validate point array format and values.

        Args:
            points: Points array to validate.
            name: Name used in error messages.

        Raises:
            ValueError: If points are not a (4, 2) ndarray of finite values.
        """
        if not isinstance(points, np.ndarray):
            raise ValueError(f"{name} points must be a numpy array")

        if points.shape != (4, 2):
            raise ValueError(
                f"{name} points must have shape (4, 2), got {points.shape}"
            )

        if not np.isfinite(points).all():
            raise ValueError(f"{name} points contain invalid values (NaN or Inf)")

    def _compute_transformation_matrix(self) -> np.ndarray:
        """
        Compute the forward perspective transformation matrix.

        Returns:
            3x3 homography matrix.

        Raises:
            ValueError: If the transformation cannot be computed.
        """
        try:
            matrix = cv2.getPerspectiveTransform(
                self.source_pts,
                self.target_pts
            )

            # Degenerate point sets can yield None/NaN matrices.
            if matrix is None or not np.isfinite(matrix).all():
                raise ValueError("Invalid transformation matrix computed")

            logger.debug(f"Transformation matrix:\n{matrix}")
            return matrix

        except cv2.error as e:
            # Chain the cause so the OpenCV traceback is preserved.
            raise ValueError(f"Failed to compute perspective transform: {e}") from e

    def _compute_inverse_matrix(self) -> "np.ndarray | None":
        """
        Compute the inverse transformation matrix.

        Returns:
            3x3 inverse homography matrix, or None if it cannot be computed
            (the forward transform remains usable in that case).
        """
        try:
            inverse = cv2.getPerspectiveTransform(
                self.target_pts,
                self.source_pts
            )
            return inverse
        except Exception as e:
            logger.warning(f"Could not compute inverse matrix: {e}")
            return None

    def apply_transformation(self, points: np.ndarray) -> np.ndarray:
        """
        Transform points from source to target coordinate system.

        Args:
            points: Array of points to transform, shape (N, 2).

        Returns:
            Transformed points in the target coordinate system, shape (N, 2).

        Raises:
            ValueError: If points have an invalid shape or the
                transformation fails.
        """
        # Empty input passes through unchanged.
        if points.size == 0:
            return points

        if len(points.shape) != 2 or points.shape[1] != 2:
            raise ValueError(
                f"Points must have shape (N, 2), got {points.shape}"
            )

        try:
            # OpenCV expects shape (N, 1, 2) float32.
            points_reshaped = points.reshape(-1, 1, 2).astype(np.float32)

            transformed = cv2.perspectiveTransform(
                points_reshaped,
                self.matrix
            )

            # Back to (N, 2) for callers.
            result = transformed.reshape(-1, 2)

            return result

        except Exception as e:
            logger.error(f"Error applying transformation: {e}")
            raise ValueError(f"Transformation failed: {e}") from e

    def apply_inverse_transformation(self, points: np.ndarray) -> np.ndarray:
        """
        Transform points from target back to source coordinate system.

        Args:
            points: Array of points in target coordinates, shape (N, 2).

        Returns:
            Points in the source coordinate system, shape (N, 2).

        Raises:
            ValueError: If the inverse matrix is not available or the
                transformation fails.
        """
        if self.inverse_matrix is None:
            raise ValueError("Inverse transformation matrix not available")

        if points.size == 0:
            return points

        try:
            points_reshaped = points.reshape(-1, 1, 2).astype(np.float32)
            transformed = cv2.perspectiveTransform(
                points_reshaped,
                self.inverse_matrix
            )
            return transformed.reshape(-1, 2)

        except Exception as e:
            logger.error(f"Error applying inverse transformation: {e}")
            raise ValueError(f"Inverse transformation failed: {e}") from e

    def transform_single_point(self, x: float, y: float) -> Tuple[float, float]:
        """
        Transform a single point (convenience method).

        Args:
            x: X coordinate in the source system.
            y: Y coordinate in the source system.

        Returns:
            Tuple of (x, y) in the target system as plain Python floats
            (previously np.float32 values, contradicting the annotation).
        """
        point = np.array([[x, y]], dtype=np.float32)
        transformed = self.apply_transformation(point)
        return float(transformed[0][0]), float(transformed[0][1])

    def get_transformation_matrix(self) -> np.ndarray:
        """
        Get a copy of the forward transformation matrix (safe to mutate).

        Returns:
            3x3 homography matrix.
        """
        return self.matrix.copy()

    def get_scale_factors(self) -> Tuple[float, float]:
        """
        Estimate scale factors in x and y directions from the calibration
        points (edge lengths source/target; pixels per target unit assuming
        the target points are in metres — confirm against caller).

        Returns:
            Tuple of (scale_x, scale_y).
        """
        # Edge lengths of the calibration quadrilateral in each space.
        source_width = np.linalg.norm(self.source_pts[1] - self.source_pts[0])
        source_height = np.linalg.norm(self.source_pts[3] - self.source_pts[0])

        target_width = np.linalg.norm(self.target_pts[1] - self.target_pts[0])
        target_height = np.linalg.norm(self.target_pts[3] - self.target_pts[0])

        # Guard against degenerate (zero-length) target edges.
        scale_x = source_width / target_width if target_width > 0 else 1.0
        scale_y = source_height / target_height if target_height > 0 else 1.0

        return scale_x, scale_y