# antai-detector / app.py  (Hugging Face Space page header residue)
# Author: RaidenIppen
# Fix: implement custom loader for Community-Forensics (commfor) model
# Commit: d94f3a8
"""
Antai AI Image Detector β€” HuggingFace Space inference API.
Two detection systems run side-by-side:
System A β€” Weighted Ensemble (default):
Model A: OwensLab/commfor-model-224 (Community-Forensics) β€” weight 0.6
Model B: haywoodsloan/ai-image-detector-deploy β€” weight 0.4
final_score = score_A * 0.6 + score_B * 0.4
System B β€” Classic CLIP Router:
Stage 1: openai/clip-vit-base-patch32 zero-shot classifies content type into a bucket.
Stage 2: A bucket-specific specialist model runs and produces the final score.
Buckets: portrait_face, document_ui, art_illustration, nature_landscape,
interior_architecture, general
Both systems share the same verdict thresholds:
β‰₯ 0.6 β†’ AI Generated
0.35–0.6 β†’ Uncertain
< 0.35 β†’ Likely Real
All model IDs and weights are overridable via environment variables.
"""
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import pipeline
from PIL import Image
import requests
import base64
import io
from typing import Optional
import torch
import torch.nn as nn
import torchvision.transforms as T
import timm
from huggingface_hub import PyTorchModelHubMixin
# ---------------------------------------------------------------------------
# Community-Forensics custom model (OwensLab/commfor-model-224)
# Uses PyTorchModelHubMixin β€” cannot be loaded via transformers.pipeline().
# Architecture from: https://github.com/JeongsooP/Community-Forensics
# ---------------------------------------------------------------------------
class _ViTClassifier(nn.Module, PyTorchModelHubMixin):
    """Inference-only re-creation of Community-Forensics' ViTClassifier.

    Wraps a timm ViT backbone whose classification head is replaced by a
    single-logit linear layer (binary real-vs-AI output). Supports the
    "small" and "tiny" variants used by the published checkpoints.
    """

    # Feature width of each supported timm ViT variant's final embedding.
    _HEAD_IN_FEATURES = {"small": 384, "tiny": 192}

    def __init__(self, model_size="small", input_size=224, patch_size=16,
                 freeze_backbone=False, device="cpu", **kwargs):
        super().__init__()
        # Space is CPU-only; ignore config's "cuda" (the `device` kwarg is
        # accepted only so checkpoints' configs deserialize cleanly).
        cpu_device = torch.device("cpu")
        if model_size == "small":
            backbone = f"vit_small_patch{patch_size}_{input_size}.augreg_in21k_ft_in1k"
        elif model_size == "tiny":
            # NOTE: upstream pins patch16 for the tiny variant.
            backbone = f"vit_tiny_patch16_{input_size}.augreg_in21k_ft_in1k"
        else:
            raise ValueError(f"[commfor] Unknown model_size: {model_size}")
        self.vit = timm.create_model(backbone, pretrained=False)
        self.vit.head = nn.Linear(self._HEAD_IN_FEATURES[model_size], 1, bias=True)
        self.to(cpu_device)

    def forward(self, x):
        # Single logit per image; sigmoid is applied by the caller.
        return self.vit(x)
class _CommforPipeline:
    """Adapter giving _ViTClassifier the call contract of a transformers
    image-classification pipeline (a list of {"label", "score"} dicts),
    so run_model() / extract_ai_confidence() work unchanged."""

    # ImageNet normalization, matching the timm backbone's training stats.
    _transform = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    def __init__(self, model: _ViTClassifier):
        self.model = model
        self.model.eval()  # inference mode: disables dropout etc.

    def __call__(self, image: Image.Image):
        batch = self._transform(image).unsqueeze(0)  # add batch dim
        with torch.no_grad():
            ai_prob = float(torch.sigmoid(self.model(batch).squeeze()))
        return [
            {"label": "ai_generated", "score": ai_prob},
            {"label": "real", "score": 1.0 - ai_prob},
        ]
