redhairedshanks1 committed on
Commit
b19aecc
·
1 Parent(s): 79998e9

Update services/pipeline_executor.py

Browse files
Files changed (1) hide show
  1. services/pipeline_executor.py +105 -49
services/pipeline_executor.py CHANGED
@@ -50,32 +50,21 @@ def execute_pipeline_bedrock(
50
  tools = get_langchain_tools()
51
 
52
  system_instructions = """You are MasterLLM, a precise document processing agent.
53
-
54
  Execute the provided pipeline components in ORDER. For each component:
55
  1. Call the corresponding tool with exact parameters
56
  2. Wait for the result
57
  3. Move to next component
58
-
59
  IMPORTANT:
60
  - Follow the pipeline order strictly
61
  - Use the file_path provided for all file-based operations
62
  - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
63
- - At the end, call 'finalize' tool with complete results
64
-
65
- Pipeline components will be in format:
66
- {
67
- "tool_name": "extract_text",
68
- "start_page": 1,
69
- "end_page": 5,
70
- "params": {}
71
- }"""
72
 
73
  prompt = ChatPromptTemplate.from_messages([
74
  ("system", system_instructions),
75
- ("system", "File path: {file_path}"),
76
- ("system", "Pipeline to execute: {pipeline_json}"),
77
- ("system", "Session ID: {session_id}"),
78
- ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results.")
79
  ])
80
 
81
  agent = create_tool_calling_agent(llm, tools, prompt)
@@ -91,7 +80,7 @@ Pipeline components will be in format:
91
  "input": f"Execute pipeline: {pipeline['pipeline_name']}",
92
  "file_path": file_path,
93
  "pipeline_json": json.dumps(pipeline, indent=2),
94
- "session_id": session_id or "unknown"
95
  })
96
 
97
  return result
@@ -119,15 +108,23 @@ def execute_pipeline_bedrock_streaming(
119
 
120
  tools = get_langchain_tools()
121
 
122
- system_instructions = """You are MasterLLM. Execute the pipeline components in ORDER.
123
-
124
- For each component, call the tool and wait for results."""
 
 
 
 
 
 
 
 
125
 
126
  prompt = ChatPromptTemplate.from_messages([
127
  ("system", system_instructions),
128
- ("system", "File: {file_path}"),
129
- ("system", "Pipeline: {pipeline_json}"),
130
- ("human", "Execute the pipeline")
131
  ])
132
 
133
  agent = create_tool_calling_agent(llm, tools, prompt)
@@ -137,6 +134,7 @@ For each component, call the tool and wait for results."""
137
  verbose=True,
138
  max_iterations=15,
139
  handle_parsing_errors=True,
 
140
  )
141
 
142
  # Yield initial status
@@ -146,18 +144,27 @@ For each component, call the tool and wait for results."""
146
  "executor": "bedrock"
147
  }
148
 
149
- step_count = 0
150
-
151
- # Stream execution
152
- for event in executor.stream({
153
- "input": f"Execute: {pipeline['pipeline_name']}",
154
- "file_path": file_path,
155
- "pipeline_json": json.dumps(pipeline, indent=2)
156
- }):
157
- if "actions" in event:
158
- for action in event.get("actions", []):
 
 
 
 
 
 
 
159
  step_count += 1
160
- tool = getattr(action, "tool", "unknown")
 
 
161
  yield {
162
  "type": "step",
163
  "step": step_count,
@@ -165,25 +172,75 @@ For each component, call the tool and wait for results."""
165
  "status": "executing",
166
  "executor": "bedrock"
167
  }
168
-
169
- elif "steps" in event:
170
- for step in event.get("steps", []):
171
- observation = str(getattr(step, "observation", ""))[:500]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
172
  yield {
173
- "type": "step",
174
- "step": step_count,
175
- "status": "completed",
176
- "observation": observation,
177
  "executor": "bedrock"
178
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
 
180
- elif "output" in event:
181
- yield {
182
- "type": "final",
183
- "data": event.get("output"),
184
- "executor": "bedrock"
185
- }
186
- return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
187
 
188
  except Exception as e:
189
  yield {
@@ -192,7 +249,6 @@ For each component, call the tool and wait for results."""
192
  "executor": "bedrock"
193
  }
194
 
195
-
196
  # ========================
197
  # CREWAI EXECUTOR (FALLBACK)
198
  # ========================
 
50
  tools = get_langchain_tools()
51
 
52
  system_instructions = """You are MasterLLM, a precise document processing agent.
 
53
  Execute the provided pipeline components in ORDER. For each component:
54
  1. Call the corresponding tool with exact parameters
55
  2. Wait for the result
56
  3. Move to next component
 
57
  IMPORTANT:
58
  - Follow the pipeline order strictly
59
  - Use the file_path provided for all file-based operations
60
  - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
61
+ - At the end, call 'finalize' tool with complete results"""
 
 
 
 
 
 
 
 
62
 
63
  prompt = ChatPromptTemplate.from_messages([
64
  ("system", system_instructions),
65
+ ("human", "Execute the pipeline for document: {file_path}"),
66
+ ("human", "Pipeline to execute: {pipeline_json}"),
67
+ MessagesPlaceholder(variable_name="agent_scratchpad"),
 
68
  ])
