"""
ShortSmith v2 - Clip Extractor Module

Final clip extraction and output generation.
Handles cutting clips at precise timestamps with various output options.
"""

from pathlib import Path
from typing import List, Optional, Tuple
from dataclasses import dataclass, field
import shutil

from utils.logger import get_logger, LogTimer
from utils.helpers import (
    VideoProcessingError,
    ensure_dir,
    format_timestamp,
    get_unique_filename,
)
from config import get_config, ProcessingConfig
from core.video_processor import VideoProcessor, VideoMetadata

logger = get_logger("core.clip_extractor")


@dataclass
class ExtractedClip:
    """A clip that has been cut from a source video, with its scores."""

    clip_path: Path  # Location of the clip file
    start_time: float  # Where the clip begins in the source video
    end_time: float  # Where the clip ends in the source video
    hype_score: float  # Normalized hype score (0-1)
    rank: int  # Position among all clips (1 = best)
    thumbnail_path: Optional[Path] = None  # Location of the thumbnail, if any

    # Metadata
    source_video: Optional[Path] = None
    person_detected: bool = False
    person_screen_time: float = 0.0  # Percentage of clip with target person

    # Additional scores
    visual_score: float = 0.0
    audio_score: float = 0.0
    motion_score: float = 0.0

    @property
    def duration(self) -> float:
        """Length of the clip in seconds (end minus start)."""
        return self.end_time - self.start_time

    @property
    def time_range(self) -> str:
        """Start and end timestamps formatted as ``"<start> - <end>"``."""
        begin_label = format_timestamp(self.start_time)
        finish_label = format_timestamp(self.end_time)
        return f"{begin_label} - {finish_label}"

    def to_dict(self) -> dict:
        """
        Build a JSON-serializable dictionary describing this clip.

        Score-like values are rounded to four decimal places and the clip
        path is converted to a string. ``thumbnail_path`` and
        ``source_video`` are not part of the output.
        """
        # Keyword order below fixes the key order of the resulting dict.
        return dict(
            clip_path=str(self.clip_path),
            start_time=self.start_time,
            end_time=self.end_time,
            duration=self.duration,
            hype_score=round(self.hype_score, 4),
            rank=self.rank,
            time_range=self.time_range,
            visual_score=round(self.visual_score, 4),
            audio_score=round(self.audio_score, 4),
            motion_score=round(self.motion_score, 4),
            person_detected=self.person_detected,
            person_screen_time=round(self.person_screen_time, 4),
        )
