| import warnings
|
|
|
| warnings.filterwarnings("ignore")
|
|
|
| import parselmouth
|
| import os
|
| import torch
|
| from skimage.transform import resize
|
| from utils.text_encoder import TokenTextEncoder
|
| from utils.pitch_utils import f0_to_coarse
|
| import struct
|
| import webrtcvad
|
| from scipy.ndimage.morphology import binary_dilation
|
| import librosa
|
| import numpy as np
|
| from utils import audio
|
| import pyloudnorm as pyln
|
| import re
|
| import json
|
| from collections import OrderedDict
|
|
|
PUNCS = '!,.?;:'  # punctuation characters treated as phrase-boundary markers



int16_max = (2 ** 15) - 1  # largest signed 16-bit sample value (32767), used for PCM conversion
|
|
|
|
|
def trim_long_silences(path, sr=None, return_raw_wav=False, norm=True, vad_max_silence_length=12):
    """Trim long silent stretches from an audio file using WebRTC VAD.

    :param path: path of the audio file to load
    :param sr: sample rate to load the file at (None keeps the native rate)
    :param return_raw_wav: if True, return the full untrimmed waveform together with the mask
    :param norm: if True, loudness-normalize to -20 LUFS (and re-peak-normalize if clipping)
    :param vad_max_silence_length: maximum number of consecutive silent VAD frames kept
    :return: (wav, audio_mask, sr) — wav is the trimmed waveform (or the raw one when
        return_raw_wav), audio_mask is a boolean per-sample voice mask at the original rate
    """
    sampling_rate = 16000  # VAD runs at 16 kHz (webrtcvad supports only 8/16/32/48 kHz)
    wav_raw, sr = librosa.core.load(path, sr=sr)

    if norm:
        meter = pyln.Meter(sr)  # BS.1770 loudness meter
        loudness = meter.integrated_loudness(wav_raw)
        wav_raw = pyln.normalize.loudness(wav_raw, loudness, -20.0)
        if np.abs(wav_raw).max() > 1.0:
            wav_raw = wav_raw / np.abs(wav_raw).max()

    # Keyword arguments: the positional orig_sr/target_sr form was removed in librosa 0.10.
    wav = librosa.resample(wav_raw, orig_sr=sr, target_sr=sampling_rate, res_type='kaiser_best')

    vad_window_length = 30  # VAD window size in ms (webrtcvad accepts 10/20/30)

    vad_moving_average_width = 8  # number of frames used to smooth the raw voice flags

    samples_per_window = (vad_window_length * sampling_rate) // 1000

    # Trim the end of the audio to have a whole number of VAD windows.
    wav = wav[:len(wav) - (len(wav) % samples_per_window)]

    # Convert the float waveform to 16-bit mono PCM bytes as required by webrtcvad.
    pcm_wave = struct.pack("%dh" % len(wav), *(np.round(wav * int16_max)).astype(np.int16))

    # Perform voice activation detection window by window.
    voice_flags = []
    vad = webrtcvad.Vad(mode=3)  # mode 3 = most aggressive filtering
    for window_start in range(0, len(wav), samples_per_window):
        window_end = window_start + samples_per_window
        voice_flags.append(vad.is_speech(pcm_wave[window_start * 2:window_end * 2],
                                         sample_rate=sampling_rate))
    voice_flags = np.array(voice_flags)

    def moving_average(array, width):
        # Centered moving average computed via cumulative sums (zero-padded at both ends).
        array_padded = np.concatenate((np.zeros((width - 1) // 2), array, np.zeros(width // 2)))
        ret = np.cumsum(array_padded, dtype=float)
        ret[width:] = ret[width:] - ret[:-width]
        return ret[width - 1:] / width

    audio_mask = moving_average(voice_flags, vad_moving_average_width)
    # np.bool was removed in NumPy 1.24; the builtin bool is the documented replacement.
    audio_mask = np.round(audio_mask).astype(bool)

    # Dilate the voiced regions so up to vad_max_silence_length silent frames are kept.
    audio_mask = binary_dilation(audio_mask, np.ones(vad_max_silence_length + 1))
    audio_mask = np.repeat(audio_mask, samples_per_window)
    # Project the 16 kHz frame mask back onto the original-rate waveform.
    audio_mask = resize(audio_mask, (len(wav_raw),)) > 0
    if return_raw_wav:
        return wav_raw, audio_mask, sr
    return wav_raw[audio_mask], audio_mask, sr
|
|
|
|
|
def process_utterance(wav_path,
                      fft_size=1024,
                      hop_size=256,
                      win_length=1024,
                      window="hann",
                      num_mels=80,
                      fmin=80,
                      fmax=7600,
                      eps=1e-6,
                      sample_rate=22050,
                      loud_norm=False,
                      min_level_db=-100,
                      return_linear=False,
                      trim_long_sil=False, vocoder='pwg'):
    """Load (or accept) a waveform and compute its log10 mel spectrogram.

    :param wav_path: path to an audio file, or an already-loaded waveform array
    :param fft_size: FFT size of the STFT
    :param hop_size: hop length in samples
    :param win_length: analysis window length in samples
    :param window: window function name passed to librosa.stft
    :param num_mels: number of mel bands
    :param fmin: lowest mel filter frequency (-1 means 0)
    :param fmax: highest mel filter frequency (-1 means sample_rate / 2)
    :param eps: floor applied before the log to avoid log(0)
    :param sample_rate: sample rate the audio is loaded at
    :param loud_norm: if True, loudness-normalize to -22 LUFS before analysis
    :param min_level_db: normalization floor for the linear spectrogram
    :param return_linear: if True, also return the normalized linear spectrogram
    :param trim_long_sil: if True, trim long silences before processing
    :param vocoder: target vocoder; only 'pwg' is supported
    :return: (wav, mel) or (wav, mel, spc) when return_linear is True
    """
    if isinstance(wav_path, str):
        if trim_long_sil:
            wav, _, _ = trim_long_silences(wav_path, sample_rate)
        else:
            wav, _ = librosa.core.load(wav_path, sr=sample_rate)
    else:
        wav = wav_path

    if loud_norm:
        meter = pyln.Meter(sample_rate)  # BS.1770 loudness meter
        loudness = meter.integrated_loudness(wav)
        wav = pyln.normalize.loudness(wav, loudness, -22.0)
        if np.abs(wav).max() > 1:
            wav = wav / np.abs(wav).max()

    # Amplitude spectrogram, shape (n_bins, T).
    x_stft = librosa.stft(wav, n_fft=fft_size, hop_length=hop_size,
                          win_length=win_length, window=window, pad_mode="constant")
    spc = np.abs(x_stft)

    # Mel filter bank; -1 sentinels expand to the full frequency range.
    fmin = 0 if fmin == -1 else fmin
    fmax = sample_rate / 2 if fmax == -1 else fmax
    # Keyword arguments: the positional form of filters.mel was removed in librosa 0.10.
    mel_basis = librosa.filters.mel(sr=sample_rate, n_fft=fft_size, n_mels=num_mels,
                                    fmin=fmin, fmax=fmax)
    mel = mel_basis @ spc

    if vocoder == 'pwg':
        mel = np.log10(np.maximum(eps, mel))
    else:
        assert False, f'"{vocoder}" is not in ["pwg"].'

    # Pad then truncate the waveform so its length matches the mel frame count exactly.
    l_pad, r_pad = audio.librosa_pad_lr(wav, fft_size, hop_size, 1)
    wav = np.pad(wav, (l_pad, r_pad), mode='constant', constant_values=0.0)
    wav = wav[:mel.shape[1] * hop_size]

    if not return_linear:
        return wav, mel
    else:
        spc = audio.amp_to_db(spc)
        spc = audio.normalize(spc, {'min_level_db': min_level_db})
        return wav, mel, spc
|
|
|
|
|
def get_pitch(wav_data, mel, hparams):
    """Extract an f0 contour aligned to the mel frames via Praat's autocorrelation tracker.

    :param wav_data: raw waveform [T]
    :param mel: mel spectrogram [T', 80]; only its frame count is used
    :param hparams: dict with 'hop_size' and 'audio_sample_rate'
    :return: (f0, pitch_coarse), both of length len(mel)
    """
    hop_size = hparams['hop_size']
    sample_rate = hparams['audio_sample_rate']
    frame_ms = hop_size / sample_rate * 1000
    f0_floor, f0_ceiling = 80, 750

    # Padding that compensates the analysis offset; only these hop sizes are supported.
    assert hop_size in (128, 256)
    pad_size = 4 if hop_size == 128 else 2

    sound = parselmouth.Sound(wav_data, sample_rate)
    pitch = sound.to_pitch_ac(
        time_step=frame_ms / 1000, voicing_threshold=0.6,
        pitch_floor=f0_floor, pitch_ceiling=f0_ceiling)
    f0 = pitch.selected_array['frequency']

    # Zero-pad on both sides so f0 covers the same span as the mel frames.
    left = pad_size * 2
    right = len(mel) - len(f0) - left
    f0 = np.pad(f0, [[left, right]], mode='constant')

    # Guard against small rounding mismatches between the two lengths.
    missing = len(mel) - len(f0)
    assert np.abs(missing) <= 8
    if missing > 0:
        f0 = np.concatenate([f0, [f0[-1]] * missing], 0)
    f0 = f0[:len(mel)]

    pitch_coarse = f0_to_coarse(f0)
    return f0, pitch_coarse
|
|
|
|
|
def remove_empty_lines(text):
    """Strip whitespace from each line and drop every empty line.

    :param text: non-empty list of strings
    :return: list of stripped, non-empty lines
    """
    assert (len(text) > 0)
    assert (isinstance(text, list))
    text = [t.strip() for t in text]
    # The previous text.remove("") dropped only the FIRST empty line; a filter
    # removes them all, matching the docstring and what the TextGrid parser needs.
    return [t for t in text if t != ""]
|
|
|
|
|
class TextGrid(object):
    """Minimal parser for Praat TextGrid files (long text format).

    Consumes the file as a list of lines and exposes the parsed header
    fields plus ``tier_list``, a list of interval tiers; ``toJson``
    serializes everything. Only IntervalTier tiers are supported.
    Parsing is sequential: ``line_count`` is a cursor into the line list
    that each ``_extract_pattern`` call advances.
    """

    def __init__(self, text):
        # text: list of raw lines of a TextGrid file (empty lines dropped first).
        text = remove_empty_lines(text)
        self.text = text
        self.line_count = 0  # cursor into self.text, advanced by _extract_pattern
        self._get_type()
        self._get_time_intval()
        self._get_size()
        self.tier_list = []
        self._get_item_list()

    def _extract_pattern(self, pattern, inc):
        """Match *pattern* against the current line and advance the cursor.

        Parameters
        ----------
        pattern : regex to extract pattern (group 1 is returned)
        inc : increment of line count after extraction
        Returns
        -------
        group : extracted info
        Raises
        ------
        ValueError : if the current line does not match the pattern
        """
        try:
            group = re.match(pattern, self.text[self.line_count]).group(1)
            self.line_count += inc
        except AttributeError:
            # re.match returned None -> .group raised AttributeError.
            raise ValueError("File format error at line %d:%s" % (self.line_count, self.text[self.line_count]))
        return group

    def _get_type(self):
        # 'File type = "..."'; inc=2 also skips the line after it.
        self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2)

    def _get_time_intval(self):
        # Grid-level start/end times; kept as strings exactly as matched.
        self.xmin = self._extract_pattern(r"xmin = (.*)", 1)
        self.xmax = self._extract_pattern(r"xmax = (.*)", 2)

    def _get_size(self):
        # Number of tiers; inc=2 also skips the line that follows.
        self.size = int(self._extract_pattern(r"size = (.*)", 2))

    def _get_item_list(self):
        """Only supports IntervalTier currently"""
        for itemIdx in range(1, self.size + 1):
            tier = OrderedDict()
            item_list = []
            tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1)
            tier_class = self._extract_pattern(r"class = \"(.*)\"", 1)
            if tier_class != "IntervalTier":
                raise NotImplementedError("Only IntervalTier class is supported currently")
            tier_name = self._extract_pattern(r"name = \"(.*)\"", 1)
            tier_xmin = self._extract_pattern(r"xmin = (.*)", 1)
            tier_xmax = self._extract_pattern(r"xmax = (.*)", 1)
            tier_size = self._extract_pattern(r"intervals: size = (.*)", 1)
            # Each interval contributes an OrderedDict of idx/xmin/xmax/text
            # (all values remain strings; conversion is left to callers).
            for i in range(int(tier_size)):
                item = OrderedDict()
                item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1)
                item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1)
                item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1)
                item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1)
                item_list.append(item)
            tier["idx"] = tier_idx
            tier["class"] = tier_class
            tier["name"] = tier_name
            tier["xmin"] = tier_xmin
            tier["xmax"] = tier_xmax
            tier["size"] = tier_size
            tier["items"] = item_list
            self.tier_list.append(tier)

    def toJson(self):
        """Serialize the parsed header and tiers to a pretty-printed JSON string."""
        _json = OrderedDict()
        _json["file_type"] = self.file_type
        _json["xmin"] = self.xmin
        _json["xmax"] = self.xmax
        _json["size"] = self.size
        _json["tiers"] = self.tier_list
        return json.dumps(_json, ensure_ascii=False, indent=2)
