# Controller/utilities/comfort.py
# Uploaded by Gen-HVAC ("Upload 6 files", revision ba7b0bc, verified).
import math
import numpy as np
import pandas as pd
from typing import Optional
def _sat_vapor_pressure_kpa(t_c: float) -> float:
    """Return the saturation vapour pressure (kPa) at ``t_c`` degrees Celsius.

    Uses a Tetens-style exponential fit:
    ``0.61078 * exp(17.2694 * t / (t + 237.29))``.
    """
    exponent = (17.2694 * t_c) / (t_c + 237.29)
    return 0.61078 * math.exp(exponent)
def pmv_ppd_fanger(
    ta_c: float,
    tr_c: Optional[float] = None,
    rh: float = 50.0,
    vel: float = 0.1,
    met: float = 1.2,
    clo: float = 0.7,
    wme: float = 0.0,
):
    """Compute Fanger's PMV and PPD thermal-comfort indices.

    Args:
        ta_c: Air temperature in degrees Celsius.
        tr_c: Mean radiant temperature in degrees Celsius. When ``None`` the
            air temperature is used.
        rh: Relative humidity in percent.
        vel: Air speed (presumably m/s, as in the ISO 7730 reference
            program); values below 1e-9 are clamped to 1e-9.
        met: Metabolic rate in met (converted with 58.15 per met).
        clo: Clothing insulation in clo (converted with 0.155 per clo);
            the converted value is clamped to at least 1e-9.
        wme: External work in met (converted with 58.15 per met).

    Returns:
        A ``(pmv, ppd)`` tuple of floats. If the clothing-temperature
        iteration has not converged after 151 passes, the last iterate is
        used without raising.
    """
    if tr_c is None:
        tr_c = ta_c
    ta = ta_c
    tr = tr_c

    # Partial water-vapour pressure, converted from kPa to Pa.
    pa = rh / 100.0 * _sat_vapor_pressure_kpa(ta) * 1000.0

    m = met * 58.15          # metabolic rate
    w = wme * 58.15          # external work
    mw = m - w               # internal heat production

    # Clothing insulation; clamp so a zero/negative clo cannot produce
    # a non-positive insulation value.
    icl = max(0.155 * clo, 1e-9)

    # Clothing area factor.
    if icl <= 0.078:
        fcl = 1.0 + 1.29 * icl
    else:
        fcl = 1.05 + 0.645 * icl

    # Forced-convection heat-transfer coefficient.
    hcf = 12.1 * math.sqrt(max(vel, 1e-9))

    taa = ta + 273.0
    tra = tr + 273.0
    # First guess for the clothing surface temperature.
    tcla = taa + (35.5 - ta) / (3.5 * icl + 0.1)

    p1 = icl * fcl
    p2 = p1 * 3.96
    p3 = p1 * 100.0
    p4 = p1 * taa
    p5 = 308.7 - 0.028 * mw + p2 * ((tra / 100.0) ** 4)

    xn = tcla / 100.0
    xf = xn
    eps = 0.00015
    # The original counter check (``n > 150`` evaluated after incrementing)
    # allows up to 151 passes; keep that limit.
    max_passes = 151

    # Iterate on the clothing surface temperature (in units of 100 K).
    for _ in range(max_passes):
        xf = (xf + xn) / 2.0
        # Natural-convection coefficient for the current iterate.
        hcn = 2.38 * (abs(100.0 * xf - taa) ** 0.25)
        hc = max(hcf, hcn)
        xn = (p5 + p4 * hc - p2 * (xf**4)) / (100.0 + p3 * hc)
        if abs(xn - xf) <= eps:
            break

    tcl = 100.0 * xn - 273.0

    # Heat-loss components.
    hl1 = 3.05 * 0.001 * (5733.0 - 6.99 * mw - pa)        # skin diffusion
    hl2 = 0.42 * (mw - 58.15) if mw > 58.15 else 0.0      # sweating
    hl3 = 1.7 * 0.00001 * m * (5867.0 - pa)               # latent respiration
    hl4 = 0.0014 * m * (34.0 - ta)                        # dry respiration
    hl5 = 3.96 * fcl * ((xn**4) - ((tra / 100.0) ** 4))   # radiation
    hl6 = fcl * hc * (tcl - ta)                           # convection

    # Thermal sensation transfer coefficient.
    ts = 0.303 * math.exp(-0.036 * m) + 0.028
    pmv = ts * (mw - hl1 - hl2 - hl3 - hl4 - hl5 - hl6)
    ppd = 100.0 - 95.0 * math.exp(-0.03353 * (pmv**4) - 0.2179 * (pmv**2))
    return pmv, ppd
