verifile-x-api / scripts /evaluate_model.py
abinazebinoy's picture
feat(ml): evaluation pipeline and improved XGBoost training from diagnostic report
da8a50a
Raw
History Blame Contribute Delete
8.82 kB
"""
Model Evaluation and Benchmarking Script.
Implements diagnostics from the ML accuracy report:
- Confusion matrix, precision, recall, F1, AUROC
- Class balance analysis
- Data leakage check (train/val/test path overlap)
- RandomizedSearchCV hyperparameter search
Usage:
python scripts/evaluate_model.py
python scripts/evaluate_model.py --hparam-search
"""
import csv
import json
import logging
import argparse
import numpy as np
from pathlib import Path
# Console logging for every diagnostic step: time of day, level, message.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# ROOT is the directory one level above the folder that contains this file.
ROOT = Path(__file__).parents[1]
# Input CSVs read by the checks below, and the JSON report written by main().
MANIFEST = ROOT.joinpath("data", "manifest.csv")
FEATURES = ROOT.joinpath("data", "features.csv")
RESULTS_OUT = ROOT.joinpath("data", "reference", "eval_results.json")
def check_class_balance(manifest_path: Path) -> dict:
    """Count manifest rows per label and flag class imbalance.

    The balance ratio is the size of the rarest class divided by the size of
    the most common class. A ratio below 0.30 is reported as imbalanced and
    logged as a warning.

    Args:
        manifest_path: CSV file whose ``label`` column holds the class of each
            row. If the column is absent, rows are counted under ``""``.

    Returns:
        A dict with ``counts`` (label -> number of rows), ``total`` (number of
        rows), ``balance_ratio`` (rounded to 4 decimals) and ``imbalanced``.
        A manifest without data rows yields ``balance_ratio`` 0.0 and
        ``imbalanced`` False, because there are no classes to compare.
    """
    from collections import Counter

    with open(manifest_path, newline="", encoding="utf-8") as f:
        # dict(...) keeps the logged/serialized form a plain dict.
        counts = dict(Counter(row.get("label", "") for row in csv.DictReader(f)))

    if not counts:
        logger.warning(f"Manifest {manifest_path} has no rows; class balance is undefined.")
        return {"counts": counts, "total": 0,
                "balance_ratio": 0.0, "imbalanced": False}

    total = sum(counts.values())
    # Every stored count is >= 1, so the denominator can never be zero.
    # (The previous max(counts.values(), 1) compared a dict view with an int
    # and raised TypeError.)
    balance_ratio = min(counts.values()) / max(counts.values())
    imbalanced = balance_ratio < 0.30

    logger.info(f"Class balance: {counts} (ratio={balance_ratio:.3f})")
    if imbalanced:
        logger.warning(
            f"Class imbalance detected (ratio={balance_ratio:.3f}). "
            "Consider resampling or scale_pos_weight."
        )
    return {"counts": counts, "total": total,
            "balance_ratio": round(balance_ratio, 4), "imbalanced": imbalanced}
