Spaces:
Sleeping
Sleeping
| """ | |
| Text Chunker Module | |
| Handles chunking text into smaller pieces with overlap for better context preservation. | |
| """ | |
| import re | |
| from typing import List | |
| from config.config import CHUNK_SIZE, CHUNK_OVERLAP | |
class TextChunker:
    """Handles text chunking with overlap and smart boundary detection."""

    def __init__(self):
        """Initialize the text chunker from module-level configuration."""
        # Target characters per chunk, and how many characters consecutive
        # chunks share so context is preserved across chunk borders.
        self.chunk_size = CHUNK_SIZE
        self.chunk_overlap = CHUNK_OVERLAP

    def chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into smaller pieces with overlap.

        Args:
            text: The input text to chunk

        Returns:
            List[str]: List of text chunks. Fragments of 50 characters or
            fewer are dropped as noise, so short trailing text may be omitted.
        """
        print(f"✂️ Chunking text into {self.chunk_size} character chunks with {self.chunk_overlap} overlap")

        # Clean the text
        cleaned_text = self._clean_text(text)

        chunks: List[str] = []
        start = 0
        while start < len(cleaned_text):
            end = start + self.chunk_size

            # Prefer ending at a sentence boundary, except for the tail chunk.
            if end < len(cleaned_text):
                end = self._find_sentence_boundary(cleaned_text, start, end)

            chunk = cleaned_text[start:end].strip()
            # Only add chunk if it's meaningful
            if chunk and len(chunk) > 50:
                chunks.append(chunk)

            # Move the window forward with overlap, but ALWAYS advance by at
            # least one character: if the sentence boundary landed near
            # `start`, a plain `end - overlap` can step backwards (even below
            # zero) and the loop would never terminate.
            start = max(end - self.chunk_overlap, start + 1)

        print(f"✅ Created {len(chunks)} chunks (size={self.chunk_size}, overlap={self.chunk_overlap})")
        return chunks

    def _clean_text(self, text: str) -> str:
        """
        Clean text by normalizing whitespace and removing excessive line breaks.

        Args:
            text: Raw text to clean

        Returns:
            str: Cleaned text — every run of whitespace collapsed to a single
            space and leading/trailing whitespace removed.
        """
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _find_sentence_boundary(self, text: str, start: int, preferred_end: int) -> int:
        """
        Find the best sentence boundary near the preferred end position.

        Scans backwards from ``preferred_end`` (at most 100 characters, never
        past ``start``) for a '.', '!' or '?' that looks like a genuine
        sentence ending.

        Args:
            text: The full text
            start: Start position of the chunk
            preferred_end: Preferred end position

        Returns:
            int: Position just past the sentence-ending punctuation, or
            ``preferred_end`` unchanged when no boundary is found.
        """
        # Bound the backward search so chunks stay close to chunk_size.
        search_start = max(start, preferred_end - 100)
        sentence_endings = ('.', '!', '?')

        # Search backwards from preferred end for a sentence boundary.
        for i in range(preferred_end - 1, search_start - 1, -1):
            if text[i] in sentence_endings and self._is_valid_sentence_ending(text, i):
                return i + 1
        return preferred_end

    def _is_valid_sentence_ending(self, text: str, pos: int) -> bool:
        """
        Check if a punctuation mark represents a valid sentence ending.

        Args:
            text: The full text
            pos: Position of the punctuation mark

        Returns:
            bool: True if it's a valid sentence ending
        """
        # Avoid breaking on abbreviations like "Dr.", "Mr.", etc. We compare
        # the whole word before the period against the abbreviation set;
        # gating on an uppercase char immediately before '.' would never
        # match these abbreviations (they all end in a lowercase letter).
        if text[pos] == '.' and pos > 0:
            word_start = pos
            while word_start > 0 and text[word_start - 1].isalpha():
                word_start -= 1
            word = text[word_start:pos]
            # Common abbreviations to avoid breaking on
            abbreviations = {'Dr', 'Mr', 'Mrs', 'Ms', 'Prof', 'Inc', 'Ltd', 'Corp', 'Co'}
            if word in abbreviations:
                return False

        # A real sentence ending is followed by whitespace or a capital letter.
        if pos + 1 < len(text):
            next_char = text[pos + 1]
            return next_char.isspace() or next_char.isupper()
        # Punctuation at the very end of the text counts as an ending.
        return True

    def get_chunk_stats(self, chunks: List[str]) -> dict:
        """
        Get statistics about the created chunks.

        Args:
            chunks: List of text chunks

        Returns:
            dict: total_chunks, total_characters, total_words,
            avg_chunk_size, min_chunk_size, max_chunk_size.
        """
        if not chunks:
            # Short-circuit: avoids ZeroDivisionError and min()/max() on empty.
            return {
                "total_chunks": 0,
                "total_characters": 0,
                "total_words": 0,
                "avg_chunk_size": 0,
                "min_chunk_size": 0,
                "max_chunk_size": 0
            }

        chunk_sizes = [len(chunk) for chunk in chunks]
        total_chars = sum(chunk_sizes)
        total_words = sum(len(chunk.split()) for chunk in chunks)

        return {
            "total_chunks": len(chunks),
            "total_characters": total_chars,
            "total_words": total_words,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(chunk_sizes),
            "max_chunk_size": max(chunk_sizes)
        }