Buckets:
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| import abc | |
| import logging | |
| import os | |
| from copy import copy | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Sequence, Tuple, Union | |
| import numpy as np | |
| import tiktoken | |
| from sentencepiece import SentencePieceProcessor | |
| from tiktoken.load import load_tiktoken_bpe | |
| logger = logging.getLogger(__name__) | |
@dataclass
class TokenizerArgs:
    """Declarative description of the tokenizer to build.

    The field names mirror the keyword arguments of ``build_tokenizer`` in
    this module (``name``, ``path``, ``tokenizers``, ``dropout``,
    ``superset_code_name``, ``n_words``).  Being a dataclass, instances can be
    created with keyword arguments and compared by value.
    """

    # Tokenizer type key, e.g. "bytes"; empty string means "not configured".
    name: str = ""  # "bytes"
    # Model file / model name handed to the selected tokenizer, when it needs one.
    path: Optional[str] = None
    # Per-tokenizer config dicts; presumably only used by the supertokenizer -- TODO confirm.
    tokenizers: Optional[List[Dict[str, Any]]] = None
    # Whether a super-vocabulary id mapping should be loaded for the tokenizer.
    load_supermapping: Optional[bool] = False
    # Tokenizer dropout / sampling strength; 0.0 disables it.
    dropout: float = 0.0
    # following are needed for supertokenizer
    seed: Optional[int] = 42
    superset_code_name: Optional[str] = "super_vocab"
    n_words: Optional[int] = None
class Tokenizer(abc.ABC):
    """Base class for all tokenizers, with optional super-vocabulary remapping.

    Subclasses provide ``encode`` / ``decode`` / ``get_token_offsets``.  On top
    of that, a tokenizer may carry ``mapping``: a dict from its own token ids
    to ids of a shared "super" vocabulary, loaded with ``load_supermapping``.
    """

    # Local token id -> super-vocabulary id.  Empty means "no remapping".
    # ``load_supermapping`` assigns a fresh per-instance dict with int keys.
    mapping: Dict[int, Any] = {}

    def encode(self, tokens, add_bos, add_eos):
        """Convert text to token ids; to be overridden by subclasses."""
        pass

    def decode(self, tokens, skip_special_tokens: bool = True):
        """Convert token ids back to text; to be overridden by subclasses."""
        pass

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Return the offsets of the tokens in the original text. Only used for evaluation."""
        pass

    @staticmethod
    def _download_supermapping(path: str, file_name: str, superset_code_name: str) -> Optional[str]:
        """Try every known HF Hub location for the mapping file.

        Returns the local path of the downloaded file, or ``None`` when
        downloads are disabled (``HF_HUB_OFFLINE=1``) or every location failed.
        """
        if os.environ.get("HF_HUB_OFFLINE") == "1":
            logger.warning("HF_HUB_OFFLINE=1, not downloading super mapping %s", file_name)
            return None

        import huggingface_hub as hf_hub

        legacy_suffix = path.replace("/", "-")
        candidates = [
            # current layout, e.g. fineweb2_hq/flexitok--bpe_arb_Arab_8000_super_mapping.json
            (path, f"{superset_code_name}/{file_name}"),
            # backward compatibility for old path formats
            (f"gsaltintas/supertokenizer-{legacy_suffix}", file_name),
            (f"flexitok/supertokenizer-{legacy_suffix}", file_name),
        ]
        for repo_id, remote_name in candidates:
            try:
                local_path = hf_hub.hf_hub_download(repo_id, remote_name)
            except Exception as err:  # best effort: fall through to the next location
                logger.warning(
                    "Failed to download super mapping %s from HF Hub %s: %s",
                    remote_name,
                    repo_id,
                    err,
                )
                continue
            logger.info(f"Downloaded super mapping from HF Hub {repo_id} to {local_path}")
            return local_path
        return None

    def load_supermapping(self, base_path: str, path: str, superset_code_name: str = "super_vocab") -> None:
        """Load the local-id -> super-id mapping for this tokenizer into ``self.mapping``.

        The file ``<path with '/' replaced by '--'>_super_mapping.json`` is
        looked up under ``base_path`` first; if it is not there, the HF Hub
        locations are tried in order.

        Raises:
            FileNotFoundError: if the mapping is neither available locally nor
                downloadable.
        """
        import json

        file_name = f"{path.replace('/', '--')}_super_mapping.json"
        mapping_path = Path(base_path, file_name)
        if not mapping_path.exists():
            downloaded = self._download_supermapping(path, file_name, superset_code_name)
            if downloaded is not None:
                mapping_path = downloaded
        if not os.path.isfile(mapping_path):
            raise FileNotFoundError(f"Super mapping not found: {mapping_path}")
        with open(mapping_path, "r", encoding="utf-8") as f:
            raw_mapping = json.load(f)
        # JSON object keys are always strings; token ids are ints.
        self.mapping = {int(k): v for k, v in raw_mapping.items()}
        logger.info(f"Loaded super mapping from {mapping_path}")

    def encode_to_supermapping(self, tokens: List[str], add_bos: bool, add_eos: bool) -> List[int]:
        """Encode with this tokenizer and translate the ids to super-vocabulary ids.

        Without a loaded mapping the tokenizer's own ids are returned
        unchanged.  Ids that have no entry in the mapping are dropped.
        """
        token_ids = self.encode(tokens, add_bos=add_bos, add_eos=add_eos)
        if not self.mapping:
            return token_ids
        return [self.mapping[tid] for tid in token_ids if tid in self.mapping]

    def decode_from_supermapping(self, tokens: List[int], skip_special_tokens: bool = True) -> str:
        """Translate super-vocabulary ids back to local ids and decode them.

        Super ids unknown to this tokenizer are dropped.  Without a loaded
        mapping ``tokens`` are decoded as-is.
        """
        if not self.mapping:
            return self.decode(tokens, skip_special_tokens=skip_special_tokens)
        reverse_mapping = {v: k for k, v in self.mapping.items()}
        token_ids = [reverse_mapping[tid] for tid in tokens if tid in reverse_mapping]
        return self.decode(token_ids, skip_special_tokens=skip_special_tokens)
class MockTokenizer(Tokenizer):
    """Pass-through tokenizer: whatever is given to ``encode`` comes back unchanged."""

    n_words: int = 256

    def encode(self, tokens, add_bos, add_eos):
        """Return ``tokens`` untouched; ``add_bos`` and ``add_eos`` are ignored."""
        passthrough = tokens
        return passthrough
class ByteTokenizer(Tokenizer):
    """Tokenizer whose ids are the UTF-8 byte values of the text.

    Ids 0-255 are raw bytes; 256 is BOS and 257 is EOS, for 258 ids in total.
    """

    def __init__(self):
        self.bos_id, self.eos_id = 256, 257
        self.n_words = 258

    def encode(self, s: str, add_bos: bool = False, add_eos: bool = False):
        """Return the UTF-8 bytes of ``s`` as ints, optionally wrapped in BOS/EOS."""
        head = [self.bos_id] * add_bos
        body = list(s.encode())
        tail = [self.eos_id] * add_eos
        return head + body + tail

    def decode(self, tokens: List[int], skip_special_tokens: bool = True):
        """Decode byte ids to text.

        Ids >= 256 (BOS/EOS) are always dropped, whatever
        ``skip_special_tokens`` says; invalid UTF-8 is rendered with
        backslash escapes.
        """
        raw = bytes(t for t in tokens if t < 256)
        return raw.decode("utf-8", errors="backslashreplace")

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Return the single-byte characters and their running offsets.

        NOTE(review): each byte is decoded on its own, so bytes belonging to a
        multi-byte character decode to "" and are neither reported nor counted
        in the offset -- confirm this is what evaluation expects.
        """
        ids = self.encode(text) if tokens is None else tokens
        chars: List[str] = []
        starts: List[int] = []
        position = 0
        for tok in ids:
            if tok >= 256:
                # BOS / EOS carry no text
                continue
            piece = bytes([tok]).decode("utf-8", errors="ignore")
            if not piece:
                continue
            chars.append(piece)
            starts.append(position)
            position += len(piece.encode("utf-8"))
        return chars, starts
