|
|
"""Semantic chunking strategies for documents""" |
|
|
|
|
|
import re |
|
|
from typing import List, Optional |
|
|
from src.rag.document_processing.models import DocumentChunk |
|
|
|
|
|
|
|
|
class SemanticChunker:
    """
    Pack sentence-level segments of a document into overlapping chunks.

    The text is split into paragraphs (on blank lines) and then into
    sentences (after '.', '!' or '?').  Sentences are accumulated greedily
    until the running token count reaches ``chunk_size``; the trailing
    sentences of each full chunk are carried into the next one as overlap.
    "Tokens" are approximated as whitespace-delimited words.
    """

    # Sentence boundary: a run of whitespace that follows '.', '!' or '?'.
    _SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+')

    def __init__(
        self,
        chunk_size: int = 400,
        chunk_overlap: int = 100,
        min_chunk_size: int = 50,
    ):
        """
        Initialize the chunker.

        Args:
            chunk_size: Target tokens per chunk (approximate; a chunk is
                closed by the first sentence that reaches this count, so
                chunks may exceed it)
            chunk_overlap: Approximate tokens to repeat at the start of the
                next chunk; 0 disables overlap
            min_chunk_size: Chunks with fewer tokens than this are not
                emitted on their own
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

    def _count_tokens_approx(self, text: str) -> int:
        """Approximate token count (number of whitespace-delimited words)."""
        return len(text.split())

    def _split_on_delimiters(self, text: str) -> List[str]:
        """
        Split text on semantic boundaries (paragraphs, then sentences).

        Returns:
            Non-empty, stripped segments in document order.
        """
        segments: List[str] = []
        for para in text.split('\n\n'):
            para = para.strip()
            if not para:
                continue
            segments.extend(self._SENTENCE_BOUNDARY.split(para))
        return [s.strip() for s in segments if s.strip()]

    def _locate_segments(self, text: str, segments: List[str]) -> List[tuple]:
        """
        Return the ``(start, end)`` character span of each segment in text.

        Segments are contiguous substrings of ``text`` in document order and
        are separated only by whitespace, so searching forward from the end
        of the previous segment finds the right occurrence even when the
        same sentence appears more than once.
        """
        spans = []
        cursor = 0
        for segment in segments:
            start = text.find(segment, cursor)
            if start < 0:
                # Defensive only: keep offsets monotonic if the segment
                # cannot be located.
                start = cursor
            end = start + len(segment)
            spans.append((start, end))
            cursor = end
        return spans

    def _plan_chunks(self, text: str) -> List[tuple]:
        """
        Decide chunk boundaries without building DocumentChunk objects.

        Returns:
            One ``(content, start_char, end_char, token_count)`` tuple per
            chunk, in document order.  ``content`` is the chunk's segments
            joined by single spaces; the offsets delimit the same segments
            in the original ``text`` (whose whitespace may differ).
        """
        segments = self._split_on_delimiters(text)
        if not segments:
            return []

        spans = self._locate_segments(text, segments)
        # Joining stripped segments with one space keeps the word count
        # additive, so a running sum replaces re-tokenizing the window.
        counts = [self._count_tokens_approx(segment) for segment in segments]

        windows = []  # inclusive [first_segment, last_segment] index pairs
        first = 0
        tokens = 0
        final_idx = len(segments) - 1

        for idx, count in enumerate(counts):
            tokens += count
            is_full = tokens >= self.chunk_size
            # Compare by position, not by text: an earlier sentence equal to
            # the last one must not trigger the final flush.
            is_final = idx == final_idx
            if not is_full and not is_final:
                continue

            if tokens >= self.min_chunk_size:
                windows.append([first, idx])
            elif is_final and windows and first <= windows[-1][1] + 1:
                # Undersized tail: extend the adjacent previous chunk rather
                # than silently dropping the end of the document.
                windows[-1][1] = idx

            if is_full:
                # Walk back from the end of the chunk until the overlap
                # budget is met.  Never carry the chunk's first segment, so
                # every chunk starts later than the previous one and an
                # oversized segment is not re-emitted over and over.
                next_first = idx + 1
                carried = 0
                while carried < self.chunk_overlap and next_first > first + 1:
                    next_first -= 1
                    carried += counts[next_first]
                first = next_first
                tokens = carried

        plans = []
        for lo, hi in windows:
            content = ' '.join(segments[lo:hi + 1])
            plans.append(
                (
                    content,
                    spans[lo][0],
                    spans[hi][1],
                    self._count_tokens_approx(content),
                )
            )
        return plans

    def chunk(
        self,
        text: str,
        doc_id: str,
        source_doc: str,
        metadata: Optional[dict] = None,
    ) -> "List[DocumentChunk]":
        """
        Chunk a document into semantic units.

        Args:
            text: Document content to chunk
            doc_id: Document ID, used as the prefix of each chunk_id
            source_doc: Source filename
            metadata: Optional document metadata; each chunk receives its
                own shallow copy

        Returns:
            List of DocumentChunk objects in document order.  ``start_char``
            and ``end_char`` are offsets into ``text``.  A document whose
            total token count is below ``min_chunk_size`` yields an empty
            list.
        """
        base_metadata = {} if metadata is None else metadata

        return [
            DocumentChunk(
                chunk_id=f"{doc_id}_chunk_{chunk_index}",
                content=content,
                source_doc=source_doc,
                chunk_index=chunk_index,
                start_char=start_char,
                end_char=end_char,
                token_count=token_count,
                metadata=base_metadata.copy(),
            )
            for chunk_index, (content, start_char, end_char, token_count)
            in enumerate(self._plan_chunks(text))
        ]
|
|
|