# Source: neuralvoiceGPU / pipe_method3.py (Hugging Face Space by ashishkblink)
# Last change: commit 5a2afab (verified) — "Update pipe_method3.py"
"""
Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback
HF Spaces notes:
- DO NOT run uvicorn here
- This app is mounted at /twilio in app.py
so routes here must be relative:
POST /voice -> /twilio/voice
WS /stream -> /twilio/stream
"""
import asyncio
import base64
import json
import logging
import os
import tempfile
import time
import audioop
import subprocess
from dataclasses import dataclass, field
from typing import Optional, List, Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import PlainTextResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from vosk import Model, KaldiRecognizer
from openai import OpenAI
# ----------------------------
# Logging
# ----------------------------
# Configure root logging once at import time so container/HF Spaces logs pick it up.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("app")
def P(tag: str, msg: str):
    """Emit a tagged log line on stdout, flushed immediately (visible in Spaces logs)."""
    print(tag, msg, flush=True)
# ----------------------------
# Env
# ----------------------------
# Path to the unpacked Vosk model directory baked into the image.
VOSK_MODEL_PATH = os.getenv("VOSK_MODEL_PATH", "/app/models/vosk-model-en-us-0.22-lgraph").strip()
# Optional explicit wss:// URL for the media stream; auto-derived from the Host header if empty.
TWILIO_STREAM_URL = os.getenv("TWILIO_STREAM_URL", "").strip()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini").strip()
# Piper CLI binary name/path and the voice model it should load for TTS.
PIPER_BIN = os.getenv("PIPER_BIN", "piper").strip()
PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "").strip()
# ----------------------------
# FastAPI (sub-app)
# ----------------------------
# Sub-application; mounted at /twilio by the top-level app.py (see module docstring).
app = FastAPI()
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True per the CORS spec — confirm whether credentialed
# requests are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ----------------------------
# Audio / Twilio
# ----------------------------
FRAME_MS = 20  # Twilio media frame duration in milliseconds
INPUT_RATE = 8000  # Twilio streams 8 kHz mu-law audio
STT_RATE = 16000  # sample rate fed to Vosk after resampling
BYTES_PER_20MS_MULAW = int(INPUT_RATE * (FRAME_MS / 1000.0))  # 160 bytes @ 8kHz, 20ms
# ----------------------------
# VAD settings
# ----------------------------
RMS_SPEECH_THRESHOLD = 450  # RMS (16-bit PCM) above which a frame counts as speech
SPEECH_START_FRAMES = 3  # consecutive speech frames required to open an utterance
SPEECH_END_SILENCE_FRAMES = 40  # 800ms
MAX_UTTERANCE_MS = 12000  # hard cap on a single utterance's duration
PARTIAL_EMIT_EVERY_MS = 250  # throttle for partial-STT dashboard updates
# Latest pipeline snapshot; served verbatim by GET /debug/last.
LAST_STATE = {
    "connected": False,
    "status": "idle",  # idle | connected | listening | thinking | speaking
    "last_partial": "",
    "last_stt": "",
    "last_llm": "",
    "updated_ms": 0,
}
SYSTEM_PROMPT = (
    "You are a phone-call assistant. "
    "Reply in 1 short sentence (max 15 words). "
    "No filler. No greetings unless user greets first."
)
# Process-wide Vosk model; loaded lazily on the first WebSocket connection.
_VOSK_MODEL = None
def now_ms() -> int:
    """Current wall-clock time as whole milliseconds since the epoch."""
    seconds = time.time()
    return int(seconds * 1000)
def build_twiml(stream_url: str) -> str:
    """Return TwiML that opens a bidirectional media stream to `stream_url`.

    The long <Pause> keeps the call leg alive while audio flows over the
    WebSocket stream instead of normal TwiML playback.
    """
    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        "<Response>",
        "<Connect>",
        f'<Stream url="{stream_url}" />',
        "</Connect>",
        '<Pause length="600"/>',
        "</Response>",
        "",
    ]
    return "\n".join(parts)
