Spaces:
Sleeping
Sleeping
| """ | |
| KubeSimEnv Utils - Probability helpers and simulation utilities | |
| Random failure rate generators, latency simulators, resource usage curves. | |
| Makes the simulation feel realistic and non-deterministic in the right ways. | |
| """ | |
| import numpy as np | |
| import math | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| _RNG = np.random.default_rng() | |
| def set_random_seed(seed: Optional[int]) -> None: | |
| """Set module-level RNG seed for deterministic utility behavior.""" | |
| global _RNG | |
| _RNG = np.random.default_rng(seed) | |
| class ProbabilityHelpers: | |
| """Helpers for generating realistic probabilities and distributions""" | |
| def weighted_random_choice(choices: List[Any], weights: List[float]) -> Any: | |
| """Make a weighted random choice""" | |
| if not choices or not weights or len(choices) != len(weights): | |
| return _RNG.choice(choices) if choices else None | |
| # Normalize weights | |
| total_weight = sum(weights) | |
| if total_weight == 0: | |
| return _RNG.choice(choices) | |
| normalized_weights = [w / total_weight for w in weights] | |
| return _RNG.choice(choices, p=normalized_weights) | |
| def exponential_backoff(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float: | |
| """Calculate exponential backoff delay""" | |
| delay = base_delay * (2 ** attempt) | |
| return min(delay, max_delay) | |
| def poisson_arrival_rate(lambda_rate: float, time_window: float) -> int: | |
| """Generate number of events in time window using Poisson distribution""" | |
| return int(_RNG.poisson(max(lambda_rate * time_window, 0))) | |
| def failure_probability_over_time(base_rate: float, time_elapsed: float, | |
| max_rate: float = 1.0) -> float: | |
| """Calculate failure probability that increases over time""" | |
| probability = base_rate * (1 + math.log(1 + time_elapsed)) | |
| return min(probability, max_rate) | |
| def random_failure_rate(min_rate: float = 0.1, max_rate: float = 0.9) -> float: | |
| """Generate a random failure rate within bounds""" | |
| return float(_RNG.uniform(min_rate, max_rate)) | |
| class LatencySimulator: | |
| """Simulates network and service latency""" | |
| def __init__(self, base_latency_ms: float = 50.0): | |
| self.base_latency_ms = base_latency_ms | |
| self.load_factor = 1.0 | |
| def set_load(self, load_factor: float): | |
| """Set system load factor (1.0 = normal, >1.0 = overloaded)""" | |
| self.load_factor = max(0.1, load_factor) | |
| def get_latency(self) -> float: | |
| """Get simulated latency in milliseconds""" | |
| # Base latency + load-dependent component + random jitter | |
| load_latency = self.base_latency_ms * (self.load_factor - 1.0) * 2 | |
| jitter = float(_RNG.normal(0, self.base_latency_ms * 0.1)) | |
| latency = self.base_latency_ms + max(0, load_latency) + jitter | |
| return max(1.0, latency) # Minimum 1ms latency | |
| def get_latency_with_spike(self, spike_probability: float = 0.05, | |
| spike_multiplier: float = 5.0) -> float: | |
| """Get latency with occasional spikes""" | |
| latency = self.get_latency() | |
| if float(_RNG.random()) < spike_probability: | |
| latency *= spike_multiplier | |
| return latency | |
| class ResourceUsageSimulator: | |
| """Simulates realistic CPU and memory usage patterns""" | |
| def __init__(self): | |
| self.time_offset = float(_RNG.uniform(0, 2 * math.pi)) | |
| def get_cpu_usage(self, base_usage: float = 0.3, | |
| variation: float = 0.2) -> float: | |
| """Get CPU usage as percentage (0-100)""" | |
| # Simulate daily patterns with some noise | |
| time_factor = (datetime.now().timestamp() / 3600) % 24 # Hours in day | |
| daily_pattern = 0.5 * math.sin(2 * math.pi * time_factor / 24) + 0.5 | |
| usage = base_usage + variation * daily_pattern | |
| usage += float(_RNG.normal(0, 0.05)) # Noise | |
| return max(0.0, min(1.0, usage)) * 100 # Clamp to 0-100% | |
| def get_memory_usage(self, base_usage: float = 0.4, | |
| variation: float = 0.15) -> float: | |
| """Get memory usage as percentage (0-100)""" | |
| # Memory usage tends to creep up over time (simulate leak) | |
| time_factor = min((datetime.now().timestamp() / 86400) % 7, 1.0) # Weekly pattern | |
| leak_factor = 0.1 * time_factor # Slow leak over week | |
| usage = base_usage + leak_factor | |
| usage += float(_RNG.normal(0, 0.03)) # Noise | |
| return max(0.0, min(1.0, usage)) * 100 # Clamp to 0-100% | |
| def get_resource_curve(self, resource_type: str, | |
| time_elapsed: float) -> float: | |
| """Get resource usage following a specific curve""" | |
| if resource_type == "cpu": | |
| # CPU: periodic with bursts | |
| return 0.3 + 0.4 * math.sin(time_elapsed / 100) + 0.2 * float(_RNG.random()) | |
| elif resource_type == "memory": | |
| # Memory: gradual increase with occasional GC drops | |
| base = 0.2 + 0.6 * (1 - math.exp(-time_elapsed / 1000)) | |
| gc_drop = 0.3 if float(_RNG.random()) < 0.01 else 0 # Occasional GC | |
| return max(0, base - gc_drop) | |
| elif resource_type == "disk": | |
| # Disk: steady growth | |
| return 0.1 + 0.8 * min(time_elapsed / 10000, 1.0) | |
| else: | |
| return 0.5 | |
| class NetworkSimulator: | |
| """Simulates network conditions and partitions""" | |
| def __init__(self): | |
| self.partition_probability = 0.01 | |
| self.latency_ms = 10.0 | |
| self.bandwidth_mbps = 1000.0 | |
| def simulate_partition(self) -> bool: | |
| """Return True if network partition is simulated""" | |
| return float(_RNG.random()) < self.partition_probability | |
| def get_latency(self) -> float: | |
| """Get network latency in milliseconds""" | |
| # Base latency with occasional spikes | |
| latency = self.latency_ms + float(_RNG.normal(0, self.latency_ms * 0.2)) | |
| if float(_RNG.random()) < 0.05: # 5% chance of spike | |
| latency *= float(_RNG.uniform(2, 10)) | |
| return max(1.0, latency) | |
| def get_bandwidth(self) -> float: | |
| """Get available bandwidth in Mbps""" | |
| # Bandwidth varies with usage and conditions | |
| usage_factor = float(_RNG.uniform(0.3, 0.9)) | |
| condition_factor = float(_RNG.uniform(0.8, 1.2)) | |
| return self.bandwidth_mbps * usage_factor * condition_factor | |
| def generate_failure_scenario(config: Dict[str, Any]) -> Dict[str, Any]: | |
| """Generate a random failure scenario based on config""" | |
| scenario = { | |
| "type": str(_RNG.choice(["crashloop", "oom", "node_failure", "cascade"])), | |
| "severity": float(_RNG.uniform(0.3, 0.9)), | |
| "duration": int(_RNG.integers(30, 301)), # seconds | |
| "affected_components": [] | |
| } | |
| # Add specific parameters based on type | |
| if scenario["type"] == "crashloop": | |
| scenario["failure_rate"] = config.get("crash_loop_failure_rate", 0.7) | |
| elif scenario["type"] == "oom": | |
| scenario["failure_rate"] = config.get("oom_kill_failure_rate", 0.6) | |
| elif scenario["type"] == "node_failure": | |
| scenario["failure_rate"] = config.get("node_failure_rate", 0.4) | |
| elif scenario["type"] == "cascade": | |
| scenario["probability"] = config.get("cascade_failure_probability", 0.5) | |
| return scenario | |
| def apply_realistic_noise(value: float, noise_percent: float = 10.0) -> float: | |
| """Apply realistic noise to a value""" | |
| noise = float(_RNG.normal(0, value * (noise_percent / 100.0))) | |
| return max(0, value + noise) |