# Commit 0df6eea (verified) by MTerryJack: "Add Manako element wrapper"
# Auto-generated ONNX runner. This file is self-contained for a single model.
import json
import os
import sys
from typing import Any, Dict, List, Tuple
import cv2
import numpy as np
import onnxruntime as ort
from PIL import Image
def read_json(path: str) -> Dict[str, Any]:
    """Decode the UTF-8 JSON document stored at ``path``.

    Args:
        path: Filesystem location of the JSON file.

    Returns:
        The decoded JSON value (annotated as a dict; no type check is done here).
    """
    with open(path, "r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def read_text_lines(path: str) -> List[str]:
    """Return the non-blank lines of a UTF-8 text file, whitespace-stripped.

    Args:
        path: Filesystem location of the text file.

    Returns:
        Lines in file order with surrounding whitespace removed; lines that
        are empty after stripping are dropped.
    """
    kept: List[str] = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if stripped:
                kept.append(stripped)
    return kept
def load_environment(data_dir: str) -> Dict[str, Any]:
    """Load ``environment.json`` from ``data_dir`` and normalise its preprocessing entry.

    When the ``PREPROCESSING`` value is a string it is treated as embedded JSON
    and decoded. Invalid JSON, or JSON that does not decode to a dict, is
    replaced by an empty dict so that callers can safely use ``.get`` on it.

    Args:
        data_dir: Directory expected to contain ``environment.json``.

    Returns:
        The decoded environment, or ``{}`` when the file does not exist.
    """
    env_path = os.path.join(data_dir, "environment.json")
    if not os.path.exists(env_path):
        return {}
    env = read_json(env_path)
    preproc = env.get("PREPROCESSING")
    if isinstance(preproc, str):
        try:
            decoded = json.loads(preproc)
        except json.JSONDecodeError:
            decoded = {}
        # A valid-but-non-dict payload (e.g. "[]" or "3") would break every
        # later `preproc.get(...)`, so treat it like malformed JSON.
        env["PREPROCESSING"] = decoded if isinstance(decoded, dict) else {}
    return env
def load_class_names(data_dir: str, environment: Dict[str, Any]) -> List[str]:
    """Resolve the ordered list of class names for the model.

    ``class_names.txt`` in ``data_dir`` takes priority. Otherwise the
    ``CLASS_MAP`` dict of ``environment`` is read by looking up the keys
    ``"0"``, ``"1"``, ... up to its length.

    Args:
        data_dir: Directory that may contain ``class_names.txt``.
        environment: Decoded environment configuration.

    Returns:
        Class names, or an empty list when neither source is available.

    Raises:
        KeyError: If ``CLASS_MAP`` lacks one of the consecutive string indices.
    """
    names_file = os.path.join(data_dir, "class_names.txt")
    if os.path.exists(names_file):
        return read_text_lines(names_file)
    mapping = environment.get("CLASS_MAP")
    if not isinstance(mapping, dict):
        return []
    return [mapping[str(index)] for index in range(len(mapping))]
def load_keypoints_metadata(data_dir: str) -> List[Dict[str, Any]]:
    """Read ``keypoints_metadata.json`` from ``data_dir`` if it exists.

    Args:
        data_dir: Directory that may contain the metadata file.

    Returns:
        The decoded file contents, or an empty list when the file is absent.
    """
    meta_path = os.path.join(data_dir, "keypoints_metadata.json")
    return read_json(meta_path) if os.path.exists(meta_path) else []
def load_image(value: Any) -> Tuple[np.ndarray, bool]:
    """Convert a supported image input into a NumPy array.

    Args:
        value: A NumPy array (returned unchanged), a PIL image, encoded image
            bytes, or a path to an image file.

    Returns:
        ``(array, flag)``. The flag is False only for PIL input, which is
        converted to RGB here; ``preprocess_image`` reads the flag as
        "channels are BGR" and swaps them to RGB when it is True.

    Raises:
        ValueError: If the bytes or file cannot be decoded, or the input type
            is not supported.
    """
    if isinstance(value, np.ndarray):
        return value, True
    if isinstance(value, Image.Image):
        return np.asarray(value.convert("RGB")), False
    if isinstance(value, (bytes, bytearray)):
        image = cv2.imdecode(np.frombuffer(value, np.uint8), cv2.IMREAD_COLOR)
        # imdecode signals failure by returning None; surface it here, as the
        # path branch does, instead of crashing later on `.shape`.
        if image is None:
            raise ValueError("Could not decode image from in-memory bytes")
        return image, True
    if isinstance(value, str):
        image = cv2.imread(value, cv2.IMREAD_COLOR)
        if image is None:
            raise ValueError(f"Could not read image: {value}")
        return image, True
    raise ValueError(f"Unsupported image input type: {type(value)}")
def static_crop_should_be_applied(preprocessing_config: dict) -> bool:
    """Tell whether the config carries an enabled ``static-crop`` section.

    Args:
        preprocessing_config: Preprocessing settings dict.

    Returns:
        True only when ``static-crop`` is present, non-empty and its
        ``enabled`` value is truthy.
    """
    crop_cfg = preprocessing_config.get("static-crop")
    if not crop_cfg:
        return False
    return bool(crop_cfg.get("enabled"))
def take_static_crop(image: np.ndarray, crop_parameters: Dict[str, int]) -> np.ndarray:
    """Cut the configured percentage window out of ``image``.

    Args:
        image: Array whose first two axes are height and width; any trailing
            axes (e.g. channels) are kept whole, so 2-D arrays work too.
        crop_parameters: ``x_min``, ``y_min``, ``x_max``, ``y_max`` given as
            percentages of the image width/height.

    Returns:
        A view of ``image`` restricted to the crop window.
    """
    height, width = image.shape[:2]
    # Percentages are truncated (not rounded) to pixel indices.
    x_min = int(crop_parameters["x_min"] / 100 * width)
    y_min = int(crop_parameters["y_min"] / 100 * height)
    x_max = int(crop_parameters["x_max"] / 100 * width)
    y_max = int(crop_parameters["y_max"] / 100 * height)
    # No explicit channel index: identical for 3-D input, and 2-D input no
    # longer raises IndexError.
    return image[y_min:y_max, x_min:x_max]
def apply_grayscale_conversion(image: np.ndarray) -> np.ndarray:
    """Convert a BGR image to grayscale and back to a three-channel BGR layout.

    Args:
        image: Image passed to OpenCV as BGR.

    Returns:
        Result of the BGR->GRAY->BGR round trip.
    """
    single_channel = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    three_channel = cv2.cvtColor(single_channel, cv2.COLOR_GRAY2BGR)
    return three_channel
def apply_contrast_stretching(image: np.ndarray) -> np.ndarray:
    """Stretch intensities between the 2nd and 98th percentile to 0..255.

    Values are first clipped to the percentile range. If the range has zero
    width the clipped values are returned without rescaling.

    Args:
        image: Array of intensities.

    Returns:
        The stretched image cast to ``uint8``.
    """
    low, high = np.percentile(image, (2, 98))
    span = high - low
    stretched = np.clip(image, low, high)
    if span > 0:
        stretched = (stretched - low) * (255.0 / span)
    return stretched.astype(np.uint8)
def apply_histogram_equalisation(image: np.ndarray) -> np.ndarray:
    """Equalise the histogram of the grayscale version of a BGR image.

    Args:
        image: Image passed to OpenCV as BGR.

    Returns:
        The equalised grayscale image expanded back to three BGR channels.
    """
    equalised = cv2.equalizeHist(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))
    return cv2.cvtColor(equalised, cv2.COLOR_GRAY2BGR)
