kfvideodt / main.py
Haiss123's picture
Upload 5 files
9adeec5 verified
raw
history blame
82.7 kB
import cv2
import numpy as np
import torch
import os
import json
import warnings
warnings.filterwarnings('ignore')
# Import required libraries
try:
from ultralytics import YOLO
from transformers import pipeline
from PIL import Image, ImageDraw, ImageFont
import requests
from datetime import datetime
# MediaPipe import with fallback
try:
import mediapipe as mp
MEDIAPIPE_AVAILABLE = True
print("โœ… MediaPipe imported successfully")
except ImportError:
MEDIAPIPE_AVAILABLE = False
print("โš ๏ธ MediaPipe not available - pose detection disabled")
except Exception as e:
MEDIAPIPE_AVAILABLE = False
print(f"โš ๏ธ MediaPipe import error: {e} - pose detection disabled")
except ImportError as e:
print(f"Missing dependency: {e}")
print("Please install: pip install ultralytics transformers pillow requests")
print("For MediaPipe: pip install mediapipe==0.10.18")
class ContentModerator:
def __init__(self, config=None):
default_cfg = self.get_default_config()
self.config = self.deep_merge_dicts(default_cfg, config) if config else default_cfg
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
# CPU optimizations
if self.device == 'cpu':
print("๐Ÿ’ป CPU mode detected - applying optimizations...")
torch.set_num_threads(4)
self.config['performance']['half_precision'] = False
self.config['nsfw_detection']['pose_analysis'] = False
# Initialize models
self.weapon_model = None # Primary weapon model
self.weapon_model_custom = None # Custom model for dao + sรบng + fight
self.weapon_model_general = None # General model for person + backup weapons
self.nsfw_classifier = None
self.pose_detector = None
# Performance optimization
self.detection_cache = {}
self.cache_ttl = 2 # Cache for 2 seconds
# Results storage
self.detection_history = []
print(f"๐Ÿš€ Content Moderator initialized on {self.device}")
if self.device == 'cpu':
print("โšก CPU optimizations enabled")
self.setup_models()
def deep_merge_dicts(self, a: dict, b: dict) -> dict:
"""Merge dict b into a (non-destructive). Returns merged copy.
- Keeps keys from a when missing in b.
- Recursively merges nested dicts.
"""
if not isinstance(a, dict):
return b if isinstance(b, dict) else a
result = dict(a) # shallow copy
if not b:
return result
for k, v in b.items():
if k in result and isinstance(result[k], dict) and isinstance(v, dict):
result[k] = self.deep_merge_dicts(result[k], v)
else:
result[k] = v
return result
def get_default_config(self):
    """Build and return a fresh default configuration dict.

    The performance section depends on whether CUDA is available: with
    CUDA the image size is 416 and half precision is on, otherwise the
    image size is 320 and the CPU-optimization flag is set.
    """
    cuda_available = torch.cuda.is_available()

    # Weapon detector settings, including the flags that route the custom
    # model's fight classes through the same pipeline.
    weapon_section = {
        'enabled': True,
        'confidence_threshold': 0.45,  # minimum score for firearms
        'knife_confidence': 0.45,  # minimum score for blades
        'fight_confidence': 0.40,  # minimum score for fight classes
        'model_size': 'yolo11n',
        'classes': ['knife', 'gun', 'rifle', 'pistol', 'weapon', 'fight'],
        'use_enhancement': True,  # also run on an enhanced copy of the image
        'multi_pass': True,  # one inference pass per threshold
        'boost_knife_detection': True,  # apply heuristic knife score boost
        'fight_detection': True,  # enable fight context analysis
        'fight_analysis': True,
    }

    # Stand-alone fight analysis options.
    fight_section = {
        'enabled': True,
        'confidence_threshold': 0.40,
        'pose_analysis': True,
        'motion_analysis': False,  # intended for video input
        'aggression_keywords': ['fight', 'violence', 'aggression', 'combat'],
        'threat_escalation': True,
        'multi_person_analysis': True,  # count people around a fight
    }

    nsfw_section = {
        'enabled': True,
        'confidence_threshold': 0.7,
        'skin_detection': True,
        'pose_analysis': False,
        'region_analysis': True,
    }

    performance_section = {
        'image_size': 416 if cuda_available else 320,
        'batch_size': 1,
        'half_precision': cuda_available,
        'use_flash_attention': False,
        'cpu_optimization': not cuda_available,
    }

    output_section = {
        'save_detections': True,
        'draw_boxes': True,
        'log_results': True,
    }

    return {
        'weapon_detection': weapon_section,
        'fight_detection': fight_section,
        'nsfw_detection': nsfw_section,
        'performance': performance_section,
        'output': output_section,
    }
def setup_models(self):
"""Initialize all detection models"""
try:
# Clear GPU cache
if torch.cuda.is_available():
torch.cuda.empty_cache()
# 1. Setup Weapon Detection (now includes fight detection)
if self.config['weapon_detection']['enabled']:
self.setup_weapon_detector()
# 2. Setup NSFW Detection
if self.config['nsfw_detection']['enabled']:
self.setup_nsfw_detector()
print("โœ… All models loaded successfully!")
except Exception as e:
print(f"โŒ Error setting up models: {e}")
def setup_weapon_detector(self):
    """Load the custom weapon/fight model and the general YOLO11n model.

    The custom weights are looked up at ``models/best.pt`` relative to the
    directory of this file; when the file is missing the custom model is
    left as ``None``. The general model is always loaded from
    ``'yolo11n.pt'``. ``self.weapon_model`` ends up pointing at the custom
    model when it exists, otherwise at the general one.

    On CUDA with ``performance.half_precision`` enabled both models are
    converted to half precision; a failure there is reported and ignored.
    Any other failure is printed and resets all three model attributes to
    ``None``.
    """
    try:
        print("๐Ÿ”ซ Loading weapon and fight detection models...")

        # Model 1: custom YOLO11 weights (knife, gun and fight classes).
        custom_model_path = "models/best.pt"
        project_root = os.path.dirname(os.path.abspath(__file__))
        full_model_path = os.path.join(project_root, custom_model_path)

        if os.path.exists(full_model_path):
            print(f"โœ… Loading custom weapon+fight model: {full_model_path}")
            self.weapon_model_custom = YOLO(full_model_path)
            print("๐ŸŽฏ Custom weapon+fight model (dao + sรบng + fight) loaded!")

            # Report the classes and whether a fight class is among them.
            if hasattr(self.weapon_model_custom, 'names'):
                classes = list(self.weapon_model_custom.names.values())
                print(f"๐Ÿ“Š Custom classes: {classes}")
                if any('fight' in str(cls).lower() for cls in classes):
                    print("๐Ÿ‘Š Fight detection enabled in custom model")
                else:
                    print("โš ๏ธ Fight class not found in custom model")
        else:
            print("โš ๏ธ Custom weapon+fight model not found")
            self.weapon_model_custom = None

        # Model 2: general YOLO11n, used for person detection and as fallback.
        print("๐Ÿ‘ค Loading general model for person detection...")
        self.weapon_model_general = YOLO('yolo11n.pt')
        print("โœ… General YOLO11n loaded for person detection")

        # Primary model: custom weights when available, else the general model.
        self.weapon_model = self.weapon_model_custom if self.weapon_model_custom else self.weapon_model_general

        if self.device == 'cuda' and self.config['performance']['half_precision']:
            try:
                if self.weapon_model_custom:
                    self.weapon_model_custom.model.half()
                self.weapon_model_general.model.half()
                print("โœ… Half precision enabled for both models")
            except Exception:
                # Best effort only. ``Exception`` (not a bare except) so that
                # KeyboardInterrupt / SystemExit still propagate.
                print("โš ๏ธ Half precision not supported")

        print("๐Ÿ”ฅ Dual model system ready with fight detection!")
    except Exception as e:
        print(f"โŒ Error loading weapon+fight models: {e}")
        self.weapon_model = None
        self.weapon_model_custom = None
        self.weapon_model_general = None
