| """ |
| ARF Adapter Pattern for clean integration with real or mock ARF |
| """ |
| from abc import ABC, abstractmethod |
| from typing import Dict, Any, List, Optional |
| import asyncio |
| import logging |
| from config.settings import settings, ARFMode |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class ARFAdapter(ABC):
    """Async interface that every ARF backend adapter must implement.

    The class cannot be instantiated directly; subclasses have to override
    all four coroutines declared here.
    """

    @abstractmethod
    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Detect anomalies in ``metrics`` and return the detection result."""

    @abstractmethod
    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return incidents from memory that are similar to ``incident``."""

    @abstractmethod
    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Produce a healing intent for ``incident`` given ``context``."""

    @abstractmethod
    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the complete analysis pipeline for the named scenario."""
|
|
|
|
class MockARFAdapter(ARFAdapter):
    """Mock ARF implementation for demo mode.

    Every stage delegates to a helper from ``demo.mock_arf`` (imported lazily
    on first use) and then decorates the helper's output with fixed demo
    metadata.
    """

    def __init__(self):
        logger.info("Initializing MockARFAdapter")

        # Filled in by _import_mock_functions() the first time a stage runs.
        self._simulate_arf_analysis = None
        self._run_rag_similarity_search = None
        self._create_mock_healing_intent = None
        self._calculate_pattern_confidence = None

    def _import_mock_functions(self):
        """Import the ``demo.mock_arf`` helpers once and cache them on self."""
        if self._simulate_arf_analysis is not None:
            return

        from demo.mock_arf import (
            simulate_arf_analysis,
            run_rag_similarity_search,
            create_mock_healing_intent,
            calculate_pattern_confidence,
        )

        self._simulate_arf_analysis = simulate_arf_analysis
        self._run_rag_similarity_search = run_rag_similarity_search
        self._create_mock_healing_intent = create_mock_healing_intent
        self._calculate_pattern_confidence = calculate_pattern_confidence

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Mock anomaly detection.

        Args:
            metrics: Metrics payload, passed to the mock analysis helper
                wrapped as ``{"metrics": metrics}``.

        Returns:
            The helper's result with ``detection_method`` and ``confidence``
            overwritten by fixed demo values.
        """
        self._import_mock_functions()

        # Short pause; presumably mimics processing latency for the demo.
        await asyncio.sleep(0.1)

        result = self._simulate_arf_analysis({"metrics": metrics})
        result["detection_method"] = "mock_ml_algorithm"
        result["confidence"] = 0.987

        return result

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Mock RAG similarity search.

        Args:
            incident: Incident data handed to the mock similarity search.

        Returns:
            The helper's matches, each tagged in place with a fixed ``source``
            and ``retrieval_time``.
        """
        self._import_mock_functions()

        # Short pause; presumably mimics retrieval latency for the demo.
        await asyncio.sleep(0.2)

        similar = self._run_rag_similarity_search(incident)

        for item in similar:
            item["source"] = "mock_rag_memory"
            item["retrieval_time"] = "45ms"

        return similar

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Mock decision making.

        Args:
            incident: Incident data to build a healing intent for.
            context: May carry ``similar_incidents``; when that key is missing
                or empty (or ``context`` itself is ``None``) the similar
                incidents are fetched via :meth:`recall`.

        Returns:
            The mock healing intent with a fixed ``safety_checks`` section
            attached.
        """
        self._import_mock_functions()

        similar = (context or {}).get("similar_incidents")
        if not similar:
            similar = await self.recall(incident)

        confidence = self._calculate_pattern_confidence(incident, similar)
        intent = self._create_mock_healing_intent(incident, similar, confidence)

        intent["safety_checks"] = {
            "blast_radius": "2 services",
            "business_hours": "compliant",
            "rollback_plan": "available",
            "approval_required": True,
        }

        return intent

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete mock analysis pipeline: detect, then recall, then decide.

        Args:
            scenario_name: Label echoed back in the result and the log line.
            scenario_data: Scenario payload; its ``metrics`` entry (default
                ``{}``) feeds detection, the whole dict feeds recall/decide.

        Returns:
            A summary dict holding each stage's output plus fixed
            ``processing_time_ms`` and ``agents_executed`` values.
        """
        logger.info("Starting mock analysis for: %s", scenario_name)

        detection_result = await self.detect(scenario_data.get("metrics", {}))
        recall_result = await self.recall(scenario_data)

        # The method is named `decide`; calling `self.decision` raised
        # AttributeError on every analysis run.
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result},
        )

        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            "processing_time_ms": 450,
            "agents_executed": ["detection", "recall", "decision"],
        }
