noc_agent / simulator.py
srini047's picture
Upload folder using huggingface_hub
5899fec verified
"""
SystemSimulator — the core state machine for NOC incident simulation.
Responsibilities (Single Responsibility Principle):
- Maintain and evolve SystemMetrics across episode steps
- Apply incident drift and action effects
- Track episode termination conditions
The simulator is framework-agnostic: it is shared by both the Gymnasium
training env and the OpenEnv HTTP server.
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
import numpy as np
from .incidents import IncidentProfile, get_profile
from .models import ActionType, IncidentType, SystemMetrics
# Gaussian noise std applied to every metric delta (adds realism)
_NOISE_STD: float = 0.008
# Consecutive steps all metrics must be within healthy range to count as resolved
_RESOLUTION_STREAK_REQUIRED: int = 3
@dataclass
class StepInfo:
"""Carries extra context returned alongside metrics after each step."""
reward: float
done: bool
truncated: bool
crashed: bool
resolved: bool
action_was_effective: bool
step: int
class SystemSimulator:
"""
Simulates a Linux system node affected by a single incident.
Usage::
sim = SystemSimulator(seed=42)
metrics = sim.reset(IncidentType.CPU_OVERLOAD)
metrics, info = sim.step(ActionType.THROTTLE_CPU)
"""
def __init__(self, seed: int | None = None) -> None:
self._rng = np.random.default_rng(seed)
self._random = random.Random(seed)
self._profile: IncidentProfile | None = None
self._metrics: SystemMetrics | None = None
self._step: int = 0
self._resolution_streak: int = 0
self._prev_health: float = 0.0
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def reset(self, incident_type: IncidentType) -> SystemMetrics:
"""
Start a new episode with the given incident type.
Returns the initial SystemMetrics observation.
"""
self._profile = get_profile(incident_type)
self._step = 0
self._resolution_streak = 0
self._metrics = SystemMetrics(
cpu_usage=self._jitter(self._profile.initial_cpu),
memory_usage=self._jitter(self._profile.initial_memory),
latency=self._jitter(self._profile.initial_latency),
packet_loss=self._jitter(self._profile.initial_packet_loss),
service_healthy=self._profile.initial_service_healthy,
error_rate=self._jitter(self._profile.initial_error_rate),
)
self._prev_health = self._metrics.health_score
return self._metrics
def step(self, action: ActionType) -> tuple[SystemMetrics, StepInfo]:
"""
Advance the simulation by one step.
- First applies incident drift (worsening)
- Then applies the chosen action effect
- Adds small Gaussian noise for realism
- Checks resolution and crash conditions
Returns the updated metrics and a StepInfo with reward and flags.
"""
if self._profile is None or self._metrics is None:
raise RuntimeError("Call reset() before step()")
effect = self._profile.action_effects[action]
drift = self._profile.drift
# Combine drift + action effect into a single delta per metric
cpu = self._metrics.cpu_usage + drift.cpu_usage + effect.cpu_usage
memory = self._metrics.memory_usage + drift.memory_usage + effect.memory_usage
latency = self._metrics.latency + drift.latency + effect.latency
packet_loss = self._metrics.packet_loss + drift.packet_loss + effect.packet_loss
error_rate = self._metrics.error_rate + drift.error_rate + effect.error_rate
# Service healthy: restart brings it back to 1.0 after one step down
if action == ActionType.RESTART_SERVICE:
service_healthy = 0.0 # momentarily down during restart
else:
# Gradually recover if error_rate is falling
service_healthy = 1.0 if error_rate < 0.50 else max(0.0, self._metrics.service_healthy - 0.1)
# Add Gaussian noise to all continuous metrics
noise = self._rng.normal(0.0, _NOISE_STD, 5)
cpu += noise[0]
memory += noise[1]
latency += noise[2]
packet_loss += noise[3]
error_rate += noise[4]
# Clamp all values to [0, 1]
self._metrics = SystemMetrics(
cpu_usage=float(np.clip(cpu, 0.0, 1.0)),
memory_usage=float(np.clip(memory, 0.0, 1.0)),
latency=float(np.clip(latency, 0.0, 1.0)),
packet_loss=float(np.clip(packet_loss, 0.0, 1.0)),
service_healthy=float(np.clip(service_healthy, 0.0, 1.0)),
error_rate=float(np.clip(error_rate, 0.0, 1.0)),
)
self._step += 1
current_health = self._metrics.health_score
# Resolution streak tracking
if self._metrics.is_resolved:
self._resolution_streak += 1
else:
self._resolution_streak = 0
resolved = self._resolution_streak >= _RESOLUTION_STREAK_REQUIRED
crashed = self._metrics.is_critical
truncated = self._step >= self._profile.max_steps
done = resolved or crashed or truncated
# Detect whether the action had any meaningful positive effect
action_was_effective = current_health > self._prev_health + 0.005
self._prev_health = current_health
reward = self._calculate_reward(
metrics=self._metrics,
prev_health=self._prev_health,
current_health=current_health,
resolved=resolved,
crashed=crashed,
action=action,
action_was_effective=action_was_effective,
)
return self._metrics, StepInfo(
reward=reward,
done=done,
truncated=truncated,
crashed=crashed,
resolved=resolved,
action_was_effective=action_was_effective,
step=self._step,
)
@property
def current_metrics(self) -> SystemMetrics | None:
return self._metrics
@property
def current_step(self) -> int:
return self._step
@property
def profile(self) -> IncidentProfile | None:
return self._profile
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _jitter(self, value: float, std: float = 0.02) -> float:
"""Add small noise to initial values so each episode differs slightly."""
noisy = value + float(self._rng.normal(0.0, std))
return float(np.clip(noisy, 0.0, 1.0))
def _calculate_reward(
self,
metrics: SystemMetrics, # noqa: ARG002 — reserved for future metric-specific shaping
prev_health: float,
current_health: float,
resolved: bool,
crashed: bool,
action: ActionType,
action_was_effective: bool,
) -> float:
# Continuous health signal: reward improvement, penalise worsening
health_delta = (current_health - prev_health) * 5.0
# Absolute health bonus: always positive nudge toward staying healthy
health_bonus = current_health * 0.5
# Per-step cost: encourages speed
step_penalty = -0.10
# Terminal bonuses/penalties
resolution_bonus = 15.0 if resolved else 0.0
crash_penalty = -10.0 if crashed else 0.0
# Penalise completely ineffective actions (not DO_NOTHING which is a valid choice)
ineffective_penalty = (
-0.30
if (not action_was_effective and action != ActionType.DO_NOTHING and not resolved)
else 0.0
)
return health_delta + health_bonus + step_penalty + resolution_bonus + crash_penalty + ineffective_penalty