Spaces:
Running
Running
"""
Model Evaluation and Benchmarking Script.

Implements diagnostics from the ML accuracy report:
  - Confusion matrix, precision, recall, F1, AUROC
  - Class balance analysis
  - Data leakage check (train/val/test path overlap)
  - RandomizedSearchCV hyperparameter search

Usage:
    python scripts/evaluate_model.py
    python scripts/evaluate_model.py --hparam-search
"""
| import csv | |
| import json | |
| import logging | |
| import argparse | |
| import numpy as np | |
| from pathlib import Path | |
# Root logger setup: INFO and above, each record stamped with wall-clock time.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# All data paths hang off the directory one level above this script's folder.
ROOT = Path(__file__).parents[1]
MANIFEST = ROOT.joinpath("data", "manifest.csv")
FEATURES = ROOT.joinpath("data", "features.csv")
RESULTS_OUT = ROOT.joinpath("data", "reference", "eval_results.json")
def check_class_balance(manifest_path: Path) -> dict:
    """Count manifest rows per label and flag a skewed class distribution.

    Args:
        manifest_path: CSV file with a header row. The ``label`` column is
            tallied; rows with no label value are grouped under ``""``.

    Returns:
        A dict with:
          - ``counts``: label -> number of rows.
          - ``total``: number of rows read.
          - ``balance_ratio``: smallest class size divided by the largest,
            rounded to 4 places (``0.0`` when the manifest has no rows).
          - ``imbalanced``: ``True`` when the ratio is below 0.30.
    """
    counts: dict = {}
    with open(manifest_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            # DictReader fills cells missing from a short row with None;
            # fold those into the same "" bucket used for an absent column.
            label = row.get("label") or ""
            counts[label] = counts.get(label, 0) + 1

    total = sum(counts.values())
    if not counts:
        # min()/max() raise on an empty sequence, so report the degenerate
        # case explicitly instead of crashing the whole evaluation run.
        logger.warning("Manifest has no rows; class balance cannot be computed.")
        return {"counts": counts, "total": total,
                "balance_ratio": 0.0, "imbalanced": False}

    # max() must receive the values iterable on its own: given a second
    # scalar argument it compares the view object against the int and raises
    # TypeError. Every count is >= 1 here, so no zero-division guard is needed.
    balance_ratio = min(counts.values()) / max(counts.values())
    imbalanced = balance_ratio < 0.30
    logger.info(f"Class balance: {counts} (ratio={balance_ratio:.3f})")
    if imbalanced:
        logger.warning(
            f"Class imbalance detected (ratio={balance_ratio:.3f}). "
            "Consider resampling or scale_pos_weight."
        )
    return {"counts": counts, "total": total,
            "balance_ratio": round(balance_ratio, 4), "imbalanced": imbalanced}
def check_data_leakage(manifest_path: Path) -> dict:
    """Look for file paths that appear in more than one dataset split.

    Args:
        manifest_path: CSV file with a header row. Each row's ``split``
            (default ``"train"`` when the column is absent) and ``path``
            (default ``""``) values are read.

    Returns:
        A dict with ``split_sizes`` (split -> number of distinct paths),
        ``leakage_detected`` (``True`` if any two splits share a path) and
        ``overlapping_paths`` (``"<a>_vs_<b>"`` -> number of shared paths,
        listing only the pairs that overlap).
    """
    paths_by_split: dict = {}
    with open(manifest_path, newline="", encoding="utf-8") as handle:
        for record in csv.DictReader(handle):
            split_name = record.get("split", "train")
            file_path = record.get("path", "")
            paths_by_split.setdefault(split_name, set()).add(file_path)

    # Compare every unordered pair of splits exactly once.
    shared_counts = {}
    split_names = list(paths_by_split)
    for position, first in enumerate(split_names):
        for second in split_names[position + 1:]:
            common = paths_by_split[first].intersection(paths_by_split[second])
            if not common:
                continue
            shared_counts[f"{first}_vs_{second}"] = len(common)
            logger.warning(f"Data leakage: {len(common)} duplicate paths between {first} and {second}")

    leakage = bool(shared_counts)
    if not leakage:
        logger.info("No path-based data leakage detected.")

    sizes = {name: len(members) for name, members in paths_by_split.items()}
    return {"split_sizes": sizes,
            "leakage_detected": leakage, "overlapping_paths": shared_counts}
def evaluate_xgboost(features_path: Path, threshold: float = 0.5) -> dict:
    """Score the pickled ensemble on a stratified 20% sample of the features file.

    Args:
        features_path: CSV with an integer ``label`` column plus one column
            per feature name stored alongside the model.
        threshold: Probability cutoff applied to column 1 of
            ``predict_proba`` to obtain the predicted class.

    Returns:
        ``{"error": ...}`` when the model file is missing or the features
        file has no rows. Otherwise a dict with the sample size, the
        threshold used, accuracy / precision / recall / F1 / AUROC (rounded
        to 4 places), the confusion matrix, the full classification report,
        the list of alarm messages, and the target and alarm-threshold
        tables the metrics were judged against.
    """
    import pickle
    from sklearn.model_selection import StratifiedShuffleSplit
    from sklearn.metrics import (
        accuracy_score, precision_score, recall_score,
        f1_score, roc_auc_score, confusion_matrix, classification_report,
    )

    model_path = ROOT / "data" / "reference" / "ensemble_xgb.pkl"
    if not model_path.exists():
        logger.warning("ensemble_xgb.pkl not found. Run scripts/train_ensemble.py first.")
        return {"error": "Model not found"}

    # SECURITY: unpickling runs arbitrary code embedded in the file. Only
    # load an ensemble_xgb.pkl produced by a trusted training run.
    with open(model_path, "rb") as f:
        pkg = pickle.load(f)
    model = pkg["model"]
    feature_names = pkg["feature_names"]

    rows, labels = [], []
    with open(features_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # Missing feature columns are filled with 0.5 below; say so loudly,
        # because a model scored on constant filler looks worse for no
        # visible reason.
        absent = [k for k in feature_names if k not in (reader.fieldnames or [])]
        if absent:
            logger.warning(
                f"{len(absent)} model feature(s) missing from features file, "
                f"filled with 0.5: {absent}"
            )
        for row in reader:
            labels.append(int(row["label"]))
            rows.append([float(row.get(k, 0.5)) for k in feature_names])

    if not rows:
        logger.warning("Features file has no rows; nothing to evaluate.")
        return {"error": "No feature rows"}

    X = np.array(rows)
    y = np.array(labels)

    # NOTE(review): presumably this seed and test_size mirror the split used
    # by the training script, which would make this a true hold-out set.
    # Confirm; otherwise some of these samples were seen during training.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.20, random_state=42)
    _, test_idx = next(sss.split(X, y))
    X_test, y_test = X[test_idx], y[test_idx]

    # One inference pass feeds both the thresholded predictions and AUROC.
    y_score = model.predict_proba(X_test)[:, 1]
    y_pred = (y_score >= threshold).astype(int)

    scores = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "precision_ai": float(precision_score(y_test, y_pred, zero_division=0)),
        "recall_ai": float(recall_score(y_test, y_pred, zero_division=0)),
        "f1_ai": float(f1_score(y_test, y_pred, zero_division=0)),
        "auroc": float(roc_auc_score(y_test, y_score)),
    }
    # Pin the label order so the matrix is always 2x2 (rows/cols: real, ai)
    # even if one class never shows up in the sample or the predictions.
    cm = confusion_matrix(y_test, y_pred, labels=[0, 1]).tolist()

    # Single source of truth for the limits: used for the alarms below and
    # reported verbatim in the result.
    targets = {"accuracy": 0.95, "precision_ai": 0.90,
               "recall_ai": 0.88, "f1_ai": 0.89, "auroc": 0.97}
    thresholds = {"accuracy": 0.90, "precision_ai": 0.85,
                  "recall_ai": 0.80, "f1_ai": 0.83, "auroc": 0.92}
    display_names = {"accuracy": "Accuracy", "precision_ai": "Precision",
                     "recall_ai": "Recall", "f1_ai": "F1", "auroc": "AUROC"}
    alarms = [
        f"{display_names[key]} {scores[key]:.3f} below threshold {limit:.2f}"
        for key, limit in thresholds.items()
        if scores[key] < limit
    ]

    logger.info(f"Evaluation on {len(y_test)} held-out samples:")
    logger.info(f" Accuracy: {scores['accuracy']:.4f}")
    logger.info(f" Precision: {scores['precision_ai']:.4f}")
    logger.info(f" Recall: {scores['recall_ai']:.4f}")
    logger.info(f" F1: {scores['f1_ai']:.4f}")
    logger.info(f" AUROC: {scores['auroc']:.4f}")
    for alarm in alarms:
        logger.warning(f"ALARM: {alarm}")
    if not alarms:
        logger.info("All metrics above alarm thresholds.")

    return {
        "n_test_samples": len(y_test), "threshold": threshold,
        **{key: round(value, 4) for key, value in scores.items()},
        "confusion_matrix": cm,
        "classification_report": classification_report(
            y_test, y_pred, labels=[0, 1],
            target_names=["real", "ai"], output_dict=True),
        "alarms": alarms,
        "targets": targets,
        "thresholds": thresholds,
    }
def hyperparameter_search(features_path: Path, n_iter: int = 20) -> dict:
    """Run a randomized, cross-validated XGBoost hyperparameter search.

    Every column of the features CSV other than ``label`` and ``path`` is
    used as a float feature; ``label`` is read as the integer target.
    Candidates are scored by ROC AUC over 5 shuffled stratified folds.

    Args:
        features_path: CSV file holding the features and labels.
        n_iter: Number of parameter settings to sample.

    Returns:
        A dict with ``best_params``, ``best_cv_auc`` (rounded to 4 places)
        and ``n_iterations``.
    """
    import xgboost as xgb
    from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold

    feature_names = None
    samples = []
    targets = []
    with open(features_path, newline="", encoding="utf-8") as handle:
        for record in csv.DictReader(handle):
            # The first row fixes which columns count as features.
            if feature_names is None:
                feature_names = [col for col in record if col not in ("label", "path")]
            targets.append(int(record["label"]))
            samples.append([float(record[col]) for col in feature_names])
    X = np.array(samples)
    y = np.array(targets)

    search_space = {
        "learning_rate": [0.01, 0.05, 0.10, 0.15, 0.20],
        "max_depth": [3, 4, 5, 6],
        "n_estimators": [100, 200, 300, 400],
        "subsample": [0.6, 0.7, 0.8, 0.9],
        "colsample_bytree": [0.6, 0.7, 0.8, 0.9],
        "min_child_weight": [1, 3, 5],
        "gamma": [0, 0.1, 0.2, 0.5],
    }
    estimator = xgb.XGBClassifier(eval_metric="logloss", random_state=42)
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    logger.info(f"Running RandomizedSearchCV with {n_iter} iterations")
    tuner = RandomizedSearchCV(
        estimator,
        search_space,
        n_iter=n_iter,
        cv=folds,
        scoring="roc_auc",
        random_state=42,
        n_jobs=-1,
        verbose=1,
    )
    tuner.fit(X, y)

    best_params = tuner.best_params_
    best_auc = tuner.best_score_
    logger.info(f"Best params: {best_params}")
    logger.info(f"Best CV AUC: {best_auc:.4f}")
    return {
        "best_params": best_params,
        "best_cv_auc": round(best_auc, 4),
        "n_iterations": n_iter,
    }
def main():
    """Run the requested checks and write their combined results as JSON.

    Command-line flags: ``--threshold`` (float, default 0.5),
    ``--hparam-search`` (switch) and ``--n-iter`` (int, default 20).
    The manifest checks run only if the manifest file exists; model
    evaluation and the optional search run only if the features file
    exists. Whatever was collected is saved to ``RESULTS_OUT``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--threshold", type=float, default=0.5)
    cli.add_argument("--hparam-search", action="store_true")
    cli.add_argument("--n-iter", type=int, default=20)
    options = cli.parse_args()

    report = {}

    if not MANIFEST.exists():
        logger.warning("manifest.csv not found")
    else:
        logger.info("=== CLASS BALANCE CHECK ===")
        report["class_balance"] = check_class_balance(MANIFEST)
        logger.info("=== DATA LEAKAGE CHECK ===")
        report["leakage_check"] = check_data_leakage(MANIFEST)

    if not FEATURES.exists():
        logger.warning("features.csv not found — run scripts/extract_features.py first")
    else:
        logger.info("=== MODEL EVALUATION ===")
        report["evaluation"] = evaluate_xgboost(FEATURES, threshold=options.threshold)
        if options.hparam_search:
            logger.info("=== HYPERPARAMETER SEARCH ===")
            report["hparam_search"] = hyperparameter_search(FEATURES, n_iter=options.n_iter)

    # Make sure the output folder exists before writing the report.
    RESULTS_OUT.parent.mkdir(parents=True, exist_ok=True)
    with RESULTS_OUT.open("w") as sink:
        # default=str stringifies any value json cannot encode natively.
        json.dump(report, sink, indent=2, default=str)
    logger.info(f"Saved to {RESULTS_OUT}")


if __name__ == "__main__":
    main()