Spaces:
Sleeping
Sleeping
| # services/agentic_orchestrator_wrapper.py | |
| """ | |
| Agentic Orchestrator Wrapper - COMPLETE ISOLATION from pipeline_executor. | |
| This module provides the ONLY interface for agentic orchestration. | |
| NO internal agent details are exposed. | |
| """ | |
| import os | |
| import logging | |
| from typing import Dict, Any, Generator, Optional | |
| from services.output_normalizer import normalize_agentic_output, NormalizationError | |
| from services.agentic_integration_logger import ( | |
| log_agentic_execution, | |
| log_fallback_trigger | |
| ) | |
| logger = logging.getLogger("agentic.wrapper") | |
| def execute_with_agentic_orchestration( | |
| pipeline: Dict[str, Any], | |
| file_path: str, | |
| session_id: Optional[str] = None | |
| ) -> Generator[Dict[str, Any], None, None]: | |
| """ | |
| Execute pipeline using agentic orchestration - ISOLATED WRAPPER. | |
| This function: | |
| 1. Instantiates MasterOrchestratorAgent | |
| 2. Translates pipeline → agent tasks | |
| 3. Executes delegation & message passing | |
| 4. Normalizes output to legacy format | |
| 5. Falls back on ANY failure | |
| Args: | |
| pipeline: Pipeline configuration | |
| file_path: Path to file being processed | |
| session_id: Optional session identifier | |
| Yields: | |
| Events in LEGACY-COMPATIBLE format | |
| Raises: | |
| Any exception triggers immediate fallback (caught by caller) | |
| """ | |
| try: | |
| # Import here to avoid circular dependencies and keep isolation | |
| from services.agents.master_orchestrator import MasterOrchestratorAgent | |
| logger.info(f"Initializing agentic orchestration for session {session_id}") | |
| # Yield status | |
| yield { | |
| "type": "status", | |
| "message": "Initializing agentic orchestration...", | |
| "executor": "agentic" | |
| } | |
| # Create master orchestrator | |
| master = MasterOrchestratorAgent() | |
| # PHASE 1: Create initial plan | |
| plan_description = f"Execute pipeline: {pipeline.get('pipeline_name', 'unnamed')}" | |
| context = { | |
| "pipeline": pipeline, | |
| "file_path": file_path, | |
| "session_id": session_id | |
| } | |
| plan_v1 = master.create_plan(plan_description, context) | |
| yield { | |
| "type": "status", | |
| "message": f"Plan v{plan_v1['version']} created", | |
| "executor": "agentic" | |
| } | |
| # PHASE 2: Delegate to agents based on pipeline components | |
| components = pipeline.get("components", pipeline.get("pipeline_steps", [])) | |
| if not components: | |
| raise ValueError("No components found in pipeline") | |
| for idx, component in enumerate(components, 1): | |
| tool_name = component.get("tool_name", component.get("tool", "unknown")) | |
| yield { | |
| "type": "step", | |
| "step": idx, | |
| "tool": tool_name, | |
| "status": "executing", | |
| "executor": "agentic" | |
| } | |
| # Prepare task input | |
| task_input = { | |
| "filename": os.path.basename(file_path), | |
| "temp_files": {os.path.basename(file_path): file_path}, | |
| "start_page": component.get("start_page", 1), | |
| "end_page": component.get("end_page", 1) | |
| } | |
| # Delegate to agent | |
| response = master.delegate_task( | |
| agent_name=tool_name, | |
| task_description=f"Execute {tool_name} on {os.path.basename(file_path)}", | |
| task_input=task_input | |
| ) | |
| # Evaluate response | |
| evaluation = master.evaluate_response(response) | |
| yield { | |
| "type": "step", | |
| "step": idx, | |
| "tool": tool_name, | |
| "status": "completed" if evaluation["accepted"] else "rejected", | |
| "confidence": evaluation["confidence"], | |
| "executor": "agentic" | |
| } | |
| # Handle rejection (MANDATORY: at least one rejection for demo) | |
| if not evaluation["accepted"]: | |
| # Reject output | |
| master.reject_output( | |
| agent_name=tool_name, | |
| message_id=response.message_id, | |
| reason=evaluation["reason"] | |
| ) | |
| yield { | |
| "type": "rejection", | |
| "agent": tool_name, | |
| "reason": evaluation["reason"], | |
| "executor": "agentic" | |
| } | |
| # Modify plan | |
| plan_v2 = master.modify_plan( | |
| description=f"Adjusted plan after {tool_name} rejection", | |
| reason=evaluation["reason"], | |
| modifications=[f"Skip or retry {tool_name}"] | |
| ) | |
| yield { | |
| "type": "replan", | |
| "from_version": plan_v1["version"], | |
| "to_version": plan_v2["version"], | |
| "reason": evaluation["reason"], | |
| "executor": "agentic" | |
| } | |
| # Get execution summary | |
| summary = master.get_execution_summary() | |
| # Normalize to legacy format | |
| normalized = normalize_agentic_output(summary, pipeline) | |
| # Validate compatibility | |
| from services.output_normalizer import validate_legacy_compatibility | |
| if not validate_legacy_compatibility(normalized): | |
| raise NormalizationError("Output validation failed") | |
| # Log success | |
| log_agentic_execution( | |
| session_id=session_id or "unknown", | |
| pipeline=pipeline, | |
| agentic_summary=summary, | |
| result="success" | |
| ) | |
| # Yield final result | |
| yield { | |
| "type": "final", | |
| "data": normalized, | |
| "executor": "agentic" | |
| } | |
| logger.info(f"Agentic orchestration completed successfully for session {session_id}") | |
| except Exception as e: | |
| # Log fallback trigger | |
| log_fallback_trigger( | |
| session_id=session_id or "unknown", | |
| reason="Agentic execution failed", | |
| exception=e | |
| ) | |
| # Re-raise to trigger fallback in caller | |
| raise | |