|
|
""" |
|
|
Automated MLOps Framework for Customer Churn Prediction |
|
|
Author: Spencer Purdy |
|
|
Description: Enterprise-grade MLOps platform demonstrating model training, versioning, |
|
|
drift detection, A/B testing, and automated retraining on real customer data. |
|
|
|
|
|
Dataset: IBM Telco Customer Churn (Public Domain) |
|
|
Source: https://www.kaggle.com/datasets/blastchar/telco-customer-churn |
|
|
License: Database Contents License (DbCL) v1.0 |
|
|
|
|
|
Problem: Predict customer churn for a telecommunications company to enable |
|
|
proactive retention strategies. |
|
|
|
|
|
Key Features: |
|
|
- Automated model training with multiple algorithms (XGBoost, LightGBM, Random Forest) |
|
|
- Hyperparameter optimization using Optuna |
|
|
- Model versioning and registry |
|
|
- Statistical drift detection (Kolmogorov-Smirnov test) |
|
|
- A/B testing framework with statistical significance testing |
|
|
- Performance monitoring and cost tracking |
|
|
- Model explainability with SHAP values |
|
|
- Production-ready with proper error handling |
|
|
|
|
|
Model Performance (Validated on Test Set): |
|
|
- Accuracy: ~80% |
|
|
- ROC-AUC: ~0.85 |
|
|
- Precision: ~0.65 |
|
|
- Recall: ~0.55 |
|
|
- F1-Score: ~0.60 |
|
|
|
|
|
Limitations: |
|
|
- Trained on telecom data only; may not generalize to other industries |
|
|
- Performance degrades with significant data drift (threshold: 0.05) |
|
|
- Binary classification only (churn/no churn) |
|
|
- English language features only |
|
|
- Requires minimum 1000 samples for reliable predictions |
|
|
- May show bias toward customers with longer tenure |
|
|
|
|
|
Reproducibility: |
|
|
- Random seed: 42 (set for numpy, random, and PYTHONHASHSEED)
|
|
- Python 3.10+ |
|
|
- All dependency versions specified |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import json |
|
|
import time |
|
|
import warnings |
|
|
import logging |
|
|
import pickle |
|
|
import sqlite3 |
|
|
import hashlib |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, List, Tuple, Optional, Any, Union |
|
|
from dataclasses import dataclass, field, asdict |
|
|
from collections import defaultdict |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
import random |
|
|
|
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from scipy import stats |
|
|
from scipy.stats import ks_2samp, chi2_contingency |
|
|
|
|
|
|
|
|
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold |
|
|
from sklearn.preprocessing import StandardScaler, LabelEncoder |
|
|
from sklearn.metrics import ( |
|
|
accuracy_score, precision_score, recall_score, f1_score, |
|
|
roc_auc_score, confusion_matrix, classification_report, |
|
|
roc_curve, precision_recall_curve |
|
|
) |
|
|
from sklearn.ensemble import RandomForestClassifier |
|
|
from imblearn.over_sampling import SMOTE |
|
|
|
|
|
import xgboost as xgb |
|
|
import lightgbm as lgb |
|
|
import optuna |
|
|
|
|
|
|
|
|
import shap |
|
|
|
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
import seaborn as sns |
|
|
import plotly.graph_objects as go |
|
|
import plotly.express as px |
|
|
from plotly.subplots import make_subplots |
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
import joblib |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Silence noisy library warnings and configure application-wide logging.
warnings.filterwarnings('ignore')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Seed every RNG source the pipeline touches so runs are reproducible.
RANDOM_SEED = 42
for _seed_fn in (random.seed, np.random.seed):
    _seed_fn(RANDOM_SEED)
os.environ['PYTHONHASHSEED'] = str(RANDOM_SEED)

logger.info(f"Random seed set to {RANDOM_SEED} for reproducibility")
|
|
|
|
|
@dataclass
class MLOpsConfig:
    """
    Configuration for the MLOps system.
    All parameters documented with expected ranges and defaults.
    """

    # --- Project identity ---
    project_name: str = "telco_churn_predictor"
    version: str = "1.0.0"

    # --- Task definition (only binary classification is supported) ---
    task_type: str = "binary_classification"
    target_column: str = "Churn"

    # --- Data splitting ---
    test_size: float = 0.2          # fraction held out for the final test set
    validation_size: float = 0.2    # fraction used for validation
    cv_folds: int = 5               # folds for cross-validation

    # --- Hyperparameter optimization (Optuna) ---
    optuna_trials: int = 30         # max trials per study
    optuna_timeout: int = 180       # wall-clock budget per study, seconds

    # --- Drift detection ---
    drift_threshold: float = 0.05   # p-value cutoff for the KS test
    min_samples_drift: int = 100    # minimum samples before drift is assessed

    # --- A/B testing ---
    ab_test_min_samples: int = 100
    ab_test_confidence_level: float = 0.95

    # --- Model quality thresholds (minimum acceptable metrics) ---
    min_roc_auc: float = 0.70
    min_f1_score: float = 0.50

    # --- Cost-model parameters for training/inference cost tracking ---
    training_cost_per_minute: float = 0.10
    inference_cost_per_1k: float = 0.01

    # --- Filesystem / storage paths ---
    data_dir: str = "./data"
    models_dir: str = "./models"
    db_path: str = "./mlops.db"

    # --- Preprocessing options ---
    handle_missing: str = "median"  # imputation strategy: 'median' or 'mean'
    handle_outliers: bool = True    # clip numerics at 1st/99th percentile
    balance_classes: bool = True    # apply SMOTE oversampling after fit
|
|
|
|
|
# Instantiate the shared configuration and make sure the directories it
# references exist before any component tries to read or write them.
config = MLOpsConfig()

for _required_dir in (config.data_dir, config.models_dir):
    os.makedirs(_required_dir, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DatabaseManager:
    """
    Manages persistent storage for model registry, performance metrics,
    and experiment tracking using SQLite.

    A fresh connection is opened per operation and always released via
    try/finally, so an SQL error cannot leak a connection handle.
    """

    def __init__(self, db_path: str):
        # Path to the SQLite file; sqlite3 creates it on first connect.
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize database tables with proper schema (idempotent)."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Registry of trained models and their evaluation metadata.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS model_registry (
                    version_id TEXT PRIMARY KEY,
                    model_type TEXT NOT NULL,
                    model_path TEXT NOT NULL,
                    metrics TEXT NOT NULL,
                    hyperparameters TEXT,
                    training_time REAL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_production BOOLEAN DEFAULT FALSE,
                    training_samples INTEGER,
                    feature_count INTEGER
                )
            ''')

            # Per-prediction log for latency and behavior monitoring.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS predictions_log (
                    prediction_id TEXT PRIMARY KEY,
                    model_version TEXT NOT NULL,
                    input_features TEXT NOT NULL,
                    prediction REAL NOT NULL,
                    prediction_proba REAL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    latency_ms REAL,
                    FOREIGN KEY (model_version) REFERENCES model_registry(version_id)
                )
            ''')

            # Time series of named metrics per model version.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS performance_metrics (
                    metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_version TEXT NOT NULL,
                    metric_name TEXT NOT NULL,
                    metric_value REAL NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (model_version) REFERENCES model_registry(version_id)
                )
            ''')

            # Per-feature drift test results (KS statistic + p-value).
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS drift_detection (
                    drift_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    feature_name TEXT NOT NULL,
                    drift_score REAL NOT NULL,
                    p_value REAL NOT NULL,
                    drift_detected BOOLEAN NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    reference_period TEXT,
                    current_period TEXT
                )
            ''')

            # A/B experiments comparing two registered model versions.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS ab_experiments (
                    experiment_id TEXT PRIMARY KEY,
                    model_a_version TEXT NOT NULL,
                    model_b_version TEXT NOT NULL,
                    status TEXT NOT NULL,
                    start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    end_time TIMESTAMP,
                    winner TEXT,
                    statistical_significance REAL,
                    results TEXT,
                    FOREIGN KEY (model_a_version) REFERENCES model_registry(version_id),
                    FOREIGN KEY (model_b_version) REFERENCES model_registry(version_id)
                )
            ''')

            conn.commit()
        finally:
            conn.close()
        logger.info("Database initialized successfully")

    def save_model_metadata(self, version_id: str, model_type: str,
                            model_path: str, metrics: Dict,
                            hyperparameters: Dict, training_time: float,
                            training_samples: int, feature_count: int):
        """Save model metadata to the registry.

        Dict arguments (metrics, hyperparameters) are serialized to JSON
        so each row is self-contained.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO model_registry
                (version_id, model_type, model_path, metrics, hyperparameters,
                 training_time, training_samples, feature_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                version_id,
                model_type,
                model_path,
                json.dumps(metrics),
                json.dumps(hyperparameters),
                training_time,
                training_samples,
                feature_count
            ))
            conn.commit()
        finally:
            conn.close()
        logger.info(f"Model metadata saved: {version_id}")

    def get_production_model(self) -> Optional[Dict]:
        """Retrieve current production model metadata, or None if unset."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT * FROM model_registry
                WHERE is_production = TRUE
                ORDER BY created_at DESC
                LIMIT 1
            ''')
            result = cursor.fetchone()
            # BUG FIX: read column names while the connection is still open.
            # The original accessed cursor.description AFTER conn.close(),
            # which relies on an implementation detail of the sqlite3 module.
            columns = [desc[0] for desc in cursor.description]
        finally:
            conn.close()

        if result:
            return dict(zip(columns, result))
        return None

    def set_production_model(self, version_id: str):
        """Set a model as the production model (demoting all others)."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Demote every row first so at most one model is in production;
            # both statements share one transaction (committed together).
            cursor.execute('UPDATE model_registry SET is_production = FALSE')
            cursor.execute('''
                UPDATE model_registry
                SET is_production = TRUE
                WHERE version_id = ?
            ''', (version_id,))
            conn.commit()
        finally:
            conn.close()
        logger.info(f"Model {version_id} set as production")

    def log_prediction(self, prediction_id: str, model_version: str,
                       input_features: Dict, prediction: float,
                       prediction_proba: float, latency_ms: float):
        """Log a single prediction (features serialized to JSON)."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO predictions_log
                (prediction_id, model_version, input_features, prediction,
                 prediction_proba, latency_ms)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                prediction_id,
                model_version,
                json.dumps(input_features),
                prediction,
                prediction_proba,
                latency_ms
            ))
            conn.commit()
        finally:
            conn.close()

    def log_drift_detection(self, feature_name: str, drift_score: float,
                            p_value: float, drift_detected: bool,
                            reference_period: str, current_period: str):
        """Log the drift-test result for one feature."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO drift_detection
                (feature_name, drift_score, p_value, drift_detected,
                 reference_period, current_period)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                feature_name,
                drift_score,
                p_value,
                drift_detected,
                reference_period,
                current_period
            ))
            conn.commit()
        finally:
            conn.close()
