""" Deepfake Hunter - REST API Server FastAPI-based REST API for deepfake detection. Features: - POST /analyze/image - Analyze single image - POST /analyze/video - Analyze video file - POST /analyze/stream - Real-time stream analysis - GET /health - Health check - Rate limiting and API key authentication Author: Deepfake Hunter Team License: MIT """ import warnings warnings.filterwarnings('ignore') from typing import Optional, List, Dict, Any from pathlib import Path import tempfile import time import secrets import hashlib from datetime import datetime, timedelta from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Header, status, BackgroundTasks from fastapi.responses import JSONResponse, FileResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import APIKeyHeader from pydantic import BaseSettings, BaseModel, Field import uvicorn from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from loguru import logger import numpy as np import cv2 from deepfake_detector import DeepfakeDetector, DetectionContext, DetectionResult from video_analyzer import VideoAnalyzer, VideoDetectionResult # Configuration class Settings(BaseSettings): """API Configuration""" app_name: str = "Deepfake Hunter API" app_version: str = "1.0.0" api_key: str = "your-secret-api-key" # Change in production! max_image_size_mb: int = 10 max_video_size_mb: int = 500 rate_limit_per_minute: int = 60 enable_cors: bool = True cors_origins: List[str] = ["*"] use_gpu: bool = True class Config: env_file = ".env" env_prefix = "DEEPFAKE_API_" settings = Settings() # Initialize FastAPI app app = FastAPI( title=settings.app_name, version=settings.app_version, description="Real-time deepfake detection API with multi-modal analysis", docs_url="/docs", redoc_url="/redoc" ) # CORS middleware if settings.enable_cors: app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Rate limiting limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # API Key authentication api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) def verify_api_key(api_key: str = Depends(api_key_header)) -> str: """Verify API key""" if not api_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="API key required" ) if api_key != settings.api_key: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid API key" ) return api_key # Initialize detector and analyzer (singleton) logger.info("Initializing Deepfake Hunter API...") detector = DeepfakeDetector(use_gpu=settings.use_gpu, detection_sensitivity="medium") video_analyzer = VideoAnalyzer(detector, sample_rate=2) logger.info("API initialization complete!") # Request/Response Models class HealthResponse(BaseModel): """Health check response""" status: str = "healthy" version: str = settings.app_version timestamp: str = Field(default_factory=lambda: datetime.now().isoformat()) gpu_available: bool = False models_loaded: bool = True class ImageAnalysisRequest(BaseModel): """Image analysis request parameters""" context: str = Field(default="general", description="Detection context") class ImageAnalysisResponse(BaseModel): """Image analysis response""" is_deepfake: bool confidence: float scores: Dict[str, Optional[float]] explanation: str regions: List[Dict[str, Any]] processing_time_ms: float context: str severity: str class VideoAnalysisResponse(BaseModel): """Video analysis response""" is_deepfake: bool confidence: float frame_scores: List[tuple] suspect_frames: List[int] fps: float total_frames: int processed_frames: int processing_time_ms: float avg_frame_score: float max_frame_score: float temporal_consistency_score: float physiological_score: Optional[float] # API Endpoints @app.get("/", tags=["Root"]) async def root(): """API root endpoint""" return { "message": "Deepfake Hunter API", "version": settings.app_version, "docs": "/docs", "health": "/health" } @app.get("/health", response_model=HealthResponse, tags=["Health"]) async def health_check(): """ Health check endpoint Returns API status and configuration information. """ import torch return HealthResponse( status="healthy", version=settings.app_version, gpu_available=torch.cuda.is_available(), models_loaded=detector is not None ) @app.post("/analyze/image", response_model=ImageAnalysisResponse, tags=["Analysis"], dependencies=[Depends(verify_api_key)]) @limiter.limit(f"{settings.rate_limit_per_minute}/minute") async def analyze_image( request, file: UploadFile = File(...), context: str = "general" ): """ Analyze image for deepfakes Args: file: Image file (JPEG, PNG) context: Detection context (general, political, news, etc.) Returns: Detection results with confidence scores and explanations Raises: 400: Invalid file format or size 500: Processing error """ try: # Validate file size file_size = 0 contents = await file.read() file_size = len(contents) / (1024 * 1024) # MB if file_size > settings.max_image_size_mb: raise HTTPException( status_code=400, detail=f"File too large. Max size: {settings.max_image_size_mb}MB" ) # Validate file type if not file.content_type.startswith('image/'): raise HTTPException( status_code=400, detail="Invalid file type. Only images are supported." ) # Read image nparr = np.frombuffer(contents, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: raise HTTPException( status_code=400, detail="Failed to decode image" ) # Convert BGR to RGB image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Map context string to enum context_map = { "general": DetectionContext.GENERAL, "political": DetectionContext.POLITICAL, "news": DetectionContext.NEWS, "legal": DetectionContext.LEGAL, "entertainment": DetectionContext.ENTERTAINMENT, "satire": DetectionContext.SATIRE } detection_context = context_map.get(context.lower(), DetectionContext.GENERAL) # Run detection result = detector.detect_image(image, context=detection_context) # Return response return ImageAnalysisResponse( is_deepfake=result.is_deepfake, confidence=result.confidence, scores=result.scores, explanation=result.explanation, regions=result.regions, processing_time_ms=result.processing_time_ms, context=result.context.value, severity=result.severity ) except HTTPException: raise except Exception as e: logger.error(f"Image analysis failed: {e}") raise HTTPException( status_code=500, detail=f"Analysis failed: {str(e)}" ) @app.post("/analyze/video", response_model=VideoAnalysisResponse, tags=["Analysis"], dependencies=[Depends(verify_api_key)]) @limiter.limit("10/minute") # Lower limit for videos async def analyze_video( request, background_tasks: BackgroundTasks, file: UploadFile = File(...), context: str = "general", sample_rate: int = 2 ): """ Analyze video for deepfakes Args: file: Video file (MP4, AVI, MOV) context: Detection context sample_rate: Process every Nth frame (higher = faster but less accurate) Returns: Detection results with frame-by-frame analysis Raises: 400: Invalid file format or size 500: Processing error """ temp_file = None try: # Validate file size contents = await file.read() file_size = len(contents) / (1024 * 1024) # MB if file_size > settings.max_video_size_mb: raise HTTPException( status_code=400, detail=f"File too large. Max size: {settings.max_video_size_mb}MB" ) # Validate file type if not file.content_type.startswith('video/'): raise HTTPException( status_code=400, detail="Invalid file type. Only videos are supported." ) # Save to temp file with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file: temp_file.write(contents) temp_path = temp_file.name # Map context context_map = { "general": DetectionContext.GENERAL, "political": DetectionContext.POLITICAL, "news": DetectionContext.NEWS, "legal": DetectionContext.LEGAL, "entertainment": DetectionContext.ENTERTAINMENT, "satire": DetectionContext.SATIRE } detection_context = context_map.get(context.lower(), DetectionContext.GENERAL) # Update sample rate original_sample_rate = video_analyzer.sample_rate video_analyzer.sample_rate = sample_rate # Analyze video result = video_analyzer.analyze_video( temp_path, export_timeline=False, export_report=False, context=detection_context ) # Restore sample rate video_analyzer.sample_rate = original_sample_rate # Schedule cleanup def cleanup(): try: Path(temp_path).unlink() except: pass background_tasks.add_task(cleanup) # Return response return VideoAnalysisResponse( is_deepfake=result.is_deepfake, confidence=result.confidence, frame_scores=result.frame_scores, suspect_frames=result.suspect_frames, fps=result.fps, total_frames=result.total_frames, processed_frames=result.processed_frames, processing_time_ms=result.processing_time_ms, avg_frame_score=result.avg_frame_score, max_frame_score=result.max_frame_score, temporal_consistency_score=result.temporal_consistency_score, physiological_score=result.physiological_score ) except HTTPException: raise except Exception as e: logger.error(f"Video analysis failed: {e}") raise HTTPException( status_code=500, detail=f"Analysis failed: {str(e)}" ) finally: # Cleanup temp file if temp_file and Path(temp_file.name).exists(): try: Path(temp_file.name).unlink() except: pass @app.post("/analyze/stream", tags=["Analysis"], dependencies=[Depends(verify_api_key)]) async def analyze_stream( stream_url: str, duration_seconds: float = 30.0 ): """ Analyze live video stream Args: stream_url: Stream URL (RTSP, HTTP) or webcam index duration_seconds: Duration to analyze Returns: Detection results from stream analysis Note: This endpoint is planned for v1.1 """ raise HTTPException( status_code=501, detail="Stream analysis not yet implemented. Coming in v1.1" ) @app.get("/models/info", tags=["Models"], dependencies=[Depends(verify_api_key)]) async def get_models_info(): """ Get information about loaded models Returns: Model versions and configuration """ return { "models": detector.model_versions, "device": detector.device, "sensitivity": detector.sensitivity.value, "thresholds": detector.thresholds } @app.post("/models/configure", tags=["Models"], dependencies=[Depends(verify_api_key)]) async def configure_models( sensitivity: Optional[str] = None, thresholds: Optional[Dict[str, float]] = None ): """ Configure detection models Args: sensitivity: Detection sensitivity (low, medium, high) thresholds: Custom detection thresholds Returns: Updated configuration """ try: if sensitivity: from deepfake_detector import DetectionSensitivity detector.sensitivity = DetectionSensitivity(sensitivity) detector.thresholds = detector._get_thresholds() if thresholds: detector.set_thresholds(thresholds) return { "sensitivity": detector.sensitivity.value, "thresholds": detector.thresholds, "message": "Configuration updated successfully" } except Exception as e: raise HTTPException( status_code=400, detail=f"Configuration failed: {str(e)}" ) # Error handlers @app.exception_handler(Exception) async def general_exception_handler(request, exc): """Handle unexpected errors""" logger.error(f"Unexpected error: {exc}") return JSONResponse( status_code=500, content={ "detail": "Internal server error", "type": type(exc).__name__ } ) # Startup/Shutdown events @app.on_event("startup") async def startup_event(): """Initialize services on startup""" logger.info(f"{settings.app_name} v{settings.app_version} starting...") logger.info(f"GPU available: {detector.device == 'cuda'}") logger.info(f"API key authentication: {'enabled' if settings.api_key else 'disabled'}") @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown""" logger.info(f"{settings.app_name} shutting down...") # Run server if __name__ == "__main__": uvicorn.run( "api_server:app", host="0.0.0.0", port=8001, reload=False, workers=1, # Use 1 worker for GPU (avoid memory issues) log_level="info" )