# Source path: Tools/src/asset_manager/asset_processor.py
# Last commit f20025d (jebin2): "refactor: Centralize logger import to src.logger_config across various modules."
"""
AssetProcessor - Handles video selection and processing using AI
"""
import json
import re
import random
from typing import List, Dict, Optional, Tuple
import pandas as pd
import json_repair
from moviepy.editor import VideoFileClip
from google_src import ai_studio_sdk
from src.logger_config import logger
from src.config import get_config_value
from .video_lib import get_video_lib
class AssetProcessor:
    """
    Handles video selection and processing using AI (Gemini).

    Usage:
        processor = AssetProcessor()
        videos = await processor.select_videos(tts_script, timed_transcript)
    """

    # Column of the video library DataFrame that holds each video's URL;
    # it is the key used to match AI answers back to library rows.
    _URL_COLUMN = "Video URL (No Audio)"

    def __init__(self):
        self._video_lib = get_video_lib()

    def _parse_duration(self, duration_str: str) -> int:
        """Parse duration from various string formats to integer seconds.

        The first number found in the text is used and truncated toward
        zero (e.g. ``"1.9 sec"`` -> ``1``). Missing, empty or unparseable
        values yield ``0``.
        """
        try:
            if pd.isna(duration_str) or duration_str == "":
                return 0
            duration_str = str(duration_str).lower().strip()
            numbers = re.findall(r"(\d+\.?\d*)", duration_str)
            if numbers:
                return int(float(numbers[0]))
            return 0
        except (ValueError, TypeError) as e:
            logger.warning(f"Failed to parse duration '{duration_str}': {e}")
            return 0

    def _find_video_by_url(self, url) -> Optional[pd.Series]:
        """Return the first library row whose URL column equals ``url``.

        Returns ``None`` when no row matches, so callers can decide how to
        handle a URL the model made up instead of hitting ``IndexError``.
        """
        library = self._video_lib.video_library
        matches = library[library[self._URL_COLUMN] == url]
        if matches.empty:
            return None
        return matches.iloc[0]

    async def select_videos(self, tts_script: str, timed_transcript, max_duration: int = 12) -> List[Dict]:
        """Select videos using AI analysis of TTS script.

        Args:
            tts_script: Script text handed to the model.
            timed_transcript: Forwarded to ``_analyze_with_gemini``.
            max_duration: Accepted for backward compatibility; it is not
                enforced anywhere in this method.

        Returns:
            The list of selection dicts, with string durations parsed to
            integer seconds and missing durations replaced by ``0``.

        Raises:
            Exception: If the AI returns no usable selection, or any
                underlying step fails (the error is logged and re-raised).
        """
        try:
            logger.debug(f"🤖 AI video selection for script: {tts_script[:300]}...")
            selected_videos = await self._analyze_with_gemini(
                tts_script=tts_script,
                timed_transcript=timed_transcript,
            )
            if not selected_videos:
                raise Exception("⚠️ AI selection failed")

            for video in selected_videos:
                duration = video.get("duration")
                if isinstance(duration, str):
                    video["duration"] = self._parse_duration(duration)
                elif pd.api.types.is_scalar(duration) and pd.isna(duration):
                    # None/NaN would make int() below raise; treat as unknown.
                    video["duration"] = 0

            total_duration = sum(int(v.get("duration", 0)) for v in selected_videos)
            logger.debug(f"✓ Selected {len(selected_videos)} videos, total: {total_duration}s")
            return selected_videos
        except Exception as e:
            logger.error(f"❌ Video selection failed: {e}")
            raise

    async def _analyze_with_gemini(self, tts_script: str, timed_transcript) -> List[Dict]:
        """Use Gemini API for contextual video selection.

        Builds a prompt from the system prompt file, the TTS script and the
        video context, sends it to ``ai_studio_sdk.generate`` and maps each
        returned item back to a library row by URL.

        ``timed_transcript`` is accepted but not used by this method.

        Items whose ``video_index`` is out of range or whose ``video_url``
        is not in the library are skipped with a warning. An alternate
        video that cannot be resolved leaves ``alternate_url`` as ``None``.

        Raises:
            ValueError: If the model response does not decode to a list.
            Exception: Any other failure is logged and re-raised.
        """
        # Bound up front so the error handlers can always reference it.
        response_text = ""
        try:
            video_context = await self.prepare_video_context()
            with open("src/prompt/best_matches_two_video_tracking.md", "r", encoding="utf-8") as file:
                system_prompt = file.read()
            model_input = f"""SYSTEM INSTRUCTION::
{system_prompt}
USER PROMPT:
TTS Script: {tts_script}
Video Options: {video_context}
"""
            response = ai_studio_sdk.generate(model_input)
            response_text = (response or "").strip()
            selection = json_repair.loads(response_text)
            if not isinstance(selection, list):
                raise ValueError(
                    f"Gemini response is not a JSON list: {response_text[:200]!r}"
                )

            library_size = len(self._video_lib.video_library)
            selected = []
            for item in selection:
                if not isinstance(item, dict):
                    logger.warning(f"⚠️ Skipping malformed selection entry: {item!r}")
                    continue

                video_index = item["video_index"]
                if not video_index < library_size:
                    logger.warning(f"⚠️ Skipping out-of-range video_index: {video_index}")
                    continue

                video = self._find_video_by_url(item["video_url"])
                if video is None:
                    logger.warning(f"⚠️ Skipping unknown video_url: {item['video_url']}")
                    continue

                entry = {
                    "url": video.get(self._URL_COLUMN, video.get("url", "")),
                    "alternate_url": None,
                    "alternate_url_local_path": None,
                    "video_summary": video.get("Full Video Description Summary"),
                    "tts_script_segment": item["tts_script_segment"],
                    "duration": video.get("duration", 0),
                    "reason": item["reason"],
                    "alignment": video.get("Video Alignment with the TTS Script", video.get("alignment", "")),
                    "energy": video.get("energy_score", 0),
                }

                if "alternate_video_index" in item:
                    # The lookup key is the URL, which may be absent even when
                    # the index is present; the alternate is optional.
                    alternate = self._find_video_by_url(item.get("alternate_video_url"))
                    if alternate is not None:
                        entry["alternate_url"] = alternate.get(self._URL_COLUMN, alternate.get("url", ""))
                    else:
                        logger.warning(
                            f"⚠️ Alternate video not found: {item.get('alternate_video_url')}"
                        )

                selected.append(entry)

            logger.debug(f"✓ Gemini selected {len(selected)}")
            return selected
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Gemini JSON response: {e}")
            logger.debug(f"Raw response: {response_text[:500]}")
            raise
        except Exception as e:
            logger.error(f"Gemini analysis failed: {e}")
            import traceback
            traceback.print_exc()
            raise

    async def prepare_video_context(self) -> str:
        """Prepare video context for AI analysis by reading actual durations.

        Each configured video's ``duration`` is overwritten with the length
        read from its local file (``0`` when there is no local file or it
        cannot be read). One numbered line per library row is then built,
        containing URL, summary, duration, alignment and usage count.
        """
        visual_assets = get_config_value("visual_assets") or {}
        all_videos = visual_assets.get("all_videos", [])

        # Update durations using actual local files
        for video in all_videos:
            local_path = video.get("local_path")
            if local_path:
                try:
                    with VideoFileClip(local_path) as clip:
                        video["duration"] = round(clip.duration, 2)
                except Exception as e:
                    logger.warning(f"⚠️ Error reading duration for {local_path}: {e}")
                    video["duration"] = 0
            else:
                video["duration"] = 0

        # Index durations by URL once instead of rescanning the list per row.
        # setdefault keeps the first entry for a URL, like the former scan.
        duration_by_url = {}
        for video in all_videos:
            if "url" in video:
                duration_by_url.setdefault(video["url"], video.get("duration", 0))

        usage_counts = get_config_value("video_usage_count") or {}

        # Form video_context string (using actual durations)
        lines = []
        for i, row in self._video_lib.video_library.iterrows():
            url = row.get(self._URL_COLUMN)
            summary = row.get("Full Video Description Summary", row.get("description", ""))
            alignment = row.get("Video Alignment with the TTS Script", row.get("alignment", ""))
            lines.append(
                f"{i+1}. {url} - "
                f"{summary} - "
                f"{duration_by_url.get(url, 0)}s - "
                f"Alignment: {alignment} - "
                f"Usage Count: {usage_counts.get(url, 0)}"
            )
        return "\n".join(lines)

    def select_random_videos(self, count: int) -> List[str]:
        """Select random videos from downloaded library.

        Only videos that have a ``local_path`` are eligible. When fewer
        than ``count`` are available, all of them are returned (in random
        order) and a warning is logged.

        Returns:
            Local file paths of the chosen videos.
        """
        all_videos = get_config_value("visual_assets").get("all_videos", [])
        available_videos = [v for v in all_videos if v.get("local_path")]
        if len(available_videos) < count:
            logger.warning(f"⚠️ Not enough videos to select {count} random videos. Selecting {len(available_videos)} instead.")
            count = len(available_videos)
        selected_videos = random.sample(available_videos, count)
        return [v["local_path"] for v in selected_videos]