redhairedshanks1 committed on
Commit
199576c
·
1 Parent(s): c895fea

LangChain sends the request in a specific format that Bedrock doesn't accept, so the request never reaches Mistral; switched to calling Bedrock manually.

Browse files
Files changed (1) hide show
  1. services/pipeline_executor.py +160 -93
services/pipeline_executor.py CHANGED
@@ -108,109 +108,148 @@ def execute_pipeline_bedrock_streaming(
108
  session_id: Optional[str] = None
109
  ) -> Generator[Dict[str, Any], None, None]:
110
  """
111
- Execute pipeline using Bedrock + LangChain with ReAct agent (works with Mistral)
112
  """
113
  if not BEDROCK_AVAILABLE:
114
  raise RuntimeError("Bedrock LangChain not available")
115
 
116
  try:
117
- # Use Mistral (the only model you have access to)
118
- llm = ChatBedrock(
119
- model_id="mistral.mistral-large-2402-v1:0",
120
- region_name=os.getenv("AWS_REGION", "us-east-1"),
121
- model_kwargs={
122
- "temperature": 0.0,
123
- "max_tokens": 4096
124
- }
125
  )
126
 
127
- tools = get_langchain_tools()
128
 
129
- # ReAct prompt template - uses text-based reasoning
130
- react_prompt = PromptTemplate.from_template("""You are MasterLLM, a document processing assistant that executes tools step-by-step.
 
 
 
 
 
 
 
 
131
 
132
  You have access to these tools:
133
- {tools}
134
-
135
- Tool names: {tool_names}
136
 
137
- Use the following format EXACTLY:
138
-
139
- Thought: Think about what you need to do
140
  Action: tool_name
141
  Action Input: {{"param1": "value1", "param2": value2}}
142
- Observation: [result will appear here]
143
- ... (repeat Thought/Action/Action Input/Observation as needed)
144
- Thought: I have completed all steps
145
- Final Answer: [summarize what was done]
146
 
147
- CRITICAL RULES:
148
- 1. You MUST use the Action/Action Input format to call tools
149
- 2. Action Input MUST be valid JSON
150
- 3. After Observation, think again and take the next action
151
- 4. Call tools for EACH pipeline component
152
- 5. When done, provide Final Answer
 
 
153
 
154
  File path: {file_path}
155
- Pipeline to execute: {pipeline_json}
 
156
 
157
- Begin! Execute each component in the pipeline.
158
 
159
- {agent_scratchpad}""")
160
 
161
- # Create ReAct agent
162
- agent = create_react_agent(llm, tools, react_prompt)
163
- executor = AgentExecutor(
164
- agent=agent,
165
- tools=tools,
166
- verbose=True,
167
- max_iterations=25,
168
- handle_parsing_errors=True,
169
- return_intermediate_steps=True
170
- )
171
 
172
- # Yield initial status
173
  yield {
174
  "type": "status",
175
- "message": "Initializing Bedrock ReAct executor...",
176
  "executor": "bedrock"
177
  }
178
 
179
- step_count = 0
180
- tool_results = {}
181
- has_called_tools = False
182
-
183
- # Stream execution
184
- for event in executor.stream({
185
- "input": f"Execute the pipeline '{pipeline['pipeline_name']}' by running each tool in the components list",
186
- "file_path": file_path,
187
- "pipeline_json": json.dumps(pipeline, indent=2)
188
- }):
189
- # Track tool actions
190
- if "actions" in event:
191
- for action in event.get("actions", []):
192
- step_count += 1
193
- has_called_tools = True
194
- tool = getattr(action, "tool", "unknown")
195
- tool_input = getattr(action, "tool_input", {})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
 
197
  yield {
198
- "type": "step",
199
- "step": step_count,
200
- "tool": tool,
201
- "status": "executing",
202
- "executor": "bedrock",
203
- "input": str(tool_input)[:200]
204
  }
 
 
 
 
 
 
 
 
 
 
 
205
 
206
- # Track tool outputs
207
- elif "steps" in event:
208
- for step in event.get("steps", []):
209
- action = getattr(step, "action", None)
210
- observation = getattr(step, "observation", "")
 
 
211
 
212
- if action:
213
- tool_name = getattr(action, "tool", "unknown")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
  tool_results[tool_name] = observation
215
 
216
  yield {
@@ -221,23 +260,33 @@ Begin! Execute each component in the pipeline.
221
  "observation": str(observation)[:500],
222
  "executor": "bedrock"
223
  }
