sadickam's picture
feat: enhance chunking and extraction processes with debug artifact handling and function name corrections
3a4f3dc
"""Pydantic models for structure-aware document chunking.
This module defines the data models used throughout the chunking pipeline.
These models provide type-safe, validated representations of document chunks
and chunking configuration parameters.
Models:
- Chunk: Represents a single chunk of document content with metadata
- ChunkingConfig: Configuration parameters for the chunking process
- TextNormalizer: Utility class for normalizing extracted text
Design Principles:
- All models use Pydantic v2 for validation and serialization
- Comprehensive validation ensures data integrity
- Text normalization handles common PDF extraction artifacts
- JSON/JSONL serialization is optimized for storage and transfer
Lazy Loading:
Pydantic is a lightweight dependency that loads quickly. No lazy loading
is needed for this module.
Example:
-------
>>> from rag_chatbot.chunking.models import Chunk, ChunkingConfig
>>> config = ChunkingConfig(min_tokens=450, max_tokens=700)
>>> chunk = Chunk(
... chunk_id="doc1_chunk_001",
... text="The PMV model predicts thermal sensation...",
... heading_path=["H1: Thermal Comfort", "H2: PMV Model"],
... source="ashrae_55.pdf",
... page=5,
... start_char=1024,
... end_char=2048,
... token_count=156,
... )
>>> print(chunk.chunk_hash) # Auto-generated
'a3f2b1c4d5e6f789'
"""
from __future__ import annotations
import hashlib
import re
from typing import TYPE_CHECKING, Any
from pydantic import (
BaseModel,
ConfigDict,
Field,
field_validator,
model_validator,
)
# =============================================================================
# Type Checking Imports
# =============================================================================
# These imports are only processed by type checkers (mypy, pyright) and IDEs.
# They enable proper type hints without runtime overhead.
# =============================================================================
if TYPE_CHECKING:
from types import ModuleType
from typing import Self
# =============================================================================
# Module Exports
# =============================================================================
# Names exported by ``from <this module> import *``; everything else in the
# module (underscore-prefixed constants and helpers) is treated as private.
__all__: list[str] = [
    "Chunk",
    "ChunkingConfig",
    "TextNormalizer",
    "THERMAL_COMFORT_TERMS",
]
# =============================================================================
# Constants
# =============================================================================
# Domain-specific vocabulary for thermal comfort terminology.
# This dictionary maps common variations and OCR errors to their correct forms.
# Used by TextNormalizer to fix jumbled or incorrectly extracted terms.
# The dictionary includes:
# - Standard thermal comfort indices (PMV, PPD, SET, etc.)
# - ASHRAE terminology and standards references
# - Physical units and measurement terms (clo, met, etc.)
# - Common thermal comfort parameters
# - pythermalcomfort library function names
# Mapping of extraction variants (lowercase keys) to their canonical spelling.
# TextNormalizer compiles every key into a case-insensitive, whole-word regex
# and applies the longest keys first, so multi-word variants win over their
# shorter prefixes.
THERMAL_COMFORT_TERMS: dict[str, str] = {
    # Thermal comfort indices and models
    "p m v": "PMV",
    "pmv": "PMV",
    "p p d": "PPD",
    "ppd": "PPD",
    "p m v - p p d": "PMV-PPD",
    "pmv-ppd": "PMV-PPD",
    "pmv ppd": "PMV-PPD",
    "s e t": "SET",
    "set*": "SET*",
    "s e t *": "SET*",
    "u t c i": "UTCI",
    "utci": "UTCI",
    "p h s": "PHS",
    "phs": "PHS",
    "a t h b": "ATHB",
    "athb": "ATHB",
    "adaptive": "adaptive",
    "a d a p t i v e": "adaptive",
    # Standards and organizations
    "a s h r a e": "ASHRAE",
    "ashrae": "ASHRAE",
    "ashrae 55": "ASHRAE 55",
    "ashrae-55": "ASHRAE 55",
    "a s h r a e 55": "ASHRAE 55",
    "i s o": "ISO",
    "iso": "ISO",
    "i s o 7730": "ISO 7730",
    "iso 7730": "ISO 7730",
    "iso-7730": "ISO 7730",
    "i s o 7243": "ISO 7243",
    "iso 7243": "ISO 7243",
    "e n 15251": "EN 15251",
    "en 15251": "EN 15251",
    "en-15251": "EN 15251",
    "e n 16798": "EN 16798",
    "en 16798": "EN 16798",
    "en-16798": "EN 16798",
    # Physical units and measurements
    "c l o": "clo",
    "m e t": "met",
    "w / m 2": "W/m2",
    "w/m2": "W/m2",
    "w / m^2": "W/m2",
    "w/m^2": "W/m2",
    "m / s": "m/s",
    "k p a": "kPa",
    "kpa": "kPa",
    "p a": "Pa",
    "deg c": "degC",
    "deg f": "degF",
    "deg k": "degK",
    # Thermal comfort parameters
    "t a": "ta",  # air temperature
    "t r": "tr",  # radiant temperature
    "t o": "to",  # operative temperature
    "t o p": "top",  # operative temperature
    "t m r t": "tmrt",  # mean radiant temperature
    "tmrt": "tmrt",
    "t mrt": "tmrt",
    "m r t": "MRT",  # mean radiant temperature
    "mrt": "MRT",
    "r h": "RH",  # relative humidity
    "v e l": "vel",  # velocity
    "v a": "va",  # air velocity
    "v r": "vr",  # relative air velocity
    # pythermalcomfort specific terms
    "pythermalcomfort": "pythermalcomfort",
    "p y t h e r m a l c o m f o r t": "pythermalcomfort",
    "thermal comfort": "thermal comfort",
    "ther mal com fort": "thermal comfort",
    "ther mal": "thermal",
    "com fort": "comfort",
    # Common function names from the library (preserving underscores)
    "pmv_ppd": "pmv_ppd",
    "adaptive_ashrae": "adaptive_ashrae",
    "adaptive_en": "adaptive_en",
    "clo_dynamic": "clo_dynamic",
    "cooling_effect": "cooling_effect",
    "set_tmp": "set_tmp",
    "solar_gain": "solar_gain",
    "use_fans_heatwaves": "use_fans_heatwaves",
    "wbgt": "WBGT",
    "w b g t": "WBGT",
    "heat_index": "heat_index",
    "humidex": "humidex",
    # NOTE(review): keys are matched case-insensitively as whole words, so
    # "net" and "at" also rewrite the ordinary English words ("look at the
    # net gain" -> "look AT the NET gain"). Confirm this is intended.
    "net": "NET",
    "n e t": "NET",
    "at": "AT",  # apparent temperature
    "a t": "AT",
    "wind_chill": "wind_chill",
    "phs_model": "phs_model",
    "two_nodes": "two_nodes",
    "solar_altitude": "solar_altitude",
    "mean_radiant_temperature": "mean_radiant_temperature",
    # =========================================================================
    # pythermalcomfort function name corrections
    # Maps concatenated versions (without underscores) to correct snake_case
    # This handles cases where PDF extraction strips underscores from names.
    # =========================================================================
    # Models - PMV/PPD variants
    "pmvppdashrae": "pmv_ppd_ashrae",
    "pmvppdiso": "pmv_ppd_iso",
    "pmvathb": "pmv_athb",
    "pmva": "pmv_a",
    "pmve": "pmv_e",
    # Models - Adaptive comfort
    "adaptiveashrae": "adaptive_ashrae",
    "adaptiveen": "adaptive_en",
    # Models - Two-node models
    "twonodesgagge": "two_nodes_gagge",
    "twonodesgaggesleep": "two_nodes_gagge_sleep",
    "twonodesgaggeji": "two_nodes_gagge_ji",
    # Models - Heat indices
    "heatindexlu": "heat_index_lu",
    "heatindexrothfusz": "heat_index_rothfusz",
    "discomfortindex": "discomfort_index",
    # Models - Other thermal indices
    "petsteady": "pet_steady",
    "settmp": "set_tmp",
    "coolingeffect": "cooling_effect",
    "solargain": "solar_gain",
    "usefansheatwaves": "use_fans_heatwaves",
    "verticaltmpgradppd": "vertical_tmp_grad_ppd",
    "ankledraft": "ankle_draft",
    "clotout": "clo_tout",
    # Models - Work capacity
    "workcapacitydunne": "work_capacity_dunne",
    "workcapacityhothaps": "work_capacity_hothaps",
    "workcapacityiso": "work_capacity_iso",
    "workcapacityniosh": "work_capacity_niosh",
    # Models - Wind chill
    "windchilltemperature": "wind_chill_temperature",
    # Utilities - Temperature and psychrometrics
    "runningmeanoutdoortemperature": "running_mean_outdoor_temperature",
    "meanradianttmp": "mean_radiant_tmp",
    "operativetmp": "operative_tmp",
    "dewpointtmp": "dew_point_tmp",
    "wetbulbtmp": "wet_bulb_tmp",
    "enthalpyair": "enthalpy_air",
    "bodysurfacearea": "body_surface_area",
    "psytarh": "psy_ta_rh",
    "vrelative": "v_relative",
    "unitsconverter": "units_converter",
    # Utilities - Clothing functions
    "clodynamicashrae": "clo_dynamic_ashrae",
    "clodynamiciso": "clo_dynamic_iso",
    "cloinsulationairlayer": "clo_insulation_air_layer",
    "cloareafactor": "clo_area_factor",
    "clocorrectionfactorenvironment": "clo_correction_factor_environment",
    # Correct concatenation of clo_intrinsic_insulation_ensemble.
    "clointrinsicinsulationensemble": "clo_intrinsic_insulation_ensemble",
    # Historical misspelled key (missing "n"); kept so existing behaviour and
    # any caller that looks it up keep working.
    "clointrinsicinsulatioensemble": "clo_intrinsic_insulation_ensemble",
    "clototalinsulation": "clo_total_insulation",
    "clotypicalensembles": "clo_typical_ensembles",
    "cloindividualgarments": "clo_individual_garments",
    "mettypicaltasks": "met_typical_tasks",
}
# Regex pattern for detecting ALL CAPS words (3+ consecutive capital letters
# forming a whole word). Intended for heading normalization; it is not
# referenced in the portion of the module shown alongside this definition.
_ALL_CAPS_PATTERN: re.Pattern[str] = re.compile(r"\b([A-Z]{3,})\b")
# Regex pattern for detecting mid-word spaces (common OCR artifact).
# Matches a stand-alone single letter plus the whitespace after it whenever
# the next token is also a stand-alone single letter ("t h e r m a l").
# Not referenced in the portion of the module shown alongside this definition.
_JUMBLED_WORD_PATTERN: re.Pattern[str] = re.compile(r"\b([a-zA-Z])\s+(?=[a-zA-Z]\b)")
# Regex pattern for fixing missing spaces after punctuation.
# Matches any of . ! ? , ; : followed immediately by an ASCII letter and
# captures both, so a space can be inserted between the two groups.
# NOTE(review): this also matches inside dotted identifiers, file names and
# abbreviations ("pkg.module", "report.pdf", "e.g."), so callers inserting a
# space will split those as well.
_MISSING_SPACE_PATTERN: re.Pattern[str] = re.compile(r"([.!?,;:])([A-Za-z])")
# Regex pattern for runs of spaces and tabs (newlines are deliberately
# excluded so that line structure survives whitespace collapsing).
_MULTI_WHITESPACE_PATTERN: re.Pattern[str] = re.compile(r"[ \t]+")
# Regex pattern for three or more consecutive newlines (i.e. more than one
# blank line in a row).
_MULTI_NEWLINE_PATTERN: re.Pattern[str] = re.compile(r"\n{3,}")
# Threshold for considering text as "predominantly uppercase" (80%).
# Used in capitalization normalization: non-heading text whose share of
# uppercase letters is below this value is left untouched.
_UPPERCASE_THRESHOLD: float = 0.8
# Minimum length for an acronym (e.g., "PMV" has length 3, "AT" has length 2)
_MIN_ACRONYM_LENGTH: int = 2
# Minimum length of a word to consider for segmentation.
# Words shorter than this are unlikely to be concatenated.
# Set to 12 to catch common OCR errors like "conditionsthat" (14 chars).
# Words of exactly this length are still considered (the check is "<").
_MIN_SEGMENT_WORD_LENGTH: int = 12
# Maximum length of a single word in English (reasonable limit).
# Words longer than this are almost certainly concatenated.
# Not referenced in the portion of the module shown alongside this definition.
_MAX_SINGLE_WORD_LENGTH: int = 25
# Minimum length for a valid word segment.
# Segments shorter than this are likely errors (e.g., single letters).
_MIN_VALID_SEGMENT_LENGTH: int = 2
# Regex pattern to match HTML comments (e.g., <!-- Page 4 -->).
# These are added by ExtractedDocument.to_markdown() to mark page boundaries
# but should be stripped before creating chunks for embedding.
# Non-greedy so each comment is matched separately; DOTALL lets a comment
# span several lines.
_HTML_COMMENT_PATTERN: re.Pattern[str] = re.compile(r"<!--.*?-->", re.DOTALL)
# Technical terms that should NOT be segmented
# These are valid compound words or domain-specific terms
#
# IMPORTANT: This list includes pythermalcomfort function names in their
# concatenated form (without underscores) because PDF extraction sometimes
# strips underscores. When a word like "pmvppdashrae" is encountered, it
# should NOT be segmented into "pmv ppd ashrae" - instead, it should be
# preserved so that downstream processing or the LLM can recognise it as
# a function name variant.
#
# The function names are extracted from:
# pythermalcomfort-readthedocs-io-en-latest.pdf (official documentation)
_PROTECTED_TERMS: frozenset[str] = frozenset(
{
# General technical terms
"pythermalcomfort",
"thermalcomfort",
"metabolicrate",
"ashrae",
"coefficient",
"coefficients",
"environmental",
"physiological",
"temperature",
"temperatures",
# =====================================================================
# pythermalcomfort.models function names (concatenated, lowercase)
# These protect against incorrect segmentation of function names
# when underscores are stripped during PDF extraction.
# =====================================================================
"adaptiveashrae", # adaptive_ashrae
"adaptiveen", # adaptive_en
"ankledraft", # ankle_draft
"clotout", # clo_tout
"coolingeffect", # cooling_effect
"discomfortindex", # discomfort_index
"twonodesgagge", # two_nodes_gagge
"twonodesgaggesleep", # two_nodes_gagge_sleep
"twonodesgaggeji", # two_nodes_gagge_ji
"heatindexlu", # heat_index_lu
"heatindexrothfusz", # heat_index_rothfusz
"petsteady", # pet_steady
"pmvppdiso", # pmv_ppd_iso
"pmvppdashrae", # pmv_ppd_ashrae
"pmvathb", # pmv_athb
"solargain", # solar_gain
"settmp", # set_tmp
"usefansheatwaves", # use_fans_heatwaves
"verticaltmpgradppd", # vertical_tmp_grad_ppd
"windchilltemperature", # wind_chill_temperature
"workcapacitydunne", # work_capacity_dunne
"workcapacityhothaps", # work_capacity_hothaps
"workcapacityiso", # work_capacity_iso
"workcapacityniosh", # work_capacity_niosh
# =====================================================================
# pythermalcomfort.utilities function names (concatenated, lowercase)
# =====================================================================
"runningmeanoutdoortemperature", # running_mean_outdoor_temperature
"vrelative", # v_relative
"clodynamicashrae", # clo_dynamic_ashrae
"clodynamiciso", # clo_dynamic_iso
"bodysurfacearea", # body_surface_area
"dewpointtmp", # dew_point_tmp
"enthalpyair", # enthalpy_air
"meanradianttmp", # mean_radiant_tmp
"operativetmp", # operative_tmp
"psytarh", # psy_ta_rh
"psat", # p_sat
"fsvv", # f_svv
"unitsconverter", # units_converter
"wetbulbtmp", # wet_bulb_tmp
"cloinsulationairlayer", # clo_insulation_air_layer
"cloareafactor", # clo_area_factor
"clocorrectionfactorenvironment", # clo_correction_factor_environment
"clointrinsicinsulatioensemble", # clo_intrinsic_insulation_ensemble
"clototalinsulation", # clo_total_insulation
"clotypicalensembles", # clo_typical_ensembles
"cloindividualgarments", # clo_individual_garments
"mettypicaltasks", # met_typical_tasks
}
)
# =============================================================================
# Lazy Loading for wordsegment
# =============================================================================
# The wordsegment library is used for detecting and segmenting concatenated
# words from OCR/PDF extraction errors. It is lazily loaded to avoid import
# overhead when not needed.
# =============================================================================
# Lazy-loaded wordsegment module
# Module-level cache for the lazily imported ``wordsegment`` package.
# ``_wordsegment_loaded`` flips to True once ``wordsegment.load()`` has run.
_wordsegment_module: ModuleType | None = None
_wordsegment_loaded: bool = False


