Hollis71025's picture
Add 7 navigation models (ONNX + inference wrappers) and model card
9c1f523 verified
#!/usr/bin/env python3
"""S2EBC-PG-Web100 inference utilities.
S2EBC-PG takes 11 frames at 256x256 (no ImageNet normalization, [0,1]),
a goal vector [norm_dist, cos(angle), sin(angle)], and outputs
10 waypoints x 3 (x, y, yaw). Waypoints are scaled by 1/0.1 = 10x.
User provides goal_xy in standard frame (x=forward, y=left, meters).
Usage:
    nav = S2ENavigator(device="cuda")
    traj, scores = nav.inference_trajectory(obs, goal_xy=np.array([5.0, 0.2]))
    vw, best_traj = nav.inference_vw(obs, goal_xy=np.array([5.0, 0.2]))
"""
import os
import math
import numpy as np
import torch
import onnxruntime as ort
# Directory that contains this module; bundled model files are resolved
# relative to it.
MODEL_DIR = os.path.dirname(os.path.abspath(__file__))
# Default location of the exported ONNX graph (default for S2ENavigator's
# ``onnx_path`` argument).
ONNX_PATH = os.path.join(MODEL_DIR, "s2e.onnx")
# Side length, in pixels, of the square frames handed to the model;
# inference_trajectory resizes frames of any other size to this.
IMG_SIZE = 256
# Scale factor between raw model waypoints and meters.
# NOTE(review): inference_trajectory MULTIPLIES the raw outputs by this value
# (raw * 0.25), while the module docstring describes a 1/0.1 = 10x scale.
# Confirm which convention matches the exported model.
WP_SCALE = 0.25
class PDController:
    """Convert a batch of predicted waypoints into clamped (v, w) commands.

    The linear command comes from the x coordinate of a look-ahead waypoint
    (index 4, or the last waypoint for shorter trajectories). The angular
    command comes from the bearing of the final waypoint. The controller keeps
    the previous linear command so that consecutive calls are rate-limited.
    """

    # Symmetric output limits; callers may override them per instance.
    MAX_V = 2.5
    MAX_W = 0.65

    def __init__(self):
        # Linear command returned by the previous call (None before any call).
        self.last_v = None

    def reset(self):
        """Forget the previous linear command so the next call is not rate-limited."""
        self.last_v = None

    def __call__(self, waypoints, dt=1.0):
        """Compute velocity commands for a batch of trajectories.

        Args:
            waypoints: torch.Tensor indexed as [batch, waypoint, coord], where
                coord 0 is x and coord 1 is y.
            dt: Divisor that turns displacements / bearings into rates.

        Returns:
            (v, w): two tensors with one entry per batch element.
        """
        tiny = 1e-6
        lookahead = min(4, waypoints.shape[1] - 1)

        forward = waypoints[:, lookahead, 0]
        end_x = waypoints[:, -1, 0]
        end_y = waypoints[:, -1, 1]

        # Bearing of the final waypoint, measured against |x| and then mirrored
        # by the sign of x so that a goal behind the robot flips the turn.
        bearing = torch.atan2(end_y, end_x.abs().clamp(min=tiny))
        ang = bearing * end_x.sign() / dt
        lin = forward / dt

        # No forward progress at the look-ahead point: stop and rotate in
        # place at a fixed rate towards the side the final waypoint lies on.
        stalled = forward.abs() < tiny
        lin = torch.where(stalled, torch.zeros_like(lin), lin)
        ang = torch.where(stalled, end_y.sign() * (math.pi / 20.0), ang)

        # Rate limit relative to the previous command: the command may drop
        # by at most 0.5 and rise by at most 0.4 per call.
        previous = self.last_v
        if previous is not None:
            lin = torch.clamp(lin, min=previous - 0.5, max=previous + 0.4)

        lin = torch.clamp(lin, min=-self.MAX_V, max=self.MAX_V)
        ang = torch.clamp(ang, min=-self.MAX_W, max=self.MAX_W)

        self.last_v = lin
        return lin, ang
