"""
Intelligent Chunking Processor
==============================

Advanced chunking system with semantic awareness and context preservation.
"""

import re
import hashlib
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np
import spacy
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans


@dataclass
class ChunkMetadata:
    """Metadata for a text chunk."""
    chunk_id: str
    content_type: str
    semantic_topic: str
    importance_score: float
    context_connections: List[str]
    language: str
    readability_score: float
    entity_count: int
    sentiment_score: float


@dataclass
class IntelligentChunk:
    """Intelligent chunk with semantic metadata."""
    chunk_id: str
    content: str
    chunk_index: int
    total_chunks: int
    file_hash: str
    metadata: ChunkMetadata
    semantic_embedding: Optional[np.ndarray] = None
    timestamp: str = ""
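
# Serialization note: ChunkMetadata converts cleanly via dataclasses.asdict(),
# but IntelligentChunk carries a numpy embedding that must be converted first
# (e.g. embedding.tolist()) before JSON encoding.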


class IntelligentChunkingProcessor:
    """Advanced chunking processor with semantic awareness."""

    def __init__(self,
                 max_chunk_size: int = 1000000,
                 overlap_size: int = 1000,
                 semantic_model: str = "all-MiniLM-L6-v2",
                 language_model: str = "en_core_web_sm"):
        self.max_chunk_size = max_chunk_size
        self.overlap_size = overlap_size

        # Models are optional: loading failures leave them as None and the
        # processor falls back to neutral defaults.
        self.semantic_model = None
        self.nlp = None
        self._load_models(semantic_model, language_model)

        # Regex patterns used to classify the dominant content type of a text.
        self.content_patterns = {
            'code': [
                r'```[\s\S]*?```',           # fenced code blocks
                r'`[^`]+`',                  # inline code
                r'def\s+\w+\s*\(',           # Python function definitions
                r'class\s+\w+',              # class definitions
                r'function\s+\w+\s*\(',      # JavaScript functions
                r'#include\s*<',             # C/C++ includes
            ],
            'mathematical': [
                r'\$[\s\S]*?\$',             # LaTeX math delimiters
                r'\\[a-zA-Z]+\{[^}]*\}',     # LaTeX commands
                r'\b\d+\s*[+\-*/=]\s*\d+',   # arithmetic expressions
                r'\\frac\{[^}]+\}\{[^}]+\}', # LaTeX fractions
            ],
            'structured_data': [
                r'\{[\s\S]*?\}',             # JSON-like objects
                r'\[[\s\S]*?\]',             # arrays
                r'<[^>]+>',                  # XML/HTML tags
                r'^\s*[a-zA-Z_][a-zA-Z0-9_]*\s*:',  # YAML-like keys
            ],
            'natural_language': [
                r'[.!?]+\s+[A-Z]',           # sentence boundaries
                r'\n\n+',                    # paragraph breaks
            ]
        }

    def _load_models(self, semantic_model: str, language_model: str):
        """Load NLP models."""
        try:
            self.semantic_model = SentenceTransformer(semantic_model)
            print(f"✅ Loaded semantic model: {semantic_model}")
        except Exception as e:
            print(f"⚠️ Semantic model loading failed: {e}")
            self.semantic_model = None

        try:
            self.nlp = spacy.load(language_model)
            print(f"✅ Loaded language model: {language_model}")
        except Exception as e:
            print(f"⚠️ Language model loading failed: {e}")
            self.nlp = None

    def detect_content_type(self, content: str) -> str:
        """Detect the primary content type of the text."""
        content = content.strip()

        # Code indicators take precedence: a single match is a strong signal.
        code_matches = 0
        for pattern in self.content_patterns['code']:
            code_matches += len(re.findall(pattern, content, re.MULTILINE))

        if code_matches > 0:
            return 'code'

        math_matches = 0
        for pattern in self.content_patterns['mathematical']:
            math_matches += len(re.findall(pattern, content))

        if math_matches > 0:
            return 'mathematical'

        # Structured data requires a higher match density (roughly one match
        # per 100 characters) because braces and brackets also occur in prose.
        structured_matches = 0
        for pattern in self.content_patterns['structured_data']:
            structured_matches += len(re.findall(pattern, content))

        if structured_matches > len(content) / 100:
            return 'structured_data'

        return 'natural_language'
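
    # Classification examples (hypothetical inputs): text containing
    # "def foo():" is tagged 'code', "$E = mc^2$" is tagged 'mathematical',
    # and plain prose falls through to 'natural_language'.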

    def extract_semantic_topics(self, content: str) -> List[str]:
        """Extract semantic topics from content."""
        if not self.nlp:
            return ['general']

        try:
            doc = self.nlp(content)
            topics = []

            # Named entities are strong topic candidates.
            for ent in doc.ents:
                if ent.label_ in ['PERSON', 'ORG', 'GPE', 'EVENT', 'WORK_OF_ART', 'LAW']:
                    topics.append(ent.text.lower())

            # Multi-word noun chunks tend to carry topical meaning.
            for chunk in doc.noun_chunks:
                if len(chunk.text.split()) >= 2:
                    topics.append(chunk.text.lower())

            # Deduplicate and cap at ten topics.
            topics = list(set(topics))[:10]
            return topics if topics else ['general']

        except Exception as e:
            print(f"⚠️ Topic extraction failed: {e}")
            return ['general']

    def calculate_importance_score(self, content: str, content_type: str) -> float:
        """Calculate importance score for content."""
        score = 0.5  # neutral baseline

        # Longer chunks earn up to 0.2 extra.
        length_score = min(len(content) / 1000, 1.0) * 0.2
        score += length_score

        # Content-type bonus: code and math are weighted highest.
        type_scores = {
            'code': 0.3,
            'mathematical': 0.25,
            'structured_data': 0.2,
            'natural_language': 0.1
        }
        score += type_scores.get(content_type, 0.1)

        # Keyword bonus, capped at 0.3.
        important_keywords = [
            'important', 'critical', 'essential', 'key', 'main', 'primary',
            'function', 'class', 'method', 'algorithm', 'definition', 'theorem',
            'conclusion', 'summary', 'abstract', 'introduction'
        ]
        keyword_count = sum(1 for keyword in important_keywords if keyword.lower() in content.lower())
        keyword_score = min(keyword_count / 10, 0.3)
        score += keyword_score

        return min(score, 1.0)
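
    # Worked example (illustrative): a 500-character code chunk mentioning
    # "class" and "function" scores 0.5 + 0.1 (length) + 0.3 (type)
    # + 0.2 (keywords) = 1.1, capped to 1.0.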

    def calculate_readability_score(self, content: str) -> float:
        """Calculate readability (Flesch reading ease, normalized to 0-1)."""
        if not self.nlp:
            return 0.5

        try:
            doc = self.nlp(content)

            sentences = list(doc.sents)
            words = [token for token in doc if not token.is_punct and not token.is_space]

            if not sentences or not words:
                return 0.5

            avg_sentence_length = len(words) / len(sentences)
            avg_syllables_per_word = sum(self._count_syllables(word.text) for word in words) / len(words)

            # Flesch reading ease formula.
            score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * avg_syllables_per_word)

            # Normalize from the usual 0-100 range to 0-1.
            return max(0, min(1, score / 100))

        except Exception as e:
            print(f"⚠️ Readability calculation failed: {e}")
            return 0.5
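
    # Worked example (illustrative): 12 words per sentence and 1.4 syllables
    # per word give 206.835 - 1.015 * 12 - 84.6 * 1.4 = 76.215, i.e. a
    # normalized score of about 0.76.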

    def _count_syllables(self, word: str) -> int:
        """Count syllables in a word (simplified vowel-group heuristic)."""
        word = word.lower()
        vowels = 'aeiouy'
        syllable_count = 0
        prev_was_vowel = False

        for char in word:
            if char in vowels:
                if not prev_was_vowel:
                    syllable_count += 1
                prev_was_vowel = True
            else:
                prev_was_vowel = False

        # A trailing silent 'e' usually does not add a syllable.
        if word.endswith('e') and syllable_count > 1:
            syllable_count -= 1

        return max(1, syllable_count)

    def calculate_sentiment_score(self, content: str) -> float:
        """Calculate sentiment score (-1 to 1) from a small word lexicon."""
        if not self.nlp:
            return 0.0

        try:
            doc = self.nlp(content)

            positive_words = ['good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic', 'perfect']
            negative_words = ['bad', 'terrible', 'awful', 'horrible', 'disappointing', 'wrong', 'error']

            pos_count = sum(1 for word in doc if word.text.lower() in positive_words)
            neg_count = sum(1 for word in doc if word.text.lower() in negative_words)

            total_words = len([token for token in doc if token.is_alpha])
            if total_words == 0:
                return 0.0

            sentiment = (pos_count - neg_count) / total_words
            return max(-1, min(1, sentiment))

        except Exception as e:
            print(f"⚠️ Sentiment calculation failed: {e}")
            return 0.0

    def extract_entities(self, content: str) -> int:
        """Count named entities of selected types."""
        if not self.nlp:
            return 0

        try:
            doc = self.nlp(content)
            entities = [ent for ent in doc.ents if ent.label_ in ['PERSON', 'ORG', 'GPE', 'EVENT', 'WORK_OF_ART']]
            return len(entities)
        except Exception as e:
            print(f"⚠️ Entity extraction failed: {e}")
            return 0

    def detect_language(self, content: str) -> str:
        """Report the language of the content, defaulting to 'en'."""
        if not self.nlp:
            return 'en'

        try:
            # Note: spaCy reports the language of the loaded pipeline rather
            # than performing true language detection.
            doc = self.nlp(content[:1000])
            return doc.lang_ if hasattr(doc, 'lang_') else 'en'
        except Exception as e:
            print(f"⚠️ Language detection failed: {e}")
            return 'en'

    def generate_semantic_embedding(self, content: str) -> Optional[np.ndarray]:
        """Generate semantic embedding for content."""
        if not self.semantic_model:
            return None

        try:
            embedding = self.semantic_model.encode(content)
            return embedding
        except Exception as e:
            print(f"⚠️ Embedding generation failed: {e}")
            return None

    def find_semantic_boundaries(self, content: str, content_type: str) -> List[int]:
        """Find optimal chunk boundaries based on content type."""
        boundaries = []

        if content_type == 'code':
            # Prefer breaking at definitions, separator comments, and blank lines.
            patterns = [
                r'\n\s*(def\s+\w+\s*\(|class\s+\w+|function\s+\w+\s*\()',
                r'\n\s*#\s*---+\n',
                r'\n\s*//\s*---+\n',
                r'\n\n+',
            ]
        elif content_type == 'natural_language':
            # Prefer headings, then paragraph breaks, then sentence ends.
            patterns = [
                r'\n\s*#{1,6}\s+',
                r'\n\n+',
                r'[.!?]\s+\n',
            ]
        elif content_type == 'structured_data':
            # Prefer the start of objects, arrays, and tags.
            patterns = [
                r'\n\s*\{',
                r'\n\s*\[',
                r'\n\s*<[^>]+>',
            ]
        else:
            # Mathematical and other content falls back to generic breaks.
            patterns = [r'\n\n+', r'[.!?]\s+\n']

        for pattern in patterns:
            for match in re.finditer(pattern, content):
                boundaries.append(match.start())

        # Always include the start and end of the content.
        boundaries = [0] + sorted(set(boundaries)) + [len(content)]
        return boundaries

    def create_intelligent_chunks(self,
                                  content: str,
                                  file_hash: str,
                                  chunk_overlap: Optional[int] = None) -> List[IntelligentChunk]:
        """Create intelligent chunks with semantic awareness."""
        if chunk_overlap is None:
            chunk_overlap = self.overlap_size

        content_type = self.detect_content_type(content)

        # Short content fits in a single chunk.
        if len(content) <= self.max_chunk_size:
            metadata = self._create_chunk_metadata(
                content, content_type, chunk_index=0, total_chunks=1
            )
            embedding = self.generate_semantic_embedding(content)

            return [IntelligentChunk(
                chunk_id="chunk_0",
                content=content,
                chunk_index=0,
                total_chunks=1,
                file_hash=file_hash,
                metadata=metadata,
                semantic_embedding=embedding,
                timestamp=datetime.now().isoformat()
            )]

        boundaries = self.find_semantic_boundaries(content, content_type)

        chunks = []
        chunk_index = 0
        start_idx = 0

        # Walk through the content, snapping each chunk's end to a semantic
        # boundary and overlapping consecutive chunks to preserve context.
        while start_idx < len(content):
            end_idx = min(start_idx + self.max_chunk_size, len(content))

            if end_idx < len(content):
                # Prefer cutting at the last semantic boundary inside the chunk.
                candidates = [b for b in boundaries if start_idx < b < end_idx]
                if candidates:
                    end_idx = max(candidates)

            chunk_content = content[start_idx:end_idx]

            # total_chunks is unknown until the loop finishes; pass a
            # placeholder and fix it up below.
            metadata = self._create_chunk_metadata(
                chunk_content, content_type, chunk_index=chunk_index, total_chunks=0
            )
            embedding = self.generate_semantic_embedding(chunk_content)

            chunks.append(IntelligentChunk(
                chunk_id=f"chunk_{chunk_index}",
                content=chunk_content,
                chunk_index=chunk_index,
                total_chunks=0,
                file_hash=file_hash,
                metadata=metadata,
                semantic_embedding=embedding,
                timestamp=datetime.now().isoformat()
            ))

            chunk_index += 1
            if end_idx >= len(content):
                break
            # Overlap with the previous chunk, but always make forward progress.
            start_idx = max(end_idx - chunk_overlap, start_idx + 1)

        # Fix up the total count and neighbor connections now that the final
        # number of chunks is known.
        total_chunks = len(chunks)
        for chunk in chunks:
            chunk.total_chunks = total_chunks
            chunk.metadata.context_connections = [
                f"chunk_{j}"
                for j in (chunk.chunk_index - 1, chunk.chunk_index + 1)
                if 0 <= j < total_chunks
            ]

        return chunks
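
    # Sizing note (defaults, ignoring boundary snapping): with
    # max_chunk_size=1000000 and a 1000-character overlap, consecutive chunks
    # advance by roughly 999,000 characters, so a 2,500,000-character document
    # yields three chunks, the last one shorter than the rest.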

    def _create_chunk_metadata(self, content: str, content_type: str, chunk_index: int, total_chunks: int) -> ChunkMetadata:
        """Create metadata for a chunk."""
        topics = self.extract_semantic_topics(content)
        primary_topic = topics[0] if topics else 'general'

        importance_score = self.calculate_importance_score(content, content_type)
        readability_score = self.calculate_readability_score(content)
        sentiment_score = self.calculate_sentiment_score(content)
        entity_count = self.extract_entities(content)
        language = self.detect_language(content)

        # Link each chunk to its immediate neighbors for context preservation.
        context_connections = []
        if chunk_index > 0:
            context_connections.append(f"chunk_{chunk_index - 1}")
        if chunk_index < total_chunks - 1:
            context_connections.append(f"chunk_{chunk_index + 1}")

        return ChunkMetadata(
            chunk_id=f"chunk_{chunk_index}",
            content_type=content_type,
            semantic_topic=primary_topic,
            importance_score=importance_score,
            context_connections=context_connections,
            language=language,
            readability_score=readability_score,
            entity_count=entity_count,
            sentiment_score=sentiment_score
        )

    def cluster_chunks_by_semantics(self, chunks: List[IntelligentChunk], n_clusters: Optional[int] = None) -> Dict[int, List[IntelligentChunk]]:
        """Cluster chunks by semantic similarity."""
        if not chunks or not any(chunk.semantic_embedding is not None for chunk in chunks):
            return {0: chunks}

        embeddings = []
        valid_chunks = []
        for chunk in chunks:
            if chunk.semantic_embedding is not None:
                embeddings.append(chunk.semantic_embedding)
                valid_chunks.append(chunk)

        if len(embeddings) < 2:
            return {0: chunks}

        embeddings = np.array(embeddings)

        # Default: roughly one cluster per five chunks, between 2 and 10,
        # and never more clusters than samples.
        if n_clusters is None:
            n_clusters = min(max(2, len(chunks) // 5), 10)
        n_clusters = min(n_clusters, len(embeddings))

        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        cluster_labels = kmeans.fit_predict(embeddings)

        clusters = {}
        for i, chunk in enumerate(valid_chunks):
            cluster_id = int(cluster_labels[i])
            clusters.setdefault(cluster_id, []).append(chunk)

        return clusters
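
    # Sizing example (illustrative): 23 chunks with embeddings default to
    # min(max(2, 23 // 5), 10) = 4 clusters.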

    def create_semantic_summary(self, chunks: List[IntelligentChunk]) -> Dict[str, Any]:
        """Create semantic summary of chunks."""
        if not chunks:
            return {}

        content_types = {}
        topics = {}
        languages = {}
        importance_scores = []
        readability_scores = []
        sentiment_scores = []

        for chunk in chunks:
            ct = chunk.metadata.content_type
            content_types[ct] = content_types.get(ct, 0) + 1

            topic = chunk.metadata.semantic_topic
            topics[topic] = topics.get(topic, 0) + 1

            lang = chunk.metadata.language
            languages[lang] = languages.get(lang, 0) + 1

            importance_scores.append(chunk.metadata.importance_score)
            readability_scores.append(chunk.metadata.readability_score)
            sentiment_scores.append(chunk.metadata.sentiment_score)

        return {
            'total_chunks': len(chunks),
            'content_types': content_types,
            'topics': topics,
            'languages': languages,
            'avg_importance': float(np.mean(importance_scores)),
            'avg_readability': float(np.mean(readability_scores)),
            'avg_sentiment': float(np.mean(sentiment_scores)),
            'total_entities': sum(chunk.metadata.entity_count for chunk in chunks)
        }


def main():
    """Demo the intelligent chunking processor."""
    print("🧠 Intelligent Chunking Processor Demo")
    print("=" * 50)

    processor = IntelligentChunkingProcessor()

    demo_content = """
# Machine Learning Fundamentals

Machine learning is a subset of artificial intelligence that focuses on algorithms and statistical models.

## Key Concepts

### Supervised Learning
Supervised learning uses labeled training data to learn a mapping from inputs to outputs.

```python
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)
predictions = model.predict(X_test)
```

### Unsupervised Learning
Unsupervised learning finds hidden patterns in data without labeled examples.

The K-means algorithm is a popular clustering method:

$$\\sum_{i=1}^{k} \\sum_{x \\in C_i} ||x - \\mu_i||^2$$

## Applications

Machine learning has numerous applications in:
- Computer vision
- Natural language processing
- Recommendation systems
- Autonomous vehicles

This technology is revolutionizing many industries and creating new opportunities.
"""

    print(f"\n📄 Processing content ({len(demo_content)} characters)...")
    file_hash = hashlib.sha256(demo_content.encode()).hexdigest()
    chunks = processor.create_intelligent_chunks(demo_content, file_hash)

    print(f"✅ Created {len(chunks)} intelligent chunks")

    for i, chunk in enumerate(chunks):
        print(f"\n📄 Chunk {i + 1}:")
        print(f"  Content type: {chunk.metadata.content_type}")
        print(f"  Topic: {chunk.metadata.semantic_topic}")
        print(f"  Importance: {chunk.metadata.importance_score:.2f}")
        print(f"  Readability: {chunk.metadata.readability_score:.2f}")
        print(f"  Entities: {chunk.metadata.entity_count}")
        print(f"  Language: {chunk.metadata.language}")
        print(f"  Content preview: {chunk.content[:100]}...")

    summary = processor.create_semantic_summary(chunks)
    print("\n📊 Semantic Summary:")
    print(f"  Total chunks: {summary['total_chunks']}")
    print(f"  Content types: {summary['content_types']}")
    print(f"  Topics: {summary['topics']}")
    print(f"  Average importance: {summary['avg_importance']:.2f}")
    print(f"  Average readability: {summary['avg_readability']:.2f}")
    print(f"  Total entities: {summary['total_entities']}")

    print("\n✅ Intelligent chunking processor ready!")


if __name__ == "__main__":
    main()