#!/usr/bin/env python3
"""
Simplified pipeline without Great Expectations validation.

Stages: load raw CSV -> preprocess -> build features -> stratified
train/test split -> train XGBoost -> evaluate at a configurable probability
threshold -> save model / feature order / threshold for inference.
Parameters and metrics are logged to MLflow.
"""

import argparse
import json
import os
import sys
import time
from pathlib import Path

import joblib
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from xgboost import XGBClassifier

PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# === Fix import path for local modules ===
# Make the repository root importable so the ``src.*`` packages resolve when
# this file is executed directly as a script.
sys.path.append(PROJECT_ROOT)

# Local modules - Core pipeline components
from src.data.load_data import load_data  # Data loading with error handling
from src.data.preprocess import preprocess_data  # Basic data cleaning
from src.features.build_features import build_features  # Feature engineering
# NOTE(review): train_model / evaluate_model are imported but not used below;
# training and evaluation are done inline in main().
from src.models.train import train_model  # Model training
from src.models.evaluate import evaluate_model  # Model evaluation

DEFAULT_EXPERIMENT = "churn_prediction_simple"
RANDOM_STATE = 42
# String labels expected in the target column and their numeric encoding.
TARGET_MAPPING = {"No": 0, "Yes": 1}
# Fixed XGBoost hyperparameters (scale_pos_weight is computed per run).
XGB_PARAMS = {
    "n_estimators": 300,
    "learning_rate": 0.1,
    "max_depth": 6,
    "random_state": RANDOM_STATE,
    "n_jobs": -1,
    "eval_metric": "logloss",
}


def _print_stage(title):
    """Print a framed banner announcing a pipeline stage."""
    print("\n" + "=" * 50)
    print(title)
    print("=" * 50)


def _validate_args(args):
    """Reject argument values that would silently produce a meaningless run.

    Raises:
        ValueError: if the classification threshold lies outside [0, 1].
    """
    if not 0.0 <= args.threshold <= 1.0:
        raise ValueError(f"threshold must be in [0, 1], got {args.threshold}")


def _configure_mlflow(args):
    """Point MLflow at the requested tracking URI and experiment.

    Uses ``args.mlflow_uri`` / ``args.experiment`` when provided (the
    ``--mlflow-uri`` / ``--experiment`` CLI flags); otherwise falls back to a
    local file store at PROJECT_ROOT/mlruns and DEFAULT_EXPERIMENT.
    ``getattr`` keeps callers that pass a namespace without these attributes
    working.

    Returns:
        (tracking_uri, experiment_name) actually used.
    """
    # Path.as_uri() builds a valid file:// URI on every platform (the old
    # f"file://{path}" form broke on Windows drive letters and spaces).
    tracking_uri = getattr(args, "mlflow_uri", None) or Path(PROJECT_ROOT, "mlruns").as_uri()
    experiment = getattr(args, "experiment", None) or DEFAULT_EXPERIMENT
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment)
    return tracking_uri, experiment


def _encode_target(series):
    """Return the target column as integer 0/1 labels.

    String labels are stripped of surrounding whitespace and mapped through
    TARGET_MAPPING ('No' -> 0, 'Yes' -> 1). Columns that are already numeric
    (or boolean) are used as-is, since re-mapping them would turn every value
    into NaN.

    Raises:
        ValueError: if any label is missing, unmapped, or not binary.
    """
    if pd.api.types.is_numeric_dtype(series):
        encoded = series
    else:
        encoded = series.astype(str).str.strip().map(TARGET_MAPPING)

    unmapped = encoded.isna()
    if unmapped.any():
        examples = sorted(series[unmapped].astype(str).unique())[:5]
        raise ValueError(
            f"Target column contains missing or unmapped labels: {examples}"
        )

    encoded = encoded.astype(int)
    if not set(encoded.unique()) <= {0, 1}:
        raise ValueError(
            f"Target must be binary (0/1); found values {sorted(encoded.unique())}"
        )
    return encoded


def _compute_metrics(y_true, y_proba, threshold):
    """Score positive-class probabilities binarised at ``threshold``.

    Returns:
        (metrics, y_pred): a dict of accuracy/precision/recall/f1 computed on
        the thresholded predictions plus ROC-AUC computed on the raw
        probabilities, and the thresholded 0/1 prediction array.
    """
    y_pred = (y_proba >= threshold).astype(int)
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred),
        "recall": recall_score(y_true, y_pred),
        "f1": f1_score(y_true, y_pred),
        "roc_auc": roc_auc_score(y_true, y_proba),
    }
    return metrics, y_pred


def _save_artifacts(artifacts_dir, model, feature_columns, threshold):
    """Persist the model, feature column order and threshold for inference.

    Writes ``model.pkl`` (joblib), ``feature_columns.json`` and
    ``threshold.json`` into ``artifacts_dir``, creating it if needed.
    """
    os.makedirs(artifacts_dir, exist_ok=True)

    model_path = os.path.join(artifacts_dir, "model.pkl")
    joblib.dump(model, model_path)
    print(f"Model saved to {model_path}")

    feature_columns_path = os.path.join(artifacts_dir, "feature_columns.json")
    with open(feature_columns_path, "w", encoding="utf-8") as f:
        json.dump(feature_columns, f)
    print(f"Feature columns saved to {feature_columns_path}")

    threshold_path = os.path.join(artifacts_dir, "threshold.json")
    with open(threshold_path, "w", encoding="utf-8") as f:
        json.dump({"threshold": threshold}, f)
    print(f"Classification threshold saved to {threshold_path}")


