# Provenance (repository page residue): uploaded by itsluckysharma01 - "Upload 22 files", commit 0a84654 (verified)
"""
Video Capture Module for AI Processing
Captures frames from webcam or RTSP stream and preprocesses them for YOLOv8
Uses motion detection (MOG2) to extract ROIs for faster inference
"""
import cv2
import numpy as np
import platform
from typing import Optional, Tuple, Generator, List
from dataclasses import dataclass
@dataclass
class ROI:
    """Region of Interest containing motion.

    The box is expressed as a top-left corner plus a size, in pixel
    coordinates of the frame the region was cut from.
    """

    x: int  # Left edge of the region in the source frame
    y: int  # Top edge of the region in the source frame
    width: int  # Region width in pixels
    height: int  # Region height in pixels
    cropped_frame: np.ndarray  # Original cropped region
    preprocessed: np.ndarray  # Resized to 640x640 for YOLO
class MotionDetector:
"""Detects motion using MOG2 background subtraction."""
def __init__(self,
history: int = 500,
var_threshold: float = 16,
detect_shadows: bool = True,
min_contour_area: int = 500,
merge_distance: int = 50):
"""
Initialize MOG2 background subtractor.
Args:
history: Number of frames for background model
var_threshold: Variance threshold for background/foreground segmentation
detect_shadows: Whether to detect shadows (marks them gray vs white)
min_contour_area: Minimum area (pixels) to consider as valid motion
merge_distance: Distance to merge nearby contours into single ROI
"""
self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
history=history,
varThreshold=var_threshold,
detectShadows=detect_shadows
)
self.min_contour_area = min_contour_area
self.merge_distance = merge_distance
self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
def get_foreground_mask(self, frame: np.ndarray) -> np.ndarray:
"""
Get binary mask of moving objects.
Args:
frame: Input BGR frame
Returns:
Binary mask where white = motion
"""
# Apply background subtraction
fg_mask = self.bg_subtractor.apply(frame)
# Remove shadows (gray pixels become black)
_, fg_mask = cv2.threshold(fg_mask, 250, 255, cv2.THRESH_BINARY)
# Morphological operations to clean up noise
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self.kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self.kernel)
fg_mask = cv2.dilate(fg_mask, self.kernel, iterations=2)
return fg_mask
def _merge_bounding_boxes(self, boxes: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]:
"""Merge nearby bounding boxes to reduce fragmentation."""
if not boxes:
return []
merged = []
used = [False] * len(boxes)
for i, (x1, y1, w1, h1) in enumerate(boxes):
if used[i]:
continue
# Start with current box
min_x, min_y = x1, y1
max_x, max_y = x1 + w1, y1 + h1
used[i] = True
# Find and merge nearby boxes
changed = True
while changed:
changed = False
for j, (x2, y2, w2, h2) in enumerate(boxes):
if used[j]:
continue
# Check if boxes are close enough to merge
if (x2 < max_x + self.merge_distance and
x2 + w2 > min_x - self.merge_distance and
y2 < max_y + self.merge_distance and
y2 + h2 > min_y - self.merge_distance):
min_x = min(min_x, x2)
min_y = min(min_y, y2)
max_x = max(max_x, x2 + w2)
max_y = max(max_y, y2 + h2)
used[j] = True
changed = True
merged.append((min_x, min_y, max_x - min_x, max_y - min_y))
return merged
def detect_motion_regions(self, frame: np.ndarray,
padding: int = 20) -> List[Tuple[int, int, int, int]]:
"""
Detect regions with motion.
Args:
frame: Input BGR frame
padding: Pixels to add around detected regions
Returns:
List of bounding boxes (x, y, width, height)
"""
fg_mask = self.get_foreground_mask(frame)
# Find contours of moving objects
contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
boxes = []
h, w = frame.shape[:2]
for contour in contours:
area = cv2.contourArea(contour)
if area < self.min_contour_area:
continue
x, y, bw, bh = cv2.boundingRect(contour)
# Add padding and clamp to frame bounds
x = max(0, x - padding)
y = max(0, y - padding)
bw = min(w - x, bw + 2 * padding)
bh = min(h - y, bh + 2 * padding)
boxes.append((x, y, bw, bh))
# Merge nearby boxes
return self._merge_bounding_boxes(boxes)
class VideoCapture:
"""Captures and preprocesses video frames for AI inference."""
# YOLOv8 native input size
TARGET_SIZE = (640, 640)
def __init__(self, source: int | str = 0, use_motion_detection: bool = True):
"""
Initialize video capture.
Args:
source: Camera index (0 for default webcam) or RTSP URL string
Example RTSP: "rtsp://username:password@ip_address:port/stream"
use_motion_detection: Enable MOG2 motion detection for ROI extraction
"""
self.source = self._normalize_source(source)
self.cap: Optional[cv2.VideoCapture] = None
self.use_motion_detection = use_motion_detection
self.motion_detector: Optional[MotionDetector] = None
self.active_source: Optional[int | str] = None
self.active_backend: Optional[int] = None
@staticmethod
def _normalize_source(source: int | str) -> int | str:
"""Normalize source values so numeric strings map to camera indices."""
if isinstance(source, str) and source.strip().isdigit():
return int(source.strip())
return source
@staticmethod
def _backend_name(backend: int) -> str:
"""Get a readable backend name for diagnostics."""
names = {
cv2.CAP_ANY: "CAP_ANY",
cv2.CAP_DSHOW: "CAP_DSHOW",
cv2.CAP_MSMF: "CAP_MSMF",
}
return names.get(backend, str(backend))
def _source_candidates(self) -> List[int | str]:
"""Return source candidates to try opening in order."""
if isinstance(self.source, int):
candidates = [self.source]
if self.source == 0:
candidates.extend([1, 2])
return candidates
return [self.source]
def _backend_candidates(self) -> List[int]:
"""Return backend candidates based on platform and source type."""
if isinstance(self.source, str):
return [cv2.CAP_ANY]
if platform.system().lower().startswith("win"):
return [cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY]
return [cv2.CAP_ANY]
def start(self, verbose: bool = True) -> bool:
"""
Start the video capture.
Returns:
True if capture started successfully, False otherwise
"""
self.stop()
open_attempts = []
for source_candidate in self._source_candidates():
for backend in self._backend_candidates():
cap = cv2.VideoCapture(source_candidate, backend)
open_attempts.append(f"{source_candidate} via {self._backend_name(backend)}")
if cap.isOpened():
self.cap = cap
self.active_source = source_candidate
self.active_backend = backend
break
cap.release()
if self.cap is not None:
break
if self.cap is None:
if verbose:
print(f"Error: Could not open video source: {self.source}")
print("Tried:")
for attempt in open_attempts:
print(f" - {attempt}")
return False
# Set buffer size to minimize latency (useful for RTSP streams)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Initialize motion detector if enabled
if self.use_motion_detection:
self.motion_detector = MotionDetector()
if verbose:
print("Motion detection enabled (MOG2)")
# Print capture info
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = self.cap.get(cv2.CAP_PROP_FPS)
if verbose:
backend_name = self._backend_name(self.active_backend) if self.active_backend is not None else "Unknown"
print(f"Video capture started: source={self.active_source}, backend={backend_name}")
print(f"Resolution: {width}x{height} @ {fps:.1f} FPS")
return True
def stop(self):
"""Release the video capture resources."""
if self.cap is not None:
self.cap.release()
self.cap = None
self.active_source = None
self.active_backend = None
print("Video capture stopped")
def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
"""
Preprocess frame for YOLOv8 inference.
Args:
frame: Raw BGR frame from camera
Returns:
Preprocessed frame resized to 640x640
"""
# Resize to YOLOv8 native size (640x640)
resized = cv2.resize(frame, self.TARGET_SIZE, interpolation=cv2.INTER_LINEAR)
return resized
def read_frame(self) -> Tuple[bool, Optional[np.ndarray], Optional[np.ndarray]]:
"""
Read and preprocess a single frame.
Returns:
Tuple of (success, original_frame, preprocessed_frame)
"""
if self.cap is None:
return False, None, None
ret, frame = self.cap.read()
if not ret or frame is None:
return False, None, None
preprocessed = self.preprocess_frame(frame)
return True, frame, preprocessed
def extract_rois(self, frame: np.ndarray) -> List[ROI]:
"""
Extract regions of interest (moving objects) from frame.
Args:
frame: Input BGR frame
Returns:
List of ROI objects containing cropped and preprocessed regions
"""
if self.motion_detector is None:
# If no motion detection, return whole frame as single ROI
preprocessed = self.preprocess_frame(frame)
return [ROI(0, 0, frame.shape[1], frame.shape[0], frame, preprocessed)]
boxes = self.motion_detector.detect_motion_regions(frame)
if not boxes:
return []
rois = []
for x, y, w, h in boxes:
cropped = frame[y:y+h, x:x+w]
preprocessed = cv2.resize(cropped, self.TARGET_SIZE, interpolation=cv2.INTER_LINEAR)
rois.append(ROI(x, y, w, h, cropped, preprocessed))
return rois
def read_frame_with_rois(self) -> Tuple[bool, Optional[np.ndarray], List[ROI], Optional[np.ndarray]]:
"""
Read frame and extract ROIs for motion regions.
Returns:
Tuple of (success, original_frame, list_of_rois, foreground_mask)
"""
if self.cap is None:
return False, None, [], None
ret, frame = self.cap.read()
if not ret or frame is None:
return False, None, [], None
rois = self.extract_rois(frame)
# Get foreground mask for visualization
fg_mask = None
if self.motion_detector is not None:
fg_mask = self.motion_detector.get_foreground_mask(frame)
return True, frame, rois, fg_mask
def stream_frames(self) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
"""
Generator that continuously yields frames (no motion detection).
Yields:
Tuple of (original_frame, preprocessed_frame)
"""
while True:
success, original, preprocessed = self.read_frame()
if not success:
break
yield original, preprocessed
def stream_rois(self) -> Generator[Tuple[np.ndarray, List[ROI], Optional[np.ndarray]], None, None]:
"""
Generator that yields frames with motion-detected ROIs.
Yields:
Tuple of (original_frame, list_of_rois, foreground_mask)
"""
while True:
success, original, rois, fg_mask = self.read_frame_with_rois()
if not success:
break
yield original, rois, fg_mask
def __enter__(self):
"""Context manager entry."""
if not self.start():
raise RuntimeError(f"Could not open video source: {self.source}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.stop()
def main():
    """Demo: Capture frames with motion detection from webcam.

    Opens the default camera, draws a labelled box around every motion ROI,
    shows the annotated frame and the foreground mask, and exits when 'q' is
    pressed or the stream ends.

    Raises:
        RuntimeError: If the video source cannot be opened.
    """
    # Use 0 for default webcam, or provide RTSP URL for IP camera
    # Example RTSP: "rtsp://admin:password@192.168.1.100:554/stream1"
    source = 0

    with VideoCapture(source, use_motion_detection=True) as capture:
        print("Press 'q' to quit")
        print("Motion detection active - only moving regions will be processed")

        try:
            for original, rois, fg_mask in capture.stream_rois():
                # Draw bounding boxes around motion regions
                display_frame = original.copy()

                for i, roi in enumerate(rois):
                    # Draw green rectangle around ROI
                    cv2.rectangle(display_frame,
                                  (roi.x, roi.y),
                                  (roi.x + roi.width, roi.y + roi.height),
                                  (0, 255, 0), 2)

                    # Label with ROI index and size. putText anchors at the
                    # text's bottom-left corner, so when there is no room
                    # above the box the label goes just inside its top edge
                    # instead of off-frame.
                    label = f"ROI {i+1}: {roi.width}x{roi.height}"
                    label_y = roi.y - 10 if roi.y >= 20 else roi.y + 15
                    cv2.putText(display_frame, label, (roi.x, label_y),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

                # Show info
                info = f"Motion ROIs: {len(rois)} | Press 'q' to quit"
                cv2.putText(display_frame, info, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

                cv2.imshow("Video Capture - Motion Detection", display_frame)

                # Show foreground mask
                if fg_mask is not None:
                    cv2.imshow("Foreground Mask", fg_mask)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Close the preview windows even if the loop raised
            cv2.destroyAllWindows()

    print("Done!")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()