File size: 13,053 Bytes
d11b44e
 
 
 
 
 
 
 
c831cba
d11b44e
c831cba
 
 
 
 
 
 
d11b44e
 
 
 
 
c831cba
d11b44e
 
 
c831cba
 
 
d11b44e
 
 
 
c831cba
d11b44e
c831cba
 
d11b44e
 
c831cba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d11b44e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c831cba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d11b44e
 
 
c831cba
d11b44e
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
import re
import pandas as pd
import numpy as np


def extract_wall_u_from_text(text: str | float | None) -> float | None:
    """
    Extract numeric U-value from WALLS_DESCRIPTION when it contains
    'Average thermal transmittance ...'.

    Supports optional insulation thickness suffix:
        '..., 0 mm'
        '..., 50 mm'
        '..., 100 mm'
        etc.

    If insulation is present, applies R-addition.
    """
    if pd.isna(text):
        return None

    s = str(text).lower()

    if "average thermal transmittance" not in s:
        return None

    # ------------------------------------------------------------
    # 1. Extract baseline U-value
    # ------------------------------------------------------------
    nums = re.findall(r"([0-9]*\.?[0-9]+)", s)
    if not nums:
        return None

    u_base = float(nums[0])

    # EPC sometimes has '0.00' for missing
    if u_base < 0.05:
        return None

    # ------------------------------------------------------------
    # 2. Extract insulation thickness (mm), default = 0 mm
    # ------------------------------------------------------------
    mm_match = re.search(r"(\d+)\s*mm", s)
    mm = int(mm_match.group(1)) if mm_match else 0

    # ------------------------------------------------------------
    # 3. Apply R-addition if insulation present
    # ------------------------------------------------------------
    R_INS_MAP = {
        0:   0.0,
        50:  1.4,
        100: 2.8,
        150: 4.2,
        200: 5.6,
    }

    R_ins = R_INS_MAP.get(mm, 0.0)

    if R_ins > 0:
        R_old = 1.0 / u_base
        return 1.0 / (R_old + R_ins)

    return u_base



def classify_wall_type(text):
    """
    Classify EPC WALLS_DESCRIPTION into BASE wall construction type only.

    This function encodes *construction identity*, NOT insulation state
    and NOT performance.

    The description is matched case-insensitively by substring, in this
    priority order (first hit wins):

        1. 'average thermal transmittance'      -> "unknown"
        2. 'cob'                                -> "cob"
        3. solid brick / stone keywords         -> "solid"
        4. 'timber frame'                       -> "timber frame"
        5. 'system build' / 'system built'      -> "system built"
        6. 'cavity'                             -> "filled cavity" when the
           text says 'filled cavity' (but not 'unfilled cavity'),
           otherwise "unfilled cavity"
        7. anything else                        -> "unknown"

    Parameters
    ----------
    text : str or missing value
        Raw WALLS_DESCRIPTION entry. Non-string values are converted with
        str() before matching.

    Returns
    -------
    str
        One of: "solid", "unfilled cavity", "filled cavity",
        "timber frame", "system built", "cob", "unknown".
    """
    if pd.isna(text):
        return "unknown"

    # str() keeps this consistent with extract_wall_u_from_text and avoids
    # an AttributeError on non-string, non-missing values.
    t = str(text).lower().strip()

    # --------------------------------------------------------
    # 0. Direct U-value entry -> unknown construction
    # --------------------------------------------------------
    if "average thermal transmittance" in t:
        return "unknown"

    # --------------------------------------------------------
    # 1. Cob
    # --------------------------------------------------------
    if "cob" in t:
        return "cob"

    # --------------------------------------------------------
    # 2. Solid masonry (brick / stone)
    # --------------------------------------------------------
    solid_keywords = (
        "briciau solet",
        "solid brick",
        "solid stone",
        "sandstone",
        "limestone",
        "granite",
        "whinstone",
        "whin",
    )
    if any(keyword in t for keyword in solid_keywords):
        return "solid"

    # --------------------------------------------------------
    # 3. Timber frame
    # --------------------------------------------------------
    if "timber frame" in t:
        return "timber frame"

    # --------------------------------------------------------
    # 4. System build
    # --------------------------------------------------------
    if "system build" in t or "system built" in t:
        return "system built"

    # --------------------------------------------------------
    # 5. Cavity walls
    # --------------------------------------------------------
    if "cavity" in t:
        # "unfilled cavity" contains the substring "filled cavity", so it
        # is removed first to avoid labelling unfilled walls as filled.
        if "filled cavity" in t.replace("unfilled cavity", ""):
            return "filled cavity"
        return "unfilled cavity"

    # --------------------------------------------------------
    # 6. Everything else, including basement / retaining walls
    # --------------------------------------------------------
    return "unknown"



