Pulastya B commited on
Commit
109a48b
·
1 Parent(s): c84c9dd

CRITICAL FIX: Implement real SSE event streaming with ProgressManager

Browse files

- Created src/progress_manager.py with global event broadcaster
- Updated progress_callback to emit events via progress_manager
- Fixed SSE endpoint to stream events from progress_manager.subscribe()
- Updated frontend to handle onmessage events with proper debugging
- Added comprehensive logging: connection, received events, cleanup
- Events now properly flow: backend print -> emit -> SSE stream -> frontend

FRRONTEEEND/components/ChatInterface.tsx CHANGED
@@ -66,6 +66,7 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
66
  if (!isTyping) {
67
  // Close SSE connection when workflow completes
68
  if (eventSourceRef.current) {
 
69
  eventSourceRef.current.close();
70
  eventSourceRef.current = null;
71
  }
@@ -82,50 +83,36 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
82
  console.log('✅ SSE connection established');
83
  };
84
 
85
- // Handle connection event
86
- eventSource.addEventListener('connected', (e) => {
87
- console.log('Connected to progress stream:', e.data);
88
- });
89
-
90
- // Handle tool start events
91
- eventSource.addEventListener('tool_start', (e) => {
92
- try {
93
- const data = JSON.parse(e.data);
94
- setCurrentStep(data.message || `🔧 Executing: ${data.tool}`);
95
- } catch (err) {
96
- console.error('Error parsing tool_start event:', err);
97
- }
98
- });
99
-
100
- // Handle tool complete events
101
- eventSource.addEventListener('tool_complete', (e) => {
102
  try {
103
  const data = JSON.parse(e.data);
104
- setCurrentStep(data.message || `✓ Completed: ${data.tool}`);
105
- } catch (err) {
106
- console.error('Error parsing tool_complete event:', err);
107
- }
108
- });
109
-
110
- // Handle tool error events
111
- eventSource.addEventListener('tool_error', (e) => {
112
- try {
113
- const data = JSON.parse(e.data);
114
- setCurrentStep(data.message || `❌ Failed: ${data.tool}`);
 
 
 
 
 
 
115
  } catch (err) {
116
- console.error('Error parsing tool_error event:', err);
117
  }
118
- });
119
-
120
- // Handle analysis completion
121
- eventSource.addEventListener('analysis_complete', (e) => {
122
- console.log('✅ Analysis completed');
123
- setIsTyping(false); // This will trigger cleanup
124
- });
125
 
126
  // Handle errors
127
  eventSource.onerror = (err) => {
128
- console.error('SSE error:', err);
129
  eventSource.close();
130
  eventSourceRef.current = null;
131
  };
@@ -135,6 +122,7 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
135
  // Cleanup on unmount or when isTyping changes to false
136
  return () => {
137
  if (eventSourceRef.current) {
 
138
  eventSourceRef.current.close();
139
  eventSourceRef.current = null;
140
  }
 
66
  if (!isTyping) {
67
  // Close SSE connection when workflow completes
68
  if (eventSourceRef.current) {
69
+ console.log('🔌 Closing SSE connection');
70
  eventSourceRef.current.close();
71
  eventSourceRef.current = null;
72
  }
 
83
  console.log('✅ SSE connection established');
84
  };
85
 
86
+ // Handle all incoming messages
87
+ eventSource.onmessage = (e) => {
88
+ console.log('📨 SSE received:', e.data);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  try {
90
  const data = JSON.parse(e.data);
91
+
92
+ // Handle different event types
93
+ if (data.type === 'connected') {
94
+ console.log('🔗 Connected to progress stream');
95
+ } else if (data.type === 'tool_executing') {
96
+ setCurrentStep(data.message || `🔧 Executing: ${data.tool}`);
97
+ } else if (data.type === 'tool_completed') {
98
+ setCurrentStep(data.message || `✓ Completed: ${data.tool}`);
99
+ } else if (data.type === 'tool_failed') {
100
+ setCurrentStep(data.message || `❌ Failed: ${data.tool}`);
101
+ } else if (data.type === 'token_update') {
102
+ // Optional: Display token budget updates
103
+ console.log('💰 Token update:', data.message);
104
+ } else if (data.type === 'analysis_complete') {
105
+ console.log('✅ Analysis completed');
106
+ setIsTyping(false); // This will trigger cleanup
107
+ }
108
  } catch (err) {
109
+ console.error('Error parsing SSE event:', err, e.data);
110
  }
111
+ };
 
 
 
 
 
 
112
 
113
  // Handle errors
114
  eventSource.onerror = (err) => {
115
+ console.error('SSE error:', err);
116
  eventSource.close();
117
  eventSourceRef.current = null;
118
  };
 
122
  // Cleanup on unmount or when isTyping changes to false
123
  return () => {
124
  if (eventSourceRef.current) {
125
+ console.log('🧹 Cleaning up SSE connection');
126
  eventSourceRef.current.close();
127
  eventSourceRef.current = null;
128
  }
src/api/app.py CHANGED
@@ -26,6 +26,7 @@ import json
26
 
27
  # Import from parent package
28
  from src.orchestrator import DataScienceCopilot
 
29
 
30
  # Configure logging
31
  logging.basicConfig(level=logging.INFO)
@@ -181,51 +182,46 @@ async def get_progress(session_id: str):
181
  async def stream_progress(session_id: str):
182
  """Stream real-time progress updates using Server-Sent Events (SSE).
183
 
184
- This replaces the polling mechanism with a persistent connection that
185
- receives events as they happen during workflow execution.
186
 
187
  Events:
188
- - tool_start: When a tool begins execution
189
- - tool_complete: When a tool finishes successfully
190
- - tool_error: When a tool fails
 
191
  - analysis_complete: When the entire workflow finishes
192
- - status_update: General status messages
193
  """
194
  async def event_generator():
195
- queue = event_manager.create_stream(session_id)
196
-
197
  try:
198
  # Send initial connection event
199
- yield f"event: connected\ndata: {{\"session_id\": \"{session_id}\"}}\n\n"
 
 
 
 
 
200
 
201
- # Send current status if exists
202
- current = event_manager.get_current_status(session_id)
203
- if current:
204
- yield f"event: {current['type']}\ndata: {json.dumps(current['data'])}\n\n"
205
 
206
- # Stream events as they arrive
207
- while True:
208
- try:
209
- # Wait for next event with timeout
210
- event_type, data = await asyncio.wait_for(queue.get(), timeout=30.0)
211
-
212
- # Check for completion signal
213
- if event_type == "complete":
214
- yield f"event: analysis_complete\ndata: {{\"status\": \"completed\"}}\n\n"
215
- break
216
-
217
- # Send the event
218
- yield f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
219
-
220
- except asyncio.TimeoutError:
221
- # Send keepalive ping
222
- yield f": keepalive\n\n"
223
- continue
224
 
225
  except asyncio.CancelledError:
226
  logger.info(f"SSE stream cancelled for session {session_id}")
 
 
 
227
  finally:
228
- event_manager.remove_stream(session_id, queue)
229
 
230
  return StreamingResponse(
231
  event_generator(),
@@ -316,23 +312,17 @@ async def run_analysis(
316
  "timestamp": time.time()
317
  })
318
 
319
- # Send SSE event asynchronously
320
- event_type = "tool_start" if status == "running" else "tool_complete" if status == "completed" else "tool_error"
321
  event_data = {
 
322
  "tool": tool_name,
323
  "status": status,
324
  "message": f"🔧 Executing: {tool_name.replace('_', ' ').title()}" if status == "running" else
325
  f"✓ Completed: {tool_name.replace('_', ' ').title()}" if status == "completed" else
326
- f"❌ Failed: {tool_name.replace('_', ' ').title()}",
327
- "timestamp": time.time()
328
  }
329
-
330
- # Schedule the async event send
331
- try:
332
- asyncio.create_task(event_manager.send_event(session_key, event_type, event_data))
333
- except RuntimeError:
334
- # If no event loop, we're in sync context - that's ok, legacy polling still works
335
- pass
336
 
337
  # Set progress callback on existing agent
338
  agent.progress_callback = progress_callback
@@ -350,18 +340,11 @@ async def run_analysis(
350
  logger.info(f"Follow-up analysis completed: {result.get('status')}")
351
 
352
  # Send completion event via SSE
353
- try:
354
- asyncio.create_task(event_manager.send_event(
355
- session_key,
356
- "analysis_complete",
357
- {
358
- "status": result.get("status"),
359
- "message": "✅ Analysis completed successfully!",
360
- "timestamp": time.time()
361
- }
362
- ))
363
- except RuntimeError:
364
- pass
365
 
366
  # Make result JSON serializable
367
  def make_json_serializable(obj):
@@ -444,23 +427,17 @@ async def run_analysis(
444
  "timestamp": time.time()
445
  })
446
 
447
- # Send SSE event asynchronously
448
- event_type = "tool_start" if status == "running" else "tool_complete" if status == "completed" else "tool_error"
449
  event_data = {
 
450
  "tool": tool_name,
451
  "status": status,
452
  "message": f"🔧 Executing: {tool_name.replace('_', ' ').title()}" if status == "running" else
453
  f"✓ Completed: {tool_name.replace('_', ' ').title()}" if status == "completed" else
454
- f"❌ Failed: {tool_name.replace('_', ' ').title()}",
455
- "timestamp": time.time()
456
  }
457
-
458
- # Schedule the async event send
459
- try:
460
- asyncio.create_task(event_manager.send_event(session_key, event_type, event_data))
461
- except RuntimeError:
462
- # If no event loop, we're in sync context - that's ok, legacy polling still works
463
- pass
464
 
465
  # Set progress callback on existing agent
466
  agent.progress_callback = progress_callback
@@ -478,18 +455,11 @@ async def run_analysis(
478
  logger.info(f"Analysis completed: {result.get('status')}")
479
 
480
  # Send completion event via SSE
481
- try:
482
- asyncio.create_task(event_manager.send_event(
483
- session_key,
484
- "analysis_complete",
485
- {
486
- "status": result.get("status"),
487
- "message": "✅ Analysis completed successfully!",
488
- "timestamp": time.time()
489
- }
490
- ))
491
- except RuntimeError:
492
- pass
493
 
494
  # Filter out non-JSON-serializable objects (like matplotlib/plotly Figures)
495
  def make_json_serializable(obj):
 
26
 
27
  # Import from parent package
28
  from src.orchestrator import DataScienceCopilot
29
+ from src.progress_manager import progress_manager
30
 
31
  # Configure logging
32
  logging.basicConfig(level=logging.INFO)
 
182
  async def stream_progress(session_id: str):
183
  """Stream real-time progress updates using Server-Sent Events (SSE).
184
 
185
+ This endpoint connects clients to the global progress_manager which
186
+ receives events from the orchestrator as tools execute.
187
 
188
  Events:
189
+ - tool_executing: When a tool begins execution
190
+ - tool_completed: When a tool finishes successfully
191
+ - tool_failed: When a tool fails
192
+ - token_update: Token budget updates
193
  - analysis_complete: When the entire workflow finishes
 
194
  """
195
  async def event_generator():
 
 
196
  try:
197
  # Send initial connection event
198
+ connection_event = {
199
+ 'type': 'connected',
200
+ 'message': '🔗 Connected to progress stream',
201
+ 'session_id': session_id
202
+ }
203
+ yield f"data: {json.dumps(connection_event)}\n\n"
204
 
205
+ # Send any existing history first (for reconnections)
206
+ history = progress_manager.get_history(session_id)
207
+ for event in history[-10:]: # Send last 10 events
208
+ yield f"data: {json.dumps(event)}\n\n"
209
 
210
+ # Stream new events as they occur
211
+ async for event in progress_manager.subscribe(session_id):
212
+ yield f"data: {json.dumps(event)}\n\n"
213
+
214
+ # Check if analysis is complete
215
+ if event.get('type') == 'analysis_complete':
216
+ break
 
 
 
 
 
 
 
 
 
 
 
217
 
218
  except asyncio.CancelledError:
219
  logger.info(f"SSE stream cancelled for session {session_id}")
220
+ progress_manager.clear(session_id)
221
+ except Exception as e:
222
+ logger.error(f"SSE error for session {session_id}: {e}")
223
  finally:
224
+ logger.info(f"SSE stream closed for session {session_id}")
225
 
226
  return StreamingResponse(
227
  event_generator(),
 
312
  "timestamp": time.time()
313
  })
314
 
315
+ # Emit event to progress_manager (synchronous, works in any context)
316
+ event_type = "tool_executing" if status == "running" else "tool_completed" if status == "completed" else "tool_failed"
317
  event_data = {
318
+ "type": event_type,
319
  "tool": tool_name,
320
  "status": status,
321
  "message": f"🔧 Executing: {tool_name.replace('_', ' ').title()}" if status == "running" else
322
  f"✓ Completed: {tool_name.replace('_', ' ').title()}" if status == "completed" else
323
+ f"❌ Failed: {tool_name.replace('_', ' ').title()}"
 
324
  }
325
+ progress_manager.emit(session_key, event_data)
 
 
 
 
 
 
326
 
327
  # Set progress callback on existing agent
328
  agent.progress_callback = progress_callback
 
340
  logger.info(f"Follow-up analysis completed: {result.get('status')}")
341
 
342
  # Send completion event via SSE
343
+ progress_manager.emit(session_key, {
344
+ "type": "analysis_complete",
345
+ "status": result.get("status"),
346
+ "message": "✅ Analysis completed successfully!"
347
+ })
 
 
 
 
 
 
 
348
 
349
  # Make result JSON serializable
350
  def make_json_serializable(obj):
 
427
  "timestamp": time.time()
428
  })