def detect_weapons(self, image):
    """Run weapon and fight detection on one image.

    The custom model (when loaded) is run on the original image and, if
    ``weapon_detection.use_enhancement`` is set, on an enhanced copy whose
    scores are multiplied by 1.15. With ``weapon_detection.multi_pass`` the
    model is invoked once per threshold (knife, gun, fight); otherwise once
    with the lower of the knife and fight thresholds. The general model is
    consulted only when those passes produced nothing. Overlapping
    detections are then de-duplicated and fight detections may be enriched
    with people counts.

    Args:
        image: Image passed to the models and to the OpenCV-based helpers
            (presumably a BGR ndarray as returned by ``cv2.imread`` --
            confirm against callers).

    Returns:
        A list of detection dicts with the keys ``type`` ('weapon' or
        'fight'), ``class``, ``confidence`` (capped at 0.99 for custom-model
        hits), ``bbox`` ([x1, y1, x2, y2] ints), ``threat_level`` and
        ``detection_method``, plus ``weapon_type`` for weapons or
        ``fight_type`` / ``aggression_level`` for fights. An empty list is
        returned when an unexpected error occurs.
    """
    detections = []
    try:
        imgsz = self.config['performance']['image_size']
        use_half = self.config['performance']['half_precision'] and self.device == 'cuda'
        # Each entry is (image, confidence multiplier, label used in detection_method).
        images_to_process = [(image, 1.0, "original")]
        if self.config['weapon_detection']['use_enhancement']:
            enhanced_image = self.enhance_knife_detection(image)
            images_to_process.append((enhanced_image, 1.15, "enhanced"))
        # Process each image version
        for img, weight_multiplier, img_type in images_to_process:
            if self.weapon_model_custom:
                # Per-category minimum confidences from the config
                knife_conf = self.config['weapon_detection']['knife_confidence']
                gun_conf = self.config['weapon_detection']['confidence_threshold']
                fight_conf = self.config['weapon_detection']['fight_confidence']
                # One inference pass per threshold, or a single pass at the
                # lower of the knife/fight thresholds when multi_pass is off
                passes = [
                    (knife_conf, "knife_pass"),  # Threshold for knives
                    (gun_conf, "gun_pass"),  # Threshold for guns
                    (fight_conf, "fight_pass")  # Threshold for fights
                ] if self.config['weapon_detection']['multi_pass'] else [
                    (min(knife_conf, fight_conf), "single_pass")]
                for conf_threshold, pass_type in passes:
                    try:
                        results = self.weapon_model_custom(
                            img,
                            imgsz=imgsz,
                            conf=conf_threshold,
                            device=self.device,
                            half=use_half,
                            verbose=False,
                            augment=True  # Enable test-time augmentation
                        )
                        for result in results:
                            boxes = result.boxes
                            if boxes is not None:
                                for box in boxes:
                                    class_id = int(box.cls[0])
                                    # Fall back to a synthetic label when the id has no name
                                    if hasattr(result, 'names') and class_id in result.names:
                                        class_name = result.names[class_id].lower()
                                    else:
                                        class_name = f"detection_{class_id}"
                                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                                    # Raw score scaled by the per-image multiplier
                                    confidence = float(box.conf[0]) * weight_multiplier
                                    # Determine detection type and apply appropriate processing
                                    if self.is_fight_detection(class_name):
                                        # Fight detection processing
                                        confidence = self.boost_fight_confidence(
                                            img, [x1, y1, x2, y2], confidence, class_name
                                        )
                                        detection_type = 'fight'
                                        min_conf = fight_conf
                                        threat_level = self.assess_fight_threat(confidence, img, [x1, y1, x2, y2])
                                    else:
                                        # Weapon detection processing; only blade-like
                                        # labels receive the heuristic boost
                                        if self.config['weapon_detection']['boost_knife_detection']:
                                            if 'dao' in class_name or 'knife' in class_name or 'blade' in class_name:
                                                confidence = self.boost_knife_confidence(
                                                    img, [x1, y1, x2, y2], confidence, class_name
                                                )
                                        detection_type = 'weapon'
                                        weapon_type = self.classify_weapon_type(class_name)
                                        min_conf = knife_conf if weapon_type == 'blade' else gun_conf
                                        threat_level = self.assess_weapon_threat(weapon_type, confidence)
                                    # Final filter on the adjusted confidence
                                    if confidence >= min_conf:
                                        detection_data = {
                                            'type': detection_type,
                                            'class': class_name,
                                            'confidence': min(confidence, 0.99),
                                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                                            'threat_level': threat_level,
                                            'detection_method': f'custom_model_{img_type}_{pass_type}'
                                        }
                                        # Add type-specific fields
                                        if detection_type == 'weapon':
                                            detection_data['weapon_type'] = weapon_type
                                        elif detection_type == 'fight':
                                            detection_data['fight_type'] = self.classify_fight_type(class_name)
                                            detection_data['aggression_level'] = self.assess_aggression_level(
                                                confidence)
                                        detections.append(detection_data)
                                        icon = "๐Ÿ‘Š" if detection_type == 'fight' else "๐ŸŽฏ"
                                        print(
                                            f" {icon} Detected: {class_name} (conf: {confidence:.3f}, method: {img_type}_{pass_type})")
                    except Exception as e:
                        # A failing pass is reported and skipped; other passes still run
                        print(f"โš ๏ธ Detection pass error ({pass_type}): {e}")
        # Fallback: general model, only when nothing has been detected so far
        if self.weapon_model_general and len(detections) == 0:
            detections.extend(self.fallback_detection(image, imgsz, use_half))
        # Remove duplicate detections
        detections = self.remove_duplicate_detections(detections)
        # Additional fight analysis if enabled (safe access)
        try:
            # weapon_detection may contain a simple boolean or a dict for fight_detection
            wd = self.config.get('weapon_detection', {})
            wd_fd = wd.get('fight_detection', None)
            # Determine whether fight analysis is enabled
            if isinstance(wd_fd, dict):
                fight_enabled = wd_fd.get('enabled', False)
                fight_multi_person = wd_fd.get('multi_person_analysis', False)
            else:
                # wd_fd may be boolean (legacy); consult top-level fight_detection dict for details
                fight_enabled = bool(wd_fd)
                fight_multi_person = bool(
                    self.config.get('fight_detection', {}).get('multi_person_analysis', False))
            if fight_enabled and fight_multi_person:
                fight_detections = [d for d in detections if d.get('type') == 'fight']
                if fight_detections:
                    try:
                        enhanced_fights = self.analyze_fight_context(image, fight_detections)
                        # Replace original fight detections with enhanced ones
                        detections = [d for d in detections if d.get('type') != 'fight'] + enhanced_fights
                    except Exception as e:
                        print(f"โš ๏ธ Fight context enhancement error: {e}")
        except Exception as e:
            # Defensive: never allow missing config to break detection pipeline
            import traceback
            traceback.print_exc()
            print(f"โš ๏ธ Skipping fight context analysis due to config error: {e}")
        return detections
    except Exception as e:
        print(f"โŒ Weapon and fight detection error: {e}")
        return []
def is_fight_detection(self, class_name):
    """Return True if ``class_name`` contains a fight keyword (case-insensitive substring match)."""
    label = class_name.lower()
    for keyword in ('fight', 'fighting', 'combat', 'violence', 'aggression', 'brawl', 'scuffle'):
        if keyword in label:
            return True
    return False
def classify_fight_type(self, class_name):
    """Map a class label to a fight category.

    Categories are tried in order and matched by case-insensitive substring;
    the first one with a matching keyword wins. Labels that match nothing
    are reported as ``'general_fight'``.
    """
    categories = (
        (('punch', 'boxing', 'fist'), 'physical_combat'),
        (('kick', 'martial', 'karate'), 'martial_arts'),
        (('wrestle', 'grapple'), 'wrestling'),
        (('group', 'mob', 'crowd'), 'group_violence'),
    )
    label = class_name.lower()
    for keywords, fight_type in categories:
        for keyword in keywords:
            if keyword in label:
                return fight_type
    return 'general_fight'
def boost_fight_confidence(self, image, bbox, initial_confidence, class_name):
    """Raise a fight detection's confidence using simple image cues.

    The bounding box is clamped to the image and five cues are evaluated on
    that region: low Laplacian variance (+0.10), high Canny edge density
    (+0.08), high saturation variance (+0.05), mean Harris corner response
    (+0.07) and a moderately elongated box (+0.05).

    Args:
        image: Image array indexed as ``image[y, x]`` and accepted by
            ``cv2.cvtColor(..., COLOR_BGR2GRAY)`` (presumably BGR).
        bbox: ``[x1, y1, x2, y2]``; values are truncated to int.
        initial_confidence: Score before boosting.
        class_name: Detected label; not used by the heuristics.

    Returns:
        ``initial_confidence + boost`` limited to at most 0.95. The input
        score is returned unchanged when the clamped box is empty or when
        any error occurs (the error is printed).
    """
    try:
        x1, y1, x2, y2 = [int(coord) for coord in bbox]
        # Clamp the box to the image. The lower bound on x2/y2 matters: a
        # negative stop index would wrap around in the NumPy slice and pick
        # pixels from an unrelated region instead of giving an empty ROI.
        x1 = max(0, x1)
        y1 = max(0, y1)
        x2 = max(0, min(image.shape[1], x2))
        y2 = max(0, min(image.shape[0], y2))
        roi = image[y1:y2, x1:x2]
        if roi.size == 0:
            return initial_confidence
        boost = 0
        # 1. Low Laplacian variance is treated as motion blur
        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var()
        if blur_variance < 100:
            boost += 0.10
        # 2. Share of edge pixels in the region
        edges = cv2.Canny(gray, 50, 150)
        edge_density = np.count_nonzero(edges) / edges.size
        if edge_density > 0.15:
            boost += 0.08
        # 3. Variance of the saturation channel
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        color_variance = np.var(hsv[:, :, 1])
        if color_variance > 1000:
            boost += 0.05
        # 4. Mean Harris corner response as a texture measure
        gray_f = np.float32(gray)
        texture_response = cv2.cornerHarris(gray_f, 2, 3, 0.04)
        texture_strength = np.mean(texture_response)
        if texture_strength > 0.01:
            boost += 0.07
        # 5. Moderately elongated boxes get a small bonus
        height = y2 - y1
        width = x2 - x1
        if height > 0 and width > 0:
            aspect_ratio = max(width, height) / min(width, height)
            if 1.2 < aspect_ratio < 3.0:
                boost += 0.05
        final_confidence = min(initial_confidence + boost, 0.95)
        if boost > 0:
            print(f" ๐Ÿ‘Š Fight boost applied: +{boost:.2f} (blur:{blur_variance:.0f}, edge:{edge_density:.2f})")
        return final_confidence
    except Exception as e:
        print(f"โš ๏ธ Fight confidence boost error: {e}")
        return initial_confidence
