Tools / src /a2e_avatar.py
jebin2's picture
refactor: Centralize logger import to src.logger_config across various modules.
f20025d
import asyncio
import aiohttp
import aiofiles
import requests
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path
import time
from dataclasses import dataclass
from enum import Enum
import os
from src.logger_config import logger
from google_src import ai_studio_sdk
import json_repair
from src.config import get_config_value, set_config_value
from moviepy.editor import AudioFileClip
import uuid
import json
class VideoStatus(Enum):
    """Status strings a video generation task can report."""

    # In-progress states.
    INIT = "init"
    START = "start"
    PENDING = "pending"
    PROCESS = "process"
    COPY = "copy"

    # Finished states: the task either produced a result or failed.
    SUCCESS = "success"
    FAIL = "fail"
@dataclass
class AvatarInfo:
    """Plain record describing one avatar usable for video generation."""

    # Identifier of the avatar.
    avatar_id: str
    # Display name of the avatar.
    name: str
    # Gender label attached to the avatar.
    gender: str
    # True when the avatar's background can be removed/replaced.
    supports_bg_removal: bool
    # Background colour string of the avatar, when one is known.
    background_color: Optional[str] = None
    # Avatar category; "custom" unless stated otherwise.
    type: str = "custom"
@dataclass
class VoiceInfo:
    """Plain record describing one text-to-speech voice."""

    # Identifier of the voice.
    voice_id: str
    # Display name (label) of the voice.
    name: str
    # Language tag such as "en-US".
    language: str
    # Gender label attached to the voice.
    gender: str
    # Voice category; "public" unless stated otherwise.
    type: str = "public"
class A2EAPIError(Exception):
    """Raised for any failure while talking to the A2E API."""
class TalkingVideoGenerator:
"""
Production-ready talking video generator using A2E.ai APIs
Supports green screen background for video overlay workflows
"""
def __init__(
    self,
    a2e_api_key: str,
    base_url: str = "https://video.a2e.ai",
    timeout: int = 100,
    max_retries: int = 3,
):
    """
    Set up the generator's connection settings.

    Args:
        a2e_api_key: A2E API authentication key, sent as a Bearer token
        base_url: Base URL that endpoints are appended to
        timeout: Per-request timeout in seconds
        max_retries: Number of attempts made for a failing request
    """
    self.api_key, self.base_url = a2e_api_key, base_url
    self.timeout, self.max_retries = timeout, max_retries

    # Headers attached to every API request.
    request_headers = {}
    request_headers["Authorization"] = f"Bearer {self.api_key}"
    request_headers["Content-Type"] = "application/json"
    request_headers["x-lang"] = "en-US"
    self.headers = request_headers

    # Downloads are written below /tmp; create it if it is missing.
    scratch_dir = Path("/tmp")
    scratch_dir.mkdir(exist_ok=True)
    self.tmp_dir = scratch_dir

    logger.debug("TalkingVideoGenerator initialized")
