# voice-agent / app/main.py
# Hugging Face Space by RalphThings, commit 5f0a2ac ("Deploy Hugging Face Space").
from __future__ import annotations
import asyncio
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
from uuid import uuid4
import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.audio import chunk_audio, frame_duration_ms, pcm16_bytes_to_float32, peak, resample_audio, rms, trim_silence
from app.config import settings
from app.models import UtteranceState
from app.speaker import SpeakerProfile, evaluate_speaker_focus
from app.speech import pipeline
from app.vad import VadStream
# Project root: this module lives in <root>/app/main.py.
BASE_DIR = Path(__file__).resolve().parents[1]
# Directory holding index.html and the other client assets.
STATIC_DIR = BASE_DIR.joinpath(settings.static_dir)

app = FastAPI(title="Voice Latency Lab")
# NOTE(review): wildcard origins combined with allow_credentials=True lets any site make
# credentialed requests (Starlette reflects the request Origin when cookies are present).
# Confirm this is intended before exposing the server publicly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@dataclass
class SessionRuntime:
    """Mutable per-connection state shared by the WebSocket handler and its background tasks.

    ``websocket_endpoint`` creates one instance per ``/ws`` connection.
    """

    # Short random id (first 8 hex chars of a uuid4); reported in session.ready and /sessions.
    session_id: str
    # Passed to _send_json with this runtime; presumably serializes socket writes from
    # concurrent tasks (TODO confirm in _send_json).
    send_lock: asyncio.Lock
    # Counter presumably driven by _next_generation / _generation_is_current; in-flight work
    # tagged with an older generation stops sending once it is bumped.
    response_generation: int = 0
    # Task running _process_utterance for the most recently finalized turn.
    utterance_task: asyncio.Task[None] | None = None
    # Task running _respond_to_transcript (set by _start_response_task).
    response_task: asyncio.Task[None] | None = None
    # Task running _response_watchdog for the current final (non-partial) reply.
    response_watchdog_task: asyncio.Task[None] | None = None
    # Task running _emit_partial_transcript for the in-progress turn.
    partial_transcript_task: asyncio.Task[None] | None = None
    # Task running _notification_worker for this connection.
    notification_task: asyncio.Task[None] | None = None
    # Backchannel task; cancelled when a turn is finalized or the socket closes.
    backchannel_task: asyncio.Task[None] | None = None
    # Queue of pending notifications for this session (created in websocket_endpoint).
    notification_queue: asyncio.Queue["AssistantNotification"] | None = None
    # Not used in this section; presumably the my-agent-cli backend's session id.
    agent_session_id: str | None = None
    # Not used in this section; presumably a throttle timestamp for spoken progress updates.
    last_progress_voice_at: float = 0.0
    # Display text of the latest reply sentence sent; compared by _looks_like_assistant_echo.
    last_assistant_reply: str = ""
    # Transcript the current response_task is answering, and whether it was a speculative
    # partial transcript.
    active_response_transcript: str = ""
    active_response_is_partial: bool = False
    # Not used in this section; presumably when backchannel audio playback ends.
    backchannel_audio_until: float = 0.0
    # Per-turn latency markers (time.monotonic() values). 0.0 means "not reached yet";
    # they are reset in _finalize_utterance.
    last_speech_end_at: float = 0.0
    last_transcript_final_at: float = 0.0
    last_llm_first_sentence_at: float = 0.0
    last_tts_first_chunk_ready_at: float = 0.0
    last_assistant_text_at: float = 0.0
    last_first_audio_at: float = 0.0
    # Enrolled voice profile used by evaluate_speaker_focus.
    speaker_profile: SpeakerProfile | None = None
    # Prior turns passed to pipeline.stream_reply_sentences as conversation_history.
    conversation_history: list[dict[str, str]] = field(default_factory=list)
    # Language reported by STT for the last final transcript; used as the reply language.
    last_transcript_language: str | None = None
@dataclass
class AssistantNotification:
    """An out-of-band message queued for a session (e.g. by POST /agent/notify)."""

    # Text of the notification.
    text: str
    # WebSocket event type used when the notification is delivered to the client.
    event_type: Literal["assistant.status", "assistant.notification"]
    # Presumably whether the notification should also be synthesized to speech (TODO confirm
    # in _notification_worker).
    speak: bool = True
@dataclass(frozen=True)
class EndpointDecision:
    """Result of turn-endpoint analysis for the current utterance (immutable)."""

    # Trailing silence (ms) required before the turn is finalized; compared with
    # state.silence_ms in _handle_frame.
    stop_ms: float
    # Whether the partial transcript was judged complete / clearly incomplete — as decided by
    # _dynamic_endpoint_decision (not shown here).
    complete: bool
    incomplete: bool
    # Whether the utterance looks like a question; speculative replies are skipped for it.
    question_like: bool
@dataclass
class SessionConnection:
    """Everything needed to reach one live session; stored in ACTIVE_SESSIONS."""

    websocket: WebSocket
    state: UtteranceState
    runtime: SessionRuntime
    # Mirrors the voicePromptPath from the client's last session.configure message.
    voice_prompt_path: str | None = None
class AgentNotificationRequest(BaseModel):
    """Request body for POST /agent/notify."""

    # Notification text.
    text: str
    # Target session id; passed to _resolve_target_sessions (presumably None means every
    # active session — TODO confirm).
    session_id: str | None = None
    event_type: Literal["assistant.status", "assistant.notification"] = "assistant.notification"
    speak: bool = True
# Live WebSocket sessions, presumably keyed by SessionRuntime.session_id (entries are removed
# via _unregister_session(runtime.session_id)). Read under ACTIVE_SESSIONS_LOCK.
ACTIVE_SESSIONS: dict[str, SessionConnection] = {}
ACTIVE_SESSIONS_LOCK = asyncio.Lock()
# Text-cleanup patterns. Their call sites are not in this section; presumably they turn
# agent/markdown output into speakable text.
# ANSI terminal escape sequences (two-byte ESC forms and CSI "ESC[ ... final-byte" forms).
ANSI_ESCAPE_PATTERN = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# ```fenced``` code blocks; re.S lets "." span newlines; non-greedy stops at the first closing fence.
FENCED_CODE_PATTERN = re.compile(r"```.*?```", flags=re.S)
# `inline code`; group 1 is the code text.
INLINE_CODE_PATTERN = re.compile(r"`([^`]+)`")
# [label](target); group 1 is the label.
MARKDOWN_LINK_PATTERN = re.compile(r"\[([^\]]+)\]\([^)]+\)")
# A table row: starts and ends with ASCII "|" or box-drawing "│".
TABLE_ROW_PATTERN = re.compile(r"^\s*[|│].*[|│]\s*$")
# A leading "-", "*", "+" bullet or "1." style list marker.
LIST_PREFIX_PATTERN = re.compile(r"^\s*(?:[-*+]\s+|\d+\.\s+)")
# Signals that a line is code/JSON/paths/shell rather than prose. Without re.M, "^" and "$"
# anchor to the whole string, so this is presumably applied one line at a time.
RAWISH_LINE_PATTERN = re.compile(
r"(```|^\s*[{[]|[}\]]\s*$|::|=>|&&|\|\||"
r"/[-\w./]{3,}|\\[-\w.\\]{3,}|" # absolute paths
r"\b\w+/\w+[-\w./]*\.\w{1,5}\b|" # relative paths like src/foo.rs
r"\$\w+|" # shell variables
r"\b\w+=\S+)" # bare key=value assignments
)
# Symbols that should not be read aloud.
NON_SPOKEN_CHARS_PATTERN = re.compile(r"[{}[\]<>`|$#~^]")
# Utterances that open with a question word or "tell me" (case-insensitive).
QUESTION_STARTER_PATTERN = re.compile(
r"^\s*(?:what|how|why|when|where|who|which|can|could|would|should|do|does|did|is|are|am|will|tell me)\b",
flags=re.IGNORECASE,
)
def _parakeet_bypass_silero_vad() -> bool:
    """Return whether Silero VAD is replaced by the plain RMS gate.

    This only applies to the ``parakeet-tdt-v3`` STT backend, and only when
    ``settings.parakeet_bypass_silero_vad`` is enabled.
    """
    if settings.stt_backend != "parakeet-tdt-v3":
        return False
    return settings.parakeet_bypass_silero_vad
def _normalize_compare_text(text: str) -> str:
    """Canonicalize *text* for fuzzy equality checks.

    Drops a leading speaker label ("you:", "user:", "agent:", "assistant:"), turns every
    non-word, non-space character into a space, collapses whitespace runs and lowercases.
    ``None`` or empty input yields "".
    """
    stripped = (text or "").strip()
    unlabeled = re.sub(
        r"^(?:you|user|agent|assistant)\s*:\s*", "", stripped, flags=re.IGNORECASE
    )
    words_only = re.sub(r"[^\w\s]", " ", unlabeled)
    return " ".join(words_only.split()).lower()
def _looks_like_assistant_echo(runtime: SessionRuntime, transcript: str) -> bool:
    """Return True when *transcript* looks like the microphone picking up our own reply.

    The check is only active within 3 seconds of ``runtime.last_assistant_text_at``. Both
    texts are normalized with ``_normalize_compare_text`` and compared word by word:

    * one text is a whole-word prefix of the other (this includes exact equality), or
    * a transcript of at least 4 words appears as a contiguous word run inside the reply.

    Whole-word comparison prevents a short user reply such as "no" or "so" from being
    mistaken for the start of an assistant reply like "nothing to report" or "sorry, ...".
    """
    if runtime.last_assistant_text_at == 0.0:
        return False
    if (time.monotonic() - runtime.last_assistant_text_at) > 3.0:
        return False
    transcript_words = _normalize_compare_text(transcript).split()
    assistant_words = _normalize_compare_text(runtime.last_assistant_reply).split()
    if not transcript_words or not assistant_words:
        return False
    # Whole-word prefix in either direction (equal texts share every word).
    shared = min(len(transcript_words), len(assistant_words))
    if transcript_words[:shared] == assistant_words[:shared]:
        return True
    # Longer transcripts count as echo when they occur verbatim inside the reply.
    span = len(transcript_words)
    if span >= 4:
        return any(
            assistant_words[start:start + span] == transcript_words
            for start in range(len(assistant_words) - span + 1)
        )
    return False
@app.on_event("startup")
async def preload_models() -> None:
    """Warm up models at application startup when ``settings.preload_models`` is set.

    ``pipeline.preload_models`` runs in a worker thread via ``asyncio.to_thread`` so the
    event loop is not blocked. ``pipeline.preload_assistant()`` is then awaited.
    """
    if settings.preload_models:
        print("Startup preload: loading STT/TTS/assistant models", file=sys.stderr)
        await asyncio.to_thread(pipeline.preload_models)
        await pipeline.preload_assistant()
        print("Startup preload: complete", file=sys.stderr)