class SentencePieceTokenizer(Tokenizer):
    """Wrapper around a SentencePiece model file.

    ``alpha`` is forwarded to SentencePiece's sampling-based encoding;
    sampling is enabled only when ``alpha > 0``.
    """

    def __init__(self, model_path: str, alpha: float = 0.0) -> None:
        assert os.path.isfile(model_path), model_path
        self.sp_model = SentencePieceProcessor(model_file=model_path)
        self.alpha = alpha
        logger.info(f"Reloaded SentencePiece model from {model_path}")

        # Vocabulary size and special-token ids come straight from the model.
        model = self.sp_model
        self.n_words: int = model.vocab_size()
        self.bos_id: int = model.bos_id()
        self.eos_id: int = model.eos_id()
        self.pad_id: int = model.pad_id()
        logger.info(
            f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}"
        )
        assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()

    def encode(self, s: str, add_bos: bool, add_eos: bool):
        """Encode ``s`` to ids, optionally adding BOS in front and EOS at the end."""
        assert type(s) is str
        head = [self.bos_id] * add_bos
        body = self.sp_model.encode(
            s,
            enable_sampling=self.alpha > 0,
            alpha=self.alpha,
            nbest_size=-1,
        )
        tail = [self.eos_id] * add_eos
        return head + body + tail

    def decode(self, tokens: List[int], skip_special_tokens: bool = True):
        """Decode ids to text; ``skip_special_tokens`` is accepted but not used."""
        return self.sp_model.decode(tokens)

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Re-encode ``text`` and return each piece's surface and begin offset.

        ``tokens`` is ignored: the pieces always come from a fresh encoding.
        """
        surfaces: List[str] = []
        begins: List[int] = []
        for piece in self.sp_model.encode_as_immutable_proto(text).pieces:
            surfaces.append(piece.surface)
            begins.append(piece.begin)
        return surfaces, begins
# Pre-tokenization split regex handed to tiktoken as ``pat_str`` when
# TikTokenTokenizer builds an Encoding from a raw BPE rank file.
DEFAULT_TIKTOKEN_PATTERN = r"""(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\r\n\p{L}\p{N}]?\p{L}+|\p{N}{1,3}| ?[^\s\p{L}\p{N}]+[\r\n]*|\s*[\r\n]+|\s+(?!\S)|\s+"""
# Named special tokens and their slot among the 256 special ids.
# TikTokenTokenizer fills the unused slots with reserved tokens and shifts
# every slot by the number of mergeable ranks.
DEFAULT_TIKTOKEN_SPECIAL_TOKENS = {
    "<|begin_of_text|>": 0,
    "<|end_of_text|>": 1,
    "<|fim_prefix|>": 2,
    "<|fim_middle|>": 3,
    "<|fim_end_fill|>": 253,
    "<|fim_pad|>": 254,
    "<|fim_suffix|>": 255,
}
# TikTokenTokenizer.encode splits its input into chunks of at most this many
# characters before encoding.
TIKTOKEN_MAX_ENCODE_CHARS = 400_000
# Candidate surface forms tried, in order, by ``find_id`` when special-token
# ids have to be inferred from a bare tokenizers.Tokenizer vocabulary.
DEFAULT_SPECIAL_TOKENS = {
    "bos": ["<|begin_of_text|>", "<s>", "<bos>"],
    "eos": ["<|end_of_text|>", "</s>", "<eos>"],
    "pad": ["<pad>"],
}
# Super-vocabulary key whose id SupersetTokenizer uses as its BOS id.
ALIGNED_BOS = "~SPECIAL~ALIGNED~BOS~SYMBOL~"
class TikTokenTokenizer(Tokenizer):
    """Wrapper around a tiktoken ``Encoding``.

    ``model_path`` is first tried as a model name known to tiktoken; if that
    fails it is treated as the path of a BPE rank file and an ``Encoding`` is
    assembled from it with the module's default split pattern and special
    tokens.
    """

    # Surface forms accepted as end-of-text: the first is what the eos lookup
    # always used; the second is the one registered for encodings built from a
    # rank file (DEFAULT_TIKTOKEN_SPECIAL_TOKENS).
    _EOS_SURFACES = ("<|endoftext|>", "<|end_of_text|>")

    def __init__(self, model_path: str) -> None:
        try:
            # on Vulcan need to first load these because there is no internet connection on the compute nodes
            self.tkt_model = tiktoken.encoding_for_model(model_path)
        except Exception:
            logger.error(f"Failed to load TikToken model from {model_path}")
            self.tkt_model = self._build_from_rank_file(model_path)

        # BOS is optional, EOS is mandatory.
        self.bos_id: Optional[int] = self._special_id_or_none("<|begin_of_text|>")
        eos_id = None
        for surface in self._EOS_SURFACES:
            eos_id = self._special_id_or_none(surface)
            if eos_id is not None:
                break
        if eos_id is None:
            raise KeyError(
                f"None of the EOS tokens {self._EOS_SURFACES} exist in the TikToken model {model_path}"
            )
        self.eos_id: int = eos_id
        self.n_words: int = self.tkt_model.n_vocab
        logger.info(
            f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}"
        )

    @staticmethod
    def _build_from_rank_file(model_path: str):
        """Build an Encoding from a BPE rank file, with 256 special ids after the ranks."""
        mergeable_ranks = load_tiktoken_bpe(model_path)
        special_slots = copy(DEFAULT_TIKTOKEN_SPECIAL_TOKENS)
        # Every slot in 0..255 without a named token becomes a reserved token.
        for slot in set(range(256)) - set(special_slots.values()):
            special_slots[f"<|reserved_special_token_{slot}|>"] = slot
        # Special ids live directly after the ordinary vocabulary.
        base = len(mergeable_ranks)
        special_tokens = {name: slot + base for name, slot in special_slots.items()}
        return tiktoken.core.Encoding(
            name=Path(model_path).stem,
            pat_str=DEFAULT_TIKTOKEN_PATTERN,
            mergeable_ranks=mergeable_ranks,
            special_tokens=special_tokens,
        )

    def _special_id_or_none(self, surface: str) -> Optional[int]:
        """Return the id of ``surface`` if the model has it as a single token, else None."""
        try:
            return self.tkt_model.encode_single_token(surface)
        except Exception:
            return None

    def encode(self, s: str, add_bos: bool, add_eos: bool):
        """Encode ``s`` as ordinary text (special-token strings are not interpreted).

        The input is processed in chunks of ``TIKTOKEN_MAX_ENCODE_CHARS``
        characters.  BOS is only added when the model actually has a BOS token.
        """
        assert isinstance(s, str)
        add_bos = self.bos_id is not None and add_bos
        chunks = [
            s[i : i + TIKTOKEN_MAX_ENCODE_CHARS]
            for i in range(0, len(s), TIKTOKEN_MAX_ENCODE_CHARS)
        ]
        body = [tid for ids in self.tkt_model.encode_ordinary_batch(chunks) for tid in ids]
        return [self.bos_id] * add_bos + body + [self.eos_id] * add_eos

    def decode(self, tokens: List[int], skip_special_tokens: bool = False):
        """Decode ids to text; ``skip_special_tokens`` is accepted but not used."""
        return self.tkt_model.decode(tokens)

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Return, per token, the covered substring of ``text`` and its character offset.

        When ``tokens`` is None the text is encoded with all special tokens
        allowed.
        """
        if tokens is not None:
            token_bytes = self.tkt_model.decode_tokens_bytes(tokens)
        else:
            token_bytes = self.tkt_model.decode_tokens_bytes(
                self.tkt_model.encode(text, allowed_special="all")
            )

        text_len, offsets = 0, []
        for token in token_bytes:
            # Bytes 0x80-0xBF are UTF-8 continuation bytes.  A token starting
            # with one continues a character begun in the previous token, so
            # its offset points one character back.
            offsets.append(max(0, text_len - (0x80 <= token[0] < 0xC0)))
            # Each non-continuation byte starts a new character.
            text_len += sum(1 for c in token if not 0x80 <= c < 0xC0)
        substrs = [text[s:e] for s, e in zip(offsets, offsets[1:] + [None])]
        return substrs, offsets