def main(args):
    """Run the end-to-end churn training pipeline.

    Args:
        args: namespace with ``input`` (raw CSV path), ``target`` (target
            column name), ``threshold`` (probability cut-off) and
            ``test_size`` (test split proportion); ``mlflow_uri`` and
            ``experiment`` are optional.

    Side effects:
        Writes the processed dataset to data/processed/, model and inference
        metadata to artifacts/ (both under PROJECT_ROOT), and logs params and
        metrics to an MLflow run.

    Raises:
        ValueError: on an out-of-range threshold or an unusable target column.
        KeyError: if the target column is absent after feature engineering.
    """
    _validate_args(args)
    _configure_mlflow(args)

    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_param("model", "xgboost")
        mlflow.log_param("threshold", args.threshold)
        mlflow.log_param("test_size", args.test_size)

        print("=== Customer Churn Prediction Pipeline ===")
        print(f"Input: {args.input}")
        print(f"Target: {args.target}")

        # === STAGE 1: Data Loading ===
        _print_stage("STAGE 1: LOADING DATA")
        df = load_data(args.input)
        print(f"Data loaded: {df.shape[0]} rows, {df.shape[1]} columns")
        mlflow.log_param("dataset_rows", df.shape[0])
        mlflow.log_param("dataset_columns", df.shape[1])

        # === STAGE 2: Data Preprocessing ===
        _print_stage("STAGE 2: DATA PREPROCESSING")
        df = preprocess_data(df)
        processed_path = os.path.join(
            PROJECT_ROOT, "data", "processed", "telco_churn_processed.csv"
        )
        os.makedirs(os.path.dirname(processed_path), exist_ok=True)
        df.to_csv(processed_path, index=False)
        print(f"Processed dataset saved to {processed_path}")

        # === STAGE 3: Feature Engineering ===
        _print_stage("STAGE 3: FEATURE ENGINEERING")
        target = args.target
        df_features = build_features(df, target)
        if target not in df_features.columns:
            raise KeyError(f"Target column {target!r} not found after feature engineering")

        feature_columns = [col for col in df_features.columns if col != target]
        X = df_features[feature_columns]
        y = _encode_target(df_features[target])

        print(f"Features built: {X.shape[1]} columns")
        print(f"Target distribution: {y.value_counts().to_dict()}")
        mlflow.log_param("feature_count", X.shape[1])
        mlflow.log_param("target_positive_rate", y.mean())

        # === STAGE 4: Train/Test Split ===
        _print_stage("STAGE 4: TRAIN/TEST SPLIT")
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=args.test_size,
            random_state=RANDOM_STATE,
            stratify=y,  # Maintain class distribution in both splits
        )
        print(f"Train set: {X_train.shape[0]} samples")
        print(f"Test set: {X_test.shape[0]} samples")
        print(f"Train churn rate: {y_train.mean():.3f}")
        print(f"Test churn rate: {y_test.mean():.3f}")

        # === STAGE 5: Model Training ===
        _print_stage("STAGE 5: MODEL TRAINING")
        # Up-weight the positive (churn) class by the negative/positive ratio.
        scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()
        print(f"Class imbalance ratio (negative/positive): {scale_pos_weight:.2f}")

        mlflow.log_params(XGB_PARAMS)
        mlflow.log_param("scale_pos_weight", float(scale_pos_weight))

        model = XGBClassifier(**XGB_PARAMS, scale_pos_weight=scale_pos_weight)
        model.fit(X_train, y_train)
        print("Model training completed")

        # === STAGE 6: Model Evaluation ===
        _print_stage("STAGE 6: MODEL EVALUATION")
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        metrics, y_pred_thresholded = _compute_metrics(
            y_test, y_pred_proba, args.threshold
        )

        print(f"\nModel Performance Metrics (threshold={args.threshold}):")
        print(f"Accuracy: {metrics['accuracy']:.3f}")
        print(f"Precision: {metrics['precision']:.3f}")
        print(f"Recall: {metrics['recall']:.3f}")
        print(f"F1-Score: {metrics['f1']:.3f}")
        print(f"ROC-AUC: {metrics['roc_auc']:.3f}")

        print("\nDetailed Classification Report:")
        print(classification_report(y_test, y_pred_thresholded))

        mlflow.log_metrics(metrics)

        # === STAGE 7: Save Model and Artifacts ===
        _print_stage("STAGE 7: SAVE MODEL AND ARTIFACTS")
        _save_artifacts(
            os.path.join(PROJECT_ROOT, "artifacts"),
            model,
            feature_columns,
            args.threshold,
        )

        _print_stage("PIPELINE COMPLETED SUCCESSFULLY!")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run churn prediction pipeline")
    parser.add_argument("--input", required=True, help="Path to raw CSV data")
    parser.add_argument("--target", default="Churn", help="Target column name")
    parser.add_argument("--threshold", type=float, default=0.35, help="Classification threshold")
    parser.add_argument("--test-size", type=float, default=0.2, help="Test set proportion")
    parser.add_argument(
        "--mlflow-uri",
        help="MLflow tracking URI (default: local file store at <project>/mlruns)",
    )
    parser.add_argument("--experiment", default=DEFAULT_EXPERIMENT, help="MLflow experiment name")
    args = parser.parse_args()
    main(args)