import re

import numpy as np
import pandas as pd

# Whole-word match for the fuel "oil". A plain substring test would also fire
# on "boiler", which mislabels every non-gas boiler description as oil-fired.
_OIL_PATTERN = r"\boil\b"
_OIL_RE = re.compile(_OIL_PATTERN)

# First percentage figure in a text, e.g. "50%" or "12.5 %".
_PERCENT_PATTERN = r"(\d+(?:\.\d+)?)\s*%"
_PERCENT_RE = re.compile(_PERCENT_PATTERN)

# Constants used by the PV capacity estimate.
_ASSUMED_ROOF_PITCH_DEG = 35
_PITCH_FACTOR = 1.0 / np.cos(np.deg2rad(_ASSUMED_ROOF_PITCH_DEG))
_PV_KWP_PER_M2 = 0.12
_PITCHED_BY_DEFAULT_TYPES = ["house", "bungalow", "maisonette"]


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _contains_any(text, keywords):
    """Return True if any keyword occurs as a substring of ``text``."""
    return any(keyword in text for keyword in keywords)


def _is_positive(value):
    """Return True for a non-missing value strictly greater than zero."""
    if value is None or pd.isna(value):
        return False
    return bool(value > 0)


def _lowercase_text(series):
    """Lower-case string cells; map every non-string cell to ``""``.

    Mirrors the ``isinstance(text, str)`` guard of the scalar classifiers so
    that NaN, None and numeric cells can never match a keyword.
    """
    cleaned = [v.lower() if isinstance(v, str) else "" for v in series]
    return pd.Series(cleaned, index=series.index, dtype="object")


def _has(text, pattern):
    """Boolean mask of rows whose text matches the regex ``pattern``."""
    return text.str.contains(pattern, regex=True, na=False)


def _first_match(index, rules, default):
    """Apply ``(mask, label)`` rules in order; the first matching rule wins.

    A row claimed by an earlier rule is never relabelled by a later one, even
    when the earlier label equals ``default``.
    """
    out = pd.Series(default, index=index, dtype="object")
    unresolved = pd.Series(True, index=index)
    for mask, label in rules:
        hit = unresolved & mask
        out[hit] = label
        unresolved &= ~hit
    return out


# ---------------------------------------------------------------------------
# Scalar (row-by-row) implementations
# ---------------------------------------------------------------------------
def classify_main_heating_system(text):
    """Classify a main-heating description into a heating system category.

    Returns one of "community_heating", "heat_pump", "boiler", "warm_air",
    "storage_heater", "room_heater", "direct_electric" or "other". Non-string
    input (including None) yields "other". Rules are checked in priority
    order and the first match wins.
    """
    if text is None or not isinstance(text, str):
        return "other"

    t = text.lower()

    if "community" in t:
        return "community_heating"
    if "heat pump" in t:
        return "heat_pump"
    if "boiler" in t:
        return "boiler"
    if "warm air" in t or "electricaire" in t:
        return "warm_air"
    if "storage heater" in t or "electric storage" in t:
        return "storage_heater"
    if "room heaters" in t:
        return "room_heater"
    if "electric" in t and "heater" in t:
        return "direct_electric"

    # "sap05" placeholders and everything unrecognised.
    return "other"


def classify_secondary_heating(text):
    """Classify a secondary-heating description.

    Returns "none", "solid_fuel", "oil_room_heater", "gas_room_heater",
    "direct_electric" or "other". Non-string input yields "none";
    unrecognised text yields "other".

    NOTE: ``classify_secondary_heating_vectorised`` returns "none" for
    unrecognised text and recognises a few extra keywords, so the two are not
    interchangeable for such rows.
    """
    if text is None or not isinstance(text, str):
        return "none"

    t = text.lower()

    # --- explicit none / missing ---
    if t in ["none", "dim"] or "no system" in t:
        return "none"

    # --- solid fuels ---
    if _contains_any(
        t, ["coal", "anthracite", "wood", "pellet", "chips", "smokeless"]
    ):
        return "solid_fuel"

    # --- oil (whole word, so "boiler" does not count) ---
    if _OIL_RE.search(t):
        return "oil_room_heater"

    # --- gas & LPG ---
    if _contains_any(t, ["mains gas", "lpg", "lng", "bottled gas"]):
        return "gas_room_heater"

    # --- electric ---
    if "electric" in t:
        return "direct_electric"

    # --- SAP placeholders and anything else ---
    return "other"


