File size: 10,042 Bytes
5d54760
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""
ARF Adapter Pattern for clean integration with real or mock ARF
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
import asyncio
import logging
from config.settings import settings, ARFMode

logger = logging.getLogger(__name__)


class ARFAdapter(ABC):
    """Common interface implemented by every ARF backend (real or mock).

    All four operations are abstract coroutines, so this class cannot be
    instantiated directly; concrete adapters must override each of them.
    """

    @abstractmethod
    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Run anomaly detection over ``metrics`` and return the result."""

    @abstractmethod
    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return incidents from memory that are similar to ``incident``."""

    @abstractmethod
    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Produce a healing intent for ``incident`` given ``context``."""

    @abstractmethod
    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the complete analysis pipeline for one named scenario."""


class MockARFAdapter(ARFAdapter):
    """Mock ARF implementation for demo mode.

    Every operation delegates to helper functions from ``demo.mock_arf``.
    Those helpers are imported lazily on first use (see
    ``_import_mock_functions``) to avoid circular dependencies.
    """

    def __init__(self):
        logger.info("Initializing MockARFAdapter")
        # Lazy imports to avoid circular dependencies: the slots stay None
        # until the first call to _import_mock_functions().
        self._simulate_arf_analysis = None
        self._run_rag_similarity_search = None
        self._create_mock_healing_intent = None
        self._calculate_pattern_confidence = None

    def _import_mock_functions(self):
        """Import the ``demo.mock_arf`` helpers once and cache them on self."""
        # _simulate_arf_analysis doubles as the "already imported" flag.
        if self._simulate_arf_analysis is None:
            from demo.mock_arf import (
                simulate_arf_analysis,
                run_rag_similarity_search,
                create_mock_healing_intent,
                calculate_pattern_confidence
            )
            self._simulate_arf_analysis = simulate_arf_analysis
            self._run_rag_similarity_search = run_rag_similarity_search
            self._create_mock_healing_intent = create_mock_healing_intent
            self._calculate_pattern_confidence = calculate_pattern_confidence

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Mock anomaly detection.

        Args:
            metrics: Metrics payload; passed to the mock analysis helper
                wrapped as ``{"metrics": metrics}``.

        Returns:
            The helper's result with ``detection_method`` and ``confidence``
            overwritten by fixed demo values.
        """
        self._import_mock_functions()

        # Simulate processing time
        await asyncio.sleep(0.1)

        result = self._simulate_arf_analysis({"metrics": metrics})
        result["detection_method"] = "mock_ml_algorithm"
        result["confidence"] = 0.987  # 98.7%

        return result

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Mock RAG similarity search.

        Args:
            incident: Incident description handed to the mock search helper.

        Returns:
            The helper's list of similar incidents; each item is tagged in
            place with ``source`` and ``retrieval_time`` metadata.
        """
        self._import_mock_functions()

        # Simulate processing time
        await asyncio.sleep(0.2)

        similar = self._run_rag_similarity_search(incident)

        # Enhance with additional metadata
        for item in similar:
            item["source"] = "mock_rag_memory"
            item["retrieval_time"] = "45ms"

        return similar

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Mock decision making.

        Args:
            incident: Incident to generate a healing intent for.
            context: May carry pre-computed ``"similar_incidents"``; when
                that key is missing or empty, ``recall`` is run instead.

        Returns:
            The healing intent produced by the mock helper, extended with a
            fixed ``safety_checks`` section.
        """
        self._import_mock_functions()

        # Get similar incidents from context or recall them
        similar = context.get("similar_incidents")
        if not similar:
            similar = await self.recall(incident)

        # Calculate confidence
        confidence = self._calculate_pattern_confidence(incident, similar)

        # Generate healing intent
        intent = self._create_mock_healing_intent(incident, similar, confidence)

        # Add safety check
        intent["safety_checks"] = {
            "blast_radius": "2 services",
            "business_hours": "compliant",
            "rollback_plan": "available",
            "approval_required": True
        }

        return intent

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete mock analysis pipeline: detect, recall, then decide.

        Args:
            scenario_name: Label echoed back under ``"scenario"``.
            scenario_data: Scenario payload; its ``"metrics"`` entry (default
                ``{}``) feeds detection, the whole dict feeds recall/decide.

        Returns:
            Dict with the three stage results, ``overall_confidence`` (the
            decision's ``"confidence"`` or 0.85 if absent), a fixed
            ``processing_time_ms`` and the list of executed agents.
        """
        logger.info("Starting mock analysis for: %s", scenario_name)

        # Step 1: Detection
        detection_result = await self.detect(scenario_data.get("metrics", {}))

        # Step 2: Recall
        recall_result = await self.recall(scenario_data)

        # Step 3: Decision. The method is named `decide`; the recall output
        # is passed along so decide() does not have to search again.
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result}
        )

        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            "processing_time_ms": 450,
            "agents_executed": ["detection", "recall", "decision"]
        }


