| | |
| | |
| | |
| | |
| |
|
| | import torch |
| | import numpy as np |
| | import torch.utils.data |
| | from torch.nn.utils.rnn import pad_sequence |
| | from utils.data_utils import * |
| | from processors.acoustic_extractor import cal_normalized_mel |
| | from text import text_to_sequence |
| | from text.text_token_collation import phoneIDCollation |
| |
|
| |
|
class BaseDataset(torch.utils.data.Dataset):
    """Base dataset that serves precomputed acoustic features per utterance.

    Reads the split's metadata file and, depending on the boolean flags in
    ``cfg.preprocess``, builds utterance-to-``.npy``-path maps for each
    enabled feature kind (uv, pitch, energy, mel, linear, audio/label/one-hot)
    plus optional speaker-id maps and text/phone id sequences.
    """

    def __init__(self, cfg, dataset, is_valid=False):
        """
        Args:
            cfg: config
            dataset: dataset name
            is_valid: whether to use train or valid dataset
        """
        assert isinstance(dataset, str)

        self.cfg = cfg

        processed_data_dir = os.path.join(cfg.preprocess.processed_dir, dataset)
        meta_file = cfg.preprocess.valid_file if is_valid else cfg.preprocess.train_file
        self.metafile_path = os.path.join(processed_data_dir, meta_file)
        self.metadata = self.get_metadata()

        '''
        load spk2id and utt2spk from json file
            spk2id: {spk1: 0, spk2: 1, ...}
            utt2spk: {dataset_uid: spk1, ...}
        '''
        if cfg.preprocess.use_spkid:
            spk2id_path = os.path.join(processed_data_dir, cfg.preprocess.spk2id)
            with open(spk2id_path, "r") as f:
                self.spk2id = json.load(f)

            # utt2spk is a TSV file: "<dataset>_<uid>\t<speaker>" per line.
            utt2spk_path = os.path.join(processed_data_dir, cfg.preprocess.utt2spk)
            self.utt2spk = dict()
            with open(utt2spk_path, "r") as f:
                for line in f.readlines():
                    utt, spk = line.strip().split('\t')
                    self.utt2spk[utt] = spk

        # Every frame-level feature stores one <uid>.npy per utterance under
        # its own sub-directory, so a single helper builds each path map.
        if cfg.preprocess.use_uv:
            self.utt2uv_path = self._build_path_dict(cfg.preprocess.uv_dir)

        if cfg.preprocess.use_frame_pitch:
            self.utt2frame_pitch_path = self._build_path_dict(cfg.preprocess.pitch_dir)

        if cfg.preprocess.use_frame_energy:
            self.utt2frame_energy_path = self._build_path_dict(
                cfg.preprocess.energy_dir
            )

        if cfg.preprocess.use_mel:
            self.utt2mel_path = self._build_path_dict(cfg.preprocess.mel_dir)

        if cfg.preprocess.use_linear:
            self.utt2linear_path = self._build_path_dict(cfg.preprocess.linear_dir)

        # audio / label / one-hot form an if/elif chain in the original code:
        # only the first enabled option gets its path map built.
        if cfg.preprocess.use_audio:
            self.utt2audio_path = self._build_path_dict(cfg.preprocess.audio_dir)
        elif cfg.preprocess.use_label:
            self.utt2label_path = self._build_path_dict(cfg.preprocess.label_dir)
        elif cfg.preprocess.use_one_hot:
            self.utt2one_hot_path = self._build_path_dict(cfg.preprocess.one_hot_dir)

        if cfg.preprocess.use_text or cfg.preprocess.use_phone:
            self.utt2seq = {}
            for utt_info in self.metadata:
                dataset = utt_info["Dataset"]
                uid = utt_info["Uid"]
                utt = "{}_{}".format(dataset, uid)

                if cfg.preprocess.use_text:
                    text = utt_info["Text"]
                    sequence = text_to_sequence(text, cfg.preprocess.text_cleaners)
                elif cfg.preprocess.use_phone:
                    # The .phone file holds exactly one line: the
                    # space-separated phoneme sequence for this utterance.
                    phone_path = os.path.join(
                        processed_data_dir, cfg.preprocess.phone_dir, uid + '.phone'
                    )
                    with open(phone_path, 'r') as fin:
                        phones = fin.readlines()
                        assert len(phones) == 1
                        phones = phones[0].strip()
                    phones_seq = phones.split(' ')

                    phon_id_collator = phoneIDCollation(cfg, dataset=dataset)
                    sequence = phon_id_collator.get_phone_id_sequence(cfg, phones_seq)

                self.utt2seq[utt] = sequence

    def _build_path_dict(self, feat_dir):
        """Map "<dataset>_<uid>" -> <processed_dir>/<dataset>/<feat_dir>/<uid>.npy.

        Args:
            feat_dir: the feature's sub-directory name from cfg.preprocess.

        Returns:
            dict: utterance key to .npy file path, one entry per metadata row.
        """
        path_dict = {}
        for utt_info in self.metadata:
            dataset = utt_info["Dataset"]
            uid = utt_info["Uid"]
            utt = "{}_{}".format(dataset, uid)
            path_dict[utt] = os.path.join(
                self.cfg.preprocess.processed_dir,
                dataset,
                feat_dir,
                uid + ".npy",
            )
        return path_dict

    def get_metadata(self):
        """Load and return the split's metadata list from the JSON meta file."""
        with open(self.metafile_path, "r", encoding="utf-8") as f:
            metadata = json.load(f)

        return metadata

    def get_dataset_name(self):
        """Return the dataset name recorded in the first metadata entry."""
        return self.metadata[0]["Dataset"]

    def __getitem__(self, index):
        """Assemble all enabled features for one utterance into a dict.

        Frame-level features are aligned to a shared "target_len", which is
        set by the first frame-level feature encountered (mel, linear,
        frame_pitch, or frame_energy).
        """
        utt_info = self.metadata[index]

        dataset = utt_info["Dataset"]
        uid = utt_info["Uid"]
        utt = "{}_{}".format(dataset, uid)

        single_feature = dict()

        if self.cfg.preprocess.use_spkid:
            single_feature["spk_id"] = np.array(
                [self.spk2id[self.utt2spk[utt]]], dtype=np.int32
            )

        if self.cfg.preprocess.use_mel:
            # Stored layout is [n_mels, T]; transposed to [T, n_mels] below.
            mel = np.load(self.utt2mel_path[utt])
            assert mel.shape[0] == self.cfg.preprocess.n_mel
            if self.cfg.preprocess.use_min_max_norm_mel:
                # do min-max normalization with the dataset's statistics
                mel = cal_normalized_mel(mel, utt_info["Dataset"], self.cfg.preprocess)

            if "target_len" not in single_feature.keys():
                single_feature["target_len"] = mel.shape[1]
            single_feature["mel"] = mel.T

        if self.cfg.preprocess.use_linear:
            linear = np.load(self.utt2linear_path[utt])
            if "target_len" not in single_feature.keys():
                single_feature["target_len"] = linear.shape[1]
            single_feature["linear"] = linear.T

        if self.cfg.preprocess.use_frame_pitch:
            frame_pitch_path = self.utt2frame_pitch_path[utt]
            frame_pitch = np.load(frame_pitch_path)
            if "target_len" not in single_feature.keys():
                single_feature["target_len"] = len(frame_pitch)
            aligned_frame_pitch = align_length(
                frame_pitch, single_feature["target_len"]
            )
            single_feature["frame_pitch"] = aligned_frame_pitch

        if self.cfg.preprocess.use_uv:
            # NOTE(review): this branch reads single_feature["target_len"]
            # without setting it, so use_uv only works when another
            # frame-level feature is enabled too — confirm with callers.
            frame_uv_path = self.utt2uv_path[utt]
            frame_uv = np.load(frame_uv_path)
            aligned_frame_uv = align_length(frame_uv, single_feature["target_len"])
            # Invert the stored flag: truthy -> 0, falsy -> 1.
            aligned_frame_uv = [
                0 if frame_uv else 1 for frame_uv in aligned_frame_uv
            ]
            aligned_frame_uv = np.array(aligned_frame_uv)
            single_feature["frame_uv"] = aligned_frame_uv

        if self.cfg.preprocess.use_frame_energy:
            frame_energy_path = self.utt2frame_energy_path[utt]
            frame_energy = np.load(frame_energy_path)
            if "target_len" not in single_feature.keys():
                single_feature["target_len"] = len(frame_energy)
            aligned_frame_energy = align_length(
                frame_energy, single_feature["target_len"]
            )
            single_feature["frame_energy"] = aligned_frame_energy

        if self.cfg.preprocess.use_audio:
            audio = np.load(self.utt2audio_path[utt])
            single_feature["audio"] = audio
            single_feature["audio_len"] = audio.shape[0]

        if self.cfg.preprocess.use_phone or self.cfg.preprocess.use_text:
            single_feature["phone_seq"] = np.array(self.utt2seq[utt])
            single_feature["phone_len"] = len(self.utt2seq[utt])

        return single_feature

    def __len__(self):
        return len(self.metadata)
