| """ |
| Entropy-Guided Vocabulary Construction Module. |
| |
| Implements Algorithm 3.1 from the XERV Crayon Engineering Treatise: |
| - Extract substring candidates up to SIMD limit (16 bytes) |
| - Calculate information gain with entropy reduction |
| - Select top-K candidates maximizing gain-to-cost ratio |
| |
| This is the production-grade implementation for building optimal vocabularies. |
| """ |
|
|
import hashlib
import math
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Set, Tuple
|
|
| |
# Upper bound on a token's UTF-8 encoded length, in bytes. It is the default
# ``max_token_length`` of EntropyVocabBuilder, which stops extending a
# candidate substring once its encoded length exceeds this value. The module
# docstring describes 16 bytes as the SIMD limit.
MAX_TOKEN_LENGTH = 16
|
|
|
|
@dataclass
class TokenCandidate:
    """
    A substring candidate together with the scores used to rank it.

    Instances are produced by ``EntropyVocabBuilder._score_candidates``.

    Attributes:
        token: The candidate substring itself.
        frequency: Number of times the substring was counted in the corpus.
        entropy: Self-information of the candidate in bits,
            ``-log2(frequency / total_chars)``.
        information_gain: ``entropy * frequency``.
        computational_cost: Cost estimate that grows linearly with the
            token's UTF-8 byte length.
        utility_score: Final ranking score; higher is better.
    """

    token: str
    frequency: int
    entropy: float
    information_gain: float
    computational_cost: float
    utility_score: float
