from __future__ import annotations import argparse import json from pathlib import Path from typing import Any import joblib import numpy as np import pandas as pd from sklearn.metrics import ( accuracy_score, balanced_accuracy_score, classification_report, confusion_matrix, precision_score, recall_score, roc_auc_score, f1_score, ) from .compare_models import rank_models from .config import load_config from .data_discovery import CANONICAL_LABELS, ID_TO_LABEL, prepare_data from .paths import ensure_dir from .reporting import ( plot_calibration, plot_combined_roc, plot_confusion_matrix, plot_metric_bars, plot_precision_recall_curve_single, plot_roc_curve_single, plot_sample_grid, write_markdown_report, ) from .utils import elapsed_ms, get_logger, model_file_size_mb, save_json, timer LOGGER = get_logger(__name__) def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray, y_prob: np.ndarray) -> dict[str, float | None]: cm = confusion_matrix(y_true, y_pred, labels=[0, 1]) tn, fp, fn, tp = cm.ravel() specificity = tn / (tn + fp) if (tn + fp) else 0.0 sensitivity = tp / (tp + fn) if (tp + fn) else 0.0 try: roc_auc = roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) == 2 else None except ValueError: roc_auc = None return { "accuracy": float(accuracy_score(y_true, y_pred)), "precision": float(precision_score(y_true, y_pred, zero_division=0)), "recall": float(recall_score(y_true, y_pred, zero_division=0)), "f1": float(f1_score(y_true, y_pred, zero_division=0)), "balanced_accuracy": float(balanced_accuracy_score(y_true, y_pred)), "roc_auc": None if roc_auc is None else float(roc_auc), "specificity": float(specificity), "sensitivity": float(sensitivity), } def prediction_frame( split_df: pd.DataFrame, y_pred: np.ndarray, y_prob: np.ndarray, model_name: str, split: str, ) -> pd.DataFrame: out = split_df[["filepath", "label", "label_id", "split"]].copy().reset_index(drop=True) out["model_name"] = model_name out["eval_split"] = split out["y_true"] = out["label_id"].astype(int) out["y_pred"] = y_pred.astype(int) out["prob_damaged"] = y_prob.astype(float) out["pred_label"] = out["y_pred"].map(ID_TO_LABEL) out["confidence"] = np.where(out["y_pred"] == 1, out["prob_damaged"], 1.0 - out["prob_damaged"]) out["is_correct"] = out["y_true"] == out["y_pred"] return out def save_prediction_outputs( pred_df: pd.DataFrame, metrics: dict[str, Any], config: dict[str, Any], model_name: str, split: str, ) -> None: output_dir = Path(config["paths"]["output_dir"]) pred_dir = ensure_dir(output_dir / "predictions") plots_dir = ensure_dir(output_dir / "plots") reports_dir = ensure_dir(output_dir / "reports") safe = model_name.replace("/", "_") pred_df.to_csv(pred_dir / f"{safe}_{split}_predictions.csv", index=False) y_true = pred_df["y_true"].to_numpy() y_pred = pred_df["y_pred"].to_numpy() y_prob = pred_df["prob_damaged"].to_numpy() cm = confusion_matrix(y_true, y_pred, labels=[0, 1]) plot_confusion_matrix( cm, plots_dir / f"confusion_matrix_{safe}_{split}.png", f"{model_name} {split} Confusion Matrix", CANONICAL_LABELS, ) plot_roc_curve_single(y_true, y_prob, plots_dir / f"roc_{safe}_{split}.png", f"{model_name} {split} ROC") if config["evaluation"].get("save_precision_recall_curve", True): plot_precision_recall_curve_single( y_true, y_prob, plots_dir / f"precision_recall_{safe}_{split}.png", f"{model_name} {split} PR" ) if config["evaluation"].get("save_calibration_plot", False): plot_calibration(y_true, y_prob, plots_dir / f"calibration_{safe}_{split}.png", f"{model_name} {split}") report = classification_report( y_true, y_pred, labels=[0, 1], target_names=list(CANONICAL_LABELS), zero_division=0, output_dict=True, ) with (reports_dir / f"classification_report_{safe}_{split}.json").open("w", encoding="utf-8") as f: json.dump(report, f, indent=2) with (reports_dir / f"classification_report_{safe}_{split}.txt").open("w", encoding="utf-8") as f: f.write( classification_report( y_true, y_pred, labels=[0, 1], target_names=list(CANONICAL_LABELS), zero_division=0, ) ) save_json(metrics, reports_dir / f"metrics_{safe}_{split}.json") def evaluate_classical( model_path: Path, splits_df: pd.DataFrame, config: dict[str, Any], ) -> tuple[list[dict[str, Any]], list[pd.DataFrame]]: from .classical_features import extract_feature_matrix bundle = joblib.load(model_path) pipeline = bundle["pipeline"] metadata = bundle["metadata"] model_name = metadata["model_name"] feature_type = metadata["feature_type"] rows: list[dict[str, Any]] = [] pred_frames: list[pd.DataFrame] = [] for split in ["val", "test"]: split_df = splits_df[splits_df["split"] == split].reset_index(drop=True) if split_df.empty: continue start = timer() x, y_true, _ = extract_feature_matrix(split_df, feature_type, config, balance_train=False) y_prob = pipeline.predict_proba(x)[:, 1] y_pred = (y_prob >= float(config["evaluation"].get("threshold", 0.5))).astype(int) avg_ms = elapsed_ms(start, len(split_df)) pred_df = prediction_frame(split_df, y_pred, y_prob, model_name, split) metrics = compute_metrics(y_true, y_pred, y_prob) row = { "model_name": model_name, "model_type": "classical", "feature_type": feature_type, "split": split, "training_curves": "N/A", "model_path": str(model_path), "model_size_mb": model_file_size_mb(model_path), "avg_inference_ms": avg_ms, **metrics, } save_prediction_outputs(pred_df, row, config, model_name, split) rows.append(row) pred_frames.append(pred_df) return rows, pred_frames def evaluate_deep_learning( model_path: Path, splits_df: pd.DataFrame, config: dict[str, Any], ) -> tuple[list[dict[str, Any]], list[pd.DataFrame]]: import torch from torch.utils.data import DataLoader from .augmentations import build_eval_transform from .dataset import EggImageDataset from .dl_models import create_model, load_torch_checkpoint checkpoint = load_torch_checkpoint(model_path, map_location="cpu") model_key = checkpoint["model_key"] model_name = checkpoint.get("model_name", model_key) model = create_model(model_key, checkpoint.get("config", config), pretrained=False) model.load_state_dict(checkpoint["state_dict"]) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device) model.eval() rows: list[dict[str, Any]] = [] pred_frames: list[pd.DataFrame] = [] batch_size = int(config["training"].get("batch_size", 16)) num_workers = int(config["training"].get("num_workers", 0)) for split in ["val", "test"]: split_df = splits_df[splits_df["split"] == split].reset_index(drop=True) if split_df.empty: continue loader = DataLoader( EggImageDataset(split_df, transform=build_eval_transform(config)), batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=bool(config["training"].get("pin_memory", True) and device.type == "cuda"), ) all_prob: list[float] = [] all_pred: list[int] = [] all_true: list[int] = [] start = timer() with torch.no_grad(): for images, labels, _ in loader: images = images.to(device, non_blocking=True) logits = model(images) probs = torch.softmax(logits, dim=1)[:, 1].detach().cpu().numpy() pred = (probs >= float(config["evaluation"].get("threshold", 0.5))).astype(int) all_prob.extend(probs.astype(float).tolist()) all_pred.extend(pred.astype(int).tolist()) all_true.extend(labels.numpy().astype(int).tolist()) avg_ms = elapsed_ms(start, len(split_df)) y_true = np.asarray(all_true, dtype=int) y_pred = np.asarray(all_pred, dtype=int) y_prob = np.asarray(all_prob, dtype=float) pred_df = prediction_frame(split_df, y_pred, y_prob, model_name, split) metrics = compute_metrics(y_true, y_pred, y_prob) row = { "model_name": model_name, "model_type": "deep_learning", "model_key": model_key, "split": split, "training_curves": str(Path(config["paths"]["output_dir"]) / "histories" / f"{model_key}_history.csv"), "model_path": str(model_path), "model_size_mb": model_file_size_mb(model_path), "avg_inference_ms": avg_ms, **metrics, } save_prediction_outputs(pred_df, row, config, model_name, split) rows.append(row) pred_frames.append(pred_df) return rows, pred_frames def find_model_files(config: dict[str, Any]) -> list[Path]: model_dir = Path(config["paths"]["model_dir"]) files = sorted(model_dir.glob("*.joblib")) + sorted(model_dir.glob("*.pt")) return [path for path in files if path.is_file()] def evaluate_all(config: dict[str, Any]) -> pd.DataFrame: split_csv = Path(config["paths"]["split_csv"]) splits_df = pd.read_csv(split_csv) if split_csv.exists() else prepare_data(config) output_dir = ensure_dir(config["paths"]["output_dir"]) all_metrics: list[dict[str, Any]] = [] all_predictions: list[pd.DataFrame] = [] for model_path in find_model_files(config): try: LOGGER.info("Evaluating model %s", model_path) if model_path.suffix == ".joblib": rows, preds = evaluate_classical(model_path, splits_df, config) elif model_path.suffix == ".pt": rows, preds = evaluate_deep_learning(model_path, splits_df, config) else: continue all_metrics.extend(rows) all_predictions.extend(preds) except Exception as exc: LOGGER.exception("Skipping %s because evaluation failed: %s", model_path, exc) metrics_df = pd.DataFrame(all_metrics) if metrics_df.empty: raise RuntimeError("No trained models were evaluated. Train models before running evaluation.") metrics_df.to_csv(output_dir / "metrics_summary.csv", index=False) save_json(metrics_df.to_dict(orient="records"), output_dir / "metrics_summary.json") test_predictions = [(df["model_name"].iloc[0], df) for df in all_predictions if df["eval_split"].iloc[0] == "test"] if test_predictions: plot_combined_roc(test_predictions, output_dir / "plots" / "combined_roc_test.png") plot_metric_bars(metrics_df, output_dir / "plots" / "metrics_bar_comparison.png") misclassified = pd.concat( [df[(df["eval_split"] == "test") & (~df["is_correct"])] for df in all_predictions], ignore_index=True, ) if all_predictions else pd.DataFrame() misclassified.to_csv(output_dir / "misclassified_samples.csv", index=False) leaderboard = rank_models(metrics_df, config) if not leaderboard.empty: best_name = leaderboard.iloc[0]["model_name"] best_preds = [df for name, df in test_predictions if name == best_name] if best_preds: best_df = best_preds[0].sort_values("confidence", ascending=False) n = int(config["evaluation"].get("sample_grid_count", 12)) plot_sample_grid( best_df[best_df["is_correct"]], output_dir / "plots" / f"sample_predictions_correct_{best_name}.png", f"{best_name}: Correct Test Predictions", max_images=n, ) plot_sample_grid( best_df[~best_df["is_correct"]].sort_values("confidence", ascending=False), output_dir / "plots" / f"sample_predictions_misclassified_{best_name}.png", f"{best_name}: Misclassified Test Predictions", max_images=n, ) try: if config.get("explainability", {}).get("enabled", True) and not leaderboard.empty: from .explainability import save_gradcam_examples_for_best save_gradcam_examples_for_best(config, splits_df, leaderboard) except Exception as exc: LOGGER.warning("Explainability generation skipped: %s", exc) write_markdown_report( config, splits_df, metrics_df, leaderboard, misclassified, output_dir / "reports" / "model_report.md", ) LOGGER.info("Saved metrics summary and report under %s", output_dir) return metrics_df def main() -> None: parser = argparse.ArgumentParser(description="Evaluate all trained egg damage models.") parser.add_argument("--config", default="configs/default.yaml") args = parser.parse_args() config = load_config(args.config) evaluate_all(config) if __name__ == "__main__": main()