"""
ShortSmith v2 - Scene Detector Module

PySceneDetect integration for detecting scene/shot boundaries in videos.
Uses content-aware detection to find cuts, fades, and transitions.
"""

import math
from pathlib import Path
from typing import List, Optional, Tuple
from dataclasses import dataclass

from utils.logger import get_logger, LogTimer
from utils.helpers import VideoProcessingError
from config import get_config

logger = get_logger("core.scene_detector")


@dataclass
class Scene:
    """Represents a detected scene/shot in the video."""

    start_time: float  # Start timestamp in seconds
    end_time: float  # End timestamp in seconds
    start_frame: int  # Start frame number
    end_frame: int  # End frame number

    @property
    def duration(self) -> float:
        """Scene duration in seconds (end_time - start_time)."""
        return self.end_time - self.start_time

    @property
    def frame_count(self) -> int:
        """Number of frames in scene (end_frame - start_frame)."""
        return self.end_frame - self.start_frame

    @property
    def midpoint(self) -> float:
        """Midpoint timestamp of the scene in seconds."""
        return (self.start_time + self.end_time) / 2

    def contains_timestamp(self, timestamp: float) -> bool:
        """Check if timestamp falls within this scene.

        The interval is half-open: the start is inclusive and the end is
        exclusive, so a boundary timestamp belongs to the later scene only.
        """
        return self.start_time <= timestamp < self.end_time

    def overlaps_with(self, other: "Scene") -> bool:
        """Check if this scene overlaps with another.

        Scenes that merely touch (one ends exactly where the other starts)
        are not considered overlapping.
        """
        return not (
            self.end_time <= other.start_time
            or other.end_time <= self.start_time
        )

    def __repr__(self) -> str:
        # An explicitly defined __repr__ takes precedence over the one the
        # dataclass decorator would otherwise generate.
        return (
            f"Scene({self.start_time:.2f}s - {self.end_time:.2f}s, "
            f"{self.duration:.2f}s)"
        )


