Spaces:
Sleeping
Sleeping
File size: 34,782 Bytes
3326079 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 | """Token-aware text chunking with sliding window strategy.
This module provides token-aware chunking capabilities for the RAG pipeline.
It implements a sliding window chunker that respects sentence and paragraph
boundaries while maintaining consistent token counts across chunks.
Key Features:
- Lazy loading of tiktoken for fast module import
- Token counting with GPT-4/Claude compatible tokenizer (cl100k_base)
- Sliding window chunking with configurable overlap
- Sentence and paragraph boundary preservation
- Integration with ChunkingConfig for consistent configuration
Components:
- Tokenizer: Lazy-loaded wrapper around tiktoken for token operations
- SlidingWindowChunker: Main chunker class implementing sliding window strategy
Lazy Loading:
The tiktoken library is loaded on first use via the __getattr__ pattern.
This ensures fast import times when the module is not immediately needed.
Design Principles:
- Token-aware splitting ensures chunks fit within embedding model limits
- Overlap between chunks maintains context continuity
- Natural text boundaries (sentences, paragraphs) are preferred split points
- Short documents below min_tokens are not split
Example:
-------
>>> from rag_chatbot.chunking import ChunkingConfig, SlidingWindowChunker
>>> config = ChunkingConfig(max_tokens=500, overlap_percent=0.1)
>>> chunker = SlidingWindowChunker(config)
>>> chunks = chunker.chunk_text(
... text="Long document text here...",
... source="document.pdf",
... page=1
... )
>>> for chunk in chunks:
... print(f"Chunk {chunk.chunk_id}: {chunk.token_count} tokens")
Note:
----
This module uses the cl100k_base encoding which is compatible with
GPT-4, GPT-3.5-turbo, and Claude models. The tokenizer provides
accurate token counts for chunking decisions.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import TYPE_CHECKING
# =============================================================================
# Type Checking Imports
# =============================================================================
# These imports are only processed by type checkers (mypy, pyright) and IDEs.
# They enable proper type hints without runtime overhead.
# =============================================================================
if TYPE_CHECKING:
import tiktoken
from .models import Chunk, ChunkingConfig, TextNormalizer
# =============================================================================
# Module Exports
# =============================================================================
__all__: list[str] = [
"Tokenizer",
"SlidingWindowChunker",
]
# =============================================================================
# Module-Level State for Lazy Loading
# =============================================================================
# These module-level variables hold lazily-loaded instances.
# They are initialized to None and populated on first access.
# =============================================================================
# Cached tiktoken encoding instance (loaded on first use)
_tiktoken_encoding: tiktoken.Encoding | None = None
# Default encoding name compatible with GPT-4 and Claude
_DEFAULT_ENCODING: str = "cl100k_base"
# =============================================================================
# Lazy Loading Infrastructure
# =============================================================================
def _get_tiktoken_encoding(encoding_name: str = _DEFAULT_ENCODING) -> tiktoken.Encoding:
"""Get or create the tiktoken encoding instance (lazy loading).
This function implements lazy loading for the tiktoken library.
The encoding is created on first call and cached for subsequent calls.
Args:
----
encoding_name: The name of the tiktoken encoding to use.
Defaults to "cl100k_base" (GPT-4/Claude compatible).
Returns:
-------
The tiktoken Encoding instance.
Raises:
------
ImportError: If tiktoken is not installed.
Note:
----
The encoding is cached at module level to avoid repeated
initialization overhead.
"""
global _tiktoken_encoding # noqa: PLW0603
# Return cached encoding if available and matches requested encoding
if _tiktoken_encoding is not None:
return _tiktoken_encoding
# Import tiktoken only when first needed (lazy loading)
import tiktoken as _tiktoken
# Create and cache the encoding
_tiktoken_encoding = _tiktoken.get_encoding(encoding_name)
return _tiktoken_encoding
# =============================================================================
# Sentence and Paragraph Detection Helpers
# =============================================================================
# These regex patterns and helper functions identify natural text boundaries
# for making intelligent split decisions.
# =============================================================================
# Pattern for finding sentence boundaries within text
# Matches the end of a sentence (punctuation) followed by space(s) before next sentence
_SENTENCE_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"([.!?])(\s+)(?=[A-Z])")
# Pattern for paragraph boundaries: double newlines (with optional whitespace)
_PARAGRAPH_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"\n\s*\n")
# Pattern for word boundaries (whitespace)
_WORD_BOUNDARY_PATTERN: re.Pattern[str] = re.compile(r"\s+")
def find_sentence_boundaries(text: str) -> list[int]:
r"""Find all sentence boundary positions in the text.
A sentence boundary is defined as a position immediately after
sentence-ending punctuation (., !, ?) followed by whitespace
and either a capital letter or end of text.
Args:
----
text: The text to analyze for sentence boundaries.
Returns:
-------
List of character positions where sentences end (after punctuation).
Empty list if no sentence boundaries are found.
Example:
-------
>>> text = "First sentence. Second one! Third?"
>>> boundaries = find_sentence_boundaries(text)
>>> boundaries # Positions after '.', '!', and '?'
[15, 27, 34]
Note:
----
The returned positions are after the punctuation mark itself,
suitable for splitting text at sentence boundaries.
"""
boundaries: list[int] = []
# Find all matches of sentence-ending patterns
for match in _SENTENCE_BOUNDARY_PATTERN.finditer(text):
# The boundary is after the punctuation (end of group 1)
boundaries.append(match.end(1))
# Check for sentence ending at the very end of text
if text and text.rstrip()[-1:] in ".!?":
final_pos = len(text.rstrip())
if final_pos not in boundaries:
boundaries.append(final_pos)
return sorted(boundaries)
def find_paragraph_boundaries(text: str) -> list[int]:
r"""Find all paragraph boundary positions in the text.
A paragraph boundary is defined as a position where double newlines
(with optional whitespace between) occur, indicating a paragraph break.
Args:
----
text: The text to analyze for paragraph boundaries.
Returns:
-------
List of character positions where paragraph breaks occur.
The position is at the start of the double-newline sequence.
Empty list if no paragraph boundaries are found.
Example:
-------
>>> text = "First paragraph.\n\nSecond paragraph."
>>> boundaries = find_paragraph_boundaries(text)
>>> boundaries
[16]
Note:
----
The returned positions are at the start of the whitespace between
paragraphs, suitable for splitting text at paragraph boundaries.
"""
boundaries: list[int] = []
# Find all matches of paragraph break patterns
for match in _PARAGRAPH_BOUNDARY_PATTERN.finditer(text):
boundaries.append(match.start())
return sorted(boundaries)
def find_word_boundaries(text: str) -> list[int]:
"""Find all word boundary positions in the text.
A word boundary is defined as a position where whitespace occurs,
suitable for splitting text without breaking words.
Args:
----
text: The text to analyze for word boundaries.
Returns:
-------
List of character positions where word boundaries occur.
The position is at the start of the whitespace.
Empty list if no word boundaries are found.
Example:
-------
>>> text = "Hello world example"
>>> boundaries = find_word_boundaries(text)
>>> boundaries
[5, 11]
"""
boundaries: list[int] = []
# Find all matches of whitespace
for match in _WORD_BOUNDARY_PATTERN.finditer(text):
boundaries.append(match.start())
return sorted(boundaries)
def _find_best_boundary_in_list(
boundaries: list[int],
target_pos: int,
current_best: int,
) -> int:
"""Find the best boundary from a sorted list that doesn't exceed target.
Helper function to reduce code duplication in find_best_split_point.
Args:
----
boundaries: Sorted list of boundary positions.
target_pos: Maximum position to consider.
current_best: Current best position found.
Returns:
-------
The best boundary position found, or current_best if none better.
"""
for boundary in boundaries:
if boundary > target_pos:
break
if boundary > current_best:
current_best = boundary
return current_best
def find_best_split_point(
text: str,
target_pos: int,
preserve_sentences: bool = True,
preserve_paragraphs: bool = True,
) -> int:
"""Find the best split point near the target position.
This function finds a natural text boundary (paragraph, sentence, or word)
that is closest to the target position without exceeding it. The preference
order for split points is:
1. Paragraph boundary (if preserve_paragraphs is True)
2. Sentence boundary (if preserve_sentences is True)
3. Word boundary
4. Character position (fallback)
Args:
----
text: The text to find a split point in.
target_pos: The target character position to split near.
The returned position will be <= target_pos.
preserve_sentences: If True, prefer sentence boundaries over words.
preserve_paragraphs: If True, prefer paragraph boundaries over sentences.
Returns:
-------
The best split point position (<= target_pos).
Returns target_pos if no better boundary is found.
Example:
-------
>>> text = "First sentence. Second sentence here."
>>> # Target is in the middle of "Second"
>>> find_best_split_point(text, 22, preserve_sentences=True)
15 # Returns position after first sentence
Note:
----
If no suitable boundary is found before target_pos, the function
returns target_pos as a fallback (character-level split).
"""
# Handle edge cases
if target_pos <= 0:
return 0
if target_pos >= len(text):
return len(text)
best_pos = 0 # Default to start of text
# Try paragraph boundaries first (highest preference)
if preserve_paragraphs:
para_boundaries = find_paragraph_boundaries(text)
best_pos = _find_best_boundary_in_list(para_boundaries, target_pos, best_pos)
# Try sentence boundaries next
if preserve_sentences:
sent_boundaries = find_sentence_boundaries(text)
best_pos = _find_best_boundary_in_list(sent_boundaries, target_pos, best_pos)
# Try word boundaries as fallback
word_boundaries = find_word_boundaries(text)
best_pos = _find_best_boundary_in_list(word_boundaries, target_pos, best_pos)
# If we still have 0, use target_pos (character-level split)
return best_pos if best_pos > 0 else target_pos
# =============================================================================
# Tokenizer Class
# =============================================================================
class Tokenizer:
"""Lazy-loading wrapper around tiktoken for token operations.
This class provides a clean interface for token counting, encoding,
and decoding operations using the tiktoken library. The tiktoken
library is loaded lazily on first use to minimize import time.
The default encoding (cl100k_base) is compatible with:
- GPT-4 and GPT-3.5-turbo models
- Claude models (approximate compatibility)
- Text embedding models like text-embedding-3-small
Attributes:
----------
encoding_name : str
The name of the tiktoken encoding being used.
Example:
-------
>>> tokenizer = Tokenizer()
>>> tokenizer.count_tokens("Hello, world!")
4
>>> tokens = tokenizer.encode("Hello, world!")
>>> tokenizer.decode(tokens)
'Hello, world!'
Note:
----
The tokenizer is thread-safe for read operations (count_tokens,
encode, decode). The tiktoken library handles thread safety internally.
"""
def __init__(self, encoding_name: str = _DEFAULT_ENCODING) -> None:
"""Initialize the Tokenizer with a specific encoding.
The tiktoken library is NOT loaded during initialization.
It will be loaded lazily on first method call.
Args:
----
encoding_name: The name of the tiktoken encoding to use.
Defaults to "cl100k_base" (GPT-4/Claude compatible).
Example:
-------
>>> tokenizer = Tokenizer() # Uses default cl100k_base
>>> tokenizer = Tokenizer("p50k_base") # GPT-3 encoding
"""
self._encoding_name = encoding_name
# The encoding instance is not created here (lazy loading)
# It will be retrieved via _get_tiktoken_encoding on first use
@property
def encoding_name(self) -> str:
"""Get the name of the tiktoken encoding being used.
Returns
-------
The encoding name string (e.g., "cl100k_base").
"""
return self._encoding_name
def _get_encoding(self) -> tiktoken.Encoding:
"""Get the tiktoken encoding instance (lazy loading).
Returns:
-------
The tiktoken Encoding instance.
Note:
----
This method triggers lazy loading of tiktoken if not already loaded.
"""
return _get_tiktoken_encoding(self._encoding_name)
def count_tokens(self, text: str) -> int:
"""Count the number of tokens in the given text.
This method encodes the text and returns the number of tokens.
Useful for checking if text fits within token limits.
Args:
----
text: The text to count tokens for.
Returns:
-------
The number of tokens in the text.
Returns 0 for empty or whitespace-only text.
Example:
-------
>>> tokenizer = Tokenizer()
>>> tokenizer.count_tokens("Hello, world!")
4
>>> tokenizer.count_tokens("The PMV model predicts thermal sensation.")
8
"""
if not text or not text.strip():
return 0
encoding = self._get_encoding()
return len(encoding.encode(text))
def encode(self, text: str) -> list[int]:
"""Encode text to a list of token IDs.
This method converts text into a list of integer token IDs
using the configured encoding.
Args:
----
text: The text to encode.
Returns:
-------
List of integer token IDs.
Empty list for empty text.
Example:
-------
>>> tokenizer = Tokenizer()
>>> tokens = tokenizer.encode("Hello")
>>> len(tokens)
1
>>> tokens = tokenizer.encode("Hello, world!")
>>> len(tokens)
4
"""
if not text:
return []
encoding = self._get_encoding()
# Cast to list[int] to satisfy mypy (tiktoken returns Any)
return list(encoding.encode(text))
def decode(self, tokens: list[int]) -> str:
"""Decode a list of token IDs back to text.
This method converts a list of integer token IDs back into
the original text string.
Args:
----
tokens: List of integer token IDs to decode.
Returns:
-------
The decoded text string.
Empty string for empty token list.
Example:
-------
>>> tokenizer = Tokenizer()
>>> tokens = tokenizer.encode("Hello, world!")
>>> tokenizer.decode(tokens)
'Hello, world!'
Note:
----
Decoding always produces valid text, but special tokens
may be represented differently.
"""
if not tokens:
return ""
encoding = self._get_encoding()
# Cast to str to satisfy mypy (tiktoken returns Any)
return str(encoding.decode(tokens))
def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
"""Truncate text to fit within a maximum token count.
This method encodes the text, truncates to the specified
number of tokens, and decodes back to text. Useful for
ensuring text fits within model context limits.
Args:
----
text: The text to truncate.
max_tokens: Maximum number of tokens to keep.
Must be >= 0.
Returns:
-------
The truncated text, or the original text if already
within the limit.
Empty string if max_tokens is 0.
Example:
-------
>>> tokenizer = Tokenizer()
>>> text = "This is a longer sentence with many tokens."
>>> tokenizer.truncate_to_tokens(text, 5)
'This is a longer sentence'
Note:
----
Truncation may produce text that ends mid-word if the
token boundary falls within a word.
"""
if max_tokens <= 0:
return ""
if not text:
return ""
encoding = self._get_encoding()
tokens = encoding.encode(text)
# Check if truncation is needed
if len(tokens) <= max_tokens:
return text
# Truncate and decode
truncated_tokens = tokens[:max_tokens]
# Cast to str to satisfy mypy (tiktoken returns Any)
return str(encoding.decode(truncated_tokens))
# =============================================================================
# SlidingWindowChunker Class
# =============================================================================
class SlidingWindowChunker:
"""Token-aware chunker using sliding window strategy with overlap.
This class implements a sliding window chunking algorithm that:
- Respects token limits (min_tokens, max_tokens)
- Preserves sentence boundaries when possible
- Preserves paragraph boundaries when possible
- Maintains overlap between consecutive chunks for context
- Uses token-accurate splitting via tiktoken
The chunker is designed for preparing text for embedding models
that have fixed context windows. The overlap ensures that context
is maintained across chunk boundaries.
Attributes:
----------
config : ChunkingConfig
The configuration parameters for chunking.
normalizer : TextNormalizer | None
Optional text normalizer for pre-processing.
tokenizer : Tokenizer
The tokenizer instance for token operations.
Example:
-------
>>> from rag_chatbot.chunking import ChunkingConfig, SlidingWindowChunker
>>> config = ChunkingConfig(
... min_tokens=450,
... max_tokens=700,
... overlap_percent=0.12,
... preserve_sentences=True,
... preserve_paragraphs=True,
... )
>>> chunker = SlidingWindowChunker(config)
>>> chunks = chunker.chunk_text(
... text="Long document content...",
... source="document.pdf",
... page=1,
... )
Note:
----
Short documents with fewer tokens than min_tokens are NOT chunked.
They are returned as a single chunk to avoid unnecessary splitting.
"""
def __init__(
self,
config: ChunkingConfig,
normalizer: TextNormalizer | None = None,
) -> None:
"""Initialize the SlidingWindowChunker with configuration.
Args:
----
config: ChunkingConfig instance with chunking parameters.
Defines min_tokens, max_tokens, overlap_percent, and
boundary preservation settings.
normalizer: Optional TextNormalizer for pre-processing text
before chunking. If provided, text will be normalized
before being split into chunks.
Example:
-------
>>> config = ChunkingConfig(max_tokens=500, overlap_percent=0.1)
>>> chunker = SlidingWindowChunker(config)
>>> # With normalizer
>>> from rag_chatbot.chunking import TextNormalizer
>>> normalizer = TextNormalizer()
>>> chunker = SlidingWindowChunker(config, normalizer=normalizer)
"""
self._config = config
self._normalizer = normalizer
self._tokenizer = Tokenizer()
# Pre-calculate overlap tokens for efficiency
self._overlap_tokens = config.calculate_overlap_tokens()
@property
def config(self) -> ChunkingConfig:
"""Get the chunking configuration.
Returns
-------
The ChunkingConfig instance.
"""
return self._config
@property
def normalizer(self) -> TextNormalizer | None:
"""Get the text normalizer (if any).
Returns
-------
The TextNormalizer instance, or None if not configured.
"""
return self._normalizer
@property
def tokenizer(self) -> Tokenizer:
"""Get the tokenizer instance.
Returns
-------
The Tokenizer instance.
"""
return self._tokenizer
def _generate_chunk_id(self, source: str, chunk_index: int) -> str:
"""Generate a unique chunk ID from source and index.
The chunk ID format is "{source_basename}_{chunk_index:03d}"
where source_basename is the filename without extension.
Args:
----
source: The source document path or identifier.
chunk_index: The 0-based index of this chunk.
Returns:
-------
A unique chunk ID string.
Example:
-------
>>> chunker._generate_chunk_id("document.pdf", 0)
'document_000'
>>> chunker._generate_chunk_id("/path/to/file.pdf", 42)
'file_042'
"""
# Extract basename without extension
source_path = Path(source)
basename = source_path.stem
# Format with zero-padded index
return f"{basename}_{chunk_index:03d}"
def _find_char_position_for_tokens(
self,
text: str,
target_tokens: int,
) -> int:
"""Find the character position corresponding to a token count.
This method performs a binary search to find the character position
in the text that corresponds approximately to the target number of
tokens. This is used to convert token-based boundaries to character
positions for text splitting.
Args:
----
text: The text to analyze.
target_tokens: The target number of tokens.
Returns:
-------
The character position that yields approximately target_tokens.
May be slightly less to avoid exceeding the token limit.
Note:
----
This method uses binary search for efficiency, as encoding
the entire text repeatedly would be slow for large documents.
"""
if not text:
return 0
# Quick check: if entire text is within target, return length
total_tokens = self._tokenizer.count_tokens(text)
if total_tokens <= target_tokens:
return len(text)
# Binary search for the character position
low = 0
high = len(text)
best_pos = 0
while low <= high:
mid = (low + high) // 2
# Count tokens up to this position
tokens = self._tokenizer.count_tokens(text[:mid])
if tokens <= target_tokens:
best_pos = mid
low = mid + 1
else:
high = mid - 1
return best_pos
def _extract_chunk_text(
self,
text: str,
start_char: int,
max_tokens: int,
) -> tuple[str, int]:
"""Extract chunk text starting from a position with token limit.
This method extracts text starting from start_char, attempting to
fit max_tokens while respecting natural text boundaries.
Args:
----
text: The full text to extract from.
start_char: The starting character position.
max_tokens: Maximum tokens for this chunk.
Returns:
-------
Tuple of (chunk_text, end_char) where end_char is the
character position where this chunk ends.
"""
# Get remaining text from start position
remaining_text = text[start_char:]
if not remaining_text.strip():
return "", start_char
# Find approximate character position for max_tokens
approx_end = self._find_char_position_for_tokens(remaining_text, max_tokens)
if approx_end >= len(remaining_text):
# Remaining text fits within max_tokens
chunk_text = remaining_text.strip()
return chunk_text, start_char + len(remaining_text)
# Find best split point respecting boundaries
split_pos = find_best_split_point(
remaining_text,
approx_end,
preserve_sentences=self._config.preserve_sentences,
preserve_paragraphs=self._config.preserve_paragraphs,
)
# Extract the chunk text
chunk_text = remaining_text[:split_pos].strip()
end_char = start_char + split_pos
return chunk_text, end_char
def _calculate_overlap_start(
self,
text: str,
current_end: int,
) -> int:
"""Calculate the start position for overlap with the next chunk.
This method determines where the next chunk should start to
include the configured overlap with the current chunk.
Args:
----
text: The full text being chunked.
current_end: The end position of the current chunk.
Returns:
-------
The start position for the next chunk (accounting for overlap).
Returns current_end if no overlap is configured.
"""
if self._overlap_tokens <= 0:
return current_end
# Get the text before current_end
text_before_end = text[:current_end]
# Find how far back we need to go for overlap_tokens
# We work backwards from current_end
overlap_start = current_end
# Count tokens from the end, working backwards
for i in range(current_end, -1, -1):
segment = text_before_end[i:current_end]
tokens = self._tokenizer.count_tokens(segment)
if tokens >= self._overlap_tokens:
overlap_start = i
break
overlap_start = i
# Find a natural boundary for the overlap start
# We want to start at a good boundary, not mid-word
overlap_text = text[overlap_start:current_end]
boundaries = find_word_boundaries(overlap_text)
if boundaries:
# Adjust to nearest word boundary
first_boundary = boundaries[0]
overlap_start = overlap_start + first_boundary + 1 # +1 to skip whitespace
return overlap_start
def chunk_text(
self,
text: str,
source: str,
page: int = 1,
start_offset: int = 0,
) -> list[Chunk]:
"""Split text into token-aware chunks with overlap.
This is the main chunking method. It splits the input text into
chunks that respect the configured token limits and boundary
preferences. Short documents (< min_tokens) are not chunked.
Args:
----
text: The text content to chunk.
source: Source document identifier (e.g., "document.pdf").
Used for generating chunk IDs and source attribution.
page: The page number where this text originates (1-indexed).
Defaults to 1.
start_offset: Character offset for this text within the source.
Used for tracking character positions across a larger document.
Defaults to 0.
Returns:
-------
List of Chunk objects, each containing:
- chunk_id: Unique identifier
- text: The chunk content
- source: Source document identifier
- page: Page number
- start_char: Starting character position
- end_char: Ending character position
- token_count: Number of tokens in chunk
- heading_path: Empty list (to be filled by upstream processor)
- chunk_hash: Auto-generated content hash
Example:
-------
>>> config = ChunkingConfig(max_tokens=500, overlap_percent=0.1)
>>> chunker = SlidingWindowChunker(config)
>>> chunks = chunker.chunk_text(
... text="Long document text here...",
... source="document.pdf",
... page=1,
... )
>>> len(chunks)
3
>>> chunks[0].token_count <= 500
True
Note:
----
The heading_path field is left empty. It should be filled by
an upstream processor (like HeadingParser) that has access to
the document structure.
"""
# Import Chunk model here to avoid circular imports
# This is a local import pattern used throughout the codebase
from .models import Chunk
# Handle empty input
if not text or not text.strip():
return []
# Apply text normalization if normalizer is configured
processed_text = text
if self._normalizer is not None:
processed_text = self._normalizer.normalize(text)
# Check total token count
total_tokens = self._tokenizer.count_tokens(processed_text)
# Short document handling: don't chunk if below min_tokens
if total_tokens < self._config.min_tokens:
# Return single chunk for short documents
chunk = Chunk(
chunk_id=self._generate_chunk_id(source, 0),
text=processed_text.strip(),
heading_path=[], # To be filled by upstream processor
source=source,
page=page,
start_char=start_offset,
end_char=start_offset + len(processed_text),
token_count=total_tokens,
)
return [chunk]
# Initialize chunking state
chunks: list[Chunk] = []
chunk_index = 0
current_pos = 0
text_length = len(processed_text)
# Main chunking loop
while current_pos < text_length:
# Check if remaining text is small enough to be the last chunk
remaining_text = processed_text[current_pos:]
remaining_tokens = self._tokenizer.count_tokens(remaining_text)
if remaining_tokens <= self._config.max_tokens:
# Last chunk: include all remaining text
chunk_text = remaining_text.strip()
if chunk_text: # Only create chunk if non-empty
chunk = Chunk(
chunk_id=self._generate_chunk_id(source, chunk_index),
text=chunk_text,
heading_path=[],
source=source,
page=page,
start_char=start_offset + current_pos,
end_char=start_offset + text_length,
token_count=remaining_tokens,
)
chunks.append(chunk)
break
# Extract chunk respecting boundaries
chunk_text, end_pos = self._extract_chunk_text(
processed_text,
current_pos,
self._config.max_tokens,
)
# Handle edge case: no progress made (shouldn't happen but be safe)
if end_pos <= current_pos:
# Force progress by taking at least some text
end_pos = min(current_pos + 100, text_length)
chunk_text = processed_text[current_pos:end_pos].strip()
# Create chunk if we have content
if chunk_text:
token_count = self._tokenizer.count_tokens(chunk_text)
chunk = Chunk(
chunk_id=self._generate_chunk_id(source, chunk_index),
text=chunk_text,
heading_path=[],
source=source,
page=page,
start_char=start_offset + current_pos,
end_char=start_offset + end_pos,
token_count=token_count,
)
chunks.append(chunk)
chunk_index += 1
# Calculate next start position with overlap
next_pos = self._calculate_overlap_start(processed_text, end_pos)
# Ensure we make progress (avoid infinite loop)
if next_pos <= current_pos:
next_pos = end_pos
current_pos = next_pos
return chunks
|