Spaces:
Sleeping
Sleeping
| """ | |
| ============================================================================= | |
| CIVIC ISSUE DETECTION — POTHOLE SEVERITY SCORING PIPELINE | |
| ============================================================================= | |
| Produces a trained XGBoost regression model that predicts severity S ∈ [0,1] | |
| from 10 engineered features derived from a civic-issue detection system. | |
| Pipeline Stages | |
| --------------- | |
| 1. Synthetic dataset generation (10 000 samples, realistic distributions) | |
| 2. Ground-truth severity formula (weighted sum + infrastructure boost + noise) | |
| 3. Model training (XGBoost Regressor, 80/20 split) | |
| 4. Evaluation (RMSE, MAE, R²) | |
| 5. Interpretability (SHAP summary + top-feature analysis) | |
| 6. Artefact export (severity_model.json, scaler, feature list) | |
| 7. Inference function (predict_severity → score + label) | |
| ============================================================================= | |
| """ | |
| # --------------------------------------------------------------------------- | |
| # Imports | |
| # --------------------------------------------------------------------------- | |
| import json | |
| import os | |
| import warnings | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import shap | |
| import xgboost as xgb | |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import MinMaxScaler | |
| import joblib | |
# Suppress every warning category for the whole process (presumably to keep
# the pipeline's console output readable; note this also hides deprecation
# and convergence warnings from the libraries imported above).
warnings.filterwarnings("ignore")
# Ensure reproducible results
RANDOM_SEED = 42
# Seed NumPy's legacy global RNG too; the functions in this file that draw
# random numbers build their own `np.random.default_rng(seed)` generators.
np.random.seed(RANDOM_SEED)
| # ============================================================================= | |
| # STEP 1 — GENERATE SYNTHETIC DATASET | |
| # ============================================================================= | |
def generate_synthetic_dataset(n_samples: int = 10_000, seed: int = RANDOM_SEED) -> pd.DataFrame:
    """
    Build a reproducible synthetic feature table for pothole severity modelling.

    One row per simulated observation, one column per feature. Every feature
    lies in [0, 1]:

        A - defect area ratio                   Beta(2, 8)    (mostly small)
        D - defect density                      Beta(1.5, 6)  (low to moderate)
        C - centrality (closeness to centre)    Uniform(0, 1)
        Q - detection confidence                Beta(8, 2)    (biased high)
        M - multi-user confirmation score       Beta(1.2, 8)  (sparse)
        T - temporal persistence                Beta(1.5, 5)  (right-skewed)
        R - traffic importance (road hierarchy) one of 1.0 / 0.7 / 0.4
        P - proximity to critical infrastructure Beta(1, 10)  (mostly low)
        F - recurrence frequency                Beta(1.2, 9)
        X - resolution failure score            Beta(1, 15)   (very low)

    Parameters
    ----------
    n_samples : number of rows to generate.
    seed : seed for the private NumPy generator used for all draws.

    Returns
    -------
    DataFrame with columns A, D, C, Q, M, T, R, P, F, X (in that order).
    """
    gen = np.random.default_rng(seed)
    count = n_samples

    # The draws below must stay in this exact order: every call advances the
    # same generator, so reordering would change the data produced by a seed.
    columns = {}
    columns["A"] = gen.beta(2, 8, count)
    columns["D"] = gen.beta(1.5, 6, count)
    columns["C"] = gen.uniform(0, 1, count)
    columns["Q"] = gen.beta(8, 2, count)
    columns["M"] = gen.beta(1.2, 8, count)
    columns["T"] = gen.beta(1.5, 5, count)

    # Road hierarchy is categorical: highway / main road / local street,
    # encoded as 1.0 / 0.7 / 0.4 and sampled with shares 10 % / 35 % / 55 %.
    hierarchy_scores = [1.0, 0.7, 0.4]
    hierarchy_shares = [0.10, 0.35, 0.55]
    sampled_roads = gen.choice(hierarchy_scores, size=count, p=hierarchy_shares)
    columns["R"] = sampled_roads.astype(float)

    columns["P"] = gen.beta(1, 10, count)
    columns["F"] = gen.beta(1.2, 9, count)
    columns["X"] = gen.beta(1, 15, count)

    # Dict insertion order fixes the column order of the resulting frame.
    return pd.DataFrame(columns)
