Spaces:
Sleeping
Sleeping
| """ | |
| FFmpeg operations for video clip extraction and concatenation. | |
| This module provides functions for extracting clips from videos and concatenating | |
| them using FFmpeg. Each FFmpeg command is documented with inline comments | |
| explaining what each argument does. | |
| Supported methods: | |
| - stream_copy: Fastest method, no re-encoding, but cuts on keyframes only | |
| - reencode: Frame-accurate cuts with re-encoding, best compression | |
| - ultrafast: Frame-accurate cuts with faster encoding, larger files | |
| """ | |
| import logging | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from pydantic import BaseModel, Field | |
| logger = logging.getLogger(__name__) | |
class ClipMethod(Enum):
    """Clip-extraction strategies, keyed by their string value used on the CLI.

    Trade-offs:
    - STREAM_COPY is fastest (no re-encode) but cuts land on keyframes only.
    - REENCODE gives frame-accurate cuts with the best compression.
    - ULTRAFAST gives frame-accurate cuts with faster encoding, larger files.
    """

    STREAM_COPY = "stream_copy"
    REENCODE = "reencode"
    ULTRAFAST = "ultrafast"
class ClipInfo(BaseModel):
    """Information about a clip to extract from a source video."""

    # Clip boundaries, in seconds from the start of the source video.
    start_time: float = Field(..., description="Start time in seconds")
    end_time: float = Field(..., description="End time in seconds")
    # Destination file for the extracted clip.
    output_path: Path = Field(..., description="Path for output file")
    # Play number used only in log messages; None when not applicable.
    play_number: Optional[int] = Field(None, description="Optional play number for logging")
def extract_clip_stream_copy(
    video_path: str,
    output_path: str,
    start_time: float,
    duration: float,
) -> Tuple[bool, str]:
    """
    Extract a clip using stream copy (no re-encoding).

    This is the fastest method but can only cut on keyframes, meaning
    the actual cut points may differ slightly from the requested times.

    Args:
        video_path: Path to source video.
        output_path: Path for output clip.
        start_time: Start time in seconds.
        duration: Duration of clip in seconds.

    Returns:
        Tuple of (success, error_message). error_message is "" on success.
    """
    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-y",  # Overwrite output file without asking for confirmation
        "-ss",
        str(start_time),  # Seek to start position BEFORE input (enables fast seeking)
        "-i",
        video_path,  # Input video file path
        "-t",
        str(duration),  # Duration of output clip in seconds
        "-c",
        "copy",  # Copy streams without re-encoding (very fast, but keyframe-aligned)
        "-avoid_negative_ts",
        "make_zero",  # Fix timestamp issues that can occur from mid-stream cuts
        "-loglevel",
        "error",  # Only show errors, suppress informational output
        output_path,  # Output file path
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
        return (True, "")
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode() if e.stderr else str(e)
        return (False, error_msg)
    except FileNotFoundError:
        # ffmpeg binary missing from PATH: honor the (success, error) contract
        # instead of letting the exception escape to callers.
        return (False, "ffmpeg executable not found on PATH")
def extract_clip_reencode(
    video_path: str,
    output_path: str,
    start_time: float,
    duration: float,
    preset: str = "fast",
    crf: int = 23,
) -> Tuple[bool, str]:
    """
    Extract a clip with re-encoding for frame-accurate cuts.

    This method is slower but provides precise cut points and good compression.

    Args:
        video_path: Path to source video.
        output_path: Path for output clip.
        start_time: Start time in seconds.
        duration: Duration of clip in seconds.
        preset: Encoding preset ("ultrafast", "fast", "medium", "slow").
        crf: Constant Rate Factor for quality (lower = better, 18-28 is typical).

    Returns:
        Tuple of (success, error_message). error_message is "" on success.
    """
    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-y",  # Overwrite output file without asking
        "-ss",
        str(start_time),  # Seek to start position (before -i for fast seeking)
        "-i",
        video_path,  # Input video file path
        "-t",
        str(duration),  # Duration of output clip
        "-c:v",
        "libx264",  # Use H.264 video codec for wide compatibility
        "-preset",
        preset,  # Encoding speed/compression tradeoff (ultrafast/fast/medium/slow)
        "-crf",
        str(crf),  # Constant Rate Factor: quality level (18=high, 23=medium, 28=low)
        "-c:a",
        "aac",  # Use AAC audio codec for compatibility
        "-b:a",
        "128k",  # Audio bitrate (128 kbps is good quality for speech/commentary)
        "-loglevel",
        "error",  # Only show errors
        output_path,  # Output file path
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
        return (True, "")
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.decode() if e.stderr else str(e)
        return (False, error_msg)
    except FileNotFoundError:
        # ffmpeg binary missing from PATH: honor the (success, error) contract
        # instead of letting the exception escape to callers.
        return (False, "ffmpeg executable not found on PATH")
