redhairedshanks1 committed on
Commit
c895fea
·
1 Parent(s): d79e2c1

Update services/pipeline_executor.py

Browse files
Files changed (1) hide show
  1. services/pipeline_executor.py +46 -41
services/pipeline_executor.py CHANGED
@@ -9,8 +9,9 @@ from typing import Dict, Any, Optional, Generator, List
9
  # For Bedrock LangChain
10
  try:
11
  from langchain_aws import ChatBedrock
12
- from langchain.agents import AgentExecutor, create_tool_calling_agent
13
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
 
14
  from services.master_tools import get_master_tools as get_langchain_tools
15
  BEDROCK_AVAILABLE = True
16
  print("✅ Bedrock LangChain imports successful - BEDROCK_AVAILABLE = True")
@@ -107,65 +108,71 @@ def execute_pipeline_bedrock_streaming(
107
  session_id: Optional[str] = None
108
  ) -> Generator[Dict[str, Any], None, None]:
109
  """
110
- Execute pipeline using Bedrock + LangChain with streaming
111
  """
112
  if not BEDROCK_AVAILABLE:
113
  raise RuntimeError("Bedrock LangChain not available")
114
 
115
  try:
 
116
  llm = ChatBedrock(
117
  model_id="mistral.mistral-large-2402-v1:0",
118
- region_name=os.getenv("AWS_REGION", "us-east-1")
 
 
 
 
119
  )
120
 
121
  tools = get_langchain_tools()
122
 
123
- # STRONGER prompt that FORCES tool usage
124
- system_instructions = """You are MasterLLM, an AI agent that MUST execute document processing tools.
125
 
126
- CRITICAL INSTRUCTIONS:
127
- 1. You MUST call the tools provided to you - do NOT just describe what you would do
128
- 2. Execute the pipeline components IN ORDER
129
- 3. For each component in the pipeline, you MUST:
130
- - Call the corresponding tool with the specified parameters
131
- - Wait for the actual result from the tool
132
- - Use the file_path provided for file operations
133
- - Store the results to pass to the next component
134
- 4. After ALL components are executed, call the 'finalize' tool with the collected results
135
- 5. DO NOT generate placeholder text like "TEXT_EXTRACTION_RESULT" - call the actual tools!
136
 
137
- The pipeline components are structured as:
138
- {{
139
- "tool_name": "extract_text",
140
- "start_page": 1,
141
- "end_page": -1,
142
- "params": {{}}
143
- }}
144
 
145
- You must call tools, not generate descriptions. This is mandatory."""
146
-
147
- prompt = ChatPromptTemplate.from_messages([
148
- ("system", system_instructions),
149
- ("system", "File to process: {file_path}"),
150
- ("system", "Pipeline configuration: {pipeline_json}"),
151
- ("human", "Execute ALL the tools in the pipeline. Call each tool and get real results. Do not describe, actually execute!"),
152
- MessagesPlaceholder(variable_name="agent_scratchpad") # REQUIRED for LangChain agent
153
- ])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
 
155
- agent = create_tool_calling_agent(llm, tools, prompt)
 
156
  executor = AgentExecutor(
157
  agent=agent,
158
  tools=tools,
159
  verbose=True,
160
- max_iterations=25, # Increased for multi-step pipelines
161
  handle_parsing_errors=True,
162
- return_intermediate_steps=True # Important: get intermediate results
163
  )
164
 
165
  # Yield initial status
166
  yield {
167
  "type": "status",
168
- "message": "Initializing Bedrock executor...",
169
  "executor": "bedrock"
170
  }
171
 
@@ -175,7 +182,7 @@ You must call tools, not generate descriptions. This is mandatory."""
175
 
176
  # Stream execution
177
  for event in executor.stream({
178
- "input": f"Execute the pipeline '{pipeline['pipeline_name']}' by calling each tool in the components list",
179
  "file_path": file_path,
180
  "pipeline_json": json.dumps(pipeline, indent=2)
181
  }):
@@ -221,16 +228,15 @@ You must call tools, not generate descriptions. This is mandatory."""
221
 
222
  # Check if tools were actually called
223
  if not has_called_tools:
224
- # Agent didn't call tools, just generated text - this is a failure
225
  yield {
226
  "type": "error",
227
- "error": "Bedrock agent generated text instead of calling tools. Falling back to CrewAI.",
228
  "executor": "bedrock",
229
  "debug_output": str(output)[:500]
230
  }
231
  return
232
 
233
- # If we have tool results, structure them properly
234
  if tool_results:
