Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import random | |
| import re | |
| import sys | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Literal | |
| from uuid import uuid4 | |
| import numpy as np | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from app.audio import chunk_audio, frame_duration_ms, pcm16_bytes_to_float32, peak, resample_audio, rms, trim_silence | |
| from app.config import settings | |
| from app.models import UtteranceState | |
| from app.speaker import SpeakerProfile, evaluate_speaker_focus | |
| from app.speech import pipeline | |
| from app.vad import VadStream | |
# Repository root: two directory levels above this module file.
BASE_DIR = Path(__file__).resolve().parent.parent
# Static assets directory (serves index.html for the browser client).
STATIC_DIR = BASE_DIR.joinpath(settings.static_dir)

app = FastAPI(title="Voice Latency Lab")

# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive CORS setup; confirm this server is only meant for local/dev use.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@dataclass
class SessionRuntime:
    """Mutable per-connection runtime state for one websocket voice session.

    Declared as a dataclass: callers construct it with keyword arguments and
    ``conversation_history`` relies on ``field(default_factory=list)``, neither
    of which works on a plain class.
    """

    # Short session id (first 8 hex chars of a uuid4 at creation).
    session_id: str
    # Lock handed to _send_json; presumably serialises websocket writes.
    send_lock: asyncio.Lock
    # Monotonic response generation; stale work compares against it.
    response_generation: int = 0
    # Background tasks owned by this session (None when idle).
    utterance_task: asyncio.Task[None] | None = None
    response_task: asyncio.Task[None] | None = None
    response_watchdog_task: asyncio.Task[None] | None = None
    partial_transcript_task: asyncio.Task[None] | None = None
    notification_task: asyncio.Task[None] | None = None
    backchannel_task: asyncio.Task[None] | None = None
    # Pending agent notifications for this session.
    notification_queue: asyncio.Queue["AssistantNotification"] | None = None
    agent_session_id: str | None = None
    last_progress_voice_at: float = 0.0
    # Most recent assistant text sent, used for playback-echo detection.
    last_assistant_reply: str = ""
    # Transcript the current response_task is answering, and whether it was partial.
    active_response_transcript: str = ""
    active_response_is_partial: bool = False
    backchannel_audio_until: float = 0.0
    # Per-turn latency checkpoints (time.monotonic() values; 0.0 means "not yet").
    last_speech_end_at: float = 0.0
    last_transcript_final_at: float = 0.0
    last_llm_first_sentence_at: float = 0.0
    last_tts_first_chunk_ready_at: float = 0.0
    last_assistant_text_at: float = 0.0
    last_first_audio_at: float = 0.0
    speaker_profile: SpeakerProfile | None = None
    # Prior turns passed to the reply model; each runtime gets its own list.
    conversation_history: list[dict[str, str]] = field(default_factory=list)
    last_transcript_language: str | None = None
@dataclass
class AssistantNotification:
    """A message queued for delivery into a live session.

    Built with keyword arguments by ``agent_notify``, so it must be a dataclass.
    """

    # Text to surface in the session.
    text: str
    # Client event type the notification is delivered as.
    event_type: Literal["assistant.status", "assistant.notification"]
    # Whether the notification should also be voiced.
    speak: bool = True
@dataclass
class EndpointDecision:
    """Silence-based end-of-turn decision for the current utterance.

    Built with keyword arguments in ``_handle_frame``, so it must be a dataclass.
    """

    # Trailing silence (ms) required before the turn is finalized.
    stop_ms: float
    # Classifier flags; only logged/propagated in the code visible here.
    complete: bool
    incomplete: bool
    question_like: bool
@dataclass
class SessionConnection:
    """Everything registered in ACTIVE_SESSIONS for one live websocket.

    Built with keyword arguments in ``websocket_endpoint``, so it must be a
    dataclass.
    """

    websocket: WebSocket
    state: UtteranceState
    runtime: SessionRuntime
    # Updated when the client sends session.configure.
    voice_prompt_path: str | None = None
class AgentNotificationRequest(BaseModel):
    """Request body accepted by ``agent_notify``."""

    # Text of the notification to deliver.
    text: str
    # Target session id; how None is resolved is up to _resolve_target_sessions
    # (not visible here) — presumably "all active sessions", confirm.
    session_id: str | None = None
    # Event type forwarded to AssistantNotification.event_type.
    event_type: Literal["assistant.status", "assistant.notification"] = "assistant.notification"
    # Forwarded to AssistantNotification.speak.
    speak: bool = True
# Live sessions keyed by SessionRuntime.session_id; access under ACTIVE_SESSIONS_LOCK.
ACTIVE_SESSIONS: dict[str, SessionConnection] = {}
ACTIVE_SESSIONS_LOCK = asyncio.Lock()

# ANSI/VT100 escape sequences (colour codes etc.).
ANSI_ESCAPE_PATTERN = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# ```fenced``` code blocks; DOTALL lets them span newlines.
FENCED_CODE_PATTERN = re.compile(r"```.*?```", re.DOTALL)
# `inline code`; group 1 is the code text.
INLINE_CODE_PATTERN = re.compile(r"`([^`]+)`")
# [label](target); group 1 is the label.
MARKDOWN_LINK_PATTERN = re.compile(r"\[([^\]]+)\]\([^)]+\)")
# A table row delimited by ASCII '|' or box-drawing '│'.
TABLE_ROW_PATTERN = re.compile(r"^\s*[|│].*[|│]\s*$")
# Leading bullet ("- ", "* ", "+ ") or numbered ("1. ") list marker.
LIST_PREFIX_PATTERN = re.compile(r"^\s*(?:[-*+]\s+|\d+\.\s+)")
# Code-ish fragments: fences, braces, operators, paths, $VARS, key=value.
RAWISH_LINE_PATTERN = re.compile(
    r"(```|^\s*[{[]|[}\]]\s*$|::|=>|&&|\|\||"
    r"/[-\w./]{3,}|\\[-\w.\\]{3,}|"  # absolute paths
    r"\b\w+/\w+[-\w./]*\.\w{1,5}\b|"  # relative paths like src/foo.rs
    r"\$\w+|"  # shell variables
    r"\b\w+=\S+)"  # bare key=value assignments
)
# Single characters that have no natural spoken form.
NON_SPOKEN_CHARS_PATTERN = re.compile(r"[{}[\]<>`|$#~^]")
# Utterances opening with a typical English question word (case-insensitive).
QUESTION_STARTER_PATTERN = re.compile(
    r"^\s*(?:what|how|why|when|where|who|which|can|could|would|should|do|does|did|is|are|am|will|tell me)\b",
    re.IGNORECASE,
)
def _parakeet_bypass_silero_vad() -> bool:
    """Return the bypass setting when the Parakeet TDT v3 STT backend is active.

    With any other backend this is always False; with Parakeet it is
    ``settings.parakeet_bypass_silero_vad`` (RMS gating replaces Silero VAD).
    """
    if settings.stt_backend != "parakeet-tdt-v3":
        return False
    return settings.parakeet_bypass_silero_vad
def _normalize_compare_text(text: str) -> str:
    """Normalise text for fuzzy comparison.

    Strips one leading speaker tag ("you:", "user:", "agent:", "assistant:",
    case-insensitive), turns punctuation into spaces, collapses whitespace and
    lower-cases. ``None``/empty input yields ``""``.
    """
    cleaned = (text or "").strip()
    cleaned = re.sub(r"^(?:you|user|agent|assistant)\s*:\s*", "", cleaned, flags=re.IGNORECASE)
    cleaned = re.sub(r"[^\w\s]", " ", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned)
    return cleaned.strip().lower()


def _looks_like_assistant_echo(runtime: SessionRuntime, transcript: str) -> bool:
    """Heuristically decide whether ``transcript`` is the assistant's own audio.

    Only applies within 3 s of the last assistant text. After normalisation,
    the transcript counts as an echo when one of the two texts is a word-level
    prefix of the other (this covers equality), or when a transcript of at
    least 4 words appears as a whole-word run inside the assistant reply.

    Comparisons are done on word boundaries. Plain string prefix/substring
    checks would flag short user replies like "no" as an echo of "Nothing else…".
    """
    if runtime.last_assistant_text_at == 0.0:
        return False
    if (time.monotonic() - runtime.last_assistant_text_at) > 3.0:
        return False
    transcript_text = _normalize_compare_text(transcript)
    assistant_text = _normalize_compare_text(runtime.last_assistant_reply)
    if not transcript_text or not assistant_text:
        return False
    transcript_words = transcript_text.split()
    assistant_words = assistant_text.split()
    # Word-level prefix in either direction (includes exact equality).
    overlap = min(len(transcript_words), len(assistant_words))
    if transcript_words[:overlap] == assistant_words[:overlap]:
        return True
    # Longer transcripts: whole-word containment; padding prevents mid-word hits.
    if len(transcript_words) >= 4 and f" {transcript_text} " in f" {assistant_text} ":
        return True
    return False
async def preload_models() -> None:
    """Warm the STT/TTS models and the assistant backend when enabled.

    Does nothing unless ``settings.preload_models`` is set. The STT/TTS preload
    runs in a worker thread so the event loop is not blocked; the assistant
    preload is awaited directly. Progress is logged to stderr.
    """
    # NOTE(review): no startup-hook decorator is visible in this view; presumably
    # this is registered as an app startup handler — confirm.
    if settings.preload_models:
        print("Startup preload: loading STT/TTS/assistant models", file=sys.stderr)
        await asyncio.to_thread(pipeline.preload_models)
        await pipeline.preload_assistant()
        print("Startup preload: complete", file=sys.stderr)
async def root() -> FileResponse:
    """Serve the browser client's ``index.html`` from STATIC_DIR."""
    index_page = STATIC_DIR / "index.html"
    return FileResponse(index_page)
async def health() -> dict[str, str]:
    """Liveness probe; always reports ``{"status": "ok"}``."""
    return dict(status="ok")
async def list_sessions() -> dict[str, list[str]]:
    """Return the ids of all live sessions, sorted, as ``{"sessionIds": [...]}``."""
    async with ACTIVE_SESSIONS_LOCK:
        ids = list(ACTIVE_SESSIONS)
    ids.sort()
    return {"sessionIds": ids}
async def agent_notify(payload: AgentNotificationRequest) -> dict[str, object]:
    """Queue an external agent notification into one or more live sessions.

    Returns ``{"ok": False, "error": "no_active_session"}`` when no session
    matches ``payload.session_id``. Otherwise each target gets its own
    AssistantNotification, and the reply reports how many were queued and
    for which session ids.
    """
    targets = await _resolve_target_sessions(payload.session_id)
    if not targets:
        return {"ok": False, "error": "no_active_session"}
    queued = 0
    for target in targets:
        notification = AssistantNotification(
            text=payload.text,
            event_type=payload.event_type,
            speak=payload.speak,
        )
        await _queue_notification(target, notification)
        queued += 1
    target_ids = [target.runtime.session_id for target in targets]
    return {"ok": True, "delivered": queued, "sessionIds": target_ids}
