# numbers/miner.py
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
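    """Axis-aligned box in absolute pixel coordinates (xyxy) with class id and confidence."""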
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float
class TVFrameResult(BaseModel):
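    """Detections for one frame plus a fixed-length list of (x, y) keypoints."""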
    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]
SIZE = 1280  # fallback square input size when the ONNX graph reports dynamic H/W
class Miner:
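    """ONNX-based number-plate detector.

    Primary pass: letterboxed inference with horizontal-flip TTA, merged by
    soft-NMS then hard NMS. When the primary pass finds fewer than
    `sparse_threshold` boxes, a conditional 2x2 overlapping-tile pass mines
    additional small detections and novelty-merges them into the result.
    """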
    def __init__(self, path_hf_repo: Path) -> None:
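        """Load weights.onnx (and optional class_names.txt) from path_hf_repo
        and create an ONNX Runtime session, preferring CUDA with CPU fallback.
        """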
        model_path = path_hf_repo / "weights.onnx"
        cn_path = model_path.with_name("class_names.txt")
        if cn_path.is_file():
            lines = cn_path.read_text(encoding="utf-8").splitlines()
            self.class_names = [
                ln.strip()
                for ln in lines
                if ln.strip() and not ln.strip().startswith("#")
            ]
        else:
            self.class_names = ["numberplate"]
        print("ORT version:", ort.__version__)
        try:
            ort.preload_dlls()
            print("onnxruntime.preload_dlls() success")
        except Exception as e:
            print(f"preload_dlls failed: {e}")
        print("ORT available providers BEFORE session:", ort.get_available_providers())
        try:
            import torch
            if torch.cuda.is_available():
                print(f"GPU: {torch.cuda.get_device_name(0)}")
                print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
            else:
                print("GPU: CUDA not available via torch")
        except Exception as e:
            print(f"GPU detection failed: {e}")
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
        print("ORT session providers:", self.session.get_providers())
        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)
        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [o.name for o in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape
        self.input_height = self._safe_dim(self.input_shape[2], default=SIZE)
        self.input_width = self._safe_dim(self.input_shape[3], default=SIZE)
        # Primary pass: alfred001 tuning (optimized for hermestech weights)
        self.conf_thres = 0.23
        self.iou_thres = 0.66
        self.sigma = 0.465
        self.max_det = 300
        # Conditional tile pass (trimmed for latency: no hflip, tighter sparse gate)
        self.sparse_threshold = 3  # fire tiles only if primary returns < this
        self.tile_conf = 0.57
        self.tile_overlap = 0.20
        self.novelty_iou = 0.10
        self.final_max_det = 17
        self.tile_use_hflip = False  # skip hflip tile pass to save ~4 forwards
        self.use_tta = True
        print(f"ONNX model loaded from: {model_path}")
        print(f"ONNX providers: {self.session.get_providers()}")
        print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")
    def __repr__(self) -> str:
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )
    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return value if it is a concrete positive int, else default (dynamic dims come back as strings or None)."""
        return value if isinstance(value, int) and value > 0 else default
    # ---------- image preprocessing ----------
    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
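        """Resize with preserved aspect ratio and pad to new_shape (w, h).

        Returns the padded image, the resize ratio, and the (dw, dh) padding
        offsets needed to map detections back to original coordinates.
        """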
        h, w = image.shape[:2]
        new_w, new_h = new_shape
        ratio = min(new_w / w, new_h / h)
        resized_w = int(round(w * ratio))
        resized_h = int(round(h * ratio))
        if (resized_w, resized_h) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
        dw = (new_w - resized_w) / 2.0
        dh = (new_h - resized_h) / 2.0
        left = int(round(dw - 0.1))
        right = int(round(dw + 0.1))
        top = int(round(dh - 0.1))
        bottom = int(round(dh + 0.1))
        padded = cv2.copyMakeBorder(
            image, top, bottom, left, right,
            borderType=cv2.BORDER_CONSTANT, value=color,
        )
        return padded, ratio, (dw, dh)
    def _preprocess(self, image: ndarray):
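        """Letterbox, convert BGR->RGB float32 in [0, 1], and add the NCHW batch dim."""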
        img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        return np.ascontiguousarray(img, dtype=np.float32), ratio, pad
    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
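        """Clip xyxy boxes in place to [0, w-1] x [0, h-1]; also returns them."""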
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes
    # ---------- NMS primitives ----------
    @staticmethod
    def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
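        """Greedy hard NMS; returns kept indices in descending-score order."""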
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp)
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        order = np.argsort(-scores)
        keep: list[int] = []
        while len(order):
            i = int(order[0])
            keep.append(i)
            if len(order) == 1:
                break
            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            area_r = (boxes[rest, 2] - boxes[rest, 0]) * (boxes[rest, 3] - boxes[rest, 1])
            iou = inter / (area_i + area_r - inter + 1e-7)
            order = rest[iou <= iou_thresh]
        return np.array(keep, dtype=np.intp)
    def _soft_nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        sigma: float,
        score_thresh: float = 0.01,
    ) -> tuple[np.ndarray, np.ndarray]:
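        """Gaussian soft-NMS: decay overlapping scores by exp(-iou^2 / sigma)
        instead of suppressing outright. Returns (kept original indices,
        decayed scores) for boxes still above score_thresh.
        """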
        N = len(boxes)
        if N == 0:
            return np.array([], dtype=np.intp), np.array([], dtype=np.float32)
        boxes = boxes.astype(np.float32, copy=True)
        scores = scores.astype(np.float32, copy=True)
        order = np.arange(N)
        for i in range(N):
            # Swap the highest-scoring remaining box into position i.
            max_pos = i + int(np.argmax(scores[i:]))
            boxes[[i, max_pos]] = boxes[[max_pos, i]]
            scores[[i, max_pos]] = scores[[max_pos, i]]
            order[[i, max_pos]] = order[[max_pos, i]]
            if i + 1 >= N:
                break
            xx1 = np.maximum(boxes[i, 0], boxes[i + 1:, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[i + 1:, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[i + 1:, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[i + 1:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = float(
                (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            )
            areas_j = (
                np.maximum(0.0, boxes[i + 1:, 2] - boxes[i + 1:, 0])
                * np.maximum(0.0, boxes[i + 1:, 3] - boxes[i + 1:, 1])
            )
            iou = inter / (area_i + areas_j - inter + 1e-7)
            scores[i + 1:] *= np.exp(-(iou ** 2) / sigma)
        mask = scores > score_thresh
        return order[mask], scores[mask]
    @staticmethod
    def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
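        """IoU of one xyxy box against an [N, 4] array of xyxy boxes."""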
        if len(boxes) == 0:
            return np.zeros(0, dtype=np.float32)
        xx1 = np.maximum(box[0], boxes[:, 0])
        yy1 = np.maximum(box[1], boxes[:, 1])
        xx2 = np.minimum(box[2], boxes[:, 2])
        yy2 = np.minimum(box[3], boxes[:, 3])
        inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
        area_a = max(0.0, (box[2] - box[0]) * (box[3] - box[1]))
        area_b = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
        return inter / (area_a + area_b - inter + 1e-7)
    # ---------- raw-dets helper ----------
    def _raw_dets(self, image: ndarray, conf: float) -> np.ndarray:
        """Run a single forward pass and return [N, 5] dets in ORIGINAL image coords."""
        x, ratio, (dw, dh) = self._preprocess(image)
        out = self.session.run(self.output_names, {self.input_name: x})[0]
        if out.ndim == 3:
            out = out[0]
        # Rows are treated as [x1, y1, x2, y2, conf, ...] in letterboxed coords.
        if out.shape[1] < 5:
            return np.zeros((0, 5), dtype=np.float32)
        boxes = out[:, :4].astype(np.float32)
        scores = out[:, 4].astype(np.float32)
        keep = scores >= conf
        boxes, scores = boxes[keep], scores[keep]
        if len(boxes) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        # Undo letterbox: remove padding offsets, then rescale to original size.
        boxes[:, [0, 2]] -= dw
        boxes[:, [1, 3]] -= dh
        boxes /= ratio
        oh, ow = image.shape[:2]
        boxes = self._clip_boxes(boxes, (ow, oh))
        return np.concatenate([boxes, scores[:, None]], axis=1)
    # ---------- primary pass: soft-NMS + hflip TTA ----------
    def _primary(self, image: ndarray) -> np.ndarray:
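        """Detect on the image and its horizontal mirror, un-flip the mirrored
        boxes, then merge everything with soft-NMS followed by hard NMS.
        """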
        d1 = self._raw_dets(image, self.conf_thres)
        flipped = cv2.flip(image, 1)
        d2 = self._raw_dets(flipped, self.conf_thres)
        if len(d2):
            # Map mirrored boxes back to original coordinates: x' = w - x.
            w = image.shape[1]
            x1 = w - d2[:, 2]
            x2 = w - d2[:, 0]
            d2 = np.stack([x1, d2[:, 1], x2, d2[:, 3], d2[:, 4]], axis=1)
        all_d = np.concatenate([d1, d2], axis=0) if len(d2) else d1
        if len(all_d) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        # soft-NMS, then hard-NMS
        keep_idx, scores = self._soft_nms(all_d[:, :4].copy(), all_d[:, 4].copy(), sigma=self.sigma)
        if len(keep_idx) == 0:
            return np.zeros((0, 5), dtype=np.float32)
        merged = np.concatenate([all_d[keep_idx, :4], scores[:, None]], axis=1)
        keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
        merged = merged[keep]
        if len(merged) > self.max_det:
            merged = merged[np.argsort(-merged[:, 4])[: self.max_det]]
        return merged
    # ---------- conditional tile pass ----------
    def _tile_augment(self, image: ndarray, primary: np.ndarray) -> np.ndarray:
        """Run 2x2 overlapping tiles (plus optional hflip), novelty-merge into primary."""
        oh, ow = image.shape[:2]
        tw, th = ow // 2, oh // 2
        ox, oy = int(tw * self.tile_overlap), int(th * self.tile_overlap)
        tiles = [
            (0, 0, min(ow, tw + ox), min(oh, th + oy)),
            (max(0, tw - ox), 0, ow, min(oh, th + oy)),
            (0, max(0, th - oy), min(ow, tw + ox), oh),
            (max(0, tw - ox), max(0, th - oy), ow, oh),
        ]
        collected: list[np.ndarray] = []
        for x1, y1, x2, y2 in tiles:
            crop = image[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            d = self._raw_dets(crop, self.tile_conf)
            if len(d):
                # Shift tile-local boxes into full-image coordinates.
                d[:, 0] += x1
                d[:, 1] += y1
                d[:, 2] += x1
                d[:, 3] += y1
                collected.append(d)
        # hflip tile pass (skipped when tile_use_hflip=False to save 4 ONNX forwards)
        if self.tile_use_hflip:
            flipped = cv2.flip(image, 1)
            for x1, y1, x2, y2 in tiles:
                fx1 = ow - x2
                fx2 = ow - x1
                if fx2 <= fx1:
                    continue
                crop = flipped[y1:y2, fx1:fx2]
                if crop.size == 0:
                    continue
                d = self._raw_dets(crop, self.tile_conf)
                if len(d):
                    # Un-flip: x coordinates mirror around the full image width.
                    d_un = d.copy()
                    d_un[:, 0] = ow - (d[:, 2] + fx1)
                    d_un[:, 2] = ow - (d[:, 0] + fx1)
                    d_un[:, 1] = d[:, 1] + y1
                    d_un[:, 3] = d[:, 3] + y1
                    collected.append(d_un)
        if not collected:
            return primary
        tile_dets = np.concatenate(collected, axis=0)
        keep = self._hard_nms(tile_dets[:, :4], tile_dets[:, 4], 0.5)
        tile_dets = tile_dets[keep]
        # Novelty: drop tile boxes that overlap any primary box at IoU >= novelty_iou
        if len(primary) > 0 and len(tile_dets) > 0:
            mask = np.ones(len(tile_dets), dtype=bool)
            for i in range(len(tile_dets)):
                ious = self._box_iou_one_to_many(tile_dets[i, :4], primary[:, :4])
                if len(ious) and np.max(ious) >= self.novelty_iou:
                    mask[i] = False
            tile_dets = tile_dets[mask]
        if len(tile_dets) == 0:
            return primary
        # Sanity filter: min/max size, aspect ratio
        w = tile_dets[:, 2] - tile_dets[:, 0]
        h = tile_dets[:, 3] - tile_dets[:, 1]
        area = w * h
        ar = np.maximum(w / np.maximum(h, 1e-6), h / np.maximum(w, 1e-6))
        img_area = float(ow * oh)
        ok = (w >= 7) & (h >= 7) & (area >= 85) & (area <= 0.5 * img_area) & (ar <= 10.0)
        tile_dets = tile_dets[ok]
        if len(tile_dets) == 0:
            return primary
        merged = np.concatenate([primary, tile_dets], axis=0)
        keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
        merged = merged[keep]
        if len(merged) > self.final_max_det:
            merged = merged[np.argsort(-merged[:, 4])[: self.final_max_det]]
        return merged
    # ---------- single-image predict ----------
    def _predict_single(self, image: ndarray) -> list[BoundingBox]:
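        """Validate the input frame, run the primary pass, tile-augment sparse
        results, and convert detections to BoundingBox models.
        """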
        if image is None or not isinstance(image, np.ndarray) or image.ndim != 3:
            return []
        if image.shape[0] <= 0 or image.shape[1] <= 0 or image.shape[2] != 3:
            return []
        if image.dtype != np.uint8:
            image = image.astype(np.uint8)
        primary = self._primary(image)
        if len(primary) < self.sparse_threshold:
            dets = self._tile_augment(image, primary)
        else:
            dets = primary
        results: list[BoundingBox] = []
        for row in dets:
            x1, y1, x2, y2, conf = row.tolist()
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=0,
                    conf=float(conf),
                )
            )
        return results
    # ---------- chute entrypoint ----------
    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
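        """Run detection on each frame; a failed frame yields an empty box list
        rather than aborting the batch. Keypoints are zero-filled placeholders.
        """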
        results: list[TVFrameResult] = []
        for frame_number_in_batch, image in enumerate(batch_images):
            try:
                boxes = self._predict_single(image)
            except Exception as e:
                print(f"Inference failed for frame {offset + frame_number_in_batch}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=offset + frame_number_in_batch,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results
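
# Minimal smoke-test sketch (not part of the chute runtime). Assumes the
# current directory, or argv[1], contains weights.onnx; the dummy frame and
# paths here are illustrative only.
if __name__ == "__main__":
    import sys

    repo = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".")
    miner = Miner(repo)
    # A blank 720p BGR frame; real callers pass decoded video frames.
    frame = np.zeros((720, 1280, 3), dtype=np.uint8)
    out = miner.predict_batch([frame], offset=0, n_keypoints=4)
    print(out[0].frame_id, len(out[0].boxes))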