|
|
import cv2 |
|
|
import numpy as np |
|
|
import torch |
|
|
import os |
|
|
import json |
|
|
import warnings |
|
|
|
|
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
|
|
|
try: |
|
|
from ultralytics import YOLO |
|
|
from transformers import pipeline |
|
|
from PIL import Image, ImageDraw, ImageFont |
|
|
import requests |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
try: |
|
|
import mediapipe as mp |
|
|
|
|
|
MEDIAPIPE_AVAILABLE = True |
|
|
print("โ
MediaPipe imported successfully") |
|
|
except ImportError: |
|
|
MEDIAPIPE_AVAILABLE = False |
|
|
print("โ ๏ธ MediaPipe not available - pose detection disabled") |
|
|
except Exception as e: |
|
|
MEDIAPIPE_AVAILABLE = False |
|
|
print(f"โ ๏ธ MediaPipe import error: {e} - pose detection disabled") |
|
|
|
|
|
except ImportError as e: |
|
|
print(f"Missing dependency: {e}") |
|
|
print("Please install: pip install ultralytics transformers pillow requests") |
|
|
print("For MediaPipe: pip install mediapipe==0.10.18") |
|
|
|
|
|
|
|
|
class ContentModerator: |
|
|
def __init__(self, config=None): |
|
|
self.config = config or self.get_default_config() |
|
|
self.device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
|
|
|
|
|
|
|
if self.device == 'cpu': |
|
|
print("๐ป CPU mode detected - applying optimizations...") |
|
|
torch.set_num_threads(4) |
|
|
self.config['performance']['half_precision'] = False |
|
|
self.config['nsfw_detection']['pose_analysis'] = False |
|
|
|
|
|
|
|
|
self.weapon_model = None |
|
|
self.weapon_model_custom = None |
|
|
self.weapon_model_general = None |
|
|
self.nsfw_classifier = None |
|
|
self.pose_detector = None |
|
|
|
|
|
|
|
|
self.detection_cache = {} |
|
|
self.cache_ttl = 2 |
|
|
|
|
|
|
|
|
self.detection_history = [] |
|
|
|
|
|
print(f"๐ Content Moderator initialized on {self.device}") |
|
|
if self.device == 'cpu': |
|
|
print("โก CPU optimizations enabled") |
|
|
|
|
|
self.setup_models() |
|
|
|
|
|
def get_default_config(self): |
|
|
"""Default configuration optimized for CPU/GPU with enhanced knife and fight detection""" |
|
|
|
|
|
is_cuda = torch.cuda.is_available() |
|
|
|
|
|
return { |
|
|
'weapon_detection': { |
|
|
'enabled': True, |
|
|
'confidence_threshold': 0.5, |
|
|
'knife_confidence': 0.5, |
|
|
'fight_confidence': 0.45, |
|
|
'model_size': 'yolo12n', |
|
|
'classes': ['gun', 'knife', 'fight'], |
|
|
'use_enhancement': True, |
|
|
'multi_pass': True, |
|
|
'boost_knife_detection': True, |
|
|
'fight_detection': True, |
|
|
'fight_analysis': True |
|
|
}, |
|
|
'fight_detection': { |
|
|
'enabled': True, |
|
|
'confidence_threshold': 0.45, |
|
|
'pose_analysis': True, |
|
|
'motion_analysis': False, |
|
|
'aggression_keywords': ['fight'], |
|
|
'threat_escalation': True, |
|
|
'multi_person_analysis': True |
|
|
}, |
|
|
'nsfw_detection': { |
|
|
'enabled': True, |
|
|
'confidence_threshold': 0.7, |
|
|
'skin_detection': True, |
|
|
'pose_analysis': False, |
|
|
'region_analysis': True |
|
|
}, |
|
|
'performance': { |
|
|
'image_size': 416 if is_cuda else 320, |
|
|
'batch_size': 1, |
|
|
'half_precision': is_cuda, |
|
|
'use_flash_attention': False, |
|
|
'cpu_optimization': not is_cuda |
|
|
}, |
|
|
'output': { |
|
|
'save_detections': True, |
|
|
'draw_boxes': True, |
|
|
'log_results': True |
|
|
} |
|
|
} |
|
|
|
|
|
def setup_models(self): |
|
|
"""Initialize all detection models""" |
|
|
try: |
|
|
|
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
if self.config['weapon_detection']['enabled']: |
|
|
self.setup_weapon_detector() |
|
|
|
|
|
|
|
|
if self.config['nsfw_detection']['enabled']: |
|
|
self.setup_nsfw_detector() |
|
|
|
|
|
print("โ
All models loaded successfully!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Error setting up models: {e}") |
|
|
|
|
|
def setup_weapon_detector(self): |
|
|
"""Setup dual model system: Custom for weapons + fights + General for person detection""" |
|
|
try: |
|
|
print("๐ซ Loading weapon and fight detection models...") |
|
|
|
|
|
|
|
|
custom_model_path = "models/best_ft4.pt" |
|
|
project_root = os.path.dirname(os.path.abspath(__file__)) |
|
|
full_model_path = os.path.join(project_root, custom_model_path) |
|
|
|
|
|
if os.path.exists(full_model_path): |
|
|
print(f"โ
Loading custom weapon+fight model: {full_model_path}") |
|
|
self.weapon_model_custom = YOLO(full_model_path) |
|
|
print("๐ฏ Custom weapon+fight model (dao + sรบng + fight) loaded!") |
|
|
|
|
|
|
|
|
if hasattr(self.weapon_model_custom, 'names'): |
|
|
classes = list(self.weapon_model_custom.names.values()) |
|
|
print(f"๐ Custom classes: {classes}") |
|
|
|
|
|
|
|
|
if any('fight' in str(cls).lower() for cls in classes): |
|
|
print("๐ Fight detection enabled in custom model") |
|
|
else: |
|
|
print("โ ๏ธ Fight class not found in custom model") |
|
|
else: |
|
|
print("โ ๏ธ Custom weapon+fight model not found") |
|
|
self.weapon_model_custom = None |
|
|
|
|
|
|
|
|
print("๐ค Loading general model for person detection...") |
|
|
self.weapon_model_general = YOLO('yolo11n.pt') |
|
|
print("โ
General YOLO11n loaded for person detection") |
|
|
|
|
|
|
|
|
self.weapon_model = self.weapon_model_custom if self.weapon_model_custom else self.weapon_model_general |
|
|
|
|
|
|
|
|
if self.device == 'cuda' and self.config['performance']['half_precision']: |
|
|
try: |
|
|
if self.weapon_model_custom: |
|
|
self.weapon_model_custom.model.half() |
|
|
self.weapon_model_general.model.half() |
|
|
print("โ
Half precision enabled for both models") |
|
|
except: |
|
|
print("โ ๏ธ Half precision not supported") |
|
|
|
|
|
print("๐ฅ Dual model system ready with fight detection!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Error loading weapon+fight models: {e}") |
|
|
self.weapon_model = None |
|
|
self.weapon_model_custom = None |
|
|
self.weapon_model_general = None |
|
|
|
|
|
def detect_weapons(self, image): |
|
|
"""Enhanced dual model weapon and fight detection""" |
|
|
detections = [] |
|
|
|
|
|
try: |
|
|
imgsz = self.config['performance']['image_size'] |
|
|
use_half = self.config['performance']['half_precision'] and self.device == 'cuda' |
|
|
|
|
|
|
|
|
images_to_process = [(image, 1.0, "original")] |
|
|
|
|
|
if self.config['weapon_detection']['use_enhancement']: |
|
|
enhanced_image = self.enhance_knife_detection(image) |
|
|
images_to_process.append((enhanced_image, 1.15, "enhanced")) |
|
|
if 'fight_detection' not in self.config: |
|
|
self.config['fight_detection'] = { |
|
|
'enabled': True, |
|
|
'confidence_threshold': 0.40, |
|
|
'pose_analysis': False, |
|
|
'motion_analysis': False, |
|
|
'aggression_keywords': ['fight'], |
|
|
'threat_escalation': True, |
|
|
'multi_person_analysis': False |
|
|
} |
|
|
|
|
|
for img, weight_multiplier, img_type in images_to_process: |
|
|
if self.weapon_model_custom: |
|
|
|
|
|
knife_conf = self.config['weapon_detection']['knife_confidence'] |
|
|
gun_conf = self.config['weapon_detection']['confidence_threshold'] |
|
|
fight_conf = self.config['weapon_detection']['fight_confidence'] |
|
|
|
|
|
|
|
|
passes = [ |
|
|
(knife_conf, "knife_pass"), |
|
|
(gun_conf, "gun_pass"), |
|
|
(fight_conf, "fight_pass") |
|
|
] if self.config['weapon_detection']['multi_pass'] else [ |
|
|
(min(knife_conf, fight_conf), "single_pass")] |
|
|
|
|
|
for conf_threshold, pass_type in passes: |
|
|
try: |
|
|
results = self.weapon_model_custom( |
|
|
img, |
|
|
imgsz=imgsz, |
|
|
conf=conf_threshold, |
|
|
device=self.device, |
|
|
half=use_half, |
|
|
verbose=False, |
|
|
augment=True |
|
|
) |
|
|
|
|
|
for result in results: |
|
|
boxes = result.boxes |
|
|
if boxes is not None: |
|
|
for box in boxes: |
|
|
class_id = int(box.cls[0]) |
|
|
|
|
|
if hasattr(result, 'names') and class_id in result.names: |
|
|
class_name = result.names[class_id].lower() |
|
|
else: |
|
|
class_name = f"detection_{class_id}" |
|
|
|
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() |
|
|
confidence = float(box.conf[0]) * weight_multiplier |
|
|
|
|
|
|
|
|
if self.is_fight_detection(class_name): |
|
|
|
|
|
confidence = self.boost_fight_confidence( |
|
|
img, [x1, y1, x2, y2], confidence, class_name |
|
|
) |
|
|
|
|
|
detection_type = 'fight' |
|
|
min_conf = fight_conf |
|
|
threat_level = self.assess_fight_threat(confidence, img, [x1, y1, x2, y2]) |
|
|
|
|
|
else: |
|
|
|
|
|
if self.config['weapon_detection']['boost_knife_detection']: |
|
|
if 'dao' in class_name or 'knife' in class_name or 'blade' in class_name: |
|
|
confidence = self.boost_knife_confidence( |
|
|
img, [x1, y1, x2, y2], confidence, class_name |
|
|
) |
|
|
|
|
|
detection_type = 'weapon' |
|
|
weapon_type = self.classify_weapon_type(class_name) |
|
|
min_conf = knife_conf if weapon_type == 'blade' else gun_conf |
|
|
threat_level = self.assess_weapon_threat(weapon_type, confidence) |
|
|
|
|
|
if confidence >= min_conf: |
|
|
detection_data = { |
|
|
'type': detection_type, |
|
|
'class': class_name, |
|
|
'confidence': min(confidence, 0.99), |
|
|
'bbox': [int(x1), int(y1), int(x2), int(y2)], |
|
|
'threat_level': threat_level, |
|
|
'detection_method': f'custom_model_{img_type}_{pass_type}' |
|
|
} |
|
|
|
|
|
|
|
|
if detection_type == 'weapon': |
|
|
detection_data['weapon_type'] = weapon_type |
|
|
elif detection_type == 'fight': |
|
|
detection_data['fight_type'] = self.classify_fight_type(class_name) |
|
|
detection_data['aggression_level'] = self.assess_aggression_level( |
|
|
confidence) |
|
|
|
|
|
detections.append(detection_data) |
|
|
|
|
|
icon = "๐" if detection_type == 'fight' else "๐ฏ" |
|
|
print( |
|
|
f" {icon} Detected: {class_name} (conf: {confidence:.3f}, method: {img_type}_{pass_type})") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Detection pass error ({pass_type}): {e}") |
|
|
|
|
|
|
|
|
if self.weapon_model_general and len(detections) == 0: |
|
|
detections.extend(self.fallback_detection(image, imgsz, use_half)) |
|
|
|
|
|
|
|
|
detections = self.remove_duplicate_detections(detections) |
|
|
|
|
|
|
|
|
if self.config['fight_detection']['enabled'] and self.config['fight_detection']['multi_person_analysis']: |
|
|
fight_detections = [d for d in detections if d['type'] == 'fight'] |
|
|
if fight_detections: |
|
|
enhanced_fights = self.analyze_fight_context(image, fight_detections) |
|
|
|
|
|
detections = [d for d in detections if d['type'] != 'fight'] + enhanced_fights |
|
|
|
|
|
return detections |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Weapon and fight detection error: {e}") |
|
|
return [] |
|
|
|
|
|
def is_fight_detection(self, class_name): |
|
|
"""Check if detection is fight-related""" |
|
|
fight_keywords = ['fight', 'fighting', 'combat', 'violence', 'aggression', 'brawl', 'scuffle'] |
|
|
return any(keyword in class_name.lower() for keyword in fight_keywords) |
|
|
|
|
|
def classify_fight_type(self, class_name): |
|
|
"""Classify type of fight detected""" |
|
|
class_name = class_name.lower() |
|
|
|
|
|
if any(word in class_name for word in ['punch', 'boxing', 'fist']): |
|
|
return 'physical_combat' |
|
|
elif any(word in class_name for word in ['kick', 'martial', 'karate']): |
|
|
return 'martial_arts' |
|
|
elif any(word in class_name for word in ['wrestle', 'grapple']): |
|
|
return 'wrestling' |
|
|
elif any(word in class_name for word in ['group', 'mob', 'crowd']): |
|
|
return 'group_violence' |
|
|
else: |
|
|
return 'general_fight' |
|
|
|
|
|
def boost_fight_confidence(self, image, bbox, initial_confidence, class_name): |
|
|
"""Boost confidence for fight detection based on contextual analysis""" |
|
|
try: |
|
|
x1, y1, x2, y2 = [int(coord) for coord in bbox] |
|
|
|
|
|
|
|
|
x1 = max(0, x1) |
|
|
y1 = max(0, y1) |
|
|
x2 = min(image.shape[1], x2) |
|
|
y2 = min(image.shape[0], y2) |
|
|
|
|
|
roi = image[y1:y2, x1:x2] |
|
|
|
|
|
if roi.size == 0: |
|
|
return initial_confidence |
|
|
|
|
|
boost = 0 |
|
|
|
|
|
|
|
|
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) |
|
|
blur_variance = cv2.Laplacian(gray, cv2.CV_64F).var() |
|
|
if blur_variance < 100: |
|
|
boost += 0.10 |
|
|
|
|
|
|
|
|
edges = cv2.Canny(gray, 50, 150) |
|
|
edge_density = np.count_nonzero(edges) / edges.size |
|
|
if edge_density > 0.15: |
|
|
boost += 0.08 |
|
|
|
|
|
|
|
|
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) |
|
|
color_variance = np.var(hsv[:, :, 1]) |
|
|
if color_variance > 1000: |
|
|
boost += 0.05 |
|
|
|
|
|
|
|
|
gray_f = np.float32(gray) |
|
|
texture_response = cv2.cornerHarris(gray_f, 2, 3, 0.04) |
|
|
texture_strength = np.mean(texture_response) |
|
|
if texture_strength > 0.01: |
|
|
boost += 0.07 |
|
|
|
|
|
|
|
|
height = y2 - y1 |
|
|
width = x2 - x1 |
|
|
if height > 0 and width > 0: |
|
|
aspect_ratio = max(width, height) / min(width, height) |
|
|
if 1.2 < aspect_ratio < 3.0: |
|
|
boost += 0.05 |
|
|
|
|
|
final_confidence = min(initial_confidence + boost, 0.95) |
|
|
|
|
|
if boost > 0: |
|
|
print(f" ๐ Fight boost applied: +{boost:.2f} (blur:{blur_variance:.0f}, edge:{edge_density:.2f})") |
|
|
|
|
|
return final_confidence |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Fight confidence boost error: {e}") |
|
|
return initial_confidence |
|
|
|
|
|
def assess_fight_threat(self, confidence, image, bbox): |
|
|
"""Assess threat level of detected fight""" |
|
|
base_threat = 'medium' |
|
|
|
|
|
|
|
|
if confidence >= 0.85: |
|
|
base_threat = 'critical' |
|
|
elif confidence >= 0.70: |
|
|
base_threat = 'high' |
|
|
elif confidence >= 0.50: |
|
|
base_threat = 'medium' |
|
|
else: |
|
|
base_threat = 'low' |
|
|
|
|
|
|
|
|
try: |
|
|
x1, y1, x2, y2 = bbox |
|
|
fight_area = (x2 - x1) * (y2 - y1) |
|
|
image_area = image.shape[0] * image.shape[1] |
|
|
area_ratio = fight_area / image_area |
|
|
|
|
|
|
|
|
if area_ratio > 0.5: |
|
|
if base_threat == 'medium': |
|
|
base_threat = 'high' |
|
|
elif base_threat == 'high': |
|
|
base_threat = 'critical' |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Fight threat assessment error: {e}") |
|
|
|
|
|
return base_threat |
|
|
|
|
|
def assess_aggression_level(self, confidence): |
|
|
"""Assess aggression level based on confidence""" |
|
|
if confidence >= 0.80: |
|
|
return 'extreme' |
|
|
elif confidence >= 0.65: |
|
|
return 'high' |
|
|
elif confidence >= 0.45: |
|
|
return 'moderate' |
|
|
else: |
|
|
return 'low' |
|
|
|
|
|
def analyze_fight_context(self, image, fight_detections): |
|
|
"""Enhanced analysis of fight context with multi-person detection""" |
|
|
enhanced_fights = [] |
|
|
|
|
|
try: |
|
|
|
|
|
persons = self.detect_persons(image) |
|
|
|
|
|
for fight in fight_detections: |
|
|
enhanced_fight = fight.copy() |
|
|
|
|
|
|
|
|
fight_bbox = fight['bbox'] |
|
|
people_in_fight = 0 |
|
|
people_nearby = 0 |
|
|
|
|
|
for person in persons: |
|
|
person_bbox = person['bbox'] |
|
|
|
|
|
|
|
|
overlap = self.calculate_bbox_overlap(fight_bbox, person_bbox) |
|
|
|
|
|
if overlap > 0.3: |
|
|
people_in_fight += 1 |
|
|
elif overlap > 0.1: |
|
|
people_nearby += 1 |
|
|
|
|
|
|
|
|
enhanced_fight['people_involved'] = people_in_fight |
|
|
enhanced_fight['people_nearby'] = people_nearby |
|
|
enhanced_fight['total_people'] = people_in_fight + people_nearby |
|
|
|
|
|
|
|
|
if people_in_fight >= 3: |
|
|
if enhanced_fight['threat_level'] == 'medium': |
|
|
enhanced_fight['threat_level'] = 'high' |
|
|
elif enhanced_fight['threat_level'] == 'high': |
|
|
enhanced_fight['threat_level'] = 'critical' |
|
|
enhanced_fight['fight_type'] = 'group_violence' |
|
|
|
|
|
|
|
|
enhanced_fight['context_flags'] = [] |
|
|
if people_in_fight >= 3: |
|
|
enhanced_fight['context_flags'].append('multi_person_fight') |
|
|
if people_nearby >= 2: |
|
|
enhanced_fight['context_flags'].append('crowd_present') |
|
|
|
|
|
enhanced_fights.append(enhanced_fight) |
|
|
|
|
|
print(f" ๐ฅ Fight context: {people_in_fight} involved, {people_nearby} nearby") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Fight context analysis error: {e}") |
|
|
return fight_detections |
|
|
|
|
|
return enhanced_fights |
|
|
|
|
|
def calculate_bbox_overlap(self, bbox1, bbox2): |
|
|
"""Calculate overlap ratio between two bounding boxes""" |
|
|
x1_min, y1_min, x1_max, y1_max = bbox1 |
|
|
x2_min, y2_min, x2_max, y2_max = bbox2 |
|
|
|
|
|
|
|
|
intersect_xmin = max(x1_min, x2_min) |
|
|
intersect_ymin = max(y1_min, y2_min) |
|
|
intersect_xmax = min(x1_max, x2_max) |
|
|
intersect_ymax = min(y1_max, y2_max) |
|
|
|
|
|
if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: |
|
|
return 0.0 |
|
|
|
|
|
intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) |
|
|
bbox1_area = (x1_max - x1_min) * (y1_max - y1_min) |
|
|
|
|
|
return intersect_area / bbox1_area if bbox1_area > 0 else 0 |
|
|
|
|
|
def fallback_detection(self, image, imgsz, use_half): |
|
|
"""Fallback detection using general model""" |
|
|
detections = [] |
|
|
|
|
|
try: |
|
|
general_results = self.weapon_model_general( |
|
|
image, |
|
|
imgsz=imgsz, |
|
|
conf=0.4, |
|
|
device=self.device, |
|
|
half=use_half, |
|
|
verbose=False |
|
|
) |
|
|
|
|
|
for result in general_results: |
|
|
boxes = result.boxes |
|
|
if boxes is not None: |
|
|
for box in boxes: |
|
|
class_id = int(box.cls[0]) |
|
|
class_name = result.names[class_id].lower() |
|
|
|
|
|
|
|
|
weapon_keywords = ['knife', 'scissors', 'fork'] |
|
|
|
|
|
if any(keyword in class_name for keyword in weapon_keywords): |
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() |
|
|
confidence = float(box.conf[0]) |
|
|
|
|
|
detections.append({ |
|
|
'type': 'weapon', |
|
|
'class': class_name, |
|
|
'weapon_type': 'blade', |
|
|
'confidence': confidence, |
|
|
'bbox': [int(x1), int(y1), int(x2), int(y2)], |
|
|
'threat_level': self.assess_weapon_threat('blade', confidence), |
|
|
'detection_method': 'general_model_fallback' |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ General detection error: {e}") |
|
|
|
|
|
return detections |
|
|
|
|
|
def enhance_knife_detection(self, image): |
|
|
"""Enhance image specifically for better knife/dao detection""" |
|
|
try: |
|
|
|
|
|
enhanced = cv2.convertScaleAbs(image, alpha=1.4, beta=25) |
|
|
|
|
|
|
|
|
kernel_sharpen = np.array([[-1, -1, -1], |
|
|
[-1, 9, -1], |
|
|
[-1, -1, -1]]) |
|
|
sharpened = cv2.filter2D(enhanced, -1, kernel_sharpen) |
|
|
|
|
|
|
|
|
lab = cv2.cvtColor(sharpened, cv2.COLOR_BGR2LAB) |
|
|
l, a, b = cv2.split(lab) |
|
|
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) |
|
|
l = clahe.apply(l) |
|
|
enhanced_final = cv2.merge([l, a, b]) |
|
|
enhanced_final = cv2.cvtColor(enhanced_final, cv2.COLOR_LAB2BGR) |
|
|
|
|
|
return enhanced_final |
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Enhancement failed: {e}") |
|
|
return image |
|
|
|
|
|
def boost_knife_confidence(self, image, bbox, initial_confidence, class_name): |
|
|
"""Boost confidence for knife/dao based on geometric and visual features""" |
|
|
try: |
|
|
x1, y1, x2, y2 = [int(coord) for coord in bbox] |
|
|
|
|
|
|
|
|
x1 = max(0, x1) |
|
|
y1 = max(0, y1) |
|
|
x2 = min(image.shape[1], x2) |
|
|
y2 = min(image.shape[0], y2) |
|
|
|
|
|
roi = image[y1:y2, x1:x2] |
|
|
|
|
|
if roi.size == 0: |
|
|
return initial_confidence |
|
|
|
|
|
boost = 0 |
|
|
|
|
|
|
|
|
height = y2 - y1 |
|
|
width = x2 - x1 |
|
|
if height > 0 and width > 0: |
|
|
aspect_ratio = max(width, height) / min(width, height) |
|
|
if aspect_ratio > 2.5: |
|
|
boost += 0.15 |
|
|
elif aspect_ratio > 2.0: |
|
|
boost += 0.10 |
|
|
|
|
|
|
|
|
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) |
|
|
mean_brightness = np.mean(gray) |
|
|
std_brightness = np.std(gray) |
|
|
|
|
|
if mean_brightness > 140: |
|
|
boost += 0.10 |
|
|
if std_brightness > 50: |
|
|
boost += 0.05 |
|
|
|
|
|
|
|
|
edges = cv2.Canny(gray, 50, 150) |
|
|
edge_ratio = np.count_nonzero(edges) / edges.size |
|
|
if edge_ratio > 0.15: |
|
|
boost += 0.10 |
|
|
elif edge_ratio > 0.10: |
|
|
boost += 0.05 |
|
|
|
|
|
|
|
|
if height > width: |
|
|
gradient = np.gradient(gray, axis=0) |
|
|
else: |
|
|
gradient = np.gradient(gray, axis=1) |
|
|
|
|
|
gradient_strength = np.mean(np.abs(gradient)) |
|
|
if gradient_strength > 10: |
|
|
boost += 0.05 |
|
|
|
|
|
|
|
|
if 'dao' in class_name.lower() or 'knife' in class_name.lower(): |
|
|
boost *= 1.2 |
|
|
|
|
|
final_confidence = min(initial_confidence + boost, 0.95) |
|
|
|
|
|
if boost > 0: |
|
|
print( |
|
|
f" ๐ช Knife boost applied: +{boost:.2f} (AR:{aspect_ratio:.1f}, Bright:{mean_brightness:.0f}, Edge:{edge_ratio:.2f})") |
|
|
|
|
|
return final_confidence |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Confidence boost error: {e}") |
|
|
return initial_confidence |
|
|
|
|
|
def detect_persons(self, image): |
|
|
"""Detect persons using general model (needed for NSFW and fight analysis)""" |
|
|
persons = [] |
|
|
|
|
|
if not self.weapon_model_general: |
|
|
return persons |
|
|
|
|
|
try: |
|
|
imgsz = self.config['performance']['image_size'] |
|
|
use_half = self.config['performance']['half_precision'] and self.device == 'cuda' |
|
|
|
|
|
results = self.weapon_model_general( |
|
|
image, |
|
|
imgsz=imgsz, |
|
|
conf=0.3, |
|
|
device=self.device, |
|
|
half=use_half, |
|
|
verbose=False |
|
|
) |
|
|
|
|
|
for result in results: |
|
|
boxes = result.boxes |
|
|
if boxes is not None: |
|
|
for box in boxes: |
|
|
class_id = int(box.cls[0]) |
|
|
class_name = result.names[class_id].lower() |
|
|
|
|
|
if class_name == 'person': |
|
|
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() |
|
|
confidence = float(box.conf[0]) |
|
|
|
|
|
persons.append({ |
|
|
'class': 'person', |
|
|
'confidence': confidence, |
|
|
'bbox': [int(x1), int(y1), int(x2), int(y2)] |
|
|
}) |
|
|
|
|
|
return persons |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Person detection error: {e}") |
|
|
return [] |
|
|
|
|
|
def classify_weapon_type(self, class_name): |
|
|
"""Classify weapon type from class name""" |
|
|
class_name = class_name.lower() |
|
|
|
|
|
|
|
|
knife_keywords = ['knife', 'dao', 'blade', 'dagger', 'sword', 'machete', 'katana', 'cutter'] |
|
|
if any(keyword in class_name for keyword in knife_keywords): |
|
|
return 'blade' |
|
|
|
|
|
|
|
|
gun_keywords = ['gun', 'pistol', 'rifle', 'firearm', 'revolver', 'shotgun', 'sรบng'] |
|
|
if any(keyword in class_name for keyword in gun_keywords): |
|
|
return 'firearm' |
|
|
|
|
|
|
|
|
other_keywords = ['axe', 'hammer', 'club', 'bat'] |
|
|
if any(keyword in class_name for keyword in other_keywords): |
|
|
return 'blunt_weapon' |
|
|
|
|
|
|
|
|
if 'weapon' in class_name: |
|
|
try: |
|
|
weapon_id = int(class_name.split('_')[-1]) |
|
|
if weapon_id in [0, 1]: |
|
|
return 'firearm' |
|
|
elif weapon_id in [2, 3]: |
|
|
return 'blade' |
|
|
else: |
|
|
return 'unknown_weapon' |
|
|
except: |
|
|
pass |
|
|
|
|
|
return 'unknown_weapon' |
|
|
|
|
|
def assess_weapon_threat(self, weapon_type, confidence): |
|
|
"""Assess threat level of detected weapon""" |
|
|
threat_levels = { |
|
|
'firearm': 'critical', |
|
|
'blade': 'high', |
|
|
'blunt_weapon': 'medium', |
|
|
'unknown_weapon': 'medium' |
|
|
} |
|
|
|
|
|
base_threat = threat_levels.get(weapon_type, 'medium') |
|
|
|
|
|
|
|
|
if confidence >= 0.9: |
|
|
if base_threat == 'medium': |
|
|
return 'high' |
|
|
elif base_threat == 'high': |
|
|
return 'critical' |
|
|
else: |
|
|
return base_threat |
|
|
elif confidence >= 0.7: |
|
|
return base_threat |
|
|
elif confidence >= 0.5: |
|
|
if base_threat == 'critical': |
|
|
return 'high' |
|
|
elif base_threat == 'high': |
|
|
return 'medium' |
|
|
else: |
|
|
return base_threat |
|
|
else: |
|
|
if base_threat == 'critical': |
|
|
return 'medium' |
|
|
elif base_threat == 'high': |
|
|
return 'low' |
|
|
else: |
|
|
return 'low' |
|
|
|
|
|
def remove_duplicate_detections(self, detections, iou_threshold=0.4): |
|
|
"""Remove duplicate detections using Non-Maximum Suppression""" |
|
|
if len(detections) <= 1: |
|
|
return detections |
|
|
|
|
|
|
|
|
detections = sorted(detections, key=lambda x: x['confidence'], reverse=True) |
|
|
|
|
|
keep = [] |
|
|
for i, det1 in enumerate(detections): |
|
|
should_keep = True |
|
|
for det2 in keep: |
|
|
|
|
|
if det1['type'] == det2['type']: |
|
|
iou = self.calculate_iou(det1['bbox'], det2['bbox']) |
|
|
if iou > iou_threshold: |
|
|
should_keep = False |
|
|
break |
|
|
|
|
|
if should_keep: |
|
|
keep.append(det1) |
|
|
|
|
|
return keep |
|
|
|
|
|
def calculate_iou(self, box1, box2): |
|
|
"""Calculate Intersection over Union between two bounding boxes""" |
|
|
x1_min, y1_min, x1_max, y1_max = box1 |
|
|
x2_min, y2_min, x2_max, y2_max = box2 |
|
|
|
|
|
|
|
|
intersect_xmin = max(x1_min, x2_min) |
|
|
intersect_ymin = max(y1_min, y2_min) |
|
|
intersect_xmax = min(x1_max, x2_max) |
|
|
intersect_ymax = min(y1_max, y2_max) |
|
|
|
|
|
if intersect_xmax < intersect_xmin or intersect_ymax < intersect_ymin: |
|
|
return 0.0 |
|
|
|
|
|
intersect_area = (intersect_xmax - intersect_xmin) * (intersect_ymax - intersect_ymin) |
|
|
|
|
|
|
|
|
box1_area = (x1_max - x1_min) * (y1_max - y1_min) |
|
|
box2_area = (x2_max - x2_min) * (y2_max - y2_min) |
|
|
union_area = box1_area + box2_area - intersect_area |
|
|
|
|
|
return intersect_area / union_area if union_area > 0 else 0 |
|
|
|
|
|
def detect_nsfw_content(self, image): |
|
|
"""Enhanced NSFW detection with person detection first""" |
|
|
detections = [] |
|
|
|
|
|
try: |
|
|
if len(image.shape) == 3 and image.shape[2] == 3: |
|
|
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
|
|
else: |
|
|
rgb_image = image |
|
|
|
|
|
|
|
|
persons = self.detect_persons(image) |
|
|
|
|
|
if not persons: |
|
|
|
|
|
return detections |
|
|
|
|
|
print(f"๐ค Found {len(persons)} person(s), analyzing for NSFW content...") |
|
|
|
|
|
|
|
|
if self.nsfw_classifier: |
|
|
try: |
|
|
pil_image = Image.fromarray(rgb_image) |
|
|
nsfw_result = self.nsfw_classifier(pil_image) |
|
|
|
|
|
if nsfw_result[0]['label'] == 'nsfw': |
|
|
confidence = nsfw_result[0]['score'] |
|
|
if confidence > self.config['nsfw_detection']['confidence_threshold']: |
|
|
detections.append({ |
|
|
'type': 'nsfw', |
|
|
'class': 'inappropriate_content', |
|
|
'confidence': confidence, |
|
|
'bbox': [0, 0, image.shape[1], image.shape[0]], |
|
|
'method': 'classification' |
|
|
}) |
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ NSFW classifier error: {e}") |
|
|
|
|
|
|
|
|
if self.config['nsfw_detection']['skin_detection']: |
|
|
for person in persons: |
|
|
person_detections = self.analyze_person_skin(image, person) |
|
|
detections.extend(person_detections) |
|
|
|
|
|
|
|
|
if self.config['nsfw_detection']['region_analysis'] and len(detections) == 0: |
|
|
skin_detections = self.detect_skin_regions(image) |
|
|
detections.extend(skin_detections) |
|
|
|
|
|
return detections |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ NSFW detection error: {e}") |
|
|
return [] |
|
|
|
|
|
def analyze_person_skin(self, image, person): |
|
|
"""Analyze skin exposure for a specific person""" |
|
|
detections = [] |
|
|
|
|
|
try: |
|
|
x1, y1, x2, y2 = person['bbox'] |
|
|
person_region = image[y1:y2, x1:x2] |
|
|
|
|
|
if person_region.size == 0: |
|
|
return detections |
|
|
|
|
|
|
|
|
hsv_person = cv2.cvtColor(person_region, cv2.COLOR_BGR2HSV) |
|
|
|
|
|
|
|
|
lower_skin = np.array([0, 20, 70], dtype=np.uint8) |
|
|
upper_skin = np.array([20, 255, 255], dtype=np.uint8) |
|
|
|
|
|
|
|
|
skin_mask = cv2.inRange(hsv_person, lower_skin, upper_skin) |
|
|
|
|
|
|
|
|
total_person_pixels = person_region.shape[0] * person_region.shape[1] |
|
|
skin_pixels = cv2.countNonZero(skin_mask) |
|
|
skin_ratio = skin_pixels / total_person_pixels if total_person_pixels > 0 else 0 |
|
|
|
|
|
|
|
|
if skin_ratio > 0.4: |
|
|
confidence = min(skin_ratio * 2, 1.0) |
|
|
|
|
|
detections.append({ |
|
|
'type': 'nsfw', |
|
|
'class': 'excessive_skin_exposure', |
|
|
'confidence': confidence, |
|
|
'bbox': [x1, y1, x2, y2], |
|
|
'method': 'person_skin_analysis', |
|
|
'skin_ratio': skin_ratio |
|
|
}) |
|
|
|
|
|
print(f"๐จ Excessive skin exposure detected: {skin_ratio:.2f} ratio") |
|
|
|
|
|
return detections |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Person skin analysis error: {e}") |
|
|
return [] |
|
|
|
|
|
def detect_skin_regions(self, image): |
|
|
"""Detect large skin-colored regions""" |
|
|
try: |
|
|
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) |
|
|
|
|
|
|
|
|
lower_skin = np.array([0, 20, 70], dtype=np.uint8) |
|
|
upper_skin = np.array([20, 255, 255], dtype=np.uint8) |
|
|
|
|
|
|
|
|
skin_mask = cv2.inRange(hsv, lower_skin, upper_skin) |
|
|
|
|
|
|
|
|
kernel = np.ones((3, 3), np.uint8) |
|
|
skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_OPEN, kernel) |
|
|
skin_mask = cv2.morphologyEx(skin_mask, cv2.MORPH_CLOSE, kernel) |
|
|
|
|
|
|
|
|
contours, _ = cv2.findContours(skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
|
|
|
|
|
detections = [] |
|
|
image_area = image.shape[0] * image.shape[1] |
|
|
|
|
|
for contour in contours: |
|
|
area = cv2.contourArea(contour) |
|
|
|
|
|
|
|
|
if area > image_area * 0.3: |
|
|
x, y, w, h = cv2.boundingRect(contour) |
|
|
confidence = min(area / image_area, 1.0) |
|
|
|
|
|
detections.append({ |
|
|
'type': 'nsfw', |
|
|
'class': 'large_skin_region', |
|
|
'confidence': confidence, |
|
|
'bbox': [x, y, x + w, y + h], |
|
|
'method': 'skin_detection' |
|
|
}) |
|
|
|
|
|
return detections |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Skin detection error: {e}") |
|
|
return [] |
|
|
|
|
|
def setup_nsfw_detector(self): |
|
|
"""Setup NSFW detection components (Optimized for CPU)""" |
|
|
try: |
|
|
print("๐ Loading NSFW detection components...") |
|
|
|
|
|
|
|
|
try: |
|
|
device_id = 0 if self.device == 'cuda' else -1 |
|
|
self.nsfw_classifier = pipeline( |
|
|
"image-classification", |
|
|
model="Falconsai/nsfw_image_detection", |
|
|
device=device_id, |
|
|
use_fast=True |
|
|
) |
|
|
print("โ
NSFW classifier loaded") |
|
|
except Exception as nsfw_error: |
|
|
print(f"โ ๏ธ NSFW classifier failed: {nsfw_error}") |
|
|
print(" Trying backup method...") |
|
|
try: |
|
|
|
|
|
self.nsfw_classifier = pipeline( |
|
|
"image-classification", |
|
|
model="Falconsai/nsfw_image_detection", |
|
|
device=device_id |
|
|
) |
|
|
print("โ
NSFW classifier loaded (fallback)") |
|
|
except: |
|
|
print("โ NSFW classifier completely failed") |
|
|
self.nsfw_classifier = None |
|
|
|
|
|
|
|
|
if self.config['nsfw_detection']['pose_analysis'] and MEDIAPIPE_AVAILABLE: |
|
|
try: |
|
|
import mediapipe as mp |
|
|
try: |
|
|
mp_pose = mp.solutions.pose |
|
|
self.pose_detector = mp_pose.Pose( |
|
|
static_image_mode=True, |
|
|
model_complexity=0, |
|
|
min_detection_confidence=0.5 |
|
|
) |
|
|
print("โ
Pose detector loaded (legacy API)") |
|
|
except AttributeError: |
|
|
print("โ ๏ธ MediaPipe API not available") |
|
|
self.pose_detector = None |
|
|
self.config['nsfw_detection']['pose_analysis'] = False |
|
|
|
|
|
except Exception as pose_error: |
|
|
print(f"โ ๏ธ Pose detection failed: {pose_error}") |
|
|
self.pose_detector = None |
|
|
self.config['nsfw_detection']['pose_analysis'] = False |
|
|
else: |
|
|
self.pose_detector = None |
|
|
if not MEDIAPIPE_AVAILABLE: |
|
|
print("โ ๏ธ MediaPipe not available - pose analysis disabled") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Error loading NSFW components: {e}") |
|
|
print("๐ก Falling back to skin detection only") |
|
|
|
|
|
def process_image(self, image_path): |
|
|
"""Process single image with enhanced detection including fights""" |
|
|
try: |
|
|
|
|
|
if isinstance(image_path, str): |
|
|
image = cv2.imread(image_path) |
|
|
if image is None: |
|
|
raise ValueError(f"Could not load image: {image_path}") |
|
|
cache_key = f"file_{image_path}" |
|
|
else: |
|
|
image = image_path |
|
|
cache_key = f"array_{hash(image.tobytes())}" |
|
|
|
|
|
|
|
|
import time |
|
|
current_time = time.time() |
|
|
if cache_key in self.detection_cache: |
|
|
cached_result, timestamp = self.detection_cache[cache_key] |
|
|
if current_time - timestamp < self.cache_ttl: |
|
|
return cached_result |
|
|
|
|
|
print(f"๐ธ Processing image: {image.shape}") |
|
|
|
|
|
|
|
|
all_detections = [] |
|
|
|
|
|
|
|
|
if self.config['weapon_detection']['enabled']: |
|
|
weapon_fight_detections = self.detect_weapons(image) |
|
|
all_detections.extend(weapon_fight_detections) |
|
|
|
|
|
weapon_detections = [d for d in weapon_fight_detections if d['type'] == 'weapon'] |
|
|
fight_detections = [d for d in weapon_fight_detections if d['type'] == 'fight'] |
|
|
|
|
|
print(f"๐ซ Found {len(weapon_detections)} weapon(s)") |
|
|
print(f"๐ Found {len(fight_detections)} fight(s)") |
|
|
|
|
|
|
|
|
if weapon_detections: |
|
|
knife_detections = [d for d in weapon_detections if d['weapon_type'] == 'blade'] |
|
|
if knife_detections: |
|
|
print(f" ๐ช Including {len(knife_detections)} knife/dao detection(s)") |
|
|
|
|
|
if fight_detections: |
|
|
for fight in fight_detections: |
|
|
fight_type = fight.get('fight_type', 'unknown') |
|
|
aggression = fight.get('aggression_level', 'unknown') |
|
|
print(f" ๐ Fight: {fight_type} (aggression: {aggression})") |
|
|
|
|
|
|
|
|
if self.config['nsfw_detection']['enabled']: |
|
|
nsfw_detections = self.detect_nsfw_content(image) |
|
|
all_detections.extend(nsfw_detections) |
|
|
print(f"๐ Found {len(nsfw_detections)} NSFW detection(s)") |
|
|
|
|
|
|
|
|
result = { |
|
|
'timestamp': datetime.now().isoformat(), |
|
|
'image_path': image_path if isinstance(image_path, str) else 'array', |
|
|
'detections': all_detections, |
|
|
'total_threats': len(all_detections), |
|
|
'risk_level': self.calculate_risk_level(all_detections), |
|
|
'action_required': len(all_detections) > 0, |
|
|
'processing_method': 'enhanced_dual_model_with_fight', |
|
|
'detection_breakdown': { |
|
|
'weapons': len([d for d in all_detections if d['type'] == 'weapon']), |
|
|
'fights': len([d for d in all_detections if d['type'] == 'fight']), |
|
|
'nsfw': len([d for d in all_detections if d['type'] == 'nsfw']) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
self.detection_cache[cache_key] = (result, current_time) |
|
|
|
|
|
|
|
|
self.clean_cache(current_time) |
|
|
|
|
|
|
|
|
self.detection_history.append(result) |
|
|
|
|
|
|
|
|
if self.config['output']['draw_boxes'] and all_detections: |
|
|
annotated_image = self.draw_detections(image.copy(), all_detections) |
|
|
result['annotated_image'] = annotated_image |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Error processing image: {e}") |
|
|
return None |
|
|
|
|
|
def clean_cache(self, current_time): |
|
|
"""Clean expired cache entries""" |
|
|
try: |
|
|
expired_keys = [] |
|
|
for key, value in self.detection_cache.items(): |
|
|
|
|
|
if isinstance(value, tuple) and len(value) == 2: |
|
|
_, timestamp = value |
|
|
if timestamp is not None and current_time - timestamp > self.cache_ttl: |
|
|
expired_keys.append(key) |
|
|
else: |
|
|
|
|
|
expired_keys.append(key) |
|
|
|
|
|
for key in expired_keys: |
|
|
del self.detection_cache[key] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ ๏ธ Cache cleanup error: {e}") |
|
|
|
|
|
def get_model_status(self): |
|
|
"""Get status of all models""" |
|
|
status = { |
|
|
'custom_weapon_fight_model': self.weapon_model_custom is not None, |
|
|
'general_model': self.weapon_model_general is not None, |
|
|
'nsfw_classifier': self.nsfw_classifier is not None, |
|
|
'pose_detector': self.pose_detector is not None, |
|
|
'device': self.device, |
|
|
'cache_size': len(self.detection_cache), |
|
|
'knife_enhancement': self.config['weapon_detection']['use_enhancement'], |
|
|
'knife_boost': self.config['weapon_detection']['boost_knife_detection'], |
|
|
'fight_detection': self.config['weapon_detection']['fight_detection'], |
|
|
'fight_analysis': self.config['weapon_detection']['fight_analysis'] |
|
|
} |
|
|
|
|
|
if self.weapon_model_custom and hasattr(self.weapon_model_custom, 'names'): |
|
|
status['custom_classes'] = list(self.weapon_model_custom.names.values()) |
|
|
|
|
|
return status |
|
|
|
|
|
def calculate_risk_level(self, detections): |
|
|
"""Calculate overall risk level including fights""" |
|
|
if not detections: |
|
|
return 'safe' |
|
|
|
|
|
max_confidence = max(det['confidence'] for det in detections) |
|
|
threat_types = set(det['type'] for det in detections) |
|
|
|
|
|
|
|
|
has_weapons = 'weapon' in threat_types |
|
|
has_fights = 'fight' in threat_types |
|
|
has_nsfw = 'nsfw' in threat_types |
|
|
|
|
|
|
|
|
if has_weapons and has_fights: |
|
|
return 'critical' |
|
|
|
|
|
|
|
|
fight_detections = [d for d in detections if d['type'] == 'fight'] |
|
|
if fight_detections: |
|
|
max_fight_confidence = max(f['confidence'] for f in fight_detections) |
|
|
if max_fight_confidence > 0.8: |
|
|
return 'critical' |
|
|
elif max_fight_confidence > 0.65: |
|
|
return 'high' |
|
|
|
|
|
|
|
|
if has_weapons and max_confidence > 0.8: |
|
|
return 'critical' |
|
|
elif has_weapons or has_fights or max_confidence > 0.9: |
|
|
return 'high' |
|
|
elif max_confidence > 0.7: |
|
|
return 'medium' |
|
|
else: |
|
|
return 'low' |
|
|
|
|
|
def draw_detections(self, image, detections): |
|
|
"""Draw detection boxes and labels with enhanced visualization for fights""" |
|
|
try: |
|
|
colors = { |
|
|
'weapon': (0, 0, 255), |
|
|
'fight': (0, 165, 255), |
|
|
'nsfw': (255, 0, 255), |
|
|
} |
|
|
|
|
|
|
|
|
weapon_colors = { |
|
|
'blade': (0, 100, 255), |
|
|
'firearm': (0, 0, 255), |
|
|
'blunt_weapon': (100, 0, 255) |
|
|
} |
|
|
|
|
|
|
|
|
fight_colors = { |
|
|
'physical_combat': (0, 140, 255), |
|
|
'martial_arts': (0, 200, 255), |
|
|
'wrestling': (0, 165, 255), |
|
|
'group_violence': (0, 69, 255), |
|
|
'general_fight': (0, 165, 255) |
|
|
} |
|
|
|
|
|
for det in detections: |
|
|
x1, y1, x2, y2 = det['bbox'] |
|
|
|
|
|
|
|
|
if det['type'] == 'weapon' and 'weapon_type' in det: |
|
|
color = weapon_colors.get(det['weapon_type'], colors['weapon']) |
|
|
elif det['type'] == 'fight' and 'fight_type' in det: |
|
|
color = fight_colors.get(det['fight_type'], colors['fight']) |
|
|
else: |
|
|
color = colors.get(det['type'], (0, 255, 0)) |
|
|
|
|
|
|
|
|
thickness = 4 if det.get('threat_level') == 'critical' else 3 if det['type'] in ['weapon', |
|
|
'fight'] else 2 |
|
|
cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) |
|
|
|
|
|
|
|
|
if det['type'] == 'weapon': |
|
|
label = f"{det['class']} ({det['confidence']:.2f})" |
|
|
if 'threat_level' in det: |
|
|
label += f" [{det['threat_level']}]" |
|
|
elif det['type'] == 'fight': |
|
|
label = f"FIGHT: {det['class']} ({det['confidence']:.2f})" |
|
|
if 'threat_level' in det: |
|
|
label += f" [{det['threat_level']}]" |
|
|
if 'aggression_level' in det: |
|
|
label += f" {det['aggression_level']}" |
|
|
else: |
|
|
label = f"{det['type']}: {det['class']} ({det['confidence']:.2f})" |
|
|
|
|
|
|
|
|
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0] |
|
|
cv2.rectangle(image, (x1, y1 - 25), (x1 + label_size[0] + 5, y1), color, -1) |
|
|
|
|
|
|
|
|
cv2.putText(image, label, (x1 + 2, y1 - 7), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) |
|
|
|
|
|
|
|
|
if det['type'] == 'fight': |
|
|
context_text = [] |
|
|
if 'people_involved' in det and det['people_involved'] > 0: |
|
|
context_text.append(f"People: {det['people_involved']}") |
|
|
if 'context_flags' in det and det['context_flags']: |
|
|
context_text.append(f"Flags: {', '.join(det['context_flags'])}") |
|
|
|
|
|
if context_text: |
|
|
context_label = " | ".join(context_text) |
|
|
cv2.putText(image, context_label, (x1, y2 + 15), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) |
|
|
|
|
|
|
|
|
if 'detection_method' in det: |
|
|
method = det['detection_method'].split('_')[-1] |
|
|
cv2.putText(image, method, (x1, y2 + 30), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1) |
|
|
|
|
|
return image |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Error drawing detections: {e}") |
|
|
return image |
|
|
|
|
|
def process_video(self, video_path, output_path=None): |
|
|
"""Process video file with enhanced detection including fights - processes every frame""" |
|
|
try: |
|
|
cap = cv2.VideoCapture(video_path) |
|
|
frame_count = 0 |
|
|
total_detections = [] |
|
|
fight_timeline = [] |
|
|
|
|
|
if output_path: |
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
|
|
|
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
frame_count += 1 |
|
|
|
|
|
|
|
|
result = self.process_image(frame) |
|
|
if result and result['detections']: |
|
|
|
|
|
for detection in result['detections']: |
|
|
detection['frame'] = frame_count |
|
|
|
|
|
total_detections.extend(result['detections']) |
|
|
|
|
|
|
|
|
fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] |
|
|
if fight_detections: |
|
|
timestamp = frame_count / cap.get(cv2.CAP_PROP_FPS) |
|
|
fight_timeline.append({ |
|
|
'timestamp': timestamp, |
|
|
'frame': frame_count, |
|
|
'fights': len(fight_detections), |
|
|
'max_aggression': max(f.get('aggression_level', 'low') for f in fight_detections) |
|
|
}) |
|
|
|
|
|
print(f"โ ๏ธ Frame {frame_count}: {len(result['detections'])} threats detected") |
|
|
|
|
|
breakdown = result.get('detection_breakdown', {}) |
|
|
if breakdown.get('fights', 0) > 0: |
|
|
print(f" ๐ Fights: {breakdown['fights']}") |
|
|
|
|
|
if output_path and 'annotated_image' in result: |
|
|
out.write(result['annotated_image']) |
|
|
elif output_path: |
|
|
out.write(frame) |
|
|
else: |
|
|
if output_path: |
|
|
out.write(frame) |
|
|
|
|
|
cap.release() |
|
|
if output_path: |
|
|
out.release() |
|
|
|
|
|
|
|
|
fight_analysis = {} |
|
|
if fight_timeline: |
|
|
fight_analysis = { |
|
|
'total_fight_incidents': len(fight_timeline), |
|
|
'first_fight_time': fight_timeline[0]['timestamp'], |
|
|
'last_fight_time': fight_timeline[-1]['timestamp'], |
|
|
'peak_aggression_time': max(fight_timeline, key=lambda x: x['max_aggression'])['timestamp'], |
|
|
'fight_duration_coverage': fight_timeline[-1]['timestamp'] - fight_timeline[0]['timestamp'] if len( |
|
|
fight_timeline) > 1 else 0 |
|
|
} |
|
|
|
|
|
return { |
|
|
'total_frames_processed': frame_count, |
|
|
'total_detections': len(total_detections), |
|
|
'detections': total_detections, |
|
|
'fight_timeline': fight_timeline, |
|
|
'fight_analysis': fight_analysis, |
|
|
'detection_breakdown': { |
|
|
'weapons': len([d for d in total_detections if d['type'] == 'weapon']), |
|
|
'fights': len([d for d in total_detections if d['type'] == 'fight']), |
|
|
'nsfw': len([d for d in total_detections if d['type'] == 'nsfw']) |
|
|
} |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Error processing video: {e}") |
|
|
return None |
|
|
|
|
|
def save_report(self, filename="detection_report.json"): |
|
|
"""Save detection history to file""" |
|
|
try: |
|
|
with open(filename, 'w') as f: |
|
|
json.dump(self.detection_history, f, indent=2, default=str) |
|
|
print(f"๐ Report saved to {filename}") |
|
|
except Exception as e: |
|
|
print(f"โ Error saving report: {e}") |
|
|
|
|
|
def get_memory_usage(self): |
|
|
"""Get current GPU memory usage""" |
|
|
if torch.cuda.is_available(): |
|
|
allocated = torch.cuda.memory_allocated() / 1024 ** 3 |
|
|
cached = torch.cuda.memory_reserved() / 1024 ** 3 |
|
|
return f"GPU Memory: {allocated:.2f}GB allocated, {cached:.2f}GB cached" |
|
|
return "CPU mode" |
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Enhanced example usage with knife and fight detection improvements - processes every frame""" |
|
|
|
|
|
|
|
|
moderator = ContentModerator() |
|
|
|
|
|
|
|
|
print("\n" + "=" * 60) |
|
|
print("๐ฏ ENHANCED DUAL MODEL SYSTEM WITH FIGHT DETECTION") |
|
|
print("=" * 60) |
|
|
|
|
|
status = moderator.get_model_status() |
|
|
|
|
|
if status['custom_weapon_fight_model']: |
|
|
print("โ
Custom YOLO11 Model (dao + sรบng + fight): LOADED") |
|
|
if 'custom_classes' in status: |
|
|
print(f"๐ Custom classes: {status['custom_classes']}") |
|
|
else: |
|
|
print("โ Custom weapon+fight model: NOT FOUND") |
|
|
|
|
|
if status['general_model']: |
|
|
print("โ
General YOLO11n Model (person detection): LOADED") |
|
|
else: |
|
|
print("โ General model: FAILED") |
|
|
|
|
|
if status['nsfw_classifier']: |
|
|
print("โ
NSFW Classifier: LOADED") |
|
|
else: |
|
|
print("โ NSFW Classifier: FAILED") |
|
|
|
|
|
print(f"๐ฅ๏ธ Device: {status['device']}") |
|
|
print(f"๐๏ธ Cache system: ENABLED") |
|
|
print(f"๐ช Knife enhancement: {'ENABLED' if status['knife_enhancement'] else 'DISABLED'}") |
|
|
print(f"๐ Knife confidence boost: {'ENABLED' if status['knife_boost'] else 'DISABLED'}") |
|
|
print(f"๐ Fight detection: {'ENABLED' if status['fight_detection'] else 'DISABLED'}") |
|
|
print(f"๐ง Fight analysis: {'ENABLED' if status['fight_analysis'] else 'DISABLED'}") |
|
|
|
|
|
|
|
|
print("\n" + "=" * 60) |
|
|
print("โจ ENHANCED DETECTION FEATURES") |
|
|
print("=" * 60) |
|
|
print("๐ง Image Enhancement:") |
|
|
print(" - Contrast & brightness optimization") |
|
|
print(" - Edge sharpening for metallic objects") |
|
|
print(" - CLAHE for local contrast") |
|
|
print("๐ Confidence Boosting:") |
|
|
print(" - Geometric analysis (knives)") |
|
|
print(" - Motion blur analysis (fights)") |
|
|
print(" - Edge strength analysis") |
|
|
print("๐ฏ Multi-pass Detection:") |
|
|
print(" - Low threshold pass for knives (0.45)") |
|
|
print(" - Normal threshold for guns (0.45)") |
|
|
print(" - Low threshold for fights (0.40)") |
|
|
print("๐ Fight Analysis:") |
|
|
print(" - Multi-person fight detection") |
|
|
print(" - Aggression level assessment") |
|
|
print(" - Context-aware threat escalation") |
|
|
|
|
|
|
|
|
print("\n" + "=" * 50) |
|
|
print("๐ผ๏ธ SINGLE IMAGE PROCESSING") |
|
|
print("=" * 50) |
|
|
|
|
|
test_image = "test_image.jpg" |
|
|
|
|
|
if os.path.exists(test_image): |
|
|
result = moderator.process_image(test_image) |
|
|
if result: |
|
|
print(f"\n๐ DETECTION RESULTS:") |
|
|
print(f"Risk Level: {result['risk_level']}") |
|
|
print(f"Total Threats: {result['total_threats']}") |
|
|
print(f"Processing Method: {result.get('processing_method', 'standard')}") |
|
|
|
|
|
breakdown = result.get('detection_breakdown', {}) |
|
|
if breakdown: |
|
|
print(f"\n๐ BREAKDOWN:") |
|
|
print(f" Weapons: {breakdown.get('weapons', 0)}") |
|
|
print(f" Fights: {breakdown.get('fights', 0)}") |
|
|
print(f" NSFW: {breakdown.get('nsfw', 0)}") |
|
|
|
|
|
|
|
|
weapon_detections = [d for d in result['detections'] if d['type'] == 'weapon'] |
|
|
if weapon_detections: |
|
|
print(f"\n๐ซ WEAPON DETECTIONS: {len(weapon_detections)}") |
|
|
for i, detection in enumerate(weapon_detections): |
|
|
method = detection.get('detection_method', 'unknown') |
|
|
print(f" Weapon {i + 1} ({method}):") |
|
|
print(f" Class: {detection['class']}") |
|
|
print(f" Type: {detection['weapon_type']}") |
|
|
print(f" Confidence: {detection['confidence']:.3f}") |
|
|
print(f" Threat Level: {detection['threat_level']}") |
|
|
|
|
|
|
|
|
fight_detections = [d for d in result['detections'] if d['type'] == 'fight'] |
|
|
if fight_detections: |
|
|
print(f"\n๐ FIGHT DETECTIONS: {len(fight_detections)}") |
|
|
for i, detection in enumerate(fight_detections): |
|
|
method = detection.get('detection_method', 'unknown') |
|
|
print(f" Fight {i + 1} ({method}):") |
|
|
print(f" Class: {detection['class']}") |
|
|
print(f" Type: {detection.get('fight_type', 'unknown')}") |
|
|
print(f" Confidence: {detection['confidence']:.3f}") |
|
|
print(f" Threat Level: {detection['threat_level']}") |
|
|
print(f" Aggression: {detection.get('aggression_level', 'unknown')}") |
|
|
if 'people_involved' in detection: |
|
|
print(f" People Involved: {detection['people_involved']}") |
|
|
if 'context_flags' in detection and detection['context_flags']: |
|
|
print(f" Context: {', '.join(detection['context_flags'])}") |
|
|
|
|
|
|
|
|
nsfw_detections = [d for d in result['detections'] if d['type'] == 'nsfw'] |
|
|
if nsfw_detections: |
|
|
print(f"\n๐ NSFW DETECTIONS: {len(nsfw_detections)}") |
|
|
for i, detection in enumerate(nsfw_detections): |
|
|
method = detection.get('method', 'unknown') |
|
|
print(f" NSFW {i + 1} ({method}):") |
|
|
print(f" Class: {detection['class']}") |
|
|
print(f" Confidence: {detection['confidence']:.3f}") |
|
|
if 'skin_ratio' in detection: |
|
|
print(f" Skin Ratio: {detection['skin_ratio']:.2f}") |
|
|
else: |
|
|
print(f"โ ๏ธ Test image not found: {test_image}") |
|
|
print("Creating a test pattern to demonstrate detection...") |
|
|
|
|
|
|
|
|
test_img = np.ones((640, 640, 3), dtype=np.uint8) * 128 |
|
|
cv2.putText(test_img, "Test Pattern", (200, 320), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3) |
|
|
|
|
|
result = moderator.process_image(test_img) |
|
|
print("โ
Test pattern processed successfully") |
|
|
|
|
|
|
|
|
print("\n" + "=" * 60) |
|
|
print("๐น ENHANCED WEBCAM PROCESSING WITH FIGHT DETECTION") |
|
|
print("=" * 60) |
|
|
print("Starting enhanced detection on webcam...") |
|
|
print("๐ฎ Controls:") |
|
|
print(" - Press 'q' to quit") |
|
|
print(" - Press 's' to save frame") |
|
|
print(" - Press 'i' to show model info") |
|
|
print(" - Press 'e' to toggle enhancement") |
|
|
print(" - Press 'b' to toggle knife confidence boost") |
|
|
print(" - Press 'f' to toggle fight analysis") |
|
|
print(" - Press 'h' for help") |
|
|
|
|
|
try: |
|
|
cap = cv2.VideoCapture(0) |
|
|
|
|
|
if not cap.isOpened(): |
|
|
print("โ Cannot open webcam. Check if camera is connected.") |
|
|
else: |
|
|
print("โ
Enhanced webcam processing started") |
|
|
|
|
|
frame_count = 0 |
|
|
detection_stats = { |
|
|
'weapons': 0, |
|
|
'knives': 0, |
|
|
'guns': 0, |
|
|
'fights': 0, |
|
|
'nsfw': 0, |
|
|
'total_frames': 0, |
|
|
'fight_incidents': 0 |
|
|
} |
|
|
|
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
print("โ Cannot read from webcam") |
|
|
break |
|
|
|
|
|
frame_count += 1 |
|
|
detection_stats['total_frames'] = frame_count |
|
|
frame = cv2.flip(frame, 1) |
|
|
|
|
|
|
|
|
y_offset = frame.shape[0] - 120 |
|
|
cv2.putText(frame, |
|
|
f"Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}", |
|
|
(10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) |
|
|
|
|
|
cv2.putText(frame, |
|
|
f"Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}", |
|
|
(10, y_offset + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) |
|
|
|
|
|
cv2.putText(frame, |
|
|
f"Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}", |
|
|
(10, y_offset + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) |
|
|
|
|
|
model_info = "Models: Custom+General" if moderator.weapon_model_custom else "General Only" |
|
|
cv2.putText(frame, model_info, (10, y_offset + 60), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) |
|
|
|
|
|
|
|
|
result = moderator.process_image(frame) |
|
|
|
|
|
if result and result['action_required']: |
|
|
|
|
|
for detection in result['detections']: |
|
|
if detection['type'] == 'weapon': |
|
|
detection_stats['weapons'] += 1 |
|
|
if detection['weapon_type'] == 'blade': |
|
|
detection_stats['knives'] += 1 |
|
|
elif detection['weapon_type'] == 'firearm': |
|
|
detection_stats['guns'] += 1 |
|
|
elif detection['type'] == 'fight': |
|
|
detection_stats['fights'] += 1 |
|
|
if detection.get('aggression_level') in ['high', 'extreme']: |
|
|
detection_stats['fight_incidents'] += 1 |
|
|
elif detection['type'] == 'nsfw': |
|
|
detection_stats['nsfw'] += 1 |
|
|
|
|
|
print(f"โ ๏ธ Frame {frame_count}: {result['risk_level']} risk - {result['total_threats']} threats!") |
|
|
|
|
|
|
|
|
for detection in result['detections']: |
|
|
if detection['type'] == 'weapon': |
|
|
icon = "๐ช" if detection['weapon_type'] == 'blade' else "๐ซ" |
|
|
method = detection.get('detection_method', 'unknown').split('_')[-1] |
|
|
print(f" {icon} {detection['class']} ({detection['confidence']:.3f}) [{method}]") |
|
|
elif detection['type'] == 'fight': |
|
|
fight_type = detection.get('fight_type', 'general') |
|
|
aggression = detection.get('aggression_level', 'unknown') |
|
|
people = detection.get('people_involved', 0) |
|
|
method = detection.get('detection_method', 'unknown').split('_')[-1] |
|
|
print(f" ๐ FIGHT: {fight_type} ({detection['confidence']:.3f}) [{method}]") |
|
|
print(f" Aggression: {aggression}, People: {people}") |
|
|
|
|
|
|
|
|
if 'annotated_image' in result: |
|
|
cv2.imshow('Enhanced Detection System (Weapons + Fights)', result['annotated_image']) |
|
|
else: |
|
|
|
|
|
breakdown = result.get('detection_breakdown', {}) |
|
|
threat_text = f"THREATS: W:{breakdown.get('weapons', 0)} F:{breakdown.get('fights', 0)} N:{breakdown.get('nsfw', 0)}" |
|
|
cv2.putText(frame, threat_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) |
|
|
cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) |
|
|
else: |
|
|
cv2.putText(frame, "SAFE", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) |
|
|
cv2.imshow('Enhanced Detection System (Weapons + Fights)', frame) |
|
|
|
|
|
|
|
|
key = cv2.waitKey(1) & 0xFF |
|
|
if key == ord('q'): |
|
|
print("๐ Webcam stopped by user") |
|
|
break |
|
|
elif key == ord('s'): |
|
|
filename = f"enhanced_detection_{frame_count}.jpg" |
|
|
cv2.imwrite(filename, frame) |
|
|
print(f"๐พ Frame saved as {filename}") |
|
|
elif key == ord('i'): |
|
|
print(f"\n๐ Model Status:") |
|
|
current_status = moderator.get_model_status() |
|
|
for k, v in current_status.items(): |
|
|
print(f" {k}: {v}") |
|
|
elif key == ord('e'): |
|
|
|
|
|
moderator.config['weapon_detection']['use_enhancement'] = \ |
|
|
not moderator.config['weapon_detection']['use_enhancement'] |
|
|
print( |
|
|
f"๐ง Enhancement: {'ON' if moderator.config['weapon_detection']['use_enhancement'] else 'OFF'}") |
|
|
elif key == ord('b'): |
|
|
|
|
|
moderator.config['weapon_detection']['boost_knife_detection'] = \ |
|
|
not moderator.config['weapon_detection']['boost_knife_detection'] |
|
|
print( |
|
|
f"๐ Knife Boost: {'ON' if moderator.config['weapon_detection']['boost_knife_detection'] else 'OFF'}") |
|
|
elif key == ord('f'): |
|
|
|
|
|
moderator.config['weapon_detection']['fight_analysis'] = \ |
|
|
not moderator.config['weapon_detection']['fight_analysis'] |
|
|
print( |
|
|
f"๐ Fight Analysis: {'ON' if moderator.config['weapon_detection']['fight_analysis'] else 'OFF'}") |
|
|
elif key == ord('h'): |
|
|
print("\n๐ฎ Controls:") |
|
|
print(" 'q': quit") |
|
|
print(" 's': save frame") |
|
|
print(" 'i': model info") |
|
|
print(" 'e': toggle enhancement") |
|
|
print(" 'b': toggle knife confidence boost") |
|
|
print(" 'f': toggle fight analysis") |
|
|
print(" 'h': help") |
|
|
|
|
|
|
|
|
print(f"\n๐ Session Statistics:") |
|
|
print(f" Total frames: {detection_stats['total_frames']}") |
|
|
print(f" Total weapon detections: {detection_stats['weapons']}") |
|
|
print(f" - Knives/Dao: {detection_stats['knives']}") |
|
|
print(f" - Guns: {detection_stats['guns']}") |
|
|
print(f" Total fight detections: {detection_stats['fights']}") |
|
|
print(f" - High-aggression incidents: {detection_stats['fight_incidents']}") |
|
|
print(f" NSFW detections: {detection_stats['nsfw']}") |
|
|
|
|
|
if detection_stats['total_frames'] > 0: |
|
|
total_detections = detection_stats['weapons'] + detection_stats['fights'] + detection_stats['nsfw'] |
|
|
detection_rate = (total_detections / detection_stats['total_frames'] * 100) |
|
|
print(f" Overall detection rate: {detection_rate:.1f}%") |
|
|
|
|
|
if detection_stats['weapons'] > 0: |
|
|
knife_ratio = detection_stats['knives'] / detection_stats['weapons'] * 100 |
|
|
print(f" Knife detection ratio: {knife_ratio:.1f}% of weapons") |
|
|
|
|
|
if detection_stats['fights'] > 0: |
|
|
incident_ratio = detection_stats['fight_incidents'] / detection_stats['fights'] * 100 |
|
|
print(f" High-aggression fight ratio: {incident_ratio:.1f}% of fights") |
|
|
|
|
|
cap.release() |
|
|
cv2.destroyAllWindows() |
|
|
print("โ
Enhanced webcam session completed") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"โ Webcam error: {e}") |
|
|
|
|
|
|
|
|
print(f"\n๐พ {moderator.get_memory_usage()}") |
|
|
print(f"๐๏ธ Final cache size: {len(moderator.detection_cache)} entries") |
|
|
|
|
|
|
|
|
moderator.save_report("enhanced_detection_with_fights_report.json") |
|
|
|
|
|
print("\nโ
Enhanced Content Moderation System with Fight Detection completed!") |
|
|
print("๐ก New fight detection capabilities:") |
|
|
print(" - Behavioral fight pattern recognition") |
|
|
print(" - Multi-person fight analysis") |
|
|
print(" - Aggression level assessment") |
|
|
print(" - Context-aware threat escalation") |
|
|
print(" - Fight timeline tracking for videos") |
|
|
print("๐ก Enhanced weapon detection:") |
|
|
print(" - Image enhancement preprocessing") |
|
|
print(" - Dynamic confidence thresholds") |
|
|
print(" - Geometric feature analysis") |
|
|
print(" - Multi-pass detection strategy") |
|
|
print("๐ก Processing mode: EVERY FRAME (no skipping)") |
|
|
|