|
|
|
|
|
"""
Enhanced Advanced Tokenizer System
==================================
Real implementation with actual dependencies and working tokenization.
"""
|
|
|
|
|
import asyncio
import hashlib
import json
import logging
import re
import time
import warnings
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
|
|
|
|
|
|
|
|
# Optional third-party dependencies.
#
# Each package is probed independently so the module still imports when one
# is missing.  The matching *_AVAILABLE flag records the outcome, and a
# one-line status message is printed for every probe.
try:
    import torch
    import torch.nn as nn
    TORCH_AVAILABLE = True
    print("✅ PyTorch available")
except ImportError:
    TORCH_AVAILABLE = False
    print("⚠️ PyTorch not available - install with: pip install torch")

try:
    import transformers
    from transformers import AutoTokenizer, AutoModel
    TRANSFORMERS_AVAILABLE = True
    print("✅ Transformers available")
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("⚠️ Transformers not available - install with: pip install transformers")

try:
    import sentence_transformers
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
    print("✅ Sentence Transformers available")
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    print("⚠️ Sentence Transformers not available - install with: pip install sentence-transformers")

try:
    import spacy
    SPACY_AVAILABLE = True
    print("✅ spaCy available")
except ImportError:
    SPACY_AVAILABLE = False
    print("⚠️ spaCy not available - install with: pip install spacy")

try:
    import sklearn
    from sklearn.cluster import KMeans
    from sklearn.metrics.pairwise import cosine_similarity
    from sklearn.feature_extraction.text import TfidfVectorizer
    SKLEARN_AVAILABLE = True
    print("✅ scikit-learn available")
except ImportError:
    SKLEARN_AVAILABLE = False
    print("⚠️ scikit-learn not available - install with: pip install scikit-learn")

try:
    import sympy as sp
    SYMPY_AVAILABLE = True
    print("✅ SymPy available")
except ImportError:
    SYMPY_AVAILABLE = False
    print("⚠️ SymPy not available - install with: pip install sympy")

try:
    import scipy
    from scipy.spatial.distance import pdist, squareform
    SCIPY_AVAILABLE = True
    print("✅ SciPy available")
except ImportError:
    SCIPY_AVAILABLE = False
    print("⚠️ SciPy not available - install with: pip install scipy")

# Module-wide logging: INFO level on the root logger, plus a logger named
# after this module that the classes below write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class TokenizerConfig:
    """Settings bundle for the enhanced tokenizer.

    Every field has a default, so ``TokenizerConfig()`` is a complete,
    usable configuration.
    """

    # Model identifier handed to RealSemanticEmbedder.
    semantic_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    # Pipeline name handed to RealNERProcessor.
    spacy_model: str = "en_core_web_sm"

    # Sizing limits; not referenced by the other code visible in this file.
    chunk_size: int = 512
    overlap_size: int = 50

    # Feature switches consulted by EnhancedAdvancedTokenizer.tokenize.
    enable_math_processing: bool = True
    enable_semantic_embedding: bool = True
    enable_ner: bool = True
    enable_fractal_analysis: bool = True

    # Upper bound on tokens; not referenced by the other code visible in
    # this file.
    max_tokens: int = 1_000_000
|
|
|
|
|
@dataclass
class TokenizationResult:
    """Everything produced by tokenizing one piece of text.

    ``text``, ``tokens`` and ``token_count`` are required; the remaining
    fields start empty and are filled in by whichever analyses run.
    """

    # Required: the input text, its tokens, and how many tokens there are.
    text: str
    tokens: List[str]
    token_count: int

    # Semantic embedding of the text, or None when none was produced.
    embeddings: Optional[np.ndarray] = None
    # (entity text, entity label) pairs.
    entities: List[Tuple[str, str]] = field(default_factory=list)
    # Mathematical expressions found in the text.
    math_expressions: List[str] = field(default_factory=list)
    # Free-form feature dictionaries; each instance gets its own dict.
    semantic_features: Dict[str, Any] = field(default_factory=dict)
    fractal_features: Dict[str, Any] = field(default_factory=dict)
    # Seconds spent producing this result.
    processing_time: float = 0.0
