Spaces:
Sleeping
Sleeping
| """ | |
| TokenChunker.py | |
| A module for token-based document chunking with configurable overlap and preprocessing. | |
| Features: | |
| - Token-based document splitting with overlap | |
| - Content validation and token counting | |
| - Smart boundary detection to preserve word integrity | |
| - Compatible with multiple tokenizer types (tiktoken, transformers, basic) | |
| """ | |
| import logging | |
| import re | |
| from typing import List, Optional, Dict, Any | |
| from langchain_core.documents import Document | |
| from core.BaseChunker import BaseChunker | |
| logger = logging.getLogger(__name__) | |
class TokenChunker(BaseChunker):
    """Handles document chunking at the token level with configurable overlap."""

    def __init__(
        self,
        model_name=None,
        embedding_model=None,
        chunk_size: int = 256,
        chunk_overlap: int = 50,
        min_chunk_size: int = 50
    ):
        """
        Initialize token chunker with specified models and parameters.

        Args:
            model_name: Name of the model for tokenization
            embedding_model: Model for generating embeddings
            chunk_size: Maximum tokens per chunk (must be positive)
            chunk_overlap: Number of tokens to overlap between chunks
                (must be non-negative and strictly less than chunk_size)
            min_chunk_size: Minimum tokens for a valid chunk (must be positive)

        Raises:
            ValueError: If any chunking parameter is out of range.
        """
        # Validate chunking parameters BEFORE base-class initialization so a
        # bad configuration fails fast, without any side effects from
        # BaseChunker.__init__ (e.g. model/tokenizer setup).
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be less than chunk_size")
        if min_chunk_size <= 0:
            raise ValueError("min_chunk_size must be positive")

        super().__init__(model_name, embedding_model)

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
        # Per-document, human-readable chunking log; reset by the processing
        # entry points before each document.
        self.chunk_stats = []

        # Lazy %-style args: formatting is skipped when INFO is disabled.
        logger.info(
            "TokenChunker initialized: chunk_size=%s, overlap=%s, min_size=%s",
            chunk_size, chunk_overlap, min_chunk_size,
        )