| def _get_base_padding(play: Dict[str, Any], padding: float) -> Tuple[float, float]: | |
| """Get (start_padding, end_padding) for a play based on its type.""" | |
| play_type = play.get("play_type", "normal") | |
| if play_type == "normal": | |
| return (padding, padding) | |
| if play_type == "special": | |
| return (3.5, 0.0) | |
| if play_type == "flag": | |
| return (1.0, 0.0) | |
| return (0.0, 0.0) | |
def _compute_mergeable_segments(plays: List[Dict[str, Any]], padding: float) -> List[Tuple[float, float]]:
    """
    Compute clip segments by merging overlapping plays into single clips.

    When play A (x to x+10) and play B (x+7 to x+15) overlap, we create one segment
    (x to x+15) instead of two overlapping clips. This eliminates duplicate footage
    and captures the full context (e.g., play leading into penalty).

    Args:
        plays: Play dicts with "start_time"/"end_time" in seconds and an
            optional "play_type" (see _get_base_padding).
        padding: Padding in seconds applied to normal plays.

    Returns:
        List of (start_time, end_time) tuples, one per merged segment,
        in chronological order.
    """
    if not plays:
        return []
    # Build initial clip boundaries for each play (with type-specific padding)
    clip_boundaries = []
    for play in plays:
        sp, ep = _get_base_padding(play, padding)
        start = max(0, play.get("start_time", 0) - sp)
        end = play.get("end_time", 0) + ep
        clip_boundaries.append((start, end))
    # Sort by start time: the single-pass merge below assumes chronological
    # order and would produce wrong segments for unsorted input. This is a
    # no-op when callers already pass plays in order.
    clip_boundaries.sort()
    # Merge overlapping segments into single clips
    segments = []
    curr_start, curr_end = clip_boundaries[0]
    for start, end in clip_boundaries[1:]:
        if start <= curr_end:
            # Overlap: extend current segment to include both
            curr_end = max(curr_end, end)
        else:
            # No overlap: save current segment, start new one
            segments.append((curr_start, curr_end))
            curr_start, curr_end = start, end
    segments.append((curr_start, curr_end))
    if len(segments) < len(plays):
        logger.info(
            "Merged %d overlapping plays into %d segment%s (no duplicate footage)",
            len(plays),
            len(segments),
            "s" if len(segments) != 1 else "",
        )
    return segments
def _extract_clip_for_parallel(
    args: Tuple[int, float, float, str, Path],
) -> Tuple[int, Path, bool, str]:
    """
    Worker for parallel stream-copy extraction of one clip.

    Args:
        args: Tuple of (segment_index, start_time, end_time, video_path, clips_dir).

    Returns:
        Tuple of (index, clip_path, success, error_message).
    """
    index, begin, finish, source_video, out_dir = args
    # Output files are 1-based and zero-padded so they sort naturally.
    target = out_dir / f"play_{index + 1:02d}.mp4"
    ok, message = extract_clip_stream_copy(
        video_path=source_video,
        output_path=str(target),
        start_time=begin,
        duration=finish - begin,
    )
    return (index, target, ok, message)
def _generate_clips_stream_copy(
    plays: List[Dict[str, Any]],
    video_path: str,
    individual_clips_dir: Path,
    video_basename: str,
    padding: float,
    max_workers: int,
    generate_individual: bool,
) -> List[Path]:
    """
    Generate clips with stream copy, extracting them in parallel.

    Fastest method available, at the cost of keyframe-aligned cut points.

    Args:
        plays: List of play dictionaries.
        video_path: Path to source video.
        individual_clips_dir: Directory for clip output.
        video_basename: Base name for logging.
        padding: Seconds of padding before/after each play.
        max_workers: Number of parallel workers.
        generate_individual: Whether to log individual clip creation.

    Returns:
        List of clip paths in segment order.
    """
    # Overlapping plays collapse into single merged segments first.
    merged = _compute_mergeable_segments(plays, padding)
    tasks = []
    for idx, (seg_start, seg_end) in enumerate(merged):
        tasks.append((idx, seg_start, seg_end, video_path, individual_clips_dir))
    # Fan the extractions out across a thread pool; collect results as each
    # finishes, keyed by segment index so order can be restored afterwards.
    results: Dict[int, Tuple[Path, bool, str]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(_extract_clip_for_parallel, task): task[0] for task in tasks}
        for done in as_completed(pending):
            idx, clip_path, ok, err = done.result()
            results[idx] = (clip_path, ok, err)
            if not ok:
                logger.error(" Failed to create clip %d: %s", idx, err)
            elif generate_individual:
                logger.info(" Created: %s/%s", video_basename, clip_path.name)
    # Restore segment order before returning the paths.
    return [results[idx][0] for idx in sorted(results)]
