"""ONNX inference for car detection in aerial images."""

import base64
import os
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as ort

_PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Model file per task. Each default lives under the project's ``training``
# directory and can be overridden with the named environment variable.
MODEL_PATHS: dict[str, Path] = {
    "cars": Path(
        os.environ.get(
            "CAR_MODEL_PATH",
            str(
                _PROJECT_ROOT
                / "training"
                / "exported_models"
                / "inference_model.sim.onnx"
            ),
        )
    ),
    "spots": Path(
        os.environ.get(
            "SPOT_MODEL_PATH",
            str(_PROJECT_ROOT / "training" / "spot_exported" / "inference_model.onnx"),
        )
    ),
}

# Class names per task, indexed by class id.
MODEL_CLASSES: dict[str, list[str]] = {
    "cars": ["car"],
    "spots": ["empty", "occupied"],
}

# Per-class colors in BGR
_CLASS_COLORS = [
    (0, 255, 0),  # green — class 0
    (0, 165, 255),  # orange — class 1
    (255, 0, 0),  # blue — class 2
    (0, 255, 255),  # yellow — class 3
]


def load_model(model_path: Path) -> ort.InferenceSession:
    """Load the ONNX model and return an inference session.

    Args:
        model_path: Location of the ``.onnx`` file; converted to ``str``
            before being handed to ONNX Runtime.
    """
    return ort.InferenceSession(str(model_path))


def get_resolution(session: ort.InferenceSession) -> int:
    """Read the expected input resolution from the model's input shape.

    Only the third dimension of the first input is read (the height of a
    ``[1, 3, H, W]`` shape), and callers use it for both sides of the
    resize, so the model input is assumed to be square.

    Raises:
        TypeError, ValueError: If that dimension cannot be converted with
            ``int()`` (e.g. a dynamic/symbolic axis).
    """
    shape = session.get_inputs()[0].shape  # e.g. [1, 3, H, W]
    return int(shape[2])


def preprocess(
    image: np.ndarray, resolution: int
) -> tuple[np.ndarray, tuple[int, int]]:
    """Resize and normalize an image for ONNX inference.

    Args:
        image: BGR image in HWC layout (three channels).
        resolution: Side length of the square model input.

    Returns:
        The preprocessed float32 tensor of shape (1, 3, H, W) and the
        original image size as (h, w).
    """
    orig_h, orig_w = image.shape[:2]
    resized = cv2.resize(image, (resolution, resolution))
    # BGR -> RGB, HWC -> CHW, uint8 -> float32 [0,1]
    tensor = resized[:, :, ::-1].transpose(2, 0, 1).astype(np.float32) / 255.0
    # ImageNet normalization
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1)
    tensor = (tensor - mean) / std
    # The channel flip and transpose above are strided views; hand the
    # runtime an explicitly C-contiguous batch instead.
    batch = np.ascontiguousarray(np.expand_dims(tensor, axis=0))
    return batch, (orig_h, orig_w)


def _sigmoid(logits: np.ndarray) -> np.ndarray:
    """Element-wise logistic function that never overflows.

    ``exp`` is only ever evaluated on non-positive values, so very negative
    logits do not trigger overflow warnings the way ``1 / (1 + exp(-x))``
    does.
    """
    decay = np.exp(-np.abs(logits))
    return np.where(logits >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))