@app.get("/")
async def root() -> FileResponse:
    """Serve the browser client (``index.html`` from the static directory)."""
    index_file = STATIC_DIR / "index.html"
    return FileResponse(index_file)
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe; always reports ``{"status": "ok"}``."""
    return dict(status="ok")
@app.get("/sessions")
async def list_sessions() -> dict[str, list[str]]:
    """Return the ids of all registered sessions, sorted ascending."""
    async with ACTIVE_SESSIONS_LOCK:
        snapshot = list(ACTIVE_SESSIONS)
    snapshot.sort()
    return {"sessionIds": snapshot}
@app.post("/agent/notify")
async def agent_notify(payload: AgentNotificationRequest) -> dict[str, object]:
    """Queue an assistant notification for the sessions selected by ``payload.session_id``.

    Targets come from ``_resolve_target_sessions``. When none are found, the response is
    ``{"ok": False, "error": "no_active_session"}``. Otherwise each target gets its own
    ``AssistantNotification``, and the response lists how many were queued and for which
    session ids.
    """
    targets = await _resolve_target_sessions(payload.session_id)
    if not targets:
        return {"ok": False, "error": "no_active_session"}
    delivered = 0
    for target in targets:
        notification = AssistantNotification(
            text=payload.text,
            event_type=payload.event_type,
            speak=payload.speak,
        )
        await _queue_notification(target, notification)
        delivered += 1
    session_ids = [target.runtime.session_id for target in targets]
    return {"ok": True, "delivered": delivered, "sessionIds": session_ids}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """Run one voice session over a WebSocket until the client disconnects.

    Incoming messages:
    * binary frames carry PCM16 audio, converted to float32 and passed to ``_handle_frame``;
    * text frames carry JSON control messages:
      - ``session.configure``: sets the voice prompt path and client sample rate, then resets input;
      - ``input.commit``: force-finalizes the current utterance;
      - ``ping``: answered with ``pong``.

    Malformed text frames (invalid JSON or a non-object payload) are logged and ignored
    instead of ending the session. An unparseable or non-positive ``clientSampleRate``
    falls back to ``settings.sample_rate``.

    Everything after registration runs inside ``try/finally``. On exit the background
    utterance/partial-transcript work and the active response are stopped, the backchannel
    and notification tasks are cancelled, and the session is unregistered.
    """
    await websocket.accept()
    state = UtteranceState(sample_rate=settings.sample_rate)
    runtime = SessionRuntime(
        session_id=uuid4().hex[:8],
        send_lock=asyncio.Lock(),
        notification_queue=asyncio.Queue(),
        speaker_profile=SpeakerProfile(),
    )
    vad = VadStream()
    preroll_samples = max(1, int((settings.preroll_ms / 1000.0) * state.sample_rate))
    voice_prompt_path: str | None = None
    connection = SessionConnection(websocket=websocket, state=state, runtime=runtime)
    await _register_session(connection)
    runtime.notification_task = asyncio.create_task(_notification_worker(connection))
    try:
        backend_metadata = pipeline.assistant_backend_metadata()
        parakeet_vad = _parakeet_bypass_silero_vad()
        await _send_json(
            websocket,
            runtime,
            {
                "type": "session.ready",
                "sessionId": runtime.session_id,
                "sampleRate": settings.sample_rate,
                "assistantBackend": backend_metadata[0],
                "assistantModel": backend_metadata[1],
                "vad": {
                    "provider": "rms-parakeet" if parakeet_vad else vad.provider_name,
                    "threshold": settings.silero_vad_threshold,
                    "startMs": settings.parakeet_vad_start_ms if parakeet_vad else settings.silero_vad_start_ms,
                    "stopMs": settings.parakeet_vad_stop_ms if parakeet_vad else settings.vad_stop_ms,
                },
            },
        )
        while True:
            try:
                message = await websocket.receive()
            except RuntimeError as exc:
                # Starlette raises this if receive() is called after a disconnect was seen.
                if "disconnect message has been received" in str(exc):
                    break
                raise
            if message.get("type") == "websocket.disconnect":
                break
            if "bytes" in message and message["bytes"] is not None:
                frame = pcm16_bytes_to_float32(message["bytes"])
                await _handle_frame(websocket, state, runtime, vad, frame, preroll_samples, voice_prompt_path)
                continue
            text = message.get("text")
            if not text:
                continue
            try:
                payload = json.loads(text)
            except json.JSONDecodeError:
                print(f"WS ignoring malformed JSON message session={runtime.session_id}", file=sys.stderr)
                continue
            if not isinstance(payload, dict):
                print(f"WS ignoring non-object JSON message session={runtime.session_id}", file=sys.stderr)
                continue
            event_type = payload.get("type")
            if event_type == "session.configure":
                # NOTE(review): voicePromptPath is client-controlled and is forwarded to TTS as
                # a path; validate it against an allow-list if the server is publicly reachable.
                voice_prompt_path = payload.get("voicePromptPath") or None
                connection.voice_prompt_path = voice_prompt_path
                try:
                    client_sample_rate = int(payload.get("clientSampleRate") or settings.sample_rate)
                except (TypeError, ValueError):
                    client_sample_rate = settings.sample_rate
                if client_sample_rate <= 0:
                    client_sample_rate = settings.sample_rate
                state.sample_rate = client_sample_rate
                preroll_samples = max(1, int((settings.preroll_ms / 1000.0) * state.sample_rate))
                state.reset_input()
                state.interrupt_assistant()
                vad.reset()
                await _interrupt_response(websocket, state, runtime, notify_client=False)
                await _send_json(
                    websocket,
                    runtime,
                    {
                        "type": "session.configured",
                        "voicePromptPath": voice_prompt_path,
                        "clientSampleRate": client_sample_rate,
                        "serverSampleRate": settings.sample_rate,
                    },
                )
            elif event_type == "input.commit":
                await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
            elif event_type == "ping":
                if not await _send_json(websocket, runtime, {"type": "pong", "ts": payload.get("ts")}):
                    raise WebSocketDisconnect
    except WebSocketDisconnect:
        return
    finally:
        # Stop work that could still start a reply or send to the closed socket, then
        # interrupt any reply already running.
        await _cancel_task(runtime.utterance_task)
        await _cancel_task(runtime.partial_transcript_task)
        await _interrupt_response(websocket, state, runtime, notify_client=False)
        await _cancel_task(runtime.backchannel_task)
        await _cancel_task(runtime.notification_task)
        await _unregister_session(runtime.session_id)
async def _handle_frame(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    vad: VadStream,
    frame: np.ndarray,
    preroll_samples: int,
    voice_prompt_path: str | None,
) -> None:
    """Process one incoming audio frame: level meter, VAD, barge-in and turn endpointing.

    1. Report the frame's RMS level to the client (``input.level``).
    2. Decide whether the frame counts as speech and pick the matching thresholds:
       - in Parakeet mode (``_parakeet_bypass_silero_vad``), a plain level gate;
       - otherwise the Silero ``VadStream`` result, additionally gated by a minimum level
         or a strong probability.
    3. While ``state.should_ignore_input()`` is set (assistant playback window), only look
       for a barge-in:
       - consecutive qualifying frames accumulate in ``state.pending_barge_ms``;
       - once the run reaches the barge-in start time, the reply is interrupted and a new
         user turn starts with this frame;
       - a non-qualifying frame resets the run;
       - all other input is discarded.
    4. Otherwise run the turn state machine:
       - start a turn after enough speech and append audio;
       - emit partial transcripts and speculative replies;
       - finalize on enough trailing silence (dynamic endpointing) or via
         ``_maybe_force_finalize_turn``.

    Raises:
        WebSocketDisconnect: when a message to the client cannot be sent.
    """
    raw_level = rms(frame)
    if not await _send_json(websocket, runtime, {"type": "input.level", "value": round(raw_level, 5)}):
        raise WebSocketDisconnect
    duration_ms = frame_duration_ms(frame, state.sample_rate)
    if _parakeet_bypass_silero_vad():
        effective_level = vad.activity_level(frame, state.sample_rate)
        speech_active = effective_level >= settings.parakeet_vad_rms_threshold
        if effective_level >= settings.parakeet_vad_rms_threshold * 0.5:
            print(
                "VAD speech="
                f"{int(speech_active)} active={int(speech_active)} prob={float(speech_active):.3f} "
                f"level={raw_level:.5f} vad_level={effective_level:.5f} mode=rms-parakeet",
                file=sys.stderr,
            )
        # No probability in RMS mode: derive a pseudo-probability from the level.
        barge_probability = min(
            1.0,
            max(0.0, effective_level / max(settings.parakeet_barge_in_rms_threshold, 1e-6)),
        )
        barge_in_rms_threshold = max(
            settings.parakeet_barge_in_rms_threshold,
            settings.assistant_barge_in_min_rms,
        )
        barge_in_start_ms = max(
            settings.parakeet_barge_in_start_ms,
            settings.assistant_barge_in_start_ms,
        )
        speech_start_ms = settings.parakeet_vad_start_ms
        stop_ms_when_barge = min(settings.parakeet_vad_stop_ms, 450)
    else:
        vad_result = vad.process(frame, state.sample_rate)
        effective_level = vad_result.level
        speech_active = vad_result.speech and (
            effective_level >= settings.silero_vad_min_rms
            or vad_result.probability >= settings.silero_vad_strong_threshold
        )
        if vad_result.probability >= 0.05:
            print(
                "VAD speech="
                f"{int(vad_result.speech)} active={int(speech_active)} prob={vad_result.probability:.3f} "
                f"level={raw_level:.5f} vad_level={effective_level:.5f}",
                file=sys.stderr,
            )
        barge_probability = vad_result.probability
        barge_in_rms_threshold = settings.assistant_barge_in_min_rms
        barge_in_start_ms = settings.assistant_barge_in_start_ms
        speech_start_ms = settings.silero_vad_start_ms
        stop_ms_when_barge = min(settings.vad_stop_ms, 450)
    state.push_preroll(frame, preroll_samples)
    if state.should_ignore_input():
        # Allow real barge-in once the assistant playback window has passed and the
        # user is speaking with enough confidence. Otherwise keep discarding input.
        barge_candidate = (
            speech_active
            and state.can_barge_in()
            and effective_level >= barge_in_rms_threshold
            and barge_probability >= settings.assistant_barge_in_prob_threshold
        )
        if barge_candidate:
            state.pending_barge_ms += duration_ms
            if state.pending_barge_ms >= barge_in_start_ms:
                await _interrupt_response(websocket, state, runtime, notify_client=True)
                state.clear_active_input(preserve_preroll=True)
                # The run has been consumed; don't let it carry over into the next reply.
                state.pending_barge_ms = 0.0
                state.barge_in_active = True
                state.pending_speech_ms = settings.silero_vad_start_ms
                state.start()
                state.append(frame)
                state.turn_speech_ms += duration_ms
                state.active_speech_ms += duration_ms
                print("VAD barge-in start", file=sys.stderr)
                if not await _send_json(websocket, runtime, {"type": "input.speech_start", "ts": time.time()}):
                    raise WebSocketDisconnect
                return
        else:
            # Only a non-qualifying frame breaks the run, so pending_barge_ms can accumulate
            # across frames up to barge_in_start_ms.
            # NOTE(review): assumes clear_active_input() leaves pending_barge_ms untouched.
            state.pending_barge_ms = 0.0
        await _clear_partial_transcript(websocket, state, runtime)
        state.clear_active_input(preserve_preroll=True)
        return
    # Playback window is over: discard any half-built barge-in run.
    state.pending_barge_ms = 0.0
    if not state.in_speech:
        if speech_active:
            state.pending_speech_ms += duration_ms
            if state.pending_speech_ms >= speech_start_ms:
                state.start()
                print("VAD turn start", file=sys.stderr)
                if not await _send_json(websocket, runtime, {"type": "input.speech_start", "ts": time.time()}):
                    raise WebSocketDisconnect
        else:
            state.pending_speech_ms = 0.0
    if state.in_speech:
        state.append(frame)
        await _maybe_emit_partial_transcript(websocket, state, runtime, input_sample_rate=state.sample_rate)
        await _maybe_start_partial_response(websocket, state, runtime, voice_prompt_path)
        if not speech_active:
            state.silence_ms += duration_ms
            state.active_speech_ms = 0.0
            if state.barge_in_active and state.silence_ms >= stop_ms_when_barge:
                print(
                    "VAD barge-in end "
                    f"silence_ms={state.silence_ms:.1f} "
                    f"target_ms={stop_ms_when_barge:.1f}",
                    file=sys.stderr,
                )
                await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
                return
            if _parakeet_bypass_silero_vad():
                endpoint = EndpointDecision(
                    stop_ms=float(settings.parakeet_vad_stop_ms),
                    complete=False,
                    incomplete=False,
                    question_like=False,
                )
            else:
                endpoint = _dynamic_endpoint_decision(state)
            state.dynamic_endpoint_target_ms = endpoint.stop_ms
            if state.silence_ms >= endpoint.stop_ms:
                print(
                    "VAD turn end "
                    f"silence_ms={state.silence_ms:.1f} "
                    f"target_ms={endpoint.stop_ms:.1f} "
                    f"complete={int(endpoint.complete)} "
                    f"incomplete={int(endpoint.incomplete)}",
                    file=sys.stderr,
                )
                await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
        else:
            state.silence_ms = 0.0
            state.dynamic_endpoint_target_ms = 0.0
            state.turn_speech_ms += duration_ms
            state.active_speech_ms += duration_ms
            await _maybe_trigger_backchannel(websocket, state, runtime, voice_prompt_path)
        if await _maybe_force_finalize_turn(websocket, state, runtime, voice_prompt_path):
            return
async def _maybe_force_finalize_turn(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> bool:
    """End the current turn early when it has run too long or its partial transcript has stalled.

    A turn is force-finalized when either:
    * it has lasted ``settings.dynamic_endpointing_max_turn_ms``, or
    * a non-empty partial transcript has not changed for
      ``settings.dynamic_endpointing_stale_partial_ms``.

    Returns:
        True if ``_finalize_utterance`` was called.
    """
    if not (state.in_speech and state.turn_started_at):
        return False
    now = time.monotonic()
    turn_elapsed_ms = 1000.0 * (now - state.turn_started_at)
    changed_at = state.last_partial_transcript_change_at
    stale_partial_ms = 1000.0 * (now - changed_at) if changed_at else 0.0
    hit_max_turn = turn_elapsed_ms >= settings.dynamic_endpointing_max_turn_ms
    has_stable_partial = (
        bool(state.last_partial_transcript_text.strip())
        and stale_partial_ms >= settings.dynamic_endpointing_stale_partial_ms
    )
    if not (hit_max_turn or has_stable_partial):
        return False
    reason = "max_turn" if hit_max_turn else "stale_partial"
    print(
        "VAD forced turn end "
        f"reason={reason} "
        f"turn_ms={turn_elapsed_ms:.1f} "
        f"stale_partial_ms={stale_partial_ms:.1f} "
        f"partial={state.last_partial_transcript_text!r}",
        file=sys.stderr,
    )
    await _finalize_utterance(websocket, state, runtime, voice_prompt_path)
    return True
async def _finalize_utterance(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> None:
    """Close the current user turn and hand its audio to a background ``_process_utterance`` task.

    Steps:
    * cancel in-flight partial transcription, then collect the turn audio via ``state.finish()``;
      if no audio was captured, stop there;
    * cancel the backchannel and reset the per-turn latency markers;
    * tell the client the turn ended and clear its partial transcript;
    * start processing under a fresh generation.

    Raises:
        WebSocketDisconnect: when ``input.speech_end`` cannot be sent.
    """
    sample_rate_at_finish = state.sample_rate
    await _cancel_task(runtime.partial_transcript_task)
    runtime.partial_transcript_task = None
    # Read turn metadata before finish(), which presumably resets it.
    barge_in_turn = state.barge_in_active
    partial_hint = state.last_partial_transcript_text.strip()
    captured_audio = state.finish()
    if captured_audio.size == 0:
        return
    await _cancel_task(runtime.backchannel_task)
    runtime.backchannel_task = None
    runtime.last_speech_end_at = time.monotonic()
    # Latency markers for this turn are filled in again as the reply progresses.
    for marker in (
        "last_transcript_final_at",
        "last_llm_first_sentence_at",
        "last_tts_first_chunk_ready_at",
        "last_assistant_text_at",
        "last_first_audio_at",
    ):
        setattr(runtime, marker, 0.0)
    speech_end = {"type": "input.speech_end", "samples": int(captured_audio.size)}
    if not await _send_json(websocket, runtime, speech_end):
        raise WebSocketDisconnect
    await _send_json(websocket, runtime, {"type": "transcript.partial", "text": ""})
    generation = _next_generation(runtime)
    processing = _process_utterance(
        websocket,
        state,
        runtime,
        captured_audio,
        sample_rate_at_finish,
        voice_prompt_path,
        generation,
        barge_in_turn,
        partial_hint,
    )
    runtime.utterance_task = asyncio.create_task(processing)
async def _process_utterance(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    audio: np.ndarray,
    input_sample_rate: int,
    voice_prompt_path: str | None,
    generation: int,
    was_barge_in: bool = False,
    final_transcript_hint: str = "",
) -> None:
    """Transcribe a finished user turn and start (or adopt) the assistant reply.

    Runs as ``runtime.utterance_task``. Pipeline:
    1. Resample to ``settings.sample_rate`` and trim silence.
    2. Level gate (``_should_skip_transcription``), then speaker-focus gate.
    3. STT, with a permissive fallback pass when a loud turn transcribes to nothing.
    4. Filters: hallucination, partial-transcript hint fallback, playback-echo.
    5. Send ``transcript.final``, then start the reply.

    Empty or filtered turns are answered with an empty ``transcript.final`` /
    ``assistant.done`` pair. If a speculative partial response is already answering
    exactly this transcript, it is promoted to final instead of being restarted.

    Failure handling inside this background task:
    * a lost connection (``WebSocketDisconnect``) ends the task quietly;
    * any other failure is logged with its traceback and answered, best effort, with an
      empty reply so the client is not left waiting.
    """
    try:
        runtime.last_assistant_reply = ""
        transcription_audio = resample_audio(audio, input_sample_rate, settings.sample_rate)
        keep_edge_ms = settings.short_utterance_keep_edge_ms if _is_short_utterance(transcription_audio) else 0
        transcription_audio = trim_silence(
            transcription_audio,
            settings.sample_rate,
            settings.transcription_trim_threshold,
            keep_edge_ms=keep_edge_ms,
        )
        if transcription_audio.size == 0:
            await _send_empty_reply(websocket, runtime, generation)
            return
        audio_rms = rms(transcription_audio)
        audio_peak = peak(transcription_audio)
        print(
            f"STT utterance stats rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
            file=sys.stderr,
        )
        if _should_skip_transcription(transcription_audio, audio_rms, audio_peak, was_barge_in=was_barge_in):
            await _send_empty_reply(websocket, runtime, generation)
            return
        speaker_decision = evaluate_speaker_focus(
            transcription_audio,
            settings.sample_rate,
            profile=runtime.speaker_profile or SpeakerProfile(),
            enabled=settings.speaker_focus_enabled,
            min_utterance_ms=settings.speaker_focus_min_utterance_ms,
            min_rms=settings.speaker_focus_min_rms,
            similarity_threshold=settings.speaker_focus_similarity_threshold,
            profile_alpha=settings.speaker_focus_profile_alpha,
            multi_speaker_threshold=settings.speaker_focus_multi_speaker_threshold,
            reject_mixed=settings.speaker_focus_reject_mixed,
        )
        if settings.speaker_focus_debug:
            similarity_text = (
                f"{speaker_decision.similarity:.3f}"
                if speaker_decision.similarity is not None
                else "n/a"
            )
            print(
                "SPEAKER focus "
                f"reason={speaker_decision.reason} "
                f"process={int(speaker_decision.should_process)} "
                f"enrolled={int(speaker_decision.enrolled)} "
                f"updated={int(speaker_decision.updated)} "
                f"mixed={int(speaker_decision.mixed_speaker)} "
                f"similarity={similarity_text} "
                f"profile_updates={speaker_decision.profile_updates}",
                file=sys.stderr,
            )
        if not speaker_decision.should_process:
            print(
                "SPEAKER filtered utterance "
                f"reason={speaker_decision.reason} "
                f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
                file=sys.stderr,
            )
            await _send_empty_reply(websocket, runtime, generation)
            return
        transcript_result = await pipeline.transcribe(transcription_audio)
        transcript = transcript_result.text
        runtime.last_transcript_language = transcript_result.language
        if transcript_result.language:
            probability_text = (
                f"{transcript_result.language_probability:.3f}"
                if transcript_result.language_probability is not None
                else "n/a"
            )
            print(
                "STT language "
                f"backend={transcript_result.backend or settings.stt_backend} "
                f"language={transcript_result.language} "
                f"probability={probability_text}",
                file=sys.stderr,
            )
        if not transcript.strip() and audio_rms >= settings.transcription_min_rms and audio_peak >= settings.transcription_min_peak:
            print(
                "STT final transcript empty, retrying permissive fallback "
                f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
                file=sys.stderr,
            )
            fallback_result = await pipeline.transcribe_fallback(transcription_audio)
            transcript = fallback_result.text
            if fallback_result.language:
                runtime.last_transcript_language = fallback_result.language
        if pipeline.is_likely_hallucination(transcript, audio_rms):
            print(f"STT hallucination filtered transcript={transcript!r} rms={audio_rms:.5f}", file=sys.stderr)
            transcript = ""
        if not transcript.strip() and final_transcript_hint:
            # Fall back to the last partial transcript for barge-ins or multi-word hints.
            hint_words = len(re.findall(r"\w+", final_transcript_hint))
            if was_barge_in or hint_words >= 2:
                print(
                    f"STT using partial transcript fallback hint={final_transcript_hint!r}",
                    file=sys.stderr,
                )
                transcript = final_transcript_hint
        if transcript.strip() and _looks_like_assistant_echo(runtime, transcript):
            print(f"STT playback echo filtered transcript={transcript!r}", file=sys.stderr)
            transcript = ""
        print(f"STT transcript={transcript!r}", file=sys.stderr)
        if transcript.strip():
            print(f"USER transcript={transcript.strip()}", file=sys.stderr)
            _mirror_cli_turn("You", transcript.strip())
        if not _generation_is_current(runtime, generation):
            return
        runtime.last_transcript_final_at = time.monotonic()
        if not await _send_json(websocket, runtime, {"type": "transcript.final", "text": transcript}):
            raise WebSocketDisconnect
        if not transcript.strip():
            print(
                "STT empty transcript drop "
                f"rms={audio_rms:.5f} peak={audio_peak:.5f} samples={transcription_audio.size}",
                file=sys.stderr,
            )
            await _interrupt_active_assistant(websocket, state, runtime, notify_client=True)
            if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": ""}):
                raise WebSocketDisconnect
            return
        active_transcript = runtime.active_response_transcript.strip()
        if (
            runtime.response_task is not None
            and runtime.active_response_is_partial
            and active_transcript
            and active_transcript == transcript.strip()
        ):
            # The speculative reply already answers this exact transcript: promote it to final.
            runtime.active_response_is_partial = False
            return
        interrupted_generation = await _interrupt_active_assistant(websocket, state, runtime, notify_client=True)
        if interrupted_generation is not None:
            response_generation = interrupted_generation
        else:
            if not _generation_is_current(runtime, generation):
                return
            response_generation = _next_generation(runtime)
        _start_response_task(
            websocket,
            state,
            runtime,
            transcript.strip(),
            runtime.last_transcript_language,
            voice_prompt_path,
            response_generation,
            is_partial=False,
        )
        return
    except asyncio.CancelledError:
        return
    except WebSocketDisconnect:
        # The socket is gone; presumably the receive loop in websocket_endpoint notices the
        # disconnect and cleans up the session.
        return
    except Exception as exc:
        import traceback

        print(f"STT utterance processing failed error={exc!r}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        # Best effort: unblock a client waiting for this turn's transcript/reply.
        try:
            await _send_empty_reply(websocket, runtime, generation)
        except WebSocketDisconnect:
            pass
    finally:
        current_task = asyncio.current_task()
        if runtime.utterance_task is current_task:
            runtime.utterance_task = None
async def _send_empty_reply(websocket: WebSocket, runtime: SessionRuntime, generation: int) -> None:
    """Close a turn that produced no usable speech.

    Sends an empty ``transcript.final`` followed by an empty ``assistant.done``. Nothing is
    sent if *generation* is stale.

    Raises:
        WebSocketDisconnect: when either message cannot be sent.
    """
    if not _generation_is_current(runtime, generation):
        return
    closing_messages = (
        {"type": "transcript.final", "text": ""},
        {"type": "assistant.done", "text": ""},
    )
    for message in closing_messages:
        if not await _send_json(websocket, runtime, message):
            raise WebSocketDisconnect
def _start_response_task(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    *,
    is_partial: bool,
    retry_count: int = 0,
) -> None:
    """Launch ``_respond_to_transcript`` for *transcript* as ``runtime.response_task``.

    Steps:
    * record which transcript is being answered and whether it is speculative (partial);
    * cancel any previous watchdog;
    * for final transcripts only, arm a fresh ``_response_watchdog`` carrying *retry_count*.
    """
    runtime.active_response_transcript = transcript
    runtime.active_response_is_partial = is_partial
    reply = _respond_to_transcript(
        websocket,
        state,
        runtime,
        transcript,
        transcript_language,
        voice_prompt_path,
        generation,
        is_partial=is_partial,
    )
    runtime.response_task = asyncio.create_task(reply)
    previous_watchdog = runtime.response_watchdog_task
    runtime.response_watchdog_task = None
    if previous_watchdog is not None:
        previous_watchdog.cancel()
    if is_partial:
        return
    watchdog = _response_watchdog(
        websocket,
        state,
        runtime,
        transcript,
        transcript_language,
        voice_prompt_path,
        generation,
        retry_count,
    )
    runtime.response_watchdog_task = asyncio.create_task(watchdog)
async def _respond_to_transcript(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    *,
    is_partial: bool,
) -> None:
    """Generate, stream and speak the assistant reply to *transcript*.

    Backend selection:
    * the ``my-agent-cli`` backend is delegated to ``_process_agent_cli_reply``;
    * otherwise LLM sentences are streamed from ``pipeline.stream_reply_sentences``, each
      rendered and sent with its TTS audio. If the stream yields nothing, a canned
      fallback reply is spoken.

    Conversation history:
    * final replies are appended to the history;
    * a speculative (partial) reply is appended too when ``_process_utterance`` has
      promoted it to final while it was running. Promotion clears
      ``runtime.active_response_is_partial`` while this task is still
      ``runtime.response_task``.

    Finally ``assistant.done`` is sent if *generation* is still current.

    Raises:
        WebSocketDisconnect: when ``assistant.done`` cannot be sent.
    """
    try:
        if settings.assistant_backend == "my-agent-cli":
            await _process_agent_cli_reply(
                websocket,
                state,
                runtime,
                transcript,
                transcript_language,
                voice_prompt_path,
                generation,
                is_partial=is_partial,
            )
            return
        full_reply_parts: list[str] = []
        did_send_audio = False
        async for sentence in pipeline.stream_reply_sentences(
            transcript,
            conversation_history=runtime.conversation_history,
            response_language=transcript_language,
        ):
            if not _generation_is_current(runtime, generation):
                return
            if not sentence:
                continue
            if runtime.last_llm_first_sentence_at == 0.0:
                runtime.last_llm_first_sentence_at = time.monotonic()
            display_text, tts_text = pipeline.render_assistant_text(sentence, transcript)
            if not display_text or not tts_text:
                continue
            full_reply_parts.append(display_text)
            sent_audio = await _send_reply_unit(
                websocket,
                state,
                runtime,
                generation,
                display_text,
                pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path),
            )
            did_send_audio = did_send_audio or sent_audio
        if not full_reply_parts:
            fallback_reply = "Sorry, give me a second." if transcript.strip() else "I didn't catch that."
            print("LLM reply stream empty, using fallback reply", file=sys.stderr)
            display_text, tts_text = pipeline.render_assistant_text(fallback_reply, transcript)
            full_reply_parts.append(display_text)
            sent_audio = await _send_reply_unit(
                websocket,
                state,
                runtime,
                generation,
                display_text,
                pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path),
            )
            did_send_audio = did_send_audio or sent_audio
        if did_send_audio:
            state.set_assistant_active(0.0, settings.assistant_holdoff_ms)
        full_reply = " ".join(full_reply_parts).strip()
        # A partial reply promoted to final by _process_utterance must be remembered as well.
        promoted_to_final = (
            runtime.response_task is asyncio.current_task()
            and not runtime.active_response_is_partial
        )
        if full_reply_parts and (not is_partial or promoted_to_final):
            _remember_conversation_turn(runtime, transcript, full_reply)
        if _generation_is_current(runtime, generation):
            _log_turn_latency(runtime, "assistant.done")
            if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": full_reply}):
                raise WebSocketDisconnect
    except asyncio.CancelledError:
        return
    finally:
        current_task = asyncio.current_task()
        if runtime.response_task is current_task:
            runtime.response_task = None
            runtime.active_response_transcript = ""
            runtime.active_response_is_partial = False
            if runtime.response_watchdog_task is not None:
                runtime.response_watchdog_task.cancel()
                runtime.response_watchdog_task = None
async def _send_fallback_reply(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    text: str,
    voice_prompt_path: str | None,
) -> None:
    """Speak a fixed *text* as a complete assistant reply (used by the response watchdog).

    Returns silently if rendering produces no display or TTS text. ``assistant.done`` is
    only sent while *generation* is current.

    Raises:
        WebSocketDisconnect: when ``assistant.done`` cannot be sent.
    """
    display_text, tts_text = pipeline.render_assistant_text(text, "")
    if not (display_text and tts_text):
        return
    audio = pipeline.stream_reply_audio(tts_text, "", voice_prompt_path=voice_prompt_path)
    if await _send_reply_unit(websocket, state, runtime, generation, display_text, audio):
        state.set_assistant_active(0.0, settings.assistant_holdoff_ms)
    if not _generation_is_current(runtime, generation):
        return
    _log_turn_latency(runtime, "assistant.done")
    if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": display_text}):
        raise WebSocketDisconnect
async def _response_watchdog(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    retry_count: int,
) -> None:
    """Rescue a final reply that produced no assistant text in time.

    After ``settings.assistant_response_watchdog_ms``, checks whether the reply for
    *transcript* is still running under *generation* with no assistant text sent yet.
    If so, the stalled response task is cancelled. Then:
    * on the first attempt, the reply is restarted under a new generation;
    * on a retry, a canned "Sorry, give me a second." reply is spoken instead.
    """
    try:
        await asyncio.sleep(max(settings.assistant_response_watchdog_ms, 0) / 1000.0)
        still_stalled = (
            _generation_is_current(runtime, generation)
            and runtime.last_assistant_text_at == 0.0
            and runtime.response_task is not None
            and not runtime.active_response_is_partial
            and runtime.active_response_transcript.strip() == transcript.strip()
        )
        if not still_stalled:
            return
        print(
            f"LLM response watchdog triggered transcript={transcript!r} retry_count={retry_count}",
            file=sys.stderr,
        )
        runtime.response_task.cancel()
        runtime.response_task = None
        runtime.active_response_transcript = ""
        runtime.active_response_is_partial = False
        if retry_count >= 1:
            await _send_fallback_reply(
                websocket,
                state,
                runtime,
                _next_generation(runtime),
                "Sorry, give me a second.",
                voice_prompt_path,
            )
            return
        _start_response_task(
            websocket,
            state,
            runtime,
            transcript,
            transcript_language,
            voice_prompt_path,
            _next_generation(runtime),
            is_partial=False,
            retry_count=retry_count + 1,
        )
    except asyncio.CancelledError:
        return
async def _send_reply_unit(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    display_text: str,
    audio_stream,
) -> bool:
    """Send one reply sentence: its ``assistant.text`` message, then its audio chunks.

    The first TTS chunk is awaited before the text is sent, so text and audio start
    together. Along the way the function:
    * records the first-chunk and first-text latency markers;
    * remembers *display_text* for echo detection;
    * stops as soon as *generation* is no longer current.

    The audio stream is always closed (``aclose()`` when it has one) on the way out.
    This covers early returns for stale generations, errors and cancellation, so an
    abandoned TTS generator runs its cleanup immediately.

    Args:
        audio_stream: async iterable of audio chunks (``np.ndarray``) for *display_text*.

    Returns:
        True if at least one audio chunk was sent.

    Raises:
        WebSocketDisconnect: when ``assistant.text`` cannot be sent.
    """
    iterator = audio_stream.__aiter__()
    did_send_audio = False
    try:
        first_chunk: np.ndarray | None
        try:
            first_chunk = await iterator.__anext__()
        except StopAsyncIteration:
            first_chunk = None
        else:
            if runtime.last_tts_first_chunk_ready_at == 0.0:
                runtime.last_tts_first_chunk_ready_at = time.monotonic()
        if not _generation_is_current(runtime, generation):
            return False
        if runtime.last_assistant_text_at == 0.0:
            runtime.last_assistant_text_at = time.monotonic()
        runtime.last_assistant_reply = display_text.strip()
        if not await _send_json(websocket, runtime, {"type": "assistant.text", "text": display_text}):
            raise WebSocketDisconnect
        if first_chunk is not None:
            await _send_audio_chunk(websocket, state, runtime, generation, first_chunk)
            did_send_audio = True
        async for audio_chunk in iterator:
            if not _generation_is_current(runtime, generation):
                return did_send_audio
            await _send_audio_chunk(websocket, state, runtime, generation, audio_chunk)
            did_send_audio = True
        return did_send_audio
    finally:
        close = getattr(iterator, "aclose", None)
        if close is not None:
            await close()
async def _maybe_emit_partial_transcript(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    *,
    input_sample_rate: int,
) -> None:
    """Start a background partial transcription of the in-progress turn, if one is due.

    Nothing happens when:
    * partial transcripts are disabled;
    * a previous partial transcription is still running;
    * the turn has less than ``settings.partial_transcript_min_ms`` of speech;
    * the last partial was started less than ``settings.partial_transcript_interval_ms`` ago;
    * there is no audio yet.
    """
    if not settings.partial_transcripts_enabled:
        return
    in_flight = runtime.partial_transcript_task
    if in_flight is not None and not in_flight.done():
        return
    if state.turn_speech_ms < settings.partial_transcript_min_ms:
        return
    now = time.monotonic()
    min_gap_s = max(settings.partial_transcript_interval_ms, 0) / 1000.0
    last_started = state.last_partial_transcript_at
    if last_started and now - last_started < min_gap_s:
        return
    snapshot = state.current_audio()
    if not snapshot.size:
        return
    state.last_partial_transcript_at = now
    partial_job = _emit_partial_transcript(websocket, state, runtime, snapshot, input_sample_rate)
    runtime.partial_transcript_task = asyncio.create_task(partial_job)
async def _emit_partial_transcript(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    audio: np.ndarray,
    input_sample_rate: int,
) -> None:
    """Transcribe an audio snapshot and send it as ``transcript.partial`` if the text changed.

    Runs as ``runtime.partial_transcript_task`` and clears that slot when it ends.
    Cancellation ends the task quietly.

    Raises:
        WebSocketDisconnect: when the partial transcript cannot be sent.
    """
    try:
        resampled = resample_audio(audio, input_sample_rate, settings.sample_rate)
        edge_ms = settings.short_utterance_keep_edge_ms if _is_short_utterance(resampled) else 0
        trimmed = trim_silence(
            resampled,
            settings.sample_rate,
            settings.transcription_trim_threshold,
            keep_edge_ms=edge_ms,
        )
        if trimmed.size == 0:
            return
        text = (await pipeline.transcribe_partial(trimmed)).strip()
        if text == state.last_partial_transcript_text:
            return
        state.last_partial_transcript_text = text
        state.last_partial_transcript_change_at = time.monotonic()
        if not await _send_json(websocket, runtime, {"type": "transcript.partial", "text": text}):
            raise WebSocketDisconnect
    except asyncio.CancelledError:
        return
    finally:
        if runtime.partial_transcript_task is asyncio.current_task():
            runtime.partial_transcript_task = None
async def _clear_partial_transcript(websocket: WebSocket, state: UtteranceState, runtime: SessionRuntime) -> None:
    """Stop partial transcription and blank the client's partial transcript if one is shown.

    The client is only messaged when a non-empty partial transcript was recorded. The send
    result is deliberately not checked here.
    """
    await _cancel_task(runtime.partial_transcript_task)
    runtime.partial_transcript_task = None
    if state.last_partial_transcript_text:
        state.last_partial_transcript_text = ""
        state.last_partial_transcript_change_at = 0.0
        await _send_json(websocket, runtime, {"type": "transcript.partial", "text": ""})
async def _maybe_start_partial_response(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> None:
    """Speculatively start a reply from the current partial transcript.

    A response task (``is_partial=True``) is started only when all of these hold:
    - the feature is enabled and the backend is not ``my-agent-cli``;
    - no response is running;
    - the partial is new;
    - the endpointer judges it complete, not incomplete, and not a question;
    - its word count is within the configured window;
    - the trailing silence and the time since the partial last changed both reach
      their thresholds.
    """
    if not settings.partial_response_enabled:
        return
    if settings.assistant_backend == "my-agent-cli":
        return
    transcript = state.last_partial_transcript_text.strip()
    if not transcript:
        return
    if runtime.response_task is not None:
        return
    if transcript == state.last_partial_response_text:
        return
    endpoint = _dynamic_endpoint_decision(state)
    if not endpoint.complete or endpoint.incomplete:
        return
    if endpoint.question_like or _starts_with_question_starter(transcript):
        return
    word_count = len(re.findall(r"\w+", transcript))
    if word_count > settings.partial_response_complete_max_words:
        return
    # The guard above guarantees endpoint.complete here, so the "complete" thresholds
    # always apply. The former `elif endpoint.incomplete` branches were unreachable.
    min_words = min(settings.partial_response_min_words, settings.partial_response_complete_min_words)
    if word_count < min_words:
        return
    if state.silence_ms < settings.partial_response_min_silence_ms:
        return
    if not state.last_partial_transcript_change_at:
        return
    stable_ms = min(settings.partial_response_stable_ms, settings.partial_response_complete_stable_ms)
    stable_s = max(stable_ms, 0) / 1000.0
    if time.monotonic() - state.last_partial_transcript_change_at < stable_s:
        return
    state.last_partial_response_text = transcript
    generation = _next_generation(runtime)
    _start_response_task(
        websocket,
        state,
        runtime,
        transcript,
        runtime.last_transcript_language,
        voice_prompt_path,
        generation,
        is_partial=True,
    )
async def _process_agent_cli_reply(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    transcript: str,
    transcript_language: str | None,
    voice_prompt_path: str | None,
    generation: int,
    *,
    is_partial: bool,
) -> None:
    """Relay one turn of the agent CLI event stream to the client.

    Event handling:
    - Status, tool-use, tool-result and error events are forwarded as
      ``assistant.activity`` messages (plus ``assistant.status`` when user-facing)
      and mirrored to the agent terminal.
    - Assistant text events are summarised, sent as ``assistant.text``, and voiced.
    - Streaming stops as soon as ``generation`` is superseded.

    Args:
        transcript: the user's utterance sent to the agent.
        transcript_language: not used by this function (kept for call-site parity).
        generation: response generation this reply belongs to.
        is_partial: speculative reply; such turns are not added to conversation memory.

    Raises:
        WebSocketDisconnect: when a send to the client fails.
    """
    summary_reply_parts: list[str] = []
    did_send_audio = False
    async for event in pipeline.stream_agent_cli_events(transcript, runtime.agent_session_id):
        if not _generation_is_current(runtime, generation):
            return
        kind = event.get("kind")
        if kind == "session_created":
            # Remember the agent's session id for subsequent turns.
            runtime.agent_session_id = event.get("newSessionId") or event.get("session_id") or runtime.agent_session_id
            continue
        if kind == "status":
            text = (event.get("text") or "").strip()
            if text:
                print(f"AGENT status={text}", file=sys.stderr)
                is_generic = _is_generic_agent_status(text)
                is_internal = _is_internal_agent_status(text)
                _mirror_cli_activity("Status", text)
                if not is_internal:
                    if not await _send_json(websocket, runtime, {"type": "assistant.activity", "kind": "status", "text": text}):
                        raise WebSocketDisconnect
                if not is_generic and not is_internal:
                    # Only specific statuses reach the chat and may be voiced.
                    chat_text = _status_text_for_chat(text)
                    if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}):
                        raise WebSocketDisconnect
                    await _maybe_speak_progress_update(websocket, state, runtime, text, voice_prompt_path)
            continue
        if kind == "tool_use":
            tool_name = (event.get("toolName") or "").strip()
            tool_input = event.get("toolInput") or {}
            status_text = _tool_use_status_text(tool_name, tool_input)
            print(f"AGENT tool={status_text}", file=sys.stderr)
            _mirror_cli_activity("Tool", status_text)
            if not await _send_json(
                websocket,
                runtime,
                {
                    "type": "assistant.activity",
                    "kind": "tool_use",
                    "toolName": tool_name,
                    "toolInput": tool_input,
                    "text": status_text,
                },
            ):
                raise WebSocketDisconnect
            await _maybe_speak_progress_update(websocket, state, runtime, _tool_use_voice_text(tool_name), voice_prompt_path)
            continue
        if kind == "tool_result":
            raw_content = event.get("content")
            result_text = _tool_result_summary(raw_content)
            _mirror_cli_activity("Result", result_text)
            if not await _send_json(
                websocket,
                runtime,
                {
                    "type": "assistant.activity",
                    "kind": "tool_result",
                    "toolName": (event.get("toolName") or "").strip(),
                    "text": result_text,
                    "isError": bool(event.get("isError")),
                },
            ):
                raise WebSocketDisconnect
            if event.get("isError"):
                # Tool content may be a list of parts; calling .strip() on it used
                # to raise AttributeError.
                error_text = _event_content_text(raw_content)
                if error_text:
                    print(f"AGENT error={error_text}", file=sys.stderr)
                    chat_text = _status_text_for_chat(error_text)
                    if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}):
                        raise WebSocketDisconnect
            continue
        if kind == "error":
            error_text = _event_content_text(event.get("content"))
            if error_text:
                print(f"AGENT error={error_text}", file=sys.stderr)
                _mirror_cli_activity("Error", error_text)
                if not await _send_json(
                    websocket,
                    runtime,
                    {"type": "assistant.activity", "kind": "error", "text": error_text},
                ):
                    raise WebSocketDisconnect
                chat_text = _status_text_for_chat(error_text)
                if chat_text and not await _send_json(websocket, runtime, {"type": "assistant.status", "text": chat_text}):
                    raise WebSocketDisconnect
            continue
        if kind == "text" and (event.get("role") or "") == "assistant":
            content = _event_content_text(event.get("content"))
            if not content:
                continue
            cli_text = _normalize_cli_reply_text(content)
            summary_source = _summarize_agent_reply(content)
            display_text, tts_text = pipeline.render_assistant_text(summary_source, transcript)
            if not cli_text or not display_text or not tts_text:
                continue
            normalized_reply = display_text.strip().lower()
            # Drop verbatim repeats of the previous reply.
            if normalized_reply and normalized_reply == runtime.last_assistant_reply:
                continue
            runtime.last_assistant_reply = normalized_reply
            print(f"AGENT reply={display_text}", file=sys.stderr)
            _mirror_cli_turn("Agent", cli_text)
            summary_reply_parts.append(display_text)
            if not await _send_json(
                websocket,
                runtime,
                {"type": "assistant.activity", "kind": "message", "text": display_text},
            ):
                raise WebSocketDisconnect
            if not await _send_json(websocket, runtime, {"type": "assistant.text", "text": display_text}):
                raise WebSocketDisconnect
            sent_audio = await _stream_reply_audio(
                websocket,
                state,
                runtime,
                generation,
                pipeline.stream_reply_audio(tts_text, transcript, voice_prompt_path=voice_prompt_path),
            )
            did_send_audio = did_send_audio or sent_audio
    # A superseded reply must not re-arm the assistant holdoff after an interrupt,
    # be stored in conversation memory, or announce "done". Previously only the
    # "done" message was gated.
    if not _generation_is_current(runtime, generation):
        return
    if did_send_audio:
        state.set_assistant_active(0.0, settings.assistant_holdoff_ms)
    final_text = " ".join(summary_reply_parts).strip()
    if not is_partial and summary_reply_parts:
        _remember_conversation_turn(runtime, transcript, final_text)
    if not await _send_json(websocket, runtime, {"type": "assistant.done", "text": final_text}):
        raise WebSocketDisconnect


def _event_content_text(content: object) -> str:
    """Flatten an agent event's ``content`` into stripped text.

    A string is stripped. For a list, the ``"text"`` values of dict parts and any
    string parts are stripped and joined with spaces. Anything else yields "".
    """
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        pieces = (item.get("text") if isinstance(item, dict) else item for item in content)
        return " ".join(piece.strip() for piece in pieces if isinstance(piece, str) and piece.strip())
    return ""
def _next_generation(runtime: SessionRuntime) -> int:
    """Advance the session's response generation counter and return the new value."""
    bumped = runtime.response_generation + 1
    runtime.response_generation = bumped
    return bumped