async def websocket_endpoint(websocket: WebSocket) -> None:
    """Serve one realtime voice session over ``websocket`` until it disconnects.

    The session is registered in ACTIVE_SESSIONS for its lifetime and gets a
    notification worker task. Binary messages are PCM16 audio frames passed to
    ``_handle_frame``. Text messages are JSON control events:

    * ``session.configure`` sets the voice prompt path and the client sample
      rate, resets input/VAD state and interrupts any in-flight response;
    * ``input.commit`` finalizes the current utterance immediately;
    * ``ping`` replies with ``pong`` echoing ``ts``.

    Text frames that are not valid JSON objects, and unusable sample rates, are
    logged and ignored instead of tearing the session down. Cleanup always
    unregisters the session, even if an earlier cleanup step raises.
    """
    await websocket.accept()
    state = UtteranceState(sample_rate=settings.sample_rate)
    runtime = SessionRuntime(
        session_id=uuid4().hex[:8],
        send_lock=asyncio.Lock(),
        notification_queue=asyncio.Queue(),
        speaker_profile=SpeakerProfile(),
    )
    vad = VadStream()
    preroll_samples = max(1, int((settings.preroll_ms / 1000.0) * state.sample_rate))
    voice_prompt_path: str | None = None
    connection = SessionConnection(websocket=websocket, state=state, runtime=runtime)
    await _register_session(connection)
    # Everything after registration runs inside the try so the finally block
    # always cancels background work and unregisters the session.
    try:
        runtime.notification_task = asyncio.create_task(_notification_worker(connection))
        bypass_silero = _parakeet_bypass_silero_vad()
        assistant_metadata = pipeline.assistant_backend_metadata()
        await _send_json(
            websocket,
            runtime,
            {
                "type": "session.ready",
                "sessionId": runtime.session_id,
                "sampleRate": settings.sample_rate,
                "assistantBackend": assistant_metadata[0],
                "assistantModel": assistant_metadata[1],
                "vad": {
                    "provider": "rms-parakeet" if bypass_silero else vad.provider_name,
                    "threshold": settings.silero_vad_threshold,
                    "startMs": settings.parakeet_vad_start_ms if bypass_silero else settings.silero_vad_start_ms,
                    "stopMs": settings.parakeet_vad_stop_ms if bypass_silero else settings.vad_stop_ms,
                },
            },
        )
        while True:
            try:
                message = await websocket.receive()
            except RuntimeError as exc:
                # receive() after a disconnect was already seen raises this RuntimeError.
                if "disconnect message has been received" in str(exc):
                    break
                raise
            if message.get("type") == "websocket.disconnect":
                break
            if message.get("bytes") is not None:
                frame = pcm16_bytes_to_float32(message["bytes"])
                await _handle_frame(websocket, state, runtime, vad, frame, preroll_samples, voice_prompt_path)
                continue
            text = message.get("text")
            if not text:
                continue
            try:
                payload = json.loads(text)
            except ValueError:
                # json.JSONDecodeError subclasses ValueError; one bad frame must not kill the session.
                print(f"WS ignoring malformed control message text={text[:200]!r}", file=sys.stderr)
                continue
            if not isinstance(payload, dict):
                print(f"WS ignoring non-object control message text={text[:200]!r}", file=sys.stderr)
                continue
            event_type = payload.get("type")
            if event_type == "session.configure":
                voice_prompt_path = payload.get("voicePromptPath") or None
                connection.voice_prompt_path = voice_prompt_path
                try:
                    client_sample_rate = int(payload.get("clientSampleRate") or settings.sample_rate)
                except (TypeError, ValueError, OverflowError):
                    client_sample_rate = settings.sample_rate
                if client_sample_rate <= 0:
                    # A non-positive rate would break frame timing and resampling.
                    client_sample_rate = settings.sample_rate
                state.sample_rate = client_sample_rate
                preroll_samples = max(1, int((settings.preroll_ms / 1000.0) * state.sample_rate))
                state.reset_input()
                state.interrupt_assistant()
                vad.reset()
                await _interrupt_response(websocket, state, runtime, notify_client=False)
                await _send_json(
                    websocket,
                    runtime,
                    {
                        "type": "session.configured",
                        "voicePromptPath": voice_prompt_path,
                        "clientSampleRate": client_sample_rate,
                        "serverSampleRate": settings.sample_rate,
                    },
                )
            elif event_type == "input.commit":
                await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
            elif event_type == "ping":
                if not await _send_json(websocket, runtime, {"type": "pong", "ts": payload.get("ts")}):
                    raise WebSocketDisconnect
    except WebSocketDisconnect:
        return
    finally:
        try:
            await _interrupt_response(websocket, state, runtime, notify_client=False)
            await _cancel_task(runtime.backchannel_task)
            await _cancel_task(runtime.notification_task)
        finally:
            # Always drop the registry entry so notifications stop targeting a dead socket.
            await _unregister_session(runtime.session_id)
async def _handle_frame(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    vad: VadStream,
    frame: np.ndarray,
    preroll_samples: int,
    voice_prompt_path: str | None,
) -> None:
    """Run one incoming audio frame through VAD, barge-in and turn endpointing.

    Every frame first produces an ``input.level`` meter event. Then:

    * while ``state.should_ignore_input()`` is true (assistant output is being
      protected), the frame only counts toward a barge-in. Consecutive
      candidate frames accumulate in ``state.pending_barge_ms`` until
      ``barge_in_start_ms`` is reached, which interrupts the assistant and starts
      a user turn. Any other frame is discarded.
    * outside a turn, speech time accumulates until ``speech_start_ms`` and then
      a turn starts (``input.speech_start``);
    * inside a turn, audio is appended, partial transcripts/responses are
      driven, and the turn is finalized on enough trailing silence or when
      ``_maybe_force_finalize_turn`` decides to end it.

    Raises:
        WebSocketDisconnect: if a send to the client fails.
    """
    raw_level = rms(frame)
    if not await _send_json(websocket, runtime, {"type": "input.level", "value": round(raw_level, 5)}):
        raise WebSocketDisconnect
    duration_ms = frame_duration_ms(frame, state.sample_rate)
    if _parakeet_bypass_silero_vad():
        # Level-only gating: Silero is bypassed, so there is no real probability.
        effective_level = vad.activity_level(frame, state.sample_rate)
        speech_active = effective_level >= settings.parakeet_vad_rms_threshold
        if effective_level >= settings.parakeet_vad_rms_threshold * 0.5:
            print(
                "VAD speech="
                f"{int(speech_active)} active={int(speech_active)} prob={float(speech_active):.3f} "
                f"level={raw_level:.5f} vad_level={effective_level:.5f} mode=rms-parakeet",
                file=sys.stderr,
            )
        # Pseudo-probability: level relative to the barge-in threshold, clamped to [0, 1].
        barge_probability = min(
            1.0,
            max(0.0, effective_level / max(settings.parakeet_barge_in_rms_threshold, 1e-6)),
        )
        barge_in_rms_threshold = max(
            settings.parakeet_barge_in_rms_threshold,
            settings.assistant_barge_in_min_rms,
        )
        barge_in_start_ms = max(
            settings.parakeet_barge_in_start_ms,
            settings.assistant_barge_in_start_ms,
        )
        speech_start_ms = settings.parakeet_vad_start_ms
        stop_ms_when_barge = min(settings.parakeet_vad_stop_ms, 450)
    else:
        vad_result = vad.process(frame, state.sample_rate)
        effective_level = vad_result.level
        # Silero must flag speech AND the frame must be loud enough or very confident.
        speech_active = vad_result.speech and (
            effective_level >= settings.silero_vad_min_rms
            or vad_result.probability >= settings.silero_vad_strong_threshold
        )
        if vad_result.probability >= 0.05:
            print(
                "VAD speech="
                f"{int(vad_result.speech)} active={int(speech_active)} prob={vad_result.probability:.3f} "
                f"level={raw_level:.5f} vad_level={effective_level:.5f}",
                file=sys.stderr,
            )
        barge_probability = vad_result.probability
        barge_in_rms_threshold = settings.assistant_barge_in_min_rms
        barge_in_start_ms = settings.assistant_barge_in_start_ms
        speech_start_ms = settings.silero_vad_start_ms
        stop_ms_when_barge = min(settings.vad_stop_ms, 450)
    state.push_preroll(frame, preroll_samples)
    if state.should_ignore_input():
        # Allow real barge-in once the assistant playback window has passed and the
        # user is speaking with enough confidence. Otherwise keep discarding input.
        barge_candidate = (
            speech_active
            and state.can_barge_in()
            and effective_level >= barge_in_rms_threshold
            and barge_probability >= settings.assistant_barge_in_prob_threshold
        )
        if barge_candidate:
            state.pending_barge_ms += duration_ms
            if state.pending_barge_ms >= barge_in_start_ms:
                await _interrupt_response(websocket, state, runtime, notify_client=True)
                state.clear_active_input(preserve_preroll=True)
                state.barge_in_active = True
                # NOTE(review): uses the Silero start threshold even in Parakeet RMS
                # mode (speech_start_ms there) — confirm that is intended.
                state.pending_speech_ms = settings.silero_vad_start_ms
                state.start()
                state.append(frame)
                state.turn_speech_ms += duration_ms
                state.active_speech_ms += duration_ms
                print("VAD barge-in start", file=sys.stderr)
                if not await _send_json(websocket, runtime, {"type": "input.speech_start", "ts": time.time()}):
                    raise WebSocketDisconnect
                return
        await _clear_partial_transcript(websocket, state, runtime)
        if not barge_candidate:
            # Only a non-candidate frame breaks the streak. Resetting on every frame
            # (as before) meant barge-in evidence could never accumulate.
            state.pending_barge_ms = 0.0
        state.clear_active_input(preserve_preroll=True)
        return
    if not state.in_speech:
        if speech_active:
            state.pending_speech_ms += duration_ms
            if state.pending_speech_ms >= speech_start_ms:
                state.start()
                print("VAD turn start", file=sys.stderr)
                if not await _send_json(websocket, runtime, {"type": "input.speech_start", "ts": time.time()}):
                    raise WebSocketDisconnect
        else:
            state.pending_speech_ms = 0.0
    if state.in_speech:
        state.append(frame)
        await _maybe_emit_partial_transcript(websocket, state, runtime, input_sample_rate=state.sample_rate)
        await _maybe_start_partial_response(websocket, state, runtime, voice_prompt_path)
        if not speech_active:
            state.silence_ms += duration_ms
            state.active_speech_ms = 0.0
            # Barge-in turns use a short fixed stop window (capped at 450 ms).
            if state.barge_in_active and state.silence_ms >= stop_ms_when_barge:
                print(
                    "VAD barge-in end "
                    f"silence_ms={state.silence_ms:.1f} "
                    f"target_ms={stop_ms_when_barge:.1f}",
                    file=sys.stderr,
                )
                await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
                return
            if _parakeet_bypass_silero_vad():
                endpoint = EndpointDecision(
                    stop_ms=float(settings.parakeet_vad_stop_ms),
                    complete=False,
                    incomplete=False,
                    question_like=False,
                )
            else:
                endpoint = _dynamic_endpoint_decision(state)
            state.dynamic_endpoint_target_ms = endpoint.stop_ms
            if state.silence_ms >= endpoint.stop_ms:
                print(
                    "VAD turn end "
                    f"silence_ms={state.silence_ms:.1f} "
                    f"target_ms={endpoint.stop_ms:.1f} "
                    f"complete={int(endpoint.complete)} "
                    f"incomplete={int(endpoint.incomplete)}",
                    file=sys.stderr,
                )
                await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
        else:
            state.silence_ms = 0.0
            state.dynamic_endpoint_target_ms = 0.0
            state.turn_speech_ms += duration_ms
            state.active_speech_ms += duration_ms
            await _maybe_trigger_backchannel(websocket, state, runtime, voice_prompt_path)
        # Safety net for stale partials / overlong turns; a no-op once the turn ended.
        await _maybe_force_finalize_turn(websocket, state, runtime, voice_prompt_path)