def _generate_clips_reencode(
    plays: List[Dict[str, Any]],
    video_path: str,
    individual_clips_dir: Path,
    video_basename: str,
    clip_method: str,
    padding: float,
    generate_individual: bool,
) -> List[Path]:
    """
    Generate clips sequentially with re-encoding for frame-accurate cuts.

    Padding behavior varies by play type:
    - Normal plays: padding at both start AND end (default padding value)
    - Special plays: 3.5s start padding, no end padding (capture approach/setup)
    - Flag plays: 1s start padding, no end padding (flag already visible)
    - Timeout plays: no padding (just markers, not real plays)

    Args:
        plays: List of play dictionaries.
        video_path: Path to source video.
        individual_clips_dir: Directory for clip output.
        video_basename: Base name for logging.
        clip_method: Either "reencode" or "ultrafast".
        padding: Seconds of padding for normal plays.
        generate_individual: Whether to log individual clip creation.

    Returns:
        List of clip paths in segment order (including any that failed).
    """
    # "ultrafast" maps to the ffmpeg preset of the same name; everything else
    # (i.e. "reencode") uses the balanced "fast" preset.
    encoder_preset = "ultrafast" if clip_method == "ultrafast" else "fast"
    merged = _compute_mergeable_segments(plays, padding)
    outputs: List[Path] = []
    for idx, (seg_start, seg_end) in enumerate(merged):
        length = seg_end - seg_start
        target = individual_clips_dir / f"play_{idx + 1:02d}.mp4"
        outputs.append(target)
        ok, err = extract_clip_reencode(
            video_path=video_path,
            output_path=str(target),
            start_time=seg_start,
            duration=length,
            preset=encoder_preset,
        )
        if not ok:
            logger.error(" Failed to create %s: %s", target.name, err)
        elif generate_individual:
            logger.info(" Created: %s/%s (%.1fs)", video_basename, target.name, length)
    return outputs