def extract_wall_insulation(text):
    """
    Extract wall insulation depth category from WALLS_DESCRIPTION.

    Insulation state ONLY. No construction identity.

    Matching is case-insensitive and by substring, in this priority order:

        1. Missing value, or a direct U-value entry
           ('average thermal transmittance')            -> None
        2. Explicit thickness '200 mm', '150 mm',
           '100 mm', '50 mm' (largest checked first)   -> that thickness
        3. 'internal insulation', 'external insulation',
           'partial insulation' or 'insulated'
           (but not 'uninsulated')                      -> "50 mm"
        4. Everything else, including 'no insulation'
           and 'as built'                               -> "as built"

    Parameters
    ----------
    text : str or missing value
        Raw WALLS_DESCRIPTION entry. Non-string values are converted with
        str() before matching.

    Returns
    -------
    str or None
        "as built", "50 mm", "100 mm", "150 mm", "200 mm", or None.
    """
    if pd.isna(text):
        return None

    # str() avoids an AttributeError on non-string, non-missing values.
    t = str(text).lower()

    # --------------------------------------------------------
    # 0. Direct U-value entry -> no insulation category
    # --------------------------------------------------------
    if "average thermal transmittance" in t:
        return None

    # --------------------------------------------------------
    # 1. Explicit thickness (must come FIRST)
    # --------------------------------------------------------
    # Largest first: "50 mm" is a substring of "150 mm", so the order
    # matters.
    for depth in ("200 mm", "150 mm", "100 mm", "50 mm"):
        if depth in t:
            return depth

    # --------------------------------------------------------
    # 2. Generic insulation statements
    # --------------------------------------------------------
    generic_phrases = (
        "internal insulation",
        "external insulation",
        "partial insulation",
    )
    if any(phrase in t for phrase in generic_phrases):
        return "50 mm"

    # "uninsulated" contains "insulated"; strip it so that an explicitly
    # uninsulated wall is not reported as insulated.
    if "insulated" in t.replace("uninsulated", ""):
        return "50 mm"

    # --------------------------------------------------------
    # 3. Default (covers explicit "no insulation" / "as built")
    # --------------------------------------------------------
    return "as built"


def lookup_wall_u_value(row, walls_u_values):
    """
    Resolve the wall U-value for a single record.

    Resolution order:
        1. A U-value stated directly in WALLS_DESCRIPTION (parsed by
           extract_wall_u_from_text) is returned as-is.
        2. Otherwise the value is read from `walls_u_values`, using the
           row's `sap_band_label` (normalised through AGE_BAND_MAP) as the
           column and the wall label as the "External wall type" row.

    For step 2 the merged label from merge_wall_type_for_sap
    (base type + insulation) is tried first, because that is the key
    wall_feature_engineering uses against the same table. The bare base
    type is tried second so that a table keyed by base type alone still
    resolves.

    Parameters
    ----------
    row : mapping-like
        Must provide "WALLS_DESCRIPTION" and "sap_band_label".
    walls_u_values : pd.DataFrame
        Wide table with an "External wall type" column and one column per
        age-band label.

    Returns
    -------
    float
        The resolved U-value, or NaN when the age band is not recognised,
        the age-band column is absent, or no table row matches.
    """
    description = row["WALLS_DESCRIPTION"]

    # If EPC gives a numeric U-value, use it and skip the table entirely.
    numeric_u = extract_wall_u_from_text(description)
    if numeric_u is not None:
        return numeric_u

    # Convert the incoming SAP band to the exact column name in the table.
    age_label = AGE_BAND_MAP.get(row["sap_band_label"])
    if age_label is None or age_label not in walls_u_values.columns:
        return np.nan

    wall_type = classify_wall_type(description)
    sap_label = merge_wall_type_for_sap(
        wall_type, extract_wall_insulation(description)
    )

    # Exact-match lookup: merged SAP label first, bare base type second.
    table_types = walls_u_values["External wall type"]
    for candidate in (sap_label, wall_type):
        if candidate is None:
            continue
        vals = walls_u_values.loc[table_types == candidate, age_label]
        if len(vals) > 0:
            return vals.values[0]

    return np.nan



