| | """Semantic chunker for processing markdown documents with hierarchical structure.""" |
| |
|
| | import hashlib |
| | import json |
| | import re |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Tuple |
| |
|
| | from llama_index.core.node_parser import SentenceSplitter |
| | from pydantic import BaseModel, Field |
| | from rich.console import Console |
| | from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn |
| |
|
| | from config.settings import settings |
| |
|
| |
|
class ChunkNode(BaseModel):
    """
    Pydantic model representing a semantic chunk of text.

    Attributes:
        chunk_id: Unique identifier for the chunk
        content: The actual text content
        parent_section: The section header this chunk belongs to
        document_title: Original article title
        source_url: EyeWiki URL of the source document
        chunk_index: Position of chunk in the document (0-indexed)
        token_count: Approximate number of tokens in the chunk
        metadata: Additional metadata from the source document
    """

    chunk_id: str = Field(..., description="Unique identifier (hash-based)")
    content: str = Field(..., description="Text content of the chunk")
    parent_section: str = Field(default="", description="Parent section header")
    document_title: str = Field(default="", description="Original document title")
    source_url: str = Field(default="", description="Source URL")
    chunk_index: int = Field(..., ge=0, description="Position in document")
    token_count: int = Field(..., ge=0, description="Approximate token count")
    metadata: Dict = Field(default_factory=dict, description="Additional metadata")

    def to_dict(self) -> Dict:
        """Convert to dictionary representation."""
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: Dict) -> "ChunkNode":
        """Create a ChunkNode from a dictionary.

        Uses pydantic v2's ``model_validate`` (the canonical inverse of
        ``model_dump``) instead of ``cls(**data)``: it accepts any mapping
        and applies the same validation/coercion rules uniformly.
        """
        return cls.model_validate(data)
