""" AssetProcessor - Handles video selection and processing using AI """ import json import re import random from typing import List, Dict, Optional, Tuple import pandas as pd import json_repair from moviepy.editor import VideoFileClip from google_src import ai_studio_sdk from src.logger_config import logger from src.config import get_config_value from .video_lib import get_video_lib class AssetProcessor: """ Handles video selection and processing using AI (Gemini). Usage: processor = AssetProcessor() videos = await processor.select_videos(tts_script, timed_transcript) """ def __init__(self): self._video_lib = get_video_lib() def _parse_duration(self, duration_str: str) -> int: """Parse duration from various string formats to integer seconds""" try: if pd.isna(duration_str) or duration_str == "": return 0 duration_str = str(duration_str).lower().strip() numbers = re.findall(r"(\d+\.?\d*)", duration_str) if numbers: return int(float(numbers[0])) return 0 except (ValueError, TypeError) as e: logger.warning(f"Failed to parse duration '{duration_str}': {e}") return 0 async def select_videos(self, tts_script: str, timed_transcript, max_duration: int = 12) -> List[Dict]: """Select videos using AI analysis of TTS script""" try: logger.debug(f"🤖 AI video selection for script: {tts_script[:300]}...") selected_videos = await self._analyze_with_gemini( tts_script=tts_script, timed_transcript=timed_transcript ) if not selected_videos: raise Exception("⚠️ AI selection failed") for video in selected_videos: if isinstance(video.get("duration"), str): video["duration"] = self._parse_duration(video["duration"]) total_duration = sum(int(v.get("duration", 0)) for v in selected_videos) logger.debug(f"✓ Selected {len(selected_videos)} videos, total: {total_duration}s") return selected_videos except Exception as e: logger.error(f"❌ Video selection failed: {e}") raise async def _analyze_with_gemini(self, tts_script: str, timed_transcript) -> List[Dict]: """Use Gemini API for contextual video selection""" try: video_context = await self.prepare_video_context() with open("src/prompt/best_matches_two_video_tracking.md", "r", encoding="utf-8") as file: system_prompt = file.read() model_input = f"""SYSTEM INSTRUCTION:: {system_prompt} USER PROMPT: TTS Script: {tts_script} Video Options: {video_context} """ response = ai_studio_sdk.generate(model_input) response_text = response.strip() selection = json_repair.loads(response_text) selected = [] for item in selection: video_index = item["video_index"] if video_index < len(self._video_lib.video_library): video_row = self._video_lib.video_library[self._video_lib.video_library["Video URL (No Audio)"] == item["video_url"]] video = video_row.iloc[0] selected.append( { "url": video.get("Video URL (No Audio)", video.get("url", "")), "alternate_url": None, "alternate_url_local_path": None, "video_summary": video.get("Full Video Description Summary"), "tts_script_segment": item["tts_script_segment"], "duration": video.get("duration", 0), "reason": item["reason"], "alignment": video.get("Video Alignment with the TTS Script", video.get("alignment", "")), "energy": video.get("energy_score", 0), } ) if "alternate_video_index" in item: video_row = self._video_lib.video_library[self._video_lib.video_library["Video URL (No Audio)"] == item["alternate_video_url"]] video = video_row.iloc[0] selected[-1]["alternate_url"] = video.get("Video URL (No Audio)", video.get("url", "")) logger.debug(f"✓ Gemini selected {len(selected)}") return selected except json.JSONDecodeError as e: logger.error(f"Failed to parse Gemini JSON response: {e}") logger.debug(f"Raw response: {response_text[:500]}") raise except Exception as e: logger.error(f"Gemini analysis failed: {e}") import traceback traceback.print_exc() raise async def prepare_video_context(self) -> str: """Prepare video context for AI analysis by reading actual durations""" # Update durations using actual local files for video in get_config_value("visual_assets")["all_videos"]: local_path = video.get("local_path") if local_path: try: with VideoFileClip(local_path) as clip: video["duration"] = round(clip.duration, 2) except Exception as e: logger.warning(f"⚠️ Error reading duration for {local_path}: {e}") video["duration"] = 0 else: video["duration"] = 0 # Form video_context string (using actual durations) video_context = "\n".join( [ f"{i+1}. {row.get('Video URL (No Audio)')} - " f"{row.get('Full Video Description Summary', row.get('description', ''))} - " f"{next((v.get('duration', 0) for v in get_config_value('visual_assets')['all_videos'] if v['url'] == row.get('Video URL (No Audio)')), 0)}s - " f"Alignment: {row.get('Video Alignment with the TTS Script', row.get('alignment', ''))} - " f"Usage Count: {get_config_value('video_usage_count').get(row.get('Video URL (No Audio)'), 0)}" for i, row in self._video_lib.video_library.iterrows() ] ) return video_context def select_random_videos(self, count: int) -> List[str]: """Select random videos from downloaded library""" all_videos = get_config_value("visual_assets").get("all_videos", []) available_videos = [v for v in all_videos if v.get("local_path")] if len(available_videos) < count: logger.warning(f"⚠️ Not enough videos to select {count} random videos. Selecting {len(available_videos)} instead.") count = len(available_videos) selected_videos = random.sample(available_videos, count) return [v["local_path"] for v in selected_videos]