| | |
| | """ |
| | BPE Tokenization implementation for better language understanding |
| | """ |
| |
|
| | import re |
| | from collections import defaultdict, Counter |
| | import pickle |
| | import os |
| |
|
class BPETokenizer:
    """Byte Pair Encoding tokenizer for better language modeling.

    Starts from the 256 raw byte values and learns `vocab_size - 256`
    merge rules from a training corpus. Token IDs 0-255 are single bytes;
    IDs >= 256 are learned merges.
    """

    def __init__(self, vocab_size=5000):
        # Target vocabulary size (256 base bytes + learned merges).
        self.vocab_size = vocab_size
        # Maps (id, id) pair -> merged token id, in training order.
        self.merges = {}
        # Maps token id -> bytes it represents; None until train()/load().
        self.vocab = None
        self.inverse_vocab = None

    def get_stats(self, ids):
        """Count occurrences of each consecutive pair of token IDs.

        Args:
            ids: sequence of token IDs.

        Returns:
            dict mapping (id_a, id_b) -> count of adjacent occurrences.
        """
        counts = defaultdict(int)
        for pair in zip(ids, ids[1:]):
            counts[pair] += 1
        return counts

    def merge(self, ids, pair, idx):
        """Replace every adjacent occurrence of `pair` in `ids` with `idx`.

        Non-overlapping, left-to-right: after a match, scanning resumes
        past both merged elements.
        """
        new_ids = []
        i = 0
        while i < len(ids):
            if i < len(ids) - 1 and ids[i] == pair[0] and ids[i + 1] == pair[1]:
                new_ids.append(idx)
                i += 2
            else:
                new_ids.append(ids[i])
                i += 1
        return new_ids

    def train(self, text, verbose=False):
        """Train the BPE merge table on `text`.

        Greedily merges the most frequent adjacent pair until the target
        vocab size is reached or no pairs remain.
        """
        # Reset state so repeated train() calls don't accumulate stale merges
        # (the original kept old entries, corrupting encode()).
        self.merges = {}

        ids = list(text.encode('utf-8'))

        # Base vocabulary: one token per raw byte value.
        vocab = {idx: bytes([idx]) for idx in range(256)}
        num_merges = self.vocab_size - 256

        for i in range(num_merges):
            stats = self.get_stats(ids)
            if not stats:
                break  # nothing left to merge (text collapsed to one token)

            # Most frequent adjacent pair becomes the next token.
            pair = max(stats, key=stats.get)
            idx = 256 + i

            ids = self.merge(ids, pair, idx)
            vocab[idx] = vocab[pair[0]] + vocab[pair[1]]
            self.merges[pair] = idx

            if verbose and i % 100 == 0:
                print(f"Merge {i+1}/{num_merges}: {pair} -> {idx}")

        self.vocab = vocab
        self.inverse_vocab = {v: k for k, v in vocab.items()}

        print(f"BPE Tokenizer trained with {len(self.vocab)} tokens")

    def encode(self, text):
        """Encode text to a list of token IDs using the learned merges.

        Repeatedly applies the earliest-learned merge that is still
        applicable, mirroring training order. (The original iterated the
        merge dict once per rule, which misses pairs created by later
        merges and can produce a different, suboptimal tokenization.)
        """
        ids = list(text.encode('utf-8'))
        while len(ids) >= 2:
            stats = self.get_stats(ids)
            # Pick the applicable pair with the lowest merge rank.
            pair = min(stats, key=lambda p: self.merges.get(p, float('inf')))
            if pair not in self.merges:
                break  # no learned merge applies anymore
            ids = self.merge(ids, pair, self.merges[pair])
        return ids

    def decode(self, ids):
        """Decode token IDs back to text.

        Unknown IDs become a literal '?' byte; invalid UTF-8 sequences are
        replaced (errors='replace'), so this never raises.
        """
        tokens = b""
        for idx in ids:
            if idx in self.vocab:
                tokens += self.vocab[idx]
            else:
                tokens += b"?"

        # errors='replace' guarantees decode() cannot raise, so the
        # original bare `except:` fallback was unreachable dead code.
        return tokens.decode('utf-8', errors='replace')

    def save(self, path):
        """Serialize merges/vocab to `path` with pickle."""
        with open(path, 'wb') as f:
            pickle.dump({
                'merges': self.merges,
                'vocab': self.vocab,
                'vocab_size': self.vocab_size
            }, f)

    def load(self, path):
        """Load a tokenizer previously written by save().

        SECURITY: pickle.load executes arbitrary code from the file —
        only load tokenizer files from trusted sources.
        """
        with open(path, 'rb') as f:
            data = pickle.load(f)
        self.merges = data['merges']
        self.vocab = data['vocab']
        self.vocab_size = data['vocab_size']
        self.inverse_vocab = {v: k for k, v in self.vocab.items()}
| |
|
def create_optimized_tokenizer(text, vocab_size=8000):
    """Create and train a BPE tokenizer, then run a quick sanity check.

    Args:
        text: training corpus.
        vocab_size: target vocabulary size passed to BPETokenizer.

    Returns:
        The trained BPETokenizer instance.
    """
    # NOTE: the original print prefixes were mojibake (corrupted emoji
    # bytes); replaced with readable ASCII labels.
    print(f"[train] Training BPE tokenizer with vocab_size={vocab_size}...")

    tokenizer = BPETokenizer(vocab_size=vocab_size)
    tokenizer.train(text, verbose=True)

    # Smoke-test the round trip on a short sentence.
    test_text = "Hello, how are you doing today?"
    encoded = tokenizer.encode(test_text)
    decoded = tokenizer.decode(encoded)

    print(f"[test] Test encoding: '{test_text}'")
    print(f"[test] Encoded: {encoded[:20]}... ({len(encoded)} tokens)")
    print(f"[test] Decoded: '{decoded}'")

    # Bytes-per-token compression relative to raw UTF-8.
    char_tokens = len(test_text.encode('utf-8'))
    bpe_tokens = len(encoded)
    # Guard against division by zero if encode ever returns no tokens.
    compression = char_tokens / bpe_tokens if bpe_tokens else 0.0
    print(f"[stats] Compression ratio: {compression:.2f}x")

    return tokenizer
| |
|
if __name__ == "__main__":
    # Demo entry point: train on a small repeated corpus and persist the result.
    demo_corpus = "Hello world! This is a test of the BPE tokenizer. " * 100
    bpe = create_optimized_tokenizer(demo_corpus)
    bpe.save("bpe_tokenizer.pkl")
| |
|