429
 
430
+ # Emit event to progress_manager (synchronous, works in any context)
431
+ event_type = "tool_executing" if status == "running" else "tool_completed" if status == "completed" else "tool_failed"
432
  event_data = {
433
+ "type": event_type,
434
  "tool": tool_name,
435
  "status": status,
436
  "message": f"🔧 Executing: {tool_name.replace('_', ' ').title()}" if status == "running" else
437
  f"✓ Completed: {tool_name.replace('_', ' ').title()}" if status == "completed" else
438
+ f"❌ Failed: {tool_name.replace('_', ' ').title()}"
 
439
  }
440
+ progress_manager.emit(session_key, event_data)
 
 
 
 
 
 
441
 
442
  # Set progress callback on existing agent
443
  agent.progress_callback = progress_callback
 
455
  logger.info(f"Analysis completed: {result.get('status')}")
456
 
457
  # Send completion event via SSE
458
+ progress_manager.emit(session_key, {
459
+ "type": "analysis_complete",
460
+ "status": result.get("status"),
461
+ "message": "✅ Analysis completed successfully!"
462
+ })
 
 
 
 
 
 
 
463
 
464
  # Filter out non-JSON-serializable objects (like matplotlib/plotly Figures)
465
  def make_json_serializable(obj):
src/progress_manager.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Global Progress Event Manager for Real-Time SSE Streaming
3
+
4
+ This module provides a singleton ProgressManager that captures all workflow progress
5
+ events and broadcasts them to connected SSE clients in real-time.
6
+ """
7
+
8
+ import asyncio
9
+ import json
10
+ from typing import Dict, List, Any, Optional
11
+ from datetime import datetime
12
+ from collections import defaultdict
13
+
14
+
15
class ProgressManager:
    """
    Manages progress events for active analysis sessions.

    Features:
    - Emit events to multiple subscribers simultaneously
    - Store event history for late-joining clients
    - Automatic cleanup of dead (full) subscriber queues

    NOTE(review): emit() mutates plain dicts/lists without locking. That is
    safe while every caller runs on the single event-loop thread; confirm
    before calling emit() from worker threads.
    """

    # Cap on stored events per session so long-running sessions cannot
    # grow memory unboundedly.
    MAX_HISTORY = 500

    def __init__(self):
        # session_id -> list of per-subscriber asyncio queues
        self._queues: Dict[str, List[asyncio.Queue]] = defaultdict(list)
        # session_id -> ordered list of past events (served to late joiners)
        self._history: Dict[str, List[Dict]] = defaultdict(list)

    def emit(self, session_id: str, event: Dict[str, Any]) -> None:
        """
        Emit a progress event to all subscribers of a session.

        Synchronous on purpose: callable from sync callbacks without an
        event loop. The event dict is mutated in place to add a timestamp.

        Args:
            session_id: Session identifier
            event: Event data (should include 'type' and 'message')
        """
        event['timestamp'] = datetime.now().isoformat()

        # Record in history, trimming in place to the configured cap.
        history = self._history[session_id]
        history.append(event)
        if len(history) > self.MAX_HISTORY:
            del history[:-self.MAX_HISTORY]

        # Fan out to active subscribers; a full queue means the client has
        # stopped draining, so drop that subscriber rather than block.
        if session_id in self._queues:
            dead_queues = []
            for queue in self._queues[session_id]:
                try:
                    queue.put_nowait(event)
                except asyncio.QueueFull:
                    dead_queues.append(queue)
            for dead_queue in dead_queues:
                if dead_queue in self._queues[session_id]:
                    self._queues[session_id].remove(dead_queue)

    async def subscribe(self, session_id: str):
        """
        Subscribe to progress events for a session.

        Args:
            session_id: Session identifier

        Yields:
            Progress events as they occur. The queue is bounded (100) so a
            stalled consumer gets disconnected by emit() instead of
            backing up the producer.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        self._queues[session_id].append(queue)

        try:
            while True:
                yield await queue.get()
        except asyncio.CancelledError:
            # Client disconnected mid-wait; fall through to cleanup.
            pass
        finally:
            # Detach this subscriber. clear() may already have popped the
            # session, so guard both lookups.
            subscribers = self._queues.get(session_id)
            if subscribers and queue in subscribers:
                subscribers.remove(queue)

    def get_history(self, session_id: str) -> List[Dict]:
        """
        Get all stored past events for a session.

        Args:
            session_id: Session identifier

        Returns:
            List of past events (empty list for unknown sessions).
        """
        return self._history.get(session_id, [])

    def clear(self, session_id: str) -> None:
        """
        Clear history and notify all subscribers that the session ended.

        Args:
            session_id: Session identifier
        """
        self._history.pop(session_id, None)
        queues = self._queues.pop(session_id, None)
        if queues:
            # Best-effort sentinel so live subscribers see the session end;
            # a full queue just means that client was already stalled.
            sentinel = {'type': 'session_cleared', 'message': 'Session ended'}
            for queue in queues:
                try:
                    queue.put_nowait(sentinel)
                except asyncio.QueueFull:
                    pass

    def get_active_sessions(self) -> List[str]:
        """Get list of session ids that currently have subscribers."""
        return [sid for sid, queues in self._queues.items() if queues]

    def get_subscriber_count(self, session_id: str) -> int:
        """Get number of active subscribers for a session."""
        return len(self._queues.get(session_id, []))
128
+
129
+
130
# Global singleton instance, shared by the API layer (SSE endpoint) and the
# progress callbacks so both sides talk to the same queues/history.
progress_manager = ProgressManager()