|
|
import asyncio |
|
|
import aiohttp |
|
|
import aiofiles |
|
|
import requests |
|
|
from typing import Optional, Dict, Any, List, Tuple |
|
|
from pathlib import Path |
|
|
import time |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
import os |
|
|
from src.logger_config import logger |
|
|
from google_src import ai_studio_sdk |
|
|
import json_repair |
|
|
from src.config import get_config_value, set_config_value |
|
|
from moviepy.editor import AudioFileClip |
|
|
|
|
|
import uuid |
|
|
import json |
|
|
|
|
|
class VideoStatus(Enum):
    """Named constants for the status strings of a video generation task."""

    # Every member's value is the lowercase spelling of its name.
    INIT = "init"
    START = "start"
    PENDING = "pending"
    PROCESS = "process"
    COPY = "copy"
    SUCCESS = "success"
    FAIL = "fail"
|
|
|
|
|
|
|
|
@dataclass
class AvatarInfo:
    """Plain record describing a single avatar.

    The first four fields are required; ``background_color`` defaults to
    ``None`` and ``type`` defaults to ``"custom"``.
    """

    # Required fields, in positional order.
    avatar_id: str
    name: str
    gender: str
    supports_bg_removal: bool
    # Optional fields.
    background_color: Optional[str] = None
    type: str = "custom"
|
|
|
|
|
|
|
|
@dataclass
class VoiceInfo:
    """Plain record describing a single TTS voice.

    The first four fields are required; ``type`` defaults to ``"public"``.
    """

    # Required fields, in positional order.
    voice_id: str
    name: str
    language: str
    gender: str
    # Optional field.
    type: str = "public"
|
|
|
|
|
|
|
|
class A2EAPIError(Exception):
    """Exception type raised for failures while talking to the A2E API."""
|
|
|
|
|
|
|
|
class TalkingVideoGenerator: |
|
|
""" |
|
|
Production-ready talking video generator using A2E.ai APIs |
|
|
Supports green screen background for video overlay workflows |
|
|
""" |
|
|
|
|
|
def __init__(
    self,
    a2e_api_key: str,
    base_url: str = "https://video.a2e.ai",
    timeout: int = 100,
    max_retries: int = 3,
):
    """
    Set up the generator's connection settings and scratch directory.

    Args:
        a2e_api_key: API key, sent as a Bearer token on every request
        base_url: Prefix joined with each endpoint path
        timeout: Per-request timeout in seconds
        max_retries: Number of attempts made by the request helpers
    """
    # Connection settings are stored verbatim.
    self.api_key = a2e_api_key
    self.base_url = base_url
    self.timeout = timeout
    self.max_retries = max_retries

    # Headers attached to every request made by the helpers.
    request_headers = {}
    request_headers["Authorization"] = f"Bearer {self.api_key}"
    request_headers["Content-Type"] = "application/json"
    request_headers["x-lang"] = "en-US"
    self.headers = request_headers

    # Directory that download_video() writes into.
    self.tmp_dir = Path("/tmp")
    self.tmp_dir.mkdir(exist_ok=True)

    logger.debug("TalkingVideoGenerator initialized")
