# Provenance: demo/mock_arf.py, commit 23c10b8 (verified), author petter2025, 19.1 kB.
# NOTE(review): the lines above were Hugging Face file-viewer chrome ("raw",
# "history", "blame") accidentally pasted into the source; converted to a
# comment so the module is valid Python.
"""
Enhanced Mock ARF components for demo purposes
In production, these would use the real agentic-reliability-framework package
"""
import time
import json
import hashlib
from typing import Dict, Any, List, Optional
import random
import logging
from datetime import datetime, timedelta
# Module-level logger (PEP 282 convention). Currently unused within this file;
# kept so importers and future demo code can log under this module's name.
logger = logging.getLogger(__name__)
class MockARFSimulator:
    """Enhanced mock ARF simulator with realistic patterns.

    Simulates the analysis -> RAG retrieval -> healing-intent pipeline of the
    real agentic-reliability-framework package for demo purposes. All data is
    synthetic; randomness is reproducible when a ``seed`` is supplied.
    """

    def __init__(self, seed: Optional[int] = None):
        """Initialize the simulator.

        Args:
            seed: Optional RNG seed for reproducible demo runs. Defaults to
                the current UNIX time when omitted.
        """
        # BUG FIX: the original `seed or int(time.time())` treated a valid
        # seed of 0 as "no seed" and replaced it with the wall clock.
        self.seed = seed if seed is not None else int(time.time())
        # NOTE: seeds the module-global RNG (shared process-wide), not a
        # private random.Random instance.
        random.seed(self.seed)
        self._incident_patterns = self._initialize_patterns()
        self._healing_actions = self._initialize_healing_actions()

    def _initialize_patterns(self) -> Dict[str, Dict[str, Any]]:
        """Initialize realistic incident patterns (static catalog)."""
        return {
            "cache_miss_storm": {
                "pattern": "exponential_miss_increase",
                "indicators": ["cache_hit_rate < 30%", "database_load > 80%", "response_time > 1500ms"],
                "typical_causes": ["key_eviction", "cold_cache", "traffic_spike"],
                "resolution_patterns": ["scale_out", "cache_warming", "ttl_optimization"]
            },
            "db_connection_exhaustion": {
                "pattern": "connection_pool_saturation",
                "indicators": ["active_connections > 95%", "connection_wait > 30s", "query_timeout_rate > 10%"],
                "typical_causes": ["connection_leak", "slow_queries", "connection_pool_misconfig"],
                "resolution_patterns": ["pool_tuning", "query_optimization", "circuit_breaker"]
            },
            "memory_leak": {
                "pattern": "gradual_memory_increase",
                "indicators": ["memory_usage > 90%", "gc_frequency_high", "restart_count_increasing"],
                "typical_causes": ["object_retention", "resource_leak", "cache_growth"],
                "resolution_patterns": ["heap_analysis", "restart", "memory_limit"]
            },
            "api_rate_limit": {
                "pattern": "rate_limit_cascade",
                "indicators": ["429_rate > 40%", "retry_storm", "cascade_failures"],
                "typical_causes": ["burst_traffic", "misconfigured_limits", "retry_logic"],
                "resolution_patterns": ["backoff_strategy", "circuit_breaker", "cache_responses"]
            }
        }

    def _initialize_healing_actions(self) -> Dict[str, Dict[str, Any]]:
        """Initialize healing actions with (mock) historical success rates."""
        return {
            "scale_out": {
                "action": "increase_capacity",
                "success_rate": 0.87,
                "typical_recovery_time": "5-15 minutes",
                "risk_level": "low",
                "prerequisites": ["capacity_available", "auto_scaling_enabled"]
            },
            "cache_warming": {
                "action": "preload_cache",
                "success_rate": 0.72,
                "typical_recovery_time": "2-10 minutes",
                "risk_level": "very_low",
                "prerequisites": ["predictive_model", "cache_pattern_known"]
            },
            "restart_container": {
                "action": "graceful_restart",
                "success_rate": 0.95,
                "typical_recovery_time": "1-3 minutes",
                "risk_level": "medium",
                "prerequisites": ["health_checks", "load_balancer", "redundancy"]
            },
            "circuit_breaker": {
                "action": "fail_fast_protection",
                "success_rate": 0.89,
                "typical_recovery_time": "instant",
                "risk_level": "low",
                "prerequisites": ["dependency_awareness", "fallback_strategy"]
            }
        }

    def simulate_arf_analysis(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate the ARF analysis pipeline for a scenario.

        Args:
            scenario: Demo scenario dict; reads ``component``, ``root_cause``
                and ``metrics`` keys (all optional).

        Returns:
            An analysis-result dict with detected pattern, severity,
            confidence, and mock timing/model metadata.
        """
        component = scenario.get('component', 'unknown')
        pattern_name = self._detect_pattern(component, scenario)
        return {
            "analysis_complete": True,
            "anomaly_detected": True,
            "severity": self._determine_severity(scenario),
            "root_cause": scenario.get('root_cause', 'resource_constraint'),
            "pattern_detected": True,
            "pattern_name": pattern_name,
            "pattern_confidence": self._calculate_pattern_confidence(pattern_name),
            "detection_method": "ensemble_ml_model",
            "detection_time_ms": random.randint(150, 350),
            "analysis_timestamp": time.time(),
            "processing_time_ms": random.randint(200, 500),
            "model_version": "arf-ml-v3.3.6",
            "features_analyzed": self._extract_features(scenario)
        }

    def run_rag_similarity_search(self, scenario: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Simulate a RAG similarity search, returning synthetic incidents.

        Generates 3-5 historical incidents with similarity scores in
        [0.75, 0.95]; an incident is marked successful when its similarity
        exceeds 0.82. Results are sorted by similarity (descending) and each
        carries shared retrieval metadata.
        """
        component = scenario.get('component', 'redis_cache')
        pattern_name = self._detect_pattern(component, scenario)

        similar_incidents = []
        base_time = time.time()
        for i in range(random.randint(3, 5)):
            days_ago = random.randint(1, 90)
            incident_time = base_time - (days_ago * 86400)
            similarity = random.uniform(0.75, 0.95)
            success = similarity > 0.82
            incident = {
                "incident_id": f"inc_{int(incident_time)}_{i}",
                "component": component,
                "pattern": pattern_name,
                "similarity_score": similarity,
                "cosine_similarity": similarity,
                "success": success,
                "resolution": self._get_recommended_action(component),
                "actions_taken": self._get_action_sequence(component, success),
                "resolution_time_minutes": random.uniform(3.5, 18.5),
                "timestamp": incident_time,
                "occurred_at": datetime.fromtimestamp(incident_time).isoformat(),
                "engineers_involved": random.randint(1, 3),
                "blast_radius": f"{random.randint(1, 5)} services",
                "root_cause_analysis": self._generate_root_cause(component)
            }
            # Only successful resolutions get outcome/savings metrics.
            if success:
                cost_saved = random.randint(1500, 12500)
                incident["cost_savings"] = cost_saved
                incident["mttr_reduction"] = f"{random.randint(60, 85)}%"
                incident["user_impact"] = f"{random.randint(85, 99)}% reduction"
            similar_incidents.append(incident)

        # Most-similar incidents first.
        similar_incidents.sort(key=lambda x: x['similarity_score'], reverse=True)

        # One metadata dict is shared (by reference) across all incidents.
        rag_metadata = {
            "vector_db": "chroma_v0.4.0",
            "embedding_model": "all-MiniLM-L6-v2",
            "index_size": f"{random.randint(500, 5000)} incidents",
            "retrieval_time_ms": random.randint(45, 120),
            "top_k": len(similar_incidents)
        }
        for incident in similar_incidents:
            incident["rag_metadata"] = rag_metadata
        return similar_incidents

    def calculate_pattern_confidence(self, scenario: Dict[str, Any],
                                     similar_incidents: List[Dict[str, Any]]) -> float:
        """Calculate pattern-detection confidence with enhanced logic.

        Starts from the per-pattern base confidence and adds bounded boosts
        for incident count, average similarity, historical success rate, and
        recency. Result is clamped to [0.70, 0.98].
        """
        if not similar_incidents:
            return 0.70  # Base confidence without similar incidents

        component = scenario.get('component', 'unknown')
        pattern_name = self._detect_pattern(component, scenario)
        base_confidence = self._calculate_pattern_confidence(pattern_name)

        # More corroborating incidents -> up to +0.15.
        incident_count = len(similar_incidents)
        incident_boost = min(0.15, incident_count * 0.025)

        # Higher average similarity -> up to +0.12 (scores are <= 1.0).
        avg_similarity = sum(i['similarity_score'] for i in similar_incidents) / incident_count
        similarity_boost = avg_similarity * 0.12

        # Higher historical success rate -> up to +0.10.
        success_count = sum(1 for i in similar_incidents if i['success'])
        success_rate = success_count / incident_count
        success_boost = success_rate * 0.10

        # Recent (last-7-days) incidents weigh extra -> up to +0.08.
        recency_boost = self._calculate_recency_boost(similar_incidents)

        total_confidence = (
            base_confidence +
            incident_boost +
            similarity_boost +
            success_boost +
            recency_boost
        )
        return max(0.70, min(0.98, total_confidence))

    def create_mock_healing_intent(self, scenario: Dict[str, Any],
                                   similar_incidents: List[Dict[str, Any]],
                                   confidence: float = 0.85) -> Dict[str, Any]:
        """Create a realistic mock HealingIntent object.

        The intent is advisory only in the community edition
        (``execution_allowed`` is False, ``requires_enterprise`` is True).
        """
        component = scenario.get('component', 'redis_cache')
        pattern_name = self._detect_pattern(component, scenario)
        action_info = self._determine_healing_action(component, pattern_name)

        # Deterministic short hash of the parameters (non-cryptographic use;
        # md5 is fine here) so identical parameter sets share an ID suffix.
        params_hash = hashlib.md5(
            json.dumps(action_info['parameters'], sort_keys=True).encode()
        ).hexdigest()[:8]

        rag_metrics = self._calculate_rag_metrics(similar_incidents)

        healing_intent = {
            "action": action_info['action'],
            "component": component,
            "pattern": pattern_name,
            "parameters": action_info['parameters'],
            "justification": action_info['justification'],
            "confidence": confidence,
            "incident_id": f"inc_{int(time.time())}",
            "detected_at": time.time(),
            "similar_incidents_count": len(similar_incidents),
            "rag_similarity_score": rag_metrics['avg_similarity'],
            "rag_metrics": rag_metrics,
            "source": "oss_analysis",
            "intent_id": f"intent_{int(time.time())}_{params_hash}",
            "created_at": time.time(),
            "status": "created",
            "edition": "community",
            "requires_enterprise": True,
            "execution_allowed": False,
            "safety_checks": {
                "blast_radius": f"{random.randint(1, 3)} services",
                "business_hours": "compliant",
                "rollback_plan": "available",
                "approval_required": True,
                "risk_assessment": "low",
                "compliance_check": "passed"
            },
            "expected_outcome": {
                "recovery_time_minutes": action_info['recovery_time'],
                "success_probability": action_info['success_rate'],
                "cost_savings_estimate": self._estimate_savings(scenario),
                "user_impact_reduction": f"{random.randint(85, 99)}%"
            },
            "deterministic_id": f"intent_{params_hash}"
        }
        return healing_intent

    # ------------------------------------------------------------------
    # Helper methods
    # ------------------------------------------------------------------

    def _detect_pattern(self, component: str, scenario: Dict[str, Any]) -> str:
        """Detect incident pattern from the component name (keyword match)."""
        name = component.lower()
        if 'cache' in name:
            return "cache_miss_storm"
        elif 'database' in name or 'postgres' in name:
            return "db_connection_exhaustion"
        elif 'memory' in name or 'java' in name:
            return "memory_leak"
        elif 'api' in name or 'rate' in name:
            return "api_rate_limit"
        else:
            return "unknown_pattern"

    def _determine_severity(self, scenario: Dict[str, Any]) -> str:
        """Determine incident severity from scenario metrics thresholds."""
        metrics = scenario.get('metrics', {})
        if 'error_rate' in metrics and metrics['error_rate'] > 30:
            return "critical"
        elif 'response_time_ms' in metrics and metrics['response_time_ms'] > 2000:
            return "critical"
        elif 'memory_usage' in metrics and metrics['memory_usage'] > 90:
            return "high"
        else:
            # No clear signal: pick a plausible mid-range severity.
            return random.choice(["high", "medium"])

    def _calculate_pattern_confidence(self, pattern_name: str) -> float:
        """Return the base confidence for a specific pattern name."""
        confidence_map = {
            "cache_miss_storm": 0.92,
            "db_connection_exhaustion": 0.88,
            "memory_leak": 0.85,
            "api_rate_limit": 0.90,
            "unknown_pattern": 0.70
        }
        return confidence_map.get(pattern_name, 0.75)

    def _extract_features(self, scenario: Dict[str, Any]) -> List[str]:
        """Extract up to 10 "key:value" features plus derived flags."""
        metrics = scenario.get('metrics', {})
        features = [
            f"{key}:{value}"
            for key, value in metrics.items()
            if isinstance(value, (int, float))
        ]
        # Derived features from critical thresholds.
        if 'cache_hit_rate' in metrics and metrics['cache_hit_rate'] < 30:
            features.append("cache_miss_critical")
        if 'error_rate' in metrics and metrics['error_rate'] > 10:
            features.append("error_rate_high")
        return features[:10]  # Limit to 10 features

    def _get_recommended_action(self, component: str) -> str:
        """Get recommended healing action for a component (keyword match)."""
        name = component.lower()
        if 'cache' in name:
            return 'scale_out'
        elif 'database' in name:
            return 'optimize_connections'
        elif 'memory' in name:
            return 'restart_container'
        else:
            return 'circuit_breaker'

    def _get_action_sequence(self, component: str, success: bool) -> List[str]:
        """Get the sequence of remediation actions taken for an incident.

        Returns an empty list for components with no canned sequence.
        """
        base_actions = []
        if 'cache' in component.lower():
            base_actions = ["scale_out", "adjust_cache_ttl", "implement_warming"]
        elif 'database' in component.lower():
            base_actions = ["increase_pool_size", "add_timeout", "optimize_queries"]
        # ~50% of successful resolutions also added monitoring.
        if success and random.random() > 0.5:
            base_actions.append("add_monitoring")
        return base_actions

    def _generate_root_cause(self, component: str) -> str:
        """Generate a realistic root cause string for the component."""
        causes = {
            'cache': ["key_eviction_policy", "cold_cache_after_deploy", "traffic_spike_2x"],
            'database': ["connection_leak_in_pool", "slow_query_cascade", "max_connections_limit"],
            'memory': ["object_retention_in_cache", "thread_local_leak", "off_heap_memory_growth"]
        }
        for key, cause_list in causes.items():
            if key in component.lower():
                return random.choice(cause_list)
        return "resource_constraint_under_load"

    def _calculate_recency_boost(self, incidents: List[Dict[str, Any]]) -> float:
        """Confidence boost for incidents within the last 7 days (cap 0.08)."""
        if not incidents:
            return 0.0
        now = time.time()
        recent_count = 0
        for incident in incidents:
            # Missing timestamp counts as "now", i.e. maximally recent.
            incident_time = incident.get('timestamp', now)
            days_ago = (now - incident_time) / 86400
            if days_ago < 7:  # Within last week
                recent_count += 1
        return min(0.08, recent_count * 0.02)

    def _determine_healing_action(self, component: str, pattern: str) -> Dict[str, Any]:
        """Determine the healing action (with parameters) for a component.

        ``pattern`` is currently unused; kept for interface stability.
        """
        if 'cache' in component.lower():
            return {
                "action": 'scale_out',
                "parameters": {'scale_factor': random.choice([2, 3]), 'cache_ttl': 300},
                "justification": "Scale Redis cluster and adjust cache TTL based on historical pattern",
                "success_rate": 0.87,
                "recovery_time": "5-15 minutes"
            }
        elif 'database' in component.lower():
            return {
                "action": 'optimize_connections',
                "parameters": {'max_connections': 200, 'connection_timeout': 30},
                "justification": "Optimize database connection pool settings based on load patterns",
                "success_rate": 0.82,
                "recovery_time": "2-8 minutes"
            }
        else:
            return {
                "action": 'restart_container',
                "parameters": {'grace_period': 30, 'drain_connections': True},
                "justification": "Restart container to resolve memory issues with graceful shutdown",
                "success_rate": 0.95,
                "recovery_time": "1-3 minutes"
            }

    def _calculate_rag_metrics(self, incidents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate aggregate RAG retrieval metrics for a set of incidents.

        Note: the empty-input branch intentionally omits ``diversity_score``
        (preserved from the original contract).
        """
        if not incidents:
            return {
                "avg_similarity": 0.0,
                "similarity_std": 0.0,
                "coverage_score": 0.0
            }
        similarities = [i.get('similarity_score', 0) for i in incidents]
        mean = sum(similarities) / len(similarities)
        # BUG FIX: the original called np.std() but numpy was never imported,
        # raising NameError at runtime. Computed here as the population
        # standard deviation (ddof=0), matching numpy.std's default.
        if len(similarities) > 1:
            variance = sum((s - mean) ** 2 for s in similarities) / len(similarities)
            similarity_std = variance ** 0.5
        else:
            similarity_std = 0.0
        return {
            "avg_similarity": mean,
            "similarity_std": similarity_std,
            "coverage_score": min(1.0, len(incidents) / 5),
            "diversity_score": random.uniform(0.6, 0.9)
        }

    def _estimate_savings(self, scenario: Dict[str, Any]) -> int:
        """Estimate cost savings as 70-90% of the hourly revenue loss."""
        impact = scenario.get('business_impact', {})
        revenue_loss = impact.get('revenue_loss_per_hour', 5000)
        savings_percentage = random.uniform(0.7, 0.9)
        return int(revenue_loss * savings_percentage)
# Module-level singleton backing the legacy functional API below.
_simulator = MockARFSimulator()


# --- Public API functions (backward compatibility) -------------------------
# Thin delegating wrappers so existing callers that import these functions
# directly keep working without knowing about MockARFSimulator.

def simulate_arf_analysis(scenario: Dict[str, Any]) -> Dict[str, Any]:
    """Delegate to the shared simulator's analysis pipeline."""
    return _simulator.simulate_arf_analysis(scenario)


def run_rag_similarity_search(scenario: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Delegate to the shared simulator's RAG similarity search."""
    return _simulator.run_rag_similarity_search(scenario)


def calculate_pattern_confidence(scenario: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> float:
    """Delegate to the shared simulator's confidence calculation."""
    return _simulator.calculate_pattern_confidence(scenario, similar_incidents)


def create_mock_healing_intent(scenario: Dict[str, Any],
                               similar_incidents: List[Dict[str, Any]],
                               confidence: float = 0.85) -> Dict[str, Any]:
    """Delegate to the shared simulator's healing-intent factory."""
    return _simulator.create_mock_healing_intent(scenario, similar_incidents, confidence)