| | import hashlib |
| | import pickle |
| | import time |
| | from pathlib import Path |
| | from typing import List, Optional, Union |
| |
|
| | import json_repair |
| | from omagent_core.engine.worker.base import BaseWorker |
| | from omagent_core.models.asr.stt import STT |
| | |
| | from omagent_core.models.encoders.azure_encoder import AzureTextEmbeddingV3 |
| | from omagent_core.models.llms.base import BaseLLMBackend |
| | from omagent_core.models.llms.prompt import PromptTemplate |
| | from omagent_core.utils.registry import registry |
| | from pydantic import Field, field_validator |
| | from pydub import AudioSegment |
| | from pydub.effects import normalize |
| | from scenedetect import open_video |
| |
|
| | from ..misc.scene import VideoScenes |
| |
|
# Directory containing this module; prompt files are resolved relative to it.
CURRENT_PATH = Path(__file__).parent
root_path = CURRENT_PATH  # legacy alias kept for compatibility
| |
|
| |
|
@registry.register_worker()
class WebpageVideoPreprocessor(BaseLLMBackend, BaseWorker):
    """Worker that preprocesses a video for downstream retrieval/QA.

    Pipeline: split the video into scenes, transcribe each scene's audio,
    summarize each scene with a multimodal LLM, embed the summary text, and
    store the embeddings in long-term memory (``self.ltm``).  Scene lists are
    pickled to ``cache_dir`` so an interrupted run can be resumed.
    """

    prompts: List[PromptTemplate] = Field(
        default=[
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
            ),
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
            ),
        ]
    )
    # Embedding model used to vectorize per-scene summaries for LTM storage.
    text_encoder: AzureTextEmbeddingV3

    # Speech-to-text backend (validated/coerced by `validate_asr` below).
    stt: STT
    # PySceneDetect content-detector threshold; higher = fewer scene cuts.
    scene_detect_threshold: Union[float, int] = 27
    min_scene_len: int = 1
    # Sample one frame every N seconds within a scene for the LLM.
    frame_extraction_interval: int = 5
    kernel_size: Optional[int] = None
    show_progress: bool = True

    use_cache: bool = True
    cache_dir: str = "./video_cache"

    @field_validator("stt", mode="before")
    @classmethod
    def validate_asr(cls, stt):
        """Coerce a dict config into an STT instance; pass STT through.

        Raises:
            ValueError: if ``stt`` is neither an ``STT`` nor a dict.
        """
        if isinstance(stt, STT):
            return stt
        elif isinstance(stt, dict):
            return STT(**stt)
        else:
            raise ValueError("Invalid STT type.")

    def calculate_md5(self, file_path):
        """Return the hex MD5 digest of ``file_path``, read in 4 KiB chunks."""
        md5_hash = hashlib.md5()
        with open(file_path, "rb") as file:
            for byte_block in iter(lambda: file.read(4096), b""):
                md5_hash.update(byte_block)
        return md5_hash.hexdigest()

    def _summarize_scene(self, scene, video_frames, index, video_md5):
        """Summarize one scene with the LLM and index it in long-term memory.

        Sets ``scene.summary`` to the parsed JSON dict returned by the model
        and writes a ``{value, embedding}`` record to ``self.ltm[index]``.
        Propagates any exception; the caller decides the fallback summary.

        Args:
            scene: scene object (provides ``conversation``/``start``/``end``).
            video_frames: frames sampled from the scene, passed to the LLM.
            index: scene index, used as the LTM key.
            video_md5: MD5 of the source video, stored with the LTM record.
        """
        chat_complete_res = self.infer(
            input_list=[
                {
                    "stt_res": scene.conversation,
                    "img_placeholders": "".join(
                        f"<image_{i}>" for i in range(len(video_frames))
                    ),
                }
            ],
            images=video_frames,
        )
        # The model replies with JSON text; json_repair.loads tolerates minor
        # formatting glitches and yields a dict. (Previously the raw string was
        # stored and `.get()` on it raised AttributeError, so every summary
        # silently fell back to the default.)
        scene.summary = json_repair.loads(
            chat_complete_res[0]["choices"][0]["message"]["content"]
        )
        scene_info = scene.summary.get("scene", [])
        events = scene.summary.get("events", [])
        start_time = scene.start.get_seconds()
        end_time = scene.end.get_seconds()
        # NOTE(review): the stray apostrophe in "Character'" is preserved so
        # newly generated embeddings stay consistent with existing ones.
        content = (
            f"Time in video: {scene.summary.get('time', 'null')}\n"
            f"Location: {scene.summary.get('location', 'null')}\n"
            f"Character': {scene.summary.get('character', 'null')}\n"
            f"Events: {events}\n"
            f"Scene: {scene_info}\n"
            f"Summary: {scene.summary.get('summary', '')}"
        )
        content_vector = self.text_encoder.infer([content])[0]
        self.ltm[index] = {
            "value": {
                "video_md5": video_md5,
                "content": content,
                "start_time": start_time,
                "end_time": end_time,
            },
            "embedding": content_vector,
        }

    def _run(self, video_path: str, *args, **kwargs):
        """
        Process video files by:
        1. Calculating MD5 hash of input video for caching
        2. Loading video from cache if available and use_cache=True
        3. Otherwise, processing video by:
            - Extracting audio and video streams
            - Detecting scene boundaries
            - Extracting frames at specified intervals
            - Generating scene summaries using LLM
            - Caching results for future use

        Args:
            video_path (str): Path to input video file
            *args: Variable length argument list
            **kwargs: Arbitrary keyword arguments

        Returns:
            dict: Dictionary containing video_md5, video_path and instance_id
        """
        video_md5 = self.calculate_md5(video_path)
        kwargs["video_md5"] = video_md5

        # NOTE(review): the cache is keyed by file NAME, not by the MD5
        # computed above — two different videos with the same basename would
        # collide. Consider keying on video_md5; left unchanged here to avoid
        # orphaning existing caches.
        cache_path = (
            Path(self.cache_dir)
            .joinpath(Path(video_path).name.replace("/", "-"))
            .joinpath("video_cache.pkl")
        )

        if self.use_cache and cache_path.exists():
            # --- Resume path: rebuild VideoScenes around the pickled scene
            # list and re-run the LLM only for scenes whose summary is None
            # (i.e. scenes that failed or were interrupted last time).
            with open(cache_path, "rb") as f:
                loaded_scene = pickle.load(f)
            try:
                audio = AudioSegment.from_file(video_path)
                audio = normalize(audio)
            except Exception:
                # Videos without a usable audio track are still processable.
                audio = None
            video = VideoScenes(
                stream=open_video(video_path),
                audio=audio,
                scenes=loaded_scene,
                frame_extraction_interval=self.frame_extraction_interval,
            )
            self.callback.send_block(
                agent_id=self.workflow_instance_id,
                msg="Loaded video scenes from cache.\nResume the interrupted transfer for results with scene.summary of None.",
            )
            for index, scene in enumerate(video.scenes):
                if scene.summary is None:
                    self.callback.send_block(
                        agent_id=self.workflow_instance_id,
                        msg=f"Resume the interrupted transfer for scene {index}.",
                    )
                    video_frames, _ = video.get_video_frames(scene)
                    try:
                        self._summarize_scene(scene, video_frames, index, video_md5)
                    except Exception as e:
                        self.callback.error(
                            f"Failed to resume scene {index}: {e}. Set to default."
                        )
                        # Empty-but-valid summary so the scene is not retried
                        # forever on subsequent resumes.
                        scene.summary = {
                            "time": "",
                            "location": "",
                            "character": "",
                            "events": [],
                            "scene": [],
                            "summary": "",
                        }
            self.stm(self.workflow_instance_id)["video"] = video.to_serializable()
            # Re-persist so recovered summaries survive the next interruption.
            with open(cache_path, "wb") as f:
                pickle.dump(video.scenes, f)
        else:
            # --- Fresh path: detect scenes, transcribe, summarize.
            # (Previously guarded by `if not cache_path.exists():`, which left
            # `video` unprocessed and STM unset when use_cache=False but a
            # stale cache file existed.)
            video = VideoScenes.load(
                video_path=video_path,
                threshold=self.scene_detect_threshold,
                min_scene_len=self.min_scene_len,
                frame_extraction_interval=self.frame_extraction_interval,
                show_progress=self.show_progress,
                kernel_size=self.kernel_size,
            )
            self.stm(self.workflow_instance_id)["video"] = video.to_serializable()

            for index, scene in enumerate(video.scenes):
                print(f"Processing scene {index} / {len(video.scenes)}...")
                audio_clip = video.get_audio_clip(scene)
                if audio_clip is None:
                    scene.stt_res = {"text": ""}
                else:
                    scene.stt_res = self.stt.infer(audio_clip)
                video_frames, _ = video.get_video_frames(scene)
                try:
                    # Optional enhancement: annotate frames with recognized
                    # faces before sending them to the LLM. Best-effort — the
                    # tool may not be registered.
                    face_rec = registry.get_tool("FaceRecognition")
                    for frame in video_frames:
                        objs = face_rec.infer(frame)
                        face_rec.visual_prompting(frame, objs)
                except Exception:
                    pass
                try:
                    self._summarize_scene(scene, video_frames, index, video_md5)
                except Exception as e:
                    self.callback.error(f"Failed to process scene {index}: {e}")
                    # None marks the scene for retry on the next (cached) run.
                    scene.summary = None

            if self.use_cache and not cache_path.exists():
                cache_path.parent.mkdir(parents=True, exist_ok=True)
                with open(cache_path, "wb") as f:
                    pickle.dump(video.scenes, f)
        return {
            "video_md5": video_md5,
            "video_path": video_path,
            "instance_id": self.workflow_instance_id,
        }
| |
|