Spaces:
Running
Running
| """ | |
| Antai AI Image Detector β HuggingFace Space inference API. | |
| Two detection systems run side-by-side: | |
| System A β Weighted Ensemble (default): | |
| Model A: OwensLab/commfor-model-224 (Community-Forensics) β weight 0.6 | |
| Model B: haywoodsloan/ai-image-detector-deploy β weight 0.4 | |
| final_score = score_A * 0.6 + score_B * 0.4 | |
| System B β Classic CLIP Router: | |
| Stage 1: openai/clip-vit-base-patch32 zero-shot classifies content type into a bucket. | |
| Stage 2: A bucket-specific specialist model runs and produces the final score. | |
| Buckets: portrait_face, document_ui, art_illustration, nature_landscape, | |
| interior_architecture, general | |
| Both systems share the same verdict thresholds: | |
| β₯ 0.6 β AI Generated | |
| 0.35β0.6 β Uncertain | |
| < 0.35 β Likely Real | |
| All model IDs and weights are overridable via environment variables. | |
| """ | |
| import os | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| from transformers import pipeline | |
| from PIL import Image | |
| import requests | |
| import base64 | |
| import io | |
| from typing import Optional | |
| import torch | |
| import torch.nn as nn | |
| import torchvision.transforms as T | |
| import timm | |
| from huggingface_hub import PyTorchModelHubMixin | |
# ---------------------------------------------------------------------------
# Community-Forensics custom model (OwensLab/commfor-model-224)
# Uses PyTorchModelHubMixin -- cannot be loaded via transformers.pipeline().
# Architecture from: https://github.com/JeongsooP/Community-Forensics
# ---------------------------------------------------------------------------
class _ViTClassifier(nn.Module, PyTorchModelHubMixin):
    """Minimal reproduction of the ViTClassifier needed for inference only.

    Only the "small" and "tiny" timm ViT backbones are supported; the
    classification head is replaced by a single-logit binary head (sigmoid
    is applied by the caller).
    """

    def __init__(self, model_size="small", input_size=224, patch_size=16,
                 freeze_backbone=False, device="cpu", **kwargs):
        super().__init__()
        # Space is CPU-only; the checkpoint config may say "cuda" -- ignore it.
        cpu_device = torch.device("cpu")
        if model_size == "small":
            backbone = f"vit_small_patch{patch_size}_{input_size}.augreg_in21k_ft_in1k"
            head_in = 384
        elif model_size == "tiny":
            # NOTE: the tiny variant is always patch16 upstream.
            backbone = f"vit_tiny_patch16_{input_size}.augreg_in21k_ft_in1k"
            head_in = 192
        else:
            raise ValueError(f"[commfor] Unknown model_size: {model_size}")
        self.vit = timm.create_model(backbone, pretrained=False)
        self.vit.head = nn.Linear(head_in, 1, bias=True)
        self.to(cpu_device)

    def forward(self, x):
        return self.vit(x)
class _CommforPipeline:
    """Adapter giving _ViTClassifier the call signature of a transformers
    image-classification pipeline, as expected by run_model() /
    extract_ai_confidence()."""

    # Standard ImageNet preprocessing at the model's 224x224 input size.
    _transform = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    def __init__(self, model: _ViTClassifier):
        self.model = model
        self.model.eval()

    def __call__(self, image: Image.Image):
        batch = self._transform(image).unsqueeze(0)
        with torch.no_grad():
            ai_prob = float(torch.sigmoid(self.model(batch).squeeze()))
        return [
            {"label": "ai_generated", "score": ai_prob},
            {"label": "real", "score": 1.0 - ai_prob},
        ]
def _load_commfor(model_id: str):
    """Load the Community-Forensics checkpoint through its custom loader.

    Returns a _CommforPipeline, or None when loading fails -- the service
    keeps running and /detect answers 503 only when this model is requested.
    """
    try:
        print(f"[Antai] Loading Community-Forensics (custom loader): {model_id}")
        wrapped = _CommforPipeline(_ViTClassifier.from_pretrained(model_id, device="cpu"))
        print("[Antai] Community-Forensics ready.")
        return wrapped
    except Exception as e:
        print(f"[Antai] WARNING β Community-Forensics failed to load: {e}")
        return None
