# anyrobot_bimanual/demo_deploy.py
from control_joints import *
from read_joints import *
from realsense import *
import os
import cv2
import numpy as np
import torch
import time
import argparse
import sys
import json
# Add the policy directory to the import path
sys.path.insert(0, 'policy/ACT_DP_multitask')
from policy_anyrobot import ACTDiffusionPolicy
from t5_encoder import T5Embedder
from utils import convert_weight
import yaml
def get_image(cameras):
    """
    Get the latest images from all cameras and return them as a dictionary
    mapping camera name -> RGB uint8 array of shape (H, W, 3).
    """
    obs_image = dict()
    # Grab three successive frames so obs_image ends up holding the most
    # recent one; each frame is also written to disk for debugging.
    for i in range(3):
        for cam in cameras:
            color_image = cam.get_latest_image()  # latest frame: BGR, 0-255, (H, W, 3)
            if color_image is not None:
                obs_image[cam.name] = color_image[:, :, ::-1]  # BGR -> RGB
                filename = f"{cam.name}_image_{i}.png"
                cv2.imwrite(filename, color_image)
                print(f"Saved image: {filename}")
    return obs_image
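# Hedged usage sketch, following the camera setup in __main__ below:
#   obs_image = get_image(cameras)
#   obs_image['head_camera']   # RGB uint8 array, (H, W, 3)
#   obs_image['left_camera'], obs_image['right_camera']  # same layout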
def get_observation(obs_image, joint):
# for key, value in obs_image.items():
# print(f"Camera: {key}, Image shape: {value.shape} , Image: {value.max()}") # Debugging line to check image shapes
observation = dict()
observation['images'] = dict()
    observation['images']['cam_high'] = np.moveaxis(obs_image['head_camera'], -1, 0)  # RGB (H, W, C) -> (C, H, W)
    observation['images']['cam_left_wrist'] = np.moveaxis(obs_image['left_camera'], -1, 0)
    observation['images']['cam_right_wrist'] = np.moveaxis(obs_image['right_camera'], -1, 0)
observation['qpos'] = joint
return observation
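# Resulting observation layout (a sketch; 480x640 frames assumed, matching
# the dummy tensors in test_code below):
#   observation['images']['cam_high']        -> (3, 480, 640), RGB, 0-255
#   observation['images']['cam_left_wrist']  -> (3, 480, 640)
#   observation['images']['cam_right_wrist'] -> (3, 480, 640)
#   observation['qpos']                      -> 14-dim joint vector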
def encode_obs(observations, camera_names):
obs_img = []
for camera_name in camera_names: # ['cam_high', 'cam_left_wrist', 'cam_right_wrist']
obs_img.append(observations['images'][camera_name])
    obs_img = np.stack(obs_img, axis=0)  # shape: (N_views, C, H, W), RGB, 0-255
    image_data = torch.from_numpy(obs_img).unsqueeze(0).float() / 255.0  # normalize to [0, 1] and add history dimension
    qpos_data = torch.from_numpy(np.array(observations['qpos'], dtype=np.float32)).unsqueeze(0)  # shape: (1, 14)
    return image_data, qpos_data
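# Shape check (hedged, matching the dummy tensors in test_code below):
#   image, qpos = encode_obs(observation, camera_names)
#   image.shape -> torch.Size([1, 3, 3, 480, 640])  # (1, N_views, C, H, W)
#   qpos.shape  -> torch.Size([1, 14])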
def get_model(config_file, ckpt_file, device):
with open(config_file, "r", encoding="utf-8") as file:
policy_config = json.load(file)
print(f"Loading policy config from {config_file}")
policy = ACTDiffusionPolicy(policy_config)
print(f"Loading model from {ckpt_file}")
    checkpoint = torch.load(ckpt_file, weights_only=False)  # load the checkpoint once; reused for weights and stats
    policy.load_state_dict(convert_weight(checkpoint["state_dict"]))
    policy.to(device)
    policy.eval()
    stats = checkpoint["stats"]
    print('Resetting observation normalization stats')
    policy.reset_obs(stats, norm_type=policy_config["norm_type"])
camera_names = policy_config["camera_names"]
return policy, camera_names
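# Hedged usage sketch, mirroring the paths used in __main__ below:
#   ckpt_dir = 'policy/ACT_DP_multitask/checkpoints/real_fintune_50_2000/act_dp'
#   policy, camera_names = get_model(
#       os.path.join(ckpt_dir, 'policy_config.json'),
#       os.path.join(ckpt_dir, 'policy_epoch_600_seed_0.ckpt'),
#       torch.device('cuda'))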
def get_language_encoder(device):
    MODEL_PATH = '/data/gjx/.cache/huggingface/hub/models--google--t5-v1_1-xxl/snapshots/3db67ab1af984cf10548a73467f0e5bca2aaaeb2'  # local T5-XXL snapshot; adjust to your environment
    # MODEL_PATH = 'policy/weights/RDT/t5-v1_1-xxl'
    CONFIG_PATH = os.path.join('policy/ACT_DP_multitask', "base.yaml")
with open(CONFIG_PATH, "r") as fp:
config = yaml.safe_load(fp)
text_embedder = T5Embedder(
from_pretrained=MODEL_PATH,
model_max_length=config["dataset"]["tokenizer_max_length"],
device=device,
use_offload_folder=None,
)
tokenizer, text_encoder = text_embedder.tokenizer, text_embedder.model
return tokenizer, text_encoder
def get_language_embed(tokenizer, text_encoder, language_instruction, device):
tokens = tokenizer(
language_instruction, return_tensors="pt",
padding="longest",
truncation=True
)["input_ids"].to(device)
tokens = tokens.view(1, -1)
with torch.no_grad():
pred = text_encoder(tokens).last_hidden_state.detach().cpu()
    return pred.squeeze(0).mean(0)  # mean-pool over tokens -> shape (hidden_size,), CPU tensor
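# Hedged usage sketch: embed one instruction into a single task vector
# ("pick up the bottle" is a hypothetical instruction, not from this repo):
#   tokenizer, text_encoder = get_language_encoder(device)
#   task_emb = get_language_embed(tokenizer, text_encoder, "pick up the bottle", device)
#   task_emb.shape -> torch.Size([hidden_size])  # mean-pooled T5 token embedding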
def test_code():
    print("Testing the code...")  # dummy test to ensure the script runs without errors
    # reader = JointReader(left_can="can_left_1", right_can="can_right_1")  # joint reader
# cameras = [
# RealSenseCam("250122079815", "left_camera"),
# RealSenseCam("048522073543", "head_camera"),
# RealSenseCam("030522070109", "right_camera"),
# ]
# left_can, right_can = "can_left_2", "can_right_2"
# ctx = rs.context()
# if len(ctx.devices) > 0:
# print("Found RealSense devices:")
# for d in ctx.devices:
# # ��ȡ�豸�����ƺ����к�
# name = d.get_info(rs.camera_info.name)
# serial_number = d.get_info(rs.camera_info.serial_number)
# print(f"Device: {name}, Serial Number: {serial_number}")
# else:
# print("No Intel RealSense devices connected")
# for cam in cameras:
# cam.start()
# for i in range(20):
# print(f"Warm up: {i}", end="\r")
# for cam in cameras:
# color_image = cam.get_latest_image()
# time.sleep(0.15)
device = 'cpu'
ckpt_dir = 'policy/ACT_DP_multitask/checkpoints/real_fintune_50_2000/act_dp'
config_path = os.path.join(ckpt_dir, 'policy_config.json')
ckpt_path = os.path.join(ckpt_dir, 'policy_lastest_seed_0.ckpt')
policy, cameras_name = get_model(config_path, ckpt_path, device)
instruction_file = 'instruction.txt'
with open(instruction_file, 'r') as f:
instruction = f.readline().strip().strip('"')
print(f"Instruction: {instruction}")
    tokenizer, text_encoder = get_language_encoder(device)  # load the language tokenizer and encoder
    task_emb = get_language_embed(tokenizer, text_encoder, instruction, device)  # language embedding, tensor of shape (hidden_size,)
    left_can, right_can = "can_left_2", "can_right_2"  # CAN bus names (player-2 setup); needed by ControlJoints
    controller = ControlJoints(left_can=left_can, right_can=right_can)
image = torch.rand(1, 3, 3, 480, 640) # Dummy image tensor
qpos = torch.rand(1, 14) # Dummy joint position tensor
actions = policy.get_action(qpos.float().to(device), image.float().to(device), task_emb.float().to(device))
print(f"Actions shape: {actions.shape}")
if __name__ == "__main__":
    # test_code()  # Uncomment to run the test function
    # print("Testing done...")
    # exit()
    # Print cuDNN/CUDA build info for debugging
    print(torch.backends.cudnn.enabled)
    print(torch.backends.cudnn.version())
    print(torch.version.cuda)
    print(torch.__version__)
    # Parse command-line arguments
parser = argparse.ArgumentParser(description="Deploy the action for a specific player")
parser.add_argument("--ckpt_path", type=str, default='policy/ACT_DP_multitask/checkpoints/real_fintune_50_2000/act_dp')
    # Read the PLAYER environment variable
player_value = os.getenv("PLAYER")
    # Check that the environment variable is set
    if player_value is None:
        raise ValueError("Environment variable PLAYER is not set")
try:
player_value = int(player_value)
except ValueError:
        raise ValueError("Environment variable PLAYER must be an integer")
    # Select the cameras and CAN buses for this player
if player_value == 1:
print("Player 1")
cameras = [
RealSenseCam("337322073280", "left_camera"),
RealSenseCam("337322074191", "head_camera"),
RealSenseCam("337122072617", "right_camera"),
]
left_can, right_can = "can_left_1", "can_right_1"
elif player_value == 2:
print("Player 2")
cameras = [
RealSenseCam("250122079815", "left_camera"),
RealSenseCam("048522073543", "head_camera"),
RealSenseCam("030522070109", "right_camera"),
]
left_can, right_can = "can_left_2", "can_right_2"
else:
        raise ValueError("Invalid PLAYER value; it must be 1 or 2")
    reader = JointReader(left_can=left_can, right_can=right_can)  # joint state reader
# ==== Get RGB ====
    # Create a context object to manage all connected RealSense devices
ctx = rs.context()
    # Check whether any devices are connected
if len(ctx.devices) > 0:
print("Found RealSense devices:")
for d in ctx.devices:
            # Get the device name and serial number
name = d.get_info(rs.camera_info.name)
serial_number = d.get_info(rs.camera_info.serial_number)
print(f"Device: {name}, Serial Number: {serial_number}")
else:
print("No Intel RealSense devices connected")
    # Start all cameras
for cam in cameras:
cam.start()
    # Warm up the cameras
for i in range(10):
print(f"Warm up: {i}", end="\r")
for cam in cameras:
color_image = cam.get_latest_image()
time.sleep(0.15)
# Load model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device ='cpu'
ckpt_dir = parser.parse_args().ckpt_path
config_path = os.path.join(ckpt_dir, 'policy_config.json')
# ckpt_path = os.path.join(ckpt_dir, 'policy_lastest_seed_0.ckpt') # policy_epoch_600_seed_0.ckpt
ckpt_path = os.path.join(ckpt_dir, 'policy_epoch_600_seed_0.ckpt')
    policy, camera_names = get_model(config_path, ckpt_path, device)  # load the policy
print('camera_names:', camera_names)
# Get instructions
instruction_file = 'instruction.txt'
with open(instruction_file, 'r') as f:
instruction = f.readline().strip()
print(f"Using instruction: {instruction}")
    print('Loading language tokenizer and encoder...')
    tokenizer, text_encoder = get_language_encoder(device)
    task_emb = get_language_embed(tokenizer, text_encoder, instruction, device)  # language embedding, tensor of shape (hidden_size,)
    # ==== Get Observation and Deploy Actions ====
controller = ControlJoints(left_can=left_can, right_can=right_can)
max_timestep = 600
step = 0
while step < max_timestep:
obs_image = get_image(cameras)
joint = reader.get_joint_value()
observation = get_observation(obs_image, joint)
image, qpos = encode_obs(observation, camera_names)
actions = policy.get_action(qpos.float().to(device), image.float().to(device), task_emb.float().to(device))
print(f"Step: {step}/{max_timestep}, Action: {actions.shape}")
        for action in actions[0:30]:  # execute the first 30 steps of the predicted action chunk
controller.control(action)
step += 1
# import pdb; pdb.set_trace() # Debugging line to pause execution
time.sleep(0.05)
# joint = actions[-1]
# obs = dict()
# for i in range(3):
# for cam in cameras:
# color_image = cam.get_latest_image()
# if color_image is not None:
        # # Save the image
# obs[cam] = color_image
# # filename = f"{cam.name}_image_{i}.png"
# # cv2.imwrite(filename, color_image)
# # print(f"Saved image: {filename}")
# # ==== Get Joint ====
# reader = JointReader(left_can=left_can, right_can=right_can)
# print(reader.get_joint_value())
# # ==== Deploy Action ====
# controller = ControlJoints(left_can=left_can, right_can=right_can)
# for i in range(10):
# positions = [0] * 14
# controller.control(positions)
# time.sleep(0.1)