# MarisUK's picture
# Maris AI model sync
# f440f03 verified
"""LiveKit Agents konfigurācija Maris realtime voice runtime'iem."""
from __future__ import annotations
import os
from dataclasses import asdict, dataclass
from typing import Any
@dataclass(slots=True, frozen=True)
class LiveKitVoiceAgentSettings:
livekit_url: str
api_key: str
api_secret: str
room_prefix: str = "maris-voice"
agent_name: str = "maris-livekit-agent"
default_language: str = "lv"
default_voice: str = "maris"
stt_provider: str = "speechmatics"
llm_provider: str = "maris-stream"
tts_provider: str = "azure"
vad_provider: str = "silero"
allow_barge_in: bool = True
def load_livekit_voice_agent_settings(
    environ: dict[str, str] | None = None,
) -> LiveKitVoiceAgentSettings:
    """Build ``LiveKitVoiceAgentSettings`` from environment variables.

    Args:
        environ: Mapping to read the variables from. ``None`` (the default)
            selects ``os.environ``. An explicitly supplied mapping is always
            honoured, even when it is empty.

    Returns:
        Settings whose string values are whitespace-stripped. The three
        ``LIVEKIT_*`` connection values become ``""`` when missing or blank;
        the ``MARIS_*`` string values fall back to their built-in defaults
        when missing or blank. ``MARIS_VOICE_ALLOW_BARGE_IN`` is true when
        unset or when it equals ``1``/``true``/``yes``/``on``
        (case-insensitive, surrounding whitespace ignored); any other value,
        including a blank one, yields false.
    """
    # Test against None instead of truthiness: with `environ or os.environ`
    # an explicitly passed empty mapping silently fell back to the real
    # process environment.
    env = os.environ if environ is None else environ

    def text(key: str, default: str = "") -> str:
        # A missing or whitespace-only value collapses to the default.
        return str(env.get(key, default)).strip() or default

    # Blank is deliberately NOT mapped to the default here, so an empty
    # value keeps meaning "disabled" exactly as before.
    barge_in_raw = str(env.get("MARIS_VOICE_ALLOW_BARGE_IN", "true")).strip().lower()

    return LiveKitVoiceAgentSettings(
        livekit_url=text("LIVEKIT_URL"),
        api_key=text("LIVEKIT_API_KEY"),
        api_secret=text("LIVEKIT_API_SECRET"),
        room_prefix=text("MARIS_LIVEKIT_ROOM_PREFIX", "maris-voice"),
        agent_name=text("MARIS_LIVEKIT_AGENT_NAME", "maris-livekit-agent"),
        default_language=text("MARIS_VOICE_LANGUAGE", "lv"),
        default_voice=text("MARIS_VOICE_NAME", "maris"),
        stt_provider=text("MARIS_VOICE_STT_PROVIDER", "speechmatics"),
        llm_provider=text("MARIS_VOICE_LLM_PROVIDER", "maris-stream"),
        tts_provider=text("MARIS_VOICE_TTS_PROVIDER", "azure"),
        vad_provider=text("MARIS_VOICE_VAD_PROVIDER", "silero"),
        allow_barge_in=barge_in_raw in {"1", "true", "yes", "on"},
    )
def build_livekit_agent_metadata(settings: LiveKitVoiceAgentSettings) -> dict[str, Any]:
    """Return the settings as a plain dict extended with feature flags.

    Args:
        settings: The agent settings to serialise.

    Returns:
        Every field of ``settings`` (via ``dataclasses.asdict``), followed by
        a ``"features"`` dict and a ``"runtime"`` tag. All streaming and VAD
        flags are fixed to ``True``; ``"barge_in"`` mirrors
        ``settings.allow_barge_in``.
    """
    feature_flags = {
        "streaming_stt": True,
        "streaming_llm": True,
        "streaming_tts": True,
        "vad": True,
        "barge_in": settings.allow_barge_in,
    }
    # NOTE(review): asdict() copies every field, so api_key and api_secret end
    # up in this payload - confirm it is never exposed to untrusted parties.
    return {
        **asdict(settings),
        "features": feature_flags,
        "runtime": "livekit-agents",
    }
def require_livekit_agents_runtime() -> tuple[Any, Any]:
    """Import the LiveKit runtime lazily, only when it is actually needed.

    Returns:
        The ``(agents, rtc)`` pair imported from the ``livekit`` package.

    Raises:
        RuntimeError: If the import fails; the underlying ``ImportError`` is
            chained as the cause.
    """
    try:
        from livekit import agents, rtc  # type: ignore
    except ImportError as import_error:  # pragma: no cover - environment-dependent
        message = (
            "LiveKit Agents runtime nav instalēts. "
            "Instalē livekit pakotni un konfigurē LIVEKIT_URL/LIVEKIT_API_KEY/LIVEKIT_API_SECRET."
        )
        raise RuntimeError(message) from import_error
    else:
        return agents, rtc