# DungeonMaster-AI: src/voice/elevenlabs_client.py
# Author: bhupesh-sf, commit f8ba6bf ("first commit")
"""
DungeonMaster AI - ElevenLabs Voice Client
Production-ready voice synthesis client with circuit breaker pattern,
rate limiting, audio caching, and comprehensive error handling.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import random
import time
from collections.abc import AsyncIterator
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from src.config.settings import get_settings
from .exceptions import (
VoiceAuthenticationError,
VoiceCircuitBreakerOpenError,
VoiceConfigurationError,
VoiceNotFoundError,
VoiceQuotaExhaustedError,
VoiceRateLimitError,
VoiceSynthesisError,
VoiceUnavailableError,
)
from .models import (
SynthesisResult,
VoiceCircuitState,
VoiceModelType,
VoiceProfile,
VoiceServiceState,
VoiceServiceStatus,
VoiceSynthesisSettings,
VoiceType,
)
from .voice_profiles import get_all_profiles, get_voice_profile
if TYPE_CHECKING:
    # Annotation-only import: the SDK classes are imported for real inside
    # VoiceClient.initialize(), presumably so this module can be imported
    # without the elevenlabs package installed.
    from elevenlabs.client import AsyncElevenLabs, ElevenLabs
# Module logger used by VoiceCircuitBreaker and VoiceClient.
logger = logging.getLogger(__name__)
# =============================================================================
# Circuit Breaker
# =============================================================================
class VoiceCircuitBreaker:
    """
    Circuit breaker pattern for voice API calls.
    Prevents repeated calls to a failing service by tracking failures
    and temporarily rejecting requests when failure threshold is reached.
    States:
    - CLOSED: Normal operation, requests allowed
    - OPEN: Too many failures, requests rejected for reset_timeout
    - HALF_OPEN: Testing if service recovered with up to
      half_open_max_calls probe requests
    Elapsed time is measured with time.monotonic(), so wall-clock
    adjustments cannot shorten or extend the OPEN period.
    """

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 60.0,
        half_open_max_calls: int = 1,
    ) -> None:
        """
        Initialize circuit breaker.
        Args:
            failure_threshold: Failures before opening circuit.
            reset_timeout: Seconds to wait before testing recovery. Also the
                time after which a half-open probe that never reported an
                outcome is considered lost and a new probe is allowed.
            half_open_max_calls: Max calls allowed in half-open state.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls
        self._state = VoiceCircuitState.CLOSED
        self._failure_count = 0
        # time.monotonic() reading of the most recent recorded failure.
        self._last_failure_time: float | None = None
        # Probe requests granted since entering HALF_OPEN.
        self._half_open_calls = 0
        # time.monotonic() reading of the most recent probe grant.
        self._last_probe_time: float | None = None

    @property
    def state(self) -> VoiceCircuitState:
        """Get current circuit breaker state with auto-transition."""
        # Check if we should transition from OPEN to HALF_OPEN
        if (
            self._state == VoiceCircuitState.OPEN
            and self._last_failure_time is not None
            and time.monotonic() - self._last_failure_time >= self.reset_timeout
        ):
            self._state = VoiceCircuitState.HALF_OPEN
            self._half_open_calls = 0
            self._last_probe_time = None
            logger.info("Voice circuit breaker transitioned to HALF_OPEN")
        return self._state

    @property
    def is_open(self) -> bool:
        """Check if circuit is open (rejecting requests)."""
        return self.state == VoiceCircuitState.OPEN

    @property
    def time_until_retry(self) -> float | None:
        """Seconds until retry is allowed, or None if not in OPEN state."""
        if self._state != VoiceCircuitState.OPEN or self._last_failure_time is None:
            return None
        elapsed = time.monotonic() - self._last_failure_time
        return max(0.0, self.reset_timeout - elapsed)

    def record_success(self) -> None:
        """
        Record a successful call.
        Any success closes the circuit. This includes a late response that
        lands while the circuit is OPEN. Previously such a success cleared
        the failure timestamp but left the state OPEN, and the breaker could
        then never move to HALF_OPEN again.
        """
        if self._state != VoiceCircuitState.CLOSED:
            self._state = VoiceCircuitState.CLOSED
            logger.info("Voice circuit breaker closed after successful recovery")
        self._failure_count = 0
        self._last_failure_time = None
        self._half_open_calls = 0
        self._last_probe_time = None

    def record_failure(self) -> None:
        """Record a failed call, opening the circuit when appropriate."""
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == VoiceCircuitState.HALF_OPEN:
            # The recovery probe failed: back to OPEN for another full timeout.
            self._state = VoiceCircuitState.OPEN
            logger.warning("Voice circuit breaker reopened after half-open failure")
        elif (
            self._state == VoiceCircuitState.CLOSED
            and self._failure_count >= self.failure_threshold
        ):
            self._state = VoiceCircuitState.OPEN
            logger.warning(
                f"Voice circuit breaker opened after {self._failure_count} failures"
            )

    def allow_request(self) -> bool:
        """
        Check if a request should be allowed.
        In HALF_OPEN this hands out one of ``half_open_max_calls`` probe
        slots. The caller is expected to report the outcome through
        record_success() or record_failure().
        """
        state = self.state  # This may transition OPEN -> HALF_OPEN
        if state == VoiceCircuitState.CLOSED:
            return True
        if state != VoiceCircuitState.HALF_OPEN:
            # OPEN state
            return False
        now = time.monotonic()
        probes_exhausted = self._half_open_calls >= self.half_open_max_calls
        probe_is_stale = (
            self._last_probe_time is not None
            and now - self._last_probe_time >= self.reset_timeout
        )
        if probes_exhausted and probe_is_stale:
            # Earlier probes never reported back (e.g. the caller aborted
            # before recording an outcome). Without this reset the breaker
            # would stay HALF_OPEN and reject every request forever.
            logger.warning("Voice circuit breaker re-arming stale half-open probe")
            self._half_open_calls = 0
        if self._half_open_calls < self.half_open_max_calls:
            self._half_open_calls += 1
            self._last_probe_time = now
            return True
        return False

    def reset(self) -> None:
        """Reset circuit breaker to initial state."""
        self._state = VoiceCircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = None
        self._half_open_calls = 0
        self._last_probe_time = None