| # ============================================================================= | |
| # STEP 2 — GROUND-TRUTH SEVERITY FORMULA | |
| # ============================================================================= | |
def compute_severity(df: pd.DataFrame, noise_std: float = 0.03, seed: int = RANDOM_SEED) -> pd.Series:
    """
    Derive the ground-truth severity target from the feature table.

    Formula
    -------
    S_base = 0.28A + 0.10D + 0.14C + 0.04Q +
             0.08M + 0.07T + 0.09R + 0.10P +
             0.06F + 0.04X
    K      = 1 + 0.5 * P      (infrastructure proximity multiplier)
    S      = clamp(S_base * K + noise, 0, 1)

    Parameters
    ----------
    df : frame containing the columns A, D, C, Q, M, T, R, P, F, X.
    noise_std : standard deviation of the zero-mean Gaussian noise.
    seed : seed for the private NumPy generator that draws the noise.

    Returns
    -------
    Series named "severity", aligned to ``df.index``, with values in [0, 1].
    """
    gen = np.random.default_rng(seed)

    # (column, weight) pairs in the order the terms are accumulated; the order
    # is kept fixed so floating-point rounding matches term-by-term addition.
    weighted_terms = (
        ("A", 0.28),
        ("D", 0.10),
        ("C", 0.14),
        ("Q", 0.04),
        ("M", 0.08),
        ("T", 0.07),
        ("R", 0.09),
        ("P", 0.10),
        ("F", 0.06),
        ("X", 0.04),
    )
    lead_column, lead_weight = weighted_terms[0]
    base_score = lead_weight * df[lead_column]
    for column, weight in weighted_terms[1:]:
        base_score = base_score + weight * df[column]

    # Observations close to critical infrastructure are boosted by up to 1.5x.
    proximity_boost = 1 + 0.5 * df["P"]
    boosted_score = base_score * proximity_boost

    # Perturb with Gaussian noise, then clamp back into the valid range.
    jitter = gen.normal(loc=0, scale=noise_std, size=len(df))
    clamped = np.clip(boosted_score + jitter, 0, 1)
    return pd.Series(clamped, name="severity", index=df.index)
| # ============================================================================= | |
| # STEP 3 — TRAIN XGBOOST MODEL | |
| # ============================================================================= | |
# Ordered feature columns. This order defines the column layout of the
# training matrix and of the feature vector built at inference time, and it
# is exported alongside the model as the feature list.
FEATURE_COLS = ["A", "D", "C", "Q", "M", "T", "R", "P", "F", "X"]
def build_and_train_model(
    X_train: np.ndarray,
    y_train: np.ndarray,
    seed: int = RANDOM_SEED,
) -> xgb.XGBRegressor:
    """
    Create an XGBoost regressor with fixed hyperparameters and fit it.

    No hyperparameter search happens here; wrap this in GridSearchCV / Optuna
    if tuning is needed for production.

    Parameters
    ----------
    X_train : training feature matrix.
    y_train : training targets.
    seed : value passed to the regressor as ``random_state``.

    Returns
    -------
    The fitted XGBRegressor.
    """
    hyperparams = {
        "objective": "reg:squarederror",
        "n_estimators": 200,
        "max_depth": 5,
        "learning_rate": 0.05,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "random_state": seed,
        "verbosity": 0,
        "n_jobs": -1,
    }
    regressor = xgb.XGBRegressor(**hyperparams)

    # NOTE(review): the non-ASCII glyphs in the message below look
    # mis-encoded; confirm against the original file before changing them.
    print("ββ Training XGBoost Regressor β¦")
    regressor.fit(X_train, y_train)
    print(" Training complete.\n")
    return regressor
