""" Video Processor Module Handles all video processing operations including frame extraction, validation, and video metadata extraction. """ import subprocess from pathlib import Path from typing import List, Optional, Union, Tuple import cv2 import magic from loguru import logger from core.config import config from core.exceptions import ( VideoProcessingError, InvalidFileError, FileSizeError, UnsupportedFormatError, FrameExtractionError, ) from core.image_processor import ImageProcessor class VideoProcessor: """ Process videos for analysis. Handles validation, frame extraction, and metadata extraction for videos before they are analyzed. """ def __init__(self): """Initialize VideoProcessor.""" self.max_size = config.MAX_VIDEO_SIZE self.allowed_formats = config.ALLOWED_VIDEO_FORMATS self.fps_extraction = config.VIDEO_FPS_EXTRACTION self.max_frames = config.MAX_FRAMES_PER_VIDEO self.image_processor = ImageProcessor() logger.info("VideoProcessor initialized") def validate_video(self, video_path: Path) -> bool: """ Validate video file. Args: video_path: Path to video file Returns: True if valid Raises: FileSizeError: If file too large UnsupportedFormatError: If format not supported InvalidFileError: If file is corrupted """ # Check file exists if not video_path.exists(): raise InvalidFileError( f"Video file not found: {video_path}", {"path": str(video_path)} ) # Check file size file_size = video_path.stat().st_size if file_size > self.max_size: raise FileSizeError( f"Video too large: {file_size / 1024 / 1024:.1f}MB", {"max_size": self.max_size, "actual_size": file_size} ) # Check file extension ext = video_path.suffix.lower() if ext not in self.allowed_formats: raise UnsupportedFormatError( f"Unsupported video format: {ext}", {"allowed": self.allowed_formats, "received": ext} ) # Check MIME type using magic bytes try: mime = magic.from_file(str(video_path), mime=True) if not mime.startswith("video/"): raise InvalidFileError( f"File is not a valid video: {mime}", {"mime_type": mime} ) except Exception as e: logger.warning(f"Could not verify MIME type: {e}") return True def get_video_info(self, video_path: Union[str, Path]) -> dict: """ Get video metadata using OpenCV. Args: video_path: Path to video file Returns: Dictionary with video information """ video_path = Path(video_path) self.validate_video(video_path) try: cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise InvalidFileError( "Cannot open video file", {"path": str(video_path)} ) # Extract metadata fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) duration = frame_count / fps if fps > 0 else 0 cap.release() info = { "filename": video_path.name, "fps": fps, "frame_count": frame_count, "width": width, "height": height, "duration": duration, "file_size": video_path.stat().st_size, } logger.info(f"Video info: {video_path.name} - {width}x{height}, " f"{fps:.2f}fps, {duration:.2f}s") return info except Exception as e: logger.error(f"Failed to get video info: {e}") raise VideoProcessingError( f"Cannot extract video metadata: {str(e)}", {"path": str(video_path), "error": str(e)} ) def extract_frames( self, video_path: Union[str, Path], fps: Optional[float] = None, max_frames: Optional[int] = None, output_dir: Optional[Path] = None ) -> List[Path]: """ Extract frames from video at specified FPS. Args: video_path: Path to video file fps: Frames per second to extract (default: config.VIDEO_FPS_EXTRACTION) max_frames: Maximum number of frames to extract output_dir: Directory to save frames (default: cache directory) Returns: List of paths to extracted frames Raises: FrameExtractionError: If frame extraction fails """ video_path = Path(video_path) self.validate_video(video_path) if fps is None: fps = self.fps_extraction if max_frames is None: max_frames = self.max_frames if output_dir is None: output_dir = config.CACHE_DIR / "frames" / video_path.stem output_dir.mkdir(parents=True, exist_ok=True) try: cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise FrameExtractionError( "Cannot open video file", {"path": str(video_path)} ) video_fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Calculate frame interval frame_interval = int(video_fps / fps) if fps < video_fps else 1 frames_saved = [] frame_idx = 0 saved_count = 0 logger.info(f"Extracting frames from {video_path.name} " f"(fps={fps}, interval={frame_interval})") while True: ret, frame = cap.read() if not ret: break # Extract frame at specified interval if frame_idx % frame_interval == 0: # Save frame frame_path = output_dir / f"frame_{saved_count:04d}.jpg" cv2.imwrite(str(frame_path), frame) frames_saved.append(frame_path) saved_count += 1 # Check if we've reached max frames if saved_count >= max_frames: logger.info(f"Reached max frames limit: {max_frames}") break frame_idx += 1 cap.release() logger.info(f"Extracted {len(frames_saved)} frames from {video_path.name}") return frames_saved except Exception as e: logger.error(f"Frame extraction failed: {e}") raise FrameExtractionError( f"Failed to extract frames: {str(e)}", {"path": str(video_path), "error": str(e)} ) def extract_key_frames( self, video_path: Union[str, Path], num_frames: int = 5, output_dir: Optional[Path] = None ) -> List[Path]: """ Extract evenly distributed key frames from video. Args: video_path: Path to video file num_frames: Number of key frames to extract output_dir: Directory to save frames Returns: List of paths to extracted frames """ video_path = Path(video_path) self.validate_video(video_path) if output_dir is None: output_dir = config.CACHE_DIR / "keyframes" / video_path.stem output_dir.mkdir(parents=True, exist_ok=True) try: cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): raise FrameExtractionError( "Cannot open video file", {"path": str(video_path)} ) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Calculate frame positions positions = [int(i * frame_count / (num_frames + 1)) for i in range(1, num_frames + 1)] frames_saved = [] for idx, pos in enumerate(positions): cap.set(cv2.CAP_PROP_POS_FRAMES, pos) ret, frame = cap.read() if ret: frame_path = output_dir / f"keyframe_{idx:02d}.jpg" cv2.imwrite(str(frame_path), frame) frames_saved.append(frame_path) cap.release() logger.info(f"Extracted {len(frames_saved)} key frames from {video_path.name}") return frames_saved except Exception as e: logger.error(f"Key frame extraction failed: {e}") raise FrameExtractionError( f"Failed to extract key frames: {str(e)}", {"path": str(video_path), "error": str(e)} ) def process( self, video_path: Union[str, Path], extract_method: str = "fps", **kwargs ) -> List[Path]: """ Complete video processing pipeline. Args: video_path: Path to video file extract_method: Method for frame extraction ("fps" or "keyframes") **kwargs: Additional arguments for extraction method Returns: List of extracted frame paths """ try: if extract_method == "fps": return self.extract_frames(video_path, **kwargs) elif extract_method == "keyframes": return self.extract_key_frames(video_path, **kwargs) else: raise ValueError(f"Unknown extraction method: {extract_method}") except Exception as e: logger.error(f"Video processing failed: {e}") raise VideoProcessingError( f"Failed to process video: {str(e)}", {"path": str(video_path), "error": str(e)} )