"""
FFmpeg operations for video clip extraction and concatenation.

This module provides functions for extracting clips from videos and
concatenating them using FFmpeg. Each FFmpeg command is documented with inline
comments explaining what each argument does.

Supported methods:
- stream_copy: Fastest method, no re-encoding, but cuts on keyframes only
- reencode: Frame-accurate cuts with re-encoding, best compression
- ultrafast: Frame-accurate cuts with faster encoding, larger files
"""

import logging
import shutil
import subprocess
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)


class ClipMethod(Enum):
    """Enum for available clip extraction methods."""

    STREAM_COPY = "stream_copy"  # Fastest, keyframe-aligned cuts
    REENCODE = "reencode"  # Frame-accurate, best compression
    ULTRAFAST = "ultrafast"  # Frame-accurate, faster encoding


class ClipInfo(BaseModel):
    """Information about a clip to extract."""

    start_time: float = Field(..., description="Start time in seconds")
    end_time: float = Field(..., description="End time in seconds")
    output_path: Path = Field(..., description="Path for output file")
    play_number: Optional[int] = Field(None, description="Optional play number for logging")


def _run_ffmpeg(cmd: List[str], cwd: Optional[str] = None) -> Tuple[bool, str]:
    """
    Run an FFmpeg command and report the outcome as (success, error_message).

    Args:
        cmd: Full argument list, starting with the executable name.
        cwd: Optional working directory for the subprocess.

    Returns:
        Tuple of (success, error_message). error_message is "" on success.
    """
    try:
        subprocess.run(cmd, check=True, capture_output=True, cwd=cwd)
    except subprocess.CalledProcessError as e:
        # stderr is raw bytes; never let a stray non-UTF-8 byte mask the real error
        error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
        return (False, error_msg)
    except OSError as e:
        # Raised when the executable (or cwd) does not exist, e.g. ffmpeg is not installed
        return (False, f"Could not run ffmpeg: {e}")
    return (True, "")


def extract_clip_stream_copy(
    video_path: str,
    output_path: str,
    start_time: float,
    duration: float,
) -> Tuple[bool, str]:
    """
    Extract a clip using stream copy (no re-encoding).

    This is the fastest method but can only cut on keyframes, meaning
    the actual cut points may differ slightly from the requested times.

    Args:
        video_path: Path to source video.
        output_path: Path for output clip.
        start_time: Start time in seconds.
        duration: Duration of clip in seconds.

    Returns:
        Tuple of (success, error_message).
    """
    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-y",  # Overwrite output file without asking for confirmation
        "-ss", str(start_time),  # Seek to start position BEFORE input (enables fast seeking)
        "-i", video_path,  # Input video file path
        "-t", str(duration),  # Duration of output clip in seconds
        "-c", "copy",  # Copy streams without re-encoding (very fast, but keyframe-aligned)
        "-avoid_negative_ts", "make_zero",  # Fix timestamp issues that can occur from mid-stream cuts
        "-loglevel", "error",  # Only show errors, suppress informational output
        output_path,  # Output file path
    ]
    return _run_ffmpeg(cmd)


def extract_clip_reencode(
    video_path: str,
    output_path: str,
    start_time: float,
    duration: float,
    preset: str = "fast",
    crf: int = 23,
) -> Tuple[bool, str]:
    """
    Extract a clip with re-encoding for frame-accurate cuts.

    This method is slower but provides precise cut points and good compression.

    Args:
        video_path: Path to source video.
        output_path: Path for output clip.
        start_time: Start time in seconds.
        duration: Duration of clip in seconds.
        preset: Encoding preset ("ultrafast", "fast", "medium", "slow").
        crf: Constant Rate Factor for quality (lower = better, 18-28 is typical).

    Returns:
        Tuple of (success, error_message).
    """
    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-y",  # Overwrite output file without asking
        "-ss", str(start_time),  # Seek to start position (before -i for fast seeking)
        "-i", video_path,  # Input video file path
        "-t", str(duration),  # Duration of output clip
        "-c:v", "libx264",  # Use H.264 video codec for wide compatibility
        "-preset", preset,  # Encoding speed/compression tradeoff (ultrafast/fast/medium/slow)
        "-crf", str(crf),  # Constant Rate Factor: quality level (18=high, 23=medium, 28=low)
        "-c:a", "aac",  # Use AAC audio codec for compatibility
        "-b:a", "128k",  # Audio bitrate (128 kbps is good quality for speech/commentary)
        "-loglevel", "error",  # Only show errors
        output_path,  # Output file path
    ]
    return _run_ffmpeg(cmd)


