File size: 15,157 Bytes
55bcd2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
import numpy as np
import torch
from PIL import Image, ImageFilter

from app.core.config import IMAGE_ENSEMBLE, IMAGE_FAST_ENSEMBLE
from app.core.device import DEVICE
from app.models.loader import load_image_model


# ── Model inference ────────────────────────────────────────────────────────────

def _infer_fake_score(proc, model, img: Image.Image) -> float:
    """Return a fake-probability in [0, 1] (1 = synthetic/AI-generated).

    Logits are averaged over three forward passes, then softmaxed.
    NOTE(review): under ``torch.no_grad`` with a deterministic eval-mode
    model the three passes are identical — confirm stochastic layers
    (e.g. MC-dropout) are intended; otherwise one pass suffices.

    Class indices are resolved dynamically by keyword-matching the model's
    ``id2label`` config instead of assuming a fixed [real, fake] order.
    """
    batch = proc(images=img, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        stacked = torch.stack([model(**batch).logits for _ in range(3)])
        probs = torch.nn.functional.softmax(stacked.mean(dim=0), dim=-1)[0].cpu().numpy()

    labels = {int(idx): name.lower() for idx, name in model.config.id2label.items()}
    fake_kw = ["fake", "ai", "artificial", "synthetic", "generated", "deepfake"]
    real_kw = ["real", "human", "authentic", "genuine"]

    fake_idx = [i for i, name in labels.items() if any(w in name for w in fake_kw)]
    real_idx = [i for i, name in labels.items() if any(w in name for w in real_kw)]

    # No recognizable label names at all: fall back to the common convention
    # that index 1 is the "fake" class, or neutral 0.5 for a 1-class head.
    if not fake_idx and not real_idx:
        return float(probs[1]) if len(probs) >= 2 else 0.5

    fake_mass = float(np.sum([probs[i] for i in fake_idx])) if fake_idx else 0.0
    real_mass = float(np.sum([probs[i] for i in real_idx])) if real_idx else 0.0
    denom = fake_mass + real_mass
    # Renormalize over the two buckets; neutral when neither carries mass.
    return fake_mass / denom if denom > 1e-9 else 0.5


def _run_ensemble(img: Image.Image, ensemble: list) -> dict:
    """Run every configured model on *img* and combine their fake-scores.

    Each entry of *ensemble* is a config dict with at least ``"key"``,
    ``"weight"`` and ``"desc"``. Models that fail to load or that raise
    during inference are skipped so one bad model cannot take down the
    whole ensemble.

    Returns ``{"models": per-model details, "ensemble_score": weighted mean}``
    where the score falls back to a neutral 0.5 if no model contributed.
    """
    results = {}
    weighted_sum = 0.0
    total_weight = 0.0

    for cfg in ensemble:
        loaded = load_image_model(cfg)
        if loaded is None:
            # Bracketed "[key]" prefix, consistent with the success/error logs below.
            print(f"  [{cfg['key']}] skipped (load failed)")
            continue
        proc, model = loaded
        try:
            score = _infer_fake_score(proc, model, img)
            results[cfg["key"]] = {"score": round(score, 4), "weight": cfg["weight"], "desc": cfg["desc"]}
            weighted_sum += score * cfg["weight"]
            total_weight += cfg["weight"]
            print(f"  [{cfg['key']}] fake={score:.4f} Γ— {cfg['weight']}")
        except Exception as e:
            # Best-effort: log and continue with the remaining models.
            print(f"  [{cfg['key']}] error: {e}")

    ensemble_score = weighted_sum / total_weight if total_weight > 0 else 0.5
    return {"models": results, "ensemble_score": round(ensemble_score, 4)}


# ── Forensic layers ────────────────────────────────────────────────────────────

def _analyze_exif(image_bytes: bytes) -> dict:
    """Score the image from its EXIF metadata (low = camera-like, high = AI-like).

    Decision ladder:
      1. no EXIF content at all              -> neutral 0.50, ``exif_absent`` set
      2. AI-generator keyword in metadata    -> 0.97 plus the identified source
      3. camera make/model + GPS/ISO/shutter -> 0.05..0.28 depending on richness
      4. EXIF present but no camera info     -> 0.55
    Any parsing failure is treated like absent EXIF (score stays 0.50).
    """
    # Neutral defaults; "details" accumulates human-readable findings.
    result = {"score": 0.50, "exif_absent": False, "has_camera_info": False,
              "suspicious_software": False, "ai_source": None, "details": []}
    try:
        # Local import so a missing piexif is caught by the except below.
        import piexif
        exif_data   = piexif.load(image_bytes)
        # "Present" means at least one non-empty IFD among the four standard ones.
        has_content = any(len(exif_data.get(b, {})) > 0 for b in ["0th", "Exif", "GPS", "1st"])
        if not has_content:
            result["exif_absent"] = True
            result["details"].append("EXIF absent")
            return result

        zeroth   = exif_data.get("0th", {})
        exif_ifd = exif_data.get("Exif", {})
        gps_ifd  = exif_data.get("GPS", {})

        # Software / description / artist are the fields where generators leave tags.
        # Values are raw bytes in piexif; decode leniently and lowercase for matching.
        sw     = zeroth.get(piexif.ImageIFD.Software, b"").decode("utf-8", errors="ignore").lower()
        desc   = zeroth.get(piexif.ImageIFD.ImageDescription, b"").decode("utf-8", errors="ignore").lower()
        artist = zeroth.get(piexif.ImageIFD.Artist, b"").decode("utf-8", errors="ignore").lower()
        combined = sw + " " + desc + " " + artist

        # keyword (lowercase substring) -> human-readable source label
        ai_sources = {
            "stable diffusion": "Stable Diffusion", "midjourney": "Midjourney",
            "dall-e": "DALL-E", "dallΒ·e": "DALL-E", "comfyui": "ComfyUI/SD",
            "automatic1111": "Automatic1111/SD", "generative": "IA GΓ©nΓ©rative",
            "diffusion": "Modèle Diffusion", "novelai": "NovelAI",
            "firefly": "Adobe Firefly", "imagen": "Google Imagen",
            "gemini": "Google Gemini", "flux": "Flux (BFL)",
            "ideogram": "Ideogram", "leonardo": "Leonardo.ai",
            "adobe ai": "Adobe AI", "ai generated": "IA GΓ©nΓ©rique",
            "synthid": "Google SynthID",
        }
        # First matching keyword wins; an explicit generator tag is near-definitive.
        for kw, source in ai_sources.items():
            if kw in combined:
                result["suspicious_software"] = True
                result["ai_source"] = source
                result["score"] = 0.97
                result["details"].append(f"Source IA dΓ©tectΓ©e: {source}")
                return result

        # Camera-evidence fields: make/model, exposure settings, GPS fix.
        make = zeroth.get(piexif.ImageIFD.Make, b"")
        cam  = zeroth.get(piexif.ImageIFD.Model, b"")
        iso  = exif_ifd.get(piexif.ExifIFD.ISOSpeedRatings)
        shut = exif_ifd.get(piexif.ExifIFD.ExposureTime)
        # len > 2 because GPS IFD often contains only version/processing tags.
        gps  = bool(gps_ifd and len(gps_ifd) > 2)

        if make or cam:
            result["has_camera_info"] = True
            result["details"].append(
                f"Appareil: {make.decode('utf-8', errors='ignore')} {cam.decode('utf-8', errors='ignore')}".strip()
            )
        if gps:
            result["details"].append("GPS prΓ©sent")

        # Richer camera metadata -> stronger authenticity signal (lower score).
        if result["has_camera_info"] and gps and iso and shut:
            result["score"] = 0.05
        elif result["has_camera_info"] and (iso or shut):
            result["score"] = 0.12
        elif result["has_camera_info"]:
            result["score"] = 0.28
        else:
            result["score"] = 0.55

    except Exception as e:
        # Missing piexif, malformed EXIF, unsupported container -> treat as absent.
        result["exif_absent"] = True
        result["details"].append(f"Erreur EXIF: {str(e)[:60]}")
    return result


def _analyze_fft(img: Image.Image, fc: float = 0.0) -> dict:
    """Frequency-domain forensics (0 = natural spectrum, 1 = synthetic).

    Two cues computed on the log-magnitude FFT of the grayscale image:
      * mid-to-low frequency energy ratio (AI output tends to be over-smoothed),
      * density of strong off-center spikes (periodic GAN artefacts).
    ``fc`` is presumably a face-confidence in 0..1 (TODO confirm with callers);
    high values relax the smoothing thresholds.
    """
    report = {"score": 0.50, "details": []}
    try:
        luma = np.array(img.convert("L")).astype(np.float32)
        spectrum = np.log1p(np.abs(np.fft.fftshift(np.fft.fft2(luma))))
        height, width = spectrum.shape
        rows, cols = np.ogrid[:height, :width]
        radius = np.sqrt((cols - width // 2) ** 2 + (rows - height // 2) ** 2)

        r_low = min(height, width) // 8
        r_mid = min(height, width) // 4
        low_energy = np.mean(spectrum[radius <= r_low])
        mid_energy = np.mean(spectrum[(radius > r_low) & (radius <= r_mid)])
        fr = mid_energy / (low_energy + 1e-9)

        # Portraits tolerate smoother spectra, hence looser bounds when fc is high.
        lo_bound = 0.18 if fc > 0.45 else 0.25
        hi_bound = 0.85 if fc > 0.45 else 0.72
        if fr < lo_bound:
            smooth_score = 0.70
        elif fr > hi_bound:
            smooth_score = 0.55
        else:
            smooth_score = 0.20
        report["details"].append(f"Ratio freq. {fr:.3f}" + (" β†’ sur-lissage IA" if fr < lo_bound else " βœ“"))

        # Fraction of pixels outside the DC core exceeding mean + 5 sigma.
        pr = np.sum((spectrum * (radius > 5)) > (np.mean(spectrum) + 5 * np.std(spectrum))) / (height * width)
        if pr > 0.003:
            peak_score = 0.85
        elif pr > 0.001:
            peak_score = 0.50
        else:
            peak_score = 0.15
        report["details"].append(f"Pics GAN: {pr:.4f}" + (" ⚠️" if pr > 0.003 else " βœ“"))

        report["score"] = float(0.55 * smooth_score + 0.45 * peak_score)
    except Exception as e:
        report["details"].append(f"Erreur FFT: {str(e)[:60]}")
    return report


def _analyze_texture(img: Image.Image, fc: float = 0.0) -> dict:
    """Texture/noise forensics (0 = natural, 1 = synthetic).

    Combines three cues: edge-noise level, spatial uniformity of local
    contrast (32-px tiles), and a bright "studio background" heuristic.
    ``fc`` is presumably a face-confidence in 0..1 (TODO confirm with
    callers); high values loosen the noise/uniformity thresholds.
    """
    result = {"score": 0.50, "details": []}
    try:
        arr  = np.array(img).astype(np.float32)
        gray = np.array(img.convert("L")).astype(np.float32)
        lap  = np.array(img.convert("L").filter(ImageFilter.FIND_EDGES)).astype(np.float32)
        nl   = float(np.std(lap))

        # Guard on ndim: grayscale/palette inputs yield a 2-D array, and the
        # previous bare arr.shape[2] access raised IndexError, aborting the
        # whole analysis via the broad except instead of just skipping this check.
        if arr.ndim == 3 and arr.shape[2] >= 3:
            r, g, b = arr[:, :, 0], arr[:, :, 1], arr[:, :, 2]
            # Near-identical channels inside an RGB container is a synthetic tell.
            if float(np.mean(np.abs(r - g) < 1)) > 0.98 and float(np.mean(np.abs(g - b) < 1)) > 0.98:
                result["score"] = 0.85
                result["details"].append("Canaux RGB identiques β†’ image IA synthΓ©tique")
                return result

        # Noise-level thresholds (softer for faces).
        ts, tm = (5.0, 14.0) if fc > 0.45 else (8.0, 20.0)
        ns = 0.75 if nl > 20.0 else (0.72 if nl < ts else (0.42 if nl < tm else 0.15))
        result["details"].append(f"Bruit: {nl:.1f}")

        # Coefficient of variation of per-tile contrast: too uniform = suspect.
        h, w, bl = gray.shape[0], gray.shape[1], 32
        stds = [np.std(gray[y:y + bl, x:x + bl]) for y in range(0, h - bl, bl) for x in range(0, w - bl, bl)]
        u  = np.std(stds) / (np.mean(stds) + 1e-9) if stds else 0.5
        ul, uh = (0.20, 0.50) if fc > 0.45 else (0.30, 0.60)
        us = 0.72 if u < ul else (0.38 if u < uh else 0.15)
        result["details"].append(f"UniformitΓ©: {u:.3f}")

        # Bright, flat top border suggests studio/product-render style images.
        bg_ratio   = float(np.mean(gray > 200))
        border_std = float(np.std(gray[:h // 8, :]))
        if bg_ratio > 0.50 and border_std < 6.0:
            studio_score = 0.88
        elif bg_ratio > 0.50 and border_std < 15.0:
            studio_score = 0.82
        elif bg_ratio > 0.35 and border_std < 25.0:
            studio_score = 0.55
        else:
            studio_score = 0.10
        result["details"].append(f"Fond: {bg_ratio:.0%}")

        result["score"] = float(0.35 * ns + 0.25 * us + 0.40 * studio_score)
    except Exception as e:
        result["details"].append(f"Erreur texture: {str(e)[:60]}")
    return result


def _analyze_color(img: Image.Image) -> dict:
    """Color-statistics forensics (0 = natural palette, 1 = synthetic).

    Two cues: per-channel histogram entropy (high and unusually equal across
    R/G/B is suspect) and the fraction of near-black/near-white pixels
    (natural photos usually clip somewhere).
    """
    outcome = {"score": 0.50, "details": []}
    try:
        rgb = np.array(img.convert("RGB")).astype(np.float32)
        red, green, blue = (rgb[:, :, c].flatten() for c in range(3))

        def _entropy(channel):
            # Shannon entropy over a 64-bin density histogram of the channel.
            density, _ = np.histogram(channel, bins=64, range=(0, 255), density=True)
            density = density[density > 0]
            return float(-np.sum(density * np.log2(density + 1e-9)))

        ent = [_entropy(red), _entropy(green), _entropy(blue)]
        mean_entropy = (ent[0] + ent[1] + ent[2]) / 3.0
        entropy_std  = float(np.std(ent))

        if mean_entropy > 5.2 and entropy_std < 0.15:
            ent_score = 0.72
        elif mean_entropy > 4.8 and entropy_std < 0.25:
            ent_score = 0.45
        else:
            ent_score = 0.20
        outcome["details"].append(f"Entropie couleur: {mean_entropy:.2f}")

        # Rec. 601 luma; count pixels in the extreme shadows/highlights.
        lum = 0.299 * red + 0.587 * green + 0.114 * blue
        extreme_ratio = float(np.mean((lum < 8) | (lum > 247)))
        if extreme_ratio < 0.005:
            ext_score = 0.65
        elif extreme_ratio < 0.02:
            ext_score = 0.35
        else:
            ext_score = 0.15
        outcome["details"].append(f"Pixels extrΓͺmes: {extreme_ratio:.4f}")

        outcome["score"] = float(0.60 * ent_score + 0.40 * ext_score)
    except Exception as e:
        outcome["details"].append(f"Erreur palette: {str(e)[:60]}")
    return outcome


# ── Fusion ─────────────────────────────────────────────────────────────────────

def _fuse(ensemble_score: float, exif_r: dict, fft_r: dict, tex_r: dict, color_r: dict) -> dict:
    exif_absent = exif_r.get("exif_absent", False)

    if exif_r.get("suspicious_software"):
        profile = "EXIF_IA_DETECTE"
        w = {"ensemble": 0.20, "exif": 0.60, "fft": 0.12, "texture": 0.05, "color": 0.03}
    elif not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.20:
        profile = "EXIF_FIABLE"
        w = {"ensemble": 0.45, "exif": 0.32, "fft": 0.12, "texture": 0.07, "color": 0.04}
    elif exif_absent:
        profile = "EXIF_ABSENT"
        w = {"ensemble": 0.52, "exif": 0.00, "fft": 0.24, "texture": 0.14, "color": 0.10}
    else:
        profile = "STANDARD"
        w = {"ensemble": 0.48, "exif": 0.22, "fft": 0.16, "texture": 0.09, "color": 0.05}

    scores = {
        "ensemble": ensemble_score,
        "exif":     exif_r["score"],
        "fft":      fft_r["score"],
        "texture":  tex_r["score"],
        "color":    color_r["score"],
    }

    raw = sum(w[k] * scores[k] for k in w)

    # Anti-false-positive guardrails
    if ensemble_score < 0.35 and fft_r["score"] < 0.38:
        raw = min(raw, 0.46)
    if not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.15:
        raw = min(raw, 0.82)
    if exif_r.get("suspicious_software") and raw < 0.85:
        raw = max(raw, 0.90)

    # High-confidence ensemble override: modern diffusion models evade forensic layers;
    # when all ML models agree strongly, trust them over FFT/texture/color heuristics.
    if ensemble_score >= 0.80 and not exif_r.get("has_camera_info"):
        raw = max(raw, ensemble_score * 0.90)
    if ensemble_score <= 0.20:
        raw = min(raw, ensemble_score * 1.10 + 0.05)

    return {
        "fake_prob":      round(raw, 4),
        "real_prob":      round(1.0 - raw, 4),
        "layer_scores":   {k: round(v, 4) for k, v in scores.items()},
        "weights_used":   {k: round(v, 2) for k, v in w.items()},
        "fusion_profile": profile,
        "ai_source":      exif_r.get("ai_source"),
    }


# ── Verdict ────────────────────────────────────────────────────────────────────

def _verdict(fake_prob: float, details: dict) -> dict:
    if fake_prob > 0.65:
        verdict    = "DEEPFAKE"
        confidence = "haute" if fake_prob > 0.85 else "moyenne"
        reason     = "Artefacts de synthèse détectés."
    elif fake_prob < 0.35:
        verdict    = "AUTHENTIQUE"
        confidence = "haute" if fake_prob < 0.15 else "moyenne"
        reason     = "Aucun artefact de synthèse détecté."
    else:
        verdict    = "INDÉTERMINÉ"
        confidence = "faible"
        reason     = "Signal ambigu, analyse non concluante."

    if details.get("ai_source"):
        reason = f"Source IA identifiΓ©e dans les mΓ©tadonnΓ©es: {details['ai_source']}."

    return {"verdict": verdict, "confidence": confidence, "reason": reason}


# ── Public API ─────────────────────────────────────────────────────────────────

def run(img: Image.Image, image_bytes: bytes) -> dict:
    """Full analysis: 3-model ensemble + all four forensic layers."""
    ens = _run_ensemble(img, IMAGE_ENSEMBLE)
    layers = (
        _analyze_exif(image_bytes),
        _analyze_fft(img),
        _analyze_texture(img),
        _analyze_color(img),
    )

    fusion = _fuse(ens["ensemble_score"], *layers)
    verdict = _verdict(fusion["fake_prob"], fusion)

    return {**verdict, **fusion, "models": ens["models"]}


def run_fast(img: Image.Image, image_bytes: bytes) -> dict:
    """Fast analysis: 2-model ensemble + EXIF only.

    The FFT/texture/color layers are replaced by neutral 0.50 placeholders
    so the fusion treats those signals as uninformative.
    """
    ens = _run_ensemble(img, IMAGE_FAST_ENSEMBLE)
    exif_r = _analyze_exif(image_bytes)
    neutral_fft = {"score": 0.50, "details": []}
    neutral_tex = {"score": 0.50, "details": []}
    neutral_col = {"score": 0.50, "details": []}

    fusion = _fuse(ens["ensemble_score"], exif_r, neutral_fft, neutral_tex, neutral_col)
    verdict = _verdict(fusion["fake_prob"], fusion)

    return {**verdict, **fusion, "models": ens["models"]}