|
|
|
|
|
""" |
|
|
Minimal Enhanced Advanced Tokenizer |
|
|
================================== |
|
|
Working version with fallbacks for missing dependencies. |
|
|
""" |
|
|
|
|
|
import re |
|
|
import json |
|
|
import asyncio |
|
|
import numpy as np |
|
|
from typing import List, Dict, Any, Optional, Tuple |
|
|
from dataclasses import dataclass, field |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
TORCH_AVAILABLE = False |
|
|
TRANSFORMERS_AVAILABLE = False |
|
|
SENTENCE_TRANSFORMERS_AVAILABLE = False |
|
|
SPACY_AVAILABLE = False |
|
|
SKLEARN_AVAILABLE = False |
|
|
SYMPY_AVAILABLE = False |
|
|
SCIPY_AVAILABLE = False |
|
|
|
|
|
try: |
|
|
import torch |
|
|
TORCH_AVAILABLE = True |
|
|
print("✅ PyTorch available") |
|
|
except ImportError: |
|
|
print("⚠️ PyTorch not available") |
|
|
|
|
|
try: |
|
|
import transformers |
|
|
TRANSFORMERS_AVAILABLE = True |
|
|
print("✅ Transformers available") |
|
|
except ImportError: |
|
|
print("⚠️ Transformers not available") |
|
|
|
|
|
try: |
|
|
import sentence_transformers |
|
|
SENTENCE_TRANSFORMERS_AVAILABLE = True |
|
|
print("✅ Sentence Transformers available") |
|
|
except ImportError: |
|
|
print("⚠️ Sentence Transformers not available") |
|
|
|
|
|
try: |
|
|
import spacy |
|
|
SPACY_AVAILABLE = True |
|
|
print("✅ spaCy available") |
|
|
except ImportError: |
|
|
print("⚠️ spaCy not available") |
|
|
|
|
|
try: |
|
|
import sklearn |
|
|
SKLEARN_AVAILABLE = True |
|
|
print("✅ scikit-learn available") |
|
|
except ImportError: |
|
|
print("⚠️ scikit-learn not available") |
|
|
|
|
|
try: |
|
|
import sympy |
|
|
SYMPY_AVAILABLE = True |
|
|
print("✅ SymPy available") |
|
|
except ImportError: |
|
|
print("⚠️ SymPy not available") |
|
|
|
|
|
try: |
|
|
import scipy |
|
|
SCIPY_AVAILABLE = True |
|
|
print("✅ SciPy available") |
|
|
except ImportError: |
|
|
print("⚠️ SciPy not available") |
|
|
|
|
|
@dataclass
class TokenizationResult:
    """Bundle of everything produced by a single tokenization pass.

    Only the first three fields are required; every other field starts out
    empty so that a result can be filled in step by step.

    Attributes:
        text: The input string.
        tokens: Tokens derived from ``text``.
        token_count: Number of tokens.
        embeddings: Embedding vector for ``text``, or ``None`` if none was set.
        entities: ``(entity_text, label)`` pairs.
        math_expressions: Mathematical expression strings found in ``text``.
        semantic_features: Free-form mapping of semantic metadata.
        fractal_features: Free-form mapping of byte-level statistics.
        processing_time: Elapsed processing time as a float (0.0 until set).
    """

    # --- required -----------------------------------------------------------
    text: str
    tokens: List[str]
    token_count: int

    # --- optional; mutable defaults use factories so instances never share --
    embeddings: Optional[np.ndarray] = None
    entities: List[Tuple[str, str]] = field(default_factory=list)
    math_expressions: List[str] = field(default_factory=list)
    semantic_features: Dict[str, Any] = field(default_factory=dict)
    fractal_features: Dict[str, Any] = field(default_factory=dict)
    processing_time: float = 0.0
