import logging
import json
from pathlib import Path
from unicodedata import category, normalize

import torch
from tokenizers import Tokenizer
from huggingface_hub import hf_hub_download

# Special tokens shared by the tokenizers below.
SOT = "[START]"
EOT = "[STOP]"
UNK = "[UNK]"
SPACE = "[SPACE]"
SPECIAL_TOKENS = [SOT, EOT, UNK, SPACE, "[PAD]", "[SEP]", "[CLS]", "[MASK]"]

logger = logging.getLogger(__name__)


class EnTokenizer:
    def __init__(self, vocab_file_path):
        self.tokenizer: Tokenizer = Tokenizer.from_file(vocab_file_path)
        self.check_vocabset_sot_eot()

    def check_vocabset_sot_eot(self):
        voc = self.tokenizer.get_vocab()
        assert SOT in voc
        assert EOT in voc

    def text_to_tokens(self, text: str):
        text_tokens = self.encode(text)
        text_tokens = torch.IntTensor(text_tokens).unsqueeze(0)
        return text_tokens

    def encode(self, txt: str):
        """
        Replace literal spaces with the SPACE token, then encode with the
        underlying Tokenizer.
        """
        txt = txt.replace(' ', SPACE)
        code = self.tokenizer.encode(txt)
        return code.ids

    def decode(self, seq):
        if isinstance(seq, torch.Tensor):
            seq = seq.cpu().numpy()

        txt: str = self.tokenizer.decode(seq, skip_special_tokens=False)
        txt = txt.replace(' ', '')
        txt = txt.replace(SPACE, ' ')
        txt = txt.replace(EOT, '')
        txt = txt.replace(UNK, '')
        return txt
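
# Usage sketch (the vocab file path below is illustrative, not shipped with
# this module):
#
#   tok = EnTokenizer("tokenizer.json")
#   ids = tok.text_to_tokens("hello world")   # (1, seq_len) IntTensor
#   txt = tok.decode(ids.squeeze(0))          # round-trips to "hello world"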


REPO_ID = "ResembleAI/chatterbox"

# Lazily initialized, heavyweight language resources (created on first use).
_kakasi = None
_dicta = None
_russian_stresser = None


def is_kanji(c: str) -> bool:
    """Check if a character is kanji (CJK Unified Ideographs, U+4E00..U+9FFF)."""
    return 0x4E00 <= ord(c) <= 0x9FFF


def is_katakana(c: str) -> bool:
    """Check if a character is katakana (U+30A1..U+30FA)."""
    return 0x30A1 <= ord(c) <= 0x30FA
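
# e.g. is_kanji('漢') -> True; is_katakana('カ') -> True;
# is_katakana('か') -> False (hiragana)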


def hiragana_normalize(text: str) -> str:
    """Japanese text normalization: convert kanji to hiragana; katakana is kept as-is."""
    global _kakasi

    try:
        if _kakasi is None:
            import pykakasi
            _kakasi = pykakasi.kakasi()

        result = _kakasi.convert(text)
        out = []

        for r in result:
            inp = r["orig"]
            hira = r["hira"]

            if any(is_kanji(c) for c in inp):
                # Insert a space before readings that begin with は/へ.
                if hira and hira[0] in ("は", "へ"):
                    hira = " " + hira
                out.append(hira)
            elif inp and all(is_katakana(c) for c in inp):
                # Keep katakana segments unchanged.
                out.append(inp)
            else:
                out.append(inp)

        normalized_text = "".join(out)

        # NFKD-decompose so dakuten/handakuten become combining marks.
        return normalize("NFKD", normalized_text)

    except ImportError:
        logger.warning("pykakasi not available - Japanese text processing skipped")
        return text
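
# Illustrative call (actual readings come from pykakasi's dictionary, so the
# exact output is an assumption): hiragana_normalize("日本語") yields roughly
# "にほんご", returned in NFKD form (e.g. ご decomposes to こ + U+3099).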


def add_hebrew_diacritics(text: str) -> str:
    """Hebrew text normalization: add diacritics to Hebrew text."""
    global _dicta

    try:
        if _dicta is None:
            from dicta_onnx import Dicta
            _dicta = Dicta()

        return _dicta.add_diacritics(text)

    except ImportError:
        logger.warning("dicta_onnx not available - Hebrew text processing skipped")
        return text
    except Exception as e:
        logger.warning(f"Hebrew diacritization failed: {e}")
        return text


def korean_normalize(text: str) -> str:
    """Korean text normalization: decompose Hangul syllables into Jamo for tokenization."""

    def decompose_hangul(char):
        """Decompose a precomposed Hangul syllable (U+AC00..U+D7A3) into Jamo."""
        if not ('\uac00' <= char <= '\ud7a3'):
            return char

        # Syllables are laid out as initial * (21 * 28) + medial * 28 + final.
        base = ord(char) - 0xAC00
        initial = chr(0x1100 + base // (21 * 28))
        medial = chr(0x1161 + (base % (21 * 28)) // 28)
        final = chr(0x11A7 + base % 28) if base % 28 > 0 else ''

        return initial + medial + final

    result = ''.join(decompose_hangul(char) for char in text)
    return result.strip()
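
# Worked example of the Jamo arithmetic above:
#   ord('한') - 0xAC00 = 10588 = 18 * (21 * 28) + 0 * 28 + 4
#   -> chr(0x1100 + 18) + chr(0x1161 + 0) + chr(0x11A7 + 4)
#   so korean_normalize('한') == '\u1112\u1161\u11ab' ('ᄒ' + 'ᅡ' + 'ᆫ')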


class ChineseCangjieConverter:
    """Converts Chinese characters to Cangjie codes for tokenization."""

    def __init__(self, model_dir=None):
        self.word2cj = {}
        self.cj2word = {}
        self.segmenter = None
        self._load_cangjie_mapping(model_dir)
        self._init_segmenter()

    def _load_cangjie_mapping(self, model_dir=None):
        """Load the Cangjie mapping from the HuggingFace model repository."""
        try:
            cangjie_file = hf_hub_download(
                repo_id=REPO_ID,
                filename="Cangjie5_TC.json",
                cache_dir=model_dir,
            )

            with open(cangjie_file, "r", encoding="utf-8") as fp:
                data = json.load(fp)

            # Each entry is a tab-separated "<glyph>\t<code>" string.
            for entry in data:
                word, code = entry.split("\t")[:2]
                self.word2cj[word] = code
                self.cj2word.setdefault(code, []).append(word)

        except Exception as e:
            logger.warning(f"Could not load Cangjie mapping: {e}")

    def _init_segmenter(self):
        """Initialize the pkuseg word segmenter."""
        try:
            from spacy_pkuseg import pkuseg
            self.segmenter = pkuseg()
        except ImportError:
            logger.warning("pkuseg not available - Chinese segmentation will be skipped")
            self.segmenter = None
    def _cangjie_encode(self, glyph: str):
        """Encode a single Chinese glyph as a Cangjie code."""
        code = self.word2cj.get(glyph)
        if code is None:
            return None
        # Glyphs sharing a code are disambiguated by list position: the first
        # gets no suffix, later ones get "1", "2", ...
        index = self.cj2word[code].index(glyph)
        suffix = str(index) if index > 0 else ""
        return code + suffix

    def __call__(self, text):
        """Convert Chinese characters in text to Cangjie tokens."""
        if self.segmenter is not None:
            # Insert spaces at word boundaries found by pkuseg.
            full_text = " ".join(self.segmenter.cut(text))
        else:
            full_text = text

        output = []
        for t in full_text:
            # "Lo" (Letter, other) covers CJK ideographs.
            if category(t) == "Lo":
                cangjie = self._cangjie_encode(t)
                if cangjie is None:
                    output.append(t)
                    continue
                # One token per Cangjie letter, terminated by "[cj_.]".
                code = "".join(f"[cj_{c}]" for c in cangjie) + "[cj_.]"
                output.append(code)
            else:
                output.append(t)
        return "".join(output)


def add_russian_stress(text: str) -> str:
    """Russian text normalization: add stress marks to Russian text."""
    global _russian_stresser

    try:
        if _russian_stresser is None:
            from russian_text_stresser.text_stresser import RussianTextStresser
            _russian_stresser = RussianTextStresser()

        return _russian_stresser.stress_text(text)

    except ImportError:
        logger.warning("russian_text_stresser not available - Russian stress labeling skipped")
        return text
    except Exception as e:
        logger.warning(f"Russian stress labeling failed: {e}")
        return text


class MTLTokenizer:
    def __init__(self, vocab_file_path):
        self.tokenizer: Tokenizer = Tokenizer.from_file(vocab_file_path)
        model_dir = Path(vocab_file_path).parent
        self.cangjie_converter = ChineseCangjieConverter(model_dir)
        self.check_vocabset_sot_eot()

    def check_vocabset_sot_eot(self):
        voc = self.tokenizer.get_vocab()
        assert SOT in voc
        assert EOT in voc

    def preprocess_text(self, raw_text: str, language_id: str = None, lowercase: bool = True, nfkd_normalize: bool = True):
        """
        Lowercase and NFKD-normalize the text. `language_id` is accepted for
        API symmetry but is not used here.
        """
        preprocessed_text = raw_text
        if lowercase:
            preprocessed_text = preprocessed_text.lower()
        if nfkd_normalize:
            preprocessed_text = normalize("NFKD", preprocessed_text)

        return preprocessed_text
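
    # Example: preprocess_text("Résumé") lowercases first and then NFKD-
    # decomposes, giving 're\u0301sume\u0301' ("resume" with combining acutes).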

    def text_to_tokens(self, text: str, language_id: str = None, lowercase: bool = True, nfkd_normalize: bool = True):
        text_tokens = self.encode(text, language_id=language_id, lowercase=lowercase, nfkd_normalize=nfkd_normalize)
        text_tokens = torch.IntTensor(text_tokens).unsqueeze(0)
        return text_tokens

    def encode(self, txt: str, language_id: str = None, lowercase: bool = True, nfkd_normalize: bool = True):
        txt = self.preprocess_text(txt, language_id=language_id, lowercase=lowercase, nfkd_normalize=nfkd_normalize)

        # Language-specific normalization.
        if language_id == 'zh':
            txt = self.cangjie_converter(txt)
        elif language_id == 'ja':
            txt = hiragana_normalize(txt)
        elif language_id == 'he':
            txt = add_hebrew_diacritics(txt)
        elif language_id == 'ko':
            txt = korean_normalize(txt)
        elif language_id == 'ru':
            txt = add_russian_stress(txt)

        # Prepend the language tag, e.g. "[en]".
        if language_id:
            txt = f"[{language_id.lower()}]{txt}"

        txt = txt.replace(' ', SPACE)
        return self.tokenizer.encode(txt).ids
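
    # Usage sketch (the vocab file path is illustrative):
    #   tok = MTLTokenizer("mtl_tokenizer.json")
    #   ids = tok.encode("Привет, мир", language_id="ru")
    #   # -> tagged "[ru]", stress marks added, spaces mapped to [SPACE]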

    def decode(self, seq):
        if isinstance(seq, torch.Tensor):
            seq = seq.cpu().numpy()

        txt = self.tokenizer.decode(seq, skip_special_tokens=False)
        txt = txt.replace(' ', '').replace(SPACE, ' ').replace(EOT, '').replace(UNK, '')
        return txt