ashishkblink commited on
Commit
1a3eaef
·
verified ·
1 Parent(s): 0ef9177

Update pipe_method3.py

Browse files
Files changed (1) hide show
  1. pipe_method3.py +160 -46
pipe_method3.py CHANGED
@@ -1,15 +1,7 @@
1
  """
2
  Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback
3
-
4
- What this version does:
5
- - NO intent / NO clarify JSON
6
- - Logs only:
7
- STT_FINAL> ...
8
- LLM_ANS> ...
9
- TTS> ...
10
- - Generation-id safe TTS (no self-cancel on Railway)
11
- - Better phone clarity using ffmpeg filters (highpass/lowpass/compand)
12
- - Proper 20ms pacing + keepalive marks to prevent WS idle timeouts
13
  """
14
 
15
  import asyncio
@@ -17,18 +9,18 @@ import base64
17
  import json
18
  import logging
19
  import os
20
- import re
21
  import tempfile
22
  import time
23
  import audioop
24
  import subprocess
25
- import threading
26
  from dataclasses import dataclass, field
27
  from typing import Optional, List, Dict
28
 
29
  from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
30
- from fastapi.responses import PlainTextResponse, Response
31
  from fastapi.middleware.cors import CORSMiddleware
 
 
32
  from vosk import Model, KaldiRecognizer
33
  from openai import OpenAI
34
 
@@ -38,9 +30,11 @@ from openai import OpenAI
38
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
39
  log = logging.getLogger("app")
40
 
 
41
  def P(tag: str, msg: str):
42
  print(f"{tag} {msg}", flush=True)
43
 
 
44
  # ----------------------------
45
  # Env
46
  # ----------------------------
@@ -54,7 +48,8 @@ PIPER_BIN = os.getenv("PIPER_BIN", "piper").strip()
54
  PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "").strip()
55
 
56
  HOST = "0.0.0.0"
57
- PORT = int(os.getenv("PORT", "7860"))
 
58
 
59
  # ----------------------------
60
  # FastAPI
@@ -68,6 +63,37 @@ app.add_middleware(
68
  allow_headers=["*"],
69
  )
70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  # ----------------------------
72
  # Audio / Twilio
73
  # ----------------------------
@@ -76,15 +102,17 @@ INPUT_RATE = 8000
76
  STT_RATE = 16000
77
  BYTES_PER_20MS_MULAW = int(INPUT_RATE * (FRAME_MS / 1000.0)) # 160 bytes @ 8kHz, 20ms
78
 
 
79
  # ----------------------------
80
  # VAD settings
81
  # ----------------------------
82
- RMS_SPEECH_THRESHOLD = 450
83
  SPEECH_START_FRAMES = 3
84
  SPEECH_END_SILENCE_FRAMES = 40 # 800ms
85
  MAX_UTTERANCE_MS = 12000
86
  PARTIAL_EMIT_EVERY_MS = 250
87
 
 
88
  # ----------------------------
89
  # LLM prompt
90
  # ----------------------------
@@ -94,14 +122,17 @@ SYSTEM_PROMPT = (
94
  "No filler. No greetings unless user greets first."
95
  )
96
 
 
97
  # ----------------------------
98
  # Cached Vosk model
99
  # ----------------------------
100
  _VOSK_MODEL = None
101
 
 
102
  def now_ms() -> int:
103
  return int(time.time() * 1000)
104
 
 
105
  def build_twiml(stream_url: str) -> str:
106
  return f"""<?xml version="1.0" encoding="UTF-8"?>
107
  <Response>
@@ -112,6 +143,7 @@ def build_twiml(stream_url: str) -> str:
112
  </Response>
113
  """
114
 
 
115
  def split_mulaw_frames(mulaw_bytes: bytes) -> List[bytes]:
116
  frames = []
117
  for i in range(0, len(mulaw_bytes), BYTES_PER_20MS_MULAW):