def _get_wordsegment() -> ModuleType:
    """Return the ``wordsegment`` module, importing and loading it on first use.

    The first call imports ``wordsegment``, runs ``wordsegment.load()`` (which
    must happen once before ``segment()`` can be used) and stores the module
    in the module-level cache. Later calls return the cached module without
    importing or loading again.

    Returns:
    -------
        The initialized wordsegment module.

    """
    global _wordsegment_module, _wordsegment_loaded  # noqa: PLW0603

    if not _wordsegment_loaded:
        # Imported here rather than at module top so the dependency is only
        # paid for when segmentation is actually requested.
        import wordsegment as ws_lib

        ws_lib.load()  # Load word frequency data
        _wordsegment_module, _wordsegment_loaded = ws_lib, True

    cached = _wordsegment_module
    # The branch above guarantees the cache is populated; this assertion only
    # narrows the Optional type for static checkers.
    assert cached is not None
    return cached
# =============================================================================
# Text Normalization
# =============================================================================
class TextNormalizer:
"""Utility class for normalizing extracted text from PDF documents.
PDF extraction often introduces artifacts such as:
- Extra whitespace from column layouts
- Mid-word spaces from OCR errors (e.g., "ther mal" instead of "thermal")
- ALL CAPS headings that should be title case
- Missing spaces after punctuation
This class provides methods to fix these common issues while preserving
the semantic content of the text.
The normalizer uses a domain-specific dictionary of thermal comfort terms
to correctly handle specialized vocabulary that might be incorrectly
extracted by OCR or PDF parsing.
Attributes:
----------
domain_terms : dict[str, str]
Dictionary mapping incorrect/jumbled terms to their correct forms.
Defaults to THERMAL_COMFORT_TERMS if not provided.
Example:
-------
>>> normalizer = TextNormalizer()
>>> text = "The ther mal com fort index"
>>> normalizer.normalize(text)
'The thermal comfort index'
Note:
----
Normalization is applied in a specific order to avoid conflicts:
1. Whitespace normalization (basic cleanup)
2. Jumbled word fixing (domain-specific)
3. Capitalization fixing (headings only)
4. Sentence spacing (punctuation)
"""
def __init__(
self,
domain_terms: dict[str, str] | None = None,
) -> None:
"""Initialize the text normalizer with optional domain terms.
Args:
----
domain_terms: Optional dictionary mapping incorrect terms to their
correct forms. If not provided, uses THERMAL_COMFORT_TERMS.
Example:
-------
>>> normalizer = TextNormalizer()
>>> custom_normalizer = TextNormalizer({"cust om": "custom"})
"""
# Use provided domain terms or fall back to thermal comfort vocabulary
self.domain_terms: dict[str, str] = (
domain_terms if domain_terms is not None else THERMAL_COMFORT_TERMS
)
# Pre-compile patterns for domain terms for efficient matching
# Sort by length (longest first) to match longer phrases before shorter ones
self._term_patterns: list[tuple[re.Pattern[str], str]] = []
for incorrect, correct in sorted(
self.domain_terms.items(),
key=lambda x: len(x[0]),
reverse=True,
):
# Use word boundaries to avoid partial matches
# Case-insensitive matching for flexibility
pattern = re.compile(
r"\b" + re.escape(incorrect) + r"\b",
re.IGNORECASE,
)
self._term_patterns.append((pattern, correct))
def normalize_whitespace(self, text: str) -> str:
r"""Normalize whitespace in the text.
This method:
- Replaces multiple spaces/tabs with a single space
- Normalizes line endings to Unix-style (LF)
- Reduces multiple blank lines to at most two
- Strips leading/trailing whitespace
Args:
----
text: The text to normalize.
Returns:
-------
Text with normalized whitespace.
Example:
-------
>>> normalizer = TextNormalizer()
>>> text = "Hello world\r\n\r\n\r\nTest"
>>> normalizer.normalize_whitespace(text)
'Hello world\n\nTest'
"""
# Return early for empty strings
if not text:
return text
# Normalize line endings: CRLF -> LF, CR -> LF
result = text.replace("\r\n", "\n").replace("\r", "\n")
# Replace multiple spaces/tabs with single space (preserve newlines)
result = _MULTI_WHITESPACE_PATTERN.sub(" ", result)
# Reduce multiple newlines (3+) to double newlines (paragraph break)
result = _MULTI_NEWLINE_PATTERN.sub("\n\n", result)
# Strip leading/trailing whitespace
return result.strip()
def normalize_jumbled_words(self, text: str) -> str:
"""Fix mid-word spaces caused by OCR or PDF extraction.
This method uses the domain dictionary to fix common jumbled words
in thermal comfort terminology. It handles cases like:
- "ther mal" -> "thermal"
- "p m v" -> "PMV"
- "pythermalcomfort" (various broken forms)
Args:
----
text: The text containing potentially jumbled words.
Returns:
-------
Text with jumbled words fixed according to the domain dictionary.
Example:
-------
>>> normalizer = TextNormalizer()
>>> text = "Calculate p m v using pythermalcomfort"
>>> normalizer.normalize_jumbled_words(text)
'Calculate PMV using pythermalcomfort'
Note:
----
This method only fixes terms that are in the domain dictionary.
Unknown jumbled words are left unchanged to avoid false corrections.
"""
if not text:
return text
result = text
# Apply domain-specific corrections using pre-compiled patterns
for pattern, replacement in self._term_patterns:
result = pattern.sub(replacement, result)
return result
def normalize_capitalization(self, text: str, is_heading: bool = False) -> str:
"""Normalize capitalization, especially for ALL CAPS text.
For headings, this method converts ALL CAPS text to Title Case
while preserving known acronyms (PMV, ASHRAE, etc.) from the
domain dictionary.
For regular text, only obvious ALL CAPS sentences are converted.
Args:
----
text: The text to normalize.
is_heading: If True, apply more aggressive title case conversion.
Returns:
-------
Text with normalized capitalization.
Example:
-------
>>> normalizer = TextNormalizer()
>>> normalizer.normalize_capitalization("THERMAL COMFORT", is_heading=True)
'Thermal Comfort'
>>> normalizer.normalize_capitalization("PMV INDEX", is_heading=True)
'PMV Index'
Note:
----
Acronyms from the domain dictionary are preserved in their
correct form (e.g., "PMV" stays as "PMV", not "Pmv").
"""
if not text:
return text
# For non-headings, only process if the entire text is uppercase
if not is_heading:
# Check if text is predominantly uppercase (>80% capital letters)
alpha_chars = [c for c in text if c.isalpha()]
if not alpha_chars:
return text
upper_ratio = sum(1 for c in alpha_chars if c.isupper()) / len(alpha_chars)
if upper_ratio < _UPPERCASE_THRESHOLD:
return text
# Convert to title case
result = text.title()
# Restore known acronyms from domain dictionary
# These should remain in their original uppercase form
acronyms = {
v
for v in self.domain_terms.values()
if v.isupper() and len(v) >= _MIN_ACRONYM_LENGTH
}
for acronym in acronyms:
# Replace title-cased version with correct acronym
title_version = acronym.title()
if title_version in result:
result = result.replace(title_version, acronym)
return result
def normalize_sentences(self, text: str) -> str:
"""Fix missing spaces after punctuation marks.
PDF extraction sometimes loses spaces after periods, commas,
and other punctuation marks. This method adds them back.
Args:
----
text: The text with potential missing spaces.
Returns:
-------
Text with proper spacing after punctuation.
Example:
-------
>>> normalizer = TextNormalizer()
>>> normalizer.normalize_sentences("First sentence.Second sentence")
'First sentence. Second sentence'
Note:
----
This only adds spaces after punctuation when followed directly
by a letter. It won't affect abbreviations like "Dr." when
properly spaced.
"""
if not text:
return text
# Add space after punctuation marks when followed by a letter
result = _MISSING_SPACE_PATTERN.sub(r"\1 \2", text)
return result
def segment_concatenated_words(self, text: str) -> str:
"""Segment concatenated words that are missing spaces.
Uses the wordsegment library to detect and split words that
appear to be concatenated due to OCR or PDF extraction errors.
Examples:
--------
- "conditionsthat" -> "conditions that"
- "theenvironment" -> "the environment"
Args:
----
text: The text containing potential concatenated words.
Returns:
-------
Text with concatenated words segmented.
Note:
----
- Only processes words longer than _MIN_SEGMENT_WORD_LENGTH
- Preserves protected terms from _PROTECTED_TERMS
- Preserves words in the domain dictionary
"""
if not text:
return text
ws = _get_wordsegment()
# Split into words while preserving punctuation and whitespace
words = text.split()
result_words: list[str] = []
# Define punctuation characters to strip (avoid escaping issues)
punct_chars = ".,;:!?()[]{}\"'"
for word in words:
# Strip punctuation for checking, but preserve it
stripped = word.strip(punct_chars)
punct_start = word[: len(word) - len(word.lstrip(punct_chars))]
punct_end = word[len(word.rstrip(punct_chars)) :]
# Skip short words
if len(stripped) < _MIN_SEGMENT_WORD_LENGTH:
result_words.append(word)
continue
# Skip words containing underscores - these are Python identifiers
# (e.g., pmv_ppd_ashrae, clo_dynamic_iso) that should be preserved
# exactly as-is. Underscores in function names are intentional and
# segmenting them would corrupt the identifier.
if "_" in stripped:
result_words.append(word)
continue
# Skip protected terms (case-insensitive)
if stripped.lower() in _PROTECTED_TERMS:
result_words.append(word)
continue
# Skip words in domain dictionary (case-insensitive)
if stripped.lower() in {k.lower() for k in self.domain_terms}:
result_words.append(word)
continue
if stripped.lower() in {v.lower() for v in self.domain_terms.values()}:
result_words.append(word)
continue
# Skip words that look like they're already properly spaced
# (contain uppercase in the middle, suggesting camelCase/acronyms)
if any(c.isupper() for c in stripped[1:-1] if c.isalpha()):
result_words.append(word)
continue
# Segment the word using wordsegment library
segments: list[str] = ws.segment(stripped.lower())
# Only accept segmentation if it produces multiple reasonable words
if len(segments) > 1 and all(
len(s) >= _MIN_VALID_SEGMENT_LENGTH for s in segments
):
# Preserve original capitalization of first letter if uppercase
if stripped[0].isupper():
segments[0] = segments[0].capitalize()
segmented = " ".join(segments)
result_words.append(punct_start + segmented + punct_end)
else:
result_words.append(word)
return " ".join(result_words)
def strip_html_comments(self, text: str) -> str:
"""Remove HTML comments from text.
PDF extraction adds HTML comments like `<!-- Page 4 -->` to mark
page boundaries. These should be stripped before creating chunks
for embedding, as they add noise without semantic value.
Args:
----
text: The text potentially containing HTML comments.
Returns:
-------
Text with all HTML comments removed.
Example:
-------
>>> normalizer = TextNormalizer()
>>> text = "Hello <!-- Page 1 --> world <!-- Page 2 -->"
>>> normalizer.strip_html_comments(text)
'Hello world '
Note:
----
This method removes all HTML comments, not just page markers.
The surrounding whitespace may need cleanup after removal,
which is handled by subsequent whitespace normalization.
"""
if not text:
return text
return _HTML_COMMENT_PATTERN.sub("", text)
def normalize(self, text: str, is_heading: bool = False) -> str:
"""Apply all normalizations to the text.
This is the main method that combines all normalization steps
in the correct order:
0. HTML comment stripping (remove page markers)
1. Whitespace normalization (basic cleanup)
2. Jumbled word fixing (domain-specific corrections)
2.5. Concatenated word segmentation (OCR missing spaces)
3. Sentence spacing (punctuation fixes)
4. Capitalization (if needed for headings)
Args:
----
text: The text to normalize.
is_heading: If True, apply heading-specific normalization
(e.g., title case for ALL CAPS).
Returns:
-------
Fully normalized text.
Example:
-------
>>> normalizer = TextNormalizer()
>>> text = "The ther mal com fort index.Is used for"
>>> normalizer.normalize(text)
'The thermal comfort index. Is used for'
"""
if not text:
return text
# Step 0: Strip HTML comments (page markers, etc.)
result = self.strip_html_comments(text)
# Step 1: Normalize whitespace first (basic cleanup)
result = self.normalize_whitespace(result)
# Step 2: Fix jumbled words using domain dictionary
result = self.normalize_jumbled_words(result)
# Step 2.5: Segment concatenated words (missing spaces from OCR)
result = self.segment_concatenated_words(result)
# Step 3: Fix missing spaces after punctuation
result = self.normalize_sentences(result)
# Step 4: Normalize capitalization (especially for headings)
if is_heading:
result = self.normalize_capitalization(result, is_heading=True)
return result
# =============================================================================
# Data Models
# =============================================================================
class Chunk(BaseModel):
"""Represent a single chunk of document content with metadata.
A chunk is a semantically meaningful portion of a document that is
suitable for embedding and retrieval. Each chunk contains:
- The text content itself
- Heading hierarchy for context (e.g., ["H1: Chapter", "H2: Section"])
- Source file and page information
- Character offsets for traceability
- Token count for size management
- Content hash for deduplication
The chunk model is designed to be serialized to JSONL format for
storage and loading during the retrieval pipeline.
Attributes:
----------
chunk_id : str
Unique identifier for the chunk within the corpus.
Typically formatted as "{source}_{index}" for traceability.
text : str
The actual text content of the chunk. This is the content
that will be embedded and retrieved.
heading_path : list[str]
Hierarchical list of headings providing context.
Format: ["H1: Title", "H2: Section", "H3: Subsection"]
Empty list if no heading hierarchy is available.
source : str
The original filename or source document identifier.
Used for citation and source attribution.
page : int
The 1-indexed page number where this chunk originates.
Must be >= 1 as PDF pages are conventionally numbered from 1.
start_char : int
Starting character offset in the source document.
Must be >= 0.
end_char : int
Ending character offset in the source document (exclusive).
Must be > start_char.
token_count : int
Approximate number of tokens in the chunk text.
Used for managing chunk sizes during retrieval.
chunk_hash : str
SHA-256 hash of the text content (first 16 characters).
Auto-generated if not provided. Used for deduplication.
Example:
-------
>>> chunk = Chunk(
... chunk_id="ashrae55_001",
... text="The PMV model predicts thermal sensation...",
... heading_path=["H1: Thermal Comfort", "H2: PMV Model"],
... source="ashrae_55.pdf",
... page=5,
... start_char=1024,
... end_char=2048,
... token_count=156,
... )
>>> chunk.chunk_hash # Auto-generated
'a3f2b1c4d5e6f789'
>>> chunk.text_preview(50)
'The PMV model predicts thermal sensation...'
Note:
----
The chunk_hash is automatically generated from the text content
if not explicitly provided. This enables efficient deduplication
and change detection.
"""
# -------------------------------------------------------------------------
# Model Configuration
# -------------------------------------------------------------------------
# Configure Pydantic model behavior for serialization and validation
# -------------------------------------------------------------------------
model_config = ConfigDict(
# Allow population by field name or alias
populate_by_name=True,
# Validate default values during model creation
validate_default=True,
# Use enum values in serialization rather than enum objects
use_enum_values=True,
# Extra fields are forbidden to catch typos and ensure data integrity
extra="forbid",
# Enable JSON schema generation with examples
json_schema_extra={
"examples": [
{
"chunk_id": "ashrae55_001",
"text": "The PMV model predicts thermal sensation...",
"heading_path": ["H1: Thermal Comfort", "H2: PMV Model"],
"source": "ashrae_55.pdf",
"page": 5,
"start_char": 1024,
"end_char": 2048,
"token_count": 156,
"chunk_hash": "a3f2b1c4d5e6f789",
},
{
"chunk_id": "iso7730_042",
"text": "The PPD index represents the percentage...",
"heading_path": ["H1: ISO 7730", "H2: PPD Calculation"],
"source": "iso_7730.pdf",
"page": 12,
"start_char": 5120,
"end_char": 6144,
"token_count": 189,
"chunk_hash": "b4e3c2d1f5a6e8c7",
},
]
},
)
# -------------------------------------------------------------------------
# Fields
# -------------------------------------------------------------------------
chunk_id: str = Field(
..., # Required field (no default)
min_length=1, # Must not be empty
description="Unique identifier for the chunk within the corpus",
examples=["ashrae55_001", "iso7730_042", "guide_chapter2_015"],
)
text: str = Field(
..., # Required field
min_length=1, # Must not be empty
description="The text content of the chunk",
examples=["The PMV model predicts thermal sensation based on..."],
)
heading_path: list[str] = Field(
default_factory=list,
description="Hierarchical list of headings providing context",
examples=[["H1: Thermal Comfort", "H2: PMV Model", "H3: Calculation"]],
)
source: str = Field(
..., # Required field
min_length=1, # Must not be empty
description="Original filename or source document identifier",
examples=["ashrae_55.pdf", "iso_7730.pdf", "pythermalcomfort_guide.pdf"],
)
page: int = Field(
..., # Required field
ge=1, # Must be >= 1 (1-indexed page numbers)
description="1-indexed page number where this chunk originates",
examples=[1, 5, 42],
)
start_char: int = Field(
..., # Required field
ge=0, # Must be >= 0
description="Starting character offset in the source document",
examples=[0, 1024, 5120],
)
end_char: int = Field(
..., # Required field
gt=0, # Must be > 0 (will be validated further against start_char)
description="Ending character offset in the source document (exclusive)",
examples=[512, 2048, 6144],
)
token_count: int = Field(
..., # Required field
ge=0, # Must be >= 0 (can be 0 for empty-ish chunks)
description="Approximate number of tokens in the chunk text",
examples=[100, 256, 512],
)
chunk_hash: str = Field(
default="", # Will be auto-generated in model_post_init
max_length=16, # SHA-256 truncated to 16 characters
description="SHA-256 hash of text content (first 16 chars) for deduplication",
examples=["a3f2b1c4d5e6f789", "b4e3c2d1f5a6e8c7"],
)
# -------------------------------------------------------------------------
# Validators
# -------------------------------------------------------------------------
@field_validator("text", mode="before")
@classmethod
def _normalize_text(cls, value: object) -> str:
    """Coerce the raw ``text`` input to a stripped, non-empty string.

    Runs before Pydantic's own validation of the field.

    Args:
    ----
        value: The raw input supplied for ``text``.

    Returns:
    -------
        ``str(value)`` with leading and trailing whitespace removed.

    Raises:
    ------
        ValueError: If value is None or empty after stripping.

    """
    if value is None:
        reason = "text cannot be None"
        raise ValueError(reason)

    cleaned = str(value).strip()
    if cleaned:
        return cleaned

    reason = "text cannot be empty"
    raise ValueError(reason)
