Beverage0 / miner.py
coolroman's picture
scorevision: push artifact
20faeb4 verified
"""TurboVision beverage detection miner — score-beverage-v3.
YOLO11s @ 1280x1280, 3-class beverage detection (bottle/can/cup),
ONNX with end-to-end NMS baked in (output [1, 300, 6] = x1, y1, x2, y2, conf, cls).
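Inference pipeline (v3):
1) Primary forward pass on the full image.
2) Hflip TTA: forward on horizontally-flipped image, transform boxes back.
3) Sanity filter (min size, aspect ratio, max area), applied to each view's
boxes as they are decoded.
4) Consensus filter (default, use_consensus_tta=True): a primary-view box is
kept only when a same-class box from another view overlaps it; its score becomes
the max over the agreeing views so true-positives rank higher in the
validator's PR curve.
Merge fallback (use_consensus_tta=False): union of primary + flip outputs,
per-class hard-NMS, then each survivor takes the max score of its cluster.
5) Per-class hard-NMS on the surviving boxes.
6) Cross-class IoU dedup (suppresses same physical object getting two class labels).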
"""
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
    """A single detection: an axis-aligned box, its class index and confidence."""

    # Top-left corner, integer pixel coordinates.
    x1: int
    y1: int
    # Bottom-right corner, integer pixel coordinates.
    x2: int
    y2: int
    # Class index of the detected object.
    cls_id: int
    # Confidence score attached to the detection.
    conf: float
class TVFrameResult(BaseModel):
    """Everything reported for one frame: its id, detections and keypoints."""

    # Frame identifier (batch offset + position within the batch).
    frame_id: int
    # Detections found in the frame; may be empty.
    boxes: list[BoundingBox]
    # (x, y) integer keypoints.
    keypoints: list[tuple[int, int]]
class Miner:
def __init__(self, path_hf_repo: Path) -> None:
    """Load ``weights.onnx`` from *path_hf_repo* and configure inference.

    Builds the class-id remap, opens an ONNX Runtime session (CUDA preferred,
    CPU as fallback), reads the model's input metadata and sets the
    detection / TTA / sanity-filter thresholds.

    Args:
        path_hf_repo: Directory holding ``weights.onnx`` and, optionally,
            ``class_names.txt`` (one name per line; blank lines and lines
            starting with ``#`` are skipped).

    Raises:
        ValueError: If a trained class name is missing from the class list.
    """
    model_path = path_hf_repo / "weights.onnx"

    # OUTPUT order: the cls_id ints we emit must follow the validator
    # manifest's `objects` list exactly.
    # manifest: ["cup", "bottle", "can"] -> cup=0, bottle=1, can=2.
    names_file = model_path.with_name("class_names.txt")
    if not names_file.is_file():
        self.class_names = ["cup", "bottle", "can"]
    else:
        stripped_lines = (
            raw.strip()
            for raw in names_file.read_text(encoding="utf-8").splitlines()
        )
        self.class_names = [
            name for name in stripped_lines if name and not name.startswith("#")
        ]

    # Order the weights were TRAINED in (per v2_dataset/data.yaml on the
    # training box: 0=bottle, 1=can, 2=cup). cls_remap[raw model cls_id]
    # gives the index into class_names (= validator manifest cls_id).
    trained_order = ("bottle", "can", "cup")
    self.cls_remap = np.array(
        list(map(self.class_names.index, trained_order)), dtype=np.int32
    )

    print("ORT version:", ort.__version__)
    try:
        ort.preload_dlls()
        print("✅ onnxruntime.preload_dlls() success")
    except Exception as e:
        print(f"⚠️ preload_dlls failed: {e}")
    print("ORT available providers BEFORE session:", ort.get_available_providers())

    options = ort.SessionOptions()
    options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

    def open_session(providers):
        # Same model and options every time; only the provider list varies.
        return ort.InferenceSession(
            str(model_path), sess_options=options, providers=providers
        )

    try:
        self.session = open_session(["CUDAExecutionProvider", "CPUExecutionProvider"])
        print("✅ Created ORT session with preferred CUDA provider list")
    except Exception as e:
        print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}")
        self.session = open_session(["CPUExecutionProvider"])
    print("ORT session providers:", self.session.get_providers())

    # Input / output metadata of the loaded graph.
    model_input = self.session.get_inputs()[0]
    self.input_name = model_input.name
    self.output_names = [out.name for out in self.session.get_outputs()]
    self.input_shape = model_input.shape
    self.input_dtype = np.float32 if "float16" not in model_input.type else np.float16
    # Shape entries that are not positive ints fall back to 1280.
    self.input_height = self._safe_dim(self.input_shape[2], default=1280)
    self.input_width = self._safe_dim(self.input_shape[3], default=1280)

    # Tuned on local benchmark vs rival-proxy GT (5/2/2026):
    # V3 = consensus filter + hflip TTA. Multi-scale, tighter cross-class
    # dedup and strict consensus across multiple views were all tested and
    # either hurt or matched; V3 is the local optimum.
    self.conf_thres = 0.40
    self.iou_thres = 0.5
    self.cross_iou_thresh = 0.7
    self.max_det = 300
    self.use_tta = True

    # Consensus TTA — our edge. None of the top miners (5FBnd/5CiAr/5CtY4)
    # do this; they keep the all-view union and only boost cluster scores.
    self.use_consensus_tta = True
    self.consensus_iou = 0.5
    self.require_strict_consensus = False

    # Multi-scale was tested and abandoned: it loosened consensus and hurt
    # FP suppression more than it helped recall on rival-proxy GT.
    self.use_multi_scale_tta = False
    self.tta_scale = 0.85

    # Sanity filter — reject obviously bad boxes.
    self.min_box_area = 6 * 6
    self.min_side = 4
    self.max_aspect_ratio = 8.0
    self.max_box_area_ratio = 0.95

    print(f"✅ ONNX loaded: {model_path}")
    print(f"✅ providers: {self.session.get_providers()}")
    print(f"✅ input: name={self.input_name}, shape={self.input_shape}, dtype={self.input_dtype}")
    print(f"✅ classes: {self.class_names}")
    print(f"✅ config: conf={self.conf_thres}, iou={self.iou_thres}, "
          f"cross_iou={self.cross_iou_thresh}, TTA={self.use_tta}")
def __repr__(self) -> str:
return (
f"ONNXRuntime(session={type(self.session).__name__}, "
f"providers={self.session.get_providers()})"
)
@staticmethod
def _safe_dim(value, default: int) -> int:
return value if isinstance(value, int) and value > 0 else default
def _letterbox(
    self,
    image: ndarray,
    new_shape: tuple[int, int],
    color=(114, 114, 114),
) -> tuple[ndarray, float, tuple[float, float]]:
    """Resize *image* to fit inside *new_shape* keeping aspect ratio, then pad.

    Args:
        image: Array whose first two axes are (height, width).
        new_shape: Target size as ``(width, height)``.
        color: Constant value used for the border.

    Returns:
        ``(padded, ratio, (pad_left, pad_top))`` where ``ratio`` is the
        resize factor and the pad pair is the number of border pixels that
        were actually added on the left and on the top. Mapping a box back
        is ``(coord - pad) / ratio``.
    """
    h, w = image.shape[:2]
    new_w, new_h = new_shape
    ratio = min(new_w / w, new_h / h)

    # A very thin image could round to 0 pixels, which cv2.resize rejects.
    resized_w = max(1, int(round(w * ratio)))
    resized_h = max(1, int(round(h * ratio)))
    if (resized_w, resized_h) != (w, h):
        interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
        image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)

    dw = (new_w - resized_w) / 2.0
    dh = (new_h - resized_h) / 2.0
    # The -0.1 / +0.1 nudges split an odd remainder as (k, k + 1).
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    padded = cv2.copyMakeBorder(
        image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=color,
    )
    # Report the offset that was really applied. The fractional dw/dh can
    # differ from left/top by half a pixel, which would shift every decoded
    # box by that amount.
    return padded, ratio, (float(left), float(top))
def _preprocess(self, image: ndarray):
    """Turn a BGR frame into the network's input tensor.

    Returns:
        ``(tensor, ratio, pad, (orig_w, orig_h))``: the letterboxed image as
        a contiguous ``1 x C x H x W`` array of ``self.input_dtype`` scaled
        by 1/255, plus the letterbox ratio / pad and the source size needed
        to map boxes back.
    """
    src_h, src_w = image.shape[:2]
    target = (self.input_width, self.input_height)
    boxed, ratio, pad = self._letterbox(image, target)
    rgb = cv2.cvtColor(boxed, cv2.COLOR_BGR2RGB)
    scaled = rgb.astype(self.input_dtype) / 255.0
    # HWC -> CHW, then add the batch axis.
    chw = np.transpose(scaled, (2, 0, 1))
    tensor = np.ascontiguousarray(chw[None, ...])
    return tensor, ratio, pad, (src_w, src_h)
@staticmethod
def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
w, h = image_size
boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
return boxes
def _filter_sane_boxes(
self,
boxes: np.ndarray,
scores: np.ndarray,
cls_ids: np.ndarray,
orig_size: tuple[int, int],
):
if len(boxes) == 0:
return boxes, scores, cls_ids
orig_w, orig_h = orig_size
image_area = float(orig_w * orig_h)
keep = []
for i, box in enumerate(boxes):
x1, y1, x2, y2 = box.tolist()
bw = x2 - x1
bh = y2 - y1
if bw <= 0 or bh <= 0:
continue
if bw < self.min_side or bh < self.min_side:
continue
area = bw * bh
if area < self.min_box_area:
continue
if area > self.max_box_area_ratio * image_area:
continue
ar = max(bw / max(bh, 1e-6), bh / max(bw, 1e-6))
if ar > self.max_aspect_ratio:
continue
keep.append(i)
if not keep:
return (
np.empty((0, 4), dtype=np.float32),
np.empty((0,), dtype=np.float32),
np.empty((0,), dtype=np.int32),
)
k = np.array(keep, dtype=np.intp)
return boxes[k], scores[k], cls_ids[k]
@staticmethod
def _hard_nms(
boxes: np.ndarray,
scores: np.ndarray,
iou_thresh: float,
) -> np.ndarray:
N = len(boxes)
if N == 0:
return np.array([], dtype=np.intp)
boxes = np.asarray(boxes, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
order = np.argsort(scores)[::-1]
keep: list[int] = []
suppressed = np.zeros(N, dtype=bool)
for i in range(N):
idx = order[i]
if suppressed[idx]:
continue
keep.append(int(idx))
bi = boxes[idx]
for k in range(i + 1, N):
jdx = order[k]
if suppressed[jdx]:
continue
bj = boxes[jdx]
xx1 = max(bi[0], bj[0])
yy1 = max(bi[1], bj[1])
xx2 = min(bi[2], bj[2])
yy2 = min(bi[3], bj[3])
inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1)
area_i = (bi[2] - bi[0]) * (bi[3] - bi[1])
area_j = (bj[2] - bj[0]) * (bj[3] - bj[1])
iou = inter / (area_i + area_j - inter + 1e-7)
if iou > iou_thresh:
suppressed[jdx] = True
return np.array(keep, dtype=np.intp)
def _per_class_hard_nms(
self,
boxes: np.ndarray,
scores: np.ndarray,
cls_ids: np.ndarray,
iou_thresh: float,
) -> np.ndarray:
if len(boxes) == 0:
return np.array([], dtype=np.intp)
all_keep: list[int] = []
for c in np.unique(cls_ids):
mask = cls_ids == c
indices = np.where(mask)[0]
keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
all_keep.extend(indices[keep].tolist())
all_keep.sort()
return np.array(all_keep, dtype=np.intp)
@staticmethod
def _cross_class_dedup(
boxes: np.ndarray,
scores: np.ndarray,
cls_ids: np.ndarray,
iou_thresh: float,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
n = len(boxes)
if n <= 1:
return boxes, scores, cls_ids
boxes = np.asarray(boxes, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
cls_ids = np.asarray(cls_ids, dtype=np.int32)
areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(
0.0, boxes[:, 3] - boxes[:, 1]
)
# Keep larger boxes first, then higher score.
order = np.lexsort((-scores, -areas))
suppressed = np.zeros(n, dtype=bool)
keep: list[int] = []
for i in order:
if suppressed[i]:
continue
keep.append(int(i))
bi = boxes[i]
xx1 = np.maximum(bi[0], boxes[:, 0])
yy1 = np.maximum(bi[1], boxes[:, 1])
xx2 = np.minimum(bi[2], boxes[:, 2])
yy2 = np.minimum(bi[3], boxes[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_i = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
union = area_i + areas - inter + 1e-7
iou = inter / union
dup = iou > iou_thresh
dup[i] = False
suppressed |= dup
keep_idx = np.array(keep, dtype=np.intp)
return boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx]
@staticmethod
def _max_score_per_cluster(
coords: np.ndarray,
scores: np.ndarray,
keep_indices: np.ndarray,
iou_thresh: float,
) -> np.ndarray:
n_keep = len(keep_indices)
if n_keep == 0:
return np.array([], dtype=np.float32)
coords = np.asarray(coords, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
out = np.empty(n_keep, dtype=np.float32)
for i in range(n_keep):
idx = keep_indices[i]
bi = coords[idx]
xx1 = np.maximum(bi[0], coords[:, 0])
yy1 = np.maximum(bi[1], coords[:, 1])
xx2 = np.minimum(bi[2], coords[:, 2])
yy2 = np.minimum(bi[3], coords[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_i = (bi[2] - bi[0]) * (bi[3] - bi[1])
areas_j = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1])
iou = inter / (area_i + areas_j - inter + 1e-7)
in_cluster = iou >= iou_thresh
out[i] = float(np.max(scores[in_cluster]))
return out
def _decode_raw_dets(
self,
preds: np.ndarray,
ratio: float,
pad: tuple[float, float],
orig_size: tuple[int, int],
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Decode end2end NMS output and return (boxes, scores, cls_ids)
in original image coordinates, after conf-threshold + remap + letterbox-reverse + sanity."""
if preds.ndim == 3 and preds.shape[0] == 1:
preds = preds[0]
if preds.ndim != 2 or preds.shape[1] < 6:
raise ValueError(f"Unexpected ONNX output shape: {preds.shape}")
boxes = preds[:, :4].astype(np.float32)
scores = preds[:, 4].astype(np.float32)
cls_ids = preds[:, 5].astype(np.int32)
valid = (cls_ids >= 0) & (cls_ids < len(self.cls_remap))
boxes, scores, cls_ids = boxes[valid], scores[valid], cls_ids[valid]
cls_ids = self.cls_remap[cls_ids]
keep = scores >= self.conf_thres
boxes = boxes[keep]
scores = scores[keep]
cls_ids = cls_ids[keep]
if len(boxes) == 0:
return (
np.empty((0, 4), dtype=np.float32),
np.empty((0,), dtype=np.float32),
np.empty((0,), dtype=np.int32),
)
pad_w, pad_h = pad
orig_w, orig_h = orig_size
boxes[:, [0, 2]] -= pad_w
boxes[:, [1, 3]] -= pad_h
boxes /= ratio
boxes = self._clip_boxes(boxes, (orig_w, orig_h))
boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size)
return boxes, scores, cls_ids
def _forward(
self, image: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
x, ratio, pad, orig_size = self._preprocess(image)
out = self.session.run(self.output_names, {self.input_name: x})[0]
return self._decode_raw_dets(out, ratio, pad, orig_size)
def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
    """Detect on *image* with a single forward pass (no TTA)."""
    det_boxes, det_scores, det_classes = self._forward(image)
    if len(det_boxes) > 0:
        return self._build_results(det_boxes, det_scores, det_classes)
    return []
def _forward_scaled(self, image: np.ndarray, scale: float):
    """Forward pass on a rescaled copy of *image*, boxes mapped back.

    Args:
        image: Source frame; its first two axes are (height, width).
        scale: Resize factor applied to both sides. ``1.0`` skips the resize.

    Returns:
        ``(boxes, scores, cls_ids)`` with boxes in the coordinates of the
        original *image*, clipped to its bounds.

    Raises:
        ValueError: If *scale* is not positive.
    """
    if scale == 1.0:
        return self._forward(image)
    if scale <= 0:
        raise ValueError(f"scale must be positive, got {scale}")
    h, w = image.shape[:2]
    # Keep at least one pixel per side so tiny frames do not produce an
    # empty resize target.
    nw = max(1, int(round(w * scale)))
    nh = max(1, int(round(h * scale)))
    interp = cv2.INTER_CUBIC if scale > 1.0 else cv2.INTER_LINEAR
    scaled = cv2.resize(image, (nw, nh), interpolation=interp)
    b, s, c = self._forward(scaled)
    if len(b):
        # After rounding, the achieved factor differs slightly per axis from
        # the nominal scale; divide by what was really applied.
        fx, fy = nw / w, nh / h
        b = b / np.array([fx, fy, fx, fy], dtype=np.float32)
        b = self._clip_boxes(b, (w, h))
    return b, s, c
def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]:
    """Multi-view test-time augmentation with a consensus filter.

    Views:
        v1 = primary forward pass (1.0x).
        v2 = horizontal flip, boxes mirrored back.
        v3 = rescaled forward at ``tta_scale`` (only when
             ``use_multi_scale_tta`` is set).

    Consensus mode (``use_consensus_tta=True``):
        A v1 box is kept iff a same-class box from another view overlaps it
        at IoU >= ``consensus_iou``. One confirming view is enough, unless
        ``require_strict_consensus`` and ``use_multi_scale_tta`` are both
        set, in which case v2 AND v3 must confirm. The kept score is the max
        over v1 and the confirming views.

    Merge mode (``use_consensus_tta=False``):
        All views are concatenated, and after per-class hard-NMS each
        survivor takes the max score of its IoU cluster.

    Both modes then run: per-class hard-NMS, a cap at ``max_det`` that keeps
    the highest-scoring boxes, and the cross-class IoU dedup.

    Returns:
        The final detections, possibly empty.
    """
    ow = image.shape[1]

    # v1: primary view.
    b1, s1, c1 = self._forward(image)

    # v2: horizontal flip; mirror x-coordinates back into the original frame.
    b2, s2, c2 = self._forward(cv2.flip(image, 1))
    if len(b2):
        b2 = np.stack([ow - b2[:, 2], b2[:, 1], ow - b2[:, 0], b2[:, 3]], axis=1)

    # v3: optional rescaled view.
    if self.use_multi_scale_tta:
        b3, s3, c3 = self._forward_scaled(image, self.tta_scale)
    else:
        b3 = np.empty((0, 4), dtype=np.float32)
        s3 = np.empty((0,), dtype=np.float32)
        c3 = np.empty((0,), dtype=np.int32)

    if len(b1) == 0 and len(b2) == 0 and len(b3) == 0:
        return []

    def cap_to_max_det(keep, ranking):
        """Trim *keep* to ``max_det`` indices, dropping the lowest scores."""
        if len(keep) <= self.max_det:
            return keep
        # `keep` is sorted by index, so a plain slice would drop arbitrary
        # boxes; rank by score, then restore index order.
        best_first = np.argsort(-ranking[keep], kind="stable")[: self.max_det]
        return np.sort(keep[best_first])

    def best_iou_match(box, cls, vb, vc, vs):
        """Return (IoU, score) of the same-class box in a view that best overlaps *box*."""
        if len(vb) == 0:
            return 0.0, 0.0
        same_cls = vc == cls
        if not same_cls.any():
            return 0.0, 0.0
        inter_w = np.maximum(
            0.0, np.minimum(box[2], vb[:, 2]) - np.maximum(box[0], vb[:, 0])
        )
        inter_h = np.maximum(
            0.0, np.minimum(box[3], vb[:, 3]) - np.maximum(box[1], vb[:, 1])
        )
        inter = inter_w * inter_h
        a_i = (box[2] - box[0]) * (box[3] - box[1])
        a_j = (vb[:, 2] - vb[:, 0]) * (vb[:, 3] - vb[:, 1])
        ious = inter / (a_i + a_j - inter + 1e-7)
        ious = np.where(same_cls, ious, 0.0)
        best = int(ious.argmax())
        return float(ious[best]), float(vs[best])

    if self.use_consensus_tta:
        if len(b1) == 0:
            return []
        strict = self.require_strict_consensus and self.use_multi_scale_tta
        required = 2 if strict else 1
        other_views = ((b2, c2, s2), (b3, c3, s3))

        kept_boxes, kept_scores, kept_cls = [], [], []
        for box, score, cls in zip(b1, s1, c1):
            agreeing = []
            for vb, vc, vs in other_views:
                iou, partner_score = best_iou_match(box, cls, vb, vc, vs)
                if iou >= self.consensus_iou:
                    agreeing.append(partner_score)
            if len(agreeing) < required:
                continue
            kept_boxes.append(box)
            kept_cls.append(cls)
            # Score = max across the primary view and the confirming views.
            kept_scores.append(max([float(score)] + agreeing))
        if not kept_boxes:
            return []

        boxes = np.asarray(kept_boxes, dtype=np.float32)
        scores = np.asarray(kept_scores, dtype=np.float32)
        cls_ids = np.asarray(kept_cls, dtype=np.int32)
        keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thres)
        if len(keep) == 0:
            return []
        keep = cap_to_max_det(keep, scores)
        boxes, scores, cls_ids = self._cross_class_dedup(
            boxes[keep], scores[keep], cls_ids[keep], self.cross_iou_thresh
        )
        return self._build_results(boxes, scores, cls_ids)

    # Merge mode (fallback): union of every view that produced boxes.
    views = [(b1, s1, c1)]
    views += [v for v in ((b2, s2, c2), (b3, s3, c3)) if len(v[0])]
    boxes = np.concatenate([v[0] for v in views], axis=0)
    scores = np.concatenate([v[1] for v in views], axis=0)
    cls_ids = np.concatenate([v[2] for v in views], axis=0)

    keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thres)
    if len(keep) == 0:
        return []
    keep = cap_to_max_det(keep, scores)
    # NOTE(review): clusters here are class-agnostic, so a box can inherit the
    # score of an overlapping box of another class — confirm this is intended.
    boosted = self._max_score_per_cluster(boxes, scores, keep, self.iou_thres)
    boxes, scores, cls_ids = self._cross_class_dedup(
        boxes[keep], boosted, cls_ids[keep], self.cross_iou_thresh
    )
    if len(boxes) == 0:
        return []
    return self._build_results(boxes, scores, cls_ids)
def _build_results(
    self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray
) -> list[BoundingBox]:
    """Convert parallel detection arrays into ``BoundingBox`` objects.

    Coordinates are snapped outward to integers (x1/y1 floored, x2/y2
    ceiled). Rows whose width or height is not positive are skipped.
    """
    rows = (
        (coords.tolist(), score, label)
        for coords, score, label in zip(boxes, scores, cls_ids)
    )
    return [
        BoundingBox(
            x1=int(math.floor(left)),
            y1=int(math.floor(top)),
            x2=int(math.ceil(right)),
            y2=int(math.ceil(bottom)),
            cls_id=int(label),
            conf=float(score),
        )
        for (left, top, right, bottom), score, label in rows
        if not (right <= left or bottom <= top)
    ]
def predict_batch(
    self,
    batch_images: list[ndarray],
    offset: int,
    n_keypoints: int,
) -> list[TVFrameResult]:
    """Run detection on every image of a batch.

    Args:
        batch_images: Frames to process. Entries that are ``None``, not a
            NumPy array, or not 3-dimensional yield an empty result.
        offset: Frame id of the first image; later images count up from it.
        n_keypoints: Number of ``(0, 0)`` placeholder keypoints per frame.

    Returns:
        One ``TVFrameResult`` per input image, in order. A frame whose
        inference raises is reported with no boxes.
    """
    frames: list[TVFrameResult] = []
    for position, image in enumerate(batch_images):
        frame_id = offset + position
        usable = (
            image is not None
            and isinstance(image, np.ndarray)
            and image.ndim == 3
        )
        detections = []
        if usable:
            if image.dtype != np.uint8:
                image = image.astype(np.uint8)
            try:
                if self.use_tta:
                    detections = self._predict_tta(image)
                else:
                    detections = self._predict_single(image)
            except Exception as e:
                # Best effort: one bad frame must not abort the whole batch.
                print(f"⚠️ Inference failed for frame {frame_id}: {e}")
                detections = []
        placeholders = [(0, 0) for _ in range(max(0, int(n_keypoints)))]
        frames.append(
            TVFrameResult(frame_id=frame_id, boxes=detections, keypoints=placeholders)
        )
    return frames