| |
| from __future__ import annotations |
|
|
| from typing import Callable, Dict, Any, Optional, List, Tuple |
|
|
| import gymnasium as gym |
| import numpy as np |
| import pandas as pd |
| import sinergym |
|
|
| from unihvac.comfort import ( |
| fix_ashrae_any_fixed, |
| quick_stats, |
| add_feature_availability_and_registry, |
| print_feature_availability, |
| compute_comfort_metrics_inplace, |
| ) |
|
|
|
|
# Per-zone observation keys (one "core" entry plus "perim1".."perim4").
# postprocess_comfort_inplace hands these to compute_comfort_metrics_inplace.
ZONE_TEMP_KEYS = [
    "core_temp",
    "perim1_temp",
    "perim2_temp",
    "perim3_temp",
    "perim4_temp",
]
ZONE_OCC_KEYS = [
    "core_occ_count",
    "perim1_occ_count",
    "perim2_occ_count",
    "perim3_occ_count",
    "perim4_occ_count",
]
RH_KEYS = [
    "core_rh",
    "perim1_rh",
    "perim2_rh",
    "perim3_rh",
    "perim4_rh",
]

# Feature column lists passed to add_feature_availability_and_registry.
# They are derived from the key lists above so the two cannot drift apart;
# the resulting values and their order match the previous literal lists.
BASE_FEATURE_COLS = [
    "outdoor_temp",
    *ZONE_TEMP_KEYS,
    "elec_power",
    *ZONE_OCC_KEYS,
]
NEW_FEATURE_COLS = [
    "outdoor_dewpoint",
    "outdoor_wetbulb",
    *RH_KEYS,
    "core_ash55_notcomfortable_summer",
    "core_ash55_notcomfortable_winter",
    "core_ash55_notcomfortable_any",
    "p1_ash55_notcomfortable_any",
    "p2_ash55_notcomfortable_any",
    "p3_ash55_notcomfortable_any",
    "p4_ash55_notcomfortable_any",
]

# Columns summarized by quick_stats in postprocess_comfort_inplace.
# NOTE(review): this list uses "core_ash55_any_fixed" where NEW_FEATURE_COLS
# uses "core_ash55_notcomfortable_any"; presumably fix_ashrae_any_fixed
# creates the "_fixed" column -- confirm against unihvac.comfort.
ASH_COLS = [
    "core_ash55_notcomfortable_summer",
    "core_ash55_notcomfortable_winter",
    "core_ash55_any_fixed",
    "p1_ash55_notcomfortable_any",
    "p2_ash55_notcomfortable_any",
    "p3_ash55_notcomfortable_any",
    "p4_ash55_notcomfortable_any",
]

# Policy callable: (observation, info dict, step index) -> action array.
PolicyFn = Callable[[np.ndarray, Dict[str, Any], int], np.ndarray]
|
|
|
|
class DummyReward:
    """Placeholder reward that always evaluates to zero.

    Used as the default reward by ``make_env_officesmall_5zone`` when the
    caller does not supply one.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore any constructor arguments."""

    def __call__(self, obs_dict) -> Tuple[float, Dict[str, Any]]:
        """Return a zero reward and an empty info dict; ``obs_dict`` is unused."""
        return 0.0, {}
