Spaces:
Sleeping
Sleeping
| """Semantic document chunker for RAG processing.""" | |
| import re | |
| from pathlib import Path | |
| from typing import Optional | |
| from pydantic import BaseModel | |
| from src.config import settings | |
class DocumentChunk(BaseModel):
    """A chunk of document content with metadata."""

    # Raw chunk text (may carry an "..."-prefixed overlap region, see
    # SemanticChunker._add_overlap).
    content: str
    # Path string of the originating Markdown file.
    source_file: str
    # 0-based position of this chunk within its source document.
    chunk_index: int
    # Character offsets into the document body this chunk was cut from.
    start_char: int
    end_char: int
    # Title of the enclosing Markdown section, when known.
    section_title: Optional[str] = None
    # Optional page reference; its semantics are not set in this module.
    page_hint: Optional[str] = None

    def chunk_id(self) -> str:
        """Generate unique chunk identifier."""
        # e.g. "report_0003": file stem plus zero-padded chunk index.
        return f"{Path(self.source_file).stem}_{self.chunk_index:04d}"
| class SemanticChunker: | |
| """Chunks Markdown documents by semantic boundaries. | |
| Respects document structure (headers, paragraphs, lists) while | |
| maintaining target chunk sizes for optimal embedding performance. | |
| """ | |
| def __init__( | |
| self, | |
| chunk_size: int = None, | |
| chunk_overlap: int = None, | |
| ): | |
| """Initialize the chunker. | |
| Args: | |
| chunk_size: Target chunk size in characters. | |
| chunk_overlap: Overlap between chunks in characters. | |
| """ | |
| self.chunk_size = chunk_size or settings.chunk_size | |
| self.chunk_overlap = chunk_overlap or settings.chunk_overlap | |
| # Patterns for semantic splitting | |
| self._header_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE) | |
| self._section_break_pattern = re.compile(r"\n{3,}") | |
| self._list_item_pattern = re.compile(r"^[\s]*[-*+]\s+", re.MULTILINE) | |
| def _extract_frontmatter(self, content: str) -> tuple[dict, str]: | |
| """Extract YAML frontmatter from markdown content.""" | |
| frontmatter = {} | |
| body = content | |
| if content.startswith("---"): | |
| parts = content.split("---", 2) | |
| if len(parts) >= 3: | |
| import yaml | |
| try: | |
| frontmatter = yaml.safe_load(parts[1]) or {} | |
| except Exception: | |
| pass | |
| body = parts[2].strip() | |
| return frontmatter, body | |
| def _find_section_boundaries(self, content: str) -> list[tuple[int, int, str]]: | |
| """Find semantic section boundaries based on headers. | |
| Returns list of (start_pos, end_pos, section_title) tuples. | |
| """ | |
| boundaries = [] | |
| headers = list(self._header_pattern.finditer(content)) | |
| if not headers: | |
| return [(0, len(content), "Document")] | |
| # Add content before first header if exists | |
| if headers[0].start() > 0: | |
| boundaries.append((0, headers[0].start(), "Preamble")) | |
| # Add each section | |
| for i, header in enumerate(headers): | |
| start = header.start() | |
| end = headers[i + 1].start() if i + 1 < len(headers) else len(content) | |
| title = header.group(2).strip() | |
| boundaries.append((start, end, title)) | |
| return boundaries | |
| def _split_section(self, content: str, section_title: str) -> list[str]: | |
| """Split a section into smaller chunks respecting boundaries.""" | |
| if len(content) <= self.chunk_size: | |
| return [content] if content.strip() else [] | |
| chunks = [] | |
| current_chunk = "" | |
| # Split by paragraphs first | |
| paragraphs = re.split(r"\n\n+", content) | |
| for para in paragraphs: | |
| para = para.strip() | |
| if not para: | |
| continue | |
| # If paragraph alone exceeds chunk size, split by sentences | |
| if len(para) > self.chunk_size: | |
| sentences = re.split(r"(?<=[.!?])\s+", para) | |
| for sentence in sentences: | |
| if len(current_chunk) + len(sentence) + 1 <= self.chunk_size: | |
| current_chunk += (" " if current_chunk else "") + sentence | |
| else: | |
| if current_chunk: | |
| chunks.append(current_chunk) | |
| current_chunk = sentence | |
| elif len(current_chunk) + len(para) + 2 <= self.chunk_size: | |
| current_chunk += ("\n\n" if current_chunk else "") + para | |
| else: | |
| if current_chunk: | |
| chunks.append(current_chunk) | |
| current_chunk = para | |
| if current_chunk.strip(): | |
| chunks.append(current_chunk) | |
| return chunks | |
| def _add_overlap(self, chunks: list[str]) -> list[str]: | |
| """Add overlap between chunks for context preservation.""" | |
| if self.chunk_overlap <= 0 or len(chunks) <= 1: | |
| return chunks | |
| overlapped = [] | |
| for i, chunk in enumerate(chunks): | |
| if i > 0: | |
| # Add end of previous chunk as prefix | |
| prev_chunk = chunks[i - 1] | |
| overlap_text = prev_chunk[-self.chunk_overlap :].strip() | |
| if overlap_text: | |
| chunk = f"...{overlap_text}\n\n{chunk}" | |
| overlapped.append(chunk) | |
| return overlapped | |
| def chunk_document(self, markdown_path: Path) -> list[DocumentChunk]: | |
| """Chunk a Markdown document into semantic pieces. | |
| Args: | |
| markdown_path: Path to the Markdown file. | |
| Returns: | |
| List of DocumentChunks with metadata. | |
| """ | |
| markdown_path = Path(markdown_path) | |
| content = markdown_path.read_text(encoding="utf-8") | |
| frontmatter, body = self._extract_frontmatter(content) | |
| source_file = frontmatter.get("source", markdown_path.name) | |
| sections = self._find_section_boundaries(body) | |
| all_chunks = [] | |
| chunk_index = 0 | |
| for start_pos, end_pos, section_title in sections: | |
| section_content = body[start_pos:end_pos].strip() | |
| if not section_content: | |
| continue | |
| section_chunks = self._split_section(section_content, section_title) | |
| section_chunks = self._add_overlap(section_chunks) | |
| for chunk_content in section_chunks: | |
| if not chunk_content.strip(): | |
| continue | |
| chunk = DocumentChunk( | |
| content=chunk_content, | |
| source_file=str(markdown_path), | |
| chunk_index=chunk_index, | |
| start_char=start_pos, | |
| end_char=end_pos, | |
| section_title=section_title, | |
| ) | |
| all_chunks.append(chunk) | |
| chunk_index += 1 | |
| return all_chunks | |
| def chunk_documents(self, markdown_paths: list[Path]) -> list[DocumentChunk]: | |
| """Chunk multiple Markdown documents. | |
| Args: | |
| markdown_paths: List of paths to Markdown files. | |
| Returns: | |
| List of all DocumentChunks from all documents. | |
| """ | |
| all_chunks = [] | |
| for path in markdown_paths: | |
| chunks = self.chunk_document(path) | |
| all_chunks.extend(chunks) | |
| return all_chunks | |