| import pickle |
| import os |
| import re |
| import wordsegment |
| from g2p_en import G2p |
|
|
| from text.symbols import punctuation |
|
|
| from text.symbols2 import symbols |
|
|
| import unicodedata |
| from builtins import str as unicode |
| from g2p_en.expand import normalize_numbers |
| from nltk.tokenize import TweetTokenizer |
| word_tokenize = TweetTokenizer().tokenize |
| from nltk import pos_tag |
|
|
# Directory that contains this module; every data file below sits next to it.
current_file_path = os.path.dirname(__file__)

# Resolve all dictionary / cache file names against the module directory.
(
    CMU_DICT_PATH,       # main dictionary read by read_dict() / read_dict_new()
    CMU_DICT_FAST_PATH,  # supplementary dictionary, fills words missing above
    CMU_DICT_HOT_PATH,   # overrides re-read on every get_dict() call
    CACHE_PATH,          # pickle cache of the merged dictionary
    NAMECACHE_PATH,      # pickle cache of the name dictionary
) = (
    os.path.join(current_file_path, file_name)
    for file_name in (
        "cmudict.rep",
        "cmudict-fast.rep",
        "engdict-hot.rep",
        "engdict_cache.pickle",
        "namedict_cache.pickle",
    )
)
|
|
# ARPAbet phoneme inventory as a set of strings.
# Vowels carry a stress digit (0/1/2); "ER" and "IH" are additionally listed
# without a digit. Consonants have no stress marker.
arpa = {
    # vowels
    "AA0", "AA1", "AA2",
    "AE0", "AE1", "AE2",
    "AH0", "AH1", "AH2",
    "AO0", "AO1", "AO2",
    "AW0", "AW1", "AW2",
    "AY0", "AY1", "AY2",
    "EH0", "EH1", "EH2",
    "ER", "ER0", "ER1", "ER2",
    "EY0", "EY1", "EY2",
    "IH", "IH0", "IH1", "IH2",
    "IY0", "IY1", "IY2",
    "OW0", "OW1", "OW2",
    "OY0", "OY1", "OY2",
    "UH0", "UH1", "UH2",
    "UW0", "UW1", "UW2",
    # consonants
    "B", "CH", "D", "DH", "F", "G", "HH", "JH",
    "K", "L", "M", "N", "NG", "P", "R", "S",
    "SH", "T", "TH", "V", "W", "Y", "Z", "ZH",
}
|
|
|
|
def replace_phs(phs):
    """Filter a phoneme sequence down to entries accepted by ``symbols``.

    Each item of ``phs`` is handled as follows:

    * it is kept unchanged when it is a member of ``symbols``;
    * otherwise an apostrophe is emitted as ``"-"``;
    * anything else is reported on stdout and left out of the result.

    Returns a new list; ``phs`` itself is not modified.
    """
    fallback = {"'": "-"}
    kept = []
    for phone in phs:
        if phone in symbols:
            kept.append(phone)
            continue
        if phone in fallback:
            kept.append(fallback[phone])
            continue
        print("ph not in symbols: ", phone)
    return kept
|
|
|
|
def replace_consecutive_punctuation(text):
    """Collapse each run of punctuation marks to the first mark of the run.

    A run is two or more adjacent characters drawn from ``punctuation``; the
    characters in a run do not have to be identical (``",."`` becomes ``","``).
    """
    marks = ''.join(map(re.escape, punctuation))
    run_of_marks = re.compile(f'([{marks}])([{marks}])+')
    return run_of_marks.sub(r'\1', text)
|
|
|
|
def read_dict(path=None, start_line=49):
    """Parse the syllable-annotated pronunciation dictionary.

    Each data line is expected to look like ``WORD <phones>`` where syllables
    inside ``<phones>`` are separated by ``" - "`` and phones inside a
    syllable by whitespace.

    Args:
        path: Dictionary file to read. Defaults to ``CMU_DICT_PATH``.
        start_line: 1-based number of the first line that holds data; earlier
            lines are skipped (presumably the file header - confirm against
            the dictionary file).

    Returns:
        dict mapping the lower-cased word to a list of syllables, each
        syllable being a list of phone strings.
    """
    if path is None:
        path = CMU_DICT_PATH

    g2p_dict = {}
    with open(path) as f:
        for line_index, line in enumerate(f, start=1):
            if line_index < start_line:
                continue

            # Split the word from its pronunciation on the first whitespace
            # run. Splitting on a single space would leave nothing for the
            # " - " / " " splits below to work on.
            fields = line.strip().split(maxsplit=1)
            if len(fields) < 2:
                # Blank line or a word without a pronunciation: nothing to add.
                continue

            word, pronunciation = fields
            g2p_dict[word.lower()] = [
                syllable.split() for syllable in pronunciation.split(" - ")
            ]

    return g2p_dict
|
|
|
|
def read_dict_new(cmu_path=None, fast_path=None, start_line=57):
    """Build the pronunciation table from the main and the fast dictionary.

    The main dictionary is read first; the fast dictionary then contributes
    only words that the main dictionary did not define.

    Args:
        cmu_path: Main dictionary file. Defaults to ``CMU_DICT_PATH``.
        fast_path: Supplementary dictionary file. Defaults to
            ``CMU_DICT_FAST_PATH``.
        start_line: 1-based number of the first data line of the main
            dictionary; earlier lines are skipped.

    Returns:
        dict mapping the lower-cased word to a one-element list that holds
        the list of phone strings, i.e. ``{"cat": [["K", "AE1", "T"]]}``.
    """
    if cmu_path is None:
        cmu_path = CMU_DICT_PATH
    if fast_path is None:
        fast_path = CMU_DICT_FAST_PATH

    g2p_dict = {}

    with open(cmu_path) as f:
        for line_index, line in enumerate(f, start=1):
            if line_index < start_line:
                continue
            # Whitespace-run splitting keeps every phone no matter how wide
            # the gap between the word and its pronunciation is.
            fields = line.split()
            if len(fields) < 2:
                # Blank line or word without phones: skip instead of failing.
                continue
            g2p_dict[fields[0].lower()] = [fields[1:]]

    with open(fast_path) as f:
        for line in f:
            fields = line.split()
            if not fields:
                # Do not register an empty-string word for blank lines.
                continue
            word = fields[0].lower()
            if word not in g2p_dict:
                g2p_dict[word] = [fields[1:]]

    return g2p_dict
|
|
def hot_reload_hot(g2p_dict, path=None):
    """Overlay the hot-fix dictionary onto ``g2p_dict``.

    Every line of the hot dictionary has the form ``WORD PH PH ...``; its
    entry replaces any pronunciation already stored for that word.

    Args:
        g2p_dict: Pronunciation table to update; it is modified in place.
        path: Hot dictionary file. Defaults to ``CMU_DICT_HOT_PATH``.

    Returns:
        The same ``g2p_dict`` object, for call chaining.
    """
    if path is None:
        path = CMU_DICT_HOT_PATH

    with open(path) as f:
        for line in f:
            fields = line.split()
            if not fields:
                # A blank line would otherwise register an empty-string word.
                continue
            g2p_dict[fields[0].lower()] = [fields[1:]]

    return g2p_dict
|
|
|
|
def cache_dict(g2p_dict, file_path):
    """Pickle ``g2p_dict`` into ``file_path``, replacing any existing file."""
    with open(file_path, "wb") as sink:
        pickle.dump(g2p_dict, sink)
|
|
|
|
def get_dict():
    """Return the pronunciation table, using the pickle cache when present.

    When ``CACHE_PATH`` does not exist the table is built with
    ``read_dict_new()`` and written to the cache first. The hot-fix
    dictionary is applied afterwards on every call, so its entries are never
    part of the cached file.
    """
    if not os.path.exists(CACHE_PATH):
        base_dict = read_dict_new()
        cache_dict(base_dict, CACHE_PATH)
    else:
        with open(CACHE_PATH, "rb") as cached:
            # NOTE: pickle can execute code while loading; the cache file
            # must only ever come from a trusted source.
            base_dict = pickle.load(cached)

    return hot_reload_hot(base_dict)