def _generation_is_current(runtime: SessionRuntime, generation: int) -> bool:
    """Return True while *generation* is still the session's latest response generation."""
    latest = runtime.response_generation
    return generation == latest
def _remember_conversation_turn(runtime: SessionRuntime, user_text: str, assistant_text: str) -> None:
    """Append a user/assistant exchange to ``runtime.conversation_history`` and trim it.

    Does nothing when conversation memory is disabled or either side is blank.
    Trimming happens in two steps; a cap of 0 or less disables that step:
    - keep at most ``conversation_memory_turns`` exchanges (two messages each);
    - then drop the oldest messages until the summed ``content`` length fits
      ``conversation_memory_max_chars``.
    """
    if not settings.conversation_memory_enabled:
        return
    user_clean = user_text.strip()
    assistant_clean = assistant_text.strip()
    if not user_clean or not assistant_clean:
        return
    history = runtime.conversation_history
    history.append({"role": "user", "content": user_clean})
    history.append({"role": "assistant", "content": assistant_clean})
    max_messages = max(settings.conversation_memory_turns, 0) * 2
    if max_messages > 0 and len(history) > max_messages:
        history[:] = history[-max_messages:]
    max_chars = max(settings.conversation_memory_max_chars, 0)
    if max_chars <= 0:
        return
    # Sum once and subtract while scanning from the oldest entry. This replaces
    # re-summing the whole history after every pop(0), which was O(n^2).
    total_chars = sum(len(item.get("content", "")) for item in history)
    drop = 0
    while drop < len(history) and total_chars > max_chars:
        total_chars -= len(history[drop].get("content", ""))
        drop += 1
    del history[:drop]