def assess_fight_threat(self, confidence, image, bbox):
    """Derive a threat level for a fight from its confidence and size.

    The confidence picks the starting level ('critical' from 0.85, 'high'
    from 0.70, 'medium' from 0.50, otherwise 'low'). When the box covers
    more than half of the image, 'medium' and 'high' are each raised by one
    step. Errors in the size check are printed and leave the level as is.

    Args:
        confidence: Detection score.
        image: Object exposing ``shape`` as (height, width, ...).
        bbox: ``[x1, y1, x2, y2]``.

    Returns:
        One of 'low', 'medium', 'high', 'critical'.
    """
    if confidence >= 0.85:
        level = 'critical'
    elif confidence >= 0.70:
        level = 'high'
    elif confidence >= 0.50:
        level = 'medium'
    else:
        level = 'low'

    try:
        left, top, right, bottom = bbox
        coverage = ((right - left) * (bottom - top)) / (image.shape[0] * image.shape[1])
        if coverage > 0.5:
            # Large fights: bump medium/high one step, leave low/critical alone.
            level = {'medium': 'high', 'high': 'critical'}.get(level, level)
    except Exception as e:
        print(f"โš ๏ธ Fight threat assessment error: {e}")
    return level
def assess_aggression_level(self, confidence):
    """Translate a confidence score into an aggression label.

    Returns 'extreme' from 0.80, 'high' from 0.65, 'moderate' from 0.45 and
    'low' below that.
    """
    bands = ((0.80, 'extreme'), (0.65, 'high'), (0.45, 'moderate'))
    for floor, label in bands:
        if confidence >= floor:
            return label
    return 'low'
def analyze_fight_context(self, image, fight_detections):
"""Enhanced analysis of fight context with multi-person detection"""
enhanced_fights = []
try:
# Detect all persons in the image
persons = self.detect_persons(image)
for fight in fight_detections:
enhanced_fight = fight.copy()
# Count people involved in or near the fight
fight_bbox = fight['bbox']
people_in_fight = 0
people_nearby = 0
for person in persons:
person_bbox = person['bbox']
# Calculate overlap with fight area
overlap = self.calculate_bbox_overlap(fight_bbox, person_bbox)
if overlap > 0.3: # Person is directly involved
people_in_fight += 1
elif overlap > 0.1: # Person is nearby
people_nearby += 1
# Update fight information based on context
enhanced_fight['people_involved'] = people_in_fight
enhanced_fight['people_nearby'] = people_nearby
enhanced_fight['total_people'] = people_in_fight + people_nearby
# Escalate threat based on number of people
if people_in_fight >= 3:
if enhanced_fight['threat_level'] == 'medium':
enhanced_fight['threat_level'] = 'high'
elif enhanced_fight['threat_level'] == 'high':
enhanced_fight['threat_level'] = 'critical'
enhanced_fight['fight_type'] = 'group_violence'
# Add context flags
enhanced_fight['context_flags'] = []
if people_in_fight >= 3:
enhanced_fight['context_flags'].append('multi_person_fight')
if people_nearby >= 2:
enhanced_fight['context_flags'].append('crowd_present')
enhanced_fights.append(enhanced_fight)
print(f" ๐Ÿ‘ฅ Fight context: {people_in_fight} involved, {people_nearby} nearby")
except Exception as e:
print(f"โš ๏ธ Fight context analysis error: {e}")
return fight_detections
return enhanced_fights
def calculate_bbox_overlap(self, bbox1, bbox2):
    """Return the share of ``bbox1``'s area that is covered by ``bbox2``.

    Both boxes are ``[x_min, y_min, x_max, y_max]``. The result is 0.0 for
    disjoint boxes and 0 when ``bbox1`` has no positive area.
    """
    ax1, ay1, ax2, ay2 = bbox1
    bx1, by1, bx2, by2 = bbox2

    # Corners of the intersection rectangle.
    left, top = max(ax1, bx1), max(ay1, by1)
    right, bottom = min(ax2, bx2), min(ay2, by2)
    if right < left or bottom < top:
        return 0.0

    reference_area = (ax2 - ax1) * (ay2 - ay1)
    if reference_area > 0:
        return ((right - left) * (bottom - top)) / reference_area
    return 0
def fallback_detection(self, image, imgsz, use_half):
"""Fallback detection using general model"""
detections = []
try:
general_results = self.weapon_model_general(
image,
imgsz=imgsz,
conf=0.4,
device=self.device,
half=use_half,
verbose=False
)
for result in general_results:
boxes = result.boxes
if boxes is not None:
for box in boxes:
class_id = int(box.cls[0])
class_name = result.names[class_id].lower()
# Filter for weapon-like objects
weapon_keywords = ['knife', 'scissors', 'fork']
if any(keyword in class_name for keyword in weapon_keywords):
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
confidence = float(box.conf[0])
detections.append({
'type': 'weapon',
'class': class_name,
'weapon_type': 'blade',
'confidence': confidence,
'bbox': [int(x1), int(y1), int(x2), int(y2)],
'threat_level': self.assess_weapon_threat('blade', confidence),
'detection_method': 'general_model_fallback'
})
except Exception as e:
print(f"โš ๏ธ General detection error: {e}")
return detections
# --- Image enhancement and knife confidence helpers ---
def enhance_knife_detection(self, image):
    """Return a contrast-boosted, sharpened copy of ``image``.

    Steps: linear contrast/brightness scaling (alpha 1.4, beta 25), a 3x3
    sharpening filter, then CLAHE on the L channel in LAB space. If any step
    fails the error is printed and the input image is returned unchanged.
    """
    try:
        # 1. Global contrast and brightness lift
        brightened = cv2.convertScaleAbs(image, alpha=1.4, beta=25)

        # 2. Sharpen to emphasize edges
        sharpen_kernel = np.array([[-1, -1, -1],
                                   [-1, 9, -1],
                                   [-1, -1, -1]])
        sharpened = cv2.filter2D(brightened, -1, sharpen_kernel)

        # 3. Local contrast equalization on the lightness channel only
        channels = list(cv2.split(cv2.cvtColor(sharpened, cv2.COLOR_BGR2LAB)))
        equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        channels[0] = equalizer.apply(channels[0])
        return cv2.cvtColor(cv2.merge(channels), cv2.COLOR_LAB2BGR)
    except Exception as e:
        print(f"โš ๏ธ Enhancement failed: {e}")
        return image
def boost_knife_confidence(self, image, bbox, initial_confidence, class_name):
    """Raise a knife detection's confidence using shape and appearance cues.

    The bounding box is clamped to the image and four cues are evaluated on
    that region: elongation of the box, brightness and contrast of the
    pixels, Canny edge density, and mean gradient along the longer axis.
    The summed boost is multiplied by 1.2 when the label contains 'dao' or
    'knife'.

    Args:
        image: Image array indexed as ``image[y, x]`` and accepted by
            ``cv2.cvtColor(..., COLOR_BGR2GRAY)`` (presumably BGR).
        bbox: ``[x1, y1, x2, y2]``; values are truncated to int.
        initial_confidence: Score before boosting.
        class_name: Detected label.

    Returns:
        ``initial_confidence + boost`` limited to at most 0.95. The input
        score is returned unchanged when the clamped box is empty or when
        any error occurs (the error is printed).
    """
    try:
        x1, y1, x2, y2 = [int(coord) for coord in bbox]
        # Clamp the box to the image. The lower bound on x2/y2 matters: a
        # negative stop index would wrap around in the NumPy slice and pick
        # pixels from an unrelated region instead of giving an empty ROI.
        x1 = max(0, x1)
        y1 = max(0, y1)
        x2 = max(0, min(image.shape[1], x2))
        y2 = max(0, min(image.shape[0], y2))
        roi = image[y1:y2, x1:x2]
        if roi.size == 0:
            return initial_confidence
        boost = 0
        # 1. Elongation. A non-empty ROI with non-negative bounds implies
        # width >= 1 and height >= 1, so the ratio is always defined and is
        # always available for the log line below.
        height = y2 - y1
        width = x2 - x1
        aspect_ratio = max(width, height) / min(width, height)
        if aspect_ratio > 2.5:
            boost += 0.15
        elif aspect_ratio > 2.0:
            boost += 0.10
        # 2. Brightness and contrast of the region
        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        mean_brightness = np.mean(gray)
        std_brightness = np.std(gray)
        if mean_brightness > 140:
            boost += 0.10
        if std_brightness > 50:
            boost += 0.05
        # 3. Share of edge pixels in the region
        edges = cv2.Canny(gray, 50, 150)
        edge_ratio = np.count_nonzero(edges) / edges.size
        if edge_ratio > 0.15:
            boost += 0.10
        elif edge_ratio > 0.10:
            boost += 0.05
        # 4. Mean absolute gradient along the longer axis of the box
        if height > width:
            gradient = np.gradient(gray, axis=0)
        else:
            gradient = np.gradient(gray, axis=1)
        gradient_strength = np.mean(np.abs(gradient))
        if gradient_strength > 10:
            boost += 0.05
        # Extra weight for explicit knife/dao labels
        if 'dao' in class_name.lower() or 'knife' in class_name.lower():
            boost *= 1.2
        final_confidence = min(initial_confidence + boost, 0.95)
        if boost > 0:
            print(
                f" ๐Ÿ”ช Knife boost applied: +{boost:.2f} (AR:{aspect_ratio:.1f}, Bright:{mean_brightness:.0f}, Edge:{edge_ratio:.2f})")
        return final_confidence
    except Exception as e:
        print(f"โš ๏ธ Confidence boost error: {e}")
        return initial_confidence