async def _make_request(
    self,
    method: str,
    endpoint: str,
    **kwargs
) -> Dict[str, Any]:
    """
    Make an HTTP request against the A2E API with retry logic.

    Transport failures (aiohttp client errors and request timeouts) are
    retried up to ``self.max_retries`` times with exponential backoff
    (1s, 2s, 4s, ...). HTTP error statuses and API-level errors
    (``code != 0`` in the JSON body) are NOT retried.

    Args:
        method: HTTP method (GET, POST, DELETE)
        endpoint: API endpoint, appended to ``self.base_url``
        **kwargs: Additional parameters forwarded to ``session.request``

    Returns:
        Response JSON data

    Raises:
        A2EAPIError: If the request fails after retries, the server
            answers with an HTTP status >= 400, or the API reports an error
    """
    url = f"{self.base_url}{endpoint}"

    for attempt in range(self.max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                # Log request details for debugging
                if kwargs.get('json'):
                    logger.debug(f"Request to {url}")
                    logger.debug(f"Payload: {kwargs['json']}")

                async with session.request(
                    method=method,
                    url=url,
                    headers=self.headers,
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                    **kwargs
                ) as response:
                    # Read the body as text first so error messages can
                    # include it even when it is not valid JSON.
                    response_text = await response.text()

                    if response.status >= 400:
                        logger.error(f"HTTP {response.status} Error")
                        logger.error(f"Response: {response_text}")
                        # Prefer the API's own "msg" field when the error
                        # body is JSON; otherwise fall back to the raw text.
                        # `except Exception` (not a bare except) so task
                        # cancellation is never swallowed here.
                        try:
                            error_data = await response.json()
                            error_msg = error_data.get("msg", response_text)
                        except Exception:
                            error_msg = response_text
                        raise A2EAPIError(f"HTTP {response.status}: {error_msg}")

                    data = await response.json()
                    if data.get("code") != 0:
                        error_msg = data.get("msg", "Unknown error")
                        logger.error(f"API error: {error_msg}")
                        raise A2EAPIError(f"API error: {error_msg}")
                    return data

        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # A total-timeout expiry surfaces as asyncio.TimeoutError, which
            # is not an aiohttp.ClientError; treat it as retryable as well.
            # Its str() is empty, so fall back to the exception class name.
            detail = str(e) or type(e).__name__
            logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {detail}")
            if attempt == self.max_retries - 1:
                raise A2EAPIError(
                    f"Request failed after {self.max_retries} attempts: {detail}"
                ) from e
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except A2EAPIError:
            # Already in its final form; do not wrap or retry.
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise A2EAPIError(f"Unexpected error: {e}") from e

    # Only reachable when max_retries <= 0 (the loop body never ran).
    raise A2EAPIError(
        f"Request not attempted: max_retries must be >= 1 (got {self.max_retries})"
    )
def _make_sync_request(
    self,
    method: str,
    endpoint: str,
    **kwargs
) -> Dict[str, Any]:
    """
    Blocking counterpart of the async request helper, built on `requests`.

    Any `requests.RequestException` (including the one raised by
    `raise_for_status()` for HTTP error statuses) triggers a retry with
    exponential backoff until `self.max_retries` attempts were made.

    Args:
        method: HTTP method (GET, POST, DELETE)
        endpoint: API endpoint, appended to `self.base_url`
        **kwargs: Extra keyword arguments handed to `requests.request`

    Returns:
        The decoded JSON body

    Raises:
        A2EAPIError: When the API reports `code != 0`, or when every
            attempt failed
    """
    target = f"{self.base_url}{endpoint}"
    final_attempt = self.max_retries - 1

    for attempt_no in range(self.max_retries):
        try:
            reply = requests.request(
                method=method,
                url=target,
                headers=self.headers,
                timeout=self.timeout,
                **kwargs
            )
            reply.raise_for_status()
            body = reply.json()

            if body.get("code") == 0:
                return body

            # API-level failure: surfaced immediately, never retried.
            error_msg = body.get("msg", "Unknown error")
            raise A2EAPIError(f"API error: {error_msg}")
        except requests.RequestException as exc:
            logger.warning(f"Request failed (attempt {attempt_no + 1}/{self.max_retries}): {exc}")
            if attempt_no == final_attempt:
                raise A2EAPIError(f"Request failed after {self.max_retries} attempts: {exc}")
            # Back off 1s, 2s, 4s, ... before the next attempt.
            time.sleep(2 ** attempt_no)
async def get_avatars_with_bg_support(self) -> List[AvatarInfo]:
    """
    Get the allow-listed avatars that support background removal/replacement.

    An avatar qualifies when the API record carries a non-blank
    ``background_color`` or ``background_img`` AND its name is on the
    hard-coded allow-list below. When the ``test_automation`` config flag is
    set, a single stub avatar is returned and no request is made.

    Returns:
        List of AvatarInfo objects with background support

    Raises:
        A2EAPIError: If the avatar list cannot be fetched or parsed
    """
    # Only these avatars are ever offered for selection.
    allowed_names = (
        "Mimi", "Katherine", "Samantha", "Emily", "Nova", "Grace",
        "Julia", "Lijun Deng", "Maneli #2", "Jimmy",
    )

    try:
        if get_config_value("test_automation"):
            return [AvatarInfo(
                avatar_id="6271fa09438162bd7e59b8e5",
                name="Sara",
                gender="female",
                supports_bg_removal=True,
                # Same rgba(...) format the video payload uses for colours.
                background_color="rgba(0,255,0,1)",
                type="custom"
            )]

        logger.debug("Fetching avatars with background support...")

        # Get all custom avatars
        data = await self._make_request(
            "GET",
            "/api/v1/anchor/character_list"
        )

        avatars = []
        for avatar_data in data.get("data", []):
            # Background support is signalled by a non-empty colour or image;
            # blank strings (not just None) count as "no support".
            bg_color = avatar_data.get("background_color", "")
            bg_img = avatar_data.get("background_img", "")
            has_bg_support = bool(bg_color and bg_color.strip()) or bool(bg_img and bg_img.strip())

            name = avatar_data.get("name", "Unknown")
            if not has_bg_support or name not in allowed_names:
                continue

            avatars.append(AvatarInfo(
                avatar_id=avatar_data.get("_id"),
                name=name,
                gender=avatar_data.get("gender", "female"),
                supports_bg_removal=True,
                background_color=avatar_data.get("background_color"),
                type=avatar_data.get("type", "custom")
            ))

        logger.debug(f"Total avatars with BG support: {len(avatars)}")
        return avatars
    except Exception as e:
        logger.error(f"Failed to fetch avatars: {e}")
        raise A2EAPIError(f"Failed to fetch avatars: {e}") from e
async def get_available_voices(
    self,
    country: Optional[str] = None,
    region: Optional[str] = None,
    voice_map_type: str = "en-US"
) -> List[VoiceInfo]:
    """
    Get list of available TTS voices, minus a hard-coded deny-list.

    A voice is dropped when its label contains any deny-listed term
    (case-insensitive substring match). When the ``test_automation`` config
    flag is set, a single stub voice is returned and no request is made.

    Args:
        country: Country code (e.g., "en")
        region: Region code (e.g., "US")
        voice_map_type: Language for labels ("en-US" or "zh-CN")

    Returns:
        List of VoiceInfo objects. ``language`` is built from the requested
        country/region (defaulting to "en"/"US"), not read from the response.

    Raises:
        A2EAPIError: If the voice list cannot be fetched or parsed
    """
    try:
        if get_config_value("test_automation"):
            return [VoiceInfo(
                voice_id="669b7015ba04042f4090b288",
                name="Jessica",
                language="en-US",
                gender="female",
                type="public"
            )]

        logger.debug("Fetching available voices...")

        params = {"voice_map_type": voice_map_type}
        if country:
            params["country"] = country
        if region:
            params["region"] = region

        data = await self._make_request(
            "GET",
            "/api/v1/anchor/voice_list",
            params=params
        )

        disallowed = ["ASMR", "John Doe - Intimate", "Ava Multilingual", "ShanShan Chinese Accent", "august", "Emma Multilingual", "WF Texas NPR Storyteller", "Andrew Multilingual", "Alice", "Jessica"]
        # Lower-case the deny-list once instead of once per voice.
        disallowed_lower = [term.lower() for term in disallowed]
        language = f"{country or 'en'}-{region or 'US'}"

        voices = []
        # The response groups voices by gender; each group lists its voices
        # under "children".
        for gender_group in data.get("data", []):
            gender = gender_group.get("value", "unknown")
            for voice_data in gender_group.get("children", []):
                # Tolerate a missing/None label, and store the stripped form
                # so later exact-name matching is not defeated by whitespace.
                label = (voice_data.get("label") or "Unknown").strip()
                label_lower = label.lower()
                if any(term in label_lower for term in disallowed_lower):
                    continue
                voices.append(VoiceInfo(
                    voice_id=voice_data.get("value"),
                    name=label,
                    language=language,
                    gender=gender,
                    type="public"
                ))

        logger.debug(f"Total voices available: {len(voices)}")
        return voices
    except Exception as e:
        logger.error(f"Failed to fetch voices: {e}")
        raise A2EAPIError(f"Failed to fetch voices: {e}") from e
def select_voice_n_avatar(
    self,
    avatars: List[AvatarInfo],
    voices: List[VoiceInfo],
    tts_script: str,
    image_prompt: str
) -> Tuple[Optional[str], Optional[str]]:
    """
    Ask the model to pick the avatar and voice that best fit the content.

    The system prompt is read from
    ``src/prompt/avatar_n_voice_selection_a2e.md`` and sent together with the
    script, the image prompt, the voice names and the avatar names with their
    usage counts. The reply is parsed with ``json_repair`` and matched
    case-insensitively against ``avatars`` / ``voices``; when a name cannot be
    matched, the first entry of the respective list is used instead. The
    chosen avatar's counter in the ``avatar_usage_count`` config value is
    incremented.

    Args:
        avatars: Candidate avatars
        voices: Candidate voices
        tts_script: Text the avatar will speak
        image_prompt: Image prompt accompanying the script

    Returns:
        ``(avatar_id, voice_id)``; an element is None when the corresponding
        candidate list is empty.
    """
    logger.debug(f"Selecting avatar and voice for image_prompt: {image_prompt}")
    with open("src/prompt/avatar_n_voice_selection_a2e.md", "r", encoding="utf-8") as file:
        system_prompt = file.read()

    # Read the usage counters once; `or {}` covers a stored None value.
    avatar_usage = get_config_value("avatar_usage_count", {}) or {}

    if get_config_value("test_automation"):
        # Fixed catalogue used for test runs.
        available_voices = ['Aria - Sexy Female Villain Voice', 'ASMR Dr Lovejoy', 'august', 'Ava Multilingual', 'Emma Multilingual', 'Jessica', 'Laura', 'Natasha - Sensual Hypnosis', 'Nia African', 'SA Brown African', 'Sarah', 'Shannon ASMR', 'ShanShan Chinese Accent', 'Stacy Chinese Accent', 'Andrew Multilingual', 'Brian', 'Brian Multilingual', 'Callum', 'Charlie', 'Chris', 'Damon', 'Daniel', 'Eric', 'Jamal (African American)', 'John Doe - Intimate', 'Josh', 'Liam', 'neuris', 'Road Dawg African', 'Timmy', 'Tony', 'WF Texas NPR Storyteller', 'Will']
        available_avatar = ['10-19-2025 11:59:41', '2025-09-17 16:34:53', '08-24-2025 07:19:17', 'Mimi', 'Harper', 'Nick2', 'Victoria', 'Margaret', 'Katherine', 'Elizabeth', 'Emma', 'Lily', 'Mia', 'Lauren', 'Samantha', 'Emily', 'Nova', 'Grace', 'Alex', 'Taylor', 'Alexandra', 'Jessica', 'Ava', 'Ella', 'Julia', 'Mido', 'Kieran', 'Summer', 'Brooke', 'River', 'Skye', 'Autumn', 'Holly', 'Mr.P', 'Ashley', 'Xiaomei', 'Xiaoai', 'Na Li (Frontal View)', 'Na Li (side view)', 'Albert Einstein', 'Lijun Deng', 'Ella', 'Ruoxi', 'Chenyu', 'Jingyi', 'Yahan', 'Shiya', 'Ruotong', 'Muqing', 'Wanqing', 'Jingshu', 'Yutong', 'Zhiyan', 'Feiyili', 'Shanxichen', 'Tainannan', 'YIshuhui', 'Huagulan', 'Jiaxuan', 'Qinlan', 'Qingwan', 'Luyao', 'Ximeng', 'Zixuan', 'Yuhan', 'Phoenix', 'Yanxi', 'Chuxuan', 'Xiyuan', 'Shinuo', 'Xueyan', 'Chenxi', 'Lingxiao', 'Qiya', 'Ruolan', 'Yuning', 'Xingyao', 'Muqing', 'Xiaoran', 'Shutong', 'Luoyi', 'Ximeng', 'Yuxi', 'Yilian', 'LiuJun', 'Li Shi Zhen', 'Jimmy', 'Maneli #2', 'Maneli #1', 'Elsa', 'Jenny', 'Amber', 'Sara']
        available_avatar = [{
            "name": ava,
            "Usage Count": 0
        } for ava in available_avatar]
    else:
        available_voices = [voc.name for voc in voices]
        available_avatar = [{
            "name": ava.name,
            "Usage Count": avatar_usage.get(ava.avatar_id, 0)
        } for ava in avatars]

    model_input = f"""SYSTEM INSTRUCTION::
{system_prompt}
USER PROMPT:
TTS Script: {tts_script}
Image Prompt: {image_prompt}
Available Voices: {available_voices}
Available Avatars with Usage Count: {available_avatar}
"""
    response = ai_studio_sdk.generate(model_input)
    selection = json_repair.loads(response.strip())
    if not isinstance(selection, dict):
        # Unusable reply: fall through to the first-entry fallbacks below.
        logger.warning(f"Unexpected selection response, using fallbacks: {selection!r}")
        selection = {}

    def _pick(candidates, wanted):
        """Return the first candidate whose name equals `wanted`, else the first candidate, else None."""
        wanted_name = str(wanted or "").strip().lower()
        for candidate in candidates:
            if candidate.name.strip().lower() == wanted_name:
                return candidate
        return candidates[0] if candidates else None

    selected_avatar = _pick(avatars, selection.get("selected_avatar"))
    selected_voice = _pick(voices, selection.get("selected_voice"))

    # Record usage only when an avatar was actually chosen.
    if selected_avatar is not None:
        avatar_usage[selected_avatar.avatar_id] = avatar_usage.get(selected_avatar.avatar_id, 0) + 1
        set_config_value("avatar_usage_count", avatar_usage)

    avatar_id = selected_avatar.avatar_id if selected_avatar else None
    voice_id = selected_voice.voice_id if selected_voice else None

    if avatar_id and voice_id:
        logger.debug(f"Selected avatar: {selected_avatar.name} (ID: {avatar_id})")
        logger.debug(f"Selected voice: {selected_voice.name} (ID: {voice_id})")
    else:
        logger.warning("Could not select appropriate avatar or voice")

    return avatar_id, voice_id
async def _get_audio_duration(self, audio_url: str) -> float:
    """
    Download the audio temporarily and measure duration using AudioFileClip.

    Args:
        audio_url: URL of the audio file to measure

    Returns:
        Duration of the audio as reported by ``AudioFileClip.duration``

    Raises:
        A2EAPIError: If the download does not answer with HTTP 200
    """
    import tempfile

    async with aiohttp.ClientSession() as session:
        async with session.get(audio_url) as resp:
            if resp.status != 200:
                raise A2EAPIError(f"Failed to fetch audio: {resp.status}")
            audio_bytes = await resp.read()

    def get_duration():
        with AudioFileClip(tmp_path) as clip:
            return clip.duration

    # AudioFileClip needs a real file on disk. delete=False because the file
    # is reopened by name after this handle is closed; it is removed below.
    tmp_file = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    tmp_path = tmp_file.name
    try:
        with tmp_file:
            tmp_file.write(audio_bytes)
        # Probing the file is blocking work; keep it off the event loop.
        return await asyncio.to_thread(get_duration)
    finally:
        # Always clean up, even when writing or probing raised.
        try:
            os.remove(tmp_path)
        except OSError as cleanup_error:
            logger.warning(f"Could not remove temporary audio file {tmp_path}: {cleanup_error}")
def _get_voice_language_info(self, voice_id: str, voices: List[VoiceInfo]) -> Tuple[str, str]:
"""
Get country and region from voice_id by looking up in voices list.
Falls back to en-US if not found.
"""
for voice in voices:
if voice.voice_id == voice_id:
# Parse language string like "en-US" -> ("en", "US")
parts = voice.language.split('-')
if len(parts) == 2:
return parts[0], parts[1]
return "en", "US" # Fallback
# If voice not found, default to en-US
logger.warning(f"Voice ID {voice_id} not found in voices list, defaulting to en-US")
return "en", "US"
async def generate_tts_audio(
    self,
    tts_script: str,
    voice_id: str,
    voices: List[VoiceInfo],
    speed_rate: float = 1.2,
    country: str = "en",
    region: str = "US",
    min_duration: float = 11.0
) -> str:
    """
    Generate TTS audio. If audio < min_duration, retry with speed_rate=1.0.

    The final audio must be between 10 and 15 seconds long, otherwise the
    call fails. When the ``test_automation`` config flag is set, a fixed test
    file path is returned and no request is made.

    Args:
        tts_script: Text to synthesize
        voice_id: Voice to synthesize with
        voices: Known voices; used to look up the voice's country/region
        speed_rate: Speech rate for the first attempt
        country: Accepted for compatibility; the value actually sent is
            derived from ``voice_id`` via ``_get_voice_language_info``
        region: Accepted for compatibility; overridden like ``country``
        min_duration: Audio shorter than this triggers one retry at rate 1.0
            (only when ``speed_rate`` is above 1.0)

    Returns:
        URL of the generated audio

    Raises:
        A2EAPIError: If generation fails or the audio length is out of range
    """
    # Hard acceptance window (seconds) for the final audio.
    shortest_accepted = 10
    longest_accepted = 15

    try:
        if get_config_value("test_automation"):
            return "testData/Green Screen Avatar Video3.mp3"

        # The voice's own language wins over the country/region arguments.
        country, region = self._get_voice_language_info(voice_id, voices)
        logger.debug(country)
        logger.debug(region)

        # First attempt at the requested rate; one slower retry is available
        # only when the requested rate is faster than normal speed.
        rates = [speed_rate]
        if speed_rate > 1.0:
            rates.append(1.0)

        for index, rate in enumerate(rates):
            logger.debug(f"Generating TTS audio (speed: {rate}x)...")

            payload = {
                "msg": tts_script,
                "tts_id": voice_id,
                "speechRate": rate,
                "country": country,
                "region": region
            }

            data = await self._make_request(
                "POST",
                "/api/v1/video/send_tts",
                json=payload
            )

            audio_url = data.get("data", "")
            if not audio_url:
                raise A2EAPIError("No audio URL in response")

            # Measure duration
            duration = await self._get_audio_duration(audio_url)
            logger.debug(f"Speech duration: {duration:.2f}s")

            has_slower_rate = index < len(rates) - 1
            if duration < min_duration and has_slower_rate:
                logger.warning(
                    f"Audio too short ({duration:.2f}s < {min_duration}s). Retrying with slower rate..."
                )
                continue
            break

        if duration < shortest_accepted:
            raise A2EAPIError(
                f"Audio is too short ({duration:.2f}s < {shortest_accepted}s) for the voice: {voice_id}"
            )
        if duration > longest_accepted:
            raise A2EAPIError(
                f"Audio is too long ({duration:.2f}s > {longest_accepted}s) for the voice: {voice_id}"
            )

        logger.debug(f"✓ Final TTS audio generated ({duration:.2f}s): {audio_url}")
        return audio_url

    except Exception as e:
        logger.error(f"Failed to generate TTS audio: {e}")
        raise A2EAPIError(f"Failed to generate TTS audio: {e}") from e
async def generate_talking_video(
    self,
    avatar_id: str,
    audio_url: str,
    title: str = "AI Avatar Video",
    anchor_type: int = 0,
    resolution: int = 1080,
    background_color: Tuple[int, int, int] = (0, 255, 0),
    aspect_ratio: str = "9:16"
) -> Dict[str, Any]:
    """
    Generate talking video with green screen background

    Submits the job to ``/api/v1/video/generate`` and then polls until it
    finishes. When the ``test_automation`` config flag is set, a fixed test
    file path is returned and no request is made.

    Args:
        avatar_id: Avatar ID from get_avatars_with_bg_support()
        audio_url: URL of audio file
        title: Video title
        anchor_type: 0=system, 1=custom
        resolution: Accepted for compatibility; currently NOT sent to the API
        background_color: RGB tuple for background (default: green screen)
        aspect_ratio: Accepted for compatibility; currently NOT sent to the API

    Returns:
        Dict with video_url, task_id, and other details (in test-automation
        mode only ``video_url`` is present)

    Raises:
        A2EAPIError: If video generation fails
    """
    try:
        if get_config_value("test_automation"):
            return {
                "video_url": "testData/Green Screen Avatar Video3.mp4"
            }

        logger.debug("Generating talking video with green screen background...")

        # Convert RGB tuple to rgba() format string (alpha = 1 for fully opaque)
        bg_color_str = f"rgba({background_color[0]},{background_color[1]},{background_color[2]},1)"

        # Only the fields below are sent. A sample request kept with this
        # code additionally carried "resolution", "isAllowReverse",
        # "isSubtitleEnabled", "msg", "tts_id", "speech_rate" and the
        # "web_bg_*" / "web_people_*" layout fields; those are deliberately
        # left out here.
        payload = {
            "title": title,
            "anchor_id": avatar_id,
            "anchor_type": anchor_type,
            "audioSrc": audio_url,
            "color": bg_color_str,  # e.g. "rgba(0,255,0,1)" for green screen
            "isSkipRs": True  # Fast mode
        }

        data = await self._make_request(
            "POST",
            "/api/v1/video/generate",
            json=payload
        )

        task_id = data.get("data", {}).get("_id")
        if not task_id:
            raise A2EAPIError("No task ID in response")

        logger.debug(f"✓ Video generation started. Task ID: {task_id}")

        # Poll for completion
        return await self._poll_video_status(task_id)

    except Exception as e:
        logger.error(f"Failed to generate talking video: {e}")
        raise A2EAPIError(f"Failed to generate talking video: {e}") from e
async def _poll_video_status(
    self,
    task_id: str,
    max_wait: int = 600,
    poll_interval: int = 5
) -> Dict[str, Any]:
    """
    Poll video generation status until complete

    Args:
        task_id: Video task ID (_id from /api/v1/video/generate)
        max_wait: Maximum wait time in seconds
        poll_interval: Polling interval in seconds

    Returns:
        Dict with video_url, task_id, status, process, created_at, anchor_id

    Raises:
        A2EAPIError: If video generation fails or times out, or if a status
            request itself fails with an A2EAPIError
    """
    endpoint = "/api/v1/video/awsResult"
    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    deadline = time.monotonic() + max_wait

    logger.debug(f"Polling video status (max wait: {max_wait}s)...")

    while time.monotonic() < deadline:
        try:
            # POST request with _id in body
            data = await self._make_request(
                "POST",
                endpoint,
                json={"_id": task_id}
            )

            # "data" is an array of task records, e.g.
            #   {"code": 0, "data": [{"_id": "...", "status": "process",
            #                         "process": 18, "result": "", ...}]}
            # "result" holds the video URL once the task succeeded.
            data_array = data.get("data", [])
            if not data_array:
                logger.warning("Empty response data, retrying...")
                await asyncio.sleep(poll_interval)
                continue

            result_data = data_array[0]  # Get first item
            status = result_data.get("status")
            process = result_data.get("process", 0)

            if status == VideoStatus.SUCCESS.value:
                video_url = result_data.get("result")
                if not video_url:
                    raise A2EAPIError("Video completed but no URL provided")

                logger.debug(f"✓ Video generation completed: {video_url}")
                return {
                    "video_url": video_url,
                    "task_id": task_id,
                    "status": status,
                    "process": process,
                    "created_at": result_data.get("createdAt"),
                    "anchor_id": result_data.get("anchor_id")
                }

            if status == VideoStatus.FAIL.value:
                error_msg = result_data.get("msg", "Unknown error")
                raise A2EAPIError(f"Video generation failed: {error_msg}")

            # Status can be: init, start, pending, process, copy
            logger.debug(f"⏳ Status: {status} ({process}%)...")
            await asyncio.sleep(poll_interval)

        except A2EAPIError:
            # Definitive failures end the polling immediately.
            raise
        except Exception as e:
            # Anything else (e.g. an unexpected response shape) is treated
            # as transient: log it and poll again.
            logger.warning(f"Error polling status: {e}")
            await asyncio.sleep(poll_interval)

    raise A2EAPIError(f"Video generation timed out after {max_wait} seconds")
async def download_video(
    self,
    video_url: str,
    filename: Optional[str] = None
) -> Path:
    """
    Download video into ``self.tmp_dir`` (/tmp)

    Args:
        video_url: URL of the video to download
        filename: Optional custom filename (default:
            ``avatar_video_<unix timestamp>.mp4``)

    Returns:
        Path to downloaded file

    Raises:
        A2EAPIError: If download fails. A partially written file is removed
            before the error is raised.
    """
    filepath = None
    file_opened = False
    try:
        if not filename:
            timestamp = int(time.time())
            filename = f"avatar_video_{timestamp}.mp4"

        filepath = self.tmp_dir / filename

        logger.debug(f"Downloading video to {filepath}...")

        async with aiohttp.ClientSession() as session:
            async with session.get(video_url, timeout=aiohttp.ClientTimeout(total=300)) as response:
                response.raise_for_status()

                async with aiofiles.open(filepath, 'wb') as f:
                    # From here on the target file exists (truncated), so a
                    # failure must not leave it behind half-written.
                    file_opened = True
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)

        file_size = filepath.stat().st_size / (1024 * 1024)  # MB
        logger.debug(f"✓ Video downloaded successfully ({file_size:.2f} MB): {filepath}")

        return filepath

    except Exception as e:
        # Remove only a file this call started writing; a pre-existing file
        # that was never opened (e.g. the server answered 404) is left alone.
        if file_opened and filepath is not None:
            try:
                filepath.unlink(missing_ok=True)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove partial download {filepath}: {cleanup_error}")
        logger.error(f"Failed to download video: {e}")
        raise A2EAPIError(f"Failed to download video: {e}") from e
