"""
=============================================================================
CIVIC ISSUE DETECTION - POTHOLE SEVERITY SCORING PIPELINE
=============================================================================
Produces a trained XGBoost regression model that predicts severity S in [0, 1]
from 10 engineered features derived from a civic-issue detection system.

Pipeline Stages
---------------
1. Synthetic dataset generation (10 000 samples, realistic distributions)
2. Ground-truth severity formula (weighted sum + infrastructure boost + noise)
3. Model training (XGBoost Regressor, 80/20 split)
4. Evaluation (RMSE, MAE, R^2)
5. Interpretability (SHAP summary + top-feature analysis)
6. Artefact export (severity_model.json, scaler, feature list)
7. Inference function (predict_severity -> score + label)
=============================================================================
"""
|
|
| |
| |
| |
| import json |
| import os |
| import warnings |
|
|
| import matplotlib.pyplot as plt |
| import numpy as np |
| import pandas as pd |
| import shap |
| import xgboost as xgb |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import MinMaxScaler |
| import joblib |
|
|
# Silence third-party deprecation/user warnings (sklearn, xgboost, shap) so the
# pipeline's own progress output stays readable.
warnings.filterwarnings("ignore")
|
|
| |
RANDOM_SEED = 42
# Legacy global seeding; the per-function `default_rng(seed)` generators below
# are what actually drive reproducibility.
np.random.seed(RANDOM_SEED)


def generate_synthetic_dataset(n_samples: int = 10_000, seed: int = RANDOM_SEED) -> pd.DataFrame:
    """
    Create a synthetic feature table with realistic distributions for
    pothole severity modelling.

    All ten features live in [0, 1]:
        A - defect area ratio             D - defect density
        C - centrality (road centre)      Q - detection confidence
        M - multi-user confirmation       T - temporal persistence
        R - traffic importance            P - infrastructure proximity
        F - recurrence frequency          X - resolution failure score

    NOTE: the draw order below is fixed; reordering the RNG calls would
    change the generated values for a given seed.
    """
    rng = np.random.default_rng(seed)
    size = n_samples

    # Dict literals evaluate left-to-right, so the RNG stream order is the
    # column order.
    columns = {
        # Most defects are small: Beta(2, 8) skews towards low area ratios.
        "A": rng.beta(2, 8, size),
        "D": rng.beta(1.5, 6, size),
        # Position on the carriageway is uninformative a priori -> uniform.
        "C": rng.uniform(0, 1, size),
        # The detector is usually confident, hence right-skewed Beta(8, 2).
        "Q": rng.beta(8, 2, size),
        "M": rng.beta(1.2, 8, size),
        "T": rng.beta(1.5, 5, size),
        # Road hierarchy: 10% highways (1.0), 35% main roads (0.7),
        # 55% local streets (0.4).
        "R": rng.choice(
            [1.0, 0.7, 0.4],
            size=size,
            p=[0.10, 0.35, 0.55],
        ).astype(float),
        "P": rng.beta(1, 10, size),
        "F": rng.beta(1.2, 9, size),
        "X": rng.beta(1, 15, size),
    }

    return pd.DataFrame(columns)
|
|
|
|
| |
| |
| |
|
|
def compute_severity(df: pd.DataFrame, noise_std: float = 0.03, seed: int = RANDOM_SEED) -> pd.Series:
    """
    Derive ground-truth severity labels for the synthetic dataset.

    The base score is a weighted sum of the ten features (weights sum to
    1.0), boosted by an infrastructure-proximity multiplier, perturbed
    with Gaussian noise, then clamped:

        S = clip(S_base * (1 + 0.5 * P) + N(0, noise_std), 0, 1)
    """
    # Feature weights; the 1.0 total keeps the un-boosted score in [0, 1].
    weights = {
        "A": 0.28, "D": 0.10, "C": 0.14, "Q": 0.04, "M": 0.08,
        "T": 0.07, "R": 0.09, "P": 0.10, "F": 0.06, "X": 0.04,
    }
    base = sum(w * df[col] for col, w in weights.items())

    # Potholes near critical infrastructure are up-weighted by up to 50 %.
    boosted = base * (1 + 0.5 * df["P"])

    # Label noise makes the regression target realistically imperfect.
    rng = np.random.default_rng(seed)
    jitter = rng.normal(loc=0, scale=noise_std, size=len(df))

    return pd.Series(np.clip(boosted + jitter, 0, 1), name="severity", index=df.index)
