Spaces:
Paused
Paused
| """ | |
| ShortSmith v2 - Scene Detector Module | |
| PySceneDetect integration for detecting scene/shot boundaries in videos. | |
| Uses content-aware detection to find cuts, fades, and transitions. | |
| """ | |
import math
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

from config import get_config
from utils.helpers import VideoProcessingError
from utils.logger import get_logger, LogTimer
| logger = get_logger("core.scene_detector") | |
@dataclass
class Scene:
    """Represents a detected scene/shot in the video.

    A half-open interval [start_time, end_time) over the video timeline,
    with the corresponding frame indices.

    Note: the ``@dataclass`` decorator was missing — the rest of this module
    constructs scenes with keyword arguments (``Scene(start_time=...)``),
    which requires the generated ``__init__``.
    """

    start_time: float   # Start timestamp in seconds
    end_time: float     # End timestamp in seconds
    start_frame: int    # Start frame number
    end_frame: int      # End frame number

    @property
    def duration(self) -> float:
        """Scene duration in seconds.

        Exposed as a property: call sites throughout this module read it as
        an attribute (e.g. ``scene.duration <= max_duration``).
        """
        return self.end_time - self.start_time

    @property
    def frame_count(self) -> int:
        """Number of frames in the scene."""
        return self.end_frame - self.start_frame

    @property
    def midpoint(self) -> float:
        """Midpoint timestamp of the scene in seconds."""
        return (self.start_time + self.end_time) / 2

    def contains_timestamp(self, timestamp: float) -> bool:
        """Return True if timestamp falls within this scene.

        The interval is half-open: start inclusive, end exclusive, so
        adjacent scenes never both claim the boundary timestamp.
        """
        return self.start_time <= timestamp < self.end_time

    def overlaps_with(self, other: "Scene") -> bool:
        """Return True if this scene overlaps another (touching edges don't count)."""
        return not (self.end_time <= other.start_time or other.end_time <= self.start_time)

    def __repr__(self) -> str:
        return f"Scene({self.start_time:.2f}s - {self.end_time:.2f}s, {self.duration:.2f}s)"
