|
|
import cv2
|
|
|
import numpy as np
|
|
|
import torch
|
|
|
import os
|
|
|
import json
|
|
|
import warnings
|
|
|
|
|
|
warnings.filterwarnings('ignore')
|
|
|
|
|
|
|
|
|
# Bound up front so the flag always exists: if one of the required imports
# below fails, the MediaPipe probe is never reached, and code that later reads
# MEDIAPIPE_AVAILABLE would otherwise raise NameError.
MEDIAPIPE_AVAILABLE = False

try:
    from ultralytics import YOLO
    from transformers import pipeline
    from PIL import Image, ImageDraw, ImageFont
    import requests
    from datetime import datetime

    # MediaPipe is optional; without it pose detection is simply disabled.
    try:
        import mediapipe as mp

        MEDIAPIPE_AVAILABLE = True
        # The check mark is written as an escape so a wrong source encoding
        # cannot split this literal across lines.
        print("\u2705 MediaPipe imported successfully")
    except ImportError:
        MEDIAPIPE_AVAILABLE = False
        print("โ ๏ธ MediaPipe not available - pose detection disabled")
    except Exception as e:
        MEDIAPIPE_AVAILABLE = False
        print(f"โ ๏ธ MediaPipe import error: {e} - pose detection disabled")

except ImportError as e:
    print(f"Missing dependency: {e}")
    print("Please install: pip install ultralytics transformers pillow requests")
    print("For MediaPipe: pip install mediapipe==0.10.18")
|
|
|
|
|
|
|
|
|
class ContentModerator:
|
|
|
def __init__(self, config=None):
    """Create a moderator: resolve the config, choose a device, load models.

    Args:
        config: Optional dict of overrides. When truthy it is merged over
            ``get_default_config()`` with ``deep_merge_dicts``; otherwise
            the defaults are used unchanged.
    """
    defaults = self.get_default_config()
    if config:
        self.config = self.deep_merge_dicts(defaults, config)
    else:
        self.config = defaults

    if torch.cuda.is_available():
        self.device = 'cuda'
    else:
        self.device = 'cpu'
    cpu_only = self.device == 'cpu'

    if cpu_only:
        print("๐ป CPU mode detected - applying optimizations...")
        torch.set_num_threads(4)
        # Half precision and pose analysis are forced off on CPU.
        self.config['performance']['half_precision'] = False
        self.config['nsfw_detection']['pose_analysis'] = False

    # Model handles; they stay None until setup_models() fills them in.
    for handle in ('weapon_model', 'weapon_model_custom', 'weapon_model_general',
                   'nsfw_classifier', 'pose_detector'):
        setattr(self, handle, None)

    # Result cache and its time-to-live in seconds.
    self.detection_cache = {}
    self.cache_ttl = 2

    self.detection_history = []

    print(f"๐ Content Moderator initialized on {self.device}")
    if cpu_only:
        print("โก CPU optimizations enabled")

    self.setup_models()
