""" Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback HF Spaces notes: - DO NOT run uvicorn here - This app is mounted at /twilio in app.py so routes here must be relative: POST /voice -> /twilio/voice WS /stream -> /twilio/stream """ import asyncio import base64 import json import logging import os import tempfile import time import audioop import subprocess from dataclasses import dataclass, field from typing import Optional, List, Dict from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.responses import PlainTextResponse, Response from fastapi.middleware.cors import CORSMiddleware from vosk import Model, KaldiRecognizer from openai import OpenAI # ---------------------------- # Logging # ---------------------------- logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("app") def P(tag: str, msg: str): print(f"{tag} {msg}", flush=True) # ---------------------------- # Env # ---------------------------- VOSK_MODEL_PATH = os.getenv("VOSK_MODEL_PATH", "/app/models/vosk-model-en-us-0.22-lgraph").strip() TWILIO_STREAM_URL = os.getenv("TWILIO_STREAM_URL", "").strip() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip() OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini").strip() PIPER_BIN = os.getenv("PIPER_BIN", "piper").strip() PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "").strip() # ---------------------------- # FastAPI (sub-app) # ---------------------------- app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ---------------------------- # Audio / Twilio # ---------------------------- FRAME_MS = 20 INPUT_RATE = 8000 STT_RATE = 16000 BYTES_PER_20MS_MULAW = int(INPUT_RATE * (FRAME_MS / 1000.0)) # 160 bytes @ 8kHz, 20ms # ---------------------------- # VAD settings # ---------------------------- RMS_SPEECH_THRESHOLD = 450 
SPEECH_START_FRAMES = 3
SPEECH_END_SILENCE_FRAMES = 40  # 800ms
MAX_UTTERANCE_MS = 12000
PARTIAL_EMIT_EVERY_MS = 250

# Snapshot of the most recent call's state, served by GET /debug/last.
LAST_STATE = {
    "connected": False,
    "status": "idle",  # idle | connected | listening | thinking | speaking
    "last_partial": "",
    "last_stt": "",
    "last_llm": "",
    "updated_ms": 0,
}

SYSTEM_PROMPT = (
    "You are a phone-call assistant. "
    "Reply in 1 short sentence (max 15 words). "
    "No filler. No greetings unless user greets first."
)

# Lazily-loaded Vosk model, shared across calls (loading is expensive).
_VOSK_MODEL = None


def now_ms() -> int:
    """Current wall-clock time in integer milliseconds."""
    return int(time.time() * 1000)


def build_twiml(stream_url: str) -> str:
    """Build the TwiML document instructing Twilio to open a bidirectional
    Media Stream websocket to `stream_url`.

    BUGFIX: this previously returned an empty string, so the /voice webhook
    served no TwiML and Twilio dropped the call immediately.
    """
    return (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        "<Connect>"
        f'<Stream url="{stream_url}"/>'
        "</Connect>"
        "</Response>"
    )


def split_mulaw_frames(mulaw_bytes: bytes) -> List[bytes]:
    """Split raw mu-law audio into 20 ms frames (160 bytes @ 8 kHz).

    The final short frame is padded with 0xFF (mu-law silence) so every
    frame sent to Twilio has the exact expected size.
    """
    frames = []
    for i in range(0, len(mulaw_bytes), BYTES_PER_20MS_MULAW):
        chunk = mulaw_bytes[i:i + BYTES_PER_20MS_MULAW]
        if len(chunk) < BYTES_PER_20MS_MULAW:
            chunk += b"\xFF" * (BYTES_PER_20MS_MULAW - len(chunk))
        frames.append(chunk)
    return frames


async def drain_queue(q: asyncio.Queue):
    """Discard everything currently in `q` (used on barge-in / re-speak)."""
    try:
        while True:
            q.get_nowait()
            q.task_done()
    except asyncio.QueueEmpty:
        return


