| """TurboVision miner for Detect-petrol-station-1-0. |
| |
| YOLOv11m static-INT8 QDQ ONNX (21.5MB) + horizontal-flip TTA. |
| 4 classes: 0=petrol hose, 1=petrol pump, 2=price board, 3=roof canopy. |
| |
| Competitive tuning notes: |
| - Lower per-class confidence thresholds to capture more petrol hoses (small thin objects |
| that our previous 0.43 threshold was filtering out). |
| - YOLOv11m body (166 Conv) is more capable than YOLOv11s at detecting small objects. |
| - Static QDQ INT8 keeps size <30MB while preserving mAP within a few percent of FP32. |
| """ |
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import List, Tuple |
|
|
| import cv2 |
| import numpy as np |
| import onnxruntime as ort |
| from pydantic import BaseModel |
|
|
|
|
class BoundingBox(BaseModel):
    """One detected object as an axis-aligned box with class and confidence."""

    # First corner of the box (x, y), stored as integers.
    x1: int
    y1: int
    # Opposite corner of the box (x, y), stored as integers.
    x2: int
    y2: int
    # Integer class index of the detection.
    cls_id: int
    # Confidence score reported for the detection.
    conf: float
|
|
|
|
class TVFrameResult(BaseModel):
    """Per-frame result record: frame index, detected boxes and keypoints."""

    # Index of the frame this result belongs to.
    frame_id: int
    # Detections reported for the frame.
    boxes: list[BoundingBox]
    # Keypoints as (int, int) pairs.
    keypoints: list[tuple[int, int]]