|
|
|
|
|
|
def deep_merge_dicts(self, a: dict, b: dict) -> dict:
    """Return a new dict holding ``a`` with ``b`` merged on top.

    Keys found only in ``a`` are kept, keys in ``b`` override, and where both
    sides hold a dict under the same key the two are merged recursively.
    Values are deep-copied, so changing the result later (``__init__`` writes
    into the merged config) never leaks back into ``a`` or ``b``.

    Args:
        a: Base mapping. If it is not a dict, a copy of ``b`` is returned
            when ``b`` is a dict, otherwise ``a`` itself.
        b: Overrides; may be empty or ``None``.

    Returns:
        The merged, independent copy.
    """
    import copy  # local import: only this helper needs it

    if not isinstance(a, dict):
        return copy.deepcopy(b) if isinstance(b, dict) else a

    merged = copy.deepcopy(a)
    if not b:
        return merged

    for key, value in b.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = self.deep_merge_dicts(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged
|
|
|
|
|
|
def get_default_config(self):
    """Build the default configuration dict.

    The performance section depends on ``torch.cuda.is_available()``: image
    size is 416 with CUDA and 320 without, half precision is on only with
    CUDA, and ``cpu_optimization`` is its opposite.

    Returns:
        A new dict with the sections ``weapon_detection``,
        ``fight_detection``, ``nsfw_detection``, ``performance`` and
        ``output``.
    """
    has_gpu = torch.cuda.is_available()

    weapon_section = {
        'enabled': True,
        'confidence_threshold': 0.45,
        'knife_confidence': 0.45,
        'fight_confidence': 0.40,
        'model_size': 'yolo11n',
        'classes': ['knife', 'gun', 'rifle', 'pistol', 'weapon', 'fight'],
        'use_enhancement': True,
        'multi_pass': True,
        'boost_knife_detection': True,
        'fight_detection': True,
        'fight_analysis': True,
    }

    fight_section = {
        'enabled': True,
        'confidence_threshold': 0.40,
        'pose_analysis': True,
        'motion_analysis': False,
        'aggression_keywords': ['fight', 'violence', 'aggression', 'combat'],
        'threat_escalation': True,
        'multi_person_analysis': True,
    }

    nsfw_section = {
        'enabled': True,
        'confidence_threshold': 0.7,
        'skin_detection': True,
        'pose_analysis': False,
        'region_analysis': True,
    }

    performance_section = {
        'image_size': 416 if has_gpu else 320,
        'batch_size': 1,
        'half_precision': has_gpu,
        'use_flash_attention': False,
        'cpu_optimization': not has_gpu,
    }

    output_section = {
        'save_detections': True,
        'draw_boxes': True,
        'log_results': True,
    }

    return {
        'weapon_detection': weapon_section,
        'fight_detection': fight_section,
        'nsfw_detection': nsfw_section,
        'performance': performance_section,
        'output': output_section,
    }
|
|
|
|
|
|
def setup_models(self):
    """Initialize every detector that is enabled in ``self.config``.

    Empties the CUDA cache first when a GPU is available, then calls
    ``setup_weapon_detector`` and ``setup_nsfw_detector`` according to their
    ``enabled`` flags. Any exception raised here is printed and swallowed so
    that constructing the moderator never fails.
    """
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        if self.config['weapon_detection']['enabled']:
            self.setup_weapon_detector()

        if self.config['nsfw_detection']['enabled']:
            self.setup_nsfw_detector()

        # The check mark is written as an escape so a wrong source encoding
        # cannot split this literal across lines.
        print("\u2705 All models loaded successfully!")

    except Exception as e:
        print(f"โ Error setting up models: {e}")
|
|
|
|
|
|
def setup_weapon_detector(self):
    """Load the custom weapon/fight model and the general YOLO model.

    The custom weights are looked up at ``models/best.pt`` next to this
    file; when they are missing only the general ``yolo11n.pt`` model is
    loaded. ``self.weapon_model`` points at the custom model when present,
    else at the general one. On CUDA with half precision enabled both
    models are switched to FP16. Any failure resets all three model handles
    to ``None`` and is reported on stdout.
    """
    try:
        print("๐ซ Loading weapon and fight detection models...")

        custom_model_path = "models/best.pt"
        project_root = os.path.dirname(os.path.abspath(__file__))
        full_model_path = os.path.join(project_root, custom_model_path)

        if os.path.exists(full_model_path):
            # Check marks are written as escapes so a wrong source encoding
            # cannot split these literals across lines.
            print(f"\u2705 Loading custom weapon+fight model: {full_model_path}")
            self.weapon_model_custom = YOLO(full_model_path)
            print("๐ฏ Custom weapon+fight model (dao + sรบng + fight) loaded!")

            if hasattr(self.weapon_model_custom, 'names'):
                classes = list(self.weapon_model_custom.names.values())
                print(f"๐ Custom classes: {classes}")

                # Report whether the custom weights expose a fight class.
                if any('fight' in str(cls).lower() for cls in classes):
                    print("๐ Fight detection enabled in custom model")
                else:
                    print("โ ๏ธ Fight class not found in custom model")
        else:
            print("โ ๏ธ Custom weapon+fight model not found")
            self.weapon_model_custom = None

        print("๐ค Loading general model for person detection...")
        self.weapon_model_general = YOLO('yolo11n.pt')
        print("\u2705 General YOLO11n loaded for person detection")

        # Prefer the custom model as the primary handle.
        self.weapon_model = self.weapon_model_custom if self.weapon_model_custom else self.weapon_model_general

        if self.device == 'cuda' and self.config['performance']['half_precision']:
            try:
                if self.weapon_model_custom:
                    self.weapon_model_custom.model.half()
                self.weapon_model_general.model.half()
                print("\u2705 Half precision enabled for both models")
            except Exception:
                # Was a bare ``except:``, which would also swallow
                # KeyboardInterrupt/SystemExit during model conversion.
                print("โ ๏ธ Half precision not supported")

        print("๐ฅ Dual model system ready with fight detection!")

    except Exception as e:
        print(f"โ Error loading weapon+fight models: {e}")
        self.weapon_model = None
        self.weapon_model_custom = None
        self.weapon_model_general = None
|
|
|
|
|
|
def detect_weapons(self, image):
    """Detect weapons and fights in ``image`` with the custom model.

    The image (and, if ``use_enhancement`` is set, an enhanced copy whose
    confidences are multiplied by 1.15) is run through the custom model once
    per pass. Each box is classified as a fight or a weapon, optionally
    boosted, given a threat level, and kept when its confidence reaches the
    per-type minimum. If nothing was found the general model is used as a
    fallback. Results are de-duplicated and fight detections may be enriched
    with multi-person context.

    Args:
        image: Image passed to the models and to the OpenCV-based helpers.

    Returns:
        A list of detection dicts (``type``, ``class``, ``confidence``,
        ``bbox``, ``threat_level``, ``detection_method`` plus type-specific
        keys); an empty list if an unexpected error occurs.
    """
    detections = []

    try:
        imgsz = self.config['performance']['image_size']
        use_half = self.config['performance']['half_precision'] and self.device == 'cuda'

        # (image, confidence multiplier, label used in 'detection_method')
        images_to_process = [(image, 1.0, "original")]

        if self.config['weapon_detection']['use_enhancement']:
            enhanced_image = self.enhance_knife_detection(image)
            images_to_process.append((enhanced_image, 1.15, "enhanced"))

        for img, weight_multiplier, img_type in images_to_process:
            if self.weapon_model_custom:
                knife_conf = self.config['weapon_detection']['knife_confidence']
                gun_conf = self.config['weapon_detection']['confidence_threshold']
                fight_conf = self.config['weapon_detection']['fight_confidence']

                # Multi-pass runs the model once per threshold; single-pass
                # uses the lower of the knife and fight thresholds.
                passes = [
                    (knife_conf, "knife_pass"),
                    (gun_conf, "gun_pass"),
                    (fight_conf, "fight_pass")
                ] if self.config['weapon_detection']['multi_pass'] else [
                    (min(knife_conf, fight_conf), "single_pass")]

                for conf_threshold, pass_type in passes:
                    try:
                        results = self.weapon_model_custom(
                            img,
                            imgsz=imgsz,
                            conf=conf_threshold,
                            device=self.device,
                            half=use_half,
                            verbose=False,
                            augment=True
                        )

                        for result in results:
                            boxes = result.boxes
                            if boxes is not None:
                                for box in boxes:
                                    class_id = int(box.cls[0])

                                    # Fall back to a synthetic name when the
                                    # result has no label for this class id.
                                    if hasattr(result, 'names') and class_id in result.names:
                                        class_name = result.names[class_id].lower()
                                    else:
                                        class_name = f"detection_{class_id}"

                                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                                    confidence = float(box.conf[0]) * weight_multiplier

                                    if self.is_fight_detection(class_name):
                                        confidence = self.boost_fight_confidence(
                                            img, [x1, y1, x2, y2], confidence, class_name
                                        )

                                        detection_type = 'fight'
                                        min_conf = fight_conf
                                        threat_level = self.assess_fight_threat(confidence, img, [x1, y1, x2, y2])

                                    else:
                                        if self.config['weapon_detection']['boost_knife_detection']:
                                            if 'dao' in class_name or 'knife' in class_name or 'blade' in class_name:
                                                confidence = self.boost_knife_confidence(
                                                    img, [x1, y1, x2, y2], confidence, class_name
                                                )

                                        detection_type = 'weapon'
                                        weapon_type = self.classify_weapon_type(class_name)
                                        min_conf = knife_conf if weapon_type == 'blade' else gun_conf
                                        threat_level = self.assess_weapon_threat(weapon_type, confidence)

                                    # The post-boost confidence must reach the
                                    # per-type minimum to be reported.
                                    if confidence >= min_conf:
                                        detection_data = {
                                            'type': detection_type,
                                            'class': class_name,
                                            # Stored value is capped at 0.99.
                                            'confidence': min(confidence, 0.99),
                                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                                            'threat_level': threat_level,
                                            'detection_method': f'custom_model_{img_type}_{pass_type}'
                                        }

                                        if detection_type == 'weapon':
                                            detection_data['weapon_type'] = weapon_type
                                        elif detection_type == 'fight':
                                            detection_data['fight_type'] = self.classify_fight_type(class_name)
                                            detection_data['aggression_level'] = self.assess_aggression_level(
                                                confidence)

                                        detections.append(detection_data)

                                        icon = "๐" if detection_type == 'fight' else "๐ฏ"
                                        print(
                                            f" {icon} Detected: {class_name} (conf: {confidence:.3f}, method: {img_type}_{pass_type})")

                    except Exception as e:
                        # A failing pass is reported and skipped; the other
                        # passes still run.
                        print(f"โ ๏ธ Detection pass error ({pass_type}): {e}")

        # NOTE(review): nesting reconstructed from unindented source; the
        # fallback is assumed to run once, after all images and passes.
        if self.weapon_model_general and len(detections) == 0:
            detections.extend(self.fallback_detection(image, imgsz, use_half))

        # The same object can be reported by several passes/images.
        detections = self.remove_duplicate_detections(detections)

        try:
            wd = self.config.get('weapon_detection', {})
            wd_fd = wd.get('fight_detection', None)

            # 'fight_detection' under 'weapon_detection' may be a dict of
            # options or a plain flag; with a flag, the multi-person switch
            # is read from the top-level 'fight_detection' section.
            if isinstance(wd_fd, dict):
                fight_enabled = wd_fd.get('enabled', False)
                fight_multi_person = wd_fd.get('multi_person_analysis', False)
            else:
                fight_enabled = bool(wd_fd)
                fight_multi_person = bool(
                    self.config.get('fight_detection', {}).get('multi_person_analysis', False))

            if fight_enabled and fight_multi_person:
                fight_detections = [d for d in detections if d.get('type') == 'fight']
                if fight_detections:
                    try:
                        enhanced_fights = self.analyze_fight_context(image, fight_detections)

                        # Swap the plain fight entries for the enriched ones.
                        detections = [d for d in detections if d.get('type') != 'fight'] + enhanced_fights
                    except Exception as e:
                        print(f"โ ๏ธ Fight context enhancement error: {e}")
        except Exception as e:
            import traceback
            traceback.print_exc()
            print(f"โ ๏ธ Skipping fight context analysis due to config error: {e}")

        return detections

    except Exception as e:
        print(f"โ Weapon and fight detection error: {e}")
        return []
|
|
|
|
|
|
def is_fight_detection(self, class_name):
    """Return True when ``class_name`` contains a fight-related keyword.

    Matching is a case-insensitive substring test.
    """
    lowered = class_name.lower()
    for marker in ('fight', 'fighting', 'combat', 'violence', 'aggression', 'brawl', 'scuffle'):
        if marker in lowered:
            return True
    return False
|
|
|
|
|
|
def classify_fight_type(self, class_name):
    """Map a class name to a fight category by keyword.

    Categories are tried in order and the first one with a keyword contained
    in the lower-cased name wins; ``'general_fight'`` is the default.
    """
    lowered = class_name.lower()
    categories = (
        ('physical_combat', ('punch', 'boxing', 'fist')),
        ('martial_arts', ('kick', 'martial', 'karate')),
        ('wrestling', ('wrestle', 'grapple')),
        ('group_violence', ('group', 'mob', 'crowd')),
    )
    for label, markers in categories:
        if any(marker in lowered for marker in markers):
            return label
    return 'general_fight'
|
|
|
|
|
|
def boost_fight_confidence(self, image, bbox, initial_confidence, class_name):
    """Adjust a fight detection's confidence using cues inside its box.

    The box is clamped to the image and five cues are measured on the crop:
    Laplacian variance below 100 (+0.10), Canny edge density above 0.15
    (+0.08), saturation variance above 1000 (+0.05), mean Harris corner
    response above 0.01 (+0.07) and a long/short side ratio between 1.2 and
    3.0 (+0.05). The result is capped at 0.95.

    Args:
        image: BGR image array the box refers to.
        bbox: ``[x1, y1, x2, y2]``; values are truncated to int.
        initial_confidence: Confidence before boosting.
        class_name: Unused here; kept for a uniform helper signature.

    Returns:
        The adjusted confidence, or ``initial_confidence`` when the crop is
        empty or any step raises.
    """
    try:
        x1, y1, x2, y2 = map(int, bbox)

        # Clamp to the frame so the slice below is well defined.
        x1, y1 = max(0, x1), max(0, y1)
        x2 = min(image.shape[1], x2)
        y2 = min(image.shape[0], y2)

        region = image[y1:y2, x1:x2]
        if region.size == 0:
            return initial_confidence

        gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
        blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var()

        edge_map = cv2.Canny(gray, 50, 150)
        edge_density = np.count_nonzero(edge_map) / edge_map.size

        saturation = cv2.cvtColor(region, cv2.COLOR_BGR2HSV)[:, :, 1]
        corner_response = cv2.cornerHarris(np.float32(gray), 2, 3, 0.04)

        box_h, box_w = y2 - y1, x2 - x1
        balanced_shape = False
        if box_h > 0 and box_w > 0:
            balanced_shape = 1.2 < max(box_w, box_h) / min(box_w, box_h) < 3.0

        # (cue present?, bonus), listed in the order the bonuses are added.
        cues = (
            (blur_variance < 100, 0.10),
            (edge_density > 0.15, 0.08),
            (np.var(saturation) > 1000, 0.05),
            (np.mean(corner_response) > 0.01, 0.07),
            (balanced_shape, 0.05),
        )
        boost = 0
        for present, bonus in cues:
            if present:
                boost += bonus

        if boost > 0:
            print(f" ๐ Fight boost applied: +{boost:.2f} (blur:{blur_variance:.0f}, edge:{edge_density:.2f})")

        return min(initial_confidence + boost, 0.95)

    except Exception as e:
        print(f"โ ๏ธ Fight confidence boost error: {e}")
        return initial_confidence
|
|
|
|
|
|
def assess_fight_threat(self, confidence, image, bbox):
    """Rate a detected fight as 'low', 'medium', 'high' or 'critical'.

    The base level comes from the confidence (>= 0.85 critical, >= 0.70
    high, >= 0.50 medium, else low). When the box covers more than half of
    the image, 'medium' and 'high' are raised one step. Errors in the area
    check are printed and the base level is returned.

    Args:
        confidence: Detection confidence.
        image: Object with a ``shape`` whose first two entries are the
            height and width.
        bbox: ``[x1, y1, x2, y2]`` of the fight.
    """
    for floor, label in ((0.85, 'critical'), (0.70, 'high'), (0.50, 'medium')):
        if confidence >= floor:
            base_threat = label
            break
    else:
        base_threat = 'low'

    try:
        left, top, right, bottom = bbox
        fight_area = (right - left) * (bottom - top)
        image_area = image.shape[0] * image.shape[1]

        if fight_area / image_area > 0.5:
            escalation = {'medium': 'high', 'high': 'critical'}
            base_threat = escalation.get(base_threat, base_threat)

    except Exception as e:
        print(f"โ ๏ธ Fight threat assessment error: {e}")

    return base_threat
|
|
|
|
|
|
def assess_aggression_level(self, confidence):
    """Translate a confidence value into an aggression label.

    >= 0.80 is 'extreme', >= 0.65 'high', >= 0.45 'moderate', else 'low'.
    """
    tiers = ((0.80, 'extreme'), (0.65, 'high'), (0.45, 'moderate'))
    for floor, label in tiers:
        if confidence >= floor:
            return label
    return 'low'
|
|
|
|
|
|
def analyze_fight_context(self, image, fight_detections):
    """Annotate fight detections with how many people are in or near them.

    Persons come from ``detect_persons``. For each fight, a person whose
    overlap with the fight box (per ``calculate_bbox_overlap``) exceeds 0.3
    counts as involved; above 0.1 as nearby. With three or more people
    involved, 'medium'/'high' threat is raised one step and the fight type
    becomes 'group_violence'.

    Args:
        image: Image passed to ``detect_persons``.
        fight_detections: Detection dicts with ``bbox`` and ``threat_level``.

    Returns:
        New dicts (shallow copies) with ``people_involved``,
        ``people_nearby``, ``total_people`` and ``context_flags`` added, or
        the original ``fight_detections`` list if anything raises.
    """
    annotated = []

    try:
        persons = self.detect_persons(image)

        for fight in fight_detections:
            entry = fight.copy()
            fight_box = fight['bbox']

            overlaps = [
                self.calculate_bbox_overlap(fight_box, person['bbox'])
                for person in persons
            ]
            people_in_fight = sum(1 for ratio in overlaps if ratio > 0.3)
            people_nearby = sum(1 for ratio in overlaps if 0.1 < ratio <= 0.3)

            entry['people_involved'] = people_in_fight
            entry['people_nearby'] = people_nearby
            entry['total_people'] = people_in_fight + people_nearby

            is_group = people_in_fight >= 3
            if is_group:
                escalation = {'medium': 'high', 'high': 'critical'}
                current = entry['threat_level']
                if current in escalation:
                    entry['threat_level'] = escalation[current]
                entry['fight_type'] = 'group_violence'

            flags = []
            if is_group:
                flags.append('multi_person_fight')
            if people_nearby >= 2:
                flags.append('crowd_present')
            entry['context_flags'] = flags

            annotated.append(entry)

            print(f" ๐ฅ Fight context: {people_in_fight} involved, {people_nearby} nearby")

    except Exception as e:
        print(f"โ ๏ธ Fight context analysis error: {e}")
        return fight_detections

    return annotated
|
|
|
|
|
|
def calculate_bbox_overlap(self, bbox1, bbox2):
    """Return the fraction of ``bbox1``'s area that ``bbox2`` covers.

    Both boxes are ``[x_min, y_min, x_max, y_max]``. The ratio is relative
    to ``bbox1`` only, so it is not symmetric. Disjoint boxes give 0.0 and a
    zero-area ``bbox1`` gives 0.
    """
    a_left, a_top, a_right, a_bottom = bbox1
    b_left, b_top, b_right, b_bottom = bbox2

    shared_w = min(a_right, b_right) - max(a_left, b_left)
    shared_h = min(a_bottom, b_bottom) - max(a_top, b_top)
    if shared_w < 0 or shared_h < 0:
        return 0.0

    reference_area = (a_right - a_left) * (a_bottom - a_top)
    if reference_area > 0:
        return (shared_w * shared_h) / reference_area
    return 0
|
|
|
|
|
|
def fallback_detection(self, image, imgsz, use_half):
    """Look for blade-like objects with the general model.

    Runs ``self.weapon_model_general`` at confidence 0.4 and reports every
    box whose lower-cased class name contains 'knife', 'scissors' or 'fork'
    as a 'blade' weapon.

    Args:
        image: Image passed to the model.
        imgsz: Inference image size.
        use_half: Whether to request half precision.

    Returns:
        A list of weapon detection dicts; whatever was collected before an
        error is still returned.
    """
    weapon_keywords = ['knife', 'scissors', 'fork']
    found = []

    try:
        general_results = self.weapon_model_general(
            image,
            imgsz=imgsz,
            conf=0.4,
            device=self.device,
            half=use_half,
            verbose=False,
        )

        for result in general_results:
            boxes = result.boxes
            if boxes is None:
                continue

            for box in boxes:
                label = result.names[int(box.cls[0])].lower()
                if not any(keyword in label for keyword in weapon_keywords):
                    continue

                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                score = float(box.conf[0])

                found.append({
                    'type': 'weapon',
                    'class': label,
                    'weapon_type': 'blade',
                    'confidence': score,
                    'bbox': [int(x1), int(y1), int(x2), int(y2)],
                    'threat_level': self.assess_weapon_threat('blade', score),
                    'detection_method': 'general_model_fallback',
                })

    except Exception as e:
        print(f"โ ๏ธ General detection error: {e}")

    return found
|
|
|
|
|
|
|
|
|
|
|
|
def enhance_knife_detection(self, image):
    """Return a contrast-boosted, sharpened copy of ``image``.

    Steps: linear scaling (alpha 1.4, beta 25), a 3x3 sharpening kernel,
    then CLAHE on the L channel in LAB space. If any step raises, the error
    is printed and the original image is returned.

    Args:
        image: BGR image array.
    """
    try:
        brightened = cv2.convertScaleAbs(image, alpha=1.4, beta=25)

        sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
        crisp = cv2.filter2D(brightened, -1, sharpen_kernel)

        # Equalize lightness only; the two colour channels are left alone.
        lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(crisp, cv2.COLOR_BGR2LAB))
        equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        lightness = equalizer.apply(lightness)

        merged_lab = cv2.merge([lightness, chan_a, chan_b])
        return cv2.cvtColor(merged_lab, cv2.COLOR_LAB2BGR)

    except Exception as e:
        print(f"โ ๏ธ Enhancement failed: {e}")
        return image
