import re import pandas as pd import numpy as np def extract_wall_u_from_text(text: str | float | None) -> float | None: """ Extract numeric U-value from WALLS_DESCRIPTION when it contains 'Average thermal transmittance ...'. Supports optional insulation thickness suffix: '..., 0 mm' '..., 50 mm' '..., 100 mm' etc. If insulation is present, applies R-addition. """ if pd.isna(text): return None s = str(text).lower() if "average thermal transmittance" not in s: return None # ------------------------------------------------------------ # 1. Extract baseline U-value # ------------------------------------------------------------ nums = re.findall(r"([0-9]*\.?[0-9]+)", s) if not nums: return None u_base = float(nums[0]) # EPC sometimes has '0.00' for missing if u_base < 0.05: return None # ------------------------------------------------------------ # 2. Extract insulation thickness (mm), default = 0 mm # ------------------------------------------------------------ mm_match = re.search(r"(\d+)\s*mm", s) mm = int(mm_match.group(1)) if mm_match else 0 # ------------------------------------------------------------ # 3. Apply R-addition if insulation present # ------------------------------------------------------------ R_INS_MAP = { 0: 0.0, 50: 1.4, 100: 2.8, 150: 4.2, 200: 5.6, } R_ins = R_INS_MAP.get(mm, 0.0) if R_ins > 0: R_old = 1.0 / u_base return 1.0 / (R_old + R_ins) return u_base def classify_wall_type(text): """ Classify EPC WALLS_DESCRIPTION into BASE wall construction type only. This function encodes *construction identity*, NOT insulation state and NOT performance. It is suitable for retrofit inference. Final categories: - solid - cavity - filled cavity - timber frame - system build - cob - unknown """ if pd.isna(text): return "unknown" t = text.lower().strip() # -------------------------------------------------------- # 0. Direct U-value entry → unknown construction # -------------------------------------------------------- if "average thermal transmittance" in t: return "unknown" # -------------------------------------------------------- # 1. Cob (distinct SAP category) # -------------------------------------------------------- if "cob" in t: return "cob" # -------------------------------------------------------- # 2. Solid masonry (brick / stone) # -------------------------------------------------------- if ( "briciau solet" in t or any(x in t for x in [ "solid brick", "solid stone", "sandstone", "limestone", "granite", "whinstone", "whin" ]) ): return "solid" # -------------------------------------------------------- # 3. Timber frame # -------------------------------------------------------- if "timber frame" in t: return "timber frame" # -------------------------------------------------------- # 4. System build (explicit SAP construction class) # -------------------------------------------------------- if "system build" in t or "system built" in t: return "system built" # -------------------------------------------------------- # 5. Cavity walls # -------------------------------------------------------- if "cavity" in t: if "filled cavity" in t: return "filled cavity" else: return "unfilled cavity" # -------------------------------------------------------- # 6. Basement / retaining walls (not envelope) # -------------------------------------------------------- if "basement wall" in t or "retaining wall" in t: return "unknown" return "unknown" def extract_wall_insulation(text): """ Extract wall insulation depth category from WALLS_DESCRIPTION. Returns one of: - "as built" - "50 mm" - "100 mm" - "150 mm" - "200 mm" - None (measured U-value only) Insulation state ONLY. No construction identity. """ if pd.isna(text): return None t = text.lower() # -------------------------------------------------------- # 0. Direct U-value entry → no insulation category # -------------------------------------------------------- if "average thermal transmittance" in t: return None # -------------------------------------------------------- # 1. Explicit thickness (must come FIRST) # -------------------------------------------------------- if "200 mm" in t: return "200 mm" if "150 mm" in t: return "150 mm" if "100 mm" in t: return "100 mm" if "50 mm" in t: return "50 mm" # -------------------------------------------------------- # 2. Generic insulation statements # -------------------------------------------------------- if "internal insulation" in t or "external insulation" in t: return "50 mm" if "partial insulation" in t or "insulated" in t: return "50 mm" # -------------------------------------------------------- # 3. Explicit no insulation # -------------------------------------------------------- if "no insulation" in t or "as built" in t: return "as built" # -------------------------------------------------------- # 4. Default # -------------------------------------------------------- return "as built" def lookup_wall_u_value(row, walls_u_values): wall_type = classify_wall_type(row["WALLS_DESCRIPTION"]) raw_age = row["sap_band_label"] # If EPC gives numeric U-value → use it numeric_u = extract_wall_u_from_text(row["WALLS_DESCRIPTION"]) if numeric_u is not None: return numeric_u # -------------------------------------------------------- # INTERNAL SAP AGE-BAND MAPPING (TABLE S1 – England & Wales) # -------------------------------------------------------- AGE_BAND_MAP = { "pre-1900": "before 1900", "before 1900": "before 1900", "1900-1929": "1900–1929", "1930-1949": "1930–1949", "1950-1966": "1950–1966", "1967-1975": "1967–1975", "1976-1982": "1976–1982", "1983-1990": "1983–1990", "1991-1995": "1991–1995", "1996-2002": "1996–2002", # Also catch accidental unicode/duplicate variations "1996–2002": "1996–2002", "2003-2006": "2003–2006", "2007-2011": "2007–2011", "2012+": "2012 onwards", "2012 onwards": "2012 onwards", } # -------------------------------------------------------- # Convert incoming SAP band → exact column name in U-value table # -------------------------------------------------------- age_label = AGE_BAND_MAP.get(raw_age, None) # If mapping fails, return NaN (should be extremely rare) if age_label is None: return np.nan # If wall type is None → cannot assign table U-value if wall_type is None: return np.nan # -------------------------------------------------------- # U-value lookup (exact match required) # -------------------------------------------------------- if age_label in walls_u_values.columns: vals = walls_u_values.loc[ walls_u_values["External wall type"] == wall_type, age_label ] if len(vals) > 0: return vals.values[0] return np.nan def prepare_wall_u_table(walls_u_values: pd.DataFrame) -> pd.DataFrame: return walls_u_values.melt( id_vars="External wall type", var_name="WALL_AGE_LABEL", value_name="WALL_U_TABLE" ) def build_wall_lookup(walls_desc: pd.Series) -> pd.DataFrame: """ Parse each unique WALLS_DESCRIPTION once. """ uniq = walls_desc.dropna().unique() rows = [] for desc in uniq: rows.append({ "WALLS_DESCRIPTION": desc, "WALL_TYPE": classify_wall_type(desc), "WALL_INSULATION": extract_wall_insulation(desc), "WALL_U_MEASURED": extract_wall_u_from_text(desc), }) return pd.DataFrame(rows) AGE_BAND_MAP = { "pre-1900": "before 1900", "before 1900": "before 1900", "1900-1929": "1900–1929", "1930-1949": "1930–1949", "1950-1966": "1950–1966", "1967-1975": "1967–1975", "1976-1982": "1976–1982", "1983-1990": "1983–1990", "1991-1995": "1991–1995", "1996-2002": "1996–2002", "1996–2002": "1996–2002", "2003-2006": "2003–2006", "2007-2011": "2007–2011", "2012+": "2012 onwards", "2012 onwards": "2012 onwards", } def merge_wall_type_for_sap(wall_type: str, insulation: str) -> str: """ Merge wall base type + insulation into SAP external wall type label. Used ONLY for SAP U-value lookup. """ if wall_type is None: return None if insulation in (None, "as built"): return f"{wall_type}- as built" return f"{wall_type}- {insulation} insulation" def wall_feature_engineering( df: pd.DataFrame, walls_u_values: pd.DataFrame, ) -> pd.DataFrame: """ Wall feature engineering using dictionary-based lookups only. No DataFrame merges (memory-safe and consistent with multi-key logic). Steps: 1. Parse WALLS_DESCRIPTION → wall semantics 2. Normalise SAP age band 3. Lookup SAP wall U-values via (WALL_TYPE, WALL_AGE_LABEL) 4. Final U-value resolution: measured > SAP table """ df = df.copy() # ------------------------------------------------------------ # 1. Parse wall descriptions ONCE (dictionary lookup) # ------------------------------------------------------------ # build_wall_lookup must return a DataFrame with: # ["WALLS_DESCRIPTION", "WALL_TYPE", "WALL_INSULATION", "WALL_U_MEASURED"] wall_lookup_df = build_wall_lookup(df["WALLS_DESCRIPTION"]) wall_lookup_dict = { desc: ( row["WALL_TYPE"], row["WALL_INSULATION"], row["WALL_U_MEASURED"], ) for desc, row in wall_lookup_df.set_index("WALLS_DESCRIPTION").iterrows() } parsed = df["WALLS_DESCRIPTION"].map(wall_lookup_dict) df["WALL_TYPE"] = parsed.str[0] df["WALL_INSULATION"] = parsed.str[1] df["WALL_U_MEASURED"] = parsed.str[2] # ------------------------------------------------------------ # 2. Normalise SAP age band (pure map, no join) # ------------------------------------------------------------ df["WALL_AGE_LABEL"] = df["sap_band_label"].map(AGE_BAND_MAP) # ------------------------------------------------------------ # 3. SAP wall U-value lookup via dictionary # ------------------------------------------------------------ # Prepare long SAP table once walls_u_long = prepare_wall_u_table(walls_u_values) wall_u_dict = { (row["External wall type"], row["WALL_AGE_LABEL"]): row["WALL_U_TABLE"] for _, row in walls_u_long.iterrows() } # wall_keys = zip(df["WALL_TYPE"], df["WALL_AGE_LABEL"]) old version # Merge wall type + insulation for SAP key (vectorised) df["WALL_TYPE_SAP"] = [ merge_wall_type_for_sap(wt, ins) for wt, ins in zip(df["WALL_TYPE"], df["WALL_INSULATION"]) ] wall_keys = zip(df["WALL_TYPE_SAP"], df["WALL_AGE_LABEL"]) df["WALL_U_TABLE"] = [wall_u_dict.get(k) for k in wall_keys] # ------------------------------------------------------------ # 4. Final U-value resolution (SAP rule) # ------------------------------------------------------------ df["WALL_U_VALUE"] = df["WALL_U_MEASURED"].combine_first(df["WALL_U_TABLE"]) # ------------------------------------------------------------ # 4.5 Vectorised insulation collapse for ML model # ------------------------------------------------------------ # Start with default = insulated df["WALL_INSULATION_MODEL"] = "insulated" # as built → as built mask_as_built = df["WALL_INSULATION"].isin(["as built"]) df.loc[mask_as_built, "WALL_INSULATION_MODEL"] = "as built" # unknown / NaN → unknown mask_unknown = df["WALL_INSULATION"].isna() | df["WALL_INSULATION"].isin(["unknown"]) df.loc[mask_unknown, "WALL_INSULATION_MODEL"] = "unknown" # ------------------------------------------------------------ # 5. Optional clean-up # ------------------------------------------------------------ df.drop(columns=["WALL_U_TABLE","WALL_INSULATION"], inplace=True, errors="ignore") return df