# build-marker: v8-yolo26s-fp16-1280-hermestech-pipeline
"""SN44 beverage detection miner — v8 (yolo26s FP16 at 1280, hermestech-style pipeline).

v8 (2026-05-04 ~22:30Z): two simultaneous changes from v7 (emu):

1. WEIGHTS: yolo26s @ 1280, FP16 ONNX (~18.7 MB). Trained natively in
   validator class order [cup, bottle, can] on merged_v8 (~38k images),
   epoch 40 best (mAP50=0.840 / mAP50-95=0.694). Replaces v7's yolo26n
   (~10.3 MB FP32). FP16 quantization: <0.001 mAP loss vs FP32 export.

2. INFERENCE PIPELINE: ported from hermestech00/person-detect-0 (top-1
   beverage miner). Aggressive precision-over-recall:
   - conf_threshold 0.55 → 0.75
   - iou_thresh 0.5 → 0.07 (very aggressive NMS)
   - new max_aspect_ratio 5.0 (was 8.0)
   - new max_box_area_ratio 0.85 (rejects frame-covering FPs)
   - new min_w/min_h 6/6 (replaces min_side=8)
   - TTA-consensus: all orig-view boxes accepted directly (conf_high=0.0);
     flip view used only to BOOST scores at near-perfect IoU match
     (tta_match_iou=0.99); flip-only boxes added if no orig overlap.

Offline mAP DROPS (~13% on this val set) but the manak0 manifest scores
0.6×mAP50 + 0.4×false_positive — the precision boost is expected to
dominate the FP component. Empirical: hermestech with this exact pipeline
is rank-1 (0.67 mean) vs our emu's 0.46 mean (rank 5).

Follow-up (2026-05-05): conf_threshold was relaxed from 0.75 to 0.60; the
rationale is recorded next to the setting in ``Miner.__init__``.

NOTE: with conf_high=0.0 every original-view box is accepted as-is, so the
score-boost branch of the TTA merge is never reached; the flip view only
contributes boxes that have no original-view match at tta_match_iou.

OLD v7 (kept for context, see miner.py.v7_backup_*):
- alfred-aligned: conf=0.55, iou=0.5, TTA=union-then-NMS-then-boost
- yolo26n FP32 (~10.3 MB)
"""

import math
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel


# One detection: integer xyxy corners in original-image pixels, the class
# index in validator order, and a confidence clamped to [0, 1].
class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


# Per-frame payload returned by Miner.predict_batch.
class TVFrameResult(BaseModel):
    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]


