"""
Pi0 RTC Cube 模型推理示例(基于 realtime-vla Triton 加速)
在 RTX 4090 上运行,使用 2 个相机视角(camera1 主视角 + camera2 辅助视角)。
模型输出 50 步 action chunk,每步 32 维(前 6 维有效:5 关节 + 1 夹爪)。
依赖:
pip install torch triton numpy
用法:
python infer_example.py
"""
import json
import pickle
import numpy as np
import torch
from pi0_infer import Pi0Inference
# ===== Configuration =====
CHECKPOINT_PATH = "converted_checkpoint.pkl"  # pickled, pre-converted model weights
NORM_STATS_PATH = "norm_stats.json"  # JSON file holding state/action mean & std
NUM_VIEWS = 2 # camera1 (primary view) + camera2 (auxiliary view)
CHUNK_SIZE = 50 # action horizon: steps per predicted action chunk
ACTION_DIM = 6 # 5 joints + 1 gripper
STATE_DIM = 6 # raw (un-padded) state dimensionality
def load_norm_stats(path):
    """Load normalization statistics from a JSON file.

    Args:
        path: Path to the JSON stats file.

    Returns:
        The ``norm_stats`` sub-dict when the file wraps its payload,
        otherwise the whole top-level dict (supports both layouts).
    """
    # Explicit encoding: the platform default may not be UTF-8, which
    # would break on non-ASCII content in the stats file.
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data.get('norm_stats', data)
def normalize_state(state, norm_stats, target_dim=32):
    """Mean/std-normalize a state vector and zero-pad it to ``target_dim``.

    Args:
        state: Raw state vector (array-like).
        norm_stats: Dict with ``norm_stats["state"]["mean"]`` / ``["std"]``.
        target_dim: Output length; trailing entries stay zero.

    Returns:
        A float32 array of shape ``(target_dim,)``.
    """
    stats = norm_stats["state"]
    mean = np.array(stats["mean"])
    std = np.array(stats["std"])
    # 1e-6 guards against division by zero for constant dimensions.
    z = (state - mean) / (std + 1e-6)
    out = np.zeros(target_dim, dtype=np.float32)
    out[:len(z)] = z
    return out
def unnormalize_actions(actions, norm_stats, action_dim=6):
    """Undo mean/std normalization on a chunk of model actions.

    Args:
        actions: Normalized actions of shape ``(chunk, 32)``.
        norm_stats: Dict with ``norm_stats["actions"]["mean"]`` / ``["std"]``.
        action_dim: Number of leading (valid) action dimensions to keep.

    Returns:
        Unnormalized actions of shape ``(chunk, action_dim)``.
    """
    stats = norm_stats["actions"]
    mean = np.array(stats["mean"])
    std = np.array(stats["std"])
    # Stats cover only the valid dims; zero-pad them to the model's 32-wide output.
    full_mean = np.zeros(32, dtype=np.float32)
    full_std = np.zeros(32, dtype=np.float32)
    full_mean[:len(mean)] = mean
    full_std[:len(std)] = std
    raw = actions * (full_std + 1e-6) + full_mean
    return raw[:, :action_dim]
def normalize_image(image):
    """Letterbox a uint8 image to 224x224 and scale it into [-1, 1].

    Aspect ratio is preserved: the image is scaled so its larger side
    becomes 224, then centered on a zero-filled 224x224 canvas.
    Images already 224x224 skip the resize entirely.

    Args:
        image: ``(H, W, 3)`` uint8 array.

    Returns:
        Float32 array in ``[-1, 1]`` with spatial size 224x224.
    """
    from PIL import Image as PILImage
    if image.shape[:2] != (224, 224):
        src = PILImage.fromarray(image)
        src_w, src_h = src.size
        # Scale so the longer side maps exactly to 224.
        scale = max(src_w / 224, src_h / 224)
        dst_w = int(src_w / scale)
        dst_h = int(src_h / scale)
        scaled = src.resize((dst_w, dst_h), resample=PILImage.BILINEAR)
        # Center the scaled image on a black canvas (letterbox padding).
        canvas = PILImage.new(scaled.mode, (224, 224), 0)
        off_x = max(0, (224 - dst_w) // 2)
        off_y = max(0, (224 - dst_h) // 2)
        canvas.paste(scaled, (off_x, off_y))
        image = np.array(canvas)
    # uint8 [0, 255] -> float32 [-1, 1]
    return image.astype(np.float32) / 255.0 * 2.0 - 1.0
def main():
    """End-to-end demo: load model, run one inference, then benchmark.

    Uses simulated camera images and a zero joint state; replace the
    simulated inputs with real sensor data in production.
    """
    # ===== Load model =====
    print("加载模型权重 ...")
    with open(CHECKPOINT_PATH, 'rb') as f:
        checkpoint = pickle.load(f)
    print("初始化推理引擎 ...")
    infer = Pi0Inference(checkpoint, num_views=NUM_VIEWS, chunk_size=CHUNK_SIZE)
    print("模型就绪!")
    # ===== Load normalization statistics =====
    norm_stats = load_norm_stats(NORM_STATS_PATH)
    # ===== Simulated inputs (replace with real camera/joint data) =====
    # NOTE: randint's upper bound is exclusive — 256 covers the full
    # uint8 range (the original 255 could never produce a 255 pixel).
    # camera1: primary view (H, W, 3) uint8
    camera1_image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
    # camera2: auxiliary view (H, W, 3) uint8
    camera2_image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
    # Joint state (6,): 5 joint angles + 1 gripper
    joint_state = np.zeros(STATE_DIM, dtype=np.float32)
    # ===== Preprocessing =====
    img1 = normalize_image(camera1_image)
    img2 = normalize_image(camera2_image)
    observation_images = torch.from_numpy(
        np.stack([img1, img2], axis=0)
    ).to(torch.bfloat16).cuda()
    observation_state = torch.from_numpy(
        normalize_state(joint_state, norm_stats, target_dim=32)
    ).to(torch.bfloat16).cuda()
    diffusion_noise = torch.randn(
        CHUNK_SIZE, 32, dtype=torch.bfloat16, device="cuda"
    )
    # ===== Inference =====
    print("执行推理 ...")
    with torch.no_grad():
        raw_actions = infer.forward(observation_images, observation_state, diffusion_noise)
    # ===== Postprocessing =====
    raw_actions_np = raw_actions.cpu().float().numpy()
    actions = unnormalize_actions(raw_actions_np, norm_stats, ACTION_DIM)
    print(f"\n推理结果:")
    print(f" Action chunk shape: {actions.shape}")
    print(f" 第一步 action: {actions[0]}")
    print(f" Action 范围: [{actions.min():.4f}, {actions.max():.4f}]")
    # ===== Benchmark =====
    print("\n性能测试 ...")
    import time
    # The inference call above already served as a CUDA warmup.
    torch.cuda.synchronize()
    n_iters = 100
    start = time.perf_counter()
    # no_grad here too: the original timed loop built autograd state,
    # inflating both latency and memory versus real deployment.
    with torch.no_grad():
        for _ in range(n_iters):
            infer.forward(observation_images, observation_state, diffusion_noise)
    torch.cuda.synchronize()
    elapsed = (time.perf_counter() - start) / n_iters * 1000
    print(f" 平均推理延迟: {elapsed:.1f} ms ({1000/elapsed:.0f} FPS)")
if __name__ == "__main__":
    main()