Spaces:
Runtime error
Runtime error
| import os | |
| import pickle | |
| import torch | |
| import numpy as np | |
| from numpy.random import uniform | |
| from torch.utils import data | |
| from torch.utils.data.sampler import Sampler | |
| from multiprocessing import Process, Manager | |
class Utterances(data.Dataset):
    """Dataset class for the Utterances dataset.

    Loads a pickled metadata list and, for each entry, reads three aligned
    feature arrays from disk (feat_dir_1/2/3). Loading is parallelized with
    multiprocessing, sharing the result list through a Manager.

    Each metadata entry `sbmt` is a sequence whose first two items are kept
    as-is (per the inline comment below: speaker name and embedding) and
    whose remaining items are filenames of per-utterance feature files.
    """

    def __init__(self, hparams):
        """Initialize and preprocess the Utterances dataset.

        hparams must provide: meta_file, feat_dir_1, feat_dir_2, feat_dir_3,
        max_len_pad.
        """
        self.meta_file = hparams.meta_file
        self.feat_dir_1 = hparams.feat_dir_1
        self.feat_dir_2 = hparams.feat_dir_2
        self.feat_dir_3 = hparams.feat_dir_3
        self.step = 4          # number of metadata entries handled per worker process
        self.split = 0         # NOTE(review): never read in this class — confirm before removing
        self.max_len_pad = hparams.max_len_pad

        # SECURITY NOTE: pickle.load executes arbitrary code if meta_file is
        # untrusted — only load metadata files you control.
        meta = pickle.load(open(self.meta_file, "rb"))

        # Manager-backed lists can be shared between processes; each worker
        # writes its slice of `dataset` in place.
        manager = Manager()
        meta = manager.list(meta)
        dataset = manager.list(len(meta)*[None])  # <-- can be shared between processes.
        processes = []
        for i in range(0, len(meta), self.step):
            p = Process(target=self.load_data,
                        args=(meta[i:i+self.step],dataset,i))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()

        # very important to do dataset = list(dataset):
        # materialize a plain list so later indexing does not go through the
        # Manager proxy (and the Manager can be garbage-collected).
        self.train_dataset = list(dataset)
        self.num_tokens = len(self.train_dataset)

        print('Finished loading the {} Utterances training dataset...'.format(self.num_tokens))

    def load_data(self, submeta, dataset, idx_offset):
        """Worker: load features for `submeta` entries into dataset[idx_offset:].

        For each entry, items 0 and 1 are copied verbatim; every later item is
        treated as a filename and replaced by a tuple
        (clipped feat_1, feat_2[:, 0:14], feat_3).
        """
        for k, sbmt in enumerate(submeta):
            uttrs = len(sbmt)*[None]
            for j, tmp in enumerate(sbmt):
                if j < 2:
                    # fill in speaker name and embedding
                    uttrs[j] = tmp
                else:
                    # fill in data
                    sp_tmp = np.load(os.path.join(self.feat_dir_1, tmp))
                    # keep only the first 14 columns of the second feature
                    # (presumably low-order cepstral coefficients — TODO confirm)
                    cep_tmp = np.load(os.path.join(self.feat_dir_2, tmp))[:, 0:14]
                    cd_tmp = np.load(os.path.join(self.feat_dir_3, tmp))
                    # all three features must be frame-aligned
                    assert len(sp_tmp) == len(cep_tmp) == len(cd_tmp)
                    uttrs[j] = ( np.clip(sp_tmp, 0, 1), cep_tmp, cd_tmp )
            dataset[idx_offset+k] = uttrs

    def segment_np(self, cd_long, tau=2):
        """Randomly segment a (T, D) feature sequence into variable-length runs.

        Builds the cosine-similarity matrix G of cd_long against itself, then
        walks frames left to right, starting a new segment when the next tau
        frames all fall below a similarity threshold. The threshold is a random
        quantile of the current boundary row (restricted to a +/-20 frame
        window), driven by a per-call rate drawn from U(0.8, 1.3), so segment
        lengths are stochastically re-sampled on every call.

        Returns:
            num_rep:       array of segment lengths; when q > 1 a boundary may
                           split a run into (len - 0.5, 0.5) fractional parts.
            num_rep_sync:  same boundaries but without the fractional split,
                           so it stays integer-valued.
        """
        # cosine similarity: G[i, j] = <cd[i], cd[j]> / (|cd[i]| * |cd[j]|)
        cd_norm = np.sqrt((cd_long ** 2).sum(axis=-1, keepdims=True))
        G = (cd_long @ cd_long.T) / (cd_norm @ cd_norm.T)
        L = G.shape[0]

        num_rep = []
        num_rep_sync = []
        prev_boundary = 0
        rate = np.random.uniform(0.8, 1.3)
        for t in range(1, L+1):
            if t==L:
                # always close the final segment at the sequence end
                num_rep.append(t - prev_boundary)
                num_rep_sync.append(t - prev_boundary)
                prev_boundary = t
            if t < L:
                q = np.random.uniform(rate-0.1, rate)
                # similarity profile of the current boundary frame within a
                # +/-20 frame window (clamped to the sequence)
                tmp = G[prev_boundary, max(prev_boundary-20, 0):min(prev_boundary+20, L)]
                if q <= 1:
                    epsilon = np.quantile(tmp, q)
                    # boundary only when the next tau frames are all dissimilar
                    if np.all(G[prev_boundary, t:min(t+tau, L)] < epsilon):
                        num_rep.append(t - prev_boundary)
                        num_rep_sync.append(t - prev_boundary)
                        prev_boundary = t
                else:
                    # q > 1: mirror the quantile back into [0, 1]
                    epsilon = np.quantile(tmp, 2-q)
                    if np.all(G[prev_boundary, t:min(t+tau, L)] < epsilon):
                        num_rep.append(t - prev_boundary)
                    else:
                        # split the run fractionally in num_rep only;
                        # num_rep_sync keeps the whole-frame count
                        num_rep.extend([t-prev_boundary-0.5, 0.5])
                    num_rep_sync.append(t - prev_boundary)
                    prev_boundary = t

        num_rep = np.array(num_rep)
        num_rep_sync = np.array(num_rep_sync)

        return num_rep, num_rep_sync

    def __getitem__(self, index):
        """Return M uttrs for one spkr.

        Picks one utterance of speaker `index` uniformly at random (entries 0
        and 1 are the speaker name/embedding, so draw starts at 2).
        """
        dataset = self.train_dataset
        list_uttrs = dataset[index]
        emb_org = list_uttrs[1]

        uttr = np.random.randint(2, len(list_uttrs))
        melsp, melcep, cd_real = list_uttrs[uttr]

        num_rep, num_rep_sync = self.segment_np(cd_real)

        return melsp, melcep, cd_real, num_rep, num_rep_sync, len(melsp), len(num_rep), len(num_rep_sync), emb_org

    def __len__(self):
        """Return the number of spkrs."""
        return self.num_tokens
