redhairedshanks1 committed on
Commit
f950b83
·
1 Parent(s): b19aecc

Update services/pipeline_executor.py

Browse files
Files changed (1) hide show
  1. services/pipeline_executor.py +112 -99
services/pipeline_executor.py CHANGED
@@ -50,21 +50,32 @@ def execute_pipeline_bedrock(
50
  tools = get_langchain_tools()
51
 
52
  system_instructions = """You are MasterLLM, a precise document processing agent.
 
53
  Execute the provided pipeline components in ORDER. For each component:
54
  1. Call the corresponding tool with exact parameters
55
  2. Wait for the result
56
  3. Move to next component
 
57
  IMPORTANT:
58
  - Follow the pipeline order strictly
59
  - Use the file_path provided for all file-based operations
60
  - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
61
- - At the end, call 'finalize' tool with complete results"""
 
 
 
 
 
 
 
 
62
 
63
  prompt = ChatPromptTemplate.from_messages([
64
  ("system", system_instructions),
65
- ("human", "Execute the pipeline for document: {file_path}"),
66
- ("human", "Pipeline to execute: {pipeline_json}"),
67
- MessagesPlaceholder(variable_name="agent_scratchpad"),
 
68
  ])
69
 
70
  agent = create_tool_calling_agent(llm, tools, prompt)
@@ -80,7 +91,7 @@ IMPORTANT:
80
  "input": f"Execute pipeline: {pipeline['pipeline_name']}",
81
  "file_path": file_path,
82
  "pipeline_json": json.dumps(pipeline, indent=2),
83
- "agent_scratchpad": [] # Add this
84
  })
85
 
86
  return result
@@ -108,23 +119,35 @@ def execute_pipeline_bedrock_streaming(
108
 
109
  tools = get_langchain_tools()
110
 
111
- # Fixed prompt template with required placeholders
112
- system_instructions = """You are MasterLLM, a precise document processing agent.
113
- Execute the provided pipeline components in ORDER. For each component:
114
- 1. Call the corresponding tool with exact parameters
115
- 2. Wait for the result
116
- 3. Move to next component
117
- IMPORTANT:
118
- - Follow the pipeline order strictly
119
- - Use the file_path provided for all file-based operations
120
- - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
121
- - At the end, call 'finalize' tool with complete results"""
 
 
 
 
 
 
 
 
 
 
 
 
122
 
123
  prompt = ChatPromptTemplate.from_messages([
124
  ("system", system_instructions),
125
- ("human", "Execute the pipeline for document: {file_path}"),
126
- ("human", "Pipeline to execute: {pipeline_json}"),
127
- MessagesPlaceholder(variable_name="agent_scratchpad"),
128
  ])
129
 
130
  agent = create_tool_calling_agent(llm, tools, prompt)
@@ -132,9 +155,9 @@ IMPORTANT:
132
  agent=agent,
133
  tools=tools,
134
  verbose=True,
135
- max_iterations=15,
136
  handle_parsing_errors=True,
137
- return_intermediate_steps=True,
138
  )
139
 
140
  # Yield initial status
@@ -144,103 +167,92 @@ IMPORTANT:
144
  "executor": "bedrock"
145
  }
146
 
147
- # Stream execution with proper inputs including agent_scratchpad
148
- try:
149
- # Initialize inputs
150
- inputs = {
151
- "input": f"Execute pipeline: {pipeline['pipeline_name']} for file: {file_path}",
152
- "file_path": file_path,
153
- "pipeline_json": json.dumps(pipeline, indent=2),
154
- "agent_scratchpad": [], # Initialize empty scratchpad
155
- }
156
-
157
- step_count = 0
158
-
159
- # Stream execution
160
- for event in executor.stream(inputs):
161
- # Handle different event types
162
- if "agent" in event:
163
- # Agent is thinking/acting
164
  step_count += 1
165
- action = event.get("agent", {})
166
- tool = action.get("tool", "thinking")
 
167
 
168
  yield {
169
  "type": "step",
170
  "step": step_count,
171
  "tool": tool,
172
  "status": "executing",
173
- "executor": "bedrock"
 
174
  }