def _load_commfor(model_id: str):
try:
print(f"[Antai] Loading Community-Forensics (custom loader): {model_id}")
m = _ViTClassifier.from_pretrained(model_id, device="cpu")
pipe = _CommforPipeline(m)
print("[Antai] Community-Forensics ready.")
return pipe
except Exception as e:
print(f"[Antai] WARNING β€” Community-Forensics failed to load: {e}")
return None
# Two worker threads — enough to run both ensemble models concurrently;
# blocking HF pipeline inference is pushed here, off the asyncio event loop.
_executor = ThreadPoolExecutor(max_workers=2)
# FastAPI application object; routes are registered below via decorators.
app = FastAPI(title="Antai AI Image Detector")
# ---------------------------------------------------------------------------
# Configuration — Ensemble
# All values are read once at import time; override via environment variables.
# ---------------------------------------------------------------------------
MODEL_A_ID = os.environ.get("MODEL_A_ID", "OwensLab/commfor-model-224")
MODEL_B_ID = os.environ.get("MODEL_B_ID", "haywoodsloan/ai-image-detector-deploy")
WEIGHT_A = float(os.environ.get("WEIGHT_A", "0.6"))  # ensemble weight of Model A
WEIGHT_B = float(os.environ.get("WEIGHT_B", "0.4"))  # ensemble weight of Model B
THRESHOLD_AI = float(os.environ.get("THRESHOLD_AI", "0.6"))  # score >= this -> "AI Generated"
THRESHOLD_UNCERTAIN = float(os.environ.get("THRESHOLD_UNCERTAIN", "0.35"))  # score >= this -> "Uncertain"
DEBUG = os.environ.get("DEBUG", "false").lower() in ("true", "1", "yes")  # adds model IDs to responses
# ---------------------------------------------------------------------------
# Configuration — Classic CLIP Router
# ---------------------------------------------------------------------------
ROUTER_MODEL_ID = os.environ.get("ROUTER_MODEL_ID", "openai/clip-vit-base-patch32")
# HF Hub repo IDs of the classic specialist detectors.
_ATEEQQ = "Ateeqq/ai-vs-human-image-detector"
_DIMA = "dima806/deepfake_vs_real_image_detection"
_UMM = "umm-maybe/AI-image-detector"
_NYUAD = "NYUAD-ComNets/NYUAD_AI-generated_images_detector"
# Free-text prompts that CLIP scores the image against (zero-shot).
ROUTER_CANDIDATE_LABELS = [
    "portrait or face photo",
    "document screenshot or UI",
    "artwork illustration or painting",
    "nature landscape or outdoor",
    "indoor room interior or architecture",
    "general photo or other",
]
# CLIP prompt -> short bucket name reported in the API response.
_LABEL_TO_BUCKET = {
    "portrait or face photo": "portrait_face",
    "document screenshot or UI": "document_ui",
    "artwork illustration or painting": "art_illustration",
    "nature landscape or outdoor": "nature_landscape",
    "indoor room interior or architecture": "interior_architecture",
    "general photo or other": "general",
}
# Which specialist each bucket routes to by default
# (dima806 and umm-maybe are loaded but only reachable via env overrides).
_LABEL_TO_SPECIALIST_ID = {
    "portrait or face photo": _ATEEQQ,
    "document screenshot or UI": _ATEEQQ,
    "artwork illustration or painting": _ATEEQQ,
    "nature landscape or outdoor": MODEL_B_ID,  # Haywood
    "indoor room interior or architecture": _NYUAD,
    "general photo or other": MODEL_B_ID,  # Haywood
}
# ---------------------------------------------------------------------------
# Model loading β€” runs once at startup, never per-request.
# Each load is wrapped in try/except so one bad model weight doesn't crash
# the entire service β€” the failing pipeline is set to None and the /detect
# route raises a clean 503 if that specific model is requested.
# ---------------------------------------------------------------------------
def _load_pipeline(task: str, model_id: str, label: str):
try:
print(f"[Antai] Loading {label}: {model_id}")
p = pipeline(task, model=model_id)
print(f"[Antai] {label} ready.")
return p
except Exception as e:
print(f"[Antai] WARNING β€” {label} failed to load: {e}")
return None
# Load every model once at startup; failures yield None (see loaders above).
pipeline_a = _load_commfor(MODEL_A_ID)
pipeline_b = _load_pipeline("image-classification", MODEL_B_ID, "Model B (Haywoodsloan)")
router_pipeline = _load_pipeline("zero-shot-image-classification", ROUTER_MODEL_ID, "CLIP router")
pipeline_ateeqq = _load_pipeline("image-classification", _ATEEQQ, "Ateeqq specialist")
pipeline_dima = _load_pipeline("image-classification", _DIMA, "dima806 specialist")
pipeline_umm = _load_pipeline("image-classification", _UMM, "umm-maybe specialist")
pipeline_nyuad = _load_pipeline("image-classification", _NYUAD, "NYUAD specialist")
# Lookup table: model ID -> pipeline instance (for router dynamic dispatch)
_SPECIALIST_PIPELINES = {
    _ATEEQQ: pipeline_ateeqq,
    _DIMA: pipeline_dima,
    _UMM: pipeline_umm,
    _NYUAD: pipeline_nyuad,
    MODEL_B_ID: pipeline_b,
}
# Startup summary — the (name, pipeline) list was previously duplicated for
# the loaded/failed partitions; build it once and filter twice instead.
_STATUS = [("Model A", pipeline_a), ("Model B", pipeline_b),
           ("Router", router_pipeline), ("Ateeqq", pipeline_ateeqq),
           ("dima806", pipeline_dima), ("umm-maybe", pipeline_umm),
           ("NYUAD", pipeline_nyuad)]
