CLAUDE.md — GenAI-DeepDetect Agent Instructions

Read this file before touching any code. It is the single source of truth for how this repo is structured, what conventions to follow, and what the hard constraints are.

All models are 100% pre-trained — no training required, no GPU needed locally.


MCP Tools — Always Use These First

Before writing any code or looking up any API, resolve docs through MCP:

context7: resolve-library-id + query-docs
  → use for: transformers, torch, mediapipe, fastapi, torch-geometric,
    google-generativeai, facenet-pytorch, opencv, next.js, runpod

huggingface: model_search + model_details + hf_doc_search
  → use for: finding model cards, checking input formats, confirming
    pipeline task names, verifying checkpoint sizes before using

Rule: Never guess an API signature. Always call context7.query-docs first. Never use a HF model without calling huggingface.model_details to confirm it exists, check its license, and verify its input format.


Project Skill And Memory Policy

For work in this repository, prefer the installed Claude Code skill pack over ad hoc workflows whenever a relevant skill applies.

  • Always-on user preference: use Awesome Claude Code workflows with Superpowers + Claude Mem by default, and execute implementation steps automatically unless the user explicitly asks for planning-only mode.

  • At task start, check Superpowers process skills first (for example: using-superpowers, brainstorming, systematic-debugging, verification-before-completion) and apply the relevant ones before coding.

  • For memory-aware tasks, use Claude Mem (mem-search) automatically to recall prior decisions, fixes, and session history when that context can reduce risk or rework.

  • If there is a conflict between this default behavior and a direct user instruction in the current chat, follow the direct user instruction.

  • Use context7-mcp for any library, framework, SDK, or API question, and before changing code that depends on external packages or hosted services.

  • Use mem-search / claude-mem whenever the user asks about previous sessions, prior fixes, earlier decisions, or "how we solved this before".

  • When using claude-mem, scope searches to project name genai-deepdetect unless the user explicitly asks for a broader search.

  • Keep following the repo-specific MCP rules below even when a general-purpose skill also applies.

Recommended companion skills for this project:

  • systematic-debugging for bugs, failing tests, or unexpected runtime behavior
  • verification-before-completion before claiming a fix is done
  • security-review for secrets, external APIs, uploads, and auth-sensitive changes

Project Goal

Multimodal deepfake and AI-generated content detector.

  • Input: image (JPEG/PNG/WEBP) or video (MP4/MOV/AVI, max 100MB)
  • Output: DetectionResponse — verdict, confidence, generator attribution, natural-language explanation, per-engine breakdown

All inference runs on pre-trained HuggingFace checkpoints. No training scripts need to run for the system to work.
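For reference, a successful DetectionResponse serializes to roughly this shape. The values below are made up; the field set mirrors the models defined in src/types.py.

```python
# Illustrative payload; fields mirror DetectionResponse / EngineResult
# in src/types.py (all values are fabricated for the example).
example_response = {
    "verdict": "FAKE",
    "confidence": 0.91,
    "attributed_generator": "stable_diffusion",
    "explanation": "Strong diffusion fingerprint; landmark jitter across frames.",
    "processing_time_ms": 2140.5,
    "engine_breakdown": [
        {
            "engine": "fingerprint",
            "verdict": "FAKE",
            "confidence": 0.93,
            "attributed_generator": "stable_diffusion",
            "explanation": "Binary score 0.93; attributed to stable_diffusion.",
            "processing_time_ms": 2140.5,
        },
    ],
}
```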


Architecture

Request (image/video)
        │
        ▼
  FastAPI  src/api/main.py
        │
        ├── FingerprintEngine   (image artifacts, generator attribution)
        ├── CoherenceEngine     (lip-sync, biological coherence)
        └── SSTGNNEngine        (landmark spatio-temporal graph)
                │
                ▼
          Fuser  src/fusion/fuser.py
                │
                ▼
        Explainer  src/explainability/explainer.py   ← Gemini API
                │
                ▼
        DetectionResponse  src/types.py

All Pre-Trained Models

Every model downloads via transformers.pipeline() or from_pretrained(). Zero training. Zero fine-tuning.

| Engine      | Model               | HF ID                                    | Size   | Task                   |
|-------------|---------------------|------------------------------------------|--------|------------------------|
| Fingerprint | SDXL Detector       | Organika/sdxl-detector                   | ~330MB | binary fake/real       |
| Fingerprint | CLIP ViT-L/14       | openai/clip-vit-large-patch14            | ~3.5GB | generator attribution  |
| Fingerprint | AI Image Detector   | haywoodsloan/ai-image-detector-deploy    | ~90MB  | ensemble backup        |
| SSTGNN      | DeepFake Detector   | dima806/deepfake_vs_real_image_detection | ~100MB | ResNet50 per-frame     |
| SSTGNN      | Deep Fake Detector  | prithivMLmods/Deep-Fake-Detector-Model   | ~80MB  | EfficientNet-B4 backup |
| Coherence   | MediaPipe Face Mesh | bundled in mediapipe package             | ~10MB  | landmark extraction    |
| Coherence   | FaceNet VGGFace2    | facenet-pytorch (auto-downloads)         | ~100MB | temporal embeddings    |
| Coherence   | SyncNet             | Junhua-Zhu/SyncNet                       | ~50MB  | lip-sync offset        |

CLIP is the largest at 3.5GB — preload it at startup and never reload it. Everything else fits within the 16GB RAM of the HF Spaces free tier.


Environment Variables

# Required
GEMINI_API_KEY=...                  # Google AI Studio — free tier works
HF_TOKEN=hf_...                     # HuggingFace read token (free)

# Hosting
RUNPOD_API_KEY=...                  # RunPod serverless (heavy video)
RUNPOD_ENDPOINT_ID=...              # your deployed endpoint ID

# Paths
MODEL_CACHE_DIR=/data/models        # HF Spaces: /data/models (persists)
                                    # local dev: /tmp/models

# Optional
MAX_VIDEO_FRAMES=300
MAX_VIDEO_SIZE_MB=100
INFERENCE_BACKEND=local             # "local" | "runpod"
TOKENIZERS_PARALLELISM=false

Set all secrets in:

  • HF Spaces → Settings → Repository secrets
  • RunPod → Secrets tab
  • Vercel β†’ Environment Variables
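INFERENCE_BACKEND switches between local and RunPod execution. A minimal sketch of reading it defensively (the helper name is illustrative, not part of the codebase):

```python
import os

def get_backend() -> str:
    # Defensive read of INFERENCE_BACKEND: "local" | "runpod",
    # falling back to "local" when unset or set to an invalid value.
    backend = os.environ.get("INFERENCE_BACKEND", "local").lower()
    return backend if backend in {"local", "runpod"} else "local"
```

Falling back to "local" on an unrecognized value keeps a typo in the Space's secrets from taking the whole API down.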

Gemini API — Explainability Engine

Primary model: gemini-2.5-pro-preview-03-25. Fallback model: gemini-1.5-pro-002.

Both available on Google AI Studio free tier (15 req/min, 1M tokens/day). Always query context7.query-docs google-generativeai GenerativeModel before modifying this file.

src/explainability/explainer.py

import os
import logging
import google.generativeai as genai
from src.types import EngineResult

logger = logging.getLogger(__name__)

genai.configure(api_key=os.environ["GEMINI_API_KEY"])

SYSTEM_INSTRUCTION = (
    "You are a deepfake forensics analyst writing reports for security professionals. "
    "Given detection engine outputs, write exactly 2-3 sentences in plain English "
    "explaining why the content is real or fake. "
    "Be specific — name the strongest signals. "
    "Use direct declarative sentences. No hedging. No 'I think'. "
    "Output only the explanation text, nothing else."
)

_model = None


def _get_model() -> genai.GenerativeModel | None:
    # Note: GenerativeModel() does not validate the model name against the API;
    # a bad name typically only fails at generate_content() time, which
    # explain() handles with its template fallback below.
    global _model
    if _model is None:
        for name in ("gemini-2.5-pro-preview-03-25", "gemini-1.5-pro-002"):
            try:
                _model = genai.GenerativeModel(
                    model_name=name,
                    system_instruction=SYSTEM_INSTRUCTION,
                )
                logger.info(f"Gemini model loaded: {name}")
                break
            except Exception as e:
                logger.warning(f"Gemini {name} unavailable: {e}")
    return _model


def explain(
    verdict: str,
    confidence: float,
    engine_results: list[EngineResult],
    generator: str,
) -> str:
    breakdown = "\n".join(
        f"- {r.engine}: {r.verdict} ({r.confidence:.0%}) — {r.explanation}"
        for r in engine_results
    )
    prompt = (
        f"Verdict: {verdict} ({confidence:.0%} confidence)\n"
        f"Attributed generator: {generator}\n"
        f"Engine breakdown:\n{breakdown}\n\n"
        "Write the forensics explanation."
    )
    try:
        model = _get_model()
        if model is None:
            raise RuntimeError("No Gemini model available")
        response = model.generate_content(prompt)
        return response.text.strip()
    except Exception as e:
        logger.error(f"Gemini explain failed: {e}")
        top = engine_results[0] if engine_results else None
        return (
            f"Content classified as {verdict} with {confidence:.0%} confidence. "
            f"{'Primary signal from ' + top.engine + ' engine.' if top else ''}"
        )
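When both Gemini models fail, explain() returns a deterministic template instead of raising. A standalone sketch of that fallback path, using a stand-in dataclass in place of src.types.EngineResult:

```python
from dataclasses import dataclass

@dataclass
class EngineResult:  # stand-in for src.types.EngineResult
    engine: str
    verdict: str
    confidence: float
    explanation: str = ""

def fallback_explanation(verdict, confidence, engine_results):
    # mirrors the except-branch of explain()
    top = engine_results[0] if engine_results else None
    return (
        f"Content classified as {verdict} with {confidence:.0%} confidence. "
        f"{'Primary signal from ' + top.engine + ' engine.' if top else ''}"
    )

print(fallback_explanation("FAKE", 0.91,
                           [EngineResult("fingerprint", "FAKE", 0.91)]))
# Content classified as FAKE with 91% confidence. Primary signal from fingerprint engine.
```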

Engine Implementations

FingerprintEngine — src/engines/fingerprint/engine.py

Query context7 for transformers pipeline image-classification and huggingface model_details Organika/sdxl-detector before modifying.

import os, logging, threading
import numpy as np
from PIL import Image
from transformers import pipeline, CLIPModel, CLIPProcessor
import torch
from src.types import EngineResult

logger = logging.getLogger(__name__)
CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")

GENERATOR_PROMPTS = {
    "real":             "a real photograph taken by a camera with natural lighting",
    "unknown_gan":      "a GAN-generated image with checkerboard artifacts and blurry edges",
    "stable_diffusion": "a Stable Diffusion image with painterly soft textures",
    "midjourney":       "a Midjourney image with cinematic dramatic lighting and hyperdetail",
    "dall_e":           "a DALL-E image with clean illustration-style and smooth gradients",
    "flux":             "a FLUX model image with photorealistic precision and sharp detail",
    "firefly":          "an Adobe Firefly image with commercial stock-photo aesthetics",
    "imagen":           "a Google Imagen image with precise photorealistic rendering",
}

_lock = threading.Lock()
_detector = _clip_model = _clip_processor = _backup = None


def _load():
    global _detector, _clip_model, _clip_processor, _backup
    if _detector is not None:
        return
    logger.info("Loading fingerprint models...")
    _detector = pipeline("image-classification",
                         model="Organika/sdxl-detector", cache_dir=CACHE)
    _clip_model = CLIPModel.from_pretrained(
        "openai/clip-vit-large-patch14", cache_dir=CACHE)
    _clip_processor = CLIPProcessor.from_pretrained(
        "openai/clip-vit-large-patch14", cache_dir=CACHE)
    _clip_model.eval()
    try:
        _backup = pipeline("image-classification",
                           model="haywoodsloan/ai-image-detector-deploy",
                           cache_dir=CACHE)
    except Exception:
        logger.warning("Backup fingerprint detector unavailable")
    logger.info("Fingerprint models ready")


class FingerprintEngine:

    def _ensure(self):
        with _lock:
            _load()

    def run(self, image: Image.Image) -> EngineResult:
        self._ensure()
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Binary fake score
        FAKE_LABELS = {"artificial", "fake", "ai-generated", "generated"}
        try:
            preds = _detector(image)
            fake_score = max(
                (p["score"] for p in preds if p["label"].lower() in FAKE_LABELS),
                default=0.5,
            )
        except Exception as e:
            logger.warning(f"Primary detector error: {e}")
            fake_score = 0.5

        # Ensemble backup
        if _backup is not None:
            try:
                bp = _backup(image)
                bk = max((p["score"] for p in bp
                          if p["label"].lower() in FAKE_LABELS), default=0.5)
                fake_score = fake_score * 0.6 + bk * 0.4
            except Exception:
                pass

        # CLIP zero-shot generator attribution
        generator = "real"
        try:
            texts = list(GENERATOR_PROMPTS.values())
            inputs = _clip_processor(
                text=texts, images=image,
                return_tensors="pt", padding=True, truncation=True,
            )
            with torch.no_grad():
                logits = _clip_model(**inputs).logits_per_image[0]
            probs = logits.softmax(dim=0).numpy()
            generator = list(GENERATOR_PROMPTS.keys())[int(np.argmax(probs))]
        except Exception as e:
            logger.warning(f"CLIP attribution error: {e}")

        if fake_score > 0.65 and generator == "real":
            generator = "unknown_gan"

        return EngineResult(
            engine="fingerprint",
            verdict="FAKE" if fake_score > 0.5 else "REAL",
            confidence=float(fake_score),
            attributed_generator=generator,
            explanation=f"Binary score {fake_score:.2f}; attributed to {generator}.",
        )

    def run_video(self, frames: list[np.ndarray]) -> EngineResult:
        if not frames:
            return EngineResult(engine="fingerprint", verdict="UNKNOWN",
                                confidence=0.5, explanation="No frames.")
        keyframes = frames[::8] or [frames[0]]
        results = [self.run(Image.fromarray(f)) for f in keyframes]
        avg = float(np.mean([r.confidence for r in results]))
        gens = [r.attributed_generator for r in results]
        top_gen = max(set(gens), key=gens.count)
        return EngineResult(
            engine="fingerprint",
            verdict="FAKE" if avg > 0.5 else "REAL",
            confidence=avg,
            attributed_generator=top_gen,
            explanation=f"Keyframe average {avg:.2f} over {len(keyframes)} frames.",
        )
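run_video attributes the generator by majority vote over keyframe results; the vote in isolation:

```python
# Each keyframe contributes one attribution; the most common label wins
gens = ["midjourney", "midjourney", "real", "flux"]
top_gen = max(set(gens), key=gens.count)
print(top_gen)  # midjourney
```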

CoherenceEngine — src/engines/coherence/engine.py

Query context7.query-docs mediapipe face_mesh and context7.query-docs facenet-pytorch InceptionResnetV1 before modifying.

import logging, threading, cv2
import numpy as np
from PIL import Image
from facenet_pytorch import MTCNN, InceptionResnetV1
import mediapipe as mp
from src.types import EngineResult

logger = logging.getLogger(__name__)

_lock = threading.Lock()
_mtcnn = _resnet = _face_mesh = None


def _load():
    global _mtcnn, _resnet, _face_mesh
    if _mtcnn is not None:
        return
    logger.info("Loading coherence models...")
    _mtcnn   = MTCNN(keep_all=False, device="cpu")
    _resnet  = InceptionResnetV1(pretrained="vggface2").eval()
    _face_mesh = mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False, max_num_faces=1,
        refine_landmarks=True, min_detection_confidence=0.5,
    )
    logger.info("Coherence models ready")


class CoherenceEngine:

    def _ensure(self):
        with _lock:
            _load()

    def run(self, image: Image.Image) -> EngineResult:
        self._ensure()
        frame = np.array(image.convert("RGB"))
        score = self._image_score(frame)
        return EngineResult(
            engine="coherence",
            verdict="FAKE" if score > 0.5 else "REAL",
            confidence=float(score),
            explanation=f"Geometric coherence anomaly {score:.2f} (image mode).",
        )

    def _image_score(self, frame: np.ndarray) -> float:
        # Frames reach this engine already in RGB (PIL convert / _extract_frames),
        # so no BGR-to-RGB conversion is needed; converting here would swap channels.
        res = _face_mesh.process(frame)
        if not res.multi_face_landmarks:
            return 0.35  # no face detected

        lms = res.multi_face_landmarks[0].landmark
        h, w = frame.shape[:2]

        def pt(i):
            return np.array([lms[i].x * w, lms[i].y * h])

        # Eye width asymmetry — deepfakes often mismatched
        lew = np.linalg.norm(pt(33)  - pt(133))
        rew = np.linalg.norm(pt(362) - pt(263))
        eye_ratio = min(lew, rew) / (max(lew, rew) + 1e-9)
        eye_score = max(0.0, (0.85 - eye_ratio) / 0.3)

        # Ear symmetry from nose tip
        nose = pt(1)
        lr = min(np.linalg.norm(nose - pt(234)), np.linalg.norm(nose - pt(454)))
        rr = max(np.linalg.norm(nose - pt(234)), np.linalg.norm(nose - pt(454)))
        ear_score = max(0.0, (0.90 - lr / (rr + 1e-9)) / 0.2)

        return float(np.clip(eye_score * 0.5 + ear_score * 0.5, 0.0, 1.0))

    def run_video(self, frames: list[np.ndarray]) -> EngineResult:
        self._ensure()
        if len(frames) < 4:
            r = self.run(Image.fromarray(frames[0]))
            r.explanation = "Too few frames for temporal analysis."
            return r

        delta  = self._embedding_variance(frames)
        jerk   = self._landmark_jerk(frames)
        blink  = self._blink_anomaly(frames)
        score  = float(np.clip(delta * 0.45 + jerk * 0.35 + blink * 0.20, 0.0, 1.0))

        return EngineResult(
            engine="coherence",
            verdict="FAKE" if score > 0.5 else "REAL",
            confidence=score,
            explanation=(
                f"Embedding variance {delta:.2f}, "
                f"landmark jerk {jerk:.2f}, "
                f"blink anomaly {blink:.2f}."
            ),
        )

    def _embedding_variance(self, frames: list[np.ndarray]) -> float:
        import torch
        embeddings = []
        for frame in frames[::4]:
            try:
                face = _mtcnn(Image.fromarray(frame))
                if face is not None:
                    with torch.no_grad():
                        e = _resnet(face.unsqueeze(0)).numpy()[0]
                    embeddings.append(e)
            except Exception:
                continue
        if len(embeddings) < 2:
            return 0.5
        deltas = [np.linalg.norm(embeddings[i+1] - embeddings[i])
                  for i in range(len(embeddings)-1)]
        return float(np.clip(np.var(deltas) * 8, 0.0, 1.0))

    def _landmark_jerk(self, frames: list[np.ndarray]) -> float:
        positions = []
        for frame in frames[::2]:
            res = _face_mesh.process(frame)  # frames are already RGB
            if res.multi_face_landmarks:
                lm = res.multi_face_landmarks[0].landmark
                positions.append([lm[1].x, lm[1].y])
        if len(positions) < 4:
            return 0.3
        pos   = np.array(positions)
        jerk  = np.diff(pos, n=3, axis=0)
        return float(np.clip((np.mean(np.linalg.norm(jerk, axis=1)) - 0.002) / 0.008,
                             0.0, 1.0))

    def _blink_anomaly(self, frames: list[np.ndarray]) -> float:
        LEFT_EYE  = [33, 160, 158, 133, 153, 144]
        RIGHT_EYE = [362, 385, 387, 263, 373, 380]

        def ear(lms, idx, h, w):
            pts = [np.array([lms[i].x * w, lms[i].y * h]) for i in idx]
            a = np.linalg.norm(pts[1] - pts[5])
            b = np.linalg.norm(pts[2] - pts[4])
            c = np.linalg.norm(pts[0] - pts[3])
            return (a + b) / (2.0 * c + 1e-9)

        ears = []
        for frame in frames:
            res = _face_mesh.process(frame)  # frames are already RGB
            if res.multi_face_landmarks:
                lm = res.multi_face_landmarks[0].landmark
                h, w = frame.shape[:2]
                ears.append((ear(lm, LEFT_EYE, h, w) + ear(lm, RIGHT_EYE, h, w)) / 2)

        if len(ears) < 10:
            return 0.3
        arr    = np.array(ears)
        blinks = int(np.sum(np.diff((arr < 0.21).astype(int)) > 0))
        bpm    = blinks / (len(ears) / 25) * 60  # assumes ~25 fps effective rate
        if 8 <= bpm <= 25:
            return 0.15
        if bpm < 3 or bpm > 35:
            return 0.80
        return 0.45
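_blink_anomaly counts blinks as rising edges of the "EAR below 0.21" indicator. The counting step in isolation (the sample series is fabricated):

```python
import numpy as np

# Fabricated eye-aspect-ratio series: two dips below the 0.21 blink threshold
ears = [0.30] * 10 + [0.15] * 3 + [0.30] * 10 + [0.18] * 2 + [0.30] * 5
arr = np.array(ears)
# A blink = a rising edge of the "eye closed" indicator
blinks = int(np.sum(np.diff((arr < 0.21).astype(int)) > 0))
print(blinks)  # 2
```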

SSTGNNEngine — src/engines/sstgnn/engine.py

Query context7.query-docs torch-geometric GCNConv and huggingface model_details dima806/deepfake_vs_real_image_detection before modifying.

import logging, os, threading
import numpy as np
import cv2
from PIL import Image
from transformers import pipeline
import mediapipe as mp
from scipy.spatial import Delaunay
from src.types import EngineResult

logger = logging.getLogger(__name__)
CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")

_lock = threading.Lock()
_det1 = _det2 = _mesh = None


def _load():
    global _det1, _det2, _mesh
    if _det1 is not None:
        return
    logger.info("Loading SSTGNN models...")
    _det1 = pipeline("image-classification",
                     model="dima806/deepfake_vs_real_image_detection",
                     cache_dir=CACHE)
    try:
        _det2 = pipeline("image-classification",
                         model="prithivMLmods/Deep-Fake-Detector-Model",
                         cache_dir=CACHE)
    except Exception:
        logger.warning("SSTGNN backup detector unavailable")
    _mesh = mp.solutions.face_mesh.FaceMesh(
        static_image_mode=True, max_num_faces=1, refine_landmarks=True)
    logger.info("SSTGNN models ready")


def _fake_prob(preds: list[dict]) -> float:
    fake_kw = {"fake", "deepfake", "artificial", "generated", "ai"}
    return max(
        (p["score"] for p in preds
         if any(k in p["label"].lower() for k in fake_kw)),
        default=0.5,
    )


class SSTGNNEngine:

    def _ensure(self):
        with _lock:
            _load()

    def run(self, image: Image.Image) -> EngineResult:
        self._ensure()
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Collect (score, weight) pairs so normalization stays correct even
        # when only one of the two detectors succeeds.
        weighted = []
        try:
            weighted.append((_fake_prob(_det1(image)), 0.6))
        except Exception as e:
            logger.warning(f"SSTGNN det1 error: {e}")
        if _det2 is not None:
            try:
                weighted.append((_fake_prob(_det2(image)), 0.4))
            except Exception as e:
                logger.warning(f"SSTGNN det2 error: {e}")

        if not weighted:
            return EngineResult(engine="sstgnn", verdict="UNKNOWN",
                                confidence=0.5, explanation="All detectors failed.")

        cnn = sum(s * w for s, w in weighted) / sum(w for _, w in weighted)
        graph = self._geometry_score(np.array(image))
        final = float(np.clip(cnn * 0.7 + graph * 0.3, 0.0, 1.0))

        return EngineResult(
            engine="sstgnn",
            verdict="FAKE" if final > 0.5 else "REAL",
            confidence=final,
            explanation=f"CNN {cnn:.2f}, geometric graph anomaly {graph:.2f}.",
        )

    def _geometry_score(self, frame: np.ndarray) -> float:
        try:
            res = _mesh.process(frame)  # frame is already RGB (PIL-sourced)
            if not res.multi_face_landmarks:
                return 0.3
            h, w = frame.shape[:2]
            lms = res.multi_face_landmarks[0].landmark
            idxs = list(range(0, 468, 7))[:68]
            pts  = np.array([[lms[i].x * w, lms[i].y * h] for i in idxs])
            tri  = Delaunay(pts)
            areas = []
            for s in tri.simplices:
                a, b, c = pts[s]
                # 2-D cross product via determinant (np.cross on 2-D inputs
                # is deprecated as of NumPy 2.0)
                areas.append(abs((b - a)[0] * (c - a)[1] - (b - a)[1] * (c - a)[0]) / 2)
            areas = np.array(areas)
            cv_score = float(np.std(areas) / (np.mean(areas) + 1e-9))
            return float(np.clip((cv_score - 0.8) / 1.5, 0.0, 1.0))
        except Exception as e:
            logger.warning(f"Geometry score error: {e}")
            return 0.3

    def run_video(self, frames: list[np.ndarray]) -> EngineResult:
        self._ensure()
        if not frames:
            return EngineResult(engine="sstgnn", verdict="UNKNOWN",
                                confidence=0.5, explanation="No frames.")
        sample = frames[::6] or [frames[0]]
        results = [self.run(Image.fromarray(f)) for f in sample]
        avg = float(np.mean([r.confidence for r in results]))
        return EngineResult(
            engine="sstgnn",
            verdict="FAKE" if avg > 0.5 else "REAL",
            confidence=avg,
            explanation=f"Frame-sampled SSTGNN average {avg:.2f} over {len(sample)} frames.",
        )

Fusion — src/fusion/fuser.py

import numpy as np
from src.types import EngineResult

ENGINE_WEIGHTS = {
    "fingerprint": 0.45,
    "coherence":   0.35,
    "sstgnn":      0.20,
}

ENGINE_WEIGHTS_VIDEO = {
    "fingerprint": 0.30,
    "coherence":   0.50,
    "sstgnn":      0.20,
}

ATTRIBUTION_PRIORITY = {"fingerprint": 1, "sstgnn": 2, "coherence": 3}


def fuse(
    results: list[EngineResult],
    is_video: bool = False,
) -> tuple[str, float, str]:
    """Returns (verdict, confidence, attributed_generator)."""
    weights = ENGINE_WEIGHTS_VIDEO if is_video else ENGINE_WEIGHTS
    active  = [r for r in results if r.verdict != "UNKNOWN"]

    if not active:
        return "UNKNOWN", 0.5, "unknown_gan"

    wf = sum(r.confidence * weights.get(r.engine, 0.1)
             for r in active if r.verdict == "FAKE")
    wr = sum((1 - r.confidence) * weights.get(r.engine, 0.1)
             for r in active if r.verdict == "REAL")

    fake_prob = float(np.clip(wf / (wf + wr + 1e-9), 0.0, 1.0))
    verdict   = "FAKE" if fake_prob > 0.5 else "REAL"

    generator = "real"
    if verdict == "FAKE":
        for r in sorted(active, key=lambda r: ATTRIBUTION_PRIORITY.get(r.engine, 9)):
            if r.attributed_generator and r.attributed_generator != "real":
                generator = r.attributed_generator
                break
        if generator == "real":
            generator = "unknown_gan"

    return verdict, fake_prob, generator
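To see the weighted fusion above with concrete numbers (illustrative inputs, image weights):

```python
# Image weights: fingerprint 0.45, coherence 0.35, sstgnn 0.20.
# Engines report fake-probability as confidence, so a REAL verdict with
# confidence 0.3 contributes (1 - 0.3) of real evidence.
wf = 0.8 * 0.45 + 0.6 * 0.20      # FAKE evidence: fingerprint 0.8, sstgnn 0.6
wr = (1 - 0.3) * 0.35             # REAL evidence: coherence 0.3
fake_prob = wf / (wf + wr + 1e-9)
verdict = "FAKE" if fake_prob > 0.5 else "REAL"
print(verdict, round(fake_prob, 3))  # FAKE 0.662
```

Two confident FAKE engines outvote one moderately confident REAL engine, which is the intended behavior of the weighting.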

API — src/api/main.py

import asyncio, io, logging, os, time
from pathlib import Path

import cv2, numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image

from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.coherence.engine    import CoherenceEngine
from src.engines.sstgnn.engine       import SSTGNNEngine
from src.explainability.explainer    import explain
from src.fusion.fuser                import fuse
from src.services.inference_router   import route_inference
from src.types                       import DetectionResponse

logger = logging.getLogger(__name__)

app = FastAPI(title="GenAI-DeepDetect", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_methods=["*"], allow_headers=["*"],
    # NOTE: wildcard CORS is acceptable for a demo; restrict origins in production
)

_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()

MAX_MB     = int(os.environ.get("MAX_VIDEO_SIZE_MB", 100))
MAX_FRAMES = int(os.environ.get("MAX_VIDEO_FRAMES",  300))

IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp", "image/bmp"}
VIDEO_TYPES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm"}


def _extract_frames(path: str) -> list[np.ndarray]:
    cap = cv2.VideoCapture(path)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    step  = max(1, total // MAX_FRAMES)
    frames, i = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if i % step == 0:
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        i += 1
    cap.release()
    return frames[:MAX_FRAMES]


@app.on_event("startup")
async def preload():
    logger.info("Preloading models...")
    await asyncio.gather(
        asyncio.to_thread(_fp._ensure),
        asyncio.to_thread(_co._ensure),
        asyncio.to_thread(_st._ensure),
    )
    logger.info("All models preloaded")


@app.get("/health")
async def health():
    return {"status": "ok"}


@app.post("/detect/image", response_model=DetectionResponse)
async def detect_image(file: UploadFile = File(...)):
    t0 = time.monotonic()
    if file.content_type not in IMAGE_TYPES:
        raise HTTPException(400, f"Unsupported type: {file.content_type}")
    data = await file.read()
    if len(data) > MAX_MB * 1024 * 1024:
        raise HTTPException(413, "File too large")

    image = Image.open(io.BytesIO(data)).convert("RGB")
    fp, co, st = await asyncio.gather(
        asyncio.to_thread(_fp.run, image),
        asyncio.to_thread(_co.run, image),
        asyncio.to_thread(_st.run, image),
    )
    ms = (time.monotonic() - t0) * 1000
    for r in [fp, co, st]:
        r.processing_time_ms = ms

    verdict, conf, gen = fuse([fp, co, st], is_video=False)
    expl = await asyncio.to_thread(explain, verdict, conf, [fp, co, st], gen)

    return DetectionResponse(
        verdict=verdict, confidence=conf, attributed_generator=gen,
        explanation=expl, processing_time_ms=ms,
        engine_breakdown=[fp, co, st],
    )


@app.post("/detect/video", response_model=DetectionResponse)
async def detect_video(file: UploadFile = File(...)):
    t0 = time.monotonic()
    if file.content_type not in VIDEO_TYPES:
        raise HTTPException(400, f"Unsupported type: {file.content_type}")
    data = await file.read()
    if len(data) > MAX_MB * 1024 * 1024:
        raise HTTPException(413, "File too large")

    # Route heavy videos to RunPod
    if len(data) > 20 * 1024 * 1024:
        return await route_inference(data, "video")

    tmp = Path(f"/tmp/vid_{int(time.time()*1000)}.mp4")
    tmp.write_bytes(data)
    try:
        frames = await asyncio.to_thread(_extract_frames, str(tmp))
    finally:
        tmp.unlink(missing_ok=True)

    if not frames:
        raise HTTPException(422, "Could not extract frames")

    fp, co, st = await asyncio.gather(
        asyncio.to_thread(_fp.run_video, frames),
        asyncio.to_thread(_co.run_video, frames),
        asyncio.to_thread(_st.run_video, frames),
    )
    ms = (time.monotonic() - t0) * 1000
    for r in [fp, co, st]:
        r.processing_time_ms = ms

    verdict, conf, gen = fuse([fp, co, st], is_video=True)
    expl = await asyncio.to_thread(explain, verdict, conf, [fp, co, st], gen)

    return DetectionResponse(
        verdict=verdict, confidence=conf, attributed_generator=gen,
        explanation=expl, processing_time_ms=ms,
        engine_breakdown=[fp, co, st],
    )
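_extract_frames keeps at most MAX_FRAMES evenly spaced frames. The stride rule in isolation (the helper name is illustrative):

```python
def sample_indices(total: int, max_frames: int) -> list[int]:
    # Same stride rule as _extract_frames: keep every step-th frame,
    # then cap at max_frames (the cap matters when total/step rounds up)
    step = max(1, total // max_frames)
    return list(range(0, total, step))[:max_frames]

print(len(sample_indices(900, 300)))   # 300
print(sample_indices(10, 300))         # short clips keep every frame
```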

Types — src/types.py

from __future__ import annotations
from typing import Optional
from pydantic import BaseModel

GENERATOR_LABELS = {
    0: "real",
    1: "unknown_gan",
    2: "stable_diffusion",
    3: "midjourney",
    4: "dall_e",
    5: "flux",
    6: "firefly",
    7: "imagen",
}


class EngineResult(BaseModel):
    engine: str
    verdict: str                            # FAKE | REAL | UNKNOWN
    confidence: float                       # 0–1
    attributed_generator: Optional[str] = None
    explanation: str = ""
    processing_time_ms: float = 0.0


class DetectionResponse(BaseModel):
    verdict: str
    confidence: float
    attributed_generator: str
    explanation: str
    processing_time_ms: float
    engine_breakdown: list[EngineResult]

Inference Router — src/services/inference_router.py

import base64, logging, os
import httpx
from src.types import DetectionResponse

logger = logging.getLogger(__name__)

RUNPOD_KEY = os.environ.get("RUNPOD_API_KEY", "")
RUNPOD_EID = os.environ.get("RUNPOD_ENDPOINT_ID", "")


async def route_inference(data: bytes, media_type: str) -> DetectionResponse:
    if not RUNPOD_KEY or not RUNPOD_EID:
        raise RuntimeError(
            "RunPod not configured. Set RUNPOD_API_KEY and RUNPOD_ENDPOINT_ID."
        )
    url     = f"https://api.runpod.ai/v2/{RUNPOD_EID}/runsync"
    payload = {"input": {"data": base64.b64encode(data).decode(),
                         "media_type": media_type}}
    async with httpx.AsyncClient(timeout=120) as client:
        resp = await client.post(url, json=payload,
                                 headers={"Authorization": f"Bearer {RUNPOD_KEY}"})
        resp.raise_for_status()
        return DetectionResponse(**resp.json()["output"])

RunPod Handler — runpod_handler.py (project root)

```python
import base64
import io
import os
import tempfile
import time

import cv2
import numpy as np
import runpod
from PIL import Image

# Must be set before the engines import so they cache to the right place.
os.environ.setdefault("MODEL_CACHE_DIR", "/tmp/models")

from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.coherence.engine import CoherenceEngine
from src.engines.sstgnn.engine import SSTGNNEngine
from src.explainability.explainer import explain
from src.fusion.fuser import fuse

_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()


def handler(job: dict) -> dict:
    start = time.perf_counter()  # wall-clock timing for processing_time_ms
    inp = job["input"]
    raw = base64.b64decode(inp["data"])
    media_type = inp.get("media_type", "image")

    if media_type == "image":
        image = Image.open(io.BytesIO(raw)).convert("RGB")
        fp = _fp.run(image)
        co = _co.run(image)
        st = _st.run(image)
        verdict, conf, gen = fuse([fp, co, st], is_video=False)
    else:
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
            f.write(raw)
            tmp = f.name
        try:
            cap = cv2.VideoCapture(tmp)
            frames, i = [], 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if i % 4 == 0:  # sample every 4th frame
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                i += 1
            cap.release()
        finally:
            os.unlink(tmp)
        fp = _fp.run_video(frames)
        co = _co.run_video(frames)
        st = _st.run_video(frames)
        verdict, conf, gen = fuse([fp, co, st], is_video=True)

    expl = explain(verdict, conf, [fp, co, st], gen)

    return {
        "verdict": verdict,
        "confidence": conf,
        "attributed_generator": gen,
        "explanation": expl,
        "processing_time_ms": (time.perf_counter() - start) * 1000.0,
        "engine_breakdown": [r.model_dump() for r in [fp, co, st]],
    }


runpod.serverless.start({"handler": handler})
```
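Before deploying, the payload contract between the router and the handler can be sanity-checked locally: what `route_inference()` encodes must round-trip through the decode step at the top of `handler()`. This is an illustrative sketch; the placeholder bytes stand in for a real encoded image.

```python
import base64

# Placeholder bytes standing in for real JPEG/PNG data.
raw = b"fake-image-bytes"

# The body route_inference() sends to the RunPod /runsync endpoint...
job = {"input": {"data": base64.b64encode(raw).decode(), "media_type": "image"}}

# ...and the decode step handler() performs on the worker.
decoded = base64.b64decode(job["input"]["data"])
assert decoded == raw
```

Because base64 text is JSON-safe, this is the simplest way to ship binary media through RunPod's JSON-only job input.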

## Hosting

### Option A — HuggingFace Spaces (Free, CPU, primary API host)

spaces/app.py:

```python
import os

# Must be set before src.api.main imports the engines.
os.environ.setdefault("MODEL_CACHE_DIR", "/data/models")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

import uvicorn

from src.api.main import app

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)
```

Root README.md front-matter (Hugging Face reads this file):

```yaml
---
title: GenAI DeepDetect
emoji: "🔍"
colorFrom: gray
colorTo: indigo
sdk: docker
app_port: 7860
pinned: false
---
```

Dockerfile (replace existing):

```dockerfile
FROM python:3.11-slim

# python:3.11-slim is Debian bookworm: use libgl1 (libgl1-mesa-glx was dropped).
RUN apt-get update && apt-get install -y \
    ffmpeg libgl1 libglib2.0-0 libsm6 libxext6 libxrender-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV MODEL_CACHE_DIR=/data/models
ENV TOKENIZERS_PARALLELISM=false
ENV PYTHONUNBUFFERED=1

EXPOSE 7860
CMD ["python", "spaces/app.py"]
```

Secrets to set in HF Spaces (Settings → Repository secrets):

```text
GEMINI_API_KEY
HF_TOKEN
RUNPOD_API_KEY
RUNPOD_ENDPOINT_ID
```

Free tier: 2 vCPU, 16GB RAM. With a persistent /data volume attached (a paid storage add-on on Spaces), models cache to /data/models and survive container restarts; without it they re-download on each restart. Cold start first request: ~90s. Warm: <5s. GPU upgrade: T4 at $0.05/hr if needed.


### Option B — RunPod Serverless (GPU, heavy video, low cost)

1. RunPod → Serverless → New Endpoint
2. Select template: runpod/pytorch:2.1.0-py3.10-cuda11.8.0-devel-ubuntu22.04
3. Set handler file: runpod_handler.py
4. Min replicas: 0, Max: 3
5. GPU: RTX 3090 or A40 (cheapest that works)
6. Set env vars: GEMINI_API_KEY, HF_TOKEN, MODEL_CACHE_DIR=/tmp/models

Cost: ~$0.0002/request (figure quoted for H100; RTX 3090/A40 pricing differs). Billed per second. Min workers = 0 means you pay nothing when idle — cold start is ~15s.

When it triggers: inference_router.py automatically sends videos >20MB to RunPod. Images always run on HF Spaces.
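That routing rule can be expressed as a small predicate. A minimal sketch, with an illustrative function name and threshold constant rather than the router's actual code:

```python
SIZE_LIMIT_BYTES = 20 * 1024 * 1024  # the 20MB cutoff from the rule above


def should_offload_to_runpod(data: bytes, media_type: str) -> bool:
    # Only large videos leave HF Spaces; images always run in-process.
    return media_type == "video" and len(data) > SIZE_LIMIT_BYTES
```

The API layer would check this before deciding whether to call `route_inference()` or run the three engines locally.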


## Frontend — frontend/lib/api.ts

```typescript
const BASE_URL =
    process.env.NEXT_PUBLIC_API_URL ??
    'https://YOUR-USERNAME-genai-deepdetect.hf.space';

export type GeneratorLabel =
    | 'real'
    | 'unknown_gan'
    | 'stable_diffusion'
    | 'midjourney'
    | 'dall_e'
    | 'flux'
    | 'firefly'
    | 'imagen';

export interface EngineResult {
    engine: string;
    verdict: 'FAKE' | 'REAL' | 'UNKNOWN';
    confidence: number;
    attributed_generator: GeneratorLabel | null;
    explanation: string;
    processing_time_ms: number;
}

export interface DetectionResponse {
    verdict: 'FAKE' | 'REAL' | 'UNKNOWN';
    confidence: number;
    attributed_generator: GeneratorLabel;
    explanation: string;
    processing_time_ms: number;
    engine_breakdown: EngineResult[];
}

async function _post(endpoint: string, file: File): Promise<DetectionResponse> {
    const form = new FormData();
    form.append('file', file);
    const res = await fetch(`${BASE_URL}${endpoint}`, {
        method: 'POST',
        body: form,
    });
    if (!res.ok) {
        const err = await res.text();
        throw new Error(`Detection failed (${res.status}): ${err}`);
    }
    return res.json();
}

export const detectImage = (file: File) => _post('/detect/image', file);
export const detectVideo = (file: File) => _post('/detect/video', file);
```

Set in frontend/.env.local:

```text
NEXT_PUBLIC_API_URL=https://your-username-genai-deepdetect.hf.space
```

## Dependencies — requirements.txt

```text
# API
fastapi>=0.111.0
uvicorn[standard]>=0.29.0
python-multipart>=0.0.9
aiofiles>=23.2.1
httpx>=0.27.0
pydantic>=2.7.0

# ML — fingerprint
transformers>=4.40.0
timm>=1.0.0
torch>=2.1.0
torchvision>=0.16.0

# ML — coherence
facenet-pytorch>=2.5.3
mediapipe>=0.10.14
opencv-python-headless>=4.9.0

# ML — sstgnn
torch-geometric>=2.5.0
scipy>=1.13.0

# Explainability — Gemini
google-generativeai>=0.8.0

# HuggingFace
huggingface-hub>=0.23.0

# RunPod serverless handler
runpod>=1.6.0

# Continual learning
apscheduler>=3.10.4

# Utils
Pillow>=10.3.0
numpy>=1.26.0
```

## Bug Checklist — Fix Before Running

### src/types.py

- EngineResult is missing `attributed_generator: Optional[str] = None` — add it
- DetectionResponse.engine_breakdown is typed as `list[dict]` — change to `list[EngineResult]`

### src/fusion/fuser.py

- fuse() returns a 2-tuple — update it to return a 3-tuple (verdict, conf, generator)
- Update all callers in main.py accordingly

### src/explainability/explainer.py

- References the anthropic SDK — replace entirely with the Gemini implementation above

### src/api/main.py

- Missing CORS middleware — add before deploy
- Missing @app.on_event("startup") preload — add it
- Missing _extract_frames() for video — add it
- detect_video likely missing or stubbed — implement fully

### src/engines/*/ directories

- All three engine files are stubs or empty — replace with the full code above

### spaces/app.py

- Likely empty — add the uvicorn entrypoint

### Dockerfile

- Check for ffmpeg and libgl1 — required for MediaPipe + OpenCV
- Check that EXPOSE 7860 matches the HF Spaces app_port

### src/services/inference_router.py

- Likely a stub — implement route_inference() with the RunPod httpx call

## Code Standards

  • Lazy-load all models behind a threading lock β€” never load at module import
  • Wrap all model inference in asyncio.to_thread() β€” never block the event loop
  • Type hints on every function
  • logging.getLogger(__name__) not print()
  • os.environ.get() not hardcoded secrets
  • Pydantic BaseModel for all response schemas
  • Next.js: pages router only β€” no app/ dir, no src/ dir
  • Font: Plus Jakarta Sans or DM Sans β€” never Inter, Roboto, Arial
  • Border radius: 22% icon containers, 18px cards, 12px buttons

## MCP Usage Rules

Every coding session must follow these rules:

```text
1. Adding a dependency?
   → context7: resolve-library-id <package>
   → context7: query-docs <package> <specific feature>

2. Using any HF model?
   → huggingface: model_details <model-id>
   → confirm size, license, task, input format

3. Modifying engine logic?
   → context7: query-docs transformers pipeline (fingerprint)
   → context7: query-docs mediapipe face_mesh (coherence)
   → context7: query-docs torch-geometric GCNConv (sstgnn)
   → context7: query-docs facenet-pytorch (coherence embeddings)

4. Modifying Gemini calls?
   → context7: query-docs google-generativeai GenerativeModel

5. Modifying RunPod handler?
   → context7: query-docs runpod serverless handler

6. Modifying FastAPI routes?
   → context7: query-docs fastapi UploadFile

7. Frontend API changes?
   → context7: query-docs next.js pages-router fetch
```

## Friday Deploy Checklist

- [ ] pip install -r requirements.txt (no errors)
- [ ] src/types.py — EngineResult has attributed_generator
- [ ] src/types.py — DetectionResponse has engine_breakdown: list[EngineResult]
- [ ] src/fusion/fuser.py — returns 3-tuple
- [ ] src/explainability/explainer.py — uses Gemini, no anthropic import
- [ ] src/engines/fingerprint/engine.py — full implementation
- [ ] src/engines/coherence/engine.py — full implementation
- [ ] src/engines/sstgnn/engine.py — full implementation
- [ ] src/api/main.py — CORS + startup preload + video route
- [ ] src/services/inference_router.py — RunPod httpx call
- [ ] runpod_handler.py — added to project root
- [ ] spaces/app.py — uvicorn entrypoint
- [ ] Dockerfile — has ffmpeg, libgl1, EXPOSE 7860
- [ ] HF Space created + secrets set + pushed
- [ ] RunPod endpoint deployed + endpoint ID noted
- [ ] frontend/.env.local — NEXT_PUBLIC_API_URL points to the HF Space
- [ ] Vercel deploy of frontend/

Smoke tests:

- [ ] GET /health → {"status":"ok"}
- [ ] POST /detect/image (real JPEG) → verdict REAL
- [ ] POST /detect/image (AI PNG) → verdict FAKE
- [ ] POST /detect/video (MP4 <20MB) → response within 30s
- [ ] POST /detect/video (MP4 >20MB) → routes to RunPod