|
|
|
|
|
|
def boost_knife_confidence(self, image, bbox, initial_confidence, class_name):
    """Adjust a knife/dao detection's confidence using cues inside its box.

    Bonuses: long/short side ratio > 2.5 (+0.15) or > 2.0 (+0.10); mean gray
    level > 140 (+0.10); gray standard deviation > 50 (+0.05); Canny edge
    ratio > 0.15 (+0.10) or > 0.10 (+0.05); mean absolute gradient along the
    longer axis > 10 (+0.05). The total is multiplied by 1.2 when the class
    name contains 'dao' or 'knife'; the result is capped at 0.95.

    Args:
        image: BGR image array the box refers to.
        bbox: ``[x1, y1, x2, y2]``; values are truncated to int.
        initial_confidence: Confidence before boosting.
        class_name: Detected class name.

    Returns:
        The adjusted confidence, or ``initial_confidence`` when the crop is
        empty or any step raises.
    """
    try:
        x1, y1, x2, y2 = map(int, bbox)

        # Clamp to the frame so the slice below is well defined.
        x1, y1 = max(0, x1), max(0, y1)
        x2 = min(image.shape[1], x2)
        y2 = min(image.shape[0], y2)

        crop = image[y1:y2, x1:x2]
        if crop.size == 0:
            return initial_confidence

        boost = 0
        height, width = y2 - y1, x2 - x1

        if height > 0 and width > 0:
            aspect_ratio = max(width, height) / min(width, height)
            if aspect_ratio > 2.5:
                boost += 0.15
            elif aspect_ratio > 2.0:
                boost += 0.10

        gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
        mean_brightness = np.mean(gray)
        if mean_brightness > 140:
            boost += 0.10
        if np.std(gray) > 50:
            boost += 0.05

        edge_map = cv2.Canny(gray, 50, 150)
        edge_ratio = np.count_nonzero(edge_map) / edge_map.size
        if edge_ratio > 0.15:
            boost += 0.10
        elif edge_ratio > 0.10:
            boost += 0.05

        # Gradient is taken along the longer side of the box.
        long_axis = 0 if height > width else 1
        gradient_strength = np.mean(np.abs(np.gradient(gray, axis=long_axis)))
        if gradient_strength > 10:
            boost += 0.05

        lowered = class_name.lower()
        if 'dao' in lowered or 'knife' in lowered:
            boost *= 1.2

        if boost > 0:
            print(
                f" ๐ช Knife boost applied: +{boost:.2f} (AR:{aspect_ratio:.1f}, Bright:{mean_brightness:.0f}, Edge:{edge_ratio:.2f})")

        return min(initial_confidence + boost, 0.95)

    except Exception as e:
        print(f"โ ๏ธ Confidence boost error: {e}")
        return initial_confidence