def find_id(tokenizer, surfaces: Sequence[str]):
    """Return the id of the first surface form the tokenizer knows, or None.

    Each candidate is looked up with ``tokenizer.token_to_id``; a warning is
    logged when none of them is in the vocabulary.
    """
    for candidate in surfaces:
        found = tokenizer.token_to_id(candidate)
        if found is None:
            continue
        logger.info("Found id for special token: %s", candidate)
        return found
    logger.warning("No id found for special token.")
    return None
class HFTokenizer(Tokenizer):
    """Wrapper around a Hugging Face ``tokenizers.Tokenizer``.

    ``model_path`` is first loaded through ``transformers.AutoTokenizer``
    (which knows the bos/eos/pad tokens).  If anything in that path fails, it
    is loaded as a plain ``tokenizers.Tokenizer`` file and the special-token
    ids are inferred from ``DEFAULT_SPECIAL_TOKENS``.
    """

    def __init__(self, model_path: str, dropout: float = 0) -> None:
        try:
            import transformers

            # Try to load as a transformers.Tokenizer as it includes more
            # information about things like bos/eos
            transformers_tokenizer = transformers.AutoTokenizer.from_pretrained(
                model_path
            )
            logger.info("Loaded Transformers Tokenizer from %s", model_path)
            # Extract the underlying tokenizers.Tokenizer to get access to things
            # like the offsets.
            self.hf_tokenizer = transformers_tokenizer._tokenizer
            logger.info(
                "Extracted Tokenizers Tokenizer from Transformers Tokenizer"
            )
            self._apply_dropout(dropout)
            # Find special tokens based on the transformers.Tokenizer
            bos_token = transformers_tokenizer.bos_token
            logger.info(
                "Found bos_token: %s based on Transformers Tokenizer.", bos_token
            )
            self.bos_id = transformers_tokenizer.convert_tokens_to_ids(bos_token)
            eos_token = transformers_tokenizer.eos_token
            logger.info(
                "Found eos_token: %s based on Transformers Tokenizer.", eos_token
            )
            self.eos_id = transformers_tokenizer.convert_tokens_to_ids(eos_token)
            pad_token = transformers_tokenizer.pad_token
            logger.info(
                "Found pad_token: %s based on Transformers Tokenizer.", pad_token
            )
            if pad_token is not None:
                # It is ok for this not be set for models that don't have a pad
                # because it isn't set for some the other lingua implementations.
                self.pad_id = transformers_tokenizer.convert_tokens_to_ids(pad_token)
        except Exception as err:
            logger.warning(
                "Could not load %s through transformers (%s); loading it as a tokenizers file.",
                model_path,
                err,
            )
            import tokenizers

            # If we failed to load as a transformers.Tokenizer, load as a
            # tokenizers.Tokenizer
            self.hf_tokenizer = tokenizers.Tokenizer.from_file(model_path)
            logger.info("Loaded Tokenizers Tokenizer.")
            # hf_tokenizer is already the tokenizers.Tokenizer here, so the
            # model is reached directly (there is no ``_tokenizer`` attribute).
            self._apply_dropout(dropout)
            # We need to infer the special tokens. If you used a different
            # special token, it needs to be added to the DEFAULT_SPECIAL_TOKENS
            # dict.
            logger.info("Infering bos id.")
            self.bos_id = find_id(self.hf_tokenizer, DEFAULT_SPECIAL_TOKENS["bos"])
            logger.info("Infering eos id.")
            self.eos_id = find_id(self.hf_tokenizer, DEFAULT_SPECIAL_TOKENS["eos"])
            logger.info("Infering pad id.")
            self.pad_id = find_id(self.hf_tokenizer, DEFAULT_SPECIAL_TOKENS["pad"])

        self.n_words = self.hf_tokenizer.get_vocab_size()
        # %s rather than %d: bos/eos ids may legitimately be None.
        logger.info(
            "#words: %s - BOS ID: %s - EOS ID: %s",
            self.n_words,
            self.bos_id,
            self.eos_id,
        )

    def _apply_dropout(self, dropout: float) -> None:
        """Set ``dropout`` on the underlying model; failures are logged, not raised."""
        if dropout <= 0:
            return
        try:
            self.hf_tokenizer.model.dropout = dropout
            logger.info("Set tokenizer dropout to %f", dropout)
        except Exception as e:
            logger.warning("Failed to set tokenizer dropout: %s", e)

    def encode(self, s: str, add_bos: bool, add_eos: bool):
        """Convert a string to a list of tokens."""
        # Never add bos/eos special tokens because we are using a
        # tokenizers.Tokenizer which doesn't auto add them.
        encoded = self.hf_tokenizer.encode(s, add_special_tokens=False).ids
        # Add bos/eos as needed, easy because we are not processing batches.
        if add_bos and self.bos_id is not None:
            encoded = [self.bos_id] + encoded
        if add_eos and self.eos_id is not None:
            encoded = encoded + [self.eos_id]
        return encoded

    def decode(self, tokens: List[int], skip_special_tokens: bool = True):
        """Convert a list of tokens to a string."""
        return self.hf_tokenizer.decode(tokens, skip_special_tokens=skip_special_tokens)

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Get the offsets (and surface) for each token in the original string.

        NOTE(review): the second element is ``encoding.offsets`` as returned by
        the tokenizers library, not a list of start positions like the other
        tokenizers in this file return -- confirm callers expect that.
        """
        if tokens is not None:
            logger.warning(
                "`tokens` passed to `get_token_offsets`, but are ignored with the HFTokenizer."
            )
        # Don't add special tokens so we don't need to handle things like the
        # offset of the bos token.
        encoding = self.hf_tokenizer.encode(text, add_special_tokens=False)
        # Slice the original text instead of using encoding.tokens to avoid the
        # fact that tokenizers uses Ġ instead of space.
        substrs = [text[s:e] for s, e in encoding.offsets]
        return substrs, encoding.offsets
class ByT5HFTokenizer(HFTokenizer):
    """ByT5 variant that keeps the ``transformers`` tokenizer itself.

    The pad token plays the role of BOS.  ``HFTokenizer.__init__`` is not
    called; every attribute is set here.
    """

    def __init__(self, model_path: str) -> None:
        import transformers

        tok = transformers.AutoTokenizer.from_pretrained(model_path)
        self.hf_tokenizer = tok
        # The pad token doubles as BOS.
        self.bos_token, self.eos_token = tok.pad_token, tok.eos_token
        self.bos_id = tok.convert_tokens_to_ids(self.bos_token)
        self.eos_id = tok.convert_tokens_to_ids(self.eos_token)
        self.n_words = tok.vocab_size
        logger.info(
            "#words: %d - BOS ID: %d - EOS ID: %d",
            self.n_words,
            self.bos_id,
            self.eos_id,
        )

    def encode(self, s: str, add_bos: bool, add_eos: bool):
        """Encode ``s`` without automatic special tokens, then add BOS/EOS on request."""
        ids = self.hf_tokenizer.encode(s, add_special_tokens=False)
        # Special tokens are attached by hand so the caller stays in control.
        prefix = [self.bos_id] if (add_bos and self.bos_id is not None) else []
        suffix = [self.eos_id] if (add_eos and self.eos_id is not None) else []
        if prefix:
            ids = prefix + ids
        if suffix:
            ids = ids + suffix
        return ids

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Offsets are not available for this tokenizer; always ``(None, None)``."""
        return None, None
