"""Client and end-to-end workflow for A2E.ai green-screen talking-avatar videos."""

import asyncio
import json
import os
import tempfile
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import aiofiles
import aiohttp
import json_repair
import requests
from moviepy.editor import AudioFileClip

from src.logger_config import logger
from google_src import ai_studio_sdk
from src.config import get_config_value, set_config_value


class VideoStatus(Enum):
    """Video generation status constants"""
    INIT = "init"
    START = "start"
    PENDING = "pending"
    PROCESS = "process"
    COPY = "copy"
    SUCCESS = "success"
    FAIL = "fail"


@dataclass
class AvatarInfo:
    """Avatar information container"""
    avatar_id: str
    name: str
    gender: str
    supports_bg_removal: bool
    background_color: Optional[str] = None
    type: str = "custom"


@dataclass
class VoiceInfo:
    """Voice information container"""
    voice_id: str
    name: str
    language: str
    gender: str
    type: str = "public"


class A2EAPIError(Exception):
    """Custom exception for A2E API errors"""
    pass


class TalkingVideoGenerator:
    """
    Production-ready talking video generator using A2E.ai APIs
    Supports green screen background for video overlay workflows
    """

    # Only avatars with these names are offered to the selection model.
    _ALLOWED_AVATAR_NAMES = frozenset({
        "Mimi", "Katherine", "Samantha", "Emily", "Nova", "Grace",
        "Julia", "Lijun Deng", "Maneli #2", "Jimmy",
    })

    # Voices whose label contains any of these (case-insensitive) are skipped.
    _DISALLOWED_VOICE_TERMS = (
        "ASMR", "John Doe - Intimate", "Ava Multilingual",
        "ShanShan Chinese Accent", "august", "Emma Multilingual",
        "WF Texas NPR Storyteller", "Andrew Multilingual", "Alice", "Jessica",
    )

    # Accepted duration window (seconds) for the final TTS clip.
    _MIN_AUDIO_SECONDS = 10
    _MAX_AUDIO_SECONDS = 15

    def __init__(
        self,
        a2e_api_key: str,
        base_url: str = "https://video.a2e.ai",
        timeout: int = 100,
        max_retries: int = 3,
    ):
        """
        Initialize the A2E video generator

        Args:
            a2e_api_key: A2E API authentication key
            base_url: Base URL for A2E API
            timeout: Request timeout in seconds
            max_retries: Maximum retry attempts for failed requests
        """
        self.api_key = a2e_api_key
        self.base_url = base_url
        self.timeout = timeout
        self.max_retries = max_retries
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "x-lang": "en-US"
        }

        # Ensure tmp directory exists
        self.tmp_dir = Path("/tmp")
        self.tmp_dir.mkdir(parents=True, exist_ok=True)

        logger.debug("TalkingVideoGenerator initialized")

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Make HTTP request with retry logic

        Transport errors and timeouts are retried with exponential backoff;
        HTTP >= 400 and non-zero API ``code`` responses fail immediately.

        Args:
            method: HTTP method (GET, POST, DELETE)
            endpoint: API endpoint
            **kwargs: Additional request parameters

        Returns:
            Response JSON data

        Raises:
            A2EAPIError: If request fails after retries
        """
        url = f"{self.base_url}{endpoint}"

        for attempt in range(self.max_retries):
            try:
                async with aiohttp.ClientSession() as session:
                    # Log request details for debugging
                    if kwargs.get('json'):
                        logger.debug(f"Request to {url}")
                        logger.debug(f"Payload: {kwargs['json']}")

                    async with session.request(
                        method=method,
                        url=url,
                        headers=self.headers,
                        timeout=aiohttp.ClientTimeout(total=self.timeout),
                        **kwargs
                    ) as response:
                        # Get response text for better error messages
                        response_text = await response.text()

                        if response.status >= 400:
                            logger.error(f"HTTP {response.status} Error")
                            logger.error(f"Response: {response_text}")

                            # Try to parse JSON error; fall back to the raw body
                            try:
                                error_data = await response.json()
                                error_msg = error_data.get("msg", response_text)
                            except Exception:
                                error_msg = response_text

                            raise A2EAPIError(f"HTTP {response.status}: {error_msg}")

                        data = await response.json()

                        if data.get("code") != 0:
                            error_msg = data.get("msg", "Unknown error")
                            logger.error(f"API error: {error_msg}")
                            raise A2EAPIError(f"API error: {error_msg}")

                        return data

            # A total-timeout expiry surfaces as asyncio.TimeoutError, which is
            # not an aiohttp.ClientError, so it must be listed to be retried.
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
                if attempt == self.max_retries - 1:
                    raise A2EAPIError(
                        f"Request failed after {self.max_retries} attempts: {e}"
                    ) from e
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except A2EAPIError:
                raise
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                raise A2EAPIError(f"Unexpected error: {e}") from e

        # Only reachable when max_retries <= 0: never return None silently.
        raise A2EAPIError(f"Request failed after {self.max_retries} attempts: no attempt was made")

    def _make_sync_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Make synchronous HTTP request with retry logic

        Args:
            method: HTTP method (GET, POST, DELETE)
            endpoint: API endpoint
            **kwargs: Additional request parameters

        Returns:
            Response JSON data

        Raises:
            A2EAPIError: If request fails after retries
        """
        url = f"{self.base_url}{endpoint}"

        for attempt in range(self.max_retries):
            try:
                response = requests.request(
                    method=method,
                    url=url,
                    headers=self.headers,
                    timeout=self.timeout,
                    **kwargs
                )
                response.raise_for_status()
                data = response.json()

                if data.get("code") != 0:
                    error_msg = data.get("msg", "Unknown error")
                    raise A2EAPIError(f"API error: {error_msg}")

                return data

            except requests.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
                if attempt == self.max_retries - 1:
                    raise A2EAPIError(
                        f"Request failed after {self.max_retries} attempts: {e}"
                    ) from e
                time.sleep(2 ** attempt)  # Exponential backoff

        # Only reachable when max_retries <= 0: never return None silently.
        raise A2EAPIError(f"Request failed after {self.max_retries} attempts: no attempt was made")

    async def get_avatars_with_bg_support(self) -> List[AvatarInfo]:
        """
        Get list of avatars that support background removal/replacement

        Only avatars that report a non-empty ``background_color`` or
        ``background_img`` and whose name is in the allow-list are returned.
        In ``test_automation`` mode a single fixed avatar is returned without
        calling the API.

        Returns:
            List of AvatarInfo objects with background support

        Raises:
            A2EAPIError: If the avatar list cannot be fetched
        """
        try:
            if get_config_value("test_automation"):
                return [AvatarInfo(
                    avatar_id="6271fa09438162bd7e59b8e5",
                    name="Sara",
                    gender="female",
                    supports_bg_removal=True,
                    background_color="rgba(0,255,0,1)",
                    type="custom"
                )]

            logger.debug("Fetching avatars with background support...")

            # Get all custom avatars
            data = await self._make_request(
                "GET",
                "/api/v1/anchor/character_list"
            )

            avatars = []
            for avatar_data in data.get("data", []):
                # Avatars with background_color or background_img support BG
                # manipulation. Must check for non-empty strings, not just None.
                bg_color = avatar_data.get("background_color", "")
                bg_img = avatar_data.get("background_img", "")
                has_bg_support = (
                    bool(bg_color and bg_color.strip())
                    or bool(bg_img and bg_img.strip())
                )
                name = avatar_data.get("name", "Unknown")

                if has_bg_support and name in self._ALLOWED_AVATAR_NAMES:
                    avatars.append(AvatarInfo(
                        avatar_id=avatar_data.get("_id"),
                        name=name,
                        gender=avatar_data.get("gender", "female"),
                        supports_bg_removal=True,
                        background_color=avatar_data.get("background_color"),
                        type=avatar_data.get("type", "custom")
                    ))

            logger.debug(f"Total avatars with BG support: {len(avatars)}")
            return avatars

        except Exception as e:
            logger.error(f"Failed to fetch avatars: {e}")
            raise A2EAPIError(f"Failed to fetch avatars: {e}") from e

    async def get_available_voices(
        self,
        country: Optional[str] = None,
        region: Optional[str] = None,
        voice_map_type: str = "en-US"
    ) -> List[VoiceInfo]:
        """
        Get list of available TTS voices

        Voices whose label contains a disallowed term are filtered out. In
        ``test_automation`` mode a single fixed voice is returned without
        calling the API.

        Args:
            country: Country code (e.g., "en")
            region: Region code (e.g., "US")
            voice_map_type: Language for labels ("en-US" or "zh-CN")

        Returns:
            List of VoiceInfo objects

        Raises:
            A2EAPIError: If the voice list cannot be fetched
        """
        try:
            if get_config_value("test_automation"):
                return [VoiceInfo(
                    voice_id="669b7015ba04042f4090b288",
                    name="Jessica",
                    language="en-US",
                    gender="female",
                    type="public"
                )]

            logger.debug("Fetching available voices...")

            params = {"voice_map_type": voice_map_type}
            if country:
                params["country"] = country
            if region:
                params["region"] = region

            data = await self._make_request(
                "GET",
                "/api/v1/anchor/voice_list",
                params=params
            )

            blocked_terms = [term.lower() for term in self._DISALLOWED_VOICE_TERMS]
            language = f"{country or 'en'}-{region or 'US'}"

            voices = []
            for gender_group in data.get("data", []):
                gender = gender_group.get("value", "unknown")
                for voice_data in gender_group.get("children", []):
                    # Store the stripped label: select_voice_n_avatar matches
                    # names exactly, so stray whitespace would break the match.
                    label = (voice_data.get("label") or "Unknown").strip()
                    lowered = label.lower()
                    if any(term in lowered for term in blocked_terms):
                        continue
                    voices.append(VoiceInfo(
                        voice_id=voice_data.get("value"),
                        name=label,
                        language=language,
                        gender=gender,
                        type="public"
                    ))

            logger.debug(f"Total voices available: {len(voices)}")
            return voices

        except Exception as e:
            logger.error(f"Failed to fetch voices: {e}")
            raise A2EAPIError(f"Failed to fetch voices: {e}") from e

    def select_voice_n_avatar(
        self,
        avatars: List[AvatarInfo],
        voices: List[VoiceInfo],
        tts_script: str,
        image_prompt: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Ask the AI Studio model to pick an avatar and a voice for the script.

        The model's choice is matched case-insensitively against the names in
        ``avatars`` / ``voices``; when there is no match the first entry of the
        respective list is used. The chosen avatar's usage counter in the
        ``avatar_usage_count`` config value is incremented.

        Args:
            avatars: Candidate avatars
            voices: Candidate voices
            tts_script: Text the avatar will speak
            image_prompt: Prompt describing the accompanying imagery

        Returns:
            Tuple of (avatar_id, voice_id); either is None when the
            corresponding candidate list is empty
        """
        logger.debug(f"Selecting avatar and voice for image_prompt: {image_prompt}")
        with open("src/prompt/avatar_n_voice_selection_a2e.md", "r", encoding="utf-8") as file:
            system_prompt = file.read()

        if get_config_value("test_automation"):
            available_voices = [
                'Aria - Sexy Female Villain Voice', 'ASMR Dr Lovejoy', 'august',
                'Ava Multilingual', 'Emma Multilingual', 'Jessica', 'Laura',
                'Natasha - Sensual Hypnosis', 'Nia African', 'SA Brown African',
                'Sarah', 'Shannon ASMR', 'ShanShan Chinese Accent',
                'Stacy Chinese Accent', 'Andrew Multilingual', 'Brian',
                'Brian Multilingual', 'Callum', 'Charlie', 'Chris', 'Damon',
                'Daniel', 'Eric', 'Jamal (African American)',
                'John Doe - Intimate', 'Josh', 'Liam', 'neuris',
                'Road Dawg African', 'Timmy', 'Tony',
                'WF Texas NPR Storyteller', 'Will',
            ]
            avatar_names = [
                '10-19-2025 11:59:41', '2025-09-17 16:34:53', '08-24-2025 07:19:17',
                'Mimi', 'Harper', 'Nick2', 'Victoria', 'Margaret', 'Katherine',
                'Elizabeth', 'Emma', 'Lily', 'Mia', 'Lauren', 'Samantha', 'Emily',
                'Nova', 'Grace', 'Alex', 'Taylor', 'Alexandra', 'Jessica', 'Ava',
                'Ella', 'Julia', 'Mido', 'Kieran', 'Summer', 'Brooke', 'River',
                'Skye', 'Autumn', 'Holly', 'Mr.P', 'Ashley', 'Xiaomei', 'Xiaoai',
                'Na Li (Frontal View)', 'Na Li (side view)', 'Albert Einstein',
                'Lijun Deng', 'Ella', 'Ruoxi', 'Chenyu', 'Jingyi', 'Yahan',
                'Shiya', 'Ruotong', 'Muqing', 'Wanqing', 'Jingshu', 'Yutong',
                'Zhiyan', 'Feiyili', 'Shanxichen', 'Tainannan', 'YIshuhui',
                'Huagulan', 'Jiaxuan', 'Qinlan', 'Qingwan', 'Luyao', 'Ximeng',
                'Zixuan', 'Yuhan', 'Phoenix', 'Yanxi', 'Chuxuan', 'Xiyuan',
                'Shinuo', 'Xueyan', 'Chenxi', 'Lingxiao', 'Qiya', 'Ruolan',
                'Yuning', 'Xingyao', 'Muqing', 'Xiaoran', 'Shutong', 'Luoyi',
                'Ximeng', 'Yuxi', 'Yilian', 'LiuJun', 'Li Shi Zhen', 'Jimmy',
                'Maneli #2', 'Maneli #1', 'Elsa', 'Jenny', 'Amber', 'Sara',
            ]
            available_avatar = [
                {"name": ava, "Usage Count": 0} for ava in avatar_names
            ]
        else:
            # Read the counters once; tolerate the key being absent or None.
            usage_counts = get_config_value("avatar_usage_count", {}) or {}
            available_voices = [voc.name for voc in voices]
            available_avatar = [
                {"name": ava.name, "Usage Count": usage_counts.get(ava.avatar_id, 0)}
                for ava in avatars
            ]

        model_input = f"""SYSTEM INSTRUCTION::
{system_prompt}

USER PROMPT:
TTS Script: {tts_script}
Image Prompt: {image_prompt}
Available Voices: {available_voices}
Available Avatars with Usage Count: {available_avatar}
"""
        response = ai_studio_sdk.generate(model_input)
        response_text = response.strip()
        selection = json_repair.loads(response_text)
        if not isinstance(selection, dict):
            # json_repair may yield a list or a string for malformed output;
            # treat that as "no preference" and use the fallbacks below.
            logger.warning(f"Unexpected selection response, using fallbacks: {response_text}")
            selection = {}

        wanted_avatar = str(selection.get("selected_avatar") or "").lower()
        wanted_voice = str(selection.get("selected_voice") or "").lower()

        # Select avatar (fallback to first available)
        selected_avatar = next(
            (avatar for avatar in avatars if avatar.name.lower() == wanted_avatar),
            avatars[0] if avatars else None,
        )

        # Select voice (fallback to first available)
        selected_voice = next(
            (voice for voice in voices if voice.name.lower() == wanted_voice),
            voices[0] if voices else None,
        )

        # Only touch the usage counter when an avatar was actually chosen;
        # dereferencing None here used to raise AttributeError.
        if selected_avatar is not None:
            avatar_usage = get_config_value("avatar_usage_count", {}) or {}
            avatar_usage[selected_avatar.avatar_id] = (
                avatar_usage.get(selected_avatar.avatar_id, 0) + 1
            )
            set_config_value("avatar_usage_count", avatar_usage)

        avatar_id = selected_avatar.avatar_id if selected_avatar else None
        voice_id = selected_voice.voice_id if selected_voice else None

        if avatar_id and voice_id:
            logger.debug(f"Selected avatar: {selected_avatar.name} (ID: {avatar_id})")
            logger.debug(f"Selected voice: {selected_voice.name} (ID: {voice_id})")
        else:
            logger.warning("Could not select appropriate avatar or voice")

        return avatar_id, voice_id

    async def _get_audio_duration(self, audio_url: str) -> float:
        """
        Download the audio temporarily and measure duration using AudioFileClip.

        Args:
            audio_url: URL of the audio file to measure

        Returns:
            Duration as reported by AudioFileClip

        Raises:
            A2EAPIError: If the audio cannot be downloaded (non-200 response)
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(audio_url) as resp:
                if resp.status != 200:
                    raise A2EAPIError(f"Failed to fetch audio: {resp.status}")
                audio_bytes = await resp.read()

        # Save temporarily to disk since AudioFileClip requires a file
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_path = tmp_file.name

        def get_duration():
            with AudioFileClip(tmp_path) as clip:
                return clip.duration

        try:
            # Run blocking code in a thread
            return await asyncio.to_thread(get_duration)
        finally:
            # Clean up even when decoding fails, so temp files never leak
            os.remove(tmp_path)

    def _get_voice_language_info(self, voice_id: str, voices: List[VoiceInfo]) -> Tuple[str, str]:
        """
        Get country and region from voice_id by looking up in voices list.
        Falls back to en-US if not found.

        Args:
            voice_id: Voice ID to look up
            voices: Voices to search

        Returns:
            Tuple of (country, region), e.g. ("en", "US")
        """
        for voice in voices:
            if voice.voice_id == voice_id:
                # Parse language string like "en-US" -> ("en", "US")
                parts = voice.language.split('-')
                if len(parts) == 2:
                    return parts[0], parts[1]
                return "en", "US"  # Fallback

        # If voice not found, default to en-US
        logger.warning(f"Voice ID {voice_id} not found in voices list, defaulting to en-US")
        return "en", "US"

    async def _synthesize_speech(
        self,
        tts_script: str,
        voice_id: str,
        speed_rate: float,
        country: str,
        region: str
    ) -> Tuple[str, float]:
        """Request one TTS rendering and return (audio_url, measured duration)."""
        logger.debug(f"Generating TTS audio (speed: {speed_rate}x)...")

        payload = {
            "msg": tts_script,
            "tts_id": voice_id,
            "speechRate": speed_rate,
            "country": country,
            "region": region
        }

        data = await self._make_request(
            "POST",
            "/api/v1/video/send_tts",
            json=payload
        )

        audio_url = data.get("data", "")
        if not audio_url:
            raise A2EAPIError("No audio URL in response")

        # Measure duration
        duration = await self._get_audio_duration(audio_url)
        logger.debug(f"Speech duration: {duration:.2f}s")
        return audio_url, duration

    async def generate_tts_audio(
        self,
        tts_script: str,
        voice_id: str,
        voices: List[VoiceInfo],
        speed_rate: float = 1.2,
        country: str = "en",
        region: str = "US",
        min_duration: float = 11.0
    ) -> str:
        """
        Generate TTS audio. If audio < min_duration, retry with speed_rate=1.0.

        Args:
            tts_script: Text to synthesize
            voice_id: Voice ID to use
            voices: Known voices, used to resolve the voice's country/region
            speed_rate: Speech speed multiplier for the first attempt
            country: Accepted for compatibility; the value actually sent is
                derived from ``voice_id`` via ``voices``
            region: Accepted for compatibility; see ``country``
            min_duration: Clips shorter than this trigger one slower retry
                (only when ``speed_rate`` > 1.0)

        Returns:
            URL of the generated audio (a local test file path in
            ``test_automation`` mode)

        Raises:
            A2EAPIError: If synthesis fails or the final clip is outside the
                accepted 10-15 second window
        """
        try:
            if get_config_value("test_automation"):
                return "testData/Green Screen Avatar Video3.mp3"

            # The voice's own language always wins over the passed-in values
            country, region = self._get_voice_language_info(voice_id, voices)
            logger.debug(country)
            logger.debug(region)

            # --- First attempt ---
            audio_url, duration = await self._synthesize_speech(
                tts_script, voice_id, speed_rate, country, region
            )

            # Retry once at normal speed if too short
            if duration < min_duration and speed_rate > 1.0:
                logger.warning(
                    f"Audio too short ({duration:.2f}s < {min_duration}s). Retrying with slower rate..."
                )
                audio_url, duration = await self._synthesize_speech(
                    tts_script, voice_id, 1.0, country, region
                )

            if duration > self._MAX_AUDIO_SECONDS or duration < self._MIN_AUDIO_SECONDS:
                raise A2EAPIError(
                    f"Audio duration ({duration}) is outside the allowed "
                    f"{self._MIN_AUDIO_SECONDS}-{self._MAX_AUDIO_SECONDS}s range "
                    f"for the voice: {voice_id}"
                )

            logger.debug(f"✓ Final TTS audio generated ({duration:.2f}s): {audio_url}")
            return audio_url

        except Exception as e:
            logger.error(f"Failed to generate TTS audio: {e}")
            raise A2EAPIError(f"Failed to generate TTS audio: {e}") from e

    async def generate_talking_video(
        self,
        avatar_id: str,
        audio_url: str,
        title: str = "AI Avatar Video",
        anchor_type: int = 0,
        resolution: int = 1080,
        background_color: Tuple[int, int, int] = (0, 255, 0),
        aspect_ratio: str = "9:16"
    ) -> Dict[str, Any]:
        """
        Generate talking video with green screen background

        Args:
            avatar_id: Avatar ID from get_avatars_with_bg_support()
            audio_url: URL of audio file
            title: Video title
            anchor_type: 0=system, 1=custom
            resolution: Video resolution (720, 1080); currently not sent to
                the API
            background_color: RGB tuple for background (default: green screen)
            aspect_ratio: Video aspect ratio ("9:16", "16:9", etc.); currently
                not sent to the API

        Returns:
            Dict with video_url, task_id, and other details

        Raises:
            A2EAPIError: If video generation fails
        """
        try:
            if get_config_value("test_automation"):
                return {
                    "video_url": "testData/Green Screen Avatar Video3.mp4"
                }

            logger.debug("Generating talking video with green screen background...")

            # Convert RGB tuple to rgba() format string (alpha = 1 for fully opaque)
            red, green, blue = background_color[0], background_color[1], background_color[2]
            bg_color_str = f"rgba({red},{green},{blue},1)"

            # Reference request captured from the A2E web client:
            # {
            #     "title": "11-09-2025 16:53:47",
            #     "isSkipRs": true,
            #     "isAllowReverse": true,
            #     "resolution": 1080,
            #     "isSubtitleEnabled": false,
            #     "anchor_id": "67209ea6a9050edd960698c0",
            #     "anchor_type": 0,
            #     "msg": "Hello Ther.",
            #     "tts_id": "66ca504f2732c24634224075",
            #     "speech_rate": 1.2,
            #     "audioSrc": "https://1day-tos.a2e.com.cn/tts3party_cache/stable/el/Aria/speed/a15fe669931864094b03fe17b7b2067d.wav",
            #     "color": "rgba(0,255,0,1)",
            #     "web_bg_width": 202.5,
            #     "web_bg_height": 360,
            #     "web_people_max_width": 202.5,
            #     "web_people_max_height": 360,
            #     "web_people_width": 600,
            #     "web_people_height": 360,
            #     "web_people_x": -198.75,
            #     "web_people_y": 0
            # }
            #
            # Fields deliberately not sent: "resolution",
            # "anchor_background_color" and the "web_bg_*" / "web_people_*"
            # layout values.
            payload = {
                "title": title,
                "anchor_id": avatar_id,
                "anchor_type": anchor_type,
                "audioSrc": audio_url,
                "color": bg_color_str,  # e.g. "rgba(0,255,0,1)" for green screen
                "isSkipRs": True  # Fast mode
            }

            data = await self._make_request(
                "POST",
                "/api/v1/video/generate",
                json=payload
            )

            task_id = data.get("data", {}).get("_id")
            if not task_id:
                raise A2EAPIError("No task ID in response")

            logger.debug(f"✓ Video generation started. Task ID: {task_id}")

            # Poll for completion
            return await self._poll_video_status(task_id)

        except Exception as e:
            logger.error(f"Failed to generate talking video: {e}")
            raise A2EAPIError(f"Failed to generate talking video: {e}") from e

    async def _poll_video_status(
        self,
        task_id: str,
        max_wait: int = 600,
        poll_interval: int = 5
    ) -> Dict[str, Any]:
        """
        Poll video generation status until complete

        Args:
            task_id: Video task ID (_id from /api/v1/video/generate)
            max_wait: Maximum wait time in seconds
            poll_interval: Polling interval in seconds

        Returns:
            Dict with video_url and metadata

        Raises:
            A2EAPIError: If video generation fails or times out
        """
        endpoint = "/api/v1/video/awsResult"
        # Monotonic clock: the deadline must not move if the wall clock is adjusted
        deadline = time.monotonic() + max_wait

        logger.debug(f"Polling video status (max wait: {max_wait}s)...")

        while time.monotonic() < deadline:
            try:
                # POST request with _id in body
                data = await self._make_request(
                    "POST",
                    endpoint,
                    json={"_id": task_id}
                )

                # Reference response:
                # {
                #     "code": 0,
                #     "data": [
                #         {
                #             "_id": "691079f48ed2d40047d06575",
                #             "title": "11-09-2025 16:53:47",
                #             "msg": "Hello Ther.",
                #             "result": "",
                #             "process": 18,
                #             "status": "process",
                #             "anchor_type": "0",
                #             "end_success_date": null,
                #             "web_bg_width": 1080,
                #             "web_bg_height": 1920,
                #             "color": "rgba(0,255,0,1)",
                #             "result_cover": "",
                #             "video_cover": "https://prod-tos.avatar2everyone.com/adam2eve/stable/custom_avatar_cover/003e035d-69e8-4ea4-bf12-bf09184a33bb.jpeg",
                #             "anchor_id": "67209ea6a9050edd960698c0",
                #             "createdAt": "2025-11-09T11:24:36.029Z",
                #             "diff_s": 197,
                #             "current_date": "2025-11-09 19:27:53",
                #             "remainingDays": 30,
                #             "expirationDate": "2025-12-09T11:24:36.029Z",
                #             "isExpired": false,
                #             "expirationDays": 30
                #         }
                #     ],
                #     "trace_id": "bc04c4ec-5660-4931-a588-d952a13a3a58"
                # }

                # Response has data as an array
                data_array = data.get("data", [])
                if not data_array:
                    logger.warning("Empty response data, retrying...")
                    await asyncio.sleep(poll_interval)
                    continue

                result_data = data_array[0]  # Get first item
                status = result_data.get("status")
                process = result_data.get("process", 0)

                if status == VideoStatus.SUCCESS.value:
                    video_url = result_data.get("result")
                    if not video_url:
                        raise A2EAPIError("Video completed but no URL provided")

                    logger.debug(f"✓ Video generation completed: {video_url}")
                    return {
                        "video_url": video_url,
                        "task_id": task_id,
                        "status": status,
                        "process": process,
                        "created_at": result_data.get("createdAt"),
                        "anchor_id": result_data.get("anchor_id")
                    }

                if status == VideoStatus.FAIL.value:
                    error_msg = result_data.get("msg", "Unknown error")
                    raise A2EAPIError(f"Video generation failed: {error_msg}")

                # Status can be: init, start, pending, process, copy
                logger.debug(f"⏳ Status: {status} ({process}%)...")
                await asyncio.sleep(poll_interval)

            except A2EAPIError:
                raise
            except Exception as e:
                logger.warning(f"Error polling status: {e}")
                await asyncio.sleep(poll_interval)

        raise A2EAPIError(f"Video generation timed out after {max_wait} seconds")

    async def download_video(
        self,
        video_url: str,
        filename: Optional[str] = None
    ) -> Path:
        """
        Download video to /tmp directory

        Args:
            video_url: URL of the video to download
            filename: Optional custom filename (default: auto-generated)

        Returns:
            Path to downloaded file

        Raises:
            A2EAPIError: If download fails
        """
        filepath: Optional[Path] = None
        try:
            if not filename:
                timestamp = int(time.time())
                filename = f"avatar_video_{timestamp}.mp4"

            filepath = self.tmp_dir / filename

            logger.debug(f"Downloading video to {filepath}...")

            async with aiohttp.ClientSession() as session:
                async with session.get(
                    video_url, timeout=aiohttp.ClientTimeout(total=300)
                ) as response:
                    response.raise_for_status()

                    async with aiofiles.open(filepath, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)

            file_size = filepath.stat().st_size / (1024 * 1024)  # MB
            logger.debug(f"✓ Video downloaded successfully ({file_size:.2f} MB): {filepath}")

            return filepath

        except Exception as e:
            logger.error(f"Failed to download video: {e}")
            # Do not leave a truncated file behind for callers to pick up
            if filepath is not None:
                try:
                    filepath.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove partial download {filepath}: {cleanup_error}")
            raise A2EAPIError(f"Failed to download video: {e}") from e

    async def get_all_videos(self) -> List[Dict[str, Any]]:
        """
        Get list of all generated videos

        Requests the first page with a page size of 300.

        Returns:
            List of video objects with metadata
        """
        try:
            logger.debug("Fetching all generated videos...")

            data = await self._make_request(
                "GET",
                "/api/v1/video/awsList?current=1&pageSize=300",
            )
            logger.debug(data)
            videos = data.get("data", {}).get("data", [])
            logger.debug(f"Found {len(videos)} videos")

            return videos

        except Exception as e:
            logger.error(f"Failed to fetch videos: {e}")
            raise

    async def delete_video(self, video_id: str, title: str = "") -> bool:
        """
        Delete a single video by ID

        Args:
            video_id: Video task ID (_id)
            title: Video title (for logging)

        Returns:
            True if deleted successfully, False otherwise
        """
        label = title or video_id
        try:
            logger.debug(f"Deleting video: {label}")

            await self._make_request(
                "DELETE",
                f"/api/v1/video/{video_id}",
            )

            logger.debug(f"✓ Successfully deleted: {label}")
            return True

        except Exception as e:
            logger.error(f"✗ Failed to delete {label}: {e}")
            return False

    async def delete_all_videos(self) -> Dict[str, int]:
        """
        Delete all generated avatar videos

        Videos are deleted one at a time with a short pause between calls.
        Entries without an ``_id`` are counted as failed.

        Returns:
            Dict with deletion statistics ("total", "deleted", "failed")
        """
        logger.debug("=" * 60)
        logger.debug("DELETING ALL AVATAR VIDEOS")
        logger.debug("=" * 60)

        try:
            # Get all videos
            videos = await self.get_all_videos()

            if not videos:
                logger.debug("No videos to delete")
                return {"total": 0, "deleted": 0, "failed": 0}

            logger.debug(f"\nPreparing to delete {len(videos)} videos...")

            # Delete each video
            deleted_count = 0
            failed_count = 0

            for i, video in enumerate(videos, 1):
                video_id = video.get("_id")
                title = video.get("title", "Unknown")
                status = video.get("status", "Unknown")

                logger.debug(
                    f"\n[{i}/{len(videos)}] Processing: {video_id} {title} (Status: {status})"
                )

                if not video_id:
                    logger.warning(f"Skipping video without ID: {title}")
                    failed_count += 1
                    continue

                if await self.delete_video(video_id, title):
                    deleted_count += 1
                else:
                    failed_count += 1

                # Small delay to avoid rate limiting
                await asyncio.sleep(0.5)

            # Summary
            logger.debug("\n" + "=" * 60)
            logger.debug("DELETION SUMMARY")
            logger.debug("=" * 60)
            logger.debug(f"Total videos: {len(videos)}")
            logger.debug(f"✓ Successfully deleted: {deleted_count}")
            logger.debug(f"✗ Failed: {failed_count}")
            logger.debug("=" * 60)

            # NOTE: a stray exit() used to sit here; it terminated the whole
            # process and made this return unreachable.
            return {
                "total": len(videos),
                "deleted": deleted_count,
                "failed": failed_count
            }

        except Exception as e:
            logger.error(f"Failed to delete all videos: {e}")
            raise


async def create_greenscreen_video_workflow(
    api_key: str,
    tts_script: str,
    image_prompt: str,
    output_filename: Optional[str] = None,
    preferred_gender: str = "female",
    preferred_language: str = "en-US",
    speed_rate: float = 1.2,
    title: str = "Green Screen Avatar Video"
) -> Tuple[str, str]:
    """
    Complete workflow to create a green screen talking video

    Args:
        api_key: A2E API key
        tts_script: Text for avatar to speak
        image_prompt: Prompt describing the accompanying imagery; passed to
            the avatar/voice selection model
        output_filename: Optional output filename (currently unused; the
            video is not downloaded by this workflow)
        preferred_gender: Preferred avatar/voice gender (currently unused)
        preferred_language: Preferred voice language, forwarded to
            generate_tts_audio as country/region
        speed_rate: Speech speed multiplier
        title: Title given to the generated video

    Returns:
        Tuple of (audio_url, video_url)

    Raises:
        A2EAPIError: If any step of the workflow fails
    """
    logger.debug("=" * 60)
    logger.debug("GREEN SCREEN TALKING VIDEO WORKFLOW")
    logger.debug("=" * 60)

    generator = TalkingVideoGenerator(a2e_api_key=api_key)

    try:
        # Step 1: Get avatars with background support
        logger.debug("\n[STEP 1] Fetching avatars with background support...")
        if get_config_value("delete_all_a2e_videos", False):
            await generator.delete_all_videos()

        avatars = await generator.get_avatars_with_bg_support()

        if not avatars:
            raise A2EAPIError("No avatars with background support found")

        # Step 2: Get available voices
        logger.debug("\n[STEP 2] Fetching available voices...")
        voices: List[VoiceInfo] = []
        for voice_region in ("US", "AU", "GB"):
            voices.extend(await generator.get_available_voices(
                country="en",
                region=voice_region
            ))

        if not voices:
            raise A2EAPIError("No voices available")

        # Step 3: Select avatar and voice
        logger.debug("\n[STEP 3] Selecting avatar and voice...")
        avatar_id, voice_id = generator.select_voice_n_avatar(
            avatars=avatars,
            voices=voices,
            tts_script=tts_script,
            image_prompt=image_prompt
        )

        if not avatar_id or not voice_id:
            raise A2EAPIError("Failed to select avatar or voice")

        # Step 4: Generate TTS audio
        logger.debug("\n[STEP 4] Generating TTS audio...")
        has_region = '-' in preferred_language
        audio_url = await generator.generate_tts_audio(
            tts_script=tts_script,
            voice_id=voice_id,
            voices=voices,
            speed_rate=speed_rate,
            country=preferred_language.split('-')[0] if has_region else "en",
            region=preferred_language.split('-')[1] if has_region else "US"
        )

        # Step 5: Generate talking video with green screen
        logger.debug("\n[STEP 5] Generating talking video with green screen...")
        video_result = await generator.generate_talking_video(
            avatar_id=avatar_id,
            audio_url=audio_url,
            title=title
        )

        video_url = video_result.get("video_url")
        if not video_url:
            raise A2EAPIError("No video URL in result")

        logger.debug("\n")
        logger.debug("=" * 60)
        logger.debug("SUCCESS!")
        logger.debug("=" * 60)

        return audio_url, video_url

    except Exception as e:
        logger.error(f"✗ Workflow failed: {e}")
        raise


# Example usage
if __name__ == "__main__":
    # Script for the avatar to speak
    SCRIPT = """
    Hello and welcome! This is a demonstration of AI-powered avatar
    video generation with green screen background for easy video
    overlay and compositing.
    """

    # Context for avatar/voice selection (required by the workflow)
    IMAGE_PROMPT = "A friendly presenter introducing an AI avatar video demo"

    # Run workflow
    try:
        audio_url, video_url = asyncio.run(
            create_greenscreen_video_workflow(
                api_key=get_config_value("a2e_api_key"),
                tts_script=SCRIPT,
                image_prompt=IMAGE_PROMPT,
                output_filename="avatar_greenscreen.mp4",
                preferred_gender="female",
                preferred_language="en-US",
                speed_rate=1.2
            )
        )

        print(f"\n✓ Video ready at: {video_url}")

    except Exception as e:
        print(f"\n✗ Error: {e}")
        import traceback
        traceback.print_exc()