| """ |
| DungeonMaster AI - ElevenLabs Voice Client |
| |
| Production-ready voice synthesis client with circuit breaker pattern, |
| rate limiting, audio caching, and comprehensive error handling. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import hashlib |
| import logging |
| import random |
| import time |
| from collections.abc import AsyncIterator |
| from datetime import datetime, timedelta |
| from typing import TYPE_CHECKING |
|
|
| from src.config.settings import get_settings |
|
|
| from .exceptions import ( |
| VoiceAuthenticationError, |
| VoiceCircuitBreakerOpenError, |
| VoiceConfigurationError, |
| VoiceNotFoundError, |
| VoiceQuotaExhaustedError, |
| VoiceRateLimitError, |
| VoiceSynthesisError, |
| VoiceUnavailableError, |
| ) |
| from .models import ( |
| SynthesisResult, |
| VoiceCircuitState, |
| VoiceModelType, |
| VoiceProfile, |
| VoiceServiceState, |
| VoiceServiceStatus, |
| VoiceSynthesisSettings, |
| VoiceType, |
| ) |
| from .voice_profiles import get_all_profiles, get_voice_profile |
|
|
| if TYPE_CHECKING: |
| from elevenlabs.client import AsyncElevenLabs, ElevenLabs |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
|
|
class VoiceCircuitBreaker:
    """
    Circuit breaker pattern for voice API calls.

    Prevents repeated calls to a failing service by tracking failures
    and temporarily rejecting requests when failure threshold is reached.

    States:
        - CLOSED: Normal operation, requests allowed
        - OPEN: Too many failures, requests rejected for reset_timeout
        - HALF_OPEN: Testing if service recovered with a limited number
          of probe requests (``half_open_max_calls``)

    All intervals are measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot shorten or stretch the open period.
    """

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 60.0,
        half_open_max_calls: int = 1,
    ) -> None:
        """
        Initialize circuit breaker.

        Args:
            failure_threshold: Failures before opening circuit.
            reset_timeout: Seconds to wait before testing recovery.
            half_open_max_calls: Max calls allowed in half-open state.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls

        self._state = VoiceCircuitState.CLOSED
        self._failure_count = 0
        # Monotonic timestamp of the most recent failure (None when healthy).
        self._last_failure_time: float | None = None
        self._half_open_calls = 0
        # Monotonic timestamp of the most recently granted half-open probe.
        self._last_probe_time: float | None = None

    @property
    def state(self) -> VoiceCircuitState:
        """Get current circuit breaker state with auto-transition.

        Reading this property moves an OPEN circuit to HALF_OPEN once
        ``reset_timeout`` seconds have passed since the last failure.
        """
        if (
            self._state == VoiceCircuitState.OPEN
            and self._last_failure_time is not None
            and time.monotonic() - self._last_failure_time >= self.reset_timeout
        ):
            self._state = VoiceCircuitState.HALF_OPEN
            self._half_open_calls = 0
            self._last_probe_time = None
            logger.info("Voice circuit breaker transitioned to HALF_OPEN")

        return self._state

    @property
    def is_open(self) -> bool:
        """Check if circuit is open (rejecting requests)."""
        return self.state == VoiceCircuitState.OPEN

    @property
    def time_until_retry(self) -> float | None:
        """Seconds until retry is allowed, or None if not in OPEN state."""
        if self._state != VoiceCircuitState.OPEN:
            return None
        if self._last_failure_time is None:
            return None
        elapsed = time.monotonic() - self._last_failure_time
        return max(0.0, self.reset_timeout - elapsed)

    def record_success(self) -> None:
        """Record a successful call and close the circuit.

        A success closes the circuit from any state. Previously a success
        reported while OPEN (e.g. a retry or an in-flight call finishing
        after the threshold was hit) cleared the failure timestamp but left
        the state OPEN, and without that timestamp the breaker could never
        leave OPEN again.
        """
        if self._state != VoiceCircuitState.CLOSED:
            self._state = VoiceCircuitState.CLOSED
            logger.info("Voice circuit breaker closed after successful recovery")

        self._failure_count = 0
        self._last_failure_time = None
        self._half_open_calls = 0
        self._last_probe_time = None

    def record_failure(self) -> None:
        """Record a failed call, opening the circuit when warranted."""
        self._failure_count += 1
        self._last_failure_time = time.monotonic()

        if self._state == VoiceCircuitState.HALF_OPEN:
            # The recovery probe failed: go straight back to OPEN.
            self._state = VoiceCircuitState.OPEN
            logger.warning("Voice circuit breaker reopened after half-open failure")
        elif self._failure_count >= self.failure_threshold:
            self._state = VoiceCircuitState.OPEN
            logger.warning(
                "Voice circuit breaker opened after %d failures",
                self._failure_count,
            )

    def allow_request(self) -> bool:
        """Check if a request should be allowed.

        In HALF_OPEN at most ``half_open_max_calls`` probes are handed out.
        If those probes have been outstanding for ``reset_timeout`` seconds
        without any success/failure being recorded, they are treated as
        abandoned and a fresh batch is granted; otherwise a caller that never
        reports an outcome would leave the breaker rejecting requests forever.

        Returns:
            True if the caller may proceed, False if it must be rejected.
        """
        state = self.state

        if state == VoiceCircuitState.CLOSED:
            return True

        if state == VoiceCircuitState.HALF_OPEN:
            now = time.monotonic()
            probes_exhausted = self._half_open_calls >= self.half_open_max_calls
            if (
                probes_exhausted
                and self._last_probe_time is not None
                and now - self._last_probe_time >= self.reset_timeout
            ):
                self._half_open_calls = 0
                logger.warning(
                    "Voice circuit breaker probe outcome never reported; "
                    "allowing a new probe"
                )

            if self._half_open_calls < self.half_open_max_calls:
                self._half_open_calls += 1
                self._last_probe_time = now
                return True
            return False

        # OPEN: reject until the reset timeout elapses.
        return False

    def reset(self) -> None:
        """Reset circuit breaker to initial state."""
        self._state = VoiceCircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = None
        self._half_open_calls = 0
        self._last_probe_time = None