class Miner:
    """yolo26s FP16 e2e ONNX miner for manak0/Detect-beverage-detect.

    Chute platform calls predict_batch(batch_images, offset, n_keypoints).
    """

    def __init__(self, path_hf_repo) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and configure the pipeline.

        Tries the CUDA execution provider first and falls back to CPU-only
        when session creation fails. Finishes with a few best-effort warmup
        inferences on a blank frame.
        """
        self.path_hf_repo = Path(path_hf_repo)

        # Validator's positional class order from the SN44 element manifest.
        self.class_names = ["cup", "bottle", "can"]
        # Since v7 the model is trained natively in validator class order
        # [cup, bottle, can], so cls_remap is identity (no remap needed).
        self.cls_remap = np.arange(3, dtype=np.int32)

        # Best-effort: preload_dlls may be missing or may fail depending on
        # the installed onnxruntime build; the provider fallback below still
        # has to run in that case.
        try:
            ort.preload_dlls()
        except Exception:
            pass

        weights_path = str(self.path_hf_repo / "weights.onnx")
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        try:
            self.session = ort.InferenceSession(
                weights_path,
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
        except Exception:
            self.session = ort.InferenceSession(
                weights_path,
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        model_input = self.session.get_inputs()[0]
        self.input_name = model_input.name
        self.output_names = [o.name for o in self.session.get_outputs()]
        # Feed the dtype the graph declares. The v8 weights are FP16, and
        # onnxruntime rejects a float32 tensor with "Unexpected input data
        # type. Actual: float, expected: float16".
        self.input_dtype = np.float16 if "float16" in str(model_input.type) else np.float32

        # Match exported ONNX resolution.
        self.input_h = 1280
        self.input_w = 1280

        # hermestech-inspired aggressive filtering (top-1 beverage miner pattern).
        # 2026-05-05: conf relaxed 0.75 → 0.60 after weevil scored 0.34 mean (n=3)
        # vs emu baseline 0.465. Hypothesis: 0.75 was too aggressive for OUR
        # yolo26s+FP16 model — model produces fewer high-conf detections than
        # hermestech's. 0.60 should retain mid-conf real detections while
        # still cutting noise.
        self.conf_threshold = 0.60  # was 0.75 (hermestech's value); pre-hermestech was 0.55  # husky-collision-retry
        self.iou_thresh = 0.07  # unchanged — aggressive NMS still desired
        self.cross_iou_thresh = 0.7  # cross-class dedup (kept; hermestech omits)
        self.max_det = 150
        self.use_tta = True

        # TTA-consensus thresholds (port of hermestech _merge_tta_consensus):
        self.conf_high = 0.0  # ALL orig-view boxes accepted directly
        self.tta_match_iou = 0.99  # near-perfect IoU required to fuse orig+flip scores

        # Geometry filters (hermestech-tuned for beverage).
        self.min_box_area = 144  # was 100 (12x12 vs 10x10)
        self.min_w = 6  # NEW
        self.min_h = 6  # NEW
        self.max_aspect_ratio = 5.0  # was 8.0
        self.max_box_area_ratio = 0.85  # NEW — reject frame-covering false positives

        # GPU warmup. Best-effort: a failing warmup must not block startup.
        warm = np.zeros((self.input_h, self.input_w, 3), dtype=np.uint8)
        for _ in range(5):
            try:
                self._infer_single(warm)
            except Exception:
                break

    def __repr__(self) -> str:
        return (
            f"BeverageMiner v8-hermestech input={self.input_h}x{self.input_w} "
            f"conf>={self.conf_threshold} iou={self.iou_thresh} "
            f"tta_match_iou={self.tta_match_iou} use_tta={self.use_tta} "
            f"providers={self.session.get_providers()}"
        )

    # ---------------------------------------------------------------- preproc
    def _letterbox(self, image: ndarray) -> tuple[ndarray, float, tuple[float, float]]:
        """Aspect-preserving resize + 114-grey pad to (input_h, input_w).

        Cubic when upscaling (small-object fidelity), linear when downscaling.

        Returns ``(canvas, ratio, (dx, dy))`` where *ratio* is the resize
        factor and *dx*/*dy* are the left/top padding in canvas pixels.
        """
        h, w = image.shape[:2]
        ratio = min(self.input_w / w, self.input_h / h)
        # Clamp to [1, input size]: an extremely thin image would otherwise
        # round to a zero-pixel side, which cv2.resize rejects.
        nw = min(self.input_w, max(1, int(round(w * ratio))))
        nh = min(self.input_h, max(1, int(round(h * ratio))))
        if (nw, nh) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            resized = cv2.resize(image, (nw, nh), interpolation=interp)
        else:
            resized = image
        canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8)
        dy = (self.input_h - nh) // 2
        dx = (self.input_w - nw) // 2
        canvas[dy:dy + nh, dx:dx + nw] = resized
        return canvas, ratio, (float(dx), float(dy))

    def _preprocess(self, image_bgr: ndarray) -> tuple[ndarray, float, tuple[float, float]]:
        """Letterbox, convert BGR->RGB, scale to [0, 1] and lay out as NCHW.

        Returns ``(tensor, ratio, (dx, dy))``; the tensor uses the dtype the
        ONNX graph declares for its input.
        """
        canvas, ratio, pad = self._letterbox(image_bgr)
        rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
        x = (rgb.astype(np.float32) / 255.0).transpose(2, 0, 1)[None, ...]
        return np.ascontiguousarray(x, dtype=self.input_dtype), ratio, pad

    # ---------------------------------------------------------------- nms helpers
    @staticmethod
    def _box_iou_one_to_many(box: ndarray, others: ndarray) -> ndarray:
        """IoU of one box [x1,y1,x2,y2] vs Nx4 array of others. Returns 1-D scores."""
        if len(others) == 0:
            return np.array([], dtype=np.float32)
        x1 = np.maximum(box[0], others[:, 0])
        y1 = np.maximum(box[1], others[:, 1])
        x2 = np.minimum(box[2], others[:, 2])
        y2 = np.minimum(box[3], others[:, 3])
        inter = np.maximum(0.0, x2 - x1) * np.maximum(0.0, y2 - y1)
        a = (box[2] - box[0]) * (box[3] - box[1])
        b = (others[:, 2] - others[:, 0]) * (others[:, 3] - others[:, 1])
        return inter / (a + b - inter + 1e-7)

    @staticmethod
    def _hard_nms(boxes: ndarray, scores: ndarray, iou_thresh: float) -> ndarray:
        """Greedy class-agnostic NMS.

        Visits boxes in descending score order and suppresses every
        lower-ranked box whose IoU with a kept box exceeds *iou_thresh*.
        Returns the kept indices into *boxes*, highest score first.
        """
        n = len(boxes)
        if n == 0:
            return np.array([], dtype=np.intp)
        order = np.argsort(scores)[::-1]
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for rank, idx in enumerate(order):
            if suppressed[idx]:
                continue
            keep.append(int(idx))
            lower = order[rank + 1:]
            if len(lower) == 0:
                break
            # One vectorized IoU against all lower-ranked boxes; re-marking
            # an already suppressed box is harmless.
            ious = Miner._box_iou_one_to_many(boxes[idx], boxes[lower])
            suppressed[lower[ious > iou_thresh]] = True
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(
        self, boxes: ndarray, scores: ndarray, cls_ids: ndarray, iou_thresh: float
    ) -> ndarray:
        """Run ``_hard_nms`` separately per class id.

        Returns the surviving indices into the input arrays in ascending
        index order (NOT score order).
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep: list[int] = []
        for c in np.unique(cls_ids):
            mask = cls_ids == c
            indices = np.where(mask)[0]
            keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
            all_keep.extend(indices[keep].tolist())
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    @staticmethod
    def _cap_detections(keep_idx: ndarray, scores: ndarray, max_det: int) -> ndarray:
        """Limit *keep_idx* to the *max_det* highest-scoring entries.

        *keep_idx* indexes into *scores*. The relative order of the surviving
        indices is preserved, so the result equals the input whenever it
        already fits. A plain ``keep_idx[:max_det]`` would instead discard by
        position, because per-class NMS returns indices in index order.
        """
        if len(keep_idx) <= max_det:
            return keep_idx
        top = np.argsort(-scores[keep_idx], kind="stable")[:max_det]
        return keep_idx[np.sort(top)]

    @staticmethod
    def _cross_class_dedup(
        boxes: ndarray, scores: ndarray, cls_ids: ndarray, iou_thresh: float
    ) -> tuple[ndarray, ndarray, ndarray]:
        """Suppress high-overlap duplicates across classes (FP reducer).

        Sort priority: larger boxes first, then higher score (alfred's heuristic).
        Returns the filtered ``(boxes, scores, cls_ids)`` in visiting order.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids
        areas = (
            np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
            * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        )
        # lexsort uses the LAST key as primary: area descending, then score.
        order = np.lexsort((-scores, -areas))
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False  # never suppress the box being kept
            suppressed |= dup
        kept = np.array(keep, dtype=np.intp)
        return boxes[kept], scores[kept], cls_ids[kept]

    @staticmethod
    def _max_score_per_cluster(
        coords: ndarray, scores: ndarray, keep_idx: ndarray, iou_thresh: float
    ) -> ndarray:
        """For each kept box, return the max original score among all boxes
        that overlap it at IoU >= iou_thresh.

        Used post-TTA so consensus detections get pushed higher in the
        rank-ordered PR curve. Not called by the current v8 pipeline.
        """
        if len(keep_idx) == 0:
            return np.array([], dtype=np.float32)
        out = np.empty(len(keep_idx), dtype=np.float32)
        for j, idx in enumerate(keep_idx):
            matched = Miner._box_iou_one_to_many(coords[idx], coords) >= iou_thresh
            # A zero-area box has IoU 0 even with itself; always count the
            # box's own score so np.max never sees an empty selection.
            matched[idx] = True
            out[j] = float(np.max(scores[matched]))
        return out

    # ---------------------------------------------------------------- inference
    def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
        """Detect on one BGR frame (no TTA).

        Reads the first session output as rows of
        ``[x1, y1, x2, y2, conf, cls]`` in letterboxed model space, applies
        the confidence threshold, maps boxes back to the original image,
        then (for more than one box) per-class NMS, the max_det cap and
        cross-class dedup, and finally the geometry filters.
        """
        inp, ratio, (dx, dy) = self._preprocess(image_bgr)
        out = self.session.run(self.output_names, {self.input_name: inp})[0]
        if out.ndim == 3:
            out = out[0]

        confs = out[:, 4].astype(np.float32)
        keep = confs >= self.conf_threshold
        if not keep.any():
            return []
        out = out[keep]

        # Drop rows whose class id falls outside cls_remap: indexing would
        # raise for ids that are too large and silently wrap for negatives.
        raw_cls = out[:, 5].astype(np.int32)
        valid = (raw_cls >= 0) & (raw_cls < len(self.cls_remap))
        if not valid.all():
            out = out[valid]
            raw_cls = raw_cls[valid]
            if len(out) == 0:
                return []

        boxes = out[:, :4].astype(np.float32)
        confs = out[:, 4].astype(np.float32)
        cls_ids = self.cls_remap[raw_cls]

        # Reverse letterbox: model-space xyxy -> original-image xyxy
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio
        orig_h, orig_w = image_bgr.shape[:2]
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, orig_w - 1)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, orig_h - 1)

        if len(boxes) > 1:
            keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh)
            keep_idx = self._cap_detections(keep_idx, confs, self.max_det)
            boxes = boxes[keep_idx]
            confs = confs[keep_idx]
            cls_ids = cls_ids[keep_idx]
            boxes, confs, cls_ids = self._cross_class_dedup(
                boxes, confs, cls_ids, self.cross_iou_thresh
            )

        return self._to_boundingboxes(boxes, confs, cls_ids, orig_w, orig_h)

    @staticmethod
    def _boxes_to_arrays(dets: list[BoundingBox]) -> tuple[ndarray, ndarray, ndarray]:
        """Split detections into ``(coords Nx4 f32, scores N f32, cls N i32)``."""
        if not dets:
            return (
                np.empty((0, 4), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.int32),
            )
        coords = np.array([[b.x1, b.y1, b.x2, b.y2] for b in dets], dtype=np.float32)
        scores = np.array([b.conf for b in dets], dtype=np.float32)
        cls_ids = np.array([b.cls_id for b in dets], dtype=np.int32)
        return coords, scores, cls_ids

    def _infer_tta(self, image_bgr: ndarray) -> list[BoundingBox]:
        """Hermestech-style TTA consensus (port from hermestech00/person-detect-0).

        Runs ``_infer_single`` on the frame and on its horizontal flip, maps
        the flipped boxes back, then merges:

        - original-view boxes with conf >= conf_high are accepted directly.
          With the default conf_high=0.0 that is every original-view box, so
          the branch below is only reached if conf_high is raised.
        - original-view boxes below conf_high are kept only when their
          best-overlapping flip box reaches tta_match_iou; the score becomes
          the max of the two.
        - flip-view boxes with conf >= conf_high are added when no
          original-view box overlaps them at tta_match_iou (the match
          ignores class ids).
        - final per-class NMS at iou_thresh, max_det cap and geometry filters.
        """
        boxes_orig = self._infer_single(image_bgr)
        h, w = image_bgr.shape[:2]
        flipped = cv2.flip(image_bgr, 1)
        boxes_flip_raw = self._infer_single(flipped)
        # Mirror x back into the un-flipped frame; x1/x2 swap roles.
        boxes_flip = [
            BoundingBox(
                x1=w - b.x2, y1=b.y1, x2=w - b.x1, y2=b.y2,
                cls_id=b.cls_id, conf=b.conf,
            )
            for b in boxes_flip_raw
        ]

        if not boxes_orig and not boxes_flip:
            return []

        coords_o, scores_o, cls_o = self._boxes_to_arrays(boxes_orig)
        coords_f, scores_f, cls_f = self._boxes_to_arrays(boxes_flip)

        acc_b: list[ndarray] = []
        acc_s: list[float] = []
        acc_c: list[int] = []

        # Original-view loop: accept all >= conf_high directly; below, require flip match
        for i in range(len(coords_o)):
            sc = float(scores_o[i])
            if sc >= self.conf_high:
                acc_b.append(coords_o[i])
                acc_s.append(sc)
                acc_c.append(int(cls_o[i]))
            elif len(coords_f) > 0:
                ious = self._box_iou_one_to_many(coords_o[i], coords_f)
                j = int(np.argmax(ious))
                if ious[j] >= self.tta_match_iou:
                    acc_b.append(coords_o[i])
                    acc_s.append(max(sc, float(scores_f[j])))
                    acc_c.append(int(cls_o[i]))

        # Flipped-view loop: only add high-conf boxes that have NO match in original
        for i in range(len(coords_f)):
            sc = float(scores_f[i])
            if sc < self.conf_high:
                continue
            if len(coords_o) > 0:
                ious = self._box_iou_one_to_many(coords_f[i], coords_o)
                if np.max(ious) >= self.tta_match_iou:
                    continue
            acc_b.append(coords_f[i])
            acc_s.append(sc)
            acc_c.append(int(cls_f[i]))

        if not acc_b:
            return []

        boxes = np.array(acc_b, dtype=np.float32)
        scores = np.array(acc_s, dtype=np.float32)
        cls_ids = np.array(acc_c, dtype=np.int32)

        keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thresh)
        if len(keep) == 0:
            return []
        keep = self._cap_detections(keep, scores, self.max_det)

        # Apply geometry filters (min_w/h, aspect, area-ratio) via _to_boundingboxes
        return self._to_boundingboxes(boxes[keep], scores[keep], cls_ids[keep], w, h)

    def _to_boundingboxes(
        self,
        boxes: ndarray,
        confs: ndarray,
        cls_ids: ndarray,
        orig_w: int,
        orig_h: int,
    ) -> list[BoundingBox]:
        """Convert float xyxy rows into integer ``BoundingBox`` objects.

        Corners are expanded outward (floor / ceil) and clamped to the frame.
        A box is dropped when it is non-finite, empty, below min_box_area,
        narrower/shorter than min_w/min_h, more elongated than
        max_aspect_ratio, or covers more than max_box_area_ratio of the frame.
        Confidence is clamped to [0, 1].
        """
        frame_area = max(1, orig_w * orig_h)
        out: list[BoundingBox] = []
        for box, conf, cls_id in zip(boxes, confs, cls_ids):
            # math.floor / math.ceil raise on NaN and inf; skip such boxes
            # instead of failing the whole frame.
            if not np.all(np.isfinite(box)):
                continue
            x1, y1, x2, y2 = box
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                continue
            bw, bh = ix2 - ix1, iy2 - iy1
            if bw * bh < self.min_box_area:
                continue
            if bw < self.min_w or bh < self.min_h:
                continue
            ar = max(bw / max(bh, 1), bh / max(bw, 1))
            if ar > self.max_aspect_ratio:
                continue
            # NEW: reject boxes covering > max_box_area_ratio of frame (FP guard)
            if (bw * bh) / frame_area > self.max_box_area_ratio:
                continue
            out.append(BoundingBox(
                x1=ix1, y1=iy1, x2=ix2, y2=iy2,
                cls_id=int(cls_id),
                conf=max(0.0, min(1.0, float(conf))),
            ))
        return out

    # ---------------------------------------------------------------- entry
    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Run detection on every frame of *batch_images*.

        Frame ``i`` is reported with ``frame_id = offset + i``. Each result
        carries ``n_keypoints`` placeholder ``(0, 0)`` keypoints. A missing or
        zero-size frame yields an empty box list instead of raising.
        """
        infer = self._infer_tta if self.use_tta else self._infer_single
        n_kp = max(0, int(n_keypoints))
        results: list[TVFrameResult] = []
        for idx, image in enumerate(batch_images):
            if image is None or image.size == 0:
                boxes: list[BoundingBox] = []
            else:
                boxes = infer(image)
            results.append(TVFrameResult(
                frame_id=offset + idx,
                boxes=boxes,
                keypoints=[(0, 0)] * n_kp,
            ))
        return results