def _starts_with_question_starter(transcript: str) -> bool:
    """Return True when *transcript*, ignoring surrounding whitespace, matches QUESTION_STARTER_PATTERN at its start."""
    found = QUESTION_STARTER_PATTERN.match(transcript.strip())
    return found is not None
async def _interrupt_active_assistant(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    *,
    notify_client: bool = True,
) -> int | None:
    """Stop the assistant's current reply without touching the utterance task.

    Returns None when no response task exists and ``state.should_ignore_input()``
    is false. Otherwise it:
    - bumps the generation;
    - cancels the backchannel, watchdog and response tasks;
    - resets the agent session if a response was cancelled;
    - returns the new generation.

    The client receives ``assistant.interrupted`` only when a non-partial reply
    with a transcript was cut off and *notify_client* is true.
    """
    pending_reply = runtime.response_task
    playing = state.should_ignore_input()
    old_transcript = runtime.active_response_transcript.strip()
    old_was_partial = runtime.active_response_is_partial
    visible = bool(pending_reply is not None and not old_was_partial and old_transcript)
    if pending_reply is None and not playing:
        return None
    generation = _next_generation(runtime)
    state.interrupt_assistant()
    for slot in ("backchannel_task", "response_watchdog_task"):
        await _cancel_task(getattr(runtime, slot))
        setattr(runtime, slot, None)
    if pending_reply is not None:
        if old_transcript:
            print(
                "ASSISTANT superseded "
                f"partial={int(old_was_partial)} "
                f"transcript={old_transcript!r}",
                file=sys.stderr,
            )
        runtime.response_task = None
        pending_reply.cancel()
        runtime.agent_session_id = None
        pipeline.reset_assistant_session()
    runtime.active_response_transcript = ""
    runtime.active_response_is_partial = False
    if visible:
        _mirror_cli_interrupt()
        if notify_client and not await _send_json(websocket, runtime, {"type": "assistant.interrupted"}):
            raise WebSocketDisconnect
    return generation