def detect_persons(self, image):
"""Detect persons using general model (needed for NSFW and fight analysis)"""
persons = []
if not self.weapon_model_general:
return persons
try:
imgsz = self.config['performance']['image_size']
use_half = self.config['performance']['half_precision'] and self.device == 'cuda'
results = self.weapon_model_general(
image,
imgsz=imgsz,
conf=0.3,
device=self.device,
half=use_half,
verbose=False
)
for result in results:
boxes = result.boxes
if boxes is not None:
for box in boxes:
class_id = int(box.cls[0])
class_name = result.names[class_id].lower()
if class_name == 'person':
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
confidence = float(box.conf[0])
persons.append({
'class': 'person',
'confidence': confidence,
'bbox': [int(x1), int(y1), int(x2), int(y2)]
})
return persons
except Exception as e:
print(f"โŒ Person detection error: {e}")
return []
def classify_weapon_type(self, class_name):
    """Map a class label to a weapon family.

    Matching is by case-insensitive substring, and families are tried in the
    order blade, firearm, blunt weapon (so e.g. any label containing 'bat'
    is a blunt weapon). Labels containing 'weapon' and ending in
    ``_<number>`` are mapped by that number: 0-1 firearm, 2-3 blade.

    Args:
        class_name: Label reported by the detector.

    Returns:
        'blade', 'firearm', 'blunt_weapon' or 'unknown_weapon'.
    """
    label = class_name.lower()
    families = (
        (('knife', 'dao', 'blade', 'dagger', 'sword', 'machete', 'katana', 'cutter'), 'blade'),
        (('gun', 'pistol', 'rifle', 'firearm', 'revolver', 'shotgun', 'sรบng'), 'firearm'),
        (('axe', 'hammer', 'club', 'bat'), 'blunt_weapon'),
    )
    for keywords, weapon_type in families:
        if any(keyword in label for keyword in keywords):
            return weapon_type

    # Numbered generic classes such as "weapon_2".
    if 'weapon' in label:
        try:
            weapon_id = int(label.split('_')[-1])
        except ValueError:
            # No numeric suffix. Only ValueError is expected from int() on a
            # str, so nothing else is swallowed here.
            return 'unknown_weapon'
        if weapon_id in (0, 1):  # ids assumed to denote firearms
            return 'firearm'
        if weapon_id in (2, 3):  # ids assumed to denote blades
            return 'blade'
    return 'unknown_weapon'
def assess_weapon_threat(self, weapon_type, confidence):
    """Combine weapon family and confidence into a threat level.

    The family sets the base level (firearm 'critical', blade 'high',
    everything else 'medium'). The confidence band then adjusts it:
    from 0.9 the base is raised one step, from 0.7 it is kept, from 0.5
    'critical'/'high' drop one step, and below 0.5 'critical' becomes
    'medium' while everything else becomes 'low'.
    """
    base = {
        'firearm': 'critical',
        'blade': 'high',
        'blunt_weapon': 'medium',
        'unknown_weapon': 'medium',
    }.get(weapon_type, 'medium')

    if confidence >= 0.9:
        return {'medium': 'high', 'high': 'critical'}.get(base, base)
    if confidence >= 0.7:
        return base
    if confidence >= 0.5:
        return {'critical': 'high', 'high': 'medium'}.get(base, base)
    return 'medium' if base == 'critical' else 'low'
def remove_duplicate_detections(self, detections, iou_threshold=0.4):
"""Remove duplicate detections using Non-Maximum Suppression"""
if len(detections) <= 1:
return detections
# Sort by confidence (highest first)
detections = sorted(detections, key=lambda x: x['confidence'], reverse=True)
keep = []
for i, det1 in enumerate(detections):
should_keep = True
for det2 in keep:
# Check if same type and overlapping
if det1['type'] == det2['type']:
iou = self.calculate_iou(det1['bbox'], det2['bbox'])
if iou > iou_threshold:
should_keep = False
break
if should_keep:
keep.append(det1)
return keep
def calculate_iou(self, box1, box2):
    """Return the intersection-over-union of two ``[x_min, y_min, x_max, y_max]`` boxes.

    The result is 0.0 for disjoint boxes and 0 when the union has no
    positive area.
    """
    ax1, ay1, ax2, ay2 = box1
    bx1, by1, bx2, by2 = box2

    # Corners of the intersection rectangle.
    left, top = max(ax1, bx1), max(ay1, by1)
    right, bottom = min(ax2, bx2), min(ay2, by2)
    if right < left or bottom < top:
        return 0.0

    shared = (right - left) * (bottom - top)
    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    combined = area_a + area_b - shared
    if combined > 0:
        return shared / combined
    return 0
# --- NSFW detection methods ---
def detect_nsfw_content(self, image):
"""Enhanced NSFW detection with person detection first"""
detections = []
try:
if len(image.shape) == 3 and image.shape[2] == 3:
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
else:
rgb_image = image
# Stage 1: Detect persons first (optimization)
persons = self.detect_persons(image)
if not persons:
# No persons detected, skip detailed NSFW analysis
return detections
print(f"๐Ÿ‘ค Found {len(persons)} person(s), analyzing for NSFW content...")
# Stage 2: Overall NSFW Classification
if self.nsfw_classifier:
try:
pil_image = Image.fromarray(rgb_image)
nsfw_result = self.nsfw_classifier(pil_image)
if nsfw_result[0]['label'] == 'nsfw':
confidence = nsfw_result[0]['score']
if confidence > self.config['nsfw_detection']['confidence_threshold']:
detections.append({
'type': 'nsfw',
'class': 'inappropriate_content',
'confidence': confidence,
'bbox': [0, 0, image.shape[1], image.shape[0]],
'method': 'classification'
})
except Exception as e:
print(f"โš ๏ธ NSFW classifier error: {e}")
# Stage 3: Person-specific skin analysis
if self.config['nsfw_detection']['skin_detection']:
for person in persons:
person_detections = self.analyze_person_skin(image, person)
detections.extend(person_detections)
# Stage 4: Regional skin analysis (if no person-specific detections)
if self.config['nsfw_detection']['region_analysis'] and len(detections) == 0:
skin_detections = self.detect_skin_regions(image)
detections.extend(skin_detections)
return detections
except Exception as e:
print(f"โŒ NSFW detection error: {e}")
return []
def analyze_person_skin(self, image, person):
    """Flag a person whose bounding box is mostly skin-colored.

    The person's box is cut out of ``image``, converted to HSV and
    thresholded with a fixed skin-color range. When more than 40% of the
    pixels match, one detection with confidence ``min(2 * ratio, 1.0)`` is
    reported.

    Args:
        image: Image array indexed as ``image[y, x]`` (presumably BGR).
        person: Dict with a ``bbox`` of ``[x1, y1, x2, y2]``.

    Returns:
        A list with zero or one detection dict; empty on error (the error is
        printed).
    """
    try:
        x1, y1, x2, y2 = person['bbox']
        crop = image[y1:y2, x1:x2]
        if crop.size == 0:
            return []

        # Fixed HSV range used as the skin-color model.
        hsv_crop = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)
        skin_low = np.array([0, 20, 70], dtype=np.uint8)
        skin_high = np.array([20, 255, 255], dtype=np.uint8)
        skin_mask = cv2.inRange(hsv_crop, skin_low, skin_high)

        pixel_count = crop.shape[0] * crop.shape[1]
        skin_count = cv2.countNonZero(skin_mask)
        skin_ratio = skin_count / pixel_count if pixel_count > 0 else 0

        if not skin_ratio > 0.4:
            return []

        findings = [{
            'type': 'nsfw',
            'class': 'excessive_skin_exposure',
            'confidence': min(skin_ratio * 2, 1.0),
            'bbox': [x1, y1, x2, y2],
            'method': 'person_skin_analysis',
            'skin_ratio': skin_ratio,
        }]
        print(f"๐Ÿšจ Excessive skin exposure detected: {skin_ratio:.2f} ratio")
        return findings
    except Exception as e:
        print(f"โŒ Person skin analysis error: {e}")
        return []
