"""
VortexScienceTokenizer: A custom BPE tokenizer optimized for scientific text.

Trains on a science corpus and extends the vocabulary with domain-specific
tokens (LaTeX commands, Greek letters, SI units, chemical elements, ...).
"""
import os
import json
import re
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union

import torch

try:
    from tokenizers import Tokenizer, models, pre_tokenizers, processors, trainers
    from tokenizers.normalizers import Lowercase, NFD, StripAccents
except ImportError:
    print("Please install tokenizers: pip install tokenizers")
    raise


class VortexScienceTokenizer:
    """
    Science-optimized BPE tokenizer with domain extensions.

    Features:
    - Base BPE vocabulary (40,000 tokens) trained on scientific corpus
    - Extended science vocabulary (10,000 tokens) for LaTeX, chemistry, units, etc.
    - Special tokens for equation/citation/molecule spans
    - Domain tags for science areas
    - Digit-level number handling (optional, can be toggled)
    """

    def __init__(
        self,
        config: Dict,
        tokenizer_path: Optional[str] = None,
        vocab_size: int = 50000,
        base_vocab_size: int = 40000,
        extension_vocab_size: int = 10000,
    ):
        """
        Initialize the tokenizer.

        Args:
            config: Model configuration with special tokens
            tokenizer_path: Path to pre-trained tokenizer (if loading)
            vocab_size: Total vocabulary size
            base_vocab_size: Size of base BPE vocabulary
            extension_vocab_size: Size of science extension vocabulary
        """
        self.config = config
        self.base_vocab_size = base_vocab_size
        self.extension_vocab_size = extension_vocab_size
        self._vocab_size = vocab_size
        # Mapping of special-token string -> configured id (see __main__ for shape).
        self.special_tokens = config.get("special_tokens", {})
        self.domain_tags = config.get("domain_tags", [])

        if tokenizer_path and os.path.exists(tokenizer_path):
            self.tokenizer = Tokenizer.from_file(tokenizer_path)
            print(f"Loaded tokenizer from {tokenizer_path}")
        else:
            # Initialize empty BPE tokenizer; it must be trained before use.
            self.tokenizer = Tokenizer(models.BPE())
            self._setup_pre_tokenizer()
            print("Initialized empty BPE tokenizer")

    def _setup_pre_tokenizer(self) -> None:
        """Configure pre-tokenization rules."""
        # Use byte-level pre-tokenization for robustness
        self.tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
        self.tokenizer.normalizer = None  # Keep original casing for science terms

    def train(
        self,
        file_paths: List[str],
        min_frequency: int = 2,
        special_tokens: Optional[List[str]] = None,
    ) -> None:
        """
        Train the BPE tokenizer on scientific text files.

        Args:
            file_paths: List of text file paths for training
            min_frequency: Minimum token frequency to keep
            special_tokens: Additional special tokens to add
        """
        if special_tokens is None:
            # Domain tags may also appear among the special-token keys;
            # dedupe while preserving order so each token is registered once.
            special_tokens = list(
                dict.fromkeys(list(self.special_tokens.keys()) + self.domain_tags)
            )

        print(f"Training tokenizer on {len(file_paths)} files...")
        print(f"Base vocab size: {self.base_vocab_size}")
        print(f"Special tokens: {special_tokens}")

        trainer = trainers.BpeTrainer(
            vocab_size=self.base_vocab_size,
            min_frequency=min_frequency,
            special_tokens=special_tokens,
            show_progress=True,
        )

        self.tokenizer.train(file_paths, trainer=trainer)
        print(f"Training complete. Vocabulary size: {self.tokenizer.get_vocab_size()}")

        # Extend with science-specific tokens
        self._extend_science_vocabulary()

    def _extend_science_vocabulary(self) -> None:
        """Add science-specific tokens (LaTeX, units, elements, ...) to the vocabulary."""
        current_vocab = self.tokenizer.get_vocab()
        new_tokens: List[str] = []

        # LaTeX math symbols (common ones)
        latex_symbols = [
            "\\alpha", "\\beta", "\\gamma", "\\delta", "\\epsilon", "\\zeta",
            "\\eta", "\\theta", "\\iota", "\\kappa", "\\lambda", "\\mu",
            "\\nu", "\\xi", "\\pi", "\\rho", "\\sigma", "\\tau",
            "\\upsilon", "\\phi", "\\chi", "\\psi", "\\omega",
            "\\Gamma", "\\Delta", "\\Theta", "\\Lambda", "\\Xi", "\\Pi",
            "\\Sigma", "\\Phi", "\\Psi", "\\Omega",
            "\\sum", "\\prod", "\\int", "\\partial", "\\nabla", "\\infty",
            "\\leq", "\\geq", "\\neq", "\\approx", "\\equiv", "\\sim",
            "\\in", "\\notin", "\\subset", "\\supset", "\\cup", "\\cap",
            "\\forall", "\\exists", "\\neg", "\\land", "\\lor",
            "\\rightarrow", "\\leftarrow", "\\Rightarrow", "\\Leftarrow",
            "\\leftrightarrow", "\\frac", "\\sqrt", "\\binom",
            "\\begin", "\\end", "\\mathbf", "\\mathcal", "\\mathrm",
            "\\mathbb", "\\mathfrak",
        ]
        new_tokens.extend(latex_symbols)

        # Greek letters (Unicode)
        greek_letters = [
            "α", "β", "γ", "δ", "ε", "ζ", "η", "θ", "ι", "κ", "λ", "μ",
            "ν", "ξ", "ο", "π", "ρ", "σ", "τ", "υ", "φ", "χ", "ψ", "ω",
            "Γ", "Δ", "Θ", "Λ", "Ξ", "Π", "Σ", "Φ", "Ψ", "Ω",
        ]
        new_tokens.extend(greek_letters)

        # SI units and derived units
        si_units = [
            "m", "kg", "s", "mol", "K", "A", "cd", "mol",
            "Hz", "N", "Pa", "J", "W", "C", "V", "F", "Ω", "S",
            "Wb", "T", "H", "lm", "lx", "Bq", "Gy", "Sv", "kat",
            "eV", "u", "Da", "Å", "°C", "%", "‰",
            "M", "mM", "μM", "nM", "pM",
            "g", "mg", "μg", "ng", "pg",
            "km", "m", "cm", "mm", "μm", "nm", "pm",
            "L", "mL", "μL", "nL",
            "h", "min", "s", "ms", "μs", "ns",
        ]
        new_tokens.extend(si_units)

        # Common scientific abbreviations
        sci_abbrevs = [
            "DNA", "RNA", "mRNA", "tRNA", "rRNA", "cDNA", "gDNA",
            "ATP", "ADP", "AMP", "NAD", "NADP", "FAD", "CoA",
            "pH", "pKa", "pKb", "pI",
            "PCR", "RT", "qPCR", "NGS", "WGS",
            "IC50", "EC50", "KD", "Ki",
            "XRD", "NMR", "IR", "UV", "VIS", "MS", "GC", "HPLC",
            "SEM", "TEM", "AFM", "STM",
            "S/N", "SNR", "RMS", "Std", "Var", "Cov",
            "et al.", "vs.", "cf.", "viz.",
            "Fig", "Eq", "Ref", "Tab", "Suppl",
        ]
        new_tokens.extend(sci_abbrevs)

        # Chemical element symbols
        elements = [
            "H", "He", "Li", "Be", "B", "C", "N", "O", "F", "Ne",
            "Na", "Mg", "Al", "Si", "P", "S", "Cl", "Ar", "K", "Ca",
            "Sc", "Ti", "V", "Cr", "Mn", "Fe", "Co", "Ni", "Cu", "Zn",
            "Ga", "Ge", "As", "Se", "Br", "Kr", "Rb", "Sr", "Y", "Zr",
            "Nb", "Mo", "Tc", "Ru", "Rh", "Pd", "Ag", "Cd", "In", "Sn",
            "Sb", "Te", "I", "Xe", "Cs", "Ba", "La", "Ce", "Pr", "Nd",
            "Pm", "Sm", "Eu", "Gd", "Tb", "Dy", "Ho", "Er", "Tm", "Yb",
            "Lu", "Hf", "Ta", "W", "Re", "Os", "Ir", "Pt", "Au", "Hg",
            "Tl", "Pb", "Bi", "Po", "At", "Rn", "Fr", "Ra", "Ac", "Th",
            "Pa", "U", "Np", "Pu", "Am", "Cm", "Bk", "Cf", "Es", "Fm",
            "Md", "No", "Lr", "Rf", "Db", "Sg", "Bh", "Hs", "Mt", "Ds",
            "Rg", "Cn", "Nh", "Fl", "Mc", "Lv", "Ts", "Og",
        ]
        new_tokens.extend(elements)

        # Amino acid single-letter codes
        amino_acids = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I",
                       "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]
        new_tokens.extend(amino_acids)

        # Mathematical operators (Unicode)
        math_ops = [
            "±", "∓", "×", "÷", "∈", "∉", "∋", "∏", "∑", "∧", "∨", "¬",
            "≤", "≥", "≠", "≈", "≡", "≅", "≆", "≇", "≉", "≊", "≋",
            "⊂", "⊃", "⊆", "⊇", "⊄", "⊅", "⊈", "⊉",
            "∞", "∂", "∇", "√", "∛", "∜",
            "∫", "∬", "∭", "∮", "∯", "∰",
            "∴", "∵", "∶", "∷", "∼", "∽", "≈", "≋",
            "⟨", "⟩", "|", "‖", "‵", "′", "″", "‴",
            "•", "·", "‣", "⁂", "※", "‼", "⁇", "⁈",
        ]
        new_tokens.extend(math_ops)

        # Add tokens that aren't already in the vocabulary in ONE batched call
        # (per-token add_tokens([t]) calls are needlessly slow). add_tokens
        # returns the number of tokens actually added, so the log line below
        # reports the real extension size rather than len(new_tokens), which
        # includes duplicates and already-present entries.
        tokens_to_add = [t for t in new_tokens if t not in current_vocab]
        added = self.tokenizer.add_tokens(tokens_to_add) if tokens_to_add else 0
        print(f"Extended vocabulary with {added} science tokens")
        print(f"Final vocabulary size: {self.tokenizer.get_vocab_size()}")

    def save(self, path: str) -> None:
        """Save tokenizer to disk."""
        self.tokenizer.save(path)
        print(f"Tokenizer saved to {path}")

    def encode(
        self,
        text: str,
        add_special_tokens: bool = True,
        return_tensors: str = "pt",
    ) -> Union[Dict, torch.Tensor]:
        """
        Encode text to token IDs.

        Args:
            text: Input text
            add_special_tokens: Add BOS/EOS tokens
            return_tensors: "pt" for PyTorch tensors, "np" for numpy, None for list

        Returns:
            Dictionary with input_ids and attention_mask, or tensors/list
        """
        encoding = self.tokenizer.encode(text, add_special_tokens=add_special_tokens)

        result = {
            "input_ids": encoding.ids,
            "attention_mask": encoding.attention_mask,
        }

        if return_tensors == "pt":
            # unsqueeze(0) adds a batch dimension of size 1.
            result = {k: torch.tensor(v).unsqueeze(0) for k, v in result.items()}
        elif return_tensors == "np":
            # NOTE(review): numpy output is 1-D (no batch dim), unlike "pt" —
            # preserved as-is for backward compatibility.
            import numpy as np
            result = {k: np.array(v) for k, v in result.items()}

        return result

    def decode(self, token_ids: List[int], skip_special_tokens: bool = True) -> str:
        """Decode token IDs back to text."""
        return self.tokenizer.decode(token_ids, skip_special_tokens=skip_special_tokens)

    def batch_encode(
        self,
        texts: List[str],
        padding: bool = True,
        truncation: bool = True,
        max_length: Optional[int] = None,
        return_tensors: str = "pt",
    ) -> Dict:
        """
        Encode a batch of texts.

        Args:
            texts: List of input texts
            padding: Pad to same length
            truncation: Truncate to max_length
            max_length: Maximum sequence length
            return_tensors: Tensor format

        Returns:
            Batch encoded dictionary
        """
        if max_length is None:
            max_length = self.config.get("max_seq_len", 16384)

        encodings = self.tokenizer.encode_batch(
            texts,
            add_special_tokens=True,
        )

        # Manual padding/truncation
        input_ids = []
        attention_masks = []
        for enc in encodings:
            ids = enc.ids
            mask = enc.attention_mask
            if truncation and len(ids) > max_length:
                ids = ids[:max_length]
                mask = mask[:max_length]
            input_ids.append(ids)
            attention_masks.append(mask)

        # Pad to same length if requested. The "and input_ids" guard prevents
        # max() from raising on an empty batch.
        if padding and input_ids:
            max_len = max(len(ids) for ids in input_ids)
            # Resolve the PAD id from the trained vocabulary rather than
            # trusting the config mapping blindly; fall back to config if the
            # token is absent (e.g. tokenizer loaded without special tokens).
            pad_id = self.tokenizer.token_to_id("[PAD]")
            if pad_id is None:
                pad_id = self.special_tokens["[PAD]"]
            padded_ids = []
            padded_masks = []
            for ids, mask in zip(input_ids, attention_masks):
                pad_len = max_len - len(ids)
                padded_ids.append(ids + [pad_id] * pad_len)
                padded_masks.append(mask + [0] * pad_len)
            input_ids = padded_ids
            attention_masks = padded_masks

        result = {
            "input_ids": input_ids,
            "attention_mask": attention_masks,
        }

        if return_tensors == "pt":
            result = {k: torch.tensor(v) for k, v in result.items()}

        return result

    @property
    def vocab_size(self) -> int:
        """Get vocabulary size."""
        return self.tokenizer.get_vocab_size()

    def get_vocab(self) -> Dict[str, int]:
        """Get vocabulary dictionary."""
        return self.tokenizer.get_vocab()

    def token_to_id(self, token: str) -> int:
        """Convert token to ID."""
        return self.tokenizer.token_to_id(token)

    def id_to_token(self, id: int) -> str:
        """Convert ID to token."""
        # Parameter keeps its original (builtin-shadowing) name for
        # backward compatibility with keyword callers.
        return self.tokenizer.id_to_token(id)


