# unihvac/policy.py from __future__ import annotations import os import json from typing import Any, Dict, Tuple import numpy as np import torch import torch.nn.functional as F import requests import numpy as np import json import requests import numpy as np class RemoteHTTPPolicy: def __init__(self, server_url: str = "http://host.docker.internal:8000"): self.server_url = server_url self.predict_endpoint = f"{server_url}/predict" self.reset_endpoint = f"{server_url}/reset" print(f"[RemotePolicy] Connecting to {self.server_url}...") def reset(self): try: requests.post(self.reset_endpoint, json={"message": "reset"}) print("[RemotePolicy] Remote buffer reset.") except Exception as e: print(f"[RemotePolicy] Reset failed: {e}") def act(self, obs, info, step): obs_list = np.array(obs, dtype=np.float32).tolist() payload = {"step": int(step), "obs": obs_list, "info": {}} try: resp = requests.post(self.predict_endpoint, json=payload) resp.raise_for_status() action = np.array(resp.json()["action"], dtype=np.float32) return action, {}, {} except Exception as e: print(f"[RemotePolicy] Error: {e}") return np.array([21.0, 24.0] * 5, dtype=np.float32), {}, {} def _get_int_env(name: str, default: int) -> int: try: v = int(os.environ.get(name, str(default))) return v except Exception: return default def _get_bool_env(name: str, default: bool) -> bool: v = os.environ.get(name, None) if v is None: return default return v.strip().lower() in ("1", "true", "yes", "y", "on") # -------------------------------------------------------------------------------------- # Policies # -------------------------------------------------------------------------------------- class ConstantSetpointPolicy5Zone: """ Constant rule-based controller: 5 zones × (htg, clg) each. Returns action = [htg, clg] * 5. """ def __init__(self, heating_sp: float = 21.0, cooling_sp: float = 24.0): self.heating_sp = float(heating_sp) self.cooling_sp = float(cooling_sp) self.action = np.array([self.heating_sp, self.cooling_sp] * 5, dtype=np.float32) def reset(self): return def act(self, obs, info, step): return self.action.copy(), {}, {} class DecisionTransformerPolicy5Zone: """ CPU-safe DT policy with robust observation mapping and deadband protection. """ def __init__( self, ckpt_path: str, model_config_path: str, norm_stats_path: str, context_len: int, max_tokens_per_step: int, device: str = "cpu", temperature: float = 0.5, ): import dataloader as dl from embeddings import GeneralistComfortDT # --- 1. CPU Settings --- torch.set_grad_enabled(False) torch.backends.mha.set_fastpath_enabled(True) torch.backends.mkldnn.enabled = _get_bool_env("DT_MKLDNN", True) import multiprocessing avail = multiprocessing.cpu_count() dt_threads = _get_int_env("DT_NUM_THREADS", min(18, avail)) torch.set_num_threads(dt_threads) torch.set_num_interop_threads(1) self.dl = dl self.device = torch.device("cpu") self.temperature = float(temperature) # --- 2. Load Model --- with open(model_config_path, "r") as f: cfg = json.load(f) cfg["CONTEXT_LEN"] = int(context_len) self.L = int(context_len) self.K = int(max_tokens_per_step) self.model = GeneralistComfortDT(cfg).to(self.device) ckpt = torch.load(ckpt_path, map_location="cpu") self.model.load_state_dict(ckpt["model"], strict=True) self.model.eval() # --- 3. Load Stats --- z = np.load(norm_stats_path) self.obs_mean = z["obs_mean"].astype(np.float32) self.obs_std = z["obs_std"].astype(np.float32) self.act_mean = z["act_mean"].astype(np.float32) self.act_std = z["act_std"].astype(np.float32) self.max_return = float(z["max_return"][0]) if "max_return" in z else 1.0 self.rtg_scale_mode = "max_return" self.rtg_constant_div = 1.0 self.desired_rtg_raw = -0.5 self.prev_action = np.array([21.0, 24.0] * 5, dtype=np.float32) # --- 4. Define Keys (The Fix) --- self.env_keys_order = [ 'month', 'day_of_month', 'hour', 'outdoor_temp', 'core_temp', 'perim1_temp', 'perim2_temp', 'perim3_temp', 'perim4_temp', 'elec_power', 'core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count', 'outdoor_dewpoint', 'outdoor_wetbulb', 'core_rh', 'perim1_rh', 'perim2_rh', 'perim3_rh', 'perim4_rh', 'core_ash55_notcomfortable_summer', 'core_ash55_notcomfortable_winter', 'core_ash55_notcomfortable_any', 'p1_ash55_notcomfortable_any', 'p2_ash55_notcomfortable_any', 'p3_ash55_notcomfortable_any', 'p4_ash55_notcomfortable_any', 'total_electricity_HVAC' ] self.model_state_keys = [ 'outdoor_temp', 'core_temp', 'perim1_temp', 'perim2_temp', 'perim3_temp', 'perim4_temp', 'elec_power', 'core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count', 'outdoor_dewpoint', 'outdoor_wetbulb', 'core_rh', 'perim1_rh', 'perim2_rh', 'perim3_rh', 'perim4_rh', 'core_ash55_notcomfortable_summer', 'core_ash55_notcomfortable_winter', 'core_ash55_notcomfortable_any', 'p1_ash55_notcomfortable_any', 'p2_ash55_notcomfortable_any', 'p3_ash55_notcomfortable_any', 'p4_ash55_notcomfortable_any', 'month', 'hour' ] self.obs_indices = [] for k in self.model_state_keys: try: self.obs_indices.append(self.env_keys_order.index(k)) except ValueError: print(f"Key {k} missing") self.obs_indices.append(0) # Fallback self.obs_indices = np.array(self.obs_indices, dtype=np.int64) self.action_keys = [ "htg_core", "clg_core", "htg_p1", "clg_p1", "htg_p2", "clg_p2", "htg_p3", "clg_p3", "htg_p4", "clg_p4", ] # Meta info self.s_meta = [self.dl.parse_feature_identity(k, is_action=False) for k in self.model_state_keys] self.a_meta = [self.dl.parse_feature_identity(k, is_action=True) for k in self.action_keys] self.num_act = min(len(self.a_meta), self.K) self.num_state = min(len(self.s_meta), self.K - self.num_act) # --- 5. Precompute Token Layouts --- self.row_feat_ids = np.zeros((self.K,), dtype=np.int64) self.row_zone_ids = np.zeros((self.K,), dtype=np.int64) self.row_attn = np.zeros((self.K,), dtype=np.int64) self.row_feat_vals = np.zeros((self.K,), dtype=np.float32) if self.num_state > 0: s_meta = self.s_meta[:self.num_state] self.row_feat_ids[:self.num_state] = np.array([m[0] for m in s_meta], dtype=np.int64) self.row_zone_ids[:self.num_state] = np.array([m[1] for m in s_meta], dtype=np.int64) self.row_attn[:self.num_state] = 1 if self.num_act > 0: start = self.num_state end = start + self.num_act a_meta = self.a_meta[:self.num_act] self.row_feat_ids[start:end] = np.array([m[0] for m in a_meta], dtype=np.int64) self.row_zone_ids[start:end] = np.array([m[1] for m in a_meta], dtype=np.int64) self.row_attn[start:end] = 1 # Context Dimension from Config self.context_dim = cfg.get("CONTEXT_DIM", 10) # Buffers self.buf_feature_ids = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device) self.buf_feature_vals = torch.zeros((self.L, self.K), dtype=torch.float32, device=self.device) self.buf_zone_ids = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device) self.buf_attn = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device) self.buf_rtg = torch.zeros((self.L,), dtype=torch.float32, device=self.device) # Inputs self.t_feature_ids = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device) self.t_feature_vals = torch.zeros((1, self.L, self.K), dtype=torch.float32, device=self.device) self.t_zone_ids = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device) self.t_attn = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device) self.t_rtg = torch.zeros((1, self.L), dtype=torch.float32, device=self.device) self.ptr = 0 self.filled = 0 #Context Buffer self.t_context = torch.zeros((1, self.context_dim), dtype=torch.float32, device=self.device) def reset(self): self.buf_feature_ids.zero_() self.buf_feature_vals.zero_() self.buf_zone_ids.zero_() self.buf_attn.zero_() self.buf_rtg.zero_() self.t_feature_ids.zero_() self.t_feature_vals.zero_() self.t_zone_ids.zero_() self.t_attn.zero_() self.t_rtg.zero_() self.prev_action = np.array([21.0, 24.0] * 5, dtype=np.float32) self.ptr = 0 self.filled = 0 def _decode_bin_to_setpoint(self, bin_id: int, key: str) -> float: if "clg" in key.lower() or "cool" in key.lower(): lo, hi = self.dl.CLG_LOW, self.dl.CLG_HIGH else: lo, hi = self.dl.HTG_LOW, self.dl.HTG_HIGH x = float(bin_id) / float(self.dl.NUM_ACTION_BINS - 1) return lo + x * (hi - lo) def _scale_rtg(self, rtg_raw: float) -> float: if self.rtg_scale_mode == "max_return": scale = max(self.max_return, 1e-6) return float(rtg_raw) / scale return float(rtg_raw) / float(self.rtg_constant_div) def _write_model_inputs_from_ring(self): if self.filled < self.L: start = self.L - self.filled self.t_feature_ids.zero_(); self.t_feature_vals.zero_() self.t_zone_ids.zero_(); self.t_attn.zero_(); self.t_rtg.zero_() self.t_feature_ids[0, start:].copy_(self.buf_feature_ids[: self.filled]) self.t_feature_vals[0, start:].copy_(self.buf_feature_vals[: self.filled]) self.t_zone_ids[0, start:].copy_(self.buf_zone_ids[: self.filled]) self.t_attn[0, start:].copy_(self.buf_attn[: self.filled]) self.t_rtg[0, start:].copy_(self.buf_rtg[: self.filled]) return p = self.ptr n1 = self.L - p self.t_feature_ids[0, :n1].copy_(self.buf_feature_ids[p:]) self.t_feature_vals[0, :n1].copy_(self.buf_feature_vals[p:]) self.t_zone_ids[0, :n1].copy_(self.buf_zone_ids[p:]) self.t_attn[0, :n1].copy_(self.buf_attn[p:]) self.t_rtg[0, :n1].copy_(self.buf_rtg[p:]) self.t_feature_ids[0, n1:].copy_(self.buf_feature_ids[:p]) self.t_feature_vals[0, n1:].copy_(self.buf_feature_vals[:p]) self.t_zone_ids[0, n1:].copy_(self.buf_zone_ids[:p]) self.t_attn[0, n1:].copy_(self.buf_attn[:p]) self.t_rtg[0, n1:].copy_(self.buf_rtg[:p]) def act(self, obs: Any, info: Dict[str, Any], step: int) -> Tuple[np.ndarray, Dict, Dict]: # Map raw obs (30 items) model obs (28 items) obs_raw = np.asarray(obs, dtype=np.float32) env_map = dict(zip(self.env_keys_order, obs_raw)) obs_ordered = np.array([env_map.get(k, 0.0) for k in self.model_state_keys], dtype=np.float32) # --- 2. Normalization --- obs_norm = obs_ordered.copy() D = min(len(self.obs_mean), obs_norm.shape[0]) eps = 1e-6 obs_norm[:D] = (obs_norm[:D] - self.obs_mean[:D]) / (self.obs_std[:D] + eps) # ========================================================================= # 3. CALCULATE CONTEXT VECTOR (Dynamic) # ========================================================================= out_temp = env_map.get('outdoor_temp', 0.0) out_dew = env_map.get('outdoor_dewpoint', 0.0) hour = env_map.get('hour', 0.0) month = env_map.get('month', 1.0) occ_total = 0.0 occ_keys = ['core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count'] for k in occ_keys: if env_map.get(k, 0.0) > 0.5: # Binary occupancy check occ_total += 1.0 occ_frac = occ_total / 5.0 hr_sin = np.sin(2 * np.pi * hour / 24.0) hr_cos = np.cos(2 * np.pi * hour / 24.0) mth_norm = month - 1.0 mth_sin = np.sin(2 * np.pi * mth_norm / 12.0) mth_cos = np.cos(2 * np.pi * mth_norm / 12.0) ctx_vec = np.array([ out_temp, 0.0, # Temp Mean, Temp Std out_dew, # Dewpoint occ_frac, # Occ Fraction hr_sin, hr_cos, # Hour mth_sin, mth_cos, # Month 0.0, 0.0 # Spares ], dtype=np.float32) self.t_context[0].copy_(torch.from_numpy(ctx_vec)) act_norm = self.prev_action.copy() A = min(len(self.act_mean), act_norm.shape[0]) act_norm[:A] = (act_norm[:A] - self.act_mean[:A]) / self.act_std[:A] self.row_feat_vals.fill(0.0) if self.num_state > 0: self.row_feat_vals[: self.num_state] = obs_norm[: self.num_state] if self.num_act > 0: s, e = self.num_state, self.num_state + self.num_act if step < 5: good_action = np.array([22.0, 25.0] * 5, dtype=np.float32) good_norm = good_action.copy() A_len = min(len(self.act_mean), good_norm.shape[0]) good_norm[:A_len] = (good_norm[:A_len] - self.act_mean[:A_len]) / self.act_std[:A_len] self.row_feat_vals[s:e] = good_norm[: self.num_act] else: self.row_feat_vals[s:e] = act_norm[: self.num_act] i = self.ptr self.buf_feature_ids[i].copy_(torch.as_tensor(self.row_feat_ids, dtype=torch.long)) self.buf_zone_ids[i].copy_(torch.as_tensor(self.row_zone_ids, dtype=torch.long)) self.buf_attn[i].copy_(torch.as_tensor(self.row_attn, dtype=torch.long)) self.buf_feature_vals[i].copy_(torch.as_tensor(self.row_feat_vals, dtype=torch.float32)) self.buf_rtg[i] = float(self._scale_rtg(self.desired_rtg_raw)) self.ptr = (self.ptr + 1) % self.L self.filled = min(self.filled + 1, self.L) self._write_model_inputs_from_ring() with torch.inference_mode(): with torch.amp.autocast(device_type="cpu", dtype=torch.bfloat16): out = self.model(self.t_feature_ids, self.t_feature_vals, self.t_zone_ids, self.t_attn, rtg=self.t_rtg, context=self.t_context) logits = out["action_logits"] last = logits[0, -1] # [K, n_bins] s, e = self.num_state, self.num_state + self.num_act temp = max(self.temperature, 1e-4) raw_logits = last[s:e] if torch.isnan(raw_logits).any() or torch.isinf(raw_logits).any(): raw_logits = torch.nan_to_num(raw_logits, nan=0.0, posinf=10.0, neginf=-10.0) # 1. Apply Temperature action_logits = raw_logits / temp # 2. Convert to Probabilities action_probs = F.softmax(action_logits, dim=-1) # [Num_Actions, n_bins] if torch.isnan(action_probs).any() or (action_probs < 0).any(): action_probs = torch.ones_like(action_probs) / action_probs.size(-1) # 3. Sample from distribution try: pred_bins = torch.multinomial(action_probs, num_samples=1).flatten().cpu().numpy().astype(np.int64) except RuntimeError as err: pred_bins = torch.argmax(action_probs, dim=-1).cpu().numpy().astype(np.int64) action = self.prev_action.copy() for j in range(self.num_act): action[j] = self._decode_bin_to_setpoint(int(pred_bins[j]), self.action_keys[j]) for j, k in enumerate(self.action_keys): if "clg" in k.lower(): action[j] = float(np.clip(action[j], self.dl.CLG_LOW, self.dl.CLG_HIGH)) else: action[j] = float(np.clip(action[j], self.dl.HTG_LOW, self.dl.HTG_HIGH)) DEADBAND_GAP = 3.0 for z in range(5): h_idx = 2 * z c_idx = 2 * z + 1 if action[c_idx] < action[h_idx] + DEADBAND_GAP: action[c_idx] = min(self.dl.CLG_HIGH, action[h_idx] + DEADBAND_GAP) if action[c_idx] < action[h_idx] + DEADBAND_GAP: action[h_idx] = max(self.dl.HTG_LOW, action[c_idx] - DEADBAND_GAP) if step < 5 or step % 1000 == 0: print(f"[DT] Step {step} Raw Bins: {pred_bins}") h_val = self._decode_bin_to_setpoint(int(pred_bins[0]), "htg_core") c_val = self._decode_bin_to_setpoint(int(pred_bins[1]), "clg_core") print(f"[DT] Step {step} Decoded Core: Heat {h_val:.2f} | Cool {c_val:.2f}") self.prev_action = action return action, {}, {} def make_policy(policy_type: str, **kwargs): policy_type = (policy_type or "").lower().strip() if policy_type == "dt": return DecisionTransformerPolicy5Zone( ckpt_path=kwargs["ckpt_path"], model_config_path=kwargs["model_config_path"], norm_stats_path=kwargs["norm_stats_path"], context_len=kwargs["context_len"], max_tokens_per_step=kwargs["max_tokens_per_step"], device=kwargs.get("device", "cpu"), temperature=kwargs.get("temperature", 0.8), ) raise ValueError(f"Unknown policy_type={policy_type}.")