| # ============================================================================= | |
| # STEP 4 — EVALUATION | |
| # ============================================================================= | |
def evaluate_model(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    y_test: np.ndarray,
    feature_names: list[str],
) -> dict:
    """
    Score the model on the held-out split and print a report.

    Prints RMSE, MAE and R^2, followed by the features ranked by the model's
    ``feature_importances_`` with a small text bar per feature.

    Parameters
    ----------
    model : fitted regressor.
    X_test : held-out feature matrix.
    y_test : held-out targets.
    feature_names : names matching the columns of ``X_test``.

    Returns
    -------
    dict with keys "rmse", "mae", "r2" (numbers) and "importance"
    (DataFrame with columns "Feature" and "Importance", sorted descending).
    """
    predictions = model.predict(X_test)
    scores = {
        "rmse": np.sqrt(mean_squared_error(y_test, predictions)),
        "mae": mean_absolute_error(y_test, predictions),
        "r2": r2_score(y_test, predictions),
    }

    rule = "=" * 50
    print(rule)
    print(" MODEL EVALUATION METRICS")
    print(rule)
    print(f" RMSE : {scores['rmse']:.6f}")
    print(f" MAE : {scores['mae']:.6f}")
    print(f" RΒ² : {scores['r2']:.6f}")
    print(rule)

    # Rank features by the importance values exposed by the fitted model.
    ranking = pd.DataFrame(
        {"Feature": feature_names, "Importance": model.feature_importances_}
    )
    ranking = ranking.sort_values("Importance", ascending=False)
    ranking = ranking.reset_index(drop=True)

    print("\n FEATURE IMPORTANCE RANKING (gain)")
    print(" " + "-" * 36)
    # .tolist() yields plain Python floats, so the bar length is computed in
    # double precision regardless of the importance array's dtype.
    for name, weight in zip(ranking["Feature"].tolist(), ranking["Importance"].tolist()):
        bar = "β" * int(weight * 100)
        print(f" {name:>3} {weight:.4f} {bar}")
    print()

    return {**scores, "importance": ranking}
| # ============================================================================= | |
| # STEP 5 — SHAP INTERPRETABILITY | |
| # ============================================================================= | |
def _save_shap_summary_plot(
    shap_values,
    X_test: np.ndarray,
    feature_names: list[str],
    title: str,
    path: str,
    **plot_kwargs,
) -> None:
    """Render one SHAP summary plot with *title* and write it to *path*."""
    plt.figure(figsize=(10, 6))
    try:
        shap.summary_plot(
            shap_values,
            X_test,
            feature_names=feature_names,
            show=False,
            **plot_kwargs,
        )
        plt.title(title, fontsize=14, fontweight="bold")
        plt.tight_layout()
        plt.savefig(path, dpi=150, bbox_inches="tight")
    finally:
        # Release the figure even when plotting or saving fails, so repeated
        # calls do not accumulate open figures.
        plt.close()


