| |
| """SN44 beverage detection miner — v8 (yolo26s FP16 at 1280, hermestech-style pipeline). |
| |
| v8 (2026-05-04 ~22:30Z): two simultaneous changes from v7 (emu): |
| 1. WEIGHTS: yolo26s @ 1280, FP16 ONNX (~18.7 MB). Trained natively in |
| validator class order [cup, bottle, can] on merged_v8 (~38k images), |
| epoch 40 best (mAP50=0.840 / mAP50-95=0.694). Replaces v7's yolo26n |
| (~10.3 MB FP32). FP16 quantization: <0.001 mAP loss vs FP32 export. |
| 2. INFERENCE PIPELINE: ported from hermestech00/person-detect-0 (top-1 |
| beverage miner). Aggressive precision-over-recall: |
       - conf_threshold 0.55 → 0.60 (hermestech runs 0.75; we deploy 0.60 — see Miner.__init__)
| - iou_thresh 0.5 → 0.07 (very aggressive NMS) |
| - new max_aspect_ratio 5.0 (was 8.0) |
| - new max_box_area_ratio 0.85 (rejects frame-covering FPs) |
| - new min_w/min_h 6/6 (replaces min_side=8) |
| - TTA-consensus: all orig-view boxes accepted directly (conf_high=0.0); |
| flip view used only to BOOST scores at near-perfect IoU match |
| (tta_match_iou=0.99); flip-only boxes added if no orig overlap. |
| Offline mAP DROPS (~13% on this val set) but the manak0 manifest scores |
| 0.6×mAP50 + 0.4×false_positive — the precision boost is expected to |
| dominate the FP component. Empirical: hermestech with this exact pipeline |
| is rank-1 (0.67 mean) vs our emu's 0.46 mean (rank 5). |
| |
| OLD v7 (kept for context, see miner.py.v7_backup_*): |
| - alfred-aligned: conf=0.55, iou=0.5, TTA=union-then-NMS-then-boost |
| - yolo26n FP32 (~10.3 MB) |
| """ |
| import math |
| from pathlib import Path |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from numpy import ndarray |
| from pydantic import BaseModel |
|
|
|
|
class BoundingBox(BaseModel):
    """Axis-aligned detection box in original-image pixel coordinates.

    (x1, y1) is the top-left corner and (x2, y2) the bottom-right;
    Miner._to_boundingboxes guarantees x2 > x1 and y2 > y1 on emitted boxes.
    """

    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int  # index into Miner.class_names: 0=cup, 1=bottle, 2=can
    conf: float  # detection confidence, clamped to [0.0, 1.0] on emission
