# core/scene_detector.py
"""
ShortSmith v2 - Scene Detector Module
PySceneDetect integration for detecting scene/shot boundaries in videos.
Uses content-aware detection to find cuts, fades, and transitions.
"""
from pathlib import Path
from typing import List, Optional, Tuple
from dataclasses import dataclass
from utils.logger import get_logger, LogTimer
from utils.helpers import VideoProcessingError
from config import get_config
logger = get_logger("core.scene_detector")
@dataclass
class Scene:
    """A single detected scene/shot, bounded in both time and frame space."""
    start_time: float  # scene start, in seconds
    end_time: float  # scene end, in seconds (exclusive for containment checks)
    start_frame: int  # first frame index of the scene
    end_frame: int  # frame index just past the scene's last frame

    @property
    def duration(self) -> float:
        """Length of the scene in seconds."""
        return self.end_time - self.start_time

    @property
    def frame_count(self) -> int:
        """How many frames the scene spans."""
        return self.end_frame - self.start_frame

    @property
    def midpoint(self) -> float:
        """Timestamp at the temporal center of the scene."""
        return 0.5 * (self.start_time + self.end_time)

    def contains_timestamp(self, timestamp: float) -> bool:
        """True if `timestamp` lies in the half-open interval [start, end)."""
        return not (timestamp < self.start_time or timestamp >= self.end_time)

    def overlaps_with(self, other: "Scene") -> bool:
        """True if this scene and `other` share any span of time."""
        return self.end_time > other.start_time and other.end_time > self.start_time

    def __repr__(self) -> str:
        return "Scene({:.2f}s - {:.2f}s, {:.2f}s)".format(
            self.start_time, self.end_time, self.duration
        )
