| |
| """MIMIC inference utilities. |
| |
| Mirrors the style of :mod:`models.S2E_PG_Full.inference`, adapted for the |
| MIMIC ONNX (``mimic.onnx``) — point-goal, multi-modal, 16-frame context with |
| an explicit past-trajectory + past-mask input. Reference single-shot script: |
| ``urban-sim-human-official/run_v3_b_onnx.py``. |
| |
| The ONNX has four inputs and three outputs:: |
| |
    Inputs:
      obs        : (B, 16, 3, 256, 256) float32 in [0, 1]
      goal       : (B, 3) float32 = [dist/200, cos(a), sin(a)]
      past_traj  : (B, 15, 2) float32 — past XY in camera frame × 0.25
                   (usually zeros at inference)
      past_mask  : (B, 16) float32 — 1.0 for valid frames

    Outputs:
      best_reg_mid         : (B, 10, 3) — best single trajectory (model-picked)
      poses_reg_mid_denorm : (B, 62, 10, 3) — 62 candidates
      poses_cls_mid        : (B, 62) — classification scores
| |
| Outputs are in *camera frame* (col-0 = forward, col-1 = +left in the model |
| output; the reference script's ``[-col1, col0]`` swap turns col1 into the |
| controller's right-positive ``x_right``). We rescale by ``waypoint_scale`` |
| (=0.25) and return waypoints in standard frame ``(x_fwd, y_left)``. |
| |
Example (both inference methods currently accept only ``obs``)::

    nav = MIMICNavigator(device="cuda")
    traj, scores = nav.inference_trajectory(obs)
    vw, best = nav.inference_vw(obs)
    nav.reset()
| """ |
| import os |
| import math |
| import numpy as np |
| import torch |
| import yaml |
| import onnxruntime as ort |
| from urbansim.custom.pp import PurePursuitController |
|
|
# Model artefacts are resolved relative to the directory holding this module.
MODEL_DIR = os.path.split(os.path.abspath(__file__))[0]
ONNX_PATH, INFO_PATH = (
    os.path.join(MODEL_DIR, filename)
    for filename in ("mimic.onnx", "model_info.yaml")
)

# Default cap on how many leading waypoints are kept from the model output.
HORIZON_IDX = 13
# Factor applied to the raw model waypoints before they are returned.
Scaler = 0.25
|
|
class PDController:
    """Map a batch of predicted waypoints to clamped ``(v, w)`` commands.

    The controller is stateful: it remembers the linear velocity returned by
    the previous call and limits how far the next one may move away from it.
    """

    # Symmetric limits applied to the returned v and w respectively.
    MAX_V = 2.5
    MAX_W = 0.65

    def __init__(self):
        # No previous command yet, so the first call is not rate-limited.
        self.last_v = None

    def reset(self):
        """Drop the remembered velocity so the next call is not rate-limited."""
        self.last_v = None

    def __call__(self, waypoints, dt=1.0):
        """Compute ``(v, w)`` from waypoints indexed as ``[batch, step, xy]``.

        Args:
            waypoints: torch.Tensor indexed as ``[:, step, 0]`` for x and
                ``[:, step, 1]`` for y.
            dt: Divisor applied to both the x displacement and the heading.

        Returns:
            Tuple ``(v, w)`` of per-batch tensors, clamped to ``MAX_V`` and
            ``MAX_W``.
        """
        tiny = 1e-6

        # Heading is taken from the final waypoint, speed from a nearer one
        # (step 6, or the last step when the trajectory is shorter).
        final_x = waypoints[:, -1, 0]
        final_y = waypoints[:, -1, 1]
        lookahead = min(6, waypoints.shape[1] - 1)
        lookahead_x = waypoints[:, lookahead, 0]

        # |x| keeps atan2 in the forward half-plane; the sign of x is
        # re-applied afterwards.
        heading = torch.atan2(final_y, torch.clamp(torch.abs(final_x), min=tiny))
        w = heading * torch.sign(final_x) / dt
        v = lookahead_x / dt

        # No forward progress at the look-ahead step: stop and turn in place
        # towards the side of the final waypoint at a fixed rate.
        stalled = torch.abs(lookahead_x) < tiny
        v = torch.where(stalled, torch.zeros_like(v), v)
        w = torch.where(stalled, torch.sign(final_y) * (math.pi / 20.0), w)

        # Rate limit relative to the previous command: at most 0.5 lower or
        # 0.4 higher.
        previous = self.last_v
        if previous is not None:
            v = torch.clamp(v, min=previous - 0.5, max=previous + 0.4)

        v = torch.clamp(v, min=-self.MAX_V, max=self.MAX_V)
        w = torch.clamp(w, min=-self.MAX_W, max=self.MAX_W)
        self.last_v = v
        return v, w
