"""
BPE (Byte Pair Encoding) Tokenizer - Built from Scratch
Bahasa Indonesia (Indonesian) tokenizer for Hugging Face

Author: Jekardah AI Lab
"""

import json
import re
import os
from collections import Counter
from typing import List, Dict, Tuple
class BPETokenizer:
    """
    Byte Pair Encoding Tokenizer built from scratch.
    Learns subword units from raw text data without requiring any dictionary.
    """

    def __init__(self, vocab_size: int = 32000, do_lower_case: bool = True):
        self.vocab_size = vocab_size
        self.do_lower_case = do_lower_case
        self.vocab = {}
        self.inverse_vocab = {}
        self.merges = []
        self._merge_priority = {}
        # GPT-2-style pre-tokenization regex: apostrophe clitics ('nya, 'kan, ...)
        # are kept as single chunks, then words, digits, punctuation runs, and
        # whitespace; a word's leading space stays attached to the word.
        self.pattern = re.compile(
            r"""'nya|'kan|'lah|'kah|'pun| ?\w+| ?\d+| ?[^\s\w\d]+|\s+(?!\S)|\s+"""
        )

        # Reserved IDs for special tokens; all other tokens are assigned
        # IDs after these during training.
        self.special_tokens = {
            "<PAD>": 0,
            "<UNK>": 1,
            "<BOS>": 2,
            "<EOS>": 3,
        }

    def _get_pairs(self, word: List[str]) -> Counter:
        """Get frequency of adjacent pairs in a word."""
        pairs = Counter()
        for i in range(len(word) - 1):
            pairs[(word[i], word[i + 1])] += 1
        return pairs
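
    # Note: _get_pairs counts pairs within a single word; training itself uses
    # _get_corpus_pairs below, which additionally weights each pair by the
    # word's frequency in the corpus.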

    def _get_corpus_pairs(self, corpus: Dict[tuple, int]) -> Counter:
        """Get frequency of all adjacent pairs across the entire corpus."""
        pairs = Counter()
        for word, freq in corpus.items():
            for i in range(len(word) - 1):
                pairs[(word[i], word[i + 1])] += freq
        return pairs
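
    # Illustrative example (hypothetical values): for a corpus such as
    # {("m", "a", "k", "a", "n"): 3}, _get_corpus_pairs returns
    # Counter({("m", "a"): 3, ("a", "k"): 3, ("k", "a"): 3, ("a", "n"): 3}).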

    def _merge_pair(self, pair: Tuple[str, str], corpus: Dict[tuple, int]) -> Dict[tuple, int]:
        """Merge all occurrences of a pair in the corpus."""
        new_corpus = {}
        bigram = pair
        for word, freq in corpus.items():
            new_word = []
            i = 0
            while i < len(word):
                if i < len(word) - 1 and word[i] == bigram[0] and word[i + 1] == bigram[1]:
                    new_word.append(bigram[0] + bigram[1])
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_corpus[tuple(new_word)] = freq
        return new_corpus
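
    # Illustrative example (hypothetical values): merging the pair ("a", "n")
    # in {("m", "a", "k", "a", "n"): 3} yields {("m", "a", "k", "an"): 3};
    # every non-overlapping occurrence of the pair is fused left to right.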

    def _pre_tokenize(self, text: str) -> List[str]:
        """Split text into initial words/chunks."""
        return self.pattern.findall(text)
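
    # Illustrative example: _pre_tokenize("saya suka makan") returns
    # ["saya", " suka", " makan"] - the leading space stays attached to the
    # following word, mirroring GPT-2-style pre-tokenization.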

    def train(self, texts: List[str], min_frequency: int = 2, verbose: bool = True):
        """
        Train BPE tokenizer on a list of texts.

        Args:
            texts: List of training text strings
            min_frequency: Minimum pair frequency to consider for merging
            verbose: Print progress during training
        """
        if verbose:
            print("=" * 60)
            print("🚀 Training BPE Tokenizer")
            print(f" Target vocab size: {self.vocab_size}")
            print(f" Training texts: {len(texts)}")
            print("=" * 60)

        # Step 1: pre-tokenize the corpus and count word frequencies.
        if verbose:
            print("\n📝 Step 1: Pre-tokenizing text...")

        word_freqs = Counter()
        for text in texts:
            text_input = text.lower() if self.do_lower_case else text
            words = self._pre_tokenize(text_input)
            for word in words:
                word_freqs[word] += 1

        if verbose:
            print(f" Found {len(word_freqs)} unique words")

        # Step 2: represent each word as a sequence of characters.
        if verbose:
            print("\n🔤 Step 2: Initializing character-level tokens...")

        corpus = {}
        for word, freq in word_freqs.items():
            corpus[tuple(word)] = freq

        # Collect the base (character-level) vocabulary.
        char_vocab = set()
        for word in corpus.keys():
            for char in word:
                char_vocab.add(char)

        if verbose:
            print(f" Initial character vocab: {len(char_vocab)} characters")

        # Step 3: repeatedly merge the most frequent adjacent pair.
        if verbose:
            print(f"\n🔗 Step 3: Learning merges (target: {self.vocab_size} tokens)...")

        # Merge budget: final vocab = special tokens + base characters + merges.
        num_merges = self.vocab_size - len(char_vocab) - len(self.special_tokens)
        self.merges = []

        for i in range(num_merges):
            pairs = self._get_corpus_pairs(corpus)
            if not pairs:
                if verbose:
                    print(f" No more pairs to merge at step {i}")
                break

            best_pair = pairs.most_common(1)[0]
            if best_pair[1] < min_frequency:
                if verbose:
                    print(f" Stopping at step {i}: min frequency {min_frequency} reached")
                break

            pair = best_pair[0]
            self.merges.append(pair)
            corpus = self._merge_pair(pair, corpus)

            if verbose and (i + 1) % 500 == 0:
                merged_token = pair[0] + pair[1]
                print(f" Merge {i + 1}/{num_merges}: '{pair[0]}' + '{pair[1]}' → '{merged_token}' (freq: {best_pair[1]})")

        if verbose:
            print(f" Total merges learned: {len(self.merges)}")

        # Step 4: assemble the vocabulary - special tokens first, then base
        # characters, then merged tokens in the order they were learned.
        if verbose:
            print("\n📚 Step 4: Building final vocabulary...")

        self.vocab = dict(self.special_tokens)
        idx = len(self.special_tokens)

        for char in sorted(char_vocab):
            if char not in self.vocab:
                self.vocab[char] = idx
                idx += 1

        for pair in self.merges:
            merged = pair[0] + pair[1]
            if merged not in self.vocab:
                self.vocab[merged] = idx
                idx += 1

        self.inverse_vocab = {v: k for k, v in self.vocab.items()}
        self._merge_priority = {pair: i for i, pair in enumerate(self.merges)}

        if verbose:
            print(f" Final vocab size: {len(self.vocab)}")
            print("\n✅ Training complete!")
            print("=" * 60)

    def _apply_merges(self, tokens: List[str]) -> List[str]:
        """Apply learned merge rules to a list of tokens using greedy-by-priority."""
        while len(tokens) >= 2:
            # Find the adjacent pair with the lowest merge rank
            # (i.e. the earliest-learned merge present in the sequence).
            best_pair = None
            best_rank = float('inf')
            for i in range(len(tokens) - 1):
                pair = (tokens[i], tokens[i + 1])
                rank = self._merge_priority.get(pair, float('inf'))
                if rank < best_rank:
                    best_rank = rank
                    best_pair = pair
            if best_pair is None or best_rank == float('inf'):
                break
            # Merge every occurrence of that pair in a single pass.
            new_tokens = []
            i = 0
            while i < len(tokens):
                if i < len(tokens) - 1 and tokens[i] == best_pair[0] and tokens[i + 1] == best_pair[1]:
                    new_tokens.append(best_pair[0] + best_pair[1])
                    i += 2
                else:
                    new_tokens.append(tokens[i])
                    i += 1
            tokens = new_tokens
        return tokens
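
    # Illustrative example (hypothetical merge table): with merges learned in
    # the order [("a", "n"), ("k", "an")], the characters of "makan" collapse
    # as ["m", "a", "k", "a", "n"] -> ["m", "a", "k", "an"] -> ["m", "a", "kan"],
    # after which no known pair remains and the loop stops.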

    def encode(self, text: str) -> List[int]:
        """
        Encode text to token IDs.

        Args:
            text: Input text string

        Returns:
            List of token IDs
        """
        text_input = text.lower() if self.do_lower_case else text
        words = self._pre_tokenize(text_input)
        ids = []

        for word in words:
            chars = list(word)
            tokens = self._apply_merges(chars)
            for token in tokens:
                if token in self.vocab:
                    ids.append(self.vocab[token])
                else:
                    ids.append(self.special_tokens["<UNK>"])

        return ids

    def decode(self, ids: List[int]) -> str:
        """
        Decode token IDs back to text.

        Args:
            ids: List of token IDs

        Returns:
            Decoded text string
        """
        tokens = []
        for token_id in ids:
            if token_id in self.inverse_vocab:
                tokens.append(self.inverse_vocab[token_id])
            else:
                tokens.append("<UNK>")
        return "".join(tokens)
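
    # Note: plain concatenation is enough to reconstruct spacing because the
    # pre-tokenizer keeps each word's leading space inside the token itself
    # (e.g. " makan"), so no separate detokenization rule is needed.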

    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize text into subword tokens (string form).

        Args:
            text: Input text string

        Returns:
            List of token strings
        """
        text_input = text.lower() if self.do_lower_case else text
        words = self._pre_tokenize(text_input)
        all_tokens = []

        for word in words:
            chars = list(word)
            tokens = self._apply_merges(chars)
            all_tokens.extend(tokens)

        return all_tokens
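
    # Note: tokenize() produces the same segmentation as encode(); the only
    # difference is that it returns the subword strings rather than their IDs
    # (and therefore never substitutes <UNK>).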

    def save(self, directory: str):
        """Save tokenizer to directory (HuggingFace compatible format)."""
        os.makedirs(directory, exist_ok=True)

        # vocab.json: token -> id mapping.
        with open(os.path.join(directory, "vocab.json"), "w", encoding="utf-8") as f:
            json.dump(self.vocab, f, ensure_ascii=False, indent=2)

        # merges.txt: one learned merge per line; spaces inside tokens are
        # written as U+2581 ("▁") so the space-separated format stays parseable.
        with open(os.path.join(directory, "merges.txt"), "w", encoding="utf-8") as f:
            f.write("#version: 0.3\n")
            for pair in self.merges:
                a = pair[0].replace(' ', '\u2581')
                b = pair[1].replace(' ', '\u2581')
                f.write(f"{a} {b}\n")

        # tokenizer_config.json: settings needed to reload the tokenizer.
        config = {
            "tokenizer_class": "BPETokenizer",
            "vocab_size": len(self.vocab),
            "model_type": "bpe",
            "special_tokens": self.special_tokens,
            "do_lower_case": self.do_lower_case,
            "language": "id",
        }
        with open(os.path.join(directory, "tokenizer_config.json"), "w", encoding="utf-8") as f:
            json.dump(config, f, ensure_ascii=False, indent=2)

        # special_tokens_map.json: role -> token string.
        special_map = {
            "pad_token": "<PAD>",
            "unk_token": "<UNK>",
            "bos_token": "<BOS>",
            "eos_token": "<EOS>",
        }
        with open(os.path.join(directory, "special_tokens_map.json"), "w", encoding="utf-8") as f:
            json.dump(special_map, f, ensure_ascii=False, indent=2)

        # tokenizer.json: single-file description in the Hugging Face
        # `tokenizers` layout (model, pre-tokenizer, decoder, added tokens).
        hf_tokenizer = {
            "version": "1.0",
            "model": {
                "type": "BPE",
                "vocab": self.vocab,
                "merges": [
                    f"{p[0].replace(' ', chr(0x2581))} {p[1].replace(' ', chr(0x2581))}"
                    for p in self.merges
                ],
            },
            "pre_tokenizer": {
                "type": "Split",
                "pattern": {"Regex": self.pattern.pattern},
                "behavior": "Isolated",
            },
            "decoder": {
                "type": "Fuse",
            },
            "added_tokens": [
                {"id": v, "content": k, "special": True}
                for k, v in self.special_tokens.items()
            ],
        }
        if self.do_lower_case:
            hf_tokenizer["normalizer"] = {"type": "Lowercase"}
        with open(os.path.join(directory, "tokenizer.json"), "w", encoding="utf-8") as f:
            json.dump(hf_tokenizer, f, ensure_ascii=False, indent=2)

        print(f"💾 Tokenizer saved to: {directory}")

    @classmethod
    def from_pretrained(cls, directory: str) -> "BPETokenizer":
        """Load tokenizer from directory."""
        tokenizer = cls()

        # Load vocabulary.
        with open(os.path.join(directory, "vocab.json"), "r", encoding="utf-8") as f:
            tokenizer.vocab = json.load(f)

        tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}

        # Load merges (the "#version" header line is skipped).
        tokenizer.merges = []
        with open(os.path.join(directory, "merges.txt"), "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    if "\t" in line:
                        # Tab-separated variant with JSON-escaped parts.
                        parts = line.split("\t")
                        if len(parts) == 2:
                            a = json.loads(parts[0])
                            b = json.loads(parts[1])
                            tokenizer.merges.append((a, b))
                    else:
                        # Standard space-separated format written by save();
                        # U+2581 is converted back to a real space.
                        parts = line.split(" ", 1)
                        if len(parts) == 2:
                            a = parts[0].replace('\u2581', ' ')
                            b = parts[1].replace('\u2581', ' ')
                            tokenizer.merges.append((a, b))

        tokenizer._merge_priority = {pair: i for i, pair in enumerate(tokenizer.merges)}

        # Load configuration.
        with open(os.path.join(directory, "tokenizer_config.json"), "r", encoding="utf-8") as f:
            config = json.load(f)
            tokenizer.special_tokens = config.get("special_tokens", tokenizer.special_tokens)
            tokenizer.vocab_size = config.get("vocab_size", len(tokenizer.vocab))
            tokenizer.do_lower_case = config.get("do_lower_case", True)

        print(f"✅ Tokenizer loaded from: {directory}")
        return tokenizer
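
    # Note: the pre-tokenization regex itself is not read back from disk;
    # cls() recreates the default pattern from __init__, while vocab, merges,
    # and the config fields above are restored from the saved files.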


if __name__ == "__main__":
    # Quick demo: train on a tiny corpus, then tokenize, encode and decode.
    tokenizer = BPETokenizer(vocab_size=1000)

    sample_texts = [
        "Saya suka makan nasi goreng di Jakarta",
        "Indonesia adalah negara kepulauan terbesar di dunia",
    ]

    tokenizer.train(sample_texts, min_frequency=1)

    test = "saya makan nasi goreng"
    tokens = tokenizer.tokenize(test)
    ids = tokenizer.encode(test)
    decoded = tokenizer.decode(ids)

    print(f"\nInput: {test}")
    print(f"Tokens: {tokens}")
    print(f"IDs: {ids}")
    print(f"Decoded: {decoded}")
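
    # Optional extension of the demo (a minimal sketch, not part of the
    # original flow): persist the tokenizer with save() and reload it with
    # from_pretrained() to check that encoding survives a round trip.
    # The directory name "bpe_id_tokenizer" is an arbitrary choice.
    save_dir = "bpe_id_tokenizer"
    tokenizer.save(save_dir)
    reloaded = BPETokenizer.from_pretrained(save_dir)
    print(f"Reloaded decode: {reloaded.decode(reloaded.encode(test))}")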