# ----------------------------
# OpenAI
# ----------------------------
def openai_client() -> OpenAI:
    """Return an OpenAI client.

    Raises:
        RuntimeError: if OPENAI_API_KEY is not configured.
    """
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY not set")
    return OpenAI(api_key=OPENAI_API_KEY)


def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
    """Blocking chat completion (run via an executor from async code).

    Sends the system prompt, the last few conversation turns, and the new
    user message; returns the stripped reply text.
    """
    client = openai_client()
    msgs = [{"role": "system", "content": SYSTEM_PROMPT}]
    # BUGFIX: exclude any stored system entry from the tail so the system
    # prompt is not sent twice when the history is still short.
    tail = [m for m in history[-6:] if m.get("role") != "system"]
    msgs.extend(tail)
    msgs.append({"role": "user", "content": user_text})
    resp = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=msgs,
        temperature=0.3,
        max_tokens=80,
    )
    return (resp.choices[0].message.content or "").strip()


# ----------------------------
# Piper TTS -> 8k mulaw
# ----------------------------
def piper_tts_to_mulaw(text: str) -> bytes:
    """Synthesize `text` with Piper, then transcode to 8 kHz mono mu-law
    (Twilio's media format) via ffmpeg.

    Returns raw mu-law bytes (empty for blank input).

    Raises:
        RuntimeError: if PIPER_MODEL_PATH is unset or either subprocess fails.
    """
    if not PIPER_MODEL_PATH:
        raise RuntimeError("PIPER_MODEL_PATH not set")
    text = (text or "").strip()
    if not text:
        return b""
    # delete=False: paths are handed to subprocesses, then removed in `finally`.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wavf:
        wav_path = wavf.name
    with tempfile.NamedTemporaryFile(suffix=".mulaw", delete=False) as mlf:
        mulaw_path = mlf.name
    try:
        r1 = subprocess.run(
            [PIPER_BIN, "--model", PIPER_MODEL_PATH, "--output_file", wav_path],
            input=text.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if r1.returncode != 0:
            raise RuntimeError(f"piper rc={r1.returncode} stderr={r1.stderr.decode('utf-8','ignore')[:500]}")
        # clarity filter for phone audio
        af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
        r2 = subprocess.run(
            ["ffmpeg", "-y", "-i", wav_path, "-ac", "1", "-ar", "8000", "-af", af, "-f", "mulaw", mulaw_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if r2.returncode != 0:
            raise RuntimeError(f"ffmpeg rc={r2.returncode} stderr={r2.stderr.decode('utf-8','ignore')[:500]}")
        with open(mulaw_path, "rb") as f:
            return f.read()
    finally:
        # Best-effort cleanup of both temp files.
        for p in (wav_path, mulaw_path):
            try:
                os.unlink(p)
            except Exception:
                pass


# ----------------------------
# Call state
# ----------------------------
@dataclass
class CancelFlag:
    """One-shot cancellation flag shared between tasks."""
    is_set: bool = False

    def set(self):
        self.is_set = True


@dataclass
class CallState:
    """Per-call mutable state; one instance per websocket connection."""
    call_id: str
    stream_sid: str = ""
    # --- VAD state ---
    in_speech: bool = False
    speech_start_count: int = 0
    silence_count: int = 0
    utter_start_ms: int = 0
    # --- STT state ---
    rec: Optional[KaldiRecognizer] = None
    last_partial: str = ""
    last_partial_emit_ms: int = 0
    # --- Outbound audio ---
    outbound_q: asyncio.Queue = field(default_factory=lambda: asyncio.Queue(maxsize=50000))
    outbound_task: Optional[asyncio.Task] = None
    keepalive_task: Optional[asyncio.Task] = None
    mark_i: int = 0
    bot_speaking: bool = False
    cancel_llm: CancelFlag = field(default_factory=CancelFlag)
    # Incremented on every new TTS request; stale generations stop emitting.
    tts_generation_id: int = 0
    history: List[Dict] = field(default_factory=list)
    # Serializes answer_and_speak so replies never interleave.
    bot_lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    def bump_tts_generation(self) -> int:
        """Invalidate any in-flight TTS and return the new generation id."""
        self.tts_generation_id += 1
        return self.tts_generation_id


async def twilio_keepalive(ws: WebSocket, st: CallState):
    """Send a Twilio `mark` event every 10 s so the stream is not idled out."""
    try:
        while True:
            await asyncio.sleep(10)
            if st.stream_sid:
                st.mark_i += 1
                name = f"ka_{st.mark_i}"
                await ws.send_text(json.dumps({
                    "event": "mark",
                    "streamSid": st.stream_sid,
                    "mark": {"name": name},
                }))
                P("TWILIO>", f"keepalive_mark={name}")
    except asyncio.CancelledError:
        return
    except Exception as e:
        P("SYS>", f"keepalive_error={e}")


# ----------------------------
# Twilio Voice Webhook (FIXES 405)
# Accept POST + GET + trailing slash
# ----------------------------
@app.post("/voice")
@app.post("/voice/")
@app.get("/voice")
@app.get("/voice/")
async def voice(request: Request):
    """Twilio voice webhook: reply with TwiML that opens the media stream."""
    stream_url = TWILIO_STREAM_URL
    # Auto-build if not set
    if not stream_url:
        host = request.headers.get("host")
        if host:
            # mounted at /twilio, so final ws is /twilio/stream
            stream_url = f"wss://{host}/twilio/stream"
            P("SYS>", f"auto_stream_url={stream_url}")
    if not stream_url:
        return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
    return Response(content=build_twiml(stream_url), media_type="application/xml")


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"ok": True}


# ----------------------------
# WebSocket /stream (mounted => /twilio/stream)
# ----------------------------
@app.websocket("/stream")
async def stream(ws: WebSocket):
    """Twilio Media Streams websocket: mu-law caller audio in, TTS audio out."""
    await ws.accept()
    LAST_STATE["connected"] = True
    LAST_STATE["status"] = "connected"
    LAST_STATE["updated_ms"] = now_ms()

    st = CallState(call_id=str(id(ws)))
    st.history = [{"role": "system", "content": SYSTEM_PROMPT}]
    P("SYS>", f"ws_open call_id={st.call_id}")

    # Load the Vosk model once and reuse it for every later call.
    global _VOSK_MODEL
    if _VOSK_MODEL is None:
        P("SYS>", f"loading_vosk={VOSK_MODEL_PATH}")
        _VOSK_MODEL = Model(VOSK_MODEL_PATH)
        P("SYS>", "vosk_loaded")

    st.rec = KaldiRecognizer(_VOSK_MODEL, STT_RATE)
    st.rec.SetWords(False)
    st.outbound_task = asyncio.create_task(outbound_sender(ws, st))

    try:
        while True:
            raw = await ws.receive_text()
            msg = json.loads(raw)
            event = msg.get("event")
            if event == "start":
                st.stream_sid = msg["start"]["streamSid"]
                P("TWILIO>", f"start streamSid={st.stream_sid}")
                if st.keepalive_task is None:
                    st.keepalive_task = asyncio.create_task(twilio_keepalive(ws, st))
                # Fire-and-forget greeting.
                asyncio.create_task(speak_text(ws, st, "Hi! How can I help?"))
            elif event == "media":
                mulaw = base64.b64decode(msg["media"]["payload"])
                pcm16_8k = audioop.ulaw2lin(mulaw, 2)
                # Vosk wants 16 kHz PCM; upsample from Twilio's 8 kHz.
                pcm16_16k, _ = audioop.ratecv(pcm16_8k, 2, 1, INPUT_RATE, STT_RATE, None)
                rms = audioop.rms(pcm16_16k, 2)
                is_speech = rms >= RMS_SPEECH_THRESHOLD
                # Caller talking while bot speaks -> barge in (stop playback).
                if st.bot_speaking and is_speech:
                    await barge_in(ws, st)
                await vad_and_stt(ws, st, pcm16_16k, is_speech)
            elif event == "stop":
                P("TWILIO>", "stop")
                break
    except WebSocketDisconnect:
        P("SYS>", "ws_disconnect")
    except Exception as e:
        P("SYS>", f"ws_error={e}")
        log.exception("ws_error")
    finally:
        if st.keepalive_task:
            st.keepalive_task.cancel()
        if st.outbound_task:
            st.outbound_task.cancel()
        LAST_STATE["connected"] = False
        LAST_STATE["status"] = "idle"
        LAST_STATE["last_partial"] = ""
        LAST_STATE["updated_ms"] = now_ms()
        P("SYS>", "ws_closed")


# ----------------------------
# VAD + STT
# ----------------------------
async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech: bool):
    """Frame-level VAD plus incremental Vosk STT.

    Opens an utterance after SPEECH_START_FRAMES voiced frames, emits
    throttled partials, and finalizes on sustained silence or on hitting
    MAX_UTTERANCE_MS.
    """
    t = now_ms()

    if not st.in_speech:
        if is_speech:
            st.speech_start_count += 1
            if st.speech_start_count >= SPEECH_START_FRAMES:
                st.in_speech = True
                st.silence_count = 0
                st.utter_start_ms = t
                st.speech_start_count = 0
                st.last_partial = ""
                st.last_partial_emit_ms = 0
                # Fresh recognizer per utterance.
                st.rec = KaldiRecognizer(_VOSK_MODEL, STT_RATE)
                st.rec.SetWords(False)
        else:
            st.speech_start_count = 0
        return

    st.rec.AcceptWaveform(pcm16_16k)

    # Throttle partial-result polling to every PARTIAL_EMIT_EVERY_MS.
    if t - st.last_partial_emit_ms >= PARTIAL_EMIT_EVERY_MS:
        st.last_partial_emit_ms = t
        try:
            pj = json.loads(st.rec.PartialResult() or "{}")
            partial = (pj.get("partial") or "").strip()
        except Exception:
            partial = ""
        if partial and partial != st.last_partial:
            st.last_partial = partial
            P("STT_PART>", partial)
            LAST_STATE["status"] = "listening"
            LAST_STATE["last_partial"] = partial
            LAST_STATE["updated_ms"] = now_ms()

    if (t - st.utter_start_ms) > MAX_UTTERANCE_MS:
        await finalize_utterance(ws, st, "max_utterance")
        return

    if is_speech:
        st.silence_count = 0
        return
    st.silence_count += 1
    if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
        await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")


async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
    """Close the current utterance, pull final STT text, and launch the
    answer pipeline (serialized by st.bot_lock so replies never overlap)."""
    if not st.in_speech:
        return
    st.in_speech = False
    st.silence_count = 0
    st.speech_start_count = 0
    st.last_partial = ""
    try:
        j = json.loads(st.rec.FinalResult() or "{}")
    except Exception:
        j = {}
    user_text = (j.get("text") or "").strip()
    if not user_text:
        return
    P("STT_FINAL>", f"{user_text} ({reason})")
    LAST_STATE["status"] = "thinking"
    LAST_STATE["last_stt"] = user_text
    LAST_STATE["last_partial"] = ""
    LAST_STATE["updated_ms"] = now_ms()

    async def bot_job():
        async with st.bot_lock:
            await answer_and_speak(ws, st, user_text)

    asyncio.create_task(bot_job())


def _trim_history(history: List[Dict]) -> List[Dict]:
    """Keep the leading system entry plus at most the last 8 turns.

    BUGFIX: the original `history[:1] + history[-8:]` duplicated the system
    entry (and early turns) whenever len(history) <= 9; slicing the tail
    from history[1:] keeps the two parts disjoint.
    """
    return history[:1] + history[1:][-8:]


async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
    """Ask OpenAI for a reply (in a worker thread) and speak it via Piper."""
    st.history.append({"role": "user", "content": user_text})
    st.history = _trim_history(st.history)

    loop = asyncio.get_running_loop()
    ans = await loop.run_in_executor(None, openai_answer_blocking, st.history, user_text)
    ans = (ans or "").strip() or "Sorry, I didn’t catch that."
    P("LLM_ANS>", ans)
    LAST_STATE["last_llm"] = ans
    LAST_STATE["status"] = "speaking"
    LAST_STATE["updated_ms"] = now_ms()

    st.history.append({"role": "assistant", "content": ans})
    st.history = _trim_history(st.history)
    await speak_text(ws, st, ans)


async def _stop_playback(ws: WebSocket, st: CallState):
    """Tell Twilio to drop its buffered audio and empty our outbound queue."""
    if st.stream_sid:
        try:
            await ws.send_text(json.dumps({"event": "clear", "streamSid": st.stream_sid}))
            P("TWILIO>", "sent_clear")
        except Exception:
            # Best-effort: the socket may already be closing.
            pass
    await drain_queue(st.outbound_q)


async def barge_in(ws: WebSocket, st: CallState):
    """Caller started talking over the bot: invalidate TTS and stop playback."""
    st.bump_tts_generation()
    await _stop_playback(ws, st)
    st.bot_speaking = False


async def speak_text(ws: WebSocket, st: CallState, text: str):
    """Replace any current playback with a fresh TTS rendering of `text`."""
    gen = st.bump_tts_generation()
    await _stop_playback(ws, st)
    await tts_enqueue(st, text, gen)


async def tts_enqueue(st: CallState, text: str, gen: int):
    """Run Piper in a worker thread, then enqueue base64 20 ms mu-law frames
    for the outbound sender. Aborts silently if `gen` has been superseded."""
    st.bot_speaking = True
    P("TTS>", f"text={text} gen={gen}")
    loop = asyncio.get_running_loop()
    try:
        mulaw_bytes = await loop.run_in_executor(None, piper_tts_to_mulaw, text)
    except Exception as e:
        P("TTS_ERR>", str(e))
        st.bot_speaking = False
        return
    if gen != st.tts_generation_id:
        return
    for fr in split_mulaw_frames(mulaw_bytes):
        if gen != st.tts_generation_id:
            return
        await st.outbound_q.put(base64.b64encode(fr).decode("ascii"))
    # Sentinel so the sender knows this utterance's audio is complete.
    await st.outbound_q.put("__END_CHUNK__")


async def outbound_sender(ws: WebSocket, st: CallState):
    """Drain the outbound queue at real-time pace: one frame per 20 ms."""
    try:
        while True:
            item = await st.outbound_q.get()
            if item == "__END_CHUNK__":
                await asyncio.sleep(0.02)
                # Only mark idle if nothing new was queued meanwhile.
                if st.outbound_q.empty():
                    st.bot_speaking = False
                    LAST_STATE["status"] = "connected"
                    LAST_STATE["updated_ms"] = now_ms()
                st.outbound_q.task_done()
                continue
            if not st.stream_sid:
                # No stream yet: drop audio rather than send unaddressed media.
                st.outbound_q.task_done()
                continue
            await ws.send_text(json.dumps({
                "event": "media",
                "streamSid": st.stream_sid,
                "media": {"payload": item},
            }))
            st.outbound_q.task_done()
            await asyncio.sleep(FRAME_MS / 1000.0)
    except asyncio.CancelledError:
        return
    except Exception as e:
        P("SYS>", f"outbound_sender_error={e}")
        log.exception("outbound_sender_error")


@app.get("/debug/last")
async def debug_last():
    """Expose LAST_STATE for dashboards/debugging."""
    return LAST_STATE