# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import logging
from typing import List, Dict
import numpy as np
try:
from .thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS
except (ImportError, ValueError):
try:
from thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS
except ImportError:
from server.thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS
logger = logging.getLogger(__name__)
class ThermalGridGrader:
    """
    Programmatic grader for the Thermal Grid RL environment.

    Steps are recorded with ``log_thermal_grid_step`` and reduced to a single
    normalized score in [0.0, 1.0] by ``get_thermal_grid_score``.
    """

    def __init__(self, task_id: str):
        """
        Create a grader for one task.

        Args:
            task_id: Value convertible to ``ThermalGridTaskID``. A value the
                enum rejects falls back to ``ThermalGridTaskID.BASELINE``
                (with a logged warning) instead of raising.
        """
        try:
            self.task_id = ThermalGridTaskID(task_id)
        except ValueError:
            # Keep the lenient fallback, but make it visible in the logs.
            logger.warning(
                "Unknown task_id %r; falling back to BASELINE grading.", task_id
            )
            self.task_id = ThermalGridTaskID.BASELINE
        self.config = TASKS[self.task_id]
        # One dict per recorded step, in call order.
        self.history: List[Dict] = []

    def log_thermal_grid_step(self, observation, reward: float):
        """
        Record a step's data for final grading.

        Args:
            observation: Object exposing ``pue``, ``max_cpu_temps_c``,
                ``energy_price_per_kwh``, ``demand_response_signal``,
                ``total_it_power_w``, ``total_facility_power_w`` and,
                optionally, a ``metadata`` mapping with a ``"sub_rewards"``
                entry.
            reward: Step reward. ``None`` is recorded as 0.0 so that the
                averages computed later cannot fail on it.
        """
        # A missing/None metadata mapping or sub-reward entry gets the same
        # neutral defaults as an absent "sub_rewards" key.
        metadata = getattr(observation, "metadata", None) or {}
        sub_rewards = metadata.get("sub_rewards") or {
            "energy_cost_score": 0.0,
            "thermal_safety_score": 0.0,
            "workload_throughput_score": 0.0,
            "compliance_score": 1.0,
        }

        # Explicit length test: truth-testing a multi-element NumPy array
        # raises, and None must also count as "no readings".
        cpu_temps = observation.max_cpu_temps_c
        if cpu_temps is not None and len(cpu_temps) > 0:
            max_cpu_temp = float(max(cpu_temps))
        else:
            max_cpu_temp = 0.0

        self.history.append({
            "pue": observation.pue,
            "reward": 0.0 if reward is None else reward,
            "max_cpu_temp": max_cpu_temp,
            "energy_price": observation.energy_price_per_kwh,
            "dr_signal": observation.demand_response_signal,
            "it_power": observation.total_it_power_w,
            "facility_power": observation.total_facility_power_w,
            "sub_rewards": sub_rewards,
        })

    def get_thermal_grid_score(self) -> float:
        """
        Calculate final normalized score [0.0, 1.0] based on task-specific criteria.

        Returns:
            0.0 when no step has been recorded; otherwise a task-specific
            weighted blend of the reward, PUE and safety scores, clipped to
            [0.0, 1.0]. Task ids without a dedicated formula are graded with
            the BASELINE weighting rather than returning ``None``.
        """
        if not self.history:
            return 0.0

        avg_pue = np.mean([s["pue"] for s in self.history])
        avg_reward = np.mean([s["reward"] for s in self.history])
        max_temp_ever = max(s["max_cpu_temp"] for s in self.history)

        # 1. Safety score (common to all tasks): 1.0 while the hottest CPU
        # reading stays at or below 80C, falling linearly to 0.0 at 90C.
        safety_score = max(0.0, 1.0 - max(0.0, max_temp_ever - 80.0) / 10.0)

        # 2. Efficiency score: average PUE 1.1 -> 1.0, 1.5 -> 0.0. It can
        # exceed 1.0 for an average PUE below 1.1; the final clip bounds the
        # blended result.
        pue_score = max(0.0, 1.0 - (avg_pue - 1.1) / 0.4)

        # 3. Reward score: negative average rewards count as 0.0.
        reward_score = max(0.0, avg_reward)

        if self.task_id == ThermalGridTaskID.LOAD_SHIFT:
            # Medium: reward-driven, with PUE and safety as secondary terms.
            raw_score = 0.5 * reward_score + 0.3 * pue_score + 0.2 * safety_score
        elif self.task_id == ThermalGridTaskID.GRID_STRESS:
            # Hard: reward plus safety; reaching 85C forfeits the safety term.
            hard_safety = safety_score if max_temp_ever < 85.0 else 0.0
            raw_score = 0.7 * reward_score + 0.3 * hard_safety
        else:
            if self.task_id != ThermalGridTaskID.BASELINE:
                logger.warning(
                    "No dedicated scoring for task %r; using BASELINE weighting.",
                    self.task_id,
                )
            # Easy: just stay safe and reasonably efficient.
            raw_score = 0.6 * pue_score + 0.4 * safety_score

        return float(np.clip(raw_score, 0.0, 1.0))

    def get_rubric_scores(self) -> dict:
        """
        Return the per-rubric sub-rewards averaged over the recorded steps.

        Returns:
            Dict with keys ``"energy"``, ``"safety"``, ``"workload"`` and
            ``"compliance"``. With no recorded steps the first three are 0.0
            and ``"compliance"`` is 1.0. The values are means of whatever the
            observations reported; this method does not clamp them.
        """
        if not self.history:
            return {
                "energy": 0.0, "safety": 0.0,
                "workload": 0.0, "compliance": 1.0,
            }

        # (output key, sub-reward key, value used when a step lacks the key)
        rubric_spec = (
            ("energy", "energy_cost_score", 0.0),
            ("safety", "thermal_safety_score", 0.0),
            ("workload", "workload_throughput_score", 0.0),
            ("compliance", "compliance_score", 1.0),
        )
        return {
            name: float(np.mean(
                [step["sub_rewards"].get(key, default) for step in self.history]
            ))
            for name, key, default in rubric_spec
        }