@@ -121,6 +153,7 @@ def split_mulaw_frames(mulaw_bytes: bytes) -> List[bytes]:
121
  frames.append(chunk)
122
  return frames
123
 
 
124
  async def drain_queue(q: asyncio.Queue):
125
  try:
126
  while True:
@@ -129,6 +162,67 @@ async def drain_queue(q: asyncio.Queue):
129
  except asyncio.QueueEmpty:
130
  return
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  # ----------------------------
133
  # OpenAI
134
  # ----------------------------
@@ -137,10 +231,10 @@ def openai_client() -> OpenAI:
137
  raise RuntimeError("OPENAI_API_KEY not set")
138
  return OpenAI(api_key=OPENAI_API_KEY)
139
 
 
140
  def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
141
  client = openai_client()
142
  msgs = [{"role": "system", "content": SYSTEM_PROMPT}]
143
- # short tail context
144
  tail = history[-6:] if len(history) > 1 else []
145
  msgs.extend(tail)
146
  msgs.append({"role": "user", "content": user_text})
@@ -154,8 +248,9 @@ def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
154
  ans = (resp.choices[0].message.content or "").strip()
155
  return ans
156
 
 
157
  # ----------------------------
158
- # Piper TTS -> 8k mulaw (clarity improved)
159
  # ----------------------------
160
  def piper_tts_to_mulaw(text: str) -> bytes:
161
  if not PIPER_MODEL_PATH:
@@ -180,11 +275,6 @@ def piper_tts_to_mulaw(text: str) -> bytes:
180
  if r1.returncode != 0:
181
  raise RuntimeError(f"piper rc={r1.returncode} stderr={r1.stderr.decode('utf-8','ignore')[:500]}")
182
 
183
- # Phone-clarity filter chain:
184
- # - highpass removes rumble
185
- # - lowpass removes harshness
186
- # - compand evens volume (helps “clarity” on phone)
187
- # - dynaudnorm is avoided (can pump / distort at 8k)
188
  af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
189
 
190
  r2 = subprocess.run(
@@ -201,7 +291,6 @@ def piper_tts_to_mulaw(text: str) -> bytes:
201
  with open(mulaw_path, "rb") as f:
202
  data = f.read()
203
 
204
- P("TTS>", f"audio_bytes={len(data)}")
205
  return data
206
  finally:
207
  for p in (wav_path, mulaw_path):
@@ -210,6 +299,7 @@ def piper_tts_to_mulaw(text: str) -> bytes:
210
  except Exception:
211
  pass
212
 
 
213
  # ----------------------------
214
  # Call state
215
  # ----------------------------
@@ -219,6 +309,7 @@ class CancelFlag:
219
  def set(self):
220
  self.is_set = True
221
 
 
222
  @dataclass
223
  class CallState:
224
  call_id: str
@@ -255,8 +346,9 @@ class CallState:
255
  self.tts_generation_id += 1
256
  return self.tts_generation_id
257
 
 
258
  # ----------------------------
259
- # Keepalive marks (prevents WS ping timeout)
260
  # ----------------------------
261
  async def twilio_keepalive(ws: WebSocket, st: CallState):
262
  try:
@@ -270,12 +362,12 @@ async def twilio_keepalive(ws: WebSocket, st: CallState):
270
  "streamSid": st.stream_sid,
271
  "mark": {"name": name},
272
  }))
273
- P("TWILIO>", f"keepalive_mark={name}")
274
  except asyncio.CancelledError:
275
  return
276
  except Exception as e:
277
  P("SYS>", f"keepalive_error={e}")
278
 
 
279
  # ----------------------------
280
  # HTTP
281
  # ----------------------------
@@ -283,6 +375,7 @@ async def twilio_keepalive(ws: WebSocket, st: CallState):
283
  async def health():
284
  return {"ok": True}
285
 
 
286
  @app.post("/voice")