|
|
|
|
|
async def _make_request(
    self,
    method: str,
    endpoint: str,
    **kwargs
) -> Dict[str, Any]:
    """
    Make an asynchronous HTTP request with retry logic.

    Transport failures (``aiohttp.ClientError``) and timeouts
    (``asyncio.TimeoutError``) are retried up to ``self.max_retries`` times
    with exponential backoff (1s, 2s, 4s, ...). HTTP status >= 400 and
    responses whose ``code`` field is not 0 are NOT retried; they raise
    immediately.

    Args:
        method: HTTP method (GET, POST, DELETE)
        endpoint: API endpoint, appended to ``self.base_url``
        **kwargs: Additional parameters forwarded to ``session.request``

    Returns:
        Response JSON data

    Raises:
        A2EAPIError: If the request fails after retries, the server answers
            with an error status, the payload reports a non-zero ``code``,
            or no attempt is made because ``max_retries`` is not positive
    """
    url = f"{self.base_url}{endpoint}"

    for attempt in range(self.max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                if kwargs.get('json'):
                    logger.debug(f"Request to {url}")
                    logger.debug(f"Payload: {kwargs['json']}")

                async with session.request(
                    method=method,
                    url=url,
                    headers=self.headers,
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                    **kwargs
                ) as response:
                    response_text = await response.text()

                    if response.status >= 400:
                        logger.error(f"HTTP {response.status} Error")
                        logger.error(f"Response: {response_text}")

                        # Prefer the "msg" field of a JSON error body; fall
                        # back to the raw text when the body is not a JSON
                        # object.
                        error_msg = response_text
                        try:
                            error_data = json.loads(response_text)
                        except ValueError:
                            error_data = None
                        if isinstance(error_data, dict):
                            error_msg = error_data.get("msg", response_text)

                        raise A2EAPIError(f"HTTP {response.status}: {error_msg}")

                    data = await response.json()

                    if data.get("code") != 0:
                        error_msg = data.get("msg", "Unknown error")
                        logger.error(f"API error: {error_msg}")
                        raise A2EAPIError(f"API error: {error_msg}")

                    return data

        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Timeouts are transient just like connection errors, so they
            # share the retry/backoff path.
            logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
            if attempt == self.max_retries - 1:
                raise A2EAPIError(f"Request failed after {self.max_retries} attempts: {e}") from e
            await asyncio.sleep(2 ** attempt)
        except A2EAPIError:
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise A2EAPIError(f"Unexpected error: {e}") from e

    # Only reachable when the loop body never ran (max_retries <= 0).
    raise A2EAPIError(f"Request not attempted: max_retries={self.max_retries}")
|
|
|
|
|
def _make_sync_request(
    self,
    method: str,
    endpoint: str,
    **kwargs
) -> Dict[str, Any]:
    """
    Make a synchronous HTTP request with retry logic.

    Any ``requests.RequestException`` (including the one raised by
    ``raise_for_status``) is retried up to ``self.max_retries`` times with
    exponential backoff. A payload whose ``code`` field is not 0 raises
    immediately without retrying.

    Args:
        method: HTTP method (GET, POST, DELETE)
        endpoint: API endpoint, appended to ``self.base_url``
        **kwargs: Additional parameters forwarded to ``requests.request``

    Returns:
        Response JSON data

    Raises:
        A2EAPIError: If the request fails after retries, the payload reports
            a non-zero ``code``, or no attempt is made because
            ``max_retries`` is not positive
    """
    url = f"{self.base_url}{endpoint}"

    for attempt in range(self.max_retries):
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=self.headers,
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            data = response.json()

            if data.get("code") != 0:
                error_msg = data.get("msg", "Unknown error")
                raise A2EAPIError(f"API error: {error_msg}")

            return data

        except requests.RequestException as e:
            logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
            if attempt == self.max_retries - 1:
                raise A2EAPIError(f"Request failed after {self.max_retries} attempts: {e}") from e
            time.sleep(2 ** attempt)

    # Only reachable when the loop body never ran (max_retries <= 0).
    raise A2EAPIError(f"Request not attempted: max_retries={self.max_retries}")
|
|
|
|
|
async def get_avatars_with_bg_support(self) -> List[AvatarInfo]:
    """
    Get the allow-listed avatars that support background removal/replacement.

    An avatar is kept when it has a non-blank ``background_color`` or
    ``background_img`` AND its name is on the hard-coded allow-list below.
    When the ``test_automation`` config flag is set, a single stub avatar is
    returned without calling the API.

    Returns:
        List of AvatarInfo objects with background support

    Raises:
        A2EAPIError: If the avatar list cannot be fetched or parsed
    """
    # Built once per call instead of once per avatar.
    allowed_names = {
        "Mimi", "Katherine", "Samantha", "Emily", "Nova", "Grace", "Julia", "Lijun Deng", "Maneli #2", "Jimmy"
    }

    try:
        if get_config_value("test_automation"):
            return [AvatarInfo(
                avatar_id="6271fa09438162bd7e59b8e5",
                name="Sara",
                gender="female",
                supports_bg_removal=True,
                # Same "rgba(r,g,b,a)" spelling generate_talking_video() uses.
                background_color="rgba(0,255,0,1)",
                type="custom"
            )]
        logger.debug("Fetching avatars with background support...")

        data = await self._make_request(
            "GET",
            "/api/v1/anchor/character_list"
        )

        avatars = []
        # "or []" also covers an explicit null in the response.
        for avatar_data in data.get("data") or []:
            bg_color = avatar_data.get("background_color") or ""
            bg_img = avatar_data.get("background_img") or ""

            has_bg_support = bool(bg_color.strip() or bg_img.strip())
            name = avatar_data.get("name", "Unknown")

            if has_bg_support and name in allowed_names:
                avatars.append(AvatarInfo(
                    avatar_id=avatar_data.get("_id"),
                    name=name,
                    gender=avatar_data.get("gender", "female"),
                    supports_bg_removal=True,
                    background_color=avatar_data.get("background_color"),
                    type=avatar_data.get("type", "custom")
                ))

        logger.debug(f"Total avatars with BG support: {len(avatars)}")
        return avatars

    except Exception as e:
        logger.error(f"Failed to fetch avatars: {e}")
        raise A2EAPIError(f"Failed to fetch avatars: {e}") from e
|
|
|
|
|
async def get_available_voices(
    self,
    country: Optional[str] = None,
    region: Optional[str] = None,
    voice_map_type: str = "en-US"
) -> List[VoiceInfo]:
    """
    Get the list of available TTS voices, minus a hard-coded deny-list.

    A voice is dropped when its label contains (case-insensitively) any
    entry of the deny-list below. When the ``test_automation`` config flag
    is set, a single stub voice is returned without calling the API.

    Args:
        country: Country code (e.g., "en")
        region: Region code (e.g., "US")
        voice_map_type: Language for labels ("en-US" or "zh-CN")

    Returns:
        List of VoiceInfo objects. ``language`` is built from the requested
        ``country``/``region`` (defaulting to "en"/"US"), not read from the
        response.

    Raises:
        A2EAPIError: If the voice list cannot be fetched or parsed
    """
    try:
        if get_config_value("test_automation"):
            return [VoiceInfo(
                voice_id="669b7015ba04042f4090b288",
                name="Jessica",
                language="en-US",
                gender="female",
                type="public"
            )]
        logger.debug("Fetching available voices...")

        params = {"voice_map_type": voice_map_type}
        if country:
            params["country"] = country
        if region:
            params["region"] = region

        data = await self._make_request(
            "GET",
            "/api/v1/anchor/voice_list",
            params=params
        )

        disallowed = ["ASMR", "John Doe - Intimate", "Ava Multilingual", "ShanShan Chinese Accent", "august", "Emma Multilingual", "WF Texas NPR Storyteller", "Andrew Multilingual", "Alice", "Jessica"]
        # Lower-cased once instead of once per voice.
        disallowed_lower = [bad_word.lower() for bad_word in disallowed]
        language = f"{country or 'en'}-{region or 'US'}"

        voices = []
        for gender_group in data.get("data") or []:
            gender = gender_group.get("value", "unknown")
            for voice_data in gender_group.get("children") or []:
                # A null label falls back to "Unknown" instead of crashing on
                # .strip(); the stripped label is also what gets stored, so
                # later exact-name matching is not defeated by stray
                # whitespace.
                label = (voice_data.get("label") or "Unknown").strip()
                label_lower = label.lower()
                if any(bad_word in label_lower for bad_word in disallowed_lower):
                    continue
                voices.append(VoiceInfo(
                    voice_id=voice_data.get("value"),
                    name=label,
                    language=language,
                    gender=gender,
                    type="public"
                ))

        logger.debug(f"Total voices available: {len(voices)}")
        return voices

    except Exception as e:
        logger.error(f"Failed to fetch voices: {e}")
        raise A2EAPIError(f"Failed to fetch voices: {e}") from e
|
|
|
|
|
def select_voice_n_avatar(
    self,
    avatars: List[AvatarInfo],
    voices: List[VoiceInfo],
    tts_script: str,
    image_prompt: str
) -> Tuple[Optional[str], Optional[str]]:
    """
    Ask the model to pick an avatar and a voice, then resolve them to IDs.

    The system prompt is read from
    ``src/prompt/avatar_n_voice_selection_a2e.md`` and sent together with
    the script, the image prompt, the voice names and the avatar names with
    their usage counts. The reply is parsed with ``json_repair`` and its
    ``selected_avatar`` / ``selected_voice`` names are matched
    case-insensitively against ``avatars`` / ``voices``. When a name does
    not match (or the reply is malformed), the first list entry is used.
    The chosen avatar's usage count is incremented in the
    ``avatar_usage_count`` config value.

    Args:
        avatars: Candidate avatars
        voices: Candidate voices
        tts_script: Text the avatar will speak
        image_prompt: Image prompt passed to the model as context

    Returns:
        ``(avatar_id, voice_id)``; either element is ``None`` when the
        corresponding candidate list is empty
    """
    logger.debug(f"Selecting avatar and voice for image_prompt: {image_prompt}")

    with open("src/prompt/avatar_n_voice_selection_a2e.md", "r", encoding="utf-8") as file:
        system_prompt = file.read()

    if get_config_value("test_automation"):
        available_voices = ['Aria - Sexy Female Villain Voice', 'ASMR Dr Lovejoy', 'august', 'Ava Multilingual', 'Emma Multilingual', 'Jessica', 'Laura', 'Natasha - Sensual Hypnosis', 'Nia African', 'SA Brown African', 'Sarah', 'Shannon ASMR', 'ShanShan Chinese Accent', 'Stacy Chinese Accent', 'Andrew Multilingual', 'Brian', 'Brian Multilingual', 'Callum', 'Charlie', 'Chris', 'Damon', 'Daniel', 'Eric', 'Jamal (African American)', 'John Doe - Intimate', 'Josh', 'Liam', 'neuris', 'Road Dawg African', 'Timmy', 'Tony', 'WF Texas NPR Storyteller', 'Will']
        available_avatar = ['10-19-2025 11:59:41', '2025-09-17 16:34:53', '08-24-2025 07:19:17', 'Mimi', 'Harper', 'Nick2', 'Victoria', 'Margaret', 'Katherine', 'Elizabeth', 'Emma', 'Lily', 'Mia', 'Lauren', 'Samantha', 'Emily', 'Nova', 'Grace', 'Alex', 'Taylor', 'Alexandra', 'Jessica', 'Ava', 'Ella', 'Julia', 'Mido', 'Kieran', 'Summer', 'Brooke', 'River', 'Skye', 'Autumn', 'Holly', 'Mr.P', 'Ashley', 'Xiaomei', 'Xiaoai', 'Na Li (Frontal View)', 'Na Li (side view)', 'Albert Einstein', 'Lijun Deng', 'Ella', 'Ruoxi', 'Chenyu', 'Jingyi', 'Yahan', 'Shiya', 'Ruotong', 'Muqing', 'Wanqing', 'Jingshu', 'Yutong', 'Zhiyan', 'Feiyili', 'Shanxichen', 'Tainannan', 'YIshuhui', 'Huagulan', 'Jiaxuan', 'Qinlan', 'Qingwan', 'Luyao', 'Ximeng', 'Zixuan', 'Yuhan', 'Phoenix', 'Yanxi', 'Chuxuan', 'Xiyuan', 'Shinuo', 'Xueyan', 'Chenxi', 'Lingxiao', 'Qiya', 'Ruolan', 'Yuning', 'Xingyao', 'Muqing', 'Xiaoran', 'Shutong', 'Luoyi', 'Ximeng', 'Yuxi', 'Yilian', 'LiuJun', 'Li Shi Zhen', 'Jimmy', 'Maneli #2', 'Maneli #1', 'Elsa', 'Jenny', 'Amber', 'Sara']
        available_avatar = [{
            "name": ava,
            "Usage Count": 0
        } for ava in available_avatar]
    else:
        # Read the usage map once, and tolerate it being unset.
        usage_counts = get_config_value("avatar_usage_count", {}) or {}
        available_voices = [voc.name for voc in voices]
        available_avatar = [{
            "name": ava.name,
            "Usage Count": usage_counts.get(ava.avatar_id, 0)
        } for ava in avatars]

    model_input = f"""SYSTEM INSTRUCTION::
{system_prompt}


USER PROMPT:
TTS Script: {tts_script}
Image Prompt: {image_prompt}
Available Voices: {available_voices}
Available Avatars with Usage Count: {available_avatar}
"""
    response = ai_studio_sdk.generate(model_input)

    response_text = response.strip()

    selection = json_repair.loads(response_text)
    if not isinstance(selection, dict):
        # A reply that is not a JSON object cannot name a choice; fall
        # through to the first-entry defaults below.
        logger.warning("Model reply was not a JSON object; using default avatar and voice")
        selection = {}

    wanted_avatar = str(selection.get("selected_avatar") or "").lower()
    wanted_voice = str(selection.get("selected_voice") or "").lower()

    selected_avatar = next(
        (avatar for avatar in avatars if avatar.name.lower() == wanted_avatar),
        None,
    )
    if selected_avatar is None and avatars:
        selected_avatar = avatars[0]

    selected_voice = next(
        (voice for voice in voices if voice.name.lower() == wanted_voice),
        None,
    )
    if selected_voice is None and voices:
        selected_voice = voices[0]

    # Only record usage when an avatar was actually chosen; with an empty
    # candidate list there is nothing to count.
    if selected_avatar is not None:
        avatar_usage = get_config_value("avatar_usage_count", {}) or {}
        avatar_usage[selected_avatar.avatar_id] = avatar_usage.get(selected_avatar.avatar_id, 0) + 1
        set_config_value("avatar_usage_count", avatar_usage)

    avatar_id = selected_avatar.avatar_id if selected_avatar else None
    voice_id = selected_voice.voice_id if selected_voice else None

    if avatar_id and voice_id:
        logger.debug(f"Selected avatar: {selected_avatar.name} (ID: {avatar_id})")
        logger.debug(f"Selected voice: {selected_voice.name} (ID: {voice_id})")
    else:
        logger.warning("Could not select appropriate avatar or voice")

    return avatar_id, voice_id
|
|
|
|
|
async def _get_audio_duration(self, audio_url: str) -> float:
    """
    Download the audio to a temporary file and measure its duration.

    The duration is read with ``AudioFileClip`` in a worker thread. The
    temporary file is removed afterwards, whether or not measuring
    succeeded.

    Args:
        audio_url: URL of the audio file to measure

    Returns:
        The clip's ``duration`` as reported by ``AudioFileClip``

    Raises:
        A2EAPIError: If the download does not return HTTP 200
    """
    import tempfile

    async with aiohttp.ClientSession() as session:
        async with session.get(audio_url) as resp:
            if resp.status != 200:
                raise A2EAPIError(f"Failed to fetch audio: {resp.status}")
            audio_bytes = await resp.read()

    # delete=False so the file can be reopened by path after it is closed.
    tmp_file = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    tmp_path = tmp_file.name

    def get_duration():
        with AudioFileClip(tmp_path) as clip:
            return clip.duration

    try:
        with tmp_file:
            tmp_file.write(audio_bytes)
        return await asyncio.to_thread(get_duration)
    finally:
        # Cleanup runs on success and failure so temp files do not pile up.
        try:
            os.remove(tmp_path)
        except OSError as cleanup_error:
            logger.warning(f"Could not remove temporary audio file {tmp_path}: {cleanup_error}")
|
|
|
|
|
def _get_voice_language_info(self, voice_id: str, voices: List[VoiceInfo]) -> Tuple[str, str]: |
|
|
""" |
|
|
Get country and region from voice_id by looking up in voices list. |
|
|
Falls back to en-US if not found. |
|
|
""" |
|
|
for voice in voices: |
|
|
if voice.voice_id == voice_id: |
|
|
|
|
|
parts = voice.language.split('-') |
|
|
if len(parts) == 2: |
|
|
return parts[0], parts[1] |
|
|
return "en", "US" |
|
|
|
|
|
|
|
|
logger.warning(f"Voice ID {voice_id} not found in voices list, defaulting to en-US") |
|
|
return "en", "US" |
|
|
|
|
|
async def generate_tts_audio(
    self,
    tts_script: str,
    voice_id: str,
    voices: List[VoiceInfo],
    speed_rate: float = 1.2,
    country: str = "en",
    region: str = "US",
    min_duration: float = 11.0
) -> str:
    """
    Generate TTS audio and return its URL.

    The audio is first generated at ``speed_rate``. If it is shorter than
    ``min_duration`` and ``speed_rate`` is above 1.0, it is generated once
    more at 1.0. The final audio must be between 10 and 15 seconds long.
    When the ``test_automation`` config flag is set, a fixed test path is
    returned without calling the API.

    Args:
        tts_script: Text to synthesize
        voice_id: ID of the voice to use
        voices: Voices used to look up the country/region for ``voice_id``
        speed_rate: Speech rate for the first attempt
        country: Overwritten by the lookup in ``voices`` before use
        region: Overwritten by the lookup in ``voices`` before use
        min_duration: Duration (seconds) below which the slower retry runs

    Returns:
        URL of the generated audio

    Raises:
        A2EAPIError: If the request fails, no URL is returned, or the final
            duration is outside the accepted range
    """
    # Accepted duration window (seconds) for the final audio.
    shortest_allowed = 10
    longest_allowed = 15

    try:
        if get_config_value("test_automation"):
            return "testData/Green Screen Avatar Video3.mp3"

        country, region = self._get_voice_language_info(voice_id, voices)
        logger.debug(country)
        logger.debug(region)

        # Requested rate first; a single fallback to 1.0 only makes sense
        # when the requested rate was faster than that.
        rates = [speed_rate]
        if speed_rate > 1.0:
            rates.append(1.0)

        for index, rate in enumerate(rates):
            logger.debug(f"Generating TTS audio (speed: {rate}x)...")

            payload = {
                "msg": tts_script,
                "tts_id": voice_id,
                "speechRate": rate,
                "country": country,
                "region": region
            }

            data = await self._make_request(
                "POST",
                "/api/v1/video/send_tts",
                json=payload
            )

            audio_url = data.get("data", "")
            if not audio_url:
                raise A2EAPIError("No audio URL in response")

            duration = await self._get_audio_duration(audio_url)
            logger.debug(f"Speech duration: {duration:.2f}s")

            is_last_attempt = index == len(rates) - 1
            if duration >= min_duration or is_last_attempt:
                break

            logger.warning(
                f"Audio too short ({duration:.2f}s < {min_duration}s). Retrying with slower rate..."
            )

        if duration > longest_allowed or duration < shortest_allowed:
            # The message covers both directions; the check rejects audio
            # that is too short as well as too long.
            raise A2EAPIError(
                f"Audio duration out of range ({duration:.2f}s, expected "
                f"{shortest_allowed}-{longest_allowed}s) for the voice: {voice_id}"
            )

        logger.debug(f"✓ Final TTS audio generated ({duration:.2f}s): {audio_url}")
        return audio_url

    except Exception as e:
        logger.error(f"Failed to generate TTS audio: {e}")
        raise A2EAPIError(f"Failed to generate TTS audio: {e}") from e
|
|
|
|
|
async def generate_talking_video(
    self,
    avatar_id: str,
    audio_url: str,
    title: str = "AI Avatar Video",
    anchor_type: int = 0,
    resolution: int = 1080,
    background_color: Tuple[int, int, int] = (0, 255, 0),
    aspect_ratio: str = "9:16"
) -> Dict[str, Any]:
    """
    Generate a talking video with a solid-colour (green screen) background.

    Submits the job to ``/api/v1/video/generate`` and then polls until it
    finishes. When the ``test_automation`` config flag is set, a fixed test
    path is returned without calling the API.

    Args:
        avatar_id: Avatar ID from get_avatars_with_bg_support()
        audio_url: URL of audio file
        title: Video title
        anchor_type: 0=system, 1=custom
        resolution: Accepted for compatibility; not sent in the request
        background_color: RGB tuple for background (default: green screen)
        aspect_ratio: Accepted for compatibility; not sent in the request

    Returns:
        Dict with video_url, task_id, and other details

    Raises:
        A2EAPIError: If video generation fails
    """
    try:
        if get_config_value("test_automation"):
            return {
                "video_url": "testData/Green Screen Avatar Video3.mp4"
            }

        logger.debug("Generating talking video with green screen background...")

        # Alpha is fixed at 1 (fully opaque).
        bg_color_str = f"rgba({background_color[0]},{background_color[1]},{background_color[2]},1)"

        payload = {
            "title": title,
            "anchor_id": avatar_id,
            "anchor_type": anchor_type,
            "audioSrc": audio_url,
            "color": bg_color_str,
            "isSkipRs": True
        }
        data = await self._make_request(
            "POST",
            "/api/v1/video/generate",
            json=payload
        )

        task_id = data.get("data", {}).get("_id")
        if not task_id:
            raise A2EAPIError("No task ID in response")

        logger.debug(f"✓ Video generation started. Task ID: {task_id}")

        video_result = await self._poll_video_status(task_id)
        return video_result

    except Exception as e:
        logger.error(f"Failed to generate talking video: {e}")
        raise A2EAPIError(f"Failed to generate talking video: {e}") from e
|
|
|
|
|
async def _poll_video_status(
    self,
    task_id: str,
    max_wait: int = 600,
    poll_interval: int = 5
) -> Dict[str, Any]:
    """
    Poll video generation status until it succeeds, fails, or times out.

    Args:
        task_id: Video task ID (_id from /api/v1/video/generate)
        max_wait: Maximum wait time in seconds
        poll_interval: Polling interval in seconds

    Returns:
        Dict with video_url, task_id, status, process, created_at and
        anchor_id

    Raises:
        A2EAPIError: If the status request fails, the task reports failure,
            success arrives without a URL, or ``max_wait`` elapses
    """
    endpoint = "/api/v1/video/awsResult"
    # Monotonic clock: the deadline is unaffected by system clock changes.
    deadline = time.monotonic() + max_wait

    logger.debug(f"Polling video status (max wait: {max_wait}s)...")

    while time.monotonic() < deadline:
        try:
            data = await self._make_request(
                "POST",
                endpoint,
                json={"_id": task_id}
            )

            data_array = data.get("data", [])
            if not data_array:
                logger.warning("Empty response data, retrying...")
                await asyncio.sleep(poll_interval)
                continue

            result_data = data_array[0]

            status = result_data.get("status")
            process = result_data.get("process", 0)

            if status == VideoStatus.SUCCESS.value:
                video_url = result_data.get("result")
                if not video_url:
                    raise A2EAPIError("Video completed but no URL provided")

                logger.debug(f"✓ Video generation completed: {video_url}")

                return {
                    "video_url": video_url,
                    "task_id": task_id,
                    "status": status,
                    "process": process,
                    "created_at": result_data.get("createdAt"),
                    "anchor_id": result_data.get("anchor_id")
                }

            elif status == VideoStatus.FAIL.value:
                error_msg = result_data.get("msg", "Unknown error")
                raise A2EAPIError(f"Video generation failed: {error_msg}")

            else:
                # Any other status means the task is still in progress.
                logger.debug(f"⏳ Status: {status} ({process}%)...")
                await asyncio.sleep(poll_interval)

        except A2EAPIError:
            # API-level failures end the polling immediately.
            raise
        except Exception as e:
            # Anything else (e.g. an unexpected response shape) is logged
            # and the poll is retried.
            logger.warning(f"Error polling status: {e}")
            await asyncio.sleep(poll_interval)

    raise A2EAPIError(f"Video generation timed out after {max_wait} seconds")
|
|
|
|
|
async def download_video(
    self,
    video_url: str,
    filename: Optional[str] = None
) -> Path:
    """
    Download a video into ``self.tmp_dir``.

    The response is streamed to disk in 8 KiB chunks. If the download fails
    after the target file was opened, the partial file is removed.

    Args:
        video_url: URL of the video to download
        filename: Optional custom filename (default:
            ``avatar_video_<unix-timestamp>.mp4``)

    Returns:
        Path to downloaded file

    Raises:
        A2EAPIError: If download fails
    """
    filepath = None
    # True only while the target file is open and possibly incomplete.
    partial_on_disk = False

    try:
        if not filename:
            timestamp = int(time.time())
            filename = f"avatar_video_{timestamp}.mp4"

        filepath = self.tmp_dir / filename

        logger.debug(f"Downloading video to {filepath}...")

        async with aiohttp.ClientSession() as session:
            async with session.get(video_url, timeout=aiohttp.ClientTimeout(total=300)) as response:
                response.raise_for_status()

                async with aiofiles.open(filepath, 'wb') as f:
                    partial_on_disk = True
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)
                partial_on_disk = False

        file_size = filepath.stat().st_size / (1024 * 1024)
        logger.debug(f"✓ Video downloaded successfully ({file_size:.2f} MB): {filepath}")

        return filepath

    except Exception as e:
        if partial_on_disk:
            # Do not leave a truncated file that looks like a finished video.
            try:
                filepath.unlink()
            except OSError as cleanup_error:
                logger.warning(f"Could not remove partial download {filepath}: {cleanup_error}")
        logger.error(f"Failed to download video: {e}")
        raise A2EAPIError(f"Failed to download video: {e}") from e
