# File size: 6,572 Bytes (fba30db)
"""Isotonic calibration for EfficientNetAutoAttB4 — §7.6 of MERGE_PLAN.

Fits sklearn.isotonic.IsotonicRegression on EfficientNetAutoAttB4's raw sigmoid
outputs and persists the result to backend/models/efficientnet_calibrator.pkl.

Usage:
    .venv/Scripts/python.exe scripts/fit_calibrator.py --real PATH --fake PATH [--val-split 0.2]

Directory layout expected:
    --real path/to/real/faces/      (JPEG/PNG face images, labelled 0)
    --fake path/to/fake/faces/      (JPEG/PNG deepfake images, labelled 1)

FFPP c40 example (from Phase 11.1 Colab download):
    --real training/datasets/ffpp/c40/real/
    --fake training/datasets/ffpp/c40/fake/

The script:
  1. Runs EfficientNet inference on all images (face detection → sigmoid score).
  2. Splits into train/val (stratified, default 80/20).
  3. Fits IsotonicRegression(out_of_bounds='clip') on training split.
  4. Evaluates on val split: accuracy, real→fake FPR, fake→real FNR.
  5. Saves calibrator to backend/models/efficientnet_calibrator.pkl.

Run time: ~5 min on a 50-200 image set on CPU.
"""
from __future__ import annotations

import argparse
import pickle
import sys
from pathlib import Path

import numpy as np
from loguru import logger

# Two levels above this script; put first on sys.path (presumably so the
# `services` package imported inside main() resolves when run as a script).
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_PROJECT_ROOT))

# Lower-case file suffixes (including the dot) that count as images.
IMAGE_EXTS = {".bmp", ".jpeg", ".jpg", ".png", ".webp"}
# Default destination of the pickled calibrator (overridable with --out).
CALIBRATOR_OUT = _PROJECT_ROOT.joinpath("models", "efficientnet_calibrator.pkl")


def collect_images(directory: Path) -> list[Path]:
    """Return all image files below *directory* (recursive), sorted by path.

    A path is kept when its suffix, compared case-insensitively, is in
    ``IMAGE_EXTS`` and it is a regular file (or a symlink to one).
    Directories and broken symlinks that merely carry an image-like suffix
    are skipped, so they are not later reported as unreadable images.
    """
    return sorted(
        p
        for p in directory.rglob("*")
        # Cheap suffix test first; is_file() touches the filesystem.
        if p.suffix.lower() in IMAGE_EXTS and p.is_file()
    )


def score_images(det, paths: list[Path]) -> list[float]:
    """Run EfficientNet on each image; return raw sigmoid scores (-1 sentinel for no-face).

    Args:
        det: Detector object. Only three members are used here:
            ``face_extractor.process_image(img=...)``, ``_face_tensor(face)``
            and ``raw_logit(tensor)``.
        paths: Image files to score.

    Returns:
        One float per entry of *paths*, in the same order: ``expit`` of the
        raw logit for the first face returned by the face extractor, or
        ``-1.0`` when the image cannot be opened or no face is returned.
    """
    # Imported once per call rather than once per image.
    from PIL import Image
    from scipy.special import expit

    total = len(paths)
    scores: list[float] = []
    for i, p in enumerate(paths):
        try:
            # The context manager closes the file handle deterministically;
            # convert() returns an independent in-memory copy.
            with Image.open(p) as im:
                pil = im.convert("RGB")
        except Exception as e:
            # Best effort: an unreadable file is logged and scored like "no face".
            logger.warning(f"Cannot open {p}: {e}")
            scores.append(-1.0)
            continue
        img_np = np.array(pil)
        frame_data = det.face_extractor.process_image(img=img_np)
        faces = frame_data.get("faces", [])
        if not faces:
            scores.append(-1.0)
        else:
            # Only the first returned face is scored.
            face_t = det._face_tensor(faces[0])
            logit = det.raw_logit(face_t)
            scores.append(float(expit(logit)))
        if (i + 1) % 10 == 0:
            # "\r" keeps the progress counter on a single terminal line.
            print(f"  scored {i + 1}/{total}", end="\r")
    print()
    return scores


