#!/usr/bin/env python3 """S2EBC-PG-Web100 inference utilities. S2EBC-PG takes 11 frames at 256x256 (no ImageNet normalization, [0,1]), a goal vector [norm_dist, cos(angle), sin(angle)], and outputs 10 waypoints x 3 (x, y, yaw). Waypoints are scaled by 1/0.1 = 10x. User provides goal_xy in standard frame (x=forward, y=left, meters). nav = S2EBCPGNavigator(device="cuda") traj, scores = nav.inference_trajectory(obs, goal_xy=np.array([5.0, 0.2])) vw = nav.inference_vw(obs, goal_xy=np.array([5.0, 0.2])) """ import os import math import numpy as np import torch import onnxruntime as ort MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) ONNX_PATH = os.path.join(MODEL_DIR, "s2e.onnx") IMG_SIZE = 256 WP_SCALE = 0.25 # model outputs are divided by this to get meters class PDController: MAX_V = 2.5 MAX_W = 0.65 def __init__(self): self.last_v = None def reset(self): self.last_v = None def __call__(self, waypoints, dt=1.0): EPS = 1e-6 idx = min(4, waypoints.shape[1] - 1) dx = waypoints[:, idx, 0] dyr = waypoints[:, -1, 1] dxr = waypoints[:, -1, 0] v = dx / dt w = (torch.atan2(dyr, dxr.abs().clamp(min=EPS)) * dxr.sign() / dt ) near_zero = dx.abs() < EPS v = torch.where(near_zero, torch.zeros_like(v), v) w = torch.where(near_zero, dyr.sign() * (math.pi / 20.0), w) if self.last_v is not None: v = v.clamp(self.last_v - 0.5, self.last_v + 0.4) v = v.clamp(-self.MAX_V, self.MAX_V) w = w.clamp(-self.MAX_W, self.MAX_W) self.last_v = v return v, w class S2ENavigator: """S2E point-goal / goal-free navigator. All user-facing coordinates: x=forward, y=left, meters. """ context_size = 11 multimodal = False def __init__( self, onnx_path: str = ONNX_PATH, device: str = "cuda", max_v: float = 2.5, max_w: float = 0.65, dt: float = 2.0, ): self.device = device self.dt = dt ort.set_default_logger_severity(3) providers = [ ("CUDAExecutionProvider", {"arena_extend_strategy": "kSameAsRequested"}), "CPUExecutionProvider", ] self._session = ort.InferenceSession(onnx_path, providers=providers) self._controller = PDController() self._controller.MAX_V = max_v self._controller.MAX_W = max_w self._last_best_traj = None INFO_PATH = os.path.join(MODEL_DIR, "model_info.yaml") import yaml with open(INFO_PATH, "r") as f: self._info = yaml.safe_load(f) def reset(self): self._controller.reset() self._last_best_traj = None @staticmethod def _goal_to_input(goal_xy): """Standard goal [x_fwd, y_left] meters → model input [norm_dist, cos(θ), sin(θ)].""" x, y = float(goal_xy[0]), float(goal_xy[1]) dist = math.sqrt(x * x + y * y) norm_dist = max(min(dist, 200.0), 0.1) / 200.0 angle = math.atan2(y, x) return np.array([norm_dist, math.cos(angle), math.sin(angle)], dtype=np.float32) def inference_trajectory(self, obs, goal_xy=None): """Run model → (trajectory, scores). Args: obs: (B, 11, 3, H, W) float32 in [0,1]. 11 frames. Images are resized internally to 256x256. goal_xy: (2,) goal in standard frame, or None → [5.0, 0.0]. Returns: trajectory: np.ndarray (B, 1, 10, 2) meters scores: np.ndarray (B, 1) """ if isinstance(obs, torch.Tensor): obs_np = obs.cpu().numpy() else: obs_np = np.asarray(obs, dtype=np.float32) B = obs_np.shape[0] # Resize to 256x256 if needed (obs is CHW) if obs_np.shape[-2:] != (IMG_SIZE, IMG_SIZE): import cv2 resized = np.empty((B, 11, 3, IMG_SIZE, IMG_SIZE), dtype=np.float32) for b in range(B): for c in range(11): frame_hwc = obs_np[b, c].transpose(1, 2, 0) # HWC frame_rsz = cv2.resize(frame_hwc, (IMG_SIZE, IMG_SIZE)) resized[b, c] = frame_rsz.transpose(2, 0, 1) obs_np = resized if goal_xy is None: goal_xy = np.array([5.0, 0.0]) goal_input = self._goal_to_input(goal_xy) goal_batch = np.tile(goal_input, (B, 1)) all_wp_raw = [] for i in range(B): out = self._session.run(None, { "obs_images": obs_np[i:i+1], # (1, 11, 3, 256, 256) "goal": goal_batch[i:i+1], # (1, 3) }) all_wp_raw.append(out[0][:, :, :2]) wp_raw = np.concatenate(all_wp_raw, axis=0) # (B, 10, 2) — x, y (scaled) wp_meters = wp_raw * WP_SCALE # un-scale to meters trajectory = wp_meters[:, np.newaxis].astype(np.float32) # (B, 1, 10, 2) scores = np.ones((B, 1), dtype=np.float32) self._last_best_traj = trajectory[:, 0] return trajectory, scores def inference_vw(self, obs, goal_xy=None): """Run model → (v, w). Returns: vw: torch.Tensor (B, 2) """ trajectory, _ = self.inference_trajectory(obs, goal_xy) best_traj = trajectory[:, 0] waypoints = torch.from_numpy(best_traj).float().to(self.device) v, w = self._controller(waypoints, dt=self.dt) return torch.stack([v, w], dim=1), best_traj