ashishkblink committed on
Commit
9e1ff8d
·
verified ·
1 Parent(s): 01cb0e6

Update pipe_method3.py

Browse files
Files changed (1) hide show
  1. pipe_method3.py +36 -73
pipe_method3.py CHANGED
@@ -1,18 +1,9 @@
1
  """
2
  Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback
3
- What this version does:
4
- - NO intent / NO clarify JSON
5
- - Logs only:
6
- STT_FINAL> ...
7
- LLM_ANS> ...
8
- TTS> ...
9
- - Generation-id safe TTS
10
- - Better phone clarity using ffmpeg filters (highpass/lowpass/compand)
11
- - Proper 20ms pacing + keepalive marks to prevent WS idle timeouts
12
 
13
  HF Spaces notes:
14
  - DO NOT run uvicorn here
15
- - This FastAPI app is mounted at /twilio in app.py
16
  so routes here must be relative:
17
  POST /voice -> /twilio/voice
18
  WS /stream -> /twilio/stream
@@ -36,6 +27,7 @@ from fastapi.middleware.cors import CORSMiddleware
36
  from vosk import Model, KaldiRecognizer
37
  from openai import OpenAI
38
 
 
39
  # ----------------------------
40
  # Logging
41
  # ----------------------------
@@ -95,18 +87,12 @@ LAST_STATE = {
95
  "updated_ms": 0,
96
  }
97
 
98
- # ----------------------------
99
- # LLM prompt
100
- # ----------------------------
101
  SYSTEM_PROMPT = (
102
  "You are a phone-call assistant. "
103
  "Reply in 1 short sentence (max 15 words). "
104
  "No filler. No greetings unless user greets first."
105
  )
106
 
107
- # ----------------------------
108
- # Cached Vosk model
109
- # ----------------------------
110
  _VOSK_MODEL = None
111
 
112
  def now_ms() -> int:
@@ -160,11 +146,10 @@ def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
160
  temperature=0.3,
161
  max_tokens=80,
162
  )
163
- ans = (resp.choices[0].message.content or "").strip()
164
- return ans
165
 
166
  # ----------------------------
167
- # Piper TTS -> 8k mulaw (clarity improved)
168
  # ----------------------------
169
  def piper_tts_to_mulaw(text: str) -> bytes:
170
  if not PIPER_MODEL_PATH:
@@ -189,6 +174,7 @@ def piper_tts_to_mulaw(text: str) -> bytes:
189
  if r1.returncode != 0:
190
  raise RuntimeError(f"piper rc={r1.returncode} stderr={r1.stderr.decode('utf-8','ignore')[:500]}")
191
 
 
192
  af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
193
 
194
  r2 = subprocess.run(
@@ -203,10 +189,7 @@ def piper_tts_to_mulaw(text: str) -> bytes:
203
  raise RuntimeError(f"ffmpeg rc={r2.returncode} stderr={r2.stderr.decode('utf-8','ignore')[:500]}")
204
 
205
  with open(mulaw_path, "rb") as f:
206
- data = f.read()
207
-
208
- P("TTS>", f"audio_bytes={len(data)}")
209
- return data
210
  finally:
211
  for p in (wav_path, mulaw_path):
212
  try:
@@ -214,6 +197,7 @@ def piper_tts_to_mulaw(text: str) -> bytes:
214
  except Exception:
215
  pass
216
 
 
217
  # ----------------------------
218
  # Call state
219
  # ----------------------------
@@ -228,7 +212,6 @@ class CallState:
228
  call_id: str
229
  stream_sid: str = ""
230
 
231
- # vad
232
  in_speech: bool = False
233
  speech_start_count: int = 0
234
  silence_count: int = 0
@@ -236,22 +219,18 @@ class CallState:
236
 
237
  rec: Optional[KaldiRecognizer] = None
238
 
239
- # partials
240
  last_partial: str = ""
241
  last_partial_emit_ms: int = 0
242
 
243
- # outbound
244
  outbound_q: asyncio.Queue = field(default_factory=lambda: asyncio.Queue(maxsize=50000))
245
  outbound_task: Optional[asyncio.Task] = None
246
  keepalive_task: Optional[asyncio.Task] = None
247
  mark_i: int = 0
248
 
249
- # speaking / generation
250
  bot_speaking: bool = False
251
  cancel_llm: CancelFlag = field(default_factory=CancelFlag)
252
  tts_generation_id: int = 0
253
 
254
- # conversation history
255
  history: List[Dict] = field(default_factory=list)
256
  bot_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
257
 
@@ -259,9 +238,7 @@ class CallState:
259
  self.tts_generation_id += 1
260
  return self.tts_generation_id
261
 
262
- # ----------------------------
263
- # Keepalive marks (prevents WS ping timeout)
264
- # ----------------------------
265
  async def twilio_keepalive(ws: WebSocket, st: CallState):
266
  try:
267
  while True:
@@ -280,39 +257,39 @@ async def twilio_keepalive(ws: WebSocket, st: CallState):
280
  except Exception as e:
281
  P("SYS>", f"keepalive_error={e}")
282
 
 
283
  # ----------------------------
284
- # HTTP (RELATIVE ROUTES)
285
- # Final URLs will be:
286
- # POST /twilio/voice
287
- # WS /twilio/stream
288
  # ----------------------------
289
- @app.get("/health")
290
- async def health():
291
- return {"ok": True}
292
-
293
  @app.post("/voice")
 
 
 
294
  async def voice(request: Request):
295
  stream_url = TWILIO_STREAM_URL
 
 
296
  if not stream_url:
297
  host = request.headers.get("host")
298
  if host:
299
- # because this app is mounted at /twilio
300
  stream_url = f"wss://{host}/twilio/stream"
301
  P("SYS>", f"auto_stream_url={stream_url}")
 
302
  if not stream_url:
303
  return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
 
304
  return Response(content=build_twiml(stream_url), media_type="application/xml")
305
 
306
- @app.get("/voice")
307
- async def voice_get(request: Request):
308
- return await voice(request)
309
 
310
- @app.get("/debug/last")
311
- async def debug_last():
312
- return LAST_STATE
 
313
 
314
  # ----------------------------
315
- # WebSocket /stream (RELATIVE)
316
  # ----------------------------
317
  @app.websocket("/stream")
318
  async def stream(ws: WebSocket):
@@ -360,10 +337,6 @@ async def stream(ws: WebSocket):
360
 
361
  await vad_and_stt(ws, st, pcm16_16k, is_speech)
362
 
363
- elif event == "mark":
364
- name = (msg.get("mark") or {}).get("name")
365
- P("TWILIO>", f"mark_received={name}")
366
-
367
  elif event == "stop":
368
  P("TWILIO>", "stop")
369
  break
@@ -380,6 +353,7 @@ async def stream(ws: WebSocket):
380
  st.outbound_task.cancel()
381
  P("SYS>", "ws_closed")
382
 
 
383
  # ----------------------------
384
  # VAD + STT
385
  # ----------------------------
@@ -428,6 +402,7 @@ async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech:
428
  if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
429
  await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")
430
 
 
431
  async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
432
  if not st.in_speech:
433
  return
@@ -454,12 +429,8 @@ async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
454
 
455
  asyncio.create_task(bot_job())
456
 
457
- # ----------------------------
458
- # LLM Answer -> Speak
459
- # ----------------------------
460
- async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
461
- st.cancel_llm = CancelFlag(False)
462
 
 
463
  st.history.append({"role": "user", "content": user_text})
464
  st.history = st.history[:1] + st.history[-8:]
465
 
@@ -469,9 +440,7 @@ async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
469
  return openai_answer_blocking(st.history, user_text)
470
 
471
  ans = await loop.run_in_executor(None, worker)
472
- ans = (ans or "").strip()
473
- if not ans:
474
- ans = "Sorry, I didn’t catch that."
475
 
476
  P("LLM_ANS>", ans)
477
 
@@ -480,11 +449,8 @@ async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
480
 
481
  await speak_text(ws, st, ans)
482
 
483
- # ----------------------------
484
- # Barge-in (clear + drain)
485
- # ----------------------------
486
  async def barge_in(ws: WebSocket, st: CallState):
487
- st.cancel_llm.set()
488
  st.bump_tts_generation()
489
 
490
  if st.stream_sid:
@@ -497,9 +463,7 @@ async def barge_in(ws: WebSocket, st: CallState):
497
  await drain_queue(st.outbound_q)
498
  st.bot_speaking = False
499
 
500
- # ----------------------------
501
- # Speak / TTS with generation-id
502
- # ----------------------------
503
  async def speak_text(ws: WebSocket, st: CallState, text: str):
504
  gen = st.bump_tts_generation()
505
 
@@ -509,14 +473,14 @@ async def speak_text(ws: WebSocket, st: CallState, text: str):
509
  P("TWILIO>", "sent_clear")
510
  except Exception:
511
  pass
512
- await drain_queue(st.outbound_q)
513
 
 
514
  await tts_enqueue(st, text, gen)
515
 
 
516
  async def tts_enqueue(st: CallState, text: str, gen: int):
517
- my_gen = gen
518
  st.bot_speaking = True
519
- P("TTS>", f"text={text} gen={my_gen}")
520
 
521
  loop = asyncio.get_running_loop()
522
  try:
@@ -526,18 +490,17 @@ async def tts_enqueue(st: CallState, text: str, gen: int):
526
  st.bot_speaking = False
527
  return
528
 
529
- if my_gen != st.tts_generation_id:
530
- P("TTS>", f"discard_gen my_gen={my_gen} current_gen={st.tts_generation_id}")
531
  return
532
 
533
  for fr in split_mulaw_frames(mulaw_bytes):
534
- if my_gen != st.tts_generation_id:
535
- P("TTS>", f"discard_midstream my_gen={my_gen} current_gen={st.tts_generation_id}")
536
  return
537
  await st.outbound_q.put(base64.b64encode(fr).decode("ascii"))
538
 
539
  await st.outbound_q.put("__END_CHUNK__")
540
 
 
541
  async def outbound_sender(ws: WebSocket, st: CallState):
542
  try:
543
  while True:
 
1
  """