def build_science_vocabulary_file(output_path: str) -> None:
    """
    Build a science vocabulary text file for BPE training.

    This file contains seed vocabulary terms to ensure science tokens
    are present.
    """
    science_terms: List[str] = []

    # LaTeX commands
    latex_terms = [
        "\\alpha", "\\beta", "\\gamma", "\\delta", "\\epsilon", "\\zeta",
        "\\eta", "\\theta", "\\iota", "\\kappa", "\\lambda", "\\mu",
        "\\nu", "\\xi", "\\pi", "\\rho", "\\sigma", "\\tau",
        "\\upsilon", "\\phi", "\\chi", "\\psi", "\\omega",
        "\\sum", "\\prod", "\\int", "\\partial", "\\nabla", "\\infty",
        "\\frac", "\\sqrt", "\\binom", "\\begin", "\\end",
        "\\mathbf", "\\mathcal", "\\mathrm", "\\mathbb",
        "\\in", "\\subset", "\\cup", "\\cap", "\\forall", "\\exists",
        "\\rightarrow", "\\leftarrow", "\\Rightarrow", "\\Leftarrow",
        "\\leq", "\\geq", "\\neq", "\\approx", "\\equiv",
    ]
    science_terms.extend(latex_terms)

    # Chemical formulas
    chem_formulas = [
        "H2O", "CO2", "O2", "N2", "H2", "CH4", "C2H6", "C3H8",
        "C6H12O6", "C12H22O11",
        "HCl", "H2SO4", "HNO3", "H3PO4", "NaOH", "KOH", "CaCO3",
        "NaCl", "KCl", "MgCl2",
        "Fe2O3", "Fe3O4", "CuO", "Cu2O", "ZnO", "Al2O3", "SiO2",
        "TiO2", "MnO2",
        "NH3", "NO", "NO2", "N2O", "SO2", "SO3", "CO",
        "CH3COOH", "C2H5OH",
    ]
    science_terms.extend(chem_formulas)

    # Mathematical expressions
    math_exprs = [
        "x^2", "x^3", "e^x", "ln(x)", "log(x)",
        "sin(x)", "cos(x)", "tan(x)",
        "arcsin(x)", "arccos(x)", "arctan(x)",
        "f(x)", "g(x)", "h(x)", "F(x)", "G(x)",
        "dx", "dy", "dz", "dt", "∂x", "∂y", "∂z",
        "∫", "∬", "∭", "∮",
        "∑_{i=1}^{n}", "∏_{i=1}^{n}",
    ]
    science_terms.extend(math_exprs)

    # Units with numbers
    unit_exprs = [
        "10^6", "10^9", "10^12", "10^15", "10^18",
        "10^-3", "10^-6", "10^-9", "10^-12",
        "m/s", "km/h", "cm/s", "mm/s",
        "J/mol", "kJ/mol", "cal", "kcal",
        "eV", "MeV", "GeV", "TeV",
        "Hz", "kHz", "MHz", "GHz",
        "Pa", "kPa", "MPa", "GPa",
        "°C", "K", "°F",
    ]
    science_terms.extend(unit_exprs)

    # Write to file, one term per line
    with open(output_path, "w", encoding="utf-8") as f:
        for term in science_terms:
            f.write(term + "\n")

    print(f"Science vocabulary seed file written to {output_path}")
    print(f"Total seed terms: {len(science_terms)}")


