import cv2 import numpy as np import torch import os import json import warnings warnings.filterwarnings('ignore') # Import required libraries try: from ultralytics import YOLO from transformers import pipeline from PIL import Image, ImageDraw, ImageFont import requests from datetime import datetime # MediaPipe import with fallback try: import mediapipe as mp MEDIAPIPE_AVAILABLE = True print("✅ MediaPipe imported successfully") except ImportError: MEDIAPIPE_AVAILABLE = False print("⚠️ MediaPipe not available - pose detection disabled") except Exception as e: MEDIAPIPE_AVAILABLE = False print(f"⚠️ MediaPipe import error: {e} - pose detection disabled") except ImportError as e: print(f"Missing dependency: {e}") print("Please install: pip install ultralytics transformers pillow requests") print("For MediaPipe: pip install mediapipe==0.10.18") class ContentModerator: def __init__(self, config=None): default_cfg = self.get_default_config() self.config = self.deep_merge_dicts(default_cfg, config) if config else default_cfg self.device = 'cuda' if torch.cuda.is_available() else 'cpu' # CPU optimizations if self.device == 'cpu': print("💻 CPU mode detected - applying optimizations...") torch.set_num_threads(4) self.config['performance']['half_precision'] = False self.config['nsfw_detection']['pose_analysis'] = False # Initialize models self.weapon_model = None # Primary weapon model self.weapon_model_custom = None # Custom model for dao + súng + fight self.weapon_model_general = None # General model for person + backup weapons self.nsfw_classifier = None self.pose_detector = None # Performance optimization self.detection_cache = {} self.cache_ttl = 2 # Cache for 2 seconds # Results storage self.detection_history = [] print(f"🚀 Content Moderator initialized on {self.device}") if self.device == 'cpu': print("⚡ CPU optimizations enabled") self.setup_models() def deep_merge_dicts(self, a: dict, b: dict) -> dict: """Merge dict b into a (non-destructive). Returns merged copy. - Keeps keys from a when missing in b. - Recursively merges nested dicts. """ if not isinstance(a, dict): return b if isinstance(b, dict) else a result = dict(a) # shallow copy if not b: return result for k, v in b.items(): if k in result and isinstance(result[k], dict) and isinstance(v, dict): result[k] = self.deep_merge_dicts(result[k], v) else: result[k] = v return result def get_default_config(self): """Default configuration optimized for CPU/GPU with enhanced knife and fight detection""" # Auto-detect optimal settings is_cuda = torch.cuda.is_available() return { 'weapon_detection': { 'enabled': True, 'confidence_threshold': 0.45, # For guns 'knife_confidence': 0.45, # Lower threshold for knives 'fight_confidence': 0.40, # Lower threshold for fights (behavioral) 'model_size': 'yolo11n', 'classes': ['knife', 'gun', 'rifle', 'pistol', 'weapon', 'fight'], 'use_enhancement': True, # Enable image enhancement for knives 'multi_pass': True, # Enable multi-pass detection 'boost_knife_detection': True, # Enable knife confidence boosting 'fight_detection': True, # Enable fight-specific detection 'fight_analysis': True # Enable advanced fight behavior analysis }, 'fight_detection': { 'enabled': True, 'confidence_threshold': 0.40, 'pose_analysis': True, # Analyze poses for fighting 'motion_analysis': False, # Motion-based fight detection (for video) 'aggression_keywords': ['fight', 'violence', 'aggression', 'combat'], 'threat_escalation': True, # Escalate threat level for fights 'multi_person_analysis': True # Analyze interactions between people }, 'nsfw_detection': { 'enabled': True, 'confidence_threshold': 0.7, 'skin_detection': True, 'pose_analysis': False, 'region_analysis': True }, 'performance': { 'image_size': 416 if is_cuda else 320, 'batch_size': 1, 'half_precision': is_cuda, 'use_flash_attention': False, 'cpu_optimization': not is_cuda }, 'output': { 'save_detections': True, 'draw_boxes': True, 'log_results': True } } def setup_models(self): """Initialize all detection models""" try: # Clear GPU cache if torch.cuda.is_available(): torch.cuda.empty_cache() # 1. Setup Weapon Detection (now includes fight detection) if self.config['weapon_detection']['enabled']: self.setup_weapon_detector() # 2. Setup NSFW Detection if self.config['nsfw_detection']['enabled']: self.setup_nsfw_detector() print("✅ All models loaded successfully!") except Exception as e: print(f"❌ Error setting up models: {e}") def setup_weapon_detector(self): """Setup dual model system: Custom for weapons + fights + General for person detection""" try: print("🔫 Loading weapon and fight detection models...") # Model 1: Custom YOLO11 for weapons (dao + súng + fight) custom_model_path = "models/best.pt" project_root = os.path.dirname(os.path.abspath(__file__)) full_model_path = os.path.join(project_root, custom_model_path) if os.path.exists(full_model_path): print(f"✅ Loading custom weapon+fight model: {full_model_path}") self.weapon_model_custom = YOLO(full_model_path) print("🎯 Custom weapon+fight model (dao + súng + fight) loaded!") # Show custom model classes if hasattr(self.weapon_model_custom, 'names'): classes = list(self.weapon_model_custom.names.values()) print(f"📊 Custom classes: {classes}") # Check if fight class is available if any('fight' in str(cls).lower() for cls in classes): print("👊 Fight detection enabled in custom model") else: print("⚠️ Fight class not found in custom model") else: print("⚠️ Custom weapon+fight model not found") self.weapon_model_custom = None # Model 2: General YOLO11n for person detection and fight fallback print("👤 Loading general model for person detection...") self.weapon_model_general = YOLO('yolo11n.pt') print("✅ General YOLO11n loaded for person detection") # Set primary weapon model self.weapon_model = self.weapon_model_custom if self.weapon_model_custom else self.weapon_model_general # Optimize models for performance if self.device == 'cuda' and self.config['performance']['half_precision']: try: if self.weapon_model_custom: self.weapon_model_custom.model.half() self.weapon_model_general.model.half() print("✅ Half precision enabled for both models") except: print("⚠️ Half precision not supported") print("🔥 Dual model system ready with fight detection!") except Exception as e: print(f"❌ Error loading weapon+fight models: {e}") self.weapon_model = None self.weapon_model_custom = None self.weapon_model_general = None def detect_weapons(self, image): """Enhanced dual model weapon and fight detection""" detections = [] try: imgsz = self.config['performance']['image_size'] use_half = self.config['performance']['half_precision'] and self.device == 'cuda' # Prepare multiple versions of the image images_to_process = [(image, 1.0, "original")] if self.config['weapon_detection']['use_enhancement']: enhanced_image = self.enhance_knife_detection(image) images_to_process.append((enhanced_image, 1.15, "enhanced")) # Process each image version for img, weight_multiplier, img_type in images_to_process: if self.weapon_model_custom: # Use different confidence thresholds for different detection types knife_conf = self.config['weapon_detection']['knife_confidence'] gun_conf = self.config['weapon_detection']['confidence_threshold'] fight_conf = self.config['weapon_detection']['fight_confidence'] # Multi-pass detection with different thresholds passes = [ (knife_conf, "knife_pass"), # Low threshold for knives (gun_conf, "gun_pass"), # Normal threshold for guns (fight_conf, "fight_pass") # Low threshold for fights ] if self.config['weapon_detection']['multi_pass'] else [ (min(knife_conf, fight_conf), "single_pass")] for conf_threshold, pass_type in passes: try: results = self.weapon_model_custom( img, imgsz=imgsz, conf=conf_threshold, device=self.device, half=use_half, verbose=False, augment=True # Enable test-time augmentation ) for result in results: boxes = result.boxes if boxes is not None: for box in boxes: class_id = int(box.cls[0]) if hasattr(result, 'names') and class_id in result.names: class_name = result.names[class_id].lower() else: class_name = f"detection_{class_id}" x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() confidence = float(box.conf[0]) * weight_multiplier # Determine detection type and apply appropriate processing if self.is_fight_detection(class_name): # Fight detection processing confidence = self.boost_fight_confidence( img, [x1, y1, x2, y2], confidence, class_name ) detection_type = 'fight' min_conf = fight_conf threat_level = self.assess_fight_threat(confidence, img, [x1, y1, x2, y2]) else: # Weapon detection processing if self.config['weapon_detection']['boost_knife_detection']: if 'dao' in class_name or 'knife' in class_name or 'blade' in class_name: confidence = self.boost_knife_confidence( img, [x1, y1, x2, y2], confidence, class_name ) detection_type = 'weapon' weapon_type = self.classify_weapon_type(class_name) min_conf = knife_conf if weapon_type == 'blade' else gun_conf threat_level = self.assess_weapon_threat(weapon_type, confidence) if confidence >= min_conf: detection_data = { 'type': detection_type, 'class': class_name, 'confidence': min(confidence, 0.99), 'bbox': [int(x1), int(y1), int(x2), int(y2)], 'threat_level': threat_level, 'detection_method': f'custom_model_{img_type}_{pass_type}' } # Add type-specific fields if detection_type == 'weapon': detection_data['weapon_type'] = weapon_type elif detection_type == 'fight': detection_data['fight_type'] = self.classify_fight_type(class_name) detection_data['aggression_level'] = self.assess_aggression_level( confidence) detections.append(detection_data) icon = "👊" if detection_type == 'fight' else "🎯" print( f" {icon} Detected: {class_name} (conf: {confidence:.3f}, method: {img_type}_{pass_type})") except Exception as e: print(f"⚠️ Detection pass error ({pass_type}): {e}") # Fallback: General model for backup detection (only if no custom detections) if self.weapon_model_general and len(detections) == 0: detections.extend(self.fallback_detection(image, imgsz, use_half)) # Remove duplicate detections detections = self.remove_duplicate_detections(detections) # Additional fight analysis if enabled # Additional fight analysis if enabled (safe access) try: # weapon_detection may contain a simple boolean or a dict for fight_detection wd = self.config.get('weapon_detection', {}) wd_fd = wd.get('fight_detection', None) # Determine whether fight analysis is enabled if isinstance(wd_fd, dict): fight_enabled = wd_fd.get('enabled', False) fight_multi_person = wd_fd.get('multi_person_analysis', False) else: # wd_fd may be boolean (legacy); consult top-level fight_detection dict for details fight_enabled = bool(wd_fd) fight_multi_person = bool( self.config.get('fight_detection', {}).get('multi_person_analysis', False)) if fight_enabled and fight_multi_person: fight_detections = [d for d in detections if d.get('type') == 'fight'] if fight_detections: try: enhanced_fights = self.analyze_fight_context(image, fight_detections) # Replace original fight detections with enhanced ones detections = [d for d in detections if d.get('type') != 'fight'] + enhanced_fights except Exception as e: print(f"⚠️ Fight context enhancement error: {e}") except Exception as e: # Defensive: never allow missing config to break detection pipeline import traceback traceback.print_exc() print(f"⚠️ Skipping fight context analysis due to config error: {e}") return detections except Exception as e: print(f"❌ Weapon and fight detection error: {e}") return [] def is_fight_detection(self, class_name): """Check if detection is fight-related""" fight_keywords = ['fight', 'fighting', 'combat', 'violence', 'aggression', 'brawl', 'scuffle'] return any(keyword in class_name.lower() for keyword in fight_keywords) def classify_fight_type(self, class_name): """Classify type of fight detected""" class_name = class_name.lower() if any(word in class_name for word in ['punch', 'boxing', 'fist']): return 'physical_combat' elif any(word in class_name for word in ['kick', 'martial', 'karate']): return 'martial_arts' elif any(word in class_name for word in ['wrestle', 'grapple']): return 'wrestling' elif any(word in class_name for word in ['group', 'mob', 'crowd']): return 'group_violence' else: return 'general_fight' def boost_fight_confidence(self, image, bbox, initial_confidence, class_name): """Boost confidence for fight detection based on contextual analysis""" try: x1, y1, x2, y2 = [int(coord) for coord in bbox] # Ensure bbox is within image bounds x1 = max(0, x1) y1 = max(0, y1) x2 = min(image.shape[1], x2) y2 = min(image.shape[0], y2) roi = image[y1:y2, x1:x2] if roi.size == 0: return initial_confidence boost = 0 # 1. Motion blur analysis (indicates rapid movement) gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var() if blur_variance < 100: # Low variance indicates blur/motion boost += 0.10 # 2. Edge density (chaotic scenes have more edges) edges = cv2.Canny(gray, 50, 150) edge_density = np.count_nonzero(edges) / edges.size if edge_density > 0.15: boost += 0.08 # 3. Color analysis (fights often have varied, chaotic colors) hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) color_variance = np.var(hsv[:, :, 1]) # Saturation variance if color_variance > 1000: boost += 0.05 # 4. Texture analysis (complex textures indicate multiple overlapping objects) gray_f = np.float32(gray) texture_response = cv2.cornerHarris(gray_f, 2, 3, 0.04) texture_strength = np.mean(texture_response) if texture_strength > 0.01: boost += 0.07 # 5. Aspect ratio analysis (fights often have irregular bounding boxes) height = y2 - y1 width = x2 - x1 if height > 0 and width > 0: aspect_ratio = max(width, height) / min(width, height) if 1.2 < aspect_ratio < 3.0: # Moderate irregularity boost += 0.05 final_confidence = min(initial_confidence + boost, 0.95) if boost > 0: print(f" 👊 Fight boost applied: +{boost:.2f} (blur:{blur_variance:.0f}, edge:{edge_density:.2f})") return final_confidence except Exception as e: print(f"⚠️ Fight confidence boost error: {e}") return initial_confidence def assess_fight_threat(self, confidence, image, bbox): """Assess threat level of detected fight""" base_threat = 'medium' # Fights start at medium threat # Escalate based on confidence if confidence >= 0.85: base_threat = 'critical' elif confidence >= 0.70: base_threat = 'high' elif confidence >= 0.50: base_threat = 'medium' else: base_threat = 'low' # Additional context-based escalation try: x1, y1, x2, y2 = bbox fight_area = (x2 - x1) * (y2 - y1) image_area = image.shape[0] * image.shape[1] area_ratio = fight_area / image_area # Large fights are more dangerous if area_ratio > 0.5: # Fight covers >50% of image if base_threat == 'medium': base_threat = 'high' elif base_threat == 'high': base_threat = 'critical' except Exception as e: print(f"⚠️ Fight threat assessment error: {e}") return base_threat def assess_aggression_level(self, confidence): """Assess aggression level based on confidence""" if confidence >= 0.80: return 'extreme' elif confidence >= 0.65: return 'high' elif confidence >= 0.45: return 'moderate' else: return 'low' def analyze_fight_context(self, image, fight_detections): """Enhanced analysis of fight context with multi-person detection""" enhanced_fights = [] try: # Detect all persons in the image persons = self.detect_persons(image) for fight in fight_detections: enhanced_fight = fight.copy() # Count people involved in or near the fight fight_bbox = fight['bbox'] people_in_fight = 0 people_nearby = 0 for person in persons: person_bbox = person['bbox'] # Calculate overlap with fight area overlap = self.calculate_bbox_overlap(fight_bbox, person_bbox) if overlap > 0.3: # Person is directly involved people_in_fight += 1 elif overlap > 0.1: # Person is nearby people_nearby += 1 # Update fight information based on context enhanced_fight['people_involved'] = people_in_fight enhanced_fight['people_nearby'] = people_nearby enhanced_fight['total_people'] = people_in_fight + people_nearby # Escalate threat based on number of people if people_in_fight >= 3: if enhanced_fight['threat_level'] == 'medium': enhanced_fight['threat_level'] = 'high' elif enhanced_fight['threat_level'] == 'high': enhanced_fight['threat_level'] = 'critical' enhanced_fight['fight_type'] = 'group_violence' # Add context flags enhanced_fight['context_flags'] = [] if people_in_fight >= 3: enhanced_fight['context_flags'].append('multi_person_fight') if people_nearby >= 2: enhanced_fight['context_flags'].append('crowd_present') enhanced_fights.append(enhanced_fight) print(f" 👥 Fight context: {people_in_fight} involved, {people_nearby} nearby") except Exception as e: print(f"⚠️ Fight context analysis error: {e}") return fight_detections return enhanced_fights def calculate_bbox_overlap(self, bbox1, bbox2): """Calculate overlap ratio between two bounding boxes""" x1_min, y1_min, x1_max, y1_max = bbox1 x2_min, y2_min, x2_max, y2_max = bbox2 # Calculate intersection intersect_xmin = max(x1_min, x2_min) intersect_ymin = max(y1_min, y2_min) intersect_xmax = min(x1_max, x2_max) intersect_ymax = min(y1_max, y2_max) if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: return 0.0 intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) bbox1_area = (x1_max - x1_min) * (y1_max - y1_min) return intersect_area / bbox1_area if bbox1_area > 0 else 0 def fallback_detection(self, image, imgsz, use_half): """Fallback detection using general model""" detections = [] try: general_results = self.weapon_model_general( image, imgsz=imgsz, conf=0.4, device=self.device, half=use_half, verbose=False ) for result in general_results: boxes = result.boxes if boxes is not None: for box in boxes: class_id = int(box.cls[0]) class_name = result.names[class_id].lower() # Filter for weapon-like objects weapon_keywords = ['knife', 'scissors', 'fork'] if any(keyword in class_name for keyword in weapon_keywords): x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() confidence = float(box.conf[0]) detections.append({ 'type': 'weapon', 'class': class_name, 'weapon_type': 'blade', 'confidence': confidence, 'bbox': [int(x1), int(y1), int(x2), int(y2)], 'threat_level': self.assess_weapon_threat('blade', confidence), 'detection_method': 'general_model_fallback' }) except Exception as e: print(f"⚠️ General detection error: {e}") return detections # ... (rest of the existing methods remain the same) ... def enhance_knife_detection(self, image): """Enhance image specifically for better knife/dao detection""" try: # 1. Increase contrast and brightness for metallic objects enhanced = cv2.convertScaleAbs(image, alpha=1.4, beta=25) # 2. Apply sharpening kernel to highlight edges kernel_sharpen = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]]) sharpened = cv2.filter2D(enhanced, -1, kernel_sharpen) # 3. Apply CLAHE for better local contrast lab = cv2.cvtColor(sharpened, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) l = clahe.apply(l) enhanced_final = cv2.merge([l, a, b]) enhanced_final = cv2.cvtColor(enhanced_final, cv2.COLOR_LAB2BGR) return enhanced_final except Exception as e: print(f"⚠️ Enhancement failed: {e}") return image def boost_knife_confidence(self, image, bbox, initial_confidence, class_name): """Boost confidence for knife/dao based on geometric and visual features""" try: x1, y1, x2, y2 = [int(coord) for coord in bbox] # Ensure bbox is within image bounds x1 = max(0, x1) y1 = max(0, y1) x2 = min(image.shape[1], x2) y2 = min(image.shape[0], y2) roi = image[y1:y2, x1:x2] if roi.size == 0: return initial_confidence boost = 0 # 1. Check aspect ratio (knives are typically elongated) height = y2 - y1 width = x2 - x1 if height > 0 and width > 0: aspect_ratio = max(width, height) / min(width, height) if aspect_ratio > 2.5: # Elongated shape boost += 0.15 elif aspect_ratio > 2.0: boost += 0.10 # 2. Check for metallic reflection (brightness) gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) mean_brightness = np.mean(gray) std_brightness = np.std(gray) if mean_brightness > 140: # Bright (metallic) boost += 0.10 if std_brightness > 50: # High contrast (blade edge) boost += 0.05 # 3. Edge detection (knives have strong edges) edges = cv2.Canny(gray, 50, 150) edge_ratio = np.count_nonzero(edges) / edges.size if edge_ratio > 0.15: # Strong edges boost += 0.10 elif edge_ratio > 0.10: boost += 0.05 # 4. Check for blade-like gradient if height > width: # Vertical orientation gradient = np.gradient(gray, axis=0) else: # Horizontal orientation gradient = np.gradient(gray, axis=1) gradient_strength = np.mean(np.abs(gradient)) if gradient_strength > 10: boost += 0.05 # Apply boost with class-specific multiplier if 'dao' in class_name.lower() or 'knife' in class_name.lower(): boost *= 1.2 # Extra boost for knife/dao classes final_confidence = min(initial_confidence + boost, 0.95) if boost > 0: print( f" 🔪 Knife boost applied: +{boost:.2f} (AR:{aspect_ratio:.1f}, Bright:{mean_brightness:.0f}, Edge:{edge_ratio:.2f})") return final_confidence except Exception as e: print(f"⚠️ Confidence boost error: {e}") return initial_confidence def detect_persons(self, image): """Detect persons using general model (needed for NSFW and fight analysis)""" persons = [] if not self.weapon_model_general: return persons try: imgsz = self.config['performance']['image_size'] use_half = self.config['performance']['half_precision'] and self.device == 'cuda' results = self.weapon_model_general( image, imgsz=imgsz, conf=0.3, device=self.device, half=use_half, verbose=False ) for result in results: boxes = result.boxes if boxes is not None: for box in boxes: class_id = int(box.cls[0]) class_name = result.names[class_id].lower() if class_name == 'person': x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() confidence = float(box.conf[0]) persons.append({ 'class': 'person', 'confidence': confidence, 'bbox': [int(x1), int(y1), int(x2), int(y2)] }) return persons except Exception as e: print(f"❌ Person detection error: {e}") return [] def classify_weapon_type(self, class_name): """Classify weapon type from class name""" class_name = class_name.lower() # Knife/Blade keywords (expanded) knife_keywords = ['knife', 'dao', 'blade', 'dagger', 'sword', 'machete', 'katana', 'cutter'] if any(keyword in class_name for keyword in knife_keywords): return 'blade' # Gun/Firearm keywords gun_keywords = ['gun', 'pistol', 'rifle', 'firearm', 'revolver', 'shotgun', 'súng'] if any(keyword in class_name for keyword in gun_keywords): return 'firearm' # Other weapons other_keywords = ['axe', 'hammer', 'club', 'bat'] if any(keyword in class_name for keyword in other_keywords): return 'blunt_weapon' # Check for numbered weapon classes if 'weapon' in class_name: try: weapon_id = int(class_name.split('_')[-1]) if weapon_id in [0, 1]: # Assuming 0,1 are firearms return 'firearm' elif weapon_id in [2, 3]: # Assuming 2,3 are blades return 'blade' else: return 'unknown_weapon' except: pass return 'unknown_weapon' def assess_weapon_threat(self, weapon_type, confidence): """Assess threat level of detected weapon""" threat_levels = { 'firearm': 'critical', 'blade': 'high', 'blunt_weapon': 'medium', 'unknown_weapon': 'medium' } base_threat = threat_levels.get(weapon_type, 'medium') # Adjust based on confidence if confidence >= 0.9: if base_threat == 'medium': return 'high' elif base_threat == 'high': return 'critical' else: return base_threat elif confidence >= 0.7: return base_threat elif confidence >= 0.5: if base_threat == 'critical': return 'high' elif base_threat == 'high': return 'medium' else: return base_threat else: if base_threat == 'critical': return 'medium' elif base_threat == 'high': return 'low' else: return 'low' def remove_duplicate_detections(self, detections, iou_threshold=0.4): """Remove duplicate detections using Non-Maximum Suppression""" if len(detections) <= 1: return detections # Sort by confidence (highest first) detections = sorted(detections, key=lambda x: x['confidence'], reverse=True) keep = [] for i, det1 in enumerate(detections): should_keep = True for det2 in keep: # Check if same type and overlapping if det1['type'] == det2['type']: iou = self.calculate_iou(det1['bbox'], det2['bbox']) if iou > iou_threshold: should_keep = False break if should_keep: keep.append(det1) return keep def calculate_iou(self, box1, box2): """Calculate Intersection over Union between two bounding boxes""" x1_min, y1_min, x1_max, y1_max = box1 x2_min, y2_min, x2_max, y2_max = box2 # Calculate intersection intersect_xmin = max(x1_min, x2_min) intersect_ymin = max(y1_min, y2_min) intersect_xmax = min(x1_max, x2_max) intersect_ymax = min(y1_max, y2_max) if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: return 0.0 intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) # Calculate union box1_area = (x1_max - x1_min) * (y1_max - y1_min) box2_area = (x2_max - x2_min) * (y2_max - y2_min) union_area = box1_area + box2_area - intersect_area return intersect_area / union_area if union_area > 0 else 0 # ... (continue with remaining NSFW detection methods) ... def detect_nsfw_content(self, image): """Enhanced NSFW detection with person detection first""" detections = [] try: if len(image.shape) == 3 and image.shape[2] == 3: rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) else: rgb_image = image # Stage 1: Detect persons first (optimization) persons = self.detect_persons(image) if not persons: # No persons detected, skip detailed NSFW analysis return detections print(f"👤 Found {len(persons)} person(s), analyzing for NSFW content...") # Stage 2: Overall NSFW Classification if self.nsfw_classifier: try: pil_image = Image.fromarray(rgb_image) nsfw_result = self.nsfw_classifier(pil_image) if nsfw_result[0]['label'] == 'nsfw': confidence = nsfw_result[0]['score'] if confidence > self.config['nsfw_detection']['confidence_threshold']: detections.append({ 'type': 'nsfw', 'class': 'inappropriate_content', 'confidence': confidence, 'bbox': [0, 0, image.shape[1], image.shape[0]], 'method': 'classification' }) except Exception as e: print(f"⚠️ NSFW classifier error: {e}") # Stage 3: Person-specific skin analysis if self.config['nsfw_detection']['skin_detection']: for person in persons: person_detections = self.analyze_person_skin(image, person) detections.extend(person_detections) # Stage 4: Regional skin analysis (if no person-specific detections) if self.config['nsfw_detection']['region_analysis'] and len(detections) == 0: skin_detections = self.detect_skin_regions(image) detections.extend(skin_detections) return detections except Exception as e: print(f"❌ NSFW detection error: {e}") return [] def analyze_person_skin(self, image, person): """Analyze skin exposure for a specific person""" detections = [] try: x1, y1, x2, y2 = person['bbox'] person_region = image[y1:y2, x1:x2] if person_region.size == 0: return detections # Convert to HSV for skin detection hsv_person = cv2.cvtColor(person_region, cv2.COLOR_BGR2HSV) # Skin color range lower_skin = np.array([0, 20, 70], dtype=np.uint8) upper_skin = np.array([20, 255, 255], dtype=np.uint8) # Create skin mask skin_mask = cv2.inRange(hsv_person, lower_skin, upper_skin) # Calculate skin percentage total_person_pixels = person_region.shape[0] * person_region.shape[1] skin_pixels = cv2.countNonZero(skin_mask) skin_ratio = skin_pixels / total_person_pixels if total_person_pixels > 0 else 0 # Threshold for suspicious skin exposure if skin_ratio > 0.4: # 40% of person region is skin confidence = min(skin_ratio * 2, 1.0) detections.append({ 'type': 'nsfw', 'class': 'excessive_skin_exposure', 'confidence': confidence, 'bbox': [x1, y1, x2, y2], 'method': 'person_skin_analysis', 'skin_ratio': skin_ratio }) print(f"🚨 Excessive skin exposure detected: {skin_ratio:.2f} ratio") return detections except Exception as e: print(f"❌ Person skin analysis error: {e}") return [] def detect_skin_regions(self, image): """Detect large skin-colored regions""" try: hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # Define skin color range lower_skin = np.array([0, 20, 70], dtype=np.uint8) upper_skin = np.array([20, 255, 255], dtype=np.uint8) # Create skin mask skin_mask = cv2.inRange(hsv, lower_skin, upper_skin) # Apply morphological operations kernel = np.ones((3, 3), np.uint8) skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_OPEN, kernel) skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_CLOSE, kernel) # Find contours contours, _ = cv2.findContours(skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) detections = [] image_area = image.shape[0] * image.shape[1] for contour in contours: area = cv2.contourArea(contour) # If skin region is too large if area > image_area * 0.3: x, y, w, h = cv2.boundingRect(contour) confidence = min(area / image_area, 1.0) detections.append({ 'type': 'nsfw', 'class': 'large_skin_region', 'confidence': confidence, 'bbox': [x, y, x + w, y + h], 'method': 'skin_detection' }) return detections except Exception as e: print(f"❌ Skin detection error: {e}") return [] def setup_nsfw_detector(self): """Setup NSFW detection components (Optimized for CPU)""" try: print("🔞 Loading NSFW detection components...") # 1. NSFW Classifier (Optimized for CPU) try: device_id = 0 if self.device == 'cuda' else -1 self.nsfw_classifier = pipeline( "image-classification", model="Falconsai/nsfw_image_detection", device=device_id, use_fast=True ) print("✅ NSFW classifier loaded") except Exception as nsfw_error: print(f"⚠️ NSFW classifier failed: {nsfw_error}") print(" Trying backup method...") try: # Fallback without specifying use_fast self.nsfw_classifier = pipeline( "image-classification", model="Falconsai/nsfw_image_detection", device=device_id ) print("✅ NSFW classifier loaded (fallback)") except: print("❌ NSFW classifier completely failed") self.nsfw_classifier = None # 2. Pose Detection (Fixed import with fallbacks) if self.config['nsfw_detection']['pose_analysis'] and MEDIAPIPE_AVAILABLE: try: import mediapipe as mp try: mp_pose = mp.solutions.pose self.pose_detector = mp_pose.Pose( static_image_mode=True, model_complexity=0, min_detection_confidence=0.5 ) print("✅ Pose detector loaded (legacy API)") except AttributeError: print("⚠️ MediaPipe API not available") self.pose_detector = None self.config['nsfw_detection']['pose_analysis'] = False except Exception as pose_error: print(f"⚠️ Pose detection failed: {pose_error}") self.pose_detector = None self.config['nsfw_detection']['pose_analysis'] = False else: self.pose_detector = None if not MEDIAPIPE_AVAILABLE: print("⚠️ MediaPipe not available - pose analysis disabled") except Exception as e: print(f"❌ Error loading NSFW components: {e}") print("💡 Falling back to skin detection only") def process_image(self, image_path): """Process single image with enhanced detection including fights""" try: # Load image if isinstance(image_path, str): image = cv2.imread(image_path) if image is None: raise ValueError(f"Could not load image: {image_path}") cache_key = f"file_{image_path}" else: image = image_path cache_key = f"array_{hash(image.tobytes())}" # Check cache import time current_time = time.time() if cache_key in self.detection_cache: cached_result, timestamp = self.detection_cache[cache_key] if current_time - timestamp < self.cache_ttl: return cached_result print(f"📸 Processing image: {image.shape}") # Run detections all_detections = [] # Weapon and fight detection if self.config['weapon_detection']['enabled']: weapon_fight_detections = self.detect_weapons(image) all_detections.extend(weapon_fight_detections) weapon_detections = [d for d in weapon_fight_detections if d['type'] == 'weapon'] fight_detections = [d for d in weapon_fight_detections if d['type'] == 'fight'] print(f"🔫 Found {len(weapon_detections)} weapon(s)") print(f"👊 Found {len(fight_detections)} fight(s)") # Show detailed breakdown if weapon_detections: knife_detections = [d for d in weapon_detections if d['weapon_type'] == 'blade'] if knife_detections: print(f" 🔪 Including {len(knife_detections)} knife/dao detection(s)") if fight_detections: for fight in fight_detections: fight_type = fight.get('fight_type', 'unknown') aggression = fight.get('aggression_level', 'unknown') print(f" 👊 Fight: {fight_type} (aggression: {aggression})") # NSFW detection if self.config['nsfw_detection']['enabled']: nsfw_detections = self.detect_nsfw_content(image) all_detections.extend(nsfw_detections) print(f"🔞 Found {len(nsfw_detections)} NSFW detection(s)") # Generate result result = { 'timestamp': datetime.now().isoformat(), 'image_path': image_path if isinstance(image_path, str) else 'array', 'detections': all_detections, 'total_threats': len(all_detections), 'risk_level': self.calculate_risk_level(all_detections), 'action_required': len(all_detections) > 0, 'processing_method': 'enhanced_dual_model_with_fight', 'detection_breakdown': { 'weapons': len([d for d in all_detections if d['type'] == 'weapon']), 'fights': len([d for d in all_detections if d['type'] == 'fight']), 'nsfw': len([d for d in all_detections if d['type'] == 'nsfw']) } } # Cache result self.detection_cache[cache_key] = (result, current_time) # Clean old cache entries self.clean_cache(current_time) # Save detection history self.detection_history.append(result) # Draw detections if self.config['output']['draw_boxes'] and all_detections: annotated_image = self.draw_detections(image.copy(), all_detections) result['annotated_image'] = annotated_image return result except Exception as e: print(f"❌ Error processing image: {e}") return None def clean_cache(self, current_time): """Clean expired cache entries""" try: expired_keys = [] for key, (_, timestamp) in self.detection_cache.items(): if current_time - timestamp > self.cache_ttl: expired_keys.append(key) for key in expired_keys: del self.detection_cache[key] except Exception as e: print(f"⚠️ Cache cleanup error: {e}") def get_model_status(self): """Get status of all models (safe access to config keys).""" # weapon_detection subtree wd = self.config.get('weapon_detection', {}) # fight_detection may be a bool in weapon_detection (legacy) or a dict (detailed). wd_fd = wd.get('fight_detection', None) if isinstance(wd_fd, dict): fight_enabled = wd_fd.get('enabled', True) else: fight_enabled = bool(wd_fd) # fight analysis flag (either in weapon_detection or top-level fight_detection) fight_analysis_flag = wd.get('fight_analysis', False) or \ bool(self.config.get('fight_detection', {}).get('multi_person_analysis', False)) status = { 'fight_detection': fight_enabled, 'custom_weapon_fight_model': self.weapon_model_custom is not None, 'general_model': self.weapon_model_general is not None, 'nsfw_classifier': self.nsfw_classifier is not None, 'pose_detector': self.pose_detector is not None, 'device': self.device, 'cache_size': len(self.detection_cache), 'knife_enhancement': wd.get('use_enhancement', False), 'knife_boost': wd.get('boost_knife_detection', False), 'fight_analysis': fight_analysis_flag } if self.weapon_model_custom and hasattr(self.weapon_model_custom, 'names'): status['custom_classes'] = list(self.weapon_model_custom.names.values()) return status def calculate_risk_level(self, detections): """Calculate overall risk level including fights""" if not detections: return 'safe' max_confidence = max(det['confidence'] for det in detections) threat_types = set(det['type'] for det in detections) # Check for critical combinations has_weapons = 'weapon' in threat_types has_fights = 'fight' in threat_types has_nsfw = 'nsfw' in threat_types # Fights + weapons = critical if has_weapons and has_fights: return 'critical' # High confidence fights are critical fight_detections = [d for d in detections if d['type'] == 'fight'] if fight_detections: max_fight_confidence = max(f['confidence'] for f in fight_detections) if max_fight_confidence > 0.8: return 'critical' elif max_fight_confidence > 0.65: return 'high' # Existing weapon logic if has_weapons and max_confidence > 0.8: return 'critical' elif has_weapons or has_fights or max_confidence > 0.9: return 'high' elif max_confidence > 0.7: return 'medium' else: return 'low' def draw_detections(self, image, detections): """Draw detection boxes and labels with enhanced visualization for fights""" try: colors = { 'weapon': (0, 0, 255), # Red 'fight': (0, 165, 255), # Orange for fights 'nsfw': (255, 0, 255), # Magenta } # Special colors for weapon types weapon_colors = { 'blade': (0, 100, 255), # Orange-red for knives 'firearm': (0, 0, 255), # Red for guns 'blunt_weapon': (100, 0, 255) # Purple for blunt weapons } # Special colors for fight types fight_colors = { 'physical_combat': (0, 140, 255), # Orange 'martial_arts': (0, 200, 255), # Light orange 'wrestling': (0, 165, 255), # Medium orange 'group_violence': (0, 69, 255), # Dark orange 'general_fight': (0, 165, 255) # Default orange } for det in detections: x1, y1, x2, y2 = det['bbox'] # Choose color based on type if det['type'] == 'weapon' and 'weapon_type' in det: color = weapon_colors.get(det['weapon_type'], colors['weapon']) elif det['type'] == 'fight' and 'fight_type' in det: color = fight_colors.get(det['fight_type'], colors['fight']) else: color = colors.get(det['type'], (0, 255, 0)) # Draw rectangle with thicker line for high-threat detections thickness = 4 if det.get('threat_level') == 'critical' else 3 if det['type'] in ['weapon', 'fight'] else 2 cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) # Create detailed label if det['type'] == 'weapon': label = f"{det['class']} ({det['confidence']:.2f})" if 'threat_level' in det: label += f" [{det['threat_level']}]" elif det['type'] == 'fight': label = f"FIGHT: {det['class']} ({det['confidence']:.2f})" if 'threat_level' in det: label += f" [{det['threat_level']}]" if 'aggression_level' in det: label += f" {det['aggression_level']}" else: label = f"{det['type']}: {det['class']} ({det['confidence']:.2f})" # Draw label background label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0] cv2.rectangle(image, (x1, y1 - 25), (x1 + label_size[0] + 5, y1), color, -1) # Draw label text cv2.putText(image, label, (x1 + 2, y1 - 7), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) # Add additional context for fights if det['type'] == 'fight': context_text = [] if 'people_involved' in det and det['people_involved'] > 0: context_text.append(f"People: {det['people_involved']}") if 'context_flags' in det and det['context_flags']: context_text.append(f"Flags: {', '.join(det['context_flags'])}") if context_text: context_label = " | ".join(context_text) cv2.putText(image, context_label, (x1, y2 + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) # Add detection method indicator (small text) if 'detection_method' in det: method = det['detection_method'].split('_')[-1] cv2.putText(image, method, (x1, y2 + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) return image except Exception as e: print(f"❌ Error drawing detections: {e}") return image def process_video(self, video_path, output_path=None, frame_skip=2): """Process video file with enhanced detection including fights - optimized frame processing""" try: cap = cv2.VideoCapture(video_path) frame_count = 0 total_detections = [] fight_timeline = [] # Track fights over time recent_detections = [] # Track recent detections for adaptive processing if output_path: fourcc = cv2.VideoWriter_fourcc(*'mp4v') fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) while True: ret, frame = cap.read() if not ret: break frame_count += 1 # Adaptive frame processing based on recent detections should_process = False # Always process if recent threats detected (within last 10 frames) if any(det['frame'] > frame_count - 10 for det in recent_detections[-5:]): should_process = True # Or process based on reduced skip rate elif frame_count % max(1, frame_skip) == 0: should_process = True if not should_process: if output_path: out.write(frame) continue # Process frame result = self.process_image(frame) if result and result['detections']: # Add frame number to each detection for tracking for detection in result['detections']: detection['frame'] = frame_count total_detections.extend(result['detections']) recent_detections.append({'frame': frame_count, 'count': len(result['detections'])}) # Track fight timeline fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] if fight_detections: timestamp = frame_count / cap.get(cv2.CAP_PROP_FPS) fight_timeline.append({ 'timestamp': timestamp, 'frame': frame_count, 'fights': len(fight_detections), 'max_aggression': max(f.get('aggression_level', 'low') for f in fight_detections) }) # Reduce frame_skip temporarily when fight detected frame_skip = max(1, frame_skip // 2) print(f"⚠️ Frame {frame_count}: {len(result['detections'])} threats detected") breakdown = result.get('detection_breakdown', {}) if breakdown.get('fights', 0) > 0: print(f" 👊 Fights: {breakdown['fights']}") if output_path and 'annotated_image' in result: out.write(result['annotated_image']) elif output_path: out.write(frame) else: # No detections - can increase frame_skip for efficiency if len(recent_detections) > 5 and all(det['count'] == 0 for det in recent_detections[-5:]): frame_skip = min(5, frame_skip + 1) if output_path: out.write(frame) cap.release() if output_path: out.release() # Analysis of fight patterns fight_analysis = {} if fight_timeline: fight_analysis = { 'total_fight_incidents': len(fight_timeline), 'first_fight_time': fight_timeline[0]['timestamp'], 'last_fight_time': fight_timeline[-1]['timestamp'], 'peak_aggression_time': max(fight_timeline, key=lambda x: x['max_aggression'])['timestamp'], 'fight_duration_coverage': fight_timeline[-1]['timestamp'] - fight_timeline[0]['timestamp'] if len( fight_timeline) > 1 else 0 } return { 'total_frames_processed': frame_count // frame_skip, 'total_detections': len(total_detections), 'detections': total_detections, 'fight_timeline': fight_timeline, 'fight_analysis': fight_analysis, 'detection_breakdown': { 'weapons': len([d for d in total_detections if d['type'] == 'weapon']), 'fights': len([d for d in total_detections if d['type'] == 'fight']), 'nsfw': len([d for d in total_detections if d['type'] == 'nsfw']) } } except Exception as e: print(f"❌ Error processing video: {e}") return None def save_report(self, filename="detection_report.json"): """Save detection history to file""" try: with open(filename, 'w') as f: json.dump(self.detection_history, f, indent=2, default=str) print(f"📊 Report saved to {filename}") except Exception as e: print(f"❌ Error saving report: {e}") def get_memory_usage(self): """Get current GPU memory usage""" if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated() / 1024 ** 3 cached = torch.cuda.memory_reserved() / 1024 ** 3 return f"GPU Memory: {allocated:.2f}GB allocated, {cached:.2f}GB cached" return "CPU mode" def main(): """Enhanced example usage with knife and fight detection improvements""" # Initialize the system moderator = ContentModerator() # Show enhanced system information print("\n" + "=" * 60) print("🎯 ENHANCED DUAL MODEL SYSTEM WITH FIGHT DETECTION") print("=" * 60) status = moderator.get_model_status() if status['custom_weapon_fight_model']: print("✅ Custom YOLO11 Model (dao + súng + fight): LOADED") if 'custom_classes' in status: print(f"📊 Custom classes: {status['custom_classes']}") else: print("❌ Custom weapon+fight model: NOT FOUND") if status['general_model']: print("✅ General YOLO11n Model (person detection): LOADED") else: print("❌ General model: FAILED") if status['nsfw_classifier']: print("✅ NSFW Classifier: LOADED") else: print("❌ NSFW Classifier: FAILED") print(f"🖥️ Device: {status['device']}") print(f"🗄️ Cache system: ENABLED") print(f"🔪 Knife enhancement: {'ENABLED' if status['knife_enhancement'] else 'DISABLED'}") print(f"📈 Knife confidence boost: {'ENABLED' if status['knife_boost'] else 'DISABLED'}") print(f"👊 Fight detection: {'ENABLED' if status['fight_detection'] else 'DISABLED'}") print(f"🧠 Fight analysis: {'ENABLED' if status['fight_analysis'] else 'DISABLED'}") # Enhanced features info print("\n" + "=" * 60) print("✨ ENHANCED DETECTION FEATURES") print("=" * 60) print("🔧 Image Enhancement:") print(" - Contrast & brightness optimization") print(" - Edge sharpening for metallic objects") print(" - CLAHE for local contrast") print("📊 Confidence Boosting:") print(" - Geometric analysis (knives)") print(" - Motion blur analysis (fights)") print(" - Edge strength analysis") print("🎯 Multi-pass Detection:") print(" - Low threshold pass for knives (0.45)") print(" - Normal threshold for guns (0.45)") print(" - Low threshold for fights (0.40)") print("👊 Fight Analysis:") print(" - Multi-person fight detection") print(" - Aggression level assessment") print(" - Context-aware threat escalation") # Example 1: Process single image print("\n" + "=" * 50) print("🖼️ SINGLE IMAGE PROCESSING") print("=" * 50) test_image = "test_image.jpg" if os.path.exists(test_image): result = moderator.process_image(test_image) if result: print(f"\n📊 DETECTION RESULTS:") print(f"Risk Level: {result['risk_level']}") print(f"Total Threats: {result['total_threats']}") print(f"Processing Method: {result.get('processing_method', 'standard')}") breakdown = result.get('detection_breakdown', {}) if breakdown: print(f"\n📈 BREAKDOWN:") print(f" Weapons: {breakdown.get('weapons', 0)}") print(f" Fights: {breakdown.get('fights', 0)}") print(f" NSFW: {breakdown.get('nsfw', 0)}") # Show weapon-specific results weapon_detections = [d for d in result['detections'] if d['type'] == 'weapon'] if weapon_detections: print(f"\n🔫 WEAPON DETECTIONS: {len(weapon_detections)}") for i, detection in enumerate(weapon_detections): method = detection.get('detection_method', 'unknown') print(f" Weapon {i + 1} ({method}):") print(f" Class: {detection['class']}") print(f" Type: {detection['weapon_type']}") print(f" Confidence: {detection['confidence']:.3f}") print(f" Threat Level: {detection['threat_level']}") # Show fight-specific results fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] if fight_detections: print(f"\n👊 FIGHT DETECTIONS: {len(fight_detections)}") for i, detection in enumerate(fight_detections): method = detection.get('detection_method', 'unknown') print(f" Fight {i + 1} ({method}):") print(f" Class: {detection['class']}") print(f" Type: {detection.get('fight_type', 'unknown')}") print(f" Confidence: {detection['confidence']:.3f}") print(f" Threat Level: {detection['threat_level']}") print(f" Aggression: {detection.get('aggression_level', 'unknown')}") if 'people_involved' in detection: print(f" People Involved: {detection['people_involved']}") if 'context_flags' in detection and detection['context_flags']: print(f" Context: {', '.join(detection['context_flags'])}") # Show NSFW results nsfw_detections = [d for d in result['detections'] if d['type'] == 'nsfw'] if nsfw_detections: print(f"\n🔞 NSFW DETECTIONS: {len(nsfw_detections)}") for i, detection in enumerate(nsfw_detections): method = detection.get('method', 'unknown') print(f" NSFW {i + 1} ({method}):") print(f" Class: {detection['class']}") print(f" Confidence: {detection['confidence']:.3f}") if 'skin_ratio' in detection: print(f" Skin Ratio: {detection['skin_ratio']:.2f}") else: print(f"⚠️ Test image not found: {test_image}") print("Creating a test pattern to demonstrate detection...") # Create a synthetic test image test_img = np.ones((640, 640, 3), dtype=np.uint8) * 128 cv2.putText(test_img, "Test Pattern", (200, 320), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3) result = moderator.process_image(test_img) print("✅ Test pattern processed successfully") # Example 2: Enhanced webcam processing with fight detection print("\n" + "=" * 60) print("📹 ENHANCED WEBCAM PROCESSING WITH FIGHT DETECTION") print("=" * 60) print("Starting enhanced detection on webcam...") print("🎮 Controls:") print(" - Press 'q' to quit") print(" - Press 's' to save frame") print(" - Press 'i' to show model info") print(" - Press 'e' to toggle enhancement") print(" - Press 'b' to toggle knife confidence boost") print(" - Press 'f' to toggle fight analysis") print(" - Press 'h' for help") try: cap = cv2.VideoCapture(0) if not cap.isOpened(): print("❌ Cannot open webcam. Check if camera is connected.") else: print("✅ Enhanced webcam processing started") frame_count = 0 detection_stats = { 'weapons': 0, 'knives': 0, 'guns': 0, 'fights': 0, 'nsfw': 0, 'total_frames': 0, 'fight_incidents': 0 } # Adaptive processing variables process_interval = 2 # Start with every 2nd frame last_detection_frame = 0 consecutive_safe_frames = 0 while True: ret, frame = cap.read() if not ret: print("❌ Cannot read from webcam") break frame_count += 1 detection_stats['total_frames'] = frame_count frame = cv2.flip(frame, 1) # Add status overlay y_offset = frame.shape[0] - 120 cv2.putText(frame, f"Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}", (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.putText(frame, f"Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}", (10, y_offset + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.putText(frame, f"Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}", (10, y_offset + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) model_info = "Models: Custom+General" if moderator.weapon_model_custom else "General Only" cv2.putText(frame, model_info, (10, y_offset + 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # Adaptive frame processing - process more frequently when threats detected should_process = False # Always process if recent threats (within last 30 frames) if frame_count - last_detection_frame <= 30: should_process = (frame_count % 1 == 0) # Process every frame cv2.putText(frame, "HIGH ALERT MODE", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) # Normal processing with reduced interval elif frame_count % process_interval == 0: should_process = True if should_process: result = moderator.process_image(frame) if result and result['action_required']: last_detection_frame = frame_count # Update last detection frame consecutive_safe_frames = 0 process_interval = 1 # Process every frame when threats detected # Count detections by type for detection in result['detections']: if detection['type'] == 'weapon': detection_stats['weapons'] += 1 if detection['weapon_type'] == 'blade': detection_stats['knives'] += 1 elif detection['weapon_type'] == 'firearm': detection_stats['guns'] += 1 elif detection['type'] == 'fight': detection_stats['fights'] += 1 if detection.get('aggression_level') in ['high', 'extreme']: detection_stats['fight_incidents'] += 1 elif detection['type'] == 'nsfw': detection_stats['nsfw'] += 1 print( f"⚠️ Frame {frame_count}: {result['risk_level']} risk - {result['total_threats']} threats!") # Show specific detections with fight info for detection in result['detections']: if detection['type'] == 'weapon': icon = "🔪" if detection['weapon_type'] == 'blade' else "🔫" method = detection.get('detection_method', 'unknown').split('_')[-1] print(f" {icon} {detection['class']} ({detection['confidence']:.3f}) [{method}]") elif detection['type'] == 'fight': fight_type = detection.get('fight_type', 'general') aggression = detection.get('aggression_level', 'unknown') people = detection.get('people_involved', 0) method = detection.get('detection_method', 'unknown').split('_')[-1] print(f" 👊 FIGHT: {fight_type} ({detection['confidence']:.3f}) [{method}]") print(f" Aggression: {aggression}, People: {people}") # Use annotated frame if 'annotated_image' in result: cv2.imshow('Enhanced Detection System (Weapons + Fights)', result['annotated_image']) else: # Add threat counter breakdown = result.get('detection_breakdown', {}) threat_text = f"THREATS: W:{breakdown.get('weapons', 0)} F:{breakdown.get('fights', 0)} N:{breakdown.get('nsfw', 0)}" cv2.putText(frame, threat_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) else: consecutive_safe_frames += 1 # Gradually increase processing interval when safe (up to max 3) if consecutive_safe_frames > 30: process_interval = min(3, process_interval + 1) consecutive_safe_frames = 0 cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) else: cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) # Handle key presses key = cv2.waitKey(1) & 0xFF if key == ord('q'): print("🛑 Webcam stopped by user") break elif key == ord('s'): filename = f"enhanced_detection_{frame_count}.jpg" cv2.imwrite(filename, frame) print(f"💾 Frame saved as {filename}") elif key == ord('i'): print(f"\n📊 Model Status:") current_status = moderator.get_model_status() for k, v in current_status.items(): print(f" {k}: {v}") elif key == ord('e'): # Toggle enhancement moderator.config['weapon_detection']['use_enhancement'] = \ not moderator.config['weapon_detection']['use_enhancement'] print( f"🔧 Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}") elif key == ord('b'): # Toggle knife boost moderator.config['weapon_detection']['boost_knife_detection'] = \ not moderator.config['weapon_detection']['boost_knife_detection'] print( f"📈 Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}") elif key == ord('f'): # Toggle fight analysis moderator.config['weapon_detection']['fight_analysis'] = \ not moderator.config['weapon_detection']['fight_analysis'] print( f"👊 Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}") elif key == ord('h'): print("\n🎮 Controls:") print(" 'q': quit") print(" 's': save frame") print(" 'i': model info") print(" 'e': toggle enhancement") print(" 'b': toggle knife confidence boost") print(" 'f': toggle fight analysis") print(" 'h': help") # Show comprehensive session statistics print(f"\n📈 Session Statistics:") print(f" Total frames: {detection_stats['total_frames']}") print(f" Total weapon detections: {detection_stats['weapons']}") print(f" - Knives/Dao: {detection_stats['knives']}") print(f" - Guns: {detection_stats['guns']}") print(f" Total fight detections: {detection_stats['fights']}") print(f" - High-aggression incidents: {detection_stats['fight_incidents']}") print(f" NSFW detections: {detection_stats['nsfw']}") if detection_stats['total_frames'] > 0: total_detections = detection_stats['weapons'] + detection_stats['fights'] + detection_stats['nsfw'] detection_rate = (total_detections / detection_stats['total_frames'] * 100) print(f" Overall detection rate: {detection_rate:.1f}%") if detection_stats['weapons'] > 0: knife_ratio = detection_stats['knives'] / detection_stats['weapons'] * 100 print(f" Knife detection ratio: {knife_ratio:.1f}% of weapons") if detection_stats['fights'] > 0: incident_ratio = detection_stats['fight_incidents'] / detection_stats['fights'] * 100 print(f" High-aggression fight ratio: {incident_ratio:.1f}% of fights") cap.release() cv2.destroyAllWindows() print("✅ Enhanced webcam session completed") except Exception as e: print(f"❌ Webcam error: {e}") # Show final system status print(f"\n💾 {moderator.get_memory_usage()}") print(f"🗄️ Final cache size: {len(moderator.detection_cache)} entries") # Save enhanced report moderator.save_report("enhanced_detection_with_fights_report.json") print("\n✅ Enhanced Content Moderation System with Fight Detection completed!") print("💡 New fight detection capabilities:") print(" - Behavioral fight pattern recognition") print(" - Multi-person fight analysis") print(" - Aggression level assessment") print(" - Context-aware threat escalation") print(" - Fight timeline tracking for videos") print("💡 Enhanced weapon detection:") print(" - Image enhancement preprocessing") print(" - Dynamic confidence thresholds") print(" - Geometric feature analysis") print(" - Multi-pass detection strategy") if __name__ == "__main__": main()