| |
| """S2EBC-PG-Web100 inference utilities. |
| |
| S2EBC-PG takes 11 frames at 256x256 (no ImageNet normalization, [0,1]), |
| a goal vector [norm_dist, cos(angle), sin(angle)], and outputs |
| 10 waypoints x 3 (x, y, yaw). Waypoints are scaled by 1/0.1 = 10x. |
| |
| User provides goal_xy in standard frame (x=forward, y=left, meters). |
| |
| nav = S2ENavigator(device="cuda") |
| traj, scores = nav.inference_trajectory(obs, goal_xy=np.array([5.0, 0.2])) |
| vw, best_traj = nav.inference_vw(obs, goal_xy=np.array([5.0, 0.2])) |
| """ |
|
|
| import os |
| import math |
| import numpy as np |
| import torch |
| import onnxruntime as ort |
|
|
# Directory containing this module; model artifacts are resolved relative to it.
MODEL_DIR = os.path.dirname(os.path.abspath(__file__))
# Default location of the exported ONNX model.
ONNX_PATH = os.path.join(MODEL_DIR, "s2e.onnx")
# Side length, in pixels, that observation frames are resized to before inference.
IMG_SIZE = 256
# Multiplier applied to the raw model waypoints to obtain meters.
# NOTE(review): the module docstring mentions a 10x scale (1/0.1), which does
# not match this value — confirm which one is correct for the shipped model.
WP_SCALE = 0.25
|
|
|
|
class PDController:
    """Turn predicted waypoints into rate-limited (v, w) commands.

    The linear velocity follows the x-offset of a look-ahead waypoint and the
    angular velocity follows the bearing of the final waypoint. The previous
    linear command is remembered so that consecutive commands change
    gradually; call ``reset()`` to forget it.
    """

    # Symmetric output limits: |v| <= MAX_V and |w| <= MAX_W.
    MAX_V = 2.5
    MAX_W = 0.65

    # Index of the waypoint used for the linear velocity (capped at the last one).
    _LOOKAHEAD_IDX = 4
    # Largest allowed decrease / increase of v between two consecutive calls.
    _MAX_V_DROP = 0.5
    _MAX_V_RISE = 0.4
    # Angular rate used to turn in place when the look-ahead waypoint has no
    # forward component.
    _TURN_IN_PLACE_W = math.pi / 20.0

    def __init__(self):
        # Linear velocity returned by the previous call; None before the
        # first call and after reset().
        self.last_v = None

    def reset(self):
        """Forget the previous command so the next call is not rate-limited."""
        self.last_v = None

    def __call__(self, waypoints, dt=1.0):
        """Compute linear and angular velocity commands for a batch.

        Args:
            waypoints: torch.Tensor of shape (B, T, C) with T >= 1 and C >= 2,
                where ``[..., 0]`` is x and ``[..., 1]`` is y.
            dt: Positive divisor converting waypoint offsets into rates.

        Returns:
            Tuple ``(v, w)`` of tensors, each of shape (B,).

        Raises:
            ValueError: If ``waypoints`` has the wrong rank, an empty horizon,
                fewer than two coordinates, or if ``dt`` is not positive.
        """
        if waypoints.dim() != 3 or waypoints.shape[1] == 0 or waypoints.shape[2] < 2:
            raise ValueError(
                "waypoints must have shape (B, T, C) with T >= 1 and C >= 2, "
                f"got {tuple(waypoints.shape)}"
            )
        if dt <= 0:
            raise ValueError(f"dt must be positive, got {dt}")

        eps = 1e-6
        idx = min(self._LOOKAHEAD_IDX, waypoints.shape[1] - 1)
        dx = waypoints[:, idx, 0]
        last_x = waypoints[:, -1, 0]
        last_y = waypoints[:, -1, 1]

        v = dx / dt
        # Bearing of the final waypoint folded into the forward half-plane;
        # the sign of x flips the steering direction when the target is behind.
        bearing = torch.atan2(last_y, last_x.abs().clamp(min=eps))
        w = bearing * last_x.sign() / dt

        # No forward progress requested: stop and rotate towards the target side.
        near_zero = dx.abs() < eps
        v = torch.where(near_zero, torch.zeros_like(v), v)
        w = torch.where(near_zero, last_y.sign() * self._TURN_IN_PLACE_W, w)

        prev = self.last_v
        # Rate-limit only against a command of the same batch layout; a
        # different batch size means the history belongs to other samples.
        if prev is not None and prev.shape == v.shape:
            prev = prev.to(device=v.device, dtype=v.dtype)
            v = v.clamp(prev - self._MAX_V_DROP, prev + self._MAX_V_RISE)

        v = v.clamp(-self.MAX_V, self.MAX_V)
        w = w.clamp(-self.MAX_W, self.MAX_W)
        self.last_v = v
        return v, w
