# omshrivastava's picture
# Add APOO ML module
# 404b7cf verified
"""
APOO ML Module — Travel Time Prediction with Uncertainty
=========================================================
XGBoost quantile regression for travel time prediction.
SHAP explainability for feature importance.
"""
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import shap
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
from apoo_core import IndianTrafficGenerator
# ============================================================
# 1. FEATURE ENGINEERING
# ============================================================
# Ordered list of input columns selected from the training frame by
# prepare_features(). The order is significant: it is the column order
# handed to the models. Grouping comments below only reflect the apparent
# theme of the column names.
FEATURE_COLUMNS = [
    # link attributes
    "link_length_m",
    "speed_limit_kmh",
    "num_lanes",
    "gradient_pct",
    "side_friction",
    # vehicle mix
    "pct_two_wheeler",
    "pct_car",
    "pct_auto",
    "pct_bus",
    "pct_truck",
    # traffic state and weather
    "density_veh_km_lane",
    "weather_speed_factor",
    # time encoding and calendar flags
    "time_of_day_sin",
    "time_of_day_cos",
    "is_peak",
    "is_weekend",
    # platoon and queue state
    "platoon_size",
    "platoon_pcu",
    "upstream_queue_pcu",
    "downstream_queue_pcu",
]

# Name of the regression target column in the training frame.
TARGET_COLUMN = "actual_travel_time_s"
def prepare_features(df: pd.DataFrame):
    """Split a training frame into model inputs and the regression target.

    Args:
        df: Frame containing every column named in ``FEATURE_COLUMNS`` as
            well as the ``TARGET_COLUMN`` column.

    Returns:
        A ``(features, target)`` pair. Both are copies, so later changes to
        them do not write back into ``df``.
    """
    features = df[FEATURE_COLUMNS].copy()
    target = df[TARGET_COLUMN].copy()
    return features, target
