| import hydra |
| import numpy as np |
| from tqdm import tqdm |
| from omegaconf import OmegaConf |
| from rlbench.backend.observation import Observation |
| from pfp import DATA_DIRS, set_seeds |
| from pfp.envs.rlbench_env import RLBenchEnv |
| from pfp.data.replay_buffer import RobotReplayBuffer |
| from pfp.common.visualization import RerunViewer as RV |
|
|
|
|
| |
| |
| @hydra.main(version_base=None, config_path="../conf", config_name="collect_demos_train") |
| def main(cfg: OmegaConf): |
| set_seeds(cfg.seed) |
| if not OmegaConf.has_resolver("eval"): |
| OmegaConf.register_new_resolver("eval", eval) |
| print(OmegaConf.to_yaml(cfg)) |
|
|
| assert cfg.mode in ["train", "valid"] |
| if cfg.env_config.vis: |
| RV("pfp_collect_demos") |
| env = RLBenchEnv(use_pc_color=True, **cfg.env_config) |
| if cfg.save_data: |
| data_path = DATA_DIRS.PFP / cfg.env_config.task_name / cfg.mode |
| if data_path.is_dir(): |
| print(f"ERROR: Data path {data_path} already exists! Exiting...") |
| return |
| replay_buffer = RobotReplayBuffer.create_from_path(data_path, mode="a") |
|
|
| for _ in tqdm(range(cfg.num_episodes)): |
| data_history = list() |
| demo = env.task.get_demos(1, live_demos=True)[0] |
| observations: list[Observation] = demo._observations |
| for obs in observations: |
| robot_state = env.get_robot_state(obs) |
| images = env.get_images(obs) |
| pcd = env.get_pcd(obs) |
| pcd_xyz = np.asarray(pcd.points) |
| pcd_color = np.asarray(pcd.colors) |
| data_history.append( |
| { |
| "pcd_xyz": pcd_xyz.astype(np.float32), |
| "pcd_color": (pcd_color * 255).astype(np.uint8), |
| "robot_state": robot_state.astype(np.float32), |
| "images": images, |
| } |
| ) |
| env.vis_step(robot_state, np.concatenate((pcd_xyz, pcd_color), axis=-1)) |
|
|
| if cfg.save_data: |
| replay_buffer.add_episode_from_list(data_history, compressors="disk") |
| print(f"Saved episode with {len(data_history)} steps to disk.") |
|
|
| |
| |
| return |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|