@field_validator("heading_path", mode="before")
@classmethod
def _ensure_heading_list(cls, value: object) -> list[str]:
    """Coerce the raw ``heading_path`` input into a list of clean headings.

    Accepted inputs and how they are interpreted:

    - ``None``: no headings, returns an empty list.
    - ``str``: treated as one heading (never iterated character by
      character).
    - any other iterable: each element is one heading.
    - any non-iterable object: treated as one heading.

    Every heading is converted with ``str()`` and stripped of surrounding
    whitespace. Blank headings and ``None`` entries are dropped. The same
    cleaning is applied on every path, so ``"  H1  "`` and ``["  H1  "]``
    both yield ``["H1"]``.

    Args:
    ----
        value: Raw value supplied for the ``heading_path`` field.

    Returns:
    -------
        List of stripped, non-blank heading strings, in input order.

    """
    if value is None:
        return []

    candidates: list[object]
    if isinstance(value, str):
        # A bare string is a single heading, not a sequence of characters.
        candidates = [value]
    else:
        try:
            candidates = list(value)  # type: ignore[call-overload]
        except TypeError:
            # Not iterable: fall back to treating it as a single heading.
            candidates = [value]

    # None entries are skipped so they never turn into the heading "None".
    stripped = (str(item).strip() for item in candidates if item is not None)
    return [heading for heading in stripped if heading]
