| import torch |
| from torch.utils import data |
| import numpy as np |
| from os.path import join as pjoin |
| import random |
| import codecs as cs |
| from tqdm import tqdm |
|
|
|
|
|
|
class VQMotionDataset(data.Dataset):
    """Dataset of randomly cropped, z-normalized motion windows.

    Every motion listed in ``<data_root>/train.txt`` is loaded from
    ``<motion_dir>/<name>.npy`` at construction time and kept in memory.
    Motions whose first axis is shorter than ``window_size`` are dropped.
    Each ``__getitem__`` call returns a random ``window_size``-long slice
    (along the first axis) of one motion, normalized with the mean/std
    arrays stored under ``meta_dir``.

    Args:
        dataset_name: ``'t2m'`` (HumanML3D directory layout) or ``'kit'``
            (KIT-ML directory layout).
        window_size: Length of the slice returned by ``__getitem__``.
        unit_length: Stored on the instance; not used by this class itself.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """

    def __init__(self, dataset_name, window_size=64, unit_length=4):
        self.window_size = window_size
        self.unit_length = unit_length
        self.dataset_name = dataset_name

        if dataset_name == 't2m':
            self.data_root = './dataset/HumanML3D'
            self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
            self.text_dir = pjoin(self.data_root, 'texts')
            self.joints_num = 22
            self.max_motion_length = 196
            self.meta_dir = 'checkpoints/t2m/VQVAEV3_CB1024_CMT_H1024_NRES3/meta'
        elif dataset_name == 'kit':
            self.data_root = './dataset/KIT-ML'
            self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
            self.text_dir = pjoin(self.data_root, 'texts')
            self.joints_num = 21
            self.max_motion_length = 196
            self.meta_dir = 'checkpoints/kit/VQVAEV3_CB1024_CMT_H1024_NRES3/meta'
        else:
            # Fail early with a clear message instead of an AttributeError
            # on the first access to a dataset-specific attribute.
            raise ValueError(
                "Unknown dataset_name {!r}; expected 't2m' or 'kit'".format(dataset_name)
            )

        # Normalization statistics used by __getitem__ and inv_transform.
        mean = np.load(pjoin(self.meta_dir, 'mean.npy'))
        std = np.load(pjoin(self.meta_dir, 'std.npy'))

        split_file = pjoin(self.data_root, 'train.txt')

        self.data = []
        self.lengths = []
        id_list = []
        with cs.open(split_file, 'r') as f:
            for line in f:
                name = line.strip()
                # Blank lines cannot name a motion file; skip them up front.
                if name:
                    id_list.append(name)

        for name in tqdm(id_list):
            try:
                motion = np.load(pjoin(self.motion_dir, name + '.npy'))
            except (OSError, ValueError, EOFError):
                # Best-effort loading: a missing, unreadable or malformed
                # file is skipped rather than aborting the whole dataset.
                continue
            if motion.ndim == 0 or motion.shape[0] < self.window_size:
                # Too short to cut a full window from.
                continue
            # Stored per motion as the weight for compute_sampling_prob().
            self.lengths.append(motion.shape[0] - self.window_size)
            self.data.append(motion)

        self.mean = mean
        self.std = std
        print("Total number of motions {}".format(len(self.data)))

    def inv_transform(self, data):
        """Undo the z-normalization: return ``data * std + mean``."""
        return data * self.std + self.mean

    def compute_sampling_prob(self):
        """Return per-motion sampling weights that sum to 1.

        Each weight is proportional to ``len(motion) - window_size``. If all
        of those values are zero (every kept motion is exactly
        ``window_size`` long) a uniform distribution is returned instead of
        dividing by zero. With no motions loaded an empty array is returned.

        Returns:
            A ``float32`` numpy array with one entry per loaded motion.
        """
        prob = np.array(self.lengths, dtype=np.float32)
        total = np.sum(prob)
        if total > 0:
            prob /= total
        elif prob.size > 0:
            # All weights are zero: fall back to uniform rather than NaN.
            prob[:] = 1.0 / prob.size
        return prob

    def __len__(self):
        """Return the number of motions that were loaded and kept."""
        return len(self.data)

    def __getitem__(self, item):
        """Return a random normalized window of the ``item``-th motion.

        The start index is drawn uniformly from all positions that leave
        room for a full ``window_size``-long slice along the first axis.
        """
        motion = self.data[item]

        # randint is inclusive on both ends, so the last valid start
        # (len(motion) - window_size) can be chosen as well.
        idx = random.randint(0, len(motion) - self.window_size)
        motion = motion[idx:idx + self.window_size]

        # Z normalization
        motion = (motion - self.mean) / self.std

        return motion
|
|
def DATALoader(dataset_name,
               batch_size,
               num_workers=8,
               window_size=64,
               unit_length=4):
    """Build the training ``DataLoader`` over a ``VQMotionDataset``.

    Batches are drawn with uniform shuffling and the last incomplete batch
    is dropped. A ``WeightedRandomSampler`` used to be constructed here but
    was never handed to the loader, so it had no effect on sampling; that
    dead code has been removed.

    Args:
        dataset_name: Forwarded to ``VQMotionDataset``.
        batch_size: Number of windows per batch.
        num_workers: Number of loader worker processes.
        window_size: Forwarded to ``VQMotionDataset``.
        unit_length: Forwarded to ``VQMotionDataset``.

    Returns:
        A ``torch.utils.data.DataLoader`` over the training dataset.
    """
    trainSet = VQMotionDataset(dataset_name,
                               window_size=window_size,
                               unit_length=unit_length)
    train_loader = torch.utils.data.DataLoader(trainSet,
                                               batch_size,
                                               shuffle=True,
                                               num_workers=num_workers,
                                               drop_last=True)
    return train_loader
|
|
def cycle(iterable):
    """Yield the items of ``iterable`` over and over.

    ``iterable`` is re-iterated from scratch on every pass (nothing is
    cached), so an object that produces a different order each time it is
    iterated will do so on each pass.

    The generator stops once a complete pass yields no items. This covers
    an empty iterable and a one-shot iterator that has been exhausted;
    without that check the loop would spin forever without ever yielding.

    Args:
        iterable: Any iterable; it should be re-iterable for endless cycling.

    Yields:
        The items of ``iterable``, pass after pass.
    """
    while True:
        produced_any = False
        for x in iterable:
            produced_any = True
            yield x
        if not produced_any:
            # Nothing left to repeat; end instead of busy-looping.
            return
|
|