|
|
|
|
| |
| |
| |
|
|
|
|
class RateLimiter:
    """
    Sliding-window rate limiter for voice API.

    Tracks both request count and character usage over the last
    ``window_seconds`` to stay within ElevenLabs API limits. Timestamps come
    from ``time.monotonic()`` so wall-clock adjustments cannot corrupt the
    window.
    """

    def __init__(
        self,
        max_requests: int = 10,
        max_characters: int = 10000,
        window_seconds: float = 60.0,
    ) -> None:
        """
        Initialize rate limiter.

        Args:
            max_requests: Maximum requests per window.
            max_characters: Maximum characters per window.
            window_seconds: Time window in seconds.
        """
        self.max_requests = max_requests
        self.max_characters = max_characters
        self.window_seconds = window_seconds

        # Both lists are appended in chronological order by record_request().
        self._request_timestamps: list[float] = []
        self._character_usage: list[tuple[float, int]] = []

    def _cleanup_old_entries(self, now: float | None = None) -> None:
        """Remove entries outside the time window.

        Args:
            now: Monotonic "current time" to evaluate against; read from the
                clock when omitted.
        """
        if now is None:
            now = time.monotonic()
        cutoff = now - self.window_seconds

        self._request_timestamps = [
            ts for ts in self._request_timestamps if ts > cutoff
        ]
        self._character_usage = [
            (ts, chars) for ts, chars in self._character_usage if ts > cutoff
        ]

    def check_request(self, character_count: int) -> tuple[bool, float | None]:
        """
        Check if a request is allowed.

        A request is never rejected on character budget while the window is
        empty, even if it alone exceeds ``max_characters``: waiting could not
        make it fit.

        Args:
            character_count: Number of characters in the request.

        Returns:
            Tuple of (allowed, retry_after_seconds).
            If allowed=False, retry_after_seconds indicates when to retry.
        """
        now = time.monotonic()
        self._cleanup_old_entries(now)

        # Request-count limit. `default=now` keeps a non-positive
        # max_requests from raising on an empty window.
        if len(self._request_timestamps) >= self.max_requests:
            oldest = min(self._request_timestamps, default=now)
            return False, max(0.0, self.window_seconds - (now - oldest))

        # Character limit.
        used = sum(chars for _, chars in self._character_usage)
        overflow = used + character_count - self.max_characters
        if overflow > 0 and self._character_usage:
            # Expiring only the oldest entry may not free enough budget, so
            # walk forward until enough characters would have left the window.
            freed = 0
            release_at = now
            for ts, chars in sorted(self._character_usage):
                freed += chars
                release_at = ts
                if freed >= overflow:
                    break
            return False, max(0.0, self.window_seconds - (now - release_at))

        return True, None

    def record_request(self, character_count: int) -> None:
        """Record a request for rate limiting."""
        now = time.monotonic()
        self._request_timestamps.append(now)
        self._character_usage.append((now, character_count))

    def get_remaining(self) -> tuple[int, int]:
        """Get remaining requests and characters in current window."""
        self._cleanup_old_entries()

        remaining_requests = max(0, self.max_requests - len(self._request_timestamps))
        used = sum(chars for _, chars in self._character_usage)
        remaining_chars = max(0, self.max_characters - used)

        return remaining_requests, remaining_chars
