lsnu's picture
Add files using upload-large-folder tool
912c7e2 verified
from __future__ import annotations
import torch
import numpy as np
import pypose as pp
from diffusion_policy.common.sampler import SequenceSampler
from pfp.data.replay_buffer import RobotReplayBuffer
from pfp.common.se3_utils import transform_th
from pfp import DATA_DIRS
def rand_range(low: float, high: float, size: tuple[int], device) -> torch.Tensor:
return torch.rand(size, device=device) * (high - low) + low
def augment_pcd_data(batch: tuple[torch.Tensor, ...]) -> tuple[torch.Tensor, ...]:
pcd, robot_state_obs, robot_state_pred = batch
BT_robot_obs = robot_state_obs.shape[:-1]
BT_robot_pred = robot_state_pred.shape[:-1]
# sigma=(sigma_transl, sigma_rot_rad)
transform = pp.randn_SE3(sigma=(0.1, 0.2), device=pcd.device).matrix()
pcd[..., :3] = transform_th(transform, pcd[..., :3])
robot_obs_pseudoposes = robot_state_obs[..., :9].reshape(*BT_robot_obs, 3, 3)
robot_pred_pseudoposes = robot_state_pred[..., :9].reshape(*BT_robot_pred, 3, 3)
robot_obs_pseudoposes = transform_th(transform, robot_obs_pseudoposes)
robot_pred_pseudoposes = transform_th(transform, robot_pred_pseudoposes)
robot_state_obs[..., :9] = robot_obs_pseudoposes.reshape(*BT_robot_obs, 9)
robot_state_pred[..., :9] = robot_pred_pseudoposes.reshape(*BT_robot_pred, 9)
# We shuffle the points, i.e. shuffle pcd along dim=2 (B, T, P, 3)
idx = torch.randperm(pcd.shape[2])
pcd = pcd[:, :, idx, :]
return pcd, robot_state_obs, robot_state_pred
class RobotDatasetPcd(torch.utils.data.Dataset):
def __init__(
self,
data_path: str,
n_obs_steps: int,
n_pred_steps: int,
use_pc_color: bool,
n_points: int,
subs_factor: int = 1, # 1 means no subsampling
) -> None:
"""
To me it makes sense that sequence_length == n_obs_steps + n_prediction_steps
"""
replay_buffer = RobotReplayBuffer.create_from_path(data_path, mode="r")
data_keys = ["robot_state", "pcd_xyz"]
data_key_first_k = {"pcd_xyz": n_obs_steps * subs_factor}
if use_pc_color:
data_keys.append("pcd_color")
data_key_first_k["pcd_color"] = n_obs_steps * subs_factor
self.sampler = SequenceSampler(
replay_buffer=replay_buffer,
sequence_length=(n_obs_steps + n_pred_steps) * subs_factor - (subs_factor - 1),
pad_before=(n_obs_steps - 1) * subs_factor,
pad_after=(n_pred_steps - 1) * subs_factor + (subs_factor - 1),
keys=data_keys,
key_first_k=data_key_first_k,
)
self.n_obs_steps = n_obs_steps
self.n_prediction_steps = n_pred_steps
self.subs_factor = subs_factor
self.use_pc_color = use_pc_color
self.n_points = n_points
self.rng = np.random.default_rng()
return
def __len__(self) -> int:
return len(self.sampler)
def __getitem__(self, idx: int) -> tuple[torch.Tensor, ...]:
sample: dict[str, np.ndarray] = self.sampler.sample_sequence(idx)
cur_step_i = self.n_obs_steps * self.subs_factor
pcd = sample["pcd_xyz"][: cur_step_i : self.subs_factor]
if self.use_pc_color:
pcd_color = sample["pcd_color"][: cur_step_i : self.subs_factor]
pcd_color = pcd_color.astype(np.float32) / 255.0
pcd = np.concatenate([pcd, pcd_color], axis=-1)
robot_state_obs = sample["robot_state"][: cur_step_i : self.subs_factor].astype(np.float32)
robot_state_pred = sample["robot_state"][cur_step_i :: self.subs_factor].astype(np.float32)
# Random sample pcd points
if pcd.shape[1] > self.n_points:
random_indices = np.random.choice(pcd.shape[1], self.n_points, replace=False)
pcd = pcd[:, random_indices]
return pcd, robot_state_obs, robot_state_pred
if __name__ == "__main__":
dataset = RobotDatasetPcd(
data_path=DATA_DIRS.PFP / "open_fridge" / "train",
n_obs_steps=2,
n_pred_steps=8,
subs_factor=5,
use_pc_color=False,
n_points=4096,
)
i = 20
obs, robot_state_obs, robot_state_pred = dataset[i]
print("robot_state_obs: ", robot_state_obs)
print("robot_state_pred: ", robot_state_pred)
print("done")