# NOTE: stray Hugging Face Space upload header (not valid Python), kept as a comment:
# sadickam's picture / Initial commit for HF Space / 3326079
"""Token-aware text chunking with sliding window strategy.
This module provides token-aware chunking capabilities for the RAG pipeline.
It implements a sliding window chunker that respects sentence and paragraph
boundaries while maintaining consistent token counts across chunks.
Key Features:
- Lazy loading of tiktoken for fast module import
- Token counting with GPT-4/Claude compatible tokenizer (cl100k_base)
- Sliding window chunking with configurable overlap
- Sentence and paragraph boundary preservation
- Integration with ChunkingConfig for consistent configuration
Components:
- Tokenizer: Lazy-loaded wrapper around tiktoken for token operations
- SlidingWindowChunker: Main chunker class implementing sliding window strategy
Lazy Loading:
The tiktoken library is loaded on first use via the __getattr__ pattern.
This ensures fast import times when the module is not immediately needed.
Design Principles:
- Token-aware splitting ensures chunks fit within embedding model limits
- Overlap between chunks maintains context continuity
- Natural text boundaries (sentences, paragraphs) are preferred split points
- Short documents below min_tokens are not split
Example:
-------
>>> from rag_chatbot.chunking import ChunkingConfig, SlidingWindowChunker
>>> config = ChunkingConfig(max_tokens=500, overlap_percent=0.1)
>>> chunker = SlidingWindowChunker(config)
>>> chunks = chunker.chunk_text(
... text="Long document text here...",
... source="document.pdf",
... page=1
... )
>>> for chunk in chunks:
... print(f"Chunk {chunk.chunk_id}: {chunk.token_count} tokens")
Note:
----
This module uses the cl100k_base encoding which is compatible with
GPT-4, GPT-3.5-turbo, and Claude models. The tokenizer provides
accurate token counts for chunking decisions.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import TYPE_CHECKING
# =============================================================================
# Type Checking Imports
# =============================================================================
# These imports are only processed by type checkers (mypy, pyright) and IDEs.
# They enable proper type hints without runtime overhead.
# =============================================================================
if TYPE_CHECKING:
import tiktoken
from .models import Chunk, ChunkingConfig, TextNormalizer
# =============================================================================
# Module Exports
# =============================================================================
__all__: list[str] = [
"Tokenizer",
"SlidingWindowChunker",
]
# =============================================================================
# Module-Level State for Lazy Loading
# =============================================================================
# These module-level variables hold lazily-loaded instances.
# They are initialized to None and populated on first access.
# =============================================================================
# Cached tiktoken encoding instance (loaded on first use)
_tiktoken_encoding: tiktoken.Encoding | None = None
# Default encoding name compatible with GPT-4 and Claude
_DEFAULT_ENCODING: str = "cl100k_base"
# =============================================================================
# Lazy Loading Infrastructure
# =============================================================================
def _get_tiktoken_encoding(encoding_name: str = _DEFAULT_ENCODING) -> tiktoken.Encoding:
    """Get or create the tiktoken encoding instance (lazy loading).

    The encoding is created on first call and cached at module level, so
    tiktoken is only imported when actually needed.

    Args:
    ----
        encoding_name: The name of the tiktoken encoding to use.
            Defaults to "cl100k_base" (GPT-4/Claude compatible).

    Returns:
    -------
        The tiktoken Encoding instance.

    Raises:
    ------
        ImportError: If tiktoken is not installed.

    Note:
    ----
        Fixed: the cached encoding is now returned only when its name matches
        the requested ``encoding_name``. Previously the cache was returned
        unconditionally, so a call with a different encoding name could
        silently receive the wrong encoding.
    """
    global _tiktoken_encoding  # noqa: PLW0603
    # Serve from cache only if it is the encoding that was actually requested.
    if _tiktoken_encoding is not None and _tiktoken_encoding.name == encoding_name:
        return _tiktoken_encoding
    # Import tiktoken only when first needed (lazy loading)
    import tiktoken as _tiktoken

    # Create (or replace) and cache the encoding
    _tiktoken_encoding = _tiktoken.get_encoding(encoding_name)
    return _tiktoken_encoding
# =============================================================================
# Sentence and Paragraph Detection Helpers
# =============================================================================
# These regex patterns and helper functions identify natural text boundaries
# for making intelligent split decisions.
# =============================================================================
# Pattern for finding sentence boundaries within text
# Matches the end of a sentence (punctuation) followed by space(s) before next sentence
_SENTENCE_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"([.!?])(\s+)(?=[A-Z])")
# Pattern for paragraph boundaries: double newlines (with optional whitespace)
_PARAGRAPH_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"\n\s*\n")
# Pattern for word boundaries (whitespace)
_WORD_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"\s+")
def find_sentence_boundaries(text: str) -> list[int]:
    r"""Find all sentence boundary positions in the text.

    A sentence boundary is the position immediately after sentence-ending
    punctuation (., !, ?) that is followed by whitespace and a capital
    letter, or that terminates the text.

    Args:
    ----
        text: The text to analyze for sentence boundaries.

    Returns:
    -------
        Sorted list of character positions just past each sentence-ending
        punctuation mark. Empty list if no boundaries are found.

    Example:
    -------
        >>> find_sentence_boundaries("First sentence. Second one! Third?")
        [15, 27, 34]

    Note:
    ----
        Fixed: whitespace-only input previously produced ``[0]`` because the
        trailing-punctuation test used ``text.rstrip()[-1:] in ".!?"`` and
        ``"" in ".!?"`` is True (the empty string is a substring of every
        string). The check now requires a real trailing character.
    """
    # Positions after the punctuation (end of group 1) for each regex match.
    boundaries = [match.end(1) for match in _SENTENCE_BOUNDARY_PATTERN.finditer(text)]
    # A sentence ending the text has no following capital, so the regex
    # cannot see it; handle that case explicitly.
    trimmed = text.rstrip()
    if trimmed and trimmed[-1] in ".!?":
        final_pos = len(trimmed)
        if final_pos not in boundaries:
            boundaries.append(final_pos)
    return sorted(boundaries)
def find_paragraph_boundaries(text: str) -> list[int]:
    r"""Locate every paragraph break in the text.

    A paragraph break is a blank line: two newlines separated only by
    optional whitespace.

    Args:
    ----
        text: Text to scan for paragraph breaks.

    Returns:
    -------
        Sorted character positions at which each blank-line sequence starts
        (suitable as split points). Empty list when no breaks exist.

    Example:
    -------
        >>> find_paragraph_boundaries("First paragraph.\n\nSecond paragraph.")
        [16]
    """
    # match.start() is the first newline of the break, i.e. the split point.
    return sorted(match.start() for match in _PARAGRAPH_BOUNDARY_PATTERN.finditer(text))
def find_word_boundaries(text: str) -> list[int]:
    """Locate every whitespace run in the text.

    Each returned position marks the start of a whitespace sequence and is
    therefore a safe place to split without cutting a word in half.

    Args:
    ----
        text: Text to scan for word boundaries.

    Returns:
    -------
        Sorted character positions where whitespace begins. Empty list when
        the text contains no whitespace.

    Example:
    -------
        >>> find_word_boundaries("Hello world example")
        [5, 11]
    """
    return sorted(match.start() for match in _WORD_BOUNDARY_PATTERN.finditer(text))
def _find_best_boundary_in_list(
boundaries: list[int],
target_pos: int,
current_best: int,
) -> int:
"""Find the best boundary from a sorted list that doesn't exceed target.
Helper function to reduce code duplication in find_best_split_point.
Args:
----
boundaries: Sorted list of boundary positions.
target_pos: Maximum position to consider.
current_best: Current best position found.
Returns:
-------
The best boundary position found, or current_best if none better.
"""
for boundary in boundaries:
if boundary > target_pos:
break
if boundary > current_best:
current_best = boundary
return current_best
def find_best_split_point(
    text: str,
    target_pos: int,
    preserve_sentences: bool = True,
    preserve_paragraphs: bool = True,
) -> int:
    """Find the best split point near the target position.

    Looks for a natural boundary at or before target_pos, preferring:
    1. Paragraph boundary (if preserve_paragraphs is True)
    2. Sentence boundary (if preserve_sentences is True)
    3. Word boundary
    4. target_pos itself (character-level fallback)

    Args:
    ----
        text: The text to find a split point in.
        target_pos: The target character position to split near.
            The returned position will be <= target_pos.
        preserve_sentences: If True, prefer sentence boundaries over words.
        preserve_paragraphs: If True, prefer paragraph boundaries over sentences.

    Returns:
    -------
        The best split point position (<= target_pos).
        Returns target_pos if no boundary at all is found before it.

    Example:
    -------
        >>> text = "First sentence. Second sentence here."
        >>> find_best_split_point(text, 22, preserve_sentences=True)
        15

    Note:
    ----
        Fixed: word boundaries were previously consulted unconditionally.
        Because every sentence/paragraph boundary coincides with the start of
        a whitespace run, the word-level pass always produced the largest
        position and the preserve_* flags had no effect (the example above
        actually returned 22). Lower-priority boundary types are now used
        only when no higher-priority boundary qualified, matching the
        documented preference order.
    """
    # Clamp to the text's bounds.
    if target_pos <= 0:
        return 0
    if target_pos >= len(text):
        return len(text)

    best_pos = 0  # 0 means "nothing found yet"

    # 1. Paragraph boundaries (highest preference).
    if preserve_paragraphs:
        best_pos = _find_best_boundary_in_list(
            find_paragraph_boundaries(text), target_pos, best_pos
        )
    # 2. Sentence boundaries, only if no paragraph boundary qualified.
    if best_pos == 0 and preserve_sentences:
        best_pos = _find_best_boundary_in_list(
            find_sentence_boundaries(text), target_pos, best_pos
        )
    # 3. Word boundaries, only as a last structural resort.
    if best_pos == 0:
        best_pos = _find_best_boundary_in_list(
            find_word_boundaries(text), target_pos, best_pos
        )
    # 4. Character-level fallback when no boundary exists before target.
    return best_pos if best_pos > 0 else target_pos
# =============================================================================
# Tokenizer Class
# =============================================================================
class Tokenizer:
    """Lazy-loading facade over tiktoken for token operations.

    Provides token counting, encoding, decoding, and truncation. The
    tiktoken library itself is not imported until the first operation that
    needs it, keeping module import fast.

    The default encoding (cl100k_base) is compatible with GPT-4,
    GPT-3.5-turbo, Claude (approximately), and OpenAI embedding models
    such as text-embedding-3-small.

    Attributes:
    ----------
    encoding_name : str
        The name of the tiktoken encoding being used.

    Example:
    -------
        >>> tokenizer = Tokenizer()
        >>> tokenizer.count_tokens("Hello, world!")
        4
        >>> tokenizer.decode(tokenizer.encode("Hello, world!"))
        'Hello, world!'
    """

    def __init__(self, encoding_name: str = _DEFAULT_ENCODING) -> None:
        """Store the encoding name; tiktoken loads lazily on first use.

        Args:
        ----
            encoding_name: The name of the tiktoken encoding to use.
                Defaults to "cl100k_base" (GPT-4/Claude compatible).
        """
        # Only the name is kept; the Encoding object is fetched on demand
        # through _get_tiktoken_encoding.
        self._encoding_name = encoding_name

    @property
    def encoding_name(self) -> str:
        """Name of the tiktoken encoding in use (e.g. "cl100k_base")."""
        return self._encoding_name

    def _get_encoding(self) -> tiktoken.Encoding:
        """Return the tiktoken Encoding, triggering lazy loading if needed."""
        return _get_tiktoken_encoding(self._encoding_name)

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in *text*.

        Args:
        ----
            text: The text to count tokens for.

        Returns:
        -------
            Token count; 0 for empty or whitespace-only text.
        """
        if not text or not text.strip():
            return 0
        return len(self._get_encoding().encode(text))

    def encode(self, text: str) -> list[int]:
        """Convert *text* into a list of integer token IDs.

        Args:
        ----
            text: The text to encode.

        Returns:
        -------
            List of token IDs; empty list for empty text.
        """
        if not text:
            return []
        # list(...) normalizes tiktoken's return type for the type checker.
        return list(self._get_encoding().encode(text))

    def decode(self, tokens: list[int]) -> str:
        """Convert a list of token IDs back into text.

        Args:
        ----
            tokens: Token IDs to decode.

        Returns:
        -------
            The decoded text; empty string for an empty token list.
        """
        if not tokens:
            return ""
        # str(...) normalizes tiktoken's return type for the type checker.
        return str(self._get_encoding().decode(tokens))

    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        """Truncate *text* so it fits within *max_tokens* tokens.

        Encodes the text, keeps at most max_tokens token IDs, and decodes
        the result. Useful for fitting text into model context limits.

        Args:
        ----
            text: The text to truncate.
            max_tokens: Maximum number of tokens to keep (>= 0).

        Returns:
        -------
            The truncated text, the original text if already within the
            limit, or an empty string when max_tokens <= 0.

        Note:
        ----
            Token boundaries do not align with words, so the result may end
            mid-word.
        """
        if max_tokens <= 0 or not text:
            return ""
        encoding = self._get_encoding()
        token_ids = encoding.encode(text)
        if len(token_ids) <= max_tokens:
            # Already within the budget; avoid a needless decode round-trip.
            return text
        return str(encoding.decode(token_ids[:max_tokens]))