_loaded = [name for name, pipe in _STATUS if pipe]
_failed = [name for name, pipe in _STATUS if not pipe]
print(f"[Antai] Startup complete. Loaded: {_loaded}" + (f" Failed: {_failed}" if _failed else ""))
# ---------------------------------------------------------------------------
# Request schema
# ---------------------------------------------------------------------------
class DetectRequest(BaseModel):
    """Body of POST /detect. Exactly one of imageUrl / imageData is required;
    imageData takes precedence when both are given (see load_image)."""
    imageUrl: Optional[str] = None    # public image URL
    imageData: Optional[str] = None   # base64 data URL: "data:image/jpeg;base64,..."
    singleModel: Optional[str] = None  # "commfor" | "haywood" — ensemble system
                                       # "router" — CLIP auto-route
                                       # "ateeqq" | "dima" | "umm" | "nyuad" — classic direct
# ---------------------------------------------------------------------------
# Image loader
# ---------------------------------------------------------------------------
def load_image(req: DetectRequest) -> Image.Image:
    """Decode the request's image into an RGB PIL image.

    Precedence: imageData (base64 data URL) wins over imageUrl.

    Raises:
        HTTPException(400): malformed data URL, undecodable bytes,
            unreachable/erroring URL, or neither field supplied.
    """
    if req.imageData:
        # Expect "data:image/...;base64,<payload>" — keep only the payload.
        if "," not in req.imageData:
            raise HTTPException(
                status_code=400,
                # Fixed mojibake in the client-facing message ("β€”" -> "—").
                detail="Invalid base64 data URL — expected 'data:image/...;base64,...'",
            )
        _, raw = req.imageData.split(",", 1)
        try:
            img_bytes = base64.b64decode(raw)
            return Image.open(io.BytesIO(img_bytes)).convert("RGB")
        except Exception as e:
            # Chain the cause so server logs keep the original traceback.
            raise HTTPException(status_code=400, detail=f"Could not decode base64 image: {e}") from e
    if req.imageUrl:
        try:
            resp = requests.get(
                req.imageUrl,
                timeout=15,  # avoid hanging the worker on slow hosts
                headers={"User-Agent": "Mozilla/5.0 (Antai-Detector/1.0)"},
            )
            resp.raise_for_status()
            return Image.open(io.BytesIO(resp.content)).convert("RGB")
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Could not fetch image URL: {e}") from e
    raise HTTPException(status_code=400, detail="Provide either imageUrl or imageData")
