Spaces:
Running
Running
| import numpy as np | |
| import torch | |
| from PIL import Image, ImageFilter | |
| from app.core.config import IMAGE_ENSEMBLE, IMAGE_FAST_ENSEMBLE | |
| from app.core.device import DEVICE | |
| from app.models.loader import load_image_model | |
| # ββ Model inference ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _infer_fake_score(proc, model, img: Image.Image, passes: int = 3) -> float:
    """Return the probability (0-1) that *img* is synthetic/fake (1 = fake).

    Runs the classifier *passes* times on the same preprocessed input and
    averages the logits before softmax; with stochastic layers (e.g. dropout
    left enabled) this reduces variance, and with a deterministic model it
    only costs extra compute. Fake/real class indices are resolved
    dynamically from ``model.config.id2label`` by keyword matching, so no
    label order is hardcoded.

    Args:
        proc: HuggingFace-style image processor (callable returning tensors).
        model: classification model whose forward output exposes ``.logits``.
        img: input PIL image.
        passes: number of forward passes to average (default 3, preserving
            the previous hardcoded behavior).

    Returns:
        Fake probability renormalized over recognized fake/real classes;
        0.5 when the label scheme yields no usable signal.
    """
    if passes < 1:
        passes = 1  # guard against a nonsensical request
    inputs = proc(images=img, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        logits_list = [model(**inputs).logits for _ in range(passes)]
    logits_mean = torch.stack(logits_list).mean(dim=0)
    probs = torch.nn.functional.softmax(logits_mean, dim=-1)[0].cpu().numpy()

    # Bucket label names into fake/real by keyword (labels lowercased first).
    id2label = {int(k): v.lower() for k, v in model.config.id2label.items()}
    fake_kw = ["fake", "ai", "artificial", "synthetic", "generated", "deepfake"]
    real_kw = ["real", "human", "authentic", "genuine"]
    fake_indices = [i for i, lbl in id2label.items() if any(w in lbl for w in fake_kw)]
    real_indices = [i for i, lbl in id2label.items() if any(w in lbl for w in real_kw)]
    if not fake_indices and not real_indices:
        # Unknown label scheme: fall back to the common convention index 1 == fake.
        return float(probs[1]) if len(probs) >= 2 else 0.5
    fake_score = float(np.sum([probs[i] for i in fake_indices])) if fake_indices else 0.0
    real_score = float(np.sum([probs[i] for i in real_indices])) if real_indices else 0.0
    total = fake_score + real_score
    # Renormalize over the recognized classes; neutral 0.5 if no mass matched.
    return fake_score / total if total > 1e-9 else 0.5
def _run_ensemble(img: Image.Image, ensemble: list) -> dict:
    """Score *img* with every configured model and combine them by weight.

    Models that fail to load are skipped; models that raise during inference
    are logged and excluded from the weighted average. Returns a dict with
    per-model details under ``"models"`` and the weighted ``"ensemble_score"``
    (0.5 neutral fallback when no model produced a score).
    """
    per_model = {}
    weighted_total = 0.0
    weight_total = 0.0
    for cfg in ensemble:
        loaded = load_image_model(cfg)
        if loaded is None:
            print(f" {cfg['key']} skipped (load failed)")
            continue
        proc, model = loaded
        try:
            score = _infer_fake_score(proc, model, img)
            per_model[cfg["key"]] = {"score": round(score, 4), "weight": cfg["weight"], "desc": cfg["desc"]}
            weighted_total += score * cfg["weight"]
            weight_total += cfg["weight"]
            print(f" [{cfg['key']}] fake={score:.4f} Γ {cfg['weight']}")
        except Exception as e:
            print(f" [{cfg['key']}] error: {e}")
    mean_score = weighted_total / weight_total if weight_total > 0 else 0.5
    return {"models": per_model, "ensemble_score": round(mean_score, 4)}
| # ββ Forensic layers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _analyze_exif(image_bytes: bytes) -> dict:
    """Inspect EXIF metadata for signs of AI generation vs. a real camera.

    Returns a dict with:
      score: fake-likelihood 0-1 (0.97 when an AI tool is named in the
             metadata, low when rich camera metadata is present, 0.50 neutral),
      exif_absent: True when no EXIF content was found (or parsing failed),
      has_camera_info: True when Make/Model tags are present,
      suspicious_software: True when a known AI generator is named,
      ai_source: display name of the detected generator, or None,
      details: human-readable findings (French strings).
    """
    result = {"score": 0.50, "exif_absent": False, "has_camera_info": False,
              "suspicious_software": False, "ai_source": None, "details": []}
    try:
        # Local import: piexif is optional; any failure falls into the except below.
        import piexif
        exif_data = piexif.load(image_bytes)
        # Any tag in the main IFDs counts as "EXIF present".
        has_content = any(len(exif_data.get(b, {})) > 0 for b in ["0th", "Exif", "GPS", "1st"])
        if not has_content:
            result["exif_absent"] = True
            result["details"].append("EXIF absent")
            return result
        zeroth = exif_data.get("0th", {})
        exif_ifd = exif_data.get("Exif", {})
        gps_ifd = exif_data.get("GPS", {})
        # Software/description/artist are the fields where generators usually
        # leave a signature; concatenate them for a single keyword scan.
        sw = zeroth.get(piexif.ImageIFD.Software, b"").decode("utf-8", errors="ignore").lower()
        desc = zeroth.get(piexif.ImageIFD.ImageDescription, b"").decode("utf-8", errors="ignore").lower()
        artist = zeroth.get(piexif.ImageIFD.Artist, b"").decode("utf-8", errors="ignore").lower()
        combined = sw + " " + desc + " " + artist
        # Keyword -> display name. Insertion order matters: more specific keys
        # (e.g. "stable diffusion") are listed before generic ones ("diffusion")
        # and the first match wins.
        ai_sources = {
            "stable diffusion": "Stable Diffusion", "midjourney": "Midjourney",
            "dall-e": "DALL-E", "dallΒ·e": "DALL-E", "comfyui": "ComfyUI/SD",
            "automatic1111": "Automatic1111/SD", "generative": "IA GΓ©nΓ©rative",
            "diffusion": "Modèle Diffusion", "novelai": "NovelAI",
            "firefly": "Adobe Firefly", "imagen": "Google Imagen",
            "gemini": "Google Gemini", "flux": "Flux (BFL)",
            "ideogram": "Ideogram", "leonardo": "Leonardo.ai",
            "adobe ai": "Adobe AI", "ai generated": "IA GΓ©nΓ©rique",
            "synthid": "Google SynthID",
        }
        for kw, source in ai_sources.items():
            if kw in combined:
                result["suspicious_software"] = True
                result["ai_source"] = source
                result["score"] = 0.97  # near-certain: the file names its generator
                result["details"].append(f"Source IA dΓ©tectΓ©e: {source}")
                return result
        # Camera-plausibility signals: device tags, exposure data, GPS fix.
        make = zeroth.get(piexif.ImageIFD.Make, b"")
        cam = zeroth.get(piexif.ImageIFD.Model, b"")
        iso = exif_ifd.get(piexif.ExifIFD.ISOSpeedRatings)
        shut = exif_ifd.get(piexif.ExifIFD.ExposureTime)
        # More than two GPS entries is taken as a real coordinate set.
        gps = bool(gps_ifd and len(gps_ifd) > 2)
        if make or cam:
            result["has_camera_info"] = True
            result["details"].append(
                f"Appareil: {make.decode('utf-8', errors='ignore')} {cam.decode('utf-8', errors='ignore')}".strip()
            )
        if gps:
            result["details"].append("GPS prΓ©sent")
        # Score ladder: more corroborating camera evidence -> lower fake score.
        if result["has_camera_info"] and gps and iso and shut:
            result["score"] = 0.05
        elif result["has_camera_info"] and (iso or shut):
            result["score"] = 0.12
        elif result["has_camera_info"]:
            result["score"] = 0.28
        else:
            result["score"] = 0.55
    except Exception as e:
        # Parsing failure is treated like absent EXIF (best-effort heuristic).
        result["exif_absent"] = True
        result["details"].append(f"Erreur EXIF: {str(e)[:60]}")
    return result
| def _analyze_fft(img: Image.Image, fc: float = 0.0) -> dict: | |
| result = {"score": 0.50, "details": []} | |
| try: | |
| gray = np.array(img.convert("L")).astype(np.float32) | |
| mag = np.log1p(np.abs(np.fft.fftshift(np.fft.fft2(gray)))) | |
| h, w = mag.shape | |
| cy, cx = h // 2, w // 2 | |
| Y, X = np.ogrid[:h, :w] | |
| dist = np.sqrt((X - cx) ** 2 + (Y - cy) ** 2) | |
| rl, rm = min(h, w) // 8, min(h, w) // 4 | |
| le = np.mean(mag[dist <= rl]) | |
| he = np.mean(mag[(dist > rl) & (dist <= rm)]) | |
| fr = he / (le + 1e-9) | |
| tl = 0.18 if fc > 0.45 else 0.25 | |
| th = 0.85 if fc > 0.45 else 0.72 | |
| ss = 0.70 if fr < tl else (0.55 if fr > th else 0.20) | |
| result["details"].append(f"Ratio freq. {fr:.3f}" + (" β sur-lissage IA" if fr < tl else " β")) | |
| pr = np.sum((mag * (dist > 5)) > (np.mean(mag) + 5 * np.std(mag))) / (h * w) | |
| ps = 0.85 if pr > 0.003 else (0.50 if pr > 0.001 else 0.15) | |
| result["details"].append(f"Pics GAN: {pr:.4f}" + (" β οΈ" if pr > 0.003 else " β")) | |
| result["score"] = float(0.55 * ss + 0.45 * ps) | |
| except Exception as e: | |
| result["details"].append(f"Erreur FFT: {str(e)[:60]}") | |
| return result | |
| def _analyze_texture(img: Image.Image, fc: float = 0.0) -> dict: | |
| result = {"score": 0.50, "details": []} | |
| try: | |
| arr = np.array(img).astype(np.float32) | |
| gray = np.array(img.convert("L")).astype(np.float32) | |
| lap = np.array(img.convert("L").filter(ImageFilter.FIND_EDGES)).astype(np.float32) | |
| nl = float(np.std(lap)) | |
| if arr.shape[2] >= 3: | |
| r, g, b = arr[:, :, 0], arr[:, :, 1], arr[:, :, 2] | |
| if float(np.mean(np.abs(r - g) < 1)) > 0.98 and float(np.mean(np.abs(g - b) < 1)) > 0.98: | |
| result["score"] = 0.85 | |
| result["details"].append("Canaux RGB identiques β image IA synthΓ©tique") | |
| return result | |
| ts, tm = (5.0, 14.0) if fc > 0.45 else (8.0, 20.0) | |
| ns = 0.75 if nl > 20.0 else (0.72 if nl < ts else (0.42 if nl < tm else 0.15)) | |
| result["details"].append(f"Bruit: {nl:.1f}") | |
| h, w, bl = gray.shape[0], gray.shape[1], 32 | |
| stds = [np.std(gray[y:y + bl, x:x + bl]) for y in range(0, h - bl, bl) for x in range(0, w - bl, bl)] | |
| u = np.std(stds) / (np.mean(stds) + 1e-9) if stds else 0.5 | |
| ul, uh = (0.20, 0.50) if fc > 0.45 else (0.30, 0.60) | |
| us = 0.72 if u < ul else (0.38 if u < uh else 0.15) | |
| result["details"].append(f"UniformitΓ©: {u:.3f}") | |
| bg_ratio = float(np.mean(gray > 200)) | |
| border_std = float(np.std(gray[:h // 8, :])) | |
| if bg_ratio > 0.50 and border_std < 6.0: | |
| studio_score = 0.88 | |
| elif bg_ratio > 0.50 and border_std < 15.0: | |
| studio_score = 0.82 | |
| elif bg_ratio > 0.35 and border_std < 25.0: | |
| studio_score = 0.55 | |
| else: | |
| studio_score = 0.10 | |
| result["details"].append(f"Fond: {bg_ratio:.0%}") | |
| result["score"] = float(0.35 * ns + 0.25 * us + 0.40 * studio_score) | |
| except Exception as e: | |
| result["details"].append(f"Erreur texture: {str(e)[:60]}") | |
| return result | |
| def _analyze_color(img: Image.Image) -> dict: | |
| result = {"score": 0.50, "details": []} | |
| try: | |
| arr = np.array(img.convert("RGB")).astype(np.float32) | |
| r, g, b = arr[:, :, 0].flatten(), arr[:, :, 1].flatten(), arr[:, :, 2].flatten() | |
| def channel_entropy(ch): | |
| hist, _ = np.histogram(ch, bins=64, range=(0, 255), density=True) | |
| hist = hist[hist > 0] | |
| return float(-np.sum(hist * np.log2(hist + 1e-9))) | |
| er, eg, eb = channel_entropy(r), channel_entropy(g), channel_entropy(b) | |
| mean_entropy = (er + eg + eb) / 3.0 | |
| entropy_std = float(np.std([er, eg, eb])) | |
| if mean_entropy > 5.2 and entropy_std < 0.15: | |
| ent_score = 0.72 | |
| elif mean_entropy > 4.8 and entropy_std < 0.25: | |
| ent_score = 0.45 | |
| else: | |
| ent_score = 0.20 | |
| result["details"].append(f"Entropie couleur: {mean_entropy:.2f}") | |
| lum = 0.299 * r + 0.587 * g + 0.114 * b | |
| extreme_ratio = float(np.mean((lum < 8) | (lum > 247))) | |
| ext_score = 0.65 if extreme_ratio < 0.005 else (0.35 if extreme_ratio < 0.02 else 0.15) | |
| result["details"].append(f"Pixels extrΓͺmes: {extreme_ratio:.4f}") | |
| result["score"] = float(0.60 * ent_score + 0.40 * ext_score) | |
| except Exception as e: | |
| result["details"].append(f"Erreur palette: {str(e)[:60]}") | |
| return result | |
| # ββ Fusion βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _fuse(ensemble_score: float, exif_r: dict, fft_r: dict, tex_r: dict, color_r: dict) -> dict: | |
| exif_absent = exif_r.get("exif_absent", False) | |
| if exif_r.get("suspicious_software"): | |
| profile = "EXIF_IA_DETECTE" | |
| w = {"ensemble": 0.20, "exif": 0.60, "fft": 0.12, "texture": 0.05, "color": 0.03} | |
| elif not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.20: | |
| profile = "EXIF_FIABLE" | |
| w = {"ensemble": 0.45, "exif": 0.32, "fft": 0.12, "texture": 0.07, "color": 0.04} | |
| elif exif_absent: | |
| profile = "EXIF_ABSENT" | |
| w = {"ensemble": 0.52, "exif": 0.00, "fft": 0.24, "texture": 0.14, "color": 0.10} | |
| else: | |
| profile = "STANDARD" | |
| w = {"ensemble": 0.48, "exif": 0.22, "fft": 0.16, "texture": 0.09, "color": 0.05} | |
| scores = { | |
| "ensemble": ensemble_score, | |
| "exif": exif_r["score"], | |
| "fft": fft_r["score"], | |
| "texture": tex_r["score"], | |
| "color": color_r["score"], | |
| } | |
| raw = sum(w[k] * scores[k] for k in w) | |
| # Anti-false-positive guardrails | |
| if ensemble_score < 0.35 and fft_r["score"] < 0.38: | |
| raw = min(raw, 0.46) | |
| if not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.15: | |
| raw = min(raw, 0.82) | |
| if exif_r.get("suspicious_software") and raw < 0.85: | |
| raw = max(raw, 0.90) | |
| # High-confidence ensemble override: modern diffusion models evade forensic layers; | |
| # when all ML models agree strongly, trust them over FFT/texture/color heuristics. | |
| if ensemble_score >= 0.80 and not exif_r.get("has_camera_info"): | |
| raw = max(raw, ensemble_score * 0.90) | |
| if ensemble_score <= 0.20: | |
| raw = min(raw, ensemble_score * 1.10 + 0.05) | |
| return { | |
| "fake_prob": round(raw, 4), | |
| "real_prob": round(1.0 - raw, 4), | |
| "layer_scores": {k: round(v, 4) for k, v in scores.items()}, | |
| "weights_used": {k: round(v, 2) for k, v in w.items()}, | |
| "fusion_profile": profile, | |
| "ai_source": exif_r.get("ai_source"), | |
| } | |
| # ββ Verdict ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _verdict(fake_prob: float, details: dict) -> dict: | |
| if fake_prob > 0.65: | |
| verdict = "DEEPFAKE" | |
| confidence = "haute" if fake_prob > 0.85 else "moyenne" | |
| reason = "Artefacts de synthèse détectés." | |
| elif fake_prob < 0.35: | |
| verdict = "AUTHENTIQUE" | |
| confidence = "haute" if fake_prob < 0.15 else "moyenne" | |
| reason = "Aucun artefact de synthèse détecté." | |
| else: | |
| verdict = "INDΓTERMINΓ" | |
| confidence = "faible" | |
| reason = "Signal ambigu, analyse non concluante." | |
| if details.get("ai_source"): | |
| reason = f"Source IA identifiΓ©e dans les mΓ©tadonnΓ©es: {details['ai_source']}." | |
| return {"verdict": verdict, "confidence": confidence, "reason": reason} | |
| # ββ Public API βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def run(img: Image.Image, image_bytes: bytes) -> dict:
    """Full analysis: weighted 3-model ensemble plus every forensic layer.

    Returns the merged verdict + fusion payload, with per-model scores
    under ``"models"``.
    """
    ensemble = _run_ensemble(img, IMAGE_ENSEMBLE)
    exif_layer = _analyze_exif(image_bytes)
    fft_layer = _analyze_fft(img)
    texture_layer = _analyze_texture(img)
    color_layer = _analyze_color(img)
    fusion = _fuse(ensemble["ensemble_score"], exif_layer, fft_layer, texture_layer, color_layer)
    conclusion = _verdict(fusion["fake_prob"], fusion)
    return {**conclusion, **fusion, "models": ensemble["models"]}
def run_fast(img: Image.Image, image_bytes: bytes) -> dict:
    """Fast analysis: reduced 2-model ensemble plus EXIF only.

    The FFT/texture/color layers are fed neutral 0.50 placeholders so the
    fusion weighting still applies without running the expensive analyses.
    """
    ensemble = _run_ensemble(img, IMAGE_FAST_ENSEMBLE)
    exif_layer = _analyze_exif(image_bytes)
    neutral_fft = {"score": 0.50, "details": []}
    neutral_texture = {"score": 0.50, "details": []}
    neutral_color = {"score": 0.50, "details": []}
    fusion = _fuse(ensemble["ensemble_score"], exif_layer, neutral_fft, neutral_texture, neutral_color)
    conclusion = _verdict(fusion["fake_prob"], fusion)
    return {**conclusion, **fusion, "models": ensemble["models"]}