File size: 5,535 Bytes
992aa4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Feature engineering and preprocessing pipeline.

Key design decisions
--------------------
* CUSTOMERS.AFFECTED and OUTAGE.DURATION are **removed** from the feature
  matrix because they directly define the target (data leakage).
* Weather is proxied via ANOMALY.LEVEL (Oceanic Niño Index) which is already
  in the dataset — no external API needed.
* Engineered features are derived solely from existing columns.
* The sklearn ColumnTransformer is serialised so inference uses the exact
  same transformations.
"""
from __future__ import annotations

import json
import logging
from typing import Tuple

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

from src.config import (
    ARTIFACTS_DIR,
    CATEGORICAL_FEATURES,
    FEATURE_NAMES_FILE,
    LEAK_COLUMNS,
    NUMERIC_FEATURES,
    PREPROCESSOR_FILE,
    RANDOM_STATE,
    TARGET_COL,
    TEST_SIZE,
)

logger = logging.getLogger(__name__)


def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive additional columns from raw data — no external sources."""
    df = df.copy()

    # Demand-to-customer ratio: MW lost per thousand customers
    if "DEMAND.LOSS.MW" in df.columns and "TOTAL.CUSTOMERS" in df.columns:
        df["demand_per_1k_cust"] = (
            df["DEMAND.LOSS.MW"] / (df["TOTAL.CUSTOMERS"] / 1_000)
        ).replace([np.inf, -np.inf], np.nan)

    # Residential share of total sales (economic structure proxy)
    if "RES.SALES" in df.columns and "TOTAL.SALES" in df.columns:
        df["res_sales_share"] = (
            df["RES.SALES"] / df["TOTAL.SALES"]
        ).replace([np.inf, -np.inf], np.nan)

    # Price spread: industrial vs residential (rate design signal)
    if "RES.PRICE" in df.columns and "IND.PRICE" in df.columns:
        df["price_spread_res_ind"] = df["RES.PRICE"] - df["IND.PRICE"]

    # Population density contrast (urban vs rural stress indicator)
    if "POPDEN_URBAN" in df.columns and "POPDEN_RURAL" in df.columns:
        df["urban_rural_density_ratio"] = (
            df["POPDEN_URBAN"] / df["POPDEN_RURAL"].replace(0, np.nan)
        ).replace([np.inf, -np.inf], np.nan)

    # Season bucket from MONTH
    if "MONTH" in df.columns:
        month = df["MONTH"].astype(float)
        df["season"] = pd.cut(
            month,
            bins=[0, 3, 6, 9, 12],
            labels=["winter", "spring", "summer", "fall"],
            include_lowest=True,
        ).astype(str)

    return df


# Additional engineered numeric & categorical columns ----------------------
_ENGINEERED_NUMERIC = [
    "demand_per_1k_cust",
    "res_sales_share",
    "price_spread_res_ind",
    "urban_rural_density_ratio",
]
_ENGINEERED_CATEGORICAL = ["season"]


def _resolve_columns(df: pd.DataFrame) -> Tuple[list[str], list[str]]:
    """Return (numeric_cols, categorical_cols) present in df, minus leakage."""
    available = set(df.columns)
    leak = set(LEAK_COLUMNS)

    num = [c for c in NUMERIC_FEATURES + _ENGINEERED_NUMERIC if c in available and c not in leak]
    cat = [c for c in CATEGORICAL_FEATURES + _ENGINEERED_CATEGORICAL if c in available]
    return num, cat


def build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer:
    """Construct a ColumnTransformer that handles imputation + encoding."""
    numeric_pipe = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ])

    categorical_pipe = Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value="MISSING")),
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_pipe, num_cols),
            ("cat", categorical_pipe, cat_cols),
        ],
        remainder="drop",
    )
    return preprocessor


def prepare_splits(
    df: pd.DataFrame,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, ColumnTransformer, list[str]]:
    """
    Full feature-engineering → split → fit-transform workflow.

    Returns
    -------
    X_train, X_test, y_train, y_test, fitted_preprocessor, feature_names
    """
    df = engineer_features(df)
    num_cols, cat_cols = _resolve_columns(df)
    logger.info("Feature columns — numeric: %d, categorical: %d", len(num_cols), len(cat_cols))

    X = df[num_cols + cat_cols]
    y = df[TARGET_COL].values

    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y
    )

    preprocessor = build_preprocessor(num_cols, cat_cols)
    X_train = preprocessor.fit_transform(X_train_raw)
    X_test = preprocessor.transform(X_test_raw)

    # Recover human-readable feature names
    ohe = preprocessor.named_transformers_["cat"].named_steps["encoder"]
    cat_feature_names = list(ohe.get_feature_names_out(cat_cols))
    feature_names = num_cols + cat_feature_names

    # Persist preprocessor + feature names
    import joblib
    joblib.dump(preprocessor, ARTIFACTS_DIR / PREPROCESSOR_FILE)
    with open(ARTIFACTS_DIR / FEATURE_NAMES_FILE, "w") as f:
        json.dump(feature_names, f)

    logger.info("Train: %d samples | Test: %d samples", X_train.shape[0], X_test.shape[0])
    return X_train, X_test, y_train, y_test, preprocessor, feature_names