def run_shap_analysis(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Explain the model with SHAP: save two summary plots and print a ranking.

    Writes ``shap_bar_plot.png`` (mean |SHAP| per feature) and
    ``shap_dot_plot.png`` (default summary plot) into ``output_dir``, prints
    the features ranked by mean |SHAP|, and reports whether at least two of
    the expected dominant features (A, C, P) are among the top three.

    Parameters
    ----------
    model : fitted tree model to explain.
    X_test : feature matrix the SHAP values are computed on.
    feature_names : names matching the columns of ``X_test``.
    output_dir : directory for the PNG files; created if it does not exist.
    """
    print("ββ Running SHAP analysis β¦")

    # Create the target directory here as well (save_artefacts does the same)
    # so this function works when called before any artefact has been saved.
    os.makedirs(output_dir or ".", exist_ok=True)

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)

    # Summary bar plot
    bar_path = os.path.join(output_dir, "shap_bar_plot.png")
    _save_shap_summary_plot(
        shap_values,
        X_test,
        feature_names,
        title="SHAP Feature Importance β Mean |SHAP value|",
        path=bar_path,
        plot_type="bar",
    )
    print(f" Saved: {bar_path}")

    # Dot summary plot (summary_plot's default plot type)
    dot_path = os.path.join(output_dir, "shap_dot_plot.png")
    _save_shap_summary_plot(
        shap_values,
        X_test,
        feature_names,
        title="SHAP Summary Plot β Impact on Severity Score",
        path=dot_path,
    )
    print(f" Saved: {dot_path}\n")

    # Mean |SHAP| ranking
    expected_dominant = {"A", "C", "P"}
    mean_shap = np.abs(shap_values).mean(axis=0)
    shap_df = (
        pd.DataFrame({"Feature": feature_names, "Mean|SHAP|": mean_shap})
        .sort_values("Mean|SHAP|", ascending=False)
        .reset_index(drop=True)
    )
    print(" SHAP MEAN |VALUE| RANKING")
    print(" " + "-" * 36)
    ranked = zip(shap_df["Feature"].tolist(), shap_df["Mean|SHAP|"].tolist())
    for rank, (feature, value) in enumerate(ranked, start=1):
        tag = " β dominant" if feature in expected_dominant else ""
        print(f" #{rank:<2} {feature:>3} {value:.5f}{tag}")
    print()

    # Dominance check: passes when at least two of the expected features are
    # in the top three by mean |SHAP|.
    top3 = shap_df["Feature"].head(3).tolist()
    overlap = expected_dominant & set(top3)
    if len(overlap) >= 2:
        print(f" β Dominance check PASSED β {overlap} appear in top-3 SHAP features.")
    else:
        print(f" β οΈ Dominance check NOTE β top-3 are {top3}; "
              "model learned different patterns from the data.")
    print()
| # ============================================================================= | |
| # STEP 6 — SAVE MODEL & ARTEFACTS | |
| # ============================================================================= | |
def save_artefacts(
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Persist everything inference needs into ``output_dir``.

    Files written:
        severity_model.json - the model, via its own ``save_model``
        feature_scaler.pkl  - the scaler (or ``None``), via ``joblib.dump``
        feature_list.json   - the ordered feature names, as JSON

    The directory is created first if it does not exist.
    """
    os.makedirs(output_dir, exist_ok=True)

    model_file = os.path.join(output_dir, "severity_model.json")
    scaler_file = os.path.join(output_dir, "feature_scaler.pkl")
    features_file = os.path.join(output_dir, "feature_list.json")

    # Model
    model.save_model(model_file)
    print(f"ββ Model saved: {model_file}")

    # Scaler (joblib happily serialises None as well)
    joblib.dump(scaler, scaler_file)
    print(f"ββ Scaler saved: {scaler_file}")

    # Feature order
    serialised_names = json.dumps(feature_names, indent=2)
    with open(features_file, "w") as handle:
        handle.write(serialised_names)
    print(f"ββ Feature list saved: {features_file}\n")
| # ============================================================================= | |
| # STEP 7 — INFERENCE FUNCTION | |
| # ============================================================================= | |
def load_inference_artefacts(
    model_path: str = "severity_model.json",
    scaler_path: str = "feature_scaler.pkl",
    feature_list_path: str = "feature_list.json",
) -> tuple[xgb.XGBRegressor, MinMaxScaler | None, list[str]]:
    """
    Read back the saved model, scaler and feature list for inference.

    Returns
    -------
    (model, scaler, feature_names) - the scaler is whatever object was
    stored in ``scaler_path`` (possibly ``None``).
    """
    regressor = xgb.XGBRegressor()
    regressor.load_model(model_path)

    # NOTE(review): joblib.load unpickles the file, which can execute
    # arbitrary code - only point this at scaler files you trust.
    fitted_scaler = joblib.load(scaler_path)

    with open(feature_list_path) as handle:
        ordered_names = json.loads(handle.read())

    return regressor, fitted_scaler, ordered_names
def _severity_label(
    score: float,
    *,
    low_threshold: float = 0.33,
    high_threshold: float = 0.66,
) -> str:
    """
    Map a numeric severity score to a human-readable label.

    With the default thresholds:
        Low    : score < 0.33
        Medium : 0.33 <= score < 0.66
        High   : score >= 0.66

    Parameters
    ----------
    score : numeric severity score.
    low_threshold : scores strictly below this are "Low".
    high_threshold : scores at or above this are "High".

    Returns
    -------
    "Low", "Medium" or "High". A NaN score fails both comparisons and is
    therefore labelled "High".

    Raises
    ------
    ValueError
        If ``low_threshold`` is greater than ``high_threshold``.
    """
    if low_threshold > high_threshold:
        raise ValueError(
            f"low_threshold ({low_threshold}) must not exceed "
            f"high_threshold ({high_threshold})"
        )
    if score < low_threshold:
        return "Low"
    if score < high_threshold:
        return "Medium"
    return "High"
def predict_severity(
    features_dict: dict,
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
) -> dict:
    """
    Score a single pothole observation.

    Parameters
    ----------
    features_dict : dict
        Raw (pre-scaling) feature values keyed by feature name. Every name in
        ``feature_names`` must be present; extra keys are ignored.
    model : trained XGBRegressor
    scaler : fitted MinMaxScaler, or None when the values are already scaled
    feature_names : feature names in the column order the model expects

    Returns
    -------
    dict with:
        "score" : float - prediction clamped to [0, 1], rounded to 4 decimals
        "label" : str   - "Low" | "Medium" | "High"

    Raises
    ------
    ValueError
        If any name in ``feature_names`` is missing from ``features_dict``.
    """
    # Reject incomplete input before touching the model.
    absent = set(feature_names) - set(features_dict.keys())
    if absent:
        raise ValueError(f"Missing features in input dict: {absent}")

    # One-row matrix whose columns follow feature_names.
    ordered_values = [features_dict[name] for name in feature_names]
    vector = np.array([ordered_values], dtype=np.float32)

    if scaler is not None:
        vector = scaler.transform(vector)

    # Clamp the raw prediction into [0, 1]; the label is derived from the
    # clamped (not yet rounded) score.
    prediction = float(model.predict(vector)[0])
    bounded = float(np.clip(prediction, 0.0, 1.0))
    return {"score": round(bounded, 4), "label": _severity_label(bounded)}
| # ============================================================================= | |
| # MAIN PIPELINE RUNNER | |
| # ============================================================================= | |
def main(output_dir: str = ".") -> None:
    """
    Run the whole pipeline end to end and write every output to ``output_dir``.

    Steps: generate the synthetic dataset and its severity target (also saved
    as ``synthetic_pothole_data.csv``), split 80/20, fit the MinMaxScaler on
    the training split, train the XGBoost model, evaluate it, run the SHAP
    analysis, save the artefacts, and print predictions for four hand-written
    sample cases.

    Parameters
    ----------
    output_dir : directory for all generated files; created if missing.
    """
    # The dataset CSV and the SHAP plots are written before save_artefacts()
    # gets a chance to create the directory, so make sure it exists up front.
    os.makedirs(output_dir or ".", exist_ok=True)

    print("\n" + "=" * 60)
    print(" CIVIC POTHOLE SEVERITY SCORING β FULL ML PIPELINE")
    print("=" * 60 + "\n")

    # 1. Generate dataset
    print("ββ [1/7] Generating synthetic dataset β¦")
    df = generate_synthetic_dataset(n_samples=10_000)
    y = compute_severity(df)
    # Save the dataset for persistence/user inspection
    full_dataset = df.copy()
    full_dataset['severity'] = y
    dataset_path = os.path.join(output_dir, "synthetic_pothole_data.csv")
    full_dataset.to_csv(dataset_path, index=False)
    print(f" Dataset shape : {df.shape}")
    print(f" Dataset saved to: {dataset_path}")
    print(f" Severity stats: mean={y.mean():.4f}, std={y.std():.4f}, "
          f"min={y.min():.4f}, max={y.max():.4f}\n")

    # 2. Train / test split. This happens BEFORE scaling so that the scaler
    # never sees the held-out rows (fitting it on the full dataset would leak
    # test-set statistics into training and evaluation).
    print("ββ [2/7] Splitting data (80 % train / 20 % test) β¦")
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        df[FEATURE_COLS], y, test_size=0.20, random_state=RANDOM_SEED
    )
    print(f" Train samples : {len(X_train_raw)}")
    print(f" Test samples : {len(X_test_raw)}\n")

    # 3. Feature scaling
    print("ββ [3/7] Scaling features (MinMaxScaler) β¦")
    # NOTE: Features are already in [0, 1] by construction, but we fit a
    # scaler so the inference function can handle raw un-normalised inputs
    # if the production system requires it.
    scaler = MinMaxScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)
    print(" Scaling complete.\n")

    # 4. Train model
    print("ββ [4/7] Training model β¦")
    model = build_and_train_model(X_train, y_train)

    # 5. Evaluate (the report is printed; the returned metrics are not
    # needed by the rest of the pipeline)
    print("ββ [5/7] Evaluating model β¦\n")
    evaluate_model(model, X_test, y_test, FEATURE_COLS)

    # 6. SHAP
    print("ββ [6/7] SHAP interpretability β¦\n")
    run_shap_analysis(model, X_test, FEATURE_COLS, output_dir=output_dir)

    # 7. Save artefacts
    print("ββ [7/7] Saving model artefacts β¦")
    save_artefacts(model, scaler, FEATURE_COLS, output_dir=output_dir)

    # Sample predictions: (display name, values in FEATURE_COLS order)
    print("=" * 60)
    print(" SAMPLE PREDICTIONS")
    print("=" * 60)
    sample_cases = [
        ("Minor Local-Street Pothole",
         [0.05, 0.08, 0.30, 0.90, 0.05, 0.10, 0.40, 0.02, 0.03, 0.01]),
        ("Moderate Main-Road Pothole",
         [0.25, 0.20, 0.55, 0.75, 0.35, 0.40, 0.70, 0.15, 0.20, 0.10]),
        ("Severe Highway near Hospital",
         [0.70, 0.55, 0.85, 0.95, 0.80, 0.75, 1.00, 0.90, 0.65, 0.40]),
        ("Recurring Pothole (high reopen)",
         [0.40, 0.35, 0.60, 0.80, 0.50, 0.85, 0.70, 0.30, 0.75, 0.80]),
    ]
    for case_name, values in sample_cases:
        features = dict(zip(FEATURE_COLS, values))
        result = predict_severity(
            features_dict=features,
            model=model,
            scaler=scaler,
            feature_names=FEATURE_COLS,
        )
        print(f"\n π {case_name}")
        feature_str = ", ".join(f"{k}={v}" for k, v in features.items())
        print(f" Features : {feature_str}")
        print(f" Score : {result['score']:.4f}")
        print(f" Label : {result['label']}")

    print("\n" + "=" * 60)
    print(" PIPELINE COMPLETE")
    print(f" Output artefacts β {os.path.abspath(output_dir)}")
    print("=" * 60 + "\n")
# Script entry point: run the full pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    # Output directory for all saved files (same folder as this script)
    OUTPUT_DIR = os.path.dirname(os.path.abspath(__file__))
    main(output_dir=OUTPUT_DIR)