|
|
|
|
class Miner:
    """Object detector that runs ``weights.onnx`` through ONNX Runtime.

    Per frame: letterbox to ``IMGSZ`` -> ONNX inference -> confidence, class
    and size filtering -> optional horizontal-flip TTA merged with per-class
    hard NMS -> per-class confidence thresholds -> list of ``BoundingBox``.
    """

    # Side length (pixels) of the square network input.
    IMGSZ = 1280
    # Final per-class confidence thresholds, indexed by class id.
    CLASS_CONF_THRES = (0.22, 0.35, 0.22, 0.30)
    # Global pre-filter applied to raw detections before any merging.
    CONF_THRES = 0.22
    # IoU above which a lower-scored box of the same class is suppressed.
    IOU_THRES = 0.45
    # Detections whose class id is outside [0, NUM_CLASSES) are discarded.
    NUM_CLASSES = 4
    # Minimum box side, as a fraction of the shorter image side.
    MIN_BOX_FRAC = 0.003
    # Whether to also run inference on the horizontally flipped frame.
    USE_TTA = True
    # Maximum number of boxes returned per frame.
    MAX_DETS = 300

    # Sub-directories of site-packages searched for pip-installed CUDA libs.
    _CUDA_LIB_SUBDIRS = (
        'nvidia/cuda_runtime/lib', 'nvidia/cublas/lib', 'nvidia/cudnn/lib',
        'nvidia/cufft/lib', 'nvidia/cuda_nvrtc/lib', 'nvidia/curand/lib',
        'nvidia/cusparse/lib', 'nvidia/cusolver/lib', 'nvidia/nvjitlink/lib',
    )

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and create the session.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.

        Raises:
            FileNotFoundError: If ``weights.onnx`` is not present.
        """
        self.onnx_path = path_hf_repo / 'weights.onnx'
        if not self.onnx_path.exists():
            raise FileNotFoundError(f'Model not found at {self.onnx_path}')

        self._extend_ld_library_path()

        # Best effort: a failure here (including a missing ``preload_dlls``
        # attribute) must not prevent falling back to the CPU provider.
        try:
            ort.preload_dlls()
        except Exception as _pe:
            print(f'[Miner] preload_dlls failed: {_pe}')

        providers: list = []
        if 'CUDAExecutionProvider' in ort.get_available_providers():
            providers.append(('CUDAExecutionProvider', {'device_id': 0}))
        providers.append('CPUExecutionProvider')

        so = ort.SessionOptions()
        so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = ort.InferenceSession(
            str(self.onnx_path), sess_options=so, providers=providers
        )

        inp = self.session.get_inputs()[0]
        self.input_name = inp.name
        self.input_shape = inp.shape
        # Feed the model the floating-point width it declares for its input.
        self.input_dtype = np.float16 if inp.type == 'tensor(float16)' else np.float32
        self.active_providers = self.session.get_providers()
        print(f'[Miner] Loaded {self.onnx_path.name} | providers={self.active_providers} | dtype={self.input_dtype}')
        print(f'[Miner] Thresholds: CLASS_CONF={self.CLASS_CONF_THRES}, TTA={self.USE_TTA}')

    def __repr__(self) -> str:
        """Return a short description including TTA, threshold and providers."""
        return f'PetrolMiner(yolo11m-qdq-int8, tta={self.USE_TTA}, conf={self.CONF_THRES}, providers={getattr(self, "active_providers", "?")})'

    @classmethod
    def _extend_ld_library_path(cls) -> None:
        """Prepend site-packages CUDA library directories to LD_LIBRARY_PATH.

        Only directories that actually contain ``*.so*`` files are added; the
        previous value of the variable, if any, is kept at the end.
        """
        import glob
        import os
        import site

        cuda_lib_dirs: list = []
        for sp in site.getsitepackages() + [site.getusersitepackages()]:
            for sub in cls._CUDA_LIB_SUBDIRS:
                candidate = f'{sp}/{sub}'
                if glob.glob(f'{candidate}/*.so*'):
                    cuda_lib_dirs.append(candidate)
        if not cuda_lib_dirs:
            return
        # NOTE(review): the dynamic loader usually reads LD_LIBRARY_PATH at
        # process start, so this presumably helps child processes more than
        # this one; ort.preload_dlls() is what loads the libraries here.
        existing = os.environ.get('LD_LIBRARY_PATH', '')
        os.environ['LD_LIBRARY_PATH'] = ':'.join(
            cuda_lib_dirs + ([existing] if existing else [])
        )

    @staticmethod
    def _letterbox(img, new_size=1280, color=(114, 114, 114)):
        """Resize *img* to fit a ``new_size`` square and pad it to that size.

        The aspect ratio is preserved and the padding is split between both
        sides of each axis.

        Returns:
            ``(padded, r, (left, top))`` where ``r`` is the scale factor and
            ``(left, top)`` the padding added before the image content.
        """
        h, w = img.shape[:2]
        r = min(new_size / h, new_size / w)
        # Never ask cv2.resize for a zero-sized output (extreme aspect ratios).
        nh = max(1, int(round(h * r)))
        nw = max(1, int(round(w * r)))
        resized = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
        top = (new_size - nh) // 2
        bottom = new_size - nh - top
        left = (new_size - nw) // 2
        right = new_size - nw - left
        padded = cv2.copyMakeBorder(
            resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color
        )
        return padded, r, (left, top)

    def _preprocess(self, img):
        """Convert a BGR frame into the network input tensor.

        Returns:
            ``(x, r, (lx, ty), (w, h))``: the contiguous ``1 x C x H x W``
            tensor scaled to [0, 1], the letterbox scale, the letterbox
            left/top padding and the original frame width/height.
        """
        h, w = img.shape[:2]
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        padded, r, (lx, ty) = self._letterbox(img_rgb, self.IMGSZ)
        x = padded.astype(self.input_dtype) / 255.0
        x = x.transpose(2, 0, 1)[None, ...]  # HWC -> 1xCxHxW
        return np.ascontiguousarray(x), r, (lx, ty), (w, h)

    @staticmethod
    def _no_detections():
        """Return empty ``(xyxy, conf, cls_id)`` arrays."""
        return (
            np.empty((0, 4), dtype=np.float32),
            np.empty((0,), dtype=np.float32),
            np.empty((0,), dtype=int),
        )

    def _decode(self, det, r, lx, ty, W, H):
        """Turn a raw detection array into filtered boxes in frame coordinates.

        Each row of *det* is read as ``[x1, y1, x2, y2, conf, cls, ...]`` in
        letterboxed input coordinates. Rows are dropped when their confidence
        is below ``CONF_THRES``, their class id is outside
        ``[0, NUM_CLASSES)``, or either side is shorter than
        ``MIN_BOX_FRAC * min(W, H)`` after mapping back to the frame.

        Args:
            det: Raw detections, 2-D or with a leading batch axis.
            r: Letterbox scale factor.
            lx: Letterbox left padding.
            ty: Letterbox top padding.
            W: Original frame width.
            H: Original frame height.

        Returns:
            ``(xyxy, conf, cls_id)`` arrays of equal length.
        """
        det = np.asarray(det)
        if det.ndim == 3:
            det = det[0]
        if det.size == 0 or det.ndim != 2 or det.shape[-1] < 6:
            return self._no_detections()
        det = det.astype(np.float32, copy=False)

        confident = det[:, 4] >= self.CONF_THRES
        if not confident.any():
            return self._no_detections()
        xyxy = det[confident, :4].copy()
        conf = det[confident, 4].copy()
        cls_id = det[confident, 5].astype(int)

        # Undo the letterbox (padding, then scale) and clamp to the frame.
        xyxy[:, [0, 2]] = (xyxy[:, [0, 2]] - lx) / r
        xyxy[:, [1, 3]] = (xyxy[:, [1, 3]] - ty) / r
        xyxy[:, 0::2] = np.clip(xyxy[:, 0::2], 0, W - 1)
        xyxy[:, 1::2] = np.clip(xyxy[:, 1::2], 0, H - 1)

        min_side = self.MIN_BOX_FRAC * min(W, H)
        mask = (
            (cls_id >= 0) & (cls_id < self.NUM_CLASSES)
            & ((xyxy[:, 2] - xyxy[:, 0]) >= min_side)
            & ((xyxy[:, 3] - xyxy[:, 1]) >= min_side)
        )
        return xyxy[mask], conf[mask], cls_id[mask]

    def _run_onnx(self, img):
        """Run the model on one BGR frame.

        Returns:
            ``(xyxy, conf, cls_id, W, H)``: filtered detections in original
            frame coordinates plus the frame width and height.
        """
        x, r, (lx, ty), (W, H) = self._preprocess(img)
        outputs = self.session.run(None, {self.input_name: x})
        xyxy, conf, cls_id = self._decode(outputs[0], r, lx, ty, W, H)
        return xyxy, conf, cls_id, W, H

    @staticmethod
    def _hard_nms_per_class(xyxy, conf, cls_id, iou_thres=0.5, max_per_class=100):
        """Greedy non-maximum suppression applied independently per class.

        Args:
            xyxy: ``(N, 4)`` boxes as ``x1, y1, x2, y2``.
            conf: ``(N,)`` scores.
            cls_id: ``(N,)`` integer class ids.
            iou_thres: Boxes overlapping a kept box by more than this IoU are
                suppressed.
            max_per_class: Stop keeping boxes for a class once this many have
                been kept.

        Returns:
            Integer indices into the inputs, grouped by ascending class id and
            ordered by descending score within each class.
        """
        if len(xyxy) == 0:
            return np.empty((0,), dtype=int)
        keep = []
        for c in np.unique(cls_id):
            idx = np.flatnonzero(cls_id == c)
            idx = idx[np.argsort(-conf[idx])]
            b = xyxy[idx]
            areas = (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])
            suppressed = np.zeros(len(idx), dtype=bool)
            kept_in_class = 0
            for i, box in enumerate(b):
                if suppressed[i]:
                    continue
                keep.append(idx[i])
                kept_in_class += 1
                if kept_in_class >= max_per_class:
                    break
                rest = b[i + 1:]
                xx1 = np.maximum(box[0], rest[:, 0])
                yy1 = np.maximum(box[1], rest[:, 1])
                xx2 = np.minimum(box[2], rest[:, 2])
                yy2 = np.minimum(box[3], rest[:, 3])
                inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1)
                # The epsilon avoids 0/0 for zero-area boxes.
                iou = inter / (areas[i] + areas[i + 1:] - inter + 1e-9)
                suppressed[i + 1:] |= iou > iou_thres
        return np.array(keep, dtype=int)

    @staticmethod
    def _unflip_boxes(xyxy, width):
        """Mirror boxes found on a horizontally flipped frame back to the frame.

        ``x1``/``x2`` are swapped and reflected about *width*, then clamped to
        ``[0, width - 1]`` so mirrored boxes obey the same bounds as boxes
        from the unflipped pass.
        """
        mirrored = xyxy.copy()
        mirrored[:, 0] = width - xyxy[:, 2]
        mirrored[:, 2] = width - xyxy[:, 0]
        mirrored[:, [0, 2]] = np.clip(mirrored[:, [0, 2]], 0, width - 1)
        return mirrored

    def _filter_and_rank(self, conf, cls_id):
        """Return indices of the detections to report, best first.

        Detections scoring below their class threshold in
        ``CLASS_CONF_THRES`` are removed *before* the ``MAX_DETS`` cap is
        applied, so rejected detections never use up an output slot.
        """
        if len(conf) == 0:
            return np.empty((0,), dtype=int)
        thresholds = np.asarray(self.CLASS_CONF_THRES, dtype=np.float64)
        in_range = (cls_id >= 0) & (cls_id < self.NUM_CLASSES)
        # Out-of-range ids have no threshold; index 0 is only a placeholder.
        safe_cls = np.where(in_range, cls_id, 0)
        below = in_range & (conf < thresholds[safe_cls])
        order = np.argsort(-conf)
        order = order[~below[order]]
        return order[:self.MAX_DETS]

    def _predict_single(self, img) -> "list[BoundingBox]":
        """Detect objects in one BGR frame.

        With ``USE_TTA`` the frame is also processed mirrored; both result
        sets are merged and de-duplicated with per-class hard NMS.

        Returns:
            Boxes sorted by descending confidence, at most ``MAX_DETS``.
        """
        xyxy, conf, cls_id, W, _H = self._run_onnx(img)
        if self.USE_TTA:
            xyxy_f, conf_f, cls_f, _, _ = self._run_onnx(cv2.flip(img, 1))
            xyxy = np.concatenate([xyxy, self._unflip_boxes(xyxy_f, W)], axis=0)
            conf = np.concatenate([conf, conf_f])
            cls_id = np.concatenate([cls_id, cls_f])
            if len(xyxy) > 0:
                keep = self._hard_nms_per_class(
                    xyxy, conf, cls_id, iou_thres=self.IOU_THRES
                )
                xyxy, conf, cls_id = xyxy[keep], conf[keep], cls_id[keep]

        return [
            BoundingBox(
                x1=int(round(float(xyxy[i, 0]))),
                y1=int(round(float(xyxy[i, 1]))),
                x2=int(round(float(xyxy[i, 2]))),
                y2=int(round(float(xyxy[i, 3]))),
                cls_id=int(cls_id[i]),
                conf=float(conf[i]),
            )
            for i in self._filter_and_rank(conf, cls_id)
        ]

    def predict_batch(self, batch_images, offset, n_keypoints) -> "list[TVFrameResult]":
        """Run detection on every frame of a batch.

        A failure on one frame is logged and reported as a frame with no
        boxes, so the rest of the batch is still processed.

        Args:
            batch_images: Iterable of BGR frames.
            offset: Frame id assigned to the first frame of the batch.
            n_keypoints: Number of ``(0, 0)`` placeholder keypoints to attach
                to each frame result.

        Returns:
            One ``TVFrameResult`` per input frame, in input order.
        """
        results = []
        for i, img in enumerate(batch_images):
            try:
                boxes = self._predict_single(img)
            except Exception as e:  # deliberate per-frame best effort
                print(f'[Miner] predict error on frame {offset + i}: {e}')
                boxes = []
            kps = [(0, 0) for _ in range(n_keypoints)]
            results.append(TVFrameResult(frame_id=offset + i, boxes=boxes, keypoints=kps))
        return results
|
|