# CLAUDE.md – GenAI-DeepDetect Agent Instructions

Complete implementation guide for AI-assisted development on this project. Read this file before touching any code: it is the single source of truth for how this repo is structured, what conventions to follow, and what the hard constraints are. All models are 100% pre-trained – no training required, no GPU needed locally.
## MCP Tools – Always Use These First

Before writing any code or looking up any API, resolve docs through MCP:

- context7: resolve-library-id + query-docs
  → use for: transformers, torch, mediapipe, fastapi, torch-geometric, google-generativeai, facenet-pytorch, opencv, next.js, runpod
- huggingface: model_search + model_details + hf_doc_search
  → use for: finding model cards, checking input formats, confirming pipeline task names, verifying checkpoint sizes before using

Rule: Never guess an API signature. Always call context7.query-docs first. Never use a HF model without calling huggingface.model_details to confirm it exists, check its license, and verify its input format.
## Project Skill and Memory Policy

For work in this repository, always prefer the installed Claude Code skill pack when a relevant skill applies instead of ad hoc workflows.

- Always-on user preference: use Awesome Claude Code workflows with Superpowers + Claude Mem by default, and execute implementation steps automatically unless the user explicitly asks for planning-only mode.
- At task start, check Superpowers process skills first (for example: `using-superpowers`, `brainstorming`, `systematic-debugging`, `verification-before-completion`) and apply the relevant ones before coding.
- For memory-aware tasks, use Claude Mem (`mem-search`) automatically to recall prior decisions, fixes, and session history when that context can reduce risk or rework.
- If there is a conflict between this default behavior and a direct user instruction in the current chat, follow the direct user instruction.
- Use `context7-mcp` for any library, framework, SDK, or API question, and before changing code that depends on external packages or hosted services.
- Use `mem-search` / claude-mem whenever the user asks about previous sessions, prior fixes, earlier decisions, or "how we solved this before".
- When using claude-mem, scope searches to project name `genai-deepdetect` unless the user explicitly asks for a broader search.
- Keep following the repo-specific MCP rules below even when a general-purpose skill also applies.

Recommended companion skills for this project:

- `systematic-debugging` for bugs, failing tests, or unexpected runtime behavior
- `verification-before-completion` before claiming a fix is done
- `security-review` for secrets, external APIs, uploads, and auth-sensitive changes
## Project Goal

Multimodal deepfake and AI-generated content detector.

- Input: image (JPEG/PNG/WEBP) or video (MP4/MOV/AVI, max 100MB)
- Output: `DetectionResponse` – verdict, confidence, generator attribution, natural-language explanation, per-engine breakdown

All inference runs on pre-trained HuggingFace checkpoints. No training scripts need to run for the system to work.
## Architecture

```
Request (image/video)
        │
        ▼
FastAPI  src/api/main.py
        │
        ├── FingerprintEngine (image artifacts, generator attribution)
        ├── CoherenceEngine   (lip-sync, biological coherence)
        └── SSTGNNEngine      (landmark spatio-temporal graph)
        │
        ▼
Fuser  src/fusion/fuser.py
        │
        ▼
Explainer  src/explainability/explainer.py → Gemini API
        │
        ▼
DetectionResponse  src/types.py
```
## All Pre-Trained Models

Every model downloads via transformers.pipeline() or from_pretrained(). Zero training. Zero fine-tuning.

| Engine | Model | HF ID | Size | Task |
|---|---|---|---|---|
| Fingerprint | SDXL Detector | Organika/sdxl-detector | ~330MB | binary fake/real |
| Fingerprint | CLIP ViT-L/14 | openai/clip-vit-large-patch14 | ~3.5GB | generator attribution |
| Fingerprint | AI Image Detector | haywoodsloan/ai-image-detector-deploy | ~90MB | ensemble backup |
| SSTGNN | DeepFake Detector | dima806/deepfake_vs_real_image_detection | ~100MB | ResNet50 per-frame |
| SSTGNN | Deep Fake Detector | prithivMLmods/Deep-Fake-Detector-Model | ~80MB | EfficientNet-B4 backup |
| Coherence | MediaPipe Face Mesh | bundled in mediapipe package | ~10MB | landmark extraction |
| Coherence | FaceNet VGGFace2 | facenet-pytorch (auto-downloads) | ~100MB | temporal embeddings |
| Coherence | SyncNet | Junhua-Zhu/SyncNet | ~50MB | lip-sync offset |

CLIP is the largest at 3.5GB – preload at startup, never reload. Everything else fits in HF Spaces 16GB RAM free tier.
Environment Variables
# Required
GEMINI_API_KEY=... # Google AI Studio β free tier works
HF_TOKEN=hf_... # HuggingFace read token (free)
# Hosting
RUNPOD_API_KEY=... # RunPod serverless (heavy video)
RUNPOD_ENDPOINT_ID=... # your deployed endpoint ID
# Paths
MODEL_CACHE_DIR=/data/models # HF Spaces: /data/models (persists)
# local dev: /tmp/models
# Optional
MAX_VIDEO_FRAMES=300
MAX_VIDEO_SIZE_MB=100
INFERENCE_BACKEND=local # "local" | "runpod"
TOKENIZERS_PARALLELISM=false
Set all secrets in:
- HF Spaces β Settings β Repository secrets
- RunPod β Secrets tab
- Vercel β Environment Variables
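The optional variables are read with `os.environ.get` defaults, per the code standard below of never hardcoding configuration. A minimal sketch of that pattern (`runtime_config` is a hypothetical helper name, not a function in this repo):

```python
import os

def runtime_config() -> dict:
    """Read the optional knobs with their documented defaults."""
    return {
        "max_video_frames": int(os.environ.get("MAX_VIDEO_FRAMES", 300)),
        "max_video_size_mb": int(os.environ.get("MAX_VIDEO_SIZE_MB", 100)),
        "inference_backend": os.environ.get("INFERENCE_BACKEND", "local"),
    }
```

Unset variables fall back to the defaults in the table above; set variables override them per deployment.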
## Gemini API – Explainability Engine

Primary model: gemini-2.5-pro-preview-03-25
Fallback model: gemini-1.5-pro-002

Both available on Google AI Studio free tier (15 req/min, 1M tokens/day). Always query context7.query-docs google-generativeai GenerativeModel before modifying this file.
src/explainability/explainer.py
import os
import logging
import google.generativeai as genai
from src.types import EngineResult
logger = logging.getLogger(__name__)
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
SYSTEM_INSTRUCTION = (
"You are a deepfake forensics analyst writing reports for security professionals. "
"Given detection engine outputs, write exactly 2-3 sentences in plain English "
"explaining why the content is real or fake. "
"Be specific β name the strongest signals. "
"Use direct declarative sentences. No hedging. No 'I think'. "
"Output only the explanation text, nothing else."
)
_model = None
def _get_model() -> genai.GenerativeModel:
global _model
if _model is None:
for name in ("gemini-2.5-pro-preview-03-25", "gemini-1.5-pro-002"):
try:
_model = genai.GenerativeModel(
model_name=name,
system_instruction=SYSTEM_INSTRUCTION,
)
logger.info(f"Gemini model loaded: {name}")
break
except Exception as e:
logger.warning(f"Gemini {name} unavailable: {e}")
return _model
def explain(
verdict: str,
confidence: float,
engine_results: list[EngineResult],
generator: str,
) -> str:
breakdown = "\n".join(
f"- {r.engine}: {r.verdict} ({r.confidence:.0%}) β {r.explanation}"
for r in engine_results
)
prompt = (
f"Verdict: {verdict} ({confidence:.0%} confidence)\n"
f"Attributed generator: {generator}\n"
f"Engine breakdown:\n{breakdown}\n\n"
"Write the forensics explanation."
)
try:
model = _get_model()
if model is None:
raise RuntimeError("No Gemini model available")
response = model.generate_content(prompt)
return response.text.strip()
except Exception as e:
logger.error(f"Gemini explain failed: {e}")
top = engine_results[0] if engine_results else None
return (
f"Content classified as {verdict} with {confidence:.0%} confidence. "
f"{'Primary signal from ' + top.engine + ' engine.' if top else ''}"
)
## Engine Implementations

### FingerprintEngine – src/engines/fingerprint/engine.py

Query context7 for transformers pipeline image-classification and huggingface model_details Organika/sdxl-detector before modifying.
```python
import os, logging, threading

import numpy as np
from PIL import Image
from transformers import pipeline, CLIPModel, CLIPProcessor
import torch

from src.types import EngineResult

logger = logging.getLogger(__name__)
CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")

GENERATOR_PROMPTS = {
    "real": "a real photograph taken by a camera with natural lighting",
    "unknown_gan": "a GAN-generated image with checkerboard artifacts and blurry edges",
    "stable_diffusion": "a Stable Diffusion image with painterly soft textures",
    "midjourney": "a Midjourney image with cinematic dramatic lighting and hyperdetail",
    "dall_e": "a DALL-E image with clean illustration-style and smooth gradients",
    "flux": "a FLUX model image with photorealistic precision and sharp detail",
    "firefly": "an Adobe Firefly image with commercial stock-photo aesthetics",
    "imagen": "a Google Imagen image with precise photorealistic rendering",
}

_lock = threading.Lock()
_detector = _clip_model = _clip_processor = _backup = None

def _load():
    global _detector, _clip_model, _clip_processor, _backup
    if _detector is not None:
        return
    logger.info("Loading fingerprint models...")
    _detector = pipeline("image-classification",
                         model="Organika/sdxl-detector", cache_dir=CACHE)
    _clip_model = CLIPModel.from_pretrained(
        "openai/clip-vit-large-patch14", cache_dir=CACHE)
    _clip_processor = CLIPProcessor.from_pretrained(
        "openai/clip-vit-large-patch14", cache_dir=CACHE)
    _clip_model.eval()
    try:
        _backup = pipeline("image-classification",
                           model="haywoodsloan/ai-image-detector-deploy",
                           cache_dir=CACHE)
    except Exception:
        logger.warning("Backup fingerprint detector unavailable")
    logger.info("Fingerprint models ready")

class FingerprintEngine:
    def _ensure(self):
        with _lock:
            _load()

    def run(self, image: Image.Image) -> EngineResult:
        self._ensure()
        if image.mode != "RGB":
            image = image.convert("RGB")
        # Binary fake score
        FAKE_LABELS = {"artificial", "fake", "ai-generated", "generated"}
        try:
            preds = _detector(image)
            fake_score = max(
                (p["score"] for p in preds if p["label"].lower() in FAKE_LABELS),
                default=0.5,
            )
        except Exception as e:
            logger.warning(f"Primary detector error: {e}")
            fake_score = 0.5
        # Ensemble backup
        if _backup is not None:
            try:
                bp = _backup(image)
                bk = max((p["score"] for p in bp
                          if p["label"].lower() in FAKE_LABELS), default=0.5)
                fake_score = fake_score * 0.6 + bk * 0.4
            except Exception:
                pass
        # CLIP zero-shot generator attribution
        generator = "real"
        try:
            texts = list(GENERATOR_PROMPTS.values())
            inputs = _clip_processor(
                text=texts, images=image,
                return_tensors="pt", padding=True, truncation=True,
            )
            with torch.no_grad():
                logits = _clip_model(**inputs).logits_per_image[0]
            probs = logits.softmax(dim=0).numpy()
            generator = list(GENERATOR_PROMPTS.keys())[int(np.argmax(probs))]
        except Exception as e:
            logger.warning(f"CLIP attribution error: {e}")
        if fake_score > 0.65 and generator == "real":
            generator = "unknown_gan"
        return EngineResult(
            engine="fingerprint",
            verdict="FAKE" if fake_score > 0.5 else "REAL",
            confidence=float(fake_score),
            attributed_generator=generator,
            explanation=f"Binary score {fake_score:.2f}; attributed to {generator}.",
        )

    def run_video(self, frames: list) -> EngineResult:
        if not frames:
            return EngineResult(engine="fingerprint", verdict="UNKNOWN",
                                confidence=0.5, explanation="No frames.")
        keyframes = frames[::8] or [frames[0]]
        results = [self.run(Image.fromarray(f)) for f in keyframes]
        avg = float(np.mean([r.confidence for r in results]))
        gens = [r.attributed_generator for r in results]
        top_gen = max(set(gens), key=gens.count)
        return EngineResult(
            engine="fingerprint",
            verdict="FAKE" if avg > 0.5 else "REAL",
            confidence=avg,
            attributed_generator=top_gen,
            explanation=f"Keyframe average {avg:.2f} over {len(keyframes)} frames.",
        )
```
### CoherenceEngine – src/engines/coherence/engine.py

Query context7.query-docs mediapipe face_mesh and context7.query-docs facenet-pytorch InceptionResnetV1 before modifying.
```python
import logging, threading

import numpy as np
import torch
from PIL import Image
from facenet_pytorch import MTCNN, InceptionResnetV1
import mediapipe as mp

from src.types import EngineResult

# Note: all frames reaching this engine are already RGB (PIL convert("RGB")
# for images, cv2.COLOR_BGR2RGB at extraction for video), so no cvtColor
# is needed before MediaPipe - converting again would hand it BGR.

logger = logging.getLogger(__name__)

_lock = threading.Lock()
_mtcnn = _resnet = _face_mesh = None

def _load():
    global _mtcnn, _resnet, _face_mesh
    if _mtcnn is not None:
        return
    logger.info("Loading coherence models...")
    _mtcnn = MTCNN(keep_all=False, device="cpu")
    _resnet = InceptionResnetV1(pretrained="vggface2").eval()
    _face_mesh = mp.solutions.face_mesh.FaceMesh(
        static_image_mode=False, max_num_faces=1,
        refine_landmarks=True, min_detection_confidence=0.5,
    )
    logger.info("Coherence models ready")

class CoherenceEngine:
    def _ensure(self):
        with _lock:
            _load()

    def run(self, image: Image.Image) -> EngineResult:
        self._ensure()
        frame = np.array(image.convert("RGB"))
        score = self._image_score(frame)
        return EngineResult(
            engine="coherence",
            verdict="FAKE" if score > 0.5 else "REAL",
            confidence=float(score),
            explanation=f"Geometric coherence anomaly {score:.2f} (image mode).",
        )

    def _image_score(self, frame: np.ndarray) -> float:
        res = _face_mesh.process(frame)
        if not res.multi_face_landmarks:
            return 0.35  # no face detected
        lms = res.multi_face_landmarks[0].landmark
        h, w = frame.shape[:2]

        def pt(i):
            return np.array([lms[i].x * w, lms[i].y * h])

        # Eye width asymmetry - deepfakes often mismatched
        lew = np.linalg.norm(pt(33) - pt(133))
        rew = np.linalg.norm(pt(362) - pt(263))
        eye_ratio = min(lew, rew) / (max(lew, rew) + 1e-9)
        eye_score = max(0.0, (0.85 - eye_ratio) / 0.3)
        # Ear symmetry from nose tip
        nose = pt(1)
        lr = min(np.linalg.norm(nose - pt(234)), np.linalg.norm(nose - pt(454)))
        rr = max(np.linalg.norm(nose - pt(234)), np.linalg.norm(nose - pt(454)))
        ear_score = max(0.0, (0.90 - lr / (rr + 1e-9)) / 0.2)
        return float(np.clip(eye_score * 0.5 + ear_score * 0.5, 0.0, 1.0))

    def run_video(self, frames: list[np.ndarray]) -> EngineResult:
        self._ensure()
        if len(frames) < 4:
            r = self.run(Image.fromarray(frames[0]))
            r.explanation = "Too few frames for temporal analysis."
            return r
        delta = self._embedding_variance(frames)
        jerk = self._landmark_jerk(frames)
        blink = self._blink_anomaly(frames)
        score = float(np.clip(delta * 0.45 + jerk * 0.35 + blink * 0.20, 0.0, 1.0))
        return EngineResult(
            engine="coherence",
            verdict="FAKE" if score > 0.5 else "REAL",
            confidence=score,
            explanation=(
                f"Embedding variance {delta:.2f}, "
                f"landmark jerk {jerk:.2f}, "
                f"blink anomaly {blink:.2f}."
            ),
        )

    def _embedding_variance(self, frames: list[np.ndarray]) -> float:
        embeddings = []
        for frame in frames[::4]:
            try:
                face = _mtcnn(Image.fromarray(frame))
                if face is not None:
                    with torch.no_grad():
                        e = _resnet(face.unsqueeze(0)).numpy()[0]
                    embeddings.append(e)
            except Exception:
                continue
        if len(embeddings) < 2:
            return 0.5
        deltas = [np.linalg.norm(embeddings[i + 1] - embeddings[i])
                  for i in range(len(embeddings) - 1)]
        return float(np.clip(np.var(deltas) * 8, 0.0, 1.0))

    def _landmark_jerk(self, frames: list[np.ndarray]) -> float:
        positions = []
        for frame in frames[::2]:
            res = _face_mesh.process(frame)
            if res.multi_face_landmarks:
                lm = res.multi_face_landmarks[0].landmark
                positions.append([lm[1].x, lm[1].y])
        if len(positions) < 4:
            return 0.3
        pos = np.array(positions)
        jerk = np.diff(pos, n=3, axis=0)
        return float(np.clip(
            (np.mean(np.linalg.norm(jerk, axis=1)) - 0.002) / 0.008, 0.0, 1.0))

    def _blink_anomaly(self, frames: list[np.ndarray]) -> float:
        LEFT_EYE = [33, 160, 158, 133, 153, 144]
        RIGHT_EYE = [362, 385, 387, 263, 373, 380]

        def ear(lms, idx, h, w):
            # Eye Aspect Ratio: vertical eyelid gaps over horizontal eye width
            pts = [np.array([lms[i].x * w, lms[i].y * h]) for i in idx]
            a = np.linalg.norm(pts[1] - pts[5])
            b = np.linalg.norm(pts[2] - pts[4])
            c = np.linalg.norm(pts[0] - pts[3])
            return (a + b) / (2.0 * c + 1e-9)

        ears = []
        for frame in frames:
            res = _face_mesh.process(frame)
            if res.multi_face_landmarks:
                lm = res.multi_face_landmarks[0].landmark
                h, w = frame.shape[:2]
                ears.append((ear(lm, LEFT_EYE, h, w) + ear(lm, RIGHT_EYE, h, w)) / 2)
        if len(ears) < 10:
            return 0.3
        arr = np.array(ears)
        blinks = int(np.sum(np.diff((arr < 0.21).astype(int)) > 0))
        bpm = blinks / (len(ears) / 25) * 60  # assumes ~25 fps effective rate
        if 8 <= bpm <= 25:
            return 0.15
        if bpm < 3 or bpm > 35:
            return 0.80
        return 0.45
```
### SSTGNNEngine – src/engines/sstgnn/engine.py

Query context7.query-docs torch-geometric GCNConv and huggingface model_details dima806/deepfake_vs_real_image_detection before modifying.
```python
import logging, os, threading

import numpy as np
from PIL import Image
from transformers import pipeline
import mediapipe as mp
from scipy.spatial import Delaunay

from src.types import EngineResult

logger = logging.getLogger(__name__)
CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")

_lock = threading.Lock()
_det1 = _det2 = _mesh = None

def _load():
    global _det1, _det2, _mesh
    if _det1 is not None:
        return
    logger.info("Loading SSTGNN models...")
    _det1 = pipeline("image-classification",
                     model="dima806/deepfake_vs_real_image_detection",
                     cache_dir=CACHE)
    try:
        _det2 = pipeline("image-classification",
                         model="prithivMLmods/Deep-Fake-Detector-Model",
                         cache_dir=CACHE)
    except Exception:
        logger.warning("SSTGNN backup detector unavailable")
    _mesh = mp.solutions.face_mesh.FaceMesh(
        static_image_mode=True, max_num_faces=1, refine_landmarks=True)
    logger.info("SSTGNN models ready")

def _fake_prob(preds: list[dict]) -> float:
    fake_kw = {"fake", "deepfake", "artificial", "generated", "ai"}
    return max(
        (p["score"] for p in preds
         if any(k in p["label"].lower() for k in fake_kw)),
        default=0.5,
    )

class SSTGNNEngine:
    def _ensure(self):
        with _lock:
            _load()

    def run(self, image: Image.Image) -> EngineResult:
        self._ensure()
        if image.mode != "RGB":
            image = image.convert("RGB")
        # Collect (score, weight) pairs so the average stays correctly
        # normalized no matter which detector(s) succeed.
        pairs: list[tuple[float, float]] = []
        try:
            pairs.append((_fake_prob(_det1(image)), 0.6))
        except Exception as e:
            logger.warning(f"SSTGNN det1 error: {e}")
        if _det2 is not None:
            try:
                pairs.append((_fake_prob(_det2(image)), 0.4))
            except Exception as e:
                logger.warning(f"SSTGNN det2 error: {e}")
        if not pairs:
            return EngineResult(engine="sstgnn", verdict="UNKNOWN",
                                confidence=0.5, explanation="All detectors failed.")
        cnn = sum(s * w for s, w in pairs) / sum(w for _, w in pairs)
        graph = self._geometry_score(np.array(image))
        final = float(np.clip(cnn * 0.7 + graph * 0.3, 0.0, 1.0))
        return EngineResult(
            engine="sstgnn",
            verdict="FAKE" if final > 0.5 else "REAL",
            confidence=final,
            explanation=f"CNN {cnn:.2f}, geometric graph anomaly {graph:.2f}.",
        )

    def _geometry_score(self, frame: np.ndarray) -> float:
        try:
            res = _mesh.process(frame)  # frame is already RGB (from PIL)
            if not res.multi_face_landmarks:
                return 0.3
            h, w = frame.shape[:2]
            lms = res.multi_face_landmarks[0].landmark
            idxs = list(range(0, 468, 7))[:68]
            pts = np.array([[lms[i].x * w, lms[i].y * h] for i in idxs])
            tri = Delaunay(pts)
            areas = []
            for s in tri.simplices:
                a, b, c = pts[s]
                areas.append(abs(np.cross(b - a, c - a)) / 2)
            areas = np.array(areas)
            cv_score = float(np.std(areas) / (np.mean(areas) + 1e-9))
            return float(np.clip((cv_score - 0.8) / 1.5, 0.0, 1.0))
        except Exception as e:
            logger.warning(f"Geometry score error: {e}")
            return 0.3

    def run_video(self, frames: list[np.ndarray]) -> EngineResult:
        self._ensure()
        if not frames:
            return EngineResult(engine="sstgnn", verdict="UNKNOWN",
                                confidence=0.5, explanation="No frames.")
        sample = frames[::6] or [frames[0]]
        results = [self.run(Image.fromarray(f)) for f in sample]
        avg = float(np.mean([r.confidence for r in results]))
        return EngineResult(
            engine="sstgnn",
            verdict="FAKE" if avg > 0.5 else "REAL",
            confidence=avg,
            explanation=f"Frame-sampled SSTGNN average {avg:.2f} over {len(sample)} frames.",
        )
```
## Fusion – src/fusion/fuser.py

```python
import numpy as np

from src.types import EngineResult

ENGINE_WEIGHTS = {
    "fingerprint": 0.45,
    "coherence": 0.35,
    "sstgnn": 0.20,
}

ENGINE_WEIGHTS_VIDEO = {
    "fingerprint": 0.30,
    "coherence": 0.50,
    "sstgnn": 0.20,
}

ATTRIBUTION_PRIORITY = {"fingerprint": 1, "sstgnn": 2, "coherence": 3}

def fuse(
    results: list[EngineResult],
    is_video: bool = False,
) -> tuple[str, float, str]:
    """Returns (verdict, confidence, attributed_generator)."""
    weights = ENGINE_WEIGHTS_VIDEO if is_video else ENGINE_WEIGHTS
    active = [r for r in results if r.verdict != "UNKNOWN"]
    if not active:
        return "UNKNOWN", 0.5, "unknown_gan"
    wf = sum(r.confidence * weights.get(r.engine, 0.1)
             for r in active if r.verdict == "FAKE")
    wr = sum((1 - r.confidence) * weights.get(r.engine, 0.1)
             for r in active if r.verdict == "REAL")
    fake_prob = float(np.clip(wf / (wf + wr + 1e-9), 0.0, 1.0))
    verdict = "FAKE" if fake_prob > 0.5 else "REAL"
    generator = "real"
    if verdict == "FAKE":
        for r in sorted(active, key=lambda r: ATTRIBUTION_PRIORITY.get(r.engine, 9)):
            if r.attributed_generator and r.attributed_generator != "real":
                generator = r.attributed_generator
                break
        if generator == "real":
            generator = "unknown_gan"
    return verdict, fake_prob, generator
```
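A worked example of the fusion arithmetic with hypothetical engine outputs (np.clip replaced by plain min/max to keep it dependency-free):

```python
ENGINE_WEIGHTS = {"fingerprint": 0.45, "coherence": 0.35, "sstgnn": 0.20}

# Hypothetical image results: two engines vote FAKE, one votes REAL.
results = [
    ("fingerprint", "FAKE", 0.90),
    ("coherence",   "REAL", 0.30),
    ("sstgnn",      "FAKE", 0.70),
]

wf = sum(c * ENGINE_WEIGHTS[e] for e, v, c in results if v == "FAKE")        # 0.405 + 0.14
wr = sum((1 - c) * ENGINE_WEIGHTS[e] for e, v, c in results if v == "REAL")  # 0.7 * 0.35
fake_prob = max(0.0, min(1.0, wf / (wf + wr + 1e-9)))                        # ~0.69 -> FAKE
```

Note that REAL votes contribute (1 - confidence) weight: a low-confidence REAL barely resists the FAKE evidence, which is why this example lands at FAKE despite one dissenting engine.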
## API – src/api/main.py

```python
import asyncio, io, logging, os, time
from pathlib import Path

import cv2
import numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image

from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.coherence.engine import CoherenceEngine
from src.engines.sstgnn.engine import SSTGNNEngine
from src.explainability.explainer import explain
from src.fusion.fuser import fuse
from src.services.inference_router import route_inference
from src.types import DetectionResponse

logger = logging.getLogger(__name__)

app = FastAPI(title="GenAI-DeepDetect", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_methods=["*"], allow_headers=["*"],
)

_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()

MAX_MB = int(os.environ.get("MAX_VIDEO_SIZE_MB", 100))
MAX_FRAMES = int(os.environ.get("MAX_VIDEO_FRAMES", 300))
IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp", "image/bmp"}
VIDEO_TYPES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm"}

def _extract_frames(path: str) -> list[np.ndarray]:
    cap = cv2.VideoCapture(path)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    step = max(1, total // MAX_FRAMES)
    frames, i = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if i % step == 0:
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        i += 1
    cap.release()
    return frames[:MAX_FRAMES]

@app.on_event("startup")
async def preload():
    logger.info("Preloading models...")
    await asyncio.gather(
        asyncio.to_thread(_fp._ensure),
        asyncio.to_thread(_co._ensure),
        asyncio.to_thread(_st._ensure),
    )
    logger.info("All models preloaded")

@app.get("/health")
async def health():
    return {"status": "ok"}

@app.post("/detect/image", response_model=DetectionResponse)
async def detect_image(file: UploadFile = File(...)):
    t0 = time.monotonic()
    if file.content_type not in IMAGE_TYPES:
        raise HTTPException(400, f"Unsupported type: {file.content_type}")
    data = await file.read()
    if len(data) > MAX_MB * 1024 * 1024:
        raise HTTPException(413, "File too large")
    image = Image.open(io.BytesIO(data)).convert("RGB")
    fp, co, st = await asyncio.gather(
        asyncio.to_thread(_fp.run, image),
        asyncio.to_thread(_co.run, image),
        asyncio.to_thread(_st.run, image),
    )
    ms = (time.monotonic() - t0) * 1000
    for r in [fp, co, st]:
        r.processing_time_ms = ms
    verdict, conf, gen = fuse([fp, co, st], is_video=False)
    expl = await asyncio.to_thread(explain, verdict, conf, [fp, co, st], gen)
    return DetectionResponse(
        verdict=verdict, confidence=conf, attributed_generator=gen,
        explanation=expl, processing_time_ms=ms,
        engine_breakdown=[fp, co, st],
    )

@app.post("/detect/video", response_model=DetectionResponse)
async def detect_video(file: UploadFile = File(...)):
    t0 = time.monotonic()
    if file.content_type not in VIDEO_TYPES:
        raise HTTPException(400, f"Unsupported type: {file.content_type}")
    data = await file.read()
    if len(data) > MAX_MB * 1024 * 1024:
        raise HTTPException(413, "File too large")
    # Route heavy videos to RunPod
    if len(data) > 20 * 1024 * 1024:
        return await route_inference(data, "video")
    tmp = Path(f"/tmp/vid_{int(time.time() * 1000)}.mp4")
    tmp.write_bytes(data)
    try:
        frames = await asyncio.to_thread(_extract_frames, str(tmp))
    finally:
        tmp.unlink(missing_ok=True)
    if not frames:
        raise HTTPException(422, "Could not extract frames")
    fp, co, st = await asyncio.gather(
        asyncio.to_thread(_fp.run_video, frames),
        asyncio.to_thread(_co.run_video, frames),
        asyncio.to_thread(_st.run_video, frames),
    )
    ms = (time.monotonic() - t0) * 1000
    for r in [fp, co, st]:
        r.processing_time_ms = ms
    verdict, conf, gen = fuse([fp, co, st], is_video=True)
    expl = await asyncio.to_thread(explain, verdict, conf, [fp, co, st], gen)
    return DetectionResponse(
        verdict=verdict, confidence=conf, attributed_generator=gen,
        explanation=expl, processing_time_ms=ms,
        engine_breakdown=[fp, co, st],
    )
```
## Types – src/types.py

```python
from __future__ import annotations

from typing import Optional

from pydantic import BaseModel

GENERATOR_LABELS = {
    0: "real",
    1: "unknown_gan",
    2: "stable_diffusion",
    3: "midjourney",
    4: "dall_e",
    5: "flux",
    6: "firefly",
    7: "imagen",
}

class EngineResult(BaseModel):
    engine: str
    verdict: str                               # FAKE | REAL | UNKNOWN
    confidence: float                          # 0-1
    attributed_generator: Optional[str] = None
    explanation: str = ""
    processing_time_ms: float = 0.0

class DetectionResponse(BaseModel):
    verdict: str
    confidence: float
    attributed_generator: str
    explanation: str
    processing_time_ms: float
    engine_breakdown: list[EngineResult]
```
## Inference Router – src/services/inference_router.py

```python
import base64, logging, os

import httpx

from src.types import DetectionResponse

logger = logging.getLogger(__name__)

RUNPOD_KEY = os.environ.get("RUNPOD_API_KEY", "")
RUNPOD_EID = os.environ.get("RUNPOD_ENDPOINT_ID", "")

async def route_inference(data: bytes, media_type: str) -> DetectionResponse:
    if not RUNPOD_KEY or not RUNPOD_EID:
        raise RuntimeError(
            "RunPod not configured. Set RUNPOD_API_KEY and RUNPOD_ENDPOINT_ID."
        )
    url = f"https://api.runpod.ai/v2/{RUNPOD_EID}/runsync"
    payload = {"input": {"data": base64.b64encode(data).decode(),
                         "media_type": media_type}}
    async with httpx.AsyncClient(timeout=120) as client:
        resp = await client.post(url, json=payload,
                                 headers={"Authorization": f"Bearer {RUNPOD_KEY}"})
        resp.raise_for_status()
    return DetectionResponse(**resp.json()["output"])
```
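One thing to keep in mind with this path: base64 inflates the payload by a third, so a 20MB video becomes a ~27MB JSON body on the wire to RunPod. A quick check of the expansion ratio:

```python
import base64
import os

raw = os.urandom(1_000_000)      # 1 MB of arbitrary bytes
encoded = base64.b64encode(raw)  # 4 output chars per 3 input bytes
ratio = len(encoded) / len(raw)  # ~4/3
```

If request-size limits ever bite, the fix is direct upload (e.g. presigned object storage) rather than raising the base64 threshold.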
## RunPod Handler – runpod_handler.py (project root)

```python
import base64, io, os, tempfile

import runpod
import cv2
from PIL import Image

# Must run before importing src.* so engines cache to the right path
os.environ.setdefault("MODEL_CACHE_DIR", "/tmp/models")

from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.coherence.engine import CoherenceEngine
from src.engines.sstgnn.engine import SSTGNNEngine
from src.explainability.explainer import explain
from src.fusion.fuser import fuse

_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()

def handler(job: dict) -> dict:
    inp = job["input"]
    raw = base64.b64decode(inp["data"])
    media_type = inp.get("media_type", "image")
    if media_type == "image":
        image = Image.open(io.BytesIO(raw)).convert("RGB")
        fp = _fp.run(image)
        co = _co.run(image)
        st = _st.run(image)
        verdict, conf, gen = fuse([fp, co, st], is_video=False)
    else:
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
            f.write(raw)
            tmp = f.name
        try:
            cap = cv2.VideoCapture(tmp)
            frames, i = [], 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if i % 4 == 0:
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                i += 1
            cap.release()
        finally:
            os.unlink(tmp)
        fp = _fp.run_video(frames)
        co = _co.run_video(frames)
        st = _st.run_video(frames)
        verdict, conf, gen = fuse([fp, co, st], is_video=True)
    expl = explain(verdict, conf, [fp, co, st], gen)
    return {
        "verdict": verdict,
        "confidence": conf,
        "attributed_generator": gen,
        "explanation": expl,
        "processing_time_ms": 0.0,
        "engine_breakdown": [r.model_dump() for r in [fp, co, st]],
    }

runpod.serverless.start({"handler": handler})
```
## Hosting

### Option A – HuggingFace Spaces (Free, CPU, primary API host)

spaces/app.py:

```python
import os

os.environ.setdefault("MODEL_CACHE_DIR", "/data/models")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

import uvicorn
from src.api.main import app

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860, workers=1)
```

Root README.md front matter (Hugging Face reads this file):

```yaml
---
title: GenAI DeepDetect
emoji: 🔍
colorFrom: gray
colorTo: indigo
sdk: docker
app_port: 7860
pinned: false
---
```
Dockerfile (replace existing):

```dockerfile
FROM python:3.11-slim
RUN apt-get update && apt-get install -y \
    ffmpeg libgl1-mesa-glx libglib2.0-0 libsm6 libxext6 libxrender-dev \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV MODEL_CACHE_DIR=/data/models
ENV TOKENIZERS_PARALLELISM=false
ENV PYTHONUNBUFFERED=1
EXPOSE 7860
CMD ["python", "spaces/app.py"]
```

Secrets to set in HF Spaces (Settings → Repository secrets): GEMINI_API_KEY, HF_TOKEN, RUNPOD_API_KEY, RUNPOD_ENDPOINT_ID

Free tier: 2 vCPU, 16GB RAM, persistent /data volume. Models cache to /data/models and survive container restarts. Cold start first request: ~90s. Warm: <5s. A paid GPU upgrade (e.g. T4) is available if CPU inference proves too slow.
### Option B – RunPod Serverless (GPU, heavy video, low cost)

1. RunPod → Serverless → New Endpoint
2. Select template: runpod/pytorch:2.1.0-py3.10-cuda11.8.0-devel-ubuntu22.04
3. Set handler file: runpod_handler.py
4. Min replicas: 0, Max: 3
5. GPU: RTX 3090 or A40 (cheapest that works)
6. Set env vars: GEMINI_API_KEY, HF_TOKEN, MODEL_CACHE_DIR=/tmp/models

Cost: roughly $0.0002/request, billed per second. Min workers = 0 means you pay nothing when idle; the trade-off is a ~15s cold start.

When it triggers: inference_router.py automatically sends videos >20MB to RunPod. Images always run on HF Spaces.
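The routing rule can be stated as a pure function (`backend_for` is a hypothetical helper for illustration; the real check lives inline in detect_video):

```python
ROUTE_THRESHOLD_MB = 20

def backend_for(payload_bytes: int, media_type: str) -> str:
    """Images always run locally on HF Spaces; only heavy videos go to RunPod."""
    if media_type == "video" and payload_bytes > ROUTE_THRESHOLD_MB * 1024 * 1024:
        return "runpod"
    return "local"
```

Keeping the threshold in one place like this also makes it easy to tune later against real cold-start and transfer costs.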
## Frontend – frontend/lib/api.ts

```typescript
const BASE_URL =
  process.env.NEXT_PUBLIC_API_URL ??
  'https://YOUR-USERNAME-genai-deepdetect.hf.space';

export type GeneratorLabel =
  | 'real'
  | 'unknown_gan'
  | 'stable_diffusion'
  | 'midjourney'
  | 'dall_e'
  | 'flux'
  | 'firefly'
  | 'imagen';

export interface EngineResult {
  engine: string;
  verdict: 'FAKE' | 'REAL' | 'UNKNOWN';
  confidence: number;
  attributed_generator: GeneratorLabel | null;
  explanation: string;
  processing_time_ms: number;
}

export interface DetectionResponse {
  verdict: 'FAKE' | 'REAL' | 'UNKNOWN';
  confidence: number;
  attributed_generator: GeneratorLabel;
  explanation: string;
  processing_time_ms: number;
  engine_breakdown: EngineResult[];
}

async function _post(endpoint: string, file: File): Promise<DetectionResponse> {
  const form = new FormData();
  form.append('file', file);
  const res = await fetch(`${BASE_URL}${endpoint}`, {
    method: 'POST',
    body: form,
  });
  if (!res.ok) {
    const err = await res.text();
    throw new Error(`Detection failed (${res.status}): ${err}`);
  }
  return res.json();
}

export const detectImage = (file: File) => _post('/detect/image', file);
export const detectVideo = (file: File) => _post('/detect/video', file);
```

Set in frontend/.env.local:

```
NEXT_PUBLIC_API_URL=https://your-username-genai-deepdetect.hf.space
```
## Dependencies – requirements.txt

```
# API
fastapi>=0.111.0
uvicorn[standard]>=0.29.0
python-multipart>=0.0.9
aiofiles>=23.2.1
httpx>=0.27.0
pydantic>=2.7.0

# ML - fingerprint
transformers>=4.40.0
timm>=1.0.0
torch>=2.1.0
torchvision>=0.16.0

# ML - coherence
facenet-pytorch>=2.5.3
mediapipe>=0.10.14
opencv-python-headless>=4.9.0

# ML - sstgnn
torch-geometric>=2.5.0
scipy>=1.13.0

# Explainability - Gemini
google-generativeai>=0.8.0

# HuggingFace
huggingface-hub>=0.23.0

# RunPod serverless handler
runpod>=1.6.0

# Continual learning
apscheduler>=3.10.4

# Utils
Pillow>=10.3.0
numpy>=1.26.0
```
## Bug Checklist – Fix Before Running

**src/types.py**
- `EngineResult` missing `attributed_generator: Optional[str] = None` – add it
- `DetectionResponse.engine_breakdown` typed as `list[dict]` – change to `list[EngineResult]`

**src/fusion/fuser.py**
- `fuse()` returns 2-tuple – update to return 3-tuple `(verdict, conf, generator)`
- Update all callers in `main.py` accordingly

**src/explainability/explainer.py**
- References `anthropic` SDK – replace entirely with Gemini implementation above

**src/api/main.py**
- Missing CORS middleware – add before deploy
- Missing `@app.on_event("startup")` preload – add it
- Missing `_extract_frames()` for video – add it
- `detect_video` likely missing or stubbed – implement fully

**src/engines/*/ directories**
- All three engine files are stubs or empty – replace with full code above

**spaces/app.py**
- Likely empty – add uvicorn entrypoint

**Dockerfile**
- Check for `ffmpeg` and `libgl1-mesa-glx` – required for MediaPipe + OpenCV
- Check `EXPOSE 7860` matches HF Spaces `app_port`

**src/services/inference_router.py**
- Likely stub – implement `route_inference()` with RunPod httpx call
## Code Standards

- Lazy-load all models behind a threading lock – never load at module import
- Wrap all model inference in `asyncio.to_thread()` – never block the event loop
- Type hints on every function
- `logging.getLogger(__name__)`, not `print()`
- `os.environ.get()`, not hardcoded secrets
- Pydantic `BaseModel` for all response schemas
- Next.js: pages router only – no `app/` dir, no `src/` dir
- Font: Plus Jakarta Sans or DM Sans – never Inter, Roboto, Arial
- Border radius: 22% icon containers, 18px cards, 12px buttons
## MCP Usage Rules

Every coding session must follow these rules:

1. Adding a dependency?
   → context7: resolve-library-id <package>
   → context7: query-docs <package> <specific feature>
2. Using any HF model?
   → huggingface: model_details <model-id>
   → confirm size, license, task, input format
3. Modifying engine logic?
   → context7: query-docs transformers pipeline (fingerprint)
   → context7: query-docs mediapipe face_mesh (coherence)
   → context7: query-docs torch-geometric GCNConv (sstgnn)
   → context7: query-docs facenet-pytorch (coherence embeddings)
4. Modifying Gemini calls?
   → context7: query-docs google-generativeai GenerativeModel
5. Modifying RunPod handler?
   → context7: query-docs runpod serverless handler
6. Modifying FastAPI routes?
   → context7: query-docs fastapi UploadFile
7. Frontend API changes?
   → context7: query-docs next.js pages-router fetch
## Friday Deploy Checklist

- [ ] pip install -r requirements.txt (no errors)
- [ ] src/types.py → EngineResult has attributed_generator
- [ ] src/types.py → DetectionResponse has engine_breakdown: list[EngineResult]
- [ ] src/fusion/fuser.py → returns 3-tuple
- [ ] src/explainability/explainer.py → uses Gemini, no anthropic import
- [ ] src/engines/fingerprint/engine.py → full implementation
- [ ] src/engines/coherence/engine.py → full implementation
- [ ] src/engines/sstgnn/engine.py → full implementation
- [ ] src/api/main.py → CORS + startup preload + video route
- [ ] src/services/inference_router.py → RunPod httpx call
- [ ] runpod_handler.py → added to project root
- [ ] spaces/app.py → uvicorn entrypoint
- [ ] Dockerfile → has ffmpeg, libgl1, EXPOSE 7860
- [ ] HF Space created + secrets set + pushed
- [ ] RunPod endpoint deployed + endpoint ID noted
- [ ] frontend/.env.local → NEXT_PUBLIC_API_URL points to HF Space
- [ ] Vercel deploy of frontend/

Smoke tests:

- [ ] GET /health → {"status":"ok"}
- [ ] POST /detect/image (real JPEG) → verdict REAL
- [ ] POST /detect/image (AI PNG) → verdict FAKE
- [ ] POST /detect/video (MP4 <20MB) → response within 30s
- [ ] POST /detect/video (MP4 >20MB) → routes to RunPod