# unihvac/rollout.py
"""Helpers to roll out a policy in a Sinergym 5-zone environment and log it.

The module builds the environment, steps a user-supplied policy through one
episode while recording every observation in a ``pandas.DataFrame``, and then
adds energy and comfort columns via the helpers in ``unihvac.comfort``.
"""
from __future__ import annotations

from typing import Callable, Dict, Any, Optional, List, Tuple

import gymnasium as gym
import numpy as np
import pandas as pd
# NOTE(review): not referenced by name below; presumably imported for the side
# effect of registering the "Eplus-*" environment ids with gymnasium.
import sinergym

from unihvac.comfort import (
    fix_ashrae_any_fixed,
    quick_stats,
    add_feature_availability_and_registry,
    print_feature_availability,
    compute_comfort_metrics_inplace,
)

# Column groups handed to compute_comfort_metrics_inplace.
ZONE_TEMP_KEYS = ["core_temp", "perim1_temp", "perim2_temp", "perim3_temp", "perim4_temp"]
ZONE_OCC_KEYS = [
    "core_occ_count",
    "perim1_occ_count",
    "perim2_occ_count",
    "perim3_occ_count",
    "perim4_occ_count",
]
RH_KEYS = ["core_rh", "perim1_rh", "perim2_rh", "perim3_rh", "perim4_rh"]

# Column groups handed to add_feature_availability_and_registry.
BASE_FEATURE_COLS = [
    "outdoor_temp",
    "core_temp",
    "perim1_temp",
    "perim2_temp",
    "perim3_temp",
    "perim4_temp",
    "elec_power",
    "core_occ_count",
    "perim1_occ_count",
    "perim2_occ_count",
    "perim3_occ_count",
    "perim4_occ_count",
]
NEW_FEATURE_COLS = [
    "outdoor_dewpoint",
    "outdoor_wetbulb",
    "core_rh",
    "perim1_rh",
    "perim2_rh",
    "perim3_rh",
    "perim4_rh",
    "core_ash55_notcomfortable_summer",
    "core_ash55_notcomfortable_winter",
    "core_ash55_notcomfortable_any",
    "p1_ash55_notcomfortable_any",
    "p2_ash55_notcomfortable_any",
    "p3_ash55_notcomfortable_any",
    "p4_ash55_notcomfortable_any",
]

# Columns summarised by quick_stats in postprocess_comfort_inplace.
# NOTE(review): "core_ash55_any_fixed" is not an observation column listed in
# this file; presumably fix_ashrae_any_fixed creates it -- confirm.
ASH_COLS = [
    "core_ash55_notcomfortable_summer",
    "core_ash55_notcomfortable_winter",
    "core_ash55_any_fixed",
    "p1_ash55_notcomfortable_any",
    "p2_ash55_notcomfortable_any",
    "p3_ash55_notcomfortable_any",
    "p4_ash55_notcomfortable_any",
]

# Signature of a policy: (observation, info dict, step index) -> action array.
PolicyFn = Callable[[np.ndarray, Dict[str, Any], int], np.ndarray]


class DummyReward:
    """Reward callable that ignores its inputs and always yields zero."""

    def __init__(self, *args, **kwargs):
        """Accept and discard any constructor arguments."""
        pass

    def __call__(self, obs_dict):
        """Return ``(0.0, {})`` regardless of ``obs_dict``."""
        return 0.0, {}


def make_env_officesmall_5zone(
    building_path: str,
    weather_path: str,
    variables: Dict[str, tuple],
    actuators: Dict[str, tuple],
    action_low: float = 12.0,
    action_high: float = 30.0,
    action_dim: int = 10,
    reward=None,
):
    """Create the ``Eplus-5zone-mixed-continuous-stochastic-v1`` environment.

    Args:
        building_path: Passed to ``gym.make`` as ``building_file``.
        weather_path: Passed to ``gym.make`` as the single entry of
            ``weather_files``.
        variables: Passed through to ``gym.make`` as ``variables``.
        actuators: Passed through to ``gym.make`` as ``actuators``.
        action_low: Lower bound of every component of the Box action space.
        action_high: Upper bound of every component of the Box action space.
        action_dim: Number of components of the Box action space.
        reward: Passed to ``gym.make`` as ``reward``; ``DummyReward`` (the
            class itself, not an instance) is used when ``None``.

    Returns:
        A tuple ``(env, obs_keys, month_idx)`` where ``obs_keys`` is
        ``env.unwrapped.observation_variables`` and ``month_idx`` is the
        position of ``"month"`` in it, or ``None`` when absent.
    """
    new_action_space = gym.spaces.Box(
        low=action_low,
        high=action_high,
        shape=(action_dim,),
        dtype=np.float32,
    )
    if reward is None:
        reward = DummyReward

    env = gym.make(
        "Eplus-5zone-mixed-continuous-stochastic-v1",
        building_file=building_path,
        weather_files=[weather_path],
        variables=variables,
        actuators=actuators,
        action_space=new_action_space,
        reward=reward,
    )

    obs_keys = env.unwrapped.observation_variables
    print("ENVIRONMENT VARIABLES:", obs_keys)
    month_idx = obs_keys.index("month") if "month" in obs_keys else None
    return env, obs_keys, month_idx


