""" SmartCertify ML — Preprocessing Pipeline Full sklearn pipeline for data cleaning, transformation, and balancing. """ import numpy as np import pandas as pd import logging from pathlib import Path from typing import Tuple, Optional from sklearn.pipeline import Pipeline from sklearn.compose import ColumnTransformer from sklearn.preprocessing import StandardScaler, FunctionTransformer from sklearn.impute import SimpleImputer from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.model_selection import train_test_split import sys sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) from app.config.settings import RANDOM_SEED, TEST_SIZE, DATASET_PATH, MODEL_DIR from app.utils.model_io import save_sklearn_model, load_sklearn_model logger = logging.getLogger(__name__) # ─── Feature Columns ───────────────────────────────────────── NUMERIC_FEATURES = [ "issuer_reputation_score", "certificate_age_days", "metadata_completeness_score", "ocr_confidence_score", "template_match_score", "domain_verification_status", "previous_verification_count", "time_since_last_verification_days", ] TEXT_FEATURES = ["issuer_name", "course_name"] DATE_FEATURES = ["issue_date", "expiry_date"] TARGET = "label" def extract_date_features(df: pd.DataFrame) -> pd.DataFrame: """Extract numeric features from date columns.""" df = df.copy() if "issue_date" in df.columns: issue_date = pd.to_datetime(df["issue_date"], errors="coerce") df["issue_month"] = issue_date.dt.month.fillna(0).astype(int) df["issue_year"] = issue_date.dt.year.fillna(0).astype(int) df["issue_dayofweek"] = issue_date.dt.dayofweek.fillna(0).astype(int) df["weekend_issued"] = (df["issue_dayofweek"] >= 5).astype(int) if "expiry_date" in df.columns and "issue_date" in df.columns: expiry_date = pd.to_datetime(df["expiry_date"], errors="coerce") days_to_expiry = (expiry_date - issue_date).dt.days df["days_to_expiry"] = days_to_expiry.fillna(0).astype(int) df["is_expired"] = (days_to_expiry < 0).astype(int) # Check for future issue dates (fraud signal) if "issue_date" in df.columns: now = pd.Timestamp.now() df["is_future_issue"] = (issue_date > now).astype(int) return df def build_preprocessor(X_train: pd.DataFrame) -> ColumnTransformer: """Build and fit the sklearn ColumnTransformer preprocessor.""" numeric_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) # All numeric columns including engineered ones numeric_cols = [c for c in X_train.columns if c not in TEXT_FEATURES + DATE_FEATURES + [TARGET, "credential_hash", "recipient_name", "combined_text"]] preprocessor = ColumnTransformer( transformers=[ ("num", numeric_pipeline, [c for c in numeric_cols if c in X_train.columns]), ("text", TfidfVectorizer(max_features=500, ngram_range=(1, 2), stop_words="english"), "combined_text"), ], remainder="drop", ) return preprocessor def _combine_text_columns(df: pd.DataFrame) -> pd.DataFrame: """Combine text feature columns into a single column for TF-IDF.""" df = df.copy() text_parts = [] for col in TEXT_FEATURES: if col in df.columns: text_parts.append(df[col].fillna("unknown").astype(str)) if text_parts: df["combined_text"] = text_parts[0] for part in text_parts[1:]: df["combined_text"] = df["combined_text"] + " " + part else: df["combined_text"] = "unknown" return df def prepare_data( df: Optional[pd.DataFrame] = None, apply_smote: bool = False, # Disabled by default to save memory ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, object]: """ Full data preparation pipeline: 1. Load data 2. Extract date features 3. Build and fit preprocessor 4. Apply SMOTE for class balancing 5. Return X_train, X_test, y_train, y_test, preprocessor """ if df is None: df = pd.read_csv(DATASET_PATH) logger.info(f"Loaded dataset: {df.shape}") # Extract date features df = extract_date_features(df) # Combine text columns into a single column df = _combine_text_columns(df) # Split features and target y = df[TARGET].values X = df.drop(columns=[TARGET, "credential_hash", "recipient_name"], errors="ignore") # Train-test split X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=TEST_SIZE, random_state=RANDOM_SEED, stratify=y ) logger.info(f"Train: {X_train.shape}, Test: {X_test.shape}") logger.info(f"Train class distribution: {np.bincount(y_train)}") # Build and fit preprocessor preprocessor = build_preprocessor(X_train) X_train_processed = preprocessor.fit_transform(X_train) X_test_processed = preprocessor.transform(X_test) # Convert sparse matrices to dense if hasattr(X_train_processed, "toarray"): X_train_processed = X_train_processed.toarray() if hasattr(X_test_processed, "toarray"): X_test_processed = X_test_processed.toarray() # Apply SMOTE on training data only if apply_smote: try: from imblearn.over_sampling import SMOTE smote = SMOTE(random_state=RANDOM_SEED) X_train_processed, y_train = smote.fit_resample(X_train_processed, y_train) logger.info(f"After SMOTE — Train: {X_train_processed.shape}, Class dist: {np.bincount(y_train)}") except ImportError: logger.warning("imblearn not installed, skipping SMOTE") # Save the preprocessor save_sklearn_model(preprocessor, "preprocessor.joblib", metadata={"n_features": X_train_processed.shape[1]}) logger.info("Saved preprocessor to saved_models/preprocessor.joblib") return X_train_processed, X_test_processed, y_train, y_test, preprocessor def preprocess_single(data: dict, preprocessor=None) -> np.ndarray: """Preprocess a single certificate record for inference.""" if preprocessor is None: preprocessor = load_sklearn_model("preprocessor.joblib") df = pd.DataFrame([data]) df = extract_date_features(df) df = _combine_text_columns(df) df = df.drop(columns=["credential_hash", "recipient_name", TARGET], errors="ignore") processed = preprocessor.transform(df) if hasattr(processed, "toarray"): processed = processed.toarray() return processed def main(): """Run the preprocessing pipeline.""" print("Running preprocessing pipeline...") X_train, X_test, y_train, y_test, preprocessor = prepare_data() print(f"\n✅ Preprocessing complete:") print(f" • X_train shape: {X_train.shape}") print(f" • X_test shape: {X_test.shape}") print(f" • y_train distribution: {dict(zip(*np.unique(y_train, return_counts=True)))}") print(f" • y_test distribution: {dict(zip(*np.unique(y_test, return_counts=True)))}") print(f" • Preprocessor saved to: saved_models/preprocessor.joblib") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) main()