from __future__ import annotations

import asyncio
import io
import logging
import os
import re
import subprocess
import sys
import tempfile
import time
from pathlib import Path

import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from PIL import ExifTags, Image

from src.engines.coherence.engine import CoherenceEngine
from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.sstgnn.engine import SSTGNNEngine
from src.explainability.explainer import MODEL_CANDIDATES, explain
from src.fusion.fuser import fuse
from src.services.hf_inference_client import HFInferenceClient, HFInferenceUnavailable
from src.services.inference_router import (
    get_inference_backend,
    is_runpod_configured,
    route_inference,
)
from src.services.media_utils import extract_video_frames
from src.types import DetectionResponse, EngineResult

logger = logging.getLogger(__name__)

# Load local development environment values from .env when present.
load_dotenv()


def _is_test_mode() -> bool:
    """Return True when model loading and remote inference should be skipped.

    Test mode is active when ``GENAI_SKIP_MODEL_LOAD`` holds a truthy value
    (``1``, ``true``, ``yes`` or ``on``, case-insensitive), when the
    ``PYTEST_CURRENT_TEST`` environment variable is present, or when the
    ``pytest`` module has already been imported.
    """
    return (
        os.environ.get("GENAI_SKIP_MODEL_LOAD", "").strip().lower() in {"1", "true", "yes", "on"}
        or "PYTEST_CURRENT_TEST" in os.environ
        or "pytest" in sys.modules
    )


if _is_test_mode():
    # Make the flag explicit when test mode was only inferred from pytest.
    # NOTE(review): presumably other modules read this variable directly - confirm.
    os.environ.setdefault("GENAI_SKIP_MODEL_LOAD", "1")

