#!/usr/bin/env python3
"""MIMIC inference utilities.

Mirrors the style of :mod:`models.S2E_PG_Full.inference`, adapted for the
MIMIC ONNX (``mimic.onnx``) — point-goal, multi-modal, 16-frame context with
an explicit past-trajectory + past-mask input.

Reference single-shot script: ``urban-sim-human-official/run_v3_b_onnx.py``.

The ONNX has four inputs and three outputs::

    obs       : (B, 16, 3, 256, 256) float32 in [0, 1]
    goal      : (B, 3)  float32 = [dist/200, cos(a), sin(a)]
    past_traj : (B, 15, 2) float32 — past XY in camera frame x 0.25
                (usually zeros at inference)
    past_mask : (B, 16) float32 — 1.0 for valid frames

    best_reg_mid         : (B, 10, 3)     — best single trajectory (model-picked)
    poses_reg_mid_denorm : (B, 62, 10, 3) — 62 candidates
    poses_cls_mid        : (B, 62)        — classification scores

Outputs are in *camera frame* (col-0 = forward, col-1 = +left in the model
output; the reference script's ``[-col1, col0]`` swap turns col1 into the
controller's right-positive ``x_right``). We rescale by ``waypoint_scale``
(=0.25, the module constant ``Scaler``) and return waypoints in standard
frame ``(x_fwd, y_left)``.

NOTE(review): the code in this module feeds a single tensor under the name
``"input"`` and reads only the first session output; it does not build
``goal``, ``past_traj`` or ``past_mask``. The two docstrings in this file
also disagree on the image size (256x256 here, 288x512 on
``inference_trajectory``). Confirm the I/O description above against the
exported ``mimic.onnx``.

Usage (matching the signatures defined below)::

    nav = MIMICNavigator(device="cuda")
    traj, scores = nav.inference_trajectory(obs)
    vw, best = nav.inference_vw(obs)
    nav.reset()
"""

import os
import math

import numpy as np
import torch
import yaml
import onnxruntime as ort

from urbansim.custom.pp import PurePursuitController

MODEL_DIR = os.path.dirname(os.path.abspath(__file__))
ONNX_PATH = os.path.join(MODEL_DIR, "mimic.onnx")
INFO_PATH = os.path.join(MODEL_DIR, "model_info.yaml")

# Default number of leading waypoints kept from the model output.
HORIZON_IDX = 13
# Multiplier applied to the raw model waypoints before they are returned.
Scaler = 0.25


class PDController:
    """Convert predicted waypoints to (v, w) commands.

    The controller is stateful: it remembers the previous linear velocity so
    that consecutive commands are rate-limited. Call :meth:`reset` to clear
    that memory.
    """

    MAX_V = 2.5
    MAX_W = 0.65

    def __init__(self) -> None:
        # Linear velocity returned by the previous call (None before the
        # first call or after reset()).
        self.last_v = None

    def reset(self) -> None:
        """Forget the previous velocity so the next call is not rate-limited."""
        self.last_v = None

    def __call__(self, waypoints: torch.Tensor, dt: float = 1.0):
        """Compute velocity commands from a batch of waypoints.

        Args:
            waypoints: Tensor indexed as ``[batch, waypoint, (x, y)]``.
            dt: Divisor applied to both the forward displacement and the
                heading angle.

        Returns:
            Tuple ``(v, w)`` of 1-D tensors with one entry per batch element.
        """
        EPS = 1e-6

        # Heading is taken from the final waypoint.
        last_x = waypoints[:, -1, 0]
        last_y = waypoints[:, -1, 1]

        # Speed is taken from waypoint 6, or the last one if fewer exist.
        speed_idx = min(6, waypoints.shape[1] - 1)
        fwd = waypoints[:, speed_idx, 0]

        v = fwd / dt
        # |x| is floored at EPS to keep atan2 well-conditioned; the sign of x
        # is re-applied afterwards.
        heading = torch.atan2(last_y, last_x.abs().clamp(min=EPS))
        w = heading * last_x.sign() / dt

        # With (almost) no forward displacement: stop and turn in place at a
        # fixed rate towards the side the final waypoint lies on.
        stalled = fwd.abs() < EPS
        v = torch.where(stalled, torch.zeros_like(v), v)
        w = torch.where(stalled, last_y.sign() * (math.pi / 20.0), w)

        # Rate-limit relative to the previous command (asymmetric bounds).
        if self.last_v is not None:
            v = v.clamp(self.last_v - 0.5, self.last_v + 0.4)

        v = v.clamp(-self.MAX_V, self.MAX_V)
        w = w.clamp(-self.MAX_W, self.MAX_W)

        self.last_v = v
        return v, w