def detect_skin_regions(self, image):
    """Report connected skin-colored regions larger than 30% of the image.

    The image is thresholded in HSV with a fixed skin-color range, cleaned
    with a 3x3 morphological open followed by a close, and split into
    external contours. Each contour whose area exceeds 30% of the image
    area yields one detection whose confidence is the area share (at most
    1.0).

    Returns:
        A list of detection dicts; empty on error (the error is printed).
    """
    try:
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        skin_low = np.array([0, 20, 70], dtype=np.uint8)
        skin_high = np.array([20, 255, 255], dtype=np.uint8)
        skin_mask = cv2.inRange(hsv, skin_low, skin_high)

        # Open then close to clean up the mask.
        kernel = np.ones((3, 3), np.uint8)
        for operation in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
            skin_mask = cv2.morphologyEx(skin_mask, operation, kernel)

        contours, _ = cv2.findContours(skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        regions = []
        image_area = image.shape[0] * image.shape[1]
        for contour in contours:
            area = cv2.contourArea(contour)
            if not area > image_area * 0.3:
                continue
            x, y, w, h = cv2.boundingRect(contour)
            regions.append({
                'type': 'nsfw',
                'class': 'large_skin_region',
                'confidence': min(area / image_area, 1.0),
                'bbox': [x, y, x + w, y + h],
                'method': 'skin_detection',
            })
        return regions
    except Exception as e:
        print(f"โŒ Skin detection error: {e}")
        return []
def setup_nsfw_detector(self):
    """Load the NSFW classifier and, if configured, the pose detector.

    The classifier is the ``Falconsai/nsfw_image_detection``
    image-classification pipeline, first requested with ``use_fast=True``
    and, if that fails, once more without it; when both attempts fail
    ``self.nsfw_classifier`` is set to ``None``.

    The MediaPipe pose detector is created only when
    ``nsfw_detection.pose_analysis`` is enabled and MediaPipe was imported
    successfully; on failure the option is switched off in the config.
    Errors are printed, never raised.
    """
    try:
        print("๐Ÿ”ž Loading NSFW detection components...")

        # 1. NSFW classifier. The device id is computed outside the try so
        # the fallback below can never see it unbound.
        device_id = 0 if self.device == 'cuda' else -1
        try:
            self.nsfw_classifier = pipeline(
                "image-classification",
                model="Falconsai/nsfw_image_detection",
                device=device_id,
                use_fast=True
            )
            print("โœ… NSFW classifier loaded")
        except Exception as nsfw_error:
            print(f"โš ๏ธ NSFW classifier failed: {nsfw_error}")
            print(" Trying backup method...")
            try:
                # Second attempt without specifying use_fast
                self.nsfw_classifier = pipeline(
                    "image-classification",
                    model="Falconsai/nsfw_image_detection",
                    device=device_id
                )
                print("โœ… NSFW classifier loaded (fallback)")
            except Exception:
                # ``Exception`` (not a bare except) so KeyboardInterrupt /
                # SystemExit still propagate while a model is loading.
                print("โŒ NSFW classifier completely failed")
                self.nsfw_classifier = None

        # 2. Pose detection (optional)
        if self.config['nsfw_detection']['pose_analysis'] and MEDIAPIPE_AVAILABLE:
            try:
                import mediapipe as mp
                try:
                    mp_pose = mp.solutions.pose
                    self.pose_detector = mp_pose.Pose(
                        static_image_mode=True,
                        model_complexity=0,
                        min_detection_confidence=0.5
                    )
                    print("โœ… Pose detector loaded (legacy API)")
                except AttributeError:
                    # The installed MediaPipe does not expose mp.solutions.pose
                    print("โš ๏ธ MediaPipe API not available")
                    self.pose_detector = None
                    self.config['nsfw_detection']['pose_analysis'] = False
            except Exception as pose_error:
                print(f"โš ๏ธ Pose detection failed: {pose_error}")
                self.pose_detector = None
                self.config['nsfw_detection']['pose_analysis'] = False
        else:
            self.pose_detector = None
            if not MEDIAPIPE_AVAILABLE:
                print("โš ๏ธ MediaPipe not available - pose analysis disabled")
    except Exception as e:
        print(f"โŒ Error loading NSFW components: {e}")
        print("๐Ÿ’ก Falling back to skin detection only")
def process_image(self, image_path):
    """Run every enabled detector on one image and summarise the findings.

    Args:
        image_path: Either a filesystem path (``str``), which is loaded with
            ``cv2.imread``, or an already decoded image object exposing
            ``shape``, ``tobytes()`` and ``copy()`` (presumably a NumPy
            array as returned by OpenCV).

    Returns:
        A dict with the keys ``timestamp``, ``image_path``, ``detections``,
        ``total_threats``, ``risk_level``, ``action_required``,
        ``processing_method`` and ``detection_breakdown``. The key
        ``annotated_image`` is added when ``config['output']['draw_boxes']``
        is set and at least one detection exists. A result cached less than
        ``self.cache_ttl`` seconds ago is returned as-is. ``None`` is
        returned when anything fails; the error is printed, not raised.
    """
    try:
        # Load image
        if isinstance(image_path, str):
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError(f"Could not load image: {image_path}")
            cache_key = f"file_{image_path}"
        else:
            image = image_path
            # The shape is part of the key so two arrays with identical raw
            # bytes but different geometry never share a cache entry.
            cache_key = f"array_{image.shape}_{hash(image.tobytes())}"

        # Check cache
        import time
        current_time = time.time()
        cached_entry = self.detection_cache.get(cache_key)
        if cached_entry is not None:
            cached_result, timestamp = cached_entry
            if current_time - timestamp < self.cache_ttl:
                return cached_result

        print(f"๐Ÿ“ธ Processing image: {image.shape}")

        # Run detections
        all_detections = []

        # Weapon and fight detection
        if self.config['weapon_detection']['enabled']:
            weapon_fight_detections = self.detect_weapons(image)
            all_detections.extend(weapon_fight_detections)
            weapon_detections = [d for d in weapon_fight_detections if d['type'] == 'weapon']
            fight_detections = [d for d in weapon_fight_detections if d['type'] == 'fight']
            print(f"๐Ÿ”ซ Found {len(weapon_detections)} weapon(s)")
            print(f"๐Ÿ‘Š Found {len(fight_detections)} fight(s)")

            # Show detailed breakdown. 'weapon_type' is read with .get()
            # because this is only logging: a detection without that key
            # must not abort the whole analysis (draw_detections also
            # treats the key as optional).
            knife_detections = [d for d in weapon_detections if d.get('weapon_type') == 'blade']
            if knife_detections:
                print(f" ๐Ÿ”ช Including {len(knife_detections)} knife/dao detection(s)")
            for fight in fight_detections:
                fight_type = fight.get('fight_type', 'unknown')
                aggression = fight.get('aggression_level', 'unknown')
                print(f" ๐Ÿ‘Š Fight: {fight_type} (aggression: {aggression})")

        # NSFW detection
        if self.config['nsfw_detection']['enabled']:
            nsfw_detections = self.detect_nsfw_content(image)
            all_detections.extend(nsfw_detections)
            print(f"๐Ÿ”ž Found {len(nsfw_detections)} NSFW detection(s)")

        # Generate result
        result = {
            'timestamp': datetime.now().isoformat(),
            'image_path': image_path if isinstance(image_path, str) else 'array',
            'detections': all_detections,
            'total_threats': len(all_detections),
            'risk_level': self.calculate_risk_level(all_detections),
            'action_required': len(all_detections) > 0,
            'processing_method': 'enhanced_dual_model_with_fight',
            'detection_breakdown': {
                'weapons': len([d for d in all_detections if d['type'] == 'weapon']),
                'fights': len([d for d in all_detections if d['type'] == 'fight']),
                'nsfw': len([d for d in all_detections if d['type'] == 'nsfw'])
            }
        }

        # Cache result
        self.detection_cache[cache_key] = (result, current_time)

        # Clean old cache entries
        self.clean_cache(current_time)

        # Save detection history. A shallow copy is stored so that the
        # annotated frame attached below does not end up in the history:
        # otherwise every processed frame would be retained in memory and
        # later dumped into the JSON report as a stringified array.
        self.detection_history.append(dict(result))

        # Draw detections
        if self.config['output']['draw_boxes'] and all_detections:
            annotated_image = self.draw_detections(image.copy(), all_detections)
            result['annotated_image'] = annotated_image

        return result
    except Exception as e:
        print(f"โŒ Error processing image: {e}")
        return None
def clean_cache(self, current_time):
    """Evict cache entries that are older than ``self.cache_ttl``.

    Args:
        current_time: Reference time compared against the timestamp stored
            as the second element of each cache value.

    An entry is removed only when its age is strictly greater than the TTL.
    Any error is printed and swallowed so cleanup never interrupts a caller.
    """
    try:
        # Collect first, delete afterwards: a dict must not change size
        # while it is being iterated.
        stale_keys = [
            cache_key
            for cache_key, (_, stored_at) in self.detection_cache.items()
            if current_time - stored_at > self.cache_ttl
        ]
        for cache_key in stale_keys:
            del self.detection_cache[cache_key]
    except Exception as e:
        print(f"โš ๏ธ Cache cleanup error: {e}")
def get_model_status(self):
    """Summarise which models are loaded and which feature flags are set.

    Config keys are read with ``.get`` so a missing section yields a falsy
    flag instead of raising.

    Returns:
        A dict with the load state of each model, the device, the current
        cache size and the knife/fight feature flags. ``custom_classes`` is
        included only when the custom model is set and has a ``names``
        attribute.
    """
    weapon_cfg = self.config.get('weapon_detection', {})

    # 'fight_detection' inside the weapon section is either a plain flag
    # (legacy layout) or a dict carrying its own 'enabled' switch.
    fight_setting = weapon_cfg.get('fight_detection', None)
    fight_enabled = (
        fight_setting.get('enabled', True)
        if isinstance(fight_setting, dict)
        else bool(fight_setting)
    )

    # The analysis flag lives in the weapon section; the top-level
    # fight_detection section is consulted only when that one is falsy.
    fight_analysis_flag = weapon_cfg.get('fight_analysis', False)
    if not fight_analysis_flag:
        top_level_fight_cfg = self.config.get('fight_detection', {})
        fight_analysis_flag = bool(top_level_fight_cfg.get('multi_person_analysis', False))

    custom_model = self.weapon_model_custom
    status = {
        'fight_detection': fight_enabled,
        'custom_weapon_fight_model': custom_model is not None,
        'general_model': self.weapon_model_general is not None,
        'nsfw_classifier': self.nsfw_classifier is not None,
        'pose_detector': self.pose_detector is not None,
        'device': self.device,
        'cache_size': len(self.detection_cache),
        'knife_enhancement': weapon_cfg.get('use_enhancement', False),
        'knife_boost': weapon_cfg.get('boost_knife_detection', False),
        'fight_analysis': fight_analysis_flag
    }

    if custom_model and hasattr(custom_model, 'names'):
        status['custom_classes'] = list(custom_model.names.values())
    return status
def calculate_risk_level(self, detections):
    """Collapse a list of detections into a single risk label.

    Args:
        detections: Dicts that each carry at least ``'type'`` and
            ``'confidence'``.

    Returns:
        ``'safe'`` for an empty list, otherwise one of ``'critical'``,
        ``'high'``, ``'medium'`` or ``'low'``.
    """
    if not detections:
        return 'safe'

    top_confidence = max(item['confidence'] for item in detections)
    kinds = {item['type'] for item in detections}
    weapons_present = 'weapon' in kinds
    fights_present = 'fight' in kinds

    # A weapon seen together with a fight is always the top level.
    if weapons_present and fights_present:
        return 'critical'

    # Fights alone are graded by their own best confidence.
    if fights_present:
        top_fight_confidence = max(
            item['confidence'] for item in detections if item['type'] == 'fight'
        )
        if top_fight_confidence > 0.8:
            return 'critical'
        if top_fight_confidence > 0.65:
            return 'high'

    # Note: the weapon rule uses the best confidence over ALL detections,
    # not only over the weapon detections.
    if weapons_present and top_confidence > 0.8:
        return 'critical'
    if weapons_present or fights_present or top_confidence > 0.9:
        return 'high'
    if top_confidence > 0.7:
        return 'medium'
    return 'low'
def draw_detections(self, image, detections):
    """Draw a box, a label and optional context text for every detection.

    Args:
        image: Image that is drawn on in place with OpenCV.
        detections: Dicts with ``'bbox'`` (x1, y1, x2, y2), ``'type'``,
            ``'class'`` and ``'confidence'``; the optional keys
            ``weapon_type``, ``fight_type``, ``threat_level``,
            ``aggression_level``, ``people_involved``, ``context_flags``
            and ``detection_method`` refine colour and text.

    Returns:
        The same ``image`` object. If drawing fails part-way the error is
        printed and the image is returned with whatever was drawn so far.
    """
    try:
        # Colours are OpenCV (B, G, R) tuples.
        type_palette = {
            'weapon': (0, 0, 255),  # Red
            'fight': (0, 165, 255),  # Orange for fights
            'nsfw': (255, 0, 255),  # Magenta
        }
        # More specific colours per weapon category
        weapon_palette = {
            'blade': (0, 100, 255),  # Orange-red for knives
            'firearm': (0, 0, 255),  # Red for guns
            'blunt_weapon': (100, 0, 255)  # Purple for blunt weapons
        }
        # More specific colours per fight category
        fight_palette = {
            'physical_combat': (0, 140, 255),  # Orange
            'martial_arts': (0, 200, 255),  # Light orange
            'wrestling': (0, 165, 255),  # Medium orange
            'group_violence': (0, 69, 255),  # Dark orange
            'general_fight': (0, 165, 255)  # Default orange
        }

        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            kind = det['type']
            font = cv2.FONT_HERSHEY_SIMPLEX

            # Colour: sub-type palette when the sub-type key is present,
            # otherwise the per-type colour (green for unknown types).
            if kind == 'weapon' and 'weapon_type' in det:
                color = weapon_palette.get(det['weapon_type'], type_palette['weapon'])
            elif kind == 'fight' and 'fight_type' in det:
                color = fight_palette.get(det['fight_type'], type_palette['fight'])
            else:
                color = type_palette.get(kind, (0, 255, 0))

            # Line width grows with severity.
            if det.get('threat_level') == 'critical':
                thickness = 4
            elif kind in ['weapon', 'fight']:
                thickness = 3
            else:
                thickness = 2
            cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)

            # Build the label text.
            if kind == 'weapon':
                label = f"{det['class']} ({det['confidence']:.2f})"
                if 'threat_level' in det:
                    label += f" [{det['threat_level']}]"
            elif kind == 'fight':
                label = f"FIGHT: {det['class']} ({det['confidence']:.2f})"
                if 'threat_level' in det:
                    label += f" [{det['threat_level']}]"
                if 'aggression_level' in det:
                    label += f" {det['aggression_level']}"
            else:
                label = f"{det['type']}: {det['class']} ({det['confidence']:.2f})"

            # Filled strip above the box, then the label text on top of it.
            text_width = cv2.getTextSize(label, font, 0.5, 2)[0][0]
            cv2.rectangle(image, (x1, y1 - 25), (x1 + text_width + 5, y1), color, -1)
            cv2.putText(image, label, (x1 + 2, y1 - 7), font, 0.5, (255, 255, 255), 2)

            # Extra line below the box for fights.
            if kind == 'fight':
                context_parts = []
                if 'people_involved' in det and det['people_involved'] > 0:
                    context_parts.append(f"People: {det['people_involved']}")
                if 'context_flags' in det and det['context_flags']:
                    context_parts.append(f"Flags: {', '.join(det['context_flags'])}")
                if context_parts:
                    cv2.putText(image, " | ".join(context_parts), (x1, y2 + 15),
                                font, 0.3, color, 1)

            # Small tag with the last '_'-separated token of the method.
            if 'detection_method' in det:
                method_tag = det['detection_method'].split('_')[-1]
                cv2.putText(image, method_tag, (x1, y2 + 30), font, 0.3, color, 1)

        return image
    except Exception as e:
        print(f"โŒ Error drawing detections: {e}")
        return image