|
|
|
|
|
class RealSemanticEmbedder:
    """Semantic embedder backed by a sentence-transformers model.

    The model is loaded once, at construction.  If the package is missing
    or loading fails, ``self.model`` stays ``None`` and every embedding
    call returns ``None`` values instead of raising.
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Store ``model_name`` and try to load the model immediately."""
        self.model_name = model_name
        self.model = None
        self._initialize_model()

    def _initialize_model(self):
        """Load ``self.model_name``; leave ``self.model`` as None on failure."""
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            logger.warning("⚠️ Sentence transformers not available")
            return

        try:
            self.model = SentenceTransformer(self.model_name)
            logger.info(f"✅ Loaded semantic model: {self.model_name}")
        except Exception as e:
            # Deliberately best-effort: a failed load is logged and the
            # embedder degrades to returning None.
            logger.error(f"❌ Failed to load semantic model: {e}")
            self.model = None

    def embed_text(self, text: str) -> Optional[np.ndarray]:
        """Return the model's encoding of ``text``.

        Returns None when no model is loaded or encoding raises.
        """
        if self.model is None:
            return None

        try:
            return self.model.encode(text)
        except Exception as e:
            logger.error(f"❌ Embedding failed: {e}")
            return None

    def embed_batch(self, texts: List[str]) -> List[Optional[np.ndarray]]:
        """Return one encoding per entry of ``texts``, in input order.

        Returns a list of ``None`` of the same length when no model is
        loaded or encoding raises.
        """
        if self.model is None:
            return [None] * len(texts)

        try:
            # Iterating the encoder output yields one item per input text.
            return list(self.model.encode(texts))
        except Exception as e:
            logger.error(f"❌ Batch embedding failed: {e}")
            return [None] * len(texts)
|
|
|
|
|
class RealMathematicalEmbedder:
    """Finds mathematical expressions in text and derives features from them.

    Extraction is regex-based and has no third-party dependency.  Analysis
    uses SymPy when it is installed and otherwise reports an error dict.
    """

    # Patterns are tried in this order; the order also fixes the order of
    # the extracted expressions.
    _MATH_PATTERNS = tuple(re.compile(pattern) for pattern in (
        r'\$\$[^$]+\$\$',
        r'\$[^$]+\$',
        r'\b\d+\.?\d*\s*[+\-*/=<>]\s*\d+\.?\d*',
        r'\b\w+\s*=\s*\d+\.?\d*',
        r'\b\w+\s*=\s*[a-zA-Z]\w*',
        r'\b\w+\s*\([^)]+\)',
    ))

    # A lone "=" that is not part of "==", "<=", ">=" or "!=".
    _EQUALS_SIGN = re.compile(r'(?<![<>=!])=(?!=)')

    _FUNCTION_NAMES = ('sin', 'cos', 'tan', 'log', 'exp', 'sqrt')

    def __init__(self):
        """Record which optional math packages were importable."""
        self.sympy_available = SYMPY_AVAILABLE
        self.scipy_available = SCIPY_AVAILABLE

    def extract_math_expressions(self, text: str) -> List[str]:
        """Return the distinct math-like substrings of ``text``.

        Results are de-duplicated and returned in first-seen order (pattern
        by pattern, left to right), so the output is deterministic.
        """
        found = []
        for pattern in self._MATH_PATTERNS:
            found.extend(pattern.findall(text))

        # dict.fromkeys de-duplicates while keeping insertion order; a set
        # would return the expressions in arbitrary order.
        return list(dict.fromkeys(found))

    def analyze_math_expression(self, expression: str) -> Dict[str, Any]:
        """Parse ``expression`` with SymPy and summarize it.

        Returns a dict with keys ``expression``, ``parsed``, ``variables``,
        ``complexity``, ``is_equation`` and ``has_functions``.  When SymPy
        is missing or parsing fails, returns a dict with an ``error`` key
        instead of raising.
        """
        if not self.sympy_available:
            return {"error": "SymPy not available"}

        try:
            clean_expr = expression.replace('$', '').strip()

            # NOTE(security): sympify evaluates its input with eval(); do
            # not feed it text from untrusted sources.
            sides = self._EQUALS_SIGN.split(clean_expr)
            if len(sides) == 2:
                # sympify cannot parse "lhs = rhs" directly, so build the
                # equation from its two sides.
                parsed = sp.Eq(sp.sympify(sides[0]), sp.sympify(sides[1]))
            else:
                parsed = sp.sympify(clean_expr)

            return {
                "expression": clean_expr,
                "parsed": str(parsed),
                # Sorted by name so the order is stable between runs.
                "variables": sorted(parsed.free_symbols, key=str),
                "complexity": len(str(parsed)),
                "is_equation": '=' in clean_expr,
                "has_functions": any(func in clean_expr for func in self._FUNCTION_NAMES),
            }
        except Exception as e:
            # Extracted snippets are often not valid SymPy input; report
            # the failure to the caller rather than raising.
            return {"error": str(e), "expression": expression}

    def create_math_embedding(self, expression: str) -> np.ndarray:
        """Return a 128-element float vector describing ``expression``.

        The first five slots hold: raw length, variable count, parsed
        length, equation flag and function flag.  The rest are zero.
        Analysis values default to zero when analysis returned an error.
        """
        analysis = self.analyze_math_expression(expression)

        features = [
            len(expression),
            len(analysis.get("variables", [])),
            analysis.get("complexity", 0),
            1 if analysis.get("is_equation", False) else 0,
            1 if analysis.get("has_functions", False) else 0,
        ]

        embedding = np.zeros(128)
        embedding[:len(features)] = features
        return embedding
