ScoreVision / miner.py
baxtos's picture
scorevision: push artifact
0859aca verified
# build-rev: 2026-05-28-v14 (yolo11s trained on validator-IDENTICAL SAM3-GT)
"""Open-source Detect-beverage miner v14 (yolo11s trained on SAM3 validator-GT).
Trained on 329 frames labelled by the SAME SAM3 endpoint the validator uses to
build pseudo-GT (prompts cup/bottle/can, thr 0.5, mosaic 0) — i.e. the actual
scoring target, not peer miners' boxes. NMS-baked ONNX, output [1,300,6].
On 50 SAM3-GT holdout (validator-target): mAP50=0.715 (navierstocks 0.673,
v12 0.645); best composite UI 63.47% (nav 62.91%, v12 61.97%). Beats peers on
detection; parity-plus on composite.
Post-proc:
- detect NMS-baked output and unpack to (N, 4+num_classes) one-hot scores
- per-class conf filter `[0.70, 0.50, 0.50]` (best v14 sweep on SAM3-GT)
- sane-box geometric filter (min_box_area=100, max_aspect_ratio=10)
- per-class hard NMS @ iou=0.4 (redundant after baked NMS but safe)
- cross-class dedup @ iou=0.7 (helps bottle↔can misclassification FP)
- TTA off
Contract: class `Miner` at HF root, `predict_batch(...) -> list[TVFrameResult]`.
"""
from __future__ import annotations
from pathlib import Path
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
    # One detection as emitted by Miner._detect.
    # Corners are integer pixel coordinates in the original (un-letterboxed)
    # frame: (x1, y1) is the top-left corner, (x2, y2) the bottom-right.
    x1: int
    y1: int
    x2: int
    y2: int
    # Class index; Miner declares three classes, commented as "cup, bottle, can".
    cls_id: int
    # Detection confidence; Miner._detect clamps it to [0.0, 1.0] before emitting.
    conf: float
class TVFrameResult(BaseModel):
    # Per-frame result returned by Miner.predict_batch.
    # Batch offset plus the frame's index within the batch.
    frame_id: int
    # Detections for this frame; empty when detection raised or found nothing.
    boxes: list[BoundingBox]
    # Miner.predict_batch fills this with n_keypoints (0, 0) placeholders.
    keypoints: list[tuple[int, int]]