def split_mulaw_frames(mulaw_bytes: bytes, frame_bytes: Optional[int] = None) -> List[bytes]:
    """Split mu-law audio into fixed-size frames, padding the tail with silence.

    Args:
        mulaw_bytes: raw mu-law encoded audio.
        frame_bytes: frame size in bytes; defaults to BYTES_PER_20MS_MULAW
            (one 20 ms frame at 8 kHz), preserving the original behavior.

    Returns:
        Frames of exactly `frame_bytes` each; the final frame is right-padded
        with 0xFF, which is mu-law silence.
    """
    size = BYTES_PER_20MS_MULAW if frame_bytes is None else frame_bytes
    return [
        mulaw_bytes[i:i + size].ljust(size, b"\xFF")
        for i in range(0, len(mulaw_bytes), size)
    ]
async def drain_queue(q: asyncio.Queue):
    """Discard every item currently sitting in `q` without blocking.

    Each removed item is acknowledged with task_done() so join() accounting
    stays consistent; returns as soon as the queue is empty.
    """
    while True:
        try:
            q.get_nowait()
        except asyncio.QueueEmpty:
            return
        q.task_done()
# ----------------------------
# OpenAI
# ----------------------------
def openai_client() -> OpenAI:
    """Construct an OpenAI client, failing fast when no API key is configured.

    Raises:
        RuntimeError: if the OPENAI_API_KEY environment variable was empty.
    """
    if OPENAI_API_KEY:
        return OpenAI(api_key=OPENAI_API_KEY)
    raise RuntimeError("OPENAI_API_KEY not set")
def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
    """Ask the chat model for a short answer to `user_text` (blocking call).

    Args:
        history: prior conversation messages; may include a system message at
            index 0 (as built by the WebSocket handler).
        user_text: the caller's latest transcribed utterance.

    Returns:
        The assistant's reply text, stripped; "" if the model returned nothing.
    """
    client = openai_client()
    msgs = [{"role": "system", "content": SYSTEM_PROMPT}]
    # Keep only the last 6 *conversational* turns. The previous slice
    # (history[-6:]) could re-include history's own system message, sending
    # the system prompt twice and out of position.
    tail = [m for m in history if m.get("role") != "system"][-6:]
    msgs.extend(tail)
    msgs.append({"role": "user", "content": user_text})
    resp = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=msgs,
        temperature=0.3,
        max_tokens=80,
    )
    return (resp.choices[0].message.content or "").strip()
