| |
| """ |
| Enhanced Mock ARF with scenario-aware metrics |
| Generates different values based on scenario characteristics |
| """ |
| import random |
| import time |
| from typing import Dict, Any, List |
| import json |
|
|
| |
# Per-scenario tuning table for the mock generators in this module.
#
# Each "*_range" entry is a (low, high) pair that the generator functions
# unpack into random.uniform() (fractional scores) or random.randint()
# (integer counts and durations). Consumers in this module emit detection
# times as seconds and resolution times as minutes. "tags" is a list of
# free-form labels attached to the generated results.
SCENARIO_CONFIGS: Dict[str, Dict[str, Any]] = {
    "Cache Miss Storm": {
        "detection_confidence_range": (0.97, 0.995),
        "detection_time_range": (35, 55),
        "accuracy_range": (0.97, 0.995),
        "similar_incidents_range": (2, 5),
        "similarity_score_range": (0.88, 0.96),
        "pattern_confidence_range": (0.91, 0.97),
        "success_rate_range": (0.82, 0.93),
        "cost_savings_range": (5000, 9000),
        "resolution_time_range": (10, 18),
        "affected_users_range": (30000, 60000),
        "tags": ["cache", "redis", "latency", "memory"],
    },
    "Database Connection Pool Exhaustion": {
        "detection_confidence_range": (0.92, 0.98),
        "detection_time_range": (40, 65),
        "accuracy_range": (0.95, 0.985),
        "similar_incidents_range": (1, 4),
        "similarity_score_range": (0.85, 0.94),
        "pattern_confidence_range": (0.88, 0.95),
        "success_rate_range": (0.78, 0.88),
        "cost_savings_range": (3500, 5500),
        "resolution_time_range": (15, 25),
        "affected_users_range": (15000, 30000),
        "tags": ["database", "postgres", "connections", "pool"],
    },
    "Kubernetes Memory Leak": {
        "detection_confidence_range": (0.94, 0.99),
        "detection_time_range": (30, 50),
        "accuracy_range": (0.96, 0.99),
        "similar_incidents_range": (3, 6),
        "similarity_score_range": (0.89, 0.95),
        "pattern_confidence_range": (0.90, 0.96),
        "success_rate_range": (0.85, 0.92),
        "cost_savings_range": (4500, 7500),
        "resolution_time_range": (12, 22),
        "affected_users_range": (20000, 40000),
        "tags": ["kubernetes", "memory", "container", "leak"],
    },
    "API Rate Limit Storm": {
        "detection_confidence_range": (0.96, 0.99),
        "detection_time_range": (25, 45),
        "accuracy_range": (0.97, 0.99),
        "similar_incidents_range": (2, 4),
        "similarity_score_range": (0.87, 0.93),
        "pattern_confidence_range": (0.89, 0.94),
        "success_rate_range": (0.80, 0.90),
        "cost_savings_range": (3000, 5000),
        "resolution_time_range": (8, 15),
        "affected_users_range": (10000, 25000),
        "tags": ["api", "rate_limit", "throttling", "ddos"],
    },
    "Network Partition": {
        "detection_confidence_range": (0.98, 0.999),
        "detection_time_range": (20, 40),
        "accuracy_range": (0.98, 0.995),
        "similar_incidents_range": (1, 3),
        "similarity_score_range": (0.90, 0.97),
        "pattern_confidence_range": (0.93, 0.98),
        "success_rate_range": (0.75, 0.85),
        "cost_savings_range": (8000, 15000),
        "resolution_time_range": (20, 35),
        "affected_users_range": (50000, 100000),
        "tags": ["network", "partition", "connectivity", "failure"],
    },
    "Storage I/O Saturation": {
        "detection_confidence_range": (0.93, 0.98),
        "detection_time_range": (45, 70),
        "accuracy_range": (0.94, 0.98),
        "similar_incidents_range": (2, 5),
        "similarity_score_range": (0.86, 0.92),
        "pattern_confidence_range": (0.87, 0.93),
        "success_rate_range": (0.79, 0.87),
        "cost_savings_range": (5500, 8500),
        "resolution_time_range": (18, 28),
        "affected_users_range": (25000, 45000),
        "tags": ["storage", "disk", "io", "saturation"],
    },
}
|
|
def get_scenario_config(scenario_name: str) -> Dict[str, Any]:
    """
    Get the tuning configuration for a scenario, falling back to defaults.

    Args:
        scenario_name: Key to look up in SCENARIO_CONFIGS

    Returns:
        A dictionary of "*_range" pairs plus a "tags" list. For a known
        scenario this is a copy of the SCENARIO_CONFIGS entry with its list
        values duplicated; for an unknown name it is a freshly built set of
        generic defaults. Either way the caller may mutate the result
        without affecting SCENARIO_CONFIGS.
    """
    known = SCENARIO_CONFIGS.get(scenario_name)
    if known is None:
        return {
            "detection_confidence_range": (0.90, 0.98),
            "detection_time_range": (30, 60),
            "accuracy_range": (0.92, 0.98),
            "similar_incidents_range": (1, 3),
            "similarity_score_range": (0.85, 0.95),
            "pattern_confidence_range": (0.85, 0.95),
            "success_rate_range": (0.75, 0.90),
            "cost_savings_range": (4000, 8000),
            "resolution_time_range": (15, 30),
            "affected_users_range": (20000, 50000),
            "tags": ["unknown", "incident"],
        }

    # Other functions in this module hand config["tags"] straight to their
    # callers; copying list values here keeps the module-level table from
    # being modified through those results. Tuples are immutable and can be
    # shared safely.
    return {
        key: list(value) if isinstance(value, list) else value
        for key, value in known.items()
    }
