# SmokeScan/rag/chunker.py
"""Semantic chunker with table preservation for FDAM knowledge base.
Chunking rules:
- Keep markdown tables intact (never split)
- Preserve headers with content for context
- Target 400-600 tokens per chunk
- Include metadata (source, category, section, priority)
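
Example (hypothetical source file):

    chunks = chunk_file(
        Path("knowledge/thresholds.md"),
        category="thresholds",
        priority="primary",
    )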
"""
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal

# Shared Literal aliases so the allowed values are declared in one place
Category = Literal[
    "methodology",
    "thresholds",
    "lab-methods",
    "cleaning-procedures",
    "wildfire",
    "safety",
]
Priority = Literal["primary", "reference-threshold", "reference-narrative"]
ContentType = Literal["narrative", "table", "list", "mixed"]

@dataclass
class Chunk:
"""A chunk of text with metadata for RAG indexing."""
id: str
text: str
    source: str  # Source filename
    category: Category
    section: str  # Section header path (e.g., "4.1 Zone Classification")
    priority: Priority
    content_type: ContentType
keywords: list[str] = field(default_factory=list)
def to_metadata(self) -> dict:
"""Convert to metadata dict for ChromaDB."""
return {
"source": self.source,
"category": self.category,
"section": self.section,
"priority": self.priority,
"content_type": self.content_type,
"keywords": ",".join(self.keywords),
}
class SemanticChunker:
"""Chunks markdown documents while preserving tables and semantic structure."""
    # Approximate characters per token (conservative estimate)
CHARS_PER_TOKEN = 4
TARGET_MIN_TOKENS = 400
TARGET_MAX_TOKENS = 600
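
    # At ~4 chars/token, the token targets translate to roughly
    # 1600-2400 characters per chunk (computed in __init__)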
def __init__(self):
self.target_min_chars = self.TARGET_MIN_TOKENS * self.CHARS_PER_TOKEN
self.target_max_chars = self.TARGET_MAX_TOKENS * self.CHARS_PER_TOKEN
def chunk_document(
self,
text: str,
source: str,
        category: Category,
        priority: Priority,
) -> list[Chunk]:
"""Chunk a markdown document into semantic units.
Args:
text: Full document text (markdown format)
source: Source filename
category: Document category
priority: Document priority level
Returns:
List of Chunk objects ready for indexing
"""
# Split into sections by headers
sections = self._split_by_headers(text)
chunks = []
chunk_counter = 0
# Accumulator that persists across sections
current_chunk_text = ""
current_content_types: set[str] = set()
current_section = "Introduction" # Track primary section for metadata
for section_header, section_content in sections:
# Split section into blocks (paragraphs, tables, lists)
blocks = self._split_into_blocks(section_content)
for block_text, block_type in blocks:
block_len = len(block_text)
# Tables are never split - flush current and add table as own chunk
if block_type == "table":
# Flush current chunk if it meets minimum size
if current_chunk_text.strip() and len(current_chunk_text) >= self.target_min_chars:
chunks.append(
self._create_chunk(
chunk_id=f"{source}_{chunk_counter}",
text=current_chunk_text.strip(),
source=source,
category=category,
section=current_section,
priority=priority,
content_types=current_content_types,
)
)
chunk_counter += 1
current_chunk_text = ""
current_content_types = set()
current_section = section_header
                    # A small accumulation (below the minimum) falls through
                    # and is folded into the table chunk for context. Tables
                    # always get their own standalone chunk.
                    if current_chunk_text.strip() and len(current_chunk_text) < self.target_min_chars:
                        prefix = current_chunk_text.strip()
                        if section_header not in prefix:
                            prefix = f"{prefix}\n\n{section_header}"
                        table_text = f"{prefix}\n\n{block_text.strip()}"
                        current_chunk_text = ""
                        current_content_types = set()
                    else:
                        table_text = f"{section_header}\n\n{block_text.strip()}"
chunks.append(
self._create_chunk(
chunk_id=f"{source}_{chunk_counter}",
text=table_text,
source=source,
category=category,
section=section_header,
priority=priority,
content_types={"table"},
)
)
chunk_counter += 1
current_section = section_header
continue
                # Would adding this block (plus a possible inline header and
                # the "\n\n" separators) push the chunk past the target max?
                potential_len = len(current_chunk_text) + block_len + len(section_header) + 4
if potential_len > self.target_max_chars and len(current_chunk_text) >= self.target_min_chars:
# Flush current chunk - it's large enough
chunks.append(
self._create_chunk(
chunk_id=f"{source}_{chunk_counter}",
text=current_chunk_text.strip(),
source=source,
category=category,
section=current_section,
priority=priority,
content_types=current_content_types,
)
)
chunk_counter += 1
# Start new chunk with section header
current_chunk_text = f"{section_header}\n\n"
current_content_types = set()
current_section = section_header
# Add section header if starting fresh or new section
if not current_chunk_text.strip():
current_chunk_text = f"{section_header}\n\n"
current_section = section_header
elif section_header != current_section and section_header not in current_chunk_text:
# Add new section header inline for context
current_chunk_text += f"\n{section_header}\n\n"
current_chunk_text += block_text + "\n\n"
current_content_types.add(block_type)
# Flush remaining content (regardless of size - it's the end)
if current_chunk_text.strip():
chunks.append(
self._create_chunk(
chunk_id=f"{source}_{chunk_counter}",
text=current_chunk_text.strip(),
source=source,
category=category,
section=current_section,
priority=priority,
content_types=current_content_types,
)
)
return chunks
def _split_by_headers(self, text: str) -> list[tuple[str, str]]:
"""Split document by markdown headers (## and ###).
Returns list of (header, content) tuples.
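        Example:
            >>> SemanticChunker()._split_by_headers("## Zones\\nBurn zone rules.")
            [('## Zones', 'Burn zone rules.')]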
"""
# Match ## or ### headers
header_pattern = r"^(#{2,3}\s+.+)$"
lines = text.split("\n")
sections = []
current_header = "Introduction"
current_content = []
for line in lines:
if re.match(header_pattern, line):
# Save previous section
if current_content:
sections.append((current_header, "\n".join(current_content)))
current_header = line.strip()
current_content = []
else:
current_content.append(line)
# Save final section
if current_content:
sections.append((current_header, "\n".join(current_content)))
return sections
def _split_into_blocks(self, text: str) -> list[tuple[str, str]]:
"""Split section content into blocks (paragraphs, tables, lists).
Returns list of (block_text, block_type) tuples.
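        Example:
            >>> SemanticChunker()._split_into_blocks("Intro text.\\n\\n- item one\\n- item two")
            [('Intro text.', 'narrative'), ('- item one\\n- item two', 'list')]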
"""
blocks = []
lines = text.split("\n")
current_block = []
current_type = "narrative"
in_table = False
        for line in lines:
            stripped = line.strip()
            # Detect table rows: start with "|" and contain at least one
            # more "|" (a lone "|" is not treated as a table)
            if stripped.startswith("|") and stripped.count("|") >= 2:
if not in_table:
# Flush current block
if current_block:
block_text = "\n".join(current_block).strip()
if block_text:
blocks.append((block_text, current_type))
current_block = []
in_table = True
current_type = "table"
current_block.append(line)
elif in_table:
# Table ended
block_text = "\n".join(current_block).strip()
if block_text:
blocks.append((block_text, "table"))
current_block = [line] if line.strip() else []
in_table = False
current_type = "narrative"
            elif re.match(r"^(?:[-*]|\d+\.)\s", line.strip()):
                # List item (bulleted or numbered, any index)
if current_type != "list" and current_block:
block_text = "\n".join(current_block).strip()
if block_text:
blocks.append((block_text, current_type))
current_block = []
current_type = "list"
current_block.append(line)
elif line.strip() == "" and current_block:
# Paragraph break
if not in_table:
block_text = "\n".join(current_block).strip()
if block_text:
blocks.append((block_text, current_type))
current_block = []
current_type = "narrative"
else:
if current_type == "list" and not line.strip().startswith(
("- ", "* ", " ")
):
# End of list
block_text = "\n".join(current_block).strip()
if block_text:
blocks.append((block_text, "list"))
current_block = []
current_type = "narrative"
current_block.append(line)
# Flush remaining
if current_block:
block_text = "\n".join(current_block).strip()
if block_text:
blocks.append((block_text, current_type))
return blocks
def _create_chunk(
self,
chunk_id: str,
text: str,
source: str,
        category: Category,
        section: str,
        priority: Priority,
content_types: set[str],
) -> Chunk:
"""Create a Chunk object with extracted keywords."""
# Determine primary content type
if "table" in content_types:
content_type = "table"
elif "list" in content_types and "narrative" in content_types:
content_type = "mixed"
elif "list" in content_types:
content_type = "list"
else:
content_type = "narrative"
# Extract keywords from text
keywords = self._extract_keywords(text)
return Chunk(
id=chunk_id,
text=text,
source=source,
category=category,
section=section,
priority=priority,
content_type=content_type,
keywords=keywords,
)
def _extract_keywords(self, text: str) -> list[str]:
"""Extract relevant keywords from chunk text."""
# Domain-specific keywords to look for
domain_terms = [
# Zone classifications
"burn zone",
"near-field",
"far-field",
# Condition levels
"background",
"light",
"moderate",
"heavy",
"structural damage",
# Dispositions
"no action",
"clean",
"evaluate",
"remove",
"remove/repair",
# Materials
"soot",
"char",
"ash",
"particulate",
"aciniform",
# Thresholds
"lead",
"cadmium",
"arsenic",
"metals",
"µg/100cm²",
"cts/cm²",
# Facility types
"operational",
"non-operational",
"public",
"childcare",
# Standards
"ach",
"nadca",
"epa",
"hud",
"osha",
# Sampling
"sampling",
"wipe",
"bulk",
"air",
"clearance",
# Lab methods
"plm",
"icp-ms",
"xrf",
"tapelift",
# Actions
"hepa",
"vacuum",
"deodorization",
"encapsulation",
]
text_lower = text.lower()
found_keywords = []
for term in domain_terms:
if term in text_lower:
found_keywords.append(term)
        return found_keywords[:10]  # Cap at the first 10 matches (document order)
def chunk_file(
filepath: Path,
    category: Category,
    priority: Priority,
) -> list[Chunk]:
"""Convenience function to chunk a markdown file.
Args:
filepath: Path to markdown file
category: Document category
priority: Document priority level
Returns:
List of Chunk objects
"""
chunker = SemanticChunker()
text = filepath.read_text(encoding="utf-8")
return chunker.chunk_document(
text=text,
source=filepath.name,
category=category,
priority=priority,
)
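

if __name__ == "__main__":
    # Minimal smoke test on an inline sample; the markdown below is
    # illustrative, not a real FDAM source document.
    sample = (
        "## 4.1 Zone Classification\n\n"
        "Burn zone properties require full assessment.\n\n"
        "| Zone | Default Disposition |\n"
        "|------|---------------------|\n"
        "| Burn | Remove |\n"
    )
    chunker = SemanticChunker()
    for chunk in chunker.chunk_document(
        text=sample,
        source="sample.md",
        category="methodology",
        priority="primary",
    ):
        print(chunk.id, chunk.content_type, chunk.to_metadata()["keywords"])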