def classify_main_fuel_type(text):
    """Classify EPC MAINHEAT_DESCRIPTION into a main fuel type.

    Returns "heat_network", "electricity", "mains_gas", "lpg", "oil",
    "biomass", "solid_fuel" or "other". Non-string input yields "other".
    Rules are checked in priority order and the first match wins.
    """
    if text is None or not isinstance(text, str):
        return "other"

    t = text.lower()

    # --- 1. Community heating ---
    if "community" in t:
        return "heat_network"

    # --- 2. Heat pumps (treated as electricity) ---
    if "heat pump" in t:
        return "electricity"

    # --- 3. Electricity ---
    if _contains_any(
        t,
        [
            "electric",
            "electricaire",
            "storage heater",
            "electric underfloor",
            "electric ceiling",
        ],
    ):
        return "electricity"

    # --- 4. Mains gas (English + Welsh) ---
    if "mains gas" in t or "nwy prif" in t:
        return "mains_gas"

    # --- 5. LPG ---
    if _contains_any(t, ["lpg", "bottled lpg", "bottled gas"]):
        return "lpg"

    # --- 6. Oil ---
    # Whole-word match: "boiler" contains "oil", and a substring test would
    # swallow every boiler row before the biomass / solid-fuel checks below.
    if _OIL_RE.search(t):
        return "oil"

    # --- 7. Biomass ---
    if _contains_any(t, ["biomass", "wood pellets", "wood chips"]):
        return "biomass"

    # --- 8. Solid fuels ---
    if _contains_any(
        t, ["coal", "anthracite", "smokeless", "wood logs", "dual fuel"]
    ):
        return "solid_fuel"

    return "other"


def classify_dhw_system(text):
    """Classify EPC HOTWATER_DESCRIPTION into a hot-water system type.

    Returns "community", "heat_pump", "solar_assisted", "gas_instantaneous",
    "direct_electric", "electric_storage", "main_heating" or "other".
    Placeholder texts ("sap05", "no system present") are "other" regardless
    of what else the text contains. Non-string input yields "other".
    """
    if text is None or not isinstance(text, str):
        return "other"

    t = text.lower()

    # --- SAP placeholders / missing ---
    if "sap05" in t or "no system present" in t:
        return "other"

    # --- Community DHW ---
    if "community" in t:
        return "community"

    # --- Heat pump DHW ---
    if "heat pump" in t:
        return "heat_pump"

    # --- Solar-assisted DHW (the solar flag dominates the base system) ---
    if "solar" in t:
        return "solar_assisted"

    # --- Gas instantaneous / multipoint ---
    if _contains_any(
        t, ["gas instantaneous", "gas multipoint", "single-point gas"]
    ):
        return "gas_instantaneous"

    # --- Electric instantaneous (point of use) ---
    if "electric instantaneous" in t:
        return "direct_electric"

    # --- Electric immersion (storage) ---
    if "electric immersion" in t:
        return "electric_storage"

    # --- From main / secondary heating system ---
    if _contains_any(
        t,
        [
            "from main system",
            "from secondary system",
            "boiler/circulator",
            "range cooker",
            "o'r brif system",
            "og’r brif system",
            "second main heating system",
        ],
    ):
        return "main_heating"

    return "other"