# ----------------------------
# Piper TTS -> 8k mulaw
# ----------------------------
def piper_tts_to_mulaw(text: str) -> bytes:
    """Synthesize `text` with the Piper CLI and return raw 8 kHz mono mu-law bytes.

    Pipeline: piper writes a temp WAV, then ffmpeg applies a "phone clarity"
    filter (band-pass 200-3400 Hz plus compand) while downmixing/resampling
    to 8 kHz mu-law in a second temp file, which is read back and returned.
    Both temp files are always unlinked.

    Returns:
        Mu-law audio bytes; b"" when `text` is empty/whitespace.

    Raises:
        RuntimeError: if PIPER_MODEL_PATH is unset, or piper/ffmpeg exit non-zero.
    """
    if not PIPER_MODEL_PATH:
        raise RuntimeError("PIPER_MODEL_PATH not set")
    text = (text or "").strip()
    if not text:
        return b""
    # delete=False because the subprocesses reopen these paths by name.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wavf:
        wav_path = wavf.name
    with tempfile.NamedTemporaryFile(suffix=".mulaw", delete=False) as mlf:
        mulaw_path = mlf.name
    try:
        # Piper reads the text from stdin and writes the WAV to wav_path.
        r1 = subprocess.run(
            [PIPER_BIN, "--model", PIPER_MODEL_PATH, "--output_file", wav_path],
            input=text.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if r1.returncode != 0:
            raise RuntimeError(f"piper rc={r1.returncode} stderr={r1.stderr.decode('utf-8','ignore')[:500]}")
        # clarity filter for phone audio
        af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
        r2 = subprocess.run(
            ["ffmpeg", "-y", "-i", wav_path,
             "-ac", "1", "-ar", "8000",
             "-af", af,
             "-f", "mulaw", mulaw_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if r2.returncode != 0:
            raise RuntimeError(f"ffmpeg rc={r2.returncode} stderr={r2.stderr.decode('utf-8','ignore')[:500]}")
        with open(mulaw_path, "rb") as f:
            return f.read()
    finally:
        # Best-effort cleanup of both temp files, even on failure paths.
        for p in (wav_path, mulaw_path):
            try:
                os.unlink(p)
            except Exception:
                pass
# ----------------------------
# Call state
# ----------------------------
@dataclass
class CancelFlag:
    """One-way latch used to signal that an in-flight LLM job should stop."""

    # Starts False; once set it is never cleared (there is deliberately no reset).
    is_set: bool = False

    def set(self):
        # Latch the flag on.
        self.is_set = True
@dataclass
class CallState:
    """Mutable per-call state for one Twilio media-stream WebSocket."""

    call_id: str  # stable identifier for logging (str(id(ws)) of the socket)
    stream_sid: str = ""  # Twilio streamSid, filled in on the "start" event
    in_speech: bool = False  # True while the VAD considers the caller to be talking
    speech_start_count: int = 0  # consecutive speech frames seen before an utterance opens
    silence_count: int = 0  # consecutive silent frames seen inside an utterance
    utter_start_ms: int = 0  # wall-clock ms when the current utterance opened
    rec: Optional[KaldiRecognizer] = None  # per-utterance Vosk recognizer
    last_partial: str = ""  # last partial transcript emitted (dedup for logging)
    last_partial_emit_ms: int = 0  # throttle timestamp for partial polling
    # Outbound audio: base64 mu-law frames + "__END_CHUNK__" sentinels,
    # drained at real-time pace by outbound_sender.
    outbound_q: asyncio.Queue = field(default_factory=lambda: asyncio.Queue(maxsize=50000))
    outbound_task: Optional[asyncio.Task] = None  # the outbound_sender task
    keepalive_task: Optional[asyncio.Task] = None  # the twilio_keepalive task
    mark_i: int = 0  # monotonically increasing keepalive-mark counter
    bot_speaking: bool = False  # True while TTS audio is queued/playing
    cancel_llm: CancelFlag = field(default_factory=CancelFlag)  # cooperative LLM cancel
    tts_generation_id: int = 0  # current playback generation; stale TTS jobs abort
    history: List[Dict] = field(default_factory=list)  # chat history incl. system msg at [0]
    bot_lock: asyncio.Lock = field(default_factory=asyncio.Lock)  # serializes answer jobs

    def bump_tts_generation(self) -> int:
        """Advance the generation counter, invalidating any in-flight TTS job."""
        self.tts_generation_id += 1
        return self.tts_generation_id
async def twilio_keepalive(ws: WebSocket, st: CallState):
    """Send a Twilio `mark` event every 10 s so the media WebSocket stays alive.

    Runs until cancelled; skips sending until the stream's SID is known.
    Any unexpected send failure ends the loop with a log line.
    """
    try:
        while True:
            await asyncio.sleep(10)
            if not st.stream_sid:
                continue
            st.mark_i += 1
            name = f"ka_{st.mark_i}"
            payload = {
                "event": "mark",
                "streamSid": st.stream_sid,
                "mark": {"name": name},
            }
            await ws.send_text(json.dumps(payload))
            P("TWILIO>", f"keepalive_mark={name}")
    except asyncio.CancelledError:
        return
    except Exception as e:
        P("SYS>", f"keepalive_error={e}")
# ----------------------------
# Twilio Voice Webhook (FIXES 405)
# Accept POST + GET + trailing slash
# ----------------------------
@app.post("/voice")
@app.post("/voice/")
@app.get("/voice")
@app.get("/voice/")
async def voice(request: Request):
    """Twilio voice webhook: answer with TwiML that connects the media stream.

    Registered for GET and POST, with and without trailing slash, so Twilio's
    webhook configuration cannot 405. If TWILIO_STREAM_URL is unset, the
    wss:// URL is derived from the request's Host header (the sub-app is
    mounted at /twilio, so the stream endpoint is /twilio/stream).
    """
    stream_url = TWILIO_STREAM_URL
    if not stream_url:
        host = request.headers.get("host")
        if host:
            stream_url = f"wss://{host}/twilio/stream"
            P("SYS>", f"auto_stream_url={stream_url}")
    if not stream_url:
        return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
    return Response(content=build_twiml(stream_url), media_type="application/xml")
@app.get("/health")
async def health():
    """Lightweight liveness probe."""
    status = {"ok": True}
    return status
# ----------------------------
# WebSocket /stream (mounted => /twilio/stream)
# ----------------------------
@app.websocket("/stream")
async def stream(ws: WebSocket):
    """Twilio Media Streams WebSocket handler (mounted => /twilio/stream).

    Receives Twilio's JSON events: on "start" it records the streamSid,
    starts the keepalive task, and greets the caller; on "media" it decodes
    the base64 mu-law payload, resamples to 16 kHz PCM, runs the RMS speech
    check, handles barge-in, and feeds the VAD/STT state machine; on "stop"
    it exits. Cleanup of background tasks and dashboard state happens in the
    finally block.
    """
    await ws.accept()
    LAST_STATE["connected"] = True
    LAST_STATE["status"] = "connected"
    LAST_STATE["updated_ms"] = now_ms()
    st = CallState(call_id=str(id(ws)))
    st.history = [{"role": "system", "content": SYSTEM_PROMPT}]
    P("SYS>", f"ws_open call_id={st.call_id}")
    # Load the (large) Vosk model once per process, on first connection.
    global _VOSK_MODEL
    if _VOSK_MODEL is None:
        P("SYS>", f"loading_vosk={VOSK_MODEL_PATH}")
        _VOSK_MODEL = Model(VOSK_MODEL_PATH)
        P("SYS>", "vosk_loaded")
    st.rec = KaldiRecognizer(_VOSK_MODEL, STT_RATE)
    st.rec.SetWords(False)
    # Dedicated task paces outbound audio back to Twilio in real time.
    st.outbound_task = asyncio.create_task(outbound_sender(ws, st))
    try:
        while True:
            raw = await ws.receive_text()
            msg = json.loads(raw)
            event = msg.get("event")
            if event == "start":
                st.stream_sid = msg["start"]["streamSid"]
                P("TWILIO>", f"start streamSid={st.stream_sid}")
                if st.keepalive_task is None:
                    st.keepalive_task = asyncio.create_task(twilio_keepalive(ws, st))
                # Greet the caller as soon as the stream is established.
                asyncio.create_task(speak_text(ws, st, "Hi! How can I help?"))
            elif event == "media":
                # 8 kHz mu-law from Twilio -> 16-bit PCM -> 16 kHz for Vosk.
                mulaw = base64.b64decode(msg["media"]["payload"])
                pcm16_8k = audioop.ulaw2lin(mulaw, 2)
                pcm16_16k, _ = audioop.ratecv(pcm16_8k, 2, 1, INPUT_RATE, STT_RATE, None)
                rms = audioop.rms(pcm16_16k, 2)
                is_speech = rms >= RMS_SPEECH_THRESHOLD
                # Caller talking over the bot => cut playback (barge-in).
                if st.bot_speaking and is_speech:
                    await barge_in(ws, st)
                await vad_and_stt(ws, st, pcm16_16k, is_speech)
            elif event == "stop":
                P("TWILIO>", "stop")
                break
    except WebSocketDisconnect:
        P("SYS>", "ws_disconnect")
    except Exception as e:
        P("SYS>", f"ws_error={e}")
        log.exception("ws_error")
    finally:
        # Stop background tasks and reset the dashboard snapshot.
        if st.keepalive_task:
            st.keepalive_task.cancel()
        if st.outbound_task:
            st.outbound_task.cancel()
        LAST_STATE["connected"] = False
        LAST_STATE["status"] = "idle"
        LAST_STATE["last_partial"] = ""
        LAST_STATE["updated_ms"] = now_ms()
        P("SYS>", "ws_closed")
# ----------------------------
# VAD + STT
# ----------------------------
async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech: bool):
    """Per-frame VAD state machine; streams speech frames into Vosk.

    Called once per 20 ms media frame with the resampled 16 kHz PCM and the
    RMS-based speech decision for that frame. An utterance opens after
    SPEECH_START_FRAMES consecutive speech frames and closes after
    SPEECH_END_SILENCE_FRAMES of silence or MAX_UTTERANCE_MS overall
    (whichever comes first); closing delegates to finalize_utterance.
    """
    t = now_ms()
    if not st.in_speech:
        if is_speech:
            st.speech_start_count += 1
            if st.speech_start_count >= SPEECH_START_FRAMES:
                # Utterance begins: reset counters and start a fresh recognizer
                # so no audio from the previous turn leaks into this one.
                st.in_speech = True
                st.silence_count = 0
                st.utter_start_ms = t
                st.speech_start_count = 0
                st.last_partial = ""
                st.last_partial_emit_ms = 0
                st.rec = KaldiRecognizer(_VOSK_MODEL, STT_RATE)
                st.rec.SetWords(False)
        else:
            # Isolated noisy frame; require consecutive hits to open speech.
            st.speech_start_count = 0
        return
    st.rec.AcceptWaveform(pcm16_16k)
    # Throttled partial-result polling for logs and the /debug dashboard.
    if t - st.last_partial_emit_ms >= PARTIAL_EMIT_EVERY_MS:
        st.last_partial_emit_ms = t
        try:
            pj = json.loads(st.rec.PartialResult() or "{}")
            partial = (pj.get("partial") or "").strip()
        except Exception:
            partial = ""
        if partial and partial != st.last_partial:
            st.last_partial = partial
            P("STT_PART>", partial)
            LAST_STATE["status"] = "listening"
            LAST_STATE["last_partial"] = partial
            LAST_STATE["updated_ms"] = now_ms()
    # Hard cap so a noisy line cannot hold an utterance open indefinitely.
    if (t - st.utter_start_ms) > MAX_UTTERANCE_MS:
        await finalize_utterance(ws, st, "max_utterance")
        return
    if is_speech:
        st.silence_count = 0
        return
    st.silence_count += 1
    if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
        await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")
async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
    """Close the current utterance, pull the final transcript, and start the bot.

    Args:
        reason: short label (silence timeout / max length) used only in logs.

    The LLM + TTS work runs as a background task; st.bot_lock serializes
    successive answers so overlapping finalizations cannot interleave.
    """
    if not st.in_speech:
        return
    st.in_speech = False
    st.silence_count = 0
    st.speech_start_count = 0
    st.last_partial = ""
    try:
        j = json.loads(st.rec.FinalResult() or "{}")
    except Exception:
        j = {}
    user_text = (j.get("text") or "").strip()
    if not user_text:
        # Nothing recognized (e.g. non-speech noise); stay quiet.
        return
    P("STT_FINAL>", f"{user_text} ({reason})")
    LAST_STATE["status"] = "thinking"
    LAST_STATE["last_stt"] = user_text
    LAST_STATE["last_partial"] = ""
    LAST_STATE["updated_ms"] = now_ms()

    async def bot_job():
        async with st.bot_lock:
            await answer_and_speak(ws, st, user_text)

    # The event loop keeps only a weak reference to tasks; anchor the job on
    # the call state so it cannot be garbage-collected before completing.
    st.bot_task = asyncio.create_task(bot_job())
async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
    """Get a short LLM reply to `user_text` and speak it over the call.

    The blocking OpenAI call runs in the default executor so the event loop
    (and the inbound media stream) is never stalled.
    """
    st.history.append({"role": "user", "content": user_text})
    # Keep the system prompt (index 0) plus the last 8 conversational turns.
    # The previous trim (history[:1] + history[-8:]) re-included element 0 in
    # the tail whenever the history was still short, duplicating the system
    # message on every turn.
    st.history = st.history[:1] + st.history[1:][-8:]
    loop = asyncio.get_running_loop()

    def worker():
        return openai_answer_blocking(st.history, user_text)

    ans = await loop.run_in_executor(None, worker)
    ans = (ans or "").strip() or "Sorry, I didn’t catch that."
    P("LLM_ANS>", ans)
    LAST_STATE["last_llm"] = ans
    LAST_STATE["status"] = "speaking"
    LAST_STATE["updated_ms"] = now_ms()
    st.history.append({"role": "assistant", "content": ans})
    st.history = st.history[:1] + st.history[1:][-8:]
    await speak_text(ws, st, ans)
async def barge_in(ws: WebSocket, st: CallState):
    """Caller started talking over the bot: stop playback immediately.

    Bumps the TTS generation (so in-flight synthesis jobs abort), asks Twilio
    to clear its buffered audio, and drains our own outbound queue.
    """
    st.bump_tts_generation()
    if st.stream_sid:
        try:
            clear_msg = {"event": "clear", "streamSid": st.stream_sid}
            await ws.send_text(json.dumps(clear_msg))
            P("TWILIO>", "sent_clear")
        except Exception:
            # Best effort — the socket may already be closing.
            pass
    await drain_queue(st.outbound_q)
    st.bot_speaking = False
async def speak_text(ws: WebSocket, st: CallState, text: str):
    """Replace whatever the bot is currently saying with a fresh utterance.

    Claims a new TTS generation, clears Twilio's playback buffer and our
    outbound queue, then hands `text` to tts_enqueue under that generation.
    """
    gen = st.bump_tts_generation()
    if st.stream_sid:
        try:
            clear_msg = {"event": "clear", "streamSid": st.stream_sid}
            await ws.send_text(json.dumps(clear_msg))
            P("TWILIO>", "sent_clear")
        except Exception:
            # Best effort — the socket may already be closing.
            pass
    await drain_queue(st.outbound_q)
    await tts_enqueue(st, text, gen)
async def tts_enqueue(st: CallState, text: str, gen: int):
    """Synthesize `text` off-loop with Piper and queue it as base64 mu-law frames.

    `gen` is the TTS generation this job belongs to; the job aborts quietly as
    soon as the call state has moved on to a newer generation (barge-in or a
    newer utterance). A "__END_CHUNK__" sentinel marks end of utterance.
    """
    st.bot_speaking = True
    P("TTS>", f"text={text} gen={gen}")

    def superseded() -> bool:
        return gen != st.tts_generation_id

    loop = asyncio.get_running_loop()
    try:
        mulaw_bytes = await loop.run_in_executor(None, piper_tts_to_mulaw, text)
    except Exception as e:
        P("TTS_ERR>", str(e))
        st.bot_speaking = False
        return
    if superseded():
        return
    for frame in split_mulaw_frames(mulaw_bytes):
        if superseded():
            return
        await st.outbound_q.put(base64.b64encode(frame).decode("ascii"))
    await st.outbound_q.put("__END_CHUNK__")
async def outbound_sender(ws: WebSocket, st: CallState):
    """Drain st.outbound_q to Twilio at real-time pace (one frame per 20 ms).

    Queue items are base64-encoded mu-law frames, plus the "__END_CHUNK__"
    sentinel that tts_enqueue appends after each utterance so we can tell
    when the bot has finished speaking. Runs until cancelled.
    """
    try:
        while True:
            item = await st.outbound_q.get()
            if item == "__END_CHUNK__":
                # Small grace sleep, then mark the bot quiet only if nothing
                # new was queued meanwhile (e.g. a follow-up utterance).
                await asyncio.sleep(0.02)
                if st.outbound_q.empty():
                    st.bot_speaking = False
                    LAST_STATE["status"] = "connected"
                    LAST_STATE["updated_ms"] = now_ms()
                st.outbound_q.task_done()
                continue
            if not st.stream_sid:
                # Stream not established yet: drop audio rather than block.
                st.outbound_q.task_done()
                continue
            await ws.send_text(json.dumps({
                "event": "media",
                "streamSid": st.stream_sid,
                "media": {"payload": item},
            }))
            st.outbound_q.task_done()
            # Pace at the frame duration so Twilio's buffer isn't flooded.
            await asyncio.sleep(FRAME_MS / 1000.0)
    except asyncio.CancelledError:
        return
    except Exception as e:
        P("SYS>", f"outbound_sender_error={e}")
        log.exception("outbound_sender_error")
@app.get("/debug/last")
async def debug_last():
    """Return the latest pipeline state snapshot (for dashboard polling)."""
    return LAST_STATE