| |
| """ |
| Export SentencePiece model to HuggingFace tokenizer format. |
| Uses a custom tokenizer class that properly wraps SentencePiece. |
| """ |
| import argparse |
| import shutil |
| import json |
| from pathlib import Path |
| import sentencepiece as spm |
| from transformers import PreTrainedTokenizer |
|
|
def read_specials(path: str):
    """Read special tokens from *path* (one per line), dropping blank lines.

    An empty/falsy *path* yields an empty list.
    """
    if not path:
        return []
    raw_lines = Path(path).read_text(encoding="utf-8").splitlines()
    return [token for token in (line.strip() for line in raw_lines) if token]
|
|
def verify_spm_model(model_path: Path):
    """Load a SentencePiece model and smoke-test an encode/decode round trip.

    Returns a ``(vocab_size, processor)`` tuple. Any failure is logged and
    then re-raised to the caller.
    """
    try:
        processor = spm.SentencePieceProcessor()
        processor.Load(str(model_path))
        n_pieces = processor.GetPieceSize()
        print(f"[VERIFY] SentencePiece model loaded: {n_pieces} pieces")

        # Quick sanity check that encoding and decoding are inverses.
        sample = "Україна"
        token_ids = processor.EncodeAsIds(sample)
        roundtrip = processor.DecodeIds(token_ids)
        print(f"[VERIFY] Test encode/decode: '{sample}' -> {len(token_ids)} tokens -> '{roundtrip}'")
        return n_pieces, processor
    except Exception as e:
        print(f"[ERROR] Failed to load SentencePiece model: {e}")
        raise
|
|
class SentencePieceTokenizer(PreTrainedTokenizer):
    """HF slow tokenizer that delegates all tokenization to SentencePiece.

    The backing ``spiece.model`` file is the single source of truth for the
    base vocabulary; this class only adapts it to the ``PreTrainedTokenizer``
    API (tokenize, convert ids/tokens, save).
    """

    vocab_files_names = {"vocab_file": "spiece.model"}
    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        vocab_file,
        bos_token="<s>",
        eos_token="</s>",
        unk_token="<unk>",
        pad_token="<pad>",
        **kwargs,
    ):
        # Load the SentencePiece model *before* calling super().__init__():
        # the base constructor may query vocab_size / get_vocab, which need
        # sp_model to already exist.
        self.vocab_file = vocab_file
        self.sp_model = spm.SentencePieceProcessor()
        self.sp_model.Load(vocab_file)

        # Cache the size with object.__setattr__ so it is set even if a base
        # class installs a custom __setattr__ during construction.
        object.__setattr__(self, "_vocab_size", self.sp_model.GetPieceSize())

        super().__init__(
            bos_token=bos_token,
            eos_token=eos_token,
            unk_token=unk_token,
            pad_token=pad_token,
            **kwargs,
        )

    @property
    def vocab_size(self):
        """Size of the base SentencePiece vocabulary (excludes added tokens)."""
        # Defensive lookups: this property can be hit while __init__ is still
        # running, before the cached attribute (or sp_model) exists.
        if hasattr(self, "_vocab_size"):
            return self._vocab_size
        if hasattr(self, "sp_model"):
            return self.sp_model.GetPieceSize()
        return 0

    def get_vocab(self):
        """Return the full token -> id mapping, including added tokens."""
        size = self._vocab_size if hasattr(self, "_vocab_size") else self.sp_model.GetPieceSize()
        vocab = {self.sp_model.IdToPiece(i): i for i in range(size)}
        # HF convention: get_vocab() must also expose tokens registered after
        # construction. getattr() with a default guards against being called
        # before the base class has set up its added-token bookkeeping.
        vocab.update(getattr(self, "added_tokens_encoder", {}))
        return vocab

    def _tokenize(self, text):
        """Split *text* into SentencePiece pieces."""
        return self.sp_model.EncodeAsPieces(text)

    def _convert_token_to_id(self, token):
        """Map a piece to its id (SentencePiece returns unk id for unknowns)."""
        return self.sp_model.PieceToId(token)

    def _convert_id_to_token(self, index):
        """Map an id back to its piece string."""
        return self.sp_model.IdToPiece(index)

    def convert_tokens_to_string(self, tokens):
        """Detokenize a list of pieces back into plain text."""
        return self.sp_model.DecodePieces(tokens)

    def save_vocabulary(self, save_directory, filename_prefix=None):
        """Copy the backing SentencePiece model file into *save_directory*.

        Returns a 1-tuple with the path of the written file, per the HF
        ``save_vocabulary`` contract.
        """
        save_dir = Path(save_directory)
        save_dir.mkdir(parents=True, exist_ok=True)

        # BUGFIX: filename_prefix was previously ignored; save_pretrained()
        # passes it and expects "<prefix>-spiece.model" in that case.
        prefix = filename_prefix + "-" if filename_prefix else ""
        out_vocab_file = save_dir / (prefix + self.vocab_files_names["vocab_file"])

        # Avoid copying the file onto itself when saving into the directory
        # that already holds the source model.
        if Path(self.vocab_file).resolve() == out_vocab_file.resolve():
            return (str(out_vocab_file),)

        shutil.copy2(self.vocab_file, out_vocab_file)
        return (str(out_vocab_file),)