def classify_ventilation_system(text):
    """Classify a ventilation description.

    Returns "mvhr", "piv", "mech_supply_extract", "mech_extract" or
    "natural". Non-string input and unrecognised text yield "natural".
    """
    if text is None or not isinstance(text, str):
        return "natural"

    t = text.lower()

    # "without heat recovery" must not count as heat recovery; this matches
    # the vectorised implementation.
    has_heat_recovery = (
        "heat recovery" in t and "without heat recovery" not in t
    )
    if "mvhr" in t or has_heat_recovery:
        return "mvhr"
    if "positive input" in t:
        return "piv"
    if "supply and extract" in t:
        return "mech_supply_extract"
    if "extract" in t or "mechanical" in t:
        return "mech_extract"

    # includes 'natural' and 'NO DATA!'
    return "natural"


def extract_low_energy_lighting_fraction(text):
    """Extract the low-energy lighting fraction from LIGHTING_DESCRIPTION.

    Returns a float in [0, 1], or None if the fraction cannot be determined.
    Priority: explicit "no low energy lighting" (0.0), "all fixed outlets"
    (1.0), the first percentage in the text, then qualitative ratings.
    """
    if text is None or not isinstance(text, str):
        return None

    t = text.lower()

    # --- Explicit none ---
    if "no low energy lighting" in t:
        return 0.0

    # --- All outlets (English + Welsh) ---
    if "all fixed outlets" in t or "ym mhob" in t:
        return 1.0

    # --- Percentage extraction, clamped to [0, 1] ---
    match = _PERCENT_RE.search(t)
    if match:
        pct = float(match.group(1))
        return max(0.0, min(pct / 100.0, 1.0))

    # --- Qualitative EPC fallbacks (tolerates the "excelent" misspelling) ---
    if (
        "excellent lighting efficiency" in t
        or "excelent lighting efficiency" in t
    ):
        return 1.0
    if "good lighting efficiency" in t:
        return 0.8
    if "below average lighting efficiency" in t:
        return 0.2

    # --- SAP placeholders and anything else ---
    return None


def estimate_pv_kwp_from_row(row):
    """Estimate installed PV capacity (kWp) from a single EPC row.

    ``row`` must support ``.get`` (dict or pandas Series) and is read for:
      - TOTAL_FLOOR_AREA
      - PROPERTY_TYPE
      - PHOTO_SUPPLY (percentage, divided by 100 here)
      - FLAT_STOREY_COUNT (flats only)
      - ROOF_DESCRIPTION

    Returns 0.0 when floor area or PV supply is missing (None/NaN) or not
    positive, and for flats whose storey count is missing or not positive.
    """
    # 0. Guard clauses. Pandas rows carry NaN rather than None, so both are
    #    treated as missing.
    tfa = row.get("TOTAL_FLOOR_AREA")
    photo_supply = row.get("PHOTO_SUPPLY")
    if not _is_positive(tfa) or not _is_positive(photo_supply):
        return 0.0

    property_type = str(row.get("PROPERTY_TYPE", "")).lower()
    roof_desc = str(row.get("ROOF_DESCRIPTION", "")).lower()

    # 1. Horizontal roof projection
    if property_type == "flat":
        storeys = row.get("FLAT_STOREY_COUNT")
        if not _is_positive(storeys):
            return 0.0  # cannot apportion roof vertically
        roof_projection = tfa / storeys
    else:
        # House, bungalow, maisonette
        roof_projection = tfa / 2.0

    # 2. Roof pitch inference (geometry only); "flat" takes precedence.
    if "flat" in roof_desc:
        roof_is_pitched = False
    elif _contains_any(roof_desc, ["pitched", "rafters", "roof room"]):
        roof_is_pitched = True
    else:
        # fallback by property type
        roof_is_pitched = property_type in _PITCHED_BY_DEFAULT_TYPES

    pitch_factor = _PITCH_FACTOR if roof_is_pitched else 1.0

    # 3. PV-covered area
    pv_area = roof_projection * (photo_supply / 100.0) * pitch_factor

    # 4. Convert area -> capacity
    return float(_PV_KWP_PER_M2 * pv_area)


