Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import math | |
| import pandas as pd | |
| import re | |
| import math | |
| from functools import lru_cache | |
| from typing import Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
def extract_mm(text):
    """Return the first ``<digits> mm`` thickness found in *text* as an int.

    The text is lowercased before matching, so ``MM`` is accepted, and any
    amount of whitespace may separate the digits from ``mm``.  Values that
    ``pd.isna`` reports as missing, and text without such a token, give
    ``None``.
    """
    if pd.isna(text):
        return None
    first_hit = re.search(r"(\d+)\s*mm", str(text).lower())
    if first_hit is None:
        return None
    return int(first_hit.group(1))
def extract_measured_u(text):
    """Extract a measured U-value from an EPC floor description.

    Only descriptions containing ``average thermal transmittance`` are
    considered.  The value returned is the first number that appears *after*
    that phrase, so digits earlier in the text (for example a floor number)
    are not mistaken for the U-value.  Decimal commas are read as decimal
    points, matching what ``extract_measured_u_vectorised`` does.

    Returns:
        The U-value as a float, or ``None`` when the input is missing, the
        phrase is absent, no number follows it, or the number is below 0.05
        (values such as 0.00 are treated as placeholders).
    """
    if pd.isna(text):
        return None
    lowered = str(text).lower().replace(",", ".")
    # \D*? skips separators such as " = " or ": " between phrase and number.
    found = re.search(
        r"average thermal transmittance\D*?([0-9]*\.?[0-9]+)", lowered
    )
    if found is None:
        return None
    u_value = float(found.group(1))
    # Treat 0.00 and similar tiny values as missing.
    return None if u_value < 0.05 else u_value
def floor_ins_thickness_s11(sap_band_letter):
    """Return the default floor insulation thickness (mm) for an age band.

    Bands A-H map to 0, I to 25, J to 75, and K and L to 100.  Any other
    key (including lowercase letters) falls back to 0.

    NOTE(review): the previous comment attributed these values to RdSAP
    Table S11 (England & Wales) but listed G: 25, H: 75 and I-L: 100, which
    does not match the values actually used here -- confirm against the
    table.
    """
    thickness_by_band = dict.fromkeys("ABCDEFGH", 0)
    thickness_by_band.update({"I": 25, "J": 75, "K": 100, "L": 100})
    return thickness_by_band.get(sap_band_letter, 0)
def estimate_B(a, p=None):
    """Return the characteristic floor dimension B for area *a*.

    With a perimeter *p* the result is ``2 * a / p``.  Without one the
    result is ``0.5 * sqrt(a)``, which is what ``2 * a / p`` gives for a
    square plan (``p = 4 * sqrt(a)``).
    """
    return 0.5 * math.sqrt(a) if p is None else 2 * a / p
def classify_floor_boundary(desc):
    """Classify what lies beneath a floor from its EPC description.

    Rules are tried in priority order against the lowercased description
    and the first one with a matching substring wins:

    1. ``measured_u_value``       - a measured U-value is quoted
    2. ``another_dwelling_below`` - no heat loss (incl. the Welsh wording)
    3. ``partially_heated_below`` - partially heated space below (S5.7)
    4. ``exposed``                - exposed to outside air (S5.6)
    5. ``semi_exposed``           - unheated enclosed space or garage (S5.6)

    Missing descriptions and anything unmatched are ``ground`` (S5.5).
    """
    if pd.isna(desc):
        return "ground"

    text = str(desc).lower()
    prioritised_rules = (
        ("measured_u_value", ("average thermal transmittance",)),
        (
            "another_dwelling_below",
            (
                "another dwelling below",
                "other premises below",
                "same dwelling below",
                "eiddo arall islaw",
            ),
        ),
        ("partially_heated_below", ("partially heated",)),
        ("exposed", ("to external air", "external air")),
        ("semi_exposed", ("to unheated space", "unheated space", "garage")),
    )
    for label, needles in prioritised_rules:
        if any(needle in text for needle in needles):
            return label
    return "ground"
def effective_floor_insulation_mm(desc, sap_band_letter):
    """Return the floor insulation thickness (mm) to assume for a floor.

    SAP S11 rule as implemented here:

    - description says the floor is insulated -> ``max(50, table value)``
    - otherwise                               -> table value

    where the table value comes from ``floor_ins_thickness_s11``.

    The word "uninsulated" contains "insulated" as a substring; it is
    explicitly excluded so that an uninsulated floor is not credited with
    50 mm of insulation.
    """
    base_mm = floor_ins_thickness_s11(sap_band_letter)
    text = str(desc).lower()
    # Negative lookbehind: match "insulated" but not "uninsulated".
    if re.search(r"(?<!un)insulated", text):
        return max(50, base_mm)
    return base_mm
def classify_wall_type_s3(desc: str) -> str:
    """Map an EPC wall description onto a wall-type key.

    The lowercased description is tested against substring rules in
    priority order; the first rule that matches decides the result.
    Non-string input (including ``None``) and descriptions that match no
    rule fall back to ``"solid brick"``.
    """
    fallback = "solid brick"
    if not isinstance(desc, str):
        return fallback

    text = desc.lower()
    prioritised_rules = (
        # Welsh wording for solid brick
        (("briciau solet",), "solid brick"),
        # Stone / solid masonry
        (
            ("stone", "sandstone", "limestone", "granite", "whinstone", "whin"),
            "stone",
        ),
        (("cob",), "cob"),
        (("solid brick",), "solid brick"),
        (("cavity",), "cavity"),
        (("timber frame",), "timber frame"),
        (("system built",), "system build"),
        (("park home",), "park home"),
        # Basement walls are treated as solid masonry
        (("basement wall",), "stone"),
    )
    for needles, wall_type in prioritised_rules:
        if any(needle in text for needle in needles):
            return wall_type
    return fallback
def w_look_up_s3(desc, sap_band_letter, s3):
    """Look up the wall thickness in metres for a wall description and band.

    The description is first reduced to a wall-type key with
    ``classify_wall_type_s3``.  The ``Wall Type`` and ``sap_band`` columns
    of *s3* are compared after stripping whitespace and normalising case
    (lowercase wall type, uppercase band), the same normalisation that
    ``build_s3_thickness_map`` applies, so differently-cased table entries
    still match.

    Args:
        desc: EPC wall description.
        sap_band_letter: Age band letter.
        s3: Table with columns ``Wall Type``, ``sap_band``, ``thickness_mm``.

    Returns:
        Thickness of the first matching row, converted from mm to metres.

    Raises:
        ValueError: If no row matches the wall type and band.
    """
    wall_type = classify_wall_type_s3(desc)
    band_key = str(sap_band_letter).strip().upper()

    wall_col = s3["Wall Type"].astype(str).str.strip().str.lower()
    band_col = s3["sap_band"].astype(str).str.strip().str.upper()
    matches = s3.loc[
        (wall_col == wall_type) & (band_col == band_key), "thickness_mm"
    ]
    if matches.empty:
        raise ValueError(
            f"No S3 wall thickness for wall_type={wall_type}, age={sap_band_letter}"
        )
    return float(matches.iloc[0]) / 1000.0
| # ---------- S5.5 solid ground floor ---------- | |
def u_solid_ground_floor(
    desc_floor,
    desc_wall,
    area,
    sap_band_letter,
    s3,
    p=None,
    lg=1.5
):
    """U-value of a solid ground floor (SAP RdSAP 2012 S5.5 / ISO 13370).

    Args:
        desc_floor: EPC floor description, used for the insulation thickness.
        desc_wall: EPC wall description, used for the wall thickness lookup.
        area: Floor area.
        sap_band_letter: Age band letter.
        s3: Wall thickness table passed through to ``w_look_up_s3``.
        p: Exposed perimeter; when ``None`` a square plan is assumed.
        lg: Ground conductivity used in the formula.

    Returns:
        The floor U-value as a float.
    """
    r_si = 0.17
    r_se = 0.04

    # Wall thickness (m) from the S3 table.
    wall_thickness = w_look_up_s3(desc_wall, sap_band_letter, s3)

    # Thermal resistance of the insulation layer, if any.
    insulation_mm = effective_floor_insulation_mm(desc_floor, sap_band_letter)
    r_floor = 0.0
    if insulation_mm > 0:
        r_floor = 0.001 * insulation_mm / 0.035

    equiv_thickness = wall_thickness + lg * (r_si + r_floor + r_se)

    # Characteristic dimension: square plan (P = 4*sqrt(A)) unless p is given.
    char_dim = math.sqrt(area) / 2 if p is None else 2 * area / p

    if not (equiv_thickness < char_dim):
        return lg / (0.457 * char_dim + equiv_thickness)
    log_term = math.log(math.pi * char_dim / equiv_thickness + 1.0)
    return (2 * lg * log_term) / (math.pi * char_dim + equiv_thickness)
def u_suspended_ground_floor(
    desc_floor,
    desc_wall,
    area,
    sap_band_letter,
    s3,
    p=None,
    lg=1.5
):
    """U-value of a suspended ground floor (SAP RdSAP 2012 S5.5).

    Args:
        desc_floor: EPC floor description, used for the insulation thickness.
        desc_wall: EPC wall description, used for the wall thickness lookup.
        area: Floor area.
        sap_band_letter: Age band letter.
        s3: Wall thickness table passed through to ``w_look_up_s3``.
        p: Exposed perimeter; when ``None`` a square plan is assumed.
        lg: Ground conductivity used in the formula.

    Returns:
        The floor U-value as a float.
    """
    # Fixed parameters of the suspended-floor formula.
    r_si = 0.17
    u_wall = 1.5
    height = 0.3
    wind_speed = 5.0
    shield_factor = 0.05
    vent_area = 0.003

    # Wall thickness (m) from the S3 table.
    wall_thickness = w_look_up_s3(desc_wall, sap_band_letter, s3)

    # Floor resistance: 0.2 base plus the insulation layer when present.
    insulation_mm = effective_floor_insulation_mm(desc_floor, sap_band_letter)
    r_floor = (0.001 * insulation_mm / 0.035) + 0.2 if insulation_mm > 0 else 0.2

    equiv_thickness = wall_thickness + lg * (r_si + 0.04)

    # Characteristic dimension: square plan unless a perimeter is given.
    char_dim = math.sqrt(area) / 2 if p is None else 2 * area / p

    log_term = math.log(math.pi * char_dim / equiv_thickness + 1.0)
    u_ground = (2 * lg * log_term) / (math.pi * char_dim + equiv_thickness)
    u_underfloor = (2 * height * u_wall / char_dim) + (
        1450 * vent_area * wind_speed * shield_factor / char_dim
    )
    return 1.0 / (2 * r_si + r_floor + 1.0 / (u_ground + u_underfloor))
def bin_floor_insulation_s12(desc, mm):
    """Bin a floor into a Table S12 insulation class.

    Intended for exposed / semi-exposed floors only.

    Args:
        desc: EPC floor description.
        mm: Insulation thickness in mm parsed from the description, or
            ``None`` when no thickness was found.

    Returns:
        One of ``"as_built"``, ``"50mm"``, ``"100mm"``, ``"150mm"``.
    """
    text = str(desc).lower()

    # Explicitly uninsulated, or described by a measured U-value.
    as_built_markers = (
        "no insulation",
        "uninsulated",
        "average thermal transmittance",
    )
    if any(marker in text for marker in as_built_markers):
        return "as_built"

    # No thickness available: "insulated" alone is assumed to mean 50 mm.
    if mm is None:
        return "50mm" if "insulated" in text else "as_built"

    # Known thickness: pick the largest bin whose lower bound is reached.
    for lower_bound, label in ((150, "150mm"), (100, "100mm"), (50, "50mm")):
        if mm >= lower_bound:
            return label
    return "as_built"
def lookup_s12_u_value(sap_band_letter, insulation_class):
    """Return the Table S12 floor U-value (SAP RdSAP 2012, England & Wales).

    Args:
        sap_band_letter: Age band, one of ``"A"``..``"L"``.
        insulation_class: ``"as_built"``, ``"50mm"``, ``"100mm"`` or
            ``"150mm"``.

    Raises:
        KeyError: If the band or the insulation class is not in the table.
    """
    rows_by_band_group = (
        ("ABCDEFG", {"as_built": 1.20, "50mm": 0.50, "100mm": 0.30, "150mm": 0.22}),
        ("HI", {"as_built": 0.51, "50mm": 0.50, "100mm": 0.30, "150mm": 0.22}),
        ("J", {"as_built": 0.25, "50mm": 0.25, "100mm": 0.25, "150mm": 0.22}),
        ("KL", {"as_built": 0.22, "50mm": 0.22, "100mm": 0.22, "150mm": 0.22}),
    )
    table = {}
    for bands, u_by_class in rows_by_band_group:
        for band in bands:
            table[band] = u_by_class
    return table[sap_band_letter][insulation_class]
def floor_u_value_s12(desc, sap_band_letter):
    """U-value for an exposed / semi-exposed floor (SAP RdSAP 2012 S5.6).

    The thickness parsed from *desc* is binned into a Table S12 insulation
    class, which is then looked up for the given age band.
    """
    insulation_class = bin_floor_insulation_s12(desc, extract_mm(desc))
    return lookup_s12_u_value(sap_band_letter, insulation_class)
def floors_u_rule(row, s3):
    """Compute the floor U-value for one row of EPC data.

    The boundary type derived from ``FLOOR_DESCRIPTION`` selects the rule:

    - another dwelling below   -> 0.0
    - partially heated below   -> 0.7
    - exposed / semi-exposed   -> Table S12 lookup
    - measured U-value         -> value parsed from the description
    - ground                   -> suspended or solid ground-floor formula,
      depending on whether the description mentions "suspended"

    Any other boundary type gives ``None``.
    """
    floor_desc = row["FLOOR_DESCRIPTION"]
    boundary = classify_floor_boundary(floor_desc)

    fixed_u_values = {
        "another_dwelling_below": 0.0,
        "partially_heated_below": 0.7,
    }
    if boundary in fixed_u_values:
        return fixed_u_values[boundary]

    if boundary in ("exposed", "semi_exposed"):
        return floor_u_value_s12(
            desc=floor_desc,
            sap_band_letter=row["sap_band_letter"],
        )

    if boundary == "measured_u_value":
        return extract_measured_u(floor_desc)

    if boundary != "ground":
        return None

    if "suspended" in str(floor_desc).lower():
        ground_floor_formula = u_suspended_ground_floor
    else:
        ground_floor_formula = u_solid_ground_floor
    return ground_floor_formula(
        desc_floor=floor_desc,
        desc_wall=row["WALLS_DESCRIPTION"],
        area=row["TOTAL_FLOOR_AREA"],
        sap_band_letter=row["sap_band_letter"],
        s3=s3,
    )
def floors_insulation_type(row):
    """Return the Table S12 insulation class for one row's floor description."""
    floor_desc = row["FLOOR_DESCRIPTION"]
    return bin_floor_insulation_s12(floor_desc, extract_mm(floor_desc))
def floor_feature_engineering(df: pd.DataFrame, s3: pd.DataFrame) -> pd.DataFrame:
    """Add floor features to a copy of *df* using the row-wise rules.

    Columns added, in this order: ``FLOOR_U_VALUE``,
    ``FLOOR_INSULATION_TYPE`` and ``FLOOR_BOUNDARY_TYPE``.  The input frame
    is not modified.
    """
    enriched = df.copy()
    enriched["FLOOR_U_VALUE"] = enriched.apply(floors_u_rule, axis=1, args=(s3,))
    enriched["FLOOR_INSULATION_TYPE"] = enriched.apply(floors_insulation_type, axis=1)
    floor_descriptions = enriched["FLOOR_DESCRIPTION"]
    enriched["FLOOR_BOUNDARY_TYPE"] = floor_descriptions.apply(classify_floor_boundary)
    return enriched
| # ============================================================ | |
| # FAST + PHYSICS-PRESERVING FLOOR FEATURE ENGINEERING | |
| # (no area binning; caches SAP-dependent parameters; vectorized math) | |
| # ============================================================ | |
| # ----------------------------- | |
| # Helpers: parsing (vectorized) | |
| # ----------------------------- | |
# Precompiled, case-insensitive patterns: "<digits> mm" and a bare decimal
# number.  Neither is referenced by the functions visible in this part of
# the file, which pass their patterns inline.
_MM_RE = re.compile(r"(\d+)\s*mm", re.IGNORECASE)
_U_RE = re.compile(r"([0-9]*\.?[0-9]+)", re.IGNORECASE)
def extract_mm_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised thickness parser: first ``<digits> mm`` per element.

    Missing values are treated as empty strings and the text is lowercased
    before matching.  Elements without a match become NaN; matches are
    returned as numbers.
    """
    lowered = series.fillna("").astype(str).str.lower()
    first_thickness = lowered.str.extract(r"(\d+)\s*mm", expand=False)
    return pd.to_numeric(first_thickness, errors="coerce")
def extract_measured_u_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised extraction of measured floor U-values from EPC text.

    The text is lowercased, the characters ``¦`` and ``?`` are dropped and
    commas are turned into decimal points.  The number is then read from
    directly after ``average thermal transmittance``, optionally preceded by
    ``:`` or ``=``.  Elements without the phrase, or without a number after
    it, become NaN.  Exact-zero placeholders are also mapped to NaN, while
    small non-zero values are kept.
    """
    cleaned = series.fillna("").astype(str).str.lower()
    for unwanted, replacement in (("¦", ""), ("?", ""), (",", ".")):
        cleaned = cleaned.str.replace(unwanted, replacement, regex=False)

    has_phrase = cleaned.str.contains("average thermal transmittance")

    # A ':' or '=' and surrounding whitespace may sit before the number.
    captured = cleaned.where(has_phrase).str.extract(
        r"average thermal transmittance\s*[:=]?\s*([0-9]*\.?[0-9]+)",
        expand=False
    )
    values = pd.to_numeric(captured, errors="coerce")

    # Only true zero placeholders count as missing.
    return values.mask(values.abs() < 1e-9, np.nan)
def classify_floor_boundary_vectorised(floor_desc: pd.Series) -> pd.Series:
    """Vectorised floor boundary classification.

    Each element receives the label of the highest-priority pattern found in
    its lowercased description; elements matching nothing (including missing
    values) are ``"ground"``.
    """
    text = floor_desc.fillna("").astype(str).str.lower()
    labels = pd.Series("ground", index=floor_desc.index, dtype="object")

    # Highest priority first.
    prioritised_patterns = (
        ("measured_u_value", "average thermal transmittance"),
        (
            "another_dwelling_below",
            "another dwelling below|other premises below|same dwelling below|eiddo arall islaw",
        ),
        ("partially_heated_below", "partially heated"),
        ("exposed", r"to external air|external air"),
        ("semi_exposed", r"to unheated space|unheated space|garage"),
    )
    # Write the lowest priority first so that higher priorities overwrite it.
    for label, pattern in reversed(prioritised_patterns):
        labels[text.str.contains(pattern)] = label
    return labels
| # ----------------------------------------- | |
| # SAP S11: base insulation thickness by band | |
| # ----------------------------------------- | |
# Default floor insulation thickness (mm) by age band.
_S11_BASE_MM = {
    "A": 0, "B": 0, "C": 0, "D": 0, "E": 0, "F": 0, "G": 0,
    "H": 0,
    "I": 25,
    "J": 75,
    "K": 100,
    "L": 100,
}


def effective_floor_insulation_mm_vectorised(floor_desc: pd.Series, sap_band_letter: pd.Series) -> pd.Series:
    """Vectorised SAP S11 insulation thickness.

    - base thickness comes from ``_S11_BASE_MM`` (unknown bands -> 0);
      bands are stripped and uppercased before the lookup
    - descriptions that say the floor is insulated get ``max(50, base)``

    "uninsulated" contains "insulated" as a substring, so it is excluded
    explicitly; otherwise uninsulated floors would be credited with 50 mm.

    Returns:
        Float thickness in mm, indexed like *sap_band_letter*.
    """
    band = sap_band_letter.fillna("").astype(str).str.strip().str.upper()
    base = band.map(_S11_BASE_MM).fillna(0).astype(float)

    text = floor_desc.fillna("").astype(str).str.lower()
    # Negative lookbehind: match "insulated" but not "uninsulated".
    insulated = text.str.contains(r"(?<!un)insulated", regex=True)

    eff = base.copy()
    eff[insulated] = np.maximum(50.0, base[insulated])
    return eff
| # ----------------------------- | |
| # Wall type for S3 thickness map | |
| # (SAP-safe categorization) | |
| # ----------------------------- | |
def classify_wall_type_s3_vectorised(walls_desc: pd.Series) -> pd.Series:
    """Vectorised wall-type classification for the S3 thickness lookup.

    Rules are listed in priority order and the first matching rule decides
    the result, which is the precedence ``classify_wall_type_s3`` uses.
    Previously the rules were applied as successive overwrites, so the
    *last* matching rule won and descriptions matching several rules (for
    example both "stone" and "cavity") were classified differently from the
    scalar function.

    Missing values and descriptions matching no rule are ``"solid brick"``.
    """
    text = walls_desc.fillna("").astype(str).str.lower()
    out = pd.Series("solid brick", index=walls_desc.index, dtype="object")

    # Highest priority first.
    prioritised_rules = (
        ("briciau solet", "solid brick"),
        (r"stone|sandstone|limestone|granite|whinstone|\bwhin\b", "stone"),
        ("cob", "cob"),
        ("solid brick", "solid brick"),
        ("cavity", "cavity"),
        ("timber frame", "timber frame"),
        ("system built", "system build"),
        ("park home", "park home"),
        ("basement wall", "stone"),
    )
    # Write the lowest priority first so that higher priorities overwrite it.
    for pattern, wall_type in reversed(prioritised_rules):
        out[text.str.contains(pattern)] = wall_type
    return out
| # ----------------------------- | |
| # Table S12 lookup (fast dict) | |
| # ----------------------------- | |
# Table S12 floor U-values: band letter -> insulation class -> U-value.
# Bands that share a row are grouped; each band gets its own dict.
_S12_TABLE = {
    band: dict(u_by_class)
    for bands, u_by_class in (
        # A-G
        ("ABCDEFG", {"as_built": 1.20, "50mm": 0.50, "100mm": 0.30, "150mm": 0.22}),
        # H-I
        ("HI", {"as_built": 0.51, "50mm": 0.50, "100mm": 0.30, "150mm": 0.22}),
        # J
        ("J", {"as_built": 0.25, "50mm": 0.25, "100mm": 0.25, "150mm": 0.22}),
        # K-L
        ("KL", {"as_built": 0.22, "50mm": 0.22, "100mm": 0.22, "150mm": 0.22}),
    )
    for band in bands
}
def bin_floor_insulation_s12_vectorised(floor_desc: pd.Series, mm_measured: pd.Series) -> pd.Series:
    """Vectorised Table S12 insulation binning (exposed / semi-exposed floors).

    Args:
        floor_desc: EPC floor descriptions.
        mm_measured: Parsed insulation thickness in mm, NaN where unknown.

    Returns:
        Series of ``"as_built"``, ``"50mm"``, ``"100mm"`` or ``"150mm"``.
        Descriptions that are explicitly uninsulated, or that quote a
        measured U-value, always stay ``"as_built"``.
    """
    text = floor_desc.fillna("").astype(str).str.lower()
    classes = pd.Series("as_built", index=floor_desc.index, dtype="object")

    forced_as_built = text.str.contains(
        "no insulation|uninsulated|average thermal transmittance"
    )
    eligible = ~forced_as_built

    # Known thickness: half-open bins [50, 100) and [100, 150), then >= 150.
    thickness = mm_measured
    for label, lower, upper in (("50mm", 50, 100), ("100mm", 100, 150)):
        classes[(thickness >= lower) & (thickness < upper) & eligible] = label
    classes[(thickness >= 150) & eligible] = "150mm"

    # Insulated but no thickness given: assume 50 mm.
    assumed_50 = text.str.contains("insulated") & thickness.isna() & eligible
    classes[assumed_50] = "50mm"
    return classes
def lookup_s12_u_vectorised(sap_band_letter: pd.Series, ins_class: pd.Series) -> pd.Series:
    """Look up Table S12 U-values for paired bands and insulation classes.

    Bands are stripped and uppercased before the lookup.  Pairs whose band
    or class is not present in ``_S12_TABLE`` give NaN.

    The lookup runs as a Python-level pass over the rows; it is expected to
    be called on the exposed / semi-exposed subset only.

    Returns:
        Float Series indexed like *sap_band_letter*.
    """
    band = sap_band_letter.fillna("").astype(str).str.strip().str.upper()
    values = [
        _S12_TABLE.get(band_key, {}).get(class_key, np.nan)
        for band_key, class_key in zip(band.to_numpy(), ins_class.to_numpy())
    ]
    return pd.Series(values, index=sap_band_letter.index, dtype=float)
| # ----------------------------- | |
| # S3 thickness mapping (prebuilt) | |
| # ----------------------------- | |
def build_s3_thickness_map(s3: pd.DataFrame) -> dict[tuple[str, str], float]:
    """Build a ``(wall type, band) -> thickness in metres`` lookup.

    Expects *s3* to have the columns ``Wall Type``, ``sap_band`` and
    ``thickness_mm``.  Wall types are stripped and lowercased, bands are
    stripped and uppercased, and thickness is converted from mm to metres.
    When several rows share a key, the last one wins.  *s3* is not modified.
    """
    wall_types = s3["Wall Type"].astype(str).str.strip().str.lower()
    bands = s3["sap_band"].astype(str).str.strip().str.upper()
    thickness_m = s3["thickness_mm"].astype(float) / 1000.0
    return {
        (wall_type, band): metres
        for wall_type, band, metres in zip(wall_types, bands, thickness_m)
    }
| # ----------------------------- | |
| # Physics core: vectorized formulas | |
| # ----------------------------- | |
def _u_solid_ground_floor_vectorised(B: np.ndarray, dt: np.ndarray, lg: float = 1.5) -> np.ndarray:
    """Vectorised SAP/RdSAP 2012 S5.5 solid ground floor equation.

    Args:
        B: Characteristic floor dimension per element (metres).
        dt: Total equivalent thickness per element (metres).
        lg: Ground conductivity used in the formula.

    Returns:
        Float array shaped like *B*.  Elements with ``dt < B`` use the
        logarithmic form; all others use ``lg / (0.457*B + dt)``.
    """
    use_log_form = dt < B
    use_linear_form = ~use_log_form
    result = np.empty_like(B, dtype=float)

    # dt < B: (2*lg*ln(pi*B/dt + 1)) / (pi*B + dt)
    b_log, dt_log = B[use_log_form], dt[use_log_form]
    result[use_log_form] = (2.0 * lg * np.log(np.pi * b_log / dt_log + 1.0)) / (
        np.pi * b_log + dt_log
    )

    # otherwise: lg / (0.457*B + dt)
    b_lin, dt_lin = B[use_linear_form], dt[use_linear_form]
    result[use_linear_form] = lg / (0.457 * b_lin + dt_lin)
    return result
def _u_suspended_ground_floor_vectorised(B: np.ndarray, dg: np.ndarray, Rf: np.ndarray, lg: float = 1.5) -> np.ndarray:
    """Vectorised SAP/RdSAP 2012 S5.5 suspended ground floor equation.

    Args:
        B: Characteristic floor dimension per element (metres).
        dg: Equivalent ground thickness per element (metres).
        Rf: Floor thermal resistance per element.
        lg: Ground conductivity used in the formula.

    Returns:
        ``1 / (2*Rsi + Rf + 1 / (Ug + Ux))`` evaluated element-wise, using
        the same fixed constants as ``u_suspended_ground_floor``.
    """
    r_si = 0.17
    u_wall = 1.5
    height = 0.3
    wind_speed = 5.0
    shield_factor = 0.05
    vent_area = 0.003

    # Ground term.
    log_term = np.log(np.pi * B / dg + 1.0)
    u_ground = (2.0 * lg * log_term) / (np.pi * B + dg)

    # Under-floor space term: wall loss plus ventilation loss.
    wall_loss = 2.0 * height * u_wall / B
    ventilation_loss = 1450.0 * vent_area * wind_speed * shield_factor / B
    u_underfloor = wall_loss + ventilation_loss

    return 1.0 / (2.0 * r_si + Rf + 1.0 / (u_ground + u_underfloor))
| # ----------------------------- | |
| # Cached SAP-dependent parameters (NO area binning) | |
| # ----------------------------- | |
def build_floor_param_caches(s3: pd.DataFrame):
    """Build memoised helpers for the area-independent floor parameters.

    Returns a pair of functions, both memoised with ``lru_cache`` so each
    distinct ``(wall_type, sap_band, ins_mm, lg)`` combination is computed
    once:

    - ``solid_dt(wall_type, sap_band, ins_mm, lg=1.5) -> dt``
    - ``susp_dg_rf(wall_type, sap_band, ins_mm, lg=1.5) -> (dg, Rf)``

    Wall thickness comes from ``build_s3_thickness_map(s3)``.  When the
    ``(wall type, band)`` pair is missing, the "solid brick" entry for that
    band is used, and failing that 0.22 m.

    Arguments must be hashable (strings and numbers) because of the
    memoisation.  The caches live as long as the returned functions.
    """
    thickness_map = build_s3_thickness_map(s3)

    def _wall_thickness_m(wall_type: str, sap_band: str) -> float:
        # Wall thickness in metres with the solid-brick / 0.22 m fallback.
        band = sap_band.upper()
        w = thickness_map.get((wall_type.lower(), band))
        if w is None:
            w = thickness_map.get(("solid brick", band), 0.22)
        return w

    @lru_cache(maxsize=None)
    def solid_dt(wall_type: str, sap_band: str, ins_mm: int, lg: float = 1.5) -> float:
        Rsi = 0.17
        Rse = 0.04
        w = _wall_thickness_m(wall_type, sap_band)
        # Insulation resistance (zero when there is no insulation).
        Rf = 0.001 * ins_mm / 0.035 if ins_mm > 0 else 0.0
        return float(w + lg * (Rsi + Rf + Rse))

    @lru_cache(maxsize=None)
    def susp_dg_rf(wall_type: str, sap_band: str, ins_mm: int, lg: float = 1.5) -> Tuple[float, float]:
        Rsi = 0.17
        w = _wall_thickness_m(wall_type, sap_band)
        # Floor resistance: 0.2 base plus the insulation layer when present.
        Rf = (0.001 * ins_mm / 0.035) + 0.2 if ins_mm > 0 else 0.2
        dg = w + lg * (Rsi + 0.04)
        return float(dg), float(Rf)

    return solid_dt, susp_dg_rf
| # ----------------------------- | |
| # Main pipeline (fast) | |
| # ----------------------------- | |
def floor_feature_engineering_fast(df: pd.DataFrame, s3: pd.DataFrame) -> pd.DataFrame:
    """Fast floor feature engineering.

    Works on a copy of *df* and adds/overwrites these columns, in order:
    ``sap_band_letter`` (stripped, uppercased), ``FLOOR_B``,
    ``FLOOR_BOUNDARY_TYPE``, ``FLOOR_INSULATION_TYPE``, ``WALL_TYPE_S3`` and
    ``FLOOR_U_VALUE``.

    How ``FLOOR_U_VALUE`` is filled per boundary type:

    - another dwelling below -> 0.0
    - partially heated below -> 0.7
    - measured U-value       -> value parsed from the description
    - exposed / semi-exposed -> Table S12 lookup
    - ground                 -> solid or suspended ground-floor formula with
      ``B = sqrt(area) / 2`` (continuous area, square-plan assumption); the
      area-independent parameters come from ``build_floor_param_caches``

    Rows are selected with positional boolean masks rather than index
    labels, so frames with a non-unique index are handled correctly.

    Required columns: ``FLOOR_DESCRIPTION``, ``WALLS_DESCRIPTION``,
    ``TOTAL_FLOOR_AREA``, ``sap_band_letter``.
    """
    df = df.copy()

    # Normalise the band letter.
    df["sap_band_letter"] = df["sap_band_letter"].astype(str).str.strip().str.upper()

    # 0) Characteristic dimension B (continuous, no binning).
    area = pd.to_numeric(df["TOTAL_FLOOR_AREA"], errors="coerce")
    df["FLOOR_B"] = np.sqrt(area) / 2.0

    floor_desc = df["FLOOR_DESCRIPTION"]

    # 1) Boundary type, 2) measured U, 3) insulation thickness.
    df["FLOOR_BOUNDARY_TYPE"] = classify_floor_boundary_vectorised(floor_desc)
    measured_u = extract_measured_u_vectorised(floor_desc)
    mm_measured = extract_mm_vectorised(floor_desc)
    eff_mm = effective_floor_insulation_mm_vectorised(floor_desc, df["sap_band_letter"])

    # 4) Insulation class (S12 binning) and 5) wall type for the S3 lookup.
    df["FLOOR_INSULATION_TYPE"] = bin_floor_insulation_s12_vectorised(floor_desc, mm_measured)
    df["WALL_TYPE_S3"] = classify_wall_type_s3_vectorised(df["WALLS_DESCRIPTION"])

    # 6) Area-independent parameter helpers.
    solid_dt_cached, susp_dg_rf_cached = build_floor_param_caches(s3)

    # 7) Assemble FLOOR_U_VALUE positionally.
    boundary = df["FLOOR_BOUNDARY_TYPE"]
    u = np.full(len(df), np.nan, dtype=float)

    # a) another dwelling below / b) partially heated below
    u[(boundary == "another_dwelling_below").to_numpy()] = 0.0
    u[(boundary == "partially_heated_below").to_numpy()] = 0.7

    # c) measured U-value
    measured_mask = (boundary == "measured_u_value").to_numpy()
    u[measured_mask] = measured_u.to_numpy(dtype=float)[measured_mask]

    # d) exposed / semi-exposed -> Table S12
    exp_mask = boundary.isin(["exposed", "semi_exposed"]).to_numpy()
    if exp_mask.any():
        u[exp_mask] = lookup_s12_u_vectorised(
            df["sap_band_letter"][exp_mask],
            df["FLOOR_INSULATION_TYPE"][exp_mask],
        ).to_numpy()

    # e) ground floors -> S5.5 formulas
    ground_mask = (boundary == "ground").to_numpy()
    if ground_mask.any():
        is_suspended = (
            floor_desc.fillna("").astype(str).str.lower()
            .str.contains("suspended")
            .to_numpy(dtype=bool)
        )
        solid_mask = ground_mask & ~is_suspended
        susp_mask = ground_mask & is_suspended

        b_dim = df["FLOOR_B"].to_numpy(dtype=float)
        wall_type = df["WALL_TYPE_S3"].astype(str).to_numpy()
        band = df["sap_band_letter"].astype(str).to_numpy()
        ins_mm = eff_mm.fillna(0).astype(int).to_numpy()

        # ---- solid ground ----
        if solid_mask.any():
            dt = np.array(
                [
                    solid_dt_cached(w, b, int(m))
                    for w, b, m in zip(
                        wall_type[solid_mask], band[solid_mask], ins_mm[solid_mask]
                    )
                ],
                dtype=float,
            )
            u[solid_mask] = _u_solid_ground_floor_vectorised(b_dim[solid_mask], dt)

        # ---- suspended ground ----
        if susp_mask.any():
            params = [
                susp_dg_rf_cached(w, b, int(m))
                for w, b, m in zip(
                    wall_type[susp_mask], band[susp_mask], ins_mm[susp_mask]
                )
            ]
            dg = np.array([dg_i for dg_i, _ in params], dtype=float)
            rf = np.array([rf_i for _, rf_i in params], dtype=float)
            u[susp_mask] = _u_suspended_ground_floor_vectorised(b_dim[susp_mask], dg, rf)

    df["FLOOR_U_VALUE"] = u
    return df
| # ============================================================ | |
| # Usage example: | |
| # s3 = pd.read_csv(...) or pd.read_excel(...) with columns: | |
| # Wall Type | sap_band | thickness_mm | |
| # df_total = floor_feature_engineering_fast(df_total, s3) | |
| # ============================================================ | |