import math
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel


class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


class TVFrameResult(BaseModel):
    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]


SIZE = 1280


class Miner:
    def __init__(self, path_hf_repo: Path) -> None:
        model_path = path_hf_repo / "weights.onnx"

        cn_path = model_path.with_name("class_names.txt")
        if cn_path.is_file():
            lines = cn_path.read_text(encoding="utf-8").splitlines()
            self.class_names = [
                ln.strip()
                for ln in lines
                if ln.strip() and not ln.strip().startswith("#")
            ]
        else:
            self.class_names = ["numberplate"]

        print("ORT version:", ort.__version__)
        try:
            ort.preload_dlls()
            print("onnxruntime.preload_dlls() success")
        except Exception as e:
            print(f"preload_dlls failed: {e}")
        print("ORT available providers BEFORE session:", ort.get_available_providers())

        try:
            import torch

            if torch.cuda.is_available():
                print(f"GPU: {torch.cuda.get_device_name(0)}")
                print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
            else:
                print("GPU: CUDA not available via torch")
        except Exception as e:
            print(f"GPU detection failed: {e}")

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        print("ORT session providers:", self.session.get_providers())
        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape
        self.input_height = self._safe_dim(self.input_shape[2], default=SIZE)
        self.input_width = self._safe_dim(self.input_shape[3], default=SIZE)

        # Primary pass: alfred001 tuning (optimized for hermestech weights)
        self.conf_thres = 0.26
        self.iou_thres = 0.39
        self.sigma = 0.465
        self.max_det = 300

        # Conditional tile-pass (trimmed for latency: no hflip, tighter sparse)
        self.sparse_threshold = 3  # fire tiles only if primary returns < this
        self.tile_conf = 0.57
        self.tile_overlap = 0.20
        self.novelty_iou = 0.10
        self.final_max_det = 17
        self.tile_use_hflip = False  # skip hflip tile pass to save ~4 forwards
        self.use_tta = True

        print(f"ONNX model loaded from: {model_path}")
        print(f"ONNX providers: {self.session.get_providers()}")
        print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        # ONNX dims may be symbolic strings (dynamic axes); fall back to SIZE then.
        return value if isinstance(value, int) and value > 0 else default

    # ---------- image preprocessing ----------
    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize keeping aspect ratio, then pad to new_shape (w, h).

        Returns the padded image, the resize ratio, and the (dw, dh) padding
        offsets needed to map detections back to original coordinates.
        """
        h, w = image.shape[:2]
        new_w, new_h = new_shape
        ratio = min(new_w / w, new_h / h)
        resized_w = int(round(w * ratio))
        resized_h = int(round(h * ratio))
        if (resized_w, resized_h) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)

        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))
        padded = cv2.copyMakeBorder(
            image,
            top,
            bottom,
            left,
            right,
            borderType=cv2.BORDER_CONSTANT,
            value=color,
        )
        return padded, ratio, (dw, dh)

    def _preprocess(self, image: ndarray):
        """Letterbox, BGR->RGB, scale to [0, 1], HWC->NCHW float32."""
        img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        return np.ascontiguousarray(img, dtype=np.float32), ratio, pad

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    # ---------- NMS primitives ----------
    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
        """Greedy hard NMS; returns kept indices, highest score first."""
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp)
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order):
            i = int(order[0])
            keep.append(i)
            if len(order) == 1:
                break
            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            area_r = (boxes[rest, 2] - boxes[rest, 0]) * (boxes[rest, 3] - boxes[rest, 1])
            iou = inter / (area_i + area_r - inter + 1e-7)
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)

    def _soft_nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        sigma: float,
        score_thresh: float = 0.01,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Gaussian soft-NMS: decay overlapping scores by exp(-iou^2 / sigma).

        Returns (original indices, decayed scores) for boxes whose decayed
        score stays above score_thresh.
        """
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp), np.array([], dtype=np.float32)
        boxes = boxes.astype(np.float32, copy=True)
        scores = scores.astype(np.float32, copy=True)
        order = np.arange(N)
        for i in range(N):
            # Move the highest remaining score into position i.
            max_pos = i + int(np.argmax(scores[i:]))
            boxes[[i, max_pos]] = boxes[[max_pos, i]]
            scores[[i, max_pos]] = scores[[max_pos, i]]
            order[[i, max_pos]] = order[[max_pos, i]]
            if i + 1 >= N:
                break
            xx1 = np.maximum(boxes[i, 0], boxes[i + 1:, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[i + 1:, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[i + 1:, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[i + 1:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = float(
                (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            )
            areas_j = (
                np.maximum(0.0, boxes[i + 1:, 2] - boxes[i + 1:, 0])
                * np.maximum(0.0, boxes[i + 1:, 3] - boxes[i + 1:, 1])
            )
            iou = inter / (area_i + areas_j - inter + 1e-7)
            scores[i + 1:] *= np.exp(-(iou ** 2) / sigma)
        mask = scores > score_thresh
        return order[mask], scores[mask]

    @staticmethod
    def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
        if len(boxes) == 0:
            return np.zeros(0, dtype=np.float32)
        xx1 = np.maximum(box[0], boxes[:, 0])
        yy1 = np.maximum(box[1], boxes[:, 1])
        xx2 = np.minimum(box[2], boxes[:, 2])
        yy2 = np.minimum(box[3], boxes[:, 3])
        inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
        area_a = max(0.0, (box[2] - box[0]) * (box[3] - box[1]))
        area_b = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        return inter / (area_a + area_b - inter + 1e-7)

    # ---------- raw-dets helper ----------
    def _raw_dets(self, image: ndarray, conf: float) -> np.ndarray:
        """Run a single forward pass and return [N, 5] dets in ORIGINAL image coords."""
        x, ratio, (dw, dh) = self._preprocess(image)
        out = self.session.run(self.output_names, {self.input_name: x})[0]
        if out.ndim == 3:
            out = out[0]
        if out.shape[1] < 5:
            return np.zeros((0, 5), dtype=np.float32)
        boxes = out[:, :4].astype(np.float32)
        scores = out[:, 4].astype(np.float32)
        keep = scores >= conf
        boxes, scores = boxes[keep], scores[keep]
        if len(boxes) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        # Undo letterbox: remove padding offsets, then rescale to original resolution.
        boxes[:, [0, 2]] -= dw
        boxes[:, [1, 3]] -= dh
        boxes /= ratio
        oh, ow = image.shape[:2]
        boxes = self._clip_boxes(boxes, (ow, oh))
        return np.concatenate([boxes, scores[:, None]], axis=1)

    # ---------- primary pass: soft-NMS + hflip TTA ----------
    def _primary(self, image: ndarray) -> np.ndarray:
        d1 = self._raw_dets(image, self.conf_thres)
        flipped = cv2.flip(image, 1)
        d2 = self._raw_dets(flipped, self.conf_thres)
        if len(d2):
            # Map detections from the flipped image back to original x coordinates.
            w = image.shape[1]
            x1 = w - d2[:, 2]
            x2 = w - d2[:, 0]
            d2 = np.stack([x1, d2[:, 1], x2, d2[:, 3], d2[:, 4]], axis=1)
        all_d = np.concatenate([d1, d2], axis=0) if len(d2) else d1
        if len(all_d) == 0:
            return np.zeros((0, 5), dtype=np.float32)

        # soft-NMS, then hard-NMS
        keep_idx, scores = self._soft_nms(all_d[:, :4].copy(), all_d[:, 4].copy(), sigma=self.sigma)
        if len(keep_idx) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        merged = np.concatenate([all_d[keep_idx, :4], scores[:, None]], axis=1)
        keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
        merged = merged[keep]
        if len(merged) > self.max_det:
            merged = merged[np.argsort(-merged[:, 4])[: self.max_det]]
        return merged

    # ---------- conditional tile pass ----------
    def _tile_augment(self, image: ndarray, primary: np.ndarray) -> np.ndarray:
        """Run 2x2 overlapping tiles + hflip, novelty-merge into primary."""
        oh, ow = image.shape[:2]
        tw, th = ow // 2, oh // 2
        ox, oy = int(tw * self.tile_overlap), int(th * self.tile_overlap)
        tiles = [
            (0, 0, min(ow, tw + ox), min(oh, th + oy)),
            (max(0, tw - ox), 0, ow, min(oh, th + oy)),
            (0, max(0, th - oy), min(ow, tw + ox), oh),
            (max(0, tw - ox), max(0, th - oy), ow, oh),
        ]

        collected: list[np.ndarray] = []
        for x1, y1, x2, y2 in tiles:
            crop = image[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            d = self._raw_dets(crop, self.tile_conf)
            if len(d):
                d[:, 0] += x1
                d[:, 1] += y1
                d[:, 2] += x1
                d[:, 3] += y1
                collected.append(d)

        # hflip tile pass (skipped when tile_use_hflip=False; saves 4 ONNX forwards)
        if self.tile_use_hflip:
            flipped = cv2.flip(image, 1)
            for x1, y1, x2, y2 in tiles:
                fx1 = ow - x2
                fx2 = ow - x1
                if fx2 <= fx1:
                    continue
                crop = flipped[y1:y2, fx1:fx2]
                if crop.size == 0:
                    continue
                d = self._raw_dets(crop, self.tile_conf)
                if len(d):
                    d_un = d.copy()
                    d_un[:, 0] = ow - (d[:, 2] + fx1)
                    d_un[:, 2] = ow - (d[:, 0] + fx1)
                    d_un[:, 1] = d[:, 1] + y1
                    d_un[:, 3] = d[:, 3] + y1
                    collected.append(d_un)

        if not collected:
            return primary
        tile_dets = np.concatenate(collected, axis=0)
        keep = self._hard_nms(tile_dets[:, :4], tile_dets[:, 4], 0.5)
        tile_dets = tile_dets[keep]

        # Novelty: drop tile boxes that overlap any primary box at IoU >= novelty_iou
        if len(primary) > 0 and len(tile_dets) > 0:
            mask = np.ones(len(tile_dets), dtype=bool)
            for i in range(len(tile_dets)):
                ious = self._box_iou_one_to_many(tile_dets[i, :4], primary[:, :4])
                if len(ious) and np.max(ious) >= self.novelty_iou:
                    mask[i] = False
            tile_dets = tile_dets[mask]

        if len(tile_dets) == 0:
            return primary

        # Sanity filter: min/max size, aspect ratio
        w = tile_dets[:, 2] - tile_dets[:, 0]
        h = tile_dets[:, 3] - tile_dets[:, 1]
        area = w * h
        ar = np.maximum(w / np.maximum(h, 1e-6), h / np.maximum(w, 1e-6))
        img_area = float(ow * oh)
        ok = (w >= 7) & (h >= 7) & (area >= 85) & (area <= 0.5 * img_area) & (ar <= 10.0)
        tile_dets = tile_dets[ok]
        if len(tile_dets) == 0:
            return primary

        merged = np.concatenate([primary, tile_dets], axis=0)
        keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
        merged = merged[keep]
        if len(merged) > self.final_max_det:
            merged = merged[np.argsort(-merged[:, 4])[: self.final_max_det]]
        return merged

    # ---------- single-image predict ----------
    def _predict_single(self, image: ndarray) -> list[BoundingBox]:
        if image is None or not isinstance(image, np.ndarray) or image.ndim != 3:
            return []
        if image.shape[0] <= 0 or image.shape[1] <= 0 or image.shape[2] != 3:
            return []
        if image.dtype != np.uint8:
            image = image.astype(np.uint8)

        primary = self._primary(image)
        if len(primary) < self.sparse_threshold:
            dets = self._tile_augment(image, primary)
        else:
            dets = primary

        results: list[BoundingBox] = []
        for row in dets:
            x1, y1, x2, y2, conf = row.tolist()
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=0,
                    conf=float(conf),
                )
            )
        return results

    # ---------- chute entrypoint ----------
    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        results: list[TVFrameResult] = []
        for frame_number_in_batch, image in enumerate(batch_images):
            try:
                boxes = self._predict_single(image)
            except Exception as e:
                print(f"Inference failed for frame {offset + frame_number_in_batch}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=offset + frame_number_in_batch,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results
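

# ---------- usage sketch (illustrative only) ----------
# A minimal local smoke test, assuming a repo directory that contains weights.onnx
# (and optionally class_names.txt) plus a sample image on disk. The paths below are
# hypothetical placeholders and are not part of the deployment entrypoint.
if __name__ == "__main__":
    repo_dir = Path("hf_repo")        # hypothetical: folder holding weights.onnx
    sample_path = Path("sample.jpg")  # hypothetical test frame

    miner = Miner(repo_dir)
    frame = cv2.imread(str(sample_path))  # BGR uint8, as _predict_single expects
    if frame is None:
        raise SystemExit(f"Could not read {sample_path}")

    # One-frame batch; offset=0 so frame_id starts at 0, no keypoints requested.
    for result in miner.predict_batch([frame], offset=0, n_keypoints=0):
        print(f"frame {result.frame_id}: {len(result.boxes)} box(es)")
        for box in result.boxes:
            print(f"  ({box.x1}, {box.y1}) -> ({box.x2}, {box.y2})  conf={box.conf:.3f}")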