class MIMICNavigator:
    """ONNX-backed navigator that turns observations into waypoints or (v, w).

    Wraps an ``onnxruntime`` session for the model file and a
    :class:`PDController` that converts the predicted trajectory into
    velocity commands.
    """

    context_size = 16
    multimodal = False

    def __init__(
        self,
        onnx_path: str = ONNX_PATH,
        device: str = "cuda",
        max_v: float = 2.5,
        max_w: float = 0.65,
        dt: float = 2.0,
        horizon_idx: int = HORIZON_IDX,
    ):
        """
        Args:
            onnx_path: Path to the ONNX model file.
            device: "cuda" or "cpu". Used as the torch device for the
                waypoint tensor handed to the controller. The ONNX session
                always requests the CUDA provider first with CPU as
                fallback, independent of this value.
            max_v: Max linear velocity (m/s).
            max_w: Max angular velocity (rad/s).
            dt: Time scaling factor for PD controller.
            horizon_idx: Number of output waypoints to keep (up to 4s).
        """
        self.device = device
        self.dt = dt
        self.horizon_idx = horizon_idx

        ort.set_default_logger_severity(3)
        providers = [
            ("CUDAExecutionProvider", {"arena_extend_strategy": "kSameAsRequested"}),
            "CPUExecutionProvider",
        ]
        self._session = ort.InferenceSession(onnx_path, providers=providers)

        with open(INFO_PATH, "r", encoding="utf-8") as f:
            self._info = yaml.safe_load(f)

        # Override the class-level limits on this controller instance only.
        self._controller = PDController()
        self._controller.MAX_V = max_v
        self._controller.MAX_W = max_w

        self._pp = PurePursuitController(action_dt=4, waypoint_index=6)
        self._last_best_traj = None

    def reset(self) -> None:
        """Clear controller state and the cached best trajectory."""
        self._controller.reset()
        self._last_best_traj = None

    def inference_trajectory(self, obs):
        """Run model → (trajectory, scores).

        Args:
            obs: (B, 16, 3, 288, 512) float32 in [0,1], as a torch tensor or
                anything ``np.asarray`` accepts. The input is converted to
                float32 before being fed to the session.
                Note: batch dim is fixed to 1 for this model, so batches
                larger than one are run sample by sample.

        Returns:
            trajectory: np.ndarray (B, 1, K, 2) where K = horizon_idx
            scores:     np.ndarray (B, 1), all ones

        Raises:
            ValueError: If ``obs`` has an empty batch dimension.
        """
        if isinstance(obs, torch.Tensor):
            # detach() so tensors that require grad can be exported; the
            # float32 cast matches the non-tensor branch.
            obs_np = obs.detach().to(device="cpu", dtype=torch.float32).numpy()
        else:
            obs_np = np.asarray(obs, dtype=np.float32)

        B = obs_np.shape[0]
        if B == 0:
            raise ValueError("obs must contain at least one sample")

        if B == 1:
            raw = self._session.run(None, {"input": obs_np})[0]
        else:
            per_sample = [
                self._session.run(None, {"input": obs_np[i:i + 1]})[0]
                for i in range(B)
            ]
            raw = np.concatenate(per_sample, axis=0)

        # Take x, y columns up to the 4-second horizon
        traj_xy = raw[:, :self.horizon_idx, :2].astype(np.float32)

        # Reshape to (B, 1, K, 2) for uniform interface
        trajectory = traj_xy[:, np.newaxis] * Scaler
        scores = np.ones((B, 1), dtype=np.float32)
        return trajectory, scores

    def inference_vw(self, obs):
        """Run model → (v, w) velocity commands.

        Args:
            obs: Same input as :meth:`inference_trajectory`.

        Returns:
            vw: torch.Tensor (B, 2) with columns (v, w), on ``self.device``.
            best_traj: np.ndarray (B, K, 2), the trajectory the commands
                were computed from. It is also cached on the instance.
        """
        trajectory, _ = self.inference_trajectory(obs)
        best_traj = trajectory[:, 0]  # (B, K, 2)

        waypoints = torch.from_numpy(best_traj).float().to(self.device)
        v, w = self._controller(waypoints, dt=self.dt)

        self._last_best_traj = best_traj
        return torch.stack([v, w], dim=1), best_traj