287
  async def voice(request: Request):
288
  stream_url = TWILIO_STREAM_URL
@@ -295,12 +388,14 @@ async def voice(request: Request):
295
  return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
296
  return Response(content=build_twiml(stream_url), media_type="application/xml")
297
 
 
298
  @app.get("/voice")
299
  async def voice_get(request: Request):
300
  return await voice(request)
301
 
 
302
  # ----------------------------
303
- # WebSocket /stream
304
  # ----------------------------
305
  @app.websocket("/stream")
306
  async def stream(ws: WebSocket):
@@ -330,10 +425,19 @@ async def stream(ws: WebSocket):
330
  st.stream_sid = msg["start"]["streamSid"]
331
  P("TWILIO>", f"start streamSid={st.stream_sid}")
332
 
 
 
 
 
 
 
 
 
 
333
  if st.keepalive_task is None:
334
  st.keepalive_task = asyncio.create_task(twilio_keepalive(ws, st))
335
 
336
- # optional short greeting
337
  asyncio.create_task(speak_text(ws, st, "Hi! How can I help?"))
338
 
339
  elif event == "media":
@@ -351,8 +455,8 @@ async def stream(ws: WebSocket):
351
  await vad_and_stt(ws, st, pcm16_16k, is_speech)
352
 
353
  elif event == "mark":
354
- name = (msg.get("mark") or {}).get("name")
355
- P("TWILIO>", f"mark_received={name}")
356
 
357
  elif event == "stop":
358
  P("TWILIO>", "stop")
@@ -364,12 +468,16 @@ async def stream(ws: WebSocket):
364
  P("SYS>", f"ws_error={e}")
365
  log.exception("ws_error")
366
  finally:
 
 
 
367
  if st.keepalive_task:
368
  st.keepalive_task.cancel()
369
  if st.outbound_task:
370
  st.outbound_task.cancel()
371
  P("SYS>", "ws_closed")
372
 
 
373
  # ----------------------------
374
  # VAD + STT
375
  # ----------------------------
@@ -395,7 +503,6 @@ async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech:
395
 
396
  st.rec.AcceptWaveform(pcm16_16k)
397
 
398
- # partial logging only (you said UI later)
399
  if t - st.last_partial_emit_ms >= PARTIAL_EMIT_EVERY_MS:
400
  st.last_partial_emit_ms = t
401
  try:
@@ -403,9 +510,12 @@ async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech:
403
  partial = (pj.get("partial") or "").strip()
404
  except Exception:
405
  partial = ""
 
406
  if partial and partial != st.last_partial:
407
  st.last_partial = partial
408
  P("STT_PART>", partial)
 
 
409
 
410
  if (t - st.utter_start_ms) > MAX_UTTERANCE_MS:
411
  await finalize_utterance(ws, st, "max_utterance")
