| import os |
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
class FaceDetector:
    """Detect faces with a YOLOv8 face model executed through ONNX Runtime.

    The ONNX session is created once in ``__init__``. If the model file is
    missing or the session cannot be created, the problem is logged and
    ``detect_faces`` returns an empty list instead of raising.
    """

    # Side length (pixels) of the square image fed to the network; box
    # coordinates produced by the model are scaled back from this size.
    INPUT_SIZE = 640

    def __init__(self, model_path="models/yolov8n-face.onnx"):
        """Create the inference session when ``model_path`` exists.

        Args:
            model_path: Path of the ONNX model file.

        Sets ``self.loaded`` to True only when the session was created.
        """
        self.model_path = model_path
        self.loaded = False
        self.session = None

        if not os.path.exists(model_path):
            logger.warning(f"YOLOv8 face detection model file missing at {model_path}")
            return

        try:
            self.session = ort.InferenceSession(
                model_path,
                providers=['CPUExecutionProvider'],
            )
            self.loaded = True
            logger.info(f"YOLOv8 Face Detector loaded successfully from {model_path}")
        except Exception as e:
            # Best effort: a broken model must not prevent construction.
            logger.error(f"Error initializing YOLOv8 ONNX session: {e}")

    def detect_faces(self, image_array, confidence_threshold=0.5, padding=15, min_size=40):
        """Detect faces in the input image.

        Args:
            image_array: Image as a NumPy array of shape (H, W), (H, W, 1),
                (H, W, 3) or (H, W, 4). Pixel values are divided by 255 before
                inference. Channel order is passed through unchanged.
            confidence_threshold: Predictions whose score is not strictly
                greater than this value are discarded.
            padding: Pixels added on every side of each box (clamped to the
                image bounds).
            min_size: Boxes narrower or shorter than this many pixels are
                dropped. The check runs after padding, so it bounds the padded
                box, not the raw detection.

        Returns:
            A list of ``[x1, y1, x2, y2]`` boxes in original-image pixel
            coordinates, after NMS. Empty when the model is not loaded, the
            image is empty, inference fails, or nothing is detected.
        """
        if not self.loaded or self.session is None:
            logger.warning("Face detector model not loaded. Skipping detection.")
            return []

        if image_array is None or getattr(image_array, "size", 0) == 0:
            logger.warning("Face detector received an empty image. Skipping detection.")
            return []

        h, w = image_array.shape[:2]
        input_tensor = self._preprocess(image_array)

        try:
            outputs = self.session.run(
                None,
                {self.session.get_inputs()[0].name: input_tensor},
            )
            # First output, first batch item; transposed so that each row is
            # one prediction.
            detections = np.transpose(outputs[0][0])
        except Exception as e:
            logger.error(f"Error during YOLOv8 detection inference: {e}")
            return []

        # Each row must hold at least cx, cy, w, h and a score.
        if detections.ndim != 2 or detections.shape[1] < 5:
            logger.error(f"Unexpected YOLOv8 face output shape: {detections.shape}")
            return []

        raw_boxes, raw_scores = self._decode_boxes(
            detections, w, h, confidence_threshold, padding, min_size
        )
        return self._apply_nms(raw_boxes, raw_scores, iou_threshold=0.4, region_size=50)

    @staticmethod
    def _ensure_three_channels(image):
        """Return ``image`` as an (H, W, 3) array.

        Single-channel input is replicated to three channels; for four-channel
        input only the first three channels are kept. Three-channel input is
        returned unchanged.
        """
        if image.ndim == 2:
            image = image[:, :, np.newaxis]
        if image.ndim == 3 and image.shape[2] == 1:
            return np.repeat(image, 3, axis=2)
        if image.ndim == 3 and image.shape[2] == 4:
            return np.ascontiguousarray(image[:, :, :3])
        return image

    def _preprocess(self, image_array):
        """Resize to the network input size and build a (1, 3, S, S) float32 tensor in [0, 1]."""
        image = self._ensure_three_channels(image_array)
        resized = cv2.resize(image, (self.INPUT_SIZE, self.INPUT_SIZE))
        scaled = resized.astype(np.float32) / 255.0
        return np.expand_dims(np.transpose(scaled, (2, 0, 1)), axis=0)

    def _decode_boxes(self, detections, width, height, confidence_threshold, padding, min_size):
        """Convert prediction rows into padded pixel boxes and their scores.

        Rows are read as ``cx, cy, w, h, score`` in network-input coordinates.
        Returns ``(boxes, scores)`` as two parallel lists.
        """
        scale_x = width / float(self.INPUT_SIZE)
        scale_y = height / float(self.INPUT_SIZE)

        # Compare in float64 so the threshold test matches a plain Python
        # float comparison, then visit only the rows that pass it.
        all_scores = detections[:, 4].astype(np.float64)
        candidates = np.flatnonzero(all_scores > confidence_threshold)

        boxes = []
        scores = []
        for row_idx in candidates:
            cx, cy, bw, bh = (float(v) for v in detections[row_idx, :4])

            # Centre/size -> corners, scaled back to the original image.
            x1 = int((cx - bw / 2) * scale_x)
            y1 = int((cy - bh / 2) * scale_y)
            x2 = int((cx + bw / 2) * scale_x)
            y2 = int((cy + bh / 2) * scale_y)

            # Pad and clamp to the image bounds.
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(width, x2 + padding)
            y2 = min(height, y2 + padding)

            box_w = x2 - x1
            box_h = y2 - y1
            if box_w < min_size or box_h < min_size:
                logger.info(f"Face too small ({box_w}x{box_h}px), skipping")
                continue

            boxes.append([x1, y1, x2, y2])
            scores.append(float(all_scores[row_idx]))

        return boxes, scores

    def _apply_nms(self, boxes, scores, iou_threshold=0.4, region_size=50):
        """Apply Non-Maximum Suppression and spatial centre-distance filtering.

        Boxes are visited from highest to lowest score. A lower-scored box is
        discarded when its IoU with a kept box exceeds ``iou_threshold`` OR
        when its centre lies within ``region_size`` pixels of the kept box's
        centre on both axes.

        Args:
            boxes: List of ``[x1, y1, x2, y2]`` boxes.
            scores: Scores parallel to ``boxes``.
            iou_threshold: IoU above which a box is suppressed.
            region_size: Per-axis centre distance below which a box is suppressed.

        Returns:
            The kept boxes, ordered by descending score.
        """
        if not boxes:
            return []

        remaining = [int(i) for i in np.argsort(scores)[::-1]]
        keep = []

        while remaining:
            current = remaining[0]
            keep.append(current)

            curr_box = boxes[current]
            curr_cx = (curr_box[0] + curr_box[2]) / 2.0
            curr_cy = (curr_box[1] + curr_box[3]) / 2.0
            curr_area = (curr_box[2] - curr_box[0]) * (curr_box[3] - curr_box[1])

            survivors = []
            for idx in remaining[1:]:
                box = boxes[idx]
                cx = (box[0] + box[2]) / 2.0
                cy = (box[1] + box[3]) / 2.0
                same_region = (
                    abs(curr_cx - cx) < region_size and abs(curr_cy - cy) < region_size
                )

                inter_w = max(0, min(curr_box[2], box[2]) - max(curr_box[0], box[0]))
                inter_h = max(0, min(curr_box[3], box[3]) - max(curr_box[1], box[1]))
                inter_area = inter_w * inter_h
                box_area = (box[2] - box[0]) * (box[3] - box[1])
                union_area = float(box_area + curr_area - inter_area)
                iou = inter_area / union_area if union_area > 0 else 0

                if iou > iou_threshold or same_region:
                    continue
                survivors.append(idx)

            remaining = survivors

        return [boxes[i] for i in keep]
