| """Hugging Face compatible tokenizer combining text encoder and numeric decoder.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import math |
| import re |
| from pathlib import Path |
| from typing import Dict, Iterable, List, Sequence |
|
|
| import numpy as np |
| from transformers import AutoTokenizer, PreTrainedTokenizer |
|
|
| DELIMITERS = ("<", ">") |
|
|
|
|
| def _to_token(value: str | int) -> str: |
| left, right = DELIMITERS |
| return f"{left}{value}{right}" |
|
|
|
|
def _from_token(token: str) -> str:
    """Strip the delimiter pair from *token*.

    Raises:
        ValueError: If *token* is not a single delimited primitive.
    """
    opener, closer = DELIMITERS
    pattern = rf"{re.escape(opener)}(.*?){re.escape(closer)}"
    parsed = re.fullmatch(pattern, token)
    if parsed is None:
        raise ValueError(f"Cannot deserialize token: {token}")
    return parsed.group(1)
|
|
|
|
class _NumericTokenizerBase(PreTrainedTokenizer):
    """Shared utilities for numeric decoder tokenizers.

    Concrete subclasses supply the numeric codec by implementing
    ``_build_base_tokens``, ``float_to_tokens``, ``tokens_to_float`` and
    ``_possible_next_tokens`` (and are expected to set
    ``num_tokens_per_obj`` — the fixed number of tokens per serialized
    float — after ``__init__``).  This base class provides:

    * the small decoder-side vocabulary (pad token + numeric tokens),
    * delegation of all *text* tokenization to a wrapped encoder tokenizer,
    * float <-> token-id conversion and constrained-decoding helpers.
    """

    # No vocab files are required to *load* the tokenizer — the vocabulary is
    # rebuilt from the init kwargs; `save_vocabulary` writes one for reference.
    vocab_files_names: Dict[str, str] = {}
    model_input_names = ["input_ids"]
    # Filename used by `save_vocabulary`; subclasses override it.
    vocab_filename = "numeric_vocab.json"

    def __init__(
        self,
        *,
        encoder_tokenizer_dir: str | None = None,
        encoder_tokenizer_name: str | None = None,
        encoder_tokenizer: PreTrainedTokenizer | None = None,
        bos_token: str = "<pad>",
        eos_token: str | None = None,
        pad_token: str | None = None,
        unk_token: str | None = None,
        **kwargs,
    ) -> None:
        """Build the numeric vocabulary and wire up the encoder tokenizer.

        Args:
            encoder_tokenizer_dir: Subdirectory (relative to the saved
                tokenizer) holding the encoder tokenizer assets.
            encoder_tokenizer_name: Name/path fallback passed to
                ``AutoTokenizer.from_pretrained`` when no local assets exist.
            encoder_tokenizer: Pre-instantiated encoder tokenizer; takes
                precedence over the two options above.
            bos_token: Control token; reused as eos/pad when those are not
                given, and reserved as decoder id 0.
            eos_token: Optional end-of-sequence token.
            pad_token: Optional padding token.
            unk_token: Optional unknown token.
            **kwargs: Forwarded to ``PreTrainedTokenizer.__init__``.
        """
        # A single control token serves bos/eos/pad by default.
        eos_token = eos_token or bos_token
        pad_token = pad_token or bos_token
        self.encoder_tokenizer_dir = encoder_tokenizer_dir
        self.encoder_tokenizer_name = encoder_tokenizer_name
        self.encoder_tokenizer = encoder_tokenizer

        # Build the decoder vocabulary BEFORE super().__init__ runs:
        # PreTrainedTokenizer may query get_vocab()/_convert_token_to_id
        # during initialization.  Id 0 is reserved for the pad token; the
        # numeric tokens follow in sorted order.
        base_tokens = self._build_base_tokens()
        base_tokens = sorted(base_tokens)
        tokens: List[str] = [pad_token] + base_tokens
        self._tokens = tokens
        self._token_to_id = {token: idx for idx, token in enumerate(tokens)}
        self._id_to_token = {idx: token for token, idx in self._token_to_id.items()}
        init_kwargs = dict(kwargs)
        init_kwargs.update(self._extra_init_kwargs())

        # encoder_tokenizer_dir/name are passed through so they end up in the
        # saved config and survive a from_pretrained round trip.
        super().__init__(
            bos_token=bos_token,
            eos_token=eos_token,
            pad_token=pad_token,
            unk_token=unk_token,
            encoder_tokenizer_dir=encoder_tokenizer_dir,
            encoder_tokenizer_name=encoder_tokenizer_name,
            **init_kwargs,
        )

        self._load_encoder_tokenizer()

    # ------------------------------------------------------------------
    # Subclass hooks.
    # ------------------------------------------------------------------

    def _build_base_tokens(self) -> List[str]:
        """Return all numeric token strings (excluding the pad token)."""
        raise NotImplementedError

    def _extra_init_kwargs(self) -> Dict[str, object]:
        """Extra kwargs merged into the ``super().__init__`` call (and thus
        persisted in the tokenizer config)."""
        return {}

    def float_to_tokens(self, value: float) -> List[str]:
        """Serialize ``value`` into a fixed-length list of token strings."""
        raise NotImplementedError

    def tokens_to_float(self, tokens: Sequence[str]) -> float:
        """Inverse of :meth:`float_to_tokens`."""
        raise NotImplementedError

    def _possible_next_tokens(self, prev_tokens: Sequence[str]) -> List[str]:
        """Token strings allowed after ``prev_tokens`` within one object."""
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Encoder-tokenizer plumbing.
    # ------------------------------------------------------------------

    def _load_encoder_tokenizer(self) -> None:
        """Resolve the encoder tokenizer, in priority order: explicit
        instance, local directory under ``name_or_path``, then hub name."""
        if self.encoder_tokenizer is not None:
            return
        if not self.encoder_tokenizer_dir and not self.encoder_tokenizer_name:
            return
        base_dir: Path | None = None
        # NOTE(review): name_or_path is set by from_pretrained — presumably
        # it points at the checkpoint directory holding the encoder assets.
        if getattr(self, "name_or_path", None):
            base_dir = Path(self.name_or_path)
        if self.encoder_tokenizer_dir and base_dir is not None:
            candidate = base_dir / self.encoder_tokenizer_dir
            if candidate.exists():
                self.encoder_tokenizer = AutoTokenizer.from_pretrained(str(candidate))
                return
        if self.encoder_tokenizer_name:
            self.encoder_tokenizer = AutoTokenizer.from_pretrained(self.encoder_tokenizer_name)

    def _ensure_encoder_tokenizer(self) -> None:
        """Raise when text tokenization is requested without encoder assets."""
        if self.encoder_tokenizer is None:
            raise NotImplementedError(
                "Text tokenization requires encoder tokenizer assets. "
                "Ensure `encoder_tokenizer_dir` or `encoder_tokenizer_name` are provided."
            )

    # ------------------------------------------------------------------
    # Text-side API: delegated wholesale to the encoder tokenizer.
    # ------------------------------------------------------------------

    def __call__(self, *args, **kwargs):
        self._ensure_encoder_tokenizer()
        return self.encoder_tokenizer(*args, **kwargs)

    def encode(self, *args, **kwargs):
        self._ensure_encoder_tokenizer()
        return self.encoder_tokenizer.encode(*args, **kwargs)

    def encode_plus(self, *args, **kwargs):
        self._ensure_encoder_tokenizer()
        return self.encoder_tokenizer.encode_plus(*args, **kwargs)

    def batch_encode_plus(self, *args, **kwargs):
        self._ensure_encoder_tokenizer()
        return self.encoder_tokenizer.batch_encode_plus(*args, **kwargs)

    def tokenize(self, *args, **kwargs):
        self._ensure_encoder_tokenizer()
        return self.encoder_tokenizer.tokenize(*args, **kwargs)

    def _tokenize(self, text: str) -> List[str]:
        # Deliberately unsupported: numeric encoding goes through
        # float_to_tokens / float_to_token_ids instead.
        raise NotImplementedError("Numeric tokenizers operate directly on floats, not text.")

    def build_inputs_with_special_tokens(
        self, token_ids_0: List[int], token_ids_1: List[int] | None = None
    ) -> List[int]:
        """Return ``token_ids_0`` unchanged; pair inputs are rejected."""
        if token_ids_1:
            raise ValueError("Numeric decoder tokenizer does not support pair inputs.")
        return token_ids_0

    # ------------------------------------------------------------------
    # Vocabulary accessors.
    # ------------------------------------------------------------------

    def get_vocab(self) -> Dict[str, int]:
        """Merged token->id view of the numeric and encoder vocabularies.

        NOTE(review): the two id spaces overlap (both start at 0) and
        encoder entries win on token-string collisions — this is a combined
        inspection view, not a single consistent id space.
        """
        vocab = dict(self._token_to_id)
        if self.encoder_tokenizer is not None:
            vocab.update(self.encoder_tokenizer.get_vocab())
        return vocab

    @property
    def vocab_size(self) -> int:
        """Encoder vocab size when available, else the numeric vocab size."""
        if self.encoder_tokenizer is not None and getattr(self.encoder_tokenizer, "vocab_size", None):
            return int(self.encoder_tokenizer.vocab_size)
        return len(self._tokens)

    @property
    def decoder_vocab_size(self) -> int:
        """Size of the numeric (decoder-side) vocabulary only."""
        return len(self._tokens)

    def _convert_token_to_id(self, token: str) -> int:
        """Numeric vocabulary first; unknown tokens fall back to the encoder."""
        if token not in self._token_to_id:
            if self.encoder_tokenizer is None:
                raise KeyError(f"Unknown token: {token}")
            return self.encoder_tokenizer.convert_tokens_to_ids(token)
        return self._token_to_id[token]

    def _convert_id_to_token(self, index: int) -> str:
        """Numeric vocabulary first; unknown ids fall back to the encoder."""
        if index not in self._id_to_token:
            if self.encoder_tokenizer is None:
                raise KeyError(f"Unknown token id: {index}")
            return self.encoder_tokenizer.convert_ids_to_tokens(index)
        return self._id_to_token[index]

    def save_vocabulary(self, save_directory: str | Path, filename_prefix: str | None = None) -> tuple[str]:
        """Write the numeric vocabulary as JSON; return the written path."""
        save_directory = Path(save_directory)
        save_directory.mkdir(parents=True, exist_ok=True)
        name = self.vocab_filename if filename_prefix is None else f"{filename_prefix}-{self.vocab_filename}"
        path = save_directory / name
        with path.open("w", encoding="utf-8") as f:
            json.dump({token: idx for idx, token in enumerate(self._tokens)}, f, indent=2)
        return (str(path),)

    def save_pretrained(self, save_directory: str | Path, filename_prefix: str | None = None):
        """Save the tokenizer config plus, when present, the encoder assets
        into ``encoder_tokenizer_dir`` so ``_load_encoder_tokenizer`` can
        find them again after ``from_pretrained``."""
        paths = super().save_pretrained(save_directory, filename_prefix=filename_prefix)
        if self.encoder_tokenizer is not None and self.encoder_tokenizer_dir:
            encoder_dir = Path(save_directory) / self.encoder_tokenizer_dir
            encoder_dir.mkdir(parents=True, exist_ok=True)
            self.encoder_tokenizer.save_pretrained(encoder_dir)
        return paths

    # ------------------------------------------------------------------
    # Float <-> token-id conversion.
    # ------------------------------------------------------------------

    def float_to_token_ids(self, value: float) -> List[int]:
        """Serialize one float into decoder token ids (no control token)."""
        tokens = self.float_to_tokens(value)
        return [self._convert_token_to_id(token) for token in tokens]

    def token_ids_to_floats(self, token_ids: Sequence[int]) -> List[float]:
        """Decode a flat id sequence back into floats.

        The first id is dropped (assumed to be a leading control token such
        as BOS/pad — TODO confirm against callers); the remainder must split
        evenly into ``num_tokens_per_obj``-sized chunks.

        Raises:
            ValueError: On ragged length or ids outside the numeric vocab.
        """
        # Drop the leading control token.
        cleaned = list(token_ids[1:]) if token_ids else []
        if not cleaned:
            return []
        if len(cleaned) % self.num_tokens_per_obj != 0:
            raise ValueError("Token ids length is not a multiple of tokens per object.")
        floats: List[float] = []
        for start in range(0, len(cleaned), self.num_tokens_per_obj):
            chunk = cleaned[start : start + self.num_tokens_per_obj]
            tokens = []
            for idx in chunk:
                token = self._convert_id_to_token(idx)
                if token not in self._token_to_id:
                    raise ValueError(
                        "Token id is not part of the numeric decoder vocabulary: "
                        f"{idx}"
                    )
                tokens.append(token)
            floats.append(self.tokens_to_float(tokens))
        return floats

    # ------------------------------------------------------------------
    # Constrained decoding.
    # ------------------------------------------------------------------

    def possible_next_token_ids(self, prev_token_ids: Sequence[int]) -> List[int]:
        """Ids that may legally follow ``prev_token_ids`` during decoding.

        The first id is treated as a control token; the position inside the
        current float is the length of the remainder modulo
        ``num_tokens_per_obj``.
        """
        prev_core = list(prev_token_ids[1:]) if prev_token_ids else []
        if not prev_core:
            local_context: List[int] = []
        else:
            remainder = len(prev_core) % self.num_tokens_per_obj
            # A zero remainder means a fresh object starts next.
            local_context = prev_core[-remainder:] if remainder else []
        local_tokens = [self._convert_id_to_token(idx) for idx in local_context]
        allowed_tokens = self._possible_next_tokens(local_tokens)
        return [self._convert_token_to_id(token) for token in allowed_tokens]

    # ------------------------------------------------------------------
    # Decoding to strings.
    # ------------------------------------------------------------------

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """Space-join all-numeric token lists; otherwise defer to the encoder."""
        if tokens and all(token in self._token_to_id for token in tokens):
            return " ".join(tokens)
        if self.encoder_tokenizer is not None:
            return self.encoder_tokenizer.convert_tokens_to_string(tokens)
        return " ".join(tokens)

    def decode(self, token_ids: Sequence[int], **kwargs) -> str:
        """Decode ids to text.

        Heuristic: when every id falls inside the numeric id range, the
        sequence is decoded as floats; otherwise it is handed to the
        encoder tokenizer.
        """
        token_list = list(token_ids)
        if token_list and all(0 <= idx < len(self._tokens) for idx in token_list):
            floats = self.token_ids_to_floats(token_list)
            return " ".join(f"{value:.6g}" for value in floats)
        if self.encoder_tokenizer is not None:
            return self.encoder_tokenizer.decode(token_ids, **kwargs)
        raise ValueError("Cannot decode token ids without encoder tokenizer assets.")
|
|
|
|
class P10Tokenizer(_NumericTokenizerBase):
    """P10 numeric codec: a sign token, a fixed run of decimal digits, and a
    power-of-ten exponent token.

    Mirrors :class:`regress_lm.tokenizers.P10Tokenizer`.
    """

    vocab_filename = "p10_vocab.json"

    def __init__(
        self,
        num_digits: int = 6,
        exponent_range: int = 10,
        **kwargs,
    ) -> None:
        """Serialize floats as ``<sign><d1>..<dN><E..>`` with N = num_digits.

        Raises:
            ValueError: If ``num_digits < 1`` or ``exponent_range < 0``.
        """
        self.num_digits = int(num_digits)
        self.exponent_range = int(exponent_range)
        if self.num_digits < 1:
            raise ValueError("num_digits must be >= 1")
        if self.exponent_range < 0:
            raise ValueError("exponent_range must be >= 0")
        super().__init__(**kwargs)
        # One sign token + digit tokens + one exponent token.
        self.num_tokens_per_obj = 2 + self.num_digits
        self.decoder_tokenizer = "P10"

    def _extra_init_kwargs(self) -> Dict[str, object]:
        """Config entries persisted alongside the tokenizer."""
        extras: Dict[str, object] = {
            "num_digits": self.num_digits,
            "exponent_range": self.exponent_range,
            "decoder_tokenizer": "P10",
            "auto_map": {"AutoTokenizer": ["tokenization_p10.P10Tokenizer", None]},
            "tokenizer_class": "P10Tokenizer",
        }
        return extras

    def _build_base_tokens(self) -> List[str]:
        """Sign tokens, the ten decimal digits, and every exponent in range."""
        exponent_span = range(-self.exponent_range, self.exponent_range + 1)
        primitives: List[str | int] = ["+", "-", *range(10)]
        primitives.extend(f"E{value}" for value in exponent_span)
        return [_to_token(primitive) for primitive in primitives]

    def _round_float(self, value: float) -> float:
        """Clamp the magnitude into the representable window; magnitudes
        below the smallest representable value round to zero or to that
        smallest value, whichever is nearer.  The sign is preserved."""
        magnitude = abs(value)
        largest = float("9" * self.num_digits) * (10.0**self.exponent_range)
        smallest = float("1" + "0" * (self.num_digits - 1)) * (10.0 ** (-self.exponent_range))
        if magnitude > largest:
            magnitude = largest
        if magnitude < smallest:
            magnitude = smallest * round(magnitude / smallest)
        return magnitude if value >= 0 else -magnitude

    def float_to_tokens(self, value: float) -> List[str]:
        """Render ``value`` as a sign token, mantissa digits, and an
        exponent token.

        Raises:
            RuntimeError: If numpy's scientific formatting is unparseable.
        """
        rounded = self._round_float(value)
        sci = np.format_float_scientific(
            rounded,
            precision=self.num_digits - 1,
            min_digits=self.num_digits - 1,
            sign=True,
        )
        parsed = re.fullmatch(r"([+-])([0-9.]*)e(.*)", sci)
        if parsed is None:
            raise RuntimeError(f"Unexpected scientific notation from numpy: {sci}")
        sign_char, mantissa_part, exponent_part = parsed.groups()
        digit_chars = [ch for ch in mantissa_part if ch != "."]
        if rounded:
            # Shift the exponent so the digit run reads as an integer mantissa.
            exponent = int(exponent_part) - len(digit_chars) + 1
        else:
            exponent = 0
        return [_to_token(piece) for piece in [sign_char, *digit_chars, f"E{exponent}"]]

    def tokens_to_float(self, tokens: Sequence[str]) -> float:
        """Reassemble a float from sign, digit, and exponent tokens."""
        primitives = [_from_token(token) for token in tokens]
        signum = -1 if primitives[0] == "-" else 1
        mantissa = int("".join(str(part) for part in primitives[1:-1]))
        exponent = int(primitives[-1].lstrip("E"))
        return float(signum * mantissa * (10 ** exponent))

    def _possible_next_tokens(self, prev_tokens: Sequence[str]) -> List[str]:
        """Constrained-decoding helper: candidate tokens for the next slot.

        Raises:
            ValueError: If ``prev_tokens`` already spans a whole object.
        """
        position = len(prev_tokens)
        if position < 0 or position >= self.num_tokens_per_obj:
            raise ValueError(
                f"Index {position} out of bounds for tokens per object {self.num_tokens_per_obj}."
            )
        if position == 0:
            options: Iterable[str | int] = ["+", "-"]
        elif position == self.num_tokens_per_obj - 1:
            options = [
                f"E{value}" for value in range(-self.exponent_range, self.exponent_range + 1)
            ]
        else:
            options = range(10)
        return [_to_token(option) for option in options]
|
|
|
|
class IEEEFloatTokenizer(_NumericTokenizerBase):
    """Tokenizer that mirrors :class:`regress_lm.tokenizers.IEEEFloatTokenizer`.

    A float is serialized as ``<sign><exp sign><exp digits><mantissa digits>``
    in base ``base``.  The mantissa is normalized so its leading digit carries
    the integer part, i.e. ``base**exponent <= |value| < base**(exponent+1)``.
    """

    vocab_filename = "ieee_vocab.json"

    def __init__(
        self,
        *,
        base: int = 10,
        num_exponent_digits: int = 1,
        num_mantissa_digits: int = 4,
        **kwargs,
    ) -> None:
        """Create the tokenizer.

        Args:
            base: Radix for exponent and mantissa digits (>= 2).
            num_exponent_digits: Number of exponent digit tokens (>= 1).
            num_mantissa_digits: Number of mantissa digit tokens (>= 1).
            **kwargs: Forwarded to :class:`_NumericTokenizerBase`.

        Raises:
            ValueError: If any sizing argument is out of range.
        """
        if base < 2:
            raise ValueError("base must be >= 2")
        if num_exponent_digits < 1:
            raise ValueError("num_exponent_digits must be >= 1")
        if num_mantissa_digits < 1:
            raise ValueError("num_mantissa_digits must be >= 1")
        self.base = int(base)
        self.num_exponent_digits = int(num_exponent_digits)
        self.num_mantissa_digits = int(num_mantissa_digits)
        super().__init__(**kwargs)
        # Two sign tokens (value sign + exponent sign) precede the digits.
        self.num_tokens_per_obj = 2 + self.num_exponent_digits + self.num_mantissa_digits
        self.decoder_tokenizer = f"IEEE_{self.num_mantissa_digits}_{self.num_exponent_digits}"

    def _extra_init_kwargs(self) -> Dict[str, object]:
        """Config entries persisted alongside the tokenizer."""
        return {
            "base": self.base,
            "num_exponent_digits": self.num_exponent_digits,
            "num_mantissa_digits": self.num_mantissa_digits,
            "auto_map": {"AutoTokenizer": ["tokenization_p10.IEEEFloatTokenizer", None]},
            "tokenizer_class": "IEEEFloatTokenizer",
        }

    def _build_base_tokens(self) -> List[str]:
        """Return sign tokens plus one token per digit of ``base``.

        Digits use the same alphabet that :func:`numpy.base_repr` emits
        (``0-9`` then ``A-Z``) so every token produced by
        :meth:`float_to_tokens` is in-vocabulary.  Building from a plain
        ``range(base)`` would create ``"<10>"``-style tokens for
        ``base > 10`` that ``np.base_repr`` never produces (it emits
        ``"A"``, ``"B"``, ...), leaving emitted tokens out-of-vocabulary.
        For ``base <= 10`` the resulting token strings are unchanged.
        """
        digit_tokens = [np.base_repr(digit, base=self.base) for digit in range(self.base)]
        return [_to_token(token) for token in ["+", "-"] + digit_tokens]

    def float_to_tokens(self, value: float) -> List[str]:
        """Serialize ``value`` into ``num_tokens_per_obj`` tokens.

        Values whose exponent underflows the representable range collapse
        to signed zero; exponent overflow raises.

        Raises:
            ValueError: If the exponent needs more than
                ``num_exponent_digits`` digits.
        """
        sign = "+" if value >= 0 else "-"
        abs_value = abs(value)
        if abs_value > 0:
            exponent = math.floor(math.log(abs_value, self.base))
            # Float logs are inexact at exact powers of the base (e.g.
            # math.log(1000, 10) == 2.999...96), which would shift the
            # mantissa by one digit and decode to a value off by a factor
            # of `base`.  Nudge until
            # base**exponent <= abs_value < base**(exponent + 1).
            if self.base ** (exponent + 1) <= abs_value:
                exponent += 1
            elif self.base ** exponent > abs_value:
                exponent -= 1
        else:
            exponent = 0

        exponent_sign = "+" if exponent >= 0 else "-"
        abs_exponent = abs(exponent)

        exponent_repr = np.base_repr(abs_exponent, base=self.base)
        if len(exponent_repr) > self.num_exponent_digits and exponent_sign == "+":
            raise ValueError(f"Overflow: Exponent {abs_exponent} too large.")
        if len(exponent_repr) > self.num_exponent_digits and exponent_sign == "-":
            # Underflow: too small to represent — emit signed zero.
            all_zeros = ["0"] * (self.num_exponent_digits + self.num_mantissa_digits)
            out = [sign, "-"] + all_zeros
            return [_to_token(s) for s in out]
        exponent_repr = exponent_repr.zfill(self.num_exponent_digits)

        # Scale so the mantissa becomes an integer with exactly
        # `num_mantissa_digits` base-`base` digits; np.base_repr drops any
        # fractional part, so the mantissa is truncated, not rounded.
        mantissa = np.base_repr(
            abs_value * self.base ** (self.num_mantissa_digits - 1 - exponent),
            base=self.base,
        )
        if len(mantissa) > self.num_mantissa_digits:
            mantissa = mantissa[: self.num_mantissa_digits]
        if len(mantissa) < self.num_mantissa_digits:
            mantissa += "0" * (self.num_mantissa_digits - len(mantissa))

        raw_str = sign + exponent_sign + exponent_repr + mantissa
        return [_to_token(s) for s in raw_str]

    def tokens_to_float(self, tokens: Sequence[str]) -> float:
        """Inverse of :meth:`float_to_tokens` (up to mantissa truncation)."""
        primitives = [_from_token(token) for token in tokens]
        sign = -1 if primitives[0] == "-" else 1
        exponent_sign = -1 if primitives[1] == "-" else 1
        abs_exponent_str = "".join(
            map(str, primitives[2 : 2 + self.num_exponent_digits])
        )
        abs_exponent = int(abs_exponent_str, base=self.base)
        exponent = exponent_sign * abs_exponent
        mantissa_str = "".join(map(str, primitives[2 + self.num_exponent_digits :]))
        mantissa_unscaled = int(mantissa_str, base=self.base)
        # Re-insert the radix point after the leading mantissa digit.
        mantissa = mantissa_unscaled / self.base ** (self.num_mantissa_digits - 1)
        return sign * (self.base**exponent) * mantissa

    def _possible_next_tokens(self, prev_tokens: Sequence[str]) -> List[str]:
        """Candidates for the next position within one serialized float.

        Positions 0 and 1 are the value/exponent signs; every later
        position is a base-``base`` digit.

        Raises:
            ValueError: If ``prev_tokens`` already spans a whole object.
        """
        index = len(prev_tokens)
        if index < 0 or index >= self.num_tokens_per_obj:
            raise ValueError(
                f"Index {index} out of bounds for tokens per object {self.num_tokens_per_obj}."
            )
        if index in (0, 1):
            candidates: Iterable[str | int] = ["+", "-"]
        else:
            # Same digit alphabet as np.base_repr so candidates match the
            # vocabulary for any base (letters above 9).
            candidates = [np.base_repr(digit, base=self.base) for digit in range(self.base)]
        return [_to_token(candidate) for candidate in candidates]
|
|