MuleGuard / src /features /builder.py
Aryan Singh
Improve mule classifier: native-NaN + missingness (CV PR-AUC 0.88->0.91, recall 13->15/16)
67eae2d
Raw
History Blame Contribute Delete
6.93 kB
"""FeatureBuilder: the single fitted object that turns raw account records into
the model's input matrix. Used identically by training and serving for parity.

Pipeline (v2 — native-NaN):
    clean -> keep numerics WITH NaN (LightGBM splits on missingness natively)
          -> one-hot encode the semantic categoricals
          -> add per-feature missingness indicators + a row missing-count
          -> append an Isolation Forest anomaly score (computed on a median-imputed
             copy, since IsolationForest cannot consume NaN)
          -> select a stable top-K subset (importance voting + domain priors).

Why native-NaN: ~28% of values are missing and mules are characterised largely
by a distinctive *missingness* pattern; median-imputation erased that signal.
The classifier sees raw NaNs; only the anomaly model gets an imputed copy.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.preprocessing import OneHotEncoder
from src import config
from src.features.clean import clean_frame, split_column_types
# Output column holding the negated IsolationForest decision score for each row.
ANOMALY_COL = "anomaly_score"
# Output column holding each row's count of missing (NaN) numeric values.
MISSCOUNT_COL = "missing_count"
class FeatureBuilder:
    """Fitted transformer that turns raw account records into the model matrix.

    ``fit`` learns the column split, the one-hot encoder, the per-column
    medians, the anomaly model and the selected feature subset; ``transform``
    replays exactly the same assembly steps so training and serving stay in
    parity.

    Attributes set by ``fit`` (trailing underscore):
        num_cols_ / cat_cols_: column split returned by ``split_column_types``.
        ohe_ / ohe_cols_: fitted one-hot encoder and its output column names
            (``None`` / ``[]`` when there are no categorical columns).
        num_medians_: per-numeric-column medians, used only to impute the copy
            of the data that is fed to the anomaly model.
        iso_: fitted ``IsolationForest``.
        feature_names_full_: every assembled column, before selection.
        selected_features_: columns returned by ``transform``, in order.
        selection_freq_: fraction of CV fits in which each column ranked in
            the top ``n_select`` by importance.
    """

    def __init__(self, n_select: int = config.N_SELECT, seed: int = config.SEED):
        """Store hyperparameters and initialise all fitted state as empty.

        Args:
            n_select: number of top-voted features to keep (domain priors are
                appended on top of this).
            seed: random state shared by the CV splitter, the LightGBM voters
                and the IsolationForest.
        """
        self.n_select = n_select
        self.seed = seed
        self.num_cols_: list[str] = []
        self.cat_cols_: list[str] = []
        self.ohe_: OneHotEncoder | None = None
        self.ohe_cols_: list[str] = []
        self.num_medians_: np.ndarray | None = None  # only to feed the anomaly model
        self.iso_: IsolationForest | None = None
        self.feature_names_full_: list[str] = []
        self.selected_features_: list[str] = []
        self.selection_freq_: dict[str, float] = {}

    # ---- internal helpers --------------------------------------------------
    def _check_is_fitted(self) -> None:
        """Raise ``NotFittedError`` if ``fit`` has not been called yet.

        ``NotFittedError`` subclasses both ``ValueError`` and ``AttributeError``,
        so callers that caught the former opaque ``AttributeError`` still work.
        """
        if self.iso_ is None:
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                "FeatureBuilder is not fitted yet; call fit() before transform()."
            )

    def _numeric_frame(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """Numeric columns as float WITH NaN preserved (reindexed for serve safety).

        Columns absent from ``cleaned`` come back as all-NaN; values that cannot
        be parsed as numbers are coerced to NaN.
        """
        return (
            cleaned.reindex(columns=self.num_cols_)
            .apply(pd.to_numeric, errors="coerce")
            .astype("float64")
        )

    def _categorical_values(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """Categorical columns as objects with missing values replaced by "NA".

        Shared by ``fit`` and ``_ohe_frame`` so the encoder sees identically
        prepared input at training and serving time.
        """
        vals = cleaned.reindex(columns=self.cat_cols_).astype("object")
        return vals.where(pd.notna(vals), "NA")

    def _ohe_frame(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """One-hot encode the categorical columns with the fitted encoder.

        Returns an empty frame (same index) when there are no categoricals.
        """
        if not self.cat_cols_:
            return pd.DataFrame(index=cleaned.index)
        mat = self.ohe_.transform(self._categorical_values(cleaned))
        return pd.DataFrame(mat, columns=self.ohe_cols_, index=cleaned.index)

    def _impute_for_iso(self, num: pd.DataFrame) -> np.ndarray:
        """Return a copy of ``num`` with each NaN replaced by its column median.

        Used only for the anomaly model; the classifier keeps the raw NaNs.
        """
        arr = num.values.copy()
        rows, cols = np.where(np.isnan(arr))
        arr[rows, cols] = np.take(self.num_medians_, cols)
        return arr

    def _assemble(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """Build the full (pre-selection) feature matrix from a cleaned frame.

        Column order: numerics, one-hot columns, ``<col>__isna`` indicators,
        the row missing-count, then the anomaly score.
        """
        num = self._numeric_frame(cleaned)
        isna = num.isna()  # computed once, reused for indicators and the count

        ind = isna.astype("float64")
        ind.columns = [f"{c}__isna" for c in num.columns]

        out = pd.concat([num, self._ohe_frame(cleaned), ind], axis=1)
        out[MISSCOUNT_COL] = isna.sum(axis=1).astype("float64").values
        # decision_function is lower for more abnormal rows; negate it so that
        # a larger anomaly_score means "more anomalous".
        out[ANOMALY_COL] = -self.iso_.decision_function(self._impute_for_iso(num))
        return out

    def _known_important_columns(self, all_names: list[str]) -> list[str]:
        """Map bank-flagged features to their (possibly one-hot-expanded) columns.

        A column is kept when its full name, or the part before its first
        underscore (e.g. 'F3891_salaried' -> 'F3891'), is a prior. The anomaly
        score and missing-count columns are always included. Returns a sorted
        list.
        """
        priors = set(config.KNOWN_IMPORTANT) | {"F3888_age_days", "F3889_recency_ord"}
        keep = {
            name
            for name in all_names
            if name in priors or name.split("_", 1)[0] in priors
        }
        keep.update((ANOMALY_COL, MISSCOUNT_COL))
        return sorted(keep)

    def _vote_importances(self, full: pd.DataFrame, y: pd.Series) -> tuple[pd.Series, int]:
        """Count how often each column ranks in the top ``n_select`` across CV fits.

        Returns:
            (votes, n_rounds): a Series of vote counts indexed by column, and
            the total number of fits (splits x repeats) that cast votes.
        """
        cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=self.seed)
        pos = int(y.sum())
        neg = int((y == 0).sum())
        # Loop-invariant hyperparameters; a fresh classifier is built per fold.
        params = dict(
            n_estimators=300, num_leaves=31, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.5, reg_lambda=1.0,
            scale_pos_weight=neg / max(pos, 1), random_state=self.seed,
            n_jobs=-1, verbose=-1,
        )
        votes = pd.Series(0.0, index=full.columns)
        for tr_idx, _ in cv.split(full, y):
            clf = LGBMClassifier(**params)
            clf.fit(full.iloc[tr_idx], y.iloc[tr_idx])
            imp = pd.Series(clf.feature_importances_, index=full.columns)
            top = imp.sort_values(ascending=False).head(self.n_select).index
            votes[top] += 1.0
        return votes, cv.get_n_splits()

    # ---- public API --------------------------------------------------------
    def fit(self, X: pd.DataFrame, y: pd.Series) -> "FeatureBuilder":
        """Learn encoders, the anomaly model and the selected feature subset.

        Args:
            X: raw account records (passed through ``clean_frame``).
            y: binary labels, positionally aligned with ``X``.

        Returns:
            self, to allow chaining.
        """
        cleaned = clean_frame(X)
        self.num_cols_, self.cat_cols_ = split_column_types(cleaned)

        # Same coercion path as serving, so the two cannot drift apart.
        num = self._numeric_frame(cleaned)
        # Per-column median — used ONLY to feed the anomaly model (cannot take NaN).
        # An all-NaN column has a NaN median, which falls back to 0.0 here.
        self.num_medians_ = np.nan_to_num(np.nanmedian(num.values, axis=0), nan=0.0)

        # One-hot encoder for the semantic categoricals.
        if self.cat_cols_:
            self.ohe_ = OneHotEncoder(
                handle_unknown="ignore", min_frequency=20, sparse_output=False
            )
            self.ohe_.fit(self._categorical_values(cleaned))
            self.ohe_cols_ = list(self.ohe_.get_feature_names_out(self.cat_cols_))
        else:
            # Drop any encoder left over from a previous fit.
            self.ohe_ = None
            self.ohe_cols_ = []

        # Unsupervised anomaly model on median-imputed training rows only.
        self.iso_ = IsolationForest(
            n_estimators=200, contamination="auto", random_state=self.seed, n_jobs=-1
        ).fit(self._impute_for_iso(num))

        full = self._assemble(cleaned)
        self.feature_names_full_ = list(full.columns)

        # ---- feature selection via importance voting across CV folds -------
        votes, n_rounds = self._vote_importances(full, y)
        self.selection_freq_ = (votes / n_rounds).to_dict()
        ranked = votes.sort_values(ascending=False)
        selected = list(ranked.head(self.n_select).index)

        # Always retain domain priors + anomaly score + missing-count.
        already = set(selected)
        for col in self._known_important_columns(self.feature_names_full_):
            if col not in already:
                selected.append(col)
                already.add(col)
        self.selected_features_ = selected
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Assemble features for ``X`` and return the selected columns, in order.

        Raises:
            NotFittedError: if called before ``fit``.
        """
        self._check_is_fitted()
        full = self._assemble(clean_frame(X))
        # Serving robustness: guarantee every selected column exists.
        for col in self.selected_features_:
            if col not in full.columns:
                full[col] = 0.0
        return full[self.selected_features_]

    def fit_transform(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Fit on ``(X, y)`` and return the transformed training matrix."""
        return self.fit(X, y).transform(X)