class Miner:
    """ONNX beverage detector exposing the ``predict_batch`` miner contract.

    The pipeline for one frame is: letterbox to ``input_size`` -> ONNX
    inference -> decode to ``(N, 4 + num_classes)`` -> per-class confidence
    filter -> geometric sanity filter -> per-class NMS -> containment dedup ->
    cross-class dedup -> confidence boost -> cap at ``max_det`` -> clip to the
    frame and emit ``BoundingBox`` objects.

    All tunables are class attributes so they can be overridden per instance.
    """

    weights_file = "best.onnx"
    input_size = 1280
    num_classes = 3  # cup, bottle, can
    # per-class conf — best v14 sweep on SAM3-GT holdout (composite 63.47%):
    conf_thres = np.array([0.70, 0.50, 0.50], dtype=np.float32)
    # rescue bonus disabled
    rescue_bonus = np.array([0.0, 0.0, 0.0], dtype=np.float32)
    iou_thres = 0.40  # per-class NMS (redundant after baked-NMS but safe)
    cross_iou_thres = 0.70  # cross-class dedup
    # Containment dedup threshold. 1.00 is *nearly* off, not fully off: the
    # test is ``contain >= containment_thres``, so a box lying entirely inside
    # a larger same-class box is still dropped. Use a value > 1 to disable.
    containment_thres = 1.00
    min_box_area = 100.0
    min_side = 8.0
    max_aspect_ratio = 10.0
    max_det = 300  # match NMS-baked graph max_det
    use_flip_tta = False  # flip-TTA hurt UI on NMS-baked v12 (sweep -0.8 pp)

    def __init__(self, path_hf_repo: Path) -> None:
        """Create the ONNX session from ``path_hf_repo / weights_file``.

        CUDA is requested first with CPU as fallback. One warmup inference is
        run immediately; a warmup failure is reported but not raised.
        """
        so = ort.SessionOptions()
        so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.sess = ort.InferenceSession(
            str(Path(path_hf_repo) / self.weights_file),
            providers=[
                ("CUDAExecutionProvider", {"device_id": 0}),
                "CPUExecutionProvider",
            ],
            sess_options=so,
        )
        model_input = self.sess.get_inputs()[0]
        self.inp = model_input.name
        # The ORT type string is e.g. "tensor(float16)"; anything without
        # "float16" is fed as float32.
        self.np_dtype = np.float16 if "float16" in model_input.type else np.float32
        active = self.sess.get_providers()[0]
        print(f"✅ v14 ONNX beverage model loaded (provider={active}, dtype={self.np_dtype.__name__})")
        # Report the output format once
        out0 = self.sess.get_outputs()[0]
        print(f"ONNX output: name={out0.name} shape={out0.shape}")
        # Eager CUDA EP allocation: ORT lazily binds CUDA on first sess.run,
        # TEE cold-bind eats 30-300s otherwise.
        try:
            dummy = np.zeros((self.input_size, self.input_size, 3), dtype=np.uint8)
            self._infer(dummy)
            print(f"✅ v14 ONNX warmup pass completed (provider={active})")
        except Exception as e:
            print(f"⚠️ v14 ONNX warmup pass failed (not fatal): {e}")

    def __repr__(self) -> str:
        return f"BeverageONNXv14(in={self.input_size}, cls={self.num_classes})"

    # ---- preprocessing --------------------------------------------------
    def _letterbox(self, im: ndarray) -> tuple[ndarray, float]:
        """Resize ``im`` to fit ``input_size`` and pad with 114 (top-left aligned).

        Returns the padded square image and the scale factor that was applied,
        so letterbox coordinates divided by the scale give frame coordinates.

        Raises:
            ValueError: if the image has zero height or width.
        """
        h0, w0 = im.shape[:2]
        if h0 <= 0 or w0 <= 0:
            raise ValueError(f"cannot letterbox an empty image of shape {im.shape}")
        s = min(self.input_size / h0, self.input_size / w0)
        # Floor at 1 px: an extreme aspect ratio could otherwise round a side
        # to 0, which cv2.resize rejects.
        nh = max(1, int(round(h0 * s)))
        nw = max(1, int(round(w0 * s)))
        # INTER_CUBIC if upsampling, INTER_LINEAR if downsampling (peer trick)
        interp = cv2.INTER_CUBIC if s > 1.0 else cv2.INTER_LINEAR
        r = cv2.resize(im, (nw, nh), interpolation=interp)
        out = np.full((self.input_size, self.input_size, 3), 114, np.uint8)
        out[:nh, :nw] = r
        return out, s

    def _infer(self, im_bgr: ndarray) -> ndarray:
        """Run the model on one BGR frame.

        Returns an ``(N, 4 + num_classes)`` float32 array of
        ``x1, y1, x2, y2`` in frame coordinates followed by per-class scores.
        """
        lb, s = self._letterbox(im_bgr)
        # BGR -> RGB, HWC -> NCHW, scale to [0, 1], then cast to the model dtype.
        x = (lb[:, :, ::-1].transpose(2, 0, 1)[None].astype(np.float32) / 255.0
             ).astype(self.np_dtype)
        raw = self.sess.run(None, {self.inp: x})[0]
        return self._decode_output(np.asarray(raw, dtype=np.float32), s)

    def _decode_output(self, raw: ndarray, scale: float) -> ndarray:
        """Convert a raw model output into ``(N, 4 + num_classes)`` detections.

        Two layouts are handled:

        * NMS-baked ``[1, N, 6]`` rows of ``(x1, y1, x2, y2, conf, cls)``;
          rows with ``conf <= 0`` are padding and are dropped, and the single
          confidence is placed in a one-hot score vector.
        * Legacy raw YOLO ``[1, 4+nc, N]`` or ``[1, N, 4+nc]`` with
          centre-format ``xywh`` boxes.

        Box coordinates are divided by ``scale`` to map them from letterbox
        space back to the original frame.
        """
        # NMS-baked output: [1, N, 6] = (x1, y1, x2, y2, conf, cls)
        if raw.ndim == 3 and raw.shape[-1] == 6:
            arr = raw[0]
            arr = arr[arr[:, 4] > 0]  # drop zero-padding rows
            if len(arr) == 0:
                return np.zeros((0, 4 + self.num_classes), dtype=np.float32)
            boxes = arr[:, :4] / scale  # letterbox → orig coords
            confs = arr[:, 4]
            cls_ids = np.clip(arr[:, 5].astype(np.int32), 0, self.num_classes - 1)
            scores = np.zeros((len(arr), self.num_classes), dtype=np.float32)
            scores[np.arange(len(arr)), cls_ids] = confs
            return np.concatenate([boxes, scores], axis=1)
        # Legacy raw YOLO output: [1, 4+nc, N] or [1, N, 4+nc] (xywh-center)
        out = raw[0]
        # The shorter axis is taken to be the channel axis → (N, 4+nc).
        p = out.T if out.shape[0] < out.shape[1] else out
        boxes = p[:, :4].copy()
        scores = p[:, 4:4 + self.num_classes]
        xy = boxes[:, :2]
        wh = boxes[:, 2:4]
        x1y1 = (xy - wh / 2) / scale
        x2y2 = (xy + wh / 2) / scale
        return np.concatenate([x1y1, x2y2, scores], axis=1)

    # ---- post-processing primitives -------------------------------------
    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
        """Greedy NMS; returns indices (into ``boxes``) of the kept boxes.

        Boxes are visited by descending score; a box is dropped when its IoU
        with an already-kept box exceeds ``iou_thresh``.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order):
            i = int(order[0])
            keep.append(i)
            if len(order) == 1:
                break
            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            ar = (boxes[rest, 2] - boxes[rest, 0]) * (boxes[rest, 3] - boxes[rest, 1])
            iou = inter / (ai + ar - inter + 1e-7)
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)

    def _sane_filter(self, boxes: np.ndarray, scores: np.ndarray, cls: np.ndarray,
                     orig_h: int, orig_w: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop geometrically implausible boxes.

        A box is kept only if both sides are >= ``min_side``, its area is in
        ``[min_box_area, 0.95 * frame area]`` and its aspect ratio is
        <= ``max_aspect_ratio``.
        """
        if len(boxes) == 0:
            return boxes, scores, cls
        bw = np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
        bh = np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        area = bw * bh
        # Degenerate boxes get an infinite aspect ratio so they are rejected.
        ar = np.where(
            (bw > 0) & (bh > 0),
            np.maximum(bw / np.maximum(bh, 1e-6), bh / np.maximum(bw, 1e-6)),
            np.inf,
        )
        keep = (
            (bw >= self.min_side) & (bh >= self.min_side)
            & (area >= self.min_box_area)
            & (area <= 0.95 * orig_h * orig_w)
            & (ar <= self.max_aspect_ratio)
        )
        return boxes[keep], scores[keep], cls[keep]

    def _conf_filter_with_rescue(self, scores: np.ndarray, cls: np.ndarray) -> np.ndarray:
        """Return a boolean keep-mask applying the per-class confidence threshold.

        For a class with a positive ``rescue_bonus`` and no passing box, the
        top-scoring candidate is admitted if it is within the bonus of the
        threshold.
        """
        if len(scores) == 0:
            return np.zeros(0, dtype=bool)
        keep = scores >= self.conf_thres[cls]
        # per-class rescue: if class c has zero passes, admit top-1 candidate
        # whose conf >= conf_thres[c] - rescue_bonus[c]
        for c in np.unique(cls):
            b = float(self.rescue_bonus[c])
            if b <= 0.0:
                continue
            cm = cls == c
            if keep[cm].any():
                continue
            idx = np.where(cm)[0]
            top = int(idx[int(np.argmax(scores[idx]))])
            if scores[top] >= self.conf_thres[c] - b:
                keep[top] = True
        return keep

    def _cross_class_dedup(self, boxes: np.ndarray, scores: np.ndarray, cls: np.ndarray,
                           ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop duplicate boxes on the same object (one object, two labels).

        Boxes are visited by larger margin-over-threshold first, then larger
        area. Each kept box suppresses every other box whose IoU with it
        exceeds ``cross_iou_thres``; the class label is not consulted.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        margins = scores - self.conf_thres[cls]
        # lexsort uses the LAST key as primary: margin desc, then area desc.
        order = np.lexsort((-areas, -margins))
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + areas - inter + 1e-7)
            dup = iou > self.cross_iou_thres
            dup[i] = False  # a box never suppresses itself
            suppressed |= dup
        idx = np.array(keep, dtype=np.intp)
        return boxes[idx], scores[idx], cls[idx]

    def _containment_dedup(self, boxes: np.ndarray, scores: np.ndarray, cls: np.ndarray,
                           ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop a box if ≥ containment_thres of its area is inside a same-class
        box that is larger (or equal-size with higher conf). Catches the
        bottle-inside-bottle / cup-inside-cup pattern YOLO often produces."""
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls
        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        # Pairwise (n, n) intersection: row i against column j.
        iw = np.maximum(0.0, np.minimum(boxes[:, 2:3], boxes[None, :, 2])
                        - np.maximum(boxes[:, 0:1], boxes[None, :, 0]))
        ih = np.maximum(0.0, np.minimum(boxes[:, 3:4], boxes[None, :, 3])
                        - np.maximum(boxes[:, 1:2], boxes[None, :, 1]))
        inter = iw * ih
        contain = inter / np.maximum(area[:, None], 1e-9)  # frac of i contained in j
        same_class = cls[:, None] == cls[None, :]
        bigger = area[None, :] > area[:, None]
        tiebreak = (area[None, :] == area[:, None]) & (scores[None, :] > scores[:, None])
        dominator = same_class & (bigger | tiebreak)
        np.fill_diagonal(dominator, False)
        suppressed = ((contain >= self.containment_thres) & dominator).any(axis=1)
        keep = np.where(~suppressed)[0]
        return boxes[keep], scores[keep], cls[keep]

    def _cluster_boost(self, kept_boxes: np.ndarray, kept_cls: np.ndarray,
                       all_boxes: np.ndarray, all_scores: np.ndarray, all_cls: np.ndarray,
                       ) -> np.ndarray:
        """For each kept box, return max conf among same-class boxes overlapping
        with IoU≥iou_thres (incl. itself). TTA confidence aggregation."""
        n = len(kept_boxes)
        if n == 0:
            return np.empty(0, dtype=np.float32)
        all_areas = (np.maximum(0.0, all_boxes[:, 2] - all_boxes[:, 0])
                     * np.maximum(0.0, all_boxes[:, 3] - all_boxes[:, 1]))
        out = np.empty(n, dtype=np.float32)
        for i in range(n):
            bi = kept_boxes[i]
            xx1 = np.maximum(bi[0], all_boxes[:, 0])
            yy1 = np.maximum(bi[1], all_boxes[:, 1])
            xx2 = np.minimum(bi[2], all_boxes[:, 2])
            yy2 = np.minimum(bi[3], all_boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            ai = max(0.0, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (ai + all_areas - inter + 1e-7)
            cluster = (iou >= self.iou_thres) & (all_cls == kept_cls[i])
            out[i] = float(np.max(all_scores[cluster])) if np.any(cluster) else 0.0
        return out

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, orig_h: int, orig_w: int) -> np.ndarray:
        """Return a copy of ``boxes`` with x clamped to [0, orig_w] and y to [0, orig_h]."""
        clipped = boxes.copy()
        clipped[:, [0, 2]] = np.clip(clipped[:, [0, 2]], 0, orig_w)
        clipped[:, [1, 3]] = np.clip(clipped[:, [1, 3]], 0, orig_h)
        return clipped

    # ---- top-level detect with TTA --------------------------------------
    def _detect(self, im_bgr: ndarray) -> list[BoundingBox]:
        """Detect beverages in one BGR frame and return the final boxes.

        Coordinates are integer pixels in the original frame, clamped to the
        frame bounds; boxes that are empty after clamping are not emitted.
        """
        orig_h, orig_w = im_bgr.shape[:2]

        # 1. Inference + optional flip TTA
        det = self._infer(im_bgr)
        if self.use_flip_tta:
            # The flipped view has negative strides; hand cv2 a contiguous copy.
            fl = self._infer(np.ascontiguousarray(im_bgr[:, ::-1]))
            x1n = orig_w - fl[:, 2]
            x2n = orig_w - fl[:, 0]
            fl[:, 0], fl[:, 2] = x1n, x2n
            det = np.concatenate([det, fl], axis=0)

        # 2. Pick class + per-class conf filter + rescue
        boxes = det[:, :4]
        cls_all = det[:, 4:].argmax(1).astype(np.int32)
        conf_all = det[:, 4:].max(1)
        keep = self._conf_filter_with_rescue(conf_all, cls_all)
        boxes, scores, cls = boxes[keep], conf_all[keep], cls_all[keep]
        if len(boxes) == 0:
            return []

        # 3. Sane filter (geometric)
        boxes, scores, cls = self._sane_filter(boxes, scores, cls, orig_h, orig_w)
        if len(boxes) == 0:
            return []

        # Keep raw cluster for boost (before any dedup)
        raw_boxes, raw_scores, raw_cls = boxes.copy(), scores.copy(), cls.copy()

        # 4. Per-class hard NMS
        keep_idx: list[int] = []
        for c in np.unique(cls):
            m = cls == c
            mi = np.where(m)[0]
            k = self._hard_nms(boxes[m], scores[m], self.iou_thres)
            keep_idx.extend(mi[k].tolist())
        keep_idx.sort()
        ki = np.array(keep_idx, dtype=np.intp)
        boxes, scores, cls = boxes[ki], scores[ki], cls[ki]

        # 5. Containment dedup (drop a box mostly inside same-class bigger box)
        boxes, scores, cls = self._containment_dedup(boxes, scores, cls)

        # 6. Cross-class dedup (one object → one class only)
        boxes, scores, cls = self._cross_class_dedup(boxes, scores, cls)

        # 7. Cluster-boost confidence (TTA aggregation)
        if len(boxes):
            boosted = self._cluster_boost(boxes, cls, raw_boxes, raw_scores, raw_cls)
        else:
            boosted = scores

        # 8. Cap at max_det
        if len(boxes) > self.max_det:
            top = np.argsort(-boosted)[: self.max_det]
            boxes, cls, boosted = boxes[top], cls[top], boosted[top]

        # 9. Clamp to the frame. Done last so the filters above still see the
        #    model's unclipped geometry.
        boxes = self._clip_boxes(boxes, orig_h, orig_w)

        out: list[BoundingBox] = []
        for (x1, y1, x2, y2), c, s in zip(boxes, cls, boosted):
            bx1, by1, bx2, by2 = int(x1), int(y1), int(x2), int(y2)
            # Skip boxes that are empty once clamped and truncated to pixels.
            if bx2 <= bx1 or by2 <= by1:
                continue
            out.append(BoundingBox(
                x1=bx1, y1=by1, x2=bx2, y2=by2,
                cls_id=int(c), conf=float(min(1.0, max(0.0, s))),
            ))
        return out

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Detect on every image of the batch.

        Args:
            batch_images: frames to process (BGR arrays).
            offset: frame id of the first image; image ``i`` gets ``offset + i``.
            n_keypoints: number of ``(0, 0)`` placeholder keypoints per frame.

        Returns:
            One ``TVFrameResult`` per input image, in order. A frame whose
            detection raises yields an empty box list instead of failing the
            whole batch.
        """
        results: list[TVFrameResult] = []
        for i, img in enumerate(batch_images):
            try:
                boxes = self._detect(np.ascontiguousarray(img))
            except Exception as e:  # never crash the chute
                print(f"⚠️ v14 frame {offset + i} detect error: {e}")
                boxes = []
            results.append(TVFrameResult(
                frame_id=offset + i, boxes=boxes,
                keypoints=[(0, 0) for _ in range(n_keypoints)]))
        return results