Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import io | |
| from typing import List | |
| import numpy as np | |
| from loguru import logger | |
| from PIL import Image | |
| from schemas.common import ArtifactIndicator | |
| def _severity_from_score(score: float) -> str: | |
| if score >= 0.7: | |
| return "high" | |
| if score >= 0.4: | |
| return "medium" | |
| return "low" | |
| # ---------- 1. GAN high-frequency signature (FFT) ---------- | |
| def detect_gan_hf_artifact(pil_img: Image.Image) -> ArtifactIndicator | None: | |
| """Compute high-frequency energy ratio on the luminance channel. | |
| Real photos typically follow a ~1/f spectrum; many GAN outputs show | |
| elevated HF energy or spectral peaks. | |
| """ | |
| try: | |
| gray = np.asarray(pil_img.convert("L"), dtype=np.float32) | |
| # downsample for speed | |
| if max(gray.shape) > 512: | |
| import cv2 | |
| scale = 512 / max(gray.shape) | |
| gray = cv2.resize(gray, (int(gray.shape[1] * scale), int(gray.shape[0] * scale))) | |
| fft = np.fft.fftshift(np.fft.fft2(gray)) | |
| mag = np.abs(fft) | |
| h, w = mag.shape | |
| cy, cx = h // 2, w // 2 | |
| y, x = np.ogrid[:h, :w] | |
| r = np.sqrt((x - cx) ** 2 + (y - cy) ** 2) | |
| r_max = np.sqrt(cx * cx + cy * cy) | |
| hf_mask = r > (0.5 * r_max) | |
| total = float(mag.sum() + 1e-9) | |
| hf = float(mag[hf_mask].sum()) | |
| ratio = hf / total | |
| # Passport/ID portraits often have strong fine detail from hair, fabric, | |
| # sharpening, and JPEG ringing. Treat this as a weak forensic signal | |
| # unless it is extreme; the classifier ensemble remains authoritative. | |
| score = max(0.0, min(1.0, (ratio - 0.24) / 0.28)) | |
| sev = _severity_from_score(score) | |
| return ArtifactIndicator( | |
| type="gan_artifact", | |
| severity=sev, | |
| description=( | |
| "Unnatural pixel noise detected (common in AI-generated images)" if score > 0.4 | |
| else "Pixel noise is within expected range for a natural photo" | |
| ), | |
| confidence=float(score), | |
| ) | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"GAN HF detection failed: {e}") | |
| return None | |
| # ---------- 2. JPEG quantization table anomaly ---------- | |
| _STANDARD_Q_SUMS = { # rough heuristic: camera JPEGs fall in these ranges | |
| 50: (1500, 4500), | |
| 75: (600, 2500), | |
| 90: (200, 1000), | |
| 95: (100, 600), | |
| } | |
| def detect_compression_anomaly(raw_bytes: bytes) -> ArtifactIndicator | None: | |
| """Inspect JPEG quantization tables. Missing tables, non-standard layouts, | |
| or re-saved tables often indicate manipulation or re-encoding. | |
| """ | |
| try: | |
| img = Image.open(io.BytesIO(raw_bytes)) | |
| if img.format != "JPEG": | |
| return ArtifactIndicator( | |
| type="compression", | |
| severity="low", | |
| description=f"Non-JPEG format ({img.format}); compression signature not available", | |
| confidence=0.1, | |
| ) | |
| q = getattr(img, "quantization", None) | |
| if not q: | |
| return ArtifactIndicator( | |
| type="compression", | |
| severity="low", | |
| description="No JPEG quantization tables readable", | |
| confidence=0.2, | |
| ) | |
| tables = list(q.values()) | |
| sums = [int(sum(t)) for t in tables] | |
| num_tables = len(tables) | |
| # Heuristics: very low sum → very high quality (possibly re-saved); | |
| # non-standard number of tables; extreme values. | |
| suspicious = 0.0 | |
| reasons: list[str] = [] | |
| if num_tables not in (1, 2): | |
| suspicious += 0.4 | |
| reasons.append(f"unusual table count ({num_tables})") | |
| if any(s < 60 for s in sums): | |
| suspicious += 0.3 | |
| reasons.append("very low quantization sums (possible re-encoding)") | |
| if any(s > 8000 for s in sums): | |
| suspicious += 0.2 | |
| reasons.append("very high quantization sums") | |
| score = max(0.0, min(1.0, suspicious)) | |
| sev = _severity_from_score(score) | |
| desc = ( | |
| "Signs of repeated image compression or manipulation detected" if reasons else "No unusual compression artifacts detected" | |
| ) | |
| return ArtifactIndicator( | |
| type="compression", | |
| severity=sev, | |
| description=desc, | |
| confidence=float(score), | |
| ) | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"Compression anomaly detection failed: {e}") | |
| return None | |
| # ---------- 3. Facial boundary + 4. Lighting (MediaPipe) ---------- | |
| def detect_face_based_artifacts(pil_img: Image.Image) -> List[ArtifactIndicator]: | |
| """If a face is detected, analyze jaw boundary variance and per-quadrant | |
| luminance balance. Returns 0, 1, or 2 indicators. | |
| """ | |
| results: List[ArtifactIndicator] = [] | |
| try: | |
| from models.model_loader import get_model_loader | |
| detector = get_model_loader().load_face_detector() | |
| if detector is None: | |
| return results | |
| rgb = np.asarray(pil_img.convert("RGB")) | |
| h, w = rgb.shape[:2] | |
| mp_result = detector.process(rgb) | |
| if not mp_result.multi_face_landmarks: | |
| return results | |
| landmarks = mp_result.multi_face_landmarks[0].landmark | |
| # ----- Jaw boundary jitter ----- | |
| # FaceMesh jaw/oval landmark indices (approximate face contour) | |
| JAW_IDX = [ | |
| 10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, | |
| 288, 397, 365, 379, 378, 400, 377, 152, 148, 176, 149, | |
| 150, 136, 172, 58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109, | |
| ] | |
| pts = np.array([(landmarks[i].x * w, landmarks[i].y * h) for i in JAW_IDX]) | |
| # Second-difference magnitude = local curvature jitter | |
| diffs = np.diff(pts, axis=0) | |
| seconds = np.diff(diffs, axis=0) | |
| jitter = float(np.linalg.norm(seconds, axis=1).mean()) / max(w, h) | |
| jitter_score = max(0.0, min(1.0, (jitter - 0.003) / 0.010)) | |
| results.append( | |
| ArtifactIndicator( | |
| type="facial_boundary", | |
| severity=_severity_from_score(jitter_score), | |
| description=( | |
| "Inconsistent blending detected around the facial boundary" if jitter_score > 0.4 | |
| else "Facial boundary blending appears smooth and natural" | |
| ), | |
| confidence=float(jitter_score), | |
| ) | |
| ) | |
| # ----- Lighting inconsistency (per-quadrant luminance) ----- | |
| xs = np.array([lm.x * w for lm in landmarks]) | |
| ys = np.array([lm.y * h for lm in landmarks]) | |
| x0, x1 = int(max(0, xs.min())), int(min(w, xs.max())) | |
| y0, y1 = int(max(0, ys.min())), int(min(h, ys.max())) | |
| if x1 > x0 + 4 and y1 > y0 + 4: | |
| face_crop = rgb[y0:y1, x0:x1] | |
| gray = 0.299 * face_crop[..., 0] + 0.587 * face_crop[..., 1] + 0.114 * face_crop[..., 2] | |
| hh, ww = gray.shape | |
| quads = [ | |
| gray[: hh // 2, : ww // 2], | |
| gray[: hh // 2, ww // 2 :], | |
| gray[hh // 2 :, : ww // 2], | |
| gray[hh // 2 :, ww // 2 :], | |
| ] | |
| means = np.array([q.mean() for q in quads if q.size > 0]) | |
| if means.size == 4 and means.mean() > 1e-3: | |
| imbalance = float(means.std() / means.mean()) | |
| lighting_score = max(0.0, min(1.0, (imbalance - 0.08) / 0.20)) | |
| results.append( | |
| ArtifactIndicator( | |
| type="lighting", | |
| severity=_severity_from_score(lighting_score), | |
| description=( | |
| "Unnatural or inconsistent lighting detected across the face" if lighting_score > 0.4 | |
| else "Facial lighting appears natural and uniform" | |
| ), | |
| confidence=float(lighting_score), | |
| ) | |
| ) | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"Face-based artifact detection failed: {e}") | |
| return results | |
| # ---------- Orchestrator ---------- | |
| def scan_artifacts(pil_img: Image.Image, raw_bytes: bytes) -> List[ArtifactIndicator]: | |
| indicators: List[ArtifactIndicator] = [] | |
| for fn in ( | |
| lambda: detect_gan_hf_artifact(pil_img), | |
| lambda: detect_compression_anomaly(raw_bytes), | |
| ): | |
| ind = fn() | |
| if ind is not None: | |
| indicators.append(ind) | |
| indicators.extend(detect_face_based_artifacts(pil_img)) | |
| return indicators | |