# Hugging Face file-view header (page residue, not part of the module):
# petter2025 - "Update demo/mock_arf.py" - commit 2b6faa9 (verified) - 15.7 kB
# demo/mock_arf.py
"""
Enhanced Mock ARF with scenario-aware metrics
Generates different values based on scenario characteristics
"""
import random
import time
from typing import Dict, Any, List
import json
# Per-scenario sampling ranges, keyed by the scenario's display name.
#
# Every entry carries the same eleven keys. Each "*_range" value is a
# (low, high) pair:
#   detection_confidence_range  fraction, e.g. 0.97 == 97%
#   detection_time_range        seconds
#   accuracy_range              fraction
#   similar_incidents_range     number of similar incidents
#   similarity_score_range      fraction
#   pattern_confidence_range    fraction
#   success_rate_range          fraction
#   cost_savings_range          dollars
#   resolution_time_range       minutes
#   affected_users_range        users
# "tags" is a list of keyword strings describing the scenario.
SCENARIO_CONFIGS = {
    "Cache Miss Storm": {
        "detection_confidence_range": (0.97, 0.995),
        "detection_time_range": (35, 55),
        "accuracy_range": (0.97, 0.995),
        "similar_incidents_range": (2, 5),
        "similarity_score_range": (0.88, 0.96),
        "pattern_confidence_range": (0.91, 0.97),
        "success_rate_range": (0.82, 0.93),
        "cost_savings_range": (5_000, 9_000),
        "resolution_time_range": (10, 18),
        "affected_users_range": (30_000, 60_000),
        "tags": ["cache", "redis", "latency", "memory"],
    },
    "Database Connection Pool Exhaustion": {
        "detection_confidence_range": (0.92, 0.98),
        "detection_time_range": (40, 65),
        "accuracy_range": (0.95, 0.985),
        "similar_incidents_range": (1, 4),
        "similarity_score_range": (0.85, 0.94),
        "pattern_confidence_range": (0.88, 0.95),
        "success_rate_range": (0.78, 0.88),
        "cost_savings_range": (3_500, 5_500),
        "resolution_time_range": (15, 25),
        "affected_users_range": (15_000, 30_000),
        "tags": ["database", "postgres", "connections", "pool"],
    },
    "Kubernetes Memory Leak": {
        "detection_confidence_range": (0.94, 0.99),
        "detection_time_range": (30, 50),
        "accuracy_range": (0.96, 0.99),
        "similar_incidents_range": (3, 6),
        "similarity_score_range": (0.89, 0.95),
        "pattern_confidence_range": (0.90, 0.96),
        "success_rate_range": (0.85, 0.92),
        "cost_savings_range": (4_500, 7_500),
        "resolution_time_range": (12, 22),
        "affected_users_range": (20_000, 40_000),
        "tags": ["kubernetes", "memory", "container", "leak"],
    },
    "API Rate Limit Storm": {
        "detection_confidence_range": (0.96, 0.99),
        "detection_time_range": (25, 45),
        "accuracy_range": (0.97, 0.99),
        "similar_incidents_range": (2, 4),
        "similarity_score_range": (0.87, 0.93),
        "pattern_confidence_range": (0.89, 0.94),
        "success_rate_range": (0.80, 0.90),
        "cost_savings_range": (3_000, 5_000),
        "resolution_time_range": (8, 15),
        "affected_users_range": (10_000, 25_000),
        "tags": ["api", "rate_limit", "throttling", "ddos"],
    },
    "Network Partition": {
        "detection_confidence_range": (0.98, 0.999),
        "detection_time_range": (20, 40),
        "accuracy_range": (0.98, 0.995),
        "similar_incidents_range": (1, 3),
        "similarity_score_range": (0.90, 0.97),
        "pattern_confidence_range": (0.93, 0.98),
        "success_rate_range": (0.75, 0.85),
        "cost_savings_range": (8_000, 15_000),
        "resolution_time_range": (20, 35),
        "affected_users_range": (50_000, 100_000),
        "tags": ["network", "partition", "connectivity", "failure"],
    },
    "Storage I/O Saturation": {
        "detection_confidence_range": (0.93, 0.98),
        "detection_time_range": (45, 70),
        "accuracy_range": (0.94, 0.98),
        "similar_incidents_range": (2, 5),
        "similarity_score_range": (0.86, 0.92),
        "pattern_confidence_range": (0.87, 0.93),
        "success_rate_range": (0.79, 0.87),
        "cost_savings_range": (5_500, 8_500),
        "resolution_time_range": (18, 28),
        "affected_users_range": (25_000, 45_000),
        "tags": ["storage", "disk", "io", "saturation"],
    },
}
def get_scenario_config(scenario_name: str) -> Dict[str, Any]:
    """
    Look up the sampling ranges for a scenario, with a generic fallback.

    Args:
        scenario_name: Key to look up in SCENARIO_CONFIGS
    Returns:
        The SCENARIO_CONFIGS entry itself (not a copy) when the name is
        registered; otherwise a newly built generic configuration with the
        same keys.
    """
    if scenario_name in SCENARIO_CONFIGS:
        return SCENARIO_CONFIGS[scenario_name]

    # Unregistered scenario: hand back generic ranges, built fresh per call.
    fallback = {
        "detection_confidence_range": (0.90, 0.98),
        "detection_time_range": (30, 60),
        "accuracy_range": (0.92, 0.98),
        "similar_incidents_range": (1, 3),
        "similarity_score_range": (0.85, 0.95),
        "pattern_confidence_range": (0.85, 0.95),
        "success_rate_range": (0.75, 0.90),
        "cost_savings_range": (4000, 8000),
        "resolution_time_range": (15, 30),
        "affected_users_range": (20000, 50000),
        "tags": ["unknown", "incident"],
    }
    return fallback
