# Source: QuerySphere / chunking / base_chunker.py
# (repository page header captured together with the file:
#  "satyakimitra's picture", "first commit", 0a4529c)
# DEPENDENCIES
import re
from abc import ABC
from typing import List
from pathlib import Path
from typing import Optional
from abc import abstractmethod
from config.models import DocumentChunk
from config.models import DocumentMetadata
from config.models import ChunkingStrategy
from config.logging_config import get_logger
from chunking.token_counter import count_tokens
# Setup Logging
# Module-level logger obtained from the project's logging configuration;
# BaseChunker.__init__ stores this same object on each instance as `self.logger`
logger = get_logger(__name__)
class BaseChunker(ABC):
    """
    Abstract base class for all chunking strategies: Implements Template Method pattern for consistent chunking pipeline

    Subclasses implement `chunk_text`; `chunk_document` wraps that call with input validation,
    metadata bookkeeping, chunk validation and logging. The remaining methods are shared helpers
    (chunk construction, marker handling, statistics, merging).
    """

    # Marker patterns, compiled once so that every helper agrees on the marker syntax
    _PAGE_MARKER_RE = re.compile(r'\[PAGE (\d+)\]')
    _OTHER_MARKER_RE = re.compile(r'\[HEADER\]|\[FOOTER\]|\[TABLE \d+\]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, strategy_name: ChunkingStrategy):
        """
        Initialize base chunker

        Arguments:
        ----------
            strategy_name { ChunkingStrategy } : Chunking strategy enum
        """
        self.strategy_name = strategy_name
        self.logger = logger

    @abstractmethod
    def chunk_text(self, text: str, metadata: Optional[DocumentMetadata] = None) -> List[DocumentChunk]:
        """
        Chunk text into smaller pieces - must be implemented by subclasses

        Arguments:
        ----------
            text     { str }              : Input text to chunk

            metadata { DocumentMetadata } : Document metadata

        Returns:
        --------
                 { list }                 : List of DocumentChunk objects
        """
        pass

    def chunk_document(self, text: str, metadata: DocumentMetadata) -> List[DocumentChunk]:
        """
        Chunk document with full metadata: Template method that calls chunk_text and adds metadata

        Arguments:
        ----------
            text     { str }              : Document text

            metadata { DocumentMetadata } : Document metadata; `num_chunks` and `chunking_strategy`
                                            are updated in place when chunking is performed

        Returns:
        --------
                 { list }                 : List of DocumentChunk objects with metadata
                                            (empty list when the text is empty or whitespace-only)

        Raises:
        -------
            Any exception raised while chunking is logged and re-raised unchanged
        """
        try:
            self.logger.info(f"Chunking document {metadata.document_id} using {self.strategy_name.value}")

            # Validate input
            if not text or not text.strip():
                self.logger.warning(f"Empty text for document {metadata.document_id}")
                return []

            # Perform chunking
            chunks = self.chunk_text(text=text, metadata=metadata)

            # Update metadata
            metadata.num_chunks = len(chunks)
            metadata.chunking_strategy = self.strategy_name

            # Validate chunks: a failed validation is only reported, the chunks are still returned
            if not self.validate_chunks(chunks):
                self.logger.warning(f"Chunk validation failed for {metadata.document_id}")

            self.logger.info(f"Created {len(chunks)} chunks for {metadata.document_id}")
            return chunks

        except Exception as e:
            self.logger.error(f"Chunking failed for {metadata.document_id}: {repr(e)}")
            raise

    def _create_chunk(self, text: str, chunk_index: int, document_id: str, start_char: int, end_char: int, page_number: Optional[int] = None,
                      section_title: Optional[str] = None, metadata: Optional[dict] = None) -> DocumentChunk:
        """
        Create a DocumentChunk object with proper formatting

        Arguments:
        ----------
            text          { str }  : Chunk text

            chunk_index   { int }  : Index of chunk in document

            document_id   { str }  : Parent document ID

            start_char    { int }  : Start character position

            end_char      { int }  : End character position

            page_number   { int }  : Page number (if applicable)

            section_title { str }  : Section heading (CRITICAL for retrieval)

            metadata      { dict } : Additional metadata (an empty dict is used when omitted)

        Returns:
        --------
             { DocumentChunk }     : DocumentChunk object
        """
        # Chunk ID is derived from the document ID and the chunk's index
        chunk_id = f"chunk_{document_id}_{chunk_index}"

        # Count tokens
        token_count = count_tokens(text)

        # Create chunk with section context
        return DocumentChunk(
            chunk_id=chunk_id,
            document_id=document_id,
            text=text,
            chunk_index=chunk_index,
            start_char=start_char,
            end_char=end_char,
            page_number=page_number,
            section_title=section_title,
            token_count=token_count,
            metadata=metadata or {},
        )

    def _extract_page_number(self, text: str, full_text: str) -> Optional[int]:
        """
        Try to extract page number from text: Looks for [PAGE N] markers inserted during parsing

        Arguments:
        ----------
            text      { str } : Chunk text

            full_text { str } : Full document text (may be empty)

        Returns:
        --------
               { int }        : Number from the first marker inside the chunk; otherwise the number from
                                the last marker preceding the chunk in `full_text`; None if neither exists
        """
        # Look for page markers in current chunk
        marker_in_chunk = self._PAGE_MARKER_RE.search(text)
        if marker_in_chunk:
            return int(marker_in_chunk.group(1))

        # Alternative: locate the chunk in the full text (by its first 200 characters)
        # and use the closest marker that appears before it
        if full_text:
            chunk_start = full_text.find(text[:200])
            if chunk_start >= 0:
                preceding_pages = self._PAGE_MARKER_RE.findall(full_text[:chunk_start])
                if preceding_pages:
                    return int(preceding_pages[-1])

        return None

    def _clean_chunk_text(self, text: str) -> str:
        """
        Clean chunk text by removing markers and extra whitespace

        Arguments:
        ----------
            text { str } : Raw chunk text

        Returns:
        --------
             { str }     : Cleaned text
        """
        # Remove page markers
        text = self._PAGE_MARKER_RE.sub('', text)

        # Remove other common markers
        text = self._OTHER_MARKER_RE.sub('', text)

        # Normalize whitespace: collapse every whitespace run to a single space, then trim the ends
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def validate_chunks(self, chunks: List[DocumentChunk]) -> bool:
        """
        Validate chunk list for consistency

        Only mixed document IDs make the list invalid; non-sequential indices, empty chunks and
        zero-token chunks are reported as warnings but do not change the result.

        Arguments:
        ----------
            chunks { list } : List of chunks to validate

        Returns:
        --------
             { bool }       : True if valid (an empty list is valid)
        """
        if not chunks:
            return True

        # Check all chunks have the same document_id
        doc_ids = {chunk.document_id for chunk in chunks}
        if len(doc_ids) > 1:
            self.logger.error(f"Chunks have multiple document IDs: {doc_ids}")
            return False

        # Check chunk indices are sequential
        indices = [chunk.chunk_index for chunk in chunks]
        if indices != list(range(len(chunks))):
            self.logger.warning(f"Non-sequential chunk indices: {indices}")

        # Check for empty chunks
        empty_chunks = [c.chunk_index for c in chunks if not c.text.strip()]
        if empty_chunks:
            self.logger.warning(f"Empty chunks at indices: {empty_chunks}")

        # Check token counts
        zero_token_chunks = [c.chunk_index for c in chunks if c.token_count == 0]
        if zero_token_chunks:
            self.logger.warning(f"Zero-token chunks at indices: {zero_token_chunks}")

        # Check section_title preservation (important for structured documents)
        chunks_with_sections = [c for c in chunks if c.section_title]
        if chunks_with_sections:
            self.logger.info(f"{len(chunks_with_sections)}/{len(chunks)} chunks have section titles preserved")

        return True

    def get_chunk_statistics(self, chunks: List[DocumentChunk]) -> dict:
        """
        Calculate statistics for chunk list

        The returned dictionary always has the same keys, including for an empty list (where all
        numeric values are zero), so callers can index it without checking for emptiness first.

        Arguments:
        ----------
            chunks { list } : List of chunks

        Returns:
        --------
             { dict }       : Dictionary with statistics: num_chunks, total_tokens, avg_tokens_per_chunk,
                              min_tokens, max_tokens, total_chars, avg_chars_per_chunk, strategy,
                              chunks_with_sections, section_coverage_pct
        """
        num_chunks = len(chunks)
        token_counts = [c.token_count for c in chunks]
        total_tokens = sum(token_counts)
        total_chars = sum(len(c.text) for c in chunks)
        chunks_with_sections = sum(1 for c in chunks if c.section_title)

        # Averages and percentage fall back to zero for an empty list instead of dividing by zero
        return {
            "num_chunks": num_chunks,
            "total_tokens": total_tokens,
            "avg_tokens_per_chunk": total_tokens / num_chunks if num_chunks else 0,
            "min_tokens": min(token_counts, default=0),
            "max_tokens": max(token_counts, default=0),
            "total_chars": total_chars,
            "avg_chars_per_chunk": total_chars / num_chunks if num_chunks else 0,
            "strategy": self.strategy_name.value,
            "chunks_with_sections": chunks_with_sections,
            "section_coverage_pct": (chunks_with_sections / num_chunks) * 100 if num_chunks else 0.0,
        }

    def merge_chunks(self, chunks: List[DocumentChunk], max_tokens: int) -> List[DocumentChunk]:
        """
        Merge small chunks up to max_tokens: Useful for optimizing chunk sizes

        Consecutive chunks are grouped greedily while the sum of their `token_count` values stays
        within `max_tokens`. A single chunk that already exceeds `max_tokens` forms a group of its
        own. Merged chunks are re-indexed from 0 and all use the document ID of the first input
        chunk; the per-chunk `metadata` dicts of the inputs are not carried over.

        Arguments:
        ----------
            chunks     { list } : List of chunks to merge

            max_tokens { int }  : Maximum tokens per merged chunk

        Returns:
        --------
               { list }         : List of merged chunks
        """
        if not chunks:
            return []

        merged = []
        group = []
        group_tokens = 0
        document_id = chunks[0].document_id

        for chunk in chunks:
            if group_tokens + chunk.token_count <= max_tokens:
                group.append(chunk)
                group_tokens += chunk.token_count
                continue

            # Budget exceeded: emit the pending group (if any) and start a new one with this chunk
            if group:
                merged.append(self._merge_group(group, len(merged), document_id))

            group = [chunk]
            group_tokens = chunk.token_count

        # Add final merged chunk
        if group:
            merged.append(self._merge_group(group, len(merged), document_id))

        self.logger.info(f"Merged {len(chunks)} chunks into {len(merged)}")
        return merged

    def _merge_group(self, group: List[DocumentChunk], chunk_index: int, document_id: str) -> DocumentChunk:
        """
        Combine a non-empty run of consecutive chunks into a single chunk

        Texts are joined with a single space; the character span runs from the first chunk's start
        to the last chunk's end; page number and section title are taken from the first chunk.
        The token count is recomputed on the joined text by `_create_chunk`.
        """
        first, last = group[0], group[-1]
        return self._create_chunk(
            text=" ".join(c.text for c in group),
            chunk_index=chunk_index,
            document_id=document_id,
            start_char=first.start_char,
            end_char=last.end_char,
            page_number=first.page_number,
            section_title=first.section_title,
        )

    def __str__(self) -> str:
        """
        String representation
        """
        return f"{self.__class__.__name__}(strategy={self.strategy_name.value})"

    def __repr__(self) -> str:
        """
        Detailed representation
        """
        return self.__str__()
class ChunkerConfig:
    """
    Parameter bundle for chunking strategies: carries the common sizing options plus any
    strategy-specific extras supplied as keyword arguments
    """

    def __init__(self, chunk_size: int = 512, overlap: int = 50, respect_boundaries: bool = True, min_chunk_size: int = 100, **kwargs):
        """
        Store the chunking parameters on the instance

        Arguments:
        ----------
            chunk_size         { int }  : Target chunk size in tokens

            overlap            { int }  : Overlap between chunks in tokens

            respect_boundaries { bool } : Respect sentence/paragraph/section boundaries

            min_chunk_size     { int }  : Minimum chunk size in tokens

            **kwargs                    : Additional strategy-specific parameters (kept in `self.extra`)
        """
        self.chunk_size, self.overlap = chunk_size, overlap
        self.respect_boundaries, self.min_chunk_size = respect_boundaries, min_chunk_size
        # Anything that is not one of the four named options is kept verbatim
        self.extra = kwargs

    def to_dict(self) -> dict:
        """
        Return the four core options followed by the extra parameters as one flat dictionary
        """
        core_fields = ("chunk_size", "overlap", "respect_boundaries", "min_chunk_size")
        as_dict = {field: getattr(self, field) for field in core_fields}
        as_dict.update(self.extra)
        return as_dict

    def __repr__(self) -> str:
        """
        Representation showing the flattened configuration dictionary
        """
        return f"ChunkerConfig({self.to_dict()})"