def prepare_wall_u_table(walls_u_values: pd.DataFrame) -> pd.DataFrame:
    """
    Reshape the wide wall U-value table into long form.

    "External wall type" is kept as the identifier column. Every other
    column becomes one row per wall type, with the former column name in
    "WALL_AGE_LABEL" and the cell value in "WALL_U_TABLE".
    """
    long_table = pd.melt(
        walls_u_values,
        id_vars="External wall type",
        var_name="WALL_AGE_LABEL",
        value_name="WALL_U_TABLE",
    )
    return long_table


def build_wall_lookup(walls_desc: pd.Series) -> pd.DataFrame:
    """
    Parse each unique, non-missing WALLS_DESCRIPTION once.

    Parameters
    ----------
    walls_desc : pd.Series
        Raw WALLS_DESCRIPTION values; missing entries are ignored.

    Returns
    -------
    pd.DataFrame
        One row per distinct description with the columns
        "WALLS_DESCRIPTION", "WALL_TYPE", "WALL_INSULATION" and
        "WALL_U_MEASURED". The columns are present even when there is
        nothing to parse, so callers can index them unconditionally.
    """
    columns = [
        "WALLS_DESCRIPTION",
        "WALL_TYPE",
        "WALL_INSULATION",
        "WALL_U_MEASURED",
    ]

    records = [
        {
            "WALLS_DESCRIPTION": desc,
            "WALL_TYPE": classify_wall_type(desc),
            "WALL_INSULATION": extract_wall_insulation(desc),
            "WALL_U_MEASURED": extract_wall_u_from_text(desc),
        }
        for desc in walls_desc.dropna().unique()
    ]

    # Explicit columns: an empty record list would otherwise produce a
    # DataFrame with no columns at all.
    return pd.DataFrame(records, columns=columns)


# Normalises the age-band labels found in `sap_band_label` to the labels
# used when looking up age bands in the wall U-value table.
# The separator inside the closed-range output labels is reproduced exactly
# as it appears in this file.
# NOTE(review): it looks like a mis-encoded en dash - confirm against the
# U-value table's column names before changing it.
AGE_BAND_MAP = {
    # Oldest band, both accepted spellings.
    "pre-1900": "before 1900",
    "before 1900": "before 1900",
    # Closed year ranges: hyphenated input label -> table label.
    **{
        f"{start}-{end}": f"{start}–{end}"
        for start, end in (
            (1900, 1929),
            (1930, 1949),
            (1950, 1966),
            (1967, 1975),
            (1976, 1982),
            (1983, 1990),
            (1991, 1995),
            (1996, 2002),
            (2003, 2006),
            (2007, 2011),
        )
    },
    # Already-normalised spelling of the 1996 band maps to itself.
    "1996–2002": "1996–2002",
    # Newest band, both accepted spellings.
    "2012+": "2012 onwards",
    "2012 onwards": "2012 onwards",
}


def merge_wall_type_for_sap(wall_type: str, insulation: str) -> str:
    """
    Merge wall base type + insulation into SAP external wall type label.
    Used ONLY for SAP U-value lookup.
    """

    if wall_type is None:
        return None

    if insulation in (None, "as built"):
        return f"{wall_type}- as built"

    return f"{wall_type}- {insulation} insulation"


