Spaces:
Paused
Paused
"""
ShortSmith v2 - Video Processor Module

FFmpeg-based video processing for:
- Extracting video metadata
- Extracting frames at specified timestamps/FPS
- Extracting audio tracks
- Cutting video clips
"""
import json
import shlex
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Generator, List, Optional, Tuple

import numpy as np

try:
    from PIL import Image
except ImportError:
    Image = None

from config import get_config
from utils.helpers import (
    VideoProcessingError,
    ensure_dir,
    format_timestamp,
    validate_video_file,
)
from utils.logger import LogTimer, get_logger
# Module-level logger shared by everything in this file, registered under the
# "core.video_processor" name via the project's logging helper.
logger = get_logger("core.video_processor")
@dataclass
class VideoMetadata:
    """Video file metadata.

    Instances are built with keyword arguments, one per field below. The
    derived values (`frame_count`, `aspect_ratio`, `resolution`) are
    read-only properties and are accessed without calling them.
    """

    duration: float  # Duration in seconds
    width: int
    height: int
    fps: float
    codec: str
    bitrate: Optional[int]
    audio_codec: Optional[str]
    audio_sample_rate: Optional[int]
    file_size: int
    file_path: Path

    @property
    def frame_count(self) -> int:
        """Estimated total frame count (duration * fps, truncated)."""
        return int(self.duration * self.fps)

    @property
    def aspect_ratio(self) -> float:
        """Video aspect ratio (width / height), or 0 when height is not positive."""
        return self.width / self.height if self.height > 0 else 0

    @property
    def resolution(self) -> str:
        """Human-readable resolution string, e.g. "1920x1080"."""
        return f"{self.width}x{self.height}"
