| | """
|
| | VortexScienceTokenizer: A custom BPE tokenizer optimized for scientific text.
|
| | Trains on science corpus and extends vocabulary with domain-specific tokens.
|
| | """
|
| |
|
| | import os
|
| | import json
|
| | import re
|
| | from pathlib import Path
|
| | from typing import List, Dict, Optional, Tuple, Union
|
| | import torch
|
| |
|
| | try:
|
| | from tokenizers import Tokenizer, models, pre_tokenizers, processors, trainers
|
| | from tokenizers.normalizers import Lowercase, NFD, StripAccents
|
| | except ImportError:
|
| | print("Please install tokenizers: pip install tokenizers")
|
| | raise
|
| |
|
| |
|
class VortexScienceTokenizer:
    """
    Science-optimized BPE tokenizer with domain extensions.

    Features:
    - Base BPE vocabulary (40,000 tokens) trained on scientific corpus
    - Extended science vocabulary (10,000 tokens) for LaTeX, chemistry, units, etc.
    - Special tokens for equation/citation/molecule spans
    - Domain tags for science areas
    - Digit-level number handling (optional, can be toggled)
    """

    def __init__(
        self,
        config: Dict,
        tokenizer_path: Optional[str] = None,
        vocab_size: int = 50000,
        base_vocab_size: int = 40000,
        extension_vocab_size: int = 10000,
    ):
        """
        Initialize the tokenizer.

        Args:
            config: Model configuration with special tokens
            tokenizer_path: Path to pre-trained tokenizer (if loading)
            vocab_size: Total vocabulary size
            base_vocab_size: Size of base BPE vocabulary
            extension_vocab_size: Size of science extension vocabulary
        """
        self.config = config
        self.base_vocab_size = base_vocab_size
        self.extension_vocab_size = extension_vocab_size
        self._vocab_size = vocab_size

        # Mapping of special-token string -> *configured* id, plus domain tags.
        # NOTE: after training, the ids actually assigned by the trainer may
        # differ from these configured ids; use _pad_token_id() for padding.
        self.special_tokens = config.get("special_tokens", {})
        self.domain_tags = config.get("domain_tags", [])

        if tokenizer_path and os.path.exists(tokenizer_path):
            self.tokenizer = Tokenizer.from_file(tokenizer_path)
            print(f"Loaded tokenizer from {tokenizer_path}")
        else:
            # Fresh, untrained BPE model; callers must run train() before use.
            self.tokenizer = Tokenizer(models.BPE())
            self._setup_pre_tokenizer()
            print("Initialized empty BPE tokenizer")

    def _setup_pre_tokenizer(self):
        """Configure pre-tokenization rules (byte-level, no normalization)."""
        self.tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
        # Scientific text is case-sensitive (e.g. pH vs PH, element symbols),
        # so no lowercasing / accent stripping is applied.
        self.tokenizer.normalizer = None

    def train(
        self,
        file_paths: List[str],
        min_frequency: int = 2,
        special_tokens: Optional[List[str]] = None,
    ):
        """
        Train the BPE tokenizer on scientific text files.

        Args:
            file_paths: List of text file paths for training
            min_frequency: Minimum token frequency to keep
            special_tokens: Additional special tokens to add
        """
        if special_tokens is None:
            # Domain tags may also appear as keys in special_tokens; deduplicate
            # while preserving order so the trainer sees each token once.
            special_tokens = list(
                dict.fromkeys(list(self.special_tokens.keys()) + self.domain_tags)
            )

        print(f"Training tokenizer on {len(file_paths)} files...")
        print(f"Base vocab size: {self.base_vocab_size}")
        print(f"Special tokens: {special_tokens}")

        trainer = trainers.BpeTrainer(
            vocab_size=self.base_vocab_size,
            min_frequency=min_frequency,
            special_tokens=special_tokens,
            show_progress=True,
        )

        self.tokenizer.train(file_paths, trainer=trainer)
        print(f"Training complete. Vocabulary size: {self.tokenizer.get_vocab_size()}")

        self._extend_science_vocabulary()

    def _extend_science_vocabulary(self):
        """Add science-specific tokens to the vocabulary."""
        current_vocab = self.tokenizer.get_vocab()
        new_tokens = []

        # LaTeX commands: Greek letters, operators, relations, and markup.
        latex_symbols = [
            "\\alpha", "\\beta", "\\gamma", "\\delta", "\\epsilon", "\\zeta",
            "\\eta", "\\theta", "\\iota", "\\kappa", "\\lambda", "\\mu",
            "\\nu", "\\xi", "\\pi", "\\rho", "\\sigma", "\\tau",
            "\\upsilon", "\\phi", "\\chi", "\\psi", "\\omega",
            "\\Gamma", "\\Delta", "\\Theta", "\\Lambda", "\\Xi", "\\Pi",
            "\\Sigma", "\\Phi", "\\Psi", "\\Omega",
            "\\sum", "\\prod", "\\int", "\\partial", "\\nabla", "\\infty",
            "\\leq", "\\geq", "\\neq", "\\approx", "\\equiv", "\\sim",
            "\\in", "\\notin", "\\subset", "\\supset", "\\cup", "\\cap",
            "\\forall", "\\exists", "\\neg", "\\land", "\\lor", "\\rightarrow",
            "\\leftarrow", "\\Rightarrow", "\\Leftarrow", "\\leftrightarrow",
            "\\frac", "\\sqrt", "\\binom", "\\begin", "\\end", "\\mathbf",
            "\\mathcal", "\\mathrm", "\\mathbb", "\\mathfrak",
        ]
        new_tokens.extend(latex_symbols)

        # Unicode Greek letters (lowercase and uppercase).
        greek_letters = [
            "α", "β", "γ", "δ", "ε", "ζ", "η", "θ", "ι", "κ", "λ", "μ",
            "ν", "ξ", "ο", "π", "ρ", "σ", "τ", "υ", "φ", "χ", "ψ", "ω",
            "Γ", "Δ", "Θ", "Λ", "Ξ", "Π", "Σ", "Φ", "Ψ", "Ω",
        ]
        new_tokens.extend(greek_letters)

        # SI units, prefixed variants, and common lab-science units.
        si_units = [
            "m", "kg", "s", "mol", "K", "A", "cd", "mol",
            "Hz", "N", "Pa", "J", "W", "C", "V", "F", "Ω", "S",
            "Wb", "T", "H", "lm", "lx", "Bq", "Gy", "Sv", "kat",
            "eV", "u", "Da", "Å", "°C", "%", "‰",
            "M", "mM", "μM", "nM", "pM",
            "g", "mg", "μg", "ng", "pg",
            "km", "m", "cm", "mm", "μm", "nm", "pm",
            "L", "mL", "μL", "nL",
            "h", "min", "s", "ms", "μs", "ns",
        ]
        new_tokens.extend(si_units)

        # Common scientific abbreviations and citation markers.
        sci_abbrevs = [
            "DNA", "RNA", "mRNA", "tRNA", "rRNA", "cDNA", "gDNA",
            "ATP", "ADP", "AMP", "NAD", "NADP", "FAD", "CoA",
            "pH", "pKa", "pKb", "pI",
            "PCR", "RT", "qPCR", "NGS", "WGS",
            "IC50", "EC50", "KD", "Ki",
            "XRD", "NMR", "IR", "UV", "VIS", "MS", "GC", "HPLC",
            "SEM", "TEM", "AFM", "STM",
            "S/N", "SNR", "RMS", "Std", "Var", "Cov",
            "et al.", "vs.", "cf.", "viz.",
            "Fig", "Eq", "Ref", "Tab", "Suppl",
        ]
        new_tokens.extend(sci_abbrevs)

        # All chemical element symbols (periodic table).
        elements = [
            "H", "He", "Li", "Be", "B", "C", "N", "O", "F", "Ne",
            "Na", "Mg", "Al", "Si", "P", "S", "Cl", "Ar",
            "K", "Ca", "Sc", "Ti", "V", "Cr", "Mn", "Fe", "Co", "Ni", "Cu", "Zn",
            "Ga", "Ge", "As", "Se", "Br", "Kr",
            "Rb", "Sr", "Y", "Zr", "Nb", "Mo", "Tc", "Ru", "Rh", "Pd", "Ag", "Cd",
            "In", "Sn", "Sb", "Te", "I", "Xe",
            "Cs", "Ba", "La", "Ce", "Pr", "Nd", "Pm", "Sm", "Eu", "Gd", "Tb",
            "Dy", "Ho", "Er", "Tm", "Yb", "Lu",
            "Hf", "Ta", "W", "Re", "Os", "Ir", "Pt", "Au", "Hg", "Tl", "Pb",
            "Bi", "Po", "At", "Rn",
            "Fr", "Ra", "Ac", "Th", "Pa", "U", "Np", "Pu", "Am", "Cm", "Bk",
            "Cf", "Es", "Fm", "Md", "No", "Lr",
            "Rf", "Db", "Sg", "Bh", "Hs", "Mt", "Ds", "Rg", "Cn", "Nh",
            "Fl", "Mc", "Lv", "Ts", "Og",
        ]
        new_tokens.extend(elements)

        # One-letter amino-acid codes (many overlap with element symbols;
        # deduplication below keeps a single copy of each string).
        amino_acids = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I",
                       "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]
        new_tokens.extend(amino_acids)

        # Unicode math operators, relations, and typographic symbols.
        math_ops = [
            "±", "∓", "×", "÷", "∈", "∉", "∋", "∏", "∑", "∧", "∨", "¬",
            "≤", "≥", "≠", "≈", "≡", "≅", "≆", "≇", "≉", "≊", "≋",
            "⊂", "⊃", "⊆", "⊇", "⊄", "⊅", "⊈", "⊉",
            "∞", "∂", "∇", "√", "∛", "∜",
            "∫", "∬", "∭", "∮", "∯", "∰",
            "∴", "∵", "∶", "∷", "∼", "∽", "≈", "≋",
            "⟨", "⟩", "|", "‖", "‵", "′", "″", "‴",
            "•", "·", "‣", "⁂", "※", "‼", "⁇", "⁈",
        ]
        new_tokens.extend(math_ops)

        # Deduplicate (several entries repeat across lists, e.g. "m", "s",
        # "mol", element symbols vs. amino-acid codes) and add everything in a
        # single batched call instead of one add_tokens() call per token.
        unique_tokens = list(dict.fromkeys(new_tokens))
        to_add = [t for t in unique_tokens if t not in current_vocab]
        added = self.tokenizer.add_tokens(to_add)

        print(f"Extended vocabulary with {added} science tokens")
        print(f"Final vocabulary size: {self.tokenizer.get_vocab_size()}")

    def save(self, path: str):
        """Save tokenizer to disk."""
        self.tokenizer.save(path)
        print(f"Tokenizer saved to {path}")

    def _pad_token_id(self) -> int:
        """
        Resolve the id used for padding.

        The BPE trainer assigns special-token ids in training order, which is
        not guaranteed to match the ids in the config, so prefer the id from
        the trained vocabulary; fall back to the configured id, then to 0.
        """
        pad_id = self.tokenizer.token_to_id("[PAD]")
        if pad_id is None:
            pad_id = self.special_tokens.get("[PAD]", 0)
        return pad_id

    def encode(
        self,
        text: str,
        add_special_tokens: bool = True,
        return_tensors: str = "pt",
    ) -> Union[Dict, torch.Tensor]:
        """
        Encode text to token IDs.

        Args:
            text: Input text
            add_special_tokens: Add BOS/EOS tokens
            return_tensors: "pt" for PyTorch tensors, "np" for numpy, None for list

        Returns:
            Dictionary with input_ids and attention_mask, or tensors/list

        Note:
            "pt" output carries a leading batch dimension of 1; "np" output is
            unbatched 1-D arrays (kept as-is for backward compatibility).
        """
        encoding = self.tokenizer.encode(text, add_special_tokens=add_special_tokens)

        result = {
            "input_ids": encoding.ids,
            "attention_mask": encoding.attention_mask,
        }

        if return_tensors == "pt":
            result = {k: torch.tensor(v).unsqueeze(0) for k, v in result.items()}
        elif return_tensors == "np":
            import numpy as np
            result = {k: np.array(v) for k, v in result.items()}

        return result

    def decode(self, token_ids: List[int], skip_special_tokens: bool = True) -> str:
        """Decode token IDs back to text."""
        return self.tokenizer.decode(token_ids, skip_special_tokens=skip_special_tokens)

    def batch_encode(
        self,
        texts: List[str],
        padding: bool = True,
        truncation: bool = True,
        max_length: Optional[int] = None,
        return_tensors: str = "pt",
    ) -> Dict:
        """
        Encode a batch of texts.

        Args:
            texts: List of input texts
            padding: Pad to same length
            truncation: Truncate to max_length
            max_length: Maximum sequence length
            return_tensors: Tensor format

        Returns:
            Batch encoded dictionary

        Note:
            With padding=False and return_tensors="pt", sequences must already
            share a length or torch.tensor() will fail on the ragged lists.
        """
        if max_length is None:
            max_length = self.config.get("max_seq_len", 16384)

        encodings = self.tokenizer.encode_batch(
            texts,
            add_special_tokens=True,
        )

        input_ids = []
        attention_masks = []

        for enc in encodings:
            ids = enc.ids
            mask = enc.attention_mask

            if truncation and len(ids) > max_length:
                ids = ids[:max_length]
                mask = mask[:max_length]

            input_ids.append(ids)
            attention_masks.append(mask)

        # Guard against an empty batch: max() on an empty sequence raises.
        if padding and input_ids:
            # Use the pad id from the trained vocabulary rather than trusting
            # the configured id, which may disagree (or be missing entirely).
            pad_id = self._pad_token_id()
            max_len = max(len(ids) for ids in input_ids)
            padded_ids = []
            padded_masks = []

            for ids, mask in zip(input_ids, attention_masks):
                pad_len = max_len - len(ids)
                padded_ids.append(ids + [pad_id] * pad_len)
                padded_masks.append(mask + [0] * pad_len)

            input_ids = padded_ids
            attention_masks = padded_masks

        result = {
            "input_ids": input_ids,
            "attention_mask": attention_masks,
        }

        if return_tensors == "pt":
            result = {k: torch.tensor(v) for k, v in result.items()}

        return result

    @property
    def vocab_size(self) -> int:
        """Get vocabulary size."""
        return self.tokenizer.get_vocab_size()

    def get_vocab(self) -> Dict[str, int]:
        """Get vocabulary dictionary."""
        return self.tokenizer.get_vocab()

    def token_to_id(self, token: str) -> int:
        """Convert token to ID."""
        return self.tokenizer.token_to_id(token)

    def id_to_token(self, id: int) -> str:
        """Convert ID to token."""
        # Parameter name shadows the builtin `id`; kept for caller compatibility.
        return self.tokenizer.id_to_token(id)