# ---------------------------------------------------------------------------
# AI confidence extraction
#
# Handles multiple model label conventions:
# Ateeqq / gate model: "ai" / "hum"
# Haywoodsloan: "artificial" / "real"
# CIFAKE-style: "FAKE" / "REAL" (lowercased before lookup)
# OwensLab commfor: unknown — generic fallback handles it
# ---------------------------------------------------------------------------
AI_LABELS = {"ai", "artificial", "fake", "ai_generated", "ai-generated", "dalle", "sd"}
HUMAN_LABELS = {"human", "real", "authentic", "genuine", "natural", "hum"}
def extract_ai_confidence(results: list) -> float:
for r in results:
if r["label"].lower() in AI_LABELS:
return float(r["score"])
for r in results:
if r["label"].lower() in HUMAN_LABELS:
return 1.0 - float(r["score"])
return float(results[0]["score"])
# ---------------------------------------------------------------------------
# Inference helpers
# ---------------------------------------------------------------------------
def run_model(pipe, image: Image.Image, name: str):
"""Blocking CPU inference. Returns (score, error). Never raises."""
try:
results = pipe(image)
score = max(0.0, min(1.0, extract_ai_confidence(results)))
return score, None
except Exception as e:
print(f"[Antai] {name} inference failed: {e}")
return None, str(e)
async def run_model_async(pipe, image: Image.Image, name: str):
    """Runs run_model in a thread so models can execute concurrently.

    Uses asyncio.get_running_loop() — we are always inside a coroutine here,
    and get_event_loop() is deprecated in that context since Python 3.10.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, run_model, pipe, image, name)
def run_router(image: Image.Image):
"""Blocking CLIP zero-shot routing. Returns (results_list, error). Never raises."""
try:
results = router_pipeline(image, candidate_labels=ROUTER_CANDIDATE_LABELS)
return results, None
except Exception as e:
print(f"[Antai] CLIP router inference failed: {e}")
return None, str(e)
async def run_router_async(image: Image.Image):
    """Runs run_router in a thread so routing does not block the event loop.

    Uses asyncio.get_running_loop() — we are always inside a coroutine here,
    and get_event_loop() is deprecated in that context since Python 3.10.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, run_router, image)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.post("/detect")
