import numpy as np
import torch
from PIL import Image, ImageFilter

from app.core.config import IMAGE_ENSEMBLE, IMAGE_FAST_ENSEMBLE
from app.core.device import DEVICE
from app.models.loader import load_image_model


# ── Model inference ────────────────────────────────────────────────────────────

def _infer_fake_score(proc, model, img: Image.Image) -> float:
    """
    Stable inference: average over 3 passes to reduce variance.
    Dynamically resolves fake/real indices from id2label, no hardcoded assumptions.
    Returns a score 0→1 (1 = synthetic/fake).
    """
    inputs = proc(images=img, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        # Three forward passes averaged in logit space; harmless for
        # deterministic eval-mode models, smooths any nondeterministic ops.
        logits_list = [model(**inputs).logits for _ in range(3)]
    logits_mean = torch.stack(logits_list).mean(dim=0)
    probs = torch.nn.functional.softmax(logits_mean, dim=-1)[0].cpu().numpy()

    # Resolve which class indices mean "fake" vs "real" from the model's own labels.
    id2label = {int(k): v.lower() for k, v in model.config.id2label.items()}
    fake_kw = ["fake", "ai", "artificial", "synthetic", "generated", "deepfake"]
    real_kw = ["real", "human", "authentic", "genuine"]
    fake_indices = [i for i, lbl in id2label.items() if any(w in lbl for w in fake_kw)]
    real_indices = [i for i, lbl in id2label.items() if any(w in lbl for w in real_kw)]

    if not fake_indices and not real_indices:
        # Unknown label scheme: fall back to index-1 convention, else neutral.
        return float(probs[1]) if len(probs) >= 2 else 0.5

    fake_score = float(np.sum([probs[i] for i in fake_indices])) if fake_indices else 0.0
    real_score = float(np.sum([probs[i] for i in real_indices])) if real_indices else 0.0
    total = fake_score + real_score
    # Renormalize over the recognized classes only; neutral if nothing matched mass.
    return fake_score / total if total > 1e-9 else 0.5


def _run_ensemble(img: Image.Image, ensemble: list) -> dict:
    """Run all models in the ensemble and return weighted score + per-model details."""
    results = {}
    weighted_sum = 0.0
    total_weight = 0.0
    for cfg in ensemble:
        loaded = load_image_model(cfg)
        if loaded is None:
            print(f" {cfg['key']} skipped (load failed)")
            continue
        proc, model = loaded
        try:
            score = _infer_fake_score(proc, model, img)
            results[cfg["key"]] = {"score": round(score, 4), "weight": cfg["weight"],
                                   "desc": cfg["desc"]}
            weighted_sum += score * cfg["weight"]
            total_weight += cfg["weight"]
            print(f" [{cfg['key']}] fake={score:.4f} × {cfg['weight']}")
        except Exception as e:
            # One failing model must not abort the whole ensemble.
            print(f" [{cfg['key']}] error: {e}")
    # Neutral 0.5 when every model failed to load or score.
    ensemble_score = weighted_sum / total_weight if total_weight > 0 else 0.5
    return {"models": results, "ensemble_score": round(ensemble_score, 4)}


# ── Forensic layers ────────────────────────────────────────────────────────────

def _analyze_exif(image_bytes: bytes) -> dict:
    """
    EXIF metadata layer: detect AI-tool signatures or, conversely, genuine
    camera metadata (make/model, ISO, shutter, GPS).
    Returns {"score": float 0→1 fake-ness, "exif_absent", "has_camera_info",
    "suspicious_software", "ai_source", "details": [str]}.
    """
    result = {"score": 0.50, "exif_absent": False, "has_camera_info": False,
              "suspicious_software": False, "ai_source": None, "details": []}
    try:
        import piexif
        exif_data = piexif.load(image_bytes)
        has_content = any(len(exif_data.get(b, {})) > 0 for b in ["0th", "Exif", "GPS", "1st"])
        if not has_content:
            result["exif_absent"] = True
            result["details"].append("EXIF absent")
            return result

        zeroth = exif_data.get("0th", {})
        exif_ifd = exif_data.get("Exif", {})
        gps_ifd = exif_data.get("GPS", {})

        # Software / description / artist tags often carry generator signatures.
        sw = zeroth.get(piexif.ImageIFD.Software, b"").decode("utf-8", errors="ignore").lower()
        desc = zeroth.get(piexif.ImageIFD.ImageDescription, b"").decode("utf-8", errors="ignore").lower()
        artist = zeroth.get(piexif.ImageIFD.Artist, b"").decode("utf-8", errors="ignore").lower()
        combined = sw + " " + desc + " " + artist

        # keyword → human-readable AI source name
        ai_sources = {
            "stable diffusion": "Stable Diffusion",
            "midjourney": "Midjourney",
            "dall-e": "DALL-E",
            "dall·e": "DALL-E",
            "comfyui": "ComfyUI/SD",
            "automatic1111": "Automatic1111/SD",
            "generative": "IA Générative",
            "diffusion": "Modèle Diffusion",
            "novelai": "NovelAI",
            "firefly": "Adobe Firefly",
            "imagen": "Google Imagen",
            "gemini": "Google Gemini",
            "flux": "Flux (BFL)",
            "ideogram": "Ideogram",
            "leonardo": "Leonardo.ai",
            "adobe ai": "Adobe AI",
            "ai generated": "IA Générique",
            "synthid": "Google SynthID",
        }
        for kw, source in ai_sources.items():
            if kw in combined:
                result["suspicious_software"] = True
                result["ai_source"] = source
                result["score"] = 0.97
                result["details"].append(f"Source IA détectée: {source}")
                return result

        make = zeroth.get(piexif.ImageIFD.Make, b"")
        cam = zeroth.get(piexif.ImageIFD.Model, b"")
        iso = exif_ifd.get(piexif.ExifIFD.ISOSpeedRatings)
        shut = exif_ifd.get(piexif.ExifIFD.ExposureTime)
        gps = bool(gps_ifd and len(gps_ifd) > 2)

        if make or cam:
            result["has_camera_info"] = True
            result["details"].append(
                f"Appareil: {make.decode('utf-8', errors='ignore')} {cam.decode('utf-8', errors='ignore')}".strip()
            )
        if gps:
            result["details"].append("GPS présent")

        # More corroborating camera fields → lower fake-ness score.
        if result["has_camera_info"] and gps and iso and shut:
            result["score"] = 0.05
        elif result["has_camera_info"] and (iso or shut):
            result["score"] = 0.12
        elif result["has_camera_info"]:
            result["score"] = 0.28
        else:
            result["score"] = 0.55
    except Exception as e:
        # Unparseable EXIF is treated the same as absent EXIF.
        result["exif_absent"] = True
        result["details"].append(f"Erreur EXIF: {str(e)[:60]}")
    return result


def _analyze_fft(img: Image.Image, fc: float = 0.0) -> dict:
    """
    Frequency-domain layer: diffusion output tends to be over-smoothed (weak
    high/low frequency ratio) and GAN output shows periodic spectral peaks.
    `fc` is a face-content hint (>0.45 relaxes thresholds for portraits).
    """
    result = {"score": 0.50, "details": []}
    try:
        gray = np.array(img.convert("L")).astype(np.float32)
        mag = np.log1p(np.abs(np.fft.fftshift(np.fft.fft2(gray))))
        h, w = mag.shape
        cy, cx = h // 2, w // 2
        Y, X = np.ogrid[:h, :w]
        dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2)

        # Low band = inner disc, high band = annulus around it.
        rl, rm = min(h, w) // 8, min(h, w) // 4
        le = np.mean(mag[dist <= rl])
        he = np.mean(mag[(dist > rl) & (dist <= rm)])
        fr = he / (le + 1e-9)

        tl = 0.18 if fc > 0.45 else 0.25
        th = 0.85 if fc > 0.45 else 0.72
        ss = 0.70 if fr < tl else (0.55 if fr > th else 0.20)
        result["details"].append(f"Ratio freq. {fr:.3f}" + (" → sur-lissage IA" if fr < tl else " ✓"))

        # Fraction of off-center outlier peaks (>5σ) — a GAN fingerprint.
        pr = np.sum((mag * (dist > 5)) > (np.mean(mag) + 5 * np.std(mag))) / (h * w)
        ps = 0.85 if pr > 0.003 else (0.50 if pr > 0.001 else 0.15)
        result["details"].append(f"Pics GAN: {pr:.4f}" + (" ⚠️" if pr > 0.003 else " ✓"))

        result["score"] = float(0.55 * ss + 0.45 * ps)
    except Exception as e:
        result["details"].append(f"Erreur FFT: {str(e)[:60]}")
    return result