|
|
|
|
|
|
def detect_persons(self, image):
    """Find people in ``image`` with the general model.

    Runs ``self.weapon_model_general`` at confidence 0.3 and keeps boxes
    whose class name is exactly 'person' (case-insensitive).

    Args:
        image: Image passed to the model.

    Returns:
        A list of ``{'class', 'confidence', 'bbox'}`` dicts with integer
        ``[x1, y1, x2, y2]`` boxes; empty when no general model is loaded
        or when detection raises.
    """
    if not self.weapon_model_general:
        return []

    try:
        performance = self.config['performance']
        results = self.weapon_model_general(
            image,
            imgsz=performance['image_size'],
            conf=0.3,
            device=self.device,
            half=performance['half_precision'] and self.device == 'cuda',
            verbose=False,
        )

        people = []
        for result in results:
            boxes = result.boxes
            if boxes is None:
                continue

            for box in boxes:
                label = result.names[int(box.cls[0])].lower()
                if label != 'person':
                    continue

                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                people.append({
                    'class': 'person',
                    'confidence': float(box.conf[0]),
                    'bbox': [int(x1), int(y1), int(x2), int(y2)],
                })

        return people

    except Exception as e:
        print(f"โ Person detection error: {e}")
        return []
|
|
|
|
|
|
def classify_weapon_type(self, class_name):
    """Classify a detected class name into a weapon family.

    Keyword groups are tried in order (blade, firearm, blunt weapon) using a
    case-insensitive substring test. Names containing 'weapon' fall back to
    a trailing numeric id: 0-1 is a firearm, 2-3 a blade.

    Args:
        class_name: Class label reported by the model.

    Returns:
        'blade', 'firearm', 'blunt_weapon' or 'unknown_weapon'.
    """
    name = class_name.lower()

    keyword_groups = (
        ('blade', ('knife', 'dao', 'blade', 'dagger', 'sword', 'machete',
                   'katana', 'cutter')),
        # 's\u00fang' is "sung" with u-acute. It is written as an escape
        # because the literal had been corrupted by a wrong source encoding
        # and could never match a real class name.
        ('firearm', ('gun', 'pistol', 'rifle', 'firearm', 'revolver',
                     'shotgun', 's\u00fang')),
        ('blunt_weapon', ('axe', 'hammer', 'club', 'bat')),
    )
    for weapon_type, keywords in keyword_groups:
        if any(keyword in name for keyword in keywords):
            return weapon_type

    if 'weapon' in name:
        try:
            weapon_id = int(name.split('_')[-1])
        except ValueError:
            # No numeric suffix (e.g. plain "weapon"): nothing more to infer.
            return 'unknown_weapon'
        if weapon_id in (0, 1):
            return 'firearm'
        if weapon_id in (2, 3):
            return 'blade'

    return 'unknown_weapon'
|
|
|
|
|
|
def assess_weapon_threat(self, weapon_type, confidence):
    """Rate a weapon detection from its family and confidence.

    Base levels: firearm 'critical', blade 'high', everything else 'medium'.
    Confidence >= 0.9 raises medium/high one step; >= 0.7 keeps the base;
    >= 0.5 lowers critical/high one step; below 0.5 gives 'medium' for a
    critical base and 'low' otherwise.
    """
    base_by_type = {
        'firearm': 'critical',
        'blade': 'high',
        'blunt_weapon': 'medium',
        'unknown_weapon': 'medium',
    }
    base = base_by_type.get(weapon_type, 'medium')

    if confidence >= 0.9:
        step_up = {'medium': 'high', 'high': 'critical'}
        return step_up.get(base, base)

    if confidence >= 0.7:
        return base

    if confidence >= 0.5:
        step_down = {'critical': 'high', 'high': 'medium'}
        return step_down.get(base, base)

    return 'medium' if base == 'critical' else 'low'
|
|
|
|
|
|
def remove_duplicate_detections(self, detections, iou_threshold=0.4):
    """Drop overlapping detections of the same type, keeping the strongest.

    Detections are visited in descending confidence order; one is discarded
    when an already-kept detection of the same ``type`` overlaps it with an
    IoU above ``iou_threshold``.

    Args:
        detections: Dicts with ``type``, ``confidence`` and ``bbox``.
        iou_threshold: IoU above which two boxes count as duplicates.

    Returns:
        The kept detections, strongest first. Lists of length 0 or 1 are
        returned unchanged.
    """
    if len(detections) <= 1:
        return detections

    ranked = sorted(detections, key=lambda det: det['confidence'], reverse=True)

    survivors = []
    for candidate in ranked:
        duplicated = any(
            candidate['type'] == kept['type']
            and self.calculate_iou(candidate['bbox'], kept['bbox']) > iou_threshold
            for kept in survivors
        )
        if not duplicated:
            survivors.append(candidate)

    return survivors
|
|
|
|
|
|
def calculate_iou(self, box1, box2):
    """Return the Intersection-over-Union of two ``[x1, y1, x2, y2]`` boxes.

    Disjoint boxes give 0.0; a zero union gives 0.
    """
    a_left, a_top, a_right, a_bottom = box1
    b_left, b_top, b_right, b_bottom = box2

    shared_w = min(a_right, b_right) - max(a_left, b_left)
    shared_h = min(a_bottom, b_bottom) - max(a_top, b_top)
    if shared_w < 0 or shared_h < 0:
        return 0.0

    intersect_area = shared_w * shared_h
    area_a = (a_right - a_left) * (a_bottom - a_top)
    area_b = (b_right - b_left) * (b_bottom - b_top)
    union_area = area_a + area_b - intersect_area

    if union_area > 0:
        return intersect_area / union_area
    return 0
|
|
|
|
|
|
|
|
|
|
|
|
def detect_nsfw_content(self, image):
    """Run the NSFW checks on ``image``; nothing is checked without people.

    Order of checks: the classifier on the whole image (when loaded),
    per-person skin analysis (``skin_detection``), and, only if nothing has
    been flagged so far, whole-image skin regions (``region_analysis``).

    Args:
        image: Image array; three-channel input is converted BGR to RGB
            for the classifier.

    Returns:
        A list of NSFW detection dicts; empty when no person is found or
        when an unexpected error occurs.
    """
    try:
        three_channel = len(image.shape) == 3 and image.shape[2] == 3
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if three_channel else image

        persons = self.detect_persons(image)
        if not persons:
            return []

        print(f"๐ค Found {len(persons)} person(s), analyzing for NSFW content...")

        findings = []

        if self.nsfw_classifier:
            try:
                # Only the first entry of the classifier output is inspected.
                top = self.nsfw_classifier(Image.fromarray(rgb_image))[0]
                if top['label'] == 'nsfw':
                    score = top['score']
                    if score > self.config['nsfw_detection']['confidence_threshold']:
                        findings.append({
                            'type': 'nsfw',
                            'class': 'inappropriate_content',
                            'confidence': score,
                            'bbox': [0, 0, image.shape[1], image.shape[0]],
                            'method': 'classification',
                        })
            except Exception as e:
                print(f"โ ๏ธ NSFW classifier error: {e}")

        if self.config['nsfw_detection']['skin_detection']:
            for person in persons:
                findings.extend(self.analyze_person_skin(image, person))

        if self.config['nsfw_detection']['region_analysis'] and len(findings) == 0:
            findings.extend(self.detect_skin_regions(image))

        return findings

    except Exception as e:
        print(f"โ NSFW detection error: {e}")
        return []
|
|
|
|
|
|
def analyze_person_skin(self, image, person):
    """Flag a person whose box is mostly skin-coloured.

    The person's crop is thresholded in HSV between (0, 20, 70) and
    (20, 255, 255). When more than 40% of the crop matches, one detection
    with confidence ``min(2 * ratio, 1.0)`` is returned.

    Args:
        image: BGR image array.
        person: Dict with an integer ``bbox`` of ``[x1, y1, x2, y2]``.

    Returns:
        A list with zero or one detection dict; empty on error.
    """
    try:
        x1, y1, x2, y2 = person['bbox']
        crop = image[y1:y2, x1:x2]
        if crop.size == 0:
            return []

        skin_floor = np.array([0, 20, 70], dtype=np.uint8)
        skin_ceiling = np.array([20, 255, 255], dtype=np.uint8)
        skin_mask = cv2.inRange(cv2.cvtColor(crop, cv2.COLOR_BGR2HSV), skin_floor, skin_ceiling)

        pixel_count = crop.shape[0] * crop.shape[1]
        skin_ratio = cv2.countNonZero(skin_mask) / pixel_count if pixel_count > 0 else 0

        if not skin_ratio > 0.4:
            return []

        finding = {
            'type': 'nsfw',
            'class': 'excessive_skin_exposure',
            'confidence': min(skin_ratio * 2, 1.0),
            'bbox': [x1, y1, x2, y2],
            'method': 'person_skin_analysis',
            'skin_ratio': skin_ratio,
        }
        print(f"๐จ Excessive skin exposure detected: {skin_ratio:.2f} ratio")
        return [finding]

    except Exception as e:
        print(f"โ Person skin analysis error: {e}")
        return []