|
| |
|
| |
|
def build_science_vocabulary_file(output_path: str):
    """
    Build a science vocabulary text file for BPE training.
    This file contains seed vocabulary terms to ensure science tokens are present.
    """
    # LaTeX commands commonly seen in scientific prose and equations.
    latex_terms = [
        "\\alpha", "\\beta", "\\gamma", "\\delta", "\\epsilon", "\\zeta",
        "\\eta", "\\theta", "\\iota", "\\kappa", "\\lambda", "\\mu",
        "\\nu", "\\xi", "\\pi", "\\rho", "\\sigma", "\\tau",
        "\\upsilon", "\\phi", "\\chi", "\\psi", "\\omega",
        "\\sum", "\\prod", "\\int", "\\partial", "\\nabla", "\\infty",
        "\\frac", "\\sqrt", "\\binom", "\\begin", "\\end",
        "\\mathbf", "\\mathcal", "\\mathrm", "\\mathbb",
        "\\in", "\\subset", "\\cup", "\\cap", "\\forall", "\\exists",
        "\\rightarrow", "\\leftarrow", "\\Rightarrow", "\\Leftarrow",
        "\\leq", "\\geq", "\\neq", "\\approx", "\\equiv",
    ]

    # Frequently occurring chemical formulas.
    chem_formulas = [
        "H2O", "CO2", "O2", "N2", "H2", "CH4", "C2H6", "C3H8",
        "C6H12O6", "C12H22O11", "HCl", "H2SO4", "HNO3", "H3PO4",
        "NaOH", "KOH", "CaCO3", "NaCl", "KCl", "MgCl2",
        "Fe2O3", "Fe3O4", "CuO", "Cu2O", "ZnO", "Al2O3",
        "SiO2", "TiO2", "MnO2", "NH3", "NO", "NO2", "N2O",
        "SO2", "SO3", "CO", "CH3COOH", "C2H5OH",
    ]

    # Common mathematical expressions and function notation.
    math_exprs = [
        "x^2", "x^3", "e^x", "ln(x)", "log(x)", "sin(x)", "cos(x)",
        "tan(x)", "arcsin(x)", "arccos(x)", "arctan(x)",
        "f(x)", "g(x)", "h(x)", "F(x)", "G(x)",
        "dx", "dy", "dz", "dt", "∂x", "∂y", "∂z",
        "∫", "∬", "∭", "∮", "∑_{i=1}^{n}", "∏_{i=1}^{n}",
    ]

    # Powers of ten and compound/prefixed unit expressions.
    unit_exprs = [
        "10^6", "10^9", "10^12", "10^15", "10^18",
        "10^-3", "10^-6", "10^-9", "10^-12",
        "m/s", "km/h", "cm/s", "mm/s",
        "J/mol", "kJ/mol", "cal", "kcal",
        "eV", "MeV", "GeV", "TeV",
        "Hz", "kHz", "MHz", "GHz",
        "Pa", "kPa", "MPa", "GPa",
        "°C", "K", "°F",
    ]

    # Concatenate the groups in a fixed order: one seed term per output line.
    science_terms = latex_terms + chem_formulas + math_exprs + unit_exprs

    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(term + "\n" for term in science_terms)

    print(f"Science vocabulary seed file written to {output_path}")
    print(f"Total seed terms: {len(science_terms)}")
