"""TurboVision beverage detection miner — score-beverage-v3. YOLO11s @ 1280x1280, 3-class beverage detection (bottle/can/cup), ONNX with end-to-end NMS baked in (output [1, 300, 6] = x1, y1, x2, y2, conf, cls). Inference pipeline (v3): 1) Primary forward pass on the full image. 2) Hflip TTA: forward on horizontally-flipped image, transform boxes back. 3) Per-class hard-NMS to merge primary + flip outputs. 4) Cross-class IoU dedup (suppresses same physical object getting two class labels). 5) Consensus-confidence boost: when both views agree on a cluster, take the max score so true-positives rank higher in the validator's PR curve. 6) Sanity filter (min size, aspect ratio). """ from pathlib import Path import math import cv2 import numpy as np import onnxruntime as ort from numpy import ndarray from pydantic import BaseModel class BoundingBox(BaseModel): x1: int y1: int x2: int y2: int cls_id: int conf: float class TVFrameResult(BaseModel): frame_id: int boxes: list[BoundingBox] keypoints: list[tuple[int, int]] class Miner: def __init__(self, path_hf_repo: Path) -> None: model_path = path_hf_repo / "weights.onnx" # OUTPUT order: must match the validator manifest's `objects` list # exactly so cls_id ints we emit map to the correct GT classes. # manifest: ["cup", "bottle", "can"] -> cup=0, bottle=1, can=2. cn_path = model_path.with_name("class_names.txt") if cn_path.is_file(): self.class_names = [ ln.strip() for ln in cn_path.read_text(encoding="utf-8").splitlines() if ln.strip() and not ln.strip().startswith("#") ] else: self.class_names = ["cup", "bottle", "can"] # Order our weights.onnx was TRAINED in (per v2_dataset/data.yaml on the # training box: 0=bottle, 1=can, 2=cup). cls_remap maps the model's raw # output cls_id -> our class_names index (= validator manifest cls_id). model_class_order = ["bottle", "can", "cup"] self.cls_remap = np.array( [self.class_names.index(n) for n in model_class_order], dtype=np.int32 ) print("ORT version:", ort.__version__) try: ort.preload_dlls() print("✅ onnxruntime.preload_dlls() success") except Exception as e: print(f"⚠️ preload_dlls failed: {e}") print("ORT available providers BEFORE session:", ort.get_available_providers()) sess_options = ort.SessionOptions() sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL try: self.session = ort.InferenceSession( str(model_path), sess_options=sess_options, providers=["CUDAExecutionProvider", "CPUExecutionProvider"], ) print("✅ Created ORT session with preferred CUDA provider list") except Exception as e: print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}") self.session = ort.InferenceSession( str(model_path), sess_options=sess_options, providers=["CPUExecutionProvider"], ) print("ORT session providers:", self.session.get_providers()) inp = self.session.get_inputs()[0] self.input_name = inp.name self.output_names = [o.name for o in self.session.get_outputs()] self.input_shape = inp.shape self.input_dtype = np.float16 if "float16" in inp.type else np.float32 self.input_height = self._safe_dim(self.input_shape[2], default=1280) self.input_width = self._safe_dim(self.input_shape[3], default=1280) # Tuned on local benchmark vs rival-proxy GT (5/2/2026): # V3 = consensus filter + hflip TTA. Multi-scale, cross-class tighter, # strict-consensus-across-multi-views all tested and either hurt or # matched. V3 is the local optimum. self.conf_thres = 0.40 self.iou_thres = 0.5 self.cross_iou_thresh = 0.7 self.max_det = 300 self.use_tta = True # Consensus TTA — our edge. None of the top miners (5FBnd/5CiAr/5CtY4) # do this; they keep all-view union and only boost cluster scores. self.use_consensus_tta = True self.consensus_iou = 0.5 self.require_strict_consensus = False # Multi-scale tested + abandoned: it loosened consensus and hurt FP # suppression more than it helped recall on rival-proxy GT. self.use_multi_scale_tta = False self.tta_scale = 0.85 # Sanity filter — reject obviously bad boxes self.min_box_area = 6 * 6 self.min_side = 4 self.max_aspect_ratio = 8.0 self.max_box_area_ratio = 0.95 print(f"✅ ONNX loaded: {model_path}") print(f"✅ providers: {self.session.get_providers()}") print(f"✅ input: name={self.input_name}, shape={self.input_shape}, dtype={self.input_dtype}") print(f"✅ classes: {self.class_names}") print(f"✅ config: conf={self.conf_thres}, iou={self.iou_thres}, " f"cross_iou={self.cross_iou_thresh}, TTA={self.use_tta}") def __repr__(self) -> str: return ( f"ONNXRuntime(session={type(self.session).__name__}, " f"providers={self.session.get_providers()})" ) @staticmethod def _safe_dim(value, default: int) -> int: return value if isinstance(value, int) and value > 0 else default def _letterbox( self, image: ndarray, new_shape: tuple[int, int], color=(114, 114, 114), ) -> tuple[ndarray, float, tuple[float, float]]: h, w = image.shape[:2] new_w, new_h = new_shape ratio = min(new_w / w, new_h / h) resized_w = int(round(w * ratio)) resized_h = int(round(h * ratio)) if (resized_w, resized_h) != (w, h): interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR image = cv2.resize(image, (resized_w, resized_h), interpolation=interp) dw = (new_w - resized_w) / 2.0 dh = (new_h - resized_h) / 2.0 left = int(round(dw - 0.1)) right = int(round(dw + 0.1)) top = int(round(dh - 0.1)) bottom = int(round(dh + 0.1)) padded = cv2.copyMakeBorder( image, top, bottom, left, right, borderType=cv2.BORDER_CONSTANT, value=color, ) return padded, ratio, (dw, dh) def _preprocess(self, image: ndarray): orig_h, orig_w = image.shape[:2] img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height)) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.astype(self.input_dtype) / 255.0 img = np.transpose(img, (2, 0, 1))[None, ...] img = np.ascontiguousarray(img) return img, ratio, pad, (orig_w, orig_h) @staticmethod def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray: w, h = image_size boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1) boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1) boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1) boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1) return boxes def _filter_sane_boxes( self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray, orig_size: tuple[int, int], ): if len(boxes) == 0: return boxes, scores, cls_ids orig_w, orig_h = orig_size image_area = float(orig_w * orig_h) keep = [] for i, box in enumerate(boxes): x1, y1, x2, y2 = box.tolist() bw = x2 - x1 bh = y2 - y1 if bw <= 0 or bh <= 0: continue if bw < self.min_side or bh < self.min_side: continue area = bw * bh if area < self.min_box_area: continue if area > self.max_box_area_ratio * image_area: continue ar = max(bw / max(bh, 1e-6), bh / max(bw, 1e-6)) if ar > self.max_aspect_ratio: continue keep.append(i) if not keep: return ( np.empty((0, 4), dtype=np.float32), np.empty((0,), dtype=np.float32), np.empty((0,), dtype=np.int32), ) k = np.array(keep, dtype=np.intp) return boxes[k], scores[k], cls_ids[k] @staticmethod def _hard_nms( boxes: np.ndarray, scores: np.ndarray, iou_thresh: float, ) -> np.ndarray: N = len(boxes) if N == 0: return np.array([], dtype=np.intp) boxes = np.asarray(boxes, dtype=np.float32) scores = np.asarray(scores, dtype=np.float32) order = np.argsort(scores)[::-1] keep: list[int] = [] suppressed = np.zeros(N, dtype=bool) for i in range(N): idx = order[i] if suppressed[idx]: continue keep.append(int(idx)) bi = boxes[idx] for k in range(i + 1, N): jdx = order[k] if suppressed[jdx]: continue bj = boxes[jdx] xx1 = max(bi[0], bj[0]) yy1 = max(bi[1], bj[1]) xx2 = min(bi[2], bj[2]) yy2 = min(bi[3], bj[3]) inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1) area_i = (bi[2] - bi[0]) * (bi[3] - bi[1]) area_j = (bj[2] - bj[0]) * (bj[3] - bj[1]) iou = inter / (area_i + area_j - inter + 1e-7) if iou > iou_thresh: suppressed[jdx] = True return np.array(keep, dtype=np.intp) def _per_class_hard_nms( self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray, iou_thresh: float, ) -> np.ndarray: if len(boxes) == 0: return np.array([], dtype=np.intp) all_keep: list[int] = [] for c in np.unique(cls_ids): mask = cls_ids == c indices = np.where(mask)[0] keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh) all_keep.extend(indices[keep].tolist()) all_keep.sort() return np.array(all_keep, dtype=np.intp) @staticmethod def _cross_class_dedup( boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray, iou_thresh: float, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: n = len(boxes) if n <= 1: return boxes, scores, cls_ids boxes = np.asarray(boxes, dtype=np.float32) scores = np.asarray(scores, dtype=np.float32) cls_ids = np.asarray(cls_ids, dtype=np.int32) areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum( 0.0, boxes[:, 3] - boxes[:, 1] ) # Keep larger boxes first, then higher score. order = np.lexsort((-scores, -areas)) suppressed = np.zeros(n, dtype=bool) keep: list[int] = [] for i in order: if suppressed[i]: continue keep.append(int(i)) bi = boxes[i] xx1 = np.maximum(bi[0], boxes[:, 0]) yy1 = np.maximum(bi[1], boxes[:, 1]) xx2 = np.minimum(bi[2], boxes[:, 2]) yy2 = np.minimum(bi[3], boxes[:, 3]) inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1) area_i = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1]))) union = area_i + areas - inter + 1e-7 iou = inter / union dup = iou > iou_thresh dup[i] = False suppressed |= dup keep_idx = np.array(keep, dtype=np.intp) return boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx] @staticmethod def _max_score_per_cluster( coords: np.ndarray, scores: np.ndarray, keep_indices: np.ndarray, iou_thresh: float, ) -> np.ndarray: n_keep = len(keep_indices) if n_keep == 0: return np.array([], dtype=np.float32) coords = np.asarray(coords, dtype=np.float32) scores = np.asarray(scores, dtype=np.float32) out = np.empty(n_keep, dtype=np.float32) for i in range(n_keep): idx = keep_indices[i] bi = coords[idx] xx1 = np.maximum(bi[0], coords[:, 0]) yy1 = np.maximum(bi[1], coords[:, 1]) xx2 = np.minimum(bi[2], coords[:, 2]) yy2 = np.minimum(bi[3], coords[:, 3]) inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1) area_i = (bi[2] - bi[0]) * (bi[3] - bi[1]) areas_j = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1]) iou = inter / (area_i + areas_j - inter + 1e-7) in_cluster = iou >= iou_thresh out[i] = float(np.max(scores[in_cluster])) return out def _decode_raw_dets( self, preds: np.ndarray, ratio: float, pad: tuple[float, float], orig_size: tuple[int, int], ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Decode end2end NMS output and return (boxes, scores, cls_ids) in original image coordinates, after conf-threshold + remap + letterbox-reverse + sanity.""" if preds.ndim == 3 and preds.shape[0] == 1: preds = preds[0] if preds.ndim != 2 or preds.shape[1] < 6: raise ValueError(f"Unexpected ONNX output shape: {preds.shape}") boxes = preds[:, :4].astype(np.float32) scores = preds[:, 4].astype(np.float32) cls_ids = preds[:, 5].astype(np.int32) valid = (cls_ids >= 0) & (cls_ids < len(self.cls_remap)) boxes, scores, cls_ids = boxes[valid], scores[valid], cls_ids[valid] cls_ids = self.cls_remap[cls_ids] keep = scores >= self.conf_thres boxes = boxes[keep] scores = scores[keep] cls_ids = cls_ids[keep] if len(boxes) == 0: return ( np.empty((0, 4), dtype=np.float32), np.empty((0,), dtype=np.float32), np.empty((0,), dtype=np.int32), ) pad_w, pad_h = pad orig_w, orig_h = orig_size boxes[:, [0, 2]] -= pad_w boxes[:, [1, 3]] -= pad_h boxes /= ratio boxes = self._clip_boxes(boxes, (orig_w, orig_h)) boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size) return boxes, scores, cls_ids def _forward( self, image: np.ndarray ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: x, ratio, pad, orig_size = self._preprocess(image) out = self.session.run(self.output_names, {self.input_name: x})[0] return self._decode_raw_dets(out, ratio, pad, orig_size) def _predict_single(self, image: np.ndarray) -> list[BoundingBox]: boxes, scores, cls_ids = self._forward(image) if len(boxes) == 0: return [] return self._build_results(boxes, scores, cls_ids) def _forward_scaled(self, image: np.ndarray, scale: float): """Forward pass on a scale-augmented image; transform boxes back to original coords.""" if scale == 1.0: return self._forward(image) h, w = image.shape[:2] nh, nw = int(round(h * scale)), int(round(w * scale)) scaled = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_CUBIC if scale > 1.0 else cv2.INTER_LINEAR) b, s, c = self._forward(scaled) if len(b): b = b / scale # scale boxes back to original image coords b = self._clip_boxes(b, (w, h)) return b, s, c def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]: """Multi-view TTA with consensus filter. Views (configurable): v1 = primary forward (1.0x) v2 = horizontal flip v3 = downscaled forward (tta_scale, e.g. 0.85x) — catches small objects Consensus filter (use_consensus_tta=True): A box from v1 is kept iff it is confirmed by ≥1 OTHER view (v2 or v3) at IoU >= consensus_iou with same class. Score = max across confirming views. None of the top miners do this — this is our edge. Merge (use_consensus_tta=False, fallback): union all views, per-class hard-NMS, max-score boost on clusters. """ ow = image.shape[1] # v1: primary b1, s1, c1 = self._forward(image) # v2: hflip flipped = cv2.flip(image, 1) b2, s2, c2 = self._forward(flipped) if len(b2): x1f = ow - b2[:, 2] x2f = ow - b2[:, 0] b2 = np.stack([x1f, b2[:, 1], x2f, b2[:, 3]], axis=1) # v3: multi-scale (0.85x) if self.use_multi_scale_tta: b3, s3, c3 = self._forward_scaled(image, self.tta_scale) else: b3 = np.empty((0, 4), dtype=np.float32) s3 = np.empty((0,), dtype=np.float32) c3 = np.empty((0,), dtype=np.int32) if len(b1) == 0 and len(b2) == 0 and len(b3) == 0: return [] if self.use_consensus_tta: if len(b1) == 0: return [] # Per-view best-IoU helper. def best_iou_match(box, cls, vb, vc, vs): if len(vb) == 0: return 0.0, 0.0 same_cls = vc == cls if not same_cls.any(): return 0.0, 0.0 xx1 = np.maximum(box[0], vb[:, 0]) yy1 = np.maximum(box[1], vb[:, 1]) xx2 = np.minimum(box[2], vb[:, 2]) yy2 = np.minimum(box[3], vb[:, 3]) inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1) a_i = (box[2] - box[0]) * (box[3] - box[1]) a_j = (vb[:, 2] - vb[:, 0]) * (vb[:, 3] - vb[:, 1]) ious = inter / (a_i + a_j - inter + 1e-7) ious = np.where(same_cls, ious, 0.0) idx = int(ious.argmax()) return float(ious[idx]), float(vs[idx]) keep_b = []; keep_s = []; keep_c = [] for i in range(len(b1)): iou_h, sc_h = best_iou_match(b1[i], c1[i], b2, c2, s2) iou_m, sc_m = best_iou_match(b1[i], c1[i], b3, c3, s3) if len(b3) else (0.0, 0.0) if self.require_strict_consensus and self.use_multi_scale_tta: # Both hflip AND multi-scale must confirm. if iou_h >= self.consensus_iou and iou_m >= self.consensus_iou: keep_b.append(b1[i]); keep_c.append(c1[i]) keep_s.append(max(float(s1[i]), sc_h, sc_m)) else: # ANY one other view confirming is enough. if max(iou_h, iou_m) >= self.consensus_iou: keep_b.append(b1[i]); keep_c.append(c1[i]) # Score = max across confirming views. partners = [float(s1[i])] if iou_h >= self.consensus_iou: partners.append(sc_h) if iou_m >= self.consensus_iou: partners.append(sc_m) keep_s.append(max(partners)) if not keep_b: return [] boxes = np.asarray(keep_b, dtype=np.float32) scores = np.asarray(keep_s, dtype=np.float32) cls_ids = np.asarray(keep_c, dtype=np.int32) keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thres) if len(keep) == 0: return [] keep = keep[: self.max_det] boxes = boxes[keep] scores = scores[keep] cls_ids = cls_ids[keep] boxes, scores, cls_ids = self._cross_class_dedup( boxes, scores, cls_ids, self.cross_iou_thresh ) return self._build_results(boxes, scores, cls_ids) # Merge mode (fallback): union all available views. parts_b = [b1] + ([b2] if len(b2) else []) + ([b3] if len(b3) else []) parts_s = [s1] + ([s2] if len(b2) else []) + ([s3] if len(b3) else []) parts_c = [c1] + ([c2] if len(b2) else []) + ([c3] if len(b3) else []) boxes = np.concatenate(parts_b, axis=0) if len(parts_b) > 1 else b1 scores = np.concatenate(parts_s, axis=0) if len(parts_s) > 1 else s1 cls_ids = np.concatenate(parts_c, axis=0) if len(parts_c) > 1 else c1 keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thres) if len(keep) == 0: return [] keep = keep[: self.max_det] boosted = self._max_score_per_cluster(boxes, scores, keep, self.iou_thres) boxes = boxes[keep] cls_ids = cls_ids[keep] scores = boosted boxes, scores, cls_ids = self._cross_class_dedup( boxes, scores, cls_ids, self.cross_iou_thresh ) if len(boxes) == 0: return [] return self._build_results(boxes, scores, cls_ids) def _build_results( self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray ) -> list[BoundingBox]: results: list[BoundingBox] = [] for box, conf, cls_id in zip(boxes, scores, cls_ids): x1, y1, x2, y2 = box.tolist() if x2 <= x1 or y2 <= y1: continue results.append( BoundingBox( x1=int(math.floor(x1)), y1=int(math.floor(y1)), x2=int(math.ceil(x2)), y2=int(math.ceil(y2)), cls_id=int(cls_id), conf=float(conf), ) ) return results def predict_batch( self, batch_images: list[ndarray], offset: int, n_keypoints: int, ) -> list[TVFrameResult]: results: list[TVFrameResult] = [] for frame_number_in_batch, image in enumerate(batch_images): if image is None or not isinstance(image, np.ndarray) or image.ndim != 3: results.append( TVFrameResult( frame_id=offset + frame_number_in_batch, boxes=[], keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))], ) ) continue if image.dtype != np.uint8: image = image.astype(np.uint8) try: if self.use_tta: boxes = self._predict_tta(image) else: boxes = self._predict_single(image) except Exception as e: print(f"⚠️ Inference failed for frame {offset + frame_number_in_batch}: {e}") boxes = [] results.append( TVFrameResult( frame_id=offset + frame_number_in_batch, boxes=boxes, keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))], ) ) return results