| """
|
| Vehicle Detection, Tracking, Counting, and Speed Estimation System
|
| ===================================================================
|
|
|
| A comprehensive computer vision pipeline for analyzing traffic videos,
|
| detecting vehicles, tracking their movement, counting them, and estimating
|
| their speeds using YOLO object detection and perspective transformation.
|
|
|
| Authors:
|
| - Abhay Gupta (0205CC221005)
|
| - Aditi Lakhera (0205CC221011)
|
| - Balraj Patel (0205CC221049)
|
| - Bhumika Patel (0205CC221050)
|
|
|
| Technical Approach:
|
| - YOLO for real-time object detection
|
| - ByteTrack for multi-object tracking
|
| - Perspective transformation for speed calculation
|
| - Line zones for vehicle counting
|
| """
|
|
|
| import sys
|
| import logging
|
| from pathlib import Path
|
| from typing import Dict, Optional, Callable
|
| from time import time
|
|
|
| import cv2
|
| import numpy as np
|
| import supervision as sv
|
| from ultralytics import YOLO
|
|
|
| from src import FrameAnnotator, VehicleSpeedEstimator, PerspectiveTransformer
|
| from src.exceptions import (
|
| VideoProcessingError,
|
| ModelLoadError,
|
| ConfigurationError
|
| )
|
| from config import VehicleDetectionConfig
|
|
|
|
|
# Configure root logging once at import time so every module logger in the
# process shares a consistent timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
class VehicleDetectionPipeline:
    """
    Main pipeline for vehicle detection, tracking, counting, and speed estimation.

    This class orchestrates the entire processing workflow, from loading the
    YOLO model to processing each frame (detect -> track -> count -> estimate
    speed -> annotate) and writing the annotated output video.
    """

    def __init__(self, config: VehicleDetectionConfig):
        """
        Initialize the detection pipeline.

        Args:
            config: Configuration object with all parameters.

        Raises:
            ModelLoadError: If the YOLO model cannot be loaded.
            ConfigurationError: If configuration is invalid.
        """
        self.config = config
        # Components are created lazily: the model here, the rest per-video
        # in _setup_video_components() once the video's resolution/fps are known.
        self.model = None
        self.tracker = None
        self.line_zone = None
        self.speed_estimator = None
        self.annotator = None
        self.video_info = None

        logger.info(f"Initializing pipeline with config: {config}")
        self._initialize_components()

    def _initialize_components(self) -> None:
        """Load the YOLO model; raise ModelLoadError on any failure."""
        try:
            logger.info(f"Loading YOLO model: {self.config.model_path}")
            self.model = YOLO(self.config.model_path)
            # NOTE: assigning self.model.conf / self.model.iou (the old
            # YOLOv5-hub idiom) has no effect on Ultralytics YOLOv8 inference;
            # the thresholds are instead passed as conf=/iou= arguments at
            # predict time in _process_single_frame().
            logger.info("Model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise ModelLoadError(
                f"Could not load YOLO model from {self.config.model_path}: {e}"
            ) from e

    def _setup_video_components(self, video_path: str) -> None:
        """
        Set up video-specific components (tracker, line zone, speed
        estimator, annotator) sized to the input video's resolution and fps.

        Args:
            video_path: Path to input video.

        Raises:
            VideoProcessingError: If the video cannot be opened or any
                component fails to initialize.
        """
        try:
            self.video_info = sv.VideoInfo.from_video_path(video_path)
            logger.info(
                f"Video info: {self.video_info.width}x{self.video_info.height} "
                f"@ {self.video_info.fps}fps"
            )

            # ByteTrack needs the real frame rate to predict object motion.
            self.tracker = sv.ByteTrack(
                frame_rate=self.video_info.fps,
                track_activation_threshold=self.config.confidence_threshold
            )
            logger.info("Tracker initialized")

            # Horizontal counting line at line_y, inset by line_offset from
            # both edges; vehicles are counted by their bottom-center anchor.
            line_start = sv.Point(
                x=self.config.line_offset,
                y=self.config.line_y
            )
            line_end = sv.Point(
                x=self.video_info.width - self.config.line_offset,
                y=self.config.line_y
            )
            self.line_zone = sv.LineZone(
                start=line_start,
                end=line_end,
                triggering_anchors=(sv.Position.BOTTOM_CENTER,)
            )
            logger.info(f"Line zone created at y={self.config.line_y}")

            # Perspective transform maps image pixels to real-world
            # coordinates so displacement per frame converts to speed.
            source_pts = np.array(self.config.source_points, dtype=np.float32)
            target_pts = np.array(self.config.target_points, dtype=np.float32)
            transformer = PerspectiveTransformer(
                source_points=source_pts,
                target_points=target_pts
            )
            logger.info("Perspective transformer initialized")

            self.speed_estimator = VehicleSpeedEstimator(
                fps=self.video_info.fps,
                transformer=transformer,
                history_duration=self.config.speed_history_seconds,
                speed_unit=self.config.speed_unit
            )
            logger.info("Speed estimator initialized")

            self.annotator = FrameAnnotator(
                video_resolution=(self.video_info.width, self.video_info.height),
                show_boxes=self.config.enable_boxes,
                show_labels=self.config.enable_labels,
                show_traces=self.config.enable_traces,
                show_line_zones=self.config.enable_line_zones,
                trace_length=self.config.trace_length,
                zone_polygon=source_pts
            )
            logger.info("Frame annotator initialized")

        except Exception as e:
            logger.error(f"Failed to setup video components: {e}")
            raise VideoProcessingError(f"Error setting up video processing: {e}") from e

    def _process_single_frame(self, frame: np.ndarray) -> tuple:
        """
        Process a single video frame.

        Args:
            frame: Input video frame (BGR image array).

        Returns:
            Tuple of (annotated_frame, detections).
        """
        # FIX: pass conf/iou at predict time — the only way the thresholds
        # take effect with the Ultralytics YOLOv8 API.
        results = self.model(
            frame,
            conf=self.config.confidence_threshold,
            iou=self.config.iou_threshold,
            verbose=False,
        )[0]
        detections = sv.Detections.from_ultralytics(results)

        # Associate detections with persistent tracker IDs across frames.
        detections = self.tracker.update_with_detections(detections)

        # Update in/out counts for tracks crossing the counting line.
        self.line_zone.trigger(detections)

        # Attach per-track speed estimates (stored in detections.data).
        detections = self.speed_estimator.estimate(detections)

        labels = self._create_labels(detections)

        annotated_frame = self.annotator.draw_annotations(
            frame=frame,
            detections=detections,
            labels=labels,
            line_zones=[self.line_zone]
        )

        return annotated_frame, detections

    def _create_labels(self, detections: sv.Detections) -> list:
        """
        Create display labels ("<class> #<id> <speed>") for tracked vehicles.

        Args:
            detections: Detection results (must carry tracker IDs).

        Returns:
            List of label strings; empty if no tracker IDs are present.
        """
        labels = []

        # Untracked detections get no labels at all.
        if not hasattr(detections, 'tracker_id') or detections.tracker_id is None:
            return labels

        for idx, tracker_id in enumerate(detections.tracker_id):
            class_name = "Vehicle"
            if "class_name" in detections.data:
                class_name = detections.data["class_name"][idx]

            # Only show a speed once the estimator has produced a positive
            # value (it needs a few frames of history first).
            speed_text = ""
            if "speed" in detections.data:
                speed = detections.data["speed"][idx]
                if speed > 0:
                    speed_text = f" {speed:.0f}{self.config.speed_unit}"

            label = f"{class_name} #{tracker_id}{speed_text}"
            labels.append(label)

        return labels

    def process_video(
        self,
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> Dict:
        """
        Process the entire video.

        Args:
            progress_callback: Optional callback receiving progress in [0, 1].

        Returns:
            Dictionary with processing statistics (counts, speed stats,
            frames processed, wall-clock time, effective fps).

        Raises:
            VideoProcessingError: If video processing fails.
        """
        start_time = time()

        try:
            if not Path(self.config.input_video).exists():
                raise VideoProcessingError(
                    f"Input video not found: {self.config.input_video}"
                )

            self._setup_video_components(self.config.input_video)

            # Ensure the output directory exists before the sink opens it.
            output_path = Path(self.config.output_video)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            frame_count = 0
            total_frames = self.video_info.total_frames or 0
            all_speeds = []

            if self.config.display_enabled:
                cv2.namedWindow(self.config.window_name, cv2.WINDOW_NORMAL)
                cv2.resizeWindow(
                    self.config.window_name,
                    self.video_info.width,
                    self.video_info.height
                )

            logger.info("Starting video processing...")
            frame_generator = sv.get_video_frames_generator(self.config.input_video)

            # FIX: try/finally guarantees the preview window is destroyed
            # even if an exception escapes the processing loop.
            try:
                with sv.VideoSink(self.config.output_video, self.video_info) as sink:
                    for frame in frame_generator:
                        try:
                            annotated_frame, detections = \
                                self._process_single_frame(frame)

                            # Collect positive speeds for summary statistics.
                            if "speed" in detections.data:
                                speeds = detections.data["speed"]
                                all_speeds.extend([s for s in speeds if s > 0])

                            sink.write_frame(annotated_frame)

                            # FIX: keep all HighGUI calls behind the display
                            # flag; without a window, waitKey/getWindowProperty
                            # would raise every frame and silently skip the
                            # frame counter below.
                            if self.config.display_enabled:
                                cv2.imshow(self.config.window_name, annotated_frame)

                                # 'q' key stops processing early.
                                if cv2.waitKey(1) & 0xFF == ord('q'):
                                    logger.info("Processing interrupted by user")
                                    break

                                # Closing the window also stops processing.
                                if cv2.getWindowProperty(
                                    self.config.window_name,
                                    cv2.WND_PROP_VISIBLE
                                ) < 1:
                                    logger.info("Window closed by user")
                                    break

                            frame_count += 1
                            if progress_callback and total_frames > 0:
                                progress = frame_count / total_frames
                                progress_callback(progress)

                        except Exception as e:
                            # Best-effort: skip a bad frame rather than abort
                            # the whole run.
                            logger.warning(f"Error processing frame {frame_count}: {e}")
                            continue
            finally:
                if self.config.display_enabled:
                    cv2.destroyAllWindows()

            processing_time = time() - start_time
            stats = {
                'total_count': self.line_zone.in_count + self.line_zone.out_count,
                'in_count': self.line_zone.in_count,
                'out_count': self.line_zone.out_count,
                'avg_speed': np.mean(all_speeds) if all_speeds else 0.0,
                'max_speed': np.max(all_speeds) if all_speeds else 0.0,
                'min_speed': np.min(all_speeds) if all_speeds else 0.0,
                'frames_processed': frame_count,
                'processing_time': processing_time,
                'fps': frame_count / processing_time if processing_time > 0 else 0
            }

            logger.info(f"Processing complete: {frame_count} frames in {processing_time:.2f}s")
            logger.info(f"Vehicles counted: {stats['total_count']} (In: {stats['in_count']}, Out: {stats['out_count']})")

            return stats

        except Exception as e:
            logger.error(f"Video processing failed: {e}", exc_info=True)
            raise VideoProcessingError(f"Failed to process video: {e}") from e
|
|
|
|
|
def process_video(
    config: VehicleDetectionConfig,
    progress_callback: Optional[Callable[[float], None]] = None
) -> Dict:
    """
    Run the full detection pipeline on a video in one call.

    Builds a VehicleDetectionPipeline from *config* and processes the
    configured input video end to end.

    Args:
        config: Configuration object describing the run.
        progress_callback: Optional callable invoked with progress in [0, 1].

    Returns:
        Dictionary of processing statistics from the pipeline.
    """
    return VehicleDetectionPipeline(config).process_video(progress_callback)
|
|
|
|
|
def main():
    """
    Main entry point for CLI usage.

    Builds the default configuration, runs the pipeline, and prints a
    human-readable results summary to stdout.

    Returns:
        Process exit code: 0 on success, 1 on interruption or failure.
    """
    try:
        logger.info("=" * 60)
        logger.info("Vehicle Speed Estimation & Counting System")
        logger.info("=" * 60)

        # Default configuration; all parameters come from config.py.
        config = VehicleDetectionConfig()
        logger.info(f"Configuration: {config}")

        # Run the full detection/tracking/counting/speed pipeline.
        stats = process_video(config)

        # Summary on stdout (detailed progress goes to the log).
        print("\n" + "=" * 60)
        print("PROCESSING RESULTS")
        print("=" * 60)
        print(f"Output saved to: {config.output_video}")
        # FIX: dropped extraneous f-prefixes (f-strings without placeholders,
        # ruff F541); printed text is unchanged.
        print("\nVehicle Count:")
        print(f"  Total: {stats['total_count']}")
        print(f"  In: {stats['in_count']}")
        print(f"  Out: {stats['out_count']}")
        print(f"\nSpeed Statistics ({config.speed_unit}):")
        print(f"  Average: {stats['avg_speed']:.1f}")
        print(f"  Maximum: {stats['max_speed']:.1f}")
        print(f"  Minimum: {stats['min_speed']:.1f}")
        print("\nProcessing Info:")
        print(f"  Frames: {stats['frames_processed']}")
        print(f"  Time: {stats['processing_time']:.2f}s")
        print(f"  FPS: {stats['fps']:.1f}")
        print("=" * 60)

        return 0

    except KeyboardInterrupt:
        logger.info("Processing interrupted by user")
        return 1
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        print(f"\n❌ Error: {e}", file=sys.stderr)
        return 1
|
|
|
|
|
| if __name__ == "__main__":
|
| sys.exit(main())
|
|
|