class S2ENavigator:
    """S2E point-goal / goal-free navigator backed by an ONNX Runtime session.

    All user-facing coordinates: x=forward, y=left, meters.
    """

    # Number of observation frames the model consumes per sample.
    context_size = 11
    # inference_trajectory returns exactly one trajectory per sample;
    # presumably this flag tells callers not to expect several hypotheses.
    multimodal = False

    def __init__(
        self,
        onnx_path: str = ONNX_PATH,
        device: str = "cuda",
        max_v: float = 2.5,
        max_w: float = 0.65,
        dt: float = 2.0,
    ):
        """Create the ONNX session and controller, and load the model metadata.

        Args:
            onnx_path: Path to the exported ONNX model.
            device: Torch device on which ``inference_vw`` places the waypoint
                tensor it hands to the controller.
            max_v: Limit on the magnitude of the linear command.
            max_w: Limit on the magnitude of the angular command.
            dt: Time step passed to the controller by ``inference_vw``.

        Raises:
            FileNotFoundError: If ``model_info.yaml`` is missing from this
                module's directory.
        """
        self.device = device
        self.dt = dt

        # Process-wide ORT setting: severity 3 keeps only ERROR and above.
        ort.set_default_logger_severity(3)
        # Providers are listed in priority order: CUDA first, CPU as fallback.
        providers = [
            ("CUDAExecutionProvider",
             {"arena_extend_strategy": "kSameAsRequested"}),
            "CPUExecutionProvider",
        ]
        self._session = ort.InferenceSession(onnx_path, providers=providers)

        self._controller = PDController()
        # Instance-level overrides; the PDController class defaults are untouched.
        self._controller.MAX_V = max_v
        self._controller.MAX_W = max_w
        # Trajectory produced by the most recent inference_trajectory call.
        self._last_best_traj = None

        # Imported lazily so PyYAML is only needed once a navigator is built.
        import yaml
        # NOTE: the metadata is always read from this module's directory,
        # even when a custom ``onnx_path`` is supplied.
        info_path = os.path.join(MODEL_DIR, "model_info.yaml")
        with open(info_path, "r", encoding="utf-8") as f:
            self._info = yaml.safe_load(f)

    def reset(self):
        """Clear the controller's rate-limit state and the cached trajectory."""
        self._controller.reset()
        self._last_best_traj = None

    @staticmethod
    def _goal_to_input(goal_xy):
        """Standard goal [x_fwd, y_left] meters → model input [norm_dist, cos(θ), sin(θ)].

        The distance is clamped to [0.1, 200] before being divided by 200.
        """
        x, y = float(goal_xy[0]), float(goal_xy[1])
        dist = math.hypot(x, y)
        norm_dist = max(min(dist, 200.0), 0.1) / 200.0
        angle = math.atan2(y, x)
        return np.array([norm_dist, math.cos(angle), math.sin(angle)],
                        dtype=np.float32)

    def _prepare_obs(self, obs):
        """Return ``obs`` as a C-contiguous float32 array of IMG_SIZE x IMG_SIZE frames."""
        if isinstance(obs, torch.Tensor):
            # detach(): Tensor.numpy() refuses tensors that require grad.
            # float(): half / bfloat16 / double inputs become float32 up front.
            obs = obs.detach().cpu().float().numpy()
        obs_np = np.ascontiguousarray(obs, dtype=np.float32)

        if obs_np.shape[-2:] == (IMG_SIZE, IMG_SIZE):
            return obs_np

        # Imported lazily: OpenCV is only needed when a resize is required.
        import cv2

        batch = obs_np.shape[0]
        n_frames = self.context_size
        resized = np.empty((batch, n_frames, 3, IMG_SIZE, IMG_SIZE),
                           dtype=np.float32)
        for b in range(batch):
            for t in range(n_frames):
                frame_hwc = obs_np[b, t].transpose(1, 2, 0)  # CHW -> HWC for cv2
                frame_rsz = cv2.resize(frame_hwc, (IMG_SIZE, IMG_SIZE))
                resized[b, t] = frame_rsz.transpose(2, 0, 1)  # back to CHW
        return resized

    def inference_trajectory(self, obs, goal_xy=None):
        """Run model → (trajectory, scores).

        Args:
            obs: (B, 11, 3, H, W) in [0,1], torch.Tensor or array-like.
                11 frames. Converted to float32 and resized internally to
                256x256 when needed.
            goal_xy: (2,) goal in standard frame, or None → [5.0, 0.0].
                The same goal is used for every sample in the batch.

        Returns:
            trajectory: np.ndarray (B, 1, 10, 2) meters
            scores: np.ndarray (B, 1), all ones (a single trajectory is
                returned per sample, so there is nothing to rank).
        """
        frames = self._prepare_obs(obs)
        batch = frames.shape[0]

        if goal_xy is None:
            goal_xy = np.array([5.0, 0.0])
        goal_batch = np.tile(self._goal_to_input(goal_xy), (batch, 1))

        # The session is fed one sample at a time; presumably the exported
        # graph has a fixed batch dimension of 1 — TODO confirm.
        raw_per_sample = []
        for i in range(batch):
            outputs = self._session.run(None, {
                "obs_images": frames[i:i + 1],   # (1, 11, 3, 256, 256)
                "goal": goal_batch[i:i + 1],     # (1, 3)
            })
            # Keep only x and y; any further per-waypoint channel is dropped.
            raw_per_sample.append(outputs[0][:, :, :2])
        wp_raw = np.concatenate(raw_per_sample, axis=0)  # (B, 10, 2)

        # NOTE(review): confirm that multiplying (rather than dividing) by
        # WP_SCALE matches the scaling convention of the exported model.
        wp_meters = wp_raw * WP_SCALE
        trajectory = wp_meters[:, np.newaxis].astype(np.float32)  # (B, 1, 10, 2)
        scores = np.ones((batch, 1), dtype=np.float32)
        self._last_best_traj = trajectory[:, 0]
        return trajectory, scores

    def inference_vw(self, obs, goal_xy=None):
        """Run model → velocity commands plus the trajectory they came from.

        Args:
            obs: Same as for ``inference_trajectory``.
            goal_xy: Same as for ``inference_trajectory``.

        Returns:
            vw: torch.Tensor (B, 2) on ``self.device``; columns are (v, w).
            best_traj: np.ndarray (B, 10, 2) meters, the trajectory that was
                fed to the controller.
        """
        trajectory, _ = self.inference_trajectory(obs, goal_xy)
        best_traj = trajectory[:, 0]
        waypoints = torch.from_numpy(best_traj).float().to(self.device)
        v, w = self._controller(waypoints, dt=self.dt)
        return torch.stack([v, w], dim=1), best_traj