"""LiveKit Agents konfigurācija Maris realtime voice runtime'iem.""" from __future__ import annotations import os from dataclasses import asdict, dataclass from typing import Any @dataclass(slots=True, frozen=True) class LiveKitVoiceAgentSettings: livekit_url: str api_key: str api_secret: str room_prefix: str = "maris-voice" agent_name: str = "maris-livekit-agent" default_language: str = "lv" default_voice: str = "maris" stt_provider: str = "speechmatics" llm_provider: str = "maris-stream" tts_provider: str = "azure" vad_provider: str = "silero" allow_barge_in: bool = True def load_livekit_voice_agent_settings( environ: dict[str, str] | None = None, ) -> LiveKitVoiceAgentSettings: env = environ or os.environ return LiveKitVoiceAgentSettings( livekit_url=str(env.get("LIVEKIT_URL", "")).strip(), api_key=str(env.get("LIVEKIT_API_KEY", "")).strip(), api_secret=str(env.get("LIVEKIT_API_SECRET", "")).strip(), room_prefix=str(env.get("MARIS_LIVEKIT_ROOM_PREFIX", "maris-voice")).strip() or "maris-voice", agent_name=str(env.get("MARIS_LIVEKIT_AGENT_NAME", "maris-livekit-agent")).strip() or "maris-livekit-agent", default_language=str(env.get("MARIS_VOICE_LANGUAGE", "lv")).strip() or "lv", default_voice=str(env.get("MARIS_VOICE_NAME", "maris")).strip() or "maris", stt_provider=str(env.get("MARIS_VOICE_STT_PROVIDER", "speechmatics")).strip() or "speechmatics", llm_provider=str(env.get("MARIS_VOICE_LLM_PROVIDER", "maris-stream")).strip() or "maris-stream", tts_provider=str(env.get("MARIS_VOICE_TTS_PROVIDER", "azure")).strip() or "azure", vad_provider=str(env.get("MARIS_VOICE_VAD_PROVIDER", "silero")).strip() or "silero", allow_barge_in=str(env.get("MARIS_VOICE_ALLOW_BARGE_IN", "true")).strip().lower() in {"1", "true", "yes", "on"}, ) def build_livekit_agent_metadata(settings: LiveKitVoiceAgentSettings) -> dict[str, Any]: payload = asdict(settings) payload["features"] = { "streaming_stt": True, "streaming_llm": True, "streaming_tts": True, "vad": True, "barge_in": settings.allow_barge_in, } payload["runtime"] = "livekit-agents" return payload def require_livekit_agents_runtime() -> tuple[Any, Any]: """Ielādē LiveKit runtime tikai tad, kad tas tiešām vajadzīgs.""" try: from livekit import agents, rtc # type: ignore except ImportError as exc: # pragma: no cover - atkarīgs no vides raise RuntimeError( "LiveKit Agents runtime nav instalēts. " "Instalē livekit pakotni un konfigurē LIVEKIT_URL/LIVEKIT_API_KEY/LIVEKIT_API_SECRET." ) from exc return agents, rtc