"""
ARF Adapter Pattern for clean integration with real or mock ARF
"""

import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union

from config.settings import ARFMode, settings

logger = logging.getLogger(__name__)


class ARFAdapter(ABC):
    """Abstract adapter for ARF integration.

    Concrete adapters implement the three pipeline stages (``detect``,
    ``recall``, ``decide``) plus ``analyze``, which chains them together.
    """

    @abstractmethod
    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Detect anomalies in metrics."""

    @abstractmethod
    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Recall similar incidents from memory."""

    @abstractmethod
    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Generate healing intent."""

    @abstractmethod
    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the complete analysis pipeline."""


class MockARFAdapter(ARFAdapter):
    """Mock ARF implementation for demo mode.

    All real work is delegated to helper functions from ``demo.mock_arf``,
    which are imported on first use rather than at module import time.
    """

    def __init__(self) -> None:
        logger.info("Initializing MockARFAdapter")
        # Lazy imports to avoid circular dependencies: these stay None until
        # _import_mock_functions() binds them.
        self._simulate_arf_analysis = None
        self._run_rag_similarity_search = None
        self._create_mock_healing_intent = None
        self._calculate_pattern_confidence = None

    def _import_mock_functions(self) -> None:
        """Import the ``demo.mock_arf`` helpers once and cache them on self."""
        if self._simulate_arf_analysis is not None:
            return

        from demo.mock_arf import (
            simulate_arf_analysis,
            run_rag_similarity_search,
            create_mock_healing_intent,
            calculate_pattern_confidence,
        )

        self._simulate_arf_analysis = simulate_arf_analysis
        self._run_rag_similarity_search = run_rag_similarity_search
        self._create_mock_healing_intent = create_mock_healing_intent
        self._calculate_pattern_confidence = calculate_pattern_confidence

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Mock anomaly detection.

        Args:
            metrics: Metrics payload, passed to the mock analysis wrapped as
                ``{"metrics": metrics}``.

        Returns:
            The mock analysis result with ``detection_method`` and
            ``confidence`` overwritten by fixed demo values.
        """
        self._import_mock_functions()

        # Simulate processing time
        await asyncio.sleep(0.1)

        result = self._simulate_arf_analysis({"metrics": metrics})
        result["detection_method"] = "mock_ml_algorithm"
        result["confidence"] = 0.987  # 98.7%
        return result

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Mock RAG similarity search.

        Args:
            incident: Incident payload handed to the mock similarity search.

        Returns:
            The search results; each item is tagged in place with fixed
            ``source`` and ``retrieval_time`` metadata.
        """
        self._import_mock_functions()

        # Simulate processing time
        await asyncio.sleep(0.2)

        similar = self._run_rag_similarity_search(incident)

        # Enhance with additional metadata
        for item in similar:
            item["source"] = "mock_rag_memory"
            item["retrieval_time"] = "45ms"

        return similar

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Mock decision making.

        Args:
            incident: Incident payload.
            context: May carry pre-computed ``"similar_incidents"``; when the
                key is missing or its value is empty, ``recall`` is run.

        Returns:
            The mock healing intent with a fixed ``safety_checks`` section
            attached.
        """
        self._import_mock_functions()

        # Get similar incidents from context or recall them
        similar = context.get("similar_incidents")
        if not similar:
            similar = await self.recall(incident)

        # Calculate confidence
        confidence = self._calculate_pattern_confidence(incident, similar)

        # Generate healing intent
        intent = self._create_mock_healing_intent(incident, similar, confidence)

        # Add safety check
        intent["safety_checks"] = {
            "blast_radius": "2 services",
            "business_hours": "compliant",
            "rollback_plan": "available",
            "approval_required": True,
        }

        return intent

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete mock analysis pipeline: detect, then recall, then decide.

        Args:
            scenario_name: Label used for logging and echoed in the result.
            scenario_data: Scenario payload; its ``"metrics"`` entry (default
                ``{}``) feeds detection, the whole payload feeds recall and
                decision.

        Returns:
            A summary dict with the three stage results plus
            ``overall_confidence``, ``processing_time_ms`` and
            ``agents_executed``.
        """
        logger.info("Starting mock analysis for: %s", scenario_name)

        # Step 1: Detection
        detection_result = await self.detect(scenario_data.get("metrics", {}))

        # Step 2: Recall
        recall_result = await self.recall(scenario_data)

        # Step 3: Decision (the stage method is `decide`; there is no
        # `decision` attribute on the adapter)
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result},
        )

        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            # Hard-coded demo figure, not a measured duration
            "processing_time_ms": 450,
            "agents_executed": ["detection", "recall", "decision"],
        }


