"""Semantic chunker with table preservation for FDAM knowledge base. Chunking rules: - Keep markdown tables intact (never split) - Preserve headers with content for context - Target 400-600 tokens per chunk - Include metadata (source, category, section, priority) """ import re from dataclasses import dataclass, field from typing import Literal from pathlib import Path @dataclass class Chunk: """A chunk of text with metadata for RAG indexing.""" id: str text: str source: str # Filename category: Literal[ "methodology", "thresholds", "lab-methods", "cleaning-procedures", "wildfire", "safety", ] section: str # Section header path (e.g., "4.1 Zone Classification") priority: Literal["primary", "reference-threshold", "reference-narrative"] content_type: Literal["narrative", "table", "list", "mixed"] keywords: list[str] = field(default_factory=list) def to_metadata(self) -> dict: """Convert to metadata dict for ChromaDB.""" return { "source": self.source, "category": self.category, "section": self.section, "priority": self.priority, "content_type": self.content_type, "keywords": ",".join(self.keywords), } class SemanticChunker: """Chunks markdown documents while preserving tables and semantic structure.""" # Approximate tokens per character (conservative estimate) CHARS_PER_TOKEN = 4 TARGET_MIN_TOKENS = 400 TARGET_MAX_TOKENS = 600 def __init__(self): self.target_min_chars = self.TARGET_MIN_TOKENS * self.CHARS_PER_TOKEN self.target_max_chars = self.TARGET_MAX_TOKENS * self.CHARS_PER_TOKEN def chunk_document( self, text: str, source: str, category: Literal[ "methodology", "thresholds", "lab-methods", "cleaning-procedures", "wildfire", "safety", ], priority: Literal["primary", "reference-threshold", "reference-narrative"], ) -> list[Chunk]: """Chunk a markdown document into semantic units. 

        Args:
            text: Full document text (markdown format)
            source: Source filename
            category: Document category
            priority: Document priority level

        Returns:
            List of Chunk objects ready for indexing
        """
        # Split into sections by headers
        sections = self._split_by_headers(text)

        chunks: list[Chunk] = []
        chunk_counter = 0
        # Accumulator that persists across sections
        current_chunk_text = ""
        current_content_types: set[str] = set()
        current_section = "Introduction"  # Track primary section for metadata

        for section_header, section_content in sections:
            # Split section into blocks (paragraphs, tables, lists)
            blocks = self._split_into_blocks(section_content)

            for block_text, block_type in blocks:
                block_len = len(block_text)

                # Tables are never split: flush the accumulator if it meets
                # the minimum size, then emit the table as its own chunk.
                if block_type == "table":
                    if (
                        current_chunk_text.strip()
                        and len(current_chunk_text) >= self.target_min_chars
                    ):
                        chunks.append(
                            self._create_chunk(
                                chunk_id=f"{source}_{chunk_counter}",
                                text=current_chunk_text.strip(),
                                source=source,
                                category=category,
                                section=current_section,
                                priority=priority,
                                content_types=current_content_types,
                            )
                        )
                        chunk_counter += 1
                        current_chunk_text = ""
                        current_content_types = set()
                        current_section = section_header

                    # The table always stands alone, prefixed with its header.
                    table_text = f"{section_header}\n\n{block_text}".strip()
                    # Accumulated content below the minimum size is prepended
                    # to the table chunk so it still provides context.
                    if (
                        current_chunk_text.strip()
                        and len(current_chunk_text) < self.target_min_chars
                    ):
                        table_text = current_chunk_text.strip() + "\n\n" + table_text
                        current_chunk_text = ""
                        current_content_types = set()

                    chunks.append(
                        self._create_chunk(
                            chunk_id=f"{source}_{chunk_counter}",
                            text=table_text,
                            source=source,
                            category=category,
                            section=section_header,
                            priority=priority,
                            content_types={"table"},
                        )
                    )
                    chunk_counter += 1
                    current_section = section_header
                    continue

                # Flush when adding this block would exceed the target max
                # and the accumulator already meets the minimum size.
                potential_len = (
                    len(current_chunk_text) + block_len + len(section_header) + 4
                )  # +4 is headroom for the "\n\n" separators
                if (
                    potential_len > self.target_max_chars
                    and len(current_chunk_text) >= self.target_min_chars
                ):
                    chunks.append(
                        self._create_chunk(
                            chunk_id=f"{source}_{chunk_counter}",
                            text=current_chunk_text.strip(),
                            source=source,
                            category=category,
                            section=current_section,
                            priority=priority,
                            content_types=current_content_types,
                        )
                    )
                    chunk_counter += 1
                    # Start new chunk with section header
                    current_chunk_text = f"{section_header}\n\n"
                    current_content_types = set()
                    current_section = section_header

                # Add section header if starting fresh or entering a new section
                if not current_chunk_text.strip():
                    current_chunk_text = f"{section_header}\n\n"
                    current_section = section_header
                elif (
                    section_header != current_section
                    and section_header not in current_chunk_text
                ):
                    # Add the new section header inline for context
                    current_chunk_text += f"\n{section_header}\n\n"

                current_chunk_text += block_text + "\n\n"
                current_content_types.add(block_type)

        # Flush remaining content (regardless of size - it's the end)
        if current_chunk_text.strip():
            chunks.append(
                self._create_chunk(
                    chunk_id=f"{source}_{chunk_counter}",
                    text=current_chunk_text.strip(),
                    source=source,
                    category=category,
                    section=current_section,
                    priority=priority,
                    content_types=current_content_types,
                )
            )

        return chunks

    def _split_by_headers(self, text: str) -> list[tuple[str, str]]:
        """Split document by markdown headers (## and ###).
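
        Illustrative example: a document with the line "## Scope" followed by
        the line "Scope text." yields [("## Scope", "Scope text.")].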

        Returns list of (header, content) tuples.
        """
        # Match ## or ### headers
        header_pattern = r"^(#{2,3}\s+.+)$"

        lines = text.split("\n")
        sections: list[tuple[str, str]] = []
        current_header = "Introduction"
        current_content: list[str] = []

        for line in lines:
            if re.match(header_pattern, line):
                # Save previous section
                if current_content:
                    sections.append((current_header, "\n".join(current_content)))
                current_header = line.strip()
                current_content = []
            else:
                current_content.append(line)

        # Save final section
        if current_content:
            sections.append((current_header, "\n".join(current_content)))

        return sections

    def _split_into_blocks(self, text: str) -> list[tuple[str, str]]:
        """Split section content into blocks (paragraphs, tables, lists).

        Returns list of (block_text, block_type) tuples.
        """
        blocks: list[tuple[str, str]] = []
        lines = text.split("\n")
        current_block: list[str] = []
        current_type = "narrative"
        in_table = False

        for line in lines:
            stripped = line.strip()
            # Detect table rows (markdown table lines contain at least two pipes)
            if stripped.startswith("|") and "|" in stripped[1:]:
                if not in_table:
                    # Flush current block before the table starts
                    if current_block:
                        block_text = "\n".join(current_block).strip()
                        if block_text:
                            blocks.append((block_text, current_type))
                        current_block = []
                    in_table = True
                    current_type = "table"
                current_block.append(line)
            elif in_table:
                # Table ended
                block_text = "\n".join(current_block).strip()
                if block_text:
                    blocks.append((block_text, "table"))
                current_block = [line] if stripped else []
                in_table = False
                current_type = "narrative"
            elif re.match(r"(?:[-*]|\d+\.)\s", stripped):
                # List item (bulleted, or numbered with any item number)
                if current_type != "list" and current_block:
                    block_text = "\n".join(current_block).strip()
                    if block_text:
                        blocks.append((block_text, current_type))
                    current_block = []
                current_type = "list"
                current_block.append(line)
            elif stripped == "" and current_block:
                # Paragraph break
                if not in_table:
                    block_text = "\n".join(current_block).strip()
                    if block_text:
                        blocks.append((block_text, current_type))
                    current_block = []
                    current_type = "narrative"
            else:
                # An unindented non-list line ends a list block; indented
                # lines are treated as list continuations.
                if current_type == "list" and not line.startswith("  "):
                    block_text = "\n".join(current_block).strip()
                    if block_text:
                        blocks.append((block_text, "list"))
                    current_block = []
                    current_type = "narrative"
                current_block.append(line)

        # Flush remaining
        if current_block:
            block_text = "\n".join(current_block).strip()
            if block_text:
                blocks.append((block_text, current_type))

        return blocks

    def _create_chunk(
        self,
        chunk_id: str,
        text: str,
        source: str,
        category: str,
        section: str,
        priority: str,
        content_types: set[str],
    ) -> Chunk:
        """Create a Chunk object with extracted keywords."""
        # Determine primary content type
        if "table" in content_types:
            content_type = "table"
        elif "list" in content_types and "narrative" in content_types:
            content_type = "mixed"
        elif "list" in content_types:
            content_type = "list"
        else:
            content_type = "narrative"

        # Extract keywords from text
        keywords = self._extract_keywords(text)

        return Chunk(
            id=chunk_id,
            text=text,
            source=source,
            category=category,
            section=section,
            priority=priority,
            content_type=content_type,
            keywords=keywords,
        )

    def _extract_keywords(self, text: str) -> list[str]:
        """Extract relevant keywords from chunk text."""
        # Domain-specific keywords to look for
        domain_terms = [
            # Zone classifications
            "burn zone",
            "near-field",
            "far-field",
            # Condition levels
            "background",
            "light",
            "moderate",
            "heavy",
            "structural damage",
            # Dispositions
            "no action",
            "clean",
            "evaluate",
            "remove",
            "remove/repair",
            # Materials
            "soot",
            "char",
            "ash",
            "particulate",
            "aciniform",
            # Thresholds
            "lead",
            "cadmium",
            "arsenic",
            "metals",
            "µg/100cm²",
            "cts/cm²",
Facility types "operational", "non-operational", "public", "childcare", # Standards "ach", "nadca", "epa", "hud", "osha", # Sampling "sampling", "wipe", "bulk", "air", "clearance", # Lab methods "plm", "icp-ms", "xrf", "tapelift", # Actions "hepa", "vacuum", "deodorization", "encapsulation", ] text_lower = text.lower() found_keywords = [] for term in domain_terms: if term in text_lower: found_keywords.append(term) return found_keywords[:10] # Limit to top 10 def chunk_file( filepath: Path, category: Literal[ "methodology", "thresholds", "lab-methods", "cleaning-procedures", "wildfire", "safety", ], priority: Literal["primary", "reference-threshold", "reference-narrative"], ) -> list[Chunk]: """Convenience function to chunk a markdown file. Args: filepath: Path to markdown file category: Document category priority: Document priority level Returns: List of Chunk objects """ chunker = SemanticChunker() text = filepath.read_text(encoding="utf-8") return chunker.chunk_document( text=text, source=filepath.name, category=category, priority=priority, )