|
|
def simulate_arf_analysis(scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Produce a mock ARF analysis result using the scenario's configured ranges.

    Args:
        scenario_data: Scenario description; the "name", "severity" and
            "component" keys are read, each with a fallback when missing

    Returns:
        Dictionary with the sampled confidence, detection time (both in
        seconds and as that value times 1000) and accuracy, plus the
        scenario's severity, component, name and tags
    """
    name = scenario_data.get("name", "Unknown Scenario")
    cfg = get_scenario_config(name)

    # Draw order (confidence, time, accuracy) is kept stable so seeded runs
    # reproduce the same values.
    confidence = round(random.uniform(*cfg["detection_confidence_range"]), 3)
    detection_seconds = random.randint(*cfg["detection_time_range"])
    accuracy = round(random.uniform(*cfg["accuracy_range"]), 3)

    result: Dict[str, Any] = {
        "analysis_complete": True,
        "anomaly_detected": True,
    }
    result["severity"] = scenario_data.get("severity", "HIGH")
    result["confidence"] = confidence
    result["detection_time_ms"] = detection_seconds * 1000
    result["detection_time_seconds"] = detection_seconds
    result["accuracy"] = accuracy
    result["component"] = scenario_data.get("component", "unknown")
    result["scenario_specific"] = True
    result["scenario_name"] = name
    result["tags"] = cfg["tags"]
    return result
|
|
def run_rag_similarity_search(scenario_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Simulate RAG similarity search with scenario-specific results

    The number of incidents and each incident's similarity score, cost
    savings, resolution time, detection time and affected-user count are
    drawn from the scenario's configured ranges. The resolution label is
    picked from a keyword-matched list based on the scenario name.

    Args:
        scenario_data: Dictionary containing scenario information; the
            "name" and "component" keys are read, each with a fallback

    Returns:
        List of similar-incident dictionaries
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)

    # The scenario name does not change between incidents, so resolve the
    # candidate resolutions once. Entries are checked in order and the first
    # keyword hit wins.
    lowered_name = scenario_name.lower()
    resolution_playbooks = (
        (("cache", "redis"), ["scale_out", "warm_cache", "memory_increase", "add_replicas"]),
        (("database",), ["restart", "connection_pool_resize", "index_optimization", "vacuum"]),
        (("kubernetes",), ["restart_pod", "memory_limit_increase", "node_drain", "resource_quota"]),
        (("api",), ["circuit_breaker", "rate_limit_increase", "caching", "load_balancer"]),
        (("network",), ["route_update", "failover", "bandwidth_increase", "redundancy"]),
        (("storage",), ["io_optimization", "disk_upgrade", "cache_addition", "load_distribution"]),
    )
    resolution_options = next(
        (
            options
            for keywords, options in resolution_playbooks
            if any(keyword in lowered_name for keyword in keywords)
        ),
        ["investigate", "scale", "restart", "optimize"],
    )
    pattern_prefix = lowered_name.replace(' ', '_')
    component = scenario_data.get("component", "unknown")

    similar_count = random.randint(*config["similar_incidents_range"])
    similar_incidents = []

    # Incident ids embed the current epoch second minus a small random offset.
    base_time = int(time.time())

    for i in range(similar_count):
        similarity_score = random.uniform(*config["similarity_score_range"])
        cost_savings = random.randint(*config["cost_savings_range"])
        resolution_time = random.randint(*config["resolution_time_range"])
        affected_users = random.randint(*config["affected_users_range"])
        resolution = random.choice(resolution_options)

        similar_incidents.append({
            "incident_id": f"inc_{base_time - random.randint(1, 90)}_00{i}",
            "similarity_score": round(similarity_score, 3),
            # Roughly 85% of generated incidents are marked successful.
            "success": random.random() > 0.15,
            "resolution": resolution,
            "cost_savings": cost_savings,
            # Use the scenario's own detection window, like every other
            # per-incident metric, instead of a fixed 30-60 second range.
            "detection_time": f"{random.randint(*config['detection_time_range'])}s",
            "resolution_time": f"{resolution_time}m",
            "pattern": f"{pattern_prefix}_v{random.randint(1, 3)}",
            "affected_users": affected_users,
            "component_match": component,
            "rag_source": "production_memory_v3",
            "timestamp": f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}",
        })

    return similar_incidents
|
|
def calculate_pattern_confidence(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> float:
    """
    Calculate pattern confidence based on similar incidents

    With incidents available, the score is 60% mean similarity plus 40%
    fraction of successful incidents, clamped into the scenario's configured
    pattern-confidence range. With no incidents, a value is sampled from that
    range instead. Both paths return a value rounded to three decimals.

    Args:
        scenario_data: Dictionary containing scenario information
        similar_incidents: List of similar incidents from RAG search; each
            entry must provide "similarity_score" and "success"

    Returns:
        Pattern confidence score rounded to three decimals
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)
    min_conf, max_conf = config["pattern_confidence_range"]

    if not similar_incidents:
        # Round here as well so the fallback has the same precision as the
        # computed path.
        return round(random.uniform(min_conf, max_conf), 3)

    incident_count = len(similar_incidents)
    avg_similarity = sum(inc["similarity_score"] for inc in similar_incidents) / incident_count
    avg_success = sum(1.0 if inc["success"] else 0.0 for inc in similar_incidents) / incident_count

    # Similarity is weighted more heavily than historical success.
    confidence = (avg_similarity * 0.6) + (avg_success * 0.4)

    # Keep the blended score inside the scenario's configured bounds.
    confidence = max(min_conf, min(max_conf, confidence))

    return round(confidence, 3)