|
|
|
|
class PhoneDetector:
    """Detect phones with a multi-class YOLOv8 model executed through ONNX Runtime.

    The ONNX session is created once in ``__init__``. If the model file is
    missing or the session cannot be created, the problem is logged and
    ``detect_phones`` returns an empty list instead of raising.
    """

    # Side length (pixels) of the square image fed to the network; box
    # coordinates produced by the model are scaled back from this size.
    INPUT_SIZE = 640

    # Index of the phone class among the per-class scores that follow the
    # four box values in each prediction row.
    # NOTE(review): presumably COCO "cell phone" - confirm against the model's label map.
    PHONE_CLASS_INDEX = 67

    def __init__(self, model_path="models/yolov8n.onnx"):
        """Create the inference session when ``model_path`` exists.

        Args:
            model_path: Path of the ONNX model file.

        Sets ``self.loaded`` to True only when the session was created.
        """
        self.model_path = model_path
        self.loaded = False
        self.session = None

        if not os.path.exists(model_path):
            logger.warning(f"YOLOv8 COCO model file missing at {model_path}")
            return

        try:
            self.session = ort.InferenceSession(
                model_path,
                providers=['CPUExecutionProvider'],
            )
            self.loaded = True
            logger.info(f"YOLOv8 COCO Detector loaded successfully from {model_path}")
        except Exception as e:
            # Best effort: a broken model must not prevent construction.
            logger.error(f"Error initializing YOLOv8 COCO ONNX session: {e}")

    def detect_phones(self, image_array, confidence_threshold=0.35):
        """Detect cell phones in the input image.

        Args:
            image_array: Image as a NumPy array of shape (H, W), (H, W, 1),
                (H, W, 3) or (H, W, 4). Pixel values are divided by 255 before
                inference. Channel order is passed through unchanged.
            confidence_threshold: Predictions whose phone-class score is not
                strictly greater than this value are discarded.

        Returns:
            A list of dicts ``{"bbox": [x1, y1, x2, y2], "confidence": score}``
            in original-image pixel coordinates, after NMS. Empty when the
            model is not loaded, the image is empty, inference fails, or
            nothing is detected.
        """
        if not self.loaded or self.session is None:
            return []

        if image_array is None or getattr(image_array, "size", 0) == 0:
            logger.warning("Phone detector received an empty image. Skipping detection.")
            return []

        h, w = image_array.shape[:2]
        input_tensor = self._preprocess(image_array)

        try:
            outputs = self.session.run(
                None,
                {self.session.get_inputs()[0].name: input_tensor},
            )
            # First output, first batch item; transposed so that each row is
            # one prediction.
            detections = np.transpose(outputs[0][0])
        except Exception as e:
            logger.error(f"Error during YOLOv8 COCO inference: {e}")
            return []

        # The phone score column must exist, otherwise this is not the
        # expected multi-class model.
        score_idx = 4 + self.PHONE_CLASS_INDEX
        if detections.ndim != 2 or detections.shape[1] <= score_idx:
            logger.error(f"Unexpected YOLOv8 COCO output shape: {detections.shape}")
            return []

        raw_boxes, raw_scores = self._decode_boxes(detections, w, h, confidence_threshold)
        if not raw_boxes:
            return []

        # cv2.dnn.NMSBoxes interprets each box as [x, y, width, height], so
        # the corner-format boxes must be converted before suppression.
        keep_indices = cv2.dnn.NMSBoxes(
            bboxes=self._xyxy_to_xywh(raw_boxes),
            scores=raw_scores,
            score_threshold=confidence_threshold,
            nms_threshold=0.45,
        )

        # Flattening copes with both flat and column-shaped index results, as
        # well as an empty result.
        return [
            {"bbox": raw_boxes[int(idx)], "confidence": raw_scores[int(idx)]}
            for idx in np.array(keep_indices).flatten()
        ]

    @staticmethod
    def _xyxy_to_xywh(boxes):
        """Convert ``[x1, y1, x2, y2]`` boxes to ``[x, y, width, height]``."""
        return [[x1, y1, x2 - x1, y2 - y1] for x1, y1, x2, y2 in boxes]

    @staticmethod
    def _ensure_three_channels(image):
        """Return ``image`` as an (H, W, 3) array.

        Single-channel input is replicated to three channels; for four-channel
        input only the first three channels are kept. Three-channel input is
        returned unchanged.
        """
        if image.ndim == 2:
            image = image[:, :, np.newaxis]
        if image.ndim == 3 and image.shape[2] == 1:
            return np.repeat(image, 3, axis=2)
        if image.ndim == 3 and image.shape[2] == 4:
            return np.ascontiguousarray(image[:, :, :3])
        return image

    def _preprocess(self, image_array):
        """Resize to the network input size and build a (1, 3, S, S) float32 tensor in [0, 1]."""
        image = self._ensure_three_channels(image_array)
        resized = cv2.resize(image, (self.INPUT_SIZE, self.INPUT_SIZE))
        scaled = resized.astype(np.float32) / 255.0
        return np.expand_dims(np.transpose(scaled, (2, 0, 1)), axis=0)

    def _decode_boxes(self, detections, width, height, confidence_threshold):
        """Convert prediction rows into clamped pixel boxes and phone scores.

        Rows are read as ``cx, cy, w, h`` followed by per-class scores, in
        network-input coordinates. Boxes narrower or shorter than 15 pixels
        after clamping are dropped. Returns ``(boxes, scores)`` as two
        parallel lists.
        """
        scale_x = width / float(self.INPUT_SIZE)
        scale_y = height / float(self.INPUT_SIZE)
        score_idx = 4 + self.PHONE_CLASS_INDEX

        # Compare in float64 so the threshold test matches a plain Python
        # float comparison, then visit only the rows that pass it.
        all_scores = detections[:, score_idx].astype(np.float64)
        candidates = np.flatnonzero(all_scores > confidence_threshold)

        boxes = []
        scores = []
        for row_idx in candidates:
            cx, cy, bw, bh = (float(v) for v in detections[row_idx, :4])

            # Centre/size -> corners, scaled back to the original image.
            x1 = int((cx - bw / 2) * scale_x)
            y1 = int((cy - bh / 2) * scale_y)
            x2 = int((cx + bw / 2) * scale_x)
            y2 = int((cy + bh / 2) * scale_y)

            # Clamp every corner to the image bounds.
            x1 = max(0, min(width, x1))
            y1 = max(0, min(height, y1))
            x2 = max(0, min(width, x2))
            y2 = max(0, min(height, y2))

            if x2 - x1 < 15 or y2 - y1 < 15:
                continue

            boxes.append([x1, y1, x2, y2])
            scores.append(float(all_scores[row_idx]))

        return boxes, scores
|
|
|
|