""" ShortSmith v2 - Video Processor Module FFmpeg-based video processing for: - Extracting video metadata - Extracting frames at specified timestamps/FPS - Extracting audio tracks - Cutting video clips """ import subprocess import json import shutil from pathlib import Path from typing import List, Optional, Tuple, Generator from dataclasses import dataclass import numpy as np try: from PIL import Image except ImportError: Image = None from utils.logger import get_logger, LogTimer from utils.helpers import ( VideoProcessingError, validate_video_file, ensure_dir, format_timestamp, ) from config import get_config logger = get_logger("core.video_processor") @dataclass class VideoMetadata: """Video file metadata.""" duration: float # Duration in seconds width: int height: int fps: float codec: str bitrate: Optional[int] audio_codec: Optional[str] audio_sample_rate: Optional[int] file_size: int file_path: Path @property def frame_count(self) -> int: """Estimated total frame count.""" return int(self.duration * self.fps) @property def aspect_ratio(self) -> float: """Video aspect ratio.""" return self.width / self.height if self.height > 0 else 0 @property def resolution(self) -> str: """Human-readable resolution string.""" return f"{self.width}x{self.height}" class VideoProcessor: """ FFmpeg-based video processor for frame extraction and manipulation. Handles all low-level video operations using FFmpeg subprocess calls. """ def __init__(self, ffmpeg_path: Optional[str] = None): """ Initialize video processor. Args: ffmpeg_path: Path to FFmpeg executable (auto-detected if None) Raises: VideoProcessingError: If FFmpeg is not found """ self.ffmpeg_path = ffmpeg_path or self._find_ffmpeg() self.ffprobe_path = self._find_ffprobe() if not self.ffmpeg_path: raise VideoProcessingError( "FFmpeg not found. Please install FFmpeg and add it to PATH." ) logger.info(f"VideoProcessor initialized with FFmpeg: {self.ffmpeg_path}") def _find_ffmpeg(self) -> Optional[str]: """Find FFmpeg executable in PATH.""" ffmpeg = shutil.which("ffmpeg") if ffmpeg: return ffmpeg # Common installation paths common_paths = [ "/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "C:\\ffmpeg\\bin\\ffmpeg.exe", "C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe", ] for path in common_paths: if Path(path).exists(): return path return None def _find_ffprobe(self) -> Optional[str]: """Find FFprobe executable in PATH.""" ffprobe = shutil.which("ffprobe") if ffprobe: return ffprobe # Try same directory as ffmpeg if self.ffmpeg_path: ffmpeg_dir = Path(self.ffmpeg_path).parent ffprobe_path = ffmpeg_dir / "ffprobe" if ffprobe_path.exists(): return str(ffprobe_path) ffprobe_path = ffmpeg_dir / "ffprobe.exe" if ffprobe_path.exists(): return str(ffprobe_path) return None def _run_command( self, command: List[str], capture_output: bool = True, check: bool = True, ) -> subprocess.CompletedProcess: """ Run a subprocess command with error handling. Args: command: Command and arguments capture_output: Whether to capture stdout/stderr check: Whether to raise on non-zero exit Returns: CompletedProcess result Raises: VideoProcessingError: If command fails """ try: logger.debug(f"Running command: {' '.join(command)}") result = subprocess.run( command, capture_output=capture_output, text=True, check=check, ) return result except subprocess.CalledProcessError as e: error_msg = e.stderr if e.stderr else str(e) logger.error(f"Command failed: {error_msg}") raise VideoProcessingError(f"FFmpeg command failed: {error_msg}") from e except FileNotFoundError as e: raise VideoProcessingError(f"FFmpeg not found: {e}") from e def get_metadata(self, video_path: str | Path) -> VideoMetadata: """ Extract metadata from a video file. Args: video_path: Path to the video file Returns: VideoMetadata object with video information Raises: VideoProcessingError: If metadata extraction fails """ video_path = Path(video_path) # Validate file first validation = validate_video_file(video_path) if not validation.is_valid: raise VideoProcessingError(validation.error_message) if not self.ffprobe_path: raise VideoProcessingError("FFprobe not found for metadata extraction") with LogTimer(logger, f"Extracting metadata from {video_path.name}"): command = [ self.ffprobe_path, "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(video_path), ] result = self._run_command(command) try: data = json.loads(result.stdout) except json.JSONDecodeError as e: raise VideoProcessingError(f"Failed to parse video metadata: {e}") from e # Extract video stream info video_stream = None audio_stream = None for stream in data.get("streams", []): if stream.get("codec_type") == "video" and video_stream is None: video_stream = stream elif stream.get("codec_type") == "audio" and audio_stream is None: audio_stream = stream if not video_stream: raise VideoProcessingError("No video stream found in file") # Parse FPS (can be "30/1" or "29.97") fps_str = video_stream.get("r_frame_rate", "30/1") if "/" in fps_str: num, den = map(int, fps_str.split("/")) fps = num / den if den > 0 else 30.0 else: fps = float(fps_str) # Get format info format_info = data.get("format", {}) metadata = VideoMetadata( duration=float(format_info.get("duration", 0)), width=int(video_stream.get("width", 0)), height=int(video_stream.get("height", 0)), fps=fps, codec=video_stream.get("codec_name", "unknown"), bitrate=int(format_info.get("bit_rate", 0)) if format_info.get("bit_rate") else None, audio_codec=audio_stream.get("codec_name") if audio_stream else None, audio_sample_rate=int(audio_stream.get("sample_rate", 0)) if audio_stream else None, file_size=validation.file_size, file_path=video_path, ) logger.info( f"Video metadata: {metadata.resolution}, " f"{metadata.fps:.2f}fps, {format_timestamp(metadata.duration)}" ) return metadata def extract_frames( self, video_path: str | Path, output_dir: str | Path, fps: Optional[float] = None, timestamps: Optional[List[float]] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, scale: Optional[Tuple[int, int]] = None, quality: int = 2, ) -> List[Path]: """ Extract frames from video. Args: video_path: Path to the video file output_dir: Directory to save extracted frames fps: Extract at this FPS (mutually exclusive with timestamps) timestamps: Specific timestamps to extract (in seconds) start_time: Start time for extraction (seconds) end_time: End time for extraction (seconds) scale: Target resolution (width, height), None to keep original quality: JPEG quality (1-31, lower is better) Returns: List of paths to extracted frame images Raises: VideoProcessingError: If frame extraction fails """ video_path = Path(video_path) output_dir = ensure_dir(output_dir) with LogTimer(logger, f"Extracting frames from {video_path.name}"): if timestamps: # Extract specific timestamps return self._extract_at_timestamps( video_path, output_dir, timestamps, scale, quality ) else: # Extract at specified FPS return self._extract_at_fps( video_path, output_dir, fps or 1.0, start_time, end_time, scale, quality ) def _extract_at_fps( self, video_path: Path, output_dir: Path, fps: float, start_time: Optional[float], end_time: Optional[float], scale: Optional[Tuple[int, int]], quality: int, ) -> List[Path]: """Extract frames at specified FPS.""" command = [self.ffmpeg_path, "-y"] # Input seeking (faster) if start_time is not None: command.extend(["-ss", str(start_time)]) command.extend(["-i", str(video_path)]) # Duration if end_time is not None: duration = end_time - (start_time or 0) command.extend(["-t", str(duration)]) # Filters filters = [f"fps={fps}"] if scale: filters.append(f"scale={scale[0]}:{scale[1]}") command.extend(["-vf", ",".join(filters)]) # Output settings command.extend([ "-q:v", str(quality), "-f", "image2", str(output_dir / "frame_%06d.jpg"), ]) self._run_command(command) # Collect output files frames = sorted(output_dir.glob("frame_*.jpg")) logger.info(f"Extracted {len(frames)} frames at {fps} FPS") return frames def _extract_at_timestamps( self, video_path: Path, output_dir: Path, timestamps: List[float], scale: Optional[Tuple[int, int]], quality: int, ) -> List[Path]: """Extract frames at specific timestamps.""" frames = [] for i, ts in enumerate(timestamps): output_path = output_dir / f"frame_{i:06d}.jpg" command = [ self.ffmpeg_path, "-y", "-ss", str(ts), "-i", str(video_path), "-vframes", "1", ] if scale: command.extend(["-vf", f"scale={scale[0]}:{scale[1]}"]) command.extend([ "-q:v", str(quality), str(output_path), ]) try: self._run_command(command) if output_path.exists(): frames.append(output_path) except VideoProcessingError as e: logger.warning(f"Failed to extract frame at {ts}s: {e}") logger.info(f"Extracted {len(frames)} frames at specific timestamps") return frames def extract_audio( self, video_path: str | Path, output_path: str | Path, sample_rate: int = 16000, mono: bool = True, ) -> Path: """ Extract audio track from video. Args: video_path: Path to the video file output_path: Path for the output audio file sample_rate: Audio sample rate (Hz) mono: Convert to mono if True Returns: Path to the extracted audio file Raises: VideoProcessingError: If audio extraction fails """ video_path = Path(video_path) output_path = Path(output_path) with LogTimer(logger, f"Extracting audio from {video_path.name}"): command = [ self.ffmpeg_path, "-y", "-i", str(video_path), "-vn", # No video "-acodec", "pcm_s16le", # WAV format "-ar", str(sample_rate), ] if mono: command.extend(["-ac", "1"]) command.append(str(output_path)) self._run_command(command) if not output_path.exists(): raise VideoProcessingError("Audio extraction produced no output") logger.info(f"Extracted audio to {output_path}") return output_path def cut_clip( self, video_path: str | Path, output_path: str | Path, start_time: float, end_time: float, reencode: bool = False, ) -> Path: """ Cut a clip from the video. Args: video_path: Path to the source video output_path: Path for the output clip start_time: Start time in seconds end_time: End time in seconds reencode: Whether to re-encode (slower but more precise) Returns: Path to the cut clip Raises: VideoProcessingError: If cutting fails """ video_path = Path(video_path) output_path = Path(output_path) duration = end_time - start_time if duration <= 0: raise VideoProcessingError( f"Invalid clip duration: {start_time} to {end_time}" ) with LogTimer(logger, f"Cutting clip {format_timestamp(start_time)}-{format_timestamp(end_time)}"): if reencode: # Re-encode for precise cutting command = [ self.ffmpeg_path, "-y", "-i", str(video_path), "-ss", str(start_time), "-t", str(duration), "-c:v", "libx264", "-c:a", "aac", "-preset", "fast", str(output_path), ] else: # Stream copy for fast cutting (may be slightly imprecise) command = [ self.ffmpeg_path, "-y", "-ss", str(start_time), "-i", str(video_path), "-t", str(duration), "-c", "copy", "-avoid_negative_ts", "make_zero", str(output_path), ] self._run_command(command) if not output_path.exists(): raise VideoProcessingError("Clip cutting produced no output") logger.info(f"Cut clip saved to {output_path}") return output_path def cut_clips_batch( self, video_path: str | Path, output_dir: str | Path, segments: List[Tuple[float, float]], reencode: bool = False, name_prefix: str = "clip", ) -> List[Path]: """ Cut multiple clips from a video. Args: video_path: Path to the source video output_dir: Directory for output clips segments: List of (start_time, end_time) tuples reencode: Whether to re-encode clips name_prefix: Prefix for output filenames Returns: List of paths to cut clips """ output_dir = ensure_dir(output_dir) clips = [] for i, (start, end) in enumerate(segments): output_path = output_dir / f"{name_prefix}_{i+1:03d}.mp4" try: clip_path = self.cut_clip( video_path, output_path, start, end, reencode ) clips.append(clip_path) except VideoProcessingError as e: logger.error(f"Failed to cut clip {i+1}: {e}") return clips def get_frame_at_timestamp( self, video_path: str | Path, timestamp: float, scale: Optional[Tuple[int, int]] = None, ) -> Optional[np.ndarray]: """ Get a single frame at a specific timestamp as numpy array. Args: video_path: Path to the video file timestamp: Timestamp in seconds scale: Target resolution (width, height) Returns: Frame as numpy array (H, W, C) in RGB format, or None if failed """ if Image is None: logger.error("PIL not installed, cannot get frame as array") return None import tempfile try: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: tmp_path = Path(tmp.name) command = [ self.ffmpeg_path, "-y", "-ss", str(timestamp), "-i", str(video_path), "-vframes", "1", ] if scale: command.extend(["-vf", f"scale={scale[0]}:{scale[1]}"]) command.extend(["-q:v", "2", str(tmp_path)]) self._run_command(command) if tmp_path.exists(): img = Image.open(tmp_path).convert("RGB") frame = np.array(img) tmp_path.unlink() return frame except Exception as e: logger.error(f"Failed to get frame at {timestamp}s: {e}") return None def generate_thumbnail( self, video_path: str | Path, output_path: str | Path, timestamp: Optional[float] = None, size: Tuple[int, int] = (320, 180), ) -> Path: """ Generate a thumbnail from the video. Args: video_path: Path to the video file output_path: Path for the output thumbnail timestamp: Timestamp for thumbnail (None = 10% into video) size: Thumbnail size (width, height) Returns: Path to the generated thumbnail """ video_path = Path(video_path) output_path = Path(output_path) if timestamp is None: # Default to 10% into the video metadata = self.get_metadata(video_path) timestamp = metadata.duration * 0.1 command = [ self.ffmpeg_path, "-y", "-ss", str(timestamp), "-i", str(video_path), "-vframes", "1", "-vf", f"scale={size[0]}:{size[1]}", "-q:v", "2", str(output_path), ] self._run_command(command) return output_path # Export public interface __all__ = ["VideoProcessor", "VideoMetadata"]