|
|
""" |
|
|
Custom BPE Tokenizer for SLM v1. |
|
|
16,384 vocabulary size optimized for conversational use. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
from typing import List, Optional, Union |
|
|
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, processors, decoders |
|
|
from tokenizers.normalizers import NFKC, Lowercase, Sequence |
|
|
|
|
|
|
|
|
class SLMTokenizer:
    """Custom BPE tokenizer for the SLM model.

    Features:
    - 16,384 token vocabulary (memory efficient)
    - Special tokens for conversation format
    - Compatible with the model's embedding layer
    """

    # Special-token strings. NOTE(review): the trainer receives these via
    # SPECIAL_TOKENS, so their IDs are assigned at training time — presumably
    # the first six IDs in list order; verify against the trained vocab.
    PAD_TOKEN = "<|pad|>"
    BOS_TOKEN = "<|bos|>"
    EOS_TOKEN = "<|eos|>"
    UNK_TOKEN = "<|unk|>"
    USER_TOKEN = "<|user|>"
    ASSISTANT_TOKEN = "<|assistant|>"

    SPECIAL_TOKENS = [PAD_TOKEN, BOS_TOKEN, EOS_TOKEN, UNK_TOKEN, USER_TOKEN, ASSISTANT_TOKEN]

    def __init__(self, tokenizer: Optional[Tokenizer] = None):
        """Initialize tokenizer.

        Args:
            tokenizer: Pre-trained HuggingFace tokenizer object. May be None;
                special-token IDs are then left unset until one is attached.
        """
        self.tokenizer = tokenizer
        self._setup_special_token_ids()

    def _setup_special_token_ids(self) -> None:
        """Cache the integer IDs of the special tokens for easy access.

        No-op when ``self.tokenizer`` is None (the id attributes are then
        simply not defined).
        """
        if self.tokenizer is not None:
            self.pad_token_id = self.tokenizer.token_to_id(self.PAD_TOKEN)
            self.bos_token_id = self.tokenizer.token_to_id(self.BOS_TOKEN)
            self.eos_token_id = self.tokenizer.token_to_id(self.EOS_TOKEN)
            self.unk_token_id = self.tokenizer.token_to_id(self.UNK_TOKEN)
            self.user_token_id = self.tokenizer.token_to_id(self.USER_TOKEN)
            self.assistant_token_id = self.tokenizer.token_to_id(self.ASSISTANT_TOKEN)

    @classmethod
    def train(
        cls,
        files: List[str],
        vocab_size: int = 16384,
        min_frequency: int = 2,
        save_path: Optional[str] = None,
    ) -> "SLMTokenizer":
        """Train a new BPE tokenizer on the given files.

        Args:
            files: List of text file paths to train on
            vocab_size: Size of vocabulary (default 16,384)
            min_frequency: Minimum token frequency to include
            save_path: Optional path to save the trained tokenizer

        Returns:
            Trained SLMTokenizer instance
        """
        print(f"Training BPE tokenizer with vocab_size={vocab_size}...")
        print(f"Training files: {files}")

        # Byte-level BPE: NFKC-normalized input, byte-level pre-tokenization
        # and matching byte-level decoder so round-trips are lossless.
        tokenizer = Tokenizer(models.BPE(unk_token=cls.UNK_TOKEN))
        tokenizer.normalizer = NFKC()
        tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
        tokenizer.decoder = decoders.ByteLevel()

        trainer = trainers.BpeTrainer(
            vocab_size=vocab_size,
            min_frequency=min_frequency,
            special_tokens=cls.SPECIAL_TOKENS,
            show_progress=True,
        )

        tokenizer.train(files, trainer=trainer)

        # Wrap every encoded sequence as <|bos|> ... <|eos|>. Must be set
        # after training so the special-token IDs exist in the vocab.
        tokenizer.post_processor = processors.TemplateProcessing(
            single=f"{cls.BOS_TOKEN} $A {cls.EOS_TOKEN}",
            pair=f"{cls.BOS_TOKEN} $A {cls.EOS_TOKEN} {cls.BOS_TOKEN} $B {cls.EOS_TOKEN}",
            special_tokens=[
                (cls.BOS_TOKEN, tokenizer.token_to_id(cls.BOS_TOKEN)),
                (cls.EOS_TOKEN, tokenizer.token_to_id(cls.EOS_TOKEN)),
            ],
        )

        print(f"Tokenizer trained! Vocabulary size: {tokenizer.get_vocab_size()}")

        instance = cls(tokenizer)

        if save_path:
            instance.save(save_path)

        return instance

    @classmethod
    def from_file(cls, path: str) -> "SLMTokenizer":
        """Load a tokenizer from a saved file.

        Args:
            path: Path to the tokenizer.json file

        Returns:
            Loaded SLMTokenizer instance
        """
        tokenizer = Tokenizer.from_file(path)
        return cls(tokenizer)

    def save(self, path: str) -> None:
        """Save the tokenizer (and a small config JSON) to disk.

        Args:
            path: Either an existing directory (tokenizer.json is written
                inside it) or an explicit file path for the tokenizer JSON.
        """
        if os.path.isdir(path):
            save_path = os.path.join(path, "tokenizer.json")
        else:
            save_path = path
            # Guard: os.makedirs("") raises FileNotFoundError when the path
            # has no directory component (e.g. plain "tokenizer.json").
            parent = os.path.dirname(save_path)
            if parent:
                os.makedirs(parent, exist_ok=True)

        self.tokenizer.save(save_path)
        print(f"Tokenizer saved to: {save_path}")

        # BUGFIX: previously config_path was derived with
        # save_path.replace("tokenizer.json", ...), which is a silent no-op
        # for any other filename and then OVERWROTE the tokenizer file with
        # the config. Build the sibling path explicitly instead.
        config_path = os.path.join(os.path.dirname(save_path), "tokenizer_config.json")
        config = {
            "vocab_size": self.vocab_size,
            "pad_token": self.PAD_TOKEN,
            "bos_token": self.BOS_TOKEN,
            "eos_token": self.EOS_TOKEN,
            "unk_token": self.UNK_TOKEN,
            "user_token": self.USER_TOKEN,
            "assistant_token": self.ASSISTANT_TOKEN,
        }
        with open(config_path, "w") as f:
            json.dump(config, f, indent=2)
        print(f"Tokenizer config saved to: {config_path}")

    def encode(
        self,
        text: str,
        add_special_tokens: bool = True,
        max_length: Optional[int] = None,
        padding: bool = False,
        truncation: bool = False,
    ) -> List[int]:
        """Encode text to token IDs.

        Args:
            text: Input text string
            add_special_tokens: Whether to add BOS/EOS tokens
            max_length: Maximum sequence length
            padding: Whether to pad to max_length
            truncation: Whether to truncate to max_length

        Returns:
            List of token IDs
        """
        encoding = self.tokenizer.encode(text, add_special_tokens=add_special_tokens)
        ids = encoding.ids

        if truncation and max_length and len(ids) > max_length:
            ids = ids[:max_length]
            # Truncation may have cut off the EOS appended by the
            # post-processor; restore it so sequences stay well-terminated.
            # BUGFIX: this replacement now only happens after an actual
            # truncation (and on a non-empty list) — previously it ran
            # unconditionally and could clobber the last real token.
            if add_special_tokens and ids and ids[-1] != self.eos_token_id:
                ids[-1] = self.eos_token_id

        if padding and max_length and len(ids) < max_length:
            ids = ids + [self.pad_token_id] * (max_length - len(ids))

        return ids

    def decode(self, ids: List[int], skip_special_tokens: bool = True) -> str:
        """Decode token IDs to text.

        Args:
            ids: List of token IDs
            skip_special_tokens: Whether to remove special tokens

        Returns:
            Decoded text string
        """
        return self.tokenizer.decode(ids, skip_special_tokens=skip_special_tokens)

    def encode_conversation(
        self,
        user_message: str,
        assistant_message: Optional[str] = None,
        max_length: Optional[int] = None,
    ) -> List[int]:
        """Encode a conversation turn.

        Format: <|bos|><|user|>message<|assistant|>response<|eos|>

        When ``assistant_message`` is omitted the sequence ends with the
        assistant token, i.e. it is a generation prompt.

        Args:
            user_message: The user's message
            assistant_message: Optional assistant response
            max_length: Maximum sequence length

        Returns:
            List of token IDs
        """
        if assistant_message:
            text = f"{self.USER_TOKEN}{user_message}{self.ASSISTANT_TOKEN}{assistant_message}"
        else:
            text = f"{self.USER_TOKEN}{user_message}{self.ASSISTANT_TOKEN}"

        return self.encode(text, add_special_tokens=True, max_length=max_length, truncation=True)

    @property
    def vocab_size(self) -> int:
        """Get vocabulary size."""
        return self.tokenizer.get_vocab_size()

    def get_vocab(self) -> dict:
        """Get the vocabulary as a dictionary."""
        return self.tokenizer.get_vocab()

    def __len__(self) -> int:
        """Return vocabulary size."""
        return self.vocab_size

    def __call__(
        self,
        text: Union[str, List[str]],
        max_length: Optional[int] = None,
        padding: bool = False,
        truncation: bool = False,
        return_tensors: Optional[str] = None,
    ) -> dict:
        """Tokenize text (HuggingFace-style interface).

        Args:
            text: Input text or list of texts
            max_length: Maximum sequence length
            padding: Whether to pad sequences. With a max_length each
                sequence is padded to that length; without one the batch is
                padded to its longest sequence.
            truncation: Whether to truncate sequences
            return_tensors: If "pt", return PyTorch tensors

        Returns:
            Dictionary with input_ids and attention_mask
        """
        if isinstance(text, str):
            text = [text]

        all_ids = [
            self.encode(t, max_length=max_length, padding=padding, truncation=truncation)
            for t in text
        ]

        # BUGFIX: padding=True without max_length previously did nothing,
        # leaving ragged lists that made torch.tensor(...) below raise.
        # Pad to the longest sequence in the batch in that case.
        if padding and max_length is None and all_ids:
            longest = max(len(ids) for ids in all_ids)
            all_ids = [ids + [self.pad_token_id] * (longest - len(ids)) for ids in all_ids]

        attention_mask = [
            [0 if tok == self.pad_token_id else 1 for tok in ids] for ids in all_ids
        ]

        result = {
            "input_ids": all_ids,
            "attention_mask": attention_mask,
        }

        if return_tensors == "pt":
            # Imported lazily so torch remains an optional dependency.
            import torch

            result["input_ids"] = torch.tensor(all_ids)
            result["attention_mask"] = torch.tensor(attention_mask)

        return result
|
|
|