# Source: Hugging Face repo "pi0-rtc-cube-realtime-vla", file infer_example.py
# Uploaded by Lakesenberg via huggingface_hub, commit 7995596 (verified).
"""
Pi0 RTC Cube 模型推理示例(基于 realtime-vla Triton 加速)
在 RTX 4090 上运行,使用 2 个相机视角(camera1 主视角 + camera2 辅助视角)。
模型输出 50 步 action chunk,每步 32 维(前 6 维有效:5 关节 + 1 夹爪)。
依赖:
pip install torch triton numpy
用法:
python infer_example.py
"""
import json
import pickle
import numpy as np
import torch
from pi0_infer import Pi0Inference
# ===== Configuration =====
CHECKPOINT_PATH = "converted_checkpoint.pkl"  # pickled, converted model weights
NORM_STATS_PATH = "norm_stats.json"  # normalization statistics (JSON)
NUM_VIEWS = 2  # camera1 (main view) + camera2 (auxiliary view)
CHUNK_SIZE = 50  # action horizon (steps per predicted chunk)
ACTION_DIM = 6  # 5 joints + 1 gripper
STATE_DIM = 6  # raw state dimensionality
def load_norm_stats(path):
    """Load normalization statistics from a JSON file.

    Handles both layouts: a top-level ``{"norm_stats": {...}}`` wrapper
    and a bare statistics dict.

    Args:
        path: Path to the JSON statistics file.

    Returns:
        dict: The statistics mapping (e.g. keys "state", "actions").
    """
    # Explicit UTF-8: the platform default encoding (e.g. on Windows)
    # can fail to decode non-ASCII content in the stats file.
    with open(path, 'r', encoding="utf-8") as f:
        data = json.load(f)
    return data.get('norm_stats', data)
def normalize_state(state, norm_stats, target_dim=32):
    """Mean/std-normalize a state vector and zero-pad it to ``target_dim``.

    Args:
        state: 1-D raw state vector (same length as the stats).
        norm_stats: Dict with ``"state" -> {"mean": [...], "std": [...]}``.
        target_dim: Width of the returned padded vector.

    Returns:
        np.ndarray of shape (target_dim,), float32, normalized then padded.
    """
    mean = np.array(norm_stats["state"]["mean"])
    std = np.array(norm_stats["state"]["std"])
    # Epsilon guards against division by a zero std entry.
    scaled = (state - mean) / (std + 1e-6)
    out = np.zeros(target_dim, dtype=np.float32)
    out[:scaled.shape[0]] = scaled
    return out
def unnormalize_actions(actions, norm_stats, action_dim=6):
    """Un-normalize a (chunk, D) action array back to raw units (mean/std).

    The model emits padded actions (D=32 in this pipeline) but the stats
    cover only the valid leading dims, so mean/std are zero-padded to D
    before applying the affine inverse.

    Args:
        actions: np.ndarray of shape (chunk_size, D), normalized actions.
        norm_stats: Dict with ``"actions" -> {"mean": [...], "std": [...]}``.
        action_dim: Number of valid leading dims to keep in the result.

    Returns:
        np.ndarray of shape (chunk_size, action_dim), raw actions.
    """
    actions_mean = np.asarray(norm_stats["actions"]["mean"], dtype=np.float32)
    actions_std = np.asarray(norm_stats["actions"]["std"], dtype=np.float32)
    # Derive the padded width from the input instead of hard-coding 32,
    # so the function works for any model action width >= len(mean).
    width = actions.shape[-1]
    padded_mean = np.zeros(width, dtype=np.float32)
    padded_std = np.zeros(width, dtype=np.float32)
    padded_mean[:len(actions_mean)] = actions_mean
    padded_std[:len(actions_std)] = actions_std
    # Padded dims have std 0; the epsilon keeps the multiply well-defined
    # (those dims are dropped by the slice below anyway).
    raw = actions * (padded_std + 1e-6) + padded_mean
    return raw[:, :action_dim]
