masterllm / services /output_normalizer.py
stellar413's picture
Added fixed agent to agent communication
6df13ef
# services/output_normalizer.py
"""
Output Normalizer - Ensures agentic output matches legacy pipeline schema exactly.
This guarantees downstream consumers are unaware of agentic vs legacy execution.
"""
from typing import Dict, Any, List
import logging
logger = logging.getLogger("agentic.normalizer")
class NormalizationError(Exception):
"""Raised when agentic output cannot be normalized to legacy schema."""
pass
def normalize_agentic_output(
agentic_summary: Dict[str, Any],
pipeline: Dict[str, Any]
) -> Dict[str, Any]:
"""
Normalize agentic execution summary to legacy pipeline output format.
Args:
agentic_summary: Output from MasterOrchestratorAgent.get_execution_summary()
pipeline: Original pipeline configuration
Returns:
Dict matching legacy execute_pipeline_streaming output format
Raises:
NormalizationError: If normalization fails (triggers fallback)
"""
try:
# Extract components from agentic messages
components_executed = _extract_components_from_messages(
agentic_summary.get("agent_messages", [])
)
# Determine status
rejections = agentic_summary.get("rejections", [])
verification = agentic_summary.get("verification", {})
if verification.get("overall_verdict") == "FAIL":
status = "failed"
elif rejections:
status = "completed_with_rejections"
else:
status = "completed"
# Build legacy-compatible output
normalized = {
"status": status,
"pipeline_id": pipeline.get("pipeline_id"),
"pipeline_name": pipeline.get("pipeline_name"),
"executor": "agentic_orchestration",
"components_executed": components_executed,
"summary": {
"total_tools_called": len(components_executed),
"tools": [c["tool_name"] for c in components_executed],
"plan_versions": len(agentic_summary.get("plan_versions", [])),
"rejections": len(rejections)
},
"agentic_metadata": {
"plan_versions": agentic_summary.get("plan_versions", []),
"rejections": rejections,
"total_messages": agentic_summary.get("total_messages", 0),
"verification": verification
}
}
logger.info(f"Normalized agentic output: {len(components_executed)} components, status={status}")
return normalized
except Exception as e:
logger.error(f"Normalization failed: {e}")
raise NormalizationError(f"Cannot normalize agentic output: {e}") from e
def _extract_components_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract component execution results from agent messages."""
components = []
# Group messages by agent
agent_tasks = {}
agent_responses = {}
for msg in messages:
if msg["message_type"] == "task":
agent_tasks[msg["to_agent"]] = msg
elif msg["message_type"] == "response":
agent_responses[msg["from_agent"]] = msg
# Build components from responses
for agent_name, response_msg in agent_responses.items():
content = response_msg.get("content", {})
component = {
"tool_name": agent_name,
"tool": agent_name,
"status": content.get("status", "unknown"),
"result": content,
"confidence": content.get("confidence", 0.0),
"executor": "agentic",
"message_id": response_msg.get("message_id"),
"timestamp": response_msg.get("timestamp")
}
components.append(component)
return components
def validate_legacy_compatibility(normalized_output: Dict[str, Any]) -> bool:
"""
Validate that normalized output has all required legacy fields.
Returns:
True if compatible, False otherwise
"""
required_fields = ["status", "pipeline_id", "pipeline_name", "components_executed", "summary"]
for field in required_fields:
if field not in normalized_output:
logger.error(f"Missing required field: {field}")
return False
if not isinstance(normalized_output["components_executed"], list):
logger.error("components_executed must be a list")
return False
return True