Spaces:
Sleeping
Sleeping
| """ | |
| Deepfake Hunter - REST API Server | |
| FastAPI-based REST API for deepfake detection. | |
| Features: | |
| - POST /analyze/image - Analyze single image | |
| - POST /analyze/video - Analyze video file | |
| - POST /analyze/stream - Real-time stream analysis (planned for v1.1; currently responds with 501 Not Implemented) | |
| - GET /health - Health check | |
| - Rate limiting and API key authentication | |
| Author: Deepfake Hunter Team | |
| License: MIT | |
| """ | |
import warnings
warnings.filterwarnings('ignore')

from typing import Optional, List, Dict, Any
from pathlib import Path
import tempfile
import time
import secrets
import hashlib
from datetime import datetime, timedelta

from fastapi import (
    FastAPI,
    File,
    UploadFile,
    HTTPException,
    Depends,
    Header,
    Request,
    status,
    BackgroundTasks,
)
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader
from pydantic import BaseSettings, BaseModel, Field
import uvicorn
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from loguru import logger
import numpy as np
import cv2

from deepfake_detector import DeepfakeDetector, DetectionContext, DetectionResult
from video_analyzer import VideoAnalyzer, VideoDetectionResult
# Configuration
class Settings(BaseSettings):
    """Runtime configuration for the API.

    Each field below is a default. Because this is a pydantic
    ``BaseSettings`` subclass configured with ``env_file`` and
    ``env_prefix``, values are read from ``DEEPFAKE_API_``-prefixed
    environment variables or from a ``.env`` file when present.
    """

    # Identity used for the FastAPI title/version and in log lines.
    app_name: str = "Deepfake Hunter API"
    app_version: str = "1.0.0"

    # Shared secret compared against the X-API-Key header.
    # NOTE(review): this default is a placeholder and must be overridden
    # before deployment.
    api_key: str = "your-secret-api-key"  # Change in production!

    # Upload size ceilings, in megabytes.
    max_image_size_mb: int = 10
    max_video_size_mb: int = 500

    rate_limit_per_minute: int = 60

    # CORS switches; the default origin list is a wildcard.
    enable_cors: bool = True
    cors_origins: List[str] = ["*"]

    # Passed to DeepfakeDetector(use_gpu=...).
    use_gpu: bool = True

    class Config:
        env_file = ".env"
        env_prefix = "DEEPFAKE_API_"
settings = Settings()

# Initialize FastAPI app
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="Real-time deepfake detection API with multi-modal analysis",
    docs_url="/docs",
    redoc_url="/redoc"
)

# CORS middleware
if settings.enable_cors:
    # Credentialed CORS must not be combined with a wildcard origin: the
    # CORS protocol forbids "Access-Control-Allow-Origin: *" on credentialed
    # responses, and reflecting arbitrary origins instead would let any site
    # issue cookie-authenticated requests. Credentials are therefore only
    # enabled when the configured origins are explicit. API-key clients are
    # unaffected: the key travels in the X-API-Key header, which is covered
    # by allow_headers rather than by allow_credentials.
    explicit_origins_only = "*" not in settings.cors_origins
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=explicit_origins_only,
        allow_methods=["*"],
        allow_headers=["*"],
    )

# Rate limiting (requests are keyed by the client's remote address)
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# API Key authentication
# auto_error=False makes a missing header arrive as None so that
# verify_api_key can produce its own 401 response.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
def verify_api_key(api_key: Optional[str] = Depends(api_key_header)) -> str:
    """Validate the API key supplied in the ``X-API-Key`` header.

    Args:
        api_key: Header value resolved by ``api_key_header``; ``None`` when
            the header is absent (the header scheme is created with
            ``auto_error=False``).

    Returns:
        The supplied key, unchanged, when it matches ``settings.api_key``.

    Raises:
        HTTPException: 401 if no key was supplied, 403 if the key does not
            match the configured one.
    """
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required"
        )

    # Compare in constant time so response timing does not reveal how many
    # leading characters of a guess are correct. Both sides are encoded to
    # bytes because compare_digest rejects non-ASCII str arguments.
    supplied = api_key.encode("utf-8")
    expected = settings.api_key.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API key"
        )
    return api_key
