#!/usr/bin/env python3
"""
BPE Tokenization implementation for better language understanding
"""
import re
from collections import defaultdict, Counter
import pickle
import os
class BPETokenizer:
    """Byte Pair Encoding tokenizer for better language modeling.

    Starts from a byte-level vocabulary (IDs 0-255) and learns up to
    ``vocab_size - 256`` merges of the most frequent adjacent token pairs.
    """

    def __init__(self, vocab_size=5000):
        self.vocab_size = vocab_size
        self.merges = {}           # (int, int) -> int, in training (priority) order
        self.vocab = None          # int -> bytes; populated by train()/load()
        self.inverse_vocab = None  # bytes -> int; populated by train()/load()

    def get_stats(self, ids):
        """Return a mapping of each consecutive token pair in *ids* to its count."""
        counts = defaultdict(int)
        for pair in zip(ids, ids[1:]):
            counts[pair] += 1
        return counts

    def merge(self, ids, pair, idx):
        """Return a copy of *ids* with every occurrence of *pair* replaced by *idx*."""
        new_ids = []
        i = 0
        while i < len(ids):
            # Match only when both elements of the pair are present (guard the end).
            if i < len(ids) - 1 and ids[i] == pair[0] and ids[i + 1] == pair[1]:
                new_ids.append(idx)
                i += 2
            else:
                new_ids.append(ids[i])
                i += 1
        return new_ids

    def train(self, text, verbose=False):
        """Learn merges from *text* and build the id->bytes vocabulary.

        Stops early if no pair occurs more than zero times (text exhausted).
        """
        # Byte-level base tokenization: every UTF-8 byte is an initial token.
        tokens = list(text.encode('utf-8'))
        vocab = {idx: bytes([idx]) for idx in range(256)}
        num_merges = self.vocab_size - 256
        ids = list(tokens)
        for i in range(num_merges):
            stats = self.get_stats(ids)
            if not stats:
                break
            # Greedily merge the most frequent pair into a new token id.
            pair = max(stats, key=stats.get)
            idx = 256 + i
            ids = self.merge(ids, pair, idx)
            vocab[idx] = vocab[pair[0]] + vocab[pair[1]]
            self.merges[pair] = idx
            if verbose and i % 100 == 0:
                print(f"Merge {i+1}/{num_merges}: {pair} -> {idx}")
        self.vocab = vocab
        self.inverse_vocab = {v: k for k, v in vocab.items()}
        print(f"BPE Tokenizer trained with {len(self.vocab)} tokens")

    def encode(self, text):
        """Encode *text* into a list of token IDs by replaying learned merges.

        Merges are applied in training order; since a later merge can never
        create the operands of an earlier one, this matches priority order.
        """
        ids = list(text.encode('utf-8'))
        for pair, idx in self.merges.items():
            ids = self.merge(ids, pair, idx)
        return ids

    def decode(self, ids):
        """Decode token IDs back to text; unknown IDs become b'?'.

        errors='replace' guarantees decode() never raises on malformed UTF-8,
        so no exception handling is needed here (the old bare except was dead).
        """
        tokens = b"".join(
            self.vocab[idx] if idx in self.vocab else b"?" for idx in ids
        )
        return tokens.decode('utf-8', errors='replace')

    def save(self, path):
        """Pickle merges, vocab, and vocab_size to *path*."""
        with open(path, 'wb') as f:
            pickle.dump({
                'merges': self.merges,
                'vocab': self.vocab,
                'vocab_size': self.vocab_size
            }, f)

    def load(self, path):
        """Load tokenizer state from *path*.

        NOTE(review): pickle.load executes arbitrary code from the file —
        only load tokenizer files from trusted sources.
        """
        with open(path, 'rb') as f:
            data = pickle.load(f)
        self.merges = data['merges']
        self.vocab = data['vocab']
        self.vocab_size = data['vocab_size']
        self.inverse_vocab = {v: k for k, v in self.vocab.items()}
def create_optimized_tokenizer(text, vocab_size=8000):
    """Create and train a BPE tokenizer on *text*, then smoke-test it.

    Trains a BPETokenizer, round-trips a fixed sample sentence through
    encode/decode, prints a compression ratio (raw UTF-8 bytes per BPE
    token; >1 means BPE compresses), and returns the trained tokenizer.
    """
    # NOTE(review): the original prints contained mojibake (garbled emoji);
    # replaced with plain ASCII.
    print(f"Training BPE tokenizer with vocab_size={vocab_size}...")
    tokenizer = BPETokenizer(vocab_size=vocab_size)
    tokenizer.train(text, verbose=True)
    # Round-trip sanity check on a fixed sentence.
    test_text = "Hello, how are you doing today?"
    encoded = tokenizer.encode(test_text)
    decoded = tokenizer.decode(encoded)
    print(f"Test encoding: '{test_text}'")
    print(f"Encoded: {encoded[:20]}... ({len(encoded)} tokens)")
    print(f"Decoded: '{decoded}'")
    # Compression ratio = raw byte count / BPE token count.
    char_tokens = len(test_text.encode('utf-8'))
    bpe_tokens = len(encoded)
    compression = char_tokens / bpe_tokens
    print(f"Compression ratio: {compression:.2f}x")
    return tokenizer
if __name__ == "__main__":
    # Demo: train on repetitive sample text (highly compressible) and
    # persist the result. (A stray '|' after this block was removed —
    # it was extraction residue and a syntax error.)
    sample_text = "Hello world! This is a test of the BPE tokenizer. " * 100
    tokenizer = create_optimized_tokenizer(sample_text)
    tokenizer.save("bpe_tokenizer.pkl")