"""TurboVision beverage detection miner — score-beverage-v3. YOLO11s @ 1280x1280, 3-class beverage detection (bottle/can/cup), ONNX with end-to-end NMS baked in (output [1, 300, 6] = x1, y1, x2, y2, conf, cls). Inference pipeline (v3): 1) Primary forward pass on the full image. 2) Hflip TTA: forward on horizontally-flipped image, transform boxes back. 3) Per-class hard-NMS to merge primary + flip outputs. 4) Cross-class IoU dedup (suppresses same physical object getting two class labels). 5) Consensus-confidence boost: when both views agree on a cluster, take the max score so true-positives rank higher in the validator's PR curve. 6) Sanity filter (min size, aspect ratio). """ from pathlib import Path import math import cv2 import numpy as np import onnxruntime as ort from numpy import ndarray from pydantic import BaseModel class BoundingBox(BaseModel): x1: int y1: int x2: int y2: int cls_id: int conf: float class TVFrameResult(BaseModel): frame_id: int boxes: list[BoundingBox] keypoints: list[tuple[int, int]] class Miner: def __init__(self, path_hf_repo: Path) -> None: model_path = path_hf_repo / "weights.onnx" cn_path = model_path.with_name("class_names.txt") if cn_path.is_file(): self.class_names = [ ln.strip() for ln in cn_path.read_text(encoding="utf-8").splitlines() if ln.strip() and not ln.strip().startswith("#") ] else: self.class_names = ["cup", "bottle", "can"] self.cls_remap = np.arange(len(self.class_names), dtype=np.int32) print("ORT version:", ort.__version__) try: ort.preload_dlls() print("✅ onnxruntime.preload_dlls() success") except Exception as e: print(f"⚠️ preload_dlls failed: {e}") print("ORT available providers BEFORE session:", ort.get_available_providers()) sess_options = ort.SessionOptions() sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL try: self.session = ort.InferenceSession( str(model_path), sess_options=sess_options, providers=["CUDAExecutionProvider", "CPUExecutionProvider"], ) print("✅ Created ORT session with preferred CUDA provider list") except Exception as e: print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}") self.session = ort.InferenceSession( str(model_path), sess_options=sess_options, providers=["CPUExecutionProvider"], ) print("ORT session providers:", self.session.get_providers()) inp = self.session.get_inputs()[0] self.input_name = inp.name self.output_names = [o.name for o in self.session.get_outputs()] self.input_shape = inp.shape self.input_dtype = np.float16 if "float16" in inp.type else np.float32 self.input_height = self._safe_dim(self.input_shape[2], default=1280) self.input_width = self._safe_dim(self.input_shape[3], default=1280) self.conf_thres = 0.20 self.iou_thres = 0.5 self.cross_iou_thresh = 0.7 self.max_det = 300 self.use_tta = True # Sanity filter — reject obviously bad boxes self.min_box_area = 6 * 6 self.min_side = 4 self.max_aspect_ratio = 8.0 self.max_box_area_ratio = 0.95 print(f"✅ ONNX loaded: {model_path}") print(f"✅ providers: {self.session.get_providers()}") print(f"✅ input: name={self.input_name}, shape={self.input_shape}, dtype={self.input_dtype}") print(f"✅ classes: {self.class_names}") print(f"✅ config: conf={self.conf_thres}, iou={self.iou_thres}, " f"cross_iou={self.cross_iou_thresh}, TTA={self.use_tta}") def __repr__(self) -> str: return ( f"ONNXRuntime(session={type(self.session).__name__}, " f"providers={self.session.get_providers()})" ) @staticmethod def _safe_dim(value, default: int) -> int: return value if isinstance(value, int) and value > 0 else default def 
    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        h, w = image.shape[:2]
        new_w, new_h = new_shape
        ratio = min(new_w / w, new_h / h)
        resized_w = int(round(w * ratio))
        resized_h = int(round(h * ratio))
        if (resized_w, resized_h) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))
        padded = cv2.copyMakeBorder(
            image,
            top,
            bottom,
            left,
            right,
            borderType=cv2.BORDER_CONSTANT,
            value=color,
        )
        return padded, ratio, (dw, dh)

    def _preprocess(self, image: ndarray):
        orig_h, orig_w = image.shape[:2]
        img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(self.input_dtype) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        img = np.ascontiguousarray(img)
        return img, ratio, pad, (orig_w, orig_h)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    def _filter_sane_boxes(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        orig_size: tuple[int, int],
    ):
        if len(boxes) == 0:
            return boxes, scores, cls_ids
        orig_w, orig_h = orig_size
        image_area = float(orig_w * orig_h)
        keep = []
        for i, box in enumerate(boxes):
            x1, y1, x2, y2 = box.tolist()
            bw = x2 - x1
            bh = y2 - y1
            if bw <= 0 or bh <= 0:
                continue
            if bw < self.min_side or bh < self.min_side:
                continue
            area = bw * bh
            if area < self.min_box_area:
                continue
            if area > self.max_box_area_ratio * image_area:
                continue
            ar = max(bw / max(bh, 1e-6), bh / max(bw, 1e-6))
            if ar > self.max_aspect_ratio:
                continue
            keep.append(i)
        if not keep:
            return (
                np.empty((0, 4), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.int32),
            )
        k = np.array(keep, dtype=np.intp)
        return boxes[k], scores[k], cls_ids[k]

    @staticmethod
    def _hard_nms(
        boxes: np.ndarray,
        scores: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp)
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        order = np.argsort(scores)[::-1]
        keep: list[int] = []
        suppressed = np.zeros(N, dtype=bool)
        for i in range(N):
            idx = order[i]
            if suppressed[idx]:
                continue
            keep.append(int(idx))
            bi = boxes[idx]
            for k in range(i + 1, N):
                jdx = order[k]
                if suppressed[jdx]:
                    continue
                bj = boxes[jdx]
                xx1 = max(bi[0], bj[0])
                yy1 = max(bi[1], bj[1])
                xx2 = min(bi[2], bj[2])
                yy2 = min(bi[3], bj[3])
                inter = max(0.0, xx2 - xx1) * max(0.0, yy2 - yy1)
                area_i = (bi[2] - bi[0]) * (bi[3] - bi[1])
                area_j = (bj[2] - bj[0]) * (bj[3] - bj[1])
                iou = inter / (area_i + area_j - inter + 1e-7)
                if iou > iou_thresh:
                    suppressed[jdx] = True
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep: list[int] = []
        for c in np.unique(cls_ids):
            mask = cls_ids == c
            indices = np.where(mask)[0]
            keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
            all_keep.extend(indices[keep].tolist())
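        # Indices are restored to ascending order below; the max_det cap in
        # _predict_tta re-ranks by score before truncating.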
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    @staticmethod
    def _cross_class_dedup(
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        iou_thresh: float,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        cls_ids = np.asarray(cls_ids, dtype=np.int32)
        areas = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(
            0.0, boxes[:, 3] - boxes[:, 1]
        )
        # Keep larger boxes first, then higher score.
        order = np.lexsort((-scores, -areas))
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            union = area_i + areas - inter + 1e-7
            iou = inter / union
            dup = iou > iou_thresh
            dup[i] = False
            suppressed |= dup
        keep_idx = np.array(keep, dtype=np.intp)
        return boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx]

    @staticmethod
    def _max_score_per_cluster(
        coords: np.ndarray,
        scores: np.ndarray,
        keep_indices: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        # Class-agnostic clustering: any candidate overlapping the kept box by
        # IoU >= iou_thresh contributes its score to the max.
        n_keep = len(keep_indices)
        if n_keep == 0:
            return np.array([], dtype=np.float32)
        coords = np.asarray(coords, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        out = np.empty(n_keep, dtype=np.float32)
        for i in range(n_keep):
            idx = keep_indices[i]
            bi = coords[idx]
            xx1 = np.maximum(bi[0], coords[:, 0])
            yy1 = np.maximum(bi[1], coords[:, 1])
            xx2 = np.minimum(bi[2], coords[:, 2])
            yy2 = np.minimum(bi[3], coords[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = (bi[2] - bi[0]) * (bi[3] - bi[1])
            areas_j = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1])
            iou = inter / (area_i + areas_j - inter + 1e-7)
            in_cluster = iou >= iou_thresh
            out[i] = float(np.max(scores[in_cluster]))
        return out

    def _decode_raw_dets(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Decode end2end NMS output and return (boxes, scores, cls_ids) in
        original image coordinates, after conf-threshold + remap +
        letterbox reversal + sanity filter."""
        if preds.ndim == 3 and preds.shape[0] == 1:
            preds = preds[0]
        if preds.ndim != 2 or preds.shape[1] < 6:
            raise ValueError(f"Unexpected ONNX output shape: {preds.shape}")
        boxes = preds[:, :4].astype(np.float32)
        scores = preds[:, 4].astype(np.float32)
        cls_ids = preds[:, 5].astype(np.int32)

        valid = (cls_ids >= 0) & (cls_ids < len(self.cls_remap))
        boxes, scores, cls_ids = boxes[valid], scores[valid], cls_ids[valid]
        cls_ids = self.cls_remap[cls_ids]

        keep = scores >= self.conf_thres
        boxes = boxes[keep]
        scores = scores[keep]
        cls_ids = cls_ids[keep]
        if len(boxes) == 0:
            return (
                np.empty((0, 4), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.int32),
            )

        # Undo the letterbox: remove padding offsets, then rescale to the
        # original resolution.
        pad_w, pad_h = pad
        orig_w, orig_h = orig_size
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        boxes = self._clip_boxes(boxes, (orig_w, orig_h))
        boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size)
        return boxes, scores, cls_ids

    def _forward(
        self, image: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        x, ratio, pad, orig_size = self._preprocess(image)
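        # NMS is baked into the graph, so a single forward pass returns final
        # detections: one tensor shaped [1, 300, 6] = x1, y1, x2, y2, conf, cls.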
        out = self.session.run(self.output_names, {self.input_name: x})[0]
        return self._decode_raw_dets(out, ratio, pad, orig_size)

    def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
        boxes, scores, cls_ids = self._forward(image)
        if len(boxes) == 0:
            return []
        return self._build_results(boxes, scores, cls_ids)

    def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]:
        """Hflip TTA: merge primary + flipped via per-class hard-NMS, then
        cross-class dedup, with consensus-confidence boost."""
        ow = image.shape[1]
        b1, s1, c1 = self._forward(image)

        flipped = cv2.flip(image, 1)
        b2, s2, c2 = self._forward(flipped)
        if len(b2):
            # Map flipped-image boxes back to original coordinates:
            # x1' = ow - x2, x2' = ow - x1.
            x1f = ow - b2[:, 2]
            x2f = ow - b2[:, 0]
            b2 = np.stack([x1f, b2[:, 1], x2f, b2[:, 3]], axis=1)

        if len(b1) == 0 and len(b2) == 0:
            return []
        boxes = np.concatenate([b1, b2], axis=0) if len(b2) else b1
        scores = np.concatenate([s1, s2], axis=0) if len(b2) else s1
        cls_ids = np.concatenate([c1, c2], axis=0) if len(b2) else c1

        keep = self._per_class_hard_nms(boxes, scores, cls_ids, self.iou_thres)
        if len(keep) == 0:
            return []
        if len(keep) > self.max_det:
            # Truncate by confidence, not index order, so the strongest
            # detections survive the max_det cap.
            keep = keep[np.argsort(scores[keep])[::-1][: self.max_det]]

        # Consensus-confidence boost: cluster by IoU and take max score.
        boosted = self._max_score_per_cluster(boxes, scores, keep, self.iou_thres)
        boxes = boxes[keep]
        cls_ids = cls_ids[keep]
        scores = boosted

        boxes, scores, cls_ids = self._cross_class_dedup(
            boxes, scores, cls_ids, self.cross_iou_thresh
        )
        if len(boxes) == 0:
            return []
        return self._build_results(boxes, scores, cls_ids)

    def _build_results(
        self, boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray
    ) -> list[BoundingBox]:
        results: list[BoundingBox] = []
        for box, conf, cls_id in zip(boxes, scores, cls_ids):
            x1, y1, x2, y2 = box.tolist()
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=int(cls_id),
                    conf=float(conf),
                )
            )
        return results

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        results: list[TVFrameResult] = []
        for frame_number_in_batch, image in enumerate(batch_images):
            if image is None or not isinstance(image, np.ndarray) or image.ndim != 3:
                results.append(
                    TVFrameResult(
                        frame_id=offset + frame_number_in_batch,
                        boxes=[],
                        keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                    )
                )
                continue
            if image.dtype != np.uint8:
                image = image.astype(np.uint8)
            try:
                if self.use_tta:
                    boxes = self._predict_tta(image)
                else:
                    boxes = self._predict_single(image)
            except Exception as e:
                print(f"⚠️ Inference failed for frame {offset + frame_number_in_batch}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=offset + frame_number_in_batch,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results
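# Hedged usage sketch, not part of the validator harness: the repo path and
# test frame below are hypothetical placeholders. The real harness constructs
# Miner with the downloaded HF repo directory and feeds decoded BGR frames to
# predict_batch.
if __name__ == "__main__":
    repo_dir = Path("./hf_repo")  # hypothetical: must contain weights.onnx
    miner = Miner(repo_dir)
    frame = cv2.imread("sample.jpg")  # hypothetical BGR uint8 test frame
    if frame is not None:
        out = miner.predict_batch([frame], offset=0, n_keypoints=0)
        for det in out[0].boxes:
            name = (
                miner.class_names[det.cls_id]
                if det.cls_id < len(miner.class_names)
                else str(det.cls_id)
            )
            print(f"{name}: conf={det.conf:.3f} box=({det.x1},{det.y1},{det.x2},{det.y2})")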