Spaces:
Sleeping
Sleeping
File size: 6,455 Bytes
6df13ef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# services/agentic_orchestrator_wrapper.py
"""
Agentic Orchestrator Wrapper - COMPLETE ISOLATION from pipeline_executor.
This module provides the ONLY interface for agentic orchestration.
NO internal agent details are exposed.
"""
import os
import logging
from typing import Dict, Any, Generator, Optional
from services.output_normalizer import normalize_agentic_output, NormalizationError
from services.agentic_integration_logger import (
log_agentic_execution,
log_fallback_trigger
)
# Module-level logger for this wrapper; records are emitted under the
# "agentic.wrapper" logger name.
logger = logging.getLogger("agentic.wrapper")
def execute_with_agentic_orchestration(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute a pipeline through MasterOrchestratorAgent and stream events.

    The generator performs these steps in order:
    1. Instantiates MasterOrchestratorAgent and asks it for an initial plan.
    2. Walks the pipeline's components (``"components"``, falling back to
       ``"pipeline_steps"`` when the former is missing or empty) and
       delegates one task per component to the agent named by the
       component's ``"tool_name"`` / ``"tool"`` key.
    3. Evaluates every response; a rejected response is reported back to
       the master, and the plan is modified.
    4. Normalizes the master's execution summary with
       ``normalize_agentic_output`` and validates it with
       ``validate_legacy_compatibility``.
    5. Logs the successful run and yields the final normalized payload.

    Args:
        pipeline: Pipeline configuration. Read keys: ``"pipeline_name"``,
            ``"components"`` / ``"pipeline_steps"``.
        file_path: Path to the file being processed.
        session_id: Optional session identifier; ``"unknown"`` is used in
            the integration log calls when it is falsy.

    Yields:
        Event dicts, each tagged ``"executor": "agentic"``, with ``"type"``
        one of ``"status"``, ``"step"``, ``"rejection"``, ``"replan"`` or
        ``"final"``.

    Raises:
        ValueError: If the pipeline contains no components.
        NormalizationError: If the normalized output fails validation.
        Exception: Any other error raised while orchestrating. Every
            exception is first reported through ``log_fallback_trigger``
            and then re-raised unchanged; per the module's design the
            caller is expected to catch it and fall back.
    """
    try:
        # Import here to avoid circular dependencies and keep isolation
        from services.agents.master_orchestrator import MasterOrchestratorAgent

        logger.info(
            "Initializing agentic orchestration for session %s", session_id
        )

        yield {
            "type": "status",
            "message": "Initializing agentic orchestration...",
            "executor": "agentic"
        }

        # Create master orchestrator
        master = MasterOrchestratorAgent()

        # PHASE 1: Create initial plan
        plan_description = (
            f"Execute pipeline: {pipeline.get('pipeline_name', 'unnamed')}"
        )
        context = {
            "pipeline": pipeline,
            "file_path": file_path,
            "session_id": session_id
        }
        # Track the plan currently in force so that every "replan" event
        # reports the version it actually replaces, not always the first.
        current_plan = master.create_plan(plan_description, context)

        yield {
            "type": "status",
            "message": f"Plan v{current_plan['version']} created",
            "executor": "agentic"
        }

        # PHASE 2: Delegate to agents based on pipeline components.
        # `or` chaining (rather than nested .get defaults) also falls back
        # when "components" is present but empty or None.
        components = (
            pipeline.get("components")
            or pipeline.get("pipeline_steps")
            or []
        )
        if not components:
            raise ValueError("No components found in pipeline")

        # Loop-invariant: the bare file name is reused for every task.
        file_name = os.path.basename(file_path)

        for idx, component in enumerate(components, 1):
            tool_name = component.get(
                "tool_name", component.get("tool", "unknown")
            )

            yield {
                "type": "step",
                "step": idx,
                "tool": tool_name,
                "status": "executing",
                "executor": "agentic"
            }

            # Prepare task input.
            # NOTE(review): "end_page" defaults to 1 even when "start_page"
            # is given explicitly -- confirm this is the intended default.
            task_input = {
                "filename": file_name,
                "temp_files": {file_name: file_path},
                "start_page": component.get("start_page", 1),
                "end_page": component.get("end_page", 1)
            }

            # Delegate to agent
            response = master.delegate_task(
                agent_name=tool_name,
                task_description=f"Execute {tool_name} on {file_name}",
                task_input=task_input
            )

            # Evaluate response
            evaluation = master.evaluate_response(response)

            yield {
                "type": "step",
                "step": idx,
                "tool": tool_name,
                "status": "completed" if evaluation["accepted"] else "rejected",
                "confidence": evaluation["confidence"],
                "executor": "agentic"
            }

            if evaluation["accepted"]:
                continue

            # Handle rejection.
            # NOTE(review): the original comment called at least one
            # rejection "MANDATORY ... for demo"; nothing in this function
            # enforces that.
            rejection_reason = evaluation["reason"]
            master.reject_output(
                agent_name=tool_name,
                message_id=response.message_id,
                reason=rejection_reason
            )

            yield {
                "type": "rejection",
                "agent": tool_name,
                "reason": rejection_reason,
                "executor": "agentic"
            }

            # Modify plan, remembering which version is being superseded.
            previous_version = current_plan["version"]
            current_plan = master.modify_plan(
                description=f"Adjusted plan after {tool_name} rejection",
                reason=rejection_reason,
                modifications=[f"Skip or retry {tool_name}"]
            )

            yield {
                "type": "replan",
                "from_version": previous_version,
                "to_version": current_plan["version"],
                "reason": rejection_reason,
                "executor": "agentic"
            }

        # Get execution summary
        summary = master.get_execution_summary()

        # Normalize to legacy format
        normalized = normalize_agentic_output(summary, pipeline)

        # Validate compatibility
        from services.output_normalizer import validate_legacy_compatibility
        if not validate_legacy_compatibility(normalized):
            raise NormalizationError("Output validation failed")

        # Log success
        log_agentic_execution(
            session_id=session_id or "unknown",
            pipeline=pipeline,
            agentic_summary=summary,
            result="success"
        )

        # Yield final result
        yield {
            "type": "final",
            "data": normalized,
            "executor": "agentic"
        }

        logger.info(
            "Agentic orchestration completed successfully for session %s",
            session_id
        )
    except Exception as e:
        # `except Exception` does not intercept GeneratorExit, so a consumer
        # closing the generator early is not logged as a fallback.
        log_fallback_trigger(
            session_id=session_id or "unknown",
            reason="Agentic execution failed",
            exception=e
        )
        # Re-raise to trigger fallback in caller
        raise
|