| def _smart_tokenize(self, text: str) -> List[str]: | |
| """ | |
| Tokenize text while preserving word boundaries for reconstruction. | |
| Args: | |
| text: The text content to tokenize | |
| Returns: | |
| List of tokens that can be cleanly rejoined | |
| """ | |
| if not text.strip(): | |
| return [] | |
| try: | |
| if self.uses_tiktoken: | |
| # For tiktoken, we need a hybrid approach to preserve boundaries | |
| return self._tiktoken_boundary_aware_split(text) | |
| elif hasattr(self.tokenizer, 'tokenize'): | |
| # For transformers tokenizers | |
| tokens = self.tokenizer.tokenize(text) | |
| return self._clean_subword_tokens(tokens) | |
| else: | |
| # Fallback to intelligent word splitting | |
| return self._word_boundary_split(text) | |
| except Exception as e: | |
| logger.warning(f"Tokenization failed: {e}. Using word boundary fallback.") | |
| return self._word_boundary_split(text) | |
| def _tiktoken_boundary_aware_split(self, text: str) -> List[str]: | |
| """ | |
| Split text in a way that's compatible with tiktoken while preserving boundaries. | |
| Args: | |
| text: Input text | |
| Returns: | |
| List of text segments that approximate tokens | |
| """ | |
| # Get actual token count for validation | |
| target_token_count = self.count_tokens(text) | |
| # Split on natural boundaries (spaces, punctuation) | |
| words = re.findall(r'\S+|\s+', text) | |
| # If we have roughly the right number of words, use them | |
| if abs(len(words) - target_token_count) / max(target_token_count, 1) < 0.3: | |
| return [w for w in words if w.strip()] | |
| # Otherwise, use a more granular split | |
| segments = re.findall(r'\w+|[^\w\s]|\s+', text) | |
| return [s for s in segments if s.strip()] | |
| def _clean_subword_tokens(self, tokens: List[str]) -> List[str]: | |
| """ | |
| Clean subword tokens for better reconstruction. | |
| Args: | |
| tokens: Raw tokens from tokenizer | |
| Returns: | |
| Cleaned tokens | |
| """ | |
| cleaned = [] | |
| for token in tokens: | |
| # Remove special tokens but keep the content | |
| if token.startswith('##'): | |
| # BERT-style subwords | |
| cleaned.append(token[2:]) | |
| elif token.startswith('▁'): | |
| # SentencePiece-style | |
| cleaned.append(' ' + token[1:]) | |
| else: | |
| cleaned.append(token) | |
| return [t for t in cleaned if t.strip()] | |
| def _word_boundary_split(self, text: str) -> List[str]: | |
| """ | |
| Split text on word boundaries as fallback tokenization. | |
| Args: | |
| text: Input text | |
| Returns: | |
| List of words | |
| """ | |
| # Split on whitespace but preserve some punctuation as separate tokens | |
| tokens = re.findall(r'\w+|[.!?;,]', text) | |
| return tokens | |
| def _detokenize(self, tokens: List[str]) -> str: | |
| """ | |
| Reconstruct text from tokens, handling different tokenizer types. | |
| Args: | |
| tokens: List of token strings | |
| Returns: | |
| Reconstructed text | |
| """ | |
| if not tokens: | |
| return "" | |
| if self.uses_tiktoken or not hasattr(self.tokenizer, 'tokenize'): | |
| # For tiktoken and basic tokenizers, use space joining with smart spacing | |
| result = "" | |
| for i, token in enumerate(tokens): | |
| if not token.strip(): | |
| continue | |
| if i == 0: | |
| result = token | |
| elif token in '.,!?;:': | |
| result += token | |
| elif result and result[-1] in '.,!?;:': | |
| result += " " + token | |
| else: | |
| result += " " + token | |
| return result | |
| else: | |
| # For transformers tokenizers, handle subword reconstruction | |
| text = "".join(tokens) | |
| # Clean up spacing around punctuation | |
| text = re.sub(r'\s+([.!?;,])', r'\1', text) | |
| text = re.sub(r'\s+', ' ', text) | |
| return text.strip() | |
| def _create_token_chunks(self, tokens: List[str]) -> List[List[str]]: | |
| """ | |
| Split tokens into overlapping chunks of specified size. | |
| Args: | |
| tokens: List of token strings | |
| Returns: | |
| List of token chunks | |
| """ | |
| if not tokens: | |
| return [] | |
| chunks = [] | |
| start = 0 | |
| while start < len(tokens): | |
| # Calculate end position for this chunk | |
| end = min(start + self.chunk_size, len(tokens)) | |
| # Extract the chunk | |
| chunk_tokens = tokens[start:end] | |
| # Only add chunks that meet minimum size requirement | |
| if len(chunk_tokens) >= self.min_chunk_size: | |
| chunks.append(chunk_tokens) | |
| self.chunk_stats.append(f"Created chunk with {len(chunk_tokens)} tokens") | |
| else: | |
| self.chunk_stats.append(f"Skipped small chunk with {len(chunk_tokens)} tokens") | |
| # Break if we've reached the end | |
| if end >= len(tokens): | |
| break | |
| # Calculate next start position with overlap | |
| start = end - self.chunk_overlap | |
| # Ensure forward progress | |
| if start <= 0: | |
| start = end | |
| return chunks | |
| def _process_single_chunk(self, chunk_tokens: List[str], chunk_index: int, | |
| source_metadata: Dict[str, Any]) -> Optional[Document]: | |
| """ | |
| Process a single token chunk into a Document with metadata. | |
| Args: | |
| chunk_tokens: List of tokens for this chunk | |
| chunk_index: Index of this chunk in the document | |
| source_metadata: Metadata from source document | |
| Returns: | |
| Document object with processed content and metadata, or None if invalid | |
| """ | |
| # Reconstruct text from tokens | |
| chunk_text = self._detokenize(chunk_tokens) | |
| # Validate chunk content | |
| if not self.is_content_valid(chunk_text, min_tokens=self.min_chunk_size): | |
| self.chunk_stats.append(f"Chunk {chunk_index} failed validation") | |
| return None | |
| # Analyze the chunk content | |
| stats = self.analyze_text(chunk_text) | |
| # Create comprehensive metadata | |
| metadata = source_metadata.copy() | |
| metadata.update({ | |
| "chunk_index": chunk_index, | |
| "chunk_type": "token", | |
| "chunking_method": "token_based", | |
| "token_count": len(chunk_tokens), | |
| "char_count": stats["char_count"], | |
| "sentence_count": stats["sentence_count"], | |
| "word_count": stats["word_count"], | |
| "chunk_size_limit": self.chunk_size, | |
| "chunk_overlap": self.chunk_overlap | |
| }) | |
| return Document(page_content=chunk_text, metadata=metadata) | |
    def token_process_document(self, file_path: str, preprocess: bool = True) -> List[Document]:
        """
        Process document using token-based chunking with overlap.

        All valid pages are concatenated into a single text stream (so chunks
        may span page boundaries), the stream is tokenized, split into
        overlapping token windows, and each window becomes one Document.

        Args:
            file_path: Path to the document file
            preprocess: Whether to preprocess text content

        Returns:
            List of Document objects, one per valid token chunk

        Raises:
            Exception: Any error from loading/tokenizing is logged and re-raised.
        """
        try:
            self.chunk_stats = []  # Reset stats for this document
            raw_pages = self.load_document(file_path)
            processed_chunks = []
            logger.info(f"Processing document with {len(raw_pages)} pages using token chunking")
            # Combine all pages into a single text for token-based processing
            full_text = ""
            combined_metadata = {}
            page_info = []  # Track which pages contributed to the text
            for page_idx, page in enumerate(raw_pages):
                content = page.page_content
                # Skip invalid content
                if not self.is_content_valid(content):
                    logger.debug(f"Skipping invalid content on page {page_idx + 1}")
                    continue
                # Preprocess if requested; re-validate since preprocessing can
                # strip a page down to nothing
                if preprocess:
                    content = self.preprocess_text(content)
                    if not self.is_content_valid(content):
                        continue
                # Track page information (1-based page numbers)
                page_info.append({
                    "page_number": page_idx + 1,
                    "original_metadata": page.metadata
                })
                # Combine text with page separation
                if full_text:
                    full_text += "\n\n" + content
                else:
                    full_text = content
                    # Use metadata from first valid page as base
                    combined_metadata = page.metadata.copy()
            # Update combined metadata to reflect all pages
            if page_info:
                combined_metadata.update({
                    "total_pages_processed": len(page_info),
                    "page_range": f"{page_info[0]['page_number']}-{page_info[-1]['page_number']}",
                    "source_pages": [str(p["page_number"]) for p in page_info]  # stringified for serialization-friendly metadata
                })
                # Remove the single "page" field since this represents multiple pages
                combined_metadata.pop("page", None)
            if not full_text.strip():
                logger.warning("No valid content found in document")
                return []
            # Tokenize the entire document
            all_tokens = self._smart_tokenize(full_text)
            logger.info(f"Document tokenized into {len(all_tokens)} tokens")
            # A document shorter than one minimum chunk cannot produce output
            if len(all_tokens) < self.min_chunk_size:
                logger.warning(f"Document too short for chunking ({len(all_tokens)} tokens)")
                return []
            # Create overlapping token chunks
            token_chunks = self._create_token_chunks(all_tokens)
            logger.info(f"Created {len(token_chunks)} token chunks")
            # Convert token chunks to Document objects (invalid chunks return None)
            for chunk_idx, chunk_tokens in enumerate(token_chunks):
                chunk_doc = self._process_single_chunk(
                    chunk_tokens,
                    chunk_idx,
                    combined_metadata
                )
                if chunk_doc:
                    processed_chunks.append(chunk_doc)
            # Output processing statistics accumulated in chunk_stats
            if self.chunk_stats:
                logger.info("\n".join(self.chunk_stats))
            logger.info(f"Processed {len(processed_chunks)} valid token chunks")
            return processed_chunks
        except Exception as e:
            logger.error(f"Error in token_process_document: {e}")
            raise
