# Source: dev_caio/core/video_processor.py
# Uploaded by: Chaitanya-aitf
# Commit message: Initializing project from local
# Commit: ad4e58a (verified)
"""
ShortSmith v2 - Video Processor Module
FFmpeg-based video processing for:
- Extracting video metadata
- Extracting frames at specified timestamps/FPS
- Extracting audio tracks
- Cutting video clips
"""
import json
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Generator, List, Optional, Tuple

import numpy as np

# Pillow is optional; code that needs it checks `Image is None` first.
try:
    from PIL import Image
except ImportError:
    Image = None

from utils.logger import get_logger, LogTimer
from utils.helpers import (
    VideoProcessingError,
    validate_video_file,
    ensure_dir,
    format_timestamp,
)
from config import get_config
# Module-level logger used by every function and method in this file.
logger = get_logger("core.video_processor")
@dataclass
class VideoMetadata:
    """Video file metadata."""

    duration: float  # Duration in seconds
    width: int  # Frame width in pixels
    height: int  # Frame height in pixels
    fps: float  # Frames per second
    codec: str  # Video codec name
    bitrate: Optional[int]  # Overall bitrate, None when unavailable
    audio_codec: Optional[str]  # Audio codec name, None when there is no audio
    audio_sample_rate: Optional[int]  # Audio sample rate, None when there is no audio
    file_size: int  # Size of the file (presumably bytes - confirm with producer)
    file_path: Path  # Location of the video file

    @property
    def frame_count(self) -> int:
        """Estimated total frame count (duration * fps, truncated)."""
        return int(self.duration * self.fps)

    @property
    def aspect_ratio(self) -> float:
        """Video aspect ratio (width / height), or 0.0 if height is not positive."""
        if self.height <= 0:
            # Return a float so the result type matches the annotation.
            return 0.0
        return self.width / self.height

    @property
    def resolution(self) -> str:
        """Human-readable resolution string, e.g. "1920x1080"."""
        return f"{self.width}x{self.height}"