def _get_base_padding(play: Dict[str, Any], padding: float) -> Tuple[float, float]:
    """Get (start_padding, end_padding) for a play based on its type."""
    play_type = play.get("play_type", "normal")
    if play_type == "normal":
        return (padding, padding)
    if play_type == "special":
        return (3.5, 0.0)
    if play_type == "flag":
        return (1.0, 0.0)
    # Any other type (e.g. timeout markers) gets no padding
    return (0.0, 0.0)


def _compute_mergeable_segments(plays: List[Dict[str, Any]], padding: float) -> List[Tuple[float, float]]:
    """
    Compute clip segments by merging overlapping plays into single clips.

    When play A (x to x+10) and play B (x+7 to x+15) overlap, we create one segment
    (x to x+15) instead of two overlapping clips. This eliminates duplicate footage
    and captures the full context (e.g., play leading into penalty).

    Args:
        plays: List of play dictionaries ("start_time", "end_time", "play_type").
        padding: Seconds of padding applied to normal plays.

    Returns:
        List of (start_time, end_time) tuples, one per merged segment, in
        chronological order.
    """
    if not plays:
        return []

    # Build initial clip boundaries for each play (with type-specific padding)
    clip_boundaries = []
    for play in plays:
        sp, ep = _get_base_padding(play, padding)
        start = max(0, play.get("start_time", 0) - sp)
        end = play.get("end_time", 0) + ep
        clip_boundaries.append((start, end))

    # The sweep below is only correct on boundaries ordered by start time.
    # Padding differs per play type, so even chronologically ordered plays can
    # yield out-of-order padded starts; sort explicitly.
    clip_boundaries.sort()

    # Merge overlapping segments into single clips
    segments = []
    curr_start, curr_end = clip_boundaries[0]
    for start, end in clip_boundaries[1:]:
        if start <= curr_end:
            # Overlap: extend current segment to include both
            curr_end = max(curr_end, end)
        else:
            # No overlap: save current segment, start new one
            segments.append((curr_start, curr_end))
            curr_start, curr_end = start, end
    segments.append((curr_start, curr_end))

    if len(segments) < len(plays):
        logger.info(
            "Merged %d overlapping plays into %d segment%s (no duplicate footage)",
            len(plays),
            len(segments),
            "s" if len(segments) != 1 else "",
        )

    return segments


def _extract_clip_for_parallel(
    args: Tuple[int, float, float, str, Path],
) -> Tuple[int, Path, bool, str]:
    """
    Extract a single clip using stream copy (for parallel execution).

    Args:
        args: Tuple of (segment_index, start_time, end_time, video_path, clips_dir).

    Returns:
        Tuple of (index, clip_path, success, error_message).
    """
    i, start_time, end_time, video_path, clips_dir = args
    duration = end_time - start_time
    clip_path = clips_dir / f"play_{i + 1:02d}.mp4"

    success, error = extract_clip_stream_copy(
        video_path=video_path,
        output_path=str(clip_path),
        start_time=start_time,
        duration=duration,
    )
    return (i, clip_path, success, error)


def _generate_clips_stream_copy(
    plays: List[Dict[str, Any]],
    video_path: str,
    individual_clips_dir: Path,
    video_basename: str,
    padding: float,
    max_workers: int,
    generate_individual: bool,
) -> List[Path]:
    """
    Generate clips using stream copy with parallel extraction.

    This is the fastest method but cuts only on keyframes.

    Args:
        plays: List of play dictionaries.
        video_path: Path to source video.
        individual_clips_dir: Directory for clip output.
        video_basename: Base name for logging.
        padding: Seconds of padding before/after each play.
        max_workers: Number of parallel workers (values below 1 are treated as 1).
        generate_individual: Whether to log individual clip creation.

    Returns:
        List of paths of successfully extracted clips, in segment order.
    """
    # Compute merged segments (overlapping plays become single clips)
    segments = _compute_mergeable_segments(plays, padding)
    args_list = [
        (i, start_time, end_time, video_path, individual_clips_dir)
        for i, (start_time, end_time) in enumerate(segments)
    ]

    # Extract clips in parallel (ThreadPoolExecutor rejects max_workers < 1)
    clip_results = {}
    with ThreadPoolExecutor(max_workers=max(1, max_workers)) as executor:
        futures = {executor.submit(_extract_clip_for_parallel, args): args[0] for args in args_list}
        for future in as_completed(futures):
            i, clip_path, success, error = future.result()
            clip_results[i] = (clip_path, success, error)
            if not success:
                logger.error(" Failed to create clip %d: %s", i, error)
            elif generate_individual:
                logger.info(" Created: %s/%s", video_basename, clip_path.name)

    # Build clip paths list in order. Failed clips are left out so the
    # concatenation step is never handed a file that does not exist.
    return [clip_results[i][0] for i in sorted(clip_results) if clip_results[i][1]]


