| |
"""Open-source Detect-beverage miner v14 (yolo11s trained on SAM3 validator-GT).

Trained on 329 frames labelled by the SAME SAM3 endpoint the validator uses to
build pseudo-GT (prompts cup/bottle/can, thr 0.5, mosaic 0) — i.e. the actual
scoring target, not peer miners' boxes. NMS-baked ONNX, output [1,300,6].

On the 50-frame SAM3-GT holdout (validator target): mAP50=0.715 (navierstocks
0.673, v12 0.645); best composite UI 63.47% (nav 62.91%, v12 61.97%). Beats
peers on detection; parity-plus on composite.

Post-proc:
- detect NMS-baked output and unpack to (N, 4+num_classes) one-hot scores
- per-class conf filter `[0.70, 0.50, 0.50]` (best v14 sweep on SAM3-GT)
- sane-box geometric filter (min_box_area=100, min_side=8,
  max_aspect_ratio=10, area at most 95% of the frame)
- per-class hard NMS @ iou=0.4 (redundant after baked NMS but safe)
- cross-class dedup @ iou=0.7 (helps bottle↔can misclassification FP)
- TTA off

Contract: class `Miner` at HF root, `predict_batch(...) -> list[TVFrameResult]`.
"""
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from numpy import ndarray |
| from pydantic import BaseModel |
|
|
|
|
class BoundingBox(BaseModel):
    """A single detection: box corners, class id and confidence.

    ``Miner._detect`` fills the corners with integer pixel coordinates in the
    original (un-letterboxed) image, ``(x1, y1)`` being the top-left and
    ``(x2, y2)`` the bottom-right corner.
    """

    x1: int
    y1: int
    x2: int
    y2: int
    # Index of the predicted class (0 .. Miner.num_classes - 1).
    cls_id: int
    # Detection confidence; Miner clamps it into [0, 1] before building the box.
    conf: float
|
|
|
|
class TVFrameResult(BaseModel):
    """Per-frame result returned by ``Miner.predict_batch``."""

    # Frame index; predict_batch sets it to ``offset`` + position in the batch.
    frame_id: int
    # Detections for this frame (empty when nothing was found or detection failed).
    boxes: list[BoundingBox]
    # (x, y) keypoints; this miner only emits (0, 0) placeholders.
    keypoints: list[tuple[int, int]]