class SceneDetector:
    """
    Scene boundary detector using PySceneDetect.

    Supports multiple detection modes:
    - Content-aware: Detects cuts based on color histogram changes
    - Adaptive: Uses rolling average for more robust detection
    - Threshold: Simple luminance-based detection (for fades)
    """

    def __init__(
        self,
        threshold: float = 27.0,
        min_scene_length: float = 0.5,
        adaptive_threshold: bool = True,
    ):
        """
        Initialize scene detector.

        Args:
            threshold: Detection sensitivity (lower = more sensitive)
            min_scene_length: Minimum scene duration in seconds
            adaptive_threshold: Use adaptive threshold for varying content

        Raises:
            ImportError: If PySceneDetect is not installed
        """
        self.threshold = threshold
        self.min_scene_length = min_scene_length
        self.adaptive_threshold = adaptive_threshold
        # Fail fast at construction time rather than on the first detection call.
        self._verify_dependencies()
        logger.info(
            f"SceneDetector initialized (threshold={threshold}, "
            f"min_length={min_scene_length}s, adaptive={adaptive_threshold})"
        )

    def _verify_dependencies(self) -> None:
        """Verify that PySceneDetect is installed.

        Raises:
            ImportError: With install instructions, if the package is absent.
        """
        try:
            import scenedetect
            self._scenedetect = scenedetect
        except ImportError as e:
            raise ImportError(
                "PySceneDetect is required for scene detection. "
                "Install with: pip install scenedetect[opencv]"
            ) from e

    def detect_scenes(
        self,
        video_path: str | Path,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ) -> List[Scene]:
        """
        Detect scene boundaries in a video.

        Args:
            video_path: Path to the video file
            start_time: Start analysis at this timestamp (seconds)
            end_time: End analysis at this timestamp (seconds)

        Returns:
            List of detected Scene objects. If no cuts are found, a single
            scene spanning the whole video is returned.

        Raises:
            VideoProcessingError: If scene detection fails or the file is missing
        """
        from scenedetect import open_video, SceneManager
        from scenedetect.detectors import ContentDetector, AdaptiveDetector

        video_path = Path(video_path)
        if not video_path.exists():
            raise VideoProcessingError(f"Video file not found: {video_path}")

        with LogTimer(logger, f"Detecting scenes in {video_path.name}"):
            try:
                video = open_video(str(video_path))
                scene_manager = SceneManager()

                # Both detectors share the same minimum scene length, in frames.
                min_scene_frames = int(self.min_scene_length * video.frame_rate)
                if self.adaptive_threshold:
                    detector = AdaptiveDetector(
                        adaptive_threshold=self.threshold,
                        min_scene_len=min_scene_frames,
                    )
                else:
                    detector = ContentDetector(
                        threshold=self.threshold,
                        min_scene_len=min_scene_frames,
                    )
                scene_manager.add_detector(detector)

                # Seek to the requested start position before analysis.
                if start_time is not None:
                    video.seek(int(start_time * video.frame_rate))

                if end_time is not None:
                    # Number of frames to analyze from the (possibly seeked)
                    # current position — a duration, not an absolute end time.
                    duration_frames = int(
                        (end_time - (start_time or 0)) * video.frame_rate
                    )
                else:
                    duration_frames = None

                # FIX: the frame count above is a duration relative to the seek
                # position; passing it as `end_time` was wrong whenever
                # start_time > 0. PySceneDetect takes it via `duration=`.
                scene_manager.detect_scenes(
                    video, duration=duration_frames, frame_skip=0
                )

                # Convert PySceneDetect (start, end) FrameTimecode pairs.
                scenes = [
                    Scene(
                        start_time=scene_start.get_seconds(),
                        end_time=scene_end.get_seconds(),
                        start_frame=scene_start.get_frames(),
                        end_frame=scene_end.get_frames(),
                    )
                    for scene_start, scene_end in scene_manager.get_scene_list()
                ]
                logger.info(f"Detected {len(scenes)} scenes")

                # Degenerate case: no cuts found — treat the whole video as one scene.
                if not scenes:
                    logger.warning("No scene cuts detected, treating as single scene")
                    video_duration = video.duration.get_seconds()
                    scenes = [Scene(
                        start_time=0,
                        end_time=video_duration,
                        start_frame=0,
                        end_frame=int(video_duration * video.frame_rate),
                    )]

                return scenes

            except Exception as e:
                logger.error(f"Scene detection failed: {e}")
                raise VideoProcessingError(f"Scene detection failed: {e}") from e

    def detect_scene_boundaries(
        self,
        video_path: str | Path,
    ) -> List[float]:
        """
        Get just the scene boundary timestamps.

        Args:
            video_path: Path to the video file

        Returns:
            Sorted, de-duplicated list of timestamps (seconds) where scene
            changes occur; always includes 0.0 for the start of the video.
        """
        scenes = self.detect_scenes(video_path)
        boundaries = [0.0]  # Start of video is always a boundary
        for scene in scenes:
            if scene.start_time > 0:
                boundaries.append(scene.start_time)
        # Remove duplicates and sort
        return sorted(set(boundaries))

    def get_scene_at_timestamp(
        self,
        scenes: List[Scene],
        timestamp: float,
    ) -> Optional[Scene]:
        """
        Find the scene containing a specific timestamp.

        Args:
            scenes: List of detected scenes
            timestamp: Timestamp to search for (seconds)

        Returns:
            Scene containing the timestamp, or None if not found
        """
        for scene in scenes:
            if scene.contains_timestamp(timestamp):
                return scene
        return None

    def get_scenes_in_range(
        self,
        scenes: List[Scene],
        start_time: float,
        end_time: float,
    ) -> List[Scene]:
        """
        Get all scenes that overlap with a time range.

        Args:
            scenes: List of detected scenes
            start_time: Range start (seconds)
            end_time: Range end (seconds)

        Returns:
            List of overlapping scenes
        """
        # Dummy scene used purely for the time-overlap test; frames are unused.
        range_scene = Scene(
            start_time=start_time,
            end_time=end_time,
            start_frame=0,
            end_frame=0,
        )
        return [s for s in scenes if s.overlaps_with(range_scene)]

    def merge_short_scenes(
        self,
        scenes: List[Scene],
        min_duration: float = 2.0,
    ) -> List[Scene]:
        """
        Merge scenes that are shorter than minimum duration.

        Each too-short scene is absorbed into its following neighbor; a
        too-short trailing scene is folded back into the previous one so the
        output never ends with a short remainder (unless only one scene exists).

        Args:
            scenes: List of scenes to process (assumed chronological)
            min_duration: Minimum scene duration in seconds

        Returns:
            List of merged scenes
        """
        if not scenes:
            return []

        merged = []
        current = scenes[0]
        for scene in scenes[1:]:
            if current.duration < min_duration:
                # Absorb the next scene into the current short one.
                current = Scene(
                    start_time=current.start_time,
                    end_time=scene.end_time,
                    start_frame=current.start_frame,
                    end_frame=scene.end_frame,
                )
            else:
                merged.append(current)
                current = scene

        # FIX: the original could still emit a short trailing scene; fold it
        # into the previously merged scene when possible.
        if current.duration < min_duration and merged:
            prev = merged.pop()
            current = Scene(
                start_time=prev.start_time,
                end_time=current.end_time,
                start_frame=prev.start_frame,
                end_frame=current.end_frame,
            )
        merged.append(current)

        logger.debug(f"Merged {len(scenes)} scenes into {len(merged)}")
        return merged

    def split_long_scenes(
        self,
        scenes: List[Scene],
        max_duration: float = 30.0,
        video_fps: float = 30.0,
    ) -> List[Scene]:
        """
        Split scenes that are longer than maximum duration.

        Long scenes are divided into equal-length chunks, each no longer
        than max_duration.

        Args:
            scenes: List of scenes to process
            max_duration: Maximum scene duration in seconds
            video_fps: Video frame rate for frame calculations

        Returns:
            List of scenes with long ones split
        """
        result = []
        for scene in scenes:
            if scene.duration <= max_duration:
                result.append(scene)
            else:
                # FIX: ceil instead of int(...)+1 — the old formula produced
                # one extra, unnecessarily small chunk when the duration was
                # an exact multiple of max_duration.
                num_chunks = math.ceil(scene.duration / max_duration)
                chunk_duration = scene.duration / num_chunks
                for i in range(num_chunks):
                    start = scene.start_time + (i * chunk_duration)
                    # Clamp the final chunk to the scene end to avoid float drift.
                    end = min(scene.start_time + ((i + 1) * chunk_duration), scene.end_time)
                    result.append(Scene(
                        start_time=start,
                        end_time=end,
                        start_frame=int(start * video_fps),
                        end_frame=int(end * video_fps),
                    ))
        logger.debug(f"Split {len(scenes)} scenes into {len(result)}")
        return result
# Export public interface
__all__ = ["SceneDetector", "Scene"]