app = FastAPI(title="GenAI-DeepDetect", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()
_hf = HFInferenceClient()

_BYTES_PER_MB = 1024 * 1024

MAX_IMAGE_MB = int(os.environ.get("MAX_IMAGE_SIZE_MB", 20))
MAX_VIDEO_MB = int(os.environ.get("MAX_VIDEO_SIZE_MB", 100))
MAX_FRAMES = int(os.environ.get("MAX_VIDEO_FRAMES", 300))

# In "auto" backend mode, videos larger than this are offered to RunPod first.
AUTO_RUNPOD_MIN_VIDEO_BYTES = 20 * _BYTES_PER_MB

IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp", "image/bmp", "image/gif"}
VIDEO_TYPES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm", "video/avi"}

SUPPORTED_GENERATORS = [
    "real",
    "unknown_gan",
    "stable_diffusion",
    "midjourney",
    "dall_e",
    "flux",
    "firefly",
    "imagen",
]

SYNTHETIC_KEYWORDS = (
    "chatgpt",
    "gemini",
    "thispersondoesnotexist",
    "this person does not exist",
)

# Label keywords used to interpret Hugging Face classifier output.
_HF_FAKE_KEYWORDS = (
    "fake",
    "deepfake",
    "generated",
    "synthetic",
    "artificial",
    "ai",
    "label_1",
    "class_1",
    "1",
)
_HF_REAL_KEYWORDS = ("real", "authentic", "human", "natural", "label_0", "class_0", "0")

# Splits a lower-cased label into alphabetic and numeric runs ("class_1" -> "class", "1").
_LABEL_TOKEN_RE = re.compile(r"[a-z]+|\d+")


def _base_content_type(content_type: str | None) -> str:
    """Return the lower-cased media type with any ``;parameter`` suffix removed."""
    return (content_type or "").split(";")[0].strip().lower()


def _find_synthetic_keyword_hits(*texts: str) -> list[str]:
    """Return the ``SYNTHETIC_KEYWORDS`` found in the given texts.

    The texts are joined with spaces and lower-cased before a substring search.
    Hits are returned in ``SYNTHETIC_KEYWORDS`` order without duplicates.
    """
    haystack = " ".join(texts).lower()
    hits = [keyword for keyword in SYNTHETIC_KEYWORDS if keyword in haystack]
    # Preserve order while deduping.
    return list(dict.fromkeys(hits))


def _collect_image_metadata_text(data: bytes) -> str:
    """Flatten an image's ``info`` entries and EXIF tags into one searchable string.

    Returns ``""`` when the bytes cannot be opened as an image. Failures while
    reading either metadata source are logged at debug level and whatever was
    collected up to that point is still returned (best effort).
    """
    try:
        image = Image.open(io.BytesIO(data))
    except Exception:
        return ""

    parts: list[str] = []
    # The context manager closes the image once the metadata has been read.
    with image:
        try:
            for key, value in image.info.items():
                if isinstance(value, bytes):
                    # Cap binary payloads so large embedded blobs do not bloat the text.
                    parts.append(f"{key}={value[:200]!r}")
                else:
                    parts.append(f"{key}={value}")
        except Exception:
            logger.debug("Could not read image info metadata", exc_info=True)

        try:
            exif = image.getexif()
            for tag_id, value in exif.items():
                tag_name = ExifTags.TAGS.get(tag_id, str(tag_id))
                parts.append(f"{tag_name}={value}")
        except Exception:
            logger.debug("Could not read image EXIF metadata", exc_info=True)

    return " | ".join(parts)


def _video_temp_suffix(content_type: str | None, filename: str | None) -> str:
    """Choose a temp suffix matching the uploaded container for better decoder compatibility.

    The content type takes precedence; otherwise a recognised filename extension
    is used, and ``.mp4`` is the final fallback.
    """
    by_type = {
        "video/mp4": ".mp4",
        "video/quicktime": ".mov",
        "video/x-msvideo": ".avi",
        "video/webm": ".webm",
        "video/avi": ".avi",
    }
    ctype = _base_content_type(content_type)
    if ctype in by_type:
        return by_type[ctype]

    ext = Path(filename or "").suffix.strip().lower()
    if ext in {".mp4", ".mov", ".avi", ".webm"}:
        return ext
    return ".mp4"


def _write_temp_video(data: bytes, *, content_type: str | None, filename: str | None) -> str:
    """Write ``data`` to a new temporary file and return its path.

    The caller is responsible for deleting the file. If the write itself fails,
    the partially written file is removed before the error propagates.
    """
    tmp = tempfile.NamedTemporaryFile(
        suffix=_video_temp_suffix(content_type, filename),
        delete=False,
    )
    try:
        with tmp:
            tmp.write(data)
    except BaseException:
        Path(tmp.name).unlink(missing_ok=True)
        raise
    return tmp.name


def _collect_video_metadata_text(
    data: bytes,
    *,
    content_type: str | None,
    filename: str | None,
) -> str:
    """Return ``ffprobe``'s JSON description of the video, or ``""`` on failure.

    The bytes are written to a temporary file that is always removed afterwards.
    A missing ``ffprobe`` binary, a timeout (15 s) or a non-zero exit status all
    yield ``""``. This call blocks, so async callers should run it in a thread.
    """
    tmp_path = _write_temp_video(data, content_type=content_type, filename=filename)
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        tmp_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15, check=False)
    except Exception:
        logger.debug("ffprobe metadata probe failed", exc_info=True)
        return ""
    finally:
        Path(tmp_path).unlink(missing_ok=True)

    if result.returncode != 0:
        return ""
    return result.stdout or ""


def _apply_metadata_keyword_signal(
    response: DetectionResponse,
    *,
    filename: str | None,
    metadata_text: str,
) -> DetectionResponse:
    """Force a FAKE verdict when the filename or metadata names a synthetic source.

    When no keyword is found, ``response`` is returned unchanged. Otherwise a
    deep copy is returned with an extra ``metadata_signal`` engine result, an
    extended explanation, a FAKE verdict with confidence of at least 0.85, and
    a ``real`` attribution replaced by ``unknown_gan``.
    """
    hits = _find_synthetic_keyword_hits(filename or "", metadata_text)
    if not hits:
        return response

    joined_hits = ", ".join(hits)
    flagged = response.model_copy(deep=True)
    flagged.engine_breakdown.append(
        EngineResult(
            engine="metadata_signal",
            verdict="FAKE",
            confidence=0.98,
            attributed_generator="unknown_gan",
            explanation=f"Filename/metadata contains synthetic keyword(s): {joined_hits}.",
            processing_time_ms=0.0,
        )
    )
    flagged.explanation = (
        f"{flagged.explanation} "
        f"Metadata signal detected keyword(s): {joined_hits}."
    )

    # NOTE(review): when a REAL verdict is flipped, its confidence is carried over
    # as the FAKE confidence (floored at 0.85) - confirm this is intended.
    if flagged.verdict != "FAKE" or flagged.confidence < 0.85:
        flagged.verdict = "FAKE"
        flagged.confidence = max(flagged.confidence, 0.85)

    if flagged.attributed_generator == "real":
        flagged.attributed_generator = "unknown_gan"

    return flagged


