"""LiveKit voice-agent entry point.

Builds an ``AgentSession`` (STT, LangChain-backed LLM, PocketTTS, Silero VAD),
forwards session events to a ``MetricsCollector`` and, when enabled in
settings, exports telemetry traces to Langfuse over OTLP/HTTP.
"""

import asyncio
import base64
import json
import sys
from collections.abc import Coroutine
from typing import Any

from livekit import agents, rtc
from livekit.agents import AgentServer, AgentSession, Agent, room_io
from livekit.agents.types import APIConnectOptions
from livekit.agents.telemetry import set_tracer_provider
from livekit.agents.voice.agent_session import SessionConnectOptions
from livekit.agents.voice.events import (
    AgentStateChangedEvent,
    CloseEvent,
    ConversationItemAddedEvent,
    ErrorEvent,
    MetricsCollectedEvent,
    SpeechCreatedEvent,
    UserInputTranscribedEvent,
)
from livekit.plugins import noise_cancellation, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from livekit.plugins import langchain
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from src.agent.graph import create_graph, create_stt
from src.agent._langchain_usage_patch import apply_langchain_usage_patch
from src.agent.metrics_collector import MetricsCollector
from src.plugins.pocket_tts import PocketTTS
from src.core.settings import settings
from src.core.logger import detach_default_root_handler, logger
from livekit.agents.metrics import (
    LLMMetrics,
    STTMetrics,
    TTSMetrics,
    VADMetrics,
    EOUMetrics,
    AgentMetrics,
)

# Tracer provider created by ``setup_langfuse_tracer``; cached so repeated
# calls in the same process reuse it instead of building a second exporter.
_langfuse_tracer_provider: TracerProvider | None = None

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so a task nobody holds may be garbage-collected before it ends.
_background_tasks: set[asyncio.Task[Any]] = set()


def _on_background_task_done(task: asyncio.Task[Any]) -> None:
    """Drop the finished task from the registry and log its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.warning("Background task failed: %s", exc)


def _spawn_background(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
    """Schedule ``coro`` on the running loop and keep it alive until it is done.

    Args:
        coro: Coroutine object to run as a task.

    Returns:
        The created task (callers may ignore it).
    """
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)
    return task


def _normalize_langfuse_host() -> str | None:
    """Return the configured Langfuse host without trailing slashes, or None.

    ``LANGFUSE_HOST`` takes precedence over ``LANGFUSE_BASE_URL``.
    """
    host = settings.langfuse.LANGFUSE_HOST or settings.langfuse.LANGFUSE_BASE_URL
    if not host:
        return None
    return host.rstrip("/")


def _console_prefix() -> str | None:
    """Return ``"console"`` when a ``console`` argument is on the command line."""
    return "console" if "console" in sys.argv[1:] else None


def _fallback_session_prefix() -> str | None:
    """Use console-prefixed fallback session id when running `... console`."""
    return _console_prefix()


def _fallback_participant_prefix() -> str | None:
    """Use console-prefixed fallback participant id when running `... console`."""
    return _console_prefix()


def setup_langfuse_tracer() -> TracerProvider | None:
    """Configure LiveKit telemetry tracer to export traces to Langfuse.

    Returns:
        The tracer provider (newly created or previously cached), or None when
        tracing is disabled, credentials/host are missing, or setup failed.
    """
    global _langfuse_tracer_provider

    if not settings.langfuse.LANGFUSE_ENABLED:
        return None
    if _langfuse_tracer_provider is not None:
        return _langfuse_tracer_provider

    host = _normalize_langfuse_host()
    public_key = settings.langfuse.LANGFUSE_PUBLIC_KEY
    secret_key = settings.langfuse.LANGFUSE_SECRET_KEY
    if not host or not public_key or not secret_key:
        logger.warning(
            "Langfuse tracing enabled but LANGFUSE_HOST/LANGFUSE_PUBLIC_KEY/LANGFUSE_SECRET_KEY are missing"
        )
        return None

    # Tracing is best-effort: any failure here is logged and the agent keeps
    # running without a tracer.
    try:
        # HTTP Basic credentials built from the public/secret key pair.
        auth = base64.b64encode(f"{public_key}:{secret_key}".encode("utf-8")).decode("utf-8")
        span_exporter = OTLPSpanExporter(
            endpoint=f"{host}/api/public/otel/v1/traces",
            headers={"Authorization": f"Basic {auth}"},
        )
        tracer_provider = TracerProvider(
            resource=Resource.create(
                {
                    SERVICE_NAME: "open-voice-agent",
                    SERVICE_VERSION: getattr(agents, "__version__", "unknown"),
                    "deployment.environment": settings.langfuse.LANGFUSE_ENVIRONMENT,
                }
            )
        )
        tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
        set_tracer_provider(tracer_provider)
        _langfuse_tracer_provider = tracer_provider
        logger.info("Langfuse OTEL tracing configured")
        return tracer_provider
    except Exception as exc:
        logger.warning("Failed to set up Langfuse tracing: %s", exc)
        return None


def _error_type_name(error_obj: Any) -> str:
    """Return the object's ``type`` attribute, or its class name if absent."""
    return getattr(error_obj, "type", type(error_obj).__name__)