|
|
def _plan_healing_action(component: str):
    """Pick the healing action and its randomized parameters for a component name."""
    kind = component.lower()

    if "cache" in kind or "redis" in kind:
        return "scale_out", {
            "nodes": f"{random.randint(2, 4)}→{random.randint(5, 8)}",
            "memory": f"{random.randint(8, 16)}GB→{random.randint(24, 64)}GB",
            "strategy": "gradual_scale",
            "region": "auto-select",
        }

    if "database" in kind:
        return "restart", {
            "connections": f"{random.randint(50, 100)}→{random.randint(150, 300)}",
            "timeout": f"{random.randint(30, 60)}s",
            "strategy": "rolling_restart",
            "maintenance_window": "immediate",
        }

    if "kubernetes" in kind:
        return "memory_limit_increase", {
            "memory": f"{random.randint(512, 1024)}Mi→{random.randint(2048, 4096)}Mi",
            "strategy": "pod_restart",
            "drain_timeout": f"{random.randint(5, 15)}m",
        }

    if "api" in kind:
        return "circuit_breaker", {
            "threshold": f"{random.randint(70, 85)}%",
            "window": f"{random.randint(3, 10)}m",
            "fallback": "cached_response",
            "retry_after": f"{random.randint(30, 120)}s",
        }

    if "network" in kind:
        return "failover", {
            "primary": "us-east-1",
            "secondary": "us-west-2",
            "timeout": f"{random.randint(10, 30)}s",
            "health_check": "enhanced",
        }

    if "storage" in kind:
        return "io_optimization", {
            "iops": f"{random.randint(1000, 3000)}→{random.randint(5000, 10000)}",
            "throughput": f"{random.randint(100, 250)}MB/s→{random.randint(500, 1000)}MB/s",
            "cache_size": f"{random.randint(8, 16)}GB→{random.randint(32, 64)}GB",
        }

    # No keyword matched: fall back to a manual investigation.
    return "investigate", {
        "priority": "high",
        "escalation": "tier2",
        "timeout": "30m",
    }


