Spaces:
Sleeping
Sleeping
| """Token-aware text chunking with sliding window strategy. | |
| This module provides token-aware chunking capabilities for the RAG pipeline. | |
| It implements a sliding window chunker that respects sentence and paragraph | |
| boundaries while maintaining consistent token counts across chunks. | |
| Key Features: | |
| - Lazy loading of tiktoken for fast module import | |
| - Token counting with GPT-4/Claude compatible tokenizer (cl100k_base) | |
| - Sliding window chunking with configurable overlap | |
| - Sentence and paragraph boundary preservation | |
| - Integration with ChunkingConfig for consistent configuration | |
| Components: | |
| - Tokenizer: Lazy-loaded wrapper around tiktoken for token operations | |
| - SlidingWindowChunker: Main chunker class implementing sliding window strategy | |
| Lazy Loading: | |
| The tiktoken library is loaded on first use via the __getattr__ pattern. | |
| This ensures fast import times when the module is not immediately needed. | |
| Design Principles: | |
| - Token-aware splitting ensures chunks fit within embedding model limits | |
| - Overlap between chunks maintains context continuity | |
| - Natural text boundaries (sentences, paragraphs) are preferred split points | |
| - Short documents below min_tokens are not split | |
| Example: | |
| ------- | |
| >>> from rag_chatbot.chunking import ChunkingConfig, SlidingWindowChunker | |
| >>> config = ChunkingConfig(max_tokens=500, overlap_percent=0.1) | |
| >>> chunker = SlidingWindowChunker(config) | |
| >>> chunks = chunker.chunk_text( | |
| ... text="Long document text here...", | |
| ... source="document.pdf", | |
| ... page=1 | |
| ... ) | |
| >>> for chunk in chunks: | |
| ... print(f"Chunk {chunk.chunk_id}: {chunk.token_count} tokens") | |
| Note: | |
| ---- | |
| This module uses the cl100k_base encoding which is compatible with | |
| GPT-4, GPT-3.5-turbo, and Claude models. The tokenizer provides | |
| accurate token counts for chunking decisions. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING | |
| # ============================================================================= | |
| # Type Checking Imports | |
| # ============================================================================= | |
| # These imports are only processed by type checkers (mypy, pyright) and IDEs. | |
| # They enable proper type hints without runtime overhead. | |
| # ============================================================================= | |
| if TYPE_CHECKING: | |
| import tiktoken | |
| from .models import Chunk, ChunkingConfig, TextNormalizer | |
# =============================================================================
# Module Exports
# =============================================================================
# Public API surface for ``from <module> import *``.
# NOTE(review): the public-named boundary helpers (find_sentence_boundaries,
# find_paragraph_boundaries, find_word_boundaries, find_best_split_point)
# are not exported here — confirm that is intentional.
__all__: list[str] = [
    "Tokenizer",
    "SlidingWindowChunker",
]
# =============================================================================
# Module-Level State for Lazy Loading
# =============================================================================
# The encoding cache is keyed by encoding name so that callers requesting
# different encodings (e.g. "cl100k_base" and "p50k_base") each receive the
# correct Encoding instance instead of whichever one happened to load first.
# =============================================================================

# Cache of tiktoken Encoding instances, keyed by encoding name (lazy loaded).
_tiktoken_encodings: dict[str, tiktoken.Encoding] = {}

# Default encoding name compatible with GPT-4 and Claude
_DEFAULT_ENCODING: str = "cl100k_base"


# =============================================================================
# Lazy Loading Infrastructure
# =============================================================================
def _get_tiktoken_encoding(encoding_name: str = _DEFAULT_ENCODING) -> tiktoken.Encoding:
    """Get or create a tiktoken encoding instance (lazy loading).

    The tiktoken library is imported only on first call, keeping module
    import fast. Each distinct encoding name is created once and cached.

    Args:
    ----
        encoding_name: The name of the tiktoken encoding to use.
            Defaults to "cl100k_base" (GPT-4/Claude compatible).

    Returns:
    -------
        The tiktoken Encoding instance for ``encoding_name``.

    Raises:
    ------
        ImportError: If tiktoken is not installed.

    Note:
    ----
        The cache is keyed by encoding name. The previous single-slot
        cache returned whatever encoding was loaded first even when a
        different ``encoding_name`` was requested.
    """
    cached = _tiktoken_encodings.get(encoding_name)
    if cached is not None:
        return cached

    # Import tiktoken only when first needed (lazy loading)
    import tiktoken as _tiktoken

    encoding = _tiktoken.get_encoding(encoding_name)
    _tiktoken_encodings[encoding_name] = encoding
    return encoding
# =============================================================================
# Sentence and Paragraph Detection Helpers
# =============================================================================
# These regex patterns and helper functions identify natural text boundaries
# for making intelligent split decisions.
# =============================================================================

# Sentence boundary: end punctuation, whitespace, then a capital letter.
_SENTENCE_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"([.!?])(\s+)(?=[A-Z])")

# Paragraph boundary: double newlines (with optional whitespace between).
_PARAGRAPH_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"\n\s*\n")

# Word boundary: any run of whitespace.
_WORD_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"\s+")


def find_sentence_boundaries(text: str) -> list[int]:
    r"""Find all sentence boundary positions in the text.

    A sentence boundary is the position immediately after sentence-ending
    punctuation (., !, ?) that is either followed by whitespace and a
    capital letter, or terminates the (right-stripped) text.

    Args:
    ----
        text: The text to analyze for sentence boundaries.

    Returns:
    -------
        Sorted list of character positions just after the punctuation.
        Empty list if no sentence boundaries are found.

    Example:
    -------
        >>> find_sentence_boundaries("First sentence. Second one! Third?")
        [15, 27, 34]
        >>> find_sentence_boundaries("   ")
        []
    """
    boundaries: list[int] = []
    # Interior boundaries: punctuation + whitespace + capital letter.
    for match in _SENTENCE_BOUNDARY_PATTERN.finditer(text):
        # The boundary sits right after the punctuation (end of group 1).
        boundaries.append(match.end(1))
    # A sentence may also end at the very end of the text.
    # Fix: guard on the stripped text being non-empty. The previous check
    # ``text.rstrip()[-1:] in ".!?"`` was True for whitespace-only input
    # (because "" is a substring of ".!?"), yielding a bogus boundary [0].
    stripped = text.rstrip()
    if stripped and stripped[-1] in ".!?":
        final_pos = len(stripped)
        if final_pos not in boundaries:
            boundaries.append(final_pos)
    return sorted(boundaries)
def find_paragraph_boundaries(text: str) -> list[int]:
    r"""Locate every paragraph break in the text.

    A paragraph break is a pair of newlines, possibly with whitespace in
    between. The reported position is the index of the first newline of
    the break, i.e. the start of the inter-paragraph gap.

    Args:
    ----
        text: The text to scan for paragraph breaks.

    Returns:
    -------
        Sorted list of break positions, suitable as split points.
        Empty list when the text contains no paragraph breaks.

    Example:
    -------
        >>> find_paragraph_boundaries("First paragraph.\n\nSecond paragraph.")
        [16]
    """
    # match.start() is the index of the first newline of each gap.
    return sorted(m.start() for m in _PARAGRAPH_BOUNDARY_PATTERN.finditer(text))
def find_word_boundaries(text: str) -> list[int]:
    """Locate every whitespace run in the text (word-split candidates).

    Args:
    ----
        text: The text to scan for word boundaries.

    Returns:
    -------
        Sorted list of positions at which a whitespace run starts,
        suitable for splitting without breaking words. Empty list when
        the text contains no whitespace.

    Example:
    -------
        >>> find_word_boundaries("Hello world example")
        [5, 11]
    """
    # match.start() is the first character of each whitespace run.
    return sorted(m.start() for m in _WORD_BOUNDARY_PATTERN.finditer(text))
| def _find_best_boundary_in_list( | |
| boundaries: list[int], | |
| target_pos: int, | |
| current_best: int, | |
| ) -> int: | |
| """Find the best boundary from a sorted list that doesn't exceed target. | |
| Helper function to reduce code duplication in find_best_split_point. | |
| Args: | |
| ---- | |
| boundaries: Sorted list of boundary positions. | |
| target_pos: Maximum position to consider. | |
| current_best: Current best position found. | |
| Returns: | |
| ------- | |
| The best boundary position found, or current_best if none better. | |
| """ | |
| for boundary in boundaries: | |
| if boundary > target_pos: | |
| break | |
| if boundary > current_best: | |
| current_best = boundary | |
| return current_best | |
def find_best_split_point(
    text: str,
    target_pos: int,
    preserve_sentences: bool = True,
    preserve_paragraphs: bool = True,
) -> int:
    """Find the best split point at or before the target position.

    Collects candidate boundaries — paragraph breaks (if
    ``preserve_paragraphs``), sentence ends (if ``preserve_sentences``),
    and always word boundaries — and returns the largest candidate that
    does not exceed ``target_pos``.

    NOTE(review): despite the flag names suggesting a preference order
    (paragraph > sentence > word), the implementation takes the maximum
    qualifying position across ALL enabled boundary types. Word
    boundaries are far denser than sentence/paragraph boundaries, so in
    practice they usually decide the result — confirm this is intended.

    Args:
    ----
        text: The text to find a split point in.
        target_pos: Target character position; the result is <= target_pos.
        preserve_sentences: Include sentence boundaries as candidates.
        preserve_paragraphs: Include paragraph boundaries as candidates.

    Returns:
    -------
        The chosen split position (<= target_pos). Falls back to
        ``target_pos`` itself (character-level split) when no boundary
        exists at or before it.

    Example:
    -------
    >>> text = "First sentence. Second sentence here."
    >>> find_best_split_point(text, 22, preserve_sentences=True)
    22

    (The word boundary — the space at index 22 — beats the sentence end
    at 15 because it is closer to the target.)
    """
    # Clamp out-of-range targets to the text extremes.
    if target_pos <= 0:
        return 0
    if target_pos >= len(text):
        return len(text)
    best_pos = 0  # 0 means "no boundary found yet" (see fallback below)
    # Paragraph boundaries (only when enabled).
    if preserve_paragraphs:
        para_boundaries = find_paragraph_boundaries(text)
        best_pos = _find_best_boundary_in_list(para_boundaries, target_pos, best_pos)
    # Sentence boundaries (only when enabled).
    if preserve_sentences:
        sent_boundaries = find_sentence_boundaries(text)
        best_pos = _find_best_boundary_in_list(sent_boundaries, target_pos, best_pos)
    # Word boundaries are always considered.
    word_boundaries = find_word_boundaries(text)
    best_pos = _find_best_boundary_in_list(word_boundaries, target_pos, best_pos)
    # Fallback: character-level split exactly at target_pos.
    return best_pos if best_pos > 0 else target_pos
# =============================================================================
# Tokenizer Class
# =============================================================================
class Tokenizer:
    """Thin lazy-loading facade over tiktoken for token operations.

    Counting, encoding, decoding and truncation all go through the
    module-level lazy loader, so importing this class and constructing
    instances never touches the tiktoken library — it is pulled in on
    the first real token operation.

    The default encoding, cl100k_base, matches GPT-4 / GPT-3.5-turbo,
    is an approximation for Claude models, and is used by OpenAI
    embedding models such as text-embedding-3-small.

    Example:
    -------
    >>> tokenizer = Tokenizer()
    >>> tokenizer.count_tokens("Hello, world!")
    4
    >>> tokenizer.decode(tokenizer.encode("Hello, world!"))
    'Hello, world!'

    Note:
    ----
    Read operations (count_tokens, encode, decode) are thread-safe;
    tiktoken handles synchronization internally.
    """

    def __init__(self, encoding_name: str = _DEFAULT_ENCODING) -> None:
        """Store the encoding name; tiktoken itself is NOT loaded here.

        Args:
        ----
            encoding_name: Name of the tiktoken encoding to use.
                Defaults to "cl100k_base" (GPT-4/Claude compatible).

        Example:
        -------
        >>> tokenizer = Tokenizer()             # default cl100k_base
        >>> tokenizer = Tokenizer("p50k_base")  # GPT-3 encoding
        """
        # Only the name is stored; the Encoding object is resolved lazily
        # through _get_tiktoken_encoding on first use.
        self._encoding_name = encoding_name

    def encoding_name(self) -> str:
        """Return the name of the tiktoken encoding in use (e.g. "cl100k_base")."""
        return self._encoding_name

    def _get_encoding(self) -> tiktoken.Encoding:
        """Resolve the tiktoken Encoding, triggering lazy loading if needed."""
        return _get_tiktoken_encoding(self._encoding_name)

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in *text*.

        Empty or whitespace-only input yields 0 without loading tiktoken.

        Example:
        -------
        >>> Tokenizer().count_tokens("Hello, world!")
        4
        """
        if not text or not text.strip():
            return 0
        return len(self._get_encoding().encode(text))

    def encode(self, text: str) -> list[int]:
        """Encode *text* into a list of integer token IDs.

        Empty input yields [] without loading tiktoken.

        Example:
        -------
        >>> len(Tokenizer().encode("Hello, world!"))
        4
        """
        if not text:
            return []
        # list(...) guarantees a plain list[int] for type checkers.
        return list(self._get_encoding().encode(text))

    def decode(self, tokens: list[int]) -> str:
        """Decode a list of token IDs back into text.

        An empty token list yields "" without loading tiktoken. Special
        tokens may be represented differently in the round-tripped text.
        """
        if not tokens:
            return ""
        # str(...) guarantees a plain str for type checkers.
        return str(self._get_encoding().decode(tokens))

    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Return *text* cut down to at most *max_tokens* tokens.

        The original string comes back unchanged when it already fits.
        An empty string is returned when *text* is empty or *max_tokens*
        is zero or negative. Because the cut happens on a token boundary
        rather than a word boundary, the result may end mid-word.

        Example:
        -------
        >>> Tokenizer().truncate_to_tokens("word " * 100, 5)  # doctest: +SKIP
        'word word word word word'
        """
        if max_tokens <= 0 or not text:
            return ""
        encoding = self._get_encoding()
        token_ids = encoding.encode(text)
        if len(token_ids) <= max_tokens:
            return text
        return str(encoding.decode(token_ids[:max_tokens]))
# =============================================================================
# SlidingWindowChunker Class
# =============================================================================
class SlidingWindowChunker:
    """Token-aware chunker using a sliding window strategy with overlap.

    The chunker:
    - Respects configured token limits (min_tokens, max_tokens)
    - Prefers natural split points (paragraph, sentence, word boundaries)
    - Keeps a token overlap between consecutive chunks for context
    - Measures tokens accurately via tiktoken (through Tokenizer)

    It prepares text for embedding models with fixed context windows;
    the overlap preserves context across chunk boundaries. Documents
    shorter than ``min_tokens`` are NOT split — they come back as a
    single chunk.

    Accessors (note: these are methods, not plain attributes):
    - ``config()``: the ChunkingConfig in use
    - ``normalizer()``: the optional TextNormalizer
    - ``tokenizer()``: the Tokenizer instance

    Example:
    -------
    >>> from rag_chatbot.chunking import ChunkingConfig, SlidingWindowChunker
    >>> config = ChunkingConfig(
    ...     min_tokens=450,
    ...     max_tokens=700,
    ...     overlap_percent=0.12,
    ...     preserve_sentences=True,
    ...     preserve_paragraphs=True,
    ... )
    >>> chunker = SlidingWindowChunker(config)
    >>> chunks = chunker.chunk_text(
    ...     text="Long document content...",
    ...     source="document.pdf",
    ...     page=1,
    ... )
    """

    def __init__(
        self,
        config: ChunkingConfig,
        normalizer: TextNormalizer | None = None,
    ) -> None:
        """Initialize the chunker.

        Args:
        ----
            config: ChunkingConfig defining min_tokens, max_tokens,
                overlap_percent, and boundary-preservation settings.
            normalizer: Optional TextNormalizer applied to the text
                before it is split into chunks.
        """
        self._config = config
        self._normalizer = normalizer
        self._tokenizer = Tokenizer()
        # Pre-compute the absolute overlap size once; reused per chunk.
        self._overlap_tokens = config.calculate_overlap_tokens()

    def config(self) -> ChunkingConfig:
        """Return the ChunkingConfig in use."""
        return self._config

    def normalizer(self) -> TextNormalizer | None:
        """Return the TextNormalizer, or None when not configured."""
        return self._normalizer

    def tokenizer(self) -> Tokenizer:
        """Return the Tokenizer instance used for token operations."""
        return self._tokenizer

    def _generate_chunk_id(self, source: str, chunk_index: int) -> str:
        """Build a chunk ID of the form "{source_stem}_{index:03d}".

        Args:
        ----
            source: Source document path or identifier.
            chunk_index: 0-based index of this chunk.

        Returns:
        -------
            Unique chunk ID string.

        Example:
        -------
        >>> chunker._generate_chunk_id("document.pdf", 0)
        'document_000'
        >>> chunker._generate_chunk_id("/path/to/file.pdf", 42)
        'file_042'
        """
        # Path.stem drops the directory and the LAST extension only
        # ("archive.tar.gz" -> "archive.tar").
        basename = Path(source).stem
        return f"{basename}_{chunk_index:03d}"

    def _find_char_position_for_tokens(
        self,
        text: str,
        target_tokens: int,
    ) -> int:
        """Find the largest character position whose prefix fits the budget.

        Binary search over the prefix length: encoding O(log n) prefixes
        is far cheaper than a linear scan for large documents.

        Args:
        ----
            text: The text to analyze.
            target_tokens: Token budget for the prefix.

        Returns:
        -------
            Character position such that text[:pos] holds at most
            target_tokens tokens (possibly slightly fewer).
        """
        if not text:
            return 0
        # Fast path: the whole text already fits the budget.
        total_tokens = self._tokenizer.count_tokens(text)
        if total_tokens <= target_tokens:
            return len(text)
        low = 0
        high = len(text)
        best_pos = 0
        while low <= high:
            mid = (low + high) // 2
            # Token count of a prefix is non-decreasing in its length,
            # so the predicate below is monotone.
            if self._tokenizer.count_tokens(text[:mid]) <= target_tokens:
                best_pos = mid
                low = mid + 1
            else:
                high = mid - 1
        return best_pos

    def _extract_chunk_text(
        self,
        text: str,
        start_char: int,
        max_tokens: int,
    ) -> tuple[str, int]:
        """Extract one chunk starting at start_char within a token budget.

        Args:
        ----
            text: The full text being chunked.
            start_char: Starting character position.
            max_tokens: Token budget for this chunk.

        Returns:
        -------
            Tuple (chunk_text, end_char): the stripped chunk content and
            the character position where the chunk ends in ``text``.
            Returns ("", start_char) when only whitespace remains.
        """
        remaining_text = text[start_char:]
        if not remaining_text.strip():
            # Nothing but whitespace left; signal "no progress" to caller.
            return "", start_char
        # Convert the token budget into an approximate character position.
        approx_end = self._find_char_position_for_tokens(remaining_text, max_tokens)
        if approx_end >= len(remaining_text):
            # Everything that remains fits in one chunk.
            return remaining_text.strip(), start_char + len(remaining_text)
        # Snap the cut to a natural boundary at or before approx_end.
        split_pos = find_best_split_point(
            remaining_text,
            approx_end,
            preserve_sentences=self._config.preserve_sentences,
            preserve_paragraphs=self._config.preserve_paragraphs,
        )
        return remaining_text[:split_pos].strip(), start_char + split_pos

    def _calculate_overlap_start(
        self,
        text: str,
        current_end: int,
    ) -> int:
        """Compute where the next chunk starts so it overlaps this one.

        Args:
        ----
            text: The full text being chunked.
            current_end: End position of the current chunk.

        Returns:
        -------
            Start position for the next chunk; ``current_end`` when no
            overlap is configured.

        Note:
        ----
            Fixed to use binary search. The token count of
            text[i:current_end] is non-increasing in i, so the predicate
            "suffix holds >= overlap_tokens tokens" is monotone. The
            previous implementation re-encoded the suffix once PER
            CHARACTER while walking backwards (quadratic in chunk size);
            the result is unchanged: the largest i satisfying the
            predicate, or 0 when even the whole prefix holds fewer
            tokens than the overlap target.
        """
        if self._overlap_tokens <= 0:
            return current_end
        count_tokens = self._tokenizer.count_tokens
        if count_tokens(text[:current_end]) < self._overlap_tokens:
            # The whole prefix is smaller than the overlap window.
            overlap_start = 0
        else:
            low, high = 0, current_end
            overlap_start = 0
            while low <= high:
                mid = (low + high) // 2
                if count_tokens(text[mid:current_end]) >= self._overlap_tokens:
                    # Suffix still big enough — try a later start.
                    overlap_start = mid
                    low = mid + 1
                else:
                    high = mid - 1
        # Snap forward to just past the first whitespace in the overlap
        # window so the next chunk does not begin mid-word.
        overlap_text = text[overlap_start:current_end]
        boundaries = find_word_boundaries(overlap_text)
        if boundaries:
            overlap_start = overlap_start + boundaries[0] + 1  # +1 skips the space
        return overlap_start

    def chunk_text(
        self,
        text: str,
        source: str,
        page: int = 1,
        start_offset: int = 0,
    ) -> list[Chunk]:
        """Split text into token-aware chunks with overlap.

        Main entry point. Text shorter than ``min_tokens`` is returned
        as a single chunk; otherwise a sliding window walks the text,
        cutting at natural boundaries and backing up by the configured
        overlap before each new chunk.

        Args:
        ----
            text: The text content to chunk.
            source: Source document identifier (e.g., "document.pdf");
                used for chunk IDs and source attribution.
            page: 1-indexed page number this text came from. Defaults to 1.
            start_offset: Character offset of this text within the larger
                source document. Defaults to 0.

        Returns:
        -------
            List of Chunk objects carrying chunk_id, text, source, page,
            start_char/end_char, token_count, and an empty heading_path
            (to be filled by an upstream processor such as a heading
            parser).

        Note:
        ----
            start_char/end_char are positions within the NORMALIZED text
            (plus start_offset); if the normalizer changes the text
            length they will not line up with the raw input — TODO
            confirm callers expect normalized offsets.
        """
        # Local import avoids a circular dependency with .models.
        from .models import Chunk

        if not text or not text.strip():
            return []

        # Optional normalization pass before any token math.
        processed_text = text
        if self._normalizer is not None:
            processed_text = self._normalizer.normalize(text)

        total_tokens = self._tokenizer.count_tokens(processed_text)

        # Short-document path: never split below min_tokens.
        if total_tokens < self._config.min_tokens:
            chunk = Chunk(
                chunk_id=self._generate_chunk_id(source, 0),
                text=processed_text.strip(),
                heading_path=[],  # filled by an upstream processor
                source=source,
                page=page,
                start_char=start_offset,
                end_char=start_offset + len(processed_text),
                token_count=total_tokens,
            )
            return [chunk]

        chunks: list[Chunk] = []
        chunk_index = 0
        current_pos = 0
        text_length = len(processed_text)

        while current_pos < text_length:
            remaining_text = processed_text[current_pos:]
            remaining_tokens = self._tokenizer.count_tokens(remaining_text)

            # Tail fits in one chunk: emit it and stop.
            if remaining_tokens <= self._config.max_tokens:
                chunk_text = remaining_text.strip()
                if chunk_text:  # only create a chunk for real content
                    chunk = Chunk(
                        chunk_id=self._generate_chunk_id(source, chunk_index),
                        text=chunk_text,
                        heading_path=[],
                        source=source,
                        page=page,
                        start_char=start_offset + current_pos,
                        end_char=start_offset + text_length,
                        token_count=remaining_tokens,
                    )
                    chunks.append(chunk)
                break

            # Carve the next window, respecting natural boundaries.
            chunk_text, end_pos = self._extract_chunk_text(
                processed_text,
                current_pos,
                self._config.max_tokens,
            )

            # Safety net: force progress if no text was extracted
            # (shouldn't happen, but guards against an infinite loop).
            if end_pos <= current_pos:
                end_pos = min(current_pos + 100, text_length)
                chunk_text = processed_text[current_pos:end_pos].strip()

            if chunk_text:
                token_count = self._tokenizer.count_tokens(chunk_text)
                chunk = Chunk(
                    chunk_id=self._generate_chunk_id(source, chunk_index),
                    text=chunk_text,
                    heading_path=[],
                    source=source,
                    page=page,
                    start_char=start_offset + current_pos,
                    end_char=start_offset + end_pos,
                    token_count=token_count,
                )
                chunks.append(chunk)
                chunk_index += 1

            # Step back by the configured overlap for the next window,
            # still guaranteeing forward progress.
            next_pos = self._calculate_overlap_start(processed_text, end_pos)
            if next_pos <= current_pos:
                next_pos = end_pos
            current_pos = next_pos

        return chunks