@dataclass class ClipCandidate: """A candidate segment for clip extraction.""" start_time: float end_time: float hype_score: float visual_score: float = 0.0 audio_score: float = 0.0 motion_score: float = 0.0 person_score: float = 0.0 # Target person visibility @property def duration(self) -> float: return self.end_time - self.start_time class ClipExtractor: """ Extracts final clips from video based on hype scores. Handles: - Selecting top segments based on scores - Enforcing diversity (minimum gap between clips) - Adjusting clip boundaries to scene cuts - Generating thumbnails """ def __init__( self, video_processor: VideoProcessor, config: Optional[ProcessingConfig] = None, ): """ Initialize clip extractor. Args: video_processor: VideoProcessor instance for clip cutting config: Processing configuration (uses default if None) """ self.video_processor = video_processor self.config = config or get_config().processing logger.info( f"ClipExtractor initialized (duration={self.config.min_clip_duration}-" f"{self.config.max_clip_duration}s, gap={self.config.min_gap_between_clips}s)" ) def select_clips( self, candidates: List[ClipCandidate], num_clips: int, enforce_diversity: bool = True, ) -> List[ClipCandidate]: """ Select top clips from candidates. 
Args: candidates: List of clip candidates with scores num_clips: Number of clips to select enforce_diversity: Enforce minimum gap between clips Returns: List of selected ClipCandidate objects """ if not candidates: logger.warning("No candidates provided for selection") return [] # Sort by hype score sorted_candidates = sorted( candidates, key=lambda c: c.hype_score, reverse=True ) if not enforce_diversity: return sorted_candidates[:num_clips] # Select with diversity constraint selected = [] min_gap = self.config.min_gap_between_clips for candidate in sorted_candidates: if len(selected) >= num_clips: break # Check if this candidate is far enough from existing selections is_diverse = True for existing in selected: # Calculate gap between clip starts gap = abs(candidate.start_time - existing.start_time) if gap < min_gap: is_diverse = False break if is_diverse: selected.append(candidate) # If we couldn't get enough with diversity, relax constraint if len(selected) < num_clips: logger.warning( f"Only {len(selected)} diverse clips found, " f"relaxing diversity constraint" ) for candidate in sorted_candidates: if candidate not in selected: selected.append(candidate) if len(selected) >= num_clips: break logger.info(f"Selected {len(selected)} clips from {len(candidates)} candidates") return selected def adjust_to_scene_boundaries( self, candidates: List[ClipCandidate], scene_boundaries: List[float], tolerance: float = 1.0, ) -> List[ClipCandidate]: """ Adjust clip boundaries to align with scene cuts. 
Args: candidates: List of clip candidates scene_boundaries: List of scene boundary timestamps tolerance: Maximum adjustment in seconds Returns: List of adjusted ClipCandidate objects """ if not scene_boundaries: return candidates adjusted = [] for candidate in candidates: new_start = candidate.start_time new_end = candidate.end_time # Find nearest scene boundary for start for boundary in scene_boundaries: if abs(boundary - candidate.start_time) < tolerance: new_start = boundary break # Find nearest scene boundary for end for boundary in scene_boundaries: if abs(boundary - candidate.end_time) < tolerance: new_end = boundary break # Ensure minimum duration if new_end - new_start < self.config.min_clip_duration: # Keep original boundaries new_start = candidate.start_time new_end = candidate.end_time adjusted.append(ClipCandidate( start_time=new_start, end_time=new_end, hype_score=candidate.hype_score, visual_score=candidate.visual_score, audio_score=candidate.audio_score, motion_score=candidate.motion_score, person_score=candidate.person_score, )) return adjusted def extract_clips( self, video_path: str | Path, output_dir: str | Path, candidates: List[ClipCandidate], num_clips: Optional[int] = None, generate_thumbnails: bool = True, reencode: bool = False, ) -> List[ExtractedClip]: """ Extract clips from video. 
Args: video_path: Path to source video output_dir: Directory for output clips candidates: List of clip candidates num_clips: Number of clips to extract (None = use config default) generate_thumbnails: Whether to generate thumbnails reencode: Whether to re-encode clips (slower but precise) Returns: List of ExtractedClip objects """ video_path = Path(video_path) output_dir = ensure_dir(output_dir) num_clips = num_clips or self.config.default_num_clips with LogTimer(logger, f"Extracting {num_clips} clips"): # Select top clips selected = self.select_clips(candidates, num_clips) if not selected: logger.warning("No clips to extract") return [] # Extract each clip clips = [] for rank, candidate in enumerate(selected, 1): try: clip = self._extract_single_clip( video_path=video_path, output_dir=output_dir, candidate=candidate, rank=rank, generate_thumbnail=generate_thumbnails, reencode=reencode, ) clips.append(clip) except Exception as e: logger.error(f"Failed to extract clip {rank}: {e}") logger.info(f"Successfully extracted {len(clips)} clips") return clips def _extract_single_clip( self, video_path: Path, output_dir: Path, candidate: ClipCandidate, rank: int, generate_thumbnail: bool, reencode: bool, ) -> ExtractedClip: """Extract a single clip.""" # Generate output filename clip_filename = f"clip_{rank:02d}_{format_timestamp(candidate.start_time).replace(':', '-')}.mp4" clip_path = output_dir / clip_filename # Cut the clip self.video_processor.cut_clip( video_path=video_path, output_path=clip_path, start_time=candidate.start_time, end_time=candidate.end_time, reencode=reencode, ) # Generate thumbnail thumbnail_path = None if generate_thumbnail: try: thumb_filename = f"thumb_{rank:02d}.jpg" thumbnail_path = output_dir / "thumbnails" / thumb_filename thumbnail_path.parent.mkdir(exist_ok=True) # Thumbnail at 1/3 into the clip thumb_time = candidate.start_time + (candidate.duration / 3) self.video_processor.generate_thumbnail( video_path=video_path, 
output_path=thumbnail_path, timestamp=thumb_time, ) except Exception as e: logger.warning(f"Failed to generate thumbnail for clip {rank}: {e}") thumbnail_path = None return ExtractedClip( clip_path=clip_path, start_time=candidate.start_time, end_time=candidate.end_time, hype_score=candidate.hype_score, rank=rank, thumbnail_path=thumbnail_path, source_video=video_path, visual_score=candidate.visual_score, audio_score=candidate.audio_score, motion_score=candidate.motion_score, person_detected=candidate.person_score > 0, person_screen_time=candidate.person_score, ) def create_fallback_clips( self, video_path: str | Path, output_dir: str | Path, duration: float, num_clips: int, ) -> List[ExtractedClip]: """ Create uniformly distributed clips when no highlights are detected. Args: video_path: Path to source video output_dir: Directory for output clips duration: Video duration in seconds num_clips: Number of clips to create Returns: List of fallback ExtractedClip objects """ logger.warning("Creating fallback clips (no highlights detected)") clip_duration = self.config.default_clip_duration total_clip_time = clip_duration * num_clips if total_clip_time >= duration: # Video too short, adjust clip_duration = max( self.config.min_clip_duration, duration / (num_clips + 1) ) # Calculate evenly spaced start times gap = (duration - clip_duration * num_clips) / (num_clips + 1) candidates = [] for i in range(num_clips): start = gap + i * (clip_duration + gap) end = start + clip_duration candidates.append(ClipCandidate( start_time=start, end_time=min(end, duration), hype_score=0.5, # Neutral score )) return self.extract_clips( video_path=video_path, output_dir=output_dir, candidates=candidates, num_clips=num_clips, ) def merge_adjacent_candidates( self, candidates: List[ClipCandidate], max_gap: float = 2.0, max_duration: Optional[float] = None, ) -> List[ClipCandidate]: """ Merge adjacent high-scoring candidates into longer clips. 
Args: candidates: List of clip candidates max_gap: Maximum gap between candidates to merge max_duration: Maximum merged clip duration Returns: List of merged ClipCandidate objects """ max_duration = max_duration or self.config.max_clip_duration if not candidates: return [] # Sort by start time sorted_candidates = sorted(candidates, key=lambda c: c.start_time) merged = [] current = sorted_candidates[0] for candidate in sorted_candidates[1:]: gap = candidate.start_time - current.end_time potential_duration = candidate.end_time - current.start_time if gap <= max_gap and potential_duration <= max_duration: # Merge current = ClipCandidate( start_time=current.start_time, end_time=candidate.end_time, hype_score=max(current.hype_score, candidate.hype_score), visual_score=max(current.visual_score, candidate.visual_score), audio_score=max(current.audio_score, candidate.audio_score), motion_score=max(current.motion_score, candidate.motion_score), person_score=max(current.person_score, candidate.person_score), ) else: merged.append(current) current = candidate merged.append(current) return merged # Export public interface __all__ = ["ClipExtractor", "ExtractedClip", "ClipCandidate"]