|
|
|
|
class EntropyVocabBuilder:
    """
    Entropy-guided vocabulary builder.

    Implements the optimization from Section 2.1 [cite: 129-135]:
    - Entropy-bound sizing: V_optimal ≈ 2^(H(corpus) + ε)
    - Candidate scoring from frequency, self-information and byte length

    Attributes:
        target_size: Caller-requested upper bound on the vocabulary size.
        max_token_length: Maximum UTF-8 byte length of a candidate token.
        min_frequency: Candidates counted fewer times than this are dropped.
        special_tokens: Tokens placed at the very start of the vocabulary.
        corpus_entropy: Character-level entropy (bits/char) of the most
            recently processed corpus; 0.0 before the first build.
        optimal_vocab_size: Entropy-derived size bound of the most recent
            build; 0 before the first build.
    """

    def __init__(
        self,
        target_size: int = 500000,
        max_token_length: int = MAX_TOKEN_LENGTH,
        min_frequency: int = 2,
        special_tokens: Optional[List[str]] = None
    ):
        """
        Store the build parameters.

        Args:
            target_size: Upper bound on the vocabulary size.
            max_token_length: Maximum UTF-8 byte length of a token.
            min_frequency: Minimum count a candidate needs to be scored.
            special_tokens: Tokens to place first in the vocabulary. ``None``
                or an empty list selects ``<PAD>``, ``<UNK>``, ``<BOS>``,
                ``<EOS>``.
        """
        self.target_size = target_size
        self.max_token_length = max_token_length
        self.min_frequency = min_frequency
        self.special_tokens = special_tokens or ["<PAD>", "<UNK>", "<BOS>", "<EOS>"]

        # Filled in by construct_optimal_vocabulary().
        self.corpus_entropy: float = 0.0
        self.optimal_vocab_size: int = 0

    def construct_optimal_vocabulary(
        self,
        corpus: str,
        progress_callback: Optional[Callable[[str], None]] = None
    ) -> List[str]:
        """
        Implements Algorithm 3.1: Entropy-Guided Candidate Selection [cite: 126-135].

        The returned list starts with the special tokens, followed by every
        printable character with code point 0-255, followed by the
        highest-utility candidates that are not already present.

        As a side effect, ``corpus_entropy`` and ``optimal_vocab_size`` are
        updated for later retrieval through ``get_statistics``.

        Args:
            corpus: Training text corpus.
            progress_callback: Optional callable that receives human-readable
                progress messages.

        Returns:
            Ordered list of vocabulary tokens.
        """
        def report(message: str) -> None:
            if progress_callback:
                progress_callback(message)

        report("Extracting candidates...")
        candidates = self._extract_candidates(corpus)
        report(f"Extracted {len(candidates):,} unique candidates")

        self.corpus_entropy = self._calculate_corpus_entropy(corpus)
        self.optimal_vocab_size = self._calculate_optimal_size(self.corpus_entropy)
        report(f"Corpus entropy: {self.corpus_entropy:.4f} bits/char")
        report(f"Optimal vocab size: {self.optimal_vocab_size:,}")

        scored = self._score_candidates(candidates, len(corpus))
        report(f"Scored {len(scored):,} candidates")

        effective_size = min(self.target_size, self.optimal_vocab_size)

        # Slots set aside for the special tokens plus the 256 single-character
        # code points, whether or not all of them end up in the vocabulary.
        reserved = len(self.special_tokens) + 256

        # Clamp at zero: a negative value used as a slice bound would count
        # from the END of the list, so a smaller budget would select MORE
        # candidates instead of none.
        # NOTE(review): with character-level entropy the bound
        # 2^(H + epsilon) is usually far below ``reserved``, so this budget
        # is frequently zero. The sizing formula should be revisited.
        available = max(0, effective_size - reserved)

        # list.sort is stable, so equal scores keep their extraction order.
        scored.sort(key=lambda candidate: candidate.utility_score, reverse=True)

        vocab_tokens = list(self.special_tokens)
        seen: Set[str] = set(vocab_tokens)

        # Base alphabet: printable characters with code points 0-255.
        for code_point in range(256):
            char = chr(code_point)
            if char.isprintable() and char not in seen:
                vocab_tokens.append(char)
                seen.add(char)

        # Top-scoring candidates; ones already present still use up a slot.
        for candidate in scored[:available]:
            if candidate.token not in seen:
                vocab_tokens.append(candidate.token)
                seen.add(candidate.token)

        report(f"Final vocabulary: {len(vocab_tokens):,} tokens")

        return vocab_tokens

    def _extract_candidates(self, corpus: str) -> Dict[str, int]:
        """
        Sliding window extraction of all valid substrings [cite: 128].

        A substring is valid when it has at most ``max_token_length``
        characters and its UTF-8 encoding is at most ``max_token_length``
        bytes.

        Args:
            corpus: Text to scan.

        Returns:
            Mapping from each valid substring to its number of occurrences
            (overlapping occurrences are counted).
        """
        candidates: Dict[str, int] = defaultdict(int)
        corpus_len = len(corpus)
        limit = self.max_token_length

        # UTF-8 encodes each code point independently, so a substring's byte
        # length is the sum of its characters' byte lengths. Computing the
        # widths once avoids re-encoding every substring.
        char_widths = [len(char.encode('utf-8')) for char in corpus]

        for start in range(corpus_len):
            byte_len = 0
            for end in range(start, min(start + limit, corpus_len)):
                byte_len += char_widths[end]
                # Longer substrings from this start can only be larger.
                if byte_len > limit:
                    break
                candidates[corpus[start:end + 1]] += 1

        return candidates

    def _calculate_corpus_entropy(self, corpus: str) -> float:
        """
        Calculate Shannon entropy of the corpus [cite: 93-96].

        H(X) = -Σ p(x) log2(p(x)), where x ranges over the distinct
        characters of the corpus.

        Args:
            corpus: Text to analyse.

        Returns:
            Entropy in bits per character; 0.0 for an empty corpus.
        """
        total = len(corpus)
        if total == 0:
            return 0.0

        char_counts: Dict[str, int] = defaultdict(int)
        for char in corpus:
            char_counts[char] += 1

        entropy = 0.0
        for count in char_counts.values():
            # Every stored count is at least 1, so p is strictly positive.
            p = count / total
            entropy -= p * math.log2(p)

        return entropy

    def _calculate_optimal_size(self, entropy: float, epsilon: float = 0.5) -> int:
        """
        Calculate optimal vocabulary size from entropy [cite: 94].

        V_optimal ≈ 2^(H(corpus) + ε)

        With entropy expressed in bits per character this is a small number:
        H = 4.0 and ε = 0.5 give ``int(2 ** 4.5) == 22``.

        Args:
            entropy: Corpus entropy in bits per character.
            epsilon: Additive slack in the exponent.

        Returns:
            The size bound, truncated to an integer.
        """
        return int(2 ** (entropy + epsilon))

    def _score_candidates(
        self,
        candidates: Dict[str, int],
        total_chars: int
    ) -> List[TokenCandidate]:
        """
        Score every candidate that passes the filters [cite: 129-134].

        For a token with count ``freq`` and UTF-8 length ``byte_length``:

            entropy     = -log2(freq / total_chars)
            info_gain   = entropy * freq
            cost        = byte_length * 0.1 + 1.0
            compression = byte_length * freq
            utility     = 0.4 * info_gain + 0.3 * compression
                          + 0.3 * freq / cost

        Candidates below ``min_frequency`` and single characters that are
        not alphanumeric are skipped.

        Args:
            candidates: Mapping from substring to occurrence count.
            total_chars: Number of characters in the corpus.

        Returns:
            Scored candidates in the iteration order of ``candidates``;
            empty when ``total_chars`` is not positive.
        """
        scored: List[TokenCandidate] = []

        # Without a positive corpus length no probability can be formed.
        if total_chars <= 0:
            return scored

        for token, freq in candidates.items():
            if freq < self.min_frequency:
                continue

            # Single punctuation/whitespace characters are not scored.
            if len(token) == 1 and not token.isalnum():
                continue

            p_token = freq / total_chars
            if p_token <= 0:
                # Reachable only when min_frequency is zero or negative.
                continue
            entropy = -math.log2(p_token)

            byte_length = len(token.encode('utf-8'))
            comp_cost = byte_length * 0.1 + 1.0

            info_gain = entropy * freq
            compression = byte_length * freq

            utility = (
                (info_gain * 0.4) +
                (compression * 0.3) +
                ((1.0 / comp_cost) * 0.3 * freq)
            )

            scored.append(TokenCandidate(
                token=token,
                frequency=freq,
                entropy=entropy,
                information_gain=info_gain,
                computational_cost=comp_cost,
                utility_score=utility
            ))

        return scored

    def get_statistics(self) -> Dict:
        """
        Return vocabulary construction statistics.

        Returns:
            Dict with the configured parameters plus the entropy and size
            bound recorded by the most recent build.
        """
        return {
            "corpus_entropy": self.corpus_entropy,
            "optimal_vocab_size": self.optimal_vocab_size,
            "target_size": self.target_size,
            "max_token_length": self.max_token_length,
            "min_frequency": self.min_frequency
        }
