"""Content Detection API.

FastAPI service exposing two upload endpoints (``/detect/image`` and
``/detect/video``) that hand the decoded media to a
``SmartSequentialModerator`` and return per-category detection counts.
"""
import logging
import uuid
from pathlib import Path
from typing import Optional, Set

import aiofiles
import cv2
import numpy as np
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from gunicorn.app.base import BaseApplication
from pydantic import BaseModel

from sequential_moderation import SmartSequentialModerator

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI(
    title="Content Detection API",
    description="Simple API for detecting inappropriate content",
    version="2.0.0"
)

# Add CORS
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_IMAGE_SIZE = 50 * 1024 * 1024  # 50MB
MAX_VIDEO_SIZE = 500 * 1024 * 1024  # 500MB
ALLOWED_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'}
ALLOWED_VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv'}
# Uploads are copied to disk in pieces of this size so that a large video is
# never held in memory all at once.
UPLOAD_CHUNK_SIZE = 1024 * 1024  # 1MB

# Global moderator; stays None until startup succeeds, and the endpoints
# answer 503 while it is None.
moderator = None


class StandaloneApplication(BaseApplication):
    """Gunicorn application wrapper that serves an in-process ASGI/WSGI app.

    Args:
        app: The application object gunicorn should serve.
        options: Optional mapping of gunicorn setting names to values.
    """

    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        """Copy the recognised, non-None options into gunicorn's config."""
        config = {
            key: value
            for key, value in self.options.items()
            if key in self.cfg.settings and value is not None
        }
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        """Return the wrapped application object."""
        return self.application


# ============== Response Model ==============

class DetectionResponse(BaseModel):
    """Simple response with counts and safety status"""
    nude: int = 0
    gun: int = 0
    knife: int = 0
    fight: int = 0
    is_safe: bool = True


# ============== Helpers ==============

def _validated_extension(filename: Optional[str], allowed: Set[str]) -> str:
    """Return the lower-cased suffix of *filename*, or raise HTTP 400.

    A missing or empty filename yields an empty suffix and is therefore
    rejected with the same "Invalid type" response as an unsupported one.
    """
    ext = Path(filename or "").suffix.lower()
    if ext not in allowed:
        raise HTTPException(400, f"Invalid type. Allowed: {allowed}")
    return ext


async def _save_upload(file: UploadFile, destination: Path, max_bytes: int) -> int:
    """Stream *file* to *destination* and return the number of bytes written.

    Raises:
        HTTPException: 400 as soon as more than *max_bytes* have been
            received. The partially written file is left for the caller to
            remove.
    """
    written = 0
    async with aiofiles.open(destination, 'wb') as out:
        while True:
            chunk = await file.read(UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            written += len(chunk)
            if written > max_bytes:
                raise HTTPException(
                    400, f"File too large (max {max_bytes // 1024 // 1024}MB)"
                )
            await out.write(chunk)
    return written


def _remove_quietly(path: Path) -> None:
    """Best-effort removal of a temporary file; never raises."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass
    except OSError as err:
        logger.warning("Could not remove temporary file %s: %s", path, err)


# ============== Startup ==============

@app.on_event("startup")
async def startup_event():
    """Create the global moderator; on failure leave it as None."""
    global moderator
    try:
        logger.info("🚀 Initializing Smart Sequential Moderator...")
        moderator = SmartSequentialModerator()
        logger.info("✅ Ready to process requests")
        logger.info("📋 Pipeline: NSFW (0.75) → Weapons/Fights")
    except Exception as e:
        # Keep the app running so the endpoints can report 503 instead of
        # the whole process failing to start.
        logger.exception("Failed to initialize: %s", e)
        moderator = None


# ============== API Endpoints ==============

@app.post("/detect/image", response_model=DetectionResponse)
async def detect_image(file: UploadFile = File(...)):
    """
    Detect inappropriate content in image

    Sequential processing (as described by the moderator pipeline; the logic
    itself lives in SmartSequentialModerator and is not visible here):
    1. NSFW check (threshold: 0.75)
    2. If NSFW detected → stop and return
    3. If clean → check weapons & fights

    Returns counts and safety status

    Raises:
        HTTPException: 503 if the moderator is not initialised, 400 for an
            unsupported extension, an oversized file or undecodable image
            data, 500 for any other failure.
    """
    if moderator is None:
        raise HTTPException(status_code=503, detail="Service not ready")

    try:
        # Validate extension
        _validated_extension(file.filename, ALLOWED_IMAGE_EXTENSIONS)

        # Read at most one byte past the limit: enough to detect an
        # oversized upload without buffering all of it.
        content = await file.read(MAX_IMAGE_SIZE + 1)
        if len(content) > MAX_IMAGE_SIZE:
            raise HTTPException(
                400, f"File too large (max {MAX_IMAGE_SIZE // 1024 // 1024}MB)"
            )

        # Decode image
        nparr = np.frombuffer(content, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if image is None:
            raise HTTPException(400, "Cannot decode image")

        # Process
        # NOTE(review): this call is synchronous inside an async handler and
        # will hold the event loop while it runs -- consider a threadpool
        # once the moderator's thread-safety is confirmed.
        logger.info(f"Processing image: {file.filename}")
        result = moderator.process_image(image)

        return DetectionResponse(
            nude=result.nude_count,
            gun=result.gun_count,
            knife=result.knife_count,
            fight=result.fight_count,
            is_safe=result.is_safe
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error: %s", e)
        raise HTTPException(500, str(e)) from e


@app.post("/detect/video", response_model=DetectionResponse)
async def detect_video(file: UploadFile = File(...)):
    """
    Detect inappropriate content in video

    Features (as described by the moderator pipeline; implemented in
    SmartSequentialModerator, not visible here):
    - AUTO frame skipping based on duration
    - Early stop after 3 NSFW detections
    - Sequential processing per frame

    The upload is streamed to a temporary file under UPLOAD_DIR, which is
    removed again before the request finishes, whether or not it succeeds.

    Returns total counts and safety status

    Raises:
        HTTPException: 503 if the moderator is not initialised, 400 for an
            unsupported extension or an oversized file, 500 for any other
            failure.
    """
    if moderator is None:
        raise HTTPException(status_code=503, detail="Service not ready")

    video_path = None
    try:
        # Validate extension
        ext = _validated_extension(file.filename, ALLOWED_VIDEO_EXTENSIONS)

        # Save temporarily, enforcing the size limit while streaming
        video_id = f"vid_{uuid.uuid4().hex[:8]}"
        video_path = UPLOAD_DIR / f"{video_id}{ext}"
        size = await _save_upload(file, video_path, MAX_VIDEO_SIZE)

        # Process with auto settings
        # NOTE(review): synchronous call inside an async handler; see the
        # matching note in detect_image.
        logger.info(f"Processing video: {file.filename} ({size // 1024 // 1024}MB)")
        result = moderator.process_video(str(video_path))

        return DetectionResponse(
            nude=result['nude'],
            gun=result['gun'],
            knife=result['knife'],
            fight=result['fight'],
            is_safe=result['is_safe']
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error: %s", e)
        raise HTTPException(500, str(e)) from e
    finally:
        # Single cleanup point for success, rejection and failure alike.
        if video_path is not None:
            _remove_quietly(video_path)


if __name__ == "__main__":
    import uvicorn
    import os

    port = int(os.environ.get("PORT", 7860))
    uvicorn.run("detection_api:app", host="0.0.0.0", port=port, reload=False)