"""Petrol-station object detector: ONNX inference plus rule-based post-processing."""

import logging
import math
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort
from numpy import ndarray
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# ─── Petrol miner v1.1 ───────────────────────────────────────────────
# Improvements over auto-generated baseline:
#   1. Fix end-to-end ONNX decode (model outputs [1,300,6] post-NMS)
#   2. Spatial co-occurrence scoring (pump+canopy boost, isolated suppress)
#   3. Geometric validation (aspect ratio + size checks per class)
# ──────────────────────────────────────────────────────────────────────

# Class IDs (indices into Miner.class_names)
CLS_HOSE = 0
CLS_PUMP = 1
CLS_PRICEBOARD = 2
CLS_CANOPY = 3

# ── Geometric validation thresholds (derived from 2000-label analysis) ──
# Canopy: wide/flat, aspect (w/h) mean=2.96. Penalize if aspect < 0.8 (too tall).
CANOPY_MIN_ASPECT = 0.8
# Pump: roughly square/tall, aspect mean=0.91. Penalize if aspect > 4.0 (too wide).
PUMP_MAX_ASPECT = 4.0
# Price board: small. Penalize if area > 15% of image.
PRICEBOARD_MAX_AREA_FRAC = 0.15
# Hose: variable. Penalize if area < 0.05% of image (tiny false positive).
HOSE_MIN_AREA_FRAC = 0.0005

# ── Spatial co-occurrence boost/suppress amounts ──
COOCCUR_BOOST_PUMP_CANOPY = 0.05
COOCCUR_BOOST_PUMP_HOSE = 0.08
COOCCUR_BOOST_CANOPY_HOSE = 0.05
COOCCUR_SUPPRESS_ISOLATED = 0.03
# Detections above this confidence are never penalized for being isolated.
COOCCUR_ISOLATED_MAX_CONF = 0.60
# Proximity threshold between box centers, as a fraction of the image diagonal.
COOCCUR_PROXIMITY = 0.5

# ── Geometric suppress penalty ──
GEOMETRIC_SUPPRESS_PENALTY = 0.10

# Internal detection record: (x1, y1, x2, y2, conf, cls_id) in original-image pixels.
Detection = tuple[float, float, float, float, float, int]


def _static_dim(dim: object) -> int:
    """Return *dim* when it is a concrete integer, otherwise 0.

    onnxruntime reports dynamic axes as ``None`` or as a symbolic name
    (``str``); both are treated as "unknown size".
    """
    return dim if isinstance(dim, int) else 0


class BoundingBox(BaseModel):
    """Axis-aligned box in integer pixel coordinates with class and confidence."""

    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


class TVFrameResult(BaseModel):
    """Per-frame output: detected boxes plus a list of keypoints."""

    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]


