Pulastya B commited on
Commit
8a420cb
·
1 Parent(s): ab2c6db

Fixed some minor issues where if the csv was empty then the website crashed

Browse files
Files changed (1) hide show
  1. src/api/app.py +83 -63
src/api/app.py CHANGED
@@ -361,18 +361,7 @@ async def get_progress(session_id: str):
361
 
362
  @app.get("/api/progress/stream/{session_id}")
363
  async def stream_progress(session_id: str):
364
- """Stream real-time progress updates using Server-Sent Events (SSE).
365
-
366
- This endpoint connects clients to the global progress_manager which
367
- receives events from the orchestrator as tools execute.
368
-
369
- Events:
370
- - tool_executing: When a tool begins execution
371
- - tool_completed: When a tool finishes successfully
372
- - tool_failed: When a tool fails
373
- - token_update: Token budget updates
374
- - analysis_complete: When the entire workflow finishes
375
- """
376
  print(f"[SSE] ENDPOINT: Client connected for session_id={session_id}")
377
 
378
  # CRITICAL: Create queue and register subscriber IMMEDIATELY
@@ -384,71 +373,102 @@ async def stream_progress(session_id: str):
384
 
385
  async def event_generator():
386
  try:
387
- # Send initial connection event
388
- connection_event = {
389
- 'type': 'connected',
390
- 'message': '🔗 Connected to progress stream',
391
- 'session_id': session_id
392
- }
393
- print(f"[SSE] SENDING connection event to client")
394
- yield f"data: {safe_json_dumps(connection_event)}\n\n"
 
 
 
 
 
 
 
395
 
396
- # 🔥 FIX: Replay any events that were emitted BEFORE this subscriber connected
397
- # This handles the race condition where background analysis starts emitting events
398
- # before the frontend's SSE reconnection completes
399
- history = progress_manager.get_history(session_id)
400
- if history:
401
- print(f"[SSE] Replaying {len(history)} missed events for late-joining subscriber")
402
- for past_event in history:
403
- # Don't replay if it's already a terminal event
404
- if past_event.get('type') != 'analysis_complete':
405
- yield f"data: {safe_json_dumps(past_event)}\n\n"
406
- else:
407
- # If analysis already completed before we connected, send it and close
408
- # Set a very long retry interval to prevent EventSource auto-reconnect
409
- yield f"retry: 86400000\n\n"
410
- yield f"data: {safe_json_dumps(past_event)}\n\n"
411
- print(f"[SSE] Analysis already completed before subscriber connected - closing")
412
- # Brief delay to ensure the client receives and processes the event
413
- await asyncio.sleep(2)
414
- return
415
- else:
416
- print(f"[SSE] No history to replay (fresh session)")
417
 
 
418
  print(f"[SSE] Starting event stream loop for session {session_id}")
 
419
 
420
- # Stream new events from the queue (poll with get_nowait to avoid blocking issues)
421
  while True:
422
- if not queue.empty():
423
- event = queue.get_nowait()
424
- print(f"[SSE] GOT event from queue: {event.get('type')}")
425
- yield f"data: {safe_json_dumps(event)}\n\n"
426
-
427
- # Check if analysis is complete
428
- if event.get('type') == 'analysis_complete':
429
- break
430
- else:
431
- # No events available, send keepalive and wait
432
- yield f": keepalive\n\n"
433
- await asyncio.sleep(0.5) # Poll every 500ms
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
434
 
435
- except asyncio.CancelledError:
436
- logger.info(f"SSE stream cancelled for session {session_id}")
437
  except Exception as e:
438
- logger.error(f"SSE error for session {session_id}: {e}")
 
439
  finally:
440
- # Cleanup queue
441
- if session_id in progress_manager._queues and queue in progress_manager._queues[session_id]:
442
- progress_manager._queues[session_id].remove(queue)
443
- logger.info(f"SSE stream closed for session {session_id}")
 
 
 
444
 
445
  return StreamingResponse(
446
  event_generator(),
447
  media_type="text/event-stream",
448
  headers={
449
- "Cache-Control": "no-cache",
450
  "Connection": "keep-alive",
451
- "X-Accel-Buffering": "no" # Disable nginx buffering
 
 
 
452
  }
453
  )
454
 
 
361
 
362
  @app.get("/api/progress/stream/{session_id}")
363
  async def stream_progress(session_id: str):
364
+ """Stream real-time progress updates using Server-Sent Events (SSE)."""
 
 
 
 
 
 
 
 
 
 
 
365
  print(f"[SSE] ENDPOINT: Client connected for session_id={session_id}")
366
 
