# Detect-Person / miner.py
# Author: meaculpitt
# Tuning note: conf_threshold 0.35 -> 0.70 gave composite +5.9%
# (FP/img 1.45 -> 0.33, mAP50 flat at 56%). Commit 177bd16 (verified).
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
    """A single detection: axis-aligned pixel box plus class id and confidence.

    Coordinates are integer pixels in the original image frame,
    clamped to the image bounds by the producer (`Miner._infer_single`).
    """

    # Top-left corner (pixels).
    x1: int
    y1: int
    # Bottom-right corner (pixels).
    x2: int
    y2: int
    # Index into the model's class list (only 'person' here).
    cls_id: int
    # Detection confidence, clamped to [0.0, 1.0] by the producer.
    conf: float
class TVFrameResult(BaseModel):
    """Per-frame detection result returned by `Miner.predict_batch`.

    `keypoints` is filled with (0, 0) placeholders by this miner — it does
    not run a pose model (see `predict_batch`).
    """

    # Absolute frame index (batch offset + position within the batch).
    frame_id: int
    # All detections kept after confidence filtering and NMS.
    boxes: list[BoundingBox]
    # Placeholder keypoints; always (0, 0) tuples in this implementation.
    keypoints: list[tuple[int, int]]
class Miner:
    """
    ONNX-based single-class ('person') detector.

    Auto-generated by subnet_bridge from a Manako element repo.
    This miner is intentionally self-contained for chute import restrictions.

    Pipeline per frame: BGR -> RGB resize -> normalized CHW tensor ->
    ONNX session -> confidence filter -> class-agnostic NMS ->
    integer pixel boxes in the original image frame.
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """Load `weights.onnx` from *path_hf_repo* and cache model I/O metadata.

        Args:
            path_hf_repo: Directory containing the exported ONNX weights.
        """
        self.path_hf_repo = path_hf_repo
        self.class_names = ['person']
        # CUDA first; onnxruntime silently falls back to CPU if unavailable.
        self.session = ort.InferenceSession(
            str(path_hf_repo / "weights.onnx"),
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )
        self.input_name = self.session.get_inputs()[0].name
        input_shape = self.session.get_inputs()[0].shape
        # expected [N, C, H, W]
        self.input_h = int(input_shape[2])
        self.input_w = int(input_shape[3])
        self.conf_threshold = 0.70  # sweep-optimised: max composite 0.65×mAP+0.35×FP_score
        self.iou_threshold = 0.45

    def __repr__(self) -> str:
        return f"ONNX Miner session={type(self.session).__name__} classes={len(self.class_names)}"

    def _preprocess(self, image_bgr: ndarray) -> tuple[np.ndarray, tuple[int, int]]:
        """Convert a BGR image into the model's input tensor.

        Uses a plain (non-letterbox) resize; `_infer_single` undoes it with
        independent x/y scale factors, so aspect ratio is handled consistently.

        Returns:
            (tensor, (orig_h, orig_w)) where tensor is float32 [1, 3, H, W]
            in [0, 1], RGB channel order.
        """
        h, w = image_bgr.shape[:2]
        rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (self.input_w, self.input_h))
        x = resized.astype(np.float32) / 255.0
        x = np.transpose(x, (2, 0, 1))[None, ...]  # HWC -> NCHW
        return x, (h, w)

    def _normalize_predictions(self, raw: np.ndarray) -> np.ndarray:
        """Normalize the raw model output to shape [N, C] (one row per box).

        Common ultralytics export shapes:
          - [1, C, N] where C = 4 + num_classes
          - [1, N, C]
        Disambiguated heuristically by assuming N (anchors) > C (channels).

        Raises:
            ValueError: if the output is not a 2-D prediction map per batch item.
        """
        pred = raw[0]
        if pred.ndim != 2:
            raise ValueError(f"Unexpected prediction shape: {raw.shape}")
        if pred.shape[0] < pred.shape[1]:
            pred = pred.transpose(1, 0)
        return pred

    def _nms(self, dets: list[tuple[float, float, float, float, float, int]]) -> list[tuple[float, float, float, float, float, int]]:
        """Greedy class-agnostic non-maximum suppression.

        Args:
            dets: (x1, y1, x2, y2, conf, cls_id) tuples in pixel coordinates.

        Returns:
            Kept detections, ordered by descending confidence.
        """
        if not dets:
            return []
        boxes = np.array([d[:4] for d in dets], dtype=np.float32)
        scores = np.array([d[4] for d in dets], dtype=np.float32)
        # Areas are loop-invariant; the original recomputed them every
        # iteration of the suppression loop — hoist to a single pass.
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        order = scores.argsort()[::-1]  # indices by descending confidence
        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            rest = order[1:]
            # Intersection of the top box with all remaining boxes.
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            w = np.maximum(0.0, xx2 - xx1)
            h = np.maximum(0.0, yy2 - yy1)
            inter = w * h
            # Epsilon guards against division by zero on degenerate boxes.
            union = np.maximum(areas[i] + areas[rest] - inter, 1e-6)
            iou = inter / union
            order = rest[iou <= self.iou_threshold]
        return [dets[idx] for idx in keep]

    def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
        """Run one image through the model and return post-processed boxes.

        Returns:
            Detections above `conf_threshold`, NMS-filtered, with integer
            coordinates clamped to the original image bounds.
        """
        inp, (orig_h, orig_w) = self._preprocess(image_bgr)
        out = self.session.run(None, {self.input_name: inp})[0]
        pred = self._normalize_predictions(out)
        # Need at least cx, cy, w, h + one class score per row.
        if pred.shape[1] < 5:
            return []
        boxes = pred[:, :4]
        cls_scores = pred[:, 4:]
        # NOTE: the original also checked `cls_scores.shape[1] == 0` here,
        # which is unreachable once pred.shape[1] >= 5 — removed.
        cls_ids = np.argmax(cls_scores, axis=1)
        confs = np.max(cls_scores, axis=1)
        keep = confs >= self.conf_threshold
        boxes = boxes[keep]
        confs = confs[keep]
        cls_ids = cls_ids[keep]
        if boxes.shape[0] == 0:
            return []
        # Scale factors to map model-input coordinates back to the original.
        sx = orig_w / float(self.input_w)
        sy = orig_h / float(self.input_h)
        dets: list[tuple[float, float, float, float, float, int]] = []
        for i in range(boxes.shape[0]):
            # Model emits center-x, center-y, width, height.
            cx, cy, bw, bh = boxes[i].tolist()
            x1 = (cx - bw / 2.0) * sx
            y1 = (cy - bh / 2.0) * sy
            x2 = (cx + bw / 2.0) * sx
            y2 = (cy + bh / 2.0) * sy
            dets.append((x1, y1, x2, y2, float(confs[i]), int(cls_ids[i])))
        dets = self._nms(dets)
        out_boxes: list[BoundingBox] = []
        for x1, y1, x2, y2, conf, cls_id in dets:
            # floor/ceil so the integer box never shrinks the detection;
            # clamp to [0, width]/[0, height] of the original image.
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            out_boxes.append(
                BoundingBox(
                    x1=ix1,
                    y1=iy1,
                    x2=ix2,
                    y2=iy2,
                    cls_id=cls_id,
                    conf=max(0.0, min(1.0, conf)),
                )
            )
        return out_boxes

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Detect persons in every frame of a batch.

        Args:
            batch_images: BGR frames.
            offset: Absolute index of the first frame (frame_id = offset + i).
            n_keypoints: Number of placeholder (0, 0) keypoints to emit per
                frame — this miner runs no pose model.

        Returns:
            One `TVFrameResult` per input frame, in order.
        """
        results: list[TVFrameResult] = []
        for idx, image in enumerate(batch_images):
            boxes = self._infer_single(image)
            keypoints = [(0, 0) for _ in range(max(0, int(n_keypoints)))]
            results.append(
                TVFrameResult(
                    frame_id=offset + idx,
                    boxes=boxes,
                    keypoints=keypoints,
                )
            )
        return results