| from pathlib import Path |
| import math |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from numpy import ndarray |
| from pydantic import BaseModel |
|
|
|
|
class BoundingBox(BaseModel):
    """One detection: an axis-aligned integer box with a class index and a score.

    As populated by ``Miner._infer_single``, the corners are expressed in the
    pixel grid of the original (un-resized) frame and clamped to its bounds.
    """

    x1: int  # left edge (floored, clamped to [0, frame width])
    y1: int  # top edge (floored, clamped to [0, frame height])
    x2: int  # right edge (ceiled, clamped to [0, frame width])
    y2: int  # bottom edge (ceiled, clamped to [0, frame height])
    cls_id: int  # index of the highest-scoring class column of the model output
    conf: float  # score of that class, clamped to [0.0, 1.0]
|
|
|
|
class TVFrameResult(BaseModel):
    """Per-frame prediction record returned by ``Miner.predict_batch``."""

    frame_id: int  # batch offset plus the frame's position inside the batch
    boxes: list[BoundingBox]  # detections kept for this frame (may be empty)
    keypoints: list[tuple[int, int]]  # (x, y) pairs; predict_batch fills these with (0, 0) placeholders
|
|
|
|
class Miner:
    """
    Auto-generated by subnet_bridge from a Manako element repo.
    This miner is intentionally self-contained for chute import restrictions.

    Wraps an ONNX detection model: frames are resized to the model input size,
    the raw output is decoded into class-scored boxes, filtered by confidence,
    de-duplicated with non-maximum suppression and mapped back to the pixel
    grid of the original frame.
    """

    # Side length used when the model declares a symbolic / unknown spatial
    # dimension (dynamic-axis export). NOTE(review): 640 is an assumed default;
    # confirm it matches the resolution the weights were trained at.
    DEFAULT_INPUT_SIZE = 640

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and cache input geometry.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.
        """
        self.path_hf_repo = path_hf_repo
        self.class_names = ['bus', 'car', 'motorcycle', 'truck', 'van']
        self.session = ort.InferenceSession(
            str(Path(path_hf_repo) / "weights.onnx"),
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )
        model_input = self.session.get_inputs()[0]
        self.input_name = model_input.name
        input_shape = model_input.shape

        # Axes 2 and 3 are read as height and width. They may be symbolic
        # (a string or None) for dynamic exports, in which case int() would
        # raise; fall back to the default side length instead.
        self.input_h = self._static_dim(input_shape[2], self.DEFAULT_INPUT_SIZE)
        self.input_w = self._static_dim(input_shape[3], self.DEFAULT_INPUT_SIZE)
        self.conf_threshold = 0.25
        self.iou_threshold = 0.45

    def __repr__(self) -> str:
        """Return a short description: session type and number of classes."""
        return f"ONNX Miner session={type(self.session).__name__} classes={len(self.class_names)}"

    @staticmethod
    def _static_dim(dim: object, default: int) -> int:
        """Return *dim* as a positive int, or *default* if it is not one."""
        try:
            value = int(dim)
        except (TypeError, ValueError):
            return default
        return value if value > 0 else default

    def _preprocess(self, image_bgr: ndarray) -> tuple[np.ndarray, tuple[int, int]]:
        """Convert a BGR frame into the model's input tensor.

        Args:
            image_bgr: Frame with height and width as its first two axes.

        Returns:
            ``(tensor, (height, width))`` where *tensor* is float32, scaled
            to [0, 1], channel-first with a leading batch axis of 1, and
            ``(height, width)`` is the size of the frame before resizing.
        """
        h, w = image_bgr.shape[:2]
        rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        # Plain stretch to the model size (no letterboxing); the inverse
        # per-axis scale is applied when decoding boxes.
        resized = cv2.resize(rgb, (self.input_w, self.input_h))
        x = resized.astype(np.float32) / 255.0
        x = np.ascontiguousarray(np.transpose(x, (2, 0, 1))[None, ...])
        return x, (h, w)

    def _normalize_predictions(self, raw: np.ndarray) -> np.ndarray:
        """Return the first batch element of *raw* as a 2-D array.

        The array is transposed when it has fewer rows than columns, so the
        longer axis ends up first (presumably one row per candidate box;
        confirm against the exported model's output layout).

        Raises:
            ValueError: If the first batch element is not two-dimensional.
        """
        pred = raw[0]
        if pred.ndim != 2:
            raise ValueError(f"Unexpected prediction shape: {raw.shape}")
        if pred.shape[0] < pred.shape[1]:
            pred = pred.transpose(1, 0)
        return pred

    def _nms(self, dets: list[tuple[float, float, float, float, float, int]]) -> list[tuple[float, float, float, float, float, int]]:
        """Greedy non-maximum suppression over ``(x1, y1, x2, y2, conf, cls)``.

        Detections are visited in descending confidence order; every remaining
        detection whose IoU with the current one exceeds ``self.iou_threshold``
        is discarded. The class id is not consulted, so overlapping boxes of
        different classes suppress each other.

        Returns:
            The surviving detections, highest confidence first.
        """
        if not dets:
            return []

        boxes = np.array([d[:4] for d in dets], dtype=np.float32)
        scores = np.array([d[4] for d in dets], dtype=np.float32)
        # Areas never change, so compute them once instead of per iteration.
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        order = scores.argsort()[::-1]
        keep: list[int] = []

        while order.size > 0:
            best, rest = order[0], order[1:]
            keep.append(int(best))

            xx1 = np.maximum(boxes[best, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[best, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[best, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[best, 3], boxes[rest, 3])

            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            # The floor on the union avoids division by zero for empty boxes.
            union = np.maximum(areas[best] + areas[rest] - inter, 1e-6)
            order = rest[inter / union <= self.iou_threshold]

        return [dets[idx] for idx in keep]

    def _decode_detections(
        self,
        pred: np.ndarray,
        orig_h: int,
        orig_w: int,
    ) -> list[tuple[float, float, float, float, float, int]]:
        """Turn normalized predictions into confident corner-format boxes.

        Each row of *pred* is read as ``cx, cy, w, h`` followed by one score
        per class, in model-input pixels. Rows below ``self.conf_threshold``
        or with non-finite coordinates are dropped; the rest are converted to
        ``(x1, y1, x2, y2, conf, cls_id)`` in original-frame coordinates.
        """
        if pred.shape[1] < 5:
            # Need four box columns plus at least one class score.
            return []

        boxes = pred[:, :4]
        cls_scores = pred[:, 4:]
        cls_ids = np.argmax(cls_scores, axis=1)
        confs = np.max(cls_scores, axis=1)

        # NaN scores fail the comparison; non-finite coordinates would crash
        # the later floor/ceil, so filter them here as well.
        keep = (confs >= self.conf_threshold) & np.isfinite(boxes).all(axis=1)
        if not keep.any():
            return []

        sx = orig_w / float(self.input_w)
        sy = orig_h / float(self.input_h)

        # float64 matches the precision of the scalar Python arithmetic that
        # a per-row loop would perform.
        coords = boxes[keep].astype(np.float64)
        half_w = coords[:, 2] / 2.0
        half_h = coords[:, 3] / 2.0
        x1 = (coords[:, 0] - half_w) * sx
        y1 = (coords[:, 1] - half_h) * sy
        x2 = (coords[:, 0] + half_w) * sx
        y2 = (coords[:, 1] + half_h) * sy

        return list(
            zip(
                x1.tolist(),
                y1.tolist(),
                x2.tolist(),
                y2.tolist(),
                confs[keep].tolist(),
                cls_ids[keep].tolist(),
            )
        )

    @staticmethod
    def _to_pixel_boxes(
        dets: list[tuple[float, float, float, float, float, int]],
        orig_w: int,
        orig_h: int,
    ) -> list[tuple[int, int, int, int, float, int]]:
        """Snap boxes to the integer pixel grid and clamp them to the frame.

        The top-left corner is floored and the bottom-right corner is ceiled
        so the integer box always covers the float box. Boxes that end up
        with zero width or height after clamping (e.g. entirely outside the
        frame) are dropped. Confidence is clamped to [0.0, 1.0].
        """
        pixel_boxes: list[tuple[int, int, int, int, float, int]] = []
        for x1, y1, x2, y2, conf, cls_id in dets:
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                continue
            pixel_boxes.append((ix1, iy1, ix2, iy2, max(0.0, min(1.0, conf)), cls_id))
        return pixel_boxes

    def _infer_single(self, image_bgr: ndarray) -> "list[BoundingBox]":
        """Run the detector on one BGR frame.

        Returns:
            Detections in original-frame pixel coordinates, highest
            confidence first; empty when the frame is missing/empty or
            nothing passes the confidence threshold.
        """
        if image_bgr is None or image_bgr.size == 0:
            # An unreadable frame yields no detections instead of aborting
            # the whole batch inside OpenCV.
            return []

        inp, (orig_h, orig_w) = self._preprocess(image_bgr)
        out = self.session.run(None, {self.input_name: inp})[0]
        pred = self._normalize_predictions(out)

        dets = self._nms(self._decode_detections(pred, orig_h, orig_w))

        return [
            BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2, cls_id=cls_id, conf=conf)
            for x1, y1, x2, y2, conf, cls_id in self._to_pixel_boxes(dets, orig_w, orig_h)
        ]

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> "list[TVFrameResult]":
        """Detect objects in every frame of a batch.

        Args:
            batch_images: BGR frames, processed one at a time.
            offset: Frame id assigned to the first image of the batch.
            n_keypoints: Number of placeholder ``(0, 0)`` keypoints to attach
                to each result; negative values are treated as zero.

        Returns:
            One ``TVFrameResult`` per input frame, in input order.
        """
        n_placeholders = max(0, int(n_keypoints))
        results: list[TVFrameResult] = []
        for idx, image in enumerate(batch_images):
            results.append(
                TVFrameResult(
                    frame_id=offset + idx,
                    boxes=self._infer_single(image),
                    # Fresh list per frame so results never share state.
                    keypoints=[(0, 0)] * n_placeholders,
                )
            )
        return results
|
|