deepshield / scripts /fit_calibrator.py
ar07xd's picture
Sync from GitHub via hub-sync
fba30db verified
"""Isotonic calibration for EfficientNetAutoAttB4 — §7.6 of MERGE_PLAN.
Fits sklearn.isotonic.IsotonicRegression on EfficientNetAutoAttB4's raw sigmoid
outputs and persists the result to backend/models/efficientnet_calibrator.pkl.
Usage:
.venv/Scripts/python.exe scripts/fit_calibrator.py --real PATH --fake PATH [--val-split 0.2]
Directory layout expected:
--real path/to/real/faces/ (JPEG/PNG face images, labelled 0)
--fake path/to/fake/faces/ (JPEG/PNG deepfake images, labelled 1)
FFPP c40 example (from Phase 11.1 Colab download):
--real training/datasets/ffpp/c40/real/
--fake training/datasets/ffpp/c40/fake/
The script:
1. Runs EfficientNet inference on all images (face detection → sigmoid score).
2. Splits into train/val (stratified, default 80/20).
3. Fits IsotonicRegression(out_of_bounds='clip') on training split.
4. Evaluates on val split: accuracy, real→fake FPR, fake→real FNR.
5. Saves calibrator to backend/models/efficientnet_calibrator.pkl.
Run time: ~5 min on a 50-200 image set on CPU.
"""
from __future__ import annotations
import argparse
import pickle
import sys
from pathlib import Path
import numpy as np
from loguru import logger
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".bmp"}
CALIBRATOR_OUT = Path(__file__).resolve().parent.parent / "models" / "efficientnet_calibrator.pkl"
def collect_images(directory: Path) -> list[Path]:
return sorted(p for p in directory.rglob("*") if p.suffix.lower() in IMAGE_EXTS)
def score_images(det, paths: list[Path]) -> list[float]:
"""Run EfficientNet on each image; return raw sigmoid scores (-1 sentinel for no-face)."""
from PIL import Image
scores = []
for i, p in enumerate(paths):
try:
pil = Image.open(p).convert("RGB")
except Exception as e:
logger.warning(f"Cannot open {p}: {e}")
scores.append(-1.0)
continue
import numpy as np_inner
img_np = np_inner.array(pil)
frame_data = det.face_extractor.process_image(img=img_np)
faces = frame_data.get("faces", [])
if not faces:
scores.append(-1.0)
else:
face_t = det._face_tensor(faces[0])
import torch
logit = det.raw_logit(face_t)
from scipy.special import expit
scores.append(float(expit(logit)))
if (i + 1) % 10 == 0:
print(f" scored {i + 1}/{len(paths)}", end="\r")
print()
return scores
def main() -> int:
parser = argparse.ArgumentParser(description="Fit isotonic calibrator for EfficientNetAutoAttB4")
parser.add_argument("--real", required=True, type=Path, help="Directory of real face images (label=0)")
parser.add_argument("--fake", required=True, type=Path, help="Directory of deepfake images (label=1)")
parser.add_argument("--val-split", type=float, default=0.2, help="Fraction held out for validation (default 0.2)")
parser.add_argument("--out", type=Path, default=CALIBRATOR_OUT, help="Output pkl path")
args = parser.parse_args()
if not args.real.is_dir():
print(f"ERROR: --real must be a directory: {args.real}")
return 1
if not args.fake.is_dir():
print(f"ERROR: --fake must be a directory: {args.fake}")
return 1
real_paths = collect_images(args.real)
fake_paths = collect_images(args.fake)
if not real_paths:
print(f"ERROR: No images found in {args.real}")
return 1
if not fake_paths:
print(f"ERROR: No images found in {args.fake}")
return 1
print(f"Found {len(real_paths)} real | {len(fake_paths)} fake images")
print("Loading EfficientNetDetector (weights cached after first run)…")
from services.efficientnet_service import EfficientNetDetector
# Load without applying existing calibrator — we are building a new one.
det = EfficientNetDetector(calibrator_path=Path("/dev/null"))
print("Scoring real images…")
real_scores = score_images(det, real_paths)
print("Scoring fake images…")
fake_scores = score_images(det, fake_paths)
# Build arrays, drop no-face sentinels.
r_scores = np.array([s for s in real_scores if s >= 0])
f_scores = np.array([s for s in fake_scores if s >= 0])
r_labels = np.zeros(len(r_scores))
f_labels = np.ones(len(f_scores))
X = np.concatenate([r_scores, f_scores])
y = np.concatenate([r_labels, f_labels])
print(f"\nUsable samples: {len(r_scores)} real | {len(f_scores)} fake")
print(f"No-face dropped: {sum(s < 0 for s in real_scores)} real | {sum(s < 0 for s in fake_scores)} fake")
if len(X) < 10:
print("ERROR: Too few usable samples (<10) to fit a calibrator.")
return 1
# Stratified train/val split.
from sklearn.model_selection import train_test_split
X_tr, X_val, y_tr, y_val = train_test_split(
X, y, test_size=args.val_split, stratify=y, random_state=42
)
print(f"Split: {len(X_tr)} train | {len(X_val)} val")
print("Fitting IsotonicRegression…")
from sklearn.isotonic import IsotonicRegression
cal = IsotonicRegression(out_of_bounds="clip")
cal.fit(X_tr.reshape(-1, 1), y_tr)
# Evaluate on val set.
y_pred_raw = (X_val >= 0.5).astype(int)
y_pred_cal = (cal.predict(X_val.reshape(-1, 1)) >= 0.5).astype(int)
def metrics(y_true, y_pred, tag):
acc = (y_true == y_pred).mean() * 100
real_mask = y_true == 0
fpr = (y_pred[real_mask] == 1).mean() * 100 if real_mask.sum() > 0 else 0.0
fake_mask = y_true == 1
fnr = (y_pred[fake_mask] == 0).mean() * 100 if fake_mask.sum() > 0 else 0.0
print(f" [{tag}] acc={acc:.1f}% real→fake FPR={fpr:.1f}% fake→real FNR={fnr:.1f}%")
return acc, fpr
print("\nValidation metrics:")
acc_raw, fpr_raw = metrics(y_val, y_pred_raw, "raw ")
acc_cal, fpr_cal = metrics(y_val, y_pred_cal, "calibrated")
# Gate G3: ≥88% accuracy, ≤8% FPR.
g3_pass = acc_cal >= 88.0 and fpr_cal <= 8.0
print(f"\n Gate G3: {'PASS ✓' if g3_pass else 'FAIL ✗'} (need acc≥88%, FPR≤8%)")
args.out.parent.mkdir(parents=True, exist_ok=True)
with args.out.open("wb") as f:
pickle.dump(cal, f, protocol=pickle.HIGHEST_PROTOCOL)
print(f"\nCalibrator saved → {args.out}")
print("Restart the backend server for the calibrator to take effect.")
return 0
if __name__ == "__main__":
raise SystemExit(main())