def energy_system_feature_engineering(df):
    """Add energy-system feature columns to a copy of ``df`` (row-wise).

    Reads MAINHEAT_DESCRIPTION, SECONDHEAT_DESCRIPTION, HOTWATER_DESCRIPTION,
    MECHANICAL_VENTILATION and LIGHTING_DESCRIPTION plus the columns used by
    ``estimate_pv_kwp_from_row``. The input frame is not modified.

    NOTE: the lighting fraction is written to "LIGHTENING_TYPE" here, while
    the vectorised pipeline writes "LIGHTING_FRACTION_LOW_ENERGY". The name
    is kept unchanged because downstream code may depend on it.
    """
    df = df.copy()

    df["MAIN_HEATING_SYSTEM"] = df["MAINHEAT_DESCRIPTION"].apply(
        classify_main_heating_system
    )
    df["SECONDARY_HEATING_SYSTEM"] = df["SECONDHEAT_DESCRIPTION"].apply(
        classify_secondary_heating
    )
    df["MAIN_FUEL_TYPE"] = df["MAINHEAT_DESCRIPTION"].apply(
        classify_main_fuel_type
    )
    df["DHW_SUPPLY_SYSTEM"] = df["HOTWATER_DESCRIPTION"].apply(
        classify_dhw_system
    )
    df["VENTILATION_SYSTEM"] = df["MECHANICAL_VENTILATION"].apply(
        classify_ventilation_system
    )
    df["LIGHTENING_TYPE"] = df["LIGHTING_DESCRIPTION"].apply(
        extract_low_energy_lighting_fraction
    )
    df["PV_KWP"] = df.apply(estimate_pv_kwp_from_row, axis=1)

    return df


# ---------------------------------------------------------------------------
# Vectorised implementations
# ---------------------------------------------------------------------------
def classify_main_heating_system_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised counterpart of ``classify_main_heating_system``."""
    s = _lowercase_text(series)

    # IMPORTANT: same order as the scalar version; the first match wins.
    rules = [
        (_has(s, "community"), "community_heating"),
        (_has(s, "heat pump"), "heat_pump"),
        (_has(s, "boiler"), "boiler"),
        (_has(s, "warm air|electricaire"), "warm_air"),
        (_has(s, "storage heater|electric storage"), "storage_heater"),
        (_has(s, "room heaters"), "room_heater"),
        (_has(s, "electric") & _has(s, "heater"), "direct_electric"),
    ]
    # sap05 and everything else remain "other"
    return _first_match(s.index, rules, "other")


def classify_secondary_heating_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised secondary-heating classification.

    Rows that match no rule (including missing values) are "none" by design.
    NOTE: the scalar ``classify_secondary_heating`` returns "other" for
    unrecognised text and lacks the bioethanol / B30K / Welsh keywords.
    """
    s = _lowercase_text(series).str.strip()

    rules = [
        # Solid fuels (incl. bioethanol, B30K)
        (
            _has(
                s,
                r"coal|anthracite|wood|pellet|chips|smokeless|bioethanol|b30k",
            ),
            "solid_fuel",
        ),
        # Oil (whole word, so "boiler" does not count)
        (_has(s, _OIL_PATTERN), "oil_room_heater"),
        # Gas & LPG (English + Welsh)
        (_has(s, r"mains gas|lpg|lng|bottled gas|nwy prif"), "gas_room_heater"),
        # Electric
        (_has(s, "electric"), "direct_electric"),
    ]
    # SAP default: no secondary heating
    return _first_match(s.index, rules, "none")


def classify_main_fuel_type_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised counterpart of ``classify_main_fuel_type``."""
    s = _lowercase_text(series)

    rules = [
        # 1. Community heating (highest priority)
        (_has(s, "community"), "heat_network"),
        # 2. Heat pumps -> electricity
        (_has(s, "heat pump"), "electricity"),
        # 3. Electricity (direct / storage / underfloor)
        (
            _has(
                s,
                "electric|electricaire|storage heater|"
                "electric underfloor|electric ceiling",
            ),
            "electricity",
        ),
        # 4. Mains gas
        (_has(s, "mains gas|nwy prif"), "mains_gas"),
        # 5. LPG
        (_has(s, "lpg|bottled lpg|bottled gas"), "lpg"),
        # 6. Oil (whole word: "boiler" must not match)
        (_has(s, _OIL_PATTERN), "oil"),
        # 7. Biomass
        (_has(s, "biomass|wood pellets|wood chips"), "biomass"),
        # 8. Solid fuels
        (
            _has(s, "coal|anthracite|smokeless|wood logs|dual fuel"),
            "solid_fuel",
        ),
    ]
    return _first_match(s.index, rules, "other")


def classify_dhw_system_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised counterpart of ``classify_dhw_system``."""
    s = _lowercase_text(series)

    rules = [
        # 0. SAP placeholders / missing. Listed first so these rows are
        #    locked to "other" and cannot be relabelled by later rules.
        (_has(s, "sap05|no system present"), "other"),
        # 1. Community DHW
        (_has(s, "community"), "community"),
        # 2. Heat pump DHW
        (_has(s, "heat pump"), "heat_pump"),
        # 3. Solar-assisted DHW (dominant flag)
        (_has(s, "solar"), "solar_assisted"),
        # 4. Gas instantaneous / multipoint
        (
            _has(s, "gas instantaneous|gas multipoint|single-point gas"),
            "gas_instantaneous",
        ),
        # 5. Electric instantaneous (point-of-use)
        (_has(s, "electric instantaneous"), "direct_electric"),
        # 6. Electric immersion (storage)
        (_has(s, "electric immersion"), "electric_storage"),
        # 7. From main / secondary heating system (fallback)
        (
            _has(
                s,
                "from main system|from secondary system|boiler/circulator|"
                "range cooker|o'r brif system|og’r brif system|"
                "second main heating system",
            ),
            "main_heating",
        ),
    ]
    return _first_match(s.index, rules, "other")


def classify_ventilation_system_vectorised(series: pd.Series) -> pd.Series:
    """Vectorised counterpart of ``classify_ventilation_system``."""
    s = _lowercase_text(series)

    # "without heat recovery" must not count as heat recovery.
    heat_recovery = _has(s, "heat recovery") & ~_has(s, "without heat recovery")

    rules = [
        # 1. MVHR
        (_has(s, "mvhr") | heat_recovery, "mvhr"),
        # 2. Positive input ventilation
        (_has(s, "positive input"), "piv"),
        # 3. Mechanical supply & extract
        (_has(s, "supply and extract"), "mech_supply_extract"),
        # 4. Mechanical extract (fallback)
        (_has(s, "extract|mechanical"), "mech_extract"),
    ]
    return _first_match(s.index, rules, "natural")


def extract_low_energy_lighting_fraction_vectorised(
    series: pd.Series,
) -> pd.Series:
    """Vectorised counterpart of ``extract_low_energy_lighting_fraction``.

    Returns a float Series in [0, 1] with NaN where the fraction is unknown.
    """
    s = _lowercase_text(series)
    out = pd.Series(np.nan, index=s.index, dtype="float64")

    # Assignments run from LOWEST to HIGHEST priority so that later ones
    # override earlier ones, reproducing the scalar priority order.

    # Qualitative descriptors (handle misspelling)
    out[_has(s, "below average lighting efficiency")] = 0.2
    out[_has(s, "good lighting efficiency")] = 0.8
    out[
        _has(s, "excellent lighting efficiency|excelent lighting efficiency")
    ] = 1.0

    # Percentage extraction (overrides qualitative descriptors)
    pct = s.str.extract(_PERCENT_PATTERN, expand=False).astype(float)
    fraction = (pct / 100.0).clip(0.0, 1.0)
    out = fraction.where(fraction.notna(), out)

    # All outlets
    out[_has(s, "all fixed outlets|ym mhob")] = 1.0

    # Explicit none (highest priority)
    out[_has(s, "no low energy lighting")] = 0.0

    return out


def estimate_pv_kwp_vectorised(df: pd.DataFrame) -> pd.Series:
    """Vectorised counterpart of ``estimate_pv_kwp_from_row``.

    Requires the columns TOTAL_FLOOR_AREA, PHOTO_SUPPLY, PROPERTY_TYPE,
    FLAT_STOREY_COUNT and ROOF_DESCRIPTION. Rows with missing or
    non-positive floor area / PV supply, and flats with a missing or
    non-positive storey count, yield 0.0.
    """
    tfa = df["TOTAL_FLOOR_AREA"]
    photo = df["PHOTO_SUPPLY"]
    storeys = df["FLAT_STOREY_COUNT"]

    valid = (tfa > 0) & (photo > 0)

    property_type = _lowercase_text(df["PROPERTY_TYPE"])
    roof_desc = _lowercase_text(df["ROOF_DESCRIPTION"])

    # Flats share the roof across storeys. A missing or non-positive storey
    # count becomes NaN here and ends up as 0.0 via the final fillna.
    is_flat = property_type.eq("flat")
    flat_projection = tfa / storeys.where(storeys > 0)
    # Houses / bungalows / maisonettes
    roof_projection = flat_projection.where(is_flat, tfa / 2.0)

    # "flat" in the roof description takes precedence, as in the scalar code.
    flat_roof = _has(roof_desc, "flat")
    pitched_keyword = _has(roof_desc, "pitched|rafters|roof room")
    pitched_by_type = property_type.isin(_PITCHED_BY_DEFAULT_TYPES)
    roof_is_pitched = ~flat_roof & (pitched_keyword | pitched_by_type)

    pitch_factor = np.where(roof_is_pitched, _PITCH_FACTOR, 1.0)

    pv_area = roof_projection * (photo / 100.0) * pitch_factor
    pv_kwp = _PV_KWP_PER_M2 * pv_area

    return pv_kwp.where(valid, 0.0).fillna(0.0)


def energy_system_feature_engineering_vectorised(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """Add energy-system feature columns to a copy of ``df`` (vectorised).

    Writes MAIN_HEATING_SYSTEM, SECONDARY_HEATING_SYSTEM, MAIN_FUEL_TYPE,
    DHW_SUPPLY_SYSTEM, VENTILATION_SYSTEM, LIGHTING_FRACTION_LOW_ENERGY and
    PV_KWP. The input frame is not modified.
    """
    df = df.copy()

    df["MAIN_HEATING_SYSTEM"] = classify_main_heating_system_vectorised(
        df["MAINHEAT_DESCRIPTION"]
    )
    df["SECONDARY_HEATING_SYSTEM"] = classify_secondary_heating_vectorised(
        df["SECONDHEAT_DESCRIPTION"]
    )
    df["MAIN_FUEL_TYPE"] = classify_main_fuel_type_vectorised(
        df["MAINHEAT_DESCRIPTION"]
    )
    df["DHW_SUPPLY_SYSTEM"] = classify_dhw_system_vectorised(
        df["HOTWATER_DESCRIPTION"]
    )
    df["VENTILATION_SYSTEM"] = classify_ventilation_system_vectorised(
        df["MECHANICAL_VENTILATION"]
    )
    df["LIGHTING_FRACTION_LOW_ENERGY"] = (
        extract_low_energy_lighting_fraction_vectorised(
            df["LIGHTING_DESCRIPTION"]
        )
    )
    df["PV_KWP"] = estimate_pv_kwp_vectorised(df)

    return df