|
|
import torch
|
|
|
from torch.utils import data
|
|
|
import numpy as np
|
|
|
from os.path import join as pjoin
|
|
|
import random
|
|
|
import codecs as cs
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
import utils.paramUtil as paramUtil
|
|
|
from torch.utils.data._utils.collate import default_collate
|
|
|
|
|
|
|
|
|
def collate_fn(batch):
|
|
|
batch.sort(key=lambda x: x[3], reverse=True)
|
|
|
return default_collate(batch)
|
|
|
|
|
|
|
|
|
'''For use of training text-2-motion generative model'''
|
|
|
class Text2MotionDataset(data.Dataset):
|
|
|
def __init__(self, dataset_name, is_test, w_vectorizer, feat_bias = 5, max_text_len = 20, unit_length = 4):
|
|
|
|
|
|
self.max_length = 20
|
|
|
self.pointer = 0
|
|
|
self.dataset_name = dataset_name
|
|
|
self.is_test = is_test
|
|
|
self.max_text_len = max_text_len
|
|
|
self.unit_length = unit_length
|
|
|
self.w_vectorizer = w_vectorizer
|
|
|
if dataset_name == 't2m':
|
|
|
self.data_root = './dataset/Sample1'
|
|
|
self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
|
|
|
self.text_dir = pjoin(self.data_root, 'texts')
|
|
|
self.joints_num = 22
|
|
|
radius = 4
|
|
|
fps = 20
|
|
|
self.max_motion_length = 196
|
|
|
dim_pose = 263
|
|
|
kinematic_chain = paramUtil.t2m_kinematic_chain
|
|
|
self.meta_dir = 'checkpoints/t2m/VQVAEV3_CB1024_CMT_H1024_NRES3/meta'
|
|
|
elif dataset_name == 'kit':
|
|
|
self.data_root = './dataset/KIT-ML'
|
|
|
self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
|
|
|
self.text_dir = pjoin(self.data_root, 'texts')
|
|
|
self.joints_num = 21
|
|
|
radius = 240 * 8
|
|
|
fps = 12.5
|
|
|
dim_pose = 251
|
|
|
self.max_motion_length = 196
|
|
|
kinematic_chain = paramUtil.kit_kinematic_chain
|
|
|
self.meta_dir = 'checkpoints/kit/VQVAEV3_CB1024_CMT_H1024_NRES3/meta'
|
|
|
|
|
|
mean = np.load(pjoin(self.meta_dir, 'mean.npy'))
|
|
|
std = np.load(pjoin(self.meta_dir, 'std.npy'))
|
|
|
|
|
|
if is_test:
|
|
|
split_file = pjoin(self.data_root, 'test.txt')
|
|
|
else:
|
|
|
split_file = pjoin(self.data_root, 'val.txt')
|
|
|
|
|
|
min_motion_len = 40 if self.dataset_name =='t2m' else 24
|
|
|
|
|
|
|
|
|
joints_num = self.joints_num
|
|
|
|
|
|
data_dict = {}
|
|
|
id_list = []
|
|
|
with cs.open(split_file, 'r') as f:
|
|
|
for line in f.readlines():
|
|
|
id_list.append(line.strip())
|
|
|
|
|
|
new_name_list = []
|
|
|
length_list = []
|
|
|
for name in tqdm(id_list):
|
|
|
try:
|
|
|
motion = np.load(pjoin(self.motion_dir, name + '.npy'))
|
|
|
if (len(motion)) < min_motion_len or (len(motion) >= 200):
|
|
|
continue
|
|
|
text_data = []
|
|
|
flag = False
|
|
|
with cs.open(pjoin(self.text_dir, name + '.txt')) as f:
|
|
|
for line in f.readlines():
|
|
|
text_dict = {}
|
|
|
line_split = line.strip().split('#')
|
|
|
caption = line_split[0]
|
|
|
tokens = line_split[1].split(' ')
|
|
|
f_tag = float(line_split[2])
|
|
|
to_tag = float(line_split[3])
|
|
|
f_tag = 0.0 if np.isnan(f_tag) else f_tag
|
|
|
to_tag = 0.0 if np.isnan(to_tag) else to_tag
|
|
|
|
|
|
text_dict['caption'] = caption
|
|
|
text_dict['tokens'] = tokens
|
|
|
if f_tag == 0.0 and to_tag == 0.0:
|
|
|
flag = True
|
|
|
text_data.append(text_dict)
|
|
|
else:
|
|
|
try:
|
|
|
n_motion = motion[int(f_tag*fps) : int(to_tag*fps)]
|
|
|
if (len(n_motion)) < min_motion_len or (len(n_motion) >= 200):
|
|
|
continue
|
|
|
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
|
|
|
while new_name in data_dict:
|
|
|
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
|
|
|
data_dict[new_name] = {'motion': n_motion,
|
|
|
'length': len(n_motion),
|
|
|
'text':[text_dict]}
|
|
|
new_name_list.append(new_name)
|
|
|
length_list.append(len(n_motion))
|
|
|
except:
|
|
|
print(line_split)
|
|
|
print(line_split[2], line_split[3], f_tag, to_tag, name)
|
|
|
|
|
|
|
|
|
if flag:
|
|
|
data_dict[name] = {'motion': motion,
|
|
|
'length': len(motion),
|
|
|
'text': text_data}
|
|
|
new_name_list.append(name)
|
|
|
length_list.append(len(motion))
|
|
|
except Exception as e:
|
|
|
|
|
|
pass
|
|
|
|
|
|
name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1]))
|
|
|
self.mean = mean
|
|
|
self.std = std
|
|
|
self.length_arr = np.array(length_list)
|
|
|
self.data_dict = data_dict
|
|
|
self.name_list = name_list
|
|
|
self.reset_max_len(self.max_length)
|
|
|
|
|
|
def reset_max_len(self, length):
|
|
|
assert length <= self.max_motion_length
|
|
|
self.pointer = np.searchsorted(self.length_arr, length)
|
|
|
print("Pointer Pointing at %d"%self.pointer)
|
|
|
self.max_length = length
|
|
|
|
|
|
def inv_transform(self, data):
|
|
|
return data * self.std + self.mean
|
|
|
|
|
|
def forward_transform(self, data):
|
|
|
return (data - self.mean) / self.std
|
|
|
|
|
|
def __len__(self):
|
|
|
return len(self.data_dict) - self.pointer
|
|
|
|
|
|
def __getitem__(self, item):
|
|
|
idx = self.pointer + item
|
|
|
name = self.name_list[idx]
|
|
|
data = self.data_dict[name]
|
|
|
|
|
|
motion, m_length, text_list = data['motion'], data['length'], data['text']
|
|
|
|
|
|
text_data = random.choice(text_list)
|
|
|
caption, tokens = text_data['caption'], text_data['tokens']
|
|
|
|
|
|
if len(tokens) < self.max_text_len:
|
|
|
|
|
|
tokens = ['sos/OTHER'] + tokens + ['eos/OTHER']
|
|
|
sent_len = len(tokens)
|
|
|
tokens = tokens + ['unk/OTHER'] * (self.max_text_len + 2 - sent_len)
|
|
|
else:
|
|
|
|
|
|
tokens = tokens[:self.max_text_len]
|
|
|
tokens = ['sos/OTHER'] + tokens + ['eos/OTHER']
|
|
|
sent_len = len(tokens)
|
|
|
pos_one_hots = []
|
|
|
word_embeddings = []
|
|
|
for token in tokens:
|
|
|
word_emb, pos_oh = self.w_vectorizer[token]
|
|
|
pos_one_hots.append(pos_oh[None, :])
|
|
|
word_embeddings.append(word_emb[None, :])
|
|
|
pos_one_hots = np.concatenate(pos_one_hots, axis=0)
|
|
|
word_embeddings = np.concatenate(word_embeddings, axis=0)
|
|
|
|
|
|
if self.unit_length < 10:
|
|
|
coin2 = np.random.choice(['single', 'single', 'double'])
|
|
|
else:
|
|
|
coin2 = 'single'
|
|
|
|
|
|
if coin2 == 'double':
|
|
|
m_length = (m_length // self.unit_length - 1) * self.unit_length
|
|
|
elif coin2 == 'single':
|
|
|
m_length = (m_length // self.unit_length) * self.unit_length
|
|
|
idx = random.randint(0, len(motion) - m_length)
|
|
|
motion = motion[idx:idx+m_length]
|
|
|
|
|
|
"Z Normalization"
|
|
|
motion = (motion - self.mean) / self.std
|
|
|
|
|
|
if m_length < self.max_motion_length:
|
|
|
motion = np.concatenate([motion,
|
|
|
np.zeros((self.max_motion_length - m_length, motion.shape[1]))
|
|
|
], axis=0)
|
|
|
|
|
|
return word_embeddings, pos_one_hots, caption, sent_len, motion, m_length, '_'.join(tokens), name
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def DATALoader(dataset_name, is_test,
|
|
|
batch_size, w_vectorizer,
|
|
|
num_workers = 8, unit_length = 4) :
|
|
|
|
|
|
val_loader = torch.utils.data.DataLoader(Text2MotionDataset(dataset_name, is_test, w_vectorizer, unit_length=unit_length),
|
|
|
batch_size,
|
|
|
shuffle = True,
|
|
|
num_workers=num_workers,
|
|
|
collate_fn=collate_fn,
|
|
|
drop_last = True)
|
|
|
return val_loader
|
|
|
|
|
|
|
|
|
def cycle(iterable):
|
|
|
while True:
|
|
|
for x in iterable:
|
|
|
yield x
|
|
|
|