masterllm / services /agentic_orchestrator_wrapper.py
stellar413's picture
Added fixed agent to agent communication
6df13ef
# services/agentic_orchestrator_wrapper.py
"""
Agentic Orchestrator Wrapper - COMPLETE ISOLATION from pipeline_executor.
This module provides the ONLY interface for agentic orchestration.
NO internal agent details are exposed.
"""
import os
import logging
from typing import Dict, Any, Generator, Optional
from services.output_normalizer import normalize_agentic_output, NormalizationError
from services.agentic_integration_logger import (
log_agentic_execution,
log_fallback_trigger
)
# Module-level logger registered under the fixed name "agentic.wrapper"
# (not __name__), so its records can be configured by that name.
logger = logging.getLogger("agentic.wrapper")
def execute_with_agentic_orchestration(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute a pipeline using agentic orchestration - ISOLATED WRAPPER.

    This generator:
    1. Instantiates MasterOrchestratorAgent and creates an initial plan
    2. Delegates one task per pipeline component and evaluates each response
    3. On a rejected response, notifies the agent and modifies the plan
    4. Normalizes the execution summary to the legacy format and validates it
    5. Logs the outcome and yields the final result

    Args:
        pipeline: Pipeline configuration. Components are read from the
            "components" key, falling back to "pipeline_steps" when
            "components" is missing or empty.
        file_path: Path to the file being processed.
        session_id: Optional session identifier; "unknown" is used for the
            integration logs when it is not provided.

    Yields:
        Event dicts tagged with "executor": "agentic". The "type" key is one
        of "status", "step", "rejection", "replan" or "final".

    Raises:
        ValueError: If the pipeline has no components.
        NormalizationError: If the normalized output fails validation.
        Exception: Any other failure is logged via log_fallback_trigger and
            re-raised unchanged so the caller can fall back.
    """
    try:
        # Import here to avoid circular dependencies and keep isolation
        from services.agents.master_orchestrator import MasterOrchestratorAgent

        logger.info("Initializing agentic orchestration for session %s", session_id)

        yield {
            "type": "status",
            "message": "Initializing agentic orchestration...",
            "executor": "agentic"
        }

        master = MasterOrchestratorAgent()

        # PHASE 1: Create initial plan
        plan_description = f"Execute pipeline: {pipeline.get('pipeline_name', 'unnamed')}"
        context = {
            "pipeline": pipeline,
            "file_path": file_path,
            "session_id": session_id
        }
        # Track the plan currently in force so that every replan event
        # reports the version it actually supersedes.
        current_plan = master.create_plan(plan_description, context)

        yield {
            "type": "status",
            "message": f"Plan v{current_plan['version']} created",
            "executor": "agentic"
        }

        # PHASE 2: Delegate to agents based on pipeline components.
        # Use "or" rather than a .get() default so that an empty or None
        # "components" entry still falls back to "pipeline_steps".
        components = pipeline.get("components") or pipeline.get("pipeline_steps")
        if not components:
            raise ValueError("No components found in pipeline")

        # Loop-invariant: the same file is handed to every agent.
        filename = os.path.basename(file_path)

        for idx, component in enumerate(components, 1):
            tool_name = component.get("tool_name", component.get("tool", "unknown"))

            yield {
                "type": "step",
                "step": idx,
                "tool": tool_name,
                "status": "executing",
                "executor": "agentic"
            }

            # Both page bounds default to 1 when the component omits them.
            task_input = {
                "filename": filename,
                "temp_files": {filename: file_path},
                "start_page": component.get("start_page", 1),
                "end_page": component.get("end_page", 1)
            }

            response = master.delegate_task(
                agent_name=tool_name,
                task_description=f"Execute {tool_name} on {filename}",
                task_input=task_input
            )

            evaluation = master.evaluate_response(response)

            yield {
                "type": "step",
                "step": idx,
                "tool": tool_name,
                "status": "completed" if evaluation["accepted"] else "rejected",
                "confidence": evaluation["confidence"],
                "executor": "agentic"
            }

            if evaluation["accepted"]:
                continue

            # Rejection path: notify the agent, surface the event, then replan.
            reason = evaluation["reason"]
            master.reject_output(
                agent_name=tool_name,
                message_id=response.message_id,
                reason=reason
            )

            yield {
                "type": "rejection",
                "agent": tool_name,
                "reason": reason,
                "executor": "agentic"
            }

            revised_plan = master.modify_plan(
                description=f"Adjusted plan after {tool_name} rejection",
                reason=reason,
                modifications=[f"Skip or retry {tool_name}"]
            )

            yield {
                "type": "replan",
                "from_version": current_plan["version"],
                "to_version": revised_plan["version"],
                "reason": reason,
                "executor": "agentic"
            }

            # Later rejections must report this revision as their starting point.
            current_plan = revised_plan

        # PHASE 3: Summarize, normalize and validate before reporting success.
        summary = master.get_execution_summary()
        normalized = normalize_agentic_output(summary, pipeline)

        from services.output_normalizer import validate_legacy_compatibility
        if not validate_legacy_compatibility(normalized):
            raise NormalizationError("Output validation failed")

        log_agentic_execution(
            session_id=session_id or "unknown",
            pipeline=pipeline,
            agentic_summary=summary,
            result="success"
        )

        yield {
            "type": "final",
            "data": normalized,
            "executor": "agentic"
        }

        logger.info("Agentic orchestration completed successfully for session %s", session_id)

    except Exception as e:
        # Record why the agentic path failed, then re-raise unchanged so the
        # caller can switch to its fallback executor.
        log_fallback_trigger(
            session_id=session_id or "unknown",
            reason="Agentic execution failed",
            exception=e
        )
        raise