Controller / utilities /rollout.py
Gen-HVAC's picture
Upload 6 files
ba7b0bc verified
raw
history blame
7.03 kB
# unihvac/rollout.py
from __future__ import annotations
from typing import Callable, Dict, Any, Optional, List, Tuple
import gymnasium as gym
import numpy as np
import pandas as pd
import sinergym
from unihvac.comfort import (
fix_ashrae_any_fixed,
quick_stats,
add_feature_availability_and_registry,
print_feature_availability,
compute_comfort_metrics_inplace,
)
# Column-name prefixes of the five zones, in the order used by every per-zone list.
_ZONE_PREFIXES = ("core", "perim1", "perim2", "perim3", "perim4")

# Per-zone ``*_temp`` column names.
ZONE_TEMP_KEYS = [f"{zone}_temp" for zone in _ZONE_PREFIXES]
# Per-zone ``*_occ_count`` column names.
ZONE_OCC_KEYS = [f"{zone}_occ_count" for zone in _ZONE_PREFIXES]
# Per-zone ``*_rh`` column names.
RH_KEYS = [f"{zone}_rh" for zone in _ZONE_PREFIXES]
# First feature group handed to ``add_feature_availability_and_registry``.
BASE_FEATURE_COLS = [
    # outdoor / zone temperatures
    "outdoor_temp",
    "core_temp",
    "perim1_temp",
    "perim2_temp",
    "perim3_temp",
    "perim4_temp",
    # electricity
    "elec_power",
    # zone occupant counts
    "core_occ_count",
    "perim1_occ_count",
    "perim2_occ_count",
    "perim3_occ_count",
    "perim4_occ_count",
]

# Second feature group handed to ``add_feature_availability_and_registry``.
NEW_FEATURE_COLS = [
    # outdoor moisture
    "outdoor_dewpoint",
    "outdoor_wetbulb",
    # zone ``*_rh`` columns
    "core_rh",
    "perim1_rh",
    "perim2_rh",
    "perim3_rh",
    "perim4_rh",
    # ``ash55`` "not comfortable" columns
    "core_ash55_notcomfortable_summer",
    "core_ash55_notcomfortable_winter",
    "core_ash55_notcomfortable_any",
    "p1_ash55_notcomfortable_any",
    "p2_ash55_notcomfortable_any",
    "p3_ash55_notcomfortable_any",
    "p4_ash55_notcomfortable_any",
]
# Columns summarised by ``quick_stats`` in ``postprocess_comfort_inplace``.
# NOTE(review): the list uses ``core_ash55_any_fixed`` rather than
# ``core_ash55_notcomfortable_any``; presumably ``fix_ashrae_any_fixed``
# creates that column -- confirm against ``unihvac.comfort``.
ASH_COLS = [
    *(f"core_ash55_notcomfortable_{season}" for season in ("summer", "winter")),
    "core_ash55_any_fixed",
    *(f"p{zone_no}_ash55_notcomfortable_any" for zone_no in range(1, 5)),
]
# Signature of a rollout policy: it is called as ``policy_fn(obs, info, step)``
# and its return value is the action handed to ``env.step``.
PolicyFn = Callable[[np.ndarray, Dict[str, Any], int], np.ndarray]
class DummyReward:
    """Placeholder reward: accepts any constructor arguments and always scores zero."""

    def __init__(self, *args, **kwargs):
        """Accept and discard every positional and keyword argument."""

    def __call__(self, obs_dict):
        """Return ``(0.0, {})`` regardless of ``obs_dict``."""
        reward_value = 0.0
        reward_terms = {}
        return reward_value, reward_terms
def make_env_officesmall_5zone(
    building_path: str,
    weather_path: str,
    variables: Dict[str, tuple],
    actuators: Dict[str, tuple],
    action_low: float = 12.0,
    action_high: float = 30.0,
    action_dim: int = 10,
    reward=None,
):
    """Create the ``Eplus-5zone-mixed-continuous-stochastic-v1`` environment.

    The environment is built through ``gym.make`` with a custom continuous
    ``Box`` action space replacing the registered one.

    Args:
        building_path: Forwarded to ``gym.make`` as ``building_file``.
        weather_path: Forwarded as the single entry of ``weather_files``.
        variables: Forwarded unchanged as ``variables``.
        actuators: Forwarded unchanged as ``actuators``.
        action_low: Lower bound applied to every action component.
        action_high: Upper bound applied to every action component.
        action_dim: Number of components of the ``float32`` action vector.
        reward: Forwarded as ``reward``; ``DummyReward`` is used when ``None``.

    Returns:
        A ``(env, obs_keys, month_idx)`` tuple, where ``obs_keys`` is
        ``env.unwrapped.observation_variables`` and ``month_idx`` is the
        position of ``"month"`` in it, or ``None`` when it is not present.
    """
    new_action_space = gym.spaces.Box(
        low=action_low, high=action_high, shape=(action_dim,), dtype=np.float32
    )
    if reward is None:
        # The class itself (not an instance) is forwarded, exactly as a
        # caller-supplied ``reward`` would be.
        reward = DummyReward
    env = gym.make(
        "Eplus-5zone-mixed-continuous-stochastic-v1",
        building_file=building_path,
        weather_files=[weather_path],
        variables=variables,
        actuators=actuators,
        action_space=new_action_space,
        reward=reward,
    )
    # Read the variable list once; it is used for the log line and the lookup.
    obs_keys = env.unwrapped.observation_variables
    print("ENVIRONMENT VARIABLES:", obs_keys)
    month_idx = obs_keys.index("month") if "month" in obs_keys else None
    return env, obs_keys, month_idx
def rollout_episode(
    env,
    policy_fn: PolicyFn,
    obs_keys: List[str],
    month_idx: Optional[int],
    max_steps: Optional[int] = None,
) -> pd.DataFrame:
    """Run one episode with ``policy_fn`` and log every transition as a row.

    Each row holds the step counter, the month, the first two action
    components (stored as ``setpoint_htg`` and ``setpoint_clg``) and the
    observation returned by ``env.step``, keyed positionally by ``obs_keys``.

    Args:
        env: Object exposing ``reset() -> (obs, info)`` and
            ``step(action) -> (obs, reward, terminated, truncated, info)``.
        policy_fn: Called as ``policy_fn(obs, info, step)``; its result is
            passed to ``env.step`` and must have at least two components.
        obs_keys: Names zipped with the entries of each new observation.
        month_idx: Position of the month inside the observation, or ``None``
            to read ``info["month"]`` instead (NaN when that key is missing).
        max_steps: Optional upper bound on the number of steps executed.

    Returns:
        A DataFrame with one row per executed step (empty when no step ran).
        ``month`` is rounded and cast to ``int`` when every value is known;
        if any value is missing it stays a rounded float column with NaN.
    """
    obs, info = env.reset()
    data_log = []
    terminated = False
    truncated = False
    step = 0
    while not (terminated or truncated):
        if max_steps is not None and step >= max_steps:
            break
        action = policy_fn(obs, info, step)
        htg_sp = float(action[0])
        clg_sp = float(action[1])
        next_obs, _, terminated, truncated, info = env.step(action)
        if month_idx is not None:
            month_val = next_obs[month_idx]
        else:
            month_val = info.get("month", np.nan)
        row = {
            "step": step,
            "month": month_val,
            "setpoint_htg": htg_sp,
            "setpoint_clg": clg_sp,
        }
        # Observation entries come last, so an obs key named like one of the
        # bookkeeping columns above overrides it.
        row.update(zip(obs_keys, next_obs))
        data_log.append(row)
        obs = next_obs
        step += 1
    df = pd.DataFrame(data_log)
    if "month" in df.columns:
        month = df["month"].round()
        # NaN cannot be represented as int: casting unconditionally would raise
        # whenever the month was unavailable, so only cast a complete column.
        df["month"] = month.astype(int) if month.notna().all() else month
    return df
def add_energy_columns_inplace(
    df: pd.DataFrame,
    timestep_hours: float,
    elec_col: str = "elec_power",
) -> None:
    """Add ``elec_power_kw`` and ``elec_energy_kwh`` columns to ``df`` in place.

    ``elec_power_kw`` is ``df[elec_col] / 1000`` and ``elec_energy_kwh`` is that
    value multiplied by ``timestep_hours``. When ``elec_col`` is not a column
    of ``df``, both new columns are filled with NaN instead.

    Args:
        df: Frame to extend; mutated in place.
        timestep_hours: Factor applied to the kW column to obtain kWh.
        elec_col: Name of the source power column.
    """
    if elec_col not in df.columns:
        for missing in ("elec_power_kw", "elec_energy_kwh"):
            df[missing] = np.nan
        return

    power_kw = df[elec_col] / 1000.0
    df["elec_power_kw"] = power_kw
    df["elec_energy_kwh"] = power_kw * timestep_hours
def postprocess_comfort_inplace(
    df: pd.DataFrame,
    location: str,
    timestep_hours: float,
    heating_sp: float,
    cooling_sp: float,
    verbose: bool = True,
) -> None:
    """Run the ``unihvac.comfort`` post-processing helpers on ``df``.

    The helpers are invoked in a fixed order: ``fix_ashrae_any_fixed``,
    ``add_feature_availability_and_registry`` and finally
    ``compute_comfort_metrics_inplace``. With ``verbose`` set, ``quick_stats``
    is called after the first helper and ``print_feature_availability`` after
    the second.

    Args:
        df: Rollout frame handed to every helper.
            NOTE(review): the helpers presumably mutate it in place -- confirm.
        location: Passed to ``print_feature_availability`` and the metrics helper.
        timestep_hours: Forwarded to the metrics helper as ``time_step_hours``.
        heating_sp: Forwarded to the metrics helper.
        cooling_sp: Forwarded to the metrics helper.
        verbose: Enables the two reporting calls.
    """
    fix_ashrae_any_fixed(df)
    if verbose:
        quick_stats(df, ASH_COLS, "ASHRAE55 Not Comfortable (raw timestep values)")

    add_feature_availability_and_registry(df, BASE_FEATURE_COLS, NEW_FEATURE_COLS)
    if verbose:
        print_feature_availability(df, location)

    # Collected up front so the final call reads as one step.
    metric_kwargs = {
        "df": df,
        "location": location,
        "time_step_hours": timestep_hours,
        "heating_sp": heating_sp,
        "cooling_sp": cooling_sp,
        "zone_temp_keys": ZONE_TEMP_KEYS,
        "zone_occ_keys": ZONE_OCC_KEYS,
        "rh_keys": RH_KEYS,
    }
    compute_comfort_metrics_inplace(**metric_kwargs)
def run_rollout_to_df(
    *,
    building_path: str,
    weather_path: str,
    variables: Dict[str, tuple],
    actuators: Dict[str, tuple],
    policy_fn: PolicyFn,
    location: str,
    timestep_hours: float,
    heating_sp: float,
    cooling_sp: float,
    reward=None,
    max_steps: Optional[int] = None,
    verbose: bool = True,
) -> pd.DataFrame:
    """Build the environment, roll out one episode and post-process the log.

    Steps: create the environment with ``make_env_officesmall_5zone``, run
    ``rollout_episode``, close the environment (even when the rollout raises),
    then apply ``add_energy_columns_inplace`` and
    ``postprocess_comfort_inplace`` to the resulting frame.

    Args:
        building_path: Forwarded to the environment factory.
        weather_path: Forwarded to the environment factory.
        variables: Forwarded to the environment factory.
        actuators: Forwarded to the environment factory.
        policy_fn: Policy driving the rollout.
        location: Forwarded to the comfort post-processing.
        timestep_hours: Used by both the energy and the comfort post-processing.
        heating_sp: Forwarded to the comfort post-processing.
        cooling_sp: Forwarded to the comfort post-processing.
        reward: Forwarded to the environment factory.
        max_steps: Optional cap on the number of rollout steps.
        verbose: Forwarded to the comfort post-processing.

    Returns:
        The post-processed rollout DataFrame.
    """
    # If creation itself fails there is nothing to close, so the environment
    # is built before entering the guarded section.
    env, obs_keys, month_idx = make_env_officesmall_5zone(
        building_path=building_path,
        weather_path=weather_path,
        variables=variables,
        actuators=actuators,
        reward=reward,
    )
    try:
        frame = rollout_episode(
            env=env,
            policy_fn=policy_fn,
            obs_keys=list(obs_keys),
            month_idx=month_idx,
            max_steps=max_steps,
        )
    finally:
        env.close()

    add_energy_columns_inplace(frame, timestep_hours=timestep_hours)
    postprocess_comfort_inplace(
        df=frame,
        location=location,
        timestep_hours=timestep_hours,
        heating_sp=heating_sp,
        cooling_sp=cooling_sp,
        verbose=verbose,
    )
    return frame
# ======================================================================================
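# INDEX MAPPING (Sinergym / OfficeSmall 5-Zone)
#
# Reference layout of the observation vector (index: variable name).
# NOTE: the order actually used at runtime is whatever
# `env.unwrapped.observation_variables` reports -- verify against it.
#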
# 00: month
# 01: day_of_month
# 02: hour
# 03: outdoor_temp
# 04: core_temp
# 05: perim1_temp
# 06: perim2_temp
# 07: perim3_temp
# 08: perim4_temp
# 09: elec_power
# 10: core_occ_count
# 11: perim1_occ_count
# 12: perim2_occ_count
# 13: perim3_occ_count
# 14: perim4_occ_count
# 15: outdoor_dewpoint
# 16: outdoor_wetbulb
# 17: core_rh
# 18: perim1_rh
# 19: perim2_rh
# 20: perim3_rh
# 21: perim4_rh
# 22: core_ash55_notcomfortable_summer
# 23: core_ash55_notcomfortable_winter
# 24: core_ash55_notcomfortable_any
# 25: p1_ash55_notcomfortable_any
# 26: p2_ash55_notcomfortable_any
# 27: p3_ash55_notcomfortable_any
# 28: p4_ash55_notcomfortable_any
# 29: total_electricity_HVAC
#
#
# ======================================================================================