Spaces:
Sleeping
Sleeping
| import io | |
| import logging | |
| from typing import Optional | |
| import numpy as np | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Cache slot for the classification pipeline; _load_pipeline() assigns it
# after a successful load and returns it directly on later calls.
_pipe = None
def _load_pipeline():
    """Return the cached image-classification pipeline, loading it on first use.

    The pipeline is stored in the module-level ``_pipe`` so the model is only
    loaded once.  If loading fails (``transformers`` missing, model cannot be
    fetched, ...), the failure is remembered on the function object and later
    calls return ``None`` immediately instead of repeating the import/model
    load and re-logging the warning on every call.

    Returns:
        The loaded pipeline object, or ``None`` when the model is unavailable
        and callers should use their heuristic fallback.

    To force a fresh attempt after a failure, set
    ``_load_pipeline._load_failed = False``.
    """
    global _pipe
    if _pipe is not None:
        return _pipe
    # A previous attempt already failed: skip the (potentially slow) retry.
    if getattr(_load_pipeline, "_load_failed", False):
        return None
    try:
        # Imported lazily so the module can be imported without transformers.
        from transformers import pipeline

        _pipe = pipeline(
            "image-classification",
            model="dima806/deepfake_vs_real_image_detection",
            device=-1,  # -1 selects CPU in the transformers pipeline API
        )
        logger.info("Deepfake detection model loaded.")
    except Exception as e:
        logger.warning(f"Could not load deepfake model: {e}. Using heuristic fallback.")
        _pipe = None
        _load_pipeline._load_failed = True
    return _pipe
def _make_synthetic_image(is_fake: bool):
    """Build a deterministic 224x224 RGB test image.

    When ``is_fake`` is true the image is a smooth sinusoidal pattern, with a
    different frequency mix per channel.  Otherwise it is a top-to-bottom
    brightness gradient with seeded Gaussian noise and a fixed per-channel
    offset (+20 red, 0 green, -15 blue).

    Returns:
        A ``PIL.Image.Image`` in ``"RGB"`` mode.
    """
    from PIL import Image

    side = 224
    rng = np.random.default_rng(seed=1 if is_fake else 99)

    def clamp(value):
        # Keep a channel value inside the valid 0..255 range.
        return max(0, min(255, value))

    canvas = Image.new("RGB", (side, side))
    px = canvas.load()
    for row in range(side):
        for col in range(side):
            if is_fake:
                red = int(128 + 60 * np.sin(row / 9.0) * np.cos(col / 9.0))
                green = int(128 + 60 * np.cos(row / 7.0) * np.sin(col / 11.0))
                blue = int(128 + 40 * np.sin((row + col) / 14.0))
            else:
                level = int(80 + 100 * (row / side))
                # One noise draw per pixel, shared by all three channels.
                jitter = int(rng.normal(0, 12))
                red = level + jitter + 20
                green = level + jitter
                blue = level + jitter - 15
            # Pixel access is indexed (x, y), i.e. (column, row).
            px[col, row] = (clamp(red), clamp(green), clamp(blue))
    return canvas
def score_deepfake(is_fake: bool) -> float:
    """Return a "probability of fake" score for a synthetic sample.

    If the classification pipeline is unavailable, a fixed heuristic value is
    returned (0.78 for fake, 0.22 for real).  Otherwise a synthetic image is
    generated for ``is_fake`` and classified:

    * the score of the first prediction whose label looks like a "fake" class
      is returned as-is;
    * failing that, the first prediction is used, inverted (``1 - score``) when
      its label looks like a "real" class;
    * any exception during scoring is logged and a fixed fallback is returned
      (0.75 for fake, 0.25 for real).
    """
    classifier = _load_pipeline()
    if classifier is None:
        return 0.78 if is_fake else 0.22

    fake_markers = ("fake", "deepfake", "manipulat", "ai_gen", "synthetic")
    real_markers = ("real", "authentic", "genuine")
    try:
        predictions = classifier(_make_synthetic_image(is_fake))

        # First prediction whose label names a "fake"-style class, if any.
        fake_hit = next(
            (
                pred
                for pred in predictions
                if any(marker in pred["label"].lower() for marker in fake_markers)
            ),
            None,
        )
        if fake_hit is not None:
            return float(fake_hit["score"])

        # No explicit fake label: fall back to the first prediction.
        best = predictions[0]
        best_label = best["label"].lower()
        best_score = float(best["score"])
        if any(marker in best_label for marker in real_markers):
            return 1.0 - best_score
        return best_score
    except Exception as err:
        logger.warning(f"Deepfake scoring error: {err}")
        return 0.75 if is_fake else 0.25
def precompute_detector_scores(items: list) -> list:
    """Return shallow copies of *items* with a ``detector_score`` key added.

    Each item is expected to be a mapping; its label is read from
    ``item["ground_truth"]["is_deepfake"]`` and defaults to ``False`` when the
    key is missing.  The input items are not mutated.

    Args:
        items: Iterable of dict-like items.

    Returns:
        A new list of ``dict`` copies, each with ``"detector_score"`` set to
        the value of ``score_deepfake`` for that item's label.
    """
    # score_deepfake only uses the truthiness of its argument, so compute it
    # once per label instead of regenerating the image and re-running the
    # model for every item.
    scores_by_label = {}
    enriched = []
    for item in items:
        # `or {}` also covers an explicit ``"ground_truth": None``, which
        # ``.get("ground_truth", {})`` would pass through and crash on.
        ground_truth = item.get("ground_truth") or {}
        is_fake = bool(ground_truth.get("is_deepfake", False))
        if is_fake not in scores_by_label:
            scores_by_label[is_fake] = score_deepfake(is_fake)
        scored = dict(item)
        scored["detector_score"] = scores_by_label[is_fake]
        enriched.append(scored)
    return enriched