def _generate_clips_reencode(
    plays: List[Dict[str, Any]],
    video_path: str,
    individual_clips_dir: Path,
    video_basename: str,
    clip_method: str,
    padding: float,
    generate_individual: bool,
) -> List[Path]:
    """
    Generate clips using re-encoding for frame-accurate cuts.

    Padding behavior varies by play type:
    - Normal plays: padding at both start AND end (default padding value)
    - Special plays: 3.5s start padding, no end padding (capture approach/setup)
    - Flag plays: 1s start padding, no end padding (flag already visible)
    - Timeout plays: no padding (just markers, not real plays)

    Args:
        plays: List of play dictionaries.
        video_path: Path to source video.
        individual_clips_dir: Directory for clip output.
        video_basename: Base name for logging.
        clip_method: Either "reencode" or "ultrafast".
        padding: Seconds of padding for normal plays.
        generate_individual: Whether to log individual clip creation.

    Returns:
        List of paths of successfully extracted clips, in segment order.
    """
    preset = "ultrafast" if clip_method == "ultrafast" else "fast"
    clip_paths = []

    segments = _compute_mergeable_segments(plays, padding)
    for i, (start_time, end_time) in enumerate(segments):
        duration = end_time - start_time
        clip_path = individual_clips_dir / f"play_{i + 1:02d}.mp4"

        success, error = extract_clip_reencode(
            video_path=video_path,
            output_path=str(clip_path),
            start_time=start_time,
            duration=duration,
            preset=preset,
        )

        if not success:
            # Skip failed clips so concatenation only sees files that exist
            logger.error(" Failed to create %s: %s", clip_path.name, error)
            continue

        clip_paths.append(clip_path)
        if generate_individual:
            logger.info(" Created: %s/%s (%.1fs)", video_basename, clip_path.name, duration)

    return clip_paths


def _escape_concat_path(path: str) -> str:
    """Escape single quotes so *path* can sit inside a quoted concat-list entry."""
    # Inside '...' nothing can be escaped, so close the quote, emit \', reopen.
    return path.replace("'", "'\\''")


def concatenate_clips(
    clip_paths: List[Path],
    output_path: Path,
    working_dir: Optional[Path] = None,
) -> Tuple[bool, str]:
    """
    Concatenate multiple video clips into a single video.

    Uses FFmpeg's concat demuxer which works with clips that have the same
    codec parameters (typically clips from the same source video).

    Args:
        clip_paths: List of paths to video clips to concatenate.
        output_path: Path for the concatenated output video.
        working_dir: Directory to use for the concat list file.
            If None, uses the parent directory of the first clip.

    Returns:
        Tuple of (success, error_message).
    """
    if not clip_paths:
        return (False, "No clips to concatenate")

    # Determine working directory for concat list
    if working_dir is None:
        working_dir = clip_paths[0].parent

    # FFmpeg concat demuxer requires a text file listing all input files
    concat_list_path = working_dir / "concat_list.txt"

    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-y",  # Overwrite output without asking
        "-f", "concat",  # Use concat demuxer (reads list of files)
        "-safe", "0",  # Allow absolute paths and special characters in filenames
        "-i", str(concat_list_path),  # Input: the concat list file
        "-c", "copy",  # Copy streams without re-encoding (fast, lossless)
        "-loglevel", "error",  # Only show errors
        str(output_path),  # Output file path
    ]

    try:
        with open(concat_list_path, "w", encoding="utf-8") as f:
            for clip_path in clip_paths:
                # Use relative paths if in same directory, otherwise absolute
                if clip_path.parent == working_dir:
                    entry = clip_path.name
                else:
                    entry = str(clip_path.absolute())
                f.write(f"file '{_escape_concat_path(entry)}'\n")

        # Run from working directory so relative paths work
        return _run_ffmpeg(cmd, cwd=str(working_dir))
    finally:
        # Clean up concat list file on every exit path
        concat_list_path.unlink(missing_ok=True)