async def get_all_videos(self) -> List[Dict[str, Any]]:
    """
    Fetch the generated videos known to the account.

    A single page is requested (page 1, page size 300), so at most 300
    entries are returned.

    Returns:
        The video records found under ``data.data`` of the response;
        an empty list when that path is absent
    """
    try:
        logger.debug("Fetching all generated videos...")

        listing = await self._make_request(
            "GET",
            "/api/v1/video/awsList?current=1&pageSize=300",
        )
        logger.debug(listing)

        # The records are nested one level below the outer "data" object.
        found = listing.get("data", {}).get("data", [])
        logger.debug(f"Found {len(found)} videos")
    except Exception as err:
        # Log for context, then let the original error propagate untouched.
        logger.error(f"Failed to fetch videos: {err}")
        raise

    return found
async def delete_video(self, video_id: str, title: str = "") -> bool:
    """
    Remove one video; never raises for ordinary failures.

    Args:
        video_id: Video task ID (_id)
        title: Video title, used only in log messages (the ID is logged
            when the title is empty)

    Returns:
        True when the delete request succeeded, False when it raised
    """
    try:
        logger.debug(f"Deleting video: {title or video_id}")

        await self._make_request("DELETE", f"/api/v1/video/{video_id}")

        logger.debug(f"✓ Successfully deleted: {title or video_id}")
    except Exception as err:
        # Best-effort: report the failure through the return value.
        logger.error(f"✗ Failed to delete {title or video_id}: {err}")
        return False

    return True