if __name__ == "__main__":
    # Example usage
    import sys

    if len(sys.argv) < 2:
        # Fixed usage message: the train-data argument is required.
        print("Usage: python vortex_tokenizer.py <train_data> [output_dir]")
        sys.exit(1)

    train_data = sys.argv[1]
    output_dir = sys.argv[2] if len(sys.argv) > 2 else "."

    # Load config (simplified for standalone)
    config = {
        "special_tokens": {
            "[PAD]": 0,
            "[UNK]": 1,
            "[BOS]": 2,
            "[EOS]": 3,
            "[EQUATION]": 4,
            "[/EQUATION]": 5,
            "[CITATION]": 6,
            "[/CITATION]": 7,
            "[MOLECULE]": 8,
            "[/MOLECULE]": 9,
            "[FIGURE]": 10,
            "[TABLE]": 11,
            "[MATH]": 12,
            "[CHEM]": 13,
            "[BIO]": 14,
            "[PHYS]": 15,
            "[EARTH]": 16,
            "[SPACE]": 17,
            "[ZOO]": 18,
        },
        "domain_tags": ["[MATH]", "[CHEM]", "[BIO]", "[PHYS]",
                        "[EARTH]", "[SPACE]", "[ZOO]"],
        "max_seq_len": 16384,
    }

    # Build seed vocabulary
    seed_vocab_path = os.path.join(output_dir, "science_seed_vocab.txt")
    build_science_vocabulary_file(seed_vocab_path)

    # Initialize and train tokenizer. The seed file is included in the
    # training corpus so the science terms it contains are actually seen
    # by the BPE trainer (previously it was built but never used).
    tokenizer = VortexScienceTokenizer(config)
    tokenizer.train([train_data, seed_vocab_path])

    # Save tokenizer
    tokenizer_path = os.path.join(output_dir, "vortex_tokenizer.json")
    tokenizer.save(tokenizer_path)
    print(f"Tokenizer saved to {tokenizer_path}")