|
|
|
|
|
async def get_all_videos(self) -> List[Dict[str, Any]]:
    """
    Fetch the first page (page size 300) of generated videos.

    Returns:
        The video entries found under ``data.data`` in the response

    Raises:
        Whatever the request raised, re-raised unchanged after logging
    """
    try:
        logger.debug("Fetching all generated videos...")

        listing = await self._make_request(
            "GET",
            "/api/v1/video/awsList?current=1&pageSize=300",
        )
        logger.debug(listing)

        # The entries sit one level down, inside the outer "data" object.
        page = listing.get("data", {})
        found = page.get("data", [])
        logger.debug(f"Found {len(found)} videos")
    except Exception as err:
        logger.error(f"Failed to fetch videos: {err}")
        raise
    else:
        return found
|
|
|
|
|
async def delete_video(self, video_id: str, title: str = "") -> bool:
    """
    Delete one video by ID, reporting the outcome instead of raising.

    Args:
        video_id: Video task ID (_id)
        title: Video title; used in log messages in place of the ID when set

    Returns:
        True when the DELETE request succeeded, False when it raised
    """
    # Log messages prefer the title and fall back to the ID.
    label = title or video_id

    try:
        logger.debug(f"Deleting video: {label}")

        await self._make_request(
            "DELETE",
            f"/api/v1/video/{video_id}",
        )

        logger.debug(f"✓ Successfully deleted: {label}")
    except Exception as err:
        logger.error(f"✗ Failed to delete {label}: {err}")
        return False

    return True
