# ScoreVision-v2 / miner.py
# iotaminer's picture
# v2: YOLOv11m INT8 QDQ (21.5MB) + lowered hose threshold 0.22
# 79a04e5 verified
"""TurboVision miner for Detect-petrol-station-1-0.
YOLOv11m static-INT8 QDQ ONNX (21.5MB) + horizontal-flip TTA.
4 classes: 0=petrol hose, 1=petrol pump, 2=price board, 3=roof canopy.
Competitive tuning notes:
- Lower per-class confidence thresholds to capture more petrol hoses (small thin objects
that our previous 0.43 threshold was filtering out).
- YOLOv11m body (166 Conv) is more capable than YOLOv11s at detecting small objects.
- Static QDQ INT8 keeps size <30MB while preserving mAP within a few percent of FP32.
"""
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple
import cv2
import numpy as np
import onnxruntime as ort
from pydantic import BaseModel
class BoundingBox(BaseModel):
    """A single detection: an axis-aligned box plus its class and score."""

    # Top-left corner (x1, y1) and bottom-right corner (x2, y2), as integers.
    x1: int
    y1: int
    x2: int
    y2: int
    # Integer class index of the detection.
    cls_id: int
    # Confidence score of the detection.
    conf: float
class TVFrameResult(BaseModel):
    """Per-frame result record: frame index, detections and keypoints."""

    # Index of the frame this result belongs to.
    frame_id: int
    # Detections reported for the frame (may be empty).
    boxes: list[BoundingBox]
    # (x, y) integer pairs, one per keypoint.
    keypoints: list[tuple[int, int]]
