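# Hugging Face page header captured together with the file (not program code):
#   repo/file: petrol006 / miner.py
#   avatar:    alfred8995's picture
#   commit:    "Update miner.py" (6cf3810, verified)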
from pathlib import Path
import math
import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel
class BoundingBox(BaseModel):
    # One detection: an axis-aligned box in integer pixel coordinates.
    # (x1, y1) is the top-left corner and (x2, y2) the bottom-right corner.
    x1: int
    y1: int
    x2: int
    y2: int
    # Class index of the detection (after any model -> class_names remap).
    cls_id: int
    # Confidence score of the detection.
    conf: float
class TVFrameResult(BaseModel):
    # Result for a single frame of a batch.
    # Global frame index (batch offset + position inside the batch).
    frame_id: int
    # Detections found in the frame.
    boxes: list[BoundingBox]
    # Keypoints as (x, y) integer pairs.
    keypoints: list[tuple[int, int]]
# Fallback model input side length (pixels), used when the ONNX input shape
# does not expose a concrete positive integer for height/width.
SIZE = 1280
# Class names in the order of the model's native class indices; used to build
# the remap from model class ids to the ids implied by class_names.txt.
MODEL_CLASS_ORDER = ["petrol pump", "petrol hose", "roof canopy", "price board"]
class Miner:
    """Object detector that runs ``weights.onnx`` through ONNX Runtime.

    Each frame is letterboxed to the model input size, run through the
    session and decoded into ``BoundingBox`` objects expressed in pixels of
    the original frame.  When ``use_tta`` is set, the frame is additionally
    run horizontally flipped and both detection sets are merged with hard
    (class-agnostic) NMS.
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """Load class names, create the ONNX Runtime session, set thresholds.

        Args:
            path_hf_repo: Directory containing ``weights.onnx`` and,
                optionally, ``class_names.txt`` (one class name per line;
                blank lines and lines starting with ``#`` are ignored).
        """
        model_path = path_hf_repo / "weights.onnx"
        self.class_names = self._load_class_names(
            model_path.with_name("class_names.txt")
        )
        self.model_class_order = MODEL_CLASS_ORDER
        self.class_id_remap = self._build_class_id_remap(
            self.model_class_order, self.class_names
        )
        if self.class_id_remap:
            print("Class ID remap (model->class_names):", self.class_id_remap)

        self.session = self._create_session(model_path)
        print("ORT session providers:", self.session.get_providers())
        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        first_input = self.session.get_inputs()[0]
        self.input_name = first_input.name
        self.output_names = [output.name for output in self.session.get_outputs()]
        self.input_shape = first_input.shape
        # Indices 2 and 3 are read as height and width (assumes an NCHW
        # input); symbolic/dynamic dimensions fall back to SIZE.
        self.input_height = self._safe_dim(self.input_shape[2], default=SIZE)
        self.input_width = self._safe_dim(self.input_shape[3], default=SIZE)

        self.conf_thres = 0.44  # minimum score for a detection to be kept
        self.iou_thres = 0.6  # IoU above which TTA duplicates are suppressed
        self.max_det = 600  # cap on detections returned per frame
        self.use_tta = True  # run horizontal-flip test-time augmentation

        print(f"✅ ONNX model loaded from: {model_path}")
        print(f"✅ ONNX providers: {self.session.get_providers()}")
        print(f"✅ ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        return (
            f"ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _load_class_names(cn_path: Path) -> list[str]:
        """Read class names from *cn_path*, or fall back to the model order.

        The fallback is ``MODEL_CLASS_ORDER`` (identity remap), so the class
        ids emitted without a ``class_names.txt`` are the model's native ids
        and ``class_names`` still labels them correctly.
        """
        if not cn_path.is_file():
            return list(MODEL_CLASS_ORDER)
        stripped = (
            line.strip()
            for line in cn_path.read_text(encoding="utf-8").splitlines()
        )
        return [name for name in stripped if name and not name.startswith("#")]

    @staticmethod
    def _create_session(model_path: Path) -> "ort.InferenceSession":
        """Create the inference session, preferring CUDA and falling back to CPU."""
        print("ORT version:", ort.__version__)
        # Best effort: a failure here (including the function not existing in
        # the installed onnxruntime) is only reported, never fatal.
        try:
            ort.preload_dlls()
            print("✅ onnxruntime.preload_dlls() success")
        except Exception as e:
            print(f"⚠️ preload_dlls failed: {e}")
        print("ORT available providers BEFORE session:", ort.get_available_providers())

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        try:
            session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("✅ Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}")
            session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )
        return session

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return *value* if it is a positive int, otherwise *default*."""
        return value if isinstance(value, int) and value > 0 else default

    @staticmethod
    def _build_class_id_remap(
        model_order: list[str], target_order: list[str]
    ) -> dict[int, int]:
        """Map model class indices to indices in *target_order*.

        Only names present in both lists get an entry; model classes missing
        from *target_order* are left out of the mapping.
        """
        target_index = {name: idx for idx, name in enumerate(target_order)}
        return {
            model_idx: target_index[class_name]
            for model_idx, class_name in enumerate(model_order)
            if class_name in target_index
        }

    def _remap_cls_id(self, cls_id: int) -> int:
        """Remap a model class id to its class_names id; unknown ids pass through."""
        return self.class_id_remap.get(int(cls_id), int(cls_id))

    @staticmethod
    def _letterbox_geometry(
        src_size: tuple[int, int], dst_size: tuple[int, int]
    ) -> tuple[float, tuple[int, int], tuple[int, int, int, int]]:
        """Compute the letterbox scale, resized size and integer paddings.

        Args:
            src_size: ``(width, height)`` of the source image.
            dst_size: ``(width, height)`` of the target canvas.

        Returns:
            ``(ratio, (resized_w, resized_h), (left, top, right, bottom))``.
            The paddings always add up to the free space, with the extra
            pixel going to the right/bottom when that space is odd.
        """
        src_w, src_h = src_size
        dst_w, dst_h = dst_size
        ratio = min(dst_w / src_w, dst_h / src_h)
        # Never let an extremely thin image collapse to a zero-sized resize.
        resized_w = max(1, int(round(src_w * ratio)))
        resized_h = max(1, int(round(src_h * ratio)))
        half_w = (dst_w - resized_w) / 2.0
        half_h = (dst_h - resized_h) / 2.0
        # The +/-0.1 nudges x.5 away from the rounding tie so left/right
        # (top/bottom) split an odd amount of padding as k / k+1.
        left, right = int(round(half_w - 0.1)), int(round(half_w + 0.1))
        top, bottom = int(round(half_h - 0.1)), int(round(half_h + 0.1))
        return ratio, (resized_w, resized_h), (left, top, right, bottom)

    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color=(114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize with unchanged aspect ratio and pad to the target shape.

        Args:
            image: HWC image.
            new_shape: Target ``(width, height)``.
            color: Constant border colour used for the padding.

        Returns:
            ``(padded_image, ratio, (pad_left, pad_top))``.  The padding is
            the integer border actually added on the left/top, so subtracting
            it from model-space coordinates lands exactly on the resized
            image (the fractional half-padding would be off by half a pixel
            whenever the free space is odd).
        """
        src_h, src_w = image.shape[:2]
        ratio, (resized_w, resized_h), (left, top, right, bottom) = (
            self._letterbox_geometry((src_w, src_h), new_shape)
        )
        if (resized_w, resized_h) != (src_w, src_h):
            # Cubic when enlarging, linear when shrinking.
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)
        padded = cv2.copyMakeBorder(
            image,
            top,
            bottom,
            left,
            right,
            borderType=cv2.BORDER_CONSTANT,
            value=color,
        )
        return padded, ratio, (float(left), float(top))

    def _preprocess(
        self, image: ndarray
    ) -> tuple[np.ndarray, float, tuple[float, float], tuple[int, int]]:
        """Turn a BGR HWC frame into the model input tensor.

        Steps: letterbox to the model input size, BGR -> RGB, scale to
        [0, 1], HWC -> NCHW float32 (contiguous).

        Returns:
            ``(tensor, ratio, (pad_left, pad_top), (orig_w, orig_h))``.
        """
        orig_h, orig_w = image.shape[:2]
        img, ratio, pad = self._letterbox(
            image, (self.input_width, self.input_height)
        )
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))[None, ...]
        img = np.ascontiguousarray(img, dtype=np.float32)
        return img, ratio, pad, (orig_w, orig_h)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        """Clamp xyxy *boxes* in place to ``[0, w-1] x [0, h-1]`` and return them."""
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    @staticmethod
    def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray:
        """Convert ``[cx, cy, w, h]`` rows to ``[x1, y1, x2, y2]`` (new array)."""
        out = np.empty_like(boxes)
        half_w = boxes[:, 2] / 2.0
        half_h = boxes[:, 3] / 2.0
        out[:, 0] = boxes[:, 0] - half_w
        out[:, 1] = boxes[:, 1] - half_h
        out[:, 2] = boxes[:, 0] + half_w
        out[:, 3] = boxes[:, 1] + half_h
        return out

    def _soft_nms(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        sigma: float = 0.5,
        score_thresh: float = 0.01,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Soft-NMS: Gaussian decay of overlapping scores instead of removal.

        Class labels are not consulted, so overlap is penalised across
        classes as well.

        Args:
            boxes: ``[N, 4]`` xyxy boxes.
            scores: ``[N]`` scores.
            sigma: Width of the Gaussian penalty ``exp(-iou**2 / sigma)``.
            score_thresh: Boxes whose decayed score is not above this value
                are dropped.

        Returns:
            ``(kept_original_indices, updated_scores)`` in selection order
            (highest remaining score first).  The inputs are not modified.
        """
        count = len(boxes)
        if count == 0:
            return np.array([], dtype=np.intp), np.array([], dtype=np.float32)
        boxes = boxes.astype(np.float32, copy=True)
        scores = scores.astype(np.float32, copy=True)
        order = np.arange(count)
        for i in range(count):
            # Selection-sort step: bring the best remaining box to slot i.
            max_pos = i + int(np.argmax(scores[i:]))
            boxes[[i, max_pos]] = boxes[[max_pos, i]]
            scores[[i, max_pos]] = scores[[max_pos, i]]
            order[[i, max_pos]] = order[[max_pos, i]]
            if i + 1 >= count:
                break
            tail = boxes[i + 1:]
            xx1 = np.maximum(boxes[i, 0], tail[:, 0])
            yy1 = np.maximum(boxes[i, 1], tail[:, 1])
            xx2 = np.minimum(boxes[i, 2], tail[:, 2])
            yy2 = np.minimum(boxes[i, 3], tail[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = max(0.0, float(
                (boxes[i, 2] - boxes[i, 0]) * (boxes[i, 3] - boxes[i, 1])
            ))
            areas_j = (
                np.maximum(0.0, tail[:, 2] - tail[:, 0])
                * np.maximum(0.0, tail[:, 3] - tail[:, 1])
            )
            iou = inter / (area_i + areas_j - inter + 1e-7)
            # Decay every not-yet-selected box by its overlap with box i.
            scores[i + 1:] *= np.exp(-(iou ** 2) / sigma)
        mask = scores > score_thresh
        return order[mask], scores[mask]

    @staticmethod
    def _hard_nms(
        boxes: np.ndarray,
        scores: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        """Greedy NMS: keep the best box of every overlapping cluster.

        Class labels are not consulted (class-agnostic suppression).

        Args:
            boxes: ``[N, 4]`` xyxy boxes.
            scores: ``[N]`` scores.
            iou_thresh: A box is suppressed when its IoU with an already
                kept box is strictly greater than this value.

        Returns:
            Indices of kept boxes (into *boxes*/*scores*), ordered by
            descending score.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)
        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        order = np.argsort(scores)[::-1]
        keep: list[int] = []
        while order.size > 0:
            best = order[0]
            keep.append(int(best))
            rest = order[1:]
            if rest.size == 0:
                break
            # IoU of the kept box against all remaining candidates at once.
            xx1 = np.maximum(boxes[best, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[best, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[best, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[best, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            iou = inter / (areas[best] + areas[rest] - inter + 1e-7)
            order = rest[~(iou > iou_thresh)]
        return np.array(keep, dtype=np.intp)

    @staticmethod
    def _max_score_per_cluster(
        coords: np.ndarray,
        scores: np.ndarray,
        keep_indices: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        """Best original score in the cluster around each kept box.

        For every index in *keep_indices*, the cluster is the box itself plus
        every box in *coords* whose IoU with it is ``>= iou_thresh``.

        Returns:
            float32 array with one maximum score per kept index.
        """
        n_keep = len(keep_indices)
        if n_keep == 0:
            return np.array([], dtype=np.float32)
        coords = np.asarray(coords, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        areas = (coords[:, 2] - coords[:, 0]) * (coords[:, 3] - coords[:, 1])
        out = np.empty(n_keep, dtype=np.float32)
        for slot, idx in enumerate(keep_indices):
            bi = coords[idx]
            xx1 = np.maximum(bi[0], coords[:, 0])
            yy1 = np.maximum(bi[1], coords[:, 1])
            xx2 = np.minimum(bi[2], coords[:, 2])
            yy2 = np.minimum(bi[3], coords[:, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            iou = inter / (areas[idx] + areas - inter + 1e-7)
            in_cluster = iou >= iou_thresh
            # A zero-area box has IoU 0 with itself; make sure it still
            # belongs to its own cluster so the max below is never empty.
            in_cluster[idx] = True
            out[slot] = float(np.max(scores[in_cluster]))
        return out

    def _undo_letterbox(
        self,
        boxes: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> np.ndarray:
        """Map xyxy *boxes* from model input space to original pixels.

        Subtracts the left/top padding, divides by the resize ratio and
        clips to the original image.  *boxes* is modified in place and
        returned.
        """
        pad_w, pad_h = pad
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        return self._clip_boxes(boxes, orig_size)

    def _build_boxes(
        self,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
    ) -> "list[BoundingBox]":
        """Convert parallel arrays into ``BoundingBox`` objects.

        Coordinates are snapped outwards (floor for x1/y1, ceil for x2/y2),
        class ids are remapped, and boxes with no positive width or height
        are skipped.
        """
        results: "list[BoundingBox]" = []
        for box, conf, cls_id in zip(boxes, scores, cls_ids):
            x1, y1, x2, y2 = box.tolist()
            if x2 <= x1 or y2 <= y1:
                continue
            results.append(
                BoundingBox(
                    x1=int(math.floor(x1)),
                    y1=int(math.floor(y1)),
                    x2=int(math.ceil(x2)),
                    y2=int(math.ceil(y2)),
                    cls_id=self._remap_cls_id(int(cls_id)),
                    conf=float(conf),
                )
            )
        return results

    def _decode_final_dets(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
        apply_optional_dedup: bool = False,
    ) -> "list[BoundingBox]":
        """Decode already-final detections (primary path).

        Args:
            preds: ``[N, >=6]`` or ``[1, N, >=6]`` rows of
                ``[x1, y1, x2, y2, conf, cls_id, ...]`` in letterboxed input
                coordinates.
            ratio: Letterbox resize ratio.
            pad: Letterbox ``(pad_left, pad_top)``.
            orig_size: ``(width, height)`` of the original image.
            apply_optional_dedup: Run Soft-NMS on the surviving boxes.

        Raises:
            ValueError: If *preds* does not have one of the shapes above.
        """
        if preds.ndim == 3 and preds.shape[0] == 1:
            preds = preds[0]
        if preds.ndim != 2 or preds.shape[1] < 6:
            raise ValueError(f"Unexpected ONNX final-det output shape: {preds.shape}")

        scores = preds[:, 4].astype(np.float32)
        keep = scores >= self.conf_thres
        if not np.any(keep):
            return []
        # astype() copies, so the in-place maths below never touches preds.
        boxes = preds[:, :4].astype(np.float32)[keep]
        cls_ids = preds[:, 5].astype(np.int32)[keep]
        scores = scores[keep]

        boxes = self._undo_letterbox(boxes, ratio, pad, orig_size)
        if apply_optional_dedup and len(boxes) > 1:
            keep_idx, scores = self._soft_nms(boxes, scores)
            boxes = boxes[keep_idx]
            cls_ids = cls_ids[keep_idx]
        return self._build_boxes(boxes, scores, cls_ids)

    def _decode_raw_yolo(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> "list[BoundingBox]":
        """Decode raw YOLO predictions (fallback path).

        Accepts ``[1, C, N]`` or ``[1, N, C]``; each prediction is read as
        ``[cx, cy, w, h, class scores...]``.  Predictions are thresholded,
        de-duplicated with Soft-NMS, capped at ``max_det`` and mapped back to
        the original image.

        NOTE(review): Soft-NMS keeps boxes whose decayed score is above its
        own ``score_thresh`` (0.01 by default), which is lower than
        ``conf_thres`` - confirm that emitting those low-score boxes is
        intended.

        Raises:
            ValueError: If *preds* is not a single-batch 3-D array with at
                least 5 channels.
        """
        if preds.ndim != 3:
            raise ValueError(f"Unexpected raw ONNX output shape: {preds.shape}")
        if preds.shape[0] != 1:
            raise ValueError(f"Unexpected batch dimension in raw output: {preds.shape}")
        preds = preds[0]
        # Normalize to [N, C]: a short first axis (<= 16) is taken to be the
        # channel axis.
        if preds.shape[0] <= 16 and preds.shape[1] > preds.shape[0]:
            preds = preds.T
        if preds.ndim != 2 or preds.shape[1] < 5:
            raise ValueError(f"Unexpected normalized raw output shape: {preds.shape}")

        boxes_xywh = preds[:, :4].astype(np.float32)
        cls_part = preds[:, 4:].astype(np.float32)
        if cls_part.shape[1] == 1:
            scores = cls_part[:, 0]
            cls_ids = np.zeros(len(scores), dtype=np.int32)
        else:
            cls_ids = np.argmax(cls_part, axis=1).astype(np.int32)
            scores = cls_part[np.arange(len(cls_part)), cls_ids]

        keep = scores >= self.conf_thres
        boxes_xywh = boxes_xywh[keep]
        scores = scores[keep]
        cls_ids = cls_ids[keep]
        if len(boxes_xywh) == 0:
            return []

        boxes = self._xywh_to_xyxy(boxes_xywh)
        keep_idx, scores = self._soft_nms(boxes, scores)
        keep_idx = keep_idx[: self.max_det]
        scores = scores[: self.max_det]
        boxes = self._undo_letterbox(boxes[keep_idx], ratio, pad, orig_size)
        return self._build_boxes(boxes, scores, cls_ids[keep_idx])

    def _postprocess(
        self,
        output: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> "list[BoundingBox]":
        """Decode the model output, preferring the final-detection layout.

        ``[N, >=6]`` and ``[1, N, 6]`` outputs are treated as final
        detections; anything else goes through the raw YOLO decoder.
        """
        is_final_2d = output.ndim == 2 and output.shape[1] >= 6
        is_final_3d = (
            output.ndim == 3 and output.shape[0] == 1 and output.shape[2] == 6
        )
        if is_final_2d or is_final_3d:
            return self._decode_final_dets(output, ratio, pad, orig_size)
        return self._decode_raw_yolo(output, ratio, pad, orig_size)

    def _predict_single(self, image: np.ndarray) -> "list[BoundingBox]":
        """Run one forward pass on a single HWC, 3-channel image.

        Raises:
            ValueError: If the image is ``None``, not HWC with 3 channels,
                empty, or the preprocessed tensor has an unexpected shape.
            TypeError: If the image is not a numpy array.
        """
        if image is None:
            raise ValueError("Input image is None")
        if not isinstance(image, np.ndarray):
            raise TypeError(f"Input is not numpy array: {type(image)}")
        if image.ndim != 3:
            raise ValueError(f"Expected HWC image, got shape={image.shape}")
        if image.shape[0] <= 0 or image.shape[1] <= 0:
            raise ValueError(f"Invalid image shape={image.shape}")
        if image.shape[2] != 3:
            raise ValueError(f"Expected 3 channels, got shape={image.shape}")
        if image.dtype != np.uint8:
            # Clip first so out-of-range values saturate instead of wrapping.
            image = np.clip(image, 0, 255).astype(np.uint8)

        input_tensor, ratio, pad, orig_size = self._preprocess(image)
        expected_shape = (1, 3, self.input_height, self.input_width)
        if input_tensor.shape != expected_shape:
            raise ValueError(
                f"Bad input tensor shape={input_tensor.shape}, expected={expected_shape}"
            )
        outputs = self.session.run(self.output_names, {self.input_name: input_tensor})
        # Only the first model output is decoded.
        return self._postprocess(outputs[0], ratio, pad, orig_size)

    @staticmethod
    def _unflip_x(x1: int, x2: int, width: int) -> tuple[int, int]:
        """Mirror an x-interval from a horizontally flipped image.

        The result is clamped to ``[0, width - 1]``, the same range
        ``_clip_boxes`` enforces for non-flipped detections.
        """
        last = width - 1
        return max(0, min(last, width - x2)), max(0, min(last, width - x1))

    def _predict_tta(self, image: np.ndarray) -> "list[BoundingBox]":
        """Horizontal-flip TTA: merge original + flipped detections via hard NMS.

        Returns at most ``max_det`` boxes, ordered by descending confidence.
        """
        boxes_orig = self._predict_single(image)
        boxes_flip = self._predict_single(cv2.flip(image, 1))

        width = image.shape[1]
        mirrored: "list[BoundingBox]" = []
        for b in boxes_flip:
            x1, x2 = self._unflip_x(b.x1, b.x2, width)
            if x2 <= x1:
                # Clamping collapsed the box; drop it like the decoders do.
                continue
            mirrored.append(
                BoundingBox(
                    x1=x1, y1=b.y1, x2=x2, y2=b.y2,
                    cls_id=b.cls_id, conf=b.conf,
                )
            )

        all_boxes = boxes_orig + mirrored
        if not all_boxes:
            return []
        coords = np.array(
            [[b.x1, b.y1, b.x2, b.y2] for b in all_boxes], dtype=np.float32
        )
        scores = np.array([b.conf for b in all_boxes], dtype=np.float32)
        # _hard_nms already orders kept indices by descending score.
        hard_keep = self._hard_nms(coords, scores, self.iou_thres)[: self.max_det]
        return [all_boxes[i] for i in hard_keep]

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> "list[TVFrameResult]":
        """Detect objects in every frame of a batch.

        Args:
            batch_images: Frames to process (BGR HWC arrays).
            offset: Frame id of the first image; later images count up.
            n_keypoints: Number of ``(0, 0)`` placeholder keypoints to attach
                to every frame (negative values are treated as 0).

        Returns:
            One ``TVFrameResult`` per input frame.  A frame whose inference
            raises is reported on stdout and returned with no boxes, so one
            bad frame never aborts the batch.
        """
        results: "list[TVFrameResult]" = []
        for frame_number_in_batch, image in enumerate(batch_images):
            try:
                if self.use_tta:
                    boxes = self._predict_tta(image)
                else:
                    boxes = self._predict_single(image)
            except Exception as e:
                print(f"⚠️ Inference failed for frame {offset + frame_number_in_batch}: {e}")
                boxes = []
            results.append(
                TVFrameResult(
                    frame_id=offset + frame_number_in_batch,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(max(0, int(n_keypoints)))],
                )
            )
        return results
if __name__ == "__main__":
    # Manual smoke test: load weights.onnx from this file's directory, run the
    # detector on car1.png and save a copy of the image with the boxes drawn.
    here = Path(__file__).parent
    miner = Miner(here)

    image_path = here / "car1.png"
    if not image_path.exists():
        raise FileNotFoundError(f"Test image not found: {image_path}")
    image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
    if image is None:
        raise RuntimeError(f"Failed to read image: {image_path}")

    frames = miner.predict_batch([image], offset=0, n_keypoints=0)

    # Draw on a copy so the loaded image stays untouched.
    canvas = image.copy()
    palette = [(0, 255, 0), (0, 0, 255), (255, 0, 0)]
    for frame in frames:
        print(f"Frame {frame.frame_id}:")
        for rank, box in enumerate(frame.boxes):
            color = palette[rank % len(palette)]
            cv2.rectangle(canvas, (box.x1, box.y1), (box.x2, box.y2), color, 2)

            # Show the class name when the id is a valid index, else the id.
            if box.cls_id < len(miner.class_names):
                class_label = miner.class_names[box.cls_id]
            else:
                class_label = box.cls_id
            label = f"{box.cls_id}_{class_label}:{box.conf:.2f}"

            # The font scale is the detection confidence, so more confident
            # boxes get larger labels.
            cv2.putText(
                canvas,
                label,
                (box.x1, max(0, box.y1 - 5)),
                cv2.FONT_HERSHEY_SIMPLEX,
                box.conf,
                color,
                1,
                cv2.LINE_AA,
            )
            print(
                f" cls={box.cls_id} conf={box.conf:.3f} "
                f"box=({box.x1},{box.y1},{box.x2},{box.y2})"
            )
        print(len(frame.boxes))

    out_path = here / f"1_out_iou{miner.iou_thres:.2f}.png"
    cv2.imwrite(str(out_path), canvas)
    print(f"Saved visualization to: {out_path}")