async def _maybe_force_finalize_turn(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> bool:
    """End the current turn early when it has stalled or run too long.

    A turn is forced to end when either:

    * the partial transcript is non-empty and has not changed for at least
      ``settings.dynamic_endpointing_stale_partial_ms``, or
    * the turn has lasted ``settings.dynamic_endpointing_max_turn_ms``.

    Returns True if the turn was finalized, otherwise False (including when no
    turn is in progress or it has no start time).
    """
    if not (state.in_speech and state.turn_started_at):
        return False
    now = time.monotonic()
    turn_elapsed_ms = 1000.0 * (now - state.turn_started_at)
    last_change_at = state.last_partial_transcript_change_at
    stale_partial_ms = 1000.0 * (now - last_change_at) if last_change_at else 0.0
    partial_text = state.last_partial_transcript_text
    has_stable_partial = bool(partial_text.strip()) and stale_partial_ms >= settings.dynamic_endpointing_stale_partial_ms
    hit_max_turn = turn_elapsed_ms >= settings.dynamic_endpointing_max_turn_ms
    if not (has_stable_partial or hit_max_turn):
        return False
    reason = "max_turn" if hit_max_turn else "stale_partial"
    print(
        "VAD forced turn end "
        f"reason={reason} "
        f"turn_ms={turn_elapsed_ms:.1f} "
        f"stale_partial_ms={stale_partial_ms:.1f} "
        f"partial={state.last_partial_transcript_text!r}",
        file=sys.stderr,
    )
    await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
    return True
async def _finalize_utterance(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> None:
    """Close the current user turn and hand its audio to ``_process_utterance``.

    The turn's sample rate, barge-in flag and last partial transcript are read
    before ``state.finish()`` consumes the buffered audio. An empty turn is
    dropped. Otherwise the backchannel task is cancelled, the per-turn latency
    marks are reset, ``input.speech_end`` plus an empty ``transcript.partial``
    are sent, and processing is scheduled as a task under a new generation.

    Raises:
        WebSocketDisconnect: if the ``input.speech_end`` send fails.
    """
    sample_rate_snapshot = state.sample_rate
    await _cancel_task(runtime.partial_transcript_task)
    runtime.partial_transcript_task = None
    barge_in_snapshot = state.barge_in_active
    hint_snapshot = state.last_partial_transcript_text.strip()
    turn_audio = state.finish()
    if turn_audio.size == 0:
        return
    await _cancel_task(runtime.backchannel_task)
    runtime.backchannel_task = None
    runtime.last_speech_end_at = time.monotonic()
    # Latency checkpoints are per turn: clear the previous turn's marks.
    for checkpoint in (
        "last_transcript_final_at",
        "last_llm_first_sentence_at",
        "last_tts_first_chunk_ready_at",
        "last_assistant_text_at",
        "last_first_audio_at",
    ):
        setattr(runtime, checkpoint, 0.0)
    speech_end_event = {"type": "input.speech_end", "samples": int(turn_audio.size)}
    if not await _send_json(websocket, runtime, speech_end_event):
        raise WebSocketDisconnect
    # Best effort: clear any partial transcript still shown by the client.
    await _send_json(websocket, runtime, {"type": "transcript.partial", "text": ""})
    turn_generation = _next_generation(runtime)
    processing = _process_utterance(
        websocket,
        state,
        runtime,
        turn_audio,
        sample_rate_snapshot,
        voice_prompt_path,
        turn_generation,
        barge_in_snapshot,
        hint_snapshot,
    )
    runtime.utterance_task = asyncio.create_task(processing)
async def _process_utterance(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    audio: np.ndarray,
    input_sample_rate: int,
    voice_prompt_path: str | None,
    generation: int,
    was_barge_in: bool = False,
    final_transcript_hint: str = "",
) -> None:
    """Transcribe one finished utterance and start (or confirm) the assistant reply.

    Steps: resample to settings.sample_rate -> trim silence -> level gate
    (_should_skip_transcription) -> speaker-focus filter -> STT, with a
    permissive fallback pass when the primary result is empty on loud audio ->
    hallucination filter -> partial-transcript fallback -> playback-echo
    filter -> ``transcript.final``.

    An empty transcript interrupts active assistant output and sends an empty
    ``assistant.done``. A transcript equal to the partial response already in
    flight just marks that response as final. Otherwise a new response task is
    started.

    Args:
        audio: utterance samples at ``input_sample_rate``.
        generation: generation captured by the caller. The empty-reply path
            and ``transcript.final`` are skipped if it is no longer current.
        was_barge_in: the turn interrupted the assistant. Passed to the skip
            gate and relaxes the partial-transcript fallback.
        final_transcript_hint: last partial transcript, used if STT is empty.

    Raises:
        WebSocketDisconnect: if a send to the client fails.
    """
    try:
        runtime.last_assistant_reply = ""
        transcription_audio = resample_audio(audio, input_sample_rate, settings.sample_rate)
        # Short utterances keep some edge padding so trimming doesn't clip word onsets/offsets.
        keep_edge_ms = settings.short_utterance_keep_edge_ms if _is_short_utterance(transcription_audio) else 0
        transcription_audio = trim_silence(
            transcription_audio,
            settings.sample_rate,
            settings.transcription_trim_threshold,
            keep_edge_ms=keep_edge_ms,
        )
        if transcription_audio.size == 0:
            await _send_empty_reply(websocket, runtime, generation)
            return
        audio_rms = rms(transcription_audio)
        audio_peak = peak(transcription_audio)
        print(
            f"STT utterance stats rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
            file=sys.stderr,
        )
        if _should_skip_transcription(transcription_audio, audio_rms, audio_peak, was_barge_in=was_barge_in):
            await _send_empty_reply(websocket, runtime, generation)
            return
        # Optional speaker gating against the session's enrolled voice profile.
        speaker_decision = evaluate_speaker_focus(
            transcription_audio,
            settings.sample_rate,
            profile=runtime.speaker_profile or SpeakerProfile(),
            enabled=settings.speaker_focus_enabled,
            min_utterance_ms=settings.speaker_focus_min_utterance_ms,
            min_rms=settings.speaker_focus_min_rms,
            similarity_threshold=settings.speaker_focus_similarity_threshold,
            profile_alpha=settings.speaker_focus_profile_alpha,
            multi_speaker_threshold=settings.speaker_focus_multi_speaker_threshold,
            reject_mixed=settings.speaker_focus_reject_mixed,
        )
        if settings.speaker_focus_debug:
            similarity_text = (
                f"{speaker_decision.similarity:.3f}"
                if speaker_decision.similarity is not None
                else "n/a"
            )
            print(
                "SPEAKER focus "
                f"reason={speaker_decision.reason} "
                f"process={int(speaker_decision.should_process)} "
                f"enrolled={int(speaker_decision.enrolled)} "
                f"updated={int(speaker_decision.updated)} "
                f"mixed={int(speaker_decision.mixed_speaker)} "
                f"similarity={similarity_text} "
                f"profile_updates={speaker_decision.profile_updates}",
                file=sys.stderr,
            )
        if not speaker_decision.should_process:
            print(
                "SPEAKER filtered utterance "
                f"reason={speaker_decision.reason} "
                f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
                file=sys.stderr,
            )
            await _send_empty_reply(websocket, runtime, generation)
            return
        transcript_result = await pipeline.transcribe(transcription_audio)
        transcript = transcript_result.text
        runtime.last_transcript_language = transcript_result.language
        if transcript_result.language:
            probability_text = (
                f"{transcript_result.language_probability:.3f}"
                if transcript_result.language_probability is not None
                else "n/a"
            )
            print(
                "STT language "
                f"backend={transcript_result.backend or settings.stt_backend} "
                f"language={transcript_result.language} "
                f"probability={probability_text}",
                file=sys.stderr,
            )
        # Loud audio that produced no text gets one more, more permissive, STT pass.
        if not transcript.strip() and audio_rms >= settings.transcription_min_rms and audio_peak >= settings.transcription_min_peak:
            print(
                "STT final transcript empty, retrying permissive fallback "
                f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
                file=sys.stderr,
            )
            fallback_result = await pipeline.transcribe_fallback(transcription_audio)
            transcript = fallback_result.text
            if fallback_result.language:
                runtime.last_transcript_language = fallback_result.language
        if pipeline.is_likely_hallucination(transcript, audio_rms):
            print(f"STT hallucination filtered transcript={transcript!r} rms={audio_rms:.5f}", file=sys.stderr)
            transcript = ""
        # Fall back to the last streaming partial when final STT came back empty.
        if not transcript.strip() and final_transcript_hint:
            hint_words = len(re.findall(r"\w+", final_transcript_hint))
            if was_barge_in or hint_words >= 2:
                print(
                    f"STT using partial transcript fallback hint={final_transcript_hint!r}",
                    file=sys.stderr,
                )
                transcript = final_transcript_hint
        # NOTE(review): _finalize_utterance zeroes runtime.last_assistant_text_at before
        # scheduling this task, and last_assistant_reply is cleared at the top of this
        # function. So this echo check can only fire if another response task has sent
        # text in the meantime — confirm this is intended.
        if transcript.strip() and _looks_like_assistant_echo(runtime, transcript):
            print(f"STT playback echo filtered transcript={transcript!r}", file=sys.stderr)
            transcript = ""
        print(f"STT transcript={transcript!r}", file=sys.stderr)
        if transcript.strip():
            print(f"USER transcript={transcript.strip()}", file=sys.stderr)
            _mirror_cli_turn("You", transcript.strip())
        # A newer turn superseded this one: send nothing.
        if not _generation_is_current(runtime, generation):
            return
        runtime.last_transcript_final_at = time.monotonic()
        if not await _send_json(websocket, runtime, {"type": "transcript.final", "text": transcript}):
            raise WebSocketDisconnect
        if not transcript.strip():
            print(
                "STT empty transcript drop "
                f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
                file=sys.stderr,
            )
            await _interrupt_active_assistant(websocket, state, runtime, notify_client=True)
            if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": ""}):
                raise WebSocketDisconnect
            return
        # If a speculative response was started from an identical partial transcript,
        # keep it and just mark it as the final response.
        active_transcript = runtime.active_response_transcript.strip()
        if (
            runtime.response_task is not None
            and runtime.active_response_is_partial
            and active_transcript
            and active_transcript == transcript.strip()
        ):
            runtime.active_response_is_partial = False
            return
        interrupted_generation = await _interrupt_active_assistant(websocket, state, runtime, notify_client=True)
        if interrupted_generation is not None:
            response_generation = interrupted_generation
        else:
            if not _generation_is_current(runtime, generation):
                return
            response_generation = _next_generation(runtime)
        _start_response_task(
            websocket,
            state,
            runtime,
            transcript.strip(),
            runtime.last_transcript_language,
            voice_prompt_path,
            response_generation,
            is_partial=False,
        )
        return
    # NOTE(review): cancellation is swallowed, so awaiting a cancelled task returns
    # normally instead of raising CancelledError — confirm callers rely on this.
    except asyncio.CancelledError:
        return
    finally:
        # Only clear the slot if it still points at this task (a newer turn may own it).
        current_task = asyncio.current_task()
        if runtime.utterance_task is current_task:
            runtime.utterance_task = None
async def _send_empty_reply(websocket: WebSocket, runtime: SessionRuntime, generation: int) -> None:
    """Tell the client this turn yields nothing: empty transcript, empty reply.

    Sends an empty ``transcript.final`` followed by an empty ``assistant.done``,
    but only while ``generation`` is still current.

    Raises:
        WebSocketDisconnect: if either send fails.
    """
    if not _generation_is_current(runtime, generation):
        return
    for event_type in ("transcript.final", "assistant.done"):
        if not await _send_json(websocket, runtime, {"type": event_type, "text": ""}):
            raise WebSocketDisconnect
def _start_response_task(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    *,
    is_partial: bool,
    retry_count: int = 0,
) -> None:
    """Launch ``_respond_to_transcript`` as the session's active response task.

    Records which transcript is being answered and whether it is partial, then
    cancels any existing watchdog. For final (non-partial) responses only, a new
    ``_response_watchdog`` carrying ``retry_count`` is armed.

    NOTE(review): an already-running ``runtime.response_task`` is overwritten,
    not cancelled. Callers presumably interrupt it first — confirm.
    """
    runtime.active_response_transcript = transcript
    runtime.active_response_is_partial = is_partial
    responder = _respond_to_transcript(
        websocket,
        state,
        runtime,
        transcript,
        transcript_language,
        voice_prompt_path,
        generation,
        is_partial=is_partial,
    )
    runtime.response_task = asyncio.create_task(responder)
    previous_watchdog = runtime.response_watchdog_task
    if previous_watchdog is not None:
        previous_watchdog.cancel()
    runtime.response_watchdog_task = None
    # Speculative (partial) responses are never watched or retried.
    if is_partial:
        return
    watchdog = _response_watchdog(
        websocket,
        state,
        runtime,
        transcript,
        transcript_language,
        voice_prompt_path,
        generation,
        retry_count,
    )
    runtime.response_watchdog_task = asyncio.create_task(watchdog)
async def _respond_to_transcript(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    *,
    is_partial: bool,
) -> None:
    """Stream the assistant's reply to ``transcript`` to the client.

    When ``settings.assistant_backend == "my-agent-cli"``, the work is delegated
    to ``_process_agent_cli_reply``. Otherwise, sentences from
    ``pipeline.stream_reply_sentences`` are rendered into display/TTS text and
    sent one unit at a time via ``_send_reply_unit``. If no usable sentence
    arrives, a canned fallback line is sent instead. Final (non-partial) replies
    are added to the conversation history. ``assistant.done`` is sent only if
    ``generation`` is still current.

    Raises:
        WebSocketDisconnect: if a send to the client fails.
    """
    try:
        if settings.assistant_backend == "my-agent-cli":
            await _process_agent_cli_reply(
                websocket,
                state,
                runtime,
                transcript,
                transcript_language,
                voice_prompt_path,
                generation,
                is_partial=is_partial,
            )
            return
        full_reply_parts: list[str] = []
        did_send_audio = False
        async for sentence in pipeline.stream_reply_sentences(
            transcript,
            conversation_history=runtime.conversation_history,
            response_language=transcript_language,
        ):
            # Superseded (barge-in / newer turn): stop streaming immediately.
            if not _generation_is_current(runtime, generation):
                return
            if not sentence:
                continue
            if runtime.last_llm_first_sentence_at == 0.0:
                runtime.last_llm_first_sentence_at = time.monotonic()
            display_text, tts_text = pipeline.render_assistant_text(sentence, transcript)
            if not display_text or not tts_text:
                continue
            full_reply_parts.append(display_text)
            sent_audio = await _send_reply_unit(
                websocket,
                state,
                runtime,
                generation,
                display_text,
                pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path),
            )
            did_send_audio = did_send_audio or sent_audio
        # The model produced nothing usable: say something rather than stay silent.
        if not full_reply_parts:
            fallback_reply = "Sorry, give me a second." if transcript.strip() else "I didn't catch that."
            print("LLM reply stream empty, using fallback reply", file=sys.stderr)
            display_text, tts_text = pipeline.render_assistant_text(fallback_reply, transcript)
            full_reply_parts.append(display_text)
            sent_audio = await _send_reply_unit(
                websocket,
                state,
                runtime,
                generation,
                display_text,
                pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path),
            )
            did_send_audio = did_send_audio or sent_audio
        if did_send_audio:
            state.set_assistant_active(0.0, settings.assistant_holdoff_ms)
        # NOTE(review): the turn is remembered even if the generation went stale during
        # the fallback send, when nothing may have reached the client — confirm.
        if not is_partial and full_reply_parts:
            _remember_conversation_turn(runtime, transcript, " ".join(full_reply_parts).strip())
        if _generation_is_current(runtime, generation):
            _log_turn_latency(runtime, "assistant.done")
            if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": " ".join(full_reply_parts).strip()}):
                raise WebSocketDisconnect
    # NOTE(review): cancellation is swallowed (the task ends normally when cancelled).
    except asyncio.CancelledError:
        return
    finally:
        # Only the task still registered as the active response cleans up shared state,
        # including the watchdog that was armed for it.
        current_task = asyncio.current_task()
        if runtime.response_task is current_task:
            runtime.response_task = None
            runtime.active_response_transcript = ""
            runtime.active_response_is_partial = False
            if runtime.response_watchdog_task is not None:
                runtime.response_watchdog_task.cancel()
                runtime.response_watchdog_task = None