def apply_adaptive_equalisation(image: np.ndarray) -> np.ndarray:
    """Apply CLAHE to the grayscale version of a BGR image.

    The CLAHE operator uses ``clipLimit=0.03`` and an 8x8 tile grid.

    Args:
        image: Image passed to OpenCV as BGR.

    Returns:
        The CLAHE result expanded back to three BGR channels.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    equaliser = cv2.createCLAHE(clipLimit=0.03, tileGridSize=(8, 8))
    return cv2.cvtColor(equaliser.apply(gray), cv2.COLOR_GRAY2BGR)
def apply_preproc(image: np.ndarray, preproc: Dict[str, Any]) -> Tuple[np.ndarray, Tuple[int, int]]:
    """Run the configured static crop, contrast and grayscale steps, in that order.

    Args:
        image: Input image array.
        preproc: Preprocessing configuration; the ``static-crop``,
            ``contrast`` and ``grayscale`` sections are consulted.

    Returns:
        ``(processed_image, (height, width))`` where the size is that of the
        input *before* any cropping.
    """
    original_hw = (image.shape[0], image.shape[1])

    if static_crop_should_be_applied(preproc):
        image = take_static_crop(image, preproc["static-crop"])

    contrast_cfg = preproc.get("contrast", {})
    if contrast_cfg.get("enabled"):
        requested = preproc.get("contrast", {}).get("type")
        # Only the first matching contrast type is applied; unknown types are ignored.
        for label, transform in (
            ("Contrast Stretching", apply_contrast_stretching),
            ("Histogram Equalization", apply_histogram_equalisation),
            ("Adaptive Equalization", apply_adaptive_equalisation),
        ):
            if requested == label:
                image = transform(image)
                break

    grayscale_cfg = preproc.get("grayscale", {})
    if grayscale_cfg.get("enabled"):
        image = apply_grayscale_conversion(image)

    return image, original_hw
def resize_image_keeping_aspect_ratio(image: np.ndarray, desired_size: Tuple[int, int]) -> np.ndarray:
    """Scale ``image`` to fit inside ``desired_size`` without distorting it.

    Args:
        image: Array whose first two axes are height and width.
        desired_size: Target bounding size as ``(width, height)``.

    Returns:
        The resized image; each side is at least one pixel.
    """
    height, width = image.shape[:2]
    ratio = min(desired_size[1] / height, desired_size[0] / width)
    # int() truncation can reach 0 for extreme aspect ratios, and cv2.resize
    # rejects an empty target size, so keep each side at >= 1 pixel.
    new_width = max(1, int(width * ratio))
    new_height = max(1, int(height * ratio))
    return cv2.resize(image, (new_width, new_height))
def letterbox_image(image: np.ndarray, desired_size: Tuple[int, int], color: Tuple[int, int, int]) -> np.ndarray:
    """Resize with preserved aspect ratio, then pad to exactly ``desired_size``.

    Args:
        image: Input image array.
        desired_size: Output size as ``(width, height)``.
        color: Constant border colour used for the padding.

    Returns:
        The padded image. Padding is split between both sides; when it is
        odd, the extra pixel goes to the bottom/right.
    """
    resized = resize_image_keeping_aspect_ratio(image, desired_size)
    resized_h, resized_w = resized.shape[:2]
    extra_h = desired_size[1] - resized_h
    extra_w = desired_size[0] - resized_w
    top = extra_h // 2
    left = extra_w // 2
    return cv2.copyMakeBorder(
        resized,
        top,
        extra_h - top,
        left,
        extra_w - left,
        cv2.BORDER_CONSTANT,
        value=color,
    )
def get_resize_method(preproc: Dict[str, Any]) -> str:
    """Map the configured resize format onto one this runner implements.

    Args:
        preproc: Preprocessing configuration; only ``resize.format`` is read.

    Returns:
        One of ``"Stretch to"``, ``"Fit (black edges) in"``,
        ``"Fit (white edges) in"`` or ``"Fit (grey edges) in"``. Reflect-edge,
        fit-within and center-crop formats become black-edge letterboxing;
        a missing or unknown format becomes ``"Stretch to"``.
    """
    fallback = "Stretch to"
    black_edges = "Fit (black edges) in"
    remapped_to_black = {"Fit (reflect edges) in", "Fit within", "Fill (with center crop) in"}
    supported = {fallback, black_edges, "Fit (white edges) in", "Fit (grey edges) in"}

    resize_cfg = preproc.get("resize")
    requested = resize_cfg.get("format", fallback) if resize_cfg else fallback
    if requested in remapped_to_black:
        return black_edges
    return requested if requested in supported else fallback
def preprocess_image(image: Any, preproc: Dict[str, Any], input_hw: Tuple[int, int]) -> Tuple[np.ndarray, Tuple[int, int]]:
    """Load, preprocess and resize an image into a batched float32 tensor.

    Args:
        image: Any input accepted by ``load_image``.
        preproc: Preprocessing configuration.
        input_hw: Target size as ``(height, width)``.

    Returns:
        ``(tensor, (height, width))``. The tensor has the channel axis moved
        to the front and a leading batch axis of length 1; values are not
        rescaled here. The size is that of the loaded image before cropping.
    """
    np_image, is_bgr = load_image(image)
    processed, img_dims = apply_preproc(np_image, preproc)
    resize_method = get_resize_method(preproc)
    target_h, target_w = input_hw
    target_wh = (target_w, target_h)

    if resize_method == "Stretch to":
        resized = cv2.resize(processed, target_wh)
    else:
        # Any method other than the white/grey variants pads with black.
        border_colors = {
            "Fit (white edges) in": (255, 255, 255),
            "Fit (grey edges) in": (114, 114, 114),
        }
        resized = letterbox_image(processed, target_wh, border_colors.get(resize_method, (0, 0, 0)))

    if is_bgr:
        resized = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)

    channels_first = np.transpose(resized.astype(np.float32), (2, 0, 1))
    return np.expand_dims(channels_first, axis=0), img_dims
def sigmoid(x: np.ndarray) -> np.ndarray:
    """Evaluate the logistic function ``1 / (1 + exp(-x))`` element-wise.

    Args:
        x: Input values.

    Returns:
        Array of the same shape with values in the range 0..1.
    """
    denominator = 1.0 + np.exp(-x)
    return 1.0 / denominator
def non_max_suppression_fast(boxes: np.ndarray, overlap_thresh: float) -> List[np.ndarray]:
    """Greedy non-maximum suppression over rows ``[x1, y1, x2, y2, conf, ...]``.

    Boxes are visited from highest to lowest confidence. A remaining box is
    dropped when its intersection with the current box, divided by *its own*
    area, exceeds ``overlap_thresh`` (this is not a symmetric IoU). Box
    extents are measured inclusively, hence the ``+ 1`` terms.

    Args:
        boxes: 2-D array with one detection per row.
        overlap_thresh: Overlap ratio above which a box is suppressed.

    Returns:
        The kept rows as a float array in visiting order, or ``[]`` when
        ``boxes`` is empty.
    """
    if len(boxes) == 0:
        return []
    if boxes.dtype.kind == "i":
        boxes = boxes.astype("float")

    left = boxes[:, 0]
    top = boxes[:, 1]
    right = boxes[:, 2]
    bottom = boxes[:, 3]
    scores = boxes[:, 4]
    areas = (right - left + 1) * (bottom - top + 1)

    # Ascending by score, so the best remaining candidate is always last.
    order = np.argsort(scores)
    kept = []
    while len(order) > 0:
        best = order[-1]
        rest = order[:-1]
        kept.append(best)

        inter_left = np.maximum(left[best], left[rest])
        inter_top = np.maximum(top[best], top[rest])
        inter_right = np.minimum(right[best], right[rest])
        inter_bottom = np.minimum(bottom[best], bottom[rest])
        inter_w = np.maximum(0, inter_right - inter_left + 1)
        inter_h = np.maximum(0, inter_bottom - inter_top + 1)

        overlap = (inter_w * inter_h) / areas[rest]
        # Keep everything not strictly above the threshold.
        order = rest[~(overlap > overlap_thresh)]
    return boxes[kept].astype("float")
def w_np_non_max_suppression(
    prediction: np.ndarray,
    conf_thresh: float = 0.25,
    iou_thresh: float = 0.45,
    class_agnostic: bool = False,
    max_detections: int = 300,
    max_candidate_detections: int = 3000,
    num_masks: int = 0,
    box_format: str = "xywh",
):
    """Filter raw predictions by confidence and run non-maximum suppression.

    Each prediction row is laid out as four box values, one confidence, the
    per-class scores and finally ``num_masks`` mask coefficients.

    Args:
        prediction: Array indexed as ``[batch, candidate, value]``. It is not
            modified; the ``xywh`` conversion runs on a copy.
        conf_thresh: Minimum value of column 4 for a row to be considered.
        iou_thresh: Overlap threshold handed to ``non_max_suppression_fast``.
        class_agnostic: Suppress across all classes when True, otherwise
            independently per predicted class.
        max_detections: Maximum number of rows kept per image.
        max_candidate_detections: Accepted for interface compatibility; this
            implementation does not use it.
        num_masks: Number of trailing mask-coefficient columns.
        box_format: ``"xywh"`` (centre/size, converted to corners) or ``"xyxy"``.

    Returns:
        One list per image holding rows
        ``[x1, y1, x2, y2, conf, class_conf, class_id, *mask_coeffs]`` sorted
        by descending confidence; an empty list when nothing survives.

    Raises:
        ValueError: If ``box_format`` is neither ``"xywh"`` nor ``"xyxy"``.
    """
    num_classes = prediction.shape[2] - 5 - num_masks
    if box_format == "xywh":
        # Copy first: the conversion writes into the array, and doing that on
        # the caller's buffer would corrupt it (and double-convert on reuse).
        prediction = prediction.copy()
        boxes = prediction[:, :, :4]
        x1 = boxes[:, :, 0] - boxes[:, :, 2] / 2
        y1 = boxes[:, :, 1] - boxes[:, :, 3] / 2
        x2 = boxes[:, :, 0] + boxes[:, :, 2] / 2
        y2 = boxes[:, :, 1] + boxes[:, :, 3] / 2
        boxes[:, :, 0] = x1
        boxes[:, :, 1] = y1
        boxes[:, :, 2] = x2
        boxes[:, :, 3] = y2
    elif box_format != "xyxy":
        raise ValueError(f"box_format must be 'xywh' or 'xyxy', got {box_format}")

    batch_predictions = []
    for image_pred in prediction:
        image_pred = image_pred[image_pred[:, 4] >= conf_thresh]
        if image_pred.shape[0] == 0:
            batch_predictions.append([])
            continue
        cls_confs = image_pred[:, 5 : num_classes + 5]
        if cls_confs.shape[1] == 0:
            batch_predictions.append([])
            continue

        class_conf = np.max(cls_confs, axis=1, keepdims=True)
        class_pred = np.argmax(cls_confs, axis=1, keepdims=True)
        columns = [image_pred[:, :5], class_conf, class_pred.astype(np.float32)]
        if num_masks > 0:
            columns.append(image_pred[:, 5 + num_classes :])
        detections = np.concatenate(columns, axis=1)

        if class_agnostic:
            groups = [detections]
        else:
            labels = class_pred[:, 0]
            groups = [detections[labels == label] for label in np.unique(labels)]

        survivors = []
        for group in groups:
            ranked = group[np.argsort(-group[:, 4])]
            survivors.extend(non_max_suppression_fast(ranked, iou_thresh))

        if not survivors:
            batch_predictions.append([])
            continue
        kept = np.array(survivors)
        kept = kept[np.argsort(-kept[:, 4])]
        if len(kept) > max_detections:
            kept = kept[:max_detections]
        batch_predictions.append(list(kept))
    return batch_predictions
def get_static_crop_dimensions(orig_shape: Tuple[int, int], preproc: dict) -> Tuple[Tuple[int, int], Tuple[int, int]]:
    """Compute the static-crop offset and the size of the cropped region.

    Args:
        orig_shape: Image size as ``(height, width)``.
        preproc: Preprocessing configuration.

    Returns:
        ``((shift_x, shift_y), (height, width))``. Without an enabled static
        crop this is ``((0, 0), orig_shape)``. Values are rounded to the
        nearest integer.
    """
    if not static_crop_should_be_applied(preproc):
        return (0, 0), orig_shape

    crop = preproc["static-crop"]
    # Percentages -> fractions of the image size.
    x_min = crop["x_min"] / 100.0
    y_min = crop["y_min"] / 100.0
    x_max = crop["x_max"] / 100.0
    y_max = crop["y_max"] / 100.0

    shift = (round(x_min * orig_shape[1]), round(y_min * orig_shape[0]))
    cropped_shape = (
        round(orig_shape[0] * (y_max - y_min)),
        round(orig_shape[1] * (x_max - x_min)),
    )
    return shift, cropped_shape
def post_process_bboxes(
    predictions: List[List[List[float]]],
    infer_shape: Tuple[int, int],
    img_dims: List[Tuple[int, int]],
    preproc: dict,
    resize_method: str,
) -> List[List[List[float]]]:
    """Map box corners from network-input coordinates back to the source image.

    The first four values of every detection are treated as
    ``x1, y1, x2, y2``. They are un-stretched or un-letterboxed, clipped to
    the (possibly cropped) image, rounded and finally shifted by the
    static-crop offset. Other values are passed through.

    Args:
        predictions: Detections per image.
        infer_shape: Network input size as ``(height, width)``.
        img_dims: Original ``(height, width)`` for each image.
        preproc: Preprocessing configuration.
        resize_method: ``"Stretch to"`` or any letterbox method.

    Returns:
        Detections per image as nested lists.
    """
    rescaled = []
    for image_index, detections in enumerate(predictions):
        if len(detections) == 0:
            rescaled.append([])
            continue

        detection_array = np.array(detections)
        boxes = detection_array[:, :4]
        # Views into `boxes`: x1/x2 are columns 0 and 2, y1/y2 are 1 and 3.
        xs = boxes[:, 0::2]
        ys = boxes[:, 1::2]

        (shift_x, shift_y), origin_shape = get_static_crop_dimensions(img_dims[image_index], preproc)
        if resize_method == "Stretch to":
            scale_height = origin_shape[0] / infer_shape[0]
            scale_width = origin_shape[1] / infer_shape[1]
            xs *= scale_width
            ys *= scale_height
        else:
            scale = min(infer_shape[0] / origin_shape[0], infer_shape[1] / origin_shape[1])
            inter_h = round(origin_shape[0] * scale)
            inter_w = round(origin_shape[1] * scale)
            xs -= (infer_shape[1] - inter_w) / 2
            ys -= (infer_shape[0] - inter_h) / 2
            boxes /= scale

        xs[...] = np.round(np.clip(xs, 0, origin_shape[1]))
        ys[...] = np.round(np.clip(ys, 0, origin_shape[0]))
        xs += shift_x
        ys += shift_y

        rescaled.append(detection_array.tolist())
    return rescaled
def post_process_keypoints(
    predictions: List[List[List[float]]],
    keypoints_start_index: int,
    infer_shape: Tuple[int, int],
    img_dims: List[Tuple[int, int]],
    preproc: dict,
    resize_method: str,
) -> List[List[List[float]]]:
    """Map keypoint coordinates from network-input space back to the source image.

    Columns from ``keypoints_start_index`` onward are read in groups of three;
    the first two entries of each group are treated as x and y, the third is
    left untouched. Coordinates are un-stretched or un-letterboxed, clipped,
    rounded and shifted by the static-crop offset.

    Args:
        predictions: Detections per image.
        keypoints_start_index: Column where the keypoint values begin.
        infer_shape: Network input size as ``(height, width)``.
        img_dims: Original ``(height, width)`` for each image.
        preproc: Preprocessing configuration.
        resize_method: ``"Stretch to"`` or any letterbox method.

    Returns:
        Detections per image as nested lists.
    """
    rescaled = []
    for image_index, detections in enumerate(predictions):
        if len(detections) == 0:
            rescaled.append([])
            continue

        detection_array = np.array(detections)
        keypoints = detection_array[:, keypoints_start_index:]
        # Only complete (x, y, third-value) triples are touched.
        triple_end = 3 * (keypoints.shape[1] // 3)
        xs = keypoints[:, 0:triple_end:3]
        ys = keypoints[:, 1:triple_end:3]

        (shift_x, shift_y), origin_shape = get_static_crop_dimensions(img_dims[image_index], preproc)
        if resize_method == "Stretch to":
            xs *= origin_shape[1] / infer_shape[1]
            ys *= origin_shape[0] / infer_shape[0]
        else:
            scale = min(infer_shape[0] / origin_shape[0], infer_shape[1] / origin_shape[1])
            inter_w = int(origin_shape[1] * scale)
            inter_h = int(origin_shape[0] * scale)
            xs -= (infer_shape[1] - inter_w) / 2
            xs /= scale
            ys -= (infer_shape[0] - inter_h) / 2
            ys /= scale

        xs[...] = np.round(np.clip(xs, 0, origin_shape[1]))
        ys[...] = np.round(np.clip(ys, 0, origin_shape[0]))
        xs += shift_x
        ys += shift_y

        rescaled.append(detection_array.tolist())
    return rescaled
def masks2poly(masks: np.ndarray) -> List[np.ndarray]:
    """Trace one outline polygon per mask.

    Every mask is reduced to a contiguous ``uint8`` image (bool masks are
    reinterpreted, other non-uint8 dtypes are thresholded at ``> 0``).
    Among the external contours found by OpenCV, the one with the most
    points is kept.

    Args:
        masks: Iterable of 2-D mask arrays.

    Returns:
        One ``float32`` array of shape ``(points, 2)`` per mask; masks with
        no foreground yield a ``(0, 2)`` array.
    """
    polygons = []
    for mask in masks:
        if mask.dtype == np.uint8:
            binary = np.ascontiguousarray(mask)
        else:
            truthy = mask if mask.dtype == np.bool_ else mask > 0
            # A bool buffer can be reinterpreted as uint8 without copying.
            binary = np.ascontiguousarray(truthy).view(np.uint8)

        if not binary.any():
            polygons.append(np.zeros((0, 2), dtype=np.float32))
            continue

        found = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0]
        if found:
            outline = np.array(max(found, key=len)).reshape(-1, 2)
        else:
            outline = np.zeros((0, 2))
        polygons.append(outline.astype("float32"))
    return polygons
def post_process_polygons(
    origin_shape: Tuple[int, int],
    polys: List[List[Tuple[float, float]]],
    infer_shape: Tuple[int, int],
    preproc: dict,
    resize_method: str,
) -> List[List[Tuple[float, float]]]:
    """Map polygon points from network-input space back to the source image.

    Args:
        origin_shape: Original image size as ``(height, width)``.
        polys: Polygons given as sequences of ``(x, y)`` points.
        infer_shape: Network input size as ``(height, width)``.
        preproc: Preprocessing configuration.
        resize_method: ``"Stretch to"`` or any letterbox method.

    Returns:
        Polygons as lists of ``(x, y)`` tuples, including the static-crop
        shift. Points are neither clipped nor rounded.
    """
    (shift_x, shift_y), origin_shape = get_static_crop_dimensions(origin_shape, preproc)

    if resize_method == "Stretch to":
        width_ratio = origin_shape[1] / infer_shape[1]
        height_ratio = origin_shape[0] / infer_shape[0]

        def to_origin(point):
            return point[0] * width_ratio, point[1] * height_ratio

    else:
        scale = min(infer_shape[0] / origin_shape[0], infer_shape[1] / origin_shape[1])
        pad_x = (infer_shape[1] - int(origin_shape[1] * scale)) / 2
        pad_y = (infer_shape[0] - int(origin_shape[0] * scale)) / 2

        def to_origin(point):
            return (point[0] - pad_x) / scale, (point[1] - pad_y) / scale

    shifted_polys = []
    for poly in polys:
        mapped = [to_origin(point) for point in poly]
        shifted_polys.append([(x + shift_x, y + shift_y) for x, y in mapped])
    return shifted_polys
def preprocess_segmentation_masks(protos: np.ndarray, masks_in: np.ndarray, shape: Tuple[int, int]) -> np.ndarray:
    """Combine prototype masks with per-detection coefficients and trim padding.

    Args:
        protos: Prototype masks of shape ``(channels, mask_h, mask_w)``.
        masks_in: Coefficient matrix multiplied with the flattened prototypes.
        shape: Reference size as ``(height, width)``, used to work out how
            much of the prototype grid is padding.

    Returns:
        Sigmoid-activated masks of shape ``(n, mask_h', mask_w')`` with the
        computed padding band sliced off.
    """
    channels, proto_h, proto_w = protos.shape
    flat_protos = protos.astype(np.float32).reshape((channels, -1))
    masks = sigmoid(masks_in @ flat_protos).reshape((-1, proto_h, proto_w))

    gain = min(proto_h / shape[0], proto_w / shape[1])
    pad_x = (proto_w - shape[1] * gain) / 2
    pad_y = (proto_h - shape[0] * gain) / 2
    rows = slice(int(pad_y), int(proto_h - pad_y))
    cols = slice(int(pad_x), int(proto_w - pad_x))
    return masks[:, rows, cols]
def crop_mask(masks: np.ndarray, boxes: np.ndarray) -> np.ndarray:
    """Zero every mask outside its own bounding box.

    Args:
        masks: Array of shape ``(n, height, width)``.
        boxes: Array of shape ``(n, 4)`` holding ``x1, y1, x2, y2`` in mask
            pixel coordinates. The lower bounds are inclusive, the upper
            bounds exclusive.

    Returns:
        A new array equal to ``masks`` inside the boxes and 0 elsewhere.
    """
    _, height, width = masks.shape
    x1, y1, x2, y2 = np.split(boxes[:, :, np.newaxis], 4, 1)
    cols = np.arange(width, dtype=x1.dtype)[np.newaxis, np.newaxis, :]
    rows = np.arange(height, dtype=x1.dtype)[np.newaxis, :, np.newaxis]
    inside_x = (cols >= x1) & (cols < x2)
    inside_y = (rows >= y1) & (rows < y2)
    return masks * (inside_x & inside_y)
def process_mask_accurate(protos: np.ndarray, masks_in: np.ndarray, bboxes: np.ndarray, shape: Tuple[int, int]) -> np.ndarray:
    """Build instance masks at full ``shape`` resolution.

    The combined prototype masks are upsampled to ``shape`` with bilinear
    interpolation, cropped to their boxes, and values below 0.5 are zeroed.

    Args:
        protos: Prototype masks, forwarded to ``preprocess_segmentation_masks``.
        masks_in: Per-detection mask coefficients.
        bboxes: One ``x1, y1, x2, y2`` box per detection, in ``shape`` coordinates.
        shape: Target size as ``(height, width)``.

    Returns:
        Masks of shape ``(n, height, width)``.
    """
    masks = preprocess_segmentation_masks(protos, masks_in, shape)
    if len(masks.shape) == 2:
        masks = np.expand_dims(masks, axis=0)
    # cv2.resize works on height x width x channels, so put detections last.
    masks = masks.transpose((1, 2, 0))
    # The third positional parameter of cv2.resize is `dst`, so the
    # interpolation flag has to be passed by keyword.
    masks = cv2.resize(masks, (shape[1], shape[0]), interpolation=cv2.INTER_LINEAR)
    # For a single detection cv2.resize drops the channel axis; restore it.
    if len(masks.shape) == 2:
        masks = np.expand_dims(masks, axis=2)
    masks = masks.transpose((2, 0, 1))
    masks = crop_mask(masks, bboxes)
    masks[masks < 0.5] = 0
    return masks
def process_mask_tradeoff(protos: np.ndarray, masks_in: np.ndarray, bboxes: np.ndarray, shape: Tuple[int, int], tradeoff_factor: float) -> np.ndarray:
    """Build instance masks at a resolution between prototype and input size.

    Args:
        protos: Prototype masks of shape ``(channels, mask_h, mask_w)``.
        masks_in: Per-detection mask coefficients.
        bboxes: One ``x1, y1, x2, y2`` box per detection, in ``shape``
            coordinates; a scaled copy is used, the input is not modified.
        shape: Network input size as ``(height, width)``.
        tradeoff_factor: Linear blend between prototype size (0) and
            ``shape`` (1); with 0 no resizing is performed.

    Returns:
        Masks cropped to their scaled boxes, with values below 0.5 zeroed.
    """
    c, mh, mw = protos.shape
    masks = preprocess_segmentation_masks(protos, masks_in, shape)
    if len(masks.shape) == 2:
        masks = np.expand_dims(masks, axis=0)
    # cv2.resize works on height x width x channels, so put detections last.
    masks = masks.transpose((1, 2, 0))
    ih, iw = shape
    h = int(mh * (1 - tradeoff_factor) + ih * tradeoff_factor)
    w = int(mw * (1 - tradeoff_factor) + iw * tradeoff_factor)
    if tradeoff_factor != 0:
        # The third positional parameter of cv2.resize is `dst`, so the
        # interpolation flag has to be passed by keyword.
        masks = cv2.resize(masks, (w, h), interpolation=cv2.INTER_LINEAR)
    # For a single detection cv2.resize drops the channel axis; restore it.
    if len(masks.shape) == 2:
        masks = np.expand_dims(masks, axis=2)
    masks = masks.transpose((2, 0, 1))
    # Boxes live in input coordinates; bring them to the actual mask size.
    c, mh, mw = masks.shape
    scale_x = mw / iw
    scale_y = mh / ih
    bboxes = bboxes.copy()
    bboxes[:, 0] *= scale_x
    bboxes[:, 2] *= scale_x
    bboxes[:, 1] *= scale_y
    bboxes[:, 3] *= scale_y
    masks = crop_mask(masks, bboxes)
    masks[masks < 0.5] = 0
    return masks
def process_mask_fast(protos: np.ndarray, masks_in: np.ndarray, bboxes: np.ndarray, shape: Tuple[int, int]) -> np.ndarray:
    """Build instance masks at prototype resolution, without upsampling.

    Args:
        protos: Prototype masks of shape ``(channels, mask_h, mask_w)``.
        masks_in: Per-detection mask coefficients.
        bboxes: One ``x1, y1, x2, y2`` box per detection, in ``shape``
            coordinates; a scaled copy is used, the input is not modified.
        shape: Network input size as ``(height, width)``.

    Returns:
        Masks cropped to their down-scaled boxes, with values below 0.5 zeroed.
    """
    input_h, input_w = shape
    _, proto_h, proto_w = protos.shape
    masks = preprocess_segmentation_masks(protos, masks_in, shape)

    # Bring the boxes from input coordinates down to the prototype grid.
    scaled_boxes = bboxes.copy()
    scaled_boxes[:, [0, 2]] *= proto_w / input_w
    scaled_boxes[:, [1, 3]] *= proto_h / input_h

    masks = crop_mask(masks, scaled_boxes)
    masks[masks < 0.5] = 0
    return masks
def load_onnx_session(onnx_path: str, providers: List[str] = None) -> ort.InferenceSession:
    """Create an ONNX Runtime inference session for the model at ``onnx_path``.

    Args:
        onnx_path: Path to the ``.onnx`` model file.
        providers: Execution providers to request. When None, CUDA is
            requested first and CPU second.

    Returns:
        The created ``ort.InferenceSession``.
    """
    default_providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    chosen = default_providers if providers is None else providers
    return ort.InferenceSession(onnx_path, providers=chosen)
def find_default_onnx(data_dir: str) -> str:
    """Pick the ONNX model file to use from ``data_dir``.

    Args:
        data_dir: Directory to scan for ``.onnx`` files (extension matched
            case-insensitively).

    Returns:
        Path to ``weights.onnx`` (any letter case) when several candidates
        exist and it is among them, otherwise to the first candidate in
        sorted order.

    Raises:
        FileNotFoundError: If the directory contains no ``.onnx`` file.
    """
    onnx_files = sorted(name for name in os.listdir(data_dir) if name.lower().endswith(".onnx"))
    if not onnx_files:
        raise FileNotFoundError(f"No .onnx file found in {data_dir}")
    if len(onnx_files) > 1:
        # Prefer weights.onnx if present.
        preferred = next((name for name in onnx_files if name.lower() == "weights.onnx"), None)
        if preferred is not None:
            return os.path.join(data_dir, preferred)
    return os.path.join(data_dir, onnx_files[0])
def get_input_hw(session: ort.InferenceSession, preproc: Dict[str, Any]) -> Tuple[int, int]:
    """Determine the height and width the model's first input expects.

    Positions 2 and 3 of the first input's shape are read as height and
    width. If either is symbolic (a string) or None, both are replaced by the
    ``resize`` settings of ``preproc``, falling back to 640 each.

    Args:
        session: Loaded ONNX Runtime session.
        preproc: Preprocessing configuration (may be empty).

    Returns:
        ``(height, width)`` as integers.
    """
    input_shape = session.get_inputs()[0].shape
    height, width = input_shape[2], input_shape[3]

    is_dynamic = any(isinstance(dim, str) or dim is None for dim in (height, width))
    if is_dynamic:
        resize_cfg = preproc.get("resize") if preproc else None
        if resize_cfg:
            height = int(resize_cfg.get("height", 640))
            width = int(resize_cfg.get("width", 640))
        else:
            height, width = 640, 640
    return int(height), int(width)
def build_meta(data_dir: str, session: ort.InferenceSession) -> Dict[str, Any]:
    """Collect everything pre- and post-processing need to know about the model.

    Args:
        data_dir: Directory holding the model's side files.
        session: Loaded ONNX Runtime session, used to read the input size.

    Returns:
        Dict with the keys ``environment``, ``preproc``, ``class_names``,
        ``resize_method``, ``input_hw`` and ``keypoints_metadata``.
    """
    environment = load_environment(data_dir)
    # A missing or empty PREPROCESSING entry is treated as "no preprocessing".
    preproc = environment.get("PREPROCESSING") or {}

    meta: Dict[str, Any] = {"environment": environment, "preproc": preproc}
    meta["class_names"] = load_class_names(data_dir, environment)
    meta["resize_method"] = get_resize_method(preproc)
    meta["input_hw"] = get_input_hw(session, preproc)
    meta["keypoints_metadata"] = load_keypoints_metadata(data_dir)
    return meta
def normalize_rgb(img_in: np.ndarray, means: List[float], stds: List[float]) -> np.ndarray:
    """Scale a batch to 0..1 and standardise its first three channels.

    Args:
        img_in: Batch indexed as ``[batch, channel, row, column]``.
        means: Per-channel means; entries 0..2 are used.
        stds: Per-channel standard deviations; entries 0..2 are used.

    Returns:
        A new ``float32`` array; ``img_in`` itself is left untouched.
    """
    scaled = img_in.astype(np.float32)
    scaled /= 255.0
    for channel in range(3):
        scaled[:, channel, :, :] = (scaled[:, channel, :, :] - means[channel]) / stds[channel]
    return scaled
# Task-type label for this runner; nothing in the visible file reads it.
MODEL_TASK_TYPE = "object-detection"
def preprocess_for_model(image: Any, meta: Dict[str, Any]) -> Tuple[np.ndarray, Tuple[int, int]]:
    """Produce the network input tensor for ``image``, scaled to 0..1.

    Args:
        image: Any input accepted by ``preprocess_image``.
        meta: Model metadata; ``preproc`` and ``input_hw`` are read.

    Returns:
        ``(tensor, img_dims)`` where the tensor is ``float32`` divided by 255
        and ``img_dims`` is what ``preprocess_image`` reported.
    """
    tensor, original_dims = preprocess_image(image, meta["preproc"], meta["input_hw"])
    tensor = tensor.astype(np.float32)
    tensor /= 255.0
    return tensor, original_dims
def pack_predictions(predictions: np.ndarray) -> np.ndarray:
    """Rearrange raw output into ``[box(4), confidence, class scores...]`` rows.

    The last two axes are swapped, the first four values of each row are
    kept as the box, and the maximum of the remaining values is inserted as
    the confidence column in front of them.

    Args:
        predictions: 3-D array indexed as ``[batch, value, candidate]``.

    Returns:
        Array indexed as ``[batch, candidate, value]`` with one extra column.
    """
    per_candidate = np.transpose(predictions, (0, 2, 1))
    class_scores = per_candidate[:, :, 4:]
    best_score = class_scores.max(axis=2, keepdims=True)
    return np.concatenate([per_candidate[:, :, :4], best_score, class_scores], axis=2)
def postprocess_predictions(predictions: np.ndarray, meta: Dict[str, Any], img_dims: List[Tuple[int, int]],
                            confidence: float = 0.4, iou_threshold: float = 0.3, max_detections: int = 300):
    """Turn packed predictions into per-image lists of detection dicts.

    Runs per-class non-maximum suppression on ``xywh`` boxes, maps the boxes
    back to source-image coordinates and formats each survivor.

    Args:
        predictions: Packed predictions as produced by ``pack_predictions``.
        meta: Model metadata; ``input_hw``, ``preproc``, ``resize_method``
            and ``class_names`` are read.
        img_dims: Original ``(height, width)`` for each image in the batch.
        confidence: Confidence threshold for NMS.
        iou_threshold: Overlap threshold for NMS.
        max_detections: Maximum detections kept per image.

    Returns:
        One list per image of dicts with ``x``/``y`` (box centre),
        ``width``, ``height``, ``confidence``, ``class_id`` and ``class``.
        ``class`` falls back to the stringified id when no name is known.
    """
    detections = w_np_non_max_suppression(
        predictions,
        conf_thresh=confidence,
        iou_thresh=iou_threshold,
        class_agnostic=False,
        max_detections=max_detections,
        box_format="xywh",
    )
    detections = post_process_bboxes(
        detections, meta["input_hw"], img_dims, meta["preproc"], meta["resize_method"]
    )
    class_names = meta["class_names"]

    def to_record(pred):
        x_min, y_min, x_max, y_max = pred[0], pred[1], pred[2], pred[3]
        class_id = int(pred[6])
        label = class_names[class_id] if class_id < len(class_names) else str(class_id)
        return {
            "x": (x_min + x_max) / 2,
            "y": (y_min + y_max) / 2,
            "width": x_max - x_min,
            "height": y_max - y_min,
            "confidence": float(pred[4]),
            "class_id": class_id,
            "class": label,
        }

    return [[to_record(pred) for pred in image_preds] for image_preds in detections]
def load_model(onnx_path: str | None = None, data_dir: str | None = None):
    """Create the inference session and gather the model's metadata.

    Args:
        onnx_path: Model file; when falsy, one is picked from ``data_dir``.
        data_dir: Directory with the model's side files; when falsy, the
            directory containing this file is used.

    Returns:
        Dict with ``session``, ``meta`` and ``model_type``. ``model_type`` is
        ``"unknown"`` unless a callable named ``load_model_type`` exists at
        module level, in which case it is called with ``data_dir``.
    """
    if not data_dir:
        data_dir = os.path.dirname(os.path.abspath(__file__))
    if not onnx_path:
        onnx_path = find_default_onnx(data_dir)

    session = load_onnx_session(onnx_path)
    meta = build_meta(data_dir, session)

    resolver = globals().get("load_model_type")
    if callable(resolver):
        model_type = resolver(data_dir)
    else:
        model_type = "unknown"
    return {"session": session, "meta": meta, "model_type": model_type}
def run_model(model: Any, image: Any = None, onnx_path: str | None = None, data_dir: str | None = None):
    """Run detection on one image.

    Two calling conventions are supported:

    * ``run_model(model, image)`` with a dict returned by ``load_model``.
    * ``run_model(image, onnx_path=..., data_dir=...)``: when ``image`` is
      None the first argument is taken to be the image and a model is loaded
      on the fly.

    Args:
        model: Loaded model dict, or the image when ``image`` is None.
        image: Image input accepted by ``preprocess_for_model``.
        onnx_path: Model file, used only when a model has to be loaded.
        data_dir: Model directory, used only when a model has to be loaded.

    Returns:
        The result of ``postprocess_predictions`` for a single-image batch.
    """
    if image is None:
        image = model
        model = load_model(onnx_path=onnx_path, data_dir=data_dir)
    # Only "session" and "meta" are needed; "model_type" is never used here,
    # so it is no longer read (and no longer a required key).
    session = model["session"]
    meta = model["meta"]
    img_in, img_dims = preprocess_for_model(image, meta)
    input_name = session.get_inputs()[0].name
    outputs = session.run(None, {input_name: img_in})
    predictions = pack_predictions(outputs[0])
    return postprocess_predictions(predictions, meta, [img_dims])
def main():
    """Command-line entry point: run the model on an image and print JSON.

    Expects the image path as first argument and optionally an ONNX path as
    second. Without arguments a usage line is written to stderr and the
    process exits with status 1.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: main.py <image_path> [onnx_path]", file=sys.stderr)
        sys.exit(1)

    image_path = cli_args[0]
    data_dir = os.path.dirname(os.path.abspath(__file__))
    if len(cli_args) > 1:
        onnx_path = cli_args[1]
    else:
        onnx_path = find_default_onnx(data_dir)

    results = run_model(image_path, onnx_path=onnx_path, data_dir=data_dir)
    print(json.dumps(results, indent=2))
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()