|
|
|
|
|
class RealFractalEmbedder:
    """Derives simple statistical features from the bytes of a text.

    Each feature carries a fractal-themed name, but is computed with plain
    NumPy statistics over a fixed-length byte array.
    """

    def __init__(self):
        self.np_available = True

    def generate_fractal_features(self, text: str) -> Dict[str, Any]:
        """Return the five named features for ``text``.

        The UTF-8 bytes of ``text`` are zero-padded or truncated to exactly
        256 values before the features are computed.
        """
        text_array = np.frombuffer(text.encode('utf-8'), dtype=np.uint8)

        target_length = 256
        if len(text_array) < target_length:
            text_array = np.pad(text_array, (0, target_length - len(text_array)))
        else:
            text_array = text_array[:target_length]

        return {
            "mandelbrot_complexity": self._calculate_mandelbrot_complexity(text_array),
            "julia_pattern": self._calculate_julia_pattern(text_array),
            "self_similarity": self._calculate_self_similarity(text_array),
            "recursive_depth": self._calculate_recursive_depth(text_array),
            "chaos_measure": self._calculate_chaos_measure(text_array),
        }

    def _calculate_mandelbrot_complexity(self, data: np.ndarray) -> float:
        """Return the variance of ``data``."""
        return float(np.var(data))

    def _calculate_julia_pattern(self, data: np.ndarray) -> float:
        """Return the standard deviation of the per-value occurrence counts."""
        _, counts = np.unique(data, return_counts=True)
        return float(np.std(counts))

    def _calculate_self_similarity(self, data: np.ndarray) -> float:
        """Return the Pearson correlation between the two halves of ``data``.

        Returns 0.0 when the correlation is undefined: fewer than two
        samples per half, or a half that is constant.  A constant half is
        the normal case for short texts, whose second half is all padding.
        """
        mid = len(data) // 2
        if mid < 2:
            return 0.0

        first_half = data[:mid]
        second_half = data[mid:mid * 2]

        # np.corrcoef divides by each half's standard deviation and yields
        # NaN for a constant half, so screen that case out first.
        if first_half.min() == first_half.max() or second_half.min() == second_half.max():
            return 0.0

        return float(np.corrcoef(first_half, second_half)[0, 1])

    def _calculate_recursive_depth(self, data: np.ndarray) -> float:
        """Return how many adjacent pairs in ``data`` hold equal values."""
        return float(np.count_nonzero(np.diff(data) == 0))

    def _calculate_chaos_measure(self, data: np.ndarray) -> float:
        """Return the Shannon entropy, in bits, of the value distribution."""
        _, counts = np.unique(data, return_counts=True)
        probabilities = counts / len(data)
        # The small epsilon keeps log2 finite.
        entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))
        return float(entropy)
|
|
|
|
|
class RealNERProcessor:
    """Named-entity recognition backed by a spaCy pipeline.

    The pipeline is loaded once, at construction.  If spaCy is missing or
    loading fails, ``self.nlp`` stays ``None`` and extraction returns an
    empty list instead of raising.
    """

    def __init__(self, model_name: str = "en_core_web_sm"):
        """Store ``model_name`` and try to load the pipeline immediately."""
        self.model_name = model_name
        self.nlp = None
        self._initialize_model()

    def _initialize_model(self):
        """Load ``self.model_name``; leave ``self.nlp`` as None on failure."""
        if not SPACY_AVAILABLE:
            logger.warning("⚠️ spaCy not available")
            return

        try:
            self.nlp = spacy.load(self.model_name)
            logger.info(f"✅ Loaded NER model: {self.model_name}")
        except Exception as e:
            # Deliberately best-effort: a failed load is logged and the
            # processor degrades to returning no entities.
            logger.error(f"❌ Failed to load NER model: {e}")
            self.nlp = None

    def extract_entities(self, text: str) -> List[Tuple[str, str]]:
        """Return ``(entity text, label)`` pairs found in ``text``.

        Returns an empty list when no pipeline is loaded or it raises.
        """
        if self.nlp is None:
            return []

        try:
            doc = self.nlp(text)
            return [(ent.text, ent.label_) for ent in doc.ents]
        except Exception as e:
            logger.error(f"❌ NER failed: {e}")
            return []

    def analyze_entities(self, entities: List[Tuple[str, str]]) -> Dict[str, Any]:
        """Summarize ``entities`` by label.

        Returns a dict with ``entity_count`` (total pairs), ``entity_types``
        (label -> occurrences) and ``most_common`` (the ``(label, count)``
        pair with the highest count, or None when there are no entities).
        On ties the label seen first wins.
        """
        if not entities:
            return {"entity_count": 0, "entity_types": {}, "most_common": None}

        entity_types = {}
        for _, label in entities:
            entity_types[label] = entity_types.get(label, 0) + 1

        # entity_types is non-empty here, so max() always has a candidate.
        most_common_type = max(entity_types.items(), key=lambda item: item[1])

        return {
            "entity_count": len(entities),
            "entity_types": entity_types,
            "most_common": most_common_type,
        }