class SceneDetector:
    """
    Scene boundary detector using PySceneDetect.

    Supports two detection modes, selected by ``adaptive_threshold``:
    - Adaptive: uses PySceneDetect's AdaptiveDetector
    - Content-aware: uses PySceneDetect's ContentDetector
    """

    def __init__(
        self,
        threshold: float = 27.0,
        min_scene_length: float = 0.5,
        adaptive_threshold: bool = True,
    ):
        """
        Initialize scene detector.

        Args:
            threshold: Detection sensitivity (lower = more sensitive)
            min_scene_length: Minimum scene duration in seconds
            adaptive_threshold: Use adaptive threshold for varying content

        Raises:
            ImportError: If PySceneDetect is not installed
        """
        self.threshold = threshold
        self.min_scene_length = min_scene_length
        self.adaptive_threshold = adaptive_threshold

        # Fail fast at construction time rather than on the first detection.
        self._verify_dependencies()

        logger.info(
            f"SceneDetector initialized (threshold={threshold}, "
            f"min_length={min_scene_length}s, adaptive={adaptive_threshold})"
        )

    def _verify_dependencies(self) -> None:
        """Verify that PySceneDetect is installed.

        Raises:
            ImportError: If the ``scenedetect`` package cannot be imported
        """
        try:
            import scenedetect
            self._scenedetect = scenedetect
        except ImportError as e:
            raise ImportError(
                "PySceneDetect is required for scene detection. "
                "Install with: pip install scenedetect[opencv]"
            ) from e

    def detect_scenes(
        self,
        video_path: str | Path,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ) -> List[Scene]:
        """
        Detect scene boundaries in a video.

        Args:
            video_path: Path to the video file
            start_time: Start analysis at this timestamp (seconds)
            end_time: End analysis at this timestamp (seconds, measured from
                the start of the video, not from ``start_time``)

        Returns:
            List of detected Scene objects. If no cuts are found, a single
            Scene covering the analysed window is returned.

        Raises:
            VideoProcessingError: If the file is missing, the requested time
                range is invalid, or scene detection fails
        """
        from scenedetect import open_video, SceneManager
        from scenedetect.detectors import ContentDetector, AdaptiveDetector

        video_path = Path(video_path)
        if not video_path.exists():
            raise VideoProcessingError(f"Video file not found: {video_path}")

        # Validate the window up front so callers get a clear message instead
        # of an opaque failure from inside the detection library.
        if start_time is not None and start_time < 0:
            raise VideoProcessingError(
                f"start_time must be non-negative, got {start_time}"
            )
        if end_time is not None and end_time <= (start_time or 0):
            raise VideoProcessingError(
                f"end_time ({end_time}) must be greater than "
                f"start_time ({start_time or 0})"
            )

        with LogTimer(logger, f"Detecting scenes in {video_path.name}"):
            try:
                video = open_video(str(video_path))
                frame_rate = video.frame_rate
                min_scene_frames = int(self.min_scene_length * frame_rate)

                scene_manager = SceneManager()

                if self.adaptive_threshold:
                    # NOTE(review): self.threshold defaults to 27.0, which
                    # looks like a ContentDetector-scale value, whereas
                    # AdaptiveDetector's adaptive_threshold is a score ratio
                    # (library default believed to be 3.0) - confirm this
                    # mapping is intended.
                    detector = AdaptiveDetector(
                        adaptive_threshold=self.threshold,
                        min_scene_len=min_scene_frames,
                    )
                else:
                    detector = ContentDetector(
                        threshold=self.threshold,
                        min_scene_len=min_scene_frames,
                    )
                scene_manager.add_detector(detector)

                if start_time is not None:
                    video.seek(int(start_time * frame_rate))

                # SceneManager.detect_scenes treats `end_time` as an absolute
                # stop position (an int is a frame number), not as a duration
                # relative to the seek position, so pass the absolute frame.
                stop_frame = (
                    int(end_time * frame_rate) if end_time is not None else None
                )

                scene_manager.detect_scenes(
                    video, frame_skip=0, end_time=stop_frame
                )

                scenes = [
                    Scene(
                        start_time=scene_start.get_seconds(),
                        end_time=scene_end.get_seconds(),
                        start_frame=scene_start.get_frames(),
                        end_frame=scene_end.get_frames(),
                    )
                    for scene_start, scene_end in scene_manager.get_scene_list()
                ]

                logger.info(f"Detected {len(scenes)} scenes")

                # If no scenes detected, create a single scene spanning the
                # analysed window (the whole video when no range was given).
                if not scenes:
                    logger.warning("No scene cuts detected, treating as single scene")
                    video_duration = video.duration.get_seconds()
                    window_start = (
                        float(start_time) if start_time is not None else 0.0
                    )
                    window_end = (
                        video_duration
                        if end_time is None
                        else min(float(end_time), video_duration)
                    )
                    scenes = [Scene(
                        start_time=window_start,
                        end_time=window_end,
                        start_frame=int(window_start * frame_rate),
                        end_frame=int(window_end * frame_rate),
                    )]

                return scenes

            except Exception as e:
                logger.error(f"Scene detection failed: {e}")
                raise VideoProcessingError(f"Scene detection failed: {e}") from e

    def detect_scene_boundaries(
        self,
        video_path: str | Path,
    ) -> List[float]:
        """
        Get just the scene boundary timestamps.

        Args:
            video_path: Path to the video file

        Returns:
            Sorted, de-duplicated list of timestamps where scenes start.
            Always includes 0.0 for the start of the video.

        Raises:
            VideoProcessingError: If scene detection fails
        """
        scenes = self.detect_scenes(video_path)

        boundaries = {0.0}  # Start of video
        boundaries.update(
            scene.start_time for scene in scenes if scene.start_time > 0
        )
        return sorted(boundaries)

    def get_scene_at_timestamp(
        self,
        scenes: List[Scene],
        timestamp: float,
    ) -> Optional[Scene]:
        """
        Find the scene containing a specific timestamp.

        Args:
            scenes: List of detected scenes
            timestamp: Timestamp to search for

        Returns:
            First scene containing the timestamp, or None if not found
        """
        return next(
            (scene for scene in scenes if scene.contains_timestamp(timestamp)),
            None,
        )

    def get_scenes_in_range(
        self,
        scenes: List[Scene],
        start_time: float,
        end_time: float,
    ) -> List[Scene]:
        """
        Get all scenes that overlap with a time range.

        Args:
            scenes: List of detected scenes
            start_time: Range start
            end_time: Range end

        Returns:
            List of overlapping scenes, in their original order
        """
        # Frame numbers are irrelevant for the overlap test, which compares
        # timestamps only.
        range_scene = Scene(
            start_time=start_time,
            end_time=end_time,
            start_frame=0,
            end_frame=0,
        )
        return [s for s in scenes if s.overlaps_with(range_scene)]

    @staticmethod
    def _join_scenes(first: Scene, last: Scene) -> Scene:
        """Return one scene spanning from the start of `first` to the end of `last`."""
        return Scene(
            start_time=first.start_time,
            end_time=last.end_time,
            start_frame=first.start_frame,
            end_frame=last.end_frame,
        )

    def merge_short_scenes(
        self,
        scenes: List[Scene],
        min_duration: float = 2.0,
    ) -> List[Scene]:
        """
        Merge scenes that are shorter than minimum duration.

        A short scene is merged forward into the scene that follows it. A
        short scene at the very end has no successor, so it is merged
        backward into the preceding scene instead. A single scene is
        returned unchanged even if it is short.

        Args:
            scenes: List of scenes to process (assumed to be in time order)
            min_duration: Minimum scene duration in seconds

        Returns:
            List of merged scenes
        """
        if not scenes:
            return []

        merged: List[Scene] = []
        current = scenes[0]

        for scene in scenes[1:]:
            if current.duration < min_duration:
                # Still too short: absorb the next scene.
                current = self._join_scenes(current, scene)
            else:
                merged.append(current)
                current = scene

        # The trailing scene cannot merge forward; fold it into the previous
        # one so that no short scene is left behind.
        if merged and current.duration < min_duration:
            current = self._join_scenes(merged.pop(), current)

        merged.append(current)

        logger.debug(f"Merged {len(scenes)} scenes into {len(merged)}")
        return merged

    def split_long_scenes(
        self,
        scenes: List[Scene],
        max_duration: float = 30.0,
        video_fps: float = 30.0,
    ) -> List[Scene]:
        """
        Split scenes that are longer than maximum duration.

        Long scenes are divided into the smallest number of equal-length
        chunks such that each chunk is at most ``max_duration`` long.

        Args:
            scenes: List of scenes to process
            max_duration: Maximum scene duration in seconds (must be > 0)
            video_fps: Video frame rate, used to compute frame numbers at the
                interior chunk boundaries

        Returns:
            List of scenes with long ones split

        Raises:
            ValueError: If max_duration is not positive
        """
        if max_duration <= 0:
            raise ValueError(
                f"max_duration must be positive, got {max_duration}"
            )

        result: List[Scene] = []

        for scene in scenes:
            if scene.duration <= max_duration:
                result.append(scene)
                continue

            # ceil (rather than floor + 1) avoids an unnecessary extra chunk
            # when the duration is an exact multiple of max_duration.
            num_chunks = math.ceil(scene.duration / max_duration)
            chunk_duration = scene.duration / num_chunks
            last_index = num_chunks - 1

            for i in range(num_chunks):
                start = scene.start_time + (i * chunk_duration)
                # Pin the final chunk to the scene's exact end so float
                # rounding cannot leave a gap or overshoot.
                if i == last_index:
                    end = scene.end_time
                else:
                    end = scene.start_time + ((i + 1) * chunk_duration)

                # Keep the scene's original frame numbers at its outer edges;
                # only interior boundaries are derived from video_fps.
                start_frame = (
                    scene.start_frame if i == 0 else int(start * video_fps)
                )
                end_frame = (
                    scene.end_frame if i == last_index else int(end * video_fps)
                )

                result.append(Scene(
                    start_time=start,
                    end_time=end,
                    start_frame=start_frame,
                    end_frame=end_frame,
                ))

        logger.debug(f"Split {len(scenes)} scenes into {len(result)}")
        return result


# Export public interface
__all__ = ["SceneDetector", "Scene"]