| """ |
| bpe_tokenizer.py |
| ================ |
| Byte Pair Encoding (BPE) algorithm implemented from scratch in pure Python. |
| |
| This module is part of the project: |
| "A bilingual PT+EN LLM with BPE tokenizer and training loop |
| implemented from scratch, with didactic and documented code" |
| |
| Author : André Costa |
| License : MIT |
| |
| ═════════════════════════════════════════════════════════════ |
| THEORETICAL BACKGROUND |
| ═════════════════════════════════════════════════════════════ |
| |
| What is tokenization? |
| --------------------- |
| Language models do not operate on raw characters or whole words — |
| they operate on *tokens*, intermediate text units. Tokenization is |
| the process of converting text into sequences of integers that the |
| model can process. |
| |
| Text → Tokens → Integer IDs → Embeddings → Model |
| |
| Why not use whole words? |
| ------------------------ |
| Word-level vocabularies have two serious problems: |
| |
| 1. Huge vocabulary: Portuguese and English together have hundreds |
| of thousands of words. Each would need its own embedding — |
| infeasible for small models. |
| |
| 2. Unknown words (OOV - Out of Vocabulary): any word not seen |
| during training produces an <UNK> token, losing semantic |
| information. |
| |
| Why not use individual characters? |
| ------------------------------------ |
| Character vocabularies solve OOV, but produce very long sequences. |
| The sentence "Hello world" becomes 11 tokens instead of 2. |
| Long sequences increase computational cost quadratically in the |
| Transformer attention mechanism (O(n²)). |
| |
| BPE as a compromise |
| --------------------- |
| Byte Pair Encoding (Gage, 1994; Sennrich et al., 2016) finds a |
| middle ground: it starts with characters and iteratively merges the |
| most frequent pairs, building a subword vocabulary. |
| |
| "learning" → ["learn", "ing"] |
| "learned" → ["learn", "ed"] |
| "learnable" → ["learn", "able"] |
| |
| The prefix "learn" is shared — the model learns morphology |
| naturally, without explicit supervision. |
| |
| References: |
| - Gage, P. (1994). A new algorithm for data compression. |
| C Users Journal, 12(2), 23-38. |
| - Sennrich, R., Haddow, B., & Birch, A. (2016). Neural machine |
| translation of rare words with subword units. ACL 2016. |
| - Radford, A. et al. (2019). Language models are unsupervised |
| multitask learners. (GPT-2 — popularized BPE in LLMs) |
| |
| ═════════════════════════════════════════════════════════════ |
| BPE ALGORITHM — OVERVIEW |
| ═════════════════════════════════════════════════════════════ |
| |
| Training (offline, done once on the corpus): |
| 1. Encode each byte of the corpus as an initial token (base vocab = 256) |
| 2. Count the frequency of all adjacent token pairs |
| 3. Select the most frequent pair (p_max) |
| 4. Create a new token = merge of p_max |
| 5. Replace all occurrences of p_max with the new token |
| 6. Repeat steps 2–5 until reaching the desired vocab_size |
| |
| Encoding (online, for each new text): |
| 1. Convert text to bytes |
| 2. Apply learned merges in the order they were learned |
| 3. Return the sequence of IDs |
| |
| Decoding: |
| 1. Convert IDs back to bytes using the vocabulary |
| 2. Decode the bytes as UTF-8 |
| """ |
|
|
| |
| |
| |
| |
| import os |
| import json |
| import regex |
| from collections import defaultdict |
| from typing import Optional |
|
|
|
|
| |
| |
| |
|
|
def get_pairs(ids: list[int]) -> dict[tuple[int, int], int]:
    """
    Tally how often each adjacent token pair occurs in a sequence.

    Every position i contributes the pair (ids[i], ids[i+1]); overlapping
    pairs are all counted. Sequences with fewer than two tokens yield an
    empty mapping.

    Example:
        ids = [1, 2, 3, 2, 1, 2]
        returns: {(1,2): 2, (2,3): 1, (3,2): 1, (2,1): 1}

    Complexity: O(n), where n = len(ids)

    Args:
        ids: Sequence of token IDs.

    Returns:
        A defaultdict(int) mapping each pair to its frequency, with keys in
        order of first occurrence.
    """
    tally: dict[tuple[int, int], int] = defaultdict(int)

    # Walk the sequence once, remembering the previous token so that each
    # step forms the pair (previous, current).
    tokens = iter(ids)
    left = next(tokens, None)
    for right in tokens:
        tally[(left, right)] += 1
        left = right

    return tally
