| | """ |
| | Semantic Chunking Utilities |
| | |
| | Strategies for splitting and merging document content |
| | into semantically meaningful chunks. |
| | """ |
| |
|
| | import re |
| | from dataclasses import dataclass |
| | from typing import Any, Dict, List, Optional, Tuple |
| |
|
| | from ..chunks.models import ( |
| | BoundingBox, |
| | ChunkType, |
| | DocumentChunk, |
| | ) |
| |
|
| |
|
@dataclass
class ChunkingConfig:
    """Configuration for semantic chunking."""

    # Character-size bounds for emitted chunks. Field order is part of the
    # dataclass's positional-init interface and must not change.
    min_chunk_chars: int = 50
    max_chunk_chars: int = 2000
    target_chunk_chars: int = 500

    # Characters repeated between consecutive character-split chunks.
    overlap_chars: int = 100

    # Toggles for the structural split passes.
    split_on_headings: bool = True
    split_on_paragraphs: bool = True
    preserve_sentences: bool = True

    # Post-pass that folds undersized neighbouring chunks together.
    merge_small_chunks: bool = True
    merge_threshold_chars: int = 100
|
| |
|
class SemanticChunker:
    """
    Semantic chunking engine.

    Splits text into meaningful chunks based on document structure,
    headings, paragraphs, and sentence boundaries.
    """

    # Line-start heading markers: markdown "#".."######", "A."/"A)" style
    # letters/digits, and "1."/"1)" style numbers.
    HEADING_PATTERN = re.compile(r'^(?:#{1,6}\s+|[A-Z0-9][\.\)]\s+|\d+[\.\)]\s+)', re.MULTILINE)
    # One or more blank lines separate paragraphs.
    PARAGRAPH_PATTERN = re.compile(r'\n\s*\n')
    # Sentence boundary: ./!/? followed by whitespace and an uppercase letter.
    SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def __init__(self, config: Optional["ChunkingConfig"] = None):
        # String annotation: avoids a hard import-order dependency on
        # ChunkingConfig while keeping the same type contract.
        self.config = config or ChunkingConfig()

    def chunk_text(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Split text into semantic chunks.

        Args:
            text: Input text to chunk
            metadata: Optional metadata to include with each chunk

        Returns:
            List of chunk dictionaries with "text", "heading" and the
            caller-supplied metadata keys. Chunks shorter than
            ``min_chunk_chars`` (after stripping) are discarded.
        """
        if not text or not text.strip():
            return []

        metadata = metadata or {}
        chunks: List[Dict[str, Any]] = []

        if self.config.split_on_headings:
            sections = self._split_by_headings(text)
        else:
            sections = [{"heading": None, "text": text}]

        for section in sections:
            section_chunks = self._chunk_section(
                section["text"],
                section.get("heading"),
            )
            # Renamed loop variable (was `chunk_text`, which shadowed this
            # method's own name).
            for piece in section_chunks:
                if len(piece.strip()) >= self.config.min_chunk_chars:
                    chunks.append({
                        "text": piece.strip(),
                        "heading": section.get("heading"),
                        **metadata,
                    })

        if self.config.merge_small_chunks:
            chunks = self._merge_small_chunks(chunks)

        return chunks

    def _split_by_headings(self, text: str) -> List[Dict[str, Any]]:
        """Split text into (heading, body) sections at heading lines."""
        sections = []
        current_heading = None
        current_text = []

        lines = text.split("\n")

        for line in lines:
            if self.HEADING_PATTERN.match(line):
                # A new heading closes the section accumulated so far.
                if current_text:
                    sections.append({
                        "heading": current_heading,
                        "text": "\n".join(current_text),
                    })
                current_heading = line.strip()
                current_text = []
            else:
                current_text.append(line)

        # Flush the trailing section.
        if current_text:
            sections.append({
                "heading": current_heading,
                "text": "\n".join(current_text),
            })

        # Text consisting only of headings yields no sections; fall back to
        # one unheaded section so no input is lost.
        return sections if sections else [{"heading": None, "text": text}]

    def _chunk_section(
        self,
        text: str,
        heading: Optional[str],
    ) -> List[str]:
        """Chunk a single section's body text.

        ``heading`` is currently unused but kept for interface stability.
        """
        # Short sections pass through untouched.
        if len(text) <= self.config.max_chunk_chars:
            return [text]

        if self.config.split_on_paragraphs:
            paragraphs = self.PARAGRAPH_PATTERN.split(text)
        else:
            paragraphs = [text]

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # BUG FIX: budget +2 for the "\n\n" separator actually inserted
            # below (the original budgeted +1, overshooting the target by one).
            if len(current_chunk) + len(para) + 2 <= self.config.target_chunk_chars:
                if current_chunk:
                    current_chunk += "\n\n" + para
                else:
                    current_chunk = para
            else:
                if current_chunk:
                    chunks.append(current_chunk)

                # Oversized paragraphs are split further; the last fragment
                # stays open so following paragraphs can pack onto it.
                if len(para) > self.config.max_chunk_chars:
                    sub_chunks = self._split_long_text(para)
                    chunks.extend(sub_chunks[:-1])
                    current_chunk = sub_chunks[-1] if sub_chunks else ""
                else:
                    current_chunk = para

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _split_long_text(self, text: str) -> List[str]:
        """Split long text at sentence boundaries (or raw chars if disabled)."""
        if not self.config.preserve_sentences:
            return self._split_by_chars(text)

        sentences = self.SENTENCE_PATTERN.split(text)
        chunks = []
        current_chunk = ""

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            # +1 budgets the single space used to join sentences below.
            if len(current_chunk) + len(sentence) + 1 <= self.config.target_chunk_chars:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk)

                # A single over-long sentence falls back to character splitting.
                if len(sentence) > self.config.max_chunk_chars:
                    sub_chunks = self._split_by_chars(sentence)
                    chunks.extend(sub_chunks[:-1])
                    current_chunk = sub_chunks[-1] if sub_chunks else ""
                else:
                    current_chunk = sentence

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _split_by_chars(self, text: str) -> List[str]:
        """Split text into windows of ~target_chunk_chars with overlap."""
        chunks = []
        start = 0
        text_len = len(text)

        while start < text_len:
            end = min(start + self.config.target_chunk_chars, text_len)

            # Prefer breaking on a space so words are not cut mid-word.
            if end < text_len:
                space_idx = text.rfind(" ", start, end)
                if space_idx > start:
                    end = space_idx

            piece = text[start:end].strip()
            if piece:  # skip fragments that strip to nothing
                chunks.append(piece)

            # BUG FIX: the original unconditionally set
            # `start = end - overlap_chars`, which never terminates once the
            # tail is reached (end == text_len keeps start < text_len,
            # re-emitting the final chunk forever). Stop after the tail.
            if end >= text_len:
                break

            next_start = end - self.config.overlap_chars
            # Guarantee forward progress even when the space backoff makes
            # the step smaller than the overlap.
            start = next_start if next_start > start else end

        return chunks

    def _merge_small_chunks(
        self,
        chunks: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Merge undersized chunks into their successor within a heading."""
        if not chunks:
            return chunks

        merged = []
        current = None

        for chunk in chunks:
            text = chunk["text"]

            if current is None:
                current = chunk.copy()
                continue

            current_len = len(current["text"])
            new_len = len(text)

            # Merge only when the accumulated chunk is still small, the
            # result fits the hard cap, and both share the same heading.
            if (current_len < self.config.merge_threshold_chars and
                    current_len + new_len <= self.config.max_chunk_chars and
                    current.get("heading") == chunk.get("heading")):
                current["text"] = current["text"] + "\n\n" + text
            else:
                merged.append(current)
                current = chunk.copy()

        if current:
            merged.append(current)

        return merged
| |
|
| |
|
class DocumentChunkBuilder:
    """
    Builder for creating DocumentChunk objects.

    Provides a fluent interface for chunk construction with
    automatic ID generation and validation.
    """

    def __init__(self, doc_id: str, page: int):
        self.doc_id = doc_id
        self.page = page
        self._chunks: List[DocumentChunk] = []
        self._sequence_index = 0

    def add_chunk(
        self,
        text: str,
        chunk_type: ChunkType,
        bbox: BoundingBox,
        confidence: float = 1.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> "DocumentChunkBuilder":
        """Append a chunk of the given type; returns self for chaining."""
        # Chunk IDs are derived deterministically from document, page,
        # bounding box and chunk type.
        new_chunk = DocumentChunk(
            chunk_id=DocumentChunk.generate_chunk_id(
                doc_id=self.doc_id,
                page=self.page,
                bbox=bbox,
                chunk_type_str=chunk_type.value,
            ),
            doc_id=self.doc_id,
            chunk_type=chunk_type,
            text=text,
            page=self.page,
            bbox=bbox,
            confidence=confidence,
            sequence_index=self._sequence_index,
            metadata=metadata or {},
        )
        self._chunks.append(new_chunk)
        self._sequence_index += 1
        return self

    def add_text(self, text: str, bbox: BoundingBox, confidence: float = 1.0) -> "DocumentChunkBuilder":
        """Append a TEXT chunk."""
        return self.add_chunk(text, ChunkType.TEXT, bbox, confidence)

    def add_title(self, text: str, bbox: BoundingBox, confidence: float = 1.0) -> "DocumentChunkBuilder":
        """Append a TITLE chunk."""
        return self.add_chunk(text, ChunkType.TITLE, bbox, confidence)

    def add_heading(self, text: str, bbox: BoundingBox, confidence: float = 1.0) -> "DocumentChunkBuilder":
        """Append a HEADING chunk."""
        return self.add_chunk(text, ChunkType.HEADING, bbox, confidence)

    def add_paragraph(self, text: str, bbox: BoundingBox, confidence: float = 1.0) -> "DocumentChunkBuilder":
        """Append a PARAGRAPH chunk."""
        return self.add_chunk(text, ChunkType.PARAGRAPH, bbox, confidence)

    def build(self) -> List[DocumentChunk]:
        """Return a shallow copy of the accumulated chunks."""
        return self._chunks.copy()

    def reset(self) -> "DocumentChunkBuilder":
        """Discard accumulated chunks and restart the sequence counter."""
        self._chunks = []
        self._sequence_index = 0
        return self
| |
|
| |
|
def estimate_tokens(text: str) -> int:
    """
    Estimate token count for text.

    Uses simple heuristic: ~4 characters per token.
    """
    chars_per_token = 4
    return len(text) // chars_per_token
| |
|
| |
|
def split_for_embedding(
    text: str,
    max_tokens: int = 512,
    overlap_tokens: int = 50,
) -> List[str]:
    """
    Split text for embedding model input.

    Args:
        text: Text to split
        max_tokens: Maximum tokens per chunk
        overlap_tokens: Overlap between chunks

    Returns:
        List of text chunks
    """
    # ~4 chars per token, mirroring estimate_tokens().
    max_chars = max_tokens * 4
    overlap_chars = overlap_tokens * 4

    config = ChunkingConfig(
        # BUG FIX: the default min_chunk_chars (50) silently discarded any
        # chunk under 50 characters (e.g. short tails), losing text that
        # should be embedded. Keep every non-empty chunk.
        min_chunk_chars=1,
        max_chunk_chars=max_chars,
        # BUG FIX: clamp so a small max_tokens (< 25) cannot produce a
        # zero or negative target chunk size.
        target_chunk_chars=max(1, max_chars - 100),
        overlap_chars=overlap_chars,
    )

    chunker = SemanticChunker(config)
    return [chunk["text"] for chunk in chunker.chunk_text(text)]
| |
|