"""
APOO ML Module — Travel Time Prediction with Uncertainty
=========================================================
XGBoost quantile regression for travel time prediction.
SHAP explainability for feature importance.
"""

import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import shap
import matplotlib
# Select the non-interactive backend before pyplot is imported so the module
# works on headless machines.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import warnings
# NOTE: this silences *all* warnings process-wide as an import side effect.
warnings.filterwarnings('ignore')

from apoo_core import IndianTrafficGenerator


# ============================================================
# 1. FEATURE ENGINEERING
# ============================================================

# Input columns, in the order they are handed to the models.
FEATURE_COLUMNS = [
    "link_length_m",
    "speed_limit_kmh",
    "num_lanes",
    "gradient_pct",
    "side_friction",
    "pct_two_wheeler",
    "pct_car",
    "pct_auto",
    "pct_bus",
    "pct_truck",
    "density_veh_km_lane",
    "weather_speed_factor",
    "time_of_day_sin",
    "time_of_day_cos",
    "is_peak",
    "is_weekend",
    "platoon_size",
    "platoon_pcu",
    "upstream_queue_pcu",
    "downstream_queue_pcu",
]

# Regression target column.
TARGET_COLUMN = "actual_travel_time_s"


def prepare_features(df: pd.DataFrame):
    """Extract features and target from training data.

    Args:
        df: Frame containing every column in ``FEATURE_COLUMNS`` plus
            ``TARGET_COLUMN``.

    Returns:
        ``(X, y)`` where ``X`` is a copy of the feature columns (in
        ``FEATURE_COLUMNS`` order) and ``y`` is a copy of the target column.

    Raises:
        KeyError: If any required column is missing from ``df``.
    """
    X = df[FEATURE_COLUMNS].copy()
    y = df[TARGET_COLUMN].copy()
    return X, y


# ============================================================
# 2. XGBOOST QUANTILE REGRESSION MODELS
# ============================================================