224
-
225
- # Final output
226
- elif "output" in event:
227
- output = event.get("output")
228
-
229
- # Check if tools were actually called
230
- if not has_called_tools:
 
 
 
 
 
 
 
 
 
 
 
231
  yield {
232
  "type": "error",
233
- "error": "Bedrock ReAct agent didn't call tools properly. Falling back to CrewAI.",
234
  "executor": "bedrock",
235
- "debug_output": str(output)[:500]
236
  }
237
  return
238
-
239
- # If we have tool results, structure them
240
- if tool_results:
241
  structured_result = {
242
  "status": "completed",
243
  "components_executed": tool_results,
@@ -245,7 +294,7 @@ Begin! Execute each component in the pipeline.
245
  "total_tools_called": len(tool_results),
246
  "tools": list(tool_results.keys())
247
  },
248
- "final_output": output if isinstance(output, str) else json.dumps(output)
249
  }
250
 
251
  yield {
@@ -253,13 +302,31 @@ Begin! Execute each component in the pipeline.
253
  "data": structured_result,
254
  "executor": "bedrock"
255
  }
256
- else:
257
- yield {
258
- "type": "error",
259
- "error": "No tool results collected from Bedrock execution",
260
- "executor": "bedrock"
261
- }
262
- return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
  except Exception as e:
265
  yield {
 
108
  session_id: Optional[str] = None
109
  ) -> Generator[Dict[str, Any], None, None]:
110
  """
111
+ Execute pipeline using Bedrock with MANUAL tool calling loop (bypasses LangChain agents)
112
  """
113
  if not BEDROCK_AVAILABLE:
114
  raise RuntimeError("Bedrock LangChain not available")
115
 
116
  try:
117
+ import re
118
+ import boto3
119
+
120
+ # Get Bedrock client directly
121
+ bedrock_runtime = boto3.client(
122
+ service_name='bedrock-runtime',
123
+ region_name=os.getenv("AWS_REGION", "us-east-1")
 
124
  )
125
 
126
+ tools_dict = {tool.name: tool for tool in get_langchain_tools()}
127
 
128
+ # Build tool descriptions for prompt
129
+ tool_descriptions = []
130
+ for name, tool in tools_dict.items():
131
+ tool_descriptions.append(f"- {name}: {tool.description}")
132
+
133
+ tools_text = "\n".join(tool_descriptions)
134
+ tool_names = ", ".join(tools_dict.keys())
135
+
136
+ # Initial prompt
137
+ system_prompt = f"""You are MasterLLM, a document processing assistant.
138
 
139
  You have access to these tools:
140
+ {tools_text}
 
 
141
 
142
+ To use a tool, you MUST write EXACTLY in this format:
 
 
143
  Action: tool_name
144
  Action Input: {{"param1": "value1", "param2": value2}}
 
 
 
 
145
 
146
+ After you write Action and Action Input, I will execute the tool and give you the Observation.
147
+ Then you can take another Action or provide your Final Answer.
148
+
149
+ CRITICAL:
150
+ - Write "Action:" followed by the tool name
151
+ - Write "Action Input:" followed by valid JSON on the SAME line or next line
152
+ - After seeing Observation, you can take another Action
153
+ - When done, write "Final Answer:" followed by summary
154
 
155
  File path: {file_path}
156
+ Pipeline components to execute:
157
+ {json.dumps(pipeline.get('components', []), indent=2)}
158
 
159
+ Execute each component by calling the corresponding tool."""
160
 
161
+ user_message = f"Execute the pipeline: {pipeline['pipeline_name']}"
162
 
163
+ conversation_history = []
164
+ tool_results = {}
165
+ has_called_tools = False
166
+ step_count = 0
167
+ max_iterations = 10
 
 
 
 
 
168
 
 
169
  yield {
170
  "type": "status",
171
+ "message": "Initializing Bedrock manual executor...",
172
  "executor": "bedrock"
173
  }
174
 