235
  structured_result = {
236
  "status": "completed",
@@ -248,7 +254,6 @@ You must call tools, not generate descriptions. This is mandatory."""
248
  "executor": "bedrock"
249
  }
250
  else:
251
- # No tool results collected, likely a problem
252
  yield {
253
  "type": "error",
254
  "error": "No tool results collected from Bedrock execution",
 
9
  # For Bedrock LangChain
10
  try:
11
  from langchain_aws import ChatBedrock
12
+ from langchain.agents import AgentExecutor, create_react_agent # Using ReAct instead of tool_calling
13
+ from langchain_core.prompts import PromptTemplate
14
+ from langchain import hub
15
  from services.master_tools import get_master_tools as get_langchain_tools
16
  BEDROCK_AVAILABLE = True
17
  print("✅ Bedrock LangChain imports successful - BEDROCK_AVAILABLE = True")
 
108
  session_id: Optional[str] = None
109
  ) -> Generator[Dict[str, Any], None, None]:
110
  """
111
+ Execute pipeline using Bedrock + LangChain with ReAct agent (works with Mistral)
112
  """
113
  if not BEDROCK_AVAILABLE:
114
  raise RuntimeError("Bedrock LangChain not available")
115
 
116
  try:
117
+ # Use Mistral (the only model you have access to)
118
  llm = ChatBedrock(
119
  model_id="mistral.mistral-large-2402-v1:0",
120
+ region_name=os.getenv("AWS_REGION", "us-east-1"),
121
+ model_kwargs={
122
+ "temperature": 0.0,
123
+ "max_tokens": 4096
124
+ }
125
  )
126
 
127
  tools = get_langchain_tools()
128
 
129
+ # ReAct prompt template - uses text-based reasoning
130
+ react_prompt = PromptTemplate.from_template("""You are MasterLLM, a document processing assistant that executes tools step-by-step.
131
 
132
+ You have access to these tools:
133
+ {tools}
 
 
 
 
 
 
 
 
134
 
135
+ Tool names: {tool_names}
 
 
 
 
 
 
136
 
137
+ Use the following format EXACTLY:
138
+
139
+ Thought: Think about what you need to do
140
+ Action: tool_name
141
+ Action Input: {{"param1": "value1", "param2": value2}}
142
+ Observation: [result will appear here]
143
+ ... (repeat Thought/Action/Action Input/Observation as needed)
144
+ Thought: I have completed all steps
145
+ Final Answer: [summarize what was done]
146
+
147
+ CRITICAL RULES:
148
+ 1. You MUST use the Action/Action Input format to call tools
149
+ 2. Action Input MUST be valid JSON
150
+ 3. After Observation, think again and take the next action
151
+ 4. Call tools for EACH pipeline component
152
+ 5. When done, provide Final Answer
153
+
154
+ File path: {file_path}
155
+ Pipeline to execute: {pipeline_json}
156
+
157
+ Begin! Execute each component in the pipeline.
158
+
159
+ {agent_scratchpad}""")
160
 
161
+ # Create ReAct agent
162
+ agent = create_react_agent(llm, tools, react_prompt)
163
  executor = AgentExecutor(
164
  agent=agent,
165
  tools=tools,
166
  verbose=True,
167
+ max_iterations=25,
168
  handle_parsing_errors=True,
169
+ return_intermediate_steps=True
170
  )
171
 
172
  # Yield initial status
173
  yield {
174
  "type": "status",
175
+ "message": "Initializing Bedrock ReAct executor...",
176
  "executor": "bedrock"
177
  }
178
 
 
182
 
183
  # Stream execution
184
  for event in executor.stream({
185
+ "input": f"Execute the pipeline '{pipeline['pipeline_name']}' by running each tool in the components list",
186
  "file_path": file_path,
187
  "pipeline_json": json.dumps(pipeline, indent=2)
188
  }):
 
228
 
229
  # Check if tools were actually called
230
  if not has_called_tools:
 
231
  yield {
232
  "type": "error",
233
+ "error": "Bedrock ReAct agent didn't call tools properly. Falling back to CrewAI.",
234
  "executor": "bedrock",
235
  "debug_output": str(output)[:500]
236
  }
237
  return
238
 
239
+ # If we have tool results, structure them
240
  if tool_results:
241
  structured_result = {
242
  "status": "completed",
 
254
  "executor": "bedrock"
255
  }
256
  else:
 
257
  yield {
258
  "type": "error",
259
  "error": "No tool results collected from Bedrock execution",