def _error_recoverable(error_obj: Any) -> str:
    """Return ``"true"``/``"false"`` from ``recoverable``, or ``"unknown"`` if unset."""
    recoverable = getattr(error_obj, "recoverable", None)
    if recoverable is None:
        return "unknown"
    return str(bool(recoverable)).lower()


def _error_detail(error_obj: Any) -> str:
    """Return the text of the nested ``error`` attribute when truthy, else of the object."""
    nested_error = getattr(error_obj, "error", None)
    if nested_error:
        return str(nested_error)
    return str(error_obj)


def _parse_job_metadata(raw: Any) -> dict[str, Any] | None:
    """Decode dispatch metadata into a dict.

    Args:
        raw: The job's metadata value.

    Returns:
        None when ``raw`` is not a non-blank string (no metadata supplied);
        an empty dict when the text is not valid JSON or is not a JSON object
        (a warning is logged); otherwise the decoded object.
    """
    if not isinstance(raw, str) or not raw.strip():
        return None
    try:
        parsed = json.loads(raw)
    except (ValueError, RecursionError) as exc:
        logger.warning("Ignoring job metadata that is not valid JSON: %s", exc)
        return {}
    if not isinstance(parsed, dict):
        # Valid JSON such as a list or string has no ``.get``; treat it as empty.
        logger.warning(
            "Ignoring job metadata that is not a JSON object: type=%s",
            type(parsed).__name__,
        )
        return {}
    return parsed


async def _log_metrics_event(metrics: AgentMetrics) -> None:
    """Write a human-readable dump of one metrics event to the log.

    NOTE: nothing in this module registers this helper; it is kept as an
    opt-in debugging aid for a ``metrics_collected`` listener.
    """
    if isinstance(metrics, VADMetrics):
        logger.info("\n--- VAD Metrics ---")
        logger.info("Idle Time: %.4fs", metrics.idle_time)
        logger.info("Inference Count: %s", metrics.inference_count)
        logger.info("Inference Duration Total: %.4fs", metrics.inference_duration_total)
        logger.info("--------------------------------\n")
        return

    if isinstance(metrics, EOUMetrics):
        # Clamped at zero so a transcription delay larger than the EOU delay
        # never shows up as a negative overhead.
        decision_overhead = max(
            metrics.end_of_utterance_delay - metrics.transcription_delay,
            0.0,
        )
        logger.info("\n--- End of Utterance Metrics ---")
        logger.info("Speech ID: %s", metrics.speech_id or "n/a")
        logger.info("User Turn Completed Delay: %.4fs", metrics.on_user_turn_completed_delay)
        logger.info("End of Utterance Delay: %.4fs", metrics.end_of_utterance_delay)
        logger.info("Transcription Delay: %.4fs", metrics.transcription_delay)
        logger.info(
            "Decision Overhead (EOU - STT Finalization): %.4fs",
            decision_overhead,
        )
        logger.info("--------------------------------\n")
        return

    if isinstance(metrics, STTMetrics):
        logger.info("\n--- STT Metrics ---")
        logger.info("Request ID: %s", metrics.request_id)
        logger.info("Duration: %.4fs", metrics.duration)
        logger.info("Audio Duration: %.4fs", metrics.audio_duration)
        logger.info("Streamed: %s", "Yes" if metrics.streamed else "No")
        logger.info("------------------\n")
        return

    if isinstance(metrics, LLMMetrics):
        logger.info("\n--- LLM Metrics ---")
        logger.info("Speech ID: %s", metrics.speech_id or "n/a")
        logger.info("Request ID: %s", metrics.request_id)
        logger.info("Prompt Tokens: %s", metrics.prompt_tokens)
        logger.info("Completion Tokens: %s", metrics.completion_tokens)
        logger.info("Tokens per second: %.4f", metrics.tokens_per_second)
        logger.info("TTFT: %.4fs", metrics.ttft)
        logger.info("------------------\n")
        return

    if isinstance(metrics, TTSMetrics):
        logger.info("\n--- TTS Metrics ---")
        logger.info("Speech ID: %s", metrics.speech_id or "n/a")
        logger.info("Request ID: %s", metrics.request_id)
        logger.info("TTFB: %.4fs", metrics.ttfb)
        logger.info("Duration: %.4fs", metrics.duration)
        logger.info("Audio Duration: %.4fs", metrics.audio_duration)
        logger.info("Streamed: %s", "Yes" if metrics.streamed else "No")
        logger.info("------------------\n")
        return

    metric_type = getattr(metrics, "type", type(metrics).__name__)
    logger.info("Unhandled metrics event type=%s payload=%s", metric_type, metrics)


