Pulastya B commited on
Commit
6815ac4
Β·
1 Parent(s): f34a4e8

Wire progress_manager into orchestrator with emit() calls

Browse files

- Added progress_manager import
- Emit tool_executing events when tools start
- Emit tool_completed events when tools succeed
- Emit tool_failed events when tools fail with error details
- Emit token_update events for all 3 LLM API calls (Mistral + 2 Groq paths)
- All events include session_id, type, message, and relevant metadata
- Real-time SSE streaming now fully functional

Files changed (1) hide show
  1. src/orchestrator.py +58 -0
src/orchestrator.py CHANGED
@@ -21,6 +21,7 @@ from .session_memory import SessionMemory
21
  from .session_store import SessionStore
22
  from .workflow_state import WorkflowState
23
  from .utils.schema_extraction import extract_schema_local, infer_task_type
 
24
  from .tools import (
25
  # Basic Tools (13) - UPDATED: Added get_smart_summary + 3 wrangling tools
26
  profile_dataset,
@@ -2234,6 +2235,16 @@ You are a DOER. Complete workflows based on user intent."""
2234
  tokens_used = response.usage.total_tokens
2235
  self.tokens_this_minute += tokens_used
2236
  print(f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
 
 
 
 
 
 
 
 
 
 
2237
 
2238
  response_message = response.choices[0].message
2239
  tool_calls = response_message.tool_calls
@@ -2264,6 +2275,16 @@ You are a DOER. Complete workflows based on user intent."""
2264
  tokens_used = response.usage.total_tokens
2265
  self.tokens_this_minute += tokens_used
2266
  print(f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
 
 
 
 
 
 
 
 
 
 
2267
 
2268
  response_message = response.choices[0].message
2269
  tool_calls = response_message.tool_calls
@@ -2318,6 +2339,16 @@ You are a DOER. Complete workflows based on user intent."""
2318
  tokens_used = response.usage.total_tokens
2319
  self.tokens_this_minute += tokens_used
2320
  print(f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
 
 
 
 
 
 
 
 
 
 
2321
 
2322
  response_message = response.choices[0].message
2323
  tool_calls = response_message.tool_calls
@@ -2987,6 +3018,15 @@ You are a DOER. Complete workflows based on user intent."""
2987
  except:
2988
  print(f" Arguments: {tool_args}")
2989
 
 
 
 
 
 
 
 
 
 
2990
  # Execute tool
2991
  tool_result = self._execute_tool(tool_name, tool_args)
2992
 
@@ -2998,6 +3038,16 @@ You are a DOER. Complete workflows based on user intent."""
2998
  print(f" ⚠️ Error Type: {error_type}")
2999
  print(f" ⚠️ Error Message: {error_msg}")
3000
 
 
 
 
 
 
 
 
 
 
 
3001
  # Add recovery guidance with last successful file
3002
  last_successful_file = self._get_last_successful_file(workflow_history)
3003
  if last_successful_file:
@@ -3061,6 +3111,14 @@ You are a DOER. Complete workflows based on user intent."""
3061
  print(f" {error_msg}\n")
3062
  else:
3063
  print(f" βœ“ Completed: {tool_name}")
 
 
 
 
 
 
 
 
3064
 
3065
  # Track in workflow
3066
  workflow_history.append({
 
21
  from .session_store import SessionStore
22
  from .workflow_state import WorkflowState
23
  from .utils.schema_extraction import extract_schema_local, infer_task_type
24
+ from .progress_manager import progress_manager
25
  from .tools import (
26
  # Basic Tools (13) - UPDATED: Added get_smart_summary + 3 wrangling tools
27
  profile_dataset,
 
2235
  tokens_used = response.usage.total_tokens
2236
  self.tokens_this_minute += tokens_used
2237
  print(f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
2238
+
2239
+ # Emit token update for SSE streaming
2240
+ if hasattr(self, 'session') and self.session:
2241
+ progress_manager.emit(self.session.session_id, {
2242
+ 'type': 'token_update',
2243
+ 'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2244
+ 'tokens_used': tokens_used,
2245
+ 'tokens_this_minute': self.tokens_this_minute,
2246
+ 'tpm_limit': self.tpm_limit
2247
+ })
2248
 
2249
  response_message = response.choices[0].message
2250
  tool_calls = response_message.tool_calls
 
2275
  tokens_used = response.usage.total_tokens
2276
  self.tokens_this_minute += tokens_used
2277
  print(f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
2278
+
2279
+ # Emit token update for SSE streaming
2280
+ if hasattr(self, 'session') and self.session:
2281
+ progress_manager.emit(self.session.session_id, {
2282
+ 'type': 'token_update',
2283
+ 'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2284
+ 'tokens_used': tokens_used,
2285
+ 'tokens_this_minute': self.tokens_this_minute,
2286
+ 'tpm_limit': self.tpm_limit
2287
+ })
2288
 
2289
  response_message = response.choices[0].message
2290
  tool_calls = response_message.tool_calls
 
2339
  tokens_used = response.usage.total_tokens
2340
  self.tokens_this_minute += tokens_used
2341
  print(f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute")
2342
+
2343
+ # Emit token update for SSE streaming
2344
+ if hasattr(self, 'session') and self.session:
2345
+ progress_manager.emit(self.session.session_id, {
2346
+ 'type': 'token_update',
2347
+ 'message': f"πŸ“Š Tokens: {tokens_used} this call | {self.tokens_this_minute}/{self.tpm_limit} this minute",
2348
+ 'tokens_used': tokens_used,
2349
+ 'tokens_this_minute': self.tokens_this_minute,
2350
+ 'tpm_limit': self.tpm_limit
2351
+ })
2352
 
2353
  response_message = response.choices[0].message
2354
  tool_calls = response_message.tool_calls
 
3018
  except:
3019
  print(f" Arguments: {tool_args}")
3020
 
3021
+ # Emit progress event for SSE streaming
3022
+ if hasattr(self, 'session') and self.session:
3023
+ progress_manager.emit(self.session.session_id, {
3024
+ 'type': 'tool_executing',
3025
+ 'tool': tool_name,
3026
+ 'message': f"πŸ”§ Executing: {tool_name}",
3027
+ 'arguments': tool_args
3028
+ })
3029
+
3030
  # Execute tool
3031
  tool_result = self._execute_tool(tool_name, tool_args)
3032
 
 
3038
  print(f" ⚠️ Error Type: {error_type}")
3039
  print(f" ⚠️ Error Message: {error_msg}")
3040
 
3041
+ # Emit failure event for SSE streaming
3042
+ if hasattr(self, 'session') and self.session:
3043
+ progress_manager.emit(self.session.session_id, {
3044
+ 'type': 'tool_failed',
3045
+ 'tool': tool_name,
3046
+ 'message': f"❌ FAILED: {tool_name}",
3047
+ 'error': error_msg,
3048
+ 'error_type': error_type
3049
+ })
3050
+
3051
  # Add recovery guidance with last successful file
3052
  last_successful_file = self._get_last_successful_file(workflow_history)
3053
  if last_successful_file:
 
3111
  print(f" {error_msg}\n")
3112
  else:
3113
  print(f" βœ“ Completed: {tool_name}")
3114
+
3115
+ # Emit completion event for SSE streaming
3116
+ if hasattr(self, 'session') and self.session:
3117
+ progress_manager.emit(self.session.session_id, {
3118
+ 'type': 'tool_completed',
3119
+ 'tool': tool_name,
3120
+ 'message': f"βœ“ Completed: {tool_name}"
3121
+ })
3122
 
3123
  # Track in workflow
3124
  workflow_history.append({