def check_data_leakage(manifest_path: Path) -> dict:
    """Report file paths that occur in more than one dataset split.

    Paths are compared as exact strings, so two different spellings of the
    same file are not recognised as duplicates.

    Args:
        manifest_path: CSV file with ``split`` and ``path`` columns. If the
            ``split`` column is absent, rows are assigned to ``"train"``.
            Rows whose ``path`` is missing or blank are ignored, so that they
            are not mistaken for one path shared by several splits.

    Returns:
        A dict with ``split_sizes`` (split -> number of distinct paths),
        ``leakage_detected`` (bool) and ``overlapping_paths`` (mapping
        ``"<a>_vs_<b>"`` -> number of paths shared by splits a and b; only
        pairs that actually overlap are listed).
    """
    from itertools import combinations

    split_paths: dict = {}
    with open(manifest_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            split = row.get("split", "train")
            # Register the split even if this row contributes no path, so it
            # still shows up in split_sizes.
            bucket = split_paths.setdefault(split, set())
            # DictReader fills short rows with None; treat that like "".
            path = row.get("path") or ""
            if path.strip():
                bucket.add(path)

    overlaps = {}
    # combinations() visits each unordered pair once, in first-seen order.
    for a, b in combinations(split_paths, 2):
        overlap = split_paths[a] & split_paths[b]
        if overlap:
            overlaps[f"{a}_vs_{b}"] = len(overlap)
            logger.warning(f"Data leakage: {len(overlap)} duplicate paths between {a} and {b}")

    leakage = bool(overlaps)
    if not leakage:
        logger.info("No path-based data leakage detected.")
    return {"split_sizes": {k: len(v) for k, v in split_paths.items()},
            "leakage_detected": leakage, "overlapping_paths": overlaps}
def evaluate_xgboost(features_path: Path, threshold: float = 0.5) -> dict:
    """Score the saved ensemble model on a stratified 20% sample of the features.

    The model package is read from ``ROOT/data/reference/ensemble_xgb.pkl`` and
    must be a mapping with ``"model"`` (an object exposing ``predict_proba``)
    and ``"feature_names"`` (the column order used to build each sample).

    Args:
        features_path: CSV file with an integer ``label`` column and one
            column per feature. A feature column absent from the file is
            filled with 0.5.
        threshold: Scores for class 1 at or above this value are predicted
            as class 1.

    Returns:
        ``{"error": "Model not found"}`` when the pickle file does not exist.
        Otherwise a dict with the sample count, the threshold used, accuracy,
        precision/recall/F1 for class 1, AUROC (each rounded to 4 decimals),
        the confusion matrix, the classification report, the list of alarm
        messages for metrics below their alarm threshold, and the ``targets``
        and ``thresholds`` tables.
    """
    import pickle

    model_path = ROOT / "data" / "reference" / "ensemble_xgb.pkl"
    if not model_path.exists():
        logger.warning("ensemble_xgb.pkl not found. Run scripts/train_ensemble.py first.")
        return {"error": "Model not found"}

    # Heavy dependencies are imported only once there is a model to evaluate.
    from sklearn.model_selection import StratifiedShuffleSplit
    from sklearn.metrics import (
        accuracy_score, precision_score, recall_score,
        f1_score, roc_auc_score, confusion_matrix, classification_report,
    )

    # SECURITY: pickle.load can execute arbitrary code. Only load model files
    # produced by a trusted training run.
    with open(model_path, "rb") as f:
        pkg = pickle.load(f)
    model = pkg["model"]
    feature_names = pkg["feature_names"]

    rows, labels = [], []
    with open(features_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            labels.append(int(row["label"]))
            rows.append([float(row.get(k, 0.5)) for k in feature_names])
    X = np.array(rows)
    y = np.array(labels)

    # NOTE(review): these samples are only truly held out if training used the
    # same split (test_size=0.20, random_state=42) - confirm against the
    # training script.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.20, random_state=42)
    _, test_idx = next(sss.split(X, y))
    X_test, y_test = X[test_idx], y[test_idx]

    # Run inference once and derive the hard predictions from the scores.
    y_score = model.predict_proba(X_test)[:, 1]
    y_pred = (y_score >= threshold).astype(int)

    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "precision_ai": float(precision_score(y_test, y_pred, zero_division=0)),
        "recall_ai": float(recall_score(y_test, y_pred, zero_division=0)),
        "f1_ai": float(f1_score(y_test, y_pred, zero_division=0)),
        "auroc": float(roc_auc_score(y_test, y_score)),
    }
    cm = confusion_matrix(y_test, y_pred).tolist()

    # Single source of truth for the alarm limits: used both to raise alarms
    # and in the returned report, so the two cannot drift apart.
    thresholds = {"accuracy": 0.90, "precision_ai": 0.85,
                  "recall_ai": 0.80, "f1_ai": 0.83, "auroc": 0.92}
    display = {"accuracy": "Accuracy", "precision_ai": "Precision",
               "recall_ai": "Recall", "f1_ai": "F1", "auroc": "AUROC"}
    alarms = [
        f"{display[key]} {metrics[key]:.3f} below threshold {limit:.2f}"
        for key, limit in thresholds.items()
        if metrics[key] < limit
    ]

    logger.info(f"Evaluation on {len(y_test)} held-out samples:")
    for key, value in metrics.items():
        logger.info(f" {display[key]}: {value:.4f}")
    for alarm in alarms:
        logger.warning(f"ALARM: {alarm}")
    if not alarms:
        logger.info("All metrics above alarm thresholds.")

    return {
        "n_test_samples": len(y_test),
        "threshold": threshold,
        **{key: round(value, 4) for key, value in metrics.items()},
        "confusion_matrix": cm,
        "classification_report": classification_report(
            y_test, y_pred, target_names=["real", "ai"], output_dict=True),
        "alarms": alarms,
        "targets": {"accuracy": 0.95, "precision_ai": 0.90,
                    "recall_ai": 0.88, "f1_ai": 0.89, "auroc": 0.97},
        "thresholds": thresholds,
    }
def hyperparameter_search(features_path: Path, n_iter: int = 20) -> dict:
    """Run a randomized hyperparameter search for an XGBoost classifier.

    Candidates are scored by ROC AUC under 5-fold stratified cross-validation
    with fixed seeds (42) for the model, the folds and the sampler.

    Args:
        features_path: CSV file with an integer ``label`` column. Every other
            column except ``path`` is parsed as a float feature.
        n_iter: Number of parameter settings to sample.

    Returns:
        A dict with ``best_params``, ``best_cv_auc`` (rounded to 4 decimals)
        and ``n_iterations``.

    Raises:
        ValueError: If the file contains no data rows or no feature columns.
    """
    with open(features_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # Take feature columns from the header rather than the first row.
        feature_names = [k for k in (reader.fieldnames or [])
                         if k not in ("label", "path")]
        rows, labels = [], []
        for row in reader:
            labels.append(int(row["label"]))
            rows.append([float(row[k]) for k in feature_names])

    # Fail fast with a clear message instead of an opaque error from the
    # cross-validation machinery further down.
    if not rows:
        raise ValueError(f"No samples found in {features_path}; cannot run hyperparameter search.")
    if not feature_names:
        raise ValueError(f"No feature columns found in {features_path}.")

    # Heavy dependencies are imported only after the input has been validated.
    import xgboost as xgb
    from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold

    X = np.array(rows)
    y = np.array(labels)

    param_grid = {
        "learning_rate": [0.01, 0.05, 0.10, 0.15, 0.20],
        "max_depth": [3, 4, 5, 6],
        "n_estimators": [100, 200, 300, 400],
        "subsample": [0.6, 0.7, 0.8, 0.9],
        "colsample_bytree": [0.6, 0.7, 0.8, 0.9],
        "min_child_weight": [1, 3, 5],
        "gamma": [0, 0.1, 0.2, 0.5],
    }
    model = xgb.XGBClassifier(eval_metric="logloss", random_state=42)
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    logger.info(f"Running RandomizedSearchCV with {n_iter} iterations")
    search = RandomizedSearchCV(model, param_grid, n_iter=n_iter,
                                cv=cv, scoring="roc_auc",
                                random_state=42, n_jobs=-1, verbose=1)
    search.fit(X, y)

    logger.info(f"Best params: {search.best_params_}")
    logger.info(f"Best CV AUC: {search.best_score_:.4f}")
    # float(...) yields a plain Python float for the JSON report.
    return {"best_params": search.best_params_,
            "best_cv_auc": round(float(search.best_score_), 4),
            "n_iterations": n_iter}
def main():
    """Command-line entry point: run the diagnostics and save a JSON report.

    Flags: ``--threshold`` (decision threshold passed to the evaluation,
    default 0.5), ``--hparam-search`` (also run the randomized search) and
    ``--n-iter`` (number of search iterations, default 20).

    Manifest checks run only if the manifest file exists; evaluation and the
    optional search run only if the features file exists. Whatever was
    collected is written to RESULTS_OUT.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--threshold", type=float, default=0.5)
    cli.add_argument("--hparam-search", action="store_true")
    cli.add_argument("--n-iter", type=int, default=20)
    opts = cli.parse_args()

    report = {}

    # Manifest-based diagnostics.
    if not MANIFEST.exists():
        logger.warning("manifest.csv not found")
    else:
        logger.info("=== CLASS BALANCE CHECK ===")
        report["class_balance"] = check_class_balance(MANIFEST)
        logger.info("=== DATA LEAKAGE CHECK ===")
        report["leakage_check"] = check_data_leakage(MANIFEST)

    # Feature-based diagnostics.
    if not FEATURES.exists():
        logger.warning("features.csv not found — run scripts/extract_features.py first")
    else:
        logger.info("=== MODEL EVALUATION ===")
        report["evaluation"] = evaluate_xgboost(FEATURES, threshold=opts.threshold)
        if opts.hparam_search:
            logger.info("=== HYPERPARAMETER SEARCH ===")
            report["hparam_search"] = hyperparameter_search(FEATURES, n_iter=opts.n_iter)

    # The report is written even when some (or all) sections were skipped.
    RESULTS_OUT.parent.mkdir(parents=True, exist_ok=True)
    with open(RESULTS_OUT, "w") as out:
        # default=str stringifies any value json cannot encode natively.
        json.dump(report, out, indent=2, default=str)
    logger.info(f"Saved to {RESULTS_OUT}")


if __name__ == "__main__":
    main()