"""
Evaluate the current predictor on a separate local field-test dataset.

Expected folder layout:
    data/local_test/
        plastic/
        paper/
        organic/
        metal/
        glass/
        unknown/    optional
        residu/     optional alias for unknown

Usage:
    python scripts/evaluate_local.py --data_dir data/local_test --output_dir reports/local_eval
"""

import argparse
import csv
import json
import os
import sys
from pathlib import Path

# The statements below must run BEFORE the matplotlib / app imports:
#   * MPLCONFIGDIR is pointed at a project-local cache directory (only when
#     the variable is not already set in the environment).
#   * The project root is put on sys.path so ``app`` is importable when this
#     file is run as ``python scripts/evaluate_local.py``.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
MPL_CONFIG_DIR = os.path.join(PROJECT_ROOT, ".cache", "matplotlib")
os.makedirs(MPL_CONFIG_DIR, exist_ok=True)
os.environ.setdefault("MPLCONFIGDIR", MPL_CONFIG_DIR)
sys.path.insert(0, PROJECT_ROOT)

import matplotlib.pyplot as plt  # noqa: E402
import numpy as np  # noqa: E402
from sklearn.metrics import classification_report, confusion_matrix  # noqa: E402

from app.predictor import CLASS_NAMES, WastePredictor  # noqa: E402

KNOWN_CLASSES = CLASS_NAMES
# Folder names (after lower-casing) that are all folded into the "unknown" label.
UNKNOWN_ALIASES = {"unknown", "residu", "residue", "other", "lainnya"}
# Unpacking (rather than ``+``) works whether CLASS_NAMES is a list or a tuple.
EVAL_LABELS = [*KNOWN_CLASSES, "unknown"]
INPUT_EXTS = {".jpg", ".jpeg", ".png", ".webp"}

# Decision thresholds, overridable through environment variables.
AUTO_APPROVE_THRESHOLD = float(os.getenv("AUTO_APPROVE_THRESHOLD", "0.85"))
AMBIGUITY_GAP_THRESHOLD = float(os.getenv("AMBIGUITY_GAP_THRESHOLD", "0.15"))
UNKNOWN_ON_AMBIGUOUS = os.getenv("UNKNOWN_ON_AMBIGUOUS", "true").lower() == "true"


def normalize_label(label: str) -> str:
    """Return *label* stripped and lower-cased, mapping unknown aliases to "unknown"."""
    normalized = label.strip().lower()
    if normalized in UNKNOWN_ALIASES:
        return "unknown"
    return normalized


def collect_images(data_dir: Path) -> list[tuple[Path, str]]:
    """Collect ``(image_path, label)`` pairs from the class sub-folders of *data_dir*.

    Each immediate sub-directory is treated as one class; its name is passed
    through ``normalize_label``. Sub-directories whose label is not in
    ``EVAL_LABELS`` are reported on stdout and skipped. Images are searched
    recursively and filtered by ``INPUT_EXTS`` (case-insensitive suffix).

    Raises:
        FileNotFoundError: *data_dir* does not exist.
        NotADirectoryError: *data_dir* exists but is not a directory.
        ValueError: no evaluation image was found.
    """
    # Same exception classes ``iterdir()`` would raise, but with a message
    # that names the problem instead of a bare OS error.
    if not data_dir.exists():
        raise FileNotFoundError(f"Evaluation data directory not found: {data_dir}")
    if not data_dir.is_dir():
        raise NotADirectoryError(f"Evaluation data path is not a directory: {data_dir}")

    rows: list[tuple[Path, str]] = []
    for class_dir in sorted(data_dir.iterdir()):
        if not class_dir.is_dir():
            continue

        label = normalize_label(class_dir.name)
        if label not in EVAL_LABELS:
            print(f"Skipping unsupported folder: {class_dir}")
            continue

        rows.extend(
            (image_path, label)
            for image_path in sorted(class_dir.rglob("*"))
            if image_path.is_file() and image_path.suffix.lower() in INPUT_EXTS
        )

    if not rows:
        raise ValueError(f"No evaluation images found under {data_dir}")
    return rows


