Zalaid's picture
Step 3: data preprocessing pipeline with scaling, train/test split, and SMOTE
41e2db1
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from imblearn.over_sampling import SMOTE
import joblib
# --- Reproducibility / split tunables ---------------------------------------
# Seed shared by the train/test split and by SMOTE so runs are repeatable.
RANDOM_STATE = 42
# Fraction of rows held out as the test set.
TEST_SIZE = 0.20

# --- Filesystem layout (relative to the working directory) ------------------
# Raw input CSV read by load_data().
RAW_PATH = os.path.join('data', 'raw', 'creditcard.csv')
# Directory that receives the pickled train/test splits.
PROCESSED_DIR = os.path.join('data', 'processed')
# Where the fitted scaler is persisted.
SCALER_PATH = os.path.join('models', 'scaler.pkl')
def load_data(path: str = RAW_PATH) -> pd.DataFrame:
    """
    Read the CSV at `path` into a DataFrame and report its dimensions.

    Args:
        path: Location of the CSV file; defaults to RAW_PATH.

    Returns:
        The loaded DataFrame, unmodified.
    """
    frame = pd.read_csv(path)
    n_rows, n_cols = frame.shape
    print(f"Loaded : {n_rows:,} rows, {n_cols} columns")
    return frame
def scale_features(df: pd.DataFrame, fit: bool = True, scaler=None):
    """
    Scale Amount and Time with RobustScaler (resistant to outliers).
    V1-V28 are already PCA-scaled — leave them untouched.

    Args:
        df: Frame containing at least the 'Amount' and 'Time' columns.
            The input is not mutated; a copy is scaled and returned.
        fit: When True, fit a fresh RobustScaler on `df` and persist it to
            SCALER_PATH. When False, reuse the supplied `scaler`.
        scaler: An already-fitted scaler exposing ``transform``. Required
            when `fit` is False; ignored (replaced) when `fit` is True.

    Returns:
        A ``(scaled_df, scaler)`` tuple: the scaled copy of `df` and the
        scaler that produced it.

    Raises:
        ValueError: If `fit` is False and no `scaler` was provided.
    """
    # Fail early with a clear message instead of an AttributeError on None.
    if not fit and scaler is None:
        raise ValueError(
            "scale_features(fit=False) requires an already-fitted `scaler`"
        )

    cols = ['Amount', 'Time']
    df = df.copy()
    if fit:
        scaler = RobustScaler()
        df[cols] = scaler.fit_transform(df[cols])
        # dirname is '' for a bare filename; os.makedirs('') would raise.
        scaler_dir = os.path.dirname(SCALER_PATH)
        if scaler_dir:
            os.makedirs(scaler_dir, exist_ok=True)
        joblib.dump(scaler, SCALER_PATH)
        print(f"Scaler : fitted and saved to {SCALER_PATH}")
    else:
        df[cols] = scaler.transform(df[cols])
    return df, scaler
def split_data(df: pd.DataFrame):
    """
    Separate features from the 'Class' label and make a stratified split.

    Args:
        df: Frame holding the feature columns plus a 'Class' label column.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as produced by
        ``train_test_split`` with TEST_SIZE and RANDOM_STATE.
    """
    labels = df['Class']
    features = df.drop(columns='Class')

    # Stratifying on the label keeps the class ratio the same in both splits.
    split_options = {
        'test_size': TEST_SIZE,
        'random_state': RANDOM_STATE,
        'stratify': labels,
    }
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, **split_options
    )

    print(f"Train : {X_train.shape[0]:,} rows (fraud: {y_train.sum():,})")
    print(f"Test : {X_test.shape[0]:,} rows (fraud: {y_test.sum():,})")
    return X_train, X_test, y_train, y_test
def apply_smote(X_train: pd.DataFrame, y_train: pd.Series):
    """
    Oversample the training set with SMOTE and report class counts.

    SMOTE is applied ONLY to training data — never to test data.
    It creates synthetic fraud examples so the model sees a balanced dataset.

    Args:
        X_train: Training features.
        y_train: Training labels; 0 is counted as normal and 1 as fraud in
            the printed summary.

    Returns:
        ``(X_res, y_res)``: the resampled features and labels returned by
        ``SMOTE.fit_resample``.
    """
    print(f"Before SMOTE — normal: {(y_train==0).sum():,} fraud: {(y_train==1).sum():,}")
    # Seeded so the synthetic samples are reproducible across runs.
    sm = SMOTE(random_state=RANDOM_STATE)
    X_res, y_res = sm.fit_resample(X_train, y_train)
    print(f"After SMOTE — normal: {(y_res==0).sum():,} fraud: {(y_res==1).sum():,}")
    return X_res, y_res
def save_splits(X_train, X_test, y_train, y_test, X_train_sm, y_train_sm):
    """
    Pickle every split into PROCESSED_DIR with joblib.

    The directory is created if it does not exist. Files written, in order:
    X_train_raw.pkl, X_test.pkl, y_train_raw.pkl, y_test.pkl,
    X_train_smote.pkl, y_train_smote.pkl.
    """
    os.makedirs(PROCESSED_DIR, exist_ok=True)

    # (filename, object) pairs, kept in the order they are written to disk.
    artifacts = (
        ('X_train_raw.pkl', X_train),
        ('X_test.pkl', X_test),
        ('y_train_raw.pkl', y_train),
        ('y_test.pkl', y_test),
        ('X_train_smote.pkl', X_train_sm),
        ('y_train_smote.pkl', y_train_sm),
    )
    for filename, payload in artifacts:
        joblib.dump(payload, os.path.join(PROCESSED_DIR, filename))

    print(f"Saved : all splits to {PROCESSED_DIR}/")
def run():
    """
    Execute the full preprocessing pipeline end to end.

    Steps: load the raw CSV, make a stratified train/test split, fit the
    scaler on the training split only and apply it to the test split,
    oversample the training split with SMOTE, then persist all splits.

    Returns:
        ``(X_train, X_test, y_train, y_test, X_train_sm, y_train_sm, scaler)``
        where X_train/X_test are scaled and the ``_sm`` pair is the
        SMOTE-resampled training data.
    """
    print("=" * 45)
    print(" Preprocessing Pipeline")
    print("=" * 45)
    df = load_data()
    # Split BEFORE scaling: fitting the scaler on the full dataset would leak
    # test-set statistics (median / IQR) into the training transform.
    X_train, X_test, y_train, y_test = split_data(df)
    X_train, scaler = scale_features(X_train, fit=True)
    # Reuse the train-fitted scaler so the test set is transformed, not fitted.
    X_test, _ = scale_features(X_test, fit=False, scaler=scaler)
    X_train_sm, y_train_sm = apply_smote(X_train, y_train)
    save_splits(X_train, X_test, y_train, y_test, X_train_sm, y_train_sm)
    print("=" * 45)
    print(" Done — preprocessing complete")
    print("=" * 45)
    return X_train, X_test, y_train, y_test, X_train_sm, y_train_sm, scaler
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    run()