def _load_action_model():
    """Return ``ThermalGridRlAgentAction``, trying each supported import layout in turn."""
    try:
        from ..models import ThermalGridRlAgentAction
    except (ImportError, ValueError):
        try:
            from models import ThermalGridRlAgentAction
        except ImportError:
            from thermal_grid_rl_agent.models import ThermalGridRlAgentAction
    return ThermalGridRlAgentAction


def _observation_to_vector(observation) -> np.ndarray:
    """Flatten an environment observation into the float32 vector passed to ``agent.predict``."""
    # NOTE(review): the original comments say this mirrors gym_env's layout
    # and totals 67 dims with the GPU temps included — not verifiable from
    # this file; confirm against gym_env before changing the field order.
    return np.concatenate([
        np.array(observation.inlet_temps_c, dtype=np.float32),
        np.array(observation.mean_cpu_temps_c, dtype=np.float32),
        np.array(observation.max_cpu_temps_c, dtype=np.float32),
        np.array(observation.max_gpu_temps_c, dtype=np.float32),
        np.array([observation.thermal_mass_lag_c_per_min], dtype=np.float32),
        # Power figures are scaled down by 10000.
        np.array(observation.rack_powers_w, dtype=np.float32) / 10000.0,
        np.array(observation.rack_utilisation, dtype=np.float32),
        np.array([observation.live_traffic_load_w], dtype=np.float32) / 10000.0,
        np.array([observation.pue], dtype=np.float32),
        np.array([observation.ambient_temp_c], dtype=np.float32),
        np.array([observation.energy_price_per_kwh], dtype=np.float32),
        np.array([observation.demand_response_signal], dtype=np.float32),
        np.array([observation.off_peak_window], dtype=np.float32),
    ]).astype(np.float32)


def _run_grading_episodes(env, agent, task_id: str, num_episodes: int = 3, max_steps: int = 24) -> float:
    """
    Run evaluation episodes and return the grader's score in [0, 1].

    A single ``ThermalGridGrader`` records the steps of every episode, so the
    returned score is computed over the pooled steps of all episodes rather
    than being a mean of per-episode scores.

    Args:
        env: Environment exposing ``reset(task_id=...)``, ``step(action)`` and
            ``_batch_queue.pending_count``.
        agent: Object whose ``predict(obs_vec)`` returns either an action
            vector or an ``(action, state)`` tuple.
        task_id: Task identifier forwarded to the grader and to ``env.reset``.
        num_episodes: Number of episodes to run.
        max_steps: Upper bound on steps per episode; an episode also stops
            as soon as the observation reports ``done``.

    Returns:
        The value of ``ThermalGridGrader.get_thermal_grid_score()``.
    """
    # Resolve the action model once instead of re-running the fallback
    # import chain on every step.
    action_cls = _load_action_model()
    grader = ThermalGridGrader(task_id=task_id)

    for _episode in range(num_episodes):
        observation = env.reset(task_id=task_id)
        for _step in range(max_steps):
            # agent.predict is typical for stable-baselines3
            prediction = agent.predict(_observation_to_vector(observation))
            # Handle (action, state) or action return
            raw_action = prediction[0] if isinstance(prediction, tuple) else prediction

            # Flatten a possibly batched action and bound it to [0, 1], the
            # range the affine mappings below are written for, so an
            # out-of-range agent cannot request e.g. zero or negative chillers.
            action_vec = np.clip(
                np.asarray(raw_action, dtype=np.float64).ravel(), 0.0, 1.0
            )
            setpoint, fan, chillers, batch_fraction, power_cap = (
                float(value) for value in action_vec[:5]
            )

            fan_speed_pct = 20.0 + fan * 80.0
            per_server_cap_w = 50.0 + power_cap * 450.0
            pending_jobs = env._batch_queue.pending_count

            # Convert vector action back to Pydantic for the env
            tg_action = action_cls(
                crac_setpoint_c=12.0 + setpoint * 15.0,
                fan_speeds_pct=[fan_speed_pct] * 10,
                num_active_chillers=int(1 + chillers * 3),
                batch_job_schedule=list(range(int(batch_fraction * pending_jobs))),
                # Build each row separately so the rows are not ten aliases
                # of one list object.
                workload_matrix=[[0.5] * 8 for _ in range(10)],
                power_caps_w=[[per_server_cap_w] * 8 for _ in range(10)],
                region_traffic_weights=[0.5, 0.5],
            )

            observation = env.step(tg_action)
            # A reward attribute that exists but is None must not reach the
            # grader's averaging.
            reward = getattr(observation, "reward", None)
            grader.log_thermal_grid_step(observation, 0.0 if reward is None else reward)
            if getattr(observation, "done", False):
                break

    return grader.get_thermal_grid_score()
def grade_baseline(env, agent) -> float:
    """Grade ``agent`` on the ``"baseline"`` task using ``_run_grading_episodes`` defaults."""
    score = _run_grading_episodes(env, agent, task_id="baseline")
    return score
def grade_load_shift(env, agent) -> float:
    """Grade ``agent`` on the ``"load_shift"`` task using ``_run_grading_episodes`` defaults."""
    score = _run_grading_episodes(env, agent, task_id="load_shift")
    return score
def grade_grid_stress(env, agent) -> float:
    """Grade ``agent`` on the ``"grid_stress"`` task using ``_run_grading_episodes`` defaults."""
    score = _run_grading_episodes(env, agent, task_id="grid_stress")
    return score
|