Ali Abdullah
Fix requirements.txt encoding for HF
98a79a7
"""
Video Input Handler
Manages video input from camera or file
Optimized with threaded capture for GPU inference
"""
import cv2
import numpy as np
from typing import Optional, Tuple
import logging
import os
import threading
from collections import deque
logger = logging.getLogger(__name__)
class VideoHandler:
"""
Handle video input from webcam or video file
Supports:
- Webcam input (source = 0, 1, 2, ...)
- Video file input (source = path to file)
- Frame skipping for performance
- Resolution configuration
"""
def __init__(self, config: dict):
"""
Initialize video handler
Args:
config: Configuration dictionary
"""
self.config = config
self.source = config['video']['source']
self.target_fps = config['video']['fps']
self.skip_frames = config['video']['skip_frames']
self.target_width = config['video']['resolution']['width']
self.target_height = config['video']['resolution']['height']
self.cap = None
self.frame_count = 0
self.is_camera = False
# Threaded capture for async frame reading (cameras only)
self._thread = None
self._stopped = False
self._frame_buffer = deque(maxlen=2) # Small buffer for low latency
self._lock = threading.Lock()
self._use_threading = False # Disabled - causes issues with video files
logger.info(f"Video Handler initialized with source: {self.source}")
def set_source(self, source, is_camera: bool = None):
"""
Set video source and optionally specify if it's a camera
Args:
source: Video source (int for camera, str for file path)
is_camera: Explicitly specify if source is camera (optional)
"""
self.source = source
if is_camera is not None:
self.is_camera = is_camera
self._is_camera_explicit = True # Mark as explicitly set
else:
# Auto-detect
self.is_camera = isinstance(source, int)
if hasattr(self, '_is_camera_explicit'):
delattr(self, '_is_camera_explicit')
logger.info(f"Source set to: {source}, is_camera: {self.is_camera}")
def open(self) -> bool:
"""
Open video source
Returns:
success: True if video source opened successfully
"""
try:
# Release any existing capture
if self.cap is not None:
self.cap.release()
self.cap = None
# Validate source type
if isinstance(self.source, int):
# Camera source - only update is_camera if not already explicitly set
if not hasattr(self, '_is_camera_explicit'):
self.is_camera = True
logger.info(f"Opening webcam: Camera {self.source}")
elif isinstance(self.source, str):
# File source - only update is_camera if not already explicitly set
if not hasattr(self, '_is_camera_explicit'):
self.is_camera = False
if not os.path.exists(self.source):
logger.error(f"Video file not found: {self.source}")
return False
logger.info(f"Opening video file: {self.source}")
else:
logger.error(f"Invalid source type: {type(self.source)}")
return False
# Open video source with DirectShow backend for Windows cameras
if self.is_camera:
# Try with DirectShow backend first (Windows)
self.cap = cv2.VideoCapture(self.source, cv2.CAP_DSHOW)
if not self.cap.isOpened():
logger.warning("DirectShow backend failed, trying default backend")
self.cap = cv2.VideoCapture(self.source)
else:
# For video files, use default backend
logger.info(f"Creating VideoCapture for file: {self.source}")
self.cap = cv2.VideoCapture(self.source)
if not self.cap.isOpened():
logger.error(f"Failed to open video source: {self.source}")
return False
# Set camera properties if using webcam
if self.is_camera:
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.target_width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.target_height)
self.cap.set(cv2.CAP_PROP_FPS, self.target_fps)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Minimize buffer for low latency
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M','J','P','G')) # MJPEG for speed
# Get actual video properties
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
logger.info(f"Video source opened successfully")
logger.info(f" Resolution: {actual_width}x{actual_height}")
logger.info(f" FPS: {actual_fps}")
if not self.is_camera:
logger.info(f" Total frames: {total_frames}")
# Start threaded capture for async frame reading
if self._use_threading:
self._start_capture_thread()
return True
except Exception as e:
logger.error(f"Error opening video source: {e}")
return False
def _start_capture_thread(self):
"""Start background thread for frame capture"""
self._stopped = False
self._thread = threading.Thread(target=self._capture_loop, daemon=True)
self._thread.start()
logger.info("Threaded frame capture started")
def _capture_loop(self):
"""Background thread that continuously captures frames"""
while not self._stopped and self.cap is not None and self.cap.isOpened():
# Skip frames if configured
for _ in range(self.skip_frames - 1):
self.cap.grab()
ret, frame = self.cap.read()
if ret:
# Resize if needed
if frame.shape[1] != self.target_width or frame.shape[0] != self.target_height:
frame = cv2.resize(frame, (self.target_width, self.target_height),
interpolation=cv2.INTER_NEAREST)
with self._lock:
self._frame_buffer.append((True, frame))
else:
with self._lock:
self._frame_buffer.append((False, None))
def read_frame(self) -> Tuple[bool, Optional[np.ndarray]]:
"""
Read a frame from video source
Uses threaded capture if enabled for non-blocking reads
Returns:
success: True if frame read successfully
frame: Frame as numpy array (BGR format)
"""
if self.cap is None or not self.cap.isOpened():
return False, None
try:
# Use threaded capture if enabled
if self._use_threading and self._thread is not None:
with self._lock:
if len(self._frame_buffer) > 0:
ret, frame = self._frame_buffer.pop()
if ret:
self.frame_count += 1
return ret, frame
else:
return False, None
# Fallback to synchronous capture
for _ in range(self.skip_frames - 1):
self.cap.grab()
ret, frame = self.cap.read()
if not ret:
return False, None
# Only resize if dimensions don't match (optimization)
if frame.shape[1] != self.target_width or frame.shape[0] != self.target_height:
frame = cv2.resize(frame, (self.target_width, self.target_height),
interpolation=cv2.INTER_NEAREST) # Fastest interpolation
self.frame_count += 1
return True, frame
except Exception as e:
logger.error(f"Error reading frame: {e}")
return False, None
def release(self):
"""Release video source and stop capture thread with proper cleanup"""
try:
self._stopped = True
# Wait for thread to finish with extended timeout
if self._thread is not None and self._thread.is_alive():
self._thread.join(timeout=3.0) # Increased from 1.0 to 3.0 seconds
# Force cleanup if thread is still alive
if self._thread.is_alive():
logger.warning("Capture thread did not stop within timeout, forcing cleanup")
self._thread = None
# Release OpenCV capture
if self.cap is not None:
if self.cap.isOpened():
self.cap.release()
self.cap = None
logger.info("Video source released")
# Clear buffer
with self._lock:
self._frame_buffer.clear()
except Exception as e:
logger.error(f"Error releasing video capture: {e}")
# Force cleanup even on error
self.cap = None
self._thread = None
def is_opened(self) -> bool:
"""Check if video source is opened"""
return self.cap is not None and self.cap.isOpened()
def get_properties(self) -> dict:
"""
Get video source properties
Returns:
properties: Dictionary with video properties
"""
if self.cap is None or not self.cap.isOpened():
return {}
return {
'width': int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
'height': int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
'fps': self.cap.get(cv2.CAP_PROP_FPS),
'total_frames': int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)),
'current_frame': self.frame_count,
'is_camera': self.is_camera
}
def is_video_end(self) -> bool:
"""
Check if video file has reached the end
Returns:
True if video has ended, False otherwise (or if camera)
"""
if self.is_camera or self.cap is None:
return False
try:
# Get current position and total frames
current_pos = self.cap.get(cv2.CAP_PROP_POS_FRAMES)
total_frames = self.cap.get(cv2.CAP_PROP_FRAME_COUNT)
# Check if we're at or past the end
# Use -2 threshold to catch end before actual failure
if total_frames > 0 and current_pos >= total_frames - 2:
return True
return False
except Exception as e:
logger.error(f"Error checking video end: {e}")
return False
def restart(self) -> bool:
"""
Restart video from beginning (for video files only)
Returns:
success: True if restart successful
"""
if self.is_camera:
logger.warning("Cannot restart camera feed")
return False
if self.cap is not None:
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
self.frame_count = 0
logger.info("Video restarted from beginning")
return True
return False
def __del__(self):
"""Destructor to ensure video source is released"""
self.release()