class MyCollator(object):
    """Batch collator: zero-pads every sequence in a batch to max_len_pad,
    stacks the batch, then trims each stacked tensor to one frame past the
    longest real length in the batch."""

    def __init__(self, hparams):
        self.max_len_pad = hparams.max_len_pad

    def __call__(self, batch):
        pad_to = self.max_len_pad
        max_real, max_short, max_sync = 0, 0, 0
        padded = []

        for (sp_real, cep_real, cd_real, rep, rep_sync,
             l_real, l_short, l_short_sync, emb) in batch:
            # track the longest length of each kind across the batch
            max_real = max(max_real, l_real)
            max_short = max(max_short, l_short)
            max_sync = max(max_sync, l_short_sync)

            # pad time axis up to the fixed maximum; feature axes untouched
            padded.append((
                np.pad(sp_real,  ((0, pad_to - l_real), (0, 0)), 'constant'),
                np.pad(cep_real, ((0, pad_to - l_real), (0, 0)), 'constant'),
                np.pad(cd_real,  ((0, pad_to - l_real), (0, 0)), 'constant'),
                np.pad(rep,      (0, pad_to - l_short), 'constant'),
                np.pad(rep_sync, (0, pad_to - l_short_sync), 'constant'),
                l_real, l_short, l_short_sync, emb,
            ))

        cols = list(zip(*padded))
        stack = lambda xs: torch.from_numpy(np.stack(xs, axis=0))

        # trim to the batch maximum (+1 extra frame) to avoid carrying the
        # full max_len_pad padding through the model
        sp_real       = stack(cols[0])[:, :max_real + 1, :]
        cep_real      = stack(cols[1])[:, :max_real + 1, :]
        cd_real       = stack(cols[2])[:, :max_real + 1, :]
        num_rep       = stack(cols[3])[:, :max_short + 1]
        num_rep_sync  = stack(cols[4])[:, :max_sync + 1]
        len_real      = stack(cols[5])
        len_short     = stack(cols[6])
        len_short_sync = stack(cols[7])
        spk_emb       = stack(cols[8])

        return (sp_real, cep_real, cd_real, num_rep, num_rep_sync,
                len_real, len_short, len_short_sync, spk_emb)
