# build-marker: v8-yolo26s-fp16-1280-hermestech-pipeline
"""SN44 beverage detection miner — v8 (yolo26s FP16 at 1280, hermestech-style pipeline).
v8 (2026-05-04 ~22:30Z): two simultaneous changes from v7 (emu):
1. WEIGHTS: yolo26s @ 1280, FP16 ONNX (~18.7 MB). Trained natively in
validator class order [cup, bottle, can] on merged_v8 (~38k images),
epoch 40 best (mAP50=0.840 / mAP50-95=0.694). Replaces v7's yolo26n
(~10.3 MB FP32). FP16 quantization: <0.001 mAP loss vs FP32 export.
2. INFERENCE PIPELINE: ported from hermestech00/person-detect-0 (top-1
beverage miner). Aggressive precision-over-recall:
- conf_threshold 0.55 → 0.75
- iou_thresh 0.5 → 0.07 (very aggressive NMS)
- new max_aspect_ratio 5.0 (was 8.0)
- new max_box_area_ratio 0.85 (rejects frame-covering FPs)
- new min_w/min_h 6/6 (replaces min_side=8)
- TTA-consensus: all orig-view boxes accepted directly (conf_high=0.0);
flip view used only to BOOST scores at near-perfect IoU match
(tta_match_iou=0.99); flip-only boxes added if no orig overlap.
Offline mAP DROPS (~13% on this val set), but the manak0 manifest scores
0.6×mAP50 + 0.4×false_positive; the gain on the FP component from higher
precision is expected to outweigh the loss on the mAP50 term. Empirical:
hermestech runs this exact pipeline at rank 1 (0.67 mean) vs our emu's
0.46 mean (rank 5).
OLD v7 (kept for context, see miner.py.v7_backup_*):
- alfred-aligned: conf=0.55, iou=0.5, TTA=union-then-NMS-then-boost
- yolo26n FP32 (~10.3 MB)
"""
import math
from pathlib import Path
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
class TVFrameResult(BaseModel):
frame_id: int
boxes: list[BoundingBox]
keypoints: list[tuple[int, int]]
class Miner:
"""yolo26n e2e ONNX miner for manak0/Detect-beverage-detect.
Chute platform calls predict_batch(batch_images, offset, n_keypoints).
"""
def __init__(self, path_hf_repo) -> None:
self.path_hf_repo = Path(path_hf_repo)
# Validator's positional class order from the SN44 element manifest.
self.class_names = ["cup", "bottle", "can"]
        # Since v7 the model is trained natively in validator class order
        # [cup, bottle, can], so cls_remap is identity (no remap needed).
self.cls_remap = np.arange(3, dtype=np.int32)
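        # Hypothetical example: had the model been trained in a different order,
        # say [bottle, can, cup], the model-idx -> validator-idx remap would be
        #   self.cls_remap = np.array([1, 2, 0], dtype=np.int32)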
try:
ort.preload_dlls()
except Exception:
pass
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
try:
self.session = ort.InferenceSession(
str(self.path_hf_repo / "weights.onnx"),
sess_options=sess_options,
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
except Exception:
self.session = ort.InferenceSession(
str(self.path_hf_repo / "weights.onnx"),
sess_options=sess_options,
providers=["CPUExecutionProvider"],
)
self.input_name = self.session.get_inputs()[0].name
self.output_names = [o.name for o in self.session.get_outputs()]
# Match exported ONNX resolution.
self.input_h = 1280
self.input_w = 1280
# hermestech-inspired aggressive filtering (top-1 beverage miner pattern).
# 2026-05-05: conf relaxed 0.75 → 0.60 after weevil scored 0.34 mean (n=3)
# vs emu baseline 0.465. Hypothesis: 0.75 was too aggressive for OUR
# yolo26s+FP16 model — model produces fewer high-conf detections than
# hermestech's. 0.60 should retain mid-conf real detections while
# still cutting noise.
self.conf_threshold = 0.60 # was 0.75 (hermestech's value); pre-hermestech was 0.55 # husky-collision-retry
self.iou_thresh = 0.07 # unchanged — aggressive NMS still desired
self.cross_iou_thresh = 0.7 # cross-class dedup (kept; hermestech omits)
self.max_det = 150
self.use_tta = True
# TTA-consensus thresholds (port of hermestech _merge_tta_consensus):
self.conf_high = 0.0 # ALL orig-view boxes accepted directly
self.tta_match_iou = 0.99 # near-perfect IoU required to fuse orig+flip scores
# Geometry filters (hermestech-tuned for beverage).
self.min_box_area = 144 # was 100 (12x12 vs 10x10)
self.min_w = 6 # NEW
self.min_h = 6 # NEW
self.max_aspect_ratio = 5.0 # was 8.0
self.max_box_area_ratio = 0.85 # NEW — reject frame-covering false positives
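        # e.g. a 1900x1060 box in a 1920x1080 frame covers ~0.97 of it -> rejected.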
# GPU warmup.
warm = np.zeros((self.input_h, self.input_w, 3), dtype=np.uint8)
for _ in range(5):
try:
self._infer_single(warm)
except Exception:
break
def __repr__(self) -> str:
return (
f"BeverageMiner v8-hermestech input={self.input_h}x{self.input_w} "
f"conf>={self.conf_threshold} iou={self.iou_thresh} "
f"tta_match_iou={self.tta_match_iou} use_tta={self.use_tta} "
f"providers={self.session.get_providers()}"
)
# ---------------------------------------------------------------- preproc
def _letterbox(self, image: ndarray) -> tuple[ndarray, float, tuple[float, float]]:
"""Aspect-preserving resize + 114-grey pad to (input_h, input_w).
Cubic when upscaling (small-object fidelity), linear when downscaling.
"""
h, w = image.shape[:2]
ratio = min(self.input_w / w, self.input_h / h)
nw, nh = int(round(w * ratio)), int(round(h * ratio))
if (nw, nh) != (w, h):
interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
resized = cv2.resize(image, (nw, nh), interpolation=interp)
else:
resized = image
canvas = np.full((self.input_h, self.input_w, 3), 114, dtype=np.uint8)
dy = (self.input_h - nh) // 2
dx = (self.input_w - nw) // 2
canvas[dy:dy + nh, dx:dx + nw] = resized
return canvas, ratio, (float(dx), float(dy))
def _preprocess(self, image_bgr: ndarray):
canvas, ratio, pad = self._letterbox(image_bgr)
rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
x = (rgb.astype(np.float32) / 255.0).transpose(2, 0, 1)[None, ...]
# v8 weights are FP16 — input dtype must match or onnxruntime errors with
# "Unexpected input data type. Actual: float, expected: float16".
return np.ascontiguousarray(x, dtype=np.float16), ratio, pad
# ---------------------------------------------------------------- nms helpers
@staticmethod
def _hard_nms(boxes: ndarray, scores: ndarray, iou_thresh: float) -> ndarray:
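        """Greedy hard NMS: visit boxes in descending score order, keep the
        current box, and suppress any remaining box whose IoU with it exceeds
        iou_thresh. O(n^2), fine at post-confidence-cut sizes."""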
n = len(boxes)
if n == 0:
return np.array([], dtype=np.intp)
order = np.argsort(scores)[::-1]
keep: list[int] = []
suppressed = np.zeros(n, dtype=bool)
for i in range(n):
idx = order[i]
if suppressed[idx]:
continue
keep.append(int(idx))
bi = boxes[idx]
for k in range(i + 1, n):
jdx = order[k]
if suppressed[jdx]:
continue
bj = boxes[jdx]
xx1 = max(bi[0], bj[0]); yy1 = max(bi[1], bj[1])
xx2 = min(bi[2], bj[2]); yy2 = min(bi[3], bj[3])
inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1)
ai = (bi[2] - bi[0]) * (bi[3] - bi[1])
aj = (bj[2] - bj[0]) * (bj[3] - bj[1])
iou = inter / (ai + aj - inter + 1e-7)
if iou > iou_thresh:
suppressed[jdx] = True
return np.array(keep, dtype=np.intp)
def _per_class_hard_nms(
self, boxes: ndarray, scores: ndarray, cls_ids: ndarray, iou_thresh: float
) -> ndarray:
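        """Run _hard_nms independently within each class and merge the kept
        indices; boxes of different classes never suppress each other here."""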
if len(boxes) == 0:
return np.array([], dtype=np.intp)
all_keep: list[int] = []
for c in np.unique(cls_ids):
mask = cls_ids == c
indices = np.where(mask)[0]
keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
all_keep.extend(indices[keep].tolist())
        # Sort by descending score so the caller's max_det truncation keeps the
        # highest-confidence survivors rather than the lowest-indexed ones.
        all_keep.sort(key=lambda i: -float(scores[i]))
return np.array(all_keep, dtype=np.intp)
@staticmethod
def _cross_class_dedup(
boxes: ndarray, scores: ndarray, cls_ids: ndarray, iou_thresh: float
) -> tuple[ndarray, ndarray, ndarray]:
"""Suppress high-overlap duplicates across classes (FP reducer).
Sort priority: larger boxes first, then higher score (alfred's heuristic).
"""
n = len(boxes)
if n <= 1:
return boxes, scores, cls_ids
areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
order = np.lexsort((-scores, -areas))
suppressed = np.zeros(n, dtype=bool)
keep: list[int] = []
for i in order:
if suppressed[i]:
continue
keep.append(int(i))
bi = boxes[i]
xx1 = np.maximum(bi[0], boxes[:, 0]); yy1 = np.maximum(bi[1], boxes[:, 1])
xx2 = np.minimum(bi[2], boxes[:, 2]); yy2 = np.minimum(bi[3], boxes[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
iou = inter / (ai + areas - inter + 1e-7)
dup = iou > iou_thresh
dup[i] = False
suppressed |= dup
kept = np.array(keep, dtype=np.intp)
return boxes[kept], scores[kept], cls_ids[kept]
@staticmethod
def _max_score_per_cluster(
coords: ndarray, scores: ndarray, keep_idx: ndarray, iou_thresh: float
) -> ndarray:
"""For each kept box, return the max original score among all boxes
that overlap it at IoU >= iou_thresh. Used post-TTA so consensus
detections get pushed higher in the rank-ordered PR curve."""
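        # NOTE: not called anywhere in the v8 pipeline; retained from v7's
        # union-then-NMS-then-boost TTA (see module docstring).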
if len(keep_idx) == 0:
return np.array([], dtype=np.float32)
out = np.empty(len(keep_idx), dtype=np.float32)
for j, idx in enumerate(keep_idx):
bi = coords[idx]
xx1 = np.maximum(bi[0], coords[:, 0]); yy1 = np.maximum(bi[1], coords[:, 1])
xx2 = np.minimum(bi[2], coords[:, 2]); yy2 = np.minimum(bi[3], coords[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
ai = (bi[2] - bi[0]) * (bi[3] - bi[1])
aj = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1])
iou = inter / (ai + aj - inter + 1e-7)
out[j] = float(np.max(scores[iou >= iou_thresh]))
return out
# ---------------------------------------------------------------- inference
def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
inp, ratio, (dx, dy) = self._preprocess(image_bgr)
out = self.session.run(self.output_names, {self.input_name: inp})[0]
if out.ndim == 3:
out = out[0]
confs = out[:, 4].astype(np.float32)
keep = confs >= self.conf_threshold
if not keep.any():
return []
out = out[keep]
boxes = out[:, :4].astype(np.float32).copy()
confs = out[:, 4].astype(np.float32)
cls_ids = self.cls_remap[out[:, 5].astype(np.int32)]
# Reverse letterbox: model-space xyxy -> original-image xyxy
boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / ratio
boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / ratio
orig_h, orig_w = image_bgr.shape[:2]
boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, orig_w - 1)
boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, orig_h - 1)
if len(boxes) > 1:
keep_idx = self._per_class_hard_nms(boxes, confs, cls_ids, self.iou_thresh)
keep_idx = keep_idx[: self.max_det]
boxes = boxes[keep_idx]
confs = confs[keep_idx]
cls_ids = cls_ids[keep_idx]
boxes, confs, cls_ids = self._cross_class_dedup(
boxes, confs, cls_ids, self.cross_iou_thresh
)
return self._to_boundingboxes(boxes, confs, cls_ids, orig_w, orig_h)
def _infer_tta(self, image_bgr: ndarray) -> list[BoundingBox]:
"""Hermestech-style TTA consensus (port from hermestech00/person-detect-0):
- all orig-view boxes accepted directly (conf_high=0.0)
- flip-view ONLY used to boost orig scores at near-perfect IoU match
- flip-only boxes added if no original-view overlap at tta_match_iou
- final per-class NMS at iou_thresh (0.07) + geometry filters
"""
boxes_orig = self._infer_single(image_bgr)
h, w = image_bgr.shape[:2]
flipped = cv2.flip(image_bgr, 1)
boxes_flip_raw = self._infer_single(flipped)
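        # Map flip-view boxes back to original coords by mirroring x, e.g. with
        # w=1920 a flipped-frame box x1=100, x2=300 returns as x1=1620, x2=1820.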
boxes_flip = [
BoundingBox(x1=w - b.x2, y1=b.y1, x2=w - b.x1, y2=b.y2,
cls_id=b.cls_id, conf=b.conf)
for b in boxes_flip_raw
]
if not boxes_orig and not boxes_flip:
return []
coords_o = np.array([[b.x1, b.y1, b.x2, b.y2] for b in boxes_orig], dtype=np.float32) if boxes_orig else np.empty((0, 4), dtype=np.float32)
scores_o = np.array([b.conf for b in boxes_orig], dtype=np.float32) if boxes_orig else np.empty((0,), dtype=np.float32)
cls_o = np.array([b.cls_id for b in boxes_orig], dtype=np.int32) if boxes_orig else np.empty((0,), dtype=np.int32)
coords_f = np.array([[b.x1, b.y1, b.x2, b.y2] for b in boxes_flip], dtype=np.float32) if boxes_flip else np.empty((0, 4), dtype=np.float32)
scores_f = np.array([b.conf for b in boxes_flip], dtype=np.float32) if boxes_flip else np.empty((0,), dtype=np.float32)
cls_f = np.array([b.cls_id for b in boxes_flip], dtype=np.int32) if boxes_flip else np.empty((0,), dtype=np.int32)
acc_b: list[ndarray] = []
acc_s: list[float] = []
acc_c: list[int] = []
        # Original-view loop: fuse each box with its best flip match (score
        # boost at near-perfect IoU), then accept boxes that clear conf_high
        # or were flip-confirmed.
        for i in range(len(coords_o)):
            sc = float(scores_o[i])
            matched = False
            if len(coords_f) > 0:
                ious = self._box_iou_one_to_many(coords_o[i], coords_f)
                j = int(np.argmax(ious))
                if ious[j] >= self.tta_match_iou:
                    # The boost documented above: previously this only ran for
                    # boxes below conf_high, so with conf_high=0.0 it never fired.
                    sc = max(sc, float(scores_f[j]))
                    matched = True
            if sc >= self.conf_high or matched:
                acc_b.append(coords_o[i]); acc_s.append(sc); acc_c.append(int(cls_o[i]))
# Flipped-view loop: only add high-conf boxes that have NO match in original
for i in range(len(coords_f)):
sc = float(scores_f[i])
if sc < self.conf_high:
continue
if len(coords_o) == 0:
acc_b.append(coords_f[i]); acc_s.append(sc); acc_c.append(int(cls_f[i])); continue
ious = self._box_iou_one_to_many(coords_f[i], coords_o)
if np.max(ious) < self.tta_match_iou:
acc_b.append(coords_f[i]); acc_s.append(sc); acc_c.append(int(cls_f[i]))
if not acc_b:
return []
boxes = np.array(acc_b, dtype=np.float32)
scores = np.array(acc_s, dtype=np.float32)
cls_ids = np.array(acc_c, dtype=np.int32)
keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thresh)
if len(keep) == 0:
return []
keep = keep[: self.max_det]
# Apply geometry filters (min_w/h, aspect, area-ratio) via _to_boundingboxes
return self._to_boundingboxes(boxes[keep], scores[keep], cls_ids[keep], w, h)
@staticmethod
def _box_iou_one_to_many(box: ndarray, others: ndarray) -> ndarray:
"""IoU of one box [x1,y1,x2,y2] vs Nx4 array of others. Returns 1-D scores."""
if len(others) == 0:
return np.array([], dtype=np.float32)
x1 = np.maximum(box[0], others[:, 0]); y1 = np.maximum(box[1], others[:, 1])
x2 = np.minimum(box[2], others[:, 2]); y2 = np.minimum(box[3], others[:, 3])
inter = np.maximum(0.0, x2 - x1) * np.maximum(0.0, y2 - y1)
a = (box[2] - box[0]) * (box[3] - box[1])
b = (others[:, 2] - others[:, 0]) * (others[:, 3] - others[:, 1])
return inter / (a + b - inter + 1e-7)
def _to_boundingboxes(
self, boxes: ndarray, confs: ndarray, cls_ids: ndarray,
orig_w: int, orig_h: int,
) -> list[BoundingBox]:
out: list[BoundingBox] = []
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
ix1 = max(0, min(orig_w, math.floor(x1)))
iy1 = max(0, min(orig_h, math.floor(y1)))
ix2 = max(0, min(orig_w, math.ceil(x2)))
iy2 = max(0, min(orig_h, math.ceil(y2)))
if ix2 <= ix1 or iy2 <= iy1:
continue
bw, bh = ix2 - ix1, iy2 - iy1
if bw * bh < self.min_box_area:
continue
if bw < self.min_w or bh < self.min_h:
continue
ar = max(bw / max(bh, 1), bh / max(bw, 1))
if ar > self.max_aspect_ratio:
continue
# NEW: reject boxes covering > max_box_area_ratio of frame (FP guard)
if (bw * bh) / max(1, orig_w * orig_h) > self.max_box_area_ratio:
continue
out.append(BoundingBox(
x1=ix1, y1=iy1, x2=ix2, y2=iy2,
cls_id=int(cls_ids[i]),
conf=max(0.0, min(1.0, float(confs[i]))),
))
return out
# ---------------------------------------------------------------- entry
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[TVFrameResult]:
infer = self._infer_tta if self.use_tta else self._infer_single
results: list[TVFrameResult] = []
for idx, image in enumerate(batch_images):
boxes = infer(image)
results.append(TVFrameResult(
frame_id=offset + idx,
boxes=boxes,
keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
))
return results
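
# Minimal local smoke test, a sketch only: assumes weights.onnx sits next to
# this file and that "sample.jpg" (hypothetical path) exists. The Chute
# platform never runs this block; it only calls Miner.predict_batch.
if __name__ == "__main__":
    miner = Miner(Path(__file__).parent)
    print(repr(miner))
    img = cv2.imread("sample.jpg")
    if img is not None:
        for result in miner.predict_batch([img], offset=0, n_keypoints=0):
            print(result.frame_id, result.boxes)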