#!/usr/bin/env python3 """ Intelligent Chunking Processor ============================== Advanced chunking system with semantic awareness and context preservation. """ import re import json import hashlib import numpy as np from typing import List, Dict, Any, Optional, Tuple, Generator from dataclasses import dataclass, asdict from datetime import datetime import spacy from sentence_transformers import SentenceTransformer import networkx as nx from sklearn.cluster import KMeans from sklearn.metrics.pairwise import cosine_similarity @dataclass class ChunkMetadata: """Metadata for a text chunk.""" chunk_id: str content_type: str semantic_topic: str importance_score: float context_connections: List[str] language: str readability_score: float entity_count: int sentiment_score: float @dataclass class IntelligentChunk: """Intelligent chunk with semantic metadata.""" chunk_id: str content: str chunk_index: int total_chunks: int file_hash: str metadata: ChunkMetadata semantic_embedding: Optional[np.ndarray] = None timestamp: str = "" class IntelligentChunkingProcessor: """Advanced chunking processor with semantic awareness.""" def __init__(self, max_chunk_size: int = 1000000, overlap_size: int = 1000, semantic_model: str = "all-MiniLM-L6-v2", language_model: str = "en_core_web_sm"): self.max_chunk_size = max_chunk_size self.overlap_size = overlap_size # Initialize NLP models self.semantic_model = None self.nlp = None self._load_models(semantic_model, language_model) # Content type patterns self.content_patterns = { 'code': [ r'```[\s\S]*?```', # Code blocks r'`[^`]+`', # Inline code r'def\s+\w+\s*\(', # Python functions r'class\s+\w+', # Python classes r'function\s+\w+\s*\(', # JavaScript functions r'#include\s*<', # C/C++ includes ], 'mathematical': [ r'\$[\s\S]*?\$', # LaTeX math r'\\[a-zA-Z]+\{[^}]*\}', # LaTeX commands r'\b\d+\s*[+\-*/=]\s*\d+', # Simple math r'\\frac\{[^}]+\}\{[^}]+\}', # Fractions ], 'structured_data': [ r'\{[\s\S]*?\}', # JSON objects r'\[[\s\S]*?\]', # JSON arrays r'<[^>]+>', # XML/HTML tags r'^\s*[a-zA-Z_][a-zA-Z0-9_]*\s*:', # Key-value pairs ], 'natural_language': [ r'[.!?]+\s+[A-Z]', # Sentence boundaries r'\n\n+', # Paragraph breaks ] } def _load_models(self, semantic_model: str, language_model: str): """Load NLP models.""" try: # Load semantic model self.semantic_model = SentenceTransformer(semantic_model) print(f"✅ Loaded semantic model: {semantic_model}") except Exception as e: print(f"⚠️ Semantic model loading failed: {e}") self.semantic_model = None try: # Load language model self.nlp = spacy.load(language_model) print(f"✅ Loaded language model: {language_model}") except Exception as e: print(f"⚠️ Language model loading failed: {e}") self.nlp = None def detect_content_type(self, content: str) -> str: """Detect the primary content type of the text.""" content = content.strip() # Check for code patterns code_matches = 0 for pattern in self.content_patterns['code']: code_matches += len(re.findall(pattern, content, re.MULTILINE)) if code_matches > 0: return 'code' # Check for mathematical content math_matches = 0 for pattern in self.content_patterns['mathematical']: math_matches += len(re.findall(pattern, content)) if math_matches > 0: return 'mathematical' # Check for structured data structured_matches = 0 for pattern in self.content_patterns['structured_data']: structured_matches += len(re.findall(pattern, content)) if structured_matches > len(content) / 100: # Threshold for structured content return 'structured_data' # Default to natural language return 'natural_language' def extract_semantic_topics(self, content: str) -> List[str]: """Extract semantic topics from content.""" if not self.nlp: return ['general'] try: doc = self.nlp(content) # Extract noun phrases and named entities topics = [] # Named entities for ent in doc.ents: if ent.label_ in ['PERSON', 'ORG', 'GPE', 'EVENT', 'WORK_OF_ART', 'LAW']: topics.append(ent.text.lower()) # Noun phrases for chunk in doc.noun_chunks: if len(chunk.text.split()) >= 2: # Multi-word phrases topics.append(chunk.text.lower()) # Remove duplicates and limit topics = list(set(topics))[:10] return topics if topics else ['general'] except Exception as e: print(f"⚠️ Topic extraction failed: {e}") return ['general'] def calculate_importance_score(self, content: str, content_type: str) -> float: """Calculate importance score for content.""" score = 0.5 # Base score # Length factor length_score = min(len(content) / 1000, 1.0) * 0.2 score += length_score # Content type factor type_scores = { 'code': 0.3, 'mathematical': 0.25, 'structured_data': 0.2, 'natural_language': 0.1 } score += type_scores.get(content_type, 0.1) # Keyword density important_keywords = [ 'important', 'critical', 'essential', 'key', 'main', 'primary', 'function', 'class', 'method', 'algorithm', 'definition', 'theorem', 'conclusion', 'summary', 'abstract', 'introduction' ] keyword_count = sum(1 for keyword in important_keywords if keyword.lower() in content.lower()) keyword_score = min(keyword_count / 10, 0.3) score += keyword_score return min(score, 1.0) def calculate_readability_score(self, content: str) -> float: """Calculate readability score (simplified Flesch score).""" if not self.nlp: return 0.5 try: doc = self.nlp(content) sentences = [sent for sent in doc.sents] words = [token for token in doc if not token.is_punct and not token.is_space] if not sentences or not words: return 0.5 avg_sentence_length = len(words) / len(sentences) avg_syllables_per_word = sum(self._count_syllables(word.text) for word in words) / len(words) # Simplified Flesch score score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * avg_syllables_per_word) # Normalize to 0-1 return max(0, min(1, score / 100)) except Exception as e: print(f"⚠️ Readability calculation failed: {e}") return 0.5 def _count_syllables(self, word: str) -> int: """Count syllables in a word (simplified).""" word = word.lower() vowels = 'aeiouy' syllable_count = 0 prev_was_vowel = False for char in word: if char in vowels: if not prev_was_vowel: syllable_count += 1 prev_was_vowel = True else: prev_was_vowel = False # Handle silent 'e' if word.endswith('e') and syllable_count > 1: syllable_count -= 1 return max(1, syllable_count) def calculate_sentiment_score(self, content: str) -> float: """Calculate sentiment score (-1 to 1).""" if not self.nlp: return 0.0 try: doc = self.nlp(content) # Simple sentiment based on positive/negative words positive_words = ['good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic', 'perfect'] negative_words = ['bad', 'terrible', 'awful', 'horrible', 'disappointing', 'wrong', 'error'] pos_count = sum(1 for word in doc if word.text.lower() in positive_words) neg_count = sum(1 for word in doc if word.text.lower() in negative_words) total_words = len([token for token in doc if token.is_alpha]) if total_words == 0: return 0.0 sentiment = (pos_count - neg_count) / total_words return max(-1, min(1, sentiment)) except Exception as e: print(f"⚠️ Sentiment calculation failed: {e}") return 0.0 def extract_entities(self, content: str) -> int: """Extract and count entities.""" if not self.nlp: return 0 try: doc = self.nlp(content) entities = [ent for ent in doc.ents if ent.label_ in ['PERSON', 'ORG', 'GPE', 'EVENT', 'WORK_OF_ART']] return len(entities) except Exception as e: print(f"⚠️ Entity extraction failed: {e}") return 0 def detect_language(self, content: str) -> str: """Detect language of content.""" if not self.nlp: return 'en' try: doc = self.nlp(content[:1000]) # Sample first 1000 chars return doc.lang_ if hasattr(doc, 'lang_') else 'en' except Exception as e: print(f"⚠️ Language detection failed: {e}") return 'en' def generate_semantic_embedding(self, content: str) -> Optional[np.ndarray]: """Generate semantic embedding for content.""" if not self.semantic_model: return None try: embedding = self.semantic_model.encode(content) return embedding except Exception as e: print(f"⚠️ Embedding generation failed: {e}") return None def find_semantic_boundaries(self, content: str, content_type: str) -> List[int]: """Find optimal chunk boundaries based on content type.""" boundaries = [] if content_type == 'code': # For code, split on function/class boundaries patterns = [ r'\n\s*(def\s+\w+\s*\(|class\s+\w+|function\s+\w+\s*\()', r'\n\s*#\s*---+\n', # Comment separators r'\n\s*//\s*---+\n', # Comment separators r'\n\n+', # Multiple newlines ] elif content_type == 'natural_language': # For natural language, split on paragraph/section boundaries patterns = [ r'\n\s*#{1,6}\s+', # Markdown headers r'\n\n+', # Paragraph breaks r'[.!?]\s+\n', # Sentence ends followed by newline ] elif content_type == 'structured_data': # For structured data, split on object/array boundaries patterns = [ r'\n\s*\{', # New JSON objects r'\n\s*\[', # New JSON arrays r'\n\s*<[^>]+>', # New XML/HTML elements ] else: # Default patterns patterns = [r'\n\n+', r'[.!?]\s+\n'] for pattern in patterns: for match in re.finditer(pattern, content): boundaries.append(match.start()) # Add beginning and end boundaries = [0] + sorted(set(boundaries)) + [len(content)] return boundaries def create_intelligent_chunks(self, content: str, file_hash: str, chunk_overlap: int = None) -> List[IntelligentChunk]: """Create intelligent chunks with semantic awareness.""" if chunk_overlap is None: chunk_overlap = self.overlap_size # Detect content type content_type = self.detect_content_type(content) # If content is small enough, return as single chunk if len(content) <= self.max_chunk_size: metadata = self._create_chunk_metadata( content, content_type, chunk_index=0, total_chunks=1 ) embedding = self.generate_semantic_embedding(content) return [IntelligentChunk( chunk_id="chunk_0", content=content, chunk_index=0, total_chunks=1, file_hash=file_hash, metadata=metadata, semantic_embedding=embedding, timestamp=datetime.now().isoformat() )] # Find semantic boundaries boundaries = self.find_semantic_boundaries(content, content_type) # Create chunks based on boundaries and size constraints chunks = [] total_chunks = 0 # Calculate optimal number of chunks estimated_chunks = max(1, len(content) // (self.max_chunk_size - chunk_overlap)) total_chunks = estimated_chunks for i in range(total_chunks): start_idx = i * (self.max_chunk_size - chunk_overlap) end_idx = min(start_idx + self.max_chunk_size, len(content)) # Adjust boundaries to semantic boundaries if possible if boundaries: # Find the best semantic boundary near our calculated boundary best_boundary = end_idx for boundary in boundaries: if start_idx < boundary < end_idx: # Prefer boundaries closer to our calculated end if abs(boundary - end_idx) < abs(best_boundary - end_idx): best_boundary = boundary end_idx = best_boundary chunk_content = content[start_idx:end_idx] # Create metadata metadata = self._create_chunk_metadata( chunk_content, content_type, chunk_index=i, total_chunks=total_chunks ) # Generate embedding embedding = self.generate_semantic_embedding(chunk_content) chunk = IntelligentChunk( chunk_id=f"chunk_{i}", content=chunk_content, chunk_index=i, total_chunks=total_chunks, file_hash=file_hash, metadata=metadata, semantic_embedding=embedding, timestamp=datetime.now().isoformat() ) chunks.append(chunk) # Update total chunks for chunk in chunks: chunk.total_chunks = len(chunks) return chunks def _create_chunk_metadata(self, content: str, content_type: str, chunk_index: int, total_chunks: int) -> ChunkMetadata: """Create metadata for a chunk.""" # Extract topics topics = self.extract_semantic_topics(content) primary_topic = topics[0] if topics else 'general' # Calculate scores importance_score = self.calculate_importance_score(content, content_type) readability_score = self.calculate_readability_score(content) sentiment_score = self.calculate_sentiment_score(content) entity_count = self.extract_entities(content) language = self.detect_language(content) # Generate context connections (simplified) context_connections = [] if chunk_index > 0: context_connections.append(f"chunk_{chunk_index-1}") if chunk_index < total_chunks - 1: context_connections.append(f"chunk_{chunk_index+1}") return ChunkMetadata( chunk_id=f"chunk_{chunk_index}", content_type=content_type, semantic_topic=primary_topic, importance_score=importance_score, context_connections=context_connections, language=language, readability_score=readability_score, entity_count=entity_count, sentiment_score=sentiment_score ) def cluster_chunks_by_semantics(self, chunks: List[IntelligentChunk], n_clusters: int = None) -> Dict[int, List[IntelligentChunk]]: """Cluster chunks by semantic similarity.""" if not chunks or not any(chunk.semantic_embedding is not None for chunk in chunks): return {0: chunks} # Get embeddings embeddings = [] valid_chunks = [] for chunk in chunks: if chunk.semantic_embedding is not None: embeddings.append(chunk.semantic_embedding) valid_chunks.append(chunk) if len(embeddings) < 2: return {0: chunks} embeddings = np.array(embeddings) # Determine number of clusters if n_clusters is None: n_clusters = min(max(2, len(chunks) // 5), 10) # Perform clustering kmeans = KMeans(n_clusters=n_clusters, random_state=42) cluster_labels = kmeans.fit_predict(embeddings) # Group chunks by cluster clusters = {} for i, chunk in enumerate(valid_chunks): cluster_id = int(cluster_labels[i]) if cluster_id not in clusters: clusters[cluster_id] = [] clusters[cluster_id].append(chunk) return clusters def create_semantic_summary(self, chunks: List[IntelligentChunk]) -> Dict[str, Any]: """Create semantic summary of chunks.""" if not chunks: return {} # Aggregate metadata content_types = {} topics = {} languages = {} importance_scores = [] readability_scores = [] sentiment_scores = [] for chunk in chunks: # Content types ct = chunk.metadata.content_type content_types[ct] = content_types.get(ct, 0) + 1 # Topics topic = chunk.metadata.semantic_topic topics[topic] = topics.get(topic, 0) + 1 # Languages lang = chunk.metadata.language languages[lang] = languages.get(lang, 0) + 1 # Scores importance_scores.append(chunk.metadata.importance_score) readability_scores.append(chunk.metadata.readability_score) sentiment_scores.append(chunk.metadata.sentiment_score) return { 'total_chunks': len(chunks), 'content_types': content_types, 'topics': topics, 'languages': languages, 'avg_importance': np.mean(importance_scores) if importance_scores else 0, 'avg_readability': np.mean(readability_scores) if readability_scores else 0, 'avg_sentiment': np.mean(sentiment_scores) if sentiment_scores else 0, 'total_entities': sum(chunk.metadata.entity_count for chunk in chunks) } def main(): """Demo the intelligent chunking processor.""" print("🧠 Intelligent Chunking Processor Demo") print("=" * 50) # Initialize processor processor = IntelligentChunkingProcessor() # Demo content demo_content = """ # Machine Learning Fundamentals Machine learning is a subset of artificial intelligence that focuses on algorithms and statistical models. ## Key Concepts ### Supervised Learning Supervised learning uses labeled training data to learn a mapping from inputs to outputs. ```python from sklearn.linear_model import LinearRegression model = LinearRegression() model.fit(X_train, y_train) predictions = model.predict(X_test) ``` ### Unsupervised Learning Unsupervised learning finds hidden patterns in data without labeled examples. The K-means algorithm is a popular clustering method: $$\\sum_{i=1}^{k} \\sum_{x \\in C_i} ||x - \\mu_i||^2$$ ## Applications Machine learning has numerous applications in: - Computer vision - Natural language processing - Recommendation systems - Autonomous vehicles This technology is revolutionizing many industries and creating new opportunities. """ # Create intelligent chunks print(f"\n📝 Processing content ({len(demo_content)} characters)...") file_hash = hashlib.sha256(demo_content.encode()).hexdigest() chunks = processor.create_intelligent_chunks(demo_content, file_hash) print(f"✅ Created {len(chunks)} intelligent chunks") # Show chunk details for i, chunk in enumerate(chunks): print(f"\n📄 Chunk {i+1}:") print(f" Content type: {chunk.metadata.content_type}") print(f" Topic: {chunk.metadata.semantic_topic}") print(f" Importance: {chunk.metadata.importance_score:.2f}") print(f" Readability: {chunk.metadata.readability_score:.2f}") print(f" Entities: {chunk.metadata.entity_count}") print(f" Language: {chunk.metadata.language}") print(f" Content preview: {chunk.content[:100]}...") # Create semantic summary summary = processor.create_semantic_summary(chunks) print(f"\n📊 Semantic Summary:") print(f" Total chunks: {summary['total_chunks']}") print(f" Content types: {summary['content_types']}") print(f" Topics: {summary['topics']}") print(f" Average importance: {summary['avg_importance']:.2f}") print(f" Average readability: {summary['avg_readability']:.2f}") print(f" Total entities: {summary['total_entities']}") print(f"\n✅ Intelligent chunking processor ready!") if __name__ == "__main__": main()