"""Semantic chunker for processing markdown documents with hierarchical structure.""" import hashlib import json import re from pathlib import Path from typing import Dict, List, Optional, Tuple from llama_index.core.node_parser import SentenceSplitter from pydantic import BaseModel, Field from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn from config.settings import settings class ChunkNode(BaseModel): """ Pydantic model representing a semantic chunk of text. Attributes: chunk_id: Unique identifier for the chunk content: The actual text content parent_section: The section header this chunk belongs to document_title: Original article title source_url: EyeWiki URL of the source document chunk_index: Position of chunk in the document (0-indexed) token_count: Approximate number of tokens in the chunk metadata: Additional metadata from the source document """ chunk_id: str = Field(..., description="Unique identifier (hash-based)") content: str = Field(..., description="Text content of the chunk") parent_section: str = Field(default="", description="Parent section header") document_title: str = Field(default="", description="Original document title") source_url: str = Field(default="", description="Source URL") chunk_index: int = Field(..., ge=0, description="Position in document") token_count: int = Field(..., ge=0, description="Approximate token count") metadata: Dict = Field(default_factory=dict, description="Additional metadata") def to_dict(self) -> Dict: """Convert to dictionary representation.""" return self.model_dump() @classmethod def from_dict(cls, data: Dict) -> "ChunkNode": """Create ChunkNode from dictionary.""" return cls(**data) class SemanticChunker: """ Hierarchical semantic chunker that respects markdown structure. Features: - Splits on ## headers first (sections) - Then splits large sections into semantic chunks - Preserves parent section context - Uses LlamaIndex SentenceSplitter for semantic splitting - Configurable chunk sizes and overlap """ def __init__( self, chunk_size: Optional[int] = None, chunk_overlap: Optional[int] = None, min_chunk_size: int = 100, ): """ Initialize the SemanticChunker. Args: chunk_size: Target chunk size in tokens (default: from settings) chunk_overlap: Overlap between chunks in tokens (default: from settings) min_chunk_size: Minimum chunk size to keep (default: 100 tokens) """ self.chunk_size = chunk_size or settings.chunk_size self.chunk_overlap = chunk_overlap or settings.chunk_overlap self.min_chunk_size = min_chunk_size # Initialize LlamaIndex sentence splitter self.sentence_splitter = SentenceSplitter( chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap, ) self.console = Console() def _estimate_tokens(self, text: str) -> int: """ Estimate token count for text. Uses a simple heuristic: ~4 characters per token. More accurate than word count for medical/technical text. Args: text: Input text Returns: Estimated token count """ return len(text) // 4 def _generate_chunk_id(self, content: str, chunk_index: int, source_url: str) -> str: """ Generate unique chunk ID using hash. Args: content: Chunk content chunk_index: Index of chunk source_url: Source URL Returns: Unique chunk identifier """ # Create a unique string combining content snippet, index, and source unique_string = f"{source_url}:{chunk_index}:{content[:100]}" return hashlib.sha256(unique_string.encode()).hexdigest()[:16] def _parse_markdown_sections(self, markdown: str) -> List[Tuple[str, str]]: """ Parse markdown into sections based on ## headers. Args: markdown: Markdown content Returns: List of (header, content) tuples """ sections = [] # Split by ## headers (h2) # Pattern matches: ## Header or ##Header pattern = r"^##\s+(.+?)$" lines = markdown.split("\n") current_header = "" current_content = [] for line in lines: match = re.match(pattern, line) if match: # Save previous section if it has content if current_content: sections.append((current_header, "\n".join(current_content))) # Start new section current_header = match.group(1).strip() current_content = [line] # Include the header in content else: current_content.append(line) # Add final section if current_content: sections.append((current_header, "\n".join(current_content))) return sections def _split_large_section(self, text: str) -> List[str]: """ Split large section into semantic chunks using LlamaIndex. Args: text: Section text to split Returns: List of text chunks """ # Use LlamaIndex SentenceSplitter chunks = self.sentence_splitter.split_text(text) return chunks def _clean_content(self, content: str) -> str: """ Clean chunk content by removing excessive whitespace. Args: content: Raw content Returns: Cleaned content """ # Remove excessive blank lines (more than 2 consecutive) content = re.sub(r"\n{3,}", "\n\n", content) # Remove leading/trailing whitespace content = content.strip() return content def chunk_document( self, markdown_content: str, metadata: Dict, ) -> List[ChunkNode]: """ Chunk a markdown document with hierarchical structure. Process: 1. Parse document into sections by ## headers 2. For each section, check if it needs splitting 3. If section is small enough, keep as single chunk 4. If section is large, split into semantic chunks 5. Preserve parent section context in each chunk Args: markdown_content: Markdown text content metadata: Document metadata (must include 'url' and 'title') Returns: List of ChunkNode objects """ chunks = [] chunk_index = 0 # Extract metadata source_url = metadata.get("url", "") document_title = metadata.get("title", "Untitled") # Parse into sections sections = self._parse_markdown_sections(markdown_content) # If no sections found, treat entire document as one section if not sections or (len(sections) == 1 and not sections[0][0]): sections = [("", markdown_content)] for section_header, section_content in sections: # Clean section content section_content = self._clean_content(section_content) # Skip empty sections if not section_content: continue # Estimate tokens in section section_tokens = self._estimate_tokens(section_content) # If section is smaller than chunk size, keep as single chunk if section_tokens <= self.chunk_size: # Only create chunk if it meets minimum size if section_tokens >= self.min_chunk_size: chunk_id = self._generate_chunk_id( section_content, chunk_index, source_url ) chunk = ChunkNode( chunk_id=chunk_id, content=section_content, parent_section=section_header, document_title=document_title, source_url=source_url, chunk_index=chunk_index, token_count=section_tokens, metadata=metadata, ) chunks.append(chunk) chunk_index += 1 else: # Section is large, split into semantic chunks sub_chunks = self._split_large_section(section_content) for sub_chunk_content in sub_chunks: sub_chunk_content = self._clean_content(sub_chunk_content) # Skip if empty or too small sub_chunk_tokens = self._estimate_tokens(sub_chunk_content) if sub_chunk_tokens < self.min_chunk_size: continue chunk_id = self._generate_chunk_id( sub_chunk_content, chunk_index, source_url ) chunk = ChunkNode( chunk_id=chunk_id, content=sub_chunk_content, parent_section=section_header, document_title=document_title, source_url=source_url, chunk_index=chunk_index, token_count=sub_chunk_tokens, metadata=metadata, ) chunks.append(chunk) chunk_index += 1 return chunks def chunk_directory( self, input_dir: Path, output_dir: Path, pattern: str = "*.md", ) -> Dict[str, int]: """ Process all markdown files in a directory. For each .md file, looks for corresponding .json metadata file, chunks the document, and saves chunks to output directory. Args: input_dir: Directory containing markdown files output_dir: Directory to save chunked outputs pattern: Glob pattern for files to process (default: "*.md") Returns: Dictionary with processing statistics """ input_dir = Path(input_dir) output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) # Find all markdown files md_files = list(input_dir.glob(pattern)) if not md_files: self.console.print(f"[yellow]No files matching '{pattern}' found in {input_dir}[/yellow]") return {"processed": 0, "failed": 0, "total_chunks": 0} stats = { "processed": 0, "failed": 0, "skipped": 0, "total_chunks": 0, "total_tokens": 0, } self.console.print(f"\n[bold cyan]Chunking Documents[/bold cyan]") self.console.print(f"Input: {input_dir}") self.console.print(f"Output: {output_dir}") self.console.print(f"Files found: {len(md_files)}\n") with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn(), console=self.console, ) as progress: task = progress.add_task( "[cyan]Processing...", total=len(md_files), ) for md_file in md_files: try: # Look for corresponding JSON metadata file json_file = md_file.with_suffix(".json") if not json_file.exists(): self.console.print( f"[yellow]Skipping {md_file.name}: No metadata file found[/yellow]" ) stats["skipped"] += 1 progress.advance(task) continue # Read markdown content with open(md_file, "r", encoding="utf-8") as f: markdown_content = f.read() # Read metadata with open(json_file, "r", encoding="utf-8") as f: metadata = json.load(f) # Skip if markdown is too small if self._estimate_tokens(markdown_content) < self.min_chunk_size: self.console.print( f"[yellow]Skipping {md_file.name}: Content too small[/yellow]" ) stats["skipped"] += 1 progress.advance(task) continue # Chunk the document chunks = self.chunk_document(markdown_content, metadata) if not chunks: self.console.print( f"[yellow]Skipping {md_file.name}: No chunks created[/yellow]" ) stats["skipped"] += 1 progress.advance(task) continue # Save chunks to output file output_file = output_dir / f"{md_file.stem}_chunks.json" with open(output_file, "w", encoding="utf-8") as f: chunk_dicts = [chunk.to_dict() for chunk in chunks] json.dump(chunk_dicts, f, indent=2, ensure_ascii=False) # Update stats stats["processed"] += 1 stats["total_chunks"] += len(chunks) stats["total_tokens"] += sum(chunk.token_count for chunk in chunks) progress.update( task, description=f"[cyan]Processing ({stats['processed']} done, {stats['total_chunks']} chunks): {md_file.name[:40]}...", ) progress.advance(task) except Exception as e: self.console.print(f"[red]Error processing {md_file.name}: {e}[/red]") stats["failed"] += 1 progress.advance(task) # Print summary self.console.print("\n[bold cyan]Chunking Summary[/bold cyan]") self.console.print(f"Files processed: {stats['processed']}") self.console.print(f"Files skipped: {stats['skipped']}") self.console.print(f"Files failed: {stats['failed']}") self.console.print(f"Total chunks created: {stats['total_chunks']}") self.console.print(f"Total tokens: {stats['total_tokens']:,}") if stats["processed"] > 0: avg_chunks = stats["total_chunks"] / stats["processed"] avg_tokens = stats["total_tokens"] / stats["total_chunks"] if stats["total_chunks"] > 0 else 0 self.console.print(f"Average chunks per document: {avg_chunks:.1f}") self.console.print(f"Average tokens per chunk: {avg_tokens:.1f}") return stats