# File: dc_ops_env/server/dc_ops_env_environment.py
# Uploaded by Melikshah using huggingface_hub (commit 91495a2, verified).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
DC-Ops Environment Implementation.
Wires the thermal and power simulations into OpenEnv's Environment interface.
Each step:
1. Parse the agent's command
2. Apply mutations to simulation state
3. Advance simulation by game-time dt (default 60s)
4. Render dashboard observation
5. Compute reward (via multi-objective RewardFunction)
6. Check termination conditions
"""
from __future__ import annotations
from typing import Any, Optional
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..config import (
ASHRAE_CLASSES,
DatacenterConfig,
PowerConfig,
make_default_datacenter_config,
load_datacenter_config,
)
from ..models import DcOpsAction, DcOpsObservation
from ..actions.parser import AVAILABLE_ACTIONS, CommandResult, parse_command
from ..rendering.dashboard import render_dashboard
from ..simulation.thermal import ThermalAlarm, ThermalSimulation
from ..simulation.power import PowerAlarm, PowerSimulation
from ..scenarios.base import Scenario, ScenarioResult
from ..scenarios.registry import get_scenario, random_scenario
from ..rewards.reward_function import RewardFunction
except ImportError:
from config import (
ASHRAE_CLASSES,
DatacenterConfig,
PowerConfig,
make_default_datacenter_config,
load_datacenter_config,
)
from models import DcOpsAction, DcOpsObservation
from actions.parser import AVAILABLE_ACTIONS, CommandResult, parse_command
from rendering.dashboard import render_dashboard
from simulation.thermal import ThermalAlarm, ThermalSimulation
from simulation.power import PowerAlarm, PowerSimulation
from scenarios.base import Scenario, ScenarioResult
from scenarios.registry import get_scenario, random_scenario
from rewards.reward_function import RewardFunction
# Default episode configuration.
# These values seed the instance attributes in __init__; reset() falls back to
# the first two when no scenario is active and the caller gives no override.
DEFAULT_STEP_BUDGET = 15  # Max agent steps per episode
DEFAULT_GAME_TIME_PER_STEP_S = 60.0  # 1 minute of sim time per agent step
# Physics integration timestep; reset() replaces it with the datacenter
# config's ``simulation_dt_s``.
DEFAULT_SIM_DT_S = 1.0
class DcOpsEnvironment(Environment):
    """Datacenter operations environment for LLM-based RL agents.

    The agent observes a text-based monitoring dashboard and issues
    natural-language operator commands. The environment simulates
    physics-based thermal and power dynamics.

    Episode flow:
        1. reset() initializes the datacenter and optionally injects a fault
        2. step() parses the command, advances simulation, returns dashboard
        3. Episode ends on: budget exhaustion, critical failure, escalation,
           or resolution

    An instance tracks one episode at a time: reset() rebuilds the
    simulations, the reward function and all episode counters.
    """

    # Class-level capability flag.
    # NOTE(review): presumably read by the OpenEnv server to decide whether
    # several sessions may run concurrently — confirm against the OpenEnv
    # Environment interface.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self) -> None:
    """Create an idle environment.

    No simulation exists until ``reset()`` is called; the simulation,
    config, scenario and reward-function slots start out as ``None``.
    """
    # Initialise the OpenEnv base class too, so any attributes it owns exist
    # on this instance (the original constructor skipped this call).
    super().__init__()

    # Episode identity and progress.
    self._state = State(episode_id=str(uuid4()), step_count=0)
    self._done: bool = False
    self._escalated: bool = False
    self._cumulative_reward: float = 0.0
    self._action_history: list[str] = []

    # Simulation components, built by reset().
    self._config: DatacenterConfig | None = None
    self._thermal_sim: ThermalSimulation | None = None
    self._power_sim: PowerSimulation | None = None
    self._scenario: Scenario | None = None
    self._reward_fn: RewardFunction | None = None

    # Episode parameters; reset() overwrites them from scenario/kwargs.
    self._step_budget: int = DEFAULT_STEP_BUDGET
    self._game_time_per_step_s: float = DEFAULT_GAME_TIME_PER_STEP_S
    self._sim_dt_s: float = DEFAULT_SIM_DT_S
    self._alert: str = ""
    self._scenario_type: str = ""
def reset(
    self,
    seed: Optional[int] = None,
    episode_id: Optional[str] = None,
    **kwargs: Any,
) -> DcOpsObservation:
    """Reset the environment and return the initial observation.

    Args:
        seed: Forwarded to random scenario selection; not used otherwise.
        episode_id: Identifier for the new episode. A fresh UUID is
            generated when it is omitted or empty.

    Kwargs:
        scenario (str | Scenario): Scenario ID (e.g. 'A1') or Scenario instance.
            If provided, supplies the defaults for alert/step_budget/
            game_time_per_step_s/scenario_type and may modify the config.
            If not provided, raw kwargs are used (backward compatible).
        random_scenario (bool): When truthy and ``scenario`` is absent, a
            scenario is drawn from the registry, filtered by
            ``scenario_type`` and ``difficulty`` and seeded with ``seed``.
        difficulty: Filter passed to random scenario selection.
        config (DatacenterConfig): Custom datacenter configuration.
        config_name (str): Built-in config name ("default", "small", "large").
            Used when config is not provided (e.g. from WebSocket/HTTP JSON).
        step_budget (int): Max steps for the episode.
        game_time_per_step_s (float): Simulation time per step.
        scenario_type (str): Scenario category label.
        alert (str): Initial alert message.
        fault_injection (dict): Fault to inject when no scenario is used, e.g.
            {"type": "crac_fault", "unit_id": "CRAC-3", "fault": "compressor"}

    The four episode parameters (step_budget, game_time_per_step_s,
    scenario_type, alert) treat an explicit ``None`` — e.g. a JSON ``null`` —
    the same as a missing key and fall back to the scenario/default value.

    Returns:
        The observation of the freshly initialized datacenter.
    """
    # Episode state
    self._state = State(
        episode_id=episode_id or str(uuid4()),
        step_count=0,
    )
    self._done = False
    self._cumulative_reward = 0.0
    self._action_history = []
    self._escalated = False

    # Scenario: resolve, then clear its mutable state (counters, flags) so
    # a reused Scenario instance starts clean.
    self._scenario = self._resolve_scenario(seed, kwargs)
    if self._scenario:
        self._scenario.reset_state()

    # Configuration — the scenario may modify the base config.
    self._config = self._resolve_config(kwargs)
    if self._scenario:
        self._config = self._scenario.configure(self._config)

    # Episode parameters — scenario provides defaults, kwargs can override.
    if self._scenario:
        defaults = {
            "step_budget": self._scenario.step_budget,
            "game_time_per_step_s": self._scenario.game_time_per_step_s,
            "scenario_type": self._scenario.scenario_type,
            "alert": self._scenario.alert_message,
        }
    else:
        defaults = {
            "step_budget": DEFAULT_STEP_BUDGET,
            "game_time_per_step_s": DEFAULT_GAME_TIME_PER_STEP_S,
            "scenario_type": "",
            "alert": "",
        }
    self._step_budget = self._kwarg_or_default(
        kwargs, "step_budget", defaults["step_budget"]
    )
    self._game_time_per_step_s = self._kwarg_or_default(
        kwargs, "game_time_per_step_s", defaults["game_time_per_step_s"]
    )
    self._scenario_type = self._kwarg_or_default(
        kwargs, "scenario_type", defaults["scenario_type"]
    )
    self._alert = self._kwarg_or_default(kwargs, "alert", defaults["alert"])
    self._sim_dt_s = self._config.simulation_dt_s

    # Reward function with scenario-type-aware weights.
    self._reward_fn = RewardFunction(scenario_type=self._scenario_type)
    self._reward_fn.reset()  # Defensive: ensure clean state

    self._init_simulations()

    # Fault injection — scenario or raw kwargs.
    if self._scenario:
        # Warmup FIRST, then inject fault (so DC is at steady-state)
        self._warmup_simulation()
        self._scenario.inject_fault(self._thermal_sim, self._power_sim)
    else:
        # NOTE(review): unlike the scenario path, a raw fault is applied
        # BEFORE warmup, so the warmup runs with the fault already active.
        # Order kept as-is for backward compatibility — confirm intent.
        fault = kwargs.get("fault_injection")
        if fault:
            self._apply_fault_injection(fault)
        self._warmup_simulation()

    # Render initial observation
    return self._make_observation(
        action_result="Environment initialized. Awaiting your command."
    )


@staticmethod
def _kwarg_or_default(kwargs: dict[str, Any], key: str, default: Any) -> Any:
    """Return ``kwargs[key]`` unless it is missing or ``None``, else *default*."""
    value = kwargs.get(key)
    return default if value is None else value


def _resolve_scenario(
    self, seed: Optional[int], kwargs: dict[str, Any]
) -> Scenario | None:
    """Pick the scenario requested via reset() kwargs, or ``None``."""
    scenario_arg = kwargs.get("scenario")
    if isinstance(scenario_arg, str):
        return get_scenario(scenario_arg)
    if isinstance(scenario_arg, Scenario):
        return scenario_arg
    if scenario_arg is None and kwargs.get("random_scenario"):
        return random_scenario(
            scenario_type=kwargs.get("scenario_type"),
            difficulty=kwargs.get("difficulty"),
            seed=seed,
        )
    # Any other value (including unsupported types) means "no scenario".
    return None


def _resolve_config(self, kwargs: dict[str, Any]) -> DatacenterConfig:
    """Choose the base datacenter config: explicit object, named, or default."""
    # config (DatacenterConfig) comes from Python callers, config_name (str)
    # from JSON APIs; an explicit object wins over a name.
    config_arg = kwargs.get("config")
    config_name = kwargs.get("config_name")
    if isinstance(config_arg, DatacenterConfig):
        return config_arg
    if config_name and isinstance(config_name, str) and config_name != "default":
        return load_datacenter_config(config_name)
    return make_default_datacenter_config()


def _init_simulations(self) -> None:
    """Build the thermal simulation and, if configured, the power simulation."""
    self._thermal_sim = ThermalSimulation(self._config)
    # The power sim only exists when the config has UPS infrastructure.
    if self._config.power and self._config.power.ups_units:
        it_load = self._thermal_sim.state.total_it_load_kw
        self._power_sim = PowerSimulation(self._config.power, it_load_kw=it_load)
        # Wire power state into datacenter state
        self._thermal_sim.state.power = self._power_sim.state
    else:
        self._power_sim = None
def step(
    self,
    action: DcOpsAction,
    timeout_s: Optional[float] = None,
    **kwargs: Any,
) -> DcOpsObservation:
    """Execute one agent step.

    1. Parse and execute the command
    2. Advance simulation by game_time_per_step_s
    3. Check for alarms and termination
    4. Compute reward via RewardFunction
    5. Return observation

    Args:
        action: Agent action; only ``action.command`` is read.
        timeout_s: Accepted for interface compatibility; unused here.
        **kwargs: Accepted for interface compatibility; unused here.

    Returns:
        The observation after the step. If the episode has already ended,
        an observation with reward 0.0 and a reminder to call reset().

    Raises:
        RuntimeError: If called before the first reset().
    """
    # Fail fast with a clear message instead of an AttributeError on None
    # after the step counter and action history were already mutated.
    if self._thermal_sim is None or self._reward_fn is None:
        raise RuntimeError("Environment not initialized. Call reset() before step().")

    if self._done:
        return self._make_observation(
            action_result="Episode already ended. Call reset().",
            reward=0.0,
        )

    self._state.step_count += 1
    self._action_history.append(action.command)

    # 1. Parse and execute command
    cmd_result = parse_command(
        action.command,
        self._thermal_sim,
        self._power_sim,
    )

    # Escalation ends the episode immediately, without advancing the
    # simulation. The escalation penalty comes from scenario procedure
    # rules + action_quality inside the reward function, not from here.
    if cmd_result.command_name == "escalate":
        self._escalated = True
        self._done = True
        reward, _ = self._score_step(action, cmd_result)
        return self._make_observation(
            action_result=cmd_result.message,
            reward=reward,
        )

    # 2. Handle acknowledge_alarm — clear alert before new alarms overwrite
    if cmd_result.command_name == "acknowledge_alarm" and cmd_result.success:
        self._alert = ""

    # 3. Advance simulation
    thermal_alarms, power_alarms = self._advance_simulation()

    # 4. Build alert from this step's alarms
    self._update_alert(thermal_alarms, power_alarms)

    # 5-6. Evaluate scenario, then compute and accumulate the reward
    reward, scenario_result = self._score_step(action, cmd_result)

    # 7. Check termination
    self._check_termination(thermal_alarms, power_alarms)

    # 7b. Scenario resolution (only if nothing else ended the episode)
    if scenario_result and scenario_result.resolved and not self._done:
        self._done = True
        # Speed bonus: fraction of budget remaining
        speed_bonus = (self._step_budget - self._state.step_count) / self._step_budget
        reward += speed_bonus
        self._cumulative_reward += speed_bonus
        if scenario_result.resolution_message:
            self._alert = scenario_result.resolution_message

    return self._make_observation(
        action_result=cmd_result.message,
        reward=reward,
    )


def _score_step(
    self, action: DcOpsAction, cmd_result: CommandResult
) -> tuple[float, ScenarioResult | None]:
    """Evaluate the scenario, compute the step reward and accumulate it.

    Returns the step reward and the scenario result (``None`` when no
    scenario is active).
    """
    # Scenario is evaluated first so its result is available to the reward.
    scenario_result: ScenarioResult | None = None
    if self._scenario:
        scenario_result = self._scenario.evaluate_step(
            self._thermal_sim, self._power_sim,
            action.command, self._action_history,
            self._state.step_count,
        )
    components = self._reward_fn.compute(
        self._thermal_sim, self._power_sim, cmd_result,
        action.command, self._action_history, scenario_result,
    )
    reward = components.total
    self._cumulative_reward += reward
    return reward, scenario_result
@property
def state(self) -> State:
    """Return the current episode ``State`` (episode id and step count)."""
    return self._state
# -------------------------------------------------------------------
# Internal methods
# -------------------------------------------------------------------
def _warmup_simulation(self, warmup_steps: int = 120) -> None:
    """Pre-run the physics so the episode starts near quasi-steady-state.

    Steps the thermal simulation ``warmup_steps`` times with the physics
    timestep; when a power simulation exists it is stepped right after
    each thermal step with the thermal model's current total IT load.
    Results and alarms produced during warmup are discarded.
    """
    dt = self._sim_dt_s
    thermal = self._thermal_sim
    for _ in range(warmup_steps):
        thermal.step(dt)
        if self._power_sim:
            self._power_sim.step(dt, thermal.state.total_it_load_kw)
def _advance_simulation(self) -> tuple[list[ThermalAlarm], list[PowerAlarm]]:
    """Advance simulation by game_time_per_step_s seconds.

    The game time is split into whole physics substeps of ``_sim_dt_s``;
    any remainder shorter than one substep is not simulated. Each substep
    runs the thermal model and then, if present, the power model with the
    thermal model's current total IT load.

    Returns:
        ``(thermal_alarms, power_alarms)`` collected over all substeps and
        deduplicated so only the most recent alarm per
        (component, alarm_type) remains.

    Raises:
        ValueError: If the physics timestep is not positive.
    """
    dt = self._sim_dt_s
    if dt <= 0:
        raise ValueError(f"simulation_dt_s must be positive, got {dt!r}")

    # Plain int() truncation drops a substep when the ratio is integral in
    # exact arithmetic but lands a hair low in floating point
    # (0.3 / 0.1 == 2.9999999999999996). The small tolerance keeps the
    # floor semantics while absorbing that rounding error.
    n_substeps = int(self._game_time_per_step_s / dt + 1e-9)

    all_thermal_alarms: list[ThermalAlarm] = []
    all_power_alarms: list[PowerAlarm] = []
    for _ in range(n_substeps):
        # Thermal step
        thermal_result = self._thermal_sim.step(dt)
        all_thermal_alarms.extend(thermal_result.alarms)
        # Power step, driven by the IT load the thermal model just computed
        if self._power_sim:
            it_load = self._thermal_sim.state.total_it_load_kw
            power_result = self._power_sim.step(dt, it_load)
            all_power_alarms.extend(power_result.alarms)

    # Deduplicate alarms by type (keep most recent)
    thermal_alarms = _dedupe_alarms_by_type(all_thermal_alarms)
    power_alarms = _dedupe_alarms_by_type(all_power_alarms)
    return thermal_alarms, power_alarms
def _update_alert(
    self,
    thermal_alarms: list[ThermalAlarm],
    power_alarms: list[PowerAlarm],
) -> None:
    """Recompute the active alert string from this step's alarms.

    Priority order (thermal alarms are considered before power alarms):
      1. Critical alarms: up to three messages joined with " | ".
      2. Otherwise the first warning message.
      3. Otherwise the alert is cleared.

    NOTE(review): case 3 also wipes an alert that reset() took from the
    scenario or kwargs, even though no new alarm replaced it — confirm this
    is intended.
    """
    alarms = [*thermal_alarms, *power_alarms]

    critical = [alarm.message for alarm in alarms if alarm.severity == "critical"]
    if critical:
        self._alert = " | ".join(critical[:3])  # Limit to 3 alerts
        return

    # No criticals: fall back to the first warning, or clear the alert.
    self._alert = next(
        (alarm.message for alarm in alarms if alarm.severity == "warning"),
        "",
    )
def _check_termination(
    self,
    thermal_alarms: list[ThermalAlarm],
    power_alarms: list[PowerAlarm],
) -> None:
    """Set ``_done`` (and possibly ``_alert``) if the episode must end.

    Checks, in order, stopping at the first that applies:
      1. Step budget exhausted (alert left untouched).
      2. A zone's max inlet temperature is more than 5 °C above the
         allowable maximum of its ASHRAE class; zones whose class is not
         in ``ASHRAE_CLASSES`` are skipped.
      3. A UPS is in "fault" mode with no battery charge left.

    The alarm lists are accepted but not consulted; the checks read the
    simulation state directly.
    """
    # 1. Step budget exhausted
    if self._state.step_count >= self._step_budget:
        self._done = True
        return

    # 2. Critical thermal failure
    for zone in self._thermal_sim.state.zones:
        limits = ASHRAE_CLASSES.get(zone.ashrae_class)
        if limits and zone.max_inlet_temp_c > limits.allowable_max_c + 5.0:
            self._done = True
            self._alert = (
                f"CRITICAL: Zone {zone.zone_id} inlet temp "
                f"{zone.max_inlet_temp_c:.1f}°C exceeds allowable max "
                f"{limits.allowable_max_c:.1f}°C by >5°C. Emergency shutdown."
            )
            return

    # 3. UPS battery exhausted
    if not self._power_sim:
        return
    dead_ups = next(
        (
            unit
            for unit in self._power_sim.state.ups_units
            if unit.mode.value == "fault" and unit.battery_soc <= 0
        ),
        None,
    )
    if dead_ups is not None:
        self._done = True
        self._alert = f"CRITICAL: {dead_ups.unit_id} battery exhausted. Unprotected load."
def _apply_fault_injection(self, fault: dict) -> None:
    """Apply a fault injection to the simulation.

    Supported fault types:
    - crac_fault: {"type": "crac_fault", "unit_id": "CRAC-3", "fault": "compressor"}
    - utility_loss: {"type": "utility_loss"}
    - ups_fault: {"type": "ups_fault", "unit_id": "UPS-1"}
    - rack_load_change: {"type": "rack_load_change", "rack_id": "A-01", "load_kw": 15.0}
    - outside_temp: {"type": "outside_temp", "temp_c": 42.0}

    Power-related faults are skipped when there is no power simulation.
    A missing or unrecognized ``type`` is ignored without error.
    """
    fault_type = fault.get("type", "")
    if fault_type == "crac_fault":
        # Mirror the module-level import fallback: the relative import only
        # works when this file is loaded as part of the package; when the
        # modules are importable top-level, use the flat layout.
        try:
            from ..simulation.types import CRACFaultType
        except ImportError:
            from simulation.types import CRACFaultType
        unit_id = fault.get("unit_id", "")
        fault_name = fault.get("fault", "compressor")
        try:
            ft = CRACFaultType(fault_name)
        except ValueError:
            # Unknown fault names fall back to a compressor fault.
            ft = CRACFaultType.COMPRESSOR
        self._thermal_sim.inject_crac_fault(unit_id, ft)
    elif fault_type == "utility_loss":
        if self._power_sim:
            self._power_sim.set_utility_available(False)
    elif fault_type == "ups_fault":
        if self._power_sim:
            unit_id = fault.get("unit_id", "")
            self._power_sim.inject_ups_fault(unit_id)
    elif fault_type == "rack_load_change":
        rack_id = fault.get("rack_id", "")
        load_kw = fault.get("load_kw", 8.0)
        self._thermal_sim.set_rack_load(rack_id, load_kw)
    elif fault_type == "outside_temp":
        temp_c = fault.get("temp_c", 35.0)
        self._thermal_sim.set_outside_temp(temp_c)
def _make_observation(
    self,
    action_result: str = "",
    reward: float = 0.0,
) -> DcOpsObservation:
    """Assemble the observation returned to the agent.

    Renders the text dashboard from the thermal simulation state and
    attaches structured metadata: facility-level metrics, per-zone
    temperatures and load, power/UPS status when a power simulation
    exists, and scenario identity when a scenario is active.

    Args:
        action_result: Feedback text for the command just executed.
        reward: Reward to report for this step.
    """
    dc_state = self._thermal_sim.state
    scenario = self._scenario

    dashboard = render_dashboard(
        dc_state,
        alert=self._alert,
        step=self._state.step_count,
        max_steps=self._step_budget,
        scenario_type=self._scenario_type,
    )

    # Structured mirror of the dashboard data.
    zone_metrics = {
        zone.zone_id: {
            "cold_aisle_temp_c": zone.cold_aisle_temp_c,
            "hot_aisle_temp_c": zone.hot_aisle_temp_c,
            "max_inlet_temp_c": zone.max_inlet_temp_c,
            "total_it_load_kw": zone.total_it_load_kw,
        }
        for zone in dc_state.zones
    }
    metadata = {
        "sim_time_s": dc_state.sim_time_s,
        "total_it_load_kw": dc_state.total_it_load_kw,
        "total_cooling_power_kw": dc_state.total_cooling_power_kw,
        "pue": dc_state.pue,
        "outside_temp_c": dc_state.outside_temp_c,
        "cumulative_reward": self._cumulative_reward,
        "zones": zone_metrics,
    }

    if self._power_sim:
        power = self._power_sim.state
        power_metrics = {
            "utility_available": power.utility_available,
            "on_generator": power.on_generator,
            "total_ups_loss_kw": power.total_ups_loss_kw,
            "total_pdu_loss_kw": power.total_pdu_loss_kw,
        }
        # Per-UPS entries live beside the aggregate keys, keyed by unit id.
        power_metrics.update(
            {
                ups.unit_id: {
                    "mode": ups.mode.value,
                    "battery_soc": ups.battery_soc,
                    "load_fraction": ups.load_fraction,
                    "efficiency": ups.efficiency,
                }
                for ups in power.ups_units
            }
        )
        metadata["power"] = power_metrics

    if scenario:
        metadata["scenario"] = {
            "id": scenario.scenario_id,
            "name": scenario.name,
            "difficulty": scenario.difficulty,
        }

    # Use scenario-specific actions if defined, otherwise all actions.
    if scenario and scenario.available_actions is not None:
        actions = scenario.available_actions
    else:
        actions = AVAILABLE_ACTIONS

    return DcOpsObservation(
        dashboard=dashboard,
        available_actions=actions,
        alert=self._alert,
        scenario_type=self._scenario_type,
        steps_remaining=max(0, self._step_budget - self._state.step_count),
        action_result=action_result,
        done=self._done,
        reward=reward,
        metadata=metadata,
    )
def _dedupe_alarms_by_type(alarms: list) -> list:
    """Collapse alarms so each (component, alarm_type) pair appears once.

    For every pair the last alarm in ``alarms`` wins; the result is ordered
    by the first appearance of each pair. A missing ``component`` or
    ``alarm_type`` attribute counts as an empty string.
    """
    latest_by_key: dict[tuple[str, str], Any] = {
        (getattr(item, "component", ""), getattr(item, "alarm_type", "")): item
        for item in alarms
    }
    return list(latest_by_key.values())