# =============================================================================
# SlidingWindowChunker Class
# =============================================================================
class SlidingWindowChunker:
    """Token-aware chunker using sliding window strategy with overlap.

    This class implements a sliding window chunking algorithm that:
    - Respects token limits (min_tokens, max_tokens)
    - Preserves sentence and paragraph boundaries when possible
    - Maintains overlap between consecutive chunks for context
    - Uses token-accurate splitting via tiktoken

    It is designed for preparing text for embedding models with fixed
    context windows; the overlap keeps context continuous across chunk
    boundaries.

    Attributes:
    ----------
    config : ChunkingConfig
        The configuration parameters for chunking.
    normalizer : TextNormalizer | None
        Optional text normalizer for pre-processing.
    tokenizer : Tokenizer
        The tokenizer instance for token operations.

    Example:
    -------
        >>> config = ChunkingConfig(
        ...     min_tokens=450, max_tokens=700, overlap_percent=0.12,
        ...     preserve_sentences=True, preserve_paragraphs=True,
        ... )
        >>> chunker = SlidingWindowChunker(config)
        >>> chunks = chunker.chunk_text(
        ...     text="Long document content...", source="document.pdf", page=1,
        ... )

    Note:
    ----
        Short documents with fewer tokens than min_tokens are NOT chunked.
        They are returned as a single chunk to avoid unnecessary splitting.
    """

    def __init__(
        self,
        config: ChunkingConfig,
        normalizer: TextNormalizer | None = None,
    ) -> None:
        """Initialize the SlidingWindowChunker with configuration.

        Args:
        ----
            config: ChunkingConfig defining min_tokens, max_tokens,
                overlap_percent, and boundary preservation settings.
            normalizer: Optional TextNormalizer applied to text before it
                is split into chunks.
        """
        self._config = config
        self._normalizer = normalizer
        self._tokenizer = Tokenizer()
        # Pre-calculate overlap tokens once; reused for every chunk.
        self._overlap_tokens = config.calculate_overlap_tokens()

    @property
    def config(self) -> ChunkingConfig:
        """The ChunkingConfig instance."""
        return self._config

    @property
    def normalizer(self) -> TextNormalizer | None:
        """The TextNormalizer instance, or None if not configured."""
        return self._normalizer

    @property
    def tokenizer(self) -> Tokenizer:
        """The Tokenizer instance."""
        return self._tokenizer

    def _generate_chunk_id(self, source: str, chunk_index: int) -> str:
        """Generate a unique chunk ID "{source_stem}_{chunk_index:03d}".

        Args:
        ----
            source: The source document path or identifier.
            chunk_index: The 0-based index of this chunk.

        Returns:
        -------
            A unique chunk ID string, e.g. "document_000" for
            ("document.pdf", 0) and "file_042" for ("/path/to/file.pdf", 42).
        """
        # Path.stem drops both the directories and the file extension.
        return f"{Path(source).stem}_{chunk_index:03d}"

    def _find_char_position_for_tokens(
        self,
        text: str,
        target_tokens: int,
    ) -> int:
        """Find the character position corresponding to a token count.

        Binary-searches for the largest character position whose prefix
        encodes to at most target_tokens tokens. Used to convert
        token-based limits into character positions for splitting.

        Args:
        ----
            text: The text to analyze.
            target_tokens: The target number of tokens.

        Returns:
        -------
            The character position that yields approximately target_tokens.
            May be slightly less to avoid exceeding the token limit.
        """
        if not text:
            return 0
        # Fast path: the whole text already fits within the budget.
        if self._tokenizer.count_tokens(text) <= target_tokens:
            return len(text)
        # Binary search over character positions; each probe re-encodes the
        # prefix so the count stays exact.
        low, high, best_pos = 0, len(text), 0
        while low <= high:
            mid = (low + high) // 2
            if self._tokenizer.count_tokens(text[:mid]) <= target_tokens:
                best_pos = mid
                low = mid + 1
            else:
                high = mid - 1
        return best_pos

    def _extract_chunk_text(
        self,
        text: str,
        start_char: int,
        max_tokens: int,
    ) -> tuple[str, int]:
        """Extract one chunk starting at start_char, capped at max_tokens.

        Attempts to fill the token budget while respecting natural text
        boundaries (paragraph/sentence/word).

        Args:
        ----
            text: The full text to extract from.
            start_char: The starting character position.
            max_tokens: Maximum tokens for this chunk.

        Returns:
        -------
            Tuple of (chunk_text, end_char) where end_char is the absolute
            character position where this chunk ends.
        """
        remaining_text = text[start_char:]
        if not remaining_text.strip():
            # Only whitespace left: empty chunk, no progress.
            return "", start_char
        # Convert the token budget into an approximate character position.
        approx_end = self._find_char_position_for_tokens(remaining_text, max_tokens)
        if approx_end >= len(remaining_text):
            # Everything that remains fits in this chunk.
            return remaining_text.strip(), start_char + len(remaining_text)
        # Pull the split back to a natural boundary.
        split_pos = find_best_split_point(
            remaining_text,
            approx_end,
            preserve_sentences=self._config.preserve_sentences,
            preserve_paragraphs=self._config.preserve_paragraphs,
        )
        return remaining_text[:split_pos].strip(), start_char + split_pos

    def _calculate_overlap_start(
        self,
        text: str,
        current_end: int,
    ) -> int:
        """Calculate where the next chunk should start to get the overlap.

        Args:
        ----
            text: The full text being chunked.
            current_end: The end position of the current chunk.

        Returns:
        -------
            The start position for the next chunk (accounting for overlap).
            Returns current_end if no overlap is configured.

        Note:
        ----
            Fixed: this previously walked backwards one character at a time,
            re-tokenizing the growing suffix at every step — quadratic in
            chunk size. Token counts grow monotonically with suffix length
            (holds in practice for BPE tokenizers like tiktoken — TODO
            confirm for exotic encodings), so a binary search finds the same
            position with O(log n) tokenizations.
        """
        if self._overlap_tokens <= 0:
            return current_end
        count_tokens = self._tokenizer.count_tokens
        if count_tokens(text[:current_end]) < self._overlap_tokens:
            # Even the entire prefix holds fewer tokens than the overlap
            # budget; start the next chunk from the beginning.
            overlap_start = 0
        else:
            # Invariant: the suffix from `low` has >= overlap_tokens tokens;
            # the suffix from `high` has fewer (high = current_end is empty).
            low, high = 0, current_end
            while low + 1 < high:
                mid = (low + high) // 2
                if count_tokens(text[mid:current_end]) >= self._overlap_tokens:
                    low = mid
                else:
                    high = mid
            overlap_start = low
        # Snap forward to the first word boundary so the overlap region does
        # not begin mid-word.
        overlap_text = text[overlap_start:current_end]
        boundaries = find_word_boundaries(overlap_text)
        if boundaries:
            overlap_start = overlap_start + boundaries[0] + 1  # +1 skips the whitespace
        return overlap_start

    def chunk_text(
        self,
        text: str,
        source: str,
        page: int = 1,
        start_offset: int = 0,
    ) -> list[Chunk]:
        """Split text into token-aware chunks with overlap.

        Main chunking entry point. Splits the input into chunks that respect
        the configured token limits and boundary preferences. Short documents
        (< min_tokens) are returned as a single chunk.

        Args:
        ----
            text: The text content to chunk.
            source: Source document identifier (e.g., "document.pdf");
                used for chunk IDs and source attribution.
            page: The 1-indexed page number where this text originates.
                Defaults to 1.
            start_offset: Character offset of this text within the source
                document, so start_char/end_char are absolute. Defaults to 0.

        Returns:
        -------
            List of Chunk objects with chunk_id, text, source, page,
            start_char, end_char, token_count, an empty heading_path, and an
            auto-generated chunk_hash.

        Note:
        ----
            The heading_path field is left empty. It should be filled by an
            upstream processor (like HeadingParser) that has access to the
            document structure.
        """
        # Local import avoids a circular import with .models.
        from .models import Chunk

        if not text or not text.strip():
            return []

        # Optional pre-processing.
        processed_text = text
        if self._normalizer is not None:
            processed_text = self._normalizer.normalize(text)

        total_tokens = self._tokenizer.count_tokens(processed_text)

        # Short documents are never split.
        if total_tokens < self._config.min_tokens:
            return [
                Chunk(
                    chunk_id=self._generate_chunk_id(source, 0),
                    text=processed_text.strip(),
                    heading_path=[],  # to be filled by upstream processor
                    source=source,
                    page=page,
                    start_char=start_offset,
                    end_char=start_offset + len(processed_text),
                    token_count=total_tokens,
                )
            ]

        chunks: list[Chunk] = []
        chunk_index = 0
        current_pos = 0
        text_length = len(processed_text)

        while current_pos < text_length:
            remaining_text = processed_text[current_pos:]
            remaining_tokens = self._tokenizer.count_tokens(remaining_text)
            if remaining_tokens <= self._config.max_tokens:
                # Final chunk: everything that remains fits.
                chunk_text = remaining_text.strip()
                if chunk_text:  # only create a chunk if non-empty
                    chunks.append(
                        Chunk(
                            chunk_id=self._generate_chunk_id(source, chunk_index),
                            text=chunk_text,
                            heading_path=[],
                            source=source,
                            page=page,
                            start_char=start_offset + current_pos,
                            end_char=start_offset + text_length,
                            token_count=remaining_tokens,
                        )
                    )
                break

            # Carve out the next chunk at a natural boundary.
            chunk_text, end_pos = self._extract_chunk_text(
                processed_text,
                current_pos,
                self._config.max_tokens,
            )
            # Safety net: force progress if no boundary advanced us
            # (shouldn't happen, but protects against an infinite loop).
            if end_pos <= current_pos:
                end_pos = min(current_pos + 100, text_length)
                chunk_text = processed_text[current_pos:end_pos].strip()

            if chunk_text:
                chunks.append(
                    Chunk(
                        chunk_id=self._generate_chunk_id(source, chunk_index),
                        text=chunk_text,
                        heading_path=[],
                        source=source,
                        page=page,
                        start_char=start_offset + current_pos,
                        end_char=start_offset + end_pos,
                        token_count=self._tokenizer.count_tokens(chunk_text),
                    )
                )
                chunk_index += 1

            # Step back by the configured overlap for the next window.
            next_pos = self._calculate_overlap_start(processed_text, end_pos)
            if next_pos <= current_pos:
                # Guard against an infinite loop when overlap >= progress.
                next_pos = end_pos
            current_pos = next_pos

        return chunks