Comparative-Analysis-of-Speech-Synthesis-Models
/
TensorFlowTTS
/examples
/fastspeech2
/fastspeech2_dataset.py
| # -*- coding: utf-8 -*- | |
| # Copyright 2020 Minh Nguyen (@dathudeptrai) | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """Dataset modules.""" | |
| import itertools | |
| import logging | |
| import os | |
| import random | |
| import numpy as np | |
| import tensorflow as tf | |
| from tensorflow_tts.datasets.abstract_dataset import AbstractDataset | |
| from tensorflow_tts.utils import find_files | |
def average_by_duration(x, durs):
    """Average frame-level values over each token's duration span.

    Token ``i`` covers the frames ``x[c[i]:c[i + 1]]`` where ``c`` is the
    cumulative sum of ``durs`` with a leading zero. Entries equal to 0.0 are
    left out of the average; a span with no non-zero entry (including a
    zero-length span) yields 0.0.

    Args:
        x (np.ndarray): Frame-level values (e.g. f0 or energy), expected to
            be 1-D with at least ``durs.sum()`` entries.
        durs (np.ndarray): Integer number of frames per token, expected to
            be 1-D.

    Returns:
        np.ndarray: float32 array of shape ``(durs.shape[0],)`` holding one
            averaged value per token.
    """
    # Segment boundaries: prepend 0 so consecutive pairs give (start, end).
    durs_cum = np.cumsum(np.pad(durs, (1, 0)))

    # calculate character-level f0/energy
    x_char = np.zeros((durs.shape[0],), dtype=np.float32)
    # Iterate over every token. Bounding the loop by the number of frames
    # instead would stop early whenever zero-length durations make the frame
    # count smaller than the token count, leaving later tokens at 0.0.
    for idx, (start, end) in enumerate(zip(durs_cum[:-1], durs_cum[1:])):
        segment = x[start:end]
        values = segment[np.where(segment != 0.0)[0]]
        x_char[idx] = np.mean(values) if len(values) > 0 else 0.0  # np.mean([]) = nan.
    return x_char
def tf_average_by_duration(x, durs):
    """Run ``average_by_duration`` on tensors through ``tf.numpy_function``.

    Args:
        x: Tensor of frame-level values forwarded to ``average_by_duration``.
        durs: Tensor of per-token durations forwarded alongside ``x``.

    Returns:
        The ``tf.float32`` tensor produced by the wrapped NumPy function.
    """
    return tf.numpy_function(
        func=average_by_duration, inp=[x, durs], Tout=tf.float32
    )