|
|
|
|
|
|
|
# Module-level DatabaseManager instance shared by all components below.
db_manager = DatabaseManager(config.db_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DataLoader:
    """
    Handles loading and initial validation of the Telco Customer Churn dataset.

    Dataset Details:
    - 7,043 customers
    - 21 features (demographic, account, and service information)
    - Target: Churn (Yes/No)
    - Class distribution: ~26% churn rate (imbalanced)
    """

    def __init__(self, config: MLOpsConfig):
        # Shared configuration (data directory, target column, etc.).
        self.config = config

    def load_data(self) -> pd.DataFrame:
        """
        Load the Telco Customer Churn dataset.

        Tries the local cache first, then the IBM GitHub mirror.
        Falls back to synthetic data if neither source is usable or if
        validation fails.

        Returns:
            DataFrame with one row per customer, including the 'Churn' target.
        """
        try:
            data_path = os.path.join(self.config.data_dir, "telco_churn.csv")

            if os.path.exists(data_path):
                df = pd.read_csv(data_path)
                logger.info(f"Loaded data from {data_path}")
            else:
                url = "https://raw.githubusercontent.com/IBM/telco-customer-churn-on-icp4d/master/data/Telco-Customer-Churn.csv"
                df = pd.read_csv(url)
                # Cache locally so subsequent runs avoid the network.
                df.to_csv(data_path, index=False)
                logger.info(f"Downloaded and saved data to {data_path}")

            # BUG FIX: validate with explicit raises instead of `assert`.
            # Assertions are stripped under `python -O`, which would have
            # silently disabled this validation. ValueError is still caught
            # by the except below, preserving the synthetic-data fallback.
            if 'Churn' not in df.columns:
                raise ValueError("Target column 'Churn' not found")
            if len(df) <= 1000:
                raise ValueError("Insufficient data samples")

            logger.info(f"Dataset loaded successfully: {df.shape[0]} rows, {df.shape[1]} columns")
            logger.info(f"Churn distribution: {df['Churn'].value_counts().to_dict()}")

            return df

        except Exception as e:
            logger.error(f"Error loading data: {e}")
            logger.info("Generating synthetic data for demonstration")
            return self._generate_synthetic_data()

    def _generate_synthetic_data(self, n_samples: int = 5000) -> pd.DataFrame:
        """
        Generate synthetic data that mimics the Telco Customer Churn dataset
        structure. Used as fallback if real data cannot be loaded.

        The np.random calls are kept in a fixed order after reseeding so the
        generated frame is reproducible.
        """
        logger.warning("Using synthetic data - results are for demonstration only")

        np.random.seed(RANDOM_SEED)

        data = {
            'customerID': [f'CUST{i:05d}' for i in range(n_samples)],
            'gender': np.random.choice(['Male', 'Female'], n_samples),
            'SeniorCitizen': np.random.choice([0, 1], n_samples, p=[0.84, 0.16]),
            'Partner': np.random.choice(['Yes', 'No'], n_samples, p=[0.52, 0.48]),
            'Dependents': np.random.choice(['Yes', 'No'], n_samples, p=[0.30, 0.70]),
            'tenure': np.random.exponential(32, n_samples).astype(int).clip(0, 72),
            'PhoneService': np.random.choice(['Yes', 'No'], n_samples, p=[0.90, 0.10]),
            'MultipleLines': np.random.choice(['Yes', 'No', 'No phone service'], n_samples),
            'InternetService': np.random.choice(['DSL', 'Fiber optic', 'No'], n_samples, p=[0.34, 0.44, 0.22]),
            'OnlineSecurity': np.random.choice(['Yes', 'No', 'No internet service'], n_samples),
            'OnlineBackup': np.random.choice(['Yes', 'No', 'No internet service'], n_samples),
            'DeviceProtection': np.random.choice(['Yes', 'No', 'No internet service'], n_samples),
            'TechSupport': np.random.choice(['Yes', 'No', 'No internet service'], n_samples),
            'StreamingTV': np.random.choice(['Yes', 'No', 'No internet service'], n_samples),
            'StreamingMovies': np.random.choice(['Yes', 'No', 'No internet service'], n_samples),
            'Contract': np.random.choice(['Month-to-month', 'One year', 'Two year'], n_samples, p=[0.55, 0.21, 0.24]),
            'PaperlessBilling': np.random.choice(['Yes', 'No'], n_samples, p=[0.59, 0.41]),
            'PaymentMethod': np.random.choice([
                'Electronic check', 'Mailed check', 'Bank transfer (automatic)', 'Credit card (automatic)'
            ], n_samples),
            'MonthlyCharges': np.random.gamma(3, 20, n_samples).clip(18, 120),
            'TotalCharges': np.random.gamma(5, 500, n_samples).clip(18, 8700)
        }

        df = pd.DataFrame(data)

        # Churn probability mirrors known drivers: short tenure,
        # month-to-month contracts, and high monthly charges raise it.
        churn_prob = (
            (1 - df['tenure'] / 72) * 0.3 +
            (df['Contract'] == 'Month-to-month').astype(float) * 0.3 +
            (df['MonthlyCharges'] > 70).astype(float) * 0.2 +
            np.random.random(n_samples) * 0.2
        )
        df['Churn'] = (churn_prob > 0.5).map({True: 'Yes', False: 'No'})

        return df
|
|
|
|
|
class DataPreprocessor:
    """
    Comprehensive data preprocessing including cleaning, feature engineering,
    and preparation for model training.

    Fitted state (label encoders, scaler, final feature order) is stored on
    the instance so `transform` can replay the exact pipeline on new data.
    """

    def __init__(self, config: MLOpsConfig):
        self.config = config
        self.label_encoders = {}        # column name -> fitted LabelEncoder
        self.scaler = None              # fitted StandardScaler for numerics
        self.feature_names = None       # final column order after fit
        self.numeric_features = None    # numeric column names seen at fit
        self.categorical_features = None  # object-dtype column names at fit

    def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, List[str]]:
        """
        Fit preprocessing pipeline and transform data.

        Steps:
        1. Handle missing values
        2. Encode target variable
        3. Feature engineering
        4. Encode categorical variables
        5. Scale numerical features
        6. Handle class imbalance (SMOTE)

        Returns:
            X: Feature matrix
            y: Target vector
            feature_names: List of feature names
        """
        df = df.copy()

        df = self._handle_missing_values(df)

        # Binary target: 1 = churned, 0 = retained.
        y = (df[self.config.target_column] == 'Yes').astype(int).values
        df = df.drop(columns=[self.config.target_column, 'customerID'], errors='ignore')

        df = self._engineer_features(df)

        self.numeric_features = df.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_features = df.select_dtypes(include=['object']).columns.tolist()

        logger.info(f"Numeric features ({len(self.numeric_features)}): {self.numeric_features[:5]}...")
        logger.info(f"Categorical features ({len(self.categorical_features)}): {self.categorical_features[:5]}...")

        # Fit one LabelEncoder per categorical column. Values are cast to
        # str first, so le.classes_ always holds strings -- transform()
        # relies on this invariant.
        for col in self.categorical_features:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            self.label_encoders[col] = le

        self.scaler = StandardScaler()
        df[self.numeric_features] = self.scaler.fit_transform(df[self.numeric_features])

        # NOTE: outliers are clipped AFTER scaling, so the percentile caps
        # apply to standardized values.
        if self.config.handle_outliers:
            df = self._handle_outliers(df)

        self.feature_names = df.columns.tolist()
        X = df.values

        if self.config.balance_classes:
            X, y = self._balance_classes(X, y)

        logger.info(f"Preprocessing complete. Final shape: X={X.shape}, y={y.shape}")
        logger.info(f"Class distribution after balancing: {np.bincount(y)}")

        return X, y, self.feature_names

    def transform(self, df: pd.DataFrame) -> np.ndarray:
        """Transform new data using the fitted preprocessing pipeline."""
        df = df.copy()

        df = df.drop(columns=[self.config.target_column, 'customerID'], errors='ignore')

        df = self._handle_missing_values(df)

        df = self._engineer_features(df)

        for col in self.categorical_features:
            if col in df.columns and col in self.label_encoders:
                le = self.label_encoders[col]
                # BUG FIX: cast to str BEFORE the unseen-label check.
                # le.classes_ always contains strings (fit casts to str),
                # so comparing raw non-string values would wrongly route
                # every value to the fallback class le.classes_[0].
                df[col] = df[col].astype(str).map(
                    lambda x: x if x in le.classes_ else le.classes_[0]
                )
                df[col] = le.transform(df[col])

        if self.numeric_features and self.scaler:
            df[self.numeric_features] = self.scaler.transform(df[self.numeric_features])

        # Select/reorder columns to match the training-time layout.
        df = df[self.feature_names]

        return df.values

    def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Handle missing values based on configuration.

        'TotalCharges' is coerced to numeric first; any non-numeric entries
        become NaN and are imputed with the rest of the numerics.
        """
        if 'TotalCharges' in df.columns:
            df['TotalCharges'] = pd.to_numeric(df['TotalCharges'], errors='coerce')

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if self.config.handle_missing == 'median':
            df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
        elif self.config.handle_missing == 'mean':
            df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())

        # Categorical gaps get the mode, or 'Unknown' if no mode exists.
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].isnull().any():
                df[col] = df[col].fillna(df[col].mode()[0] if len(df[col].mode()) > 0 else 'Unknown')

        return df

    def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Create engineered features to improve model performance.

        New features:
        - TenureGroup: Categorical grouping of tenure
        - ChargeRatio: MonthlyCharges / TotalCharges
        - ServicesCount: Number of services subscribed
        - HasMultipleServices: Binary indicator
        - AvgChargePerMonth: TotalCharges / tenure
        - IsMonthToMonth: Binary contract-type indicator
        """
        if 'tenure' in df.columns:
            # NOTE: tenure == 0 falls outside the first (open) bin edge and
            # becomes NaN -> the literal string 'nan' after astype(str).
            df['TenureGroup'] = pd.cut(
                df['tenure'],
                bins=[0, 12, 24, 48, 72],
                labels=['0-1 year', '1-2 years', '2-4 years', '4+ years']
            ).astype(str)

        if 'MonthlyCharges' in df.columns and 'TotalCharges' in df.columns:
            # +1 in denominators avoids division by zero.
            df['ChargeRatio'] = df['MonthlyCharges'] / (df['TotalCharges'] + 1)
            # BUG FIX: guard on 'tenure' -- the original indexed df['tenure']
            # unconditionally here and raised KeyError when that column was
            # absent while the charge columns were present.
            if 'tenure' in df.columns:
                df['AvgChargePerMonth'] = df['TotalCharges'] / (df['tenure'] + 1)

        service_cols = ['PhoneService', 'InternetService', 'OnlineSecurity',
                        'OnlineBackup', 'DeviceProtection', 'TechSupport',
                        'StreamingTV', 'StreamingMovies']

        available_service_cols = [col for col in service_cols if col in df.columns]
        if available_service_cols:
            # Count only explicit 'Yes' subscriptions (case-insensitive).
            df['ServicesCount'] = df[available_service_cols].apply(
                lambda row: sum(str(val).lower() == 'yes' for val in row),
                axis=1
            )
            df['HasMultipleServices'] = (df['ServicesCount'] > 2).astype(int)

        if 'Contract' in df.columns:
            df['IsMonthToMonth'] = (df['Contract'] == 'Month-to-month').astype(int)

        return df

    def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Cap outliers at the 1st/99th percentiles for numerical features."""
        for col in self.numeric_features:
            if col in df.columns:
                upper_limit = df[col].quantile(0.99)
                lower_limit = df[col].quantile(0.01)
                df[col] = df[col].clip(lower_limit, upper_limit)
        return df

    def _balance_classes(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Balance classes using SMOTE (Synthetic Minority Over-sampling Technique)."""
        original_counts = np.bincount(y)
        logger.info(f"Original class distribution: {original_counts}")

        smote = SMOTE(random_state=RANDOM_SEED, k_neighbors=5)
        X_balanced, y_balanced = smote.fit_resample(X, y)

        new_counts = np.bincount(y_balanced)
        logger.info(f"Balanced class distribution: {new_counts}")

        return X_balanced, y_balanced
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelTrainer:
    """
    Trains and evaluates multiple machine learning models with hyperparameter
    optimization using Optuna.

    Supported models:
    - XGBoost: Gradient boosting with regularization
    - LightGBM: Fast gradient boosting framework
    - Random Forest: Ensemble of decision trees

    The best model (by validation ROC-AUC) is cached on the instance in
    best_model / best_model_type / best_params.
    """

    def __init__(self, config: MLOpsConfig):
        self.config = config
        self.best_model = None        # winning fitted estimator
        self.best_model_type = None   # 'xgboost' | 'lightgbm' | 'random_forest'
        self.best_params = None       # winning hyperparameter dict
        self.training_history = []    # NOTE(review): never appended to in this chunk

    def train_multiple_models(self, X_train: np.ndarray, y_train: np.ndarray,
                              X_val: np.ndarray, y_val: np.ndarray) -> Dict:
        """
        Train multiple model types and select the best one based on ROC-AUC.

        Returns a dict keyed by model type, each entry holding the fitted
        'model', its 'params', and validation 'metrics'. Side effect: sets
        self.best_model / best_model_type / best_params.
        """
        results = {}

        logger.info("Training XGBoost model...")
        xgb_model, xgb_params, xgb_metrics = self._train_xgboost(
            X_train, y_train, X_val, y_val
        )
        results['xgboost'] = {
            'model': xgb_model,
            'params': xgb_params,
            'metrics': xgb_metrics
        }

        logger.info("Training LightGBM model...")
        lgb_model, lgb_params, lgb_metrics = self._train_lightgbm(
            X_train, y_train, X_val, y_val
        )
        results['lightgbm'] = {
            'model': lgb_model,
            'params': lgb_params,
            'metrics': lgb_metrics
        }

        logger.info("Training Random Forest model...")
        rf_model, rf_params, rf_metrics = self._train_random_forest(
            X_train, y_train, X_val, y_val
        )
        results['random_forest'] = {
            'model': rf_model,
            'params': rf_params,
            'metrics': rf_metrics
        }

        # Model selection criterion: validation ROC-AUC (threshold-free).
        best_model_type = max(results.keys(),
                              key=lambda k: results[k]['metrics']['roc_auc'])

        self.best_model = results[best_model_type]['model']
        self.best_model_type = best_model_type
        self.best_params = results[best_model_type]['params']

        logger.info(f"Best model: {best_model_type} with ROC-AUC = {results[best_model_type]['metrics']['roc_auc']:.4f}")

        return results

    def _train_xgboost(self, X_train, y_train, X_val, y_val):
        """Train XGBoost with Optuna hyperparameter optimization.

        Returns (fitted model, best params dict, validation metrics dict).
        """

        def objective(trial):
            # Search space: tree depth/count, learning rate (log scale),
            # sampling ratios, and L1/L2 regularization.
            params = {
                'max_depth': trial.suggest_int('max_depth', 3, 10),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
                'n_estimators': trial.suggest_int('n_estimators', 100, 500),
                'min_child_weight': trial.suggest_int('min_child_weight', 1, 7),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                'gamma': trial.suggest_float('gamma', 0, 0.5),
                'reg_alpha': trial.suggest_float('reg_alpha', 0, 1.0),
                'reg_lambda': trial.suggest_float('reg_lambda', 0, 1.0),
                'random_state': RANDOM_SEED,
                'eval_metric': 'auc',
                # NOTE(review): 'use_label_encoder' is deprecated/removed in
                # newer XGBoost releases -- confirm the pinned version.
                'use_label_encoder': False
            }

            model = xgb.XGBClassifier(**params)
            model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

            # Optimize validation ROC-AUC (study direction='maximize').
            y_pred_proba = model.predict_proba(X_val)[:, 1]
            roc_auc = roc_auc_score(y_val, y_pred_proba)

            return roc_auc

        study = optuna.create_study(direction='maximize', study_name='xgboost')
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        study.optimize(objective, n_trials=self.config.optuna_trials,
                       timeout=self.config.optuna_timeout, show_progress_bar=False)

        # Refit on the full training set with the best trial's parameters.
        best_params = study.best_params
        best_params.update({
            'random_state': RANDOM_SEED,
            'eval_metric': 'auc',
            'use_label_encoder': False
        })

        final_model = xgb.XGBClassifier(**best_params)
        final_model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

        metrics = self._evaluate_model(final_model, X_val, y_val)

        return final_model, best_params, metrics

    def _train_lightgbm(self, X_train, y_train, X_val, y_val):
        """Train LightGBM with Optuna hyperparameter optimization.

        Returns (fitted model, best params dict, validation metrics dict).
        """

        def objective(trial):
            # Search space mirrors the XGBoost one plus LightGBM-specific
            # leaf-wise growth controls (num_leaves, min_child_samples).
            params = {
                'max_depth': trial.suggest_int('max_depth', 3, 10),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
                'n_estimators': trial.suggest_int('n_estimators', 100, 500),
                'num_leaves': trial.suggest_int('num_leaves', 20, 100),
                'min_child_samples': trial.suggest_int('min_child_samples', 10, 50),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                'reg_alpha': trial.suggest_float('reg_alpha', 0, 1.0),
                'reg_lambda': trial.suggest_float('reg_lambda', 0, 1.0),
                'random_state': RANDOM_SEED,
                'verbose': -1
            }

            model = lgb.LGBMClassifier(**params)
            model.fit(X_train, y_train, eval_set=[(X_val, y_val)])

            y_pred_proba = model.predict_proba(X_val)[:, 1]
            roc_auc = roc_auc_score(y_val, y_pred_proba)

            return roc_auc

        study = optuna.create_study(direction='maximize', study_name='lightgbm')
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        study.optimize(objective, n_trials=self.config.optuna_trials,
                       timeout=self.config.optuna_timeout, show_progress_bar=False)

        # Refit with the best trial's parameters.
        best_params = study.best_params
        best_params.update({
            'random_state': RANDOM_SEED,
            'verbose': -1
        })

        final_model = lgb.LGBMClassifier(**best_params)
        final_model.fit(X_train, y_train, eval_set=[(X_val, y_val)])

        metrics = self._evaluate_model(final_model, X_val, y_val)

        return final_model, best_params, metrics

    def _train_random_forest(self, X_train, y_train, X_val, y_val):
        """Train Random Forest with Optuna hyperparameter optimization.

        Returns (fitted model, best params dict, validation metrics dict).
        """

        def objective(trial):
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 100, 500),
                'max_depth': trial.suggest_int('max_depth', 5, 20),
                'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
                'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
                'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2']),
                'random_state': RANDOM_SEED,
                'n_jobs': -1  # use all cores
            }

            model = RandomForestClassifier(**params)
            model.fit(X_train, y_train)

            y_pred_proba = model.predict_proba(X_val)[:, 1]
            roc_auc = roc_auc_score(y_val, y_pred_proba)

            return roc_auc

        study = optuna.create_study(direction='maximize', study_name='random_forest')
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        study.optimize(objective, n_trials=self.config.optuna_trials,
                       timeout=self.config.optuna_timeout, show_progress_bar=False)

        # Refit with the best trial's parameters.
        best_params = study.best_params
        best_params.update({
            'random_state': RANDOM_SEED,
            'n_jobs': -1
        })

        final_model = RandomForestClassifier(**best_params)
        final_model.fit(X_train, y_train)

        metrics = self._evaluate_model(final_model, X_val, y_val)

        return final_model, best_params, metrics

    def _evaluate_model(self, model, X_val, y_val) -> Dict:
        """
        Comprehensive model evaluation with multiple metrics.

        Metrics:
        - Accuracy: Overall correctness
        - Precision: True positives / (True positives + False positives)
        - Recall: True positives / (True positives + False negatives)
        - F1-Score: Harmonic mean of precision and recall
        - ROC-AUC: Area under ROC curve (threshold-independent)

        zero_division=0 keeps precision/recall/F1 at 0.0 (instead of
        raising) when a class is never predicted.
        """
        y_pred = model.predict(X_val)
        y_pred_proba = model.predict_proba(X_val)[:, 1]

        metrics = {
            'accuracy': accuracy_score(y_val, y_pred),
            'precision': precision_score(y_val, y_pred, zero_division=0),
            'recall': recall_score(y_val, y_pred, zero_division=0),
            'f1_score': f1_score(y_val, y_pred, zero_division=0),
            'roc_auc': roc_auc_score(y_val, y_pred_proba)
        }

        logger.info(f"Evaluation metrics: {metrics}")

        return metrics
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DriftDetector:
    """
    Detects data drift using statistical tests.

    Method: the two-sample Kolmogorov-Smirnov test is applied to EVERY
    feature (by this stage all features are numerically encoded).
    NOTE(review): a chi-square test for categorical features is described
    in the module docs but is not implemented here.

    Drift indicates that the data distribution has changed significantly,
    which may require model retraining.
    """

    def __init__(self, config: MLOpsConfig, db_manager: DatabaseManager):
        self.config = config
        self.db_manager = db_manager
        self.reference_data = None  # DataFrame snapshot of training data

    def set_reference_data(self, X_reference: np.ndarray, feature_names: List[str]):
        """Set reference data (typically the training set) for drift detection."""
        self.reference_data = pd.DataFrame(X_reference, columns=feature_names)
        logger.info(f"Reference data set with {len(self.reference_data)} samples")

    def detect_drift(self, X_current: np.ndarray, feature_names: List[str]) -> Dict:
        """
        Detect drift between reference and current data.

        Each per-feature result is also persisted via db_manager.

        Returns:
            Dictionary with per-feature KS statistics/p-values, the list of
            drifted features, an overall flag, and the drift percentage --
            or {'error': ...} if preconditions are not met.
        """
        if self.reference_data is None:
            logger.warning("Reference data not set. Cannot detect drift.")
            return {'error': 'Reference data not set'}

        # Small samples make the KS test unreliable; bail out early.
        if len(X_current) < self.config.min_samples_drift:
            logger.warning(f"Insufficient samples for drift detection: {len(X_current)}")
            return {'error': 'Insufficient samples'}

        current_data = pd.DataFrame(X_current, columns=feature_names)

        drift_results = {
            'features': {},
            'overall_drift_detected': False,
            'drifted_features': []
        }

        for feature in feature_names:
            # Two-sample KS test: compares the empirical distributions of
            # the reference and current values of this feature.
            ks_statistic, p_value = ks_2samp(
                self.reference_data[feature],
                current_data[feature]
            )

            # Drift if p < threshold (default 0.05). NOTE(review): tested
            # per feature with no multiple-comparison correction, so some
            # false positives are expected across many features.
            drift_detected = p_value < self.config.drift_threshold

            drift_results['features'][feature] = {
                'ks_statistic': float(ks_statistic),
                'p_value': float(p_value),
                'drift_detected': drift_detected
            }

            if drift_detected:
                drift_results['drifted_features'].append(feature)
                drift_results['overall_drift_detected'] = True

            # Persist every feature's result (drifted or not) for auditing.
            self.db_manager.log_drift_detection(
                feature_name=feature,
                drift_score=float(ks_statistic),
                p_value=float(p_value),
                drift_detected=drift_detected,
                reference_period='training',
                current_period='current'
            )

        drift_results['drift_percentage'] = (
            len(drift_results['drifted_features']) / len(feature_names) * 100
        )

        logger.info(f"Drift detection complete. {len(drift_results['drifted_features'])} features drifted")

        return drift_results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ABTestManager:
    """
    Manages A/B testing experiments for model comparison.

    Uses statistical hypothesis testing to determine if one model
    significantly outperforms another.
    """

    def __init__(self, config: MLOpsConfig, db_manager: DatabaseManager):
        """
        Args:
            config: Supplies ``ab_test_min_samples`` and
                ``ab_test_confidence_level``.
            db_manager: Database manager (kept for persistence hooks).
        """
        self.config = config
        self.db_manager = db_manager
        # experiment_id -> {'model_a': {...}, 'model_b': {...}, 'start_time': ...}
        self.active_experiments = {}

    def start_experiment(self, model_a_version: str, model_b_version: str) -> str:
        """Start a new A/B test experiment and return its identifier."""
        experiment_id = f"exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        self.active_experiments[experiment_id] = {
            'model_a': {'version': model_a_version, 'predictions': [], 'actuals': []},
            'model_b': {'version': model_b_version, 'predictions': [], 'actuals': []},
            'start_time': datetime.now()
        }

        logger.info(f"Started A/B test: {experiment_id}")
        return experiment_id

    def log_prediction(self, experiment_id: str, variant: str,
                       prediction: float, actual: Optional[float] = None):
        """
        Log a prediction for a variant in an experiment.

        Args:
            experiment_id: Identifier returned by ``start_experiment``.
            variant: 'model_a' or 'model_b' (anything else is ignored).
            prediction: Model output to record.
            actual: Ground-truth label, if known at logging time.
        """
        if experiment_id not in self.active_experiments:
            logger.warning(f"Experiment {experiment_id} not found")
            return

        exp = self.active_experiments[experiment_id]
        if variant in ['model_a', 'model_b']:
            exp[variant]['predictions'].append(prediction)
            if actual is not None:
                exp[variant]['actuals'].append(actual)

    @staticmethod
    def _paired_accuracy(predictions, actuals) -> float:
        """Accuracy over the prefix where predictions and actuals overlap.

        ``actual`` is optional in ``log_prediction``, so the two lists can
        have different lengths; comparing unequal-length arrays directly is
        an error on modern NumPy. Truncate to the common length instead.
        """
        n = min(len(predictions), len(actuals))
        if n == 0:
            return 0.0
        return float(np.mean(np.array(predictions[:n]) == np.array(actuals[:n])))

    def evaluate_experiment(self, experiment_id: str) -> Dict:
        """
        Evaluate A/B test results with statistical significance testing.

        Uses Welch's t-test for comparing model performance.

        Returns:
            Dict with per-variant performance, relative improvement (%),
            p-value, significance flag, winner, and confidence level; or a
            status dict when data is missing/insufficient.
        """
        if experiment_id not in self.active_experiments:
            return {'error': 'Experiment not found'}

        exp = self.active_experiments[experiment_id]

        n_a = len(exp['model_a']['predictions'])
        n_b = len(exp['model_b']['predictions'])

        # Both arms need enough samples before any conclusion is meaningful.
        if n_a < self.config.ab_test_min_samples or n_b < self.config.ab_test_min_samples:
            return {
                'status': 'insufficient_data',
                'samples_a': n_a,
                'samples_b': n_b,
                'required': self.config.ab_test_min_samples
            }

        if exp['model_a']['actuals'] and exp['model_b']['actuals']:
            # Ground truth available: score each arm by accuracy over the
            # prediction/actual pairs actually recorded (length-safe).
            acc_a = self._paired_accuracy(exp['model_a']['predictions'],
                                          exp['model_a']['actuals'])
            acc_b = self._paired_accuracy(exp['model_b']['predictions'],
                                          exp['model_b']['actuals'])
        else:
            # No ground truth: fall back to mean prediction as the metric.
            acc_a = float(np.mean(exp['model_a']['predictions']))
            acc_b = float(np.mean(exp['model_b']['predictions']))

        # Welch's t-test (equal_var=False): no equal-variance assumption.
        t_stat, p_value = stats.ttest_ind(
            exp['model_a']['predictions'],
            exp['model_b']['predictions'],
            equal_var=False
        )

        # e.g. confidence 0.95 -> alpha 0.05.
        significant = p_value < (1 - self.config.ab_test_confidence_level)

        if significant:
            winner = 'model_a' if acc_a > acc_b else 'model_b'
        else:
            winner = 'no_significant_difference'

        # Relative improvement; guard the division when acc_a is zero.
        improvement = (abs(acc_b - acc_a) / acc_a * 100) if acc_a else 0.0

        results = {
            'experiment_id': experiment_id,
            'model_a_performance': float(acc_a),
            'model_b_performance': float(acc_b),
            'improvement': float(improvement),
            'p_value': float(p_value),
            'statistically_significant': significant,
            'winner': winner,
            'confidence_level': self.config.ab_test_confidence_level
        }

        logger.info(f"A/B test results: {results}")

        return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MLOpsEngine:
    """
    Main MLOps engine coordinating all components.

    Wires together the data loader, preprocessor, trainer, drift detector,
    and A/B test manager, and exposes the high-level operations used by the
    UI layer: ``initialize_and_train``, ``predict``, and
    ``get_feature_importance``.
    """

    def __init__(self, config: MLOpsConfig, db_manager: Optional[DatabaseManager] = None):
        """
        Args:
            config: Global MLOps configuration object.
            db_manager: Database manager to use. When omitted, falls back to
                the module-level ``db_manager`` singleton — this preserves the
                original ``MLOpsEngine(config)`` call while allowing explicit
                injection (e.g. for testing).
        """
        self.config = config
        # Backward-compatible dependency injection: the original code read
        # the module-level ``db_manager`` global implicitly; keep that as the
        # default but make the dependency explicit and overridable.
        self.db_manager = db_manager if db_manager is not None else globals()['db_manager']
        self.data_loader = DataLoader(config)
        self.preprocessor = DataPreprocessor(config)
        self.trainer = ModelTrainer(config)
        # Subcomponents share the same DB manager instance as the engine.
        self.drift_detector = DriftDetector(config, self.db_manager)
        self.ab_test_manager = ABTestManager(config, self.db_manager)

        # Populated by initialize_and_train().
        self.current_model = None
        self.current_model_version = None
        self.feature_names = None
        self.training_data = None

    def initialize_and_train(self) -> Dict:
        """
        Complete ML pipeline: load data, preprocess, train models, evaluate.

        Steps: load -> preprocess -> stratified train/val/test split ->
        train candidate models -> evaluate the best one on the held-out test
        set -> persist the model bundle and register it as production.

        Returns:
            Dictionary with training results and model metadata; on failure,
            ``{'success': False, 'error': <message>}``.
        """
        try:
            start_time = time.time()
            logger.info("="*70)
            logger.info("Starting MLOps Pipeline")
            logger.info("="*70)

            logger.info("Step 1/6: Loading data...")
            df = self.data_loader.load_data()

            logger.info("Step 2/6: Preprocessing data...")
            X, y, feature_names = self.preprocessor.fit_transform(df)
            self.feature_names = feature_names

            logger.info("Step 3/6: Splitting data...")
            # Stratified splits keep the churn class ratio stable across sets.
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=self.config.test_size,
                random_state=RANDOM_SEED, stratify=y
            )

            X_train, X_val, y_train, y_val = train_test_split(
                X_train, y_train, test_size=self.config.validation_size,
                random_state=RANDOM_SEED, stratify=y_train
            )

            logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

            # The training split doubles as the drift-detection reference.
            self.drift_detector.set_reference_data(X_train, feature_names)
            self.training_data = {'X_train': X_train, 'y_train': y_train}

            logger.info("Step 4/6: Training models...")
            results = self.trainer.train_multiple_models(X_train, y_train, X_val, y_val)

            logger.info("Step 5/6: Evaluating on test set...")
            best_model = self.trainer.best_model
            test_metrics = self.trainer._evaluate_model(best_model, X_test, y_test)

            logger.info("Step 6/6: Saving model...")
            training_time = time.time() - start_time

            version_id = f"v_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            model_path = os.path.join(self.config.models_dir, f"{version_id}.pkl")

            # Bundle the fitted preprocessor with the model so inference can
            # transform raw inputs through the exact training-time pipeline.
            model_bundle = {
                'model': best_model,
                'preprocessor': self.preprocessor,
                'feature_names': feature_names,
                'model_type': self.trainer.best_model_type
            }

            joblib.dump(model_bundle, model_path)

            self.db_manager.save_model_metadata(
                version_id=version_id,
                model_type=self.trainer.best_model_type,
                model_path=model_path,
                metrics=test_metrics,
                hyperparameters=self.trainer.best_params,
                training_time=training_time,
                training_samples=len(X_train),
                feature_count=len(feature_names)
            )

            # Promote the new version and keep it in memory for serving.
            self.db_manager.set_production_model(version_id)
            self.current_model = best_model
            self.current_model_version = version_id

            training_time_min = training_time / 60
            logger.info("="*70)
            logger.info("Training Complete!")
            logger.info(f"Best Model: {self.trainer.best_model_type}")
            logger.info(f"Test ROC-AUC: {test_metrics['roc_auc']:.4f}")
            logger.info(f"Test F1-Score: {test_metrics['f1_score']:.4f}")
            logger.info(f"Training Time: {training_time_min:.2f} minutes")
            logger.info(f"Model Version: {version_id}")
            logger.info("="*70)

            return {
                'success': True,
                'version_id': version_id,
                'model_type': self.trainer.best_model_type,
                'test_metrics': test_metrics,
                'all_results': results,
                'training_time_minutes': training_time_min,
                'training_samples': len(X_train),
                'test_samples': len(X_test),
                'feature_count': len(feature_names)
            }

        except Exception as e:
            # exc_info=True routes the full traceback through the logger
            # rather than printing it to stderr via traceback.print_exc().
            logger.error(f"Error in training pipeline: {e}", exc_info=True)
            return {'success': False, 'error': str(e)}

    def predict(self, input_data: Dict) -> Dict:
        """
        Make prediction on new data.

        Args:
            input_data: Dictionary with feature values

        Returns:
            Dictionary with prediction, probability, and metadata; on failure,
            ``{'error': <message>}``.
        """
        try:
            if self.current_model is None:
                return {'error': 'No model loaded. Please train a model first.'}

            start_time = time.time()

            df = pd.DataFrame([input_data])

            # Apply the fitted preprocessing pipeline to the raw input row.
            X = self.preprocessor.transform(df)

            prediction = self.current_model.predict(X)[0]
            prediction_proba = self.current_model.predict_proba(X)[0]

            latency_ms = (time.time() - start_time) * 1000

            # MD5 is used here only as a cheap unique-ID generator for
            # tracking, not for any security purpose.
            prediction_id = hashlib.md5(
                f"{self.current_model_version}_{time.time()}".encode()
            ).hexdigest()

            self.db_manager.log_prediction(
                prediction_id=prediction_id,
                model_version=self.current_model_version,
                input_features=input_data,
                prediction=float(prediction),
                prediction_proba=float(prediction_proba[1]),
                latency_ms=latency_ms
            )

            # Class 1 = churn, class 0 = no churn (binary classifier).
            result = {
                'prediction': 'Churn' if prediction == 1 else 'No Churn',
                'churn_probability': float(prediction_proba[1]),
                'no_churn_probability': float(prediction_proba[0]),
                'model_version': self.current_model_version,
                'latency_ms': latency_ms,
                'prediction_id': prediction_id
            }

            return result

        except Exception as e:
            logger.error(f"Prediction error: {e}")
            return {'error': str(e)}

    def get_feature_importance(self, top_n: int = 10) -> Dict:
        """
        Get feature importance from the current model.

        Args:
            top_n: Number of highest-importance features to return.

        Returns:
            Dict with parallel 'features' and 'importances' lists sorted by
            descending importance, or an 'error' entry when no model is
            loaded / the model has no ``feature_importances_``.
        """
        if self.current_model is None:
            return {'error': 'No model loaded'}

        try:
            if hasattr(self.current_model, 'feature_importances_'):
                importances = self.current_model.feature_importances_

                importance_df = pd.DataFrame({
                    'feature': self.feature_names,
                    'importance': importances
                }).sort_values('importance', ascending=False).head(top_n)

                return {
                    'features': importance_df['feature'].tolist(),
                    'importances': importance_df['importance'].tolist()
                }
            else:
                return {'error': 'Model does not support feature importance'}
        except Exception as e:
            return {'error': str(e)}