async def _send_fallback_reply(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    text: str,
    voice_prompt_path: str | None,
) -> None:
    """Speak a canned assistant line (not tied to any transcript) and close the turn.

    Nothing is sent if rendering yields no display or TTS text. If any audio
    went out, the assistant holdoff window is armed. ``assistant.done`` is sent
    only while ``generation`` is still current.

    Raises:
        WebSocketDisconnect: if a send to the client fails.
    """
    display_text, tts_text = pipeline.render_assistant_text(text, "")
    if not (display_text and tts_text):
        return
    audio_stream = pipeline.stream_reply_audio(tts_text, "", voice_prompt_path=voice_prompt_path)
    if await _send_reply_unit(websocket, state, runtime, generation, display_text, audio_stream):
        state.set_assistant_active(0.0, settings.assistant_holdoff_ms)
    if not _generation_is_current(runtime, generation):
        return
    _log_turn_latency(runtime, "assistant.done")
    if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": display_text}):
        raise WebSocketDisconnect
async def _response_watchdog(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    retry_count: int,
) -> None:
    """Recover from a final response that produced no assistant text in time.

    Waits ``settings.assistant_response_watchdog_ms`` (negative values clamp to
    0). If the response for ``transcript`` is still the active, non-partial one
    for ``generation`` and no assistant text has been sent, the stalled task is
    cancelled. While ``retry_count < 1`` the response is restarted under a new
    generation; otherwise a short spoken fallback reply is sent instead. Being
    cancelled itself is treated as a normal exit.
    """
    try:
        await asyncio.sleep(max(settings.assistant_response_watchdog_ms, 0) / 1000.0)
        still_stalled = (
            _generation_is_current(runtime, generation)
            and runtime.last_assistant_text_at == 0.0
            and runtime.response_task is not None
            and not runtime.active_response_is_partial
            and runtime.active_response_transcript.strip() == transcript.strip()
        )
        if not still_stalled:
            return
        print(
            f"LLM response watchdog triggered transcript={transcript!r} retry_count={retry_count}",
            file=sys.stderr,
        )
        runtime.response_task.cancel()
        runtime.response_task = None
        runtime.active_response_transcript = ""
        runtime.active_response_is_partial = False
        if retry_count >= 1:
            # Already retried once: stop waiting on the model and speak a holding line.
            await _send_fallback_reply(
                websocket,
                state,
                runtime,
                _next_generation(runtime),
                "Sorry, give me a second.",
                voice_prompt_path,
            )
            return
        _start_response_task(
            websocket,
            state,
            runtime,
            transcript,
            transcript_language,
            voice_prompt_path,
            _next_generation(runtime),
            is_partial=False,
            retry_count=retry_count + 1,
        )
    except asyncio.CancelledError:
        return
async def _send_reply_unit(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    display_text: str,
    audio_stream,
) -> bool:
    """Send one assistant reply: its text followed by its synthesized audio.

    The first audio chunk is awaited before the ``assistant.text`` message is
    sent (presumably so the text does not run ahead of the audio); its arrival
    time is recorded once per turn in ``runtime.last_tts_first_chunk_ready_at``.
    Nothing is sent if *generation* was superseded while waiting, and
    streaming stops as soon as it is superseded afterwards.

    The audio iterator is always closed on exit (``aclose`` when available), so
    an abandoned TTS stream is finalized immediately instead of at garbage
    collection.

    Returns:
        True if at least one audio chunk was sent.

    Raises:
        WebSocketDisconnect: if the ``assistant.text`` message cannot be sent.
    """
    iterator = audio_stream.__aiter__()
    try:
        first_chunk: np.ndarray | None
        try:
            first_chunk = await iterator.__anext__()
        except StopAsyncIteration:
            first_chunk = None
        else:
            if runtime.last_tts_first_chunk_ready_at == 0.0:
                runtime.last_tts_first_chunk_ready_at = time.monotonic()
        if not _generation_is_current(runtime, generation):
            return False
        if runtime.last_assistant_text_at == 0.0:
            runtime.last_assistant_text_at = time.monotonic()
        runtime.last_assistant_reply = display_text.strip()
        if not await _send_json(websocket, runtime, {"type": "assistant.text", "text": display_text}):
            raise WebSocketDisconnect
        did_send_audio = False
        if first_chunk is not None:
            await _send_audio_chunk(websocket, state, runtime, generation, first_chunk)
            did_send_audio = True
        async for audio_chunk in iterator:
            if not _generation_is_current(runtime, generation):
                break
            await _send_audio_chunk(websocket, state, runtime, generation, audio_chunk)
            did_send_audio = True
        return did_send_audio
    finally:
        # Finalize the TTS stream deterministically, even on early return or error.
        aclose = getattr(iterator, "aclose", None)
        if aclose is not None:
            await aclose()
async def _maybe_emit_partial_transcript(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    *,
    input_sample_rate: int,
) -> None:
    """Kick off a background partial transcription of the utterance so far, when due.

    Does nothing when partial transcripts are disabled, a previous partial
    transcription is still running, the turn has less speech than
    ``settings.partial_transcript_min_ms``, the last partial was taken less
    than ``settings.partial_transcript_interval_ms`` ago, or there is no
    buffered audio. Otherwise snapshots the audio and schedules
    ``_emit_partial_transcript`` as ``runtime.partial_transcript_task``.
    """
    if not settings.partial_transcripts_enabled:
        return
    pending = runtime.partial_transcript_task
    if pending is not None and not pending.done():
        return  # only one partial transcription in flight at a time
    if state.turn_speech_ms < settings.partial_transcript_min_ms:
        return
    now = time.monotonic()
    min_gap_s = max(settings.partial_transcript_interval_ms, 0) / 1000.0
    previous_at = state.last_partial_transcript_at
    if previous_at and now - previous_at < min_gap_s:
        return
    audio_snapshot = state.current_audio()
    if audio_snapshot.size == 0:
        return
    state.last_partial_transcript_at = now
    job = _emit_partial_transcript(websocket, state, runtime, audio_snapshot, input_sample_rate)
    runtime.partial_transcript_task = asyncio.create_task(job)
async def _emit_partial_transcript(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    audio: np.ndarray,
    input_sample_rate: int,
) -> None:
    """Transcribe an audio snapshot and push ``transcript.partial`` if the text changed.

    The snapshot is resampled from *input_sample_rate* to
    ``settings.sample_rate`` and silence-trimmed (keeping some edge audio for
    short utterances). Unchanged text is not re-sent. Cancellation is
    swallowed. On exit, ``runtime.partial_transcript_task`` is cleared if it
    still refers to this task.

    Raises:
        WebSocketDisconnect: if the partial transcript cannot be delivered.
    """
    try:
        resampled = resample_audio(audio, input_sample_rate, settings.sample_rate)
        edge_ms = settings.short_utterance_keep_edge_ms if _is_short_utterance(resampled) else 0
        trimmed = trim_silence(
            resampled,
            settings.sample_rate,
            settings.transcription_trim_threshold,
            keep_edge_ms=edge_ms,
        )
        if trimmed.size > 0:
            raw_text = await pipeline.transcribe_partial(trimmed)
            text = raw_text.strip()
            if text != state.last_partial_transcript_text:
                state.last_partial_transcript_text = text
                state.last_partial_transcript_change_at = time.monotonic()
                delivered = await _send_json(websocket, runtime, {"type": "transcript.partial", "text": text})
                if not delivered:
                    raise WebSocketDisconnect
    except asyncio.CancelledError:
        return
    finally:
        if runtime.partial_transcript_task is asyncio.current_task():
            runtime.partial_transcript_task = None
async def _clear_partial_transcript(websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime) -> None:
    """Cancel any running partial transcription and blank the client's partial text.

    The client is only told (with an empty ``transcript.partial``) when a
    partial transcript had been recorded. A failed send is deliberately
    ignored here (best effort).
    """
    await _cancel_task(runtime.partial_transcript_task)
    runtime.partial_transcript_task = None
    if state.last_partial_transcript_text:
        state.last_partial_transcript_text = ""
        state.last_partial_transcript_change_at = 0.0
        await _send_json(websocket, runtime, {"type": "transcript.partial", "text": ""})
