from pathlib import Path
import re
import json
from collections import OrderedDict
from typing import Optional, Union

import numpy as np
import librosa
import torch

PAD = "<pad>"
EOS = "<EOS>"
UNK = "<UNK>"
SEG = "|"
RESERVED_TOKENS = [PAD, EOS, UNK]
NUM_RESERVED_TOKENS = len(RESERVED_TOKENS)
PAD_ID = RESERVED_TOKENS.index(PAD)
EOS_ID = RESERVED_TOKENS.index(EOS)
UNK_ID = RESERVED_TOKENS.index(UNK)

F0_BIN = 256
F0_MAX = 1100.0
F0_MIN = 50.0
# Mel-scale bounds for f0 quantization: mel = 1127 * ln(1 + f / 700).
F0_MEL_MIN = 1127 * np.log(1 + F0_MIN / 700)
F0_MEL_MAX = 1127 * np.log(1 + F0_MAX / 700)


def f0_to_coarse(f0):
    """Quantize an f0 contour (Hz) into coarse mel-scale bins in [1, F0_BIN - 1]."""
    is_torch = isinstance(f0, torch.Tensor)
    if is_torch:
        f0_mel = 1127 * (1 + f0 / 700).log()
    else:
        f0_mel = 1127 * np.log(1 + f0 / 700)
    f0_mel[f0_mel > 0] = (
        (f0_mel[f0_mel > 0] - F0_MEL_MIN) * (F0_BIN - 2) / (F0_MEL_MAX - F0_MEL_MIN) + 1
    )

    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > F0_BIN - 1] = F0_BIN - 1
    # np.int was removed from NumPy; the builtin int is the correct dtype here.
    f0_coarse = (f0_mel + 0.5).long() if is_torch else np.rint(f0_mel).astype(int)
    assert f0_coarse.max() <= F0_BIN - 1 and f0_coarse.min() >= 1, (
        f0_coarse.max(), f0_coarse.min()
    )
    return f0_coarse
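
# Example (hypothetical values): quantizing a per-frame f0 contour in Hz.
# >>> f0_to_coarse(np.array([0.0, 110.0, 220.0, 440.0]))
# Unvoiced frames (f0 == 0) fall into bin 1; voiced frames land in [1, 255].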


def norm_f0(
    f0: Union[np.ndarray, torch.Tensor],
    uv: Optional[np.ndarray],
    f0_mean: float,
    f0_std: float,
    pitch_norm: str = "log",
    use_uv: bool = True
):
    """Normalize f0: z-score ("standard") or log2 ("log"); zero out unvoiced frames."""
    is_torch = isinstance(f0, torch.Tensor)
    if pitch_norm == 'standard':
        f0 = (f0 - f0_mean) / f0_std
    if pitch_norm == 'log':
        f0 = torch.log2(f0) if is_torch else np.log2(f0)
    if uv is not None and use_uv:
        f0[uv > 0] = 0
    return f0


def norm_interp_f0(
    f0: Union[np.ndarray, torch.Tensor],
    f0_mean: float,
    f0_std: float,
    pitch_norm: str = "log",
    use_uv: bool = True
):
    """Normalize f0 and linearly interpolate across unvoiced (f0 == 0) frames.

    Returns the normalized/interpolated f0 and a float unvoiced mask.
    """
    is_torch = isinstance(f0, torch.Tensor)
    if is_torch:
        device = f0.device
        f0 = f0.data.cpu().numpy()
    uv = f0 == 0
    f0 = norm_f0(f0, uv, f0_mean, f0_std, pitch_norm, use_uv)
    if sum(uv) == len(f0):
        # Fully unvoiced: there is nothing to interpolate from.
        f0[uv] = 0
    elif sum(uv) > 0:
        # Fill unvoiced frames from the surrounding voiced frames.
        f0[uv] = np.interp(np.where(uv)[0], np.where(~uv)[0], f0[~uv])
    uv = torch.as_tensor(uv).float()
    f0 = torch.as_tensor(f0).float()
    if is_torch:
        f0 = f0.to(device)
    return f0, uv


def denorm_f0(
    f0,
    uv,
    pitch_norm="log",
    f0_mean=None,
    f0_std=None,
    pitch_padding=None,
    min=None,
    max=None,
    use_uv=True
):
    """Invert norm_f0, optionally clamping and re-zeroing unvoiced/padded frames."""
    if pitch_norm == 'standard':
        f0 = f0 * f0_std + f0_mean
    if pitch_norm == 'log':
        f0 = 2 ** f0
    # clamp() assumes f0 is a torch.Tensor at this point.
    if min is not None:
        f0 = f0.clamp(min=min)
    if max is not None:
        f0 = f0.clamp(max=max)
    if uv is not None and use_uv:
        f0[uv > 0] = 0
    if pitch_padding is not None:
        f0[pitch_padding] = 0
    return f0
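
# Round-trip sketch (hypothetical statistics; the default "log" normalization
# ignores f0_mean/f0_std, which only matter for pitch_norm="standard"):
# >>> f0, uv = norm_interp_f0(np.array([0.0, 220.0, 0.0, 440.0]), 220.0, 40.0)
# >>> f0_hz = denorm_f0(f0, uv)  # frames with uv > 0 come back as 0 Hz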


def librosa_pad_lr(x, fshift, pad_sides=1):
    """Compute the padding needed to reach a whole number of frames.

    pad_sides=1 returns right-side padding only; pad_sides=2 splits the
    padding between both sides (the right side gets the odd extra sample).
    """
    assert pad_sides in (1, 2)

    pad = (x.shape[0] // fshift + 1) * fshift - x.shape[0]
    if pad_sides == 1:
        return 0, pad
    else:
        return pad // 2, pad // 2 + pad % 2
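
# Example (hypothetical hop size): a 1000-sample signal with fshift=256 needs
# 24 samples of padding to reach 1024 = 4 * 256.
# >>> librosa_pad_lr(np.zeros(1000), 256)                # (0, 24)
# >>> librosa_pad_lr(np.zeros(1000), 256, pad_sides=2)   # (12, 12)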


def get_pitch(
    wav_file: Union[str, Path], sample_rate: int, frame_shift: float
):
    """Extract a frame-level f0 contour (Hz) and its coarse bins via parselmouth."""
    import parselmouth
    hop_size = int(frame_shift * sample_rate)
    wav, _ = librosa.load(wav_file, sr=sample_rate)

    latent_length = wav.shape[0] // hop_size
    f0_min = 80
    f0_max = 750

    f0 = parselmouth.Sound(wav, sample_rate).to_pitch_ac(
        time_step=frame_shift,
        voicing_threshold=0.6,
        pitch_floor=f0_min,
        pitch_ceiling=f0_max
    ).selected_array['frequency']
    # Repeat the last value so f0 covers every frame of the utterance.
    delta_l = latent_length - len(f0)
    if delta_l > 0:
        f0 = np.concatenate([f0, [f0[-1]] * delta_l], 0)
    pitch_coarse = f0_to_coarse(f0)
    return f0, pitch_coarse
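
# Example usage (hypothetical file, hop of 256 samples at 22.05 kHz):
# >>> f0, coarse = get_pitch("sample.wav", 22050, 256 / 22050)
# f0 holds one Hz value per frame (0 where unvoiced); coarse holds bins in [1, 255].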


def remove_empty_lines(text):
    """Remove empty lines."""
    assert len(text) > 0
    assert isinstance(text, list)
    text = [t.strip() for t in text]
    # Drop every empty line (list.remove would only drop the first occurrence).
    return [t for t in text if t != ""]


def is_sil_phoneme(p):
    """Treat a phoneme as silence if it does not start with a letter."""
    return not p[0].isalpha()


def strip_ids(ids, ids_to_strip):
    """Strip ids_to_strip from the end of ids."""
    ids = list(ids)
    while ids and ids[-1] in ids_to_strip:
        ids.pop()
    return ids


class TextEncoder(object):
    """Base class for converting between sequences of ints and human-readable strings."""
    def __init__(self, num_reserved_ids=NUM_RESERVED_TOKENS):
        self._num_reserved_ids = num_reserved_ids

    @property
    def num_reserved_ids(self):
        return self._num_reserved_ids

    def encode(self, s):
        """Transform a human-readable string into a sequence of int ids.

        The ids should be in the range [num_reserved_ids, vocab_size). Ids
        [0, num_reserved_ids) are reserved.

        EOS is not appended.

        Args:
          s: human-readable string to be converted.

        Returns:
          ids: list of integers
        """
        return [int(w) + self._num_reserved_ids for w in s.split()]

    def decode(self, ids, strip_extraneous=False):
        """Transform a sequence of int ids into a human-readable string.

        EOS is not expected in ids.

        Args:
          ids: list of integers to be converted.
          strip_extraneous: bool, whether to strip off extraneous tokens
            (EOS and PAD).

        Returns:
          s: human-readable string.
        """
        if strip_extraneous:
            ids = strip_ids(ids, list(range(self._num_reserved_ids or 0)))
        return " ".join(self.decode_list(ids))

    def decode_list(self, ids):
        """Transform a sequence of int ids into their string versions.

        This method supports transforming individual input/output ids to their
        string versions so that sequence to/from text conversions can be
        visualized in a human-readable format.

        Args:
          ids: list of integers to be converted.

        Returns:
          strs: list of human-readable strings.
        """
        decoded_ids = []
        for id_ in ids:
            if 0 <= id_ < self._num_reserved_ids:
                decoded_ids.append(RESERVED_TOKENS[int(id_)])
            else:
                decoded_ids.append(id_ - self._num_reserved_ids)
        return [str(d) for d in decoded_ids]

    @property
    def vocab_size(self):
        raise NotImplementedError()


class TokenTextEncoder(TextEncoder):
    """Encoder based on a user-supplied vocabulary (file or list)."""
    def __init__(
        self,
        vocab_filename,
        reverse=False,
        vocab_list=None,
        replace_oov=None,
        num_reserved_ids=NUM_RESERVED_TOKENS
    ):
        """Initialize from a file or list, one token per line.

        Handling of reserved tokens works as follows:
        - When initializing from a list, we add reserved tokens to the vocab.
        - When initializing from a file, we do not add reserved tokens to the
          vocab.
        - When saving vocab files, we save reserved tokens to the file.

        Args:
          vocab_filename: If not None, the full filename to read vocab from.
            If this is not None, then vocab_list should be None.
          reverse: Boolean indicating whether tokens should be reversed during
            encoding and decoding.
          vocab_list: If not None, a list of elements of the vocabulary. If
            this is not None, then vocab_filename should be None.
          replace_oov: If not None, every out-of-vocabulary token seen when
            encoding will be replaced by this string (which must be in vocab).
          num_reserved_ids: Number of IDs to save for reserved tokens like
            <EOS>.
        """
        super().__init__(num_reserved_ids=num_reserved_ids)
        self._reverse = reverse
        self._replace_oov = replace_oov
        if vocab_filename:
            self._init_vocab_from_file(vocab_filename)
        else:
            assert vocab_list is not None
            self._init_vocab_from_list(vocab_list)
        self.pad_index = self._token_to_id[PAD]
        self.eos_index = self._token_to_id[EOS]
        self.unk_index = self._token_to_id[UNK]
        # Fall back to EOS when the segment token is absent from the vocab.
        self.seg_index = (
            self._token_to_id[SEG] if SEG in self._token_to_id else self.eos_index
        )

    def encode(self, s):
        """Convert a space-separated string of tokens to a list of ids."""
        tokens = s.strip().split()
        if self._replace_oov is not None:
            tokens = [
                t if t in self._token_to_id else self._replace_oov
                for t in tokens
            ]
        ret = [self._token_to_id[tok] for tok in tokens]
        return ret[::-1] if self._reverse else ret

    def decode(self, ids, strip_eos=False, strip_padding=False):
        if strip_padding and self.pad() in list(ids):
            pad_pos = list(ids).index(self.pad())
            ids = ids[:pad_pos]
        if strip_eos and self.eos() in list(ids):
            eos_pos = list(ids).index(self.eos())
            ids = ids[:eos_pos]
        return " ".join(self.decode_list(ids))

    def decode_list(self, ids):
        seq = reversed(ids) if self._reverse else ids
        return [self._safe_id_to_token(i) for i in seq]

    @property
    def vocab_size(self):
        return len(self._id_to_token)

    def __len__(self):
        return self.vocab_size

    def _safe_id_to_token(self, idx):
        return self._id_to_token.get(idx, "ID_%d" % idx)

    def _init_vocab_from_file(self, filename):
        """Load vocab from a file.

        Args:
          filename: The file to load vocabulary from.
        """
        with open(filename) as f:
            tokens = [token.strip() for token in f.readlines()]

        def token_gen():
            for token in tokens:
                yield token

        self._init_vocab(token_gen(), add_reserved_tokens=False)

    def _init_vocab_from_list(self, vocab_list):
        """Initialize tokens from a list of tokens.

        It is ok if reserved tokens appear in the vocab list. They will be
        removed. The set of tokens in vocab_list should be unique.

        Args:
          vocab_list: A list of tokens.
        """
        def token_gen():
            for token in vocab_list:
                if token not in RESERVED_TOKENS:
                    yield token

        self._init_vocab(token_gen())

    def _init_vocab(self, token_generator, add_reserved_tokens=True):
        """Initialize vocabulary with tokens from token_generator."""

        self._id_to_token = {}
        non_reserved_start_index = 0

        if add_reserved_tokens:
            self._id_to_token.update(enumerate(RESERVED_TOKENS))
            non_reserved_start_index = len(RESERVED_TOKENS)

        self._id_to_token.update(
            enumerate(token_generator, start=non_reserved_start_index)
        )

        # Build the reverse mapping; six.iteritems is unnecessary on Python 3.
        self._token_to_id = dict(
            (v, k) for k, v in self._id_to_token.items()
        )

    def pad(self):
        return self.pad_index

    def eos(self):
        return self.eos_index

    def unk(self):
        return self.unk_index

    def seg(self):
        return self.seg_index

    def store_to_file(self, filename):
        """Write vocab file to disk.

        Vocab files have one token per line. The file ends in a newline.
        Reserved tokens are written to the vocab file as well.

        Args:
          filename: Full path of the file to store the vocab to.
        """
        with open(filename, "w") as f:
            for i in range(len(self._id_to_token)):
                f.write(self._id_to_token[i] + "\n")

    def sil_phonemes(self):
        return [p for p in self._id_to_token.values() if not p[0].isalpha()]
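
# Usage sketch (hypothetical phoneme vocab): round-tripping a token sequence.
# >>> encoder = TokenTextEncoder(None, vocab_list=["AH", "B", "K", "|"])
# >>> ids = encoder.encode("B AH K")
# >>> encoder.decode(ids)
# 'B AH K'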


class TextGrid(object):
    def __init__(self, text):
        text = remove_empty_lines(text)
        self.text = text
        self.line_count = 0
        self._get_type()
        self._get_time_intval()
        self._get_size()
        self.tier_list = []
        self._get_item_list()

    def _extract_pattern(self, pattern, inc):
        """
        Parameters
        ----------
        pattern : regex pattern to extract info with
        inc : number of lines to advance after extraction

        Returns
        -------
        group : extracted info
        """
        try:
            group = re.match(pattern, self.text[self.line_count]).group(1)
            self.line_count += inc
        except AttributeError:
            raise ValueError(
                "File format error at line %d:%s" %
                (self.line_count, self.text[self.line_count])
            )
        return group

    def _get_type(self):
        self.file_type = self._extract_pattern(r"File type = \"(.*)\"", 2)

    def _get_time_intval(self):
        self.xmin = self._extract_pattern(r"xmin = (.*)", 1)
        self.xmax = self._extract_pattern(r"xmax = (.*)", 2)

    def _get_size(self):
        self.size = int(self._extract_pattern(r"size = (.*)", 2))

    def _get_item_list(self):
        """Only supports IntervalTier currently."""
        for _ in range(1, self.size + 1):
            tier = OrderedDict()
            item_list = []
            tier_idx = self._extract_pattern(r"item \[(.*)\]:", 1)
            tier_class = self._extract_pattern(r"class = \"(.*)\"", 1)
            if tier_class != "IntervalTier":
                raise NotImplementedError(
                    "Only IntervalTier class is supported currently"
                )
            tier_name = self._extract_pattern(r"name = \"(.*)\"", 1)
            tier_xmin = self._extract_pattern(r"xmin = (.*)", 1)
            tier_xmax = self._extract_pattern(r"xmax = (.*)", 1)
            tier_size = self._extract_pattern(r"intervals: size = (.*)", 1)
            for _ in range(int(tier_size)):
                item = OrderedDict()
                item["idx"] = self._extract_pattern(r"intervals \[(.*)\]", 1)
                item["xmin"] = self._extract_pattern(r"xmin = (.*)", 1)
                item["xmax"] = self._extract_pattern(r"xmax = (.*)", 1)
                item["text"] = self._extract_pattern(r"text = \"(.*)\"", 1)
                item_list.append(item)
            tier["idx"] = tier_idx
            tier["class"] = tier_class
            tier["name"] = tier_name
            tier["xmin"] = tier_xmin
            tier["xmax"] = tier_xmax
            tier["size"] = tier_size
            tier["items"] = item_list
            self.tier_list.append(tier)

    def toJson(self):
        _json = OrderedDict()
        _json["file_type"] = self.file_type
        _json["xmin"] = self.xmin
        _json["xmax"] = self.xmax
        _json["size"] = self.size
        _json["tiers"] = self.tier_list
        return json.dumps(_json, ensure_ascii=False, indent=2)
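
# Usage sketch (hypothetical path to a Praat alignment file):
# >>> with open("utt_0001.TextGrid") as f:
# ...     tg = TextGrid(f.readlines())
# >>> data = json.loads(tg.toJson())  # {"file_type": ..., "tiers": [...]}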


def read_duration_from_textgrid(
    textgrid_path: Union[str, Path],
    phoneme: str,
    utterance_duration: float,
):
    """Align a phoneme sequence against a TextGrid and return per-phoneme durations."""
    ph_list = phoneme.split(" ")
    with open(textgrid_path, "r") as f:
        textgrid = f.readlines()
    textgrid = remove_empty_lines(textgrid)
    textgrid = TextGrid(textgrid)
    textgrid = json.loads(textgrid.toJson())

    # split[i] holds the start time of phoneme i; -1 marks "not yet assigned".
    # np.float was removed from NumPy; the builtin float is the correct dtype.
    split = np.ones(len(ph_list) + 1, float) * -1
    tg_idx = 0
    ph_idx = 0
    tg_align = [x for x in textgrid['tiers'][-1]['items']]
    tg_align_ = []
    for x in tg_align:
        x['xmin'] = float(x['xmin'])
        x['xmax'] = float(x['xmax'])
        if x['text'] in ['sil', 'sp', '', 'SIL', 'PUNC']:
            x['text'] = ''
            # Merge consecutive silence intervals into one.
            if len(tg_align_) > 0 and tg_align_[-1]['text'] == '':
                tg_align_[-1]['xmax'] = x['xmax']
                continue
        tg_align_.append(x)
    tg_align = tg_align_
    tg_len = len([x for x in tg_align if x['text'] != ''])
    ph_len = len([x for x in ph_list if not is_sil_phoneme(x)])
    assert tg_len == ph_len, (tg_len, ph_len, tg_align, ph_list, textgrid_path)
    # Walk the TextGrid intervals and the phoneme list in lockstep, recording
    # each phoneme's start time in `split`.
    while tg_idx < len(tg_align) or ph_idx < len(ph_list):
        if tg_idx == len(tg_align) and is_sil_phoneme(ph_list[ph_idx]):
            split[ph_idx] = 1e8
            ph_idx += 1
            continue
        x = tg_align[tg_idx]
        if x['text'] == '' and ph_idx == len(ph_list):
            tg_idx += 1
            continue
        assert ph_idx < len(ph_list), (
            tg_len, ph_len, tg_align, ph_list, textgrid_path
        )

        ph = ph_list[ph_idx]
        if x['text'] == '' and not is_sil_phoneme(ph):
            assert False, (ph_list, tg_align)
        if x['text'] != '' and is_sil_phoneme(ph):
            ph_idx += 1
        else:
            assert (x['text'] == '' and is_sil_phoneme(ph)) \
                   or x['text'].lower() == ph.lower() \
                   or x['text'].lower() == 'sil', (x['text'], ph)
            split[ph_idx] = x['xmin']
            # Give a preceding, still-unassigned silence phoneme the same start.
            if ph_idx > 0 and split[ph_idx - 1] == -1 and is_sil_phoneme(
                ph_list[ph_idx - 1]
            ):
                split[ph_idx - 1] = split[ph_idx]
            ph_idx += 1
            tg_idx += 1
    assert tg_idx == len(tg_align), (tg_idx, [x['text'] for x in tg_align])
    assert ph_idx >= len(ph_list) - 1, (
        ph_idx, ph_list, len(ph_list), [x['text'] for x in tg_align], textgrid_path
    )

    # Durations are the differences between consecutive phoneme boundaries.
    split[0] = 0
    split[-1] = utterance_duration
    duration = np.diff(split)
    return duration
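
# Usage sketch (hypothetical alignment file and phoneme string; "|" marks
# silence phonemes, which is_sil_phoneme recognizes as non-alphabetic):
# >>> dur = read_duration_from_textgrid(
# ...     "utt_0001.TextGrid", "| HH AH L OW |", utterance_duration=1.25
# ... )
# dur[i] is the duration (in seconds) of the i-th phoneme in the input string.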