from pathlib import Path
import math

import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel

# (x1, y1, x2, y2, confidence, class_id) in original-image pixel space.
_Detection = tuple[float, float, float, float, float, int]


# One detection: integer corner coordinates in original-image pixels, the
# index of the winning class in ``Miner.class_names`` and its score in [0, 1].
class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


# Per-frame result: the frame index, its detections and a list of
# ``(x, y)`` keypoints (this miner only ever fills it with ``(0, 0)``).
class TVFrameResult(BaseModel):
    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]


class Miner:
    """
    Auto-generated by subnet_bridge from a Manako element repo.
    This miner is intentionally self-contained for chute import restrictions.
    """

    # Spatial size used when the ONNX graph declares H/W as dynamic axes.
    DEFAULT_INPUT_SIZE: int = 640

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and read its input spec.

        The session prefers the CUDA execution provider and falls back to
        CPU.  The network input height/width are taken from the model's
        first input, which is expected to be laid out as ``[N, C, H, W]``.
        """
        self.path_hf_repo = path_hf_repo
        self.class_names = ['bus', 'car', 'motorcycle', 'truck', 'van']
        self.session = ort.InferenceSession(
            str(path_hf_repo / "weights.onnx"),
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )
        model_input = self.session.get_inputs()[0]
        self.input_name = model_input.name
        input_shape = model_input.shape  # expected [N, C, H, W]
        self.input_h = self._resolve_dim(input_shape[2])
        self.input_w = self._resolve_dim(input_shape[3])
        self.conf_threshold = 0.25
        self.iou_threshold = 0.45

    @classmethod
    def _resolve_dim(cls, dim: object) -> int:
        """Return *dim* as a positive int, or ``DEFAULT_INPUT_SIZE``.

        Exports with dynamic axes report a symbolic name (``str``) or
        ``None`` instead of a number; ``int()`` on those would raise, so
        they fall back to the default size.
        """
        try:
            value = int(dim)  # type: ignore[call-overload]
        except (TypeError, ValueError):
            return cls.DEFAULT_INPUT_SIZE
        return value if value > 0 else cls.DEFAULT_INPUT_SIZE

    def __repr__(self) -> str:
        return f"ONNX Miner session={type(self.session).__name__} classes={len(self.class_names)}"

    def _preprocess(self, image_bgr: ndarray) -> tuple[np.ndarray, tuple[int, int]]:
        """Turn a BGR image into the network input tensor.

        The image is converted to RGB, resized (plain stretch, no
        letterboxing) to the network input size, scaled to ``[0, 1]`` as
        float32 and reordered to ``[1, C, H, W]``.

        Returns:
            The input tensor and the original ``(height, width)``.
        """
        h, w = image_bgr.shape[:2]
        rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (self.input_w, self.input_h))
        x = resized.astype(np.float32) / 255.0
        # HWC -> CHW, add the batch axis, and hand the runtime a contiguous
        # buffer instead of a strided view.
        x = np.ascontiguousarray(np.transpose(x, (2, 0, 1))[None, ...])
        return x, (h, w)

    def _normalize_predictions(self, raw: np.ndarray) -> np.ndarray:
        """Return the first batch item as a ``[N, C]`` array.

        Common ultralytics export shapes:
          - ``[1, C, N]`` where ``C = 4 + num_classes``
          - ``[1, N, C]``

        The axis whose length equals ``4 + len(self.class_names)`` is taken
        as the channel axis.  Only when neither axis matches does the code
        fall back to assuming the shorter axis holds the channels.

        Raises:
            ValueError: if the first batch item is not 2-D.
        """
        pred = raw[0]
        if pred.ndim != 2:
            raise ValueError(f"Unexpected prediction shape: {raw.shape}")

        rows, cols = pred.shape
        n_channels = 4 + len(self.class_names)
        if cols == n_channels:
            return pred
        if rows == n_channels:
            return pred.transpose(1, 0)
        # Unknown channel count: keep the size-based guess.
        if rows < cols:
            pred = pred.transpose(1, 0)
        return pred

    def _nms(self, dets: list[_Detection]) -> list[_Detection]:
        """Greedy non-maximum suppression over ``(x1, y1, x2, y2, conf, cls)``.

        Suppression is class-agnostic: the class id is not consulted, so a
        box is dropped when its IoU with an already kept, higher-scoring
        box exceeds ``self.iou_threshold`` regardless of class.  Kept
        detections are returned in descending score order.
        """
        if not dets:
            return []

        boxes = np.array([[d[0], d[1], d[2], d[3]] for d in dets], dtype=np.float32)
        scores = np.array([d[4] for d in dets], dtype=np.float32)
        # Areas never change, so compute them once instead of per iteration.
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

        order = scores.argsort()[::-1]
        keep: list[int] = []
        while order.size > 0:
            best = order[0]
            rest = order[1:]
            keep.append(int(best))

            xx1 = np.maximum(boxes[best, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[best, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[best, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[best, 3], boxes[rest, 3])

            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            # Floor the union so degenerate boxes cannot divide by zero.
            union = np.maximum(areas[best] + areas[rest] - inter, 1e-6)
            iou = inter / union

            order = rest[iou <= self.iou_threshold]

        return [dets[idx] for idx in keep]

    def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
        """Run the detector on one BGR frame.

        Each prediction row is read as ``cx, cy, w, h`` (in network-input
        pixels) followed by one score per class.  Rows are filtered by
        ``self.conf_threshold``, rescaled to the original image, passed
        through NMS, then clipped to the image bounds.

        Returns:
            Detections in original-image pixel coordinates; an empty list
            for a missing/empty frame or when nothing passes the filters.
        """
        # A failed decode upstream typically yields None or an empty array;
        # report "no detections" rather than aborting the whole batch.
        if image_bgr is None or image_bgr.size == 0:
            return []

        inp, (orig_h, orig_w) = self._preprocess(image_bgr)
        out = self.session.run(None, {self.input_name: inp})[0]
        pred = self._normalize_predictions(out)

        # Need four box values plus at least one class score.
        if pred.shape[1] < 5:
            return []

        boxes = pred[:, :4]
        cls_scores = pred[:, 4:]
        cls_ids = np.argmax(cls_scores, axis=1)
        confs = np.max(cls_scores, axis=1)

        # Non-finite coordinates would make floor()/ceil() raise below.
        keep = (confs >= self.conf_threshold) & np.isfinite(boxes).all(axis=1)
        if not keep.any():
            return []

        # float64 keeps the arithmetic identical to plain Python floats.
        boxes = boxes[keep].astype(np.float64)
        confs = confs[keep]
        cls_ids = cls_ids[keep]

        sx = orig_w / float(self.input_w)
        sy = orig_h / float(self.input_h)
        half_w = boxes[:, 2] / 2.0
        half_h = boxes[:, 3] / 2.0
        x1 = (boxes[:, 0] - half_w) * sx
        y1 = (boxes[:, 1] - half_h) * sy
        x2 = (boxes[:, 0] + half_w) * sx
        y2 = (boxes[:, 1] + half_h) * sy

        dets: list[_Detection] = list(
            zip(
                x1.tolist(),
                y1.tolist(),
                x2.tolist(),
                y2.tolist(),
                confs.tolist(),
                cls_ids.tolist(),
            )
        )
        dets = self._nms(dets)

        out_boxes: list[BoundingBox] = []
        for bx1, by1, bx2, by2, conf, cls_id in dets:
            # Round outwards, then clamp to the image rectangle.
            ix1 = max(0, min(orig_w, math.floor(bx1)))
            iy1 = max(0, min(orig_h, math.floor(by1)))
            ix2 = max(0, min(orig_w, math.ceil(bx2)))
            iy2 = max(0, min(orig_h, math.ceil(by2)))
            # Boxes lying entirely outside the image clip to zero area.
            if ix2 <= ix1 or iy2 <= iy1:
                continue
            out_boxes.append(
                BoundingBox(
                    x1=ix1,
                    y1=iy1,
                    x2=ix2,
                    y2=iy2,
                    cls_id=cls_id,
                    conf=max(0.0, min(1.0, conf)),
                )
            )
        return out_boxes

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Detect objects in every image of *batch_images*.

        Args:
            batch_images: BGR frames, processed one at a time.
            offset: frame id assigned to the first image; later images get
                consecutive ids.
            n_keypoints: number of placeholder ``(0, 0)`` keypoints to
                attach to each result (negative values are treated as 0).

        Returns:
            One ``TVFrameResult`` per input image, in input order.
        """
        if not batch_images:
            return []

        n_points = max(0, int(n_keypoints))
        return [
            TVFrameResult(
                frame_id=offset + idx,
                boxes=self._infer_single(image),
                keypoints=[(0, 0)] * n_points,
            )
            for idx, image in enumerate(batch_images)
        ]