async def _maybe_start_partial_response(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> None:
    """Speculatively start a reply from the current partial transcript.

    Fires only when all of these hold: partial responses are enabled, the
    backend is not ``my-agent-cli``, no response is running, the partial text
    is new, the endpoint heuristic calls it complete (and not incomplete), it
    is not question-like, its word count is within the configured window,
    enough silence has passed, and the partial text has been stable long
    enough. The reply is started on a fresh generation with ``is_partial=True``.
    """
    if not settings.partial_response_enabled:
        return
    if settings.assistant_backend == "my-agent-cli":
        return
    transcript = state.last_partial_transcript_text.strip()
    if not transcript:
        return
    if runtime.response_task is not None:
        return
    if transcript == state.last_partial_response_text:
        return
    endpoint = _dynamic_endpoint_decision(state)
    if not endpoint.complete or endpoint.incomplete:
        return
    if endpoint.question_like or _starts_with_question_starter(transcript):
        return
    word_count = len(re.findall(r"\w+", transcript))
    if word_count > settings.partial_response_complete_max_words:
        return
    # endpoint.complete is guaranteed past the gate above, so the "complete"
    # thresholds always apply (the former incomplete branches were unreachable).
    min_words = min(settings.partial_response_min_words, settings.partial_response_complete_min_words)
    if word_count < min_words:
        return
    if state.silence_ms < settings.partial_response_min_silence_ms:
        return
    if not state.last_partial_transcript_change_at:
        return
    stable_ms = min(settings.partial_response_stable_ms, settings.partial_response_complete_stable_ms)
    stable_s = max(stable_ms, 0) / 1000.0
    if time.monotonic() - state.last_partial_transcript_change_at < stable_s:
        return
    state.last_partial_response_text = transcript
    generation = _next_generation(runtime)
    _start_response_task(
        websocket,
        state,
        runtime,
        transcript,
        runtime.last_transcript_language,
        voice_prompt_path,
        generation,
        is_partial=True,
    )
def _agent_event_text(value: object) -> str:
    """Return *value* stripped of surrounding whitespace if it is a string, else ""."""
    return value.strip() if isinstance(value, str) else ""


async def _process_agent_cli_reply(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    *,
    is_partial: bool,
) -> None:
    """Relay one agent-CLI run to the client as activity, status, text and speech.

    Consumes ``pipeline.stream_agent_cli_events`` for *transcript* and reacts
    to each event ``kind``:

    * ``session_created``: remember the agent session id for later turns.
    * ``status`` / ``tool_use``: forward as ``assistant.activity`` (non-internal,
      non-generic statuses also as ``assistant.status``) and maybe speak a
      rate-limited progress update.
    * ``tool_result`` / ``error``: forward as activity; errors also go to chat.
    * assistant ``text``: summarize, skip if identical to the previous reply,
      send as ``assistant.text`` and stream its TTS audio.

    Stops silently once *generation* is superseded. Afterwards the exchange
    is stored in conversation memory (non-partial runs only) and
    ``assistant.done`` is sent if the generation is still current.
    *transcript_language* is not used by this function.

    Raises:
        WebSocketDisconnect: if any client message cannot be delivered.
    """
    summary_reply_parts: list[str] = []
    did_send_audio = False
    async for event in pipeline.stream_agent_cli_events(transcript, runtime.agent_session_id):
        if not _generation_is_current(runtime, generation):
            return
        kind = event.get("kind")
        if kind == "session_created":
            runtime.agent_session_id = event.get("newSessionId") or event.get("session_id") or runtime.agent_session_id
            continue
        if kind == "status":
            text = (event.get("text") or "").strip()
            if text:
                print(f"AGENT status={text}", file=sys.stderr)
                is_generic = _is_generic_agent_status(text)
                is_internal = _is_internal_agent_status(text)
                _mirror_cli_activity("Status", text)
                if not is_internal:
                    if not await _send_json(websocket, runtime, {"type": "assistant.activity", "kind": "status", "text": text}):
                        raise WebSocketDisconnect
                if not is_generic and not is_internal:
                    chat_text = _status_text_for_chat(text)
                    if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}):
                        raise WebSocketDisconnect
                    await _maybe_speak_progress_update(websocket, state, runtime, text, voice_prompt_path)
            continue
        if kind == "tool_use":
            tool_name = (event.get("toolName") or "").strip()
            tool_input = event.get("toolInput") or {}
            status_text = _tool_use_status_text(tool_name, tool_input)
            print(f"AGENT tool={status_text}", file=sys.stderr)
            _mirror_cli_activity("Tool", status_text)
            if not await _send_json(
                websocket,
                runtime,
                {
                    "type": "assistant.activity",
                    "kind": "tool_use",
                    "toolName": tool_name,
                    "toolInput": tool_input,
                    "text": status_text,
                },
            ):
                raise WebSocketDisconnect
            await _maybe_speak_progress_update(websocket, state, runtime, _tool_use_voice_text(tool_name), voice_prompt_path)
            continue
        if kind == "tool_result":
            result_text = _tool_result_summary(event.get("content"))
            _mirror_cli_activity("Result", result_text)
            if not await _send_json(
                websocket,
                runtime,
                {
                    "type": "assistant.activity",
                    "kind": "tool_result",
                    "toolName": (event.get("toolName") or "").strip(),
                    "text": result_text,
                    "isError": bool(event.get("isError")),
                },
            ):
                raise WebSocketDisconnect
            if event.get("isError"):
                # Tool-result content may be structured (it is summarized above);
                # only call .strip() on real strings, else use the summary.
                raw_error = event.get("content")
                error_text = raw_error.strip() if isinstance(raw_error, str) else _agent_event_text(result_text)
                if error_text:
                    print(f"AGENT error={error_text}", file=sys.stderr)
                    chat_text = _status_text_for_chat(error_text)
                    if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}):
                        raise WebSocketDisconnect
            continue
        if kind == "error":
            error_text = _agent_event_text(event.get("content"))
            if error_text:
                print(f"AGENT error={error_text}", file=sys.stderr)
                _mirror_cli_activity("Error", error_text)
                if not await _send_json(
                    websocket,
                    runtime,
                    {"type": "assistant.activity", "kind": "error", "text": error_text},
                ):
                    raise WebSocketDisconnect
                chat_text = _status_text_for_chat(error_text)
                if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}):
                    raise WebSocketDisconnect
            continue
        if kind == "text" and (event.get("role") or "") == "assistant":
            content = _agent_event_text(event.get("content"))
            if not content:
                continue
            cli_text = _normalize_cli_reply_text(content)
            summary_source = _summarize_agent_reply(content)
            display_text, tts_text = pipeline.render_assistant_text(summary_source, transcript)
            if not cli_text or not display_text or not tts_text:
                continue
            normalized_reply = display_text.strip().lower()
            if normalized_reply and normalized_reply == runtime.last_assistant_reply:
                continue
            runtime.last_assistant_reply = normalized_reply
            print(f"AGENT reply={display_text}", file=sys.stderr)
            _mirror_cli_turn("Agent", cli_text)
            summary_reply_parts.append(display_text)
            if not await _send_json(
                websocket,
                runtime,
                {"type": "assistant.activity", "kind": "message", "text": display_text},
            ):
                raise WebSocketDisconnect
            # Record first-text time like _send_reply_unit does, so the latency
            # log and the response watchdog see that text was produced.
            if runtime.last_assistant_text_at == 0.0:
                runtime.last_assistant_text_at = time.monotonic()
            if not await _send_json(websocket, runtime, {"type": "assistant.text", "text": display_text}):
                raise WebSocketDisconnect
            sent_audio = await _stream_reply_audio(
                websocket,
                state,
                runtime,
                generation,
                pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path),
            )
            did_send_audio = did_send_audio or sent_audio
    if did_send_audio:
        state.set_assistant_active(0.0, settings.assistant_holdoff_ms)
    if not is_partial and summary_reply_parts:
        _remember_conversation_turn(runtime, transcript, " ".join(summary_reply_parts).strip())
    if _generation_is_current(runtime, generation):
        if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": " ".join(summary_reply_parts).strip()}):
            raise WebSocketDisconnect
| def _next_generation(runtime: SessionRuntime) -> int: | |
| runtime.response_generation += 1 | |
| return runtime.response_generation | |
| def _generation_is_current(runtime: SessionRuntime, generation: int) -> bool: | |
| return runtime.response_generation == generation | |
def _remember_conversation_turn(runtime: SessionRuntime, user_text: str, assistant_text: str) -> None:
    """Append a user/assistant exchange to the session's rolling conversation memory.

    No-op when ``settings.conversation_memory_enabled`` is off or either side
    is blank after stripping. The history is then bounded first by
    ``settings.conversation_memory_turns`` exchanges and then by
    ``settings.conversation_memory_max_chars`` total content characters; a
    value <= 0 disables the respective bound.

    Oldest entries are evicted one exchange (two messages) at a time, so a
    history built in pairs, as here, keeps its user/assistant alignment and
    never starts with an orphaned assistant message.
    """
    if not settings.conversation_memory_enabled:
        return
    user_clean = user_text.strip()
    assistant_clean = assistant_text.strip()
    if not user_clean or not assistant_clean:
        return
    history = runtime.conversation_history
    history.append({"role": "user", "content": user_clean})
    history.append({"role": "assistant", "content": assistant_clean})
    max_messages = max(settings.conversation_memory_turns, 0) * 2
    if max_messages > 0 and len(history) > max_messages:
        history[:] = history[-max_messages:]
    max_chars = max(settings.conversation_memory_max_chars, 0)
    if max_chars <= 0:
        return
    # Keep a running total rather than re-summing the whole history per eviction.
    total_chars = sum(len(item.get("content", "")) for item in history)
    while history and total_chars > max_chars:
        # Evict the oldest exchange as a unit to keep the pairs aligned.
        for _ in range(min(2, len(history))):
            total_chars -= len(history.pop(0).get("content", ""))
def _starts_with_question_starter(transcript: str) -> bool:
    """Return True if the stripped transcript matches QUESTION_STARTER_PATTERN at its start."""
    match = QUESTION_STARTER_PATTERN.match(transcript.strip())
    return match is not None
async def _interrupt_active_assistant(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    *,
    notify_client: bool = True,
) -> int | None:
    """Stop the assistant's in-flight reply and/or playback (barge-in).

    Returns None without doing anything when there is no response task and
    ``state.should_ignore_input()`` is False. Otherwise it bumps the response
    generation (so in-flight output is treated as stale), calls
    ``state.interrupt_assistant()``, cancels the backchannel and watchdog
    tasks and the response task if present, and returns the new generation.

    The agent/LLM session is reset only when a response task was actually
    cancelled, matching ``_interrupt_response``. Interrupting playback of an
    already-finished reply therefore does not discard the agent session.

    The CLI mirror and the client (``assistant.interrupted``, if
    *notify_client*) are told only when a non-partial response with a
    transcript was cancelled.

    Raises:
        WebSocketDisconnect: if ``assistant.interrupted`` cannot be delivered.
    """
    # Snapshot before anything below clears these fields.
    response_task = runtime.response_task
    assistant_active = state.should_ignore_input()
    superseded_transcript = runtime.active_response_transcript.strip()
    superseded_partial = runtime.active_response_is_partial
    user_visible_interrupt = bool(
        response_task is not None and not superseded_partial and superseded_transcript
    )
    if response_task is None and not assistant_active:
        return None
    new_generation = _next_generation(runtime)
    state.interrupt_assistant()
    await _cancel_task(runtime.backchannel_task)
    runtime.backchannel_task = None
    await _cancel_task(runtime.response_watchdog_task)
    runtime.response_watchdog_task = None
    if response_task is not None:
        if superseded_transcript:
            print(
                "ASSISTANT superseded "
                f"partial={int(superseded_partial)} "
                f"transcript={superseded_transcript!r}",
                file=sys.stderr,
            )
        runtime.response_task = None
        response_task.cancel()
        # A cancelled run may leave the agent session mid-turn; start fresh.
        runtime.agent_session_id = None
        pipeline.reset_assistant_session()
    runtime.active_response_transcript = ""
    runtime.active_response_is_partial = False
    if user_visible_interrupt:
        _mirror_cli_interrupt()
    if notify_client and user_visible_interrupt:
        if not await _send_json(websocket, runtime, {"type": "assistant.interrupted"}):
            raise WebSocketDisconnect
    return new_generation
async def _interrupt_response(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    *,
    notify_client: bool = True,
) -> bool:
    """Abort everything in flight for the current turn.

    Cancels the utterance task and the response task (if any) plus the
    partial-transcript, backchannel and watchdog tasks. It also bumps the
    response generation so straggling output is treated as stale, and calls
    ``state.interrupt_assistant()``. The agent/LLM session is reset only if
    an utterance or response task existed.

    Returns:
        False (doing nothing) when there is no utterance task, no response
        task and ``state.should_ignore_input()`` is False; True otherwise.

    Raises:
        WebSocketDisconnect: if *notify_client* is set, the interrupt is
            user-visible, and ``assistant.interrupted`` cannot be delivered.
    """
    # Snapshot runtime state first; the fields are cleared further down.
    response_task = runtime.response_task
    utterance_task = runtime.utterance_task
    assistant_active = state.should_ignore_input()
    superseded_transcript = runtime.active_response_transcript.strip()
    superseded_partial = runtime.active_response_is_partial
    # Treated as user-visible: a pending utterance, or a non-partial response
    # that has a transcript. Speculative partial responses stay silent.
    user_visible_interrupt = bool(
        utterance_task is not None
        or (response_task is not None and not runtime.active_response_is_partial and runtime.active_response_transcript.strip())
    )
    if response_task is None and utterance_task is None and not assistant_active:
        return False
    # Bump the generation before cancelling so any output still in flight fails
    # its _generation_is_current check.
    _next_generation(runtime)
    state.interrupt_assistant()
    await _cancel_task(runtime.partial_transcript_task)
    runtime.partial_transcript_task = None
    await _cancel_task(runtime.backchannel_task)
    runtime.backchannel_task = None
    await _cancel_task(runtime.response_watchdog_task)
    runtime.response_watchdog_task = None
    if utterance_task is not None:
        runtime.utterance_task = None
        utterance_task.cancel()
    if response_task is not None:
        if superseded_transcript:
            print(
                "ASSISTANT superseded "
                f"partial={int(superseded_partial)} "
                f"transcript={superseded_transcript!r}",
                file=sys.stderr,
            )
        runtime.response_task = None
        response_task.cancel()
    # Only reset the agent/LLM session when a task was actually cancelled.
    if utterance_task is not None or response_task is not None:
        runtime.agent_session_id = None
        pipeline.reset_assistant_session()
    runtime.active_response_transcript = ""
    runtime.active_response_is_partial = False
    if user_visible_interrupt:
        _mirror_cli_interrupt()
    if notify_client and user_visible_interrupt:
        if not await _send_json(websocket, runtime, {"type": "assistant.interrupted"}):
            raise WebSocketDisconnect
    return True
