licensy commited on
Commit
978bdb2
·
verified ·
1 Parent(s): cff6063

Upload miner.py

Browse files
Files changed (1) hide show
  1. miner.py +139 -0
miner.py ADDED
@@ -0,0 +1,139 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+ import math
3
+
4
+ import cv2
5
+ import numpy as np
6
+ import onnxruntime as ort
7
+ from numpy import ndarray
8
+ from pydantic import BaseModel
9
+
10
+
11
+ class BoundingBox(BaseModel):
12
+ x1: int
13
+ y1: int
14
+ x2: int
15
+ y2: int
16
+ cls_id: int
17
+ conf: float
18
+
19
+
20
+ class TVFrameResult(BaseModel):
21
+ frame_id: int
22
+ boxes: list[BoundingBox]
23
+ keypoints: list[tuple[int, int]]
24
+
25
+
26
+ class Miner:
27
+ """
28
+ Auto-generated by subnet_bridge from a Manako element repo.
29
+ This miner is intentionally self-contained for chute import restrictions.
30
+ """
31
+
32
+ def __init__(self, path_hf_repo: Path) -> None:
33
+ self.path_hf_repo = path_hf_repo
34
+ self.class_names = ['numberplate']
35
+ self.session = ort.InferenceSession(
36
+ str(path_hf_repo / "weights.onnx"),
37
+ providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
38
+ )
39
+ self.input_name = self.session.get_inputs()[0].name
40
+ input_shape = self.session.get_inputs()[0].shape
41
+ self.input_h = int(input_shape[2])
42
+ self.input_w = int(input_shape[3])
43
+ self.conf_threshold = 0.15
44
+ self.iou_threshold = 0.3
45
+
46
+ def __repr__(self) -> str:
47
+ return f"ONNX Miner session={type(self.session).__name__} classes={len(self.class_names)}"
48
+
49
+ def _preprocess(self, image_bgr: ndarray) -> tuple[np.ndarray, tuple[int, int]]:
50
+ h, w = image_bgr.shape[:2]
51
+ rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
52
+ resized = cv2.resize(rgb, (self.input_w, self.input_h))
53
+ x = resized.astype(np.float32) / 255.0
54
+ x = np.transpose(x, (2, 0, 1))[None, ...]
55
+ return x, (h, w)
56
+
57
+ def _normalize_predictions(self, raw: np.ndarray) -> np.ndarray:
58
+ pred = raw[0]
59
+ if pred.ndim != 2:
60
+ raise ValueError(f"Unexpected prediction shape: {raw.shape}")
61
+ if pred.shape[0] < pred.shape[1]:
62
+ pred = pred.transpose(1, 0)
63
+ return pred
64
+
65
+ def _nms(self, dets: list[tuple[float, float, float, float, float, int]]) -> list[tuple[float, float, float, float, float, int]]:
66
+ if not dets:
67
+ return []
68
+ boxes = np.array([[d[0], d[1], d[2], d[3]] for d in dets], dtype=np.float32)
69
+ scores = np.array([d[4] for d in dets], dtype=np.float32)
70
+ order = scores.argsort()[::-1]
71
+ keep = []
72
+ while order.size > 0:
73
+ i = order[0]
74
+ keep.append(i)
75
+ xx1 = np.maximum(boxes[i, 0], boxes[order[1:], 0])
76
+ yy1 = np.maximum(boxes[i, 1], boxes[order[1:], 1])
77
+ xx2 = np.minimum(boxes[i, 2], boxes[order[1:], 2])
78
+ yy2 = np.minimum(boxes[i, 3], boxes[order[1:], 3])
79
+ w = np.maximum(0.0, xx2 - xx1)
80
+ h = np.maximum(0.0, yy2 - yy1)
81
+ inter = w * h
82
+ area_i = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
83
+ area_rest = (boxes[order[1:], 2] - boxes[order[1:], 0]) * (boxes[order[1:], 3] - boxes[order[1:], 1])
84
+ union = np.maximum(area_i + area_rest - inter, 1e-6)
85
+ iou = inter / union
86
+ remaining = np.where(iou <= self.iou_threshold)[0]
87
+ order = order[remaining + 1]
88
+ return [dets[idx] for idx in keep]
89
+
90
+ def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
91
+ inp, (orig_h, orig_w) = self._preprocess(image_bgr)
92
+ out = self.session.run(None, {self.input_name: inp})[0]
93
+ pred = self._normalize_predictions(out)
94
+ if pred.shape[1] < 5:
95
+ return []
96
+ boxes = pred[:, :4]
97
+ cls_scores = pred[:, 4:]
98
+ if cls_scores.shape[1] == 0:
99
+ return []
100
+ cls_ids = np.argmax(cls_scores, axis=1)
101
+ confs = np.max(cls_scores, axis=1)
102
+ keep = confs >= self.conf_threshold
103
+ boxes = boxes[keep]
104
+ confs = confs[keep]
105
+ cls_ids = cls_ids[keep]
106
+ if boxes.shape[0] == 0:
107
+ return []
108
+ sx = orig_w / float(self.input_w)
109
+ sy = orig_h / float(self.input_h)
110
+ dets: list[tuple[float, float, float, float, float, int]] = []
111
+ for i in range(boxes.shape[0]):
112
+ cx, cy, bw, bh = boxes[i].tolist()
113
+ x1 = (cx - bw / 2.0) * sx
114
+ y1 = (cy - bh / 2.0) * sy
115
+ x2 = (cx + bw / 2.0) * sx
116
+ y2 = (cy + bh / 2.0) * sy
117
+ dets.append((x1, y1, x2, y2, float(confs[i]), int(cls_ids[i])))
118
+ dets = self._nms(dets)
119
+ out_boxes: list[BoundingBox] = []
120
+ for x1, y1, x2, y2, conf, cls_id in dets:
121
+ ix1 = max(0, min(orig_w, math.floor(x1)))
122
+ iy1 = max(0, min(orig_h, math.floor(y1)))
123
+ ix2 = max(0, min(orig_w, math.ceil(x2)))
124
+ iy2 = max(0, min(orig_h, math.ceil(y2)))
125
+ out_boxes.append(
126
+ BoundingBox(x1=ix1, y1=iy1, x2=ix2, y2=iy2,
127
+ cls_id=cls_id, conf=max(0.0, min(1.0, conf))))
128
+ return out_boxes
129
+
130
+ def predict_batch(
131
+ self, batch_images: list[ndarray], offset: int, n_keypoints: int,
132
+ ) -> list[TVFrameResult]:
133
+ results: list[TVFrameResult] = []
134
+ for idx, image in enumerate(batch_images):
135
+ boxes = self._infer_single(image)
136
+ keypoints = [(0, 0) for _ in range(max(0, int(n_keypoints)))]
137
+ results.append(TVFrameResult(
138
+ frame_id=offset + idx, boxes=boxes, keypoints=keypoints))
139
+ return results