# Initialize detector and analyzer (singleton)
# Both objects are created once at import time and shared by every handler
# in this module.
logger.info("Initializing Deepfake Hunter API...")
detector = DeepfakeDetector(
    detection_sensitivity="medium",
    use_gpu=settings.use_gpu,
)
video_analyzer = VideoAnalyzer(detector, sample_rate=2)
logger.info("API initialization complete!")
# Request/Response Models
def _current_timestamp() -> str:
    """Return the current local time as an ISO-8601 string."""
    return datetime.now().isoformat()


class HealthResponse(BaseModel):
    """Body returned by the health check.

    Every field has a default, so the model can be built without arguments.
    """

    status: str = "healthy"
    # Defaults to the version held by ``settings`` when this class is defined.
    version: str = settings.app_version
    # Filled in per instance, at construction time.
    timestamp: str = Field(default_factory=_current_timestamp)
    gpu_available: bool = False
    models_loaded: bool = True
class ImageAnalysisRequest(BaseModel):
    """Parameters accepted alongside an image analysis request."""

    # Name of the detection context; "general" when not supplied.
    context: str = Field("general", description="Detection context")
class ImageAnalysisResponse(BaseModel):
    """Result of analysing a single image.

    The fields mirror the attributes that ``analyze_image`` copies from the
    detector's result object.
    """

    # Verdict and its confidence.
    is_deepfake: bool
    confidence: float

    # Named scores; an individual score may be None.
    scores: Dict[str, Optional[float]]

    explanation: str
    regions: List[Dict[str, Any]]
    processing_time_ms: float

    # ``.value`` of the detection context used for the analysis.
    context: str
    severity: str
class VideoAnalysisResponse(BaseModel):
    """Result of analysing a video file.

    The fields mirror the attributes that ``analyze_video`` copies from the
    video analyzer's result object.
    """

    # Overall verdict and its confidence.
    is_deepfake: bool
    confidence: float

    # Per-frame detail.
    # NOTE(review): the layout of each frame_scores tuple is not visible in
    # this module - confirm against VideoAnalyzer.
    frame_scores: List[tuple]
    suspect_frames: List[int]

    # Video and processing statistics.
    fps: float
    total_frames: int
    processed_frames: int
    processing_time_ms: float

    # Aggregate scores.
    avg_frame_score: float
    max_frame_score: float
    temporal_consistency_score: float
    physiological_score: Optional[float]
# API Endpoints
# NOTE(review): no route decorators are visible on the handlers in this view;
# confirm how they are registered on ``app``.
async def root():
    """Return a small index document for the API.

    Returns:
        A dict with the API name, the configured version, and the paths of
        the docs and health endpoints.
    """
    return dict(
        message="Deepfake Hunter API",
        version=settings.app_version,
        docs="/docs",
        health="/health",
    )
async def health_check():
    """Report service health.

    Returns:
        A ``HealthResponse`` carrying the configured version, whether CUDA
        is available to torch, and whether the module-level detector exists.
    """
    # torch is imported here rather than at module import time.
    import torch

    cuda_ready = torch.cuda.is_available()
    detector_ready = detector is not None
    return HealthResponse(
        status="healthy",
        version=settings.app_version,
        gpu_available=cuda_ready,
        models_loaded=detector_ready,
    )