|
|
|
|
|
|
def detect_skin_regions(self, image):
    """Report skin-coloured regions covering more than 30% of the image.

    The image is thresholded in HSV between (0, 20, 70) and (20, 255, 255),
    cleaned with a 3x3 morphological open then close, and split into
    external contours. Each contour larger than 30% of the image area
    becomes a detection whose confidence is its area fraction.

    Args:
        image: BGR image array.

    Returns:
        A list of detection dicts; empty on error.
    """
    try:
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        skin_floor = np.array([0, 20, 70], dtype=np.uint8)
        skin_ceiling = np.array([20, 255, 255], dtype=np.uint8)
        mask = cv2.inRange(hsv, skin_floor, skin_ceiling)

        # Open, then close, with the same 3x3 structuring element.
        kernel = np.ones((3, 3), np.uint8)
        for operation in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
            mask = cv2.morphologyEx(mask, operation, kernel)

        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        image_area = image.shape[0] * image.shape[1]
        min_area = image_area * 0.3

        regions = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if not area > min_area:
                continue

            x, y, w, h = cv2.boundingRect(contour)
            regions.append({
                'type': 'nsfw',
                'class': 'large_skin_region',
                'confidence': min(area / image_area, 1.0),
                'bbox': [x, y, x + w, y + h],
                'method': 'skin_detection',
            })

        return regions

    except Exception as e:
        print(f"โ Skin detection error: {e}")
        return []
|
|
|
|
|
|
def setup_nsfw_detector(self):
    """Load the NSFW classifier and, when enabled, the pose detector.

    The classifier is a ``transformers`` image-classification pipeline for
    ``Falconsai/nsfw_image_detection``. It is first requested with
    ``use_fast=True`` and, if that fails, once more without it;
    ``self.nsfw_classifier`` is ``None`` when both attempts fail. The
    MediaPipe pose detector is created only when
    ``nsfw_detection.pose_analysis`` is set and MediaPipe is available;
    otherwise ``self.pose_detector`` is ``None``. Failures are printed,
    never raised.
    """
    try:
        print("๐ Loading NSFW detection components...")

        # Computed before the first attempt so the retry below can always
        # rely on it being bound.
        device_id = 0 if self.device == 'cuda' else -1

        try:
            self.nsfw_classifier = pipeline(
                "image-classification",
                model="Falconsai/nsfw_image_detection",
                device=device_id,
                use_fast=True
            )
            # Check marks are written as escapes so a wrong source encoding
            # cannot split these literals across lines.
            print("\u2705 NSFW classifier loaded")
        except Exception as nsfw_error:
            print(f"โ ๏ธ NSFW classifier failed: {nsfw_error}")
            print(" Trying backup method...")
            try:
                self.nsfw_classifier = pipeline(
                    "image-classification",
                    model="Falconsai/nsfw_image_detection",
                    device=device_id
                )
                print("\u2705 NSFW classifier loaded (fallback)")
            except Exception:
                # Was a bare ``except:``, which would also swallow
                # KeyboardInterrupt/SystemExit during the model download.
                print("โ NSFW classifier completely failed")
                self.nsfw_classifier = None

        if self.config['nsfw_detection']['pose_analysis'] and MEDIAPIPE_AVAILABLE:
            try:
                import mediapipe as mp
                try:
                    mp_pose = mp.solutions.pose
                    self.pose_detector = mp_pose.Pose(
                        static_image_mode=True,
                        model_complexity=0,
                        min_detection_confidence=0.5
                    )
                    print("\u2705 Pose detector loaded (legacy API)")
                except AttributeError:
                    # Raised when the installed MediaPipe lacks the
                    # attributes used above.
                    print("โ ๏ธ MediaPipe API not available")
                    self.pose_detector = None
                    self.config['nsfw_detection']['pose_analysis'] = False

            except Exception as pose_error:
                print(f"โ ๏ธ Pose detection failed: {pose_error}")
                self.pose_detector = None
                self.config['nsfw_detection']['pose_analysis'] = False
        else:
            self.pose_detector = None
            if not MEDIAPIPE_AVAILABLE:
                print("โ ๏ธ MediaPipe not available - pose analysis disabled")

    except Exception as e:
        print(f"โ Error loading NSFW components: {e}")
        print("๐ก Falling back to skin detection only")
|
|
|
|
|
|
def process_image(self, image_path):
    """Run the enabled detectors on one image and summarise the findings.

    Args:
        image_path: Either a file path (``str``), loaded with ``cv2.imread``,
            or an already decoded image array (anything exposing ``shape``
            and ``tobytes()``).

    Returns:
        A dict with ``timestamp``, ``image_path``, ``detections``,
        ``total_threats``, ``risk_level``, ``action_required``,
        ``processing_method`` and ``detection_breakdown``; plus
        ``annotated_image`` when ``config['output']['draw_boxes']`` is set
        and something was detected.  A result cached less than
        ``self.cache_ttl`` seconds ago for the same input is returned as-is.
        Returns ``None`` when any step raises (the error is printed).
    """
    import time  # kept local: `time` is not among the imports visible at file top

    try:
        if isinstance(image_path, str):
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError(f"Could not load image: {image_path}")
            cache_key = f"file_{image_path}"
        else:
            image = image_path
            cache_key = f"array_{hash(image.tobytes())}"

        # Serve a still-fresh cached result instead of re-running the models.
        current_time = time.time()
        if cache_key in self.detection_cache:
            cached_result, timestamp = self.detection_cache[cache_key]
            if current_time - timestamp < self.cache_ttl:
                return cached_result

        print(f"๐ธ Processing image: {image.shape}")

        all_detections = []

        if self.config['weapon_detection']['enabled']:
            # detect_weapons() returns weapon and fight detections together.
            weapon_fight_detections = self.detect_weapons(image)
            all_detections.extend(weapon_fight_detections)

            weapon_detections = [d for d in weapon_fight_detections if d['type'] == 'weapon']
            fight_detections = [d for d in weapon_fight_detections if d['type'] == 'fight']

            print(f"๐ซ Found {len(weapon_detections)} weapon(s)")
            print(f"๐ Found {len(fight_detections)} fight(s)")

            # .get(): a detection without 'weapon_type' must not turn a
            # purely informational log line into a lost result.
            knife_detections = [d for d in weapon_detections if d.get('weapon_type') == 'blade']
            if knife_detections:
                print(f" ๐ช Including {len(knife_detections)} knife/dao detection(s)")

            for fight in fight_detections:
                fight_type = fight.get('fight_type', 'unknown')
                aggression = fight.get('aggression_level', 'unknown')
                print(f" ๐ Fight: {fight_type} (aggression: {aggression})")

        if self.config['nsfw_detection']['enabled']:
            nsfw_detections = self.detect_nsfw_content(image)
            all_detections.extend(nsfw_detections)
            print(f"๐ Found {len(nsfw_detections)} NSFW detection(s)")

        type_counts = {'weapon': 0, 'fight': 0, 'nsfw': 0}
        for det in all_detections:
            if det['type'] in type_counts:
                type_counts[det['type']] += 1

        result = {
            'timestamp': datetime.now().isoformat(),
            'image_path': image_path if isinstance(image_path, str) else 'array',
            'detections': all_detections,
            'total_threats': len(all_detections),
            'risk_level': self.calculate_risk_level(all_detections),
            'action_required': len(all_detections) > 0,
            'processing_method': 'enhanced_dual_model_with_fight',
            'detection_breakdown': {
                'weapons': type_counts['weapon'],
                'fights': type_counts['fight'],
                'nsfw': type_counts['nsfw']
            }
        }

        self.detection_cache[cache_key] = (result, current_time)
        self.clean_cache(current_time)

        # Store a snapshot rather than `result` itself: the annotated frame
        # added below would otherwise end up in the history too, keeping one
        # full image alive per hit and polluting the JSON report.
        self.detection_history.append(dict(result))

        if self.config['output']['draw_boxes'] and all_detections:
            result['annotated_image'] = self.draw_detections(image.copy(), all_detections)

        return result

    except Exception as e:
        print(f"โ Error processing image: {e}")
        return None
|
|
|
|
|
|
def clean_cache(self, current_time):
    """Evict cache entries older than ``self.cache_ttl`` seconds.

    Args:
        current_time: Reference "now", in the same units as the timestamps
            stored in ``self.detection_cache`` values ``(result, timestamp)``.

    Errors are printed, never raised.
    """
    try:
        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        stale_keys = [
            cache_key
            for cache_key, (_result, stamp) in self.detection_cache.items()
            if current_time - stamp > self.cache_ttl
        ]
        for cache_key in stale_keys:
            del self.detection_cache[cache_key]
    except Exception as e:
        print(f"โ ๏ธ Cache cleanup error: {e}")
|
|
|
|
|
|
def get_model_status(self):
    """Summarise which models are loaded and which options are switched on.

    Every config lookup goes through ``.get`` so a partial configuration
    never raises.

    Returns:
        A dict with the keys ``fight_detection``,
        ``custom_weapon_fight_model``, ``general_model``, ``nsfw_classifier``,
        ``pose_detector``, ``device``, ``cache_size``, ``knife_enhancement``,
        ``knife_boost`` and ``fight_analysis``; ``custom_classes`` is added
        when the custom model exposes a ``names`` mapping.
    """
    weapon_cfg = self.config.get('weapon_detection', {})

    # 'fight_detection' may be a nested options dict or a plain flag.
    fight_cfg = weapon_cfg.get('fight_detection', None)
    fight_enabled = fight_cfg.get('enabled', True) if isinstance(fight_cfg, dict) else bool(fight_cfg)

    # The weapon-level flag wins when truthy; otherwise fall back to the
    # top-level fight_detection section.
    fight_analysis_flag = weapon_cfg.get('fight_analysis', False)
    if not fight_analysis_flag:
        top_level_fight_cfg = self.config.get('fight_detection', {})
        fight_analysis_flag = bool(top_level_fight_cfg.get('multi_person_analysis', False))

    custom_model = self.weapon_model_custom

    status = {}
    status['fight_detection'] = fight_enabled
    status['custom_weapon_fight_model'] = custom_model is not None
    status['general_model'] = self.weapon_model_general is not None
    status['nsfw_classifier'] = self.nsfw_classifier is not None
    status['pose_detector'] = self.pose_detector is not None
    status['device'] = self.device
    status['cache_size'] = len(self.detection_cache)
    status['knife_enhancement'] = weapon_cfg.get('use_enhancement', False)
    status['knife_boost'] = weapon_cfg.get('boost_knife_detection', False)
    status['fight_analysis'] = fight_analysis_flag

    if custom_model and hasattr(custom_model, 'names'):
        status['custom_classes'] = list(custom_model.names.values())

    return status
