|
|
| """
|
| Video Capture Module for AI Processing
|
| Captures frames from webcam or RTSP stream and preprocesses them for YOLOv8
|
| Uses motion detection (MOG2) to extract ROIs for faster inference
|
| """
|
|
|
| import cv2
|
| import numpy as np
|
| import platform
|
| from typing import Optional, Tuple, Generator, List
|
| from dataclasses import dataclass
|
|
|
|
|
@dataclass(eq=False)
class ROI:
    """Region of Interest containing motion.

    ``x``/``y``/``width``/``height`` describe the box in the coordinates of
    the frame the region was cut from. ``cropped_frame`` holds the pixels of
    that box and ``preprocessed`` holds the version prepared for inference.

    Equality is implemented by hand: the dataclass-generated ``__eq__``
    compares fields as a tuple, which raises ``ValueError`` as soon as two
    distinct multi-element arrays have to be compared.
    """

    x: int
    y: int
    width: int
    height: int
    cropped_frame: np.ndarray
    preprocessed: np.ndarray

    def __eq__(self, other):
        """Return True when both boxes and both image arrays match.

        Defining ``__eq__`` without ``__hash__`` keeps instances unhashable,
        exactly as the default ``@dataclass`` behaviour did.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented
        same_box = (
            (self.x, self.y, self.width, self.height)
            == (other.x, other.y, other.width, other.height)
        )
        # np.array_equal yields a single bool and tolerates shape mismatches.
        return bool(
            same_box
            and np.array_equal(self.cropped_frame, other.cropped_frame)
            and np.array_equal(self.preprocessed, other.preprocessed)
        )
|
|
|
|
|
| class MotionDetector:
|
| """Detects motion using MOG2 background subtraction."""
|
|
|
| def __init__(self,
|
| history: int = 500,
|
| var_threshold: float = 16,
|
| detect_shadows: bool = True,
|
| min_contour_area: int = 500,
|
| merge_distance: int = 50):
|
| """
|
| Initialize MOG2 background subtractor.
|
|
|
| Args:
|
| history: Number of frames for background model
|
| var_threshold: Variance threshold for background/foreground segmentation
|
| detect_shadows: Whether to detect shadows (marks them gray vs white)
|
| min_contour_area: Minimum area (pixels) to consider as valid motion
|
| merge_distance: Distance to merge nearby contours into single ROI
|
| """
|
| self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
|
| history=history,
|
| varThreshold=var_threshold,
|
| detectShadows=detect_shadows
|
| )
|
| self.min_contour_area = min_contour_area
|
| self.merge_distance = merge_distance
|
| self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
|
|
| def get_foreground_mask(self, frame: np.ndarray) -> np.ndarray:
|
| """
|
| Get binary mask of moving objects.
|
|
|
| Args:
|
| frame: Input BGR frame
|
|
|
| Returns:
|
| Binary mask where white = motion
|
| """
|
|
|
| fg_mask = self.bg_subtractor.apply(frame)
|
|
|
|
|
| _, fg_mask = cv2.threshold(fg_mask, 250, 255, cv2.THRESH_BINARY)
|
|
|
|
|
| fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self.kernel)
|
| fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self.kernel)
|
| fg_mask = cv2.dilate(fg_mask, self.kernel, iterations=2)
|
|
|
| return fg_mask
|
|
|
| def _merge_bounding_boxes(self, boxes: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]:
|
| """Merge nearby bounding boxes to reduce fragmentation."""
|
| if not boxes:
|
| return []
|
|
|
| merged = []
|
| used = [False] * len(boxes)
|
|
|
| for i, (x1, y1, w1, h1) in enumerate(boxes):
|
| if used[i]:
|
| continue
|
|
|
|
|
| min_x, min_y = x1, y1
|
| max_x, max_y = x1 + w1, y1 + h1
|
| used[i] = True
|
|
|
|
|
| changed = True
|
| while changed:
|
| changed = False
|
| for j, (x2, y2, w2, h2) in enumerate(boxes):
|
| if used[j]:
|
| continue
|
|
|
|
|
| if (x2 < max_x + self.merge_distance and
|
| x2 + w2 > min_x - self.merge_distance and
|
| y2 < max_y + self.merge_distance and
|
| y2 + h2 > min_y - self.merge_distance):
|
|
|
| min_x = min(min_x, x2)
|
| min_y = min(min_y, y2)
|
| max_x = max(max_x, x2 + w2)
|
| max_y = max(max_y, y2 + h2)
|
| used[j] = True
|
| changed = True
|
|
|
| merged.append((min_x, min_y, max_x - min_x, max_y - min_y))
|
|
|
| return merged
|
|
|
| def detect_motion_regions(self, frame: np.ndarray,
|
| padding: int = 20) -> List[Tuple[int, int, int, int]]:
|
| """
|
| Detect regions with motion.
|
|
|
| Args:
|
| frame: Input BGR frame
|
| padding: Pixels to add around detected regions
|
|
|
| Returns:
|
| List of bounding boxes (x, y, width, height)
|
| """
|
| fg_mask = self.get_foreground_mask(frame)
|
|
|
|
|
| contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
|
| boxes = []
|
| h, w = frame.shape[:2]
|
|
|
| for contour in contours:
|
| area = cv2.contourArea(contour)
|
| if area < self.min_contour_area:
|
| continue
|
|
|
| x, y, bw, bh = cv2.boundingRect(contour)
|
|
|
|
|
| x = max(0, x - padding)
|
| y = max(0, y - padding)
|
| bw = min(w - x, bw + 2 * padding)
|
| bh = min(h - y, bh + 2 * padding)
|
|
|
| boxes.append((x, y, bw, bh))
|
|
|
|
|
| return self._merge_bounding_boxes(boxes)
|
|
|
|
|
| class VideoCapture:
|
| """Captures and preprocesses video frames for AI inference."""
|
|
|
|
|
| TARGET_SIZE = (640, 640)
|
|
|
| def __init__(self, source: int | str = 0, use_motion_detection: bool = True):
|
| """
|
| Initialize video capture.
|
|
|
| Args:
|
| source: Camera index (0 for default webcam) or RTSP URL string
|
| Example RTSP: "rtsp://username:password@ip_address:port/stream"
|
| use_motion_detection: Enable MOG2 motion detection for ROI extraction
|
| """
|
| self.source = self._normalize_source(source)
|
| self.cap: Optional[cv2.VideoCapture] = None
|
| self.use_motion_detection = use_motion_detection
|
| self.motion_detector: Optional[MotionDetector] = None
|
| self.active_source: Optional[int | str] = None
|
| self.active_backend: Optional[int] = None
|
|
|
| @staticmethod
|
| def _normalize_source(source: int | str) -> int | str:
|
| """Normalize source values so numeric strings map to camera indices."""
|
| if isinstance(source, str) and source.strip().isdigit():
|
| return int(source.strip())
|
| return source
|
|
|
| @staticmethod
|
| def _backend_name(backend: int) -> str:
|
| """Get a readable backend name for diagnostics."""
|
| names = {
|
| cv2.CAP_ANY: "CAP_ANY",
|
| cv2.CAP_DSHOW: "CAP_DSHOW",
|
| cv2.CAP_MSMF: "CAP_MSMF",
|
| }
|
| return names.get(backend, str(backend))
|
|
|
| def _source_candidates(self) -> List[int | str]:
|
| """Return source candidates to try opening in order."""
|
| if isinstance(self.source, int):
|
| candidates = [self.source]
|
| if self.source == 0:
|
| candidates.extend([1, 2])
|
| return candidates
|
| return [self.source]
|
|
|
| def _backend_candidates(self) -> List[int]:
|
| """Return backend candidates based on platform and source type."""
|
| if isinstance(self.source, str):
|
| return [cv2.CAP_ANY]
|
|
|
| if platform.system().lower().startswith("win"):
|
| return [cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY]
|
|
|
| return [cv2.CAP_ANY]
|
|
|
| def start(self, verbose: bool = True) -> bool:
|
| """
|
| Start the video capture.
|
|
|
| Returns:
|
| True if capture started successfully, False otherwise
|
| """
|
| self.stop()
|
|
|
| open_attempts = []
|
| for source_candidate in self._source_candidates():
|
| for backend in self._backend_candidates():
|
| cap = cv2.VideoCapture(source_candidate, backend)
|
| open_attempts.append(f"{source_candidate} via {self._backend_name(backend)}")
|
|
|
| if cap.isOpened():
|
| self.cap = cap
|
| self.active_source = source_candidate
|
| self.active_backend = backend
|
| break
|
|
|
| cap.release()
|
|
|
| if self.cap is not None:
|
| break
|
|
|
| if self.cap is None:
|
| if verbose:
|
| print(f"Error: Could not open video source: {self.source}")
|
| print("Tried:")
|
| for attempt in open_attempts:
|
| print(f" - {attempt}")
|
| return False
|
|
|
|
|
| self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
|
| if self.use_motion_detection:
|
| self.motion_detector = MotionDetector()
|
| if verbose:
|
| print("Motion detection enabled (MOG2)")
|
|
|
|
|
| width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
| height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
| fps = self.cap.get(cv2.CAP_PROP_FPS)
|
| if verbose:
|
| backend_name = self._backend_name(self.active_backend) if self.active_backend is not None else "Unknown"
|
| print(f"Video capture started: source={self.active_source}, backend={backend_name}")
|
| print(f"Resolution: {width}x{height} @ {fps:.1f} FPS")
|
|
|
| return True
|
|
|
| def stop(self):
|
| """Release the video capture resources."""
|
| if self.cap is not None:
|
| self.cap.release()
|
| self.cap = None
|
| self.active_source = None
|
| self.active_backend = None
|
| print("Video capture stopped")
|
|
|
| def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
|
| """
|
| Preprocess frame for YOLOv8 inference.
|
|
|
| Args:
|
| frame: Raw BGR frame from camera
|
|
|
| Returns:
|
| Preprocessed frame resized to 640x640
|
| """
|
|
|
| resized = cv2.resize(frame, self.TARGET_SIZE, interpolation=cv2.INTER_LINEAR)
|
| return resized
|
|
|
| def read_frame(self) -> Tuple[bool, Optional[np.ndarray], Optional[np.ndarray]]:
|
| """
|
| Read and preprocess a single frame.
|
|
|
| Returns:
|
| Tuple of (success, original_frame, preprocessed_frame)
|
| """
|
| if self.cap is None:
|
| return False, None, None
|
|
|
| ret, frame = self.cap.read()
|
|
|
| if not ret or frame is None:
|
| return False, None, None
|
|
|
| preprocessed = self.preprocess_frame(frame)
|
| return True, frame, preprocessed
|
|
|
| def extract_rois(self, frame: np.ndarray) -> List[ROI]:
|
| """
|
| Extract regions of interest (moving objects) from frame.
|
|
|
| Args:
|
| frame: Input BGR frame
|
|
|
| Returns:
|
| List of ROI objects containing cropped and preprocessed regions
|
| """
|
| if self.motion_detector is None:
|
|
|
| preprocessed = self.preprocess_frame(frame)
|
| return [ROI(0, 0, frame.shape[1], frame.shape[0], frame, preprocessed)]
|
|
|
| boxes = self.motion_detector.detect_motion_regions(frame)
|
|
|
| if not boxes:
|
| return []
|
|
|
| rois = []
|
| for x, y, w, h in boxes:
|
| cropped = frame[y:y+h, x:x+w]
|
| preprocessed = cv2.resize(cropped, self.TARGET_SIZE, interpolation=cv2.INTER_LINEAR)
|
| rois.append(ROI(x, y, w, h, cropped, preprocessed))
|
|
|
| return rois
|
|
|
| def read_frame_with_rois(self) -> Tuple[bool, Optional[np.ndarray], List[ROI], Optional[np.ndarray]]:
|
| """
|
| Read frame and extract ROIs for motion regions.
|
|
|
| Returns:
|
| Tuple of (success, original_frame, list_of_rois, foreground_mask)
|
| """
|
| if self.cap is None:
|
| return False, None, [], None
|
|
|
| ret, frame = self.cap.read()
|
|
|
| if not ret or frame is None:
|
| return False, None, [], None
|
|
|
| rois = self.extract_rois(frame)
|
|
|
|
|
| fg_mask = None
|
| if self.motion_detector is not None:
|
| fg_mask = self.motion_detector.get_foreground_mask(frame)
|
|
|
| return True, frame, rois, fg_mask
|
|
|
| def stream_frames(self) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
|
| """
|
| Generator that continuously yields frames (no motion detection).
|
|
|
| Yields:
|
| Tuple of (original_frame, preprocessed_frame)
|
| """
|
| while True:
|
| success, original, preprocessed = self.read_frame()
|
| if not success:
|
| break
|
| yield original, preprocessed
|
|
|
| def stream_rois(self) -> Generator[Tuple[np.ndarray, List[ROI], Optional[np.ndarray]], None, None]:
|
| """
|
| Generator that yields frames with motion-detected ROIs.
|
|
|
| Yields:
|
| Tuple of (original_frame, list_of_rois, foreground_mask)
|
| """
|
| while True:
|
| success, original, rois, fg_mask = self.read_frame_with_rois()
|
| if not success:
|
| break
|
| yield original, rois, fg_mask
|
|
|
| def __enter__(self):
|
| """Context manager entry."""
|
| if not self.start():
|
| raise RuntimeError(f"Could not open video source: {self.source}")
|
| return self
|
|
|
| def __exit__(self, exc_type, exc_val, exc_tb):
|
| """Context manager exit."""
|
| self.stop()
|
|
|
|
|
def main():
    """Demo: Capture frames with motion detection from webcam.

    Opens camera index 0, draws every motion ROI on a copy of the frame and
    shows it together with the foreground mask until 'q' is pressed or the
    stream ends.

    Raises:
        RuntimeError: If the video source could not be opened
    """
    source = 0
    green = (0, 255, 0)

    try:
        with VideoCapture(source, use_motion_detection=True) as capture:
            print("Press 'q' to quit")
            print("Motion detection active - only moving regions will be processed")

            for original, rois, fg_mask in capture.stream_rois():
                # Draw on a copy so the captured frame stays untouched.
                display_frame = original.copy()

                for i, roi in enumerate(rois):
                    cv2.rectangle(display_frame,
                                  (roi.x, roi.y),
                                  (roi.x + roi.width, roi.y + roi.height),
                                  green, 2)

                    label = f"ROI {i+1}: {roi.width}x{roi.height}"
                    # Clamp the baseline so labels of ROIs touching the top
                    # edge are not drawn outside the frame.
                    label_y = max(roi.y - 10, 15)
                    cv2.putText(display_frame, label, (roi.x, label_y),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 1)

                info = f"Motion ROIs: {len(rois)} | Press 'q' to quit"
                cv2.putText(display_frame, info, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

                cv2.imshow("Video Capture - Motion Detection", display_frame)

                if fg_mask is not None:
                    cv2.imshow("Foreground Mask", fg_mask)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        # Close the preview windows even when capture or drawing raised.
        cv2.destroyAllWindows()

    print("Done!")
|
|
|
|
|
# Run the webcam demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|