@model_validator(mode="after")
def _validate_char_offsets(self) -> Self:
    """Check that the character span is non-empty and correctly ordered.

    Runs after field validation, so both offsets are already integers
    that passed their individual constraints.

    Returns
    -------
        The validated model instance, unchanged.

    Raises
    ------
        ValueError: If ``end_char`` is less than or equal to ``start_char``.

    """
    start, end = self.start_char, self.end_char
    if end > start:
        return self

    msg = f"end_char ({end}) must be greater than start_char ({start})"
    raise ValueError(msg)
# -------------------------------------------------------------------------
# Post-Initialization
# -------------------------------------------------------------------------
def model_post_init(self, __context: object) -> None:
    """Fill in ``chunk_hash`` from the text when none was supplied.

    If ``chunk_hash`` is empty, it is set to the first 16 hexadecimal
    characters of the SHA-256 digest of the UTF-8 encoded text. A
    non-empty ``chunk_hash`` is left untouched.

    Args:
    ----
        __context: Pydantic context object (unused but required by signature).

    Note:
    ----
        The hash depends only on the text, so identical text always
        yields the same hash.

    """
    if self.chunk_hash:
        # A hash was provided explicitly; keep it as-is.
        return

    digest = hashlib.sha256(self.text.encode("utf-8")).hexdigest()
    # object.__setattr__ bypasses the model's own __setattr__, so the
    # assignment also works if the model is configured as frozen.
    object.__setattr__(self, "chunk_hash", digest[:16])
