"""
=============================================================================
CIVIC ISSUE DETECTION — POTHOLE SEVERITY SCORING PIPELINE
=============================================================================
Produces a trained XGBoost regression model that predicts severity S ∈ [0,1]
from 10 engineered features derived from a civic-issue detection system.

Pipeline Stages
---------------
1. Synthetic dataset generation  (10 000 samples, realistic distributions)
2. Ground-truth severity formula (weighted sum + infrastructure boost + noise)
3. Model training                (XGBoost Regressor, 80/20 split)
4. Evaluation                    (RMSE, MAE, R²)
5. Interpretability              (SHAP summary + top-feature analysis)
6. Artefact export               (severity_model.json, scaler, feature list)
7. Inference function            (predict_severity → score + label)
=============================================================================
"""

# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
import json
import os
import warnings

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import xgboost as xgb
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# NOTE(review): this silences *every* warning process-wide, including
# deprecation and numerical warnings from the libraries above. Kept for
# backward compatibility; consider narrowing it to specific categories.
warnings.filterwarnings("ignore")

# Ensure reproducible results
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)


# =============================================================================
# STEP 1 — GENERATE SYNTHETIC DATASET
# =============================================================================

def generate_synthetic_dataset(n_samples: int = 10_000,
                               seed: int = RANDOM_SEED) -> pd.DataFrame:
    """
    Generate a synthetic dataset with realistic feature distributions for
    pothole severity modelling.

    Feature definitions (all in [0, 1]):
        A — defect area ratio
        D — defect density
        C — centrality (closeness to road centre)
        Q — detection confidence
        M — multi-user confirmation score
        T — temporal persistence
        R — traffic importance (road hierarchy)
        P — proximity to critical infrastructure
        F — recurrence frequency
        X — resolution failure score

    Parameters
    ----------
    n_samples : number of rows to generate.
    seed      : seed for the local NumPy generator (the global NumPy RNG
                state is not used here).

    Returns
    -------
    DataFrame with one column per feature, in the order A, D, C, Q, M, T,
    R, P, F, X.
    """
    rng = np.random.default_rng(seed)
    n = n_samples

    # A: skewed small (most potholes are small) — Beta(2, 8)
    A = rng.beta(2, 8, n)

    # D: low-to-moderate, sparse — Beta(1.5, 6)
    D = rng.beta(1.5, 6, n)

    # C: uniform (pothole can be anywhere laterally) — Uniform(0, 1)
    C = rng.uniform(0, 1, n)

    # Q: high-biased (confident detections) — Beta(8, 2)
    Q = rng.beta(8, 2, n)

    # M: sparse confirmations — exponential-ish via Beta(1.2, 8)
    M = rng.beta(1.2, 8, n)

    # T: right-skewed (few very old issues) — Beta(1.5, 5)
    T = rng.beta(1.5, 5, n)

    # R: categorical road hierarchy mapped to numeric
    road_types = rng.choice(
        [1.0, 0.7, 0.4],          # highway, main road, local street
        size=n,
        p=[0.10, 0.35, 0.55],     # realistic road-type proportions
    )
    R = road_types.astype(float)

    # P: mostly low, few high — Beta(1, 10)
    P = rng.beta(1, 10, n)

    # F: low recurrence freq — Beta(1.2, 9)
    F = rng.beta(1.2, 9, n)

    # X: very low resolution failure rate — Beta(1, 15)
    X = rng.beta(1, 15, n)

    df = pd.DataFrame({
        "A": A, "D": D, "C": C, "Q": Q, "M": M,
        "T": T, "R": R, "P": P, "F": F, "X": X,
    })
    return df


# =============================================================================
# STEP 2 — GROUND-TRUTH SEVERITY FORMULA
# =============================================================================

