# services/output_normalizer.py
"""
Output Normalizer - Ensures agentic output matches legacy pipeline schema exactly.
This guarantees downstream consumers are unaware of agentic vs legacy execution.
"""

from typing import Dict, Any, List
import logging

logger = logging.getLogger("agentic.normalizer")

# Top-level keys that validate_legacy_compatibility() requires in a
# normalized result.
_REQUIRED_LEGACY_FIELDS = (
    "status",
    "pipeline_id",
    "pipeline_name",
    "components_executed",
    "summary",
)


class NormalizationError(Exception):
    """Raised when agentic output cannot be normalized to legacy schema."""


def normalize_agentic_output(
    agentic_summary: Dict[str, Any],
    pipeline: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Normalize agentic execution summary to legacy pipeline output format.

    The resulting ``status`` is ``"failed"`` when the verification verdict is
    ``"FAIL"``, ``"completed_with_rejections"`` when any rejection was
    recorded, and ``"completed"`` otherwise.

    Optional summary fields (``agent_messages``, ``rejections``,
    ``verification``, ``plan_versions``) that are missing *or* ``None`` are
    treated as empty rather than aborting the normalization.

    Args:
        agentic_summary: Output from MasterOrchestratorAgent.get_execution_summary()
        pipeline: Original pipeline configuration

    Returns:
        Dict matching legacy execute_pipeline_streaming output format

    Raises:
        NormalizationError: If normalization fails (triggers fallback)
    """
    try:
        # ``or`` (instead of a .get() default) also covers keys that are
        # present but explicitly set to None.
        components_executed = _extract_components_from_messages(
            agentic_summary.get("agent_messages") or []
        )
        rejections = agentic_summary.get("rejections") or []
        verification = agentic_summary.get("verification") or {}
        plan_versions = agentic_summary.get("plan_versions") or []

        # A failed verification outranks rejections when deriving the status.
        if verification.get("overall_verdict") == "FAIL":
            status = "failed"
        elif rejections:
            status = "completed_with_rejections"
        else:
            status = "completed"

        # Build legacy-compatible output
        normalized = {
            "status": status,
            "pipeline_id": pipeline.get("pipeline_id"),
            "pipeline_name": pipeline.get("pipeline_name"),
            "executor": "agentic_orchestration",
            "components_executed": components_executed,
            "summary": {
                "total_tools_called": len(components_executed),
                "tools": [c["tool_name"] for c in components_executed],
                "plan_versions": len(plan_versions),
                "rejections": len(rejections),
            },
            "agentic_metadata": {
                "plan_versions": plan_versions,
                "rejections": rejections,
                "total_messages": agentic_summary.get("total_messages", 0),
                "verification": verification,
            },
        }

        logger.info(
            "Normalized agentic output: %d components, status=%s",
            len(components_executed),
            status,
        )
        return normalized
    except Exception as e:
        # Any failure is surfaced as NormalizationError so callers have a
        # single exception type to catch; the original error stays chained.
        logger.error("Normalization failed: %s", e)
        raise NormalizationError(f"Cannot normalize agentic output: {e}") from e


def _extract_components_from_messages(
    messages: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Extract component execution results from agent messages.

    Only messages whose ``message_type`` is ``"response"`` produce a
    component. Responses are keyed by ``from_agent``: if an agent responded
    more than once, only its last response is kept, placed at the position
    of that agent's first response.

    Args:
        messages: Agent message dicts; each must carry ``message_type`` and
            response messages must carry ``from_agent``.

    Returns:
        One component dict per responding agent.

    Raises:
        KeyError: If a message lacks ``message_type``, or a response lacks
            ``from_agent``.
    """
    latest_response_by_agent: Dict[str, Dict[str, Any]] = {}
    for msg in messages:
        if msg["message_type"] == "response":
            latest_response_by_agent[msg["from_agent"]] = msg

    components = []
    for agent_name, response_msg in latest_response_by_agent.items():
        # A response whose content is None is treated like one without content.
        content = response_msg.get("content") or {}
        components.append({
            "tool_name": agent_name,
            "tool": agent_name,
            "status": content.get("status", "unknown"),
            "result": content,
            "confidence": content.get("confidence", 0.0),
            "executor": "agentic",
            "message_id": response_msg.get("message_id"),
            "timestamp": response_msg.get("timestamp"),
        })
    return components


def validate_legacy_compatibility(normalized_output: Dict[str, Any]) -> bool:
    """
    Validate that normalized output has all required legacy fields.

    Checks that every required top-level key is present and that
    ``components_executed`` is a list. The first problem found is logged
    at error level.

    Args:
        normalized_output: Result dict to check.

    Returns:
        True if compatible, False otherwise
    """
    for field in _REQUIRED_LEGACY_FIELDS:
        if field not in normalized_output:
            logger.error("Missing required field: %s", field)
            return False

    if not isinstance(normalized_output["components_executed"], list):
        logger.error("components_executed must be a list")
        return False

    return True