File size: 6,455 Bytes
6df13ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# services/agentic_orchestrator_wrapper.py
"""
Agentic Orchestrator Wrapper - COMPLETE ISOLATION from pipeline_executor.

This module provides the ONLY interface for agentic orchestration.
NO internal agent details are exposed.
"""
import os
import logging
from typing import Dict, Any, Generator, Optional

from services.output_normalizer import normalize_agentic_output, NormalizationError
from services.agentic_integration_logger import (
    log_agentic_execution,
    log_fallback_trigger
)

# Module-level logger registered under the fixed name "agentic.wrapper"
# (an explicit name, not derived from __name__).
logger = logging.getLogger("agentic.wrapper")


def execute_with_agentic_orchestration(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline using agentic orchestration - ISOLATED WRAPPER.

    This generator:
    1. Instantiates MasterOrchestratorAgent and creates an initial plan
    2. Delegates one task per pipeline component to the agent named by the
       component's ``tool_name`` (or ``tool``) entry
    3. Evaluates each response; a rejected response is reported to the
       master, the plan is modified, and "rejection" / "replan" events
       are emitted
    4. Normalizes the execution summary to the legacy format and validates it
    5. Logs and re-raises any ``Exception`` so the caller can fall back

    Args:
        pipeline: Pipeline configuration. Components are read from
            ``"components"``; when that entry is missing or empty,
            ``"pipeline_steps"`` is used instead.
        file_path: Path to file being processed
        session_id: Optional session identifier (logged as "unknown" when
            not provided)

    Yields:
        Event dicts tagged with ``"executor": "agentic"`` whose ``"type"`` is
        one of "status", "step", "rejection", "replan" or "final".

    Raises:
        ValueError: If the pipeline contains no components.
        NormalizationError: If the normalized output fails validation.
        Exception: Any other error raised during orchestration. Every
            ``Exception`` is first recorded via ``log_fallback_trigger`` and
            then re-raised unchanged; this function does not perform the
            fallback itself.
    """
    try:
        # Import here to avoid circular dependencies and keep isolation
        from services.agents.master_orchestrator import MasterOrchestratorAgent

        logger.info("Initializing agentic orchestration for session %s", session_id)

        yield {
            "type": "status",
            "message": "Initializing agentic orchestration...",
            "executor": "agentic"
        }

        master = MasterOrchestratorAgent()

        # PHASE 1: Create initial plan
        plan_description = f"Execute pipeline: {pipeline.get('pipeline_name', 'unnamed')}"
        context = {
            "pipeline": pipeline,
            "file_path": file_path,
            "session_id": session_id
        }

        # Track the plan currently in force so that every replan event
        # reports the version it actually replaced, not always the first one.
        current_plan = master.create_plan(plan_description, context)

        yield {
            "type": "status",
            "message": f"Plan v{current_plan['version']} created",
            "executor": "agentic"
        }

        # PHASE 2: Delegate to agents based on pipeline components.
        # `or` (rather than a nested .get default) also falls back to
        # "pipeline_steps" when "components" is present but empty/None.
        components = pipeline.get("components") or pipeline.get("pipeline_steps") or []

        if not components:
            raise ValueError("No components found in pipeline")

        for idx, component in enumerate(components, 1):
            tool_name = component.get("tool_name", component.get("tool", "unknown"))

            yield {
                "type": "step",
                "step": idx,
                "tool": tool_name,
                "status": "executing",
                "executor": "agentic"
            }

            # Computed once per step, after the "executing" event has been
            # emitted, and reused for the task input and description.
            base_name = os.path.basename(file_path)

            task_input = {
                "filename": base_name,
                "temp_files": {base_name: file_path},
                "start_page": component.get("start_page", 1),
                "end_page": component.get("end_page", 1)
            }

            # Delegate to the agent named after the tool
            response = master.delegate_task(
                agent_name=tool_name,
                task_description=f"Execute {tool_name} on {base_name}",
                task_input=task_input
            )

            evaluation = master.evaluate_response(response)
            accepted = evaluation["accepted"]

            yield {
                "type": "step",
                "step": idx,
                "tool": tool_name,
                "status": "completed" if accepted else "rejected",
                "confidence": evaluation["confidence"],
                "executor": "agentic"
            }

            if accepted:
                continue

            # Rejection path: tell the master, surface the rejection, then
            # replan. Execution continues with the next component.
            reason = evaluation["reason"]

            master.reject_output(
                agent_name=tool_name,
                message_id=response.message_id,
                reason=reason
            )

            yield {
                "type": "rejection",
                "agent": tool_name,
                "reason": reason,
                "executor": "agentic"
            }

            # Capture the outgoing version before asking for a new plan.
            previous_version = current_plan["version"]
            current_plan = master.modify_plan(
                description=f"Adjusted plan after {tool_name} rejection",
                reason=reason,
                modifications=[f"Skip or retry {tool_name}"]
            )

            yield {
                "type": "replan",
                "from_version": previous_version,
                "to_version": current_plan["version"],
                "reason": reason,
                "executor": "agentic"
            }

        # Get execution summary
        summary = master.get_execution_summary()

        # Normalize to legacy format
        normalized = normalize_agentic_output(summary, pipeline)

        # Validate compatibility (imported at point of use, so an import
        # failure surfaces only after all step events have been emitted)
        from services.output_normalizer import validate_legacy_compatibility
        if not validate_legacy_compatibility(normalized):
            raise NormalizationError("Output validation failed")

        log_agentic_execution(
            session_id=session_id or "unknown",
            pipeline=pipeline,
            agentic_summary=summary,
            result="success"
        )

        yield {
            "type": "final",
            "data": normalized,
            "executor": "agentic"
        }

        logger.info(
            "Agentic orchestration completed successfully for session %s",
            session_id
        )

    except Exception as e:
        # Record why the agentic path failed, then re-raise unchanged so the
        # caller can trigger its fallback.
        log_fallback_trigger(
            session_id=session_id or "unknown",
            reason="Agentic execution failed",
            exception=e
        )

        raise