petter2025's picture
Create arf_adapter.py
5d54760 verified
raw
history blame
10 kB
"""
Adapter layer for ARF: a single async interface (detect, recall, decide,
analyze) implemented by both a real ARF client adapter and a mock adapter.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
import asyncio
import logging
from config.settings import settings, ARFMode
# Module-level logger used by the adapters and the factory function below.
logger = logging.getLogger(__name__)
class ARFAdapter(ABC):
    """Abstract base for ARF backends.

    Subclasses must implement all four coroutines declared here; the base
    class itself provides no behaviour.
    """

    @abstractmethod
    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Run anomaly detection over ``metrics`` and return the result mapping."""

    @abstractmethod
    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return previously seen incidents that resemble ``incident``."""

    @abstractmethod
    async def decide(
        self, incident: Dict[str, Any], context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Produce a healing intent for ``incident`` given ``context``."""

    @abstractmethod
    async def analyze(
        self, scenario_name: str, scenario_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run the full analysis pipeline for one named scenario."""
class MockARFAdapter(ARFAdapter):
    """Mock ARF implementation for demo mode.

    Each stage delegates to a helper from ``demo.mock_arf`` (imported lazily
    on first use) and decorates the helper's output with fixed demo metadata.
    """

    def __init__(self):
        """Create the adapter without importing ``demo.mock_arf`` yet."""
        logger.info("Initializing MockARFAdapter")
        # Placeholders bound by _import_mock_functions(); the import is
        # deferred to avoid circular dependencies at module load time.
        self._simulate_arf_analysis = None
        self._run_rag_similarity_search = None
        self._create_mock_healing_intent = None
        self._calculate_pattern_confidence = None

    def _import_mock_functions(self):
        """Bind the ``demo.mock_arf`` helpers once; later calls are no-ops."""
        if self._simulate_arf_analysis is not None:
            return
        from demo.mock_arf import (
            simulate_arf_analysis,
            run_rag_similarity_search,
            create_mock_healing_intent,
            calculate_pattern_confidence,
        )
        self._simulate_arf_analysis = simulate_arf_analysis
        self._run_rag_similarity_search = run_rag_similarity_search
        self._create_mock_healing_intent = create_mock_healing_intent
        self._calculate_pattern_confidence = calculate_pattern_confidence

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Mock anomaly detection.

        Args:
            metrics: Metric values, passed to the mock helper wrapped as
                ``{"metrics": metrics}``.

        Returns:
            The helper's result with ``detection_method`` and ``confidence``
            overwritten by fixed demo values.
        """
        self._import_mock_functions()
        # Simulate processing time
        await asyncio.sleep(0.1)
        result = self._simulate_arf_analysis({"metrics": metrics})
        result["detection_method"] = "mock_ml_algorithm"
        result["confidence"] = 0.987  # 98.7%
        return result

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Mock RAG similarity search.

        Args:
            incident: Incident description handed to the mock search helper.

        Returns:
            The helper's matches, each tagged in place with ``source`` and
            ``retrieval_time``.
        """
        self._import_mock_functions()
        # Simulate processing time
        await asyncio.sleep(0.2)
        similar = self._run_rag_similarity_search(incident)
        # Enhance with additional metadata
        for item in similar:
            item["source"] = "mock_rag_memory"
            item["retrieval_time"] = "45ms"
        return similar

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Mock decision making.

        Args:
            incident: Incident to generate a healing intent for.
            context: May carry ``similar_incidents``; when that entry is
                missing or empty, ``recall`` is run to obtain them.

        Returns:
            The mock healing intent with a fixed ``safety_checks`` section
            attached.
        """
        self._import_mock_functions()
        # Get similar incidents from context or recall them
        similar = context.get("similar_incidents")
        if not similar:
            similar = await self.recall(incident)
        # Calculate confidence
        confidence = self._calculate_pattern_confidence(incident, similar)
        # Generate healing intent
        intent = self._create_mock_healing_intent(incident, similar, confidence)
        # Add safety check
        intent["safety_checks"] = {
            "blast_radius": "2 services",
            "business_hours": "compliant",
            "rollback_plan": "available",
            "approval_required": True,
        }
        return intent

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete mock analysis pipeline: detect, then recall, then decide.

        Args:
            scenario_name: Label echoed back under ``scenario``.
            scenario_data: Scenario payload; its ``metrics`` entry (default
                ``{}``) feeds detection and the whole payload feeds recall
                and decision.

        Returns:
            A summary dict with the three stage results, an
            ``overall_confidence`` taken from the decision (default 0.85),
            and fixed demo values for timing and executed agents.
        """
        logger.info("Starting mock analysis for: %s", scenario_name)
        # Step 1: Detection
        detection_result = await self.detect(scenario_data.get("metrics", {}))
        # Step 2: Recall
        recall_result = await self.recall(scenario_data)
        # Step 3: Decision. The method is named ``decide``; the previous call
        # to ``self.decision`` raised AttributeError on every run.
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result},
        )
        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            "processing_time_ms": 450,
            "agents_executed": ["detection", "recall", "decision"],
        }
class RealARFAdapter(ARFAdapter):
    """Real ARF integration (requires agentic-reliability-framework package).

    Each stage prefers the client's ``*_async`` method and falls back to the
    synchronous method of the same name when the async one does not exist.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Build the ARF client.

        Args:
            api_key: Key for the client; ``settings.arf_api_key`` is used
                when this is ``None`` or empty.

        Raises:
            RuntimeError: If an ``ImportError`` occurs while importing the
                package or constructing the client.
        """
        logger.info("Initializing RealARFAdapter")
        try:
            from agentic_reliability_framework import ARFClient
            self.client = ARFClient(api_key=api_key or settings.arf_api_key)
            self._available = True
        except ImportError as e:
            logger.error("Failed to import ARF package: %s", e)
            self._available = False
            # Chain the original error so the traceback shows the real cause.
            raise RuntimeError(
                "Real ARF integration requires 'agentic-reliability-framework' package. "
                "Install with: pip install agentic-reliability-framework"
            ) from e

    async def _call_client(self, async_name: str, sync_name: str, *args: Any) -> Any:
        """Invoke a client operation, preferring its async variant.

        Only the attribute lookup is guarded: an ``AttributeError`` raised
        from inside the awaited call propagates instead of silently
        re-running the operation through the sync API.

        Raises:
            RuntimeError: If the client is flagged as unavailable.
        """
        if not self._available:
            raise RuntimeError("ARF client not available")
        try:
            async_method = getattr(self.client, async_name)
        except AttributeError:
            # No async variant on this client: use the sync API. This runs
            # inline on the calling event loop.
            return getattr(self.client, sync_name)(*args)
        return await async_method(*args)

    async def detect(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Real anomaly detection via ``detect_anomaly_async`` / ``detect_anomaly``.

        Raises:
            RuntimeError: If the client is flagged as unavailable.
        """
        return await self._call_client("detect_anomaly_async", "detect_anomaly", metrics)

    async def recall(self, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Real RAG similarity search via ``recall_similar_async`` / ``recall_similar``.

        Raises:
            RuntimeError: If the client is flagged as unavailable.
        """
        return await self._call_client("recall_similar_async", "recall_similar", incident)

    async def decide(self, incident: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Real decision making via ``generate_intent_async`` / ``generate_intent``.

        Raises:
            RuntimeError: If the client is flagged as unavailable.
        """
        return await self._call_client(
            "generate_intent_async", "generate_intent", incident, context
        )

    async def analyze(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
        """Complete real analysis pipeline.

        Detection and recall are started together; the decision step waits
        for both because it receives the recall result as context.

        Args:
            scenario_name: Label echoed back under ``scenario``.
            scenario_data: Scenario payload; its ``metrics`` entry (default
                ``{}``) feeds detection and the whole payload feeds recall
                and decision.

        Returns:
            A summary dict with the three stage results and an
            ``overall_confidence`` taken from the decision (default 0.85).
        """
        logger.info("Starting real analysis for: %s", scenario_name)
        # Run agents in parallel where possible
        detection_task = asyncio.create_task(self.detect(scenario_data.get("metrics", {})))
        recall_task = asyncio.create_task(self.recall(scenario_data))
        detection_result, recall_result = await asyncio.gather(detection_task, recall_task)
        # Decision depends on recall results. The method is named ``decide``;
        # the previous call to ``self.decision`` raised AttributeError.
        decision_result = await self.decide(
            scenario_data,
            {"similar_incidents": recall_result},
        )
        return {
            "scenario": scenario_name,
            "detection": detection_result,
            "recall": recall_result,
            "decision": decision_result,
            "overall_confidence": decision_result.get("confidence", 0.85),
            # NOTE(review): hard-coded figure, not a measurement.
            "processing_time_ms": 250,  # Real system should be faster
            "agents_executed": ["detection", "recall", "decision"],
        }
def get_arf_adapter() -> ARFAdapter:
    """
    Build the ARF adapter selected by the current settings.

    The mock adapter is chosen for demo mode, whenever ``use_mock_arf`` is
    set, and as the fallback for an unrecognised mode; OSS and Enterprise
    modes both get the real adapter.

    Returns:
        ARFAdapter instance
    """
    mode = settings.arf_mode

    if mode == ARFMode.DEMO or settings.use_mock_arf:
        logger.info("Using MockARFAdapter (demo mode)")
        return MockARFAdapter()

    if mode == ARFMode.OSS:
        logger.info("Using RealARFAdapter (OSS mode)")
        return RealARFAdapter()

    if mode == ARFMode.ENTERPRISE:
        logger.info("Using RealARFAdapter (Enterprise mode)")
        return RealARFAdapter()

    # Nothing matched: degrade to the mock rather than fail.
    logger.warning("Unknown ARF mode, falling back to mock")
    return MockARFAdapter()
# Async helper for easy integration
async def analyze_scenario_async(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Pick an adapter via ``get_arf_adapter`` and await its ``analyze`` result."""
    return await get_arf_adapter().analyze(scenario_name, scenario_data)
# Sync wrapper for compatibility
def analyze_scenario_sync(scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """Sync wrapper for scenario analysis.

    Always returns the finished analysis dict. Previously, when called from
    a thread with a running event loop, this returned an un-awaited
    ``asyncio.Task`` instead of the declared result.

    Args:
        scenario_name: Scenario label forwarded to ``analyze_scenario_async``.
        scenario_data: Scenario payload forwarded to ``analyze_scenario_async``.

    Returns:
        Whatever ``analyze_scenario_async`` returns for the scenario.

    Note:
        If the calling thread already runs an event loop, the analysis runs
        on a private loop in a worker thread and this call blocks the
        caller (and therefore its loop) until it finishes. Async callers
        should await ``analyze_scenario_async`` directly.
    """
    import asyncio
    import concurrent.futures

    def _run_in_fresh_loop() -> Dict[str, Any]:
        # asyncio.run creates, runs and closes its own loop, so no closed
        # loop is left installed as the thread's current event loop.
        return asyncio.run(analyze_scenario_async(scenario_name, scenario_data))

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: run the coroutine right here.
        return _run_in_fresh_loop()

    # A loop is already running in this thread and cannot be re-entered,
    # so drive the coroutine from a separate thread and wait for its result.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run_in_fresh_loop).result()