# Source: telco-churn-predictor/scripts/run_simple_pipeline.py
# Author avatar caption (page residue): logan-codes's picture
# Commit message: Add Dockerfile, Gradio app, and core src modules
# Commit: 4ba360f
#!/usr/bin/env python3
"""
Simplified pipeline without Great Expectations validation
"""
import os
import sys
import time
import argparse
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
classification_report, precision_score, recall_score,
f1_score, roc_auc_score
)
from xgboost import XGBClassifier
# Absolute path of the repository root, i.e. the parent of the directory
# that contains this script.
PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)

# === Fix import path for local modules ===
# Appending the repository root lets the `src.*` imports below resolve when
# this file is executed directly as a script.
sys.path.append(PROJECT_ROOT)
# Local modules - Core pipeline components
from src.data.load_data import load_data # Data loading with error handling
from src.data.preprocess import preprocess_data # Basic data cleaning
from src.features.build_features import build_features # Feature engineering
from src.models.train import train_model # Model training
from src.models.evaluate import evaluate_model # Model evaluation
def _print_stage_banner(title):
    """Print *title* framed by two 50-character ``=`` rules after a blank line."""
    rule = "=" * 50
    print("\n" + rule)
    print(title)
    print(rule)


def main(args):
    """Run the churn-prediction pipeline end to end inside one MLflow run.

    Stages: load the raw CSV, preprocess it, build features, make a
    stratified train/test split, fit an XGBoost classifier, evaluate it at
    the requested probability threshold, and persist the model plus the
    metadata needed for inference.

    Args:
        args: Namespace-like object providing
            ``input`` (path handed to ``load_data``),
            ``target`` (name of the target column),
            ``threshold`` (probability cut-off for the positive class),
            ``test_size`` (proportion passed to ``train_test_split``),
            and optionally ``mlflow_uri`` / ``experiment``. When the optional
            attributes are missing or empty, the tracking URI defaults to
            ``file://<project_root>/mlruns`` and the experiment name to
            ``churn_prediction_simple``.

    Raises:
        ValueError: If the target column contains values that cannot be
            encoded as 0/1, or if the training split has no positive samples.

    Side effects:
        Writes ``data/processed/telco_churn_processed.csv`` and
        ``artifacts/{model.pkl,feature_columns.json,threshold.json}`` under
        the project root, and logs parameters and metrics to MLflow.
    """
    # Function-scope imports: these names are not imported at file level.
    import json

    import joblib
    from sklearn.metrics import accuracy_score

    # === MLflow Setup ===
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    # Honour --mlflow-uri / --experiment when supplied. getattr keeps callers
    # that pass a namespace without these attributes working as before.
    tracking_uri = getattr(args, "mlflow_uri", None) or f"file://{project_root}/mlruns"
    experiment_name = getattr(args, "experiment", None) or "churn_prediction_simple"
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment_name)

    # Start MLflow run
    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_param("model", "xgboost")
        mlflow.log_param("threshold", args.threshold)
        mlflow.log_param("test_size", args.test_size)

        print("=== Customer Churn Prediction Pipeline ===")
        print(f"Input: {args.input}")
        print(f"Target: {args.target}")

        # === STAGE 1: Data Loading ===
        _print_stage_banner("STAGE 1: LOADING DATA")
        df = load_data(args.input)
        print(f"Data loaded: {df.shape[0]} rows, {df.shape[1]} columns")

        # Log basic data stats
        mlflow.log_param("dataset_rows", df.shape[0])
        mlflow.log_param("dataset_columns", df.shape[1])

        # === STAGE 2: Data Preprocessing ===
        _print_stage_banner("STAGE 2: DATA PREPROCESSING")
        df = preprocess_data(df)

        # Save processed dataset
        processed_path = os.path.join(
            project_root, "data", "processed", "telco_churn_processed.csv"
        )
        os.makedirs(os.path.dirname(processed_path), exist_ok=True)
        df.to_csv(processed_path, index=False)
        print(f"Processed dataset saved to {processed_path}")

        # === STAGE 3: Feature Engineering ===
        _print_stage_banner("STAGE 3: FEATURE ENGINEERING")
        target = args.target
        df_features = build_features(df, target)

        # Separate features and target
        feature_columns = [col for col in df_features.columns if col != target]
        X = df_features[feature_columns]
        y = df_features[target]
        # NOTE(review): build_features is not visible here. A target that is
        # already numeric is passed through unchanged instead of being mapped
        # to all-NaN; string labels are converted exactly as before.
        if not pd.api.types.is_numeric_dtype(y):
            y = y.map({'No': 0, 'Yes': 1})  # Convert to numeric
        if y.isna().any():
            # Fail early: NaN labels would otherwise surface later as an
            # obscure error in the split or in the model fit.
            raise ValueError(
                f"Target column {target!r} contains values that could not be "
                "encoded as 0/1 (expected 'No'/'Yes' or numeric labels)"
            )

        print(f"Features built: {X.shape[1]} columns")
        print(f"Target distribution: {y.value_counts().to_dict()}")

        # Log feature info
        mlflow.log_param("feature_count", X.shape[1])
        mlflow.log_param("target_positive_rate", y.mean())

        # === STAGE 4: Train/Test Split ===
        _print_stage_banner("STAGE 4: TRAIN/TEST SPLIT")
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=args.test_size,
            random_state=42,
            stratify=y,  # Maintain class distribution
        )
        print(f"Train set: {X_train.shape[0]} samples")
        print(f"Test set: {X_test.shape[0]} samples")
        print(f"Train churn rate: {y_train.mean():.3f}")
        print(f"Test churn rate: {y_test.mean():.3f}")

        # === STAGE 5: Model Training ===
        _print_stage_banner("STAGE 5: MODEL TRAINING")

        # Calculate class weight for imbalance
        negatives = int((y_train == 0).sum())
        positives = int((y_train == 1).sum())
        if positives == 0:
            # Without positives the ratio is undefined and the model cannot
            # learn the churn class at all.
            raise ValueError(
                "Training split contains no positive samples; "
                "cannot compute scale_pos_weight"
            )
        scale_pos_weight = negatives / positives
        print(f"Class imbalance ratio (negative/positive): {scale_pos_weight:.2f}")

        # Create and train XGBoost model directly
        model = XGBClassifier(
            n_estimators=300,
            learning_rate=0.1,
            max_depth=6,
            random_state=42,
            n_jobs=-1,
            eval_metric="logloss",
            scale_pos_weight=scale_pos_weight,
        )
        model.fit(X_train, y_train)
        print("Model training completed")

        # === STAGE 6: Model Evaluation ===
        _print_stage_banner("STAGE 6: MODEL EVALUATION")

        # Only probabilities are needed: hard labels come from the custom
        # threshold below, not from the model's default 0.5 cut-off.
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        # Apply threshold for binary predictions
        y_pred_thresholded = (y_pred_proba >= args.threshold).astype(int)

        # Calculate metrics (ROC-AUC uses the raw probabilities)
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred_thresholded),
            'precision': precision_score(y_test, y_pred_thresholded),
            'recall': recall_score(y_test, y_pred_thresholded),
            'f1': f1_score(y_test, y_pred_thresholded),
            'roc_auc': roc_auc_score(y_test, y_pred_proba),
        }

        # Print detailed results
        print(f"\nModel Performance Metrics (threshold={args.threshold}):")
        print(f"Accuracy: {metrics['accuracy']:.3f}")
        print(f"Precision: {metrics['precision']:.3f}")
        print(f"Recall: {metrics['recall']:.3f}")
        print(f"F1-Score: {metrics['f1']:.3f}")
        print(f"ROC-AUC: {metrics['roc_auc']:.3f}")

        # Also print classification report
        print("\nDetailed Classification Report:")
        print(classification_report(y_test, y_pred_thresholded))

        # Log metrics to MLflow
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)

        # === STAGE 7: Save Model and Artifacts ===
        _print_stage_banner("STAGE 7: SAVE MODEL AND ARTIFACTS")

        # Save model with joblib
        artifacts_dir = os.path.join(project_root, "artifacts")
        os.makedirs(artifacts_dir, exist_ok=True)
        model_path = os.path.join(artifacts_dir, "model.pkl")
        joblib.dump(model, model_path)
        print(f"Model saved to {model_path}")

        # Save feature columns for inference
        feature_columns_path = os.path.join(artifacts_dir, "feature_columns.json")
        with open(feature_columns_path, 'w', encoding="utf-8") as f:
            json.dump(feature_columns, f)
        print(f"Feature columns saved to {feature_columns_path}")

        # Save threshold for inference
        threshold_path = os.path.join(artifacts_dir, "threshold.json")
        with open(threshold_path, 'w', encoding="utf-8") as f:
            json.dump({"threshold": args.threshold}, f)
        print(f"Classification threshold saved to {threshold_path}")

        _print_stage_banner("PIPELINE COMPLETED SUCCESSFULLY!")
if __name__ == "__main__":
    # Command-line entry point: declare the options in a table, register
    # them in order, then hand the parsed namespace to main().
    cli = argparse.ArgumentParser(description="Run churn prediction pipeline")
    option_specs = (
        ("--input", {"required": True, "help": "Path to raw CSV data"}),
        ("--target", {"default": "Churn", "help": "Target column name"}),
        ("--threshold", {"type": float, "default": 0.35, "help": "Classification threshold"}),
        ("--test-size", {"type": float, "default": 0.2, "help": "Test set proportion"}),
        ("--mlflow-uri", {"help": "MLflow tracking URI"}),
        ("--experiment", {"default": "churn_prediction_simple", "help": "MLflow experiment name"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    main(cli.parse_args())