Pulastya B commited on
Commit
132976b
·
1 Parent(s): 5af048e

FIX SSE BLOCKING: Use get_nowait() polling instead of blocking get() to avoid event loop deadlock

Browse files
Files changed (1) hide show
  1. src/api/app.py +14 -9
src/api/app.py CHANGED
@@ -220,16 +220,21 @@ async def stream_progress(session_id: str):
220
 
221
  print(f"[SSE] Starting event stream loop for session {session_id}")
222
 
223
- # Stream new events from the queue
224
  while True:
225
- print(f"[SSE] ABOUT TO CALL queue.get() - queue size: {queue.qsize()}")
226
- event = await queue.get()
227
- print(f"[SSE] GOT event from queue: {event.get('type')}")
228
- yield f"data: {json.dumps(event)}\n\n"
229
-
230
- # Check if analysis is complete
231
- if event.get('type') == 'analysis_complete':
232
- break
 
 
 
 
 
233
 
234
  except asyncio.CancelledError:
235
  logger.info(f"SSE stream cancelled for session {session_id}")
 
220
 
221
  print(f"[SSE] Starting event stream loop for session {session_id}")
222
 
223
+ # Stream new events from the queue (poll with get_nowait to avoid blocking issues)
224
  while True:
225
+ try:
226
+ # Try to get event without blocking
227
+ event = queue.get_nowait()
228
+ print(f"[SSE] GOT event from queue: {event.get('type')}")
229
+ yield f"data: {json.dumps(event)}\n\n"
230
+
231
+ # Check if analysis is complete
232
+ if event.get('type') == 'analysis_complete':
233
+ break
234
+ except asyncio.QueueEmpty:
235
+ # No events available, send keepalive and wait
236
+ yield f": keepalive\n\n"
237
+ await asyncio.sleep(0.1) # Poll every 100ms
238
 
239
  except asyncio.CancelledError:
240
  logger.info(f"SSE stream cancelled for session {session_id}")