|
|
""" |
|
|
AssetProcessor - Handles video selection and processing using AI |
|
|
""" |
|
|
|
|
|
import json |
|
|
import re |
|
|
import random |
|
|
from typing import List, Dict, Optional, Tuple |
|
|
|
|
|
import pandas as pd |
|
|
import json_repair |
|
|
from moviepy.editor import VideoFileClip |
|
|
from google_src import ai_studio_sdk |
|
|
from src.logger_config import logger |
|
|
from src.config import get_config_value |
|
|
from .video_lib import get_video_lib |
|
|
|
|
|
|
|
|
class AssetProcessor:
    """
    Handles video selection and processing using AI (Gemini).

    Usage:
        processor = AssetProcessor()
        videos = await processor.select_videos(tts_script, timed_transcript)
    """

    # Column of the video library DataFrame that holds each clip's URL; it is
    # also the key used to match the model's answer back to a library row.
    _URL_COLUMN = "Video URL (No Audio)"

    # Prompt template that is prepended to every model request.
    _PROMPT_PATH = "src/prompt/best_matches_two_video_tracking.md"

    # "MM:SS", "HH:MM:SS", optionally with fractional seconds.
    _CLOCK_RE = re.compile(r"^(?:(\d+):)?(\d+):(\d+(?:\.\d+)?)$")
    # First integer or decimal number anywhere in the text.
    _NUMBER_RE = re.compile(r"(\d+\.?\d*)")

    def __init__(self):
        self._video_lib = get_video_lib()

    def _parse_duration(self, duration_str: str) -> int:
        """Parse a duration given in various string formats to whole seconds.

        Supported inputs:
            * clock notation ``"MM:SS"`` / ``"HH:MM:SS"`` (fractional seconds
              allowed), e.g. ``"1:30"`` -> 90;
            * any text containing a number, e.g. ``"12s"`` or ``"3.7 sec"``;
              the first number found is truncated to an int.

        Returns:
            The duration in seconds, or 0 for empty/missing/unparseable input.
        """
        try:
            if pd.isna(duration_str) or duration_str == "":
                return 0

            text = str(duration_str).lower().strip()

            # Clock notation must be handled before the generic "first number"
            # rule, otherwise "1:30" would be read as 1 second.
            clock = self._CLOCK_RE.match(text)
            if clock:
                hours, minutes, seconds = clock.groups()
                return int(int(hours or 0) * 3600 + int(minutes) * 60 + float(seconds))

            numbers = self._NUMBER_RE.findall(text)
            return int(float(numbers[0])) if numbers else 0
        except (ValueError, TypeError) as e:
            logger.warning(f"Failed to parse duration '{duration_str}': {e}")
            return 0

    @staticmethod
    def _whole_seconds(value) -> int:
        """Convert a duration value to int seconds, treating None/NaN/garbage as 0."""
        try:
            if value is None or pd.isna(value):
                return 0
            return int(value)
        except (TypeError, ValueError):
            return 0

    async def select_videos(self, tts_script: str, timed_transcript, max_duration: int = 12) -> List[Dict]:
        """Select videos using AI analysis of the TTS script.

        Args:
            tts_script: Script text sent to the model.
            timed_transcript: Forwarded to ``_analyze_with_gemini``.
            max_duration: Accepted for interface compatibility; this method
                does not currently use it.

        Returns:
            A non-empty list of selection dicts (see ``_analyze_with_gemini``);
            string durations are converted to int seconds.

        Raises:
            RuntimeError: If the model produced no usable selection.
            Exception: Any error from the analysis step is logged and re-raised.
        """
        try:
            logger.debug(f"🤖 AI video selection for script: {tts_script[:300]}...")

            selected_videos = await self._analyze_with_gemini(
                tts_script=tts_script,
                timed_transcript=timed_transcript,
            )

            if not selected_videos:
                raise RuntimeError("⚠️ AI selection failed")

            for video in selected_videos:
                if isinstance(video.get("duration"), str):
                    video["duration"] = self._parse_duration(video["duration"])

            # The total is only used for logging, so a missing/NaN duration
            # must not abort an otherwise valid selection.
            total_duration = sum(self._whole_seconds(v.get("duration")) for v in selected_videos)
            logger.debug(f"✓ Selected {len(selected_videos)} videos, total: {total_duration}s")

            return selected_videos

        except Exception as e:
            logger.error(f"❌ Video selection failed: {e}")
            raise

    @staticmethod
    def _coerce_selection(parsed) -> List[Dict]:
        """Validate the decoded model answer and return it as a list of items.

        ``json_repair`` is lenient and can hand back something other than a
        list; a single object is wrapped, anything else is rejected.
        """
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            raise ValueError(
                f"Gemini response is not a JSON list of selections (got {type(parsed).__name__})"
            )
        return parsed

    def _find_video_row(self, url):
        """Return the first library row whose URL column equals ``url``, or None."""
        library = self._video_lib.video_library
        matches = library[library[self._URL_COLUMN] == url]
        return None if matches.empty else matches.iloc[0]

    def _resolve_selection(self, selection: List[Dict]) -> List[Dict]:
        """Turn the model's selection items into selection dicts backed by library rows."""
        library_size = len(self._video_lib.video_library)
        selected = []

        for item in selection:
            if not isinstance(item, dict):
                logger.warning(f"⚠️ Skipping malformed selection item: {item!r}")
                continue

            try:
                video_index = int(item["video_index"])
            except (TypeError, ValueError):
                logger.warning(f"⚠️ Skipping selection with invalid video_index: {item.get('video_index')!r}")
                continue

            # prepare_video_context numbers the options starting at 1, so the
            # last option carries index == library_size and must be accepted.
            if video_index > library_size:
                continue

            video = self._find_video_row(item["video_url"])
            if video is None:
                # The model returned a URL that is not in the library.
                logger.warning(f"⚠️ Skipping selection with unknown video URL: {item['video_url']}")
                continue

            entry = {
                "url": video.get(self._URL_COLUMN, video.get("url", "")),
                "alternate_url": None,
                "alternate_url_local_path": None,
                "video_summary": video.get("Full Video Description Summary"),
                "tts_script_segment": item["tts_script_segment"],
                "duration": video.get("duration", 0),
                "reason": item["reason"],
                "alignment": video.get("Video Alignment with the TTS Script", video.get("alignment", "")),
                "energy": video.get("energy_score", 0),
            }

            if "alternate_video_index" in item:
                alternate = self._find_video_row(item.get("alternate_video_url"))
                if alternate is None:
                    # Keep the primary pick; just leave the alternate unset.
                    logger.warning(f"⚠️ Alternate video not found: {item.get('alternate_video_url')}")
                else:
                    entry["alternate_url"] = alternate.get(self._URL_COLUMN, alternate.get("url", ""))

            selected.append(entry)

        return selected

    async def _analyze_with_gemini(self, tts_script: str, timed_transcript) -> List[Dict]:
        """Use the Gemini API for contextual video selection.

        Builds a prompt from the prompt template, the TTS script and the video
        context, sends it through ``ai_studio_sdk.generate`` and maps the
        decoded answer back onto library rows.

        NOTE(review): ``timed_transcript`` is accepted but not included in the
        model input — confirm whether that is intended.

        Returns:
            A list of dicts with the keys ``url``, ``alternate_url``,
            ``alternate_url_local_path``, ``video_summary``,
            ``tts_script_segment``, ``duration``, ``reason``, ``alignment``
            and ``energy``. Items that cannot be matched to a library row are
            skipped with a warning.

        Raises:
            ValueError: If the decoded answer is not a list of selections.
            Exception: Any other failure is logged and re-raised.
        """
        # Bound up front so the error handlers can always log it.
        response_text = ""
        try:
            video_context = await self.prepare_video_context()
            with open(self._PROMPT_PATH, "r", encoding="utf-8") as file:
                system_prompt = file.read()

            model_input = (
                "SYSTEM INSTRUCTION::\n"
                f"{system_prompt}\n"
                "\n"
                "\n"
                "USER PROMPT:\n"
                f"TTS Script: {tts_script}\n"
                f"Video Options: {video_context}\n"
            )
            response = ai_studio_sdk.generate(model_input)

            response_text = (response or "").strip()

            selection = self._coerce_selection(json_repair.loads(response_text))
            selected = self._resolve_selection(selection)

            logger.debug(f"✓ Gemini selected {len(selected)}")
            return selected

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Gemini JSON response: {e}")
            logger.debug(f"Raw response: {response_text[:500]}")
            raise
        except Exception as e:
            logger.error(f"Gemini analysis failed: {e}")
            import traceback

            traceback.print_exc()
            raise

    def _probe_duration(self, local_path):
        """Read a clip's duration (seconds, 2 decimals) from disk; 0 if unavailable."""
        if not local_path:
            return 0
        try:
            with VideoFileClip(local_path) as clip:
                return round(clip.duration, 2)
        except Exception as e:
            logger.warning(f"⚠️ Error reading duration for {local_path}: {e}")
            return 0

    async def prepare_video_context(self) -> str:
        """Prepare video context for AI analysis by reading actual durations.

        Side effect: writes a ``duration`` key into every entry of
        ``visual_assets.all_videos`` obtained from the config.

        Returns:
            One line per library row, numbered from 1, in the form
            ``"<n>. <url> - <summary> - <duration>s - Alignment: <...> - Usage Count: <n>"``.
        """
        # Fetch the config values once: the list is mutated below and read
        # again when the lines are built, so both steps must see the same
        # objects.
        all_videos = get_config_value("visual_assets")["all_videos"]
        for video in all_videos:
            video["duration"] = self._probe_duration(video.get("local_path"))

        # First entry wins for duplicate URLs.
        duration_by_url = {}
        for video in all_videos:
            duration_by_url.setdefault(video.get("url"), video.get("duration", 0))

        usage_counts = get_config_value("video_usage_count") or {}

        lines = []
        # Number by position rather than by index label, so the numbering
        # stays 1..N even if the DataFrame index is not a default range.
        for position, (_, row) in enumerate(self._video_lib.video_library.iterrows(), start=1):
            url = row.get(self._URL_COLUMN)
            summary = row.get("Full Video Description Summary", row.get("description", ""))
            alignment = row.get("Video Alignment with the TTS Script", row.get("alignment", ""))
            lines.append(
                f"{position}. {url} - "
                f"{summary} - "
                f"{duration_by_url.get(url, 0)}s - "
                f"Alignment: {alignment} - "
                f"Usage Count: {usage_counts.get(url, 0)}"
            )

        return "\n".join(lines)

    def select_random_videos(self, count: int) -> List[str]:
        """Select random videos from the downloaded library.

        Args:
            count: Number of videos wanted; capped at the number of videos
                that have a ``local_path``. Zero or negative yields ``[]``.

        Returns:
            Local file paths of the randomly chosen videos.
        """
        all_videos = get_config_value("visual_assets").get("all_videos", [])
        available_paths = [v["local_path"] for v in all_videos if v.get("local_path")]

        if len(available_paths) < count:
            logger.warning(f"⚠️ Not enough videos to select {count} random videos. Selecting {len(available_paths)} instead.")
            count = len(available_paths)

        if count <= 0:
            return []

        return random.sample(available_paths, count)
|
|
|