from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Form from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any, Union import cv2 import numpy as np from datetime import datetime import aiofiles import json from pathlib import Path import uuid import traceback from concurrent.futures import ThreadPoolExecutor import logging import hashlib import time from functools import lru_cache from gunicorn.app.base import BaseApplication from main import ContentModerator # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Initialize FastAPI app app = FastAPI( title="Weapon & NSFW Detection API", description="API for detecting knives/dao, guns, fights and NSFW content in images and videos", version="2.0.0", docs_url="/docs", redoc_url="/redoc" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Configuration optimized for CPU class Config: UPLOAD_DIR = Path("uploads") RESULTS_DIR = Path("results") PROCESSED_DIR = Path("processed") MAX_IMAGE_SIZE = 50 * 1024 * 1024 # 50MB for images MAX_VIDEO_SIZE = 500 * 1024 * 1024 # 500MB for videos ALLOWED_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'} ALLOWED_VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv'} # CPU-optimized settings VIDEO_FRAME_SKIP = 10 # Process every 10th frame by default VIDEO_MAX_FRAMES = 100 # Maximum frames to process VIDEO_TARGET_WIDTH = 416 # Downscale to this width VIDEO_EARLY_STOP_THRESHOLD = 10 # Stop after N threats CLEANUP_AFTER_HOURS = 24 ENABLE_ANNOTATED_OUTPUT = False # Disable to save CPU MAX_WORKERS = 2 # Reduced for CPU config = Config() # Create necessary directories for directory in [config.UPLOAD_DIR, config.RESULTS_DIR, config.PROCESSED_DIR]: directory.mkdir(exist_ok=True) (directory / "images").mkdir(exist_ok=True) (directory / "videos").mkdir(exist_ok=True) # Global moderator instance moderator: Optional[ContentModerator] = None # Thread pool for background processing executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS) # Video Optimizer Class class VideoOptimizer: """Optimized video processing for CPU environments""" def StandaloneApplication(app, options=None): """Hàm tạo Gunicorn Application từ FastAPI app""" from gunicorn.app.base import BaseApplication class _App(BaseApplication): def __init__(self, app, options=None): self.options = options or {} self.application = app super().__init__() def load_config(self): config = { key: value for key, value in self.options.items() if key in self.cfg.settings and value is not None } for key, value in config.items(): self.cfg.set(key.lower(), value) def load(self): return self.application return _App(app, options) def __init__(self): self.frame_cache = {} self.cache_size = 20 def get_optimal_settings(self, duration: float, total_frames: int) -> Dict: """Calculate optimal settings based on video duration""" if duration <= 5: return { 'frame_skip': 3, 'target_width': 416, 'max_frames': 50 } elif duration <= 15: return { 'frame_skip': 8, 'target_width': 416, 'max_frames': 75 } elif duration <= 30: return { 'frame_skip': 12, 'target_width': 320, 'max_frames': 100 } else: return { 'frame_skip': 20, 'target_width': 320, 'max_frames': 150 } def preprocess_frame(self, frame: np.ndarray, target_width: int = 416) -> np.ndarray: """Downscale frame for faster processing""" height, width = frame.shape[:2] if width > target_width: scale = target_width / width new_width = int(width * scale) new_height = int(height * scale) frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR) return frame def get_frame_hash(self, frame: np.ndarray) -> str: """Generate hash for frame""" small = cv2.resize(frame, (8, 8)) return hashlib.md5(small.tobytes()).hexdigest() def should_skip_frame(self, frame: np.ndarray) -> bool: """Check if frame is similar to cached frames""" frame_hash = self.get_frame_hash(frame) if frame_hash in self.frame_cache: return True # Maintain cache size if len(self.frame_cache) >= self.cache_size: # Remove oldest entry oldest = min(self.frame_cache, key=self.frame_cache.get) del self.frame_cache[oldest] self.frame_cache[frame_hash] = time.time() return False def clear_cache(self): """Clear frame cache""" self.frame_cache.clear() # Initialize video optimizer video_optimizer = VideoOptimizer() # ============== Response Models ============== class BoundingBox(BaseModel): x1: int y1: int x2: int y2: int class WeaponDetection(BaseModel): type: str class_name: str weapon_type: str confidence: float bbox: BoundingBox threat_level: str detection_method: str class NSFWDetection(BaseModel): type: str class_name: str confidence: float bbox: BoundingBox method: str skin_ratio: Optional[float] = None class FightDetection(BaseModel): type: str confidence: float bbox: BoundingBox persons_involved: int threat_level: str class ImageDetectionResponse(BaseModel): success: bool request_id: str timestamp: str image_info: Dict[str, Any] detections: Dict[str, List[Union[WeaponDetection, NSFWDetection, FightDetection]]] summary: Dict[str, Any] risk_level: str action_required: bool processing_time_ms: float class VideoDetectionResponse(BaseModel): success: bool request_id: str timestamp: str video_info: Dict[str, Any] total_frames_processed: int frame_detections: List[Dict[str, Any]] summary: Dict[str, Any] risk_level: str action_required: bool processing_time_ms: float optimization_used: Dict[str, Any] # ============== Startup/Shutdown Events ============== @app.on_event("startup") async def startup_event(): """Initialize Content Moderator on startup""" global moderator try: logger.info("Initializing Content Moderator for CPU...") # Create CPU-optimized config cpu_config = { 'weapon_detection': { 'enabled': True, 'confidence_threshold': 0.5, 'knife_confidence': 0.5, 'fight_confidence': 0.45, 'model_size': 'yolo11n', 'use_enhancement': False, # Disable for CPU 'multi_pass': False, # Disable for CPU 'boost_knife_detection': True, 'fight_detection': True, 'fight_analysis': False # Disable complex analysis }, 'nsfw_detection': { 'enabled': True, 'confidence_threshold': 0.7, 'skin_detection': False, # Disable for CPU 'pose_analysis': False, 'region_analysis': False }, 'performance': { 'image_size': 320, # Small size for CPU 'batch_size': 1, 'half_precision': False, 'use_flash_attention': False, 'cpu_optimization': True }, 'output': { 'save_detections': True, 'draw_boxes': False, # Disable to save CPU 'log_results': True } } moderator = ContentModerator(config=cpu_config) status = moderator.get_model_status() logger.info(f"Model Status: {status}") logger.info("✅ Content Moderator initialized successfully for CPU") except Exception as e: logger.error(f"Failed to initialize Content Moderator: {e}") moderator = None @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown""" global moderator if moderator: logger.info("Shutting down Content Moderator...") moderator = None video_optimizer.clear_cache() # ============== Utility Functions ============== def generate_request_id() -> str: """Generate unique request ID""" return f"req_{datetime.now().strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}" def validate_file_extension(filename: str, allowed_extensions: set) -> bool: """Validate file extension""" return Path(filename).suffix.lower() in allowed_extensions def validate_file_size(file_size: int, max_size: int) -> bool: """Validate file size""" return file_size <= max_size async def save_upload_file(upload_file: UploadFile, destination: Path) -> Path: """Save uploaded file to destination""" try: async with aiofiles.open(destination, 'wb') as f: content = await upload_file.read() await f.write(content) return destination except Exception as e: logger.error(f"Error saving file: {e}") raise def safe_dict(obj): """Convert object to dict safely""" if hasattr(obj, 'dict'): return obj.dict() elif isinstance(obj, dict): return obj else: return str(obj) def process_detections(raw_detections: List[Dict]) -> Dict[str, List]: """Process and categorize raw detections""" processed = { 'weapons': [], 'nsfw': [], 'fights': [] } for det in raw_detections: if det['type'] == 'weapon': processed['weapons'].append(WeaponDetection( type=det['type'], class_name=det['class'], weapon_type=det.get('weapon_type', 'unknown'), confidence=det['confidence'], bbox=BoundingBox( x1=det['bbox'][0], y1=det['bbox'][1], x2=det['bbox'][2], y2=det['bbox'][3] ), threat_level=det.get('threat_level', 'medium'), detection_method=det.get('detection_method', 'yolo') )) elif det['type'] == 'nsfw': processed['nsfw'].append(NSFWDetection( type=det['type'], class_name=det['class'], confidence=det['confidence'], bbox=BoundingBox( x1=det['bbox'][0], y1=det['bbox'][1], x2=det['bbox'][2], y2=det['bbox'][3] ), method=det.get('method', 'classification'), skin_ratio=det.get('skin_ratio') )) elif det['type'] == 'fight': processed['fights'].append(FightDetection( type="fight", confidence=det['confidence'], bbox=BoundingBox( x1=det['bbox'][0], y1=det['bbox'][1], x2=det['bbox'][2], y2=det['bbox'][3] ), persons_involved=det.get('people_involved', 2), threat_level=det.get('threat_level', 'high') )) return processed # ============== API Endpoints ============== @app.get("/") async def root(): """Root endpoint""" return { "message": "Weapon & NSFW Detection API", "version": "2.0.0", "status": "running" if moderator else "initializing", "cpu_optimized": True, "docs": "/docs" } @app.get("/status") async def get_status(): """Check system status""" if moderator is None: return { "status": "error", "message": "Content Moderator not initialized" } return { "status": "ok", "model_status": moderator.get_model_status(), "memory_usage": moderator.get_memory_usage(), "cache_size": len(video_optimizer.frame_cache), "cpu_optimized": True } @app.post("/detect_n_k_f_g/images", response_model=ImageDetectionResponse) async def detect_image( file: UploadFile = File(...), return_annotated: bool = Form(False) ): """ Detect weapons, fights, and NSFW content in images Optimized for CPU processing """ if moderator is None: raise HTTPException( status_code=503, detail="Content Moderator not initialized" ) request_id = generate_request_id() start_time = datetime.now() try: # Validate file if not validate_file_extension(file.filename, config.ALLOWED_IMAGE_EXTENSIONS): raise HTTPException( status_code=400, detail=f"Invalid file type" ) # Read file file_content = await file.read() file_size = len(file_content) if not validate_file_size(file_size, config.MAX_IMAGE_SIZE): raise HTTPException( status_code=400, detail=f"File too large. Max: {config.MAX_IMAGE_SIZE / (1024 * 1024):.1f}MB" ) # Save file upload_path = config.UPLOAD_DIR / "images" / f"{request_id}_{file.filename}" async with aiofiles.open(upload_path, 'wb') as f: await f.write(file_content) # Decode image nparr = np.frombuffer(file_content, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: raise HTTPException(status_code=400, detail="Invalid image file") # Get image info height, width = image.shape[:2] image_info = { "filename": file.filename, "width": width, "height": height, "size_mb": round(file_size / (1024 * 1024), 2) } # Downscale for CPU if too large if width > 640: scale = 640 / width new_width = int(width * scale) new_height = int(height * scale) image = cv2.resize(image, (new_width, new_height)) logger.info(f"Downscaled image from {width}x{height} to {new_width}x{new_height}") # Process image result = moderator.process_image(image) if not result: raise HTTPException(status_code=500, detail="Processing failed") # Process detections processed = process_detections(result['detections']) # Calculate summary summary = { "total_detections": len(result['detections']), "weapons": len(processed['weapons']), "nsfw": len(processed['nsfw']), "fights": len(processed['fights']) } # Determine risk level if len(processed['weapons']) > 0 or len(processed['fights']) > 0: risk_level = "high" elif len(processed['nsfw']) > 0: risk_level = "medium" else: risk_level = "safe" # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() * 1000 return ImageDetectionResponse( success=True, request_id=request_id, timestamp=datetime.now().isoformat(), image_info=image_info, detections=processed, summary=summary, risk_level=risk_level, action_required=(summary["total_detections"] > 0), processing_time_ms=processing_time ) except HTTPException: raise except Exception as e: logger.error(f"Error processing image: {e}") logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.post("/detect_n_k_f_g/videos", response_model=VideoDetectionResponse) async def detect_video( file: UploadFile = File(...), quick_mode: bool = Form(True, description="Enable CPU optimizations"), adaptive_settings: bool = Form(True, description="Auto-adjust settings"), custom_frame_skip: Optional[int] = Form(None, ge=1, le=50) ): """ Detect weapons, fights, and NSFW content in videos CPU-optimized with smart frame skipping """ if moderator is None: raise HTTPException( status_code=503, detail="Content Moderator not initialized" ) request_id = generate_request_id() start_time = datetime.now() try: # Validate file if not validate_file_extension(file.filename, config.ALLOWED_VIDEO_EXTENSIONS): raise HTTPException( status_code=400, detail="Invalid video format" ) # Save video upload_path = config.UPLOAD_DIR / "videos" / f"{request_id}_{file.filename}" await save_upload_file(file, upload_path) # Check file size file_size = upload_path.stat().st_size if not validate_file_size(file_size, config.MAX_VIDEO_SIZE): upload_path.unlink() raise HTTPException( status_code=400, detail=f"File too large. Max: {config.MAX_VIDEO_SIZE / (1024 * 1024):.1f}MB" ) # Open video cap = cv2.VideoCapture(str(upload_path)) if not cap.isOpened(): raise HTTPException(status_code=400, detail="Cannot open video file") # Get video info fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) duration = total_frames / fps if fps > 0 else 0 video_info = { "filename": file.filename, "width": width, "height": height, "fps": fps, "total_frames": total_frames, "duration_seconds": round(duration, 2), "size_mb": round(file_size / (1024 * 1024), 2) } # Get optimal settings if adaptive_settings: settings = video_optimizer.get_optimal_settings(duration, total_frames) frame_skip = custom_frame_skip or settings['frame_skip'] target_width = settings['target_width'] max_frames = settings['max_frames'] else: frame_skip = custom_frame_skip or config.VIDEO_FRAME_SKIP target_width = config.VIDEO_TARGET_WIDTH max_frames = config.VIDEO_MAX_FRAMES logger.info(f"Video settings: skip={frame_skip}, width={target_width}, max={max_frames}") # Clear cache for new video video_optimizer.clear_cache() # Processing variables frame_detections = [] frame_count = 0 processed_count = 0 threat_count = 0 critical_threat = False # Aggregated statistics all_weapons = [] all_nsfw = [] all_fights = [] # Temporary optimize settings for video processing if quick_mode: original_size = moderator.config['performance']['image_size'] moderator.config['performance']['image_size'] = target_width # Process video while True: ret, frame = cap.read() if not ret: break frame_count += 1 # Skip frames if frame_count % frame_skip != 0: continue # Check max frames limit if processed_count >= max_frames: logger.info(f"Reached max frames limit: {max_frames}") break # Preprocess frame frame = video_optimizer.preprocess_frame(frame, target_width) # Skip similar frames if video_optimizer.should_skip_frame(frame): continue processed_count += 1 # Process frame result = moderator.process_image(frame) if result and result['detections']: # Process detections processed = process_detections(result['detections']) # Track threats current_threats = len(result['detections']) threat_count += current_threats # Check for critical threats for det in result['detections']: if det.get('threat_level') == 'critical': critical_threat = True # Store frame detection info (simplified) if current_threats > 0: frame_info = { "frame_number": frame_count, "timestamp_seconds": round(frame_count / fps, 2), "detections": { "weapons": len(processed['weapons']), "nsfw": len(processed['nsfw']), "fights": len(processed['fights']) }, "threat_level": "critical" if critical_threat else "high" } frame_detections.append(frame_info) # Aggregate all_weapons.extend(processed['weapons']) all_nsfw.extend(processed['nsfw']) all_fights.extend(processed['fights']) # Early stopping if critical_threat and threat_count >= config.VIDEO_EARLY_STOP_THRESHOLD: logger.info(f"Critical threats detected ({threat_count}), early stopping") break # Progress log if processed_count % 20 == 0: elapsed = (datetime.now() - start_time).total_seconds() frames_per_sec = processed_count / elapsed if elapsed > 0 else 0 logger.info(f"Processed {processed_count} frames in {elapsed:.1f}s ({frames_per_sec:.1f} fps)") # Restore original settings if quick_mode: moderator.config['performance']['image_size'] = original_size # Release video cap.release() # Clean up uploaded file try: upload_path.unlink() except: pass # Calculate summary summary = { "total_frames_analyzed": processed_count, "frames_with_detections": len(frame_detections), "total_detections": threat_count, "weapons": len(all_weapons), "nsfw": len(all_nsfw), "fights": len(all_fights) } # Determine risk level if critical_threat or len(all_weapons) > 5: risk_level = "critical" elif len(all_weapons) > 0 or len(all_fights) > 0: risk_level = "high" elif len(all_nsfw) > 0: risk_level = "medium" else: risk_level = "safe" # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() * 1000 # Optimization info optimization_used = { "frame_skip": frame_skip, "resolution": target_width, "max_frames": max_frames, "frames_cached": len(video_optimizer.frame_cache), "early_stopped": critical_threat and threat_count >= config.VIDEO_EARLY_STOP_THRESHOLD } return VideoDetectionResponse( success=True, request_id=request_id, timestamp=datetime.now().isoformat(), video_info=video_info, total_frames_processed=processed_count, frame_detections=frame_detections[:50], # Limit to 50 detections summary=summary, risk_level=risk_level, action_required=(summary["total_detections"] > 0), processing_time_ms=processing_time, optimization_used=optimization_used ) except HTTPException: raise except Exception as e: logger.error(f"Error processing video: {e}") logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) finally: # Clear cache after video processing video_optimizer.clear_cache() @app.delete("/cleanup") async def cleanup_old_files(hours: int = 24): """Clean up old files""" try: from datetime import timedelta cutoff_time = datetime.now() - timedelta(hours=hours) deleted_count = 0 for directory in [config.UPLOAD_DIR, config.RESULTS_DIR, config.PROCESSED_DIR]: for subdir in ["images", "videos"]: path = directory / subdir if path.exists(): for file in path.iterdir(): if file.is_file(): file_time = datetime.fromtimestamp(file.stat().st_mtime) if file_time < cutoff_time: file.unlink() deleted_count += 1 return { "success": True, "deleted_files": deleted_count, "message": f"Deleted {deleted_count} files older than {hours} hours" } except Exception as e: logger.error(f"Cleanup error: {e}") return {"success": False, "error": str(e)} if __name__ == "__main__": import os port = int(os.environ.get("PORT", 7860)) options = { "bind": f"0.0.0.0:{port}", "workers": 2, "worker_class": "uvicorn.workers.UvicornWorker", } StandaloneApplication(app, options).run()