async def _interrupt_response(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    *,
    notify_client: bool = True,
) -> bool:
    """Abandon both the in-flight utterance processing and the assistant reply.

    Returns False (doing nothing) when neither task exists and
    ``state.should_ignore_input()`` is false. Otherwise it:
    - bumps the generation;
    - cancels the partial-transcript, backchannel, watchdog, utterance and
      response tasks;
    - resets the agent session if a task was cancelled;
    - returns True.

    ``assistant.interrupted`` is sent (when *notify_client*) if an utterance was
    pending or a non-partial reply with a transcript was cut off.
    """
    pending_reply = runtime.response_task
    pending_utterance = runtime.utterance_task
    playing = state.should_ignore_input()
    old_transcript = runtime.active_response_transcript.strip()
    old_was_partial = runtime.active_response_is_partial
    visible = pending_utterance is not None or bool(
        pending_reply is not None and not old_was_partial and old_transcript
    )
    if pending_reply is None and pending_utterance is None and not playing:
        return False
    _next_generation(runtime)
    state.interrupt_assistant()
    for slot in ("partial_transcript_task", "backchannel_task", "response_watchdog_task"):
        await _cancel_task(getattr(runtime, slot))
        setattr(runtime, slot, None)
    if pending_utterance is not None:
        runtime.utterance_task = None
        pending_utterance.cancel()
    if pending_reply is not None:
        if old_transcript:
            print(
                "ASSISTANT superseded "
                f"partial={int(old_was_partial)} "
                f"transcript={old_transcript!r}",
                file=sys.stderr,
            )
        runtime.response_task = None
        pending_reply.cancel()
    if pending_utterance is not None or pending_reply is not None:
        runtime.agent_session_id = None
        pipeline.reset_assistant_session()
    runtime.active_response_transcript = ""
    runtime.active_response_is_partial = False
    if visible:
        _mirror_cli_interrupt()
        if notify_client and not await _send_json(websocket, runtime, {"type": "assistant.interrupted"}):
            raise WebSocketDisconnect
    return True
