# fraud-detection-mlops-api — src/preprocessing.py
# (snapshot synced from GitHub by github-actions[bot]; commit 4937cba, "deploy: sync snapshot from github")
"""Training/inference preprocessing pipeline utilities."""
from __future__ import annotations
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
# Name of the label column separated from the features during splitting.
TARGET_COLUMN = "Class"
# Columns standard-scaled by default; all other feature columns pass through unchanged.
SCALE_COLUMNS = ["Time", "Amount"]
def split_data(
    df: pd.DataFrame,
    *,
    target_column: str = TARGET_COLUMN,
    test_size: float = 0.2,
    random_state: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Split *df* into stratified train/test feature and label sets.

    Args:
        df: Dataset containing features plus the label column.
        target_column: Name of the label column to strip out and stratify on.
        test_size: Fraction of rows held out for the test split; must be in (0, 1).
        random_state: Seed forwarded to ``train_test_split`` for reproducibility.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as produced by ``train_test_split``.

    Raises:
        ValueError: If ``target_column`` is absent or ``test_size`` is out of range.
    """
    if target_column not in df.columns:
        raise ValueError(f"Missing target column: {target_column}")
    if not 0 < test_size < 1:
        raise ValueError("test_size must be between 0 and 1")
    labels = df[target_column]
    features = df.drop(columns=[target_column])
    # Stratify on the labels so the rare fraud class keeps its ratio in both splits.
    return train_test_split(
        features,
        labels,
        test_size=test_size,
        random_state=random_state,
        stratify=labels,
    )
def scale_features(
    df: pd.DataFrame,
    *,
    columns: list[str] | None = None,
    scaler: StandardScaler | None = None,
) -> tuple[pd.DataFrame, StandardScaler]:
    """Scale selected columns, returning a transformed copy and the scaler used.

    Args:
        df: Input dataframe; a copy is returned, *df* is not modified.
        columns: Columns to scale; defaults to ``SCALE_COLUMNS``.
        scaler: An already-fitted scaler to reuse (inference path). When None,
            a new ``StandardScaler`` is fitted on ``df`` (training path).

    Returns:
        Tuple of ``(scaled copy of df, scaler that produced it)``.

    Raises:
        ValueError: If any requested column is missing from ``df``.
    """
    scale_columns = columns or SCALE_COLUMNS
    missing = [column for column in scale_columns if column not in df.columns]
    if missing:
        raise ValueError(f"Columns not found for scaling: {missing}")
    result = df.copy()
    if scaler is None:
        # Training path: fit a fresh scaler on this data.
        scaler = StandardScaler()
        result[scale_columns] = scaler.fit_transform(df[scale_columns])
    else:
        # Inference path: apply the scaler's learned statistics. The previous
        # implementation always called fit_transform, silently refitting a
        # caller-supplied scaler on the new data (test-set leakage and
        # inconsistent train/serve scaling).
        result[scale_columns] = scaler.transform(df[scale_columns])
    return result, scaler
def build_preprocessor(
    feature_columns: list[str],
    *,
    scale_columns: list[str] | None = None,
) -> ColumnTransformer:
    """Create the ColumnTransformer shared by training and inference.

    Standard-scales ``scale_columns`` (default ``SCALE_COLUMNS``) while every
    remaining feature column passes through unchanged; output is kept as a
    pandas DataFrame via ``set_output``.

    Args:
        feature_columns: All feature column names the transformer will see.
        scale_columns: Columns to standard-scale; falls back to ``SCALE_COLUMNS``.

    Returns:
        An unfitted ``ColumnTransformer`` configured for pandas output.

    Raises:
        ValueError: If any scale column is not among ``feature_columns``.
    """
    chosen = scale_columns or SCALE_COLUMNS
    absent = [name for name in chosen if name not in feature_columns]
    if absent:
        raise ValueError(f"Scale columns missing from features: {absent}")
    transformer = ColumnTransformer(
        transformers=[("scale", StandardScaler(), chosen)],
        remainder="passthrough",
        verbose_feature_names_out=False,
    )
    transformer.set_output(transform="pandas")
    return transformer
def transform_features(
    preprocessor: ColumnTransformer,
    X: pd.DataFrame,
) -> pd.DataFrame:
    """Apply a fitted preprocessor to *X*, always returning a DataFrame.

    Args:
        preprocessor: A fitted transformer exposing ``transform`` (and
            ``get_feature_names_out`` when its output is not a DataFrame).
        X: Feature dataframe to transform.

    Returns:
        The transformed features as a pandas DataFrame with named columns.
    """
    result = preprocessor.transform(X)
    if isinstance(result, pd.DataFrame):
        return result
    # Some configurations return a bare ndarray; rebuild column labels from
    # the transformer so downstream code can keep using named columns.
    return pd.DataFrame(result, columns=preprocessor.get_feature_names_out())
def handle_imbalance(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    *,
    method: str = "class_weight",
    random_state: int = 42,
    sampling_strategy: float = 0.5,
) -> tuple[pd.DataFrame, pd.Series, dict[str, Any]]:
    """Apply the requested class-imbalance strategy to the training split.

    Args:
        X_train: Training features.
        y_train: Training labels.
        method: One of ``"none"`` (no-op), ``"class_weight"`` (compute balanced
            weights, data unchanged), or ``"smote"`` (oversample minority class).
        random_state: Seed for SMOTE resampling.
        sampling_strategy: Minority/majority ratio passed to SMOTE.

    Returns:
        ``(X, y, metadata)`` where metadata records the method and, for the
        class-weight strategy, the ``{label: weight}`` mapping.

    Raises:
        ValueError: If ``method`` is not a recognised strategy.
    """
    strategy = method.lower()
    if strategy not in {"none", "class_weight", "smote"}:
        raise ValueError("method must be one of: none, class_weight, smote")
    if strategy == "none":
        return X_train, y_train, {"method": "none", "class_weight": None}
    if strategy == "class_weight":
        # Balanced weights leave the data untouched; the model applies them.
        label_values = np.array(sorted(y_train.unique().tolist()))
        weight_values = compute_class_weight(
            class_weight="balanced", classes=label_values, y=y_train
        )
        weight_map = {
            int(label): float(weight)
            for label, weight in zip(label_values, weight_values)
        }
        return X_train, y_train, {"method": "class_weight", "class_weight": weight_map}
    # SMOTE path: synthesize minority-class rows, then restore pandas containers.
    resampler = SMOTE(random_state=random_state, sampling_strategy=sampling_strategy)
    X_res, y_res = resampler.fit_resample(X_train, y_train)
    return (
        pd.DataFrame(X_res, columns=X_train.columns),
        pd.Series(y_res, name=y_train.name),
        {"method": "smote", "class_weight": None},
    )
def save_preprocessor(preprocessor: ColumnTransformer, output_path: str | Path) -> Path:
    """Serialize a fitted preprocessor to disk with joblib.

    Parent directories are created as needed.

    Args:
        preprocessor: The fitted transformer to persist.
        output_path: Destination file path (string or ``Path``).

    Returns:
        The ``Path`` the artifact was written to.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(preprocessor, destination)
    return destination
def load_preprocessor(preprocessor_path: str | Path) -> ColumnTransformer:
    """Load a previously persisted preprocessor from disk.

    NOTE(review): joblib.load unpickles arbitrary objects — only load
    artifacts produced by this project's own training pipeline.
    """
    return joblib.load(Path(preprocessor_path))
def preprocess_for_training(
    df: pd.DataFrame,
    *,
    target_column: str = TARGET_COLUMN,
    test_size: float = 0.2,
    random_state: int = 42,
    imbalance_method: str = "class_weight",
    preprocessor_path: str | Path = "models/preprocessor.pkl",
) -> dict[str, Any]:
    """End-to-end preprocessing: split, fit/apply transforms, rebalance, persist.

    Args:
        df: Raw dataset including the target column.
        target_column: Label column name for splitting and stratification.
        test_size: Held-out fraction for the test split.
        random_state: Seed shared by the split and the imbalance handling.
        imbalance_method: Strategy passed to ``handle_imbalance``.
        preprocessor_path: Where the fitted preprocessor is saved.

    Returns:
        Dict with keys ``X_train``, ``X_test``, ``y_train``, ``y_test``,
        ``preprocessor`` and ``imbalance_metadata``.
    """
    train_features, test_features, y_train, y_test = split_data(
        df,
        target_column=target_column,
        test_size=test_size,
        random_state=random_state,
    )
    # Fit only on the training split so test-set statistics never leak in.
    preprocessor = build_preprocessor(feature_columns=train_features.columns.tolist())
    preprocessor.fit(train_features)
    X_train = transform_features(preprocessor, train_features)
    X_test = transform_features(preprocessor, test_features)
    X_train_final, y_train_final, imbalance_metadata = handle_imbalance(
        X_train,
        y_train,
        method=imbalance_method,
        random_state=random_state,
    )
    save_preprocessor(preprocessor, preprocessor_path)
    return {
        "X_train": X_train_final,
        "X_test": X_test,
        "y_train": y_train_final,
        "y_test": y_test,
        "preprocessor": preprocessor,
        "imbalance_metadata": imbalance_metadata,
    }