2
  Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback
 
 
 
 
 
 
 
 
 
3
 
4
  HF Spaces notes:
5
  - DO NOT run uvicorn here
6
+ - This app is mounted at /twilio in app.py
7
  so routes here must be relative:
8
  POST /voice -> /twilio/voice
9
  WS /stream -> /twilio/stream
 
27
  from vosk import Model, KaldiRecognizer
28
  from openai import OpenAI
29
 
30
+
31
  # ----------------------------
32
  # Logging
33
  # ----------------------------
 
87
  "updated_ms": 0,
88
  }
89
 
 
 
 
90
  SYSTEM_PROMPT = (
91
  "You are a phone-call assistant. "
92
  "Reply in 1 short sentence (max 15 words). "
93
  "No filler. No greetings unless user greets first."
94
  )
95
 
 
 
 
96
  _VOSK_MODEL = None
97
 
98
  def now_ms() -> int:
 
146
  temperature=0.3,
147
  max_tokens=80,
148
  )
149
+ return (resp.choices[0].message.content or "").strip()
 
150
 
151
  # ----------------------------
152
+ # Piper TTS -> 8k mulaw
153
  # ----------------------------
154
  def piper_tts_to_mulaw(text: str) -> bytes:
155
  if not PIPER_MODEL_PATH:
 