@@ -419,6 +529,7 @@ async def vad_and_stt(ws: WebSocket, st: CallState, pcm16_16k: bytes, is_speech:
419
  if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
420
  await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")
421
 
 
422
  async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
423
  if not st.in_speech:
424
  return
@@ -438,6 +549,8 @@ async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
438
  return
439
 
440
  P("STT_FINAL>", f"{user_text} ({reason})")
 
 
441
 
442
  async def bot_job():
443
  async with st.bot_lock:
@@ -445,13 +558,13 @@ async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
445
 
446
  asyncio.create_task(bot_job())
447
 
 
448
  # ----------------------------
449
  # LLM Answer -> Speak
450
  # ----------------------------
451
  async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
452
  st.cancel_llm = CancelFlag(False)
453
 
454
- # store user
455
  st.history.append({"role": "user", "content": user_text})
456
  st.history = st.history[:1] + st.history[-8:]
457
 
@@ -466,13 +579,15 @@ async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
466
  ans = "Sorry, I didn’t catch that."
467
 
468
  P("LLM_ANS>", ans)
 
 
469
 
470
- # store assistant
471
  st.history.append({"role": "assistant", "content": ans})
472
  st.history = st.history[:1] + st.history[-8:]
473
 
474
  await speak_text(ws, st, ans)
475
 
 
476
  # ----------------------------
477
  # Barge-in (clear + drain)
478
  # ----------------------------
@@ -490,27 +605,20 @@ async def barge_in(ws: WebSocket, st: CallState):
490
  await drain_queue(st.outbound_q)
491
  st.bot_speaking = False
492
 
 
493
  # ----------------------------
494
- # Speak / TTS with generation-id
495
  # ----------------------------
496
  async def speak_text(ws: WebSocket, st: CallState, text: str):
497
  gen = st.bump_tts_generation()
498
-
499
- # clear previous audio
500
- if st.stream_sid:
501
- try:
502
- await ws.send_text(json.dumps({"event": "clear", "streamSid": st.stream_sid}))
503
- P("TWILIO>", "sent_clear")
504
- except Exception:
505
- pass
506
- await drain_queue(st.outbound_q)
507
-
508
  await tts_enqueue(st, text, gen)
509
 
 
510
  async def tts_enqueue(st: CallState, text: str, gen: int):
511
  my_gen = gen
512
  st.bot_speaking = True
513
  P("TTS>", f"text={text} gen={my_gen}")
 
514
 
515
  loop = asyncio.get_running_loop()
516
  try:
@@ -521,17 +629,22 @@ async def tts_enqueue(st: CallState, text: str, gen: int):
521
  return
522
 
523
  if my_gen != st.tts_generation_id:
524
- P("TTS>", f"discard_gen my_gen={my_gen} current_gen={st.tts_generation_id}")
525
  return
526
 
 
527
  for fr in split_mulaw_frames(mulaw_bytes):
528
  if my_gen != st.tts_generation_id:
529
- P("TTS>", f"discard_midstream my_gen={my_gen} current_gen={st.tts_generation_id}")
530
  return
531
  await st.outbound_q.put(base64.b64encode(fr).decode("ascii"))
532
 
 
 
 
 
 
533
  await st.outbound_q.put("__END_CHUNK__")
534
 
 
535
  async def outbound_sender(ws: WebSocket, st: CallState):
536
  try:
537
  while True:
@@ -563,10 +676,11 @@ async def outbound_sender(ws: WebSocket, st: CallState):
563
  P("SYS>", f"outbound_sender_error={e}")
564
  log.exception("outbound_sender_error")
565
 
 
566
  # ----------------------------
567
  # main
568
  # ----------------------------
569
  if __name__ == "__main__":
570
  import uvicorn
571
  P("SYS>", f"starting {HOST}:{PORT}")
572
- uvicorn.run(app, host=HOST, port=PORT)
 
1
  """
2
  Twilio Media Streams (bidirectional) + Vosk + OpenAI Answer + Piper -> Twilio playback
3
+ + Live UI (web_demo) showing STT/LLM in realtime
4
+ + Multi-call UI support (separate calls by streamSid)
 
 
 
 
 
 
 
 
5
  """
6
 
7
  import asyncio
 
9
  import json
10
  import logging
11
  import os
 
12
  import tempfile
13
  import time
14
  import audioop
15
  import subprocess
 
16
  from dataclasses import dataclass, field
17
  from typing import Optional, List, Dict
18
 
19
  from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
20
+ from fastapi.responses import PlainTextResponse, Response, FileResponse
21
  from fastapi.middleware.cors import CORSMiddleware
22
+ from fastapi.staticfiles import StaticFiles
23
+
24
  from vosk import Model, KaldiRecognizer
25
  from openai import OpenAI
26
 
 
30
  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
31
  log = logging.getLogger("app")
32
 
33
+
34
  def P(tag: str, msg: str):
35
  print(f"{tag} {msg}", flush=True)
36
 
37
+
38
  # ----------------------------
39
  # Env
40
  # ----------------------------
 
48
  PIPER_MODEL_PATH = os.getenv("PIPER_MODEL_PATH", "").strip()
49
 
50
  HOST = "0.0.0.0"
51
+ PORT = int(os.getenv("PORT", "7860")) # HF uses 7860
52
+
53
 
54
  # ----------------------------
55
  # FastAPI
 
63
  allow_headers=["*"],
64
  )