def choose_decision_class(result: dict) -> str:
    """Turn a raw prediction *result* into the label used for evaluation.

    Reads ``result["all_scores"]`` (a mapping of class name to score) and
    ``result["confidence"]``. Returns "unknown" when the confidence is below
    ``AUTO_APPROVE_THRESHOLD``, or when ``UNKNOWN_ON_AMBIGUOUS`` is set and
    the gap between the two best scores is below ``AMBIGUITY_GAP_THRESHOLD``;
    otherwise returns the best-scoring class.
    """
    ranked_scores = sorted(
        result["all_scores"].items(),
        key=lambda item: item[1],
        reverse=True,
    )
    top_class, top_score = ranked_scores[0]
    # With a single score there is no runner-up; the gap is then 0.
    second_score = ranked_scores[1][1] if len(ranked_scores) > 1 else top_score
    confidence_gap = float(top_score) - float(second_score)

    if float(result["confidence"]) < AUTO_APPROVE_THRESHOLD:
        return "unknown"
    if UNKNOWN_ON_AMBIGUOUS and confidence_gap < AMBIGUITY_GAP_THRESHOLD:
        return "unknown"
    return top_class


def save_confusion_matrix(y_true: list[str], y_pred: list[str], output_path: Path) -> None:
    """Render a row-normalized confusion matrix over ``EVAL_LABELS`` to *output_path*."""
    cm = confusion_matrix(y_true, y_pred, labels=EVAL_LABELS, normalize="true")
    ticks = range(len(EVAL_LABELS))

    fig, ax = plt.subplots(figsize=(8, 7))
    image = ax.imshow(cm, cmap="Blues", vmin=0, vmax=1)
    ax.set_xticks(ticks)
    ax.set_yticks(ticks)
    ax.set_xticklabels(EVAL_LABELS, rotation=45, ha="right")
    ax.set_yticklabels(EVAL_LABELS)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")
    ax.set_title("Local Field-Test Confusion Matrix")
    fig.colorbar(image, ax=ax)

    # Annotate every cell; switch to white text on dark (high-value) cells.
    for (row, col), value in np.ndenumerate(cm):
        ax.text(
            col,
            row,
            f"{value:.2f}",
            ha="center",
            va="center",
            fontsize=8,
            color="white" if value > 0.5 else "black",
        )

    fig.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)


def main() -> None:
    """Run the evaluation and write the report files into the output directory.

    Outputs: ``metrics.json``, ``classification_report.txt``,
    ``confusion_matrix.png``, ``predictions.csv`` and ``mistakes.json``.
    """
    parser = argparse.ArgumentParser(description="Evaluate model on a local field-test dataset.")
    parser.add_argument("--data_dir", default="data/local_test")
    parser.add_argument("--output_dir", default="reports/local_eval")
    args = parser.parse_args()

    data_dir = Path(args.data_dir)
    output_dir = Path(args.output_dir)

    # Validate the dataset first so a missing/empty folder fails before the
    # predictor is loaded and before an empty report directory is created.
    samples = collect_images(data_dir)

    output_dir.mkdir(parents=True, exist_ok=True)

    predictor = WastePredictor()
    predictor.load()

    predictions = []
    y_true = []
    y_pred = []

    for image_path, actual_label in samples:
        result = predictor.predict(image_path.read_bytes())
        predicted_label = normalize_label(choose_decision_class(result))

        y_true.append(actual_label)
        y_pred.append(predicted_label)
        predictions.append(
            {
                "image_path": str(image_path),
                "actual": actual_label,
                "predicted": predicted_label,
                "model_class": result["class"],
                "confidence": result["confidence"],
                "raw_class": result.get("raw_class"),
                "raw_confidence": result.get("raw_confidence"),
                "all_scores": result["all_scores"],
                "correct": actual_label == predicted_label,
            }
        )

    report = classification_report(
        y_true,
        y_pred,
        labels=EVAL_LABELS,
        output_dict=True,
        zero_division=0,
    )
    readable_report = classification_report(
        y_true,
        y_pred,
        labels=EVAL_LABELS,
        zero_division=0,
    )

    (output_dir / "metrics.json").write_text(json.dumps(report, indent=2), encoding="utf-8")
    (output_dir / "classification_report.txt").write_text(readable_report, encoding="utf-8")
    save_confusion_matrix(y_true, y_pred, output_dir / "confusion_matrix.png")

    csv_fields = [
        "image_path",
        "actual",
        "predicted",
        "model_class",
        "confidence",
        "raw_class",
        "raw_confidence",
        "correct",
    ]
    with (output_dir / "predictions.csv").open("w", newline="", encoding="utf-8") as file:
        # "all_scores" is kept out of the CSV; extrasaction="ignore" drops it.
        writer = csv.DictWriter(file, fieldnames=csv_fields, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(predictions)

    mistakes = [row for row in predictions if not row["correct"]]
    (output_dir / "mistakes.json").write_text(json.dumps(mistakes, indent=2), encoding="utf-8")

    print(readable_report)
    print(f"\nEvaluated images : {len(samples)}")
    print(f"Mistakes : {len(mistakes)}")
    print(f"Output dir : {output_dir.resolve()}")


if __name__ == "__main__":
    main()