def _model_inventory() -> dict[str, object]:
    """Return a static description of the models each engine is configured with."""
    return {
        "fingerprint": {
            "ensemble_detectors": [
                "Organika/sdxl-detector",
                "haywoodsloan/ai-image-detector-deploy",
                "dima806/deepfake_vs_real_image_detection",
            ],
            "ensemble_weights": [0.5, 0.3, 0.2],
            "attribution_model": "openai/clip-vit-large-patch14",
        },
        "coherence": {
            "audio_deepfake_model": "disabled (visual-only coherence)",
            "facial_landmarks": "mediapipe FaceMesh/FaceLandmarker",
            "temporal_embedding": "facenet-pytorch InceptionResnetV1(vggface2) when available",
        },
        "sstgnn": {
            "pretrained_hf_models": [
                "dima806/deepfake_vs_real_image_detection",
                "prithivMLmods/Deep-Fake-Detector-Model",
            ],
            "graph_component": "scipy.spatial.Delaunay + MediaPipe landmarks",
        },
        "explainability": {
            "gemini_model_candidates": list(MODEL_CANDIDATES),
        },
        "generator_labels": SUPPORTED_GENERATORS,
    }


@app.get("/", response_class=HTMLResponse)
async def root() -> HTMLResponse:
    """Serve a minimal landing page that points at ``/docs``."""
    return HTMLResponse("<h1>GenAI-DeepDetect API</h1><p>See /docs</p>")


async def _ensure_models_loaded() -> None:
    """Run each engine's ``_ensure`` in a worker thread; a no-op in test mode."""
    if _is_test_mode():
        return
    # Keep model imports/loads sequential to avoid lazy-import race issues.
    await asyncio.to_thread(_fp._ensure)
    await asyncio.to_thread(_co._ensure)
    await asyncio.to_thread(_st._ensure)


# NOTE(review): ``on_event`` is deprecated in recent FastAPI releases in favour of
# lifespan handlers - consider migrating.
@app.on_event("startup")
async def preload() -> None:
    """Load all engine models at application startup unless in test mode."""
    if _is_test_mode():
        logger.info("Skipping startup preload in test mode")
        return
    logger.info("Preloading models...")
    await _ensure_models_loaded()
    logger.info("Model preload complete")


@app.get("/health")
async def health() -> dict:
    """Report service status, engine names and the configured inference backend."""
    return {
        "status": "ok",
        "version": "1.0.0",
        "engines": ["fingerprint", "coherence", "sstgnn"],
        "inference_backend": get_inference_backend(),
        "runpod_configured": is_runpod_configured(),
    }


@app.get("/health/models")
async def health_models() -> dict[str, object]:
    """Return the pretrained model inventory used by each engine."""
    return _model_inventory()


def _assign_processing_time(results: list[EngineResult], ms: float) -> list[EngineResult]:
    """Set ``processing_time_ms`` (rounded to 2 decimals) on every result, in place."""
    for result in results:
        result.processing_time_ms = round(ms, 2)
    return results


def _fallback_explanation(verdict: str, confidence: float, generator: str) -> str:
    """Build the templated explanation used in test mode."""
    return (
        f"Content classified as {verdict} with {confidence:.0%} confidence. "
        f"Attributed generator: {generator}."
    )


def _label_matches(label: str, keywords: tuple[str, ...]) -> bool:
    """Return True when the lower-cased ``label`` matches any keyword.

    Keywords of one or two characters ("ai", "1", "0") must equal a whole
    alphabetic or numeric token of the label, so "portrait" or "label_10" do not
    match them. Longer keywords match as substrings ("fake" matches "deepfakes").
    """
    tokens = set(_LABEL_TOKEN_RE.findall(label))
    return any(
        (keyword in tokens) if len(keyword) <= 2 else (keyword in label)
        for keyword in keywords
    )


