# ScoreVision-R11 / miner.py
# nexu02's picture
# R11 backup: miner.py
# 98903c8 verified
# Build: 2026-05-29 23:30 UTC R11 redeploy (force new revision)
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
    """One detection: an axis-aligned box with its class id and confidence."""

    # Corner coordinates; the producers in this file fill them with
    # floor/ceil-rounded pixel positions in the original image.
    x1: int
    y1: int
    x2: int
    y2: int
    # Integer class id and detection confidence.
    cls_id: int
    conf: float
class TVFrameResult(BaseModel):
    """Per-frame output record: frame id, detected boxes and keypoints."""

    # Absolute frame index (batch offset + position inside the batch).
    frame_id: int
    # Detections found in this frame; empty when nothing was detected.
    boxes: list[BoundingBox]
    # (x, y) keypoints; this file only ever emits (0, 0) placeholders.
    keypoints: list[tuple[int, int]]
class Miner:
    """ONNX Runtime miner.

    Hard global NMS + sanity filter + dedup + flip TTA, with per-class rescue
    bonus.

    Each frame is letterboxed, run through the ONNX model, confidence-filtered
    per class, sanity-filtered, de-duplicated with class-agnostic hard NMS and
    a cross-class dedup pass. The same is done on the horizontally flipped
    frame and both views are merged with per-class NMS.
    """

    # NOTE(review): class ids presumably follow this order — confirm against
    # the exported model.
    class_names = ["cup", "bottle", "can"]
    # Fallback input side used when the model declares a dynamic H or W.
    input_size = 1280
    # IoU above which hard NMS suppresses the lower-scored box.
    iou_thres = 0.4
    # IoU above which the cross-class dedup treats two boxes as duplicates.
    cross_iou_thresh = 0.7
    # Sanity-filter limits, measured in original-image pixels.
    min_side = 8.0
    min_box_area = 100.0
    max_aspect_ratio = 10.0
    # Maximum number of detections kept per view and after the TTA merge.
    max_det = 300
    # Per-class confidence thresholds, indexed by class id.
    _conf_thres_array = np.array([0.6, 0.45, 0.5], dtype=np.float32)
    # Per-class rescue bonus, indexed by class id (0 disables the rescue).
    _bonus_array = np.array([0.0, 0.0, 0.2], dtype=np.float32)

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and create the session.

        CUDA is preferred; if creating the session with the CUDA provider list
        raises, a CPU-only session is created instead. Diagnostic information
        about providers and model inputs/outputs is printed.
        """
        model_path = path_hf_repo / "weights.onnx"
        print("ORT version:", ort.__version__)
        # Best effort: a failure here is logged and otherwise ignored.
        try:
            ort.preload_dlls()
            print("preload_dlls success")
        except Exception as e:
            print(f"preload_dlls failed: {e}")
        print("ORT available providers BEFORE session:", ort.get_available_providers())

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
        print("ORT session providers:", self.session.get_providers())
        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [output.name for output in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape
        # Indices 2 and 3 are read as height and width (NCHW layout assumed);
        # non-integer (dynamic) dimensions fall back to ``input_size``.
        self.input_height = self._safe_dim(self.input_shape[2], default=self.input_size)
        self.input_width = self._safe_dim(self.input_shape[3], default=self.input_size)
        print(f"ONNX model loaded from: {model_path}")
        print(f"ONNX providers: {self.session.get_providers()}")
        print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        """Return a short description naming the session type and providers."""
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return *value* if it is a positive int, otherwise *default*."""
        return value if isinstance(value, int) and value > 0 else default

    def _letterbox(self, image: ndarray, new_shape: tuple[int, int],
                   color=(114, 114, 114)
                   ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize *image* keeping aspect ratio and pad it to *new_shape*.

        Args:
            image: HWC image.
            new_shape: Target size as ``(width, height)``.
            color: Border fill value.

        Returns:
            ``(padded_image, ratio, (dw, dh))`` where ``ratio`` is the scale
            applied and ``dw``/``dh`` are the un-rounded half paddings.
        """
        h, w = image.shape[:2]
        new_w, new_h = new_shape
        ratio = min(new_w / w, new_h / h)
        resized_w = int(round(w * ratio))
        resized_h = int(round(h * ratio))
        if (resized_w, resized_h) != (w, h):
            # Cubic when enlarging, linear when shrinking.
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0
        # The -0.1 / +0.1 nudges split an odd padding total so that the two
        # sides still sum to the full padding.
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))
        padded = cv2.copyMakeBorder(image, top, bottom, left, right,
                                    borderType=cv2.BORDER_CONSTANT, value=color)
        return padded, ratio, (dw, dh)

    def _preprocess(self, image: ndarray
                    ) -> tuple[np.ndarray, float, tuple[float, float],
                               tuple[int, int]]:
        """Turn a BGR HWC image into a normalized ``1x3xHxW`` float32 tensor.

        Returns:
            ``(tensor, ratio, pad, (orig_w, orig_h))``.
        """
        orig_h, orig_w = image.shape[:2]
        img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        img = np.ascontiguousarray(img, dtype=np.float32)
        return img, ratio, pad, (orig_w, orig_h)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        """Clip xyxy *boxes* in place to ``[0, w-1] x [0, h-1]`` and return them."""
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    @staticmethod
    def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray:
        """Convert center-format ``(cx, cy, w, h)`` boxes to ``(x1, y1, x2, y2)``."""
        out = np.empty_like(boxes)
        half_w = boxes[:, 2] / 2.0
        half_h = boxes[:, 3] / 2.0
        out[:, 0] = boxes[:, 0] - half_w
        out[:, 1] = boxes[:, 1] - half_h
        out[:, 2] = boxes[:, 0] + half_w
        out[:, 3] = boxes[:, 1] + half_h
        return out

    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray,
                  iou_thresh: float) -> np.ndarray:
        """Greedy class-agnostic NMS.

        Returns the indices of kept boxes, highest score first. A box is
        dropped when its IoU with an already kept box exceeds *iou_thresh*.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        # Areas do not change between iterations, so compute them once.
        areas = (np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) *
                 np.maximum(0.0, boxes[:, 3] - boxes[:, 1]))
        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order) > 0:
            i = int(order[0])
            keep.append(i)
            rest = order[1:]
            if len(rest) == 0:
                break
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            iou = inter / (areas[i] + areas[rest] - inter + 1e-7)
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)

    def _per_class_hard_nms(self, boxes: np.ndarray, scores: np.ndarray,
                            cls_ids: np.ndarray, iou_thresh: float
                            ) -> np.ndarray:
        """Run ``_hard_nms`` per class; return kept indices in ascending order."""
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        all_keep: list[int] = []
        for c in np.unique(cls_ids):
            indices = np.where(cls_ids == c)[0]
            keep = self._hard_nms(boxes[indices], scores[indices], iou_thresh)
            all_keep.extend(indices[keep].tolist())
        all_keep.sort()
        return np.array(all_keep, dtype=np.intp)

    def _cross_class_dedup_op(self, boxes: np.ndarray, scores: np.ndarray,
                              cls_ids: np.ndarray, iou_thresh: float
                              ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Remove near-identical boxes regardless of class.

        Boxes are visited by decreasing margin over their per-class confidence
        threshold (ties broken by larger area); every unvisited box whose IoU
        with the visited one exceeds *iou_thresh* is suppressed.

        Class ids must be valid indices into ``_conf_thres_array``.
        """
        n = len(boxes)
        if n <= 1:
            return boxes, scores, cls_ids
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        cls_ids = np.asarray(cls_ids, dtype=np.int32)
        areas = (np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) *
                 np.maximum(0.0, boxes[:, 3] - boxes[:, 1]))
        margins = scores - self._conf_thres_array[cls_ids]
        # lexsort uses the LAST key as primary: margin first, then area.
        order = np.lexsort((-areas, -margins))
        suppressed = np.zeros(n, dtype=bool)
        keep: list[int] = []
        for i in order:
            if suppressed[i]:
                continue
            keep.append(int(i))
            bi = boxes[i]
            xx1 = np.maximum(bi[0], boxes[:, 0])
            yy1 = np.maximum(bi[1], boxes[:, 1])
            xx2 = np.minimum(bi[2], boxes[:, 2])
            yy2 = np.minimum(bi[3], boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            a_i = max(1e-7, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (a_i + areas - inter + 1e-7)
            dup = iou > iou_thresh
            dup[i] = False  # a box never suppresses itself
            suppressed |= dup
        keep_idx = np.array(keep, dtype=np.intp)
        return boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx]

    def _filter_sane_boxes(self, boxes: np.ndarray, scores: np.ndarray,
                           cls_ids: np.ndarray, orig_size: tuple[int, int]
                           ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop implausible boxes.

        A box is kept only if both sides are at least ``min_side``, its area is
        between ``min_box_area`` and 95% of the image area, and its aspect
        ratio does not exceed ``max_aspect_ratio``.
        """
        if len(boxes) == 0:
            return boxes, scores, cls_ids
        orig_w, orig_h = orig_size
        image_area = float(orig_w * orig_h)
        bw = np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
        bh = np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        area = bw * bh
        # Degenerate boxes get an infinite aspect ratio so they are rejected.
        ar = np.where(
            (bw > 0) & (bh > 0),
            np.maximum(bw / np.maximum(bh, 1e-6), bh / np.maximum(bw, 1e-6)),
            np.inf,
        )
        keep = (
            (bw >= self.min_side) & (bh >= self.min_side) &
            (area >= self.min_box_area) &
            (area <= 0.95 * image_area) &
            (ar <= self.max_aspect_ratio)
        )
        return boxes[keep], scores[keep], cls_ids[keep]

    def _max_score_per_cluster(self, post_boxes: np.ndarray,
                               post_cls: np.ndarray,
                               full_boxes: np.ndarray,
                               full_scores: np.ndarray,
                               full_cls: np.ndarray,
                               iou_thresh: float) -> np.ndarray:
        """Return, for each post-NMS box, the best score in its cluster.

        The cluster of a box is every box in ``full_boxes`` of the same class
        whose IoU with it is at least *iou_thresh*. An empty cluster yields 0.
        """
        n = len(post_boxes)
        if n == 0:
            return np.empty(0, dtype=np.float32)
        full_areas = (np.maximum(0.0, full_boxes[:, 2] - full_boxes[:, 0]) *
                      np.maximum(0.0, full_boxes[:, 3] - full_boxes[:, 1]))
        out = np.empty(n, dtype=np.float32)
        for i in range(n):
            bi = post_boxes[i]
            xx1 = np.maximum(bi[0], full_boxes[:, 0])
            yy1 = np.maximum(bi[1], full_boxes[:, 1])
            xx2 = np.minimum(bi[2], full_boxes[:, 2])
            yy2 = np.minimum(bi[3], full_boxes[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            a_i = max(0.0, float((bi[2] - bi[0]) * (bi[3] - bi[1])))
            iou = inter / (a_i + full_areas - inter + 1e-7)
            cluster = (iou >= iou_thresh) & (full_cls == post_cls[i])
            out[i] = float(np.max(full_scores[cluster])) if np.any(cluster) else 0.0
        return out

    def _conf_filter_mask(self, scores: np.ndarray,
                          cls_ids: np.ndarray) -> np.ndarray:
        """Boolean keep-mask: score >= per-class threshold, with a per-class
        rescue — if a class has zero boxes passing, admit its top-1 candidate
        when its score >= (per-class threshold - per-class bonus).

        Detections whose class id is negative or not covered by the per-class
        threshold table are never kept. Indexing the table with such ids would
        either raise (failing the whole frame) or silently wrap around to
        another class's threshold.
        """
        if len(scores) == 0:
            return np.zeros(0, dtype=bool)
        cls_ids = np.asarray(cls_ids)
        n_classes = len(self._conf_thres_array)
        valid = (cls_ids >= 0) & (cls_ids < n_classes)
        # Invalid ids are mapped to 0 only to make the lookup safe; ``valid``
        # masks their result out again.
        safe_ids = np.where(valid, cls_ids, 0)
        keep = valid & (scores >= self._conf_thres_array[safe_ids])
        for c in np.unique(cls_ids[valid]):
            bonus = float(self._bonus_array[c])
            if bonus <= 0.0:
                continue
            class_mask = cls_ids == c
            if keep[class_mask].any():
                continue
            idx = np.where(class_mask)[0]
            top = int(idx[int(np.argmax(scores[idx]))])
            if scores[top] >= self._conf_thres_array[c] - bonus:
                keep[top] = True
        return keep

    def _per_view_pipeline(self, boxes: np.ndarray, scores: np.ndarray,
                           cls_ids: np.ndarray, orig_size: tuple[int, int]
                           ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Sanity filter -> class-agnostic NMS -> ``max_det`` cap -> dedup."""
        boxes, scores, cls_ids = self._filter_sane_boxes(
            boxes, scores, cls_ids, orig_size
        )
        if len(boxes) == 0:
            return boxes, scores, cls_ids
        if len(boxes) > 1:
            keep = self._hard_nms(boxes, scores, self.iou_thres)
            boxes, scores, cls_ids = boxes[keep], scores[keep], cls_ids[keep]
        if len(scores) > self.max_det:
            top = np.argsort(-scores)[: self.max_det]
            boxes, scores, cls_ids = boxes[top], scores[top], cls_ids[top]
        if len(boxes) > 1:
            boxes, scores, cls_ids = self._cross_class_dedup_op(
                boxes, scores, cls_ids, self.cross_iou_thresh
            )
        return boxes, scores, cls_ids

    def _finish_view(self, boxes: np.ndarray, scores: np.ndarray,
                     cls_ids: np.ndarray, ratio: float,
                     pad: tuple[float, float],
                     orig_size: tuple[int, int]) -> list[BoundingBox]:
        """Map letterboxed xyxy boxes back to the original image and finish.

        Undoes the letterbox padding and scaling (modifying *boxes* in place),
        clips to the image, runs the per-view pipeline and builds the results.
        """
        pad_w, pad_h = pad
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        boxes = self._clip_boxes(boxes, orig_size)
        boxes, scores, cls_ids = self._per_view_pipeline(
            boxes, scores, cls_ids, orig_size
        )
        return self._build_results(boxes, scores, cls_ids)

    def _decode_final_dets(self, preds: np.ndarray, ratio: float,
                           pad: tuple[float, float],
                           orig_size: tuple[int, int]) -> list[BoundingBox]:
        """Decode rows of ``(x1, y1, x2, y2, score, class, ...)``.

        Accepts ``(N, >=6)`` or ``(1, N, >=6)`` arrays.

        Raises:
            ValueError: If *preds* has any other shape.
        """
        if preds.ndim == 3 and preds.shape[0] == 1:
            preds = preds[0]
        if preds.ndim != 2 or preds.shape[1] < 6:
            raise ValueError(f"Unexpected ONNX final-det output shape: {preds.shape}")
        boxes = preds[:, :4].astype(np.float32)
        scores = preds[:, 4].astype(np.float32)
        cls_ids = preds[:, 5].astype(np.int32)
        keep = self._conf_filter_mask(scores, cls_ids)
        boxes, scores, cls_ids = boxes[keep], scores[keep], cls_ids[keep]
        if len(boxes) == 0:
            return []
        return self._finish_view(boxes, scores, cls_ids, ratio, pad, orig_size)

    def _decode_raw_yolo(self, preds: np.ndarray, ratio: float,
                         pad: tuple[float, float],
                         orig_size: tuple[int, int]) -> list[BoundingBox]:
        """Decode a raw head output of ``(cx, cy, w, h, class scores...)``.

        Accepts ``(1, N, C)`` or the channel-first ``(1, C, N)`` layout.

        Raises:
            ValueError: If *preds* is not a single-batch 3-D array or has
                fewer than 5 channels.
        """
        if preds.ndim != 3 or preds.shape[0] != 1:
            raise ValueError(f"Unexpected raw ONNX output shape: {preds.shape}")
        preds = preds[0]
        # Heuristic: a short first axis (<= 16) that is smaller than the second
        # is taken to be the channel axis, so the array is transposed.
        if preds.shape[0] <= 16 and preds.shape[1] > preds.shape[0]:
            preds = preds.T
        if preds.ndim != 2 or preds.shape[1] < 5:
            raise ValueError(f"Unexpected raw output shape: {preds.shape}")
        boxes_xywh = preds[:, :4].astype(np.float32)
        cls_part = preds[:, 4:].astype(np.float32)
        if cls_part.shape[1] == 1:
            scores = cls_part[:, 0]
            cls_ids = np.zeros(len(scores), dtype=np.int32)
        else:
            cls_ids = np.argmax(cls_part, axis=1).astype(np.int32)
            scores = cls_part[np.arange(len(cls_part)), cls_ids]
        keep = self._conf_filter_mask(scores, cls_ids)
        boxes_xywh, scores, cls_ids = boxes_xywh[keep], scores[keep], cls_ids[keep]
        if len(boxes_xywh) == 0:
            return []
        boxes = self._xywh_to_xyxy(boxes_xywh)
        return self._finish_view(boxes, scores, cls_ids, ratio, pad, orig_size)

    @staticmethod
    def _build_results(boxes: np.ndarray, scores: np.ndarray,
                       cls_ids: np.ndarray) -> list[BoundingBox]:
        """Build ``BoundingBox`` objects, skipping boxes with no positive extent.

        Top-left corners are floored and bottom-right corners are ceiled.
        """
        results: list[BoundingBox] = []
        for box, conf, cls_id in zip(boxes, scores, cls_ids):
            x1, y1, x2, y2 = box.tolist()
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=int(cls_id),
                    conf=float(conf),
                )
            )
        return results

    def _postprocess(self, output: np.ndarray, ratio: float,
                     pad: tuple[float, float],
                     orig_size: tuple[int, int]) -> list[BoundingBox]:
        """Dispatch on the output shape to the matching decoder.

        ``(N, >=6)`` and ``(1, N, 6)`` are treated as final detections;
        everything else is handed to the raw decoder.
        """
        is_final_2d = output.ndim == 2 and output.shape[1] >= 6
        is_final_3d = (output.ndim == 3 and output.shape[0] == 1
                       and output.shape[2] == 6)
        if is_final_2d or is_final_3d:
            return self._decode_final_dets(output, ratio, pad, orig_size)
        return self._decode_raw_yolo(output, ratio, pad, orig_size)

    def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
        """Run one forward pass on *image* and return its detections.

        Raises:
            ValueError: If *image* is None, not HxWx3, has a zero-sized side,
                or the preprocessed tensor has an unexpected shape.
            TypeError: If *image* is not a numpy array.
        """
        if image is None:
            raise ValueError("Input image is None")
        if not isinstance(image, np.ndarray):
            raise TypeError(f"Input is not numpy array: {type(image)}")
        if image.ndim != 3:
            raise ValueError(f"Expected HWC image, got shape={image.shape}")
        if image.shape[2] != 3:
            raise ValueError(f"Expected 3 channels, got shape={image.shape}")
        # A zero-sized side would otherwise surface as a ZeroDivisionError
        # inside the letterbox ratio computation.
        if image.shape[0] == 0 or image.shape[1] == 0:
            raise ValueError(f"Empty image, got shape={image.shape}")
        if image.dtype != np.uint8:
            image = image.astype(np.uint8)
        input_tensor, ratio, pad, orig_size = self._preprocess(image)
        expected = (1, 3, self.input_height, self.input_width)
        if input_tensor.shape != expected:
            raise ValueError(
                f"Bad input tensor shape={input_tensor.shape}, expected={expected}"
            )
        outputs = self.session.run(self.output_names, {self.input_name: input_tensor})
        # Only the first model output is decoded.
        return self._postprocess(outputs[0], ratio, pad, orig_size)

    def _predict_tta(self, image: np.ndarray) -> list[BoundingBox]:
        """Detect on *image* and its horizontal flip, then merge both views.

        Flipped detections are mirrored back, all boxes go through per-class
        NMS, each survivor takes the best score of its same-class cluster, and
        a final cross-class dedup is applied.
        """
        boxes_orig = self._predict_single(image)
        boxes_flip = self._predict_single(cv2.flip(image, 1))
        w = image.shape[1]
        # Mirror the flipped view's boxes back into original coordinates.
        boxes_flip = [
            BoundingBox(
                x1=w - b.x2, y1=b.y1, x2=w - b.x1, y2=b.y2,
                cls_id=b.cls_id, conf=b.conf,
            )
            for b in boxes_flip
        ]
        all_boxes = boxes_orig + boxes_flip
        if not all_boxes:
            return []
        coords = np.array(
            [[b.x1, b.y1, b.x2, b.y2] for b in all_boxes], dtype=np.float32
        )
        scores = np.array([b.conf for b in all_boxes], dtype=np.float32)
        cls_ids = np.array([b.cls_id for b in all_boxes], dtype=np.int32)
        hard_keep = self._per_class_hard_nms(coords, scores, cls_ids, self.iou_thres)
        if len(hard_keep) == 0:
            return []
        if len(hard_keep) > self.max_det:
            top = np.argsort(-scores[hard_keep])[: self.max_det]
            hard_keep = hard_keep[top]
        kept_coords = coords[hard_keep]
        kept_cls = cls_ids[hard_keep]
        boosted = self._max_score_per_cluster(
            kept_coords, kept_cls, coords, scores, cls_ids, self.iou_thres,
        )
        if len(kept_coords) > 1:
            kept_coords, boosted, kept_cls = self._cross_class_dedup_op(
                kept_coords, boosted, kept_cls, self.cross_iou_thresh
            )
        # Coordinates are already whole numbers with positive extent here, so
        # the shared builder rounds nothing and skips nothing.
        return self._build_results(kept_coords, boosted, kept_cls)

    def predict_batch(self, batch_images: list[ndarray], offset: int,
                      n_keypoints: int) -> list[TVFrameResult]:
        """Run TTA detection on every image of the batch.

        Args:
            batch_images: Frames to process.
            offset: Frame id of the first image in the batch.
            n_keypoints: Number of ``(0, 0)`` placeholder keypoints per frame.

        Returns:
            One ``TVFrameResult`` per input image, in order. A frame whose
            inference raises is logged and reported with no boxes so that one
            bad frame does not fail the whole batch.
        """
        placeholder_keypoints = [(0, 0)] * max(0, int(n_keypoints))
        results: list[TVFrameResult] = []
        for frame_number_in_batch, image in enumerate(batch_images):
            frame_id = offset + frame_number_in_batch
            try:
                boxes = self._predict_tta(image)
            except Exception as e:
                print(f"Inference failed for frame {frame_id}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=frame_id,
                    boxes=boxes,
                    keypoints=list(placeholder_keypoints),
                )
            )
        return results