| """ |
| SystemSimulator — the core state machine for NOC incident simulation. |
| |
| Responsibilities (Single Responsibility Principle): |
| - Maintain and evolve SystemMetrics across episode steps |
| - Apply incident drift and action effects |
| - Track episode termination conditions |
| |
| The simulator is framework-agnostic: it is shared by both the Gymnasium |
| training env and the OpenEnv HTTP server. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import random |
| from dataclasses import dataclass, field |
|
|
| import numpy as np |
|
|
| from .incidents import IncidentProfile, get_profile |
| from .models import ActionType, IncidentType, SystemMetrics |
|
|
| |
# Standard deviation of the zero-mean Gaussian noise that step() adds to each
# continuous metric (cpu, memory, latency, packet loss, error rate).
_NOISE_STD: float = 0.008

# Number of consecutive steps the metrics must report ``is_resolved`` before
# step() flags the episode as resolved.
_RESOLUTION_STREAK_REQUIRED: int = 3
|
|
|
|
@dataclass
class StepInfo:
    """Per-step bookkeeping returned next to the metrics by the simulator.

    Field order is part of the interface (positional construction), so it
    must not be rearranged.
    """

    # Scalar reward computed for the step.
    reward: float
    # True when the episode ended for any reason (resolved, crashed or truncated).
    done: bool
    # True when the step counter reached the incident profile's step limit.
    truncated: bool
    # True when the resulting metrics were flagged as critical.
    crashed: bool
    # True once the metrics stayed resolved for the required streak of steps.
    resolved: bool
    # True when the health score rose noticeably compared with the previous step.
    action_was_effective: bool
    # 1-based index of the step that produced this record.
    step: int
|
|
|
|
class SystemSimulator:
    """
    Simulates a Linux system node affected by a single incident.

    The simulator owns the current ``SystemMetrics`` snapshot, applies the
    incident profile's per-step drift plus the effect of the chosen action,
    and reports reward and termination flags through ``StepInfo``.

    Usage::

        sim = SystemSimulator(seed=42)
        metrics = sim.reset(IncidentType.CPU_OVERLOAD)
        metrics, info = sim.step(ActionType.THROTTLE_CPU)
    """

    def __init__(self, seed: int | None = None) -> None:
        """
        Create an idle simulator; call ``reset()`` before ``step()``.

        Args:
            seed: Optional seed shared by the NumPy generator and the stdlib
                ``random.Random`` instance so that runs are reproducible.
        """
        self._rng = np.random.default_rng(seed)
        # Not used inside this class; kept because it is existing instance state.
        self._random = random.Random(seed)

        self._profile: IncidentProfile | None = None
        self._metrics: SystemMetrics | None = None
        self._step: int = 0
        self._resolution_streak: int = 0
        self._prev_health: float = 0.0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def reset(self, incident_type: IncidentType) -> SystemMetrics:
        """
        Start a new episode with the given incident type.

        Loads the incident profile, clears the step counter and resolution
        streak, and builds the initial metrics from the profile's initial
        values with a little jitter (``service_healthy`` is taken verbatim).

        Args:
            incident_type: Incident whose profile drives this episode.

        Returns:
            The initial SystemMetrics observation.
        """
        profile = get_profile(incident_type)
        self._profile = profile
        self._step = 0
        self._resolution_streak = 0

        # Keyword arguments are evaluated in this order, which fixes the order
        # of RNG draws and therefore keeps seeded runs reproducible.
        self._metrics = SystemMetrics(
            cpu_usage=self._jitter(profile.initial_cpu),
            memory_usage=self._jitter(profile.initial_memory),
            latency=self._jitter(profile.initial_latency),
            packet_loss=self._jitter(profile.initial_packet_loss),
            service_healthy=profile.initial_service_healthy,
            error_rate=self._jitter(profile.initial_error_rate),
        )
        self._prev_health = self._metrics.health_score
        return self._metrics

    def step(self, action: ActionType) -> tuple[SystemMetrics, StepInfo]:
        """
        Advance the simulation by one step.

        - First applies incident drift (worsening)
        - Then applies the chosen action effect
        - Adds small Gaussian noise for realism
        - Checks resolution and crash conditions

        Args:
            action: The action to apply; it is looked up in the profile's
                ``action_effects`` mapping.

        Returns:
            The updated metrics and a StepInfo with reward and flags.

        Raises:
            RuntimeError: If ``reset()`` has not been called yet.
        """
        if self._profile is None or self._metrics is None:
            raise RuntimeError("Call reset() before step()")

        profile = self._profile
        before = self._metrics
        effect = profile.action_effects[action]
        drift = profile.drift

        # Drift and action effect are both additive deltas on the old values.
        cpu = before.cpu_usage + drift.cpu_usage + effect.cpu_usage
        memory = before.memory_usage + drift.memory_usage + effect.memory_usage
        latency = before.latency + drift.latency + effect.latency
        packet_loss = before.packet_loss + drift.packet_loss + effect.packet_loss
        error_rate = before.error_rate + drift.error_rate + effect.error_rate

        # Service health is decided from the pre-noise error rate: a restart
        # forces it to 0.0 for this step; otherwise it is fully healthy while
        # the error rate is below 0.50 and decays by 0.1 per step above that.
        if action == ActionType.RESTART_SERVICE:
            service_healthy = 0.0
        elif error_rate < 0.50:
            service_healthy = 1.0
        else:
            service_healthy = max(0.0, before.service_healthy - 0.1)

        # A single 5-element draw keeps the RNG stream identical across runs.
        noise = self._rng.normal(0.0, _NOISE_STD, 5)
        cpu += noise[0]
        memory += noise[1]
        latency += noise[2]
        packet_loss += noise[3]
        error_rate += noise[4]

        # Every metric is clipped into [0, 1] before being stored.
        self._metrics = SystemMetrics(
            cpu_usage=float(np.clip(cpu, 0.0, 1.0)),
            memory_usage=float(np.clip(memory, 0.0, 1.0)),
            latency=float(np.clip(latency, 0.0, 1.0)),
            packet_loss=float(np.clip(packet_loss, 0.0, 1.0)),
            service_healthy=float(np.clip(service_healthy, 0.0, 1.0)),
            error_rate=float(np.clip(error_rate, 0.0, 1.0)),
        )

        self._step += 1
        current_health = self._metrics.health_score

        # Resolution only counts after an unbroken streak of resolved steps.
        if self._metrics.is_resolved:
            self._resolution_streak += 1
        else:
            self._resolution_streak = 0

        resolved = self._resolution_streak >= _RESOLUTION_STREAK_REQUIRED
        crashed = self._metrics.is_critical
        truncated = self._step >= profile.max_steps
        done = resolved or crashed or truncated

        # Capture the previous health BEFORE overwriting it. The reward's
        # health-delta term needs the old value; reading self._prev_health
        # after the update would make that term permanently zero.
        prev_health = self._prev_health
        action_was_effective = current_health > prev_health + 0.005
        self._prev_health = current_health

        reward = self._calculate_reward(
            metrics=self._metrics,
            prev_health=prev_health,
            current_health=current_health,
            resolved=resolved,
            crashed=crashed,
            action=action,
            action_was_effective=action_was_effective,
        )

        return self._metrics, StepInfo(
            reward=reward,
            done=done,
            truncated=truncated,
            crashed=crashed,
            resolved=resolved,
            action_was_effective=action_was_effective,
            step=self._step,
        )

    @property
    def current_metrics(self) -> SystemMetrics | None:
        """Latest metrics snapshot, or None before the first ``reset()``."""
        return self._metrics

    @property
    def current_step(self) -> int:
        """Number of steps taken since the last ``reset()``."""
        return self._step

    @property
    def profile(self) -> IncidentProfile | None:
        """Active incident profile, or None before the first ``reset()``."""
        return self._profile

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _jitter(self, value: float, std: float = 0.02) -> float:
        """Add small noise to initial values so each episode differs slightly.

        Args:
            value: Base value to perturb.
            std: Standard deviation of the zero-mean Gaussian perturbation.

        Returns:
            The perturbed value clipped into [0, 1].
        """
        noisy = value + float(self._rng.normal(0.0, std))
        return float(np.clip(noisy, 0.0, 1.0))

    def _calculate_reward(
        self,
        metrics: SystemMetrics,
        prev_health: float,
        current_health: float,
        resolved: bool,
        crashed: bool,
        action: ActionType,
        action_was_effective: bool,
    ) -> float:
        """Combine the shaping terms into the scalar reward for one step.

        Args:
            metrics: Metrics after the step (currently not used by the formula).
            prev_health: Health score before the step.
            current_health: Health score after the step.
            resolved: Whether the episode was resolved on this step.
            crashed: Whether the metrics were critical on this step.
            action: The action that was taken.
            action_was_effective: Whether health improved noticeably.

        Returns:
            The sum of all reward components.
        """
        # Improvement (or regression) in health, scaled up.
        health_delta = (current_health - prev_health) * 5.0

        # Standing bonus proportional to the absolute health level.
        health_bonus = current_health * 0.5

        # Constant per-step cost.
        step_penalty = -0.10

        # Terminal outcomes.
        resolution_bonus = 15.0 if resolved else 0.0
        crash_penalty = -10.0 if crashed else 0.0

        # Penalise an active action (anything but DO_NOTHING) that did not
        # help, unless the episode resolved on this step.
        ineffective_penalty = (
            -0.30
            if (not action_was_effective and action != ActionType.DO_NOTHING and not resolved)
            else 0.0
        )

        return health_delta + health_bonus + step_penalty + resolution_bonus + crash_penalty + ineffective_penalty
|
|