|
|
|
|
|
|
def calculate_risk_level(self, detections):
    """Map a list of detections to one overall risk label.

    Args:
        detections: Dicts carrying at least ``'type'`` and ``'confidence'``.

    Returns:
        ``'safe'`` for no detections, otherwise one of ``'critical'``,
        ``'high'``, ``'medium'`` or ``'low'``.  Rules, first match wins:
        weapon together with a fight -> critical; strongest fight above 0.8
        -> critical, above 0.65 -> high; a weapon while the strongest
        detection of any type is above 0.8 -> critical; any weapon or fight,
        or any confidence above 0.9 -> high; above 0.7 -> medium; else low.
    """
    if not detections:
        return 'safe'

    top_confidence = max(item['confidence'] for item in detections)
    fight_scores = [item['confidence'] for item in detections if item['type'] == 'fight']
    weapon_present = any(item['type'] == 'weapon' for item in detections)
    fight_present = bool(fight_scores)

    # An armed fight is always the worst case.
    if weapon_present and fight_present:
        return 'critical'

    if fight_present:
        strongest_fight = max(fight_scores)
        if strongest_fight > 0.8:
            return 'critical'
        if strongest_fight > 0.65:
            return 'high'

    # Note: this compares against the strongest detection of ANY type.
    if weapon_present and top_confidence > 0.8:
        return 'critical'
    if weapon_present or fight_present or top_confidence > 0.9:
        return 'high'
    if top_confidence > 0.7:
        return 'medium'
    return 'low'
|
|
|
|
|
|
def draw_detections(self, image, detections):
    """Draw a box, a label and extra context for every detection on ``image``.

    Args:
        image: Image array that is drawn on in place.
        detections: Dicts with ``'bbox'`` (x1, y1, x2, y2), ``'type'``,
            ``'class'`` and ``'confidence'``; ``'weapon_type'``,
            ``'fight_type'``, ``'threat_level'``, ``'aggression_level'``,
            ``'people_involved'``, ``'context_flags'`` and
            ``'detection_method'`` are used when present.

    Returns:
        The same ``image`` object.  If drawing fails part-way the error is
        printed and the image is returned with whatever was drawn so far.
    """
    try:
        # Colour tuples are passed straight to OpenCV (BGR order for the
        # usual cv2 images).
        type_palette = {
            'weapon': (0, 0, 255),
            'fight': (0, 165, 255),
            'nsfw': (255, 0, 255),
        }
        # Finer-grained colours, looked up by the detection's sub-type field.
        subtype_palettes = {
            'weapon': ('weapon_type', {
                'blade': (0, 100, 255),
                'firearm': (0, 0, 255),
                'blunt_weapon': (100, 0, 255)
            }),
            'fight': ('fight_type', {
                'physical_combat': (0, 140, 255),
                'martial_arts': (0, 200, 255),
                'wrestling': (0, 165, 255),
                'group_violence': (0, 69, 255),
                'general_fight': (0, 165, 255)
            }),
        }

        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            kind = det['type']

            color = type_palette.get(kind, (0, 255, 0))
            if kind in subtype_palettes:
                subtype_field, palette = subtype_palettes[kind]
                if subtype_field in det:
                    color = palette.get(det[subtype_field], type_palette[kind])

            # Thicker outlines for more serious findings.
            if det.get('threat_level') == 'critical':
                thickness = 4
            elif kind in ('weapon', 'fight'):
                thickness = 3
            else:
                thickness = 2
            cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)

            core = f"{det['class']} ({det['confidence']:.2f})"
            if kind == 'weapon':
                label = core
            elif kind == 'fight':
                label = "FIGHT: " + core
            else:
                label = f"{kind}: " + core
            if kind in ('weapon', 'fight') and 'threat_level' in det:
                label += f" [{det['threat_level']}]"
            if kind == 'fight' and 'aggression_level' in det:
                label += f" {det['aggression_level']}"

            font = cv2.FONT_HERSHEY_SIMPLEX

            # Filled strip above the box, then the label text on top of it.
            text_width = cv2.getTextSize(label, font, 0.5, 2)[0][0]
            cv2.rectangle(image, (x1, y1 - 25), (x1 + text_width + 5, y1), color, -1)
            cv2.putText(image, label, (x1 + 2, y1 - 7),
                        font, 0.5, (255, 255, 255), 2)

            if kind == 'fight':
                notes = []
                if 'people_involved' in det and det['people_involved'] > 0:
                    notes.append(f"People: {det['people_involved']}")
                if 'context_flags' in det and det['context_flags']:
                    notes.append(f"Flags: {', '.join(det['context_flags'])}")
                if notes:
                    cv2.putText(image, " | ".join(notes), (x1, y2 + 15),
                                font, 0.3, color, 1)

            if 'detection_method' in det:
                # Only the last '_'-separated token of the method name is shown.
                method_tag = det['detection_method'].split('_')[-1]
                cv2.putText(image, method_tag, (x1, y2 + 30),
                            font, 0.3, color, 1)

        return image

    except Exception as e:
        print(f"โ Error drawing detections: {e}")
        return image