| async def _send_json(websocket: WebSocket, runtime: SessionRuntime, payload: dict) -> bool: | |
| try: | |
| async with runtime.send_lock: | |
| await websocket.send_json(payload) | |
| return True | |
| except (WebSocketDisconnect, RuntimeError): | |
| return False | |
| async def _send_bytes(websocket: WebSocket, runtime: SessionRuntime, payload: bytes) -> bool: | |
| try: | |
| async with runtime.send_lock: | |
| await websocket.send_bytes(payload) | |
| return True | |
| except (WebSocketDisconnect, RuntimeError): | |
| return False | |
async def _stream_reply_audio(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    audio_stream,
) -> bool:
    """Forward every chunk of *audio_stream* to the client while *generation* is current.

    Streaming stops at the first chunk that arrives after *generation* has
    been superseded. The stream's iterator is always closed on exit
    (``aclose`` when available), so an abandoned TTS generator is finalized
    immediately rather than at garbage collection.

    Returns:
        True if at least one chunk was sent.
    """
    iterator = audio_stream.__aiter__()
    did_send_audio = False
    try:
        async for audio_chunk in iterator:
            if not _generation_is_current(runtime, generation):
                break
            await _send_audio_chunk(websocket, state, runtime, generation, audio_chunk)
            did_send_audio = True
    finally:
        aclose = getattr(iterator, "aclose", None)
        if aclose is not None:
            await aclose()
    return did_send_audio
async def _send_audio_chunk(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    audio_chunk: np.ndarray,
    *,
    sample_rate: int = 24000,
    chunk_size: int = 2400,
) -> None:
    """Send one synthesized audio chunk to the client as 16-bit PCM binary frames.

    Records the first-audio timestamp for the turn, then calls
    ``state.set_assistant_active`` with the chunk's duration and
    ``state.set_barge_grace``. The samples are split with ``chunk_audio``,
    clipped to [-1, 1], converted to int16 and sent as binary frames, followed
    by an ``assistant.audio_chunk_end`` JSON marker. If *generation* is
    superseded between frames, sending stops early without the marker.

    Args:
        audio_chunk: Float samples at *sample_rate*.
        sample_rate: Sample rate used to compute the chunk duration. The
            default 24000 is the value previously hard-coded here (assumed
            TTS output rate; confirm).
        chunk_size: Size passed to ``chunk_audio`` per frame. The default 2400
            is the previous hard-coded value (100 ms at 24 kHz).

    Raises:
        WebSocketDisconnect: if a frame or the end marker cannot be sent.
    """
    if runtime.last_first_audio_at == 0.0:
        runtime.last_first_audio_at = time.monotonic()
    audio_duration_ms = (len(audio_chunk) / float(sample_rate)) * 1000.0
    state.set_assistant_active(audio_duration_ms, settings.assistant_holdoff_ms)
    state.set_barge_grace(settings.assistant_barge_in_grace_ms)
    for chunk in chunk_audio(audio_chunk, chunk_size):
        if not _generation_is_current(runtime, generation):
            return
        clipped = np.clip(chunk, -1.0, 1.0)
        pcm16 = (clipped * 32767.0).astype(np.int16)
        if not await _send_bytes(websocket, runtime, pcm16.tobytes()):
            raise WebSocketDisconnect
    if not await _send_json(websocket, runtime, {"type": "assistant.audio_chunk_end"}):
        raise WebSocketDisconnect
| def _log_turn_latency(runtime: SessionRuntime, marker: str) -> None: | |
| speech_end_at = runtime.last_speech_end_at | |
| if not speech_end_at: | |
| return | |
| transcript_ms = ( | |
| (runtime.last_transcript_final_at - speech_end_at) * 1000.0 | |
| if runtime.last_transcript_final_at | |
| else -1.0 | |
| ) | |
| text_ms = ( | |
| (runtime.last_assistant_text_at - speech_end_at) * 1000.0 | |
| if runtime.last_assistant_text_at | |
| else -1.0 | |
| ) | |
| llm_ms = ( | |
| (runtime.last_llm_first_sentence_at - speech_end_at) * 1000.0 | |
| if runtime.last_llm_first_sentence_at | |
| else -1.0 | |
| ) | |
| tts_ready_ms = ( | |
| (runtime.last_tts_first_chunk_ready_at - speech_end_at) * 1000.0 | |
| if runtime.last_tts_first_chunk_ready_at | |
| else -1.0 | |
| ) | |
| audio_ms = ( | |
| (runtime.last_first_audio_at - speech_end_at) * 1000.0 | |
| if runtime.last_first_audio_at | |
| else -1.0 | |
| ) | |
| done_ms = (time.monotonic() - speech_end_at) * 1000.0 | |
| print( | |
| "LATENCY " | |
| f"marker={marker} " | |
| f"speech_end_to_transcript_ms={transcript_ms:.1f} " | |
| f"speech_end_to_first_llm_sentence_ms={llm_ms:.1f} " | |
| f"speech_end_to_first_tts_chunk_ready_ms={tts_ready_ms:.1f} " | |
| f"speech_end_to_text_ms={text_ms:.1f} " | |
| f"speech_end_to_first_audio_ms={audio_ms:.1f} " | |
| f"speech_end_to_done_ms={done_ms:.1f}", | |
| file=sys.stderr, | |
| ) | |
def _is_short_utterance(audio: np.ndarray) -> bool:
    """Return True if *audio* lasts at most ``settings.short_utterance_ms``.

    The duration is computed from the sample count at ``settings.sample_rate``.
    """
    length_ms = (audio.size / settings.sample_rate) * 1000.0
    return not length_ms > settings.short_utterance_ms
def _should_skip_transcription(
    audio: np.ndarray,
    audio_rms: float,
    audio_peak: float,
    *,
    was_barge_in: bool = False,
) -> bool:
    """Decide whether a finished utterance is too quiet or too short to transcribe.

    Args:
        audio: Utterance samples. Duration is computed with
            ``settings.sample_rate``, so they presumably are at that rate;
            confirm with callers.
        audio_rms: RMS level of *audio*, compared against the RMS floor.
        audio_peak: Peak level of *audio*, compared against the peak floor.
        was_barge_in: When True, any non-empty audio is transcribed.

    Returns:
        True if transcription should be skipped.
    """
    # Barge-ins bypass every gate as long as there is some audio.
    if was_barge_in and audio.size > 0:
        return False
    duration_ms = (audio.size / settings.sample_rate) * 1000.0
    min_samples = int((settings.min_utterance_ms / 1000.0) * settings.sample_rate)
    short_utterance = _is_short_utterance(audio)
    # Short utterances use their own, separately configured level floors.
    min_rms = settings.short_utterance_min_rms if short_utterance else settings.transcription_min_rms
    min_peak = settings.short_utterance_min_peak if short_utterance else settings.transcription_min_peak
    if audio_rms < min_rms or audio_peak < min_peak:
        return True
    if short_utterance and duration_ms < settings.min_utterance_ms:
        return True
    if audio.size >= min_samples:
        return False
    # NOTE(review): reaching this line means audio.size < min_samples, i.e. the
    # audio is shorter than min_utterance_ms. Short utterances already returned
    # True above, so this effectively always returns True. Confirm intent.
    return not short_utterance
async def _register_session(connection: SessionConnection) -> None:
    """Record *connection* in ACTIVE_SESSIONS under its runtime's session id."""
    session_key = connection.runtime.session_id
    async with ACTIVE_SESSIONS_LOCK:
        ACTIVE_SESSIONS[session_key] = connection
async def _unregister_session(session_id: str) -> None:
    """Remove *session_id* from ACTIVE_SESSIONS; unknown ids are ignored."""
    async with ACTIVE_SESSIONS_LOCK:
        if session_id in ACTIVE_SESSIONS:
            del ACTIVE_SESSIONS[session_id]
async def _resolve_target_sessions(session_id: str | None) -> list[SessionConnection]:
    """Return the active session(s) addressed by *session_id*.

    An explicit id yields that session, or an empty list if it is unknown.
    Without an id, the single active session is returned. Zero or several
    active sessions (an ambiguous target) yield an empty list.
    """
    async with ACTIVE_SESSIONS_LOCK:
        if not session_id:
            candidates = list(ACTIVE_SESSIONS.values())
            return candidates if len(candidates) == 1 else []
        match = ACTIVE_SESSIONS.get(session_id)
        return [] if match is None else [match]
async def _queue_notification(connection: SessionConnection, notification: AssistantNotification) -> None:
    """Show *notification* on the client and, if it should be spoken, queue it for speech.

    Nothing is queued when the JSON message cannot be delivered or the
    session has no notification queue.
    """
    runtime = connection.runtime
    message = {"type": notification.event_type, "text": notification.text}
    delivered = await _send_json(connection.websocket, runtime, message)
    if delivered and notification.speak and runtime.notification_queue is not None:
        await runtime.notification_queue.put(notification)
async def _notification_worker(connection: SessionConnection) -> None:
    """Speak queued notifications for one session, one at a time, until cancelled.

    Blank notifications are skipped. The others wait for a quiet window
    (``_wait_for_notification_window``) and are dropped if none opens in time.
    Every dequeued item is marked with ``task_done()``, so ``queue.join()``
    can be used to wait for the backlog to drain. Cancellation is swallowed.
    """
    runtime = connection.runtime
    queue = runtime.notification_queue
    if queue is None:
        return
    try:
        while True:
            notification = await queue.get()
            try:
                if not notification.text.strip():
                    continue
                if not await _wait_for_notification_window(connection.state, runtime):
                    continue
                await _speak_side_channel(
                    connection.websocket,
                    connection.state,
                    runtime,
                    notification.text,
                    connection.voice_prompt_path,
                    event_type=notification.event_type,
                )
            finally:
                # Pair every get() with task_done(), including skipped items.
                queue.task_done()
    except asyncio.CancelledError:
        return