# Two worker threads are enough: the ensemble runs at most two models in
# parallel, and everything else dispatches one model at a time.
_executor = ThreadPoolExecutor(max_workers=2)

app = FastAPI(title="Antai AI Image Detector")
# ---------------------------------------------------------------------------
# Configuration -- Weighted Ensemble (System A)
# All values are environment-overridable.
# ---------------------------------------------------------------------------
_env = os.environ.get

MODEL_A_ID = _env("MODEL_A_ID", "OwensLab/commfor-model-224")
MODEL_B_ID = _env("MODEL_B_ID", "haywoodsloan/ai-image-detector-deploy")
WEIGHT_A = float(_env("WEIGHT_A", "0.6"))
WEIGHT_B = float(_env("WEIGHT_B", "0.4"))
THRESHOLD_AI = float(_env("THRESHOLD_AI", "0.6"))
THRESHOLD_UNCERTAIN = float(_env("THRESHOLD_UNCERTAIN", "0.35"))
DEBUG = _env("DEBUG", "false").lower() in ("true", "1", "yes")

# ---------------------------------------------------------------------------
# Configuration -- Classic CLIP Router (System B)
# ---------------------------------------------------------------------------
ROUTER_MODEL_ID = _env("ROUTER_MODEL_ID", "openai/clip-vit-base-patch32")

# Specialist model IDs.
_ATEEQQ = "Ateeqq/ai-vs-human-image-detector"
_DIMA = "dima806/deepfake_vs_real_image_detection"
_UMM = "umm-maybe/AI-image-detector"
_NYUAD = "NYUAD-ComNets/NYUAD_AI-generated_images_detector"

# One row per bucket: (CLIP prompt, internal bucket name, default specialist).
_ROUTER_TABLE = [
    ("portrait or face photo", "portrait_face", _ATEEQQ),
    ("document screenshot or UI", "document_ui", _ATEEQQ),
    ("artwork illustration or painting", "art_illustration", _ATEEQQ),
    ("nature landscape or outdoor", "nature_landscape", MODEL_B_ID),   # Haywood
    ("indoor room interior or architecture", "interior_architecture", _NYUAD),
    ("general photo or other", "general", MODEL_B_ID),                 # Haywood
]
ROUTER_CANDIDATE_LABELS = [prompt for prompt, _, _ in _ROUTER_TABLE]
_LABEL_TO_BUCKET = {prompt: bucket for prompt, bucket, _ in _ROUTER_TABLE}
_LABEL_TO_SPECIALIST_ID = {prompt: model for prompt, _, model in _ROUTER_TABLE}
# ---------------------------------------------------------------------------
# Model loading -- runs once at startup, never per-request.
# ---------------------------------------------------------------------------
def _load_pipeline(task: str, model_id: str, label: str):
    """Build a transformers pipeline, returning None instead of raising.

    One bad model weight must not crash the whole service: the caller
    stores None, and /detect raises a clean 503 only when that specific
    model is requested.
    """
    try:
        print(f"[Antai] Loading {label}: {model_id}")
        loaded = pipeline(task, model=model_id)
        print(f"[Antai] {label} ready.")
    except Exception as e:
        print(f"[Antai] WARNING β {label} failed to load: {e}")
        return None
    return loaded
# Load every model once at import time (HF Spaces keep the process warm).
pipeline_a = _load_commfor(MODEL_A_ID)
pipeline_b = _load_pipeline("image-classification", MODEL_B_ID, "Model B (Haywoodsloan)")
router_pipeline = _load_pipeline("zero-shot-image-classification", ROUTER_MODEL_ID, "CLIP router")
pipeline_ateeqq = _load_pipeline("image-classification", _ATEEQQ, "Ateeqq specialist")
pipeline_dima = _load_pipeline("image-classification", _DIMA, "dima806 specialist")
pipeline_umm = _load_pipeline("image-classification", _UMM, "umm-maybe specialist")
pipeline_nyuad = _load_pipeline("image-classification", _NYUAD, "NYUAD specialist")

