"""
APOO ML Module — Travel Time Prediction with Uncertainty
=========================================================
XGBoost quantile regression for travel time prediction.
SHAP explainability for feature importance.
"""
|
|
| import numpy as np |
| import pandas as pd |
| import xgboost as xgb |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
| import shap |
| import matplotlib |
| matplotlib.use('Agg') |
| import matplotlib.pyplot as plt |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| from apoo_core import IndianTrafficGenerator |
|
|
|
|
| |
| |
| |
|
|
# Model input columns. prepare_features() selects them from the training
# frame in exactly this order, so the order below is the column order the
# models are fit on. Must stay a list: it is used as a DataFrame column
# selector.
FEATURE_COLUMNS = [
    "link_length_m",
    "speed_limit_kmh",
    "num_lanes",
    "gradient_pct",
    "side_friction",
    "pct_two_wheeler",
    "pct_car",
    "pct_auto",
    "pct_bus",
    "pct_truck",
    "density_veh_km_lane",
    "weather_speed_factor",
    "time_of_day_sin",
    "time_of_day_cos",
    "is_peak",
    "is_weekend",
    "platoon_size",
    "platoon_pcu",
    "upstream_queue_pcu",
    "downstream_queue_pcu",
]

# Name of the regression target column in the training frame.
TARGET_COLUMN = "actual_travel_time_s"
|
|
|
|
def prepare_features(df: pd.DataFrame):
    """Split a training frame into model inputs and the target.

    Args:
        df: Frame containing every column named in ``FEATURE_COLUMNS`` plus
            the ``TARGET_COLUMN`` column.

    Returns:
        ``(X, y)`` where ``X`` is a copy of the feature columns (in
        ``FEATURE_COLUMNS`` order) and ``y`` is a copy of the target column.
        Both are copies, so later edits do not touch ``df``.

    Raises:
        KeyError: If a required column is absent from ``df``.
    """
    return df[FEATURE_COLUMNS].copy(), df[TARGET_COLUMN].copy()
