Hollis71025's picture
Add 7 navigation models (ONNX + inference wrappers) and model card
9c1f523 verified
#!/usr/bin/env python3
"""MIMIC inference utilities.
Mirrors the style of :mod:`models.S2E_PG_Full.inference`, adapted for the
MIMIC ONNX (``mimic.onnx``) — point-goal, multi-modal, 16-frame context with
an explicit past-trajectory + past-mask input. Reference single-shot script:
``urban-sim-human-official/run_v3_b_onnx.py``.
The ONNX has four inputs and three outputs::
obs : (B, 16, 3, 256, 256) float32 in [0, 1]
goal : (B, 3) float32 = [dist/200, cos(a), sin(a)]
past_traj : (B, 15, 2) float32 — past XY in camera frame x 0.25
(usually zeros at inference)
past_mask : (B, 16) float32 — 1.0 for valid frames
best_reg_mid : (B, 10, 3) — best single trajectory (model-picked)
poses_reg_mid_denorm : (B, 62, 10, 3) — 62 candidates
poses_cls_mid : (B, 62) — classification scores
Outputs are in *camera frame* (col-0 = forward, col-1 = +left in the model
output; the reference script's ``[-col1, col0]`` swap turns col1 into the
controller's right-positive ``x_right``). We rescale by ``waypoint_scale``
(=0.25) and return waypoints in standard frame ``(x_fwd, y_left)``.
Example (the methods defined in this file currently take only ``obs``; their
signatures do not accept a ``goal_xy`` argument)::
    nav = MIMICNavigator(device="cuda")
    traj, scores = nav.inference_trajectory(obs)
    vw, best = nav.inference_vw(obs)
    nav.reset()
"""
import os
import math
import numpy as np
import torch
import yaml
import onnxruntime as ort
from urbansim.custom.pp import PurePursuitController
# Post-processing constants.
HORIZON_IDX = 13  # default for MIMICNavigator's ``horizon_idx`` (leading waypoints kept)
Scaler = 0.25  # factor the raw model waypoints are multiplied by

# Model artefacts are looked up next to this source file.
MODEL_DIR = os.path.dirname(os.path.abspath(__file__))
ONNX_PATH, INFO_PATH = (
    os.path.join(MODEL_DIR, artefact) for artefact in ("mimic.onnx", "model_info.yaml")
)
class PDController:
    """Map a batch of predicted waypoints to clamped ``(v, w)`` commands.

    The controller is stateful: it remembers the linear velocity it returned
    last so the next command cannot jump arbitrarily far from it. Call
    :meth:`reset` to forget that history.
    """

    # Symmetric limits applied to the linear / angular outputs.
    MAX_V = 2.5
    MAX_W = 0.65

    def __init__(self):
        # Linear velocity from the previous call; ``None`` until the first call.
        self.last_v = None

    def reset(self):
        """Drop the remembered velocity so the next call is not rate-limited."""
        self.last_v = None

    def __call__(self, waypoints, dt=1.0):
        """Compute velocity commands from waypoints.

        Args:
            waypoints: torch.Tensor indexed as ``[batch, step, coord]``;
                coord 0 drives the linear velocity and coords (1, 0) of the
                final step define the heading angle.
            dt: Divisor applied to both the forward offset and the heading.

        Returns:
            ``(v, w)``: two tensors with one entry per batch element.
        """
        tiny = 1e-6

        # Heading is taken from the final waypoint, speed from a look-ahead one.
        final_x, final_y = waypoints[:, -1, 0], waypoints[:, -1, 1]
        lookahead = min(6, waypoints.shape[1] - 1)
        forward = waypoints[:, lookahead, 0]

        heading = torch.atan2(final_y, final_x.abs().clamp(min=tiny))
        lin = forward / dt
        ang = heading * final_x.sign() / dt

        # No forward progress at the look-ahead step: stop and turn in place
        # towards the side of the final waypoint.
        stalled = forward.abs() < tiny
        lin = torch.where(stalled, torch.zeros_like(lin), lin)
        ang = torch.where(stalled, final_y.sign() * (math.pi / 20.0), ang)

        # Limit the change relative to the previous command (-0.5 / +0.4).
        previous = self.last_v
        if previous is not None:
            lin = lin.clamp(previous - 0.5, previous + 0.4)

        lin = lin.clamp(-self.MAX_V, self.MAX_V)
        ang = ang.clamp(-self.MAX_W, self.MAX_W)

        self.last_v = lin
        return lin, ang
