Pulastya B commited on
Commit
3a3eba7
Β·
1 Parent(s): 6815ac4

CRITICAL FIX: Sync session IDs between HTTP and orchestrator for SSE

Browse files

Root cause: Session ID mismatch!
- Frontend connects to SSE with session_id='1' (from HTTP request)
- Orchestrator emits events to session.session_id='uuid-...' (internal UUID)
- Events went to wrong stream, frontend received nothing

Solution:
- Added http_session_key property to orchestrator
- app.py sets agent.http_session_key = session_key before analyze()
- All emit() calls now use: getattr(self, 'http_session_key', None) or self.session.session_id
- Events now emit to correct stream that frontend is listening to

This ensures orchestrator emits to the SAME session_id that frontend connects to

Files changed (2) hide show
  1. src/api/app.py +6 -0
  2. src/orchestrator.py +21 -6
src/api/app.py CHANGED
@@ -327,6 +327,9 @@ async def run_analysis(
327
  # Set progress callback on existing agent
328
  agent.progress_callback = progress_callback
329
 
 
 
 
330
  try:
331
  # Agent's session memory should resolve file_path from context
332
  result = agent.analyze(
@@ -442,6 +445,9 @@ async def run_analysis(
442
  # Set progress callback on existing agent
443
  agent.progress_callback = progress_callback
444
 
 
 
 
445
  # Call existing agent logic
446
  logger.info(f"Starting analysis with task: {task_description}")
447
  result = agent.analyze(
 
327
  # Set progress callback on existing agent
328
  agent.progress_callback = progress_callback
329
 
330
+ # Set HTTP session key for SSE streaming (so orchestrator emits to correct stream)
331
+ agent.http_session_key = session_key
332
+
333
  try:
334
  # Agent's session memory should resolve file_path from context
335
  result = agent.analyze(
 
445
  # Set progress callback on existing agent
446
  agent.progress_callback = progress_callback
447
 
448
+ # Set HTTP session key for SSE streaming (so orchestrator emits to correct stream)
449
+ agent.http_session_key = session_key
450
+
451
  # Call existing agent logic
452
  logger.info(f"Starting analysis with task: {task_description}")
453
  result = agent.analyze(
src/orchestrator.py CHANGED
@@ -165,6 +165,9 @@ class DataScienceCopilot:
165
  # Store progress callback
166
  self.progress_callback = progress_callback
167
 
 
 
 
168
  # Determine provider
169
  self.provider = provider or os.getenv("LLM_PROVIDER", "mistral").lower()
170
 
@@ -2238,7 +2241,9 @@ You are a DOER. Complete workflows based on user intent."""
2238
 
2239
  # Emit token update for SSE streaming
2240
  if hasattr(self, 'session') and self.session:
2241
- progress_manager.emit(self.session.session_id, {
 
 
2242
  'type': 'token_update',
2243
  'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2244
  'tokens_used': tokens_used,
@@ -2278,7 +2283,9 @@ You are a DOER. Complete workflows based on user intent."""
2278
 
2279
  # Emit token update for SSE streaming
2280
  if hasattr(self, 'session') and self.session:
2281
- progress_manager.emit(self.session.session_id, {
 
 
2282
  'type': 'token_update',
2283
  'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2284
  'tokens_used': tokens_used,
@@ -2342,7 +2349,9 @@ You are a DOER. Complete workflows based on user intent."""
2342
 
2343
  # Emit token update for SSE streaming
2344
  if hasattr(self, 'session') and self.session:
2345
- progress_manager.emit(self.session.session_id, {
 
 
2346
  'type': 'token_update',
2347
  'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2348
  'tokens_used': tokens_used,
@@ -3020,7 +3029,9 @@ You are a DOER. Complete workflows based on user intent."""
3020
 
3021
  # Emit progress event for SSE streaming
3022
  if hasattr(self, 'session') and self.session:
3023
- progress_manager.emit(self.session.session_id, {
 
 
3024
  'type': 'tool_executing',
3025
  'tool': tool_name,
3026
  'message': f"πŸ”§ Executing: {tool_name}",
@@ -3040,7 +3051,9 @@ You are a DOER. Complete workflows based on user intent."""
3040
 
3041
  # Emit failure event for SSE streaming
3042
  if hasattr(self, 'session') and self.session:
3043
- progress_manager.emit(self.session.session_id, {
 
 
3044
  'type': 'tool_failed',
3045
  'tool': tool_name,
3046
  'message': f"❌ FAILED: {tool_name}",
@@ -3114,7 +3127,9 @@ You are a DOER. Complete workflows based on user intent."""
3114
 
3115
  # Emit completion event for SSE streaming
3116
  if hasattr(self, 'session') and self.session:
3117
- progress_manager.emit(self.session.session_id, {
 
 
3118
  'type': 'tool_completed',
3119
  'tool': tool_name,
3120
  'message': f"βœ“ Completed: {tool_name}"
 
165
  # Store progress callback
166
  self.progress_callback = progress_callback
167
 
168
+ # Store HTTP session key for SSE streaming (set by app.py)
169
+ self.http_session_key = None
170
+
171
  # Determine provider
172
  self.provider = provider or os.getenv("LLM_PROVIDER", "mistral").lower()
173
 
 
2241
 
2242
  # Emit token update for SSE streaming
2243
  if hasattr(self, 'session') and self.session:
2244
+ # Use HTTP session key if set, otherwise use internal session UUID
2245
+ session_key = getattr(self, 'http_session_key', None) or self.session.session_id
2246
+ progress_manager.emit(session_key, {
2247
  'type': 'token_update',
2248
  'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2249
  'tokens_used': tokens_used,
 
2283
 
2284
  # Emit token update for SSE streaming
2285
  if hasattr(self, 'session') and self.session:
2286
+ # Use HTTP session key if set, otherwise use internal session UUID
2287
+ session_key = getattr(self, 'http_session_key', None) or self.session.session_id
2288
+ progress_manager.emit(session_key, {
2289
  'type': 'token_update',
2290
  'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2291
  'tokens_used': tokens_used,
 
2349
 
2350
  # Emit token update for SSE streaming
2351
  if hasattr(self, 'session') and self.session:
2352
+ # Use HTTP session key if set, otherwise use internal session UUID
2353
+ session_key = getattr(self, 'http_session_key', None) or self.session.session_id
2354
+ progress_manager.emit(session_key, {
2355
  'type': 'token_update',
2356
  'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2357
  'tokens_used': tokens_used,
 
3029
 
3030
  # Emit progress event for SSE streaming
3031
  if hasattr(self, 'session') and self.session:
3032
+ # Use HTTP session key if set, otherwise use internal session UUID
3033
+ session_key = getattr(self, 'http_session_key', None) or self.session.session_id
3034
+ progress_manager.emit(session_key, {
3035
  'type': 'tool_executing',
3036
  'tool': tool_name,
3037
  'message': f"πŸ”§ Executing: {tool_name}",
 
3051
 
3052
  # Emit failure event for SSE streaming
3053
  if hasattr(self, 'session') and self.session:
3054
+ # Use HTTP session key if set, otherwise use internal session UUID
3055
+ session_key = getattr(self, 'http_session_key', None) or self.session.session_id
3056
+ progress_manager.emit(session_key, {
3057
  'type': 'tool_failed',
3058
  'tool': tool_name,
3059
  'message': f"❌ FAILED: {tool_name}",
 
3127
 
3128
  # Emit completion event for SSE streaming
3129
  if hasattr(self, 'session') and self.session:
3130
+ # Use HTTP session key if set, otherwise use internal session UUID
3131
+ session_key = getattr(self, 'http_session_key', None) or self.session.session_id
3132
+ progress_manager.emit(session_key, {
3133
  'type': 'tool_completed',
3134
  'tool': tool_name,
3135
  'message': f"βœ“ Completed: {tool_name}"