"""
INFERENCE PIPELINE - Production ML Model Serving with Feature Consistency
=========================================================================
This module provides the core inference functionality for the Telco Churn prediction model.
It ensures that serving-time feature transformations exactly match training-time transformations,
which is CRITICAL for model accuracy in production.
Key Responsibilities:
1. Load MLflow-logged model and feature metadata from training
2. Apply identical feature transformations as used during training
3. Ensure correct feature ordering for model input
4. Convert model predictions to user-friendly output
CRITICAL PATTERN: Training/Serving Consistency
- Uses fixed BINARY_MAP for deterministic binary encoding
- Produces the same one-hot columns as training (which used drop_first=True)
- Maintains exact feature column order from training
- Handles missing/new categorical values gracefully
Production Deployment:
- MODEL_DIR points to containerized model artifacts
- Feature schema loaded from training-time artifacts
- Optimized for single-row inference (real-time serving)
"""
import os
import json
import joblib
import pandas as pd
# === MODEL LOADING CONFIGURATION ===
# Two locations are tried, in order:
#   1. <this file>/../../artifacts/model.joblib  (repository-relative artifacts)
#   2. MODEL_DIR/model.joblib                    (fallback; presumably the path
#      the container image copies the model to - confirm against the Dockerfile)
# NOTE: joblib.load unpickles the file, so only load artifacts you trust.
MODEL_DIR = "/app/model"

try:
    # Primary location: artifacts directory two levels above this module
    model_path = os.path.join(os.path.dirname(__file__), "..", "..", "artifacts", "model.joblib")
    model = joblib.load(model_path)
    print(f"Model loaded successfully from {model_path}")
except Exception as e:
    print(f"Failed to load model from {model_path}: {e}")
    # Fallback location under MODEL_DIR
    try:
        model_path = os.path.join(MODEL_DIR, "model.joblib")
        model = joblib.load(model_path)
        print(f"Fallback: Loaded model from {model_path}")
    except Exception as fallback_error:
        # Both locations failed: report both errors and keep the fallback
        # failure attached as the explicit cause for debugging.
        raise Exception(
            f"Failed to load model: {e}. Fallback failed: {fallback_error}"
        ) from fallback_error
# === FEATURE SCHEMA LOADING ===
# FEATURE_COLS is the column list the transformed frame is reindexed to before
# prediction, so it must be the exact, ordered feature list used in training.
# The same two locations as the model are tried: the repository-relative
# artifacts directory first, then MODEL_DIR.
try:
    feature_file = os.path.join(os.path.dirname(__file__), "..", "..", "artifacts", "feature_columns.json")
    # JSON is UTF-8 by specification; do not depend on the platform default encoding
    with open(feature_file, encoding="utf-8") as f:
        FEATURE_COLS = json.load(f)
    print(f"Loaded {len(FEATURE_COLS)} feature columns from training")
except Exception as e:
    # Fallback location under MODEL_DIR
    try:
        feature_file = os.path.join(MODEL_DIR, "feature_columns.json")
        with open(feature_file, encoding="utf-8") as f:
            FEATURE_COLS = json.load(f)
        print(f"Fallback: Loaded {len(FEATURE_COLS)} feature columns")
    except Exception as fallback_error:
        # Both locations failed: report both errors and keep the fallback
        # failure attached as the explicit cause for debugging.
        raise Exception(
            f"Failed to load feature columns: {e}. Fallback failed: {fallback_error}"
        ) from fallback_error
# === FEATURE TRANSFORMATION CONSTANTS ===
# These encodings are applied at serving time and are expected to mirror the
# ones used when the model was trained; changing them here without retraining
# would introduce train/serve skew.

# Fixed 0/1 encodings for two-valued columns. "gender" has its own labels;
# every other column uses the same No/Yes encoding (a separate dict per column).
BINARY_MAP = {
    "gender": {"Female": 0, "Male": 1},
    **{
        yes_no_col: {"No": 0, "Yes": 1}
        for yes_no_col in ("Partner", "Dependents", "PhoneService", "PaperlessBilling")
    },
}

