"""Semantic document chunker for RAG processing."""

import logging
import re
from pathlib import Path
from typing import Optional

from pydantic import BaseModel

from src.config import settings

logger = logging.getLogger(__name__)


class DocumentChunk(BaseModel):
    """A chunk of document content with metadata."""

    content: str
    source_file: str
    chunk_index: int
    # Character offsets of the *section* this chunk was cut from, measured in
    # the document body (i.e. after any frontmatter has been removed).
    start_char: int
    end_char: int
    section_title: Optional[str] = None
    page_hint: Optional[str] = None

    @property
    def chunk_id(self) -> str:
        """Return an identifier built from the source file stem and chunk index."""
        return f"{Path(self.source_file).stem}_{self.chunk_index:04d}"


class SemanticChunker:
    """Chunks Markdown documents by semantic boundaries.

    Respects document structure (headers, paragraphs, sentences) while
    keeping chunks at or below the target size before overlap is added.
    """

    def __init__(
        self,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
    ):
        """Initialize the chunker.

        Args:
            chunk_size: Target chunk size in characters. ``None`` (or ``0``,
                which is not a meaningful size) falls back to
                ``settings.chunk_size``.
            chunk_overlap: Overlap between chunks in characters. ``None``
                falls back to ``settings.chunk_overlap``; an explicit ``0``
                disables overlap.
        """
        self.chunk_size = chunk_size if chunk_size else settings.chunk_size
        # Use an identity check here: ``0`` is a valid request ("no overlap")
        # and must not be replaced by the configured default.
        self.chunk_overlap = (
            settings.chunk_overlap if chunk_overlap is None else chunk_overlap
        )

        # Patterns for semantic splitting.
        # ``[ \t]+`` (not ``\s+``) so a bare "#" line cannot swallow the next
        # line as its title.
        self._header_pattern = re.compile(r"^(#{1,6})[ \t]+(.+)$", re.MULTILINE)
        self._section_break_pattern = re.compile(r"\n{3,}")
        self._list_item_pattern = re.compile(r"^[\s]*[-*+]\s+", re.MULTILINE)
        # Opening/closing markers of fenced code blocks (``` or ~~~).
        self._fence_pattern = re.compile(r"^ {0,3}(`{3,}|~{3,})", re.MULTILINE)

    def _extract_frontmatter(self, content: str) -> tuple[dict, str]:
        """Extract YAML frontmatter from markdown content.

        Args:
            content: Full text of the Markdown file.

        Returns:
            ``(frontmatter, body)``. ``frontmatter`` is an empty dict when the
            document has none, when it cannot be parsed, or when the leading
            ``---`` block is not a YAML mapping. ``body`` is the text after
            the frontmatter (stripped), or ``content`` unchanged when no
            frontmatter was recognised.
        """
        if not content.startswith("---"):
            return {}, content

        parts = content.split("---", 2)
        if len(parts) < 3:
            return {}, content

        import yaml

        try:
            loaded = yaml.safe_load(parts[1])
        except Exception as exc:
            # Best effort: a broken frontmatter block must not prevent the
            # document from being chunked, but the failure should be visible.
            logger.warning("Could not parse YAML frontmatter: %s", exc)
            return {}, parts[2].strip()

        if loaded is None:
            # Empty frontmatter block.
            return {}, parts[2].strip()
        if not isinstance(loaded, dict):
            # A leading "---" followed by non-mapping text is most likely a
            # horizontal rule, not frontmatter; keep the content intact.
            return {}, content
        return loaded, parts[2].strip()

    def _fenced_code_spans(self, content: str) -> list[tuple[int, int]]:
        """Return ``(start, end)`` character spans of fenced code blocks.

        An unclosed fence extends to the end of ``content``.
        """
        spans: list[tuple[int, int]] = []
        open_marker: Optional[str] = None
        open_start = 0

        for match in self._fence_pattern.finditer(content):
            marker = match.group(1)
            if open_marker is None:
                open_marker, open_start = marker, match.start()
                continue

            # A closing fence uses the same character, is at least as long as
            # the opening one, and has nothing but whitespace after it.
            if marker[0] != open_marker[0] or len(marker) < len(open_marker):
                continue
            line_end = content.find("\n", match.end())
            if line_end == -1:
                line_end = len(content)
            if content[match.end():line_end].strip():
                continue
            spans.append((open_start, line_end))
            open_marker = None

        if open_marker is not None:
            spans.append((open_start, len(content)))
        return spans

    def _find_section_boundaries(self, content: str) -> list[tuple[int, int, str]]:
        """Find semantic section boundaries based on headers.

        Lines that look like headers but sit inside a fenced code block
        (for example ``# comment`` in a shell snippet) are ignored.

        Returns list of (start_pos, end_pos, section_title) tuples. A document
        without headers yields a single ``"Document"`` section; text before
        the first header is reported as ``"Preamble"``.
        """
        fenced = self._fenced_code_spans(content)
        headers = [
            match
            for match in self._header_pattern.finditer(content)
            if not any(start <= match.start() < end for start, end in fenced)
        ]

        if not headers:
            return [(0, len(content), "Document")]

        boundaries: list[tuple[int, int, str]] = []

        # Content before the first header, if any.
        first_start = headers[0].start()
        if first_start > 0:
            boundaries.append((0, first_start, "Preamble"))

        # Each section runs from its header to the next header (or the end).
        section_ends = [h.start() for h in headers[1:]] + [len(content)]
        for header, end in zip(headers, section_ends):
            boundaries.append((header.start(), end, header.group(2).strip()))

        return boundaries

    def _hard_split(self, text: str) -> list[str]:
        """Cut ``text`` into pieces of at most ``chunk_size`` characters.

        Cuts are made at the last whitespace inside the window when there is
        one, otherwise exactly at ``chunk_size``.
        """
        limit = self.chunk_size
        if limit <= 0 or len(text) <= limit:
            return [text] if text else []

        pieces: list[str] = []
        rest = text
        while len(rest) > limit:
            window = rest[: limit + 1]
            cut = max(window.rfind(ch) for ch in " \t\n")
            if cut <= 0:
                cut = limit
            pieces.append(rest[:cut].rstrip())
            rest = rest[cut:].lstrip()
        if rest:
            pieces.append(rest)
        return pieces

    def _paragraph_pieces(self, para: str) -> list[str]:
        """Return ``para`` as units that each fit within ``chunk_size``.

        A paragraph that fits is returned whole; otherwise it is split into
        sentences, and any sentence that is still too long is hard-split.
        """
        if len(para) <= self.chunk_size:
            return [para]

        pieces: list[str] = []
        for sentence in re.split(r"(?<=[.!?])\s+", para):
            pieces.extend(self._hard_split(sentence))
        return pieces

    def _split_section(self, content: str, section_title: str) -> list[str]:
        """Split a section into smaller chunks respecting boundaries.

        Args:
            content: Text of one section.
            section_title: Title of the section (currently not used by the
                splitting logic; kept for interface compatibility).

        Returns:
            Chunks in document order. Paragraphs are packed together joined
            by a blank line; pieces of an oversized paragraph are joined by a
            single space. An all-whitespace section yields an empty list.
        """
        if len(content) <= self.chunk_size:
            return [content] if content.strip() else []

        chunks: list[str] = []
        current = ""

        for para in re.split(r"\n\n+", content):
            para = para.strip()
            if not para:
                continue

            # The first piece of a paragraph is separated from what precedes
            # it by a paragraph break; later pieces of the same paragraph are
            # sentence fragments and are joined by a space.
            separator = "\n\n"
            for piece in self._paragraph_pieces(para):
                fits = (
                    current
                    and len(current) + len(separator) + len(piece) <= self.chunk_size
                )
                if fits:
                    current += separator + piece
                else:
                    if current:
                        chunks.append(current)
                    current = piece
                separator = " "

        if current.strip():
            chunks.append(current)

        return chunks

    def _add_overlap(self, chunks: list[str]) -> list[str]:
        """Add overlap between chunks for context preservation.

        Every chunk except the first is prefixed with ``"..."`` followed by
        the last ``chunk_overlap`` characters (stripped) of the previous
        original chunk and a blank line. The input list is returned unchanged
        when overlap is disabled or there is at most one chunk.
        """
        if self.chunk_overlap <= 0 or len(chunks) <= 1:
            return chunks

        overlapped = [chunks[0]]
        for previous, chunk in zip(chunks, chunks[1:]):
            tail = previous[-self.chunk_overlap:].strip()
            overlapped.append(f"...{tail}\n\n{chunk}" if tail else chunk)
        return overlapped

    def chunk_document(self, markdown_path: Path) -> list[DocumentChunk]:
        """Chunk a Markdown document into semantic pieces.

        Args:
            markdown_path: Path to the Markdown file (read as UTF-8).

        Returns:
            List of DocumentChunks with metadata. ``chunk_index`` counts
            chunks across the whole document. ``start_char``/``end_char`` are
            the offsets of the enclosing section within the body (after
            frontmatter removal), so all chunks of a section share them.
        """
        markdown_path = Path(markdown_path)
        content = markdown_path.read_text(encoding="utf-8")

        # NOTE: frontmatter values are not used for chunk metadata; chunks
        # are attributed to the Markdown path itself.
        _frontmatter, body = self._extract_frontmatter(content)

        all_chunks: list[DocumentChunk] = []

        for start_pos, end_pos, section_title in self._find_section_boundaries(body):
            section_content = body[start_pos:end_pos].strip()
            if not section_content:
                continue

            pieces = self._add_overlap(
                self._split_section(section_content, section_title)
            )

            for piece in pieces:
                if not piece.strip():
                    continue
                all_chunks.append(
                    DocumentChunk(
                        content=piece,
                        source_file=str(markdown_path),
                        chunk_index=len(all_chunks),
                        start_char=start_pos,
                        end_char=end_pos,
                        section_title=section_title,
                    )
                )

        return all_chunks

    def chunk_documents(self, markdown_paths: list[Path]) -> list[DocumentChunk]:
        """Chunk multiple Markdown documents.

        Args:
            markdown_paths: List of paths to Markdown files.

        Returns:
            List of all DocumentChunks from all documents, in input order.
            ``chunk_index`` restarts at 0 for each document.
        """
        all_chunks: list[DocumentChunk] = []
        for path in markdown_paths:
            all_chunks.extend(self.chunk_document(path))
        return all_chunks