|
|
|
|
| |
| |
| |
|
|
# Canonical feature ordering; must match generate_synthetic_dataset's columns
# and the order persisted to feature_list.json for inference.
FEATURE_COLS = ["A", "D", "C", "Q", "M", "T", "R", "P", "F", "X"]
|
|
def build_and_train_model(
    X_train: np.ndarray,
    y_train: np.ndarray,
    seed: int = RANDOM_SEED,
) -> xgb.XGBRegressor:
    """
    Fit an XGBoost regressor on the training split and return it.

    Hyperparameters are deliberately fixed (no search loop); wrap this in
    GridSearchCV / Optuna for production hyper-parameter optimisation.
    """
    hyperparams = dict(
        objective="reg:squarederror",
        n_estimators=200,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.8,          # row subsampling per tree: mild regularisation
        colsample_bytree=0.8,   # column subsampling per tree
        random_state=seed,
        verbosity=0,
        n_jobs=-1,
    )
    regressor = xgb.XGBRegressor(**hyperparams)

    print("ββ Training XGBoost Regressor β¦")
    regressor.fit(X_train, y_train)
    print(" Training complete.\n")
    return regressor
|
|
|
|
| |
| |
| |
|
|
def evaluate_model(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    y_test: np.ndarray,
    feature_names: list[str],
) -> dict:
    """
    Score the model on the held-out split.

    Prints RMSE / MAE / R-squared plus a text-bar feature-importance
    ranking, and returns all four pieces in a dict
    (keys: "rmse", "mae", "r2", "importance").
    """
    predictions = model.predict(X_test)

    metrics = {
        "rmse": np.sqrt(mean_squared_error(y_test, predictions)),
        "mae": mean_absolute_error(y_test, predictions),
        "r2": r2_score(y_test, predictions),
    }

    print("=" * 50)
    print(" MODEL EVALUATION METRICS")
    print("=" * 50)
    print(f" RMSE : {metrics['rmse']:.6f}")
    print(f" MAE : {metrics['mae']:.6f}")
    print(f" RΒ² : {metrics['r2']:.6f}")
    print("=" * 50)

    # Gain-based importances, highest first.
    ranking = (
        pd.DataFrame(
            {"Feature": feature_names, "Importance": model.feature_importances_}
        )
        .sort_values("Importance", ascending=False)
        .reset_index(drop=True)
    )

    print("\n FEATURE IMPORTANCE RANKING (gain)")
    print(" " + "-" * 36)
    for _, rec in ranking.iterrows():
        bar = "β" * int(rec["Importance"] * 100)
        print(f" {rec['Feature']:>3} {rec['Importance']:.4f} {bar}")
    print()

    metrics["importance"] = ranking
    return metrics
