"""
Video Processor Module

Handles all video processing operations including frame extraction,
validation, and video metadata extraction.
"""
|
|
|
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from typing import List, Optional, Union, Tuple |
|
|
import cv2 |
|
|
import magic |
|
|
from loguru import logger |
|
|
|
|
|
from core.config import config |
|
|
from core.exceptions import ( |
|
|
VideoProcessingError, |
|
|
InvalidFileError, |
|
|
FileSizeError, |
|
|
UnsupportedFormatError, |
|
|
FrameExtractionError, |
|
|
) |
|
|
from core.image_processor import ImageProcessor |
|
|
|
|
|
|
|
|
class VideoProcessor:
    """
    Process videos for analysis.

    Handles validation, frame extraction, and metadata extraction
    for videos before they are analyzed.
    """

    def __init__(self):
        """Initialize VideoProcessor with limits and defaults read from ``config``."""
        # Upper bound compared against the file's ``st_size`` in validate_video.
        self.max_size = config.MAX_VIDEO_SIZE
        # Collection of accepted suffixes; compared against the lower-cased
        # ``Path.suffix`` (which includes the leading dot).
        self.allowed_formats = config.ALLOWED_VIDEO_FORMATS
        # Default sampling rate used by extract_frames when ``fps`` is omitted.
        self.fps_extraction = config.VIDEO_FPS_EXTRACTION
        # Default cap on saved frames used by extract_frames.
        self.max_frames = config.MAX_FRAMES_PER_VIDEO
        self.image_processor = ImageProcessor()
        logger.info("VideoProcessor initialized")

    @staticmethod
    def _frame_interval(video_fps: float, target_fps: float) -> int:
        """
        Return how many source frames to advance between two saved frames.

        Args:
            video_fps: Native frame rate reported for the video
            target_fps: Requested extraction rate; must be positive

        Returns:
            ``int(video_fps / target_fps)`` when the target rate is lower than
            the native rate, otherwise 1 (every frame is kept)

        Raises:
            ValueError: If target_fps is not positive
        """
        if target_fps <= 0:
            raise ValueError(f"fps must be positive, got {target_fps}")
        # Written as a negated "<" so an unknown/NaN native rate falls back to 1.
        if not target_fps < video_fps:
            return 1
        return max(1, int(video_fps / target_fps))

    @staticmethod
    def _key_frame_positions(frame_count: int, num_frames: int) -> List[int]:
        """
        Return evenly spaced, unique frame indices for key frame extraction.

        Args:
            frame_count: Total number of frames reported for the video
            num_frames: Number of key frames requested

        Returns:
            Up to ``num_frames`` indices in ascending order. Indices are
            clamped to 0 and de-duplicated, so a video with fewer frames than
            requested (or an unknown frame count) yields fewer positions
            instead of repeated ones.
        """
        if num_frames <= 0:
            return []
        raw_positions = (
            max(0, i * frame_count // (num_frames + 1))
            for i in range(1, num_frames + 1)
        )
        # dict.fromkeys drops duplicates while preserving order.
        return list(dict.fromkeys(raw_positions))

    def validate_video(self, video_path: Path) -> bool:
        """
        Validate video file.

        Checks, in order: existence, size limit, file extension, and
        (best effort) the MIME type detected from the file content.

        Args:
            video_path: Path to video file

        Returns:
            True if valid

        Raises:
            FileSizeError: If file too large
            UnsupportedFormatError: If format not supported
            InvalidFileError: If the file does not exist or its detected
                MIME type is not ``video/*``
        """
        video_path = Path(video_path)

        if not video_path.exists():
            raise InvalidFileError(
                f"Video file not found: {video_path}",
                {"path": str(video_path)}
            )

        file_size = video_path.stat().st_size
        if file_size > self.max_size:
            raise FileSizeError(
                f"Video too large: {file_size / 1024 / 1024:.1f}MB",
                {"max_size": self.max_size, "actual_size": file_size}
            )

        ext = video_path.suffix.lower()
        if ext not in self.allowed_formats:
            raise UnsupportedFormatError(
                f"Unsupported video format: {ext}",
                {"allowed": self.allowed_formats, "received": ext}
            )

        # Only the detection itself is best effort. The rejection below must
        # stay outside this try block, otherwise the broad handler would
        # swallow it and non-video files would pass validation.
        try:
            mime = magic.from_file(str(video_path), mime=True)
        except Exception as e:
            logger.warning(f"Could not verify MIME type: {e}")
            return True

        if not mime.startswith("video/"):
            raise InvalidFileError(
                f"File is not a valid video: {mime}",
                {"mime_type": mime}
            )

        return True

    def get_video_info(self, video_path: Union[str, Path]) -> dict:
        """
        Get video metadata using OpenCV.

        Args:
            video_path: Path to video file

        Returns:
            Dictionary with keys ``filename``, ``fps``, ``frame_count``,
            ``width``, ``height``, ``duration`` (frame_count / fps, or 0 when
            fps is not positive) and ``file_size``

        Raises:
            VideoProcessingError: If the video cannot be opened or read
            FileSizeError, UnsupportedFormatError, InvalidFileError:
                Propagated from validate_video
        """
        video_path = Path(video_path)
        self.validate_video(video_path)

        cap = None
        try:
            cap = cv2.VideoCapture(str(video_path))

            if not cap.isOpened():
                raise InvalidFileError(
                    "Cannot open video file",
                    {"path": str(video_path)}
                )

            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            duration = frame_count / fps if fps > 0 else 0

            info = {
                "filename": video_path.name,
                "fps": fps,
                "frame_count": frame_count,
                "width": width,
                "height": height,
                "duration": duration,
                "file_size": video_path.stat().st_size,
            }

            logger.info(f"Video info: {video_path.name} - {width}x{height}, "
                        f"{fps:.2f}fps, {duration:.2f}s")

            return info

        except Exception as e:
            # Every failure in this section (including the InvalidFileError
            # above) is reported to callers as VideoProcessingError.
            logger.error(f"Failed to get video info: {e}")
            raise VideoProcessingError(
                f"Cannot extract video metadata: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e

        finally:
            # Release the capture on success and on every error path.
            if cap is not None:
                cap.release()

    def extract_frames(
        self,
        video_path: Union[str, Path],
        fps: Optional[float] = None,
        max_frames: Optional[int] = None,
        output_dir: Optional[Path] = None
    ) -> List[Path]:
        """
        Extract frames from video at specified FPS.

        Frames are written as ``frame_NNNN.jpg`` into ``output_dir``.

        Args:
            video_path: Path to video file
            fps: Frames per second to extract (default: config.VIDEO_FPS_EXTRACTION)
            max_frames: Maximum number of frames to extract
            output_dir: Directory to save frames (default: cache directory)

        Returns:
            List of paths to extracted frames (only files that were written)

        Raises:
            FrameExtractionError: If frame extraction fails or fps is not positive
            FileSizeError, UnsupportedFormatError, InvalidFileError:
                Propagated from validate_video
        """
        video_path = Path(video_path)
        self.validate_video(video_path)

        if fps is None:
            fps = self.fps_extraction

        if max_frames is None:
            max_frames = self.max_frames

        # Reject a non-positive rate up front instead of failing later with
        # a division-by-zero or a negative sampling interval.
        if fps <= 0:
            raise FrameExtractionError(
                f"Extraction fps must be positive, got {fps}",
                {"path": str(video_path), "fps": fps}
            )

        if output_dir is None:
            output_dir = config.CACHE_DIR / "frames" / video_path.stem
        output_dir = Path(output_dir)

        output_dir.mkdir(parents=True, exist_ok=True)

        cap = None
        try:
            cap = cv2.VideoCapture(str(video_path))

            if not cap.isOpened():
                raise FrameExtractionError(
                    "Cannot open video file",
                    {"path": str(video_path)}
                )

            video_fps = cap.get(cv2.CAP_PROP_FPS)
            frame_interval = self._frame_interval(video_fps, fps)

            frames_saved = []
            frame_idx = 0
            saved_count = 0

            logger.info(f"Extracting frames from {video_path.name} "
                        f"(fps={fps}, interval={frame_interval})")

            while True:
                ret, frame = cap.read()

                if not ret:
                    break

                if frame_idx % frame_interval == 0:
                    frame_path = output_dir / f"frame_{saved_count:04d}.jpg"

                    # Only report files that were actually written.
                    if cv2.imwrite(str(frame_path), frame):
                        frames_saved.append(frame_path)
                        saved_count += 1
                    else:
                        logger.warning(f"Could not write frame: {frame_path}")

                    if saved_count >= max_frames:
                        logger.info(f"Reached max frames limit: {max_frames}")
                        break

                frame_idx += 1

            logger.info(f"Extracted {len(frames_saved)} frames from {video_path.name}")

            return frames_saved

        except Exception as e:
            logger.error(f"Frame extraction failed: {e}")
            raise FrameExtractionError(
                f"Failed to extract frames: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e

        finally:
            # Release the capture on success and on every error path.
            if cap is not None:
                cap.release()

    def extract_key_frames(
        self,
        video_path: Union[str, Path],
        num_frames: int = 5,
        output_dir: Optional[Path] = None
    ) -> List[Path]:
        """
        Extract evenly distributed key frames from video.

        Frames are written as ``keyframe_NN.jpg`` into ``output_dir``.
        Positions that cannot be read or written are skipped.

        Args:
            video_path: Path to video file
            num_frames: Number of key frames to extract
            output_dir: Directory to save frames

        Returns:
            List of paths to extracted frames; may be shorter than
            ``num_frames`` when the video has fewer distinct positions or a
            frame could not be read or written

        Raises:
            FrameExtractionError: If key frame extraction fails
            FileSizeError, UnsupportedFormatError, InvalidFileError:
                Propagated from validate_video
        """
        video_path = Path(video_path)
        self.validate_video(video_path)

        if output_dir is None:
            output_dir = config.CACHE_DIR / "keyframes" / video_path.stem
        output_dir = Path(output_dir)

        output_dir.mkdir(parents=True, exist_ok=True)

        cap = None
        try:
            cap = cv2.VideoCapture(str(video_path))

            if not cap.isOpened():
                raise FrameExtractionError(
                    "Cannot open video file",
                    {"path": str(video_path)}
                )

            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            positions = self._key_frame_positions(frame_count, num_frames)

            frames_saved = []

            for idx, pos in enumerate(positions):
                cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
                ret, frame = cap.read()

                if not ret:
                    continue

                frame_path = output_dir / f"keyframe_{idx:02d}.jpg"

                # Only report files that were actually written.
                if cv2.imwrite(str(frame_path), frame):
                    frames_saved.append(frame_path)
                else:
                    logger.warning(f"Could not write key frame: {frame_path}")

            logger.info(f"Extracted {len(frames_saved)} key frames from {video_path.name}")

            return frames_saved

        except Exception as e:
            logger.error(f"Key frame extraction failed: {e}")
            raise FrameExtractionError(
                f"Failed to extract key frames: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e

        finally:
            # Release the capture on success and on every error path.
            if cap is not None:
                cap.release()

    def process(
        self,
        video_path: Union[str, Path],
        extract_method: str = "fps",
        **kwargs
    ) -> List[Path]:
        """
        Complete video processing pipeline.

        Args:
            video_path: Path to video file
            extract_method: Method for frame extraction ("fps" or "keyframes")
            **kwargs: Additional arguments for extraction method

        Returns:
            List of extracted frame paths

        Raises:
            VideoProcessingError: If validation or extraction fails, or if
                ``extract_method`` is not recognized; the original exception
                is attached as the cause
        """
        try:
            if extract_method == "fps":
                return self.extract_frames(video_path, **kwargs)
            if extract_method == "keyframes":
                return self.extract_key_frames(video_path, **kwargs)
            raise ValueError(f"Unknown extraction method: {extract_method}")

        except Exception as e:
            logger.error(f"Video processing failed: {e}")
            raise VideoProcessingError(
                f"Failed to process video: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e
|
|
|