| |
|
| |
|
class BaseCollator(object):
    """Zero-pads model inputs and targets based on number of frames per step"""

    def __init__(self, cfg):
        self.cfg = cfg

    def __call__(self, batch):
        """Collate a list of per-utterance feature dicts into padded tensors.

        Args:
            batch: list of dicts produced by ``BaseDataset.__getitem__``.

        Returns:
            dict: for each ``*_len`` key, a LongTensor of lengths (plus a
            padded 0/1 "mask" for target_len and "phn_mask" for phone_len);
            every other key is zero-padded batch-first with ``pad_sequence``.
        """
        packed_batch_features = dict()

        for key in batch[0].keys():
            if key == "target_len":
                packed_batch_features["target_len"] = torch.LongTensor(
                    [b["target_len"] for b in batch]
                )
                masks = [
                    torch.ones((b["target_len"], 1), dtype=torch.long) for b in batch
                ]
                packed_batch_features["mask"] = pad_sequence(
                    masks, batch_first=True, padding_value=0
                )
            elif key == "phone_len":
                packed_batch_features["phone_len"] = torch.LongTensor(
                    [b["phone_len"] for b in batch]
                )
                masks = [
                    torch.ones((b["phone_len"], 1), dtype=torch.long) for b in batch
                ]
                packed_batch_features["phn_mask"] = pad_sequence(
                    masks, batch_first=True, padding_value=0
                )
            elif key == "audio_len":
                packed_batch_features["audio_len"] = torch.LongTensor(
                    [b["audio_len"] for b in batch]
                )
                # Fix: the original also built a list of audio masks here but
                # never stored it in the output dict; that dead computation
                # has been removed. No mask is emitted for audio lengths.
            else:
                # All remaining values are numpy arrays of shape [T, ...];
                # pad them to the longest T in the batch.
                values = [torch.from_numpy(b[key]) for b in batch]
                packed_batch_features[key] = pad_sequence(
                    values, batch_first=True, padding_value=0
                )
        return packed_batch_features
| |
|
| |
|
class BaseTestDataset(torch.utils.data.Dataset):
    """Abstract dataset interface for test/inference time.

    Subclasses must implement ``__init__``, ``get_metadata`` and
    ``__getitem__``; ``__len__`` is inherited and relies on the subclass
    storing its parsed metadata list in ``self.metadata``.
    """

    def __init__(self, cfg, args):
        raise NotImplementedError

    def get_metadata(self):
        # Subclasses return the metadata list loaded for the test split.
        raise NotImplementedError

    def __getitem__(self, index):
        raise NotImplementedError

    def __len__(self):
        # Assumes the concrete subclass has populated ``self.metadata``.
        return len(self.metadata)
| |
|
| |
|
class BaseTestCollator(object):
    """Zero-pads model inputs and targets based on number of frames per step"""

    # Abstract interface: concrete test collators implement both methods.
    def __init__(self, cfg):
        raise NotImplementedError

    def __call__(self, batch):
        # Subclasses collate a list of single-utterance feature dicts
        # into one padded batch dict.
        raise NotImplementedError
|