Spaces:
Paused
Paused
| """ | |
| ShortSmith v2 - Clip Extractor Module | |
| Final clip extraction and output generation. | |
| Handles cutting clips at precise timestamps with various output options. | |
| """ | |
import shutil
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import List, Optional, Tuple

from utils.logger import get_logger, LogTimer
from utils.helpers import (
    VideoProcessingError,
    ensure_dir,
    format_timestamp,
    get_unique_filename,
)
from config import get_config, ProcessingConfig
from core.video_processor import VideoProcessor, VideoMetadata
# Module-level logger used by the classes defined in this file.
logger = get_logger("core.clip_extractor")
@dataclass
class ExtractedClip:
    """Represents an extracted video clip.

    Declared as a dataclass so that instances can be built with keyword
    arguments (``ExtractedClip(clip_path=..., start_time=..., ...)``).
    ``duration`` and ``time_range`` are read-only properties derived from
    ``start_time`` and ``end_time``; ``to_dict`` serialises them as values.
    """

    clip_path: Path  # Path to the clip file
    start_time: float  # Start timestamp in source video
    end_time: float  # End timestamp in source video
    hype_score: float  # Normalized hype score (0-1)
    rank: int  # Rank among all clips (1 = best)
    thumbnail_path: Optional[Path] = None  # Path to thumbnail

    # Metadata
    source_video: Optional[Path] = None
    person_detected: bool = False
    person_screen_time: float = 0.0  # Percentage of clip with target person

    # Additional scores
    visual_score: float = 0.0
    audio_score: float = 0.0
    motion_score: float = 0.0

    @property
    def duration(self) -> float:
        """Clip duration in seconds (``end_time - start_time``)."""
        return self.end_time - self.start_time

    @property
    def time_range(self) -> str:
        """Human-readable time range, formatted with ``format_timestamp``."""
        return f"{format_timestamp(self.start_time)} - {format_timestamp(self.end_time)}"

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict of plain values: the clip path as a string, the raw
            timestamps, the derived ``duration`` and ``time_range``, and
            every score rounded to four decimal places. ``thumbnail_path``
            and ``source_video`` are not included.
        """
        return {
            "clip_path": str(self.clip_path),
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self.duration,
            "hype_score": round(self.hype_score, 4),
            "rank": self.rank,
            "time_range": self.time_range,
            "visual_score": round(self.visual_score, 4),
            "audio_score": round(self.audio_score, 4),
            "motion_score": round(self.motion_score, 4),
            "person_detected": self.person_detected,
            "person_screen_time": round(self.person_screen_time, 4),
        }
@dataclass
class ClipCandidate:
    """A candidate segment for clip extraction.

    Declared as a dataclass so candidates can be built with keyword
    arguments; only the timestamps and ``hype_score`` are required, the
    per-signal scores default to ``0.0``.
    """

    start_time: float
    end_time: float
    hype_score: float
    visual_score: float = 0.0
    audio_score: float = 0.0
    motion_score: float = 0.0
    person_score: float = 0.0  # Target person visibility

    @property
    def duration(self) -> float:
        """Candidate duration in seconds (``end_time - start_time``)."""
        return self.end_time - self.start_time
class ClipExtractor:
    """
    Extracts final clips from video based on hype scores.

    Handles:
    - Selecting top segments based on scores
    - Enforcing diversity (minimum gap between clips)
    - Adjusting clip boundaries to scene cuts
    - Generating thumbnails
    """

    def __init__(
        self,
        video_processor: VideoProcessor,
        config: Optional[ProcessingConfig] = None,
    ):
        """
        Initialize clip extractor.

        Args:
            video_processor: VideoProcessor instance for clip cutting
            config: Processing configuration (uses default if None)
        """
        self.video_processor = video_processor
        self.config = config or get_config().processing
        logger.info(
            f"ClipExtractor initialized (duration={self.config.min_clip_duration}-"
            f"{self.config.max_clip_duration}s, gap={self.config.min_gap_between_clips}s)"
        )

    def select_clips(
        self,
        candidates: List[ClipCandidate],
        num_clips: int,
        enforce_diversity: bool = True,
    ) -> List[ClipCandidate]:
        """
        Select top clips from candidates.

        Candidates are ranked by ``hype_score`` (highest first). With
        ``enforce_diversity`` a candidate is only accepted when its start
        time is at least ``config.min_gap_between_clips`` seconds away from
        the start of every clip already accepted. If that leaves fewer than
        ``num_clips`` clips, the remaining slots are filled with the best
        not-yet-selected candidates regardless of the gap.

        Args:
            candidates: List of clip candidates with scores
            num_clips: Number of clips to select; values <= 0 select nothing
            enforce_diversity: Enforce minimum gap between clips

        Returns:
            List of selected ClipCandidate objects (at most ``num_clips``)
        """
        if not candidates:
            logger.warning("No candidates provided for selection")
            return []

        # A negative count would otherwise slice from the wrong end below.
        if num_clips <= 0:
            return []

        # Sort by hype score
        ranked = sorted(candidates, key=lambda c: c.hype_score, reverse=True)

        if not enforce_diversity:
            return ranked[:num_clips]

        # Select with diversity constraint
        selected: List[ClipCandidate] = []
        min_gap = self.config.min_gap_between_clips

        for candidate in ranked:
            if len(selected) >= num_clips:
                break
            # The gap is measured between clip starts, not between intervals.
            if all(
                abs(candidate.start_time - chosen.start_time) >= min_gap
                for chosen in selected
            ):
                selected.append(candidate)

        # If we couldn't get enough with diversity, relax constraint
        if len(selected) < num_clips:
            logger.warning(
                f"Only {len(selected)} diverse clips found, "
                f"relaxing diversity constraint"
            )
            for candidate in ranked:
                if candidate not in selected:
                    selected.append(candidate)
                    if len(selected) >= num_clips:
                        break

        logger.info(f"Selected {len(selected)} clips from {len(candidates)} candidates")
        return selected

    @staticmethod
    def _snap_to_boundary(
        timestamp: float,
        scene_boundaries: List[float],
        tolerance: float,
    ) -> float:
        """Return the boundary closest to ``timestamp`` if it lies strictly
        within ``tolerance`` seconds, otherwise ``timestamp`` unchanged."""
        nearest = min(
            scene_boundaries,
            key=lambda boundary: abs(boundary - timestamp),
            default=None,
        )
        if nearest is not None and abs(nearest - timestamp) < tolerance:
            return nearest
        return timestamp

    def adjust_to_scene_boundaries(
        self,
        candidates: List[ClipCandidate],
        scene_boundaries: List[float],
        tolerance: float = 1.0,
    ) -> List[ClipCandidate]:
        """
        Adjust clip boundaries to align with scene cuts.

        Each start and end timestamp is moved to the *nearest* scene
        boundary when that boundary is less than ``tolerance`` seconds away.
        If the snapped clip would be shorter than
        ``config.min_clip_duration`` (or empty), the original boundaries
        are kept.

        Args:
            candidates: List of clip candidates
            scene_boundaries: List of scene boundary timestamps
            tolerance: Maximum adjustment in seconds

        Returns:
            List of adjusted ClipCandidate objects. The input list itself is
            returned when ``scene_boundaries`` is empty; otherwise new
            candidate objects are created and the inputs are not mutated.
        """
        if not scene_boundaries:
            return candidates

        adjusted = []
        for candidate in candidates:
            new_start = self._snap_to_boundary(
                candidate.start_time, scene_boundaries, tolerance
            )
            new_end = self._snap_to_boundary(
                candidate.end_time, scene_boundaries, tolerance
            )

            # Ensure minimum duration; also reject an empty/inverted clip,
            # which can happen when both ends snap to the same boundary.
            too_short = new_end - new_start < self.config.min_clip_duration
            if too_short or new_end <= new_start:
                # Keep original boundaries
                new_start = candidate.start_time
                new_end = candidate.end_time

            # replace() copies every other field, so the scores carry over.
            adjusted.append(
                replace(candidate, start_time=new_start, end_time=new_end)
            )
        return adjusted

    def extract_clips(
        self,
        video_path: str | Path,
        output_dir: str | Path,
        candidates: List[ClipCandidate],
        num_clips: Optional[int] = None,
        generate_thumbnails: bool = True,
        reencode: bool = False,
    ) -> List[ExtractedClip]:
        """
        Extract clips from video.

        Selection goes through ``select_clips`` (diversity enforced). A
        failure while extracting one clip is logged and that clip is
        skipped, so the returned list may be shorter than requested and
        ranks may have gaps.

        Args:
            video_path: Path to source video
            output_dir: Directory for output clips
            candidates: List of clip candidates
            num_clips: Number of clips to extract (None or 0 = use config default)
            generate_thumbnails: Whether to generate thumbnails
            reencode: Whether to re-encode clips (slower but precise)

        Returns:
            List of ExtractedClip objects
        """
        video_path = Path(video_path)
        output_dir = ensure_dir(output_dir)
        num_clips = num_clips or self.config.default_num_clips

        with LogTimer(logger, f"Extracting {num_clips} clips"):
            # Select top clips
            selected = self.select_clips(candidates, num_clips)
            if not selected:
                logger.warning("No clips to extract")
                return []

            # Extract each clip
            clips = []
            for rank, candidate in enumerate(selected, 1):
                try:
                    clip = self._extract_single_clip(
                        video_path=video_path,
                        output_dir=output_dir,
                        candidate=candidate,
                        rank=rank,
                        generate_thumbnail=generate_thumbnails,
                        reencode=reencode,
                    )
                    clips.append(clip)
                except Exception as e:
                    # Deliberate best effort: one bad clip must not abort the rest.
                    logger.error(f"Failed to extract clip {rank}: {e}")

            logger.info(f"Successfully extracted {len(clips)} clips")
            return clips

    def _extract_single_clip(
        self,
        video_path: Path,
        output_dir: Path,
        candidate: ClipCandidate,
        rank: int,
        generate_thumbnail: bool,
        reencode: bool,
    ) -> ExtractedClip:
        """
        Extract a single clip and, optionally, its thumbnail.

        Args:
            video_path: Path to source video
            output_dir: Directory the clip file is written to
            candidate: Segment to cut
            rank: 1-based rank, used in the output file names
            generate_thumbnail: Whether to generate a thumbnail
            reencode: Passed through to ``VideoProcessor.cut_clip``

        Returns:
            ExtractedClip describing the written file. ``thumbnail_path``
            is None when thumbnails are disabled or generation failed.
            Errors raised by ``cut_clip`` propagate to the caller.
        """
        # Generate output filename (':' replaced with '-' in the timestamp part)
        timestamp_tag = format_timestamp(candidate.start_time).replace(':', '-')
        clip_filename = f"clip_{rank:02d}_{timestamp_tag}.mp4"
        clip_path = output_dir / clip_filename

        # Cut the clip
        self.video_processor.cut_clip(
            video_path=video_path,
            output_path=clip_path,
            start_time=candidate.start_time,
            end_time=candidate.end_time,
            reencode=reencode,
        )

        # Generate thumbnail (best effort: a failure only costs the thumbnail)
        thumbnail_path = None
        if generate_thumbnail:
            try:
                thumb_filename = f"thumb_{rank:02d}.jpg"
                thumbnail_path = output_dir / "thumbnails" / thumb_filename
                # parents=True so this also works when output_dir is missing.
                thumbnail_path.parent.mkdir(parents=True, exist_ok=True)

                # Thumbnail at 1/3 into the clip
                thumb_time = candidate.start_time + (candidate.duration / 3)
                self.video_processor.generate_thumbnail(
                    video_path=video_path,
                    output_path=thumbnail_path,
                    timestamp=thumb_time,
                )
            except Exception as e:
                logger.warning(f"Failed to generate thumbnail for clip {rank}: {e}")
                thumbnail_path = None

        return ExtractedClip(
            clip_path=clip_path,
            start_time=candidate.start_time,
            end_time=candidate.end_time,
            hype_score=candidate.hype_score,
            rank=rank,
            thumbnail_path=thumbnail_path,
            source_video=video_path,
            visual_score=candidate.visual_score,
            audio_score=candidate.audio_score,
            motion_score=candidate.motion_score,
            person_detected=candidate.person_score > 0,
            person_screen_time=candidate.person_score,
        )

    @staticmethod
    def _fallback_windows(
        duration: float,
        num_clips: int,
        clip_duration: float,
        min_clip_duration: float,
    ) -> List[Tuple[float, float]]:
        """Compute ``(start, end)`` windows spread evenly over ``[0, duration]``.

        Every returned window satisfies ``0 <= start <= end <= duration``.
        When the clips fit without overlap they are separated by equal gaps;
        otherwise their starts are spread evenly and the windows overlap.
        Returns an empty list for a non-positive ``duration`` or ``num_clips``.
        """
        if num_clips <= 0 or duration <= 0:
            return []

        if clip_duration * num_clips >= duration:
            # Video too short, adjust
            clip_duration = max(min_clip_duration, duration / (num_clips + 1))
        # A clip can never be longer than the video itself.
        clip_duration = min(clip_duration, duration)

        slack = duration - clip_duration * num_clips
        if slack >= 0:
            # Calculate evenly spaced start times
            gap = slack / (num_clips + 1)
            starts = [gap + i * (clip_duration + gap) for i in range(num_clips)]
        else:
            # The minimum clip length makes the clips overlap. Spread the
            # starts over the usable span instead of letting the gap (and
            # therefore the first start) go negative.
            span = duration - clip_duration
            if span <= 0:
                # Every clip would be the whole video: one window is enough.
                starts = [0.0]
            else:
                # slack < 0 with clip_duration <= duration implies num_clips >= 2.
                step = span / (num_clips - 1)
                starts = [i * step for i in range(num_clips)]

        return [(start, min(start + clip_duration, duration)) for start in starts]

    def create_fallback_clips(
        self,
        video_path: str | Path,
        output_dir: str | Path,
        duration: float,
        num_clips: int,
    ) -> List[ExtractedClip]:
        """
        Create uniformly distributed clips when no highlights are detected.

        Clips use ``config.default_clip_duration``; when the video is too
        short for that, they shrink to
        ``max(config.min_clip_duration, duration / (num_clips + 1))`` and
        may overlap. Start times are never negative and end times never
        exceed ``duration``.

        Args:
            video_path: Path to source video
            output_dir: Directory for output clips
            duration: Video duration in seconds
            num_clips: Number of clips to create

        Returns:
            List of fallback ExtractedClip objects (empty when ``duration``
            or ``num_clips`` is not positive)
        """
        logger.warning("Creating fallback clips (no highlights detected)")

        windows = self._fallback_windows(
            duration=duration,
            num_clips=num_clips,
            clip_duration=self.config.default_clip_duration,
            min_clip_duration=self.config.min_clip_duration,
        )
        if not windows:
            return []

        candidates = [
            ClipCandidate(
                start_time=start,
                end_time=end,
                hype_score=0.5,  # Neutral score
            )
            for start, end in windows
        ]

        return self.extract_clips(
            video_path=video_path,
            output_dir=output_dir,
            candidates=candidates,
            num_clips=num_clips,
        )

    def merge_adjacent_candidates(
        self,
        candidates: List[ClipCandidate],
        max_gap: float = 2.0,
        max_duration: Optional[float] = None,
    ) -> List[ClipCandidate]:
        """
        Merge adjacent high-scoring candidates into longer clips.

        Candidates are processed in start-time order. A candidate is merged
        into the running clip when it starts at most ``max_gap`` seconds
        after the running clip ends (overlap counts) and the merged clip
        would not exceed ``max_duration``. The merged clip ends at the later
        of the two end times and takes the maximum of each score.

        Args:
            candidates: List of clip candidates
            max_gap: Maximum gap between candidates to merge
            max_duration: Maximum merged clip duration (None or 0 = use
                ``config.max_clip_duration``)

        Returns:
            List of merged ClipCandidate objects, sorted by start time
        """
        max_duration = max_duration or self.config.max_clip_duration
        if not candidates:
            return []

        # Sort by start time
        ordered = sorted(candidates, key=lambda c: c.start_time)
        merged = []
        current = ordered[0]

        for candidate in ordered[1:]:
            gap = candidate.start_time - current.end_time
            # A candidate fully inside `current` must not pull the end earlier.
            merged_end = max(current.end_time, candidate.end_time)
            potential_duration = merged_end - current.start_time

            if gap <= max_gap and potential_duration <= max_duration:
                # Merge
                current = replace(
                    current,
                    end_time=merged_end,
                    hype_score=max(current.hype_score, candidate.hype_score),
                    visual_score=max(current.visual_score, candidate.visual_score),
                    audio_score=max(current.audio_score, candidate.audio_score),
                    motion_score=max(current.motion_score, candidate.motion_score),
                    person_score=max(current.person_score, candidate.person_score),
                )
            else:
                merged.append(current)
                current = candidate

        merged.append(current)
        return merged
# Export public interface: the names made available by a star-import of this module.
__all__ = ["ClipExtractor", "ExtractedClip", "ClipCandidate"]