| |
| |
|
|
| import logging |
| from typing import List, Dict |
| import numpy as np |
| try: |
| from .thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS |
| except (ImportError, ValueError): |
| try: |
| from thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS |
| except ImportError: |
| from server.thermal_grid_rl_agent_environment import ThermalGridTaskID, TASKS |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class ThermalGridGrader: |
| """ |
| Programmatic grader for the Thermal Grid RL environment. |
| Evaluates episode performance and returns a normalized score [0.0, 1.0]. |
| """ |
|
|
| def __init__(self, task_id: str): |
| try: |
| self.task_id = ThermalGridTaskID(task_id) |
| except ValueError: |
| self.task_id = ThermalGridTaskID.BASELINE |
| |
| self.config = TASKS[self.task_id] |
| self.history: List[Dict] = [] |
|
|
| def log_thermal_grid_step(self, observation, reward: float): |
| """Record a step's data for final grading.""" |
| sub_rewards = observation.metadata.get("sub_rewards", { |
| "energy_cost_score": 0.0, |
| "thermal_safety_score": 0.0, |
| "workload_throughput_score": 0.0, |
| "compliance_score": 1.0, |
| }) |
| self.history.append({ |
| "pue": observation.pue, |
| "reward": reward, |
| "max_cpu_temp": max(observation.max_cpu_temps_c) if observation.max_cpu_temps_c else 0.0, |
| "energy_price": observation.energy_price_per_kwh, |
| "dr_signal": observation.demand_response_signal, |
| "it_power": observation.total_it_power_w, |
| "facility_power": observation.total_facility_power_w, |
| "sub_rewards": sub_rewards, |
| }) |
|
|
| def get_thermal_grid_score(self) -> float: |
| """ |
| Calculate final normalized score [0.0, 1.0] based on task-specific criteria. |
| """ |
| if not self.history: |
| return 0.0 |
|
|
| n_steps = len(self.history) |
| avg_pue = np.mean([s["pue"] for s in self.history]) |
| avg_reward = np.mean([s["reward"] for s in self.history]) |
| max_temp_ever = max([s["max_cpu_temp"] for s in self.history]) |
| |
| |
| |
| safety_score = max(0.0, 1.0 - max(0.0, max_temp_ever - 80.0) / 10.0) |
|
|
| |
| |
| pue_score = max(0.0, 1.0 - (avg_pue - 1.1) / 0.4) |
|
|
| if self.task_id == ThermalGridTaskID.BASELINE: |
| |
| return float(np.clip(0.6 * pue_score + 0.4 * safety_score, 0.0, 1.0)) |
|
|
| elif self.task_id == ThermalGridTaskID.LOAD_SHIFT: |
| |
| |
| |
| reward_score = max(0.0, avg_reward) |
| return float(np.clip(0.5 * reward_score + 0.3 * pue_score + 0.2 * safety_score, 0.0, 1.0)) |
|
|
| elif self.task_id == ThermalGridTaskID.GRID_STRESS: |
| |
| |
| reward_score = max(0.0, avg_reward) |
| |
| hard_safety = safety_score if max_temp_ever < 85.0 else 0.0 |
| return float(np.clip(0.7 * reward_score + 0.3 * hard_safety, 0.0, 1.0)) |
|
|
| def get_rubric_scores(self) -> dict: |
| """ |
| Returns average per-rubric scores over the episode. |
| Each score is in [0, 1] (higher is better). |
| """ |
| if not self.history: |
| return { |
| "energy": 0.0, "safety": 0.0, |
| "workload": 0.0, "compliance": 1.0, |
| } |
| avg_energy = float(np.mean([h["sub_rewards"].get("energy_cost_score", 0.0) for h in self.history])) |
| avg_safety = float(np.mean([h["sub_rewards"].get("thermal_safety_score", 0.0) for h in self.history])) |
| avg_workload = float(np.mean([h["sub_rewards"].get("workload_throughput_score", 0.0) for h in self.history])) |
| avg_compliance = float(np.mean([h["sub_rewards"].get("compliance_score", 1.0) for h in self.history])) |
| return { |
| "energy": avg_energy, |
| "safety": avg_safety, |
| "workload": avg_workload, |
| "compliance": avg_compliance, |
| } |
|
|
|
|
| def _run_grading_episodes(env, agent, task_id: str, num_episodes: int = 3, max_steps: int = 24) -> float: |
| """Helper to run multiple evaluation episodes and return an average score in [0, 1].""" |
| grader = ThermalGridGrader(task_id=task_id) |
| for _ in range(num_episodes): |
| observation = env.reset(task_id=task_id) |
| for _ in range(max_steps): |
| |
| obs_vec = np.concatenate([ |
| np.array(observation.inlet_temps_c, dtype=np.float32), |
| np.array(observation.mean_cpu_temps_c, dtype=np.float32), |
| np.array(observation.max_cpu_temps_c, dtype=np.float32), |
| np.array(observation.max_gpu_temps_c, dtype=np.float32), |
| np.array([observation.thermal_mass_lag_c_per_min], dtype=np.float32), |
| np.array(observation.rack_powers_w, dtype=np.float32) / 10000.0, |
| np.array(observation.rack_utilisation, dtype=np.float32), |
| np.array([observation.live_traffic_load_w], dtype=np.float32) / 10000.0, |
| np.array([observation.pue], dtype=np.float32), |
| np.array([observation.ambient_temp_c], dtype=np.float32), |
| np.array([observation.energy_price_per_kwh], dtype=np.float32), |
| np.array([observation.demand_response_signal], dtype=np.float32), |
| np.array([observation.off_peak_window], dtype=np.float32), |
| ]).astype(np.float32) |
|
|
| |
| action_data = agent.predict(obs_vec) |
| |
| |
| action_vec = action_data[0] if isinstance(action_data, tuple) else action_data |
| |
| |
| try: |
| from ..models import ThermalGridRlAgentAction |
| except (ImportError, ValueError): |
| try: |
| from models import ThermalGridRlAgentAction |
| except ImportError: |
| from thermal_grid_rl_agent.models import ThermalGridRlAgentAction |
| tg_action = ThermalGridRlAgentAction( |
| crac_setpoint_c=12.0 + action_vec[0] * 15.0, |
| fan_speeds_pct=[20.0 + action_vec[1] * 80.0] * 10, |
| num_active_chillers=int(1 + action_vec[2] * 3), |
| batch_job_schedule=list(range(int(action_vec[3] * env._batch_queue.pending_count))), |
| workload_matrix=[[0.5] * 8] * 10, |
| power_caps_w=[[50.0 + action_vec[4] * 450.0] * 8] * 10, |
| region_traffic_weights=[0.5, 0.5] |
| ) |
|
|
| observation = env.step(tg_action) |
| grader.log_thermal_grid_step(observation, getattr(observation, 'reward', 0.0)) |
| |
| if getattr(observation, 'done', False): |
| break |
| return grader.get_thermal_grid_score() |
|
|
| def grade_baseline(env, agent) -> float: |
| return _run_grading_episodes(env, agent, "baseline") |
|
|
| def grade_load_shift(env, agent) -> float: |
| return _run_grading_episodes(env, agent, "load_shift") |
|
|
| def grade_grid_stress(env, agent) -> float: |
| return _run_grading_episodes(env, agent, "grid_stress") |
|
|