class Miner:
    """Runs the ONNX detector and applies geometric / co-occurrence rescoring."""

    VERSION = "petrol-v1.1"

    def __init__(self, path_hf_repo: Path) -> None:
        """Load ``weights.onnx`` from *path_hf_repo* and inspect its I/O shapes.

        Args:
            path_hf_repo: Directory containing ``weights.onnx``.
        """
        self.path_hf_repo = path_hf_repo
        self.class_names = ['petrol hose', 'petrol pump', 'price board', 'roof canopy']
        self.session = ort.InferenceSession(
            str(path_hf_repo / "weights.onnx"),
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )

        model_input = self.session.get_inputs()[0]
        self.input_name = model_input.name
        input_shape = model_input.shape
        # NOTE: requires static H/W in the model; a symbolic or missing
        # dimension makes int() raise here.
        self.input_h = int(input_shape[2])
        self.input_w = int(input_shape[3])

        self.conf_threshold = 0.25
        self.iou_threshold = 0.45

        # Detect output format: end-to-end [1, N, 6] vs raw [1, C, N].
        #   End-to-end: [1, max_dets, 6] where max_dets is small (100-300)
        #   Raw:        [1, 4+nc, N]     where N is large (8400+)
        # A dynamic detection axis (None or a symbolic name) counts as 0,
        # so it does not break the comparison below.
        out_shape = self.session.get_outputs()[0].shape
        self._end2end = (
            len(out_shape) == 3
            and out_shape[2] == 6
            and _static_dim(out_shape[1]) <= 1000
        )
        if self._end2end:
            logger.info("[init] End-to-end ONNX output detected")
        else:
            logger.info("[init] Raw ONNX output detected")

        logger.info(
            "[init] %s loaded, input=%sx%s, end2end=%s",
            self.VERSION,
            self.input_w,
            self.input_h,
            self._end2end,
        )

    def __repr__(self) -> str:
        return f"Petrol Miner {self.VERSION} end2end={self._end2end}"

    # ─── Preprocessing ────────────────────────────────────────────────

    def _preprocess(self, image_bgr: ndarray) -> tuple[np.ndarray, tuple[int, int]]:
        """Convert a BGR image into the network input tensor.

        The image is converted to RGB, resized straight to the model input
        size (no letterboxing), scaled to [0, 1] and laid out as
        ``[1, C, H, W]`` float32.

        Returns:
            ``(tensor, (orig_h, orig_w))`` where the second element is the
            height/width of *image_bgr* before resizing.
        """
        orig_h, orig_w = image_bgr.shape[:2]
        rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (self.input_w, self.input_h))
        tensor = resized.astype(np.float32) / 255.0
        tensor = np.transpose(tensor, (2, 0, 1))[None, ...]
        return tensor, (orig_h, orig_w)

    # ─── NMS (only needed for raw output format) ─────────────────────

    def _nms(self, dets: list[Detection]) -> list[Detection]:
        """Greedy non-maximum suppression using ``self.iou_threshold``.

        NOTE: suppression is class-agnostic; the class id of a detection is
        not consulted, so overlapping boxes of different classes compete.

        Returns:
            The surviving detections, highest score first.
        """
        if not dets:
            return []

        boxes = np.array([d[:4] for d in dets], dtype=np.float32)
        scores = np.array([d[4] for d in dets], dtype=np.float32)
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

        order = scores.argsort()[::-1]
        keep: list[int] = []
        while order.size > 0:
            best, rest = order[0], order[1:]
            keep.append(int(best))

            # Intersection of the best box with every remaining candidate.
            xx1 = np.maximum(boxes[best, 0], boxes[rest, 0])
            yy1 = np.maximum(boxes[best, 1], boxes[rest, 1])
            xx2 = np.minimum(boxes[best, 2], boxes[rest, 2])
            yy2 = np.minimum(boxes[best, 3], boxes[rest, 3])
            inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)

            # Floor the union to avoid dividing by zero on degenerate boxes.
            union = np.maximum(areas[best] + areas[rest] - inter, 1e-6)
            iou = inter / union

            order = rest[iou <= self.iou_threshold]

        return [dets[idx] for idx in keep]

    # ─── Decode: handles both end-to-end and raw formats ─────────────

    def _decode_end2end(self, out: ndarray, orig_h: int, orig_w: int) -> list[Detection]:
        """Decode end-to-end ``[1, N, 6]`` output.

        Each row is read as ``[x1, y1, x2, y2, conf, cls_id]`` in model-input
        coordinates. Rows below ``self.conf_threshold`` are dropped and the
        rest are rescaled to the original image size. Values are returned as
        plain Python numbers, the same as ``_decode_raw``.
        """
        pred = out[0]  # [N, 6]
        if pred.ndim != 2 or pred.shape[1] != 6:
            return []

        pred = pred[pred[:, 4] >= self.conf_threshold]
        if pred.shape[0] == 0:
            return []

        sx = orig_w / float(self.input_w)
        sy = orig_h / float(self.input_h)

        results: list[Detection] = []
        for row in pred:
            x1, y1, x2, y2, conf, cls_id = row.tolist()
            results.append((x1 * sx, y1 * sy, x2 * sx, y2 * sy, float(conf), int(cls_id)))
        return results

    def _decode_raw(self, out: ndarray, orig_h: int, orig_w: int) -> list[Detection]:
        """Decode raw ``[1, 4+nc, N]`` or ``[1, N, 4+nc]`` output.

        Rows are read as ``[cx, cy, w, h, score_0, ..., score_{nc-1}]``. The
        best class per row is kept if its score reaches
        ``self.conf_threshold``; boxes are converted to corner format,
        rescaled to the original image and passed through ``_nms``.
        """
        pred = out[0]
        if pred.ndim != 2:
            return []
        # Heuristic: the shorter axis is taken to be the channel axis.
        if pred.shape[0] < pred.shape[1]:
            pred = pred.T
        if pred.shape[1] < 5:
            return []

        boxes = pred[:, :4]
        cls_scores = pred[:, 4:]
        if cls_scores.shape[1] == 0:
            return []

        cls_ids = np.argmax(cls_scores, axis=1)
        confs = np.max(cls_scores, axis=1)
        keep = confs >= self.conf_threshold
        boxes, confs, cls_ids = boxes[keep], confs[keep], cls_ids[keep]
        if boxes.shape[0] == 0:
            return []

        sx = orig_w / float(self.input_w)
        sy = orig_h / float(self.input_h)

        dets: list[Detection] = []
        for box, conf, cls_id in zip(boxes, confs, cls_ids):
            cx, cy, bw, bh = box.tolist()
            dets.append(
                (
                    (cx - bw / 2.0) * sx,
                    (cy - bh / 2.0) * sy,
                    (cx + bw / 2.0) * sx,
                    (cy + bh / 2.0) * sy,
                    float(conf),
                    int(cls_id),
                )
            )
        return self._nms(dets)

    # ─── Geometric validation ────────────────────────────────────────

    def _geometric_validate(
        self, dets: list[Detection], orig_h: int, orig_w: int
    ) -> list[Detection]:
        """Penalize detections that fail basic per-class geometric checks.

        A failing detection loses ``GEOMETRIC_SUPPRESS_PENALTY`` confidence;
        anything that ends up below ``self.conf_threshold`` is dropped.

        - Canopy: must be wide (aspect w/h >= 0.8)
        - Pump: must not be extremely wide (aspect w/h <= 4.0)
        - Price board: must be small (area <= 15% of image)
        - Hose: must not be tiny (area >= 0.05% of image)
        """
        img_area = max(orig_h * orig_w, 1)
        result: list[Detection] = []

        for x1, y1, x2, y2, conf, cls_id in dets:
            # Clamp sides to at least 1 px so aspect/area stay well-defined.
            bw = max(x2 - x1, 1)
            bh = max(y2 - y1, 1)
            aspect = bw / bh
            area_frac = (bw * bh) / img_area

            if cls_id == CLS_CANOPY:
                fails = aspect < CANOPY_MIN_ASPECT
            elif cls_id == CLS_PUMP:
                fails = aspect > PUMP_MAX_ASPECT
            elif cls_id == CLS_PRICEBOARD:
                fails = area_frac > PRICEBOARD_MAX_AREA_FRAC
            elif cls_id == CLS_HOSE:
                fails = area_frac < HOSE_MIN_AREA_FRAC
            else:
                fails = False

            penalty = GEOMETRIC_SUPPRESS_PENALTY if fails else 0.0
            new_conf = max(0.0, conf - penalty)
            if new_conf >= self.conf_threshold:
                result.append((x1, y1, x2, y2, new_conf, cls_id))

        return result

    # ─── Spatial co-occurrence scoring ───────────────────────────────

    def _spatial_cooccurrence(
        self, dets: list[Detection], orig_h: int, orig_w: int
    ) -> list[Detection]:
        """Adjust confidences based on which detections appear near each other.

        Two detections are "near" when their box centers are closer than
        ``COOCCUR_PROXIMITY`` times the image diagonal.

        Boosts (the largest applicable boost wins; boosts do not stack):
        - Pump near canopy: both get +0.05
        - Pump near hose: hose gets +0.08
        - Canopy near hose: hose gets +0.05

        Suppression:
        - A detection with confidence <= ``COOCCUR_ISOLATED_MAX_CONF`` and no
          other detection of any class nearby gets -0.03
          (except price boards, which are 91% solo in training data).

        Detections whose adjusted confidence falls below
        ``self.conf_threshold`` are dropped.
        """
        if not dets:
            return dets

        n = len(dets)
        adjustments = [0.0] * n
        prox = COOCCUR_PROXIMITY * math.hypot(orig_h, orig_w)  # absolute pixel distance

        centers = [((x1 + x2) / 2, (y1 + y2) / 2) for x1, y1, x2, y2, _, _ in dets]

        # Detection indices grouped by class id.
        cls_map: dict[int, list[int]] = {}
        for i, det in enumerate(dets):
            cls_map.setdefault(det[5], []).append(i)

        pumps = cls_map.get(CLS_PUMP, [])
        canopies = cls_map.get(CLS_CANOPY, [])
        hoses = cls_map.get(CLS_HOSE, [])

        def near(i: int, j: int) -> bool:
            dx = centers[i][0] - centers[j][0]
            dy = centers[i][1] - centers[j][1]
            return math.hypot(dx, dy) < prox

        # Pump + Canopy boost
        for pi in pumps:
            for ci in canopies:
                if near(pi, ci):
                    adjustments[pi] = max(adjustments[pi], COOCCUR_BOOST_PUMP_CANOPY)
                    adjustments[ci] = max(adjustments[ci], COOCCUR_BOOST_PUMP_CANOPY)

        # Pump + Hose boost (hose gets larger boost)
        for pi in pumps:
            for hi in hoses:
                if near(pi, hi):
                    adjustments[hi] = max(adjustments[hi], COOCCUR_BOOST_PUMP_HOSE)

        # Canopy + Hose boost
        for ci in canopies:
            for hi in hoses:
                if near(ci, hi):
                    adjustments[hi] = max(adjustments[hi], COOCCUR_BOOST_CANOPY_HOSE)

        # Suppress isolated low-confidence detections (not price boards).
        for i, det in enumerate(dets):
            conf, cls_id = det[4], det[5]
            if cls_id == CLS_PRICEBOARD:
                continue  # price boards are often solo (91% in training)
            if conf > COOCCUR_ISOLATED_MAX_CONF:
                continue  # high confidence: don't suppress
            if not any(near(i, j) for j in range(n) if j != i):
                adjustments[i] -= COOCCUR_SUPPRESS_ISOLATED

        # Apply adjustments
        result: list[Detection] = []
        for (x1, y1, x2, y2, conf, cls_id), delta in zip(dets, adjustments):
            new_conf = min(1.0, max(0.0, conf + delta))
            if new_conf >= self.conf_threshold:
                result.append((x1, y1, x2, y2, new_conf, cls_id))

        return result

    # ─── Main inference ──────────────────────────────────────────────

    def _infer_single(self, image_bgr: ndarray) -> list[BoundingBox]:
        """Run the full pipeline on one BGR image.

        Steps: preprocess, ONNX inference, decode (format chosen at init),
        geometric validation, co-occurrence rescoring, then conversion to
        integer ``BoundingBox`` objects clamped to the image bounds. Boxes
        that collapse to zero width or height after clamping are skipped.
        """
        inp, (orig_h, orig_w) = self._preprocess(image_bgr)
        out = self.session.run(None, {self.input_name: inp})[0]

        # Decode based on detected output format
        if self._end2end:
            dets = self._decode_end2end(out, orig_h, orig_w)
        else:
            dets = self._decode_raw(out, orig_h, orig_w)

        if not dets:
            return []

        # Post-processing pipeline
        dets = self._geometric_validate(dets, orig_h, orig_w)
        dets = self._spatial_cooccurrence(dets, orig_h, orig_w)

        # Convert to BoundingBox
        out_boxes: list[BoundingBox] = []
        for x1, y1, x2, y2, conf, cls_id in dets:
            # Round outward (floor the min corner, ceil the max corner).
            ix1 = max(0, min(orig_w, math.floor(x1)))
            iy1 = max(0, min(orig_h, math.floor(y1)))
            ix2 = max(0, min(orig_w, math.ceil(x2)))
            iy2 = max(0, min(orig_h, math.ceil(y2)))
            if ix2 <= ix1 or iy2 <= iy1:
                # Entirely outside the frame or inverted: nothing to report.
                continue
            out_boxes.append(
                BoundingBox(
                    x1=ix1,
                    y1=iy1,
                    x2=ix2,
                    y2=iy2,
                    cls_id=cls_id,
                    conf=max(0.0, min(1.0, conf)),
                )
            )
        return out_boxes

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """Run detection on each image of a batch.

        Args:
            batch_images: BGR images, processed one at a time.
            offset: Frame id assigned to the first image; later images get
                consecutive ids.
            n_keypoints: Number of placeholder ``(0, 0)`` keypoints attached
                to every frame (negative values yield none).

        Returns:
            One ``TVFrameResult`` per input image, in input order.
        """
        n_kp = max(0, int(n_keypoints))
        results: list[TVFrameResult] = []
        for idx, image in enumerate(batch_images):
            results.append(
                TVFrameResult(
                    frame_id=offset + idx,
                    boxes=self._infer_single(image),
                    keypoints=[(0, 0)] * n_kp,
                )
            )
        return results