# Severity_Score / severity_model_pipeline.py
# Uploaded by cloud450 using huggingface_hub (commit b85e25b, verified).
"""
=============================================================================
CIVIC ISSUE DETECTION — POTHOLE SEVERITY SCORING PIPELINE
=============================================================================
Produces a trained XGBoost regression model that predicts severity S ∈ [0,1]
from 10 engineered features derived from a civic-issue detection system.
Pipeline Stages
---------------
1. Synthetic dataset generation (10 000 samples, realistic distributions)
2. Ground-truth severity formula (weighted sum + infrastructure boost + noise)
3. Model training (XGBoost Regressor, 80/20 split)
4. Evaluation (RMSE, MAE, R²)
5. Interpretability (SHAP summary + top-feature analysis)
6. Artefact export (severity_model.json, scaler, feature list)
7. Inference function (predict_severity → score + label)
=============================================================================
"""
# ---------------------------------------------------------------------------
# Imports
# ---------------------------------------------------------------------------
import json
import os
import warnings
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import xgboost as xgb
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import joblib
# Silence every warning category for the whole process so the console report
# stays readable.
# NOTE(review): this also hides deprecation and consistency warnings raised by
# the imported libraries — consider narrowing the filter.
warnings.filterwarnings("ignore")
# Ensure reproducible results
RANDOM_SEED = 42
# Seeds NumPy's legacy global RNG. generate_synthetic_dataset() and
# compute_severity() build their own Generator via np.random.default_rng(seed),
# so this call only affects code that draws from np.random.* directly.
np.random.seed(RANDOM_SEED)
# =============================================================================
# STEP 1 — GENERATE SYNTHETIC DATASET
# =============================================================================
def generate_synthetic_dataset(n_samples: int = 10_000, seed: int = RANDOM_SEED) -> pd.DataFrame:
    """
    Build a reproducible table of synthetic pothole observations.

    Each of the ten feature columns is sampled independently from a fixed
    distribution; every value lies in [0, 1].

    Columns
    -------
    A — defect area ratio                      Beta(2, 8), skewed small
    D — defect density                         Beta(1.5, 6), low-to-moderate
    C — centrality (closeness to road centre)  Uniform(0, 1)
    Q — detection confidence                   Beta(8, 2), high-biased
    M — multi-user confirmation score          Beta(1.2, 8), sparse
    T — temporal persistence                   Beta(1.5, 5), right-skewed
    R — traffic importance (road hierarchy)    one of 1.0 / 0.7 / 0.4
    P — proximity to critical infrastructure   Beta(1, 10), mostly low
    F — recurrence frequency                   Beta(1.2, 9), low
    X — resolution failure score               Beta(1, 15), very low

    Parameters
    ----------
    n_samples : number of rows to draw.
    seed : seed for the NumPy ``Generator`` that produces every draw.

    Returns
    -------
    DataFrame with the columns A, D, C, Q, M, T, R, P, F, X (in that order).
    """
    gen = np.random.default_rng(seed)
    size = n_samples

    # The generator is stateful: the draws must happen in exactly this order
    # (A, D, C, Q, M, T, R, P, F, X) for a given seed to reproduce the same
    # table. A dict display evaluates its values top to bottom, so the order
    # written here is the order in which the generator is consumed.
    return pd.DataFrame({
        "A": gen.beta(2, 8, size),
        "D": gen.beta(1.5, 6, size),
        "C": gen.uniform(0, 1, size),
        "Q": gen.beta(8, 2, size),
        "M": gen.beta(1.2, 8, size),
        "T": gen.beta(1.5, 5, size),
        # Road hierarchy: highway (1.0), main road (0.7), local street (0.4),
        # sampled with probabilities 10 % / 35 % / 55 %.
        "R": gen.choice(
            [1.0, 0.7, 0.4],
            size=size,
            p=[0.10, 0.35, 0.55],
        ).astype(float),
        "P": gen.beta(1, 10, size),
        "F": gen.beta(1.2, 9, size),
        "X": gen.beta(1, 15, size),
    })