# -------------------------------------------------------------------------
# Methods
# -------------------------------------------------------------------------
def to_jsonl_dict(self) -> dict[str, Any]:
    """Return the chunk as a flat dictionary for one JSONL record.

    The dictionary holds exactly the nine chunk fields, always in the
    same order, with the values taken directly from the instance.

    Returns:
    -------
        Dictionary mapping each chunk field name to its current value.

    Example:
    -------
        >>> chunk = Chunk(
        ...     chunk_id="test_001",
        ...     text="Example text",
        ...     source="test.pdf",
        ...     page=1,
        ...     start_char=0,
        ...     end_char=12,
        ...     token_count=2,
        ... )
        >>> data = chunk.to_jsonl_dict()
        >>> data["chunk_id"]
        'test_001'

    Note:
    ----
        Prefer this over model_dump() for JSONL output: the key order is
        fixed here rather than derived from the model definition.

    """
    # The tuple order is the key order of the emitted record.
    field_order = (
        "chunk_id",
        "text",
        "heading_path",
        "source",
        "page",
        "start_char",
        "end_char",
        "token_count",
        "chunk_hash",
    )
    return {name: getattr(self, name) for name in field_order}
def text_preview(self, max_length: int = 100) -> str:
    """Return a preview of the chunk text that is at most ``max_length`` long.

    Text that already fits is returned unchanged. Longer text is cut so
    that the result, including a trailing ``"..."``, is exactly
    ``max_length`` characters. When ``max_length`` is too small to hold
    the ellipsis (below 3), the text is cut without an ellipsis so the
    limit is still respected. A non-positive ``max_length`` yields an
    empty string.

    Args:
    ----
        max_length: Maximum number of characters in the returned preview.
            Defaults to 100.

    Returns:
    -------
        The full text, or a truncated preview never longer than
        ``max_length``.

    Example:
    -------
        >>> chunk = Chunk(
        ...     chunk_id="test_001",
        ...     text="This is a very long text that needs truncation",
        ...     source="test.pdf",
        ...     page=1,
        ...     start_char=0,
        ...     end_char=47,
        ...     token_count=10,
        ... )
        >>> chunk.text_preview(20)
        'This is a very lo...'
        >>> chunk.text_preview(2)
        'Th'

    """
    ellipsis = "..."

    if max_length <= 0:
        # Nothing fits; a negative bound would otherwise slice from the end.
        return ""
    if len(self.text) <= max_length:
        return self.text
    if max_length < len(ellipsis):
        # No room for the ellipsis: hard-cut instead of exceeding the limit.
        return self.text[:max_length]

    # Reserve room for the ellipsis so the total length equals max_length.
    return self.text[: max_length - len(ellipsis)] + ellipsis