class SimplifiedHFTokenizer(HFTokenizer):
    """HFTokenizer variant that loads only through ``transformers``.

    BOS/EOS are picked from the tokenizer's ``special_tokens_map``; which
    entry is used depends on substrings of ``model_path``:

    * ``"bert"``: BOS = cls_token, EOS = pad_token
    * ``"t5"``:   BOS = pad_token, EOS = eos_token
    * otherwise:  BOS = bos_token, EOS = eos_token

    A missing token leaves the corresponding id as ``None``.
    """

    def __init__(self, model_path: str, dropout: float = 0) -> None:
        import transformers

        # Try to load as a transformers.Tokenizer as it includes more
        # information about things like bos/eos
        transformers_tokenizer = transformers.AutoTokenizer.from_pretrained(
            model_path
        )
        logger.info("Loaded Transformers Tokenizer from %s", model_path)
        # Extract the underlying tokenizers.Tokenizer to get access to things
        # like the offsets.
        self.hf_tokenizer = transformers_tokenizer._tokenizer
        logger.info(
            "Extracted Tokenizers Tokenizer from Transformers Tokenizer"
        )
        if dropout > 0:
            try:
                self.hf_tokenizer.model.dropout = dropout
                logger.info("Set tokenizer dropout to %f", dropout)
            except Exception as e:
                logger.warning("Failed to set tokenizer dropout: %s", e)

        special_tokens = getattr(transformers_tokenizer, "special_tokens_map", {})

        if "bert" in model_path:
            self.bos_token = special_tokens.get("cls_token")
        elif "t5" in model_path:
            self.bos_token = special_tokens.get("pad_token")
        else:
            self.bos_token = special_tokens.get("bos_token")
        logger.info(
            "Found bos_token: %s based on Transformers Tokenizer.", self.bos_token
        )
        if self.bos_token is not None:
            self.bos_id = transformers_tokenizer.convert_tokens_to_ids(self.bos_token)
        else:
            self.bos_id = None
        logger.info(
            "Found bos_id: %s based on Transformers Tokenizer.", self.bos_id
        )

        if "bert" in model_path:
            self.eos_token = special_tokens.get("pad_token")
        else:
            self.eos_token = special_tokens.get("eos_token")
        logger.info(
            "Found eos_token: %s based on Transformers Tokenizer.", self.eos_token
        )
        if self.eos_token is not None:
            self.eos_id = transformers_tokenizer.convert_tokens_to_ids(self.eos_token)
        else:
            self.eos_id = None
        logger.info(
            "Found eos_id: %s based on Transformers Tokenizer.", self.eos_id
        )

        self.n_words = self.hf_tokenizer.get_vocab_size()
        # %s rather than %d: bos_id / eos_id are None when the token is missing.
        logger.info(
            "#words: %s - BOS ID: %s - EOS ID: %s",
            self.n_words,
            self.bos_id,
            self.eos_id,
        )
