Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import re | |
| import numpy as np | |
| def extract_roof_mm(text): | |
| """Return insulation thickness in mm, or None.""" | |
| if pd.isna(text): | |
| return None | |
| t = str(text).lower() | |
| # ignore U-value rows | |
| if "average thermal transmittance" in t: | |
| return None | |
| # match 300 mm, 300mm, 300+ mm, 300 + mm, 300+mm | |
| match = re.findall(r"(\d+)\s*\+?\s*mm", t) | |
| if not match: | |
| return None | |
| return int(match[0]) | |
| def classify_roof_type(text): | |
| if "pitched" in str(text).lower(): | |
| return "pitched" | |
| elif "flat" in str(text).lower(): | |
| return "flat" | |
| elif "roof" in str(text).lower(): | |
| return "roof" | |
| elif "above" in str(text).lower(): | |
| return "above" | |
| elif "average thermal transmittance" in str(text).lower(): | |
| return "measured_u" | |
| else: | |
| return "UKN" | |
| def normalize_mm_to_s9(mm): | |
| """ | |
| Normalize insulation thickness to SAP S9 valid categories. | |
| Input: | |
| mm : int, float, or None | |
| Output: | |
| int (SAP mm category) or None | |
| SAP S9 valid values: | |
| [0, 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400] | |
| Rules: | |
| - None β 0 | |
| - mm <= 0 β 0 | |
| - mm >= 400 β 400 | |
| - otherwise β nearest LOWER category | |
| """ | |
| SAP_S9_VALUES = [0, 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400] | |
| # Case: no value β treat as uninsulated | |
| if mm is None or (isinstance(mm, float) and np.isnan(mm)): | |
| return 0 | |
| # Convert to number | |
| mm = float(mm) | |
| # Negative or zero β uninsulated | |
| if mm <= 0: | |
| return 0 | |
| # β₯400 mm β use 400 category | |
| if mm >= 400: | |
| return 400 | |
| # Find largest S9 category <= mm | |
| eligible = [v for v in SAP_S9_VALUES if v <= mm] | |
| return eligible[-1] if eligible else 0 | |
| def classify_pitched_roof_category(text: str) -> str: | |
| """ | |
| Returns the appropriate U-value category: S9 (Measured) or S10 (Assumed/Other). | |
| """ | |
| text_lower = str(text).lower() | |
| # --- 1. S10 Triggers (Take precedence over measurement) --- | |
| if "rafters" in text_lower: | |
| return "S10_RAFTERS" | |
| # CORRECTED LOGIC: Each string must be checked against text_lower | |
| if ("assumed" in text_lower or | |
| "unknown loft insulation" in text_lower or | |
| "invalid input code" in text_lower): | |
| return "S10_JOISTS_UNKNOWN" | |
| # --- 2. S9 Triggers (Known/Measured Thickness) --- | |
| # Check for explicit 'no insulation' (observed) or '0 mm' | |
| if "no insulation" in text_lower or re.search(r"\b0\s*mm\b", text_lower): | |
| return "S9_NONE" | |
| # Check for any quantifiable number (mm) or a comparison (e.g., 300+) | |
| # This must come *before* the general 'pitched' check. | |
| mm_match = re.search(r'(\d+|\d+\+|\>\=\d+)', text_lower) | |
| if mm_match: | |
| return "S9_MEASURED" | |
| # --- 3. Default to S10 (General unquantified cases) --- | |
| # Catches descriptions like "pitched, loft insulation" or just "pitched" | |
| if "pitched" in text_lower: | |
| return "S10_JOISTS_UNKNOWN" | |
| return "NON_PITCHED_OR_UKN" | |
| def extract_pitch_u_value(text, sap_band_letter, S9_table, S10_table): | |
| """ | |
| Compute U-value for pitched roof using S9/S10 tables + description text. | |
| """ | |
| category = classify_pitched_roof_category(text) | |
| # ---- S9: No insulation (assumed) ---- | |
| if category == "S9_NONE": | |
| return 2.3 | |
| # ---- S9: Measured insulation thickness ---- | |
| elif category == "S9_MEASURED": | |
| mm = extract_roof_mm(text) | |
| mm = normalize_mm_to_s9(mm) | |
| value = S9_table.loc[S9_table["mm"] == mm, "slates_tiles"] | |
| return float(value.iloc[0]) if not value.empty else None | |
| # ---- S10: Rafters present ---- | |
| elif category == "S10_RAFTERS": | |
| # Older buildings (AβD) default to uninsulated | |
| if sap_band_letter in ["A", "B", "C", "D"]: | |
| return 2.3 | |
| value = S10_table.loc[S10_table["age_band"] == sap_band_letter, "Pitched_rafters"] | |
| return float(value.iloc[0]) if not value.empty else None | |
| # ---- S10: Unknown pitched roof form ---- | |
| else: | |
| if sap_band_letter in ["A", "B", "C", "D"]: | |
| return 2.3 | |
| value = S10_table.loc[S10_table["age_band"] == sap_band_letter, "Pitched_unknown"] | |
| return float(value.iloc[0]) if not value.empty else None | |
| def get_flat_roof_u_value(is_top_floor, sap_band_letter, s10): | |
| # 1. Not top floor β no heat loss | |
| if str(is_top_floor).strip().upper() == "N": | |
| return 0.0 | |
| # 2. Missing age band β can't compute | |
| if pd.isna(sap_band_letter): | |
| return None | |
| band = str(sap_band_letter).strip().upper() | |
| # Normalize S10 band column | |
| s10_bands = s10["age_band"].astype(str).str.strip().str.upper() | |
| # 3. Bands AβD β map to merged row "A, B, C, D" | |
| if band in ["A", "B", "C", "D"]: | |
| row = s10.loc[s10_bands == "A, B, C, D", "Flat_roof"] | |
| if not row.empty: | |
| return float(row.iloc[0]) | |
| else: | |
| return 2.3 # SAP fallback | |
| # 4. EβL: direct match | |
| row = s10.loc[s10_bands == band, "Flat_roof"] | |
| if not row.empty: | |
| return float(row.iloc[0]) | |
| # 5. SAP fallback for band L if missing in table | |
| if band == "L": | |
| return 0.18 # known SAP S10 value | |
| return None | |
| def extract_measured_u(description): | |
| if pd.isna(description): | |
| return None | |
| text = str(description).lower() | |
| if "average thermal transmittance" not in text: | |
| return None | |
| # match integer OR float | |
| match = re.search(r"(\d+(?:\.\d+)?)", text) | |
| if match: | |
| return float(match.group(1)) | |
| return None | |
| def get_room_in_roof_u_value(sap_band_letter, s10): | |
| # Check for "room in roof" in the description | |
| if sap_band_letter in ["A", "B", "C", "D"]: | |
| return 2.3 | |
| else: | |
| # Look up the U-value in the s10 DataFrame | |
| row = s10[s10["age_band"] == sap_band_letter] | |
| if not row.empty: | |
| u_value = row["Room_in_roof"].values[0] | |
| return u_value | |
| return None | |
| def calculate_overall_roof_u_value(row,s9,s10): | |
| roof_class = classify_roof_type(row["ROOF_DESCRIPTION"]) | |
| if roof_class == "pitched": | |
| return extract_pitch_u_value(row["ROOF_DESCRIPTION"], row["sap_band_letter"], s9, s10) | |
| elif roof_class == "flat": | |
| return get_flat_roof_u_value(row["FLAT_TOP_STOREY"], row["sap_band_letter"], s10) | |
| elif roof_class == "measured_u": | |
| return extract_measured_u(row["ROOF_DESCRIPTION"]) | |
| elif roof_class == "roof": | |
| return get_room_in_roof_u_value(row["sap_band_letter"], s10) | |
| elif roof_class == "above": | |
| return 0.0 | |
| else: | |
| return None | |
| def extract_roof_insulation(row): | |
| desc = row["ROOF_DESCRIPTION"] | |
| flat_top = row.get("FLAT_TOP_STOREY") | |
| t = str(desc).lower() | |
| # ------------------------------- | |
| # 0. ABOVE overrides everything | |
| # ------------------------------- | |
| if "above" in t or (flat_top is not None and str(flat_top).upper() == "N"): | |
| return "above" | |
| # ------------------------------- | |
| # 1. Measured U-value | |
| # ------------------------------- | |
| if "average thermal transmittance" in t: | |
| return "measured" | |
| # ------------------------------- | |
| # 2. explicit no insulation | |
| # ------------------------------- | |
| if "no insulation" in t: | |
| return "none" | |
| # ------------------------------- | |
| # 3. insulation at rafters | |
| # ------------------------------- | |
| if "insulated at rafters" in t: | |
| return "rafters" | |
| # ------------------------------- | |
| # 4. numerical mm thickness | |
| # ------------------------------- | |
| mm = extract_roof_mm(desc) | |
| if mm is not None: | |
| if mm == 0: | |
| return "none" | |
| return "loft_insulation" | |
| # ------------------------------- | |
| # 5. generic loft insulation | |
| # (no mm, still should count) | |
| # ------------------------------- | |
| if "loft insulation" in t: | |
| return "loft_insulation" | |
| # ------------------------------- | |
| # 6. UNKNOWN loft insulation | |
| # ------------------------------- | |
| if "unknown" in t and "loft" in t: | |
| return "unknown_loft" | |
| # ------------------------------- | |
| # 7. thatched roofs | |
| # ------------------------------- | |
| if "thatched" in t: | |
| return "thatched" | |
| # roof room variants with thatch | |
| if "roof room" in t and "thatched" in t: | |
| return "roof_room_thatched" | |
| # ------------------------------- | |
| # 8. limited insulation | |
| # ------------------------------- | |
| if "limited" in t: | |
| return "limited" | |
| # ------------------------------- | |
| # 9. generic insulated (not rafters) | |
| # ------------------------------- | |
| if "insulated" in t: | |
| return "insulated" | |
| # ------------------------------- | |
| # 10. roof room (no specific mm) | |
| # ------------------------------- | |
| if "roof room" in t: | |
| return "roof_room" | |
| # ------------------------------- | |
| # fallback | |
| # ------------------------------- | |
| return "unknown" | |
| S9_MM = np.array([0, 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400]) | |
| S9_U = np.array([2.3, 1.5, 1.0, 0.68, 0.50, 0.40, 0.30, 0.21, 0.17, 0.16, 0.14, 0.12, 0.11]) | |
| S9_LOOKUP = dict(zip(S9_MM, S9_U)) | |
| def build_roof_lookup(roof_desc: pd.Series) -> pd.DataFrame: | |
| """ | |
| Parse ROOF_DESCRIPTION once. | |
| Returns a lookup table keyed by ROOF_DESCRIPTION. | |
| """ | |
| s = roof_desc.fillna("").astype(str).str.lower() | |
| out = pd.DataFrame({ | |
| "ROOF_DESCRIPTION": roof_desc, | |
| "ROOF_CLASS": pd.NA, # pitched / flat / above / room / measured | |
| "ROOF_MM_RAW": pd.NA, | |
| "ROOF_MM_S9": pd.NA, | |
| "ROOF_PITCH_CATEGORY": pd.NA, # S9_MEASURED / S9_NONE / S10_RAFTERS / S10_UNKNOWN | |
| "ROOF_MEASURED_U": pd.NA, | |
| "ROOF_INSULATION_TYPE": pd.NA | |
| }).drop_duplicates("ROOF_DESCRIPTION") | |
| # --------------------------- | |
| # ROOF CLASS (priority order) | |
| # --------------------------- | |
| out.loc[s.str.contains("average thermal transmittance"), "ROOF_CLASS"] = "measured" | |
| out.loc[s.str.contains("above"), "ROOF_CLASS"] = "above" | |
| out.loc[s.str.contains("roof room"), "ROOF_CLASS"] = "room" | |
| out.loc[s.str.contains("flat"), "ROOF_CLASS"] = "flat" | |
| out.loc[s.str.contains("pitched"), "ROOF_CLASS"] = "pitched" | |
| # --------------------------- | |
| # MEASURED U-VALUE | |
| # --------------------------- | |
| m = ( | |
| s.where(s.str.contains("average thermal transmittance")) | |
| .str.extract(r"(\d+(?:\.\d+)?)", expand=False) | |
| ) | |
| out.loc[out["ROOF_CLASS"] == "measured", "ROOF_MEASURED_U"] = pd.to_numeric(m, errors="coerce") | |
| # --------------------------- | |
| # RAW MM EXTRACTION | |
| # --------------------------- | |
| mm = s.str.extract(r"(\d+)\s*\+?\s*mm", expand=False) | |
| out["ROOF_MM_RAW"] = pd.to_numeric(mm, errors="coerce") | |
| # --------------------------- | |
| # APPLY RETROFIT TO MEASURED U-VALUES | |
| # --------------------------- | |
| mask_measured_upgrade = ( | |
| out["ROOF_MEASURED_U"].notna() & | |
| out["ROOF_MM_RAW"].notna() | |
| ) | |
| if mask_measured_upgrade.any(): | |
| u_meas = out.loc[mask_measured_upgrade, "ROOF_MEASURED_U"].values | |
| mm_add = out.loc[mask_measured_upgrade, "ROOF_MM_RAW"].astype(int).values | |
| # inverse S9 (nearest) | |
| diff = np.abs(u_meas[:, None] - S9_U[None, :]) | |
| base_mm = S9_MM[diff.argmin(axis=1)] | |
| # add retrofit + clip | |
| new_mm = np.minimum(base_mm + mm_add, 400) | |
| # forward S9 lookup | |
| out.loc[mask_measured_upgrade, "ROOF_MEASURED_U"] = S9_U[ | |
| np.searchsorted(S9_MM, new_mm) | |
| ] | |
| # zero out insulation thickness for measured U-value rows | |
| out.loc[out["ROOF_CLASS"] == "measured", "ROOF_MM_RAW"] = pd.NA | |
| # --------------------------- | |
| # NORMALISE TO SAP S9 MM | |
| # --------------------------- | |
| SAP_S9_VALUES = np.array([0, 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400]) | |
| def to_s9(mm): | |
| if pd.isna(mm) or mm <= 0: | |
| return 0 | |
| if mm >= 400: | |
| return 400 | |
| return SAP_S9_VALUES[SAP_S9_VALUES <= mm].max() | |
| out["ROOF_MM_S9"] = out["ROOF_MM_RAW"].map(to_s9) | |
| # --------------------------- | |
| # PITCHED ROOF CATEGORY | |
| # --------------------------- | |
| pitched = out["ROOF_CLASS"] == "pitched" | |
| out.loc[pitched & s.str.contains("rafters"), "ROOF_PITCH_CATEGORY"] = "S10_RAFTERS" | |
| out.loc[pitched & s.str.contains("no insulation"), "ROOF_PITCH_CATEGORY"] = "S9_NONE" | |
| out.loc[pitched & out["ROOF_MM_RAW"].notna(), "ROOF_PITCH_CATEGORY"] = "S9_MEASURED" | |
| out.loc[ | |
| pitched & | |
| out["ROOF_PITCH_CATEGORY"].isna() & | |
| s.str.contains("assumed|unknown|invalid"), | |
| "ROOF_PITCH_CATEGORY" | |
| ] = "S10_UNKNOWN" | |
| out.loc[ | |
| pitched & out["ROOF_PITCH_CATEGORY"].isna(), | |
| "ROOF_PITCH_CATEGORY" | |
| ] = "S10_UNKNOWN" | |
| # --------------------------- | |
| # INSULATION TYPE (semantic) | |
| # --------------------------- | |
| out.loc[s.str.contains("rafters"), "ROOF_INSULATION_TYPE"] = "rafters" | |
| out.loc[s.str.contains("no insulation"), "ROOF_INSULATION_TYPE"] = "none" | |
| out.loc[s.str.contains("thatched"), "ROOF_INSULATION_TYPE"] = "thatched" | |
| out.loc[s.str.contains("loft"), "ROOF_INSULATION_TYPE"] = "loft" | |
| out.loc[out["ROOF_MM_RAW"].notna(), "ROOF_INSULATION_TYPE"] = "loft" | |
| return out | |
| def build_roof_u_dicts(s9: pd.DataFrame, s10: pd.DataFrame): | |
| S9_U = dict(zip(s9["mm"], s9["slates_tiles"])) | |
| S10_PITCHED = dict(zip(s10["age_band"], s10["Pitched_unknown"])) | |
| S10_RAFTERS = dict(zip(s10["age_band"], s10["Pitched_rafters"])) | |
| S10_FLAT = dict(zip(s10["age_band"], s10["Flat_roof"])) | |
| S10_ROOM = dict(zip(s10["age_band"], s10["Room_in_roof"])) | |
| return S9_U, S10_PITCHED, S10_RAFTERS, S10_FLAT, S10_ROOM | |
| def roof_feature_engineering( | |
| df: pd.DataFrame, | |
| s9: pd.DataFrame, | |
| s10: pd.DataFrame | |
| ) -> pd.DataFrame: | |
| df = df.copy() | |
| # ---------------------------------- | |
| # 1. Parse roof descriptions ONCE | |
| # ---------------------------------- | |
| roof_lookup = build_roof_lookup(df["ROOF_DESCRIPTION"]).set_index("ROOF_DESCRIPTION") | |
| df["ROOF_CLASS"] = df["ROOF_DESCRIPTION"].map(roof_lookup["ROOF_CLASS"]) | |
| df["ROOF_MM_S9"] = df["ROOF_DESCRIPTION"].map(roof_lookup["ROOF_MM_S9"]) | |
| df["ROOF_PITCH_CATEGORY"] = df["ROOF_DESCRIPTION"].map(roof_lookup["ROOF_PITCH_CATEGORY"]) | |
| df["ROOF_MEASURED_U"] = df["ROOF_DESCRIPTION"].map(roof_lookup["ROOF_MEASURED_U"]) | |
| df["ROOF_INSULATION_TYPE"] = df["ROOF_DESCRIPTION"].map(roof_lookup["ROOF_INSULATION_TYPE"]) | |
| # ---------------------------------- | |
| # 2. SAP lookup dicts | |
| # ---------------------------------- | |
| S9_U, S10_PITCHED, S10_RAFTERS, S10_FLAT, S10_ROOM = build_roof_u_dicts(s9, s10) | |
| band = df["sap_band_letter"] | |
| # ---------------------------------- | |
| # 3. Vectorised U-value logic | |
| # ---------------------------------- | |
| u = pd.Series(np.nan, index=df.index) | |
| # ABOVE | |
| u[df["ROOF_CLASS"] == "above"] = 0.0 | |
| # MEASURED overrides everything | |
| # u[df["ROOF_MEASURED_U"].notna()] = df.loc[ | |
| # df["ROOF_MEASURED_U"].notna(), "ROOF_MEASURED_U" | |
| # ] | |
| mask = df["ROOF_MEASURED_U"].notna() | |
| u.loc[mask] = df.loc[mask, "ROOF_MEASURED_U"].astype(float) | |
| # FLAT (top storey only) | |
| mask = ( | |
| (df["ROOF_CLASS"] == "flat") & | |
| ( | |
| df["FLAT_TOP_STOREY"].isna() | | |
| (df["FLAT_TOP_STOREY"].astype(str).str.upper() == "Y") | |
| ) | |
| ) | |
| u[mask] = band[mask].map(S10_FLAT) | |
| # FLAT roofs with another dwelling above β no heat loss | |
| mask = ( | |
| (df["ROOF_CLASS"] == "flat") & | |
| (df["FLAT_TOP_STOREY"].astype(str).str.upper() == "N") | |
| ) | |
| u[mask] = 0.0 | |
| # ROOM IN ROOF | |
| mask = df["ROOF_CLASS"] == "room" | |
| u[mask] = band[mask].map(S10_ROOM) | |
| # PITCHED β S9 MEASURED | |
| mask = ( | |
| (df["ROOF_CLASS"] == "pitched") & | |
| (df["ROOF_PITCH_CATEGORY"] == "S9_MEASURED") | |
| ) | |
| u[mask] = df.loc[mask, "ROOF_MM_S9"].map(S9_U) | |
| # π₯ FIX: PITCHED β NO INSULATION (S9_NONE) | |
| mask = ( | |
| (df["ROOF_CLASS"] == "pitched") & | |
| (df["ROOF_PITCH_CATEGORY"] == "S9_NONE") | |
| ) | |
| u[mask] = 2.3 | |
| # PITCHED β RAFTERS | |
| mask = ( | |
| (df["ROOF_CLASS"] == "pitched") & | |
| (df["ROOF_PITCH_CATEGORY"] == "S10_RAFTERS") | |
| ) | |
| u[mask] = band[mask].map(S10_RAFTERS) | |
| # PITCHED β UNKNOWN | |
| mask = ( | |
| (df["ROOF_CLASS"] == "pitched") & | |
| (df["ROOF_PITCH_CATEGORY"] == "S10_UNKNOWN") | |
| ) | |
| u[mask] = band[mask].map(S10_PITCHED) | |
| # ---------------------------------- | |
| # 4. SAP fallback for AβD | |
| # ---------------------------------- | |
| fallback = band.isin(["A", "B", "C", "D"]) & u.isna() | |
| u[fallback] = 2.3 | |
| df["ROOF_U_VALUE"] = u | |
| return df | |