Pulastya B commited on
Commit
76ac02c
·
1 Parent(s): b9c1a6b

FIX SSE TIMING: Subscribe BEFORE sending connection event to capture all events

Browse files
Files changed (1) hide show
  1. src/api/app.py +8 -1
src/api/app.py CHANGED
@@ -195,6 +195,10 @@ async def stream_progress(session_id: str):
195
  print(f"[SSE] ENDPOINT: Client connected for session_id={session_id}")
196
 
197
  async def event_generator():
 
 
 
 
198
  try:
199
  # Send initial connection event
200
  connection_event = {
@@ -210,8 +214,11 @@ async def stream_progress(session_id: str):
210
  for event in history[-10:]: # Send last 10 events
211
  yield f"data: {json.dumps(event)}\n\n"
212
 
 
 
213
  # Stream new events as they occur
214
- async for event in progress_manager.subscribe(session_id):
 
215
  print(f"[SSE] STREAMING event to client: {event.get('type')}")
216
  yield f"data: {json.dumps(event)}\n\n"
217
 
 
195
  print(f"[SSE] ENDPOINT: Client connected for session_id={session_id}")
196
 
197
  async def event_generator():
198
+ # CRITICAL: Subscribe FIRST to create queue before any events are emitted
199
+ subscription = progress_manager.subscribe(session_id)
200
+ queue_iterator = subscription.__aiter__()
201
+
202
  try:
203
  # Send initial connection event
204
  connection_event = {
 
214
  for event in history[-10:]: # Send last 10 events
215
  yield f"data: {json.dumps(event)}\n\n"
216
 
217
+ print(f"[SSE] Starting event stream loop for session {session_id}")
218
+
219
  # Stream new events as they occur
220
+ while True:
221
+ event = await queue_iterator.__anext__()
222
  print(f"[SSE] STREAMING event to client: {event.get('type')}")
223
  yield f"data: {json.dumps(event)}\n\n"
224