Spaces:
Running
Running
| """Isotonic calibration for EfficientNetAutoAttB4 — §7.6 of MERGE_PLAN. | |
| Fits sklearn.isotonic.IsotonicRegression on EfficientNetAutoAttB4's raw sigmoid | |
| outputs and persists the result to backend/models/efficientnet_calibrator.pkl. | |
| Usage: | |
| .venv/Scripts/python.exe scripts/fit_calibrator.py --real PATH --fake PATH [--val-split 0.2] | |
| Directory layout expected: | |
| --real path/to/real/faces/ (JPEG/PNG face images, labelled 0) | |
| --fake path/to/fake/faces/ (JPEG/PNG deepfake images, labelled 1) | |
| FFPP c40 example (from Phase 11.1 Colab download): | |
| --real training/datasets/ffpp/c40/real/ | |
| --fake training/datasets/ffpp/c40/fake/ | |
| The script: | |
| 1. Runs EfficientNet inference on all images (face detection → sigmoid score). | |
| 2. Splits into train/val (stratified, default 80/20). | |
| 3. Fits IsotonicRegression(out_of_bounds='clip') on training split. | |
| 4. Evaluates on val split: accuracy, real→fake FPR, fake→real FNR. | |
| 5. Saves calibrator to backend/models/efficientnet_calibrator.pkl. | |
| Run time: ~5 min on a 50-200 image set on CPU. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import pickle | |
| import sys | |
| from pathlib import Path | |
| import numpy as np | |
| from loguru import logger | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) | |
| IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".bmp"} | |
| CALIBRATOR_OUT = Path(__file__).resolve().parent.parent / "models" / "efficientnet_calibrator.pkl" | |
| def collect_images(directory: Path) -> list[Path]: | |
| return sorted(p for p in directory.rglob("*") if p.suffix.lower() in IMAGE_EXTS) | |
| def score_images(det, paths: list[Path]) -> list[float]: | |
| """Run EfficientNet on each image; return raw sigmoid scores (-1 sentinel for no-face).""" | |
| from PIL import Image | |
| scores = [] | |
| for i, p in enumerate(paths): | |
| try: | |
| pil = Image.open(p).convert("RGB") | |
| except Exception as e: | |
| logger.warning(f"Cannot open {p}: {e}") | |
| scores.append(-1.0) | |
| continue | |
| import numpy as np_inner | |
| img_np = np_inner.array(pil) | |
| frame_data = det.face_extractor.process_image(img=img_np) | |
| faces = frame_data.get("faces", []) | |
| if not faces: | |
| scores.append(-1.0) | |
| else: | |
| face_t = det._face_tensor(faces[0]) | |
| import torch | |
| logit = det.raw_logit(face_t) | |
| from scipy.special import expit | |
| scores.append(float(expit(logit))) | |
| if (i + 1) % 10 == 0: | |
| print(f" scored {i + 1}/{len(paths)}", end="\r") | |
| print() | |
| return scores | |
| def main() -> int: | |
| parser = argparse.ArgumentParser(description="Fit isotonic calibrator for EfficientNetAutoAttB4") | |
| parser.add_argument("--real", required=True, type=Path, help="Directory of real face images (label=0)") | |
| parser.add_argument("--fake", required=True, type=Path, help="Directory of deepfake images (label=1)") | |
| parser.add_argument("--val-split", type=float, default=0.2, help="Fraction held out for validation (default 0.2)") | |
| parser.add_argument("--out", type=Path, default=CALIBRATOR_OUT, help="Output pkl path") | |
| args = parser.parse_args() | |
| if not args.real.is_dir(): | |
| print(f"ERROR: --real must be a directory: {args.real}") | |
| return 1 | |
| if not args.fake.is_dir(): | |
| print(f"ERROR: --fake must be a directory: {args.fake}") | |
| return 1 | |
| real_paths = collect_images(args.real) | |
| fake_paths = collect_images(args.fake) | |
| if not real_paths: | |
| print(f"ERROR: No images found in {args.real}") | |
| return 1 | |
| if not fake_paths: | |
| print(f"ERROR: No images found in {args.fake}") | |
| return 1 | |
| print(f"Found {len(real_paths)} real | {len(fake_paths)} fake images") | |
| print("Loading EfficientNetDetector (weights cached after first run)…") | |
| from services.efficientnet_service import EfficientNetDetector | |
| # Load without applying existing calibrator — we are building a new one. | |
| det = EfficientNetDetector(calibrator_path=Path("/dev/null")) | |
| print("Scoring real images…") | |
| real_scores = score_images(det, real_paths) | |
| print("Scoring fake images…") | |
| fake_scores = score_images(det, fake_paths) | |
| # Build arrays, drop no-face sentinels. | |
| r_scores = np.array([s for s in real_scores if s >= 0]) | |
| f_scores = np.array([s for s in fake_scores if s >= 0]) | |
| r_labels = np.zeros(len(r_scores)) | |
| f_labels = np.ones(len(f_scores)) | |
| X = np.concatenate([r_scores, f_scores]) | |
| y = np.concatenate([r_labels, f_labels]) | |
| print(f"\nUsable samples: {len(r_scores)} real | {len(f_scores)} fake") | |
| print(f"No-face dropped: {sum(s < 0 for s in real_scores)} real | {sum(s < 0 for s in fake_scores)} fake") | |
| if len(X) < 10: | |
| print("ERROR: Too few usable samples (<10) to fit a calibrator.") | |
| return 1 | |
| # Stratified train/val split. | |
| from sklearn.model_selection import train_test_split | |
| X_tr, X_val, y_tr, y_val = train_test_split( | |
| X, y, test_size=args.val_split, stratify=y, random_state=42 | |
| ) | |
| print(f"Split: {len(X_tr)} train | {len(X_val)} val") | |
| print("Fitting IsotonicRegression…") | |
| from sklearn.isotonic import IsotonicRegression | |
| cal = IsotonicRegression(out_of_bounds="clip") | |
| cal.fit(X_tr.reshape(-1, 1), y_tr) | |
| # Evaluate on val set. | |
| y_pred_raw = (X_val >= 0.5).astype(int) | |
| y_pred_cal = (cal.predict(X_val.reshape(-1, 1)) >= 0.5).astype(int) | |
| def metrics(y_true, y_pred, tag): | |
| acc = (y_true == y_pred).mean() * 100 | |
| real_mask = y_true == 0 | |
| fpr = (y_pred[real_mask] == 1).mean() * 100 if real_mask.sum() > 0 else 0.0 | |
| fake_mask = y_true == 1 | |
| fnr = (y_pred[fake_mask] == 0).mean() * 100 if fake_mask.sum() > 0 else 0.0 | |
| print(f" [{tag}] acc={acc:.1f}% real→fake FPR={fpr:.1f}% fake→real FNR={fnr:.1f}%") | |
| return acc, fpr | |
| print("\nValidation metrics:") | |
| acc_raw, fpr_raw = metrics(y_val, y_pred_raw, "raw ") | |
| acc_cal, fpr_cal = metrics(y_val, y_pred_cal, "calibrated") | |
| # Gate G3: ≥88% accuracy, ≤8% FPR. | |
| g3_pass = acc_cal >= 88.0 and fpr_cal <= 8.0 | |
| print(f"\n Gate G3: {'PASS ✓' if g3_pass else 'FAIL ✗'} (need acc≥88%, FPR≤8%)") | |
| args.out.parent.mkdir(parents=True, exist_ok=True) | |
| with args.out.open("wb") as f: | |
| pickle.dump(cal, f, protocol=pickle.HIGHEST_PROTOCOL) | |
| print(f"\nCalibrator saved → {args.out}") | |
| print("Restart the backend server for the calibrator to take effect.") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |