""" Video Capture Module for AI Processing Captures frames from webcam or RTSP stream and preprocesses them for YOLOv8 Uses motion detection (MOG2) to extract ROIs for faster inference """ import cv2 import numpy as np import platform from typing import Optional, Tuple, Generator, List from dataclasses import dataclass @dataclass class ROI: """Region of Interest containing motion.""" x: int y: int width: int height: int cropped_frame: np.ndarray # Original cropped region preprocessed: np.ndarray # Resized to 640x640 for YOLO class MotionDetector: """Detects motion using MOG2 background subtraction.""" def __init__(self, history: int = 500, var_threshold: float = 16, detect_shadows: bool = True, min_contour_area: int = 500, merge_distance: int = 50): """ Initialize MOG2 background subtractor. Args: history: Number of frames for background model var_threshold: Variance threshold for background/foreground segmentation detect_shadows: Whether to detect shadows (marks them gray vs white) min_contour_area: Minimum area (pixels) to consider as valid motion merge_distance: Distance to merge nearby contours into single ROI """ self.bg_subtractor = cv2.createBackgroundSubtractorMOG2( history=history, varThreshold=var_threshold, detectShadows=detect_shadows ) self.min_contour_area = min_contour_area self.merge_distance = merge_distance self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) def get_foreground_mask(self, frame: np.ndarray) -> np.ndarray: """ Get binary mask of moving objects. Args: frame: Input BGR frame Returns: Binary mask where white = motion """ # Apply background subtraction fg_mask = self.bg_subtractor.apply(frame) # Remove shadows (gray pixels become black) _, fg_mask = cv2.threshold(fg_mask, 250, 255, cv2.THRESH_BINARY) # Morphological operations to clean up noise fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self.kernel) fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self.kernel) fg_mask = cv2.dilate(fg_mask, self.kernel, iterations=2) return fg_mask def _merge_bounding_boxes(self, boxes: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]: """Merge nearby bounding boxes to reduce fragmentation.""" if not boxes: return [] merged = [] used = [False] * len(boxes) for i, (x1, y1, w1, h1) in enumerate(boxes): if used[i]: continue # Start with current box min_x, min_y = x1, y1 max_x, max_y = x1 + w1, y1 + h1 used[i] = True # Find and merge nearby boxes changed = True while changed: changed = False for j, (x2, y2, w2, h2) in enumerate(boxes): if used[j]: continue # Check if boxes are close enough to merge if (x2 < max_x + self.merge_distance and x2 + w2 > min_x - self.merge_distance and y2 < max_y + self.merge_distance and y2 + h2 > min_y - self.merge_distance): min_x = min(min_x, x2) min_y = min(min_y, y2) max_x = max(max_x, x2 + w2) max_y = max(max_y, y2 + h2) used[j] = True changed = True merged.append((min_x, min_y, max_x - min_x, max_y - min_y)) return merged def detect_motion_regions(self, frame: np.ndarray, padding: int = 20) -> List[Tuple[int, int, int, int]]: """ Detect regions with motion. Args: frame: Input BGR frame padding: Pixels to add around detected regions Returns: List of bounding boxes (x, y, width, height) """ fg_mask = self.get_foreground_mask(frame) # Find contours of moving objects contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) boxes = [] h, w = frame.shape[:2] for contour in contours: area = cv2.contourArea(contour) if area < self.min_contour_area: continue x, y, bw, bh = cv2.boundingRect(contour) # Add padding and clamp to frame bounds x = max(0, x - padding) y = max(0, y - padding) bw = min(w - x, bw + 2 * padding) bh = min(h - y, bh + 2 * padding) boxes.append((x, y, bw, bh)) # Merge nearby boxes return self._merge_bounding_boxes(boxes) class VideoCapture: """Captures and preprocesses video frames for AI inference.""" # YOLOv8 native input size TARGET_SIZE = (640, 640) def __init__(self, source: int | str = 0, use_motion_detection: bool = True): """ Initialize video capture. Args: source: Camera index (0 for default webcam) or RTSP URL string Example RTSP: "rtsp://username:password@ip_address:port/stream" use_motion_detection: Enable MOG2 motion detection for ROI extraction """ self.source = self._normalize_source(source) self.cap: Optional[cv2.VideoCapture] = None self.use_motion_detection = use_motion_detection self.motion_detector: Optional[MotionDetector] = None self.active_source: Optional[int | str] = None self.active_backend: Optional[int] = None @staticmethod def _normalize_source(source: int | str) -> int | str: """Normalize source values so numeric strings map to camera indices.""" if isinstance(source, str) and source.strip().isdigit(): return int(source.strip()) return source @staticmethod def _backend_name(backend: int) -> str: """Get a readable backend name for diagnostics.""" names = { cv2.CAP_ANY: "CAP_ANY", cv2.CAP_DSHOW: "CAP_DSHOW", cv2.CAP_MSMF: "CAP_MSMF", } return names.get(backend, str(backend)) def _source_candidates(self) -> List[int | str]: """Return source candidates to try opening in order.""" if isinstance(self.source, int): candidates = [self.source] if self.source == 0: candidates.extend([1, 2]) return candidates return [self.source] def _backend_candidates(self) -> List[int]: """Return backend candidates based on platform and source type.""" if isinstance(self.source, str): return [cv2.CAP_ANY] if platform.system().lower().startswith("win"): return [cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY] return [cv2.CAP_ANY] def start(self, verbose: bool = True) -> bool: """ Start the video capture. Returns: True if capture started successfully, False otherwise """ self.stop() open_attempts = [] for source_candidate in self._source_candidates(): for backend in self._backend_candidates(): cap = cv2.VideoCapture(source_candidate, backend) open_attempts.append(f"{source_candidate} via {self._backend_name(backend)}") if cap.isOpened(): self.cap = cap self.active_source = source_candidate self.active_backend = backend break cap.release() if self.cap is not None: break if self.cap is None: if verbose: print(f"Error: Could not open video source: {self.source}") print("Tried:") for attempt in open_attempts: print(f" - {attempt}") return False # Set buffer size to minimize latency (useful for RTSP streams) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Initialize motion detector if enabled if self.use_motion_detection: self.motion_detector = MotionDetector() if verbose: print("Motion detection enabled (MOG2)") # Print capture info width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = self.cap.get(cv2.CAP_PROP_FPS) if verbose: backend_name = self._backend_name(self.active_backend) if self.active_backend is not None else "Unknown" print(f"Video capture started: source={self.active_source}, backend={backend_name}") print(f"Resolution: {width}x{height} @ {fps:.1f} FPS") return True def stop(self): """Release the video capture resources.""" if self.cap is not None: self.cap.release() self.cap = None self.active_source = None self.active_backend = None print("Video capture stopped") def preprocess_frame(self, frame: np.ndarray) -> np.ndarray: """ Preprocess frame for YOLOv8 inference. Args: frame: Raw BGR frame from camera Returns: Preprocessed frame resized to 640x640 """ # Resize to YOLOv8 native size (640x640) resized = cv2.resize(frame, self.TARGET_SIZE, interpolation=cv2.INTER_LINEAR) return resized def read_frame(self) -> Tuple[bool, Optional[np.ndarray], Optional[np.ndarray]]: """ Read and preprocess a single frame. Returns: Tuple of (success, original_frame, preprocessed_frame) """ if self.cap is None: return False, None, None ret, frame = self.cap.read() if not ret or frame is None: return False, None, None preprocessed = self.preprocess_frame(frame) return True, frame, preprocessed def extract_rois(self, frame: np.ndarray) -> List[ROI]: """ Extract regions of interest (moving objects) from frame. Args: frame: Input BGR frame Returns: List of ROI objects containing cropped and preprocessed regions """ if self.motion_detector is None: # If no motion detection, return whole frame as single ROI preprocessed = self.preprocess_frame(frame) return [ROI(0, 0, frame.shape[1], frame.shape[0], frame, preprocessed)] boxes = self.motion_detector.detect_motion_regions(frame) if not boxes: return [] rois = [] for x, y, w, h in boxes: cropped = frame[y:y+h, x:x+w] preprocessed = cv2.resize(cropped, self.TARGET_SIZE, interpolation=cv2.INTER_LINEAR) rois.append(ROI(x, y, w, h, cropped, preprocessed)) return rois def read_frame_with_rois(self) -> Tuple[bool, Optional[np.ndarray], List[ROI], Optional[np.ndarray]]: """ Read frame and extract ROIs for motion regions. Returns: Tuple of (success, original_frame, list_of_rois, foreground_mask) """ if self.cap is None: return False, None, [], None ret, frame = self.cap.read() if not ret or frame is None: return False, None, [], None rois = self.extract_rois(frame) # Get foreground mask for visualization fg_mask = None if self.motion_detector is not None: fg_mask = self.motion_detector.get_foreground_mask(frame) return True, frame, rois, fg_mask def stream_frames(self) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]: """ Generator that continuously yields frames (no motion detection). Yields: Tuple of (original_frame, preprocessed_frame) """ while True: success, original, preprocessed = self.read_frame() if not success: break yield original, preprocessed def stream_rois(self) -> Generator[Tuple[np.ndarray, List[ROI], Optional[np.ndarray]], None, None]: """ Generator that yields frames with motion-detected ROIs. Yields: Tuple of (original_frame, list_of_rois, foreground_mask) """ while True: success, original, rois, fg_mask = self.read_frame_with_rois() if not success: break yield original, rois, fg_mask def __enter__(self): """Context manager entry.""" if not self.start(): raise RuntimeError(f"Could not open video source: {self.source}") return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.stop() def main(): """Demo: Capture frames with motion detection from webcam.""" # Use 0 for default webcam, or provide RTSP URL for IP camera # Example RTSP: "rtsp://admin:password@192.168.1.100:554/stream1" source = 0 with VideoCapture(source, use_motion_detection=True) as capture: print("Press 'q' to quit") print("Motion detection active - only moving regions will be processed") for original, rois, fg_mask in capture.stream_rois(): # Draw bounding boxes around motion regions display_frame = original.copy() for i, roi in enumerate(rois): # Draw green rectangle around ROI cv2.rectangle(display_frame, (roi.x, roi.y), (roi.x + roi.width, roi.y + roi.height), (0, 255, 0), 2) # Label with ROI index and size label = f"ROI {i+1}: {roi.width}x{roi.height}" cv2.putText(display_frame, label, (roi.x, roi.y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1) # Show info info = f"Motion ROIs: {len(rois)} | Press 'q' to quit" cv2.putText(display_frame, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) cv2.imshow("Video Capture - Motion Detection", display_frame) # Show foreground mask if fg_mask is not None: cv2.imshow("Foreground Mask", fg_mask) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyAllWindows() print("Done!") if __name__ == "__main__": main()