#!/usr/bin/env python3
"""
Enhanced Tokenizer Integration
=============================
Integrates the enhanced tokenizer with the pipeline system for
full feature extraction and processing.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import json
# Import enhanced tokenizer
try:
from enhanced_advanced_tokenizer import EnhancedAdvancedTokenizer, TokenizerConfig
ENHANCED_TOKENIZER_AVAILABLE = True
except ImportError:
ENHANCED_TOKENIZER_AVAILABLE = False
print("⚠️ Enhanced advanced tokenizer not available")
try:
from enhanced_tokenizer_minimal import MinimalEnhancedTokenizer
MINIMAL_TOKENIZER_AVAILABLE = True
except ImportError:
MINIMAL_TOKENIZER_AVAILABLE = False
print("⚠️ Minimal enhanced tokenizer not available")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TokenizerIntegrationConfig:
    """Configuration for tokenizer integration."""
    # Prefer the advanced tokenizer when its module imported successfully.
    use_advanced_tokenizer: bool = True
    # Feature toggles forwarded to the advanced tokenizer's TokenizerConfig.
    enable_semantic_embedding: bool = True
    enable_ner: bool = True
    enable_math_processing: bool = True
    enable_fractal_analysis: bool = True
    # Chunking / size limits forwarded to the advanced tokenizer's TokenizerConfig.
    chunk_size: int = 512
    max_tokens: int = 1000000
    # Model identifier used for semantic embeddings (sentence-transformers id).
    semantic_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
@dataclass
class TokenizerIntegrationResult:
    """Result from tokenizer integration processing."""
    # Per-tokenizer feature dicts keyed by "advanced" / "minimal";
    # a failed run is stored as {"error": "<message>"}.
    tokenizer_results: Dict[str, Any] = field(default_factory=dict)
    # Features merged across all successful tokenizer runs.
    combined_features: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock seconds spent inside process_with_enhanced_tokenizer().
    processing_time: float = 0.0
    # True when processing completed without an unhandled exception.
    success: bool = False
    # Failure reason; only populated when success is False.
    error_message: Optional[str] = None
class EnhancedTokenizerIntegration:
    """
    Integration system for enhanced tokenizer processing.

    Handles both advanced and minimal tokenizer variants: the advanced
    tokenizer is preferred when its module is importable and the config
    requests it; the minimal tokenizer is initialized as a fallback /
    supplement. Per-request statistics are accumulated in ``self.stats``.
    """

    def __init__(self, config: Optional[TokenizerIntegrationConfig] = None):
        """Store configuration and zeroed statistics.

        No tokenizer instances are created here; that happens lazily in
        ``initialize()``.
        """
        self.config = config or TokenizerIntegrationConfig()
        self.initialized = False
        # Tokenizer instances: None until initialize(), and reset to None by
        # cleanup() so later attribute access never raises AttributeError.
        self.advanced_tokenizer = None
        self.minimal_tokenizer = None
        # Performance tracking counters, exposed via get_stats().
        self.stats = {
            "total_tokenization_requests": 0,
            "successful_tokenization_requests": 0,
            "advanced_tokenizer_requests": 0,
            "minimal_tokenizer_requests": 0,
            "average_processing_time": 0.0,
            "total_tokens_processed": 0
        }
        # Lazy %-style args so the logging framework defers formatting.
        logger.info("🔤 Initializing Enhanced Tokenizer Integration")
        logger.info(" Advanced Tokenizer: %s", ENHANCED_TOKENIZER_AVAILABLE)
        logger.info(" Minimal Tokenizer: %s", MINIMAL_TOKENIZER_AVAILABLE)

    async def initialize(self) -> bool:
        """Initialize tokenizer instances.

        Returns:
            True when at least one tokenizer instance is available
            afterwards, False when initialization failed entirely.
        """
        try:
            logger.info("🚀 Initializing Enhanced Tokenizer Integration...")
            # Advanced tokenizer only when importable and requested by config.
            if ENHANCED_TOKENIZER_AVAILABLE and self.config.use_advanced_tokenizer:
                await self._initialize_advanced_tokenizer()
            # Minimal tokenizer acts as fallback whenever importable.
            if MINIMAL_TOKENIZER_AVAILABLE:
                await self._initialize_minimal_tokenizer()
            if not self.advanced_tokenizer and not self.minimal_tokenizer:
                raise RuntimeError("No tokenizer instances available")
            self.initialized = True
            logger.info("✅ Enhanced Tokenizer Integration initialized successfully")
            return True
        except Exception as e:
            logger.error("❌ Tokenizer integration initialization failed: %s", e)
            return False

    async def _initialize_advanced_tokenizer(self):
        """Build the advanced tokenizer from the integration config.

        Failures are logged but deliberately NOT re-raised so that the
        minimal tokenizer can still serve as a fallback.
        """
        try:
            tokenizer_config = TokenizerConfig(
                semantic_model_name=self.config.semantic_model_name,
                enable_semantic_embedding=self.config.enable_semantic_embedding,
                enable_ner=self.config.enable_ner,
                enable_math_processing=self.config.enable_math_processing,
                enable_fractal_analysis=self.config.enable_fractal_analysis,
                chunk_size=self.config.chunk_size,
                max_tokens=self.config.max_tokens
            )
            self.advanced_tokenizer = EnhancedAdvancedTokenizer(tokenizer_config)
            logger.info("✅ Advanced Enhanced Tokenizer initialized")
        except Exception as e:
            logger.error("❌ Advanced tokenizer initialization failed: %s", e)
            # Don't raise - we can fall back to minimal tokenizer

    async def _initialize_minimal_tokenizer(self):
        """Build the minimal tokenizer.

        Re-raises on failure because this is the last-resort fallback.
        """
        try:
            self.minimal_tokenizer = MinimalEnhancedTokenizer()
            logger.info("✅ Minimal Enhanced Tokenizer initialized")
        except Exception as e:
            logger.error("❌ Minimal tokenizer initialization failed: %s", e)
            raise

    async def process_with_enhanced_tokenizer(
        self,
        text_input: str,
        context: Optional[Dict[str, Any]] = None
    ) -> TokenizerIntegrationResult:
        """
        Process text through enhanced tokenizer with full feature extraction.

        Args:
            text_input: Text to tokenize and analyze.
            context: Additional context information (currently unused).

        Returns:
            TokenizerIntegrationResult with per-tokenizer and combined
            features; ``success`` is False with ``error_message`` set when
            processing could not run.
        """
        start_time = datetime.now()
        # Lazily initialize on first use.
        if not self.initialized:
            await self.initialize()
        if not self.initialized:
            return TokenizerIntegrationResult(
                success=False,
                error_message="Tokenizer integration not initialized",
                processing_time=0.0
            )
        try:
            logger.info("🔄 Processing with enhanced tokenizer...")
            result = TokenizerIntegrationResult()
            # Advanced tokenizer first; its failure is recorded, not fatal.
            if self.advanced_tokenizer:
                try:
                    tokenizer_result = await self.advanced_tokenizer.tokenize(text_input)
                    result.tokenizer_results["advanced"] = self._extract_advanced_features(tokenizer_result)
                    self.stats["advanced_tokenizer_requests"] += 1
                    self.stats["total_tokens_processed"] += tokenizer_result.token_count
                    logger.info("✅ Advanced tokenizer processing completed")
                except Exception as e:
                    logger.warning("⚠️ Advanced tokenizer failed: %s", e)
                    result.tokenizer_results["advanced"] = {"error": str(e)}
            # Minimal tokenizer as fallback or supplement.
            if self.minimal_tokenizer:
                try:
                    tokenizer_result = await self.minimal_tokenizer.tokenize(text_input)
                    result.tokenizer_results["minimal"] = self._extract_minimal_features(tokenizer_result)
                    self.stats["minimal_tokenizer_requests"] += 1
                    # Only count these tokens when the advanced run did not
                    # already count the same input successfully.
                    if "advanced" not in result.tokenizer_results or "error" in result.tokenizer_results["advanced"]:
                        self.stats["total_tokens_processed"] += tokenizer_result.token_count
                    logger.info("✅ Minimal tokenizer processing completed")
                except Exception as e:
                    logger.warning("⚠️ Minimal tokenizer failed: %s", e)
                    result.tokenizer_results["minimal"] = {"error": str(e)}
            # Merge per-tokenizer features into a single summary dict.
            result.combined_features = self._combine_tokenizer_features(result.tokenizer_results)
            processing_time = (datetime.now() - start_time).total_seconds()
            result.processing_time = processing_time
            result.success = True
            self._update_stats(processing_time, True)
            logger.info("✅ Enhanced tokenizer processing completed in %.3fs", processing_time)
            return result
        except Exception as e:
            logger.error("❌ Enhanced tokenizer processing failed: %s", e)
            processing_time = (datetime.now() - start_time).total_seconds()
            self._update_stats(processing_time, False)
            return TokenizerIntegrationResult(
                success=False,
                error_message=str(e),
                processing_time=processing_time
            )

    def _extract_base_features(self, tokenizer_result) -> Dict[str, Any]:
        """Extract the feature fields shared by both tokenizer result types."""
        return {
            "token_count": tokenizer_result.token_count,
            "semantic_features": tokenizer_result.semantic_features,
            "entities": tokenizer_result.entities,
            "math_expressions": tokenizer_result.math_expressions,
            "fractal_features": tokenizer_result.fractal_features,
            # Embeddings may be absent (None) on some results.
            "embeddings_dim": len(tokenizer_result.embeddings) if tokenizer_result.embeddings is not None else 0,
            "processing_time": getattr(tokenizer_result, 'processing_time', 0.0),
            "content_type": tokenizer_result.semantic_features.get("content_type", "unknown"),
            "complexity_score": tokenizer_result.semantic_features.get("complexity_score", 0.0)
        }

    def _extract_advanced_features(self, tokenizer_result) -> Dict[str, Any]:
        """Extract features from advanced tokenizer result (adds language)."""
        features = self._extract_base_features(tokenizer_result)
        features["language_detection"] = tokenizer_result.semantic_features.get("language", "unknown")
        return features

    def _extract_minimal_features(self, tokenizer_result) -> Dict[str, Any]:
        """Extract features from minimal tokenizer result."""
        return self._extract_base_features(tokenizer_result)

    def _combine_tokenizer_features(self, tokenizer_results: Dict[str, Any]) -> Dict[str, Any]:
        """Combine features from all tokenizer results.

        Skips any tokenizer whose entry is an error record. Token count is
        the max across tokenizers (they tokenized the same input), while
        entity/math counts are summed across tokenizers.
        """
        combined_features = {
            "total_token_count": 0,
            "content_types": [],
            "entities_found": 0,
            "math_expressions_found": 0,
            "embeddings_available": False,
            "processing_times": {},
            "complexity_scores": [],
            "fractal_features": {},
            "language_detection": "unknown"
        }
        for tokenizer_name, features in tokenizer_results.items():
            if "error" in features:
                continue
            # Token count: both tokenizers saw the same text, keep the max.
            token_count = features.get("token_count", 0)
            combined_features["total_token_count"] = max(combined_features["total_token_count"], token_count)
            # Content types (deduplicated, insertion order preserved).
            content_type = features.get("content_type", "unknown")
            if content_type not in combined_features["content_types"]:
                combined_features["content_types"].append(content_type)
            # Entities (summed across tokenizers).
            entities = features.get("entities", [])
            combined_features["entities_found"] += len(entities)
            # Math expressions (summed across tokenizers).
            math_expressions = features.get("math_expressions", [])
            combined_features["math_expressions_found"] += len(math_expressions)
            # Embeddings availability.
            embeddings_dim = features.get("embeddings_dim", 0)
            if embeddings_dim > 0:
                combined_features["embeddings_available"] = True
            # Per-tokenizer processing time.
            processing_time = features.get("processing_time", 0.0)
            combined_features["processing_times"][tokenizer_name] = processing_time
            # Complexity scores (only positive scores contribute).
            complexity_score = features.get("complexity_score", 0.0)
            if complexity_score > 0:
                combined_features["complexity_scores"].append(complexity_score)
            # Fractal features, kept per tokenizer.
            fractal_features = features.get("fractal_features", {})
            if fractal_features:
                combined_features["fractal_features"][tokenizer_name] = fractal_features
            # Language detection (only the advanced tokenizer provides it).
            if tokenizer_name == "advanced":
                language = features.get("language_detection", "unknown")
                if language != "unknown":
                    combined_features["language_detection"] = language
        # Average complexity across contributing tokenizers.
        if combined_features["complexity_scores"]:
            combined_features["average_complexity_score"] = sum(combined_features["complexity_scores"]) / len(combined_features["complexity_scores"])
        else:
            combined_features["average_complexity_score"] = 0.0
        # Primary content type: first one seen (advanced wins when present).
        if combined_features["content_types"]:
            combined_features["primary_content_type"] = combined_features["content_types"][0]
        else:
            combined_features["primary_content_type"] = "unknown"
        return combined_features

    def _update_stats(self, processing_time: float, success: bool):
        """Update request counters and the running average processing time."""
        self.stats["total_tokenization_requests"] += 1
        if success:
            self.stats["successful_tokenization_requests"] += 1
        # Incremental running mean: undo the old average, add this sample.
        total_time = self.stats["average_processing_time"] * (self.stats["total_tokenization_requests"] - 1)
        total_time += processing_time
        self.stats["average_processing_time"] = total_time / self.stats["total_tokenization_requests"]

    def get_stats(self) -> Dict[str, Any]:
        """Return performance statistics plus availability/success-rate info."""
        return {
            **self.stats,
            "initialized": self.initialized,
            "tokenizers_available": {
                "advanced": ENHANCED_TOKENIZER_AVAILABLE,
                "minimal": MINIMAL_TOKENIZER_AVAILABLE
            },
            "success_rate": (
                self.stats["successful_tokenization_requests"] / self.stats["total_tokenization_requests"]
                if self.stats["total_tokenization_requests"] > 0 else 0
            )
        }

    async def cleanup(self):
        """Release tokenizer resources and mark the integration uninitialized."""
        logger.info("🧹 Cleaning up Enhanced Tokenizer Integration...")
        # BUGFIX: the previous implementation used ``del`` here, which removed
        # the instance attributes entirely -- any later access (re-initialize,
        # process, or a second cleanup) raised AttributeError. Resetting to
        # None keeps the object in a valid, re-initializable state.
        self.advanced_tokenizer = None
        self.minimal_tokenizer = None
        self.initialized = False
        logger.info("✅ Tokenizer integration cleanup completed")
async def main():
    """Demo entry point: exercise the tokenizer integration end to end."""
    print("🚀 Testing Enhanced Tokenizer Integration")
    print("=" * 50)
    # Every feature flag on so the demo exercises all code paths.
    demo_config = TokenizerIntegrationConfig(
        use_advanced_tokenizer=True,
        enable_semantic_embedding=True,
        enable_ner=True,
        enable_math_processing=True,
        enable_fractal_analysis=True
    )
    integration = EnhancedTokenizerIntegration(demo_config)
    try:
        if not await integration.initialize():
            print("❌ Failed to initialize enhanced tokenizer integration")
        else:
            print("✅ Enhanced tokenizer integration initialized successfully")
            sample_texts = [
                "Explain the concept of dimensional entanglement in AI systems.",
                "The equation x^2 + y^2 = z^2 is fundamental to geometry.",
                "def fibonacci(n): return n if n <= 1 else fibonacci(n-1) + fibonacci(n-2)",
                "Machine learning algorithms can process large datasets efficiently using neural networks.",
                "Quantum computing uses superposition and entanglement for parallel processing."
            ]
            for index, sample in enumerate(sample_texts, 1):
                print(f"\n🧪 Test {index}: {sample[:50]}...")
                outcome = await integration.process_with_enhanced_tokenizer(sample)
                if not outcome.success:
                    print(f"❌ Failed: {outcome.error_message}")
                    continue
                combined = outcome.combined_features
                print(f"✅ Success ({outcome.processing_time:.3f}s)")
                print(f" Token Count: {combined['total_token_count']}")
                print(f" Content Type: {combined['primary_content_type']}")
                print(f" Entities: {combined['entities_found']}")
                print(f" Math Expressions: {combined['math_expressions_found']}")
                print(f" Embeddings: {'Yes' if combined['embeddings_available'] else 'No'}")
                print(f" Complexity: {combined['average_complexity_score']:.3f}")
                print(f" Language: {combined['language_detection']}")
                # Per-tokenizer outcome summary.
                for name, features in outcome.tokenizer_results.items():
                    if "error" in features:
                        print(f" {name.capitalize()}: Failed")
                    else:
                        print(f" {name.capitalize()}: {features['token_count']} tokens")
            # Aggregate statistics after all samples have run.
            stats = integration.get_stats()
            print("\n📊 Statistics:")
            print(f" Total requests: {stats['total_tokenization_requests']}")
            print(f" Success rate: {stats['success_rate']:.2%}")
            print(f" Avg processing time: {stats['average_processing_time']:.3f}s")
            print(f" Total tokens processed: {stats['total_tokens_processed']}")
            print(f" Advanced requests: {stats['advanced_tokenizer_requests']}")
            print(f" Minimal requests: {stats['minimal_tokenizer_requests']}")
            print(f" Tokenizers available: {sum(stats['tokenizers_available'].values())}/2")
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        await integration.cleanup()
        print("\n🧹 Cleanup completed")
# Script entry point: run the async demo under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())