def postprocess(
    outputs: dict[str, np.ndarray],
    orig_hw: tuple[int, int],
    threshold: float,
    class_names: list[str],
) -> list[dict]:
    """Convert ONNX outputs to a list of detection dicts.

    Each dict has keys: "bbox" (list[float] xyxy), "score" (float),
    "class_id" (int), "class_name" (str).

    RF-DETR uses per-class sigmoid (not softmax). Each logit column is an
    independent binary classifier — there is no "no-object" column.

    Args:
        outputs: Model outputs by name; "dets" holds normalized cxcywh boxes
            and "labels" holds the per-class logits.
        orig_hw: Original image size as (h, w), used to scale boxes to pixels.
        threshold: Minimum score (inclusive) for a detection to be kept.
        class_names: Names indexed by class id; ids beyond the list fall back
            to the id rendered as a string.

    Returns:
        Detections whose best-class score is at least ``threshold``; an empty
        list when the model produced no boxes.
    """
    boxes = outputs["dets"].reshape(-1, 4)
    if boxes.shape[0] == 0:
        # reshape(0, -1) below is ambiguous for NumPy and would raise.
        return []
    logits = outputs["labels"].reshape(boxes.shape[0], -1)

    # Sigmoid per logit (independent binary classifiers)
    probs = _sigmoid(logits)

    # Best class per detection
    class_ids = probs.argmax(axis=1)
    scores = probs[np.arange(len(class_ids)), class_ids]

    # Normalized cxcywh -> pixel xyxy
    orig_h, orig_w = orig_hw
    cx, cy, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    xyxy = np.stack(
        [
            (cx - w / 2) * orig_w,
            (cy - h / 2) * orig_h,
            (cx + w / 2) * orig_w,
            (cy + h / 2) * orig_h,
        ],
        axis=1,
    )

    keep = scores >= threshold
    # ``tolist`` converts to plain Python floats/ints in one pass.
    kept = zip(xyxy[keep].tolist(), scores[keep].tolist(), class_ids[keep].tolist())
    return [
        {
            "bbox": box,
            "score": score,
            "class_id": cid,
            "class_name": class_names[cid] if cid < len(class_names) else str(cid),
        }
        for box, score, cid in kept
    ]


def run_detection(
    session: ort.InferenceSession,
    image: np.ndarray,
    threshold: float = 0.5,
    class_names: list[str] | None = None,
) -> list[dict]:
    """Run full detection pipeline on a BGR image.

    Args:
        session: Loaded ONNX Runtime session.
        image: BGR image in HWC layout.
        threshold: Minimum score for a detection to be returned.
        class_names: Names indexed by class id; defaults to the "cars"
            entry of ``MODEL_CLASSES``.

    Returns:
        Detection dicts as produced by ``postprocess``.
    """
    if class_names is None:
        class_names = MODEL_CLASSES["cars"]

    input_name = session.get_inputs()[0].name
    output_names = [o.name for o in session.get_outputs()]

    resolution = get_resolution(session)
    tensor, orig_hw = preprocess(image, resolution)

    raw_outputs = session.run(output_names, {input_name: tensor})
    outputs = dict(zip(output_names, raw_outputs))
    return postprocess(outputs, orig_hw, threshold, class_names)


def annotate_image(image: np.ndarray, detections: list[dict]) -> np.ndarray:
    """Draw bounding boxes and scores on the image.

    The input image is not modified; drawing happens on a copy.

    Args:
        image: BGR image to annotate.
        detections: Detection dicts with "bbox" and "score", and optionally
            "class_id" / "class_name".

    Returns:
        The annotated copy.
    """
    annotated = image.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    for det in detections:
        class_id = det.get("class_id", 0)
        # Colors cycle when there are more classes than palette entries.
        color = _CLASS_COLORS[class_id % len(_CLASS_COLORS)]
        class_name = det.get("class_name", "")
        x1, y1, x2, y2 = [int(v) for v in det["bbox"]]
        cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)

        label = f'{class_name} {det["score"]:.2f}'
        (tw, th), _ = cv2.getTextSize(label, font, 0.5, 1)
        # The label sits just above the box; if the box touches the top edge,
        # push the label down so it is not drawn off-canvas.
        label_bottom = max(y1, th + 6)
        cv2.rectangle(
            annotated,
            (x1, label_bottom - th - 6),
            (x1 + tw + 4, label_bottom),
            color,
            -1,
        )
        cv2.putText(
            annotated,
            label,
            (x1 + 2, label_bottom - 4),
            font,
            0.5,
            (0, 0, 0),
            1,
            cv2.LINE_AA,
        )
    return annotated


def image_to_data_uri(image: np.ndarray, quality: int = 85) -> str:
    """Encode a BGR image as a JPEG base64 data URI.

    Args:
        image: BGR image to encode.
        quality: Value passed as ``cv2.IMWRITE_JPEG_QUALITY``.

    Raises:
        ValueError: If OpenCV reports that encoding failed.
    """
    ok, buf = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality])
    if not ok:
        # Without this check a failed encode would not be reported.
        raise ValueError("cv2.imencode failed to encode image as JPEG")
    b64 = base64.b64encode(buf.tobytes()).decode("ascii")
    return f"data:image/jpeg;base64,{b64}"