File size: 3,099 Bytes
c9622da |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
"""Text splitter for chunking documents."""
from dataclasses import dataclass
from typing import List, Optional
from src.config import ChunkingConfig
from src.document_loader.loader import Document
@dataclass
class TextChunk:
    """Represents a chunk of text."""

    content: str      # the chunk's text (split_text stores it whitespace-stripped)
    metadata: dict    # caller-supplied metadata plus chunk bookkeeping keys
    chunk_index: int  # 0-based sequential position of this chunk in its source text
class TextSplitter:
    """Split text into overlapping chunks.

    Chunks are at most ``config.chunk_size`` characters long and, where
    possible, end on a paragraph or sentence boundary.  Consecutive chunks
    overlap by up to ``config.chunk_overlap`` characters.
    """

    def __init__(self, config: Optional[ChunkingConfig] = None):
        """Initialize the text splitter.

        Args:
            config: Chunking configuration. Uses defaults if not provided.
        """
        self.config = config or ChunkingConfig()

    def split_text(self, text: str, metadata: Optional[dict] = None) -> List[TextChunk]:
        """Split text into chunks.

        Args:
            text: Text to split.
            metadata: Optional metadata to attach to chunks.

        Returns:
            List of text chunks.  Empty if *text* is blank.
        """
        if not text.strip():
            return []

        metadata = metadata or {}
        chunks: List[TextChunk] = []

        # Normalize Windows line endings so the "\n\n"/"\n" separators match.
        text = text.replace("\r\n", "\n")

        start = 0
        chunk_index = 0
        while start < len(text):
            end = start + self.config.chunk_size
            if end < len(text):
                # Prefer to break at a boundary, searched from most
                # significant (paragraph) to least significant separator.
                for sep in ["\n\n", "\n", ". ", "! ", "? "]:
                    last_sep = text.rfind(sep, start, end)
                    if last_sep > start:
                        end = last_sep + len(sep)
                        break
            else:
                end = len(text)

            chunk_text = text[start:end].strip()
            if chunk_text:  # skip whitespace-only regions
                chunks.append(TextChunk(
                    content=chunk_text,
                    metadata={
                        **metadata,
                        "chunk_index": chunk_index,
                        "start_char": start,
                        "end_char": end,
                    },
                    chunk_index=chunk_index,
                ))
                chunk_index += 1

            # Advance with overlap, but ALWAYS make forward progress.
            # BUG FIX: the original guard parsed as
            #   (start <= chunks[-1].metadata.get("start_char", 0)) if chunks else 0
            # so it was inert while `chunks` was empty (e.g. a whitespace-only
            # region no longer than chunk_overlap), letting `start` stall and
            # the loop spin forever.  Comparing the candidate against the
            # current `start` guarantees progress on every iteration.
            next_start = end - self.config.chunk_overlap
            if next_start <= start:
                next_start = end
            start = next_start

        return chunks

    def split_documents(self, documents: List[Document]) -> List[TextChunk]:
        """Split multiple documents into chunks.

        Args:
            documents: List of documents to split.

        Returns:
            List of text chunks from all documents, in document order.
        """
        all_chunks: List[TextChunk] = []
        for doc in documents:
            all_chunks.extend(self.split_text(doc.content, doc.metadata))
        return all_chunks
|