# PayShield-ML / src/models/pipeline.py
# Author: Sibi Krishnamoorthy
# Branch: prod
# Commit: 8a08300
"""
Feature Engineering Pipeline.
Constructs a robust Scikit-Learn pipeline for fraud detection.
Includes custom transformers for feature extraction and standard transformers
for scaling and encoding.
Derived from notebook analysis:
- Categorical: WOE Encoding (job, category)
- Numerical: Robust Scaling (amt, distance)
- Time: Cyclical encoding (sin/cos)
- Geo: Haversine distance
"""
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from category_encoders import WOEEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler
from xgboost import XGBClassifier
class FraudFeatureExtractor(BaseEstimator, TransformerMixin):
    """
    Custom transformer that computes derived features for fraud detection.

    Implements the feature engineering logic from the research notebook:
      1. Cyclical time features (hour / day-of-week sin/cos)
      2. Age derived from date of birth at transaction time
      3. Haversine distance between cardholder and merchant
      4. Log transformation of the transaction amount
      5. Gender mapping (M=1, F=0)

    Each derivation is guarded by a column-presence check, so the
    transformer degrades gracefully when optional raw columns are absent.
    """

    def __init__(self):
        # No hyperparameters; defined explicitly so sklearn clone() works.
        pass

    def fit(self, X, y=None):
        """Stateless transformer — nothing to learn; returns self."""
        return self

    def transform(self, X):
        """
        Compute derived features.

        Args:
            X: DataFrame with raw transaction columns.

        Returns:
            DataFrame with additional feature columns appended.
        """
        # Work on a copy so the caller's dataframe is never mutated.
        X = X.copy()

        # 1. Date/Time features.
        if "trans_date_trans_time" in X.columns:
            # Convert to datetime if the column arrived as strings.
            if X["trans_date_trans_time"].dtype == "object":
                X["trans_date_trans_time"] = pd.to_datetime(X["trans_date_trans_time"])
            dt = X["trans_date_trans_time"].dt
            # Cyclical encoding keeps 23:00 adjacent to 00:00 (hour, 0-23)
            # and Sunday adjacent to Monday (day of week, 0-6).
            X["hour_sin"] = np.sin(2 * np.pi * dt.hour / 24)
            X["hour_cos"] = np.cos(2 * np.pi * dt.hour / 24)
            X["day_sin"] = np.sin(2 * np.pi * dt.dayofweek / 7)
            X["day_cos"] = np.cos(2 * np.pi * dt.dayofweek / 7)
            # Age approximated as (transaction year - birth year); only
            # derivable when the transaction timestamp is also present.
            if "dob" in X.columns:
                if X["dob"].dtype == "object":
                    X["dob"] = pd.to_datetime(X["dob"])
                X["age"] = dt.year - X["dob"].dt.year

        # 2. Geolocation: great-circle distance cardholder -> merchant.
        if all(c in X.columns for c in ["lat", "long", "merch_lat", "merch_long"]):
            X["distance_km"] = self._haversine_distance(
                X["lat"], X["long"], X["merch_lat"], X["merch_long"]
            )

        # 3. log1p compresses the heavy right tail of transaction amounts.
        if "amt" in X.columns:
            X["amt_log"] = np.log1p(X["amt"])

        # 4. Gender mapping (M=1, F=0).
        # FIX: only map string-typed columns. The original re-mapped
        # unconditionally, so calling transform() twice (or feeding
        # pre-encoded input) produced all-NaN and astype(int) raised.
        if "gender" in X.columns and X["gender"].dtype == "object":
            X["gender"] = X["gender"].map({"M": 1, "F": 0}).astype(int)

        return X

    def _haversine_distance(self, lat1, lon1, lat2, lon2):
        """
        Calculate the great circle distance in kilometers between two
        points on the earth (specified in decimal degrees).

        Accepts scalars or aligned pandas/numpy arrays; vectorized via numpy.
        """
        # Convert decimal degrees to radians.
        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        # Haversine formula.
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
        c = 2 * np.arcsin(np.sqrt(a))
        r = 6371  # Mean radius of earth in kilometers.
        return c * r
def create_fraud_pipeline(params: Dict[str, Any]) -> Pipeline:
    """
    Create a complete training pipeline.

    FIX: the annotation was ``Dict[str, any]`` — the builtin ``any``
    function, not ``typing.Any`` — which breaks static type checking.

    Args:
        params: Hyperparameters for the XGBoost model. Recognized keys
            (all optional, with defaults): ``max_depth`` (6),
            ``learning_rate`` (0.1), ``n_estimators`` (100),
            ``scale_pos_weight`` (100).

    Returns:
        Sklearn Pipeline: FraudFeatureExtractor -> ColumnTransformer
        -> XGBClassifier.
    """
    # Feature groups. These columns are produced either upstream or by
    # FraudFeatureExtractor (see its transform()).
    categorical_features = ["job", "category"]
    # Continuous, unbounded features — RobustScaler resists the outliers
    # typical of transaction amounts/distances.
    numerical_features = [
        "amt_log",
        "age",
        "distance_km",
        "trans_count_24h",
        "amt_to_avg_ratio_24h",
        "amt_relative_to_all_time",
    ]
    # Binary features (0/1, no processing needed).
    binary_features = ["gender"]
    # Cyclical features (already in [-1, 1], no processing needed).
    cyclical_features = ["hour_sin", "hour_cos", "day_sin", "day_cos"]

    # Preprocessing: WOE for categoricals, robust scaling for numerics,
    # passthrough for features that are already model-ready.
    preprocessor = ColumnTransformer(
        transformers=[
            ("cat", WOEEncoder(sigma=0.05, regularization=1.0), categorical_features),
            ("num", RobustScaler(), numerical_features),
            ("binary", "passthrough", binary_features),
            ("cyclical", "passthrough", cyclical_features),
        ],
        remainder="drop",  # Drop unused columns (raw lat/long/timestamps).
        verbose_feature_names_out=False,
    )

    # Full pipeline: feature extraction -> preprocessing -> classifier.
    pipeline = Pipeline(
        [
            ("features", FraudFeatureExtractor()),
            ("preprocessor", preprocessor),
            (
                "model",
                XGBClassifier(
                    tree_method="hist",
                    max_depth=params.get("max_depth", 6),
                    learning_rate=params.get("learning_rate", 0.1),
                    n_estimators=params.get("n_estimators", 100),
                    objective="binary:logistic",
                    eval_metric="aucpr",  # PR-AUC suits heavy class imbalance.
                    random_state=42,
                    n_jobs=-1,
                    scale_pos_weight=params.get("scale_pos_weight", 100),  # Handle class imbalance
                ),
            ),
        ]
    )
    return pipeline