| | """
|
| | FastAPI Main Application
|
| |
|
| | Production-ready API for multimodal misinformation detection.
|
| |
|
| | Features:
|
| | - Async endpoints
|
| | - Rate limiting
|
| | - Authentication
|
| | - Background task processing
|
| | - Comprehensive error handling
|
| | """
|
| |
|
| | from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, BackgroundTasks, Request
|
| | from fastapi.middleware.cors import CORSMiddleware
|
| | from fastapi.responses import JSONResponse
|
| | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
| | from pydantic import BaseModel, Field
|
| | from typing import Optional, List, Dict
|
| | import uvicorn
|
| | from datetime import datetime
|
| | import logging
|
| | import asyncio
|
| | from pathlib import Path
|
| | import tempfile
|
| | import os
|
| |
|
| |
|
| | import sys
|
| | sys.path.append(str(Path(__file__).parent.parent))
|
| |
|
| | from detection.deepfake_detector import DeepfakeDetector
|
| | from detection.ai_text_detector import AITextDetector
|
| | from detection.anomaly_detector import AnomalyDetector
|
| |
|
| |
|
| | logging.basicConfig(
|
| | level=logging.INFO,
|
| | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
| | )
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
| | app = FastAPI(
|
| | title="Multimodal Misinformation Detection API",
|
| | description="Production API for detecting deepfakes, AI-generated content, and coordinated campaigns",
|
| | version="1.0.0",
|
| | docs_url="/docs",
|
| | redoc_url="/redoc"
|
| | )
|
| |
|
| |
|
| | app.add_middleware(
|
| | CORSMiddleware,
|
| | allow_origins=["*"],
|
| | allow_credentials=True,
|
| | allow_methods=["*"],
|
| | allow_headers=["*"],
|
| | )
|
| |
|
| |
|
| | security = HTTPBearer()
|
| |
|
| |
|
| | _deepfake_detector = None
|
| | _ai_text_detector = None
|
| | _anomaly_detector = None
|
| |
|
| |
|
| | def get_deepfake_detector():
|
| | """Lazy load deepfake detector."""
|
| | global _deepfake_detector
|
| | if _deepfake_detector is None:
|
| | _deepfake_detector = DeepfakeDetector()
|
| | return _deepfake_detector
|
| |
|
| |
|
| | def get_ai_text_detector():
|
| | """Lazy load AI text detector."""
|
| | global _ai_text_detector
|
| | if _ai_text_detector is None:
|
| | _ai_text_detector = AITextDetector()
|
| | return _ai_text_detector
|
| |
|
| |
|
| | def get_anomaly_detector():
|
| | """Lazy load anomaly detector."""
|
| | global _anomaly_detector
|
| | if _anomaly_detector is None:
|
| | _anomaly_detector = AnomalyDetector()
|
| | return _anomaly_detector
|
| |
|
| |
|
| |
|
| | class TextAnalysisRequest(BaseModel):
|
| | text: str = Field(..., min_length=10, description="Text to analyze")
|
| | detailed: bool = Field(default=True, description="Return detailed analysis")
|
| |
|
| |
|
| | class TextAnalysisResponse(BaseModel):
|
| | verdict: str
|
| | confidence: float
|
| | perplexity: Optional[float] = None
|
| | explanation: str
|
| | timestamp: datetime
|
| | processing_time_ms: float
|
| |
|
| |
|
| | class ImageAnalysisResponse(BaseModel):
|
| | verdict: str
|
| | confidence: float
|
| | faces_analyzed: int
|
| | explanation: str
|
| | artifacts_detected: List[str]
|
| | timestamp: datetime
|
| | processing_time_ms: float
|
| |
|
| |
|
| | class HealthResponse(BaseModel):
|
| | status: str
|
| | version: str
|
| | timestamp: datetime
|
| | models_loaded: Dict[str, bool]
|
| |
|
| |
|
| |
|
| | @app.middleware("http")
|
| | async def add_process_time_header(request: Request, call_next):
|
| | """Add processing time and security headers to response."""
|
| | start_time = datetime.utcnow()
|
| | response = await call_next(request)
|
| | process_time = (datetime.utcnow() - start_time).total_seconds() * 1000
|
| | response.headers["X-Process-Time-Ms"] = str(process_time)
|
| |
|
| |
|
| | if request.url.path in ["/docs", "/redoc"] or request.url.path.startswith("/openapi"):
|
| | response.headers["Content-Security-Policy"] = (
|
| | "default-src 'self'; "
|
| | "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.jsdelivr.net; "
|
| | "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
|
| | "img-src 'self' data: https:; "
|
| | "font-src 'self' data: https://cdn.jsdelivr.net;"
|
| | )
|
| |
|
| | return response
|
| |
|
| |
|
| |
|
| | async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
|
| | """
|
| | Verify API token.
|
| | In production, implement proper JWT verification.
|
| | """
|
| | token = credentials.credentials
|
| |
|
| |
|
| | if token != os.getenv("API_TOKEN", "dev-token"):
|
| | raise HTTPException(
|
| | status_code=401,
|
| | detail="Invalid authentication credentials"
|
| | )
|
| | return token
|
| |
|
| |
|
| |
|
| |
|
| | @app.get("/", response_model=HealthResponse)
|
| | async def root():
|
| | """Root endpoint with API health status."""
|
| | return {
|
| | "status": "operational",
|
| | "version": "1.0.0",
|
| | "timestamp": datetime.utcnow(),
|
| | "models_loaded": {
|
| | "deepfake_detector": _deepfake_detector is not None,
|
| | "ai_text_detector": _ai_text_detector is not None,
|
| | "anomaly_detector": _anomaly_detector is not None
|
| | }
|
| | }
|
| |
|
| |
|
| | @app.get("/health")
|
| | async def health_check():
|
| | """Health check endpoint for monitoring."""
|
| | return {
|
| | "status": "healthy",
|
| | "timestamp": datetime.utcnow().isoformat()
|
| | }
|
| |
|
| |
|
| | @app.post("/api/v1/analyze/text", response_model=TextAnalysisResponse)
|
| | async def analyze_text(
|
| | request: TextAnalysisRequest,
|
| | background_tasks: BackgroundTasks,
|
| |
|
| | ):
|
| | """
|
| | Analyze text for AI generation.
|
| |
|
| | **Example Request:**
|
| | ```json
|
| | {
|
| | "text": "Your text here...",
|
| | "detailed": true
|
| | }
|
| | ```
|
| | """
|
| | start_time = datetime.utcnow()
|
| |
|
| | try:
|
| | detector = get_ai_text_detector()
|
| | result = detector.analyze_text(request.text, detailed=request.detailed)
|
| |
|
| | processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
|
| |
|
| |
|
| | background_tasks.add_task(
|
| | log_analysis,
|
| | "text",
|
| | result['verdict'],
|
| | processing_time
|
| | )
|
| |
|
| | return {
|
| | **result,
|
| | "timestamp": datetime.utcnow(),
|
| | "processing_time_ms": processing_time
|
| | }
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error analyzing text: {str(e)}")
|
| | raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
|
| |
|
| |
|
| | @app.post("/api/v1/analyze/image", response_model=ImageAnalysisResponse)
|
| | async def analyze_image(
|
| | file: UploadFile = File(...),
|
| | return_attention: bool = False,
|
| | background_tasks: BackgroundTasks = BackgroundTasks(),
|
| |
|
| | ):
|
| | """
|
| | Analyze image for deepfake artifacts.
|
| |
|
| | **Supported formats:** JPG, PNG, WebP
|
| | **Max size:** 10MB
|
| | """
|
| | start_time = datetime.utcnow()
|
| |
|
| |
|
| | if file.content_type not in ["image/jpeg", "image/png", "image/webp"]:
|
| | raise HTTPException(
|
| | status_code=400,
|
| | detail="Invalid file type. Supported: JPEG, PNG, WebP"
|
| | )
|
| |
|
| |
|
| | try:
|
| | with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp:
|
| | content = await file.read()
|
| | tmp.write(content)
|
| | tmp_path = tmp.name
|
| |
|
| |
|
| | detector = get_deepfake_detector()
|
| | result = detector.analyze_image(tmp_path, return_attention=return_attention)
|
| |
|
| | processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
|
| |
|
| |
|
| | os.unlink(tmp_path)
|
| |
|
| |
|
| | background_tasks.add_task(
|
| | log_analysis,
|
| | "image",
|
| | result['verdict'],
|
| | processing_time
|
| | )
|
| |
|
| | return {
|
| | **result,
|
| | "timestamp": datetime.utcnow(),
|
| | "processing_time_ms": processing_time
|
| | }
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error analyzing image: {str(e)}")
|
| |
|
| | if 'tmp_path' in locals():
|
| | try:
|
| | os.unlink(tmp_path)
|
| | except:
|
| | pass
|
| | raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
|
| |
|
| |
|
| | @app.post("/api/v1/analyze/video")
|
| | async def analyze_video(
|
| | file: UploadFile = File(...),
|
| | sample_rate: int = 5,
|
| | max_frames: int = 100,
|
| | background_tasks: BackgroundTasks = BackgroundTasks(),
|
| |
|
| | ):
|
| | """
|
| | Analyze video for deepfake artifacts.
|
| |
|
| | **Supported formats:** MP4, AVI, MOV
|
| | **Max size:** 100MB
|
| | **Processing:** Async with job ID returned immediately
|
| | """
|
| | start_time = datetime.utcnow()
|
| |
|
| |
|
| | if file.content_type not in ["video/mp4", "video/avi", "video/quicktime"]:
|
| | raise HTTPException(
|
| | status_code=400,
|
| | detail="Invalid file type. Supported: MP4, AVI, MOV"
|
| | )
|
| |
|
| | try:
|
| |
|
| | with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp:
|
| | content = await file.read()
|
| | tmp.write(content)
|
| | tmp_path = tmp.name
|
| |
|
| |
|
| |
|
| | detector = get_deepfake_detector()
|
| | result = detector.analyze_video(
|
| | tmp_path,
|
| | sample_rate=sample_rate,
|
| | max_frames=max_frames
|
| | )
|
| |
|
| | processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
|
| |
|
| |
|
| | os.unlink(tmp_path)
|
| |
|
| |
|
| | background_tasks.add_task(
|
| | log_analysis,
|
| | "video",
|
| | result['verdict'],
|
| | processing_time
|
| | )
|
| |
|
| | return {
|
| | **result,
|
| | "timestamp": datetime.utcnow(),
|
| | "processing_time_ms": processing_time
|
| | }
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error analyzing video: {str(e)}")
|
| | if 'tmp_path' in locals():
|
| | try:
|
| | os.unlink(tmp_path)
|
| | except:
|
| | pass
|
| | raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
|
| |
|
| |
|
| | @app.post("/api/v1/batch/text")
|
| | async def batch_analyze_text(
|
| | texts: List[str],
|
| | background_tasks: BackgroundTasks,
|
| |
|
| | ):
|
| | """
|
| | Batch analyze multiple texts.
|
| |
|
| | **Limit:** 100 texts per request
|
| | """
|
| | if len(texts) > 100:
|
| | raise HTTPException(
|
| | status_code=400,
|
| | detail="Maximum 100 texts per batch"
|
| | )
|
| |
|
| | start_time = datetime.utcnow()
|
| |
|
| | try:
|
| | detector = get_ai_text_detector()
|
| | results = detector.batch_analyze(texts)
|
| |
|
| | processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
|
| |
|
| | return {
|
| | "results": results,
|
| | "total_analyzed": len(texts),
|
| | "timestamp": datetime.utcnow(),
|
| | "processing_time_ms": processing_time
|
| | }
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error in batch analysis: {str(e)}")
|
| | raise HTTPException(status_code=500, detail=f"Batch analysis failed: {str(e)}")
|
| |
|
| |
|
| |
|
| | async def log_analysis(modality: str, verdict: str, processing_time: float):
|
| | """Log analysis for monitoring and analytics."""
|
| | logger.info(
|
| | f"Analysis completed - Modality: {modality}, "
|
| | f"Verdict: {verdict}, Time: {processing_time:.2f}ms"
|
| | )
|
| |
|
| |
|
| |
|
| |
|
| | @app.exception_handler(HTTPException)
|
| | async def http_exception_handler(request: Request, exc: HTTPException):
|
| | """Custom HTTP exception handler."""
|
| | return JSONResponse(
|
| | status_code=exc.status_code,
|
| | content={
|
| | "error": exc.detail,
|
| | "timestamp": datetime.utcnow().isoformat()
|
| | }
|
| | )
|
| |
|
| |
|
| | @app.exception_handler(Exception)
|
| | async def general_exception_handler(request: Request, exc: Exception):
|
| | """General exception handler."""
|
| | logger.error(f"Unhandled exception: {str(exc)}")
|
| | return JSONResponse(
|
| | status_code=500,
|
| | content={
|
| | "error": "Internal server error",
|
| | "timestamp": datetime.utcnow().isoformat()
|
| | }
|
| | )
|
| |
|
| |
|
| |
|
| | @app.on_event("startup")
|
| | async def startup_event():
|
| | """Initialize on startup."""
|
| | logger.info("π Starting Multimodal Misinformation Detection API")
|
| | logger.info("π API Documentation: http://localhost:8000/docs")
|
| |
|
| |
|
| | @app.on_event("shutdown")
|
| | async def shutdown_event():
|
| | """Cleanup on shutdown."""
|
| | logger.info("π Shutting down API")
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| | uvicorn.run(
|
| | "main:app",
|
| | host="0.0.0.0",
|
| | port=8000,
|
| | reload=True,
|
| | log_level="info"
|
| | )
|
| |
|