async def _send_json(websocket: WebSocket, runtime: SessionRuntime, payload: dict) -> bool:
    """Send *payload* as JSON while holding the session's send lock.

    Returns False instead of raising when the send fails with WebSocketDisconnect
    or RuntimeError, and True otherwise.
    """
    try:
        async with runtime.send_lock:
            await websocket.send_json(payload)
    except (WebSocketDisconnect, RuntimeError):
        return False
    else:
        return True
async def _send_bytes(websocket: WebSocket, runtime: SessionRuntime, payload: bytes) -> bool:
    """Send a binary frame while holding the session's send lock.

    Returns False instead of raising when the send fails with WebSocketDisconnect
    or RuntimeError, and True otherwise.
    """
    try:
        async with runtime.send_lock:
            await websocket.send_bytes(payload)
    except (WebSocketDisconnect, RuntimeError):
        return False
    else:
        return True
async def _stream_reply_audio(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    audio_stream,
) -> bool:
    """Forward chunks from the async *audio_stream* until it ends or *generation* is superseded.

    Returns True if at least one chunk was passed to ``_send_audio_chunk``.
    """
    forwarded = 0
    async for audio_chunk in audio_stream:
        if not _generation_is_current(runtime, generation):
            break
        await _send_audio_chunk(websocket, state, runtime, generation, audio_chunk)
        forwarded += 1
    return forwarded > 0
async def _send_audio_chunk(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    generation: int,
    audio_chunk: np.ndarray,
) -> None:
    """Stream one synthesized chunk to the client as int16 PCM frames.

    Steps:
    - Record the first-audio timestamp for latency logging.
    - Extend the assistant-active window by the chunk's duration (computed at
      24 kHz) and refresh the barge-in grace.
    - Send 2400-sample frames, stopping early if *generation* is superseded.
    - Close with ``assistant.audio_chunk_end``.

    Raises:
        WebSocketDisconnect: when a send fails.
    """
    if runtime.last_first_audio_at == 0.0:
        runtime.last_first_audio_at = time.monotonic()
    duration_ms = (len(audio_chunk) / 24000.0) * 1000.0
    state.set_assistant_active(duration_ms, settings.assistant_holdoff_ms)
    state.set_barge_grace(settings.assistant_barge_in_grace_ms)
    for frame in chunk_audio(audio_chunk, 2400):
        if not _generation_is_current(runtime, generation):
            return
        samples = (np.clip(frame, -1.0, 1.0) * 32767.0).astype(np.int16)
        if not await _send_bytes(websocket, runtime, samples.tobytes()):
            raise WebSocketDisconnect
    if not await _send_json(websocket, runtime, {"type": "assistant.audio_chunk_end"}):
        raise WebSocketDisconnect
def _log_turn_latency(runtime: SessionRuntime, marker: str) -> None:
    """Print one ``LATENCY`` line to stderr with per-stage delays since the user stopped speaking.

    Nothing is printed when no speech-end timestamp is recorded. A stage whose
    timestamp is still 0 is reported as -1.0 ms.
    """
    origin = runtime.last_speech_end_at
    if not origin:
        return

    def since_origin(stamp: float) -> float:
        # A zero timestamp means the stage has not happened yet.
        return (stamp - origin) * 1000.0 if stamp else -1.0

    fields = (
        ("speech_end_to_transcript_ms", since_origin(runtime.last_transcript_final_at)),
        ("speech_end_to_first_llm_sentence_ms", since_origin(runtime.last_llm_first_sentence_at)),
        ("speech_end_to_first_tts_chunk_ready_ms", since_origin(runtime.last_tts_first_chunk_ready_at)),
        ("speech_end_to_text_ms", since_origin(runtime.last_assistant_text_at)),
        ("speech_end_to_first_audio_ms", since_origin(runtime.last_first_audio_at)),
        ("speech_end_to_done_ms", (time.monotonic() - origin) * 1000.0),
    )
    rendered = " ".join(f"{name}={value:.1f}" for name, value in fields)
    print(f"LATENCY marker={marker} {rendered}", file=sys.stderr)
def _is_short_utterance(audio: np.ndarray) -> bool:
    """Return True when *audio*, read at ``settings.sample_rate``, lasts at most ``short_utterance_ms``."""
    length_ms = (audio.size / settings.sample_rate) * 1000.0
    is_short = length_ms <= settings.short_utterance_ms
    return is_short
def _should_skip_transcription(
    audio: np.ndarray,
    audio_rms: float,
    audio_peak: float,
    *,
    was_barge_in: bool = False,
) -> bool:
    """Decide whether a finished utterance is too quiet or too short to transcribe.

    Rules, in order:
    - Non-empty barge-in audio is always transcribed.
    - Short utterances use the ``short_utterance_*`` loudness floors; longer ones
      use the ``transcription_*`` floors.
    - Audio below ``min_utterance_ms`` is skipped.
    """
    if was_barge_in and audio.size > 0:
        return False
    is_short = _is_short_utterance(audio)
    duration_ms = (audio.size / settings.sample_rate) * 1000.0
    min_samples = int((settings.min_utterance_ms / 1000.0) * settings.sample_rate)
    if is_short:
        rms_floor, peak_floor = settings.short_utterance_min_rms, settings.short_utterance_min_peak
    else:
        rms_floor, peak_floor = settings.transcription_min_rms, settings.transcription_min_peak
    too_quiet = audio_rms < rms_floor or audio_peak < peak_floor
    if too_quiet:
        return True
    if is_short and duration_ms < settings.min_utterance_ms:
        return True
    return audio.size < min_samples and not is_short
async def _register_session(connection: SessionConnection) -> None:
    """Publish *connection* in ``ACTIVE_SESSIONS`` under its runtime's session id."""
    key = connection.runtime.session_id
    async with ACTIVE_SESSIONS_LOCK:
        ACTIVE_SESSIONS[key] = connection
async def _unregister_session(session_id: str) -> None:
    """Remove *session_id* from ``ACTIVE_SESSIONS``; unknown ids are ignored."""
    async with ACTIVE_SESSIONS_LOCK:
        if session_id in ACTIVE_SESSIONS:
            del ACTIVE_SESSIONS[session_id]
async def _resolve_target_sessions(session_id: str | None) -> list[SessionConnection]:
    """Return the sessions a notification should go to.

    With an id, returns that session if it is registered. Without one, returns
    the sole registered session, or nothing when none or several are active.
    """
    async with ACTIVE_SESSIONS_LOCK:
        if not session_id:
            return list(ACTIVE_SESSIONS.values()) if len(ACTIVE_SESSIONS) == 1 else []
        found = ACTIVE_SESSIONS.get(session_id)
        return [] if found is None else [found]
async def _queue_notification(connection: SessionConnection, notification: AssistantNotification) -> None:
    """Show *notification* to the client and, if it should be spoken, queue it for the voice worker.

    Nothing is queued when the text message could not be delivered.
    """
    runtime = connection.runtime
    delivered = await _send_json(
        connection.websocket,
        runtime,
        {"type": notification.event_type, "text": notification.text},
    )
    if delivered and notification.speak and runtime.notification_queue is not None:
        await runtime.notification_queue.put(notification)
async def _notification_worker(connection: SessionConnection) -> None:
    """Speak queued notifications one at a time until cancelled.

    Blank notifications are dropped. So is any notification for which no quiet
    window opens within ``notification_max_wait_ms``.
    """
    runtime = connection.runtime
    queue = runtime.notification_queue
    if queue is None:
        return
    try:
        while True:
            item = await queue.get()
            if item.text.strip() and await _wait_for_notification_window(connection.state, runtime):
                await _speak_side_channel(
                    connection.websocket,
                    connection.state,
                    runtime,
                    item.text,
                    connection.voice_prompt_path,
                    event_type=item.event_type,
                )
    except asyncio.CancelledError:
        return
async def _wait_for_notification_window(state: UtteranceState, runtime: SessionRuntime) -> bool:
    """Poll every 100 ms until the session is idle; return False if ``notification_max_wait_ms`` elapses first.

    The session counts as idle when there is no response or utterance task, the
    user is not speaking, and ``state.should_ignore_input()`` is false.
    """
    budget_s = max(settings.notification_max_wait_ms, 0) / 1000.0
    deadline = time.monotonic() + budget_s
    while time.monotonic() < deadline:
        busy = (
            runtime.response_task is not None
            or runtime.utterance_task is not None
            or state.in_speech
            or state.should_ignore_input()
        )
        if not busy:
            return True
        await asyncio.sleep(0.1)
    return False
async def _maybe_trigger_backchannel(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    voice_prompt_path: str | None,
) -> None:
    """Start a short "uh-huh"-style backchannel task while the user is talking, if due.

    Preconditions (all must hold):
    - the feature is enabled;
    - no response or utterance task is running;
    - input is not being ignored;
    - the previous backchannel's audio has finished and its task is done;
    - the current turn meets the min-speech, stability and min-interval thresholds.
    """
    if not settings.backchannel_enabled:
        return
    busy = runtime.response_task is not None or runtime.utterance_task is not None or state.should_ignore_input()
    if busy:
        return
    if time.monotonic() < runtime.backchannel_audio_until:
        return
    previous = runtime.backchannel_task
    if previous is not None and not previous.done():
        return
    now = time.monotonic()
    not_due = (
        state.turn_speech_ms < settings.backchannel_min_speech_ms
        or state.active_speech_ms < settings.backchannel_stable_ms
        or (now - state.last_backchannel_at) * 1000.0 < settings.backchannel_min_interval_ms
    )
    if not_due:
        return
    phrase = _choose_backchannel_text(state)
    state.last_backchannel_at = now
    state.backchannel_count += 1
    runtime.backchannel_task = asyncio.create_task(
        _speak_side_channel(
            websocket,
            state,
            runtime,
            phrase,
            voice_prompt_path,
            event_type="assistant.backchannel",
        )
    )