# =============================================================================
# Rate Limiter
# =============================================================================
class RateLimiter:
    """
    Sliding-window rate limiter for the voice API.
    Keeps a timestamped log of recent requests and tracks both the request
    count and the character usage over the last ``window_seconds``, to stay
    within ElevenLabs API limits. Timestamps come from time.monotonic(), so
    wall-clock adjustments cannot shrink or stretch the window.
    """

    def __init__(
        self,
        max_requests: int = 10,
        max_characters: int = 10000,
        window_seconds: float = 60.0,
    ) -> None:
        """
        Initialize rate limiter.
        Args:
            max_requests: Maximum requests per window.
            max_characters: Maximum characters per window.
            window_seconds: Time window in seconds.
        """
        self.max_requests = max_requests
        self.max_characters = max_characters
        self.window_seconds = window_seconds
        # record_request() appends to both logs together, oldest first.
        self._request_timestamps: list[float] = []
        self._character_usage: list[tuple[float, int]] = []

    def _cleanup_old_entries(self) -> None:
        """Remove entries outside the time window."""
        cutoff = time.monotonic() - self.window_seconds
        self._request_timestamps = [
            ts for ts in self._request_timestamps if ts > cutoff
        ]
        self._character_usage = [
            (ts, chars) for ts, chars in self._character_usage if ts > cutoff
        ]

    def _seconds_until_expiry(self, timestamp: float) -> float:
        """Return how long until an entry logged at ``timestamp`` leaves the window."""
        return max(0.0, self.window_seconds - (time.monotonic() - timestamp))

    def check_request(self, character_count: int) -> tuple[bool, float | None]:
        """
        Check if a request is allowed (does not record it).
        Args:
            character_count: Number of characters in the request.
        Returns:
            Tuple of (allowed, retry_after_seconds).
            If allowed=False, retry_after_seconds indicates when to retry
            (None when waiting cannot help, e.g. ``max_requests <= 0``).
            A request larger than ``max_characters`` is allowed only when
            the window is otherwise empty. Waiting could never make room for
            it, so the limiter does not reject it forever.
        """
        self._cleanup_old_entries()
        # Request-count limit: the oldest entry ageing out frees one slot.
        if len(self._request_timestamps) >= self.max_requests:
            if not self._request_timestamps:
                return False, None
            return False, self._seconds_until_expiry(min(self._request_timestamps))
        # Character limit: work out how many of the oldest entries must age
        # out before this request fits, then wait for the last of those.
        total_chars = sum(chars for _, chars in self._character_usage)
        excess = total_chars + character_count - self.max_characters
        if excess > 0 and self._character_usage:
            usage = sorted(self._character_usage, key=lambda entry: entry[0])
            # If even a fully drained window is too small, wait until it is
            # empty; the empty-window path then lets the request through.
            release_at = usage[-1][0]
            freed = 0
            for ts, chars in usage:
                freed += chars
                if freed >= excess:
                    release_at = ts
                    break
            return False, self._seconds_until_expiry(release_at)
        return True, None

    def record_request(self, character_count: int) -> None:
        """Record a request for rate limiting."""
        now = time.monotonic()
        self._request_timestamps.append(now)
        self._character_usage.append((now, character_count))

    def get_remaining(self) -> tuple[int, int]:
        """Get remaining requests and characters in current window."""
        self._cleanup_old_entries()
        remaining_requests = max(0, self.max_requests - len(self._request_timestamps))
        total_chars = sum(chars for _, chars in self._character_usage)
        remaining_chars = max(0, self.max_characters - total_chars)
        return remaining_requests, remaining_chars
