Spaces:
Sleeping
Sleeping
File size: 4,562 Bytes
6df13ef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# services/output_normalizer.py
"""
Output Normalizer - Ensures agentic output matches legacy pipeline schema exactly.
This guarantees downstream consumers are unaware of agentic vs legacy execution.
"""
from typing import Dict, Any, List
import logging
logger = logging.getLogger("agentic.normalizer")
class NormalizationError(Exception):
    """Signal that an agentic result could not be reshaped into the legacy schema.

    Raised by ``normalize_agentic_output`` whenever any step of the
    normalization fails; the original exception is chained as the cause.
    """
def normalize_agentic_output(
    agentic_summary: Dict[str, Any],
    pipeline: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Convert an agentic execution summary into the legacy pipeline output shape.

    The resulting status is ``"failed"`` when the summary's verification
    reports an ``overall_verdict`` of ``"FAIL"``, ``"completed_with_rejections"``
    when the rejection list is non-empty, and ``"completed"`` otherwise.

    Args:
        agentic_summary: Output from MasterOrchestratorAgent.get_execution_summary().
            The keys read here are ``agent_messages``, ``rejections``,
            ``verification``, ``plan_versions`` and ``total_messages``; each
            one is optional.
        pipeline: Original pipeline configuration; ``pipeline_id`` and
            ``pipeline_name`` are copied from it.

    Returns:
        Dict matching legacy execute_pipeline_streaming output format, with an
        extra ``agentic_metadata`` section carrying the raw agentic details.

    Raises:
        NormalizationError: If any exception occurs while building the output
            (triggers fallback). The original exception is chained as the cause.
    """
    try:
        # Per-agent execution records derived from the message log.
        components_executed = _extract_components_from_messages(
            agentic_summary.get("agent_messages", [])
        )

        rejected = agentic_summary.get("rejections", [])
        verification_report = agentic_summary.get("verification", {})

        # A failed verification outranks rejections when choosing the status.
        status = (
            "failed"
            if verification_report.get("overall_verdict") == "FAIL"
            else "completed_with_rejections"
            if rejected
            else "completed"
        )

        # Fallible lookups stay in the same order as the fields they fill.
        pipeline_id = pipeline.get("pipeline_id")
        pipeline_name = pipeline.get("pipeline_name")
        plan_history = agentic_summary.get("plan_versions", [])

        legacy_summary = {
            "total_tools_called": len(components_executed),
            "tools": [entry["tool_name"] for entry in components_executed],
            "plan_versions": len(plan_history),
            "rejections": len(rejected),
        }
        agentic_details = {
            "plan_versions": plan_history,
            "rejections": rejected,
            "total_messages": agentic_summary.get("total_messages", 0),
            "verification": verification_report,
        }

        logger.info(f"Normalized agentic output: {len(components_executed)} components, status={status}")
        return {
            "status": status,
            "pipeline_id": pipeline_id,
            "pipeline_name": pipeline_name,
            "executor": "agentic_orchestration",
            "components_executed": components_executed,
            "summary": legacy_summary,
            "agentic_metadata": agentic_details,
        }
    except Exception as e:
        # Any failure is surfaced as a single error type for the caller.
        logger.error(f"Normalization failed: {e}")
        raise NormalizationError(f"Cannot normalize agentic output: {e}") from e
def _extract_components_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract component execution results from agent messages."""
components = []
# Group messages by agent
agent_tasks = {}
agent_responses = {}
for msg in messages:
if msg["message_type"] == "task":
agent_tasks[msg["to_agent"]] = msg
elif msg["message_type"] == "response":
agent_responses[msg["from_agent"]] = msg
# Build components from responses
for agent_name, response_msg in agent_responses.items():
content = response_msg.get("content", {})
component = {
"tool_name": agent_name,
"tool": agent_name,
"status": content.get("status", "unknown"),
"result": content,
"confidence": content.get("confidence", 0.0),
"executor": "agentic",
"message_id": response_msg.get("message_id"),
"timestamp": response_msg.get("timestamp")
}
components.append(component)
return components
def validate_legacy_compatibility(normalized_output: Dict[str, Any]) -> bool:
    """
    Check that a normalized output carries every field the legacy format requires.

    The required fields are ``status``, ``pipeline_id``, ``pipeline_name``,
    ``components_executed`` and ``summary``; ``components_executed`` must
    additionally be a list. The first problem found is logged as an error.

    Returns:
        True if compatible, False otherwise
    """
    mandatory = ("status", "pipeline_id", "pipeline_name", "components_executed", "summary")

    # Only the first missing field (in declaration order) is reported.
    field = next((name for name in mandatory if name not in normalized_output), None)
    if field is not None:
        logger.error(f"Missing required field: {field}")
        return False

    if isinstance(normalized_output["components_executed"], list):
        return True

    logger.error("components_executed must be a list")
    return False
|