| def process_document(self, file_path: str, preprocess: bool = True) -> List[Document]: | |
| """ | |
| Process document using token chunking strategy (implements abstract method). | |
| Args: | |
| file_path: Path to the document file | |
| preprocess: Whether to preprocess text content | |
| Returns: | |
| List of Document objects, one per valid token chunk | |
| """ | |
| return self.token_process_document(file_path, preprocess) | |
    def process_text_file(self, file_path: str, preprocess: bool = True) -> List[Document]:
        """
        Process text file directly using token-based chunking with overlap.

        Unlike token_process_document, this bypasses the page-based loader:
        the raw file is read, cleaned, tokenized as one stream, and split
        into overlapping token windows. Metadata is file-level only.

        Args:
            file_path: Path to the text file
            preprocess: Whether to preprocess text content

        Returns:
            List of Document objects, one per valid token chunk

        Raises:
            Exception: Any error during processing is logged and re-raised.
        """
        try:
            from pathlib import Path
            from datetime import datetime
            self.chunk_stats = []  # Reset stats for this document
            # Load the text file directly
            content = self.load_text_file(file_path)
            # Clean the text using the same logic as PDF conversion
            content = self.clean_text_for_processing(content)
            # Basic validation
            if not self.is_content_valid(content):
                logger.warning("Text file content failed validation")
                return []
            # Light preprocessing if requested (no header/footer removal for txt files)
            if preprocess:
                # Only apply basic text cleaning, not aggressive preprocessing
                content = ' '.join(content.split())  # Normalize whitespace
            # Create file-level metadata (no per-page info for plain text)
            file_path_obj = Path(file_path)
            file_metadata = {
                "source": file_path,
                "file_name": file_path_obj.name,
                "file_type": "txt",
                "total_characters": len(content),
                "processing_timestamp": datetime.now().isoformat(),
            }
            logger.info(f"Processing text file: {file_path_obj.name} ({len(content)} characters)")
            # Tokenize the entire document
            all_tokens = self._smart_tokenize(content)
            logger.info(f"Text file tokenized into {len(all_tokens)} tokens")
            # A file shorter than one minimum chunk cannot produce output
            if len(all_tokens) < self.min_chunk_size:
                logger.warning(f"Text file too short for chunking ({len(all_tokens)} tokens)")
                return []
            # Create overlapping token chunks
            token_chunks = self._create_token_chunks(all_tokens)
            logger.info(f"Created {len(token_chunks)} token chunks from text file")
            # Convert token chunks to Document objects (invalid chunks return None)
            processed_chunks = []
            for chunk_idx, chunk_tokens in enumerate(token_chunks):
                chunk_doc = self._process_single_chunk(
                    chunk_tokens,
                    chunk_idx,
                    file_metadata
                )
                if chunk_doc:
                    processed_chunks.append(chunk_doc)
            # Output processing statistics accumulated in chunk_stats
            if self.chunk_stats:
                logger.info("\n".join(self.chunk_stats))
            logger.info(f"Processed {len(processed_chunks)} valid token chunks from text file")
            return processed_chunks
        except Exception as e:
            logger.error(f"Error processing text file: {e}")
            raise