65
 
66
+
67
+ # ----------------------------
68
+ # Frontend serving (web_demo/dist)
69
+ # ----------------------------
70
+ FRONTEND_DIR = os.path.join(os.getcwd(), "web_demo", "dist")
71
+ ASSETS_DIR = os.path.join(FRONTEND_DIR, "assets")
72
+
73
+ if os.path.isdir(ASSETS_DIR):
74
+ app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
75
+
76
+
77
+ @app.get("/")
78
+ async def serve_index():
79
+ index_path = os.path.join(FRONTEND_DIR, "index.html")
80
+ if os.path.isfile(index_path):
81
+ return FileResponse(index_path)
82
+ return PlainTextResponse("UI not built. Ensure web_demo/dist exists.", status_code=404)
83
+
84
+
85
+ # SPA fallback (optional): if you later add routes like /dashboard
86
+ @app.get("/{path:path}")
87
+ async def serve_spa_fallback(path: str):
88
+ candidate = os.path.join(FRONTEND_DIR, path)
89
+ if os.path.isfile(candidate):
90
+ return FileResponse(candidate)
91
+ index_path = os.path.join(FRONTEND_DIR, "index.html")
92
+ if os.path.isfile(index_path):
93
+ return FileResponse(index_path)
94
+ return PlainTextResponse("Not Found", status_code=404)
95
+
96
+
97
  # ----------------------------
98
  # Audio / Twilio
99
  # ----------------------------
 
102
  STT_RATE = 16000
103
  BYTES_PER_20MS_MULAW = int(INPUT_RATE * (FRAME_MS / 1000.0)) # 160 bytes @ 8kHz, 20ms
104
 
105
+
106
  # ----------------------------
107
  # VAD settings
108
  # ----------------------------
109
+ RMS_SPEECH_THRESHOLD = 550
110
  SPEECH_START_FRAMES = 3
111
  SPEECH_END_SILENCE_FRAMES = 40 # 800ms
112
  MAX_UTTERANCE_MS = 12000
113
  PARTIAL_EMIT_EVERY_MS = 250
114
 
115
+
116
  # ----------------------------
117
  # LLM prompt
118
  # ----------------------------
 
122
  "No filler. No greetings unless user greets first."
123
  )
124
 
125
+
126
  # ----------------------------
127
  # Cached Vosk model
128
  # ----------------------------
129
  _VOSK_MODEL = None
130
 
131
+
132
  def now_ms() -> int:
133
  return int(time.time() * 1000)
134
 
135
+
136
  def build_twiml(stream_url: str) -> str:
137
  return f"""<?xml version="1.0" encoding="UTF-8"?>
138
  <Response>
 
143
  </Response>
144
  """
145
 
146
+
147
  def split_mulaw_frames(mulaw_bytes: bytes) -> List[bytes]:
148
  frames = []
149
  for i in range(0, len(mulaw_bytes), BYTES_PER_20MS_MULAW):
 
153
  frames.append(chunk)
154
  return frames
155
 
156
+
157
  async def drain_queue(q: asyncio.Queue):
158
  try:
159
  while True:
 
162
  except asyncio.QueueEmpty:
163
  return
164
 
