Spaces:
Runtime error
Runtime error
| # demo/mock_arf.py | |
| """ | |
| Enhanced Mock ARF with scenario-aware metrics | |
| Generates different values based on scenario characteristics | |
| DOCTRINAL COMPLIANCE VERSION 3.3.9+restraint | |
| Key Addition: Explicit Observation Gate for psychological advantage | |
| """ | |
| import random | |
| import time | |
| import datetime | |
| from typing import Dict, Any, List | |
| import json | |
# Scenario-specific configurations.
# Each entry defines the (min, max) sampling ranges used by the mock
# analysis/RAG/metrics functions below, plus classification tags.
# Unknown scenario names fall back to the defaults in get_scenario_config().
SCENARIO_CONFIGS = {
    "Cache Miss Storm": {
        "detection_confidence_range": (0.97, 0.995),  # 97-99.5%
        "detection_time_range": (35, 55),  # 35-55 seconds
        "accuracy_range": (0.97, 0.995),  # 97-99.5%
        "similar_incidents_range": (2, 5),  # 2-5 similar incidents
        "similarity_score_range": (0.88, 0.96),  # 88-96% similarity
        "pattern_confidence_range": (0.91, 0.97),  # 91-97% confidence
        "success_rate_range": (0.82, 0.93),  # 82-93% success rate
        "cost_savings_range": (5000, 9000),  # $5K-$9K savings
        "resolution_time_range": (10, 18),  # 10-18 minutes
        "affected_users_range": (30000, 60000),  # 30K-60K users
        "tags": ["cache", "redis", "latency", "memory"]
    },
    "Database Connection Pool Exhaustion": {
        "detection_confidence_range": (0.92, 0.98),
        "detection_time_range": (40, 65),
        "accuracy_range": (0.95, 0.985),
        "similar_incidents_range": (1, 4),
        "similarity_score_range": (0.85, 0.94),
        "pattern_confidence_range": (0.88, 0.95),
        "success_rate_range": (0.78, 0.88),
        "cost_savings_range": (3500, 5500),
        "resolution_time_range": (15, 25),
        "affected_users_range": (15000, 30000),
        "tags": ["database", "postgres", "connections", "pool"]
    },
    "Kubernetes Memory Leak": {
        "detection_confidence_range": (0.94, 0.99),
        "detection_time_range": (30, 50),
        "accuracy_range": (0.96, 0.99),
        "similar_incidents_range": (3, 6),
        "similarity_score_range": (0.89, 0.95),
        "pattern_confidence_range": (0.90, 0.96),
        "success_rate_range": (0.85, 0.92),
        "cost_savings_range": (4500, 7500),
        "resolution_time_range": (12, 22),
        "affected_users_range": (20000, 40000),
        "tags": ["kubernetes", "memory", "container", "leak"]
    },
    "API Rate Limit Storm": {
        "detection_confidence_range": (0.96, 0.99),
        "detection_time_range": (25, 45),
        "accuracy_range": (0.97, 0.99),
        "similar_incidents_range": (2, 4),
        "similarity_score_range": (0.87, 0.93),
        "pattern_confidence_range": (0.89, 0.94),
        "success_rate_range": (0.80, 0.90),
        "cost_savings_range": (3000, 5000),
        "resolution_time_range": (8, 15),
        "affected_users_range": (10000, 25000),
        "tags": ["api", "rate_limit", "throttling", "ddos"]
    },
    "Network Partition": {
        # Highest detection confidence / largest blast radius of the set.
        "detection_confidence_range": (0.98, 0.999),
        "detection_time_range": (20, 40),
        "accuracy_range": (0.98, 0.995),
        "similar_incidents_range": (1, 3),
        "similarity_score_range": (0.90, 0.97),
        "pattern_confidence_range": (0.93, 0.98),
        "success_rate_range": (0.75, 0.85),
        "cost_savings_range": (8000, 15000),
        "resolution_time_range": (20, 35),
        "affected_users_range": (50000, 100000),
        "tags": ["network", "partition", "connectivity", "failure"]
    },
    "Storage I/O Saturation": {
        "detection_confidence_range": (0.93, 0.98),
        "detection_time_range": (45, 70),
        "accuracy_range": (0.94, 0.98),
        "similar_incidents_range": (2, 5),
        "similarity_score_range": (0.86, 0.92),
        "pattern_confidence_range": (0.87, 0.93),
        "success_rate_range": (0.79, 0.87),
        "cost_savings_range": (5500, 8500),
        "resolution_time_range": (18, 28),
        "affected_users_range": (25000, 45000),
        "tags": ["storage", "disk", "io", "saturation"]
    }
}
# Fallback ranges used for any scenario name not present in SCENARIO_CONFIGS.
# Hoisted to module level so the dict literal is not rebuilt on every call
# (the previous `.get(name, {...})` form evaluated it eagerly even on hits).
_DEFAULT_SCENARIO_CONFIG: Dict[str, Any] = {
    "detection_confidence_range": (0.90, 0.98),
    "detection_time_range": (30, 60),
    "accuracy_range": (0.92, 0.98),
    "similar_incidents_range": (1, 3),
    "similarity_score_range": (0.85, 0.95),
    "pattern_confidence_range": (0.85, 0.95),
    "success_rate_range": (0.75, 0.90),
    "cost_savings_range": (4000, 8000),
    "resolution_time_range": (15, 30),
    "affected_users_range": (20000, 50000),
    "tags": ["unknown", "incident"]
}


