| | from typing import Dict, List, Optional, Tuple, Union |
| |
|
| | from PIL import Image |
| | from pydantic import BaseModel |
| | from pydub import AudioSegment |
| | from pydub.effects import normalize |
| | from scenedetect import (ContentDetector, FrameTimecode, SceneManager, |
| | VideoStream, open_video) |
| |
|
| |
|
| | class Scene(BaseModel): |
| | start: FrameTimecode |
| | end: FrameTimecode |
| | stt_res: Optional[Dict] = None |
| | summary: Optional[Dict] = None |
| |
|
| | class Config: |
| | """Configuration for this pydantic object.""" |
| |
|
| | arbitrary_types_allowed = True |
| |
|
| | @classmethod |
| | def init(cls, start: FrameTimecode, end: FrameTimecode, summary: dict = None): |
| | return cls(start=start, end=end, summary=summary) |
| |
|
| | @property |
| | def conversation(self): |
| | |
| | if isinstance(self.stt_res, list): |
| | output_conversation = "\n".join( |
| | [f"{item.get('text', None)}" for item in self.stt_res] |
| | ) |
| | else: |
| | output_conversation = self.stt_res |
| | return output_conversation |
| |
|
| |
|
| | class VideoScenes(BaseModel): |
| | stream: VideoStream |
| | audio: Union[AudioSegment, None] |
| | scenes: List[Scene] |
| | frame_extraction_interval: int |
| |
|
| | class Config: |
| | """Configuration for this pydantic object.""" |
| |
|
| | extra = "allow" |
| | arbitrary_types_allowed = True |
| |
|
| | @classmethod |
| | def load( |
| | cls, |
| | video_path: str, |
| | threshold: int = 27, |
| | min_scene_len: int = 1, |
| | frame_extraction_interval: int = 5, |
| | show_progress: bool = False, |
| | kernel_size: Optional[int] = None, |
| | ): |
| | """Load a video file. |
| | |
| | Args: |
| | video_path (str): The path of the video file. Only support local file. |
| | threshold (int): The scene detection threshold. |
| | min_scene_len (int): Once a cut is detected, this long time must pass before a new one can |
| | be added to the scene list. Count in seconds, defaults to 1. |
| | show_progress (bool, optional): Whether to display the progress bar when processing the video. Defaults to False. |
| | """ |
| | video = open_video(video_path) |
| | scene_manager = SceneManager() |
| | weight = ContentDetector.Components( |
| | delta_hue=1.0, |
| | delta_sat=1.0, |
| | delta_lum=0.0, |
| | delta_edges=1.0, |
| | ) |
| | if kernel_size is None: |
| | scene_manager.add_detector( |
| | ContentDetector( |
| | threshold=threshold, |
| | min_scene_len=int(video.frame_rate * min_scene_len), |
| | weights=weight, |
| | ) |
| | ) |
| | else: |
| | scene_manager.add_detector( |
| | ContentDetector( |
| | threshold=threshold, |
| | min_scene_len=int(video.frame_rate * min_scene_len), |
| | weights=weight, |
| | kernel_size=kernel_size, |
| | ) |
| | ) |
| | scene_manager.detect_scenes(video, show_progress=show_progress) |
| | scenes = scene_manager.get_scene_list(start_in_scene=True) |
| |
|
| | try: |
| | audio = AudioSegment.from_file(video_path) |
| | audio = normalize(audio) |
| | except (IndexError, OSError): |
| | audio = None |
| | return cls( |
| | stream=video, |
| | scenes=[Scene.init(*scene) for scene in scenes], |
| | audio=audio, |
| | frame_extraction_interval=frame_extraction_interval, |
| | ) |
| |
|
| | def get_video_frames( |
| | self, scene: Union[int, Scene, Tuple[FrameTimecode]], interval: int = None |
| | ) -> Tuple[List[Image.Image], List[float]]: |
| | """Get the frames of a scene. |
| | |
| | Args: |
| | scene (Union[int, Scene, Tuple[FrameTimecode]]): The scene to get frames. Can be the index of the scene, the scene object or a tuple of start and end frame timecode. |
| | interval (int, optional): The interval of the frames to get. Defaults to None. |
| | Raises: |
| | ValueError: If the type of scene is not int, Scene or tuple. |
| | |
| | Returns: |
| | List[ndarray]: The frames of the scene. |
| | """ |
| | if isinstance(scene, int): |
| | scene = self.scenes[scene] |
| | start, end = scene.start, scene.end |
| | elif isinstance(scene, Scene): |
| | start, end = scene.start, scene.end |
| | elif isinstance(scene, tuple): |
| | start, end = scene |
| | else: |
| | raise ValueError( |
| | f"scene should be int, Scene or tuple, not {type(scene).__name__}" |
| | ) |
| | self.stream.seek(start) |
| | frames = [] |
| | time_stamps = [] |
| | if interval is None: |
| | interval = self.frame_extraction_interval * self.stream.frame_rate |
| | scene_len = end.get_frames() - start.get_frames() |
| | if scene_len / 10 > interval: |
| | interval = int(scene_len / 10) + 1 |
| | for index in range(scene_len): |
| | if index % interval == 0: |
| | f = self.stream.read() |
| | frames.append(Image.fromarray(f)) |
| | time_stamps.append(self.stream.position.get_seconds()) |
| | else: |
| | self.stream.read(decode=False) |
| | self.stream.seek(0) |
| | return frames, time_stamps |
| |
|
| | def get_audio_clip( |
| | self, scene: Union[int, Scene, Tuple[FrameTimecode]] |
| | ) -> AudioSegment: |
| | """Get the audio clip of a scene. |
| | |
| | Args: |
| | scene (Union[int, Scene, Tuple[FrameTimecode]]): The scene to get audio clip. Can be the index of the scene, the scene object or a tuple of start and end frame timecode. |
| | |
| | Raises: |
| | ValueError: If the type of scene is not int, Scene or tuple. |
| | |
| | Returns: |
| | AudioSegment: The audio clip of the scene. |
| | """ |
| | if self.audio is None: |
| | return None |
| | if isinstance(scene, int): |
| | scene = self.scenes[scene] |
| | start, end = scene.start, scene.end |
| | elif isinstance(scene, Scene): |
| | start, end = scene.start, scene.end |
| | elif isinstance(scene, tuple): |
| | start, end = scene |
| | else: |
| | raise ValueError( |
| | f"scene should be int, Scene or tuple, not {type(scene).__name__}" |
| | ) |
| |
|
| | return self.audio[ |
| | int(start.get_seconds() * 1000) : int(end.get_seconds() * 1000) |
| | ] |
| |
|
| | def __len__(self): |
| | return len(self.scenes) |
| |
|
| | def __iter__(self): |
| | self.index = 0 |
| | return self |
| |
|
| | def __next__(self): |
| | if self.index >= len(self.scenes): |
| | raise StopIteration |
| | scene = self.scenes[self.index] |
| | self.index += 1 |
| | return scene |
| |
|
| | def __getitem__(self, index): |
| | return self.scenes[index] |
| |
|
| | def __setitem__(self, index, value): |
| | self.scenes[index] = value |
| |
|
| | def to_serializable(self) -> dict: |
| | """Convert VideoScenes to a serializable dictionary.""" |
| | scenes_data = [] |
| | for scene in self.scenes: |
| | scenes_data.append( |
| | { |
| | "start_frame": scene.start.frame_num, |
| | "end_frame": scene.end.frame_num, |
| | "stt_res": scene.stt_res, |
| | "summary": scene.summary, |
| | } |
| | ) |
| |
|
| | return { |
| | "video_path": self.stream.path, |
| | "frame_rate": self.stream.frame_rate, |
| | "scenes": scenes_data, |
| | "frame_extraction_interval": self.frame_extraction_interval, |
| | } |
| |
|
| | @classmethod |
| | def from_serializable(cls, data: dict): |
| | """Rebuild VideoScenes from serialized data.""" |
| | video = open_video(data["video_path"]) |
| | try: |
| | audio = AudioSegment.from_file(data["video_path"]) |
| | audio = normalize(audio) |
| | except Exception: |
| | audio = None |
| |
|
| | |
| | scenes = [] |
| | for scene_data in data["scenes"]: |
| | start = FrameTimecode(scene_data["start_frame"], data["frame_rate"]) |
| | end = FrameTimecode(scene_data["end_frame"], data["frame_rate"]) |
| | scene = Scene.init(start, end) |
| | scene.stt_res = scene_data["stt_res"] |
| | scene.summary = scene_data["summary"] |
| | scenes.append(scene) |
| |
|
| | return cls( |
| | stream=video, |
| | scenes=scenes, |
| | audio=audio, |
| | frame_extraction_interval=data["frame_extraction_interval"], |
| | ) |
| |
|