|
|
class MIMICNavigator:
    """Run the MIMIC ONNX model and turn its output into waypoints or (v, w).

    The navigator owns an ONNX Runtime session and a :class:`PDController`.
    :meth:`inference_trajectory` returns scaled waypoints;
    :meth:`inference_vw` additionally converts them to a velocity command.

    NOTE(review): the session is fed a single tensor named ``"input"`` and
    only output 0 is consumed, whereas the module docstring describes a
    four-input model (obs / goal / past_traj / past_mask). Confirm which one
    matches the exported ``mimic.onnx``.
    """

    # Presumably the number of frames per observation (matches the 16 in the
    # documented ``obs`` shape); not read inside this class.
    context_size = 16
    # Not read inside this class; presumably consumed by callers.
    multimodal = False

    def __init__(
        self,
        onnx_path: str = ONNX_PATH,
        device: str = "cuda",
        max_v: float = 2.5,
        max_w: float = 0.65,
        dt: float = 2.0,
        horizon_idx: int = HORIZON_IDX,
    ):
        """Create the ONNX session, load the model info and build controllers.

        Args:
            onnx_path: Path to the ONNX model file.
            device: Torch device ("cuda" or "cpu") the waypoints are moved to
                before the PD controller runs.
            max_v: Max linear velocity (m/s).
            max_w: Max angular velocity (rad/s).
            dt: Time scaling factor for PD controller.
            horizon_idx: Number of leading output waypoints to keep
                (up to 4s).
        """
        self.device = device
        self.dt = dt
        self.horizon_idx = horizon_idx

        # Severity 3 is ERROR: hide ONNX Runtime info/warning output.
        ort.set_default_logger_severity(3)
        # NOTE(review): the CUDA provider is requested regardless of
        # ``device``; ONNX Runtime falls back to the CPU provider when CUDA
        # is unavailable. Confirm whether device="cpu" should force CPU.
        providers = [
            ("CUDAExecutionProvider",
             {"arena_extend_strategy": "kSameAsRequested"}),
            "CPUExecutionProvider",
        ]
        self._session = ort.InferenceSession(onnx_path, providers=providers)

        # The info file is always read from INFO_PATH (next to this module),
        # even when a custom ``onnx_path`` is supplied.
        with open(INFO_PATH, "r", encoding="utf-8") as f:
            self._info = yaml.safe_load(f)

        # Instance-level overrides of the PDController class limits.
        self._controller = PDController()
        self._controller.MAX_V = max_v
        self._controller.MAX_W = max_w

        # Not used by any method of this class; kept for external users of
        # the attribute — TODO confirm it is still needed.
        self._pp = PurePursuitController(action_dt=4, waypoint_index=6)
        self._last_best_traj = None

    def reset(self) -> None:
        """Clear the PD controller state and the cached best trajectory."""
        self._controller.reset()
        self._last_best_traj = None

    def inference_trajectory(self, obs):
        """Run model → (trajectory, scores).

        Args:
            obs: torch.Tensor or array-like, (B, 16, 3, H, W) in [0, 1]. It is
                converted to a float32 numpy array, and every sample is run
                through the session on its own with batch size 1.
                NOTE(review): the spatial size was documented here as
                288x512 while the module docstring says 256x256 — confirm.

        Returns:
            trajectory: np.ndarray float32 (B, 1, K, 2) — the first two
                columns of the first K output steps multiplied by ``Scaler``,
                where K is ``horizon_idx`` or the number of steps the model
                emits, whichever is smaller.
            scores: np.ndarray float32 (B, 1), all ones.

        Raises:
            ValueError: If ``obs`` has an empty batch dimension.
        """
        if isinstance(obs, torch.Tensor):
            # detach() so tensors that require grad can be exported; the
            # float32 cast mirrors what the array-like branch does.
            obs_np = obs.detach().to(device="cpu", dtype=torch.float32).numpy()
        else:
            obs_np = np.asarray(obs, dtype=np.float32)

        batch = obs_np.shape[0]
        if batch == 0:
            raise ValueError("obs must contain at least one sample")

        # The session is always invoked with batch size 1, one sample at a
        # time, and the first output of each run is stacked back together.
        raw = np.concatenate(
            [
                self._session.run(None, {"input": obs_np[i:i + 1]})[0]
                for i in range(batch)
            ],
            axis=0,
        )

        traj_xy = raw[:, :self.horizon_idx, :2].astype(np.float32)

        # Insert a singleton candidate axis and rescale the waypoints.
        trajectory = traj_xy[:, np.newaxis] * Scaler
        scores = np.ones((batch, 1), dtype=np.float32)
        return trajectory, scores

    def inference_vw(self, obs):
        """Run model → (v, w) velocity commands plus the trajectory used.

        Args:
            obs: Same as :meth:`inference_trajectory`.

        Returns:
            vw: torch.Tensor (B, 2) on ``self.device``, columns ``(v, w)``.
            best_traj: np.ndarray (B, K, 2), the waypoints the command was
                computed from; also cached on the instance.
        """
        trajectory, _ = self.inference_trajectory(obs)
        best_traj = trajectory[:, 0]
        waypoints = torch.from_numpy(best_traj).float().to(self.device)
        v, w = self._controller(waypoints, dt=self.dt)
        self._last_best_traj = best_traj
        return torch.stack([v, w], dim=1), best_traj
|
|
|
|