165
+
166
+ # ----------------------------
167
+ # UI live dashboard (multi-call)
168
+ # ----------------------------
169
+ _UI_CLIENTS = set()
170
+ _UI_LOCK = asyncio.Lock()
171
+
172
+ ACTIVE_CALLS: Dict[str, Dict] = {} # key: streamSid
173
+ ACTIVE_LOCK = asyncio.Lock()
174
+
175
+
176
+ async def ui_broadcast(event: str, data: dict):
177
+ msg = {"event": event, "data": data, "ts_ms": now_ms()}
178
+ dead = []
179
+ async with _UI_LOCK:
180
+ for c in list(_UI_CLIENTS):
181
+ try:
182
+ await c.send_text(json.dumps(msg))
183
+ except Exception:
184
+ dead.append(c)
185
+ for c in dead:
186
+ _UI_CLIENTS.discard(c)
187
+
188
+
189
+ async def upsert_call(stream_sid: str, **fields):
190
+ if not stream_sid:
191
+ return
192
+ async with ACTIVE_LOCK:
193
+ row = ACTIVE_CALLS.get(stream_sid, {})
194
+ row.update(fields)
195
+ ACTIVE_CALLS[stream_sid] = row
196
+
197
+
198
+ async def remove_call(stream_sid: str):
199
+ if not stream_sid:
200
+ return
201
+ async with ACTIVE_LOCK:
202
+ ACTIVE_CALLS.pop(stream_sid, None)
203
+
204
+
205
+ @app.websocket("/ui/ws")
206
+ async def ui_ws(ws: WebSocket):
207
+ await ws.accept()
208
+ async with _UI_LOCK:
209
+ _UI_CLIENTS.add(ws)
210
+ try:
211
+ while True:
212
+ await asyncio.sleep(60)
213
+ except WebSocketDisconnect:
214
+ pass
215
+ finally:
216
+ async with _UI_LOCK:
217
+ _UI_CLIENTS.discard(ws)
218
+
219
+
220
+ @app.get("/ui/calls")
221
+ async def ui_calls():
222
+ async with ACTIVE_LOCK:
223
+ return {k: dict(v) for k, v in ACTIVE_CALLS.items()}
224
+
225
+
226
  # ----------------------------
227
  # OpenAI
228
  # ----------------------------
 
231
  raise RuntimeError("OPENAI_API_KEY not set")
232
  return OpenAI(api_key=OPENAI_API_KEY)
233
 
234
+
235
  def openai_answer_blocking(history: List[Dict], user_text: str) -> str:
236
  client = openai_client()
237
  msgs = [{"role": "system", "content": SYSTEM_PROMPT}]
 
238
  tail = history[-6:] if len(history) > 1 else []
239
  msgs.extend(tail)
240
  msgs.append({"role": "user", "content": user_text})
 
248
  ans = (resp.choices[0].message.content or "").strip()
249
  return ans
250
 
251
+
252
  # ----------------------------
253
+ # Piper TTS -> 8k mulaw
254
  # ----------------------------
255
  def piper_tts_to_mulaw(text: str) -> bytes:
256
  if not PIPER_MODEL_PATH:
 
275
  if r1.returncode != 0:
276
  raise RuntimeError(f"piper rc={r1.returncode} stderr={r1.stderr.decode('utf-8','ignore')[:500]}")
277
 
 
 
 
 
 
278
  af = "highpass=f=200,lowpass=f=3400,compand=attacks=0:decays=0.3:points=-80/-80|-20/-10|0/-3"
279
 
280
  r2 = subprocess.run(
 
291
  with open(mulaw_path, "rb") as f:
292
  data = f.read()
293
 
 
294
  return data
295
  finally:
296
  for p in (wav_path, mulaw_path):
 
299
  except Exception:
300
  pass
301
 
302
+
303
  # ----------------------------
304
  # Call state
305
  # ----------------------------
 
309
  def set(self):
310
  self.is_set = True
311
 
312
+
313
  @dataclass
314
  class CallState:
315
  call_id: str
 
346
  self.tts_generation_id += 1
347
  return self.tts_generation_id
348
 
349
+
350
  # ----------------------------
351
+ # Keepalive marks
352
  # ----------------------------
353
  async def twilio_keepalive(ws: WebSocket, st: CallState):
354
  try:
 
362
  "streamSid": st.stream_sid,
363
  "mark": {"name": name},
364
  }))
 