|
|
|
|
def make_env_officesmall_5zone(
    building_path: str,
    weather_path: str,
    variables: Dict[str, tuple],
    actuators: Dict[str, tuple],
    action_low: float = 12.0,
    action_high: float = 30.0,
    action_dim: int = 10,
    reward=None,
) -> Tuple[Any, List[str], Optional[int]]:
    """Create the 5-zone environment with a custom continuous action space.

    Args:
        building_path: Forwarded to ``gym.make`` as ``building_file``.
        weather_path: Forwarded to ``gym.make`` as the single entry of
            ``weather_files``.
        variables: Forwarded to ``gym.make`` as ``variables``.
        actuators: Forwarded to ``gym.make`` as ``actuators``.
        action_low: Lower bound applied to every action dimension.
        action_high: Upper bound applied to every action dimension.
        action_dim: Number of action dimensions.
        reward: Reward forwarded to ``gym.make``. When ``None`` the
            ``DummyReward`` class is used.

    Returns:
        A tuple ``(env, obs_keys, month_idx)`` where ``obs_keys`` is
        ``env.unwrapped.observation_variables`` and ``month_idx`` is the
        position of ``"month"`` in it, or ``None`` when it is absent.
    """
    action_space = gym.spaces.Box(
        low=action_low,
        high=action_high,
        shape=(action_dim,),
        dtype=np.float32,
    )

    # The class itself (not an instance) is handed over, as in the original
    # code. NOTE(review): presumably the environment instantiates it -- confirm
    # against the sinergym version in use.
    if reward is None:
        reward = DummyReward

    env = gym.make(
        "Eplus-5zone-mixed-continuous-stochastic-v1",
        building_file=building_path,
        weather_files=[weather_path],
        variables=variables,
        actuators=actuators,
        action_space=action_space,
        reward=reward,
    )

    # Looked up once; the original fetched the same attribute twice.
    obs_keys = env.unwrapped.observation_variables
    print("ENVIRONMENT VARIABLES:", obs_keys)

    month_idx = obs_keys.index("month") if "month" in obs_keys else None
    return env, obs_keys, month_idx
|
|
|
|
def rollout_episode(
    env,
    policy_fn: PolicyFn,
    obs_keys: List[str],
    month_idx: Optional[int],
    max_steps: Optional[int] = None,
) -> pd.DataFrame:
    """Run one episode with ``policy_fn`` and log every step to a DataFrame.

    Args:
        env: Environment exposing ``reset()`` returning ``(obs, info)`` and
            ``step(action)`` returning
            ``(obs, reward, terminated, truncated, info)``.
        policy_fn: Called as ``policy_fn(obs, info, step)``; the first two
            entries of the returned action are logged as ``setpoint_htg``
            and ``setpoint_clg``.
        obs_keys: Names paired positionally with each observation vector.
        month_idx: Index of the month inside the observation, or ``None`` to
            fall back to ``info["month"]`` (NaN when that key is missing).
        max_steps: Optional cap on the number of steps; ``None`` runs until
            the environment reports ``terminated`` or ``truncated``.

    Returns:
        One row per executed step with ``step``, ``month``, the two
        setpoints, and one column per observation key. Each row holds the
        observation returned by the step, i.e. the state after the action.
        The frame is empty (no columns) when no step was executed.
    """
    obs, info = env.reset()
    records = []

    terminated = False
    truncated = False
    step = 0

    while not (terminated or truncated):
        if max_steps is not None and step >= max_steps:
            break

        action = policy_fn(obs, info, step)
        htg_sp = float(action[0])
        clg_sp = float(action[1])
        next_obs, _, terminated, truncated, info = env.step(action)

        if month_idx is not None:
            month_val = next_obs[month_idx]
        else:
            month_val = info.get("month", np.nan)

        row = {
            "step": step,
            "month": month_val,
            "setpoint_htg": htg_sp,
            "setpoint_clg": clg_sp,
        }
        # Observation values come last so a "month" observation overrides the
        # value chosen above, as in the original implementation.
        row.update(zip(obs_keys, next_obs))
        records.append(row)

        obs = next_obs
        step += 1

    df = pd.DataFrame(records)
    if "month" in df.columns:
        months = df["month"].round()
        # Casting NaN to int raises, so only convert when every step has a
        # month; otherwise keep the rounded float column with its NaNs.
        if months.notna().all():
            months = months.astype(int)
        df["month"] = months
    return df
|
|
|
|
def add_energy_columns_inplace(
    df: pd.DataFrame,
    timestep_hours: float,
    elec_col: str = "elec_power",
) -> None:
    """Add ``elec_power_kw`` and ``elec_energy_kwh`` columns to ``df`` in place.

    Args:
        df: Frame to modify.
        timestep_hours: Multiplier applied to ``elec_power_kw`` to obtain
            ``elec_energy_kwh``.
        elec_col: Source column, divided by 1000 to obtain ``elec_power_kw``.
            When it is missing from ``df`` both output columns are NaN.
    """
    if elec_col not in df.columns:
        # No power signal available: keep the schema stable with NaN columns.
        df["elec_power_kw"] = np.nan
        df["elec_energy_kwh"] = np.nan
        return

    df["elec_power_kw"] = df[elec_col] / 1000.0
    df["elec_energy_kwh"] = df["elec_power_kw"] * timestep_hours