|
|
|
|
class RealARFAdapter(ARFAdapter):
    """Real ARF integration (requires agentic-reliability-framework package).

    Each stage prefers the client's ``*_async`` method and falls back to the
    synchronous method of the same name when the async variant is absent.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the ARF client.

        Args:
            api_key: Key passed to ``ARFClient``; falls back to
                ``settings.arf_api_key`` when falsy.

        Raises:
            RuntimeError: If the ARF package cannot be imported.
        """
        logger.info("Initializing RealARFAdapter")

        try:
            from agentic_reliability_framework import ARFClient
            self.client = ARFClient(api_key=api_key or settings.arf_api_key)
            self._available = True
        except ImportError as e:
            logger.error(f"Failed to import ARF package: {e}")
            self._available = False
            # Chain the original error so the real import failure stays visible.
            raise RuntimeError(
                "Real ARF integration requires 'agentic-reliability-framework' package. "
                "Install with: pip install agentic-reliability-framework"
            ) from e

    async def _call_client(self, async_name: str, sync_name: str, *args: Any) -> Any:
        """Invoke a client method, preferring its async variant.

        The async method is looked up with ``getattr`` instead of wrapping the
        awaited call in ``except AttributeError``: that way an AttributeError
        raised *inside* the client call propagates instead of silently
        triggering a second, synchronous request.

        Raises:
            RuntimeError: If the client was not initialised successfully.
        """
        if not self._available:
            raise RuntimeError("ARF client not available")

        async_method = getattr(self.client, async_name, None)
        if async_method is not None:
            return await async_method(*args)

        # NOTE: the sync fallback runs inline and blocks the event loop while
        # the client call is in progress.
        return getattr(self.client, sync_name)(*args)

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Real anomaly detection via ``detect_anomaly_async`` / ``detect_anomaly``."""
        return await self._call_client("detect_anomaly_async", "detect_anomaly", metrics)

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Real RAG similarity search via ``recall_similar_async`` / ``recall_similar``."""
        return await self._call_client("recall_similar_async", "recall_similar", incident)

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Real decision making via ``generate_intent_async`` / ``generate_intent``."""
        return await self._call_client(
            "generate_intent_async", "generate_intent", incident, context
        )

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete real analysis pipeline.

        Detection and recall are run concurrently; the decision stage then
        receives the recalled incidents as ``similar_incidents``.

        Args:
            scenario_name: Label echoed back in the result and the log line.
            scenario_data: Scenario payload; its ``metrics`` entry (default
                ``{}``) feeds detection, the whole dict feeds recall/decide.

        Returns:
            A summary dict holding each stage's output. ``processing_time_ms``
            is a fixed value, not a measurement.
        """
        logger.info("Starting real analysis for: %s", scenario_name)

        detection_result, recall_result = await asyncio.gather(
            self.detect(scenario_data.get("metrics", {})),
            self.recall(scenario_data),
        )

        # The method is named `decide`; calling `self.decision` raised
        # AttributeError on every analysis run.
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result},
        )

        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            "processing_time_ms": 250,
            "agents_executed": ["detection", "recall", "decision"],
        }
|
|
|
|
def get_arf_adapter() -> ARFAdapter:
    """
    Build the ARF adapter selected by the application settings.

    Demo mode (or the ``use_mock_arf`` flag) yields the mock adapter, OSS and
    Enterprise modes yield the real adapter, and any other mode falls back to
    the mock adapter with a warning.

    Returns:
        ARFAdapter instance
    """
    mode = settings.arf_mode

    if mode == ARFMode.DEMO or settings.use_mock_arf:
        logger.info("Using MockARFAdapter (demo mode)")
        return MockARFAdapter()

    if mode == ARFMode.OSS:
        logger.info("Using RealARFAdapter (OSS mode)")
        return RealARFAdapter()

    if mode == ARFMode.ENTERPRISE:
        logger.info("Using RealARFAdapter (Enterprise mode)")
        return RealARFAdapter()

    logger.warning("Unknown ARF mode, falling back to mock")
    return MockARFAdapter()
|
|
|
|
| |
async def analyze_scenario_async(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze a scenario with the adapter chosen by ``get_arf_adapter()``."""
    return await get_arf_adapter().analyze(scenario_name, scenario_data)
|
|
|
|
| |
def analyze_scenario_sync(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Sync wrapper for scenario analysis.

    Args:
        scenario_name: Scenario label forwarded to the async analysis.
        scenario_data: Scenario payload forwarded to the async analysis.

    Returns:
        The analysis result dict when no event loop is running in the calling
        thread. When a loop IS already running, the analysis cannot be driven
        to completion synchronously, so it is scheduled on that loop and the
        resulting ``asyncio.Task`` is returned for the caller to await.
    """
    import asyncio

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread. asyncio.run() creates a fresh loop, cancels
        # leftover tasks, shuts down async generators and closes the loop,
        # instead of leaving a closed loop installed as the current one.
        return asyncio.run(analyze_scenario_async(scenario_name, scenario_data))

    # Already inside a running loop: schedule the work and hand back the Task.
    return asyncio.create_task(analyze_scenario_async(scenario_name, scenario_data))