|
|
|
|
def get_namedict():
    """Return the pickled name dictionary, or ``{}`` when no cache exists."""
    if not os.path.exists(NAMECACHE_PATH):
        return {}

    with open(NAMECACHE_PATH, "rb") as cached:
        # NOTE: pickle can execute code while loading; the cache file must
        # only ever come from a trusted source.
        return pickle.load(cached)
|
|
|
|
def text_normalize(text):
    """Reduce ``text`` to the character set the English G2p front end accepts.

    Steps, in order:

    1. Map separator / quote / CJK punctuation to ASCII ``,`` ``'`` ``.``
       ``!`` ``?``.
    2. Spell out numbers with ``normalize_numbers``.
    3. Strip combining marks (NFD decomposition, category ``Mn`` removed).
    4. Drop every character other than space, ASCII letters and ``' . , ? ! -``.
    5. Expand ``i.e.`` / ``e.g.`` (case-insensitive).
    6. Collapse runs of punctuation via ``replace_consecutive_punctuation``.

    Returns the normalized string.
    """
    # Single-character substitutions are done with a translation table rather
    # than regex patterns: a bare "?" is not a valid regular expression.
    # Full-width forms are written as escapes so they cannot be silently
    # folded to their ASCII look-alikes.
    # NOTE(review): the full-width entries are restored by inference (the
    # ASCII forms here would be no-ops) - confirm the intended set.
    punctuation_table = str.maketrans({
        ";": ",",
        ":": ",",
        "\uff1a": ",",  # full-width colon
        "\uff0c": ",",  # full-width comma
        "\uff1b": ",",  # full-width semicolon
        '"': "'",
        "\u2019": "'",  # right single quotation mark
        "\u3002": ".",  # ideographic full stop
        "\uff01": "!",  # full-width exclamation mark
        "\uff1f": "?",  # full-width question mark
    })
    text = text.translate(punctuation_table)

    text = unicode(text)
    text = normalize_numbers(text)
    text = ''.join(
        char
        for char in unicodedata.normalize('NFD', text)
        if unicodedata.category(char) != 'Mn'
    )
    # Raw string: "\-" inside a normal string literal is an invalid escape.
    text = re.sub(r"[^ A-Za-z'.,?!\-]", "", text)
    text = re.sub(r"(?i)i\.e\.", "that is", text)
    text = re.sub(r"(?i)e\.g\.", "for example", text)

    text = replace_consecutive_punctuation(text)

    return text
