File size: 8,416 Bytes
806bdda
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""

Vehicle Speed Estimation Module

================================



Implements speed calculation for tracked vehicles using perspective transformation

and temporal position tracking with smoothing and outlier detection.



Authors:

    - Abhay Gupta (0205CC221005)

    - Aditi Lakhera (0205CC221011)

    - Balraj Patel (0205CC221049)

    - Bhumika Patel (0205CC221050)



Technical Approach:

    - Tracks vehicle positions across frames in transformed coordinate space

    - Calculates displacement over time windows

    - Applies smoothing to reduce noise

    - Converts to desired speed units

"""

import numpy as np
import supervision as sv
from collections import defaultdict, deque
from typing import Dict, Optional
import logging

from .view_transformer import PerspectiveTransformer

logger = logging.getLogger(__name__)


class VehicleSpeedEstimator:
    """

    Estimates vehicle speeds using perspective-corrected position tracking.

    

    This class maintains a history of vehicle positions in real-world coordinates

    and calculates speeds based on displacement over time.

    """
    
    def __init__(

        self,

        fps: int,

        transformer: PerspectiveTransformer,

        history_duration: int = 1,

        speed_unit: str = "km/h",

        min_frames_for_speed: Optional[int] = None

    ):
        """

        Initialize the speed estimator.

        

        Args:

            fps: Video frames per second

            transformer: Perspective transformation instance

            history_duration: Time window for speed calculation (seconds)

            speed_unit: Output speed unit ("km/h", "mph", or "m/s")

            min_frames_for_speed: Minimum frames needed for speed calculation

                                 (defaults to fps/2)

        """
        self.fps = fps
        self.transformer = transformer
        self.history_duration = history_duration
        self.speed_unit = speed_unit
        
        # Calculate minimum frames needed for reliable speed estimation
        self.min_frames = min_frames_for_speed or max(int(fps / 2), 5)
        
        # Maximum history length in frames
        max_history_frames = int(fps * history_duration)
        
        # Store position history for each tracked object
        # Key: tracker_id, Value: deque of (x, y) positions in real-world coordinates
        self.position_history: Dict[int, deque] = defaultdict(
            lambda: deque(maxlen=max_history_frames)
        )
        
        # Speed unit conversion factors (from m/s)
        self.unit_conversions = {
            "km/h": 3.6,
            "mph": 2.23694,
            "m/s": 1.0
        }
        
        if speed_unit not in self.unit_conversions:
            raise ValueError(f"Invalid speed unit: {speed_unit}")
        
        self.conversion_factor = self.unit_conversions[speed_unit]
        
        logger.info(f"Speed estimator initialized: {fps}fps, {history_duration}s history, "
                   f"unit={speed_unit}, min_frames={self.min_frames}")
    
    def _calculate_speed_for_vehicle(self, tracker_id: int) -> Optional[float]:
        """

        Calculate speed for a specific tracked vehicle.

        

        Args:

            tracker_id: Unique tracker ID

            

        Returns:

            Speed in configured units, or None if insufficient data

        """
        positions = self.position_history[tracker_id]
        
        # Need sufficient position history
        if len(positions) < self.min_frames:
            return None
        
        try:
            # Get first and last positions
            start_pos = positions[0]
            end_pos = positions[-1]
            
            # Calculate Euclidean distance in real-world coordinates (meters)
            displacement = np.linalg.norm(end_pos - start_pos)
            
            # Calculate time elapsed
            time_elapsed = len(positions) / self.fps
            
            # Avoid division by zero
            if time_elapsed == 0:
                return None
            
            # Calculate speed in m/s
            speed_ms = displacement / time_elapsed
            
            # Convert to desired unit
            speed = speed_ms * self.conversion_factor
            
            # Apply reasonable bounds (0-300 km/h equivalent)
            max_speed = 300 * self.unit_conversions["km/h"] / self.conversion_factor
            if speed < 0 or speed > max_speed:
                logger.debug(f"Outlier speed detected for vehicle {tracker_id}: {speed:.1f}")
                return None
            
            return speed
            
        except Exception as e:
            logger.warning(f"Error calculating speed for vehicle {tracker_id}: {e}")
            return None
    
    def _apply_smoothing(self, speeds: np.ndarray, window_size: int = 3) -> np.ndarray:
        """

        Apply moving average smoothing to speed values.

        

        Args:

            speeds: Array of speed values

            window_size: Smoothing window size

            

        Returns:

            Smoothed speed array

        """
        if len(speeds) < window_size:
            return speeds
        
        # Simple moving average
        smoothed = np.convolve(speeds, np.ones(window_size)/window_size, mode='same')
        return smoothed
    
    def estimate(self, detections: sv.Detections) -> sv.Detections:
        """

        Estimate speeds for all detected vehicles in current frame.

        

        Args:

            detections: Detection results from current frame

            

        Returns:

            Updated detections with 'speed' field added to data

        """
        # Initialize speed array
        speeds = []
        
        # Check if we have tracker IDs
        if not hasattr(detections, 'tracker_id') or detections.tracker_id is None:
            logger.warning("No tracker IDs found in detections")
            detections.data["speed"] = np.zeros(len(detections))
            return detections
        
        # Get anchor points (bottom center of bounding boxes)
        anchor_points = detections.get_anchors_coordinates(
            anchor=sv.Position.BOTTOM_CENTER
        )
        
        # Transform points to real-world coordinates
        try:
            transformed_points = self.transformer.apply_transformation(anchor_points)
        except Exception as e:
            logger.error(f"Error transforming points: {e}")
            detections.data["speed"] = np.zeros(len(detections))
            return detections
        
        # Process each detection
        for tracker_id, point in zip(detections.tracker_id, transformed_points):
            # Add position to history
            self.position_history[tracker_id].append(point)
            
            # Calculate speed
            speed = self._calculate_speed_for_vehicle(tracker_id)
            
            # Use 0 if speed cannot be calculated
            speeds.append(speed if speed is not None else 0.0)
        
        # Convert to numpy array and round
        speeds = np.array(speeds)
        speeds = np.round(speeds).astype(int)
        
        # Add to detections data
        detections.data["speed"] = speeds
        
        return detections
    
    def reset(self) -> None:
        """Clear all position history."""
        self.position_history.clear()
        logger.info("Speed estimator history cleared")
    
    def get_tracked_vehicle_count(self) -> int:
        """

        Get number of currently tracked vehicles.

        

        Returns:

            Number of vehicles with position history

        """
        return len(self.position_history)
    
    def cleanup_old_tracks(self, active_ids: set) -> None:
        """

        Remove position history for vehicles no longer being tracked.

        

        Args:

            active_ids: Set of currently active tracker IDs

        """
        inactive_ids = set(self.position_history.keys()) - active_ids
        for tracker_id in inactive_ids:
            del self.position_history[tracker_id]
        
        if inactive_ids:
            logger.debug(f"Cleaned up {len(inactive_ids)} inactive tracks")