class TokenMonsterTokenizer(Tokenizer):
    """Wrapper around a ``tokenmonster`` vocabulary.

    This tokenizer has no BOS/EOS ids, so ``encode`` never adds any.
    """

    def __init__(self, model_path: str):
        import tokenmonster

        self.tokenizer = tokenmonster.load(model_path)
        self.n_words = self.tokenizer.vocab_size
        self.bos_id = None
        self.eos_id = None
        # %s rather than %d: both special ids are None.
        logger.info(
            "#words: %s - BOS ID: %s - EOS ID: %s",
            self.n_words,
            self.bos_id,
            self.eos_id,
        )

    def encode(self, s: str, add_bos: bool, add_eos: bool):
        """Tokenize ``s`` and return the ids as a ``np.longlong`` array.

        ``add_bos`` / ``add_eos`` are ignored.  An empty array is returned
        when the underlying tokenizer yields ``None``.
        """
        token_ids = self.tokenizer.tokenize(s)
        if token_ids is None:
            return np.array([], dtype=np.longlong)
        return token_ids.astype(np.longlong)

    def decode(self, tokens: List[int], skip_special_tokens: bool = True):
        """Decode ids to text; ``skip_special_tokens`` is accepted but not used."""
        return self.tokenizer.decode(tokens)

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Offsets are not available for this tokenizer; always ``(None, None)``."""
        return None, None
class TekkenTokenizer(Tokenizer):
    """Wrapper around the Tekken tokenizer from ``mistral_common`` (``MistralTokenizer.v3(is_tekken=True)``)."""

    def __init__(self):
        from mistral_common.tokens.tokenizers.mistral import MistralTokenizer

        tok = MistralTokenizer.v3(is_tekken=True)
        self.tokenizer = tok.instruct_tokenizer.tokenizer
        self.n_words = self.tokenizer.n_words
        self.bos_id = self.tokenizer.bos_id
        self.eos_id = self.tokenizer.eos_id
        logger.info(
            "#words: %s - BOS ID: %s - EOS ID: %s",
            self.n_words,
            self.bos_id,
            self.eos_id,
        )

    def encode(self, s: str, add_bos: bool, add_eos: bool):
        """Encode ``s``; BOS/EOS handling is delegated to the wrapped tokenizer."""
        return self.tokenizer.encode(s, add_bos, add_eos)

    def decode(self, tokens: List[int], skip_special_tokens: bool = True):
        """Decode ids to text.

        With ``skip_special_tokens`` a leading BOS and a trailing EOS are
        stripped first.  An empty sequence (or one that becomes empty after
        stripping BOS) is handled without indexing errors.
        """
        if skip_special_tokens:
            # len() instead of truthiness so array-like inputs work as well.
            if len(tokens) > 0 and tokens[0] == self.bos_id:
                tokens = tokens[1:]
            if len(tokens) > 0 and tokens[-1] == self.eos_id:
                tokens = tokens[:-1]
        return self.tokenizer.decode(tokens)

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Offsets are not available for this tokenizer; always ``(None, None)``."""
        return None, None
