File size: 4,562 Bytes
6df13ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# services/output_normalizer.py
"""
Output Normalizer - Ensures agentic output matches legacy pipeline schema exactly.

This guarantees downstream consumers are unaware of agentic vs legacy execution.
"""
from typing import Dict, Any, List
import logging

logger = logging.getLogger("agentic.normalizer")


class NormalizationError(Exception):
    """Signal that an agentic result could not be reshaped into the legacy schema.

    ``normalize_agentic_output`` raises this, wrapping whatever underlying
    exception interrupted the conversion.
    """


def normalize_agentic_output(
    agentic_summary: Dict[str, Any],
    pipeline: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Convert an agentic execution summary into the legacy pipeline result shape.

    Args:
        agentic_summary: Execution summary dict. The keys read here are
            ``agent_messages``, ``rejections``, ``verification``,
            ``plan_versions`` and ``total_messages``; each falls back to an
            empty/zero default when absent.
        pipeline: Pipeline configuration; ``pipeline_id`` and
            ``pipeline_name`` are copied into the result (``None`` if absent).

    Returns:
        Dict with the keys ``status``, ``pipeline_id``, ``pipeline_name``,
        ``executor``, ``components_executed``, ``summary`` and
        ``agentic_metadata``. ``status`` is ``"failed"`` when the
        verification verdict is ``"FAIL"``, ``"completed_with_rejections"``
        when any rejections were recorded, and ``"completed"`` otherwise.

    Raises:
        NormalizationError: If any step of the conversion raises; the
            original exception is chained as the cause.
    """
    try:
        # Component records are derived from the agents' messages.
        executed = _extract_components_from_messages(
            agentic_summary.get("agent_messages", [])
        )

        rejected = agentic_summary.get("rejections", [])
        verdicts = agentic_summary.get("verification", {})

        # A failed verification outranks rejections when picking the status.
        if verdicts.get("overall_verdict") == "FAIL":
            outcome = "failed"
        else:
            outcome = "completed_with_rejections" if rejected else "completed"

        pipeline_id = pipeline.get("pipeline_id")
        pipeline_name = pipeline.get("pipeline_name")
        versions = agentic_summary.get("plan_versions", [])

        # Counts-only view.
        summary_section = {
            "total_tools_called": len(executed),
            "tools": [entry["tool_name"] for entry in executed],
            "plan_versions": len(versions),
            "rejections": len(rejected)
        }

        # Full agentic detail, kept under its own key.
        metadata_section = {
            "plan_versions": versions,
            "rejections": rejected,
            "total_messages": agentic_summary.get("total_messages", 0),
            "verification": verdicts
        }

        result = {
            "status": outcome,
            "pipeline_id": pipeline_id,
            "pipeline_name": pipeline_name,
            "executor": "agentic_orchestration",
            "components_executed": executed,
            "summary": summary_section,
            "agentic_metadata": metadata_section
        }

        logger.info(f"Normalized agentic output: {len(executed)} components, status={outcome}")

        return result

    except Exception as exc:
        logger.error(f"Normalization failed: {exc}")
        raise NormalizationError(f"Cannot normalize agentic output: {exc}") from exc


def _extract_components_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Extract component execution results from agent messages."""
    components = []
    
    # Group messages by agent
    agent_tasks = {}
    agent_responses = {}
    
    for msg in messages:
        if msg["message_type"] == "task":
            agent_tasks[msg["to_agent"]] = msg
        elif msg["message_type"] == "response":
            agent_responses[msg["from_agent"]] = msg
    
    # Build components from responses
    for agent_name, response_msg in agent_responses.items():
        content = response_msg.get("content", {})
        
        component = {
            "tool_name": agent_name,
            "tool": agent_name,
            "status": content.get("status", "unknown"),
            "result": content,
            "confidence": content.get("confidence", 0.0),
            "executor": "agentic",
            "message_id": response_msg.get("message_id"),
            "timestamp": response_msg.get("timestamp")
        }
        
        components.append(component)
    
    return components


def validate_legacy_compatibility(normalized_output: Dict[str, Any]) -> bool:
    """
    Check that a normalized result carries the fields legacy consumers need.

    The keys ``status``, ``pipeline_id``, ``pipeline_name``,
    ``components_executed`` and ``summary`` must all be present, and
    ``components_executed`` must be a list. The first problem found is
    logged at error level.

    Returns:
        True if every check passes, False otherwise
    """
    mandatory_keys = ("status", "pipeline_id", "pipeline_name", "components_executed", "summary")

    # Report only the first absent key, in declaration order.
    absent = next((key for key in mandatory_keys if key not in normalized_output), None)
    if absent is not None:
        logger.error(f"Missing required field: {absent}")
        return False

    if isinstance(normalized_output["components_executed"], list):
        return True

    logger.error("components_executed must be a list")
    return False