|
|
|
|
class S2ENavigator:
    """S2E point-goal / goal-free navigator backed by an ONNX Runtime session.

    All user-facing coordinates: x=forward, y=left, meters.
    """

    # Number of observation frames consumed per sample.
    context_size = 11
    multimodal = False

    def __init__(
        self,
        onnx_path: str = ONNX_PATH,
        device: str = "cuda",
        max_v: float = 2.5,
        max_w: float = 0.65,
        dt: float = 2.0,
    ):
        """Load the ONNX model and set up the velocity controller.

        Args:
            onnx_path: Path of the ONNX model file.
            device: Torch device on which ``inference_vw`` returns its
                tensors. Falls back to ``"cpu"`` (with a warning) when a CUDA
                device is requested but torch cannot see one.
            max_v: Linear-velocity limit handed to the controller.
            max_w: Angular-velocity limit handed to the controller.
            dt: Time step passed to the controller on every ``inference_vw``
                call.

        Raises:
            FileNotFoundError: If ``model_info.yaml`` is missing next to this
                module.
        """
        # The ONNX session below already falls back to the CPU provider, so
        # keep the torch side usable on CPU-only hosts as well.
        if str(device).startswith("cuda") and not torch.cuda.is_available():
            import warnings

            warnings.warn(
                f"CUDA device {device!r} requested but not available; using 'cpu'.",
                RuntimeWarning,
                stacklevel=2,
            )
            device = "cpu"
        self.device = device
        self.dt = dt

        # Severity 3 = errors only; this is a process-wide ONNX Runtime setting.
        ort.set_default_logger_severity(3)
        providers = [
            ("CUDAExecutionProvider",
             {"arena_extend_strategy": "kSameAsRequested"}),
            "CPUExecutionProvider",
        ]
        self._session = ort.InferenceSession(onnx_path, providers=providers)

        self._controller = PDController()
        # Instance attributes shadow the class-level defaults of the controller.
        self._controller.MAX_V = max_v
        self._controller.MAX_W = max_w
        self._last_best_traj = None

        import yaml

        info_path = os.path.join(MODEL_DIR, "model_info.yaml")
        with open(info_path, "r", encoding="utf-8") as f:
            self._info = yaml.safe_load(f)

    def reset(self):
        """Clear the controller history and the cached trajectory."""
        self._controller.reset()
        self._last_best_traj = None

    @staticmethod
    def _goal_to_input(goal_xy):
        """Standard goal [x_fwd, y_left] meters → model input [norm_dist, cos(θ), sin(θ)]."""
        x, y = float(goal_xy[0]), float(goal_xy[1])
        dist = math.hypot(x, y)
        # Distance is clipped to [0.1, 200] before being normalized by 200.
        norm_dist = max(min(dist, 200.0), 0.1) / 200.0
        angle = math.atan2(y, x)
        return np.array([norm_dist, math.cos(angle), math.sin(angle)],
                        dtype=np.float32)

    def _resize_frames(self, obs_np):
        """Return a copy of ``obs_np`` with every frame resized to IMG_SIZE x IMG_SIZE."""
        import cv2  # Imported lazily: only needed when a resize is required.

        batch = obs_np.shape[0]
        resized = np.empty(
            (batch, self.context_size, 3, IMG_SIZE, IMG_SIZE), dtype=np.float32
        )
        for b in range(batch):
            for c in range(self.context_size):
                # cv2 works on HWC images; the model layout is CHW.
                frame_hwc = np.ascontiguousarray(obs_np[b, c].transpose(1, 2, 0))
                frame_rsz = cv2.resize(frame_hwc, (IMG_SIZE, IMG_SIZE))
                resized[b, c] = frame_rsz.transpose(2, 0, 1)
        return resized

    def inference_trajectory(self, obs, goal_xy=None):
        """Run model → (trajectory, scores).

        Args:
            obs: (B, 11, 3, H, W) array or torch.Tensor in [0,1]. 11 frames.
                Converted to float32; images are resized internally to
                256x256 when H, W differ.
            goal_xy: (2,) goal in standard frame, or None → [5.0, 0.0].

        Returns:
            trajectory: np.ndarray (B, 1, 10, 2) meters
            scores: np.ndarray (B, 1), all ones (single candidate per sample)

        Raises:
            ValueError: If ``obs`` is not 5-dimensional or has an empty batch.
        """
        if isinstance(obs, torch.Tensor):
            # detach() so tensors that track gradients can be exported.
            obs = obs.detach().cpu().numpy()
        # Same dtype/layout for both input kinds: float32, C-contiguous.
        obs_np = np.ascontiguousarray(obs, dtype=np.float32)
        if obs_np.ndim != 5:
            raise ValueError(
                f"obs must have shape (B, {self.context_size}, 3, H, W), "
                f"got {obs_np.shape}"
            )
        batch = obs_np.shape[0]
        if batch == 0:
            raise ValueError("obs must contain at least one sample")

        if obs_np.shape[-2:] != (IMG_SIZE, IMG_SIZE):
            obs_np = self._resize_frames(obs_np)

        if goal_xy is None:
            goal_xy = np.array([5.0, 0.0])
        goal_input = self._goal_to_input(goal_xy)
        goal_batch = np.tile(goal_input, (batch, 1))

        # The session is invoked one sample at a time; only the first two
        # waypoint components (x, y) of the first output are kept.
        all_wp_raw = []
        for i in range(batch):
            out = self._session.run(None, {
                "obs_images": obs_np[i:i + 1],
                "goal": goal_batch[i:i + 1],
            })
            all_wp_raw.append(out[0][:, :, :2])

        wp_meters = np.concatenate(all_wp_raw, axis=0) * WP_SCALE

        # Insert a singleton "candidate" axis: (B, T, 2) → (B, 1, T, 2).
        trajectory = wp_meters[:, np.newaxis].astype(np.float32)
        scores = np.ones((batch, 1), dtype=np.float32)
        self._last_best_traj = trajectory[:, 0]
        return trajectory, scores

    def inference_vw(self, obs, goal_xy=None):
        """Run model → ((v, w), trajectory).

        Args:
            obs: Observation batch, as accepted by ``inference_trajectory``.
            goal_xy: (2,) goal in standard frame, or None for the default goal.

        Returns:
            vw: torch.Tensor (B, 2) holding [v, w] per sample, on ``self.device``.
            best_traj: np.ndarray (B, T, 2), the trajectory in meters that the
                velocities were derived from.
        """
        trajectory, _ = self.inference_trajectory(obs, goal_xy)
        best_traj = trajectory[:, 0]
        waypoints = torch.from_numpy(best_traj).float().to(self.device)
        v, w = self._controller(waypoints, dt=self.dt)
        return torch.stack([v, w], dim=1), best_traj
|
|