365
  except asyncio.CancelledError:
366
  return
367
  except Exception as e:
368
  P("SYS>", f"keepalive_error={e}")
369
 
370
+
371
  # ----------------------------
372
  # HTTP
373
  # ----------------------------
 
375
  async def health():
376
  return {"ok": True}
377
 
378
+
379
  @app.post("/voice")
380
  async def voice(request: Request):
381
  stream_url = TWILIO_STREAM_URL
 
388
  return PlainTextResponse("TWILIO_STREAM_URL not set and host not found", status_code=500)
389
  return Response(content=build_twiml(stream_url), media_type="application/xml")
390
 
391
+
392
  @app.get("/voice")
393
  async def voice_get(request: Request):
394
  return await voice(request)
395
 
396
+
397
  # ----------------------------
398
+ # WebSocket /stream (Twilio)
399
  # ----------------------------
400
  @app.websocket("/stream")
401
  async def stream(ws: WebSocket):
 
425
  st.stream_sid = msg["start"]["streamSid"]
426
  P("TWILIO>", f"start streamSid={st.stream_sid}")
427
 
428
+ await upsert_call(
429
+ st.stream_sid,
430
+ call_id=st.call_id,
431
+ started_ms=now_ms(),
432
+ last_seen_ms=now_ms(),
433
+ last_event="start",
434
+ )
435
+ await ui_broadcast("call_start", {"streamSid": st.stream_sid, "call_id": st.call_id})
436
+
437
  if st.keepalive_task is None:
438
  st.keepalive_task = asyncio.create_task(twilio_keepalive(ws, st))
439
 
440
+ # greeting (optional)
441
  asyncio.create_task(speak_text(ws, st, "Hi! How can I help?"))
442
 
443
  elif event == "media":
 
455
  await vad_and_stt(ws, st, pcm16_16k, is_speech)
456
 
457
  elif event == "mark":
458
+ # ignore; keepalive
459
+ pass
460
 
461
  elif event == "stop":
462
  P("TWILIO>", "stop")
 
468
  P("SYS>", f"ws_error={e}")
469
  log.exception("ws_error")
470
  finally:
471
+ if st.stream_sid:
472
+ await remove_call(st.stream_sid)
473
+ await ui_broadcast("call_end", {"streamSid": st.stream_sid})
474
  if st.keepalive_task:
475
  st.keepalive_task.cancel()
476
  if st.outbound_task:
477
  st.outbound_task.cancel()
478
  P("SYS>", "ws_closed")
479
 
480
+
481
  # ----------------------------
482
  # VAD + STT
483
  # ----------------------------
 
503
 
504
  st.rec.AcceptWaveform(pcm16_16k)
505
 
 
506
  if t - st.last_partial_emit_ms >= PARTIAL_EMIT_EVERY_MS:
507
  st.last_partial_emit_ms = t
508
  try:
 
510
  partial = (pj.get("partial") or "").strip()
511
  except Exception:
512
  partial = ""
513
+
514
  if partial and partial != st.last_partial:
515
  st.last_partial = partial
516
  P("STT_PART>", partial)
517
+ await upsert_call(st.stream_sid, last_seen_ms=t, last_event="stt_partial")
518
+ await ui_broadcast("stt_partial", {"streamSid": st.stream_sid, "text": partial})
519
 
520
  if (t - st.utter_start_ms) > MAX_UTTERANCE_MS:
521
  await finalize_utterance(ws, st, "max_utterance")
 
529
  if st.silence_count >= SPEECH_END_SILENCE_FRAMES:
530
  await finalize_utterance(ws, st, f"vad_silence_{SPEECH_END_SILENCE_FRAMES*FRAME_MS}ms")
