"""FeatureBuilder: the single fitted object that turns raw account records into the model's input matrix. Used identically by training and serving for parity. Pipeline (v2 — native-NaN): clean -> keep numerics WITH NaN (LightGBM splits on missingness natively) -> one-hot encode the semantic categoricals -> add per-feature missingness indicators + a row missing-count -> append an Isolation Forest anomaly score (computed on a median-imputed copy, since IsolationForest cannot consume NaN) -> select a stable top-K subset (importance voting + domain priors). Why native-NaN: ~28% of values are missing and mules are characterised largely by a distinctive *missingness* pattern; median-imputation erased that signal. The classifier sees raw NaNs; only the anomaly model gets an imputed copy. """ from __future__ import annotations import numpy as np import pandas as pd from lightgbm import LGBMClassifier from sklearn.ensemble import IsolationForest from sklearn.model_selection import RepeatedStratifiedKFold from sklearn.preprocessing import OneHotEncoder from src import config from src.features.clean import clean_frame, split_column_types ANOMALY_COL = "anomaly_score" MISSCOUNT_COL = "missing_count" class FeatureBuilder: def __init__(self, n_select: int = config.N_SELECT, seed: int = config.SEED): self.n_select = n_select self.seed = seed self.num_cols_: list[str] = [] self.cat_cols_: list[str] = [] self.ohe_: OneHotEncoder | None = None self.ohe_cols_: list[str] = [] self.num_medians_: np.ndarray | None = None # only to feed the anomaly model self.iso_: IsolationForest | None = None self.feature_names_full_: list[str] = [] self.selected_features_: list[str] = [] self.selection_freq_: dict[str, float] = {} # ---- internal helpers -------------------------------------------------- def _numeric_frame(self, cleaned: pd.DataFrame) -> pd.DataFrame: """Numeric columns as float WITH NaN preserved (reindexed for serve safety).""" return (cleaned.reindex(columns=self.num_cols_) .apply(pd.to_numeric, errors="coerce").astype("float64")) def _ohe_frame(self, cleaned: pd.DataFrame) -> pd.DataFrame: if not self.cat_cols_: return pd.DataFrame(index=cleaned.index) vals = cleaned.reindex(columns=self.cat_cols_).astype("object") vals = vals.where(pd.notna(vals), "NA") mat = self.ohe_.transform(vals) return pd.DataFrame(mat, columns=self.ohe_cols_, index=cleaned.index) def _impute_for_iso(self, num: pd.DataFrame) -> np.ndarray: arr = num.values.copy() pos = np.where(np.isnan(arr)) arr[pos] = np.take(self.num_medians_, pos[1]) return arr def _assemble(self, cleaned: pd.DataFrame) -> pd.DataFrame: num = self._numeric_frame(cleaned) ind = num.isna().astype("float64") ind.columns = [f"{c}__isna" for c in num.columns] out = pd.concat([num, self._ohe_frame(cleaned), ind], axis=1) out[MISSCOUNT_COL] = num.isna().sum(axis=1).astype("float64").values out[ANOMALY_COL] = -self.iso_.decision_function(self._impute_for_iso(num)) return out def _known_important_columns(self, all_names: list[str]) -> list[str]: """Map bank-flagged features to their (possibly one-hot-expanded) columns.""" keep = set() priors = set(config.KNOWN_IMPORTANT) | {"F3888_age_days", "F3889_recency_ord"} for name in all_names: base = name.split("_")[0] # e.g. 'F3891_salaried' -> 'F3891' if name in priors or base in priors: keep.add(name) keep.add(ANOMALY_COL) keep.add(MISSCOUNT_COL) return sorted(keep) # ---- public API -------------------------------------------------------- def fit(self, X: pd.DataFrame, y: pd.Series) -> "FeatureBuilder": cleaned = clean_frame(X) self.num_cols_, self.cat_cols_ = split_column_types(cleaned) num = cleaned[self.num_cols_].apply(pd.to_numeric, errors="coerce").astype("float64") # Per-column median — used ONLY to feed the anomaly model (cannot take NaN). self.num_medians_ = np.nan_to_num(np.nanmedian(num.values, axis=0), nan=0.0) # One-hot encoder for the semantic categoricals. if self.cat_cols_: self.ohe_ = OneHotEncoder(handle_unknown="ignore", min_frequency=20, sparse_output=False) vals = cleaned[self.cat_cols_].astype("object") vals = vals.where(pd.notna(vals), "NA") self.ohe_.fit(vals) self.ohe_cols_ = list(self.ohe_.get_feature_names_out(self.cat_cols_)) else: self.ohe_cols_ = [] # Unsupervised anomaly model on median-imputed training rows only. self.iso_ = IsolationForest( n_estimators=200, contamination="auto", random_state=self.seed, n_jobs=-1 ).fit(self._impute_for_iso(num)) full = self._assemble(cleaned) self.feature_names_full_ = list(full.columns) # ---- feature selection via importance voting across CV folds ------- cv = RepeatedStratifiedKFold(n_splits=5, n_repeats=2, random_state=self.seed) pos = int(y.sum()) neg = int((y == 0).sum()) votes = pd.Series(0.0, index=full.columns) for tr_idx, _ in cv.split(full, y): clf = LGBMClassifier( n_estimators=300, num_leaves=31, learning_rate=0.05, subsample=0.8, colsample_bytree=0.5, reg_lambda=1.0, scale_pos_weight=neg / max(pos, 1), random_state=self.seed, n_jobs=-1, verbose=-1, ) clf.fit(full.iloc[tr_idx], y.iloc[tr_idx]) imp = pd.Series(clf.feature_importances_, index=full.columns) top = imp.sort_values(ascending=False).head(self.n_select).index votes[top] += 1.0 self.selection_freq_ = (votes / (cv.get_n_splits())).to_dict() ranked = votes.sort_values(ascending=False) selected = list(ranked.head(self.n_select).index) # Always retain domain priors + anomaly score + missing-count. for col in self._known_important_columns(self.feature_names_full_): if col not in selected: selected.append(col) self.selected_features_ = selected return self def transform(self, X: pd.DataFrame) -> pd.DataFrame: full = self._assemble(clean_frame(X)) # Serving robustness: guarantee every selected column exists. for col in self.selected_features_: if col not in full.columns: full[col] = 0.0 return full[self.selected_features_] def fit_transform(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame: return self.fit(X, y).transform(X)