|
|
|
|
def construct_optimal_vocabulary(
    corpus: str,
    target_size: int = 500000,
    min_frequency: int = 2
) -> List[str]:
    """
    Build a vocabulary for ``corpus`` in a single call.

    Creates an ``EntropyVocabBuilder`` with the given size and frequency
    settings (all other settings keep their defaults) and returns the
    result of its ``construct_optimal_vocabulary`` method.

    Args:
        corpus: Training text corpus.
        target_size: Upper bound on the vocabulary size.
        min_frequency: Minimum count a candidate needs to be considered.

    Returns:
        Ordered list of vocabulary tokens.
    """
    return EntropyVocabBuilder(
        target_size=target_size,
        min_frequency=min_frequency
    ).construct_optimal_vocabulary(corpus)
|
|
|
|
def deterministic_sort_key(token: str, frequency: int) -> tuple:
    """
    Build the 4-part sort key used for token ordering [cite: 1040-1049].

    Sorting ascending by this key orders tokens by:
    1. Descending frequency (the count is negated).
    2. Ascending UTF-8 byte length.
    3. Ascending token string.
    4. Ascending MD5 hex digest of the UTF-8 bytes, as a final tie-breaker.

    Args:
        token: The token text.
        frequency: How often the token occurs.

    Returns:
        ``(-frequency, byte_length, token, md5_hex)``.
    """
    encoded = token.encode('utf-8')
    digest = hashlib.md5(encoded).hexdigest()
    return (-frequency, len(encoded), token, digest)
|
|
|
|
def assign_stable_ids(
    tokens: List[str],
    frequencies: Optional[Dict[str, int]] = None
) -> Dict[str, int]:
    """
    Assign deterministic IDs to tokens [cite: 1009-1051].

    ID layout produced by this function:
    - 0-3: ``<PAD>``, ``<UNK>``, ``<BOS>``, ``<EOS>``; always assigned, even
      when they are absent from ``tokens``.
    - 100 onward: the distinct single-character tokens with code point
      below 256, numbered consecutively in code-point order (the ID is NOT
      ``100 + code point``).
    - 356 onward (or later if the previous range ran past 355): all
      remaining tokens, ordered by ``deterministic_sort_key``.

    NOTE(review): the cited layout also describes a 356-9999 "common words"
    range and a 10000+ "subwords" range. This function does not enforce that
    split; remaining tokens are simply numbered consecutively.

    Args:
        tokens: Vocabulary tokens; duplicates are tolerated and receive a
            single ID.
        frequencies: Optional token counts used for ordering. When omitted,
            every token is treated as having frequency 1. Tokens missing
            from the mapping are treated as frequency 0.

    Returns:
        Mapping from token to its integer ID.
    """
    if frequencies is None:
        frequencies = {t: 1 for t in tokens}

    specials = ["<PAD>", "<UNK>", "<BOS>", "<EOS>"]
    special_set = set(specials)

    # A set gives O(1) membership tests below and collapses duplicates, so a
    # repeated character cannot consume several IDs and shift its successors.
    single_chars = {
        t for t in tokens
        if len(t) == 1 and ord(t) < 256 and t not in special_set
    }

    regular_tokens = [
        t for t in tokens
        if t not in special_set and t not in single_chars
    ]
    regular_tokens.sort(key=lambda t: deterministic_sort_key(t, frequencies.get(t, 0)))

    # Special tokens occupy the first IDs unconditionally.
    token_to_id: Dict[str, int] = {t: i for i, t in enumerate(specials)}

    # Single characters start at 100.
    current_id = 100
    for t in sorted(single_chars, key=ord):
        token_to_id[t] = current_id
        current_id += 1

    # Everything else starts at 356 at the earliest.
    current_id = max(current_id, 356)
    for t in regular_tokens:
        # Duplicates sit next to each other after sorting; number them once.
        if t not in token_to_id:
            token_to_id[t] = current_id
            current_id += 1

    return token_to_id
|
|