| """miner.py — uploaded to nexu02/ScoreVision HF repo (R17 ONNX migration). |
| |
| Migrated from .pt → ONNX FP16 to comply with subnet requirement |
| (.onnx-only models). Same R17 weights (mAP50 0.928, mAP50-95 0.764) + |
| identical inference recipe to keep the #1 dashboard standing. |
| |
| Inference (same as R17 .pt version): |
| - imgsz=1280, conf=0.50, iou=0.45 |
| - hflip TTA (manual: run twice, merge with per-class NMS) |
| - cross-class NMS at IoU 0.6 |
| |
| Runtime: onnxruntime-gpu (CUDAExecutionProvider) with CPU fallback. |
| FP16 input/weights to fit under 30 MB HF cap (19.3 MB total). |
| """ |
| from pathlib import Path |
| import math |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from numpy import ndarray |
| from pydantic import BaseModel |
|
|
|
|
# Label for each class index. Miner.predict_batch discards any detection whose
# class index does not fall inside this list.
CLASS_NAMES = [
    "cup",
    "bottle",
    "can",
]
|
|
|
|
class BoundingBox(BaseModel):
    """A single detection: integer box corners, class index and confidence."""

    # (x1, y1) is the corner with the smaller coordinates and (x2, y2) the one
    # with the larger; Miner.predict_batch skips boxes where that does not hold.
    x1: int
    y1: int
    x2: int
    y2: int

    # Index of the predicted class.
    cls_id: int

    # Score of the predicted class for this box.
    conf: float
|
|
|
|
class TVFrameResult(BaseModel):
    """Detections reported for one frame of a batch."""

    # Identifier of the frame this result belongs to.
    frame_id: int

    # Detections kept for the frame (possibly empty).
    boxes: list[BoundingBox]

    # (x, y) integer pairs, one per requested keypoint.
    keypoints: list[tuple[int, int]]