531
 
532
+
533
  async def finalize_utterance(ws: WebSocket, st: CallState, reason: str):
534
  if not st.in_speech:
535
  return
 
549
  return
550
 
551
  P("STT_FINAL>", f"{user_text} ({reason})")
552
+ await upsert_call(st.stream_sid, last_seen_ms=now_ms(), last_event="stt_final", last_user_text=user_text)
553
+ await ui_broadcast("stt_final", {"streamSid": st.stream_sid, "text": user_text, "reason": reason})
554
 
555
  async def bot_job():
556
  async with st.bot_lock:
 
558
 
559
  asyncio.create_task(bot_job())
560
 
561
+
562
  # ----------------------------
563
  # LLM Answer -> Speak
564
  # ----------------------------
565
  async def answer_and_speak(ws: WebSocket, st: CallState, user_text: str):
566
  st.cancel_llm = CancelFlag(False)
567
 
 
568
  st.history.append({"role": "user", "content": user_text})
569
  st.history = st.history[:1] + st.history[-8:]
570
 
 
579
  ans = "Sorry, I didn’t catch that."
580
 
581
  P("LLM_ANS>", ans)
582
+ await upsert_call(st.stream_sid, last_seen_ms=now_ms(), last_event="llm_ans", last_bot_text=ans)
583
+ await ui_broadcast("llm_ans", {"streamSid": st.stream_sid, "text": ans})
584
 
 
585
  st.history.append({"role": "assistant", "content": ans})
586
  st.history = st.history[:1] + st.history[-8:]
587
 
588
  await speak_text(ws, st, ans)
589
 
590
+
591
  # ----------------------------
592
  # Barge-in (clear + drain)
593
  # ----------------------------
 
605
  await drain_queue(st.outbound_q)
606
  st.bot_speaking = False
607
 
608
+
609
  # ----------------------------
610
+ # Speak / TTS (no clear here; clear only on barge-in)
611
  # ----------------------------
612
  async def speak_text(ws: WebSocket, st: CallState, text: str):
613
  gen = st.bump_tts_generation()
 
 
 
 
 
 
 
 
 
 
614
  await tts_enqueue(st, text, gen)
615
 
616
+
617
  async def tts_enqueue(st: CallState, text: str, gen: int):
618
  my_gen = gen
619
  st.bot_speaking = True
620
  P("TTS>", f"text={text} gen={my_gen}")
621
+ await ui_broadcast("tts", {"streamSid": st.stream_sid, "text": text, "gen": my_gen})
622
 
623
  loop = asyncio.get_running_loop()
624
  try:
 
629
  return
630
 
631
  if my_gen != st.tts_generation_id:
 
632
  return
633
 
634
+ # enqueue audio frames
635
  for fr in split_mulaw_frames(mulaw_bytes):
636
  if my_gen != st.tts_generation_id:
 
637
  return
638
  await st.outbound_q.put(base64.b64encode(fr).decode("ascii"))
639
 
640
+ # add a short silence tail to prevent cutoff
641
+ silence = base64.b64encode(b"\xFF" * BYTES_PER_20MS_MULAW).decode("ascii")
642
+ for _ in range(6): # ~120ms
643
+ await st.outbound_q.put(silence)
644
+
645
  await st.outbound_q.put("__END_CHUNK__")
646
 
647
+
648
  async def outbound_sender(ws: WebSocket, st: CallState):
649
  try:
650
  while True:
 
676
  P("SYS>", f"outbound_sender_error={e}")
677
  log.exception("outbound_sender_error")
678
 
679
+
680
  # ----------------------------
681
  # main
682
  # ----------------------------
683
  if __name__ == "__main__":
684
  import uvicorn
685
  P("SYS>", f"starting {HOST}:{PORT}")
686
+ uvicorn.run(app, host=HOST, port=PORT)