175
-
176
- elif "actions" in event:
177
- # Multiple actions
178
- for action in event.get("actions", []):
179
- step_count += 1
180
- tool = getattr(action, "tool", "unknown")
181
- yield {
182
- "type": "step",
183
- "step": step_count,
184
- "tool": tool,
185
- "status": "executing",
186
- "executor": "bedrock"
187
- }
188
-
189
- elif "steps" in event:
190
- # Steps completed
191
- for step in event.get("steps", []):
192
- observation = str(getattr(step, "observation", ""))[:500]
193
  yield {
194
  "type": "step",
195
  "step": step_count,
 
196
  "status": "completed",
197
- "observation": observation,
198
  "executor": "bedrock"
199
  }
 
 
 
 
200
 
201
- elif "output" in event:
202
- # Final output
 
203
  yield {
204
- "type": "final",
205
- "data": event.get("output"),
206
- "executor": "bedrock"
 
207
  }
208
  return
209
 
210
- elif "intermediate_steps" in event:
211
- # Intermediate steps (if enabled)
212
- steps = event.get("intermediate_steps", [])
213
- for i, (action, observation) in enumerate(steps):
214
- yield {
215
- "type": "step",
216
- "step": i + 1,
217
- "tool": getattr(action, "tool", str(action)),
218
- "status": "completed",
219
- "observation": str(observation)[:500],
220
- "executor": "bedrock"
221
- }
222
-
223
- except Exception as stream_error:
224
- # If streaming fails, try non-streaming execution
225
- yield {
226
- "type": "warning",
227
- "message": f"Streaming failed, trying non-streaming: {str(stream_error)}",
228
- "executor": "bedrock"
229
- }
230
-
231
- # Fallback to non-streaming execution
232
- result = executor.invoke({
233
- "input": f"Execute pipeline: {pipeline['pipeline_name']}",
234
- "file_path": file_path,
235
- "pipeline_json": json.dumps(pipeline, indent=2),
236
- "agent_scratchpad": []
237
- })
238
-
239
- yield {
240
- "type": "final",
241
- "data": result.get("output"),
242
- "executor": "bedrock"
243
- }
244
 
245
  except Exception as e:
246
  yield {
@@ -249,6 +261,7 @@ IMPORTANT:
249
  "executor": "bedrock"
250
  }
251
 
 
252
  # ========================
253
  # CREWAI EXECUTOR (FALLBACK)
254
  # ========================
 
50
  tools = get_langchain_tools()
51
 
52
  system_instructions = """You are MasterLLM, a precise document processing agent.
53
+
54
  Execute the provided pipeline components in ORDER. For each component:
55
  1. Call the corresponding tool with exact parameters
56
  2. Wait for the result
57
  3. Move to next component
58
+
59
  IMPORTANT:
60
  - Follow the pipeline order strictly
61
  - Use the file_path provided for all file-based operations
62
  - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
63
+ - At the end, call 'finalize' tool with complete results
64
+
65
+ Pipeline components will be in format:
66
+ {
67
+ "tool_name": "extract_text",
68
+ "start_page": 1,
69
+ "end_page": 5,
70
+ "params": {}
71
+ }"""
72
 
73
  prompt = ChatPromptTemplate.from_messages([
74
  ("system", system_instructions),
75
+ ("system", "File path: {file_path}"),
76
+ ("system", "Pipeline to execute: {pipeline_json}"),
77
+ ("system", "Session ID: {session_id}"),
78
+ ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results.")
79
  ])
80
 
81
  agent = create_tool_calling_agent(llm, tools, prompt)
 
91
  "input": f"Execute pipeline: {pipeline['pipeline_name']}",
92
  "file_path": file_path,
93
  "pipeline_json": json.dumps(pipeline, indent=2),
94
+ "session_id": session_id or "unknown"
95
  })
96
 
97
  return result
 
119
 
120
  tools = get_langchain_tools()
121
 
122
+ # STRONGER prompt that FORCES tool usage
123
+ system_instructions = """You are MasterLLM, an AI agent that MUST execute document processing tools.
124
+
125
+ CRITICAL INSTRUCTIONS:
126
+ 1. You MUST call the tools provided to you - do NOT just describe what you would do
127
+ 2. Execute the pipeline components IN ORDER
128
+ 3. For each component in the pipeline, you MUST:
129
+ - Call the corresponding tool with the specified parameters
130
+ - Wait for the actual result from the tool
131
+ - Use the file_path provided for file operations
132
+ - Store the results to pass to the next component
133
+ 4. After ALL components are executed, call the 'finalize' tool with the collected results
134
+ 5. DO NOT generate placeholder text like "TEXT_EXTRACTION_RESULT" - call the actual tools!
135
+
136
+ The pipeline components are structured as:
137
+ {
138
+ "tool_name": "extract_text",
139
+ "start_page": 1,
140
+ "end_page": -1,
141
+ "params": {}
142
+ }
143
+
144
+ You must call tools, not generate descriptions. This is mandatory."""
145
 
