Spaces:
Sleeping
Sleeping
| # services/output_normalizer.py | |
| """ | |
| Output Normalizer - Ensures agentic output matches legacy pipeline schema exactly. | |
| This guarantees downstream consumers are unaware of agentic vs legacy execution. | |
| """ | |
| from typing import Dict, Any, List | |
| import logging | |
| logger = logging.getLogger("agentic.normalizer") | |
class NormalizationError(Exception):
    """Signals that an agentic result could not be mapped onto the legacy schema."""
def normalize_agentic_output(
    agentic_summary: Dict[str, Any],
    pipeline: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Reshape an agentic execution summary into the legacy pipeline result format.

    The resulting status is "failed" when the verification verdict is "FAIL",
    "completed_with_rejections" when at least one rejection was recorded, and
    "completed" otherwise.

    Args:
        agentic_summary: Output from MasterOrchestratorAgent.get_execution_summary().
            The keys read here are "agent_messages", "rejections",
            "verification", "plan_versions" and "total_messages"; each one
            falls back to an empty default when absent.
        pipeline: Original pipeline configuration; "pipeline_id" and
            "pipeline_name" are copied from it.

    Returns:
        Dict matching legacy execute_pipeline_streaming output format, plus an
        "agentic_metadata" entry carrying the raw agentic details.

    Raises:
        NormalizationError: If anything goes wrong while building the result
            (triggers fallback). The original exception is chained as the cause.
    """
    try:
        # One component per responding agent, derived from the message log.
        components_executed = _extract_components_from_messages(
            agentic_summary.get("agent_messages", [])
        )

        rejections = agentic_summary.get("rejections", [])
        verification = agentic_summary.get("verification", {})

        # A failed verification outranks rejections when choosing the status.
        verdict_failed = verification.get("overall_verdict") == "FAIL"
        status = (
            "failed"
            if verdict_failed
            else ("completed_with_rejections" if rejections else "completed")
        )

        pipeline_id = pipeline.get("pipeline_id")
        pipeline_name = pipeline.get("pipeline_name")
        plan_versions = agentic_summary.get("plan_versions", [])

        # Condensed counters in the shape legacy consumers read.
        summary_section = {
            "total_tools_called": len(components_executed),
            "tools": [component["tool_name"] for component in components_executed],
            "plan_versions": len(plan_versions),
            "rejections": len(rejections)
        }

        # Full agentic detail, kept under its own key.
        metadata_section = {
            "plan_versions": plan_versions,
            "rejections": rejections,
            "total_messages": agentic_summary.get("total_messages", 0),
            "verification": verification
        }

        logger.info(f"Normalized agentic output: {len(components_executed)} components, status={status}")
        return {
            "status": status,
            "pipeline_id": pipeline_id,
            "pipeline_name": pipeline_name,
            "executor": "agentic_orchestration",
            "components_executed": components_executed,
            "summary": summary_section,
            "agentic_metadata": metadata_section
        }
    except Exception as e:
        logger.error(f"Normalization failed: {e}")
        raise NormalizationError(f"Cannot normalize agentic output: {e}") from e
| def _extract_components_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| """Extract component execution results from agent messages.""" | |
| components = [] | |
| # Group messages by agent | |
| agent_tasks = {} | |
| agent_responses = {} | |
| for msg in messages: | |
| if msg["message_type"] == "task": | |
| agent_tasks[msg["to_agent"]] = msg | |
| elif msg["message_type"] == "response": | |
| agent_responses[msg["from_agent"]] = msg | |
| # Build components from responses | |
| for agent_name, response_msg in agent_responses.items(): | |
| content = response_msg.get("content", {}) | |
| component = { | |
| "tool_name": agent_name, | |
| "tool": agent_name, | |
| "status": content.get("status", "unknown"), | |
| "result": content, | |
| "confidence": content.get("confidence", 0.0), | |
| "executor": "agentic", | |
| "message_id": response_msg.get("message_id"), | |
| "timestamp": response_msg.get("timestamp") | |
| } | |
| components.append(component) | |
| return components | |
def validate_legacy_compatibility(normalized_output: Dict[str, Any]) -> bool:
    """
    Check that a normalized result carries every field legacy consumers require.

    The required keys are "status", "pipeline_id", "pipeline_name",
    "components_executed" and "summary"; in addition "components_executed"
    must be a list. The first problem found is logged as an error.

    Returns:
        True if compatible, False otherwise
    """
    required_fields = ("status", "pipeline_id", "pipeline_name", "components_executed", "summary")

    # Only the first absent key is reported, in the order listed above.
    field = next((name for name in required_fields if name not in normalized_output), None)
    if field is not None:
        logger.error(f"Missing required field: {field}")
        return False

    if isinstance(normalized_output["components_executed"], list):
        return True

    logger.error("components_executed must be a list")
    return False