"""
Feature engineering and preprocessing pipeline.
Key design decisions
--------------------
* CUSTOMERS.AFFECTED and OUTAGE.DURATION are **removed** from the feature
matrix because they directly define the target (data leakage).
* Weather is proxied via ANOMALY.LEVEL (Oceanic Niño Index) which is already
in the dataset — no external API needed.
* Engineered features are derived solely from existing columns.
* The sklearn ColumnTransformer is serialised so inference uses the exact
same transformations.
"""
from __future__ import annotations
import json
import logging
from typing import Tuple
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from src.config import (
ARTIFACTS_DIR,
CATEGORICAL_FEATURES,
FEATURE_NAMES_FILE,
LEAK_COLUMNS,
NUMERIC_FEATURES,
PREPROCESSOR_FILE,
RANDOM_STATE,
TARGET_COL,
TEST_SIZE,
)
logger = logging.getLogger(__name__)
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive additional columns from raw data — no external sources.

    Works on a copy of *df*; the input frame is never mutated. Every derived
    column is guarded by a presence check on its source columns, so frames
    missing some inputs simply skip that feature. Ratio features are cleaned
    of ±inf (division by zero) so downstream imputers treat them as missing.
    """
    df = df.copy()
    # Demand-to-customer ratio: MW lost per thousand customers
    if "DEMAND.LOSS.MW" in df.columns and "TOTAL.CUSTOMERS" in df.columns:
        df["demand_per_1k_cust"] = (
            df["DEMAND.LOSS.MW"] / (df["TOTAL.CUSTOMERS"] / 1_000)
        ).replace([np.inf, -np.inf], np.nan)
    # Residential share of total sales (economic structure proxy)
    if "RES.SALES" in df.columns and "TOTAL.SALES" in df.columns:
        df["res_sales_share"] = (
            df["RES.SALES"] / df["TOTAL.SALES"]
        ).replace([np.inf, -np.inf], np.nan)
    # Price spread: industrial vs residential (rate design signal)
    if "RES.PRICE" in df.columns and "IND.PRICE" in df.columns:
        df["price_spread_res_ind"] = df["RES.PRICE"] - df["IND.PRICE"]
    # Population density contrast (urban vs rural stress indicator)
    if "POPDEN_URBAN" in df.columns and "POPDEN_RURAL" in df.columns:
        df["urban_rural_density_ratio"] = (
            df["POPDEN_URBAN"] / df["POPDEN_RURAL"].replace(0, np.nan)
        ).replace([np.inf, -np.inf], np.nan)
    # Season bucket from MONTH
    if "MONTH" in df.columns:
        month = df["MONTH"].astype(float)
        season = pd.cut(
            month,
            bins=[0, 3, 6, 9, 12],
            labels=["winter", "spring", "summer", "fall"],
            include_lowest=True,
        )
        # Fix: the previous `.astype(str)` turned missing months into the
        # literal string "nan", creating a bogus category. Casting via
        # object keeps NaN as NaN so the categorical imputer can fill it
        # with "MISSING" like every other missing value.
        df["season"] = season.astype(object)
    return df
# Additional engineered numeric & categorical columns ----------------------
# These names are produced by engineer_features() above; _resolve_columns()
# merges them with the configured feature lists so they flow through the
# same imputation/encoding pipeline as the raw columns.
_ENGINEERED_NUMERIC = [
    "demand_per_1k_cust",
    "res_sales_share",
    "price_spread_res_ind",
    "urban_rural_density_ratio",
]
# "season" is derived from MONTH in engineer_features().
_ENGINEERED_CATEGORICAL = ["season"]
def _resolve_columns(df: pd.DataFrame) -> Tuple[list[str], list[str]]:
    """Select the numeric and categorical feature columns actually in *df*.

    Columns listed in LEAK_COLUMNS are excluded from the numeric set (they
    define the target); any configured column absent from the frame is
    silently skipped.
    """
    present = set(df.columns)
    banned = set(LEAK_COLUMNS)
    numeric_cols: list[str] = []
    for col in NUMERIC_FEATURES + _ENGINEERED_NUMERIC:
        if col in present and col not in banned:
            numeric_cols.append(col)
    categorical_cols = [
        col
        for col in CATEGORICAL_FEATURES + _ENGINEERED_CATEGORICAL
        if col in present
    ]
    return numeric_cols, categorical_cols
def build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer:
    """Assemble the imputation + scaling/encoding ColumnTransformer.

    Numeric columns get median imputation followed by standardisation;
    categorical columns get a constant "MISSING" fill, then dense one-hot
    encoding that ignores categories unseen at fit time. Any column not
    listed in either group is dropped.
    """
    numeric_steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
    categorical_steps = [
        ("imputer", SimpleImputer(strategy="constant", fill_value="MISSING")),
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
    return ColumnTransformer(
        transformers=[
            ("num", Pipeline(numeric_steps), num_cols),
            ("cat", Pipeline(categorical_steps), cat_cols),
        ],
        remainder="drop",
    )
def prepare_splits(
    df: pd.DataFrame,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, ColumnTransformer, list[str]]:
    """
    Full feature-engineering → split → fit-transform workflow.

    The preprocessor is fitted on the training split only (no test-set
    statistics leak into training) and persisted together with the final
    feature names so inference applies exactly the same transformations.

    Returns
    -------
    X_train, X_test, y_train, y_test, fitted_preprocessor, feature_names
    """
    df = engineer_features(df)
    num_cols, cat_cols = _resolve_columns(df)
    logger.info("Feature columns — numeric: %d, categorical: %d", len(num_cols), len(cat_cols))
    X = df[num_cols + cat_cols]
    y = df[TARGET_COL].values
    # Stratified split preserves the target's class balance in both halves.
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y
    )
    preprocessor = build_preprocessor(num_cols, cat_cols)
    # Fit on train only; the test split is transformed with train statistics.
    X_train = preprocessor.fit_transform(X_train_raw)
    X_test = preprocessor.transform(X_test_raw)
    # Recover human-readable feature names: numeric names pass through
    # unchanged; categorical names come from the fitted one-hot encoder.
    ohe = preprocessor.named_transformers_["cat"].named_steps["encoder"]
    cat_feature_names = list(ohe.get_feature_names_out(cat_cols))
    feature_names = num_cols + cat_feature_names
    # Persist preprocessor + feature names for inference-time reuse.
    import joblib

    # Fix: the artifacts directory may not exist on a fresh checkout —
    # create it before writing rather than crashing on the dump.
    ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
    joblib.dump(preprocessor, ARTIFACTS_DIR / PREPROCESSOR_FILE)
    with open(ARTIFACTS_DIR / FEATURE_NAMES_FILE, "w", encoding="utf-8") as f:
        json.dump(feature_names, f)
    logger.info("Train: %d samples | Test: %d samples", X_train.shape[0], X_test.shape[0])
    return X_train, X_test, y_train, y_test, preprocessor, feature_names