# Controller / utilities / policy.py
# Source: Gen-HVAC upload ("Upload 6 files", commit ba7b0bc, verified)
# unihvac/policy.py
from __future__ import annotations

import json
import os
from typing import Any, Dict, Tuple

import numpy as np
import requests
import torch
import torch.nn.functional as F
class RemoteHTTPPolicy:
    """
    Policy that delegates action selection to a remote HTTP inference server.

    Endpoints used:
      POST {server_url}/predict -> {"action": [...]} (one action per request)
      POST {server_url}/reset   -> clears the server-side context buffer

    On any network/HTTP failure, ``act`` falls back to a constant setpoint
    schedule so the calling simulation can keep running.
    """

    # (connect, read) timeouts in seconds for every server call.
    # BUG FIX: the original requests.post calls had no timeout at all, so a
    # hung/unreachable server would block the control loop indefinitely.
    REQUEST_TIMEOUT = (5.0, 30.0)

    def __init__(self, server_url: str = "http://host.docker.internal:8000"):
        self.server_url = server_url
        self.predict_endpoint = f"{server_url}/predict"
        self.reset_endpoint = f"{server_url}/reset"
        print(f"[RemotePolicy] Connecting to {self.server_url}...")

    def reset(self):
        """Best-effort reset of the remote context buffer; never raises."""
        try:
            requests.post(
                self.reset_endpoint,
                json={"message": "reset"},
                timeout=self.REQUEST_TIMEOUT,
            )
            print("[RemotePolicy] Remote buffer reset.")
        except Exception as e:
            print(f"[RemotePolicy] Reset failed: {e}")

    def act(self, obs, info, step):
        """
        Request one action from the server for observation ``obs`` at ``step``.

        Returns:
            (action, {}, {}) where ``action`` is a float32 array; on any
            failure the safe fallback [21.0, 24.0] * 5 (htg/clg for five
            zones) is returned instead of raising.
        """
        obs_list = np.array(obs, dtype=np.float32).tolist()
        payload = {"step": int(step), "obs": obs_list, "info": {}}
        try:
            resp = requests.post(
                self.predict_endpoint,
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            action = np.array(resp.json()["action"], dtype=np.float32)
            return action, {}, {}
        except Exception as e:
            print(f"[RemotePolicy] Error: {e}")
            return np.array([21.0, 24.0] * 5, dtype=np.float32), {}, {}
def _get_int_env(name: str, default: int) -> int:
try:
v = int(os.environ.get(name, str(default)))
return v
except Exception:
return default
def _get_bool_env(name: str, default: bool) -> bool:
v = os.environ.get(name, None)
if v is None:
return default
return v.strip().lower() in ("1", "true", "yes", "y", "on")
# --------------------------------------------------------------------------------------
# Policies
# --------------------------------------------------------------------------------------
class ConstantSetpointPolicy5Zone:
    """
    Rule-based baseline that always commands the same heating/cooling
    setpoints for all five zones.

    Action layout: [htg, clg] repeated five times (10 floats total).
    """

    def __init__(self, heating_sp: float = 21.0, cooling_sp: float = 24.0):
        self.heating_sp = float(heating_sp)
        self.cooling_sp = float(cooling_sp)
        pair = [self.heating_sp, self.cooling_sp]
        self.action = np.array(pair * 5, dtype=np.float32)

    def reset(self):
        # Stateless controller: nothing to clear between episodes.
        return

    def act(self, obs, info, step):
        # Hand back a fresh copy so callers cannot mutate our template.
        return self.action.copy(), {}, {}
class DecisionTransformerPolicy5Zone:
    """
    CPU-safe Decision Transformer (DT) policy with robust observation mapping
    and deadband protection.

    Per control step (see ``act``):
      1. Remap the raw env observation onto the model's state-key order.
      2. Normalize states/actions with training-time statistics.
      3. Build a context vector (weather, occupancy, cyclic time features).
      4. Push the tokenized step into a ring buffer of length ``context_len``.
      5. Run the transformer, sample action bins, decode to setpoints.
      6. Clamp setpoints and enforce a heating/cooling deadband per zone.
    """

    def __init__(
        self,
        ckpt_path: str,
        model_config_path: str,
        norm_stats_path: str,
        context_len: int,
        max_tokens_per_step: int,
        device: str = "cpu",
        temperature: float = 0.5,
    ):
        """
        Args:
            ckpt_path: torch checkpoint containing a "model" state dict.
            model_config_path: JSON config for GeneralistComfortDT.
            norm_stats_path: .npz with obs/act mean+std (optional max_return).
            context_len: transformer context window length L (timesteps).
            max_tokens_per_step: token slot budget K per timestep.
            device: accepted but ignored — inference is pinned to CPU below.
            temperature: softmax temperature for action-bin sampling.
        """
        # Project-local modules (not on PyPI): feature tokenization + model.
        import dataloader as dl
        from embeddings import GeneralistComfortDT
        # --- 1. CPU Settings ---
        # Inference only: no autograd; tune CPU attention/MKL-DNN backends.
        torch.set_grad_enabled(False)
        torch.backends.mha.set_fastpath_enabled(True)
        torch.backends.mkldnn.enabled = _get_bool_env("DT_MKLDNN", True)
        import multiprocessing
        avail = multiprocessing.cpu_count()
        # DT_NUM_THREADS env var overrides the default of min(18, cpu_count).
        dt_threads = _get_int_env("DT_NUM_THREADS", min(18, avail))
        torch.set_num_threads(dt_threads)
        torch.set_num_interop_threads(1)
        self.dl = dl
        self.device = torch.device("cpu")
        self.temperature = float(temperature)
        # --- 2. Load Model ---
        with open(model_config_path, "r") as f:
            cfg = json.load(f)
        cfg["CONTEXT_LEN"] = int(context_len)
        self.L = int(context_len)          # context window length (timesteps)
        self.K = int(max_tokens_per_step)  # token slots per timestep
        self.model = GeneralistComfortDT(cfg).to(self.device)
        ckpt = torch.load(ckpt_path, map_location="cpu")
        self.model.load_state_dict(ckpt["model"], strict=True)
        self.model.eval()
        # --- 3. Load Stats ---
        # Normalization statistics saved at training time (.npz archive).
        z = np.load(norm_stats_path)
        self.obs_mean = z["obs_mean"].astype(np.float32)
        self.obs_std = z["obs_std"].astype(np.float32)
        self.act_mean = z["act_mean"].astype(np.float32)
        self.act_std = z["act_std"].astype(np.float32)
        self.max_return = float(z["max_return"][0]) if "max_return" in z else 1.0
        self.rtg_scale_mode = "max_return"
        self.rtg_constant_div = 1.0
        # Raw return-to-go target fed to the model at every timestep.
        self.desired_rtg_raw = -0.5
        # Last commanded action; also the scaffold the decoder writes into.
        self.prev_action = np.array([21.0, 24.0] * 5, dtype=np.float32)
        # --- 4. Define Keys (The Fix) ---
        # Feature order of the raw environment observation vector (30 items).
        self.env_keys_order = [
            'month', 'day_of_month', 'hour',
            'outdoor_temp', 'core_temp', 'perim1_temp', 'perim2_temp', 'perim3_temp', 'perim4_temp',
            'elec_power',
            'core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count',
            'outdoor_dewpoint', 'outdoor_wetbulb',
            'core_rh', 'perim1_rh', 'perim2_rh', 'perim3_rh', 'perim4_rh',
            'core_ash55_notcomfortable_summer', 'core_ash55_notcomfortable_winter', 'core_ash55_notcomfortable_any',
            'p1_ash55_notcomfortable_any', 'p2_ash55_notcomfortable_any', 'p3_ash55_notcomfortable_any', 'p4_ash55_notcomfortable_any',
            'total_electricity_HVAC'
        ]
        # State-feature order the model expects (28 items; drops day_of_month
        # and total_electricity_HVAC, moves month/hour to the end).
        self.model_state_keys = [
            'outdoor_temp', 'core_temp', 'perim1_temp', 'perim2_temp', 'perim3_temp', 'perim4_temp',
            'elec_power',
            'core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count',
            'outdoor_dewpoint', 'outdoor_wetbulb',
            'core_rh', 'perim1_rh', 'perim2_rh', 'perim3_rh', 'perim4_rh',
            'core_ash55_notcomfortable_summer', 'core_ash55_notcomfortable_winter', 'core_ash55_notcomfortable_any',
            'p1_ash55_notcomfortable_any', 'p2_ash55_notcomfortable_any', 'p3_ash55_notcomfortable_any', 'p4_ash55_notcomfortable_any',
            'month', 'hour'
        ]
        # Env-vector index of each model key.
        # NOTE(review): obs_indices is computed here but act() remaps through
        # a key->value dict instead — this looks like legacy/diagnostic code;
        # confirm before removing.
        self.obs_indices = []
        for k in self.model_state_keys:
            try:
                self.obs_indices.append(self.env_keys_order.index(k))
            except ValueError:
                print(f"Key {k} missing")
                self.obs_indices.append(0)  # Fallback
        self.obs_indices = np.array(self.obs_indices, dtype=np.int64)
        # Action layout: (heating, cooling) per zone — core then perimeters 1-4.
        self.action_keys = [
            "htg_core", "clg_core", "htg_p1", "clg_p1", "htg_p2", "clg_p2",
            "htg_p3", "clg_p3", "htg_p4", "clg_p4",
        ]
        # Meta info: per-feature identity tuples from the dataloader (indexed
        # below as m[0]=feature id, m[1]=zone id — TODO confirm tuple layout
        # against dataloader.parse_feature_identity).
        self.s_meta = [self.dl.parse_feature_identity(k, is_action=False) for k in self.model_state_keys]
        self.a_meta = [self.dl.parse_feature_identity(k, is_action=True) for k in self.action_keys]
        # Fit tokens into the K-slot budget; action tokens are allocated first.
        self.num_act = min(len(self.a_meta), self.K)
        self.num_state = min(len(self.s_meta), self.K - self.num_act)
        # --- 5. Precompute Token Layouts ---
        # Static per-step token row: [state tokens | action tokens | padding].
        self.row_feat_ids = np.zeros((self.K,), dtype=np.int64)
        self.row_zone_ids = np.zeros((self.K,), dtype=np.int64)
        self.row_attn = np.zeros((self.K,), dtype=np.int64)
        self.row_feat_vals = np.zeros((self.K,), dtype=np.float32)
        if self.num_state > 0:
            s_meta = self.s_meta[:self.num_state]
            self.row_feat_ids[:self.num_state] = np.array([m[0] for m in s_meta], dtype=np.int64)
            self.row_zone_ids[:self.num_state] = np.array([m[1] for m in s_meta], dtype=np.int64)
            self.row_attn[:self.num_state] = 1
        if self.num_act > 0:
            start = self.num_state
            end = start + self.num_act
            a_meta = self.a_meta[:self.num_act]
            self.row_feat_ids[start:end] = np.array([m[0] for m in a_meta], dtype=np.int64)
            self.row_zone_ids[start:end] = np.array([m[1] for m in a_meta], dtype=np.int64)
            self.row_attn[start:end] = 1
        # Context Dimension from Config
        self.context_dim = cfg.get("CONTEXT_DIM", 10)
        # Buffers — ring storage over the last L observed timesteps.
        self.buf_feature_ids = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device)
        self.buf_feature_vals = torch.zeros((self.L, self.K), dtype=torch.float32, device=self.device)
        self.buf_zone_ids = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device)
        self.buf_attn = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device)
        self.buf_rtg = torch.zeros((self.L,), dtype=torch.float32, device=self.device)
        # Inputs — time-ordered (batch=1) model tensors rebuilt from the ring.
        self.t_feature_ids = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device)
        self.t_feature_vals = torch.zeros((1, self.L, self.K), dtype=torch.float32, device=self.device)
        self.t_zone_ids = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device)
        self.t_attn = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device)
        self.t_rtg = torch.zeros((1, self.L), dtype=torch.float32, device=self.device)
        self.ptr = 0     # next ring slot to overwrite
        self.filled = 0  # number of valid slots (saturates at L)
        # Context Buffer
        self.t_context = torch.zeros((1, self.context_dim), dtype=torch.float32, device=self.device)

    def reset(self):
        """Clear all history buffers and restore the default previous action."""
        self.buf_feature_ids.zero_()
        self.buf_feature_vals.zero_()
        self.buf_zone_ids.zero_()
        self.buf_attn.zero_()
        self.buf_rtg.zero_()
        self.t_feature_ids.zero_()
        self.t_feature_vals.zero_()
        self.t_zone_ids.zero_()
        self.t_attn.zero_()
        self.t_rtg.zero_()
        self.prev_action = np.array([21.0, 24.0] * 5, dtype=np.float32)
        self.ptr = 0
        self.filled = 0

    def _decode_bin_to_setpoint(self, bin_id: int, key: str) -> float:
        """Linearly map a discrete action bin onto the setpoint range that
        matches *key* (cooling keys use CLG range, everything else HTG)."""
        if "clg" in key.lower() or "cool" in key.lower():
            lo, hi = self.dl.CLG_LOW, self.dl.CLG_HIGH
        else:
            lo, hi = self.dl.HTG_LOW, self.dl.HTG_HIGH
        # bin 0 -> lo, bin (NUM_ACTION_BINS - 1) -> hi.
        x = float(bin_id) / float(self.dl.NUM_ACTION_BINS - 1)
        return lo + x * (hi - lo)

    def _scale_rtg(self, rtg_raw: float) -> float:
        """Scale the raw return-to-go per the configured mode."""
        if self.rtg_scale_mode == "max_return":
            scale = max(self.max_return, 1e-6)  # guard tiny/zero max_return
            return float(rtg_raw) / scale
        return float(rtg_raw) / float(self.rtg_constant_div)

    def _write_model_inputs_from_ring(self):
        """Unroll the ring buffer into time-ordered model input tensors.

        The window is right-aligned: while fewer than L steps have been seen,
        the leading slots remain zeroed (attention mask = 0).
        """
        if self.filled < self.L:
            start = self.L - self.filled
            self.t_feature_ids.zero_(); self.t_feature_vals.zero_()
            self.t_zone_ids.zero_(); self.t_attn.zero_(); self.t_rtg.zero_()
            self.t_feature_ids[0, start:].copy_(self.buf_feature_ids[: self.filled])
            self.t_feature_vals[0, start:].copy_(self.buf_feature_vals[: self.filled])
            self.t_zone_ids[0, start:].copy_(self.buf_zone_ids[: self.filled])
            self.t_attn[0, start:].copy_(self.buf_attn[: self.filled])
            self.t_rtg[0, start:].copy_(self.buf_rtg[: self.filled])
            return
        # Full ring: the oldest entry lives at ptr; rotate so the output runs
        # oldest-first (slots [ptr:] first, then the wrapped [:ptr] slots).
        p = self.ptr
        n1 = self.L - p
        self.t_feature_ids[0, :n1].copy_(self.buf_feature_ids[p:])
        self.t_feature_vals[0, :n1].copy_(self.buf_feature_vals[p:])
        self.t_zone_ids[0, :n1].copy_(self.buf_zone_ids[p:])
        self.t_attn[0, :n1].copy_(self.buf_attn[p:])
        self.t_rtg[0, :n1].copy_(self.buf_rtg[p:])
        self.t_feature_ids[0, n1:].copy_(self.buf_feature_ids[:p])
        self.t_feature_vals[0, n1:].copy_(self.buf_feature_vals[:p])
        self.t_zone_ids[0, n1:].copy_(self.buf_zone_ids[:p])
        self.t_attn[0, n1:].copy_(self.buf_attn[:p])
        self.t_rtg[0, n1:].copy_(self.buf_rtg[:p])

    def act(self, obs: Any, info: Dict[str, Any], step: int) -> Tuple[np.ndarray, Dict, Dict]:
        """Produce one action for the current observation.

        Returns:
            (action, {}, {}) where ``action`` is a float32 array of ten
            setpoints laid out as [htg, clg] per zone (see action_keys).
        """
        # --- 1. Map raw obs (30 items) -> model obs (28 items) by key name ---
        obs_raw = np.asarray(obs, dtype=np.float32)
        env_map = dict(zip(self.env_keys_order, obs_raw))
        obs_ordered = np.array([env_map.get(k, 0.0) for k in self.model_state_keys], dtype=np.float32)
        # --- 2. Normalization ---
        obs_norm = obs_ordered.copy()
        D = min(len(self.obs_mean), obs_norm.shape[0])
        eps = 1e-6
        obs_norm[:D] = (obs_norm[:D] - self.obs_mean[:D]) / (self.obs_std[:D] + eps)
        # =========================================================================
        # 3. CALCULATE CONTEXT VECTOR (Dynamic)
        # =========================================================================
        out_temp = env_map.get('outdoor_temp', 0.0)
        out_dew = env_map.get('outdoor_dewpoint', 0.0)
        hour = env_map.get('hour', 0.0)
        month = env_map.get('month', 1.0)
        occ_total = 0.0
        occ_keys = ['core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count']
        for k in occ_keys:
            if env_map.get(k, 0.0) > 0.5:  # Binary occupancy check
                occ_total += 1.0
        occ_frac = occ_total / 5.0
        # Cyclic (sin/cos) encodings for hour-of-day and month-of-year.
        hr_sin = np.sin(2 * np.pi * hour / 24.0)
        hr_cos = np.cos(2 * np.pi * hour / 24.0)
        mth_norm = month - 1.0
        mth_sin = np.sin(2 * np.pi * mth_norm / 12.0)
        mth_cos = np.cos(2 * np.pi * mth_norm / 12.0)
        ctx_vec = np.array([
            out_temp, 0.0,   # Temp Mean, Temp Std
            out_dew,         # Dewpoint
            occ_frac,        # Occ Fraction
            hr_sin, hr_cos,  # Hour
            mth_sin, mth_cos,  # Month
            0.0, 0.0         # Spares
        ], dtype=np.float32)
        self.t_context[0].copy_(torch.from_numpy(ctx_vec))
        # Normalize the previous action (conditioning token values).
        # NOTE(review): unlike obs above, no eps is added to act_std here —
        # confirm act_std entries can never be zero.
        act_norm = self.prev_action.copy()
        A = min(len(self.act_mean), act_norm.shape[0])
        act_norm[:A] = (act_norm[:A] - self.act_mean[:A]) / self.act_std[:A]
        # Fill this step's token values: state slots first, then action slots.
        self.row_feat_vals.fill(0.0)
        if self.num_state > 0:
            self.row_feat_vals[: self.num_state] = obs_norm[: self.num_state]
        if self.num_act > 0:
            s, e = self.num_state, self.num_state + self.num_act
            if step < 5:
                # Warm-up: seed context with a known-reasonable action rather
                # than the possibly degenerate earliest predictions.
                good_action = np.array([22.0, 25.0] * 5, dtype=np.float32)
                good_norm = good_action.copy()
                A_len = min(len(self.act_mean), good_norm.shape[0])
                good_norm[:A_len] = (good_norm[:A_len] - self.act_mean[:A_len]) / self.act_std[:A_len]
                self.row_feat_vals[s:e] = good_norm[: self.num_act]
            else:
                self.row_feat_vals[s:e] = act_norm[: self.num_act]
        # Write this step into the ring buffer and advance the pointer.
        i = self.ptr
        self.buf_feature_ids[i].copy_(torch.as_tensor(self.row_feat_ids, dtype=torch.long))
        self.buf_zone_ids[i].copy_(torch.as_tensor(self.row_zone_ids, dtype=torch.long))
        self.buf_attn[i].copy_(torch.as_tensor(self.row_attn, dtype=torch.long))
        self.buf_feature_vals[i].copy_(torch.as_tensor(self.row_feat_vals, dtype=torch.float32))
        self.buf_rtg[i] = float(self._scale_rtg(self.desired_rtg_raw))
        self.ptr = (self.ptr + 1) % self.L
        self.filled = min(self.filled + 1, self.L)
        self._write_model_inputs_from_ring()
        # Forward pass — bf16 autocast on CPU, no autograd tracking.
        with torch.inference_mode():
            with torch.amp.autocast(device_type="cpu", dtype=torch.bfloat16):
                out = self.model(self.t_feature_ids, self.t_feature_vals, self.t_zone_ids, self.t_attn, rtg=self.t_rtg, context=self.t_context)
        logits = out["action_logits"]
        last = logits[0, -1]  # [K, n_bins] — logits for the newest timestep
        s, e = self.num_state, self.num_state + self.num_act
        temp = max(self.temperature, 1e-4)  # avoid divide-by-zero temperature
        raw_logits = last[s:e]
        # Sanitize non-finite logits before sampling.
        if torch.isnan(raw_logits).any() or torch.isinf(raw_logits).any():
            raw_logits = torch.nan_to_num(raw_logits, nan=0.0, posinf=10.0, neginf=-10.0)
        # 1. Apply Temperature
        action_logits = raw_logits / temp
        # 2. Convert to Probabilities
        action_probs = F.softmax(action_logits, dim=-1)  # [Num_Actions, n_bins]
        if torch.isnan(action_probs).any() or (action_probs < 0).any():
            # Degenerate distribution: fall back to uniform over the bins.
            action_probs = torch.ones_like(action_probs) / action_probs.size(-1)
        # 3. Sample from distribution
        try:
            pred_bins = torch.multinomial(action_probs, num_samples=1).flatten().cpu().numpy().astype(np.int64)
        except RuntimeError as err:
            # multinomial can reject ill-conditioned rows; fall back to argmax.
            pred_bins = torch.argmax(action_probs, dim=-1).cpu().numpy().astype(np.int64)
        # Decode bins to physical setpoints, then clamp to the legal ranges.
        action = self.prev_action.copy()
        for j in range(self.num_act):
            action[j] = self._decode_bin_to_setpoint(int(pred_bins[j]), self.action_keys[j])
        for j, k in enumerate(self.action_keys):
            if "clg" in k.lower():
                action[j] = float(np.clip(action[j], self.dl.CLG_LOW, self.dl.CLG_HIGH))
            else:
                action[j] = float(np.clip(action[j], self.dl.HTG_LOW, self.dl.HTG_HIGH))
        # Enforce a minimum heating/cooling separation per zone.
        DEADBAND_GAP = 3.0
        for z in range(5):
            h_idx = 2 * z
            c_idx = 2 * z + 1
            # First try raising the cooling setpoint (capped at CLG_HIGH)...
            if action[c_idx] < action[h_idx] + DEADBAND_GAP:
                action[c_idx] = min(self.dl.CLG_HIGH, action[h_idx] + DEADBAND_GAP)
            # ...and if the gap is still too small, lower heating instead.
            if action[c_idx] < action[h_idx] + DEADBAND_GAP:
                action[h_idx] = max(self.dl.HTG_LOW, action[c_idx] - DEADBAND_GAP)
        if step < 5 or step % 1000 == 0:
            # Periodic debug trace of raw bins and decoded core setpoints.
            print(f"[DT] Step {step} Raw Bins: {pred_bins}")
            h_val = self._decode_bin_to_setpoint(int(pred_bins[0]), "htg_core")
            c_val = self._decode_bin_to_setpoint(int(pred_bins[1]), "clg_core")
            print(f"[DT] Step {step} Decoded Core: Heat {h_val:.2f} | Cool {c_val:.2f}")
        self.prev_action = action
        return action, {}, {}