|
|
|
|
|
async def delete_all_videos(self) -> Dict[str, int]:
    """
    Delete every video returned by get_all_videos().

    Videos are deleted one at a time with a 0.5 second pause after each
    delete request. Entries without an ``_id`` are skipped and counted as
    failed.

    Returns:
        Dict with deletion statistics: ``total``, ``deleted``, ``failed``

    Raises:
        Whatever get_all_videos() raised, re-raised unchanged after logging
    """
    logger.debug("=" * 60)
    logger.debug("DELETING ALL AVATAR VIDEOS")
    logger.debug("=" * 60)

    try:
        videos = await self.get_all_videos()

        if not videos:
            logger.debug("No videos to delete")
            return {"total": 0, "deleted": 0, "failed": 0}

        logger.debug(f"\nPreparing to delete {len(videos)} videos...")

        deleted_count = 0
        failed_count = 0

        for i, video in enumerate(videos, 1):
            video_id = video.get("_id")
            title = video.get("title", "Unknown")
            status = video.get("status", "Unknown")

            logger.debug(f"\n[{i}/{len(videos)}] Processing: {video_id} {title} (Status: {status})")

            if video_id:
                success = await self.delete_video(video_id, title)
                if success:
                    deleted_count += 1
                else:
                    failed_count += 1

                # Short pause between delete requests.
                await asyncio.sleep(0.5)
            else:
                logger.warning(f"Skipping video without ID: {title}")
                failed_count += 1

        logger.debug("\n" + "=" * 60)
        logger.debug("DELETION SUMMARY")
        logger.debug("=" * 60)
        logger.debug(f"Total videos: {len(videos)}")
        logger.debug(f"✓ Successfully deleted: {deleted_count}")
        logger.debug(f"✗ Failed: {failed_count}")
        logger.debug("=" * 60)

        # The statistics are returned to the caller; a library method must
        # not terminate the process here.
        return {
            "total": len(videos),
            "deleted": deleted_count,
            "failed": failed_count
        }

    except Exception as e:
        logger.error(f"Failed to delete all videos: {e}")
        raise