async def detect(req: DetectRequest):
image = load_image(req)
# Routing metadata β€” populated only for router mode
score_a = None
score_b = None
router_bucket = None
specialist_model_id = None
# ── Ensemble system ───────────────────────────────────────────────────
if req.singleModel == "commfor":
if pipeline_a is None:
raise HTTPException(status_code=503, detail="Community-Forensics model failed to load at startup")
score, err = await run_model_async(pipeline_a, image, "ModelA")
if score is None:
raise HTTPException(status_code=500, detail=f"Community-Forensics inference failed: {err}")
score_a = score
final_score = score
elif req.singleModel == "haywood":
if pipeline_b is None:
raise HTTPException(status_code=503, detail="Haywoodsloan model failed to load at startup")
score, err = await run_model_async(pipeline_b, image, "ModelB")
if score is None:
raise HTTPException(status_code=500, detail=f"Haywoodsloan inference failed: {err}")
score_b = score
final_score = score
# ── Classic router ────────────────────────────────────────────────────
elif req.singleModel == "router":
router_results, err = await run_router_async(image)
if router_results is None:
# Router failed β€” fall back to Haywood (or commfor if Haywood also failed)
fallback = pipeline_b or pipeline_a
if fallback is None:
raise HTTPException(status_code=503, detail="Router and all fallback models failed to load")
print(f"[Antai] CLIP router failed ({err}), falling back to ensemble model")
score, _ = await run_model_async(fallback, image, "fallback")
final_score = score if score is not None else 0.5
router_bucket = "general"
specialist_model_id = MODEL_B_ID
else:
top_label = router_results[0]["label"]
router_bucket = _LABEL_TO_BUCKET.get(top_label, "general")
specialist_model_id = _LABEL_TO_SPECIALIST_ID.get(top_label, MODEL_B_ID)
specialist_pipe = _SPECIALIST_PIPELINES.get(specialist_model_id)
if specialist_pipe is None:
specialist_pipe = pipeline_b or pipeline_a
if specialist_pipe is None:
raise HTTPException(status_code=503, detail=f"Specialist model for bucket '{router_bucket}' not available")
score, err2 = await run_model_async(specialist_pipe, image, "RouterSpecialist")
final_score = score if score is not None else 0.5
elif req.singleModel == "ateeqq":
if pipeline_ateeqq is None:
raise HTTPException(status_code=503, detail="Ateeqq model failed to load at startup")
score, err = await run_model_async(pipeline_ateeqq, image, "Ateeqq")
if score is None:
raise HTTPException(status_code=500, detail=f"Ateeqq inference failed: {err}")
final_score = score
elif req.singleModel == "dima":
if pipeline_dima is None:
raise HTTPException(status_code=503, detail="dima806 model failed to load at startup")
score, err = await run_model_async(pipeline_dima, image, "dima806")
if score is None:
raise HTTPException(status_code=500, detail=f"dima806 inference failed: {err}")
final_score = score
elif req.singleModel == "umm":
if pipeline_umm is None:
raise HTTPException(status_code=503, detail="umm-maybe model failed to load at startup")
score, err = await run_model_async(pipeline_umm, image, "umm-maybe")
if score is None:
raise HTTPException(status_code=500, detail=f"umm-maybe inference failed: {err}")
final_score = score
elif req.singleModel == "nyuad":
if pipeline_nyuad is None:
raise HTTPException(status_code=503, detail="NYUAD model failed to load at startup")
score, err = await run_model_async(pipeline_nyuad, image, "NYUAD")
if score is None:
raise HTTPException(status_code=500, detail=f"NYUAD inference failed: {err}")
final_score = score
# ── Full weighted ensemble (default) ──────────────────────────────────
else:
(score_a, err_a), (score_b, err_b) = await asyncio.gather(
run_model_async(pipeline_a, image, "ModelA"),
run_model_async(pipeline_b, image, "ModelB"),
)
if score_a is None and score_b is None:
raise HTTPException(
status_code=500,
detail=f"Both ensemble models failed. ModelA: {err_a}. ModelB: {err_b}",
)
if score_a is not None and score_b is not None:
final_score = score_a * WEIGHT_A + score_b * WEIGHT_B
elif score_a is not None:
print("[Antai] ModelB unavailable, using ModelA score alone")
final_score = score_a
else:
print("[Antai] ModelA unavailable, using ModelB score alone")
final_score = score_b
if final_score >= THRESHOLD_AI:
verdict = "AI Generated"
elif final_score >= THRESHOLD_UNCERTAIN:
verdict = "Uncertain"
else:
verdict = "Likely Real"
active_model = req.singleModel if req.singleModel else "ensemble"
response = {
"confidence": final_score,
"isAI": final_score >= THRESHOLD_AI,
"verdict": verdict,
"finalScore": round(final_score, 4),
"modelAScore": round(score_a, 4) if score_a is not None else None,
"modelBScore": round(score_b, 4) if score_b is not None else None,
"gateScore": None,
"routerBucket": router_bucket,
"specialistModel": specialist_model_id,
"provider": f"hf_{active_model}",
}
if DEBUG:
response["modelAId"] = MODEL_A_ID
response["modelBId"] = MODEL_B_ID
return response
@app.get("/health")
async def health():
return {"status": "ok"}