class APOOPredictor:
    """
    Uncertainty-aware travel time predictor using XGBoost quantile regression.

    Trains 3 models: P10 (lower bound), P50 (median), P90 (upper bound).
    This gives 80% prediction intervals for each travel time estimate.

    The three models are fitted independently of each other; nothing in this
    class enforces ``P10 <= P50 <= P90`` for an individual prediction.
    """

    def __init__(self, n_estimators: int = 300, max_depth: int = 6,
                 learning_rate: float = 0.05):
        """Store hyper-parameters; no model is built until :meth:`train`.

        Args:
            n_estimators: Number of boosting rounds for each quantile model.
            max_depth: Maximum tree depth for each quantile model.
            learning_rate: Boosting learning rate for each quantile model.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.learning_rate = learning_rate
        self.models = {}                  # quantile -> fitted XGBRegressor
        self.quantiles = [0.1, 0.5, 0.9]
        self.feature_names = FEATURE_COLUMNS
        self.train_metrics = {}           # filled from the *validation* set
        self.shap_values = None
        self.explainer = None

    def train(self, X_train, y_train, X_val=None, y_val=None):
        """Train quantile regression models.

        Fits one ``XGBRegressor`` per entry of ``self.quantiles``. When a
        validation set is supplied, validation metrics are computed and
        printed. SHAP values are then computed on at most the first 500
        training rows.

        Args:
            X_train: Training features.
            y_train: Training targets.
            X_val: Optional validation features.
            y_val: Optional validation targets (used only with ``X_val``).

        Returns:
            ``self``, to allow call chaining.
        """
        for q in self.quantiles:
            print(f" Training Q{q:.0%} model...")
            model = xgb.XGBRegressor(
                objective='reg:quantileerror',
                quantile_alpha=q,
                n_estimators=self.n_estimators,
                max_depth=self.max_depth,
                learning_rate=self.learning_rate,
                subsample=0.8,
                colsample_bytree=0.8,
                tree_method='hist',
                random_state=42,
            )
            eval_set = [(X_train, y_train)]
            if X_val is not None:
                eval_set.append((X_val, y_val))
            model.fit(
                X_train, y_train,
                eval_set=eval_set,
                verbose=False,
            )
            self.models[q] = model

        # Compute metrics on validation set
        if X_val is not None:
            self._compute_metrics(X_val, y_val)

        # Compute SHAP values (on training subset for speed); slicing past
        # the end is safe, so no explicit min() against the length is needed.
        self._compute_shap(X_train[:500])
        return self

    def predict(self, X):
        """Predict with uncertainty bounds.

        Args:
            X: Feature rows in the layout the models were trained on.

        Returns:
            Tuple ``(p50, p10, p90, uncertainty)`` of arrays, where
            ``uncertainty`` is half the P10-P90 interval width.

        Raises:
            KeyError: If :meth:`train` has not populated ``self.models``.
        """
        p10 = self.models[0.1].predict(X)
        p50 = self.models[0.5].predict(X)
        p90 = self.models[0.9].predict(X)
        uncertainty = (p90 - p10) / 2
        return p50, p10, p90, uncertainty

    def _compute_metrics(self, X_val, y_val):
        """Compute validation metrics, store them and print them.

        Point metrics (MAE, RMSE, R², MAPE) score the median model; coverage
        and width describe the [P10, P90] interval. Results are written to
        ``self.train_metrics`` (despite the name, these are validation
        metrics) as plain Python floats.

        Args:
            X_val: Validation features.
            y_val: Validation targets; any 1-D array-like is accepted.
        """
        # Work on a flat ndarray so Series, lists and arrays behave alike.
        y_true = np.asarray(y_val).ravel()

        p50 = self.models[0.5].predict(X_val)
        p10 = self.models[0.1].predict(X_val)
        p90 = self.models[0.9].predict(X_val)

        mae = mean_absolute_error(y_true, p50)
        rmse = np.sqrt(mean_squared_error(y_true, p50))
        r2 = r2_score(y_true, p50)

        # Coverage: % of actual values within [P10, P90]
        in_interval = ((y_true >= p10) & (y_true <= p90)).mean() * 100

        # Mean interval width
        mean_width = np.mean(p90 - p10)

        # Denominator is floored at 1 so near-zero targets cannot blow up
        # the percentage error.
        mape = np.mean(np.abs(y_true - p50) / np.clip(y_true, 1, None)) * 100

        # float(...) strips NumPy scalar types (including float32 coming
        # from the model output) so the stored values print cleanly.
        self.train_metrics = {
            "MAE (s)": round(float(mae), 2),
            "RMSE (s)": round(float(rmse), 2),
            "R² Score": round(float(r2), 4),
            "80% PI Coverage (%)": round(float(in_interval), 1),
            "Mean PI Width (s)": round(float(mean_width), 2),
            "MAPE (%)": round(float(mape), 2),
        }

        print(" Validation Metrics:")
        for k, v in self.train_metrics.items():
            print(f" {k}: {v}")

    def _compute_shap(self, X_sample):
        """Compute SHAP values for explainability.

        Explains the median (P50) model only. Any failure is reported on
        stdout and leaves ``self.shap_values`` as ``None`` so the plotting
        methods fall back to XGBoost's native importances.

        Args:
            X_sample: Rows to explain.
        """
        try:
            self.explainer = shap.TreeExplainer(self.models[0.5])
            self.shap_values = self.explainer(X_sample)
        except Exception as e:
            print(f" SHAP computation warning: {e}")
            # Fallback: use basic feature importance
            self.shap_values = None

    def get_feature_importance(self) -> pd.DataFrame:
        """Get feature importance from median model.

        Returns:
            Frame with ``Feature`` and ``Importance`` columns, sorted by
            descending importance.
        """
        model = self.models[0.5]
        importance = model.feature_importances_
        return pd.DataFrame({
            "Feature": self.feature_names,
            "Importance": importance,
        }).sort_values("Importance", ascending=False)

    # ---- Plotting Methods ----
    # Every plotting method closes its figure in pyplot before returning it,
    # so pyplot does not keep accumulating figures; callers are expected to
    # use the returned Figure object directly (e.g. ``fig.savefig``).

    def plot_predictions_vs_actual(self, X_val, y_val, title=""):
        """Plot predictions with uncertainty bands and the residual histogram.

        Args:
            X_val: Validation features.
            y_val: Validation targets; any 1-D array-like is accepted.
            title: Optional suffix appended to the left panel's title.

        Returns:
            The matplotlib ``Figure`` holding both panels.
        """
        p50, p10, p90, _ = self.predict(X_val)
        y_true = np.asarray(y_val).ravel()

        fig, axes = plt.subplots(1, 2, figsize=(14, 6))

        # Left: Scatter with uncertainty, samples ordered by actual value
        ax = axes[0]
        sorted_idx = np.argsort(y_true)
        y_sorted = y_true[sorted_idx]
        p50_s = p50[sorted_idx]
        p10_s = p10[sorted_idx]
        p90_s = p90[sorted_idx]
        x_range = np.arange(len(y_sorted))

        ax.scatter(x_range, y_sorted, alpha=0.3, s=8, color='#2c3e50',
                   label='Actual', zorder=3)
        ax.plot(x_range, p50_s, color='#e74c3c', linewidth=1.5,
                label='Predicted (P50)', zorder=4)
        ax.fill_between(x_range, p10_s, p90_s, alpha=0.2, color='#3498db',
                        label='80% PI [P10-P90]', zorder=2)
        ax.set_xlabel("Sample Index (sorted by actual)", fontsize=11)
        ax.set_ylabel("Travel Time (seconds)", fontsize=11)
        ax.set_title(f"Predictions with Uncertainty Bands{' — ' + title if title else ''}",
                     fontsize=12)
        ax.legend(fontsize=10)
        ax.grid(alpha=0.3)

        # Right: Residual distribution
        ax2 = axes[1]
        residuals = y_true - p50
        mean_residual = np.mean(residuals)
        ax2.hist(residuals, bins=50, alpha=0.7, color='#3498db', edgecolor='white')
        ax2.axvline(0, color='#e74c3c', linestyle='--', linewidth=2,
                    label='Zero Error')
        ax2.axvline(mean_residual, color='#f39c12', linestyle='--', linewidth=2,
                    label=f'Mean: {mean_residual:.1f}s')
        ax2.set_xlabel("Residual (Actual - Predicted) [seconds]", fontsize=11)
        ax2.set_ylabel("Count", fontsize=11)
        ax2.set_title("Residual Distribution", fontsize=12)
        ax2.legend(fontsize=10)
        ax2.grid(alpha=0.3)

        plt.tight_layout()
        plt.close(fig)
        return fig

    @staticmethod
    def _render_shap_figure(draw, figsize, title, title_fontsize):
        """Run a SHAP plotting callable on a fresh figure and return the result.

        Args:
            draw: Zero-argument callable that invokes a ``shap.plots.*``
                function with ``show=False``.
            figsize: ``(width, height)`` for the pre-created figure.
            title: Title applied to the current axes after drawing.
            title_fontsize: Font size for the title.

        Returns:
            The figure that is current once drawing has finished.

        Raises:
            Exception: Whatever ``draw`` or matplotlib raises; the
                pre-created figure is released first.
        """
        base_fig, _ = plt.subplots(figsize=figsize)
        try:
            draw()
            plt.title(title, fontsize=title_fontsize, fontweight='bold')
            plt.tight_layout()
            # Return whichever figure is current after drawing, in case the
            # SHAP call did not draw onto the pre-created one.
            fig = plt.gcf()
        finally:
            # Always release the pre-created figure, including on failure.
            plt.close(base_fig)
        plt.close(fig)  # no-op when fig is base_fig
        return fig

    def plot_shap_beeswarm(self, max_display=15):
        """SHAP beeswarm plot showing feature impact distribution.

        Args:
            max_display: Maximum number of features to show.

        Returns:
            The beeswarm ``Figure``, or the native-importance fallback figure
            when SHAP values are unavailable.
        """
        if self.shap_values is None:
            return self._fallback_importance_plot()

        return self._render_shap_figure(
            lambda: shap.plots.beeswarm(self.shap_values,
                                        max_display=max_display, show=False),
            (11, 7),
            "SHAP Feature Impact on Travel Time Prediction",
            13,
        )

    def plot_shap_bar(self, max_display=15):
        """SHAP global feature importance bar plot.

        Args:
            max_display: Maximum number of features to show.

        Returns:
            The bar-plot ``Figure``, or the native-importance fallback figure
            when SHAP values are unavailable.
        """
        if self.shap_values is None:
            return self._fallback_importance_plot()

        return self._render_shap_figure(
            lambda: shap.plots.bar(self.shap_values,
                                   max_display=max_display, show=False),
            (10, 6),
            "Global Feature Importance (Mean |SHAP|)",
            13,
        )

    def plot_shap_waterfall(self, X_sample, idx=0):
        """SHAP waterfall plot for a single prediction.

        Args:
            X_sample: Feature rows; only row ``idx`` is explained.
            idx: Positional index of the row to explain.

        Returns:
            The waterfall ``Figure``. Falls back to the native-importance
            figure when SHAP is unavailable or explaining/plotting fails.
        """
        if self.shap_values is None or self.explainer is None:
            return self._fallback_importance_plot()

        try:
            sv = self.explainer(X_sample[idx:idx + 1])
            return self._render_shap_figure(
                lambda: shap.plots.waterfall(sv[0], show=False),
                (10, 6),
                f"SHAP Waterfall — Prediction Breakdown (Sample {idx})",
                12,
            )
        except Exception as e:
            # Best-effort plot: report the reason instead of swallowing it
            # silently, and let KeyboardInterrupt/SystemExit propagate.
            print(f" SHAP waterfall warning: {e}")
            return self._fallback_importance_plot()

    def _fallback_importance_plot(self):
        """Fallback: XGBoost native feature importance.

        Returns:
            Horizontal bar chart ``Figure`` of the 15 most important features
            of the median model, most important at the top.
        """
        # Top 15 rows, reversed so the largest bar is drawn last (at the top).
        top = self.get_feature_importance().head(15).iloc[::-1]

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.barh(top["Feature"], top["Importance"],
                color='#3498db', edgecolor='white')
        ax.set_xlabel("Feature Importance (Gain)", fontsize=11)
        ax.set_title("XGBoost Feature Importance (Fallback)", fontsize=13,
                     fontweight='bold')
        ax.grid(alpha=0.3, axis='x')
        plt.tight_layout()
        plt.close(fig)
        return fig

    def plot_quantile_calibration(self, X_val, y_val):
        """Check if quantile predictions are well-calibrated.

        For each trained quantile ``q`` the fraction of targets at or below
        that model's prediction is plotted against ``q``; a calibrated model
        lies on the diagonal.

        Args:
            X_val: Validation features.
            y_val: Validation targets; any 1-D array-like is accepted.

        Returns:
            The calibration ``Figure``.
        """
        y_true = np.asarray(y_val).ravel()
        fig, ax = plt.subplots(figsize=(8, 6))

        test_quantiles = list(self.quantiles)
        observed_below = [
            (y_true <= self.models[q].predict(X_val)).mean()
            for q in test_quantiles
        ]

        ax.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Perfect Calibration')
        ax.scatter(test_quantiles, observed_below, s=120, c='#e74c3c', zorder=5,
                   edgecolors='white', linewidth=2)
        ax.plot(test_quantiles, observed_below, 'r-', alpha=0.7, linewidth=2,
                label='Model')

        for q, obs in zip(test_quantiles, observed_below):
            ax.annotate(f'Q{q:.0%}: {obs:.1%}', (q, obs),
                        textcoords="offset points", xytext=(10, 10), fontsize=10)

        ax.set_xlabel("Predicted Quantile", fontsize=12)
        ax.set_ylabel("Observed Fraction Below", fontsize=12)
        ax.set_title("Quantile Calibration Plot", fontsize=13, fontweight='bold')
        ax.legend(fontsize=11)
        ax.set_xlim(-0.05, 1.05)
        ax.set_ylim(-0.05, 1.05)
        ax.grid(alpha=0.3)
        plt.tight_layout()
        plt.close(fig)
        return fig

# ============================================================
# 3. TRAINING PIPELINE
# ============================================================

def train_apoo_model(n_samples: int = 5000, city_type: str = "metro", *,
                     seed: int = 42, test_size: float = 0.2):
    """Full training pipeline for APOO predictor.

    Generates synthetic data, splits it into train/validation sets, and fits
    an ``APOOPredictor``. Progress is reported on stdout. Nothing is written
    to disk.

    Args:
        n_samples: Number of synthetic samples to request from the generator.
        city_type: City profile forwarded to
            ``IndianTrafficGenerator.generate_training_data``.
        seed: Seed for both the data generator and the train/validation
            split. The default reproduces the previously hard-coded value.
        test_size: Validation share forwarded to ``train_test_split``. The
            default reproduces the previously hard-coded value.

    Returns:
        Tuple ``(predictor, X_train, X_val, y_train, y_val, df)`` where
        ``df`` is the full generated data set.
    """
    print("=" * 60)
    print("APOO ML Training Pipeline")
    print("=" * 60)

    # Step 1: Generate training data
    print("\n[1/4] Generating synthetic Indian traffic data...")
    gen = IndianTrafficGenerator(seed=seed)
    df = gen.generate_training_data(n_samples=n_samples, city_type=city_type)
    target = df[TARGET_COLUMN]
    print(f" Generated {len(df)} samples with {len(FEATURE_COLUMNS)} features")
    print(f" Target stats: mean={target.mean():.1f}s, "
          f"std={target.std():.1f}s, "
          f"range=[{target.min():.1f}, {target.max():.1f}]s")

    # Step 2: Prepare features
    print("\n[2/4] Preparing features & splitting data...")
    X, y = prepare_features(df)
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=seed
    )
    print(f" Train: {len(X_train)}, Validation: {len(X_val)}")

    # Step 3: Train models
    print("\n[3/4] Training XGBoost quantile models...")
    predictor = APOOPredictor(n_estimators=300, max_depth=6, learning_rate=0.05)
    predictor.train(X_train, y_train, X_val, y_val)

    # Step 4: Report results (no artifacts are persisted here; the caller
    # decides what to save)
    print("\n[4/4] Training complete!")
    print(f" Model metrics: {predictor.train_metrics}")

    return predictor, X_train, X_val, y_train, y_val, df


if __name__ == "__main__":
    import os

    predictor, X_train, X_val, y_train, y_val, df = train_apoo_model(n_samples=5000)

    # Make sure the output directory exists before writing plots into it.
    output_dir = "/app"
    os.makedirs(output_dir, exist_ok=True)

    # Generate plots
    fig1 = predictor.plot_predictions_vs_actual(X_val, y_val)
    fig1.savefig(os.path.join(output_dir, "pred_vs_actual.png"),
                 dpi=150, bbox_inches='tight')

    fig2 = predictor.plot_shap_beeswarm()
    fig2.savefig(os.path.join(output_dir, "shap_beeswarm.png"),
                 dpi=150, bbox_inches='tight')

    print("Plots saved.")