Spaces:
Sleeping
Sleeping
| """ | |
| External API client — generic adapter for third-party video/TTS endpoints. | |
| Supports: | |
| - Replicate (video generation) | |
| - ElevenLabs (TTS) | |
| - OpenAI (LLM fallback / TTS fallback) | |
| Set env vars to enable each service. | |
| """ | |
| import os | |
| import asyncio | |
| import aiohttp | |
| from typing import Optional | |
| class ReplicateClient: | |
| """ | |
| Replicate.com async client. | |
| Used as primary image-to-video source (e.g. stable-video-diffusion). | |
| """ | |
| API_URL = "https://api.replicate.com/v1" | |
| API_KEY = os.getenv("REPLICATE_API_KEY", "") | |
| MODEL_VERSION = os.getenv( | |
| "REPLICATE_I2V_VERSION", | |
| "stability-ai/stable-video-diffusion:3f0457e4619daac51203dedb472816fd4af51f3149fa7a9e0b5ffcf1b8172438", | |
| ) | |
| def __init__(self, timeout: int = 180): | |
| self.timeout = aiohttp.ClientTimeout(total=timeout) | |
| async def image_to_video(self, image_url: str) -> bytes: | |
| """ | |
| Submit an image-to-video prediction and poll until complete. | |
| Returns raw video bytes. | |
| """ | |
| headers = { | |
| "Authorization": f"Token {self.API_KEY}", | |
| "Content-Type": "application/json", | |
| } | |
| async with aiohttp.ClientSession(timeout=self.timeout) as session: | |
| # Create prediction | |
| async with session.post( | |
| f"{self.API_URL}/predictions", | |
| json={"version": self.MODEL_VERSION, "input": {"image": image_url}}, | |
| headers=headers, | |
| ) as resp: | |
| resp.raise_for_status() | |
| prediction = await resp.json() | |
| pred_id = prediction["id"] | |
| # Poll | |
| for _ in range(60): | |
| await asyncio.sleep(3) | |
| async with session.get( | |
| f"{self.API_URL}/predictions/{pred_id}", | |
| headers=headers, | |
| ) as resp: | |
| resp.raise_for_status() | |
| pred = await resp.json() | |
| if pred["status"] == "succeeded": | |
| video_url = pred["output"] | |
| break | |
| if pred["status"] in ("failed", "canceled"): | |
| raise RuntimeError(f"Replicate prediction failed: {pred.get('error')}") | |
| else: | |
| raise RuntimeError("Replicate prediction timed out.") | |
| # Download video | |
| async with session.get(video_url) as resp: | |
| resp.raise_for_status() | |
| return await resp.read() | |
| class ElevenLabsClient: | |
| """ElevenLabs TTS async client.""" | |
| API_URL = "https://api.elevenlabs.io/v1" | |
| API_KEY = os.getenv("ELEVENLABS_API_KEY", "") | |
| VOICE_ID = os.getenv("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM") # Rachel | |
| def __init__(self, timeout: int = 60): | |
| self.timeout = aiohttp.ClientTimeout(total=timeout) | |
| async def tts(self, text: str, model: str = "eleven_turbo_v2") -> bytes: | |
| """Returns MP3 audio bytes.""" | |
| headers = { | |
| "xi-api-key": self.API_KEY, | |
| "Content-Type": "application/json", | |
| } | |
| payload = { | |
| "text": text, | |
| "model_id": model, | |
| "voice_settings": {"stability": 0.5, "similarity_boost": 0.75}, | |
| } | |
| async with aiohttp.ClientSession(timeout=self.timeout) as session: | |
| async with session.post( | |
| f"{self.API_URL}/text-to-speech/{self.VOICE_ID}", | |
| json=payload, | |
| headers=headers, | |
| ) as resp: | |
| resp.raise_for_status() | |
| return await resp.read() | |
| class OpenAIClient: | |
| """OpenAI fallback client for LLM and TTS.""" | |
| API_URL = "https://api.openai.com/v1" | |
| API_KEY = os.getenv("OPENAI_API_KEY", "") | |
| def __init__(self, timeout: int = 60): | |
| self.timeout = aiohttp.ClientTimeout(total=timeout) | |
| def _headers(self): | |
| return { | |
| "Authorization": f"Bearer {self.API_KEY}", | |
| "Content-Type": "application/json", | |
| } | |
| async def chat(self, system_prompt: str, user_prompt: str, model: str = "gpt-4o-mini") -> str: | |
| """Returns assistant reply text.""" | |
| payload = { | |
| "model": model, | |
| "messages": [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| "temperature": 0.7, | |
| } | |
| async with aiohttp.ClientSession(timeout=self.timeout) as session: | |
| async with session.post( | |
| f"{self.API_URL}/chat/completions", | |
| json=payload, | |
| headers=self._headers(), | |
| ) as resp: | |
| resp.raise_for_status() | |
| data = await resp.json() | |
| return data["choices"][0]["message"]["content"] | |
| async def tts(self, text: str, voice: str = "alloy") -> bytes: | |
| """Returns MP3 bytes via OpenAI TTS.""" | |
| payload = {"model": "tts-1", "input": text, "voice": voice} | |
| async with aiohttp.ClientSession(timeout=self.timeout) as session: | |
| async with session.post( | |
| f"{self.API_URL}/audio/speech", | |
| json=payload, | |
| headers=self._headers(), | |
| ) as resp: | |
| resp.raise_for_status() | |
| return await resp.read() | |