class VideoProcessor:
"""
FFmpeg-based video processor for frame extraction and manipulation.
Handles all low-level video operations using FFmpeg subprocess calls.
"""
def __init__(self, ffmpeg_path: Optional[str] = None):
"""
Initialize video processor.
Args:
ffmpeg_path: Path to FFmpeg executable (auto-detected if None)
Raises:
VideoProcessingError: If FFmpeg is not found
"""
self.ffmpeg_path = ffmpeg_path or self._find_ffmpeg()
self.ffprobe_path = self._find_ffprobe()
if not self.ffmpeg_path:
raise VideoProcessingError(
"FFmpeg not found. Please install FFmpeg and add it to PATH."
)
logger.info(f"VideoProcessor initialized with FFmpeg: {self.ffmpeg_path}")
def _find_ffmpeg(self) -> Optional[str]:
"""Find FFmpeg executable in PATH."""
ffmpeg = shutil.which("ffmpeg")
if ffmpeg:
return ffmpeg
# Common installation paths
common_paths = [
"/usr/bin/ffmpeg",
"/usr/local/bin/ffmpeg",
"C:\\ffmpeg\\bin\\ffmpeg.exe",
"C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe",
]
for path in common_paths:
if Path(path).exists():
return path
return None
def _find_ffprobe(self) -> Optional[str]:
"""Find FFprobe executable in PATH."""
ffprobe = shutil.which("ffprobe")
if ffprobe:
return ffprobe
# Try same directory as ffmpeg
if self.ffmpeg_path:
ffmpeg_dir = Path(self.ffmpeg_path).parent
ffprobe_path = ffmpeg_dir / "ffprobe"
if ffprobe_path.exists():
return str(ffprobe_path)
ffprobe_path = ffmpeg_dir / "ffprobe.exe"
if ffprobe_path.exists():
return str(ffprobe_path)
return None
def _run_command(
self,
command: List[str],
capture_output: bool = True,
check: bool = True,
) -> subprocess.CompletedProcess:
"""
Run a subprocess command with error handling.
Args:
command: Command and arguments
capture_output: Whether to capture stdout/stderr
check: Whether to raise on non-zero exit
Returns:
CompletedProcess result
Raises:
VideoProcessingError: If command fails
"""
try:
logger.debug(f"Running command: {' '.join(command)}")
result = subprocess.run(
command,
capture_output=capture_output,
text=True,
check=check,
)
return result
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
logger.error(f"Command failed: {error_msg}")
raise VideoProcessingError(f"FFmpeg command failed: {error_msg}") from e
except FileNotFoundError as e:
raise VideoProcessingError(f"FFmpeg not found: {e}") from e
def get_metadata(self, video_path: str | Path) -> VideoMetadata:
"""
Extract metadata from a video file.
Args:
video_path: Path to the video file
Returns:
VideoMetadata object with video information
Raises:
VideoProcessingError: If metadata extraction fails
"""
video_path = Path(video_path)
# Validate file first
validation = validate_video_file(video_path)
if not validation.is_valid:
raise VideoProcessingError(validation.error_message)
if not self.ffprobe_path:
raise VideoProcessingError("FFprobe not found for metadata extraction")
with LogTimer(logger, f"Extracting metadata from {video_path.name}"):
command = [
self.ffprobe_path,
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
str(video_path),
]
result = self._run_command(command)
try:
data = json.loads(result.stdout)
except json.JSONDecodeError as e:
raise VideoProcessingError(f"Failed to parse video metadata: {e}") from e
# Extract video stream info
video_stream = None
audio_stream = None
for stream in data.get("streams", []):
if stream.get("codec_type") == "video" and video_stream is None:
video_stream = stream
elif stream.get("codec_type") == "audio" and audio_stream is None:
audio_stream = stream
if not video_stream:
raise VideoProcessingError("No video stream found in file")
# Parse FPS (can be "30/1" or "29.97")
fps_str = video_stream.get("r_frame_rate", "30/1")
if "/" in fps_str:
num, den = map(int, fps_str.split("/"))
fps = num / den if den > 0 else 30.0
else:
fps = float(fps_str)
# Get format info
format_info = data.get("format", {})
metadata = VideoMetadata(
duration=float(format_info.get("duration", 0)),
width=int(video_stream.get("width", 0)),
height=int(video_stream.get("height", 0)),
fps=fps,
codec=video_stream.get("codec_name", "unknown"),
bitrate=int(format_info.get("bit_rate", 0)) if format_info.get("bit_rate") else None,
audio_codec=audio_stream.get("codec_name") if audio_stream else None,
audio_sample_rate=int(audio_stream.get("sample_rate", 0)) if audio_stream else None,
file_size=validation.file_size,
file_path=video_path,
)
logger.info(
f"Video metadata: {metadata.resolution}, "
f"{metadata.fps:.2f}fps, {format_timestamp(metadata.duration)}"
)
return metadata
def extract_frames(
self,
video_path: str | Path,
output_dir: str | Path,
fps: Optional[float] = None,
timestamps: Optional[List[float]] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None,
scale: Optional[Tuple[int, int]] = None,
quality: int = 2,
) -> List[Path]:
"""
Extract frames from video.
Args:
video_path: Path to the video file
output_dir: Directory to save extracted frames
fps: Extract at this FPS (mutually exclusive with timestamps)
timestamps: Specific timestamps to extract (in seconds)
start_time: Start time for extraction (seconds)
end_time: End time for extraction (seconds)
scale: Target resolution (width, height), None to keep original
quality: JPEG quality (1-31, lower is better)
Returns:
List of paths to extracted frame images
Raises:
VideoProcessingError: If frame extraction fails
"""
video_path = Path(video_path)
output_dir = ensure_dir(output_dir)
with LogTimer(logger, f"Extracting frames from {video_path.name}"):
if timestamps:
# Extract specific timestamps
return self._extract_at_timestamps(
video_path, output_dir, timestamps, scale, quality
)
else:
# Extract at specified FPS
return self._extract_at_fps(
video_path, output_dir, fps or 1.0,
start_time, end_time, scale, quality
)
def _extract_at_fps(
self,
video_path: Path,
output_dir: Path,
fps: float,
start_time: Optional[float],
end_time: Optional[float],
scale: Optional[Tuple[int, int]],
quality: int,
) -> List[Path]:
"""Extract frames at specified FPS."""
command = [self.ffmpeg_path, "-y"]
# Input seeking (faster)
if start_time is not None:
command.extend(["-ss", str(start_time)])
command.extend(["-i", str(video_path)])
# Duration
if end_time is not None:
duration = end_time - (start_time or 0)
command.extend(["-t", str(duration)])
# Filters
filters = [f"fps={fps}"]
if scale:
filters.append(f"scale={scale[0]}:{scale[1]}")
command.extend(["-vf", ",".join(filters)])
# Output settings
command.extend([
"-q:v", str(quality),
"-f", "image2",
str(output_dir / "frame_%06d.jpg"),
])
self._run_command(command)
# Collect output files
frames = sorted(output_dir.glob("frame_*.jpg"))
logger.info(f"Extracted {len(frames)} frames at {fps} FPS")
return frames
def _extract_at_timestamps(
self,
video_path: Path,
output_dir: Path,
timestamps: List[float],
scale: Optional[Tuple[int, int]],
quality: int,
) -> List[Path]:
"""Extract frames at specific timestamps."""
frames = []
for i, ts in enumerate(timestamps):
output_path = output_dir / f"frame_{i:06d}.jpg"
command = [
self.ffmpeg_path, "-y",
"-ss", str(ts),
"-i", str(video_path),
"-vframes", "1",
]
if scale:
command.extend(["-vf", f"scale={scale[0]}:{scale[1]}"])
command.extend([
"-q:v", str(quality),
str(output_path),
])
try:
self._run_command(command)
if output_path.exists():
frames.append(output_path)
except VideoProcessingError as e:
logger.warning(f"Failed to extract frame at {ts}s: {e}")
logger.info(f"Extracted {len(frames)} frames at specific timestamps")
return frames
def extract_audio(
self,
video_path: str | Path,
output_path: str | Path,
sample_rate: int = 16000,
mono: bool = True,
) -> Path:
"""
Extract audio track from video.
Args:
video_path: Path to the video file
output_path: Path for the output audio file
sample_rate: Audio sample rate (Hz)
mono: Convert to mono if True
Returns:
Path to the extracted audio file
Raises:
VideoProcessingError: If audio extraction fails
"""
video_path = Path(video_path)
output_path = Path(output_path)
with LogTimer(logger, f"Extracting audio from {video_path.name}"):
command = [
self.ffmpeg_path, "-y",
"-i", str(video_path),
"-vn", # No video
"-acodec", "pcm_s16le", # WAV format
"-ar", str(sample_rate),
]
if mono:
command.extend(["-ac", "1"])
command.append(str(output_path))
self._run_command(command)
if not output_path.exists():
raise VideoProcessingError("Audio extraction produced no output")
logger.info(f"Extracted audio to {output_path}")
return output_path
def cut_clip(
self,
video_path: str | Path,
output_path: str | Path,
start_time: float,
end_time: float,
reencode: bool = False,
) -> Path:
"""
Cut a clip from the video.
Args:
video_path: Path to the source video
output_path: Path for the output clip
start_time: Start time in seconds
end_time: End time in seconds
reencode: Whether to re-encode (slower but more precise)
Returns:
Path to the cut clip
Raises:
VideoProcessingError: If cutting fails
"""
video_path = Path(video_path)
output_path = Path(output_path)
duration = end_time - start_time
if duration <= 0:
raise VideoProcessingError(
f"Invalid clip duration: {start_time} to {end_time}"
)
with LogTimer(logger, f"Cutting clip {format_timestamp(start_time)}-{format_timestamp(end_time)}"):
if reencode:
# Re-encode for precise cutting
command = [
self.ffmpeg_path, "-y",
"-i", str(video_path),
"-ss", str(start_time),
"-t", str(duration),
"-c:v", "libx264",
"-c:a", "aac",
"-preset", "fast",
str(output_path),
]
else:
# Stream copy for fast cutting (may be slightly imprecise)
command = [
self.ffmpeg_path, "-y",
"-ss", str(start_time),
"-i", str(video_path),
"-t", str(duration),
"-c", "copy",
"-avoid_negative_ts", "make_zero",
str(output_path),
]
self._run_command(command)
if not output_path.exists():
raise VideoProcessingError("Clip cutting produced no output")
logger.info(f"Cut clip saved to {output_path}")
return output_path
def cut_clips_batch(
self,
video_path: str | Path,
output_dir: str | Path,
segments: List[Tuple[float, float]],
reencode: bool = False,
name_prefix: str = "clip",
) -> List[Path]:
"""
Cut multiple clips from a video.
Args:
video_path: Path to the source video
output_dir: Directory for output clips
segments: List of (start_time, end_time) tuples
reencode: Whether to re-encode clips
name_prefix: Prefix for output filenames
Returns:
List of paths to cut clips
"""
output_dir = ensure_dir(output_dir)
clips = []
for i, (start, end) in enumerate(segments):
output_path = output_dir / f"{name_prefix}_{i+1:03d}.mp4"
try:
clip_path = self.cut_clip(
video_path, output_path, start, end, reencode
)
clips.append(clip_path)
except VideoProcessingError as e:
logger.error(f"Failed to cut clip {i+1}: {e}")
return clips
def get_frame_at_timestamp(
self,
video_path: str | Path,
timestamp: float,
scale: Optional[Tuple[int, int]] = None,
) -> Optional[np.ndarray]:
"""
Get a single frame at a specific timestamp as numpy array.
Args:
video_path: Path to the video file
timestamp: Timestamp in seconds
scale: Target resolution (width, height)
Returns:
Frame as numpy array (H, W, C) in RGB format, or None if failed
"""
if Image is None:
logger.error("PIL not installed, cannot get frame as array")
return None
import tempfile
try:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
tmp_path = Path(tmp.name)
command = [
self.ffmpeg_path, "-y",
"-ss", str(timestamp),
"-i", str(video_path),
"-vframes", "1",
]
if scale:
command.extend(["-vf", f"scale={scale[0]}:{scale[1]}"])
command.extend(["-q:v", "2", str(tmp_path)])
self._run_command(command)
if tmp_path.exists():
img = Image.open(tmp_path).convert("RGB")
frame = np.array(img)
tmp_path.unlink()
return frame
except Exception as e:
logger.error(f"Failed to get frame at {timestamp}s: {e}")
return None
def generate_thumbnail(
self,
video_path: str | Path,
output_path: str | Path,
timestamp: Optional[float] = None,
size: Tuple[int, int] = (320, 180),
) -> Path:
"""
Generate a thumbnail from the video.
Args:
video_path: Path to the video file
output_path: Path for the output thumbnail
timestamp: Timestamp for thumbnail (None = 10% into video)
size: Thumbnail size (width, height)
Returns:
Path to the generated thumbnail
"""
video_path = Path(video_path)
output_path = Path(output_path)
if timestamp is None:
# Default to 10% into the video
metadata = self.get_metadata(video_path)
timestamp = metadata.duration * 0.1
command = [
self.ffmpeg_path, "-y",
"-ss", str(timestamp),
"-i", str(video_path),
"-vframes", "1",
"-vf", f"scale={size[0]}:{size[1]}",
"-q:v", "2",
str(output_path),
]
self._run_command(command)
return output_path
# Export public interface: the only names a star-import of this module provides.
__all__ = ["VideoProcessor", "VideoMetadata"]