class CharactorDurationF0EnergyMelDataset(AbstractDataset):
    """Tensorflow Charactor Duration F0 Energy Mel dataset.

    Collects the dumped character-id, mel, duration, f0 and energy files found
    under ``root_dir`` and exposes them as a padded, batched ``tf.data``
    pipeline through :meth:`create`. The five file lists are sorted and then
    paired by position, so the i-th entry of each list is treated as belonging
    to the same utterance.
    """

    def __init__(
        self,
        root_dir,
        charactor_query="*-ids.npy",
        mel_query="*-norm-feats.npy",
        duration_query="*-durations.npy",
        f0_query="*-raw-f0.npy",
        energy_query="*-raw-energy.npy",
        f0_stat="./dump/stats_f0.npy",
        energy_stat="./dump/stats_energy.npy",
        charactor_load_fn=np.load,
        mel_load_fn=np.load,
        duration_load_fn=np.load,
        f0_load_fn=np.load,
        energy_load_fn=np.load,
        mel_length_threshold=0,
    ):
        """Initialize dataset.

        Args:
            root_dir (str): Root directory including dumped files.
            charactor_query (str): Query to find charactor files in root_dir.
            mel_query (str): Query to find feature files in root_dir.
            duration_query (str): Query to find duration files in root_dir.
            f0_query (str): Query to find f0 files in root_dir.
            energy_query (str): Query to find energy files in root_dir.
            f0_stat (str): str path of f0_stat.
            energy_stat (str): str path of energy_stat.
            charactor_load_fn (func): Function to load charactor file.
            mel_load_fn (func): Function to load feature file.
            duration_load_fn (func): Function to load duration file.
            f0_load_fn (func): Function to load f0 file.
            energy_load_fn (func): Function to load energy file.
            mel_length_threshold (int): Threshold to remove short feature files.

        Raises:
            AssertionError: If no mel file is found, or the five file lists
                differ in length.
        """
        # find all of charactor, mel, duration, f0 and energy files.
        charactor_files = sorted(find_files(root_dir, charactor_query))
        mel_files = sorted(find_files(root_dir, mel_query))
        duration_files = sorted(find_files(root_dir, duration_query))
        f0_files = sorted(find_files(root_dir, f0_query))
        energy_files = sorted(find_files(root_dir, energy_query))

        # assert the number of files
        assert len(mel_files) != 0, f"Not found any mels files in {root_dir}."
        assert (
            len(mel_files)
            == len(charactor_files)
            == len(duration_files)
            == len(f0_files)
            == len(energy_files)
        ), "Number of charactor, mel, duration, f0 and energy files are different"

        if ".npy" in charactor_query:
            # Drop the first character of the query (the leading "*" in the
            # default) and strip the remainder from each file name.
            suffix = charactor_query[1:]
            utt_ids = [os.path.basename(f).replace(suffix, "") for f in charactor_files]
        else:
            # Without this branch utt_ids would be undefined (NameError) for
            # non-".npy" queries; fall back to the file name without extension.
            utt_ids = [
                os.path.splitext(os.path.basename(f))[0] for f in charactor_files
            ]

        # set global params
        self.utt_ids = utt_ids
        self.mel_files = mel_files
        self.charactor_files = charactor_files
        self.duration_files = duration_files
        self.f0_files = f0_files
        self.energy_files = energy_files
        self.mel_load_fn = mel_load_fn
        self.charactor_load_fn = charactor_load_fn
        self.duration_load_fn = duration_load_fn
        self.f0_load_fn = f0_load_fn
        self.energy_load_fn = energy_load_fn
        self.mel_length_threshold = mel_length_threshold

        # Each stat array is indexed as [0] -> mean and [1] -> std in _load_data.
        self.f0_stat = np.load(f0_stat)
        self.energy_stat = np.load(energy_stat)

    def get_args(self):
        """Return the positional arguments handed to :meth:`generator`."""
        return [self.utt_ids]

    def _norm_mean_std(self, x, mean, std):
        """Standardize ``x`` with ``mean``/``std`` while keeping zeros at zero.

        Positions that are exactly 0.0 in the input are reset to 0.0 after
        normalization (presumably these mark frames without a valid value --
        confirm against the preprocessing step).

        Returns:
            np.ndarray: float32 array, matching the ``tf.float32`` output type
                declared by :meth:`_norm_mean_std_tf`.
        """
        zero_idxs = np.where(x == 0.0)[0]
        x = (x - mean) / std
        x[zero_idxs] = 0.0
        # Stats stored as float64 would otherwise promote the result and
        # conflict with the float32 output declared to tf.numpy_function.
        return x.astype(np.float32)

    def _norm_mean_std_tf(self, x, mean, std):
        """Apply :meth:`_norm_mean_std` to tensors via ``tf.numpy_function``."""
        x = tf.numpy_function(self._norm_mean_std, [x, mean, std], tf.float32)
        return x

    def generator(self, utt_ids):
        """Yield one dict of utterance id and file paths per utterance.

        Args:
            utt_ids: Iterable of utterance ids; its positions index the file
                lists stored on the instance.
        """
        for i, utt_id in enumerate(utt_ids):
            mel_file = self.mel_files[i]
            charactor_file = self.charactor_files[i]
            duration_file = self.duration_files[i]
            f0_file = self.f0_files[i]
            energy_file = self.energy_files[i]

            items = {
                "utt_ids": utt_id,
                "mel_files": mel_file,
                "charactor_files": charactor_file,
                "duration_files": duration_file,
                "f0_files": f0_file,
                "energy_files": energy_file,
            }

            yield items

    def _load_data(self, items):
        """Load the arrays referenced by ``items`` and build one training sample.

        The configured ``*_load_fn`` callables are used for loading (default
        ``np.load``). They run inside ``tf.numpy_function``, so they receive
        the path as passed by TensorFlow (a bytes object for string tensors)
        and must return float32 (mel, f0, energy) or int32 (ids, durations).

        Args:
            items (dict): One element produced by :meth:`generator`.

        Returns:
            dict: Sample with ids, durations, token-level normalized f0 and
                energy, mel features and mel length.
        """
        mel = tf.numpy_function(self.mel_load_fn, [items["mel_files"]], tf.float32)
        charactor = tf.numpy_function(
            self.charactor_load_fn, [items["charactor_files"]], tf.int32
        )
        duration = tf.numpy_function(
            self.duration_load_fn, [items["duration_files"]], tf.int32
        )
        f0 = tf.numpy_function(self.f0_load_fn, [items["f0_files"]], tf.float32)
        energy = tf.numpy_function(
            self.energy_load_fn, [items["energy_files"]], tf.float32
        )

        f0 = self._norm_mean_std_tf(f0, self.f0_stat[0], self.f0_stat[1])
        energy = self._norm_mean_std_tf(
            energy, self.energy_stat[0], self.energy_stat[1]
        )

        # calculate character-level f0/energy
        f0 = tf_average_by_duration(f0, duration)
        energy = tf_average_by_duration(energy, duration)

        items = {
            "utt_ids": items["utt_ids"],
            "input_ids": charactor,
            "speaker_ids": 0,
            "duration_gts": duration,
            "f0_gts": f0,
            "energy_gts": energy,
            "mel_gts": mel,
            # NOTE(review): len() on a tensor appears to rely on AutoGraph
            # conversion inside Dataset.map -- confirm before disabling it.
            "mel_lengths": len(mel),
        }

        return items

    def create(
        self,
        allow_cache=False,
        batch_size=1,
        is_shuffle=False,
        map_fn=None,
        reshuffle_each_iteration=True,
    ):
        """Create tf.dataset function.

        Args:
            allow_cache (bool): Cache the loaded and filtered samples.
            batch_size (int): Number of samples per padded batch.
            is_shuffle (bool): Shuffle with a buffer as large as the dataset.
            map_fn: Accepted for interface compatibility; not used here.
            reshuffle_each_iteration (bool): Forwarded to ``Dataset.shuffle``.

        Returns:
            tf.data.Dataset: Padded, batched and prefetched dataset. Incomplete
                final batches are dropped.
        """
        output_types = self.get_output_dtypes()
        datasets = tf.data.Dataset.from_generator(
            self.generator, output_types=output_types, args=(self.get_args())
        )

        # load data
        datasets = datasets.map(
            lambda items: self._load_data(items), tf.data.experimental.AUTOTUNE
        )

        # drop samples whose mel length does not exceed the threshold
        datasets = datasets.filter(
            lambda x: x["mel_lengths"] > self.mel_length_threshold
        )

        if allow_cache:
            datasets = datasets.cache()

        if is_shuffle:
            datasets = datasets.shuffle(
                self.get_len_dataset(),
                reshuffle_each_iteration=reshuffle_each_iteration,
            )

        # define padded shapes
        padded_shapes = {
            "utt_ids": [],
            "input_ids": [None],
            "speaker_ids": [],
            "duration_gts": [None],
            "f0_gts": [None],
            "energy_gts": [None],
            "mel_gts": [None, None],
            "mel_lengths": [],
        }

        datasets = datasets.padded_batch(
            batch_size, padded_shapes=padded_shapes, drop_remainder=True
        )
        datasets = datasets.prefetch(tf.data.experimental.AUTOTUNE)
        return datasets

    def get_output_dtypes(self):
        """Return the dtypes of the dict yielded by :meth:`generator`."""
        output_types = {
            "utt_ids": tf.string,
            "mel_files": tf.string,
            "charactor_files": tf.string,
            "duration_files": tf.string,
            "f0_files": tf.string,
            "energy_files": tf.string,
        }
        return output_types

    def get_len_dataset(self):
        """Return the number of utterances found at construction time."""
        return len(self.utt_ids)

    def __name__(self):
        """Return the dataset name as a string."""
        return "CharactorDurationF0EnergyMelDataset"