def simulate_arf_analysis(scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Produce a mock ARF analysis result using the scenario's configured ranges.

    Args:
        scenario_data: Scenario description; the optional keys "name",
            "severity" and "component" are read
    Returns:
        Dictionary describing a detected anomaly: randomly drawn confidence
        and accuracy (rounded to 3 decimals), detection time in seconds and
        milliseconds, plus severity, component, scenario name and tags
    """
    name = scenario_data.get("name", "Unknown Scenario")
    ranges = get_scenario_config(name)

    # Draw order (confidence, time, accuracy) is kept stable so that seeded
    # runs reproduce the same values.
    confidence = round(random.uniform(*ranges["detection_confidence_range"]), 3)
    seconds_to_detect = random.randint(*ranges["detection_time_range"])
    accuracy = round(random.uniform(*ranges["accuracy_range"]), 3)

    result = {
        "analysis_complete": True,
        "anomaly_detected": True,
        "severity": scenario_data.get("severity", "HIGH"),
        "confidence": confidence,
        # Same duration in two units; the ms form is for display.
        "detection_time_ms": seconds_to_detect * 1000,
        "detection_time_seconds": seconds_to_detect,
        "accuracy": accuracy,
        "component": scenario_data.get("component", "unknown"),
        "scenario_specific": True,
        "scenario_name": name,
        "tags": ranges["tags"],
    }
    return result
def run_rag_similarity_search(scenario_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Simulate RAG similarity search with scenario-specific results

    The number of incidents and each incident's similarity score, cost
    savings, detection time, resolution time and affected users are drawn
    from the scenario's configured ranges. The resolution is picked from a
    pool selected by keywords in the scenario name.

    Args:
        scenario_data: Dictionary containing scenario information; the
            optional keys "name" and "component" are read
    Returns:
        List of similar incidents (dictionaries)
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)
    similar_count = random.randint(*config["similar_incidents_range"])

    # The resolution pool and pattern prefix depend only on the scenario name,
    # so work them out once instead of once per generated incident.
    name_key = scenario_name.lower()
    if "cache" in name_key or "redis" in name_key:
        resolutions = ["scale_out", "warm_cache", "memory_increase", "add_replicas"]
    elif "database" in name_key:
        resolutions = ["restart", "connection_pool_resize", "index_optimization", "vacuum"]
    elif "kubernetes" in name_key:
        resolutions = ["restart_pod", "memory_limit_increase", "node_drain", "resource_quota"]
    elif "api" in name_key:
        resolutions = ["circuit_breaker", "rate_limit_increase", "caching", "load_balancer"]
    elif "network" in name_key:
        resolutions = ["route_update", "failover", "bandwidth_increase", "redundancy"]
    elif "storage" in name_key:
        resolutions = ["io_optimization", "disk_upgrade", "cache_addition", "load_distribution"]
    else:
        resolutions = ["investigate", "scale", "restart", "optimize"]
    pattern_prefix = name_key.replace(' ', '_')
    component = scenario_data.get("component", "unknown")

    similar_incidents = []
    base_time = int(time.time())
    for i in range(similar_count):
        similarity_score = random.uniform(*config["similarity_score_range"])
        cost_savings = random.randint(*config["cost_savings_range"])
        resolution_time = random.randint(*config["resolution_time_range"])
        affected_users = random.randint(*config["affected_users_range"])
        resolution = random.choice(resolutions)
        similar_incidents.append({
            "incident_id": f"inc_{base_time - random.randint(1, 90)}_00{i}",
            "similarity_score": round(similarity_score, 3),
            "success": random.random() > 0.15,  # 85% success rate
            "resolution": resolution,
            "cost_savings": cost_savings,
            # Use the scenario's own detection-time range, like the other
            # scenario-aware metrics, instead of a fixed 30-60s window.
            "detection_time": f"{random.randint(*config['detection_time_range'])}s",
            "resolution_time": f"{resolution_time}m",
            "pattern": f"{pattern_prefix}_v{random.randint(1, 3)}",
            "affected_users": affected_users,
            "component_match": component,
            "rag_source": "production_memory_v3",
            "timestamp": f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}",
        })
    return similar_incidents
def calculate_pattern_confidence(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> float:
    """
    Calculate pattern confidence based on similar incidents

    With incidents available, the score is a weighted blend of their average
    similarity (60%) and their observed success fraction (40%), clamped to
    the scenario's pattern_confidence_range. Without incidents, a random
    value from that range is returned. Both paths round to 3 decimals.

    Args:
        scenario_data: Dictionary containing scenario information; the
            optional key "name" is read
        similar_incidents: List of similar incidents from RAG search; each
            must provide "similarity_score" and "success"
    Returns:
        Pattern confidence score, rounded to 3 decimals
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    min_conf, max_conf = get_scenario_config(scenario_name)["pattern_confidence_range"]

    if not similar_incidents:
        # No history to score against: sample the scenario range, rounded the
        # same way as the computed path so callers always get 3 decimals.
        return round(random.uniform(min_conf, max_conf), 3)

    incident_count = len(similar_incidents)
    avg_similarity = sum(inc["similarity_score"] for inc in similar_incidents) / incident_count
    avg_success = sum(1.0 if inc["success"] else 0.0 for inc in similar_incidents) / incident_count

    # Weighted average: 60% similarity, 40% success rate
    confidence = (avg_similarity * 0.6) + (avg_success * 0.4)

    # Clamp into the scenario's configured range (no randomness on this path).
    confidence = max(min_conf, min(max_conf, confidence))
    return round(confidence, 3)
def create_mock_healing_intent(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]], confidence: float) -> Dict[str, Any]:
    """
    Create mock healing intent based on scenario and similar incidents

    The action and its parameters are selected by keywords in the component
    name. Success rate and estimated impact are averaged from the similar
    incidents when there are any, otherwise derived from the scenario's
    configured ranges.

    Args:
        scenario_data: Dictionary containing scenario information; the
            optional keys "name" and "component" are read
        similar_incidents: List of similar incidents from RAG search; each
            must provide "success", "cost_savings" and "resolution_time"
            (a string of minutes with an "m" suffix, e.g. "12m")
        confidence: Pattern confidence score
    Returns:
        Healing intent dictionary
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)
    component = scenario_data.get("component", "unknown")
    component_key = component.lower()

    # Determine the action from keywords in the component name. Ranged
    # parameters are rendered as "before→after".
    if "cache" in component_key or "redis" in component_key:
        action = "scale_out"
        parameters = {
            "nodes": f"{random.randint(2, 4)}→{random.randint(5, 8)}",
            "memory": f"{random.randint(8, 16)}GB→{random.randint(24, 64)}GB",
            "strategy": "gradual_scale",
            "region": "auto-select",
        }
    elif "database" in component_key:
        action = "restart"
        parameters = {
            "connections": f"{random.randint(50, 100)}→{random.randint(150, 300)}",
            "timeout": f"{random.randint(30, 60)}s",
            "strategy": "rolling_restart",
            "maintenance_window": "immediate",
        }
    elif "kubernetes" in component_key:
        action = "memory_limit_increase"
        parameters = {
            "memory": f"{random.randint(512, 1024)}Mi→{random.randint(2048, 4096)}Mi",
            "strategy": "pod_restart",
            "drain_timeout": f"{random.randint(5, 15)}m",
        }
    elif "api" in component_key:
        action = "circuit_breaker"
        parameters = {
            "threshold": f"{random.randint(70, 85)}%",
            "window": f"{random.randint(3, 10)}m",
            "fallback": "cached_response",
            "retry_after": f"{random.randint(30, 120)}s",
        }
    elif "network" in component_key:
        action = "failover"
        parameters = {
            "primary": "us-east-1",
            "secondary": "us-west-2",
            "timeout": f"{random.randint(10, 30)}s",
            "health_check": "enhanced",
        }
    elif "storage" in component_key:
        action = "io_optimization"
        parameters = {
            "iops": f"{random.randint(1000, 3000)}→{random.randint(5000, 10000)}",
            "throughput": f"{random.randint(100, 250)}MB/s→{random.randint(500, 1000)}MB/s",
            "cache_size": f"{random.randint(8, 16)}GB→{random.randint(32, 64)}GB",
        }
    else:
        action = "investigate"
        parameters = {
            "priority": "high",
            "escalation": "tier2",
            "timeout": "30m",
        }

    if similar_incidents:
        # Derive success rate and impact from the retrieved history.
        incident_count = len(similar_incidents)
        success_rate = sum(1 for inc in similar_incidents if inc["success"]) / incident_count
        avg_cost_savings = sum(inc["cost_savings"] for inc in similar_incidents) / incident_count
        # "resolution_time" is formatted like "12m"; strip the unit to average.
        avg_resolution_time = sum(
            int(inc["resolution_time"].replace('m', '')) for inc in similar_incidents
        ) / incident_count
    else:
        # No history: random success rate and the midpoint of each range.
        success_rate = random.uniform(*config["success_rate_range"])
        avg_cost_savings = sum(config["cost_savings_range"]) / 2
        avg_resolution_time = sum(config["resolution_time_range"]) / 2

    return {
        "action": action,
        "component": component,
        "confidence": round(confidence, 3),
        "parameters": parameters,
        "source": "mock_analysis",
        "requires_enterprise": True,
        "advisory_only": True,
        "success_rate": round(success_rate, 3),
        "estimated_impact": {
            "cost_savings": int(avg_cost_savings),
            "resolution_time_minutes": int(avg_resolution_time),
            "users_protected": random.randint(*config["affected_users_range"]),
            "mttr_reduction": f"{random.randint(60, 80)}%",
        },
        "safety_checks": {
            "blast_radius": f"{random.randint(1, 3)} services",
            "business_hours": "compliant",
            "rollback_plan": "available",
            "approval_required": True,
            "risk_level": "medium" if confidence < 0.9 else "low",
        },
        "scenario_specific": True,
        "scenario_name": scenario_name,
    }
def get_scenario_metrics(scenario_name: str) -> Dict[str, Any]:
    """
    Sample one set of headline metrics for a scenario

    Args:
        scenario_name: Name of the scenario
    Returns:
        Dictionary with freshly sampled values (fractions rounded to 3
        decimals, counts as integers) followed by the scenario's
        cost-savings, resolution-time and affected-users ranges and its
        tags, passed through unchanged from the configuration
    """
    cfg = get_scenario_config(scenario_name)

    def sample_fraction(range_key):
        # Uniform draw from the configured (low, high) pair, 3 decimals.
        return round(random.uniform(*cfg[range_key]), 3)

    def sample_count(range_key):
        # Inclusive integer draw from the configured (low, high) pair.
        return random.randint(*cfg[range_key])

    # Entries are evaluated top to bottom, which fixes the random draw order.
    metrics = {
        "detection_confidence": sample_fraction("detection_confidence_range"),
        "detection_time_seconds": sample_count("detection_time_range"),
        "accuracy": sample_fraction("accuracy_range"),
        "expected_similar_incidents": sample_count("similar_incidents_range"),
        "avg_similarity_score": sample_fraction("similarity_score_range"),
        "pattern_confidence": sample_fraction("pattern_confidence_range"),
        "success_rate": sample_fraction("success_rate_range"),
    }
    # Static configuration values are exposed as-is.
    for passthrough_key in (
        "cost_savings_range",
        "resolution_time_range",
        "affected_users_range",
        "tags",
    ):
        metrics[passthrough_key] = cfg[passthrough_key]
    return metrics