| from pathlib import Path |
| import math |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from numpy import ndarray |
| from pydantic import BaseModel |
|
|
|
|
class BoundingBox(BaseModel):
    """One detected box: integer corner coordinates, class id and confidence."""

    # Top-left corner (x1, y1) and bottom-right corner (x2, y2).
    x1: int
    y1: int
    x2: int
    y2: int
    # Class index of the detection.
    cls_id: int
    # Confidence score of the detection.
    conf: float
|
|
|
|
class TVFrameResult(BaseModel):
    """Inference result for a single frame."""

    # Identifier of the frame this result belongs to.
    frame_id: int
    # Detections found in the frame.
    boxes: list[BoundingBox]
    # Keypoints as (x, y) integer pairs.
    keypoints: list[tuple[int, int]]
|
|
|
|
# Fallback model input height/width, used when the ONNX input shape does not
# report a positive integer for that dimension.
SIZE = 1280
|
|
|
|
class Miner:
    """Object detector backed by an ONNX Runtime session.

    The pipeline for one frame is: letterbox + forward pass (optionally
    repeated on a horizontally flipped copy), Gaussian soft-NMS followed by
    hard NMS, and -- when fewer than ``sparse_threshold`` boxes survive -- a
    second pass over 2x2 overlapping tiles whose novel detections are merged
    into the primary result.
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and set inference parameters.

        An optional ``class_names.txt`` next to the weights provides class
        names (one per line, blank lines and ``#`` comments ignored).
        Session creation first tries CUDA and falls back to CPU on failure.
        """
        model_path = path_hf_repo / "weights.onnx"
        cn_path = model_path.with_name("class_names.txt")
        if cn_path.is_file():
            lines = cn_path.read_text(encoding="utf-8").splitlines()
            self.class_names = [
                ln.strip()
                for ln in lines
                if ln.strip() and not ln.strip().startswith("#")
            ]
        else:
            self.class_names = ["numberplate"]
        print("ORT version:", ort.__version__)

        # Best effort: not every onnxruntime build exposes preload_dlls().
        try:
            ort.preload_dlls()
            print("onnxruntime.preload_dlls() success")
        except Exception as e:
            print(f"preload_dlls failed: {e}")

        print("ORT available providers BEFORE session:", ort.get_available_providers())

        # Diagnostics only; torch is optional, so any failure is just reported.
        try:
            import torch
            if torch.cuda.is_available():
                print(f"GPU: {torch.cuda.get_device_name(0)}")
                # The device-properties attribute is ``total_memory``.
                print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
            else:
                print("GPU: CUDA not available via torch")
        except Exception as e:
            print(f"GPU detection failed: {e}")

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        print("ORT session providers:", self.session.get_providers())

        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape

        # Dimensions 2 and 3 are read as height and width; anything that is
        # not a positive int falls back to SIZE.
        self.input_height = self._safe_dim(self.input_shape[2], default=SIZE)
        self.input_width = self._safe_dim(self.input_shape[3], default=SIZE)

        # Full-frame pass: score cut-off, hard-NMS IoU, soft-NMS sigma, box cap.
        self.conf_thres = 0.35
        self.iou_thres = 0.48
        self.sigma = 0.5
        self.max_det = 300

        # Tile pass: runs when the full-frame pass yields fewer than
        # ``sparse_threshold`` boxes.
        self.sparse_threshold = 3
        self.tile_conf = 0.39          # score cut-off for tile detections
        self.tile_overlap = 0.12       # tile overlap as a fraction of half the frame
        self.novelty_iou = 0.18        # tile boxes at/above this IoU with a primary box are dropped
        self.final_max_det = 16        # box cap after merging tile detections
        self.tile_use_hflip = False    # also run tiles on the flipped frame

        # Whether the full-frame pass adds a horizontally flipped forward pass.
        self.use_tta = True

        print(f"ONNX model loaded from: {model_path}")
        print(f"ONNX providers: {self.session.get_providers()}")
        print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        """Return a short description naming the session type and providers."""
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return *value* if it is a positive int, otherwise *default*."""
        return value if isinstance(value, int) and value > 0 else default

    @staticmethod
    def _letterbox_geometry(
        w: int, h: int, new_w: int, new_h: int
    ) -> tuple[float, tuple[int, int], tuple[int, int, int, int]]:
        """Compute the letterbox scale, resized size and integer padding.

        Returns ``(ratio, (resized_w, resized_h), (left, top, right, bottom))``.
        """
        ratio = min(new_w / w, new_h / h)
        # Clamp to 1 px so extremely thin images never produce an empty resize.
        resized_w = max(1, int(round(w * ratio)))
        resized_h = max(1, int(round(h * ratio)))
        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0
        # The +/-0.1 offsets send an odd padding's extra pixel to right/bottom.
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))
        return ratio, (resized_w, resized_h), (left, top, right, bottom)

    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize *image* preserving aspect ratio and pad it to *new_shape*.

        *new_shape* is ``(width, height)``. Returns the padded image, the scale
        ratio, and the ``(left, top)`` padding that was actually applied, so
        callers can map coordinates back exactly.
        """
        h, w = image.shape[:2]
        new_w, new_h = new_shape
        ratio, (resized_w, resized_h), (left, top, right, bottom) = (
            self._letterbox_geometry(w, h, new_w, new_h)
        )
        if (resized_w, resized_h) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
        padded = cv2.copyMakeBorder(
            image, top, bottom, left, right,
            borderType=cv2.BORDER_CONSTANT, value=color,
        )
        # Report the integer offsets used above, not the fractional half-pad:
        # with an odd total padding the two differ by 0.5 px.
        return padded, ratio, (float(left), float(top))

    def _preprocess(self, image: ndarray):
        """Letterbox, convert BGR->RGB, scale to [0, 1] and lay out as 1xCxHxW float32."""
        img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        return np.ascontiguousarray(img, dtype=np.float32), ratio, pad

    @staticmethod
    def _to_uint8(image: np.ndarray) -> np.ndarray:
        """Convert *image* to uint8, clamping numeric values to [0, 255].

        A bare ``astype(np.uint8)`` wraps out-of-range values; here they are
        saturated instead and NaN becomes 0. uint8 input is returned as is.
        """
        if image.dtype == np.uint8:
            return image
        if image.dtype.kind in "fiu":
            # Go through float64 so the bounds are representable for every dtype.
            as_float = np.nan_to_num(
                image.astype(np.float64), nan=0.0, posinf=255.0, neginf=0.0
            )
            return np.clip(as_float, 0.0, 255.0).astype(np.uint8)
        return image.astype(np.uint8)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        """Clamp box corners in place to ``[0, w-1] x [0, h-1]``; *image_size* is ``(w, h)``."""
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
        """Greedy NMS; return indices of kept boxes in descending score order.

        A box is dropped when its IoU with an already kept box exceeds
        *iou_thresh*.
        """
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp)
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order):
            i = int(order[0])
            keep.append(i)
            if len(order) == 1:
                break
            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            area_r = (boxes[rest, 2] - boxes[rest, 0]) * (boxes[rest, 3] - boxes[rest, 1])
            iou = inter / (area_i + area_r - inter + 1e-7)
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)

    def _soft_nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        sigma: float,
        score_thresh: float = 0.01,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Gaussian soft-NMS.

        Repeatedly selects the highest remaining score and multiplies every
        later score by ``exp(-iou**2 / sigma)``. Returns the original indices
        and decayed scores of the boxes whose score stays above
        *score_thresh*. The inputs are not modified.
        """
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp), np.array([], dtype=np.float32)
        boxes = boxes.astype(np.float32, copy=True)
        scores = scores.astype(np.float32, copy=True)
        order = np.arange(N)
        for i in range(N):
            # Selection-sort step: move the best remaining box to position i.
            max_pos = i + int(np.argmax(scores[i:]))
            boxes[[i, max_pos]] = boxes[[max_pos, i]]
            scores[[i, max_pos]] = scores[[max_pos, i]]
            order[[i, max_pos]] = order[[max_pos, i]]
            if i + 1 >= N:
                break
            xx1 = np.maximum(boxes[i, 0], boxes[i + 1:, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[i + 1:, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[i + 1:, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[i + 1:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = float(
                (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            )
            areas_j = (
                np.maximum(0.0, boxes[i + 1:, 2] - boxes[i + 1:, 0])
                * np.maximum(0.0, boxes[i + 1:, 3] - boxes[i + 1:, 1])
            )
            iou = inter / (area_i + areas_j - inter + 1e-7)
            scores[i + 1:] *= np.exp(-(iou ** 2) / sigma)
        mask = scores > score_thresh
        return order[mask], scores[mask]

    @staticmethod
    def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
        """Return the IoU of *box* against every row of *boxes*."""
        if len(boxes) == 0:
            return np.zeros(0, dtype=np.float32)
        xx1 = np.maximum(box[0], boxes[:, 0])
        yy1 = np.maximum(box[1], boxes[:, 1])
        xx2 = np.minimum(box[2], boxes[:, 2])
        yy2 = np.minimum(box[3], boxes[:, 3])
        inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
        area_a = max(0.0, (box[2] - box[0]) * (box[3] - box[1]))
        area_b = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        return inter / (area_a + area_b - inter + 1e-7)

    def _raw_dets(self, image: ndarray, conf: float) -> np.ndarray:
        """Run a single forward pass and return [N, 5] dets in ORIGINAL image coords.

        The first model output is read as rows whose first four columns are
        box corners in letterboxed input coordinates and whose fifth column is
        the score; rows scoring below *conf* are dropped.
        """
        x, ratio, (dw, dh) = self._preprocess(image)
        out = self.session.run(self.output_names, {self.input_name: x})[0]
        if out.ndim == 3:
            out = out[0]
        # Anything that is not a 2-D table with at least 5 columns is unusable.
        if out.ndim != 2 or out.shape[1] < 5:
            return np.zeros((0, 5), dtype=np.float32)
        boxes = out[:, :4].astype(np.float32)
        scores = out[:, 4].astype(np.float32)
        keep = scores >= conf
        boxes, scores = boxes[keep], scores[keep]
        if len(boxes) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        # Undo the letterbox: remove the padding, then the scale.
        boxes[:, [0, 2]] -= dw
        boxes[:, [1, 3]] -= dh
        boxes /= ratio
        oh, ow = image.shape[:2]
        boxes = self._clip_boxes(boxes, (ow, oh))
        return np.concatenate([boxes, scores[:, None]], axis=1)

    def _primary(self, image: ndarray) -> np.ndarray:
        """Full-frame detection; returns [N, 5] rows of ``x1, y1, x2, y2, score``.

        When ``use_tta`` is set, detections from a horizontally flipped copy
        are mapped back and pooled with the direct ones. The pool goes through
        soft-NMS, hard NMS and the ``max_det`` cap.
        """
        d1 = self._raw_dets(image, self.conf_thres)
        all_d = d1
        if self.use_tta:
            flipped = cv2.flip(image, 1)
            d2 = self._raw_dets(flipped, self.conf_thres)
            if len(d2):
                oh, ow = image.shape[:2]
                x1 = ow - d2[:, 2]
                x2 = ow - d2[:, 0]
                d2 = np.stack([x1, d2[:, 1], x2, d2[:, 3], d2[:, 4]], axis=1)
                # Mirroring can yield x == ow; keep the same [0, ow-1] range
                # as the direct detections.
                d2[:, :4] = self._clip_boxes(d2[:, :4], (ow, oh))
                all_d = np.concatenate([d1, d2], axis=0)
        if len(all_d) == 0:
            return np.zeros((0, 5), dtype=np.float32)

        keep_idx, scores = self._soft_nms(all_d[:, :4].copy(), all_d[:, 4].copy(), sigma=self.sigma)
        if len(keep_idx) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        merged = np.concatenate([all_d[keep_idx, :4], scores[:, None]], axis=1)
        keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
        merged = merged[keep]
        if len(merged) > self.max_det:
            merged = merged[np.argsort(-merged[:, 4])[: self.max_det]]
        return merged

    def _tile_augment(self, image: ndarray, primary: np.ndarray) -> np.ndarray:
        """Run 2x2 overlapping tiles (+ optional hflip), novelty-merge into primary.

        Tile detections overlapping a primary box with IoU >= ``novelty_iou``
        or failing the size/aspect sanity filter are discarded. *primary* is
        returned unchanged when no tile detection survives.
        """
        oh, ow = image.shape[:2]
        tw, th = ow // 2, oh // 2
        ox, oy = int(tw * self.tile_overlap), int(th * self.tile_overlap)
        tiles = [
            (0, 0, min(ow, tw + ox), min(oh, th + oy)),
            (max(0, tw - ox), 0, ow, min(oh, th + oy)),
            (0, max(0, th - oy), min(ow, tw + ox), oh),
            (max(0, tw - ox), max(0, th - oy), ow, oh),
        ]
        collected: list[np.ndarray] = []
        for x1, y1, x2, y2 in tiles:
            crop = image[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            d = self._raw_dets(crop, self.tile_conf)
            if len(d):
                # Shift tile-local coordinates back into the full frame.
                d[:, 0] += x1
                d[:, 1] += y1
                d[:, 2] += x1
                d[:, 3] += y1
                collected.append(d)

        if self.tile_use_hflip:
            flipped = cv2.flip(image, 1)
            for x1, y1, x2, y2 in tiles:
                # The same tile expressed in the mirrored frame.
                fx1 = ow - x2
                fx2 = ow - x1
                if fx2 <= fx1:
                    continue
                crop = flipped[y1:y2, fx1:fx2]
                if crop.size == 0:
                    continue
                d = self._raw_dets(crop, self.tile_conf)
                if len(d):
                    d_un = d.copy()
                    d_un[:, 0] = (ow - (d[:, 2] + fx1))
                    d_un[:, 2] = (ow - (d[:, 0] + fx1))
                    d_un[:, 1] = d[:, 1] + y1
                    d_un[:, 3] = d[:, 3] + y1
                    # Mirroring can yield x == ow; restore the [0, ow-1] range.
                    d_un[:, :4] = self._clip_boxes(d_un[:, :4], (ow, oh))
                    collected.append(d_un)

        if not collected:
            return primary

        tile_dets = np.concatenate(collected, axis=0)
        keep = self._hard_nms(tile_dets[:, :4], tile_dets[:, 4], 0.5)
        tile_dets = tile_dets[keep]

        # Keep only tile boxes that do not overlap an existing primary box.
        if len(primary) > 0 and len(tile_dets) > 0:
            mask = np.ones(len(tile_dets), dtype=bool)
            for i, det in enumerate(tile_dets):
                ious = self._box_iou_one_to_many(det[:4], primary[:, :4])
                if len(ious) and np.max(ious) >= self.novelty_iou:
                    mask[i] = False
            tile_dets = tile_dets[mask]

        if len(tile_dets) == 0:
            return primary

        # Sanity filter: minimum side and area, at most half the frame,
        # aspect ratio no more extreme than 10:1.
        w = tile_dets[:, 2] - tile_dets[:, 0]
        h = tile_dets[:, 3] - tile_dets[:, 1]
        area = w * h
        ar = np.maximum(w / np.maximum(h, 1e-6), h / np.maximum(w, 1e-6))
        img_area = float(ow * oh)
        ok = (w >= 7) & (h >= 7) & (area >= 85) & (area <= 0.5 * img_area) & (ar <= 10.0)
        tile_dets = tile_dets[ok]
        if len(tile_dets) == 0:
            return primary

        merged = np.concatenate([primary, tile_dets], axis=0)
        keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
        merged = merged[keep]
        if len(merged) > self.final_max_det:
            merged = merged[np.argsort(-merged[:, 4])[: self.final_max_det]]
        return merged

    def _predict_single(self, image: ndarray) -> "list[BoundingBox]":
        """Detect boxes in one HxWx3 image; return ``[]`` for unusable input.

        Non-uint8 images are converted with saturation. Box corners are
        rounded outward (floor for x1/y1, ceil for x2/y2) and every box gets
        ``cls_id=0``.
        """
        if image is None or not isinstance(image, np.ndarray) or image.ndim != 3:
            return []
        if image.shape[0] <= 0 or image.shape[1] <= 0 or image.shape[2] != 3:
            return []
        image = self._to_uint8(image)

        primary = self._primary(image)
        if len(primary) < self.sparse_threshold:
            dets = self._tile_augment(image, primary)
        else:
            dets = primary

        results: list[BoundingBox] = []
        for row in dets:
            x1, y1, x2, y2, conf = row.tolist()
            # Skip degenerate boxes (zero or negative extent).
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=0,
                    conf=float(conf),
                )
            )
        return results

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> "list[TVFrameResult]":
        """Run detection on every image and return one result per frame.

        Frame ids are ``offset + index``. A frame whose inference raises is
        reported on stdout and gets an empty box list, so one bad frame never
        aborts the batch. Keypoints are ``n_keypoints`` placeholder ``(0, 0)``
        pairs.
        """
        results: list[TVFrameResult] = []
        for frame_number_in_batch, image in enumerate(batch_images):
            try:
                boxes = self._predict_single(image)
            except Exception as e:
                print(f"Inference failed for frame {offset + frame_number_in_batch}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=offset + frame_number_in_batch,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results
|
|