# ============================================================
# 2. XGBOOST QUANTILE REGRESSION MODELS
# ============================================================
class APOOPredictor:
    """Uncertainty-aware travel time predictor using XGBoost quantile regression.

    One ``xgb.XGBRegressor`` is fitted per level in ``self.quantiles``
    (0.1, 0.5 and 0.9): a lower bound (P10), a median (P50) and an upper
    bound (P90). The [P10, P90] band is used as a nominal 80% prediction
    interval for each travel time estimate.

    NOTE(review): the three models are fitted independently and nothing in
    this class enforces ``p10 <= p50 <= p90`` for a given row.

    Attributes:
        n_estimators, max_depth, learning_rate: Hyper-parameters forwarded
            to every ``xgb.XGBRegressor``.
        models: Maps each quantile level to its fitted regressor; empty
            until :meth:`train` has run.
        quantiles: Quantile levels fitted by :meth:`train`.
        feature_names: Default feature labels (a copy of
            ``FEATURE_COLUMNS``) used when the fitted model does not expose
            its own feature names.
        train_metrics: Validation metrics from the last :meth:`train` call
            that received validation data.
        shap_values: SHAP explanation of the median model on a training
            subset, or ``None`` if SHAP failed or training has not run.
        explainer: The ``shap.TreeExplainer`` for the median model, or
            ``None``.
    """

    def __init__(self, n_estimators: int = 300, max_depth: int = 6,
                 learning_rate: float = 0.05):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.learning_rate = learning_rate
        self.models = {}
        self.quantiles = [0.1, 0.5, 0.9]
        # Copy so that editing this attribute never mutates the module-level list.
        self.feature_names = list(FEATURE_COLUMNS)
        self.train_metrics = {}
        self.shap_values = None
        self.explainer = None

    def train(self, X_train, y_train, X_val=None, y_val=None):
        """Fit one quantile regressor per level in ``self.quantiles``.

        Args:
            X_train, y_train: Training features and target.
            X_val, y_val: Optional validation split. When ``X_val`` is
                given it is added to the XGBoost ``eval_set`` and used to
                fill ``self.train_metrics``.

        Returns:
            ``self``, so calls can be chained.
        """
        has_validation = X_val is not None

        for q in self.quantiles:
            print(f" Training Q{q:.0%} model...")
            model = xgb.XGBRegressor(
                objective='reg:quantileerror',
                quantile_alpha=q,
                n_estimators=self.n_estimators,
                max_depth=self.max_depth,
                learning_rate=self.learning_rate,
                subsample=0.8,
                colsample_bytree=0.8,
                tree_method='hist',
                random_state=42,
            )
            eval_set = [(X_train, y_train)]
            if has_validation:
                eval_set.append((X_val, y_val))
            model.fit(X_train, y_train, eval_set=eval_set, verbose=False)
            self.models[q] = model

        if has_validation:
            self._compute_metrics(X_val, y_val)

        # SHAP is only computed on the first 500 training rows for speed.
        self._compute_shap(X_train[:500])
        return self

    def predict(self, X):
        """Predict travel time with uncertainty bounds.

        Returns:
            ``(p50, p10, p90, uncertainty)`` where ``uncertainty`` is the
            half-width of the [P10, P90] band, ``(p90 - p10) / 2``.

        Raises:
            KeyError: If the 0.1, 0.5 or 0.9 model has not been trained.
        """
        p10 = self.models[0.1].predict(X)
        p50 = self.models[0.5].predict(X)
        p90 = self.models[0.9].predict(X)
        uncertainty = (p90 - p10) / 2
        return p50, p10, p90, uncertainty

    def _compute_metrics(self, X_val, y_val):
        """Compute, store and print validation metrics for the fitted models."""
        p50, p10, p90, _ = self.predict(X_val)
        y_true = np.asarray(y_val)

        mae = mean_absolute_error(y_true, p50)
        rmse = np.sqrt(mean_squared_error(y_true, p50))
        r2 = r2_score(y_true, p50)
        # Coverage: % of actual values that fall inside [P10, P90].
        coverage = ((y_true >= p10) & (y_true <= p90)).mean() * 100
        mean_width = np.mean(p90 - p10)
        # Denominator is floored at 1 so near-zero targets cannot blow up MAPE.
        mape = np.mean(np.abs(y_true - p50) / np.clip(y_true, 1, None)) * 100

        self.train_metrics = {
            "MAE (s)": round(mae, 2),
            "RMSE (s)": round(rmse, 2),
            "R² Score": round(r2, 4),
            "80% PI Coverage (%)": round(coverage, 1),
            "Mean PI Width (s)": round(mean_width, 2),
            "MAPE (%)": round(mape, 2),
        }
        print(" Validation Metrics:")
        for k, v in self.train_metrics.items():
            print(f" {k}: {v}")

    def _compute_shap(self, X_sample):
        """Compute SHAP values of the median model on ``X_sample``.

        Best effort: any failure is reported with a printed warning and
        leaves ``self.shap_values`` as ``None`` so that the plotting
        methods fall back to XGBoost's native importance.
        """
        try:
            self.explainer = shap.TreeExplainer(self.models[0.5])
            self.shap_values = self.explainer(X_sample)
        except Exception as e:
            print(f" SHAP computation warning: {e}")
            self.shap_values = None

    def get_feature_importance(self) -> pd.DataFrame:
        """Return the median model's feature importance, highest first.

        Labels come from the model's own ``feature_names_in_`` when it is
        available and matches the importance vector in length; otherwise
        ``self.feature_names`` is used. This keeps the table correct when
        the model was fitted on columns other than ``FEATURE_COLUMNS``.

        Returns:
            DataFrame with columns ``Feature`` and ``Importance``.
        """
        model = self.models[0.5]
        importance = model.feature_importances_
        # getattr with a default also covers estimators whose property
        # raises AttributeError when no feature names were seen in fit().
        names = getattr(model, "feature_names_in_", None)
        if names is None or len(names) != len(importance):
            names = self.feature_names
        table = pd.DataFrame({
            "Feature": list(names),
            "Importance": importance,
        })
        return table.sort_values("Importance", ascending=False)

    # ---- Plotting Methods ----

    @staticmethod
    def _capture_current_figure(created_fig):
        """Detach and return pyplot's current figure.

        ``created_fig`` is the figure this class opened before calling a
        SHAP plotting function. If SHAP drew on a different figure, the
        one we opened is closed so it does not leak. The returned figure
        is closed in pyplot but is still a usable ``Figure`` object.
        """
        current = plt.gcf()
        if current is not created_fig:
            plt.close(created_fig)
        plt.close(current)
        return current

    def plot_predictions_vs_actual(self, X_val, y_val, title=""):
        """Plot predictions with uncertainty bands next to a residual histogram.

        Args:
            X_val: Features to predict on.
            y_val: Actual travel times (pandas Series or array-like).
            title: Optional suffix appended to the left panel title.

        Returns:
            The matplotlib figure (already closed in pyplot).
        """
        p50, p10, p90, _ = self.predict(X_val)
        y_true = np.asarray(y_val)
        order = np.argsort(y_true)
        positions = np.arange(len(y_true))

        fig, axes = plt.subplots(1, 2, figsize=(14, 6))

        # Left: samples sorted by actual value, with the P10-P90 band.
        ax = axes[0]
        ax.scatter(positions, y_true[order], alpha=0.3, s=8, color='#2c3e50',
                   label='Actual', zorder=3)
        ax.plot(positions, p50[order], color='#e74c3c', linewidth=1.5,
                label='Predicted (P50)', zorder=4)
        ax.fill_between(positions, p10[order], p90[order], alpha=0.2,
                        color='#3498db', label='80% PI [P10-P90]', zorder=2)
        ax.set_xlabel("Sample Index (sorted by actual)", fontsize=11)
        ax.set_ylabel("Travel Time (seconds)", fontsize=11)
        ax.set_title(f"Predictions with Uncertainty Bands{' — ' + title if title else ''}", fontsize=12)
        ax.legend(fontsize=10)
        ax.grid(alpha=0.3)

        # Right: residual distribution of the median prediction.
        ax2 = axes[1]
        residuals = y_true - p50
        mean_residual = np.mean(residuals)
        ax2.hist(residuals, bins=50, alpha=0.7, color='#3498db', edgecolor='white')
        ax2.axvline(0, color='#e74c3c', linestyle='--', linewidth=2, label='Zero Error')
        ax2.axvline(mean_residual, color='#f39c12', linestyle='--', linewidth=2,
                    label=f'Mean: {mean_residual:.1f}s')
        ax2.set_xlabel("Residual (Actual - Predicted) [seconds]", fontsize=11)
        ax2.set_ylabel("Count", fontsize=11)
        ax2.set_title("Residual Distribution", fontsize=12)
        ax2.legend(fontsize=10)
        ax2.grid(alpha=0.3)

        plt.tight_layout()
        plt.close(fig)
        return fig

    def plot_shap_beeswarm(self, max_display=15):
        """SHAP beeswarm plot; falls back to native importance without SHAP values."""
        if self.shap_values is None:
            return self._fallback_importance_plot()
        fig, _ = plt.subplots(figsize=(11, 7))
        shap.plots.beeswarm(self.shap_values, max_display=max_display, show=False)
        plt.title("SHAP Feature Impact on Travel Time Prediction", fontsize=13, fontweight='bold')
        plt.tight_layout()
        return self._capture_current_figure(fig)

    def plot_shap_bar(self, max_display=15):
        """SHAP global importance bar plot; falls back to native importance without SHAP values."""
        if self.shap_values is None:
            return self._fallback_importance_plot()
        fig, _ = plt.subplots(figsize=(10, 6))
        shap.plots.bar(self.shap_values, max_display=max_display, show=False)
        plt.title("Global Feature Importance (Mean |SHAP|)", fontsize=13, fontweight='bold')
        plt.tight_layout()
        return self._capture_current_figure(fig)

    def plot_shap_waterfall(self, X_sample, idx=0):
        """SHAP waterfall plot for the single row ``X_sample[idx:idx+1]``.

        Falls back to the native importance plot when SHAP is unavailable
        or when explaining/plotting the row fails.
        """
        if self.shap_values is None or self.explainer is None:
            return self._fallback_importance_plot()
        fig = None
        try:
            sv = self.explainer(X_sample[idx:idx + 1])
            fig, _ = plt.subplots(figsize=(10, 6))
            shap.plots.waterfall(sv[0], show=False)
            plt.title(f"SHAP Waterfall — Prediction Breakdown (Sample {idx})", fontsize=12, fontweight='bold')
            plt.tight_layout()
            return self._capture_current_figure(fig)
        except Exception as e:
            # Deliberately best-effort, but report the cause and do not
            # leave the blank figure open.
            print(f" SHAP waterfall warning: {e}")
            if fig is not None:
                plt.close(fig)
            return self._fallback_importance_plot()

    def _fallback_importance_plot(self):
        """Horizontal bar chart of the top 15 features by XGBoost importance."""
        # Reverse so the most important feature ends up at the top of the chart.
        top = self.get_feature_importance().head(15).iloc[::-1]
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.barh(top["Feature"], top["Importance"],
                color='#3498db', edgecolor='white')
        ax.set_xlabel("Feature Importance (Gain)", fontsize=11)
        ax.set_title("XGBoost Feature Importance (Fallback)", fontsize=13, fontweight='bold')
        ax.grid(alpha=0.3, axis='x')
        plt.tight_layout()
        plt.close(fig)
        return fig

    def plot_quantile_calibration(self, X_val, y_val):
        """Plot, per trained quantile, the observed fraction of actuals at or below the prediction.

        A well-calibrated model puts each point on the diagonal (for
        example about 10% of actual values at or below the P10 prediction).

        Returns:
            The matplotlib figure (already closed in pyplot).
        """
        y_true = np.asarray(y_val)
        test_quantiles = sorted(self.quantiles)
        observed_below = [
            (y_true <= self.models[q].predict(X_val)).mean()
            for q in test_quantiles
        ]

        fig, ax = plt.subplots(figsize=(8, 6))
        ax.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Perfect Calibration')
        ax.scatter(test_quantiles, observed_below, s=120, c='#e74c3c',
                   zorder=5, edgecolors='white', linewidth=2)
        ax.plot(test_quantiles, observed_below, 'r-', alpha=0.7, linewidth=2, label='Model')
        for q, obs in zip(test_quantiles, observed_below):
            ax.annotate(f'Q{q:.0%}: {obs:.1%}', (q, obs),
                        textcoords="offset points", xytext=(10, 10), fontsize=10)
        ax.set_xlabel("Predicted Quantile", fontsize=12)
        ax.set_ylabel("Observed Fraction Below", fontsize=12)
        ax.set_title("Quantile Calibration Plot", fontsize=13, fontweight='bold')
        ax.legend(fontsize=11)
        ax.set_xlim(-0.05, 1.05)
        ax.set_ylim(-0.05, 1.05)
        ax.grid(alpha=0.3)
        plt.tight_layout()
        plt.close(fig)
        return fig
# ============================================================
# 3. TRAINING PIPELINE
# ============================================================
def train_apoo_model(n_samples: int = 5000, city_type: str = "metro",
                     test_size: float = 0.2, seed: int = 42):
    """Run the full training pipeline for the APOO predictor.

    Generates synthetic data with ``IndianTrafficGenerator``, splits it
    into train/validation sets, trains an ``APOOPredictor`` and prints a
    progress log. Nothing is written to disk.

    Args:
        n_samples: Number of synthetic samples to request from the generator.
        city_type: Forwarded to ``generate_training_data``.
        test_size: Fraction of the data held out for validation.
        seed: Seed for the data generator and the train/validation split.
            The defaults reproduce the previously hard-coded values.

    Returns:
        ``(predictor, X_train, X_val, y_train, y_val, df)`` where ``df`` is
        the full generated frame.
    """
    banner = "=" * 60
    print(banner)
    print("APOO ML Training Pipeline")
    print(banner)

    # Step 1: Generate training data
    print("\n[1/4] Generating synthetic Indian traffic data...")
    gen = IndianTrafficGenerator(seed=seed)
    df = gen.generate_training_data(n_samples=n_samples, city_type=city_type)
    target = df[TARGET_COLUMN]
    print(f" Generated {len(df)} samples with {len(FEATURE_COLUMNS)} features")
    print(f" Target stats: mean={target.mean():.1f}s, "
          f"std={target.std():.1f}s, "
          f"range=[{target.min():.1f}, {target.max():.1f}]s")

    # Step 2: Prepare features
    print("\n[2/4] Preparing features & splitting data...")
    X, y = prepare_features(df)
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=seed)
    print(f" Train: {len(X_train)}, Validation: {len(X_val)}")

    # Step 3: Train models
    print("\n[3/4] Training XGBoost quantile models...")
    predictor = APOOPredictor(n_estimators=300, max_depth=6, learning_rate=0.05)
    predictor.train(X_train, y_train, X_val, y_val)

    # Step 4: Report (no artifacts are saved here; the caller receives them)
    print("\n[4/4] Training complete!")
    print(f" Model metrics: {predictor.train_metrics}")
    return predictor, X_train, X_val, y_train, y_val, df
if __name__ == "__main__":
    # Script entry point: train on 5000 synthetic samples, then render and
    # save the two diagnostic plots one after the other.
    predictor, X_train, X_val, y_train, y_val, df = train_apoo_model(n_samples=5000)

    # (output path, zero-argument figure factory), executed in order.
    plot_jobs = (
        ("/app/pred_vs_actual.png",
         lambda: predictor.plot_predictions_vs_actual(X_val, y_val)),
        ("/app/shap_beeswarm.png", predictor.plot_shap_beeswarm),
    )
    for out_path, make_figure in plot_jobs:
        make_figure().savefig(out_path, dpi=150, bbox_inches='tight')
    print("Plots saved.")