File size: 4,876 Bytes
7995596
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Pi0 RTC Cube 模型推理示例(基于 realtime-vla Triton 加速)

在 RTX 4090 上运行,使用 2 个相机视角(camera1 主视角 + camera2 辅助视角)。
模型输出 50 步 action chunk,每步 32 维(前 6 维有效:5 关节 + 1 夹爪)。

依赖:
    pip install torch triton numpy

用法:
    python infer_example.py
"""

import json
import pickle
import numpy as np
import torch
from pi0_infer import Pi0Inference

# ===== Configuration =====
CHECKPOINT_PATH = "converted_checkpoint.pkl"  # pickled, pre-converted model weights
NORM_STATS_PATH = "norm_stats.json"  # mean/std normalization statistics
NUM_VIEWS = 2       # camera1 (primary view) + camera2 (auxiliary view)
CHUNK_SIZE = 50      # action horizon (steps per predicted action chunk)
ACTION_DIM = 6       # 5 joints + 1 gripper
STATE_DIM = 6        # raw state dimensionality


def load_norm_stats(path):
    """Read normalization statistics from a JSON file.

    Returns the value under the top-level 'norm_stats' key when present,
    otherwise the whole parsed document.
    """
    with open(path, 'r') as fh:
        parsed = json.load(fh)
    if 'norm_stats' in parsed:
        return parsed['norm_stats']
    return parsed


def normalize_state(state, norm_stats, target_dim=32):
    """Mean/std-normalize a state vector and zero-pad it to target_dim.

    The leading entries of the returned float32 array hold
    (state - mean) / (std + 1e-6); the remaining entries stay zero.
    """
    stats = norm_stats["state"]
    mean = np.asarray(stats["mean"])
    std = np.asarray(stats["std"])
    scaled = (state - mean) / (std + 1e-6)
    out = np.zeros(target_dim, dtype=np.float32)
    out[:scaled.shape[0]] = scaled
    return out


def unnormalize_actions(actions, norm_stats, action_dim=6):
    """Map normalized actions back to raw units (inverse of mean/std norm).

    `actions` has shape (chunk, 32). Stats shorter than 32 dims are
    zero-padded, so the padded tail is effectively scaled by 1e-6.
    Returns only the first `action_dim` columns.
    """
    stats = norm_stats["actions"]
    mean = np.asarray(stats["mean"])
    std = np.asarray(stats["std"])
    mean32 = np.zeros(32, dtype=np.float32)
    std32 = np.zeros(32, dtype=np.float32)
    mean32[:mean.shape[0]] = mean
    std32[:std.shape[0]] = std
    raw = actions * (std32 + 1e-6) + mean32
    return raw[:, :action_dim]


def normalize_image(image):
    """Convert a uint8 image to float32 in [-1, 1] at 224x224.

    Images that are not already 224x224 are resized with preserved aspect
    ratio (bilinear) and letterboxed with black padding, centered on the
    canvas.

    Args:
        image: np.ndarray, uint8, shape (H, W, 3) — assumed RGB; TODO confirm
            channel order against the camera driver.

    Returns:
        np.ndarray, float32, shape (224, 224, 3), values in [-1, 1].
    """
    if image.shape[:2] != (224, 224):
        # Lazy import: Pillow is only needed when resizing actually happens,
        # so already-sized inputs don't require it to be installed.
        from PIL import Image as PILImage
        pil_img = PILImage.fromarray(image)
        cur_w, cur_h = pil_img.size
        # Scale so the larger side becomes 224, preserving aspect ratio.
        ratio = max(cur_w / 224, cur_h / 224)
        new_h, new_w = int(cur_h / ratio), int(cur_w / ratio)
        resized = pil_img.resize((new_w, new_h), resample=PILImage.BILINEAR)
        # Center the resized image on a black 224x224 canvas (letterbox).
        canvas = PILImage.new(resized.mode, (224, 224), 0)
        pad_h = max(0, (224 - new_h) // 2)
        pad_w = max(0, (224 - new_w) // 2)
        canvas.paste(resized, (pad_w, pad_h))
        image = np.array(canvas)
    # uint8 [0, 255] -> float32 [-1, 1]
    return image.astype(np.float32) / 255.0 * 2.0 - 1.0


def main():
    """Run one Pi0 inference on synthetic inputs, then benchmark latency."""
    # ===== Load model =====
    print("加载模型权重 ...")
    with open(CHECKPOINT_PATH, 'rb') as f:
        # NOTE(review): pickle.load executes arbitrary code on load — only
        # use checkpoints from trusted sources.
        checkpoint = pickle.load(f)
    print("初始化推理引擎 ...")
    infer = Pi0Inference(checkpoint, num_views=NUM_VIEWS, chunk_size=CHUNK_SIZE)
    print("模型就绪!")

    # ===== Load normalization statistics =====
    norm_stats = load_norm_stats(NORM_STATS_PATH)

    # ===== Dummy inputs (replace with real camera/joint data in practice) =====
    # camera1: primary view, (H, W, 3) uint8. Upper bound of randint is
    # exclusive, so 256 covers the full uint8 range (0..255).
    camera1_image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
    # camera2: auxiliary view, (H, W, 3) uint8
    camera2_image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
    # Joint state (6,): 5 joint angles + 1 gripper
    joint_state = np.zeros(STATE_DIM, dtype=np.float32)

    # ===== Preprocessing =====
    img1 = normalize_image(camera1_image)
    img2 = normalize_image(camera2_image)

    observation_images = torch.from_numpy(
        np.stack([img1, img2], axis=0)
    ).to(torch.bfloat16).cuda()

    observation_state = torch.from_numpy(
        normalize_state(joint_state, norm_stats, target_dim=32)
    ).to(torch.bfloat16).cuda()

    diffusion_noise = torch.randn(
        CHUNK_SIZE, 32, dtype=torch.bfloat16, device="cuda"
    )

    # ===== Inference =====
    print("执行推理 ...")
    with torch.no_grad():
        raw_actions = infer.forward(observation_images, observation_state, diffusion_noise)

    # ===== Postprocessing =====
    raw_actions_np = raw_actions.cpu().float().numpy()
    actions = unnormalize_actions(raw_actions_np, norm_stats, ACTION_DIM)

    print(f"\n推理结果:")
    print(f"  Action chunk shape: {actions.shape}")
    print(f"  第一步 action: {actions[0]}")
    print(f"  Action 范围: [{actions.min():.4f}, {actions.max():.4f}]")

    # ===== Benchmark =====
    print("\n性能测试 ...")
    torch.cuda.synchronize()
    import time
    start = time.perf_counter()
    n_iters = 100
    # Fix: run the timed loop under no_grad as well — the original built
    # autograd graphs here, inflating both memory use and measured latency.
    with torch.no_grad():
        for _ in range(n_iters):
            infer.forward(observation_images, observation_state, diffusion_noise)
    torch.cuda.synchronize()
    elapsed = (time.perf_counter() - start) / n_iters * 1000
    print(f"  平均推理延迟: {elapsed:.1f} ms ({1000/elapsed:.0f} FPS)")


if __name__ == "__main__":
    main()