def rollout_episode(
    env,
    policy_fn: PolicyFn,
    obs_keys: List[str],
    month_idx: Optional[int],
    max_steps: Optional[int] = None,
) -> pd.DataFrame:
    """Run ``policy_fn`` in ``env`` for one episode and log each step.

    Each logged row pairs the action chosen at a step (its first two
    components, stored as ``setpoint_htg`` and ``setpoint_clg``) with the
    observation returned by ``env.step`` for that action.

    Args:
        env: Environment exposing the gymnasium ``reset``/``step`` API.
        policy_fn: Called as ``policy_fn(obs, info, step)``; must return an
            action with at least two components.
        obs_keys: Names zipped with each observation to form the row columns.
        month_idx: Index of the month inside the observation, or ``None`` to
            read ``info["month"]`` instead (NaN when that key is missing).
        max_steps: Optional cap on the number of steps taken.

    Returns:
        One row per executed step. The ``month`` column is rounded and cast to
        ``int``; if any month value is missing, the nullable ``Int64`` dtype
        is used instead so the cast does not raise.
    """
    obs, info = env.reset()
    data_log = []
    terminated = False
    truncated = False
    step = 0

    while not (terminated or truncated):
        if max_steps is not None and step >= max_steps:
            break

        action = policy_fn(obs, info, step)
        htg_sp = float(action[0])
        clg_sp = float(action[1])

        next_obs, _, terminated, truncated, info = env.step(action)

        if month_idx is not None:
            month_val = next_obs[month_idx]
        else:
            month_val = info.get("month", np.nan)

        # Insertion order fixes the DataFrame column order: bookkeeping
        # columns first, then the observation variables.
        row = {
            "step": step,
            "month": month_val,
            "setpoint_htg": htg_sp,
            "setpoint_clg": clg_sp,
        }
        row.update(zip(obs_keys, next_obs))
        data_log.append(row)

        obs = next_obs
        step += 1

    df = pd.DataFrame(data_log)
    if "month" in df.columns:
        month = df["month"].round()
        # A plain int cast raises on NaN (possible via the info fallback
        # above), so only switch to the nullable dtype when it is needed.
        if month.isna().any():
            df["month"] = month.astype("Int64")
        else:
            df["month"] = month.astype(int)
    return df


def add_energy_columns_inplace(
    df: pd.DataFrame,
    timestep_hours: float,
    elec_col: str = "elec_power",
) -> None:
    """Add ``elec_power_kw`` and ``elec_energy_kwh`` columns to ``df``.

    ``elec_power_kw`` is ``df[elec_col] / 1000.0`` (presumably W -> kW; the
    unit of ``elec_col`` is not established in this file) and
    ``elec_energy_kwh`` is that value times ``timestep_hours``. Both columns
    are filled with NaN when ``elec_col`` is not present.

    Args:
        df: Frame modified in place.
        timestep_hours: Duration of one row, in hours.
        elec_col: Name of the source power column.
    """
    if elec_col not in df.columns:
        df["elec_power_kw"] = np.nan
        df["elec_energy_kwh"] = np.nan
        return

    df["elec_power_kw"] = df[elec_col] / 1000.0
    df["elec_energy_kwh"] = df["elec_power_kw"] * timestep_hours


