Joffrey Thomas committed on
Commit
74d0265
·
1 Parent(s): 1586e1f

change app.py

Browse files
Files changed (1) hide show
  1. app.py +27 -20
app.py CHANGED
@@ -3,6 +3,7 @@ import asyncio
3
  import base64
4
  import json
5
  import os
 
6
  import threading
7
  import time
8
  import uuid
@@ -124,7 +125,8 @@ class UserSession:
124
  """Per-user session state."""
125
  def __init__(self):
126
  self.session_id = str(uuid.uuid4())
127
- self._audio_queue = None # Created lazily in the correct event loop
 
128
  self.transcription_text = ""
129
  self.is_running = False
130
  self.status_message = "ready"
@@ -137,14 +139,12 @@ class UserSession:
137
 
138
  @property
139
  def audio_queue(self):
140
- """Lazily create audio queue to ensure it's in the right event loop."""
141
- if self._audio_queue is None:
142
- self._audio_queue = asyncio.Queue(maxsize=100)
143
  return self._audio_queue
144
 
145
  def reset_queue(self):
146
  """Reset the audio queue."""
147
- self._audio_queue = asyncio.Queue(maxsize=100)
148
 
149
 
150
  # Load CSS from external file
@@ -298,8 +298,12 @@ async def send_silence(ws, duration=2.0):
298
  async def websocket_handler(session):
299
  """Connect to WebSocket and handle audio streaming + transcription."""
300
  try:
301
- async with websockets.connect(ws_url) as ws:
302
- await ws.recv()
 
 
 
 
303
  await ws.send(json.dumps({"type": "session.update", "model": model}))
304
 
305
  session.status_message = "warming"
@@ -325,15 +329,18 @@ async def websocket_handler(session):
325
  session.status_message = "timeout"
326
  break
327
 
 
328
  try:
329
- chunk = await asyncio.wait_for(session.audio_queue.get(), timeout=0.1)
330
  if session.is_running:
331
  await ws.send(
332
  json.dumps(
333
  {"type": "input_audio_buffer.append", "audio": chunk}
334
  )
335
  )
336
- except asyncio.TimeoutError:
 
 
337
  continue
338
  except Exception as e:
339
  if session.is_running: # Only log if unexpected
@@ -376,6 +383,9 @@ async def websocket_handler(session):
376
  pass # Normal cancellation
377
  except websockets.exceptions.ConnectionClosed:
378
  pass # Normal closure
 
 
 
379
  except Exception as e:
380
  error_msg = str(e) if str(e) else type(e).__name__
381
  if "ConnectionReset" not in error_msg: # Suppress common disconnect errors
@@ -386,7 +396,8 @@ async def websocket_handler(session):
386
  # Remove from active sessions
387
  with _sessions_lock:
388
  _active_sessions.pop(session.session_id, None)
389
- # Note: Don't remove from registry here - session might be reused
 
390
 
391
 
392
  def start_websocket(session):
@@ -396,6 +407,9 @@ def start_websocket(session):
396
  # Register this session
397
  with _sessions_lock:
398
  _active_sessions[session.session_id] = session
 
 
 
399
 
400
  # Submit to the shared event loop
401
  loop = get_event_loop()
@@ -538,12 +552,11 @@ def process_audio(audio, session_id):
538
  pcm16 = (audio_float * 32767).astype(np.int16)
539
  b64_chunk = base64.b64encode(pcm16.tobytes()).decode("utf-8")
540
 
541
- # Non-blocking put to async queue (thread-safe)
542
  try:
543
- loop = get_event_loop()
544
- loop.call_soon_threadsafe(lambda: _safe_queue_put(session.audio_queue, b64_chunk))
545
  except Exception:
546
- pass # Skip if queue is full or loop issues
547
 
548
  return get_transcription_html(session.transcription_text, session.status_message, session.current_wpm), current_session_id
549
  except Exception as e:
@@ -552,12 +565,6 @@ def process_audio(audio, session_id):
552
  return get_transcription_html("", "error", ""), current_session_id
553
 
554
 
