Spaces:
Sleeping
Sleeping
| # services/pipeline_executor.py | |
| """ | |
| Pipeline Executor - Orchestrates multi-step document processing pipelines | |
| Supports: | |
| - Agentic Orchestration (Phase 3 - gated by feature flag) | |
| - Bedrock LangChain execution (preferred legacy) | |
| - CrewAI execution (fallback legacy) | |
| - Dynamic tool chaining | |
| - Component status tracking | |
| Version: 3.0 with Safe Agentic Integration | |
| """ | |
| import json | |
| import os | |
| import time | |
| import hashlib | |
| from typing import Dict, Any, List, Generator, Optional | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| # ======================== | |
| # AGENTIC ORCHESTRATION GATING (Phase 3) | |
| # ======================== | |
| def _should_use_agentic_orchestration( | |
| pipeline: Dict[str, Any], | |
| session_id: Optional[str] = None | |
| ) -> bool: | |
| """ | |
| Decision logic for agentic vs legacy execution. | |
| Returns True only if: | |
| 1. Feature flag USE_AGENTIC_ORCHESTRATION=true | |
| 2. Session passes rollout percentage | |
| 3. Not in shadow mode (shadow uses legacy result) | |
| Args: | |
| pipeline: Pipeline configuration | |
| session_id: Optional session identifier for rollout hashing | |
| Returns: | |
| True if agentic orchestration should be used | |
| """ | |
| # Check kill switch | |
| if not os.getenv("USE_AGENTIC_ORCHESTRATION", "false").lower() == "true": | |
| return False | |
| # Shadow mode always uses legacy (agentic runs in parallel) | |
| if os.getenv("AGENTIC_SHADOW_MODE", "false").lower() == "true": | |
| return False | |
| # Rollout percentage (0-100) | |
| rollout_pct = int(os.getenv("AGENTIC_ROLLOUT_PERCENTAGE", "0")) | |
| if rollout_pct <= 0: | |
| return False # Disabled | |
| if rollout_pct >= 100: | |
| return True # Full rollout | |
| # Percentage-based rollout using session hash | |
| if session_id: | |
| hash_val = int(hashlib.md5(session_id.encode()).hexdigest(), 16) | |
| if (hash_val % 100) < rollout_pct: | |
| return True | |
| return False | |
# Optional Bedrock / LangChain stack. BEDROCK_AVAILABLE is True only when every
# import below succeeds; the Bedrock executors check it before running.
# NOTE(review): create_react_agent, PromptTemplate and hub are not referenced in
# the code shown here, but a failure to import them still disables Bedrock.
try:
    from langchain_aws import ChatBedrock
    from langchain.agents import AgentExecutor, create_react_agent
    from langchain_core.prompts import PromptTemplate
    from langchain import hub
    from services.master_tools import get_master_tools as get_langchain_tools
except ImportError as import_error:
    BEDROCK_AVAILABLE = False
    print(f"❌ WARNING: LangChain Bedrock not available - {import_error}")
    print(" Pipeline execution will use CrewAI only")
except Exception as unexpected_error:
    # Anything other than a missing package (e.g. an error raised while a module initialises)
    BEDROCK_AVAILABLE = False
    print(f"❌ ERROR: Bedrock import failed - {unexpected_error}")
else:
    BEDROCK_AVAILABLE = True
    print("✅ Bedrock LangChain imports successful - BEDROCK_AVAILABLE = True")
| # For CrewAI fallback | |
| from services.agent_crewai import run_agent_streaming as crewai_run_streaming | |
# Pipeline Manager (V3 architecture) is optional; PIPELINE_MANAGER_AVAILABLE
# records whether get_pipeline_manager could be imported.
try:
    from services.pipeline_manager import get_pipeline_manager
except ImportError:
    PIPELINE_MANAGER_AVAILABLE = False
    print("⚠️ Pipeline Manager not available")
else:
    PIPELINE_MANAGER_AVAILABLE = True