|
|
|
|
|
def get_mel2ph(tg_fn, ph, mel, hparams):
    """Align a phoneme sequence to mel frames using an MFA TextGrid file.

    :param tg_fn: path to the TextGrid alignment file
    :param ph: space-separated phoneme string
    :param mel: mel spectrogram [T, num_mels]; only its frame count is used
    :param hparams: dict with 'audio_sample_rate' and 'hop_size'
    :return: (mel2ph, dur) — mel2ph[t] is the 1-based phoneme index of frame t,
        dur[i] is the number of frames assigned to phoneme i
    """
    ph_list = ph.split(" ")
    with open(tg_fn, "r") as f:
        tg = f.readlines()
    tg = remove_empty_lines(tg)
    tg = TextGrid(tg)
    tg = json.loads(tg.toJson())
    # split[i] holds the start time (seconds) of phoneme i; -1 means "not yet set".
    # np.float/np.int were removed in NumPy 1.24; the builtins are the equivalent aliases.
    split = np.ones(len(ph_list) + 1, float) * -1
    tg_idx = 0
    ph_idx = 0
    tg_align = [x for x in tg['tiers'][-1]['items']]
    tg_align_ = []
    # Normalize interval times and merge consecutive silence intervals into one.
    for x in tg_align:
        x['xmin'] = float(x['xmin'])
        x['xmax'] = float(x['xmax'])
        if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']:
            x['text'] = ''
            if len(tg_align_) > 0 and tg_align_[-1]['text'] == '':
                tg_align_[-1]['xmax'] = x['xmax']
                continue
        tg_align_.append(x)
    tg_align = tg_align_
    # The non-silence intervals must match the non-silence phonemes one-to-one.
    tg_len = len([x for x in tg_align if x['text'] != ''])
    ph_len = len([x for x in ph_list if not is_sil_phoneme(x)])
    assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, tg_fn)
    # Walk both sequences in lockstep, recording each phoneme's start time.
    while tg_idx < len(tg_align) or ph_idx < len(ph_list):
        # Trailing silence phonemes after the last interval: park them at +inf.
        if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]):
            split[ph_idx] = 1e8
            ph_idx += 1
            continue
        x = tg_align[tg_idx]
        # Trailing silence intervals after the last phoneme: just consume them.
        if x['text'] == '' and ph_idx == len(ph_list):
            tg_idx += 1
            continue
        assert ph_idx < len(ph_list), (tg_len, ph_len, tg_align, ph_list, tg_fn)
        ph = ph_list[ph_idx]
        if x['text'] == '' and not is_sil_phoneme(ph):
            assert False, (ph_list, tg_align)
        if x['text'] != '' and is_sil_phoneme(ph):
            # Extra silence phoneme with no matching interval: skip the phoneme.
            ph_idx += 1
        else:
            assert (x['text'] == '' and is_sil_phoneme(ph)) \
                   or x['text'].lower() == ph.lower() \
                   or x['text'].lower() == 'sil', (x['text'], ph)
            split[ph_idx] = x['xmin']
            # Backfill an unset silence phoneme immediately before this one.
            if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(ph_list[ph_idx - 1]):
                split[ph_idx - 1] = split[ph_idx]
            ph_idx += 1
            tg_idx += 1
    assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align])
    assert ph_idx >= len(ph_list) - 1, (ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], tg_fn)
    mel2ph = np.zeros([mel.shape[0]], int)
    split[0] = 0
    split[-1] = 1e8
    for i in range(len(split) - 1):
        assert split[i] != -1 and split[i] <= split[i + 1], (split[:-1],)
    # Convert start times from seconds to mel-frame indices (rounded to nearest).
    split = [int(s * hparams['audio_sample_rate'] / hparams['hop_size'] + 0.5) for s in split]
    for ph_idx in range(len(ph_list)):
        mel2ph[split[ph_idx]:split[ph_idx + 1]] = ph_idx + 1
    # Per-phoneme duration: count frames assigned to each index via scatter_add;
    # index 0 collects unassigned frames and is dropped.
    mel2ph_torch = torch.from_numpy(mel2ph)
    T_t = len(ph_list)
    dur = mel2ph_torch.new_zeros([T_t + 1]).scatter_add(0, mel2ph_torch, torch.ones_like(mel2ph_torch))
    dur = dur[1:].numpy()
    return mel2ph, dur
|
|
|
|
|
def build_phone_encoder(data_dir):
    """Load a dataset's phone set and build a token encoder over it.

    :param data_dir: directory containing phone_set.json (a JSON list of phones)
    :return: TokenTextEncoder over the phone list; OOV tokens are replaced by ','
    """
    phone_list_file = os.path.join(data_dir, 'phone_set.json')
    # Use a context manager so the file handle is closed deterministically
    # (the previous json.load(open(...)) leaked the handle).
    with open(phone_list_file) as f:
        phone_list = json.load(f)
    return TokenTextEncoder(None, vocab_list=phone_list, replace_oov=',')
|
|
|
|
|
def is_sil_phoneme(p):
    """Return True if *p* is a silence/marker token rather than a phoneme.

    Real phonemes start with an alphabetic character, so anything else
    (e.g. '|', ',', '<EOS>') counts as silence. An empty token is treated
    as silence instead of raising IndexError on p[0] as before.
    """
    return not p or not p[0].isalpha()
|
|
|