|
|
|
|
| def _iou_xyxy(a: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Vectorised IoU between one box (a) and array of boxes (b).""" |
| xx1 = np.maximum(a[0], b[:, 0]) |
| yy1 = np.maximum(a[1], b[:, 1]) |
| xx2 = np.minimum(a[2], b[:, 2]) |
| yy2 = np.minimum(a[3], b[:, 3]) |
| inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1) |
| a_area = max(0.0, (a[2] - a[0]) * (a[3] - a[1])) |
| b_area = np.maximum(0.0, (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])) |
| return inter / (a_area + b_area - inter + 1e-7) |
|
|
|
|
| def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thr: float) -> np.ndarray: |
| """Per-class hard NMS — assumes boxes already filtered to one class.""" |
| n = len(boxes) |
| if n == 0: |
| return np.array([], dtype=np.intp) |
| order = np.argsort(-scores) |
| keep = [] |
| while len(order) > 0: |
| i = int(order[0]) |
| keep.append(i) |
| if len(order) == 1: |
| break |
| rest = order[1:] |
| iou = _iou_xyxy(boxes[i], boxes[rest]) |
| order = rest[iou <= iou_thr] |
| return np.array(keep, dtype=np.intp) |
|
|
|
|
| def _per_class_nms(boxes, scores, cls_ids, iou_thr): |
| if len(boxes) == 0: |
| return np.array([], dtype=np.intp) |
| keep_all = [] |
| for c in np.unique(cls_ids): |
| m = cls_ids == c |
| idx = np.where(m)[0] |
| k = _hard_nms(boxes[m], scores[m], iou_thr) |
| keep_all.extend(idx[k].tolist()) |
| keep_all.sort() |
| return np.array(keep_all, dtype=np.intp) |
|
|
|
|
| def _cross_class_nms(boxes, scores, cls_ids, iou_thr): |
| """Cross-class NMS — drop overlapping boxes regardless of class.""" |
| if len(boxes) <= 1: |
| return np.arange(len(boxes)) |
| order = np.argsort(-scores) |
| keep = [] |
| suppressed = np.zeros(len(boxes), dtype=bool) |
| for i in order: |
| if suppressed[i]: |
| continue |
| keep.append(int(i)) |
| iou = _iou_xyxy(boxes[i], boxes) |
| dup = iou > iou_thr |
| dup[i] = False |
| suppressed |= dup |
| return np.array(sorted(keep), dtype=np.intp) |
|
|
|
|
class Miner:
    """R17 ONNX detector: 1280 letterbox + horizontal-flip TTA + cross-class NMS.

    The network is loaded from ``best.onnx`` in the given repo directory and
    run through onnxruntime, preferring CUDA and falling back to CPU.

    Per frame (see ``predict_batch``):
      1. letterbox and normalise the image, run the network on it and on its
         horizontal mirror;
      2. keep candidates scoring at least ``CONF_THR``, map them back to
         original-image pixels and merge both passes with per-class NMS at
         ``IOU_THR``;
      3. remove remaining overlaps across classes at ``CROSS_CLASS_IOU``.
    """

    INPUT_SIZE = 1280       # side of the square letterboxed network input
    CONF_THR = 0.50         # minimum best-class score for a candidate
    IOU_THR = 0.45          # per-class NMS threshold used when merging TTA passes
    CROSS_CLASS_IOU = 0.6   # class-agnostic NMS threshold applied last

    def __init__(self, path_hf_repo: Path) -> None:
        """Create the inference session from ``path_hf_repo / "best.onnx"``.

        Raises:
            FileNotFoundError: if the weights file does not exist.
        """
        model_path = path_hf_repo / "best.onnx"
        if not model_path.exists():
            raise FileNotFoundError(f"missing weights at {model_path}")

        print(f"ORT version: {ort.__version__}")
        # Best effort only: a failure here must not prevent the session from
        # being created, but the reason is reported instead of being hidden.
        try:
            ort.preload_dlls()
        except Exception as exc:
            print(f"ort.preload_dlls() skipped: {exc}")

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
        except Exception as e:
            print(f"CUDA session failed, fallback CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        print(f"ORT providers: {self.session.get_providers()}")
        for inp in self.session.get_inputs():
            print(f"INPUT {inp.name} shape={inp.shape} dtype={inp.type}")
        for out in self.session.get_outputs():
            print(f"OUTPUT {out.name} shape={out.shape} dtype={out.type}")

        first_input = self.session.get_inputs()[0]
        self.input_name = first_input.name
        # Feed the tensor in whatever float precision the graph declares.
        self.input_dtype = np.float16 if "float16" in first_input.type else np.float32
        print(f"✅ R17 ONNX loaded, input dtype={self.input_dtype.__name__}")

    def __repr__(self) -> str:
        return f"R17_ONNX(imgsz={self.INPUT_SIZE}, conf={self.CONF_THR}, iou={self.IOU_THR})"

    @staticmethod
    def _letterbox_geometry(w: int, h: int, size: int):
        """Compute the letterbox layout for a ``w`` x ``h`` image.

        Returns ``(r, (new_w, new_h), (top, bottom, left, right))`` where ``r``
        is the resize factor, ``(new_w, new_h)`` the resized image size and
        the last tuple the integer border widths that fill up to ``size``.
        """
        r = min(size / w, size / h)
        new_w, new_h = int(round(w * r)), int(round(h * r))
        dw, dh = (size - new_w) / 2.0, (size - new_h) / 2.0
        # The -0.1 / +0.1 nudges split an odd remainder as (smaller, larger)
        # so both borders together always add up to the full remainder.
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        return r, (new_w, new_h), (top, bottom, left, right)

    @staticmethod
    def _unflip_boxes(boxes: np.ndarray, width: int) -> np.ndarray:
        """Map xyxy boxes found on a horizontally mirrored image back.

        Mirroring swaps the roles of x1 and x2. The result is clipped to
        ``[0, width - 1]`` again because ``width - 0`` would otherwise leave
        the range that ``_decode_raw`` enforces.
        """
        restored = boxes.copy()
        restored[:, 0] = width - boxes[:, 2]
        restored[:, 2] = width - boxes[:, 0]
        restored[:, [0, 2]] = np.clip(restored[:, [0, 2]], 0, width - 1)
        return restored

    def _letterbox(self, img: np.ndarray, size: int):
        """Resize ``img`` keeping aspect ratio and pad it to ``size`` x ``size``.

        Returns ``(padded, r, (left, top))``: the padded image, the resize
        factor and the integer offsets actually applied on the left/top, which
        is what must be subtracted to undo the padding.
        """
        h, w = img.shape[:2]
        r, (new_w, new_h), (top, bottom, left, right) = self._letterbox_geometry(w, h, size)
        if (new_w, new_h) != (w, h):
            img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
        padded = cv2.copyMakeBorder(
            img, top, bottom, left, right,
            borderType=cv2.BORDER_CONSTANT, value=(114, 114, 114),
        )
        return padded, r, (left, top)

    def _preprocess(self, img_bgr: np.ndarray):
        """Turn a BGR image into the network input tensor.

        Returns ``(x, r, pad, (w, h))`` where ``x`` is a contiguous
        ``(1, 3, INPUT_SIZE, INPUT_SIZE)`` RGB tensor scaled to ``[0, 1]`` in
        the session's input dtype, and the rest is what ``_decode_raw`` needs
        to map boxes back to the original image.
        """
        h, w = img_bgr.shape[:2]
        padded, r, pad = self._letterbox(img_bgr, self.INPUT_SIZE)
        rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
        x = rgb.astype(self.input_dtype) / 255.0
        x = np.transpose(x, (2, 0, 1))[None, ...]
        return np.ascontiguousarray(x, dtype=self.input_dtype), r, pad, (w, h)

    def _decode_raw(self, raw: np.ndarray, r: float, pad, orig_size):
        """Decode the raw network output into boxes, scores and class ids.

        ``raw`` holds, per candidate, 4 box values (centre x, centre y, width,
        height in letterboxed pixels) followed by the per-class scores; it may
        carry a leading batch axis and may be channel-major. Candidates whose
        best class score is below ``CONF_THR`` are dropped. Returned boxes are
        xyxy in original-image pixels, clipped to the image. No NMS is done
        here.
        """
        if raw.ndim == 3:
            raw = raw[0]
        # Channel-major output has fewer rows than columns; make it row-per-box.
        if raw.shape[0] < raw.shape[1]:
            raw = raw.T
        boxes_xywh = raw[:, :4].astype(np.float32)
        cls_scores = raw[:, 4:].astype(np.float32)
        cls_ids = np.argmax(cls_scores, axis=1)
        scores = cls_scores[np.arange(len(cls_scores)), cls_ids]

        keep = scores >= self.CONF_THR
        if not keep.any():
            return (
                np.empty((0, 4), dtype=np.float32),
                np.empty(0, dtype=np.float32),
                np.empty(0, dtype=int),
            )
        boxes_xywh, scores, cls_ids = boxes_xywh[keep], scores[keep], cls_ids[keep]

        # Centre/size -> corner representation.
        half_w = boxes_xywh[:, 2] / 2
        half_h = boxes_xywh[:, 3] / 2
        boxes = np.empty_like(boxes_xywh)
        boxes[:, 0] = boxes_xywh[:, 0] - half_w
        boxes[:, 1] = boxes_xywh[:, 1] - half_h
        boxes[:, 2] = boxes_xywh[:, 0] + half_w
        boxes[:, 3] = boxes_xywh[:, 1] + half_h

        # Undo the letterbox: remove the border offset, then the resize.
        pad_w, pad_h = pad
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= r

        w, h = orig_size
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, w - 1)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, h - 1)

        return boxes, scores, cls_ids

    def _predict_single(self, img_bgr: np.ndarray):
        """Run one forward pass and return decoded (boxes, scores, cls_ids)."""
        x, r, pad, orig = self._preprocess(img_bgr)
        out = self.session.run(None, {self.input_name: x})[0]
        return self._decode_raw(out, r, pad, orig)

    def _predict_with_tta(self, img_bgr: np.ndarray):
        """Predict on the image and its horizontal mirror, merged by per-class NMS."""
        boxes1, scores1, cls1 = self._predict_single(img_bgr)
        boxes2, scores2, cls2 = self._predict_single(cv2.flip(img_bgr, 1))
        boxes2 = self._unflip_boxes(boxes2, img_bgr.shape[1])

        # Concatenation copes with either (or both) passes being empty.
        boxes = np.concatenate([boxes1, boxes2])
        scores = np.concatenate([scores1, scores2])
        cls_ids = np.concatenate([cls1, cls2])
        if not len(boxes):
            return boxes, scores, cls_ids

        keep = _per_class_nms(boxes, scores, cls_ids, self.IOU_THR)
        return boxes[keep], scores[keep], cls_ids[keep]

    def _to_bounding_boxes(self, boxes, scores, cls_ids) -> "list[BoundingBox]":
        """Convert detections to ``BoundingBox`` models, skipping invalid ones.

        Boxes with no positive extent and class ids outside ``CLASS_NAMES``
        are skipped. Corners are rounded outwards (floor / ceil).
        """
        results = []
        for b, s, c in zip(boxes, scores, cls_ids):
            x1, y1, x2, y2 = b
            if x2 <= x1 or y2 <= y1:
                continue
            c_int = int(c)
            if c_int < 0 or c_int >= len(CLASS_NAMES):
                continue
            results.append(BoundingBox(
                x1=int(math.floor(x1)), y1=int(math.floor(y1)),
                x2=int(math.ceil(x2)), y2=int(math.ceil(y2)),
                cls_id=c_int, conf=float(s),
            ))
        return results

    def predict_batch(self, batch_images: list[ndarray], offset: int,
                      n_keypoints: int) -> "list[TVFrameResult]":
        """Detect objects in every image of the batch.

        Args:
            batch_images: images as H x W x 3 arrays; anything else yields an
                empty result for that frame.
            offset: frame id of the first image; later images count up from it.
            n_keypoints: number of ``(0, 0)`` placeholder keypoints to attach
                to every result.

        Returns:
            One ``TVFrameResult`` per input image, in input order. A frame
            whose processing raises is reported with no boxes rather than
            aborting the batch.
        """
        out = []
        kp_zeros = [(0, 0)] * max(0, int(n_keypoints))
        for i, image in enumerate(batch_images):
            frame_id = offset + i
            try:
                if image is None or image.ndim != 3 or image.shape[2] != 3:
                    out.append(TVFrameResult(frame_id=frame_id, boxes=[], keypoints=kp_zeros))
                    continue
                if image.dtype != np.uint8:
                    # Saturate instead of letting out-of-range values wrap.
                    image = np.clip(image, 0, 255).astype(np.uint8)

                boxes, scores, cls_ids = self._predict_with_tta(image)
                if len(boxes):
                    keep = _cross_class_nms(boxes, scores, cls_ids, self.CROSS_CLASS_IOU)
                    boxes, scores, cls_ids = boxes[keep], scores[keep], cls_ids[keep]

                results = self._to_bounding_boxes(boxes, scores, cls_ids)
                out.append(TVFrameResult(frame_id=frame_id, boxes=results, keypoints=kp_zeros))
            except Exception as e:
                print(f"Inference err for frame {frame_id}: {e}")
                out.append(TVFrameResult(frame_id=frame_id, boxes=[], keypoints=kp_zeros))
        return out
|
|