"""ONNX Runtime miner for the petrol object detector.

Defines the result schema (``BoundingBox``, ``TVFrameResult``), the ``Miner``
class that wraps an ONNX Runtime inference session, and a small command-line
runner (``main``) that draws the predictions onto the input images.
"""

import math
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel


class BoundingBox(BaseModel):
    """One detection: integer box corners, class index and confidence."""

    # Box corners; `Miner._decode_yolov8` fills these in pixels of the
    # original (un-letterboxed) image.
    x1: int
    y1: int
    x2: int
    y2: int
    # Index of the winning class (argmax over the class channels).
    cls_id: int
    # Score of the winning class.
    conf: float


class TVFrameResult(BaseModel):
    """Predictions for a single frame of a batch."""

    # `offset + position in batch`, as computed by `Miner.predict_batch`.
    frame_id: int
    boxes: list[BoundingBox]
    # (x, y) pairs; this detector only ever emits (0, 0) padding entries.
    keypoints: list[tuple[int, int]]


class Miner:
    """
    ONNX-backed version of the petrol-tracking miner.

    This class is responsible for:
    - Loading the ONNX model via onnxruntime.
    - Running predictions on images.
    - Parsing ONNX outputs into structured results (TVFrameResult).

    It must have the following to be compatible with the chute:
    - be named `Miner`
    - have a `predict_batch` function with the inputs and outputs specified
    - be stored in a file called `miner.py` which lives in the root of the
      HFHub repo (rename/copy this file to `miner.py` before deploying)
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """Create the ORT session for ``<path_hf_repo>/weights.onnx``.

        A session preferring CUDA (with CPU as second provider) is tried
        first; if its creation raises, a CPU-only session is created instead.
        Input/output metadata and the detection thresholds are stored on the
        instance.

        Args:
            path_hf_repo: Directory that contains ``weights.onnx``.
        """
        model_path = path_hf_repo / "weights.onnx"

        # Class order as exported from the training pt: must match model.names
        self.class_names = ["petrol hose", "petrol pump", "price board", "roof canopy"]

        print("ORT version:", ort.__version__)

        # Best effort: a failure here is reported but must not abort loading.
        try:
            ort.preload_dlls()
            print("✅ onnxruntime.preload_dlls() success")
        except Exception as e:
            print(f"⚠️ preload_dlls failed: {e}")

        print("ORT available providers BEFORE session:", ort.get_available_providers())

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        try:
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
            )
            print("✅ Created ORT session with preferred CUDA provider list")
        except Exception as e:
            print(f"⚠️ CUDA session creation failed, falling back to CPU: {e}")
            self.session = ort.InferenceSession(
                str(model_path),
                sess_options=sess_options,
                providers=["CPUExecutionProvider"],
            )

        print("ORT session providers:", self.session.get_providers())

        for inp in self.session.get_inputs():
            print("INPUT:", inp.name, inp.shape, inp.type)
        for out in self.session.get_outputs():
            print("OUTPUT:", out.name, out.shape, out.type)

        self.input_name = self.session.get_inputs()[0].name
        self.output_names = [output.name for output in self.session.get_outputs()]
        self.input_shape = self.session.get_inputs()[0].shape
        # Indices 2 and 3 are read as height and width (NCHW layout, matching
        # the tensor built in `_preprocess`); non-integer dims fall back to 640.
        self.input_height = self._safe_dim(self.input_shape[2], default=640)
        self.input_width = self._safe_dim(self.input_shape[3], default=640)

        # Thresholds
        self.conf_thres = 0.4
        self.iou_thres = 0.50
        self.max_det = 300

        print(f"✅ Petrol ONNX model loaded from: {model_path}")
        print(f"✅ ONNX providers: {self.session.get_providers()}")
        print(f"✅ ONNX input: name={self.input_name}, shape={self.input_shape}")

    def __repr__(self) -> str:
        """Return a short description naming the session type and providers."""
        return (
            f"Petrol ONNXRuntime(session={type(self.session).__name__}, "
            f"providers={self.session.get_providers()})"
        )

    @staticmethod
    def _safe_dim(value, default: int) -> int:
        """Return ``value`` if it is a positive int, otherwise ``default``."""
        return value if isinstance(value, int) and value > 0 else default

    def _letterbox(
        self,
        image: ndarray,
        new_shape: tuple[int, int],
        color: tuple[int, int, int] = (114, 114, 114),
    ) -> tuple[ndarray, float, tuple[float, float]]:
        """Resize ``image`` keeping its aspect ratio and pad it to ``new_shape``.

        Args:
            image: HxWxC image.
            new_shape: Target size as ``(width, height)``.
            color: Constant border colour used for the padding.

        Returns:
            ``(padded, ratio, (pad_left, pad_top))`` where ``ratio`` is the
            scale factor applied to the image and ``pad_left`` / ``pad_top``
            are the numbers of border pixels actually added on the left and
            top, i.e. the exact offset of the image content inside ``padded``.
        """
        h, w = image.shape[:2]
        new_w, new_h = new_shape

        ratio = min(new_w / w, new_h / h)
        # Clamp to 1 px: for extreme aspect ratios the short side would
        # otherwise round down to 0, which is not a valid resize target.
        resized_w = max(1, int(round(w * ratio)))
        resized_h = max(1, int(round(h * ratio)))

        if (resized_w, resized_h) != (w, h):
            interp = cv2.INTER_CUBIC if ratio > 1.0 else cv2.INTER_LINEAR
            image = cv2.resize(image, (resized_w, resized_h), interpolation=interp)

        half_pad_w = (new_w - resized_w) / 2.0
        half_pad_h = (new_h - resized_h) / 2.0

        # The +/-0.1 nudges split an odd total padding deterministically:
        # the extra pixel goes to the right / bottom border.
        left = int(round(half_pad_w - 0.1))
        right = int(round(half_pad_w + 0.1))
        top = int(round(half_pad_h - 0.1))
        bottom = int(round(half_pad_h + 0.1))

        padded = cv2.copyMakeBorder(
            image,
            top,
            bottom,
            left,
            right,
            borderType=cv2.BORDER_CONSTANT,
            value=color,
        )
        # Report the padding that was really applied (not the fractional
        # half-padding), so boxes can be mapped back without a 0.5 px shift.
        return padded, ratio, (float(left), float(top))

    def _preprocess(
        self, image: ndarray
    ) -> tuple[np.ndarray, float, tuple[float, float], tuple[int, int]]:
        """Turn an HxWx3 image into the float32 NCHW tensor fed to the model.

        Returns:
            ``(tensor, ratio, pad, (orig_w, orig_h))``; ``ratio`` and ``pad``
            come from `_letterbox` and are needed to undo the letterboxing.
        """
        orig_h, orig_w = image.shape[:2]

        img, ratio, pad = self._letterbox(
            image, (self.input_width, self.input_height)
        )
        # NOTE(review): assumes the caller passes BGR images (as cv2.imread
        # in `main` does) — confirm for other callers.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        # HWC -> CHW, then add the batch axis.
        img = np.transpose(img, (2, 0, 1))[None, ...]
        img = np.ascontiguousarray(img, dtype=np.float32)

        return img, ratio, pad, (orig_w, orig_h)

    @staticmethod
    def _clip_boxes(boxes: np.ndarray, image_size: tuple[int, int]) -> np.ndarray:
        """Clamp xyxy ``boxes`` in place to ``[0, w - 1] x [0, h - 1]``.

        Args:
            boxes: Array of shape (N, 4) in x1, y1, x2, y2 order; modified
                in place.
            image_size: ``(width, height)`` of the image.

        Returns:
            The same ``boxes`` array.
        """
        w, h = image_size
        boxes[:, 0] = np.clip(boxes[:, 0], 0, w - 1)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, h - 1)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, w - 1)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, h - 1)
        return boxes

    @staticmethod
    def _xywh_to_xyxy(boxes: np.ndarray) -> np.ndarray:
        """Convert (cx, cy, w, h) rows into (x1, y1, x2, y2) rows (new array)."""
        out = np.empty_like(boxes)
        out[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.0
        out[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.0
        out[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.0
        out[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.0
        return out

    @staticmethod
    def _hard_nms(
        boxes: np.ndarray,
        scores: np.ndarray,
        iou_thresh: float,
    ) -> np.ndarray:
        """Greedy non-maximum suppression on xyxy boxes.

        The highest-scoring remaining box is kept and every other remaining
        box whose IoU with it exceeds ``iou_thresh`` is discarded; this
        repeats until no boxes remain.

        Returns:
            Indices into ``boxes`` of the kept boxes, highest score first.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)

        boxes = np.asarray(boxes, dtype=np.float32)
        scores = np.asarray(scores, dtype=np.float32)
        order = np.argsort(scores)[::-1]

        keep = []
        while len(order) > 0:
            i = order[0]
            keep.append(i)
            if len(order) == 1:
                break

            rest = order[1:]
            xx1 = np.maximum(boxes[i, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[i, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[i, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[i, 3], boxes[rest, 3])

            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
            area_i = max(0.0, (boxes[i, 2] - boxes[i, 0])) * max(
                0.0, (boxes[i, 3] - boxes[i, 1])
            )
            area_r = np.maximum(0.0, boxes[rest, 2] - boxes[rest, 0]) * np.maximum(
                0.0, boxes[rest, 3] - boxes[rest, 1]
            )
            # The epsilon keeps the division defined for zero-area boxes.
            iou = inter / (area_i + area_r - inter + 1e-7)
            order = rest[iou <= iou_thresh]

        return np.array(keep, dtype=np.intp)

    @classmethod
    def _nms_per_class(
        cls,
        boxes: np.ndarray,
        scores: np.ndarray,
        cls_ids: np.ndarray,
        iou_thresh: float,
        max_det: int,
    ) -> np.ndarray:
        """Run `_hard_nms` separately for every class id.

        Returns:
            Indices of the surviving boxes over all classes, sorted by
            descending score and truncated to at most ``max_det`` entries.
        """
        if len(boxes) == 0:
            return np.array([], dtype=np.intp)

        keep_all: list[int] = []
        for c in np.unique(cls_ids):
            idxs = np.nonzero(cls_ids == c)[0]
            local_keep = cls._hard_nms(boxes[idxs], scores[idxs], iou_thresh)
            # Map class-local indices back to indices into the full arrays.
            keep_all.extend(idxs[local_keep].tolist())

        keep_all_arr = np.array(keep_all, dtype=np.intp)
        order = np.argsort(scores[keep_all_arr])[::-1]
        return keep_all_arr[order[:max_det]]

    def _decode_yolov8(
        self,
        preds: np.ndarray,
        ratio: float,
        pad: tuple[float, float],
        orig_size: tuple[int, int],
    ) -> list[BoundingBox]:
        """
        Decode a raw YOLOv8-style ONNX detection output.

        Expected shape: [1, 4 + nc, num_boxes] (no objectness channel).
        Some exporters emit [1, num_boxes, 4 + nc]; both are handled.

        Args:
            preds: Raw model output with a leading batch axis of size 1.
            ratio: Scale factor returned by `_letterbox`.
            pad: ``(pad_left, pad_top)`` offset returned by `_letterbox`.
            orig_size: ``(width, height)`` of the original image.

        Returns:
            Detections with score >= ``self.conf_thres`` that survive
            per-class NMS, in original-image pixel coordinates.

        Raises:
            ValueError: If ``preds`` does not have a supported shape.
        """
        if preds.ndim != 3 or preds.shape[0] != 1:
            raise ValueError(f"Unexpected ONNX output shape: {preds.shape}")

        preds = preds[0]

        # Normalize to [N, C] where C = 4 + nc
        nc = len(self.class_names)
        expected_c = 4 + nc
        if preds.shape[0] == expected_c:
            preds = preds.T
        elif preds.shape[1] != expected_c:
            # Fall back: treat smaller dim as channels
            if preds.shape[0] < preds.shape[1]:
                preds = preds.T

        if preds.ndim != 2 or preds.shape[1] < 5:
            raise ValueError(f"Unexpected normalized output shape: {preds.shape}")

        boxes_xywh = preds[:, :4].astype(np.float32)
        class_probs = preds[:, 4:].astype(np.float32)

        # Best class per candidate and its score.
        cls_ids = np.argmax(class_probs, axis=1).astype(np.int32)
        scores = class_probs[np.arange(len(class_probs)), cls_ids]

        keep = scores >= self.conf_thres
        boxes_xywh = boxes_xywh[keep]
        scores = scores[keep]
        cls_ids = cls_ids[keep]

        if len(boxes_xywh) == 0:
            return []

        boxes = self._xywh_to_xyxy(boxes_xywh)

        # Undo the letterboxing: remove the border offset, then the scaling.
        pad_w, pad_h = pad
        orig_w, orig_h = orig_size
        boxes[:, [0, 2]] -= pad_w
        boxes[:, [1, 3]] -= pad_h
        boxes /= ratio
        boxes = self._clip_boxes(boxes, (orig_w, orig_h))

        keep_idx = self._nms_per_class(
            boxes, scores, cls_ids, self.iou_thres, self.max_det
        )
        boxes = boxes[keep_idx]
        scores = scores[keep_idx]
        cls_ids = cls_ids[keep_idx]

        # Round outwards to integer pixels; boxes that clipping collapsed to
        # zero width or height are dropped.
        return [
            BoundingBox(
                x1=int(math.floor(box[0])),
                y1=int(math.floor(box[1])),
                x2=int(math.ceil(box[2])),
                y2=int(math.ceil(box[3])),
                cls_id=int(cls_id),
                conf=float(conf),
            )
            for box, conf, cls_id in zip(boxes, scores, cls_ids)
            if box[2] > box[0] and box[3] > box[1]
        ]

    def _predict_single(self, image: np.ndarray) -> list[BoundingBox]:
        """Validate one image, run the model on it and decode the detections.

        Args:
            image: HxWx3 array. Non-uint8 input is cast to uint8 without any
                rescaling.

        Raises:
            ValueError: If the image is None, is not a non-empty HxWx3 array,
                or the preprocessed tensor has an unexpected shape.
            TypeError: If the image is not a numpy array.
        """
        if image is None:
            raise ValueError("Input image is None")
        if not isinstance(image, np.ndarray):
            raise TypeError(f"Input is not numpy array: {type(image)}")
        if image.ndim != 3:
            raise ValueError(f"Expected HWC image, got shape={image.shape}")
        if image.shape[0] <= 0 or image.shape[1] <= 0:
            raise ValueError(f"Invalid image shape={image.shape}")
        if image.shape[2] != 3:
            raise ValueError(f"Expected 3 channels, got shape={image.shape}")

        if image.dtype != np.uint8:
            image = image.astype(np.uint8)

        input_tensor, ratio, pad, orig_size = self._preprocess(image)

        expected_shape = (1, 3, self.input_height, self.input_width)
        if input_tensor.shape != expected_shape:
            raise ValueError(
                f"Bad input tensor shape={input_tensor.shape}, expected={expected_shape}"
            )

        outputs = self.session.run(self.output_names, {self.input_name: input_tensor})
        # Only the first model output is decoded.
        det_output = outputs[0]
        return self._decode_yolov8(det_output, ratio, pad, orig_size)

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """
        Miner prediction for a batch of images using ONNX Runtime.

        The petrol detector is a plain object-detection model (no pose), so
        keypoints are returned as `n_keypoints` padding entries of (0, 0) to
        keep the TVFrameResult schema stable across challenge types.

        Args:
            batch_images: Images to run, one result is produced per image.
            offset: Frame id of the first image; later images count up from it.
            n_keypoints: Number of (0, 0) keypoint entries per frame
                (negative values are treated as 0).

        Returns:
            One TVFrameResult per input image, in input order. A frame whose
            inference raises gets an empty box list instead of aborting the
            batch.
        """
        results: list[TVFrameResult] = []
        n_kp = max(0, int(n_keypoints))

        for frame_number_in_batch, image in enumerate(batch_images):
            frame_idx = offset + frame_number_in_batch
            # Deliberate best effort: one bad frame must not fail the batch.
            try:
                boxes = self._predict_single(image)
            except Exception as e:
                print(f"⚠️ Inference failed for frame {frame_idx}: {e}")
                boxes = []

            results.append(
                TVFrameResult(
                    frame_id=frame_idx,
                    boxes=boxes,
                    keypoints=[(0, 0) for _ in range(n_kp)],
                )
            )

        print("✅ Petrol ONNX predictions complete")
        return results


def main() -> None:
    """
    Example runner for the ONNX Miner class.

    Loads `weights.onnx` from the directory containing this file and runs
    `predict_batch` on one or more image files. Annotated copies of the
    images are written to a `predictions` directory next to this file.

    Usage:
        python miner_onnx.py                          # dummy blank image
        python miner_onnx.py image1.jpg               # single image
        python miner_onnx.py image1.jpg image2.jpg    # batch of images
    """
    import sys

    repo_path = Path(__file__).parent
    print(f"Loading miner from: {repo_path}")

    miner = Miner(path_hf_repo=repo_path)
    print(repr(miner))

    batch_images: list[np.ndarray] = []
    if len(sys.argv) > 1:
        for image_path in sys.argv[1:]:
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError(f"Cannot read image: {image_path}")
            batch_images.append(image)
        print(f"Loaded {len(batch_images)} image(s)")
    else:
        batch_images = [np.zeros((640, 640, 3), dtype=np.uint8)]
        print("No image provided — running on a single blank dummy frame")

    results = miner.predict_batch(
        batch_images=batch_images,
        offset=0,
        n_keypoints=32,
    )

    output_dir = repo_path / "predictions"
    output_dir.mkdir(exist_ok=True)

    class_names = dict(enumerate(miner.class_names))

    def color_for_class(cls_id: int) -> tuple[int, int, int]:
        """Return a BGR colour derived from the class id via its HSV hue."""
        hue = (cls_id * 47) % 180
        hsv = np.uint8([[[hue, 220, 255]]])
        bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0, 0]
        return int(bgr[0]), int(bgr[1]), int(bgr[2])

    for image, r in zip(batch_images, results):
        print(
            f"frame={r.frame_id} "
            f"boxes={len(r.boxes)} "
            f"keypoints={len(r.keypoints)}"
        )

        vis = image.copy()
        for box in r.boxes:
            name = class_names.get(box.cls_id, str(box.cls_id))
            color = color_for_class(box.cls_id)
            print(
                f" box cls={box.cls_id}({name}) conf={box.conf:.2f} "
                f"[{box.x1},{box.y1},{box.x2},{box.y2}]"
            )

            cv2.rectangle(vis, (box.x1, box.y1), (box.x2, box.y2), color, 2)

            # Filled label background just above the box, kept inside the
            # image when the box touches the top edge.
            label = f"{name} {box.conf:.2f}"
            (tw, th), baseline = cv2.getTextSize(
                label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
            )
            top = max(box.y1 - th - baseline, 0)
            cv2.rectangle(
                vis, (box.x1, top), (box.x1 + tw, top + th + baseline), color, -1
            )
            cv2.putText(
                vis,
                label,
                (box.x1, top + th),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 0, 0),
                1,
                cv2.LINE_AA,
            )

        for x, y in r.keypoints:
            # (0, 0) entries are padding, not real keypoints.
            if x == 0 and y == 0:
                continue
            cv2.circle(vis, (x, y), 3, (0, 0, 255), -1)

        out_path = output_dir / f"frame_{r.frame_id:04d}.jpg"
        # cv2.imwrite signals failure through its return value, not an
        # exception, so only report success when it returns True.
        if cv2.imwrite(str(out_path), vis):
            print(f" saved: {out_path}")
        else:
            print(f"⚠️ failed to write: {out_path}")


if __name__ == "__main__":
    main()