diff --git "a/main.py" "b/main.py" --- "a/main.py" +++ "b/main.py" @@ -1,1840 +1,1784 @@ -import cv2 -import numpy as np -import torch -import os -import json -import warnings - -warnings.filterwarnings('ignore') - -# Import required libraries -try: - from ultralytics import YOLO - from transformers import pipeline - from PIL import Image, ImageDraw, ImageFont - import requests - from datetime import datetime - - # MediaPipe import with fallback - try: - import mediapipe as mp - - MEDIAPIPE_AVAILABLE = True - print("✅ MediaPipe imported successfully") - except ImportError: - MEDIAPIPE_AVAILABLE = False - print("⚠️ MediaPipe not available - pose detection disabled") - except Exception as e: - MEDIAPIPE_AVAILABLE = False - print(f"⚠️ MediaPipe import error: {e} - pose detection disabled") - -except ImportError as e: - print(f"Missing dependency: {e}") - print("Please install: pip install ultralytics transformers pillow requests") - print("For MediaPipe: pip install mediapipe==0.10.18") - - -class ContentModerator: - def __init__(self, config=None): - default_cfg = self.get_default_config() - self.config = self.deep_merge_dicts(default_cfg, config) if config else default_cfg - - self.device = 'cuda' if torch.cuda.is_available() else 'cpu' - - # CPU optimizations - if self.device == 'cpu': - print("💻 CPU mode detected - applying optimizations...") - torch.set_num_threads(4) - self.config['performance']['half_precision'] = False - self.config['nsfw_detection']['pose_analysis'] = False - - # Initialize models - self.weapon_model = None # Primary weapon model - self.weapon_model_custom = None # Custom model for dao + súng + fight - self.weapon_model_general = None # General model for person + backup weapons - self.nsfw_classifier = None - self.pose_detector = None - - # Performance optimization - self.detection_cache = {} - self.cache_ttl = 2 # Cache for 2 seconds - - # Results storage - self.detection_history = [] - - print(f"🚀 Content Moderator initialized on {self.device}") - if self.device == 'cpu': - print("⚡ CPU optimizations enabled") - - self.setup_models() - - def deep_merge_dicts(self, a: dict, b: dict) -> dict: - """Merge dict b into a (non-destructive). Returns merged copy. - - Keeps keys from a when missing in b. - - Recursively merges nested dicts. - """ - if not isinstance(a, dict): - return b if isinstance(b, dict) else a - result = dict(a) # shallow copy - if not b: - return result - for k, v in b.items(): - if k in result and isinstance(result[k], dict) and isinstance(v, dict): - result[k] = self.deep_merge_dicts(result[k], v) - else: - result[k] = v - return result - - def get_default_config(self): - """Default configuration optimized for CPU/GPU with enhanced knife and fight detection""" - # Auto-detect optimal settings - is_cuda = torch.cuda.is_available() - - return { - 'weapon_detection': { - 'enabled': True, - 'confidence_threshold': 0.45, # For guns - 'knife_confidence': 0.45, # Lower threshold for knives - 'fight_confidence': 0.40, # Lower threshold for fights (behavioral) - 'model_size': 'yolo11n', - 'classes': ['knife', 'gun', 'rifle', 'pistol', 'weapon', 'fight'], - 'use_enhancement': True, # Enable image enhancement for knives - 'multi_pass': True, # Enable multi-pass detection - 'boost_knife_detection': True, # Enable knife confidence boosting - 'fight_detection': True, # Enable fight-specific detection - 'fight_analysis': True # Enable advanced fight behavior analysis - }, - 'fight_detection': { - 'enabled': True, - 'confidence_threshold': 0.40, - 'pose_analysis': True, # Analyze poses for fighting - 'motion_analysis': False, # Motion-based fight detection (for video) - 'aggression_keywords': ['fight', 'violence', 'aggression', 'combat'], - 'threat_escalation': True, # Escalate threat level for fights - 'multi_person_analysis': True # Analyze interactions between people - }, - 'nsfw_detection': { - 'enabled': True, - 'confidence_threshold': 0.7, - 'skin_detection': True, - 'pose_analysis': False, - 'region_analysis': True - }, - 'performance': { - 'image_size': 416 if is_cuda else 320, - 'batch_size': 1, - 'half_precision': is_cuda, - 'use_flash_attention': False, - 'cpu_optimization': not is_cuda - }, - 'output': { - 'save_detections': True, - 'draw_boxes': True, - 'log_results': True - } - } - - def setup_models(self): - """Initialize all detection models""" - try: - # Clear GPU cache - if torch.cuda.is_available(): - torch.cuda.empty_cache() - - # 1. Setup Weapon Detection (now includes fight detection) - if self.config['weapon_detection']['enabled']: - self.setup_weapon_detector() - - # 2. Setup NSFW Detection - if self.config['nsfw_detection']['enabled']: - self.setup_nsfw_detector() - - print("✅ All models loaded successfully!") - - except Exception as e: - print(f"❌ Error setting up models: {e}") - - def setup_weapon_detector(self): - """Setup dual model system: Custom for weapons + fights + General for person detection""" - try: - print("🔫 Loading weapon and fight detection models...") - - # Model 1: Custom YOLO11 for weapons (dao + súng + fight) - custom_model_path = "models/best.pt" - project_root = os.path.dirname(os.path.abspath(__file__)) - full_model_path = os.path.join(project_root, custom_model_path) - - if os.path.exists(full_model_path): - print(f"✅ Loading custom weapon+fight model: {full_model_path}") - self.weapon_model_custom = YOLO(full_model_path) - print("🎯 Custom weapon+fight model (dao + súng + fight) loaded!") - - # Show custom model classes - if hasattr(self.weapon_model_custom, 'names'): - classes = list(self.weapon_model_custom.names.values()) - print(f"📊 Custom classes: {classes}") - - # Check if fight class is available - if any('fight' in str(cls).lower() for cls in classes): - print("👊 Fight detection enabled in custom model") - else: - print("⚠️ Fight class not found in custom model") - else: - print("⚠️ Custom weapon+fight model not found") - self.weapon_model_custom = None - - # Model 2: General YOLO11n for person detection and fight fallback - print("👤 Loading general model for person detection...") - self.weapon_model_general = YOLO('yolo11n.pt') - print("✅ General YOLO11n loaded for person detection") - - # Set primary weapon model - self.weapon_model = self.weapon_model_custom if self.weapon_model_custom else self.weapon_model_general - - # Optimize models for performance - if self.device == 'cuda' and self.config['performance']['half_precision']: - try: - if self.weapon_model_custom: - self.weapon_model_custom.model.half() - self.weapon_model_general.model.half() - print("✅ Half precision enabled for both models") - except: - print("⚠️ Half precision not supported") - - print("🔥 Dual model system ready with fight detection!") - - except Exception as e: - print(f"❌ Error loading weapon+fight models: {e}") - self.weapon_model = None - self.weapon_model_custom = None - self.weapon_model_general = None - - def detect_weapons(self, image): - """Enhanced dual model weapon and fight detection""" - detections = [] - - try: - imgsz = self.config['performance']['image_size'] - use_half = self.config['performance']['half_precision'] and self.device == 'cuda' - - # Prepare multiple versions of the image - images_to_process = [(image, 1.0, "original")] - - if self.config['weapon_detection']['use_enhancement']: - enhanced_image = self.enhance_knife_detection(image) - images_to_process.append((enhanced_image, 1.15, "enhanced")) - - # Process each image version - for img, weight_multiplier, img_type in images_to_process: - if self.weapon_model_custom: - # Use different confidence thresholds for different detection types - knife_conf = self.config['weapon_detection']['knife_confidence'] - gun_conf = self.config['weapon_detection']['confidence_threshold'] - fight_conf = self.config['weapon_detection']['fight_confidence'] - - # Multi-pass detection with different thresholds - passes = [ - (knife_conf, "knife_pass"), # Low threshold for knives - (gun_conf, "gun_pass"), # Normal threshold for guns - (fight_conf, "fight_pass") # Low threshold for fights - ] if self.config['weapon_detection']['multi_pass'] else [ - (min(knife_conf, fight_conf), "single_pass")] - - for conf_threshold, pass_type in passes: - try: - results = self.weapon_model_custom( - img, - imgsz=imgsz, - conf=conf_threshold, - device=self.device, - half=use_half, - verbose=False, - augment=True # Enable test-time augmentation - ) - - for result in results: - boxes = result.boxes - if boxes is not None: - for box in boxes: - class_id = int(box.cls[0]) - - if hasattr(result, 'names') and class_id in result.names: - class_name = result.names[class_id].lower() - else: - class_name = f"detection_{class_id}" - - x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() - confidence = float(box.conf[0]) * weight_multiplier - - # Determine detection type and apply appropriate processing - if self.is_fight_detection(class_name): - # Fight detection processing - confidence = self.boost_fight_confidence( - img, [x1, y1, x2, y2], confidence, class_name - ) - - detection_type = 'fight' - min_conf = fight_conf - threat_level = self.assess_fight_threat(confidence, img, [x1, y1, x2, y2]) - - else: - # Weapon detection processing - if self.config['weapon_detection']['boost_knife_detection']: - if 'dao' in class_name or 'knife' in class_name or 'blade' in class_name: - confidence = self.boost_knife_confidence( - img, [x1, y1, x2, y2], confidence, class_name - ) - - detection_type = 'weapon' - weapon_type = self.classify_weapon_type(class_name) - min_conf = knife_conf if weapon_type == 'blade' else gun_conf - threat_level = self.assess_weapon_threat(weapon_type, confidence) - - if confidence >= min_conf: - detection_data = { - 'type': detection_type, - 'class': class_name, - 'confidence': min(confidence, 0.99), - 'bbox': [int(x1), int(y1), int(x2), int(y2)], - 'threat_level': threat_level, - 'detection_method': f'custom_model_{img_type}_{pass_type}' - } - - # Add type-specific fields - if detection_type == 'weapon': - detection_data['weapon_type'] = weapon_type - elif detection_type == 'fight': - detection_data['fight_type'] = self.classify_fight_type(class_name) - detection_data['aggression_level'] = self.assess_aggression_level( - confidence) - - detections.append(detection_data) - - icon = "👊" if detection_type == 'fight' else "🎯" - print( - f" {icon} Detected: {class_name} (conf: {confidence:.3f}, method: {img_type}_{pass_type})") - - except Exception as e: - print(f"⚠️ Detection pass error ({pass_type}): {e}") - - # Fallback: General model for backup detection (only if no custom detections) - if self.weapon_model_general and len(detections) == 0: - detections.extend(self.fallback_detection(image, imgsz, use_half)) - - # Remove duplicate detections - detections = self.remove_duplicate_detections(detections) - - # Additional fight analysis if enabled - # Additional fight analysis if enabled (safe access) - try: - # weapon_detection may contain a simple boolean or a dict for fight_detection - wd = self.config.get('weapon_detection', {}) - wd_fd = wd.get('fight_detection', None) - - # Determine whether fight analysis is enabled - if isinstance(wd_fd, dict): - fight_enabled = wd_fd.get('enabled', False) - fight_multi_person = wd_fd.get('multi_person_analysis', False) - else: - # wd_fd may be boolean (legacy); consult top-level fight_detection dict for details - fight_enabled = bool(wd_fd) - fight_multi_person = bool( - self.config.get('fight_detection', {}).get('multi_person_analysis', False)) - - if fight_enabled and fight_multi_person: - fight_detections = [d for d in detections if d.get('type') == 'fight'] - if fight_detections: - try: - enhanced_fights = self.analyze_fight_context(image, fight_detections) - # Replace original fight detections with enhanced ones - detections = [d for d in detections if d.get('type') != 'fight'] + enhanced_fights - except Exception as e: - print(f"⚠️ Fight context enhancement error: {e}") - except Exception as e: - # Defensive: never allow missing config to break detection pipeline - import traceback - traceback.print_exc() - print(f"⚠️ Skipping fight context analysis due to config error: {e}") - - return detections - - except Exception as e: - print(f"❌ Weapon and fight detection error: {e}") - return [] - - def is_fight_detection(self, class_name): - """Check if detection is fight-related""" - fight_keywords = ['fight', 'fighting', 'combat', 'violence', 'aggression', 'brawl', 'scuffle'] - return any(keyword in class_name.lower() for keyword in fight_keywords) - - def classify_fight_type(self, class_name): - """Classify type of fight detected""" - class_name = class_name.lower() - - if any(word in class_name for word in ['punch', 'boxing', 'fist']): - return 'physical_combat' - elif any(word in class_name for word in ['kick', 'martial', 'karate']): - return 'martial_arts' - elif any(word in class_name for word in ['wrestle', 'grapple']): - return 'wrestling' - elif any(word in class_name for word in ['group', 'mob', 'crowd']): - return 'group_violence' - else: - return 'general_fight' - - def boost_fight_confidence(self, image, bbox, initial_confidence, class_name): - """Boost confidence for fight detection based on contextual analysis""" - try: - x1, y1, x2, y2 = [int(coord) for coord in bbox] - - # Ensure bbox is within image bounds - x1 = max(0, x1) - y1 = max(0, y1) - x2 = min(image.shape[1], x2) - y2 = min(image.shape[0], y2) - - roi = image[y1:y2, x1:x2] - - if roi.size == 0: - return initial_confidence - - boost = 0 - - # 1. Motion blur analysis (indicates rapid movement) - gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var() - if blur_variance < 100: # Low variance indicates blur/motion - boost += 0.10 - - # 2. Edge density (chaotic scenes have more edges) - edges = cv2.Canny(gray, 50, 150) - edge_density = np.count_nonzero(edges) / edges.size - if edge_density > 0.15: - boost += 0.08 - - # 3. Color analysis (fights often have varied, chaotic colors) - hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) - color_variance = np.var(hsv[:, :, 1]) # Saturation variance - if color_variance > 1000: - boost += 0.05 - - # 4. Texture analysis (complex textures indicate multiple overlapping objects) - gray_f = np.float32(gray) - texture_response = cv2.cornerHarris(gray_f, 2, 3, 0.04) - texture_strength = np.mean(texture_response) - if texture_strength > 0.01: - boost += 0.07 - - # 5. Aspect ratio analysis (fights often have irregular bounding boxes) - height = y2 - y1 - width = x2 - x1 - if height > 0 and width > 0: - aspect_ratio = max(width, height) / min(width, height) - if 1.2 < aspect_ratio < 3.0: # Moderate irregularity - boost += 0.05 - - final_confidence = min(initial_confidence + boost, 0.95) - - if boost > 0: - print(f" 👊 Fight boost applied: +{boost:.2f} (blur:{blur_variance:.0f}, edge:{edge_density:.2f})") - - return final_confidence - - except Exception as e: - print(f"⚠️ Fight confidence boost error: {e}") - return initial_confidence - - def assess_fight_threat(self, confidence, image, bbox): - """Assess threat level of detected fight""" - base_threat = 'medium' # Fights start at medium threat - - # Escalate based on confidence - if confidence >= 0.85: - base_threat = 'critical' - elif confidence >= 0.70: - base_threat = 'high' - elif confidence >= 0.50: - base_threat = 'medium' - else: - base_threat = 'low' - - # Additional context-based escalation - try: - x1, y1, x2, y2 = bbox - fight_area = (x2 - x1) * (y2 - y1) - image_area = image.shape[0] * image.shape[1] - area_ratio = fight_area / image_area - - # Large fights are more dangerous - if area_ratio > 0.5: # Fight covers >50% of image - if base_threat == 'medium': - base_threat = 'high' - elif base_threat == 'high': - base_threat = 'critical' - - except Exception as e: - print(f"⚠️ Fight threat assessment error: {e}") - - return base_threat - - def assess_aggression_level(self, confidence): - """Assess aggression level based on confidence""" - if confidence >= 0.80: - return 'extreme' - elif confidence >= 0.65: - return 'high' - elif confidence >= 0.45: - return 'moderate' - else: - return 'low' - - def analyze_fight_context(self, image, fight_detections): - """Enhanced analysis of fight context with multi-person detection""" - enhanced_fights = [] - - try: - # Detect all persons in the image - persons = self.detect_persons(image) - - for fight in fight_detections: - enhanced_fight = fight.copy() - - # Count people involved in or near the fight - fight_bbox = fight['bbox'] - people_in_fight = 0 - people_nearby = 0 - - for person in persons: - person_bbox = person['bbox'] - - # Calculate overlap with fight area - overlap = self.calculate_bbox_overlap(fight_bbox, person_bbox) - - if overlap > 0.3: # Person is directly involved - people_in_fight += 1 - elif overlap > 0.1: # Person is nearby - people_nearby += 1 - - # Update fight information based on context - enhanced_fight['people_involved'] = people_in_fight - enhanced_fight['people_nearby'] = people_nearby - enhanced_fight['total_people'] = people_in_fight + people_nearby - - # Escalate threat based on number of people - if people_in_fight >= 3: - if enhanced_fight['threat_level'] == 'medium': - enhanced_fight['threat_level'] = 'high' - elif enhanced_fight['threat_level'] == 'high': - enhanced_fight['threat_level'] = 'critical' - enhanced_fight['fight_type'] = 'group_violence' - - # Add context flags - enhanced_fight['context_flags'] = [] - if people_in_fight >= 3: - enhanced_fight['context_flags'].append('multi_person_fight') - if people_nearby >= 2: - enhanced_fight['context_flags'].append('crowd_present') - - enhanced_fights.append(enhanced_fight) - - print(f" 👥 Fight context: {people_in_fight} involved, {people_nearby} nearby") - - except Exception as e: - print(f"⚠️ Fight context analysis error: {e}") - return fight_detections - - return enhanced_fights - - def calculate_bbox_overlap(self, bbox1, bbox2): - """Calculate overlap ratio between two bounding boxes""" - x1_min, y1_min, x1_max, y1_max = bbox1 - x2_min, y2_min, x2_max, y2_max = bbox2 - - # Calculate intersection - intersect_xmin = max(x1_min, x2_min) - intersect_ymin = max(y1_min, y2_min) - intersect_xmax = min(x1_max, x2_max) - intersect_ymax = min(y1_max, y2_max) - - if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: - return 0.0 - - intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) - bbox1_area = (x1_max - x1_min) * (y1_max - y1_min) - - return intersect_area / bbox1_area if bbox1_area > 0 else 0 - - def fallback_detection(self, image, imgsz, use_half): - """Fallback detection using general model""" - detections = [] - - try: - general_results = self.weapon_model_general( - image, - imgsz=imgsz, - conf=0.4, - device=self.device, - half=use_half, - verbose=False - ) - - for result in general_results: - boxes = result.boxes - if boxes is not None: - for box in boxes: - class_id = int(box.cls[0]) - class_name = result.names[class_id].lower() - - # Filter for weapon-like objects - weapon_keywords = ['knife', 'scissors', 'fork'] - - if any(keyword in class_name for keyword in weapon_keywords): - x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() - confidence = float(box.conf[0]) - - detections.append({ - 'type': 'weapon', - 'class': class_name, - 'weapon_type': 'blade', - 'confidence': confidence, - 'bbox': [int(x1), int(y1), int(x2), int(y2)], - 'threat_level': self.assess_weapon_threat('blade', confidence), - 'detection_method': 'general_model_fallback' - }) - - except Exception as e: - print(f"⚠️ General detection error: {e}") - - return detections - - # ... (rest of the existing methods remain the same) ... - - def enhance_knife_detection(self, image): - """Enhance image specifically for better knife/dao detection""" - try: - # 1. Increase contrast and brightness for metallic objects - enhanced = cv2.convertScaleAbs(image, alpha=1.4, beta=25) - - # 2. Apply sharpening kernel to highlight edges - kernel_sharpen = np.array([[-1, -1, -1], - [-1, 9, -1], - [-1, -1, -1]]) - sharpened = cv2.filter2D(enhanced, -1, kernel_sharpen) - - # 3. Apply CLAHE for better local contrast - lab = cv2.cvtColor(sharpened, cv2.COLOR_BGR2LAB) - l, a, b = cv2.split(lab) - clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) - l = clahe.apply(l) - enhanced_final = cv2.merge([l, a, b]) - enhanced_final = cv2.cvtColor(enhanced_final, cv2.COLOR_LAB2BGR) - - return enhanced_final - except Exception as e: - print(f"⚠️ Enhancement failed: {e}") - return image - - def boost_knife_confidence(self, image, bbox, initial_confidence, class_name): - """Boost confidence for knife/dao based on geometric and visual features""" - try: - x1, y1, x2, y2 = [int(coord) for coord in bbox] - - # Ensure bbox is within image bounds - x1 = max(0, x1) - y1 = max(0, y1) - x2 = min(image.shape[1], x2) - y2 = min(image.shape[0], y2) - - roi = image[y1:y2, x1:x2] - - if roi.size == 0: - return initial_confidence - - boost = 0 - - # 1. Check aspect ratio (knives are typically elongated) - height = y2 - y1 - width = x2 - x1 - if height > 0 and width > 0: - aspect_ratio = max(width, height) / min(width, height) - if aspect_ratio > 2.5: # Elongated shape - boost += 0.15 - elif aspect_ratio > 2.0: - boost += 0.10 - - # 2. Check for metallic reflection (brightness) - gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - mean_brightness = np.mean(gray) - std_brightness = np.std(gray) - - if mean_brightness > 140: # Bright (metallic) - boost += 0.10 - if std_brightness > 50: # High contrast (blade edge) - boost += 0.05 - - # 3. Edge detection (knives have strong edges) - edges = cv2.Canny(gray, 50, 150) - edge_ratio = np.count_nonzero(edges) / edges.size - if edge_ratio > 0.15: # Strong edges - boost += 0.10 - elif edge_ratio > 0.10: - boost += 0.05 - - # 4. Check for blade-like gradient - if height > width: # Vertical orientation - gradient = np.gradient(gray, axis=0) - else: # Horizontal orientation - gradient = np.gradient(gray, axis=1) - - gradient_strength = np.mean(np.abs(gradient)) - if gradient_strength > 10: - boost += 0.05 - - # Apply boost with class-specific multiplier - if 'dao' in class_name.lower() or 'knife' in class_name.lower(): - boost *= 1.2 # Extra boost for knife/dao classes - - final_confidence = min(initial_confidence + boost, 0.95) - - if boost > 0: - print( - f" 🔪 Knife boost applied: +{boost:.2f} (AR:{aspect_ratio:.1f}, Bright:{mean_brightness:.0f}, Edge:{edge_ratio:.2f})") - - return final_confidence - - except Exception as e: - print(f"⚠️ Confidence boost error: {e}") - return initial_confidence - - def detect_persons(self, image): - """Detect persons using general model (needed for NSFW and fight analysis)""" - persons = [] - - if not self.weapon_model_general: - return persons - - try: - imgsz = self.config['performance']['image_size'] - use_half = self.config['performance']['half_precision'] and self.device == 'cuda' - - results = self.weapon_model_general( - image, - imgsz=imgsz, - conf=0.3, - device=self.device, - half=use_half, - verbose=False - ) - - for result in results: - boxes = result.boxes - if boxes is not None: - for box in boxes: - class_id = int(box.cls[0]) - class_name = result.names[class_id].lower() - - if class_name == 'person': - x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() - confidence = float(box.conf[0]) - - persons.append({ - 'class': 'person', - 'confidence': confidence, - 'bbox': [int(x1), int(y1), int(x2), int(y2)] - }) - - return persons - - except Exception as e: - print(f"❌ Person detection error: {e}") - return [] - - def classify_weapon_type(self, class_name): - """Classify weapon type from class name""" - class_name = class_name.lower() - - # Knife/Blade keywords (expanded) - knife_keywords = ['knife', 'dao', 'blade', 'dagger', 'sword', 'machete', 'katana', 'cutter'] - if any(keyword in class_name for keyword in knife_keywords): - return 'blade' - - # Gun/Firearm keywords - gun_keywords = ['gun', 'pistol', 'rifle', 'firearm', 'revolver', 'shotgun', 'súng'] - if any(keyword in class_name for keyword in gun_keywords): - return 'firearm' - - # Other weapons - other_keywords = ['axe', 'hammer', 'club', 'bat'] - if any(keyword in class_name for keyword in other_keywords): - return 'blunt_weapon' - - # Check for numbered weapon classes - if 'weapon' in class_name: - try: - weapon_id = int(class_name.split('_')[-1]) - if weapon_id in [0, 1]: # Assuming 0,1 are firearms - return 'firearm' - elif weapon_id in [2, 3]: # Assuming 2,3 are blades - return 'blade' - else: - return 'unknown_weapon' - except: - pass - - return 'unknown_weapon' - - def assess_weapon_threat(self, weapon_type, confidence): - """Assess threat level of detected weapon""" - threat_levels = { - 'firearm': 'critical', - 'blade': 'high', - 'blunt_weapon': 'medium', - 'unknown_weapon': 'medium' - } - - base_threat = threat_levels.get(weapon_type, 'medium') - - # Adjust based on confidence - if confidence >= 0.9: - if base_threat == 'medium': - return 'high' - elif base_threat == 'high': - return 'critical' - else: - return base_threat - elif confidence >= 0.7: - return base_threat - elif confidence >= 0.5: - if base_threat == 'critical': - return 'high' - elif base_threat == 'high': - return 'medium' - else: - return base_threat - else: - if base_threat == 'critical': - return 'medium' - elif base_threat == 'high': - return 'low' - else: - return 'low' - - def remove_duplicate_detections(self, detections, iou_threshold=0.4): - """Remove duplicate detections using Non-Maximum Suppression""" - if len(detections) <= 1: - return detections - - # Sort by confidence (highest first) - detections = sorted(detections, key=lambda x: x['confidence'], reverse=True) - - keep = [] - for i, det1 in enumerate(detections): - should_keep = True - for det2 in keep: - # Check if same type and overlapping - if det1['type'] == det2['type']: - iou = self.calculate_iou(det1['bbox'], det2['bbox']) - if iou > iou_threshold: - should_keep = False - break - - if should_keep: - keep.append(det1) - - return keep - - def calculate_iou(self, box1, box2): - """Calculate Intersection over Union between two bounding boxes""" - x1_min, y1_min, x1_max, y1_max = box1 - x2_min, y2_min, x2_max, y2_max = box2 - - # Calculate intersection - intersect_xmin = max(x1_min, x2_min) - intersect_ymin = max(y1_min, y2_min) - intersect_xmax = min(x1_max, x2_max) - intersect_ymax = min(y1_max, y2_max) - - if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: - return 0.0 - - intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) - - # Calculate union - box1_area = (x1_max - x1_min) * (y1_max - y1_min) - box2_area = (x2_max - x2_min) * (y2_max - y2_min) - union_area = box1_area + box2_area - intersect_area - - return intersect_area / union_area if union_area > 0 else 0 - - # ... (continue with remaining NSFW detection methods) ... - - def detect_nsfw_content(self, image): - """Enhanced NSFW detection with person detection first""" - detections = [] - - try: - if len(image.shape) == 3 and image.shape[2] == 3: - rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - else: - rgb_image = image - - # Stage 1: Detect persons first (optimization) - persons = self.detect_persons(image) - - if not persons: - # No persons detected, skip detailed NSFW analysis - return detections - - print(f"👤 Found {len(persons)} person(s), analyzing for NSFW content...") - - # Stage 2: Overall NSFW Classification - if self.nsfw_classifier: - try: - pil_image = Image.fromarray(rgb_image) - nsfw_result = self.nsfw_classifier(pil_image) - - if nsfw_result[0]['label'] == 'nsfw': - confidence = nsfw_result[0]['score'] - if confidence > self.config['nsfw_detection']['confidence_threshold']: - detections.append({ - 'type': 'nsfw', - 'class': 'inappropriate_content', - 'confidence': confidence, - 'bbox': [0, 0, image.shape[1], image.shape[0]], - 'method': 'classification' - }) - except Exception as e: - print(f"⚠️ NSFW classifier error: {e}") - - # Stage 3: Person-specific skin analysis - if self.config['nsfw_detection']['skin_detection']: - for person in persons: - person_detections = self.analyze_person_skin(image, person) - detections.extend(person_detections) - - # Stage 4: Regional skin analysis (if no person-specific detections) - if self.config['nsfw_detection']['region_analysis'] and len(detections) == 0: - skin_detections = self.detect_skin_regions(image) - detections.extend(skin_detections) - - return detections - - except Exception as e: - print(f"❌ NSFW detection error: {e}") - return [] - - def analyze_person_skin(self, image, person): - """Analyze skin exposure for a specific person""" - detections = [] - - try: - x1, y1, x2, y2 = person['bbox'] - person_region = image[y1:y2, x1:x2] - - if person_region.size == 0: - return detections - - # Convert to HSV for skin detection - hsv_person = cv2.cvtColor(person_region, cv2.COLOR_BGR2HSV) - - # Skin color range - lower_skin = np.array([0, 20, 70], dtype=np.uint8) - upper_skin = np.array([20, 255, 255], dtype=np.uint8) - - # Create skin mask - skin_mask = cv2.inRange(hsv_person, lower_skin, upper_skin) - - # Calculate skin percentage - total_person_pixels = person_region.shape[0] * person_region.shape[1] - skin_pixels = cv2.countNonZero(skin_mask) - skin_ratio = skin_pixels / total_person_pixels if total_person_pixels > 0 else 0 - - # Threshold for suspicious skin exposure - if skin_ratio > 0.4: # 40% of person region is skin - confidence = min(skin_ratio * 2, 1.0) - - detections.append({ - 'type': 'nsfw', - 'class': 'excessive_skin_exposure', - 'confidence': confidence, - 'bbox': [x1, y1, x2, y2], - 'method': 'person_skin_analysis', - 'skin_ratio': skin_ratio - }) - - print(f"🚨 Excessive skin exposure detected: {skin_ratio:.2f} ratio") - - return detections - - except Exception as e: - print(f"❌ Person skin analysis error: {e}") - return [] - - def detect_skin_regions(self, image): - """Detect large skin-colored regions""" - try: - hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) - - # Define skin color range - lower_skin = np.array([0, 20, 70], dtype=np.uint8) - upper_skin = np.array([20, 255, 255], dtype=np.uint8) - - # Create skin mask - skin_mask = cv2.inRange(hsv, lower_skin, upper_skin) - - # Apply morphological operations - kernel = np.ones((3, 3), np.uint8) - skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_OPEN, kernel) - skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_CLOSE, kernel) - - # Find contours - contours, _ = cv2.findContours(skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - detections = [] - image_area = image.shape[0] * image.shape[1] - - for contour in contours: - area = cv2.contourArea(contour) - - # If skin region is too large - if area > image_area * 0.3: - x, y, w, h = cv2.boundingRect(contour) - confidence = min(area / image_area, 1.0) - - detections.append({ - 'type': 'nsfw', - 'class': 'large_skin_region', - 'confidence': confidence, - 'bbox': [x, y, x + w, y + h], - 'method': 'skin_detection' - }) - - return detections - - except Exception as e: - print(f"❌ Skin detection error: {e}") - return [] - - def setup_nsfw_detector(self): - """Setup NSFW detection components (Optimized for CPU)""" - try: - print("🔞 Loading NSFW detection components...") - - # 1. NSFW Classifier (Optimized for CPU) - try: - device_id = 0 if self.device == 'cuda' else -1 - self.nsfw_classifier = pipeline( - "image-classification", - model="Falconsai/nsfw_image_detection", - device=device_id, - use_fast=True - ) - print("✅ NSFW classifier loaded") - except Exception as nsfw_error: - print(f"⚠️ NSFW classifier failed: {nsfw_error}") - print(" Trying backup method...") - try: - # Fallback without specifying use_fast - self.nsfw_classifier = pipeline( - "image-classification", - model="Falconsai/nsfw_image_detection", - device=device_id - ) - print("✅ NSFW classifier loaded (fallback)") - except: - print("❌ NSFW classifier completely failed") - self.nsfw_classifier = None - - # 2. Pose Detection (Fixed import with fallbacks) - if self.config['nsfw_detection']['pose_analysis'] and MEDIAPIPE_AVAILABLE: - try: - import mediapipe as mp - try: - mp_pose = mp.solutions.pose - self.pose_detector = mp_pose.Pose( - static_image_mode=True, - model_complexity=0, - min_detection_confidence=0.5 - ) - print("✅ Pose detector loaded (legacy API)") - except AttributeError: - print("⚠️ MediaPipe API not available") - self.pose_detector = None - self.config['nsfw_detection']['pose_analysis'] = False - - except Exception as pose_error: - print(f"⚠️ Pose detection failed: {pose_error}") - self.pose_detector = None - self.config['nsfw_detection']['pose_analysis'] = False - else: - self.pose_detector = None - if not MEDIAPIPE_AVAILABLE: - print("⚠️ MediaPipe not available - pose analysis disabled") - - except Exception as e: - print(f"❌ Error loading NSFW components: {e}") - print("💡 Falling back to skin detection only") - - def process_image(self, image_path): - """Process single image with enhanced detection including fights""" - try: - # Load image - if isinstance(image_path, str): - image = cv2.imread(image_path) - if image is None: - raise ValueError(f"Could not load image: {image_path}") - cache_key = f"file_{image_path}" - else: - image = image_path - cache_key = f"array_{hash(image.tobytes())}" - - # Check cache - import time - current_time = time.time() - if cache_key in self.detection_cache: - cached_result, timestamp = self.detection_cache[cache_key] - if current_time - timestamp < self.cache_ttl: - return cached_result - - print(f"📸 Processing image: {image.shape}") - - # Run detections - all_detections = [] - - # Weapon and fight detection - if self.config['weapon_detection']['enabled']: - weapon_fight_detections = self.detect_weapons(image) - all_detections.extend(weapon_fight_detections) - - weapon_detections = [d for d in weapon_fight_detections if d['type'] == 'weapon'] - fight_detections = [d for d in weapon_fight_detections if d['type'] == 'fight'] - - print(f"🔫 Found {len(weapon_detections)} weapon(s)") - print(f"👊 Found {len(fight_detections)} fight(s)") - - # Show detailed breakdown - if weapon_detections: - knife_detections = [d for d in weapon_detections if d['weapon_type'] == 'blade'] - if knife_detections: - print(f" 🔪 Including {len(knife_detections)} knife/dao detection(s)") - - if fight_detections: - for fight in fight_detections: - fight_type = fight.get('fight_type', 'unknown') - aggression = fight.get('aggression_level', 'unknown') - print(f" 👊 Fight: {fight_type} (aggression: {aggression})") - - # NSFW detection - if self.config['nsfw_detection']['enabled']: - nsfw_detections = self.detect_nsfw_content(image) - all_detections.extend(nsfw_detections) - print(f"🔞 Found {len(nsfw_detections)} NSFW detection(s)") - - # Generate result - result = { - 'timestamp': datetime.now().isoformat(), - 'image_path': image_path if isinstance(image_path, str) else 'array', - 'detections': all_detections, - 'total_threats': len(all_detections), - 'risk_level': self.calculate_risk_level(all_detections), - 'action_required': len(all_detections) > 0, - 'processing_method': 'enhanced_dual_model_with_fight', - 'detection_breakdown': { - 'weapons': len([d for d in all_detections if d['type'] == 'weapon']), - 'fights': len([d for d in all_detections if d['type'] == 'fight']), - 'nsfw': len([d for d in all_detections if d['type'] == 'nsfw']) - } - } - - # Cache result - self.detection_cache[cache_key] = (result, current_time) - - # Clean old cache entries - self.clean_cache(current_time) - - # Save detection history - self.detection_history.append(result) - - # Draw detections - if self.config['output']['draw_boxes'] and all_detections: - annotated_image = self.draw_detections(image.copy(), all_detections) - result['annotated_image'] = annotated_image - - return result - - except Exception as e: - print(f"❌ Error processing image: {e}") - return None - - def clean_cache(self, current_time): - """Clean expired cache entries""" - try: - expired_keys = [] - for key, (_, timestamp) in self.detection_cache.items(): - if current_time - timestamp > self.cache_ttl: - expired_keys.append(key) - - for key in expired_keys: - del self.detection_cache[key] - - except Exception as e: - print(f"⚠️ Cache cleanup error: {e}") - - def get_model_status(self): - """Get status of all models (safe access to config keys).""" - # weapon_detection subtree - wd = self.config.get('weapon_detection', {}) - # fight_detection may be a bool in weapon_detection (legacy) or a dict (detailed). - wd_fd = wd.get('fight_detection', None) - if isinstance(wd_fd, dict): - fight_enabled = wd_fd.get('enabled', True) - else: - fight_enabled = bool(wd_fd) - - # fight analysis flag (either in weapon_detection or top-level fight_detection) - fight_analysis_flag = wd.get('fight_analysis', False) or \ - bool(self.config.get('fight_detection', {}).get('multi_person_analysis', False)) - - status = { - 'fight_detection': fight_enabled, - 'custom_weapon_fight_model': self.weapon_model_custom is not None, - 'general_model': self.weapon_model_general is not None, - 'nsfw_classifier': self.nsfw_classifier is not None, - 'pose_detector': self.pose_detector is not None, - 'device': self.device, - 'cache_size': len(self.detection_cache), - 'knife_enhancement': wd.get('use_enhancement', False), - 'knife_boost': wd.get('boost_knife_detection', False), - 'fight_analysis': fight_analysis_flag - } - - if self.weapon_model_custom and hasattr(self.weapon_model_custom, 'names'): - status['custom_classes'] = list(self.weapon_model_custom.names.values()) - - return status - - def calculate_risk_level(self, detections): - """Calculate overall risk level including fights""" - if not detections: - return 'safe' - - max_confidence = max(det['confidence'] for det in detections) - threat_types = set(det['type'] for det in detections) - - # Check for critical combinations - has_weapons = 'weapon' in threat_types - has_fights = 'fight' in threat_types - has_nsfw = 'nsfw' in threat_types - - # Fights + weapons = critical - if has_weapons and has_fights: - return 'critical' - - # High confidence fights are critical - fight_detections = [d for d in detections if d['type'] == 'fight'] - if fight_detections: - max_fight_confidence = max(f['confidence'] for f in fight_detections) - if max_fight_confidence > 0.8: - return 'critical' - elif max_fight_confidence > 0.65: - return 'high' - - # Existing weapon logic - if has_weapons and max_confidence > 0.8: - return 'critical' - elif has_weapons or has_fights or max_confidence > 0.9: - return 'high' - elif max_confidence > 0.7: - return 'medium' - else: - return 'low' - - def draw_detections(self, image, detections): - """Draw detection boxes and labels with enhanced visualization for fights""" - try: - colors = { - 'weapon': (0, 0, 255), # Red - 'fight': (0, 165, 255), # Orange for fights - 'nsfw': (255, 0, 255), # Magenta - } - - # Special colors for weapon types - weapon_colors = { - 'blade': (0, 100, 255), # Orange-red for knives - 'firearm': (0, 0, 255), # Red for guns - 'blunt_weapon': (100, 0, 255) # Purple for blunt weapons - } - - # Special colors for fight types - fight_colors = { - 'physical_combat': (0, 140, 255), # Orange - 'martial_arts': (0, 200, 255), # Light orange - 'wrestling': (0, 165, 255), # Medium orange - 'group_violence': (0, 69, 255), # Dark orange - 'general_fight': (0, 165, 255) # Default orange - } - - for det in detections: - x1, y1, x2, y2 = det['bbox'] - - # Choose color based on type - if det['type'] == 'weapon' and 'weapon_type' in det: - color = weapon_colors.get(det['weapon_type'], colors['weapon']) - elif det['type'] == 'fight' and 'fight_type' in det: - color = fight_colors.get(det['fight_type'], colors['fight']) - else: - color = colors.get(det['type'], (0, 255, 0)) - - # Draw rectangle with thicker line for high-threat detections - thickness = 4 if det.get('threat_level') == 'critical' else 3 if det['type'] in ['weapon', - 'fight'] else 2 - cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) - - # Create detailed label - if det['type'] == 'weapon': - label = f"{det['class']} ({det['confidence']:.2f})" - if 'threat_level' in det: - label += f" [{det['threat_level']}]" - elif det['type'] == 'fight': - label = f"FIGHT: {det['class']} ({det['confidence']:.2f})" - if 'threat_level' in det: - label += f" [{det['threat_level']}]" - if 'aggression_level' in det: - label += f" {det['aggression_level']}" - else: - label = f"{det['type']}: {det['class']} ({det['confidence']:.2f})" - - # Draw label background - label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0] - cv2.rectangle(image, (x1, y1 - 25), (x1 + label_size[0] + 5, y1), color, -1) - - # Draw label text - cv2.putText(image, label, (x1 + 2, y1 - 7), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) - - # Add additional context for fights - if det['type'] == 'fight': - context_text = [] - if 'people_involved' in det and det['people_involved'] > 0: - context_text.append(f"People: {det['people_involved']}") - if 'context_flags' in det and det['context_flags']: - context_text.append(f"Flags: {', '.join(det['context_flags'])}") - - if context_text: - context_label = " | ".join(context_text) - cv2.putText(image, context_label, (x1, y2 + 15), - cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) - - # Add detection method indicator (small text) - if 'detection_method' in det: - method = det['detection_method'].split('_')[-1] - cv2.putText(image, method, (x1, y2 + 30), - cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) - - return image - - except Exception as e: - print(f"❌ Error drawing detections: {e}") - return image - - def process_video(self, video_path, output_path=None, frame_skip=2): - """Process video file with enhanced detection including fights - optimized frame processing""" - try: - cap = cv2.VideoCapture(video_path) - frame_count = 0 - total_detections = [] - fight_timeline = [] # Track fights over time - recent_detections = [] # Track recent detections for adaptive processing - - if output_path: - fourcc = cv2.VideoWriter_fourcc(*'mp4v') - fps = cap.get(cv2.CAP_PROP_FPS) - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) - - while True: - ret, frame = cap.read() - if not ret: - break - - frame_count += 1 - - # Adaptive frame processing based on recent detections - should_process = False - - # Always process if recent threats detected (within last 10 frames) - if any(det['frame'] > frame_count - 10 for det in recent_detections[-5:]): - should_process = True - # Or process based on reduced skip rate - elif frame_count % max(1, frame_skip) == 0: - should_process = True - - if not should_process: - if output_path: - out.write(frame) - continue - - # Process frame - result = self.process_image(frame) - if result and result['detections']: - # Add frame number to each detection for tracking - for detection in result['detections']: - detection['frame'] = frame_count - - total_detections.extend(result['detections']) - recent_detections.append({'frame': frame_count, 'count': len(result['detections'])}) - - # Track fight timeline - fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] - if fight_detections: - timestamp = frame_count / cap.get(cv2.CAP_PROP_FPS) - fight_timeline.append({ - 'timestamp': timestamp, - 'frame': frame_count, - 'fights': len(fight_detections), - 'max_aggression': max(f.get('aggression_level', 'low') for f in fight_detections) - }) - - # Reduce frame_skip temporarily when fight detected - frame_skip = max(1, frame_skip // 2) - - print(f"⚠️ Frame {frame_count}: {len(result['detections'])} threats detected") - - breakdown = result.get('detection_breakdown', {}) - if breakdown.get('fights', 0) > 0: - print(f" 👊 Fights: {breakdown['fights']}") - - if output_path and 'annotated_image' in result: - out.write(result['annotated_image']) - elif output_path: - out.write(frame) - else: - # No detections - can increase frame_skip for efficiency - if len(recent_detections) > 5 and all(det['count'] == 0 for det in recent_detections[-5:]): - frame_skip = min(5, frame_skip + 1) - - if output_path: - out.write(frame) - - cap.release() - if output_path: - out.release() - - # Analysis of fight patterns - fight_analysis = {} - if fight_timeline: - fight_analysis = { - 'total_fight_incidents': len(fight_timeline), - 'first_fight_time': fight_timeline[0]['timestamp'], - 'last_fight_time': fight_timeline[-1]['timestamp'], - 'peak_aggression_time': max(fight_timeline, key=lambda x: x['max_aggression'])['timestamp'], - 'fight_duration_coverage': fight_timeline[-1]['timestamp'] - fight_timeline[0]['timestamp'] if len( - fight_timeline) > 1 else 0 - } - - return { - 'total_frames_processed': frame_count // frame_skip, - 'total_detections': len(total_detections), - 'detections': total_detections, - 'fight_timeline': fight_timeline, - 'fight_analysis': fight_analysis, - 'detection_breakdown': { - 'weapons': len([d for d in total_detections if d['type'] == 'weapon']), - 'fights': len([d for d in total_detections if d['type'] == 'fight']), - 'nsfw': len([d for d in total_detections if d['type'] == 'nsfw']) - } - } - - except Exception as e: - print(f"❌ Error processing video: {e}") - return None - - def save_report(self, filename="detection_report.json"): - """Save detection history to file""" - try: - with open(filename, 'w') as f: - json.dump(self.detection_history, f, indent=2, default=str) - print(f"📊 Report saved to {filename}") - except Exception as e: - print(f"❌ Error saving report: {e}") - - def get_memory_usage(self): - """Get current GPU memory usage""" - if torch.cuda.is_available(): - allocated = torch.cuda.memory_allocated() / 1024 ** 3 - cached = torch.cuda.memory_reserved() / 1024 ** 3 - return f"GPU Memory: {allocated:.2f}GB allocated, {cached:.2f}GB cached" - return "CPU mode" - - -def main(): - """Enhanced example usage with knife and fight detection improvements""" - - # Initialize the system - moderator = ContentModerator() - - # Show enhanced system information - print("\n" + "=" * 60) - print("🎯 ENHANCED DUAL MODEL SYSTEM WITH FIGHT DETECTION") - print("=" * 60) - - status = moderator.get_model_status() - - if status['custom_weapon_fight_model']: - print("✅ Custom YOLO11 Model (dao + súng + fight): LOADED") - if 'custom_classes' in status: - print(f"📊 Custom classes: {status['custom_classes']}") - else: - print("❌ Custom weapon+fight model: NOT FOUND") - - if status['general_model']: - print("✅ General YOLO11n Model (person detection): LOADED") - else: - print("❌ General model: FAILED") - - if status['nsfw_classifier']: - print("✅ NSFW Classifier: LOADED") - else: - print("❌ NSFW Classifier: FAILED") - - print(f"🖥️ Device: {status['device']}") - print(f"🗄️ Cache system: ENABLED") - print(f"🔪 Knife enhancement: {'ENABLED' if status['knife_enhancement'] else 'DISABLED'}") - print(f"📈 Knife confidence boost: {'ENABLED' if status['knife_boost'] else 'DISABLED'}") - print(f"👊 Fight detection: {'ENABLED' if status['fight_detection'] else 'DISABLED'}") - print(f"🧠 Fight analysis: {'ENABLED' if status['fight_analysis'] else 'DISABLED'}") - - # Enhanced features info - print("\n" + "=" * 60) - print("✨ ENHANCED DETECTION FEATURES") - print("=" * 60) - print("🔧 Image Enhancement:") - print(" - Contrast & brightness optimization") - print(" - Edge sharpening for metallic objects") - print(" - CLAHE for local contrast") - print("📊 Confidence Boosting:") - print(" - Geometric analysis (knives)") - print(" - Motion blur analysis (fights)") - print(" - Edge strength analysis") - print("🎯 Multi-pass Detection:") - print(" - Low threshold pass for knives (0.45)") - print(" - Normal threshold for guns (0.45)") - print(" - Low threshold for fights (0.40)") - print("👊 Fight Analysis:") - print(" - Multi-person fight detection") - print(" - Aggression level assessment") - print(" - Context-aware threat escalation") - - # Example 1: Process single image - print("\n" + "=" * 50) - print("🖼️ SINGLE IMAGE PROCESSING") - print("=" * 50) - - test_image = "test_image.jpg" - - if os.path.exists(test_image): - result = moderator.process_image(test_image) - if result: - print(f"\n📊 DETECTION RESULTS:") - print(f"Risk Level: {result['risk_level']}") - print(f"Total Threats: {result['total_threats']}") - print(f"Processing Method: {result.get('processing_method', 'standard')}") - - breakdown = result.get('detection_breakdown', {}) - if breakdown: - print(f"\n📈 BREAKDOWN:") - print(f" Weapons: {breakdown.get('weapons', 0)}") - print(f" Fights: {breakdown.get('fights', 0)}") - print(f" NSFW: {breakdown.get('nsfw', 0)}") - - # Show weapon-specific results - weapon_detections = [d for d in result['detections'] if d['type'] == 'weapon'] - if weapon_detections: - print(f"\n🔫 WEAPON DETECTIONS: {len(weapon_detections)}") - for i, detection in enumerate(weapon_detections): - method = detection.get('detection_method', 'unknown') - print(f" Weapon {i + 1} ({method}):") - print(f" Class: {detection['class']}") - print(f" Type: {detection['weapon_type']}") - print(f" Confidence: {detection['confidence']:.3f}") - print(f" Threat Level: {detection['threat_level']}") - - # Show fight-specific results - fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] - if fight_detections: - print(f"\n👊 FIGHT DETECTIONS: {len(fight_detections)}") - for i, detection in enumerate(fight_detections): - method = detection.get('detection_method', 'unknown') - print(f" Fight {i + 1} ({method}):") - print(f" Class: {detection['class']}") - print(f" Type: {detection.get('fight_type', 'unknown')}") - print(f" Confidence: {detection['confidence']:.3f}") - print(f" Threat Level: {detection['threat_level']}") - print(f" Aggression: {detection.get('aggression_level', 'unknown')}") - if 'people_involved' in detection: - print(f" People Involved: {detection['people_involved']}") - if 'context_flags' in detection and detection['context_flags']: - print(f" Context: {', '.join(detection['context_flags'])}") - - # Show NSFW results - nsfw_detections = [d for d in result['detections'] if d['type'] == 'nsfw'] - if nsfw_detections: - print(f"\n🔞 NSFW DETECTIONS: {len(nsfw_detections)}") - for i, detection in enumerate(nsfw_detections): - method = detection.get('method', 'unknown') - print(f" NSFW {i + 1} ({method}):") - print(f" Class: {detection['class']}") - print(f" Confidence: {detection['confidence']:.3f}") - if 'skin_ratio' in detection: - print(f" Skin Ratio: {detection['skin_ratio']:.2f}") - else: - print(f"⚠️ Test image not found: {test_image}") - print("Creating a test pattern to demonstrate detection...") - - # Create a synthetic test image - test_img = np.ones((640, 640, 3), dtype=np.uint8) * 128 - cv2.putText(test_img, "Test Pattern", (200, 320), - cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3) - - result = moderator.process_image(test_img) - print("✅ Test pattern processed successfully") - - # Example 2: Enhanced webcam processing with fight detection - print("\n" + "=" * 60) - print("📹 ENHANCED WEBCAM PROCESSING WITH FIGHT DETECTION") - print("=" * 60) - print("Starting enhanced detection on webcam...") - print("🎮 Controls:") - print(" - Press 'q' to quit") - print(" - Press 's' to save frame") - print(" - Press 'i' to show model info") - print(" - Press 'e' to toggle enhancement") - print(" - Press 'b' to toggle knife confidence boost") - print(" - Press 'f' to toggle fight analysis") - print(" - Press 'h' for help") - - try: - cap = cv2.VideoCapture(0) - - if not cap.isOpened(): - print("❌ Cannot open webcam. Check if camera is connected.") - else: - print("✅ Enhanced webcam processing started") - - frame_count = 0 - detection_stats = { - 'weapons': 0, - 'knives': 0, - 'guns': 0, - 'fights': 0, - 'nsfw': 0, - 'total_frames': 0, - 'fight_incidents': 0 - } - - # Adaptive processing variables - process_interval = 2 # Start with every 2nd frame - last_detection_frame = 0 - consecutive_safe_frames = 0 - - while True: - ret, frame = cap.read() - if not ret: - print("❌ Cannot read from webcam") - break - - frame_count += 1 - detection_stats['total_frames'] = frame_count - frame = cv2.flip(frame, 1) - - # Add status overlay - y_offset = frame.shape[0] - 120 - cv2.putText(frame, - f"Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}", - (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) - - cv2.putText(frame, - f"Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}", - (10, y_offset + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) - - cv2.putText(frame, - f"Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}", - (10, y_offset + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) - - model_info = "Models: Custom+General" if moderator.weapon_model_custom else "General Only" - cv2.putText(frame, model_info, (10, y_offset + 60), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) - - # Adaptive frame processing - process more frequently when threats detected - should_process = False - - # Always process if recent threats (within last 30 frames) - if frame_count - last_detection_frame <= 30: - should_process = (frame_count % 1 == 0) # Process every frame - cv2.putText(frame, "HIGH ALERT MODE", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) - # Normal processing with reduced interval - elif frame_count % process_interval == 0: - should_process = True - - if should_process: - result = moderator.process_image(frame) - - if result and result['action_required']: - last_detection_frame = frame_count # Update last detection frame - consecutive_safe_frames = 0 - process_interval = 1 # Process every frame when threats detected - - # Count detections by type - for detection in result['detections']: - if detection['type'] == 'weapon': - detection_stats['weapons'] += 1 - if detection['weapon_type'] == 'blade': - detection_stats['knives'] += 1 - elif detection['weapon_type'] == 'firearm': - detection_stats['guns'] += 1 - elif detection['type'] == 'fight': - detection_stats['fights'] += 1 - if detection.get('aggression_level') in ['high', 'extreme']: - detection_stats['fight_incidents'] += 1 - elif detection['type'] == 'nsfw': - detection_stats['nsfw'] += 1 - - print( - f"⚠️ Frame {frame_count}: {result['risk_level']} risk - {result['total_threats']} threats!") - - # Show specific detections with fight info - for detection in result['detections']: - if detection['type'] == 'weapon': - icon = "🔪" if detection['weapon_type'] == 'blade' else "🔫" - method = detection.get('detection_method', 'unknown').split('_')[-1] - print(f" {icon} {detection['class']} ({detection['confidence']:.3f}) [{method}]") - elif detection['type'] == 'fight': - fight_type = detection.get('fight_type', 'general') - aggression = detection.get('aggression_level', 'unknown') - people = detection.get('people_involved', 0) - method = detection.get('detection_method', 'unknown').split('_')[-1] - print(f" 👊 FIGHT: {fight_type} ({detection['confidence']:.3f}) [{method}]") - print(f" Aggression: {aggression}, People: {people}") - - # Use annotated frame - if 'annotated_image' in result: - cv2.imshow('Enhanced Detection System (Weapons + Fights)', result['annotated_image']) - else: - # Add threat counter - breakdown = result.get('detection_breakdown', {}) - threat_text = f"THREATS: W:{breakdown.get('weapons', 0)} F:{breakdown.get('fights', 0)} N:{breakdown.get('nsfw', 0)}" - cv2.putText(frame, threat_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) - cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) - else: - consecutive_safe_frames += 1 - # Gradually increase processing interval when safe (up to max 3) - if consecutive_safe_frames > 30: - process_interval = min(3, process_interval + 1) - consecutive_safe_frames = 0 - - cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) - cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) - cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) - else: - cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) - cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) - cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) - - # Handle key presses - key = cv2.waitKey(1) & 0xFF - if key == ord('q'): - print("🛑 Webcam stopped by user") - break - elif key == ord('s'): - filename = f"enhanced_detection_{frame_count}.jpg" - cv2.imwrite(filename, frame) - print(f"💾 Frame saved as {filename}") - elif key == ord('i'): - print(f"\n📊 Model Status:") - current_status = moderator.get_model_status() - for k, v in current_status.items(): - print(f" {k}: {v}") - elif key == ord('e'): - # Toggle enhancement - moderator.config['weapon_detection']['use_enhancement'] = \ - not moderator.config['weapon_detection']['use_enhancement'] - print( - f"🔧 Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}") - elif key == ord('b'): - # Toggle knife boost - moderator.config['weapon_detection']['boost_knife_detection'] = \ - not moderator.config['weapon_detection']['boost_knife_detection'] - print( - f"📈 Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}") - elif key == ord('f'): - # Toggle fight analysis - moderator.config['weapon_detection']['fight_analysis'] = \ - not moderator.config['weapon_detection']['fight_analysis'] - print( - f"👊 Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}") - elif key == ord('h'): - print("\n🎮 Controls:") - print(" 'q': quit") - print(" 's': save frame") - print(" 'i': model info") - print(" 'e': toggle enhancement") - print(" 'b': toggle knife confidence boost") - print(" 'f': toggle fight analysis") - print(" 'h': help") - - # Show comprehensive session statistics - print(f"\n📈 Session Statistics:") - print(f" Total frames: {detection_stats['total_frames']}") - print(f" Total weapon detections: {detection_stats['weapons']}") - print(f" - Knives/Dao: {detection_stats['knives']}") - print(f" - Guns: {detection_stats['guns']}") - print(f" Total fight detections: {detection_stats['fights']}") - print(f" - High-aggression incidents: {detection_stats['fight_incidents']}") - print(f" NSFW detections: {detection_stats['nsfw']}") - - if detection_stats['total_frames'] > 0: - total_detections = detection_stats['weapons'] + detection_stats['fights'] + detection_stats['nsfw'] - detection_rate = (total_detections / detection_stats['total_frames'] * 100) - print(f" Overall detection rate: {detection_rate:.1f}%") - - if detection_stats['weapons'] > 0: - knife_ratio = detection_stats['knives'] / detection_stats['weapons'] * 100 - print(f" Knife detection ratio: {knife_ratio:.1f}% of weapons") - - if detection_stats['fights'] > 0: - incident_ratio = detection_stats['fight_incidents'] / detection_stats['fights'] * 100 - print(f" High-aggression fight ratio: {incident_ratio:.1f}% of fights") - - cap.release() - cv2.destroyAllWindows() - print("✅ Enhanced webcam session completed") - - except Exception as e: - print(f"❌ Webcam error: {e}") - - # Show final system status - print(f"\n💾 {moderator.get_memory_usage()}") - print(f"🗄️ Final cache size: {len(moderator.detection_cache)} entries") - - # Save enhanced report - moderator.save_report("enhanced_detection_with_fights_report.json") - - print("\n✅ Enhanced Content Moderation System with Fight Detection completed!") - print("💡 New fight detection capabilities:") - print(" - Behavioral fight pattern recognition") - print(" - Multi-person fight analysis") - print(" - Aggression level assessment") - print(" - Context-aware threat escalation") - print(" - Fight timeline tracking for videos") - print("💡 Enhanced weapon detection:") - print(" - Image enhancement preprocessing") - print(" - Dynamic confidence thresholds") - print(" - Geometric feature analysis") - print(" - Multi-pass detection strategy") - - -if __name__ == "__main__": +import cv2 +import numpy as np +import torch +import os +import json +import warnings + +warnings.filterwarnings('ignore') + +# Import required libraries +try: + from ultralytics import YOLO + from transformers import pipeline + from PIL import Image, ImageDraw, ImageFont + import requests + from datetime import datetime + + # MediaPipe import with fallback + try: + import mediapipe as mp + + MEDIAPIPE_AVAILABLE = True + print("✅ MediaPipe imported successfully") + except ImportError: + MEDIAPIPE_AVAILABLE = False + print("⚠️ MediaPipe not available - pose detection disabled") + except Exception as e: + MEDIAPIPE_AVAILABLE = False + print(f"⚠️ MediaPipe import error: {e} - pose detection disabled") + +except ImportError as e: + print(f"Missing dependency: {e}") + print("Please install: pip install ultralytics transformers pillow requests") + print("For MediaPipe: pip install mediapipe==0.10.18") + + +class ContentModerator: + def __init__(self, config=None): + self.config = config or self.get_default_config() + self.device = 'cuda' if torch.cuda.is_available() else 'cpu' + + # CPU optimizations + if self.device == 'cpu': + print("💻 CPU mode detected - applying optimizations...") + torch.set_num_threads(4) + self.config['performance']['half_precision'] = False + self.config['nsfw_detection']['pose_analysis'] = False + + # Initialize models + self.weapon_model = None # Primary weapon model + self.weapon_model_custom = None # Custom model for dao + súng + fight + self.weapon_model_general = None # General model for person + backup weapons + self.nsfw_classifier = None + self.pose_detector = None + + # Performance optimization + self.detection_cache = {} + self.cache_ttl = 2 # Cache for 2 seconds + + # Results storage + self.detection_history = [] + + print(f"🚀 Content Moderator initialized on {self.device}") + if self.device == 'cpu': + print("⚡ CPU optimizations enabled") + + self.setup_models() + + def get_default_config(self): + """Default configuration optimized for CPU/GPU with enhanced knife and fight detection""" + # Auto-detect optimal settings + is_cuda = torch.cuda.is_available() + + return { + 'weapon_detection': { + 'enabled': True, + 'confidence_threshold': 0.45, # For guns + 'knife_confidence': 0.45, # Lower threshold for knives + 'fight_confidence': 0.40, # Lower threshold for fights (behavioral) + 'model_size': 'yolo11n', + 'classes': ['knife', 'gun', 'rifle', 'pistol', 'weapon', 'fight'], + 'use_enhancement': True, # Enable image enhancement for knives + 'multi_pass': True, # Enable multi-pass detection + 'boost_knife_detection': True, # Enable knife confidence boosting + 'fight_detection': True, # Enable fight-specific detection + 'fight_analysis': True # Enable advanced fight behavior analysis + }, + 'fight_detection': { + 'enabled': True, + 'confidence_threshold': 0.40, + 'pose_analysis': True, # Analyze poses for fighting + 'motion_analysis': False, # Motion-based fight detection (for video) + 'aggression_keywords': ['fight', 'violence', 'aggression', 'combat'], + 'threat_escalation': True, # Escalate threat level for fights + 'multi_person_analysis': True # Analyze interactions between people + }, + 'nsfw_detection': { + 'enabled': True, + 'confidence_threshold': 0.7, + 'skin_detection': True, + 'pose_analysis': False, + 'region_analysis': True + }, + 'performance': { + 'image_size': 416 if is_cuda else 320, + 'batch_size': 1, + 'half_precision': is_cuda, + 'use_flash_attention': False, + 'cpu_optimization': not is_cuda + }, + 'output': { + 'save_detections': True, + 'draw_boxes': True, + 'log_results': True + } + } + + def setup_models(self): + """Initialize all detection models""" + try: + # Clear GPU cache + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + # 1. Setup Weapon Detection (now includes fight detection) + if self.config['weapon_detection']['enabled']: + self.setup_weapon_detector() + + # 2. Setup NSFW Detection + if self.config['nsfw_detection']['enabled']: + self.setup_nsfw_detector() + + print("✅ All models loaded successfully!") + + except Exception as e: + print(f"❌ Error setting up models: {e}") + + def setup_weapon_detector(self): + """Setup dual model system: Custom for weapons + fights + General for person detection""" + try: + print("🔫 Loading weapon and fight detection models...") + + # Model 1: Custom YOLO11 for weapons (dao + súng + fight) + custom_model_path = "models/best.pt" + project_root = os.path.dirname(os.path.abspath(__file__)) + full_model_path = os.path.join(project_root, custom_model_path) + + if os.path.exists(full_model_path): + print(f"✅ Loading custom weapon+fight model: {full_model_path}") + self.weapon_model_custom = YOLO(full_model_path) + print("🎯 Custom weapon+fight model (dao + súng + fight) loaded!") + + # Show custom model classes + if hasattr(self.weapon_model_custom, 'names'): + classes = list(self.weapon_model_custom.names.values()) + print(f"📊 Custom classes: {classes}") + + # Check if fight class is available + if any('fight' in str(cls).lower() for cls in classes): + print("👊 Fight detection enabled in custom model") + else: + print("⚠️ Fight class not found in custom model") + else: + print("⚠️ Custom weapon+fight model not found") + self.weapon_model_custom = None + + # Model 2: General YOLO11n for person detection and fight fallback + print("👤 Loading general model for person detection...") + self.weapon_model_general = YOLO('yolo11n.pt') + print("✅ General YOLO11n loaded for person detection") + + # Set primary weapon model + self.weapon_model = self.weapon_model_custom if self.weapon_model_custom else self.weapon_model_general + + # Optimize models for performance + if self.device == 'cuda' and self.config['performance']['half_precision']: + try: + if self.weapon_model_custom: + self.weapon_model_custom.model.half() + self.weapon_model_general.model.half() + print("✅ Half precision enabled for both models") + except: + print("⚠️ Half precision not supported") + + print("🔥 Dual model system ready with fight detection!") + + except Exception as e: + print(f"❌ Error loading weapon+fight models: {e}") + self.weapon_model = None + self.weapon_model_custom = None + self.weapon_model_general = None + + def detect_weapons(self, image): + """Enhanced dual model weapon and fight detection""" + detections = [] + + try: + imgsz = self.config['performance']['image_size'] + use_half = self.config['performance']['half_precision'] and self.device == 'cuda' + + # Prepare multiple versions of the image + images_to_process = [(image, 1.0, "original")] + + if self.config['weapon_detection']['use_enhancement']: + enhanced_image = self.enhance_knife_detection(image) + images_to_process.append((enhanced_image, 1.15, "enhanced")) + + # Process each image version + for img, weight_multiplier, img_type in images_to_process: + if self.weapon_model_custom: + # Use different confidence thresholds for different detection types + knife_conf = self.config['weapon_detection']['knife_confidence'] + gun_conf = self.config['weapon_detection']['confidence_threshold'] + fight_conf = self.config['weapon_detection']['fight_confidence'] + + # Multi-pass detection with different thresholds + passes = [ + (knife_conf, "knife_pass"), # Low threshold for knives + (gun_conf, "gun_pass"), # Normal threshold for guns + (fight_conf, "fight_pass") # Low threshold for fights + ] if self.config['weapon_detection']['multi_pass'] else [ + (min(knife_conf, fight_conf), "single_pass")] + + for conf_threshold, pass_type in passes: + try: + results = self.weapon_model_custom( + img, + imgsz=imgsz, + conf=conf_threshold, + device=self.device, + half=use_half, + verbose=False, + augment=True # Enable test-time augmentation + ) + + for result in results: + boxes = result.boxes + if boxes is not None: + for box in boxes: + class_id = int(box.cls[0]) + + if hasattr(result, 'names') and class_id in result.names: + class_name = result.names[class_id].lower() + else: + class_name = f"detection_{class_id}" + + x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() + confidence = float(box.conf[0]) * weight_multiplier + + # Determine detection type and apply appropriate processing + if self.is_fight_detection(class_name): + # Fight detection processing + confidence = self.boost_fight_confidence( + img, [x1, y1, x2, y2], confidence, class_name + ) + + detection_type = 'fight' + min_conf = fight_conf + threat_level = self.assess_fight_threat(confidence, img, [x1, y1, x2, y2]) + + else: + # Weapon detection processing + if self.config['weapon_detection']['boost_knife_detection']: + if 'dao' in class_name or 'knife' in class_name or 'blade' in class_name: + confidence = self.boost_knife_confidence( + img, [x1, y1, x2, y2], confidence, class_name + ) + + detection_type = 'weapon' + weapon_type = self.classify_weapon_type(class_name) + min_conf = knife_conf if weapon_type == 'blade' else gun_conf + threat_level = self.assess_weapon_threat(weapon_type, confidence) + + if confidence >= min_conf: + detection_data = { + 'type': detection_type, + 'class': class_name, + 'confidence': min(confidence, 0.99), + 'bbox': [int(x1), int(y1), int(x2), int(y2)], + 'threat_level': threat_level, + 'detection_method': f'custom_model_{img_type}_{pass_type}' + } + + # Add type-specific fields + if detection_type == 'weapon': + detection_data['weapon_type'] = weapon_type + elif detection_type == 'fight': + detection_data['fight_type'] = self.classify_fight_type(class_name) + detection_data['aggression_level'] = self.assess_aggression_level( + confidence) + + detections.append(detection_data) + + icon = "👊" if detection_type == 'fight' else "🎯" + print( + f" {icon} Detected: {class_name} (conf: {confidence:.3f}, method: {img_type}_{pass_type})") + + except Exception as e: + print(f"⚠️ Detection pass error ({pass_type}): {e}") + + # Fallback: General model for backup detection (only if no custom detections) + if self.weapon_model_general and len(detections) == 0: + detections.extend(self.fallback_detection(image, imgsz, use_half)) + + # Remove duplicate detections + detections = self.remove_duplicate_detections(detections) + + # Additional fight analysis if enabled + if self.config['fight_detection']['enabled'] and self.config['fight_detection']['multi_person_analysis']: + fight_detections = [d for d in detections if d['type'] == 'fight'] + if fight_detections: + enhanced_fights = self.analyze_fight_context(image, fight_detections) + # Replace original fight detections with enhanced ones + detections = [d for d in detections if d['type'] != 'fight'] + enhanced_fights + + return detections + + except Exception as e: + print(f"❌ Weapon and fight detection error: {e}") + return [] + + def is_fight_detection(self, class_name): + """Check if detection is fight-related""" + fight_keywords = ['fight', 'fighting', 'combat', 'violence', 'aggression', 'brawl', 'scuffle'] + return any(keyword in class_name.lower() for keyword in fight_keywords) + + def classify_fight_type(self, class_name): + """Classify type of fight detected""" + class_name = class_name.lower() + + if any(word in class_name for word in ['punch', 'boxing', 'fist']): + return 'physical_combat' + elif any(word in class_name for word in ['kick', 'martial', 'karate']): + return 'martial_arts' + elif any(word in class_name for word in ['wrestle', 'grapple']): + return 'wrestling' + elif any(word in class_name for word in ['group', 'mob', 'crowd']): + return 'group_violence' + else: + return 'general_fight' + + def boost_fight_confidence(self, image, bbox, initial_confidence, class_name): + """Boost confidence for fight detection based on contextual analysis""" + try: + x1, y1, x2, y2 = [int(coord) for coord in bbox] + + # Ensure bbox is within image bounds + x1 = max(0, x1) + y1 = max(0, y1) + x2 = min(image.shape[1], x2) + y2 = min(image.shape[0], y2) + + roi = image[y1:y2, x1:x2] + + if roi.size == 0: + return initial_confidence + + boost = 0 + + # 1. Motion blur analysis (indicates rapid movement) + gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) + blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var() + if blur_variance < 100: # Low variance indicates blur/motion + boost += 0.10 + + # 2. Edge density (chaotic scenes have more edges) + edges = cv2.Canny(gray, 50, 150) + edge_density = np.count_nonzero(edges) / edges.size + if edge_density > 0.15: + boost += 0.08 + + # 3. Color analysis (fights often have varied, chaotic colors) + hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) + color_variance = np.var(hsv[:, :, 1]) # Saturation variance + if color_variance > 1000: + boost += 0.05 + + # 4. Texture analysis (complex textures indicate multiple overlapping objects) + gray_f = np.float32(gray) + texture_response = cv2.cornerHarris(gray_f, 2, 3, 0.04) + texture_strength = np.mean(texture_response) + if texture_strength > 0.01: + boost += 0.07 + + # 5. Aspect ratio analysis (fights often have irregular bounding boxes) + height = y2 - y1 + width = x2 - x1 + if height > 0 and width > 0: + aspect_ratio = max(width, height) / min(width, height) + if 1.2 < aspect_ratio < 3.0: # Moderate irregularity + boost += 0.05 + + final_confidence = min(initial_confidence + boost, 0.95) + + if boost > 0: + print(f" 👊 Fight boost applied: +{boost:.2f} (blur:{blur_variance:.0f}, edge:{edge_density:.2f})") + + return final_confidence + + except Exception as e: + print(f"⚠️ Fight confidence boost error: {e}") + return initial_confidence + + def assess_fight_threat(self, confidence, image, bbox): + """Assess threat level of detected fight""" + base_threat = 'medium' # Fights start at medium threat + + # Escalate based on confidence + if confidence >= 0.85: + base_threat = 'critical' + elif confidence >= 0.70: + base_threat = 'high' + elif confidence >= 0.50: + base_threat = 'medium' + else: + base_threat = 'low' + + # Additional context-based escalation + try: + x1, y1, x2, y2 = bbox + fight_area = (x2 - x1) * (y2 - y1) + image_area = image.shape[0] * image.shape[1] + area_ratio = fight_area / image_area + + # Large fights are more dangerous + if area_ratio > 0.5: # Fight covers >50% of image + if base_threat == 'medium': + base_threat = 'high' + elif base_threat == 'high': + base_threat = 'critical' + + except Exception as e: + print(f"⚠️ Fight threat assessment error: {e}") + + return base_threat + + def assess_aggression_level(self, confidence): + """Assess aggression level based on confidence""" + if confidence >= 0.80: + return 'extreme' + elif confidence >= 0.65: + return 'high' + elif confidence >= 0.45: + return 'moderate' + else: + return 'low' + + def analyze_fight_context(self, image, fight_detections): + """Enhanced analysis of fight context with multi-person detection""" + enhanced_fights = [] + + try: + # Detect all persons in the image + persons = self.detect_persons(image) + + for fight in fight_detections: + enhanced_fight = fight.copy() + + # Count people involved in or near the fight + fight_bbox = fight['bbox'] + people_in_fight = 0 + people_nearby = 0 + + for person in persons: + person_bbox = person['bbox'] + + # Calculate overlap with fight area + overlap = self.calculate_bbox_overlap(fight_bbox, person_bbox) + + if overlap > 0.3: # Person is directly involved + people_in_fight += 1 + elif overlap > 0.1: # Person is nearby + people_nearby += 1 + + # Update fight information based on context + enhanced_fight['people_involved'] = people_in_fight + enhanced_fight['people_nearby'] = people_nearby + enhanced_fight['total_people'] = people_in_fight + people_nearby + + # Escalate threat based on number of people + if people_in_fight >= 3: + if enhanced_fight['threat_level'] == 'medium': + enhanced_fight['threat_level'] = 'high' + elif enhanced_fight['threat_level'] == 'high': + enhanced_fight['threat_level'] = 'critical' + enhanced_fight['fight_type'] = 'group_violence' + + # Add context flags + enhanced_fight['context_flags'] = [] + if people_in_fight >= 3: + enhanced_fight['context_flags'].append('multi_person_fight') + if people_nearby >= 2: + enhanced_fight['context_flags'].append('crowd_present') + + enhanced_fights.append(enhanced_fight) + + print(f" 👥 Fight context: {people_in_fight} involved, {people_nearby} nearby") + + except Exception as e: + print(f"⚠️ Fight context analysis error: {e}") + return fight_detections + + return enhanced_fights + + def calculate_bbox_overlap(self, bbox1, bbox2): + """Calculate overlap ratio between two bounding boxes""" + x1_min, y1_min, x1_max, y1_max = bbox1 + x2_min, y2_min, x2_max, y2_max = bbox2 + + # Calculate intersection + intersect_xmin = max(x1_min, x2_min) + intersect_ymin = max(y1_min, y2_min) + intersect_xmax = min(x1_max, x2_max) + intersect_ymax = min(y1_max, y2_max) + + if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: + return 0.0 + + intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) + bbox1_area = (x1_max - x1_min) * (y1_max - y1_min) + + return intersect_area / bbox1_area if bbox1_area > 0 else 0 + + def fallback_detection(self, image, imgsz, use_half): + """Fallback detection using general model""" + detections = [] + + try: + general_results = self.weapon_model_general( + image, + imgsz=imgsz, + conf=0.4, + device=self.device, + half=use_half, + verbose=False + ) + + for result in general_results: + boxes = result.boxes + if boxes is not None: + for box in boxes: + class_id = int(box.cls[0]) + class_name = result.names[class_id].lower() + + # Filter for weapon-like objects + weapon_keywords = ['knife', 'scissors', 'fork'] + + if any(keyword in class_name for keyword in weapon_keywords): + x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() + confidence = float(box.conf[0]) + + detections.append({ + 'type': 'weapon', + 'class': class_name, + 'weapon_type': 'blade', + 'confidence': confidence, + 'bbox': [int(x1), int(y1), int(x2), int(y2)], + 'threat_level': self.assess_weapon_threat('blade', confidence), + 'detection_method': 'general_model_fallback' + }) + + except Exception as e: + print(f"⚠️ General detection error: {e}") + + return detections + + # ... (rest of the existing methods remain the same) ... + + def enhance_knife_detection(self, image): + """Enhance image specifically for better knife/dao detection""" + try: + # 1. Increase contrast and brightness for metallic objects + enhanced = cv2.convertScaleAbs(image, alpha=1.4, beta=25) + + # 2. Apply sharpening kernel to highlight edges + kernel_sharpen = np.array([[-1, -1, -1], + [-1, 9, -1], + [-1, -1, -1]]) + sharpened = cv2.filter2D(enhanced, -1, kernel_sharpen) + + # 3. Apply CLAHE for better local contrast + lab = cv2.cvtColor(sharpened, cv2.COLOR_BGR2LAB) + l, a, b = cv2.split(lab) + clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) + l = clahe.apply(l) + enhanced_final = cv2.merge([l, a, b]) + enhanced_final = cv2.cvtColor(enhanced_final, cv2.COLOR_LAB2BGR) + + return enhanced_final + except Exception as e: + print(f"⚠️ Enhancement failed: {e}") + return image + + def boost_knife_confidence(self, image, bbox, initial_confidence, class_name): + """Boost confidence for knife/dao based on geometric and visual features""" + try: + x1, y1, x2, y2 = [int(coord) for coord in bbox] + + # Ensure bbox is within image bounds + x1 = max(0, x1) + y1 = max(0, y1) + x2 = min(image.shape[1], x2) + y2 = min(image.shape[0], y2) + + roi = image[y1:y2, x1:x2] + + if roi.size == 0: + return initial_confidence + + boost = 0 + + # 1. Check aspect ratio (knives are typically elongated) + height = y2 - y1 + width = x2 - x1 + if height > 0 and width > 0: + aspect_ratio = max(width, height) / min(width, height) + if aspect_ratio > 2.5: # Elongated shape + boost += 0.15 + elif aspect_ratio > 2.0: + boost += 0.10 + + # 2. Check for metallic reflection (brightness) + gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) + mean_brightness = np.mean(gray) + std_brightness = np.std(gray) + + if mean_brightness > 140: # Bright (metallic) + boost += 0.10 + if std_brightness > 50: # High contrast (blade edge) + boost += 0.05 + + # 3. Edge detection (knives have strong edges) + edges = cv2.Canny(gray, 50, 150) + edge_ratio = np.count_nonzero(edges) / edges.size + if edge_ratio > 0.15: # Strong edges + boost += 0.10 + elif edge_ratio > 0.10: + boost += 0.05 + + # 4. Check for blade-like gradient + if height > width: # Vertical orientation + gradient = np.gradient(gray, axis=0) + else: # Horizontal orientation + gradient = np.gradient(gray, axis=1) + + gradient_strength = np.mean(np.abs(gradient)) + if gradient_strength > 10: + boost += 0.05 + + # Apply boost with class-specific multiplier + if 'dao' in class_name.lower() or 'knife' in class_name.lower(): + boost *= 1.2 # Extra boost for knife/dao classes + + final_confidence = min(initial_confidence + boost, 0.95) + + if boost > 0: + print( + f" 🔪 Knife boost applied: +{boost:.2f} (AR:{aspect_ratio:.1f}, Bright:{mean_brightness:.0f}, Edge:{edge_ratio:.2f})") + + return final_confidence + + except Exception as e: + print(f"⚠️ Confidence boost error: {e}") + return initial_confidence + + def detect_persons(self, image): + """Detect persons using general model (needed for NSFW and fight analysis)""" + persons = [] + + if not self.weapon_model_general: + return persons + + try: + imgsz = self.config['performance']['image_size'] + use_half = self.config['performance']['half_precision'] and self.device == 'cuda' + + results = self.weapon_model_general( + image, + imgsz=imgsz, + conf=0.3, + device=self.device, + half=use_half, + verbose=False + ) + + for result in results: + boxes = result.boxes + if boxes is not None: + for box in boxes: + class_id = int(box.cls[0]) + class_name = result.names[class_id].lower() + + if class_name == 'person': + x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() + confidence = float(box.conf[0]) + + persons.append({ + 'class': 'person', + 'confidence': confidence, + 'bbox': [int(x1), int(y1), int(x2), int(y2)] + }) + + return persons + + except Exception as e: + print(f"❌ Person detection error: {e}") + return [] + + def classify_weapon_type(self, class_name): + """Classify weapon type from class name""" + class_name = class_name.lower() + + # Knife/Blade keywords (expanded) + knife_keywords = ['knife', 'dao', 'blade', 'dagger', 'sword', 'machete', 'katana', 'cutter'] + if any(keyword in class_name for keyword in knife_keywords): + return 'blade' + + # Gun/Firearm keywords + gun_keywords = ['gun', 'pistol', 'rifle', 'firearm', 'revolver', 'shotgun', 'súng'] + if any(keyword in class_name for keyword in gun_keywords): + return 'firearm' + + # Other weapons + other_keywords = ['axe', 'hammer', 'club', 'bat'] + if any(keyword in class_name for keyword in other_keywords): + return 'blunt_weapon' + + # Check for numbered weapon classes + if 'weapon' in class_name: + try: + weapon_id = int(class_name.split('_')[-1]) + if weapon_id in [0, 1]: # Assuming 0,1 are firearms + return 'firearm' + elif weapon_id in [2, 3]: # Assuming 2,3 are blades + return 'blade' + else: + return 'unknown_weapon' + except: + pass + + return 'unknown_weapon' + + def assess_weapon_threat(self, weapon_type, confidence): + """Assess threat level of detected weapon""" + threat_levels = { + 'firearm': 'critical', + 'blade': 'high', + 'blunt_weapon': 'medium', + 'unknown_weapon': 'medium' + } + + base_threat = threat_levels.get(weapon_type, 'medium') + + # Adjust based on confidence + if confidence >= 0.9: + if base_threat == 'medium': + return 'high' + elif base_threat == 'high': + return 'critical' + else: + return base_threat + elif confidence >= 0.7: + return base_threat + elif confidence >= 0.5: + if base_threat == 'critical': + return 'high' + elif base_threat == 'high': + return 'medium' + else: + return base_threat + else: + if base_threat == 'critical': + return 'medium' + elif base_threat == 'high': + return 'low' + else: + return 'low' + + def remove_duplicate_detections(self, detections, iou_threshold=0.4): + """Remove duplicate detections using Non-Maximum Suppression""" + if len(detections) <= 1: + return detections + + # Sort by confidence (highest first) + detections = sorted(detections, key=lambda x: x['confidence'], reverse=True) + + keep = [] + for i, det1 in enumerate(detections): + should_keep = True + for det2 in keep: + # Check if same type and overlapping + if det1['type'] == det2['type']: + iou = self.calculate_iou(det1['bbox'], det2['bbox']) + if iou > iou_threshold: + should_keep = False + break + + if should_keep: + keep.append(det1) + + return keep + + def calculate_iou(self, box1, box2): + """Calculate Intersection over Union between two bounding boxes""" + x1_min, y1_min, x1_max, y1_max = box1 + x2_min, y2_min, x2_max, y2_max = box2 + + # Calculate intersection + intersect_xmin = max(x1_min, x2_min) + intersect_ymin = max(y1_min, y2_min) + intersect_xmax = min(x1_max, x2_max) + intersect_ymax = min(y1_max, y2_max) + + if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: + return 0.0 + + intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) + + # Calculate union + box1_area = (x1_max - x1_min) * (y1_max - y1_min) + box2_area = (x2_max - x2_min) * (y2_max - y2_min) + union_area = box1_area + box2_area - intersect_area + + return intersect_area / union_area if union_area > 0 else 0 + + # ... (continue with remaining NSFW detection methods) ... + + def detect_nsfw_content(self, image): + """Enhanced NSFW detection with person detection first""" + detections = [] + + try: + if len(image.shape) == 3 and image.shape[2] == 3: + rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + else: + rgb_image = image + + # Stage 1: Detect persons first (optimization) + persons = self.detect_persons(image) + + if not persons: + # No persons detected, skip detailed NSFW analysis + return detections + + print(f"👤 Found {len(persons)} person(s), analyzing for NSFW content...") + + # Stage 2: Overall NSFW Classification + if self.nsfw_classifier: + try: + pil_image = Image.fromarray(rgb_image) + nsfw_result = self.nsfw_classifier(pil_image) + + if nsfw_result[0]['label'] == 'nsfw': + confidence = nsfw_result[0]['score'] + if confidence > self.config['nsfw_detection']['confidence_threshold']: + detections.append({ + 'type': 'nsfw', + 'class': 'inappropriate_content', + 'confidence': confidence, + 'bbox': [0, 0, image.shape[1], image.shape[0]], + 'method': 'classification' + }) + except Exception as e: + print(f"⚠️ NSFW classifier error: {e}") + + # Stage 3: Person-specific skin analysis + if self.config['nsfw_detection']['skin_detection']: + for person in persons: + person_detections = self.analyze_person_skin(image, person) + detections.extend(person_detections) + + # Stage 4: Regional skin analysis (if no person-specific detections) + if self.config['nsfw_detection']['region_analysis'] and len(detections) == 0: + skin_detections = self.detect_skin_regions(image) + detections.extend(skin_detections) + + return detections + + except Exception as e: + print(f"❌ NSFW detection error: {e}") + return [] + + def analyze_person_skin(self, image, person): + """Analyze skin exposure for a specific person""" + detections = [] + + try: + x1, y1, x2, y2 = person['bbox'] + person_region = image[y1:y2, x1:x2] + + if person_region.size == 0: + return detections + + # Convert to HSV for skin detection + hsv_person = cv2.cvtColor(person_region, cv2.COLOR_BGR2HSV) + + # Skin color range + lower_skin = np.array([0, 20, 70], dtype=np.uint8) + upper_skin = np.array([20, 255, 255], dtype=np.uint8) + + # Create skin mask + skin_mask = cv2.inRange(hsv_person, lower_skin, upper_skin) + + # Calculate skin percentage + total_person_pixels = person_region.shape[0] * person_region.shape[1] + skin_pixels = cv2.countNonZero(skin_mask) + skin_ratio = skin_pixels / total_person_pixels if total_person_pixels > 0 else 0 + + # Threshold for suspicious skin exposure + if skin_ratio > 0.4: # 40% of person region is skin + confidence = min(skin_ratio * 2, 1.0) + + detections.append({ + 'type': 'nsfw', + 'class': 'excessive_skin_exposure', + 'confidence': confidence, + 'bbox': [x1, y1, x2, y2], + 'method': 'person_skin_analysis', + 'skin_ratio': skin_ratio + }) + + print(f"🚨 Excessive skin exposure detected: {skin_ratio:.2f} ratio") + + return detections + + except Exception as e: + print(f"❌ Person skin analysis error: {e}") + return [] + + def detect_skin_regions(self, image): + """Detect large skin-colored regions""" + try: + hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) + + # Define skin color range + lower_skin = np.array([0, 20, 70], dtype=np.uint8) + upper_skin = np.array([20, 255, 255], dtype=np.uint8) + + # Create skin mask + skin_mask = cv2.inRange(hsv, lower_skin, upper_skin) + + # Apply morphological operations + kernel = np.ones((3, 3), np.uint8) + skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_OPEN, kernel) + skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_CLOSE, kernel) + + # Find contours + contours, _ = cv2.findContours(skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + detections = [] + image_area = image.shape[0] * image.shape[1] + + for contour in contours: + area = cv2.contourArea(contour) + + # If skin region is too large + if area > image_area * 0.3: + x, y, w, h = cv2.boundingRect(contour) + confidence = min(area / image_area, 1.0) + + detections.append({ + 'type': 'nsfw', + 'class': 'large_skin_region', + 'confidence': confidence, + 'bbox': [x, y, x + w, y + h], + 'method': 'skin_detection' + }) + + return detections + + except Exception as e: + print(f"❌ Skin detection error: {e}") + return [] + + def setup_nsfw_detector(self): + """Setup NSFW detection components (Optimized for CPU)""" + try: + print("🔞 Loading NSFW detection components...") + + # 1. NSFW Classifier (Optimized for CPU) + try: + device_id = 0 if self.device == 'cuda' else -1 + self.nsfw_classifier = pipeline( + "image-classification", + model="Falconsai/nsfw_image_detection", + device=device_id, + use_fast=True + ) + print("✅ NSFW classifier loaded") + except Exception as nsfw_error: + print(f"⚠️ NSFW classifier failed: {nsfw_error}") + print(" Trying backup method...") + try: + # Fallback without specifying use_fast + self.nsfw_classifier = pipeline( + "image-classification", + model="Falconsai/nsfw_image_detection", + device=device_id + ) + print("✅ NSFW classifier loaded (fallback)") + except: + print("❌ NSFW classifier completely failed") + self.nsfw_classifier = None + + # 2. Pose Detection (Fixed import with fallbacks) + if self.config['nsfw_detection']['pose_analysis'] and MEDIAPIPE_AVAILABLE: + try: + import mediapipe as mp + try: + mp_pose = mp.solutions.pose + self.pose_detector = mp_pose.Pose( + static_image_mode=True, + model_complexity=0, + min_detection_confidence=0.5 + ) + print("✅ Pose detector loaded (legacy API)") + except AttributeError: + print("⚠️ MediaPipe API not available") + self.pose_detector = None + self.config['nsfw_detection']['pose_analysis'] = False + + except Exception as pose_error: + print(f"⚠️ Pose detection failed: {pose_error}") + self.pose_detector = None + self.config['nsfw_detection']['pose_analysis'] = False + else: + self.pose_detector = None + if not MEDIAPIPE_AVAILABLE: + print("⚠️ MediaPipe not available - pose analysis disabled") + + except Exception as e: + print(f"❌ Error loading NSFW components: {e}") + print("💡 Falling back to skin detection only") + + def process_image(self, image_path): + """Process single image with enhanced detection including fights""" + try: + # Load image + if isinstance(image_path, str): + image = cv2.imread(image_path) + if image is None: + raise ValueError(f"Could not load image: {image_path}") + cache_key = f"file_{image_path}" + else: + image = image_path + cache_key = f"array_{hash(image.tobytes())}" + + # Check cache + import time + current_time = time.time() + if cache_key in self.detection_cache: + cached_result, timestamp = self.detection_cache[cache_key] + if current_time - timestamp < self.cache_ttl: + return cached_result + + print(f"📸 Processing image: {image.shape}") + + # Run detections + all_detections = [] + + # Weapon and fight detection + if self.config['weapon_detection']['enabled']: + weapon_fight_detections = self.detect_weapons(image) + all_detections.extend(weapon_fight_detections) + + weapon_detections = [d for d in weapon_fight_detections if d['type'] == 'weapon'] + fight_detections = [d for d in weapon_fight_detections if d['type'] == 'fight'] + + print(f"🔫 Found {len(weapon_detections)} weapon(s)") + print(f"👊 Found {len(fight_detections)} fight(s)") + + # Show detailed breakdown + if weapon_detections: + knife_detections = [d for d in weapon_detections if d['weapon_type'] == 'blade'] + if knife_detections: + print(f" 🔪 Including {len(knife_detections)} knife/dao detection(s)") + + if fight_detections: + for fight in fight_detections: + fight_type = fight.get('fight_type', 'unknown') + aggression = fight.get('aggression_level', 'unknown') + print(f" 👊 Fight: {fight_type} (aggression: {aggression})") + + # NSFW detection + if self.config['nsfw_detection']['enabled']: + nsfw_detections = self.detect_nsfw_content(image) + all_detections.extend(nsfw_detections) + print(f"🔞 Found {len(nsfw_detections)} NSFW detection(s)") + + # Generate result + result = { + 'timestamp': datetime.now().isoformat(), + 'image_path': image_path if isinstance(image_path, str) else 'array', + 'detections': all_detections, + 'total_threats': len(all_detections), + 'risk_level': self.calculate_risk_level(all_detections), + 'action_required': len(all_detections) > 0, + 'processing_method': 'enhanced_dual_model_with_fight', + 'detection_breakdown': { + 'weapons': len([d for d in all_detections if d['type'] == 'weapon']), + 'fights': len([d for d in all_detections if d['type'] == 'fight']), + 'nsfw': len([d for d in all_detections if d['type'] == 'nsfw']) + } + } + + # Cache result + self.detection_cache[cache_key] = (result, current_time) + + # Clean old cache entries + self.clean_cache(current_time) + + # Save detection history + self.detection_history.append(result) + + # Draw detections + if self.config['output']['draw_boxes'] and all_detections: + annotated_image = self.draw_detections(image.copy(), all_detections) + result['annotated_image'] = annotated_image + + return result + + except Exception as e: + print(f"❌ Error processing image: {e}") + return None + + def clean_cache(self, current_time): + """Clean expired cache entries""" + try: + expired_keys = [] + for key, (_, timestamp) in self.detection_cache.items(): + if current_time - timestamp > self.cache_ttl: + expired_keys.append(key) + + for key in expired_keys: + del self.detection_cache[key] + + except Exception as e: + print(f"⚠️ Cache cleanup error: {e}") + + def get_model_status(self): + """Get status of all models""" + status = { + 'custom_weapon_fight_model': self.weapon_model_custom is not None, + 'general_model': self.weapon_model_general is not None, + 'nsfw_classifier': self.nsfw_classifier is not None, + 'pose_detector': self.pose_detector is not None, + 'device': self.device, + 'cache_size': len(self.detection_cache), + 'knife_enhancement': self.config['weapon_detection']['use_enhancement'], + 'knife_boost': self.config['weapon_detection']['boost_knife_detection'], + 'fight_detection': self.config['weapon_detection']['fight_detection'], + 'fight_analysis': self.config['weapon_detection']['fight_analysis'] + } + + if self.weapon_model_custom and hasattr(self.weapon_model_custom, 'names'): + status['custom_classes'] = list(self.weapon_model_custom.names.values()) + + return status + + def calculate_risk_level(self, detections): + """Calculate overall risk level including fights""" + if not detections: + return 'safe' + + max_confidence = max(det['confidence'] for det in detections) + threat_types = set(det['type'] for det in detections) + + # Check for critical combinations + has_weapons = 'weapon' in threat_types + has_fights = 'fight' in threat_types + has_nsfw = 'nsfw' in threat_types + + # Fights + weapons = critical + if has_weapons and has_fights: + return 'critical' + + # High confidence fights are critical + fight_detections = [d for d in detections if d['type'] == 'fight'] + if fight_detections: + max_fight_confidence = max(f['confidence'] for f in fight_detections) + if max_fight_confidence > 0.8: + return 'critical' + elif max_fight_confidence > 0.65: + return 'high' + + # Existing weapon logic + if has_weapons and max_confidence > 0.8: + return 'critical' + elif has_weapons or has_fights or max_confidence > 0.9: + return 'high' + elif max_confidence > 0.7: + return 'medium' + else: + return 'low' + + def draw_detections(self, image, detections): + """Draw detection boxes and labels with enhanced visualization for fights""" + try: + colors = { + 'weapon': (0, 0, 255), # Red + 'fight': (0, 165, 255), # Orange for fights + 'nsfw': (255, 0, 255), # Magenta + } + + # Special colors for weapon types + weapon_colors = { + 'blade': (0, 100, 255), # Orange-red for knives + 'firearm': (0, 0, 255), # Red for guns + 'blunt_weapon': (100, 0, 255) # Purple for blunt weapons + } + + # Special colors for fight types + fight_colors = { + 'physical_combat': (0, 140, 255), # Orange + 'martial_arts': (0, 200, 255), # Light orange + 'wrestling': (0, 165, 255), # Medium orange + 'group_violence': (0, 69, 255), # Dark orange + 'general_fight': (0, 165, 255) # Default orange + } + + for det in detections: + x1, y1, x2, y2 = det['bbox'] + + # Choose color based on type + if det['type'] == 'weapon' and 'weapon_type' in det: + color = weapon_colors.get(det['weapon_type'], colors['weapon']) + elif det['type'] == 'fight' and 'fight_type' in det: + color = fight_colors.get(det['fight_type'], colors['fight']) + else: + color = colors.get(det['type'], (0, 255, 0)) + + # Draw rectangle with thicker line for high-threat detections + thickness = 4 if det.get('threat_level') == 'critical' else 3 if det['type'] in ['weapon', + 'fight'] else 2 + cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) + + # Create detailed label + if det['type'] == 'weapon': + label = f"{det['class']} ({det['confidence']:.2f})" + if 'threat_level' in det: + label += f" [{det['threat_level']}]" + elif det['type'] == 'fight': + label = f"FIGHT: {det['class']} ({det['confidence']:.2f})" + if 'threat_level' in det: + label += f" [{det['threat_level']}]" + if 'aggression_level' in det: + label += f" {det['aggression_level']}" + else: + label = f"{det['type']}: {det['class']} ({det['confidence']:.2f})" + + # Draw label background + label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0] + cv2.rectangle(image, (x1, y1 - 25), (x1 + label_size[0] + 5, y1), color, -1) + + # Draw label text + cv2.putText(image, label, (x1 + 2, y1 - 7), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) + + # Add additional context for fights + if det['type'] == 'fight': + context_text = [] + if 'people_involved' in det and det['people_involved'] > 0: + context_text.append(f"People: {det['people_involved']}") + if 'context_flags' in det and det['context_flags']: + context_text.append(f"Flags: {', '.join(det['context_flags'])}") + + if context_text: + context_label = " | ".join(context_text) + cv2.putText(image, context_label, (x1, y2 + 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) + + # Add detection method indicator (small text) + if 'detection_method' in det: + method = det['detection_method'].split('_')[-1] + cv2.putText(image, method, (x1, y2 + 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) + + return image + + except Exception as e: + print(f"❌ Error drawing detections: {e}") + return image + + def process_video(self, video_path, output_path=None, frame_skip=2): + """Process video file with enhanced detection including fights - optimized frame processing""" + try: + cap = cv2.VideoCapture(video_path) + frame_count = 0 + total_detections = [] + fight_timeline = [] # Track fights over time + recent_detections = [] # Track recent detections for adaptive processing + + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + + while True: + ret, frame = cap.read() + if not ret: + break + + frame_count += 1 + + # Adaptive frame processing based on recent detections + should_process = False + + # Always process if recent threats detected (within last 10 frames) + if any(det['frame'] > frame_count - 10 for det in recent_detections[-5:]): + should_process = True + # Or process based on reduced skip rate + elif frame_count % max(1, frame_skip) == 0: + should_process = True + + if not should_process: + if output_path: + out.write(frame) + continue + + # Process frame + result = self.process_image(frame) + if result and result['detections']: + # Add frame number to each detection for tracking + for detection in result['detections']: + detection['frame'] = frame_count + + total_detections.extend(result['detections']) + recent_detections.append({'frame': frame_count, 'count': len(result['detections'])}) + + # Track fight timeline + fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] + if fight_detections: + timestamp = frame_count / cap.get(cv2.CAP_PROP_FPS) + fight_timeline.append({ + 'timestamp': timestamp, + 'frame': frame_count, + 'fights': len(fight_detections), + 'max_aggression': max(f.get('aggression_level', 'low') for f in fight_detections) + }) + + # Reduce frame_skip temporarily when fight detected + frame_skip = max(1, frame_skip // 2) + + print(f"⚠️ Frame {frame_count}: {len(result['detections'])} threats detected") + + breakdown = result.get('detection_breakdown', {}) + if breakdown.get('fights', 0) > 0: + print(f" 👊 Fights: {breakdown['fights']}") + + if output_path and 'annotated_image' in result: + out.write(result['annotated_image']) + elif output_path: + out.write(frame) + else: + # No detections - can increase frame_skip for efficiency + if len(recent_detections) > 5 and all(det['count'] == 0 for det in recent_detections[-5:]): + frame_skip = min(5, frame_skip + 1) + + if output_path: + out.write(frame) + + cap.release() + if output_path: + out.release() + + # Analysis of fight patterns + fight_analysis = {} + if fight_timeline: + fight_analysis = { + 'total_fight_incidents': len(fight_timeline), + 'first_fight_time': fight_timeline[0]['timestamp'], + 'last_fight_time': fight_timeline[-1]['timestamp'], + 'peak_aggression_time': max(fight_timeline, key=lambda x: x['max_aggression'])['timestamp'], + 'fight_duration_coverage': fight_timeline[-1]['timestamp'] - fight_timeline[0]['timestamp'] if len( + fight_timeline) > 1 else 0 + } + + return { + 'total_frames_processed': frame_count // frame_skip, + 'total_detections': len(total_detections), + 'detections': total_detections, + 'fight_timeline': fight_timeline, + 'fight_analysis': fight_analysis, + 'detection_breakdown': { + 'weapons': len([d for d in total_detections if d['type'] == 'weapon']), + 'fights': len([d for d in total_detections if d['type'] == 'fight']), + 'nsfw': len([d for d in total_detections if d['type'] == 'nsfw']) + } + } + + except Exception as e: + print(f"❌ Error processing video: {e}") + return None + + def save_report(self, filename="detection_report.json"): + """Save detection history to file""" + try: + with open(filename, 'w') as f: + json.dump(self.detection_history, f, indent=2, default=str) + print(f"📊 Report saved to {filename}") + except Exception as e: + print(f"❌ Error saving report: {e}") + + def get_memory_usage(self): + """Get current GPU memory usage""" + if torch.cuda.is_available(): + allocated = torch.cuda.memory_allocated() / 1024 ** 3 + cached = torch.cuda.memory_reserved() / 1024 ** 3 + return f"GPU Memory: {allocated:.2f}GB allocated, {cached:.2f}GB cached" + return "CPU mode" + + +def main(): + """Enhanced example usage with knife and fight detection improvements""" + + # Initialize the system + moderator = ContentModerator() + + # Show enhanced system information + print("\n" + "=" * 60) + print("🎯 ENHANCED DUAL MODEL SYSTEM WITH FIGHT DETECTION") + print("=" * 60) + + status = moderator.get_model_status() + + if status['custom_weapon_fight_model']: + print("✅ Custom YOLO11 Model (dao + súng + fight): LOADED") + if 'custom_classes' in status: + print(f"📊 Custom classes: {status['custom_classes']}") + else: + print("❌ Custom weapon+fight model: NOT FOUND") + + if status['general_model']: + print("✅ General YOLO11n Model (person detection): LOADED") + else: + print("❌ General model: FAILED") + + if status['nsfw_classifier']: + print("✅ NSFW Classifier: LOADED") + else: + print("❌ NSFW Classifier: FAILED") + + print(f"🖥️ Device: {status['device']}") + print(f"🗄️ Cache system: ENABLED") + print(f"🔪 Knife enhancement: {'ENABLED' if status['knife_enhancement'] else 'DISABLED'}") + print(f"📈 Knife confidence boost: {'ENABLED' if status['knife_boost'] else 'DISABLED'}") + print(f"👊 Fight detection: {'ENABLED' if status['fight_detection'] else 'DISABLED'}") + print(f"🧠 Fight analysis: {'ENABLED' if status['fight_analysis'] else 'DISABLED'}") + + # Enhanced features info + print("\n" + "=" * 60) + print("✨ ENHANCED DETECTION FEATURES") + print("=" * 60) + print("🔧 Image Enhancement:") + print(" - Contrast & brightness optimization") + print(" - Edge sharpening for metallic objects") + print(" - CLAHE for local contrast") + print("📊 Confidence Boosting:") + print(" - Geometric analysis (knives)") + print(" - Motion blur analysis (fights)") + print(" - Edge strength analysis") + print("🎯 Multi-pass Detection:") + print(" - Low threshold pass for knives (0.45)") + print(" - Normal threshold for guns (0.45)") + print(" - Low threshold for fights (0.40)") + print("👊 Fight Analysis:") + print(" - Multi-person fight detection") + print(" - Aggression level assessment") + print(" - Context-aware threat escalation") + + # Example 1: Process single image + print("\n" + "=" * 50) + print("🖼️ SINGLE IMAGE PROCESSING") + print("=" * 50) + + test_image = "test_image.jpg" + + if os.path.exists(test_image): + result = moderator.process_image(test_image) + if result: + print(f"\n📊 DETECTION RESULTS:") + print(f"Risk Level: {result['risk_level']}") + print(f"Total Threats: {result['total_threats']}") + print(f"Processing Method: {result.get('processing_method', 'standard')}") + + breakdown = result.get('detection_breakdown', {}) + if breakdown: + print(f"\n📈 BREAKDOWN:") + print(f" Weapons: {breakdown.get('weapons', 0)}") + print(f" Fights: {breakdown.get('fights', 0)}") + print(f" NSFW: {breakdown.get('nsfw', 0)}") + + # Show weapon-specific results + weapon_detections = [d for d in result['detections'] if d['type'] == 'weapon'] + if weapon_detections: + print(f"\n🔫 WEAPON DETECTIONS: {len(weapon_detections)}") + for i, detection in enumerate(weapon_detections): + method = detection.get('detection_method', 'unknown') + print(f" Weapon {i + 1} ({method}):") + print(f" Class: {detection['class']}") + print(f" Type: {detection['weapon_type']}") + print(f" Confidence: {detection['confidence']:.3f}") + print(f" Threat Level: {detection['threat_level']}") + + # Show fight-specific results + fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] + if fight_detections: + print(f"\n👊 FIGHT DETECTIONS: {len(fight_detections)}") + for i, detection in enumerate(fight_detections): + method = detection.get('detection_method', 'unknown') + print(f" Fight {i + 1} ({method}):") + print(f" Class: {detection['class']}") + print(f" Type: {detection.get('fight_type', 'unknown')}") + print(f" Confidence: {detection['confidence']:.3f}") + print(f" Threat Level: {detection['threat_level']}") + print(f" Aggression: {detection.get('aggression_level', 'unknown')}") + if 'people_involved' in detection: + print(f" People Involved: {detection['people_involved']}") + if 'context_flags' in detection and detection['context_flags']: + print(f" Context: {', '.join(detection['context_flags'])}") + + # Show NSFW results + nsfw_detections = [d for d in result['detections'] if d['type'] == 'nsfw'] + if nsfw_detections: + print(f"\n🔞 NSFW DETECTIONS: {len(nsfw_detections)}") + for i, detection in enumerate(nsfw_detections): + method = detection.get('method', 'unknown') + print(f" NSFW {i + 1} ({method}):") + print(f" Class: {detection['class']}") + print(f" Confidence: {detection['confidence']:.3f}") + if 'skin_ratio' in detection: + print(f" Skin Ratio: {detection['skin_ratio']:.2f}") + else: + print(f"⚠️ Test image not found: {test_image}") + print("Creating a test pattern to demonstrate detection...") + + # Create a synthetic test image + test_img = np.ones((640, 640, 3), dtype=np.uint8) * 128 + cv2.putText(test_img, "Test Pattern", (200, 320), + cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3) + + result = moderator.process_image(test_img) + print("✅ Test pattern processed successfully") + + # Example 2: Enhanced webcam processing with fight detection + print("\n" + "=" * 60) + print("📹 ENHANCED WEBCAM PROCESSING WITH FIGHT DETECTION") + print("=" * 60) + print("Starting enhanced detection on webcam...") + print("🎮 Controls:") + print(" - Press 'q' to quit") + print(" - Press 's' to save frame") + print(" - Press 'i' to show model info") + print(" - Press 'e' to toggle enhancement") + print(" - Press 'b' to toggle knife confidence boost") + print(" - Press 'f' to toggle fight analysis") + print(" - Press 'h' for help") + + try: + cap = cv2.VideoCapture(0) + + if not cap.isOpened(): + print("❌ Cannot open webcam. Check if camera is connected.") + else: + print("✅ Enhanced webcam processing started") + + frame_count = 0 + detection_stats = { + 'weapons': 0, + 'knives': 0, + 'guns': 0, + 'fights': 0, + 'nsfw': 0, + 'total_frames': 0, + 'fight_incidents': 0 + } + + # Adaptive processing variables + process_interval = 2 # Start with every 2nd frame + last_detection_frame = 0 + consecutive_safe_frames = 0 + + while True: + ret, frame = cap.read() + if not ret: + print("❌ Cannot read from webcam") + break + + frame_count += 1 + detection_stats['total_frames'] = frame_count + frame = cv2.flip(frame, 1) + + # Add status overlay + y_offset = frame.shape[0] - 120 + cv2.putText(frame, + f"Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}", + (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + cv2.putText(frame, + f"Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}", + (10, y_offset + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + cv2.putText(frame, + f"Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}", + (10, y_offset + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + model_info = "Models: Custom+General" if moderator.weapon_model_custom else "General Only" + cv2.putText(frame, model_info, (10, y_offset + 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + # Adaptive frame processing - process more frequently when threats detected + should_process = False + + # Always process if recent threats (within last 30 frames) + if frame_count - last_detection_frame <= 30: + should_process = (frame_count % 1 == 0) # Process every frame + cv2.putText(frame, "HIGH ALERT MODE", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) + # Normal processing with reduced interval + elif frame_count % process_interval == 0: + should_process = True + + if should_process: + result = moderator.process_image(frame) + + if result and result['action_required']: + last_detection_frame = frame_count # Update last detection frame + consecutive_safe_frames = 0 + process_interval = 1 # Process every frame when threats detected + + # Count detections by type + for detection in result['detections']: + if detection['type'] == 'weapon': + detection_stats['weapons'] += 1 + if detection['weapon_type'] == 'blade': + detection_stats['knives'] += 1 + elif detection['weapon_type'] == 'firearm': + detection_stats['guns'] += 1 + elif detection['type'] == 'fight': + detection_stats['fights'] += 1 + if detection.get('aggression_level') in ['high', 'extreme']: + detection_stats['fight_incidents'] += 1 + elif detection['type'] == 'nsfw': + detection_stats['nsfw'] += 1 + + print( + f"⚠️ Frame {frame_count}: {result['risk_level']} risk - {result['total_threats']} threats!") + + # Show specific detections with fight info + for detection in result['detections']: + if detection['type'] == 'weapon': + icon = "🔪" if detection['weapon_type'] == 'blade' else "🔫" + method = detection.get('detection_method', 'unknown').split('_')[-1] + print(f" {icon} {detection['class']} ({detection['confidence']:.3f}) [{method}]") + elif detection['type'] == 'fight': + fight_type = detection.get('fight_type', 'general') + aggression = detection.get('aggression_level', 'unknown') + people = detection.get('people_involved', 0) + method = detection.get('detection_method', 'unknown').split('_')[-1] + print(f" 👊 FIGHT: {fight_type} ({detection['confidence']:.3f}) [{method}]") + print(f" Aggression: {aggression}, People: {people}") + + # Use annotated frame + if 'annotated_image' in result: + cv2.imshow('Enhanced Detection System (Weapons + Fights)', result['annotated_image']) + else: + # Add threat counter + breakdown = result.get('detection_breakdown', {}) + threat_text = f"THREATS: W:{breakdown.get('weapons', 0)} F:{breakdown.get('fights', 0)} N:{breakdown.get('nsfw', 0)}" + cv2.putText(frame, threat_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) + cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) + else: + consecutive_safe_frames += 1 + # Gradually increase processing interval when safe (up to max 3) + if consecutive_safe_frames > 30: + process_interval = min(3, process_interval + 1) + consecutive_safe_frames = 0 + + cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) + else: + cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) + + # Handle key presses + key = cv2.waitKey(1) & 0xFF + if key == ord('q'): + print("🛑 Webcam stopped by user") + break + elif key == ord('s'): + filename = f"enhanced_detection_{frame_count}.jpg" + cv2.imwrite(filename, frame) + print(f"💾 Frame saved as {filename}") + elif key == ord('i'): + print(f"\n📊 Model Status:") + current_status = moderator.get_model_status() + for k, v in current_status.items(): + print(f" {k}: {v}") + elif key == ord('e'): + # Toggle enhancement + moderator.config['weapon_detection']['use_enhancement'] = \ + not moderator.config['weapon_detection']['use_enhancement'] + print( + f"🔧 Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}") + elif key == ord('b'): + # Toggle knife boost + moderator.config['weapon_detection']['boost_knife_detection'] = \ + not moderator.config['weapon_detection']['boost_knife_detection'] + print( + f"📈 Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}") + elif key == ord('f'): + # Toggle fight analysis + moderator.config['weapon_detection']['fight_analysis'] = \ + not moderator.config['weapon_detection']['fight_analysis'] + print( + f"👊 Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}") + elif key == ord('h'): + print("\n🎮 Controls:") + print(" 'q': quit") + print(" 's': save frame") + print(" 'i': model info") + print(" 'e': toggle enhancement") + print(" 'b': toggle knife confidence boost") + print(" 'f': toggle fight analysis") + print(" 'h': help") + + # Show comprehensive session statistics + print(f"\n📈 Session Statistics:") + print(f" Total frames: {detection_stats['total_frames']}") + print(f" Total weapon detections: {detection_stats['weapons']}") + print(f" - Knives/Dao: {detection_stats['knives']}") + print(f" - Guns: {detection_stats['guns']}") + print(f" Total fight detections: {detection_stats['fights']}") + print(f" - High-aggression incidents: {detection_stats['fight_incidents']}") + print(f" NSFW detections: {detection_stats['nsfw']}") + + if detection_stats['total_frames'] > 0: + total_detections = detection_stats['weapons'] + detection_stats['fights'] + detection_stats['nsfw'] + detection_rate = (total_detections / detection_stats['total_frames'] * 100) + print(f" Overall detection rate: {detection_rate:.1f}%") + + if detection_stats['weapons'] > 0: + knife_ratio = detection_stats['knives'] / detection_stats['weapons'] * 100 + print(f" Knife detection ratio: {knife_ratio:.1f}% of weapons") + + if detection_stats['fights'] > 0: + incident_ratio = detection_stats['fight_incidents'] / detection_stats['fights'] * 100 + print(f" High-aggression fight ratio: {incident_ratio:.1f}% of fights") + + cap.release() + cv2.destroyAllWindows() + print("✅ Enhanced webcam session completed") + + except Exception as e: + print(f"❌ Webcam error: {e}") + + # Show final system status + print(f"\n💾 {moderator.get_memory_usage()}") + print(f"🗄️ Final cache size: {len(moderator.detection_cache)} entries") + + # Save enhanced report + moderator.save_report("enhanced_detection_with_fights_report.json") + + print("\n✅ Enhanced Content Moderation System with Fight Detection completed!") + print("💡 New fight detection capabilities:") + print(" - Behavioral fight pattern recognition") + print(" - Multi-person fight analysis") + print(" - Aggression level assessment") + print(" - Context-aware threat escalation") + print(" - Fight timeline tracking for videos") + print("💡 Enhanced weapon detection:") + print(" - Image enhancement preprocessing") + print(" - Dynamic confidence thresholds") + print(" - Geometric feature analysis") + print(" - Multi-pass detection strategy") + + +if __name__ == "__main__": main() \ No newline at end of file