def create_mock_healing_intent(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]], confidence: float) -> Dict[str, Any]:
    """
    Build a mock healing intent for a scenario.

    The action and its parameters are chosen from keywords in the scenario's
    component name. Success rate, cost savings and resolution time are
    averaged over the similar incidents when there are any; otherwise they
    come from the scenario's configured ranges.

    Args:
        scenario_data: Scenario description; "name" and "component" are read
        similar_incidents: Incidents from the RAG search; each entry must
            provide "success", "cost_savings" and a "resolution_time" string
            such as "12m"
        confidence: Pattern confidence score

    Returns:
        Healing intent dictionary
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)
    component = scenario_data.get("component", "unknown")

    action, parameters = _plan_healing_action(component)

    if similar_incidents:
        incident_total = len(similar_incidents)
        success_rate = sum(1 for inc in similar_incidents if inc["success"]) / incident_total
        avg_cost_savings = sum(inc["cost_savings"] for inc in similar_incidents) / incident_total
        # resolution_time is stored as text like "12m"; drop the unit first.
        avg_resolution_time = sum(
            int(inc["resolution_time"].replace('m', '')) for inc in similar_incidents
        ) / incident_total
    else:
        success_rate = random.uniform(*config["success_rate_range"])
        avg_cost_savings = sum(config["cost_savings_range"]) / 2
        avg_resolution_time = sum(config["resolution_time_range"]) / 2

    # Values are computed in the same order as the result keys so seeded
    # runs keep drawing the same random numbers.
    rounded_confidence = round(confidence, 3)
    rounded_success_rate = round(success_rate, 3)
    estimated_impact = {
        "cost_savings": int(avg_cost_savings),
        "resolution_time_minutes": int(avg_resolution_time),
        "users_protected": random.randint(*config["affected_users_range"]),
        "mttr_reduction": f"{random.randint(60, 80)}%",
    }
    safety_checks = {
        "blast_radius": f"{random.randint(1, 3)} services",
        "business_hours": "compliant",
        "rollback_plan": "available",
        "approval_required": True,
        "risk_level": "medium" if confidence < 0.9 else "low",
    }

    return {
        "action": action,
        "component": component,
        "confidence": rounded_confidence,
        "parameters": parameters,
        "source": "mock_analysis",
        "requires_enterprise": True,
        "advisory_only": True,
        "success_rate": rounded_success_rate,
        "estimated_impact": estimated_impact,
        "safety_checks": safety_checks,
        "scenario_specific": True,
        "scenario_name": scenario_name,
    }
|
|
def get_scenario_metrics(scenario_name: str) -> Dict[str, Any]:
    """
    Sample one set of headline metrics for a scenario.

    Args:
        scenario_name: Name of the scenario

    Returns:
        Dictionary with values freshly sampled from the scenario's configured
        ranges, followed by the cost-savings, resolution-time and
        affected-users ranges and the tags passed through unchanged
    """
    cfg = get_scenario_config(scenario_name)

    def sample_score(range_key: str) -> float:
        # Fractional metric, rounded to three decimals.
        return round(random.uniform(*cfg[range_key]), 3)

    def sample_count(range_key: str) -> int:
        # Integer metric, inclusive of both range ends.
        return random.randint(*cfg[range_key])

    metrics: Dict[str, Any] = {
        "detection_confidence": sample_score("detection_confidence_range"),
        "detection_time_seconds": sample_count("detection_time_range"),
        "accuracy": sample_score("accuracy_range"),
        "expected_similar_incidents": sample_count("similar_incidents_range"),
        "avg_similarity_score": sample_score("similarity_score_range"),
        "pattern_confidence": sample_score("pattern_confidence_range"),
        "success_rate": sample_score("success_rate_range"),
    }

    # These entries are reported exactly as configured.
    for passthrough_key in ("cost_savings_range", "resolution_time_range", "affected_users_range", "tags"):
        metrics[passthrough_key] = cfg[passthrough_key]

    return metrics