def _hf_fake_score(preds: list[dict]) -> float:
    """Reduce classifier predictions to a probability-like "fake" score in [0, 1].

    Each prediction is a mapping with ``label`` and ``score`` entries. The score
    is the best score among fake-looking labels; when only real-looking labels
    scored, it is ``1 - best_real``. Returns 0.5 for an empty list or when no
    label is recognised.
    """
    if not preds:
        return 0.5

    fake_best = 0.0
    real_best = 0.0
    for pred in preds:
        label = str(pred.get("label", "")).strip().lower()
        score = float(pred.get("score", 0.0))
        if _label_matches(label, _HF_FAKE_KEYWORDS):
            fake_best = max(fake_best, score)
        if _label_matches(label, _HF_REAL_KEYWORDS):
            real_best = max(real_best, score)

    if fake_best == 0.0 and real_best == 0.0:
        # Nothing scored above zero: fall back to the first (top) prediction alone.
        top = preds[0]
        top_label = str(top.get("label", "")).strip().lower()
        top_score = float(top.get("score", 0.5))
        if _label_matches(top_label, _HF_FAKE_KEYWORDS):
            return float(np.clip(top_score, 0.0, 1.0))
        if _label_matches(top_label, _HF_REAL_KEYWORDS):
            return float(np.clip(1.0 - top_score, 0.0, 1.0))
        return 0.5

    if fake_best == 0.0:
        return float(np.clip(1.0 - real_best, 0.0, 1.0))
    return float(np.clip(fake_best, 0.0, 1.0))


def _hf_generator_label(preds: list[dict], verdict: str) -> str:
    """Pick the first supported generator named in any prediction label.

    Returns ``"real"`` for non-FAKE verdicts and ``"unknown_gan"`` when no
    supported generator name appears in the labels.
    """
    if verdict != "FAKE":
        return "real"

    labels = " ".join(str(pred.get("label", "")).lower() for pred in preds)
    for candidate in SUPPORTED_GENERATORS:
        if candidate == "real":
            continue
        # Accept both "stable_diffusion" and "stable diffusion" spellings.
        if candidate.replace("_", " ") in labels or candidate in labels:
            return candidate
    return "unknown_gan"


def _build_hf_response(preds: list[dict], elapsed_ms: float, media_type: str) -> DetectionResponse:
    """Wrap Hugging Face predictions in a single-engine ``DetectionResponse``."""
    fake_score = _hf_fake_score(preds)
    verdict = "FAKE" if fake_score > 0.5 else "REAL"
    confidence = fake_score if verdict == "FAKE" else (1.0 - fake_score)
    generator = _hf_generator_label(preds, verdict)
    top_label = str(preds[0].get("label", "unknown")) if preds else "unknown"

    explanation = (
        f"Hugging Face serverless ({media_type}) top label: {top_label}. "
        f"Classified as {verdict} with {confidence:.0%} confidence."
    )
    engine_result = EngineResult(
        engine="hf_serverless",
        verdict=verdict,
        confidence=confidence,
        attributed_generator=generator,
        explanation=explanation,
        processing_time_ms=elapsed_ms,
    )
    return DetectionResponse(
        verdict=verdict,
        confidence=confidence,
        attributed_generator=generator,
        explanation=explanation,
        processing_time_ms=elapsed_ms,
        engine_breakdown=[engine_result],
    )


async def _hf_detect_image(data: bytes) -> DetectionResponse:
    """Classify image bytes through the Hugging Face client (45 s timeout)."""
    t0 = time.monotonic()
    preds = await _hf.classify_image(data, timeout=45.0)
    elapsed_ms = (time.monotonic() - t0) * 1000
    return _build_hf_response(preds, elapsed_ms, media_type="image")


async def _decode_video_frames(
    data: bytes,
    *,
    content_type: str | None,
    filename: str | None,
) -> list:
    """Decode up to ``MAX_FRAMES`` frames from in-memory video bytes.

    The bytes are spooled to a temporary file (always removed afterwards) and
    passed to ``extract_video_frames`` in a worker thread.

    Raises:
        HTTPException: 422 when decoding fails or yields no frames.
    """
    tmp_path = await asyncio.to_thread(
        _write_temp_video,
        data,
        content_type=content_type,
        filename=filename,
    )
    try:
        try:
            frames = await asyncio.to_thread(extract_video_frames, tmp_path, MAX_FRAMES)
        except Exception as exc:
            raise HTTPException(status_code=422, detail=f"Video decode failed: {exc}") from exc
    finally:
        Path(tmp_path).unlink(missing_ok=True)

    if not frames:
        raise HTTPException(status_code=422, detail="Could not extract frames")
    return frames