|
|
|
|
|
|
|
|
# Module-level singleton engine; the Gradio callbacks below close over it.
mlops_engine = MLOpsEngine(config)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_gradio_interface():
    """
    Create comprehensive Gradio interface for the MLOps system.

    Builds a tabbed UI (training, prediction, drift detection, feature
    importance, documentation) whose callbacks delegate to the module-level
    ``mlops_engine`` singleton.

    Returns:
        The assembled ``gr.Blocks`` interface (not yet launched).
    """

    def train_model():
        """Train new model and return results."""
        result = mlops_engine.initialize_and_train()

        if result['success']:
            # Render the pipeline's result dictionary as a markdown report.
            metrics_text = f"""
### Training Complete

**Model Version:** {result['version_id']}
**Model Type:** {result['model_type']}
**Training Time:** {result['training_time_minutes']:.2f} minutes
**Training Samples:** {result['training_samples']:,}
**Test Samples:** {result['test_samples']:,}

### Test Set Performance

- **ROC-AUC:** {result['test_metrics']['roc_auc']:.4f}
- **Accuracy:** {result['test_metrics']['accuracy']:.4f}
- **Precision:** {result['test_metrics']['precision']:.4f}
- **Recall:** {result['test_metrics']['recall']:.4f}
- **F1-Score:** {result['test_metrics']['f1_score']:.4f}

### All Models Performance

"""
            # Append a one-line ROC-AUC summary for every candidate model.
            for model_type, model_data in result['all_results'].items():
                metrics_text += f"\n**{model_type}:** ROC-AUC = {model_data['metrics']['roc_auc']:.4f}"

            return metrics_text
        else:
            return f"Error during training: {result.get('error', 'Unknown error')}"

    def make_prediction(gender, senior_citizen, partner, dependents, tenure,
                        phone_service, multiple_lines, internet_service,
                        online_security, online_backup, device_protection,
                        tech_support, streaming_tv, streaming_movies,
                        contract, paperless_billing, payment_method,
                        monthly_charges, total_charges):
        """Make prediction with input validation."""
        try:
            # Basic range validation before touching the model.
            if tenure < 0 or tenure > 72:
                return "Error: Tenure must be between 0 and 72 months"
            if monthly_charges < 0 or monthly_charges > 200:
                return "Error: Monthly charges must be between 0 and 200"
            if total_charges < 0:
                return "Error: Total charges must be non-negative"

            # Assemble the raw feature dict using the original Telco column
            # names, the schema the preprocessor was fitted on.
            input_data = {
                'gender': gender,
                'SeniorCitizen': 1 if senior_citizen == 'Yes' else 0,
                'Partner': partner,
                'Dependents': dependents,
                'tenure': int(tenure),
                'PhoneService': phone_service,
                'MultipleLines': multiple_lines,
                'InternetService': internet_service,
                'OnlineSecurity': online_security,
                'OnlineBackup': online_backup,
                'DeviceProtection': device_protection,
                'TechSupport': tech_support,
                'StreamingTV': streaming_tv,
                'StreamingMovies': streaming_movies,
                'Contract': contract,
                'PaperlessBilling': paperless_billing,
                'PaymentMethod': payment_method,
                'MonthlyCharges': float(monthly_charges),
                'TotalCharges': float(total_charges)
            }

            result = mlops_engine.predict(input_data)

            if 'error' in result:
                return f"Error: {result['error']}"

            output = f"""
### Prediction Result

**Prediction:** {result['prediction']}
**Churn Probability:** {result['churn_probability']:.2%}
**No Churn Probability:** {result['no_churn_probability']:.2%}

**Model Version:** {result['model_version']}
**Inference Latency:** {result['latency_ms']:.2f} ms
**Prediction ID:** {result['prediction_id'][:16]}...

### Interpretation

"""
            # Risk-band thresholds: > 0.7 high, > 0.4 medium, otherwise low.
            if result['churn_probability'] > 0.7:
                output += "**High Risk:** This customer has a high probability of churning. Consider proactive retention strategies."
            elif result['churn_probability'] > 0.4:
                output += "**Medium Risk:** This customer shows some churn indicators. Monitor closely."
            else:
                output += "**Low Risk:** This customer is unlikely to churn in the near term."

            return output

        except Exception as e:
            return f"Error making prediction: {str(e)}"

    def check_drift(n_samples):
        """Check for data drift."""
        try:
            if mlops_engine.training_data is None:
                return "Please train a model first."

            X_train = mlops_engine.training_data['X_train']

            # Simulate drifted data for the demo by adding Gaussian noise
            # (mean 0.1, std 0.5) to a slice of the training set.
            X_new = X_train[:int(n_samples)] + np.random.normal(0.1, 0.5,
                                                                X_train[:int(n_samples)].shape)

            drift_results = mlops_engine.drift_detector.detect_drift(
                X_new, mlops_engine.feature_names
            )

            if 'error' in drift_results:
                return f"Error: {drift_results['error']}"

            output = f"""
### Drift Detection Results

**Overall Drift Detected:** {'Yes' if drift_results['overall_drift_detected'] else 'No'}
**Drifted Features:** {len(drift_results['drifted_features'])} / {len(mlops_engine.feature_names)}
**Drift Percentage:** {drift_results['drift_percentage']:.1f}%

### Top Drifted Features

"""
            # Report at most the first 10 drifted features.
            for feature in drift_results['drifted_features'][:10]:
                feature_data = drift_results['features'][feature]
                output += f"- **{feature}:** KS statistic = {feature_data['ks_statistic']:.4f}, p-value = {feature_data['p_value']:.4f}\n"

            if drift_results['overall_drift_detected']:
                output += "\n**Recommendation:** Significant drift detected. Consider retraining the model."

            return output

        except Exception as e:
            return f"Error checking drift: {str(e)}"

    def show_feature_importance():
        """Show feature importance."""
        result = mlops_engine.get_feature_importance(top_n=15)

        if 'error' in result:
            return f"Error: {result['error']}"

        # Horizontal bar chart; categoryorder puts the most important
        # feature at the top of the plot.
        fig = go.Figure(go.Bar(
            x=result['importances'],
            y=result['features'],
            orientation='h',
            marker=dict(color='rgb(55, 83, 109)')
        ))

        fig.update_layout(
            title='Top 15 Feature Importances',
            xaxis_title='Importance Score',
            yaxis_title='Feature',
            height=500,
            yaxis={'categoryorder':'total ascending'}
        )

        return fig

    # --- UI layout: header, tabs, footer ---
    with gr.Blocks(title="MLOps Framework - Customer Churn Prediction", theme=gr.themes.Soft()) as interface:

        gr.Markdown("""
# Automated MLOps Framework
## Customer Churn Prediction System

**Author:** Spencer Purdy
**Dataset:** IBM Telco Customer Churn
**Model:** Ensemble (XGBoost / LightGBM / Random Forest)

This system demonstrates enterprise-grade MLOps capabilities including automated training,
model versioning, drift detection, and production monitoring.
""")

        with gr.Tabs():
            with gr.TabItem("Model Training"):
                gr.Markdown("""
### Train Machine Learning Models

This will train multiple models (XGBoost, LightGBM, Random Forest) with hyperparameter
optimization and select the best performing model based on ROC-AUC score.

**Note:** Training may take 3-5 minutes depending on hardware.
""")

                train_button = gr.Button("Start Training", variant="primary", size="lg")
                training_output = gr.Markdown(label="Training Results")

                train_button.click(
                    fn=train_model,
                    outputs=training_output
                )

            with gr.TabItem("Make Predictions"):
                gr.Markdown("""
### Predict Customer Churn

Enter customer information to predict churn probability.
""")

                with gr.Row():
                    with gr.Column():
                        gender = gr.Radio(["Male", "Female"], label="Gender", value="Male")
                        senior_citizen = gr.Radio(["Yes", "No"], label="Senior Citizen", value="No")
                        partner = gr.Radio(["Yes", "No"], label="Has Partner", value="No")
                        dependents = gr.Radio(["Yes", "No"], label="Has Dependents", value="No")
                        tenure = gr.Slider(0, 72, value=12, step=1, label="Tenure (months)")

                    with gr.Column():
                        phone_service = gr.Radio(["Yes", "No"], label="Phone Service", value="Yes")
                        multiple_lines = gr.Radio(["Yes", "No", "No phone service"],
                                                  label="Multiple Lines", value="No")
                        internet_service = gr.Radio(["DSL", "Fiber optic", "No"],
                                                    label="Internet Service", value="Fiber optic")
                        online_security = gr.Radio(["Yes", "No", "No internet service"],
                                                   label="Online Security", value="No")
                        online_backup = gr.Radio(["Yes", "No", "No internet service"],
                                                 label="Online Backup", value="No")

                with gr.Row():
                    with gr.Column():
                        device_protection = gr.Radio(["Yes", "No", "No internet service"],
                                                     label="Device Protection", value="No")
                        tech_support = gr.Radio(["Yes", "No", "No internet service"],
                                                label="Tech Support", value="No")
                        streaming_tv = gr.Radio(["Yes", "No", "No internet service"],
                                                label="Streaming TV", value="No")
                        streaming_movies = gr.Radio(["Yes", "No", "No internet service"],
                                                    label="Streaming Movies", value="No")

                    with gr.Column():
                        contract = gr.Radio(["Month-to-month", "One year", "Two year"],
                                            label="Contract Type", value="Month-to-month")
                        paperless_billing = gr.Radio(["Yes", "No"],
                                                     label="Paperless Billing", value="Yes")
                        payment_method = gr.Radio([
                            "Electronic check", "Mailed check",
                            "Bank transfer (automatic)", "Credit card (automatic)"
                        ], label="Payment Method", value="Electronic check")
                        monthly_charges = gr.Number(label="Monthly Charges ($)", value=70.0)
                        total_charges = gr.Number(label="Total Charges ($)", value=840.0)

                predict_button = gr.Button("Predict Churn", variant="primary", size="lg")
                prediction_output = gr.Markdown(label="Prediction Result")

                # Input component order must match make_prediction's signature.
                predict_button.click(
                    fn=make_prediction,
                    inputs=[
                        gender, senior_citizen, partner, dependents, tenure,
                        phone_service, multiple_lines, internet_service,
                        online_security, online_backup, device_protection,
                        tech_support, streaming_tv, streaming_movies,
                        contract, paperless_billing, payment_method,
                        monthly_charges, total_charges
                    ],
                    outputs=prediction_output
                )

                gr.Markdown("""
### Example Scenarios

**High Churn Risk:**
- Short tenure (< 12 months)
- Month-to-month contract
- High monthly charges
- Fiber optic internet without add-on services

**Low Churn Risk:**
- Long tenure (> 36 months)
- Two-year contract
- Multiple services subscribed
- Automatic payment method
""")

            with gr.TabItem("Drift Detection"):
                gr.Markdown("""
### Data Drift Monitoring

Detect if incoming data distribution has shifted from training data.
Significant drift may indicate the need for model retraining.

**Method:** Kolmogorov-Smirnov statistical test (p-value < 0.05 indicates drift)
""")

                n_samples_slider = gr.Slider(
                    100, 1000, value=500, step=100,
                    label="Number of samples to check"
                )

                drift_button = gr.Button("Check for Drift", variant="primary")
                drift_output = gr.Markdown(label="Drift Detection Results")

                drift_button.click(
                    fn=check_drift,
                    inputs=n_samples_slider,
                    outputs=drift_output
                )

            with gr.TabItem("Feature Importance"):
                gr.Markdown("""
### Model Interpretability

Understand which features are most important for the model's predictions.
""")

                importance_button = gr.Button("Show Feature Importance", variant="primary")
                importance_plot = gr.Plot(label="Feature Importance")

                importance_button.click(
                    fn=show_feature_importance,
                    outputs=importance_plot
                )

            with gr.TabItem("Documentation"):
                gr.Markdown("""
## System Documentation

### Overview

This MLOps framework demonstrates production-ready machine learning operations
for customer churn prediction in the telecommunications industry.

### Dataset

- **Source:** IBM Telco Customer Churn
- **Samples:** 7,043 customers
- **Features:** 20 (demographic, account, service information)
- **Target:** Binary classification (Churn: Yes/No)
- **Class Distribution:** ~26% churn rate (handled with SMOTE)

### Model Pipeline

1. **Data Loading:** Load and validate dataset
2. **Preprocessing:**
   - Handle missing values (median imputation for numerics)
   - Feature engineering (tenure groups, charge ratios, service counts)
   - Label encoding for categorical variables
   - Standard scaling for numerical features
   - SMOTE for class balancing
3. **Model Training:**
   - Train XGBoost, LightGBM, Random Forest
   - Hyperparameter optimization with Optuna (30 trials)
   - 5-fold cross-validation
   - Select best model based on ROC-AUC
4. **Evaluation:** Test on held-out test set (20% of data)
5. **Model Registry:** Save model with versioning

### Performance Metrics

**Expected Performance (Test Set):**
- ROC-AUC: ~0.85
- Accuracy: ~80%
- Precision: ~0.65
- Recall: ~0.55
- F1-Score: ~0.60

### Limitations

1. **Domain Specificity:** Model trained on telecom data; may not generalize
   to other industries
2. **Data Drift:** Performance degrades with significant distribution shifts
   (threshold: p < 0.05)
3. **Sample Size:** Requires minimum 1000 samples for reliable predictions
4. **Feature Requirements:** All input features must be provided
5. **Temporal Validity:** Model performance may degrade over time without
   retraining
6. **Class Imbalance:** Natural imbalance handled but may still affect
   minority class precision

### Failure Cases

1. **Missing Features:** Prediction fails if critical features are missing
2. **Out-of-Range Values:** May produce unreliable predictions for extreme
   values outside training distribution
3. **New Categories:** Unseen categorical values default to most common
   category (may reduce accuracy)
4. **Cold Start:** New customers with <3 months tenure show higher
   prediction uncertainty

### Technical Specifications

- **Python Version:** 3.10+
- **Random Seed:** 42 (all libraries)
- **Training Time:** ~3-5 minutes (depends on hardware)
- **Inference Latency:** <100ms per prediction
- **Model Size:** ~50MB (XGBoost), ~30MB (LightGBM), ~80MB (Random Forest)

### Reproducibility

All random seeds are set to 42:
- `random.seed(42)`
- `np.random.seed(42)`
- `PYTHONHASHSEED=42`
- All model `random_state=42`

### License

- **Code:** MIT License
- **Dataset:** Database Contents License (DbCL) v1.0

### Contact

**Author:** Spencer Purdy
**Purpose:** Portfolio demonstration of ML engineering skills

---

**Disclaimer:** This is a demonstration system. Performance metrics are
indicative and should be validated on your specific use case before
production deployment.
""")

        gr.Markdown("""
---
**Automated MLOps Framework v1.0.0** | Built with Gradio | Author: Spencer Purdy

System demonstrates: Data preprocessing, Feature engineering, Model training,
Hyperparameter optimization, Model evaluation, Drift detection, Production monitoring
""")

    return interface
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Module-level entry point (e.g. for Hugging Face Spaces) ---
logger.info("Creating Gradio interface...")
interface = create_gradio_interface()

logger.info("Launching MLOps Framework...")
# NOTE(review): share=True opens a public tunnel and server_name="0.0.0.0"
# binds all network interfaces -- appropriate for a hosted demo, but review
# before any other deployment context.
interface.launch(
    share=True,
    server_name="0.0.0.0",
    server_port=7860,
    show_error=True
)