|
|
|
| |
| |
| |
|
|
def run_shap_analysis(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Generate SHAP interpretability artefacts for the trained model.

    Saves a bar plot and a dot (beeswarm) summary plot as PNGs under
    ``output_dir``, prints the mean |SHAP| feature ranking, and checks that
    the features expected to dominate (A, C, P) appear in the top-3.

    Parameters
    ----------
    model : trained XGBRegressor
    X_test : scaled test-split feature matrix
    feature_names : ordered feature column names matching X_test columns
    output_dir : directory that receives the PNG artefacts
    """
    print("ββ Running SHAP analysis β¦")

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)

    # --- bar plot: mean |SHAP| per feature --------------------------------
    plt.figure(figsize=(10, 6))
    shap.summary_plot(
        shap_values,
        X_test,
        feature_names=feature_names,
        plot_type="bar",
        show=False,   # keep the figure open so we can title + save it
    )
    plt.title("SHAP Feature Importance β Mean |SHAP value|", fontsize=14, fontweight="bold")
    plt.tight_layout()
    bar_path = os.path.join(output_dir, "shap_bar_plot.png")
    plt.savefig(bar_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {bar_path}")

    # --- dot plot: per-sample directional impact --------------------------
    plt.figure(figsize=(10, 6))
    shap.summary_plot(
        shap_values,
        X_test,
        feature_names=feature_names,
        show=False,
    )
    plt.title("SHAP Summary Plot β Impact on Severity Score", fontsize=14, fontweight="bold")
    plt.tight_layout()
    dot_path = os.path.join(output_dir, "shap_dot_plot.png")
    plt.savefig(dot_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {dot_path}\n")

    # --- textual ranking of mean absolute SHAP values ---------------------
    mean_shap = np.abs(shap_values).mean(axis=0)
    shap_df = (
        pd.DataFrame({"Feature": feature_names, "Mean|SHAP|": mean_shap})
        .sort_values("Mean|SHAP|", ascending=False)
        .reset_index(drop=True)
    )

    print(" SHAP MEAN |VALUE| RANKING")
    print(" " + "-" * 36)
    top3 = shap_df["Feature"].head(3).tolist()
    for rank, (_, row) in enumerate(shap_df.iterrows(), start=1):
        tag = " β dominant" if row["Feature"] in ["A", "C", "P"] else ""
        print(f" #{rank:<2} {row['Feature']:>3} {row['Mean|SHAP|']:.5f}{tag}")
    print()

    # Sanity check: the ground-truth formula weights A and C heavily and P
    # twice (base sum + multiplier), so at least two of {A, C, P} should
    # dominate the learned explanation.
    expected_dominant = {"A", "C", "P"}
    overlap = expected_dominant & set(top3)
    if len(overlap) >= 2:
        # FIX: the original f-string here contained a corrupted multi-byte
        # character that split the literal across a physical line break,
        # breaking the statement; replaced with a clean ASCII marker.
        print(f" [PASS] Dominance check PASSED β {overlap} appear in top-3 SHAP features.")
    else:
        print(f" β οΈ Dominance check NOTE β top-3 are {top3}; "
              "model learned different patterns from the data.")
    print()
|
|
|
|
| |
| |
| |
|
|
def save_artefacts(
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Persist everything the inference side needs:

        severity_model.json - XGBoost model (native JSON format)
        feature_scaler.pkl  - fitted MinMaxScaler (or a None sentinel)
        feature_list.json   - ordered list of feature names
    """
    os.makedirs(output_dir, exist_ok=True)

    # Resolve all artefact paths up front.
    paths = {
        name: os.path.join(output_dir, name)
        for name in ("severity_model.json", "feature_scaler.pkl", "feature_list.json")
    }

    model_path = paths["severity_model.json"]
    model.save_model(model_path)
    print(f"ββ Model saved: {model_path}")

    scaler_path = paths["feature_scaler.pkl"]
    joblib.dump(scaler, scaler_path)
    print(f"ββ Scaler saved: {scaler_path}")

    feature_path = paths["feature_list.json"]
    with open(feature_path, "w") as handle:
        json.dump(feature_names, handle, indent=2)
    print(f"ββ Feature list saved: {feature_path}\n")
|
|
|
|
| |
| |
| |
|
|
def load_inference_artefacts(
    model_path: str = "severity_model.json",
    scaler_path: str = "feature_scaler.pkl",
    feature_list_path: str = "feature_list.json",
) -> tuple[xgb.XGBRegressor, MinMaxScaler | None, list[str]]:
    """Restore the persisted model, scaler, and feature ordering for inference."""
    regressor = xgb.XGBRegressor()
    regressor.load_model(model_path)

    # May be None if features were saved pre-scaled.
    fitted_scaler = joblib.load(scaler_path)

    with open(feature_list_path) as handle:
        ordered_features = json.load(handle)

    return regressor, fitted_scaler, ordered_features
|
|
|
|
| def _severity_label(score: float) -> str: |
| """ |
| Assign a human-readable label to a numeric severity score. |
| |
| Thresholds (domain-tunable): |
| Low : score < 0.33 |
| Medium : 0.33 β€ score < 0.66 |
| High : score β₯ 0.66 |
| """ |
| if score < 0.33: |
| return "Low" |
| elif score < 0.66: |
| return "Medium" |
| else: |
| return "High" |
|
|
|
|
| def predict_severity( |
| features_dict: dict, |
| model: xgb.XGBRegressor, |
| scaler: MinMaxScaler | None, |
| feature_names: list[str], |
| ) -> dict: |
| """ |
| Predict severity for a single pothole observation. |
| |
| Parameters |
| ---------- |
| features_dict : dict |
| Keys must match feature_names; values are raw (pre-scaling) floats. |
| model : trained XGBRegressor |
| scaler : fitted MinMaxScaler (or None if features are already scaled) |
| feature_names : ordered list of feature column names |
| |
| Returns |
| ------- |
| dict with: |
| "score" : float β predicted severity in [0, 1] |
| "label" : str β "Low" | "Medium" | "High" |
| """ |
| |
| missing = set(feature_names) - set(features_dict.keys()) |
| if missing: |
| raise ValueError(f"Missing features in input dict: {missing}") |
|
|
| |
| row = np.array([[features_dict[f] for f in feature_names]], dtype=np.float32) |
|
|
| |
| if scaler is not None: |
| row = scaler.transform(row) |
|
|
| |
| raw_score = float(model.predict(row)[0]) |
| score = float(np.clip(raw_score, 0.0, 1.0)) |
| label = _severity_label(score) |
|
|
| return {"score": round(score, 4), "label": label} |
|
|
|
|
| |
| |
| |
|
|
def main(output_dir: str = ".") -> None:
    """
    End-to-end pipeline driver: generate data, scale, split, train,
    evaluate, explain, persist artefacts, then run demo predictions.
    """
    print("\n" + "=" * 60)
    print(" CIVIC POTHOLE SEVERITY SCORING β FULL ML PIPELINE")
    print("=" * 60 + "\n")

    # [1/7] synthetic data + ground-truth labels ---------------------------
    print("ββ [1/7] Generating synthetic dataset β¦")
    df = generate_synthetic_dataset(n_samples=10_000)
    y = compute_severity(df)

    full_dataset = df.copy()
    full_dataset["severity"] = y
    dataset_path = os.path.join(output_dir, "synthetic_pothole_data.csv")
    full_dataset.to_csv(dataset_path, index=False)

    print(f" Dataset shape : {df.shape}")
    print(f" Dataset saved to: {dataset_path}")
    print(f" Severity stats: mean={y.mean():.4f}, std={y.std():.4f}, "
          f"min={y.min():.4f}, max={y.max():.4f}\n")

    # [2/7] feature scaling -------------------------------------------------
    print("ββ [2/7] Scaling features (MinMaxScaler) β¦")
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(df[FEATURE_COLS])
    print(" Scaling complete.\n")

    # [3/7] train / test split ----------------------------------------------
    print("ββ [3/7] Splitting data (80 % train / 20 % test) β¦")
    X_train, X_test, y_train, y_test = train_test_split(
        X_scaled, y, test_size=0.20, random_state=RANDOM_SEED
    )
    print(f" Train samples : {len(X_train)}")
    print(f" Test samples : {len(X_test)}\n")

    # [4/7] model fitting ---------------------------------------------------
    print("ββ [4/7] Training model β¦")
    model = build_and_train_model(X_train, y_train)

    # [5/7] held-out evaluation ---------------------------------------------
    print("ββ [5/7] Evaluating model β¦\n")
    metrics = evaluate_model(model, X_test, y_test, FEATURE_COLS)

    # [6/7] interpretability ------------------------------------------------
    print("ββ [6/7] SHAP interpretability β¦\n")
    run_shap_analysis(model, X_test, FEATURE_COLS, output_dir=output_dir)

    # [7/7] artefact export -------------------------------------------------
    print("ββ [7/7] Saving model artefacts β¦")
    save_artefacts(model, scaler, FEATURE_COLS, output_dir=output_dir)

    # Demo predictions on a few hand-picked scenarios ------------------------
    print("=" * 60)
    print(" SAMPLE PREDICTIONS")
    print("=" * 60)

    sample_cases = [
        ("Minor Local-Street Pothole",
         [0.05, 0.08, 0.30, 0.90, 0.05, 0.10, 0.40, 0.02, 0.03, 0.01]),
        ("Moderate Main-Road Pothole",
         [0.25, 0.20, 0.55, 0.75, 0.35, 0.40, 0.70, 0.15, 0.20, 0.10]),
        ("Severe Highway near Hospital",
         [0.70, 0.55, 0.85, 0.95, 0.80, 0.75, 1.00, 0.90, 0.65, 0.40]),
        ("Recurring Pothole (high reopen)",
         [0.40, 0.35, 0.60, 0.80, 0.50, 0.85, 0.70, 0.30, 0.75, 0.80]),
    ]

    for case_name, raw_values in sample_cases:
        case_features = dict(zip(FEATURE_COLS, raw_values))
        result = predict_severity(
            features_dict=case_features,
            model=model,
            scaler=scaler,
            feature_names=FEATURE_COLS,
        )
        print(f"\n π {case_name}")
        feature_str = ", ".join(f"{k}={v}" for k, v in case_features.items())
        print(f" Features : {feature_str}")
        print(f" Score : {result['score']:.4f}")
        print(f" Label : {result['label']}")

    print("\n" + "=" * 60)
    print(" PIPELINE COMPLETE")
    print(f" Output artefacts β {os.path.abspath(output_dir)}")
    print("=" * 60 + "\n")
|
|
|
|
if __name__ == "__main__":
    # Write all artefacts (CSV, PNGs, model JSON, scaler) next to this script
    # rather than the caller's CWD, so repeated runs are self-contained.
    OUTPUT_DIR = os.path.dirname(os.path.abspath(__file__))
    main(output_dir=OUTPUT_DIR)
|
|