from __future__ import annotations import asyncio import json import os import random import re import sys import time from dataclasses import dataclass, field from pathlib import Path from typing import Literal from uuid import uuid4 import numpy as np from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from app.audio import chunk_audio, frame_duration_ms, pcm16_bytes_to_float32, peak, resample_audio, rms, trim_silence from app.config import settings from app.models import UtteranceState from app.speaker import SpeakerProfile, evaluate_speaker_focus from app.speech import pipeline from app.vad import VadStream BASE_DIR = Path(__file__).resolve().parent.parent STATIC_DIR = BASE_DIR / settings.static_dir app = FastAPI(title="Voice Latency Lab") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") @dataclass class SessionRuntime: session_id: str send_lock: asyncio.Lock response_generation: int = 0 utterance_task: asyncio.Task[None] | None = None response_task: asyncio.Task[None] | None = None response_watchdog_task: asyncio.Task[None] | None = None partial_transcript_task: asyncio.Task[None] | None = None notification_task: asyncio.Task[None] | None = None backchannel_task: asyncio.Task[None] | None = None notification_queue: asyncio.Queue["AssistantNotification"] | None = None agent_session_id: str | None = None last_progress_voice_at: float = 0.0 last_assistant_reply: str = "" active_response_transcript: str = "" active_response_is_partial: bool = False backchannel_audio_until: float = 0.0 last_speech_end_at: float = 0.0 last_transcript_final_at: float = 0.0 last_llm_first_sentence_at: float = 0.0 last_tts_first_chunk_ready_at: float = 0.0 last_assistant_text_at: float = 0.0 last_first_audio_at: float = 0.0 speaker_profile: SpeakerProfile | None = None conversation_history: list[dict[str, str]] = field(default_factory=list) last_transcript_language: str | None = None @dataclass class AssistantNotification: text: str event_type: Literal["assistant.status", "assistant.notification"] speak: bool = True @dataclass(frozen=True) class EndpointDecision: stop_ms: float complete: bool incomplete: bool question_like: bool @dataclass class SessionConnection: websocket: WebSocket state: UtteranceState runtime: SessionRuntime voice_prompt_path: str | None = None class AgentNotificationRequest(BaseModel): text: str session_id: str | None = None event_type: Literal["assistant.status", "assistant.notification"] = "assistant.notification" speak: bool = True ACTIVE_SESSIONS: dict[str, SessionConnection] = {} ACTIVE_SESSIONS_LOCK = asyncio.Lock() ANSI_ESCAPE_PATTERN = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") FENCED_CODE_PATTERN = re.compile(r"```.*?```", flags=re.S) INLINE_CODE_PATTERN = re.compile(r"`([^`]+)`") MARKDOWN_LINK_PATTERN = re.compile(r"\[([^\]]+)\]\([^)]+\)") TABLE_ROW_PATTERN = re.compile(r"^\s*[|│].*[|│]\s*$") LIST_PREFIX_PATTERN = re.compile(r"^\s*(?:[-*+]\s+|\d+\.\s+)") RAWISH_LINE_PATTERN = re.compile( r"(```|^\s*[{[]|[}\]]\s*$|::|=>|&&|\|\||" r"/[-\w./]{3,}|\\[-\w.\\]{3,}|" # absolute paths r"\b\w+/\w+[-\w./]*\.\w{1,5}\b|" # relative paths like src/foo.rs r"\$\w+|" # shell variables r"\b\w+=\S+)" # bare key=value assignments ) NON_SPOKEN_CHARS_PATTERN = re.compile(r"[{}[\]<>`|$#~^]") QUESTION_STARTER_PATTERN = re.compile( r"^\s*(?:what|how|why|when|where|who|which|can|could|would|should|do|does|did|is|are|am|will|tell me)\b", flags=re.IGNORECASE, ) def _parakeet_bypass_silero_vad() -> bool: return settings.stt_backend == "parakeet-tdt-v3" and settings.parakeet_bypass_silero_vad def _normalize_compare_text(text: str) -> str: cleaned = re.sub(r"^(?:you|user|agent|assistant)\s*:\s*", "", (text or "").strip(), flags=re.IGNORECASE) cleaned = re.sub(r"[^\w\s]", " ", cleaned) cleaned = re.sub(r"\s+", " ", cleaned) return cleaned.strip().lower() def _looks_like_assistant_echo(runtime: SessionRuntime, transcript: str) -> bool: if runtime.last_assistant_text_at == 0.0: return False if (time.monotonic() - runtime.last_assistant_text_at) > 3.0: return False transcript_text = _normalize_compare_text(transcript) assistant_text = _normalize_compare_text(runtime.last_assistant_reply) if not transcript_text or not assistant_text: return False if transcript_text == assistant_text: return True if transcript_text.startswith(assistant_text) or assistant_text.startswith(transcript_text): return True transcript_words = transcript_text.split() if len(transcript_words) >= 4 and transcript_text in assistant_text: return True return False @app.on_event("startup") async def preload_models() -> None: if not settings.preload_models: return print("Startup preload: loading STT/TTS/assistant models", file=sys.stderr) await asyncio.to_thread(pipeline.preload_models) await pipeline.preload_assistant() print("Startup preload: complete", file=sys.stderr) @app.get("/") async def root() -> FileResponse: return FileResponse(STATIC_DIR / "index.html") @app.get("/health") async def health() -> dict[str, str]: return {"status": "ok"} @app.get("/sessions") async def list_sessions() -> dict[str, list[str]]: async with ACTIVE_SESSIONS_LOCK: session_ids = sorted(ACTIVE_SESSIONS.keys()) return {"sessionIds": session_ids} @app.post("/agent/notify") async def agent_notify(payload: AgentNotificationRequest) -> dict[str, object]: targets = await _resolve_target_sessions(payload.session_id) if not targets: return {"ok": False, "error": "no_active_session"} delivered = 0 for connection in targets: await _queue_notification( connection, AssistantNotification( text=payload.text, event_type=payload.event_type, speak=payload.speak, ), ) delivered += 1 return {"ok": True, "delivered": delivered, "sessionIds": [connection.runtime.session_id for connection in targets]} @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket) -> None: await websocket.accept() state = UtteranceState(sample_rate=settings.sample_rate) runtime = SessionRuntime( session_id=uuid4().hex[:8], send_lock=asyncio.Lock(), notification_queue=asyncio.Queue(), speaker_profile=SpeakerProfile(), ) vad = VadStream() preroll_samples = max(1, int((settings.preroll_ms / 1000.0) * state.sample_rate)) voice_prompt_path: str | None = None connection = SessionConnection(websocket=websocket, state=state, runtime=runtime) await _register_session(connection) runtime.notification_task = asyncio.create_task(_notification_worker(connection)) await _send_json( websocket, runtime, { "type": "session.ready", "sessionId": runtime.session_id, "sampleRate": settings.sample_rate, "assistantBackend": pipeline.assistant_backend_metadata()[0], "assistantModel": pipeline.assistant_backend_metadata()[1], "vad": { "provider": "rms-parakeet" if _parakeet_bypass_silero_vad() else vad.provider_name, "threshold": settings.silero_vad_threshold, "startMs": settings.parakeet_vad_start_ms if _parakeet_bypass_silero_vad() else settings.silero_vad_start_ms, "stopMs": settings.parakeet_vad_stop_ms if _parakeet_bypass_silero_vad() else settings.vad_stop_ms, }, } ) try: while True: try: message = await websocket.receive() except RuntimeError as exc: if "disconnect message has been received" in str(exc): break raise if message.get("type") == "websocket.disconnect": break if "bytes" in message and message["bytes"] is not None: frame = pcm16_bytes_to_float32(message["bytes"]) await _handle_frame(websocket, state, runtime, vad, frame, preroll_samples, voice_prompt_path) continue text = message.get("text") if not text: continue payload = json.loads(text) event_type = payload.get("type") if event_type == "session.configure": voice_prompt_path = payload.get("voicePromptPath") or None connection.voice_prompt_path = voice_prompt_path client_sample_rate = int(payload.get("clientSampleRate") or settings.sample_rate) state.sample_rate = client_sample_rate preroll_samples = max(1, int((settings.preroll_ms / 1000.0) * state.sample_rate)) state.reset_input() state.interrupt_assistant() vad.reset() await _interrupt_response(websocket, state, runtime, notify_client=False) await _send_json( websocket, runtime, { "type": "session.configured", "voicePromptPath": voice_prompt_path, "clientSampleRate": client_sample_rate, "serverSampleRate": settings.sample_rate, } ) elif event_type == "input.commit": await _finalize_utterance(websocket, state, runtime, voice_prompt_path) elif event_type == "ping": if not await _send_json(websocket, runtime, {"type": "pong", "ts": payload.get("ts")}): raise WebSocketDisconnect except WebSocketDisconnect: return finally: await _interrupt_response(websocket, state, runtime, notify_client=False) await _cancel_task(runtime.backchannel_task) await _cancel_task(runtime.notification_task) await _unregister_session(runtime.session_id) async def _handle_frame( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, vad: VadStream, frame: np.ndarray, preroll_samples: int, voice_prompt_path: str | None, ) -> None: raw_level = rms(frame) if not await _send_json(websocket, runtime, {"type": "input.level", "value": round(raw_level, 5)}): raise WebSocketDisconnect duration_ms = frame_duration_ms(frame, state.sample_rate) if _parakeet_bypass_silero_vad(): effective_level = vad.activity_level(frame, state.sample_rate) speech_active = effective_level >= settings.parakeet_vad_rms_threshold if effective_level >= settings.parakeet_vad_rms_threshold * 0.5: print( "VAD speech=" f"{int(speech_active)} active={int(speech_active)} prob={float(speech_active):.3f} " f"level={raw_level:.5f} vad_level={effective_level:.5f} mode=rms-parakeet", file=sys.stderr, ) barge_probability = min( 1.0, max(0.0, effective_level / max(settings.parakeet_barge_in_rms_threshold, 1e-6)), ) barge_in_rms_threshold = max( settings.parakeet_barge_in_rms_threshold, settings.assistant_barge_in_min_rms, ) barge_in_start_ms = max( settings.parakeet_barge_in_start_ms, settings.assistant_barge_in_start_ms, ) speech_start_ms = settings.parakeet_vad_start_ms stop_ms_when_barge = min(settings.parakeet_vad_stop_ms, 450) else: vad_result = vad.process(frame, state.sample_rate) effective_level = vad_result.level speech_active = vad_result.speech and ( effective_level >= settings.silero_vad_min_rms or vad_result.probability >= settings.silero_vad_strong_threshold ) if vad_result.probability >= 0.05: print( "VAD speech=" f"{int(vad_result.speech)} active={int(speech_active)} prob={vad_result.probability:.3f} " f"level={raw_level:.5f} vad_level={effective_level:.5f}", file=sys.stderr, ) barge_probability = vad_result.probability barge_in_rms_threshold = settings.assistant_barge_in_min_rms barge_in_start_ms = settings.assistant_barge_in_start_ms speech_start_ms = settings.silero_vad_start_ms stop_ms_when_barge = min(settings.vad_stop_ms, 450) state.push_preroll(frame, preroll_samples) if state.should_ignore_input(): # Allow real barge-in once the assistant playback window has passed and the # user is speaking with enough confidence. Otherwise keep discarding input. barge_candidate = ( speech_active and state.can_barge_in() and effective_level >= barge_in_rms_threshold and barge_probability >= settings.assistant_barge_in_prob_threshold ) if barge_candidate: state.pending_barge_ms += duration_ms if state.pending_barge_ms >= barge_in_start_ms: await _interrupt_response(websocket, state, runtime, notify_client=True) state.clear_active_input(preserve_preroll=True) state.barge_in_active = True state.pending_speech_ms = settings.silero_vad_start_ms state.start() state.append(frame) state.turn_speech_ms += duration_ms state.active_speech_ms += duration_ms print("VAD barge-in start", file=sys.stderr) if not await _send_json(websocket, runtime, {"type": "input.speech_start", "ts": time.time()}): raise WebSocketDisconnect return await _clear_partial_transcript(websocket, state, runtime) state.pending_barge_ms = 0.0 state.clear_active_input(preserve_preroll=True) return if not state.in_speech: if speech_active: state.pending_speech_ms += duration_ms if state.pending_speech_ms >= speech_start_ms: state.start() print("VAD turn start", file=sys.stderr) if not await _send_json(websocket, runtime, {"type": "input.speech_start", "ts": time.time()}): raise WebSocketDisconnect else: state.pending_speech_ms = 0.0 if state.in_speech: state.append(frame) await _maybe_emit_partial_transcript(websocket, state, runtime, input_sample_rate=state.sample_rate) await _maybe_start_partial_response(websocket, state, runtime, voice_prompt_path) if not speech_active: state.silence_ms += duration_ms state.active_speech_ms = 0.0 if state.barge_in_active and state.silence_ms >= stop_ms_when_barge: print( "VAD barge-in end " f"silence_ms={state.silence_ms:.1f} " f"target_ms={stop_ms_when_barge:.1f}", file=sys.stderr, ) await _finalize_utterance(websocket, state, runtime, voice_prompt_path) return if _parakeet_bypass_silero_vad(): endpoint = EndpointDecision( stop_ms=float(settings.parakeet_vad_stop_ms), complete=False, incomplete=False, question_like=False, ) else: endpoint = _dynamic_endpoint_decision(state) state.dynamic_endpoint_target_ms = endpoint.stop_ms if state.silence_ms >= endpoint.stop_ms: print( "VAD turn end " f"silence_ms={state.silence_ms:.1f} " f"target_ms={endpoint.stop_ms:.1f} " f"complete={int(endpoint.complete)} " f"incomplete={int(endpoint.incomplete)}", file=sys.stderr, ) await _finalize_utterance(websocket, state, runtime, voice_prompt_path) else: state.silence_ms = 0.0 state.dynamic_endpoint_target_ms = 0.0 state.turn_speech_ms += duration_ms state.active_speech_ms += duration_ms await _maybe_trigger_backchannel(websocket, state, runtime, voice_prompt_path) if await _maybe_force_finalize_turn(websocket, state, runtime, voice_prompt_path): return async def _maybe_force_finalize_turn( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, voice_prompt_path: str | None, ) -> bool: if not state.in_speech or not state.turn_started_at: return False now = time.monotonic() turn_elapsed_ms = (now - state.turn_started_at) * 1000.0 stale_partial_ms = ( (now - state.last_partial_transcript_change_at) * 1000.0 if state.last_partial_transcript_change_at else 0.0 ) has_stable_partial = bool(state.last_partial_transcript_text.strip()) and stale_partial_ms >= settings.dynamic_endpointing_stale_partial_ms hit_max_turn = turn_elapsed_ms >= settings.dynamic_endpointing_max_turn_ms if not has_stable_partial and not hit_max_turn: return False reason = "max_turn" if hit_max_turn else "stale_partial" print( "VAD forced turn end " f"reason={reason} " f"turn_ms={turn_elapsed_ms:.1f} " f"stale_partial_ms={stale_partial_ms:.1f} " f"partial={state.last_partial_transcript_text!r}", file=sys.stderr, ) await _finalize_utterance(websocket, state, runtime, voice_prompt_path) return True async def _finalize_utterance( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, voice_prompt_path: str | None, ) -> None: input_sample_rate = state.sample_rate await _cancel_task(runtime.partial_transcript_task) runtime.partial_transcript_task = None was_barge_in = state.barge_in_active final_transcript_hint = state.last_partial_transcript_text.strip() audio = state.finish() if audio.size == 0: return await _cancel_task(runtime.backchannel_task) runtime.backchannel_task = None runtime.last_speech_end_at = time.monotonic() runtime.last_transcript_final_at = 0.0 runtime.last_llm_first_sentence_at = 0.0 runtime.last_tts_first_chunk_ready_at = 0.0 runtime.last_assistant_text_at = 0.0 runtime.last_first_audio_at = 0.0 if not await _send_json(websocket, runtime, {"type": "input.speech_end", "samples": int(audio.size)}): raise WebSocketDisconnect await _send_json(websocket, runtime, {"type": "transcript.partial", "text": ""}) generation = _next_generation(runtime) runtime.utterance_task = asyncio.create_task( _process_utterance( websocket, state, runtime, audio, input_sample_rate, voice_prompt_path, generation, was_barge_in, final_transcript_hint, ) ) async def _process_utterance( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, audio: np.ndarray, input_sample_rate: int, voice_prompt_path: str | None, generation: int, was_barge_in: bool = False, final_transcript_hint: str = "", ) -> None: try: runtime.last_assistant_reply = "" transcription_audio = resample_audio(audio, input_sample_rate, settings.sample_rate) keep_edge_ms = settings.short_utterance_keep_edge_ms if _is_short_utterance(transcription_audio) else 0 transcription_audio = trim_silence( transcription_audio, settings.sample_rate, settings.transcription_trim_threshold, keep_edge_ms=keep_edge_ms, ) if transcription_audio.size == 0: await _send_empty_reply(websocket, runtime, generation) return audio_rms = rms(transcription_audio) audio_peak = peak(transcription_audio) print( f"STT utterance stats rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}", file=sys.stderr, ) if _should_skip_transcription(transcription_audio, audio_rms, audio_peak, was_barge_in=was_barge_in): await _send_empty_reply(websocket, runtime, generation) return speaker_decision = evaluate_speaker_focus( transcription_audio, settings.sample_rate, profile=runtime.speaker_profile or SpeakerProfile(), enabled=settings.speaker_focus_enabled, min_utterance_ms=settings.speaker_focus_min_utterance_ms, min_rms=settings.speaker_focus_min_rms, similarity_threshold=settings.speaker_focus_similarity_threshold, profile_alpha=settings.speaker_focus_profile_alpha, multi_speaker_threshold=settings.speaker_focus_multi_speaker_threshold, reject_mixed=settings.speaker_focus_reject_mixed, ) if settings.speaker_focus_debug: similarity_text = ( f"{speaker_decision.similarity:.3f}" if speaker_decision.similarity is not None else "n/a" ) print( "SPEAKER focus " f"reason={speaker_decision.reason} " f"process={int(speaker_decision.should_process)} " f"enrolled={int(speaker_decision.enrolled)} " f"updated={int(speaker_decision.updated)} " f"mixed={int(speaker_decision.mixed_speaker)} " f"similarity={similarity_text} " f"profile_updates={speaker_decision.profile_updates}", file=sys.stderr, ) if not speaker_decision.should_process: print( "SPEAKER filtered utterance " f"reason={speaker_decision.reason} " f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}", file=sys.stderr, ) await _send_empty_reply(websocket, runtime, generation) return transcript_result = await pipeline.transcribe(transcription_audio) transcript = transcript_result.text runtime.last_transcript_language = transcript_result.language if transcript_result.language: probability_text = ( f"{transcript_result.language_probability:.3f}" if transcript_result.language_probability is not None else "n/a" ) print( "STT language " f"backend={transcript_result.backend or settings.stt_backend} " f"language={transcript_result.language} " f"probability={probability_text}", file=sys.stderr, ) if not transcript.strip() and audio_rms >= settings.transcription_min_rms and audio_peak >= settings.transcription_min_peak: print( "STT final transcript empty, retrying permissive fallback " f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}", file=sys.stderr, ) fallback_result = await pipeline.transcribe_fallback(transcription_audio) transcript = fallback_result.text if fallback_result.language: runtime.last_transcript_language = fallback_result.language if pipeline.is_likely_hallucination(transcript, audio_rms): print(f"STT hallucination filtered transcript={transcript!r} rms={audio_rms:.5f}", file=sys.stderr) transcript = "" if not transcript.strip() and final_transcript_hint: hint_words = len(re.findall(r"\w+", final_transcript_hint)) if was_barge_in or hint_words >= 2: print( f"STT using partial transcript fallback hint={final_transcript_hint!r}", file=sys.stderr, ) transcript = final_transcript_hint if transcript.strip() and _looks_like_assistant_echo(runtime, transcript): print(f"STT playback echo filtered transcript={transcript!r}", file=sys.stderr) transcript = "" print(f"STT transcript={transcript!r}", file=sys.stderr) if transcript.strip(): print(f"USER transcript={transcript.strip()}", file=sys.stderr) _mirror_cli_turn("You", transcript.strip()) if not _generation_is_current(runtime, generation): return runtime.last_transcript_final_at = time.monotonic() if not await _send_json(websocket, runtime, {"type": "transcript.final", "text": transcript}): raise WebSocketDisconnect if not transcript.strip(): print( "STT empty transcript drop " f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}", file=sys.stderr, ) await _interrupt_active_assistant(websocket, state, runtime, notify_client=True) if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": ""}): raise WebSocketDisconnect return active_transcript = runtime.active_response_transcript.strip() if ( runtime.response_task is not None and runtime.active_response_is_partial and active_transcript and active_transcript == transcript.strip() ): runtime.active_response_is_partial = False return interrupted_generation = await _interrupt_active_assistant(websocket, state, runtime, notify_client=True) if interrupted_generation is not None: response_generation = interrupted_generation else: if not _generation_is_current(runtime, generation): return response_generation = _next_generation(runtime) _start_response_task( websocket, state, runtime, transcript.strip(), runtime.last_transcript_language, voice_prompt_path, response_generation, is_partial=False, ) return except asyncio.CancelledError: return finally: current_task = asyncio.current_task() if runtime.utterance_task is current_task: runtime.utterance_task = None async def _send_empty_reply(websocket: WebSocket, runtime: SessionRuntime, generation: int) -> None: if not _generation_is_current(runtime, generation): return if not await _send_json(websocket, runtime, {"type": "transcript.final", "text": ""}): raise WebSocketDisconnect if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": ""}): raise WebSocketDisconnect def _start_response_task( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, transcript: str, transcript_language: str | None, voice_prompt_path: str | None, generation: int, *, is_partial: bool, retry_count: int = 0, ) -> None: runtime.active_response_transcript = transcript runtime.active_response_is_partial = is_partial runtime.response_task = asyncio.create_task( _respond_to_transcript( websocket, state, runtime, transcript, transcript_language, voice_prompt_path, generation, is_partial=is_partial, ) ) if runtime.response_watchdog_task is not None: runtime.response_watchdog_task.cancel() runtime.response_watchdog_task = None if not is_partial: runtime.response_watchdog_task = asyncio.create_task( _response_watchdog( websocket, state, runtime, transcript, transcript_language, voice_prompt_path, generation, retry_count, ) ) async def _respond_to_transcript( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, transcript: str, transcript_language: str | None, voice_prompt_path: str | None, generation: int, *, is_partial: bool, ) -> None: try: if settings.assistant_backend == "my-agent-cli": await _process_agent_cli_reply( websocket, state, runtime, transcript, transcript_language, voice_prompt_path, generation, is_partial=is_partial, ) return full_reply_parts: list[str] = [] did_send_audio = False async for sentence in pipeline.stream_reply_sentences( transcript, conversation_history=runtime.conversation_history, response_language=transcript_language, ): if not _generation_is_current(runtime, generation): return if not sentence: continue if runtime.last_llm_first_sentence_at == 0.0: runtime.last_llm_first_sentence_at = time.monotonic() display_text, tts_text = pipeline.render_assistant_text(sentence, transcript) if not display_text or not tts_text: continue full_reply_parts.append(display_text) sent_audio = await _send_reply_unit( websocket, state, runtime, generation, display_text, pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path), ) did_send_audio = did_send_audio or sent_audio if not full_reply_parts: fallback_reply = "Sorry, give me a second." if transcript.strip() else "I didn't catch that." print("LLM reply stream empty, using fallback reply", file=sys.stderr) display_text, tts_text = pipeline.render_assistant_text(fallback_reply, transcript) full_reply_parts.append(display_text) sent_audio = await _send_reply_unit( websocket, state, runtime, generation, display_text, pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path), ) did_send_audio = did_send_audio or sent_audio if did_send_audio: state.set_assistant_active(0.0, settings.assistant_holdoff_ms) if not is_partial and full_reply_parts: _remember_conversation_turn(runtime, transcript, " ".join(full_reply_parts).strip()) if _generation_is_current(runtime, generation): _log_turn_latency(runtime, "assistant.done") if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": " ".join(full_reply_parts).strip()}): raise WebSocketDisconnect except asyncio.CancelledError: return finally: current_task = asyncio.current_task() if runtime.response_task is current_task: runtime.response_task = None runtime.active_response_transcript = "" runtime.active_response_is_partial = False if runtime.response_watchdog_task is not None: runtime.response_watchdog_task.cancel() runtime.response_watchdog_task = None async def _send_fallback_reply( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, generation: int, text: str, voice_prompt_path: str | None, ) -> None: display_text, tts_text = pipeline.render_assistant_text(text, "") if not display_text or not tts_text: return sent_audio = await _send_reply_unit( websocket, state, runtime, generation, display_text, pipeline.stream_reply_audio(tts_text, "", voice_prompt_path=voice_prompt_path), ) if sent_audio: state.set_assistant_active(0.0, settings.assistant_holdoff_ms) if _generation_is_current(runtime, generation): _log_turn_latency(runtime, "assistant.done") if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": display_text}): raise WebSocketDisconnect async def _response_watchdog( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, transcript: str, transcript_language: str | None, voice_prompt_path: str | None, generation: int, retry_count: int, ) -> None: try: await asyncio.sleep(max(settings.assistant_response_watchdog_ms, 0) / 1000.0) if not _generation_is_current(runtime, generation): return if runtime.last_assistant_text_at != 0.0: return if runtime.response_task is None: return if runtime.active_response_is_partial or runtime.active_response_transcript.strip() != transcript.strip(): return print( f"LLM response watchdog triggered transcript={transcript!r} retry_count={retry_count}", file=sys.stderr, ) runtime.response_task.cancel() runtime.response_task = None runtime.active_response_transcript = "" runtime.active_response_is_partial = False if retry_count < 1: retry_generation = _next_generation(runtime) _start_response_task( websocket, state, runtime, transcript, transcript_language, voice_prompt_path, retry_generation, is_partial=False, retry_count=retry_count + 1, ) return fallback_generation = _next_generation(runtime) await _send_fallback_reply( websocket, state, runtime, fallback_generation, "Sorry, give me a second.", voice_prompt_path, ) except asyncio.CancelledError: return async def _send_reply_unit( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, generation: int, display_text: str, audio_stream, ) -> bool: iterator = audio_stream.__aiter__() did_send_audio = False first_chunk: np.ndarray | None = None try: first_chunk = await iterator.__anext__() if runtime.last_tts_first_chunk_ready_at == 0.0: runtime.last_tts_first_chunk_ready_at = time.monotonic() except StopAsyncIteration: first_chunk = None if not _generation_is_current(runtime, generation): return False if runtime.last_assistant_text_at == 0.0: runtime.last_assistant_text_at = time.monotonic() runtime.last_assistant_reply = display_text.strip() if not await _send_json(websocket, runtime, {"type": "assistant.text", "text": display_text}): raise WebSocketDisconnect if first_chunk is not None: await _send_audio_chunk(websocket, state, runtime, generation, first_chunk) did_send_audio = True async for audio_chunk in iterator: if not _generation_is_current(runtime, generation): return did_send_audio await _send_audio_chunk(websocket, state, runtime, generation, audio_chunk) did_send_audio = True return did_send_audio async def _maybe_emit_partial_transcript( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, *, input_sample_rate: int, ) -> None: if not settings.partial_transcripts_enabled: return if runtime.partial_transcript_task is not None and not runtime.partial_transcript_task.done(): return if state.turn_speech_ms < settings.partial_transcript_min_ms: return now = time.monotonic() interval_s = max(settings.partial_transcript_interval_ms, 0) / 1000.0 if state.last_partial_transcript_at and now - state.last_partial_transcript_at < interval_s: return snapshot = state.current_audio() if snapshot.size == 0: return state.last_partial_transcript_at = now runtime.partial_transcript_task = asyncio.create_task( _emit_partial_transcript(websocket, state, runtime, snapshot, input_sample_rate) ) async def _emit_partial_transcript( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, audio: np.ndarray, input_sample_rate: int, ) -> None: try: transcription_audio = resample_audio(audio, input_sample_rate, settings.sample_rate) keep_edge_ms = settings.short_utterance_keep_edge_ms if _is_short_utterance(transcription_audio) else 0 transcription_audio = trim_silence( transcription_audio, settings.sample_rate, settings.transcription_trim_threshold, keep_edge_ms=keep_edge_ms, ) if transcription_audio.size == 0: return transcript = await pipeline.transcribe_partial(transcription_audio) cleaned = transcript.strip() if cleaned == state.last_partial_transcript_text: return state.last_partial_transcript_text = cleaned state.last_partial_transcript_change_at = time.monotonic() if not await _send_json(websocket, runtime, {"type": "transcript.partial", "text": cleaned}): raise WebSocketDisconnect except asyncio.CancelledError: return finally: current_task = asyncio.current_task() if runtime.partial_transcript_task is current_task: runtime.partial_transcript_task = None async def _clear_partial_transcript(websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime) -> None: await _cancel_task(runtime.partial_transcript_task) runtime.partial_transcript_task = None if not state.last_partial_transcript_text: return state.last_partial_transcript_text = "" state.last_partial_transcript_change_at = 0.0 await _send_json(websocket, runtime, {"type": "transcript.partial", "text": ""}) async def _maybe_start_partial_response( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, voice_prompt_path: str | None, ) -> None: if not settings.partial_response_enabled: return if settings.assistant_backend == "my-agent-cli": return transcript = state.last_partial_transcript_text.strip() if not transcript: return if runtime.response_task is not None: return if transcript == state.last_partial_response_text: return endpoint = _dynamic_endpoint_decision(state) if not endpoint.complete or endpoint.incomplete: return if endpoint.question_like or _starts_with_question_starter(transcript): return word_count = len(re.findall(r"\w+", transcript)) if word_count > settings.partial_response_complete_max_words: return min_words = settings.partial_response_min_words if endpoint.complete: min_words = min(min_words, settings.partial_response_complete_min_words) elif endpoint.incomplete: min_words = max(min_words, settings.partial_response_incomplete_min_words) if word_count < min_words: return if state.silence_ms < settings.partial_response_min_silence_ms: return if not state.last_partial_transcript_change_at: return stable_ms = settings.partial_response_stable_ms if endpoint.complete: stable_ms = min(stable_ms, settings.partial_response_complete_stable_ms) elif endpoint.incomplete: stable_ms = max(stable_ms, settings.partial_response_incomplete_stable_ms) stable_s = max(stable_ms, 0) / 1000.0 if time.monotonic() - state.last_partial_transcript_change_at < stable_s: return state.last_partial_response_text = transcript generation = _next_generation(runtime) _start_response_task( websocket, state, runtime, transcript, runtime.last_transcript_language, voice_prompt_path, generation, is_partial=True, ) async def _process_agent_cli_reply( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, transcript: str, transcript_language: str | None, voice_prompt_path: str | None, generation: int, *, is_partial: bool, ) -> None: summary_reply_parts: list[str] = [] did_send_audio = False async for event in pipeline.stream_agent_cli_events(transcript, runtime.agent_session_id): if not _generation_is_current(runtime, generation): return kind = event.get("kind") if kind == "session_created": runtime.agent_session_id = event.get("newSessionId") or event.get("session_id") or runtime.agent_session_id continue if kind == "status": text = (event.get("text") or "").strip() if text: print(f"AGENT status={text}", file=sys.stderr) is_generic = _is_generic_agent_status(text) is_internal = _is_internal_agent_status(text) _mirror_cli_activity("Status", text) if not is_internal: if not await _send_json(websocket, runtime, {"type": "assistant.activity", "kind": "status", "text": text}): raise WebSocketDisconnect if not is_generic and not is_internal: chat_text = _status_text_for_chat(text) if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}): raise WebSocketDisconnect await _maybe_speak_progress_update(websocket, state, runtime, text, voice_prompt_path) continue if kind == "tool_use": tool_name = (event.get("toolName") or "").strip() tool_input = event.get("toolInput") or {} status_text = _tool_use_status_text(tool_name, tool_input) print(f"AGENT tool={status_text}", file=sys.stderr) _mirror_cli_activity("Tool", status_text) if not await _send_json( websocket, runtime, { "type": "assistant.activity", "kind": "tool_use", "toolName": tool_name, "toolInput": tool_input, "text": status_text, }, ): raise WebSocketDisconnect await _maybe_speak_progress_update(websocket, state, runtime, _tool_use_voice_text(tool_name), voice_prompt_path) continue if kind == "tool_result": result_text = _tool_result_summary(event.get("content")) _mirror_cli_activity("Result", result_text) if not await _send_json( websocket, runtime, { "type": "assistant.activity", "kind": "tool_result", "toolName": (event.get("toolName") or "").strip(), "text": result_text, "isError": bool(event.get("isError")), }, ): raise WebSocketDisconnect if event.get("isError"): error_text = (event.get("content") or "").strip() if error_text: print(f"AGENT error={error_text}", file=sys.stderr) chat_text = _status_text_for_chat(error_text) if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}): raise WebSocketDisconnect continue if kind == "error": error_text = (event.get("content") or "").strip() if error_text: print(f"AGENT error={error_text}", file=sys.stderr) _mirror_cli_activity("Error", error_text) if not await _send_json( websocket, runtime, {"type": "assistant.activity", "kind": "error", "text": error_text}, ): raise WebSocketDisconnect chat_text = _status_text_for_chat(error_text) if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}): raise WebSocketDisconnect continue if kind == "text" and (event.get("role") or "") == "assistant": content = (event.get("content") or "").strip() if not content: continue cli_text = _normalize_cli_reply_text(content) summary_source = _summarize_agent_reply(content) display_text, tts_text = pipeline.render_assistant_text(summary_source, transcript) if not cli_text or not display_text or not tts_text: continue normalized_reply = display_text.strip().lower() if normalized_reply and normalized_reply == runtime.last_assistant_reply: continue runtime.last_assistant_reply = normalized_reply print(f"AGENT reply={display_text}", file=sys.stderr) _mirror_cli_turn("Agent", cli_text) summary_reply_parts.append(display_text) if not await _send_json( websocket, runtime, {"type": "assistant.activity", "kind": "message", "text": display_text}, ): raise WebSocketDisconnect if not await _send_json(websocket, runtime, {"type": "assistant.text", "text": display_text}): raise WebSocketDisconnect sent_audio = await _stream_reply_audio( websocket, state, runtime, generation, pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path), ) did_send_audio = did_send_audio or sent_audio if did_send_audio: state.set_assistant_active(0.0, settings.assistant_holdoff_ms) if not is_partial and summary_reply_parts: _remember_conversation_turn(runtime, transcript, " ".join(summary_reply_parts).strip()) if _generation_is_current(runtime, generation): if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": " ".join(summary_reply_parts).strip()}): raise WebSocketDisconnect def _next_generation(runtime: SessionRuntime) -> int: runtime.response_generation += 1 return runtime.response_generation def _generation_is_current(runtime: SessionRuntime, generation: int) -> bool: return runtime.response_generation == generation def _remember_conversation_turn(runtime: SessionRuntime, user_text: str, assistant_text: str) -> None: if not settings.conversation_memory_enabled: return user_clean = user_text.strip() assistant_clean = assistant_text.strip() if not user_clean or not assistant_clean: return runtime.conversation_history.append({"role": "user", "content": user_clean}) runtime.conversation_history.append({"role": "assistant", "content": assistant_clean}) max_messages = max(settings.conversation_memory_turns, 0) * 2 if max_messages > 0 and len(runtime.conversation_history) > max_messages: runtime.conversation_history[:] = runtime.conversation_history[-max_messages:] max_chars = max(settings.conversation_memory_max_chars, 0) if max_chars <= 0: return while runtime.conversation_history: total_chars = sum(len(item.get("content", "")) for item in runtime.conversation_history) if total_chars <= max_chars: break runtime.conversation_history.pop(0) def _starts_with_question_starter(transcript: str) -> bool: return bool(QUESTION_STARTER_PATTERN.match(transcript.strip())) async def _interrupt_active_assistant( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, *, notify_client: bool = True, ) -> int | None: response_task = runtime.response_task assistant_active = state.should_ignore_input() superseded_transcript = runtime.active_response_transcript.strip() superseded_partial = runtime.active_response_is_partial user_visible_interrupt = bool( response_task is not None and not runtime.active_response_is_partial and runtime.active_response_transcript.strip() ) if response_task is None and not assistant_active: return None new_generation = _next_generation(runtime) state.interrupt_assistant() await _cancel_task(runtime.backchannel_task) runtime.backchannel_task = None await _cancel_task(runtime.response_watchdog_task) runtime.response_watchdog_task = None if response_task is not None: if superseded_transcript: print( "ASSISTANT superseded " f"partial={int(superseded_partial)} " f"transcript={superseded_transcript!r}", file=sys.stderr, ) runtime.response_task = None response_task.cancel() runtime.agent_session_id = None pipeline.reset_assistant_session() runtime.active_response_transcript = "" runtime.active_response_is_partial = False if user_visible_interrupt: _mirror_cli_interrupt() if notify_client and user_visible_interrupt: if not await _send_json(websocket, runtime, {"type": "assistant.interrupted"}): raise WebSocketDisconnect return new_generation async def _interrupt_response( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, *, notify_client: bool = True, ) -> bool: response_task = runtime.response_task utterance_task = runtime.utterance_task assistant_active = state.should_ignore_input() superseded_transcript = runtime.active_response_transcript.strip() superseded_partial = runtime.active_response_is_partial user_visible_interrupt = bool( utterance_task is not None or (response_task is not None and not runtime.active_response_is_partial and runtime.active_response_transcript.strip()) ) if response_task is None and utterance_task is None and not assistant_active: return False _next_generation(runtime) state.interrupt_assistant() await _cancel_task(runtime.partial_transcript_task) runtime.partial_transcript_task = None await _cancel_task(runtime.backchannel_task) runtime.backchannel_task = None await _cancel_task(runtime.response_watchdog_task) runtime.response_watchdog_task = None if utterance_task is not None: runtime.utterance_task = None utterance_task.cancel() if response_task is not None: if superseded_transcript: print( "ASSISTANT superseded " f"partial={int(superseded_partial)} " f"transcript={superseded_transcript!r}", file=sys.stderr, ) runtime.response_task = None response_task.cancel() if utterance_task is not None or response_task is not None: runtime.agent_session_id = None pipeline.reset_assistant_session() runtime.active_response_transcript = "" runtime.active_response_is_partial = False if user_visible_interrupt: _mirror_cli_interrupt() if notify_client and user_visible_interrupt: if not await _send_json(websocket, runtime, {"type": "assistant.interrupted"}): raise WebSocketDisconnect return True async def _send_json(websocket: WebSocket, runtime: SessionRuntime, payload: dict) -> bool: try: async with runtime.send_lock: await websocket.send_json(payload) return True except (WebSocketDisconnect, RuntimeError): return False async def _send_bytes(websocket: WebSocket, runtime: SessionRuntime, payload: bytes) -> bool: try: async with runtime.send_lock: await websocket.send_bytes(payload) return True except (WebSocketDisconnect, RuntimeError): return False async def _stream_reply_audio( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, generation: int, audio_stream, ) -> bool: did_send_audio = False async for audio_chunk in audio_stream: if not _generation_is_current(runtime, generation): return did_send_audio await _send_audio_chunk(websocket, state, runtime, generation, audio_chunk) did_send_audio = True return did_send_audio async def _send_audio_chunk( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, generation: int, audio_chunk: np.ndarray, ) -> None: if runtime.last_first_audio_at == 0.0: runtime.last_first_audio_at = time.monotonic() audio_duration_ms = (len(audio_chunk) / 24000.0) * 1000.0 state.set_assistant_active(audio_duration_ms, settings.assistant_holdoff_ms) state.set_barge_grace(settings.assistant_barge_in_grace_ms) for chunk in chunk_audio(audio_chunk, 2400): if not _generation_is_current(runtime, generation): return clipped = np.clip(chunk, -1.0, 1.0) pcm16 = (clipped * 32767.0).astype(np.int16) if not await _send_bytes(websocket, runtime, pcm16.tobytes()): raise WebSocketDisconnect if not await _send_json(websocket, runtime, {"type": "assistant.audio_chunk_end"}): raise WebSocketDisconnect def _log_turn_latency(runtime: SessionRuntime, marker: str) -> None: speech_end_at = runtime.last_speech_end_at if not speech_end_at: return transcript_ms = ( (runtime.last_transcript_final_at - speech_end_at) * 1000.0 if runtime.last_transcript_final_at else -1.0 ) text_ms = ( (runtime.last_assistant_text_at - speech_end_at) * 1000.0 if runtime.last_assistant_text_at else -1.0 ) llm_ms = ( (runtime.last_llm_first_sentence_at - speech_end_at) * 1000.0 if runtime.last_llm_first_sentence_at else -1.0 ) tts_ready_ms = ( (runtime.last_tts_first_chunk_ready_at - speech_end_at) * 1000.0 if runtime.last_tts_first_chunk_ready_at else -1.0 ) audio_ms = ( (runtime.last_first_audio_at - speech_end_at) * 1000.0 if runtime.last_first_audio_at else -1.0 ) done_ms = (time.monotonic() - speech_end_at) * 1000.0 print( "LATENCY " f"marker={marker} " f"speech_end_to_transcript_ms={transcript_ms:.1f} " f"speech_end_to_first_llm_sentence_ms={llm_ms:.1f} " f"speech_end_to_first_tts_chunk_ready_ms={tts_ready_ms:.1f} " f"speech_end_to_text_ms={text_ms:.1f} " f"speech_end_to_first_audio_ms={audio_ms:.1f} " f"speech_end_to_done_ms={done_ms:.1f}", file=sys.stderr, ) def _is_short_utterance(audio: np.ndarray) -> bool: duration_ms = (audio.size / settings.sample_rate) * 1000.0 return duration_ms <= settings.short_utterance_ms def _should_skip_transcription( audio: np.ndarray, audio_rms: float, audio_peak: float, *, was_barge_in: bool = False, ) -> bool: if was_barge_in and audio.size > 0: return False duration_ms = (audio.size / settings.sample_rate) * 1000.0 min_samples = int((settings.min_utterance_ms / 1000.0) * settings.sample_rate) short_utterance = _is_short_utterance(audio) min_rms = settings.short_utterance_min_rms if short_utterance else settings.transcription_min_rms min_peak = settings.short_utterance_min_peak if short_utterance else settings.transcription_min_peak if audio_rms < min_rms or audio_peak < min_peak: return True if short_utterance and duration_ms < settings.min_utterance_ms: return True if audio.size >= min_samples: return False return not short_utterance async def _register_session(connection: SessionConnection) -> None: async with ACTIVE_SESSIONS_LOCK: ACTIVE_SESSIONS[connection.runtime.session_id] = connection async def _unregister_session(session_id: str) -> None: async with ACTIVE_SESSIONS_LOCK: ACTIVE_SESSIONS.pop(session_id, None) async def _resolve_target_sessions(session_id: str | None) -> list[SessionConnection]: async with ACTIVE_SESSIONS_LOCK: if session_id: connection = ACTIVE_SESSIONS.get(session_id) return [connection] if connection is not None else [] if len(ACTIVE_SESSIONS) == 1: return list(ACTIVE_SESSIONS.values()) return [] async def _queue_notification(connection: SessionConnection, notification: AssistantNotification) -> None: if not await _send_json( connection.websocket, connection.runtime, {"type": notification.event_type, "text": notification.text}, ): return if notification.speak and connection.runtime.notification_queue is not None: await connection.runtime.notification_queue.put(notification) async def _notification_worker(connection: SessionConnection) -> None: runtime = connection.runtime queue = runtime.notification_queue if queue is None: return try: while True: notification = await queue.get() if not notification.text.strip(): continue if not await _wait_for_notification_window(connection.state, runtime): continue await _speak_side_channel( connection.websocket, connection.state, runtime, notification.text, connection.voice_prompt_path, event_type=notification.event_type, ) except asyncio.CancelledError: return async def _wait_for_notification_window(state: UtteranceState, runtime: SessionRuntime) -> bool: deadline = time.monotonic() + max(settings.notification_max_wait_ms, 0) / 1000.0 while time.monotonic() < deadline: if runtime.response_task is None and runtime.utterance_task is None and not state.in_speech and not state.should_ignore_input(): return True await asyncio.sleep(0.1) return False async def _maybe_trigger_backchannel( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, voice_prompt_path: str | None, ) -> None: if not settings.backchannel_enabled: return if runtime.response_task is not None or runtime.utterance_task is not None or state.should_ignore_input(): return if time.monotonic() < runtime.backchannel_audio_until: return if runtime.backchannel_task is not None and not runtime.backchannel_task.done(): return now = time.monotonic() if state.turn_speech_ms < settings.backchannel_min_speech_ms: return if state.active_speech_ms < settings.backchannel_stable_ms: return if (now - state.last_backchannel_at) * 1000.0 < settings.backchannel_min_interval_ms: return text = _choose_backchannel_text(state) state.last_backchannel_at = now state.backchannel_count += 1 runtime.backchannel_task = asyncio.create_task( _speak_side_channel( websocket, state, runtime, text, voice_prompt_path, event_type="assistant.backchannel", ) ) def _choose_backchannel_text(state: UtteranceState) -> str: transcript = state.last_partial_transcript_text.strip().lower() empathetic = ("I hear you", "yeah", "right", "okay") questionish = ("right", "okay", "got it", "go on") neutral_early = ("uh-huh", "yeah", "right", "go on") neutral_late = ("I hear you", "okay", "got it", "all right") if re.search(r"\b(frustrat|annoy|upset|worried|stress|issue|problem|hard|difficult|stuck)\w*", transcript): pool = empathetic elif "?" in transcript or re.search(r"\b(why|how|what|when|where|who|can you|do you|should i)\b", transcript): pool = questionish else: pool = neutral_early if state.backchannel_count == 0 else neutral_late limit = max(settings.backchannel_recent_limit, 1) recent = set(state.recent_backchannels[-limit:]) options = [choice for choice in pool if choice not in recent] or list(pool) choice = random.choice(options) state.recent_backchannels.append(choice) if len(state.recent_backchannels) > limit: del state.recent_backchannels[:-limit] return choice def _dynamic_endpoint_decision(state: UtteranceState) -> EndpointDecision: base_stop_ms = float(settings.vad_stop_ms) if not settings.dynamic_endpointing_enabled: return EndpointDecision( stop_ms=base_stop_ms, complete=False, incomplete=False, question_like=False, ) transcript = state.last_partial_transcript_text.strip() tokens = re.findall(r"[a-z']+", transcript.lower()) word_count = len(tokens) last_word = tokens[-1] if tokens else "" first_word = tokens[0] if tokens else "" if not transcript: stop_ms = base_stop_ms if state.turn_speech_ms <= 900: stop_ms -= settings.dynamic_endpointing_no_partial_short_discount_ms elif state.turn_speech_ms <= 1800: stop_ms -= settings.dynamic_endpointing_no_partial_medium_discount_ms stop_ms = max(settings.dynamic_endpointing_min_ms, stop_ms) stop_ms = min(settings.dynamic_endpointing_max_ms, stop_ms) return EndpointDecision( stop_ms=float(stop_ms), complete=False, incomplete=False, question_like=False, ) incomplete_tail_words = { "a", "an", "the", "and", "or", "but", "so", "because", "if", "when", "while", "that", "which", "who", "whose", "whom", "to", "for", "from", "with", "of", "in", "on", "at", "by", "into", "about", "like", "as", "than", "then", "is", "are", "was", "were", "am", "be", "been", "being", "do", "does", "did", "have", "has", "had", "can", "could", "should", "would", "will", "just", "my", "your", "our", "their", "this", "these", "those", "it", "i'm", "you're", "we're", "they're", "he's", "she's", "then", "then i", "then we", } filler_tail_words = {"um", "uh", "er", "ah", "hmm", "mm", "like"} continuation_phrases = ( "i think", "i mean", "for example", "so if", "and then", "because if", "what i", "what if", "the thing is", "it feels like", ) question_starters = ( "what", "why", "how", "when", "where", "who", "which", "can", "could", "would", "should", "do", "does", "did", "is", "are", "am", "will", ) ends_with_terminal_punctuation = bool(re.search(r"[.!?]['\"]?\s*$", transcript)) ends_with_commaish_pause = bool(re.search(r"[,;:]\s*$", transcript)) starts_with_continuation = any(transcript.lower().startswith(phrase) for phrase in continuation_phrases) open_clause_pattern = re.search( r"\b(" r"trying to|want to|wanted to|need to|needed to|going to|about to|" r"tell me how|tell me what|tell me why|show me how|" r"how to|what to|where to|why the|why it|" r"open the|open a|fix the|fix a" r")\b(?:\s+\w+){0,3}\s*$", transcript.lower(), ) request_for_instructions = re.search( r"\b(can|could|would|will)\s+you\s+tell\s+me\s+how\s+to\b(?:\s+\w+){0,4}\s*$", transcript.lower(), ) question_like = transcript.endswith("?") or ( word_count >= 4 and tokens and first_word in question_starters ) has_recent_partial = bool(transcript) and state.silence_ms < max(settings.vad_stop_ms, 1) * 0.8 incomplete = ( bool(transcript) and not ends_with_terminal_punctuation and ( last_word in incomplete_tail_words or last_word in filler_tail_words or ends_with_commaish_pause or re.search(r"\.\.\.\s*$", transcript) is not None or starts_with_continuation or open_clause_pattern is not None or request_for_instructions is not None or re.search(r"\b(and|or|but|so|because|if|when|while|then)\s+(the|a|an|my|your|our|their|it|this|that)\s*$", transcript.lower()) is not None or re.search(r"\b(i|we|they|he|she)\s+(was|were|am|are|is|have|had|would|could|should|will)\s*$", transcript.lower()) is not None or (has_recent_partial and word_count < 12 and not ends_with_terminal_punctuation) ) ) complete = ( bool(transcript) and word_count >= 3 and not incomplete and (ends_with_terminal_punctuation or question_like) ) stop_ms = base_stop_ms if transcript and word_count <= 2 and state.turn_speech_ms < 900: stop_ms += settings.dynamic_endpointing_short_utterance_bias_ms if incomplete: stop_ms += settings.dynamic_endpointing_incomplete_bias_ms if complete: stop_ms -= settings.dynamic_endpointing_complete_discount_ms if question_like and complete: stop_ms -= settings.dynamic_endpointing_question_discount_ms stop_ms = max(settings.dynamic_endpointing_min_ms, stop_ms) stop_ms = min(settings.dynamic_endpointing_max_ms, stop_ms) return EndpointDecision( stop_ms=float(stop_ms), complete=complete, incomplete=incomplete, question_like=question_like, ) async def _maybe_speak_progress_update( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, text: str, voice_prompt_path: str | None, ) -> None: if not text.strip(): return now = time.monotonic() if (now - runtime.last_progress_voice_at) * 1000.0 < settings.agent_progress_speak_min_interval_ms: return runtime.last_progress_voice_at = now await _speak_side_channel( websocket, state, runtime, text, voice_prompt_path, event_type="assistant.status", ) def _tool_use_status_text(tool_name: str, tool_input: object) -> str: if not isinstance(tool_input, dict): return tool_name or "working" preview = "" for key in ("path", "command", "url", "pattern", "name_pattern", "query", "tool_name"): value = tool_input.get(key) if isinstance(value, str) and value.strip(): preview = value.strip() break if preview: preview = preview if len(preview) <= 72 else f"{preview[:69]}..." return f"{tool_name}: {preview}" return tool_name or "working" def _status_text_for_chat(text: str) -> str: cleaned = _normalize_cli_reply_text(text) if not cleaned: return "" if _is_internal_agent_status(cleaned): return "" if len(cleaned) <= 120: return cleaned return f"{cleaned[:117].rstrip()}..." def _is_generic_agent_status(text: str) -> bool: normalized = text.strip().lower().rstrip(".") return normalized in { "working on it", "thinking", "building a plan", "starting multi-step work", "complex task detected; switching to orchestrate mode", "simple task detected; using tools", } def _is_internal_agent_status(text: str) -> bool: normalized = text.strip().lower() return ( "injected relevant context from memory" in normalized or "relevant context from memory" in normalized or "context from memory" in normalized or normalized == "from memory" ) def _normalize_cli_reply_text(text: str) -> str: cleaned = ANSI_ESCAPE_PATTERN.sub("", text).replace("\r", "\n") cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) return cleaned.strip() def _summarize_agent_reply(text: str) -> str: cleaned = _normalize_cli_reply_text(text) if not cleaned: return "" cleaned = FENCED_CODE_PATTERN.sub(" ", cleaned) cleaned = MARKDOWN_LINK_PATTERN.sub(r"\1", cleaned) cleaned = INLINE_CODE_PATTERN.sub(r"\1", cleaned) parts: list[str] = [] for raw_line in cleaned.splitlines(): line = raw_line.strip() if not line or TABLE_ROW_PATTERN.match(line): continue if _is_internal_agent_status(line): continue line = re.sub(r"^#{1,6}\s*", "", line) line = LIST_PREFIX_PATTERN.sub("", line) line = re.sub(r"^\s*>\s*", "", line) line = re.sub(r"\*\*(.*?)\*\*", r"\1", line) line = re.sub(r"__(.*?)__", r"\1", line) line = re.sub(r"(? 150: break summary_parts.append(normalized) current_len = projected if len(summary_parts) >= 2: break summary = " ".join(summary_parts).strip() or flattened summary = _scrub_code_artifacts(summary) if not summary: return "I found it." return summary if len(summary) <= 150 else f"{summary[:147].rstrip()}..." def _fallback_agent_reply_summary(text: str) -> str: sentences = re.findall(r"[^.!?\n]+[.!?]?", text) for sentence in sentences: normalized = re.sub(r"\s+", " ", sentence).strip() if not normalized: continue if _is_internal_agent_status(normalized): continue if RAWISH_LINE_PATTERN.search(normalized): continue word_count = len(re.findall(r"\w+", normalized)) if word_count < 3: continue return normalized return "I found it." def _scrub_code_artifacts(text: str) -> str: scrubbed = re.sub(r"\b\w+/\w+[-\w./]*\.\w{1,5}\b", "", text) # relative paths scrubbed = re.sub(r"/[-\w./]{3,}", "", scrubbed) # absolute paths scrubbed = re.sub(r"\$\w+", "", scrubbed) # shell vars scrubbed = re.sub(r"\b\w+=\S+", "", scrubbed) # key=value scrubbed = NON_SPOKEN_CHARS_PATTERN.sub(" ", scrubbed) scrubbed = re.sub(r"\s+", " ", scrubbed).strip() scrubbed = re.sub(r"\s+([,.:;!?])", r"\1", scrubbed) return scrubbed def _mirror_cli_turn(speaker: str, text: str) -> None: cleaned = text.strip() if not cleaned: return print(f"{speaker}: {cleaned}", flush=True) if speaker.lower() == "you": return _mirror_to_agent_terminal(_format_cli_turn(speaker, cleaned)) _last_mirror_activity_at: float = 0.0 _MIRROR_ACTIVITY_MIN_INTERVAL_S: float = 0.4 _mirror_activity_skipped: int = 0 def _mirror_cli_activity(label: str, text: str) -> None: global _last_mirror_activity_at, _mirror_activity_skipped cleaned = _normalize_cli_reply_text(text) if not cleaned: return now = time.monotonic() elapsed = now - _last_mirror_activity_at if elapsed < _MIRROR_ACTIVITY_MIN_INTERVAL_S: _mirror_activity_skipped += 1 return skipped = _mirror_activity_skipped _mirror_activity_skipped = 0 _last_mirror_activity_at = now prefix = f" \x1b[90m(+{skipped} more)\x1b[0m\n" if skipped > 0 else "" _mirror_to_agent_terminal(f"{prefix}{_format_cli_activity(label, cleaned)}") def _mirror_cli_interrupt() -> None: _mirror_to_agent_terminal("\n\x1b[90m── interrupted ──\x1b[0m\n") def _format_cli_turn(speaker: str, text: str) -> str: is_user = speaker.lower() == "you" if is_user: return f"\n\x1b[32;1m›\x1b[0m \x1b[32m{text}\x1b[0m\n" return f"{text}\n" def _format_cli_activity(label: str, text: str) -> str: first_line, *rest = text.splitlines() or [text] summary = first_line.strip() if rest: summary = f"{summary} ..." summary = summary if len(summary) <= 120 else f"{summary[:117].rstrip()}..." lowered = label.lower() if summary == "💭 Injected relevant context from memory": return f"{summary}\n" if lowered == "status" and summary.lower().rstrip(".") in { "working on it", "thinking", "building a plan", "starting multi-step work", "complex task detected; switching to orchestrate mode", "simple task detected; using tools", }: return f" {summary}\n" if lowered == "tool": return f" \x1b[34mtool\x1b[0m {summary}\n" if lowered == "result": return f" \x1b[90mresult\x1b[0m {summary}\n" if lowered == "error": return f" \x1b[31merror\x1b[0m {summary}\n" return f" \x1b[90m…\x1b[0m {summary}\n" def _mirror_to_agent_terminal(text: str) -> None: tty_path = _resolve_agent_terminal_tty() if not tty_path: return try: payload = text if not payload.endswith("\n"): payload = f"{payload}\n" with open(tty_path, "a", encoding="utf-8", errors="ignore") as handle: handle.write(payload) handle.flush() except OSError: return def _resolve_agent_terminal_tty() -> str | None: configured = (settings.my_agent_active_tty_file or "").strip() if configured: marker_path = Path(configured) else: marker_path = Path(settings.my_agent_cwd) / ".active-terminal-tty" try: tty_path = marker_path.read_text(encoding="utf-8").strip() except OSError: return None if not tty_path.startswith("/dev/pts/") and tty_path != "/dev/tty": return None if not os.path.exists(tty_path): return None return tty_path def _tool_use_voice_text(tool_name: str) -> str: normalized = tool_name.lower() if "browser" in normalized or "web" in normalized: return "I'm checking that." if "file" in normalized or "path" in normalized: return "I'm looking through the files." if "command" in normalized or "shell" in normalized: return "I'm working on it in the terminal." if "search" in normalized: return "I'm looking into that." return "I'm working on it." def _tool_result_summary(content: object) -> str: if isinstance(content, str): text = content.strip() elif isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict): text_value = item.get("text") if isinstance(text_value, str) and text_value.strip(): parts.append(text_value.strip()) elif isinstance(item, str) and item.strip(): parts.append(item.strip()) text = " ".join(parts).strip() else: text = "" if not text: return "done" text = FENCED_CODE_PATTERN.sub(" ", text) text = re.sub(r"\s+", " ", text).strip() if not text: return "done" segments = re.split(r"(?<=[.!?])\s+|\n+", text) for segment in segments: normalized = segment.strip() if not normalized or RAWISH_LINE_PATTERN.search(normalized): continue return normalized if len(normalized) <= 100 else f"{normalized[:97].rstrip()}..." return "done" async def _speak_side_channel( websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime, text: str, voice_prompt_path: str | None, *, event_type: str, ) -> None: try: should_send_backchannel_audio = event_type != "assistant.backchannel" or state.in_speech display_text, tts_text = pipeline.render_assistant_text(text, "") if not display_text or not tts_text: return if event_type == "assistant.backchannel": if not await _send_json(websocket, runtime, {"type": event_type, "text": display_text}): raise WebSocketDisconnect if event_type == "assistant.backchannel": audio_chunks = await pipeline.synthesize_backchannel(tts_text, voice_prompt_path=voice_prompt_path) else: audio_chunks = await pipeline.synthesize_sentences(tts_text, voice_prompt_path=voice_prompt_path) if event_type == "assistant.backchannel" and not should_send_backchannel_audio: return if event_type == "assistant.backchannel": total_duration_s = sum(len(chunk) / 24000.0 for chunk in audio_chunks) runtime.backchannel_audio_until = time.monotonic() + total_duration_s + 0.15 for audio_chunk in audio_chunks: for chunk in chunk_audio(audio_chunk, 2400): clipped = np.clip(chunk, -1.0, 1.0) pcm16 = (clipped * 32767.0).astype(np.int16) if not await _send_bytes(websocket, runtime, pcm16.tobytes()): raise WebSocketDisconnect if not await _send_json(websocket, runtime, {"type": "assistant.audio_chunk_end"}): raise WebSocketDisconnect except (asyncio.CancelledError, WebSocketDisconnect): return async def _cancel_task(task: asyncio.Task[None] | None) -> None: if task is None: return task.cancel() try: await task except asyncio.CancelledError: pass