def main() -> int:
    """Score the labelled image folders, fit an isotonic calibrator and pickle it.

    Arguments are parsed from the command line: ``--real`` and ``--fake``
    (required directories), ``--val-split`` (validation fraction, default 0.2)
    and ``--out`` (output pickle path, default ``CALIBRATOR_OUT``).

    Returns:
        ``0`` once the calibrator has been written. The Gate G3 result is only
        printed; it does not affect the return value or whether the file is
        saved. ``1`` when the inputs are unusable: bad directory, no images,
        invalid ``--val-split``, too few usable samples, or a split that
        cannot be stratified.
    """
    parser = argparse.ArgumentParser(description="Fit isotonic calibrator for EfficientNetAutoAttB4")
    parser.add_argument("--real", required=True, type=Path, help="Directory of real face images (label=0)")
    parser.add_argument("--fake", required=True, type=Path, help="Directory of deepfake images (label=1)")
    parser.add_argument("--val-split", type=float, default=0.2, help="Fraction held out for validation (default 0.2)")
    parser.add_argument("--out", type=Path, default=CALIBRATOR_OUT, help="Output pkl path")
    args = parser.parse_args()

    if not args.real.is_dir():
        print(f"ERROR: --real must be a directory: {args.real}")
        return 1
    if not args.fake.is_dir():
        print(f"ERROR: --fake must be a directory: {args.fake}")
        return 1
    # Reject an unusable fraction now instead of failing inside
    # train_test_split after all images have already been scored.
    if not 0.0 < args.val_split < 1.0:
        print(f"ERROR: --val-split must be between 0 and 1 (exclusive): {args.val_split}")
        return 1

    real_paths = collect_images(args.real)
    fake_paths = collect_images(args.fake)
    if not real_paths:
        print(f"ERROR: No images found in {args.real}")
        return 1
    if not fake_paths:
        print(f"ERROR: No images found in {args.fake}")
        return 1
    print(f"Found {len(real_paths)} real | {len(fake_paths)} fake images")

    print("Loading EfficientNetDetector (weights cached after first run)…")
    from services.efficientnet_service import EfficientNetDetector
    # Load without applying existing calibrator — we are building a new one.
    # NOTE(review): "/dev/null" does not exist on Windows, yet the usage line
    # shows a Windows venv — confirm how EfficientNetDetector treats a missing
    # or empty calibrator path.
    det = EfficientNetDetector(calibrator_path=Path("/dev/null"))

    print("Scoring real images…")
    real_scores = score_images(det, real_paths)
    print("Scoring fake images…")
    fake_scores = score_images(det, fake_paths)

    # Build arrays, drop no-face sentinels (score_images marks them with -1).
    r_scores = np.array([s for s in real_scores if s >= 0])
    f_scores = np.array([s for s in fake_scores if s >= 0])
    r_labels = np.zeros(len(r_scores))
    f_labels = np.ones(len(f_scores))

    X = np.concatenate([r_scores, f_scores])
    y = np.concatenate([r_labels, f_labels])
    print(f"\nUsable samples: {len(r_scores)} real | {len(f_scores)} fake")
    print(f"No-face dropped: {sum(s < 0 for s in real_scores)} real | {sum(s < 0 for s in fake_scores)} fake")

    if len(X) < 10:
        print("ERROR: Too few usable samples (<10) to fit a calibrator.")
        return 1
    # A stratified split needs at least two members of every class; a class
    # can end up empty when no face was detected in any of its images.
    if len(r_scores) < 2 or len(f_scores) < 2:
        print("ERROR: Need at least 2 usable samples of each class (real and fake).")
        return 1

    # Stratified train/val split.
    from sklearn.model_selection import train_test_split
    try:
        X_tr, X_val, y_tr, y_val = train_test_split(
            X, y, test_size=args.val_split, stratify=y, random_state=42
        )
    except ValueError as exc:
        # E.g. the validation share is too small to hold one sample per class.
        print(f"ERROR: Cannot create a stratified train/val split: {exc}")
        return 1
    print(f"Split: {len(X_tr)} train | {len(X_val)} val")

    print("Fitting IsotonicRegression…")
    from sklearn.isotonic import IsotonicRegression
    cal = IsotonicRegression(out_of_bounds="clip")
    cal.fit(X_tr.reshape(-1, 1), y_tr)

    # Evaluate on val set: threshold raw and calibrated scores at 0.5.
    y_pred_raw = (X_val >= 0.5).astype(int)
    y_pred_cal = (cal.predict(X_val.reshape(-1, 1)) >= 0.5).astype(int)

    def metrics(y_true, y_pred, tag):
        """Print accuracy, FPR and FNR (in percent) for one prediction set; return (acc, fpr)."""
        acc = (y_true == y_pred).mean() * 100
        real_mask = y_true == 0
        # FPR: share of real samples predicted fake.
        fpr = (y_pred[real_mask] == 1).mean() * 100 if real_mask.sum() > 0 else 0.0
        fake_mask = y_true == 1
        # FNR: share of fake samples predicted real.
        fnr = (y_pred[fake_mask] == 0).mean() * 100 if fake_mask.sum() > 0 else 0.0
        print(f"  [{tag}]  acc={acc:.1f}%  real→fake FPR={fpr:.1f}%  fake→real FNR={fnr:.1f}%")
        return acc, fpr

    print("\nValidation metrics:")
    acc_raw, fpr_raw = metrics(y_val, y_pred_raw, "raw     ")
    acc_cal, fpr_cal = metrics(y_val, y_pred_cal, "calibrated")

    # Gate G3: ≥88% accuracy, ≤8% FPR.
    g3_pass = acc_cal >= 88.0 and fpr_cal <= 8.0
    print(f"\n  Gate G3: {'PASS ✓' if g3_pass else 'FAIL ✗'} (need acc≥88%, FPR≤8%)")

    # The calibrator is written whether or not the gate passed.
    args.out.parent.mkdir(parents=True, exist_ok=True)
    with args.out.open("wb") as f:
        pickle.dump(cal, f, protocol=pickle.HIGHEST_PROTOCOL)
    print(f"\nCalibrator saved → {args.out}")
    print("Restart the backend server for the calibrator to take effect.")
    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())