| |
| |
|
|
| import os |
| from logging import getLogger |
| from typing import List |
|
|
| from sentencepiece import SentencePieceProcessor |
|
|
|
|
| logger = getLogger() |
|
|
|
|
| class TextTokenizer: |
| """Tokenizing and encoding/decoding text using SentencePiece.""" |
|
|
| def __init__(self, model_path=None): |
| """ |
| Initializes the Tokenizer with a SentencePiece model. |
| |
| Args: |
| model_path (str): The path to the SentencePiece model file. |
| """ |
| if model_path is None: |
| model_path = os.path.join( |
| os.path.dirname(os.path.abspath(__file__)), "text_tokenizer.model" |
| ) |
| |
| assert os.path.isfile(model_path), model_path |
| self.sp_model = SentencePieceProcessor(model_file=model_path) |
| logger.info(f"Reloaded SentencePiece model from {model_path}") |
| |
| self.n_words: int = self.sp_model.vocab_size() |
| self.bos_id: int = self.sp_model.bos_id() |
| self.eos_id: int = self.sp_model.eos_id() |
| self.pad_id: int = self.sp_model.pad_id() |
| self.pad_id += self.n_words if self.pad_id < 0 else 0 |
| logger.info(f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}") |
| assert self.sp_model.vocab_size() == self.sp_model.get_piece_size() |
|
|
| def encode(self, s: str, bos: bool, eos: bool) -> List[int]: |
| """ |
| Encodes a string into a list of token IDs. |
| |
| Args: |
| s (str): The input string to be encoded. |
| bos (bool): Whether to prepend the beginning-of-sequence token. |
| eos (bool): Whether to append the end-of-sequence token. |
| |
| Returns: |
| List[int]: A list of token IDs. |
| """ |
| assert type(s) is str |
| t = self.sp_model.encode(s) |
| if bos: |
| t = [self.bos_id] + t |
| if eos: |
| t = t + [self.eos_id] |
| return t |
|
|
| def decode(self, t: List[int]) -> str: |
| """ |
| Decodes a list of token IDs into a string. |
| |
| Args: |
| t (List[int]): The list of token IDs to be decoded. |
| |
| Returns: |
| str: The decoded string. |
| """ |
| return self.sp_model.decode(t) |
|
|
| def tokenize(self, texts, context_length=None): |
| """Encode a list of string. |
| |
| Parameters |
| ---------- |
| texts : Union[str, List[str]] |
| The input text(s). |
| context_length : int, optional |
| The max token length. |
| |
| Returns |
| ------- |
| List[List[int]] |
| The encoded token indices. |
| |
| """ |
| if isinstance(texts, str): |
| texts = [texts] |
| tokens = [self.encode(text, bos=True, eos=True) for text in texts] |
| if context_length is None: |
| return tokens |
| truncated_tokens = [] |
| for k, t in enumerate(tokens): |
| if len(t) > context_length: |
| t = t[:context_length] |
| t[-1] = self.eos_id |
| truncated_tokens.append(t) |
| return truncated_tokens |
|
|
| def detokenize(self, tokens): |
| """Decode a list of string. |
| |
| Parameters |
| ---------- |
| tokens : Union[List[List[int]], numpy.ndarray] |
| The input tokens. |
| |
| Returns |
| ------- |
| List[str] |
| The decoded text strings. |
| |
| """ |
| if hasattr(tokens, "tolist"): |
| tokens = tokens.tolist() |
| texts = [] |
| for i in range(len(tokens)): |
| t = tokens[i][1:] |
| try: |
| eot_idx = t.index(self.eos_id) |
| t = t[:eot_idx] |
| except ValueError: |
| pass |
| texts.append(self.decode(t)) |
| return texts |
|
|