class VideoProcessor:
    """
    FFmpeg-based video processor for frame extraction and manipulation.

    Handles all low-level video operations using FFmpeg subprocess calls.
    """

    def __init__(self, ffmpeg_path: Optional[str] = None):
        """
        Initialize video processor.

        Args:
            ffmpeg_path: Path to FFmpeg executable (auto-detected if None)

        Raises:
            VideoProcessingError: If FFmpeg is not found
        """
        self.ffmpeg_path = ffmpeg_path or self._find_ffmpeg()
        # FFprobe lookup reads self.ffmpeg_path, so it must run after the
        # assignment above.
        self.ffprobe_path = self._find_ffprobe()

        if not self.ffmpeg_path:
            raise VideoProcessingError(
                "FFmpeg not found. Please install FFmpeg and add it to PATH."
            )

        logger.info(f"VideoProcessor initialized with FFmpeg: {self.ffmpeg_path}")

    def _find_ffmpeg(self) -> Optional[str]:
        """Find FFmpeg executable in PATH, then in common install locations."""
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            return ffmpeg

        # Common installation paths
        common_paths = [
            "/usr/bin/ffmpeg",
            "/usr/local/bin/ffmpeg",
            "C:\\ffmpeg\\bin\\ffmpeg.exe",
            "C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe",
        ]
        for path in common_paths:
            if Path(path).exists():
                return path

        return None

    def _find_ffprobe(self) -> Optional[str]:
        """Find FFprobe executable in PATH, then next to the FFmpeg binary."""
        ffprobe = shutil.which("ffprobe")
        if ffprobe:
            return ffprobe

        # Try same directory as ffmpeg
        if self.ffmpeg_path:
            ffmpeg_dir = Path(self.ffmpeg_path).parent
            for name in ("ffprobe", "ffprobe.exe"):
                candidate = ffmpeg_dir / name
                if candidate.exists():
                    return str(candidate)

        return None

    def _run_command(
        self,
        command: List[str],
        capture_output: bool = True,
        check: bool = True,
    ) -> subprocess.CompletedProcess:
        """
        Run a subprocess command with error handling.

        Args:
            command: Command and arguments
            capture_output: Whether to capture stdout/stderr
            check: Whether to raise on non-zero exit

        Returns:
            CompletedProcess result

        Raises:
            VideoProcessingError: If command fails
        """
        try:
            # shlex.join quotes arguments so paths with spaces stay
            # unambiguous in the log.
            logger.debug(f"Running command: {shlex.join(map(str, command))}")
            return subprocess.run(
                command,
                capture_output=capture_output,
                text=True,
                # Decode as UTF-8 and replace undecodable bytes: tool output
                # can echo arbitrary metadata bytes, and a strict decode
                # would raise UnicodeDecodeError past the handlers below.
                encoding="utf-8",
                errors="replace",
                check=check,
            )
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else str(e)
            logger.error(f"Command failed: {error_msg}")
            raise VideoProcessingError(f"FFmpeg command failed: {error_msg}") from e
        except FileNotFoundError as e:
            raise VideoProcessingError(f"FFmpeg not found: {e}") from e

    @staticmethod
    def _parse_frame_rate(fps_str: str, default: float = 30.0) -> float:
        """
        Parse a frame-rate string such as "30/1", "30000/1001" or "29.97".

        Args:
            fps_str: Frame-rate string as reported by FFprobe
            default: Value returned when the string is malformed or the
                denominator is not positive

        Returns:
            Frame rate as a float
        """
        try:
            if "/" in fps_str:
                num, den = (float(part) for part in fps_str.split("/", 1))
                return num / den if den > 0 else default
            return float(fps_str)
        except (TypeError, ValueError):
            # Unparseable input falls back instead of leaking a bare ValueError.
            return default

    def get_metadata(self, video_path: str | Path) -> VideoMetadata:
        """
        Extract metadata from a video file.

        Args:
            video_path: Path to the video file

        Returns:
            VideoMetadata object with video information

        Raises:
            VideoProcessingError: If metadata extraction fails
        """
        video_path = Path(video_path)

        # Validate file first
        validation = validate_video_file(video_path)
        if not validation.is_valid:
            raise VideoProcessingError(validation.error_message)

        if not self.ffprobe_path:
            raise VideoProcessingError("FFprobe not found for metadata extraction")

        with LogTimer(logger, f"Extracting metadata from {video_path.name}"):
            command = [
                self.ffprobe_path,
                "-v", "quiet",
                "-print_format", "json",
                "-show_format",
                "-show_streams",
                str(video_path),
            ]
            result = self._run_command(command)

            try:
                data = json.loads(result.stdout)
            except json.JSONDecodeError as e:
                raise VideoProcessingError(f"Failed to parse video metadata: {e}") from e

            # Extract video stream info (first stream of each type wins)
            video_stream = None
            audio_stream = None
            for stream in data.get("streams", []):
                codec_type = stream.get("codec_type")
                if codec_type == "video" and video_stream is None:
                    video_stream = stream
                elif codec_type == "audio" and audio_stream is None:
                    audio_stream = stream

            if not video_stream:
                raise VideoProcessingError("No video stream found in file")

            # Parse FPS (can be "30/1" or "29.97")
            fps = self._parse_frame_rate(video_stream.get("r_frame_rate", "30/1"))

            # Get format info
            format_info = data.get("format", {})
            bit_rate = format_info.get("bit_rate")

            metadata = VideoMetadata(
                duration=float(format_info.get("duration", 0)),
                width=int(video_stream.get("width", 0)),
                height=int(video_stream.get("height", 0)),
                fps=fps,
                codec=video_stream.get("codec_name", "unknown"),
                bitrate=int(bit_rate) if bit_rate else None,
                audio_codec=audio_stream.get("codec_name") if audio_stream else None,
                audio_sample_rate=int(audio_stream.get("sample_rate", 0)) if audio_stream else None,
                file_size=validation.file_size,
                file_path=video_path,
            )

            logger.info(
                f"Video metadata: {metadata.resolution}, "
                f"{metadata.fps:.2f}fps, {format_timestamp(metadata.duration)}"
            )
            return metadata

    def extract_frames(
        self,
        video_path: str | Path,
        output_dir: str | Path,
        fps: Optional[float] = None,
        timestamps: Optional[List[float]] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        scale: Optional[Tuple[int, int]] = None,
        quality: int = 2,
    ) -> List[Path]:
        """
        Extract frames from video.

        When `timestamps` is non-empty it takes precedence and `fps`,
        `start_time` and `end_time` are ignored; otherwise frames are
        extracted at `fps` (1.0 if not given).

        Args:
            video_path: Path to the video file
            output_dir: Directory to save extracted frames
            fps: Extract at this FPS (mutually exclusive with timestamps)
            timestamps: Specific timestamps to extract (in seconds)
            start_time: Start time for extraction (seconds)
            end_time: End time for extraction (seconds)
            scale: Target resolution (width, height), None to keep original
            quality: JPEG quality (1-31, lower is better)

        Returns:
            List of paths to extracted frame images

        Raises:
            VideoProcessingError: If frame extraction fails
        """
        video_path = Path(video_path)
        output_dir = ensure_dir(output_dir)

        with LogTimer(logger, f"Extracting frames from {video_path.name}"):
            if timestamps:
                # Extract specific timestamps
                return self._extract_at_timestamps(
                    video_path, output_dir, timestamps, scale, quality
                )
            # Extract at specified FPS
            return self._extract_at_fps(
                video_path, output_dir, fps or 1.0,
                start_time, end_time, scale, quality
            )

    def _extract_at_fps(
        self,
        video_path: Path,
        output_dir: Path,
        fps: float,
        start_time: Optional[float],
        end_time: Optional[float],
        scale: Optional[Tuple[int, int]],
        quality: int,
    ) -> List[Path]:
        """
        Extract frames at specified FPS.

        Frames are written as frame_%06d.jpg into `output_dir`. Existing
        files matching frame_*.jpg in that directory are removed first so
        the returned list contains only frames from this run.

        Raises:
            VideoProcessingError: If the time range is empty or FFmpeg fails
        """
        command = [self.ffmpeg_path, "-y"]

        # Input seeking (faster)
        if start_time is not None:
            command.extend(["-ss", str(start_time)])

        command.extend(["-i", str(video_path)])

        # Duration
        if end_time is not None:
            duration = end_time - (start_time or 0)
            if duration <= 0:
                raise VideoProcessingError(
                    f"Invalid extraction range: {start_time} to {end_time}"
                )
            command.extend(["-t", str(duration)])

        # Filters
        filters = [f"fps={fps}"]
        if scale:
            filters.append(f"scale={scale[0]}:{scale[1]}")
        command.extend(["-vf", ",".join(filters)])

        # Output settings
        command.extend([
            "-q:v", str(quality),
            "-f", "image2",
            str(output_dir / "frame_%06d.jpg"),
        ])

        # Leftovers from an earlier run would otherwise be picked up by the
        # glob below and returned as if they belonged to this extraction.
        for stale in output_dir.glob("frame_*.jpg"):
            try:
                stale.unlink()
            except OSError as e:
                logger.warning(f"Could not remove stale frame {stale}: {e}")

        self._run_command(command)

        # Collect output files
        frames = sorted(output_dir.glob("frame_*.jpg"))
        logger.info(f"Extracted {len(frames)} frames at {fps} FPS")
        return frames

    def _extract_at_timestamps(
        self,
        video_path: Path,
        output_dir: Path,
        timestamps: List[float],
        scale: Optional[Tuple[int, int]],
        quality: int,
    ) -> List[Path]:
        """
        Extract frames at specific timestamps.

        One FFmpeg call is made per timestamp; a failing timestamp is logged
        and skipped rather than aborting the whole batch. Output files are
        named by position in `timestamps` (frame_000000.jpg, ...).
        """
        frames = []

        for i, ts in enumerate(timestamps):
            output_path = output_dir / f"frame_{i:06d}.jpg"
            command = [
                self.ffmpeg_path, "-y",
                "-ss", str(ts),
                "-i", str(video_path),
                "-vframes", "1",
            ]
            if scale:
                command.extend(["-vf", f"scale={scale[0]}:{scale[1]}"])
            command.extend([
                "-q:v", str(quality),
                str(output_path),
            ])

            try:
                # Drop any file left by a previous run so the existence
                # check below reflects this call only.
                output_path.unlink(missing_ok=True)
                self._run_command(command)
                if output_path.exists():
                    frames.append(output_path)
            except OSError as e:
                logger.warning(f"Failed to extract frame at {ts}s: {e}")
            except VideoProcessingError as e:
                logger.warning(f"Failed to extract frame at {ts}s: {e}")

        logger.info(f"Extracted {len(frames)} frames at specific timestamps")
        return frames

    def extract_audio(
        self,
        video_path: str | Path,
        output_path: str | Path,
        sample_rate: int = 16000,
        mono: bool = True,
    ) -> Path:
        """
        Extract audio track from video.

        Args:
            video_path: Path to the video file
            output_path: Path for the output audio file
            sample_rate: Audio sample rate (Hz)
            mono: Convert to mono if True

        Returns:
            Path to the extracted audio file

        Raises:
            VideoProcessingError: If audio extraction fails
        """
        video_path = Path(video_path)
        output_path = Path(output_path)

        with LogTimer(logger, f"Extracting audio from {video_path.name}"):
            command = [
                self.ffmpeg_path, "-y",
                "-i", str(video_path),
                "-vn",  # No video
                "-acodec", "pcm_s16le",  # WAV format
                "-ar", str(sample_rate),
            ]
            if mono:
                command.extend(["-ac", "1"])
            command.append(str(output_path))

            self._run_command(command)

            if not output_path.exists():
                raise VideoProcessingError("Audio extraction produced no output")

            logger.info(f"Extracted audio to {output_path}")
            return output_path

    def cut_clip(
        self,
        video_path: str | Path,
        output_path: str | Path,
        start_time: float,
        end_time: float,
        reencode: bool = False,
    ) -> Path:
        """
        Cut a clip from the video.

        Args:
            video_path: Path to the source video
            output_path: Path for the output clip
            start_time: Start time in seconds
            end_time: End time in seconds
            reencode: Whether to re-encode (slower but more precise)

        Returns:
            Path to the cut clip

        Raises:
            VideoProcessingError: If cutting fails
        """
        video_path = Path(video_path)
        output_path = Path(output_path)

        duration = end_time - start_time
        if duration <= 0:
            raise VideoProcessingError(
                f"Invalid clip duration: {start_time} to {end_time}"
            )

        with LogTimer(logger, f"Cutting clip {format_timestamp(start_time)}-{format_timestamp(end_time)}"):
            if reencode:
                # Re-encode for precise cutting
                command = [
                    self.ffmpeg_path, "-y",
                    "-i", str(video_path),
                    "-ss", str(start_time),
                    "-t", str(duration),
                    "-c:v", "libx264",
                    "-c:a", "aac",
                    "-preset", "fast",
                    str(output_path),
                ]
            else:
                # Stream copy for fast cutting (may be slightly imprecise)
                command = [
                    self.ffmpeg_path, "-y",
                    "-ss", str(start_time),
                    "-i", str(video_path),
                    "-t", str(duration),
                    "-c", "copy",
                    "-avoid_negative_ts", "make_zero",
                    str(output_path),
                ]

            self._run_command(command)

            if not output_path.exists():
                raise VideoProcessingError("Clip cutting produced no output")

            logger.info(f"Cut clip saved to {output_path}")
            return output_path

    def cut_clips_batch(
        self,
        video_path: str | Path,
        output_dir: str | Path,
        segments: List[Tuple[float, float]],
        reencode: bool = False,
        name_prefix: str = "clip",
    ) -> List[Path]:
        """
        Cut multiple clips from a video.

        A segment that fails to cut is logged and skipped, so the returned
        list may be shorter than `segments`.

        Args:
            video_path: Path to the source video
            output_dir: Directory for output clips
            segments: List of (start_time, end_time) tuples
            reencode: Whether to re-encode clips
            name_prefix: Prefix for output filenames

        Returns:
            List of paths to cut clips
        """
        output_dir = ensure_dir(output_dir)
        clips = []

        # Output files are numbered from 1: <prefix>_001.mp4, <prefix>_002.mp4, ...
        for index, (start, end) in enumerate(segments, start=1):
            output_path = output_dir / f"{name_prefix}_{index:03d}.mp4"
            try:
                clips.append(
                    self.cut_clip(video_path, output_path, start, end, reencode)
                )
            except VideoProcessingError as e:
                logger.error(f"Failed to cut clip {index}: {e}")

        return clips

    def get_frame_at_timestamp(
        self,
        video_path: str | Path,
        timestamp: float,
        scale: Optional[Tuple[int, int]] = None,
    ) -> Optional[np.ndarray]:
        """
        Get a single frame at a specific timestamp as numpy array.

        The frame is written to a temporary JPEG, loaded, and the temporary
        file is removed whether or not extraction succeeded.

        Args:
            video_path: Path to the video file
            timestamp: Timestamp in seconds
            scale: Target resolution (width, height)

        Returns:
            Frame as numpy array (H, W, C) in RGB format, or None if failed
        """
        if Image is None:
            logger.error("PIL not installed, cannot get frame as array")
            return None

        tmp_path: Optional[Path] = None
        try:
            # delete=False: the handle is closed before FFmpeg writes to the
            # path, so cleanup is done manually in the finally clause.
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
                tmp_path = Path(tmp.name)

            command = [
                self.ffmpeg_path, "-y",
                "-ss", str(timestamp),
                "-i", str(video_path),
                "-vframes", "1",
            ]
            if scale:
                command.extend(["-vf", f"scale={scale[0]}:{scale[1]}"])
            command.extend(["-q:v", "2", str(tmp_path)])

            self._run_command(command)

            # Close the image file before the temp file is unlinked.
            with Image.open(tmp_path) as img:
                return np.array(img.convert("RGB"))
        except Exception as e:
            # Best-effort API: any failure is reported as None to the caller.
            logger.error(f"Failed to get frame at {timestamp}s: {e}")
            return None
        finally:
            if tmp_path is not None:
                try:
                    tmp_path.unlink(missing_ok=True)
                except OSError as e:
                    logger.debug(f"Could not remove temporary frame {tmp_path}: {e}")

    def generate_thumbnail(
        self,
        video_path: str | Path,
        output_path: str | Path,
        timestamp: Optional[float] = None,
        size: Tuple[int, int] = (320, 180),
    ) -> Path:
        """
        Generate a thumbnail from the video.

        Args:
            video_path: Path to the video file
            output_path: Path for the output thumbnail
            timestamp: Timestamp for thumbnail (None = 10% into video)
            size: Thumbnail size (width, height)

        Returns:
            Path to the generated thumbnail

        Raises:
            VideoProcessingError: If metadata lookup or FFmpeg fails, or no
                thumbnail file was produced
        """
        video_path = Path(video_path)
        output_path = Path(output_path)

        if timestamp is None:
            # Default to 10% into the video
            metadata = self.get_metadata(video_path)
            timestamp = metadata.duration * 0.1

        command = [
            self.ffmpeg_path, "-y",
            "-ss", str(timestamp),
            "-i", str(video_path),
            "-vframes", "1",
            "-vf", f"scale={size[0]}:{size[1]}",
            "-q:v", "2",
            str(output_path),
        ]
        self._run_command(command)

        # Same post-condition check as extract_audio / cut_clip, so callers
        # never receive a path to a file that was not written.
        if not output_path.exists():
            raise VideoProcessingError("Thumbnail generation produced no output")

        return output_path
# Export public interface: these are the only names exposed by a star-import
# of this module.
__all__ = ["VideoProcessor", "VideoMetadata"]