class MultiSampler(Sampler):
    """Samples elements more than once in a single pass through the data.

    Yields every index in [0, num_samples) exactly n_repeats times per pass,
    optionally shuffled (a fresh permutation each epoch).
    """

    def __init__(self, num_samples, n_repeats, shuffle=False):
        """
        Args:
            num_samples: size of the underlying dataset.
            n_repeats:   how many times each index appears per pass.
            shuffle:     permute the repeated index array each pass.
        """
        self.num_samples = num_samples
        self.n_repeats = n_repeats
        self.shuffle = shuffle

    def gen_sample_array(self):
        # [0..N-1] tiled n_repeats times, e.g. N=3, r=2 -> [0,1,2,0,1,2]
        self.sample_idx_array = torch.arange(self.num_samples, dtype=torch.int64).repeat(self.n_repeats)
        if self.shuffle:
            self.sample_idx_array = self.sample_idx_array[torch.randperm(len(self.sample_idx_array))]
        return self.sample_idx_array

    def __iter__(self):
        return iter(self.gen_sample_array())

    def __len__(self):
        # Bug fix: the original returned len(self.sample_idx_array), an
        # attribute that only exists after gen_sample_array() has run, so
        # calling len(sampler) before iterating (as DataLoader does) raised
        # AttributeError. The length is known up front; compute it directly.
        return self.num_samples * self.n_repeats
def worker_init_fn(x):
    """DataLoader worker initializer: seed numpy's global RNG from torch's
    base seed (reduced mod 2**32, numpy's maximum seed) so numpy randomness
    in each worker is tied to the torch seed.

    Args:
        x: worker id supplied by DataLoader (unused).
    """
    seed = torch.initial_seed() % (2 ** 32)
    return np.random.seed(seed)
def get_loader(hparams):
    """Build and return a data loader."""
    dataset = Utterances(hparams)
    collator = MyCollator(hparams)
    # NOTE: 'samplier' (sic) is the actual attribute name on hparams;
    # it is the repeat count handed to MultiSampler.
    sampler = MultiSampler(len(dataset), hparams.samplier, shuffle=hparams.shuffle)

    return data.DataLoader(dataset=dataset,
                           batch_size=hparams.batch_size,
                           sampler=sampler,
                           num_workers=hparams.num_workers,
                           drop_last=True,
                           pin_memory=False,
                           worker_init_fn=worker_init_fn,
                           collate_fn=collator)