|
|
|
|
|
class EnhancedAdvancedTokenizer:
    """Whitespace tokenizer that attaches optional analyses to each result.

    Tokens come from ``str.split()``.  Depending on the configuration, a
    result is enriched with a semantic embedding, named entities, math
    expression analysis and byte-level "fractal" features.
    """

    def __init__(self, config: Optional[TokenizerConfig] = None):
        """Build all analysis components.

        Args:
            config: Settings to use; a default ``TokenizerConfig`` is
                created when omitted.
        """
        self.config = config or TokenizerConfig()

        self.semantic_embedder = RealSemanticEmbedder(self.config.semantic_model_name)
        self.math_embedder = RealMathematicalEmbedder()
        self.fractal_embedder = RealFractalEmbedder()
        self.ner_processor = RealNERProcessor(self.config.spacy_model)

        # Kept as an attribute only; tokenize() does not use it.  Stays
        # None when transformers is missing or the download fails.
        self.transformers_tokenizer = None
        if TRANSFORMERS_AVAILABLE:
            try:
                self.transformers_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
                logger.info("✅ Loaded BERT tokenizer")
            except Exception as e:
                logger.warning(f"⚠️ Failed to load BERT tokenizer: {e}")

        logger.info("🚀 Enhanced Advanced Tokenizer initialized")

    def detect_content_type(self, text: str) -> str:
        """Classify ``text`` as "mathematical", "code", "academic" or "natural".

        The checks are heuristic and run in that order: math-pattern matches
        exceeding 10% of the word count, then any code keyword present, then
        an average word length above 4 characters.
        """
        math_patterns = [
            r'\$\$[^$]+\$\$',
            r'\$[^$]+\$',
            r'\b\d+\.?\d*\s*[+\-*/=]\s*\d+\.?\d*',
        ]
        math_score = sum(len(re.findall(pattern, text)) for pattern in math_patterns)

        code_keywords = ['def ', 'class ', 'import ', 'from ', 'if __name__', 'function', 'var ', 'const ']
        code_score = sum(1 for keyword in code_keywords if keyword in text)

        words = text.split()
        avg_word_length = sum(len(word) for word in words) / len(words) if words else 0

        if math_score > len(words) * 0.1:
            return "mathematical"
        if code_score > 0:
            return "code"
        if avg_word_length > 4:
            return "academic"
        return "natural"

    async def tokenize(self, text: str) -> TokenizationResult:
        """Tokenize ``text`` and run every analysis enabled in the config.

        Returns:
            A ``TokenizationResult`` whose ``processing_time`` is the
            elapsed time of this call in seconds.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        started = time.perf_counter()

        tokens = text.split()
        content_type = self.detect_content_type(text)

        result = TokenizationResult(
            text=text,
            tokens=tokens,
            token_count=len(tokens),
        )

        if self.config.enable_semantic_embedding:
            result.embeddings = self.semantic_embedder.embed_text(text)

        if self.config.enable_ner:
            result.entities = self.ner_processor.extract_entities(text)
            entity_analysis = self.ner_processor.analyze_entities(result.entities)
            result.semantic_features.update(entity_analysis)

        if self.config.enable_math_processing:
            math_expressions = self.math_embedder.extract_math_expressions(text)
            result.math_expressions = math_expressions

            # The math feature keys are only added when something was found.
            if math_expressions:
                result.semantic_features["math_expressions"] = [
                    self.math_embedder.analyze_math_expression(expr)
                    for expr in math_expressions
                ]
                result.semantic_features["math_count"] = len(math_expressions)

        if self.config.enable_fractal_analysis:
            result.fractal_features = self.fractal_embedder.generate_fractal_features(text)

        result.semantic_features["content_type"] = content_type
        result.semantic_features["text_length"] = len(text)
        result.semantic_features["word_count"] = len(tokens)
        result.semantic_features["avg_word_length"] = (
            sum(len(word) for word in tokens) / len(tokens) if tokens else 0
        )

        result.processing_time = time.perf_counter() - started
        return result

    async def tokenize_batch(self, texts: List[str]) -> List[TokenizationResult]:
        """Tokenize each text in turn and return the results in input order."""
        return [await self.tokenize(text) for text in texts]
|
|
|
|
|
class EnhancedBatchProcessor:
    """Runs the enhanced tokenizer over lists of texts and saves summaries."""

    def __init__(self, config: Optional[TokenizerConfig] = None):
        """Create the processor and its tokenizer.

        Args:
            config: Settings to use; a default ``TokenizerConfig`` is
                created when omitted.
        """
        self.config = config or TokenizerConfig()
        # Share the resolved config so processor and tokenizer always agree.
        self.tokenizer = EnhancedAdvancedTokenizer(self.config)
        self.results = []

    async def process_batch(self, texts: List[str]) -> List[TokenizationResult]:
        """Tokenize ``texts`` and log total tokens and mean processing time.

        An empty ``texts`` list yields an empty result list and a logged
        average of 0.0.
        """
        logger.info(f"🔄 Processing batch of {len(texts)} texts")

        results = await self.tokenizer.tokenize_batch(texts)

        total_tokens = sum(result.token_count for result in results)
        # Guard the mean: an empty batch would otherwise divide by zero.
        if results:
            avg_processing_time = sum(result.processing_time for result in results) / len(results)
        else:
            avg_processing_time = 0.0

        logger.info(f"✅ Batch complete: {total_tokens} total tokens, {avg_processing_time:.3f}s avg time")

        return results

    def save_results(self, results: List[TokenizationResult], filename: str):
        """Write a JSON summary of ``results`` to ``filename`` (UTF-8).

        Each entry holds the text, token count, content type, entities,
        math expressions and processing time.  Embeddings and the feature
        dictionaries are not written.
        """
        data = [
            {
                "text": result.text,
                "token_count": result.token_count,
                "content_type": result.semantic_features.get("content_type", "unknown"),
                "entities": result.entities,
                "math_expressions": result.math_expressions,
                "processing_time": result.processing_time,
            }
            for result in results
        ]

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logger.info(f"💾 Results saved to {filename}")
|
|
|
|
|
def main():
    """Run the demo.

    Tokenizes a handful of sample texts, prints a per-text summary and
    writes the results to ``enhanced_tokenizer_results.json``.
    """
    print("🚀 Enhanced Advanced Tokenizer System")
    print("=" * 60)

    processor = EnhancedBatchProcessor()

    test_texts = [
        "Hello world! This is a test of the enhanced tokenizer system.",
        "The equation $x^2 + y^2 = z^2$ is the Pythagorean theorem.",
        "Machine learning uses gradient descent optimization: $\\theta_{new} = \\theta_{old} - \\alpha \\nabla J(\\theta)$",
        "def hello_world():\n print('Hello, world!')\n return 42",
        "The quick brown fox jumps over the lazy dog. This is a pangram.",
    ]

    async def run_demo():
        """Process the sample texts, print the summary and save the results."""
        print(f"🧪 Testing with {len(test_texts)} sample texts...")

        results = await processor.process_batch(test_texts)

        print("\n📊 Results Summary:")
        print("-" * 40)

        for i, result in enumerate(results):
            print(f"\nText {i+1}:")
            print(f" 📝 Type: {result.semantic_features.get('content_type', 'unknown')}")
            print(f" 🔢 Tokens: {result.token_count}")
            print(f" 🏷️ Entities: {len(result.entities)}")
            print(f" 🧮 Math expressions: {len(result.math_expressions)}")
            print(f" ⏱️ Processing time: {result.processing_time:.3f}s")

            if result.entities:
                # Show the labels of at most the first three entities.
                print(f" 📋 Entity types: {[ent[1] for ent in result.entities[:3]]}")

            if result.fractal_features:
                print(f" 🌀 Fractal complexity: {result.fractal_features.get('mandelbrot_complexity', 0):.2f}")

        processor.save_results(results, "enhanced_tokenizer_results.json")

        print("\n✅ Enhanced system demo complete!")
        print("📁 Results saved to: enhanced_tokenizer_results.json")

    asyncio.run(run_demo())


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|