class SceneDetector:
    """
    Scene boundary detector using PySceneDetect.

    Supports multiple detection modes:
    - Content-aware: Detects cuts based on color histogram changes
    - Adaptive: Uses rolling average for more robust detection
    - Threshold: Simple luminance-based detection (for fades)
    """

    def __init__(
        self,
        threshold: float = 27.0,
        min_scene_length: float = 0.5,
        adaptive_threshold: bool = True,
    ):
        """
        Initialize scene detector.

        Args:
            threshold: Detection sensitivity (lower = more sensitive)
            min_scene_length: Minimum scene duration in seconds
            adaptive_threshold: Use adaptive threshold for varying content

        Raises:
            ImportError: If PySceneDetect is not installed
        """
        self.threshold = threshold
        self.min_scene_length = min_scene_length
        self.adaptive_threshold = adaptive_threshold
        # Fail fast at construction time rather than on the first detect call.
        self._verify_dependencies()
        logger.info(
            f"SceneDetector initialized (threshold={threshold}, "
            f"min_length={min_scene_length}s, adaptive={adaptive_threshold})"
        )

    def _verify_dependencies(self) -> None:
        """Verify that PySceneDetect is importable; raise ImportError if not."""
        try:
            import scenedetect
            self._scenedetect = scenedetect
        except ImportError as e:
            raise ImportError(
                "PySceneDetect is required for scene detection. "
                "Install with: pip install scenedetect[opencv]"
            ) from e

    def detect_scenes(
        self,
        video_path: str | Path,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ) -> List[Scene]:
        """
        Detect scene boundaries in a video.

        Args:
            video_path: Path to the video file
            start_time: Start analysis at this timestamp (seconds)
            end_time: End analysis at this timestamp (seconds)

        Returns:
            List of detected Scene objects. Never empty: when no cuts are
            found, a single Scene spanning the whole video is returned.

        Raises:
            VideoProcessingError: If the file is missing or detection fails
        """
        from scenedetect import open_video, SceneManager
        from scenedetect.detectors import ContentDetector, AdaptiveDetector

        video_path = Path(video_path)
        if not video_path.exists():
            raise VideoProcessingError(f"Video file not found: {video_path}")

        with LogTimer(logger, f"Detecting scenes in {video_path.name}"):
            try:
                video = open_video(str(video_path))
                scene_manager = SceneManager()

                # PySceneDetect expresses minimum scene length in frames.
                min_len_frames = int(self.min_scene_length * video.frame_rate)
                if self.adaptive_threshold:
                    detector = AdaptiveDetector(
                        adaptive_threshold=self.threshold,
                        min_scene_len=min_len_frames,
                    )
                else:
                    detector = ContentDetector(
                        threshold=self.threshold,
                        min_scene_len=min_len_frames,
                    )
                scene_manager.add_detector(detector)

                # Restrict analysis to [start_time, end_time] if requested.
                if start_time is not None:
                    video.seek(int(start_time * video.frame_rate))
                if end_time is not None:
                    duration_frames = int(
                        (end_time - (start_time or 0)) * video.frame_rate
                    )
                else:
                    duration_frames = None

                # BUG FIX: this frame count is *relative* to the current seek
                # position, so it must be passed as `duration`. The original
                # passed it as `end_time`, which PySceneDetect interprets as
                # an absolute position in the video.
                scene_manager.detect_scenes(
                    video, frame_skip=0, duration=duration_frames
                )

                # Convert (FrameTimecode, FrameTimecode) pairs to Scene objects.
                scenes = [
                    Scene(
                        start_time=scene_start.get_seconds(),
                        end_time=scene_end.get_seconds(),
                        start_frame=scene_start.get_frames(),
                        end_frame=scene_end.get_frames(),
                    )
                    for scene_start, scene_end in scene_manager.get_scene_list()
                ]
                logger.info(f"Detected {len(scenes)} scenes")

                # If no scenes detected, create a single scene for entire video
                # so callers always receive a non-empty list.
                if not scenes:
                    logger.warning("No scene cuts detected, treating as single scene")
                    video_duration = video.duration.get_seconds()
                    scenes = [Scene(
                        start_time=0,
                        end_time=video_duration,
                        start_frame=0,
                        end_frame=int(video_duration * video.frame_rate),
                    )]
                return scenes
            except Exception as e:
                logger.error(f"Scene detection failed: {e}")
                raise VideoProcessingError(f"Scene detection failed: {e}") from e

    def detect_scene_boundaries(
        self,
        video_path: str | Path,
    ) -> List[float]:
        """
        Get just the scene boundary timestamps.

        Args:
            video_path: Path to the video file

        Returns:
            Sorted, de-duplicated list of timestamps (seconds) where scene
            changes occur, always including 0.0 for the start of the video.
        """
        scenes = self.detect_scenes(video_path)
        boundaries = [0.0]  # Start of video is always a boundary
        for scene in scenes:
            if scene.start_time > 0:
                boundaries.append(scene.start_time)
        # Remove duplicates and sort
        return sorted(set(boundaries))

    def get_scene_at_timestamp(
        self,
        scenes: List[Scene],
        timestamp: float,
    ) -> Optional[Scene]:
        """
        Find the scene containing a specific timestamp.

        Args:
            scenes: List of detected scenes
            timestamp: Timestamp to search for (seconds)

        Returns:
            Scene whose [start, end) interval contains the timestamp,
            or None if no scene matches.
        """
        for scene in scenes:
            if scene.contains_timestamp(timestamp):
                return scene
        return None

    def get_scenes_in_range(
        self,
        scenes: List[Scene],
        start_time: float,
        end_time: float,
    ) -> List[Scene]:
        """
        Get all scenes that overlap with a time range.

        Args:
            scenes: List of detected scenes
            start_time: Range start (seconds)
            end_time: Range end (seconds)

        Returns:
            List of scenes overlapping [start_time, end_time)
        """
        # Frame fields are irrelevant for the overlap test, so zeros suffice.
        range_scene = Scene(
            start_time=start_time,
            end_time=end_time,
            start_frame=0,
            end_frame=0,
        )
        return [s for s in scenes if s.overlaps_with(range_scene)]

    def merge_short_scenes(
        self,
        scenes: List[Scene],
        min_duration: float = 2.0,
    ) -> List[Scene]:
        """
        Merge scenes that are shorter than minimum duration.

        A too-short scene absorbs the scene that follows it; a too-short
        trailing scene is folded back into the previous merged scene.

        Args:
            scenes: List of scenes to process (assumed chronological)
            min_duration: Minimum scene duration in seconds

        Returns:
            List of merged scenes
        """
        if not scenes:
            return []
        merged: List[Scene] = []
        current = scenes[0]
        for scene in scenes[1:]:
            if current.duration < min_duration:
                # Merge with next scene
                current = Scene(
                    start_time=current.start_time,
                    end_time=scene.end_time,
                    start_frame=current.start_frame,
                    end_frame=scene.end_frame,
                )
            else:
                merged.append(current)
                current = scene
        # BUG FIX: the original appended the final scene unconditionally,
        # which could leave a trailing scene shorter than min_duration.
        # Fold it back into the previous merged scene when possible.
        if current.duration < min_duration and merged:
            prev = merged.pop()
            current = Scene(
                start_time=prev.start_time,
                end_time=current.end_time,
                start_frame=prev.start_frame,
                end_frame=current.end_frame,
            )
        merged.append(current)
        logger.debug(f"Merged {len(scenes)} scenes into {len(merged)}")
        return merged

    def split_long_scenes(
        self,
        scenes: List[Scene],
        max_duration: float = 30.0,
        video_fps: float = 30.0,
    ) -> List[Scene]:
        """
        Split scenes that are longer than maximum duration.

        Args:
            scenes: List of scenes to process
            max_duration: Maximum scene duration in seconds
            video_fps: Video frame rate for frame calculations

        Returns:
            List of scenes with long ones split into equal-length chunks
        """
        import math

        result: List[Scene] = []
        for scene in scenes:
            if scene.duration <= max_duration:
                result.append(scene)
                continue
            # BUG FIX: use ceil instead of int(...) + 1 so an exact multiple
            # (e.g. a 60s scene with max_duration=30) yields 2 chunks rather
            # than 3 needlessly short ones.
            num_chunks = math.ceil(scene.duration / max_duration)
            chunk_duration = scene.duration / num_chunks
            for i in range(num_chunks):
                start = scene.start_time + i * chunk_duration
                # Clamp to the scene end to avoid float drift past it.
                end = min(scene.start_time + (i + 1) * chunk_duration, scene.end_time)
                result.append(Scene(
                    start_time=start,
                    end_time=end,
                    start_frame=int(start * video_fps),
                    end_frame=int(end * video_fps),
                ))
        logger.debug(f"Split {len(scenes)} scenes into {len(result)}")
        return result
# Export public interface (consumed by `from core.scene_detector import *`)
__all__ = ["SceneDetector", "Scene"]