ganesh-vilje committed on
Commit
40d8682
·
1 Parent(s): 43efa38

fix: Gracefully handle missing pipeline records in executor

Browse files

- Check if pipeline record exists before creating S3 final output
- Fall back to basic response if no pipeline record found
- This allows executor to work with or without pipeline manager
- Fixes 'Pipeline not found' error during execution

Files changed (1) hide show
  1. services/pipeline_executor.py +46 -47
services/pipeline_executor.py CHANGED
@@ -303,59 +303,58 @@ Execute ONLY the {len(components_list)} component(s) listed above, then provide
303
  if "Final Answer:" in assistant_message or "final answer" in assistant_message.lower():
304
  # Done!
305
  if tool_results:
306
- # Compile component results for pipeline manager
 
 
 
 
 
 
 
 
 
 
 
 
307
  if PIPELINE_MANAGER_AVAILABLE and session_id:
308
  try:
309
  pipeline_mgr = get_pipeline_manager()
310
- components_results = []
311
- for comp_name, comp_output in tool_results.items():
312
- components_results.append({
313
- "component_name": comp_name,
314
- "status": "completed",
315
- "result": comp_output,
316
- "success_message": f"Successfully executed {comp_name}"
317
- })
318
 
319
- # Create final output in S3
320
- final_output_data = pipeline_mgr.mark_pipeline_completed(
321
- execution_id=session_id,
322
- components_results=components_results,
323
- executor="bedrock"
324
- )
325
 
326
- structured_result = {
327
- "status": "completed",
328
- "components_executed": tool_results,
329
- "summary": {
330
- "total_tools_called": len(tool_results),
331
- "tools": list(tool_results.keys())
332
- },
333
- "final_output_url": final_output_data.get("final_output_url"),
334
- "final_output_expires_at": final_output_data.get("final_output_expires_at"),
335
- "last_node_output": final_output_data.get("last_node_output"),
336
- "workflow_status": final_output_data.get("workflow_status", "completed")
337
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
338
  except Exception as e:
339
- print(f"⚠️ Failed to create final output in S3: {e}")
340
- structured_result = {
341
- "status": "completed",
342
- "components_executed": tool_results,
343
- "summary": {
344
- "total_tools_called": len(tool_results),
345
- "tools": list(tool_results.keys())
346
- },
347
- "final_output": assistant_message
348
- }
349
- else:
350
- structured_result = {
351
- "status": "completed",
352
- "components_executed": tool_results,
353
- "summary": {
354
- "total_tools_called": len(tool_results),
355
- "tools": list(tool_results.keys())
356
- },
357
- "final_output": assistant_message
358
- }
359
 
360
  yield {
361
  "type": "final",
 
303
  if "Final Answer:" in assistant_message or "final answer" in assistant_message.lower():
304
  # Done!
305
  if tool_results:
306
+ # Try to compile component results for pipeline manager (V3 architecture)
307
+ # Note: This requires a pipeline record to exist in MongoDB
308
+ # If not found, we'll fall back to basic response without S3 storage
309
+ structured_result = {
310
+ "status": "completed",
311
+ "components_executed": tool_results,
312
+ "summary": {
313
+ "total_tools_called": len(tool_results),
314
+ "tools": list(tool_results.keys())
315
+ },
316
+ "final_output": assistant_message
317
+ }
318
+
319
  if PIPELINE_MANAGER_AVAILABLE and session_id:
320
  try:
321
  pipeline_mgr = get_pipeline_manager()
 
 
 
 
 
 
 
 
322
 
323
+ # Check if pipeline record exists first
324
+ pipeline_record = pipeline_mgr.get_pipeline(session_id)
 
 
 
 
325
 
326
+ if pipeline_record:
327
+ # Pipeline record exists, create S3 final output
328
+ components_results = []
329
+ for comp_name, comp_output in tool_results.items():
330
+ components_results.append({
331
+ "component_name": comp_name,
332
+ "status": "completed",
333
+ "result": comp_output,
334
+ "success_message": f"Successfully executed {comp_name}"
335
+ })
336
+
337
+ # Create final output in S3
338
+ final_output_data = pipeline_mgr.mark_pipeline_completed(
339
+ execution_id=session_id,
340
+ components_results=components_results,
341
+ executor="bedrock"
342
+ )
343
+
344
+ # Add S3 URLs to structured result
345
+ structured_result["final_output_url"] = final_output_data.get("final_output_url")
346
+ structured_result["final_output_expires_at"] = final_output_data.get("final_output_expires_at")
347
+ structured_result["last_node_output"] = final_output_data.get("last_node_output")
348
+ structured_result["workflow_status"] = final_output_data.get("workflow_status", "completed")
349
+ else:
350
+ # No pipeline record - this is OK for direct executor calls
351
+ # Just continue with basic response
352
+ print(f"ℹ️ No pipeline record found for session {session_id} - using basic response")
353
+
354
  except Exception as e:
355
+ # Pipeline manager failed - continue with basic response
356
+ print(f"⚠️ Failed to create S3 final output: {str(e)}")
357
+ # structured_result already has basic fields, just continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
358
 
359
  yield {
360
  "type": "final",