""" YOLOv8 Crowd Detection Module - Enhanced for Small Objects Implements REQ-1, REQ-2, REQ-3: Person detection with bounding boxes and counting """ import cv2 import numpy as np from ultralytics import YOLO import time from typing import List, Tuple, Dict import logging logger = logging.getLogger(__name__) class CrowdDetector: """ Main crowd detection class using YOLOv8 - Enhanced for small objects Satisfies SRS Requirements: - REQ-1: Detect individuals in video frames using pre-trained model - REQ-2: Display bounding boxes around detected individuals - REQ-3: Update count continuously as frames are processed """ def __init__(self, config: Dict): """ Initialize the YOLOv8 detector Args: config: Configuration dictionary from config.yaml """ self.config = config self.model_name = config['model']['name'] self.confidence_threshold = config['model']['confidence_threshold'] self.iou_threshold = config['model']['iou_threshold'] self.device = config['model']['device'] self.class_filter = config['model']['class_filter'] self.min_size = config['crowd']['min_detection_size'] # Optimization parameters self.small_object_mode = config['model'].get('small_object_mode', True) self.imgsz = 416 # Lower resolution for faster TensorRT inference # Dynamic mode parameters (can be updated via API) self.max_det = 300 # Default max detections self.second_pass_conf = 0.05 # Default second pass confidence self.duplicate_threshold = 30 # Default duplicate detection threshold self.min_box_size = 5 # Default minimum box size # Performance tracking from collections import deque self.frame_times = deque(maxlen=30) # Keep last 30 frame times self.detection_count = 0 self.frame_count = 0 # Track frames for logging throttle logger.info(f"Initializing YOLOv8 Detector with model: {self.model_name}") logger.info(f"Device: {self.device}, Confidence: {self.confidence_threshold}") logger.info(f"Small object mode: {self.small_object_mode}") # Check for TensorRT optimized model first tensorrt_model = self.model_name.replace('.pt', '.engine') use_tensorrt = False try: import os if os.path.exists(tensorrt_model): logger.info(f"Loading TensorRT optimized model: {tensorrt_model}") self.model = YOLO(tensorrt_model) use_tensorrt = True else: logger.info(f"Loading PyTorch model: {self.model_name}") logger.info(f"TIP: Export to TensorRT for 2-3x speedup: yolo export model={self.model_name} format=engine half=True device=0") self.model = YOLO(self.model_name) self.model.to(self.device) use_tensorrt = False logger.info("YOLOv8 model loaded successfully") logger.info(f"*** USING {'TensorRT ENGINE' if use_tensorrt else 'PyTorch FP16'} for inference ***") # GPU Warmup - run dummy inference to compile CUDA kernels logger.info("Warming up GPU (this may take a few seconds)...") dummy_frame = np.zeros((416, 416, 3), dtype=np.uint8) for _ in range(5): # Run 5 warmup passes for better optimization self.model(dummy_frame, conf=0.5, verbose=False, device=self.device, half=(self.device != "cpu"), imgsz=self.imgsz) logger.info("GPU warmup complete - ready for fast inference") except Exception as e: logger.error(f"Failed to load YOLOv8 model: {e}") raise def preprocess_frame(self, frame: np.ndarray) -> np.ndarray: """ Light preprocessing - only applied if needed Returns frame as-is for speed (YOLO handles normalization) """ return frame # Skip preprocessing for speed def detect_additional_pass(self, frame: np.ndarray, existing_detections: List[Dict]) -> List[Dict]: """ Additional detection pass with very low confidence for missed small objects """ try: # Second pass with lower confidence threshold for small/distant objects results = self.model( frame, conf=self.second_pass_conf, # Use dynamic second pass confidence iou=self.iou_threshold, classes=self.class_filter, verbose=False, imgsz=self.imgsz, device=self.device, half=(self.device != "cpu") ) additional_detections = [] existing_centers = [(d['center'][0], d['center'][1]) for d in existing_detections] for result in results: boxes = result.boxes for box in boxes: x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() confidence = float(box.conf[0].cpu().numpy()) class_id = int(box.cls[0].cpu().numpy()) width = x2 - x1 height = y2 - y1 center_x = int((x1 + x2) / 2) center_y = int((y1 + y2) / 2) # Filter out very small noise using dynamic min_box_size if width < self.min_box_size or height < self.min_box_size: continue # Check if this is a duplicate (near existing detection) is_duplicate = False for ex, ey in existing_centers: distance = ((center_x - ex)**2 + (center_y - ey)**2)**0.5 if distance < self.duplicate_threshold: # Use dynamic threshold is_duplicate = True break if not is_duplicate: detection = { 'bbox': [int(x1), int(y1), int(x2), int(y2)], 'confidence': confidence, 'class_id': class_id, 'class_name': 'person', 'center': [center_x, center_y], 'size': 'tiny' if (width < 10 or height < 10) else ('small' if (width < 50 or height < 50) else 'normal') } additional_detections.append(detection) return additional_detections except Exception as e: logger.error(f"Additional detection pass error: {e}") return [] def detect(self, frame: np.ndarray, resize_factor: float = 1.0, confidence_threshold: float = None) -> Tuple[List[Dict], int, float]: """ Detect people in the frame using YOLOv8 with TensorRT Args: frame: Input frame (BGR format from OpenCV) resize_factor: Ignored - always uses full resolution for best accuracy confidence_threshold: Optional override for detection threshold Returns: detections: List of all detected people (primary + second pass) count: Total number of people detected processing_time: Time taken for detection """ start_time = time.time() detections = [] if confidence_threshold is None: confidence_threshold = self.confidence_threshold try: # Primary detection with CUDA using configured device results = self.model( frame, conf=confidence_threshold, iou=self.iou_threshold, classes=self.class_filter, verbose=False, imgsz=self.imgsz, device=self.device, half=(self.device != "cpu"), # FP16 inference for 2x speedup on RTX 3050 max_det=self.max_det, # Use dynamic max detections agnostic_nms=False, retina_masks=False # Disable for speed ) # Extract primary detections for result in results: boxes = result.boxes for box in boxes: x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() confidence = float(box.conf[0].cpu().numpy()) class_id = int(box.cls[0].cpu().numpy()) width = x2 - x1 height = y2 - y1 # Filter by minimum size to remove noise if width >= self.min_size and height >= self.min_size: detection = { 'bbox': [int(x1), int(y1), int(x2), int(y2)], 'confidence': confidence, 'class_id': class_id, 'class_name': 'person', 'center': [int((x1 + x2) / 2), int((y1 + y2) / 2)], 'size': 'tiny' if (width < 10 or height < 10) else ('small' if (width < 50 or height < 50) else 'normal') } detections.append(detection) # Second pass for better small object detection if self.small_object_mode: additional = self.detect_additional_pass(frame, detections) detections.extend(additional) processing_time = time.time() - start_time self.frame_times.append(processing_time) self.detection_count += len(detections) self.frame_count += 1 count = len(detections) # Log detection count occasionally (every 30 frames) to avoid log spam if self.frame_count % 30 == 0 or count > 0: logger.debug(f"Detected {count} people in {processing_time:.3f}s (frame {self.frame_count})") return detections, count, processing_time except Exception as e: logger.error(f"Detection error: {e}") import traceback logger.error(traceback.format_exc()) return [], 0, 0.0 def _calculate_iou(self, box1: List[int], box2: List[int]) -> float: """ Calculate Intersection over Union for two bounding boxes with edge case handling Args: box1: [x1, y1, x2, y2] box2: [x1, y1, x2, y2] Returns: IoU value between 0.0 and 1.0 """ # Calculate intersection area x1_inter = max(box1[0], box2[0]) y1_inter = max(box1[1], box2[1]) x2_inter = min(box1[2], box2[2]) y2_inter = min(box1[3], box2[3]) inter_area = max(0, x2_inter - x1_inter) * max(0, y2_inter - y1_inter) # Calculate union area box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) # Handle edge cases if box1_area <= 0 or box2_area <= 0: return 0.0 union_area = box1_area + box2_area - inter_area # Avoid division by zero if union_area <= 0: return 0.0 return inter_area / union_area def draw_detections(self, frame: np.ndarray, detections: List[Dict], show_confidence: bool = True) -> np.ndarray: """ Draw bounding boxes with high visibility for crowd detection Color-coded by confidence level Args: frame: Input frame detections: List of detections show_confidence: Whether to display confidence scores Returns: frame: Frame with drawn bounding boxes """ frame_copy = frame.copy() for i, det in enumerate(detections): x1, y1, x2, y2 = det['bbox'] confidence = det['confidence'] is_small = det.get('size') == 'small' # Color by confidence: Green (high) -> Yellow (medium) -> Orange (low) if confidence >= 0.5: color = (0, 255, 0) # Green - high confidence elif confidence >= 0.25: color = (0, 255, 255) # Yellow - medium elif confidence >= 0.15: color = (0, 165, 255) # Orange - lower else: color = (0, 128, 255) # Light orange - very low thickness = 1 if is_small else 2 # Draw bounding box cv2.rectangle(frame_copy, (x1, y1), (x2, y2), color, thickness) # Draw center dot for all detections center_x, center_y = det['center'] cv2.circle(frame_copy, (center_x, center_y), 2, color, -1) # Draw prominent count display in top-left corner count = len(detections) count_text = f"PEOPLE: {count}" # Background box for count (text_w, text_h), _ = cv2.getTextSize(count_text, cv2.FONT_HERSHEY_SIMPLEX, 1.0, 2) cv2.rectangle(frame_copy, (5, 5), (text_w + 15, text_h + 15), (0, 0, 0), -1) cv2.rectangle(frame_copy, (5, 5), (text_w + 15, text_h + 15), (0, 255, 0), 2) # Count text cv2.putText(frame_copy, count_text, (10, text_h + 8), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2) return frame_copy def get_statistics(self) -> Dict: """ Get detection statistics Returns: stats: Dictionary with performance metrics """ if not self.frame_times: return { 'avg_processing_time': 0.0, 'fps': 0.0, 'total_detections': 0, 'frames_processed': 0 } avg_time = np.mean(self.frame_times[-100:]) # Last 100 frames fps = 1.0 / avg_time if avg_time > 0 else 0.0 return { 'avg_processing_time': avg_time, 'fps': fps, 'total_detections': self.detection_count, 'frames_processed': len(self.frame_times), 'max_processing_time': max(self.frame_times) if self.frame_times else 0.0, 'min_processing_time': min(self.frame_times) if self.frame_times else 0.0 } def reset_statistics(self): """Reset detection statistics""" from collections import deque self.frame_times = deque(maxlen=30) self.detection_count = 0 self.frame_count = 0 logger.info("Detection statistics reset")