| |
|
| |
|
| | class SemanticChunker: |
| | """ |
| | Hierarchical semantic chunker that respects markdown structure. |
| | |
| | Features: |
| | - Splits on ## headers first (sections) |
| | - Then splits large sections into semantic chunks |
| | - Preserves parent section context |
| | - Uses LlamaIndex SentenceSplitter for semantic splitting |
| | - Configurable chunk sizes and overlap |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | chunk_size: Optional[int] = None, |
| | chunk_overlap: Optional[int] = None, |
| | min_chunk_size: int = 100, |
| | ): |
| | """ |
| | Initialize the SemanticChunker. |
| | |
| | Args: |
| | chunk_size: Target chunk size in tokens (default: from settings) |
| | chunk_overlap: Overlap between chunks in tokens (default: from settings) |
| | min_chunk_size: Minimum chunk size to keep (default: 100 tokens) |
| | """ |
| | self.chunk_size = chunk_size or settings.chunk_size |
| | self.chunk_overlap = chunk_overlap or settings.chunk_overlap |
| | self.min_chunk_size = min_chunk_size |
| |
|
| | |
| | self.sentence_splitter = SentenceSplitter( |
| | chunk_size=self.chunk_size, |
| | chunk_overlap=self.chunk_overlap, |
| | ) |
| |
|
| | self.console = Console() |
| |
|
| | def _estimate_tokens(self, text: str) -> int: |
| | """ |
| | Estimate token count for text. |
| | |
| | Uses a simple heuristic: ~4 characters per token. |
| | More accurate than word count for medical/technical text. |
| | |
| | Args: |
| | text: Input text |
| | |
| | Returns: |
| | Estimated token count |
| | """ |
| | return len(text) // 4 |
| |
|
| | def _generate_chunk_id(self, content: str, chunk_index: int, source_url: str) -> str: |
| | """ |
| | Generate unique chunk ID using hash. |
| | |
| | Args: |
| | content: Chunk content |
| | chunk_index: Index of chunk |
| | source_url: Source URL |
| | |
| | Returns: |
| | Unique chunk identifier |
| | """ |
| | |
| | unique_string = f"{source_url}:{chunk_index}:{content[:100]}" |
| | return hashlib.sha256(unique_string.encode()).hexdigest()[:16] |
| |
|
| | def _parse_markdown_sections(self, markdown: str) -> List[Tuple[str, str]]: |
| | """ |
| | Parse markdown into sections based on ## headers. |
| | |
| | Args: |
| | markdown: Markdown content |
| | |
| | Returns: |
| | List of (header, content) tuples |
| | """ |
| | sections = [] |
| |
|
| | |
| | |
| | pattern = r"^##\s+(.+?)$" |
| | lines = markdown.split("\n") |
| |
|
| | current_header = "" |
| | current_content = [] |
| |
|
| | for line in lines: |
| | match = re.match(pattern, line) |
| | if match: |
| | |
| | if current_content: |
| | sections.append((current_header, "\n".join(current_content))) |
| |
|
| | |
| | current_header = match.group(1).strip() |
| | current_content = [line] |
| | else: |
| | current_content.append(line) |
| |
|
| | |
| | if current_content: |
| | sections.append((current_header, "\n".join(current_content))) |
| |
|
| | return sections |
| |
|
| | def _split_large_section(self, text: str) -> List[str]: |
| | """ |
| | Split large section into semantic chunks using LlamaIndex. |
| | |
| | Args: |
| | text: Section text to split |
| | |
| | Returns: |
| | List of text chunks |
| | """ |
| | |
| | chunks = self.sentence_splitter.split_text(text) |
| | return chunks |
| |
|
| | def _clean_content(self, content: str) -> str: |
| | """ |
| | Clean chunk content by removing excessive whitespace. |
| | |
| | Args: |
| | content: Raw content |
| | |
| | Returns: |
| | Cleaned content |
| | """ |
| | |
| | content = re.sub(r"\n{3,}", "\n\n", content) |
| |
|
| | |
| | content = content.strip() |
| |
|
| | return content |
| |
|
| | def chunk_document( |
| | self, |
| | markdown_content: str, |
| | metadata: Dict, |
| | ) -> List[ChunkNode]: |
| | """ |
| | Chunk a markdown document with hierarchical structure. |
| | |
| | Process: |
| | 1. Parse document into sections by ## headers |
| | 2. For each section, check if it needs splitting |
| | 3. If section is small enough, keep as single chunk |
| | 4. If section is large, split into semantic chunks |
| | 5. Preserve parent section context in each chunk |
| | |
| | Args: |
| | markdown_content: Markdown text content |
| | metadata: Document metadata (must include 'url' and 'title') |
| | |
| | Returns: |
| | List of ChunkNode objects |
| | """ |
| | chunks = [] |
| | chunk_index = 0 |
| |
|
| | |
| | source_url = metadata.get("url", "") |
| | document_title = metadata.get("title", "Untitled") |
| |
|
| | |
| | sections = self._parse_markdown_sections(markdown_content) |
| |
|
| | |
| | if not sections or (len(sections) == 1 and not sections[0][0]): |
| | sections = [("", markdown_content)] |
| |
|
| | for section_header, section_content in sections: |
| | |
| | section_content = self._clean_content(section_content) |
| |
|
| | |
| | if not section_content: |
| | continue |
| |
|
| | |
| | section_tokens = self._estimate_tokens(section_content) |
| |
|
| | |
| | if section_tokens <= self.chunk_size: |
| | |
| | if section_tokens >= self.min_chunk_size: |
| | chunk_id = self._generate_chunk_id( |
| | section_content, chunk_index, source_url |
| | ) |
| |
|
| | chunk = ChunkNode( |
| | chunk_id=chunk_id, |
| | content=section_content, |
| | parent_section=section_header, |
| | document_title=document_title, |
| | source_url=source_url, |
| | chunk_index=chunk_index, |
| | token_count=section_tokens, |
| | metadata=metadata, |
| | ) |
| | chunks.append(chunk) |
| | chunk_index += 1 |
| | else: |
| | |
| | sub_chunks = self._split_large_section(section_content) |
| |
|
| | for sub_chunk_content in sub_chunks: |
| | sub_chunk_content = self._clean_content(sub_chunk_content) |
| |
|
| | |
| | sub_chunk_tokens = self._estimate_tokens(sub_chunk_content) |
| | if sub_chunk_tokens < self.min_chunk_size: |
| | continue |
| |
|
| | chunk_id = self._generate_chunk_id( |
| | sub_chunk_content, chunk_index, source_url |
| | ) |
| |
|
| | chunk = ChunkNode( |
| | chunk_id=chunk_id, |
| | content=sub_chunk_content, |
| | parent_section=section_header, |
| | document_title=document_title, |
| | source_url=source_url, |
| | chunk_index=chunk_index, |
| | token_count=sub_chunk_tokens, |
| | metadata=metadata, |
| | ) |
| | chunks.append(chunk) |
| | chunk_index += 1 |
| |
|
| | return chunks |
| |
|
| | def chunk_directory( |
| | self, |
| | input_dir: Path, |
| | output_dir: Path, |
| | pattern: str = "*.md", |
| | ) -> Dict[str, int]: |
| | """ |
| | Process all markdown files in a directory. |
| | |
| | For each .md file, looks for corresponding .json metadata file, |
| | chunks the document, and saves chunks to output directory. |
| | |
| | Args: |
| | input_dir: Directory containing markdown files |
| | output_dir: Directory to save chunked outputs |
| | pattern: Glob pattern for files to process (default: "*.md") |
| | |
| | Returns: |
| | Dictionary with processing statistics |
| | """ |
| | input_dir = Path(input_dir) |
| | output_dir = Path(output_dir) |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| |
|
| | |
| | md_files = list(input_dir.glob(pattern)) |
| |
|
| | if not md_files: |
| | self.console.print(f"[yellow]No files matching '{pattern}' found in {input_dir}[/yellow]") |
| | return {"processed": 0, "failed": 0, "total_chunks": 0} |
| |
|
| | stats = { |
| | "processed": 0, |
| | "failed": 0, |
| | "skipped": 0, |
| | "total_chunks": 0, |
| | "total_tokens": 0, |
| | } |
| |
|
| | self.console.print(f"\n[bold cyan]Chunking Documents[/bold cyan]") |
| | self.console.print(f"Input: {input_dir}") |
| | self.console.print(f"Output: {output_dir}") |
| | self.console.print(f"Files found: {len(md_files)}\n") |
| |
|
| | with Progress( |
| | SpinnerColumn(), |
| | TextColumn("[progress.description]{task.description}"), |
| | BarColumn(), |
| | TaskProgressColumn(), |
| | console=self.console, |
| | ) as progress: |
| |
|
| | task = progress.add_task( |
| | "[cyan]Processing...", |
| | total=len(md_files), |
| | ) |
| |
|
| | for md_file in md_files: |
| | try: |
| | |
| | json_file = md_file.with_suffix(".json") |
| |
|
| | if not json_file.exists(): |
| | self.console.print( |
| | f"[yellow]Skipping {md_file.name}: No metadata file found[/yellow]" |
| | ) |
| | stats["skipped"] += 1 |
| | progress.advance(task) |
| | continue |
| |
|
| | |
| | with open(md_file, "r", encoding="utf-8") as f: |
| | markdown_content = f.read() |
| |
|
| | |
| | with open(json_file, "r", encoding="utf-8") as f: |
| | metadata = json.load(f) |
| |
|
| | |
| | if self._estimate_tokens(markdown_content) < self.min_chunk_size: |
| | self.console.print( |
| | f"[yellow]Skipping {md_file.name}: Content too small[/yellow]" |
| | ) |
| | stats["skipped"] += 1 |
| | progress.advance(task) |
| | continue |
| |
|
| | |
| | chunks = self.chunk_document(markdown_content, metadata) |
| |
|
| | if not chunks: |
| | self.console.print( |
| | f"[yellow]Skipping {md_file.name}: No chunks created[/yellow]" |
| | ) |
| | stats["skipped"] += 1 |
| | progress.advance(task) |
| | continue |
| |
|
| | |
| | output_file = output_dir / f"{md_file.stem}_chunks.json" |
| | with open(output_file, "w", encoding="utf-8") as f: |
| | chunk_dicts = [chunk.to_dict() for chunk in chunks] |
| | json.dump(chunk_dicts, f, indent=2, ensure_ascii=False) |
| |
|
| | |
| | stats["processed"] += 1 |
| | stats["total_chunks"] += len(chunks) |
| | stats["total_tokens"] += sum(chunk.token_count for chunk in chunks) |
| |
|
| | progress.update( |
| | task, |
| | description=f"[cyan]Processing ({stats['processed']} done, {stats['total_chunks']} chunks): {md_file.name[:40]}...", |
| | ) |
| | progress.advance(task) |
| |
|
| | except Exception as e: |
| | self.console.print(f"[red]Error processing {md_file.name}: {e}[/red]") |
| | stats["failed"] += 1 |
| | progress.advance(task) |
| |
|
| | |
| | self.console.print("\n[bold cyan]Chunking Summary[/bold cyan]") |
| | self.console.print(f"Files processed: {stats['processed']}") |
| | self.console.print(f"Files skipped: {stats['skipped']}") |
| | self.console.print(f"Files failed: {stats['failed']}") |
| | self.console.print(f"Total chunks created: {stats['total_chunks']}") |
| | self.console.print(f"Total tokens: {stats['total_tokens']:,}") |
| |
|
| | if stats["processed"] > 0: |
| | avg_chunks = stats["total_chunks"] / stats["processed"] |
| | avg_tokens = stats["total_tokens"] / stats["total_chunks"] if stats["total_chunks"] > 0 else 0 |
| | self.console.print(f"Average chunks per document: {avg_chunks:.1f}") |
| | self.console.print(f"Average tokens per chunk: {avg_tokens:.1f}") |
| |
|
| | return stats |
| |
|