| from __future__ import annotations |
|
|
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.impute import SimpleImputer |
| from sklearn.pipeline import Pipeline |
| from sklearn.preprocessing import FunctionTransformer, StandardScaler |
|
|
| from .utils_cool import PhaseProgress |
|
|
|
|
| def _safe_apply_pipeline( |
| group_name: str, |
| df_in: pd.DataFrame, |
| cols: List[str], |
| pipeline: Pipeline, |
| *, |
| drop_on_error: bool, |
| warnings: List[str], |
| max_new_cols: int = 2000, |
| ) -> pd.DataFrame: |
| """ |
| Apply a user pipeline to df_in[cols] safely. |
| - If success and returns DataFrame with same index: replace group columns with returned columns (prefixed). |
| - If success and returns ndarray/sparse: if width <= max_new_cols, expand with auto names; else drop group. |
| - If failure and drop_on_error: drop group columns and record a warning. |
| """ |
| Xsub = df_in[cols] |
| try: |
| out = pipeline.fit_transform(Xsub) |
| except Exception as e: |
| if drop_on_error: |
| warnings.append( |
| f"[custom_featurizer] dropped '{group_name}' columns ({len(cols)}): {cols[:6]}{'...' if len(cols)>6 else ''} " |
| f"reason={type(e).__name__}: {str(e)[:180]}" |
| ) |
| return df_in.drop(columns=cols) |
| raise |
| |
| |
| if isinstance(out, pd.DataFrame): |
| out_df = out.copy() |
| |
| out_df.index = df_in.index |
| |
| out_df.columns = [f"{group_name}__{c}" for c in out_df.columns] |
| df_out = df_in.drop(columns=cols).join(out_df) |
| return df_out |
| |
| |
| try: |
| import numpy as _np |
| import scipy.sparse as _sp |
| if _sp.issparse(out): |
| out = out.toarray() |
| out = _np.asarray(out) |
| n_rows, n_cols = out.shape[0], (out.shape[1] if out.ndim == 2 else 1) |
| if n_rows != len(df_in): |
| raise ValueError(f"Pipeline for '{group_name}' returned {n_rows} rows; expected {len(df_in)}.") |
| if n_cols > max_new_cols: |
| warnings.append( |
| f"[custom_featurizer] '{group_name}' produced {n_cols} columns (> {max_new_cols}); dropping this group to avoid explosion." |
| ) |
| return df_in.drop(columns=cols) |
| if out.ndim == 1: |
| out = out.reshape(-1, 1) |
| n_cols = 1 |
| new_cols = [f"{group_name}__f{i}" for i in range(n_cols)] |
| out_df = pd.DataFrame(out, index=df_in.index, columns=new_cols) |
| df_out = df_in.drop(columns=cols).join(out_df) |
| return df_out |
| except Exception as e: |
| if drop_on_error: |
| warnings.append( |
| f"[custom_featurizer] failed to materialize output for '{group_name}'; dropping group. " |
| f"reason={type(e).__name__}: {str(e)[:180]}" |
| ) |
| return df_in.drop(columns=cols) |
| raise |
| |
| |
def custom_featurizer(
    df: pd.DataFrame,
    *,

    label: Optional[str] = None,


    numeric_pipeline: Optional[Pipeline] = None,
    low_card_pipeline: Optional[Pipeline] = None,
    text_pipeline: Optional[Pipeline] = None,


    numeric_scale: bool = True,
    text_lowercase: bool = True,
    max_ohe_cardinality: int = 50,


    nan_strategy: str = "impute",


    on_pipeline_error: str = "drop",


    max_new_cols_per_group: int = 2000,


    progress: Optional[Any] = None,


    verbose: bool = False,
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Featurize a mixed DataFrame while keeping the output as a DataFrame for downstream steps.

    Overview
    --------
    Columns are split into three groups:
      • numeric (including boolean)
      • low-cardinality categoricals (nunique ≤ `max_ohe_cardinality`)
      • text/high-cardinality (nunique > `max_ohe_cardinality`)

    You may pass custom sklearn `Pipeline`s per group. If a user pipeline raises and
    `on_pipeline_error="drop"`, that entire group of columns is dropped and a short
    warning is recorded in `stats["warnings"]` (the step does not crash). If `"raise"`,
    the error is propagated.

    The input `df` is never mutated; transformations are applied to a copy.

    Defaults when pipelines are NOT provided
    ---------------------------------------
    • Numeric (when `numeric_pipeline is None`)
        - NaNs: if `nan_strategy="impute"`, apply `SimpleImputer(strategy="median")`;
          if `"drop"`, rows with NaNs in numeric columns are dropped BEFORE this step.
        - Scaling: if `numeric_scale=True`, apply `StandardScaler(with_mean=False)`.
        - Columns: replaced in place (same column names remain; values become numeric/float).
    • Low-cardinality categoricals (when `low_card_pipeline is None`)
        - NaNs: if `nan_strategy="impute"`, apply `SimpleImputer(strategy="most_frequent")`;
          if `"drop"`, rows with NaNs in these columns are dropped BEFORE this step.
        - Encoding: **no one-hot by default** (values stay as cleaned strings/categories).
          If you want encodings, pass your own `low_card_pipeline` (e.g., OneHotEncoder/CatBoostEncoder).
        - Columns: preserved (same names).
    • Text / high-cardinality (when `text_pipeline is None`)
        - Build a tiny pipeline:
            concat selected text cols → `TfidfVectorizer(dtype=float32, lowercase=text_lowercase)`
        - Output: numeric TF-IDF features. Original text columns are **replaced** by new
          columns named `txt__f0`, `txt__f1`, … (or `txt__<col>` if your pipeline returns a DataFrame).
        - Feature explosion guard: if the produced matrix has more than `max_new_cols_per_group` columns,
          the entire text group is dropped and a warning is recorded.
        - If TF-IDF fails (e.g., empty vocabulary on tiny data), the text group is dropped with a warning.

    NaN handling
    ------------
    `nan_strategy` applies only to groups using the **default** pipeline:
      - "impute": impute NaNs (as above)
      - "drop"  : drop rows containing NaNs in any default-handled feature column **before**
                  transforming. For groups with a **custom** pipeline, NaN handling is your pipeline’s responsibility.

    Parameters
    ----------
    df : pd.DataFrame
        Input table. If `label` is provided, rows with NaN in `label` are dropped first.
    label : Optional[str], default=None
        Name of the target column (only used to drop NaN labels). Not returned as `y`.
    numeric_pipeline : Optional[sklearn.pipeline.Pipeline], default=None
        Custom pipeline for numeric/boolean columns. If provided, `nan_strategy` is ignored for this group.
        By default: median imputation (+ optional scaling) and columns are preserved.
    low_card_pipeline : Optional[sklearn.pipeline.Pipeline], default=None
        Custom pipeline for low-card categorical columns. If provided, `nan_strategy` is ignored for this group.
        By default: most-frequent imputation only; **no encoding**; columns are preserved.
    text_pipeline : Optional[sklearn.pipeline.Pipeline], default=None
        Custom pipeline for text/high-card columns. If provided, it replaces the text columns with whatever
        it outputs (DataFrame → prefixed columns; array/sparse → `txt__f*` columns). If not provided, the
        built-in concat+TF-IDF is used (see defaults above).
    numeric_scale : bool, default=True
        Applies `StandardScaler(with_mean=False)` to numeric columns in the default numeric pipeline.
    text_lowercase : bool, default=True
        Forwarded to the default `TfidfVectorizer(lowercase=...)`.
    max_ohe_cardinality : int, default=50
        Threshold to classify non-numeric columns as low-card (≤ threshold) vs text/high-card (> threshold).
    nan_strategy : {"impute","drop"}, default="impute"
        Strategy for **default** pipelines only (custom pipelines manage their own NaNs).
    on_pipeline_error : {"drop","raise"}, default="drop"
        If a **user** pipeline raises: "drop" → drop that group and record a warning; "raise" → propagate.
    max_new_cols_per_group : int, default=2000
        Upper bound on the number of columns a group is allowed to add (applies to array/sparse outputs).
        If exceeded, the group is dropped and a warning is recorded.
    progress : Optional[tqdm], default=None
        Phase-aware progress bar (clean → split → numeric → low_card → text → finalize).
        If None, a local bar is created and closed automatically.
    verbose : bool, default=False
        When True, prints recorded warnings (in addition to returning them in `stats["warnings"]`).

    Returns
    -------
    output_df : pd.DataFrame
        Transformed DataFrame ready for the next pipeline step. Numeric/low-card columns are
        updated in place by defaults; text columns are replaced by TF-IDF features when using
        the default text path.
    stats : Dict[str, Any]
        Minimal metadata:
          - "warnings": List[str]
          - "cols": {"numeric": [...], "low_card": [...], "text": [...]}
          - "n_rows_before": int
          - "n_rows_after": int

    Raises
    ------
    KeyError
        If `label` is given but not present in `df`.
    ValueError
        If `nan_strategy` is not one of {"impute", "drop"}.

    Notes
    -----
    • If a user pipeline returns a DataFrame, columns are prefixed with the group name to avoid collisions.
      If it returns an array/sparse matrix, columns are auto-named `"{group}__f{i}"`.
    • The feature explosion guard applies after transformation; if you often hit it with text, consider passing
      your own `TfidfVectorizer(max_features=...)` in `text_pipeline` to cap the width proactively.
    """

    # Single source of truth for the phase weights (previously duplicated
    # between the local-bar and caller-provided-bar branches).
    phase_weights = {
        "clean": .10, "split": .10, "numeric": .30, "low_card": .25, "text": .20, "finalize": .05
    }

    local_bar = None
    pp = None
    if progress is None:
        from tqdm.auto import tqdm
        local_bar = tqdm(total=100, leave=True)
        pp = PhaseProgress(local_bar, weights=phase_weights)
    elif hasattr(progress, "set_description") and hasattr(progress, "update"):
        if not hasattr(progress, "_last_val"):
            progress._last_val = 0
        pp = PhaseProgress(progress, weights=phase_weights)

    warnings: List[str] = []

    # ---- phase: clean ------------------------------------------------------
    n_before = len(df)
    pp and pp.start("clean", extra={"N": n_before})
    if label is not None and label not in df.columns:
        # Close only a bar we created ourselves; a caller-owned bar is theirs.
        if local_bar is not None:
            pp.close()
        raise KeyError(f"Target column '{label}' not in DataFrame.")
    if label is not None:
        # dropna() returns a fresh frame, so the caller's df stays untouched.
        df = df.dropna(subset=[label]).reset_index(drop=True)
    else:
        # BUGFIX: defensive copy. The default impute/scale paths below assign
        # via df.loc[:, cols] = ...; without this copy they mutated the
        # caller's DataFrame in place whenever label was None.
        df = df.copy()
    pp and pp.tick_abs("clean", 1.0, extra={"N": len(df)})
    pp and pp.end("clean")

    # ---- phase: split ------------------------------------------------------
    pp and pp.start("split")
    Xf = df if label is None else df.drop(columns=[label])
    # BUGFIX: include plain numpy "bool" as well as pandas' nullable "boolean"
    # dtype — the docstring promises "numeric (including boolean)", but plain
    # bool columns were previously misclassified as categoricals.
    numeric_cols: List[str] = Xf.select_dtypes(include=[np.number, "bool", "boolean"]).columns.tolist()
    non_numeric = [c for c in Xf.columns if c not in numeric_cols]

    low_card_cols: List[str] = []
    text_cols: List[str] = []
    for c in non_numeric:
        # Cardinality over non-null values decides low-card vs text.
        nunq = Xf[c].nunique(dropna=True)
        (low_card_cols if nunq <= max_ohe_cardinality else text_cols).append(c)
    pp and pp.tick_abs("split", 1.0, extra={
        "num": len(numeric_cols), "low": len(low_card_cols), "txt": len(text_cols)
    })
    pp and pp.end("split")

    # ---- NaN row-dropping (default-pipeline groups only) -------------------
    if nan_strategy not in {"impute", "drop"}:
        if local_bar is not None:
            pp.close()
        raise ValueError("nan_strategy must be 'impute' or 'drop'")
    if nan_strategy == "drop":
        cols_to_check = []
        if numeric_cols and numeric_pipeline is None:
            cols_to_check += numeric_cols
        if low_card_cols and low_card_pipeline is None:
            cols_to_check += low_card_cols
        if text_cols and text_pipeline is None:
            cols_to_check += text_cols
        if cols_to_check:
            mask = ~Xf[cols_to_check].isna().any(axis=1)
            df = df.loc[mask].reset_index(drop=True)
            # Re-derive the feature view after the row drop.
            Xf = df if label is None else df.drop(columns=[label])

    # ---- phase: numeric ----------------------------------------------------
    pp and pp.start("numeric", extra={"cols": len(numeric_cols)})
    if numeric_cols:
        if numeric_pipeline is not None:
            df = _safe_apply_pipeline(
                "num", df, numeric_cols, numeric_pipeline,
                drop_on_error=(on_pipeline_error == "drop"),
                warnings=warnings,
                max_new_cols=max_new_cols_per_group,
            )
        else:
            if nan_strategy == "impute":
                # NOTE(review): SimpleImputer can drop all-NaN columns, which
                # would make this assignment's width mismatch — assumed not to
                # occur in practice; verify upstream if it does.
                imputed = SimpleImputer(strategy="median").fit_transform(df[numeric_cols])
                df.loc[:, numeric_cols] = imputed
            if numeric_scale:
                # with_mean=False keeps sparse-compat semantics and avoids
                # centering; columns keep their names.
                scaler = StandardScaler(with_mean=False)
                scaled = scaler.fit_transform(df[numeric_cols].astype(float, copy=False))
                df.loc[:, numeric_cols] = np.asarray(scaled)
    pp and pp.tick_abs("numeric", 1.0)
    pp and pp.end("numeric", extra={"cols": len(numeric_cols)})

    # ---- phase: low_card ---------------------------------------------------
    pp and pp.start("low_card", extra={"cols": len(low_card_cols)})
    if low_card_cols:
        if low_card_pipeline is not None:
            df = _safe_apply_pipeline(
                "cat", df, low_card_cols, low_card_pipeline,
                drop_on_error=(on_pipeline_error == "drop"),
                warnings=warnings,
                max_new_cols=max_new_cols_per_group,
            )
        else:
            if nan_strategy == "impute":
                df.loc[:, low_card_cols] = SimpleImputer(strategy="most_frequent").fit_transform(df[low_card_cols])
    pp and pp.tick_abs("low_card", 1.0)
    pp and pp.end("low_card", extra={"cols": len(low_card_cols)})

    # ---- phase: text -------------------------------------------------------
    pp and pp.start("text", extra={"cols": len(text_cols)})
    if text_cols:
        if text_pipeline is None:
            def _concat_cols(Xframe: pd.DataFrame):
                # Join all text columns into one space-separated document per row.
                return Xframe.fillna("").astype(str).agg(" ".join, axis=1).values

            text_pipeline = Pipeline([
                ("concat", FunctionTransformer(_concat_cols, validate=False)),
                ("tfidf", TfidfVectorizer(
                    dtype=np.float32,
                    lowercase=bool(text_lowercase),
                )),
            ])

        df = _safe_apply_pipeline(
            "txt", df, text_cols, text_pipeline,
            drop_on_error=(on_pipeline_error == "drop"),
            warnings=warnings,
            max_new_cols=max_new_cols_per_group,
        )

    pp and pp.tick_abs("text", 1.0)
    pp and pp.end("text", extra={"cols": len(text_cols)})

    # ---- phase: finalize ---------------------------------------------------
    pp and pp.start("finalize")
    stats: Dict[str, Any] = {
        "warnings": warnings,
        "cols": {"numeric": numeric_cols, "low_card": low_card_cols, "text": text_cols},
        "n_rows_before": n_before,
        "n_rows_after": len(df),
    }
    pp and pp.tick_abs("finalize", 1.0, extra={"warn": len(warnings)})
    pp and pp.end("finalize", extra={"warn": len(warnings)})

    if verbose and warnings:
        print("\n".join(warnings))
    if local_bar is not None:
        pp.close()

    return df, stats