# =============================================================================
# STEP 2 — GROUND-TRUTH SEVERITY FORMULA
# =============================================================================
def compute_severity(df: pd.DataFrame, noise_std: float = 0.03, seed: int = RANDOM_SEED) -> pd.Series:
    """
    Derive the ground-truth severity target from the feature table.

    Formula
    -------
    S_base = 0.28A + 0.10D + 0.14C + 0.04Q +
             0.08M + 0.07T + 0.09R + 0.10P +
             0.06F + 0.04X
    K      = 1 + 0.5 * P   (infrastructure proximity multiplier)
    S      = clamp(S_base * K + noise, 0, 1)

    Parameters
    ----------
    df : feature table containing the columns A, D, C, Q, M, T, R, P, F, X.
    noise_std : standard deviation of the zero-mean Gaussian noise.
    seed : seed for the NumPy ``Generator`` that draws the noise.

    Returns
    -------
    Series named "severity", indexed like ``df``.
    """
    weighted_terms = (
        ("A", 0.28),
        ("D", 0.10),
        ("C", 0.14),
        ("Q", 0.04),
        ("M", 0.08),
        ("T", 0.07),
        ("R", 0.09),
        ("P", 0.10),
        ("F", 0.06),
        ("X", 0.04),
    )

    # Accumulate strictly left to right, starting from the first term, so the
    # floating-point result matches a single chained "+" expression.
    (lead_col, lead_weight), *other_terms = weighted_terms
    base = lead_weight * df[lead_col]
    for column, weight in other_terms:
        base = base + weight * df[column]

    # Critical-infrastructure proximity boosts the base score by up to 50 %.
    boost = 1 + 0.5 * df["P"]
    boosted = base * boost

    jitter = np.random.default_rng(seed).normal(loc=0, scale=noise_std, size=len(df))
    clamped = np.clip(boosted + jitter, 0, 1)
    return pd.Series(clamped, name="severity", index=df.index)
# =============================================================================
# STEP 3 — TRAIN XGBOOST MODEL
# =============================================================================
# Ordered feature columns. main() selects df[FEATURE_COLS] as the model input
# and passes the same list to evaluation, SHAP, export and inference, so this
# order defines the column layout of the feature matrix.
FEATURE_COLS = ["A", "D", "C", "Q", "M", "T", "R", "P", "F", "X"]
def build_and_train_model(
    X_train: np.ndarray,
    y_train: np.ndarray,
    seed: int = RANDOM_SEED,
) -> xgb.XGBRegressor:
    """
    Create an XGBoost regressor with fixed hyperparameters and fit it.

    No hyperparameter search is performed here; wrap this in GridSearchCV or
    Optuna if tuning is needed.

    Parameters
    ----------
    X_train : training feature matrix.
    y_train : training targets.
    seed : value passed to the regressor as ``random_state``.

    Returns
    -------
    The fitted ``XGBRegressor``.
    """
    hyperparams = {
        "objective": "reg:squarederror",
        "n_estimators": 200,
        "max_depth": 5,
        "learning_rate": 0.05,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "random_state": seed,
        "verbosity": 0,
        "n_jobs": -1,
    }
    regressor = xgb.XGBRegressor(**hyperparams)

    print("── Training XGBoost Regressor …")
    regressor.fit(X_train, y_train)
    print(" Training complete.\n")
    return regressor
# =============================================================================
# STEP 4 — EVALUATION
# =============================================================================
def evaluate_model(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    y_test: np.ndarray,
    feature_names: list[str],
) -> dict:
    """
    Score the model on the held-out split and print a short report.

    Prints RMSE, MAE and R², followed by the features ranked by the model's
    ``feature_importances_`` (described as gain-based by the original code).

    Parameters
    ----------
    model : fitted regressor.
    X_test : held-out feature matrix.
    y_test : held-out targets.
    feature_names : column names, in the same order as the columns of X_test.

    Returns
    -------
    dict with the keys "rmse", "mae", "r2" (floats) and "importance"
    (DataFrame with columns "Feature" and "Importance", sorted descending).
    """
    y_pred = model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    rule = "=" * 50
    print(rule)
    print(" MODEL EVALUATION METRICS")
    print(rule)
    print(f" RMSE : {rmse:.6f}")
    print(f" MAE : {mae:.6f}")
    print(f" R² : {r2:.6f}")
    print(rule)

    # Feature importance (gain-based)
    importance_df = (
        pd.DataFrame({"Feature": feature_names, "Importance": model.feature_importances_})
        .sort_values("Importance", ascending=False)
        .reset_index(drop=True)
    )
    print("\n FEATURE IMPORTANCE RANKING (gain)")
    print(" " + "-" * 36)
    # Iterate the two columns directly; iterrows() would box every row into
    # an object-dtype Series just to read two values.
    for name, weight in zip(importance_df["Feature"], importance_df["Importance"]):
        bar = "█" * int(weight * 100)  # one block per percentage point
        print(f" {name:>3} {weight:.4f} {bar}")
    print()

    return {"rmse": rmse, "mae": mae, "r2": r2, "importance": importance_df}