def compute_severity(df: pd.DataFrame, noise_std: float = 0.03,
                     seed: int = RANDOM_SEED) -> pd.Series:
    """
    Compute ground-truth severity scores.

    Formula
    -------
    S_base = 0.28A + 0.10D + 0.14C + 0.04Q + 0.08M
             + 0.07T + 0.09R + 0.10P + 0.06F + 0.04X

    K = 1 + 0.5 * P   (infrastructure proximity multiplier)

    S = clamp(S_base * K + noise, 0, 1)

    Parameters
    ----------
    df        : DataFrame containing the ten feature columns A … X.
    noise_std : standard deviation of the zero-mean Gaussian noise.
    seed      : seed for the local NumPy generator that draws the noise.

    Returns
    -------
    Series named "severity", aligned to ``df.index``, with values in [0, 1].
    """
    rng = np.random.default_rng(seed)

    # Weighted severity base
    S_base = (
        0.28 * df["A"]
        + 0.10 * df["D"]
        + 0.14 * df["C"]
        + 0.04 * df["Q"]
        + 0.08 * df["M"]
        + 0.07 * df["T"]
        + 0.09 * df["R"]
        + 0.10 * df["P"]
        + 0.06 * df["F"]
        + 0.04 * df["X"]
    )

    # Critical-infrastructure proximity multiplier
    K = 1 + 0.5 * df["P"]

    # Boosted severity
    S_raw = S_base * K

    # Add Gaussian noise, clamp to [0, 1]
    noise = rng.normal(loc=0, scale=noise_std, size=len(df))
    S = np.clip(S_raw + noise, 0, 1)

    return pd.Series(S, name="severity", index=df.index)


# =============================================================================
# STEP 3 — TRAIN XGBOOST MODEL
# =============================================================================

FEATURE_COLS = ["A", "D", "C", "Q", "M", "T", "R", "P", "F", "X"]


def build_and_train_model(
    X_train: np.ndarray,
    y_train: np.ndarray,
    seed: int = RANDOM_SEED,
) -> xgb.XGBRegressor:
    """
    Instantiate and train an XGBoost Regressor on the training split.

    Hyperparameters are fixed as specified; no tuning loop is performed here
    (add GridSearchCV / Optuna wrapping for production hyper-opt).

    Parameters
    ----------
    X_train : training feature matrix.
    y_train : training targets.
    seed    : value passed as the regressor's ``random_state``.

    Returns
    -------
    The fitted XGBRegressor.
    """
    model = xgb.XGBRegressor(
        objective="reg:squarederror",
        n_estimators=200,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=seed,
        verbosity=0,
        n_jobs=-1,
    )

    print("── Training XGBoost Regressor …")
    model.fit(X_train, y_train)
    print(" Training complete.\n")
    return model


# =============================================================================
# STEP 4 — EVALUATION
# =============================================================================

def evaluate_model(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    y_test: np.ndarray,
    feature_names: list[str],
) -> dict:
    """
    Compute RMSE, MAE, R² and print feature importance ranking.

    Parameters
    ----------
    model         : trained regressor.
    X_test        : held-out feature matrix.
    y_test        : held-out targets.
    feature_names : names matching the columns of ``X_test``, in order.

    Returns
    -------
    dict with keys "rmse", "mae", "r2" (floats) and "importance" (a
    DataFrame of features sorted by descending importance).
    """
    y_pred = model.predict(X_test)

    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print("=" * 50)
    print(" MODEL EVALUATION METRICS")
    print("=" * 50)
    print(f" RMSE : {rmse:.6f}")
    print(f" MAE : {mae:.6f}")
    print(f" R² : {r2:.6f}")
    print("=" * 50)

    # Feature importance (gain-based)
    importances = model.feature_importances_
    importance_df = (
        pd.DataFrame({"Feature": feature_names, "Importance": importances})
        .sort_values("Importance", ascending=False)
        .reset_index(drop=True)
    )

    print("\n FEATURE IMPORTANCE RANKING (gain)")
    print(" " + "-" * 36)
    for _, row in importance_df.iterrows():
        # One block character per percentage point of importance.
        bar = "█" * int(row["Importance"] * 100)
        print(f" {row['Feature']:>3} {row['Importance']:.4f} {bar}")
    print()

    return {"rmse": rmse, "mae": mae, "r2": r2, "importance": importance_df}


# =============================================================================
# STEP 5 — SHAP INTERPRETABILITY
# =============================================================================

