Spaces:
Runtime error
Runtime error
| """ | |
| Video Processor Module | |
| Handles all video processing operations including frame extraction, | |
| validation, and video metadata extraction. | |
| """ | |
| import subprocess | |
| from pathlib import Path | |
| from typing import List, Optional, Union, Tuple | |
| import cv2 | |
| import magic | |
| from loguru import logger | |
| from core.config import config | |
| from core.exceptions import ( | |
| VideoProcessingError, | |
| InvalidFileError, | |
| FileSizeError, | |
| UnsupportedFormatError, | |
| FrameExtractionError, | |
| ) | |
| from core.image_processor import ImageProcessor | |
| class VideoProcessor: | |
| """ | |
| Process videos for analysis. | |
| Handles validation, frame extraction, and metadata extraction | |
| for videos before they are analyzed. | |
| """ | |
| def __init__(self): | |
| """Initialize VideoProcessor.""" | |
| self.max_size = config.MAX_VIDEO_SIZE | |
| self.allowed_formats = config.ALLOWED_VIDEO_FORMATS | |
| self.fps_extraction = config.VIDEO_FPS_EXTRACTION | |
| self.max_frames = config.MAX_FRAMES_PER_VIDEO | |
| self.image_processor = ImageProcessor() | |
| logger.info("VideoProcessor initialized") | |
| def validate_video(self, video_path: Path) -> bool: | |
| """ | |
| Validate video file. | |
| Args: | |
| video_path: Path to video file | |
| Returns: | |
| True if valid | |
| Raises: | |
| FileSizeError: If file too large | |
| UnsupportedFormatError: If format not supported | |
| InvalidFileError: If file is corrupted | |
| """ | |
| # Check file exists | |
| if not video_path.exists(): | |
| raise InvalidFileError( | |
| f"Video file not found: {video_path}", | |
| {"path": str(video_path)} | |
| ) | |
| # Check file size | |
| file_size = video_path.stat().st_size | |
| if file_size > self.max_size: | |
| raise FileSizeError( | |
| f"Video too large: {file_size / 1024 / 1024:.1f}MB", | |
| {"max_size": self.max_size, "actual_size": file_size} | |
| ) | |
| # Check file extension | |
| ext = video_path.suffix.lower() | |
| if ext not in self.allowed_formats: | |
| raise UnsupportedFormatError( | |
| f"Unsupported video format: {ext}", | |
| {"allowed": self.allowed_formats, "received": ext} | |
| ) | |
| # Check MIME type using magic bytes | |
| try: | |
| mime = magic.from_file(str(video_path), mime=True) | |
| if not mime.startswith("video/"): | |
| raise InvalidFileError( | |
| f"File is not a valid video: {mime}", | |
| {"mime_type": mime} | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Could not verify MIME type: {e}") | |
| return True | |
| def get_video_info(self, video_path: Union[str, Path]) -> dict: | |
| """ | |
| Get video metadata using OpenCV. | |
| Args: | |
| video_path: Path to video file | |
| Returns: | |
| Dictionary with video information | |
| """ | |
| video_path = Path(video_path) | |
| self.validate_video(video_path) | |
| try: | |
| cap = cv2.VideoCapture(str(video_path)) | |
| if not cap.isOpened(): | |
| raise InvalidFileError( | |
| "Cannot open video file", | |
| {"path": str(video_path)} | |
| ) | |
| # Extract metadata | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| duration = frame_count / fps if fps > 0 else 0 | |
| cap.release() | |
| info = { | |
| "filename": video_path.name, | |
| "fps": fps, | |
| "frame_count": frame_count, | |
| "width": width, | |
| "height": height, | |
| "duration": duration, | |
| "file_size": video_path.stat().st_size, | |
| } | |
| logger.info(f"Video info: {video_path.name} - {width}x{height}, " | |
| f"{fps:.2f}fps, {duration:.2f}s") | |
| return info | |
| except Exception as e: | |
| logger.error(f"Failed to get video info: {e}") | |
| raise VideoProcessingError( | |
| f"Cannot extract video metadata: {str(e)}", | |
| {"path": str(video_path), "error": str(e)} | |
| ) | |
| def extract_frames( | |
| self, | |
| video_path: Union[str, Path], | |
| fps: Optional[float] = None, | |
| max_frames: Optional[int] = None, | |
| output_dir: Optional[Path] = None | |
| ) -> List[Path]: | |
| """ | |
| Extract frames from video at specified FPS. | |
| Args: | |
| video_path: Path to video file | |
| fps: Frames per second to extract (default: config.VIDEO_FPS_EXTRACTION) | |
| max_frames: Maximum number of frames to extract | |
| output_dir: Directory to save frames (default: cache directory) | |
| Returns: | |
| List of paths to extracted frames | |
| Raises: | |
| FrameExtractionError: If frame extraction fails | |
| """ | |
| video_path = Path(video_path) | |
| self.validate_video(video_path) | |
| if fps is None: | |
| fps = self.fps_extraction | |
| if max_frames is None: | |
| max_frames = self.max_frames | |
| if output_dir is None: | |
| output_dir = config.CACHE_DIR / "frames" / video_path.stem | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| try: | |
| cap = cv2.VideoCapture(str(video_path)) | |
| if not cap.isOpened(): | |
| raise FrameExtractionError( | |
| "Cannot open video file", | |
| {"path": str(video_path)} | |
| ) | |
| video_fps = cap.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # Calculate frame interval | |
| frame_interval = int(video_fps / fps) if fps < video_fps else 1 | |
| frames_saved = [] | |
| frame_idx = 0 | |
| saved_count = 0 | |
| logger.info(f"Extracting frames from {video_path.name} " | |
| f"(fps={fps}, interval={frame_interval})") | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Extract frame at specified interval | |
| if frame_idx % frame_interval == 0: | |
| # Save frame | |
| frame_path = output_dir / f"frame_{saved_count:04d}.jpg" | |
| cv2.imwrite(str(frame_path), frame) | |
| frames_saved.append(frame_path) | |
| saved_count += 1 | |
| # Check if we've reached max frames | |
| if saved_count >= max_frames: | |
| logger.info(f"Reached max frames limit: {max_frames}") | |
| break | |
| frame_idx += 1 | |
| cap.release() | |
| logger.info(f"Extracted {len(frames_saved)} frames from {video_path.name}") | |
| return frames_saved | |
| except Exception as e: | |
| logger.error(f"Frame extraction failed: {e}") | |
| raise FrameExtractionError( | |
| f"Failed to extract frames: {str(e)}", | |
| {"path": str(video_path), "error": str(e)} | |
| ) | |
| def extract_key_frames( | |
| self, | |
| video_path: Union[str, Path], | |
| num_frames: int = 5, | |
| output_dir: Optional[Path] = None | |
| ) -> List[Path]: | |
| """ | |
| Extract evenly distributed key frames from video. | |
| Args: | |
| video_path: Path to video file | |
| num_frames: Number of key frames to extract | |
| output_dir: Directory to save frames | |
| Returns: | |
| List of paths to extracted frames | |
| """ | |
| video_path = Path(video_path) | |
| self.validate_video(video_path) | |
| if output_dir is None: | |
| output_dir = config.CACHE_DIR / "keyframes" / video_path.stem | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| try: | |
| cap = cv2.VideoCapture(str(video_path)) | |
| if not cap.isOpened(): | |
| raise FrameExtractionError( | |
| "Cannot open video file", | |
| {"path": str(video_path)} | |
| ) | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # Calculate frame positions | |
| positions = [int(i * frame_count / (num_frames + 1)) | |
| for i in range(1, num_frames + 1)] | |
| frames_saved = [] | |
| for idx, pos in enumerate(positions): | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, pos) | |
| ret, frame = cap.read() | |
| if ret: | |
| frame_path = output_dir / f"keyframe_{idx:02d}.jpg" | |
| cv2.imwrite(str(frame_path), frame) | |
| frames_saved.append(frame_path) | |
| cap.release() | |
| logger.info(f"Extracted {len(frames_saved)} key frames from {video_path.name}") | |
| return frames_saved | |
| except Exception as e: | |
| logger.error(f"Key frame extraction failed: {e}") | |
| raise FrameExtractionError( | |
| f"Failed to extract key frames: {str(e)}", | |
| {"path": str(video_path), "error": str(e)} | |
| ) | |
| def process( | |
| self, | |
| video_path: Union[str, Path], | |
| extract_method: str = "fps", | |
| **kwargs | |
| ) -> List[Path]: | |
| """ | |
| Complete video processing pipeline. | |
| Args: | |
| video_path: Path to video file | |
| extract_method: Method for frame extraction ("fps" or "keyframes") | |
| **kwargs: Additional arguments for extraction method | |
| Returns: | |
| List of extracted frame paths | |
| """ | |
| try: | |
| if extract_method == "fps": | |
| return self.extract_frames(video_path, **kwargs) | |
| elif extract_method == "keyframes": | |
| return self.extract_key_frames(video_path, **kwargs) | |
| else: | |
| raise ValueError(f"Unknown extraction method: {extract_method}") | |
| except Exception as e: | |
| logger.error(f"Video processing failed: {e}") | |
| raise VideoProcessingError( | |
| f"Failed to process video: {str(e)}", | |
| {"path": str(video_path), "error": str(e)} | |
| ) | |