class SupersetTokenizer(Tokenizer):
    """Tokenizer that delegates to one of several sub-tokenizers sharing a super vocabulary.

    Every sub-tokenizer is built with ``build_tokenizer`` and may carry a
    mapping from its own ids to super-vocabulary ids.  ``encode`` / ``decode``
    use either an explicitly chosen sub-tokenizer or one drawn from
    ``self.rng``.  BOS/EOS ids are taken from the super vocabulary file.
    """

    n_words: int = 851586

    def __init__(
        self,
        tokenizers: List[Dict[str, str]],
        rng_state: Optional[Union[int, Dict[str, Any]]] = None,
        superset_code_name: str = "super_vocab",
        n_words: Optional[int] = None,
    ):
        """Build the sub-tokenizers, the RNG and load the super vocabulary.

        Args:
            tokenizers: one dict per sub-tokenizer with key ``name`` and
                optional keys ``path``, ``dropout``, ``load_supermapping``.
            rng_state: ``None`` for a fresh RNG seeded with 42, an int seed,
                or a bit-generator state dict to restore.
            superset_code_name: name of the super vocabulary; ``None`` is
                treated as the default ``"super_vocab"``.
            n_words: overrides the class-level vocabulary size when given.

        Raises:
            ValueError: if no sub-tokenizer could be loaded or the super
                vocabulary file cannot be obtained.
        """
        self.tokenizers = {}
        if superset_code_name is None:
            # build_tokenizer forwards None when the caller did not choose a name.
            superset_code_name = "super_vocab"
        self.superset_code_name = superset_code_name
        self.n_words = n_words if n_words is not None else self.n_words

        mapping_dir = f"{os.environ.get('PROJECT')}/tokenizers/super_mappings"
        for tokenizer_info in tokenizers or []:
            name = tokenizer_info["name"]
            path = tokenizer_info.get("path", None)
            kwargs = {}
            dropout = tokenizer_info.get("dropout", 0)
            if dropout > 0:
                kwargs["dropout"] = dropout
            load_supermapping = tokenizer_info.get("load_supermapping", False)
            try:
                tokenizer = build_tokenizer(name, path, **kwargs)
                # Mapping files are named after "<namespace>/<path>".
                encoding_path = path
                if name == "tiktoken":
                    encoding_path = f"tiktoken/{path}"
                elif name == "tokenmonster":
                    encoding_path = f"tokenmonster/{path}"
                elif name == "tekken":
                    encoding_path = f"mistralai/{path}"
                if load_supermapping:
                    logger.info(f"Loading supermapping for the tokenizer {path}")
                    tokenizer.load_supermapping(mapping_dir, encoding_path, superset_code_name)
                else:
                    logger.info(f"Not loading supermapping for the tokenizer {path}")
                self.tokenizers[f"{name}/{path}"] = tokenizer
                logger.info(f"Loaded tokenizer {name} from {path}")
            except Exception as e:
                # Best effort: a broken sub-tokenizer is skipped, not fatal.
                logger.error("Error loading tokenizer %s from %s. %s", name, path, e)
        if len(self.tokenizers) == 0:
            raise ValueError("No valid tokenizers provided.")
        logger.info(f"Number of tokenizers loaded: {len(self.tokenizers)}")

        if rng_state is None:
            self.rng = np.random.default_rng(seed=42)
            logger.info("Initialized new RNG for supertokenizer.")
        else:
            if isinstance(rng_state, int):
                rng = np.random.default_rng(seed=rng_state)
            else:
                rng = np.random.default_rng()
                rng.bit_generator.state = rng_state
            self.rng = rng
            logger.info("Restored RNG state for supertokenizer.")

        import json

        vocab_path = self._locate_super_vocab(superset_code_name)
        with open(vocab_path, "r", encoding="utf-8") as f:
            self.super_vocab = json.load(f)

        # align bos eos with llama
        self.bos_id = self.super_vocab.get(ALIGNED_BOS)
        self.eos_id = self.super_vocab.get("<|end_of_text|>")
        self.bos_token, self.eos_token = ALIGNED_BOS, "<|end_of_text|>"
        if self.eos_id is None:
            self.eos_id = self.super_vocab.get("</s>")
            self.bos_token, self.eos_token = ALIGNED_BOS, "</s>"
        # %s rather than %d: the ids are None when the vocabulary lacks the token.
        logger.info(
            "Setting bos_token: %s with id %s.", self.bos_token, self.bos_id
        )
        logger.info(
            "Setting eos_token: %s with id %s.", self.eos_token, self.eos_id
        )

    @staticmethod
    def _locate_super_vocab(superset_code_name: str) -> str:
        """Return a local path to ``super_vocab.json``, downloading it if necessary.

        The file is looked up under ``$PROJECT/tokenizers/<code name>/`` first,
        then in the current HF Hub repo, then in the legacy one.
        """
        local_path = f"{os.environ.get('PROJECT')}/tokenizers/{superset_code_name}/super_vocab.json"
        if Path(local_path).exists():
            return local_path
        if os.environ.get("HF_HUB_OFFLINE") == "1":
            raise ValueError(
                f"Super vocab not found at {local_path} and HF_HUB_OFFLINE=1 forbids downloading it."
            )

        import huggingface_hub as hf_hub

        repo_ids = (
            # e.g. flexitok/supertokenizer-fineweb2_hq
            f"flexitok/supertokenizer-{superset_code_name}",
            # backward compatibility for old path format
            "gsaltintas/supertokenizer-super_vocab",
        )
        last_error = None
        for repo_id in repo_ids:
            try:
                downloaded = hf_hub.hf_hub_download(repo_id, "super_vocab.json")
            except Exception as err:
                logger.warning("Failed to download super vocab from HF Hub %s: %s", repo_id, err)
                last_error = err
                continue
            logger.info(f"Downloaded super mapping from HF Hub {repo_id} to {downloaded}")
            return downloaded
        raise ValueError(
            f"Failed to download super vocab from HF Hub. Tried repo_ids {repo_ids}. Error: {last_error}"
        ) from last_error

    def _resolve_tokenizer_choice(self, tokenizer_choice: Optional[Union[int, str]]) -> Optional[int]:
        """Translate a tokenizer choice into an index into ``self.tokenizers``.

        Ints are range-checked.  Strings match an exact key first, then
        case-insensitively either the whole key or its ``/<choice>`` suffix.
        Returns ``None`` for ``None`` and for strings that match nothing.

        Raises:
            ValueError: for an int outside the valid range.
            TypeError: for any other type.
        """
        if tokenizer_choice is None:
            return None

        tokenizer_keys = list(self.tokenizers.keys())
        if isinstance(tokenizer_choice, int):
            if tokenizer_choice < 0 or tokenizer_choice >= len(tokenizer_keys):
                raise ValueError(
                    f"tokenizer_choice {tokenizer_choice} is out of range for available tokenizers {tokenizer_keys}"
                )
            return tokenizer_choice

        if isinstance(tokenizer_choice, str):
            if tokenizer_choice in self.tokenizers:
                return tokenizer_keys.index(tokenizer_choice)

            lowered_choice = tokenizer_choice.strip().lower()
            for index, key in enumerate(tokenizer_keys):
                lowered_key = key.lower()
                if lowered_key == lowered_choice:
                    return index
                if lowered_key.endswith(f"/{lowered_choice}"):
                    return index
            return None

        raise TypeError(
            f"tokenizer_choice must be int, str or None, got {type(tokenizer_choice).__name__}"
        )

    def sample_tokenizer(
        self,
        preferred_tokenizer: Optional[Union[int, str]] = None,
        preferred_probability: float = 0.0,
    ):
        """Pick a sub-tokenizer and return ``(index, key)``.

        With probability ``preferred_probability`` (clamped to [0, 1]) the
        preferred tokenizer is returned; otherwise one is drawn uniformly from
        all sub-tokenizers.  ``preferred_tokenizer`` accepts everything
        ``_resolve_tokenizer_choice`` accepts.

        Raises:
            ValueError: if ``preferred_tokenizer`` is given but matches no
                sub-tokenizer.
        """
        tokenizer_keys = list(self.tokenizers.keys())
        preferred_choice = self._resolve_tokenizer_choice(preferred_tokenizer)
        if preferred_tokenizer is not None and preferred_choice is None:
            raise ValueError(
                f"Preferred tokenizer {preferred_tokenizer} not in available tokenizers {tokenizer_keys}"
            )

        preferred_probability = max(0.0, min(1.0, float(preferred_probability)))
        if preferred_choice is not None and self.rng.random() < preferred_probability:
            tokenizer_choice = preferred_choice
        else:
            tokenizer_choice = int(self.rng.choice(len(tokenizer_keys)))
        return tokenizer_choice, tokenizer_keys[tokenizer_choice]

    def _select_tokenizer_key(self, tokenizer_choice: Optional[Union[int, str]]) -> str:
        """Return the key of the chosen sub-tokenizer, sampling one when the choice does not resolve."""
        resolved_choice = self._resolve_tokenizer_choice(tokenizer_choice)
        if resolved_choice is None:
            _, tokenizer_key = self.sample_tokenizer()
            return tokenizer_key
        return list(self.tokenizers.keys())[resolved_choice]

    def encode(self, tokens, add_bos, add_eos, tokenizer_choice: Optional[Union[int, str]] = None):
        """Encode text to super-vocabulary ids with the chosen (or a sampled) sub-tokenizer.

        BOS/EOS are the super vocabulary's own ids; the sub-tokenizer is
        always called without special tokens.
        """
        tokenizer_key = self._select_tokenizer_key(tokenizer_choice)
        tokenizer = self.tokenizers[tokenizer_key]
        # list(): a sub-tokenizer without a mapping may return a numpy array,
        # for which ``[bos] + ids`` would be elementwise addition.
        ids = list(tokenizer.encode_to_supermapping(tokens, add_bos=False, add_eos=False))
        if add_bos:
            ids = [self.bos_id] + ids
        if add_eos:
            ids = ids + [self.eos_id]
        logger.debug(f"Selected tokenizer {tokenizer_key}, for string ({tokens[:40]}) length of ids: {len(ids)}, add_bos: {add_bos}, add_eos: {add_eos}")
        return ids

    def decode(self, tokens: List[int], skip_special_tokens: bool = True, tokenizer_choice: Optional[Union[int, str]] = None):
        """Decode super-vocabulary ids with the chosen (or a sampled) sub-tokenizer."""
        tokenizer_key = self._select_tokenizer_key(tokenizer_choice)
        tokenizer = self.tokenizers[tokenizer_key]
        return tokenizer.decode_from_supermapping(tokens, skip_special_tokens=skip_special_tokens)

    def get_token_offsets(
        self, text: str, tokens: Optional[List[int]] = None
    ) -> Tuple[List[str], List[int]]:
        """Offsets are not available for this tokenizer; always ``(None, None)``."""
        return None, None