class RealARFAdapter(ARFAdapter):
    """Real ARF integration (requires agentic-reliability-framework package).

    Each operation first tries the client's ``*_async`` method and falls
    back to the synchronous counterpart when an ``AttributeError`` is raised.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the underlying ``ARFClient``.

        Args:
            api_key: Explicit API key; when falsy, ``settings.arf_api_key``
                is used instead.

        Raises:
            RuntimeError: If the ``agentic_reliability_framework`` package
                cannot be imported (chained from the ``ImportError``).
        """
        logger.info("Initializing RealARFAdapter")

        try:
            from agentic_reliability_framework import ARFClient
            self.client = ARFClient(api_key=api_key or settings.arf_api_key)
            self._available = True
        except ImportError as e:
            logger.error(f"Failed to import ARF package: {e}")
            self._available = False
            # Chain the original ImportError so the root cause stays visible
            # in the traceback.
            raise RuntimeError(
                "Real ARF integration requires 'agentic-reliability-framework' package. "
                "Install with: pip install agentic-reliability-framework"
            ) from e

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Real anomaly detection.

        Args:
            metrics: Metrics payload forwarded unchanged to the client.

        Returns:
            Whatever the client's detection call returns.

        Raises:
            RuntimeError: If the ARF client is not available.
        """
        if not self._available:
            raise RuntimeError("ARF client not available")

        # NOTE(review): the except also catches an AttributeError raised
        # inside the async call itself, not only a missing method - confirm
        # that this is the intended fallback trigger.
        try:
            # Assuming async API
            result = await self.client.detect_anomaly_async(metrics)
            return result
        except AttributeError:
            # Fallback to sync if async not available
            result = self.client.detect_anomaly(metrics)
            return result

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Real RAG similarity search.

        Args:
            incident: Incident payload forwarded unchanged to the client.

        Returns:
            Whatever the client's recall call returns.

        Raises:
            RuntimeError: If the ARF client is not available.
        """
        if not self._available:
            raise RuntimeError("ARF client not available")

        try:
            # Assuming async API
            result = await self.client.recall_similar_async(incident)
            return result
        except AttributeError:
            # Fallback to sync
            result = self.client.recall_similar(incident)
            return result

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Real decision making.

        Args:
            incident: Incident payload forwarded unchanged to the client.
            context: Extra context forwarded unchanged to the client.

        Returns:
            Whatever the client's intent-generation call returns.

        Raises:
            RuntimeError: If the ARF client is not available.
        """
        if not self._available:
            raise RuntimeError("ARF client not available")

        try:
            # Assuming async API
            result = await self.client.generate_intent_async(incident, context)
            return result
        except AttributeError:
            # Fallback to sync
            result = self.client.generate_intent(incident, context)
            return result

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete real analysis pipeline.

        Detection and recall are started as concurrent tasks; the decision
        step runs afterwards because it consumes the recall output.

        Args:
            scenario_name: Label echoed back under ``"scenario"``.
            scenario_data: Scenario payload; its ``"metrics"`` entry (default
                ``{}``) feeds detection, the whole dict feeds recall/decide.

        Returns:
            Dict with the three stage results, ``overall_confidence`` (the
            decision's ``"confidence"`` or 0.85 if absent), a fixed
            ``processing_time_ms`` and the list of executed agents.
        """
        logger.info("Starting real analysis for: %s", scenario_name)

        # Run agents in parallel where possible
        detection_task = asyncio.create_task(self.detect(scenario_data.get("metrics", {})))
        recall_task = asyncio.create_task(self.recall(scenario_data))

        detection_result, recall_result = await asyncio.gather(detection_task, recall_task)

        # Decision depends on recall results. The method is named `decide`.
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result}
        )

        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            "processing_time_ms": 250,  # Real system should be faster
            "agents_executed": ["detection", "recall", "decision"]
        }


def get_arf_adapter() -> ARFAdapter:
    """
    Build the ARF adapter selected by the application settings.

    The mock adapter is chosen when the mode is DEMO or ``use_mock_arf`` is
    set, and also as the fallback for an unrecognised mode. OSS and
    ENTERPRISE modes both yield the real adapter.

    Returns:
        ARFAdapter instance
    """
    mode = settings.arf_mode

    # Demo mode or an explicit mock override wins over everything else.
    if mode == ARFMode.DEMO or settings.use_mock_arf:
        logger.info("Using MockARFAdapter (demo mode)")
        return MockARFAdapter()

    if mode == ARFMode.OSS:
        logger.info("Using RealARFAdapter (OSS mode)")
        return RealARFAdapter()

    if mode == ARFMode.ENTERPRISE:
        logger.info("Using RealARFAdapter (Enterprise mode)")
        return RealARFAdapter()

    # Anything else is treated as a misconfiguration and degrades to mock.
    logger.warning("Unknown ARF mode, falling back to mock")
    return MockARFAdapter()


# Async helper for easy integration
async def analyze_scenario_async(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze one scenario with the adapter chosen by ``get_arf_adapter``.

    A new adapter is obtained on every call and its ``analyze`` coroutine is
    awaited with the given name and data.
    """
    return await get_arf_adapter().analyze(scenario_name, scenario_data)


# Sync wrapper for compatibility
def analyze_scenario_sync(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Sync wrapper for scenario analysis.

    Args:
        scenario_name: Scenario label passed to ``analyze_scenario_async``.
        scenario_data: Scenario payload passed to ``analyze_scenario_async``.

    Returns:
        The analysis result dict when no event loop is running in the
        calling thread. When called from inside a running event loop the
        analysis cannot be awaited synchronously, so an ``asyncio.Task``
        wrapping it is scheduled and returned instead; the caller must
        await that task to obtain the result.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread. asyncio.run() creates a fresh
        # loop, runs the coroutine to completion and closes the loop, without
        # leaving a closed loop registered as the current event loop.
        return asyncio.run(analyze_scenario_async(scenario_name, scenario_data))

    # Already in async context, create task
    return asyncio.create_task(analyze_scenario_async(scenario_name, scenario_data))