def make_policy(policy_type: str, **kwargs):
    """
    Factory for the controller policies defined in this module.

    Supported values of *policy_type* (case-insensitive, whitespace-trimmed):
      "dt"       -> DecisionTransformerPolicy5Zone (requires ckpt_path,
                    model_config_path, norm_stats_path, context_len,
                    max_tokens_per_step kwargs).
      "constant" -> ConstantSetpointPolicy5Zone (optional heating_sp,
                    cooling_sp kwargs).
      "remote"   -> RemoteHTTPPolicy (optional server_url kwarg).

    Raises:
        ValueError: for any other value (including None/empty).
    """
    policy_type = (policy_type or "").lower().strip()
    if policy_type == "dt":
        return DecisionTransformerPolicy5Zone(
            ckpt_path=kwargs["ckpt_path"],
            model_config_path=kwargs["model_config_path"],
            norm_stats_path=kwargs["norm_stats_path"],
            context_len=kwargs["context_len"],
            max_tokens_per_step=kwargs["max_tokens_per_step"],
            device=kwargs.get("device", "cpu"),
            temperature=kwargs.get("temperature", 0.8),
        )
    # CONSISTENCY FIX: the module defines two more policies that the factory
    # previously could not construct; dispatch them here (backward-compatible).
    if policy_type == "constant":
        return ConstantSetpointPolicy5Zone(
            heating_sp=kwargs.get("heating_sp", 21.0),
            cooling_sp=kwargs.get("cooling_sp", 24.0),
        )
    if policy_type == "remote":
        return RemoteHTTPPolicy(
            server_url=kwargs.get("server_url", "http://host.docker.internal:8000"),
        )
    raise ValueError(f"Unknown policy_type={policy_type}.")