# services/agentic_orchestrator_wrapper.py """ Agentic Orchestrator Wrapper - COMPLETE ISOLATION from pipeline_executor. This module provides the ONLY interface for agentic orchestration. NO internal agent details are exposed. """ import os import logging from typing import Dict, Any, Generator, Optional from services.output_normalizer import normalize_agentic_output, NormalizationError from services.agentic_integration_logger import ( log_agentic_execution, log_fallback_trigger ) logger = logging.getLogger("agentic.wrapper") def execute_with_agentic_orchestration( pipeline: Dict[str, Any], file_path: str, session_id: Optional[str] = None ) -> Generator[Dict[str, Any], None, None]: """ Execute pipeline using agentic orchestration - ISOLATED WRAPPER. This function: 1. Instantiates MasterOrchestratorAgent 2. Translates pipeline → agent tasks 3. Executes delegation & message passing 4. Normalizes output to legacy format 5. Falls back on ANY failure Args: pipeline: Pipeline configuration file_path: Path to file being processed session_id: Optional session identifier Yields: Events in LEGACY-COMPATIBLE format Raises: Any exception triggers immediate fallback (caught by caller) """ try: # Import here to avoid circular dependencies and keep isolation from services.agents.master_orchestrator import MasterOrchestratorAgent logger.info(f"Initializing agentic orchestration for session {session_id}") # Yield status yield { "type": "status", "message": "Initializing agentic orchestration...", "executor": "agentic" } # Create master orchestrator master = MasterOrchestratorAgent() # PHASE 1: Create initial plan plan_description = f"Execute pipeline: {pipeline.get('pipeline_name', 'unnamed')}" context = { "pipeline": pipeline, "file_path": file_path, "session_id": session_id } plan_v1 = master.create_plan(plan_description, context) yield { "type": "status", "message": f"Plan v{plan_v1['version']} created", "executor": "agentic" } # PHASE 2: Delegate to agents based on pipeline components components = pipeline.get("components", pipeline.get("pipeline_steps", [])) if not components: raise ValueError("No components found in pipeline") for idx, component in enumerate(components, 1): tool_name = component.get("tool_name", component.get("tool", "unknown")) yield { "type": "step", "step": idx, "tool": tool_name, "status": "executing", "executor": "agentic" } # Prepare task input task_input = { "filename": os.path.basename(file_path), "temp_files": {os.path.basename(file_path): file_path}, "start_page": component.get("start_page", 1), "end_page": component.get("end_page", 1) } # Delegate to agent response = master.delegate_task( agent_name=tool_name, task_description=f"Execute {tool_name} on {os.path.basename(file_path)}", task_input=task_input ) # Evaluate response evaluation = master.evaluate_response(response) yield { "type": "step", "step": idx, "tool": tool_name, "status": "completed" if evaluation["accepted"] else "rejected", "confidence": evaluation["confidence"], "executor": "agentic" } # Handle rejection (MANDATORY: at least one rejection for demo) if not evaluation["accepted"]: # Reject output master.reject_output( agent_name=tool_name, message_id=response.message_id, reason=evaluation["reason"] ) yield { "type": "rejection", "agent": tool_name, "reason": evaluation["reason"], "executor": "agentic" } # Modify plan plan_v2 = master.modify_plan( description=f"Adjusted plan after {tool_name} rejection", reason=evaluation["reason"], modifications=[f"Skip or retry {tool_name}"] ) yield { "type": "replan", "from_version": plan_v1["version"], "to_version": plan_v2["version"], "reason": evaluation["reason"], "executor": "agentic" } # Get execution summary summary = master.get_execution_summary() # Normalize to legacy format normalized = normalize_agentic_output(summary, pipeline) # Validate compatibility from services.output_normalizer import validate_legacy_compatibility if not validate_legacy_compatibility(normalized): raise NormalizationError("Output validation failed") # Log success log_agentic_execution( session_id=session_id or "unknown", pipeline=pipeline, agentic_summary=summary, result="success" ) # Yield final result yield { "type": "final", "data": normalized, "executor": "agentic" } logger.info(f"Agentic orchestration completed successfully for session {session_id}") except Exception as e: # Log fallback trigger log_fallback_trigger( session_id=session_id or "unknown", reason="Agentic execution failed", exception=e ) # Re-raise to trigger fallback in caller raise