# Columns coerced to numeric dtype before encoding
NUMERIC_COLS = [
    "tenure",
    "MonthlyCharges",
    "TotalCharges",
]
def _serve_transform(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transform raw customer rows into the feature matrix the model expects.

    Transformation Pipeline:
    1. Strip whitespace from column names
    2. Coerce NUMERIC_COLS to numbers (unparseable or missing values become 0)
    3. Encode BINARY_MAP columns as 0/1 (unmapped values become 0)
    4. One-hot encode every remaining object-dtype column
    5. Convert boolean columns to integers
    6. Reindex to FEATURE_COLS (missing columns filled with 0, extras dropped)

    Args:
        df: DataFrame of raw customer data (typically a single row).

    Returns:
        A new DataFrame whose columns are exactly FEATURE_COLS, in that order.
        The input DataFrame is not modified.

    IMPORTANT: Steps 2-3 must stay in sync with the training-time feature
    engineering. Step 4 intentionally does NOT pass drop_first=True even though
    training did: see the comment at that step.
    """
    df = df.copy()

    # Clean column names (remove surrounding whitespace)
    df.columns = df.columns.str.strip()

    # === STEP 1: Numeric Type Coercion ===
    # String inputs are parsed; anything unparseable becomes NaN and then 0.
    for c in NUMERIC_COLS:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)

    # === STEP 2: Binary Feature Encoding ===
    for c, mapping in BINARY_MAP.items():
        if c in df.columns:
            df[c] = (
                df[c]
                .astype(str)       # Normalise to string before matching
                .str.strip()       # Ignore surrounding whitespace in values
                .map(mapping)      # Unmapped values become NaN
                .astype("Int64")   # Nullable integer so NaN survives the cast
                .fillna(0)         # Unknown values fall back to 0
                .astype(int)       # Plain integer dtype for the model
            )

    # === STEP 3: One-Hot Encoding for Remaining Categorical Features ===
    # The BINARY_MAP columns are already integers, so only the still-textual
    # columns are selected here.
    obj_cols = list(df.select_dtypes(include=["object"]).columns)
    if obj_cols:
        # Do NOT use drop_first=True at serving time. get_dummies only sees the
        # categories present in THIS frame; with a single row each column has
        # exactly one category, so drop_first would remove its only dummy
        # column and every one-hot feature would end up 0 after reindexing.
        # Instead, encode every observed category and let the reindex in
        # STEP 5 discard the baseline columns that training dropped.
        df = pd.get_dummies(df, columns=obj_cols, drop_first=False)

    # === STEP 4: Boolean to Integer Conversion ===
    # get_dummies may emit boolean columns; the model is fed integers.
    bool_cols = df.select_dtypes(include=["bool"]).columns
    if not bool_cols.empty:
        df[bool_cols] = df[bool_cols].astype(int)

    # === STEP 5: Feature Alignment with Training Schema ===
    # Columns absent from this frame are added as 0; columns not in
    # FEATURE_COLS (including training's dropped baseline dummies) are removed.
    return df.reindex(columns=FEATURE_COLS, fill_value=0)
def predict(input_dict: dict) -> str:
    """
    Predict customer churn from one raw customer record.

    Pipeline:
    1. Wrap the input dictionary in a single-row DataFrame
    2. Apply the serving-time feature transformations (_serve_transform)
    3. Call the module-level model's predict() on the transformed row
    4. Map the prediction to a human-readable string

    Args:
        input_dict: Raw customer data keyed by feature name
            (e.g. "gender", "tenure", "Contract", "MonthlyCharges").

    Returns:
        "Likely to churn" when the model's prediction equals 1, otherwise
        "Not likely to churn" (this includes any output that is not exactly 1).

    Raises:
        Exception: If the model call or the normalisation of its output fails.
            The underlying error is attached as the exception's cause.

    Example:
        >>> customer_data = {
        ...     "gender": "Female", "tenure": 1, "Contract": "Month-to-month",
        ...     "MonthlyCharges": 85.0, ...  # other features
        ... }
        >>> predict(customer_data)
        "Likely to churn"
    """
    # === STEP 1: Convert Input to DataFrame ===
    df = pd.DataFrame([input_dict])

    # === STEP 2: Apply Feature Transformations ===
    df_enc = _serve_transform(df)

    # === STEP 3: Generate Model Prediction ===
    try:
        preds = model.predict(df_enc)
        # Array-like outputs (anything with .tolist) become plain Python lists
        if hasattr(preds, "tolist"):
            preds = preds.tolist()
        # Unwrap the single prediction produced by the single input row
        if isinstance(preds, (list, tuple)) and len(preds) == 1:
            result = preds[0]
        else:
            result = preds
    except Exception as e:
        # Keep the original error as the explicit cause for debugging
        raise Exception(f"Model prediction failed: {e}") from e

    # === STEP 4: Convert to Business-Friendly Output ===
    if result == 1:
        return "Likely to churn"  # High risk - needs intervention
    return "Not likely to churn"  # Low risk - maintain normal service