146
  prompt = ChatPromptTemplate.from_messages([
147
  ("system", system_instructions),
148
+ ("system", "File to process: {file_path}"),
149
+ ("system", "Pipeline configuration: {pipeline_json}"),
150
+ ("human", "Execute ALL the tools in the pipeline. Call each tool and get real results. Do not describe, actually execute!")
151
  ])
152
 
153
  agent = create_tool_calling_agent(llm, tools, prompt)
 
155
  agent=agent,
156
  tools=tools,
157
  verbose=True,
158
+ max_iterations=25, # Increased for multi-step pipelines
159
  handle_parsing_errors=True,
160
+ return_intermediate_steps=True # Important: get intermediate results
161
  )
162
 
163
  # Yield initial status
 
167
  "executor": "bedrock"
168
  }
169
 
170
+ step_count = 0
171
+ tool_results = {}
172
+ has_called_tools = False
173
+
174
+ # Stream execution
175
+ for event in executor.stream({
176
+ "input": f"Execute the pipeline '{pipeline['pipeline_name']}' by calling each tool in the components list",
177
+ "file_path": file_path,
178
+ "pipeline_json": json.dumps(pipeline, indent=2)
179
+ }):
180
+ # Track tool actions
181
+ if "actions" in event:
182
+ for action in event.get("actions", []):
 
 
 
 
183
  step_count += 1
184
+ has_called_tools = True
185
+ tool = getattr(action, "tool", "unknown")
186
+ tool_input = getattr(action, "tool_input", {})
187
 
188
  yield {
189
  "type": "step",
190
  "step": step_count,
191
  "tool": tool,
192
  "status": "executing",
193
+ "executor": "bedrock",
194
+ "input": str(tool_input)[:200]
195
  }
196
+
197
+ # Track tool outputs
198
+ elif "steps" in event:
199
+ for step in event.get("steps", []):
200
+ action = getattr(step, "action", None)
201
+ observation = getattr(step, "observation", "")
202
+
203
+ if action:
204
+ tool_name = getattr(action, "tool", "unknown")
205
+ tool_results[tool_name] = observation
206
+
 
 
 
 
 
 
 
207
  yield {
208
  "type": "step",
209
  "step": step_count,
210
+ "tool": tool_name,
211
  "status": "completed",
212
+ "observation": str(observation)[:500],
213
  "executor": "bedrock"
214
  }
215
+
216
+ # Final output
217
+ elif "output" in event:
218
+ output = event.get("output")
219
 
220
+ # Check if tools were actually called
221
+ if not has_called_tools:
222
+ # Agent didn't call tools, just generated text - this is a failure
223
  yield {
224
+ "type": "error",
225
+ "error": "Bedrock agent generated text instead of calling tools. Falling back to CrewAI.",
226
+ "executor": "bedrock",
227
+ "debug_output": str(output)[:500]
228
  }
229
  return
230
 
231
+ # If we have tool results, structure them properly
232
+ if tool_results:
233
+ structured_result = {
234
+ "status": "completed",
235
+ "components_executed": tool_results,
236
+ "summary": {
237
+ "total_tools_called": len(tool_results),
238
+ "tools": list(tool_results.keys())
239
+ },
240
+ "final_output": output if isinstance(output, str) else json.dumps(output)
241
+ }
242
+
243
+ yield {
244
+ "type": "final",
245
+ "data": structured_result,
246
+ "executor": "bedrock"
247
+ }
248
+ else:
249
+ # No tool results collected, likely a problem
250
+ yield {
251
+ "type": "error",
252
+ "error": "No tool results collected from Bedrock execution",
253
+ "executor": "bedrock"
254
+ }
255
+ return
 
 
 
 
 
 
 
 
 
256
 
257
  except Exception as e:
258
  yield {
 
261
  "executor": "bedrock"
262
  }
263
 
264
+
265
  # ========================
266
  # CREWAI EXECUTOR (FALLBACK)
267
  # ========================