| import torch
|
| from torch.utils import data
|
| import numpy as np
|
| from os.path import join as pjoin
|
| import random
|
| import codecs as cs
|
| from tqdm import tqdm
|
|
|
|
|
|
|
| class VQMotionDataset(data.Dataset):
|
| def __init__(self, dataset_name, window_size = 64, unit_length = 4):
|
| self.window_size = window_size
|
| self.unit_length = unit_length
|
| self.dataset_name = dataset_name
|
|
|
| if dataset_name == 't2m':
|
| self.data_root = './dataset/Sample1'
|
| self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
|
| self.text_dir = pjoin(self.data_root, 'texts')
|
| self.joints_num = 22
|
| self.max_motion_length = 196
|
| self.meta_dir = 'checkpoints/t2m/VQVAEV3_CB1024_CMT_H1024_NRES3/meta'
|
|
|
| elif dataset_name == 'kit':
|
| self.data_root = './dataset/KIT-ML'
|
| self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
|
| self.text_dir = pjoin(self.data_root, 'texts')
|
| self.joints_num = 21
|
|
|
| self.max_motion_length = 196
|
| self.meta_dir = 'checkpoints/kit/VQVAEV3_CB1024_CMT_H1024_NRES3/meta'
|
|
|
| joints_num = self.joints_num
|
|
|
| mean = np.load(pjoin(self.meta_dir, 'mean.npy'))
|
| std = np.load(pjoin(self.meta_dir, 'std.npy'))
|
|
|
| split_file = pjoin(self.data_root, 'train.txt')
|
|
|
| self.data = []
|
| self.lengths = []
|
| id_list = []
|
| with cs.open(split_file, 'r') as f:
|
| for line in f.readlines():
|
| id_list.append(line.strip())
|
|
|
| for name in tqdm(id_list):
|
| try:
|
| motion = np.load(pjoin(self.motion_dir, name + '.npy'))
|
| if motion.shape[0] < self.window_size:
|
| continue
|
| self.lengths.append(motion.shape[0] - self.window_size)
|
| self.data.append(motion)
|
| except:
|
|
|
| pass
|
|
|
|
|
| self.mean = mean
|
| self.std = std
|
| print("Total number of motions {}".format(len(self.data)))
|
|
|
| def inv_transform(self, data):
|
| return data * self.std + self.mean
|
|
|
| def compute_sampling_prob(self) :
|
|
|
| prob = np.array(self.lengths, dtype=np.float32)
|
| prob /= np.sum(prob)
|
| return prob
|
|
|
| def __len__(self):
|
| return len(self.data)
|
|
|
| def __getitem__(self, item):
|
| motion = self.data[item]
|
|
|
| idx = random.randint(0, len(motion) - self.window_size)
|
|
|
| motion = motion[idx:idx+self.window_size]
|
| "Z Normalization"
|
| motion = (motion - self.mean) / self.std
|
|
|
| return motion
|
|
|
| def DATALoader(dataset_name,
|
| batch_size,
|
| num_workers = 8,
|
| window_size = 64,
|
| unit_length = 4):
|
|
|
| trainSet = VQMotionDataset(dataset_name, window_size=window_size, unit_length=unit_length)
|
| prob = trainSet.compute_sampling_prob()
|
| sampler = torch.utils.data.WeightedRandomSampler(prob, num_samples = len(trainSet) * 1000, replacement=True)
|
| train_loader = torch.utils.data.DataLoader(trainSet,
|
| batch_size,
|
| shuffle=True,
|
|
|
| num_workers=num_workers,
|
|
|
| drop_last = True)
|
|
|
| return train_loader
|
|
|
| def cycle(iterable):
|
| while True:
|
| for x in iterable:
|
| yield x
|
|
|