class ChunkingConfig(BaseModel):
    """Settings that control how documents are split into chunks.

    The settings cover three concerns:
    - Chunk size (minimum and maximum token counts)
    - Context overlap between consecutive chunks
    - Preservation of natural text boundaries

    The defaults are tuned for thermal comfort documentation and the
    BGE embedding model used in this pipeline.

    Attributes:
    ----------
        min_tokens : int
            Minimum number of tokens per chunk. Chunks smaller than
            this will be merged with adjacent content. Default: 450.
        max_tokens : int
            Maximum number of tokens per chunk. Content exceeding
            this limit will be split. Default: 700.
        overlap_percent : float
            Share of max_tokens repeated between consecutive chunks,
            expressed as a fraction from 0.0 to 1.0. Default: 0.12 (12%).
        preserve_sentences : bool
            If True, avoid splitting in the middle of sentences.
            Default: True.
        preserve_paragraphs : bool
            If True, prefer paragraph boundaries as split points.
            Default: True.

    Example:
    -------
        >>> config = ChunkingConfig(min_tokens=400, max_tokens=600)
        >>> config.calculate_overlap_tokens()
        72
        >>> config = ChunkingConfig(overlap_percent=0.15)
        >>> config.calculate_overlap_tokens()
        105

    Note:
    ----
        The overlap is max_tokens * overlap_percent, rounded to an integer.
        With the defaults: 700 * 0.12 = 84 tokens of overlap.

    """

    # -------------------------------------------------------------------------
    # Model Configuration
    # -------------------------------------------------------------------------
    model_config = ConfigDict(
        # Fields may be populated by name as well as by alias.
        populate_by_name=True,
        # Default values go through validation too.
        validate_default=True,
        # Unknown keys are rejected rather than silently ignored.
        extra="forbid",
        # Example payloads attached to the generated JSON schema.
        json_schema_extra={
            "examples": [
                {
                    "min_tokens": 450,
                    "max_tokens": 700,
                    "overlap_percent": 0.12,
                    "preserve_sentences": True,
                    "preserve_paragraphs": True,
                },
                {
                    "min_tokens": 300,
                    "max_tokens": 512,
                    "overlap_percent": 0.10,
                    "preserve_sentences": True,
                    "preserve_paragraphs": False,
                },
            ]
        },
    )

    # -------------------------------------------------------------------------
    # Fields
    # -------------------------------------------------------------------------
    min_tokens: int = Field(
        450,
        ge=1,  # A chunk holds at least one token
        description="Minimum number of tokens per chunk",
        examples=[300, 450, 500],
    )
    max_tokens: int = Field(
        700,
        ge=1,  # A chunk holds at least one token
        description="Maximum number of tokens per chunk",
        examples=[512, 700, 1024],
    )
    overlap_percent: float = Field(
        0.12,
        ge=0.0,  # 0% overlap is the lower bound
        le=1.0,  # 100% overlap is the upper bound
        description="Percentage of max_tokens to overlap between chunks (0.0-1.0)",
        examples=[0.10, 0.12, 0.15, 0.20],
    )
    preserve_sentences: bool = Field(
        True,
        description="Avoid splitting in the middle of sentences",
    )
    preserve_paragraphs: bool = Field(
        True,
        description="Prefer paragraph boundaries as split points",
    )

    # -------------------------------------------------------------------------
    # Validators
    # -------------------------------------------------------------------------
    @model_validator(mode="after")
    def _validate_token_range(self) -> Self:
        """Check that the token bounds form a non-empty range.

        Returns
        -------
            The validated model instance, unchanged.

        Raises
        ------
            ValueError: If min_tokens is greater than or equal to max_tokens.

        """
        lower, upper = self.min_tokens, self.max_tokens
        if lower < upper:
            return self

        msg = f"min_tokens ({lower}) must be less than max_tokens ({upper})"
        raise ValueError(msg)

    # -------------------------------------------------------------------------
    # Methods
    # -------------------------------------------------------------------------
    def calculate_overlap_tokens(self) -> int:
        """Return the number of tokens shared by consecutive chunks.

        The value is ``max_tokens * overlap_percent`` passed through
        Python's built-in ``round()``.

        Returns:
        -------
            Number of tokens to overlap between consecutive chunks.

        Example:
        -------
            >>> config = ChunkingConfig(max_tokens=700, overlap_percent=0.12)
            >>> config.calculate_overlap_tokens()
            84
            >>> config = ChunkingConfig(max_tokens=512, overlap_percent=0.10)
            >>> config.calculate_overlap_tokens()
            51

        Note:
        ----
            ``round()`` resolves exact .5 ties to the nearest even integer.

        """
        raw_overlap = self.max_tokens * self.overlap_percent
        return round(raw_overlap)