|
|
|
|
| |
| |
| |
|
|
class APOOPredictor:
    """
    Uncertainty-aware travel time predictor using XGBoost quantile regression.

    Trains 3 models: P10 (lower bound), P50 (median), P90 (upper bound).
    This gives 80% prediction intervals for each travel time estimate.

    The three models are fit independently of each other; no step in this
    class reorders their outputs, so ``P10 <= P50 <= P90`` is not enforced
    row by row.

    Attributes:
        n_estimators, max_depth, learning_rate: Hyperparameters forwarded to
            every ``xgb.XGBRegressor``.
        models: Fitted regressors keyed by quantile (0.1, 0.5, 0.9); empty
            until :meth:`train` runs.
        quantiles: Quantile levels trained by :meth:`train`.
        feature_names: Labels paired with the median model's importances.
        train_metrics: Validation metrics from the last :meth:`train` call
            that received a validation set.
        shap_values, explainer: SHAP artefacts for the median model; both
            are ``None`` when SHAP is unavailable or failed.
    """

    def __init__(self, n_estimators: int = 300, max_depth: int = 6,
                 learning_rate: float = 0.05):
        """Store hyperparameters; no model is built until :meth:`train`."""
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.learning_rate = learning_rate
        self.models = {}
        self.quantiles = [0.1, 0.5, 0.9]
        # Private copy: edits to this instance must not leak into the
        # module-level constant shared by every other predictor.
        self.feature_names = list(FEATURE_COLUMNS)
        self.train_metrics = {}
        self.shap_values = None
        self.explainer = None

    def train(self, X_train, y_train, X_val=None, y_val=None):
        """Train quantile regression models.

        Fits one regressor per entry in ``self.quantiles``. When a validation
        set is supplied it is added to the evaluation set and used to fill
        ``self.train_metrics``. SHAP values are then computed on at most the
        first 500 training rows.

        Args:
            X_train: Training features.
            y_train: Training target (travel time).
            X_val: Optional validation features.
            y_val: Validation target; only used when ``X_val`` is given.

        Returns:
            ``self``, so calls can be chained.
        """
        # Label importances with the columns the models are really fit on,
        # so get_feature_importance() stays correct for custom feature sets.
        columns = getattr(X_train, "columns", None)
        if columns is not None:
            self.feature_names = list(columns)

        eval_set = [(X_train, y_train)]
        if X_val is not None:
            eval_set.append((X_val, y_val))

        for q in self.quantiles:
            print(f" Training Q{q:.0%} model...")
            model = xgb.XGBRegressor(
                objective='reg:quantileerror',
                quantile_alpha=q,
                n_estimators=self.n_estimators,
                max_depth=self.max_depth,
                learning_rate=self.learning_rate,
                subsample=0.8,
                colsample_bytree=0.8,
                tree_method='hist',
                random_state=42,
            )
            model.fit(
                X_train, y_train,
                eval_set=eval_set,
                verbose=False,
            )
            self.models[q] = model

        if X_val is not None:
            self._compute_metrics(X_val, y_val)

        # SHAP runs on a capped sample (first 500 rows at most).
        self._compute_shap(X_train[:500])

        return self

    def predict(self, X):
        """Predict with uncertainty bounds.

        Args:
            X: Feature rows accepted by the fitted regressors.

        Returns:
            Tuple ``(p50, p10, p90, uncertainty)`` where ``uncertainty`` is
            half the P10-P90 interval width, ``(p90 - p10) / 2``.

        Raises:
            KeyError: If the models have not been trained yet.
        """
        p10 = self.models[0.1].predict(X)
        p50 = self.models[0.5].predict(X)
        p90 = self.models[0.9].predict(X)
        uncertainty = (p90 - p10) / 2
        return p50, p10, p90, uncertainty

    def _compute_metrics(self, X_val, y_val):
        """Compute validation metrics, store them and print them.

        Point metrics (MAE, RMSE, R², MAPE) score the P50 model; coverage and
        width describe the [P10, P90] interval. MAPE clips the denominator to
        at least 1 so near-zero targets cannot blow it up.
        """
        p50, p10, p90, _ = self.predict(X_val)
        y_true = np.asarray(y_val)

        mae = mean_absolute_error(y_true, p50)
        rmse = np.sqrt(mean_squared_error(y_true, p50))
        r2 = r2_score(y_true, p50)

        # Share of validation targets that fall inside [P10, P90], in percent.
        in_interval = ((y_true >= p10) & (y_true <= p90)).mean() * 100

        mean_width = np.mean(p90 - p10)
        mape = np.mean(np.abs(y_true - p50) / np.clip(y_true, 1, None)) * 100

        self.train_metrics = {
            "MAE (s)": round(mae, 2),
            "RMSE (s)": round(rmse, 2),
            "R² Score": round(r2, 4),
            "80% PI Coverage (%)": round(in_interval, 1),
            "Mean PI Width (s)": round(mean_width, 2),
            "MAPE (%)": round(mape, 2),
        }

        print(" Validation Metrics:")
        for k, v in self.train_metrics.items():
            print(f" {k}: {v}")

    def _compute_shap(self, X_sample):
        """Compute SHAP values for the median model (best effort).

        On any failure a warning is printed and both ``self.explainer`` and
        ``self.shap_values`` are reset to ``None``, so the plotting methods
        consistently fall back to native feature importance.
        """
        try:
            explainer = shap.TreeExplainer(self.models[0.5])
            shap_values = explainer(X_sample)
        except Exception as e:
            print(f" SHAP computation warning: {e}")
            # Reset both: a live explainer paired with missing values would
            # leave the object half-initialised.
            self.explainer = None
            self.shap_values = None
        else:
            self.explainer = explainer
            self.shap_values = shap_values

    def get_feature_importance(self) -> pd.DataFrame:
        """Get feature importance from median model.

        Returns:
            Frame with columns ``Feature`` and ``Importance``, sorted by
            descending importance.
        """
        model = self.models[0.5]
        importance = model.feature_importances_
        return pd.DataFrame({
            "Feature": self.feature_names,
            "Importance": importance,
        }).sort_values("Importance", ascending=False)

    def plot_predictions_vs_actual(self, X_val, y_val, title=""):
        """Plot predictions with uncertainty bands plus a residual histogram.

        Args:
            X_val: Validation features.
            y_val: Validation targets (any 1-D array-like).
            title: Optional suffix appended to the left panel's title.

        Returns:
            The matplotlib figure, already closed in pyplot so it is not
            shown or retained; it can still be saved by the caller.
        """
        p50, p10, p90, _ = self.predict(X_val)
        actual = np.asarray(y_val)

        fig, axes = plt.subplots(1, 2, figsize=(14, 6))

        # Left panel: samples ordered by actual value so the band is readable.
        ax = axes[0]
        sorted_idx = np.argsort(actual)
        y_sorted = actual[sorted_idx]
        p50_s = p50[sorted_idx]
        p10_s = p10[sorted_idx]
        p90_s = p90[sorted_idx]

        x_range = np.arange(len(y_sorted))
        ax.scatter(x_range, y_sorted, alpha=0.3, s=8, color='#2c3e50', label='Actual', zorder=3)
        ax.plot(x_range, p50_s, color='#e74c3c', linewidth=1.5, label='Predicted (P50)', zorder=4)
        ax.fill_between(x_range, p10_s, p90_s, alpha=0.2, color='#3498db', label='80% PI [P10-P90]', zorder=2)
        ax.set_xlabel("Sample Index (sorted by actual)", fontsize=11)
        ax.set_ylabel("Travel Time (seconds)", fontsize=11)
        ax.set_title(f"Predictions with Uncertainty Bands{' — ' + title if title else ''}", fontsize=12)
        ax.legend(fontsize=10)
        ax.grid(alpha=0.3)

        # Right panel: distribution of (actual - P50) residuals.
        ax2 = axes[1]
        residuals = actual - p50
        mean_residual = np.mean(residuals)
        ax2.hist(residuals, bins=50, alpha=0.7, color='#3498db', edgecolor='white')
        ax2.axvline(0, color='#e74c3c', linestyle='--', linewidth=2, label='Zero Error')
        ax2.axvline(mean_residual, color='#f39c12', linestyle='--', linewidth=2,
                    label=f'Mean: {mean_residual:.1f}s')
        ax2.set_xlabel("Residual (Actual - Predicted) [seconds]", fontsize=11)
        ax2.set_ylabel("Count", fontsize=11)
        ax2.set_title("Residual Distribution", fontsize=12)
        ax2.legend(fontsize=10)
        ax2.grid(alpha=0.3)

        plt.tight_layout()
        plt.close(fig)
        return fig

    @staticmethod
    def _finish_shap_figure(title, fontsize):
        """Title, lay out, detach and return pyplot's current figure."""
        plt.title(title, fontsize=fontsize, fontweight='bold')
        plt.tight_layout()
        # SHAP draws through pyplot, so take whatever figure is current.
        fig = plt.gcf()
        plt.close(fig)
        return fig

    def plot_shap_beeswarm(self, max_display=15):
        """SHAP beeswarm plot showing feature impact distribution.

        Falls back to native XGBoost importance when no SHAP values exist.
        """
        if self.shap_values is None:
            return self._fallback_importance_plot()

        plt.subplots(figsize=(11, 7))
        shap.plots.beeswarm(self.shap_values, max_display=max_display, show=False)
        return self._finish_shap_figure(
            "SHAP Feature Impact on Travel Time Prediction", fontsize=13)

    def plot_shap_bar(self, max_display=15):
        """SHAP global feature importance bar plot.

        Falls back to native XGBoost importance when no SHAP values exist.
        """
        if self.shap_values is None:
            return self._fallback_importance_plot()

        plt.subplots(figsize=(10, 6))
        shap.plots.bar(self.shap_values, max_display=max_display, show=False)
        return self._finish_shap_figure(
            "Global Feature Importance (Mean |SHAP|)", fontsize=13)

    def plot_shap_waterfall(self, X_sample, idx=0):
        """SHAP waterfall plot for a single prediction.

        Args:
            X_sample: Feature rows; row ``idx`` is explained.
            idx: Positional index of the row to explain.

        Returns:
            The waterfall figure, or the native-importance fallback figure
            when SHAP is unavailable or fails for this row.
        """
        if self.shap_values is None or self.explainer is None:
            return self._fallback_importance_plot()

        fig = None
        try:
            sv = self.explainer(X_sample[idx:idx + 1])
            fig, _ = plt.subplots(figsize=(10, 6))
            shap.plots.waterfall(sv[0], show=False)
            return self._finish_shap_figure(
                f"SHAP Waterfall — Prediction Breakdown (Sample {idx})", fontsize=12)
        except Exception as e:
            # Only ordinary errors trigger the fallback; KeyboardInterrupt and
            # SystemExit must propagate instead of being swallowed.
            print(f" SHAP waterfall warning: {e}")
            if fig is not None:
                # Do not leave a half-drawn figure registered with pyplot.
                plt.close(fig)
            return self._fallback_importance_plot()

    def _fallback_importance_plot(self):
        """Fallback: horizontal bar chart of XGBoost native importance (top 15)."""
        # Take the 15 most important features, then reverse so the largest
        # bar ends up at the top of the horizontal chart.
        top = self.get_feature_importance().head(15).iloc[::-1]

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.barh(top["Feature"], top["Importance"],
                color='#3498db', edgecolor='white')
        ax.set_xlabel("Feature Importance (Gain)", fontsize=11)
        ax.set_title("XGBoost Feature Importance (Fallback)", fontsize=13, fontweight='bold')
        ax.grid(alpha=0.3, axis='x')
        plt.tight_layout()
        plt.close(fig)
        return fig

    def plot_quantile_calibration(self, X_val, y_val):
        """Check if quantile predictions are well-calibrated.

        For each of the P10/P50/P90 models, plots the fraction of validation
        targets at or below the prediction against the nominal quantile; a
        perfectly calibrated model lies on the diagonal.

        Args:
            X_val: Validation features.
            y_val: Validation targets (any 1-D array-like).

        Returns:
            The matplotlib figure, already closed in pyplot.
        """
        fig, ax = plt.subplots(figsize=(8, 6))

        actual = np.asarray(y_val)
        test_quantiles = [0.1, 0.5, 0.9]
        observed_below = [
            (actual <= self.models[q].predict(X_val)).mean()
            for q in test_quantiles
        ]

        ax.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Perfect Calibration')
        ax.scatter(test_quantiles, observed_below, s=120, c='#e74c3c',
                   zorder=5, edgecolors='white', linewidth=2)
        ax.plot(test_quantiles, observed_below, 'r-', alpha=0.7, linewidth=2, label='Model')

        for q, obs in zip(test_quantiles, observed_below):
            ax.annotate(f'Q{q:.0%}: {obs:.1%}', (q, obs),
                        textcoords="offset points", xytext=(10, 10), fontsize=10)

        ax.set_xlabel("Predicted Quantile", fontsize=12)
        ax.set_ylabel("Observed Fraction Below", fontsize=12)
        ax.set_title("Quantile Calibration Plot", fontsize=13, fontweight='bold')
        ax.legend(fontsize=11)
        ax.set_xlim(-0.05, 1.05)
        ax.set_ylim(-0.05, 1.05)
        ax.grid(alpha=0.3)
        plt.tight_layout()
        plt.close(fig)
        return fig