def _choose_backchannel_text(state: UtteranceState) -> str:
    """Pick a backchannel phrase suited to the partial transcript, avoiding recent repeats.

    Pool selection: the empathetic pool for problem/stress words, the
    question-ish pool for questions, and otherwise an early or late neutral pool
    depending on how many backchannels were already given. The choice is
    appended to ``state.recent_backchannels``, which is capped at
    ``backchannel_recent_limit`` (minimum 1).
    """
    heard = state.last_partial_transcript_text.strip().lower()
    if re.search(r"\b(frustrat|annoy|upset|worried|stress|issue|problem|hard|difficult|stuck)\w*", heard):
        pool = ("I hear you", "yeah", "right", "okay")
    elif "?" in heard or re.search(r"\b(why|how|what|when|where|who|can you|do you|should i)\b", heard):
        pool = ("right", "okay", "got it", "go on")
    elif state.backchannel_count == 0:
        pool = ("uh-huh", "yeah", "right", "go on")
    else:
        pool = ("I hear you", "okay", "got it", "all right")
    window = max(settings.backchannel_recent_limit, 1)
    recently_used = set(state.recent_backchannels[-window:])
    fresh = [phrase for phrase in pool if phrase not in recently_used]
    pick = random.choice(fresh or list(pool))
    history = state.recent_backchannels
    history.append(pick)
    if len(history) > window:
        del history[:-window]
    return pick
def _dynamic_endpoint_decision(state: UtteranceState) -> EndpointDecision:
    """Choose the end-of-turn silence threshold from the latest partial transcript.

    Behaviour:
    - Starts from ``settings.vad_stop_ms``; when dynamic endpointing is disabled
      that value is returned unchanged.
    - With no partial transcript, short or medium turns get a discount.
    - Otherwise the threshold grows when the transcript looks unfinished
      (dangling function words, fillers, trailing commas or an ellipsis, open
      clauses) and shrinks when it looks complete (terminal punctuation or a
      question).
    - Except in the disabled case, the result is clamped to
      ``[dynamic_endpointing_min_ms, dynamic_endpointing_max_ms]``.

    Returns:
        EndpointDecision with ``stop_ms`` and the ``complete`` / ``incomplete`` /
        ``question_like`` flags (all False when disabled or without a transcript).
    """
    base_stop_ms = float(settings.vad_stop_ms)
    if not settings.dynamic_endpointing_enabled:
        return EndpointDecision(
            stop_ms=base_stop_ms,
            complete=False,
            incomplete=False,
            question_like=False,
        )
    transcript = state.last_partial_transcript_text.strip()
    if not transcript:
        stop_ms = base_stop_ms
        if state.turn_speech_ms <= 900:
            stop_ms -= settings.dynamic_endpointing_no_partial_short_discount_ms
        elif state.turn_speech_ms <= 1800:
            stop_ms -= settings.dynamic_endpointing_no_partial_medium_discount_ms
        stop_ms = max(settings.dynamic_endpointing_min_ms, stop_ms)
        stop_ms = min(settings.dynamic_endpointing_max_ms, stop_ms)
        return EndpointDecision(
            stop_ms=float(stop_ms),
            complete=False,
            incomplete=False,
            question_like=False,
        )
    lowered = transcript.lower()
    tokens = re.findall(r"[a-z']+", lowered)
    word_count = len(tokens)
    last_word = tokens[-1] if tokens else ""
    first_word = tokens[0] if tokens else ""
    last_two_words = " ".join(tokens[-2:])
    incomplete_tail_words = {
        "a", "an", "the", "and", "or", "but", "so", "because", "if", "when", "while",
        "that", "which", "who", "whose", "whom", "to", "for", "from", "with", "of",
        "in", "on", "at", "by", "into", "about", "like", "as", "than", "then",
        "is", "are", "was", "were", "am", "be", "been", "being", "do", "does", "did",
        "have", "has", "had", "can", "could", "should", "would", "will", "just",
        "my", "your", "our", "their", "this", "these", "those", "it", "i'm", "you're",
        "we're", "they're", "he's", "she's",
    }
    # Two-word dangling tails. These used to sit in the single-word set above,
    # where they could never match because last_word is always one token.
    incomplete_tail_phrases = {"then i", "then we"}
    filler_tail_words = {"um", "uh", "er", "ah", "hmm", "mm", "like"}
    continuation_phrases = (
        "i think", "i mean", "for example", "so if", "and then", "because if",
        "what i", "what if", "the thing is", "it feels like",
    )
    question_starters = (
        "what", "why", "how", "when", "where", "who", "which", "can", "could",
        "would", "should", "do", "does", "did", "is", "are", "am", "will",
    )
    # "..." means the speaker trailed off. It also matches the terminal-punctuation
    # regex, which previously made the ellipsis "incomplete" check unreachable and
    # classed trailing-off speech as complete.
    ends_with_ellipsis = re.search(r"\.\.\.\s*$", transcript) is not None
    ends_with_terminal_punctuation = not ends_with_ellipsis and bool(re.search(r"[.!?]['\"]?\s*$", transcript))
    ends_with_commaish_pause = bool(re.search(r"[,;:]\s*$", transcript))
    starts_with_continuation = lowered.startswith(continuation_phrases)
    open_clause_pattern = re.search(
        r"\b("
        r"trying to|want to|wanted to|need to|needed to|going to|about to|"
        r"tell me how|tell me what|tell me why|show me how|"
        r"how to|what to|where to|why the|why it|"
        r"open the|open a|fix the|fix a"
        r")\b(?:\s+\w+){0,3}\s*$",
        lowered,
    )
    request_for_instructions = re.search(
        r"\b(can|could|would|will)\s+you\s+tell\s+me\s+how\s+to\b(?:\s+\w+){0,4}\s*$",
        lowered,
    )
    question_like = transcript.endswith("?") or (word_count >= 4 and first_word in question_starters)
    # The partial arrived recently relative to the VAD stop threshold.
    has_recent_partial = state.silence_ms < max(settings.vad_stop_ms, 1) * 0.8
    incomplete = not ends_with_terminal_punctuation and (
        last_word in incomplete_tail_words
        or last_two_words in incomplete_tail_phrases
        or last_word in filler_tail_words
        or ends_with_commaish_pause
        or ends_with_ellipsis
        or starts_with_continuation
        or open_clause_pattern is not None
        or request_for_instructions is not None
        or re.search(r"\b(and|or|but|so|because|if|when|while|then)\s+(the|a|an|my|your|our|their|it|this|that)\s*$", lowered) is not None
        or re.search(r"\b(i|we|they|he|she)\s+(was|were|am|are|is|have|had|would|could|should|will)\s*$", lowered) is not None
        or (has_recent_partial and word_count < 12)
    )
    complete = (
        word_count >= 3
        and not incomplete
        and (ends_with_terminal_punctuation or question_like)
    )
    stop_ms = base_stop_ms
    if word_count <= 2 and state.turn_speech_ms < 900:
        stop_ms += settings.dynamic_endpointing_short_utterance_bias_ms
    if incomplete:
        stop_ms += settings.dynamic_endpointing_incomplete_bias_ms
    if complete:
        stop_ms -= settings.dynamic_endpointing_complete_discount_ms
    if question_like and complete:
        stop_ms -= settings.dynamic_endpointing_question_discount_ms
    stop_ms = max(settings.dynamic_endpointing_min_ms, stop_ms)
    stop_ms = min(settings.dynamic_endpointing_max_ms, stop_ms)
    return EndpointDecision(
        stop_ms=float(stop_ms),
        complete=complete,
        incomplete=incomplete,
        question_like=question_like,
    )
async def _maybe_speak_progress_update(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    text: str,
    voice_prompt_path: str | None,
) -> None:
    """Voice *text* as an ``assistant.status`` update, rate-limited by ``agent_progress_speak_min_interval_ms``.

    Blank text is ignored. Updates arriving inside the interval are dropped.
    """
    if not text.strip():
        return
    now = time.monotonic()
    elapsed_ms = (now - runtime.last_progress_voice_at) * 1000.0
    if elapsed_ms < settings.agent_progress_speak_min_interval_ms:
        return
    runtime.last_progress_voice_at = now
    await _speak_side_channel(
        websocket,
        state,
        runtime,
        text,
        voice_prompt_path,
        event_type="assistant.status",
    )
def _tool_use_status_text(tool_name: str, tool_input: object) -> str:
    """Describe a tool call as ``"<tool>: <preview>"`` for activity displays.

    The preview is the first non-blank string among a fixed list of common
    argument keys, truncated to 72 characters. Without a preview the result is
    the tool name, or ``"working"`` if that is empty.
    """
    fallback = tool_name or "working"
    if not isinstance(tool_input, dict):
        return fallback
    keys = ("path", "command", "url", "pattern", "name_pattern", "query", "tool_name")
    candidates = (tool_input.get(key) for key in keys)
    preview = next((value.strip() for value in candidates if isinstance(value, str) and value.strip()), "")
    if not preview:
        return fallback
    if len(preview) > 72:
        preview = f"{preview[:69]}..."
    return f"{tool_name}: {preview}"
def _status_text_for_chat(text: str) -> str:
    """Return a chat-ready status line (at most 120 chars), or "" for empty or internal statuses."""
    normalized = _normalize_cli_reply_text(text)
    if not normalized or _is_internal_agent_status(normalized):
        return ""
    return normalized if len(normalized) <= 120 else f"{normalized[:117].rstrip()}..."
def _is_generic_agent_status(text: str) -> bool:
    """Return True for boilerplate agent statuses that carry no task-specific information.

    Matching ignores case, surrounding whitespace and trailing dots.
    """
    generic_statuses = frozenset(
        {
            "working on it",
            "thinking",
            "building a plan",
            "starting multi-step work",
            "complex task detected; switching to orchestrate mode",
            "simple task detected; using tools",
        }
    )
    key = text.strip().lower().rstrip(".")
    return key in generic_statuses
def _is_internal_agent_status(text: str) -> bool:
    """Return True for the agent's internal memory-injection notices, which users should not see."""
    lowered = text.strip().lower()
    # "context from memory" also covers the longer
    # "(injected) relevant context from memory" variants.
    return "context from memory" in lowered or lowered == "from memory"
def _normalize_cli_reply_text(text: str) -> str:
    """Clean agent CLI output for display.

    Steps: remove ANSI escapes, normalise line endings to ``\\n``, squeeze 3+
    newlines to one blank line, and strip the result.
    """
    cleaned = ANSI_ESCAPE_PATTERN.sub("", text)
    # Convert CRLF first. Replacing bare "\r" alone turned every Windows line
    # ending into "\n\n", i.e. a spurious blank line. Lone CRs still become newlines.
    cleaned = cleaned.replace("\r\n", "\n").replace("\r", "\n")
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
def _summarize_agent_reply(text: str) -> str:
    """Condense an agent's markdown reply into at most two short speakable sentences.

    Behaviour:
    - Drops code fences, tables, raw-looking lines, lines under three words and
      internal memory notices; unwraps links, inline code and emphasis.
    - Falls back to ``_fallback_agent_reply_summary`` when no line survives.
    - Scrubs paths and similar artifacts and caps the result at 150 characters.

    Returns "" for empty input and "I found it." when scrubbing leaves nothing.
    """
    normalized = _normalize_cli_reply_text(text)
    if not normalized:
        return ""
    for pattern, replacement in (
        (FENCED_CODE_PATTERN, " "),
        (MARKDOWN_LINK_PATTERN, r"\1"),
        (INLINE_CODE_PATTERN, r"\1"),
    ):
        normalized = pattern.sub(replacement, normalized)

    def speakable(raw_line: str) -> str:
        # Return the line with markdown removed, or "" when it should not be spoken.
        line = raw_line.strip()
        if not line or TABLE_ROW_PATTERN.match(line) or _is_internal_agent_status(line):
            return ""
        line = re.sub(r"^#{1,6}\s*", "", line)
        line = LIST_PREFIX_PATTERN.sub("", line)
        line = re.sub(r"^\s*>\s*", "", line)
        line = re.sub(r"\*\*(.*?)\*\*", r"\1", line)
        line = re.sub(r"__(.*?)__", r"\1", line)
        line = re.sub(r"(?<!\*)\*([^*]+)\*(?!\*)", r"\1", line)
        line = NON_SPOKEN_CHARS_PATTERN.sub(" ", line)
        line = re.sub(r"\s+", " ", line).strip()
        if not line or RAWISH_LINE_PATTERN.search(line) or len(re.findall(r"\w+", line)) < 3:
            return ""
        return line

    kept = [line for line in map(speakable, normalized.splitlines()) if line]
    flattened = re.sub(r"\s+", " ", " ".join(kept)).strip() or _fallback_agent_reply_summary(normalized)
    if not flattened:
        return ""
    chosen: list[str] = []
    used = 0
    for fragment in re.findall(r"[^.!?]+[.!?]?", flattened):
        sentence = fragment.strip()
        if not sentence:
            continue
        candidate_len = used + len(sentence) + (1 if chosen else 0)
        if chosen and candidate_len > 150:
            break
        chosen.append(sentence)
        used = candidate_len
        if len(chosen) == 2:
            break
    summary = _scrub_code_artifacts(" ".join(chosen).strip() or flattened)
    if not summary:
        return "I found it."
    return summary if len(summary) <= 150 else f"{summary[:147].rstrip()}..."