def run_shap_analysis(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Generate SHAP summary plots and print the mean |SHAP| feature ranking.

    Writes ``shap_bar_plot.png`` and ``shap_dot_plot.png`` into
    ``output_dir`` (created if missing) and reports whether at least two of
    A, C, P appear among the top-3 features by mean |SHAP|.

    Parameters
    ----------
    model         : trained tree model to explain.
    X_test        : feature matrix the SHAP values are computed on.
    feature_names : names matching the columns of ``X_test``, in order.
    output_dir    : directory that receives the two PNG files.
    """
    # The plots are written before save_artefacts() runs, so the directory
    # must exist here too.
    os.makedirs(output_dir, exist_ok=True)

    print("── Running SHAP analysis …")
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)

    # ── Summary bar plot ──────────────────────────────────────────────────
    plt.figure(figsize=(10, 6))
    shap.summary_plot(
        shap_values,
        X_test,
        feature_names=feature_names,
        plot_type="bar",
        show=False,
    )
    plt.title("SHAP Feature Importance — Mean |SHAP value|",
              fontsize=14, fontweight="bold")
    plt.tight_layout()
    bar_path = os.path.join(output_dir, "shap_bar_plot.png")
    plt.savefig(bar_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {bar_path}")

    # ── Beeswarm / dot summary plot ───────────────────────────────────────
    plt.figure(figsize=(10, 6))
    shap.summary_plot(
        shap_values,
        X_test,
        feature_names=feature_names,
        show=False,
    )
    plt.title("SHAP Summary Plot — Impact on Severity Score",
              fontsize=14, fontweight="bold")
    plt.tight_layout()
    dot_path = os.path.join(output_dir, "shap_dot_plot.png")
    plt.savefig(dot_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {dot_path}\n")

    # ── Mean |SHAP| ranking ───────────────────────────────────────────────
    mean_shap = np.abs(shap_values).mean(axis=0)
    shap_df = (
        pd.DataFrame({"Feature": feature_names, "Mean|SHAP|": mean_shap})
        .sort_values("Mean|SHAP|", ascending=False)
        .reset_index(drop=True)
    )

    # Single source of truth for both the per-row tag and the final check.
    expected_dominant = {"A", "C", "P"}

    print(" SHAP MEAN |VALUE| RANKING")
    print(" " + "-" * 36)
    top3 = shap_df["Feature"].head(3).tolist()
    for rank, (_, row) in enumerate(shap_df.iterrows(), start=1):
        tag = " ◀ dominant" if row["Feature"] in expected_dominant else ""
        print(f" #{rank:<2} {row['Feature']:>3} {row['Mean|SHAP|']:.5f}{tag}")
    print()

    # Verify dominance of A, C, P
    overlap = expected_dominant & set(top3)
    if len(overlap) >= 2:
        print(f" ✅ Dominance check PASSED — {overlap} appear in top-3 SHAP features.")
    else:
        print(f" ⚠️ Dominance check NOTE — top-3 are {top3}; "
              "model learned different patterns from the data.")
    print()


# =============================================================================
# STEP 6 — SAVE MODEL & ARTEFACTS
# =============================================================================

def save_artefacts(
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Export:
        severity_model.json — XGBoost model (native JSON format)
        feature_scaler.pkl  — fitted MinMaxScaler (or None sentinel)
        feature_list.json   — ordered list of feature names

    Parameters
    ----------
    model         : trained regressor to serialise.
    scaler        : fitted scaler, or None; dumped as-is with joblib.
    feature_names : ordered feature names written to the JSON list.
    output_dir    : target directory, created if it does not exist.
    """
    os.makedirs(output_dir, exist_ok=True)

    # XGBoost native JSON
    model_path = os.path.join(output_dir, "severity_model.json")
    model.save_model(model_path)
    print(f"── Model saved: {model_path}")

    # Scaler
    scaler_path = os.path.join(output_dir, "feature_scaler.pkl")
    joblib.dump(scaler, scaler_path)
    print(f"── Scaler saved: {scaler_path}")

    # Feature list
    feature_path = os.path.join(output_dir, "feature_list.json")
    with open(feature_path, "w", encoding="utf-8") as fp:
        json.dump(feature_names, fp, indent=2)
    print(f"── Feature list saved: {feature_path}\n")


# =============================================================================
# STEP 7 — INFERENCE FUNCTION
# =============================================================================

def load_inference_artefacts(
    model_path: str = "severity_model.json",
    scaler_path: str = "feature_scaler.pkl",
    feature_list_path: str = "feature_list.json",
) -> tuple[xgb.XGBRegressor, MinMaxScaler | None, list[str]]:
    """
    Load saved model, scaler, and feature list for inference.

    Parameters
    ----------
    model_path        : path to the XGBoost JSON model file.
    scaler_path       : path to the joblib-dumped scaler (may hold None).
    feature_list_path : path to the JSON list of feature names.

    Returns
    -------
    (model, scaler, feature_names)
    """
    model = xgb.XGBRegressor()
    model.load_model(model_path)

    # SECURITY: joblib.load unpickles arbitrary Python objects. Only load
    # scaler files produced by this pipeline or another trusted source.
    scaler = joblib.load(scaler_path)

    with open(feature_list_path, encoding="utf-8") as fp:
        feature_names = json.load(fp)

    return model, scaler, feature_names


def _severity_label(score: float) -> str:
    """
    Assign a human-readable label to a numeric severity score.

    Thresholds (domain-tunable):
        Low    : score < 0.33
        Medium : 0.33 ≤ score < 0.66
        High   : score ≥ 0.66
    """
    if score < 0.33:
        return "Low"
    elif score < 0.66:
        return "Medium"
    else:
        return "High"


def predict_severity(
    features_dict: dict,
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
) -> dict:
    """
    Predict severity for a single pothole observation.

    Parameters
    ----------
    features_dict : dict
        Keys must match feature_names; values are raw (pre-scaling) floats.
        Extra keys are ignored.
    model : trained XGBRegressor
    scaler : fitted MinMaxScaler (or None if features are already scaled)
    feature_names : ordered list of feature column names

    Returns
    -------
    dict with:
        "score" : float — predicted severity in [0, 1], rounded to 4 places
        "label" : str   — "Low" | "Medium" | "High"

    Raises
    ------
    ValueError
        If any name in feature_names is absent from features_dict.
    """
    # Validate input keys
    missing = set(feature_names) - set(features_dict.keys())
    if missing:
        raise ValueError(f"Missing features in input dict: {missing}")

    # Build ordered feature vector (a single row, shape 1 × n_features)
    row = np.array([[features_dict[f] for f in feature_names]], dtype=np.float32)

    # Apply scaler if provided
    if scaler is not None:
        row = scaler.transform(row)

    # Predict and clamp: the regressor output is not bounded to [0, 1].
    raw_score = float(model.predict(row)[0])
    score = float(np.clip(raw_score, 0.0, 1.0))
    label = _severity_label(score)

    return {"score": round(score, 4), "label": label}


# =============================================================================
# MAIN PIPELINE RUNNER
# =============================================================================

def main(output_dir: str = ".") -> None:
    """
    Run the full pipeline: generate data, split, scale, train, evaluate,
    explain with SHAP, save artefacts, and print sample predictions.

    Parameters
    ----------
    output_dir : directory that receives the dataset CSV, SHAP plots and
                 model artefacts. Created if it does not exist.
    """
    # The dataset CSV is written long before save_artefacts() would create
    # the directory, so make sure it exists up front.
    os.makedirs(output_dir, exist_ok=True)

    print("\n" + "=" * 60)
    print(" CIVIC POTHOLE SEVERITY SCORING — FULL ML PIPELINE")
    print("=" * 60 + "\n")

    # ── 1. Generate dataset ──────────────────────────────────────────────
    print("── [1/7] Generating synthetic dataset …")
    df = generate_synthetic_dataset(n_samples=10_000)
    y = compute_severity(df)

    # Save the dataset for persistence/user inspection
    full_dataset = df.copy()
    full_dataset['severity'] = y
    dataset_path = os.path.join(output_dir, "synthetic_pothole_data.csv")
    full_dataset.to_csv(dataset_path, index=False)

    print(f" Dataset shape : {df.shape}")
    print(f" Dataset saved to: {dataset_path}")
    print(f" Severity stats: mean={y.mean():.4f}, std={y.std():.4f}, "
          f"min={y.min():.4f}, max={y.max():.4f}\n")

    # ── 2. Train / test split ────────────────────────────────────────────
    # Split BEFORE scaling so the scaler never sees test-set statistics.
    print("── [2/7] Splitting data (80 % train / 20 % test) …")
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        df[FEATURE_COLS].to_numpy(), y,
        test_size=0.20, random_state=RANDOM_SEED
    )
    print(f" Train samples : {len(X_train_raw)}")
    print(f" Test samples : {len(X_test_raw)}\n")

    # ── 3. Feature scaling ───────────────────────────────────────────────
    print("── [3/7] Scaling features (MinMaxScaler) …")
    # NOTE: Features are already in [0, 1] by construction, but we fit a
    # scaler so the inference function can handle raw un-normalised inputs
    # if the production system requires it. It is fitted on the training
    # rows only, and on a plain ndarray so that predict_severity (which
    # also passes an ndarray) matches the fitted input type.
    scaler = MinMaxScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)
    print(" Scaling complete.\n")

    # ── 4. Train model ───────────────────────────────────────────────────
    print("── [4/7] Training model …")
    model = build_and_train_model(X_train, y_train)

    # ── 5. Evaluate ──────────────────────────────────────────────────────
    print("── [5/7] Evaluating model …\n")
    evaluate_model(model, X_test, y_test, FEATURE_COLS)

    # ── 6. SHAP ──────────────────────────────────────────────────────────
    print("── [6/7] SHAP interpretability …\n")
    run_shap_analysis(model, X_test, FEATURE_COLS, output_dir=output_dir)

    # ── 7. Save artefacts ────────────────────────────────────────────────
    print("── [7/7] Saving model artefacts …")
    save_artefacts(model, scaler, FEATURE_COLS, output_dir=output_dir)

    # ── Sample predictions ───────────────────────────────────────────────
    print("=" * 60)
    print(" SAMPLE PREDICTIONS")
    print("=" * 60)

    sample_cases = [
        {
            "name": "Minor Local-Street Pothole",
            "features": dict(zip(FEATURE_COLS,
                                 [0.05, 0.08, 0.30, 0.90, 0.05,
                                  0.10, 0.40, 0.02, 0.03, 0.01])),
        },
        {
            "name": "Moderate Main-Road Pothole",
            "features": dict(zip(FEATURE_COLS,
                                 [0.25, 0.20, 0.55, 0.75, 0.35,
                                  0.40, 0.70, 0.15, 0.20, 0.10])),
        },
        {
            "name": "Severe Highway near Hospital",
            "features": dict(zip(FEATURE_COLS,
                                 [0.70, 0.55, 0.85, 0.95, 0.80,
                                  0.75, 1.00, 0.90, 0.65, 0.40])),
        },
        {
            "name": "Recurring Pothole (high reopen)",
            "features": dict(zip(FEATURE_COLS,
                                 [0.40, 0.35, 0.60, 0.80, 0.50,
                                  0.85, 0.70, 0.30, 0.75, 0.80])),
        },
    ]

    for case in sample_cases:
        result = predict_severity(
            features_dict=case["features"],
            model=model,
            scaler=scaler,
            feature_names=FEATURE_COLS,
        )
        print(f"\n 📍 {case['name']}")
        feature_str = ", ".join(f"{k}={v}" for k, v in case["features"].items())
        print(f" Features : {feature_str}")
        print(f" Score : {result['score']:.4f}")
        print(f" Label : {result['label']}")

    print("\n" + "=" * 60)
    print(" PIPELINE COMPLETE")
    print(f" Output artefacts → {os.path.abspath(output_dir)}")
    print("=" * 60 + "\n")


if __name__ == "__main__":
    # Output directory for all saved files (same folder as this script)
    OUTPUT_DIR = os.path.dirname(os.path.abspath(__file__))
    main(output_dir=OUTPUT_DIR)