async def delete_all_videos(self) -> Dict[str, int]:
    """
    Delete all generated avatar videos

    There is no confirmation step: every video returned by
    ``get_all_videos()`` is deleted, one request at a time.

    Returns:
        Dict with deletion statistics: ``total``, ``deleted`` and ``failed``
        (videos without an ``_id`` count as failed)

    Raises:
        Exception: Whatever ``get_all_videos()`` raised, re-raised unchanged
    """
    logger.debug("=" * 60)
    logger.debug("DELETING ALL AVATAR VIDEOS")
    logger.debug("=" * 60)

    try:
        # Get all videos
        videos = await self.get_all_videos()

        if not videos:
            logger.debug("No videos to delete")
            return {"total": 0, "deleted": 0, "failed": 0}

        logger.debug(f"\nPreparing to delete {len(videos)} videos...")

        # Delete each video
        deleted_count = 0
        failed_count = 0

        for i, video in enumerate(videos, 1):
            video_id = video.get("_id")
            title = video.get("title", "Unknown")
            status = video.get("status", "Unknown")

            logger.debug(f"\n[{i}/{len(videos)}] Processing: {video_id} {title} (Status: {status})")

            if not video_id:
                logger.warning(f"Skipping video without ID: {title}")
                failed_count += 1
                continue

            if await self.delete_video(video_id, title):
                deleted_count += 1
            else:
                failed_count += 1

            # Small delay to avoid rate limiting
            await asyncio.sleep(0.5)

        # Summary
        logger.debug("\n" + "=" * 60)
        logger.debug("DELETION SUMMARY")
        logger.debug("=" * 60)
        logger.debug(f"Total videos: {len(videos)}")
        logger.debug(f"✓ Successfully deleted: {deleted_count}")
        logger.debug(f"✗ Failed: {failed_count}")
        logger.debug("=" * 60)

        # Hand the statistics back to the caller instead of terminating the
        # interpreter, so the surrounding workflow can continue.
        return {
            "total": len(videos),
            "deleted": deleted_count,
            "failed": failed_count
        }

    except Exception as e:
        logger.error(f"Failed to delete all videos: {e}")
        raise
