Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Simplified pipeline without Great Expectations validation | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import argparse | |
| import pandas as pd | |
| import mlflow | |
| import mlflow.sklearn | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import ( | |
| classification_report, precision_score, recall_score, | |
| f1_score, roc_auc_score | |
| ) | |
| from xgboost import XGBClassifier | |
| PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) | |
| # === Fix import path for local modules === | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) | |
| # Local modules - Core pipeline components | |
| from src.data.load_data import load_data # Data loading with error handling | |
| from src.data.preprocess import preprocess_data # Basic data cleaning | |
| from src.features.build_features import build_features # Feature engineering | |
| from src.models.train import train_model # Model training | |
| from src.models.evaluate import evaluate_model # Model evaluation | |
def _print_stage(title):
    """Print a stage banner: a blank line, a 50-character rule, *title*, and a closing rule."""
    print("\n" + "=" * 50)
    print(title)
    print("=" * 50)


def main(args):
    """Run the churn-prediction training pipeline end to end.

    Stages: load the raw CSV, preprocess it, build features, make a
    stratified train/test split, fit an XGBoost classifier, evaluate it at
    the requested probability threshold, and write the model plus inference
    metadata to ``<project_root>/artifacts``. Parameters and metrics are
    logged to MLflow inside a single run.

    Args:
        args: Namespace-like object with the attributes ``input`` (path to
            the raw CSV), ``target`` (target column name), ``threshold``
            (probability cut-off for the positive class) and ``test_size``
            (test-set proportion). The optional attributes ``mlflow_uri``
            and ``experiment`` override the default tracking URI
            (``file://<project_root>/mlruns``) and experiment name
            (``churn_prediction_simple``) when present and non-empty.

    Raises:
        ValueError: If the target column holds values other than
            ``'No'``/``'Yes'``, or if the training split contains no
            positive samples.
    """
    # Function-scope imports: these names are not imported at file level.
    import json

    import joblib
    from sklearn.metrics import accuracy_score

    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

    # === MLflow Setup ===
    # Honor the CLI overrides; fall back to the previous hard-coded values so
    # callers that pass a namespace without these attributes keep working.
    tracking_uri = getattr(args, "mlflow_uri", None) or f"file://{project_root}/mlruns"
    experiment_name = getattr(args, "experiment", None) or "churn_prediction_simple"
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment_name)

    # Start MLflow run
    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_param("model", "xgboost")
        mlflow.log_param("threshold", args.threshold)
        mlflow.log_param("test_size", args.test_size)

        print("=== Customer Churn Prediction Pipeline ===")
        print(f"Input: {args.input}")
        print(f"Target: {args.target}")

        # === STAGE 1: Data Loading ===
        _print_stage("STAGE 1: LOADING DATA")
        df = load_data(args.input)
        print(f"Data loaded: {df.shape[0]} rows, {df.shape[1]} columns")

        # Log basic data stats
        mlflow.log_param("dataset_rows", df.shape[0])
        mlflow.log_param("dataset_columns", df.shape[1])

        # === STAGE 2: Data Preprocessing ===
        _print_stage("STAGE 2: DATA PREPROCESSING")
        df = preprocess_data(df)

        # Save processed dataset
        processed_path = os.path.join(
            project_root, "data", "processed", "telco_churn_processed.csv"
        )
        os.makedirs(os.path.dirname(processed_path), exist_ok=True)
        df.to_csv(processed_path, index=False)
        print(f"Processed dataset saved to {processed_path}")

        # === STAGE 3: Feature Engineering ===
        _print_stage("STAGE 3: FEATURE ENGINEERING")
        target = args.target
        df_features = build_features(df, target)

        # Separate features and target
        feature_columns = [col for col in df_features.columns if col != target]
        X = df_features[feature_columns]
        y = df_features[target].map({'No': 0, 'Yes': 1})  # Convert to numeric

        # Series.map turns every value missing from the dict into NaN; fail
        # here with a clear message rather than deep inside the split or fit.
        if y.isna().any():
            raise ValueError(
                f"Target column {target!r} contains values other than 'No'/'Yes'; "
                "cannot convert to binary labels"
            )

        print(f"Features built: {X.shape[1]} columns")
        print(f"Target distribution: {y.value_counts().to_dict()}")

        # Log feature info
        mlflow.log_param("feature_count", X.shape[1])
        mlflow.log_param("target_positive_rate", y.mean())

        # === STAGE 4: Train/Test Split ===
        _print_stage("STAGE 4: TRAIN/TEST SPLIT")
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=args.test_size,
            random_state=42,
            stratify=y,  # Maintain class distribution
        )
        print(f"Train set: {X_train.shape[0]} samples")
        print(f"Test set: {X_test.shape[0]} samples")
        print(f"Train churn rate: {y_train.mean():.3f}")
        print(f"Test churn rate: {y_test.mean():.3f}")

        # === STAGE 5: Model Training ===
        _print_stage("STAGE 5: MODEL TRAINING")

        # Calculate class weight for imbalance
        negative_count = int((y_train == 0).sum())
        positive_count = int((y_train == 1).sum())
        if positive_count == 0:
            raise ValueError(
                "Training split contains no positive samples; "
                "cannot compute scale_pos_weight"
            )
        scale_pos_weight = negative_count / positive_count
        print(f"Class imbalance ratio (negative/positive): {scale_pos_weight:.2f}")

        # Create and train XGBoost model directly
        model = XGBClassifier(
            n_estimators=300,
            learning_rate=0.1,
            max_depth=6,
            random_state=42,
            n_jobs=-1,
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
        )
        model.fit(X_train, y_train)
        print("Model training completed")

        # === STAGE 6: Model Evaluation ===
        _print_stage("STAGE 6: MODEL EVALUATION")

        # Positive-class probabilities; hard labels come from the custom
        # threshold below, so model.predict() is not needed.
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        # Apply threshold for binary predictions
        y_pred_thresholded = (y_pred_proba >= args.threshold).astype(int)

        # Calculate metrics (ROC-AUC uses the raw probabilities, not the labels)
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred_thresholded),
            'precision': precision_score(y_test, y_pred_thresholded),
            'recall': recall_score(y_test, y_pred_thresholded),
            'f1': f1_score(y_test, y_pred_thresholded),
            'roc_auc': roc_auc_score(y_test, y_pred_proba),
        }

        # Print detailed results
        print(f"\nModel Performance Metrics (threshold={args.threshold}):")
        print(f"Accuracy: {metrics['accuracy']:.3f}")
        print(f"Precision: {metrics['precision']:.3f}")
        print(f"Recall: {metrics['recall']:.3f}")
        print(f"F1-Score: {metrics['f1']:.3f}")
        print(f"ROC-AUC: {metrics['roc_auc']:.3f}")

        # Also print classification report
        print("\nDetailed Classification Report:")
        print(classification_report(y_test, y_pred_thresholded))

        # Log metrics to MLflow
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)

        # === STAGE 7: Save Model and Artifacts ===
        _print_stage("STAGE 7: SAVE MODEL AND ARTIFACTS")

        # Save model with joblib
        artifacts_dir = os.path.join(project_root, "artifacts")
        os.makedirs(artifacts_dir, exist_ok=True)
        model_path = os.path.join(artifacts_dir, "model.pkl")
        joblib.dump(model, model_path)
        print(f"Model saved to {model_path}")

        # Save feature columns for inference
        feature_columns_path = os.path.join(artifacts_dir, "feature_columns.json")
        with open(feature_columns_path, 'w', encoding="utf-8") as f:
            json.dump(feature_columns, f)
        print(f"Feature columns saved to {feature_columns_path}")

        # Save threshold for inference
        threshold_path = os.path.join(artifacts_dir, "threshold.json")
        with open(threshold_path, 'w', encoding="utf-8") as f:
            json.dump({"threshold": args.threshold}, f)
        print(f"Classification threshold saved to {threshold_path}")

        _print_stage("PIPELINE COMPLETED SUCCESSFULLY!")
if __name__ == "__main__":
    # Command-line entry point: declare the options in a table, register
    # them in order, then hand the parsed namespace to main().
    cli = argparse.ArgumentParser(description="Run churn prediction pipeline")
    option_table = (
        ("--input", {"required": True, "help": "Path to raw CSV data"}),
        ("--target", {"default": "Churn", "help": "Target column name"}),
        ("--threshold", {"type": float, "default": 0.35, "help": "Classification threshold"}),
        ("--test-size", {"type": float, "default": 0.2, "help": "Test set proportion"}),
        ("--mlflow-uri", {"help": "MLflow tracking URI"}),
        ("--experiment", {"default": "churn_prediction_simple", "help": "MLflow experiment name"}),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    main(cli.parse_args())