| # ======================== | |
| # BEDROCK LANGCHAIN EXECUTOR | |
| # ======================== | |
def execute_pipeline_bedrock(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute pipeline using Bedrock + LangChain (priority method).

    Builds a LangChain tool-calling agent over ChatBedrock with the tools from
    get_langchain_tools() and runs it once through AgentExecutor.invoke.

    Args:
        pipeline: Pipeline configuration; serialised into the prompt as JSON.
            ``pipeline_name`` is optional (defaults to "Unnamed Pipeline").
        file_path: Path of the document the tools should operate on.
        session_id: Optional session identifier passed to the model as context.

    Returns:
        The dict returned by AgentExecutor.invoke.

    Raises:
        RuntimeError: If the Bedrock stack is unavailable, or if building or
            running the agent fails (the original exception is chained as
            ``__cause__``).
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")
    try:
        # The module-level Bedrock import block does not import these names, so
        # they are imported here. Before this, every call failed with a NameError.
        from langchain.agents import create_tool_calling_agent
        from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

        llm = ChatBedrock(
            model_id="mistral.mistral-large-2402-v1:0",
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )
        tools = get_langchain_tools()
        # Doubled braces are literal braces for the prompt template, not variables.
        system_instructions = """You are MasterLLM, a precise document processing agent.
Execute the provided pipeline components in ORDER. For each component:
1. Call the corresponding tool with exact parameters
2. Wait for the result
3. Move to next component
IMPORTANT:
- Follow the pipeline order strictly
- Use the file_path provided for all file-based operations
- For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
- At the end, call 'finalize' tool with complete results
Pipeline components will be in format:
{{
"tool_name": "extract_text",
"start_page": 1,
"end_page": 5,
"params": {{}}
}}"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_instructions),
            ("system", "File path: {file_path}"),
            ("system", "Pipeline to execute: {pipeline_json}"),
            ("system", "Session ID: {session_id}"),
            ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results."),
            MessagesPlaceholder(variable_name="agent_scratchpad")  # REQUIRED for LangChain agent
        ])
        agent = create_tool_calling_agent(llm, tools, prompt)
        executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
        )
        pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
        result = executor.invoke({
            "input": f"Execute pipeline: {pipeline_name}",
            "file_path": file_path,
            "pipeline_json": json.dumps(pipeline, indent=2),
            "session_id": session_id or "unknown"
        })
        return result
    except Exception as e:
        raise RuntimeError(f"Bedrock execution failed: {str(e)}") from e
def execute_pipeline_bedrock_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using Bedrock with MANUAL tool calling loop (bypasses LangChain agents)

    Runs a text-based Action / Action Input / Observation loop against the
    Bedrock ``converse`` API (model ``mistral.mistral-large-2402-v1:0``).
    Each model reply is scanned for ``Action:`` and ``Action Input:``. A
    matching tool from get_langchain_tools() is invoked, and its result is fed
    back to the model as an ``Observation:`` user turn.

    Args:
        pipeline: Pipeline configuration. Its ``components`` (tool_name /
            start_page / end_page) are listed in the system prompt.
            ``pipeline_name`` is read with ``[]``, so a missing key surfaces as
            an ``error`` event.
        file_path: Document path embedded in the system prompt.
        session_id: Optional id. When set and the pipeline manager is
            importable, it is used to look up and complete the pipeline record
            on the Final Answer path.

    Yields:
        One ``status`` event, then ``step`` + ``component_status`` events per
        executed tool, and finally one ``final`` or ``error`` event. Every
        event carries ``"executor": "bedrock"``.

    Raises:
        RuntimeError: If BEDROCK_AVAILABLE is false. Because this is a
            generator, it is raised on the first ``next()``, not at call time.
            Later exceptions (including tool failures) are caught by the outer
            ``except`` and turned into an ``error`` event.
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")
    try:
        import re
        import boto3
        # Get Bedrock client directly
        bedrock_runtime = boto3.client(
            service_name='bedrock-runtime',
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )
        tools_dict = {tool.name: tool for tool in get_langchain_tools()}
        # Build tool descriptions for prompt
        tool_descriptions = []
        for name, tool in tools_dict.items():
            tool_descriptions.append(f"- {name}: {tool.description}")
        tools_text = "\n".join(tool_descriptions)
        # NOTE(review): tool_names is not used anywhere below.
        tool_names = ", ".join(tools_dict.keys())
        # Build list of EXACT components to execute
        components_list = pipeline.get('components', [])
        components_to_execute = "\n".join([
            f"{i+1}. {comp.get('tool_name')} (pages {comp.get('start_page')}-{comp.get('end_page')})"
            for i, comp in enumerate(components_list)
        ])
        # Initial prompt with STRICT component enforcement
        system_prompt = f"""You are MasterLLM, a document processing assistant.
You have access to these tools:
{tools_text}
To use a tool, you MUST write EXACTLY in this format:
Action: tool_name
Action Input: {{"param1": "value1", "param2": value2}}
After you write Action and Action Input, I will execute the tool and give you the Observation.
Then you can take another Action or provide your Final Answer.
CRITICAL RULES:
- Write "Action:" followed by the tool name
- Write "Action Input:" followed by valid JSON on the SAME line or next line
- ALWAYS include "file_path" parameter in your Action Input
- The file_path is: {file_path}
- For page parameters: start_page must be >= 1, end_page must be >= 1
- To process ALL pages, use end_page: 999 (NOT -1!)
- After seeing Observation, you can take another Action
- When done, write "Final Answer:" followed by summary
EXAMPLE of correct tool call:
Action: extract_text
Action Input: {{"file_path": "{file_path}", "start_page": 1, "end_page": 5}}
IMPORTANT: Every tool call MUST include the file_path parameter!
**CRITICAL: EXECUTE ONLY THESE COMPONENTS IN THIS EXACT ORDER:**
{components_to_execute}
**DO NOT execute any other tools or components not listed above!**
**DO NOT add extra steps like summarize, translate, or classify unless explicitly listed!**
**After completing the components listed above, provide your Final Answer immediately!**
File to process: {file_path}
Total components to execute: {len(components_list)}
Execute ONLY the {len(components_list)} component(s) listed above, then provide Final Answer."""
        user_message = f"Execute the pipeline: {pipeline['pipeline_name']}"
        # Alternating assistant replies / user observations appended after the fixed first user turn
        conversation_history = []
        # Latest observation per tool name.
        # NOTE(review): a tool called twice keeps only its last observation.
        tool_results = {}
        has_called_tools = False
        step_count = 0
        # Set max_iterations based on number of components (with generous buffer)
        num_components = len(pipeline.get('components', []))
        # Formula: num_components * 2 (for tool call + processing) + 3 (init + final answer + safety)
        max_iterations = max(5, (num_components * 2) + 3)
        print(f"📋 Pipeline has {num_components} components, max_iterations={max_iterations}")
        yield {
            "type": "status",
            "message": "Initializing Bedrock manual executor...",
            "executor": "bedrock"
        }
        for iteration in range(max_iterations):
            # Prepare messages (Bedrock converse API format)
            messages = [{"role": "user", "content": [{"text": user_message}]}]
            messages.extend(conversation_history)
            # Call Bedrock directly using converse API
            response = bedrock_runtime.converse(
                modelId="mistral.mistral-large-2402-v1:0",
                messages=messages,
                system=[{"text": system_prompt}],
                inferenceConfig={
                    "temperature": 0.0,
                    "maxTokens": 2048
                }
            )
            # Get response text (only the first content block is read)
            assistant_message = response['output']['message']['content'][0]['text']
            print(f"\n🤖 Mistral Response (Iteration {iteration + 1}):\n{assistant_message}\n")
            # Add to conversation (Bedrock converse API format)
            conversation_history.append({"role": "assistant", "content": [{"text": assistant_message}]})
            # Parse for Action and Action Input FIRST (before checking Final Answer), so a
            # reply containing both an action and "Final Answer" runs the action.
            action_match = re.search(r'Action:\s*(\w+)', assistant_message)
            # NOTE(review): the non-greedy `\{.+?\}` stops at the FIRST closing brace, so an
            # Action Input containing a nested object (e.g. "params": {}) is truncated and
            # fails json.loads below; consider json.JSONDecoder().raw_decode instead.
            action_input_match = re.search(r'Action Input:\s*(\{.+?\})', assistant_message, re.DOTALL)
            if action_match and action_input_match:
                tool_name = action_match.group(1)
                action_input_str = action_input_match.group(1).strip()
                try:
                    # Parse JSON input
                    tool_input = json.loads(action_input_str)
                    if tool_name in tools_dict:
                        step_count += 1
                        has_called_tools = True
                        yield {
                            "type": "step",
                            "step": step_count,
                            "tool": tool_name,
                            "status": "executing",
                            "executor": "bedrock",
                            "input": str(tool_input)[:200]
                        }
                        # Execute the component! Exceptions here are not caught by the
                        # JSONDecodeError handler; they end the run via the outer except.
                        tool = tools_dict[tool_name]
                        observation = tool.invoke(tool_input)
                        tool_results[tool_name] = observation
                        # Generate component success message
                        success_msg = f"Successfully executed {tool_name.replace('_', ' ')}"
                        if isinstance(observation, dict):
                            if "pages" in observation:
                                success_msg = f"Successfully extracted from {observation.get('pages', 'N/A')} pages"
                            elif "tables" in observation:
                                success_msg = f"Successfully extracted {len(observation.get('tables', []))} tables"
                        yield {
                            "type": "component_status",
                            "step": step_count,
                            "component": tool_name,
                            "status": "completed",
                            "message": success_msg,
                            "observation": str(observation)[:500],
                            "executor": "bedrock"
                        }
                        # Add observation to conversation (Bedrock converse API format)
                        observation_message = f"Observation: {observation}"
                        conversation_history.append({"role": "user", "content": [{"text": observation_message}]})
                        # Continue to next iteration to get Mistral's next action
                        continue
                    else:
                        # Unknown tool: report it back to the model and let it retry
                        error_msg = f"Unknown tool: {tool_name}"
                        conversation_history.append({"role": "user", "content": [{"text": f"Error: {error_msg}"}]})
                        continue
                except json.JSONDecodeError as e:
                    # Invalid JSON: report it back to the model and let it retry
                    error_msg = f"Invalid JSON in Action Input: {e}"
                    conversation_history.append({"role": "user", "content": [{"text": f"Error: {error_msg}"}]})
                    continue
            # Check for Final Answer (only if no action was found).
            # The case-insensitive check already covers "Final Answer:".
            if "Final Answer:" in assistant_message or "final answer" in assistant_message.lower():
                # Done!
                if tool_results:
                    # Try to compile component results for pipeline manager (V3 architecture)
                    # Note: This requires a pipeline record to exist in MongoDB
                    # If not found, we'll fall back to basic response without S3 storage
                    structured_result = {
                        "status": "completed",
                        "components_executed": tool_results,
                        "summary": {
                            "total_tools_called": len(tool_results),
                            "tools": list(tool_results.keys())
                        },
                        "final_output": assistant_message
                    }
                    if PIPELINE_MANAGER_AVAILABLE and session_id:
                        try:
                            pipeline_mgr = get_pipeline_manager()
                            # Check if pipeline record exists first
                            pipeline_record = pipeline_mgr.get_pipeline(session_id)
                            if pipeline_record:
                                # Pipeline record exists, create S3 final output
                                components_results = []
                                for comp_name, comp_output in tool_results.items():
                                    components_results.append({
                                        "component_name": comp_name,
                                        "status": "completed",
                                        "result": comp_output,
                                        "success_message": f"Successfully executed {comp_name}"
                                    })
                                # Create final output in S3
                                final_output_data = pipeline_mgr.mark_pipeline_completed(
                                    execution_id=session_id,
                                    components_results=components_results,
                                    executor="bedrock"
                                )
                                # Add S3 URLs to structured result
                                structured_result["final_output_url"] = final_output_data.get("final_output_url")
                                structured_result["final_output_expires_at"] = final_output_data.get("final_output_expires_at")
                                structured_result["last_node_output"] = final_output_data.get("last_node_output")
                                structured_result["workflow_status"] = final_output_data.get("workflow_status", "completed")
                            else:
                                # No pipeline record - this is OK for direct executor calls
                                # Just continue with basic response
                                print(f"ℹ️ No pipeline record found for session {session_id} - using basic response")
                        except Exception as e:
                            # Pipeline manager failed - continue with basic response
                            print(f"⚠️ Failed to create S3 final output: {str(e)}")
                            # structured_result already has basic fields, just continue
                    yield {
                        "type": "final",
                        "data": structured_result,
                        "executor": "bedrock"
                    }
                else:
                    yield {
                        "type": "error",
                        "error": "Bedrock completed but no components were called",
                        "executor": "bedrock"
                    }
                return
            # No action found - agent might be confused or done.
            # NOTE(review): on iteration 0 neither branch below fires, so the loop
            # re-sends the same conversation (now ending with the assistant turn).
            # Whether Bedrock accepts a trailing assistant message for this model is
            # unverified here.
            if iteration > 0 and not has_called_tools:
                # Agent isn't calling tools properly
                yield {
                    "type": "error",
                    "error": "Bedrock didn't call tools in correct format. Falling back to CrewAI.",
                    "executor": "bedrock",
                    "debug_output": assistant_message[:500]
                }
                return
            elif iteration > 0:
                # Has called some tools but stopped - might be done.
                # NOTE(review): unlike the Final Answer path, no pipeline-manager record is written here.
                structured_result = {
                    "status": "completed",
                    "components_executed": tool_results,
                    "summary": {
                        "total_tools_called": len(tool_results),
                        "tools": list(tool_results.keys())
                    },
                    "final_output": assistant_message
                }
                yield {
                    "type": "final",
                    "data": structured_result,
                    "executor": "bedrock"
                }
                return
        # Max iterations reached
        if tool_results:
            structured_result = {
                "status": "completed",
                "components_executed": tool_results,
                "summary": {
                    "total_tools_called": len(tool_results),
                    "tools": list(tool_results.keys())
                },
                "final_output": "Max iterations reached"
            }
            yield {
                "type": "final",
                "data": structured_result,
                "executor": "bedrock"
            }
        else:
            yield {
                "type": "error",
                "error": "Max iterations reached without tool calls",
                "executor": "bedrock"
            }
    except Exception as e:
        yield {
            "type": "error",
            "error": str(e),
            "executor": "bedrock"
        }
| # ======================== | |
| # CREWAI EXECUTOR (FALLBACK) | |
| # ======================== | |
def execute_pipeline_crewai_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using CrewAI (fallback method).

    Streams the events produced by crewai_run_streaming, tagging every dict
    event in place with ``"executor": "crewai"``. Any exception is reported as
    a single ``error`` event instead of propagating.

    Args:
        pipeline: Pipeline configuration, passed to CrewAI as ``plan``.
            ``pipeline_name`` is optional (defaults to "Unnamed Pipeline", as in
            execute_pipeline_streaming); a missing or None ``components`` counts as 0.
        file_path: Path of the document to process.
        session_id: Optional session identifier (not used by this executor).

    Yields:
        A ``status`` event, then CrewAI's events, or an ``error`` event on failure.
    """
    try:
        # Yield initial status
        yield {
            "type": "status",
            "message": "Using CrewAI executor (fallback)...",
            "executor": "crewai"
        }
        # Use the same default name as the unified executor so unnamed
        # pipelines still run instead of failing with KeyError('pipeline_name').
        pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
        num_components = len(pipeline.get("components") or [])
        execution_goal = (
            f"Execute the approved plan: {pipeline_name}. "
            f"Process {num_components} components in order."
        )
        for event in crewai_run_streaming(
            user_input=execution_goal,
            session_file_path=file_path,
            plan=pipeline,
            chat_history=[]
        ):
            # Pass through CrewAI events with executor tag
            if isinstance(event, dict):
                event["executor"] = "crewai"
            yield event
    except Exception as e:
        yield {
            "type": "error",
            "error": str(e),
            "executor": "crewai"
        }
| # ======================== | |
| # UNIFIED EXECUTOR WITH FALLBACK | |
| # ======================== | |
| # ======================== | |
| # TOOL REGISTRY & DYNAMIC EXECUTION (UPDATED) | |
| # ======================== | |
# Import the master tools and index them by name for get_tool_executor.
# Any failure leaves MASTER_TOOLS / TOOL_REGISTRY empty instead of aborting the
# module import; execute_pipeline_streaming then reports "No tools available".
try:
    from services.master_tools import get_master_tools
    from langchain_core.tools import StructuredTool
    # Get all tools from master_tools
    MASTER_TOOLS = get_master_tools()
    # Key each tool by its `name` attribute, falling back to `__name__` for
    # plain callables; objects with neither are skipped.
    TOOL_REGISTRY = {}
    for tool in MASTER_TOOLS:
        if hasattr(tool, 'name'):
            TOOL_REGISTRY[tool.name] = tool
        elif hasattr(tool, '__name__'):
            TOOL_REGISTRY[tool.__name__] = tool
    print(f"✅ Loaded {len(TOOL_REGISTRY)} tools from master_tools.py")
except ImportError as e:
    print(f"⚠️ Could not import master_tools: {e}")
    MASTER_TOOLS = []
    TOOL_REGISTRY = {}
except Exception as e:
    # get_master_tools() (or registry building) failed for a reason other than a
    # missing module; degrade the same way the Bedrock import block does.
    print(f"⚠️ Could not load tools from master_tools: {e}")
    MASTER_TOOLS = []
    TOOL_REGISTRY = {}
def get_tool_executor(
    tool_name: str,
    registry: Optional[Dict[str, Any]] = None
) -> Optional[Any]:
    """
    Get a tool from the registry with tolerant name matching.

    Resolution order (first hit wins):
    1. Exact key match.
    2. ``<tool_name>_tool``, then ``tool_name`` with underscores removed.
    3. Substring match in either direction against registered names, in the
       registry's iteration order.

    Args:
        tool_name: Tool name from a pipeline step definition.
        registry: Mapping of tool name -> tool. Defaults to the module-level
            TOOL_REGISTRY (looked up at call time).

    Returns:
        The matching tool, or None if nothing matches or ``tool_name`` is empty
        or None (an empty name would otherwise substring-match every entry and
        return an arbitrary tool).
    """
    if not tool_name:
        return None
    if registry is None:
        registry = TOOL_REGISTRY
    # Direct match
    if tool_name in registry:
        return registry[tool_name]
    # Common naming variations
    for variation in (f"{tool_name}_tool", tool_name.replace("_", "")):
        if variation in registry:
            return registry[variation]
    # Loose partial match.
    # NOTE(review): short names can match an unrelated tool here; first hit wins.
    for registered_name, tool in registry.items():
        if tool_name in registered_name or registered_name in tool_name:
            return tool
    return None
| # ======================== | |
| # UNIFIED EXECUTOR WITH FALLBACK (UPDATED) | |
| # ======================== | |
| # Update the execute_pipeline_streaming function: | |
def execute_pipeline_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline with agentic orchestration (gated) or legacy fallback.

    PHASE 1: ENTRY POINT GATING
    - If agentic enabled → route to agentic wrapper
    - On ANY failure (including the gate check or the fallback logging itself)
      → HARD FALLBACK to legacy path

    Legacy path:
    - Bedrock pass (only when ``prefer_bedrock`` and BEDROCK_AVAILABLE): runs
      each step through _execute_step_with_master_tool and stops at the first
      failing step.
    - CrewAI pass: runs when the Bedrock pass was skipped (not preferred or not
      available) or failed. It resumes at the first step that did not
      complete, so a step that failed under Bedrock is retried, and continues
      until the end or its own first failure.

    NOTE(review): if the agentic wrapper yields some events and then raises,
    the legacy path runs the whole pipeline again, so consumers can see events
    for the same steps twice.

    Args:
        pipeline: Pipeline configuration. Steps are read from ``pipeline_steps``
            or, failing that, ``components``. In the latter case the list is also
            written back to ``pipeline["pipeline_steps"]`` (mutates the input).
        file_path: Path to file being processed; may be replaced between steps
            by _update_file_path.
        session_id: Optional session identifier
        prefer_bedrock: Use Bedrock over CrewAI in legacy path

    Yields:
        Pipeline execution events (``info``, ``step``, ``error``, ``final``).
    """
    # ========================================
    # AGENTIC ORCHESTRATION GATE (Phase 3)
    # ========================================
    try:
        use_agentic = _should_use_agentic_orchestration(pipeline, session_id)
    except Exception as gate_error:
        # A broken gate must not block the legacy path.
        logger.error("Agentic gate check failed, using legacy pipeline: %s", gate_error)
        use_agentic = False
    if use_agentic:
        logger.info("Routing to agentic orchestration for session %s", session_id)
        try:
            # Import wrapper (isolated - no agent internals exposed)
            from services.agentic_orchestrator_wrapper import execute_with_agentic_orchestration
            from services.agentic_integration_logger import log_agentic_attempt
            # Log decision
            log_agentic_attempt(
                session_id=session_id or "unknown",
                pipeline=pipeline,
                decision="agentic_enabled"
            )
            # Execute via agentic wrapper; on success skip the legacy path
            for event in execute_with_agentic_orchestration(pipeline, file_path, session_id):
                yield event
            logger.info("Agentic orchestration completed for session %s", session_id)
            return  # Success - done via agentic path
        except Exception as e:
            # HARD FALLBACK: Any exception → continue to legacy path below
            logger.error("Agentic orchestration failed, falling back to legacy: %s", e)
            try:
                from services.agentic_integration_logger import log_fallback_trigger
                log_fallback_trigger(
                    session_id=session_id or "unknown",
                    reason="Exception in agentic execution",
                    exception=e
                )
            except Exception as log_error:
                # Telemetry failures (e.g. the logger module itself is missing,
                # which may be what made the agentic path fail) must not abort
                # the fallback.
                logger.warning("Could not record agentic fallback: %s", log_error)
            yield {
                "type": "info",
                "message": "Agentic execution failed, using legacy pipeline",
                "executor": "fallback"
            }
            # Continue to legacy path below (no return)

    # ========================================
    # LEGACY PATH
    # ========================================
    components_executed = []
    final_output = None
    executor_used = "unknown"
    fallback_triggered = False
    bedrock_error = None
    # Initialize pipeline info
    pipeline_id = pipeline.get("pipeline_id")
    pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
    # Steps may live under either 'pipeline_steps' or 'components'
    steps = []
    if "pipeline_steps" in pipeline:
        steps = pipeline.get("pipeline_steps", [])
    elif "components" in pipeline:
        steps = pipeline.get("components", [])
        # Also store them under 'pipeline_steps' so both keys agree downstream
        pipeline["pipeline_steps"] = steps
    if not steps:
        error_msg = f"No steps/components found in pipeline: {pipeline_name}"
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return
    # Check if tools are available
    if not TOOL_REGISTRY:
        error_msg = "No tools available. master_tools.py not loaded correctly."
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return
    print(f"🏆 Executing pipeline '{pipeline_name}' with {len(steps)} steps")
    print(f" Steps format: {[s.get('tool_name', s.get('tool', 'unknown')) for s in steps]}")
    yield {
        "type": "info",
        "message": f"Starting pipeline: {pipeline_name} with {len(steps)} steps",
        "executor": "initializing"
    }

    # The Bedrock pass runs only when it is both preferred and importable. In
    # every other case execution must go to the CrewAI pass below. Previously,
    # prefer_bedrock=True with Bedrock unavailable ran neither pass and ended
    # without a final/error event.
    use_bedrock = prefer_bedrock and BEDROCK_AVAILABLE
    if use_bedrock:
        try:
            print(f"🏆 Executing pipeline with Bedrock: {pipeline_name}")
            yield {
                "type": "info",
                "message": "Attempting execution with Bedrock LangChain...",
                "executor": "bedrock"
            }
            # Execute step by step; stop at the first failure
            for step_num, step_def in enumerate(steps, 1):
                tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
                yield {
                    "type": "step",
                    "step": step_num,
                    "tool": tool_name,
                    "status": "executing",
                    "executor": "bedrock"
                }
                try:
                    # Execute the step using master_tools
                    result = _execute_step_with_master_tool(
                        step_def=step_def,
                        file_path=file_path,
                        step_num=step_num,
                        total_steps=len(steps),
                        session_id=session_id,
                        prefer_bedrock=True,
                        previous_results=components_executed  # Pass previous results
                    )
                    executor_used = "bedrock"
                    # Create component result object
                    component_result = {
                        "tool_name": tool_name,
                        "tool": tool_name,  # For compatibility
                        **step_def,  # Include all step definition fields
                        "result": result.get("output"),
                        "status": "completed",
                        "executor": executor_used,
                        "execution_time": result.get("execution_time"),
                        "step_number": step_num,
                        "success": True,
                        "tool_version": result.get("tool_version", "1.0")
                    }
                    components_executed.append(component_result)
                    yield {
                        "type": "step",
                        "step": step_num,
                        "tool": tool_name,
                        "status": "completed",
                        "observation": result.get("output"),
                        "input": step_def,
                        "executor": executor_used
                    }
                    # Update file_path for next step if needed
                    file_path = _update_file_path(file_path, result)
                except Exception as step_error:
                    print(f"❌ Step {step_num} failed with Bedrock: {str(step_error)}")
                    # Record the failed step; the CrewAI pass retries and replaces it
                    component_result = {
                        "tool_name": tool_name,
                        "tool": tool_name,
                        **step_def,
                        "result": {"error": str(step_error)},
                        "status": "failed",
                        "error": str(step_error),
                        "step_number": step_num,
                        "success": False
                    }
                    components_executed.append(component_result)
                    bedrock_error = str(step_error)
                    yield {
                        "type": "error",
                        "step": step_num,
                        "tool": tool_name,
                        "error": str(step_error),
                        "message": f"Step {step_num} failed with Bedrock"
                    }
                    fallback_triggered = True
                    break
            # If we completed all steps with Bedrock
            if not fallback_triggered and len(components_executed) == len(steps):
                final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
                yield {
                    "type": "final",
                    "data": final_output,
                    "executor": executor_used
                }
                print(f"✅ Bedrock execution completed: {pipeline_name}")
                return
        except Exception as bedrock_exception:
            print(f"❌ Bedrock execution exception: {str(bedrock_exception)}")
            bedrock_error = str(bedrock_exception)
            fallback_triggered = True

    # If Bedrock failed or was not used, run (the rest of) the pipeline with CrewAI
    if fallback_triggered or not use_bedrock:
        print(f"🔄 Executing pipeline with CrewAI: {pipeline_name}")
        if fallback_triggered and bedrock_error:
            yield {
                "type": "info",
                "message": f"Bedrock failed: {bedrock_error}. Switching to CrewAI...",
                "executor": "fallback"
            }
        else:
            yield {
                "type": "info",
                "message": "Using CrewAI execution...",
                "executor": "crewai"
            }
        # Resume at the first step that did not complete. The Bedrock pass also
        # records its failed step, so counting only the leading completed entries
        # makes CrewAI retry that step instead of skipping it. Previously
        # len(components_executed) + 1 skipped it, so the run could never finish
        # as "completed".
        completed_prefix = 0
        for component in components_executed:
            if component.get("status") != "completed":
                break
            completed_prefix += 1
        start_step = completed_prefix + 1
        for step_num in range(start_step, len(steps) + 1):
            step_def = steps[step_num - 1]
            tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
            yield {
                "type": "step",
                "step": step_num,
                "tool": tool_name,
                "status": "executing",
                "executor": "crewai"
            }
            try:
                # Execute the step using master_tools
                result = _execute_step_with_master_tool(
                    step_def=step_def,
                    file_path=file_path,
                    step_num=step_num,
                    total_steps=len(steps),
                    session_id=session_id,
                    prefer_bedrock=False,
                    previous_results=components_executed
                )
                executor_used = "crewai"
                # Create component result object
                component_result = {
                    "tool_name": tool_name,
                    "tool": tool_name,
                    **step_def,
                    "result": result.get("output"),
                    "status": "completed",
                    "executor": executor_used,
                    "execution_time": result.get("execution_time"),
                    "step_number": step_num,
                    "success": True,
                    "tool_version": result.get("tool_version", "1.0")
                }
                # Replace the entry for this step if one exists (e.g. a failed Bedrock attempt)
                if len(components_executed) >= step_num:
                    components_executed[step_num - 1] = component_result
                else:
                    components_executed.append(component_result)
                yield {
                    "type": "step",
                    "step": step_num,
                    "tool": tool_name,
                    "status": "completed",
                    "observation": result.get("output"),
                    "input": step_def,
                    "executor": executor_used
                }
                # Update file_path for next step if needed
                file_path = _update_file_path(file_path, result)
            except Exception as step_error:
                print(f"❌ Step {step_num} failed with CrewAI: {str(step_error)}")
                # Create failed component result
                component_result = {
                    "tool_name": tool_name,
                    "tool": tool_name,
                    **step_def,
                    "result": {"error": str(step_error)},
                    "status": "failed",
                    "error": str(step_error),
                    "step_number": step_num,
                    "success": False
                }
                # Add or replace in components_executed
                if len(components_executed) >= step_num:
                    components_executed[step_num - 1] = component_result
                else:
                    components_executed.append(component_result)
                yield {
                    "type": "error",
                    "step": step_num,
                    "tool": tool_name,
                    "error": str(step_error),
                    "message": f"Step {step_num} failed with CrewAI"
                }
                break
        # Check if we completed all steps
        completed_steps = [c for c in components_executed if c.get("status") == "completed"]
        if len(completed_steps) == len(steps):
            # All steps completed
            final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
            yield {
                "type": "final",
                "data": final_output,
                "executor": executor_used
            }
            print(f"✅ CrewAI execution completed: {pipeline_name}")
        else:
            # Partial completion or failure
            final_output = _build_final_output(pipeline, components_executed, executor_used, "partial")
            final_output["error"] = f"Pipeline execution incomplete. Completed {len(completed_steps)} of {len(steps)} steps."
            yield {
                "type": "error",
                "error": "Pipeline execution incomplete",
                "data": final_output
            }
            print(f"⚠️ CrewAI execution incomplete for: {pipeline_name}")
| # ======================== | |
| # DYNAMIC STEP EXECUTION WITH MASTER_TOOLS | |
| # ======================== | |
def _master_tool_field_names(args_schema: Any) -> List[str]:
    """
    Return the parameter names declared by a tool's ``args_schema``.

    Supports pydantic v2 models (``model_fields``), pydantic v1 models
    (``__fields__``) and plain JSON-schema dicts (``properties``). A missing
    schema (``None``) yields an empty list instead of an AttributeError.
    """
    if args_schema is None:
        return []
    if isinstance(args_schema, dict):
        return list(args_schema.get("properties", {}))
    # Prefer the v2 attribute; __fields__ is the v1 name (deprecated in v2).
    fields = getattr(args_schema, "model_fields", None)
    if fields is None:
        fields = getattr(args_schema, "__fields__", None) or {}
    return list(fields)


def _previous_step_text(previous_results: List[Dict[str, Any]], step_num: int) -> Any:
    """
    Return the text produced by the step before ``step_num``, or None.

    Reads ``previous_results[step_num - 2]["result"]``: a str result is
    returned as-is, and a dict result contributes its ``"text"`` entry.
    """
    if step_num <= 1 or len(previous_results) < step_num - 1:
        return None
    prev_result = previous_results[step_num - 2].get("result")
    if isinstance(prev_result, dict):
        return prev_result.get("text")
    if isinstance(prev_result, str):
        return prev_result
    return None


def _resolve_master_tool_args(
    param_names: List[str],
    step_def: Dict[str, Any],
    file_path: str,
    session_id: Optional[str],
    step_num: int,
    previous_results: Optional[List[Dict[str, Any]]],
) -> Dict[str, Any]:
    """
    Build the keyword arguments for a tool call from the step context.

    For each parameter name, the first available source wins:
    1. an explicit value in ``step_def``;
    2. the current ``file_path`` / ``session_id`` (only when truthy);
    3. for ``text`` only, the previous step's text output (a None text is
       treated as absent).

    Names with no source are omitted, which leaves them to the tool's own
    defaults or validation.
    """
    resolved: Dict[str, Any] = {}
    for name in param_names:
        if name in step_def:
            resolved[name] = step_def[name]
        elif name == "file_path" and file_path:
            resolved[name] = file_path
        elif name == "session_id" and session_id:
            resolved[name] = session_id
        elif name == "text" and previous_results:
            prev_text = _previous_step_text(previous_results, step_num)
            if prev_text is not None:
                resolved[name] = prev_text
                # str() keeps the log preview from crashing on non-string payloads.
                print(f" 📝 Using text from previous step: {str(prev_text)[:100]}...")
    return resolved


def _execute_step_with_master_tool(
    step_def: Dict[str, Any],
    file_path: str,
    step_num: int,
    total_steps: int,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True,
    previous_results: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """
    Execute one pipeline step using a tool from the master_tools registry.

    The tool name is read from ``step_def["tool_name"]``, falling back to
    ``step_def["tool"]``. Two tool shapes are supported:

    * Tools exposing both ``args_schema`` and ``invoke`` (LangChain-style):
      arguments are built from the schema's field names and passed as one
      dict to ``invoke``. If that fails and the schema accepts ``file_path``
      (and ``file_path`` is truthy), the call is retried once with only
      ``{"file_path": file_path}``.
    * Any other callable: arguments are built from ``inspect.signature`` and
      passed as keyword arguments.

    Args:
        step_def: Step configuration; keys matching tool parameters are passed through.
        file_path: Current document path, injected into a ``file_path`` parameter.
        step_num: 1-based index of this step.
        total_steps: Total number of steps (progress output only).
        session_id: Optional session id, injected into a ``session_id`` parameter.
        prefer_bedrock: Only selects the ``"executor"`` label in the result.
        previous_results: Results of earlier steps; entry ``step_num - 2`` supplies
            ``text`` when the step does not set it explicitly.

    Returns:
        Dict with ``output``, ``executor``, ``execution_time``, ``tool_version``
        and ``args_used``; the minimal-args retry also adds ``warning``.

    Raises:
        ValueError: The tool is not registered, or is neither invokable nor callable.
        RuntimeError: The tool call failed (original exception chained as the cause).
    """
    import time
    import inspect

    # Accept either the 'tool_name' or the 'tool' field.
    tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
    start_time = time.time()
    print(f" 🔨 Executing step {step_num}/{total_steps}: {tool_name}")
    print(f" Step definition: {step_def}")

    tool = get_tool_executor(tool_name)
    if not tool:
        error_msg = f"Tool '{tool_name}' not found in registry. Available tools: {list(TOOL_REGISTRY.keys())}"
        print(f" ❌ {error_msg}")
        raise ValueError(error_msg)

    executor_label = "bedrock" if prefer_bedrock else "crewai"

    # LangChain-style tools: schema-driven args, passed as a single dict.
    if hasattr(tool, "args_schema") and hasattr(tool, "invoke"):
        field_names = _master_tool_field_names(tool.args_schema)
        args = _resolve_master_tool_args(
            field_names, step_def, file_path, session_id, step_num, previous_results
        )
        try:
            print(f" 🚀 Invoking tool {tool_name} with args: {args}")
            output = tool.invoke(args)
        except Exception as e:
            # Without a usable file_path there is nothing left to retry; re-raise
            # rather than fall through and implicitly return None.
            if not (file_path and "file_path" in field_names):
                raise RuntimeError(f"Tool '{tool_name}' failed: {e}") from e
            print(f"⚠️ Tool {tool_name} failed with full args, trying minimal: {e}")
            minimal_args = {"file_path": file_path}
            try:
                output = tool.invoke(minimal_args)
            except Exception as inner_error:
                raise RuntimeError(
                    f"Tool '{tool_name}' failed with minimal args: {inner_error}"
                ) from inner_error
            return {
                "output": output,
                "executor": executor_label,
                "execution_time": time.time() - start_time,
                "tool_version": "master_tools_1.0",
                "args_used": list(minimal_args.keys()),
                "warning": "Used minimal arguments"
            }

        execution_time = time.time() - start_time
        print(f" ✅ Step {step_num} completed in {execution_time:.2f}s")
        return {
            "output": output,
            "executor": executor_label,
            "execution_time": execution_time,
            "tool_version": "master_tools_1.0",
            "args_used": list(args.keys())
        }

    # Plain Python callables: signature-driven keyword arguments.
    if callable(tool):
        try:
            param_names = list(inspect.signature(tool).parameters)
            call_args = _resolve_master_tool_args(
                param_names, step_def, file_path, session_id, step_num, previous_results
            )
            print(f" 🚀 Calling function {tool_name} with args: {call_args}")
            output = tool(**call_args)
        except Exception as e:
            raise RuntimeError(f"Failed to execute function {tool_name}: {e}") from e

        execution_time = time.time() - start_time
        print(f" ✅ Step {step_num} completed in {execution_time:.2f}s")
        return {
            "output": output,
            "executor": executor_label,
            "execution_time": execution_time,
            "tool_version": "function_1.0",
            "args_used": list(call_args.keys())
        }

    raise ValueError(f"Tool '{tool_name}' is not callable or a valid StructuredTool")
| def _update_file_path(current_file_path: str, result: Dict[str, Any]) -> str: | |
| """ | |
| Update file path based on tool result. | |
| Some tools might generate new files. | |
| """ | |
| output = result.get("output") | |
| if isinstance(output, dict): | |
| # Check for file references in output | |
| for key in ["file_path", "output_file", "new_file", "generated_file"]: | |
| if key in output and isinstance(output[key], str): | |
| return output[key] | |
| return current_file_path | |
| # ======================== | |
| # HELPER FUNCTIONS | |
| # ======================== | |
| def _build_final_output( | |
| pipeline: Dict[str, Any], | |
| components_executed: List[Dict[str, Any]], | |
| executor_used: str, | |
| status: str | |
| ) -> Dict[str, Any]: | |
| """ | |
| Build final output with components_executed array. | |
| FIXED: Handle both component formats | |
| """ | |
| # Get steps count from pipeline | |
| steps = pipeline.get("pipeline_steps", pipeline.get("components", [])) | |
| # Find the finalize step result if present | |
| final_result = None | |
| for component in components_executed: | |
| if component.get("tool_name") == "finalize" or component.get("tool") == "finalize": | |
| final_result = component.get("result") | |
| break | |
| # Count completed steps | |
| completed_steps = len([c for c in components_executed if c.get("status") == "completed"]) | |
| final_output = { | |
| "pipeline_id": pipeline.get("pipeline_id"), | |
| "pipeline_name": pipeline.get("pipeline_name"), | |
| "status": status, | |
| "components_executed": components_executed, | |
| "executor": executor_used, | |
| "summary": f"Pipeline execution {status} with {executor_used}", | |
| "total_steps": len(steps), | |
| "completed_steps": completed_steps, | |
| "final_result": final_result | |
| } | |
| # Extract text for user-facing output | |
| if final_result: | |
| # Use finalize tool's output | |
| final_output["text"] = final_result | |
| elif components_executed: | |
| # Find last completed component with text | |
| for component in reversed(components_executed): | |
| if component.get("status") == "completed" and component.get("result"): | |
| result = component["result"] | |
| if isinstance(result, str): | |
| final_output["text"] = result | |
| break | |
| elif isinstance(result, dict): | |
| for field in ["text", "summary", "content", "translation", "output"]: | |
| if field in result and isinstance(result[field], str): | |
| final_output["text"] = result[field] | |
| break | |
| # If no text field found but dict has string values | |
| if "text" not in final_output: | |
| for key, value in result.items(): | |
| if isinstance(value, str) and len(value) > 10: | |
| final_output["text"] = value | |
| break | |
| return final_output | |
| # ======================== | |
| # NON-STREAMING EXECUTOR | |
| # ======================== | |
def execute_pipeline(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Dict[str, Any]:
    """
    Run a pipeline without streaming and return only its terminal payload.

    This is a blocking wrapper over ``execute_pipeline_streaming``. Events are
    consumed until the first ``"final"`` event, or the first ``"error"`` event
    that carries a truthy ``"data"`` payload. That event's ``"data"`` is
    returned and any remaining events are left unconsumed.

    If the stream ends without such an event, or the selected event's data is
    None, a ``"failed"`` stub result is returned instead.
    """
    stream = execute_pipeline_streaming(pipeline, file_path, session_id, prefer_bedrock)

    outcome = None
    for event in stream:
        event_type = event.get("type")
        if event_type == "final" or (event_type == "error" and event.get("data")):
            outcome = event.get("data")
            break

    if outcome is not None:
        return outcome

    return {
        "pipeline_id": pipeline.get("pipeline_id"),
        "pipeline_name": pipeline.get("pipeline_name"),
        "status": "failed",
        "components_executed": [],
        "error": "Pipeline execution completed without final result"
    }
if __name__ == "__main__":
    # Manual smoke test: stream a one-step text-extraction pipeline over a
    # sample file and print every event the executor yields.
    extract_step = {
        "tool_name": "extract_text",
        "start_page": 1,
        "end_page": 1,
        "params": {},
    }
    test_pipeline = {
        "pipeline_name": "test-extraction",
        "components": [extract_step],
        "_generator": "test",
    }
    test_file = "test.pdf"

    print("Testing streaming execution...")
    for event in execute_pipeline_streaming(test_pipeline, test_file):
        print(f"Event: {event}")