from pathlib import Path
import re
import json
from collections import OrderedDict
from typing import Optional, Union

import numpy as np
import librosa
import torch

PAD = "<pad>"
EOS = "<EOS>"
UNK = "<UNK>"
SEG = "|"
RESERVED_TOKENS = [PAD, EOS, UNK]
NUM_RESERVED_TOKENS = len(RESERVED_TOKENS)
PAD_ID = RESERVED_TOKENS.index(PAD)
EOS_ID = RESERVED_TOKENS.index(EOS)
UNK_ID = RESERVED_TOKENS.index(UNK)

# F0 quantization range, mapped onto the mel scale: mel = 1127 * ln(1 + f / 700).
F0_BIN = 256
F0_MAX = 1100.0
F0_MIN = 50.0
F0_MEL_MIN = 1127 * np.log(1 + F0_MIN / 700)
F0_MEL_MAX = 1127 * np.log(1 + F0_MAX / 700)

def f0_to_coarse(f0):
    """Quantize F0 (Hz) into coarse integer bins on the mel scale."""
    is_torch = isinstance(f0, torch.Tensor)
    f0_mel = 1127 * (1 + f0 / 700).log() if is_torch else 1127 * np.log(1 + f0 / 700)
    # Map voiced frames into (1, F0_BIN - 1]; unvoiced frames (f0 == 0, hence
    # f0_mel == 0) are clamped to bin 1 below.
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - F0_MEL_MIN) * (F0_BIN - 2) / (F0_MEL_MAX - F0_MEL_MIN) + 1

    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > F0_BIN - 1] = F0_BIN - 1
    f0_coarse = (f0_mel + 0.5).long() if is_torch else np.rint(f0_mel).astype(int)
    assert f0_coarse.max() <= F0_BIN - 1 and f0_coarse.min() >= 1, (
        f0_coarse.max(), f0_coarse.min()
    )
    return f0_coarse

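# A minimal usage sketch for f0_to_coarse; the frequency values below are
# arbitrary illustrations, not from the original module:
#
#   f0 = np.array([0.0, 110.0, 220.0, 440.0])
#   print(f0_to_coarse(f0))                  # unvoiced frame (0 Hz) maps to bin 1
#   print(f0_to_coarse(torch.tensor(f0)))    # same bins, returned as a LongTensor
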
def norm_f0(
    f0: Union[np.ndarray, torch.Tensor],
    uv: Optional[np.ndarray],
    f0_mean: float,
    f0_std: float,
    pitch_norm: str = "log",
    use_uv: bool = True,
):
    """Normalize F0, either by standardization or in the log2 domain."""
    is_torch = isinstance(f0, torch.Tensor)
    if pitch_norm == 'standard':
        f0 = (f0 - f0_mean) / f0_std
    if pitch_norm == 'log':
        f0 = torch.log2(f0) if is_torch else np.log2(f0)
    if uv is not None and use_uv:
        # Zero out unvoiced frames after normalization.
        f0[uv > 0] = 0
    return f0

def norm_interp_f0(
    f0: Union[np.ndarray, torch.Tensor],
    f0_mean: float,
    f0_std: float,
    pitch_norm: str = "log",
    use_uv: bool = True,
):
    """Normalize F0 and linearly interpolate over unvoiced regions.

    Returns the normalized F0 and a float mask marking unvoiced frames.
    """
    is_torch = isinstance(f0, torch.Tensor)
    if is_torch:
        device = f0.device
        f0 = f0.data.cpu().numpy()
    uv = f0 == 0
    f0 = norm_f0(f0, uv, f0_mean, f0_std, pitch_norm, use_uv)
    if sum(uv) == len(f0):
        # Fully unvoiced: nothing to interpolate from.
        f0[uv] = 0
    elif sum(uv) > 0:
        # Fill unvoiced frames by interpolating between voiced neighbors.
        f0[uv] = np.interp(np.where(uv)[0], np.where(~uv)[0], f0[~uv])
    uv = torch.as_tensor(uv).float()
    f0 = torch.as_tensor(f0).float()
    if is_torch:
        f0 = f0.to(device)
    return f0, uv

def denorm_f0(
    f0,
    uv,
    pitch_norm="log",
    f0_mean=None,
    f0_std=None,
    pitch_padding=None,
    min=None,  # note: shadows the builtin; kept for API compatibility
    max=None,  # note: shadows the builtin; kept for API compatibility
    use_uv=True,
):
    """Invert norm_f0, then optionally clamp and re-apply unvoiced/padding masks."""
    if pitch_norm == 'standard':
        f0 = f0 * f0_std + f0_mean
    if pitch_norm == 'log':
        f0 = 2 ** f0
    if min is not None:
        f0 = f0.clamp(min=min)
    if max is not None:
        f0 = f0.clamp(max=max)
    if uv is not None and use_uv:
        f0[uv > 0] = 0
    if pitch_padding is not None:
        f0[pitch_padding] = 0
    return f0

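# A minimal round-trip sketch for norm_interp_f0 / denorm_f0 in the default
# log2 domain; the contour values are illustrative, not from the original
# module (f0_mean / f0_std are only used by the 'standard' scheme):
#
#   f0 = torch.tensor([0.0, 220.0, 230.0, 0.0, 240.0])
#   f0_norm, uv = norm_interp_f0(f0, f0_mean=0.0, f0_std=1.0)
#   f0_back = denorm_f0(f0_norm, uv)   # voiced frames recovered, unvoiced -> 0
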
def librosa_pad_lr(x, fshift, pad_sides=1):
    """Compute padding so the signal length becomes a whole number of frames.

    Returns (left_pad, right_pad); pad_sides=1 pads only the right side,
    pad_sides=2 splits the padding across both sides.
    """
    assert pad_sides in (1, 2)
    pad = (x.shape[0] // fshift + 1) * fshift - x.shape[0]
    if pad_sides == 1:
        return 0, pad
    else:
        return pad // 2, pad // 2 + pad % 2

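# A minimal usage sketch; the 1000-sample buffer and 256-sample hop are
# arbitrary illustrations:
#
#   wav = np.zeros(1000, dtype=np.float32)
#   l_pad, r_pad = librosa_pad_lr(wav, fshift=256, pad_sides=1)
#   wav = np.pad(wav, (l_pad, r_pad), mode="constant")
#   assert wav.shape[0] % 256 == 0
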
def get_pitch(
    wav_file: Union[str, Path], sample_rate: int, frame_shift: float
):
    """Extract frame-level F0 with Praat (via parselmouth) plus its coarse bins."""
    import parselmouth
    hop_size = int(frame_shift * sample_rate)
    wav, _ = librosa.load(wav_file, sr=sample_rate)

    latent_length = wav.shape[0] // hop_size
    f0_min = 80
    f0_max = 750

    f0 = parselmouth.Sound(wav, sample_rate).to_pitch_ac(
        time_step=frame_shift,
        voicing_threshold=0.6,
        pitch_floor=f0_min,
        pitch_ceiling=f0_max
    ).selected_array['frequency']
    # Praat may return slightly fewer frames than expected; repeat the last
    # F0 value to match the target frame count.
    delta_l = latent_length - len(f0)
    if delta_l > 0:
        f0 = np.concatenate([f0, [f0[-1]] * delta_l], 0)
    pitch_coarse = f0_to_coarse(f0)
    return f0, pitch_coarse

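# A minimal usage sketch, assuming parselmouth is installed and that
# "sample.wav" exists; the path, sample rate, and 12.5 ms frame shift are
# illustrative:
#
#   f0, pitch_coarse = get_pitch("sample.wav", sample_rate=22050,
#                                frame_shift=0.0125)
#   # f0 is in Hz (0 for unvoiced frames); pitch_coarse holds bins in [1, 255]
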
def remove_empty_lines(text):
    """Remove empty lines from a list of strings."""
    assert len(text) > 0
    assert isinstance(text, list)
    text = [t.strip() for t in text]
    # Drop every empty line, not just the first occurrence.
    return [t for t in text if t != ""]

def is_sil_phoneme(p):
    """A phoneme is treated as silence if it does not start with a letter."""
    return p == "" or not p[0].isalpha()

def strip_ids(ids, ids_to_strip):
    """Strip ids_to_strip from the end of ids."""
    ids = list(ids)
    while ids and ids[-1] in ids_to_strip:
        ids.pop()
    return ids

class TextEncoder(object):
    """Base class for converting between int id sequences and human-readable strings."""
    def __init__(self, num_reserved_ids=NUM_RESERVED_TOKENS):
        self._num_reserved_ids = num_reserved_ids

    @property
    def num_reserved_ids(self):
        return self._num_reserved_ids

    def encode(self, s):
        """Transform a human-readable string into a sequence of int ids.

        The ids should be in the range [num_reserved_ids, vocab_size). Ids
        [0, num_reserved_ids) are reserved.

        EOS is not appended.

        Args:
          s: human-readable string to be converted.

        Returns:
          ids: list of integers
        """
        return [int(w) + self._num_reserved_ids for w in s.split()]

    def decode(self, ids, strip_extraneous=False):
        """Transform a sequence of int ids into a human-readable string.

        EOS is not expected in ids.

        Args:
          ids: list of integers to be converted.
          strip_extraneous: bool, whether to strip off extraneous tokens
            (EOS and PAD).

        Returns:
          s: human-readable string.
        """
        if strip_extraneous:
            ids = strip_ids(ids, list(range(self._num_reserved_ids or 0)))
        return " ".join(self.decode_list(ids))

    def decode_list(self, ids):
        """Transform a sequence of int ids into their string versions.

        This method supports transforming individual input/output ids to
        their string versions so that sequence to/from text conversions can
        be visualized in a human-readable format.

        Args:
          ids: list of integers to be converted.

        Returns:
          strs: list of human-readable strings.
        """
        decoded_ids = []
        for id_ in ids:
            if 0 <= id_ < self._num_reserved_ids:
                decoded_ids.append(RESERVED_TOKENS[int(id_)])
            else:
                decoded_ids.append(id_ - self._num_reserved_ids)
        return [str(d) for d in decoded_ids]

    @property
    def vocab_size(self):
        raise NotImplementedError()

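# A minimal sketch of the base TextEncoder, whose default encode/decode simply
# offsets whitespace-separated integers by the reserved-id count:
#
#   enc = TextEncoder()
#   ids = enc.encode("0 1 2")   # -> [3, 4, 5]
#   print(enc.decode(ids))      # -> "0 1 2"
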
class TokenTextEncoder(TextEncoder):
    """Encoder based on a user-supplied vocabulary (file or list)."""
    def __init__(
        self,
        vocab_filename,
        reverse=False,
        vocab_list=None,
        replace_oov=None,
        num_reserved_ids=NUM_RESERVED_TOKENS
    ):
        """Initialize from a file or list, one token per line.

        Handling of reserved tokens works as follows:
        - When initializing from a list, we add reserved tokens to the vocab.
        - When initializing from a file, we do not add reserved tokens to the vocab.
        - When saving vocab files, we save reserved tokens to the file.

        Args:
          vocab_filename: If not None, the full filename to read vocab from. If this
            is not None, then vocab_list should be None.
          reverse: Boolean indicating if tokens should be reversed during encoding
            and decoding.
          vocab_list: If not None, a list of elements of the vocabulary. If this is
            not None, then vocab_filename should be None.
          replace_oov: If not None, every out-of-vocabulary token seen when
            encoding will be replaced by this string (which must be in vocab).
          num_reserved_ids: Number of IDs to save for reserved tokens like <EOS>.
        """
        super(TokenTextEncoder, self).__init__(num_reserved_ids=num_reserved_ids)
        self._reverse = reverse
        self._replace_oov = replace_oov
        if vocab_filename:
            self._init_vocab_from_file(vocab_filename)
        else:
            assert vocab_list is not None
            self._init_vocab_from_list(vocab_list)
        self.pad_index = self._token_to_id[PAD]
        self.eos_index = self._token_to_id[EOS]
        self.unk_index = self._token_to_id[UNK]
        self.seg_index = self._token_to_id.get(SEG, self.eos_index)

    def encode(self, s):
        """Converts a space-separated string of tokens to a list of ids."""
        tokens = s.strip().split()
        if self._replace_oov is not None:
            tokens = [
                t if t in self._token_to_id else self._replace_oov
                for t in tokens
            ]
        ret = [self._token_to_id[tok] for tok in tokens]
        return ret[::-1] if self._reverse else ret

    def decode(self, ids, strip_eos=False, strip_padding=False):
        if strip_padding and self.pad() in list(ids):
            pad_pos = list(ids).index(self.pad())
            ids = ids[:pad_pos]
        if strip_eos and self.eos() in list(ids):
            eos_pos = list(ids).index(self.eos())
            ids = ids[:eos_pos]
        return " ".join(self.decode_list(ids))

    def decode_list(self, ids):
        seq = reversed(ids) if self._reverse else ids
        return [self._safe_id_to_token(i) for i in seq]

    @property
    def vocab_size(self):
        return len(self._id_to_token)

    def __len__(self):
        return self.vocab_size

    def _safe_id_to_token(self, idx):
        return self._id_to_token.get(idx, "ID_%d" % idx)

    def _init_vocab_from_file(self, filename):
        """Load vocab from a file.

        Args:
          filename: The file to load vocabulary from.
        """
        with open(filename) as f:
            tokens = [token.strip() for token in f.readlines()]

        def token_gen():
            for token in tokens:
                yield token

        self._init_vocab(token_gen(), add_reserved_tokens=False)

    def _init_vocab_from_list(self, vocab_list):
        """Initialize tokens from a list of tokens.

        It is ok if reserved tokens appear in the vocab list. They will be
        removed. The set of tokens in vocab_list should be unique.

        Args:
          vocab_list: A list of tokens.
        """
        def token_gen():
            for token in vocab_list:
                if token not in RESERVED_TOKENS:
                    yield token

        self._init_vocab(token_gen())

    def _init_vocab(self, token_generator, add_reserved_tokens=True):
        """Initialize vocabulary with tokens from token_generator."""
        self._id_to_token = {}
        non_reserved_start_index = 0

        if add_reserved_tokens:
            self._id_to_token.update(enumerate(RESERVED_TOKENS))
            non_reserved_start_index = len(RESERVED_TOKENS)

        self._id_to_token.update(
            enumerate(token_generator, start=non_reserved_start_index)
        )

        # Build the reverse mapping (six.iteritems is unnecessary on Python 3).
        self._token_to_id = dict(
            (v, k) for k, v in self._id_to_token.items()
        )

    def pad(self):
        return self.pad_index

    def eos(self):
        return self.eos_index

    def unk(self):
        return self.unk_index

    def seg(self):
        return self.seg_index

    def store_to_file(self, filename):
        """Write vocab file to disk.

        Vocab files have one token per line. The file ends in a newline.
        Reserved tokens are written to the vocab file as well.

        Args:
          filename: Full path of the file to store the vocab to.
        """
        with open(filename, "w") as f:
            for i in range(len(self._id_to_token)):
                f.write(self._id_to_token[i] + "\n")

    def sil_phonemes(self):
        return [p for p in self._id_to_token.values() if is_sil_phoneme(p)]

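# A minimal usage sketch for TokenTextEncoder built from an in-memory list;
# the toy vocabulary is illustrative, not from the original module:
#
#   encoder = TokenTextEncoder(None, vocab_list=["a", "b", "|"])
#   ids = encoder.encode("a b a")   # -> [3, 4, 3] (ids 0-2 are reserved)
#   print(encoder.decode(ids))      # -> "a b a"
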
class TextGrid(object):
    def __init__(self, text):
        text = remove_empty_lines(text)
        self.text = text
        self.line_count = 0
        self._get_type()
        self._get_time_intval()
        self._get_size()
        self.tier_list = []
        self._get_item_list()

    def _extract_pattern(self, pattern, inc):
        """Match a regex against the current line and return its first group.

        Parameters
        ----------
        pattern : str
            Regex with one capture group to extract.
        inc : int
            Number of lines to advance after a successful match.

        Returns
        -------
        group : str
            The extracted text.
        """
        try:
            group = re.match(pattern, self.text[self.line_count]).group(1)
            self.line_count += inc
        except AttributeError:
            raise ValueError(
                "File format error at line %d:%s" %
                (self.line_count, self.text[self.line_count])
            )
        return group

    def _get_type(self):
        self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2)

    def _get_time_intval(self):
        self.xmin = self._extract_pattern(r"xmin = (.*)", 1)
        self.xmax = self._extract_pattern(r"xmax = (.*)", 2)

    def _get_size(self):
        self.size = int(self._extract_pattern(r"size = (.*)", 2))

    def _get_item_list(self):
        """Only supports IntervalTier currently."""
        for _ in range(1, self.size + 1):
            tier = OrderedDict()
            item_list = []
            tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1)
            tier_class = self._extract_pattern(r"class = \"(.*)\"", 1)
            if tier_class != "IntervalTier":
                raise NotImplementedError(
                    "Only IntervalTier class is supported currently"
                )
            tier_name = self._extract_pattern(r"name = \"(.*)\"", 1)
            tier_xmin = self._extract_pattern(r"xmin = (.*)", 1)
            tier_xmax = self._extract_pattern(r"xmax = (.*)", 1)
            tier_size = self._extract_pattern(r"intervals: size = (.*)", 1)
            for _ in range(int(tier_size)):
                item = OrderedDict()
                item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1)
                item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1)
                item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1)
                item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1)
                item_list.append(item)
            tier["idx"] = tier_idx
            tier["class"] = tier_class
            tier["name"] = tier_name
            tier["xmin"] = tier_xmin
            tier["xmax"] = tier_xmax
            tier["size"] = tier_size
            tier["items"] = item_list
            self.tier_list.append(tier)

    def toJson(self):
        _json = OrderedDict()
        _json["file_type"] = self.file_type
        _json["xmin"] = self.xmin
        _json["xmax"] = self.xmax
        _json["size"] = self.size
        _json["tiers"] = self.tier_list
        return json.dumps(_json, ensure_ascii=False, indent=2)

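# A minimal usage sketch for TextGrid; the path is illustrative, and the file
# is expected to be a full-format Praat TextGrid whose tiers are IntervalTiers:
#
#   with open("alignment.TextGrid") as f:
#       tg = TextGrid(f.readlines())
#   data = json.loads(tg.toJson())
#   print(data["tiers"][-1]["items"][:3])   # first few intervals of last tier
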
def read_duration_from_textgrid(
    textgrid_path: Union[str, Path],
    phoneme: str,
    utterance_duration: float,
):
    """Align a phoneme sequence to a TextGrid and return per-phoneme durations."""
    ph_list = phoneme.split(" ")
    with open(textgrid_path, "r") as f:
        textgrid = f.readlines()
    textgrid = TextGrid(textgrid)
    textgrid = json.loads(textgrid.toJson())

    # split[i] holds the start time of phoneme i; -1 marks "not yet assigned".
    split = np.ones(len(ph_list) + 1, np.float32) * -1
    tg_idx = 0
    ph_idx = 0
    tg_align = list(textgrid['tiers'][-1]['items'])
    # Normalize silence labels to '' and merge consecutive silence intervals.
    tg_align_ = []
    for x in tg_align:
        x['xmin'] = float(x['xmin'])
        x['xmax'] = float(x['xmax'])
        if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC', '<SP>', '<AP>']:
            x['text'] = ''
            if len(tg_align_) > 0 and tg_align_[-1]['text'] == '':
                tg_align_[-1]['xmax'] = x['xmax']
                continue
        tg_align_.append(x)
    tg_align = tg_align_
    tg_len = len([x for x in tg_align if x['text'] != ''])
    ph_len = len([x for x in ph_list if not is_sil_phoneme(x)])
    assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, textgrid_path)
    # Walk the TextGrid intervals and the phoneme list in lockstep, recording
    # each phoneme's start time in split.
    while tg_idx < len(tg_align) or ph_idx < len(ph_list):
        if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]):
            split[ph_idx] = 1e8
            ph_idx += 1
            continue
        x = tg_align[tg_idx]
        if x['text'] == '' and ph_idx == len(ph_list):
            tg_idx += 1
            continue
        assert ph_idx < len(ph_list), (
            tg_len, ph_len, tg_align, ph_list, textgrid_path
        )

        ph = ph_list[ph_idx]
        if x['text'] == '' and not is_sil_phoneme(ph):
            assert False, (ph_list, tg_align)
        if x['text'] != '' and is_sil_phoneme(ph):
            ph_idx += 1
        else:
            assert (x['text'] == '' and is_sil_phoneme(ph)) \
                or x['text'].lower() == ph.lower() \
                or x['text'].lower() == 'sil', (x['text'], ph)
            split[ph_idx] = x['xmin']
            # A silence phoneme directly before this one inherits the same
            # boundary if it never got a start time of its own.
            if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(
                ph_list[ph_idx - 1]
            ):
                split[ph_idx - 1] = split[ph_idx]
            ph_idx += 1
            tg_idx += 1
    assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align])
    assert ph_idx >= len(ph_list) - 1, (
        ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align],
        textgrid_path
    )

    split[0] = 0
    split[-1] = utterance_duration
    duration = np.diff(split)
    return duration

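# A minimal usage sketch; the path, phoneme string, and 2.5 s utterance
# duration are illustrative, not from the original module ("<SP>" stands in
# for a silence phoneme, i.e. any token not starting with a letter):
#
#   phoneme = "<SP> HH AH0 L OW1 <SP>"
#   durations = read_duration_from_textgrid(
#       "alignment.TextGrid", phoneme, utterance_duration=2.5
#   )
#   # durations[i] is the time in seconds assigned to the i-th phoneme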