def postprocess_comfort_inplace(
    df: pd.DataFrame,
    location: str,
    timestep_hours: float,
    heating_sp: float,
    cooling_sp: float,
    verbose: bool = True,
) -> None:
    """Run the ``unihvac.comfort`` post-processing steps on ``df`` in order.

    The steps are ``fix_ashrae_any_fixed``, ``add_feature_availability_and_registry``
    and ``compute_comfort_metrics_inplace``; their exact effects are defined
    in ``unihvac.comfort``, not here.

    Args:
        df: Rollout frame, passed to every helper.
        location: Forwarded to the availability printout and comfort metrics.
        timestep_hours: Forwarded to the comfort metrics as ``time_step_hours``.
        heating_sp: Forwarded to the comfort metrics.
        cooling_sp: Forwarded to the comfort metrics.
        verbose: When true, also call ``quick_stats`` on ``ASH_COLS`` and
            ``print_feature_availability``.
    """
    fix_ashrae_any_fixed(df)
    if verbose:
        quick_stats(df, ASH_COLS, "ASHRAE55 Not Comfortable (raw timestep values)")

    add_feature_availability_and_registry(df, BASE_FEATURE_COLS, NEW_FEATURE_COLS)
    if verbose:
        print_feature_availability(df, location)

    compute_comfort_metrics_inplace(
        df=df,
        location=location,
        time_step_hours=timestep_hours,
        heating_sp=heating_sp,
        cooling_sp=cooling_sp,
        zone_temp_keys=ZONE_TEMP_KEYS,
        zone_occ_keys=ZONE_OCC_KEYS,
        rh_keys=RH_KEYS,
    )


def run_rollout_to_df(
    *,
    building_path: str,
    weather_path: str,
    variables: Dict[str, tuple],
    actuators: Dict[str, tuple],
    policy_fn: PolicyFn,
    location: str,
    timestep_hours: float,
    heating_sp: float,
    cooling_sp: float,
    reward=None,
    max_steps: Optional[int] = None,
    verbose: bool = True,
) -> pd.DataFrame:
    """Build the environment, roll out one episode and post-process the log.

    All arguments are keyword-only. The environment is closed once the
    rollout finishes or fails; energy and comfort post-processing run after
    it has been closed.

    Args:
        building_path: Forwarded to ``make_env_officesmall_5zone``.
        weather_path: Forwarded to ``make_env_officesmall_5zone``.
        variables: Forwarded to ``make_env_officesmall_5zone``.
        actuators: Forwarded to ``make_env_officesmall_5zone``.
        policy_fn: Policy stepped by ``rollout_episode``.
        location: Forwarded to ``postprocess_comfort_inplace``.
        timestep_hours: Duration of one step in hours, used for the energy
            and comfort columns.
        heating_sp: Forwarded to ``postprocess_comfort_inplace``.
        cooling_sp: Forwarded to ``postprocess_comfort_inplace``.
        reward: Forwarded to ``make_env_officesmall_5zone``.
        max_steps: Optional cap on the number of steps.
        verbose: Forwarded to ``postprocess_comfort_inplace``.

    Returns:
        The rollout frame with energy and comfort columns added.
    """
    env = None
    try:
        env, obs_keys, month_idx = make_env_officesmall_5zone(
            building_path=building_path,
            weather_path=weather_path,
            variables=variables,
            actuators=actuators,
            reward=reward,
        )
        df = rollout_episode(
            env=env,
            policy_fn=policy_fn,
            obs_keys=list(obs_keys),
            month_idx=month_idx,
            max_steps=max_steps,
        )
    finally:
        # env stays None when construction itself failed.
        if env is not None:
            env.close()

    add_energy_columns_inplace(df, timestep_hours=timestep_hours)
    postprocess_comfort_inplace(
        df=df,
        location=location,
        timestep_hours=timestep_hours,
        heating_sp=heating_sp,
        cooling_sp=cooling_sp,
        verbose=verbose,
    )
    return df


# ======================================================================================
# INDEX MAPPING (Sinergym / OfficeSmall 5-Zone)
#
# 00: month
# 01: day_of_month
# 02: hour
# 03: outdoor_temp
# 04: core_temp
# 05: perim1_temp
# 06: perim2_temp
# 07: perim3_temp
# 08: perim4_temp
# 09: elec_power
# 10: core_occ_count
# 11: perim1_occ_count
# 12: perim2_occ_count
# 13: perim3_occ_count
# 14: perim4_occ_count
# 15: outdoor_dewpoint
# 16: outdoor_wetbulb
# 17: core_rh
# 18: perim1_rh
# 19: perim2_rh
# 20: perim3_rh
# 21: perim4_rh
# 22: core_ash55_notcomfortable_summer
# 23: core_ash55_notcomfortable_winter
# 24: core_ash55_notcomfortable_any
# 25: p1_ash55_notcomfortable_any
# 26: p2_ash55_notcomfortable_any
# 27: p3_ash55_notcomfortable_any
# 28: p4_ash55_notcomfortable_any
# 29: total_electricity_HVAC
#
#
# ======================================================================================