Phase-Technologies's picture
Upload folder using huggingface_hub
708f4a3 verified
"""
Adaptive Vocabulary Manager Module.
Implements Section 8.2 of the XERV Crayon Engineering Treatise:
- Real-time entropy monitoring
- Adaptive vocabulary updates with feedback control
- Unknown token handling with candidate extraction
"""
import time
import math
from collections import defaultdict, deque
from typing import List, Tuple, Dict, Any, Optional, Set
from ..core.vocabulary import CrayonVocab
from .stability import StableVocabularyManager
class AdaptiveVocabularyManager:
"""
Manages vocabulary adaptation for out-of-distribution text processing.
Implements the control loop defined in Section 8.2:
dV/dt = eta * grad_V [Performance(V,t) - Complexity(V)][cite: 140].
Features:
- Rolling window unknown token rate monitoring
- Entropy-guided candidate extraction
- Multi-objective utility ranking
- Cooldown-based adaptation triggering
"""
def __init__(self,
base_vocab_manager: StableVocabularyManager,
core_vocab: CrayonVocab,
adaptation_threshold: float = 0.15,
min_candidate_frequency: int = 5,
max_candidates_per_batch: int = 50,
cooldown_seconds: float = 300.0):
"""
Initialize the adaptive manager.
Args:
base_vocab_manager: Stable ID assignment manager
core_vocab: Core vocabulary for tokenization
adaptation_threshold: Unknown rate threshold for triggering adaptation
min_candidate_frequency: Minimum frequency for candidate consideration
max_candidates_per_batch: Maximum tokens to add per adaptation event
cooldown_seconds: Minimum time between adaptations
"""
self.vocab_manager = base_vocab_manager
self.core_vocab = core_vocab
self.adaptation_threshold = adaptation_threshold
self.min_candidate_frequency = min_candidate_frequency
self.max_candidates_per_batch = max_candidates_per_batch
self.cooldown_seconds = cooldown_seconds
# Rolling window for effectiveness monitoring [cite: 1106]
self.unknown_token_rate: deque = deque(maxlen=1000)
self.candidate_tokens: Dict[str, int] = defaultdict(int)
self.candidate_lengths: Dict[str, List[int]] = defaultdict(list)
# Active unknown spans for extraction
self._current_unknown_spans: List[Tuple[int, int]] = []
self.processing_stats = {
'total_tokens': 0,
'unknown_tokens': 0,
'adaptation_events': 0,
'last_adaptation_time': 0.0,
'total_texts_processed': 0,
'candidates_extracted': 0
}
def tokenize_with_adaptation(self, text: str) -> Tuple[List[int], Dict[str, Any]]:
"""
Tokenizes text while monitoring for adaptation opportunities[cite: 1120].
Returns:
Tuple(List[int], MetadataDict with adaptation info)
"""
# 1. Standard Tokenization
tokens = self.core_vocab.tokenize(text)
# 2. Analyze Unknowns
unk_id = self.core_vocab.unk_token_id
unknown_positions = [i for i, t in enumerate(tokens) if t == unk_id]
unknown_count = len(unknown_positions)
total = len(tokens)
# 3. Update Statistics
self.processing_stats['total_tokens'] += total
self.processing_stats['unknown_tokens'] += unknown_count
self.processing_stats['total_texts_processed'] += 1
current_rate = unknown_count / total if total > 0 else 0.0
self.unknown_token_rate.append(current_rate)
# 4. Extract Candidates from unknown spans
if unknown_count > 0:
self._extract_candidates_from_text(text, tokens, unknown_positions)
# 5. Trigger Adaptation? [cite: 1157]
adaptation_metadata = {
'unknown_rate': current_rate,
'total_tokens': total,
'unknown_count': unknown_count,
'adaptation_triggered': False
}
if self._should_trigger_adaptation():
result = self._perform_vocabulary_adaptation()
adaptation_metadata.update(result)
adaptation_metadata['adaptation_triggered'] = True
return tokens, adaptation_metadata
def _extract_candidates_from_text(
self,
text: str,
tokens: List[int],
unknown_positions: List[int]
) -> None:
"""
Extract candidate tokens from text regions that caused UNK tokens.
Maps token positions back to character positions to identify
untokenized spans for vocabulary expansion.
"""
if not unknown_positions:
return
unk_id = self.core_vocab.unk_token_id
text_len = len(text)
# Reconstruct character positions from tokens
# Each UNK corresponds to exactly 1 character in our tokenizer
char_pos = 0
unknown_chars: Set[int] = set()
for i, token_id in enumerate(tokens):
if token_id == unk_id:
if char_pos < text_len:
unknown_chars.add(char_pos)
char_pos += 1
else:
# Get token string length
token_str = self.core_vocab.id_to_token.get(token_id, '')
char_pos += len(token_str)
# Find contiguous unknown spans
if not unknown_chars:
return
sorted_positions = sorted(unknown_chars)
spans: List[Tuple[int, int]] = []
span_start = sorted_positions[0]
span_end = span_start
for pos in sorted_positions[1:]:
if pos == span_end + 1:
span_end = pos
else:
spans.append((span_start, span_end + 1))
span_start = pos
span_end = pos
spans.append((span_start, span_end + 1))
# Extract candidate substrings from spans with context
for start, end in spans:
# Extend context window for better candidates
context_start = max(0, start - 2)
context_end = min(text_len, end + 2)
# Extract all substrings in the span (up to SIMD limit of 16 bytes)
for length in range(1, min(17, context_end - context_start + 1)):
for i in range(context_start, context_end - length + 1):
candidate = text[i:i + length]
# Skip if already in vocabulary
if candidate in self.core_vocab.token_to_id:
continue
# Skip control characters and whitespace-only
if not candidate.strip() or not candidate.isprintable():
continue
# Skip if byte length exceeds SIMD limit
if len(candidate.encode('utf-8')) > 16:
continue
self.candidate_tokens[candidate] += 1
self.candidate_lengths[candidate].append(length)
self.processing_stats['candidates_extracted'] += 1
def _should_trigger_adaptation(self) -> bool:
"""
Determines trigger based on threshold and cooldown[cite: 1157].
Criteria:
1. Minimum sample size (100 recent tokenizations)
2. Unknown rate exceeds threshold
3. Cooldown period elapsed
4. Candidate pool has viable options
"""
# Check minimum samples
if len(self.unknown_token_rate) < 100:
return False
# Calculate recent unknown rate
recent_rate = sum(self.unknown_token_rate) / len(self.unknown_token_rate)
# Check threshold
if recent_rate < self.adaptation_threshold:
return False
# Check cooldown (default 5 minutes) [cite: 1173]
current_time = time.time()
if current_time - self.processing_stats['last_adaptation_time'] < self.cooldown_seconds:
return False
# Check candidate pool
viable_candidates = sum(
1 for freq in self.candidate_tokens.values()
if freq >= self.min_candidate_frequency
)
if viable_candidates < 5:
return False
return True
def _rank_candidates_by_utility(self) -> List[Tuple[str, float]]:
"""
Ranks candidates using the multi-objective utility function[cite: 1224].
Utility = (Compression × 0.4) + (1/Speed × 0.3) + (Coherence × 0.3)
Where:
- Compression: bits saved = len(token) × frequency
- Speed: inverse of lookup cost (favors shorter tokens)
- Coherence: linguistic quality score (alpha = 1.0, mixed = 0.5)
"""
results: List[Tuple[str, float]] = []
for token, freq in self.candidate_tokens.items():
# Filter low-frequency noise
if freq < self.min_candidate_frequency:
continue
# Already in vocabulary check
if token in self.core_vocab.token_to_id:
continue
# Compression benefit: bytes saved per occurrence
byte_len = len(token.encode('utf-8'))
compression_benefit = byte_len * freq
# Speed impact: shorter tokens are faster to process
# Normalized to 0-1 range (16 bytes max)
speed_factor = 1.0 - (byte_len / 16.0)
# Coherence: linguistic quality heuristics
coherence = 1.0
if token.isalpha():
coherence = 1.0 # Pure alphabetic
elif token.isalnum():
coherence = 0.8 # Alphanumeric
elif any(c.isalpha() for c in token):
coherence = 0.6 # Mixed with some letters
else:
coherence = 0.3 # Punctuation/symbols
# Multi-objective utility [cite: 1224]
utility = (
(compression_benefit * 0.4) +
(speed_factor * freq * 0.3) +
(coherence * freq * 0.3)
)
results.append((token, utility))
return sorted(results, key=lambda x: x[1], reverse=True)
def _perform_vocabulary_adaptation(self) -> Dict[str, Any]:
"""
Executes the vocabulary update[cite: 1179].
Steps:
1. Rank candidates by utility
2. Select top-N candidates
3. Add to stable vocabulary manager
4. Clear candidate pool
5. Update statistics
"""
candidates = self._rank_candidates_by_utility()
# Select top candidates up to batch limit
selected = [c[0] for c in candidates[:self.max_candidates_per_batch]]
if not selected:
return {
'new_tokens': 0,
'candidates_considered': len(candidates),
'timestamp': time.time()
}
# Add to vocabulary manager with stable ID assignment
new_ids = self.vocab_manager.add_tokens_incrementally(selected)
# Note: In production, would need to rebuild C-trie here
# This requires re-calling _build_c_trie on the core vocab
# For now, new tokens will use Python fallback until restart
# Clear candidate pool after successful adaptation
self.candidate_tokens.clear()
self.candidate_lengths.clear()
# Update statistics
self.processing_stats['last_adaptation_time'] = time.time()
self.processing_stats['adaptation_events'] += 1
return {
'new_tokens': len(new_ids),
'tokens_added': list(new_ids.keys()),
'candidates_considered': len(candidates),
'timestamp': time.time()
}
def get_statistics(self) -> Dict[str, Any]:
"""Return current processing and adaptation statistics."""
avg_unknown_rate = (
sum(self.unknown_token_rate) / len(self.unknown_token_rate)
if self.unknown_token_rate else 0.0
)
return {
**self.processing_stats,
'current_unknown_rate': avg_unknown_rate,
'candidate_pool_size': len(self.candidate_tokens),
'viable_candidates': sum(
1 for f in self.candidate_tokens.values()
if f >= self.min_candidate_frequency
)
}
def force_adaptation(self) -> Dict[str, Any]:
"""Force an immediate adaptation regardless of thresholds."""
return self._perform_vocabulary_adaptation()
def clear_candidates(self) -> None:
"""Clear the candidate token pool."""
self.candidate_tokens.clear()
self.candidate_lengths.clear()
self.processing_stats['candidates_extracted'] = 0