# Build: 2026-05-29 23:30 UTC R11 redeploy (force new revision)
import math
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel


class BoundingBox(BaseModel):
    """One detection as an axis-aligned box in original-image pixel coordinates."""

    # Top-left corner (floored when built by ``Miner._build_results``).
    x1: int
    y1: int
    # Bottom-right corner (ceiled when built by ``Miner._build_results``).
    x2: int
    y2: int
    # Integer class id; presumably an index into ``Miner.class_names`` -- TODO confirm.
    cls_id: int
    # Detection confidence reported for the box.
    conf: float


class TVFrameResult(BaseModel):
    """Detections for a single frame of a batch."""

    # ``offset + index of the frame inside the batch`` (see ``Miner.predict_batch``).
    frame_id: int
    boxes: list[BoundingBox]
    # ``Miner.predict_batch`` fills this with ``(0, 0)`` placeholders only.
    keypoints: list[tuple[int, int]]


class Miner:
    """ONNX Runtime miner. Hard global NMS + sanity filter + dedup + flip TTA, with per-class rescue bonus.

    Per image the flow is: letterbox + normalise -> ONNX session -> decode ->
    per-class confidence filter -> un-letterbox and clip -> sanity filter ->
    class-agnostic NMS -> cross-class dedup. ``_predict_tta`` runs that flow on
    the image and on its horizontal mirror and merges the two detection sets.
    """

    class_names = ["cup", "bottle", "can"]
    # Fallback square input side, used when the model declares dynamic H/W.
    input_size = 1280
    # IoU threshold of the hard NMS passes.
    iou_thres = 0.4
    # IoU threshold of the cross-class duplicate removal.
    cross_iou_thresh = 0.7
    # Sanity-filter limits, expressed in original-image pixels.
    min_side = 8.0
    min_box_area = 100.0
    max_aspect_ratio = 10.0
    # Maximum number of detections kept per view / per merged frame.
    max_det = 300
    # Per-class confidence thresholds, indexed by class id.
    _conf_thres_array = np.array([0.6, 0.45, 0.5], dtype=np.float32)
    # Per-class "rescue" bonus, indexed by class id (see ``_conf_filter_mask``).
    _bonus_array = np.array([0.0, 0.0, 0.2], dtype=np.float32)

    def __init__(self, path_hf_repo: Path) -> None:
        """Create the ONNX Runtime session and cache the model's I/O metadata.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.
        """
        model_path = path_hf_repo / "weights.onnx"
        print("ORT version:", ort.__version__)

        # Best effort: any failure here (including the function not existing in
        # the installed onnxruntime) is reported and otherwise ignored.
        try:
            ort.preload_dlls()
            print("preload_dlls success")
        except Exception as e:
            print(f"preload_dlls failed: {e}")

        print("ORT available providers BEFORE session:", ort.get_available_providers())

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        # Prefer CUDA; if building that session raises, retry with CPU only.
        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        print("ORT session providers:", self.session.get_providers())
        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [output.name for output in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape
        # Indices 2 and 3 are read as height and width, i.e. the first input is
        # assumed to be 4-D in NCHW order -- TODO confirm for the shipped model.
        self.input_height = self._safe_dim(self.input_shape[2], default=self.input_size)
        self.input_width = self._safe_dim(self.input_shape[3], default=self.input_size)

        print(f"ONNX model loaded from: {model_path}")
        print(f"ONNX providers: {self.session.get_providers()}")
        print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return ``value`` when it is a positive ``int``, otherwise ``default``."""
        return value if isinstance(value, int) and value > 0 else default

    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize ``image`` keeping its aspect ratio and pad it to ``new_shape``.

        Args:
            image: Image array whose first two axes are height and width.
            new_shape: Target ``(width, height)``.
            color: Constant border colour.

        Returns:
            ``(padded, ratio, (pad_left, pad_top))`` where ``ratio`` is the
            scale applied to the image and the pad values are the number of
            border pixels actually added on the left and on the top.
        """
        h, w = image.shape[:2]
        new_w, new_h = new_shape
        ratio = min(new_w / w, new_h / h)
        resized_w = int(round(w * ratio))
        resized_h = int(round(h * ratio))
        if (resized_w, resized_h) != (w, h):
            # Cubic when enlarging, linear when shrinking.
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)

        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0
        # The -/+ 0.1 nudges split an odd total padding into (k, k + 1) so the
        # two sides always add up to the full padding.
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))
        padded = cv2.copyMakeBorder(
            image, top, bottom, left, right,
            borderType=cv2.BORDER_CONSTANT, value=color,
        )
        # Report the padding that was really applied. Returning the fractional
        # (dw, dh) would make the decoders subtract an offset that is half a
        # pixel off whenever the total padding is odd.
        return padded, ratio, (float(left), float(top))

    def _preprocess(
        self, image: ndarray
    ) -> tuple[np.ndarray, float, tuple[float, float], tuple[int, int]]:
        """Turn an HWC image into the float32 ``(1, 3, H, W)`` network input.

        The channel order is swapped with ``COLOR_BGR2RGB`` and values are
        divided by 255.

        Returns:
            ``(tensor, ratio, pad, (orig_w, orig_h))``; ``ratio`` and ``pad``
            come from ``_letterbox``.
        """
        orig_h, orig_w = image.shape[:2]
        img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        img = np.ascontiguousarray(img, dtype=np.float32)
        return img, ratio, pad, (orig_w, orig_h)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        """Clip xyxy ``boxes`` in place to ``[0, w - 1] x [0, h - 1]`` and return them.

        Args:
            boxes: ``(N, 4)`` array of ``x1, y1, x2, y2``; modified in place.
            image_size: ``(width, height)``.
        """
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    @staticmethod
    def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray:
        """Convert ``(cx, cy, w, h)`` rows to ``(x1, y1, x2, y2)`` in a new array."""
        out = np.empty_like(boxes)
        out[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.0
        out[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.0
        out[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.0
        out[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.0
        return out

    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
        """Greedy non-maximum suppression that ignores class labels.

        Repeatedly keeps the highest-scoring remaining box and discards every
        other remaining box whose IoU with it is above ``iou_thresh``.

        Returns:
            Indices into ``boxes`` of the kept boxes, best score first.
        """
        n = len(boxes)
        if n == 0:
            return np.array([], dtype=np.intp)

        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order) > 0:
            i = int(order[0])
            keep.append(i)
            if len(order) == 1:
                break
            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            a_i = (max(0.0, boxes[i, 2] - boxes[i, 0])
                   * max(0.0, boxes[i, 3] - boxes[i, 1]))
            a_r = (np.maximum(0.0, boxes[rest, 2] - boxes[rest, 0])
                   * np.maximum(0.0, boxes[rest, 3] - boxes[rest, 1]))
            # The epsilon keeps the division defined for zero-area pairs.
            iou = inter / (a_i + a_r - inter + 1e-7)
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        """Run ``_hard_nms`` separately for every class id.

        Returns:
            Kept indices into ``boxes``, sorted ascending.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)

        all_keep: list[int] = []
        for c in np.unique(cls_ids):
            mask = cls_ids == c
            indices = np.where(mask)[0]
            keep = self._hard_nms(boxes[mask], scores[mask], iou_thresh)
            # ``keep`` indexes the per-class subset; map back to global indices.
            all_keep.extend(indices[keep].tolist())
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    def _cross_class_dedup_op(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        iou_thresh: float,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop near-duplicate boxes regardless of their class.

        Boxes are visited by decreasing margin ``score - class threshold``
        (ties: larger area first). Each visited, not yet suppressed box is kept
        and suppresses every other box whose IoU with it exceeds ``iou_thresh``.

        Returns:
            The kept ``(boxes, scores, cls_ids)`` in visiting order. Inputs with
            at most one box are returned unchanged.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids

        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        cls_ids = np.asarray(cls_ids, dtype=np.int32)
        areas = (np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
                 * np.maximum(0.0, boxes[:, 3] - boxes[:, 1]))
        margins = scores - self._conf_thres_array[cls_ids]
        # lexsort uses the LAST key as the primary one: margin, then area.
        order = np.lexsort((-areas, -margins))

        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            a_i = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (a_i + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False  # a box never suppresses itself
            suppressed |= dup

        keep_idx = np.array(keep, dtype=np.intp)
        return boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx]

    def _filter_sane_boxes(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        orig_size: tuple[int, int],
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Remove boxes with implausible geometry.

        A box survives when both sides are at least ``min_side``, its area lies
        in ``[min_box_area, 0.95 * image area]`` and its long/short side ratio
        is at most ``max_aspect_ratio``.

        Args:
            orig_size: ``(width, height)`` of the original image.
        """
        if len(boxes) == 0:
            return boxes, scores, cls_ids

        orig_w, orig_h = orig_size
        image_area = float(orig_w * orig_h)
        bw = np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
        bh = np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        area = bw * bh
        # Degenerate boxes get an infinite aspect ratio so they always fail.
        ar = np.where(
            (bw > 0) & (bh > 0),
            np.maximum(bw / np.maximum(bh, 1e-6), bh / np.maximum(bw, 1e-6)),
            np.inf,
        )
        keep = (
            (bw >= self.min_side)
            & (bh >= self.min_side)
            & (area >= self.min_box_area)
            & (area <= 0.95 * image_area)
            & (ar <= self.max_aspect_ratio)
        )
        return boxes[keep], scores[keep], cls_ids[keep]

    def _max_score_per_cluster(
        self,
        post_boxes: np.ndarray,
        post_cls: np.ndarray,
        full_boxes: np.ndarray,
        full_scores: np.ndarray,
        full_cls: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        """Give each surviving box the best score found in its cluster.

        The cluster of a box is every box of the full set that has the same
        class and an IoU with it of at least ``iou_thresh``.

        Returns:
            float32 array with one score per row of ``post_boxes``; ``0.0``
            where the cluster is empty.
        """
        n = len(post_boxes)
        if n == 0:
            return np.empty(0, dtype=np.float32)

        full_areas = (np.maximum(0.0, full_boxes[:, 2] - full_boxes[:, 0])
                      * np.maximum(0.0, full_boxes[:, 3] - full_boxes[:, 1]))
        out = np.empty(n, dtype=np.float32)
        for i in range(n):
            bi = post_boxes[i]
            xx1 = np.maximum(bi[0], full_boxes[:, 0])
            yy1 = np.maximum(bi[1], full_boxes[:, 1])
            xx2 = np.minimum(bi[2], full_boxes[:, 2])
            yy2 = np.minimum(bi[3], full_boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            a_i = max(0.0, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (a_i + full_areas - inter + 1e-7)
            cluster = (iou >= iou_thresh) & (full_cls == post_cls[i])
            out[i] = float(np.max(full_scores[cluster])) if np.any(cluster) else 0.0
        return out

    def _conf_filter_mask(self, scores: np.ndarray, cls_ids: np.ndarray) -> np.ndarray:
        """Build the boolean keep-mask of the per-class confidence filter.

        A detection is kept when ``score >= threshold[class]``. Rescue rule: if
        a class with a positive bonus has no detection passing, its single best
        candidate is admitted when ``score >= threshold[class] - bonus[class]``.

        Detections whose class id has no entry in the threshold table are never
        kept. Without that guard an id past the end of the table raises
        ``IndexError`` and a negative id silently picks up another class's
        threshold through NumPy's negative indexing.

        Args:
            scores: 1-D array of confidences.
            cls_ids: 1-D integer array of class ids, same length as ``scores``.

        Returns:
            Boolean array of the same length as ``scores``.
        """
        if len(scores) == 0:
            return np.zeros(0, dtype=bool)

        n_classes = min(len(self._conf_thres_array), len(self._bonus_array))
        valid = (cls_ids >= 0) & (cls_ids < n_classes)
        # Unknown ids are mapped to 0 only to keep the lookup in range; the
        # ``valid`` mask removes them from the result again.
        safe_ids = np.where(valid, cls_ids, 0)
        keep = valid & (scores >= self._conf_thres_array[safe_ids])

        for c in np.unique(cls_ids[valid]):
            bonus = float(self._bonus_array[c])
            if bonus <= 0.0:
                continue
            class_mask = cls_ids == c
            if keep[class_mask].any():
                continue  # the class already has a regular detection
            idx = np.flatnonzero(class_mask)
            top = int(idx[int(np.argmax(scores[idx]))])
            if scores[top] >= self._conf_thres_array[c] - bonus:
                keep[top] = True
        return keep

    def _per_view_pipeline(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        orig_size: tuple[int, int],
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Post-filter the detections of one view.

        Steps, in order: sanity filter, class-agnostic hard NMS at
        ``iou_thres``, truncation to the ``max_det`` best scores, cross-class
        dedup at ``cross_iou_thresh``.
        """
        boxes, scores, cls_ids = self._filter_sane_boxes(
            boxes, scores, cls_ids, orig_size
        )
        if len(boxes) == 0:
            return boxes, scores, cls_ids

        if len(boxes) > 1:
            keep = self._hard_nms(boxes, scores, self.iou_thres)
            boxes, scores, cls_ids = boxes[keep], scores[keep], cls_ids[keep]

        if len(scores) > self.max_det:
            top = np.argsort(-scores)[: self.max_det]
            boxes, scores, cls_ids = boxes[top], scores[top], cls_ids[top]

        if len(boxes) > 1:
            boxes, scores, cls_ids = self._cross_class_dedup_op(
                boxes, scores, cls_ids, self.cross_iou_thresh
            )
        return boxes, scores, cls_ids

    def _decode_final_dets(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> list[BoundingBox]:
        """Decode an output whose rows are read as ``x1, y1, x2, y2, score, class``.

        Args:
            preds: ``(N, >=6)`` array, optionally with a leading batch axis of 1.
            ratio: Letterbox scale factor.
            pad: Letterbox ``(pad_left, pad_top)``.
            orig_size: ``(width, height)`` of the original image.

        Raises:
            ValueError: If ``preds`` does not have the expected shape.
        """
        if preds.ndim == 3 and preds.shape[0] == 1:
            preds = preds[0]
        if preds.ndim != 2 or preds.shape[1] < 6:
            raise ValueError(f"Unexpected ONNX final-det output shape: {preds.shape}")

        boxes = preds[:, :4].astype(np.float32)
        scores = preds[:, 4].astype(np.float32)
        cls_ids = preds[:, 5].astype(np.int32)

        keep = self._conf_filter_mask(scores, cls_ids)
        boxes = boxes[keep]
        scores = scores[keep]
        cls_ids = cls_ids[keep]
        if len(boxes) == 0:
            return []

        # Undo the letterbox: remove the border offset, then the scaling.
        pad_w, pad_h = pad
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        boxes = self._clip_boxes(boxes, orig_size)

        boxes, scores, cls_ids = self._per_view_pipeline(
            boxes, scores, cls_ids, orig_size
        )
        return self._build_results(boxes, scores, cls_ids)

    def _decode_raw_yolo(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> list[BoundingBox]:
        """Decode a raw head output: 4 box values (cx, cy, w, h) + class scores.

        Args:
            preds: 3-D array with a leading batch axis of 1.
            ratio: Letterbox scale factor.
            pad: Letterbox ``(pad_left, pad_top)``.
            orig_size: ``(width, height)`` of the original image.

        Raises:
            ValueError: If ``preds`` does not have the expected shape.
        """
        if preds.ndim != 3 or preds.shape[0] != 1:
            raise ValueError(f"Unexpected raw ONNX output shape: {preds.shape}")
        preds = preds[0]
        # Heuristic: a short first axis next to a longer second one is taken to
        # be (channels, candidates) and is transposed to one row per candidate.
        if preds.shape[0] <= 16 and preds.shape[1] > preds.shape[0]:
            preds = preds.T
        if preds.ndim != 2 or preds.shape[1] < 5:
            raise ValueError(f"Unexpected raw output shape: {preds.shape}")

        boxes_xywh = preds[:, :4].astype(np.float32)
        cls_part = preds[:, 4:].astype(np.float32)
        if cls_part.shape[1] == 1:
            # Single score column: everything is class 0.
            scores = cls_part[:, 0]
            cls_ids = np.zeros(len(scores), dtype=np.int32)
        else:
            cls_ids = np.argmax(cls_part, axis=1).astype(np.int32)
            scores = cls_part[np.arange(len(cls_part)), cls_ids]

        keep = self._conf_filter_mask(scores, cls_ids)
        boxes_xywh = boxes_xywh[keep]
        scores = scores[keep]
        cls_ids = cls_ids[keep]
        if len(boxes_xywh) == 0:
            return []

        boxes = self._xywh_to_xyxy(boxes_xywh)
        # Undo the letterbox: remove the border offset, then the scaling.
        pad_w, pad_h = pad
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        boxes = self._clip_boxes(boxes, orig_size)

        boxes, scores, cls_ids = self._per_view_pipeline(
            boxes, scores, cls_ids, orig_size
        )
        return self._build_results(boxes, scores, cls_ids)

    @staticmethod
    def _build_results(
        boxes: np.ndarray, scores: np.ndarray, cls_ids: np.ndarray
    ) -> list[BoundingBox]:
        """Convert parallel arrays into ``BoundingBox`` objects.

        Coordinates are rounded outwards (floor for the top-left corner, ceil
        for the bottom-right one); boxes with non-positive width or height are
        skipped.
        """
        results: list[BoundingBox] = []
        for box, conf, cls_id in zip(boxes, scores, cls_ids):
            x1, y1, x2, y2 = box.tolist()
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=int(cls_id),
                    conf=float(conf),
                )
            )
        return results

    def _postprocess(
        self,
        output: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> list[BoundingBox]:
        """Pick the decoder that matches the shape of ``output``.

        ``(N, >=6)`` and ``(1, N, 6)`` go to ``_decode_final_dets``; every other
        shape goes to ``_decode_raw_yolo``.
        """
        if output.ndim == 2 and output.shape[1] >= 6:
            return self._decode_final_dets(output, ratio, pad, orig_size)
        if output.ndim == 3 and output.shape[0] == 1 and output.shape[2] == 6:
            return self._decode_final_dets(output, ratio, pad, orig_size)
        return self._decode_raw_yolo(output, ratio, pad, orig_size)

    def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
        """Validate ``image``, run the model once and decode its first output.

        Raises:
            ValueError: If ``image`` is ``None``, is not HWC with 3 channels, or
                the preprocessed tensor does not match the model input shape.
            TypeError: If ``image`` is not a NumPy array.
        """
        if image is None:
            raise ValueError("Input image is None")
        if not isinstance(image, np.ndarray):
            raise TypeError(f"Input is not numpy array: {type(image)}")
        if image.ndim != 3:
            raise ValueError(f"Expected HWC image, got shape={image.shape}")
        if image.shape[2] != 3:
            raise ValueError(f"Expected 3 channels, got shape={image.shape}")
        if image.dtype != np.uint8:
            # NOTE(review): the cast neither rescales nor clips -- confirm that
            # callers never pass float images in [0, 1] or out-of-range values.
            image = image.astype(np.uint8)

        input_tensor, ratio, pad, orig_size = self._preprocess(image)
        expected = (1, 3, self.input_height, self.input_width)
        if input_tensor.shape != expected:
            raise ValueError(
                f"Bad input tensor shape={input_tensor.shape}, expected={expected}"
            )

        outputs = self.session.run(self.output_names, {self.input_name: input_tensor})
        return self._postprocess(outputs[0], ratio, pad, orig_size)

    def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]:
        """Detect on the image and on its horizontal mirror, then merge.

        Merge steps: per-class hard NMS over the union, truncation to
        ``max_det``, score boost to the best score inside each same-class
        cluster, cross-class dedup.
        """
        boxes_orig = self._predict_single(image)
        flipped = cv2.flip(image, 1)
        boxes_flip = self._predict_single(flipped)

        # Mirror the flipped-view boxes back into the original frame.
        # NOTE(review): single-view boxes are clipped to [0, w - 1], so a
        # mirrored x2 can be exactly w -- confirm that downstream accepts it.
        w = image.shape[1]
        boxes_flip = [
            BoundingBox(
                x1=w - b.x2,
                y1=b.y1,
                x2=w - b.x1,
                y2=b.y2,
                cls_id=b.cls_id,
                conf=b.conf,
            )
            for b in boxes_flip
        ]

        all_boxes = boxes_orig + boxes_flip
        if not all_boxes:
            return []

        coords = np.array(
            [[b.x1, b.y1, b.x2, b.y2] for b in all_boxes], dtype=np.float32
        )
        scores = np.array([b.conf for b in all_boxes], dtype=np.float32)
        cls_ids = np.array([b.cls_id for b in all_boxes], dtype=np.int32)

        hard_keep = self._per_class_hard_nms(coords, scores, cls_ids, self.iou_thres)
        if len(hard_keep) == 0:
            return []
        if len(hard_keep) > self.max_det:
            top = np.argsort(-scores[hard_keep])[: self.max_det]
            hard_keep = hard_keep[top]

        boosted = self._max_score_per_cluster(
            coords[hard_keep],
            cls_ids[hard_keep],
            coords,
            scores,
            cls_ids,
            self.iou_thres,
        )
        kept_coords = coords[hard_keep]
        kept_cls = cls_ids[hard_keep]

        if len(kept_coords) > 1:
            kept_coords, boosted, kept_cls = self._cross_class_dedup_op(
                kept_coords, boosted, kept_cls, self.cross_iou_thresh
            )

        # Same floor/ceil construction as the single-view path. The merged
        # coordinates are whole numbers with x2 > x1 and y2 > y1 already, so the
        # helper's rounding and degenerate-box check leave them untouched.
        return self._build_results(kept_coords, boosted, kept_cls)

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Run flip-TTA detection on every image of a batch.

        A frame whose inference raises is reported on stdout and returned with
        an empty box list, so one bad frame does not abort the batch.

        Args:
            batch_images: Images to process, in frame order.
            offset: Frame id of the first image in the batch.
            n_keypoints: Number of ``(0, 0)`` placeholder keypoints per frame.

        Returns:
            One ``TVFrameResult`` per input image, in input order.
        """
        results: list[TVFrameResult] = []
        for frame_number_in_batch, image in enumerate(batch_images):
            try:
                boxes = self._predict_tta(image)
            except Exception as e:
                print(f"Inference failed for frame {offset + frame_number_in_batch}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=offset + frame_number_in_batch,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results