# Lookup table: model ID -> pipeline instance (for router dynamic dispatch).
_SPECIALIST_PIPELINES = {
    _ATEEQQ: pipeline_ateeqq,
    _DIMA: pipeline_dima,
    _UMM: pipeline_umm,
    _NYUAD: pipeline_nyuad,
    MODEL_B_ID: pipeline_b,
}

# Startup summary: enumerate each model exactly once, then split by load
# success (previously the same literal list was duplicated for both checks).
_ALL_MODELS = [
    ("Model A", pipeline_a),
    ("Model B", pipeline_b),
    ("Router", router_pipeline),
    ("Ateeqq", pipeline_ateeqq),
    ("dima806", pipeline_dima),
    ("umm-maybe", pipeline_umm),
    ("NYUAD", pipeline_nyuad),
]
_loaded = [name for name, pipe in _ALL_MODELS if pipe]
_failed = [name for name, pipe in _ALL_MODELS if not pipe]
print(f"[Antai] Startup complete. Loaded: {_loaded}" + (f" Failed: {_failed}" if _failed else ""))
# ---------------------------------------------------------------------------
# Request schema
# ---------------------------------------------------------------------------
class DetectRequest(BaseModel):
    """Body of the detect endpoint; provide exactly one image source."""

    # Public image URL to fetch.
    imageUrl: Optional[str] = None
    # Base64 data URL: "data:image/jpeg;base64,...".
    imageData: Optional[str] = None
    # Model selector:
    #   "commfor" | "haywood"               -- ensemble system, one model only
    #   "router"                            -- CLIP auto-route
    #   "ateeqq" | "dima" | "umm" | "nyuad" -- classic direct specialist
    #   None (default)                      -- full weighted ensemble
    singleModel: Optional[str] = None
# ---------------------------------------------------------------------------
# Image loader
# ---------------------------------------------------------------------------
def load_image(req: DetectRequest) -> Image.Image:
    """Resolve the request's image source into an RGB PIL image.

    Prefers imageData (base64 data URL) over imageUrl. Raises
    HTTPException(400) on malformed input, a failed fetch, or when neither
    field is supplied.
    """
    if req.imageData:
        if "," not in req.imageData:
            raise HTTPException(
                status_code=400,
                detail="Invalid base64 data URL β expected 'data:image/...;base64,...'",
            )
        payload = req.imageData.split(",", 1)[1]
        try:
            return Image.open(io.BytesIO(base64.b64decode(payload))).convert("RGB")
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Could not decode base64 image: {e}")
    if req.imageUrl:
        try:
            resp = requests.get(
                req.imageUrl,
                timeout=15,
                headers={"User-Agent": "Mozilla/5.0 (Antai-Detector/1.0)"},
            )
            resp.raise_for_status()
            return Image.open(io.BytesIO(resp.content)).convert("RGB")
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Could not fetch image URL: {e}")
    raise HTTPException(status_code=400, detail="Provide either imageUrl or imageData")
# ---------------------------------------------------------------------------
# AI confidence extraction
#
# Handles multiple model label conventions:
#   Ateeqq / gate model: "ai" / "hum"
#   Haywoodsloan:        "artificial" / "real"
#   CIFAKE-style:        "FAKE" / "REAL" (lowercased before lookup)
#   OwensLab commfor:    generic fallback handles anything unrecognized
# ---------------------------------------------------------------------------
AI_LABELS = {"ai", "artificial", "fake", "ai_generated", "ai-generated", "dalle", "sd"}
HUMAN_LABELS = {"human", "real", "authentic", "genuine", "natural", "hum"}

