#!/usr/bin/env python3
"""
Advanced Tokenizer System
=========================

Integrates multiple tokenization approaches with semantic awareness,
mathematical processing, and fractal-based tokenization for
high-capacity input processing.
"""

import re
import json
import hashlib
import asyncio
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import logging

# Import existing systems
try:
    from advanced_embedding_pipeline.semantic_embedder import SemanticEmbedder, SemanticConfig
    from advanced_embedding_pipeline.mathematical_embedder import MathematicalEmbedder, MathematicalConfig
    from advanced_embedding_pipeline.fractal_cascade_embedder import FractalCascadeEmbedder, FractalConfig
except ImportError:
    print("⚠️ Advanced embedding pipeline not available, using fallback implementations")
    SemanticEmbedder = None
    MathematicalEmbedder = None
    FractalCascadeEmbedder = None

from intelligent_chunking_processor import IntelligentChunkingProcessor, IntelligentChunk
from high_capacity_input_processor import HighCapacityInputProcessor, InputChunk

logger = logging.getLogger(__name__)


@dataclass
class TokenizerConfig:
    """Configuration for the advanced tokenizer system."""
    # Core tokenization
    max_vocab_size: int = 50000
    max_sequence_length: int = 8192
    min_token_length: int = 1
    max_token_length: int = 100

    # Semantic processing
    use_semantic_tokenization: bool = True
    semantic_threshold: float = 0.7
    context_window: int = 128

    # Mathematical processing
    use_mathematical_tokenization: bool = True
    math_detection_threshold: float = 0.3
    symbolic_processing: bool = True

    # Fractal processing
    use_fractal_tokenization: bool = True
    fractal_dimensions: int = 3
    fractal_iterations: int = 5

    # Chunking integration
    use_intelligent_chunking: bool = True
    chunk_overlap: int = 100
    semantic_chunking: bool = True

    # Performance
    batch_size: int = 32
    cache_tokens: bool = True
    parallel_processing: bool = True

    # File paths
    cache_dir: str = "./tokenizer_cache"
    model_cache_dir: str = "./model_cache"
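
# A minimal configuration sketch (hypothetical, not part of the original
# module): the field values below are illustrative assumptions showing how
# the heavier passes can be disabled for latency-sensitive runs.
def example_fast_config() -> TokenizerConfig:
    """Sketch: disable the heavier passes and shrink the batch size."""
    return TokenizerConfig(
        use_fractal_tokenization=False,   # skip the fractal pass
        use_semantic_tokenization=False,  # skip semantic chunk tokens
        cache_tokens=False,               # avoid touching the cache directory
        batch_size=8,
    )
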
@dataclass
class Token:
    """Represents a single token with metadata."""
    token_id: int
    text: str
    token_type: str  # 'word', 'math', 'symbol', 'punctuation', 'semantic', 'fractal'
    position: int
    length: int
    semantic_embedding: Optional[np.ndarray] = None
    mathematical_embedding: Optional[np.ndarray] = None
    fractal_embedding: Optional[np.ndarray] = None
    metadata: Dict[str, Any] = None


@dataclass
class TokenizedSequence:
    """Represents a tokenized sequence with full metadata."""
    sequence_id: str
    original_text: str
    tokens: List[Token]
    total_tokens: int
    token_types: Dict[str, int]
    semantic_coherence: float
    mathematical_content_ratio: float
    fractal_patterns: List[Dict[str, Any]]
    processing_time: float
    metadata: Dict[str, Any]


class AdvancedTokenizer:
    """
    Advanced tokenizer system that integrates multiple tokenization approaches:
    - Traditional tokenization
    - Semantic-aware tokenization
    - Mathematical expression tokenization
    - Fractal-based tokenization
    - Intelligent chunking integration
    """

    def __init__(self, config: Optional[TokenizerConfig] = None):
        self.config = config or TokenizerConfig()

        # Initialize components
        self.vocab = {}
        self.reverse_vocab = {}
        self.token_cache = {}

        # Initialize embedding systems
        self.semantic_embedder = None
        self.mathematical_embedder = None
        self.fractal_embedder = None
        self.intelligent_chunker = None
        self.high_capacity_processor = None

        self._initialize_components()
        self._setup_cache()

        # Token patterns
        self.token_patterns = {
            'word': re.compile(r'\b[a-zA-Z]+\b'),
            'number': re.compile(r'\b\d+(?:\.\d+)?\b'),
            'math_symbol': re.compile(r'[+\-*/=<>(){}[\]^%&|~!@#$]+'),
            'punctuation': re.compile(r'[.,;:!?\'"`]+'),
            'whitespace': re.compile(r'\s+'),
            'code': re.compile(r'```[\s\S]*?```|`[^`]+`'),
            'math_expression': re.compile(r'\$\$[\s\S]*?\$\$|\$[^$]+\$'),
            'url': re.compile(r'https?://\S+|www\.\S+'),
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
        }

        # Special tokens
        self.special_tokens = {
            '<PAD>': 0,
            '<UNK>': 1,
            '<BOS>': 2,
            '<EOS>': 3,
            '<SEP>': 4,
            '<MASK>': 5,
            '<MATH>': 6,
            '<SEM>': 7,
            '<FRACTAL>': 8,
            '<WS>': 9
        }

        # Initialize vocabulary with special tokens
        self._initialize_vocabulary()

    def _initialize_components(self):
        """Initialize all tokenizer components."""
        try:
            # Initialize semantic embedder
            if SemanticEmbedder and self.config.use_semantic_tokenization:
                semantic_config = SemanticConfig()
                self.semantic_embedder = SemanticEmbedder(semantic_config)
                logger.info("✅ Semantic embedder initialized")

            # Initialize mathematical embedder
            if MathematicalEmbedder and self.config.use_mathematical_tokenization:
                math_config = MathematicalConfig()
                self.mathematical_embedder = MathematicalEmbedder(math_config)
                logger.info("✅ Mathematical embedder initialized")

            # Initialize fractal embedder
            if FractalCascadeEmbedder and self.config.use_fractal_tokenization:
                fractal_config = FractalConfig()
                self.fractal_embedder = FractalCascadeEmbedder(fractal_config)
                logger.info("✅ Fractal embedder initialized")

            # Initialize intelligent chunker
            if self.config.use_intelligent_chunking:
                self.intelligent_chunker = IntelligentChunkingProcessor(
                    max_chunk_size=self.config.max_sequence_length,
                    overlap_size=self.config.chunk_overlap
                )
                logger.info("✅ Intelligent chunker initialized")

            # Initialize high capacity processor
            self.high_capacity_processor = HighCapacityInputProcessor(
                max_chunk_size=self.config.max_sequence_length,
                chunk_overlap=self.config.chunk_overlap
            )
            logger.info("✅ High capacity processor initialized")

        except Exception as e:
            logger.warning(f"⚠️ Component initialization failed: {e}")

    def _setup_cache(self):
        """Set up the tokenization cache directory."""
        if self.config.cache_tokens:
            cache_path = Path(self.config.cache_dir)
            cache_path.mkdir(parents=True, exist_ok=True)
            self.cache_path = cache_path

    def _initialize_vocabulary(self):
        """Initialize vocabulary with special tokens."""
        self.vocab = self.special_tokens.copy()
        self.reverse_vocab = {v: k for k, v in self.vocab.items()}
        self.next_token_id = len(self.special_tokens)

    def _get_or_add_token(self, text: str, token_type: str = 'word') -> int:
        """Get or add a token to the vocabulary."""
        if text in self.vocab:
            return self.vocab[text]

        # Vocabulary is full: map unseen text to the unknown token
        if len(self.vocab) >= self.config.max_vocab_size:
            return self.vocab['<UNK>']

        token_id = self.next_token_id
        self.vocab[text] = token_id
        self.reverse_vocab[token_id] = text
        self.next_token_id += 1
        return token_id

    def _detect_content_type(self, text: str) -> Dict[str, float]:
        """Detect content type ratios in text.

        Ratios are heuristic: each is a count of regex matches divided by
        the total character count, so values are typically small fractions.
        """
        content_ratios = {
            'mathematical': 0.0,
            'code': 0.0,
            'natural_language': 0.0,
            'structured_data': 0.0
        }

        total_chars = len(text)
        if total_chars == 0:
            return content_ratios

        # Mathematical content
        math_matches = len(re.findall(self.token_patterns['math_expression'], text))
        math_symbols = len(re.findall(self.token_patterns['math_symbol'], text))
        content_ratios['mathematical'] = (math_matches + math_symbols) / total_chars

        # Code content
        code_matches = len(re.findall(self.token_patterns['code'], text))
        content_ratios['code'] = code_matches / total_chars

        # Natural language (words)
        word_matches = len(re.findall(self.token_patterns['word'], text))
        content_ratios['natural_language'] = word_matches / total_chars

        # Structured data (JSON-like)
        json_like = len(re.findall(r'[{}[\]]', text))
        content_ratios['structured_data'] = json_like / total_chars

        return content_ratios
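    # Illustrative example of the heuristic above (hand-checked, not from the
    # original source): for "x = 2 + 2" (9 characters), 'math_symbol' matches
    # "=" and "+" while 'math_expression' matches nothing, so the mathematical
    # ratio is 2 / 9 ≈ 0.22, just under the default math_detection_threshold
    # of 0.3, and traditional tokenization would be kept.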
    def _extract_mathematical_expressions(self, text: str) -> List[Tuple[str, int, int]]:
        """Extract mathematical expressions with positions."""
        expressions = []

        # LaTeX math
        for match in re.finditer(self.token_patterns['math_expression'], text):
            expressions.append((match.group(), match.start(), match.end()))

        # Simple mathematical patterns
        math_patterns = [
            r'\b\d+\s*[+\-*/]\s*\d+',  # Simple arithmetic
            r'\b\w+\s*=\s*\d+',        # Assignments
            r'\b\w+\s*\([^)]*\)',      # Functions
        ]

        for pattern in math_patterns:
            for match in re.finditer(pattern, text):
                expressions.append((match.group(), match.start(), match.end()))

        return expressions

    def _generate_fractal_tokens(self, text: str, position: int) -> List[Token]:
        """Generate fractal-based tokens for a text segment."""
        tokens = []

        if not self.config.use_fractal_tokenization:
            return tokens

        try:
            # Generate fractal pattern based on text content
            text_hash = hashlib.md5(text.encode()).hexdigest()

            # Create fractal sequence
            fractal_sequence = self._create_fractal_sequence(text_hash)

            for i, fractal_value in enumerate(fractal_sequence):
                fractal_text = f"<FRACTAL_{i}>"
                token_id = self._get_or_add_token(fractal_text, 'fractal')

                token = Token(
                    token_id=token_id,
                    text=fractal_text,
                    token_type='fractal',
                    position=position + i,
                    length=len(fractal_text),
                    metadata={'fractal_value': fractal_value, 'fractal_index': i}
                )
                tokens.append(token)

                if len(tokens) >= 10:  # Limit fractal tokens
                    break

        except Exception as e:
            logger.warning(f"Fractal token generation failed: {e}")

        return tokens

    def _create_fractal_sequence(self, seed: str) -> List[float]:
        """Create a fractal sequence from a hex seed string."""
        # Simple fractal-like sequence generation via a perturbed logistic map
        sequence = []
        value = 0.5

        for i in range(10):
            # Use the seed to perturb the map, one hex digit per iteration
            seed_val = int(seed[i % len(seed)], 16) / 16.0
            value = 4 * value * (1 - value) + seed_val * 0.1
            # Wrap back into [0, 1) so the perturbed map stays bounded
            value = value % 1.0
            sequence.append(value)

        return sequence
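    # Note on _create_fractal_sequence: the update value = 4 * value * (1 - value)
    # is the logistic map with r = 4, which is chaotic on [0, 1]: from value = 0.5
    # the unperturbed step gives 4 * 0.5 * 0.5 = 1.0, and nearby seeds diverge
    # quickly, so each MD5 hash yields a distinct but deterministic ten-value
    # signature for its text.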
    def _generate_semantic_tokens(self, text: str, position: int) -> List[Token]:
        """Generate semantic-aware tokens."""
        tokens = []

        if not self.config.use_semantic_tokenization or not self.semantic_embedder:
            return tokens

        try:
            # Extract semantic concepts
            words = text.split()
            if len(words) < 2:
                return tokens

            # Create semantic chunks
            semantic_chunks = []
            for i in range(0, len(words), self.config.context_window // 10):
                chunk = ' '.join(words[i:i + self.config.context_window // 10])
                semantic_chunks.append(chunk)

            for i, chunk in enumerate(semantic_chunks):
                semantic_text = f"<SEM_{i}>"
                token_id = self._get_or_add_token(semantic_text, 'semantic')

                token = Token(
                    token_id=token_id,
                    text=semantic_text,
                    token_type='semantic',
                    position=position + i,
                    length=len(semantic_text),
                    metadata={'semantic_chunk': chunk, 'chunk_index': i}
                )
                tokens.append(token)

        except Exception as e:
            logger.warning(f"Semantic token generation failed: {e}")

        return tokens

    def _tokenize_traditional(self, text: str, position_offset: int = 0) -> List[Token]:
        """Traditional tokenization approach."""
        tokens = []
        position = position_offset

        # Split by whitespace first, keeping the separators
        parts = re.split(r'(\s+)', text)

        for part in parts:
            if not part:
                continue

            if part.isspace():
                # Whitespace token
                token_id = self._get_or_add_token('<WS>', 'whitespace')
                token = Token(
                    token_id=token_id,
                    text=part,
                    token_type='whitespace',
                    position=position,
                    length=len(part)
                )
                tokens.append(token)
                position += len(part)
                continue

            # Determine token type
            token_type = 'word'
            if re.match(self.token_patterns['number'], part):
                token_type = 'number'
            elif re.match(self.token_patterns['math_symbol'], part):
                token_type = 'symbol'
            elif re.match(self.token_patterns['punctuation'], part):
                token_type = 'punctuation'
            elif re.match(self.token_patterns['url'], part):
                token_type = 'url'
            elif re.match(self.token_patterns['email'], part):
                token_type = 'email'

            # Add token
            token_id = self._get_or_add_token(part, token_type)
            token = Token(
                token_id=token_id,
                text=part,
                token_type=token_type,
                position=position,
                length=len(part)
            )
            tokens.append(token)
            position += len(part)

        return tokens

    def _tokenize_mathematical(self, text: str, position_offset: int = 0) -> List[Token]:
        """Mathematical expression tokenization."""
        tokens = []
        position = position_offset

        # Extract mathematical expressions and process them left to right
        math_expressions = self._extract_mathematical_expressions(text)
        math_expressions.sort(key=lambda item: item[1])

        current_pos = 0
        for expr_text, expr_start, expr_end in math_expressions:
            # Skip matches that overlap text we have already consumed
            if expr_start < current_pos:
                continue

            # Add tokens before the expression
            if expr_start > current_pos:
                before_text = text[current_pos:expr_start]
                before_tokens = self._tokenize_traditional(before_text, position + current_pos)
                tokens.extend(before_tokens)

            # Add the mathematical expression as a single token
            token_id = self._get_or_add_token(f"<MATH>{expr_text}</MATH>", 'math')
            token = Token(
                token_id=token_id,
                text=expr_text,
                token_type='math',
                position=position + expr_start,
                length=len(expr_text),
                metadata={'is_mathematical': True, 'expression': expr_text}
            )
            tokens.append(token)
            current_pos = expr_end

        # Add remaining tokens
        if current_pos < len(text):
            remaining_text = text[current_pos:]
            remaining_tokens = self._tokenize_traditional(remaining_text, position + current_pos)
            tokens.extend(remaining_tokens)

        return tokens
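    # Worked example (illustrative, not from the original source): for
    # "area = 12 and $x^2$", the extractor finds the assignment "area = 12"
    # and the LaTeX span "$x^2$". After sorting by start position, the method
    # emits traditional tokens for any text before each expression, one 'math'
    # token per expression, then traditional tokens for the tail, so math
    # spans survive as single, atomic tokens.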
    async def tokenize(self, text: str) -> TokenizedSequence:
        """
        Main tokenization method that combines all approaches.

        Args:
            text: Input text to tokenize

        Returns:
            TokenizedSequence with all tokens and metadata
        """
        start_time = datetime.now()
        sequence_id = hashlib.md5(f"{text}_{datetime.now().isoformat()}".encode()).hexdigest()[:16]

        # Detect content type
        content_ratios = self._detect_content_type(text)

        # Initialize token list
        all_tokens = []

        # Traditional tokenization
        traditional_tokens = self._tokenize_traditional(text)
        all_tokens.extend(traditional_tokens)

        # Mathematical tokenization (if enough mathematical content is detected)
        if content_ratios['mathematical'] > self.config.math_detection_threshold:
            math_tokens = self._tokenize_mathematical(text)
            all_tokens = math_tokens  # Replace the traditional tokens entirely

        # Semantic tokenization (appended after the positional tokens)
        if self.config.use_semantic_tokenization:
            semantic_tokens = self._generate_semantic_tokens(text, len(all_tokens))
            all_tokens.extend(semantic_tokens)

        # Fractal tokenization (appended after the positional tokens)
        if self.config.use_fractal_tokenization:
            fractal_tokens = self._generate_fractal_tokens(text, len(all_tokens))
            all_tokens.extend(fractal_tokens)

        # Sort tokens by position
        all_tokens.sort(key=lambda t: t.position)

        # Calculate token type distribution
        token_types = {}
        for token in all_tokens:
            token_types[token.token_type] = token_types.get(token.token_type, 0) + 1

        # Calculate semantic coherence
        semantic_coherence = self._calculate_semantic_coherence(all_tokens)

        # Mathematical content ratio comes from content detection
        mathematical_content_ratio = content_ratios['mathematical']

        # Extract fractal patterns
        fractal_patterns = self._extract_fractal_patterns(all_tokens)

        # Calculate processing time
        processing_time = (datetime.now() - start_time).total_seconds()

        # Create metadata
        metadata = {
            'content_ratios': content_ratios,
            'total_characters': len(text),
            'unique_tokens': len(set(token.text for token in all_tokens)),
            'vocabulary_size': len(self.vocab),
            'processing_config': asdict(self.config)
        }

        return TokenizedSequence(
            sequence_id=sequence_id,
            original_text=text,
            tokens=all_tokens,
            total_tokens=len(all_tokens),
            token_types=token_types,
            semantic_coherence=semantic_coherence,
            mathematical_content_ratio=mathematical_content_ratio,
            fractal_patterns=fractal_patterns,
            processing_time=processing_time,
            metadata=metadata
        )

    def _calculate_semantic_coherence(self, tokens: List[Token]) -> float:
        """Calculate a simple semantic coherence score."""
        if not tokens:
            return 0.0

        # Simple coherence based on token type diversity
        token_types = set(token.token_type for token in tokens)
        type_diversity = len(token_types) / len(tokens)

        # Coherence is the inverse of diversity (more type churn = less coherent)
        coherence = 1.0 - type_diversity
        return max(0.0, min(1.0, coherence))
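    # Worked example (illustrative): a sequence of 10 tokens spanning 3 token
    # types has type_diversity = 3 / 10 = 0.3, so semantic_coherence comes out
    # as 1.0 - 0.3 = 0.7; a short sequence where every token has its own type
    # scores 0.0.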
    def _extract_fractal_patterns(self, tokens: List[Token]) -> List[Dict[str, Any]]:
        """Extract fractal patterns from tokens."""
        patterns = []
        fractal_tokens = [t for t in tokens if t.token_type == 'fractal']

        for i, token in enumerate(fractal_tokens):
            if token.metadata and 'fractal_value' in token.metadata:
                patterns.append({
                    'position': token.position,
                    'fractal_value': token.metadata['fractal_value'],
                    'fractal_index': token.metadata.get('fractal_index', i)
                })

        return patterns

    async def tokenize_batch(self, texts: List[str]) -> List[TokenizedSequence]:
        """Tokenize a batch of texts."""
        sequences = []

        for text in texts:
            try:
                sequence = await self.tokenize(text)
                sequences.append(sequence)
            except Exception as e:
                logger.error(f"Tokenization failed for text: {e}")
                # Create an empty sequence as a fallback
                empty_sequence = TokenizedSequence(
                    sequence_id="error",
                    original_text=text,
                    tokens=[],
                    total_tokens=0,
                    token_types={},
                    semantic_coherence=0.0,
                    mathematical_content_ratio=0.0,
                    fractal_patterns=[],
                    processing_time=0.0,
                    metadata={'error': str(e)}
                )
                sequences.append(empty_sequence)

        return sequences

    def decode(self, token_ids: List[int]) -> str:
        """Decode token IDs back to text (lossy: tokens are joined with spaces)."""
        tokens = []
        for token_id in token_ids:
            if token_id in self.reverse_vocab:
                token_text = self.reverse_vocab[token_id]
                # Skip special/marker tokens, keeping the unknown marker visible
                if not token_text.startswith('<') or token_text in ['<UNK>']:
                    tokens.append(token_text)
            else:
                tokens.append('<UNK>')
        return ' '.join(tokens)

    def get_vocab_size(self) -> int:
        """Get current vocabulary size."""
        return len(self.vocab)

    def save_vocabulary(self, filepath: str):
        """Save vocabulary to a JSON file."""
        vocab_data = {
            'vocab': self.vocab,
            'reverse_vocab': self.reverse_vocab,
            'next_token_id': self.next_token_id,
            'config': asdict(self.config)
        }
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(vocab_data, f, indent=2, ensure_ascii=False)

    def load_vocabulary(self, filepath: str):
        """Load vocabulary from a JSON file."""
        with open(filepath, 'r', encoding='utf-8') as f:
            vocab_data = json.load(f)

        self.vocab = vocab_data['vocab']
        # JSON stringifies integer keys, so convert them back on load
        self.reverse_vocab = {int(k): v for k, v in vocab_data['reverse_vocab'].items()}
        self.next_token_id = vocab_data['next_token_id']

        # Update config if available
        if 'config' in vocab_data:
            self.config = TokenizerConfig(**vocab_data['config'])

    async def close(self):
        """Close all components."""
        if self.semantic_embedder:
            await self.semantic_embedder.close()
        if self.mathematical_embedder:
            await self.mathematical_embedder.close()
        if self.fractal_embedder:
            await self.fractal_embedder.close()
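
# A minimal end-to-end usage sketch (hypothetical, mirroring the demo below).
# It tokenizes one string, then round-trips the IDs through decode(); note
# that decode() is lossy, since whitespace and marker tokens are collapsed
# or dropped. Run it with: asyncio.run(example_round_trip())
async def example_round_trip(text: str = "Energy: E = mc^2") -> str:
    tokenizer = AdvancedTokenizer(TokenizerConfig(use_fractal_tokenization=False))
    sequence = await tokenizer.tokenize(text)
    token_ids = [token.token_id for token in sequence.tokens]
    decoded = tokenizer.decode(token_ids)
    await tokenizer.close()
    return decoded
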
This is a simple text.", "The equation x^2 + y^2 = z^2 represents the Pythagorean theorem.", "```python\nprint('Hello, World!')\n```", "The fractal dimension of the Mandelbrot set is approximately 2.0.", "Machine learning algorithms use gradient descent: θ = θ - α∇J(θ)" ] async def run_demo(): print(f"\n📝 Tokenizing {len(demo_texts)} demo texts...") for i, text in enumerate(demo_texts): print(f"\n--- Text {i+1} ---") print(f"Original: {text}") sequence = await tokenizer.tokenize(text) print(f"Total tokens: {sequence.total_tokens}") print(f"Token types: {sequence.token_types}") print(f"Semantic coherence: {sequence.semantic_coherence:.3f}") print(f"Mathematical content: {sequence.mathematical_content_ratio:.3f}") print(f"Fractal patterns: {len(sequence.fractal_patterns)}") print(f"Processing time: {sequence.processing_time:.3f}s") # Show first few tokens print("First 10 tokens:") for j, token in enumerate(sequence.tokens[:10]): print(f" {j}: {token.text} ({token.token_type})") # Batch processing demo print(f"\n🔄 Batch processing demo...") sequences = await tokenizer.tokenize_batch(demo_texts) total_tokens = sum(seq.total_tokens for seq in sequences) avg_coherence = np.mean([seq.semantic_coherence for seq in sequences]) print(f"Total tokens across all texts: {total_tokens}") print(f"Average semantic coherence: {avg_coherence:.3f}") # Vocabulary info print(f"\n📚 Vocabulary size: {tokenizer.get_vocab_size()}") # Save vocabulary tokenizer.save_vocabulary("advanced_tokenizer_vocab.json") print("✅ Vocabulary saved to advanced_tokenizer_vocab.json") await tokenizer.close() # Run demo asyncio.run(run_demo()) print(f"\n✅ Advanced tokenizer system demo complete!") if __name__ == "__main__": main()