"""TurboVision miner for Detect-petrol-station-1-0. YOLOv11m static-INT8 QDQ ONNX (21.5MB) + horizontal-flip TTA. 4 classes: 0=petrol hose, 1=petrol pump, 2=price board, 3=roof canopy. Competitive tuning notes: - Lower per-class confidence thresholds to capture more petrol hoses (small thin objects that our previous 0.43 threshold was filtering out). - YOLOv11m body (166 Conv) is more capable than YOLOv11s at detecting small objects. - Static QDQ INT8 keeps size <30MB while preserving mAP within a few percent of FP32. """ from __future__ import annotations from pathlib import Path from typing import List, Tuple import cv2 import numpy as np import onnxruntime as ort from pydantic import BaseModel class BoundingBox(BaseModel): x1: int y1: int x2: int y2: int cls_id: int conf: float class TVFrameResult(BaseModel): frame_id: int boxes: list[BoundingBox] keypoints: list[tuple[int, int]] class Miner: IMGSZ = 1280 CLASS_CONF_THRES = (0.22, 0.35, 0.22, 0.30) CONF_THRES = 0.22 IOU_THRES = 0.45 NUM_CLASSES = 4 MIN_BOX_FRAC = 0.003 USE_TTA = True MAX_DETS = 300 def __init__(self, path_hf_repo: Path) -> None: self.onnx_path = path_hf_repo / 'weights.onnx' if not self.onnx_path.exists(): raise FileNotFoundError(f'Model not found at {self.onnx_path}') import os as _os import site as _site import glob as _glob cuda_lib_dirs: list[str] = [] for sp in _site.getsitepackages() + [_site.getusersitepackages()]: for sub in ('nvidia/cuda_runtime/lib', 'nvidia/cublas/lib', 'nvidia/cudnn/lib', 'nvidia/cufft/lib', 'nvidia/cuda_nvrtc/lib', 'nvidia/curand/lib', 'nvidia/cusparse/lib', 'nvidia/cusolver/lib', 'nvidia/nvjitlink/lib'): p = f'{sp}/{sub}' if _glob.glob(f'{p}/*.so*'): cuda_lib_dirs.append(p) if cuda_lib_dirs: existing = _os.environ.get('LD_LIBRARY_PATH', '') _os.environ['LD_LIBRARY_PATH'] = ':'.join(cuda_lib_dirs + ([existing] if existing else [])) providers: list = [] try: ort.preload_dlls() except Exception as _pe: print(f'[Miner] preload_dlls failed: {_pe}') available = ort.get_available_providers() if 'CUDAExecutionProvider' in available: providers.append(('CUDAExecutionProvider', {'device_id': 0})) providers.append('CPUExecutionProvider') so = ort.SessionOptions() so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL self.session = ort.InferenceSession(str(self.onnx_path), sess_options=so, providers=providers) self.input_name = self.session.get_inputs()[0].name inp = self.session.get_inputs()[0] self.input_shape = inp.shape self.input_dtype = np.float16 if inp.type == 'tensor(float16)' else np.float32 self.active_providers = self.session.get_providers() print(f'[Miner] Loaded {self.onnx_path.name} | providers={self.active_providers} | dtype={self.input_dtype}') print(f'[Miner] Thresholds: CLASS_CONF={self.CLASS_CONF_THRES}, TTA={self.USE_TTA}') def __repr__(self) -> str: return f'PetrolMiner(yolo11m-qdq-int8, tta={self.USE_TTA}, conf={self.CONF_THRES}, providers={getattr(self, "active_providers", "?")})' @staticmethod def _letterbox(img, new_size=1280, color=(114, 114, 114)): h, w = img.shape[:2] r = min(new_size / h, new_size / w) nh, nw = int(round(h * r)), int(round(w * r)) resized = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR) top = (new_size - nh) // 2 bottom = new_size - nh - top left = (new_size - nw) // 2 right = new_size - nw - left padded = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) return padded, r, (left, top) def _preprocess(self, img): h, w = img.shape[:2] img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) padded, r, (lx, ty) = self._letterbox(img_rgb, self.IMGSZ) x = padded.astype(self.input_dtype) / 255.0 x = x.transpose(2, 0, 1)[None, ...] return np.ascontiguousarray(x), r, (lx, ty), (w, h) def _run_onnx(self, img): x, r, (lx, ty), (W, H) = self._preprocess(img) outputs = self.session.run(None, {self.input_name: x}) det = outputs[0] if det.ndim == 3: det = det[0] if det.size == 0: return np.empty((0, 4)), np.empty((0,)), np.empty((0,), dtype=int), W, H det = np.asarray(det, dtype=np.float32) if det.shape[-1] < 6: return np.empty((0, 4)), np.empty((0,)), np.empty((0,), dtype=int), W, H xyxy = det[:, :4].copy() conf = det[:, 4].copy() cls_id = det[:, 5].astype(int) keep = conf >= self.CONF_THRES xyxy, conf, cls_id = xyxy[keep], conf[keep], cls_id[keep] if len(xyxy) == 0: return np.empty((0, 4)), np.empty((0,)), np.empty((0,), dtype=int), W, H xyxy[:, [0, 2]] = (xyxy[:, [0, 2]] - lx) / r xyxy[:, [1, 3]] = (xyxy[:, [1, 3]] - ty) / r xyxy[:, 0::2] = np.clip(xyxy[:, 0::2], 0, W - 1) xyxy[:, 1::2] = np.clip(xyxy[:, 1::2], 0, H - 1) min_side = self.MIN_BOX_FRAC * min(W, H) mask = ( (cls_id >= 0) & (cls_id < self.NUM_CLASSES) & ((xyxy[:, 2] - xyxy[:, 0]) >= min_side) & ((xyxy[:, 3] - xyxy[:, 1]) >= min_side) ) return xyxy[mask], conf[mask], cls_id[mask], W, H @staticmethod def _hard_nms_per_class(xyxy, conf, cls_id, iou_thres=0.5, max_per_class=100): if len(xyxy) == 0: return np.empty((0,), dtype=int) keep = [] for c in np.unique(cls_id): idx = np.where(cls_id == c)[0] b = xyxy[idx] s = conf[idx] order = np.argsort(-s) b = b[order]; s = s[order]; idx = idx[order] areas = (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1]) suppressed = np.zeros(len(b), dtype=bool) for i in range(len(b)): if suppressed[i]: continue keep.append(idx[i]) if len([k for k in keep if cls_id[k] == c]) >= max_per_class: break xx1 = np.maximum(b[i, 0], b[i+1:, 0]) yy1 = np.maximum(b[i, 1], b[i+1:, 1]) xx2 = np.minimum(b[i, 2], b[i+1:, 2]) yy2 = np.minimum(b[i, 3], b[i+1:, 3]) inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1) iou = inter / (areas[i] + areas[i+1:] - inter + 1e-9) suppressed[i+1:][iou > iou_thres] = True return np.array(keep, dtype=int) def _predict_single(self, img): xyxy1, conf1, cls1, W, H = self._run_onnx(img) if not self.USE_TTA: xyxy, conf, cls_id = xyxy1, conf1, cls1 else: img_f = cv2.flip(img, 1) xyxy2, conf2, cls2, _, _ = self._run_onnx(img_f) if len(xyxy2) > 0: tmp = xyxy2.copy() tmp[:, 0] = W - xyxy2[:, 2] tmp[:, 2] = W - xyxy2[:, 0] xyxy2 = tmp pieces_xyxy = [a for a in (xyxy1, xyxy2) if len(a) > 0] pieces_conf = [a for a in (conf1, conf2) if len(a) > 0] pieces_cls = [a for a in (cls1, cls2) if len(a) > 0] xyxy = np.vstack(pieces_xyxy) if pieces_xyxy else np.empty((0, 4)) conf = np.concatenate(pieces_conf) if pieces_conf else np.empty((0,)) cls_id = np.concatenate(pieces_cls) if pieces_cls else np.empty((0,), dtype=int) if len(xyxy) > 0: keep = self._hard_nms_per_class(xyxy, conf, cls_id, iou_thres=self.IOU_THRES) xyxy, conf, cls_id = xyxy[keep], conf[keep], cls_id[keep] boxes = [] order = np.argsort(-conf) if len(conf) else np.empty((0,), dtype=int) for i in order[:self.MAX_DETS]: ci = int(cls_id[i]) if 0 <= ci < self.NUM_CLASSES and float(conf[i]) < self.CLASS_CONF_THRES[ci]: continue boxes.append(BoundingBox( x1=int(round(float(xyxy[i, 0]))), y1=int(round(float(xyxy[i, 1]))), x2=int(round(float(xyxy[i, 2]))), y2=int(round(float(xyxy[i, 3]))), cls_id=ci, conf=float(conf[i]), )) return boxes def predict_batch(self, batch_images, offset, n_keypoints): results = [] for i, img in enumerate(batch_images): try: boxes = self._predict_single(img) except Exception as e: print(f'[Miner] predict error on frame {offset + i}: {e}') boxes = [] kps = [(0, 0) for _ in range(n_keypoints)] results.append(TVFrameResult(frame_id=offset + i, boxes=boxes, keypoints=kps)) return results