async def analyze_image(
    request: Request,
    file: UploadFile = File(...),
    context: str = "general"
):
    """
    Analyze image for deepfakes

    Args:
        request: Incoming request. Not used in the body.
            NOTE(review): presumably required by the rate limiter, which
            is not applied in this view - confirm.
        file: Image file (JPEG, PNG)
        context: Detection context (general, political, news, legal,
            entertainment, satire). Matching is case-insensitive; unknown
            values fall back to the general context.

    Returns:
        ImageAnalysisResponse with confidence scores and explanations

    Raises:
        HTTPException:
            400: Invalid file type, file too large, or undecodable image
            500: Processing error
    """
    try:
        # Validate file type first so non-image uploads are rejected without
        # reading them. content_type is client-supplied and may be missing.
        if not (file.content_type or "").startswith('image/'):
            raise HTTPException(
                status_code=400,
                detail="Invalid file type. Only images are supported."
            )

        # Validate file size. Reading at most one byte past the limit is
        # enough to detect an oversized upload without holding it all in
        # memory.
        max_bytes = settings.max_image_size_mb * 1024 * 1024
        contents = await file.read(max_bytes + 1)
        if len(contents) > max_bytes:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Max size: {settings.max_image_size_mb}MB"
            )

        # Read image. An empty buffer is rejected up front because
        # cv2.imdecode raises on it instead of returning None.
        image = None
        if contents:
            nparr = np.frombuffer(contents, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if image is None:
            raise HTTPException(
                status_code=400,
                detail="Failed to decode image"
            )

        # Convert BGR to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Map context string to enum
        context_map = {
            "general": DetectionContext.GENERAL,
            "political": DetectionContext.POLITICAL,
            "news": DetectionContext.NEWS,
            "legal": DetectionContext.LEGAL,
            "entertainment": DetectionContext.ENTERTAINMENT,
            "satire": DetectionContext.SATIRE
        }
        detection_context = context_map.get(context.lower(), DetectionContext.GENERAL)

        # Run detection
        result = detector.detect_image(image, context=detection_context)

        # Return response
        return ImageAnalysisResponse(
            is_deepfake=result.is_deepfake,
            confidence=result.confidence,
            scores=result.scores,
            explanation=result.explanation,
            regions=result.regions,
            processing_time_ms=result.processing_time_ms,
            context=result.context.value,
            severity=result.severity
        )
    except HTTPException:
        # Deliberate 4xx responses pass through untouched.
        raise
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Image analysis failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Analysis failed: {str(e)}"
        ) from e
# NOTE(review): the comment originally here read "Lower limit for videos",
# presumably describing a stricter rate limit for this endpoint; no limiter
# is applied in this view - confirm.
async def analyze_video(
    request: Request,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    context: str = "general",
    sample_rate: int = 2
):
    """
    Analyze video for deepfakes

    Args:
        request: Incoming request. Not used in the body.
            NOTE(review): presumably required by the rate limiter, which
            is not applied in this view - confirm.
        background_tasks: Kept for interface compatibility. The temporary
            file is removed in the ``finally`` block, so no background
            cleanup is scheduled.
        file: Video file (MP4, AVI, MOV)
        context: Detection context. Matching is case-insensitive; unknown
            values fall back to the general context.
        sample_rate: Process every Nth frame (higher = faster but less
            accurate). Must be at least 1.

    Returns:
        VideoAnalysisResponse with frame-by-frame analysis

    Raises:
        HTTPException:
            400: Invalid sample_rate, file type, or file size
            500: Processing error
    """
    temp_path = None
    try:
        # "Every Nth frame" is only meaningful for N >= 1.
        if sample_rate < 1:
            raise HTTPException(
                status_code=400,
                detail="Invalid sample_rate. Must be a positive integer."
            )

        # Validate file type before touching the payload. content_type is
        # client-supplied and may be missing.
        if not (file.content_type or "").startswith('video/'):
            raise HTTPException(
                status_code=400,
                detail="Invalid file type. Only videos are supported."
            )

        # Save to temp file, streaming in chunks so the upload is never held
        # in memory in full, and stop as soon as the size limit is exceeded.
        max_bytes = settings.max_video_size_mb * 1024 * 1024
        chunk_size = 1024 * 1024
        bytes_written = 0
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file:
            # Record the path immediately so the finally block removes the
            # file even if the copy below is aborted.
            temp_path = temp_file.name
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                bytes_written += len(chunk)
                if bytes_written > max_bytes:
                    raise HTTPException(
                        status_code=400,
                        detail=f"File too large. Max size: {settings.max_video_size_mb}MB"
                    )
                temp_file.write(chunk)

        # Map context
        context_map = {
            "general": DetectionContext.GENERAL,
            "political": DetectionContext.POLITICAL,
            "news": DetectionContext.NEWS,
            "legal": DetectionContext.LEGAL,
            "entertainment": DetectionContext.ENTERTAINMENT,
            "satire": DetectionContext.SATIRE
        }
        detection_context = context_map.get(context.lower(), DetectionContext.GENERAL)

        # Temporarily override the shared analyzer's sample rate. The
        # try/finally guarantees it is restored even when analysis raises,
        # so one failed request cannot change the rate for later requests.
        original_sample_rate = video_analyzer.sample_rate
        video_analyzer.sample_rate = sample_rate
        try:
            result = video_analyzer.analyze_video(
                temp_path,
                export_timeline=False,
                export_report=False,
                context=detection_context
            )
        finally:
            video_analyzer.sample_rate = original_sample_rate

        # Return response
        return VideoAnalysisResponse(
            is_deepfake=result.is_deepfake,
            confidence=result.confidence,
            frame_scores=result.frame_scores,
            suspect_frames=result.suspect_frames,
            fps=result.fps,
            total_frames=result.total_frames,
            processed_frames=result.processed_frames,
            processing_time_ms=result.processing_time_ms,
            avg_frame_score=result.avg_frame_score,
            max_frame_score=result.max_frame_score,
            temporal_consistency_score=result.temporal_consistency_score,
            physiological_score=result.physiological_score
        )
    except HTTPException:
        # Deliberate 4xx responses pass through untouched.
        raise
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Video analysis failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Analysis failed: {str(e)}"
        ) from e
    finally:
        # Cleanup temp file (best effort; never masks the real outcome).
        if temp_path is not None:
            try:
                Path(temp_path).unlink()
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")
async def analyze_stream(
    stream_url: str,
    duration_seconds: float = 30.0
):
    """
    Analyze live video stream (not implemented)

    Args:
        stream_url: Stream URL (RTSP, HTTP) or webcam index
        duration_seconds: Duration to analyze

    Raises:
        HTTPException: Always 501; neither argument is read.

    Note: This endpoint is planned for v1.1
    """
    not_implemented = HTTPException(
        status_code=501,
        detail="Stream analysis not yet implemented. Coming in v1.1",
    )
    raise not_implemented