def build_token_bytes(tokenizer: "Tokenizer", vocab_size: int) -> Dict[int, int]:
    """Return a dict mapping token_id -> UTF-8 byte length of its surface form.

    Special tokens (bos/eos or ~SPECIAL~ surfaces) are omitted (0 bytes → excluded from BPB),
    and so is every token whose surface is empty.

    The tokenizer kind is detected from the attribute it carries, checked in
    this order: ``super_vocab``, ``sp_model``, ``tkt_model``, ``hf_tokenizer``;
    anything else is treated as a byte tokenizer.

    Args:
        tokenizer: tokenizer object to inspect.
        vocab_size: only ids below this value are considered.
    """
    special_ids = {getattr(tokenizer, "bos_id", None), getattr(tokenizer, "eos_id", None)}
    special_ids.discard(None)
    result: Dict[int, int] = {}

    if hasattr(tokenizer, "super_vocab"):
        # SupersetTokenizer: super_vocab is surface_string -> token_id
        for surface, token_id in tokenizer.super_vocab.items():
            if token_id >= vocab_size or token_id in special_ids:
                continue
            if surface.startswith("~SPECIAL~"):
                continue
            nb = len(surface.encode("utf-8"))
            if nb > 0:
                result[token_id] = nb
    elif hasattr(tokenizer, "sp_model"):
        sp = tokenizer.sp_model
        # Byte-fallback pieces are spelled "<0xNN>" but stand for one byte.
        is_byte = getattr(sp, "is_byte", None)
        for i in range(vocab_size):
            if i in special_ids:
                continue
            if is_byte is not None and is_byte(i):
                result[i] = 1
                continue
            # U+2581 is SentencePiece's whitespace marker; count it as a space.
            nb = len(sp.id_to_piece(i).replace("\u2581", " ").encode("utf-8"))
            if nb > 0:
                result[i] = nb
    elif hasattr(tokenizer, "tkt_model"):
        for i in range(vocab_size):
            if i in special_ids:
                continue
            try:
                nb = len(tokenizer.tkt_model.decode_single_token_bytes(i))
            except Exception:
                # Best effort: ids the encoding cannot decode are left out.
                continue
            if nb > 0:
                result[i] = nb
    elif hasattr(tokenizer, "hf_tokenizer"):
        for i in range(vocab_size):
            if i in special_ids:
                continue
            try:
                nb = len(tokenizer.decode([i], skip_special_tokens=False).encode("utf-8"))
            except Exception:
                # Best effort: ids the tokenizer cannot decode are left out.
                continue
            if nb > 0:
                result[i] = nb
    else:
        # ByteTokenizer: tokens 0-255 are exactly one byte each
        for i in range(min(256, vocab_size)):
            if i not in special_ids:
                result[i] = 1
    return result
