| from functools import lru_cache |
| from typing import List, Tuple, Optional, Dict |
| import aiohttp |
| import elevenlabs |
| import time |
| from contextlib import asynccontextmanager |
| from logger import setup_logger, log_execution_time, log_async_execution_time |
| from models import OpenRouterModel, OpenRouterRequest, OpenRouterResponse, Message |
|
|
| logger = setup_logger("api_clients") |
|
|
| def preprocess_text(text: str) -> str: |
| """ |
| Clean and format text by removing unwanted characters and formatting |
| |
| Args: |
| text: Raw input text |
| |
| Returns: |
| Cleaned text suitable for podcast generation |
| """ |
| import re |
| |
| |
| text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) |
| text = re.sub(r'\*(.+?)\*', r'\1', text) |
| text = re.sub(r'__(.+?)__', r'\1', text) |
| text = re.sub(r'~~(.+?)~~', r'\1', text) |
| |
| |
| text = re.sub(r'\[.*?\]', '', text) |
| text = re.sub(r'\{.*?\}', '', text) |
| text = re.sub(r'<.*?>', '', text) |
|
|
| |
| text = re.sub(r'\((?:pause|break|music|sfx|sound effect|jingle).*?\)', '', text, flags=re.IGNORECASE) |
| text = re.sub(r'\((host|speaker|guest)\s*\d*\s*:?\)', '', text, flags=re.IGNORECASE) |
| text = re.sub(r'#\s*\d+\s*[:.-]', '', text) |
| |
| |
| return ' '.join(text.split()) |
|
|
| class OpenRouterClient: |
| """Handles OpenRouter API interactions with comprehensive logging and error tracking""" |
| |
| def __init__(self, api_key: str): |
| logger.info("Initializing OpenRouter client") |
| self.api_key = api_key |
| self.base_url = "https://openrouter.ai/api/v1" |
| self.headers = { |
| "Authorization": f"Bearer {api_key}", |
| "HTTP-Referer": "https://localhost:7860", |
| "X-Title": "URL to Podcast Generator", |
| "Content-Type": "application/json" |
| } |
| logger.debug("OpenRouter client initialized successfully") |
| |
| @property |
| def api_key(self): |
| return self._api_key |
|
|
| @api_key.setter |
| def api_key(self, value: str): |
| if not value or len(value) < 32: |
| logger.error("Invalid API key format") |
| raise ValueError("Invalid OpenRouter API key") |
| self._api_key = value |
| |
| self.headers = { |
| "Authorization": f"Bearer {value}", |
| "HTTP-Referer": "https://localhost:7860", |
| "X-Title": "URL to Podcast Generator", |
| "Content-Type": "application/json", |
| } |
| logger.info("OpenRouter API key updated successfully") |
| |
| @asynccontextmanager |
| async def get_session(self): |
| logger.debug("Creating new aiohttp session") |
| async with aiohttp.ClientSession(headers=self.headers) as session: |
| yield session |
| |
| @lru_cache(maxsize=1) |
| async def get_models(self) -> List[Tuple[str, str]]: |
| """ |
| Fetch available models from OpenRouter API using pydantic models |
| |
| Returns: |
| List of tuples containing (model_id, model_id) where both values are the same |
| """ |
| logger.info("Fetching available models from OpenRouter") |
| async with self.get_session() as session: |
| async with session.get(f"{self.base_url}/models") as response: |
| response.raise_for_status() |
| data = await response.json() |
| models = [OpenRouterModel(**model) for model in data["data"]] |
| logger.info(f"Successfully fetched {len(models)} models") |
| return [(model.name, model.id) for model in models] |
|
|
| @log_async_execution_time(logger) |
| async def generate_script(self, content: str, prompt: str, model_id: str) -> str: |
| """ |
| Generate a podcast script with detailed progress tracking and validation |
| |
| Performance metrics and content analysis are logged at each step. |
| """ |
| logger.info(f"Starting script generation with model: {model_id}") |
| logger.debug(f"Input metrics - Content: {len(content)} chars, Prompt: {len(prompt)} chars") |
| |
| |
| if not content or len(content) < 100: |
| logger.error("Content too short for meaningful script generation") |
| raise ValueError("Insufficient content for script generation") |
| |
| if not prompt or len(prompt) < 10: |
| logger.error("Prompt too short or missing") |
| raise ValueError("Please provide a more detailed prompt") |
| |
| |
| cleaned_content = preprocess_text(content) |
| cleaned_prompt = preprocess_text(prompt) |
| |
| system_prompt = """You are an expert podcast script writer. Your task is to create engaging, |
| natural-sounding podcast scripts that flow conversationally while being informative and engaging. |
| |
| Follow these guidelines: |
| 1. Write in a conversational, natural speaking style that sounds authentic |
| 2. Break complex topics into digestible segments with clear transitions |
| 3. Avoid technical jargon unless necessary, explaining complex terms when used |
| 4. Use natural speech patterns: |
| - Contractions (I'm, we're, let's) |
| - Casual language |
| - Rhetorical questions to engage listeners |
| 5. Include brief pauses for emphasis and pacing (but don't mark them explicitly) |
| 6. Incorporate storytelling elements to maintain engagement |
| 7. End with a clear conclusion and call-to-action |
| 8. Keep paragraphs short and focused for easier delivery |
| 9. Use simple sentence structures that flow naturally when spoken |
| |
| Format the script for natural speech, avoiding any special characters or formatting.""" |
|
|
| user_prompt = f"""Create a podcast script based on the following topic and content: |
| |
| Topic: {cleaned_prompt} |
| |
| Content to cover: {cleaned_content} |
| |
| Focus on making it engaging and natural to listen to.""" |
|
|
| try: |
| request = OpenRouterRequest( |
| model=model_id, |
| messages=[ |
| Message(role="system", content=system_prompt), |
| Message(role="user", content=user_prompt) |
| ] |
| ) |
| |
| async with self.get_session() as session: |
| async with session.post( |
| f"{self.base_url}/chat/completions", |
| json=request.dict() |
| ) as response: |
| response.raise_for_status() |
| data = await response.json() |
| router_response = OpenRouterResponse(**data) |
| logger.debug(f"Generated script length: {len(router_response.choices[0].message.content)} chars") |
| return router_response.choices[0].message.content |
| |
| except Exception as e: |
| logger.error(f"Script generation failed", exc_info=True) |
| raise |
|
|
| class ElevenLabsClient: |
| def __init__(self, api_key: str): |
| self.api_key = api_key |
| elevenlabs.set_api_key(api_key) |
|
|
| def get_voices(self) -> List[Tuple[str, str]]: |
| """ |
| Synchronously get available voices from ElevenLabs |
| |
| Returns: |
| List of tuples containing (voice_id, display_name) |
| where display_name shows the name and description but not the ID |
| """ |
| try: |
| voices = elevenlabs.voices() |
| return [(voice.voice_id, f"{voice.name} ({voice.labels.get('accent', 'No accent')})" + |
| (f" - {voice.description[:50]}..." if voice.description else "")) |
| for voice in voices] |
| except Exception as e: |
| logger.error("Failed to fetch voices from ElevenLabs", exc_info=True) |
| raise |
|
|
| async def generate_audio(self, text: str, voice_id: str): |
| """Generate audio synchronously""" |
| logger.info(f"Starting audio generation with voice: {voice_id}") |
| logger.debug(f"Input text length: {len(text)} chars") |
| |
| if len(text) > 5000: |
| logger.warning(f"Long text detected ({len(text)} chars), may impact performance") |
| |
| try: |
| start_time = time.time() |
| audio = await elevenlabs.generate( |
| text=text, |
| voice=voice_id, |
| model="eleven_monolingual_v1" |
| ) |
| |
| duration = time.time() - start_time |
| audio_size = len(audio) |
| logger.info(f"Audio generated: {audio_size} bytes in {duration:.2f} seconds") |
| logger.debug(f"Audio generation rate: {len(text)/duration:.2f} chars/second") |
| |
| return audio |
| except Exception as e: |
| logger.error("Audio generation failed", exc_info=True) |
| raise |
|
|