async def get_models_info():
    """
    Describe the detector currently loaded by this module

    Returns:
        A dict exposing the detector's model versions, device, sensitivity
        value, and thresholds.
    """
    info = {}
    info["models"] = detector.model_versions
    info["device"] = detector.device
    info["sensitivity"] = detector.sensitivity.value
    info["thresholds"] = detector.thresholds
    return info
async def configure_models(
    sensitivity: Optional[str] = None,
    thresholds: Optional[Dict[str, float]] = None
):
    """
    Update the shared detector's configuration

    Args:
        sensitivity: Detection sensitivity (low, medium, high). When given,
            the detector's thresholds are re-derived from it.
        thresholds: Custom detection thresholds, applied after any
            sensitivity change.

    Returns:
        A dict with the resulting sensitivity value, thresholds, and a
        confirmation message.

    Raises:
        HTTPException: 400 for any error raised while applying the change.
    """
    try:
        if sensitivity:
            from deepfake_detector import DetectionSensitivity

            detector.sensitivity = DetectionSensitivity(sensitivity)
            detector.thresholds = detector._get_thresholds()

        if thresholds:
            detector.set_thresholds(thresholds)

        # Built inside the try so a failure while reading the detector's
        # state is also reported as a 400.
        updated = {
            "sensitivity": detector.sensitivity.value,
            "thresholds": detector.thresholds,
            "message": "Configuration updated successfully",
        }
    except Exception as error:
        raise HTTPException(
            status_code=400,
            detail=f"Configuration failed: {str(error)}"
        )
    return updated
# Error handlers
async def general_exception_handler(request, exc):
    """Turn an unexpected exception into a generic 500 JSON response.

    Args:
        request: The request being handled; not read here.
        exc: The exception that was raised.

    Returns:
        A JSONResponse with status 500 whose body carries a fixed detail
        string and the exception's class name.
    """
    logger.error(f"Unexpected error: {exc}")
    body = {
        "detail": "Internal server error",
        "type": type(exc).__name__,
    }
    return JSONResponse(status_code=500, content=body)
# Startup/Shutdown events
async def startup_event():
    """Log the service identity, GPU use, and auth status at startup."""
    on_gpu = detector.device == 'cuda'
    auth_state = 'enabled' if settings.api_key else 'disabled'

    logger.info(f"{settings.app_name} v{settings.app_version} starting...")
    logger.info(f"GPU available: {on_gpu}")
    logger.info(f"API key authentication: {auth_state}")
async def shutdown_event():
    """Log that the service is shutting down."""
    farewell = f"{settings.app_name} shutting down..."
    logger.info(farewell)
# Run server
if __name__ == "__main__":
    # The application is referenced by import string, so this module is
    # expected to be importable as ``api_server``.
    server_options = dict(
        host="0.0.0.0",
        port=8001,
        reload=False,
        workers=1,  # Use 1 worker for GPU (avoid memory issues)
        log_level="info",
    )
    uvicorn.run("api_server:app", **server_options)