|
|
|
|
class TVFrameResult(BaseModel):
    """Per-frame prediction payload returned by Miner.predict_batch.

    keypoints carries (x, y) placeholders — this miner emits (0, 0) stubs,
    one per requested keypoint (it does no keypoint estimation).
    """

    frame_id: int  # absolute frame index: batch offset + position in batch
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]
|
|
|
|
class Miner:
    """yolo26s e2e ONNX miner for manak0/Detect-beverage-detect.

    Chute platform calls predict_batch(batch_images, offset, n_keypoints).

    Pipeline (v8, hermestech-style): letterbox to 1280x1280 -> FP16 e2e ONNX
    inference -> confidence filter -> per-class hard NMS at a very aggressive
    IoU (0.07) -> cross-class dedup -> optional horizontal-flip TTA consensus
    -> geometry filters (min size / aspect ratio / frame-coverage).
    """

    def __init__(self, path_hf_repo) -> None:
        # Directory containing weights.onnx (HF repo checkout on the chute).
        self.path_hf_repo = Path(path_hf_repo)

        # Validator class order. The v8 model was trained natively in this
        # order, so cls_remap is the identity permutation (kept as an indirection
        # point in case a future checkpoint needs remapping).
        self.class_names = ["cup", "bottle", "can"]
        self.cls_remap = np.arange(3, dtype=np.int32)

        # Best-effort: newer onnxruntime builds expose preload_dlls() to help
        # CUDA/cuDNN library resolution; absence or failure is non-fatal.
        try:
            ort.preload_dlls()
        except Exception:
            pass

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        # Prefer CUDA with CPU fallback inside one session; if CUDA provider
        # construction itself raises, rebuild a CPU-only session.
        try:
            self.session = ort.InferenceSession(
                str(self.path_hf_repo / "weights.onnx"),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
        except Exception:
            self.session = ort.InferenceSession(
                str(self.path_hf_repo / "weights.onnx"),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]

        # Fixed network input resolution; images are letterboxed to this size.
        self.input_h = 1280
        self.input_w = 1280

        # Detection thresholds (precision-over-recall tuning).
        # NOTE(review): the module docstring describes the hermestech value
        # conf_threshold=0.75, but 0.60 is what is deployed here — confirm
        # which is intended.
        self.conf_threshold = 0.60
        self.iou_thresh = 0.07        # very aggressive per-class NMS
        self.cross_iou_thresh = 0.7   # cross-class duplicate suppression
        self.max_det = 150            # cap on detections per image
        self.use_tta = True           # horizontal-flip TTA consensus
        # TTA consensus parameters. With conf_high=0.0 every orig-view box is
        # accepted directly (see NOTE in _infer_tta).
        self.conf_high = 0.0
        self.tta_match_iou = 0.99     # near-perfect IoU required to "match" across views

        # Geometry filters applied in _to_boundingboxes (false-positive reducers).
        self.min_box_area = 144
        self.min_w = 6
        self.min_h = 6
        self.max_aspect_ratio = 5.0   # reject extremely elongated boxes
        self.max_box_area_ratio = 0.85  # reject boxes covering most of the frame

        # Warmup: a few dummy inferences to trigger provider/kernel setup.
        # Failures abort warmup silently; real errors surface on first predict.
        warm = np.zeros((self.input_h, self.input_w, 3), dtype=np.uint8)
        for _ in range(5):
            try:
                self._infer_single(warm)
            except Exception:
                break

    def __repr__(self) -> str:
        # Summarizes the tunables that matter for debugging a deployment.
        return (
            f"BeverageMiner v8-hermestech input={self.input_h}x{self.input_w} "
            f"conf>={self.conf_threshold} iou={self.iou_thresh} "
            f"tta_match_iou={self.tta_match_iou} use_tta={self.use_tta} "
            f"providers={self.session.get_providers()}"
        )

    def _letterbox(self, image: ndarray) -> tuple[ndarray, float, tuple[float, float]]:
        """Aspect-preserving resize + 114-grey pad to (input_h, input_w).
        Cubic when upscaling (small-object fidelity), linear when downscaling.

        Returns (canvas, ratio, (dx, dy)) where ratio is the applied scale and
        (dx, dy) the top-left padding offset — both needed to map network
        coordinates back to the original image in _infer_single.
        """
        h, w = image.shape[:2]
        ratio = min(self.input_w / w, self.input_h / h)
        nw, nh = int(round(w * ratio)), int(round(h * ratio))
        if (nw, nh) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            resized = cv2.resize(image, (nw, nh), interpolation=interp)
        else:
            resized = image
        # 114 is the conventional YOLO letterbox grey.
        canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8)
        dy = (self.input_h - nh) // 2
        dx = (self.input_w - nw) // 2
        canvas[dy:dy + nh, dx:dx + nw] = resized
        return canvas, ratio, (float(dx), float(dy))

    def _preprocess(self, image_bgr: ndarray) -> tuple[ndarray, float, tuple[float, float]]:
        """Letterbox + BGR->RGB + [0,1] scale + NCHW, cast to FP16.

        The FP16 cast matches the v8 quantized weights; feeding this input to
        an FP32 export would fail (presumably intentional — the repo ships
        FP16 ONNX only).
        """
        canvas, ratio, pad = self._letterbox(image_bgr)
        rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
        x = (rgb.astype(np.float32) / 255.0).transpose(2, 0, 1)[None, ...]
        return np.ascontiguousarray(x, dtype=np.float16), ratio, pad

    @staticmethod
    def _hard_nms(boxes: ndarray, scores: ndarray, iou_thresh: float) -> ndarray:
        """Greedy hard NMS over xyxy boxes; returns kept indices (score-desc order)."""
        n = len(boxes)
        if n == 0:
            return np.array([], dtype=np.intp)
        # Process in descending score order; each kept box suppresses later
        # overlapping boxes above iou_thresh.
        order = np.argsort(scores)[::-1]
        keep: list[int] = []
        suppressed = np.zeros(n, dtype=bool)
        for i in range(n):
            idx = order[i]
            if suppressed[idx]:
                continue
            keep.append(int(idx))
            bi = boxes[idx]
            for k in range(i + 1, n):
                jdx = order[k]
                if suppressed[jdx]:
                    continue
                bj = boxes[jdx]
                xx1 = max(bi[0], bj[0]); yy1 = max(bi[1], bj[1])
                xx2 = min(bi[2], bj[2]); yy2 = min(bi[3], bj[3])
                inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1)
                ai = (bi[2] - bi[0]) * (bi[3] - bi[1])
                aj = (bj[2] - bj[0]) * (bj[3] - bj[1])
                iou = inter / (ai + aj - inter + 1e-7)  # epsilon guards 0-area pairs
                if iou > iou_thresh:
                    suppressed[jdx] = True
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(
        self, boxes: ndarray, scores: ndarray, cls_ids: ndarray, iou_thresh: float
    ) -> ndarray:
        """Run _hard_nms independently per class; returns kept indices sorted ascending."""
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep: list[int] = []
        for c in np.unique(cls_ids):
            mask = cls_ids == c
            indices = np.where(mask)[0]
            keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
            all_keep.extend(indices[keep].tolist())
        # Ascending index order (original detection order), not score order.
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    @staticmethod
    def _cross_class_dedup(
        boxes: ndarray, scores: ndarray, cls_ids: ndarray, iou_thresh: float
    ) -> tuple[ndarray, ndarray, ndarray]:
        """Suppress high-overlap duplicates across classes (FP reducer).
        Sort priority: larger boxes first, then higher score (alfred's heuristic).
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids
        areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        # lexsort: primary key is the LAST array -> sort by area desc, then score desc.
        order = np.lexsort((-scores, -areas))
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            # Vectorized IoU of the kept box against ALL boxes (class-agnostic).
            xx1 = np.maximum(bi[0], boxes[:, 0]); yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2]); yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False  # never suppress the box we just kept
            suppressed |= dup
        kept = np.array(keep, dtype=np.intp)
        return boxes[kept], scores[kept], cls_ids[kept]

    @staticmethod
    def _max_score_per_cluster(
        coords: ndarray, scores: ndarray, keep_idx: ndarray, iou_thresh: float
    ) -> ndarray:
        """For each kept box, return the max original score among all boxes
        that overlap it at IoU >= iou_thresh. Used post-TTA so consensus
        detections get pushed higher in the rank-ordered PR curve.

        NOTE(review): no call site in this file — appears to be a v7 leftover
        (the v8 TTA path boosts scores inline in _infer_tta instead).
        """
        if len(keep_idx) == 0:
            return np.array([], dtype=np.float32)
        out = np.empty(len(keep_idx), dtype=np.float32)
        for j, idx in enumerate(keep_idx):
            bi = coords[idx]
            xx1 = np.maximum(bi[0], coords[:, 0]); yy1 = np.maximum(bi[1], coords[:, 1])
            xx2 = np.minimum(bi[2], coords[:, 2]); yy2 = np.minimum(bi[3], coords[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = (bi[2] - bi[0]) * (bi[3] - bi[1])
            aj = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1])
            iou = inter / (ai + aj - inter + 1e-7)
            # The cluster always includes the box itself (IoU 1.0), so max is safe.
            out[j] = float(np.max(scores[iou >= iou_thresh]))
        return out

    def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
        """One forward pass on one BGR image; returns filtered BoundingBoxes.

        Assumes an end-to-end ONNX export whose rows are
        [x1, y1, x2, y2, conf, cls] in letterbox coordinates — TODO confirm
        against the export script.
        """
        inp, ratio, (dx, dy) = self._preprocess(image_bgr)
        out = self.session.run(self.output_names, {self.input_name: inp})[0]
        if out.ndim == 3:
            out = out[0]  # drop batch dim if present

        # Early confidence gate before any geometry work.
        confs = out[:, 4].astype(np.float32)
        keep = confs >= self.conf_threshold
        if not keep.any():
            return []
        out = out[keep]

        boxes = out[:, :4].astype(np.float32).copy()
        confs = out[:, 4].astype(np.float32)
        cls_ids = self.cls_remap[out[:, 5].astype(np.int32)]

        # Undo letterbox: remove padding offset, then rescale to original pixels.
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio

        orig_h, orig_w = image_bgr.shape[:2]
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, orig_w - 1)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, orig_h - 1)

        if len(boxes) > 1:
            # Per-class NMS at the aggressive 0.07 IoU, cap detections, then
            # class-agnostic dedup at 0.7 IoU.
            keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh)
            keep_idx = keep_idx[: self.max_det]
            boxes = boxes[keep_idx]
            confs = confs[keep_idx]
            cls_ids = cls_ids[keep_idx]
            boxes, confs, cls_ids = self._cross_class_dedup(
                boxes, confs, cls_ids, self.cross_iou_thresh
            )

        return self._to_boundingboxes(boxes, confs, cls_ids, orig_w, orig_h)

    def _infer_tta(self, image_bgr: ndarray) -> list[BoundingBox]:
        """Hermestech-style TTA consensus (port from hermestech00/person-detect-0):
        - all orig-view boxes accepted directly (conf_high=0.0)
        - flip-view ONLY used to boost orig scores at near-perfect IoU match
        - flip-only boxes added if no original-view overlap at tta_match_iou
        - final per-class NMS at iou_thresh (0.07) + geometry filters

        NOTE(review): every surviving score is >= conf_threshold (0.60), so
        with conf_high == 0.0 the first branch always fires and the score-boost
        `elif` below is unreachable; likewise the flip loop's `continue` never
        triggers. At tta_match_iou=0.99 flip boxes will rarely match, so most
        flip detections are simply appended — confirm this matches intent.
        """
        boxes_orig = self._infer_single(image_bgr)
        h, w = image_bgr.shape[:2]
        flipped = cv2.flip(image_bgr, 1)  # horizontal mirror
        boxes_flip_raw = self._infer_single(flipped)
        # Un-mirror flip-view boxes back into original-image coordinates.
        boxes_flip = [
            BoundingBox(x1=w - b.x2, y1=b.y1, x2=w - b.x1, y2=b.y2,
                        cls_id=b.cls_id, conf=b.conf)
            for b in boxes_flip_raw
        ]
        if not boxes_orig and not boxes_flip:
            return []

        coords_o = np.array([[b.x1, b.y1, b.x2, b.y2] for b in boxes_orig], dtype=np.float32) if boxes_orig else np.empty((0, 4), dtype=np.float32)
        scores_o = np.array([b.conf for b in boxes_orig], dtype=np.float32) if boxes_orig else np.empty((0,), dtype=np.float32)
        cls_o = np.array([b.cls_id for b in boxes_orig], dtype=np.int32) if boxes_orig else np.empty((0,), dtype=np.int32)
        coords_f = np.array([[b.x1, b.y1, b.x2, b.y2] for b in boxes_flip], dtype=np.float32) if boxes_flip else np.empty((0, 4), dtype=np.float32)
        scores_f = np.array([b.conf for b in boxes_flip], dtype=np.float32) if boxes_flip else np.empty((0,), dtype=np.float32)
        cls_f = np.array([b.cls_id for b in boxes_flip], dtype=np.int32) if boxes_flip else np.empty((0,), dtype=np.int32)

        # Accumulated consensus detections.
        acc_b: list[ndarray] = []
        acc_s: list[float] = []
        acc_c: list[int] = []

        # Pass 1: original-view boxes.
        for i in range(len(coords_o)):
            sc = float(scores_o[i])
            if sc >= self.conf_high:
                acc_b.append(coords_o[i]); acc_s.append(sc); acc_c.append(int(cls_o[i]))
            elif len(coords_f) > 0:
                # Low-confidence orig box: keep only with flip confirmation,
                # taking the better of the two scores.
                ious = self._box_iou_one_to_many(coords_o[i], coords_f)
                j = int(np.argmax(ious))
                if ious[j] >= self.tta_match_iou:
                    acc_b.append(coords_o[i])
                    acc_s.append(max(sc, float(scores_f[j])))
                    acc_c.append(int(cls_o[i]))

        # Pass 2: flip-view boxes with no orig-view counterpart.
        for i in range(len(coords_f)):
            sc = float(scores_f[i])
            if sc < self.conf_high:
                continue
            if len(coords_o) == 0:
                acc_b.append(coords_f[i]); acc_s.append(sc); acc_c.append(int(cls_f[i])); continue
            ious = self._box_iou_one_to_many(coords_f[i], coords_o)
            if np.max(ious) < self.tta_match_iou:
                acc_b.append(coords_f[i]); acc_s.append(sc); acc_c.append(int(cls_f[i]))

        if not acc_b:
            return []

        boxes = np.array(acc_b, dtype=np.float32)
        scores = np.array(acc_s, dtype=np.float32)
        cls_ids = np.array(acc_c, dtype=np.int32)

        # Final per-class NMS over the merged orig+flip set.
        keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thresh)
        if len(keep) == 0:
            return []
        keep = keep[: self.max_det]

        return self._to_boundingboxes(boxes[keep], scores[keep], cls_ids[keep], w, h)

    @staticmethod
    def _box_iou_one_to_many(box: ndarray, others: ndarray) -> ndarray:
        """IoU of one box [x1,y1,x2,y2] vs Nx4 array of others. Returns 1-D scores."""
        if len(others) == 0:
            return np.array([], dtype=np.float32)
        x1 = np.maximum(box[0], others[:, 0]); y1 = np.maximum(box[1], others[:, 1])
        x2 = np.minimum(box[2], others[:, 2]); y2 = np.minimum(box[3], others[:, 3])
        inter = np.maximum(0.0, x2 - x1) * np.maximum(0.0, y2 - y1)
        a = (box[2] - box[0]) * (box[3] - box[1])
        b = (others[:, 2] - others[:, 0]) * (others[:, 3] - others[:, 1])
        return inter / (a + b - inter + 1e-7)

    def _to_boundingboxes(
        self, boxes: ndarray, confs: ndarray, cls_ids: ndarray,
        orig_w: int, orig_h: int,
    ) -> list[BoundingBox]:
        """Convert float xyxy arrays to BoundingBox models, applying the
        geometry filters (degenerate / tiny / elongated / frame-covering)."""
        out: list[BoundingBox] = []
        for i in range(len(boxes)):
            x1, y1, x2, y2 = boxes[i]
            # floor/ceil expands to the enclosing integer box; clip to image.
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                continue  # degenerate after rounding
            bw, bh = ix2 - ix1, iy2 - iy1
            if bw * bh < self.min_box_area:
                continue
            if bw < self.min_w or bh < self.min_h:
                continue
            ar = max(bw / max(bh, 1), bh / max(bw, 1))
            if ar > self.max_aspect_ratio:
                continue
            # Reject boxes covering most of the frame (typical false positives).
            if (bw * bh) / max(1, orig_w * orig_h) > self.max_box_area_ratio:
                continue
            out.append(BoundingBox(
                x1=ix1, y1=iy1, x2=ix2, y2=iy2,
                cls_id=int(cls_ids[i]),
                conf=max(0.0, min(1.0, float(confs[i]))),
            ))
        return out

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Platform entry point: run inference on each frame in the batch.

        frame_id is offset + position; keypoints are (0, 0) stubs since this
        miner does no keypoint estimation.
        """
        infer = self._infer_tta if self.use_tta else self._infer_single
        results: list[TVFrameResult] = []
        for idx, image in enumerate(batch_images):
            boxes = infer(image)
            results.append(TVFrameResult(
                frame_id=offset + idx,
                boxes=boxes,
                keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
            ))
        return results
|
|