async def _hf_detect_video(
    data: bytes,
    *,
    content_type: str | None = None,
    filename: str | None = None,
) -> DetectionResponse:
    """Classify a video by sending its first decoded frame, as JPEG, to Hugging Face.

    Raises:
        HTTPException: 422 when the video cannot be decoded into frames.
    """
    frames = await _decode_video_frames(data, content_type=content_type, filename=filename)
    keyframe = Image.fromarray(frames[0])
    buf = io.BytesIO()
    keyframe.save(buf, format="JPEG")
    return await _hf_detect_image(buf.getvalue())


async def _read_upload(file: UploadFile, max_mb: int) -> bytes:
    """Read an upload, rejecting anything larger than ``max_mb`` mebibytes.

    At most ``limit + 1`` bytes are read, so an oversized upload is detected
    without pulling the whole body into memory.

    Raises:
        HTTPException: 413 when the upload exceeds the limit.
    """
    limit = max_mb * _BYTES_PER_MB
    data = await file.read(limit + 1)
    if len(data) > limit:
        raise HTTPException(status_code=413, detail="File too large")
    return data


async def _runpod_after_hf_failure(data: bytes, media_type: str) -> DetectionResponse:
    """Try RunPod after the Hugging Face route failed.

    NOTE(review): the RunPod result is returned as-is, without the metadata
    keyword signal that the other routes apply - confirm this is intended.

    Raises:
        HTTPException: 503 when RunPod is not configured or also fails.
    """
    if not is_runpod_configured():
        raise HTTPException(
            status_code=503,
            detail="Hugging Face inference failed and RunPod is not configured.",
        )
    try:
        return await route_inference(data, media_type)
    except Exception as exc:
        raise HTTPException(
            status_code=503,
            detail=f"Hugging Face and RunPod failed for {media_type} inference: {exc}",
        ) from exc


async def _fuse_local_results(
    results: list[EngineResult],
    *,
    is_video: bool,
    elapsed_ms: float,
) -> DetectionResponse:
    """Fuse local engine results and attach an explanation.

    In test mode the templated fallback explanation is used; otherwise
    ``explain`` runs in a worker thread.
    """
    engine_results = _assign_processing_time(results, elapsed_ms)
    verdict, conf, generator = fuse(engine_results, is_video=is_video)

    if _is_test_mode():
        explanation = _fallback_explanation(verdict, conf, generator)
    else:
        explanation = await asyncio.to_thread(explain, verdict, conf, engine_results, generator)

    return DetectionResponse(
        verdict=verdict,
        confidence=conf,
        attributed_generator=generator,
        explanation=explanation,
        processing_time_ms=elapsed_ms,
        engine_breakdown=engine_results,
    )


@app.post("/detect/image", response_model=DetectionResponse)
async def detect_image(file: UploadFile = File(...)) -> DetectionResponse:
    """Detect whether an uploaded image is synthetic.

    Routing depends on ``get_inference_backend()``: ``"hf"`` uses Hugging Face
    with a RunPod fallback, ``"runpod"`` uses RunPod with a local fallback, and
    anything else (or test mode) runs the three local engines.

    Raises:
        HTTPException: 415 for unsupported content types, 413 for oversized
            uploads, 422 when the image cannot be decoded, 503 when the remote
            backends fail on the HF route.
    """
    t0 = time.monotonic()
    if _base_content_type(file.content_type) not in IMAGE_TYPES:
        raise HTTPException(status_code=415, detail=f"Unsupported type: {file.content_type}")

    data = await _read_upload(file, MAX_IMAGE_MB)
    metadata_text = _collect_image_metadata_text(data)
    backend = get_inference_backend()

    if backend == "hf" and not _is_test_mode():
        try:
            response = await _hf_detect_image(data)
            return _apply_metadata_keyword_signal(
                response,
                filename=file.filename,
                metadata_text=metadata_text,
            )
        except HFInferenceUnavailable as exc:
            logger.warning("HF image route failed, trying RunPod fallback: %s", exc)
        except Exception as exc:
            logger.warning("HF image route unexpected error, trying RunPod fallback: %s", exc)
        return await _runpod_after_hf_failure(data, "image")

    if backend == "runpod" and not _is_test_mode() and is_runpod_configured():
        try:
            return await route_inference(data, "image")
        except Exception as exc:
            logger.warning(
                "RunPod image route failed, falling back to local image inference: %s", exc
            )

    try:
        image = Image.open(io.BytesIO(data)).convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=422, detail=f"Could not decode image: {exc}") from exc

    await _ensure_models_loaded()
    fp, co, st = await asyncio.gather(
        asyncio.to_thread(_fp.run, image),
        asyncio.to_thread(_co.run, image),
        asyncio.to_thread(_st.run, image),
    )
    elapsed_ms = (time.monotonic() - t0) * 1000

    response = await _fuse_local_results([fp, co, st], is_video=False, elapsed_ms=elapsed_ms)
    return _apply_metadata_keyword_signal(
        response,
        filename=file.filename,
        metadata_text=metadata_text,
    )