# =============================================================================
# Audio Cache
# =============================================================================
class AudioCache:
    """
    LRU cache with TTL for synthesized audio.
    Caches audio by content hash to avoid re-synthesizing
    identical text with the same voice. Entries are bounded both by count
    (``max_entries``) and by total byte size (``max_size_bytes``). Expired
    entries are dropped lazily, when they are looked up.
    """

    def __init__(
        self,
        max_entries: int = 100,
        max_size_bytes: int = 50 * 1024 * 1024,  # 50 MB
        default_ttl: timedelta = timedelta(minutes=30),
    ) -> None:
        """
        Initialize audio cache.
        Args:
            max_entries: Maximum number of cache entries.
            max_size_bytes: Maximum total size in bytes.
            default_ttl: Default time-to-live for entries.
        """
        self.max_entries = max_entries
        self.max_size_bytes = max_size_bytes
        self.default_ttl = default_ttl
        # key -> (audio bytes, creation time, ttl)
        self._cache: dict[str, tuple[bytes, datetime, timedelta]] = {}
        # key -> last access time; drives LRU eviction
        self._access_times: dict[str, datetime] = {}
        # Sum of len(audio) over all entries currently in _cache.
        self._current_size = 0
        # Statistics (not reset by clear())
        self._hits = 0
        self._misses = 0

    def _generate_key(self, text: str, voice_type: VoiceType, model: str) -> str:
        """Generate cache key from synthesis parameters."""
        content = f"{voice_type.value}:{model}:{text}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def get(self, text: str, voice_type: VoiceType, model: str) -> bytes | None:
        """
        Get audio from cache if valid.
        Args:
            text: Original text.
            voice_type: Voice type used.
            model: Model used.
        Returns:
            Cached audio bytes or None if not found/expired.
        """
        key = self._generate_key(text, voice_type, model)
        if key not in self._cache:
            self._misses += 1
            return None
        audio, created_at, ttl = self._cache[key]
        # Check if expired
        if datetime.now() - created_at > ttl:
            self._remove(key)
            self._misses += 1
            return None
        # Update access time (for LRU)
        self._access_times[key] = datetime.now()
        self._hits += 1
        return audio

    def put(
        self,
        text: str,
        voice_type: VoiceType,
        model: str,
        audio: bytes,
        ttl: timedelta | None = None,
    ) -> None:
        """
        Store audio in cache.
        Audio larger than ``max_size_bytes`` is not cached at all. Storing an
        existing key replaces the previous entry.
        Args:
            text: Original text.
            voice_type: Voice type used.
            model: Model used.
            audio: Audio bytes to cache.
            ttl: Optional custom TTL; None means ``default_ttl``. An explicit
                ``timedelta(0)`` is honoured rather than replaced.
        """
        audio_size = len(audio)
        if audio_size > self.max_size_bytes:
            # It can never fit. Storing it would either break the byte
            # budget or evict every other entry for nothing.
            return
        key = self._generate_key(text, voice_type, model)
        if ttl is None:
            ttl = self.default_ttl
        # Drop any previous entry for this key first. Otherwise its bytes
        # would be counted twice in _current_size.
        self._remove(key)
        # Evict if necessary
        while (
            len(self._cache) >= self.max_entries
            or self._current_size + audio_size > self.max_size_bytes
        ):
            if not self._evict_oldest():
                break
        # Store
        now = datetime.now()
        self._cache[key] = (audio, now, ttl)
        self._access_times[key] = now
        self._current_size += audio_size

    def _remove(self, key: str) -> None:
        """Remove entry from cache (no-op if absent)."""
        if key in self._cache:
            audio, _, _ = self._cache[key]
            self._current_size -= len(audio)
            del self._cache[key]
        self._access_times.pop(key, None)

    def _evict_oldest(self) -> bool:
        """Evict least recently used entry; return False if nothing to evict."""
        if not self._access_times:
            return False
        oldest_key = min(self._access_times, key=lambda k: self._access_times[k])
        self._remove(oldest_key)
        return True

    def clear(self) -> None:
        """Clear entire cache (hit/miss statistics are kept)."""
        self._cache.clear()
        self._access_times.clear()
        self._current_size = 0

    @property
    def size(self) -> int:
        """Get number of cached entries (including not-yet-purged expired ones)."""
        return len(self._cache)

    @property
    def hit_rate(self) -> float:
        """Get cache hit rate."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0
# =============================================================================
# Voice Client
# =============================================================================
class VoiceClient:
    """
    Production-ready ElevenLabs client with comprehensive error handling.
    Features:
    - Circuit breaker pattern for failure protection
    - Rate limiting to respect API limits
    - Audio caching for repeated phrases
    - Automatic retry with exponential backoff
    - Graceful degradation to text-only mode
    - Streaming and non-streaming synthesis
    """

    def __init__(
        self,
        api_key: str | None = None,
        model_id: str = "eleven_turbo_v2",
        max_retries: int = 3,
        retry_delay: float = 1.0,
        cache_enabled: bool = True,
        cache_max_entries: int = 100,
    ) -> None:
        """
        Initialize voice client.
        No SDK client is created here; call initialize() before synthesizing.
        Args:
            api_key: ElevenLabs API key (uses settings if None).
            model_id: Default model for synthesis.
            max_retries: Maximum synthesis attempts per non-streaming call.
            retry_delay: Base delay between retries.
            cache_enabled: Enable audio caching.
            cache_max_entries: Maximum cache entries.
        """
        settings = get_settings()
        self._api_key = api_key or settings.voice.elevenlabs_api_key
        self._model_id = model_id
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._cache_enabled = cache_enabled
        # SDK clients (initialized lazily)
        self._sync_client: ElevenLabs | None = None
        self._async_client: AsyncElevenLabs | None = None
        # Voice profiles
        self._voice_profiles: dict[str, VoiceProfile] = {}
        # Resilience components
        self._circuit_breaker = VoiceCircuitBreaker(
            failure_threshold=3,
            reset_timeout=60.0,
        )
        self._rate_limiter = RateLimiter(
            max_requests=10,
            max_characters=10000,
            window_seconds=60.0,
        )
        # Audio cache
        self._cache: AudioCache | None = None
        if cache_enabled:
            self._cache = AudioCache(max_entries=cache_max_entries)
        # State tracking
        self._state = VoiceServiceState.UNAVAILABLE
        self._initialized = False
        self._last_error: str | None = None
        self._consecutive_failures = 0
        self._last_successful_call: datetime | None = None

    async def initialize(self) -> bool:
        """
        Create the SDK clients and register default voice profiles.
        The API key is not checked against the service here; a bad key
        surfaces on the first synthesis call.
        Returns:
            True if initialization successful, False if the key is missing
            or client setup failed.
        Raises:
            VoiceAuthenticationError: If client setup fails with an
                authentication-looking error.
        """
        if not self._api_key:
            logger.warning("ElevenLabs API key not configured")
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = "API key not configured"
            return False
        try:
            # Import and create clients
            from elevenlabs.client import AsyncElevenLabs, ElevenLabs

            self._sync_client = ElevenLabs(api_key=self._api_key)
            self._async_client = AsyncElevenLabs(api_key=self._api_key)
            # API key will be validated on first TTS call
            # No voices.get_all() needed - we use hardcoded voice IDs from settings
            logger.info("ElevenLabs client configured (will validate on first use)")
            # Register default voice profiles
            self._register_default_profiles()
            self._initialized = True
            self._state = VoiceServiceState.AVAILABLE
            self._last_error = None
            logger.info("Voice client initialized successfully")
            return True
        except Exception as e:
            error_str = str(e).lower()
            if "401" in error_str or "unauthorized" in error_str or "invalid" in error_str:
                logger.error(f"Invalid ElevenLabs API key: {e}")
                self._state = VoiceServiceState.UNAVAILABLE
                self._last_error = "Invalid API key"
                raise VoiceAuthenticationError(str(e)) from e
            logger.error(f"Failed to initialize voice client: {e}")
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = str(e)
            return False

    def _register_default_profiles(self) -> None:
        """Register default voice profiles from settings, keyed by profile name."""
        for profile in get_all_profiles().values():
            self._voice_profiles[profile.name] = profile

    def register_voice(
        self,
        name: str,
        voice_id: str,
        voice_type: VoiceType,
        settings: VoiceSynthesisSettings | None = None,
        description: str = "",
    ) -> None:
        """
        Register a custom voice profile.
        NOTE(review): synthesize() resolves voices through
        get_voice_profile(voice_type) and does not consult profiles
        registered here. Confirm whether that is intended.
        Args:
            name: Profile name.
            voice_id: ElevenLabs voice ID.
            voice_type: Voice type category.
            settings: Optional synthesis settings.
            description: Optional description.
        """
        self._voice_profiles[name] = VoiceProfile(
            name=name,
            voice_id=voice_id,
            voice_type=voice_type,
            settings=settings or VoiceSynthesisSettings(),
            description=description or f"Custom voice: {name}",
        )

    async def get_available_voices(self) -> list[dict[str, str]]:
        """
        Fetch available voices from ElevenLabs.
        Returns:
            List of voice info dictionaries with 'name' and 'voice_id'
            (empty if not initialized or the request fails).
        """
        if not self._initialized or not self._async_client:
            return []
        try:
            response = await self._async_client.voices.get_all()
            return [
                {"name": v.name, "voice_id": v.voice_id}
                for v in response.voices
            ]
        except Exception as e:
            logger.error(f"Failed to fetch voices: {e}")
            return []

    async def synthesize(
        self,
        text: str,
        voice_type: VoiceType = VoiceType.DM,
        stream: bool = False,
        model: str | None = None,
    ) -> bytes | AsyncIterator[bytes] | None:
        """
        Synthesize speech from text.
        Checks run in this order: initialization, cache (non-streaming only),
        local rate limit, voice profile lookup, circuit breaker. The breaker
        is consulted last because, in HALF_OPEN, allow_request() hands out a
        probe slot. Only a request that actually calls the API may take that
        slot, or the breaker never learns the outcome. A side effect is that
        cached audio is served even while the circuit is open.
        Args:
            text: Text to synthesize.
            voice_type: Voice type to use.
            stream: If True, return async iterator of chunks.
            model: Override model (uses default if None).
        Returns:
            Audio bytes, async iterator of chunks, or None on failure.
        Raises:
            VoiceCircuitBreakerOpenError: If circuit breaker is open.
            VoiceRateLimitError: If rate limited.
            VoiceAuthenticationError: If auth fails.
            VoiceQuotaExhaustedError: If quota exhausted.
            VoiceNotFoundError: If the API reports the voice as not found.
        """
        # Pre-flight checks
        if not self._initialized:
            logger.warning("Voice client not initialized")
            return None
        model = model or self._model_id
        # Cache check (non-streaming only) - costs no API call
        if not stream and self._cache is not None:
            cached = self._cache.get(text, voice_type, model)
            if cached is not None:
                logger.debug("Cache hit for voice synthesis")
                return cached
        # Rate limit check
        char_count = len(text)
        allowed, retry_after = self._rate_limiter.check_request(char_count)
        if not allowed:
            logger.warning(f"Rate limited. Retry after {retry_after}s")
            raise VoiceRateLimitError(retry_after_seconds=retry_after)
        # Resolve the profile before taking a breaker slot, so a lookup
        # error cannot leak a half-open probe.
        profile = get_voice_profile(voice_type)
        if not self._circuit_breaker.allow_request():
            retry_after = self._circuit_breaker.time_until_retry
            logger.warning(f"Circuit breaker open. Retry after {retry_after}s")
            raise VoiceCircuitBreakerOpenError(retry_after_seconds=retry_after)
        if stream:
            return self._synthesize_stream(text, profile, model)
        return await self._synthesize_with_retry(text, profile, model, char_count)

    async def _synthesize_with_retry(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
        char_count: int,
    ) -> bytes | None:
        """
        Synthesize with retry logic.
        Non-retryable errors (auth, quota, rate limit, unknown voice) are
        raised by _handle_synthesis_error() and propagate to the caller.
        Other errors are retried with exponential backoff until the attempts
        run out or the circuit breaker opens.
        Returns:
            Audio bytes on success, None if every attempt failed.
        """
        for attempt in range(self._max_retries):
            try:
                audio = await self._synthesize_full(text, profile, model)
                if not audio:
                    # An empty payload is a failed synthesis. Route it through
                    # the normal error path so the breaker sees the outcome
                    # and the next attempt backs off.
                    raise RuntimeError("ElevenLabs returned empty audio")
            except Exception as e:
                self._handle_synthesis_error(e, attempt)
                # Stop hitting the API once the breaker has tripped, e.g.
                # after a failed half-open probe.
                if self._circuit_breaker.is_open:
                    break
                if attempt < self._max_retries - 1:
                    await asyncio.sleep(self._calculate_backoff(attempt))
                continue
            # Record success
            self._record_success(char_count)
            # Cache result
            if self._cache is not None:
                self._cache.put(text, profile.voice_type, model, audio)
            return audio
        return None

    def _open_tts_stream(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
    ) -> AsyncIterator[bytes]:
        """
        Start a text-to-speech request and return its chunk iterator.
        Shared by the buffered and streaming paths so both send identical
        voice settings.
        Raises:
            VoiceUnavailableError: If the async SDK client is not set up.
        """
        if not self._async_client:
            raise VoiceUnavailableError("Client not initialized")
        # convert() returns an async generator, not a coroutine
        return self._async_client.text_to_speech.convert(
            text=text,
            voice_id=profile.voice_id,
            model_id=model,
            voice_settings={
                "stability": profile.settings.stability,
                "similarity_boost": profile.settings.similarity_boost,
                "style": profile.settings.style,
                "use_speaker_boost": profile.settings.use_speaker_boost,
            },
        )

    async def _synthesize_full(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
    ) -> bytes:
        """Internal non-streaming synthesis: join every audio chunk into one payload."""
        chunks = self._open_tts_stream(text, profile, model)
        return b"".join([chunk async for chunk in chunks if isinstance(chunk, bytes)])

    async def _synthesize_stream(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
    ) -> AsyncIterator[bytes]:
        """
        Internal streaming synthesis.
        Success is recorded only after the last chunk has been yielded. A
        failure mid-stream is classified and then re-raised to the consumer.
        """
        if not self._async_client:
            raise VoiceUnavailableError("Client not initialized")
        try:
            async for chunk in self._open_tts_stream(text, profile, model):
                if isinstance(chunk, bytes):
                    yield chunk
            self._record_success(len(text))
        except Exception as e:
            self._handle_synthesis_error(e, 0)
            raise

    def synthesize_sync(
        self,
        text: str,
        voice_type: VoiceType = VoiceType.DM,
    ) -> bytes | None:
        """
        Synchronous wrapper for synthesize.
        Drives the thread's event loop, creating one if none exists or the
        current one is closed. The loop is reused across calls rather than
        created per call.
        Args:
            text: Text to synthesize.
            voice_type: Voice type to use.
        Returns:
            Audio bytes or None on failure.
        Raises:
            RuntimeError: If called while an event loop is already running
                in this thread (await synthesize() instead).
            Any exception raised by synthesize().
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop in this thread: safe to drive one here.
        else:
            # Fail before creating the coroutine, so no "never awaited"
            # warning is left behind.
            raise RuntimeError(
                "synthesize_sync() cannot be called from a running event loop; "
                "await synthesize() instead"
            )
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        result = loop.run_until_complete(
            self.synthesize(text, voice_type, stream=False)
        )
        return result if isinstance(result, bytes) else None

    def _handle_synthesis_error(self, error: Exception, attempt: int) -> None:
        """
        Classify a synthesis error and update state.
        Raises a typed exception for errors that retrying cannot fix (auth,
        quota, rate limit, unknown voice). Returns normally for generic
        errors after recording them, so the caller may retry.
        """
        error_str = str(error).lower()
        # Defensive: prefer an HTTP status attribute when the exception
        # carries one, falling back to message matching.
        status_code = getattr(error, "status_code", None)
        # NOTE(review): ElevenLabs may report exhausted quota with HTTP 401
        # ("quota_exceeded"), so quota is checked before auth. Confirm
        # against current API behaviour.
        if "quota" in error_str:
            self._raise_quota_exhausted(error)
        # Authentication error - no retry
        if status_code == 401 or "401" in error_str or "unauthorized" in error_str:
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = "Authentication failed"
            self._circuit_breaker.record_failure()
            raise VoiceAuthenticationError(str(error)) from error
        # Rate limit - match whole phrases; a bare "rate" substring also hits
        # words like "generate" or "moderate".
        is_rate_limited = (
            status_code == 429
            or "429" in error_str
            or "rate limit" in error_str
            or "rate_limit" in error_str
            or "too many requests" in error_str
        )
        if is_rate_limited:
            logger.warning(f"Rate limit from API (attempt {attempt + 1})")
            # Don't record failure for rate limits
            raise VoiceRateLimitError(str(error)) from error
        # Other "limit exceeded" messages are treated as quota exhaustion.
        if "limit exceeded" in error_str:
            self._raise_quota_exhausted(error)
        # Voice not found - no retry
        if "voice" in error_str and "not found" in error_str:
            raise VoiceNotFoundError(str(error)) from error
        # Generic error
        logger.warning(f"Synthesis error (attempt {attempt + 1}): {error}")
        self._consecutive_failures += 1
        self._circuit_breaker.record_failure()
        self._last_error = str(error)
        self._state = VoiceServiceState.DEGRADED

    def _raise_quota_exhausted(self, error: Exception) -> None:
        """Mark the service unavailable and raise VoiceQuotaExhaustedError."""
        self._state = VoiceServiceState.UNAVAILABLE
        self._last_error = "Quota exhausted"
        self._circuit_breaker.record_failure()
        raise VoiceQuotaExhaustedError(str(error)) from error

    def _record_success(self, char_count: int) -> None:
        """Record successful synthesis."""
        self._circuit_breaker.record_success()
        self._rate_limiter.record_request(char_count)
        self._last_successful_call = datetime.now()
        self._consecutive_failures = 0
        self._state = VoiceServiceState.AVAILABLE

    def _calculate_backoff(self, attempt: int) -> float:
        """Calculate exponential backoff with up to 10% jitter, capped at 30s base."""
        delay = self._retry_delay * (2 ** attempt)
        delay = min(delay, 30.0)  # Cap at 30 seconds
        jitter = delay * 0.1 * random.random()
        return delay + jitter

    def get_status(self) -> VoiceServiceStatus:
        """Get current service status."""
        return VoiceServiceStatus(
            state=self._state,
            circuit_state=self._circuit_breaker.state,
            is_available=self.is_available,
            is_initialized=self._initialized,
            last_successful_call=self._last_successful_call,
            consecutive_failures=self._consecutive_failures,
            cache_size=self._cache.size if self._cache else 0,
            cache_hit_rate=self._cache.hit_rate if self._cache else 0.0,
            error_message=self._last_error,
        )

    @property
    def is_available(self) -> bool:
        """Check if voice service is available."""
        return (
            self._initialized
            and self._state != VoiceServiceState.UNAVAILABLE
            and not self._circuit_breaker.is_open
        )

    def get_unavailable_message(self) -> str:
        """Get user-friendly message when voice is unavailable."""
        if not self._initialized:
            return "Voice narration is not configured."
        if self._state == VoiceServiceState.UNAVAILABLE:
            if self._last_error:
                return f"Voice narration is unavailable: {self._last_error}"
            return "Voice narration is currently unavailable."
        if self._circuit_breaker.is_open:
            retry = self._circuit_breaker.time_until_retry
            if retry is None:
                # Can't format None with :.0f; report without an ETA.
                return "Voice service is recovering."
            return f"Voice service is recovering. Will retry in {retry:.0f} seconds."
        return "Voice narration is disabled."

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"VoiceClient(initialized={self._initialized}, "
            f"state={self._state.value}, "
            f"available={self.is_available})"
        )