|
|
|
|
def postprocess_comfort_inplace(
    df: pd.DataFrame,
    location: str,
    timestep_hours: float,
    heating_sp: float,
    cooling_sp: float,
    verbose: bool = True,
) -> None:
    """Run the ``unihvac.comfort`` post-processing steps on ``df``.

    The helpers are called in a fixed order: ``fix_ashrae_any_fixed``,
    ``add_feature_availability_and_registry``, then
    ``compute_comfort_metrics_inplace``. Nothing is returned.
    NOTE(review): the helpers are presumed to modify ``df`` in place, as the
    ``_inplace`` naming suggests -- confirm against ``unihvac.comfort``.

    Args:
        df: Rollout log to process.
        location: Label forwarded to the availability report and the comfort
            metrics.
        timestep_hours: Forwarded to the comfort metrics as
            ``time_step_hours``.
        heating_sp: Heating setpoint forwarded to the comfort metrics.
        cooling_sp: Cooling setpoint forwarded to the comfort metrics.
        verbose: When true, also calls ``quick_stats`` on ``ASH_COLS`` and
            ``print_feature_availability``.
    """
    fix_ashrae_any_fixed(df)
    if verbose:
        quick_stats(df, ASH_COLS, "ASHRAE55 Not Comfortable (raw timestep values)")

    add_feature_availability_and_registry(df, BASE_FEATURE_COLS, NEW_FEATURE_COLS)
    if verbose:
        print_feature_availability(df, location)

    compute_comfort_metrics_inplace(
        df=df,
        location=location,
        time_step_hours=timestep_hours,
        heating_sp=heating_sp,
        cooling_sp=cooling_sp,
        zone_temp_keys=ZONE_TEMP_KEYS,
        zone_occ_keys=ZONE_OCC_KEYS,
        rh_keys=RH_KEYS,
    )
|
|
|
|
def run_rollout_to_df(
    *,
    building_path: str,
    weather_path: str,
    variables: Dict[str, tuple],
    actuators: Dict[str, tuple],
    policy_fn: PolicyFn,
    location: str,
    timestep_hours: float,
    heating_sp: float,
    cooling_sp: float,
    reward=None,
    max_steps: Optional[int] = None,
    verbose: bool = True,
) -> pd.DataFrame:
    """Build the environment, roll out one episode, and post-process the log.

    All arguments are keyword-only.

    Args:
        building_path: Forwarded to ``make_env_officesmall_5zone``.
        weather_path: Forwarded to ``make_env_officesmall_5zone``.
        variables: Forwarded to ``make_env_officesmall_5zone``.
        actuators: Forwarded to ``make_env_officesmall_5zone``.
        policy_fn: Policy forwarded to ``rollout_episode``.
        location: Forwarded to ``postprocess_comfort_inplace``.
        timestep_hours: Used for both the energy columns and the comfort
            post-processing.
        heating_sp: Forwarded to ``postprocess_comfort_inplace``.
        cooling_sp: Forwarded to ``postprocess_comfort_inplace``.
        reward: Optional reward forwarded to ``make_env_officesmall_5zone``.
        max_steps: Optional step cap forwarded to ``rollout_episode``.
        verbose: Forwarded to ``postprocess_comfort_inplace``.

    Returns:
        The rollout DataFrame after the energy and comfort steps have run.
    """
    env = None
    try:
        env, obs_keys, month_idx = make_env_officesmall_5zone(
            building_path=building_path,
            weather_path=weather_path,
            variables=variables,
            actuators=actuators,
            reward=reward,
        )
        df = rollout_episode(
            env=env,
            policy_fn=policy_fn,
            obs_keys=list(obs_keys),
            month_idx=month_idx,
            max_steps=max_steps,
        )
    finally:
        # Close the environment even when creation or the rollout raised.
        if env is not None:
            env.close()

    add_energy_columns_inplace(df, timestep_hours=timestep_hours)
    postprocess_comfort_inplace(
        df=df,
        location=location,
        timestep_hours=timestep_hours,
        heating_sp=heating_sp,
        cooling_sp=cooling_sp,
        verbose=verbose,
    )
    return df
|
|
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|