# ==========================================
def ashrae_any(df: pd.DataFrame) -> None:
    """Write ``core_ash55_any_fixed`` into ``df`` in place.

    The new column is the element-wise maximum of the summer and winter
    ``core_ash55_notcomfortable_*`` columns. When ``core_occ_count`` exists,
    rows whose count is not above 1e-6 are multiplied by zero. If either
    seasonal column is missing the output column is filled with NaN.
    """
    summer_col = "core_ash55_notcomfortable_summer"
    winter_col = "core_ash55_notcomfortable_winter"
    target_col = "core_ash55_any_fixed"

    # Guard clause: without both seasonal inputs there is nothing to combine.
    if summer_col not in df.columns or winter_col not in df.columns:
        df[target_col] = np.nan
        return

    combined = np.maximum(
        df[summer_col].astype(float),
        df[winter_col].astype(float),
    )

    if "core_occ_count" in df.columns:
        # 1.0 where occupied, 0.0 otherwise; used as a multiplicative mask.
        occupied_mask = (df["core_occ_count"] > 1e-6).astype(float)
        combined = combined * occupied_mask

    df[target_col] = combined
def add_feature_availability_and_registry(
    df: pd.DataFrame,
    base_feature_cols,
    new_feature_cols,
) -> None:
    """Annotate ``df`` in place with per-feature availability flags.

    For every name in ``base_feature_cols`` followed by ``new_feature_cols``
    a boolean column ``has_<name>`` is written, telling whether ``<name>``
    was a column of ``df`` when the function was called. The column
    ``feature_registry`` receives the ``;``-joined names that were present,
    in the order given.

    Args:
        df: Frame to annotate; modified in place.
        base_feature_cols: Iterable of feature column names.
        new_feature_cols: Iterable of additional feature column names.
    """
    # Unpacking accepts any mix of lists/tuples/other iterables, unlike ``+``.
    feature_names = [*base_feature_cols, *new_feature_cols]

    # Snapshot the columns first: the ``has_*`` columns added below must not
    # influence the availability of features checked later in the loop.
    original_columns = df.columns.copy()

    present = []
    for name in feature_names:
        available = name in original_columns
        df[f"has_{name}"] = available
        if available:
            present.append(name)

    df["feature_registry"] = ";".join(present)
def _occupancy_weighted_mean(values, occs, total_occ):
    """Row-wise occupancy-weighted mean, or the plain mean for unoccupied rows."""
    return np.where(
        total_occ > 1e-6,
        (values * occs).sum(axis=1) / np.maximum(total_occ, 1e-6),
        values.mean(axis=1),
    )


def compute_comfort_metrics_inplace(
    df: pd.DataFrame,
    location: str,
    time_step_hours: float,
    heating_sp: float,
    cooling_sp: float,
    zone_temp_keys,
    zone_occ_keys,
    rh_keys,
) -> None:
    """Add occupancy-weighted comfort metrics to ``df`` in place.

    Columns written: ``rh_weighted``, ``pmv_weighted``, ``ppd_weighted``,
    ``comfort_violation_degCh`` and ``comfort_violation_fixed_degCh``.

    Args:
        df: Frame holding the per-zone columns; modified in place.
        location: Label used only in the missing-column warning.
        time_step_hours: Factor applied to the temperature deviation to
            obtain the violation value per row.
        heating_sp: Currently unused (see the review note in the body).
        cooling_sp: Currently unused (see the review note in the body).
        zone_temp_keys: Iterable of zone temperature column names.
        zone_occ_keys: Iterable of zone occupancy column names, aligned with
            ``zone_temp_keys``.
        rh_keys: Iterable of zone relative-humidity column names. ``None``,
            an empty iterable, or any missing column yields an all-NaN
            ``rh_weighted`` and a humidity of 50 for the PMV calculation.

    If any temperature or occupancy column is missing, a warning is printed,
    the violation columns are set to 0.0, the remaining outputs to NaN, and
    the function returns early.
    """
    # Materialise as lists: pandas treats a tuple passed to ``df[...]`` as a
    # single column key, not as a selection of several columns.
    temp_cols = list(zone_temp_keys)
    occ_cols = list(zone_occ_keys)
    rh_cols = list(rh_keys) if rh_keys is not None else []

    missing_t = [k for k in temp_cols if k not in df.columns]
    missing_o = [k for k in occ_cols if k not in df.columns]
    if missing_t or missing_o:
        print(f"[{location}] WARNING: missing temp cols: {missing_t}, occ cols: {missing_o}")
        df["comfort_violation_degCh"] = 0.0
        df["comfort_violation_fixed_degCh"] = 0.0
        df["pmv_weighted"] = np.nan
        df["ppd_weighted"] = np.nan
        df["rh_weighted"] = np.nan
        return

    temps = df[temp_cols].to_numpy(dtype=np.float64)
    occs = df[occ_cols].to_numpy(dtype=np.float64)
    total_occ = occs.sum(axis=1)
    comfort_temp = _occupancy_weighted_mean(temps, occs, total_occ)

    # An empty key list must count as "no humidity data"; ``all([])`` alone
    # would be vacuously true and select zero columns.
    if rh_cols and all(k in df.columns for k in rh_cols):
        rhs = df[rh_cols].to_numpy(dtype=np.float64)
        rh_weighted = _occupancy_weighted_mean(rhs, occs, total_occ)
    else:
        rh_weighted = np.full(len(df), np.nan, dtype=np.float64)
    df["rh_weighted"] = rh_weighted

    # Humidity fed to the PMV model: 50 where unknown, clipped to [0, 100].
    rh_for_pmv = np.clip(
        np.where(np.isfinite(rh_weighted), rh_weighted, 50.0), 0.0, 100.0
    )

    # Fixed occupant assumptions for the PMV model.
    VEL = 0.1
    MET = 1.2
    CLO = 0.7
    WME = 0.0

    # Rows with (near-)zero occupancy keep PMV = PPD = 0.0. Rows whose
    # occupancy total is NaN are not skipped, matching ``total_occ <= 1e-6``.
    pmv_values = np.zeros(len(comfort_temp), dtype=np.float64)
    ppd_values = np.zeros(len(comfort_temp), dtype=np.float64)
    skip = total_occ <= 1e-6
    for i in np.flatnonzero(~skip):
        temp_i = float(comfort_temp[i])
        pmv_values[i], ppd_values[i] = pmv_ppd_fanger(
            ta_c=temp_i,
            tr_c=temp_i,
            rh=float(rh_for_pmv[i]),
            vel=VEL,
            met=MET,
            clo=CLO,
            wme=WME,
        )
    df["pmv_weighted"] = pmv_values
    df["ppd_weighted"] = ppd_values

    # Comfort band: fixed setpoints widened by 0.5 on each side.
    # NOTE(review): ``heating_sp`` and ``cooling_sp`` are never read, so
    # ``comfort_violation_degCh`` is identical to the fixed-band column.
    # Confirm whether it was meant to use the supplied setpoints.
    FIXED_HEAT = 21.0
    FIXED_COOL = 24.0
    fixed_lower = FIXED_HEAT - 0.5
    fixed_upper = FIXED_COOL + 0.5
    fixed_dev = (
        np.clip(fixed_lower - comfort_temp, 0.0, None)
        + np.clip(comfort_temp - fixed_upper, 0.0, None)
    )
    is_occupied = (total_occ > 1e-6).astype(np.float64)
    fixed_violation = fixed_dev * time_step_hours * is_occupied
    df["comfort_violation_degCh"] = fixed_violation
    df["comfort_violation_fixed_degCh"] = fixed_violation