|
|
|
|
|
|
def process_video(self, video_path, output_path=None, frame_skip=2):
    """Scan a video for threats, optionally writing an annotated copy.

    Frames are analysed with ``self.process_image``.  A frame is analysed
    when a detection was recorded within the previous 10 frames, or when
    its 1-based index is a multiple of the current ``frame_skip``;
    ``frame_skip`` is halved (never below 1) each time something is
    detected.  All other frames are copied to the output unchanged.

    Args:
        video_path: Source handed to ``cv2.VideoCapture``.
        output_path: When truthy, an ``mp4v`` file is written there with the
            same size and FPS as the source; annotated frames replace the
            originals where available.
        frame_skip: Initial analysis stride in frames (values below 1 act
            as 1).

    Returns:
        A dict with ``total_frames_processed`` (frames actually analysed),
        ``total_detections``, ``detections`` (each tagged with its
        ``'frame'``), ``fight_timeline``, ``fight_analysis`` and
        ``detection_breakdown``; or ``None`` if an error occurred (the
        error is printed).  The capture and writer are released either way.
    """
    # Severity order for aggression labels.  Comparing the raw strings is
    # lexicographic ('low' > 'high' > 'extreme'), which picked the wrong
    # peak.  'low', 'high' and 'extreme' are the labels used elsewhere in
    # this file; 'medium'/'moderate' are presumed intermediates - TODO
    # confirm against the detector.  Unknown labels rank lowest.
    aggression_rank = {'low': 1, 'medium': 2, 'moderate': 2, 'high': 3, 'extreme': 4}

    def _rank(label):
        return aggression_rank.get(label, 0)

    cap = None
    out = None
    try:
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)

        frame_count = 0
        frames_analyzed = 0
        total_detections = []
        fight_timeline = []
        recent_detections = []

        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # Stay on every frame for a short while after a detection.
            recently_hit = any(det['frame'] > frame_count - 10 for det in recent_detections[-5:])
            if not (recently_hit or frame_count % max(1, frame_skip) == 0):
                if out is not None:
                    out.write(frame)
                continue

            result = self.process_image(frame)
            frames_analyzed += 1

            if result and result['detections']:
                for detection in result['detections']:
                    detection['frame'] = frame_count

                total_detections.extend(result['detections'])
                recent_detections.append({'frame': frame_count, 'count': len(result['detections'])})

                fight_detections = [d for d in result['detections'] if d['type'] == 'fight']
                if fight_detections:
                    # Some sources report an FPS of 0; fall back to 0.0
                    # rather than aborting the whole run on a division.
                    timestamp = frame_count / fps if fps and fps > 0 else 0.0
                    fight_timeline.append({
                        'timestamp': timestamp,
                        'frame': frame_count,
                        'fights': len(fight_detections),
                        'max_aggression': max(
                            (f.get('aggression_level', 'low') for f in fight_detections),
                            key=_rank
                        )
                    })

                # Look more often while threats are on screen.
                frame_skip = max(1, frame_skip // 2)

                print(f"โ ๏ธ Frame {frame_count}: {len(result['detections'])} threats detected")

                breakdown = result.get('detection_breakdown', {})
                if breakdown.get('fights', 0) > 0:
                    print(f" ๐ Fights: {breakdown['fights']}")

                if out is not None:
                    out.write(result.get('annotated_image', frame))
            else:
                # NOTE(review): recent_detections only ever receives entries
                # with count > 0, so this back-off can never trigger as
                # written.  Left untouched pending a decision on the
                # intended policy.
                if len(recent_detections) > 5 and all(det['count'] == 0 for det in recent_detections[-5:]):
                    frame_skip = min(5, frame_skip + 1)

                if out is not None:
                    out.write(frame)

        fight_analysis = {}
        if fight_timeline:
            first_time = fight_timeline[0]['timestamp']
            last_time = fight_timeline[-1]['timestamp']
            fight_analysis = {
                'total_fight_incidents': len(fight_timeline),
                'first_fight_time': first_time,
                'last_fight_time': last_time,
                'peak_aggression_time': max(
                    fight_timeline, key=lambda entry: _rank(entry['max_aggression'])
                )['timestamp'],
                'fight_duration_coverage': last_time - first_time if len(fight_timeline) > 1 else 0
            }

        type_counts = {'weapon': 0, 'fight': 0, 'nsfw': 0}
        for det in total_detections:
            if det['type'] in type_counts:
                type_counts[det['type']] += 1

        return {
            # Counted directly: dividing by the adaptive frame_skip gave a
            # wrong figure and raised ZeroDivisionError for frame_skip=0.
            'total_frames_processed': frames_analyzed,
            'total_detections': len(total_detections),
            'detections': total_detections,
            'fight_timeline': fight_timeline,
            'fight_analysis': fight_analysis,
            'detection_breakdown': {
                'weapons': type_counts['weapon'],
                'fights': type_counts['fight'],
                'nsfw': type_counts['nsfw']
            }
        }

    except Exception as e:
        print(f"โ Error processing video: {e}")
        return None

    finally:
        # Release on every path so a mid-run error does not leave the input
        # open or the output file unfinalised.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
|
|
|
|
|
|
def save_report(self, filename="detection_report.json"):
    """Write ``self.detection_history`` to ``filename`` as indented JSON.

    Values the JSON encoder cannot handle are converted with ``str``.
    Failures are printed, never raised.
    """
    try:
        with open(filename, 'w') as report_file:
            json.dump(self.detection_history, report_file, indent=2, default=str)
    except Exception as e:
        print(f"โ Error saving report: {e}")
    else:
        print(f"๐ Report saved to {filename}")
|
|
|
|
|
|
def get_memory_usage(self):
    """Describe current CUDA memory use as text.

    Returns:
        ``"CPU mode"`` when CUDA is unavailable; otherwise a string giving
        ``torch.cuda.memory_allocated()`` and ``torch.cuda.memory_reserved()``
        divided by 1024**3, to two decimals.
    """
    if not torch.cuda.is_available():
        return "CPU mode"

    bytes_per_gib = 1024 ** 3
    allocated = torch.cuda.memory_allocated() / bytes_per_gib
    cached = torch.cuda.memory_reserved() / bytes_per_gib
    return f"GPU Memory: {allocated:.2f}GB allocated, {cached:.2f}GB cached"
|
|
|
|
|
|
|
|
|
def main():
    """Run the interactive demo of the content moderation system.

    Steps, in order:
      1. Build a ``ContentModerator`` and print model/option status.
      2. Analyse ``test_image.jpg`` if it exists, otherwise a generated grey
         test pattern, and print the findings.
      3. Monitor webcam 0 with adaptive processing until 'q' is pressed
         (other keys: 's' save frame, 'i' model info, 'e'/'b'/'f' toggle
         enhancement / knife boost / fight analysis, 'h' help), then print
         session statistics.
      4. Print memory and cache figures and save the JSON report.

    The camera and any OpenCV windows are released even if the webcam loop
    raises.
    """
    # NOTE(review): the log-prefix glyphs in this file look like mis-decoded
    # UTF-8 emoji.  They are kept as-is; the ones that contained raw C1
    # control characters are spelled with escapes so each literal stays on
    # one source line.
    window_title = 'Enhanced Detection System (Weapons + Fights)'
    white = (255, 255, 255)

    moderator = ContentModerator()

    def weapon_flag(option):
        """Read a weapon_detection option, treating a missing key as off."""
        return bool(moderator.config.get('weapon_detection', {}).get(option, False))

    def toggle_weapon_flag(option):
        """Flip a weapon_detection option in place and return its new state."""
        weapon_cfg = moderator.config.setdefault('weapon_detection', {})
        weapon_cfg[option] = not weapon_cfg.get(option, False)
        return weapon_cfg[option]

    def on_off(flag):
        return 'ON' if flag else 'OFF'

    def enabled(flag):
        return 'ENABLED' if flag else 'DISABLED'

    # ---- 1. status banner --------------------------------------------
    print("\n" + "=" * 60)
    print("๐ฏ ENHANCED DUAL MODEL SYSTEM WITH FIGHT DETECTION")
    print("=" * 60)

    status = moderator.get_model_status()

    if status['custom_weapon_fight_model']:
        print("โ\x9c\x85 Custom YOLO11 Model (dao + sรบng + fight): LOADED")
        if 'custom_classes' in status:
            print(f"๐ Custom classes: {status['custom_classes']}")
    else:
        print("โ Custom weapon+fight model: NOT FOUND")

    if status['general_model']:
        print("โ\x9c\x85 General YOLO11n Model (person detection): LOADED")
    else:
        print("โ General model: FAILED")

    if status['nsfw_classifier']:
        print("โ\x9c\x85 NSFW Classifier: LOADED")
    else:
        print("โ NSFW Classifier: FAILED")

    print(f"๐ฅ๏ธ Device: {status['device']}")
    print("๐๏ธ Cache system: ENABLED")
    print(f"๐ช Knife enhancement: {enabled(status['knife_enhancement'])}")
    print(f"๐ Knife confidence boost: {enabled(status['knife_boost'])}")
    print(f"๐ Fight detection: {enabled(status['fight_detection'])}")
    print(f"๐ง Fight analysis: {enabled(status['fight_analysis'])}")

    print("\n" + "=" * 60)
    print("โจ ENHANCED DETECTION FEATURES")
    print("=" * 60)
    print("๐ง Image Enhancement:")
    print(" - Contrast & brightness optimization")
    print(" - Edge sharpening for metallic objects")
    print(" - CLAHE for local contrast")
    print("๐ Confidence Boosting:")
    print(" - Geometric analysis (knives)")
    print(" - Motion blur analysis (fights)")
    print(" - Edge strength analysis")
    print("๐ฏ Multi-pass Detection:")
    print(" - Low threshold pass for knives (0.45)")
    print(" - Normal threshold for guns (0.45)")
    print(" - Low threshold for fights (0.40)")
    print("๐ Fight Analysis:")
    print(" - Multi-person fight detection")
    print(" - Aggression level assessment")
    print(" - Context-aware threat escalation")

    # ---- 2. single image ---------------------------------------------
    print("\n" + "=" * 50)
    print("๐ผ๏ธ SINGLE IMAGE PROCESSING")
    print("=" * 50)

    test_image = "test_image.jpg"

    if os.path.exists(test_image):
        result = moderator.process_image(test_image)
        if result:
            print(f"\n๐ DETECTION RESULTS:")
            print(f"Risk Level: {result['risk_level']}")
            print(f"Total Threats: {result['total_threats']}")
            print(f"Processing Method: {result.get('processing_method', 'standard')}")

            breakdown = result.get('detection_breakdown', {})
            if breakdown:
                print(f"\n๐ BREAKDOWN:")
                print(f" Weapons: {breakdown.get('weapons', 0)}")
                print(f" Fights: {breakdown.get('fights', 0)}")
                print(f" NSFW: {breakdown.get('nsfw', 0)}")

            weapon_detections = [d for d in result['detections'] if d['type'] == 'weapon']
            if weapon_detections:
                print(f"\n๐ซ WEAPON DETECTIONS: {len(weapon_detections)}")
                for i, detection in enumerate(weapon_detections):
                    method = detection.get('detection_method', 'unknown')
                    print(f" Weapon {i + 1} ({method}):")
                    print(f" Class: {detection['class']}")
                    # .get(): optional fields must not crash the report.
                    print(f" Type: {detection.get('weapon_type', 'unknown')}")
                    print(f" Confidence: {detection['confidence']:.3f}")
                    print(f" Threat Level: {detection.get('threat_level', 'unknown')}")

            fight_detections = [d for d in result['detections'] if d['type'] == 'fight']
            if fight_detections:
                print(f"\n๐ FIGHT DETECTIONS: {len(fight_detections)}")
                for i, detection in enumerate(fight_detections):
                    method = detection.get('detection_method', 'unknown')
                    print(f" Fight {i + 1} ({method}):")
                    print(f" Class: {detection['class']}")
                    print(f" Type: {detection.get('fight_type', 'unknown')}")
                    print(f" Confidence: {detection['confidence']:.3f}")
                    print(f" Threat Level: {detection.get('threat_level', 'unknown')}")
                    print(f" Aggression: {detection.get('aggression_level', 'unknown')}")
                    if 'people_involved' in detection:
                        print(f" People Involved: {detection['people_involved']}")
                    if 'context_flags' in detection and detection['context_flags']:
                        print(f" Context: {', '.join(detection['context_flags'])}")

            nsfw_detections = [d for d in result['detections'] if d['type'] == 'nsfw']
            if nsfw_detections:
                print(f"\n๐ NSFW DETECTIONS: {len(nsfw_detections)}")
                for i, detection in enumerate(nsfw_detections):
                    method = detection.get('method', 'unknown')
                    print(f" NSFW {i + 1} ({method}):")
                    print(f" Class: {detection['class']}")
                    print(f" Confidence: {detection['confidence']:.3f}")
                    if 'skin_ratio' in detection:
                        print(f" Skin Ratio: {detection['skin_ratio']:.2f}")
    else:
        print(f"โ ๏ธ Test image not found: {test_image}")
        print("Creating a test pattern to demonstrate detection...")

        # Uniform grey 640x640 image with a caption, just to exercise the pipeline.
        test_img = np.ones((640, 640, 3), dtype=np.uint8) * 128
        cv2.putText(test_img, "Test Pattern", (200, 320),
                    cv2.FONT_HERSHEY_SIMPLEX, 2, white, 3)

        result = moderator.process_image(test_img)
        print("โ\x9c\x85 Test pattern processed successfully")

    # ---- 3. webcam ---------------------------------------------------
    print("\n" + "=" * 60)
    print("๐น ENHANCED WEBCAM PROCESSING WITH FIGHT DETECTION")
    print("=" * 60)
    print("Starting enhanced detection on webcam...")
    print("๐ฎ Controls:")
    print(" - Press 'q' to quit")
    print(" - Press 's' to save frame")
    print(" - Press 'i' to show model info")
    print(" - Press 'e' to toggle enhancement")
    print(" - Press 'b' to toggle knife confidence boost")
    print(" - Press 'f' to toggle fight analysis")
    print(" - Press 'h' for help")

    cap = None
    camera_opened = False
    session_completed = False
    try:
        cap = cv2.VideoCapture(0)

        if not cap.isOpened():
            print("โ Cannot open webcam. Check if camera is connected.")
        else:
            camera_opened = True
            print("โ\x9c\x85 Enhanced webcam processing started")

            font = cv2.FONT_HERSHEY_SIMPLEX
            frame_count = 0
            detection_stats = {
                'weapons': 0,
                'knives': 0,
                'guns': 0,
                'fights': 0,
                'nsfw': 0,
                'total_frames': 0,
                'fight_incidents': 0
            }

            # Adaptive cadence: analyse every `process_interval`-th frame,
            # and every frame for 30 frames after a hit.
            process_interval = 2
            last_detection_frame = 0
            consecutive_safe_frames = 0

            def show_safe(view):
                """Display the frame with the SAFE banner and current interval."""
                cv2.putText(view, "SAFE", (10, 30), font, 1, (0, 255, 0), 2)
                cv2.putText(view, f"Process Interval: {process_interval}", (10, 90),
                            font, 0.5, white, 1)
                cv2.imshow(window_title, view)

            while True:
                ret, frame = cap.read()
                if not ret:
                    print("โ Cannot read from webcam")
                    break

                frame_count += 1
                detection_stats['total_frames'] = frame_count
                frame = cv2.flip(frame, 1)  # mirror horizontally

                # Status overlay in the bottom-left corner.
                # NOTE(review): the overlay is drawn before the frame is
                # analysed, so the detectors see this text too.
                y_offset = frame.shape[0] - 120
                overlay_lines = (
                    f"Enhancement: {on_off(weapon_flag('use_enhancement'))}",
                    f"Knife Boost: {on_off(weapon_flag('boost_knife_detection'))}",
                    f"Fight Analysis: {on_off(weapon_flag('fight_analysis'))}",
                    "Models: Custom+General" if moderator.weapon_model_custom else "General Only",
                )
                for row, text in enumerate(overlay_lines):
                    cv2.putText(frame, text, (10, y_offset + 20 * row), font, 0.5, white, 1)

                in_alert = frame_count - last_detection_frame <= 30
                if in_alert:
                    cv2.putText(frame, "HIGH ALERT MODE", (10, 60), font, 0.6, (0, 0, 255), 2)
                should_process = in_alert or frame_count % process_interval == 0

                result = moderator.process_image(frame) if should_process else None

                if result and result['action_required']:
                    last_detection_frame = frame_count
                    consecutive_safe_frames = 0
                    process_interval = 1

                    print(
                        f"โ ๏ธ Frame {frame_count}: {result['risk_level']} risk - {result['total_threats']} threats!")

                    for detection in result['detections']:
                        kind = detection['type']
                        if kind == 'weapon':
                            weapon_type = detection.get('weapon_type')
                            detection_stats['weapons'] += 1
                            if weapon_type == 'blade':
                                detection_stats['knives'] += 1
                            elif weapon_type == 'firearm':
                                detection_stats['guns'] += 1

                            icon = "๐ช" if weapon_type == 'blade' else "๐ซ"
                            method = detection.get('detection_method', 'unknown').split('_')[-1]
                            print(f" {icon} {detection['class']} ({detection['confidence']:.3f}) [{method}]")
                        elif kind == 'fight':
                            detection_stats['fights'] += 1
                            if detection.get('aggression_level') in ['high', 'extreme']:
                                detection_stats['fight_incidents'] += 1

                            fight_type = detection.get('fight_type', 'general')
                            aggression = detection.get('aggression_level', 'unknown')
                            people = detection.get('people_involved', 0)
                            method = detection.get('detection_method', 'unknown').split('_')[-1]
                            print(f" ๐ FIGHT: {fight_type} ({detection['confidence']:.3f}) [{method}]")
                            print(f" Aggression: {aggression}, People: {people}")
                        elif kind == 'nsfw':
                            detection_stats['nsfw'] += 1

                    if 'annotated_image' in result:
                        cv2.imshow(window_title, result['annotated_image'])
                    else:
                        breakdown = result.get('detection_breakdown', {})
                        threat_text = f"THREATS: W:{breakdown.get('weapons', 0)} F:{breakdown.get('fights', 0)} N:{breakdown.get('nsfw', 0)}"
                        cv2.putText(frame, threat_text, (10, 30), font, 0.7, (0, 0, 255), 2)
                        cv2.imshow(window_title, frame)
                else:
                    if should_process:
                        # Analysed and clean: slowly relax the cadence again.
                        consecutive_safe_frames += 1
                        if consecutive_safe_frames > 30:
                            process_interval = min(3, process_interval + 1)
                            consecutive_safe_frames = 0
                    show_safe(frame)

                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    print("๐ Webcam stopped by user")
                    break
                elif key == ord('s'):
                    filename = f"enhanced_detection_{frame_count}.jpg"
                    cv2.imwrite(filename, frame)
                    print(f"๐พ Frame saved as {filename}")
                elif key == ord('i'):
                    print(f"\n๐ Model Status:")
                    for k, v in moderator.get_model_status().items():
                        print(f" {k}: {v}")
                elif key == ord('e'):
                    print(f"๐ง Enhancement: {on_off(toggle_weapon_flag('use_enhancement'))}")
                elif key == ord('b'):
                    print(f"๐ Knife Boost: {on_off(toggle_weapon_flag('boost_knife_detection'))}")
                elif key == ord('f'):
                    print(f"๐ Fight Analysis: {on_off(toggle_weapon_flag('fight_analysis'))}")
                elif key == ord('h'):
                    print("\n๐ฎ Controls:")
                    print(" 'q': quit")
                    print(" 's': save frame")
                    print(" 'i': model info")
                    print(" 'e': toggle enhancement")
                    print(" 'b': toggle knife confidence boost")
                    print(" 'f': toggle fight analysis")
                    print(" 'h': help")

            print(f"\n๐ Session Statistics:")
            print(f" Total frames: {detection_stats['total_frames']}")
            print(f" Total weapon detections: {detection_stats['weapons']}")
            print(f" - Knives/Dao: {detection_stats['knives']}")
            print(f" - Guns: {detection_stats['guns']}")
            print(f" Total fight detections: {detection_stats['fights']}")
            print(f" - High-aggression incidents: {detection_stats['fight_incidents']}")
            print(f" NSFW detections: {detection_stats['nsfw']}")

            if detection_stats['total_frames'] > 0:
                total_detections = detection_stats['weapons'] + detection_stats['fights'] + detection_stats['nsfw']
                detection_rate = (total_detections / detection_stats['total_frames'] * 100)
                print(f" Overall detection rate: {detection_rate:.1f}%")

                if detection_stats['weapons'] > 0:
                    knife_ratio = detection_stats['knives'] / detection_stats['weapons'] * 100
                    print(f" Knife detection ratio: {knife_ratio:.1f}% of weapons")

                if detection_stats['fights'] > 0:
                    incident_ratio = detection_stats['fight_incidents'] / detection_stats['fights'] * 100
                    print(f" High-aggression fight ratio: {incident_ratio:.1f}% of fights")

            session_completed = True

    except Exception as e:
        print(f"โ Webcam error: {e}")

    finally:
        # Always give the camera back, even when the loop above raised;
        # previously an exception skipped both calls.
        if cap is not None:
            cap.release()
        if camera_opened:
            cv2.destroyAllWindows()

    if session_completed:
        print("โ\x9c\x85 Enhanced webcam session completed")

    # ---- 4. wrap-up --------------------------------------------------
    print(f"\n๐พ {moderator.get_memory_usage()}")
    print(f"๐๏ธ Final cache size: {len(moderator.detection_cache)} entries")

    moderator.save_report("enhanced_detection_with_fights_report.json")

    print("\nโ\x9c\x85 Enhanced Content Moderation System with Fight Detection completed!")
    print("๐ก New fight detection capabilities:")
    print(" - Behavioral fight pattern recognition")
    print(" - Multi-person fight analysis")
    print(" - Aggression level assessment")
    print(" - Context-aware threat escalation")
    print(" - Fight timeline tracking for videos")
    print("๐ก Enhanced weapon detection:")
    print(" - Image enhancement preprocessing")
    print(" - Dynamic confidence thresholds")
    print(" - Geometric feature analysis")
    print(" - Multi-pass detection strategy")
|
|
|
|
|
|
|
|
|
# Script entry point: run the interactive demo only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()