"""Semantic document chunker for RAG processing."""
import re
from pathlib import Path
from typing import Optional
from pydantic import BaseModel
from src.config import settings
class DocumentChunk(BaseModel):
    """A chunk of document content with metadata.

    Attributes are validated by pydantic. ``start_char``/``end_char`` are
    character offsets into the source document body; ``section_title`` and
    ``page_hint`` are optional provenance hints for retrieval display.
    """

    content: str          # the chunk text itself
    source_file: str      # path (or name) of the originating document
    chunk_index: int      # position of this chunk within the document
    start_char: int       # start offset of the enclosing span in the body
    end_char: int         # end offset of the enclosing span in the body
    section_title: Optional[str] = None  # markdown header this chunk came from
    page_hint: Optional[str] = None      # optional page locator, if known

    @property
    def chunk_id(self) -> str:
        """Generate a unique chunk identifier, e.g. ``report_0007``."""
        # File stem plus a zero-padded index keeps ids sortable and readable.
        return f"{Path(self.source_file).stem}_{self.chunk_index:04d}"
class SemanticChunker:
"""Chunks Markdown documents by semantic boundaries.
Respects document structure (headers, paragraphs, lists) while
maintaining target chunk sizes for optimal embedding performance.
"""
def __init__(
self,
chunk_size: int = None,
chunk_overlap: int = None,
):
"""Initialize the chunker.
Args:
chunk_size: Target chunk size in characters.
chunk_overlap: Overlap between chunks in characters.
"""
self.chunk_size = chunk_size or settings.chunk_size
self.chunk_overlap = chunk_overlap or settings.chunk_overlap
# Patterns for semantic splitting
self._header_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
self._section_break_pattern = re.compile(r"\n{3,}")
self._list_item_pattern = re.compile(r"^[\s]*[-*+]\s+", re.MULTILINE)
def _extract_frontmatter(self, content: str) -> tuple[dict, str]:
"""Extract YAML frontmatter from markdown content."""
frontmatter = {}
body = content
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
import yaml
try:
frontmatter = yaml.safe_load(parts[1]) or {}
except Exception:
pass
body = parts[2].strip()
return frontmatter, body
def _find_section_boundaries(self, content: str) -> list[tuple[int, int, str]]:
"""Find semantic section boundaries based on headers.
Returns list of (start_pos, end_pos, section_title) tuples.
"""
boundaries = []
headers = list(self._header_pattern.finditer(content))
if not headers:
return [(0, len(content), "Document")]
# Add content before first header if exists
if headers[0].start() > 0:
boundaries.append((0, headers[0].start(), "Preamble"))
# Add each section
for i, header in enumerate(headers):
start = header.start()
end = headers[i + 1].start() if i + 1 < len(headers) else len(content)
title = header.group(2).strip()
boundaries.append((start, end, title))
return boundaries
def _split_section(self, content: str, section_title: str) -> list[str]:
"""Split a section into smaller chunks respecting boundaries."""
if len(content) <= self.chunk_size:
return [content] if content.strip() else []
chunks = []
current_chunk = ""
# Split by paragraphs first
paragraphs = re.split(r"\n\n+", content)
for para in paragraphs:
para = para.strip()
if not para:
continue
# If paragraph alone exceeds chunk size, split by sentences
if len(para) > self.chunk_size:
sentences = re.split(r"(?<=[.!?])\s+", para)
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 <= self.chunk_size:
current_chunk += (" " if current_chunk else "") + sentence
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = sentence
elif len(current_chunk) + len(para) + 2 <= self.chunk_size:
current_chunk += ("\n\n" if current_chunk else "") + para
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = para
if current_chunk.strip():
chunks.append(current_chunk)
return chunks
def _add_overlap(self, chunks: list[str]) -> list[str]:
"""Add overlap between chunks for context preservation."""
if self.chunk_overlap <= 0 or len(chunks) <= 1:
return chunks
overlapped = []
for i, chunk in enumerate(chunks):
if i > 0:
# Add end of previous chunk as prefix
prev_chunk = chunks[i - 1]
overlap_text = prev_chunk[-self.chunk_overlap :].strip()
if overlap_text:
chunk = f"...{overlap_text}\n\n{chunk}"
overlapped.append(chunk)
return overlapped
def chunk_document(self, markdown_path: Path) -> list[DocumentChunk]:
"""Chunk a Markdown document into semantic pieces.
Args:
markdown_path: Path to the Markdown file.
Returns:
List of DocumentChunks with metadata.
"""
markdown_path = Path(markdown_path)
content = markdown_path.read_text(encoding="utf-8")
frontmatter, body = self._extract_frontmatter(content)
source_file = frontmatter.get("source", markdown_path.name)
sections = self._find_section_boundaries(body)
all_chunks = []
chunk_index = 0
for start_pos, end_pos, section_title in sections:
section_content = body[start_pos:end_pos].strip()
if not section_content:
continue
section_chunks = self._split_section(section_content, section_title)
section_chunks = self._add_overlap(section_chunks)
for chunk_content in section_chunks:
if not chunk_content.strip():
continue
chunk = DocumentChunk(
content=chunk_content,
source_file=str(markdown_path),
chunk_index=chunk_index,
start_char=start_pos,
end_char=end_pos,
section_title=section_title,
)
all_chunks.append(chunk)
chunk_index += 1
return all_chunks
def chunk_documents(self, markdown_paths: list[Path]) -> list[DocumentChunk]:
"""Chunk multiple Markdown documents.
Args:
markdown_paths: List of paths to Markdown files.
Returns:
List of all DocumentChunks from all documents.
"""
all_chunks = []
for path in markdown_paths:
chunks = self.chunk_document(path)
all_chunks.extend(chunks)
return all_chunks