# masterllm / services / pipeline_executor.py
# stellar413: "Added fixed agent to agent communication" (commit 6df13ef)
# services/pipeline_executor.py
"""
Pipeline Executor - Orchestrates multi-step document processing pipelines
Supports:
- Agentic Orchestration (Phase 3 - gated by feature flag)
- Bedrock LangChain execution (preferred legacy)
- CrewAI execution (fallback legacy)
- Dynamic tool chaining
- Component status tracking
Version: 3.0 with Safe Agentic Integration
"""
import json
import os
import time
import hashlib
from typing import Dict, Any, List, Generator, Optional
import logging
logger = logging.getLogger(__name__)
# ========================
# AGENTIC ORCHESTRATION GATING (Phase 3)
# ========================
def _should_use_agentic_orchestration(
pipeline: Dict[str, Any],
session_id: Optional[str] = None
) -> bool:
"""
Decision logic for agentic vs legacy execution.
Returns True only if:
1. Feature flag USE_AGENTIC_ORCHESTRATION=true
2. Session passes rollout percentage
3. Not in shadow mode (shadow uses legacy result)
Args:
pipeline: Pipeline configuration
session_id: Optional session identifier for rollout hashing
Returns:
True if agentic orchestration should be used
"""
# Check kill switch
if not os.getenv("USE_AGENTIC_ORCHESTRATION", "false").lower() == "true":
return False
# Shadow mode always uses legacy (agentic runs in parallel)
if os.getenv("AGENTIC_SHADOW_MODE", "false").lower() == "true":
return False
# Rollout percentage (0-100)
rollout_pct = int(os.getenv("AGENTIC_ROLLOUT_PERCENTAGE", "0"))
if rollout_pct <= 0:
return False # Disabled
if rollout_pct >= 100:
return True # Full rollout
# Percentage-based rollout using session hash
if session_id:
hash_val = int(hashlib.md5(session_id.encode()).hexdigest(), 16)
if (hash_val % 100) < rollout_pct:
return True
return False
# For Bedrock LangChain
try:
from langchain_aws import ChatBedrock
from langchain.agents import AgentExecutor, create_react_agent # Using ReAct instead of tool_calling
from langchain_core.prompts import PromptTemplate
from langchain import hub
from services.master_tools import get_master_tools as get_langchain_tools
BEDROCK_AVAILABLE = True
print("✅ Bedrock LangChain imports successful - BEDROCK_AVAILABLE = True")
except ImportError as e:
BEDROCK_AVAILABLE = False
print(f"❌ WARNING: LangChain Bedrock not available - {str(e)}")
print(" Pipeline execution will use CrewAI only")
except Exception as e:
BEDROCK_AVAILABLE = False
print(f"❌ ERROR: Bedrock import failed - {str(e)}")
# For CrewAI fallback
from services.agent_crewai import run_agent_streaming as crewai_run_streaming
# Pipeline Manager for V3 architecture
try:
from services.pipeline_manager import get_pipeline_manager
PIPELINE_MANAGER_AVAILABLE = True
except ImportError:
PIPELINE_MANAGER_AVAILABLE = False
print("⚠️ Pipeline Manager not available")
# ========================
# BEDROCK LANGCHAIN EXECUTOR
# ========================
def execute_pipeline_bedrock(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute pipeline using Bedrock + LangChain (priority method, non-streaming).

    Args:
        pipeline: Pipeline configuration (must contain 'pipeline_name').
        file_path: Path to the document being processed.
        session_id: Optional session identifier passed to the agent.

    Returns:
        Raw ``AgentExecutor.invoke()`` result dict.

    Raises:
        RuntimeError: If Bedrock/LangChain is unavailable or execution fails.
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")
    try:
        # FIX: ChatPromptTemplate, MessagesPlaceholder and
        # create_tool_calling_agent were referenced below but never imported
        # (the module-level imports bring in PromptTemplate and
        # create_react_agent instead), so every call failed with NameError.
        from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
        from langchain.agents import create_tool_calling_agent
        llm = ChatBedrock(
            model_id="mistral.mistral-large-2402-v1:0",
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )
        tools = get_langchain_tools()
        system_instructions = """You are MasterLLM, a precise document processing agent.
Execute the provided pipeline components in ORDER. For each component:
1. Call the corresponding tool with exact parameters
2. Wait for the result
3. Move to next component
IMPORTANT:
- Follow the pipeline order strictly
- Use the file_path provided for all file-based operations
- For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
- At the end, call 'finalize' tool with complete results
Pipeline components will be in format:
{{
"tool_name": "extract_text",
"start_page": 1,
"end_page": 5,
"params": {{}}
}}"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_instructions),
            ("system", "File path: {file_path}"),
            ("system", "Pipeline to execute: {pipeline_json}"),
            ("system", "Session ID: {session_id}"),
            ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results."),
            MessagesPlaceholder(variable_name="agent_scratchpad")  # REQUIRED for LangChain agent
        ])
        agent = create_tool_calling_agent(llm, tools, prompt)
        executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
        )
        result = executor.invoke({
            "input": f"Execute pipeline: {pipeline['pipeline_name']}",
            "file_path": file_path,
            "pipeline_json": json.dumps(pipeline, indent=2),
            "session_id": session_id or "unknown"
        })
        return result
    except Exception as e:
        raise RuntimeError(f"Bedrock execution failed: {str(e)}")
def execute_pipeline_bedrock_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using Bedrock with MANUAL tool calling loop (bypasses LangChain agents).

    Drives Mistral on Bedrock through a ReAct-style text protocol: the model
    emits "Action:" / "Action Input:" lines, we run the named tool locally and
    feed the result back as an "Observation:" message.

    Args:
        pipeline: Pipeline configuration (uses 'pipeline_name' and 'components').
        file_path: Path to the document being processed.
        session_id: Optional session id; used to look up the pipeline record
            for S3 final-output persistence via the pipeline manager.

    Yields:
        Event dicts of type "status", "step", "component_status", "final" or "error".
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")
    try:
        import re
        import boto3
        # Get Bedrock client directly
        bedrock_runtime = boto3.client(
            service_name='bedrock-runtime',
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )
        tools_dict = {tool.name: tool for tool in get_langchain_tools()}
        # Build tool descriptions for prompt
        tool_descriptions = []
        for name, tool in tools_dict.items():
            tool_descriptions.append(f"- {name}: {tool.description}")
        tools_text = "\n".join(tool_descriptions)
        # Build list of EXACT components to execute
        components_list = pipeline.get('components', [])
        components_to_execute = "\n".join([
            f"{i+1}. {comp.get('tool_name')} (pages {comp.get('start_page')}-{comp.get('end_page')})"
            for i, comp in enumerate(components_list)
        ])
        # Initial prompt with STRICT component enforcement
        system_prompt = f"""You are MasterLLM, a document processing assistant.
You have access to these tools:
{tools_text}
To use a tool, you MUST write EXACTLY in this format:
Action: tool_name
Action Input: {{"param1": "value1", "param2": value2}}
After you write Action and Action Input, I will execute the tool and give you the Observation.
Then you can take another Action or provide your Final Answer.
CRITICAL RULES:
- Write "Action:" followed by the tool name
- Write "Action Input:" followed by valid JSON on the SAME line or next line
- ALWAYS include "file_path" parameter in your Action Input
- The file_path is: {file_path}
- For page parameters: start_page must be >= 1, end_page must be >= 1
- To process ALL pages, use end_page: 999 (NOT -1!)
- After seeing Observation, you can take another Action
- When done, write "Final Answer:" followed by summary
EXAMPLE of correct tool call:
Action: extract_text
Action Input: {{"file_path": "{file_path}", "start_page": 1, "end_page": 5}}
IMPORTANT: Every tool call MUST include the file_path parameter!
**CRITICAL: EXECUTE ONLY THESE COMPONENTS IN THIS EXACT ORDER:**
{components_to_execute}
**DO NOT execute any other tools or components not listed above!**
**DO NOT add extra steps like summarize, translate, or classify unless explicitly listed!**
**After completing the components listed above, provide your Final Answer immediately!**
File to process: {file_path}
Total components to execute: {len(components_list)}
Execute ONLY the {len(components_list)} component(s) listed above, then provide Final Answer."""
        user_message = f"Execute the pipeline: {pipeline['pipeline_name']}"
        conversation_history = []
        tool_results = {}
        has_called_tools = False
        step_count = 0
        # Set max_iterations based on number of components (with generous buffer)
        num_components = len(pipeline.get('components', []))
        # Formula: num_components * 2 (for tool call + processing) + 3 (init + final answer + safety)
        max_iterations = max(5, (num_components * 2) + 3)
        print(f"📋 Pipeline has {num_components} components, max_iterations={max_iterations}")
        yield {
            "type": "status",
            "message": "Initializing Bedrock manual executor...",
            "executor": "bedrock"
        }
        for iteration in range(max_iterations):
            # Prepare messages (Bedrock converse API format)
            messages = [{"role": "user", "content": [{"text": user_message}]}]
            messages.extend(conversation_history)
            # Call Bedrock directly using converse API
            response = bedrock_runtime.converse(
                modelId="mistral.mistral-large-2402-v1:0",
                messages=messages,
                system=[{"text": system_prompt}],
                inferenceConfig={
                    "temperature": 0.0,
                    "maxTokens": 2048
                }
            )
            # Get response text
            assistant_message = response['output']['message']['content'][0]['text']
            print(f"\n🤖 Mistral Response (Iteration {iteration + 1}):\n{assistant_message}\n")
            # Add to conversation (Bedrock converse API format)
            conversation_history.append({"role": "assistant", "content": [{"text": assistant_message}]})
            # Parse for Action and Action Input FIRST (before checking Final Answer)
            action_match = re.search(r'Action:\s*(\w+)', assistant_message)
            # FIX: the previous non-greedy regex r'Action Input:\s*(\{.+?\})'
            # truncated nested JSON such as {"params": {}} at the first '}',
            # producing invalid JSON. Extract a brace-balanced object instead.
            # NOTE(review): brace counting ignores braces inside JSON string
            # values; acceptable for the flat tool arguments used here.
            action_input_str = None
            input_marker = re.search(r'Action Input:\s*(\{)', assistant_message)
            if input_marker:
                start_idx = input_marker.start(1)
                depth = 0
                for idx in range(start_idx, len(assistant_message)):
                    char = assistant_message[idx]
                    if char == '{':
                        depth += 1
                    elif char == '}':
                        depth -= 1
                        if depth == 0:
                            action_input_str = assistant_message[start_idx:idx + 1]
                            break
            if action_match and action_input_str:
                tool_name = action_match.group(1)
                action_input_str = action_input_str.strip()
                try:
                    # Parse JSON input
                    tool_input = json.loads(action_input_str)
                    if tool_name in tools_dict:
                        step_count += 1
                        has_called_tools = True
                        yield {
                            "type": "step",
                            "step": step_count,
                            "tool": tool_name,
                            "status": "executing",
                            "executor": "bedrock",
                            "input": str(tool_input)[:200]
                        }
                        # Execute the component!
                        tool = tools_dict[tool_name]
                        observation = tool.invoke(tool_input)
                        tool_results[tool_name] = observation
                        # Generate component success message
                        success_msg = f"Successfully executed {tool_name.replace('_', ' ')}"
                        if isinstance(observation, dict):
                            if "pages" in observation:
                                success_msg = f"Successfully extracted from {observation.get('pages', 'N/A')} pages"
                            elif "tables" in observation:
                                success_msg = f"Successfully extracted {len(observation.get('tables', []))} tables"
                        yield {
                            "type": "component_status",
                            "step": step_count,
                            "component": tool_name,
                            "status": "completed",
                            "message": success_msg,
                            "observation": str(observation)[:500],
                            "executor": "bedrock"
                        }
                        # Add observation to conversation (Bedrock converse API format)
                        observation_message = f"Observation: {observation}"
                        conversation_history.append({"role": "user", "content": [{"text": observation_message}]})
                        # Continue to next iteration to get Mistral's next action
                        continue
                    else:
                        # Unknown tool
                        error_msg = f"Unknown tool: {tool_name}"
                        conversation_history.append({"role": "user", "content": [{"text": f"Error: {error_msg}"}]})
                        continue
                except json.JSONDecodeError as e:
                    # Invalid JSON
                    error_msg = f"Invalid JSON in Action Input: {e}"
                    conversation_history.append({"role": "user", "content": [{"text": f"Error: {error_msg}"}]})
                    continue
            # Check for Final Answer (only if no action was found)
            if "Final Answer:" in assistant_message or "final answer" in assistant_message.lower():
                # Done!
                if tool_results:
                    # Try to compile component results for pipeline manager (V3 architecture)
                    # Note: This requires a pipeline record to exist in MongoDB
                    # If not found, we'll fall back to basic response without S3 storage
                    structured_result = {
                        "status": "completed",
                        "components_executed": tool_results,
                        "summary": {
                            "total_tools_called": len(tool_results),
                            "tools": list(tool_results.keys())
                        },
                        "final_output": assistant_message
                    }
                    if PIPELINE_MANAGER_AVAILABLE and session_id:
                        try:
                            pipeline_mgr = get_pipeline_manager()
                            # Check if pipeline record exists first
                            pipeline_record = pipeline_mgr.get_pipeline(session_id)
                            if pipeline_record:
                                # Pipeline record exists, create S3 final output
                                components_results = []
                                for comp_name, comp_output in tool_results.items():
                                    components_results.append({
                                        "component_name": comp_name,
                                        "status": "completed",
                                        "result": comp_output,
                                        "success_message": f"Successfully executed {comp_name}"
                                    })
                                # Create final output in S3
                                final_output_data = pipeline_mgr.mark_pipeline_completed(
                                    execution_id=session_id,
                                    components_results=components_results,
                                    executor="bedrock"
                                )
                                # Add S3 URLs to structured result
                                structured_result["final_output_url"] = final_output_data.get("final_output_url")
                                structured_result["final_output_expires_at"] = final_output_data.get("final_output_expires_at")
                                structured_result["last_node_output"] = final_output_data.get("last_node_output")
                                structured_result["workflow_status"] = final_output_data.get("workflow_status", "completed")
                            else:
                                # No pipeline record - this is OK for direct executor calls
                                # Just continue with basic response
                                print(f"ℹ️ No pipeline record found for session {session_id} - using basic response")
                        except Exception as e:
                            # Pipeline manager failed - continue with basic response
                            print(f"⚠️ Failed to create S3 final output: {str(e)}")
                            # structured_result already has basic fields, just continue
                    yield {
                        "type": "final",
                        "data": structured_result,
                        "executor": "bedrock"
                    }
                else:
                    yield {
                        "type": "error",
                        "error": "Bedrock completed but no components were called",
                        "executor": "bedrock"
                    }
                return
            # No action found - agent might be confused or done
            if iteration > 0 and not has_called_tools:
                # Agent isn't calling tools properly
                yield {
                    "type": "error",
                    "error": "Bedrock didn't call tools in correct format. Falling back to CrewAI.",
                    "executor": "bedrock",
                    "debug_output": assistant_message[:500]
                }
                return
            elif iteration > 0:
                # Has called some tools but stopped - might be done
                structured_result = {
                    "status": "completed",
                    "components_executed": tool_results,
                    "summary": {
                        "total_tools_called": len(tool_results),
                        "tools": list(tool_results.keys())
                    },
                    "final_output": assistant_message
                }
                yield {
                    "type": "final",
                    "data": structured_result,
                    "executor": "bedrock"
                }
                return
        # Max iterations reached
        if tool_results:
            structured_result = {
                "status": "completed",
                "components_executed": tool_results,
                "summary": {
                    "total_tools_called": len(tool_results),
                    "tools": list(tool_results.keys())
                },
                "final_output": "Max iterations reached"
            }
            yield {
                "type": "final",
                "data": structured_result,
                "executor": "bedrock"
            }
        else:
            yield {
                "type": "error",
                "error": "Max iterations reached without tool calls",
                "executor": "bedrock"
            }
    except Exception as e:
        yield {
            "type": "error",
            "error": str(e),
            "executor": "bedrock"
        }
# ========================
# CREWAI EXECUTOR (FALLBACK)
# ========================
def execute_pipeline_crewai_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using CrewAI (fallback method).

    Delegates to the existing CrewAI streaming runner and tags every emitted
    event with executor="crewai". Any exception is converted into a single
    terminal error event rather than propagating to the caller.
    """
    try:
        # Announce which executor is handling this run
        yield {
            "type": "status",
            "message": "Using CrewAI executor (fallback)...",
            "executor": "crewai"
        }
        # Collapse the pipeline into a single natural-language goal
        component_count = len(pipeline.get('components', []))
        execution_goal = (
            f"Execute the approved plan: {pipeline['pipeline_name']}. "
            f"Process {component_count} components in order."
        )
        event_stream = crewai_run_streaming(
            user_input=execution_goal,
            session_file_path=file_path,
            plan=pipeline,
            chat_history=[]
        )
        # Pass events through, stamping their origin
        for event in event_stream:
            if isinstance(event, dict):
                event["executor"] = "crewai"
            yield event
    except Exception as e:
        yield {
            "type": "error",
            "error": str(e),
            "executor": "crewai"
        }
# ========================
# UNIFIED EXECUTOR WITH FALLBACK
# ========================
# ========================
# TOOL REGISTRY & DYNAMIC EXECUTION (UPDATED)
# ========================
# Import the master tools
# Build the global tool registry from master_tools at import time.
# On ImportError the registry is left empty and executors report the
# failure to callers instead of crashing the module import.
try:
    from services.master_tools import get_master_tools
    # NOTE(review): StructuredTool is imported but not referenced below;
    # presumably kept so a missing langchain install fails this whole
    # try-block early — confirm before removing.
    from langchain_core.tools import StructuredTool
    # Get all tools from master_tools
    MASTER_TOOLS = get_master_tools()
    # Create tool registry mapping: tool name -> tool object.
    # LangChain tools expose .name; plain functions fall back to __name__.
    TOOL_REGISTRY = {}
    for tool in MASTER_TOOLS:
        if hasattr(tool, 'name'):
            TOOL_REGISTRY[tool.name] = tool
        elif hasattr(tool, '__name__'):
            TOOL_REGISTRY[tool.__name__] = tool
    print(f"✅ Loaded {len(TOOL_REGISTRY)} tools from master_tools.py")
except ImportError as e:
    print(f"⚠️ Could not import master_tools: {e}")
    TOOL_REGISTRY = {}
def get_tool_executor(tool_name: str) -> Optional[Any]:
    """Get tool from registry with intelligent name matching.

    Resolution order:
    1. Exact name match.
    2. Common naming variations ("<name>_tool", underscores stripped).
    3. First partial (substring) match in registration order.

    Args:
        tool_name: Logical tool name from a pipeline step definition.

    Returns:
        The registered tool object, or None if nothing matches.
    """
    # Direct match
    if tool_name in TOOL_REGISTRY:
        return TOOL_REGISTRY[tool_name]
    # Try variations. FIX: the original list contained duplicates
    # (tool_name itself, already checked above, and "<name>_tool" twice).
    variations = [
        f"{tool_name}_tool",
        tool_name.replace("_", ""),
    ]
    for variation in variations:
        if variation in TOOL_REGISTRY:
            return TOOL_REGISTRY[variation]
    # Check partial matches.
    # NOTE(review): substring matching can pick the wrong tool when names
    # overlap (e.g. "extract" matches several tools) — kept for backward
    # compatibility with existing pipelines.
    for registered_name, tool in TOOL_REGISTRY.items():
        if tool_name in registered_name or registered_name in tool_name:
            return tool
    return None
# ========================
# UNIFIED EXECUTOR WITH FALLBACK (UPDATED)
# ========================
# Update the execute_pipeline_streaming function:
def execute_pipeline_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline with agentic orchestration (gated) or legacy fallback.
    PHASE 1: ENTRY POINT GATING
    - If agentic enabled → route to agentic wrapper
    - On ANY failure → HARD FALLBACK to legacy path
    - Legacy path remains COMPLETELY UNCHANGED
    Args:
        pipeline: Pipeline configuration
        file_path: Path to file being processed
        session_id: Optional session identifier
        prefer_bedrock: Use Bedrock over CrewAI in legacy path
    Yields:
        Pipeline execution events
    """
    # ========================================
    # AGENTIC ORCHESTRATION GATE (Phase 3)
    # ========================================
    if _should_use_agentic_orchestration(pipeline, session_id):
        logger.info(f"Routing to agentic orchestration for session {session_id}")
        try:
            # Import wrapper (isolated - no agent internals exposed)
            from services.agentic_orchestrator_wrapper import execute_with_agentic_orchestration
            from services.agentic_integration_logger import log_agentic_attempt
            # Log decision
            log_agentic_attempt(
                session_id=session_id or "unknown",
                pipeline=pipeline,
                decision="agentic_enabled"
            )
            # Execute via agentic wrapper
            # If this succeeds, return early (skip legacy path)
            for event in execute_with_agentic_orchestration(pipeline, file_path, session_id):
                yield event
            logger.info(f"Agentic orchestration completed for session {session_id}")
            return  # Success - done via agentic path
        except Exception as e:
            # HARD FALLBACK: Any exception → continue to legacy path below
            logger.error(f"Agentic orchestration failed, falling back to legacy: {e}")
            from services.agentic_integration_logger import log_fallback_trigger
            log_fallback_trigger(
                session_id=session_id or "unknown",
                reason="Exception in agentic execution",
                exception=e
            )
            # Yield info event about fallback
            yield {
                "type": "info",
                "message": f"Agentic execution failed, using legacy pipeline",
                "executor": "fallback"
            }
            # Continue to legacy path below (no return)
    # ========================================
    # LEGACY PATH (COMPLETELY UNCHANGED)
    # ========================================
    # Execution state shared between the Bedrock attempt and the CrewAI
    # fallback: CrewAI resumes from len(components_executed).
    components_executed = []
    final_output = None
    executor_used = "unknown"
    fallback_triggered = False
    bedrock_error = None
    # Initialize pipeline info
    pipeline_id = pipeline.get("pipeline_id")
    pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
    # FIX: Get steps from either 'components' or 'pipeline_steps'
    steps = []
    if "pipeline_steps" in pipeline:
        steps = pipeline.get("pipeline_steps", [])
    elif "components" in pipeline:
        steps = pipeline.get("components", [])
        # Also update the pipeline to have both for consistency
        pipeline["pipeline_steps"] = steps
    if not steps:
        error_msg = f"No steps/components found in pipeline: {pipeline_name}"
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return
    # Check if tools are available
    if not TOOL_REGISTRY:
        error_msg = "No tools available. master_tools.py not loaded correctly."
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return
    print(f"🏆 Executing pipeline '{pipeline_name}' with {len(steps)} steps")
    print(f" Steps format: {[s.get('tool_name', s.get('tool', 'unknown')) for s in steps]}")
    yield {
        "type": "info",
        "message": f"Starting pipeline: {pipeline_name} with {len(steps)} steps",
        "executor": "initializing"
    }
    # Try Bedrock first (priority)
    if prefer_bedrock and BEDROCK_AVAILABLE:
        try:
            print(f"🏆 Executing pipeline with Bedrock: {pipeline_name}")
            yield {
                "type": "info",
                "message": "Attempting execution with Bedrock LangChain...",
                "executor": "bedrock"
            }
            # Global components_executed list for step-by-step execution
            # NOTE: This needs to be declared as nonlocal or passed to helper functions
            components_executed = []  # Reset for Bedrock execution
            # Execute step by step with Bedrock
            for step_num, step_def in enumerate(steps, 1):
                # Steps may use either 'tool_name' or legacy 'tool' key
                tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
                yield {
                    "type": "step",
                    "step": step_num,
                    "tool": tool_name,
                    "status": "executing",
                    "executor": "bedrock"
                }
                try:
                    # Execute the step using master_tools
                    result = _execute_step_with_master_tool(
                        step_def=step_def,
                        file_path=file_path,
                        step_num=step_num,
                        total_steps=len(steps),
                        session_id=session_id,
                        prefer_bedrock=True,
                        previous_results=components_executed  # Pass previous results
                    )
                    executor_used = "bedrock"
                    # Create component result object
                    component_result = {
                        "tool_name": tool_name,
                        "tool": tool_name,  # For compatibility
                        **step_def,  # Include all step definition fields
                        "result": result.get("output"),
                        "status": "completed",
                        "executor": executor_used,
                        "execution_time": result.get("execution_time"),
                        "step_number": step_num,
                        "success": True,
                        "tool_version": result.get("tool_version", "1.0")
                    }
                    components_executed.append(component_result)
                    yield {
                        "type": "step",
                        "step": step_num,
                        "tool": tool_name,
                        "status": "completed",
                        "observation": result.get("output"),
                        "input": step_def,
                        "executor": executor_used
                    }
                    # Update file_path for next step if needed
                    file_path = _update_file_path(file_path, result)
                except Exception as step_error:
                    print(f"❌ Step {step_num} failed with Bedrock: {str(step_error)}")
                    # Create failed component result
                    component_result = {
                        "tool_name": tool_name,
                        "tool": tool_name,
                        **step_def,
                        "result": {"error": str(step_error)},
                        "status": "failed",
                        "error": str(step_error),
                        "step_number": step_num,
                        "success": False
                    }
                    components_executed.append(component_result)
                    bedrock_error = str(step_error)
                    yield {
                        "type": "error",
                        "step": step_num,
                        "tool": tool_name,
                        "error": str(step_error),
                        "message": f"Step {step_num} failed with Bedrock"
                    }
                    # Hand the remaining steps to CrewAI
                    fallback_triggered = True
                    break
            # If we completed all steps with Bedrock
            if not fallback_triggered and len(components_executed) == len(steps):
                final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
                yield {
                    "type": "final",
                    "data": final_output,
                    "executor": executor_used
                }
                print(f"✅ Bedrock execution completed: {pipeline_name}")
                return
        except Exception as bedrock_exception:
            print(f"❌ Bedrock execution exception: {str(bedrock_exception)}")
            bedrock_error = str(bedrock_exception)
            fallback_triggered = True
    # If Bedrock failed or wasn't preferred, try CrewAI.
    # NOTE(review): when prefer_bedrock is True but BEDROCK_AVAILABLE is
    # False, neither branch runs and the generator ends without a "final"
    # event — confirm callers tolerate this.
    if fallback_triggered or not prefer_bedrock:
        print(f"🔄 Executing pipeline with CrewAI: {pipeline_name}")
        if fallback_triggered and bedrock_error:
            yield {
                "type": "info",
                "message": f"Bedrock failed: {bedrock_error}. Switching to CrewAI...",
                "executor": "fallback"
            }
        else:
            yield {
                "type": "info",
                "message": "Using CrewAI execution...",
                "executor": "crewai"
            }
        # Start from where Bedrock left off, or from beginning
        start_step = len(components_executed) + 1 if components_executed else 1
        for step_num in range(start_step, len(steps) + 1):
            step_def = steps[step_num - 1]
            tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
            yield {
                "type": "step",
                "step": step_num,
                "tool": tool_name,
                "status": "executing",
                "executor": "crewai"
            }
            try:
                # Execute the step using master_tools
                result = _execute_step_with_master_tool(
                    step_def=step_def,
                    file_path=file_path,
                    step_num=step_num,
                    total_steps=len(steps),
                    session_id=session_id,
                    prefer_bedrock=False,
                    previous_results=components_executed
                )
                executor_used = "crewai"
                # Create component result object
                component_result = {
                    "tool_name": tool_name,
                    "tool": tool_name,
                    **step_def,
                    "result": result.get("output"),
                    "status": "completed",
                    "executor": executor_used,
                    "execution_time": result.get("execution_time"),
                    "step_number": step_num,
                    "success": True,
                    "tool_version": result.get("tool_version", "1.0")
                }
                # Add or replace in components_executed (replacing overwrites
                # the failed Bedrock record for the retried step)
                if len(components_executed) >= step_num:
                    components_executed[step_num - 1] = component_result
                else:
                    components_executed.append(component_result)
                yield {
                    "type": "step",
                    "step": step_num,
                    "tool": tool_name,
                    "status": "completed",
                    "observation": result.get("output"),
                    "input": step_def,
                    "executor": executor_used
                }
                # Update file_path for next step if needed
                file_path = _update_file_path(file_path, result)
            except Exception as step_error:
                print(f"❌ Step {step_num} failed with CrewAI: {str(step_error)}")
                # Create failed component result
                component_result = {
                    "tool_name": tool_name,
                    "tool": tool_name,
                    **step_def,
                    "result": {"error": str(step_error)},
                    "status": "failed",
                    "error": str(step_error),
                    "step_number": step_num,
                    "success": False
                }
                # Add or replace in components_executed
                if len(components_executed) >= step_num:
                    components_executed[step_num - 1] = component_result
                else:
                    components_executed.append(component_result)
                yield {
                    "type": "error",
                    "step": step_num,
                    "tool": tool_name,
                    "error": str(step_error),
                    "message": f"Step {step_num} failed with CrewAI"
                }
                break
        # Check if we completed all steps
        completed_steps = [c for c in components_executed if c.get("status") == "completed"]
        if len(completed_steps) == len(steps):
            # All steps completed
            final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
            yield {
                "type": "final",
                "data": final_output,
                "executor": executor_used
            }
            print(f"✅ CrewAI execution completed: {pipeline_name}")
        else:
            # Partial completion or failure
            final_output = _build_final_output(pipeline, components_executed, executor_used, "partial")
            final_output["error"] = f"Pipeline execution incomplete. Completed {len(completed_steps)} of {len(steps)} steps."
            yield {
                "type": "error",
                "error": "Pipeline execution incomplete",
                "data": final_output
            }
            print(f"⚠️ CrewAI execution incomplete for: {pipeline_name}")
# ========================
# DYNAMIC STEP EXECUTION WITH MASTER_TOOLS
# ========================
def _execute_step_with_master_tool(
    step_def: Dict[str, Any],
    file_path: str,
    step_num: int,
    total_steps: int,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True,
    previous_results: List[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline step using master_tools.

    Handles step_def with either 'tool_name' or 'tool' field, and fills in
    missing tool arguments automatically: file_path, session_id, and a
    'text' argument chained from the previous step's result.

    Args:
        step_def: Step definition from the pipeline (tool name + params).
        file_path: Current working file for file-based tools.
        step_num: 1-based index of this step.
        total_steps: Total number of steps in the pipeline.
        session_id: Optional session identifier forwarded to tools.
        prefer_bedrock: Only affects the 'executor' label in the result.
        previous_results: Results of earlier steps, used for text chaining.

    Returns:
        Dict with 'output', 'executor', 'execution_time', 'tool_version' and
        'args_used' (plus 'warning' when the minimal-args retry was used).

    Raises:
        ValueError: Unknown tool name, or tool object is not callable.
        RuntimeError: Tool invocation failed (including the retry path).
    """
    import time
    import inspect
    # Get tool name from either 'tool_name' or legacy 'tool' field
    tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
    start_time = time.time()
    print(f" 🔨 Executing step {step_num}/{total_steps}: {tool_name}")
    print(f" Step definition: {step_def}")
    # Get tool from registry
    tool = get_tool_executor(tool_name)
    if not tool:
        error_msg = f"Tool '{tool_name}' not found in registry. Available tools: {list(TOOL_REGISTRY.keys())}"
        print(f" ❌ {error_msg}")
        raise ValueError(error_msg)
    # Prepare arguments
    args = {}
    # For StructuredTool (LangChain tools)
    if hasattr(tool, 'args_schema') and hasattr(tool, 'invoke'):
        # Get the args schema.
        # NOTE(review): __fields__ is the pydantic v1 attribute; confirm the
        # installed LangChain/pydantic combination still exposes it.
        args_schema = tool.args_schema
        # Build arguments from step_def
        for field_name, field in args_schema.__fields__.items():
            # Check if parameter is in step_def
            if field_name in step_def:
                args[field_name] = step_def[field_name]
            # Special handling for file_path
            elif field_name == "file_path" and file_path:
                args[field_name] = file_path
            # Special handling for session_id
            elif field_name == "session_id" and session_id:
                args[field_name] = session_id
            # Handle text parameter if not provided but we have previous output
            elif field_name == "text" and field_name not in step_def and previous_results:
                # Try to get text from previous step's output
                if step_num > 1 and len(previous_results) >= step_num - 1:
                    prev_result = previous_results[step_num - 2].get("result")
                    if isinstance(prev_result, dict) and "text" in prev_result:
                        args["text"] = prev_result["text"]
                        print(f" 📝 Using text from previous step: {args['text'][:100]}...")
                    elif isinstance(prev_result, str):
                        args["text"] = prev_result
                        print(f" 📝 Using text from previous step: {args['text'][:100]}...")
        try:
            # Execute the tool
            print(f" 🚀 Invoking tool {tool_name} with args: {args}")
            output = tool.invoke(args)
            execution_time = time.time() - start_time
            print(f" ✅ Step {step_num} completed in {execution_time:.2f}s")
            return {
                "output": output,
                "executor": "bedrock" if prefer_bedrock else "crewai",
                "execution_time": execution_time,
                "tool_version": "master_tools_1.0",
                "args_used": list(args.keys())
            }
        except Exception as e:
            # Try with minimal arguments
            print(f"⚠️ Tool {tool_name} failed with full args, trying minimal: {e}")
            # Try with just file_path if available
            if file_path and "file_path" in args_schema.__fields__:
                minimal_args = {"file_path": file_path}
                try:
                    output = tool.invoke(minimal_args)
                    execution_time = time.time() - start_time
                    return {
                        "output": output,
                        "executor": "bedrock" if prefer_bedrock else "crewai",
                        "execution_time": execution_time,
                        "tool_version": "master_tools_1.0",
                        "args_used": list(minimal_args.keys()),
                        "warning": "Used minimal arguments"
                    }
                except Exception as inner_error:
                    raise RuntimeError(f"Tool '{tool_name}' failed with minimal args: {inner_error}")
            # FIX: previously this path fell through and implicitly returned
            # None, which made the caller crash later on result.get();
            # surface the original failure instead.
            raise RuntimeError(f"Tool '{tool_name}' failed: {e}")
    # For regular Python functions
    elif callable(tool):
        try:
            # Get function signature
            sig = inspect.signature(tool)
            # Build arguments based on signature
            call_args = {}
            for param_name, param in sig.parameters.items():
                # Try step_def first
                if param_name in step_def:
                    call_args[param_name] = step_def[param_name]
                # Special handling for file_path
                elif param_name == "file_path" and file_path:
                    call_args[param_name] = file_path
                # Special handling for session_id
                elif param_name == "session_id" and session_id:
                    call_args[param_name] = session_id
                # Handle text parameter
                elif param_name == "text" and param_name not in step_def and previous_results:
                    # Try to get text from previous step
                    if step_num > 1 and len(previous_results) >= step_num - 1:
                        prev_result = previous_results[step_num - 2].get("result")
                        if isinstance(prev_result, dict) and "text" in prev_result:
                            call_args["text"] = prev_result["text"]
                        elif isinstance(prev_result, str):
                            call_args["text"] = prev_result
            # Execute the function
            print(f" 🚀 Calling function {tool_name} with args: {call_args}")
            output = tool(**call_args)
            execution_time = time.time() - start_time
            print(f" ✅ Step {step_num} completed in {execution_time:.2f}s")
            return {
                "output": output,
                "executor": "bedrock" if prefer_bedrock else "crewai",
                "execution_time": execution_time,
                "tool_version": "function_1.0",
                "args_used": list(call_args.keys())
            }
        except Exception as e:
            raise RuntimeError(f"Failed to execute function {tool_name}: {e}")
    else:
        raise ValueError(f"Tool '{tool_name}' is not callable or a valid StructuredTool")
def _update_file_path(current_file_path: str, result: Dict[str, Any]) -> str:
"""
Update file path based on tool result.
Some tools might generate new files.
"""
output = result.get("output")
if isinstance(output, dict):
# Check for file references in output
for key in ["file_path", "output_file", "new_file", "generated_file"]:
if key in output and isinstance(output[key], str):
return output[key]
return current_file_path
# ========================
# HELPER FUNCTIONS
# ========================
def _build_final_output(
pipeline: Dict[str, Any],
components_executed: List[Dict[str, Any]],
executor_used: str,
status: str
) -> Dict[str, Any]:
"""
Build final output with components_executed array.
FIXED: Handle both component formats
"""
# Get steps count from pipeline
steps = pipeline.get("pipeline_steps", pipeline.get("components", []))
# Find the finalize step result if present
final_result = None
for component in components_executed:
if component.get("tool_name") == "finalize" or component.get("tool") == "finalize":
final_result = component.get("result")
break
# Count completed steps
completed_steps = len([c for c in components_executed if c.get("status") == "completed"])
final_output = {
"pipeline_id": pipeline.get("pipeline_id"),
"pipeline_name": pipeline.get("pipeline_name"),
"status": status,
"components_executed": components_executed,
"executor": executor_used,
"summary": f"Pipeline execution {status} with {executor_used}",
"total_steps": len(steps),
"completed_steps": completed_steps,
"final_result": final_result
}
# Extract text for user-facing output
if final_result:
# Use finalize tool's output
final_output["text"] = final_result
elif components_executed:
# Find last completed component with text
for component in reversed(components_executed):
if component.get("status") == "completed" and component.get("result"):
result = component["result"]
if isinstance(result, str):
final_output["text"] = result
break
elif isinstance(result, dict):
for field in ["text", "summary", "content", "translation", "output"]:
if field in result and isinstance(result[field], str):
final_output["text"] = result[field]
break
# If no text field found but dict has string values
if "text" not in final_output:
for key, value in result.items():
if isinstance(value, str) and len(value) > 10:
final_output["text"] = value
break
return final_output
# ========================
# NON-STREAMING EXECUTOR
# ========================
def execute_pipeline(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Dict[str, Any]:
    """
    Execute pipeline (non-streaming) with fallback.

    Drains the streaming executor and returns the payload of the first
    terminal event: a "final" event's data, or an "error" event's data.
    When no terminal payload is produced, a synthetic failure record is
    returned instead.
    """
    result: Optional[Dict[str, Any]] = None
    for event in execute_pipeline_streaming(pipeline, file_path, session_id, prefer_bedrock):
        event_type = event.get("type")
        if event_type == "final":
            result = event.get("data")
            break
        if event_type == "error" and event.get("data"):
            result = event.get("data")
            break
    if result is not None:
        return result
    # No terminal event carried a payload — synthesize a failure record
    return {
        "pipeline_id": pipeline.get("pipeline_id"),
        "pipeline_name": pipeline.get("pipeline_name"),
        "status": "failed",
        "components_executed": [],
        "error": "Pipeline execution completed without final result"
    }
if __name__ == "__main__":
    # Manual smoke test: run a one-component extraction pipeline through the
    # streaming executor. Requires master_tools to load and a local test.pdf.
    test_pipeline = {
        "pipeline_name": "test-extraction",
        "components": [
            {
                "tool_name": "extract_text",
                "start_page": 1,
                "end_page": 1,
                "params": {}
            }
        ],
        # Marker recording how this pipeline was produced (test harness)
        "_generator": "test"
    }
    test_file = "test.pdf"
    print("Testing streaming execution...")
    # Print every event emitted by the generator as it arrives
    for event in execute_pipeline_streaming(test_pipeline, test_file):
        print(f"Event: {event}")