File size: 7,098 Bytes
ea33c8c 3a1dbcc f20025d 9d54dfd ea33c8c a4aa882 ea33c8c a4aa882 ea33c8c 503d4ac ea33c8c 503d4ac ea33c8c 3a1dbcc ea33c8c 9d1dc2c ea33c8c 9d1dc2c ea33c8c 503d4ac ea33c8c a4aa882 ea33c8c a4aa882 ea33c8c a4aa882 9d1dc2c ea33c8c a4aa882 ea33c8c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
"""
AssetProcessor - Handles video selection and processing using AI
"""
import json
import re
import random
from typing import List, Dict, Optional, Tuple
import pandas as pd
import json_repair
from moviepy.editor import VideoFileClip
from google_src import ai_studio_sdk
from src.logger_config import logger
from src.config import get_config_value
from .video_lib import get_video_lib
class AssetProcessor:
"""
Handles video selection and processing using AI (Gemini).
Usage:
processor = AssetProcessor()
videos = await processor.select_videos(tts_script, timed_transcript)
"""
def __init__(self):
self._video_lib = get_video_lib()
def _parse_duration(self, duration_str: str) -> int:
"""Parse duration from various string formats to integer seconds"""
try:
if pd.isna(duration_str) or duration_str == "":
return 0
duration_str = str(duration_str).lower().strip()
numbers = re.findall(r"(\d+\.?\d*)", duration_str)
if numbers:
return int(float(numbers[0]))
return 0
except (ValueError, TypeError) as e:
logger.warning(f"Failed to parse duration '{duration_str}': {e}")
return 0
async def select_videos(self, tts_script: str, timed_transcript, max_duration: int = 12) -> List[Dict]:
"""Select videos using AI analysis of TTS script"""
try:
logger.debug(f"🤖 AI video selection for script: {tts_script[:300]}...")
selected_videos = await self._analyze_with_gemini(
tts_script=tts_script,
timed_transcript=timed_transcript
)
if not selected_videos:
raise Exception("⚠️ AI selection failed")
for video in selected_videos:
if isinstance(video.get("duration"), str):
video["duration"] = self._parse_duration(video["duration"])
total_duration = sum(int(v.get("duration", 0)) for v in selected_videos)
logger.debug(f"✓ Selected {len(selected_videos)} videos, total: {total_duration}s")
return selected_videos
except Exception as e:
logger.error(f"❌ Video selection failed: {e}")
raise
async def _analyze_with_gemini(self, tts_script: str, timed_transcript) -> List[Dict]:
"""Use Gemini API for contextual video selection"""
try:
video_context = await self.prepare_video_context()
with open("src/prompt/best_matches_two_video_tracking.md", "r", encoding="utf-8") as file:
system_prompt = file.read()
model_input = f"""SYSTEM INSTRUCTION::
{system_prompt}
USER PROMPT:
TTS Script: {tts_script}
Video Options: {video_context}
"""
response = ai_studio_sdk.generate(model_input)
response_text = response.strip()
selection = json_repair.loads(response_text)
selected = []
for item in selection:
video_index = item["video_index"]
if video_index < len(self._video_lib.video_library):
video_row = self._video_lib.video_library[self._video_lib.video_library["Video URL (No Audio)"] == item["video_url"]]
video = video_row.iloc[0]
selected.append(
{
"url": video.get("Video URL (No Audio)", video.get("url", "")),
"alternate_url": None,
"alternate_url_local_path": None,
"video_summary": video.get("Full Video Description Summary"),
"tts_script_segment": item["tts_script_segment"],
"duration": video.get("duration", 0),
"reason": item["reason"],
"alignment": video.get("Video Alignment with the TTS Script", video.get("alignment", "")),
"energy": video.get("energy_score", 0),
}
)
if "alternate_video_index" in item:
video_row = self._video_lib.video_library[self._video_lib.video_library["Video URL (No Audio)"] == item["alternate_video_url"]]
video = video_row.iloc[0]
selected[-1]["alternate_url"] = video.get("Video URL (No Audio)", video.get("url", ""))
logger.debug(f"✓ Gemini selected {len(selected)}")
return selected
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Gemini JSON response: {e}")
logger.debug(f"Raw response: {response_text[:500]}")
raise
except Exception as e:
logger.error(f"Gemini analysis failed: {e}")
import traceback
traceback.print_exc()
raise
async def prepare_video_context(self) -> str:
"""Prepare video context for AI analysis by reading actual durations"""
# Update durations using actual local files
for video in get_config_value("visual_assets")["all_videos"]:
local_path = video.get("local_path")
if local_path:
try:
with VideoFileClip(local_path) as clip:
video["duration"] = round(clip.duration, 2)
except Exception as e:
logger.warning(f"⚠️ Error reading duration for {local_path}: {e}")
video["duration"] = 0
else:
video["duration"] = 0
# Form video_context string (using actual durations)
video_context = "\n".join(
[
f"{i+1}. {row.get('Video URL (No Audio)')} - "
f"{row.get('Full Video Description Summary', row.get('description', ''))} - "
f"{next((v.get('duration', 0) for v in get_config_value('visual_assets')['all_videos'] if v['url'] == row.get('Video URL (No Audio)')), 0)}s - "
f"Alignment: {row.get('Video Alignment with the TTS Script', row.get('alignment', ''))} - "
f"Usage Count: {get_config_value('video_usage_count').get(row.get('Video URL (No Audio)'), 0)}"
for i, row in self._video_lib.video_library.iterrows()
]
)
return video_context
def select_random_videos(self, count: int) -> List[str]:
"""Select random videos from downloaded library"""
all_videos = get_config_value("visual_assets").get("all_videos", [])
available_videos = [v for v in all_videos if v.get("local_path")]
if len(available_videos) < count:
logger.warning(f"⚠️ Not enough videos to select {count} random videos. Selecting {len(available_videos)} instead.")
count = len(available_videos)
selected_videos = random.sample(available_videos, count)
return [v["local_path"] for v in selected_videos]
|