| def concatenate_clips(clip_paths: List[Path], output_path: Path, working_dir: Path | None = None) -> Tuple[bool, str]: | |
| """ | |
| Concatenate multiple video clips into a single video. | |
| Uses FFmpeg's concat demuxer which works with clips that have the same | |
| codec parameters (typically clips from the same source video). | |
| Args: | |
| clip_paths: List of paths to video clips to concatenate. | |
| output_path: Path for the concatenated output video. | |
| working_dir: Directory to use for the concat list file. | |
| If None, uses the parent directory of the first clip. | |
| Returns: | |
| Tuple of (success, error_message). | |
| """ | |
| if not clip_paths: | |
| return (False, "No clips to concatenate") | |
| # Determine working directory for concat list | |
| if working_dir is None: | |
| working_dir = clip_paths[0].parent | |
| # Create concat list file | |
| # FFmpeg concat demuxer requires a text file listing all input files | |
| concat_list_path = working_dir / "concat_list.txt" | |
| with open(concat_list_path, "w", encoding="utf-8") as f: | |
| for clip_path in clip_paths: | |
| # Use relative paths if in same directory, otherwise absolute | |
| if clip_path.parent == working_dir: | |
| f.write(f"file '{clip_path.name}'\n") | |
| else: | |
| f.write(f"file '{clip_path.absolute()}'\n") | |
| cmd = [ | |
| "ffmpeg", # FFmpeg executable | |
| "-y", # Overwrite output without asking | |
| "-f", | |
| "concat", # Use concat demuxer (reads list of files) | |
| "-safe", | |
| "0", # Allow absolute paths and special characters in filenames | |
| "-i", | |
| str(concat_list_path), # Input: the concat list file | |
| "-c", | |
| "copy", # Copy streams without re-encoding (fast, lossless) | |
| "-loglevel", | |
| "error", # Only show errors | |
| str(output_path), # Output file path | |
| ] | |
| try: | |
| # Run from working directory so relative paths work | |
| subprocess.run(cmd, check=True, capture_output=True, cwd=str(working_dir)) | |
| # Clean up concat list file | |
| concat_list_path.unlink(missing_ok=True) | |
| return (True, "") | |
| except subprocess.CalledProcessError as e: | |
| error_msg = e.stderr.decode() if e.stderr else str(e) | |
| concat_list_path.unlink(missing_ok=True) | |
| return (False, error_msg) | |
def generate_clips(
    plays: List[Dict[str, Any]],
    video_path: str,
    output_dir: Path,
    video_basename: str,
    clip_method: str = "stream_copy",
    generate_individual: bool = False,
    padding: float = 4.0,
    max_workers: int = 4,
) -> Dict[str, float]:
    """
    Generate video clips for detected plays using FFmpeg.

    By default, generates only a single concatenated video of all plays.
    If generate_individual is True, also generates individual play clips.

    Padding behavior varies by play type:
    - Normal plays: `padding` seconds at both start AND end
    - Special plays: 3.5s start, no end (capture approach/setup for punts/FGs/XPs)
    - Flag plays: 1s start, no end (flag is already visible)
    - Timeout plays: no padding (just markers, not real plays)

    Args:
        plays: List of play dictionaries with "start_time", "end_time", "play_number", "play_type".
        video_path: Path to source video.
        output_dir: Output directory for clips.
        video_basename: Base name for output files (derived from video name).
        clip_method: Method for clip extraction - "stream_copy" (fastest),
            "reencode" (best compression), or "ultrafast" (faster encoding).
        generate_individual: If True, also generate individual play clips.
        padding: Seconds of padding for normal plays (other types have fixed padding).
        max_workers: Number of parallel workers for stream_copy method.

    Returns:
        Dictionary with timing information:
        - "clip_extraction": Time spent extracting clips
        - "concatenation": Time spent concatenating
    """
    # Log the method being used
    method_descriptions = {
        "stream_copy": "Stream Copy (fastest, keyframe-aligned cuts)",
        "reencode": "Re-encode (frame-accurate, best compression)",
        "ultrafast": "Ultrafast (frame-accurate, faster encoding)",
    }
    # Unknown methods fall back to logging the raw method string.
    logger.info("Clip method: %s", method_descriptions.get(clip_method, clip_method))
    timing = {"clip_extraction": 0.0, "concatenation": 0.0}
    if not plays:
        # Nothing to do: return zeroed timings rather than raising.
        logger.warning("No plays to generate clips for")
        return timing
    # Create clips directory
    clips_dir = output_dir / "clips"
    clips_dir.mkdir(parents=True, exist_ok=True)
    # Determine where to put individual clips
    # If generate_individual: permanent subfolder named after video
    # Otherwise: temp directory that gets cleaned up
    if generate_individual:
        individual_clips_dir = clips_dir / video_basename
        individual_clips_dir.mkdir(parents=True, exist_ok=True)
        temp_dir = None
        logger.info("Debug mode: generating individual clips in %s/", video_basename)
    else:
        temp_dir = tempfile.mkdtemp(prefix="cfb40_clips_")
        individual_clips_dir = Path(temp_dir)
        logger.info("Generating clips for concatenation...")
    # Extract clips using appropriate method
    t_start = time.perf_counter()
    if clip_method == "stream_copy":
        clip_paths = _generate_clips_stream_copy(
            plays=plays,
            video_path=video_path,
            individual_clips_dir=individual_clips_dir,
            video_basename=video_basename,
            padding=padding,
            max_workers=max_workers,
            generate_individual=generate_individual,
        )
    else:
        # Any non-stream_copy method ("reencode" / "ultrafast") re-encodes.
        clip_paths = _generate_clips_reencode(
            plays=plays,
            video_path=video_path,
            individual_clips_dir=individual_clips_dir,
            video_basename=video_basename,
            clip_method=clip_method,
            padding=padding,
            generate_individual=generate_individual,
        )
    timing["clip_extraction"] = time.perf_counter() - t_start
    # Concatenate all clips into final video
    if len(clip_paths) >= 1:
        t_start = time.perf_counter()
        concat_path = clips_dir / f"{video_basename}_all_plays.mp4"
        # Ensure output path is absolute so ffmpeg can find it from any working directory
        concat_path_absolute = concat_path.resolve()
        logger.info("Concatenating %d clips into %s...", len(clip_paths), concat_path.name)
        success, error = concatenate_clips(clip_paths, concat_path_absolute, individual_clips_dir)
        if success:
            logger.info(" Created: %s", concat_path.name)
        else:
            logger.error(" Failed to concatenate: %s", error)
        timing["concatenation"] = time.perf_counter() - t_start
    # Clean up temp directory if we used one
    # (temp_dir is None in generate_individual mode, so those clips are kept)
    if temp_dir:
        shutil.rmtree(temp_dir, ignore_errors=True)
    return timing
def check_hardware_encoder_available(encoder: str) -> bool:
    """
    Check if a hardware encoder is available.

    Args:
        encoder: Encoder name (e.g., "h264_videotoolbox", "h264_nvenc").

    Returns:
        True if encoder is available, False otherwise (including when the
        ffmpeg binary itself is missing or fails to run).
    """
    cmd = [
        "ffmpeg",  # FFmpeg executable
        "-hide_banner",  # Don't show FFmpeg version banner
        "-encoders",  # List all available encoders
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # Substring match against the encoder listing printed by ffmpeg.
        return encoder in result.stdout
    except (subprocess.CalledProcessError, FileNotFoundError):
        # ffmpeg errored or is not installed: no hardware encoder available.
        return False