367
  # CRITICAL: Create queue and register subscriber IMMEDIATELY
 
373
 
374
  async def event_generator():
375
  try:
376
+ # 1. Send initial connection confirmation immediately
377
+ try:
378
+ connection_event = {
379
+ 'type': 'connected',
380
+ 'message': '🔗 Connected to progress stream',
381
+ 'session_id': session_id
382
+ }
383
+ data_str = safe_json_dumps(connection_event)
384
+ print(f"[SSE] SENDING connection event to client: {data_str[:100]}")
385
+ yield f"data: {data_str}\n\n"
386
+ except Exception as e:
387
+ print(f"[SSE] ERROR sending connection event: {e}")
388
+ logger.error(f"SSE connection event error: {e}", exc_info=True)
389
+ yield f"data: {json.dumps({'type': 'error', 'message': 'Connection failed'})}\n\n"
390
+ return
391
 
392
+ # 2. Replay history
393
+ try:
394
+ history = progress_manager.get_history(session_id)
395
+ if history:
396
+ print(f"[SSE] Found {len(history)} events in history")
397
+ for past_event in history:
398
+ if past_event.get('type') != 'analysis_complete':
399
+ data_str = safe_json_dumps(past_event)
400
+ yield f"data: {data_str}\n\n"
401
+ else:
402
+ # Terminal event - send it with long retry, then close
403
+ data_str = safe_json_dumps(past_event)
404
+ yield f"retry: 86400000\ndata: {data_str}\n\n"
405
+ print(f"[SSE] Analysis already complete, closing stream")
406
+ await asyncio.sleep(1)
407
+ return
408
+ else:
409
+ print(f"[SSE] No history to replay (fresh session)")
410
+ except Exception as e:
411
+ print(f"[SSE] ERROR replaying history: {e}")
412
+ logger.error(f"SSE history replay error: {e}", exc_info=True)
413
 
414
+ # 3. Stream new events
415
  print(f"[SSE] Starting event stream loop for session {session_id}")
416
+ consecutive_empty_cycles = 0
417
 
 
418
  while True:
419
+ try:
420
+ if not queue.empty():
421
+ consecutive_empty_cycles = 0
422
+ event = queue.get_nowait()
423
+ try:
424
+ data_str = safe_json_dumps(event)
425
+ print(f"[SSE] Sending {event.get('type')}: {data_str[:100]}")
426
+ yield f"data: {data_str}\n\n"
427
+
428
+ if event.get('type') == 'analysis_complete':
429
+ print(f"[SSE] Analysis complete, closing stream")
430
+ await asyncio.sleep(1)
431
+ return
432
+ except Exception as e:
433
+ print(f"[SSE] ERROR serializing event: {e}")
434
+ logger.error(f"SSE event serialization error: {e}", exc_info=True)
435
+ else:
436
+ # Keep-alive ping every 500ms
437
+ consecutive_empty_cycles += 1
438
+ if consecutive_empty_cycles % 2 == 0: # Every 1 second
439
+ yield ": keep-alive\n\n"
440
+ await asyncio.sleep(0.5)
441
+ except asyncio.CancelledError:
442
+ print(f"[SSE] Stream cancelled for session {session_id}")
443
+ break
444
+ except Exception as e:
445
+ print(f"[SSE] ERROR in event loop: {e}")
446
+ logger.error(f"SSE event loop error: {e}", exc_info=True)
447
+ yield f"data: {json.dumps({'type': 'error', 'message': 'Stream error'})}\n\n"
448
+ break
449
 
 
 
450
  except Exception as e:
451
+ print(f"[SSE] CRITICAL ERROR in event_generator: {e}")
452
+ logger.error(f"SSE generator error: {e}", exc_info=True)
453
  finally:
454
+ # Cleanup
455
+ try:
456
+ if session_id in progress_manager._queues and queue in progress_manager._queues[session_id]:
457
+ progress_manager._queues[session_id].remove(queue)
458
+ print(f"[SSE] Stream closed for session {session_id}, remaining subscribers: {len(progress_manager._queues.get(session_id, []))}")
459
+ except Exception as e:
460
+ print(f"[SSE] ERROR in cleanup: {e}")
461
 
462
  return StreamingResponse(
463
  event_generator(),
464
  media_type="text/event-stream",
465
  headers={
466
+ "Cache-Control": "no-cache, no-store, must-revalidate",
467
  "Connection": "keep-alive",
468
+ "X-Accel-Buffering": "no",
469
+ "Access-Control-Allow-Origin": "*",
470
+ "Access-Control-Allow-Methods": "GET",
471
+ "Access-Control-Max-Age": "3600"
472
  }
473
  )
474