|
|
|
|
| |
| |
| |
|
|
|
|
class AudioCache:
    """
    LRU cache with TTL for synthesized audio.

    Caches audio by content hash to avoid re-synthesizing
    identical text with the same voice.
    """

    def __init__(
        self,
        max_entries: int = 100,
        max_size_bytes: int = 50 * 1024 * 1024,
        default_ttl: timedelta = timedelta(minutes=30),
    ) -> None:
        """
        Initialize audio cache.

        Args:
            max_entries: Maximum number of cache entries.
            max_size_bytes: Maximum total size in bytes.
            default_ttl: Default time-to-live for entries.
        """
        self.max_entries = max_entries
        self.max_size_bytes = max_size_bytes
        self.default_ttl = default_ttl

        # key -> (audio, created_at, ttl)
        self._cache: dict[str, tuple[bytes, datetime, timedelta]] = {}
        # key -> last access time; kept in least-recently-used-first order.
        self._access_times: dict[str, datetime] = {}
        self._current_size = 0

        # Hit/miss counters backing `hit_rate`.
        self._hits = 0
        self._misses = 0

    def _generate_key(self, text: str, voice_type: VoiceType, model: str) -> str:
        """Generate cache key from synthesis parameters."""
        content = f"{voice_type.value}:{model}:{text}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def _touch(self, key: str, when: datetime) -> None:
        """Mark *key* as most recently used."""
        # Re-inserting moves the key to the end of the dict, so iteration
        # order always runs from least to most recently used.
        self._access_times.pop(key, None)
        self._access_times[key] = when

    def get(self, text: str, voice_type: VoiceType, model: str) -> bytes | None:
        """
        Get audio from cache if valid.

        Args:
            text: Original text.
            voice_type: Voice type used.
            model: Model used.

        Returns:
            Cached audio bytes or None if not found/expired.
        """
        key = self._generate_key(text, voice_type, model)

        entry = self._cache.get(key)
        if entry is None:
            self._misses += 1
            return None

        audio, created_at, ttl = entry
        now = datetime.now()

        # Expired entries are dropped and counted as misses.
        if now - created_at > ttl:
            self._remove(key)
            self._misses += 1
            return None

        self._touch(key, now)
        self._hits += 1
        return audio

    def put(
        self,
        text: str,
        voice_type: VoiceType,
        model: str,
        audio: bytes,
        ttl: timedelta | None = None,
    ) -> None:
        """
        Store audio in cache.

        An existing entry for the same key is replaced. Audio that can never
        fit (larger than ``max_size_bytes``, or ``max_entries`` is not
        positive) is not cached.

        Args:
            text: Original text.
            voice_type: Voice type used.
            model: Model used.
            audio: Audio bytes to cache.
            ttl: Optional custom TTL.
        """
        key = self._generate_key(text, voice_type, model)
        # `is None` rather than `or`, so an explicit zero TTL is honoured.
        if ttl is None:
            ttl = self.default_ttl
        audio_size = len(audio)

        # Drop any previous copy first: otherwise its bytes stay counted in
        # _current_size forever and it can force an unrelated eviction.
        self._remove(key)

        if self.max_entries <= 0 or audio_size > self.max_size_bytes:
            # Caching this would only flush everything else and still
            # violate the configured limits.
            return

        while self._cache and (
            len(self._cache) >= self.max_entries
            or self._current_size + audio_size > self.max_size_bytes
        ):
            if not self._evict_oldest():
                break

        now = datetime.now()
        self._cache[key] = (audio, now, ttl)
        self._touch(key, now)
        self._current_size += audio_size

    def _remove(self, key: str) -> None:
        """Remove entry from cache."""
        entry = self._cache.pop(key, None)
        if entry is not None:
            self._current_size -= len(entry[0])
        self._access_times.pop(key, None)

    def _evict_oldest(self) -> bool:
        """Evict least recently used entry.

        Returns:
            True if an entry was evicted, False if there was nothing to evict.
        """
        if not self._access_times:
            return False

        # _touch() keeps the dict ordered by recency, so the first key is
        # the least recently used one.
        oldest_key = next(iter(self._access_times))
        self._remove(oldest_key)
        return True

    def clear(self) -> None:
        """Clear entire cache."""
        self._cache.clear()
        self._access_times.clear()
        self._current_size = 0

    @property
    def size(self) -> int:
        """Get number of cached entries."""
        return len(self._cache)

    @property
    def hit_rate(self) -> float:
        """Get cache hit rate."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0
|
|
|
|
| |
| |
| |
|
|
|
|
class VoiceClient:
    """
    Production-ready ElevenLabs client with comprehensive error handling.

    Features:
        - Circuit breaker pattern for failure protection
        - Rate limiting to respect API limits
        - Audio caching for repeated phrases
        - Automatic retry with exponential backoff
        - Graceful degradation to text-only mode
        - Streaming and non-streaming synthesis
    """

    def __init__(
        self,
        api_key: str | None = None,
        model_id: str = "eleven_turbo_v2",
        max_retries: int = 3,
        retry_delay: float = 1.0,
        cache_enabled: bool = True,
        cache_max_entries: int = 100,
    ) -> None:
        """
        Initialize voice client.

        Args:
            api_key: ElevenLabs API key (uses settings if None).
            model_id: Default model for synthesis.
            max_retries: Maximum retry attempts.
            retry_delay: Base delay between retries.
            cache_enabled: Enable audio caching.
            cache_max_entries: Maximum cache entries.
        """
        settings = get_settings()
        self._api_key = api_key or settings.voice.elevenlabs_api_key
        self._model_id = model_id
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._cache_enabled = cache_enabled

        # SDK clients are created lazily in initialize().
        self._sync_client: ElevenLabs | None = None
        self._async_client: AsyncElevenLabs | None = None

        # Voice profiles keyed by profile name.
        self._voice_profiles: dict[str, VoiceProfile] = {}

        # Failure protection.
        self._circuit_breaker = VoiceCircuitBreaker(
            failure_threshold=3,
            reset_timeout=60.0,
        )
        self._rate_limiter = RateLimiter(
            max_requests=10,
            max_characters=10000,
            window_seconds=60.0,
        )

        # Optional audio cache.
        self._cache: AudioCache | None = None
        if cache_enabled:
            self._cache = AudioCache(max_entries=cache_max_entries)

        # Service state tracking.
        self._state = VoiceServiceState.UNAVAILABLE
        self._initialized = False
        self._last_error: str | None = None
        self._consecutive_failures = 0
        self._last_successful_call: datetime | None = None

    async def initialize(self) -> bool:
        """
        Create the ElevenLabs SDK clients and register default profiles.

        No API call is made here, so the key itself is only validated on
        first use.

        Returns:
            True if initialization successful, False if the key is missing
            or client construction failed.

        Raises:
            VoiceAuthenticationError: If client construction fails with a
                message that looks like an authentication problem.
        """
        if not self._api_key:
            logger.warning("ElevenLabs API key not configured")
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = "API key not configured"
            return False

        try:
            # Imported here so the module loads without the SDK installed.
            from elevenlabs.client import AsyncElevenLabs, ElevenLabs

            self._sync_client = ElevenLabs(api_key=self._api_key)
            self._async_client = AsyncElevenLabs(api_key=self._api_key)

            logger.info("ElevenLabs client configured (will validate on first use)")

            self._register_default_profiles()

            self._initialized = True
            self._state = VoiceServiceState.AVAILABLE
            self._last_error = None
            logger.info("Voice client initialized successfully")
            return True

        except Exception as e:
            error_str = str(e).lower()

            if "401" in error_str or "unauthorized" in error_str or "invalid" in error_str:
                logger.error("Invalid ElevenLabs API key: %s", e)
                self._state = VoiceServiceState.UNAVAILABLE
                self._last_error = "Invalid API key"
                raise VoiceAuthenticationError(str(e)) from e

            logger.error("Failed to initialize voice client: %s", e)
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = str(e)
            return False

    def _register_default_profiles(self) -> None:
        """Register default voice profiles from settings."""
        for profile in get_all_profiles().values():
            self._voice_profiles[profile.name] = profile

    def register_voice(
        self,
        name: str,
        voice_id: str,
        voice_type: VoiceType,
        settings: VoiceSynthesisSettings | None = None,
        description: str = "",
    ) -> None:
        """
        Register a custom voice profile.

        Args:
            name: Profile name.
            voice_id: ElevenLabs voice ID.
            voice_type: Voice type category.
            settings: Optional synthesis settings.
            description: Optional description.
        """
        self._voice_profiles[name] = VoiceProfile(
            name=name,
            voice_id=voice_id,
            voice_type=voice_type,
            settings=settings or VoiceSynthesisSettings(),
            description=description or f"Custom voice: {name}",
        )

    async def get_available_voices(self) -> list[dict[str, str]]:
        """
        Fetch available voices from ElevenLabs.

        Returns:
            List of voice info dictionaries with 'name' and 'voice_id'.
            Empty if the client is not initialized or the request fails.
        """
        if not self._initialized or not self._async_client:
            return []

        try:
            response = await self._async_client.voices.get_all()
            return [
                {"name": v.name, "voice_id": v.voice_id}
                for v in response.voices
            ]
        except Exception as e:
            logger.error("Failed to fetch voices: %s", e)
            return []

    async def synthesize(
        self,
        text: str,
        voice_type: VoiceType = VoiceType.DM,
        stream: bool = False,
        model: str | None = None,
    ) -> bytes | AsyncIterator[bytes] | None:
        """
        Synthesize speech from text.

        Checks run from cheapest to most consequential: cache, local rate
        limit, then circuit breaker. The breaker is consulted last because
        `allow_request()` hands out a limited recovery probe while half-open;
        taking it before a request that is then served from cache or rejected
        locally would waste the probe without ever reporting an outcome.

        Args:
            text: Text to synthesize.
            voice_type: Voice type to use.
            stream: If True, return async iterator of chunks.
            model: Override model (uses default if None).

        Returns:
            Audio bytes, async iterator of chunks, or None on failure.

        Raises:
            VoiceCircuitBreakerOpenError: If circuit breaker is open.
            VoiceRateLimitError: If rate limited.
            VoiceAuthenticationError: If auth fails.
            VoiceQuotaExhaustedError: If quota exhausted.
        """
        if not self._initialized:
            logger.warning("Voice client not initialized")
            return None

        model = model or self._model_id

        # Cached audio needs no API call, so it is served regardless of the
        # rate limiter or circuit breaker state.
        if not stream and self._cache:
            cached = self._cache.get(text, voice_type, model)
            if cached is not None:
                logger.debug("Cache hit for voice synthesis")
                return cached

        char_count = len(text)
        allowed, retry_after = self._rate_limiter.check_request(char_count)
        if not allowed:
            logger.warning("Rate limited. Retry after %ss", retry_after)
            raise VoiceRateLimitError(retry_after_seconds=retry_after)

        profile = get_voice_profile(voice_type)

        if not self._circuit_breaker.allow_request():
            retry_after = self._circuit_breaker.time_until_retry
            logger.warning("Circuit breaker open. Retry after %ss", retry_after)
            raise VoiceCircuitBreakerOpenError(retry_after_seconds=retry_after)

        if stream:
            # Async generator: the API call happens when the caller iterates.
            return self._synthesize_stream(text, profile, model)
        return await self._synthesize_with_retry(text, profile, model, char_count)

    async def _synthesize_with_retry(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
        char_count: int,
    ) -> bytes | None:
        """Synthesize with retry logic.

        Retries up to ``max_retries`` times with exponential backoff, but
        stops early once the circuit breaker has opened. Non-retryable errors
        (auth, quota, rate limit, voice not found) propagate from
        `_handle_synthesis_error`.

        Returns:
            Audio bytes, or None if every attempt failed.
        """
        for attempt in range(self._max_retries):
            try:
                audio = await self._synthesize_full(text, profile, model)
            except Exception as e:
                # Raises for non-retryable errors; otherwise records the
                # failure and lets the loop retry.
                self._handle_synthesis_error(e, attempt)
            else:
                if audio:
                    self._record_success(char_count)
                    if self._cache:
                        self._cache.put(text, profile.voice_type, model, audio)
                    return audio

                # An empty response is a failed attempt too; recording it
                # keeps the circuit breaker informed of the outcome.
                logger.warning(
                    "Synthesis returned no audio (attempt %d)", attempt + 1
                )
                self._note_retryable_failure("Synthesis returned no audio")

            if self._circuit_breaker.is_open:
                logger.warning("Circuit breaker opened; abandoning remaining retries")
                break

            if attempt < self._max_retries - 1:
                await asyncio.sleep(self._calculate_backoff(attempt))

        return None

    @staticmethod
    def _voice_settings(profile: VoiceProfile) -> dict[str, object]:
        """Build the voice_settings payload sent with a synthesis request."""
        settings = profile.settings
        return {
            "stability": settings.stability,
            "similarity_boost": settings.similarity_boost,
            "style": settings.style,
            "use_speaker_boost": settings.use_speaker_boost,
        }

    async def _synthesize_full(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
    ) -> bytes:
        """Internal non-streaming synthesis.

        Raises:
            VoiceUnavailableError: If the async client has not been created.
        """
        if not self._async_client:
            raise VoiceUnavailableError("Client not initialized")

        # convert() is consumed below as an async iterator of chunks.
        response = self._async_client.text_to_speech.convert(
            text=text,
            voice_id=profile.voice_id,
            model_id=model,
            voice_settings=self._voice_settings(profile),
        )

        # Non-bytes items yielded by the stream are ignored.
        chunks = [chunk async for chunk in response if isinstance(chunk, bytes)]
        return b"".join(chunks)

    async def _synthesize_stream(
        self,
        text: str,
        profile: VoiceProfile,
        model: str,
    ) -> AsyncIterator[bytes]:
        """Internal streaming synthesis.

        Yields audio chunks as they arrive. Success is recorded only after
        the stream is fully consumed; errors are classified by
        `_handle_synthesis_error` and then re-raised.

        Raises:
            VoiceUnavailableError: If the async client has not been created.
        """
        if not self._async_client:
            raise VoiceUnavailableError("Client not initialized")

        try:
            response = self._async_client.text_to_speech.convert(
                text=text,
                voice_id=profile.voice_id,
                model_id=model,
                voice_settings=self._voice_settings(profile),
            )

            async for chunk in response:
                if isinstance(chunk, bytes):
                    yield chunk

            self._record_success(len(text))

        except Exception as e:
            self._handle_synthesis_error(e, 0)
            raise

    def synthesize_sync(
        self,
        text: str,
        voice_type: VoiceType = VoiceType.DM,
    ) -> bytes | None:
        """
        Synchronous wrapper for synthesize.

        Args:
            text: Text to synthesize.
            voice_type: Voice type to use.

        Returns:
            Audio bytes or None on failure.

        Raises:
            RuntimeError: If called from inside a running event loop.
        """
        # run_until_complete() cannot nest inside a running loop; fail early
        # with a clear message instead of leaving an un-awaited coroutine.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "synthesize_sync() cannot be called from a running event loop; "
                "await synthesize() instead"
            )

        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                raise RuntimeError("event loop is closed")
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        result = loop.run_until_complete(
            self.synthesize(text, voice_type, stream=False)
        )

        return result if isinstance(result, bytes) else None

    def _note_retryable_failure(self, message: str) -> None:
        """Record a retryable failure on the breaker and service state."""
        self._consecutive_failures += 1
        self._circuit_breaker.record_failure()
        self._last_error = message
        self._state = VoiceServiceState.DEGRADED

    def _handle_synthesis_error(self, error: Exception, attempt: int) -> None:
        """Handle synthesis error and update state.

        Classification is by substring match on the error text. Returns
        normally only for generic (retryable) errors.

        Raises:
            VoiceAuthenticationError: Message mentions 401/unauthorized.
            VoiceQuotaExhaustedError: Message mentions quota exhaustion.
            VoiceRateLimitError: Message mentions 429 / rate limiting.
            VoiceNotFoundError: Message says the voice was not found.
        """
        error_str = str(error).lower()

        # Match whole phrases: a bare "rate" also matches "generate".
        rate_limited = (
            "429" in error_str
            or "rate limit" in error_str
            or "rate_limit" in error_str
            or "too many requests" in error_str
        )

        if "401" in error_str or "unauthorized" in error_str:
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = "Authentication failed"
            self._circuit_breaker.record_failure()
            raise VoiceAuthenticationError(str(error)) from error

        # "rate limit exceeded" is throttling, not quota exhaustion, so
        # "limit exceeded" only counts when no rate-limit marker is present.
        if "quota" in error_str or (
            "limit exceeded" in error_str and not rate_limited
        ):
            self._state = VoiceServiceState.UNAVAILABLE
            self._last_error = "Quota exhausted"
            self._circuit_breaker.record_failure()
            raise VoiceQuotaExhaustedError(str(error)) from error

        if rate_limited:
            # Not recorded on the circuit breaker.
            logger.warning("Rate limit from API (attempt %d)", attempt + 1)
            raise VoiceRateLimitError(str(error)) from error

        if "voice" in error_str and "not found" in error_str:
            raise VoiceNotFoundError(str(error)) from error

        # Generic error: record it and let the caller decide whether to retry.
        logger.warning("Synthesis error (attempt %d): %s", attempt + 1, error)
        self._note_retryable_failure(str(error))

    def _record_success(self, char_count: int) -> None:
        """Record successful synthesis."""
        self._circuit_breaker.record_success()
        self._rate_limiter.record_request(char_count)
        self._last_successful_call = datetime.now()
        self._consecutive_failures = 0
        self._state = VoiceServiceState.AVAILABLE

    def _calculate_backoff(self, attempt: int) -> float:
        """Calculate exponential backoff with jitter.

        Args:
            attempt: Zero-based attempt index.

        Returns:
            ``retry_delay * 2**attempt`` capped at 30 seconds, plus up to
            10% random jitter.
        """
        delay = min(self._retry_delay * (2 ** attempt), 30.0)
        jitter = delay * 0.1 * random.random()
        return delay + jitter

    def get_status(self) -> VoiceServiceStatus:
        """Get current service status."""
        return VoiceServiceStatus(
            state=self._state,
            circuit_state=self._circuit_breaker.state,
            is_available=self.is_available,
            is_initialized=self._initialized,
            last_successful_call=self._last_successful_call,
            consecutive_failures=self._consecutive_failures,
            cache_size=self._cache.size if self._cache else 0,
            cache_hit_rate=self._cache.hit_rate if self._cache else 0.0,
            error_message=self._last_error,
        )

    @property
    def is_available(self) -> bool:
        """Check if voice service is available."""
        return (
            self._initialized
            and self._state != VoiceServiceState.UNAVAILABLE
            and not self._circuit_breaker.is_open
        )

    def get_unavailable_message(self) -> str:
        """Get user-friendly message when voice is unavailable."""
        if not self._initialized:
            return "Voice narration is not configured."

        if self._state == VoiceServiceState.UNAVAILABLE:
            if self._last_error:
                return f"Voice narration is unavailable: {self._last_error}"
            return "Voice narration is currently unavailable."

        if self._circuit_breaker.is_open:
            retry = self._circuit_breaker.time_until_retry
            if retry is None:
                # time_until_retry may be None; formatting it with :.0f
                # would raise TypeError.
                return "Voice service is recovering. Will retry shortly."
            return f"Voice service is recovering. Will retry in {retry:.0f} seconds."

        return "Voice narration is disabled."

    def __repr__(self) -> str:
        """String representation."""
        return (
            f"VoiceClient(initialized={self._initialized}, "
            f"state={self._state.value}, "
            f"available={self.is_available})"
        )
|
|