# oyabun-dev — deploy: 2026-04-02T00:05:48Z (commit 55bcd2b)
import re

import numpy as np
import torch
from PIL import Image, ImageFilter

from app.core.config import IMAGE_ENSEMBLE, IMAGE_FAST_ENSEMBLE
from app.core.device import DEVICE
from app.models.loader import load_image_model
# ── Model inference ────────────────────────────────────────────────────────────
def _infer_fake_score(proc, model, img: Image.Image) -> float:
    """
    Score one image with one classifier.

    Averages the logits of 3 forward passes (cheap variance reduction in case
    the model keeps stochastic layers active) and resolves which output
    indices mean fake vs. real from the model's own id2label mapping, so no
    class index is hardcoded.

    Returns a score in [0, 1] where 1 = synthetic/fake.
    """
    inputs = proc(images=img, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        logits_list = [model(**inputs).logits for _ in range(3)]
    logits_mean = torch.stack(logits_list).mean(dim=0)
    probs = torch.nn.functional.softmax(logits_mean, dim=-1)[0].cpu().numpy()

    id2label = {int(k): v.lower() for k, v in model.config.id2label.items()}

    # BUG FIX: the previous substring test `"ai" in lbl` mis-flagged ordinary
    # labels that merely contain "ai" (e.g. "portrait", "painting", "hair").
    # The short ambiguous keyword "ai" is now matched on word boundaries;
    # longer keywords keep the original substring semantics.
    def _matches(lbl: str, kw: str) -> bool:
        if kw == "ai":
            return re.search(r"\bai\b", lbl) is not None
        return kw in lbl

    fake_kw = ["fake", "ai", "artificial", "synthetic", "generated", "deepfake"]
    real_kw = ["real", "human", "authentic", "genuine"]
    fake_indices = [i for i, lbl in id2label.items() if any(_matches(lbl, w) for w in fake_kw)]
    real_indices = [i for i, lbl in id2label.items() if any(_matches(lbl, w) for w in real_kw)]

    if not fake_indices and not real_indices:
        # Unknown label scheme: fall back to index 1, the conventional
        # "fake" slot in binary detectors; neutral 0.5 if single-logit.
        return float(probs[1]) if len(probs) >= 2 else 0.5

    fake_score = float(np.sum([probs[i] for i in fake_indices])) if fake_indices else 0.0
    real_score = float(np.sum([probs[i] for i in real_indices])) if real_indices else 0.0
    total = fake_score + real_score
    # Normalise so the two label families sum to 1; ambiguous when both ~0.
    return fake_score / total if total > 1e-9 else 0.5
def _run_ensemble(img: Image.Image, ensemble: list) -> dict:
    """Score *img* with every model in *ensemble*; return per-model details plus the weighted mean."""
    per_model = {}
    weighted_acc = 0.0
    weight_acc = 0.0
    for cfg in ensemble:
        pair = load_image_model(cfg)
        if pair is None:
            print(f" {cfg['key']} skipped (load failed)")
            continue
        processor, net = pair
        try:
            fake = _infer_fake_score(processor, net, img)
            per_model[cfg["key"]] = {"score": round(fake, 4), "weight": cfg["weight"], "desc": cfg["desc"]}
            weighted_acc += fake * cfg["weight"]
            weight_acc += cfg["weight"]
            print(f" [{cfg['key']}] fake={fake:.4f} Γ— {cfg['weight']}")
        except Exception as exc:
            # A single failing model must not kill the whole ensemble.
            print(f" [{cfg['key']}] error: {exc}")
    combined = weighted_acc / weight_acc if weight_acc > 0 else 0.5
    return {"models": per_model, "ensemble_score": round(combined, 4)}
# ── Forensic layers ────────────────────────────────────────────────────────────
def _analyze_exif(image_bytes: bytes) -> dict:
    """
    Inspect EXIF metadata for authenticity signals.

    Returns a dict with:
      score: fake-likelihood from metadata alone (0.5 = neutral)
      exif_absent: True when no EXIF content was found (or parsing failed)
      has_camera_info: True when Make/Model tags are present
      suspicious_software: True when a known AI tool signed the file
      ai_source: display name of the detected AI tool, or None
      details: list of diagnostic strings (French, shown in the UI)
    """
    result = {"score": 0.50, "exif_absent": False, "has_camera_info": False,
              "suspicious_software": False, "ai_source": None, "details": []}
    try:
        # Local import: piexif is optional; any failure (missing package,
        # unparsable bytes) falls into the except below as "EXIF absent".
        import piexif
        exif_data = piexif.load(image_bytes)
        # EXIF counts as "present" only if at least one IFD carries tags.
        has_content = any(len(exif_data.get(b, {})) > 0 for b in ["0th", "Exif", "GPS", "1st"])
        if not has_content:
            result["exif_absent"] = True
            result["details"].append("EXIF absent")
            return result
        zeroth = exif_data.get("0th", {})
        exif_ifd = exif_data.get("Exif", {})
        gps_ifd = exif_data.get("GPS", {})
        # Free-text tags where generators typically leave a signature.
        sw = zeroth.get(piexif.ImageIFD.Software, b"").decode("utf-8", errors="ignore").lower()
        desc = zeroth.get(piexif.ImageIFD.ImageDescription, b"").decode("utf-8", errors="ignore").lower()
        artist = zeroth.get(piexif.ImageIFD.Artist, b"").decode("utf-8", errors="ignore").lower()
        combined = sw + " " + desc + " " + artist
        # keyword (lowercase, searched as substring) -> display name.
        ai_sources = {
            "stable diffusion": "Stable Diffusion", "midjourney": "Midjourney",
            "dall-e": "DALL-E", "dallΒ·e": "DALL-E", "comfyui": "ComfyUI/SD",
            "automatic1111": "Automatic1111/SD", "generative": "IA GΓ©nΓ©rative",
            "diffusion": "Modèle Diffusion", "novelai": "NovelAI",
            "firefly": "Adobe Firefly", "imagen": "Google Imagen",
            "gemini": "Google Gemini", "flux": "Flux (BFL)",
            "ideogram": "Ideogram", "leonardo": "Leonardo.ai",
            "adobe ai": "Adobe AI", "ai generated": "IA GΓ©nΓ©rique",
            "synthid": "Google SynthID",
        }
        for kw, source in ai_sources.items():
            if kw in combined:
                # Hard evidence: a known generator signed the file.
                result["suspicious_software"] = True
                result["ai_source"] = source
                result["score"] = 0.97
                result["details"].append(f"Source IA dΓ©tectΓ©e: {source}")
                return result
        # Camera-plausibility signals: device tags, exposure params, GPS.
        make = zeroth.get(piexif.ImageIFD.Make, b"")
        cam = zeroth.get(piexif.ImageIFD.Model, b"")
        iso = exif_ifd.get(piexif.ExifIFD.ISOSpeedRatings)
        shut = exif_ifd.get(piexif.ExifIFD.ExposureTime)
        # More than 2 GPS tags is treated as a real fix (not just a version tag).
        gps = bool(gps_ifd and len(gps_ifd) > 2)
        if make or cam:
            result["has_camera_info"] = True
            result["details"].append(
                f"Appareil: {make.decode('utf-8', errors='ignore')} {cam.decode('utf-8', errors='ignore')}".strip()
            )
        if gps:
            result["details"].append("GPS prΓ©sent")
        # Tiered score: the more real-camera evidence, the lower (more real).
        if result["has_camera_info"] and gps and iso and shut:
            result["score"] = 0.05
        elif result["has_camera_info"] and (iso or shut):
            result["score"] = 0.12
        elif result["has_camera_info"]:
            result["score"] = 0.28
        else:
            # EXIF present but no camera tags: slightly suspect.
            result["score"] = 0.55
    except Exception as e:
        # Best-effort layer: a parse failure is treated like missing EXIF.
        result["exif_absent"] = True
        result["details"].append(f"Erreur EXIF: {str(e)[:60]}")
    return result
def _analyze_fft(img: Image.Image, fc: float = 0.0) -> dict:
    """Frequency-domain forensics: over-smoothing ratio plus GAN spectral peaks."""
    report = {"score": 0.50, "details": []}
    try:
        pixels = np.array(img.convert("L")).astype(np.float32)
        spectrum = np.log1p(np.abs(np.fft.fftshift(np.fft.fft2(pixels))))
        height, width = spectrum.shape
        center_y, center_x = height // 2, width // 2
        ys, xs = np.ogrid[:height, :width]
        radius = np.sqrt((xs - center_x) ** 2 + (ys - center_y) ** 2)
        ring_low = min(height, width) // 8
        ring_mid = min(height, width) // 4
        low_energy = np.mean(spectrum[radius <= ring_low])
        mid_energy = np.mean(spectrum[(radius > ring_low) & (radius <= ring_mid)])
        freq_ratio = mid_energy / (low_energy + 1e-9)
        # Tighter thresholds when the face-confidence hint fc is high.
        thr_low = 0.18 if fc > 0.45 else 0.25
        thr_high = 0.85 if fc > 0.45 else 0.72
        if freq_ratio < thr_low:
            smooth_score = 0.70
        elif freq_ratio > thr_high:
            smooth_score = 0.55
        else:
            smooth_score = 0.20
        report["details"].append(f"Ratio freq. {freq_ratio:.3f}" + (" β†’ sur-lissage IA" if freq_ratio < thr_low else " βœ“"))
        # Isolated bright spectral peaks away from DC are a GAN artefact.
        peak_ratio = np.sum((spectrum * (radius > 5)) > (np.mean(spectrum) + 5 * np.std(spectrum))) / (height * width)
        if peak_ratio > 0.003:
            peak_score = 0.85
        elif peak_ratio > 0.001:
            peak_score = 0.50
        else:
            peak_score = 0.15
        report["details"].append(f"Pics GAN: {peak_ratio:.4f}" + (" ⚠️" if peak_ratio > 0.003 else " βœ“"))
        report["score"] = float(0.55 * smooth_score + 0.45 * peak_score)
    except Exception as e:
        report["details"].append(f"Erreur FFT: {str(e)[:60]}")
    return report
def _analyze_texture(img: Image.Image, fc: float = 0.0) -> dict:
    """
    Texture forensics: noise level, block-wise contrast uniformity and a
    bright/flat "studio background" heuristic. *fc* loosens or tightens the
    thresholds (high values presumably mean a face crop — TODO confirm).
    Returns {"score": 0..1 fake-likelihood, "details": [diagnostics]}.
    """
    result = {"score": 0.50, "details": []}
    try:
        arr = np.array(img).astype(np.float32)
        gray = np.array(img.convert("L")).astype(np.float32)
        lap = np.array(img.convert("L").filter(ImageFilter.FIND_EDGES)).astype(np.float32)
        nl = float(np.std(lap))
        # BUG FIX: grayscale inputs yield a 2-D array, so the previous
        # unconditional arr.shape[2] raised IndexError and aborted the whole
        # layer at neutral 0.50. Guard on ndim so mono images are analyzed.
        if arr.ndim == 3 and arr.shape[2] >= 3:
            r, g, b = arr[:, :, 0], arr[:, :, 1], arr[:, :, 2]
            # Pixel-identical channels = grayscale rendered into RGB, a
            # strong synthetic-pipeline tell.
            if float(np.mean(np.abs(r - g) < 1)) > 0.98 and float(np.mean(np.abs(g - b) < 1)) > 0.98:
                result["score"] = 0.85
                result["details"].append("Canaux RGB identiques β†’ image IA synthΓ©tique")
                return result
        # Noise score: both suspiciously clean and suspiciously noisy flag.
        ts, tm = (5.0, 14.0) if fc > 0.45 else (8.0, 20.0)
        ns = 0.75 if nl > 20.0 else (0.72 if nl < ts else (0.42 if nl < tm else 0.15))
        result["details"].append(f"Bruit: {nl:.1f}")
        # Local-contrast uniformity over 32x32 blocks; very even texture
        # statistics are typical of diffusion output.
        h, w, bl = gray.shape[0], gray.shape[1], 32
        stds = [np.std(gray[y:y + bl, x:x + bl]) for y in range(0, h - bl, bl) for x in range(0, w - bl, bl)]
        u = np.std(stds) / (np.mean(stds) + 1e-9) if stds else 0.5
        ul, uh = (0.20, 0.50) if fc > 0.45 else (0.30, 0.60)
        us = 0.72 if u < ul else (0.38 if u < uh else 0.15)
        result["details"].append(f"UniformitΓ©: {u:.3f}")
        # Studio-background heuristic: mostly bright pixels + flat top border.
        bg_ratio = float(np.mean(gray > 200))
        border_std = float(np.std(gray[:h // 8, :]))
        if bg_ratio > 0.50 and border_std < 6.0:
            studio_score = 0.88
        elif bg_ratio > 0.50 and border_std < 15.0:
            studio_score = 0.82
        elif bg_ratio > 0.35 and border_std < 25.0:
            studio_score = 0.55
        else:
            studio_score = 0.10
        result["details"].append(f"Fond: {bg_ratio:.0%}")
        result["score"] = float(0.35 * ns + 0.25 * us + 0.40 * studio_score)
    except Exception as e:
        # Best-effort layer: report the error and stay neutral.
        result["details"].append(f"Erreur texture: {str(e)[:60]}")
    return result
def _analyze_color(img: Image.Image) -> dict:
    """Color-statistics forensics: per-channel entropy balance and tonal clipping."""
    report = {"score": 0.50, "details": []}
    try:
        rgb = np.array(img.convert("RGB")).astype(np.float32)
        red = rgb[:, :, 0].flatten()
        green = rgb[:, :, 1].flatten()
        blue = rgb[:, :, 2].flatten()

        def _entropy(channel):
            # Shannon entropy of a 64-bin normalized histogram.
            density, _ = np.histogram(channel, bins=64, range=(0, 255), density=True)
            density = density[density > 0]
            return float(-np.sum(density * np.log2(density + 1e-9)))

        ent_r, ent_g, ent_b = _entropy(red), _entropy(green), _entropy(blue)
        mean_entropy = (ent_r + ent_g + ent_b) / 3.0
        entropy_std = float(np.std([ent_r, ent_g, ent_b]))
        # High, near-identical entropy across all channels is a synthesis cue.
        if mean_entropy > 5.2 and entropy_std < 0.15:
            ent_score = 0.72
        elif mean_entropy > 4.8 and entropy_std < 0.25:
            ent_score = 0.45
        else:
            ent_score = 0.20
        report["details"].append(f"Entropie couleur: {mean_entropy:.2f}")
        # Real photos usually clip some shadows/highlights; a near-total
        # absence of extreme luminance pixels is suspicious.
        luminance = 0.299 * red + 0.587 * green + 0.114 * blue
        extreme_ratio = float(np.mean((luminance < 8) | (luminance > 247)))
        if extreme_ratio < 0.005:
            clip_score = 0.65
        elif extreme_ratio < 0.02:
            clip_score = 0.35
        else:
            clip_score = 0.15
        report["details"].append(f"Pixels extrΓͺmes: {extreme_ratio:.4f}")
        report["score"] = float(0.60 * ent_score + 0.40 * clip_score)
    except Exception as e:
        report["details"].append(f"Erreur palette: {str(e)[:60]}")
    return report
# ── Fusion ─────────────────────────────────────────────────────────────────────
def _fuse(ensemble_score: float, exif_r: dict, fft_r: dict, tex_r: dict, color_r: dict) -> dict:
exif_absent = exif_r.get("exif_absent", False)
if exif_r.get("suspicious_software"):
profile = "EXIF_IA_DETECTE"
w = {"ensemble": 0.20, "exif": 0.60, "fft": 0.12, "texture": 0.05, "color": 0.03}
elif not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.20:
profile = "EXIF_FIABLE"
w = {"ensemble": 0.45, "exif": 0.32, "fft": 0.12, "texture": 0.07, "color": 0.04}
elif exif_absent:
profile = "EXIF_ABSENT"
w = {"ensemble": 0.52, "exif": 0.00, "fft": 0.24, "texture": 0.14, "color": 0.10}
else:
profile = "STANDARD"
w = {"ensemble": 0.48, "exif": 0.22, "fft": 0.16, "texture": 0.09, "color": 0.05}
scores = {
"ensemble": ensemble_score,
"exif": exif_r["score"],
"fft": fft_r["score"],
"texture": tex_r["score"],
"color": color_r["score"],
}
raw = sum(w[k] * scores[k] for k in w)
# Anti-false-positive guardrails
if ensemble_score < 0.35 and fft_r["score"] < 0.38:
raw = min(raw, 0.46)
if not exif_absent and exif_r["has_camera_info"] and exif_r["score"] < 0.15:
raw = min(raw, 0.82)
if exif_r.get("suspicious_software") and raw < 0.85:
raw = max(raw, 0.90)
# High-confidence ensemble override: modern diffusion models evade forensic layers;
# when all ML models agree strongly, trust them over FFT/texture/color heuristics.
if ensemble_score >= 0.80 and not exif_r.get("has_camera_info"):
raw = max(raw, ensemble_score * 0.90)
if ensemble_score <= 0.20:
raw = min(raw, ensemble_score * 1.10 + 0.05)
return {
"fake_prob": round(raw, 4),
"real_prob": round(1.0 - raw, 4),
"layer_scores": {k: round(v, 4) for k, v in scores.items()},
"weights_used": {k: round(v, 2) for k, v in w.items()},
"fusion_profile": profile,
"ai_source": exif_r.get("ai_source"),
}
# ── Verdict ────────────────────────────────────────────────────────────────────
def _verdict(fake_prob: float, details: dict) -> dict:
if fake_prob > 0.65:
verdict = "DEEPFAKE"
confidence = "haute" if fake_prob > 0.85 else "moyenne"
reason = "Artefacts de synthèse détectés."
elif fake_prob < 0.35:
verdict = "AUTHENTIQUE"
confidence = "haute" if fake_prob < 0.15 else "moyenne"
reason = "Aucun artefact de synthèse détecté."
else:
verdict = "INDÉTERMINÉ"
confidence = "faible"
reason = "Signal ambigu, analyse non concluante."
if details.get("ai_source"):
reason = f"Source IA identifiΓ©e dans les mΓ©tadonnΓ©es: {details['ai_source']}."
return {"verdict": verdict, "confidence": confidence, "reason": reason}
# ── Public API ─────────────────────────────────────────────────────────────────
def run(img: Image.Image, image_bytes: bytes) -> dict:
    """Full analysis: 3-model ensemble plus every forensic layer."""
    ensemble = _run_ensemble(img, IMAGE_ENSEMBLE)
    # Arguments evaluate left-to-right, preserving the original layer order.
    fusion = _fuse(
        ensemble["ensemble_score"],
        _analyze_exif(image_bytes),
        _analyze_fft(img),
        _analyze_texture(img),
        _analyze_color(img),
    )
    final = _verdict(fusion["fake_prob"], fusion)
    return {**final, **fusion, "models": ensemble["models"]}
def run_fast(img: Image.Image, image_bytes: bytes) -> dict:
    """Fast analysis: 2-model ensemble + EXIF; pixel layers pinned to neutral 0.50."""
    ensemble = _run_ensemble(img, IMAGE_FAST_ENSEMBLE)
    exif_report = _analyze_exif(image_bytes)
    # Distinct neutral placeholders (no shared state between layers).
    stub_fft = {"score": 0.50, "details": []}
    stub_texture = {"score": 0.50, "details": []}
    stub_color = {"score": 0.50, "details": []}
    fusion = _fuse(ensemble["ensemble_score"], exif_report, stub_fft, stub_texture, stub_color)
    final = _verdict(fusion["fake_prob"], fusion)
    return {**final, **fusion, "models": ensemble["models"]}