|
|
|
|
|
class MinimalSemanticEmbedder:
    """Semantic embedder that degrades to a deterministic pseudo-embedding.

    When ``sentence_transformers`` is importable and its model loads,
    ``embed_text`` delegates to ``model.encode``.  Otherwise a fixed-length
    vector derived from a digest of the text is returned, so callers always
    receive an array of a predictable size.
    """

    # Length of the fallback vector.
    # NOTE(review): presumably chosen to match the sentence-transformers
    # model's output size -- confirm.
    _FALLBACK_DIM = 384

    def __init__(self):
        """Try to load the sentence-transformers model; keep ``None`` on failure."""
        self.model = None
        if SENTENCE_TRANSFORMERS_AVAILABLE:
            try:
                from sentence_transformers import SentenceTransformer

                self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
                print("✅ Loaded semantic model")
            except Exception as e:
                # Best-effort: any load problem simply selects the fallback path.
                print(f"⚠️ Semantic model failed: {e}")

    def embed_text(self, text: str) -> Optional[np.ndarray]:
        """Generate a semantic embedding for ``text``.

        Args:
            text: The string to embed.

        Returns:
            The model's encoding when a model is loaded, the deterministic
            fallback vector when it is not, or ``None`` if the loaded model
            raised while encoding.
        """
        if self.model is None:
            return self._fallback_embedding(text)

        try:
            return self.model.encode(text)
        except Exception as e:
            print(f"⚠️ Embedding failed: {e}")
            return None

    def _fallback_embedding(self, text: str) -> np.ndarray:
        """Build the model-free pseudo-embedding for ``text``.

        The vector is ``((seed + i) % 1000) / 1000`` for each index ``i``,
        i.e. values in ``[0, 1)``.  The seed comes from SHA-256 rather than
        the built-in ``hash()``: ``hash()`` of bytes is salted per interpreter
        process, which made the same text embed differently on every run.
        """
        import hashlib  # local import keeps the file's import block untouched

        digest = hashlib.sha256(text.encode('utf-8')).digest()
        # Reduce modulo 1000 up front so the vectorised addition below cannot
        # overflow a fixed-width integer dtype.
        seed = int.from_bytes(digest[:8], "big") % 1000
        return ((seed + np.arange(self._FALLBACK_DIM)) % 1000) / 1000.0
|
|
|
|
|
class MinimalMathematicalEmbedder:
    """Regex-based finder and describer of mathematical snippets in text."""

    # Patterns are tried in this order; results keep that order.
    _MATH_PATTERNS = (
        # Display math: $$ ... $$
        r'\$\$[^$]+\$\$',
        # Inline math: $ ... $.  The lookarounds stop this pattern from
        # matching inside (or across the delimiters of) a $$ ... $$ block,
        # which previously produced duplicates such as "$a$" for "$$a$$".
        r'(?<!\$)\$[^$]+\$(?!\$)',
        # Plain arithmetic / comparison between two numbers: "3 + 4", "1.5<2"
        r'\b\d+\.?\d*\s*[+\-*/=<>]\s*\d+\.?\d*',
        # Simple numeric assignment: "x = 5"
        r'\b\w+\s*=\s*\d+\.?\d*',
    )

    def extract_math_expressions(self, text: str) -> List[str]:
        """Extract mathematical expressions from ``text``.

        Args:
            text: The string to scan.

        Returns:
            The distinct matched substrings, in first-seen order (pattern
            order first, then position in the text).  The order is stable
            across runs; the previous ``list(set(...))`` was not.
        """
        found = []
        for pattern in self._MATH_PATTERNS:
            found.extend(re.findall(pattern, text))

        # dict.fromkeys removes duplicates while preserving insertion order.
        return list(dict.fromkeys(found))

    def analyze_math_expression(self, expression: str) -> Dict[str, Any]:
        """Describe a single expression with a few cheap surface features.

        Args:
            expression: An expression string, optionally wrapped in ``$``.

        Returns:
            A dict with keys ``expression`` (``$`` removed and stripped),
            ``length``, ``has_equals``, ``has_operators`` (any of ``+ - * /``)
            and ``has_variables`` (any alphabetic character).  If anything
            raises, a dict with ``error`` and the original ``expression`` is
            returned instead.
        """
        try:
            clean_expr = expression.replace('$', '').strip()

            return {
                "expression": clean_expr,
                "length": len(clean_expr),
                "has_equals": '=' in clean_expr,
                "has_operators": any(op in clean_expr for op in ['+', '-', '*', '/']),
                "has_variables": any(c.isalpha() for c in clean_expr),
            }
        except Exception as e:
            # Kept deliberately broad: callers receive an error record rather
            # than an exception (e.g. when a non-string is passed in).
            return {"error": str(e), "expression": expression}