def generate_clips(
    plays: List[Dict[str, Any]],
    video_path: str,
    output_dir: Path,
    video_basename: str,
    clip_method: str = "stream_copy",
    generate_individual: bool = False,
    padding: float = 4.0,
    max_workers: int = 4,
) -> Dict[str, float]:
    """
    Generate video clips for detected plays using FFmpeg.

    By default, generates only a single concatenated video of all plays.
    If generate_individual is True, also generates individual play clips.

    Padding behavior varies by play type:
    - Normal plays: `padding` seconds at both start AND end
    - Special plays: 3.5s start, no end (capture approach/setup for punts/FGs/XPs)
    - Flag plays: 1s start, no end (flag is already visible)
    - Timeout plays: no padding (just markers, not real plays)

    Args:
        plays: List of play dictionaries with "start_time", "end_time", "play_number", "play_type".
        video_path: Path to source video.
        output_dir: Output directory for clips.
        video_basename: Base name for output files (derived from video name).
        clip_method: Method for clip extraction - "stream_copy" (fastest),
            "reencode" (best compression), or "ultrafast" (faster encoding).
            Any value other than "stream_copy" takes the re-encode path.
        generate_individual: If True, also generate individual play clips.
        padding: Seconds of padding for normal plays (other types have fixed padding).
        max_workers: Number of parallel workers for stream_copy method.

    Returns:
        Dictionary with timing information:
        - "clip_extraction": Time spent extracting clips
        - "concatenation": Time spent concatenating
    """
    # Log the method being used
    method_descriptions = {
        "stream_copy": "Stream Copy (fastest, keyframe-aligned cuts)",
        "reencode": "Re-encode (frame-accurate, best compression)",
        "ultrafast": "Ultrafast (frame-accurate, faster encoding)",
    }
    logger.info("Clip method: %s", method_descriptions.get(clip_method, clip_method))

    timing = {"clip_extraction": 0.0, "concatenation": 0.0}

    if not plays:
        logger.warning("No plays to generate clips for")
        return timing

    # Create clips directory
    clips_dir = output_dir / "clips"
    clips_dir.mkdir(parents=True, exist_ok=True)

    # Determine where to put individual clips
    # If generate_individual: permanent subfolder named after video
    # Otherwise: temp directory that gets cleaned up
    if generate_individual:
        individual_clips_dir = clips_dir / video_basename
        individual_clips_dir.mkdir(parents=True, exist_ok=True)
        temp_dir = None
        logger.info("Debug mode: generating individual clips in %s/", video_basename)
    else:
        temp_dir = tempfile.mkdtemp(prefix="cfb40_clips_")
        individual_clips_dir = Path(temp_dir)
        logger.info("Generating clips for concatenation...")

    try:
        # Extract clips using appropriate method
        t_start = time.perf_counter()
        if clip_method == "stream_copy":
            clip_paths = _generate_clips_stream_copy(
                plays=plays,
                video_path=video_path,
                individual_clips_dir=individual_clips_dir,
                video_basename=video_basename,
                padding=padding,
                max_workers=max_workers,
                generate_individual=generate_individual,
            )
        else:
            clip_paths = _generate_clips_reencode(
                plays=plays,
                video_path=video_path,
                individual_clips_dir=individual_clips_dir,
                video_basename=video_basename,
                clip_method=clip_method,
                padding=padding,
                generate_individual=generate_individual,
            )
        timing["clip_extraction"] = time.perf_counter() - t_start

        # Concatenate all clips into final video
        if clip_paths:
            t_start = time.perf_counter()
            concat_path = clips_dir / f"{video_basename}_all_plays.mp4"
            # Ensure output path is absolute so ffmpeg can find it from any working directory
            concat_path_absolute = concat_path.resolve()

            logger.info("Concatenating %d clips into %s...", len(clip_paths), concat_path.name)
            success, error = concatenate_clips(clip_paths, concat_path_absolute, individual_clips_dir)
            if success:
                logger.info(" Created: %s", concat_path.name)
            else:
                logger.error(" Failed to concatenate: %s", error)
            timing["concatenation"] = time.perf_counter() - t_start
        else:
            logger.error("No clips were extracted successfully; skipping concatenation")
    finally:
        # Clean up temp directory if we used one, even when a step above raised
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

    return timing


def check_hardware_encoder_available(encoder: str) -> bool:
    """
    Check if a hardware encoder is available.

    Args:
        encoder: Encoder name (e.g., "h264_videotoolbox", "h264_nvenc").

    Returns:
        True if encoder is available, False otherwise (including when FFmpeg
        itself cannot be run).
    """
    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-hide_banner",  # Don't show FFmpeg version banner
        "-encoders",  # List all available encoders
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit, or ffmpeg is not installed / not executable
        return False

    # Each encoder row is "<flags> <name> <description...>"; compare the name
    # column exactly so "h264" does not match "h264_nvenc" or description text.
    for line in result.stdout.splitlines():
        columns = line.split()
        if len(columns) >= 2 and columns[1] == encoder:
            return True
    return False