| """Semantic chunker with table preservation for FDAM knowledge base. | |
| Chunking rules: | |
| - Keep markdown tables intact (never split) | |
| - Preserve headers with content for context | |
| - Target 400-600 tokens per chunk | |
| - Include metadata (source, category, section, priority) | |
| """ | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Literal | |
| from pathlib import Path | |


@dataclass
class Chunk:
    """A chunk of text with metadata for RAG indexing."""

    id: str
    text: str
    source: str  # Filename
    category: Literal[
        "methodology",
        "thresholds",
        "lab-methods",
        "cleaning-procedures",
        "wildfire",
        "safety",
    ]
    section: str  # Section header path (e.g., "4.1 Zone Classification")
    priority: Literal["primary", "reference-threshold", "reference-narrative"]
    content_type: Literal["narrative", "table", "list", "mixed"]
    keywords: list[str] = field(default_factory=list)

    def to_metadata(self) -> dict:
        """Convert to metadata dict for ChromaDB."""
        return {
            "source": self.source,
            "category": self.category,
            "section": self.section,
            "priority": self.priority,
            "content_type": self.content_type,
            # ChromaDB metadata values must be scalars, so keywords are
            # joined into a single comma-separated string.
            "keywords": ",".join(self.keywords),
        }
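
# Example of the metadata shape to_metadata() produces (illustrative values,
# not taken from a real document):
#
#   {
#       "source": "fdam_thresholds.md",
#       "category": "thresholds",
#       "section": "## 4.1 Zone Classification",
#       "priority": "reference-threshold",
#       "content_type": "table",
#       "keywords": "lead,arsenic,µg/100cm²",
#   }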


class SemanticChunker:
    """Chunks markdown documents while preserving tables and semantic structure."""

    # Approximate characters per token (conservative estimate); token targets
    # below are converted to character budgets using this ratio.
    CHARS_PER_TOKEN = 4
    TARGET_MIN_TOKENS = 400
    TARGET_MAX_TOKENS = 600

    def __init__(self):
        # 400-600 tokens ~= 1600-2400 characters at 4 chars/token
        self.target_min_chars = self.TARGET_MIN_TOKENS * self.CHARS_PER_TOKEN
        self.target_max_chars = self.TARGET_MAX_TOKENS * self.CHARS_PER_TOKEN

    def chunk_document(
        self,
        text: str,
        source: str,
        category: Literal[
            "methodology",
            "thresholds",
            "lab-methods",
            "cleaning-procedures",
            "wildfire",
            "safety",
        ],
        priority: Literal["primary", "reference-threshold", "reference-narrative"],
    ) -> list[Chunk]:
        """Chunk a markdown document into semantic units.

        Args:
            text: Full document text (markdown format)
            source: Source filename
            category: Document category
            priority: Document priority level

        Returns:
            List of Chunk objects ready for indexing
        """
        # Split into sections by headers
        sections = self._split_by_headers(text)

        chunks = []
        chunk_counter = 0

        # Accumulator that persists across sections
        current_chunk_text = ""
        current_content_types: set[str] = set()
        current_section = "Introduction"  # Track primary section for metadata

        for section_header, section_content in sections:
            # Split section into blocks (paragraphs, tables, lists)
            blocks = self._split_into_blocks(section_content)

            for block_text, block_type in blocks:
                block_len = len(block_text)

                # Tables are never split: flush the accumulator if it is
                # large enough, then emit the table as its own chunk.
                if block_type == "table":
                    if current_chunk_text.strip() and len(current_chunk_text) >= self.target_min_chars:
                        chunks.append(
                            self._create_chunk(
                                chunk_id=f"{source}_{chunk_counter}",
                                text=current_chunk_text.strip(),
                                source=source,
                                category=category,
                                section=current_section,
                                priority=priority,
                                content_types=current_content_types,
                            )
                        )
                        chunk_counter += 1
                        current_chunk_text = ""
                        current_content_types = set()
                        current_section = section_header
                    # Otherwise the accumulator is below the minimum size and
                    # is prepended to the table below so its context is kept.

                    # Add table as its own chunk (tables always standalone)
                    table_text = f"{section_header}\n\n{block_text}".strip()

                    # If we have small accumulated content, prepend it to give context
                    if current_chunk_text.strip() and len(current_chunk_text) < self.target_min_chars:
                        table_text = current_chunk_text.strip() + "\n\n" + table_text
                        current_chunk_text = ""
                        current_content_types = set()

                    chunks.append(
                        self._create_chunk(
                            chunk_id=f"{source}_{chunk_counter}",
                            text=table_text,
                            source=source,
                            category=category,
                            section=section_header,
                            priority=priority,
                            content_types={"table"},
                        )
                    )
                    chunk_counter += 1
                    current_section = section_header
                    continue

                # Check if adding this block exceeds target max
                potential_len = len(current_chunk_text) + block_len + len(section_header) + 4
                if potential_len > self.target_max_chars and len(current_chunk_text) >= self.target_min_chars:
                    # Flush current chunk - it's large enough
                    chunks.append(
                        self._create_chunk(
                            chunk_id=f"{source}_{chunk_counter}",
                            text=current_chunk_text.strip(),
                            source=source,
                            category=category,
                            section=current_section,
                            priority=priority,
                            content_types=current_content_types,
                        )
                    )
                    chunk_counter += 1

                    # Start new chunk with section header
                    current_chunk_text = f"{section_header}\n\n"
                    current_content_types = set()
                    current_section = section_header

                # Add section header if starting fresh or new section
                if not current_chunk_text.strip():
                    current_chunk_text = f"{section_header}\n\n"
                    current_section = section_header
                elif section_header != current_section and section_header not in current_chunk_text:
                    # Add new section header inline for context
                    current_chunk_text += f"\n{section_header}\n\n"

                current_chunk_text += block_text + "\n\n"
                current_content_types.add(block_type)

        # Flush remaining content (regardless of size - it's the end)
        if current_chunk_text.strip():
            chunks.append(
                self._create_chunk(
                    chunk_id=f"{source}_{chunk_counter}",
                    text=current_chunk_text.strip(),
                    source=source,
                    category=category,
                    section=current_section,
                    priority=priority,
                    content_types=current_content_types,
                )
            )

        return chunks
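
    # Example: a document with two "##" sections, the second containing a
    # markdown table, typically yields one narrative chunk per accumulated
    # run of paragraphs plus one standalone chunk for the table (section
    # header prepended for context).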

    def _split_by_headers(self, text: str) -> list[tuple[str, str]]:
        """Split document by markdown headers (## and ###).

        Returns list of (header, content) tuples.
        """
        # Match ## or ### headers
        header_pattern = r"^(#{2,3}\s+.+)$"

        lines = text.split("\n")
        sections = []
        current_header = "Introduction"
        current_content = []

        for line in lines:
            if re.match(header_pattern, line):
                # Save previous section
                if current_content:
                    sections.append((current_header, "\n".join(current_content)))
                current_header = line.strip()
                current_content = []
            else:
                current_content.append(line)

        # Save final section
        if current_content:
            sections.append((current_header, "\n".join(current_content)))

        return sections
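
    # Example: _split_by_headers("Intro text\n## Scope\nBody") returns
    # [("Introduction", "Intro text"), ("## Scope", "Body")] - content before
    # the first header is grouped under the synthetic "Introduction" header.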

    def _split_into_blocks(self, text: str) -> list[tuple[str, str]]:
        """Split section content into blocks (paragraphs, tables, lists).

        Returns list of (block_text, block_type) tuples.
        """
        # Bulleted ("- ", "* ") or numbered ("1. ", "12. ") list items
        list_item_pattern = re.compile(r"^\s*(?:[-*]|\d+\.)\s")

        blocks = []
        lines = text.split("\n")
        current_block = []
        current_type = "narrative"
        in_table = False

        for line in lines:
            stripped = line.strip()

            # Detect table start/end: a table row starts with "|" and
            # contains at least one more pipe.
            if stripped.startswith("|") and "|" in stripped[1:]:
                if not in_table:
                    # Flush current block
                    if current_block:
                        block_text = "\n".join(current_block).strip()
                        if block_text:
                            blocks.append((block_text, current_type))
                        current_block = []
                    in_table = True
                    current_type = "table"
                current_block.append(line)
            elif in_table:
                # Table ended
                block_text = "\n".join(current_block).strip()
                if block_text:
                    blocks.append((block_text, "table"))
                current_block = [line] if stripped else []
                in_table = False
                current_type = "narrative"
            elif list_item_pattern.match(line):
                # List item
                if current_type != "list" and current_block:
                    block_text = "\n".join(current_block).strip()
                    if block_text:
                        blocks.append((block_text, current_type))
                    current_block = []
                current_type = "list"
                current_block.append(line)
            elif stripped == "" and current_block:
                # Paragraph break
                if not in_table:
                    block_text = "\n".join(current_block).strip()
                    if block_text:
                        blocks.append((block_text, current_type))
                    current_block = []
                    current_type = "narrative"
            else:
                # Indented lines continue a list; anything else ends it
                if current_type == "list" and not line.startswith(" "):
                    # End of list
                    block_text = "\n".join(current_block).strip()
                    if block_text:
                        blocks.append((block_text, "list"))
                    current_block = []
                    current_type = "narrative"
                current_block.append(line)

        # Flush remaining
        if current_block:
            block_text = "\n".join(current_block).strip()
            if block_text:
                blocks.append((block_text, current_type))

        return blocks
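
    # Example: a section like "Overview text\n\n- item one\n- item two"
    # splits into [("Overview text", "narrative"),
    # ("- item one\n- item two", "list")].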

    def _create_chunk(
        self,
        chunk_id: str,
        text: str,
        source: str,
        category: str,
        section: str,
        priority: str,
        content_types: set[str],
    ) -> Chunk:
        """Create a Chunk object with extracted keywords."""
        # Determine primary content type
        if "table" in content_types:
            content_type = "table"
        elif "list" in content_types and "narrative" in content_types:
            content_type = "mixed"
        elif "list" in content_types:
            content_type = "list"
        else:
            content_type = "narrative"

        # Extract keywords from text
        keywords = self._extract_keywords(text)

        return Chunk(
            id=chunk_id,
            text=text,
            source=source,
            category=category,
            section=section,
            priority=priority,
            content_type=content_type,
            keywords=keywords,
        )

    def _extract_keywords(self, text: str) -> list[str]:
        """Extract relevant keywords from chunk text."""
        # Domain-specific keywords to look for
        domain_terms = [
            # Zone classifications
            "burn zone",
            "near-field",
            "far-field",
            # Condition levels
            "background",
            "light",
            "moderate",
            "heavy",
            "structural damage",
            # Dispositions
            "no action",
            "clean",
            "evaluate",
            "remove",
            "remove/repair",
            # Materials
            "soot",
            "char",
            "ash",
            "particulate",
            "aciniform",
            # Thresholds
            "lead",
            "cadmium",
            "arsenic",
            "metals",
            "µg/100cm²",
            "cts/cm²",
            # Facility types
            "operational",
            "non-operational",
            "public",
            "childcare",
            # Standards
            "ach",
            "nadca",
            "epa",
            "hud",
            "osha",
            # Sampling
            "sampling",
            "wipe",
            "bulk",
            "air",
            "clearance",
            # Lab methods
            "plm",
            "icp-ms",
            "xrf",
            "tapelift",
            # Actions
            "hepa",
            "vacuum",
            "deodorization",
            "encapsulation",
        ]

        text_lower = text.lower()
        found_keywords = []
        for term in domain_terms:
            if term in text_lower:
                found_keywords.append(term)

        return found_keywords[:10]  # Keep at most 10, in list order
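
    # Example: for a chunk containing "HEPA vacuum of soot", this returns
    # ["soot", "epa", "hepa", "vacuum"] - matching is substring-based (so
    # "epa" also matches inside "hepa") and results follow domain_terms order.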


def chunk_file(
    filepath: Path,
    category: Literal[
        "methodology",
        "thresholds",
        "lab-methods",
        "cleaning-procedures",
        "wildfire",
        "safety",
    ],
    priority: Literal["primary", "reference-threshold", "reference-narrative"],
) -> list[Chunk]:
    """Convenience function to chunk a markdown file.

    Args:
        filepath: Path to markdown file
        category: Document category
        priority: Document priority level

    Returns:
        List of Chunk objects
    """
    chunker = SemanticChunker()
    text = filepath.read_text(encoding="utf-8")
    return chunker.chunk_document(
        text=text,
        source=filepath.name,
        category=category,
        priority=priority,
    )
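

# Minimal usage sketch (the path below is a placeholder, not a file shipped
# with this module):
if __name__ == "__main__":
    chunks = chunk_file(
        Path("docs/fdam_methodology.md"),  # placeholder path
        category="methodology",
        priority="primary",
    )
    for chunk in chunks:
        print(chunk.id, chunk.to_metadata()["content_type"], len(chunk.text))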