69
 
70
  agent = create_tool_calling_agent(llm, tools, prompt)
 
80
  "input": f"Execute pipeline: {pipeline['pipeline_name']}",
81
  "file_path": file_path,
82
  "pipeline_json": json.dumps(pipeline, indent=2),
83
+ "agent_scratchpad": [] # Add this
84
  })
85
 
86
  return result
 
108
 
109
  tools = get_langchain_tools()
110
 
111
+ # Fixed prompt template with required placeholders
112
+ system_instructions = """You are MasterLLM, a precise document processing agent.
113
+ Execute the provided pipeline components in ORDER. For each component:
114
+ 1. Call the corresponding tool with exact parameters
115
+ 2. Wait for the result
116
+ 3. Move to next component
117
+ IMPORTANT:
118
+ - Follow the pipeline order strictly
119
+ - Use the file_path provided for all file-based operations
120
+ - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
121
+ - At the end, call 'finalize' tool with complete results"""
122
 
123
  prompt = ChatPromptTemplate.from_messages([
124
  ("system", system_instructions),
125
+ ("human", "Execute the pipeline for document: {file_path}"),
126
+ ("human", "Pipeline to execute: {pipeline_json}"),
127
+ MessagesPlaceholder(variable_name="agent_scratchpad"),
128
  ])
129
 
130
  agent = create_tool_calling_agent(llm, tools, prompt)
 
134
  verbose=True,
135
  max_iterations=15,
136
  handle_parsing_errors=True,
137
+ return_intermediate_steps=True,
138
  )
139
 
140
  # Yield initial status
 
144
  "executor": "bedrock"
145
  }
146
 
147
+ # Stream execution with proper inputs including agent_scratchpad
148
+ try:
149
+ # Initialize inputs
150
+ inputs = {
151
+ "input": f"Execute pipeline: {pipeline['pipeline_name']} for file: {file_path}",
152
+ "file_path": file_path,
153
+ "pipeline_json": json.dumps(pipeline, indent=2),
154
+ "agent_scratchpad": [], # Initialize empty scratchpad
155
+ }
156
+
157
+ step_count = 0
158
+
159
+ # Stream execution
160
+ for event in executor.stream(inputs):
161
+ # Handle different event types
162
+ if "agent" in event:
163
+ # Agent is thinking/acting
164
  step_count += 1
165
+ action = event.get("agent", {})
166
+ tool = action.get("tool", "thinking")
167
+
168
  yield {
169
  "type": "step",
170
  "step": step_count,
 
172
  "status": "executing",
173
  "executor": "bedrock"
174
  }
175
+
176
+ elif "actions" in event:
177
+ # Multiple actions
178
+ for action in event.get("actions", []):
179
+ step_count += 1
180
+ tool = getattr(action, "tool", "unknown")
181
+ yield {
182
+ "type": "step",
183
+ "step": step_count,
184
+ "tool": tool,
185
+ "status": "executing",
186
+ "executor": "bedrock"
187
+ }
188
+
189
+ elif "steps" in event:
190
+ # Steps completed
191
+ for step in event.get("steps", []):
192
+ observation = str(getattr(step, "observation", ""))[:500]
193
+ yield {
194
+ "type": "step",
195
+ "step": step_count,
196
+ "status": "completed",
197
+ "observation": observation,
198
+ "executor": "bedrock"
199
+ }
200
+
201
+ elif "output" in event:
202
+ # Final output
203
  yield {
204
+ "type": "final",
205
+ "data": event.get("output"),
 
 
206
  "executor": "bedrock"
207
  }
208
+ return
209
+
210
+ elif "intermediate_steps" in event:
211
+ # Intermediate steps (if enabled)
212
+ steps = event.get("intermediate_steps", [])
213
+ for i, (action, observation) in enumerate(steps):
214
+ yield {
215
+ "type": "step",
216
+ "step": i + 1,
217
+ "tool": getattr(action, "tool", str(action)),
218
+ "status": "completed",
219
+ "observation": str(observation)[:500],
220
+ "executor": "bedrock"
221
+ }
222
 
223
+ except Exception as stream_error:
224
+ # If streaming fails, try non-streaming execution
225
+ yield {
226
+ "type": "warning",
227
+ "message": f"Streaming failed, trying non-streaming: {str(stream_error)}",
228
+ "executor": "bedrock"
229
+ }
230
+
231
+ # Fallback to non-streaming execution
232
+ result = executor.invoke({
233
+ "input": f"Execute pipeline: {pipeline['pipeline_name']}",
234
+ "file_path": file_path,
235
+ "pipeline_json": json.dumps(pipeline, indent=2),
236
+ "agent_scratchpad": []
237
+ })
238
+
239
+ yield {
240
+ "type": "final",
241
+ "data": result.get("output"),
242
+ "executor": "bedrock"
243
+ }
244
 
245
  except Exception as e:
246
  yield {
 
249
  "executor": "bedrock"
250
  }
251
 
 
252
  # ========================
253
  # CREWAI EXECUTOR (FALLBACK)
254
  # ========================