def _fallback_agent_reply_summary(text: str) -> str:
    """Return the first speakable sentence or line of *text*, or "I found it." if none qualifies.

    A candidate qualifies when it is non-empty, not an internal status, not raw
    looking, and has at least three words.
    """
    for candidate in re.findall(r"[^.!?\n]+[.!?]?", text):
        sentence = re.sub(r"\s+", " ", candidate).strip()
        if (
            sentence
            and not _is_internal_agent_status(sentence)
            and not RAWISH_LINE_PATTERN.search(sentence)
            and len(re.findall(r"\w+", sentence)) >= 3
        ):
            return sentence
    return "I found it."
def _scrub_code_artifacts(text: str) -> str:
    """Remove code-ish fragments from text meant to be spoken.

    Removes file paths, shell variables and ``key=value`` pairs, then non-spoken
    characters, and finally tidies whitespace around punctuation.
    """
    artifact_patterns = (
        r"\b\w+/\w+[-\w./]*\.\w{1,5}\b",  # relative paths
        r"/[-\w./]{3,}",  # absolute paths
        r"\$\w+",  # shell vars
        r"\b\w+=\S+",  # key=value
    )
    result = text
    for pattern in artifact_patterns:
        result = re.sub(pattern, "", result)
    result = NON_SPOKEN_CHARS_PATTERN.sub(" ", result)
    result = re.sub(r"\s+", " ", result).strip()
    return re.sub(r"\s+([,.:;!?])", r"\1", result)
def _mirror_cli_turn(speaker: str, text: str) -> None:
    """Echo a conversation turn to stdout and, for non-user speakers, to the agent terminal."""
    body = text.strip()
    if not body:
        return
    print(f"{speaker}: {body}", flush=True)
    if speaker.lower() != "you":
        _mirror_to_agent_terminal(_format_cli_turn(speaker, body))
# Rate limit for activity lines mirrored to the agent terminal: at most one line
# per _MIRROR_ACTIVITY_MIN_INTERVAL_S seconds. Suppressed lines are counted and
# reported as "(+N more)" with the next line that gets through.
_last_mirror_activity_at: float = 0.0
_MIRROR_ACTIVITY_MIN_INTERVAL_S: float = 0.4
_mirror_activity_skipped: int = 0


def _mirror_cli_activity(label: str, text: str) -> None:
    """Mirror one agent activity line to the agent terminal, subject to the rate limit above."""
    global _last_mirror_activity_at, _mirror_activity_skipped
    body = _normalize_cli_reply_text(text)
    if not body:
        return
    now = time.monotonic()
    if now - _last_mirror_activity_at < _MIRROR_ACTIVITY_MIN_INTERVAL_S:
        _mirror_activity_skipped += 1
        return
    dropped, _mirror_activity_skipped = _mirror_activity_skipped, 0
    _last_mirror_activity_at = now
    notice = f" \x1b[90m(+{dropped} more)\x1b[0m\n" if dropped > 0 else ""
    _mirror_to_agent_terminal(notice + _format_cli_activity(label, body))
def _mirror_cli_interrupt() -> None:
    """Write a dim "interrupted" divider to the agent terminal."""
    divider = "\n\x1b[90m── interrupted ──\x1b[0m\n"
    _mirror_to_agent_terminal(divider)
def _format_cli_turn(speaker: str, text: str) -> str:
    """Format a turn for the terminal: user turns get a green prompt marker, others are plain."""
    if speaker.lower() != "you":
        return f"{text}\n"
    return f"\n\x1b[32;1m›\x1b[0m \x1b[32m{text}\x1b[0m\n"
def _format_cli_activity(label: str, text: str) -> str:
    """Render one agent activity line for the mirrored terminal.

    Only the first line of *text* is shown, followed by " ..." when more lines
    were dropped, and the result is capped at 120 characters. Formatting:
    - the memory-injection notice and generic statuses are printed plain;
    - tool, result and error lines get a coloured ANSI tag;
    - anything else gets a dim ellipsis.
    """
    first_line, *rest = text.splitlines() or [text]
    summary = first_line.strip()
    if rest:
        summary = f"{summary} ..."
    summary = summary if len(summary) <= 120 else f"{summary[:117].rstrip()}..."
    lowered = label.lower()
    if summary == "💭 Injected relevant context from memory":
        return f"{summary}\n"
    # Reuse the shared generic-status list instead of a private copy that can drift.
    if lowered == "status" and _is_generic_agent_status(summary):
        return f" {summary}\n"
    if lowered == "tool":
        return f" \x1b[34mtool\x1b[0m {summary}\n"
    if lowered == "result":
        return f" \x1b[90mresult\x1b[0m {summary}\n"
    if lowered == "error":
        return f" \x1b[31merror\x1b[0m {summary}\n"
    return f" \x1b[90m…\x1b[0m {summary}\n"
def _mirror_to_agent_terminal(text: str) -> None:
    """Append *text*, newline-terminated, to the active agent terminal, if one is known.

    This is best effort: OS errors while opening or writing the TTY are ignored.
    """
    target = _resolve_agent_terminal_tty()
    if not target:
        return
    line = text if text.endswith("\n") else f"{text}\n"
    try:
        with open(target, "a", encoding="utf-8", errors="ignore") as tty:
            tty.write(line)
            tty.flush()
    except OSError:
        return
def _resolve_agent_terminal_tty() -> str | None:
    """Read the agent's active TTY path from its marker file.

    The marker file is the configured path, or ``<my_agent_cwd>/.active-terminal-tty``.
    Returns None unless the path names ``/dev/tty`` or a ``/dev/pts/*`` device that
    currently exists.
    """
    configured = (settings.my_agent_active_tty_file or "").strip()
    marker_path = Path(configured) if configured else Path(settings.my_agent_cwd) / ".active-terminal-tty"
    try:
        tty_path = marker_path.read_text(encoding="utf-8").strip()
    except OSError:
        return None
    looks_like_terminal = tty_path.startswith("/dev/pts/") or tty_path == "/dev/tty"
    if not looks_like_terminal or not os.path.exists(tty_path):
        return None
    return tty_path
def _tool_use_voice_text(tool_name: str) -> str:
    """Map a tool name to a short spoken progress phrase, using the first keyword rule that matches."""
    lowered = tool_name.lower()
    rules = (
        (("browser", "web"), "I'm checking that."),
        (("file", "path"), "I'm looking through the files."),
        (("command", "shell"), "I'm working on it in the terminal."),
        (("search",), "I'm looking into that."),
    )
    for keywords, phrase in rules:
        if any(keyword in lowered for keyword in keywords):
            return phrase
    return "I'm working on it."
def _tool_result_summary(content: object) -> str:
    """Summarise a tool result as its first non-raw sentence or line (at most 100 chars).

    *content* may be a string, or a list whose items are strings or dicts with a
    ``"text"`` entry. Fenced code is dropped. Returns ``"done"`` when nothing
    suitable remains.
    """
    if isinstance(content, str):
        text = content.strip()
    elif isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if isinstance(item, dict):
                text_value = item.get("text")
                if isinstance(text_value, str) and text_value.strip():
                    parts.append(text_value.strip())
            elif isinstance(item, str) and item.strip():
                parts.append(item.strip())
        text = " ".join(parts).strip()
    else:
        text = ""
    if not text:
        return "done"
    text = FENCED_CODE_PATTERN.sub(" ", text)
    # Split on sentence ends *and* newlines before collapsing whitespace.
    # Collapsing first erased every newline, which made the line split dead and
    # let raw-looking lines merge into the surrounding prose.
    for segment in re.split(r"(?<=[.!?])\s+|\n+", text):
        normalized = re.sub(r"\s+", " ", segment).strip()
        if not normalized or RAWISH_LINE_PATTERN.search(normalized):
            continue
        return normalized if len(normalized) <= 100 else f"{normalized[:97].rstrip()}..."
    return "done"
async def _speak_side_channel(
    websocket: WebSocket,
    state: UtteranceState,
    runtime: SessionRuntime,
    text: str,
    voice_prompt_path: str | None,
    *,
    event_type: str,
) -> None:
    """Synthesize and stream a short out-of-band utterance (backchannel, status, notification).

    Backchannels:
    - their text is sent to the client first (``event_type`` message);
    - their audio is only streamed if the user was speaking when this call started;
    - they set ``runtime.backchannel_audio_until``.

    All kinds stream int16 PCM in 2400-sample frames, with ``assistant.audio_chunk_end``
    after each synthesized chunk.

    A client disconnect ends the call quietly. Cancellation is propagated: the old
    ``except CancelledError: return`` let a cancelled caller keep running. For
    example, the notification worker looped back to ``queue.get()`` and made
    ``_cancel_task`` on it wait forever.
    """
    is_backchannel = event_type == "assistant.backchannel"
    try:
        # Snapshot before synthesis: backchannel audio is wanted only if the user
        # was speaking when we started.
        should_send_backchannel_audio = not is_backchannel or state.in_speech
        display_text, tts_text = pipeline.render_assistant_text(text, "")
        if not display_text or not tts_text:
            return
        if is_backchannel:
            if not await _send_json(websocket, runtime, {"type": event_type, "text": display_text}):
                raise WebSocketDisconnect
            audio_chunks = await pipeline.synthesize_backchannel(tts_text, voice_prompt_path=voice_prompt_path)
            if not should_send_backchannel_audio:
                return
            # Duration at 24 kHz (as elsewhere in this module) plus 150 ms of slack.
            total_duration_s = sum(len(chunk) / 24000.0 for chunk in audio_chunks)
            runtime.backchannel_audio_until = time.monotonic() + total_duration_s + 0.15
        else:
            audio_chunks = await pipeline.synthesize_sentences(tts_text, voice_prompt_path=voice_prompt_path)
        for audio_chunk in audio_chunks:
            for chunk in chunk_audio(audio_chunk, 2400):
                pcm16 = (np.clip(chunk, -1.0, 1.0) * 32767.0).astype(np.int16)
                if not await _send_bytes(websocket, runtime, pcm16.tobytes()):
                    raise WebSocketDisconnect
            if not await _send_json(websocket, runtime, {"type": "assistant.audio_chunk_end"}):
                raise WebSocketDisconnect
    except WebSocketDisconnect:
        return
async def _cancel_task(task: asyncio.Task[None] | None) -> None:
    """Cancel *task* and wait until it has finished.

    Behaviour:
    - ``None`` is a no-op.
    - If the task ended with an exception other than cancellation, that
      exception is re-raised here (unchanged from before).
    - If the *caller* is cancelled while waiting, the cancellation now propagates.
      The old ``try: await task / except CancelledError: pass`` swallowed it.
    """
    if task is None:
        return
    task.cancel()
    if task is asyncio.current_task():
        # A task cannot wait for itself; the cancellation lands at its next await.
        return
    # asyncio.wait never raises the task's own outcome, so a CancelledError
    # escaping this await can only be the caller's own cancellation.
    await asyncio.wait({task})
    if not task.cancelled():
        task.result()