|
|
|
|
|
class MinimalNERProcessor:
    """Named-entity extractor: spaCy when it loads, regex heuristics otherwise."""

    def __init__(self):
        """Attempt to load the spaCy pipeline; ``self.nlp`` stays ``None`` on failure."""
        self.nlp = None
        if not SPACY_AVAILABLE:
            return

        try:
            import spacy

            self.nlp = spacy.load("en_core_web_sm")
            print("✅ Loaded NER model")
        except Exception as err:
            print(f"⚠️ NER model failed: {err}")

    def extract_entities(self, text: str) -> List[Tuple[str, str]]:
        """Return ``(entity_text, label)`` pairs found in ``text``.

        With a loaded pipeline the pairs come from ``doc.ents``; if the
        pipeline raises, the error is printed and an empty list is returned.
        Without a pipeline, four regexes are applied in a fixed order
        (PERSON, ORG, DATE, TIME) and every match is reported with the label
        of the regex that produced it.
        """
        if self.nlp is not None:
            try:
                return [(span.text, span.label_) for span in self.nlp(text).ents]
            except Exception as err:
                print(f"⚠️ NER failed: {err}")
                return []

        # Regex fallback: (label, pattern) pairs, scanned in this order.
        regex_by_label = (
            ('PERSON', r'\b[A-Z][a-z]+ [A-Z][a-z]+\b'),  # two capitalised words
            ('ORG', r'\b[A-Z][A-Z]+\b'),                 # run of 2+ capitals
            ('DATE', r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'),    # digits/digits/digits
            ('TIME', r'\b\d{1,2}:\d{2}\b'),              # digits:digits
        )
        return [
            (hit, label)
            for label, regex in regex_by_label
            for hit in re.findall(regex, text)
        ]
|
|
|
|
|
class MinimalFractalEmbedder:
    """Computes simple statistics over a fixed-size window of a text's UTF-8 bytes."""

    def generate_fractal_features(self, text: str) -> Dict[str, Any]:
        """Generate byte-level features from ``text``.

        The UTF-8 bytes of ``text`` are zero-padded or truncated to exactly
        256 values before any statistic is computed.

        Args:
            text: The string to analyse.

        Returns:
            A dict with float values under the keys ``variance``, ``mean``,
            ``std``, ``entropy`` and ``self_similarity``.  All values are
            finite.
        """
        raw = np.frombuffer(text.encode('utf-8'), dtype=np.uint8)

        target_length = 256
        if len(raw) < target_length:
            window = np.pad(raw, (0, target_length - len(raw)))
        else:
            window = raw[:target_length]

        return {
            "variance": float(np.var(window)),
            "mean": float(np.mean(window)),
            "std": float(np.std(window)),
            "entropy": self._calculate_entropy(window),
            "self_similarity": self._calculate_self_similarity(window),
        }

    def _calculate_entropy(self, data: np.ndarray) -> float:
        """Calculate the Shannon entropy (in bits) of the values in ``data``.

        Returns 0.0 for empty input.
        """
        if len(data) == 0:
            return 0.0

        _, counts = np.unique(data, return_counts=True)
        probabilities = counts / len(data)
        # Every probability is > 0 because it comes from an observed count,
        # so log2 is safe without an epsilon.  The former "+ 1e-10" biased
        # the result and made it slightly negative for constant data.
        entropy = -np.sum(probabilities * np.log2(probabilities))
        # max() also normalises the IEEE "-0.0" produced for constant data.
        return max(0.0, float(entropy))

    def _calculate_self_similarity(self, data: np.ndarray) -> float:
        """Calculate the correlation between the two halves of ``data``.

        Returns:
            The Pearson correlation coefficient of the first and second half
            (an odd trailing element is ignored), or 0.0 when it is undefined:
            fewer than two elements, or either half is constant.
        """
        mid = len(data) // 2
        if mid == 0:
            return 0.0

        first_half = np.asarray(data[:mid], dtype=np.float64)
        second_half = np.asarray(data[mid:mid * 2], dtype=np.float64)

        # A constant half has zero variance, for which np.corrcoef yields NaN
        # (plus a RuntimeWarning).  That is the normal case for short texts,
        # whose second half is nothing but zero padding.
        if np.std(first_half) == 0.0 or np.std(second_half) == 0.0:
            return 0.0

        correlation = float(np.corrcoef(first_half, second_half)[0, 1])
        return correlation if np.isfinite(correlation) else 0.0
|
|
|
|
|
class MinimalEnhancedTokenizer: |
|
|
"""Minimal enhanced tokenizer with fallbacks.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.semantic_embedder = MinimalSemanticEmbedder() |
|
|
self.math_embedder = MinimalMathematicalEmbedder() |
|
|
self.fractal_embedder = MinimalFractalEmbedder() |
|
|
self.ner_processor = MinimalNERProcessor() |
|
|
|
|
|
print("🚀 Minimal Enhanced Tokenizer initialized") |
|
|
|
|
|
def detect_content_type(self, text: str) -> str: |
|
|
"""Detect the type of content.""" |
|
|
|
|
|
math_patterns = [ |
|
|
r'\$\$[^$]+\$\$', |
|
|
r'\$[^$]+\$', |
|
|
r'\b\d+\.?\d*\s*[+\-*/=]\s*\d+\.?\d*', |
|
|
] |
|
|
|
|
|
math_score = sum(len(re.findall(pattern, text)) for pattern in math_patterns) |
|
|
|
|
|
|
|
|
code_keywords = ['def ', 'class ', 'import ', 'from ', 'if __name__', 'function', 'var ', 'const '] |
|
|
code_score = sum(1 for keyword in code_keywords if keyword in text) |
|
|
|
|
|
|
|
|
words = text.split() |
|
|
avg_word_length = sum(len(word) for word in words) / len(words) if words else 0 |
|
|
|
|
|
if math_score > len(words) * 0.1: |
|
|
return "mathematical" |
|
|
elif code_score > 0: |
|
|
return "code" |
|
|
elif avg_word_length > 4: |
|
|
return "academic" |
|
|
else: |
|
|
return "natural" |
|
|
|
|
|
async def tokenize(self, text: str) -> TokenizationResult: |
|
|
"""Main tokenization method.""" |
|
|
start_time = datetime.now() |
|
|
|
|
|
|
|
|
tokens = text.split() |
|
|
|
|
|
|
|
|
content_type = self.detect_content_type(text) |
|
|
|
|
|
|
|
|
result = TokenizationResult( |
|
|
text=text, |
|
|
tokens=tokens, |
|
|
token_count=len(tokens), |
|
|
) |
|
|
|
|
|
|
|
|
result.embeddings = self.semantic_embedder.embed_text(text) |
|
|
|
|
|
|
|
|
result.entities = self.ner_processor.extract_entities(text) |
|
|
|
|
|
|
|
|
math_expressions = self.math_embedder.extract_math_expressions(text) |
|
|
result.math_expressions = math_expressions |
|
|
|
|
|
if math_expressions: |
|
|
math_analysis = [] |
|
|
for expr in math_expressions: |
|
|
analysis = self.math_embedder.analyze_math_expression(expr) |
|
|
math_analysis.append(analysis) |
|
|
|
|
|
result.semantic_features["math_expressions"] = math_analysis |
|
|
result.semantic_features["math_count"] = len(math_expressions) |
|
|
|
|
|
|
|
|
result.fractal_features = self.fractal_embedder.generate_fractal_features(text) |
|
|
|
|
|
|
|
|
result.semantic_features["content_type"] = content_type |
|
|
result.semantic_features["text_length"] = len(text) |
|
|
result.semantic_features["word_count"] = len(tokens) |
|
|
result.semantic_features["avg_word_length"] = sum(len(word) for word in tokens) / len(tokens) if tokens else 0 |
|
|
result.semantic_features["entity_count"] = len(result.entities) |
|
|
|
|
|
|
|
|
end_time = datetime.now() |
|
|
result.processing_time = (end_time - start_time).total_seconds() |
|
|
|
|
|
return result |
|
|
|
|
|
def main():
    """Run the demo: tokenize sample texts, print a summary, save JSON.

    Builds a ``MinimalEnhancedTokenizer``, processes five hard-coded sample
    strings one after another, prints per-text statistics, and writes a
    summary of the results to ``minimal_enhanced_results.json`` in the
    current working directory.
    """
    print("🚀 Minimal Enhanced Advanced Tokenizer System")
    print("=" * 60)

    tokenizer = MinimalEnhancedTokenizer()

    test_texts = [
        "Hello world! This is a test of the minimal enhanced tokenizer system.",
        "The equation $x^2 + y^2 = z^2$ is the Pythagorean theorem.",
        "Machine learning uses gradient descent optimization: $\\theta_{new} = \\theta_{old} - \\alpha \\nabla J(\\theta)$",
        "def hello_world():\n print('Hello, world!')\n return 42",
        "The quick brown fox jumps over the lazy dog. This is a pangram.",
    ]

    async def run_demo():
        """Tokenize every sample in order, then report and persist the results."""
        print(f"🧪 Testing with {len(test_texts)} sample texts...")

        # Sequential awaits: each text is fully processed before the next.
        results = [await tokenizer.tokenize(sample) for sample in test_texts]

        print("\n📊 Results Summary:")
        print("-" * 40)

        for number, item in enumerate(results, start=1):
            content_type = item.semantic_features.get('content_type', 'unknown')
            print(f"\nText {number}:")
            print(f" 📝 Type: {content_type}")
            print(f" 🔢 Tokens: {item.token_count}")
            print(f" 🏷️ Entities: {len(item.entities)}")
            print(f" 🧮 Math expressions: {len(item.math_expressions)}")
            print(f" ⏱️ Processing time: {item.processing_time:.3f}s")

            if item.entities:
                # Only the labels of the first three entities are shown.
                labels = [pair[1] for pair in item.entities[:3]]
                print(f" 📍 Entity types: {labels}")

            if item.fractal_features:
                variance = item.fractal_features.get('variance', 0)
                print(f" 🌀 Fractal variance: {variance:.2f}")

        # Embeddings and semantic_features (other than content_type) are not
        # included in the saved summary.
        data = [
            {
                "text": item.text,
                "token_count": item.token_count,
                "content_type": item.semantic_features.get("content_type", "unknown"),
                "entities": item.entities,
                "math_expressions": item.math_expressions,
                "processing_time": item.processing_time,
                "fractal_features": item.fractal_features,
            }
            for item in results
        ]

        with open("minimal_enhanced_results.json", 'w', encoding='utf-8') as out_file:
            json.dump(data, out_file, indent=2, ensure_ascii=False)

        print("\n✅ Minimal enhanced system demo complete!")
        print("📁 Results saved to: minimal_enhanced_results.json")

    asyncio.run(run_demo())


if __name__ == "__main__":
    main()
|
|
|