# DEPENDENCIES
import re
from abc import ABC
from abc import abstractmethod
from pathlib import Path
from typing import List
from typing import Optional

from config.models import DocumentChunk
from config.models import DocumentMetadata
from config.models import ChunkingStrategy
from config.logging_config import get_logger
from chunking.token_counter import count_tokens

# Setup Logging
logger = get_logger(__name__)


class BaseChunker(ABC):
    """
    Abstract base class for all chunking strategies:
    Implements Template Method pattern for consistent chunking pipeline
    """

    def __init__(self, strategy_name: ChunkingStrategy):
        """
        Initialize base chunker

        Arguments:
        ----------
        strategy_name { ChunkingStrategy } : Chunking strategy enum
        """
        self.strategy_name = strategy_name
        self.logger = logger

    @abstractmethod
    def chunk_text(self, text: str, metadata: Optional[DocumentMetadata] = None) -> List[DocumentChunk]:
        """
        Chunk text into smaller pieces - must be implemented by subclasses

        Arguments:
        ----------
        text { str } : Input text to chunk

        metadata { DocumentMetadata } : Document metadata

        Returns:
        --------
        { list } : List of DocumentChunk objects
        """
        pass

    def chunk_document(self, text: str, metadata: DocumentMetadata) -> List[DocumentChunk]:
        """
        Chunk document with full metadata: Template method that calls
        chunk_text and adds metadata

        Arguments:
        ----------
        text { str } : Document text

        metadata { DocumentMetadata } : Document metadata (mutated in place:
                                        num_chunks and chunking_strategy are set)

        Returns:
        --------
        { list } : List of DocumentChunk objects with metadata

        Raises:
        -------
        Exception : Re-raises whatever the concrete chunk_text implementation raises
        """
        try:
            self.logger.info(f"Chunking document {metadata.document_id} using {self.strategy_name.value}")

            # Validate input: whitespace-only text is treated the same as empty
            if not text or not text.strip():
                self.logger.warning(f"Empty text for document {metadata.document_id}")
                return []

            # Perform chunking (delegated to the concrete strategy)
            chunks = self.chunk_text(text=text, metadata=metadata)

            # Update metadata on the caller's object
            metadata.num_chunks = len(chunks)
            metadata.chunking_strategy = self.strategy_name

            # Validate chunks — warn-only: invalid chunks are still returned
            if not self.validate_chunks(chunks):
                self.logger.warning(f"Chunk validation failed for {metadata.document_id}")

            self.logger.info(f"Created {len(chunks)} chunks for {metadata.document_id}")
            return chunks

        except Exception as e:
            self.logger.error(f"Chunking failed for {metadata.document_id}: {repr(e)}")
            raise

    def _create_chunk(self,
                      text: str,
                      chunk_index: int,
                      document_id: str,
                      start_char: int,
                      end_char: int,
                      page_number: Optional[int] = None,
                      section_title: Optional[str] = None,
                      metadata: Optional[dict] = None) -> DocumentChunk:
        """
        Create a DocumentChunk object with proper formatting

        Arguments:
        ----------
        text { str } : Chunk text

        chunk_index { int } : Index of chunk in document

        document_id { str } : Parent document ID

        start_char { int } : Start character position

        end_char { int } : End character position

        page_number { int } : Page number (if applicable)

        section_title { str } : Section heading (CRITICAL for retrieval)

        metadata { dict } : Additional metadata

        Returns:
        --------
        { DocumentChunk } : DocumentChunk object
        """
        # Generate unique chunk ID (unique per document + position)
        chunk_id = f"chunk_{document_id}_{chunk_index}"

        # Count tokens for this chunk's text
        token_count = count_tokens(text)

        # Create chunk with section context
        chunk = DocumentChunk(chunk_id=chunk_id,
                              document_id=document_id,
                              text=text,
                              chunk_index=chunk_index,
                              start_char=start_char,
                              end_char=end_char,
                              page_number=page_number,
                              section_title=section_title,
                              token_count=token_count,
                              metadata=metadata or {},
                              )

        return chunk

    def _extract_page_number(self, text: str, full_text: str) -> Optional[int]:
        """
        Try to extract page number from text: Looks for [PAGE N] markers
        inserted during parsing

        Arguments:
        ----------
        text { str } : Chunk text to inspect

        full_text { str } : Full document text used as a positional fallback

        Returns:
        --------
        { int | None } : Page number, or None if no marker could be found
        """
        # Look for page markers in current chunk
        page_match = re.search(r'\[PAGE (\d+)\]', text)
        if page_match:
            return int(page_match.group(1))

        # Alternative: try to determine from position in full text.
        # Locate the chunk by its first (up to) 200 chars, then take the
        # last [PAGE N] marker that appears before that position.
        if full_text:
            chunk_start = full_text.find(text[:min(200, len(text))])
            if (chunk_start >= 0):
                text_before = full_text[:chunk_start]
                page_matches = re.findall(r'\[PAGE (\d+)\]', text_before)
                if page_matches:
                    return int(page_matches[-1])

        return None

    def _clean_chunk_text(self, text: str) -> str:
        """
        Clean chunk text by removing markers and extra whitespace

        Arguments:
        ----------
        text { str } : Raw chunk text

        Returns:
        --------
        { str } : Cleaned text
        """
        # Remove page markers
        text = re.sub(r'\[PAGE \d+\]', '', text)

        # Remove other common markers
        text = re.sub(r'\[HEADER\]|\[FOOTER\]|\[TABLE \d+\]', '', text)

        # Normalize whitespace (all runs collapse to a single space)
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()

        return text

    def validate_chunks(self, chunks: List[DocumentChunk]) -> bool:
        """
        Validate chunk list for consistency

        Only multiple document IDs cause a hard failure; all other issues
        (non-sequential indices, empty chunks, zero-token chunks) are
        logged as warnings and still return True.

        Arguments:
        ----------
        chunks { list } : List of chunks to validate

        Returns:
        --------
        { bool } : True if valid
        """
        if not chunks:
            return True

        # Check all chunks have the same document_id
        doc_ids = {chunk.document_id for chunk in chunks}
        if (len(doc_ids) > 1):
            self.logger.error(f"Chunks have multiple document IDs: {doc_ids}")
            return False

        # Check chunk indices are sequential
        indices = [chunk.chunk_index for chunk in chunks]
        expected_indices = list(range(len(chunks)))
        if (indices != expected_indices):
            self.logger.warning(f"Non-sequential chunk indices: {indices}")

        # Check for empty chunks
        empty_chunks = [c.chunk_index for c in chunks if not c.text.strip()]
        if empty_chunks:
            self.logger.warning(f"Empty chunks at indices: {empty_chunks}")

        # Check token counts
        zero_token_chunks = [c.chunk_index for c in chunks if (c.token_count == 0)]
        if zero_token_chunks:
            self.logger.warning(f"Zero-token chunks at indices: {zero_token_chunks}")

        # NEW: Check section_title preservation (important for structured documents)
        chunks_with_sections = [c for c in chunks if c.section_title]
        if chunks_with_sections:
            self.logger.info(f"{len(chunks_with_sections)}/{len(chunks)} chunks have section titles preserved")

        return True

    def get_chunk_statistics(self, chunks: List[DocumentChunk]) -> dict:
        """
        Calculate statistics for chunk list

        Arguments:
        ----------
        chunks { list } : List of chunks

        Returns:
        --------
        { dict } : Dictionary with statistics
        """
        if not chunks:
            # FIX: keep the same key schema as the non-empty branch so callers
            # reading stats["strategy"] / stats["section_coverage_pct"] do not
            # hit a KeyError on empty documents
            return {"num_chunks": 0,
                    "total_tokens": 0,
                    "avg_tokens_per_chunk": 0,
                    "min_tokens": 0,
                    "max_tokens": 0,
                    "total_chars": 0,
                    "avg_chars_per_chunk": 0,
                    "strategy": self.strategy_name.value,
                    "chunks_with_sections": 0,
                    "section_coverage_pct": 0.0,
                    }

        token_counts = [c.token_count for c in chunks]
        char_counts = [len(c.text) for c in chunks]
        chunks_with_sections = sum(1 for c in chunks if c.section_title)

        stats = {"num_chunks": len(chunks),
                 "total_tokens": sum(token_counts),
                 "avg_tokens_per_chunk": sum(token_counts) / len(chunks),
                 "min_tokens": min(token_counts),
                 "max_tokens": max(token_counts),
                 "total_chars": sum(char_counts),
                 "avg_chars_per_chunk": sum(char_counts) / len(chunks),
                 "strategy": self.strategy_name.value,
                 "chunks_with_sections": chunks_with_sections,
                 "section_coverage_pct": (chunks_with_sections / len(chunks)) * 100,
                 }

        return stats

    def _merge_group(self,
                     group: List[DocumentChunk],
                     chunk_index: int,
                     document_id: str) -> DocumentChunk:
        """
        Build one merged chunk from a group of consecutive chunks

        NOTE: the merged chunk inherits page_number and section_title from
        the FIRST chunk in the group (matches the original behavior).

        Arguments:
        ----------
        group { list } : Non-empty list of consecutive chunks to merge

        chunk_index { int } : Index to assign to the merged chunk

        document_id { str } : Parent document ID

        Returns:
        --------
        { DocumentChunk } : Merged chunk
        """
        merged_text = " ".join(c.text for c in group)
        return self._create_chunk(text=merged_text,
                                  chunk_index=chunk_index,
                                  document_id=document_id,
                                  start_char=group[0].start_char,
                                  end_char=group[-1].end_char,
                                  page_number=group[0].page_number,
                                  section_title=group[0].section_title,
                                  )

    def merge_chunks(self, chunks: List[DocumentChunk], max_tokens: int) -> List[DocumentChunk]:
        """
        Merge small chunks up to max_tokens: Useful for optimizing chunk sizes

        Arguments:
        ----------
        chunks { list } : List of chunks to merge

        max_tokens { int } : Maximum tokens per merged chunk

        Returns:
        --------
        { list } : List of merged chunks
        """
        if not chunks:
            return []

        merged = list()
        current_chunks = list()
        current_tokens = 0
        document_id = chunks[0].document_id

        for chunk in chunks:
            if ((current_tokens + chunk.token_count) <= max_tokens):
                current_chunks.append(chunk)
                current_tokens += chunk.token_count
            else:
                # Save current merged chunk (FIX: flush logic was duplicated
                # inline here and after the loop — now shared via _merge_group)
                if current_chunks:
                    merged.append(self._merge_group(group=current_chunks,
                                                    chunk_index=len(merged),
                                                    document_id=document_id))

                # Start new chunk
                current_chunks = [chunk]
                current_tokens = chunk.token_count

        # Add final merged chunk
        if current_chunks:
            merged.append(self._merge_group(group=current_chunks,
                                            chunk_index=len(merged),
                                            document_id=document_id))

        self.logger.info(f"Merged {len(chunks)} chunks into {len(merged)}")
        return merged

    def __str__(self) -> str:
        """
        String representation
        """
        return f"{self.__class__.__name__}(strategy={self.strategy_name.value})"

    def __repr__(self) -> str:
        """
        Detailed representation
        """
        return self.__str__()


class ChunkerConfig:
    """
    Configuration for chunking strategies: Provides a way to pass
    parameters to chunkers
    """

    def __init__(self,
                 chunk_size: int = 512,
                 overlap: int = 50,
                 respect_boundaries: bool = True,
                 min_chunk_size: int = 100,
                 **kwargs):
        """
        Initialize chunker configuration

        Arguments:
        ----------
        chunk_size { int } : Target chunk size in tokens

        overlap { int } : Overlap between chunks in tokens

        respect_boundaries { bool } : Respect sentence/paragraph/section boundaries

        min_chunk_size { int } : Minimum chunk size in tokens

        **kwargs : Additional strategy-specific parameters
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.respect_boundaries = respect_boundaries
        self.min_chunk_size = min_chunk_size
        self.extra = kwargs

    def to_dict(self) -> dict:
        """
        Convert to dictionary (strategy-specific extras are flattened in)
        """
        return {"chunk_size": self.chunk_size,
                "overlap": self.overlap,
                "respect_boundaries": self.respect_boundaries,
                "min_chunk_size": self.min_chunk_size,
                **self.extra
                }

    def __repr__(self) -> str:
        return f"ChunkerConfig({self.to_dict()})"