LiMp-Pipeline-Integration-System / integration_systems /enhanced_tokenizer_integration.py
9x25dillon's picture
Initial upload of LiMp Pipeline Integration System
22ae78a verified
#!/usr/bin/env python3
"""
Enhanced Tokenizer Integration
=============================
Integrates the enhanced tokenizer with the pipeline system for
full feature extraction and processing.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import json
# Import enhanced tokenizer
try:
from enhanced_advanced_tokenizer import EnhancedAdvancedTokenizer, TokenizerConfig
ENHANCED_TOKENIZER_AVAILABLE = True
except ImportError:
ENHANCED_TOKENIZER_AVAILABLE = False
print("⚠️ Enhanced advanced tokenizer not available")
try:
from enhanced_tokenizer_minimal import MinimalEnhancedTokenizer
MINIMAL_TOKENIZER_AVAILABLE = True
except ImportError:
MINIMAL_TOKENIZER_AVAILABLE = False
print("⚠️ Minimal enhanced tokenizer not available")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TokenizerIntegrationConfig:
"""Configuration for tokenizer integration."""
use_advanced_tokenizer: bool = True
enable_semantic_embedding: bool = True
enable_ner: bool = True
enable_math_processing: bool = True
enable_fractal_analysis: bool = True
chunk_size: int = 512
max_tokens: int = 1000000
semantic_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
@dataclass
class TokenizerIntegrationResult:
"""Result from tokenizer integration processing."""
tokenizer_results: Dict[str, Any] = field(default_factory=dict)
combined_features: Dict[str, Any] = field(default_factory=dict)
processing_time: float = 0.0
success: bool = False
error_message: Optional[str] = None
class EnhancedTokenizerIntegration:
"""
Integration system for enhanced tokenizer processing.
Handles both advanced and minimal tokenizer variants.
"""
def __init__(self, config: Optional[TokenizerIntegrationConfig] = None):
self.config = config or TokenizerIntegrationConfig()
self.initialized = False
# Tokenizer instances
self.advanced_tokenizer = None
self.minimal_tokenizer = None
# Performance tracking
self.stats = {
"total_tokenization_requests": 0,
"successful_tokenization_requests": 0,
"advanced_tokenizer_requests": 0,
"minimal_tokenizer_requests": 0,
"average_processing_time": 0.0,
"total_tokens_processed": 0
}
logger.info(f"🔤 Initializing Enhanced Tokenizer Integration")
logger.info(f" Advanced Tokenizer: {ENHANCED_TOKENIZER_AVAILABLE}")
logger.info(f" Minimal Tokenizer: {MINIMAL_TOKENIZER_AVAILABLE}")
async def initialize(self) -> bool:
"""Initialize tokenizer instances."""
try:
logger.info("🚀 Initializing Enhanced Tokenizer Integration...")
# Initialize advanced tokenizer if available and requested
if ENHANCED_TOKENIZER_AVAILABLE and self.config.use_advanced_tokenizer:
await self._initialize_advanced_tokenizer()
# Initialize minimal tokenizer as fallback
if MINIMAL_TOKENIZER_AVAILABLE:
await self._initialize_minimal_tokenizer()
if not self.advanced_tokenizer and not self.minimal_tokenizer:
raise RuntimeError("No tokenizer instances available")
self.initialized = True
logger.info("✅ Enhanced Tokenizer Integration initialized successfully")
return True
except Exception as e:
logger.error(f"❌ Tokenizer integration initialization failed: {e}")
return False
async def _initialize_advanced_tokenizer(self):
"""Initialize the advanced enhanced tokenizer."""
try:
tokenizer_config = TokenizerConfig(
semantic_model_name=self.config.semantic_model_name,
enable_semantic_embedding=self.config.enable_semantic_embedding,
enable_ner=self.config.enable_ner,
enable_math_processing=self.config.enable_math_processing,
enable_fractal_analysis=self.config.enable_fractal_analysis,
chunk_size=self.config.chunk_size,
max_tokens=self.config.max_tokens
)
self.advanced_tokenizer = EnhancedAdvancedTokenizer(tokenizer_config)
logger.info("✅ Advanced Enhanced Tokenizer initialized")
except Exception as e:
logger.error(f"❌ Advanced tokenizer initialization failed: {e}")
# Don't raise - we can fall back to minimal tokenizer
async def _initialize_minimal_tokenizer(self):
"""Initialize the minimal enhanced tokenizer."""
try:
self.minimal_tokenizer = MinimalEnhancedTokenizer()
logger.info("✅ Minimal Enhanced Tokenizer initialized")
except Exception as e:
logger.error(f"❌ Minimal tokenizer initialization failed: {e}")
raise
async def process_with_enhanced_tokenizer(
self,
text_input: str,
context: Optional[Dict[str, Any]] = None
) -> TokenizerIntegrationResult:
"""
Process text through enhanced tokenizer with full feature extraction.
Args:
text_input: Text to tokenize and analyze
context: Additional context information
Returns:
TokenizerIntegrationResult with all features
"""
start_time = datetime.now()
if not self.initialized:
await self.initialize()
if not self.initialized:
return TokenizerIntegrationResult(
success=False,
error_message="Tokenizer integration not initialized",
processing_time=0.0
)
try:
logger.info("🔄 Processing with enhanced tokenizer...")
# Initialize result
result = TokenizerIntegrationResult()
# Process with advanced tokenizer if available
if self.advanced_tokenizer:
try:
tokenizer_result = await self.advanced_tokenizer.tokenize(text_input)
result.tokenizer_results["advanced"] = self._extract_advanced_features(tokenizer_result)
self.stats["advanced_tokenizer_requests"] += 1
self.stats["total_tokens_processed"] += tokenizer_result.token_count
logger.info("✅ Advanced tokenizer processing completed")
except Exception as e:
logger.warning(f"⚠️ Advanced tokenizer failed: {e}")
result.tokenizer_results["advanced"] = {"error": str(e)}
# Process with minimal tokenizer as fallback or supplement
if self.minimal_tokenizer:
try:
tokenizer_result = await self.minimal_tokenizer.tokenize(text_input)
result.tokenizer_results["minimal"] = self._extract_minimal_features(tokenizer_result)
self.stats["minimal_tokenizer_requests"] += 1
if "advanced" not in result.tokenizer_results or "error" in result.tokenizer_results["advanced"]:
self.stats["total_tokens_processed"] += tokenizer_result.token_count
logger.info("✅ Minimal tokenizer processing completed")
except Exception as e:
logger.warning(f"⚠️ Minimal tokenizer failed: {e}")
result.tokenizer_results["minimal"] = {"error": str(e)}
# Combine features from all tokenizers
result.combined_features = self._combine_tokenizer_features(result.tokenizer_results)
# Calculate processing time
processing_time = (datetime.now() - start_time).total_seconds()
result.processing_time = processing_time
result.success = True
# Update stats
self._update_stats(processing_time, True)
logger.info(f"✅ Enhanced tokenizer processing completed in {processing_time:.3f}s")
return result
except Exception as e:
logger.error(f"❌ Enhanced tokenizer processing failed: {e}")
processing_time = (datetime.now() - start_time).total_seconds()
self._update_stats(processing_time, False)
return TokenizerIntegrationResult(
success=False,
error_message=str(e),
processing_time=processing_time
)
def _extract_advanced_features(self, tokenizer_result) -> Dict[str, Any]:
"""Extract features from advanced tokenizer result."""
return {
"token_count": tokenizer_result.token_count,
"semantic_features": tokenizer_result.semantic_features,
"entities": tokenizer_result.entities,
"math_expressions": tokenizer_result.math_expressions,
"fractal_features": tokenizer_result.fractal_features,
"embeddings_dim": len(tokenizer_result.embeddings) if tokenizer_result.embeddings is not None else 0,
"processing_time": getattr(tokenizer_result, 'processing_time', 0.0),
"content_type": tokenizer_result.semantic_features.get("content_type", "unknown"),
"complexity_score": tokenizer_result.semantic_features.get("complexity_score", 0.0),
"language_detection": tokenizer_result.semantic_features.get("language", "unknown")
}
def _extract_minimal_features(self, tokenizer_result) -> Dict[str, Any]:
"""Extract features from minimal tokenizer result."""
return {
"token_count": tokenizer_result.token_count,
"semantic_features": tokenizer_result.semantic_features,
"entities": tokenizer_result.entities,
"math_expressions": tokenizer_result.math_expressions,
"fractal_features": tokenizer_result.fractal_features,
"embeddings_dim": len(tokenizer_result.embeddings) if tokenizer_result.embeddings is not None else 0,
"processing_time": getattr(tokenizer_result, 'processing_time', 0.0),
"content_type": tokenizer_result.semantic_features.get("content_type", "unknown"),
"complexity_score": tokenizer_result.semantic_features.get("complexity_score", 0.0)
}
def _combine_tokenizer_features(self, tokenizer_results: Dict[str, Any]) -> Dict[str, Any]:
"""Combine features from all tokenizer results."""
combined_features = {
"total_token_count": 0,
"content_types": [],
"entities_found": 0,
"math_expressions_found": 0,
"embeddings_available": False,
"processing_times": {},
"complexity_scores": [],
"fractal_features": {},
"language_detection": "unknown"
}
# Combine features from all tokenizers
for tokenizer_name, features in tokenizer_results.items():
if "error" in features:
continue
# Token count
token_count = features.get("token_count", 0)
combined_features["total_token_count"] = max(combined_features["total_token_count"], token_count)
# Content types
content_type = features.get("content_type", "unknown")
if content_type not in combined_features["content_types"]:
combined_features["content_types"].append(content_type)
# Entities
entities = features.get("entities", [])
combined_features["entities_found"] += len(entities)
# Math expressions
math_expressions = features.get("math_expressions", [])
combined_features["math_expressions_found"] += len(math_expressions)
# Embeddings
embeddings_dim = features.get("embeddings_dim", 0)
if embeddings_dim > 0:
combined_features["embeddings_available"] = True
# Processing times
processing_time = features.get("processing_time", 0.0)
combined_features["processing_times"][tokenizer_name] = processing_time
# Complexity scores
complexity_score = features.get("complexity_score", 0.0)
if complexity_score > 0:
combined_features["complexity_scores"].append(complexity_score)
# Fractal features
fractal_features = features.get("fractal_features", {})
if fractal_features:
combined_features["fractal_features"][tokenizer_name] = fractal_features
# Language detection (prefer advanced tokenizer)
if tokenizer_name == "advanced":
language = features.get("language_detection", "unknown")
if language != "unknown":
combined_features["language_detection"] = language
# Calculate average complexity score
if combined_features["complexity_scores"]:
combined_features["average_complexity_score"] = sum(combined_features["complexity_scores"]) / len(combined_features["complexity_scores"])
else:
combined_features["average_complexity_score"] = 0.0
# Determine primary content type
if combined_features["content_types"]:
combined_features["primary_content_type"] = combined_features["content_types"][0]
else:
combined_features["primary_content_type"] = "unknown"
return combined_features
def _update_stats(self, processing_time: float, success: bool):
"""Update performance statistics."""
self.stats["total_tokenization_requests"] += 1
if success:
self.stats["successful_tokenization_requests"] += 1
# Update average processing time
total_time = self.stats["average_processing_time"] * (self.stats["total_tokenization_requests"] - 1)
total_time += processing_time
self.stats["average_processing_time"] = total_time / self.stats["total_tokenization_requests"]
def get_stats(self) -> Dict[str, Any]:
"""Get performance statistics."""
return {
**self.stats,
"initialized": self.initialized,
"tokenizers_available": {
"advanced": ENHANCED_TOKENIZER_AVAILABLE,
"minimal": MINIMAL_TOKENIZER_AVAILABLE
},
"success_rate": (
self.stats["successful_tokenization_requests"] / self.stats["total_tokenization_requests"]
if self.stats["total_tokenization_requests"] > 0 else 0
)
}
async def cleanup(self):
"""Clean up tokenizer resources."""
logger.info("🧹 Cleaning up Enhanced Tokenizer Integration...")
# Clean up tokenizers
if self.advanced_tokenizer:
del self.advanced_tokenizer
if self.minimal_tokenizer:
del self.minimal_tokenizer
self.initialized = False
logger.info("✅ Tokenizer integration cleanup completed")
async def main():
"""Demo function to test enhanced tokenizer integration."""
print("🚀 Testing Enhanced Tokenizer Integration")
print("=" * 50)
# Create system
config = TokenizerIntegrationConfig(
use_advanced_tokenizer=True,
enable_semantic_embedding=True,
enable_ner=True,
enable_math_processing=True,
enable_fractal_analysis=True
)
system = EnhancedTokenizerIntegration(config)
try:
# Initialize
if await system.initialize():
print("✅ Enhanced tokenizer integration initialized successfully")
# Test processing
test_texts = [
"Explain the concept of dimensional entanglement in AI systems.",
"The equation x^2 + y^2 = z^2 is fundamental to geometry.",
"def fibonacci(n): return n if n <= 1 else fibonacci(n-1) + fibonacci(n-2)",
"Machine learning algorithms can process large datasets efficiently using neural networks.",
"Quantum computing uses superposition and entanglement for parallel processing."
]
for i, text in enumerate(test_texts, 1):
print(f"\n🧪 Test {i}: {text[:50]}...")
result = await system.process_with_enhanced_tokenizer(text)
if result.success:
print(f"✅ Success ({result.processing_time:.3f}s)")
print(f" Token Count: {result.combined_features['total_token_count']}")
print(f" Content Type: {result.combined_features['primary_content_type']}")
print(f" Entities: {result.combined_features['entities_found']}")
print(f" Math Expressions: {result.combined_features['math_expressions_found']}")
print(f" Embeddings: {'Yes' if result.combined_features['embeddings_available'] else 'No'}")
print(f" Complexity: {result.combined_features['average_complexity_score']:.3f}")
print(f" Language: {result.combined_features['language_detection']}")
# Show tokenizer results
for tokenizer_name, features in result.tokenizer_results.items():
if "error" not in features:
print(f" {tokenizer_name.capitalize()}: {features['token_count']} tokens")
else:
print(f" {tokenizer_name.capitalize()}: Failed")
else:
print(f"❌ Failed: {result.error_message}")
# Show stats
stats = system.get_stats()
print(f"\n📊 Statistics:")
print(f" Total requests: {stats['total_tokenization_requests']}")
print(f" Success rate: {stats['success_rate']:.2%}")
print(f" Avg processing time: {stats['average_processing_time']:.3f}s")
print(f" Total tokens processed: {stats['total_tokens_processed']}")
print(f" Advanced requests: {stats['advanced_tokenizer_requests']}")
print(f" Minimal requests: {stats['minimal_tokenizer_requests']}")
print(f" Tokenizers available: {sum(stats['tokenizers_available'].values())}/2")
else:
print("❌ Failed to initialize enhanced tokenizer integration")
except Exception as e:
print(f"❌ Error: {e}")
finally:
# Cleanup
await system.cleanup()
print("\n🧹 Cleanup completed")
if __name__ == "__main__":
asyncio.run(main())