def _analyze_texture(img: Image.Image, fc: float = 0.0) -> dict:
    """
    Texture layer: sensor-noise level, spatial uniformity of local contrast,
    and studio-like flat bright backgrounds. Also short-circuits on images
    whose R/G/B channels are numerically identical (synthetic grayscale-as-RGB).
    """
    result = {"score": 0.50, "details": []}
    try:
        arr = np.array(img).astype(np.float32)
        gray = np.array(img.convert("L")).astype(np.float32)
        lap = np.array(img.convert("L").filter(ImageFilter.FIND_EDGES)).astype(np.float32)
        nl = float(np.std(lap))

        # FIX: guard on ndim — a grayscale ("L") image yields a 2-D array and
        # `arr.shape[2]` raised IndexError, silently aborting the whole layer
        # via the broad except and freezing the score at 0.50.
        if arr.ndim == 3 and arr.shape[2] >= 3:
            r, g, b = arr[:, :, 0], arr[:, :, 1], arr[:, :, 2]
            if float(np.mean(np.abs(r - g) < 1)) > 0.98 and float(np.mean(np.abs(g - b) < 1)) > 0.98:
                result["score"] = 0.85
                result["details"].append("Canaux RGB identiques → image IA synthétique")
                return result

        # Noise level: very low (over-denoised) reads as AI; thresholds relax
        # for face-heavy images (fc > 0.45).
        ts, tm = (5.0, 14.0) if fc > 0.45 else (8.0, 20.0)
        ns = 0.75 if nl > 20.0 else (0.72 if nl < ts else (0.42 if nl < tm else 0.15))
        result["details"].append(f"Bruit: {nl:.1f}")

        # Per-32px-block std; a low coefficient of variation means suspiciously
        # uniform texture.
        h, w, bl = gray.shape[0], gray.shape[1], 32
        stds = [np.std(gray[y:y + bl, x:x + bl])
                for y in range(0, h - bl, bl) for x in range(0, w - bl, bl)]
        u = np.std(stds) / (np.mean(stds) + 1e-9) if stds else 0.5
        ul, uh = (0.20, 0.50) if fc > 0.45 else (0.30, 0.60)
        us = 0.72 if u < ul else (0.38 if u < uh else 0.15)
        result["details"].append(f"Uniformité: {u:.3f}")

        # Bright, flat background (top border) → studio/synthetic render look.
        bg_ratio = float(np.mean(gray > 200))
        border_std = float(np.std(gray[:h // 8, :]))
        if bg_ratio > 0.50 and border_std < 6.0:
            studio_score = 0.88
        elif bg_ratio > 0.50 and border_std < 15.0:
            studio_score = 0.82
        elif bg_ratio > 0.35 and border_std < 25.0:
            studio_score = 0.55
        else:
            studio_score = 0.10
        result["details"].append(f"Fond: {bg_ratio:.0%}")

        result["score"] = float(0.35 * ns + 0.25 * us + 0.40 * studio_score)
    except Exception as e:
        result["details"].append(f"Erreur texture: {str(e)[:60]}")
    return result


def _analyze_color(img: Image.Image) -> dict:
    """
    Color-statistics layer: per-channel histogram entropy (AI output tends to
    have high, near-equal entropy across channels) and the share of extreme
    dark/bright pixels (real photos usually have some clipped pixels).
    """
    result = {"score": 0.50, "details": []}
    try:
        arr = np.array(img.convert("RGB")).astype(np.float32)
        r, g, b = arr[:, :, 0].flatten(), arr[:, :, 1].flatten(), arr[:, :, 2].flatten()

        def channel_entropy(ch):
            # Shannon entropy over a 64-bin histogram of one channel.
            hist, _ = np.histogram(ch, bins=64, range=(0, 255), density=True)
            hist = hist[hist > 0]
            return float(-np.sum(hist * np.log2(hist + 1e-9)))

        er, eg, eb = channel_entropy(r), channel_entropy(g), channel_entropy(b)
        mean_entropy = (er + eg + eb) / 3.0
        entropy_std = float(np.std([er, eg, eb]))
        if mean_entropy > 5.2 and entropy_std < 0.15:
            ent_score = 0.72
        elif mean_entropy > 4.8 and entropy_std < 0.25:
            ent_score = 0.45
        else:
            ent_score = 0.20
        result["details"].append(f"Entropie couleur: {mean_entropy:.2f}")

        lum = 0.299 * r + 0.587 * g + 0.114 * b
        extreme_ratio = float(np.mean((lum < 8) | (lum > 247)))
        ext_score = 0.65 if extreme_ratio < 0.005 else (0.35 if extreme_ratio < 0.02 else 0.15)
        result["details"].append(f"Pixels extrêmes: {extreme_ratio:.4f}")

        result["score"] = float(0.60 * ent_score + 0.40 * ext_score)
    except Exception as e:
        result["details"].append(f"Erreur palette: {str(e)[:60]}")
    return result


# ── Fusion ─────────────────────────────────────────────────────────────────────

def _fuse(ensemble_score: float, exif_r: dict, fft_r: dict,
          tex_r: dict, color_r: dict) -> dict:
    """
    Combine the ML ensemble score with the forensic layers using a weight
    profile chosen from the EXIF evidence, then apply guardrails that cap or
    floor the fused probability in clear-cut situations.
    Returns fake/real probabilities plus the per-layer scores and weights used.
    """
    exif_absent = exif_r.get("exif_absent", False)

    # Pick a weighting profile from the strength of the EXIF signal.
    if exif_r.get("suspicious_software"):
        profile = "EXIF_IA_DETECTE"
        w = {"ensemble": 0.20, "exif": 0.60, "fft": 0.12, "texture": 0.05, "color": 0.03}
    elif not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.20:
        profile = "EXIF_FIABLE"
        w = {"ensemble": 0.45, "exif": 0.32, "fft": 0.12, "texture": 0.07, "color": 0.04}
    elif exif_absent:
        profile = "EXIF_ABSENT"
        w = {"ensemble": 0.52, "exif": 0.00, "fft": 0.24, "texture": 0.14, "color": 0.10}
    else:
        profile = "STANDARD"
        w = {"ensemble": 0.48, "exif": 0.22, "fft": 0.16, "texture": 0.09, "color": 0.05}

    scores = {
        "ensemble": ensemble_score,
        "exif": exif_r["score"],
        "fft": fft_r["score"],
        "texture": tex_r["score"],
        "color": color_r["score"],
    }
    raw = sum(w[k] * scores[k] for k in w)

    # Anti-false-positive guardrails
    if ensemble_score < 0.35 and fft_r["score"] < 0.38:
        raw = min(raw, 0.46)
    if not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.15:
        raw = min(raw, 0.82)
    if exif_r.get("suspicious_software") and raw < 0.85:
        raw = max(raw, 0.90)

    # High-confidence ensemble override: modern diffusion models evade forensic layers;
    # when all ML models agree strongly, trust them over FFT/texture/color heuristics.
    if ensemble_score >= 0.80 and not exif_r.get("has_camera_info"):
        raw = max(raw, ensemble_score * 0.90)
    if ensemble_score <= 0.20:
        raw = min(raw, ensemble_score * 1.10 + 0.05)

    return {
        "fake_prob": round(raw, 4),
        "real_prob": round(1.0 - raw, 4),
        "layer_scores": {k: round(v, 4) for k, v in scores.items()},
        "weights_used": {k: round(v, 2) for k, v in w.items()},
        "fusion_profile": profile,
        "ai_source": exif_r.get("ai_source"),
    }


# ── Verdict ────────────────────────────────────────────────────────────────────

def _verdict(fake_prob: float, details: dict) -> dict:
    """Map a fused fake probability onto a verdict / confidence / reason triple."""
    if fake_prob > 0.65:
        verdict = "DEEPFAKE"
        confidence = "haute" if fake_prob > 0.85 else "moyenne"
        reason = "Artefacts de synthèse détectés."
    elif fake_prob < 0.35:
        verdict = "AUTHENTIQUE"
        confidence = "haute" if fake_prob < 0.15 else "moyenne"
        reason = "Aucun artefact de synthèse détecté."
    else:
        verdict = "INDÉTERMINÉ"
        confidence = "faible"
        reason = "Signal ambigu, analyse non concluante."
    # An explicit AI signature in EXIF overrides the generic explanation.
    if details.get("ai_source"):
        reason = f"Source IA identifiée dans les métadonnées: {details['ai_source']}."
    return {"verdict": verdict, "confidence": confidence, "reason": reason}


# ── Public API ─────────────────────────────────────────────────────────────────

def run(img: Image.Image, image_bytes: bytes) -> dict:
    """Full analysis: 3-model ensemble + forensic layers."""
    ensemble_result = _run_ensemble(img, IMAGE_ENSEMBLE)
    exif_r = _analyze_exif(image_bytes)
    fft_r = _analyze_fft(img)
    tex_r = _analyze_texture(img)
    color_r = _analyze_color(img)
    fusion = _fuse(ensemble_result["ensemble_score"], exif_r, fft_r, tex_r, color_r)
    verdict = _verdict(fusion["fake_prob"], fusion)
    return {**verdict, **fusion, "models": ensemble_result["models"]}


def run_fast(img: Image.Image, image_bytes: bytes) -> dict:
    """Fast analysis: 2-model ensemble + EXIF only."""
    ensemble_result = _run_ensemble(img, IMAGE_FAST_ENSEMBLE)
    exif_r = _analyze_exif(image_bytes)
    # Neutral placeholders so _fuse weights effectively ignore these layers.
    fft_r = {"score": 0.50, "details": []}
    tex_r = {"score": 0.50, "details": []}
    color_r = {"score": 0.50, "details": []}
    fusion = _fuse(ensemble_result["ensemble_score"], exif_r, fft_r, tex_r, color_r)
    verdict = _verdict(fusion["fake_prob"], fusion)
    return {**verdict, **fusion, "models": ensemble_result["models"]}