| import os |
| import rich |
| import random |
| import pickle |
| import codecs as cs |
| import numpy as np |
| from torch.utils import data |
| from rich.progress import track |
| from os.path import join as pjoin |
|
|
|
|
class Text2MotionDataset(data.Dataset):
    """Paired caption/motion samples read from a HumanML3D-style directory.

    Files read under ``data_root``:

    * ``<split>.txt`` -- one sample id per line.
    * ``new_joint_vecs/<id>.npy`` -- motion array indexed as
      ``(frames, features)``.
    * ``texts/<id>.txt`` -- one annotation per line, formatted as
      ``caption#tokens#from_sec#to_sec``. When both times are 0 (or NaN) the
      caption describes the whole motion; otherwise it describes the clip
      ``motion[from_sec * fps : to_sec * fps]``, stored as its own sample.
    * ``tmp/<split>[_tiny]_data.pkl`` and ``tmp/<split>[_tiny]_index.pkl`` --
      pickle cache of the parsed samples; used whenever the data file exists.

    Samples are kept sorted by frame count so that ``reset_max_len`` can hide
    the shortest ones by moving ``self.pointer``.
    """

    # Motions (or clips) with this many frames or more are discarded.
    _MAX_RAW_FRAMES = 200
    # Prefix alphabet used to derive names for sub-clips of a motion.
    _PREFIX_LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVW'
    # Failures that mean "this sample is missing or malformed, skip it":
    # unreadable files, truncated arrays, bad numbers, too few '#' fields.
    _SKIPPABLE_ERRORS = (OSError, EOFError, ValueError, IndexError,
                         OverflowError)

    def __init__(
        self,
        data_root,
        split,
        mean,
        std,
        max_motion_length=196,
        min_motion_length=40,
        unit_length=4,
        fps=20,
        tmpFile=True,
        tiny=False,
        debug=False,
        **kwargs,
    ):
        """Load the split from the pickle cache, or parse it from raw files.

        Args:
            data_root: Dataset directory (layout described on the class).
            split: Split name; selects ``<split>.txt`` and the cache files.
            mean: Subtracted from every motion in ``__getitem__``.
            std: Divides every motion in ``__getitem__``.
            max_motion_length: Upper bound accepted by ``reset_max_len``.
            min_motion_length: Motions/clips with fewer frames are discarded.
            unit_length: Returned motion lengths are multiples of this value.
            fps: Frames per second, used to turn annotation times into
                frame indices.
            tmpFile: When True, freshly parsed data is written to the cache.
            tiny: Stop parsing once slightly more than 100 samples exist,
                use the ``_tiny`` cache files and skip progress bars.
            debug: Same effect as ``tiny``.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: If parsing the raw files yields no usable sample.
        """
        self.max_length = 20
        self.max_motion_length = max_motion_length
        self.min_motion_length = min_motion_length
        self.unit_length = unit_length

        self.mean = mean
        self.std = std

        split_file = pjoin(data_root, split + '.txt')
        motion_dir = pjoin(data_root, 'new_joint_vecs')
        text_dir = pjoin(data_root, 'texts')

        with cs.open(split_file, "r") as f:
            self.id_list = [line.strip() for line in f.readlines()]

        small = tiny or debug
        subset = '_tiny' if small else ''
        data_path = pjoin(data_root, f'tmp/{split}{subset}_data.pkl')
        index_path = pjoin(data_root, f'tmp/{split}{subset}_index.pkl')

        if os.path.exists(data_path):
            data_dict, name_list, length_list = self._load_cache(
                data_path, index_path, split, small)
        else:
            if small:
                ids, maxdata = self.id_list, 100
            else:
                ids = track(self.id_list, f"Loading HumanML3D {split}")
                maxdata = 1e10

            data_dict, names, lengths = self._build_from_raw(
                motion_dir, text_dir, ids, maxdata, fps)
            if not names:
                raise ValueError(
                    f"No usable motion samples found for split '{split}' "
                    f"under {data_root}")

            # Sort by frame count; reset_max_len relies on this ordering.
            name_list, length_list = zip(
                *sorted(zip(names, lengths), key=lambda pair: pair[1]))

            if tmpFile:
                os.makedirs(pjoin(data_root, 'tmp'), exist_ok=True)
                with open(data_path, 'wb') as file:
                    pickle.dump(data_dict, file)
                with open(index_path, 'wb') as file:
                    pickle.dump(name_list, file)

        self.length_arr = np.array(length_list)
        self.data_dict = data_dict
        self.name_list = name_list
        self.nfeats = data_dict[name_list[0]]['motion'].shape[1]
        self.reset_max_len(self.max_length)

    @staticmethod
    def _load_cache(data_path, index_path, split, quiet):
        """Return ``(data_dict, name_list, length_list)`` from the cache."""
        # NOTE(security): pickle.load executes arbitrary code from the file;
        # only point data_root at directories whose tmp/ cache you trust.
        if quiet:
            with open(data_path, 'rb') as file:
                data_dict = pickle.load(file)
        else:
            with rich.progress.open(
                    data_path,
                    'rb',
                    description=f"Loading HumanML3D {split}") as file:
                data_dict = pickle.load(file)
        with open(index_path, 'rb') as file:
            name_list = pickle.load(file)
        # The index was saved sorted by length, so the lengths rebuilt in
        # index order are sorted too (required by np.searchsorted).
        length_list = [data_dict[name]['length'] for name in name_list]
        return data_dict, name_list, length_list

    def _keeps_length(self, n_frames):
        """Return True when a motion of ``n_frames`` frames is kept."""
        return self.min_motion_length <= n_frames < self._MAX_RAW_FRAMES

    @classmethod
    def _unique_clip_name(cls, name, taken):
        """Return a name derived from ``name`` that is not a key of ``taken``."""
        prefixed = [letter + '_' + name for letter in cls._PREFIX_LETTERS]
        if any(candidate not in taken for candidate in prefixed):
            new_name = random.choice(prefixed)
            while new_name in taken:
                new_name = random.choice(prefixed)
            return new_name
        # Every letter prefix is used: fall back to a numeric suffix instead
        # of sampling forever.
        base = random.choice(prefixed)
        name_count = 1
        while f'{base}_{name_count}' in taken:
            name_count += 1
        return f'{base}_{name_count}'

    def _build_from_raw(self, motion_dir, text_dir, ids, maxdata, fps):
        """Parse motions and annotations for ``ids``.

        Returns ``(data_dict, names, lengths)`` in insertion order (unsorted).
        Ids whose files are missing or malformed are skipped.
        """
        data_dict = {}
        names = []
        lengths = []

        for name in ids:
            if len(names) > maxdata:
                break
            try:
                motion = np.load(pjoin(motion_dir, name + ".npy"))
                if not self._keeps_length(len(motion)):
                    continue

                with cs.open(pjoin(text_dir, name + '.txt')) as f:
                    lines = f.readlines()

                whole_motion_texts = []
                for line in lines:
                    fields = line.strip().split('#')
                    caption = fields[0]
                    t_tokens = fields[1].split(' ')
                    f_tag = float(fields[2])
                    to_tag = float(fields[3])
                    f_tag = 0.0 if np.isnan(f_tag) else f_tag
                    to_tag = 0.0 if np.isnan(to_tag) else to_tag
                    text_dict = {'caption': caption, 'tokens': t_tokens}

                    if f_tag == 0.0 and to_tag == 0.0:
                        # Caption covers the whole motion.
                        whole_motion_texts.append(text_dict)
                        continue

                    # Caption covers a time window: store it as a new sample.
                    motion_new = motion[int(f_tag * fps):int(to_tag * fps)]
                    if not self._keeps_length(len(motion_new)):
                        continue
                    # data_dict holds exactly the names issued so far, so it
                    # doubles as an O(1) "already taken" lookup.
                    new_name = self._unique_clip_name(name, data_dict)
                    data_dict[new_name] = {
                        'motion': motion_new,
                        "length": len(motion_new),
                        'text': [text_dict]
                    }
                    names.append(new_name)
                    lengths.append(len(motion_new))

                if whole_motion_texts:
                    data_dict[name] = {
                        'motion': motion,
                        "length": len(motion),
                        'text': whole_motion_texts
                    }
                    names.append(name)
                    lengths.append(len(motion))
            except self._SKIPPABLE_ERRORS:
                # Best effort: ids listed in the split file without usable
                # motion/text files are skipped. Clips stored before the
                # failure are kept.
                continue

        return data_dict, names, lengths

    def reset_max_len(self, length):
        """Hide every sample shorter than ``length`` frames.

        Sets ``self.pointer`` to the index of the first sample whose length
        is at least ``length`` and records ``length`` as ``self.max_length``.
        """
        assert length <= self.max_motion_length
        self.pointer = np.searchsorted(self.length_arr, length)
        print("Pointer Pointing at %d" % self.pointer)
        self.max_length = length

    def __len__(self):
        """Return the number of samples at or after ``self.pointer``."""
        return len(self.name_list) - self.pointer

    def __getitem__(self, item):
        """Return one randomly cropped, normalised sample.

        Returns:
            An 8-tuple ``(caption, motion, m_length, None, None, None, None,
            all_captions)``: ``caption`` is picked at random among the
            sample's annotations, ``motion`` is a random window of
            ``m_length`` frames normalised with ``mean``/``std``, the four
            ``None`` values are placeholders, and ``all_captions`` holds, for
            each annotation, its tokens joined by spaces with everything
            after the first ``/`` of each token removed.
        """
        idx = self.pointer + item
        sample = self.data_dict[self.name_list[idx]]
        motion = sample["motion"]
        m_length = sample["length"]
        text_list = sample["text"]

        # Pick one annotation at random as the caption.
        text_data = random.choice(text_list)
        caption = text_data["caption"]

        # Tokens presumably look like "word/TAG"; only the part before the
        # first '/' is kept.
        all_captions = [
            ' '.join([token.split('/')[0] for token in text_dic['tokens']])
            for text_dic in text_list
        ]

        # One time in three (when unit_length < 10) drop one extra unit.
        if self.unit_length < 10:
            coin2 = np.random.choice(["single", "single", "double"])
        else:
            coin2 = "single"

        if coin2 == "double":
            m_length = (m_length // self.unit_length - 1) * self.unit_length
        elif coin2 == "single":
            m_length = (m_length // self.unit_length) * self.unit_length

        # Random temporal crop of m_length frames.
        idx = random.randint(0, len(motion) - m_length)
        motion = motion[idx:idx + m_length]

        # Z-normalisation.
        motion = (motion - self.mean) / self.std

        return caption, motion, m_length, None, None, None, None, all_captions
|
|