174
  if r1.returncode != 0:
175
  raise RuntimeError(f"piper rc={r1.returncode} stderr={r1.stderr.decode('utf-8','ignore')[:500]}")
176
 
177
+ # Band-limit to 200-3400 Hz and compress dynamics so speech is clearer on a phone line
178
  af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
179
 
180
  r2 = subprocess.run(
 
189
  raise RuntimeError(f"ffmpeg rc={r2.returncode} stderr={r2.stderr.decode('utf-8','ignore')[:500]}")
190
 
191
  with open(mulaw_path, "rb") as f:
192
+ return f.read()
 
 
 
193
  finally:
194
  for p in (wav_path, mulaw_path):
195
  try:
 
197
  except Exception:
198
  pass
199
 
200
+
201
  # ----------------------------
202
  # Call state
203
  # ----------------------------
 
212
  call_id: str
213
  stream_sid: str = ""
214
 
 
215
  in_speech: bool = False
216
  speech_start_count: int = 0
217
  silence_count: int = 0
 
219
 
220
  rec: Optional[KaldiRecognizer] = None
221
 
 
222
  last_partial: str = ""
223
  last_partial_emit_ms: int = 0
224
 
 
225
  outbound_q: asyncio.Queue = field(default_factory=lambda: asyncio.Queue(maxsize=50000))
226
  outbound_task: Optional[asyncio.Task] = None
227
  keepalive_task: Optional[asyncio.Task] = None
228
  mark_i: int = 0
229
 
 
230
  bot_speaking: bool = False
231
  cancel_llm: CancelFlag = field(default_factory=CancelFlag)
232
  tts_generation_id: int = 0
233
 
 
234
  history: List[Dict] = field(default_factory=list)
235
  bot_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
236
 
 
238
  self.tts_generation_id += 1
239
  return self.tts_generation_id
240
 
241
+
 
 
242
  async def twilio_keepalive(ws: WebSocket, st: CallState):
243
  try:
244
  while True:
 
257
  except Exception as e:
258
  P("SYS>", f"keepalive_error={e}")
259
 
260
+
261
  # ----------------------------
262
+ # Twilio voice webhook (fixes HTTP 405 Method Not Allowed)
263
+ # Accept POST + GET + trailing slash
 
 
264
  # ----------------------------
 
 
 
 
265
  @app.post("/voice")
266
+ @app.post("/voice/")
267
+ @app.get("/voice")
268
+ @app.get("/voice/")
269
  async def voice(request: Request):
270
  stream_url = TWILIO_STREAM_URL
271
+
272
+ # If TWILIO_STREAM_URL is not set, build the stream URL from the request's Host header
273
  if not stream_url:
274
  host = request.headers.get("host")
275
  if host:
276
+ # mounted at /twilio, so final ws is /twilio/stream
277
  stream_url = f"wss://{host}/twilio/stream"
278
  P("SYS>", f"auto_stream_url={stream_url}")
279
+
280
  if not stream_url:
281
  return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
282
+
283
  return Response(content=build_twiml(stream_url), media_type="application/xml")
284
 
 
 
 
285
 
286
+ @app.get("/health")
287
+ async def health():
288
+ return {"ok": True}
289
+
290
 
291
  # ----------------------------
292
+ # WebSocket /stream (mounted => /twilio/stream)
293
  # ----------------------------
294
  @app.websocket("/stream")
295
  async def stream(ws: WebSocket):
 
337
 
338
  await vad_and_stt(ws, st, pcm16_16k, is_speech)
339
 
 
 
 
 
340
  elif event == "stop":
341
  P("TWILIO>", "stop")
342
  break
 
353
  st.outbound_task.cancel()
354
  P("SYS>", "ws_closed")
355
 
356
+
357
  # ----------------------------
358
  # VAD + STT
359
  # ----------------------------
 
402
  if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
403
  await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")
404
 
405
+
406
  async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
407
  if not st.in_speech:
408
  return
 
429
 
430
  asyncio.create_task(bot_job())
431
 
 
 
 
 
 
432
 
433
+ async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
434
  st.history.append({"role": "user", "content": user_text})
435
  st.history = st.history[:1] + st.history[-8:]
436
 
 
440
  return openai_answer_blocking(st.history, user_text)
441
 
442
  ans = await loop.run_in_executor(None, worker)
443
+ ans = (ans or "").strip() or "Sorry, I didn’t catch that."
 
 
444
 
445
  P("LLM_ANS>", ans)
446
 
 
449
 
450
  await speak_text(ws, st, ans)
451
 
452
+
 
 
453
  async def barge_in(ws: WebSocket, st: CallState):
 
454
  st.bump_tts_generation()
455
 
456
  if st.stream_sid:
 
463
  await drain_queue(st.outbound_q)
464
  st.bot_speaking = False
465
 
466
+
 
 
467
  async def speak_text(ws: WebSocket, st: CallState, text: str):
468
  gen = st.bump_tts_generation()
469
 
 
473
  P("TWILIO>", "sent_clear")
474
  except Exception:
475
  pass
 
476
 
477
+ await drain_queue(st.outbound_q)
478
  await tts_enqueue(st, text, gen)
479
 
480
+
481
  async def tts_enqueue(st: CallState, text: str, gen: int):
 
482
  st.bot_speaking = True
483
+ P("TTS>", f"text={text} gen={gen}")
484
 
485
  loop = asyncio.get_running_loop()
486
  try:
 
490
  st.bot_speaking = False
491
  return
492
 
493
+ if gen != st.tts_generation_id:
 
494
  return
495
 
496
  for fr in split_mulaw_frames(mulaw_bytes):
497
+ if gen != st.tts_generation_id:
 
498
  return
499
  await st.outbound_q.put(base64.b64encode(fr).decode("ascii"))
500
 
501
  await st.outbound_q.put("__END_CHUNK__")
502
 
503
+
504
  async def outbound_sender(ws: WebSocket, st: CallState):
505
  try:
506
  while True: