"""Roof feature engineering.

Parses free-text roof descriptions and derives roof U-values from the
S9 (insulation thickness) and S10 (age band) lookup tables passed in by the
caller.  Both a row-wise API (``calculate_overall_roof_u_value``) and a
vectorised API (``roof_feature_engineering``) are provided.
"""
import re

import numpy as np
import pandas as pd

# S9 thickness categories (mm) and the U-value paired with each category.
S9_MM = np.array([0, 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400])
S9_U = np.array([2.3, 1.5, 1.0, 0.68, 0.50, 0.40, 0.30, 0.21, 0.17, 0.16, 0.14, 0.12, 0.11])
S9_LOOKUP = dict(zip(S9_MM, S9_U))

# U-value used throughout this module for an uninsulated roof.
_UNINSULATED_U = 2.3
# Age bands that fall back to the uninsulated U-value.
_BANDS_A_TO_D = ("A", "B", "C", "D")
# Marker text identifying descriptions that carry a U-value, not a thickness.
_U_VALUE_MARKER = "average thermal transmittance"
# Matches "300 mm", "300mm", "300+ mm", "300 + mm", "300+mm".
_MM_PATTERN = re.compile(r"(\d+)\s*\+?\s*mm")
# Matches the first integer or decimal number.
_NUMBER_PATTERN = re.compile(r"(\d+(?:\.\d+)?)")


def extract_roof_mm(text):
    """Return the first insulation thickness (mm) found in *text*, or None.

    Returns None for missing values, for descriptions containing
    "average thermal transmittance" (U-value rows), and when no
    ``<digits> mm`` token is present.
    """
    if pd.isna(text):
        return None
    lowered = str(text).lower()
    if _U_VALUE_MARKER in lowered:
        return None
    found = _MM_PATTERN.search(lowered)
    return int(found.group(1)) if found else None


def classify_roof_type(text):
    """Classify a roof description by the first keyword it contains.

    Keywords are tested in this order: "pitched", "flat", "roof", "above",
    "average thermal transmittance" (-> "measured_u").  Returns "UKN" when
    none of them is present.
    """
    lowered = str(text).lower()
    for keyword, label in (
        ("pitched", "pitched"),
        ("flat", "flat"),
        ("roof", "roof"),
        ("above", "above"),
        (_U_VALUE_MARKER, "measured_u"),
    ):
        if keyword in lowered:
            return label
    return "UKN"


def normalize_mm_to_s9(mm):
    """Normalize an insulation thickness to a valid S9 category.

    Input:
        mm : int, float, or None
    Output:
        int, one of
        [0, 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400]

    Rules:
        - missing (None / NaN / pd.NA) -> 0
        - mm <= 0                      -> 0
        - mm >= 400                    -> 400
        - otherwise                    -> nearest LOWER category
    """
    # pd.isna also covers pd.NA and numpy NaN scalars of any width.
    if mm is None or pd.isna(mm):
        return 0
    mm = float(mm)
    if mm <= 0:
        return 0
    if mm >= S9_MM[-1]:
        return int(S9_MM[-1])
    return int(S9_MM[S9_MM <= mm].max())


def classify_pitched_roof_category(text: str) -> str:
    """Return the U-value category for a pitched-roof description.

    Possible results: "S10_RAFTERS", "S10_JOISTS_UNKNOWN", "S9_NONE",
    "S9_MEASURED" or "NON_PITCHED_OR_UKN".  The S10 triggers are tested
    first, so e.g. "no insulation (assumed)" is classified as
    "S10_JOISTS_UNKNOWN" rather than "S9_NONE".
    """
    text_lower = str(text).lower()

    # --- 1. S10 triggers (take precedence over measurement) ---
    if "rafters" in text_lower:
        return "S10_RAFTERS"
    if (
        "assumed" in text_lower
        or "unknown loft insulation" in text_lower
        or "invalid input code" in text_lower
    ):
        return "S10_JOISTS_UNKNOWN"

    # --- 2. S9 triggers (known thickness) ---
    if "no insulation" in text_lower or re.search(r"\b0\s*mm\b", text_lower):
        return "S9_NONE"
    # Any digit at all counts as a quantified description.
    if re.search(r"\d", text_lower):
        return "S9_MEASURED"

    # --- 3. Unquantified pitched roofs, e.g. "pitched, loft insulation" ---
    if "pitched" in text_lower:
        return "S10_JOISTS_UNKNOWN"
    return "NON_PITCHED_OR_UKN"


def _first_float_or_none(values):
    """Return the first element of *values* as float, or None if empty."""
    return float(values.iloc[0]) if not values.empty else None


def extract_pitch_u_value(text, sap_band_letter, S9_table, S10_table):
    """Compute the U-value of a pitched roof from its description.

    Parameters:
        text            : roof description
        sap_band_letter : age band letter used to index ``S10_table``
        S9_table        : DataFrame with columns "mm" and "slates_tiles"
        S10_table       : DataFrame with columns "age_band",
                          "Pitched_rafters" and "Pitched_unknown"

    Returns a float, or None when the required table row is missing.
    """
    category = classify_pitched_roof_category(text)

    if category == "S9_NONE":
        return _UNINSULATED_U

    if category == "S9_MEASURED":
        mm = normalize_mm_to_s9(extract_roof_mm(text))
        return _first_float_or_none(
            S9_table.loc[S9_table["mm"] == mm, "slates_tiles"]
        )

    # Remaining categories use the age-band table; bands A-D default to
    # the uninsulated value.
    if sap_band_letter in _BANDS_A_TO_D:
        return _UNINSULATED_U
    column = "Pitched_rafters" if category == "S10_RAFTERS" else "Pitched_unknown"
    return _first_float_or_none(
        S10_table.loc[S10_table["age_band"] == sap_band_letter, column]
    )


def get_flat_roof_u_value(is_top_floor, sap_band_letter, s10):
    """Return the flat-roof U-value for a dwelling.

    Returns 0.0 when ``is_top_floor`` is "N" (no heat loss through the roof),
    None when the age band is missing or not found, otherwise the
    "Flat_roof" value from ``s10``.  Bands A-D are looked up through the
    merged "A, B, C, D" row.
    """
    # 1. Not top floor -> no heat loss
    if str(is_top_floor).strip().upper() == "N":
        return 0.0

    # 2. Missing age band -> can't compute
    if pd.isna(sap_band_letter):
        return None

    band = str(sap_band_letter).strip().upper()
    s10_bands = s10["age_band"].astype(str).str.strip().str.upper()

    # 3. Bands A-D -> merged row "A, B, C, D", else the fallback value
    if band in _BANDS_A_TO_D:
        merged = s10.loc[s10_bands == "A, B, C, D", "Flat_roof"]
        return float(merged.iloc[0]) if not merged.empty else _UNINSULATED_U

    # 4. Other bands: direct match
    direct = s10.loc[s10_bands == band, "Flat_roof"]
    if not direct.empty:
        return float(direct.iloc[0])

    # 5. Fallback for band L if it is missing from the table
    if band == "L":
        return 0.18
    return None


def extract_measured_u(description):
    """Return the first number in a U-value description, or None.

    Only descriptions containing "average thermal transmittance" are
    parsed; anything else (including missing values) yields None.
    """
    if pd.isna(description):
        return None
    lowered = str(description).lower()
    if _U_VALUE_MARKER not in lowered:
        return None
    found = _NUMBER_PATTERN.search(lowered)
    return float(found.group(1)) if found else None


def get_room_in_roof_u_value(sap_band_letter, s10):
    """Return the "Room_in_roof" U-value for an age band.

    Bands A-D return the uninsulated value; other bands are looked up in
    ``s10`` by "age_band".  Returns None when the band is not in the table.
    """
    if sap_band_letter in _BANDS_A_TO_D:
        return _UNINSULATED_U
    return _first_float_or_none(
        s10.loc[s10["age_band"] == sap_band_letter, "Room_in_roof"]
    )


def calculate_overall_roof_u_value(row, s9, s10):
    """Return the roof U-value for one record.

    ``row`` must provide "ROOF_DESCRIPTION", "sap_band_letter" and, for flat
    roofs, "FLAT_TOP_STOREY".  Dispatches on ``classify_roof_type``; returns
    None for unclassified descriptions.
    """
    description = row["ROOF_DESCRIPTION"]
    roof_class = classify_roof_type(description)

    if roof_class == "pitched":
        return extract_pitch_u_value(description, row["sap_band_letter"], s9, s10)
    if roof_class == "flat":
        return get_flat_roof_u_value(
            row["FLAT_TOP_STOREY"], row["sap_band_letter"], s10
        )
    if roof_class == "measured_u":
        return extract_measured_u(description)
    if roof_class == "roof":
        return get_room_in_roof_u_value(row["sap_band_letter"], s10)
    if roof_class == "above":
        return 0.0
    return None


def extract_roof_insulation(row):
    """Return a coarse insulation label for one record.

    ``row`` must support ``row["ROOF_DESCRIPTION"]`` and
    ``row.get("FLAT_TOP_STOREY")``.  Rules are evaluated top to bottom and
    the first match wins; more specific rules are placed before the generic
    rule they would otherwise be hidden by.
    """
    desc = row["ROOF_DESCRIPTION"]
    flat_top = row.get("FLAT_TOP_STOREY")
    t = str(desc).lower()

    # 0. "above" overrides everything
    if "above" in t or (flat_top is not None and str(flat_top).upper() == "N"):
        return "above"

    # 1. Measured U-value
    if _U_VALUE_MARKER in t:
        return "measured"

    # 2. Explicit no insulation
    if "no insulation" in t:
        return "none"

    # 3. Insulation at rafters
    if "insulated at rafters" in t:
        return "rafters"

    # 4. Numerical mm thickness
    mm = extract_roof_mm(desc)
    if mm is not None:
        return "none" if mm == 0 else "loft_insulation"

    # 5. Unknown loft insulation.  Must precede the generic loft rule:
    #    "unknown loft insulation" also contains "loft insulation".
    if "unknown" in t and "loft" in t:
        return "unknown_loft"

    # 6. Generic loft insulation (no mm given)
    if "loft insulation" in t:
        return "loft_insulation"

    # 7. Thatched roofs; the roof-room variant must be tested first or the
    #    plain "thatched" rule would always win.
    if "thatched" in t:
        return "roof_room_thatched" if "roof room" in t else "thatched"

    # 8. Limited insulation
    if "limited" in t:
        return "limited"

    # 9. Generic insulated (not rafters)
    if "insulated" in t:
        return "insulated"

    # 10. Roof room (no specific mm)
    if "roof room" in t:
        return "roof_room"

    return "unknown"


def build_roof_lookup(roof_desc: pd.Series) -> pd.DataFrame:
    """Parse ROOF_DESCRIPTION once per distinct value.

    Returns one row per distinct description (first occurrence, original
    index label kept) with the columns ROOF_DESCRIPTION, ROOF_CLASS,
    ROOF_MM_RAW, ROOF_MM_S9, ROOF_PITCH_CATEGORY, ROOF_MEASURED_U and
    ROOF_INSULATION_TYPE.
    """
    # Work on the de-duplicated rows only, so every mask below shares the
    # output frame's index instead of relying on implicit alignment.
    first_seen = ~roof_desc.duplicated()
    unique_desc = roof_desc[first_seen]
    s = unique_desc.fillna("").astype(str).str.lower()

    out = pd.DataFrame({
        "ROOF_DESCRIPTION": unique_desc,
        "ROOF_CLASS": pd.NA,            # pitched / flat / above / room / measured
        "ROOF_MM_RAW": pd.NA,
        "ROOF_MM_S9": pd.NA,
        "ROOF_PITCH_CATEGORY": pd.NA,   # S9_MEASURED / S9_NONE / S10_RAFTERS / S10_UNKNOWN
        "ROOF_MEASURED_U": pd.NA,
        "ROOF_INSULATION_TYPE": pd.NA,
    })

    def has(fragment):
        """Boolean mask of descriptions containing the literal *fragment*."""
        return s.str.contains(fragment, regex=False)

    # ---------------------------
    # ROOF CLASS (later assignments override earlier ones)
    # ---------------------------
    is_u_row = has(_U_VALUE_MARKER)
    out.loc[is_u_row, "ROOF_CLASS"] = "measured"
    out.loc[has("above"), "ROOF_CLASS"] = "above"
    out.loc[has("roof room"), "ROOF_CLASS"] = "room"
    out.loc[has("flat"), "ROOF_CLASS"] = "flat"
    out.loc[has("pitched"), "ROOF_CLASS"] = "pitched"
    is_measured = out["ROOF_CLASS"] == "measured"

    # ---------------------------
    # MEASURED U-VALUE
    # ---------------------------
    measured_u = pd.to_numeric(
        s.where(is_u_row).str.extract(_NUMBER_PATTERN.pattern, expand=False),
        errors="coerce",
    )
    out.loc[is_measured, "ROOF_MEASURED_U"] = measured_u

    # ---------------------------
    # RAW MM EXTRACTION
    # ---------------------------
    # Forced to float so measured rows can be blanked with NaN below
    # without an int -> float upcast.
    out["ROOF_MM_RAW"] = pd.to_numeric(
        s.str.extract(_MM_PATTERN.pattern, expand=False), errors="coerce"
    ).astype(float)

    # ---------------------------
    # APPLY RETROFIT TO MEASURED U-VALUES
    # ---------------------------
    upgrade = out["ROOF_MEASURED_U"].notna() & out["ROOF_MM_RAW"].notna()
    if upgrade.any():
        # The column is object dtype; cast before doing numpy arithmetic.
        u_meas = out.loc[upgrade, "ROOF_MEASURED_U"].to_numpy(dtype=float)
        mm_add = out.loc[upgrade, "ROOF_MM_RAW"].astype(int).to_numpy()

        # Inverse S9: thickness category whose U-value is nearest.
        nearest = np.abs(u_meas[:, None] - S9_U[None, :]).argmin(axis=1)
        # Add the retrofit thickness and clip to the top category.
        new_mm = np.minimum(S9_MM[nearest] + mm_add, S9_MM[-1])
        # Forward S9: nearest LOWER category, matching normalize_mm_to_s9.
        # The default left-sided searchsorted would round UP instead.
        category = np.searchsorted(S9_MM, new_mm, side="right") - 1
        out.loc[upgrade, "ROOF_MEASURED_U"] = S9_U[category]

    # Measured-U rows carry no insulation thickness of their own.
    out.loc[is_measured, "ROOF_MM_RAW"] = np.nan
    has_mm = out["ROOF_MM_RAW"].notna()

    # ---------------------------
    # NORMALISE TO SAP S9 MM
    # ---------------------------
    out["ROOF_MM_S9"] = out["ROOF_MM_RAW"].map(normalize_mm_to_s9)

    # ---------------------------
    # PITCHED ROOF CATEGORY (later assignments override earlier ones)
    # ---------------------------
    pitched = out["ROOF_CLASS"] == "pitched"
    out.loc[pitched & has("rafters"), "ROOF_PITCH_CATEGORY"] = "S10_RAFTERS"
    out.loc[pitched & has("no insulation"), "ROOF_PITCH_CATEGORY"] = "S9_NONE"
    out.loc[pitched & has_mm, "ROOF_PITCH_CATEGORY"] = "S9_MEASURED"
    # Every remaining pitched row (assumed / unknown / invalid / other).
    out.loc[
        pitched & out["ROOF_PITCH_CATEGORY"].isna(), "ROOF_PITCH_CATEGORY"
    ] = "S10_UNKNOWN"

    # ---------------------------
    # INSULATION TYPE (later assignments override earlier ones)
    # ---------------------------
    out.loc[has("rafters"), "ROOF_INSULATION_TYPE"] = "rafters"
    out.loc[has("no insulation"), "ROOF_INSULATION_TYPE"] = "none"
    out.loc[has("thatched"), "ROOF_INSULATION_TYPE"] = "thatched"
    out.loc[has("loft"), "ROOF_INSULATION_TYPE"] = "loft"
    out.loc[has_mm, "ROOF_INSULATION_TYPE"] = "loft"

    return out


def build_roof_u_dicts(s9: pd.DataFrame, s10: pd.DataFrame):
    """Turn the S9/S10 tables into plain lookup dicts.

    Returns a 5-tuple ``(s9_u, s10_pitched, s10_rafters, s10_flat, s10_room)``
    where ``s9_u`` maps "mm" -> "slates_tiles" and the others map
    "age_band" -> "Pitched_unknown", "Pitched_rafters", "Flat_roof" and
    "Room_in_roof" respectively.
    """
    s9_u = dict(zip(s9["mm"], s9["slates_tiles"]))
    bands = s10["age_band"]
    s10_pitched = dict(zip(bands, s10["Pitched_unknown"]))
    s10_rafters = dict(zip(bands, s10["Pitched_rafters"]))
    s10_flat = dict(zip(bands, s10["Flat_roof"]))
    s10_room = dict(zip(bands, s10["Room_in_roof"]))
    return s9_u, s10_pitched, s10_rafters, s10_flat, s10_room


def roof_feature_engineering(
    df: pd.DataFrame,
    s9: pd.DataFrame,
    s10: pd.DataFrame
) -> pd.DataFrame:
    """Return a copy of *df* with parsed roof features and ROOF_U_VALUE.

    *df* must contain "ROOF_DESCRIPTION", "FLAT_TOP_STOREY" and
    "sap_band_letter".  Added columns: ROOF_CLASS, ROOF_MM_S9,
    ROOF_PITCH_CATEGORY, ROOF_MEASURED_U, ROOF_INSULATION_TYPE and
    ROOF_U_VALUE.  The input frame is not modified.
    """
    df = df.copy()

    # ----------------------------------
    # 1. Parse roof descriptions ONCE
    # ----------------------------------
    lookup = build_roof_lookup(df["ROOF_DESCRIPTION"]).set_index("ROOF_DESCRIPTION")
    for column in (
        "ROOF_CLASS",
        "ROOF_MM_S9",
        "ROOF_PITCH_CATEGORY",
        "ROOF_MEASURED_U",
        "ROOF_INSULATION_TYPE",
    ):
        df[column] = df["ROOF_DESCRIPTION"].map(lookup[column])

    # ----------------------------------
    # 2. SAP lookup dicts
    # ----------------------------------
    s9_u, s10_pitched, s10_rafters, s10_flat, s10_room = build_roof_u_dicts(s9, s10)
    band = df["sap_band_letter"]
    roof_class = df["ROOF_CLASS"]
    pitch_category = df["ROOF_PITCH_CATEGORY"]
    # Stripped and upper-cased, like the scalar get_flat_roof_u_value.
    top_storey = df["FLAT_TOP_STOREY"].astype(str).str.strip().str.upper()

    # ----------------------------------
    # 3. Vectorised U-value logic (later rules override earlier ones)
    # ----------------------------------
    u = pd.Series(np.nan, index=df.index)

    def assign(mask, values):
        """Write *values* into ``u`` where *mask* holds; skip empty masks."""
        if mask.any():
            u.loc[mask] = values

    # ABOVE
    assign(roof_class == "above", 0.0)

    # MEASURED U-value taken directly from the description
    measured = df["ROOF_MEASURED_U"].notna()
    assign(measured, df.loc[measured, "ROOF_MEASURED_U"].astype(float))

    # FLAT (top storey, or top-storey flag missing)
    is_flat = roof_class == "flat"
    flat_top = is_flat & (df["FLAT_TOP_STOREY"].isna() | (top_storey == "Y"))
    assign(flat_top, band[flat_top].map(s10_flat))

    # FLAT roofs with another dwelling above -> no heat loss
    assign(is_flat & (top_storey == "N"), 0.0)

    # ROOM IN ROOF
    is_room = roof_class == "room"
    assign(is_room, band[is_room].map(s10_room))

    # PITCHED
    is_pitched = roof_class == "pitched"

    s9_measured = is_pitched & (pitch_category == "S9_MEASURED")
    assign(s9_measured, df.loc[s9_measured, "ROOF_MM_S9"].map(s9_u))

    assign(is_pitched & (pitch_category == "S9_NONE"), _UNINSULATED_U)

    rafters = is_pitched & (pitch_category == "S10_RAFTERS")
    assign(rafters, band[rafters].map(s10_rafters))

    unknown = is_pitched & (pitch_category == "S10_UNKNOWN")
    assign(unknown, band[unknown].map(s10_pitched))

    # ----------------------------------
    # 4. SAP fallback for A-D
    # ----------------------------------
    assign(band.isin(_BANDS_A_TO_D) & u.isna(), _UNINSULATED_U)

    df["ROOF_U_VALUE"] = u
    return df