class Miner:
    """ONNX Runtime object detector with optional horizontal-flip TTA.

    Loads ``weights.onnx`` from the given repository directory, letterboxes
    each input frame to ``IMGSZ`` x ``IMGSZ``, runs the model, maps the boxes
    back to the original frame, merges the plain and flipped passes with
    per-class hard NMS and finally applies per-class confidence thresholds.
    """

    # Side length (pixels) of the square network input.
    IMGSZ = 1280
    # Final per-class confidence thresholds, indexed by class id.
    CLASS_CONF_THRES = (0.22, 0.35, 0.22, 0.30)
    # Global pre-filter applied to raw model output before NMS.
    CONF_THRES = 0.22
    # IoU above which a lower-scored box of the same class is suppressed.
    IOU_THRES = 0.45
    # Class ids outside [0, NUM_CLASSES) are discarded.
    NUM_CLASSES = 4
    # Minimum box side, as a fraction of min(frame width, frame height).
    MIN_BOX_FRAC = 0.003
    # When True, a horizontally flipped pass is run and merged in.
    USE_TTA = True
    # Maximum number of boxes returned per frame.
    MAX_DETS = 300

    def __init__(self, path_hf_repo: Path) -> None:
        """Create the inference session.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.

        Raises:
            FileNotFoundError: If ``weights.onnx`` is not present.
        """
        self.onnx_path = path_hf_repo / 'weights.onnx'
        if not self.onnx_path.exists():
            raise FileNotFoundError(f'Model not found at {self.onnx_path}')

        import os as _os
        import site as _site
        import glob as _glob

        # Collect pip-installed NVIDIA library directories that actually
        # contain shared objects and prepend them to LD_LIBRARY_PATH.
        # NOTE(review): the dynamic loader normally reads LD_LIBRARY_PATH at
        # process start, so this may only help child processes - confirm it
        # is still needed alongside ort.preload_dlls().
        cuda_subdirs = (
            'nvidia/cuda_runtime/lib', 'nvidia/cublas/lib', 'nvidia/cudnn/lib',
            'nvidia/cufft/lib', 'nvidia/cuda_nvrtc/lib', 'nvidia/curand/lib',
            'nvidia/cusparse/lib', 'nvidia/cusolver/lib', 'nvidia/nvjitlink/lib',
        )
        cuda_lib_dirs: list[str] = []
        for sp in _site.getsitepackages() + [_site.getusersitepackages()]:
            for sub in cuda_subdirs:
                p = f'{sp}/{sub}'
                if _glob.glob(f'{p}/*.so*'):
                    cuda_lib_dirs.append(p)
        if cuda_lib_dirs:
            existing = _os.environ.get('LD_LIBRARY_PATH', '')
            _os.environ['LD_LIBRARY_PATH'] = ':'.join(cuda_lib_dirs + ([existing] if existing else []))

        providers: list = []
        # Best effort: a failure here (including preload_dlls not existing)
        # is reported and the CPU provider remains available as a fallback.
        try:
            ort.preload_dlls()
        except Exception as _pe:
            print(f'[Miner] preload_dlls failed: {_pe}')
        available = ort.get_available_providers()
        if 'CUDAExecutionProvider' in available:
            providers.append(('CUDAExecutionProvider', {'device_id': 0}))
        providers.append('CPUExecutionProvider')

        so = ort.SessionOptions()
        so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = ort.InferenceSession(str(self.onnx_path), sess_options=so, providers=providers)

        inp = self.session.get_inputs()[0]
        self.input_name = inp.name
        self.input_shape = inp.shape
        # Feed float16 only when the model declares it; otherwise float32.
        self.input_dtype = np.float16 if inp.type == 'tensor(float16)' else np.float32
        self.active_providers = self.session.get_providers()
        print(f'[Miner] Loaded {self.onnx_path.name} | providers={self.active_providers} | dtype={self.input_dtype}')
        print(f'[Miner] Thresholds: CLASS_CONF={self.CLASS_CONF_THRES}, TTA={self.USE_TTA}')

    def __repr__(self) -> str:
        """Return a short description; safe even if ``__init__`` did not finish."""
        return f'PetrolMiner(yolo11m-qdq-int8, tta={self.USE_TTA}, conf={self.CONF_THRES}, providers={getattr(self, "active_providers", "?")})'

    @staticmethod
    def _letterbox_geometry(h: int, w: int, new_size: int):
        """Compute letterbox scale, resized size and padding for an h x w image.

        Returns:
            ``(r, nh, nw, top, bottom, left, right)`` where ``r`` is the scale
            factor, ``nh``/``nw`` the resized height/width and the remaining
            values the padding that centres the image in a
            ``new_size`` x ``new_size`` square.
        """
        r = min(new_size / h, new_size / w)
        # Clamp to 1 px: for extreme aspect ratios the short side would
        # otherwise round to 0, which cv2.resize rejects.
        nh = max(1, int(round(h * r)))
        nw = max(1, int(round(w * r)))
        top = (new_size - nh) // 2
        bottom = new_size - nh - top
        left = (new_size - nw) // 2
        right = new_size - nw - left
        return r, nh, nw, top, bottom, left, right

    @staticmethod
    def _letterbox(img, new_size=1280, color=(114, 114, 114)):
        """Resize ``img`` keeping aspect ratio and pad it to a square.

        Args:
            img: Image array whose first two dimensions are height and width.
            new_size: Side length of the square output.
            color: Constant border colour used for the padding.

        Returns:
            ``(padded, r, (left, top))``: the padded image, the scale factor
            and the left/top padding offsets.
        """
        h, w = img.shape[:2]
        r, nh, nw, top, bottom, left, right = Miner._letterbox_geometry(h, w, new_size)
        resized = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
        padded = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
        return padded, r, (left, top)

    def _preprocess(self, img):
        """Turn a frame into the network input tensor.

        The frame is converted with ``COLOR_BGR2RGB`` (so it is treated as
        BGR), letterboxed to ``IMGSZ``, scaled to [0, 1] and laid out as
        ``(1, C, H, W)``.

        Returns:
            ``(x, r, (lx, ty), (w, h))``: the contiguous tensor, letterbox
            scale, letterbox left/top offsets and original width/height.
        """
        h, w = img.shape[:2]
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        padded, r, (lx, ty) = self._letterbox(img_rgb, self.IMGSZ)
        x = padded.astype(self.input_dtype) / 255.0
        x = x.transpose(2, 0, 1)[None, ...]
        return np.ascontiguousarray(x), r, (lx, ty), (w, h)

    @staticmethod
    def _empty_detections(W, H):
        """Return the "no detections" result tuple for a W x H frame."""
        return np.empty((0, 4)), np.empty((0,)), np.empty((0,), dtype=int), W, H

    def _run_onnx(self, img):
        """Run one forward pass and return boxes in original-frame pixels.

        The first model output is read as rows of
        ``[x1, y1, x2, y2, conf, cls]`` (a leading batch axis is dropped).

        Returns:
            ``(xyxy, conf, cls_id, W, H)`` after the global confidence
            pre-filter, un-letterboxing, clamping to the frame, and removal
            of out-of-range classes and boxes smaller than ``MIN_BOX_FRAC``.
        """
        x, r, (lx, ty), (W, H) = self._preprocess(img)
        outputs = self.session.run(None, {self.input_name: x})
        det = outputs[0]
        if det.ndim == 3:
            det = det[0]
        if det.size == 0:
            return self._empty_detections(W, H)
        det = np.asarray(det, dtype=np.float32)
        if det.shape[-1] < 6:
            # Unexpected layout: fewer than the six columns read below.
            return self._empty_detections(W, H)

        xyxy = det[:, :4].copy()
        conf = det[:, 4].copy()
        cls_id = det[:, 5].astype(int)

        keep = conf >= self.CONF_THRES
        xyxy, conf, cls_id = xyxy[keep], conf[keep], cls_id[keep]
        if len(xyxy) == 0:
            return self._empty_detections(W, H)

        # Undo the letterbox (remove padding, then rescale) and clamp.
        xyxy[:, [0, 2]] = (xyxy[:, [0, 2]] - lx) / r
        xyxy[:, [1, 3]] = (xyxy[:, [1, 3]] - ty) / r
        xyxy[:, 0::2] = np.clip(xyxy[:, 0::2], 0, W - 1)
        xyxy[:, 1::2] = np.clip(xyxy[:, 1::2], 0, H - 1)

        min_side = self.MIN_BOX_FRAC * min(W, H)
        mask = (
            (cls_id >= 0) & (cls_id < self.NUM_CLASSES)
            & ((xyxy[:, 2] - xyxy[:, 0]) >= min_side)
            & ((xyxy[:, 3] - xyxy[:, 1]) >= min_side)
        )
        return xyxy[mask], conf[mask], cls_id[mask], W, H

    @staticmethod
    def _hard_nms_per_class(xyxy, conf, cls_id, iou_thres=0.5, max_per_class=100):
        """Greedy non-maximum suppression run independently per class.

        Args:
            xyxy: ``(N, 4)`` array of ``[x1, y1, x2, y2]`` boxes.
            conf: ``(N,)`` confidence scores.
            cls_id: ``(N,)`` integer class ids.
            iou_thres: Boxes overlapping a kept box by more than this IoU
                are suppressed.
            max_per_class: Stop keeping boxes for a class once this many
                have been kept.

        Returns:
            Integer array of indices into the inputs that survive, grouped
            by class and ordered by descending confidence within a class.
        """
        if len(xyxy) == 0:
            return np.empty((0,), dtype=int)
        keep = []
        for c in np.unique(cls_id):
            idx = np.where(cls_id == c)[0]
            order = np.argsort(-conf[idx])
            idx = idx[order]
            b = xyxy[idx]
            areas = (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])
            suppressed = np.zeros(len(b), dtype=bool)
            # Running count for this class; avoids re-scanning ``keep``.
            kept_in_class = 0
            for i in range(len(b)):
                if suppressed[i]:
                    continue
                keep.append(idx[i])
                kept_in_class += 1
                if kept_in_class >= max_per_class:
                    break
                # IoU of box i against every lower-scored box of the class.
                xx1 = np.maximum(b[i, 0], b[i + 1:, 0])
                yy1 = np.maximum(b[i, 1], b[i + 1:, 1])
                xx2 = np.minimum(b[i, 2], b[i + 1:, 2])
                yy2 = np.minimum(b[i, 3], b[i + 1:, 3])
                inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1)
                iou = inter / (areas[i] + areas[i + 1:] - inter + 1e-9)
                suppressed[i + 1:][iou > iou_thres] = True
        return np.array(keep, dtype=int)

    def _select_detections(self, conf, cls_id):
        """Pick the indices to report, best score first.

        Detections whose class id is in range and whose score is below that
        class's ``CLASS_CONF_THRES`` entry are dropped. The ``MAX_DETS`` cap
        is applied after this filtering so rejected boxes never use up a slot.

        Returns:
            List of integer indices into ``conf`` / ``cls_id``.
        """
        selected = []
        if len(conf) == 0:
            return selected
        for i in np.argsort(-conf):
            if len(selected) >= self.MAX_DETS:
                break
            ci = int(cls_id[i])
            if 0 <= ci < self.NUM_CLASSES and float(conf[i]) < self.CLASS_CONF_THRES[ci]:
                continue
            selected.append(int(i))
        return selected

    def _predict_single(self, img) -> "list[BoundingBox]":
        """Detect objects in one frame and return them as ``BoundingBox`` items."""
        xyxy1, conf1, cls1, W, H = self._run_onnx(img)
        if not self.USE_TTA:
            xyxy, conf, cls_id = xyxy1, conf1, cls1
        else:
            # Second pass on the mirrored frame (flip code 1 = horizontal).
            img_f = cv2.flip(img, 1)
            xyxy2, conf2, cls2, _, _ = self._run_onnx(img_f)
            if len(xyxy2) > 0:
                # Mirror boxes back; x1/x2 swap roles under the flip.
                tmp = xyxy2.copy()
                tmp[:, 0] = W - xyxy2[:, 2]
                tmp[:, 2] = W - xyxy2[:, 0]
                # Keep the same [0, W - 1] range that _run_onnx enforces.
                tmp[:, 0::2] = np.clip(tmp[:, 0::2], 0, W - 1)
                xyxy2 = tmp
            # Skip empty pieces so the merged arrays keep the model dtype.
            pieces_xyxy = [a for a in (xyxy1, xyxy2) if len(a) > 0]
            pieces_conf = [a for a in (conf1, conf2) if len(a) > 0]
            pieces_cls = [a for a in (cls1, cls2) if len(a) > 0]
            xyxy = np.vstack(pieces_xyxy) if pieces_xyxy else np.empty((0, 4))
            conf = np.concatenate(pieces_conf) if pieces_conf else np.empty((0,))
            cls_id = np.concatenate(pieces_cls) if pieces_cls else np.empty((0,), dtype=int)

        if len(xyxy) > 0:
            keep = self._hard_nms_per_class(xyxy, conf, cls_id, iou_thres=self.IOU_THRES)
            xyxy, conf, cls_id = xyxy[keep], conf[keep], cls_id[keep]

        return [
            BoundingBox(
                x1=int(round(float(xyxy[i, 0]))),
                y1=int(round(float(xyxy[i, 1]))),
                x2=int(round(float(xyxy[i, 2]))),
                y2=int(round(float(xyxy[i, 3]))),
                cls_id=int(cls_id[i]),
                conf=float(conf[i]),
            )
            for i in self._select_detections(conf, cls_id)
        ]

    def predict_batch(self, batch_images, offset: int, n_keypoints: int) -> "list[TVFrameResult]":
        """Run detection on every image of a batch.

        Args:
            batch_images: Iterable of frames; each is passed to
                ``_predict_single``.
            offset: Frame id of the first image; image ``i`` gets
                ``offset + i``.
            n_keypoints: Number of placeholder ``(0, 0)`` keypoints to attach
                to every result.

        Returns:
            One ``TVFrameResult`` per input image, in input order. A frame
            whose prediction raises is logged and reported with no boxes.
        """
        results = []
        for i, img in enumerate(batch_images):
            frame_id = offset + i
            # Deliberate best effort: one bad frame must not fail the batch.
            try:
                boxes = self._predict_single(img)
            except Exception as e:
                print(f'[Miner] predict error on frame {offset + i}: {e}')
                boxes = []
            kps = [(0, 0) for _ in range(n_keypoints)]
            results.append(TVFrameResult(frame_id=frame_id, boxes=boxes, keypoints=kps))
        return results