def get_scenario_config(scenario_name: str) -> Dict[str, Any]:
    """Get configuration for a specific scenario with defaults.

    Args:
        scenario_name: Key into SCENARIO_CONFIGS.
    Returns:
        The scenario's config dict, or a shallow copy of the default
        config when the name is unknown (copied so callers cannot
        mutate the shared template).
    """
    config = SCENARIO_CONFIGS.get(scenario_name)
    if config is not None:
        return config
    return dict(_DEFAULT_SCENARIO_CONFIG)
def simulate_arf_analysis(scenario_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Simulate ARF analysis with scenario-specific metrics.

    Args:
        scenario_data: Dictionary containing scenario information
    Returns:
        Dictionary with analysis results
    """
    name = scenario_data.get("name", "Unknown Scenario")
    cfg = get_scenario_config(name)

    # Draw scenario-specific values from the configured ranges.
    detection_conf = random.uniform(*cfg["detection_confidence_range"])
    seconds_to_detect = random.randint(*cfg["detection_time_range"])
    accuracy_score = random.uniform(*cfg["accuracy_range"])

    result: Dict[str, Any] = {
        "analysis_complete": True,
        "anomaly_detected": True,
        # Severity defaults to "HIGH_VARIANCE" when the scenario omits it.
        "severity": scenario_data.get("severity", "HIGH_VARIANCE"),
        "confidence": round(detection_conf, 3),
        "detection_time_ms": seconds_to_detect * 1000,  # ms for display
        "detection_time_seconds": seconds_to_detect,
        "accuracy": round(accuracy_score, 3),
        "component": scenario_data.get("component", "unknown"),
        "scenario_specific": True,
        "scenario_name": name,
        "tags": cfg["tags"],
    }
    return result
def run_rag_similarity_search(scenario_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Simulate RAG similarity search with scenario-specific results.

    Args:
        scenario_data: Dictionary containing scenario information
    Returns:
        List of similar incidents
    """
    name = scenario_data.get("name", "Unknown Scenario")
    cfg = get_scenario_config(name)
    lowered = name.lower()

    # Resolution candidates per scenario family, checked in declaration order;
    # first keyword hit wins, otherwise the generic pool is used.
    resolution_table = [
        (("cache", "redis"), ["scale_out", "warm_cache", "memory_increase", "add_replicas"]),
        (("database",), ["restart", "connection_pool_resize", "index_optimization", "vacuum"]),
        (("kubernetes",), ["restart_pod", "memory_limit_increase", "node_drain", "resource_quota"]),
        (("api",), ["circuit_breaker", "rate_limit_increase", "caching", "load_balancer"]),
        (("network",), ["route_update", "failover", "bandwidth_increase", "redundancy"]),
        (("storage",), ["io_optimization", "disk_upgrade", "cache_addition", "load_distribution"]),
    ]
    resolution_pool = ["investigate", "scale", "restart", "optimize"]
    for keywords, pool in resolution_table:
        if any(term in lowered for term in keywords):
            resolution_pool = pool
            break

    incident_count = random.randint(*cfg["similar_incidents_range"])
    base_time = int(time.time())
    pattern_stub = lowered.replace(' ', '_')

    incidents: List[Dict[str, Any]] = []
    for idx in range(incident_count):
        sim_score = random.uniform(*cfg["similarity_score_range"])
        savings = random.randint(*cfg["cost_savings_range"])
        minutes_to_resolve = random.randint(*cfg["resolution_time_range"])
        users_hit = random.randint(*cfg["affected_users_range"])
        chosen_resolution = random.choice(resolution_pool)
        incidents.append({
            "incident_id": f"inc_{base_time - random.randint(1, 90)}_00{idx}",
            "similarity_score": round(sim_score, 3),
            "success": random.random() > 0.15,  # 85% success rate
            "resolution": chosen_resolution,
            "cost_savings": savings,
            "detection_time": f"{random.randint(30, 60)}s",
            "resolution_time": f"{minutes_to_resolve}m",
            "pattern": f"{pattern_stub}_v{random.randint(1, 3)}",
            "affected_users": users_hit,
            "component_match": scenario_data.get("component", "unknown"),
            "rag_source": "production_memory_v3",
            "timestamp": f"2024-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}"
        })
    return incidents
def calculate_pattern_confidence(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> float:
    """
    Calculate pattern confidence based on similar incidents.

    Confidence is a weighted blend of average similarity (60%) and average
    historical success (40%), clamped to the scenario's configured range.

    Args:
        scenario_data: Dictionary containing scenario information
        similar_incidents: List of similar incidents from RAG search
    Returns:
        Pattern confidence score (0-1), rounded to 3 decimals in all branches.
    """
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)
    if not similar_incidents:
        # No evidence: fall back to a random draw within the scenario range.
        # Fix: round here as well so both branches report 3-decimal precision
        # (previously only the evidence-based branch was rounded).
        return round(random.uniform(*config["pattern_confidence_range"]), 3)
    # Average similarity and historical success across the retrieved incidents.
    count = len(similar_incidents)
    avg_similarity = sum(inc["similarity_score"] for inc in similar_incidents) / count
    avg_success = sum(1.0 if inc["success"] else 0.0 for inc in similar_incidents) / count
    # Weighted average: 60% similarity, 40% success rate.
    confidence = (avg_similarity * 0.6) + (avg_success * 0.4)
    # Clamp to the scenario's configured confidence range. (An earlier comment
    # promised added randomness here, but the code is a pure clamp.)
    min_conf, max_conf = config["pattern_confidence_range"]
    return round(max(min_conf, min(max_conf, confidence)), 3)
def calculate_internal_success_rate(similar_incidents: List[Dict[str, Any]]) -> float:
    """
    Calculate success rate for internal logic only.

    Not for UI display in Decision View.
    Doctrinal: Percentages invite debate, narratives shut it down.
    Keep this internal for logic, surface only in Outcome View.

    Args:
        similar_incidents: Incident dicts; an incident counts as successful
            when its "success" key is truthy (missing key counts as failure).
    Returns:
        Fraction of successful incidents rounded to 3 decimals; 0.0 for
        an empty list.
    """
    if not similar_incidents:
        return 0.0
    successes = [inc for inc in similar_incidents if inc.get("success", False)]
    return round(len(successes) / len(similar_incidents), 3)
def check_contraindications(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Check for contraindications based on retry amplification signatures and
    historical evidence.

    Args:
        scenario_data: Scenario dict; reads "telemetry" (retry_storm flag)
            and "metrics" (amplification_factor, database_load_percent).
        similar_incidents: Historical incidents; scaling-style resolutions
            that failed count as contraindication evidence.
    Returns:
        Dictionary with contraindication analysis: detection flags, combined
        evidence strings, contraindicated action names, and a confidence score.
    """
    # --- Retry amplification signatures from live telemetry/metrics ---
    retry_amplification = False
    evidence: List[str] = []

    telemetry = scenario_data.get("telemetry", {})
    if telemetry.get("retry_storm", False):
        retry_amplification = True
        evidence.append("Telemetry shows retry_storm: True")

    metrics = scenario_data.get("metrics", {})
    amplification_factor = metrics.get("amplification_factor", 1.0)
    if amplification_factor > 2.0:
        retry_amplification = True
        evidence.append(f"Amplification factor {amplification_factor} > 2.0")

    db_load = metrics.get("database_load_percent", 0)
    if db_load > 85:
        retry_amplification = True
        evidence.append(f"Database load {db_load}% > 85%")

    # --- Historical evidence: scaling-first resolutions that failed ---
    historical_scaling_failures = False
    scaling_failure_evidence: List[str] = []
    for incident in similar_incidents:
        resolution = incident.get("resolution", "").lower()
        success = incident.get("success", True)
        if any(scale_term in resolution for scale_term in ["scale", "increase", "add_replicas"]):
            if not success:
                historical_scaling_failures = True
                scaling_failure_evidence.append(
                    f"{incident.get('timestamp', 'Unknown date')}: {resolution} failed"
                )

    contraindicated_actions = []
    if retry_amplification or historical_scaling_failures:
        contraindicated_actions.append("scale_during_retry_amplification")

    combined_evidence = evidence + scaling_failure_evidence
    return {
        "retry_amplification": retry_amplification,
        "historical_scaling_failures": historical_scaling_failures,
        "evidence": combined_evidence,
        "contraindicated_actions": contraindicated_actions,
        # Fix: confidence previously ignored scaling_failure_evidence, so a
        # historical-failure-only contraindication reported confidence 0.0.
        "confidence": 0.92 if combined_evidence else 0.0
    }
def create_mock_healing_intent(scenario_data: Dict[str, Any], similar_incidents: List[Dict[str, Any]], confidence: float) -> Dict[str, Any]:
    """
    Create doctrinally compliant healing intent with sequencing thesis enforcement.

    Doctrinal Addition: Explicit Observation Gate when contraindications exist OR
    confidence < threshold.
    Psychological Goal: Make inaction an explicit, powerful decision.

    Three mutually exclusive return paths, evaluated in order:
      1. Observation gate (observe-only deferral) when contraindicated,
         confidence is below 0.70, or fewer than 2 similar incidents exist.
      2. Dampening-first intent when retry amplification is detected but the
         observation gate was passed.
      3. Ranked-action advisory intent (dampening -> concurrency control ->
         observation -> optional scaling).

    Args:
        scenario_data: Scenario dict (reads "name", "component", "metrics").
        similar_incidents: Historical incidents from the RAG search.
        confidence: Pattern confidence in [0, 1] driving gate/scaling decisions.
    Returns:
        Healing-intent dictionary; keys prefixed with "_" are internal-only.
    """
    # Check for contraindications FIRST (doctrinal constraint)
    contraindications = check_contraindications(scenario_data, similar_incidents)
    scenario_name = scenario_data.get("name", "Unknown Scenario")
    config = get_scenario_config(scenario_name)
    component = scenario_data.get("component", "unknown")

    # ============ OBSERVATION GATE LOGIC ============
    # Key psychological addition: Explicit deferral when uncertainty is high
    observation_gate_threshold = 0.70  # Below this, we observe first
    should_observe_first = (
        contraindications["retry_amplification"] or
        contraindications["historical_scaling_failures"] or
        confidence < observation_gate_threshold or
        len(similar_incidents) < 2  # Insufficient historical evidence
    )
    if should_observe_first:
        # Return OBSERVATION GATE state - intentional inaction
        # NOTE(review): datetime.now() is naive local time — confirm whether
        # decision_frozen_until should be timezone-aware/UTC.
        current_time = datetime.datetime.now()
        next_evaluation = current_time + datetime.timedelta(minutes=5)
        return {
            "action": "defer_decision_for_trend_confirmation",
            "component": component,
            "confidence": round(confidence, 3),
            "parameters": {
                "observation_window": "5m",
                "metrics_to_watch": ["retry_count", "database_load_percent", "error_rate"],
                "trend_threshold": "stabilizing_or_declining"
            },
            "source": "observation_gate_logic",
            "requires_enterprise": False,
            "advisory_only": True,
            # CRITICAL PSYCHOLOGICAL FIELDS
            "execution_state": "observe_only",
            "next_evaluation_window": "5m",
            "decision_frozen_until": next_evaluation.isoformat(),
            # First matching reason wins, mirroring the gate condition order
            # (low confidence > retry amplification > historical failures >
            # insufficient evidence).
            "deferral_reason": "uncertainty_too_high_for_action" if confidence < observation_gate_threshold else
                               "contraindications_present" if contraindications["retry_amplification"] else
                               "historical_failures_detected" if contraindications["historical_scaling_failures"] else
                               "insufficient_historical_evidence",
            # FORMAL HEALINGINTENT FIELDS
            # Single precondition string, selected with the same priority
            # order as deferral_reason above.
            "preconditions": [
                f"Confidence threshold not met ({confidence:.2f} < {observation_gate_threshold})" if confidence < observation_gate_threshold else
                "Retry amplification detected" if contraindications["retry_amplification"] else
                "Historical scaling failures present" if contraindications["historical_scaling_failures"] else
                "Insufficient similar incidents for pattern matching"
            ],
            "contraindicated_actions": ["any_healing_action_during_high_uncertainty"],
            "reversibility_statement": "Evaluation resumes automatically after 5-minute observation window",
            "sequencing_rule": "observe_before_any_action_when_uncertain",
            "historical_evidence": [
                f"{len(similar_incidents)} similar incidents analyzed (minimum 2 required)",
                "Observation-first reduces incorrect actions by 67% (historical analysis)"
            ],
            # SUCCESS RATE HANDLING (kept internal, not surfaced early)
            "_internal_success_rate": calculate_internal_success_rate(similar_incidents) if similar_incidents else 0.0,
            "_internal_notes": "Success rate kept internal; percentages invite debate, narratives shut it down",
            "scenario_specific": True,
            "scenario_name": scenario_name
        }

    # If retry amplification detected (but passed observation gate threshold),
    # enforce dampening-first logic.
    # NOTE(review): given the gate above returns on retry_amplification, this
    # branch appears unreachable as written — confirm intended gate semantics.
    if contraindications["retry_amplification"]:
        return {
            "action": "implement_request_coalescing_with_exponential_backoff",
            "component": component,
            "confidence": max(confidence, 0.85),  # High confidence for dampening-first
            "parameters": {
                "coalescing_window_ms": "100-500ms",
                "backoff_factor": "exponential",
                "max_retries": 3,
                "timeout": "10m"
            },
            "source": "contraindication_detection",
            "requires_enterprise": False,
            "advisory_only": False,
            # CRITICAL: Add observation window even for dampening actions
            "post_action_observation": {
                "required": True,
                "duration": "5m",
                "metrics": ["retry_count", "database_load_percent", "latency_p99"]
            },
            "success_rate": 0.88,
            "estimated_impact": {
                "cost_savings": 4500,
                "resolution_time_minutes": 12,
                "users_protected": random.randint(*config["affected_users_range"]),
                "mttr_reduction": "73%"
            },
            "safety_checks": {
                "blast_radius": "single_service",
                "business_hours": "compliant",
                "rollback_plan": "coalescing_disable",
                "approval_required": False,
                "risk_level": "low"
            },
            # FORMAL HEALINGINTENT FIELDS (doctrinal constraint)
            "preconditions": [
                "Retry amplification signature detected",
                f"Amplification factor > {scenario_data.get('metrics', {}).get('amplification_factor', 2.0)}",
                "Database load > 85%"
            ],
            "contraindicated_actions": ["scale_during_retry_storm", "add_capacity_during_amplification"],
            "reversibility_statement": "Remove coalescing window after 10 minutes of stable operation",
            "sequencing_rule": "dampening_first_then_observe_then_optional_scale",
            "historical_evidence": contraindications["evidence"][:3],  # Top 3 evidence items
            "scenario_specific": True,
            "scenario_name": scenario_name
        }

    # Only proceed with normal logic if no contraindications AND passed observation gate.
    # Determine action based on component and scenario WITH sequencing logic.
    ranked_actions = []

    # DAMPENING actions (always first in sequence)
    dampening_actions = []
    if "api" in component.lower() or "rate" in scenario_name.lower():
        dampening_actions.append({
            "action": "circuit_breaker",
            "confidence": confidence * 0.95,  # Slightly lower confidence for dampening
            "parameters": {
                "threshold": f"{random.randint(70, 85)}%",
                "window": f"{random.randint(3, 10)}m",
                "fallback": "cached_response",
                "retry_after": f"{random.randint(30, 120)}s"
            }
        })
    # Add general dampening for retry-prone scenarios
    if any(term in component.lower() for term in ["redis", "cache", "database"]):
        dampening_actions.append({
            "action": "request_batching_with_timeout",
            "confidence": confidence * 0.92,
            "parameters": {
                "batch_size": "10-50 requests",
                "timeout_ms": "100ms",
                "strategy": "adaptive"
            }
        })
    # Add dampening actions to ranked list (rank = current list length + 1)
    for i, act in enumerate(dampening_actions):
        ranked_actions.append({
            "rank": len(ranked_actions) + 1,
            "action": act["action"],
            "confidence": round(act["confidence"], 3),
            "parameters": act["parameters"],
            "category": "dampening"
        })

    # CONCURRENCY CAP actions (second in sequence)
    if "database" in component.lower():
        ranked_actions.append({
            "rank": len(ranked_actions) + 1,
            "action": "connection_pool_limit_adjustment",
            "confidence": confidence * 0.88,
            "parameters": {
                "max_connections": f"{random.randint(100, 200)}",
                "timeout": f"{random.randint(30, 60)}s"
            },
            "category": "concurrency_control"
        })

    # OBSERVE actions (third in sequence) — always appended
    ranked_actions.append({
        "rank": len(ranked_actions) + 1,
        "action": "enhanced_monitoring_with_telemetry",
        "confidence": confidence * 0.85,
        "parameters": {
            "duration": "5m",
            "metrics": ["latency_p99", "error_rate", "throughput"],
            "alert_threshold": "2x_baseline"
        },
        "category": "observation"
    })

    # SCALING actions (ONLY if no contraindications AND last in sequence)
    # AND only if confidence justifies scaling over dampening
    scaling_confidence_threshold = 0.75  # Scaling requires higher confidence
    if confidence > scaling_confidence_threshold and not contraindications["historical_scaling_failures"]:
        if "cache" in component.lower() or "redis" in component.lower():
            scaling_action = {
                "rank": len(ranked_actions) + 1,
                "action": "gradual_scale_out",
                "confidence": confidence * 0.80,  # Lower confidence than dampening
                "parameters": {
                    "nodes": f"{random.randint(2, 4)}→{random.randint(4, 6)}",
                    "strategy": "one_by_one",
                    "health_check_interval": "30s"
                },
                "category": "scaling",
                "constraints": ["Only if dampening insufficient after 5 minutes"]
            }
            ranked_actions.append(scaling_action)

    # Calculate success rate internally only (random fallback when no history).
    _internal_success_rate = calculate_internal_success_rate(similar_incidents) if similar_incidents else random.uniform(*config["success_rate_range"])

    # Calculate estimated impact: averages from history, or range midpoints.
    if similar_incidents:
        avg_cost_savings = sum(inc["cost_savings"] for inc in similar_incidents) / len(similar_incidents)
        # resolution_time values look like "12m"; strip the unit suffix.
        avg_resolution_time = sum(int(inc["resolution_time"].replace('m', '')) for inc in similar_incidents) / len(similar_incidents)
    else:
        avg_cost_savings = sum(config["cost_savings_range"]) / 2
        avg_resolution_time = sum(config["resolution_time_range"]) / 2

    # Primary action is first in ranked_actions (dampening-first). The
    # fallback dict is unreachable in practice since an observation action
    # is always appended above, but kept defensively.
    primary_action = ranked_actions[0] if ranked_actions else {
        "action": "investigate",
        "confidence": confidence,
        "parameters": {"priority": "high"}
    }

    return {
        "action": primary_action["action"],
        "component": component,
        "confidence": round(confidence, 3),
        "parameters": primary_action.get("parameters", {}),
        "source": "sequencing_analysis",
        "requires_enterprise": True,
        "advisory_only": True,
        # SUCCESS RATE: Internal only, not for UI display in Decision View
        "_internal_success_rate": _internal_success_rate,
        "_internal_notes": "Success rate for internal logic; surface narrative outcomes, not percentages",
        "estimated_impact": {
            "cost_savings": int(avg_cost_savings),
            "resolution_time_minutes": int(avg_resolution_time),
            "users_protected": random.randint(*config["affected_users_range"]),
            "mttr_reduction": f"{random.randint(60, 80)}%"
        },
        "safety_checks": {
            "blast_radius": f"{random.randint(1, 3)} services",
            "business_hours": "compliant",
            "rollback_plan": "available",
            "approval_required": True,
            "risk_level": "medium" if confidence < 0.9 else "low"
        },
        # FORMAL HEALINGINTENT FIELDS (doctrinal constraint)
        "preconditions": [
            f"Component: {component}",
            f"Confidence threshold > {scaling_confidence_threshold}",
            "No retry amplification detected",
            "Historical scaling success rate > 70%"
        ],
        "contraindicated_actions": contraindications["contraindicated_actions"],
        "reversibility_statement": f"Rollback to previous configuration available within {random.randint(5, 15)} minutes",
        "sequencing_rule": "dampening_before_concurrency_before_observation_before_scaling",
        "ranked_actions": ranked_actions,
        "historical_evidence": [f"{len(similar_incidents)} similar incidents analyzed"],
        "scenario_specific": True,
        "scenario_name": scenario_name
    }
def get_scenario_metrics(scenario_name: str) -> Dict[str, Any]:
    """
    Get dynamic metrics for a specific scenario.

    Args:
        scenario_name: Name of the scenario
    Returns:
        Dictionary with scenario-specific metrics
    """
    cfg = get_scenario_config(scenario_name)

    def drawn(range_key: str) -> float:
        # Uniform draw within the configured range, rounded for display.
        return round(random.uniform(*cfg[range_key]), 3)

    metrics: Dict[str, Any] = {}
    metrics["detection_confidence"] = drawn("detection_confidence_range")
    metrics["detection_time_seconds"] = random.randint(*cfg["detection_time_range"])
    metrics["accuracy"] = drawn("accuracy_range")
    metrics["expected_similar_incidents"] = random.randint(*cfg["similar_incidents_range"])
    metrics["avg_similarity_score"] = drawn("similarity_score_range")
    metrics["pattern_confidence"] = drawn("pattern_confidence_range")
    metrics["success_rate"] = drawn("success_rate_range")
    # Ranges and tags are passed through unchanged for downstream display.
    metrics["cost_savings_range"] = cfg["cost_savings_range"]
    metrics["resolution_time_range"] = cfg["resolution_time_range"]
    metrics["affected_users_range"] = cfg["affected_users_range"]
    metrics["tags"] = cfg["tags"]
    return metrics
def detect_retry_amplification(telemetry_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Detect retry amplification signatures from telemetry data.

    Doctrinal constraint: Must be REAL detection, not hardcoded in scenarios.

    Args:
        telemetry_data: Dictionary containing telemetry metrics
    Returns:
        Dictionary with detection results
    """
    storm = telemetry_data.get("retry_storm", False)
    retries = telemetry_data.get("retry_count", 0)
    successes = telemetry_data.get("success_count", 1)  # Avoid division by zero
    load_pct = telemetry_data.get("database_load_percent", 0)
    cascade_depth = telemetry_data.get("retry_cascade_depth", 0)

    # Retries per successful request; defaults to 1.0 when no successes.
    amp = retries / successes if successes > 0 else 1.0

    # Any one signature suffices for detection.
    detected = any([
        storm,
        amp > 2.0,
        cascade_depth > 2,
        load_pct > 85,
    ])

    # Classify the signature, most specific first.
    if not detected:
        signature = None
    elif storm and amp > 3.0:
        signature = "exponential_retry_cascade"
    elif load_pct > 85 and amp > 1.5:
        signature = "database_amplified_retry"
    else:
        signature = "retry_amplification_detected"

    # Confidence grows with the number and strength of evidence signals.
    weights: List[float] = []
    if storm:
        weights.append(0.3)
    if amp > 2.0:
        weights.append(0.25 * min(amp / 5.0, 1.0))
    if cascade_depth > 2:
        weights.append(0.2 * min(cascade_depth / 5.0, 1.0))
    if load_pct > 85:
        weights.append(0.25 * min(load_pct / 100.0, 1.0))
    confidence = min(0.98, 0.1 + sum(weights)) if weights else 0.0

    return {
        "detected": detected,
        "amplification_factor": round(amp, 2),
        "signature": signature,
        "confidence": round(confidence, 3),
        "metrics": {
            "retry_storm": storm,
            "retry_count": retries,
            "success_count": successes,
            "database_load_percent": load_pct,
            "retry_cascade_depth": cascade_depth
        },
        "recommendation": "implement_dampening_first" if detected else "proceed_with_caution"
    }