"""
BPE (Byte Pair Encoding) Tokenizer - Built from Scratch
Bahasa Indonesia (Indonesian) tokenizer for Hugging Face
Author: Jekardah AI Lab
"""
import json
import re
import os
from collections import Counter
from typing import Dict, List, Tuple
class BPETokenizer:
"""
Byte Pair Encoding Tokenizer built from scratch.
Learns subword units from raw text data without requiring any dictionary.
"""
def __init__(self, vocab_size: int = 32000, do_lower_case: bool = True):
self.vocab_size = vocab_size
self.do_lower_case = do_lower_case
self.vocab = {} # token -> id
self.inverse_vocab = {} # id -> token
self.merges = [] # list of (pair_a, pair_b) merge rules
self._merge_priority = {} # (pair) -> priority index for fast lookup
self.pattern = re.compile(
r"""'nya|'kan|'lah|'kah|'pun| ?\w+| ?\d+| ?[^\s\w\d]+|\s+(?!\S)|\s+"""
)
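        # The pattern is a GPT-2-style pre-tokenizer extended with common
        # Indonesian clitics ('nya, 'kan, 'lah, 'kah, 'pun). A sketch of the
        # expected splits (illustrative inputs, not from the training data):
        #   "iya'kan jadi"  ->  ["iya", "'kan", " jadi"]
        #   "makan nasi."   ->  ["makan", " nasi", "."]
        # Note the clitic alternatives only fire when the apostrophe starts a
        # match, i.e. directly after a word or punctuation boundary.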
# Special tokens
self.special_tokens = {
"<PAD>": 0,
"<UNK>": 1,
"<BOS>": 2,
"<EOS>": 3,
}
def _get_pairs(self, word: List[str]) -> Counter:
"""Get frequency of adjacent pairs in a word."""
pairs = Counter()
for i in range(len(word) - 1):
pairs[(word[i], word[i + 1])] += 1
return pairs
def _get_corpus_pairs(self, corpus: Dict[tuple, int]) -> Counter:
"""Get frequency of all adjacent pairs across the entire corpus."""
pairs = Counter()
for word, freq in corpus.items():
for i in range(len(word) - 1):
pairs[(word[i], word[i + 1])] += freq
return pairs
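    # Illustration for _get_corpus_pairs (assumed toy corpus, for exposition
    # only): {('m','a','u'): 2, ('a','u'): 1} yields ('m','a'): 2 and
    # ('a','u'): 2 + 1 = 3, so ('a','u') would be the next merge candidate.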
def _merge_pair(self, pair: Tuple[str, str], corpus: Dict[tuple, int]) -> Dict[tuple, int]:
"""Merge all occurrences of a pair in the corpus."""
new_corpus = {}
bigram = pair
for word, freq in corpus.items():
new_word = []
i = 0
while i < len(word):
if i < len(word) - 1 and word[i] == bigram[0] and word[i + 1] == bigram[1]:
new_word.append(bigram[0] + bigram[1])
i += 2
else:
new_word.append(word[i])
i += 1
new_corpus[tuple(new_word)] = freq
return new_corpus
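    # Illustration for _merge_pair (assumed toy values): merging ('a', 'u')
    # over {('m','a','u'): 2} yields {('m','au'): 2}; only occurrences that
    # are adjacent within a word are fused, and word frequencies are preserved.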
def _pre_tokenize(self, text: str) -> List[str]:
"""Split text into initial words/chunks."""
return self.pattern.findall(text)
def train(self, texts: List[str], min_frequency: int = 2, verbose: bool = True):
"""
Train BPE tokenizer on a list of texts.
Args:
texts: List of training text strings
min_frequency: Minimum pair frequency to consider for merging
verbose: Print progress during training
"""
if verbose:
print("=" * 60)
print("🚀 Training BPE Tokenizer")
print(f" Target vocab size: {self.vocab_size}")
print(f" Training texts: {len(texts)}")
print("=" * 60)
# Step 1: Pre-tokenize and build initial corpus
if verbose:
print("\n📝 Step 1: Pre-tokenizing text...")
word_freqs = Counter()
for text in texts:
text_input = text.lower() if self.do_lower_case else text
words = self._pre_tokenize(text_input)
for word in words:
word_freqs[word] += 1
if verbose:
print(f" Found {len(word_freqs)} unique words")
# Step 2: Initialize corpus as character-level splits
if verbose:
print("\n🔤 Step 2: Initializing character-level tokens...")
corpus = {}
for word, freq in word_freqs.items():
            corpus[tuple(word)] = freq
# Build initial character vocabulary
char_vocab = set()
for word in corpus.keys():
for char in word:
char_vocab.add(char)
if verbose:
print(f" Initial character vocab: {len(char_vocab)} characters")
# Step 3: Iteratively merge most frequent pairs
if verbose:
print(f"\n🔗 Step 3: Learning merges (target: {self.vocab_size} tokens)...")
num_merges = self.vocab_size - len(char_vocab) - len(self.special_tokens)
self.merges = []
for i in range(num_merges):
pairs = self._get_corpus_pairs(corpus)
if not pairs:
if verbose:
print(f" No more pairs to merge at step {i}")
break
best_pair = pairs.most_common(1)[0]
if best_pair[1] < min_frequency:
if verbose:
print(f" Stopping at step {i}: min frequency {min_frequency} reached")
break
pair = best_pair[0]
self.merges.append(pair)
corpus = self._merge_pair(pair, corpus)
if verbose and (i + 1) % 500 == 0:
merged_token = pair[0] + pair[1]
print(f" Merge {i + 1}/{num_merges}: '{pair[0]}' + '{pair[1]}' → '{merged_token}' (freq: {best_pair[1]})")
if verbose:
print(f" Total merges learned: {len(self.merges)}")
# Step 4: Build final vocabulary
if verbose:
print("\n📚 Step 4: Building final vocabulary...")
self.vocab = dict(self.special_tokens)
idx = len(self.special_tokens)
# Add individual characters
for char in sorted(char_vocab):
if char not in self.vocab:
self.vocab[char] = idx
idx += 1
# Add merged tokens
for pair in self.merges:
merged = pair[0] + pair[1]
if merged not in self.vocab:
self.vocab[merged] = idx
idx += 1
self.inverse_vocab = {v: k for k, v in self.vocab.items()}
self._merge_priority = {pair: i for i, pair in enumerate(self.merges)}
if verbose:
print(f" Final vocab size: {len(self.vocab)}")
print("\n✅ Training complete!")
print("=" * 60)
def _apply_merges(self, tokens: List[str]) -> List[str]:
"""Apply learned merge rules to a list of tokens using greedy-by-priority."""
while len(tokens) >= 2:
# Find the adjacent pair with the highest priority (lowest index)
best_pair = None
best_rank = float('inf')
for i in range(len(tokens) - 1):
pair = (tokens[i], tokens[i + 1])
rank = self._merge_priority.get(pair, float('inf'))
if rank < best_rank:
best_rank = rank
best_pair = pair
            if best_pair is None:  # no adjacent pair matches a learned merge
                break
# Merge all occurrences of best_pair
new_tokens = []
i = 0
while i < len(tokens):
if i < len(tokens) - 1 and tokens[i] == best_pair[0] and tokens[i + 1] == best_pair[1]:
new_tokens.append(best_pair[0] + best_pair[1])
i += 2
else:
new_tokens.append(tokens[i])
i += 1
tokens = new_tokens
return tokens
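    # Illustration for _apply_merges (assumed merge table, for exposition
    # only): with merges = [('a','n'), ('m','a')], i.e. priorities
    # {('a','n'): 0, ('m','a'): 1}, the tokens ['m','a','k','a','n'] become
    # ['m','a','k','an'] (rank 0 applied first) and then ['ma','k','an']
    # (rank 1), mirroring the order in which the merges were learned.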
def encode(self, text: str) -> List[int]:
"""
Encode text to token IDs.
Args:
text: Input text string
Returns:
List of token IDs
"""
text_input = text.lower() if self.do_lower_case else text
words = self._pre_tokenize(text_input)
ids = []
for word in words:
chars = list(word)
tokens = self._apply_merges(chars)
for token in tokens:
if token in self.vocab:
ids.append(self.vocab[token])
else:
ids.append(self.special_tokens["<UNK>"])
return ids
def decode(self, ids: List[int]) -> str:
"""
Decode token IDs back to text.
Args:
ids: List of token IDs
Returns:
Decoded text string
"""
tokens = []
for token_id in ids:
if token_id in self.inverse_vocab:
tokens.append(self.inverse_vocab[token_id])
else:
tokens.append("<UNK>")
return "".join(tokens)
def tokenize(self, text: str) -> List[str]:
"""
Tokenize text into subword tokens (string form).
Args:
text: Input text string
Returns:
List of token strings
"""
text_input = text.lower() if self.do_lower_case else text
words = self._pre_tokenize(text_input)
all_tokens = []
for word in words:
chars = list(word)
tokens = self._apply_merges(chars)
all_tokens.extend(tokens)
return all_tokens
def save(self, directory: str):
"""Save tokenizer to directory (HuggingFace compatible format)."""
os.makedirs(directory, exist_ok=True)
# 1. vocab.json
with open(os.path.join(directory, "vocab.json"), "w", encoding="utf-8") as f:
json.dump(self.vocab, f, ensure_ascii=False, indent=2)
# 2. merges.txt (space-separated with U+2581 for literal spaces)
with open(os.path.join(directory, "merges.txt"), "w", encoding="utf-8") as f:
f.write("#version: 0.3\n")
for pair in self.merges:
a = pair[0].replace(' ', '\u2581')
b = pair[1].replace(' ', '\u2581')
f.write(f"{a} {b}\n")
# 3. tokenizer_config.json
config = {
"tokenizer_class": "BPETokenizer",
"vocab_size": len(self.vocab),
"model_type": "bpe",
"special_tokens": self.special_tokens,
"do_lower_case": self.do_lower_case,
"language": "id",
}
with open(os.path.join(directory, "tokenizer_config.json"), "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# 4. special_tokens_map.json
special_map = {
"pad_token": "<PAD>",
"unk_token": "<UNK>",
"bos_token": "<BOS>",
"eos_token": "<EOS>",
}
with open(os.path.join(directory, "special_tokens_map.json"), "w", encoding="utf-8") as f:
json.dump(special_map, f, ensure_ascii=False, indent=2)
        # 5. tokenizer.json (HuggingFace-style format)
        # Caveat: the merges below escape literal spaces as U+2581 while the
        # vocab keys keep raw spaces, so this file may need extra normalizer/
        # decoder wiring before the `tokenizers` library accepts it; the
        # canonical load path is vocab.json + merges.txt via from_pretrained().
hf_tokenizer = {
"version": "1.0",
"model": {
"type": "BPE",
"vocab": self.vocab,
"merges": [
f"{p[0].replace(' ', chr(0x2581))} {p[1].replace(' ', chr(0x2581))}"
for p in self.merges
],
},
"pre_tokenizer": {
"type": "Split",
"pattern": {"Regex": self.pattern.pattern},
"behavior": "Isolated",
},
"decoder": {
"type": "Fuse",
},
"added_tokens": [
{"id": v, "content": k, "special": True}
for k, v in self.special_tokens.items()
],
}
if self.do_lower_case:
hf_tokenizer["normalizer"] = {"type": "Lowercase"}
with open(os.path.join(directory, "tokenizer.json"), "w", encoding="utf-8") as f:
json.dump(hf_tokenizer, f, ensure_ascii=False, indent=2)
print(f"💾 Tokenizer saved to: {directory}")
@classmethod
def from_pretrained(cls, directory: str) -> "BPETokenizer":
"""Load tokenizer from directory."""
tokenizer = cls()
# Load vocab
with open(os.path.join(directory, "vocab.json"), "r", encoding="utf-8") as f:
tokenizer.vocab = json.load(f)
tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}
# Load merges (supports both old JSON+tab and new space-separated formats)
tokenizer.merges = []
with open(os.path.join(directory, "merges.txt"), "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
if "\t" in line:
# Old JSON+tab format (backward compat)
parts = line.split("\t")
if len(parts) == 2:
a = json.loads(parts[0])
b = json.loads(parts[1])
tokenizer.merges.append((a, b))
else:
# New space-separated format with U+2581 escape
parts = line.split(" ", 1)
if len(parts) == 2:
a = parts[0].replace('\u2581', ' ')
b = parts[1].replace('\u2581', ' ')
tokenizer.merges.append((a, b))
tokenizer._merge_priority = {pair: i for i, pair in enumerate(tokenizer.merges)}
# Load config
with open(os.path.join(directory, "tokenizer_config.json"), "r", encoding="utf-8") as f:
config = json.load(f)
tokenizer.special_tokens = config.get("special_tokens", tokenizer.special_tokens)
tokenizer.vocab_size = config.get("vocab_size", len(tokenizer.vocab))
tokenizer.do_lower_case = config.get("do_lower_case", True)
print(f"✅ Tokenizer loaded from: {directory}")
return tokenizer
if __name__ == "__main__":
# Quick test
tokenizer = BPETokenizer(vocab_size=1000)
sample_texts = [
"Saya suka makan nasi goreng di Jakarta",
"Indonesia adalah negara kepulauan terbesar di dunia",
]
tokenizer.train(sample_texts, min_frequency=1)
test = "saya makan nasi goreng"
tokens = tokenizer.tokenize(test)
ids = tokenizer.encode(test)
decoded = tokenizer.decode(ids)
print(f"\nInput: {test}")
print(f"Tokens: {tokens}")
print(f"IDs: {ids}")
print(f"Decoded: {decoded}")