class MIMICNavigator:
    """ONNX Runtime wrapper producing waypoints and ``(v, w)`` commands from images.

    The navigator owns an ``onnxruntime.InferenceSession`` plus a
    :class:`PDController`; :meth:`inference_trajectory` runs the network and
    :meth:`inference_vw` additionally converts the result to velocities.

    NOTE(review): the session is fed a single input named ``"input"`` and only
    its first output is used, whereas the module docstring describes a graph
    with four inputs and three outputs — confirm which one matches the shipped
    ``mimic.onnx``.
    """

    context_size = 16
    # NOTE(review): the module docstring calls the model multi-modal — confirm
    # this flag is intentionally False.
    multimodal = False

    def __init__(
        self,
        onnx_path: str = ONNX_PATH,
        device: str = "cuda",
        max_v: float = 2.5,
        max_w: float = 0.65,
        dt: float = 2.0,
        horizon_idx: int = HORIZON_IDX,
    ):
        """Create the ONNX session and the waypoint controllers.

        Args:
            onnx_path: Path to the ONNX model file.
            device: Torch device string (e.g. "cuda" or "cpu") on which the
                waypoint tensors handed to the controller are placed.
            max_v: Max linear velocity (m/s).
            max_w: Max angular velocity (rad/s).
            dt: Time scaling factor for PD controller.
            horizon_idx: Number of output waypoints to keep (up to 4s).
        """
        self.device = device
        self.dt = dt
        self.horizon_idx = horizon_idx

        # Severity 3 silences ONNX Runtime messages below error level.
        ort.set_default_logger_severity(3)
        # NOTE(review): the provider list does not depend on ``device``; the
        # CUDA provider is always requested first with CPU as the fallback.
        providers = [
            ("CUDAExecutionProvider",
             {"arena_extend_strategy": "kSameAsRequested"}),
            "CPUExecutionProvider",
        ]
        self._session = ort.InferenceSession(onnx_path, providers=providers)

        # The metadata file is always read from next to this module, even when
        # a custom ``onnx_path`` is supplied.
        with open(INFO_PATH, "r", encoding="utf-8") as f:
            self._info = yaml.safe_load(f)

        self._controller = PDController()
        # Instance-level overrides of the controller's class-level limits.
        self._controller.MAX_V = max_v
        self._controller.MAX_W = max_w
        # Not referenced by the methods defined in this class.
        self._pp = PurePursuitController(action_dt=4, waypoint_index=6)
        self._last_best_traj = None

    def reset(self):
        """Clear the controller's velocity history and the cached trajectory."""
        self._controller.reset()
        self._last_best_traj = None

    def inference_trajectory(self, obs):
        """Run model → (trajectory, scores).

        Args:
            obs: torch.Tensor or array-like whose first axis is the batch;
                it is converted to float32 before inference. Documented as
                (B, 16, 3, H, W) in [0, 1]; the spatial size must match the
                exported graph (256x256 per the module docstring, 288x512 has
                also been documented for this method — TODO confirm).
                Note: batch dim is fixed to 1 for this model, so samples are
                fed to the session one at a time.

        Returns:
            trajectory: np.ndarray (B, 1, K, 2) float32, the first two columns
                of at most ``horizon_idx`` waypoints multiplied by ``Scaler``.
            scores: np.ndarray (B, 1) float32, all ones.

        Raises:
            ValueError: if the batch is empty (nothing to concatenate).
        """
        if isinstance(obs, torch.Tensor):
            # detach() lets tensors that track gradients be converted, and the
            # cast mirrors the float32 conversion of the array-like branch.
            obs_np = obs.detach().cpu().numpy().astype(np.float32, copy=False)
        else:
            obs_np = np.asarray(obs, dtype=np.float32)

        batch = obs_np.shape[0]
        # One session call per sample; a batch of one is just a single pass.
        per_sample = [
            self._session.run(None, {"input": obs_np[i:i + 1]})[0]
            for i in range(batch)
        ]
        raw = np.concatenate(per_sample, axis=0)

        # Take x, y columns up to the 4-second horizon
        traj_xy = raw[:, :self.horizon_idx, :2].astype(np.float32)
        # Reshape to (B, 1, K, 2) for uniform interface
        trajectory = traj_xy[:, np.newaxis] * Scaler
        scores = np.ones((batch, 1), dtype=np.float32)
        return trajectory, scores

    def inference_vw(self, obs):
        """Run model → (v, w) velocity commands.

        Args:
            obs: Same input as :meth:`inference_trajectory`.

        Returns:
            vw: torch.Tensor (B, 2) on ``self.device``, columns ``(v, w)``.
            best_traj: np.ndarray (B, K, 2), the trajectory the commands were
                computed from (also cached on the instance).
        """
        trajectory, _ = self.inference_trajectory(obs)
        best_traj = trajectory[:, 0]  # (B, K, 2)
        waypoints = torch.from_numpy(best_traj).float().to(self.device)
        v, w = self._controller(waypoints, dt=self.dt)
        self._last_best_traj = best_traj
        return torch.stack([v, w], dim=1), best_traj