#!/usr/bin/env python3
"""Phase C3/C4 — Image detection eval harness.

Reads MANIFEST.csv, runs classify_image on each fixture present on disk,
and prints overall confusion-matrix metrics (precision/recall/F1), a
per-family accuracy breakdown, and per-component signal means so the
ensemble weights can be tuned.

Usage (from backend/):
    .venv/Scripts/python.exe scripts/run_image_eval.py
    .venv/Scripts/python.exe scripts/run_image_eval.py --manifest tests/eval/MANIFEST.csv
    .venv/Scripts/python.exe scripts/run_image_eval.py --threshold 0.5 --verbose

The script does NOT download images. Populate tests/eval/images/ with the
fixtures listed in MANIFEST.csv before running.

Exit code:
    0 — all per-family accuracies ≥ 70 % and overall accuracy ≥ 75 %
    1 — accuracy thresholds not met (use for CI gating after C4 calibration)
"""
from __future__ import annotations

import argparse
import csv
import sys
from pathlib import Path

# Add backend/ to path so imports resolve when run from the project root.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

MANIFEST_DEFAULT = Path(__file__).resolve().parent.parent / "tests" / "eval" / "MANIFEST.csv"
IMAGES_DEFAULT = Path(__file__).resolve().parent.parent / "tests" / "eval" / "images"
FAMILIES = ["camera-real", "face-swap", "gan-portrait", "diffusion-portrait", "diffusion-noface"]
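# The manifest columns below are inferred from how _load_manifest reads the
# CSV; the example row is illustrative only (the real fixture names live in
# tests/eval/MANIFEST.csv):
#
#   path,label,family,source,notes
#   images/sdxl_portrait_001.png,fake,diffusion-portrait,sdxl,studio-lit headshot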
def _load_manifest(manifest_path: Path, images_root: Path) -> list[dict]:
    rows = []
    with open(manifest_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            # Only the filename component of the manifest path is used, so
            # every fixture is expected directly under images_root.
            img_path = images_root / Path(row["path"]).name
            rows.append({
                "path": img_path,
                "label": row["label"],
                "family": row["family"],
                "source": row["source"],
                "notes": row.get("notes", ""),
            })
    return rows
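# _safe_f1 computes F1 directly as 2*TP / (2*TP + FP + FN), which is
# algebraically the harmonic mean of precision and recall; the denominator
# guard handles the degenerate case with no positives at all.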
def _safe_f1(tp: int, fp: int, fn: int) -> float:
    denom = 2 * tp + fp + fn
    return (2 * tp / denom) if denom > 0 else 0.0
def _run_eval(args) -> int:
    manifest_path = Path(args.manifest)
    images_root = Path(args.images_root)
    threshold = float(args.threshold)

    if not manifest_path.exists():
        print(f"[ERROR] Manifest not found: {manifest_path}")
        return 1

    rows = _load_manifest(manifest_path, images_root)
    present = [r for r in rows if r["path"].exists()]
    missing = [r for r in rows if not r["path"].exists()]

    if not present:
        print(f"[WARN] No eval images found in {images_root}. Populate the directory first.")
        print("  Expected paths from MANIFEST.csv:")
        for r in rows[:5]:
            print(f"    {r['path']}")
        return 0

    print(f"\nEval set: {len(present)} images found / {len(rows)} in manifest "
          f"({len(missing)} missing — skipped)")
    if missing and args.verbose:
        for r in missing:
            print(f"  [MISSING] {r['path'].name}")
    # Load models (lazy — only loads what's needed)
    print("\nLoading models...")
    from services.image_service import classify_image
    from services.exif_service import extract_exif
    from utils.scoring import compute_authenticity_score, get_verdict_label
    from PIL import Image
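    # The classifier result is consumed through the attributes used below:
    # clf.confidence (treated here as the fake probability), clf.label,
    # clf.ensemble_method, clf.gating_applied, and clf.evidence_fusion with a
    # "components" dict of per-signal probabilities. These semantics are
    # assumed from this harness's usage, not re-verified against image_service.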
    results = []
    for row in present:
        try:
            pil = Image.open(row["path"]).convert("RGB")
            raw = row["path"].read_bytes()
            exif_summary = None
            try:
                exif_summary = extract_exif(pil, raw)
            except Exception:
                # EXIF extraction is best-effort; classification proceeds without it.
                pass
            clf = classify_image(pil, exif=exif_summary)
            score = compute_authenticity_score(clf.confidence, clf.label)
            # Decision rule: flag as fake when the ensemble fake probability
            # reaches the threshold, then compare against the manifest label.
            predicted_fake = clf.confidence >= threshold
            actual_fake = row["label"] == "fake"
            correct = predicted_fake == actual_fake
            results.append({
                **row,
                "fake_prob": clf.confidence,
                "score": score,
                "method": clf.ensemble_method or "",
                "predicted_fake": predicted_fake,
                "actual_fake": actual_fake,
                "correct": correct,
                "gating": clf.gating_applied or "",
                "components": (clf.evidence_fusion or {}).get("components", {}),
            })
            if args.verbose:
                mark = "✓" if correct else "✗"
                print(f"  [{mark}] {row['path'].name:<35} "
                      f"label={row['label']:<4} "
                      f"prob={clf.confidence:.3f} score={score:3d} "
                      f"family={row['family']}")
        except Exception as e:
            print(f"  [ERR] {row['path'].name}: {e}")
    if not results:
        print("[WARN] No images could be scored.")
        return 0

    print("\n" + "=" * 65)
    print("OVERALL RESULTS")
    print("=" * 65)
    total = len(results)
    correct = sum(1 for r in results if r["correct"])
    overall_acc = correct / total * 100
    # Confusion counts with "fake" as the positive class.
    tp = sum(1 for r in results if r["predicted_fake"] and r["actual_fake"])
    fp = sum(1 for r in results if r["predicted_fake"] and not r["actual_fake"])
    fn = sum(1 for r in results if not r["predicted_fake"] and r["actual_fake"])
    tn = sum(1 for r in results if not r["predicted_fake"] and not r["actual_fake"])
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = _safe_f1(tp, fp, fn)
    fpr = fp / (fp + tn) * 100 if (fp + tn) > 0 else 0.0
    print(f"  Accuracy : {overall_acc:.1f}% ({correct}/{total})")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall   : {recall:.3f}")
    print(f"  F1       : {f1:.3f}")
    print(f"  FPR (real→fake): {fpr:.1f}%")
| print("\n" + "-" * 65) | |
| print("PER-FAMILY RESULTS") | |
| print("-" * 65) | |
| family_pass = True | |
| for family in FAMILIES: | |
| family_rows = [r for r in results if r["family"] == family] | |
| if not family_rows: | |
| continue | |
| f_correct = sum(1 for r in family_rows if r["correct"]) | |
| f_acc = f_correct / len(family_rows) * 100 | |
| avg_prob = sum(r["fake_prob"] for r in family_rows) / len(family_rows) | |
| status = "PASS" if f_acc >= 70 else "FAIL" | |
| if f_acc < 70: | |
| family_pass = False | |
| print(f" {family:<22} acc={f_acc:5.1f}% avg_fake_prob={avg_prob:.3f} " | |
| f"n={len(family_rows):3d} [{status}]") | |
| print("\n" + "-" * 65) | |
| print("COMPONENT SIGNAL BREAKDOWN (mean fake_prob per signal per family)") | |
| print("-" * 65) | |
| signal_keys = ["face_stack", "general", "forensics", "exif"] | |
| header = f" {'family':<22}" + "".join(f" {k:<12}" for k in signal_keys) | |
| print(header) | |
| for family in FAMILIES: | |
| family_rows = [r for r in results if r["family"] == family] | |
| if not family_rows: | |
| continue | |
| row_str = f" {family:<22}" | |
| for key in signal_keys: | |
| vals = [r["components"].get(key) for r in family_rows if key in r["components"]] | |
| mean = sum(vals) / len(vals) if vals else None | |
| row_str += f" {mean:.3f} " if mean is not None else f" {'n/a':<12}" | |
| print(row_str) | |
| print("\n" + "-" * 65) | |
| print("GATING EVENTS") | |
| print("-" * 65) | |
| gated = [r for r in results if r["gating"]] | |
| print(f" Total gated: {len(gated)}") | |
| for r in gated: | |
| print(f" {r['path'].name:<35} label={r['label']} {r['gating']}") | |
| all_pass = family_pass and overall_acc >= 75.0 | |
| print("\n" + "=" * 65) | |
| if all_pass: | |
| print("RESULT: PASS — ready for production") | |
| else: | |
| print("RESULT: FAIL — review per-family accuracy and tune weights/thresholds") | |
| print(" Adjust GENERAL_AI_WEIGHT, DIFFUSION_AI_WEIGHT, FACE_STACK_WEIGHT_FACE,") | |
| print(" GENERAL_WEIGHT_FACE in .env or config.py, then re-run.") | |
| print("=" * 65 + "\n") | |
| return 0 if all_pass else 1 | |
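# Illustrative .env override for a re-tuning pass. The variable names come
# from the FAIL message above, but these particular values are hypothetical;
# the authoritative defaults live in config.py:
#
#   GENERAL_AI_WEIGHT=0.35
#   DIFFUSION_AI_WEIGHT=0.25
#   FACE_STACK_WEIGHT_FACE=0.55
#   GENERAL_WEIGHT_FACE=0.45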
def main() -> None:
    parser = argparse.ArgumentParser(description="DeepShield image detection eval harness")
    parser.add_argument("--manifest", default=str(MANIFEST_DEFAULT),
                        help="Path to MANIFEST.csv")
    parser.add_argument("--images-root", default=str(IMAGES_DEFAULT),
                        help="Directory containing eval images")
    parser.add_argument("--threshold", default=0.5, type=float,
                        help="Fake probability threshold (default: 0.5)")
    parser.add_argument("--verbose", action="store_true",
                        help="Print per-image results")
    args = parser.parse_args()
    sys.exit(_run_eval(args))


if __name__ == "__main__":
    main()