varshu23's picture
Clean commit without images
5a22808
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import logging
from typing import List, Dict
import numpy as np
try:
from .thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS
except (ImportError, ValueError):
try:
from thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS
except ImportError:
from server.thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS
logger = logging.getLogger(__name__)
class ThermalGridGrader:
    """
    Programmatic grader for the Thermal Grid RL environment.

    Steps are recorded with ``log_thermal_grid_step`` and reduced to a single
    normalized score in [0.0, 1.0] by ``get_thermal_grid_score``. The recorded
    history is never reset by this class, so logging several episodes into one
    grader instance yields a score over all recorded steps.
    """

    def __init__(self, task_id: str):
        """
        Args:
            task_id: Value accepted by ``ThermalGridTaskID(...)``. A value that
                raises ``ValueError`` there falls back to the BASELINE task
                (a warning is logged so the fallback is not silent).
        """
        try:
            self.task_id = ThermalGridTaskID(task_id)
        except ValueError:
            # Same logger object as the module-level ``logging.getLogger(__name__)``.
            logging.getLogger(__name__).warning(
                "Unknown task_id %r; falling back to the baseline task", task_id
            )
            self.task_id = ThermalGridTaskID.BASELINE
        self.config = TASKS[self.task_id]
        self.history: List[Dict] = []

    def log_thermal_grid_step(self, observation, reward: float):
        """
        Record a step's data for final grading.

        Args:
            observation: Object exposing ``pue``, ``max_cpu_temps_c``,
                ``energy_price_per_kwh``, ``demand_response_signal``,
                ``total_it_power_w``, ``total_facility_power_w`` and,
                optionally, a ``metadata`` mapping with a ``"sub_rewards"`` entry.
            reward: Step reward. ``None`` is tolerated and recorded as 0.0 so
                that averaging over the history cannot fail later.
        """
        # metadata may be absent or None; treat both as "no sub-rewards reported".
        metadata = getattr(observation, "metadata", None) or {}
        sub_rewards = metadata.get("sub_rewards") or {
            "energy_cost_score": 0.0,
            "thermal_safety_score": 0.0,
            "workload_throughput_score": 0.0,
            "compliance_score": 1.0,
        }
        cpu_temps = observation.max_cpu_temps_c
        self.history.append({
            "pue": observation.pue,
            "reward": 0.0 if reward is None else reward,
            # An empty temperature list is recorded as 0.0 (no reading).
            "max_cpu_temp": max(cpu_temps) if cpu_temps else 0.0,
            "energy_price": observation.energy_price_per_kwh,
            "dr_signal": observation.demand_response_signal,
            "it_power": observation.total_it_power_w,
            "facility_power": observation.total_facility_power_w,
            "sub_rewards": sub_rewards,
        })

    def get_thermal_grid_score(self) -> float:
        """
        Calculate final normalized score [0.0, 1.0] based on task-specific criteria.

        Returns:
            0.0 when no step has been logged; otherwise a weighted blend of
            component scores, each individually bounded to [0.0, 1.0] so that
            no component can compensate for another beyond its weight.
        """
        if not self.history:
            return 0.0

        avg_pue = np.mean([s["pue"] for s in self.history])
        avg_reward = np.mean([s["reward"] for s in self.history])
        max_temp_ever = max(s["max_cpu_temp"] for s in self.history)

        # 1. Safety score (common to all tasks).
        # Full marks up to 80C, then linear down to 0.0 at 90C.
        safety_score = max(0.0, 1.0 - max(0.0, max_temp_ever - 80.0) / 10.0)

        # 2. Efficiency score (PUE focus).
        # Average PUE <= 1.1 -> 1.0, linear down to 0.0 at PUE 1.5.
        pue_score = float(np.clip(1.0 - (avg_pue - 1.1) / 0.4, 0.0, 1.0))

        # 3. Reward score: mean step reward, bounded to [0, 1].
        # Negative averages count as 0.0.
        reward_score = float(np.clip(avg_reward, 0.0, 1.0))

        if self.task_id == ThermalGridTaskID.LOAD_SHIFT:
            # Medium: focus on reward (which includes cost) and PUE.
            total = 0.5 * reward_score + 0.3 * pue_score + 0.2 * safety_score
        elif self.task_id == ThermalGridTaskID.GRID_STRESS:
            # Hard: focus on reward and safety under extreme heat.
            # Any excursion to 85C or above forfeits the whole safety share.
            hard_safety = safety_score if max_temp_ever < 85.0 else 0.0
            total = 0.7 * reward_score + 0.3 * hard_safety
        else:
            # Easy (BASELINE): just stay safe and reasonably efficient.
            # Also used for any other task ID, mirroring the constructor's
            # fallback, so this method always returns a float.
            total = 0.6 * pue_score + 0.4 * safety_score

        return float(np.clip(total, 0.0, 1.0))

    def get_rubric_scores(self) -> dict:
        """
        Returns average per-rubric scores over the recorded history.

        Each entry is the mean of the corresponding ``sub_rewards`` value over
        all logged steps. A step missing a key contributes 0.0, except for
        compliance, which contributes 1.0. With no history the same defaults
        are returned directly.
        """
        # rubric name -> (sub_rewards key, value used when the key is missing)
        rubric_sources = {
            "energy": ("energy_cost_score", 0.0),
            "safety": ("thermal_safety_score", 0.0),
            "workload": ("workload_throughput_score", 0.0),
            "compliance": ("compliance_score", 1.0),
        }
        if not self.history:
            return {name: fallback for name, (_, fallback) in rubric_sources.items()}
        return {
            name: float(np.mean([h["sub_rewards"].get(key, fallback) for h in self.history]))
            for name, (key, fallback) in rubric_sources.items()
        }