|
|
|
|
class Miner:
    """Detect-beverage miner v14: ONNX inference plus NumPy post-processing.

    Per-frame pipeline (``_detect`` / ``_postprocess``):
    letterbox -> ONNX forward -> decode to ``(N, 4 + num_classes)`` ->
    per-class confidence filter -> geometric sanity filter -> per-class hard
    NMS -> same-class containment dedup -> IoU dedup across classes ->
    confidence aggregation -> ``max_det`` cap -> clip to the frame and cast to
    integer pixels.

    All tunables are class attributes so they can be overridden per instance
    or in a subclass.
    """

    weights_file = "best.onnx"
    input_size = 1280
    num_classes = 3

    # Minimum confidence per class, indexed by class id.
    conf_thres = np.array([0.70, 0.50, 0.50], dtype=np.float32)
    # Per-class slack below conf_thres used to "rescue" the single best box of
    # a class that would otherwise have no detection. 0.0 disables the rescue.
    rescue_bonus = np.array([0.0, 0.0, 0.0], dtype=np.float32)

    iou_thres = 0.40          # per-class NMS and cluster-boost IoU
    cross_iou_thres = 0.70    # IoU above which two boxes count as duplicates
    containment_thres = 1.00  # 1.0 -> only fully contained boxes are dropped

    min_box_area = 100.0
    min_side = 8.0
    max_aspect_ratio = 10.0
    max_det = 300
    use_flip_tta = False

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights_file`` from *path_hf_repo* and run one warm-up pass.

        CUDA is requested first with CPU as fallback. A failing warm-up is
        reported on stdout but does not abort construction.
        """
        so = ort.SessionOptions()
        so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.sess = ort.InferenceSession(
            str(Path(path_hf_repo) / self.weights_file),
            providers=[
                ("CUDAExecutionProvider", {"device_id": 0}),
                "CPUExecutionProvider",
            ],
            sess_options=so,
        )
        model_input = self.sess.get_inputs()[0]
        self.inp = model_input.name
        # Feed the tensor in whatever float width the exported graph expects.
        self.np_dtype = np.float16 if "float16" in model_input.type else np.float32
        active = self.sess.get_providers()[0]
        print(f"✅ v14 ONNX beverage model loaded (provider={active}, dtype={self.np_dtype.__name__})")

        out0 = self.sess.get_outputs()[0]
        print(f"ONNX output: name={out0.name} shape={out0.shape}")

        # Warm-up: run one blank frame so the first real call does not pay
        # one-off initialisation costs. Best effort only.
        try:
            dummy = np.zeros((self.input_size, self.input_size, 3), dtype=np.uint8)
            self._infer(dummy)
            print(f"✅ v14 ONNX warmup pass completed (provider={active})")
        except Exception as e:
            print(f"⚠️ v14 ONNX warmup pass failed (not fatal): {e}")

    def __repr__(self) -> str:
        return f"BeverageONNXv14(in={self.input_size}, cls={self.num_classes})"

    # ------------------------------------------------------------------ #
    # Inference
    # ------------------------------------------------------------------ #
    def _letterbox(self, im: ndarray) -> tuple[ndarray, float]:
        """Resize *im* to fit ``input_size`` and pad with grey (114).

        The resized image is anchored at the top-left corner of the canvas, so
        mapping a model coordinate back to the source image is a plain
        division by the returned scale (no offset).

        Returns:
            ``(canvas, scale)`` where ``canvas`` is ``input_size`` square.
        """
        h0, w0 = im.shape[:2]
        s = min(self.input_size / h0, self.input_size / w0)
        # Clamp to >= 1 px: on extreme aspect ratios the short side can round
        # to 0, which cv2.resize rejects.
        nh = max(1, int(round(h0 * s)))
        nw = max(1, int(round(w0 * s)))
        interp = cv2.INTER_CUBIC if s > 1.0 else cv2.INTER_LINEAR
        resized = cv2.resize(im, (nw, nh), interpolation=interp)
        canvas = np.full((self.input_size, self.input_size, 3), 114, np.uint8)
        canvas[:nh, :nw] = resized
        return canvas, s

    def _infer(self, im_bgr: ndarray) -> ndarray:
        """Run the model on one image.

        Returns:
            ``(N, 4 + num_classes)`` float array: ``x1, y1, x2, y2`` in source
            image pixels followed by one score per class.
        """
        lb, s = self._letterbox(im_bgr)
        # Reverse the channel axis, HWC -> CHW, add batch dim, scale to [0, 1].
        x = (lb[:, :, ::-1].transpose(2, 0, 1)[None].astype(np.float32) / 255.0
             ).astype(self.np_dtype)
        raw = self.sess.run(None, {self.inp: x})[0]
        return self._decode_output(np.asarray(raw, dtype=np.float32), s)

    def _decode_output(self, raw: ndarray, scale: float) -> ndarray:
        """Convert a raw model output into ``(N, 4 + num_classes)`` rows.

        Two layouts are handled:

        * NMS-baked ``[1, K, 6]`` rows of ``x1, y1, x2, y2, conf, cls``. Rows
          with ``conf <= 0`` are padding and are dropped; the confidence is
          written into a one-hot score vector.
        * Raw head ``[1, C, A]`` or ``[1, A, C]`` rows of
          ``cx, cy, w, h, score_0..``; converted to corner format.

        Coordinates are divided by *scale* to undo the letterbox resize.
        """
        if raw.ndim == 3 and raw.shape[-1] == 6:
            rows = raw[0]
            rows = rows[rows[:, 4] > 0]
            if len(rows) == 0:
                return np.zeros((0, 4 + self.num_classes), dtype=np.float32)
            boxes = rows[:, :4] / scale
            cls_ids = np.clip(rows[:, 5].astype(np.int32), 0, self.num_classes - 1)
            scores = np.zeros((len(rows), self.num_classes), dtype=np.float32)
            scores[np.arange(len(rows)), cls_ids] = rows[:, 4]
            return np.concatenate([boxes, scores], axis=1)

        preds = raw[0]
        # Channel-first exports have far fewer rows than columns; flip them so
        # that each row is one candidate.
        if preds.shape[0] < preds.shape[1]:
            preds = preds.T
        centers = preds[:, :2]
        sizes = preds[:, 2:4]
        scores = preds[:, 4:4 + self.num_classes]
        x1y1 = (centers - sizes / 2) / scale
        x2y2 = (centers + sizes / 2) / scale
        return np.concatenate([x1y1, x2y2, scores], axis=1)

    # ------------------------------------------------------------------ #
    # Box utilities
    # ------------------------------------------------------------------ #
    @staticmethod
    def _box_areas(boxes: np.ndarray) -> np.ndarray:
        """Return the area of each ``x1, y1, x2, y2`` row (0 for inverted boxes)."""
        return (np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
                * np.maximum(0.0, boxes[:, 3] - boxes[:, 1]))

    @staticmethod
    def _iou_one_to_many(box: np.ndarray, boxes: np.ndarray,
                         box_area: float, areas: np.ndarray) -> np.ndarray:
        """IoU of one *box* against every row of *boxes* (areas precomputed)."""
        iw = np.maximum(0.0, np.minimum(box[2], boxes[:, 2]) - np.maximum(box[0], boxes[:, 0]))
        ih = np.maximum(0.0, np.minimum(box[3], boxes[:, 3]) - np.maximum(box[1], boxes[:, 1]))
        inter = iw * ih
        # The epsilon keeps the division defined for zero-area pairs.
        return inter / (box_area + areas - inter + 1e-7)

    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
        """Greedy NMS; returns indices into *boxes*, highest score first.

        A box is suppressed when its IoU with an already kept box is strictly
        greater than *iou_thresh*.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        areas = Miner._box_areas(boxes)
        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order):
            i = int(order[0])
            keep.append(i)
            rest = order[1:]
            if len(rest) == 0:
                break
            iou = Miner._iou_one_to_many(boxes[i], boxes[rest], areas[i], areas[rest])
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)

    def _sane_filter(self, boxes: np.ndarray, scores: np.ndarray, cls: np.ndarray,
                     orig_h: int, orig_w: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop geometrically implausible boxes.

        A box survives when both sides are at least ``min_side``, its area is
        within ``[min_box_area, 0.95 * frame area]`` and its long/short side
        ratio is at most ``max_aspect_ratio``.
        """
        if len(boxes) == 0:
            return boxes, scores, cls
        bw = np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
        bh = np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        area = bw * bh
        # Degenerate boxes get an infinite ratio so they always fail the test.
        ratio = np.where(
            (bw > 0) & (bh > 0),
            np.maximum(bw / np.maximum(bh, 1e-6), bh / np.maximum(bw, 1e-6)),
            np.inf,
        )
        keep = (
            (bw >= self.min_side) & (bh >= self.min_side)
            & (area >= self.min_box_area)
            & (area <= 0.95 * orig_h * orig_w)
            & (ratio <= self.max_aspect_ratio)
        )
        return boxes[keep], scores[keep], cls[keep]

    def _conf_filter_with_rescue(self, scores: np.ndarray, cls: np.ndarray) -> np.ndarray:
        """Return a boolean keep-mask from the per-class thresholds.

        If a class has ``rescue_bonus > 0`` and none of its detections pass,
        its single best detection is kept provided it is within the bonus of
        the threshold.
        """
        if len(scores) == 0:
            return np.zeros(0, dtype=bool)
        keep = scores >= self.conf_thres[cls]
        for c in np.unique(cls):
            bonus = float(self.rescue_bonus[c])
            if bonus <= 0.0:
                continue
            members = np.where(cls == c)[0]
            if keep[members].any():
                continue
            best = int(members[int(np.argmax(scores[members]))])
            if scores[best] >= self.conf_thres[c] - bonus:
                keep[best] = True
        return keep

    def _cross_class_dedup(self, boxes: np.ndarray, scores: np.ndarray, cls: np.ndarray,
                           ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop duplicate boxes (one object that received two class labels).

        Boxes are visited by larger margin over their class threshold first,
        then larger area; every box overlapping a kept one with IoU above
        ``cross_iou_thres`` is discarded. The class id is not consulted, so
        this relies on per-class NMS having already removed same-class
        overlaps.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls
        areas = self._box_areas(boxes)
        margins = scores - self.conf_thres[cls]
        # lexsort uses the LAST key as primary: margin first, area as tiebreak.
        order = np.lexsort((-areas, -margins))
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            dup = self._iou_one_to_many(boxes[i], boxes, areas[i], areas) > self.cross_iou_thres
            dup[i] = False
            suppressed |= dup
        idx = np.array(keep, dtype=np.intp)
        return boxes[idx], scores[idx], cls[idx]

    def _containment_dedup(self, boxes: np.ndarray, scores: np.ndarray, cls: np.ndarray,
                           ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop a box when it lies inside a dominating same-class box.

        A box is removed if at least ``containment_thres`` of its area is
        covered by a same-class box that is larger, or equally large with a
        higher score (bottle-inside-bottle / cup-inside-cup pattern).
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls
        area = self._box_areas(boxes)
        # (n, n) pairwise intersection: entry [i, j] is box i against box j.
        iw = np.maximum(0.0, np.minimum(boxes[:, 2:3], boxes[None, :, 2])
                        - np.maximum(boxes[:, 0:1], boxes[None, :, 0]))
        ih = np.maximum(0.0, np.minimum(boxes[:, 3:4], boxes[None, :, 3])
                        - np.maximum(boxes[:, 1:2], boxes[None, :, 1]))
        contained_frac = (iw * ih) / np.maximum(area[:, None], 1e-9)
        same_class = cls[:, None] == cls[None, :]
        bigger = area[None, :] > area[:, None]
        tiebreak = (area[None, :] == area[:, None]) & (scores[None, :] > scores[:, None])
        dominator = same_class & (bigger | tiebreak)
        np.fill_diagonal(dominator, False)
        suppressed = ((contained_frac >= self.containment_thres) & dominator).any(axis=1)
        keep = np.where(~suppressed)[0]
        return boxes[keep], scores[keep], cls[keep]

    def _cluster_boost(self, kept_boxes: np.ndarray, kept_cls: np.ndarray,
                       all_boxes: np.ndarray, all_scores: np.ndarray, all_cls: np.ndarray,
                       ) -> np.ndarray:
        """Aggregate confidence over each kept box's cluster.

        For every kept box, return the maximum score among the same-class
        boxes in the ``all_*`` pool whose IoU with it is at least
        ``iou_thres`` (the box itself included when it is in the pool), or
        0.0 when the cluster is empty.
        """
        n = len(kept_boxes)
        if n == 0:
            return np.empty(0, dtype=np.float32)
        pool_areas = self._box_areas(all_boxes)
        kept_areas = self._box_areas(kept_boxes)
        out = np.empty(n, dtype=np.float32)
        for i, (box, box_area, c) in enumerate(zip(kept_boxes, kept_areas, kept_cls)):
            iou = self._iou_one_to_many(box, all_boxes, box_area, pool_areas)
            cluster = (iou >= self.iou_thres) & (all_cls == c)
            out[i] = float(np.max(all_scores[cluster])) if np.any(cluster) else 0.0
        return out

    # ------------------------------------------------------------------ #
    # Detection
    # ------------------------------------------------------------------ #
    def _postprocess(self, det: np.ndarray, orig_h: int, orig_w: int,
                     ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Turn decoded rows into final detections.

        Args:
            det: ``(N, 4 + num_classes)`` rows as produced by ``_infer``.
            orig_h, orig_w: size of the source frame in pixels.

        Returns:
            ``(boxes, cls, conf)``: ``(M, 4)`` integer corners clipped to the
            frame, ``(M,)`` class ids and ``(M,)`` confidences in ``[0, 1]``.
        """
        empty = (np.zeros((0, 4), dtype=np.int64),
                 np.zeros(0, dtype=np.int32),
                 np.zeros(0, dtype=np.float32))

        class_scores = det[:, 4:]
        cls_all = class_scores.argmax(1).astype(np.int32)
        conf_all = class_scores.max(1)
        keep = self._conf_filter_with_rescue(conf_all, cls_all)
        boxes, scores, cls = det[:, :4][keep], conf_all[keep], cls_all[keep]
        if len(boxes) == 0:
            return empty

        boxes, scores, cls = self._sane_filter(boxes, scores, cls, orig_h, orig_w)
        if len(boxes) == 0:
            return empty

        # Pre-NMS pool used later for confidence aggregation. The names below
        # are only rebound, never mutated, so no copy is needed.
        pool_boxes, pool_scores, pool_cls = boxes, scores, cls

        # Per-class hard NMS; indices are re-sorted to keep the input order.
        keep_idx: list[int] = []
        for c in np.unique(cls):
            members = np.where(cls == c)[0]
            kept = self._hard_nms(boxes[members], scores[members], self.iou_thres)
            keep_idx.extend(members[kept].tolist())
        keep_idx.sort()
        ki = np.array(keep_idx, dtype=np.intp)
        boxes, scores, cls = boxes[ki], scores[ki], cls[ki]

        boxes, scores, cls = self._containment_dedup(boxes, scores, cls)
        boxes, scores, cls = self._cross_class_dedup(boxes, scores, cls)

        boosted = self._cluster_boost(boxes, cls, pool_boxes, pool_scores, pool_cls)

        if len(boxes) > self.max_det:
            top = np.argsort(-boosted)[: self.max_det]
            boxes, cls, boosted = boxes[top], cls[top], boosted[top]

        # Predictions can spill outside the frame; clip them so no coordinate
        # is negative or beyond the image. This is done last so the tuned
        # filters above still see the raw geometry.
        clipped = boxes.copy()
        clipped[:, 0::2] = np.clip(clipped[:, 0::2], 0, orig_w)
        clipped[:, 1::2] = np.clip(clipped[:, 1::2], 0, orig_h)
        int_boxes = clipped.astype(np.int64)  # truncation, same as int()
        valid = (int_boxes[:, 2] > int_boxes[:, 0]) & (int_boxes[:, 3] > int_boxes[:, 1])
        conf = np.clip(boosted, 0.0, 1.0)
        return int_boxes[valid], cls[valid], conf[valid]

    def _detect(self, im_bgr: ndarray) -> "list[BoundingBox]":
        """Detect beverages in one image and return them as ``BoundingBox`` items."""
        orig_h, orig_w = im_bgr.shape[:2]

        det = self._infer(im_bgr)
        if self.use_flip_tta:
            # Horizontal-flip TTA: mirror the x coordinates back and pool the
            # two detection sets before post-processing.
            fl = self._infer(im_bgr[:, ::-1])
            x1n = orig_w - fl[:, 2]
            x2n = orig_w - fl[:, 0]
            fl[:, 0], fl[:, 2] = x1n, x2n
            det = np.concatenate([det, fl], axis=0)

        boxes, cls, conf = self._postprocess(det, orig_h, orig_w)
        return [
            BoundingBox(
                x1=int(b[0]), y1=int(b[1]), x2=int(b[2]), y2=int(b[3]),
                cls_id=int(c), conf=float(s),
            )
            for b, c, s in zip(boxes, cls, conf)
        ]

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> "list[TVFrameResult]":
        """Run detection on every image of a batch.

        Args:
            batch_images: frames to process, in order.
            offset: frame id of the first image; image ``i`` gets
                ``offset + i``.
            n_keypoints: number of ``(0, 0)`` placeholder keypoints to attach
                to each result.

        Returns:
            One ``TVFrameResult`` per input image. A frame whose detection
            raises is reported on stdout and returned with no boxes, so one
            bad frame never fails the whole batch.
        """
        results: list[TVFrameResult] = []
        for i, img in enumerate(batch_images):
            frame_id = offset + i
            try:
                boxes = self._detect(np.ascontiguousarray(img))
            except Exception as e:
                print(f"⚠️ v14 frame {frame_id} detect error: {e}")
                boxes = []
            results.append(TVFrameResult(
                frame_id=frame_id, boxes=boxes,
                keypoints=[(0, 0)] * n_keypoints))
        return results
|
|