Pulastya B commited on
Commit
c14102f
Β·
1 Parent(s): 29c9177

Fix SSE double-emit: eliminate numeric session identity, use UUID only

Browse files
FRRONTEEEND/components/ChatInterface.tsx CHANGED
@@ -42,7 +42,7 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
42
  updatedAt: new Date(),
43
  }
44
  ]);
45
- const [activeSessionId, setActiveSessionId] = useState('1');
46
  const [input, setInput] = useState('');
47
  const [isTyping, setIsTyping] = useState(false);
48
  const [currentStep, setCurrentStep] = useState<string>('');
@@ -63,8 +63,8 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
63
 
64
  // Connect to SSE when workflow starts, disconnect when it completes
65
  useEffect(() => {
66
- if (!isTyping) {
67
- // Close SSE connection when workflow completes
68
  if (eventSourceRef.current) {
69
  console.log('πŸ”Œ Closing SSE connection');
70
  eventSourceRef.current.close();
@@ -74,10 +74,10 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
74
  return;
75
  }
76
 
77
- // Connect to SSE stream
78
  const API_URL = window.location.origin;
79
- const sessionKey = activeSessionId || 'default';
80
- const eventSource = new EventSource(`${API_URL}/api/progress/stream/${sessionKey}`);
81
 
82
  eventSource.onopen = () => {
83
  console.log('βœ… SSE connection established');
 
42
  updatedAt: new Date(),
43
  }
44
  ]);
45
+ const [activeSessionId, setActiveSessionId] = useState<string>(''); // Empty initially, set from backend UUID
46
  const [input, setInput] = useState('');
47
  const [isTyping, setIsTyping] = useState(false);
48
  const [currentStep, setCurrentStep] = useState<string>('');
 
63
 
64
  // Connect to SSE when workflow starts, disconnect when it completes
65
  useEffect(() => {
66
+ if (!isTyping || !activeSessionId) {
67
+ // Close SSE connection when workflow completes or no session ID yet
68
  if (eventSourceRef.current) {
69
  console.log('πŸ”Œ Closing SSE connection');
70
  eventSourceRef.current.close();
 
74
  return;
75
  }
76
 
77
+ // Connect to SSE stream with actual session UUID
78
  const API_URL = window.location.origin;
79
+ console.log(`πŸ”Œ Connecting SSE to session: ${activeSessionId}`);
80
+ const eventSource = new EventSource(`${API_URL}/api/progress/stream/${activeSessionId}`);
81
 
82
  eventSource.onopen = () => {
83
  console.log('βœ… SSE connection established');
src/api/app.py CHANGED
@@ -52,9 +52,6 @@ app.add_middleware(
52
  # Agent itself is stateless - no conversation memory between requests
53
  agent: Optional[DataScienceCopilot] = None
54
 
55
- # Global progress tracking with SSE support
56
- progress_store: Dict[str, List[Dict[str, Any]]] = {}
57
-
58
  # SSE event queues for real-time streaming
59
  class ProgressEventManager:
60
  """Manages SSE connections and progress events for real-time updates."""
@@ -321,37 +318,11 @@ async def run_analysis(
321
  logger.info(f"Follow-up request without file, using session memory")
322
  logger.info(f"Task: {task_description}")
323
 
324
- # Initialize progress tracking
325
- session_key = session_id or "default"
326
- progress_store[session_key] = []
327
-
328
- def progress_callback(tool_name: str, status: str):
329
- """Callback to track progress and send SSE events"""
330
- # Store in legacy progress store
331
- progress_store[session_key].append({
332
- "tool": tool_name,
333
- "status": status,
334
- "timestamp": time.time()
335
- })
336
-
337
- # Emit event to progress_manager (synchronous, works in any context)
338
- event_type = "tool_executing" if status == "running" else "tool_completed" if status == "completed" else "tool_failed"
339
- event_data = {
340
- "type": event_type,
341
- "tool": tool_name,
342
- "status": status,
343
- "message": f"πŸ”§ Executing: {tool_name.replace('_', ' ').title()}" if status == "running" else
344
- f"βœ“ Completed: {tool_name.replace('_', ' ').title()}" if status == "completed" else
345
- f"❌ Failed: {tool_name.replace('_', ' ').title()}"
346
- }
347
- progress_manager.emit(session_key, event_data)
348
-
349
- # Set progress callback on existing agent
350
- agent.progress_callback = progress_callback
351
-
352
  # Get the agent's actual session UUID for SSE routing
353
  actual_session_id = agent.session.session_id if hasattr(agent, 'session') and agent.session else "default"
354
- print(f"[SSE] Using agent session UUID: {actual_session_id}")
 
 
355
 
356
  try:
357
  # Agent's session memory should resolve file_path from context
@@ -365,8 +336,8 @@ async def run_analysis(
365
 
366
  logger.info(f"Follow-up analysis completed: {result.get('status')}")
367
 
368
- # Send completion event via SSE
369
- progress_manager.emit(session_key, {
370
  "type": "analysis_complete",
371
  "status": result.get("status"),
372
  "message": "βœ… Analysis completed successfully!"
@@ -440,37 +411,11 @@ async def run_analysis(
440
 
441
  logger.info(f"File saved successfully: {file.filename} ({os.path.getsize(temp_file_path)} bytes)")
442
 
443
- # Initialize progress tracking for this session
444
- session_key = session_id or "default"
445
- progress_store[session_key] = []
446
-
447
- def progress_callback(tool_name: str, status: str):
448
- """Callback to track progress and send SSE events"""
449
- # Store in legacy progress store
450
- progress_store[session_key].append({
451
- "tool": tool_name,
452
- "status": status,
453
- "timestamp": time.time()
454
- })
455
-
456
- # Emit event to progress_manager (synchronous, works in any context)
457
- event_type = "tool_executing" if status == "running" else "tool_completed" if status == "completed" else "tool_failed"
458
- event_data = {
459
- "type": event_type,
460
- "tool": tool_name,
461
- "status": status,
462
- "message": f"πŸ”§ Executing: {tool_name.replace('_', ' ').title()}" if status == "running" else
463
- f"βœ“ Completed: {tool_name.replace('_', ' ').title()}" if status == "completed" else
464
- f"❌ Failed: {tool_name.replace('_', ' ').title()}"
465
- }
466
- progress_manager.emit(session_key, event_data)
467
-
468
- # Set progress callback on existing agent
469
- agent.progress_callback = progress_callback
470
-
471
- # Get the agent's actual session UUID for SSE routing
472
  actual_session_id = agent.session.session_id if hasattr(agent, 'session') and agent.session else "default"
473
- print(f"[SSE] Using agent session UUID: {actual_session_id}")
 
 
474
 
475
  # Call existing agent logic
476
  logger.info(f"Starting analysis with task: {task_description}")
@@ -512,12 +457,11 @@ async def run_analysis(
512
 
513
  serializable_result = make_json_serializable(result)
514
 
515
- # Return result with progress tracking and ACTUAL session UUID for SSE
516
  return JSONResponse(
517
  content={
518
  "success": result.get("status") == "success",
519
  "result": serializable_result,
520
- "progress": progress_store.get(session_key, []),
521
  "session_id": actual_session_id, # Return UUID for SSE connection
522
  "metadata": {
523
  "filename": file.filename,
 
52
  # Agent itself is stateless - no conversation memory between requests
53
  agent: Optional[DataScienceCopilot] = None
54
 
 
 
 
55
  # SSE event queues for real-time streaming
56
  class ProgressEventManager:
57
  """Manages SSE connections and progress events for real-time updates."""
 
318
  logger.info(f"Follow-up request without file, using session memory")
319
  logger.info(f"Task: {task_description}")
320
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
321
  # Get the agent's actual session UUID for SSE routing
322
  actual_session_id = agent.session.session_id if hasattr(agent, 'session') and agent.session else "default"
323
+ print(f"[SSE] Follow-up using agent session UUID: {actual_session_id}")
324
+
325
+ # NO progress_callback - orchestrator emits directly to UUID
326
 
327
  try:
328
  # Agent's session memory should resolve file_path from context
 
336
 
337
  logger.info(f"Follow-up analysis completed: {result.get('status')}")
338
 
339
+ # Send completion event via SSE using actual session UUID
340
+ progress_manager.emit(actual_session_id, {
341
  "type": "analysis_complete",
342
  "status": result.get("status"),
343
  "message": "βœ… Analysis completed successfully!"
 
411
 
412
  logger.info(f"File saved successfully: {file.filename} ({os.path.getsize(temp_file_path)} bytes)")
413
 
414
+ # Get the agent's actual session UUID for SSE routing (BEFORE analyze())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
415
  actual_session_id = agent.session.session_id if hasattr(agent, 'session') and agent.session else "default"
416
+ print(f"[SSE] File upload using agent session UUID: {actual_session_id}")
417
+
418
+ # NO progress_callback - orchestrator emits directly to UUID
419
 
420
  # Call existing agent logic
421
  logger.info(f"Starting analysis with task: {task_description}")
 
457
 
458
  serializable_result = make_json_serializable(result)
459
 
460
+ # Return result with ACTUAL session UUID for SSE
461
  return JSONResponse(
462
  content={
463
  "success": result.get("status") == "success",
464
  "result": serializable_result,
 
465
  "session_id": actual_session_id, # Return UUID for SSE connection
466
  "metadata": {
467
  "filename": file.filename,