|
| |
|
| |
|
if __name__ == "__main__":
    import sys

    # Require at least the training-corpus path; the output dir is optional
    # and defaults to the current directory.
    if len(sys.argv) < 2:
        print("Usage: python vortex_tokenizer.py <train_data.txt> [output_dir]")
        sys.exit(1)

    train_data = sys.argv[1]
    output_dir = sys.argv[2] if len(sys.argv) > 2 else "."

    # Special-token ids, domain tags, and sequence limit for the tokenizer.
    config = {
        "special_tokens": {
            "[PAD]": 0, "[UNK]": 1, "[BOS]": 2, "[EOS]": 3,
            "[EQUATION]": 4, "[/EQUATION]": 5,
            "[CITATION]": 6, "[/CITATION]": 7,
            "[MOLECULE]": 8, "[/MOLECULE]": 9,
            "[FIGURE]": 10, "[TABLE]": 11,
            "[MATH]": 12, "[CHEM]": 13, "[BIO]": 14,
            "[PHYS]": 15, "[EARTH]": 16, "[SPACE]": 17, "[ZOO]": 18,
        },
        "domain_tags": ["[MATH]", "[CHEM]", "[BIO]", "[PHYS]", "[EARTH]", "[SPACE]", "[ZOO]"],
        "max_seq_len": 16384,
    }

    # Write the seed-vocabulary file alongside the tokenizer artifacts.
    build_science_vocabulary_file(os.path.join(output_dir, "science_seed_vocab.txt"))

    # Train a fresh tokenizer on the corpus, then persist it.
    science_tokenizer = VortexScienceTokenizer(config)
    science_tokenizer.train([train_data])

    out_file = os.path.join(output_dir, "vortex_tokenizer.json")
    science_tokenizer.save(out_file)
    print(f"Tokenizer saved to {out_file}")
|
| |
|