"""Semantic chunking strategies for documents"""
import re
from typing import List, Optional

from src.rag.document_processing.models import DocumentChunk


class SemanticChunker:
    """
    Chunks documents into semantically coherent units.

    Supports both fixed-size and semantic-aware chunking.
    """

    def __init__(
        self,
        chunk_size: int = 400,
        chunk_overlap: int = 100,
        min_chunk_size: int = 50,
    ):
        """
        Initialize the chunker.

        Args:
            chunk_size: Target tokens per chunk (approximate)
            chunk_overlap: Tokens to overlap between chunks
            min_chunk_size: Minimum chunk size to avoid tiny fragments
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

    def _count_tokens_approx(self, text: str) -> int:
        """Approximate token count (simple word-based estimate)"""
        return len(text.split())

    def _split_on_delimiters(self, text: str) -> List[str]:
        """Split text on semantic boundaries (sentences, paragraphs)"""
        # Split on double newlines (paragraphs)
        paragraphs = text.split('\n\n')
        segments = []
        for para in paragraphs:
            if not para.strip():
                continue
            # Further split on sentences
            sentences = re.split(r'(?<=[.!?])\s+', para.strip())
            segments.extend(sentences)
        return [s.strip() for s in segments if s.strip()]

    def chunk(
        self,
        text: str,
        doc_id: str,
        source_doc: str,
        metadata: Optional[dict] = None,
    ) -> List[DocumentChunk]:
        """
        Chunk a document into semantic units.

        Args:
            text: Document content to chunk
            doc_id: Document ID
            source_doc: Source filename
            metadata: Optional document metadata

        Returns:
            List of DocumentChunk objects
        """
        if metadata is None:
            metadata = {}

        # Split into segments
        segments = self._split_on_delimiters(text)

        chunks = []
        current_chunk = []
        current_char_pos = 0
        chunk_index = 0

        for i, segment in enumerate(segments):
            current_chunk.append(segment)
            current_tokens = self._count_tokens_approx(' '.join(current_chunk))

            # Create chunk if we exceed size or this is the last segment
            is_last_segment = i == len(segments) - 1
            if current_tokens >= self.chunk_size or is_last_segment:
                chunk_text = ' '.join(current_chunk)

                if self._count_tokens_approx(chunk_text) >= self.min_chunk_size:
                    chunk_id = f"{doc_id}_chunk_{chunk_index}"
                    start_char = text.find(chunk_text)
                    if start_char < 0:
                        # Joined segments may not appear verbatim in the source
                        # (paragraph breaks are collapsed to spaces), so fall
                        # back to the running character position.
                        start_char = current_char_pos
                    end_char = start_char + len(chunk_text)

                    chunk = DocumentChunk(
                        chunk_id=chunk_id,
                        content=chunk_text,
                        source_doc=source_doc,
                        chunk_index=chunk_index,
                        start_char=start_char,
                        end_char=end_char,
                        token_count=self._count_tokens_approx(chunk_text),
                        metadata=metadata.copy(),
                    )
                    chunks.append(chunk)
                    chunk_index += 1
                    current_char_pos += len(chunk_text) + 1

                # Reset for next chunk, keeping overlap
                if current_tokens >= self.chunk_size:
                    overlap_segments = []
                    remaining_tokens = 0
                    for seg in reversed(current_chunk):
                        overlap_segments.insert(0, seg)
                        remaining_tokens += self._count_tokens_approx(seg)
                        if remaining_tokens >= self.chunk_overlap:
                            break
                    current_chunk = overlap_segments
                else:
                    current_chunk = []

        return chunks
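

# A minimal usage sketch, not part of the original module. It assumes the
# DocumentChunk model exposes the fields passed above (chunk_id, token_count,
# content) as attributes, and that the package is importable from the project
# root.
if __name__ == "__main__":
    sample_text = (
        "Retrieval-augmented generation splits documents before indexing. "
        "Each chunk should stay semantically coherent.\n\n"
        "Overlap between chunks preserves context across boundaries. "
        "Tiny fragments below the minimum size are dropped."
    )
    # Small limits so the demo text produces more than one chunk
    chunker = SemanticChunker(chunk_size=20, chunk_overlap=5, min_chunk_size=5)
    for c in chunker.chunk(sample_text, doc_id="demo", source_doc="demo.txt"):
        print(c.chunk_id, c.token_count, repr(c.content[:60]))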