|
|
|
|
| |
| |
| |
|
|
def train_apoo_model(n_samples: int = 5000, city_type: str = "metro", *,
                     seed: int = 42, test_size: float = 0.2):
    """Full training pipeline for APOO predictor.

    Generates synthetic traffic data, splits it into train/validation sets,
    trains an :class:`APOOPredictor` and prints progress along the way.

    Args:
        n_samples: Number of synthetic samples requested from the generator.
        city_type: City profile forwarded to
            ``IndianTrafficGenerator.generate_training_data``.
        seed: Seed for the data generator and for the train/validation
            split. The XGBoost models use their own ``random_state``, which
            is set inside ``APOOPredictor.train``.
        test_size: Validation share forwarded to ``train_test_split``.

    Returns:
        Tuple ``(predictor, X_train, X_val, y_train, y_val, df)`` where
        ``df`` is the full generated frame.
    """
    banner = "=" * 60
    print(banner)
    print("APOO ML Training Pipeline")
    print(banner)

    print("\n[1/4] Generating synthetic Indian traffic data...")
    gen = IndianTrafficGenerator(seed=seed)
    df = gen.generate_training_data(n_samples=n_samples, city_type=city_type)
    target = df[TARGET_COLUMN]
    print(f" Generated {len(df)} samples with {len(FEATURE_COLUMNS)} features")
    print(f" Target stats: mean={target.mean():.1f}s, "
          f"std={target.std():.1f}s, "
          f"range=[{target.min():.1f}, {target.max():.1f}]s")

    print("\n[2/4] Preparing features & splitting data...")
    X, y = prepare_features(df)
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=seed)
    print(f" Train: {len(X_train)}, Validation: {len(X_val)}")

    print("\n[3/4] Training XGBoost quantile models...")
    predictor = APOOPredictor(n_estimators=300, max_depth=6, learning_rate=0.05)
    predictor.train(X_train, y_train, X_val, y_val)

    print("\n[4/4] Training complete!")
    print(f" Model metrics: {predictor.train_metrics}")

    return predictor, X_train, X_val, y_train, y_val, df
|
|
|
|
if __name__ == "__main__":
    # Script entry point: train with the default city profile, then write
    # two diagnostic plots to /app.
    predictor, X_train, X_val, y_train, y_val, df = train_apoo_model(n_samples=5000)

    save_options = {"dpi": 150, "bbox_inches": 'tight'}

    fig1 = predictor.plot_predictions_vs_actual(X_val, y_val)
    fig1.savefig("/app/pred_vs_actual.png", **save_options)

    fig2 = predictor.plot_shap_beeswarm()
    fig2.savefig("/app/shap_beeswarm.png", **save_options)

    print("Plots saved.")
|
|