|
|
|
|
def merge_sequence(ids: list[int], pair: tuple[int, int], new_id: int) -> list[int]:
    """
    Return a copy of `ids` in which every occurrence of `pair` is collapsed
    into the single token `new_id`.

    This is the "merge" step of BPE: one left-to-right pass over the
    sequence. The input list is not modified.

    Example:
        ids    = [1, 2, 3, 1, 2]
        pair   = (1, 2)
        new_id = 99
        returns: [99, 3, 99]

    Note: Matches are consumed greedily from the left and never overlap.
    (1, 2, 1, 2) with pair=(1, 2) becomes [99, 99]; (1, 1, 1) with
    pair=(1, 1) becomes [new_id, 1].

    Complexity: O(n), where n = len(ids)

    Args:
        ids:    Original sequence of IDs.
        pair:   Token pair (a, b) to merge.
        new_id: ID of the token that replaces each (a, b) occurrence.

    Returns:
        New list with the merges applied.
    """
    total = len(ids)
    merged: list[int] = []

    # When a match is found at position k, position k+1 has already been
    # consumed by the merge and must be skipped on the next iteration.
    consumed_by_previous = False
    for position, token in enumerate(ids):
        if consumed_by_previous:
            consumed_by_previous = False
            continue

        has_successor = position + 1 < total
        if has_successor and token == pair[0] and ids[position + 1] == pair[1]:
            merged.append(new_id)
            consumed_by_previous = True
        else:
            merged.append(token)

    return merged
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Pre-tokenization pattern: splits text into chunks before BPE so that merges
# never cross chunk boundaries. The alternatives are tried left to right; the
# adjacent raw literals below concatenate into a single pattern string.
# The \p{L} / \p{N} Unicode property classes are not supported by the standard
# `re` module, which is why the third-party `regex` module is used here.
GPT4_SPLIT_PATTERN = regex.compile(
    # Apostrophe suffixes 's 'd 'm 't 'll 've 're (case-insensitive).
    r"'(?i:[sdmt]|ll|ve|re)"
    # A run of letters, optionally preceded by one character that is not a
    # letter, digit, CR or LF (typically the leading space of a word).
    r"|[^\r\n\p{L}\p{N}]?\p{L}+"
    # Digits, at most three per chunk.
    r"|\p{N}{1,3}"
    # Optional space, then a run of punctuation/symbols, then any newlines.
    r"| ?[^\s\p{L}\p{N}]+[\r\n]*"
    # Whitespace that ends in one or more line breaks.
    r"|\s*[\r\n]+"
    # Whitespace not followed by a non-whitespace character.
    r"|\s+(?!\S)"
    # Any remaining whitespace.
    r"|\s+"
)
|
|
|
|
| |
| |
| |
|
|
class BPETokenizer:
    """
    Byte Pair Encoding (BPE) tokenizer implemented from scratch.

    This implementation operates directly on UTF-8 bytes, which guarantees:
        - Full coverage of any Unicode text (PT, EN, emojis, etc.)
        - Fixed base vocabulary of exactly 256 tokens (one per byte)
        - No <UNK> tokens — any text is encodable

    Public attributes:
        vocab_size (int): Target vocabulary size requested at construction.
                          The actual size is len(vocab), which is smaller
                          when training runs out of pairs to merge.
        merges (dict):    Table of learned merges. Maps
                          (id_a, id_b) → id_new. IDs are assigned in
                          learning order, so a lower id_new means the merge
                          was learned earlier.
        vocab (dict):     Full vocabulary. Maps id → bytes.

    Basic usage:
        >>> tokenizer = BPETokenizer(vocab_size=1000)
        >>> tokenizer.train(["Hello world. Olá mundo."])
        >>> ids = tokenizer.encode("Hello")
        >>> tokenizer.decode(ids)
        'Hello'
    """

    def __init__(self, vocab_size: int = 16384):
        """
        Initialize an untrained tokenizer.

        The base vocabulary always starts with the 256 possible bytes (0–255).
        The number of merges to be learned is vocab_size - 256.

        Args:
            vocab_size: Desired final vocabulary size.
                        Typical values: 4096, 8192, 16384, 32768.
                        Must be greater than 256.

        Raises:
            ValueError: If vocab_size <= 256.
        """
        if vocab_size <= 256:
            raise ValueError(
                "vocab_size must be greater than 256 (byte base vocabulary). "
                f"Received: {vocab_size}"
            )

        self.vocab_size: int = vocab_size

        # (id_a, id_b) -> id_new. Filled by train() or load(); an empty table
        # is what encode() and __repr__ treat as "not trained".
        self.merges: dict[tuple[int, int], int] = {}

        # id -> bytes. Starts with one single-byte token per byte value.
        self.vocab: dict[int, bytes] = self._base_vocab()

        # Compiled pre-tokenization pattern used by train() and encode().
        self._split_pattern = GPT4_SPLIT_PATTERN

    @staticmethod
    def _base_vocab() -> dict[int, bytes]:
        """Return a fresh base vocabulary: one single-byte token per byte value."""
        return {i: bytes([i]) for i in range(256)}

    def train(self, corpus: list[str], verbose: bool = False) -> None:
        """
        Train the BPE tokenizer on a text corpus.

        Any previously learned merges are discarded first, so calling
        train() again retrains from scratch instead of mixing the old merge
        table with new token IDs.

        Training executes up to `vocab_size - 256` merge iterations.
        In each iteration:
            1. Count all adjacent pairs in the tokenized corpus
            2. Select the most frequent pair (first seen wins on ties)
            3. Record the merge in self.merges
            4. Update self.vocab with the new token
            5. Apply the merge to every chunk of the corpus

        Training stops early when no adjacent pairs remain (every chunk has
        collapsed to a single token).

        Total complexity: O(N × M), where:
            N = total number of tokens in the corpus (decreases each merge)
            M = number of merges = vocab_size - 256

        Args:
            corpus:  List of strings forming the training corpus.
                     Example: ["Text in Portuguese.", "Text in English."]
            verbose: If True, prints progress after each merge.

        Example:
            >>> tok = BPETokenizer(vocab_size=300)
            >>> tok.train(["abracadabra " * 100], verbose=True)
            Merge     1/44 | pair: (b'a', b'b') → token 256 | freq: 200
            ...
        """
        num_merges = self.vocab_size - 256

        # Start from a clean slate. New IDs are always numbered from 256, so
        # leftovers from an earlier run would map old pairs onto IDs that now
        # stand for different byte sequences.
        self.merges = {}
        self.vocab = self._base_vocab()

        # Pre-tokenize: each regex chunk becomes its own list of byte values.
        # Pairs are only ever counted inside a chunk, so merges never span
        # chunk boundaries.
        ids_per_chunk: list[list[int]] = []
        for text in corpus:
            ids_per_chunk.extend(
                list(word.encode("utf-8"))
                for word in self._split_pattern.findall(text)
            )

        if verbose:
            total_tokens = sum(len(chunk) for chunk in ids_per_chunk)
            print("Pre-tokenization complete.")
            print(f" Chunks (words): {len(ids_per_chunk)}")
            print(f" Total initial tokens (bytes): {total_tokens}")
            print(f" Merges to perform: {num_merges}\n")

        for merge_idx in range(num_merges):
            # Step 1: global pair frequencies, accumulated chunk by chunk.
            pair_counts: dict[tuple[int, int], int] = defaultdict(int)
            for chunk_ids in ids_per_chunk:
                for pair, count in get_pairs(chunk_ids).items():
                    pair_counts[pair] += count

            # Nothing left to merge: every chunk is a single token.
            if not pair_counts:
                if verbose:
                    print(f"Warning: corpus exhausted after {merge_idx} merges.")
                break

            # Step 2: most frequent pair; max() keeps the first one seen on ties.
            best_pair = max(pair_counts, key=pair_counts.get)
            best_freq = pair_counts[best_pair]

            # Steps 3-4: allocate the next free ID and record the merge. The
            # new token's bytes are the concatenation of its two parts.
            new_id = 256 + merge_idx
            self.merges[best_pair] = new_id
            self.vocab[new_id] = self.vocab[best_pair[0]] + self.vocab[best_pair[1]]

            # Step 5: rewrite every chunk with the new token.
            ids_per_chunk = [
                merge_sequence(chunk, best_pair, new_id)
                for chunk in ids_per_chunk
            ]

            if verbose:
                token_str_a = self.vocab[best_pair[0]]
                token_str_b = self.vocab[best_pair[1]]
                print(
                    f"Merge {merge_idx + 1:>5}/{num_merges} | "
                    f"pair: ({token_str_a!r}, {token_str_b!r}) "
                    f"→ token {new_id} | "
                    f"freq: {best_freq}"
                )

        if verbose:
            total_after = sum(len(chunk) for chunk in ids_per_chunk)
            print("\nTraining complete.")
            print(f" Final vocabulary: {len(self.vocab)} tokens")
            print(f" Total tokens after merges: {total_after}")

    def encode(self, text: str) -> list[int]:
        """
        Convert a string into a sequence of token IDs.

        The encoding process follows these steps:
            1. Split text into chunks via pre-tokenization (regex)
            2. Convert each chunk to bytes → list of IDs (0–255)
            3. Repeatedly apply the earliest-learned merge present in the chunk
            4. Concatenate IDs from all chunks

        Applying merges in learning order is crucial: merges learned first
        have priority. This ensures consistency with training.

        Args:
            text: Text to encode. Can be any string encodable as UTF-8.

        Returns:
            List of integers representing the tokens.

        Raises:
            RuntimeError: If the tokenizer has not been trained (empty merges).

        Example:
            >>> tok.encode("Hello")
            [323, 195]  # IDs depend on training
        """
        if not self.merges:
            raise RuntimeError(
                "The tokenizer has not been trained. "
                "Call .train() before .encode()."
            )

        all_ids: list[int] = []

        for chunk in self._split_pattern.findall(text):
            chunk_ids = list(chunk.encode("utf-8"))

            # A chunk with fewer than two tokens has no pair left to merge.
            while len(chunk_ids) >= 2:
                pairs = get_pairs(chunk_ids)

                # Merge IDs grow with learning order, so the smallest merged
                # ID identifies the earliest-learned merge. Pairs that were
                # never learned rank as infinity.
                best_pair = min(
                    pairs,
                    key=lambda p: self.merges.get(p, float("inf")),
                )

                # If even the best candidate is unknown, no pair is mergeable.
                if best_pair not in self.merges:
                    break

                chunk_ids = merge_sequence(
                    chunk_ids, best_pair, self.merges[best_pair]
                )

            all_ids.extend(chunk_ids)

        return all_ids

    def decode(self, ids: list[int]) -> str:
        """
        Convert a sequence of IDs back to a string.

        Each ID is mapped to its byte sequence via self.vocab,
        and the bytes are concatenated and decoded as UTF-8.

        Note on UTF-8 errors:
            Individual tokens may correspond to incomplete characters
            (e.g., the first byte of a 2-byte UTF-8 character).
            Therefore, we concatenate ALL bytes before decoding,
            and use errors="replace" to handle invalid sequences
            that may arise from out-of-context IDs.

        Args:
            ids: Sequence of IDs to decode.

        Returns:
            Decoded string.

        Raises:
            KeyError: If an ID is not present in self.vocab.

        Example:
            >>> tok.decode([323, 195])
            'Hello'
        """
        raw_bytes = b"".join(self.vocab[i] for i in ids)
        return raw_bytes.decode("utf-8", errors="replace")

    def save(self, path: str) -> None:
        """
        Save the tokenizer to disk.

        Creates two files in directory `path`:
            tokenizer.json — metadata and merge table (human-readable)
            vocab.json     — full vocabulary, id → list of byte values

        Structure of tokenizer.json:
            {
                "vocab_size": int,
                "num_merges": int,
                "merges": [[id_a, id_b, id_new], ...]
            }

        Args:
            path: Directory path where files will be saved.
                  Created if it does not exist.
        """
        os.makedirs(path, exist_ok=True)

        # Tuple keys are not valid JSON object keys, so the merge table is
        # stored as a list of [id_a, id_b, id_new] triples in learning order.
        merges_list = [
            [int(a), int(b), int(new_id)]
            for (a, b), new_id in self.merges.items()
        ]

        tokenizer_data = {
            "vocab_size": self.vocab_size,
            "num_merges": len(self.merges),
            "merges": merges_list,
        }

        with open(os.path.join(path, "tokenizer.json"), "w", encoding="utf-8") as f:
            json.dump(tokenizer_data, f, indent=2, ensure_ascii=False)

        # bytes are not JSON-serializable: store each token as a list of ints
        # under its ID rendered as a string.
        vocab_data = {
            str(token_id): list(token_bytes)
            for token_id, token_bytes in self.vocab.items()
        }

        with open(os.path.join(path, "vocab.json"), "w", encoding="utf-8") as f:
            json.dump(vocab_data, f, indent=2, ensure_ascii=False)

        print(f"Tokenizer saved to '{path}/'")
        print(f" tokenizer.json → {len(self.merges)} merges")
        print(f" vocab.json → {len(self.vocab)} tokens")

    @classmethod
    def load(cls, path: str) -> "BPETokenizer":
        """
        Load a previously saved tokenizer.

        Class method (factory method): creates a new instance and fills
        it with data loaded from disk, without needing to re-train.

        Args:
            path: Directory where files were saved by .save().

        Returns:
            BPETokenizer instance populated with the saved merges and vocab.

        Raises:
            FileNotFoundError: If files do not exist at the given path.

        Example:
            >>> tok = BPETokenizer.load("./my_tokenizer")
            >>> tok.encode("Hello world")
        """
        tokenizer_path = os.path.join(path, "tokenizer.json")
        vocab_path = os.path.join(path, "vocab.json")

        with open(tokenizer_path, "r", encoding="utf-8") as f:
            tokenizer_data = json.load(f)

        with open(vocab_path, "r", encoding="utf-8") as f:
            vocab_data = json.load(f)

        tokenizer = cls(vocab_size=tokenizer_data["vocab_size"])

        # Rebuild the (id_a, id_b) -> id_new table from the saved triples.
        for a, b, new_id in tokenizer_data["merges"]:
            tokenizer.merges[(int(a), int(b))] = int(new_id)

        # Undo the JSON encoding used by save(): string keys -> int IDs,
        # lists of ints -> bytes.
        tokenizer.vocab = {
            int(token_id): bytes(token_bytes)
            for token_id, token_bytes in vocab_data.items()
        }

        print(f"Tokenizer loaded from '{path}/'")
        print(f" vocab_size : {tokenizer.vocab_size}")
        print(f" merges : {len(tokenizer.merges)}")

        return tokenizer

    def token_to_str(self, token_id: int) -> str:
        """
        Return the human-readable representation of a token by its ID.

        Useful for inspecting the vocabulary and understanding which
        subwords the tokenizer has learned.

        Args:
            token_id: ID of the token to inspect.

        Returns:
            The token's bytes decoded as UTF-8 when they form valid text,
            otherwise the repr() of the raw bytes. IDs missing from the
            vocabulary yield "<unknown>".
        """
        token_bytes = self.vocab.get(token_id, b"<unknown>")
        try:
            return token_bytes.decode("utf-8")
        except UnicodeDecodeError:
            return repr(token_bytes)

    def vocab_stats(self) -> None:
        """
        Print statistics about the trained vocabulary.

        Displays the 20 longest learned tokens (by byte length), which
        generally correspond to words or subwords that are very frequent
        in the corpus.
        """
        print(f"\n{'='*50}")
        print(" BPE Vocabulary Statistics")
        print(f"{'='*50}")
        print(f" vocab_size : {self.vocab_size}")
        print(" base tokens : 256 (bytes 0–255)")
        print(f" merges : {len(self.merges)}")
        print("\n 20 longest tokens (frequent subwords):")

        # Only learned tokens (ID >= 256) are interesting; the base bytes
        # all have length 1.
        sorted_vocab = sorted(
            [(tid, tb) for tid, tb in self.vocab.items() if tid >= 256],
            key=lambda x: len(x[1]),
            reverse=True,
        )

        for token_id, token_bytes in sorted_vocab[:20]:
            # Show valid text as a quoted string and anything else as a bytes
            # literal. Applying repr() only once avoids printing a quoted,
            # backslash-doubled repr of a repr for undecodable tokens.
            try:
                shown = repr(token_bytes.decode("utf-8"))
            except UnicodeDecodeError:
                shown = repr(token_bytes)
            print(f" [{token_id:>6}] {shown:<30} ({len(token_bytes)} bytes)")

        print(f"{'='*50}\n")

    def __repr__(self) -> str:
        """Return a short summary: target vocab size, merge count, and status."""
        status = "trained" if self.merges else "not trained"
        return (
            f"BPETokenizer("
            f"vocab_size={self.vocab_size}, "
            f"merges={len(self.merges)}, "
            f"status='{status}')"
        )