| async def _wait_for_notification_window(state: UtteranceState, runtime: SessionRuntime) -> bool: | |
| deadline = time.monotonic() + max(settings.notification_max_wait_ms, 0) / 1000.0 | |
| while time.monotonic() < deadline: | |
| if runtime.response_task is None and runtime.utterance_task is None and not state.in_speech and not state.should_ignore_input(): | |
| return True | |
| await asyncio.sleep(0.1) | |
| return False | |
async def _maybe_trigger_backchannel(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> None:
    """Occasionally speak a short listener cue while the user is talking.

    Skipped when backchannels are disabled, the assistant is busy (response
    or utterance task, or ``state.should_ignore_input()``), a previous
    backchannel's audio or task is still active, the turn or the current
    speech run is too short, or the last backchannel was too recent.
    Otherwise it picks a cue with ``_choose_backchannel_text``, records it on
    *state*, and speaks it in a background task.
    """
    if not settings.backchannel_enabled:
        return
    assistant_busy = (
        runtime.response_task is not None
        or runtime.utterance_task is not None
        or state.should_ignore_input()
    )
    if assistant_busy:
        return
    if time.monotonic() < runtime.backchannel_audio_until:
        return
    previous = runtime.backchannel_task
    if previous is not None and not previous.done():
        return
    now = time.monotonic()
    long_enough = (
        state.turn_speech_ms >= settings.backchannel_min_speech_ms
        and state.active_speech_ms >= settings.backchannel_stable_ms
    )
    if not long_enough:
        return
    since_last_ms = (now - state.last_backchannel_at) * 1000.0
    if since_last_ms < settings.backchannel_min_interval_ms:
        return
    cue = _choose_backchannel_text(state)
    state.last_backchannel_at = now
    state.backchannel_count += 1
    speech = _speak_side_channel(
        websocket,
        state,
        runtime,
        cue,
        voice_prompt_path,
        event_type="assistant.backchannel",
    )
    runtime.backchannel_task = asyncio.create_task(speech)
def _choose_backchannel_text(state: UtteranceState) -> str:
    """Pick a backchannel cue that fits the partial transcript's tone.

    Frustration or problem words get empathetic cues, question-like text gets
    acknowledging cues, and everything else gets neutral cues (a distinct set
    for the first backchannel of the turn). Cues used within the last
    ``settings.backchannel_recent_limit`` picks are avoided when possible. The
    chosen cue is appended to ``state.recent_backchannels``, which is trimmed
    to that limit.
    """
    heard = state.last_partial_transcript_text.strip().lower()
    if re.search(r"\b(frustrat|annoy|upset|worried|stress|issue|problem|hard|difficult|stuck)\w*", heard):
        candidates = ("I hear you", "yeah", "right", "okay")
    elif "?" in heard or re.search(r"\b(why|how|what|when|where|who|can you|do you|should i)\b", heard):
        candidates = ("right", "okay", "got it", "go on")
    elif state.backchannel_count == 0:
        candidates = ("uh-huh", "yeah", "right", "go on")
    else:
        candidates = ("I hear you", "okay", "got it", "all right")
    window = max(settings.backchannel_recent_limit, 1)
    recently_used = set(state.recent_backchannels[-window:])
    fresh = [cue for cue in candidates if cue not in recently_used]
    picked = random.choice(fresh if fresh else list(candidates))
    history = state.recent_backchannels
    history.append(picked)
    if len(history) > window:
        del history[:-window]
    return picked
def _dynamic_endpoint_decision(state: UtteranceState) -> EndpointDecision:
    """Choose the end-of-turn silence threshold from the latest partial transcript.

    Starts from ``settings.vad_stop_ms``. When dynamic endpointing is enabled,
    the threshold is lengthened for transcripts that look unfinished (a
    trailing conjunction, article or filler, an open clause, a very recent
    partial) and shortened for ones that look finished (terminal punctuation
    or a question). With no partial transcript yet, only the turn's speech
    length is used. The result is clamped to
    [``dynamic_endpointing_min_ms``, ``dynamic_endpointing_max_ms``].

    Returns:
        EndpointDecision with ``stop_ms`` and the ``complete``/``incomplete``/
        ``question_like`` flags behind it. All flags are False when dynamic
        endpointing is disabled or no transcript exists.
    """
    base_stop_ms = float(settings.vad_stop_ms)
    if not settings.dynamic_endpointing_enabled:
        return EndpointDecision(
            stop_ms=base_stop_ms,
            complete=False,
            incomplete=False,
            question_like=False,
        )
    transcript = state.last_partial_transcript_text.strip()
    lowered = transcript.lower()
    tokens = re.findall(r"[a-z']+", lowered)
    word_count = len(tokens)
    last_word = tokens[-1] if tokens else ""
    first_word = tokens[0] if tokens else ""
    # Two-token tail, needed for multi-word tail phrases like "then i".
    last_two_words = " ".join(tokens[-2:]) if word_count >= 2 else ""
    if not transcript:
        stop_ms = base_stop_ms
        if state.turn_speech_ms <= 900:
            stop_ms -= settings.dynamic_endpointing_no_partial_short_discount_ms
        elif state.turn_speech_ms <= 1800:
            stop_ms -= settings.dynamic_endpointing_no_partial_medium_discount_ms
        stop_ms = max(settings.dynamic_endpointing_min_ms, stop_ms)
        stop_ms = min(settings.dynamic_endpointing_max_ms, stop_ms)
        return EndpointDecision(
            stop_ms=float(stop_ms),
            complete=False,
            incomplete=False,
            question_like=False,
        )
    incomplete_tail_words = {
        "a", "an", "the", "and", "or", "but", "so", "because", "if", "when", "while",
        "that", "which", "who", "whose", "whom", "to", "for", "from", "with", "of",
        "in", "on", "at", "by", "into", "about", "like", "as", "than", "then",
        "is", "are", "was", "were", "am", "be", "been", "being", "do", "does", "did",
        "have", "has", "had", "can", "could", "should", "would", "will", "just",
        "my", "your", "our", "their", "this", "these", "those", "it", "i'm", "you're",
        "we're", "they're", "he's", "she's",
    }
    # Multi-word tails can never equal a single token, so match them on the last two tokens.
    incomplete_tail_phrases = {"then i", "then we"}
    filler_tail_words = {"um", "uh", "er", "ah", "hmm", "mm", "like"}
    continuation_phrases = (
        "i think", "i mean", "for example", "so if", "and then", "because if",
        "what i", "what if", "the thing is", "it feels like",
    )
    question_starters = (
        "what", "why", "how", "when", "where", "who", "which", "can", "could",
        "would", "should", "do", "does", "did", "is", "are", "am", "will",
    )
    ends_with_terminal_punctuation = bool(re.search(r"[.!?]['\"]?\s*$", transcript))
    ends_with_commaish_pause = bool(re.search(r"[,;:]\s*$", transcript))
    starts_with_continuation = lowered.startswith(continuation_phrases)
    open_clause_pattern = re.search(
        r"\b("
        r"trying to|want to|wanted to|need to|needed to|going to|about to|"
        r"tell me how|tell me what|tell me why|show me how|"
        r"how to|what to|where to|why the|why it|"
        r"open the|open a|fix the|fix a"
        r")\b(?:\s+\w+){0,3}\s*$",
        lowered,
    )
    request_for_instructions = re.search(
        r"\b(can|could|would|will)\s+you\s+tell\s+me\s+how\s+to\b(?:\s+\w+){0,4}\s*$",
        lowered,
    )
    question_like = transcript.endswith("?") or (word_count >= 4 and first_word in question_starters)
    has_recent_partial = state.silence_ms < max(settings.vad_stop_ms, 1) * 0.8
    incomplete = not ends_with_terminal_punctuation and (
        last_word in incomplete_tail_words
        or last_two_words in incomplete_tail_phrases
        or last_word in filler_tail_words
        or ends_with_commaish_pause
        or re.search(r"\.\.\.\s*$", transcript) is not None
        or starts_with_continuation
        or open_clause_pattern is not None
        or request_for_instructions is not None
        or re.search(r"\b(and|or|but|so|because|if|when|while|then)\s+(the|a|an|my|your|our|their|it|this|that)\s*$", lowered) is not None
        or re.search(r"\b(i|we|they|he|she)\s+(was|were|am|are|is|have|had|would|could|should|will)\s*$", lowered) is not None
        or (has_recent_partial and word_count < 12)
    )
    complete = (
        word_count >= 3
        and not incomplete
        and (ends_with_terminal_punctuation or question_like)
    )
    stop_ms = base_stop_ms
    if word_count <= 2 and state.turn_speech_ms < 900:
        stop_ms += settings.dynamic_endpointing_short_utterance_bias_ms
    if incomplete:
        stop_ms += settings.dynamic_endpointing_incomplete_bias_ms
    if complete:
        stop_ms -= settings.dynamic_endpointing_complete_discount_ms
    if question_like and complete:
        stop_ms -= settings.dynamic_endpointing_question_discount_ms
    stop_ms = max(settings.dynamic_endpointing_min_ms, stop_ms)
    stop_ms = min(settings.dynamic_endpointing_max_ms, stop_ms)
    return EndpointDecision(
        stop_ms=float(stop_ms),
        complete=complete,
        incomplete=incomplete,
        question_like=question_like,
    )
async def _maybe_speak_progress_update(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    text: str,
    voice_prompt_path: str | None,
) -> None:
    """Speak *text* as an ``assistant.status`` side-channel update, rate-limited.

    Blank text is ignored. Speech happens at most once per
    ``settings.agent_progress_speak_min_interval_ms``, measured from the last
    spoken progress update (``runtime.last_progress_voice_at``).
    """
    if not text.strip():
        return
    now = time.monotonic()
    elapsed_ms = (now - runtime.last_progress_voice_at) * 1000.0
    if elapsed_ms >= settings.agent_progress_speak_min_interval_ms:
        runtime.last_progress_voice_at = now
        await _speak_side_channel(
            websocket,
            state,
            runtime,
            text,
            voice_prompt_path,
            event_type="assistant.status",
        )
| def _tool_use_status_text(tool_name: str, tool_input: object) -> str: | |
| if not isinstance(tool_input, dict): | |
| return tool_name or "working" | |
| preview = "" | |
| for key in ("path", "command", "url", "pattern", "name_pattern", "query", "tool_name"): | |
| value = tool_input.get(key) | |
| if isinstance(value, str) and value.strip(): | |
| preview = value.strip() | |
| break | |
| if preview: | |
| preview = preview if len(preview) <= 72 else f"{preview[:69]}..." | |
| return f"{tool_name}: {preview}" | |
| return tool_name or "working" | |
def _status_text_for_chat(text: str) -> str:
    """Turn a raw agent status line into short text suitable for the chat view.

    Returns ``""`` for empty input or for internal memory-injection notices.
    Text longer than 120 characters is cut to 117 characters (trailing
    whitespace removed) plus ``"..."``.
    """
    cleaned = _normalize_cli_reply_text(text)
    if not cleaned or _is_internal_agent_status(cleaned):
        return ""
    if len(cleaned) > 120:
        return cleaned[:117].rstrip() + "..."
    return cleaned
| def _is_generic_agent_status(text: str) -> bool: | |
| normalized = text.strip().lower().rstrip(".") | |
| return normalized in { | |
| "working on it", | |
| "thinking", | |
| "building a plan", | |
| "starting multi-step work", | |
| "complex task detected; switching to orchestrate mode", | |
| "simple task detected; using tools", | |
| } | |
| def _is_internal_agent_status(text: str) -> bool: | |
| normalized = text.strip().lower() | |
| return ( | |
| "injected relevant context from memory" in normalized | |
| or "relevant context from memory" in normalized | |
| or "context from memory" in normalized | |
| or normalized == "from memory" | |
| ) | |
def _normalize_cli_reply_text(text: str) -> str:
    r"""Clean raw CLI output into plain text.

    Removes everything matched by ``ANSI_ESCAPE_PATTERN`` (presumably ANSI
    colour/cursor escape sequences; the pattern is defined elsewhere), converts
    line endings to ``\n``, collapses runs of three or more newlines into a
    single blank line, and strips surrounding whitespace.

    ``\r\n`` pairs are folded to one ``\n`` *before* lone ``\r`` characters are
    converted. Converting ``\r`` first would turn every CRLF line break (as
    emitted by a PTY) into ``\n\n``, i.e. a spurious blank line.
    """
    cleaned = ANSI_ESCAPE_PATTERN.sub("", text)
    cleaned = cleaned.replace("\r\n", "\n").replace("\r", "\n")
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
def _reply_line_to_spoken_text(raw_line: str) -> str:
    """Strip markdown from one reply line; return "" if the line should not be spoken."""
    line = raw_line.strip()
    # Skip blank lines, table rows and internal memory notices outright.
    if not line or TABLE_ROW_PATTERN.match(line) or _is_internal_agent_status(line):
        return ""
    line = re.sub(r"^#{1,6}\s*", "", line)  # heading markers
    line = LIST_PREFIX_PATTERN.sub("", line)  # list bullets / numbers
    line = re.sub(r"^\s*>\s*", "", line)  # block quotes
    line = re.sub(r"\*\*(.*?)\*\*", r"\1", line)  # **bold**
    line = re.sub(r"__(.*?)__", r"\1", line)  # __bold__
    line = re.sub(r"(?<!\*)\*([^*]+)\*(?!\*)", r"\1", line)  # *italic*
    line = NON_SPOKEN_CHARS_PATTERN.sub(" ", line)
    line = re.sub(r"\s+", " ", line).strip()
    if not line or RAWISH_LINE_PATTERN.search(line):
        return ""
    # Fragments with fewer than three words are rarely useful when spoken.
    if len(re.findall(r"\w+", line)) < 3:
        return ""
    return line


def _take_leading_sentences(text: str, *, max_chars: int = 150, max_sentences: int = 2) -> str:
    """Return up to *max_sentences* leading sentences of *text*, joined by spaces.

    A sentence after the first is added only if the running length stays within
    *max_chars*. The first sentence is always kept, even if it is longer.
    """
    chosen: list[str] = []
    length = 0
    for sentence in re.findall(r"[^.!?]+[.!?]?", text):
        normalized = sentence.strip()
        if not normalized:
            continue
        projected = length + len(normalized) + (1 if chosen else 0)
        if chosen and projected > max_chars:
            break
        chosen.append(normalized)
        length = projected
        if len(chosen) >= max_sentences:
            break
    return " ".join(chosen).strip()


def _summarize_agent_reply(text: str) -> str:
    """Condense an agent's (markdown-ish) reply into a short sentence or two for TTS.

    Steps:
      1. Normalise CLI output and drop fenced code blocks.
      2. Replace markdown links and inline code with their first capture group
         (presumably the link text / code text; patterns are defined elsewhere).
      3. Keep only lines that still read like prose after markdown is stripped.
         If none survive, use ``_fallback_agent_reply_summary``.
      4. Take at most two leading sentences (about 150 characters), scrub
         paths, shell variables and ``key=value`` tokens, and cap the result at
         150 characters.

    Returns:
        ``""`` for blank input, ``"I found it."`` if scrubbing leaves nothing,
        otherwise the summary.
    """
    cleaned = _normalize_cli_reply_text(text)
    if not cleaned:
        return ""
    cleaned = FENCED_CODE_PATTERN.sub(" ", cleaned)
    cleaned = MARKDOWN_LINK_PATTERN.sub(r"\1", cleaned)
    cleaned = INLINE_CODE_PATTERN.sub(r"\1", cleaned)

    parts: list[str] = []
    for raw_line in cleaned.splitlines():
        spoken = _reply_line_to_spoken_text(raw_line)
        if spoken:
            parts.append(spoken)

    flattened = re.sub(r"\s+", " ", " ".join(parts)).strip()
    if not flattened:
        flattened = _fallback_agent_reply_summary(cleaned)
    if not flattened:
        return ""

    summary = _take_leading_sentences(flattened, max_chars=150, max_sentences=2) or flattened
    summary = _scrub_code_artifacts(summary)
    if not summary:
        return "I found it."
    return summary if len(summary) <= 150 else f"{summary[:147].rstrip()}..."
| def _fallback_agent_reply_summary(text: str) -> str: | |
| sentences = re.findall(r"[^.!?\n]+[.!?]?", text) | |
| for sentence in sentences: | |
| normalized = re.sub(r"\s+", " ", sentence).strip() | |
| if not normalized: | |
| continue | |
| if _is_internal_agent_status(normalized): | |
| continue | |
| if RAWISH_LINE_PATTERN.search(normalized): | |
| continue | |
| word_count = len(re.findall(r"\w+", normalized)) | |
| if word_count < 3: | |
| continue | |
| return normalized | |
| return "I found it." | |
def _scrub_code_artifacts(text: str) -> str:
    """Remove code-like tokens that sound bad when read aloud.

    Deletes relative paths with a file extension, absolute-looking paths,
    ``$VARS`` and ``key=value`` tokens. It then replaces characters matched by
    ``NON_SPOKEN_CHARS_PATTERN`` with spaces, collapses whitespace, and removes
    spaces left in front of punctuation.
    """
    removals = (
        r"\b\w+/\w+[-\w./]*\.\w{1,5}\b",  # relative paths
        r"/[-\w./]{3,}",  # absolute paths
        r"\$\w+",  # shell vars
        r"\b\w+=\S+",  # key=value
    )
    result = text
    for pattern in removals:
        result = re.sub(pattern, "", result)
    result = NON_SPOKEN_CHARS_PATTERN.sub(" ", result)
    result = re.sub(r"\s+", " ", result).strip()
    return re.sub(r"\s+([,.:;!?])", r"\1", result)
| def _mirror_cli_turn(speaker: str, text: str) -> None: | |
| cleaned = text.strip() | |
| if not cleaned: | |
| return | |
| print(f"{speaker}: {cleaned}", flush=True) | |
| if speaker.lower() == "you": | |
| return | |
| _mirror_to_agent_terminal(_format_cli_turn(speaker, cleaned)) | |
# Throttle state for _mirror_cli_activity (module-global, shared by all callers).
# time.monotonic() value at which the last activity line was mirrored.
_last_mirror_activity_at: float = 0.0
# Minimum gap, in seconds, between two mirrored activity lines; lines arriving
# sooner are dropped and only counted.
_MIRROR_ACTIVITY_MIN_INTERVAL_S: float = 0.4
# Number of activity lines dropped since the last one was written; reported as
# "(+N more)" in front of the next mirrored line, then reset to 0.
_mirror_activity_skipped: int = 0
def _mirror_cli_activity(label: str, text: str) -> None:
    """Mirror one agent activity line to the agent terminal, throttled.

    Lines arriving within ``_MIRROR_ACTIVITY_MIN_INTERVAL_S`` seconds of the
    previous mirrored line are dropped and counted. The next line that does get
    through is prefixed with a grey ``(+N more)`` note. Blank text is ignored.
    """
    global _last_mirror_activity_at, _mirror_activity_skipped
    message = _normalize_cli_reply_text(text)
    if not message:
        return
    current = time.monotonic()
    if current - _last_mirror_activity_at < _MIRROR_ACTIVITY_MIN_INTERVAL_S:
        _mirror_activity_skipped += 1
        return
    dropped, _mirror_activity_skipped = _mirror_activity_skipped, 0
    _last_mirror_activity_at = current
    header = f" \x1b[90m(+{dropped} more)\x1b[0m\n" if dropped > 0 else ""
    _mirror_to_agent_terminal(header + _format_cli_activity(label, message))
def _mirror_cli_interrupt() -> None:
    """Write a grey "interrupted" divider line to the agent terminal (if one is attached)."""
    divider = "\n\x1b[90m── interrupted ──\x1b[0m\n"
    _mirror_to_agent_terminal(divider)
| def _format_cli_turn(speaker: str, text: str) -> str: | |
| is_user = speaker.lower() == "you" | |
| if is_user: | |
| return f"\n\x1b[32;1m›\x1b[0m \x1b[32m{text}\x1b[0m\n" | |
| return f"{text}\n" | |
def _format_cli_activity(label: str, text: str) -> str:
    """Render one agent activity entry as a single coloured terminal line.

    Only the first line of *text* is shown; `` ...`` is appended when more lines
    follow. The result is capped at 120 characters (117 plus ``"..."``).

    Formatting by case:
      * the memory-injection notice is returned as-is;
      * ``status`` entries that are generic boilerplate (per
        ``_is_generic_agent_status``) are shown plain, indented by one space;
      * ``tool`` / ``result`` / ``error`` entries get a blue / grey / red tag;
      * anything else gets a grey ``…`` tag.

    The returned string always ends with a newline.
    """
    first_line, *rest = text.splitlines() or [text]
    summary = first_line.strip()
    if rest:
        summary = f"{summary} ..."
    summary = summary if len(summary) <= 120 else f"{summary[:117].rstrip()}..."
    lowered = label.lower()
    if summary == "💭 Injected relevant context from memory":
        return f"{summary}\n"
    # Reuse the shared helper instead of keeping a second copy of the
    # generic-status list here, so the two cannot drift apart.
    if lowered == "status" and _is_generic_agent_status(summary):
        return f" {summary}\n"
    # ANSI SGR colours: 34 = blue, 90 = bright black (grey), 31 = red.
    label_tags = {
        "tool": "\x1b[34mtool\x1b[0m",
        "result": "\x1b[90mresult\x1b[0m",
        "error": "\x1b[31merror\x1b[0m",
    }
    tag = label_tags.get(lowered, "\x1b[90m…\x1b[0m")
    return f" {tag} {summary}\n"
def _mirror_to_agent_terminal(text: str) -> None:
    """Append *text* to the agent's terminal device, best effort.

    Does nothing if no terminal is resolved. A trailing newline is added when
    missing. Write failures (``OSError``) are ignored on purpose, because
    mirroring is cosmetic.
    """
    destination = _resolve_agent_terminal_tty()
    if not destination:
        return
    payload = text if text.endswith("\n") else text + "\n"
    try:
        with open(destination, "a", encoding="utf-8", errors="ignore") as terminal:
            terminal.write(payload)
            terminal.flush()
    except OSError:
        return
| def _resolve_agent_terminal_tty() -> str | None: | |
| configured = (settings.my_agent_active_tty_file or "").strip() | |
| if configured: | |
| marker_path = Path(configured) | |
| else: | |
| marker_path = Path(settings.my_agent_cwd) / ".active-terminal-tty" | |
| try: | |
| tty_path = marker_path.read_text(encoding="utf-8").strip() | |
| except OSError: | |
| return None | |
| if not tty_path.startswith("/dev/pts/") and tty_path != "/dev/tty": | |
| return None | |
| if not os.path.exists(tty_path): | |
| return None | |
| return tty_path | |
| def _tool_use_voice_text(tool_name: str) -> str: | |
| normalized = tool_name.lower() | |
| if "browser" in normalized or "web" in normalized: | |
| return "I'm checking that." | |
| if "file" in normalized or "path" in normalized: | |
| return "I'm looking through the files." | |
| if "command" in normalized or "shell" in normalized: | |
| return "I'm working on it in the terminal." | |
| if "search" in normalized: | |
| return "I'm looking into that." | |
| return "I'm working on it." | |
def _tool_result_summary(content: object) -> str:
    """Summarise a tool result as one short, human-readable segment.

    Args:
        content: A string, or a list whose items are strings or dicts with a
            ``"text"`` string. Any other type is treated as empty.

    Returns:
        The first sentence or line that is non-empty and does not match
        ``RAWISH_LINE_PATTERN``, with whitespace collapsed and capped at 100
        characters (97 plus ``"..."``). Returns ``"done"`` if nothing qualifies.
    """
    if isinstance(content, str):
        text = content.strip()
    elif isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if isinstance(item, dict):
                text_value = item.get("text")
                if isinstance(text_value, str) and text_value.strip():
                    parts.append(text_value.strip())
            elif isinstance(item, str) and item.strip():
                parts.append(item.strip())
        text = " ".join(parts).strip()
    else:
        text = ""
    if not text:
        return "done"
    text = FENCED_CODE_PATTERN.sub(" ", text)
    # Split on sentence ends AND line breaks *before* collapsing whitespace.
    # Collapsing first (as before) erased every newline, so the "\n+" branch
    # never fired and multi-line output was treated as one giant segment.
    for segment in re.split(r"(?<=[.!?])\s+|\n+", text):
        normalized = re.sub(r"\s+", " ", segment).strip()
        if not normalized or RAWISH_LINE_PATTERN.search(normalized):
            continue
        return normalized if len(normalized) <= 100 else f"{normalized[:97].rstrip()}..."
    return "done"
async def _speak_side_channel(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    text: str,
    voice_prompt_path: str | None,
    *,
    event_type: str,
) -> None:
    """Synthesize *text* and stream it to the client outside the main reply flow.

    For ``event_type == "assistant.backchannel"``:
      * a JSON ``{"type": event_type, "text": ...}`` message is sent first;
      * audio comes from ``pipeline.synthesize_backchannel`` and is streamed only
        if ``state.in_speech`` was truthy when this call started;
      * ``runtime.backchannel_audio_until`` is set to now + audio length + 0.15 s.
        The length computation assumes 24 kHz samples (TODO confirm).

    For any other event type, audio comes from ``pipeline.synthesize_sentences``
    and no text message is sent.

    Audio is sent as int16 PCM binary frames (samples clipped to [-1, 1] and
    scaled by 32767), in pieces produced by ``chunk_audio(..., 2400)``. It is
    followed by an ``{"type": "assistant.audio_chunk_end"}`` JSON message.
    Nothing is sent if rendering yields empty display or TTS text.
    """
    try:
        is_backchannel = event_type == "assistant.backchannel"
        # Captured before synthesis; the audio gate below uses this snapshot.
        send_backchannel_audio = not is_backchannel or state.in_speech
        display_text, tts_text = pipeline.render_assistant_text(text, "")
        if not (display_text and tts_text):
            return
        if is_backchannel:
            announced = await _send_json(websocket, runtime, {"type": event_type, "text": display_text})
            if not announced:
                raise WebSocketDisconnect
            audio_chunks = await pipeline.synthesize_backchannel(tts_text, voice_prompt_path=voice_prompt_path)
            if not send_backchannel_audio:
                return
            playback_s = sum(len(samples) / 24000.0 for samples in audio_chunks)
            runtime.backchannel_audio_until = time.monotonic() + playback_s + 0.15
        else:
            audio_chunks = await pipeline.synthesize_sentences(tts_text, voice_prompt_path=voice_prompt_path)
        for audio in audio_chunks:
            for frame in chunk_audio(audio, 2400):
                pcm16 = (np.clip(frame, -1.0, 1.0) * 32767.0).astype(np.int16)
                if not await _send_bytes(websocket, runtime, pcm16.tobytes()):
                    raise WebSocketDisconnect
        if not await _send_json(websocket, runtime, {"type": "assistant.audio_chunk_end"}):
            raise WebSocketDisconnect
    except (asyncio.CancelledError, WebSocketDisconnect):
        # NOTE(review): swallowing CancelledError means that a task awaiting this
        # coroutine directly keeps running after being cancelled. Confirm that is
        # intended before relying on cancellation to stop the caller.
        return
| async def _cancel_task(task: asyncio.Task[None] | None) -> None: | |
| if task is None: | |
| return | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |