# (Hugging Face Spaces page banner "Spaces: Running on Zero" — extraction residue, not code.)
| """ | |
| Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback | |
| HF Spaces notes: | |
| - DO NOT run uvicorn here | |
| - This app is mounted at /twilio in app.py | |
| so routes here must be relative: | |
| POST /voice -> /twilio/voice | |
| WS /stream -> /twilio/stream | |
| """ | |
# --- stdlib ---
import asyncio
import audioop  # NOTE(review): removed from the stdlib in Python 3.13; pin <3.13 or vendor audioop-lts
import base64
import json
import logging
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# --- third-party ---
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import PlainTextResponse, Response
from openai import OpenAI
from vosk import KaldiRecognizer, Model
# ----------------------------
# Logging
# ----------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("app")


def P(tag: str, msg: str) -> None:
    """Print a tagged line, flushed so it appears immediately in Space logs."""
    print(f"{tag} {msg}", flush=True)
# ----------------------------
# Env
# ----------------------------
def _env(name: str, default: str = "") -> str:
    """Read an environment variable and strip surrounding whitespace."""
    return os.getenv(name, default).strip()


VOSK_MODEL_PATH = _env("VOSK_MODEL_PATH", "/app/models/vosk-model-en-us-0.22-lgraph")
TWILIO_STREAM_URL = _env("TWILIO_STREAM_URL")
OPENAI_API_KEY = _env("OPENAI_API_KEY")
OPENAI_MODEL = _env("OPENAI_MODEL", "gpt-4o-mini")
PIPER_BIN = _env("PIPER_BIN", "piper")
PIPER_MODEL_PATH = _env("PIPER_MODEL_PATH")
# ----------------------------
# FastAPI (sub-app)
# ----------------------------
app = FastAPI()

# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# rejected by the CORS spec (the wildcard origin cannot be credentialed);
# Starlette will echo the request origin instead — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ----------------------------
# Audio / Twilio
# ----------------------------
FRAME_MS = 20      # Twilio media frames are 20 ms long
INPUT_RATE = 8000  # Twilio sends 8 kHz mu-law
STT_RATE = 16000   # rate fed to the Vosk recognizer
BYTES_PER_20MS_MULAW = int(INPUT_RATE * (FRAME_MS / 1000.0))  # 160 bytes @ 8kHz, 20ms

# ----------------------------
# VAD settings
# ----------------------------
RMS_SPEECH_THRESHOLD = 450       # RMS gate: frames at/above this count as speech
SPEECH_START_FRAMES = 3          # consecutive speech frames needed to open an utterance
SPEECH_END_SILENCE_FRAMES = 40   # 800ms
MAX_UTTERANCE_MS = 12000         # hard cap on one utterance
PARTIAL_EMIT_EVERY_MS = 250      # throttle for partial-transcript logging
# Snapshot of the most recent call activity; read by the debug endpoint.
LAST_STATE = {
    "connected": False,
    "status": "idle",  # idle | connected | listening | thinking | speaking
    "last_partial": "",
    "last_stt": "",
    "last_llm": "",
    "updated_ms": 0,
}

# Keep replies terse: this text is spoken over a live phone call.
SYSTEM_PROMPT = (
    "You are a phone-call assistant. "
    "Reply in 1 short sentence (max 15 words). "
    "No filler. No greetings unless user greets first."
)
# Vosk model is large; load lazily once per process and share across calls.
_VOSK_MODEL = None


def now_ms() -> int:
    """Wall-clock time as integer milliseconds since the epoch."""
    return int(time.time() * 1000)
def build_twiml(stream_url: str) -> str:
    """TwiML that connects the call's audio to our WebSocket stream.

    The long <Pause> keeps the call leg alive while the bidirectional
    media stream does the actual talking.
    """
    return f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="{stream_url}" />
  </Connect>
  <Pause length="600"/>
</Response>
"""
def split_mulaw_frames(mulaw_bytes: bytes) -> List[bytes]:
    """Chop mu-law audio into fixed 20 ms frames.

    The final short frame, if any, is padded to full length with 0xFF
    (mu-law silence) so every emitted frame is exactly one Twilio frame.
    """
    size = BYTES_PER_20MS_MULAW
    out: List[bytes] = []
    for start in range(0, len(mulaw_bytes), size):
        frame = mulaw_bytes[start:start + size]
        if len(frame) < size:
            frame = frame.ljust(size, b"\xFF")
        out.append(frame)
    return out
| async def drain_queue(q: asyncio.Queue): | |
| try: | |
| while True: | |
| q.get_nowait() | |
| q.task_done() | |
| except asyncio.QueueEmpty: | |
| return | |
# ----------------------------
# OpenAI
# ----------------------------
def openai_client() -> OpenAI:
    """Build an OpenAI client, failing fast when no API key is configured."""
    if OPENAI_API_KEY:
        return OpenAI(api_key=OPENAI_API_KEY)
    raise RuntimeError("OPENAI_API_KEY not set")
def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
    """Blocking chat completion: system prompt + recent turns + new user text.

    Args:
        history: full conversation; index 0 is the system message (see stream()).
        user_text: latest transcribed user utterance.

    Returns:
        The assistant's reply text, stripped (may be empty).
    """
    client = openai_client()
    msgs = [{"role": "system", "content": SYSTEM_PROMPT}]
    # Take only the last few *non-system* turns. The original `history[-6:]`
    # could re-include history[0] (the system prompt) while the conversation
    # was still short, sending the system message twice.
    msgs.extend(history[1:][-6:])
    msgs.append({"role": "user", "content": user_text})
    resp = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=msgs,
        temperature=0.3,
        max_tokens=80,
    )
    return (resp.choices[0].message.content or "").strip()
# ----------------------------
# Piper TTS -> 8k mulaw
# ----------------------------
def piper_tts_to_mulaw(text: str) -> bytes:
    """Synthesize *text* with Piper, then transcode to 8 kHz mono mu-law.

    Returns b"" for blank input. Raises RuntimeError when PIPER_MODEL_PATH is
    unset or when either subprocess exits non-zero. Temp files are removed in
    all cases.
    """
    if not PIPER_MODEL_PATH:
        raise RuntimeError("PIPER_MODEL_PATH not set")
    text = (text or "").strip()
    if not text:
        return b""
    # Allocate the temp file names up front; both are cleaned in finally.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wavf:
        wav_path = wavf.name
    with tempfile.NamedTemporaryFile(suffix=".mulaw", delete=False) as mlf:
        mulaw_path = mlf.name
    try:
        piper_proc = subprocess.run(
            [PIPER_BIN, "--model", PIPER_MODEL_PATH, "--output_file", wav_path],
            input=text.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if piper_proc.returncode != 0:
            raise RuntimeError(
                f"piper rc={piper_proc.returncode} stderr={piper_proc.stderr.decode('utf-8','ignore')[:500]}"
            )
        # clarity filter for phone audio
        af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
        ff_proc = subprocess.run(
            ["ffmpeg", "-y", "-i", wav_path,
             "-ac", "1", "-ar", "8000",
             "-af", af,
             "-f", "mulaw", mulaw_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if ff_proc.returncode != 0:
            raise RuntimeError(
                f"ffmpeg rc={ff_proc.returncode} stderr={ff_proc.stderr.decode('utf-8','ignore')[:500]}"
            )
        with open(mulaw_path, "rb") as f:
            return f.read()
    finally:
        for tmp in (wav_path, mulaw_path):
            try:
                os.unlink(tmp)
            except Exception:
                pass
# ----------------------------
# Call state
# ----------------------------
@dataclass
class CancelFlag:
    """One-way latch used to abandon in-flight LLM/TTS work.

    NOTE(review): restored the @dataclass decorator, which appears to have
    been lost — `dataclass` is imported at the top of the file and CallState
    builds these via field(default_factory=CancelFlag).
    """

    # Starts False; set() flips it True and nothing resets it.
    is_set: bool = False

    def set(self):
        self.is_set = True
@dataclass
class CallState:
    """Mutable per-call state for one Twilio media-stream connection.

    NOTE(review): restored the @dataclass decorator. Without it the
    `field(default_factory=...)` defaults are bare Field objects and the
    keyword construction `CallState(call_id=...)` in stream() fails;
    `dataclass` was already imported but unused, so it was clearly lost.
    """

    call_id: str
    stream_sid: str = ""            # Twilio streamSid, set on the "start" event
    # --- VAD state ---
    in_speech: bool = False         # currently inside an utterance?
    speech_start_count: int = 0     # consecutive speech frames while idle
    silence_count: int = 0          # consecutive silence frames while talking
    utter_start_ms: int = 0
    # --- STT ---
    rec: Optional[KaldiRecognizer] = None  # fresh recognizer per utterance
    last_partial: str = ""
    last_partial_emit_ms: int = 0
    # --- Outbound audio ---
    outbound_q: asyncio.Queue = field(default_factory=lambda: asyncio.Queue(maxsize=50000))
    outbound_task: Optional[asyncio.Task] = None
    keepalive_task: Optional[asyncio.Task] = None
    mark_i: int = 0                 # counter for keepalive mark names
    bot_speaking: bool = False
    # --- LLM / TTS coordination ---
    cancel_llm: CancelFlag = field(default_factory=CancelFlag)
    tts_generation_id: int = 0      # bumping this invalidates in-flight TTS
    history: List[Dict] = field(default_factory=list)
    bot_lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    def bump_tts_generation(self) -> int:
        """Advance (and return) the generation counter, cancelling stale TTS."""
        self.tts_generation_id += 1
        return self.tts_generation_id
async def twilio_keepalive(ws: WebSocket, st: CallState):
    """Every 10 s, send a Twilio 'mark' message so the media stream stays active."""
    try:
        while True:
            await asyncio.sleep(10)
            if not st.stream_sid:
                continue
            st.mark_i += 1
            name = f"ka_{st.mark_i}"
            payload = {
                "event": "mark",
                "streamSid": st.stream_sid,
                "mark": {"name": name},
            }
            await ws.send_text(json.dumps(payload))
            P("TWILIO>", f"keepalive_mark={name}")
    except asyncio.CancelledError:
        return
    except Exception as e:
        P("SYS>", f"keepalive_error={e}")
# ----------------------------
# Twilio Voice Webhook (FIXES 405)
# Accept POST + GET + trailing slash
# ----------------------------
@app.api_route("/voice", methods=["GET", "POST"])
@app.api_route("/voice/", methods=["GET", "POST"])
async def voice(request: Request):
    """Twilio voice webhook: reply with TwiML that streams the call to our WS.

    NOTE(review): the route decorators were missing although the comment above
    says this endpoint must accept POST + GET + trailing slash (the 405 fix);
    restored per that comment. Mounted at /twilio, so the full path is
    /twilio/voice.
    """
    stream_url = TWILIO_STREAM_URL
    # Auto-build if not set
    if not stream_url:
        host = request.headers.get("host")
        if host:
            # mounted at /twilio, so final ws is /twilio/stream
            stream_url = f"wss://{host}/twilio/stream"
            P("SYS>", f"auto_stream_url={stream_url}")
    if not stream_url:
        return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
    return Response(content=build_twiml(stream_url), media_type="application/xml")
@app.get("/health")
async def health():
    """Liveness probe.

    NOTE(review): route decorators were lost throughout this file; '/health'
    is assumed for this one — confirm against app.py / the caller.
    """
    return {"ok": True}
# ----------------------------
# WebSocket /stream (mounted => /twilio/stream)
# ----------------------------
@app.websocket("/stream")
async def stream(ws: WebSocket):
    """Twilio Media Streams endpoint: receive caller audio, run VAD+STT, reply.

    NOTE(review): the @app.websocket decorator was missing; restored per the
    module docstring ("WS /stream -> /twilio/stream").
    """
    await ws.accept()
    LAST_STATE["connected"] = True
    LAST_STATE["status"] = "connected"
    LAST_STATE["updated_ms"] = now_ms()
    st = CallState(call_id=str(id(ws)))
    st.history = [{"role": "system", "content": SYSTEM_PROMPT}]
    P("SYS>", f"ws_open call_id={st.call_id}")
    # The Vosk model is heavy: load once per process, reuse across calls.
    global _VOSK_MODEL
    if _VOSK_MODEL is None:
        P("SYS>", f"loading_vosk={VOSK_MODEL_PATH}")
        _VOSK_MODEL = Model(VOSK_MODEL_PATH)
        P("SYS>", "vosk_loaded")
    st.rec = KaldiRecognizer(_VOSK_MODEL, STT_RATE)
    st.rec.SetWords(False)
    st.outbound_task = asyncio.create_task(outbound_sender(ws, st))
    try:
        while True:
            raw = await ws.receive_text()
            msg = json.loads(raw)
            event = msg.get("event")
            if event == "start":
                st.stream_sid = msg["start"]["streamSid"]
                P("TWILIO>", f"start streamSid={st.stream_sid}")
                if st.keepalive_task is None:
                    st.keepalive_task = asyncio.create_task(twilio_keepalive(ws, st))
                # Greet in a background task so we keep consuming media frames.
                asyncio.create_task(speak_text(ws, st, "Hi! How can I help?"))
            elif event == "media":
                mulaw = base64.b64decode(msg["media"]["payload"])
                # Twilio sends 8 kHz mu-law; Vosk wants 16 kHz PCM16.
                pcm16_8k = audioop.ulaw2lin(mulaw, 2)
                pcm16_16k, _ = audioop.ratecv(pcm16_8k, 2, 1, INPUT_RATE, STT_RATE, None)
                rms = audioop.rms(pcm16_16k, 2)
                is_speech = rms >= RMS_SPEECH_THRESHOLD
                # Caller talked over the bot: stop playback immediately.
                if st.bot_speaking and is_speech:
                    await barge_in(ws, st)
                await vad_and_stt(ws, st, pcm16_16k, is_speech)
            elif event == "stop":
                P("TWILIO>", "stop")
                break
    except WebSocketDisconnect:
        P("SYS>", "ws_disconnect")
    except Exception as e:
        P("SYS>", f"ws_error={e}")
        log.exception("ws_error")
    finally:
        if st.keepalive_task:
            st.keepalive_task.cancel()
        if st.outbound_task:
            st.outbound_task.cancel()
        LAST_STATE["connected"] = False
        LAST_STATE["status"] = "idle"
        LAST_STATE["last_partial"] = ""
        LAST_STATE["updated_ms"] = now_ms()
        P("SYS>", "ws_closed")
# ----------------------------
# VAD + STT
# ----------------------------
async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech: bool):
    # Per-frame VAD state machine plus incremental Vosk recognition.
    # pcm16_16k: one 20 ms frame resampled to 16 kHz PCM16.
    # is_speech: precomputed RMS gate result for this frame.
    t = now_ms()
    if not st.in_speech:
        # Idle: require SPEECH_START_FRAMES consecutive speech frames before
        # opening an utterance (debounces clicks and short noise bursts).
        if is_speech:
            st.speech_start_count += 1
            if st.speech_start_count >= SPEECH_START_FRAMES:
                st.in_speech = True
                st.silence_count = 0
                st.utter_start_ms = t
                st.speech_start_count = 0
                st.last_partial = ""
                st.last_partial_emit_ms = 0
                # Fresh recognizer per utterance so results don't bleed over.
                st.rec = KaldiRecognizer(_VOSK_MODEL, STT_RATE)
                st.rec.SetWords(False)
        else:
            st.speech_start_count = 0
        return
    # In an utterance: feed audio and throttle partial-result polling.
    st.rec.AcceptWaveform(pcm16_16k)
    if t - st.last_partial_emit_ms >= PARTIAL_EMIT_EVERY_MS:
        st.last_partial_emit_ms = t
        try:
            pj = json.loads(st.rec.PartialResult() or "{}")
            partial = (pj.get("partial") or "").strip()
        except Exception:
            partial = ""
        if partial and partial != st.last_partial:
            st.last_partial = partial
            P("STT_PART>", partial)
            LAST_STATE["status"] = "listening"
            LAST_STATE["last_partial"] = partial
            LAST_STATE["updated_ms"] = now_ms()
    # Hard cap on utterance length.
    if (t - st.utter_start_ms) > MAX_UTTERANCE_MS:
        await finalize_utterance(ws, st, "max_utterance")
        return
    if is_speech:
        st.silence_count = 0
        return
    # Trailing silence: close after SPEECH_END_SILENCE_FRAMES silent frames.
    st.silence_count += 1
    if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
        await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")
async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
    # Close the current utterance, pull the final Vosk transcript, and start
    # the answer pipeline in the background. *reason* is for logging only.
    if not st.in_speech:
        return
    # Reset VAD state before any await so later frames start a new utterance.
    st.in_speech = False
    st.silence_count = 0
    st.speech_start_count = 0
    st.last_partial = ""
    try:
        j = json.loads(st.rec.FinalResult() or "{}")
    except Exception:
        j = {}
    user_text = (j.get("text") or "").strip()
    if not user_text:
        # Nothing recognized (noise-only utterance): stay silent.
        return
    P("STT_FINAL>", f"{user_text} ({reason})")
    LAST_STATE["status"] = "thinking"
    LAST_STATE["last_stt"] = user_text
    LAST_STATE["last_partial"] = ""
    LAST_STATE["updated_ms"] = now_ms()
    async def bot_job():
        # Serialize bot turns: only one answer/speak pipeline runs at a time.
        async with st.bot_lock:
            await answer_and_speak(ws, st, user_text)
    asyncio.create_task(bot_job())
async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
    """Get an LLM reply to *user_text*, record it, and speak it to the caller."""
    st.history.append({"role": "user", "content": user_text})
    # Keep the system message plus the last 8 non-system turns. The original
    # `history[:1] + history[-8:]` duplicated the system message whenever the
    # history was still shorter than 9 entries.
    st.history = st.history[:1] + st.history[1:][-8:]
    loop = asyncio.get_running_loop()

    def worker():
        # Blocking OpenAI call runs in the default thread pool executor.
        return openai_answer_blocking(st.history, user_text)

    ans = await loop.run_in_executor(None, worker)
    ans = (ans or "").strip() or "Sorry, I didn’t catch that."
    P("LLM_ANS>", ans)
    LAST_STATE["last_llm"] = ans
    LAST_STATE["status"] = "speaking"
    LAST_STATE["updated_ms"] = now_ms()
    st.history.append({"role": "assistant", "content": ans})
    st.history = st.history[:1] + st.history[1:][-8:]
    await speak_text(ws, st, ans)
async def barge_in(ws: WebSocket, st: CallState):
    """Caller interrupted the bot: invalidate TTS, flush Twilio's buffer, drop queued audio."""
    st.bump_tts_generation()  # makes any in-flight tts_enqueue bail out
    if st.stream_sid:
        try:
            clear_msg = json.dumps({"event": "clear", "streamSid": st.stream_sid})
            await ws.send_text(clear_msg)
            P("TWILIO>", "sent_clear")
        except Exception:
            pass
    await drain_queue(st.outbound_q)
    st.bot_speaking = False
async def speak_text(ws: WebSocket, st: CallState, text: str):
    """Cut off any current playback, then synthesize and enqueue *text*."""
    gen = st.bump_tts_generation()
    if st.stream_sid:
        try:
            clear_msg = json.dumps({"event": "clear", "streamSid": st.stream_sid})
            await ws.send_text(clear_msg)
            P("TWILIO>", "sent_clear")
        except Exception:
            pass
    await drain_queue(st.outbound_q)
    await tts_enqueue(st, text, gen)
async def tts_enqueue(st: CallState, text: str, gen: int):
    """Run Piper off the event loop, then queue 20 ms frames for playback.

    Bails out silently if generation *gen* has been superseded (barge-in or a
    newer utterance), both before and during frame enqueueing.
    """
    st.bot_speaking = True
    P("TTS>", f"text={text} gen={gen}")
    loop = asyncio.get_running_loop()
    try:
        mulaw_bytes = await loop.run_in_executor(None, piper_tts_to_mulaw, text)
    except Exception as e:
        P("TTS_ERR>", str(e))
        st.bot_speaking = False
        return
    if gen != st.tts_generation_id:
        return
    for frame in split_mulaw_frames(mulaw_bytes):
        if gen != st.tts_generation_id:
            return
        encoded = base64.b64encode(frame).decode("ascii")
        await st.outbound_q.put(encoded)
    await st.outbound_q.put("__END_CHUNK__")
async def outbound_sender(ws: WebSocket, st: CallState):
    # Drain the outbound queue to Twilio, paced at one frame per FRAME_MS so
    # audio is delivered in real time. "__END_CHUNK__" marks the end of one
    # TTS synthesis (see tts_enqueue).
    try:
        while True:
            item = await st.outbound_q.get()
            if item == "__END_CHUNK__":
                await asyncio.sleep(0.02)
                # Nothing queued behind the marker: the bot is done talking.
                if st.outbound_q.empty():
                    st.bot_speaking = False
                    LAST_STATE["status"] = "connected"
                    LAST_STATE["updated_ms"] = now_ms()
                st.outbound_q.task_done()
                continue
            if not st.stream_sid:
                # Twilio "start" not seen yet, so no streamSid: drop the frame.
                st.outbound_q.task_done()
                continue
            await ws.send_text(json.dumps({
                "event": "media",
                "streamSid": st.stream_sid,
                "media": {"payload": item},
            }))
            st.outbound_q.task_done()
            # Real-time pacing: one 20 ms frame every 20 ms.
            await asyncio.sleep(FRAME_MS / 1000.0)
    except asyncio.CancelledError:
        return
    except Exception as e:
        P("SYS>", f"outbound_sender_error={e}")
        log.exception("outbound_sender_error")
@app.get("/debug")
async def debug_last():
    """Expose LAST_STATE for a polling debug UI.

    NOTE(review): route decorators were lost throughout this file; '/debug'
    is assumed for this one — confirm against the frontend that polls it.
    """
    return LAST_STATE