Pulastya B commited on
Commit
06f2512
·
1 Parent(s): 76ac02c

FIX SSE: Register queue BEFORE yielding connection event to capture all events

Browse files
Files changed (1) hide show
  1. src/api/app.py +14 -8
src/api/app.py CHANGED
@@ -194,11 +194,14 @@ async def stream_progress(session_id: str):
194
  """
195
  print(f"[SSE] ENDPOINT: Client connected for session_id={session_id}")
196
 
 
 
 
 
 
 
 
197
  async def event_generator():
198
- # CRITICAL: Subscribe FIRST to create queue before any events are emitted
199
- subscription = progress_manager.subscribe(session_id)
200
- queue_iterator = subscription.__aiter__()
201
-
202
  try:
203
  # Send initial connection event
204
  connection_event = {
@@ -211,15 +214,16 @@ async def stream_progress(session_id: str):
211
 
212
  # Send any existing history first (for reconnections)
213
  history = progress_manager.get_history(session_id)
 
214
  for event in history[-10:]: # Send last 10 events
215
  yield f"data: {json.dumps(event)}\n\n"
216
 
217
  print(f"[SSE] Starting event stream loop for session {session_id}")
218
 
219
- # Stream new events as they occur
220
  while True:
221
- event = await queue_iterator.__anext__()
222
- print(f"[SSE] STREAMING event to client: {event.get('type')}")
223
  yield f"data: {json.dumps(event)}\n\n"
224
 
225
  # Check if analysis is complete
@@ -228,10 +232,12 @@ async def stream_progress(session_id: str):
228
 
229
  except asyncio.CancelledError:
230
  logger.info(f"SSE stream cancelled for session {session_id}")
231
- progress_manager.clear(session_id)
232
  except Exception as e:
233
  logger.error(f"SSE error for session {session_id}: {e}")
234
  finally:
 
 
 
235
  logger.info(f"SSE stream closed for session {session_id}")
236
 
237
  return StreamingResponse(
 
194
  """
195
  print(f"[SSE] ENDPOINT: Client connected for session_id={session_id}")
196
 
197
+ # CRITICAL: Create queue and register subscriber IMMEDIATELY
198
+ queue = asyncio.Queue(maxsize=100)
199
+ if session_id not in progress_manager._queues:
200
+ progress_manager._queues[session_id] = []
201
+ progress_manager._queues[session_id].append(queue)
202
+ print(f"[SSE] Queue registered, total subscribers: {len(progress_manager._queues[session_id])}")
203
+
204
  async def event_generator():
 
 
 
 
205
  try:
206
  # Send initial connection event
207
  connection_event = {
 
214
 
215
  # Send any existing history first (for reconnections)
216
  history = progress_manager.get_history(session_id)
217
+ print(f"[SSE] Sending {len(history[-10:])} history events")
218
  for event in history[-10:]: # Send last 10 events
219
  yield f"data: {json.dumps(event)}\n\n"
220
 
221
  print(f"[SSE] Starting event stream loop for session {session_id}")
222
 
223
+ # Stream new events from the queue
224
  while True:
225
+ event = await queue.get()
226
+ print(f"[SSE] GOT event from queue: {event.get('type')}")
227
  yield f"data: {json.dumps(event)}\n\n"
228
 
229
  # Check if analysis is complete
 
232
 
233
  except asyncio.CancelledError:
234
  logger.info(f"SSE stream cancelled for session {session_id}")
 
235
  except Exception as e:
236
  logger.error(f"SSE error for session {session_id}: {e}")
237
  finally:
238
+ # Cleanup queue
239
+ if session_id in progress_manager._queues and queue in progress_manager._queues[session_id]:
240
+ progress_manager._queues[session_id].remove(queue)
241
  logger.info(f"SSE stream closed for session {session_id}")
242
 
243
  return StreamingResponse(