def _grading_observation_vector(observation):
    """Flatten an environment observation into one float32 vector for the agent."""
    # NOTE(review): the original comment says this mirrors gym_env's layout
    # (67 dims total once GPU temps are included) - confirm against gym_env.
    return np.concatenate([
        np.array(observation.inlet_temps_c, dtype=np.float32),
        np.array(observation.mean_cpu_temps_c, dtype=np.float32),
        np.array(observation.max_cpu_temps_c, dtype=np.float32),
        np.array(observation.max_gpu_temps_c, dtype=np.float32),
        np.array([observation.thermal_mass_lag_c_per_min], dtype=np.float32),
        np.array(observation.rack_powers_w, dtype=np.float32) / 10000.0,
        np.array(observation.rack_utilisation, dtype=np.float32),
        np.array([observation.live_traffic_load_w], dtype=np.float32) / 10000.0,
        np.array([observation.pue], dtype=np.float32),
        np.array([observation.ambient_temp_c], dtype=np.float32),
        np.array([observation.energy_price_per_kwh], dtype=np.float32),
        np.array([observation.demand_response_signal], dtype=np.float32),
        np.array([observation.off_peak_window], dtype=np.float32),
    ]).astype(np.float32)


def _grading_action_from_vector(action_cls, action_vec, env):
    """Build an ``action_cls`` instance from the first five entries of ``action_vec``."""
    # presumably each action component lies in [0, 1]; values are not clipped
    # here - verify against the agent's action space.
    power_cap_w = 50.0 + action_vec[4] * 450.0
    # Reads the environment's private batch queue to size the job schedule.
    n_scheduled_jobs = int(action_vec[3] * env._batch_queue.pending_count)
    return action_cls(
        crac_setpoint_c=12.0 + action_vec[0] * 15.0,
        fan_speeds_pct=[20.0 + action_vec[1] * 80.0] * 10,
        num_active_chillers=int(1 + action_vec[2] * 3),
        batch_job_schedule=list(range(n_scheduled_jobs)),
        # Build each row separately so the 10 rows are independent lists
        # rather than 10 references to one shared list.
        workload_matrix=[[0.5] * 8 for _ in range(10)],
        power_caps_w=[[power_cap_w] * 8 for _ in range(10)],
        region_traffic_weights=[0.5, 0.5]
    )


def _run_grading_episodes(env, agent, task_id: str, num_episodes: int = 3, max_steps: int = 24) -> float:
    """
    Helper to run multiple evaluation episodes and return an average score in [0, 1].

    Args:
        env: Environment exposing ``reset(task_id=...)``, ``step(action)`` and
            a ``_batch_queue.pending_count`` attribute.
        agent: Object with ``predict(obs_vec)`` returning either an action
            vector or a tuple whose first element is the action vector.
        task_id: Task identifier passed to both the grader and ``env.reset``.
        num_episodes: Number of episodes to run.
        max_steps: Upper bound on steps per episode; an episode also ends
            early when the observation's ``done`` attribute is truthy.

    Returns:
        The grader's score computed over every step of every episode (all
        episodes are logged into a single ``ThermalGridGrader``).
    """
    # Resolve the action model once, not on every step. The import stays
    # inside the function and keeps the original three-way fallback.
    try:
        from ..models import ThermalGridRlAgentAction
    except (ImportError, ValueError):
        try:
            from models import ThermalGridRlAgentAction
        except ImportError:
            from thermal_grid_rl_agent.models import ThermalGridRlAgentAction

    grader = ThermalGridGrader(task_id=task_id)
    for _episode in range(num_episodes):
        observation = env.reset(task_id=task_id)
        for _step in range(max_steps):
            obs_vec = _grading_observation_vector(observation)

            # agent.predict is typical for stable-baselines3
            action_data = agent.predict(obs_vec)
            # Handle (action, state) or action return
            action_vec = action_data[0] if isinstance(action_data, tuple) else action_data

            # Convert vector action back to the action model for the env
            tg_action = _grading_action_from_vector(ThermalGridRlAgentAction, action_vec, env)
            observation = env.step(tg_action)

            # The attribute may exist but hold None; grade that as 0.0.
            step_reward = getattr(observation, 'reward', None)
            grader.log_thermal_grid_step(observation, 0.0 if step_reward is None else step_reward)

            if getattr(observation, 'done', False):
                break
    return grader.get_thermal_grid_score()
def grade_baseline(env, agent) -> float:
    """Score ``agent`` on the "baseline" task using the helper's default episode settings."""
    task = "baseline"
    return _run_grading_episodes(env, agent, task_id=task)
def grade_load_shift(env, agent) -> float:
    """Score ``agent`` on the "load_shift" task using the helper's default episode settings."""
    task = "load_shift"
    return _run_grading_episodes(env, agent, task_id=task)
def grade_grid_stress(env, agent) -> float:
    """Score ``agent`` on the "grid_stress" task using the helper's default episode settings."""
    task = "grid_stress"
    return _run_grading_episodes(env, agent, task_id=task)