"""LiveKit server and session handler wiring.""" from __future__ import annotations import asyncio import json import sys from typing import Any from livekit import agents, rtc from livekit.agents import AgentServer, AgentSession, room_io from livekit.agents.types import APIConnectOptions from livekit.agents.voice.agent_session import SessionConnectOptions from livekit.plugins import noise_cancellation, silero from livekit.plugins.turn_detector.english import EnglishModel from src.agent.models.llm_runtime import ( build_llm_runtime, install_mcp_generate_reply_guard, run_startup_greeting, ) from src.agent.models.tts_factory import create_tts from src.agent.runtime.connect_options import build_api_connect_options from src.agent.models.stt_factory import create_stt from src.agent.runtime.assistant import Assistant from src.agent.runtime.tasks import ( cancel_task_for_shutdown, run_llm_warmup, schedule_startup_greeting_task, ) from src.agent.tools.feedback import ToolFeedbackController from src.agent.traces.langfuse import setup_langfuse_tracer from src.agent.traces.metrics_collector import MetricsCollector from src.agent.traces.text_output_tracing import install_tracing_text_output from src.core.logger import logger from src.core.settings import settings def _build_server() -> AgentServer: return AgentServer( num_idle_processes=settings.livekit.LIVEKIT_NUM_IDLE_PROCESSES, job_memory_warn_mb=settings.livekit.LIVEKIT_JOB_MEMORY_WARN_MB, initialize_process_timeout=( settings.livekit.LIVEKIT_INITIALIZE_PROCESS_TIMEOUT_SEC ), ) server = _build_server() def fallback_session_prefix() -> str | None: """Use console-prefixed fallback session id when running `... console`.""" if any(arg == "console" for arg in sys.argv[1:]): return "console" return None def fallback_participant_prefix() -> str | None: """Use console-prefixed fallback participant id when running `... console`.""" if any(arg == "console" for arg in sys.argv[1:]): return "console" return None def _resolve_stt_metrics_model_name() -> str: provider = settings.stt.STT_PROVIDER.lower() if provider == "moonshine": return settings.stt.MOONSHINE_MODEL_ID if provider == "deepgram": return settings.stt.DEEPGRAM_STT_MODEL return settings.stt.NVIDIA_STT_MODEL def _resolve_stt_language() -> str: provider = settings.stt.STT_PROVIDER.lower() if provider == "moonshine": return settings.stt.MOONSHINE_LANGUAGE if provider == "deepgram": return settings.stt.DEEPGRAM_STT_LANGUAGE return settings.stt.NVIDIA_STT_LANGUAGE_CODE def _build_session_connect_options() -> tuple[APIConnectOptions, SessionConnectOptions]: llm_conn_options = build_api_connect_options( max_retry=settings.llm.LLM_CONN_MAX_RETRY, retry_interval_sec=settings.llm.LLM_CONN_RETRY_INTERVAL_SEC, timeout_sec=settings.llm.LLM_CONN_TIMEOUT_SEC, ) tts_conn_options = build_api_connect_options( max_retry=settings.llm.LLM_CONN_MAX_RETRY, retry_interval_sec=settings.llm.LLM_CONN_RETRY_INTERVAL_SEC, timeout_sec=settings.voice.POCKET_TTS_CONN_TIMEOUT_SEC, ) session_conn_options = SessionConnectOptions( llm_conn_options=llm_conn_options, tts_conn_options=tts_conn_options, ) return llm_conn_options, session_conn_options @server.rtc_session(agent_name=settings.livekit.LIVEKIT_AGENT_NAME) async def session_handler(ctx: agents.JobContext) -> None: logger.info( "Agent session started: room=%s job_id=%s", ctx.room.name, ctx.job.id, ) trace_provider = setup_langfuse_tracer() startup_greeting_task: asyncio.Task[Any] | None = None tool_feedback = ToolFeedbackController(enabled=False) async def cancel_startup_greeting(_: str) -> None: await cancel_task_for_shutdown( startup_greeting_task, task_name="startup greeting", ) ctx.add_shutdown_callback(cancel_startup_greeting) async def close_tool_feedback(_: str) -> None: await tool_feedback.aclose() ctx.add_shutdown_callback(close_tool_feedback) participant = getattr(ctx.job, "participant", None) initial_participant_id = getattr(participant, "identity", None) room_info = getattr(ctx.job, "room", None) initial_room_id = getattr(room_info, "sid", None) or ctx.room.name metrics_collector = MetricsCollector( room=ctx.room, model_name=_resolve_stt_metrics_model_name(), room_name=ctx.room.name, room_id=initial_room_id, participant_id=initial_participant_id, fallback_session_prefix=fallback_session_prefix(), fallback_participant_prefix=fallback_participant_prefix(), langfuse_enabled=trace_provider is not None, ) async def drain_pending_traces(_: str) -> None: try: await metrics_collector.drain_pending_traces() except TimeoutError: logger.warning("Timed out while draining pending Langfuse traces during shutdown") except Exception as exc: logger.warning(f"Failed to drain pending Langfuse traces: {exc}") if trace_provider is None: return try: trace_provider.force_flush() except Exception as exc: logger.warning(f"Failed to flush Langfuse traces: {exc}") ctx.add_shutdown_callback(drain_pending_traces) if isinstance(ctx.job.metadata, str) and ctx.job.metadata.strip(): try: metadata = json.loads(ctx.job.metadata) except Exception: metadata = {} logger.info( "Session metadata received from dispatch: session_id=%s participant_id=%s room=%s", metadata.get("session_id"), metadata.get("participant_id"), ctx.room.name, ) await metrics_collector.on_session_metadata( session_id=metadata.get("session_id"), participant_id=metadata.get("participant_id"), ) tts_engine = create_tts() llm_conn_options, session_conn_options = _build_session_connect_options() llm_runtime = build_llm_runtime(settings.llm) mcp_runtime_active = llm_runtime.mcp_runtime_active tool_feedback = ToolFeedbackController(enabled=mcp_runtime_active) logger.info( "Running LLM warm-up before session start: provider=%s model=%s", llm_runtime.provider, llm_runtime.model, ) await run_llm_warmup( llm_client=llm_runtime.llm, conn_options=llm_conn_options, provider=llm_runtime.provider, model=llm_runtime.model, ) stt_engine = create_stt() logger.info( "Turn profile: detector=%s stt_provider=%s stt_model=%s stt_language=%s vad_min_silence=%.2fs min_endpointing=%.2fs max_endpointing=%.2fs preemptive_generation=%s", "EnglishModel", settings.stt.STT_PROVIDER, _resolve_stt_metrics_model_name(), _resolve_stt_language(), settings.voice.VAD_MIN_SILENCE_DURATION, settings.voice.MIN_ENDPOINTING_DELAY, settings.voice.MAX_ENDPOINTING_DELAY, settings.voice.PREEMPTIVE_GENERATION, ) session_kwargs: dict[str, Any] = dict( stt=stt_engine, llm=llm_runtime.llm, tts=tts_engine, vad=silero.VAD.load( min_speech_duration=settings.voice.VAD_MIN_SPEECH_DURATION, min_silence_duration=settings.voice.VAD_MIN_SILENCE_DURATION, activation_threshold=settings.voice.VAD_THRESHOLD, ), turn_detection=EnglishModel(), min_endpointing_delay=settings.voice.MIN_ENDPOINTING_DELAY, max_endpointing_delay=settings.voice.MAX_ENDPOINTING_DELAY, max_tool_steps=8, preemptive_generation=settings.voice.PREEMPTIVE_GENERATION, conn_options=session_conn_options, ) if llm_runtime.mcp_servers is not None: session_kwargs["mcp_servers"] = llm_runtime.mcp_servers session = AgentSession(**session_kwargs) install_mcp_generate_reply_guard(session, mcp_runtime_active=mcp_runtime_active) await session.start( room=ctx.room, record=False, agent=Assistant( metrics_collector=metrics_collector, room_name=ctx.room.name, job_id=ctx.job.id, tool_feedback=tool_feedback, ), room_options=room_io.RoomOptions( audio_input=room_io.AudioInputOptions( sample_rate=settings.voice.LIVEKIT_SAMPLE_RATE, num_channels=settings.voice.LIVEKIT_NUM_CHANNELS, frame_size_ms=settings.voice.LIVEKIT_FRAME_SIZE_MS, pre_connect_audio=settings.voice.LIVEKIT_PRE_CONNECT_AUDIO, pre_connect_audio_timeout=settings.voice.LIVEKIT_PRE_CONNECT_TIMEOUT, noise_cancellation=lambda params: noise_cancellation.BVCTelephony() if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP else noise_cancellation.BVC(), ), ), ) if all( hasattr(metrics_collector, attr_name) for attr_name in ( "submit_streamed_assistant_text_delta", "submit_streamed_assistant_text_flush", "submit_streamed_assistant_text_context_missing", ) ): install_tracing_text_output( session=session, on_delta=metrics_collector.submit_streamed_assistant_text_delta, on_flush=metrics_collector.submit_streamed_assistant_text_flush, on_context_missing=metrics_collector.submit_streamed_assistant_text_context_missing, ) await tool_feedback.start(room=ctx.room, session=session) if mcp_runtime_active: startup_greeting_task = schedule_startup_greeting_task( session, mcp_runtime_active=mcp_runtime_active, timeout_sec=settings.llm.MCP_STARTUP_GREETING_TIMEOUT_SEC, ) else: run_startup_greeting(session, mcp_runtime_active=mcp_runtime_active)