555
- def _safe_queue_put(q, item):
556
- """Safely put item in async queue without blocking."""
557
- try:
558
- q.put_nowait(item)
559
- except asyncio.QueueFull:
560
- pass # Drop frame if queue is full
561
 
562
 
563
  # Gradio interface
 
3
  import base64
4
  import json
5
  import os
6
+ import queue
7
  import threading
8
  import time
9
  import uuid
 
125
  """Per-user session state."""
126
  def __init__(self):
127
  self.session_id = str(uuid.uuid4())
128
+ # Use a thread-safe queue for cross-thread communication
129
+ self._audio_queue = queue.Queue(maxsize=200)
130
  self.transcription_text = ""
131
  self.is_running = False
132
  self.status_message = "ready"
 
139
 
140
  @property
141
  def audio_queue(self):
142
+ """Return the thread-safe queue."""
 
 
143
  return self._audio_queue
144
 
145
  def reset_queue(self):
146
  """Reset the audio queue."""
147
+ self._audio_queue = queue.Queue(maxsize=200)
148
 
149
 
150
  # Load CSS from external file
 
298
  async def websocket_handler(session):
299
  """Connect to WebSocket and handle audio streaming + transcription."""
300
  try:
301
+ # Add connection timeout to prevent hanging
302
+ async with asyncio.timeout(10): # 10 second connection timeout
303
+ ws = await websockets.connect(ws_url)
304
+
305
+ async with ws:
306
+ await asyncio.wait_for(ws.recv(), timeout=5)
307
  await ws.send(json.dumps({"type": "session.update", "model": model}))
308
 
309
  session.status_message = "warming"
 
329
  session.status_message = "timeout"
330
  break
331
 
332
+ # Use thread-safe queue with non-blocking get + async sleep
333
  try:
334
+ chunk = session.audio_queue.get_nowait()
335
  if session.is_running:
336
  await ws.send(
337
  json.dumps(
338
  {"type": "input_audio_buffer.append", "audio": chunk}
339
  )
340
  )
341
+ except queue.Empty:
342
+ # No audio available, yield control briefly
343
+ await asyncio.sleep(0.05)
344
  continue
345
  except Exception as e:
346
  if session.is_running: # Only log if unexpected
 
383
  pass # Normal cancellation
384
  except websockets.exceptions.ConnectionClosed:
385
  pass # Normal closure
386
+ except asyncio.TimeoutError:
387
+ print(f"WebSocket connection timeout for session {session.session_id[:8]}")
388
+ session.status_message = "error"
389
  except Exception as e:
390
  error_msg = str(e) if str(e) else type(e).__name__
391
  if "ConnectionReset" not in error_msg: # Suppress common disconnect errors
 
396
  # Remove from active sessions
397
  with _sessions_lock:
398
  _active_sessions.pop(session.session_id, None)
399
+ active_count = len(_active_sessions)
400
+ print(f"Session {session.session_id[:8]} ended. Active sessions: {active_count}")
401
 
402
 
403
  def start_websocket(session):
 
407
  # Register this session
408
  with _sessions_lock:
409
  _active_sessions[session.session_id] = session
410
+ active_count = len(_active_sessions)
411
+
412
+ print(f"Starting session {session.session_id[:8]}. Active sessions: {active_count}")
413
 
414
  # Submit to the shared event loop
415
  loop = get_event_loop()
 
552
  pcm16 = (audio_float * 32767).astype(np.int16)
553
  b64_chunk = base64.b64encode(pcm16.tobytes()).decode("utf-8")
554
 
555
+ # Put directly into thread-safe queue (no event loop needed)
556
  try:
557
+ session.audio_queue.put_nowait(b64_chunk)
 
558
  except Exception:
559
+ pass # Skip if queue is full
560
 
561
  return get_transcription_html(session.transcription_text, session.status_message, session.current_wpm), current_session_id
562
  except Exception as e:
 
565
  return get_transcription_html("", "error", ""), current_session_id
566
 
567
 
 
 
 
 
 
 
568
 
569
 
570
  # Gradio interface