def extract_ai_confidence(results: list) -> float:
    """Map a classifier's [{label, score}, ...] output to P(AI-generated).

    Preference order: an explicit AI label wins outright; otherwise a human
    label is inverted (1 - score); otherwise the top entry's score is
    trusted as-is.
    """
    ai_entry = next((r for r in results if r["label"].lower() in AI_LABELS), None)
    if ai_entry is not None:
        return float(ai_entry["score"])
    human_entry = next((r for r in results if r["label"].lower() in HUMAN_LABELS), None)
    if human_entry is not None:
        return 1.0 - float(human_entry["score"])
    return float(results[0]["score"])
# ---------------------------------------------------------------------------
# Inference helpers
# ---------------------------------------------------------------------------
def run_model(pipe, image: Image.Image, name: str):
    """Blocking CPU inference. Returns (score, error). Never raises.

    The score is clamped into [0, 1]; on any failure (including a None
    pipe) the error string is returned instead.
    """
    try:
        confidence = extract_ai_confidence(pipe(image))
        return min(1.0, max(0.0, confidence)), None
    except Exception as e:
        print(f"[Antai] {name} inference failed: {e}")
        return None, str(e)
async def run_model_async(pipe, image: Image.Image, name: str):
    """Run run_model in the shared thread pool so models execute concurrently.

    Uses asyncio.get_running_loop(): get_event_loop() is deprecated inside
    coroutines since Python 3.10, and we are always called from one.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, run_model, pipe, image, name)
def run_router(image: Image.Image):
    """Blocking CLIP zero-shot routing. Returns (results_list, error). Never raises."""
    try:
        ranked = router_pipeline(image, candidate_labels=ROUTER_CANDIDATE_LABELS)
    except Exception as e:
        print(f"[Antai] CLIP router inference failed: {e}")
        return None, str(e)
    return ranked, None
async def run_router_async(image: Image.Image):
    """Run run_router in the shared thread pool without blocking the event loop.

    Uses asyncio.get_running_loop() -- get_event_loop() is deprecated inside
    coroutines since Python 3.10.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, run_router, image)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
async def _require_and_run(pipe, image, display: str, trace_name: str) -> float:
    """Run one named pipeline, translating unavailability/failure into HTTP errors.

    Raises 503 when the pipeline failed to load at startup, 500 when
    inference itself fails; otherwise returns the AI-confidence score.
    """
    if pipe is None:
        raise HTTPException(status_code=503, detail=f"{display} model failed to load at startup")
    score, err = await run_model_async(pipe, image, trace_name)
    if score is None:
        raise HTTPException(status_code=500, detail=f"{display} inference failed: {err}")
    return score