# =============================================================================
# STEP 5 — SHAP INTERPRETABILITY
# =============================================================================
def _save_shap_summary_plot(shap_values, X_test, feature_names, title, path, plot_type=None):
    """Render one SHAP summary plot titled *title* and save it to *path*."""
    plt.figure(figsize=(10, 6))
    shap.summary_plot(
        shap_values,
        X_test,
        feature_names=feature_names,
        plot_type=plot_type,  # None lets SHAP pick its default summary style
        show=False,           # keep the figure open so it can be titled/saved
    )
    plt.title(title, fontsize=14, fontweight="bold")
    plt.tight_layout()
    plt.savefig(path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {path}")


def run_shap_analysis(
    model: xgb.XGBRegressor,
    X_test: np.ndarray,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Explain the model with SHAP and report which features dominate.

    Writes ``shap_bar_plot.png`` and ``shap_dot_plot.png`` into *output_dir*
    (created if missing), prints the features ranked by mean |SHAP value|,
    and reports whether at least two of A, C, P are among the top three.

    Parameters
    ----------
    model : fitted tree model to explain.
    X_test : feature matrix the SHAP values are computed on.
    feature_names : column names, in the same order as the columns of X_test.
    output_dir : directory that receives the two PNG files.
    """
    print("── Running SHAP analysis …")
    # Make sure the target directory exists before any figure is saved; the
    # function must not rely on a later export step to create it.
    os.makedirs(output_dir or ".", exist_ok=True)

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)

    # Summary bar plot
    _save_shap_summary_plot(
        shap_values,
        X_test,
        feature_names,
        title="SHAP Feature Importance — Mean |SHAP value|",
        path=os.path.join(output_dir, "shap_bar_plot.png"),
        plot_type="bar",
    )
    # Beeswarm / dot summary plot
    _save_shap_summary_plot(
        shap_values,
        X_test,
        feature_names,
        title="SHAP Summary Plot — Impact on Severity Score",
        path=os.path.join(output_dir, "shap_dot_plot.png"),
    )
    print()

    # Mean |SHAP| ranking
    expected_dominant = {"A", "C", "P"}
    shap_df = (
        pd.DataFrame({
            "Feature": feature_names,
            "Mean|SHAP|": np.abs(shap_values).mean(axis=0),
        })
        .sort_values("Mean|SHAP|", ascending=False)
        .reset_index(drop=True)
    )
    print(" SHAP MEAN |VALUE| RANKING")
    print(" " + "-" * 36)
    ranked = zip(shap_df["Feature"], shap_df["Mean|SHAP|"])
    for rank, (name, value) in enumerate(ranked, start=1):
        # The marker flags the features *expected* to dominate, wherever
        # they actually land in the ranking.
        tag = " ◀ dominant" if name in expected_dominant else ""
        print(f" #{rank:<2} {name:>3} {value:.5f}{tag}")
    print()

    # Verify dominance of A, C, P: pass when at least two of them are in the
    # top three.
    top3 = shap_df["Feature"].head(3).tolist()
    overlap = sorted(expected_dominant.intersection(top3))  # sorted => stable output
    if len(overlap) >= 2:
        print(f" ✅ Dominance check PASSED — {overlap} appear in top-3 SHAP features.")
    else:
        print(f" ⚠️ Dominance check NOTE — top-3 are {top3}; "
              "model learned different patterns from the data.")
    print()
# =============================================================================
# STEP 6 — SAVE MODEL & ARTEFACTS
# =============================================================================
def save_artefacts(
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
    output_dir: str = ".",
) -> None:
    """
    Write the trained model and its companions into *output_dir*.

    Files written:
        severity_model.json — XGBoost model (native JSON format)
        feature_scaler.pkl  — the scaler object as passed in (may be None)
        feature_list.json   — ordered list of feature names

    The directory is created when it does not exist yet.
    """
    os.makedirs(output_dir, exist_ok=True)

    def in_output_dir(filename: str) -> str:
        return os.path.join(output_dir, filename)

    # XGBoost native JSON
    model_file = in_output_dir("severity_model.json")
    model.save_model(model_file)
    print(f"── Model saved: {model_file}")

    # Scaler (joblib pickles whatever object it is given, including None)
    scaler_file = in_output_dir("feature_scaler.pkl")
    joblib.dump(scaler, scaler_file)
    print(f"── Scaler saved: {scaler_file}")

    # Feature list
    features_file = in_output_dir("feature_list.json")
    with open(features_file, "w") as handle:
        handle.write(json.dumps(feature_names, indent=2))
    print(f"── Feature list saved: {features_file}\n")
# =============================================================================
# STEP 7 — INFERENCE FUNCTION
# =============================================================================
def load_inference_artefacts(
    model_path: str = "severity_model.json",
    scaler_path: str = "feature_scaler.pkl",
    feature_list_path: str = "feature_list.json",
) -> tuple[xgb.XGBRegressor, MinMaxScaler | None, list[str]]:
    """
    Read back the three files needed for inference.

    Returns
    -------
    (model, scaler, feature_names) — the XGBoost regressor, the object stored
    in the scaler file, and the ordered feature-name list.
    """
    regressor = xgb.XGBRegressor()
    regressor.load_model(model_path)

    # NOTE: joblib.load unpickles the file, which can execute arbitrary code.
    # Only point this at scaler files from a trusted source.
    fitted_scaler = joblib.load(scaler_path)

    with open(feature_list_path) as handle:
        names = json.loads(handle.read())

    return regressor, fitted_scaler, names
def _severity_label(
    score: float,
    *,
    low_threshold: float = 0.33,
    high_threshold: float = 0.66,
) -> str:
    """
    Assign a human-readable label to a numeric severity score.

    Thresholds (domain-tunable via the keyword arguments):
        Low    : score < low_threshold
        Medium : low_threshold ≤ score < high_threshold
        High   : score ≥ high_threshold

    Parameters
    ----------
    score : numeric severity score.
    low_threshold : lower bound (inclusive) of the "Medium" band.
    high_threshold : lower bound (inclusive) of the "High" band.

    Raises
    ------
    ValueError
        If ``score`` is NaN, or if ``low_threshold > high_threshold``.
    """
    if low_threshold > high_threshold:
        raise ValueError(
            f"low_threshold ({low_threshold}) must not exceed "
            f"high_threshold ({high_threshold})"
        )
    # Every comparison with NaN is False, so without this guard a NaN score
    # would fall through both checks and be reported as "High".
    if np.isnan(score):
        raise ValueError("Cannot assign a severity label to a NaN score")

    if score < low_threshold:
        return "Low"
    if score < high_threshold:
        return "Medium"
    return "High"
def predict_severity(
    features_dict: dict,
    model: xgb.XGBRegressor,
    scaler: MinMaxScaler | None,
    feature_names: list[str],
) -> dict:
    """
    Predict severity for a single pothole observation.

    Parameters
    ----------
    features_dict : dict
        Must contain every name in ``feature_names``; values are raw
        (pre-scaling) floats. Extra keys are ignored.
    model : trained XGBRegressor
    scaler : fitted MinMaxScaler (or None if features are already scaled)
    feature_names : ordered list of feature column names

    Returns
    -------
    dict with:
        "score" : float — predicted severity clamped to [0, 1], rounded to
                  4 decimals
        "label" : str   — "Low" | "Medium" | "High", derived from the
                  rounded score so that the two fields always agree

    Raises
    ------
    ValueError
        If any name in ``feature_names`` is missing from ``features_dict``.
    """
    # Report missing names in feature order so the message is deterministic
    # (a set would print in a hash-dependent order).
    missing = [name for name in feature_names if name not in features_dict]
    if missing:
        raise ValueError(f"Missing features in input dict: {missing}")

    # Build the ordered 1×n feature matrix. float64 keeps the scaling
    # arithmetic in the same precision as the data the scaler was fitted on;
    # a float32 input would make the scaler compute in float32.
    row = np.array([[features_dict[name] for name in feature_names]], dtype=np.float64)

    # Apply scaler if provided
    if scaler is not None:
        row = scaler.transform(row)

    # Predict, clamp to the valid range, then round once. The label is taken
    # from the rounded value: labelling the unrounded score could return e.g.
    # score=0.33 together with label="Low".
    raw_score = float(model.predict(row)[0])
    score = round(float(np.clip(raw_score, 0.0, 1.0)), 4)
    return {"score": score, "label": _severity_label(score)}
# =============================================================================
# MAIN PIPELINE RUNNER
# =============================================================================
def main(output_dir: str = ".") -> None:
    """
    Run the whole pipeline and write every output into *output_dir*.

    Steps: generate the synthetic dataset (saved as CSV), split it 80/20,
    fit the MinMaxScaler on the training split, train the XGBoost model,
    evaluate it, run the SHAP analysis, export the artefacts, and print
    predictions for four hand-written sample cases.

    Parameters
    ----------
    output_dir : directory for the CSV, the SHAP plots and the model
        artefacts; created if it does not exist.
    """
    print("\n" + "=" * 60)
    print(" CIVIC POTHOLE SEVERITY SCORING — FULL ML PIPELINE")
    print("=" * 60 + "\n")

    # The dataset CSV is the first file written, so the directory has to
    # exist before step 1 rather than only at export time.
    os.makedirs(output_dir, exist_ok=True)

    # ── 1. Generate dataset
    print("── [1/7] Generating synthetic dataset …")
    df = generate_synthetic_dataset(n_samples=10_000)
    y = compute_severity(df)
    # Save the dataset for persistence/user inspection
    full_dataset = df.copy()
    full_dataset["severity"] = y
    dataset_path = os.path.join(output_dir, "synthetic_pothole_data.csv")
    full_dataset.to_csv(dataset_path, index=False)
    print(f" Dataset shape : {df.shape}")
    print(f" Dataset saved to: {dataset_path}")
    print(f" Severity stats: mean={y.mean():.4f}, std={y.std():.4f}, "
          f"min={y.min():.4f}, max={y.max():.4f}\n")

    # ── 2. Train / test split
    # Split BEFORE scaling so the scaler never sees the held-out rows.
    print("── [2/7] Splitting data (80 % train / 20 % test) …")
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        df[FEATURE_COLS], y, test_size=0.20, random_state=RANDOM_SEED
    )
    print(f" Train samples : {len(X_train_raw)}")
    print(f" Test samples : {len(X_test_raw)}\n")

    # ── 3. Feature scaling
    # NOTE: Features are already in [0, 1] by construction, but we fit a
    # scaler so the inference function can handle raw un-normalised inputs
    # if the production system requires it. It is fitted on the training
    # split only; the test split is merely transformed.
    print("── [3/7] Scaling features (MinMaxScaler) …")
    scaler = MinMaxScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)
    print(" Scaling complete.\n")

    # ── 4. Train model
    print("── [4/7] Training model …")
    model = build_and_train_model(X_train, y_train)

    # ── 5. Evaluate (called for its printed report; the returned metrics
    # are not needed here)
    print("── [5/7] Evaluating model …\n")
    evaluate_model(model, X_test, y_test, FEATURE_COLS)

    # ── 6. SHAP
    print("── [6/7] SHAP interpretability …\n")
    run_shap_analysis(model, X_test, FEATURE_COLS, output_dir=output_dir)

    # ── 7. Save artefacts
    print("── [7/7] Saving model artefacts …")
    save_artefacts(model, scaler, FEATURE_COLS, output_dir=output_dir)

    # ── Sample predictions
    print("=" * 60)
    print(" SAMPLE PREDICTIONS")
    print("=" * 60)
    # Each value list is in FEATURE_COLS order.
    sample_cases = [
        {
            "name": "Minor Local-Street Pothole",
            "features": dict(zip(FEATURE_COLS,
                [0.05, 0.08, 0.30, 0.90, 0.05, 0.10, 0.40, 0.02, 0.03, 0.01])),
        },
        {
            "name": "Moderate Main-Road Pothole",
            "features": dict(zip(FEATURE_COLS,
                [0.25, 0.20, 0.55, 0.75, 0.35, 0.40, 0.70, 0.15, 0.20, 0.10])),
        },
        {
            "name": "Severe Highway near Hospital",
            "features": dict(zip(FEATURE_COLS,
                [0.70, 0.55, 0.85, 0.95, 0.80, 0.75, 1.00, 0.90, 0.65, 0.40])),
        },
        {
            "name": "Recurring Pothole (high reopen)",
            "features": dict(zip(FEATURE_COLS,
                [0.40, 0.35, 0.60, 0.80, 0.50, 0.85, 0.70, 0.30, 0.75, 0.80])),
        },
    ]
    for case in sample_cases:
        result = predict_severity(
            features_dict=case["features"],
            model=model,
            scaler=scaler,
            feature_names=FEATURE_COLS,
        )
        print(f"\n 📍 {case['name']}")
        feature_str = ", ".join(f"{k}={v}" for k, v in case["features"].items())
        print(f" Features : {feature_str}")
        print(f" Score : {result['score']:.4f}")
        print(f" Label : {result['label']}")

    print("\n" + "=" * 60)
    print(" PIPELINE COMPLETE")
    print(f" Output artefacts → {os.path.abspath(output_dir)}")
    print("=" * 60 + "\n")
# Script entry point: runs the full pipeline only when the file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    # Output directory for all saved files (same folder as this script),
    # resolved from __file__ so it does not depend on the working directory.
    OUTPUT_DIR = os.path.dirname(os.path.abspath(__file__))
    main(output_dir=OUTPUT_DIR)