|
|
|
|
| |
| |
| |
|
|
# Command-line entry point.
#   python bpe_tokenizer.py --demo   quick illustration, nothing is saved
#   python bpe_tokenizer.py          train, save to ./tokenizer/ and validate
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="BPE Tokenizer — train and validate")
    parser.add_argument(
        "--demo",
        action="store_true",
        help="Run a quick demo with a small vocab (320 tokens). "
             "Does NOT produce a tokenizer suitable for training."
    )
    args = parser.parse_args()

    if args.demo:
        # ---- Demo mode: tiny vocabulary, verbose training, no files written.
        print("=" * 60)
        print(" BPETokenizer — Demo mode (vocab_size=320)")
        print(" NOTE: this tokenizer is for illustration only.")
        print(" Run without --demo to produce the real tokenizer.")
        print("=" * 60)

        # Two documents (one Portuguese, one English), repeated so that pair
        # frequencies are large enough to produce meaningful merges.
        corpus_demo = [
            # Portuguese
            "aprendizado de máquina é fascinante. "
            "redes neurais aprendem padrões complexos. "
            "o modelo aprende a linguagem naturalmente. "
            "aprender, aprendendo, aprendizado, aprendiz. ",
            # English
            "machine learning is fascinating. "
            "neural networks learn complex patterns. "
            "the model learns language naturally. "
            "learn, learning, learned, learner. ",
        ] * 50

        tokenizer = BPETokenizer(vocab_size=320)
        tokenizer.train(corpus_demo, verbose=True)
        tokenizer.vocab_stats()

        # Round-trip checks, including text not present in the corpus
        # (uppercase letters, "!"), which falls back to byte-level tokens.
        tests = [
            "aprendizado", "learning",
            "redes neurais", "neural networks",
            "Olá, mundo!", "Hello, world!",
        ]

        print("Encode/decode tests:")
        print("-" * 50)
        for text in tests:
            ids = tokenizer.encode(text)
            decoded = tokenizer.decode(ids)
            tokens = [tokenizer.token_to_str(i) for i in ids]
            print(f" Text : {repr(text)}")
            print(f" IDs : {ids}")
            print(f" Tokens : {tokens}")
            print(f" Decoded : {repr(decoded)}")
            print(f" OK : {text == decoded}")
            print()

        print("Demo complete. No files were saved.")
        print("Run 'python bpe_tokenizer.py' (without --demo) to train the real tokenizer.")

    else:
        # ---- Training mode: train, save to ./tokenizer/, then reload and
        # verify that the saved files reproduce the encoder.
        print("=" * 60)
        print(" BPETokenizer — Training (vocab_size=16384)")
        print(" Output: ./tokenizer/")
        print("=" * 60)

        # Built-in sample corpus. It is small, so training is expected to
        # stop early (no pairs left) long before 16384 tokens are reached.
        corpus_production = [
            # Portuguese
            "aprendizado de máquina é fascinante. "
            "redes neurais aprendem padrões complexos. "
            "o modelo aprende a linguagem naturalmente. "
            "aprender, aprendendo, aprendizado, aprendiz. "
            "o brasil é um país de dimensões continentais. "
            "a língua portuguesa é falada em vários países. "
            "ciência de dados e inteligência artificial. "
            "processamento de linguagem natural em português. ",
            # English
            "machine learning is fascinating. "
            "neural networks learn complex patterns. "
            "the model learns language naturally. "
            "learn, learning, learned, learner. "
            "artificial intelligence and data science. "
            "natural language processing and transformers. "
            "deep learning models require large datasets. "
            "the quick brown fox jumps over the lazy dog. ",
        ] * 500

        tokenizer = BPETokenizer(vocab_size=16384)
        tokenizer.train(corpus_production, verbose=True)
        tokenizer.vocab_stats()

        tokenizer.save("./tokenizer")

        # Reload from disk and confirm encode -> decode returns the input.
        print("\nValidating save/load round-trip...")
        tokenizer2 = BPETokenizer.load("./tokenizer")

        for text in ["machine learning", "aprendizado de máquina", "Olá mundo!"]:
            ids = tokenizer2.encode(text)
            decoded = tokenizer2.decode(ids)
            status = "OK" if decoded == text else "FAIL"
            print(f" [{status}] {repr(text)} → {ids[:5]}{'...' if len(ids) > 5 else ''} → {repr(decoded)}")

        print("\nTokenizer ready. You can now run:")
        print(" python data_pipeline.py --dry-run")
|
|