def wall_feature_engineering(
    df: pd.DataFrame,
    walls_u_values: pd.DataFrame,
) -> pd.DataFrame:
    """
    Wall feature engineering using dictionary-based lookups only.
    No DataFrame merges.

    Steps:
    1. Parse WALLS_DESCRIPTION -> wall type, insulation, measured U-value
    2. Normalise SAP age band (`sap_band_label` -> WALL_AGE_LABEL)
    3. Lookup SAP wall U-values via (WALL_TYPE_SAP, WALL_AGE_LABEL)
    4. Final U-value resolution: measured > SAP table
    5. Collapse insulation into "as built" / "insulated" / "unknown"

    Parameters
    ----------
    df : pd.DataFrame
        Must contain "WALLS_DESCRIPTION" and "sap_band_label". The input
        frame is not modified; a copy is returned.
    walls_u_values : pd.DataFrame
        Wide SAP table with an "External wall type" column and one column
        per age-band label.

    Returns
    -------
    pd.DataFrame
        Copy of `df` with the added columns WALL_TYPE, WALL_U_MEASURED,
        WALL_AGE_LABEL, WALL_TYPE_SAP, WALL_U_VALUE and
        WALL_INSULATION_MODEL. The intermediate WALL_U_TABLE and
        WALL_INSULATION columns are dropped before returning.
    """
    df = df.copy()

    # ------------------------------------------------------------
    # 1. Parse wall descriptions ONCE (dictionary lookup)
    # ------------------------------------------------------------
    # build_wall_lookup returns one row per distinct description with:
    # ["WALLS_DESCRIPTION", "WALL_TYPE", "WALL_INSULATION", "WALL_U_MEASURED"]
    wall_lookup_df = build_wall_lookup(df["WALLS_DESCRIPTION"])
    parsed_columns = ("WALL_TYPE", "WALL_INSULATION", "WALL_U_MEASURED")

    if wall_lookup_df.empty:
        # No non-missing description at all: nothing to map, and the
        # lookup frame may not even carry the expected columns.
        for column in parsed_columns:
            df[column] = np.nan
    else:
        descriptions = wall_lookup_df["WALLS_DESCRIPTION"]
        for column in parsed_columns:
            # One plain dict per output column; descriptions that are
            # missing in `df` map to NaN.
            by_description = dict(zip(descriptions, wall_lookup_df[column]))
            df[column] = df["WALLS_DESCRIPTION"].map(by_description)

    # ------------------------------------------------------------
    # 2. Normalise SAP age band (pure map, no join)
    # ------------------------------------------------------------
    df["WALL_AGE_LABEL"] = df["sap_band_label"].map(AGE_BAND_MAP)

    # ------------------------------------------------------------
    # 3. SAP wall U-value lookup via dictionary
    # ------------------------------------------------------------
    # Prepare long SAP table once, then key it by (wall label, age label).
    walls_u_long = prepare_wall_u_table(walls_u_values)
    wall_u_dict = dict(
        zip(
            zip(
                walls_u_long["External wall type"],
                walls_u_long["WALL_AGE_LABEL"],
            ),
            walls_u_long["WALL_U_TABLE"],
        )
    )

    # Merge wall type + insulation into the SAP row label. Missing values
    # are normalised here so NaN is never formatted into the label text.
    df["WALL_TYPE_SAP"] = [
        None
        if pd.isna(wt)
        else merge_wall_type_for_sap(wt, None if pd.isna(ins) else ins)
        for wt, ins in zip(df["WALL_TYPE"], df["WALL_INSULATION"])
    ]

    df["WALL_U_TABLE"] = [
        wall_u_dict.get(key)
        for key in zip(df["WALL_TYPE_SAP"], df["WALL_AGE_LABEL"])
    ]

    # ------------------------------------------------------------
    # 4. Final U-value resolution: measured value wins over table value
    # ------------------------------------------------------------
    df["WALL_U_VALUE"] = df["WALL_U_MEASURED"].combine_first(df["WALL_U_TABLE"])

    # ------------------------------------------------------------
    # 4.5 Insulation collapse for ML model
    # ------------------------------------------------------------
    # Start with default = insulated
    df["WALL_INSULATION_MODEL"] = "insulated"

    # as built -> as built
    mask_as_built = df["WALL_INSULATION"].isin(["as built"])
    df.loc[mask_as_built, "WALL_INSULATION_MODEL"] = "as built"

    # unknown / missing -> unknown
    mask_unknown = (
        df["WALL_INSULATION"].isna() | df["WALL_INSULATION"].isin(["unknown"])
    )
    df.loc[mask_unknown, "WALL_INSULATION_MODEL"] = "unknown"

    # ------------------------------------------------------------
    # 5. Clean-up of intermediate columns
    # ------------------------------------------------------------
    df = df.drop(columns=["WALL_U_TABLE", "WALL_INSULATION"], errors="ignore")

    return df