# services/pipeline_executor.py
"""
Pipeline Executor - Orchestrates multi-step document processing pipelines

Supports:
- Agentic Orchestration (Phase 3 - gated by feature flag)
- Bedrock LangChain execution (preferred legacy)
- CrewAI execution (fallback legacy)
- Dynamic tool chaining
- Component status tracking

Version: 3.0 with Safe Agentic Integration
"""

import json
import os
import time
import hashlib
from typing import Dict, Any, List, Generator, Optional
import logging

logger = logging.getLogger(__name__)


# ========================
# AGENTIC ORCHESTRATION GATING (Phase 3)
# ========================

def _should_use_agentic_orchestration(
    pipeline: Dict[str, Any],
    session_id: Optional[str] = None
) -> bool:
    """
    Decision logic for agentic vs legacy execution.

    Returns True only if:
    1. Feature flag USE_AGENTIC_ORCHESTRATION=true
    2. Session passes rollout percentage
    3. Not in shadow mode (shadow uses legacy result)

    Args:
        pipeline: Pipeline configuration
        session_id: Optional session identifier for rollout hashing

    Returns:
        True if agentic orchestration should be used
    """
    # Check kill switch
    if not os.getenv("USE_AGENTIC_ORCHESTRATION", "false").lower() == "true":
        return False

    # Shadow mode always uses legacy (agentic runs in parallel)
    if os.getenv("AGENTIC_SHADOW_MODE", "false").lower() == "true":
        return False

    # Rollout percentage (0-100)
    rollout_pct = int(os.getenv("AGENTIC_ROLLOUT_PERCENTAGE", "0"))
    if rollout_pct <= 0:
        return False  # Disabled
    if rollout_pct >= 100:
        return True  # Full rollout

    # Percentage-based rollout using session hash (md5 used only for
    # deterministic bucketing, not for security)
    if session_id:
        hash_val = int(hashlib.md5(session_id.encode()).hexdigest(), 16)
        if (hash_val % 100) < rollout_pct:
            return True

    return False


# For Bedrock LangChain
try:
    from langchain_aws import ChatBedrock
    # FIX: import the symbols actually used by execute_pipeline_bedrock below.
    # The previous imports (create_react_agent / PromptTemplate / hub) were
    # never referenced, and ChatPromptTemplate / MessagesPlaceholder /
    # create_tool_calling_agent raised NameError at call time.
    from langchain.agents import AgentExecutor, create_tool_calling_agent
    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
    from services.master_tools import get_master_tools as get_langchain_tools
    BEDROCK_AVAILABLE = True
    print("✅ Bedrock LangChain imports successful - BEDROCK_AVAILABLE = True")
except ImportError as e:
    BEDROCK_AVAILABLE = False
    print(f"❌ WARNING: LangChain Bedrock not available - {str(e)}")
    print("   Pipeline execution will use CrewAI only")
except Exception as e:
    BEDROCK_AVAILABLE = False
    print(f"❌ ERROR: Bedrock import failed - {str(e)}")

# For CrewAI fallback
from services.agent_crewai import run_agent_streaming as crewai_run_streaming

# Pipeline Manager for V3 architecture
try:
    from services.pipeline_manager import get_pipeline_manager
    PIPELINE_MANAGER_AVAILABLE = True
except ImportError:
    PIPELINE_MANAGER_AVAILABLE = False
    print("⚠️ Pipeline Manager not available")


# ========================
# BEDROCK LANGCHAIN EXECUTOR
# ========================