class RealARFAdapter(ARFAdapter):
    """Real ARF integration (requires agentic-reliability-framework package)"""

    def __init__(self, api_key: Optional[str] = None) -> None:
        """Create the ARF client.

        Args:
            api_key: API key for the client; falls back to
                ``settings.arf_api_key`` when not given (or empty).

        Raises:
            RuntimeError: If the ``agentic_reliability_framework`` package
                cannot be imported. The original ImportError is chained as
                the cause.
        """
        logger.info("Initializing RealARFAdapter")

        try:
            from agentic_reliability_framework import ARFClient

            self.client = ARFClient(api_key=api_key or settings.arf_api_key)
            self._available = True
        except ImportError as e:
            logger.error("Failed to import ARF package: %s", e)
            self._available = False
            raise RuntimeError(
                "Real ARF integration requires 'agentic-reliability-framework' package. "
                "Install with: pip install agentic-reliability-framework"
            ) from e

    async def _call_client(self, async_name: str, sync_name: str, *args: Any) -> Any:
        """Call the client's async method if it exists, else its sync twin.

        Availability is probed with ``getattr`` rather than by catching
        AttributeError around the awaited call, so an AttributeError raised
        *inside* the client method propagates instead of silently triggering
        a second request through the sync API.

        Raises:
            RuntimeError: If the client is not available.
            AttributeError: If the client has neither method.
        """
        if not self._available:
            raise RuntimeError("ARF client not available")

        async_method = getattr(self.client, async_name, None)
        if async_method is not None:
            return await async_method(*args)

        # Fallback to sync if async not available. NOTE: this runs inline and
        # therefore blocks the event loop for the duration of the call.
        return getattr(self.client, sync_name)(*args)

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Real anomaly detection via the ARF client."""
        return await self._call_client("detect_anomaly_async", "detect_anomaly", metrics)

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Real RAG similarity search via the ARF client."""
        return await self._call_client("recall_similar_async", "recall_similar", incident)

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Real decision making via the ARF client."""
        return await self._call_client(
            "generate_intent_async", "generate_intent", incident, context
        )

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete real analysis pipeline.

        Detection and recall are scheduled together; the decision stage runs
        afterwards because it consumes the recall results.

        Args:
            scenario_name: Label used for logging and echoed in the result.
            scenario_data: Scenario payload; its ``"metrics"`` entry (default
                ``{}``) feeds detection, the whole payload feeds recall and
                decision.

        Returns:
            A summary dict with the three stage results plus
            ``overall_confidence``, ``processing_time_ms`` and
            ``agents_executed``.
        """
        logger.info("Starting real analysis for: %s", scenario_name)

        # Run agents in parallel where possible
        detection_task = asyncio.create_task(self.detect(scenario_data.get("metrics", {})))
        recall_task = asyncio.create_task(self.recall(scenario_data))

        detection_result, recall_result = await asyncio.gather(detection_task, recall_task)

        # Decision depends on recall results (the stage method is `decide`;
        # there is no `decision` attribute on the adapter)
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result},
        )

        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            # Hard-coded figure, not a measured duration
            "processing_time_ms": 250,  # Real system should be faster
            "agents_executed": ["detection", "recall", "decision"],
        }


def get_arf_adapter() -> ARFAdapter:
    """
    Factory function to get appropriate ARF adapter based on settings

    ``settings.use_mock_arf`` or ``ARFMode.DEMO`` selects the mock adapter;
    ``ARFMode.OSS`` and ``ARFMode.ENTERPRISE`` both select the real adapter;
    any other mode falls back to the mock with a warning.

    Returns:
        ARFAdapter instance
    """
    if settings.arf_mode == ARFMode.DEMO or settings.use_mock_arf:
        logger.info("Using MockARFAdapter (demo mode)")
        return MockARFAdapter()
    elif settings.arf_mode == ARFMode.OSS:
        logger.info("Using RealARFAdapter (OSS mode)")
        return RealARFAdapter()
    elif settings.arf_mode == ARFMode.ENTERPRISE:
        logger.info("Using RealARFAdapter (Enterprise mode)")
        return RealARFAdapter()
    else:
        logger.warning("Unknown ARF mode, falling back to mock")
        return MockARFAdapter()


# Async helper for easy integration
async def analyze_scenario_async(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Convenience function for async scenario analysis.

    Builds an adapter via ``get_arf_adapter()`` on every call and awaits its
    ``analyze`` method.
    """
    adapter = get_arf_adapter()
    return await adapter.analyze(scenario_name, scenario_data)


# Sync wrapper for compatibility
def analyze_scenario_sync(
    scenario_name: str, scenario_data: Dict[str, Any]
) -> Union[Dict[str, Any], asyncio.Task]:
    """Sync wrapper for scenario analysis.

    Returns:
        The analysis result dict when no event loop is running in the current
        thread. When called from inside a running loop the coroutine cannot
        be driven synchronously, so an ``asyncio.Task`` is returned instead
        and the caller must await it to obtain the dict.
    """

    async def _analyze() -> Dict[str, Any]:
        return await analyze_scenario_async(scenario_name, scenario_data)

    # Keep the try body to the probe only so that a RuntimeError raised by
    # the analysis itself is never mistaken for "no running loop".
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False
    else:
        loop_running = True

    if loop_running:
        # Already in async context, create task
        return asyncio.create_task(_analyze())

    # asyncio.run creates a fresh loop, runs to completion, and tears the
    # loop down cleanly instead of leaving a closed loop installed as the
    # thread's current event loop.
    return asyncio.run(_analyze())