class Assistant(Agent):
    """Voice assistant agent that wires session events to a metrics collector."""

    def __init__(
        self,
        metrics_collector: MetricsCollector,
        *,
        room_name: str,
        job_id: str,
    ) -> None:
        """Store the collector and the room/job identifiers used in log lines.

        Args:
            metrics_collector: Receiver for the session events forwarded in
                ``on_enter``.
            room_name: Room name included in error/close log messages.
            job_id: Job id included in error/close log messages.
        """
        super().__init__(
            instructions=(
                "You are a helpful voice AI assistant. "
                "You eagerly assist users with their questions by providing "
                "information from your extensive knowledge. "
                "Your responses are concise, to the point, and without any "
                "complex formatting or punctuation including emojis, "
                "asterisks, or other symbols. "
                "You are curious, friendly, and have a sense of humor."
            ),
        )
        self._metrics_collector = metrics_collector
        self._room_name = room_name
        self._job_id = job_id

    async def on_enter(self) -> None:
        """Called when the agent enters the session. Set up metrics listeners."""

        # The listeners below are plain (sync) callables, so each one hands the
        # collector's coroutine to a tracked background task.
        def metrics_wrapper(event: MetricsCollectedEvent) -> None:
            _spawn_background(
                self._metrics_collector.on_metrics_collected(event.metrics)
            )

        def transcript_wrapper(event: UserInputTranscribedEvent) -> None:
            _spawn_background(
                self._metrics_collector.on_user_input_transcribed(
                    event.transcript,
                    is_final=event.is_final,
                )
            )

        def conversation_item_wrapper(event: ConversationItemAddedEvent) -> None:
            item = event.item
            # Read defensively: an item without role/content is forwarded as None.
            role = getattr(item, "role", None)
            content = getattr(item, "content", None)
            _spawn_background(
                self._metrics_collector.on_conversation_item_added(
                    role=role,
                    content=content,
                )
            )

        def speech_created_wrapper(event: SpeechCreatedEvent) -> None:
            _spawn_background(
                self._metrics_collector.on_speech_created(event.speech_handle)
            )

        def agent_state_changed_wrapper(event: AgentStateChangedEvent) -> None:
            _spawn_background(
                self._metrics_collector.on_agent_state_changed(
                    old_state=event.old_state,
                    new_state=event.new_state,
                )
            )

        def error_wrapper(event: ErrorEvent) -> None:
            source = type(event.source).__name__
            error_type = _error_type_name(event.error)
            recoverable = _error_recoverable(event.error)
            detail = _error_detail(event.error)
            logger.error(
                "Agent session pipeline error: room=%s job_id=%s source=%s error_type=%s recoverable=%s detail=%s",
                self._room_name,
                self._job_id,
                source,
                error_type,
                recoverable,
                detail,
            )

        def close_wrapper(event: CloseEvent) -> None:
            reason = event.reason.value
            if event.error is None:
                logger.info(
                    "Agent session closed: room=%s job_id=%s reason=%s",
                    self._room_name,
                    self._job_id,
                    reason,
                )
                return

            error_type = _error_type_name(event.error)
            recoverable = _error_recoverable(event.error)
            detail = _error_detail(event.error)
            logger.warning(
                "Agent session closed with error: room=%s job_id=%s reason=%s error_type=%s recoverable=%s detail=%s",
                self._room_name,
                self._job_id,
                reason,
                error_type,
                recoverable,
                detail,
            )

        # NOTE: verbose per-event metric logging (``_log_metrics_event``) is
        # intentionally not registered here.
        self.session.on("metrics_collected", metrics_wrapper)
        self.session.on("user_input_transcribed", transcript_wrapper)
        self.session.on("conversation_item_added", conversation_item_wrapper)
        self.session.on("speech_created", speech_created_wrapper)
        self.session.on("agent_state_changed", agent_state_changed_wrapper)
        self.session.on("error", error_wrapper)
        self.session.on("close", close_wrapper)


server = AgentServer(
    num_idle_processes=settings.livekit.LIVEKIT_NUM_IDLE_PROCESSES,
    job_memory_warn_mb=settings.livekit.LIVEKIT_JOB_MEMORY_WARN_MB,
)