|
|
def main():
    """Export a SentencePiece model as a self-contained HF tokenizer directory.

    Steps: verify the .model file loads, copy it into --out_dir, wrap it in
    SentencePieceTokenizer, register special tokens, save with
    ``save_pretrained``, ship this script alongside as ``tokenizer.py`` and
    write a ``tokenizer_config.json`` with an ``auto_map`` so
    ``AutoTokenizer.from_pretrained(..., trust_remote_code=True)`` works.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--spm_model", required=True, help="Path to .model file")
    ap.add_argument("--specials", required=True, help="Special tokens file (one per line)")
    ap.add_argument("--out_dir", required=True, help="HF tokenizer output dir")
    args = ap.parse_args()

    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    spm_model_path = Path(args.spm_model)
    if not spm_model_path.exists():
        raise FileNotFoundError(f"SentencePiece model not found: {spm_model_path}")

    # Fail fast if the SentencePiece model itself is broken.
    print("🔍 Verifying SentencePiece model...")
    spm_vocab_size, _ = verify_spm_model(spm_model_path)

    specials = read_specials(args.specials)
    print(f"[INFO] Found {len(specials)} special tokens")

    # Copy the model into the output dir so the tokenizer directory is
    # self-contained and relocatable.
    output_model = out_dir / "spiece.model"
    print(f"[COPY] Copying {spm_model_path} -> {output_model}")
    shutil.copy2(spm_model_path, output_model)

    if not output_model.exists():
        raise FileNotFoundError(f"Failed to copy model file to {output_model}")
    file_size = output_model.stat().st_size
    print(f"[COPY] Model file size: {file_size:,} bytes")

    print("[INIT] Initializing SentencePieceTokenizer...")
    tok = SentencePieceTokenizer(
        vocab_file=str(output_model),
        bos_token="<s>",
        eos_token="</s>",
        unk_token="<unk>",
        pad_token="<pad>",
    )

    print(f"[INIT] Vocab size: {tok.vocab_size} (expected: {spm_vocab_size})")
    if tok.vocab_size != spm_vocab_size:
        raise RuntimeError(f"Vocab size mismatch: {tok.vocab_size} != {spm_vocab_size}")

    if specials:
        print(f"[ADD] Adding {len(specials)} special tokens...")
        tok.add_special_tokens({"additional_special_tokens": specials})
        print("[ADD] Special tokens registered")

    print(f"[SAVE] Saving tokenizer to {out_dir}...")
    tok.save_pretrained(str(out_dir))

    # Ship this script next to the tokenizer so auto_map / trust_remote_code
    # can import the SentencePieceTokenizer class at load time.
    script_path = Path(__file__).absolute()
    tokenizer_py = out_dir / "tokenizer.py"
    print(f"[COPY] Copying tokenizer class to {tokenizer_py}...")
    shutil.copy2(script_path, tokenizer_py)

    # Overwrite tokenizer_config.json with an auto_map pointing at the
    # shipped tokenizer.py (save_pretrained does not know about it).
    tokenizer_config_path = out_dir / "tokenizer_config.json"
    config = {
        "tokenizer_class": "SentencePieceTokenizer",
        "auto_map": {
            "AutoTokenizer": ["tokenizer.SentencePieceTokenizer", None]
        },
        "model_type": "llama",
        "vocab_size": tok.vocab_size,
        "bos_token": "<s>",
        "eos_token": "</s>",
        "unk_token": "<unk>",
        "pad_token": "<pad>",
    }
    if specials:
        config["additional_special_tokens"] = specials

    with open(tokenizer_config_path, "w") as f:
        json.dump(config, f, indent=2)

    print("[CONFIG] Updated tokenizer_config.json with auto_map")

    print("\n[OK] saved HF tokenizer to:", out_dir)
    print(f"[INFO] vocab_size: {tok.vocab_size}")
    print(f"[INFO] added_special_tokens: {len(specials)}")

    # Smoke-test the exported tokenizer end to end.
    print("\n[TEST] Testing tokenizer...")
    test_text = "Україна — це країна"
    tokens = tok.encode(test_text, add_special_tokens=False)
    decoded = tok.decode(tokens)
    print(f"[TEST] Encoded '{test_text}' -> {len(tokens)} tokens")
    print(f"[TEST] Decoded back: '{decoded}'")

    if specials:
        test_with_special = "<user>Привіт</user><assistant>Вітаю!</assistant>"
        tokens_special = tok.encode(test_with_special, add_special_tokens=False)
        print(f"[TEST] Special tokens test: {len(tokens_special)} tokens")

    if len(tokens) == 0:
        print("\n⚠️ WARNING: Tokenizer encoded 0 tokens!")
        print("   The tokenizer may not be working correctly.")
    else:
        print("\n✅ Tokenizer export successful!")
        print(f"\n💡 To use this tokenizer:")
        print(f"   # Method 1: Using AutoTokenizer (recommended)")
        print(f"   from transformers import AutoTokenizer")
        print(f"   tok = AutoTokenizer.from_pretrained('{out_dir}', trust_remote_code=True)")
        print(f"\n   # Method 2: Direct import")
        print(f"   import sys; sys.path.insert(0, '{out_dir}')")
        print(f"   from tokenizer import SentencePieceTokenizer")
        print(f"   tok = SentencePieceTokenizer.from_pretrained('{out_dir}')")


if __name__ == "__main__":
    main()
|
|