def process_video(self, video_path, output_path=None, frame_skip=2):
    """Scan a video for threats, sampling frames adaptively.

    Sampling rules:
      * every frame within 10 frames after a frame with detections is
        analysed;
      * otherwise every ``skip``-th frame is analysed, where ``skip``
        starts at ``frame_skip``, is halved while fights are seen, snaps
        back to at most ``frame_skip`` on any detection, and grows by one
        (up to ``max(5, frame_skip)``) after 5 analysed frames in a row
        without detections.

    Args:
        video_path: Source handed to ``cv2.VideoCapture``.
        output_path: Optional destination. When given, an ``mp4v`` video is
            written containing the annotated frame where one exists and
            the raw frame otherwise.
        frame_skip: Baseline sampling interval; values below 1 act as 1.

    Returns:
        A dict with ``total_frames_processed`` (frames actually analysed),
        ``total_detections``, ``detections`` (each tagged with its
        ``frame`` number), ``fight_timeline``, ``fight_analysis`` and
        ``detection_breakdown``. ``None`` when an error occurs; the error
        is printed, not raised.
    """
    # Severity order for aggression labels. Comparing the labels as plain
    # strings would rank 'low' above 'high' and 'extreme'. Labels missing
    # from this table rank lowest.
    # NOTE(review): the full label set produced by the fight detector is
    # not visible here - confirm and extend the table if needed.
    aggression_rank = {'low': 1, 'medium': 2, 'high': 3, 'extreme': 4}
    alert_window = 10  # frames analysed unconditionally after a threat
    quiet_frames_before_backoff = 5
    fallback_fps = 30.0

    def rank_of(level):
        return aggression_rank.get(level, 0)

    cap = None
    out = None
    try:
        cap = cv2.VideoCapture(video_path)

        # Read the frame rate once. Some sources report 0 (or NaN), which
        # previously made the timestamp division fail and aborted the run.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            print(f"Warning: could not read FPS for {video_path}; "
                  f"assuming {fallback_fps:.0f} FPS for timestamps")
            fps = fallback_fps

        base_skip = max(1, frame_skip)
        max_skip = max(5, base_skip)
        skip = base_skip

        frame_count = 0
        processed_frames = 0
        last_threat_frame = None
        quiet_streak = 0  # analysed frames in a row without detections
        total_detections = []
        fight_timeline = []  # Track fights over time

        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            in_alert_window = (
                last_threat_frame is not None
                and frame_count - last_threat_frame < alert_window
            )
            if not (in_alert_window or frame_count % skip == 0):
                if out is not None:
                    out.write(frame)
                continue

            # Process frame
            result = self.process_image(frame)
            processed_frames += 1
            detections = result['detections'] if result else []
            frame_to_write = frame

            if detections:
                for detection in detections:
                    # Tag in place (the dicts are shared with the detection
                    # history) but collect a copy: process_image may return
                    # the same cached dicts for an identical later frame,
                    # and re-tagging those must not rewrite earlier entries.
                    detection['frame'] = frame_count
                    total_detections.append(dict(detection))

                last_threat_frame = frame_count
                quiet_streak = 0
                # Drop any back-off as soon as something is found.
                skip = min(skip, base_skip)

                # Track fight timeline
                fight_detections = [d for d in detections if d['type'] == 'fight']
                if fight_detections:
                    fight_timeline.append({
                        'timestamp': frame_count / fps,
                        'frame': frame_count,
                        'fights': len(fight_detections),
                        'max_aggression': max(
                            (f.get('aggression_level', 'low') for f in fight_detections),
                            key=rank_of,
                        )
                    })
                    # Sample more densely while a fight is going on
                    skip = max(1, skip // 2)

                print(f"โš ๏ธ Frame {frame_count}: {len(detections)} threats detected")
                breakdown = result.get('detection_breakdown', {})
                if breakdown.get('fights', 0) > 0:
                    print(f" ๐Ÿ‘Š Fights: {breakdown['fights']}")

                frame_to_write = result.get('annotated_image', frame)
            else:
                # Nothing found: after a sustained quiet stretch, sample
                # less often for efficiency.
                quiet_streak += 1
                if quiet_streak >= quiet_frames_before_backoff:
                    skip = min(max_skip, skip + 1)

            if out is not None:
                out.write(frame_to_write)

        # Analysis of fight patterns
        fight_analysis = {}
        if fight_timeline:
            first_time = fight_timeline[0]['timestamp']
            last_time = fight_timeline[-1]['timestamp']
            peak_entry = max(fight_timeline, key=lambda entry: rank_of(entry['max_aggression']))
            fight_analysis = {
                'total_fight_incidents': len(fight_timeline),
                'first_fight_time': first_time,
                'last_fight_time': last_time,
                'peak_aggression_time': peak_entry['timestamp'],
                'fight_duration_coverage': last_time - first_time if len(fight_timeline) > 1 else 0
            }

        return {
            'total_frames_processed': processed_frames,
            'total_detections': len(total_detections),
            'detections': total_detections,
            'fight_timeline': fight_timeline,
            'fight_analysis': fight_analysis,
            'detection_breakdown': {
                'weapons': len([d for d in total_detections if d['type'] == 'weapon']),
                'fights': len([d for d in total_detections if d['type'] == 'fight']),
                'nsfw': len([d for d in total_detections if d['type'] == 'nsfw'])
            }
        }
    except Exception as e:
        print(f"โŒ Error processing video: {e}")
        return None
    finally:
        # Release the capture and the writer on every exit path, including
        # errors, so the output file is finalised and handles are freed.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
def save_report(self, filename="detection_report.json"):
    """Write ``self.detection_history`` to ``filename`` as indented JSON.

    Values JSON cannot encode natively are converted with ``str``. Failures
    are printed rather than raised.

    Args:
        filename: Destination path; an existing file is overwritten.
    """
    try:
        with open(filename, 'w') as report_file:
            json.dump(
                self.detection_history,
                report_file,
                indent=2,
                default=str,
            )
        print(f"๐Ÿ“Š Report saved to {filename}")
    except Exception as e:
        print(f"โŒ Error saving report: {e}")
def get_memory_usage(self):
    """Describe CUDA memory use, or report CPU mode when CUDA is absent.

    Returns:
        ``"CPU mode"`` without CUDA; otherwise a string with the allocated
        and reserved memory, each divided by 1024**3 and shown in GB.
    """
    if not torch.cuda.is_available():
        return "CPU mode"

    bytes_per_gb = 1024 ** 3
    allocated_gb = torch.cuda.memory_allocated() / bytes_per_gb
    reserved_gb = torch.cuda.memory_reserved() / bytes_per_gb
    return f"GPU Memory: {allocated_gb:.2f}GB allocated, {reserved_gb:.2f}GB cached"
def main():
    """Demo driver: status banner, one still image, then a live webcam loop.

    Steps:
      1. Build a ``ContentModerator`` and print which models are loaded.
      2. Analyse ``test_image.jpg`` if it exists, otherwise a synthetic
         grey test pattern, and print the findings.
      3. Run an interactive webcam session (keys: q, s, i, e, b, f, h) and
         print session statistics when it ends.
      4. Print memory/cache information and save the JSON report.

    The webcam and the OpenCV windows are released even when the loop
    raises.
    """
    # Initialize the system
    moderator = ContentModerator()

    # Show enhanced system information
    print("\n" + "=" * 60)
    print("๐ŸŽฏ ENHANCED DUAL MODEL SYSTEM WITH FIGHT DETECTION")
    print("=" * 60)
    status = moderator.get_model_status()
    if status['custom_weapon_fight_model']:
        print("โœ… Custom YOLO11 Model (dao + sรบng + fight): LOADED")
        if 'custom_classes' in status:
            print(f"๐Ÿ“Š Custom classes: {status['custom_classes']}")
    else:
        print("โŒ Custom weapon+fight model: NOT FOUND")
    if status['general_model']:
        print("โœ… General YOLO11n Model (person detection): LOADED")
    else:
        print("โŒ General model: FAILED")
    if status['nsfw_classifier']:
        print("โœ… NSFW Classifier: LOADED")
    else:
        print("โŒ NSFW Classifier: FAILED")
    print(f"๐Ÿ–ฅ๏ธ Device: {status['device']}")
    print(f"๐Ÿ—„๏ธ Cache system: ENABLED")
    print(f"๐Ÿ”ช Knife enhancement: {'ENABLED' if status['knife_enhancement'] else 'DISABLED'}")
    print(f"๐Ÿ“ˆ Knife confidence boost: {'ENABLED' if status['knife_boost'] else 'DISABLED'}")
    print(f"๐Ÿ‘Š Fight detection: {'ENABLED' if status['fight_detection'] else 'DISABLED'}")
    print(f"๐Ÿง  Fight analysis: {'ENABLED' if status['fight_analysis'] else 'DISABLED'}")

    # Enhanced features info
    print("\n" + "=" * 60)
    print("โœจ ENHANCED DETECTION FEATURES")
    print("=" * 60)
    print("๐Ÿ”ง Image Enhancement:")
    print(" - Contrast & brightness optimization")
    print(" - Edge sharpening for metallic objects")
    print(" - CLAHE for local contrast")
    print("๐Ÿ“Š Confidence Boosting:")
    print(" - Geometric analysis (knives)")
    print(" - Motion blur analysis (fights)")
    print(" - Edge strength analysis")
    print("๐ŸŽฏ Multi-pass Detection:")
    print(" - Low threshold pass for knives (0.45)")
    print(" - Normal threshold for guns (0.45)")
    print(" - Low threshold for fights (0.40)")
    print("๐Ÿ‘Š Fight Analysis:")
    print(" - Multi-person fight detection")
    print(" - Aggression level assessment")
    print(" - Context-aware threat escalation")

    # Example 1: Process single image
    print("\n" + "=" * 50)
    print("๐Ÿ–ผ๏ธ SINGLE IMAGE PROCESSING")
    print("=" * 50)
    test_image = "test_image.jpg"
    if os.path.exists(test_image):
        result = moderator.process_image(test_image)
        if result:
            print(f"\n๐Ÿ“Š DETECTION RESULTS:")
            print(f"Risk Level: {result['risk_level']}")
            print(f"Total Threats: {result['total_threats']}")
            print(f"Processing Method: {result.get('processing_method', 'standard')}")
            breakdown = result.get('detection_breakdown', {})
            if breakdown:
                print(f"\n๐Ÿ“ˆ BREAKDOWN:")
                print(f" Weapons: {breakdown.get('weapons', 0)}")
                print(f" Fights: {breakdown.get('fights', 0)}")
                print(f" NSFW: {breakdown.get('nsfw', 0)}")

            # Show weapon-specific results. Optional keys are read with
            # .get() so an incomplete detection cannot abort the report.
            weapon_detections = [d for d in result['detections'] if d['type'] == 'weapon']
            if weapon_detections:
                print(f"\n๐Ÿ”ซ WEAPON DETECTIONS: {len(weapon_detections)}")
                for i, detection in enumerate(weapon_detections):
                    method = detection.get('detection_method', 'unknown')
                    print(f" Weapon {i + 1} ({method}):")
                    print(f" Class: {detection['class']}")
                    print(f" Type: {detection.get('weapon_type', 'unknown')}")
                    print(f" Confidence: {detection['confidence']:.3f}")
                    print(f" Threat Level: {detection.get('threat_level', 'unknown')}")

            # Show fight-specific results
            fight_detections = [d for d in result['detections'] if d['type'] == 'fight']
            if fight_detections:
                print(f"\n๐Ÿ‘Š FIGHT DETECTIONS: {len(fight_detections)}")
                for i, detection in enumerate(fight_detections):
                    method = detection.get('detection_method', 'unknown')
                    print(f" Fight {i + 1} ({method}):")
                    print(f" Class: {detection['class']}")
                    print(f" Type: {detection.get('fight_type', 'unknown')}")
                    print(f" Confidence: {detection['confidence']:.3f}")
                    print(f" Threat Level: {detection.get('threat_level', 'unknown')}")
                    print(f" Aggression: {detection.get('aggression_level', 'unknown')}")
                    if 'people_involved' in detection:
                        print(f" People Involved: {detection['people_involved']}")
                    if 'context_flags' in detection and detection['context_flags']:
                        print(f" Context: {', '.join(detection['context_flags'])}")

            # Show NSFW results
            nsfw_detections = [d for d in result['detections'] if d['type'] == 'nsfw']
            if nsfw_detections:
                print(f"\n๐Ÿ”ž NSFW DETECTIONS: {len(nsfw_detections)}")
                for i, detection in enumerate(nsfw_detections):
                    method = detection.get('method', 'unknown')
                    print(f" NSFW {i + 1} ({method}):")
                    print(f" Class: {detection['class']}")
                    print(f" Confidence: {detection['confidence']:.3f}")
                    if 'skin_ratio' in detection:
                        print(f" Skin Ratio: {detection['skin_ratio']:.2f}")
    else:
        print(f"โš ๏ธ Test image not found: {test_image}")
        print("Creating a test pattern to demonstrate detection...")
        # Create a synthetic test image
        test_img = np.ones((640, 640, 3), dtype=np.uint8) * 128
        cv2.putText(test_img, "Test Pattern", (200, 320),
                    cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3)
        result = moderator.process_image(test_img)
        # process_image signals failure by returning None; only claim
        # success when it actually produced a result.
        if result is not None:
            print("โœ… Test pattern processed successfully")
        else:
            print("โŒ Test pattern processing failed")

    # Example 2: Enhanced webcam processing with fight detection
    print("\n" + "=" * 60)
    print("๐Ÿ“น ENHANCED WEBCAM PROCESSING WITH FIGHT DETECTION")
    print("=" * 60)
    print("Starting enhanced detection on webcam...")
    print("๐ŸŽฎ Controls:")
    print(" - Press 'q' to quit")
    print(" - Press 's' to save frame")
    print(" - Press 'i' to show model info")
    print(" - Press 'e' to toggle enhancement")
    print(" - Press 'b' to toggle knife confidence boost")
    print(" - Press 'f' to toggle fight analysis")
    print(" - Press 'h' for help")

    window_title = 'Enhanced Detection System (Weapons + Fights)'
    font = cv2.FONT_HERSHEY_SIMPLEX
    white = (255, 255, 255)

    try:
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            print("โŒ Cannot open webcam. Check if camera is connected.")
            cap.release()
        else:
            print("โœ… Enhanced webcam processing started")
            try:
                frame_count = 0
                detection_stats = {
                    'weapons': 0,
                    'knives': 0,
                    'guns': 0,
                    'fights': 0,
                    'nsfw': 0,
                    'total_frames': 0,
                    'fight_incidents': 0
                }

                # Adaptive processing variables
                process_interval = 2  # Start with every 2nd frame
                # None until the first detection, so the session does not
                # start in "HIGH ALERT MODE" before anything was found.
                last_detection_frame = None
                consecutive_safe_frames = 0

                while True:
                    ret, frame = cap.read()
                    if not ret:
                        print("โŒ Cannot read from webcam")
                        break
                    frame_count += 1
                    detection_stats['total_frames'] = frame_count
                    frame = cv2.flip(frame, 1)

                    # Add status overlay
                    weapon_cfg = moderator.config['weapon_detection']
                    y_offset = frame.shape[0] - 120
                    cv2.putText(frame,
                                f"Enhancement: {'ON' if weapon_cfg['use_enhancement'] else 'OFF'}",
                                (10, y_offset), font, 0.5, white, 1)
                    cv2.putText(frame,
                                f"Knife Boost: {'ON' if weapon_cfg['boost_knife_detection'] else 'OFF'}",
                                (10, y_offset + 20), font, 0.5, white, 1)
                    cv2.putText(frame,
                                f"Fight Analysis: {'ON' if weapon_cfg['fight_analysis'] else 'OFF'}",
                                (10, y_offset + 40), font, 0.5, white, 1)
                    model_info = "Models: Custom+General" if moderator.weapon_model_custom else "General Only"
                    cv2.putText(frame, model_info, (10, y_offset + 60), font, 0.5, white, 1)

                    # Adaptive frame processing - every frame for 30 frames
                    # after a detection, otherwise every process_interval-th.
                    in_alert_mode = (
                        last_detection_frame is not None
                        and frame_count - last_detection_frame <= 30
                    )
                    if in_alert_mode:
                        should_process = True
                        cv2.putText(frame, "HIGH ALERT MODE", (10, 60), font, 0.6, (0, 0, 255), 2)
                    else:
                        should_process = frame_count % process_interval == 0

                    if should_process:
                        result = moderator.process_image(frame)
                        if result and result['action_required']:
                            last_detection_frame = frame_count  # Update last detection frame
                            consecutive_safe_frames = 0
                            process_interval = 1  # Process every frame when threats detected

                            # Count detections by type
                            for detection in result['detections']:
                                if detection['type'] == 'weapon':
                                    detection_stats['weapons'] += 1
                                    weapon_type = detection.get('weapon_type')
                                    if weapon_type == 'blade':
                                        detection_stats['knives'] += 1
                                    elif weapon_type == 'firearm':
                                        detection_stats['guns'] += 1
                                elif detection['type'] == 'fight':
                                    detection_stats['fights'] += 1
                                    if detection.get('aggression_level') in ['high', 'extreme']:
                                        detection_stats['fight_incidents'] += 1
                                elif detection['type'] == 'nsfw':
                                    detection_stats['nsfw'] += 1

                            print(
                                f"โš ๏ธ Frame {frame_count}: {result['risk_level']} risk - {result['total_threats']} threats!")

                            # Show specific detections with fight info
                            for detection in result['detections']:
                                if detection['type'] == 'weapon':
                                    icon = "๐Ÿ”ช" if detection.get('weapon_type') == 'blade' else "๐Ÿ”ซ"
                                    method = detection.get('detection_method', 'unknown').split('_')[-1]
                                    print(f" {icon} {detection['class']} ({detection['confidence']:.3f}) [{method}]")
                                elif detection['type'] == 'fight':
                                    fight_type = detection.get('fight_type', 'general')
                                    aggression = detection.get('aggression_level', 'unknown')
                                    people = detection.get('people_involved', 0)
                                    method = detection.get('detection_method', 'unknown').split('_')[-1]
                                    print(f" ๐Ÿ‘Š FIGHT: {fight_type} ({detection['confidence']:.3f}) [{method}]")
                                    print(f" Aggression: {aggression}, People: {people}")

                            # Use annotated frame
                            if 'annotated_image' in result:
                                cv2.imshow(window_title, result['annotated_image'])
                            else:
                                # Add threat counter
                                breakdown = result.get('detection_breakdown', {})
                                threat_text = f"THREATS: W:{breakdown.get('weapons', 0)} F:{breakdown.get('fights', 0)} N:{breakdown.get('nsfw', 0)}"
                                cv2.putText(frame, threat_text, (10, 30), font, 0.7, (0, 0, 255), 2)
                                cv2.imshow(window_title, frame)
                        else:
                            consecutive_safe_frames += 1
                            # Gradually increase processing interval when safe (up to max 3)
                            if consecutive_safe_frames > 30:
                                process_interval = min(3, process_interval + 1)
                                consecutive_safe_frames = 0
                            cv2.putText(frame, "SAFE", (10, 30), font, 1, (0, 255, 0), 2)
                            cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90),
                                        font, 0.5, white, 1)
                            cv2.imshow(window_title, frame)
                    else:
                        cv2.putText(frame, "SAFE", (10, 30), font, 1, (0, 255, 0), 2)
                        cv2.putText(frame, f"Process Interval: {process_interval}", (10, 90),
                                    font, 0.5, white, 1)
                        cv2.imshow(window_title, frame)

                    # Handle key presses
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        print("๐Ÿ›‘ Webcam stopped by user")
                        break
                    elif key == ord('s'):
                        filename = f"enhanced_detection_{frame_count}.jpg"
                        cv2.imwrite(filename, frame)
                        print(f"๐Ÿ’พ Frame saved as {filename}")
                    elif key == ord('i'):
                        print(f"\n๐Ÿ“Š Model Status:")
                        current_status = moderator.get_model_status()
                        for k, v in current_status.items():
                            print(f" {k}: {v}")
                    elif key == ord('e'):
                        # Toggle enhancement
                        moderator.config['weapon_detection']['use_enhancement'] = \
                            not moderator.config['weapon_detection']['use_enhancement']
                        print(
                            f"๐Ÿ”ง Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}")
                    elif key == ord('b'):
                        # Toggle knife boost
                        moderator.config['weapon_detection']['boost_knife_detection'] = \
                            not moderator.config['weapon_detection']['boost_knife_detection']
                        print(
                            f"๐Ÿ“ˆ Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}")
                    elif key == ord('f'):
                        # Toggle fight analysis
                        moderator.config['weapon_detection']['fight_analysis'] = \
                            not moderator.config['weapon_detection']['fight_analysis']
                        print(
                            f"๐Ÿ‘Š Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}")
                    elif key == ord('h'):
                        print("\n๐ŸŽฎ Controls:")
                        print(" 'q': quit")
                        print(" 's': save frame")
                        print(" 'i': model info")
                        print(" 'e': toggle enhancement")
                        print(" 'b': toggle knife confidence boost")
                        print(" 'f': toggle fight analysis")
                        print(" 'h': help")

                # Show comprehensive session statistics
                print(f"\n๐Ÿ“ˆ Session Statistics:")
                print(f" Total frames: {detection_stats['total_frames']}")
                print(f" Total weapon detections: {detection_stats['weapons']}")
                print(f" - Knives/Dao: {detection_stats['knives']}")
                print(f" - Guns: {detection_stats['guns']}")
                print(f" Total fight detections: {detection_stats['fights']}")
                print(f" - High-aggression incidents: {detection_stats['fight_incidents']}")
                print(f" NSFW detections: {detection_stats['nsfw']}")
                if detection_stats['total_frames'] > 0:
                    total_detections = detection_stats['weapons'] + detection_stats['fights'] + detection_stats['nsfw']
                    detection_rate = (total_detections / detection_stats['total_frames'] * 100)
                    print(f" Overall detection rate: {detection_rate:.1f}%")
                    if detection_stats['weapons'] > 0:
                        knife_ratio = detection_stats['knives'] / detection_stats['weapons'] * 100
                        print(f" Knife detection ratio: {knife_ratio:.1f}% of weapons")
                    if detection_stats['fights'] > 0:
                        incident_ratio = detection_stats['fight_incidents'] / detection_stats['fights'] * 100
                        print(f" High-aggression fight ratio: {incident_ratio:.1f}% of fights")
            finally:
                # Free the camera and close the windows even when the loop
                # above raised; otherwise the device stays locked.
                cap.release()
                cv2.destroyAllWindows()
            print("โœ… Enhanced webcam session completed")
    except Exception as e:
        print(f"โŒ Webcam error: {e}")

    # Show final system status
    print(f"\n๐Ÿ’พ {moderator.get_memory_usage()}")
    print(f"๐Ÿ—„๏ธ Final cache size: {len(moderator.detection_cache)} entries")

    # Save enhanced report
    moderator.save_report("enhanced_detection_with_fights_report.json")
    print("\nโœ… Enhanced Content Moderation System with Fight Detection completed!")
    print("๐Ÿ’ก New fight detection capabilities:")
    print(" - Behavioral fight pattern recognition")
    print(" - Multi-person fight analysis")
    print(" - Aggression level assessment")
    print(" - Context-aware threat escalation")
    print(" - Fight timeline tracking for videos")
    print("๐Ÿ’ก Enhanced weapon detection:")
    print(" - Image enhancement preprocessing")
    print(" - Dynamic confidence thresholds")
    print(" - Geometric feature analysis")
    print(" - Multi-pass detection strategy")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()