Spaces:
Sleeping
Sleeping
Aryan Singh
Improve mule classifier: native-NaN + missingness (CV PR-AUC 0.88->0.91, recall 13->15/16)
67eae2d | """FeatureBuilder: the single fitted object that turns raw account records into | |
| the model's input matrix. Used identically by training and serving for parity. | |
| Pipeline (v2 — native-NaN): | |
| clean -> keep numerics WITH NaN (LightGBM splits on missingness natively) | |
| -> one-hot encode the semantic categoricals | |
| -> add per-feature missingness indicators + a row missing-count | |
| -> append an Isolation Forest anomaly score (computed on a median-imputed | |
| copy, since IsolationForest cannot consume NaN) | |
| -> select a stable top-K subset (importance voting + domain priors). | |
| Why native-NaN: ~28% of values are missing and mules are characterised largely | |
| by a distinctive *missingness* pattern; median-imputation erased that signal. | |
| The classifier sees raw NaNs; only the anomaly model gets an imputed copy. | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import pandas as pd | |
| from lightgbm import LGBMClassifier | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.model_selection import RepeatedStratifiedKFold | |
| from sklearn.preprocessing import OneHotEncoder | |
| from src import config | |
| from src.features.clean import clean_frame, split_column_types | |
# Name of the output column that holds the negated IsolationForest decision
# function (written in FeatureBuilder._assemble).
ANOMALY_COL = "anomaly_score"
# Name of the output column that holds the per-row count of missing numeric
# values (written in FeatureBuilder._assemble).
MISSCOUNT_COL = "missing_count"
class FeatureBuilder:
    """Fitted object that turns raw account records into the model input matrix.

    ``fit`` learns the numeric/categorical column split, the one-hot encoder,
    the per-column medians and the IsolationForest anomaly model, then selects
    a feature subset by importance voting across CV folds plus domain priors.
    ``transform`` replays exactly the same assembly steps so that training and
    serving produce the same columns in the same order.

    Fitted attributes (trailing underscore) are populated by ``fit``:
        num_cols_ / cat_cols_   column split returned by ``split_column_types``
        ohe_ / ohe_cols_        fitted encoder and its output column names
        num_medians_            per-numeric-column medians (anomaly model only)
        iso_                    fitted IsolationForest
        feature_names_full_     every assembled column, before selection
        selected_features_      columns returned by ``transform``
        selection_freq_         fraction of CV folds that voted for each column
    """

    def __init__(self, n_select: int = config.N_SELECT, seed: int = config.SEED):
        """Store hyper-parameters and initialise all fitted state as empty.

        Args:
            n_select: number of top-voted features to keep (domain priors are
                appended on top of this).
            seed: random seed shared by the anomaly model, the CV splitter and
                the LightGBM models used for voting.
        """
        self.n_select = n_select
        self.seed = seed
        self.num_cols_: list[str] = []
        self.cat_cols_: list[str] = []
        self.ohe_: OneHotEncoder | None = None
        self.ohe_cols_: list[str] = []
        self.num_medians_: np.ndarray | None = None  # only to feed the anomaly model
        self.iso_: IsolationForest | None = None
        self.feature_names_full_: list[str] = []
        self.selected_features_: list[str] = []
        self.selection_freq_: dict[str, float] = {}

    # ---- internal helpers --------------------------------------------------
    def _check_is_fitted(self) -> None:
        """Raise ``NotFittedError`` when ``fit`` has not been run yet.

        ``NotFittedError`` derives from both ``ValueError`` and
        ``AttributeError``, so callers that caught the previous
        ``AttributeError`` on ``None`` keep working.
        """
        if self.iso_ is None or self.num_medians_ is None:
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                "FeatureBuilder is not fitted yet; call fit() before transform()."
            )

    def _numeric_frame(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """Numeric columns as float WITH NaN preserved (reindexed for serve safety).

        Columns absent from ``cleaned`` come back as all-NaN; values that cannot
        be parsed as numbers are coerced to NaN.
        """
        return (
            cleaned.reindex(columns=self.num_cols_)
            .apply(pd.to_numeric, errors="coerce")
            .astype("float64")
        )

    def _cat_values(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """Categorical columns as objects with missing values replaced by "NA".

        Shared by ``fit`` and ``_ohe_frame`` so the encoder sees identically
        prepared input at training and at serving time.
        """
        vals = cleaned.reindex(columns=self.cat_cols_).astype("object")
        return vals.where(pd.notna(vals), "NA")

    def _ohe_frame(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """One-hot encode the categorical columns; empty frame if there are none."""
        if not self.cat_cols_:
            return pd.DataFrame(index=cleaned.index)
        encoded = self.ohe_.transform(self._cat_values(cleaned))
        return pd.DataFrame(encoded, columns=self.ohe_cols_, index=cleaned.index)

    def _impute_for_iso(self, num: pd.DataFrame) -> np.ndarray:
        """Return a median-imputed copy of ``num`` for the anomaly model.

        ``num`` itself is left untouched; each NaN is replaced by the stored
        median of its column (medians broadcast across rows).
        """
        arr = num.to_numpy(dtype="float64")
        return np.where(np.isnan(arr), self.num_medians_, arr)

    def _assemble(self, cleaned: pd.DataFrame) -> pd.DataFrame:
        """Build the full (pre-selection) feature matrix from a cleaned frame.

        Column order: numerics (NaN kept), one-hot columns, ``<col>__isna``
        indicators, then the missing-count and anomaly-score columns.
        """
        num = self._numeric_frame(cleaned)
        missing = num.isna()  # computed once, reused for indicators and the row count
        ind = missing.astype("float64")
        ind.columns = [f"{c}__isna" for c in num.columns]
        out = pd.concat([num, self._ohe_frame(cleaned), ind], axis=1)
        out[MISSCOUNT_COL] = missing.sum(axis=1).astype("float64").to_numpy()
        # decision_function is lower for more abnormal rows; negate so that a
        # higher score means "more anomalous".
        out[ANOMALY_COL] = -self.iso_.decision_function(self._impute_for_iso(num))
        return out

    def _known_important_columns(self, all_names: list[str]) -> list[str]:
        """Map bank-flagged features to their (possibly one-hot-expanded) columns.

        A column is kept when its full name, or the part before the first
        underscore (e.g. 'F3891_salaried' -> 'F3891'), is a prior. The anomaly
        score and missing-count columns are always included.
        """
        priors = set(config.KNOWN_IMPORTANT) | {"F3888_age_days", "F3889_recency_ord"}
        keep = {
            name
            for name in all_names
            if name in priors or name.split("_")[0] in priors
        }
        keep.update((ANOMALY_COL, MISSCOUNT_COL))
        return sorted(keep)

    def _importance_votes(self, full: pd.DataFrame, labels: np.ndarray) -> tuple[pd.Series, int]:
        """Count, per column, how many CV folds rank it in their top ``n_select``.

        Returns the vote counts and the number of folds that voted.
        """
        cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=self.seed)
        pos = int(labels.sum())
        neg = int((labels == 0).sum())
        votes = pd.Series(0.0, index=full.columns)
        for tr_idx, _ in cv.split(full, labels):
            clf = LGBMClassifier(
                n_estimators=300, num_leaves=31, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.5, reg_lambda=1.0,
                scale_pos_weight=neg / max(pos, 1), random_state=self.seed,
                n_jobs=-1, verbose=-1,
            )
            clf.fit(full.iloc[tr_idx], labels[tr_idx])
            imp = pd.Series(clf.feature_importances_, index=full.columns)
            # Stable sort: tied importances keep column order, so the vote does
            # not depend on the platform's sort implementation.
            top = imp.sort_values(ascending=False, kind="stable").head(self.n_select).index
            votes.loc[top] += 1.0
        return votes, cv.get_n_splits()

    # ---- public API --------------------------------------------------------
    def fit(self, X: pd.DataFrame, y: pd.Series) -> "FeatureBuilder":
        """Learn all preprocessing state and the selected feature subset.

        Args:
            X: raw account records; passed through ``clean_frame``.
            y: binary labels (1 = positive), one per cleaned row. Any 1-D
                array-like is accepted.

        Returns:
            ``self``, fitted.

        Raises:
            ValueError: if ``y`` does not have one label per cleaned row.
        """
        cleaned = clean_frame(X)
        labels = np.asarray(y).ravel()
        # Fail before the expensive model fits rather than inside the CV splitter.
        if len(labels) != len(cleaned):
            raise ValueError(
                f"y has {len(labels)} labels but the cleaned frame has {len(cleaned)} rows"
            )

        self.num_cols_, self.cat_cols_ = split_column_types(cleaned)
        num = self._numeric_frame(cleaned)
        # Per-column median — used ONLY to feed the anomaly model (cannot take NaN).
        # All-NaN columns have no median and fall back to 0.0.
        self.num_medians_ = np.nan_to_num(np.nanmedian(num.to_numpy(), axis=0), nan=0.0)

        # One-hot encoder for the semantic categoricals.
        if self.cat_cols_:
            self.ohe_ = OneHotEncoder(
                handle_unknown="ignore", min_frequency=20, sparse_output=False
            )
            self.ohe_.fit(self._cat_values(cleaned))
            self.ohe_cols_ = list(self.ohe_.get_feature_names_out(self.cat_cols_))
        else:
            # Drop any encoder left over from a previous fit.
            self.ohe_ = None
            self.ohe_cols_ = []

        # Unsupervised anomaly model on median-imputed training rows only.
        self.iso_ = IsolationForest(
            n_estimators=200, contamination="auto", random_state=self.seed, n_jobs=-1
        ).fit(self._impute_for_iso(num))

        full = self._assemble(cleaned)
        self.feature_names_full_ = list(full.columns)

        # ---- feature selection via importance voting across CV folds -------
        votes, n_folds = self._importance_votes(full, labels)
        self.selection_freq_ = (votes / n_folds).to_dict()
        ranked = votes.sort_values(ascending=False, kind="stable")
        selected = list(ranked.head(self.n_select).index)

        # Always retain domain priors + anomaly score + missing-count.
        already = set(selected)
        for col in self._known_important_columns(self.feature_names_full_):
            if col not in already:
                selected.append(col)
                already.add(col)
        self.selected_features_ = selected
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return the selected feature columns for ``X``, in fitted order.

        Raises:
            sklearn.exceptions.NotFittedError: if ``fit`` has not been called.
        """
        self._check_is_fitted()
        full = self._assemble(clean_frame(X))
        # Serving robustness: guarantee every selected column exists.
        absent = [col for col in self.selected_features_ if col not in full.columns]
        for col in absent:
            full[col] = 0.0
        return full[self.selected_features_]

    def fit_transform(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Fit on ``(X, y)`` and return the transformed training matrix."""
        return self.fit(X, y).transform(X)