175
+ for iteration in range(max_iterations):
176
+ # Prepare messages
177
+ messages = [{"role": "user", "content": user_message}]
178
+ messages.extend(conversation_history)
179
+
180
+ # Call Bedrock directly using converse API
181
+ response = bedrock_runtime.converse(
182
+ modelId="mistral.mistral-large-2402-v1:0",
183
+ messages=messages,
184
+ system=[{"text": system_prompt}],
185
+ inferenceConfig={
186
+ "temperature": 0.0,
187
+ "maxTokens": 2048
188
+ }
189
+ )
190
+
191
+ # Get response text
192
+ assistant_message = response['output']['message']['content'][0]['text']
193
+ print(f"\n🤖 Mistral Response (Iteration {iteration + 1}):\n{assistant_message}\n")
194
+
195
+ # Add to conversation
196
+ conversation_history.append({"role": "assistant", "content": assistant_message})
197
+
198
+ # Check for Final Answer
199
+ if "Final Answer:" in assistant_message or "final answer" in assistant_message.lower():
200
+ # Done!
201
+ if tool_results:
202
+ structured_result = {
203
+ "status": "completed",
204
+ "components_executed": tool_results,
205
+ "summary": {
206
+ "total_tools_called": len(tool_results),
207
+ "tools": list(tool_results.keys())
208
+ },
209
+ "final_output": assistant_message
210
+ }
211
 
212
  yield {
213
+ "type": "final",
214
+ "data": structured_result,
215
+ "executor": "bedrock"
 
 
 
216
  }
217
+ else:
218
+ yield {
219
+ "type": "error",
220
+ "error": "Bedrock completed but no tools were called",
221
+ "executor": "bedrock"
222
+ }
223
+ return
224
+
225
+ # Parse for Action and Action Input
226
+ action_match = re.search(r'Action:\s*(\w+)', assistant_message)
227
+ action_input_match = re.search(r'Action Input:\s*(\{.*?\})', assistant_message, re.DOTALL)
228
 
229
+ if action_match and action_input_match:
230
+ tool_name = action_match.group(1)
231
+ action_input_str = action_input_match.group(1)
232
+
233
+ try:
234
+ # Parse JSON input
235
+ tool_input = json.loads(action_input_str)
236
 
237
+ if tool_name in tools_dict:
238
+ step_count += 1
239
+ has_called_tools = True
240
+
241
+ yield {
242
+ "type": "step",
243
+ "step": step_count,
244
+ "tool": tool_name,
245
+ "status": "executing",
246
+ "executor": "bedrock",
247
+ "input": str(tool_input)[:200]
248
+ }
249
+
250
+ # Execute the tool!
251
+ tool = tools_dict[tool_name]
252
+ observation = tool.invoke(tool_input)
253
  tool_results[tool_name] = observation
254
 
255
  yield {
 
260
  "observation": str(observation)[:500],
261
  "executor": "bedrock"
262
  }
263
+
264
+ # Add observation to conversation
265
+ observation_message = f"Observation: {observation}"
266
+ conversation_history.append({"role": "user", "content": observation_message})
267
+
268
+ else:
269
+ # Unknown tool
270
+ error_msg = f"Unknown tool: {tool_name}"
271
+ conversation_history.append({"role": "user", "content": f"Error: {error_msg}"})
272
+
273
+ except json.JSONDecodeError as e:
274
+ # Invalid JSON
275
+ error_msg = f"Invalid JSON in Action Input: {e}"
276
+ conversation_history.append({"role": "user", "content": f"Error: {error_msg}"})
277
+ else:
278
+ # No action found - agent might be confused or done
279
+ if iteration > 0 and not has_called_tools:
280
+ # Agent isn't calling tools properly
281
  yield {
282
  "type": "error",
283
+ "error": "Bedrock didn't call tools in correct format. Falling back to CrewAI.",
284
  "executor": "bedrock",
285
+ "debug_output": assistant_message[:500]
286
  }
287
  return
288
+ elif iteration > 0:
289
+ # Has called some tools but stopped - might be done
 
290
  structured_result = {
291
  "status": "completed",
292
  "components_executed": tool_results,
 
294
  "total_tools_called": len(tool_results),
295
  "tools": list(tool_results.keys())
296
  },
297
+ "final_output": assistant_message
298
  }
299
 
300
  yield {
 
302
  "data": structured_result,
303
  "executor": "bedrock"
304
  }
305
+ return
306
+
307
+ # Max iterations reached
308
+ if tool_results:
309
+ structured_result = {
310
+ "status": "completed",
311
+ "components_executed": tool_results,
312
+ "summary": {
313
+ "total_tools_called": len(tool_results),
314
+ "tools": list(tool_results.keys())
315
+ },
316
+ "final_output": "Max iterations reached"
317
+ }
318
+
319
+ yield {
320
+ "type": "final",
321
+ "data": structured_result,
322
+ "executor": "bedrock"
323
+ }
324
+ else:
325
+ yield {
326
+ "type": "error",
327
+ "error": "Max iterations reached without tool calls",
328
+ "executor": "bedrock"
329
+ }
330
 
331
  except Exception as e:
332
  yield {