@server.rtc_session(agent_name=settings.livekit.LIVEKIT_AGENT_NAME)
async def session_handler(ctx: agents.JobContext) -> None:
    """Run one agent job: set up tracing and metrics, start the session, greet.

    Args:
        ctx: Job context supplying the room, the job record and shutdown hooks.
    """
    logger.info(
        "Agent session started: room=%s job_id=%s",
        ctx.room.name,
        ctx.job.id,
    )

    trace_provider = setup_langfuse_tracer()
    apply_langchain_usage_patch()

    if trace_provider:

        async def flush_trace(_: str) -> None:
            # Best-effort flush on shutdown; a failure is logged, not raised.
            try:
                trace_provider.force_flush()
            except Exception as exc:
                logger.warning("Failed to flush Langfuse traces: %s", exc)

        ctx.add_shutdown_callback(flush_trace)

    # Create metrics collector
    participant = getattr(ctx.job, "participant", None)
    initial_participant_id = getattr(participant, "identity", None)
    room_info = getattr(ctx.job, "room", None)
    # Fall back to the room name when the job carries no room sid.
    initial_room_id = getattr(room_info, "sid", None) or ctx.room.name
    metrics_collector = MetricsCollector(
        room=ctx.room,
        model_name=(
            settings.stt.MOONSHINE_MODEL_ID
            if settings.stt.STT_PROVIDER == "moonshine"
            else settings.stt.NVIDIA_STT_MODEL
        ),
        room_name=ctx.room.name,
        room_id=initial_room_id,
        participant_id=initial_participant_id,
        fallback_session_prefix=_fallback_session_prefix(),
        fallback_participant_prefix=_fallback_participant_prefix(),
        langfuse_enabled=trace_provider is not None,
    )

    metadata = _parse_job_metadata(ctx.job.metadata)
    if metadata is not None:
        logger.info(
            "Session metadata received from dispatch: session_id=%s participant_id=%s room=%s",
            metadata.get("session_id"),
            metadata.get("participant_id"),
            ctx.room.name,
        )
        _spawn_background(
            metrics_collector.on_session_metadata(
                session_id=metadata.get("session_id"),
                participant_id=metadata.get("participant_id"),
            )
        )

    tts_engine = PocketTTS(
        voice=settings.voice.POCKET_TTS_VOICE,
        temperature=settings.voice.POCKET_TTS_TEMPERATURE,
        lsd_decode_steps=settings.voice.POCKET_TTS_LSD_DECODE_STEPS,
    )

    llm_conn_options = APIConnectOptions(
        max_retry=settings.llm.LLM_CONN_MAX_RETRY,
        retry_interval=settings.llm.LLM_CONN_RETRY_INTERVAL_SEC,
        timeout=settings.llm.LLM_CONN_TIMEOUT_SEC,
    )
    session_conn_options = SessionConnectOptions(llm_conn_options=llm_conn_options)

    session = AgentSession(
        stt=create_stt(),
        llm=langchain.LLMAdapter(create_graph()),
        tts=tts_engine,
        vad=silero.VAD.load(
            min_speech_duration=settings.voice.VAD_MIN_SPEECH_DURATION,
            min_silence_duration=settings.voice.VAD_MIN_SILENCE_DURATION,
            activation_threshold=settings.voice.VAD_THRESHOLD,
        ),
        turn_detection=MultilingualModel(),
        min_endpointing_delay=settings.voice.MIN_ENDPOINTING_DELAY,
        max_endpointing_delay=settings.voice.MAX_ENDPOINTING_DELAY,
        preemptive_generation=settings.voice.PREEMPTIVE_GENERATION,
        conn_options=session_conn_options,
    )

    await session.start(
        room=ctx.room,
        record=False,
        agent=Assistant(
            metrics_collector=metrics_collector,
            room_name=ctx.room.name,
            job_id=ctx.job.id,
        ),
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                sample_rate=settings.voice.LIVEKIT_SAMPLE_RATE,
                num_channels=settings.voice.LIVEKIT_NUM_CHANNELS,
                frame_size_ms=settings.voice.LIVEKIT_FRAME_SIZE_MS,
                pre_connect_audio=settings.voice.LIVEKIT_PRE_CONNECT_AUDIO,
                pre_connect_audio_timeout=settings.voice.LIVEKIT_PRE_CONNECT_TIMEOUT,
                # SIP participants get the telephony noise-cancellation
                # variant; everyone else gets the standard one.
                noise_cancellation=lambda params: noise_cancellation.BVCTelephony()
                if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP
                else noise_cancellation.BVC(),
            ),
        ),
    )

    await session.generate_reply(instructions="Greet the user and offer your assistance.")


if __name__ == "__main__":
    detach_default_root_handler()
    agents.cli.run_app(server)