@app.post("/detect/video", response_model=DetectionResponse)
async def detect_video(file: UploadFile = File(...)) -> DetectionResponse:
    """Detect whether an uploaded video is synthetic.

    Routing depends on ``get_inference_backend()``: ``"hf"`` uses Hugging Face
    with a RunPod fallback; ``"runpod"``, or ``"auto"`` with an upload above
    ``AUTO_RUNPOD_MIN_VIDEO_BYTES``, tries RunPod first and falls back to the
    local engines; everything else (or test mode) runs locally.

    Raises:
        HTTPException: 415 for unsupported content types, 413 for oversized
            uploads, 422 when no frames can be decoded, 503 when the remote
            backends fail on the HF route or local video analysis fails.
    """
    t0 = time.monotonic()
    if _base_content_type(file.content_type) not in VIDEO_TYPES:
        raise HTTPException(status_code=415, detail=f"Unsupported type: {file.content_type}")

    data = await _read_upload(file, MAX_VIDEO_MB)
    # ffprobe is a blocking subprocess call; keep it off the event loop.
    metadata_text = await asyncio.to_thread(
        _collect_video_metadata_text,
        data,
        content_type=file.content_type,
        filename=file.filename,
    )
    backend = get_inference_backend()

    if backend == "hf" and not _is_test_mode():
        try:
            response = await _hf_detect_video(
                data,
                content_type=file.content_type,
                filename=file.filename,
            )
            return _apply_metadata_keyword_signal(
                response,
                filename=file.filename,
                metadata_text=metadata_text,
            )
        except HFInferenceUnavailable as exc:
            logger.warning("HF video route failed, trying RunPod fallback: %s", exc)
        except Exception as exc:
            # NOTE(review): this also catches the 422 raised for undecodable videos,
            # which then surfaces as a 503 - confirm this is intended.
            logger.warning("HF video route unexpected error, trying RunPod fallback: %s", exc)
        return await _runpod_after_hf_failure(data, "video")

    should_try_runpod = backend == "runpod" or (
        backend == "auto" and len(data) > AUTO_RUNPOD_MIN_VIDEO_BYTES
    )
    if should_try_runpod and not _is_test_mode() and is_runpod_configured():
        try:
            return await route_inference(data, "video")
        except Exception as exc:
            logger.warning("RunPod route failed, falling back to local video inference: %s", exc)

    frames = await _decode_video_frames(
        data,
        content_type=file.content_type,
        filename=file.filename,
    )

    await _ensure_models_loaded()
    try:
        fp, co, st = await asyncio.gather(
            asyncio.to_thread(_fp.run_video, frames),
            asyncio.to_thread(_co.run_video, frames),
            asyncio.to_thread(_st.run_video, frames),
        )
    except Exception as exc:
        logger.exception("Video engine inference failed")
        raise HTTPException(status_code=503, detail=f"Video analysis failed: {exc}") from exc
    elapsed_ms = (time.monotonic() - t0) * 1000

    response = await _fuse_local_results([fp, co, st], is_video=True, elapsed_ms=elapsed_ms)
    return _apply_metadata_keyword_signal(
        response,
        filename=file.filename,
        metadata_text=metadata_text,
    )