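"""ONNX Runtime number-plate detector for the chute miner API.

Wraps a single-class detector exported to ONNX: letterbox preprocessing,
soft-NMS with horizontal-flip TTA on the full frame, and a conditional
2x2 tile pass for frames where the primary pass returns few detections.
"""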
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
class TVFrameResult(BaseModel):
frame_id: int
boxes: list[BoundingBox]
keypoints: list[tuple[int, int]]
SIZE = 1280  # fallback square input side when the ONNX graph reports dynamic H/W
class Miner:
def __init__(self, path_hf_repo: Path) -> None:
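        """Load `weights.onnx` from `path_hf_repo`, preferring the CUDA
        execution provider and falling back to CPU if session creation fails."""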
model_path = path_hf_repo / "weights.onnx"
cn_path = model_path.with_name("class_names.txt")
if cn_path.is_file():
lines = cn_path.read_text(encoding="utf-8").splitlines()
self.class_names = [
ln.strip()
for ln in lines
if ln.strip() and not ln.strip().startswith("#")
]
else:
self.class_names = ["numberplate"]
print("ORT version:", ort.__version__)
try:
ort.preload_dlls()
print("onnxruntime.preload_dlls() success")
except Exception as e:
print(f"preload_dlls failed: {e}")
print("ORT available providers BEFORE session:", ort.get_available_providers())
try:
import torch
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"GPU memory: {torch.cuda.get_device_properties(0).total_mem / 1e9:.1f} GB")
else:
print("GPU: CUDA not available via torch")
except Exception as e:
print(f"GPU detection failed: {e}")
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
try:
self.session = ort.InferenceSession(
str(model_path),
sess_options=sess_options,
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
print("Created ORT session with preferred CUDA provider list")
except Exception as e:
print(f"CUDA session creation failed, falling back to CPU: {e}")
self.session = ort.InferenceSession(
str(model_path),
sess_options=sess_options,
providers=["CPUExecutionProvider"],
)
print("ORT session providers:", self.session.get_providers())
for inp in self.session.get_inputs():
print("INPUT:", inp.name, inp.shape, inp.type)
for out in self.session.get_outputs():
print("OUTPUT:", out.name, out.shape, out.type)
self.input_name = self.session.get_inputs()[0].name
self.output_names = [o.name for o in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape  # NCHW layout assumed below
self.input_height = self._safe_dim(self.input_shape[2], default=SIZE)
self.input_width = self._safe_dim(self.input_shape[3], default=SIZE)
# Primary pass: alfred001 tuning (optimized for hermestech weights)
self.conf_thres = 0.23
self.iou_thres = 0.66
self.sigma = 0.465
self.max_det = 300
        # Conditional tile pass (trimmed for latency: no hflip, tighter sparse threshold)
self.sparse_threshold = 3 # fire tiles only if primary returns < this
self.tile_conf = 0.57
self.tile_overlap = 0.20
self.novelty_iou = 0.10
self.final_max_det = 17
self.tile_use_hflip = False # skip hflip tile pass to save ~4 forwards
self.use_tta = True
print(f"ONNX model loaded from: {model_path}")
print(f"ONNX providers: {self.session.get_providers()}")
print(f"ONNX input: name={self.input_name}, shape={self.input_shape}")
def __repr__(self) -> str:
return (
f"ONNXRuntime(session={type(self.session).__name__}, "
f"providers={self.session.get_providers()})"
)
@staticmethod
def _safe_dim(value, default: int) -> int:
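        """Return `value` if it is a concrete positive int, else `default`
        (ORT reports dynamic ONNX dims as symbolic strings or None)."""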
return value if isinstance(value, int) and value > 0 else default
# ---------- image preprocessing ----------
def _letterbox(
self,
image: ndarray,
new_shape: tuple[int, int],
color=(114, 114, 114),
) -> tuple[ndarray, float, tuple[float, float]]:
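        """Aspect-preserving resize plus constant-color padding to `new_shape`
        (width, height); returns the padded image, the scale ratio, and the
        (dw, dh) padding offsets needed to map boxes back to the original frame."""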
h, w = image.shape[:2]
new_w, new_h = new_shape
ratio = min(new_w / w, new_h / h)
resized_w = int(round(w * ratio))
resized_h = int(round(h * ratio))
if (resized_w, resized_h) != (w, h):
interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
dw = (new_w - resized_w) / 2.0
dh = (new_h - resized_h) / 2.0
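        # Split the padding across both sides; the +/-0.1 biases rounding so an
        # odd pad total splits into floor/ceil halves (a common letterbox idiom).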
left = int(round(dw - 0.1))
right = int(round(dw + 0.1))
top = int(round(dh - 0.1))
bottom = int(round(dh + 0.1))
padded = cv2.copyMakeBorder(
image, top, bottom, left, right,
borderType=cv2.BORDER_CONSTANT, value=color,
)
return padded, ratio, (dw, dh)
def _preprocess(self, image: ndarray):
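        """Letterbox, BGR -> RGB, scale to [0, 1], and reorder HWC -> NCHW float32."""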
img, ratio, pad = self._letterbox(image, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
img = np.transpose(img, (2, 0, 1))[None, ...]
return np.ascontiguousarray(img, dtype=np.float32), ratio, pad
@staticmethod
def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
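        """Clip xyxy boxes in place to the (w, h) image bounds."""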
w, h = image_size
boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
return boxes
# ---------- NMS primitives ----------
@staticmethod
def _hard_nms(boxes: np.ndarray, scores: np.ndarray, iou_thresh: float) -> np.ndarray:
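        """Greedy hard NMS on xyxy boxes; returns kept indices in descending
        score order."""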
N = len(boxes)
if N == 0:
return np.array([], dtype=np.intp)
boxes = np.asarray(boxes, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
order = np.argsort(-scores)
keep: list[int] = []
while len(order):
i = int(order[0])
keep.append(i)
if len(order) == 1:
break
rest = order[1:]
xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_i = (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
area_r = (boxes[rest, 2] - boxes[rest, 0]) * (boxes[rest, 3] - boxes[rest, 1])
iou = inter / (area_i + area_r - inter + 1e-7)
order = rest[iou <= iou_thresh]
return np.array(keep, dtype=np.intp)
def _soft_nms(
self,
boxes: np.ndarray,
scores: np.ndarray,
sigma: float,
score_thresh: float = 0.01,
) -> tuple[np.ndarray, np.ndarray]:
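        """Gaussian soft-NMS: each box decays the scores of lower-ranked
        overlapping boxes by exp(-iou^2 / sigma) instead of removing them.
        Returns (original indices, decayed scores) for detections whose
        decayed score stays above `score_thresh`."""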
N = len(boxes)
if N == 0:
return np.array([], dtype=np.intp), np.array([], dtype=np.float32)
boxes = boxes.astype(np.float32, copy=True)
scores = scores.astype(np.float32, copy=True)
order = np.arange(N)
for i in range(N):
max_pos = i + int(np.argmax(scores[i:]))
boxes[[i, max_pos]] = boxes[[max_pos, i]]
scores[[i, max_pos]] = scores[[max_pos, i]]
order[[i, max_pos]] = order[[max_pos, i]]
if i + 1 >= N:
break
xx1 = np.maximum(boxes[i, 0], boxes[i + 1:, 0])
yy1 = np.maximum(boxes[i, 1], boxes[i + 1:, 1])
xx2 = np.minimum(boxes[i, 2], boxes[i + 1:, 2])
yy2 = np.minimum(boxes[i, 3], boxes[i + 1:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_i = float(
(boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
)
areas_j = (
np.maximum(0.0, boxes[i + 1:, 2] - boxes[i + 1:, 0])
* np.maximum(0.0, boxes[i + 1:, 3] - boxes[i + 1:, 1])
)
iou = inter / (area_i + areas_j - inter + 1e-7)
scores[i + 1:] *= np.exp(-(iou ** 2) / sigma)
mask = scores > score_thresh
return order[mask], scores[mask]
@staticmethod
def _box_iou_one_to_many(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
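        """IoU of a single xyxy box against an [N, 4] array of boxes."""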
if len(boxes) == 0:
return np.zeros(0, dtype=np.float32)
xx1 = np.maximum(box[0], boxes[:, 0])
yy1 = np.maximum(box[1], boxes[:, 1])
xx2 = np.minimum(box[2], boxes[:, 2])
yy2 = np.minimum(box[3], boxes[:, 3])
inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
area_a = max(0.0, (box[2] - box[0]) * (box[3] - box[1]))
area_b = np.maximum(0.0, boxes[:, 2] - boxes[:, 0]) * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
return inter / (area_a + area_b - inter + 1e-7)
# ---------- raw-dets helper ----------
def _raw_dets(self, image: ndarray, conf: float) -> np.ndarray:
"""Run a single forward pass and return [N, 5] dets in ORIGINAL image coords."""
x, ratio, (dw, dh) = self._preprocess(image)
out = self.session.run(self.output_names, {self.input_name: x})[0]
if out.ndim == 3:
out = out[0]
if out.shape[1] < 5:
return np.zeros((0, 5), dtype=np.float32)
boxes = out[:, :4].astype(np.float32)
scores = out[:, 4].astype(np.float32)
keep = scores >= conf
boxes, scores = boxes[keep], scores[keep]
if len(boxes) == 0:
return np.zeros((0, 5), dtype=np.float32)
boxes[:, [0, 2]] -= dw
boxes[:, [1, 3]] -= dh
boxes /= ratio
oh, ow = image.shape[:2]
boxes = self._clip_boxes(boxes, (ow, oh))
return np.concatenate([boxes, scores[:, None]], axis=1)
# ---------- primary pass: soft-NMS + hflip TTA ----------
def _primary(self, image: ndarray) -> np.ndarray:
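        """Full-frame pass plus horizontal-flip TTA, merged with soft-NMS
        followed by hard-NMS and capped at `max_det`."""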
d1 = self._raw_dets(image, self.conf_thres)
flipped = cv2.flip(image, 1)
d2 = self._raw_dets(flipped, self.conf_thres)
if len(d2):
w = image.shape[1]
x1 = w - d2[:, 2]
x2 = w - d2[:, 0]
d2 = np.stack([x1, d2[:, 1], x2, d2[:, 3], d2[:, 4]], axis=1)
all_d = np.concatenate([d1, d2], axis=0) if len(d2) else d1
if len(all_d) == 0:
return np.zeros((0, 5), dtype=np.float32)
# soft-NMS, then hard-NMS
keep_idx, scores = self._soft_nms(all_d[:, :4].copy(), all_d[:, 4].copy(), sigma=self.sigma)
if len(keep_idx) == 0:
return np.zeros((0, 5), dtype=np.float32)
merged = np.concatenate([all_d[keep_idx, :4], scores[:, None]], axis=1)
keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
merged = merged[keep]
if len(merged) > self.max_det:
merged = merged[np.argsort(-merged[:, 4])[: self.max_det]]
return merged
# ---------- conditional tile pass ----------
def _tile_augment(self, image: ndarray, primary: np.ndarray) -> np.ndarray:
"""Run 2x2 overlapping tiles + hflip, novelty-merge into primary."""
oh, ow = image.shape[:2]
tw, th = ow // 2, oh // 2
ox, oy = int(tw * self.tile_overlap), int(th * self.tile_overlap)
tiles = [
(0, 0, min(ow, tw + ox), min(oh, th + oy)),
(max(0, tw - ox), 0, ow, min(oh, th + oy)),
(0, max(0, th - oy), min(ow, tw + ox), oh),
(max(0, tw - ox), max(0, th - oy), ow, oh),
]
collected: list[np.ndarray] = []
for x1, y1, x2, y2 in tiles:
crop = image[y1:y2, x1:x2]
if crop.size == 0:
continue
d = self._raw_dets(crop, self.tile_conf)
if len(d):
d[:, 0] += x1
d[:, 1] += y1
d[:, 2] += x1
d[:, 3] += y1
collected.append(d)
# hflip tile pass (skipped when tile_use_hflip=False — saves 4 ONNX forwards)
if self.tile_use_hflip:
flipped = cv2.flip(image, 1)
for x1, y1, x2, y2 in tiles:
fx1 = ow - x2
fx2 = ow - x1
if fx2 <= fx1:
continue
crop = flipped[y1:y2, fx1:fx2]
if crop.size == 0:
continue
d = self._raw_dets(crop, self.tile_conf)
if len(d):
d_un = d.copy()
d_un[:, 0] = (ow - (d[:, 2] + fx1))
d_un[:, 2] = (ow - (d[:, 0] + fx1))
d_un[:, 1] = d[:, 1] + y1
d_un[:, 3] = d[:, 3] + y1
collected.append(d_un)
if not collected:
return primary
tile_dets = np.concatenate(collected, axis=0)
keep = self._hard_nms(tile_dets[:, :4], tile_dets[:, 4], 0.5)
tile_dets = tile_dets[keep]
# Novelty: drop tile boxes that overlap any primary box at IoU >= novelty_iou
if len(primary) > 0 and len(tile_dets) > 0:
mask = np.ones(len(tile_dets), dtype=bool)
for i in range(len(tile_dets)):
ious = self._box_iou_one_to_many(tile_dets[i, :4], primary[:, :4])
if len(ious) and np.max(ious) >= self.novelty_iou:
mask[i] = False
tile_dets = tile_dets[mask]
if len(tile_dets) == 0:
return primary
# Sanity filter: min/max size, aspect ratio
w = tile_dets[:, 2] - tile_dets[:, 0]
h = tile_dets[:, 3] - tile_dets[:, 1]
area = w * h
ar = np.maximum(w / np.maximum(h, 1e-6), h / np.maximum(w, 1e-6))
img_area = float(ow * oh)
ok = (w >= 7) & (h >= 7) & (area >= 85) & (area <= 0.5 * img_area) & (ar <= 10.0)
tile_dets = tile_dets[ok]
if len(tile_dets) == 0:
return primary
merged = np.concatenate([primary, tile_dets], axis=0)
keep = self._hard_nms(merged[:, :4], merged[:, 4], self.iou_thres)
merged = merged[keep]
if len(merged) > self.final_max_det:
merged = merged[np.argsort(-merged[:, 4])[: self.final_max_det]]
return merged
# ---------- single-image predict ----------
def _predict_single(self, image: ndarray) -> list[BoundingBox]:
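        """Validate the frame, run the primary pass, add the tile pass only
        when detections are sparse, and convert rows to BoundingBox models."""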
if image is None or not isinstance(image, np.ndarray) or image.ndim != 3:
return []
if image.shape[0] <= 0 or image.shape[1] <= 0 or image.shape[2] != 3:
return []
if image.dtype != np.uint8:
image = image.astype(np.uint8)
primary = self._primary(image)
if len(primary) < self.sparse_threshold:
dets = self._tile_augment(image, primary)
else:
dets = primary
results: list[BoundingBox] = []
for row in dets:
x1, y1, x2, y2, conf = row.tolist()
if x2 <= x1 or y2 <= y1:
continue
results.append(
BoundingBox(
x1=int(math.floor(x1)),
y1=int(math.floor(y1)),
x2=int(math.ceil(x2)),
y2=int(math.ceil(y2)),
cls_id=0,
conf=float(conf),
)
)
return results
# ---------- chute entrypoint ----------
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[TVFrameResult]:
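        """Chute entrypoint: detect per frame; keypoints are zero-filled
        placeholders since this model predicts boxes only."""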
results: list[TVFrameResult] = []
for frame_number_in_batch, image in enumerate(batch_images):
try:
boxes = self._predict_single(image)
except Exception as e:
print(f"Inference failed for frame {offset + frame_number_in_batch}: {e}")
boxes = []
results.append(
TVFrameResult(
frame_id=offset + frame_number_in_batch,
boxes=boxes,
keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
)
)
return results
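
if __name__ == "__main__":
    # Minimal local smoke test, not part of the chute API. Assumes a local
    # directory (the "./model_repo" path below is a placeholder) containing
    # weights.onnx and, optionally, class_names.txt.
    import sys

    repo = Path(sys.argv[1] if len(sys.argv) > 1 else "./model_repo")
    miner = Miner(repo)
    # Synthetic gray 720p BGR frame; any HxWx3 uint8 array works here.
    frame = np.full((720, 1280, 3), 114, dtype=np.uint8)
    for result in miner.predict_batch([frame], offset=0, n_keypoints=0):
        print(result.frame_id, result.boxes)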