|
|
|
|
class en_G2p(G2p):
    """English grapheme-to-phoneme converter with dictionary-first lookup.

    Extends ``G2p`` with the pronunciation table from ``get_dict()``, the
    name table from ``get_namedict()``, letter-by-letter spelling of short
    unknown words, possessive handling and compound-word segmentation. The
    inherited ``predict`` is only used when none of those rules apply.
    """

    def __init__(self):
        super().__init__()

        # Load the data wordsegment.segment() needs (used in qryword()).
        wordsegment.load()

        self.cmu = get_dict()
        self.namedict = get_namedict()

        # Remove these entries so the tokens are no longer answered by the
        # dictionary; being at most three letters long they normally end up
        # in qryword()'s spell-out rule. pop() tolerates a dictionary that
        # does not contain an entry (``del`` would raise KeyError).
        for word in ["AE", "AI", "AR", "IOS", "HUD", "OS"]:
            self.cmu.pop(word.lower(), None)

        # Homograph overrides: (pronunciation when the POS tag matches,
        # pronunciation otherwise, POS tag prefix).
        self.homograph2features["read"] = (['R', 'IY1', 'D'], ['R', 'EH1', 'D'], 'VBP')
        self.homograph2features["complex"] = (['K', 'AH0', 'M', 'P', 'L', 'EH1', 'K', 'S'], ['K', 'AA1', 'M', 'P', 'L', 'EH0', 'K', 'S'], 'JJ')

    def __call__(self, text):
        """Convert ``text`` to a flat list of phones.

        Words are separated by a ``" "`` item; tokens without any ASCII
        letter are passed through (lower-cased) as a single item.
        """
        words = word_tokenize(text)
        tokens = pos_tag(words)

        prons = []
        for o_word, pos in tokens:
            word = o_word.lower()

            if re.search("[a-z]", word) is None:
                # Punctuation or other non-letter token: emit as-is.
                pron = [word]
            elif len(word) == 1:
                # An upper-case "A" is read as the letter name; every other
                # single letter uses its dictionary pronunciation.
                if o_word == "A":
                    pron = ['EY1']
                else:
                    pron = self.cmu[word][0]
            elif word in self.homograph2features:
                pron1, pron2, pos1 = self.homograph2features[word]
                if pos.startswith(pos1):
                    pron = pron1
                # The tagger's tag may be a shorter prefix of pos1.
                elif len(pos) < len(pos1) and pos == pos1[:len(pos)]:
                    pron = pron1
                else:
                    pron = pron2
            else:
                pron = self.qryword(o_word)

            prons.extend(pron)
            prons.extend([" "])

        # Drop the separator appended after the last word.
        return prons[:-1]

    def qryword(self, o_word):
        """Return the phone list for a single word.

        Lookup order: pronunciation dictionary, name dictionary (title-case
        words only), spelling out words of at most three characters,
        possessive ``'s`` handling, compound segmentation, and finally the
        inherited ``predict``.
        """
        word = o_word.lower()

        # Dictionary hit (single characters are handled by the caller).
        if len(word) > 1 and word in self.cmu:
            return self.cmu[word][0]

        # Title-case words may be names.
        if o_word.istitle() and word in self.namedict:
            return self.namedict[word][0]

        # Short unknown words are spelled out character by character.
        if len(word) <= 3:
            phones = []
            for w in word:
                if w == "a":
                    # Letter name instead of the dictionary entry for "a".
                    phones.extend(['EY1'])
                elif not w.isalpha():
                    # Apostrophes, hyphens, dots etc. have no dictionary
                    # entry; pass them through instead of raising KeyError.
                    phones.extend([w])
                else:
                    phones.extend(self.cmu[w][0])
            return phones

        # Possessive: pronounce the base word, then add the suffix sound.
        if re.match(r"^([a-z]+)('s)$", word):
            # Copy so the cached dictionary entry is never mutated.
            phones = self.qryword(word[:-2])[:]
            last = phones[-1] if phones else None
            if last in ['P', 'T', 'K', 'F', 'TH', 'HH']:
                # Voiceless ending -> "s".
                phones.extend(['S'])
            elif last in ['S', 'Z', 'SH', 'ZH', 'CH', 'JH']:
                # Sibilant ending -> extra syllable.
                phones.extend(['AH0', 'Z'])
            else:
                phones.extend(['Z'])
            return phones

        # Try to split a compound into known parts.
        comps = wordsegment.segment(word)

        # Not splittable: fall back to the model prediction.
        if len(comps) == 1:
            return self.predict(word)

        # Resolve every part recursively and concatenate the phones.
        return [phone for comp in comps for phone in self.qryword(comp)]
|
|
|
|
# Shared converter used by g2p(). It is built at import time, which runs
# en_G2p.__init__ and therefore loads the pronunciation dictionaries.
_g2p = en_G2p()
|
|
|
|
def g2p(text):
    """Convert ``text`` to the phone list accepted by the symbol table.

    The shared converter's output is cleaned up before being returned:
    word separators and the ``<pad>``, ``UW``, ``</s>``, ``<s>`` items are
    removed, ``<unk>`` is renamed to ``UNK``, and the remainder is passed
    through ``replace_phs``.
    """
    skipped = (" ", "<pad>", "UW", "</s>", "<s>")
    phones = []
    for ph in _g2p(text):
        if ph in skipped:
            continue
        phones.append("UNK" if ph == "<unk>" else ph)

    return replace_phs(phones)
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: one raw word, then two normalized sentences.
    print(g2p("hello"))
    for sample in (
        "e.g. I used openai's AI tool to draw a picture.",
        "In this; paper, we propose 1 DSPGAN, a GAN-based universal vocoder.",
    ):
        print(g2p(text_normalize(sample)))
|
|