def execute_pipeline_bedrock(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute pipeline using Bedrock + LangChain (priority method).

    Args:
        pipeline: Pipeline configuration (must contain 'pipeline_name')
        file_path: Path to the file being processed
        session_id: Optional session identifier

    Returns:
        The raw AgentExecutor.invoke() result dict.

    Raises:
        RuntimeError: If Bedrock is unavailable or execution fails.
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")

    try:
        llm = ChatBedrock(
            model_id="mistral.mistral-large-2402-v1:0",
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )
        tools = get_langchain_tools()

        system_instructions = """You are MasterLLM, a precise document processing agent.
Execute the provided pipeline components in ORDER.
For each component:
1. Call the corresponding tool with exact parameters
2. Wait for the result
3. Move to next component
IMPORTANT:
- Follow the pipeline order strictly
- Use the file_path provided for all file-based operations
- For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
- At the end, call 'finalize' tool with complete results
Pipeline components will be in format:
{{ "tool_name": "extract_text", "start_page": 1, "end_page": 5, "params": {{}} }}"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_instructions),
            ("system", "File path: {file_path}"),
            ("system", "Pipeline to execute: {pipeline_json}"),
            ("system", "Session ID: {session_id}"),
            ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results."),
            MessagesPlaceholder(variable_name="agent_scratchpad")  # REQUIRED for LangChain agent
        ])

        agent = create_tool_calling_agent(llm, tools, prompt)
        executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
        )
        result = executor.invoke({
            "input": f"Execute pipeline: {pipeline['pipeline_name']}",
            "file_path": file_path,
            "pipeline_json": json.dumps(pipeline, indent=2),
            "session_id": session_id or "unknown"
        })
        return result
    except Exception as e:
        raise RuntimeError(f"Bedrock execution failed: {str(e)}") from e


def _make_basic_result(tool_results: Dict[str, Any], final_output: Any) -> Dict[str, Any]:
    """Build the minimal structured result emitted by the manual Bedrock executor."""
    return {
        "status": "completed",
        "components_executed": tool_results,
        "summary": {
            "total_tools_called": len(tool_results),
            "tools": list(tool_results.keys())
        },
        "final_output": final_output
    }


def execute_pipeline_bedrock_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using Bedrock with MANUAL tool calling loop
    (bypasses LangChain agents).

    Yields event dicts: status / step / component_status / final / error,
    all tagged with "executor": "bedrock".
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")

    try:
        import re
        import boto3

        # Get Bedrock client directly
        bedrock_runtime = boto3.client(
            service_name='bedrock-runtime',
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )

        tools_dict = {tool.name: tool for tool in get_langchain_tools()}

        # Build tool descriptions for prompt
        tool_descriptions = []
        for name, tool in tools_dict.items():
            tool_descriptions.append(f"- {name}: {tool.description}")
        tools_text = "\n".join(tool_descriptions)

        # Build list of EXACT components to execute
        components_list = pipeline.get('components', [])
        components_to_execute = "\n".join([
            f"{i+1}. {comp.get('tool_name')} (pages {comp.get('start_page')}-{comp.get('end_page')})"
            for i, comp in enumerate(components_list)
        ])

        # Initial prompt with STRICT component enforcement
        system_prompt = f"""You are MasterLLM, a document processing assistant.
You have access to these tools:
{tools_text}
To use a tool, you MUST write EXACTLY in this format:
Action: tool_name
Action Input: {{"param1": "value1", "param2": value2}}
After you write Action and Action Input, I will execute the tool and give you the Observation.
Then you can take another Action or provide your Final Answer.
CRITICAL RULES:
- Write "Action:" followed by the tool name
- Write "Action Input:" followed by valid JSON on the SAME line or next line
- ALWAYS include "file_path" parameter in your Action Input
- The file_path is: {file_path}
- For page parameters: start_page must be >= 1, end_page must be >= 1
- To process ALL pages, use end_page: 999 (NOT -1!)
- After seeing Observation, you can take another Action
- When done, write "Final Answer:" followed by summary
EXAMPLE of correct tool call:
Action: extract_text
Action Input: {{"file_path": "{file_path}", "start_page": 1, "end_page": 5}}
IMPORTANT: Every tool call MUST include the file_path parameter!
**CRITICAL: EXECUTE ONLY THESE COMPONENTS IN THIS EXACT ORDER:**
{components_to_execute}
**DO NOT execute any other tools or components not listed above!**
**DO NOT add extra steps like summarize, translate, or classify unless explicitly listed!**
**After completing the components listed above, provide your Final Answer immediately!**
File to process: {file_path}
Total components to execute: {len(components_list)}
Execute ONLY the {len(components_list)} component(s) listed above, then provide Final Answer."""

        user_message = f"Execute the pipeline: {pipeline['pipeline_name']}"
        conversation_history = []
        tool_results = {}
        has_called_tools = False
        step_count = 0

        # Set max_iterations based on number of components (with generous buffer)
        num_components = len(pipeline.get('components', []))
        # Formula: num_components * 2 (for tool call + processing) + 3 (init + final answer + safety)
        max_iterations = max(5, (num_components * 2) + 3)
        print(f"📋 Pipeline has {num_components} components, max_iterations={max_iterations}")

        yield {
            "type": "status",
            "message": "Initializing Bedrock manual executor...",
            "executor": "bedrock"
        }

        for iteration in range(max_iterations):
            # Prepare messages (Bedrock converse API format)
            messages = [{"role": "user", "content": [{"text": user_message}]}]
            messages.extend(conversation_history)

            # Call Bedrock directly using converse API
            response = bedrock_runtime.converse(
                modelId="mistral.mistral-large-2402-v1:0",
                messages=messages,
                system=[{"text": system_prompt}],
                inferenceConfig={
                    "temperature": 0.0,
                    "maxTokens": 2048
                }
            )

            # Get response text
            assistant_message = response['output']['message']['content'][0]['text']
            print(f"\n🤖 Mistral Response (Iteration {iteration + 1}):\n{assistant_message}\n")

            # Add to conversation (Bedrock converse API format)
            conversation_history.append({"role": "assistant", "content": [{"text": assistant_message}]})

            # Parse for Action and Action Input FIRST (before checking Final Answer)
            action_match = re.search(r'Action:\s*(\w+)', assistant_message)
            action_input_match = re.search(r'Action Input:\s*(\{.+?\})', assistant_message, re.DOTALL)

            if action_match and action_input_match:
                tool_name = action_match.group(1)
                action_input_str = action_input_match.group(1).strip()
                try:
                    # Parse JSON input
                    tool_input = json.loads(action_input_str)

                    if tool_name in tools_dict:
                        step_count += 1
                        has_called_tools = True
                        yield {
                            "type": "step",
                            "step": step_count,
                            "tool": tool_name,
                            "status": "executing",
                            "executor": "bedrock",
                            "input": str(tool_input)[:200]
                        }

                        # Execute the component!
                        tool = tools_dict[tool_name]
                        observation = tool.invoke(tool_input)
                        tool_results[tool_name] = observation

                        # Generate component success message
                        success_msg = f"Successfully executed {tool_name.replace('_', ' ')}"
                        if isinstance(observation, dict):
                            if "pages" in observation:
                                success_msg = f"Successfully extracted from {observation.get('pages', 'N/A')} pages"
                            elif "tables" in observation:
                                success_msg = f"Successfully extracted {len(observation.get('tables', []))} tables"

                        yield {
                            "type": "component_status",
                            "step": step_count,
                            "component": tool_name,
                            "status": "completed",
                            "message": success_msg,
                            "observation": str(observation)[:500],
                            "executor": "bedrock"
                        }

                        # Add observation to conversation (Bedrock converse API format)
                        observation_message = f"Observation: {observation}"
                        conversation_history.append({"role": "user", "content": [{"text": observation_message}]})
                        # Continue to next iteration to get Mistral's next action
                        continue
                    else:
                        # Unknown tool
                        error_msg = f"Unknown tool: {tool_name}"
                        conversation_history.append({"role": "user", "content": [{"text": f"Error: {error_msg}"}]})
                        continue
                except json.JSONDecodeError as e:
                    # Invalid JSON
                    error_msg = f"Invalid JSON in Action Input: {e}"
                    conversation_history.append({"role": "user", "content": [{"text": f"Error: {error_msg}"}]})
                    continue

            # Check for Final Answer (only if no action was found).
            # The lower() check also covers a literal "Final Answer:".
            if "final answer" in assistant_message.lower():
                # Done!
                if tool_results:
                    # Try to compile component results for pipeline manager (V3 architecture)
                    # Note: This requires a pipeline record to exist in MongoDB
                    # If not found, we'll fall back to basic response without S3 storage
                    structured_result = _make_basic_result(tool_results, assistant_message)

                    if PIPELINE_MANAGER_AVAILABLE and session_id:
                        try:
                            pipeline_mgr = get_pipeline_manager()
                            # Check if pipeline record exists first
                            pipeline_record = pipeline_mgr.get_pipeline(session_id)
                            if pipeline_record:
                                # Pipeline record exists, create S3 final output
                                components_results = []
                                for comp_name, comp_output in tool_results.items():
                                    components_results.append({
                                        "component_name": comp_name,
                                        "status": "completed",
                                        "result": comp_output,
                                        "success_message": f"Successfully executed {comp_name}"
                                    })

                                # Create final output in S3
                                final_output_data = pipeline_mgr.mark_pipeline_completed(
                                    execution_id=session_id,
                                    components_results=components_results,
                                    executor="bedrock"
                                )

                                # Add S3 URLs to structured result
                                structured_result["final_output_url"] = final_output_data.get("final_output_url")
                                structured_result["final_output_expires_at"] = final_output_data.get("final_output_expires_at")
                                structured_result["last_node_output"] = final_output_data.get("last_node_output")
                                structured_result["workflow_status"] = final_output_data.get("workflow_status", "completed")
                            else:
                                # No pipeline record - this is OK for direct executor calls
                                # Just continue with basic response
                                print(f"ℹ️ No pipeline record found for session {session_id} - using basic response")
                        except Exception as e:
                            # Pipeline manager failed - continue with basic response
                            print(f"⚠️ Failed to create S3 final output: {str(e)}")
                            # structured_result already has basic fields, just continue

                    yield {
                        "type": "final",
                        "data": structured_result,
                        "executor": "bedrock"
                    }
                else:
                    yield {
                        "type": "error",
                        "error": "Bedrock completed but no components were called",
                        "executor": "bedrock"
                    }
                return

            # No action found - agent might be confused or done
            if iteration > 0 and not has_called_tools:
                # Agent isn't calling tools properly
                yield {
                    "type": "error",
                    "error": "Bedrock didn't call tools in correct format. Falling back to CrewAI.",
                    "executor": "bedrock",
                    "debug_output": assistant_message[:500]
                }
                return
            elif iteration > 0:
                # Has called some tools but stopped - might be done
                yield {
                    "type": "final",
                    "data": _make_basic_result(tool_results, assistant_message),
                    "executor": "bedrock"
                }
                return

        # Max iterations reached
        if tool_results:
            yield {
                "type": "final",
                "data": _make_basic_result(tool_results, "Max iterations reached"),
                "executor": "bedrock"
            }
        else:
            yield {
                "type": "error",
                "error": "Max iterations reached without tool calls",
                "executor": "bedrock"
            }

    except Exception as e:
        yield {
            "type": "error",
            "error": str(e),
            "executor": "bedrock"
        }


# ========================
# CREWAI EXECUTOR (FALLBACK)
# ========================

def execute_pipeline_crewai_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using CrewAI (fallback method).

    Passes through CrewAI streaming events, tagging each dict event with
    "executor": "crewai".
    """
    try:
        # Yield initial status
        yield {
            "type": "status",
            "message": "Using CrewAI executor (fallback)...",
            "executor": "crewai"
        }

        # Use existing CrewAI streaming function
        execution_goal = (
            f"Execute the approved plan: {pipeline['pipeline_name']}. "
            f"Process {len(pipeline.get('components', []))} components in order."
        )

        for event in crewai_run_streaming(
            user_input=execution_goal,
            session_file_path=file_path,
            plan=pipeline,
            chat_history=[]
        ):
            # Pass through CrewAI events with executor tag
            if isinstance(event, dict):
                event["executor"] = "crewai"
            yield event

    except Exception as e:
        yield {
            "type": "error",
            "error": str(e),
            "executor": "crewai"
        }


# ========================
# TOOL REGISTRY & DYNAMIC EXECUTION (UPDATED)
# ========================

# Import the master tools
try:
    from services.master_tools import get_master_tools
    from langchain_core.tools import StructuredTool

    # Get all tools from master_tools
    MASTER_TOOLS = get_master_tools()

    # Create tool registry mapping (tool objects keyed by name)
    TOOL_REGISTRY = {}
    for tool in MASTER_TOOLS:
        if hasattr(tool, 'name'):
            TOOL_REGISTRY[tool.name] = tool
        elif hasattr(tool, '__name__'):
            TOOL_REGISTRY[tool.__name__] = tool

    print(f"✅ Loaded {len(TOOL_REGISTRY)} tools from master_tools.py")
except ImportError as e:
    print(f"⚠️ Could not import master_tools: {e}")
    TOOL_REGISTRY = {}


def get_tool_executor(tool_name: str) -> Optional[Any]:
    """Get tool from registry with intelligent name matching."""
    # Direct match
    if tool_name in TOOL_REGISTRY:
        return TOOL_REGISTRY[tool_name]

    # Try variations (deduplicated; original list repeated entries)
    variations = [
        f"{tool_name}_tool",
        tool_name.replace("_", ""),
    ]
    for variation in variations:
        if variation in TOOL_REGISTRY:
            return TOOL_REGISTRY[variation]

    # Check partial matches
    for registered_name, tool in TOOL_REGISTRY.items():
        if tool_name in registered_name or registered_name in tool_name:
            return tool

    return None


# ========================
# UNIFIED EXECUTOR WITH FALLBACK (UPDATED)
# ========================

def execute_pipeline_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline with agentic orchestration (gated) or legacy fallback.

    PHASE 1: ENTRY POINT GATING
    - If agentic enabled → route to agentic wrapper
    - On ANY failure → HARD FALLBACK to legacy path
    - Legacy path remains COMPLETELY UNCHANGED

    Args:
        pipeline: Pipeline configuration
        file_path: Path to file being processed
        session_id: Optional session identifier
        prefer_bedrock: Use Bedrock over CrewAI in legacy path

    Yields:
        Pipeline execution events
    """
    # ========================================
    # AGENTIC ORCHESTRATION GATE (Phase 3)
    # ========================================
    if _should_use_agentic_orchestration(pipeline, session_id):
        logger.info(f"Routing to agentic orchestration for session {session_id}")
        try:
            # Import wrapper (isolated - no agent internals exposed)
            from services.agentic_orchestrator_wrapper import execute_with_agentic_orchestration
            from services.agentic_integration_logger import log_agentic_attempt

            # Log decision
            log_agentic_attempt(
                session_id=session_id or "unknown",
                pipeline=pipeline,
                decision="agentic_enabled"
            )

            # Execute via agentic wrapper
            # If this succeeds, return early (skip legacy path)
            for event in execute_with_agentic_orchestration(pipeline, file_path, session_id):
                yield event

            logger.info(f"Agentic orchestration completed for session {session_id}")
            return  # Success - done via agentic path

        except Exception as e:
            # HARD FALLBACK: Any exception → continue to legacy path below
            logger.error(f"Agentic orchestration failed, falling back to legacy: {e}")
            from services.agentic_integration_logger import log_fallback_trigger
            log_fallback_trigger(
                session_id=session_id or "unknown",
                reason="Exception in agentic execution",
                exception=e
            )

            # Yield info event about fallback
            yield {
                "type": "info",
                "message": f"Agentic execution failed, using legacy pipeline",
                "executor": "fallback"
            }
            # Continue to legacy path below (no return)

    # ========================================
    # LEGACY PATH
    # ========================================
    components_executed = []
    final_output = None
    executor_used = "unknown"
    fallback_triggered = False
    bedrock_error = None

    # Initialize pipeline info
    pipeline_id = pipeline.get("pipeline_id")
    pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")

    # FIX: Get steps from either 'components' or 'pipeline_steps'
    steps = []
    if "pipeline_steps" in pipeline:
        steps = pipeline.get("pipeline_steps", [])
    elif "components" in pipeline:
        steps = pipeline.get("components", [])
        # Also update the pipeline to have both for consistency
        pipeline["pipeline_steps"] = steps

    if not steps:
        error_msg = f"No steps/components found in pipeline: {pipeline_name}"
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return

    # Check if tools are available
    if not TOOL_REGISTRY:
        error_msg = "No tools available. master_tools.py not loaded correctly."
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return

    print(f"🏆 Executing pipeline '{pipeline_name}' with {len(steps)} steps")
    print(f"   Steps format: {[s.get('tool_name', s.get('tool', 'unknown')) for s in steps]}")

    yield {
        "type": "info",
        "message": f"Starting pipeline: {pipeline_name} with {len(steps)} steps",
        "executor": "initializing"
    }

    # Try Bedrock first (priority)
    if prefer_bedrock and BEDROCK_AVAILABLE:
        try:
            print(f"🏆 Executing pipeline with Bedrock: {pipeline_name}")
            yield {
                "type": "info",
                "message": "Attempting execution with Bedrock LangChain...",
                "executor": "bedrock"
            }

            components_executed = []  # Reset for Bedrock execution

            # Execute step by step with Bedrock
            for step_num, step_def in enumerate(steps, 1):
                tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
                yield {
                    "type": "step",
                    "step": step_num,
                    "tool": tool_name,
                    "status": "executing",
                    "executor": "bedrock"
                }

                try:
                    # Execute the step using master_tools
                    result = _execute_step_with_master_tool(
                        step_def=step_def,
                        file_path=file_path,
                        step_num=step_num,
                        total_steps=len(steps),
                        session_id=session_id,
                        prefer_bedrock=True,
                        previous_results=components_executed  # Pass previous results
                    )
                    executor_used = "bedrock"

                    # Create component result object
                    component_result = {
                        "tool_name": tool_name,
                        "tool": tool_name,  # For compatibility
                        **step_def,  # Include all step definition fields
                        "result": result.get("output"),
                        "status": "completed",
                        "executor": executor_used,
                        "execution_time": result.get("execution_time"),
                        "step_number": step_num,
                        "success": True,
                        "tool_version": result.get("tool_version", "1.0")
                    }
                    components_executed.append(component_result)

                    yield {
                        "type": "step",
                        "step": step_num,
                        "tool": tool_name,
                        "status": "completed",
                        "observation": result.get("output"),
                        "input": step_def,
                        "executor": executor_used
                    }

                    # Update file_path for next step if needed
                    file_path = _update_file_path(file_path, result)

                except Exception as step_error:
                    print(f"❌ Step {step_num} failed with Bedrock: {str(step_error)}")

                    # Create failed component result
                    component_result = {
                        "tool_name": tool_name,
                        "tool": tool_name,
                        **step_def,
                        "result": {"error": str(step_error)},
                        "status": "failed",
                        "error": str(step_error),
                        "step_number": step_num,
                        "success": False
                    }
                    components_executed.append(component_result)
                    bedrock_error = str(step_error)

                    yield {
                        "type": "error",
                        "step": step_num,
                        "tool": tool_name,
                        "error": str(step_error),
                        "message": f"Step {step_num} failed with Bedrock"
                    }
                    fallback_triggered = True
                    break

            # If we completed all steps with Bedrock
            if not fallback_triggered and len(components_executed) == len(steps):
                final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
                yield {
                    "type": "final",
                    "data": final_output,
                    "executor": executor_used
                }
                print(f"✅ Bedrock execution completed: {pipeline_name}")
                return

        except Exception as bedrock_exception:
            print(f"❌ Bedrock execution exception: {str(bedrock_exception)}")
            bedrock_error = str(bedrock_exception)
            fallback_triggered = True

    # If Bedrock failed, wasn't preferred, or isn't available, try CrewAI.
    # FIX: the original condition (fallback_triggered or not prefer_bedrock)
    # silently ended the generator when prefer_bedrock=True but
    # BEDROCK_AVAILABLE=False - no final/error event was ever emitted.
    if fallback_triggered or not (prefer_bedrock and BEDROCK_AVAILABLE):
        print(f"🔄 Executing pipeline with CrewAI: {pipeline_name}")
        if fallback_triggered and bedrock_error:
            yield {
                "type": "info",
                "message": f"Bedrock failed: {bedrock_error}. Switching to CrewAI...",
                "executor": "fallback"
            }
        else:
            yield {
                "type": "info",
                "message": "Using CrewAI execution...",
                "executor": "crewai"
            }

        # Start from where Bedrock left off, or from beginning.
        # FIX: resume at the first NON-completed step; the previous
        # len(components_executed) + 1 skipped the step that failed under
        # Bedrock even though the loop below has replace-on-retry logic.
        completed_count = len([c for c in components_executed if c.get("status") == "completed"])
        start_step = completed_count + 1

        for step_num in range(start_step, len(steps) + 1):
            step_def = steps[step_num - 1]
            tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))

            yield {
                "type": "step",
                "step": step_num,
                "tool": tool_name,
                "status": "executing",
                "executor": "crewai"
            }

            try:
                # Execute the step using master_tools
                result = _execute_step_with_master_tool(
                    step_def=step_def,
                    file_path=file_path,
                    step_num=step_num,
                    total_steps=len(steps),
                    session_id=session_id,
                    prefer_bedrock=False,
                    previous_results=components_executed
                )
                executor_used = "crewai"

                # Create component result object
                component_result = {
                    "tool_name": tool_name,
                    "tool": tool_name,
                    **step_def,
                    "result": result.get("output"),
                    "status": "completed",
                    "executor": executor_used,
                    "execution_time": result.get("execution_time"),
                    "step_number": step_num,
                    "success": True,
                    "tool_version": result.get("tool_version", "1.0")
                }

                # Add or replace in components_executed
                if len(components_executed) >= step_num:
                    components_executed[step_num - 1] = component_result
                else:
                    components_executed.append(component_result)

                yield {
                    "type": "step",
                    "step": step_num,
                    "tool": tool_name,
                    "status": "completed",
                    "observation": result.get("output"),
                    "input": step_def,
                    "executor": executor_used
                }

                # Update file_path for next step if needed
                file_path = _update_file_path(file_path, result)

            except Exception as step_error:
                print(f"❌ Step {step_num} failed with CrewAI: {str(step_error)}")

                # Create failed component result
                component_result = {
                    "tool_name": tool_name,
                    "tool": tool_name,
                    **step_def,
                    "result": {"error": str(step_error)},
                    "status": "failed",
                    "error": str(step_error),
                    "step_number": step_num,
                    "success": False
                }

                # Add or replace in components_executed
                if len(components_executed) >= step_num:
                    components_executed[step_num - 1] = component_result
                else:
                    components_executed.append(component_result)

                yield {
                    "type": "error",
                    "step": step_num,
                    "tool": tool_name,
                    "error": str(step_error),
                    "message": f"Step {step_num} failed with CrewAI"
                }
                break

        # Check if we completed all steps
        completed_steps = [c for c in components_executed if c.get("status") == "completed"]
        if len(completed_steps) == len(steps):
            # All steps completed
            final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
            yield {
                "type": "final",
                "data": final_output,
                "executor": executor_used
            }
            print(f"✅ CrewAI execution completed: {pipeline_name}")
        else:
            # Partial completion or failure
            final_output = _build_final_output(pipeline, components_executed, executor_used, "partial")
            final_output["error"] = f"Pipeline execution incomplete. Completed {len(completed_steps)} of {len(steps)} steps."
            yield {
                "type": "error",
                "error": "Pipeline execution incomplete",
                "data": final_output
            }
            print(f"⚠️ CrewAI execution incomplete for: {pipeline_name}")


# ========================
# DYNAMIC STEP EXECUTION WITH MASTER_TOOLS
# ========================

def _execute_step_with_master_tool(
    step_def: Dict[str, Any],
    file_path: str,
    step_num: int,
    total_steps: int,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True,
    previous_results: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline step using master_tools.

    Handles step_def with either 'tool_name' or 'tool' field, and feeds the
    previous step's text output into text-consuming tools when needed.

    Returns:
        Dict with output / executor / execution_time / tool_version / args_used.

    Raises:
        ValueError: If the tool is not found or not callable.
        RuntimeError: If the tool execution fails.
    """
    import inspect

    # Get tool name from either 'tool_name' or 'tool' field
    tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
    start_time = time.time()

    print(f"   🔨 Executing step {step_num}/{total_steps}: {tool_name}")
    print(f"      Step definition: {step_def}")

    # Get tool from registry
    tool = get_tool_executor(tool_name)
    if not tool:
        error_msg = f"Tool '{tool_name}' not found in registry. Available tools: {list(TOOL_REGISTRY.keys())}"
        print(f"   ❌ {error_msg}")
        raise ValueError(error_msg)

    # Prepare arguments
    args = {}

    # For StructuredTool (LangChain tools)
    if hasattr(tool, 'args_schema') and hasattr(tool, 'invoke'):
        # Get the args schema
        args_schema = tool.args_schema

        # Build arguments from step_def
        for field_name, field in args_schema.__fields__.items():
            # Check if parameter is in step_def
            if field_name in step_def:
                args[field_name] = step_def[field_name]
            # Special handling for file_path
            elif field_name == "file_path" and file_path:
                args[field_name] = file_path
            # Special handling for session_id
            elif field_name == "session_id" and session_id:
                args[field_name] = session_id
            # Handle text parameter if not provided but we have previous output
            elif field_name == "text" and field_name not in step_def and previous_results:
                # Try to get text from previous step's output
                if step_num > 1 and len(previous_results) >= step_num - 1:
                    prev_result = previous_results[step_num - 2].get("result")
                    if isinstance(prev_result, dict) and "text" in prev_result:
                        args["text"] = prev_result["text"]
                        print(f"   📝 Using text from previous step: {args['text'][:100]}...")
                    elif isinstance(prev_result, str):
                        args["text"] = prev_result
                        print(f"   📝 Using text from previous step: {args['text'][:100]}...")

        try:
            # Execute the tool
            print(f"   🚀 Invoking tool {tool_name} with args: {args}")
            output = tool.invoke(args)
            execution_time = time.time() - start_time
            print(f"   ✅ Step {step_num} completed in {execution_time:.2f}s")
            return {
                "output": output,
                "executor": "bedrock" if prefer_bedrock else "crewai",
                "execution_time": execution_time,
                "tool_version": "master_tools_1.0",
                "args_used": list(args.keys())
            }
        except Exception as e:
            # Try with minimal arguments
            print(f"⚠️ Tool {tool_name} failed with full args, trying minimal: {e}")

            # Try with just file_path if available
            if file_path and "file_path" in args_schema.__fields__:
                minimal_args = {"file_path": file_path}
                try:
                    output = tool.invoke(minimal_args)
                    execution_time = time.time() - start_time
                    return {
                        "output": output,
                        "executor": "bedrock" if prefer_bedrock else "crewai",
                        "execution_time": execution_time,
                        "tool_version": "master_tools_1.0",
                        "args_used": list(minimal_args.keys()),
                        "warning": "Used minimal arguments"
                    }
                except Exception as inner_error:
                    raise RuntimeError(f"Tool '{tool_name}' failed with minimal args: {inner_error}") from inner_error
            raise

    # For regular Python functions
    elif callable(tool):
        try:
            # Get function signature
            sig = inspect.signature(tool)

            # Build arguments based on signature
            call_args = {}
            for param_name, param in sig.parameters.items():
                # Try step_def first
                if param_name in step_def:
                    call_args[param_name] = step_def[param_name]
                # Special handling for file_path
                elif param_name == "file_path" and file_path:
                    call_args[param_name] = file_path
                # Special handling for session_id
                elif param_name == "session_id" and session_id:
                    call_args[param_name] = session_id
                # Handle text parameter
                elif param_name == "text" and param_name not in step_def and previous_results:
                    # Try to get text from previous step
                    if step_num > 1 and len(previous_results) >= step_num - 1:
                        prev_result = previous_results[step_num - 2].get("result")
                        if isinstance(prev_result, dict) and "text" in prev_result:
                            call_args["text"] = prev_result["text"]
                        elif isinstance(prev_result, str):
                            call_args["text"] = prev_result

            # Execute the function
            print(f"   🚀 Calling function {tool_name} with args: {call_args}")
            output = tool(**call_args)
            execution_time = time.time() - start_time
            print(f"   ✅ Step {step_num} completed in {execution_time:.2f}s")
            return {
                "output": output,
                "executor": "bedrock" if prefer_bedrock else "crewai",
                "execution_time": execution_time,
                "tool_version": "function_1.0",
                "args_used": list(call_args.keys())
            }
        except Exception as e:
            raise RuntimeError(f"Failed to execute function {tool_name}: {e}") from e
    else:
        raise ValueError(f"Tool '{tool_name}' is not callable or a valid StructuredTool")


def _update_file_path(current_file_path: str, result: Dict[str, Any]) -> str:
    """
    Update file path based on tool result.
    Some tools might generate new files.
    """
    output = result.get("output")
    if isinstance(output, dict):
        # Check for file references in output
        for key in ["file_path", "output_file", "new_file", "generated_file"]:
            if key in output and isinstance(output[key], str):
                return output[key]
    return current_file_path


# ========================
# HELPER FUNCTIONS
# ========================

def _build_final_output(
    pipeline: Dict[str, Any],
    components_executed: List[Dict[str, Any]],
    executor_used: str,
    status: str
) -> Dict[str, Any]:
    """
    Build final output with components_executed array.

    Handles both component formats ('tool_name' and 'tool') and extracts a
    user-facing "text" field from the LAST completed component that has one.
    """
    # Get steps count from pipeline
    steps = pipeline.get("pipeline_steps", pipeline.get("components", []))

    # Find the finalize step result if present
    final_result = None
    for component in components_executed:
        if component.get("tool_name") == "finalize" or component.get("tool") == "finalize":
            final_result = component.get("result")
            break

    # Count completed steps
    completed_steps = len([c for c in components_executed if c.get("status") == "completed"])

    final_output = {
        "pipeline_id": pipeline.get("pipeline_id"),
        "pipeline_name": pipeline.get("pipeline_name"),
        "status": status,
        "components_executed": components_executed,
        "executor": executor_used,
        "summary": f"Pipeline execution {status} with {executor_used}",
        "total_steps": len(steps),
        "completed_steps": completed_steps,
        "final_result": final_result
    }

    # Extract text for user-facing output
    if final_result:
        # Use finalize tool's output
        final_output["text"] = final_result
    elif components_executed:
        # Find last completed component with text
        for component in reversed(components_executed):
            if component.get("status") == "completed" and component.get("result"):
                result = component["result"]
                if isinstance(result, str):
                    final_output["text"] = result
                    break
                elif isinstance(result, dict):
                    for field in ["text", "summary", "content", "translation", "output"]:
                        if field in result and isinstance(result[field], str):
                            final_output["text"] = result[field]
                            break
                    # If no text field found but dict has string values
                    if "text" not in final_output:
                        for key, value in result.items():
                            if isinstance(value, str) and len(value) > 10:
                                final_output["text"] = value
                                break
                    # FIX: stop scanning once text is found; the original kept
                    # iterating and an EARLIER component could overwrite the
                    # last component's text.
                    if "text" in final_output:
                        break

    return final_output


# ========================
# NON-STREAMING EXECUTOR
# ========================

def execute_pipeline(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Dict[str, Any]:
    """
    Execute pipeline (non-streaming) with fallback.

    Drains the streaming executor and returns the first final/error payload.
    """
    final_result = None
    for event in execute_pipeline_streaming(pipeline, file_path, session_id, prefer_bedrock):
        if event.get("type") == "final":
            final_result = event.get("data")
            break
        elif event.get("type") == "error" and event.get("data"):
            final_result = event.get("data")
            break

    if final_result is None:
        final_result = {
            "pipeline_id": pipeline.get("pipeline_id"),
            "pipeline_name": pipeline.get("pipeline_name"),
            "status": "failed",
            "components_executed": [],
            "error": "Pipeline execution completed without final result"
        }
    return final_result


if __name__ == "__main__":
    # Test
    test_pipeline = {
        "pipeline_name": "test-extraction",
        "components": [
            {
                "tool_name": "extract_text",
                "start_page": 1,
                "end_page": 1,
                "params": {}
            }
        ],
        "_generator": "test"
    }
    test_file = "test.pdf"

    print("Testing streaming execution...")
    for event in execute_pipeline_streaming(test_pipeline, test_file):
        print(f"Event: {event}")