# BUG FIX: this function was never registered as a route (comments elsewhere
# reference "the /detect route"); the decorator below registers it.
@app.post("/detect")
async def detect(req: DetectRequest):
    """Run the selected detection system and return a verdict payload.

    req.singleModel picks the system (see DetectRequest); None runs the
    full weighted ensemble. Response shape is identical for every mode.
    """
    image = load_image(req)

    # Per-system metadata -- router fields stay None outside router mode.
    score_a = None
    score_b = None
    router_bucket = None
    specialist_model_id = None

    # ββ Ensemble system, single model ββββββββββββββββββββββββββββββββββββ
    if req.singleModel == "commfor":
        score_a = await _require_and_run(pipeline_a, image, "Community-Forensics", "ModelA")
        final_score = score_a
    elif req.singleModel == "haywood":
        score_b = await _require_and_run(pipeline_b, image, "Haywoodsloan", "ModelB")
        final_score = score_b

    # ββ Classic router ββββββββββββββββββββββββββββββββββββββββββββββββββββ
    elif req.singleModel == "router":
        router_results, err = await run_router_async(image)
        if router_results is None:
            # Router failed β fall back to Haywood (or commfor if Haywood also failed)
            fallback = pipeline_b or pipeline_a
            if fallback is None:
                raise HTTPException(status_code=503, detail="Router and all fallback models failed to load")
            print(f"[Antai] CLIP router failed ({err}), falling back to ensemble model")
            score, _ = await run_model_async(fallback, image, "fallback")
            final_score = score if score is not None else 0.5
            router_bucket = "general"
            # Report the model that actually answered (was hard-coded to Model B
            # even when Model A served the fallback).
            specialist_model_id = MODEL_B_ID if pipeline_b is not None else MODEL_A_ID
        else:
            top_label = router_results[0]["label"]
            router_bucket = _LABEL_TO_BUCKET.get(top_label, "general")
            specialist_model_id = _LABEL_TO_SPECIALIST_ID.get(top_label, MODEL_B_ID)
            specialist_pipe = _SPECIALIST_PIPELINES.get(specialist_model_id)
            if specialist_pipe is None:
                specialist_pipe = pipeline_b or pipeline_a
            if specialist_pipe is None:
                raise HTTPException(status_code=503, detail=f"Specialist model for bucket '{router_bucket}' not available")
            score, specialist_err = await run_model_async(specialist_pipe, image, "RouterSpecialist")
            if score is None:
                # Specialist crashed mid-inference: log it (previously the
                # error was silently discarded) and degrade to neutral.
                print(f"[Antai] Router specialist failed ({specialist_err}), returning neutral 0.5")
            final_score = score if score is not None else 0.5

    # ββ Classic system, direct specialist ββββββββββββββββββββββββββββββββ
    elif req.singleModel == "ateeqq":
        final_score = await _require_and_run(pipeline_ateeqq, image, "Ateeqq", "Ateeqq")
    elif req.singleModel == "dima":
        final_score = await _require_and_run(pipeline_dima, image, "dima806", "dima806")
    elif req.singleModel == "umm":
        final_score = await _require_and_run(pipeline_umm, image, "umm-maybe", "umm-maybe")
    elif req.singleModel == "nyuad":
        final_score = await _require_and_run(pipeline_nyuad, image, "NYUAD", "NYUAD")

    # ββ Full weighted ensemble (default) ββββββββββββββββββββββββββββββββββ
    else:
        (score_a, err_a), (score_b, err_b) = await asyncio.gather(
            run_model_async(pipeline_a, image, "ModelA"),
            run_model_async(pipeline_b, image, "ModelB"),
        )
        if score_a is None and score_b is None:
            raise HTTPException(
                status_code=500,
                detail=f"Both ensemble models failed. ModelA: {err_a}. ModelB: {err_b}",
            )
        if score_a is not None and score_b is not None:
            final_score = score_a * WEIGHT_A + score_b * WEIGHT_B
        elif score_a is not None:
            print("[Antai] ModelB unavailable, using ModelA score alone")
            final_score = score_a
        else:
            print("[Antai] ModelA unavailable, using ModelB score alone")
            final_score = score_b

    # Shared verdict thresholds for every mode.
    if final_score >= THRESHOLD_AI:
        verdict = "AI Generated"
    elif final_score >= THRESHOLD_UNCERTAIN:
        verdict = "Uncertain"
    else:
        verdict = "Likely Real"

    active_model = req.singleModel if req.singleModel else "ensemble"
    response = {
        "confidence": final_score,
        "isAI": final_score >= THRESHOLD_AI,
        "verdict": verdict,
        "finalScore": round(final_score, 4),
        "modelAScore": round(score_a, 4) if score_a is not None else None,
        "modelBScore": round(score_b, 4) if score_b is not None else None,
        "gateScore": None,  # reserved field kept for response-shape compatibility
        "routerBucket": router_bucket,
        "specialistModel": specialist_model_id,
        "provider": f"hf_{active_model}",
    }
    if DEBUG:
        response["modelAId"] = MODEL_A_ID
        response["modelBId"] = MODEL_B_ID
    return response
# BUG FIX: register the health probe as a route -- without the decorator the
# endpoint is unreachable.
@app.get("/health")
async def health():
    """Liveness probe for the Space frontend / orchestrator."""
    return {"status": "ok"}