async def create_greenscreen_video_workflow(
    api_key: str,
    tts_script: str,
    image_prompt: str,
    output_filename: Optional[str] = None,
    preferred_gender: str = "female",
    preferred_language: str = "en-US",
    speed_rate: float = 1.2,
    title: str = "Green Screen Avatar Video"
) -> Tuple[str, str]:
    """
    Complete workflow to create a green screen talking video

    Steps: optionally delete all existing videos (when the
    ``delete_all_a2e_videos`` config flag is set), fetch avatars, fetch
    en-US / en-AU / en-GB voices, let the model select an avatar and a voice,
    generate the TTS audio, then generate the video. The video is NOT
    downloaded here.

    Args:
        api_key: A2E API key
        tts_script: Text for avatar to speak
        image_prompt: Image prompt passed to the avatar/voice selection
        output_filename: Currently unused
        preferred_gender: Currently unused
        preferred_language: Split into country/region and passed to
            ``generate_tts_audio`` (which derives the effective values from
            the selected voice)
        speed_rate: Speech speed multiplier
        title: Title given to the generated video

    Returns:
        Tuple ``(audio_url, video_url)``

    Raises:
        A2EAPIError: If any step fails or yields no usable result
    """
    logger.debug("=" * 60)
    logger.debug("GREEN SCREEN TALKING VIDEO WORKFLOW")
    logger.debug("=" * 60)

    generator = TalkingVideoGenerator(a2e_api_key=api_key)

    try:
        # Step 1: Get avatars with background support
        logger.debug("\n[STEP 1] Fetching avatars with background support...")
        if get_config_value("delete_all_a2e_videos", False):
            await generator.delete_all_videos()

        avatars = await generator.get_avatars_with_bg_support()
        if not avatars:
            raise A2EAPIError("No avatars with background support found")

        # Step 2: Get available voices (English: US, AU, GB - in that order)
        logger.debug("\n[STEP 2] Fetching available voices...")
        voices = []
        for voice_region in ("US", "AU", "GB"):
            voices.extend(await generator.get_available_voices(
                country="en",
                region=voice_region
            ))
        if not voices:
            raise A2EAPIError("No voices available")

        # Step 3: Select avatar and voice
        logger.debug("\n[STEP 3] Selecting avatar and voice...")
        avatar_id, voice_id = generator.select_voice_n_avatar(
            avatars=avatars,
            voices=voices,
            tts_script=tts_script,
            image_prompt=image_prompt
        )
        if not avatar_id or not voice_id:
            raise A2EAPIError("Failed to select avatar or voice")

        # Step 4: Generate TTS audio
        logger.debug("\n[STEP 4] Generating TTS audio...")
        has_region = '-' in preferred_language
        audio_url = await generator.generate_tts_audio(
            tts_script=tts_script,
            voice_id=voice_id,
            voices=voices,
            speed_rate=speed_rate,
            country=preferred_language.split('-')[0] if has_region else "en",
            region=preferred_language.split('-')[1] if has_region else "US"
        )

        # Step 5: Generate talking video with green screen
        logger.debug("\n[STEP 5] Generating talking video with green screen...")
        video_result = await generator.generate_talking_video(
            avatar_id=avatar_id,
            audio_url=audio_url,
            title=title
        )
        video_url = video_result.get("video_url")
        if not video_url:
            raise A2EAPIError("No video URL in result")

        logger.debug("\n")
        logger.debug("=" * 60)
        logger.debug("SUCCESS!")
        logger.debug("=" * 60)

        return audio_url, video_url

    except Exception as e:
        logger.error(f"✗ Workflow failed: {e}")
        raise
# Example usage
if __name__ == "__main__":
    # Script for the avatar to speak
    SCRIPT = """
Hello and welcome! This is a demonstration of AI-powered avatar video generation
with green screen background for easy video overlay and compositing.
"""
    # Image prompt used when selecting a matching avatar and voice.
    # `image_prompt` is a required argument of the workflow.
    IMAGE_PROMPT = "A friendly presenter introducing an AI avatar video demo"

    # Run workflow
    try:
        audio_url, video_url = asyncio.run(
            create_greenscreen_video_workflow(
                api_key=get_config_value("a2e_api_key"),
                tts_script=SCRIPT,
                image_prompt=IMAGE_PROMPT,
                output_filename="avatar_greenscreen.mp4",
                preferred_gender="female",
                preferred_language="en-US",
                speed_rate=1.2
            )
        )
        print(f"\n✓ Video ready at: {video_url}")
    except Exception as e:
        print(f"\n✗ Error: {e}")
        import traceback
        traceback.print_exc()