"""
Crayon Vocabulary Training Module.

Implements Algorithm 3.1 from the XERV Crayon Engineering Treatise:
- Extract substring candidates up to SIMD limit (16 bytes)
- Calculate information gain with entropy reduction
- Select top-K candidates maximizing gain-to-cost ratio

This is the production-grade implementation for building optimal vocabularies
from either user-provided corpora or the built-in default sources.
"""

import math
import logging
import string
from collections import defaultdict
from typing import List, Tuple, Dict, Iterator, Optional, Callable

# Configure module logger
logger = logging.getLogger(__name__)

# SIMD Hardware Limit [cite: 128]
MAX_TOKEN_LENGTH = 16

# Minimum frequency threshold to filter noise
DEFAULT_MIN_FREQUENCY = 2


def build_default_vocabulary(
    target_size: int = 500000,
    progress_callback: Optional[Callable[[str], None]] = None
) -> List[str]:
    """
    Builds a 'Batteries-Included' vocabulary using Xerv-AI's curated datasets.
    
    Sources:
    - Xerv-AI/GRAD (Graduate Mathematics)
    - Xerv-AI/Physics-dataset-700 (Scientific Reasoning)
    - Xerv-AI/RainDrop-DTS (General Instruction)
    - Tiny Shakespeare (Classical Literature)
    - Built-in corpus (Baseline Coverage)
    
    No local files are required; data is streamed directly into the entropy engine.
    
    Args:
        target_size: Maximum vocabulary size (default 500k)
        progress_callback: Optional callback for progress updates
        
    Returns:
        List of token strings ordered by utility
    """
    from .resources import get_default_corpus_iterator
    
    if progress_callback:
        progress_callback("Initializing default corpus stream...")
    
    corpus_stream = get_default_corpus_iterator()
    return train_vocabulary(
        corpus_stream, 
        target_size=target_size,
        progress_callback=progress_callback
    )
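

# Illustrative sketch, not part of the original module: train_vocabulary()
# accepts any iterator of text chunks, so a user-provided corpus file could be
# streamed as below. The helper name, the chunk size, and the UTF-8 default are
# assumptions made for this example. Substrings that straddle two chunks are
# never counted, because train_vocabulary() scans each chunk independently.
from typing import Iterator  # repeated so the sketch runs standalone


def _example_iter_file_chunks(
    path: str,
    chunk_chars: int = 65536,
    encoding: str = "utf-8"
) -> Iterator[str]:
    """Yield successive chunks of at most chunk_chars characters from a text file."""
    with open(path, "r", encoding=encoding) as handle:
        while True:
            chunk = handle.read(chunk_chars)
            if not chunk:
                break
            yield chunk

# Possible usage (the file name is hypothetical):
#     vocab = train_vocabulary(_example_iter_file_chunks("corpus.txt"), target_size=32000)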


def train_vocabulary(
    corpus_iterator: Iterator[str], 
    target_size: int = 500000,
    min_frequency: int = DEFAULT_MIN_FREQUENCY,
    progress_callback: Optional[Callable[[str], None]] = None
) -> List[str]:
    """
    Constructs an optimal vocabulary from a corpus using first-principles entropy analysis.
    
    Algorithm 3.1 [cite: 127-135]:
    1. Extract all substrings up to MAX_TOKEN_LENGTH (16 bytes for AVX2).
    2. Calculate Information Gain: Gain(s) = Frequency(s) × Entropy(s) / Cost(s).
    3. Select Top-K candidates maximizing utility score.
    
    Args:
        corpus_iterator: Iterator yielding chunks/lines of text
        target_size: Maximum vocabulary size (default 500k)
        min_frequency: Minimum token frequency threshold
        progress_callback: Optional callback for progress updates
        
    Returns:
        List of token strings ordered for stable ID assignment
    """
    if progress_callback:
        progress_callback("Starting Entropy-Guided Vocabulary Construction...")
    
    logger.info("Starting Entropy-Guided Vocabulary Construction...")
    
    # ========================================================================
    # Phase 1: Candidate Extraction & Frequency Counting [cite: 128]
    # ========================================================================
    candidates: Dict[str, int] = defaultdict(int)
    total_chars = 0
    chunk_count = 0
    
    # Process stream chunk by chunk (Zero-Disk Accumulation)
    for text_chunk in corpus_iterator:
        if not text_chunk:
            continue
        
        text_len = len(text_chunk)
        total_chars += text_len
        chunk_count += 1
        
        # Hot-path extraction loop - extract all valid substrings
        for i in range(text_len):
            # Hardware constraint: Tokens > 16 bytes degrade SIMD performance
            limit = min(i + MAX_TOKEN_LENGTH, text_len)
            for j in range(i + 1, limit + 1):
                token = text_chunk[i:j]
                
                # Keep only tokens whose UTF-8 encoding fits the byte limit.
                # Byte length only grows with j, so stop at the first overflow.
                if len(token.encode('utf-8')) > MAX_TOKEN_LENGTH:
                    break
                candidates[token] += 1
        
        # Progress update every 100 chunks
        if chunk_count % 100 == 0 and progress_callback:
            progress_callback(f"Processed {chunk_count} chunks, {len(candidates):,} candidates...")
    
    if progress_callback:
        progress_callback(f"Extracted {len(candidates):,} unique candidates from {total_chars:,} chars")
    
    logger.info(f"Extracted {len(candidates):,} unique candidates from {total_chars:,} chars.")

    # ========================================================================
    # Phase 2: Information Gain Calculation [cite: 129-134]
    # ========================================================================
    if progress_callback:
        progress_callback("Scoring candidates by information gain...")
    
    scored_candidates: List[Tuple[str, float]] = []
    
    for token, freq in candidates.items():
        # Filter low-frequency noise
        if freq < min_frequency:
            continue
        
        # Skip empty strings and tokens containing any non-printable
        # character (control characters, tabs, newlines)
        if not token or not token.isprintable():
            continue
            
        # Probability p(s)
        p_s = freq / total_chars
        if p_s <= 0:
            continue
        
        # Information content (entropy reduction) [cite: 131]
        # H(s) = -log2(p(s))
        entropy = -math.log2(p_s)
        
        # Computational Cost Estimate [cite: 133]
        # Cost is linear to byte length + constant overhead for SIMD alignment
        byte_length = len(token.encode('utf-8'))
        comp_cost = byte_length * 0.1 + 1.0
        
        # Information Gain [cite: 134]
        # Gain = (Entropy × Frequency) / Cost
        gain = (entropy * freq) / comp_cost
        
        scored_candidates.append((token, gain))

    if progress_callback:
        progress_callback(f"Scored {len(scored_candidates):,} viable candidates")
    
    logger.info(f"Scored {len(scored_candidates):,} viable candidates")

    # ========================================================================
    # Phase 3: Selection with Priority Categories [cite: 1009-1012]
    # ========================================================================
    if progress_callback:
        progress_callback("Building final vocabulary...")
    
    # Sort by gain descending; break ties on the token text so the order is
    # reproducible across runs
    scored_candidates.sort(key=lambda x: (-x[1], x[0]))
    
    # Build the vocabulary as an ordered list so token IDs are stable across
    # runs; the set is used only for O(1) membership checks
    final_vocab: List[str] = []
    vocab_set: set = set()
    
    def _add(token: str) -> bool:
        """Append token if unseen; return True when it was added."""
        if token in vocab_set:
            return False
        vocab_set.add(token)
        final_vocab.append(token)
        return True
    
    # 1. Special tokens (MANDATORY) [cite: 1009]
    specials = ["<PAD>", "<UNK>", "<BOS>", "<EOS>"]
    for s in specials:
        _add(s)
    
    # 2. ASCII printable characters, excluding whitespace (BASELINE) [cite: 1010]
    for c in string.printable:
        if c.strip():
            _add(c)
    
    # 3. Remaining printable Latin-1 code points (U+0000-U+00FF), including the
    #    space character; those above U+007F take two bytes in UTF-8
    for i in range(256):
        char = chr(i)
        if char.isprintable():
            _add(char)
    
    # 4. Fill remainder with entropy-optimized tokens, highest gain first
    remaining_slots = target_size - len(final_vocab)
    added_count = 0
    
    for token, gain in scored_candidates:
        if added_count >= remaining_slots:
            break
        if _add(token):
            added_count += 1
    
    if progress_callback:
        progress_callback(f"Final vocabulary: {len(final_vocab):,} tokens")
    
    logger.info(f"Final vocabulary: {len(final_vocab):,} tokens")
    
    return final_vocab
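

# Illustrative sketch, not part of the original module: the Phase 2 score for a
# single candidate, written out as a standalone function so the arithmetic can
# be checked by hand. The helper name is an assumption; the 0.1 and 1.0 cost
# constants are copied from train_vocabulary() above.
import math  # repeated so the sketch runs standalone


def _example_gain(token: str, freq: int, total_chars: int) -> float:
    """Return (self-information x frequency) / cost for one candidate token."""
    p_s = freq / total_chars                  # relative frequency of the token
    information = -math.log2(p_s)             # bits; rarer tokens score higher per occurrence
    cost = len(token.encode("utf-8")) * 0.1 + 1.0
    return (information * freq) / cost

# Worked case: a 2-byte token seen 4 times in 16 characters has p = 0.25, so
# information = 2 bits, cost = 1.2, and gain = (2 * 4) / 1.2.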


def calculate_corpus_entropy(corpus_iterator: Iterator[str]) -> float:
    """
    Calculate Shannon entropy of a corpus [cite: 93-96].
    
    H(X) = -Σ p(x) log2(p(x))
    
    Args:
        corpus_iterator: Iterator yielding text chunks
        
    Returns:
        Entropy in bits per character
    """
    char_counts: Dict[str, int] = defaultdict(int)
    total = 0
    
    for chunk in corpus_iterator:
        for char in chunk:
            char_counts[char] += 1
            total += 1
    
    if total == 0:
        return 0.0
    
    entropy = 0.0
    for count in char_counts.values():
        p = count / total
        if p > 0:
            entropy -= p * math.log2(p)
    
    return entropy
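

# Illustrative sketch, not part of the original module: a compact
# Counter-based version of the same Shannon entropy formula for one in-memory
# string, handy for cross-checking calculate_corpus_entropy() on small inputs.
# The helper name is an assumption.
import math  # repeated so the sketch runs standalone
from collections import Counter


def _example_string_entropy(text: str) -> float:
    """Return H(X) = -sum(p * log2(p)) over the characters of text."""
    if not text:
        return 0.0
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in Counter(text).values())

# Four equally likely symbols carry log2(4) bits each, so "abcd" scores 2.0;
# a single repeated symbol carries no information.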


def estimate_optimal_vocab_size(entropy: float, epsilon: float = 0.5) -> int:
    """
    Calculate optimal vocabulary size from corpus entropy [cite: 94].
    
    V_optimal ≈ 2^(H(corpus) + ε)
    
    The result is exponential in the entropy: H = 1.2 bits/char with ε = 0.5
    gives int(2^1.7) = 3, while ~500k tokens requires H + ε ≈ 18.9.
    
    Args:
        entropy: Corpus entropy in bits per character
        epsilon: Adjustment factor (default 0.5)
        
    Returns:
        Estimated optimal vocabulary size
    """
    return int(2 ** (entropy + epsilon))
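

# Illustrative sketch, not part of the original module: the formula above
# inverted, to show which entropy value corresponds to a given vocabulary
# size. The helper name is an assumption.
import math  # repeated so the sketch runs standalone


def _example_required_entropy(vocab_size: int, epsilon: float = 0.5) -> float:
    """Solve V = 2^(H + epsilon) for H, in bits."""
    return math.log2(vocab_size) - epsilon

# For a 500,000-token target with epsilon = 0.5 this gives H of roughly 18.43
# bits, since log2(500000) is about 18.93.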