# test001 / miner.py
# alfred8995's picture
# Update miner.py
# e7014d0 verified
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
    """One detection: an integer pixel box plus its class id and confidence."""

    # Left / top corner. Miner fills these with floor() of the float coordinates.
    x1: int
    y1: int
    # Right / bottom corner. Miner fills these with ceil() of the float coordinates.
    x2: int
    y2: int
    # Class index. Ids remapped by Miner index its `class_names` list.
    cls_id: int
    # Confidence score of the detection.
    conf: float
class TVFrameResult(BaseModel):
    """Per-frame result: the frame id, its detections and its keypoints."""

    # Frame index. Miner.predict_batch sets it to `offset` + position in the batch.
    frame_id: int
    # Detections reported for this frame.
    boxes: list[BoundingBox]
    # (int, int) pairs. Miner.predict_batch emits `n_keypoints` placeholder (0, 0) entries.
    keypoints: list[tuple[int, int]]
class Miner:
    """Vehicle detector that runs an ONNX model through ONNX Runtime.

    Each frame is letterboxed to the model input size, run through the
    session, decoded into ``BoundingBox`` objects in original-image pixel
    coordinates and, when ``use_tta`` is set, cross-checked against a second
    pass over the horizontally flipped frame.
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """Create the ONNX Runtime session and set the detection thresholds.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.
        """
        model_path = path_hf_repo / "weights.onnx"

        # Ids are reported in `class_names` order; `model_class_order` is
        # presumably the order the model was trained with. The table maps a
        # model class index to the reported class index.
        self.class_names = ['bus', 'car', 'truck', 'motorcycle']
        model_class_order = ['truck', 'car', 'bus', 'motorcycle']
        self._train_cls_to_canonical = np.array(
            [self.class_names.index(n) for n in model_class_order],
            dtype=np.int32,
        )

        print("ORT version:", ort.__version__)
        try:
            ort.preload_dlls()
            print("✅ onnxruntime.preload_dlls() success")
        except Exception as e:
            # Best effort only: session creation below decides what is usable.
            print(f"⚠️ preload_dlls failed: {e}")
        print("ORT available providers BEFORE session:", ort.get_available_providers())

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("✅ Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
        print("ORT session providers:", self.session.get_providers())

        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [output.name for output in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape
        # Dims 2 and 3 are read as height and width; non-integer (dynamic)
        # dims fall back to 1280.
        self.input_height = self._safe_dim(self.input_shape[2], default=1280)
        self.input_width = self._safe_dim(self.input_shape[3], default=1280)

        # ---------- Scoring-oriented thresholds ----------
        # Low threshold for candidate generation
        self.conf_thres = 0.2124
        # High-confidence boxes can survive without TTA confirmation
        self.conf_high = 0.7225
        # NMS threshold
        self.iou_thres = 0.7704
        # TTA confirmation IoU
        self.tta_match_iou = 0.5609
        self.max_det = 264
        self.use_tta = True

        # Box sanity filters
        self.min_box_area = 82
        self.min_w = 15
        self.min_h = 17
        self.max_aspect_ratio = 3.6571
        self.max_box_area_ratio = 0.7807

        print(f"✅ ONNX model loaded from: {model_path}")
        print(f"✅ ONNX providers: {self.session.get_providers()}")
        print(f"✅ ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        """Return a short description naming the session type and its providers."""
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return *value* if it is a positive ``int``, otherwise *default*."""
        if isinstance(value, int) and value > 0:
            return value
        return default

    @staticmethod
    def _to_uint8(image: ndarray) -> ndarray:
        """Return *image* as ``uint8``, saturating values outside ``[0, 255]``.

        A plain ``astype(np.uint8)`` wraps around (300 -> 44, -5 -> 251);
        clipping first keeps over- and under-range pixels at the nearest
        valid value. In-range values are converted exactly as ``astype`` would.
        ``uint8`` input is returned unchanged (same object).
        """
        if image.dtype == np.uint8:
            return image
        return np.clip(image, 0, 255).astype(np.uint8)

    @staticmethod
    def _mirror_x_range(x1: int, x2: int, width: int) -> tuple[int, int]:
        """Mirror a horizontal extent across an image that is *width* pixels wide.

        The result is clamped to ``[0, width - 1]``, the range ``_clip_boxes``
        enforces, so a box touching the left edge of the flipped frame cannot
        come back with ``x2 == width``.
        """
        return max(0, width - x2), min(width - 1, width - x1)

    def _remap_train_cls_ids(self, cls_ids: np.ndarray) -> np.ndarray:
        """Map model class indices to reported class indices.

        Indices outside the lookup table are clamped to its first/last entry.
        """
        table = self._train_cls_to_canonical
        idx = np.clip(cls_ids.astype(np.int64, copy=False), 0, len(table) - 1)
        return table[idx]

    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize *image* keeping its aspect ratio and pad it to *new_shape*.

        Args:
            image: HWC image.
            new_shape: Target ``(width, height)``.
            color: Constant border colour used for the padding.

        Returns:
            ``(padded_image, ratio, (dw, dh))`` where ``ratio`` is the scale
            applied to the image and ``dw`` / ``dh`` are the (possibly
            fractional) padding added on each side horizontally / vertically.
        """
        h, w = image.shape[:2]
        new_w, new_h = new_shape

        ratio = min(new_w / w, new_h / h)
        resized_w = int(round(w * ratio))
        resized_h = int(round(h * ratio))

        if (resized_w, resized_h) != (w, h):
            # Cubic when enlarging, linear when shrinking.
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)

        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0

        # The +/-0.1 nudges split an odd padding into floor/ceil halves so the
        # padded image comes out at exactly the requested size.
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))

        padded = cv2.copyMakeBorder(
            image,
            top,
            bottom,
            left,
            right,
            borderType=cv2.BORDER_CONSTANT,
            value=color,
        )
        return padded, ratio, (dw, dh)

    def _preprocess(
        self, image: ndarray
    ) -> tuple[np.ndarray, float, tuple[float, float], tuple[int, int]]:
        """Turn a BGR HWC image into the model's input tensor.

        Returns:
            ``(tensor, ratio, pad, (orig_w, orig_h))`` where ``tensor`` is a
            contiguous float32 array of shape ``(1, 3, H, W)`` scaled by
            1/255, and ``ratio`` / ``pad`` come from ``_letterbox``.
        """
        orig_h, orig_w = image.shape[:2]
        img, ratio, pad = self._letterbox(
            image, (self.input_width, self.input_height)
        )
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]  # HWC -> 1xCxHxW
        img = np.ascontiguousarray(img, dtype=np.float32)
        return img, ratio, pad, (orig_w, orig_h)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        """Clamp xyxy *boxes* in place to ``[0, w-1] x [0, h-1]`` and return them.

        Args:
            boxes: ``(N, 4)`` array of ``x1, y1, x2, y2``; modified in place.
            image_size: ``(width, height)`` of the image.
        """
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    @staticmethod
    def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray:
        """Convert ``(cx, cy, w, h)`` rows to ``(x1, y1, x2, y2)`` in a new array."""
        out = np.empty_like(boxes)
        out[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.0
        out[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.0
        out[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.0
        out[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.0
        return out

    @staticmethod
    def _hard_nms(
        boxes: np.ndarray,
        scores: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        """Greedy non-maximum suppression.

        Repeatedly keeps the highest-scoring remaining box and drops every
        other remaining box whose IoU with it exceeds *iou_thresh*.

        Returns:
            Indices of the kept boxes, in descending score order.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)

        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        order = np.argsort(scores)[::-1]

        keep = []
        while len(order) > 0:
            i = order[0]
            keep.append(i)
            if len(order) == 1:
                break

            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)

            area_i = (
                np.maximum(0.0, boxes[i, 2] - boxes[i, 0])
                * np.maximum(0.0, boxes[i, 3] - boxes[i, 1])
            )
            area_r = (
                np.maximum(0.0, boxes[rest, 2] - boxes[rest, 0])
                * np.maximum(0.0, boxes[rest, 3] - boxes[rest, 1])
            )
            # The epsilon keeps the division defined for zero-area pairs.
            iou = inter / (area_i + area_r - inter + 1e-7)
            order = rest[iou <= iou_thresh]

        return np.array(keep, dtype=np.intp)

    @classmethod
    def _nms_per_class(
        cls,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        iou_thresh: float,
        max_det: int,
    ) -> np.ndarray:
        """NMS within each class so overlapping car vs bus predictions are not merged away.

        Returns:
            Indices into the inputs of the surviving boxes, sorted by
            descending score and truncated to *max_det* entries.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)

        survivors: list[int] = []
        for c in np.unique(cls_ids):
            idxs = np.nonzero(cls_ids == c)[0]
            if len(idxs) == 0:
                continue
            local_keep = cls._hard_nms(boxes[idxs], scores[idxs], iou_thresh)
            survivors.extend(idxs[local_keep].tolist())

        kept = np.array(survivors, dtype=np.intp)
        order = np.argsort(scores[kept])[::-1]
        return kept[order[:max_det]]

    @staticmethod
    def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
        """Return the IoU of one xyxy *box* against every row of *boxes*."""
        xx1 = np.maximum(box[0], boxes[:, 0])
        yy1 = np.maximum(box[1], boxes[:, 1])
        xx2 = np.minimum(box[2], boxes[:, 2])
        yy2 = np.minimum(box[3], boxes[:, 3])
        inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)

        area_a = max(0.0, (box[2] - box[0]) * (box[3] - box[1]))
        area_b = (
            np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
            * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        )
        return inter / (area_a + area_b - inter + 1e-7)

    def _filter_sane_boxes(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        orig_size: tuple[int, int],
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Drop boxes whose geometry fails the configured sanity limits.

        A box is kept only if its width and height are positive and at least
        ``min_w`` / ``min_h``, its area lies between ``min_box_area`` and
        ``max_box_area_ratio`` times the image area, and its long/short side
        ratio does not exceed ``max_aspect_ratio``. Boxes with NaN
        coordinates fail these comparisons and are dropped.

        Args:
            boxes: ``(N, 4)`` xyxy boxes in original-image pixels.
            scores: ``(N,)`` confidences.
            cls_ids: ``(N,)`` class ids.
            orig_size: ``(width, height)`` of the original image.

        Returns:
            The filtered ``(boxes, scores, cls_ids)``. Empty input is
            returned unchanged; if everything is rejected, fresh empty arrays
            of shape ``(0, 4)``, ``(0,)`` and ``(0,)`` are returned.
        """
        if len(boxes) == 0:
            return boxes, scores, cls_ids

        orig_w, orig_h = orig_size
        image_area = float(orig_w * orig_h)

        # float64 so the limits are compared at full precision.
        x1, y1, x2, y2 = np.asarray(boxes, dtype=np.float64).T
        bw = x2 - x1
        bh = y2 - y1
        area = bw * bh
        # The 1e-6 floor keeps the division defined for degenerate boxes,
        # which the width/height tests reject anyway.
        aspect = np.maximum(bw / np.maximum(bh, 1e-6), bh / np.maximum(bw, 1e-6))

        sane = (
            (bw > 0)
            & (bh > 0)
            & (bw >= self.min_w)
            & (bh >= self.min_h)
            & (area >= self.min_box_area)
            & (area <= self.max_box_area_ratio * image_area)
            & (aspect <= self.max_aspect_ratio)
        )

        if not sane.any():
            return (
                np.empty((0, 4), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.int32),
            )
        return boxes[sane], scores[sane], cls_ids[sane]

    def _finalize_detections(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> "list[BoundingBox]":
        """Shared tail of both decoders: un-letterbox, clip, filter, NMS, pack.

        Args:
            boxes: ``(N, 4)`` float xyxy boxes in letterboxed-input pixels;
                modified in place, so callers must pass an array they own.
            scores: ``(N,)`` confidences, already thresholded.
            cls_ids: ``(N,)`` class ids.
            ratio: Letterbox scale factor.
            pad: Letterbox ``(dw, dh)`` padding.
            orig_size: ``(width, height)`` of the original image.

        Returns:
            Detections in original-image pixels, outward-rounded to integers.
        """
        if len(boxes) == 0:
            return []

        # Undo the letterbox: remove the padding, then the scaling.
        pad_w, pad_h = pad
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        boxes = self._clip_boxes(boxes, orig_size)

        boxes, scores, cls_ids = self._filter_sane_boxes(boxes, scores, cls_ids, orig_size)
        if len(boxes) == 0:
            return []

        keep_idx = self._nms_per_class(
            boxes, scores, cls_ids, self.iou_thres, self.max_det
        )

        return [
            BoundingBox(
                x1=int(math.floor(box[0])),
                y1=int(math.floor(box[1])),
                x2=int(math.ceil(box[2])),
                y2=int(math.ceil(box[3])),
                cls_id=int(cls_id),
                conf=float(conf),
            )
            for box, conf, cls_id in zip(boxes[keep_idx], scores[keep_idx], cls_ids[keep_idx])
            if box[2] > box[0] and box[3] > box[1]
        ]

    def _decode_final_dets(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> "list[BoundingBox]":
        """Decode rows of ``[x1, y1, x2, y2, score, class, ...]``.

        Accepts ``(N, C)`` or ``(1, N, C)`` with ``C >= 6``; columns past the
        sixth are ignored.

        Raises:
            ValueError: If *preds* does not have one of those shapes.
        """
        if preds.ndim == 3 and preds.shape[0] == 1:
            preds = preds[0]
        if preds.ndim != 2 or preds.shape[1] < 6:
            raise ValueError(f"Unexpected ONNX final-det output shape: {preds.shape}")

        boxes = preds[:, :4].astype(np.float32)
        scores = preds[:, 4].astype(np.float32)
        cls_ids = self._remap_train_cls_ids(preds[:, 5].astype(np.int32))

        # All trained vehicle classes: bus, car, truck, motorcycle (see self.class_names).
        # candidate threshold
        keep = scores >= self.conf_thres
        return self._finalize_detections(
            boxes[keep], scores[keep], cls_ids[keep], ratio, pad, orig_size
        )

    def _decode_raw_yolo(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> "list[BoundingBox]":
        """Decode a raw ``(1, N, C)`` or ``(1, C, N)`` head with xywh boxes.

        Raises:
            ValueError: If *preds* is not 3-D with batch size 1, or has fewer
                than five columns after layout normalisation.
        """
        if preds.ndim != 3:
            raise ValueError(f"Unexpected raw ONNX output shape: {preds.shape}")
        if preds.shape[0] != 1:
            raise ValueError(f"Unexpected batch dimension in raw output: {preds.shape}")

        preds = preds[0]

        # Normalize to [N, C]
        if preds.shape[0] <= 16 and preds.shape[1] > preds.shape[0]:
            preds = preds.T

        if preds.ndim != 2 or preds.shape[1] < 5:
            raise ValueError(f"Unexpected normalized raw output shape: {preds.shape}")

        boxes_xywh = preds[:, :4].astype(np.float32)
        tail = preds[:, 4:].astype(np.float32)

        # Supports:
        # [x,y,w,h,score] single-class
        # [x,y,w,h,obj,cls] YOLO standard single-class
        # [x,y,w,h,obj,cls1,cls2,...] multi-class
        # NOTE(review): a tail of 3+ columns is always read as objectness
        # followed by class scores; a head without an objectness column would
        # be misread here. Confirm against the exported model.
        if tail.shape[1] == 1:
            scores = tail[:, 0]
            cls_ids = np.zeros(len(scores), dtype=np.int32)
        elif tail.shape[1] == 2:
            scores = tail[:, 0] * tail[:, 1]
            cls_ids = np.zeros(len(scores), dtype=np.int32)
        else:
            obj = tail[:, 0]
            class_probs = tail[:, 1:]
            cls_ids = np.argmax(class_probs, axis=1).astype(np.int32)
            cls_scores = class_probs[np.arange(len(class_probs)), cls_ids]
            scores = obj * cls_scores
            cls_ids = self._remap_train_cls_ids(cls_ids)

        keep = scores >= self.conf_thres
        boxes_xywh = boxes_xywh[keep]
        if len(boxes_xywh) == 0:
            return []

        return self._finalize_detections(
            self._xywh_to_xyxy(boxes_xywh), scores[keep], cls_ids[keep], ratio, pad, orig_size
        )

    def _postprocess(
        self,
        output: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> "list[BoundingBox]":
        """Pick the decoder for *output* based on its shape and run it.

        NOTE(review): every batch-1 3-D output whose last dimension is >= 6
        takes the final-detections path, including raw layouts such as
        ``(1, C, N)``. The raw decoder is only reached when the last
        dimension is below 6. Confirm this matches the exported model.
        """
        looks_final_2d = output.ndim == 2 and output.shape[1] >= 6
        looks_final_3d = (
            output.ndim == 3 and output.shape[0] == 1 and output.shape[2] >= 6
        )
        if looks_final_2d or looks_final_3d:
            return self._decode_final_dets(output, ratio, pad, orig_size)
        return self._decode_raw_yolo(output, ratio, pad, orig_size)

    def _predict_single(self, image: np.ndarray) -> "list[BoundingBox]":
        """Run one forward pass on *image* and return its detections.

        Args:
            image: HWC image with three channels. Non-``uint8`` input is
                converted with saturation to ``[0, 255]``.

        Raises:
            ValueError: If *image* is ``None``, is not a non-empty
                three-channel HWC array, or the preprocessed tensor does not
                match the model input shape.
            TypeError: If *image* is not a numpy array.
        """
        if image is None:
            raise ValueError("Input image is None")
        if not isinstance(image, np.ndarray):
            raise TypeError(f"Input is not numpy array: {type(image)}")
        if image.ndim != 3:
            raise ValueError(f"Expected HWC image, got shape={image.shape}")
        if image.shape[0] <= 0 or image.shape[1] <= 0:
            raise ValueError(f"Invalid image shape={image.shape}")
        if image.shape[2] != 3:
            raise ValueError(f"Expected 3 channels, got shape={image.shape}")

        image = self._to_uint8(image)

        input_tensor, ratio, pad, orig_size = self._preprocess(image)

        expected_shape = (1, 3, self.input_height, self.input_width)
        if input_tensor.shape != expected_shape:
            raise ValueError(
                f"Bad input tensor shape={input_tensor.shape}, expected={expected_shape}"
            )

        outputs = self.session.run(self.output_names, {self.input_name: input_tensor})
        # Only the first output is decoded.
        det_output = outputs[0]
        return self._postprocess(det_output, ratio, pad, orig_size)

    @staticmethod
    def _boxes_to_arrays(dets) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Split a list of boxes into ``(coords, scores, cls_ids)`` arrays.

        Returns float32 ``(N, 4)`` coordinates, float32 ``(N,)`` scores and
        int32 ``(N,)`` class ids; an empty list yields correctly shaped
        empty arrays.
        """
        if not dets:
            return (
                np.empty((0, 4), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.int32),
            )
        coords = np.array([[b.x1, b.y1, b.x2, b.y2] for b in dets], dtype=np.float32)
        scores = np.array([b.conf for b in dets], dtype=np.float32)
        cls_ids = np.array([b.cls_id for b in dets], dtype=np.int32)
        return coords, scores, cls_ids

    def _merge_tta_consensus(
        self,
        boxes_orig: "list[BoundingBox]",
        boxes_flip: "list[BoundingBox]",
    ) -> "list[BoundingBox]":
        """
        Keep:
        - any box with conf >= conf_high
        - low/medium-conf boxes only if confirmed across TTA views
        Then run final hard NMS.

        Args:
            boxes_orig: Detections from the original view.
            boxes_flip: Detections from the flipped view, already mapped back
                to original-image coordinates.
        """
        if not boxes_orig and not boxes_flip:
            return []

        coords_o, scores_o, cls_o = self._boxes_to_arrays(boxes_orig)
        coords_f, scores_f, cls_f = self._boxes_to_arrays(boxes_flip)

        accepted_boxes = []
        accepted_scores = []
        accepted_cls = []

        # Original view candidates
        for coords, score, cls_id in zip(coords_o, scores_o, cls_o):
            if score >= self.conf_high:
                accepted_boxes.append(coords)
                accepted_scores.append(score)
                accepted_cls.append(int(cls_id))
            elif len(coords_f) > 0:
                ious = self._box_iou_one_to_many(coords, coords_f)
                j = int(np.argmax(ious))
                if ious[j] >= self.tta_match_iou:
                    # Confirmed by the flipped view: keep the original box
                    # and class with the better of the two scores.
                    accepted_boxes.append(coords)
                    accepted_scores.append(max(score, scores_f[j]))
                    accepted_cls.append(int(cls_id))

        # Flipped-view high-confidence boxes that original missed
        for coords, score, cls_id in zip(coords_f, scores_f, cls_f):
            if score < self.conf_high:
                continue
            unmatched = (
                len(coords_o) == 0
                or np.max(self._box_iou_one_to_many(coords, coords_o)) < self.tta_match_iou
            )
            if unmatched:
                accepted_boxes.append(coords)
                accepted_scores.append(score)
                accepted_cls.append(int(cls_id))

        if not accepted_boxes:
            return []

        boxes = np.array(accepted_boxes, dtype=np.float32)
        scores = np.array(accepted_scores, dtype=np.float32)
        cls_ids = np.array(accepted_cls, dtype=np.int32)

        keep = self._nms_per_class(boxes, scores, cls_ids, self.iou_thres, self.max_det)

        out = []
        for idx in keep:
            x1, y1, x2, y2 = boxes[idx].tolist()
            out.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=int(cls_ids[idx]),
                    conf=float(scores[idx]),
                )
            )
        return out

    def _predict_tta(self, image: np.ndarray) -> "list[BoundingBox]":
        """Detect on *image* and on its horizontal mirror, then merge both.

        Detections from the mirrored pass are mapped back to original
        coordinates and clamped to the frame before merging.
        """
        boxes_orig = self._predict_single(image)

        flipped = cv2.flip(image, 1)
        boxes_flip_raw = self._predict_single(flipped)

        w = image.shape[1]
        boxes_flip = []
        for b in boxes_flip_raw:
            x1, x2 = self._mirror_x_range(b.x1, b.x2, w)
            boxes_flip.append(
                BoundingBox(
                    x1=x1,
                    y1=b.y1,
                    x2=x2,
                    y2=b.y2,
                    cls_id=b.cls_id,
                    conf=b.conf,
                )
            )

        return self._merge_tta_consensus(boxes_orig, boxes_flip)

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> "list[TVFrameResult]":
        """Run detection on every image of a batch.

        Args:
            batch_images: Frames to process, in order.
            offset: Frame id of the first image; later images count up from it.
            n_keypoints: Number of placeholder ``(0, 0)`` keypoints to attach
                to each result.

        Returns:
            One ``TVFrameResult`` per input image. A frame whose inference
            raises is logged and reported with an empty box list rather than
            aborting the batch.
        """
        results = []
        for frame_number_in_batch, image in enumerate(batch_images):
            frame_id = offset + frame_number_in_batch
            try:
                if self.use_tta:
                    boxes = self._predict_tta(image)
                else:
                    boxes = self._predict_single(image)
            except Exception as e:
                # Deliberate best effort: one bad frame must not sink the batch.
                print(f"⚠️ Inference failed for frame {frame_id}: {e}")
                boxes = []

            results.append(
                TVFrameResult(
                    frame_id=frame_id,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results