|
|
|
|
|
|
|
|
async def create_greenscreen_video_workflow(
    api_key: str,
    tts_script: str,
    image_prompt: str,
    output_filename: Optional[str] = None,
    preferred_gender: str = "female",
    preferred_language: str = "en-US",
    speed_rate: float = 1.2,
    title: str = "Green Screen Avatar Video"
) -> Tuple[str, str]:
    """
    Complete workflow to create a green screen talking video.

    Steps: optionally delete all existing videos (when the
    ``delete_all_a2e_videos`` config value is set), fetch avatars, fetch
    voices for en-US, en-AU and en-GB, let the model select an avatar and a
    voice, generate the TTS audio, then generate the video.

    Args:
        api_key: A2E API key
        tts_script: Text for avatar to speak
        image_prompt: Image prompt forwarded to avatar/voice selection
        output_filename: Accepted but not used by this function
        preferred_gender: Accepted but not used by this function
        preferred_language: "<country>-<region>" string, split and passed to
            generate_tts_audio() (defaults to "en"/"US" when it has no '-')
        speed_rate: Speech speed multiplier
        title: Title given to the generated video

    Returns:
        ``(audio_url, video_url)`` — the video is not downloaded here

    Raises:
        A2EAPIError: If any step fails or yields no usable result
    """
    logger.debug("=" * 60)
    logger.debug("GREEN SCREEN TALKING VIDEO WORKFLOW")
    logger.debug("=" * 60)

    generator = TalkingVideoGenerator(a2e_api_key=api_key)

    try:
        logger.debug("\n[STEP 1] Fetching avatars with background support...")
        if get_config_value("delete_all_a2e_videos", False):
            await generator.delete_all_videos()
        avatars = await generator.get_avatars_with_bg_support()

        if not avatars:
            raise A2EAPIError("No avatars with background support found")

        logger.debug("\n[STEP 2] Fetching available voices...")
        voices = []
        # Same three English regions, fetched in the same order.
        for voice_region in ("US", "AU", "GB"):
            voices.extend(await generator.get_available_voices(
                country="en",
                region=voice_region
            ))
        if not voices:
            raise A2EAPIError("No voices available")

        logger.debug("\n[STEP 3] Selecting avatar and voice...")
        avatar_id, voice_id = generator.select_voice_n_avatar(
            avatars=avatars,
            voices=voices,
            tts_script=tts_script,
            image_prompt=image_prompt
        )

        if not avatar_id or not voice_id:
            raise A2EAPIError("Failed to select avatar or voice")

        logger.debug("\n[STEP 4] Generating TTS audio...")
        audio_url = await generator.generate_tts_audio(
            tts_script=tts_script,
            voice_id=voice_id,
            voices=voices,
            speed_rate=speed_rate,
            country=preferred_language.split('-')[0] if '-' in preferred_language else "en",
            region=preferred_language.split('-')[1] if '-' in preferred_language else "US"
        )

        logger.debug("\n[STEP 5] Generating talking video with green screen...")
        video_result = await generator.generate_talking_video(
            avatar_id=avatar_id,
            audio_url=audio_url,
            title=title
        )

        video_url = video_result.get("video_url")
        if not video_url:
            raise A2EAPIError("No video URL in result")

        logger.debug("\n")
        logger.debug("=" * 60)
        logger.debug("SUCCESS!")
        logger.debug("=" * 60)

        return audio_url, video_url

    except Exception as e:
        logger.error(f"✗ Workflow failed: {e}")
        raise
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run of the whole workflow with a short demo script.
    SCRIPT = """
Hello and welcome! This is a demonstration of AI-powered avatar video generation
with green screen background for easy video overlay and compositing.
"""

    try:
        audio_url, video_url = asyncio.run(
            create_greenscreen_video_workflow(
                api_key=get_config_value("a2e_api_key"),
                tts_script=SCRIPT,
                # image_prompt is a required parameter of the workflow;
                # leaving it out makes the call fail with TypeError.
                image_prompt="A friendly presenter introducing an AI avatar video demonstration",
                output_filename="avatar_greenscreen.mp4",
                preferred_gender="female",
                preferred_language="en-US",
                speed_rate=1.2
            )
        )
        print(f"\n✓ Video ready at: {video_url}")

    except Exception as e:
        print(f"\n✗ Error: {e}")
        import traceback
        traceback.print_exc()