def normalize_image(image):
    """Fit an image into 224x224 (aspect-preserving resize + center pad) and
    scale its values to the [-1, 1] range.

    Args:
        image: (H, W, 3) uint8 array.

    Returns:
        (224, 224, 3) float32 array with values in [-1, 1].
    """
    if image.shape[:2] != (224, 224):
        # Lazy import: Pillow is only needed when a resize is actually
        # required, so pre-sized 224x224 inputs work without it installed.
        from PIL import Image as PILImage
        pil_img = PILImage.fromarray(image)
        cur_w, cur_h = pil_img.size
        # Shrink so the longer side fits 224, preserving aspect ratio.
        ratio = max(cur_w / 224, cur_h / 224)
        new_h, new_w = int(cur_h / ratio), int(cur_w / ratio)
        resized = pil_img.resize((new_w, new_h), resample=PILImage.BILINEAR)
        # Center the resized image on a black 224x224 canvas.
        canvas = PILImage.new(resized.mode, (224, 224), 0)
        pad_h = max(0, (224 - new_h) // 2)
        pad_w = max(0, (224 - new_w) // 2)
        canvas.paste(resized, (pad_w, pad_h))
        image = np.array(canvas)
    # uint8 [0, 255] -> float32 [-1, 1]
    return image.astype(np.float32) / 255.0 * 2.0 - 1.0
def main():
    """End-to-end example: load the model, run one inference on dummy
    inputs, and benchmark the average inference latency."""
    # ===== Load model =====
    print("加载模型权重 ...")
    # NOTE(review): pickle.load executes arbitrary code on load — only
    # open checkpoints obtained from a trusted source.
    with open(CHECKPOINT_PATH, 'rb') as f:
        checkpoint = pickle.load(f)
    print("初始化推理引擎 ...")
    infer = Pi0Inference(checkpoint, num_views=NUM_VIEWS, chunk_size=CHUNK_SIZE)
    print("模型就绪!")
    # ===== Load normalization statistics =====
    norm_stats = load_norm_stats(NORM_STATS_PATH)
    # ===== Dummy inputs (replace with real camera/joint data in practice) =====
    # camera1: main view (H, W, 3) uint8.
    # High bound is exclusive, so 256 covers the full uint8 range
    # (the previous 255 could never produce the value 255).
    camera1_image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
    # camera2: auxiliary view (H, W, 3) uint8
    camera2_image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
    # Joint state (6,): 5 joint angles + 1 gripper
    joint_state = np.zeros(STATE_DIM, dtype=np.float32)
    # ===== Preprocess =====
    img1 = normalize_image(camera1_image)
    img2 = normalize_image(camera2_image)
    observation_images = torch.from_numpy(
        np.stack([img1, img2], axis=0)
    ).to(torch.bfloat16).cuda()
    observation_state = torch.from_numpy(
        normalize_state(joint_state, norm_stats, target_dim=32)
    ).to(torch.bfloat16).cuda()
    diffusion_noise = torch.randn(
        CHUNK_SIZE, 32, dtype=torch.bfloat16, device="cuda"
    )
    # ===== Inference =====
    print("执行推理 ...")
    with torch.no_grad():
        raw_actions = infer.forward(observation_images, observation_state, diffusion_noise)
    # ===== Postprocess =====
    raw_actions_np = raw_actions.cpu().float().numpy()
    actions = unnormalize_actions(raw_actions_np, norm_stats, ACTION_DIM)
    print(f"\n推理结果:")
    print(f" Action chunk shape: {actions.shape}")
    print(f" 第一步 action: {actions[0]}")
    print(f" Action 范围: [{actions.min():.4f}, {actions.max():.4f}]")
    # ===== Benchmark =====
    print("\n性能测试 ...")
    import time
    # Drain all queued GPU work before starting the timer so the first
    # iteration is not charged for earlier asynchronous kernels.
    torch.cuda.synchronize()
    start = time.perf_counter()
    n_iters = 100
    # no_grad here too: otherwise every iteration builds an autograd
    # graph, inflating both memory use and the measured latency.
    with torch.no_grad():
        for _ in range(n_iters):
            infer.forward(observation_images, observation_state, diffusion_noise)
    torch.cuda.synchronize()
    elapsed = (time.perf_counter() - start) / n_iters * 1000
    print(f" 平均推理延迟: {elapsed:.1f} ms ({1000/elapsed:.0f} FPS)")
if __name__ == "__main__":
main()