def build_tokenizer(
    name: str,
    path: Optional[Union[str, List[Dict[str, str]]]] = None,
    tokenizers: Optional[List[Dict[str, str]]] = None,
    dropout: float = 0,
    rng_state: Optional[Dict[str, Any]] = None,
    superset_code_name: Optional[str] = None,
    n_words: Optional[int] = None,
) -> "Tokenizer":
    """Instantiate the tokenizer registered under ``name``.

    Args:
        name: one of ``bytes``, ``mock``, ``sp``, ``tiktoken``,
            ``huggingface``, ``tokenmonster``, ``tekken``, ``supertokenizer``.
        path: model file / model name; required for ``sp``, ``tiktoken``,
            ``huggingface`` and ``tokenmonster``.
        tokenizers: sub-tokenizer configs, only used by ``supertokenizer``.
        dropout: forwarded to ``sp`` (as ``alpha``) and ``huggingface``.
        rng_state: RNG seed/state, only used by ``supertokenizer``.
        superset_code_name: super vocabulary name for ``supertokenizer``;
            ``None`` selects the default ``"super_vocab"``.
        n_words: vocabulary size override for ``supertokenizer``.

    Raises:
        ValueError: if ``path`` is missing for a tokenizer that needs one.
        NotImplementedError: for an unknown ``name``.
    """
    if name in ("sp", "tiktoken", "huggingface", "tokenmonster") and path is None:
        raise ValueError(f"{name} tokenizer requires a path")

    if name == "bytes":
        return ByteTokenizer()
    elif name == "mock":
        return MockTokenizer()
    elif name == "sp":
        return SentencePieceTokenizer(path, alpha=dropout)
    elif name == "tiktoken":
        return TikTokenTokenizer(path)
    elif name == "huggingface" and "byt5" in path:
        return ByT5HFTokenizer(path)
    elif name == "huggingface":
        return SimplifiedHFTokenizer(path, dropout=dropout)
    elif name == "tokenmonster":
        return TokenMonsterTokenizer(path)
    elif name == "tekken":
        return TekkenTokenizer()
    elif name == "supertokenizer":
        # Passing None through would override SupersetTokenizer's own default
        # and end up in file and repo names as the string "None".
        if superset_code_name is None:
            superset_code_name = "super_vocab"
        return SupersetTokenizer(
            tokenizers,
            rng_state=rng_state,
            superset_code_name=superset_code_name,
            n_words=n_words,
        )
    else:
        raise NotImplementedError(f"{name} tokenizer type is not implemented")
Xet Storage Details
- Size:
- 33.8 kB
- Xet hash:
- a6893a2eca96fbbd195d1943b827785413d8edf1e01d7601b32266a46f144ab5
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.