Spaces:
Running
Running
| r"""Markdown conversion utilities for extracted PDF content. | |
| This module provides the MarkdownConverter class for converting | |
| extracted PDF content to clean, well-structured Markdown format. | |
| The converter handles: | |
| - Whitespace normalization (blank lines, trailing spaces, line endings) | |
| - Heading hierarchy normalization (levels, formatting, deduplication) | |
| - Table structure validation and repair | |
| - Code block preservation (content inside fenced blocks is untouched) | |
| - Link cleanup (normalize format, remove broken links) | |
| - Image placeholder normalization (standardize format) | |
| Design Principles: | |
| - All processing uses regex for pattern matching (no heavy dependencies) | |
| - Code blocks are preserved verbatim (never modified) | |
| - Operations are idempotent (applying twice produces same result) | |
| - All methods are typed for mypy strict mode compliance | |
| Lazy Loading: | |
| No heavy dependencies - this module uses only the standard library | |
| (re module) and loads quickly. | |
| Example: | |
| ------- | |
| >>> from rag_chatbot.extraction import MarkdownConverter | |
| >>> | |
| >>> converter = MarkdownConverter() | |
| >>> raw_md = "# Title\n\n\n\n##Content\n\nSome text." | |
| >>> clean_md = converter.convert(raw_md) | |
| >>> print(clean_md) | |
| # Title | |
| <BLANKLINE> | |
| ## Content | |
| <BLANKLINE> | |
| Some text. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from typing import TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| pass # Future type imports will go here | |
| # ============================================================================= | |
| # Module Exports | |
| # ============================================================================= | |
| __all__: list[str] = ["MarkdownConverter"] | |
| # ============================================================================= | |
| # Constants - Configuration | |
| # ============================================================================= | |
# Minimum number of lines for a block of text to count as a table:
# at least a header row and a separator row.
_MIN_TABLE_LINES = 2
# =============================================================================
# Constants - Regex Patterns
# =============================================================================
# Compiled once at import time and shared by every MarkdownConverter call
# (safe: compiled patterns are immutable and thread-safe).
# =============================================================================
# Fenced code blocks (``` or ~~~): group 1 = fence, group 2 = info string
# (language), group 3 = body.  DOTALL lets "." span newlines inside the body.
# NOTE(review): the closing \1 is not anchored to a line start, so a fence
# sequence appearing mid-line inside the block would close it early — confirm
# this matches the extractor's output.
_CODE_BLOCK_PATTERN = re.compile(
    r"(```|~~~)([^\n]*)\n(.*?)\1",
    re.DOTALL,
)
# ATX headings (# through ######): group 1 = hashes, group 2 = heading text.
# \s* (rather than \s+) tolerates a missing space after the hashes ("##Title").
_HEADING_PATTERN = re.compile(
    r"^(#{1,6})\s*(.*)$",
    re.MULTILINE,
)
# A complete table row: starts and ends with | on a single line.
_TABLE_ROW_PATTERN = re.compile(
    r"^\|[^\n]+\|$",
    re.MULTILINE,
)
# Table separator rows such as |---|:--:| (dashes, colons, pipes, spaces).
# NOTE(review): \s inside the class also matches \n; harmless for well-formed
# single-line rows but worth confirming against malformed input.
_TABLE_SEPARATOR_PATTERN = re.compile(
    r"^\|[\s\-:|]+\|$",
    re.MULTILINE,
)
# A whole table block: header row, separator row, then zero or more data rows.
# Data rows may lack the trailing | (a common extraction defect), hence the
# looser (?:\|[^\n]+\n?)* tail.  The (?:^|\n) prefix is part of the match, so
# replacement callbacks must re-emit whatever it consumed.
_TABLE_BLOCK_PATTERN = re.compile(
    r"(?:^|\n)(\|[^\n]+\|\n\|[-:\s|]+\|\n(?:\|[^\n]+\n?)*)",
    re.MULTILINE,
)
# Inline links: [text](url) -> group 1 = text, group 2 = url.
_LINK_PATTERN = re.compile(
    r"\[([^\]]*)\]\(([^)]*)\)",
)
# Markdown images: ![alt](path) -> group 1 = alt, group 2 = path.
_IMAGE_MD_PATTERN = re.compile(
    r"!\[([^\]]*)\]\(([^)]*)\)",
)
# Image placeholders emitted by extraction: [Image: ...], [IMG: ...],
# [Figure: ...], and double-bracketed [[Image: ...]] variants.
_IMAGE_PLACEHOLDER_PATTERN = re.compile(
    r"\[\[?(?:Image|IMG|Figure|image|img|figure):\s*([^\]]+?)\]?\]",
    re.IGNORECASE,
)
# Links with an empty or whitespace-only target: [text]() or []().
_EMPTY_LINK_PATTERN = re.compile(
    r"\[([^\]]*)\]\(\s*\)",
)
# Three or more consecutive newlines, i.e. two or more blank lines.
_MULTIPLE_BLANK_LINES_PATTERN = re.compile(
    r"\n{3,}",
)
# Trailing spaces/tabs at the end of any line.
_TRAILING_WHITESPACE_PATTERN = re.compile(
    r"[ \t]+$",
    re.MULTILINE,
)
# Windows-style CRLF line endings.
_WINDOWS_LINE_ENDING_PATTERN = re.compile(
    r"\r\n",
)
# Classic-Mac bare CR line endings (CR not followed by LF).
_MAC_LINE_ENDING_PATTERN = re.compile(
    r"\r(?!\n)",
)
# =============================================================================
# Constants - OCR Text Normalization Patterns
# =============================================================================
# Patterns for repairing common OCR artifacts in extracted text.
# =============================================================================
# Two or more consecutive spaces (excessive inter-word spacing).
_MULTIPLE_SPACES_PATTERN = re.compile(
    r" {2,}",
)
# One or more spaces before closing punctuation: . , ; : ! ? ) ] }
_SPACE_BEFORE_PUNCT_PATTERN = re.compile(
    r" +([.,;:!?\)\]\}])",
)
# Sentence-ending punctuation glued to the next capitalized word,
# e.g. "text.Another" (missing space after the period).
_MISSING_SPACE_AFTER_PERIOD_PATTERN = re.compile(
    r"([.!?])([A-Z])",
)
# Comma glued to the next word, e.g. "first,second" -> "first, second".
_MISSING_SPACE_AFTER_COMMA_PATTERN = re.compile(
    r",([A-Za-z])",
)
# Runs of ALL-CAPS words (3+ letters each, space-separated) — candidate
# headings that may need Title Case conversion.
_ALL_CAPS_SEQUENCE_PATTERN = re.compile(
    r"\b([A-Z]{3,}(?:\s+[A-Z]{3,})*)\b",
)
# Heading lines whose text is entirely uppercase.
# Group 1 = hash prefix (with trailing space), group 2 = the all-caps text.
# NOTE(review): \s inside [A-Z\s] also matches \n, so with MULTILINE this
# could in principle span a heading plus a following all-caps line — confirm
# against normalize_capitalization's expectations.
_ALL_CAPS_HEADING_PATTERN = re.compile(
    r"^(#{1,6}\s+)([A-Z][A-Z\s]+[A-Z])$",
    re.MULTILINE,
)
# Single letters separated by spaces, an OCR artifact:
# "t h e r m a l" should become "thermal".  Requires 3+ spaced letters.
_SINGLE_LETTER_SPACES_PATTERN = re.compile(
    r"\b([A-Za-z])( [A-Za-z]){2,}\b",
)
# Short word fragments separated by spaces ("ther mal", "com fort"):
# 2-5 space-separated fragments of 2-5 letters that may be one split word.
_SHORT_FRAGMENTS_PATTERN = re.compile(
    r"\b([A-Za-z]{2,5})( [A-Za-z]{2,5}){1,4}\b",
)
# Fragment-length bounds for OCR split-word detection: shorter than 2 chars
# is likely noise, longer than 5 chars is likely already a complete word.
_MIN_FRAGMENT_LEN = 2
_MAX_FRAGMENT_LEN = 5
# =============================================================================
# Constants - Jumbled Content Detection
# =============================================================================
# Heuristics for spotting content scrambled by multi-column PDF extraction.
# =============================================================================
# A sentence with at most this many words counts as "short"; a short sentence
# wedged between two long ones often marks a column-boundary artifact.
_MAX_SHORT_SENTENCE_WORDS = 2
# A sentence with at least this many words counts as "long" when checking
# whether a short sentence is isolated between longer ones.
_MIN_LONG_SENTENCE_WORDS = 8
# =============================================================================
# Constants - Thermal Comfort Domain Dictionary
# =============================================================================
# Curated vocabulary of thermal-comfort terminology, used to recognize and
# rejoin words that OCR has split (e.g. "ther mal" -> "thermal").
# All entries are lowercase; callers are expected to lowercase before lookup.
# =============================================================================
# Core thermal comfort terms
_THERMAL_COMFORT_TERMS: frozenset[str] = frozenset(
    {
        # Basic concepts
        "thermal",
        "comfort",
        "temperature",
        "humidity",
        "ventilation",
        "metabolism",
        "clothing",
        "insulation",
        "radiation",
        "convection",
        "conduction",
        "evaporation",
        "environment",
        "environmental",
        "occupant",
        "occupants",
        "sensation",
        "satisfaction",
        "dissatisfaction",
        "acceptable",
        "unacceptable",
        # Technical terms
        "pythermalcomfort",
        "psychrometric",
        "enthalpy",
        "operative",
        "metabolic",
        "radiant",
        "relative",
        "absolute",
        "specific",
        "neutral",
        "adaptive",
        "predicted",
        "percentage",
        "dissatisfied",
        "velocity",
        "turbulence",
        "asymmetry",
        "gradient",
        "stratification",
        "draft",
        "draught",
        # Body-related
        "skin",
        "core",
        "body",
        "surface",
        "sweating",
        "shivering",
        "vasodilation",
        "vasoconstriction",
        # Measurement and units
        "clo",
        "met",
        "celsius",
        "fahrenheit",
        "kelvin",
        "pascal",
        "watt",
        "meter",
        "second",
        # Building-related
        "building",
        "indoor",
        "outdoor",
        "heating",
        "cooling",
        "air",
        "conditioning",
        "zone",
        "space",
        "room",
        "office",
        "residential",
        "commercial",
        # Model-related
        "model",
        "index",
        "indices",
        "equation",
        "formula",
        "calculation",
        "parameter",
        "parameters",
        "coefficient",
        "standard",
        "guideline",
        "compliance",
        # Data-related
        "data",
        "value",
        "values",
        "range",
        "limit",
        "limits",
        "threshold",
        "minimum",
        "maximum",
        "average",
        "mean",
    }
)
# Acronyms that must ALWAYS remain uppercase: never converted to Title Case
# or lowercase by the capitalization-normalization pass.
_PRESERVE_ACRONYMS: frozenset[str] = frozenset(
    {
        # Thermal comfort indices
        "PMV",  # Predicted Mean Vote
        "PPD",  # Predicted Percentage Dissatisfied
        "SET",  # Standard Effective Temperature
        "ET",  # Effective Temperature
        "PET",  # Physiological Equivalent Temperature
        "UTCI",  # Universal Thermal Climate Index
        "WBGT",  # Wet Bulb Globe Temperature
        "MRT",  # Mean Radiant Temperature
        "OT",  # Operative Temperature
        "AT",  # Apparent Temperature
        "HI",  # Heat Index
        "WCT",  # Wind Chill Temperature
        # Standards organizations
        "ASHRAE",  # American Society of Heating, Refrigerating & AC Engineers
        "ISO",  # International Organization for Standardization
        "CEN",  # European Committee for Standardization
        "CBE",  # Center for the Built Environment
        "CIBSE",  # Chartered Institution of Building Services Engineers
        # Building systems
        "HVAC",  # Heating, Ventilation, and Air Conditioning
        "VAV",  # Variable Air Volume
        "AHU",  # Air Handling Unit
        "FCU",  # Fan Coil Unit
        "RTU",  # Rooftop Unit
        "BMS",  # Building Management System
        "BEMS",  # Building Energy Management System
        # Other technical
        "CFD",  # Computational Fluid Dynamics
        "IAQ",  # Indoor Air Quality
        "IEQ",  # Indoor Environmental Quality
        "POE",  # Post-Occupancy Evaluation
        "PDF",  # Portable Document Format (or Probability Density Function)
        "API",  # Application Programming Interface
        "USA",  # United States of America
        "UK",  # United Kingdom
        "EU",  # European Union
        "AC",  # Air Conditioning
        "DC",  # Direct Current
        "RH",  # Relative Humidity
        "DB",  # Dry Bulb
        "WB",  # Wet Bulb
        "DP",  # Dew Point
        "CO2",  # Carbon Dioxide
        "VOC",  # Volatile Organic Compounds
    }
)
| class MarkdownConverter: | |
| r"""Convert extracted PDF content to clean Markdown format. | |
| This class provides methods for post-processing extracted PDF | |
| content to produce clean, consistent Markdown output. It handles | |
| common issues with PDF extraction such as: | |
| - Excessive whitespace and inconsistent line endings | |
| - Inconsistent heading levels and formatting | |
| - Malformed or broken tables | |
| - Unclosed code blocks | |
| - Broken or empty links | |
| - Inconsistent image placeholder formats | |
| The converter ensures output is suitable for further processing | |
| in the chunking pipeline. All processing preserves the semantic | |
| content while normalizing the formatting. | |
| Processing Order: | |
| The convert() method applies normalizations in a specific order | |
| to ensure correct results: | |
| 1. Line ending normalization (CRLF -> LF) | |
| 2. Code block closure (ensure all blocks are closed) | |
| 3. Heading normalization (fix levels and format) | |
| 4. Table normalization (fix structure) | |
| 5. Image normalization (standardize placeholders) | |
| 6. Link cleanup (remove broken links) | |
| 7. Whitespace normalization (final cleanup) | |
| Thread Safety: | |
| This class is thread-safe. All methods are stateless and can | |
| be called concurrently from multiple threads. | |
| Attributes: | |
| ---------- | |
| None. This class is stateless and uses no instance attributes. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> | |
| >>> # Clean up messy extracted content | |
| >>> raw = "# Title\n\n\n\n##No space\n\n###Skip level" | |
| >>> clean = converter.convert(raw) | |
| >>> print(clean) | |
| # Title | |
| <BLANKLINE> | |
| ## No space | |
| <BLANKLINE> | |
| ### Skip level | |
| Note: | |
| ---- | |
| The converter is designed to be idempotent - applying it | |
| multiple times to the same content produces the same result. | |
| """ | |
| # ------------------------------------------------------------------------- | |
| # Initialization | |
| # ------------------------------------------------------------------------- | |
| def __init__(self) -> None: | |
| r"""Initialize the Markdown converter. | |
| The converter is stateless, so initialization simply creates | |
| a new instance that's ready to use. No configuration is needed. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> # Ready to use immediately | |
| >>> clean_md = converter.convert("# My Document\n\nContent here.") | |
| """ | |
| # Stateless converter - no initialization needed | |
| # This enables thread-safe operation and simple usage patterns | |
| pass | |
| # ------------------------------------------------------------------------- | |
| # Public Methods - Main API | |
| # ------------------------------------------------------------------------- | |
| def convert(self, raw_content: str) -> str: | |
| r"""Convert raw extracted content to clean Markdown. | |
| This is the main entry point for the converter. It applies all | |
| normalization steps in the correct order to produce clean, | |
| well-structured Markdown output. | |
| The processing order is: | |
| 1. Line ending normalization (ensures consistent newline line endings) | |
| 2. Code block closure (ensures all fenced blocks are closed) | |
| 3. Heading normalization (fixes levels and adds space after #) | |
| 4. Table normalization (fixes structure and formatting) | |
| 5. Image normalization (standardizes placeholder format) | |
| 6. Link cleanup (removes broken/empty links) | |
| 7. OCR jumbled words (fixes "ther mal" -> "thermal") | |
| 8. OCR extra spaces (normalizes multiple/misplaced spaces) | |
| 9. OCR capitalization (fixes ALL CAPS headings, preserves acronyms) | |
| 10. OCR sentence structure (fixes missing spaces after punctuation) | |
| 11. Whitespace normalization (removes excess blank lines) | |
| Code blocks are preserved throughout - their content is never | |
| modified by any normalization step. | |
| Args: | |
| ---- | |
| raw_content : str | |
| Raw Markdown content from PDF extraction. May contain | |
| various formatting issues that need cleanup. | |
| Returns: | |
| ------- | |
| str | |
| Cleaned and normalized Markdown string. The output is | |
| suitable for chunking and embedding in the RAG pipeline. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = '''# Title | |
| ... | |
| ... | |
| ... | |
| ... ##No space after hash | |
| ... | |
| ... Some text with [broken]() link. | |
| ... | |
| ... [IMG: figure1] | |
| ... ''' | |
| >>> clean = converter.convert(raw) | |
| >>> print(clean) | |
| # Title | |
| <BLANKLINE> | |
| ## No space after hash | |
| <BLANKLINE> | |
| Some text with broken link. | |
| <BLANKLINE> | |
| [Image: figure1] | |
| Note: | |
| ---- | |
| Empty input returns empty output. The method handles None | |
| gracefully by treating it as an empty string. | |
| """ | |
| # ------------------------------------------------------------------------- | |
| # Handle edge cases: empty or None input | |
| # ------------------------------------------------------------------------- | |
| if not raw_content: | |
| return "" | |
| # ------------------------------------------------------------------------- | |
| # Step 1: Normalize line endings to Unix-style (LF) | |
| # ------------------------------------------------------------------------- | |
| # This must be done first to ensure all other patterns match correctly. | |
| # Converts CRLF (Windows) and CR (old Mac) to LF (Unix). | |
| # ------------------------------------------------------------------------- | |
| content = self._normalize_line_endings(raw_content) | |
| # ------------------------------------------------------------------------- | |
| # Step 2: Ensure code blocks are properly closed | |
| # ------------------------------------------------------------------------- | |
| # Unclosed code blocks can cause downstream processing issues. | |
| # This step adds closing fences where needed. | |
| # ------------------------------------------------------------------------- | |
| content = self.ensure_code_blocks_closed(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 3: Extract and preserve code blocks | |
| # ------------------------------------------------------------------------- | |
| # Code blocks are replaced with placeholders before other processing | |
| # to prevent their content from being modified. | |
| # ------------------------------------------------------------------------- | |
| content, code_blocks = self._extract_code_blocks(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 4: Normalize headings (levels and formatting) | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_headings(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 5: Normalize tables (structure and formatting) | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_tables(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 6: Normalize image placeholders | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_images(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 7: Clean up links (remove broken/empty links) | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_links(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 8: Restore code blocks | |
| # ------------------------------------------------------------------------- | |
| content = self._restore_code_blocks(content, code_blocks) | |
| # ------------------------------------------------------------------------- | |
| # Step 9: OCR Text Normalization - Fix jumbled words | |
| # ------------------------------------------------------------------------- | |
| # Fix words split by OCR errors (e.g., "ther mal" -> "thermal") | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_jumbled_words(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 10: OCR Text Normalization - Fix extra spaces | |
| # ------------------------------------------------------------------------- | |
| # Normalize multiple spaces, spaces before punctuation, etc. | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_extra_spaces(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 11: OCR Text Normalization - Fix capitalization | |
| # ------------------------------------------------------------------------- | |
| # Convert ALL CAPS headings to Title Case, preserve acronyms | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_capitalization(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 12: OCR Text Normalization - Fix sentence structure | |
| # ------------------------------------------------------------------------- | |
| # Add missing spaces after periods and commas | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_sentence_structure(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 13: Final whitespace normalization | |
| # ------------------------------------------------------------------------- | |
| content = self.normalize_whitespace(content) | |
| return content | |
| # ------------------------------------------------------------------------- | |
| # Public Methods - Individual Normalizations | |
| # ------------------------------------------------------------------------- | |
| def normalize_headings(self, content: str) -> str: | |
| r"""Fix heading levels and formatting. | |
| This method performs several heading normalizations: | |
| 1. Ensures consistent format (space after #, e.g., "# Title" not "#Title") | |
| 2. Fixes skipped heading levels (H1 -> H3 becomes H1 -> H2) | |
| 3. Removes duplicate headings (same text on consecutive lines) | |
| 4. Removes empty headings (# with no text) | |
| The heading level fix is "soft" - it doesn't promote headings, | |
| only demotes them when they skip levels. For example: | |
| - H1 followed by H3 -> H1 followed by H2 | |
| - H2 followed by H4 -> H2 followed by H3 | |
| - H1 followed by H2 -> unchanged (no skip) | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with potentially malformed headings. | |
| Returns: | |
| ------- | |
| str | |
| Content with normalized heading format and levels. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "# Title\n\n###Skipped level\n\n## " | |
| >>> clean = converter.normalize_headings(raw) | |
| >>> print(clean) | |
| # Title | |
| <BLANKLINE> | |
| ## Skipped level | |
| Note: | |
| ---- | |
| This method does not modify content inside code blocks, | |
| so code examples with # characters are preserved. | |
| """ | |
| if not content: | |
| return "" | |
| # ------------------------------------------------------------------------- | |
| # Step 1: Ensure space after # in headings | |
| # ------------------------------------------------------------------------- | |
| # Regex matches # at line start followed by non-space, non-# | |
| # Adds a space between the hashes and the heading text | |
| # ------------------------------------------------------------------------- | |
| def add_heading_space(match: re.Match[str]) -> str: | |
| """Add space after heading hashes if missing.""" | |
| hashes = match.group(1) | |
| text = match.group(2).strip() | |
| if text: | |
| return f"{hashes} {text}" | |
| # Return empty string for empty headings (to be removed later) | |
| return "" | |
| content = _HEADING_PATTERN.sub(add_heading_space, content) | |
| # ------------------------------------------------------------------------- | |
| # Step 2: Remove empty headings (lines with just #) | |
| # ------------------------------------------------------------------------- | |
| # After adding spaces, empty headings become just "# " or "## " etc. | |
| # These should be removed entirely. | |
| # ------------------------------------------------------------------------- | |
| content = re.sub(r"^#{1,6}\s*$", "", content, flags=re.MULTILINE) | |
| # ------------------------------------------------------------------------- | |
| # Step 3: Fix skipped heading levels | |
| # ------------------------------------------------------------------------- | |
| # Process line by line to track heading levels | |
| # ------------------------------------------------------------------------- | |
| lines = content.split("\n") | |
| result_lines: list[str] = [] | |
| last_heading_level = 0 | |
| for line in lines: | |
| # Check if this line is a heading | |
| heading_match = re.match(r"^(#{1,6})\s+(.+)$", line) | |
| if heading_match: | |
| hashes = heading_match.group(1) | |
| text = heading_match.group(2) | |
| current_level = len(hashes) | |
| # --------------------------------------------------------- | |
| # Fix skipped levels | |
| # --------------------------------------------------------- | |
| # If we jump more than one level, demote to max allowed | |
| # Example: last=1, current=4 -> should be 2 (one more) | |
| # --------------------------------------------------------- | |
| if last_heading_level > 0 and current_level > last_heading_level + 1: | |
| # Demote to the correct level (one more than last) | |
| correct_level = last_heading_level + 1 | |
| corrected_line = f"{'#' * correct_level} {text}" | |
| last_heading_level = correct_level | |
| result_lines.append(corrected_line) | |
| else: | |
| last_heading_level = current_level | |
| result_lines.append(line) | |
| else: | |
| result_lines.append(line) | |
| content = "\n".join(result_lines) | |
| # ------------------------------------------------------------------------- | |
| # Step 4: Remove duplicate consecutive headings | |
| # ------------------------------------------------------------------------- | |
| # Sometimes extraction produces duplicate headings on adjacent lines | |
| # Keep only the first occurrence | |
| # ------------------------------------------------------------------------- | |
| lines = content.split("\n") | |
| result_lines = [] | |
| last_heading: str | None = None | |
| for line in lines: | |
| heading_match = re.match(r"^#{1,6}\s+(.+)$", line) | |
| if heading_match: | |
| heading_text = heading_match.group(1).strip() | |
| # Skip if this is a duplicate of the previous heading | |
| if heading_text == last_heading: | |
| continue | |
| last_heading = heading_text | |
| elif line.strip(): | |
| # Non-empty, non-heading line - reset tracking | |
| last_heading = None | |
| result_lines.append(line) | |
| return "\n".join(result_lines) | |
| def normalize_whitespace(self, content: str) -> str: | |
| r"""Normalize whitespace and blank lines. | |
| This method performs several whitespace normalizations: | |
| 1. Normalizes line endings to Unix-style (newline) | |
| 2. Removes trailing whitespace from all lines | |
| 3. Reduces excessive blank lines (more than 2 -> 2) | |
| 4. Removes leading/trailing blank lines from document | |
| The result is clean, consistently formatted Markdown with | |
| appropriate spacing between sections. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with potentially excessive whitespace. | |
| Returns: | |
| ------- | |
| str | |
| Content with normalized whitespace. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "# Title \n\n\n\n\nContent \n\n\nMore" | |
| >>> clean = converter.normalize_whitespace(raw) | |
| >>> print(repr(clean)) | |
| '# Title\n\nContent\n\nMore' | |
| Note: | |
| ---- | |
| Whitespace inside code blocks is preserved since code blocks | |
| should be extracted before calling this method. | |
| """ | |
| if not content: | |
| return "" | |
| # ------------------------------------------------------------------------- | |
| # Step 1: Normalize line endings | |
| # ------------------------------------------------------------------------- | |
| content = self._normalize_line_endings(content) | |
| # ------------------------------------------------------------------------- | |
| # Step 2: Remove trailing whitespace from lines | |
| # ------------------------------------------------------------------------- | |
| # This preserves intentional line breaks (two trailing spaces in MD) | |
| # if needed, but PDF extraction rarely produces these intentionally. | |
| # ------------------------------------------------------------------------- | |
| content = _TRAILING_WHITESPACE_PATTERN.sub("", content) | |
| # ------------------------------------------------------------------------- | |
| # Step 3: Reduce excessive blank lines | |
| # ------------------------------------------------------------------------- | |
| # More than 2 consecutive newlines become exactly 2 (one blank line) | |
| # This preserves paragraph separation while removing excess spacing | |
| # ------------------------------------------------------------------------- | |
| content = _MULTIPLE_BLANK_LINES_PATTERN.sub("\n\n", content) | |
| # ------------------------------------------------------------------------- | |
| # Step 4: Strip leading and trailing whitespace from document | |
| # ------------------------------------------------------------------------- | |
| content = content.strip() | |
| return content | |
| def normalize_tables(self, content: str) -> str: | |
| r"""Validate and fix table formatting. | |
| This method processes markdown tables to fix common issues: | |
| 1. Ensures separator row exists (adds if missing) | |
| 2. Normalizes column count across rows | |
| 3. Fixes missing cell delimiters | |
| 4. Removes empty tables (header only, no data) | |
| Tables that cannot be repaired are left as-is to preserve | |
| the original content for manual review. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with potentially malformed tables. | |
| Returns: | |
| ------- | |
| str | |
| Content with normalized table formatting. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "| A | B |\n|---|---|\n| 1 | 2" # Missing trailing | | |
| >>> clean = converter.normalize_tables(raw) | |
| >>> print(clean) | |
| | A | B | | |
| |---|---| | |
| | 1 | 2 | | |
| Note: | |
| ---- | |
| This method does not remove tables, only cleans them. | |
| Severely malformed tables may pass through unchanged. | |
| """ | |
| if not content: | |
| return "" | |
| # ------------------------------------------------------------------------- | |
| # Find and process each table block | |
| # ------------------------------------------------------------------------- | |
| # Tables are identified by the pattern: header row, separator row, data rows | |
| # ------------------------------------------------------------------------- | |
| def fix_table(match: re.Match[str]) -> str: | |
| """Fix a single table block.""" | |
| table_text = match.group(1) | |
| return self._normalize_single_table(table_text) | |
| # Process all tables in content | |
| content = _TABLE_BLOCK_PATTERN.sub( | |
| lambda m: "\n" + self._normalize_single_table(m.group(1)), | |
| content, | |
| ) | |
| # ------------------------------------------------------------------------- | |
| # Fix potential table-like structures that don't have separator rows | |
| # ------------------------------------------------------------------------- | |
| # Look for consecutive lines starting with | that might be tables | |
| # ------------------------------------------------------------------------- | |
| content = self._fix_incomplete_tables(content) | |
| return content | |
| def normalize_images(self, content: str) -> str: | |
| r"""Normalize image placeholder format. | |
| This method standardizes various image placeholder formats | |
| to a consistent format: [Image: description] | |
| Handled input formats: | |
| -  - standard Markdown images | |
| - [Image: description] - already normalized | |
| - [IMG: description] - alternate format | |
| - [Figure: description] - figure references | |
| - [[Image: description]] - double-bracketed variants | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with various image placeholder formats. | |
| Returns: | |
| ------- | |
| str | |
| Content with standardized [Image: description] placeholders. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "\n[IMG: diagram]\n[Figure: graph]" | |
| >>> clean = converter.normalize_images(raw) | |
| >>> print(clean) | |
| [Image: Chart] | |
| [Image: diagram] | |
| [Image: graph] | |
| Note: | |
| ---- | |
| Empty alt text images become [Image: (no description)]. | |
| URL-only images (no alt text) are also handled. | |
| """ | |
| if not content: | |
| return "" | |
| # ------------------------------------------------------------------------- | |
| # Step 1: Convert standard Markdown images  | |
| # ------------------------------------------------------------------------- | |
| def convert_md_image(match: re.Match[str]) -> str: | |
| """Convert Markdown image to placeholder format.""" | |
| alt_text = match.group(1).strip() | |
| if alt_text: | |
| return f"[Image: {alt_text}]" | |
| # No alt text - use a default placeholder | |
| return "[Image: (no description)]" | |
| content = _IMAGE_MD_PATTERN.sub(convert_md_image, content) | |
| # ------------------------------------------------------------------------- | |
| # Step 2: Normalize various placeholder formats | |
| # ------------------------------------------------------------------------- | |
| def normalize_placeholder(match: re.Match[str]) -> str: | |
| """Normalize image placeholder to standard format.""" | |
| description = match.group(1).strip() | |
| if description: | |
| # Remove trailing ] if present (from double-bracket formats) | |
| description = description.rstrip("]").strip() | |
| return f"[Image: {description}]" | |
| return "[Image: (no description)]" | |
| content = _IMAGE_PLACEHOLDER_PATTERN.sub(normalize_placeholder, content) | |
| return content | |
| def normalize_links(self, content: str) -> str: | |
| """Normalize link format and remove broken links. | |
| This method cleans up markdown links: | |
| 1. Removes empty links [text]() - keeps text, removes link | |
| 2. Removes completely empty links []() - removes entirely | |
| 3. Normalizes whitespace in link text and URLs | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with potentially broken links. | |
| Returns: | |
| ------- | |
| str | |
| Content with cleaned links. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "Click [here]() for more. See [](broken) too." | |
| >>> clean = converter.normalize_links(raw) | |
| >>> print(clean) | |
| Click here for more. See too. | |
| Note: | |
| ---- | |
| Valid links are preserved. Only empty/broken links are modified. | |
| """ | |
| if not content: | |
| return "" | |
| # ------------------------------------------------------------------------- | |
| # Step 1: Remove empty links [text]() - keep the text | |
| # ------------------------------------------------------------------------- | |
| def remove_empty_link(match: re.Match[str]) -> str: | |
| """Remove empty link, keeping the text.""" | |
| text = match.group(1).strip() | |
| return text | |
| content = _EMPTY_LINK_PATTERN.sub(remove_empty_link, content) | |
| # ------------------------------------------------------------------------- | |
| # Step 2: Remove completely empty links []( anything ) | |
| # ------------------------------------------------------------------------- | |
| content = re.sub(r"\[\s*\]\([^)]*\)", "", content) | |
| # ------------------------------------------------------------------------- | |
| # Step 3: Normalize whitespace in remaining links | |
| # ------------------------------------------------------------------------- | |
| def normalize_link(match: re.Match[str]) -> str: | |
| """Normalize whitespace in link.""" | |
| text = match.group(1).strip() | |
| url = match.group(2).strip() | |
| if text and url: | |
| return f"[{text}]({url})" | |
| elif text: | |
| # URL is empty - just return text | |
| return text | |
| elif url: | |
| # Text is empty - return as is (might be intentional) | |
| return f"[]({url})" | |
| return "" | |
| content = _LINK_PATTERN.sub(normalize_link, content) | |
| return content | |
| def ensure_code_blocks_closed(self, content: str) -> str: | |
| r"""Ensure all code blocks are properly closed. | |
| This method finds unclosed code blocks (``` or ~~~) and adds | |
| closing fences. An unclosed code block is one where an opening | |
| fence has no matching closing fence. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with potentially unclosed code blocks. | |
| Returns: | |
| ------- | |
| str | |
| Content with all code blocks properly closed. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "Some text\n```python\ncode here\nMore text" | |
| >>> clean = converter.ensure_code_blocks_closed(raw) | |
| >>> print(clean) | |
| Some text | |
| ```python | |
| code here | |
| ``` | |
| More text | |
| Note: | |
| ---- | |
| The method uses a simple stack-based approach to track | |
| opening and closing fences. | |
| """ | |
| if not content: | |
| return "" | |
| lines = content.split("\n") | |
| result_lines: list[str] = [] | |
| # Track open code block state | |
| in_code_block = False | |
| code_fence: str | None = None # '```' or '~~~' | |
| for line in lines: | |
| stripped = line.strip() | |
| # Check for code fence (``` or ~~~) | |
| if stripped.startswith("```") or stripped.startswith("~~~"): | |
| fence_type = stripped[:3] | |
| if not in_code_block: | |
| # Opening a code block | |
| in_code_block = True | |
| code_fence = fence_type | |
| elif code_fence is not None and ( | |
| stripped == code_fence or stripped.startswith(code_fence) | |
| ): | |
| # Closing the code block (matching fence type) | |
| in_code_block = False | |
| code_fence = None | |
| # else: Different fence inside code block - treat as content | |
| result_lines.append(line) | |
| # ------------------------------------------------------------------------- | |
| # Handle unclosed code block at end of content | |
| # ------------------------------------------------------------------------- | |
| if in_code_block and code_fence: | |
| # Add closing fence | |
| result_lines.append(code_fence) | |
| return "\n".join(result_lines) | |
| # ------------------------------------------------------------------------- | |
| # Public Methods - OCR Text Normalization | |
| # ------------------------------------------------------------------------- | |
| def normalize_jumbled_words(self, content: str) -> str: | |
| r"""Fix OCR artifacts where spaces incorrectly split words. | |
| This method detects and corrects common OCR errors where spaces | |
| are inserted within words, breaking them into fragments. It handles: | |
| 1. Single letters separated by spaces: "t h e r m a l" -> "thermal" | |
| 2. Short fragments: "ther mal" -> "thermal", "com fort" -> "comfort" | |
| 3. Domain-specific terms from the thermal comfort dictionary | |
| The method uses a curated dictionary of thermal comfort terms to | |
| verify that combining fragments produces a valid known word. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with potential OCR artifacts. | |
| Returns: | |
| ------- | |
| str | |
| Content with corrected word spacing. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "The ther mal com fort index measures t h e conditions." | |
| >>> clean = converter.normalize_jumbled_words(raw) | |
| >>> print(clean) | |
| The thermal comfort index measures the conditions. | |
| Note: | |
| ---- | |
| This method preserves content inside code blocks. | |
| Only words matching the thermal comfort domain dictionary | |
| are corrected to avoid false positives. | |
| """ | |
| if not content: | |
| return "" | |
| # --------------------------------------------------------------------- | |
| # Extract and preserve code blocks before processing | |
| # --------------------------------------------------------------------- | |
| content, code_blocks = self._extract_code_blocks(content) | |
| # --------------------------------------------------------------------- | |
| # Step 1: Fix single letters separated by spaces | |
| # --------------------------------------------------------------------- | |
| # Pattern: "t h e r m a l" -> "thermal" | |
| # We check if the combined result is a known word | |
| # --------------------------------------------------------------------- | |
| def fix_single_letters(match: re.Match[str]) -> str: | |
| """Combine single letters if they form a known word.""" | |
| # Get the full matched text and remove spaces | |
| full_match = match.group(0) | |
| combined = full_match.replace(" ", "").lower() | |
| # Check if the combined word is in our dictionary | |
| if combined in _THERMAL_COMFORT_TERMS: | |
| # Preserve original case pattern if first letter was uppercase | |
| if full_match[0].isupper(): | |
| return combined.capitalize() | |
| return combined | |
| # Also check for very short common words that might be split | |
| if combined in {"the", "and", "for", "are", "was", "has", "had"}: | |
| return combined | |
| # Not a known word - leave as is | |
| return full_match | |
| content = _SINGLE_LETTER_SPACES_PATTERN.sub(fix_single_letters, content) | |
| # --------------------------------------------------------------------- | |
| # Step 2: Fix short word fragments separated by spaces | |
| # --------------------------------------------------------------------- | |
| # Strategy: Check if consecutive short words combine to form a | |
| # known thermal comfort term. Only merge if the result is valid. | |
| # This approach avoids matching valid words like "The" as fragments. | |
| # --------------------------------------------------------------------- | |
| content = self._fix_fragmented_words(content) | |
| # --------------------------------------------------------------------- | |
| # Restore code blocks | |
| # --------------------------------------------------------------------- | |
| content = self._restore_code_blocks(content, code_blocks) | |
| return content | |
| def normalize_capitalization(self, content: str) -> str: | |
| r"""Fix improper capitalization from PDF extraction. | |
| This method corrects common capitalization issues: | |
| 1. ALL CAPS headings -> Title Case: "## THERMAL COMFORT" -> "## Thermal Comfort" | |
| 2. Preserve acronyms: "PMV", "PPD", "ASHRAE", "ISO", "HVAC" stay uppercase | |
| 3. Fix mid-sentence all caps: "PMV MODEL PREDICTS" -> "PMV model predicts" | |
| Content inside code blocks and inline code is never modified. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with capitalization issues. | |
| Returns: | |
| ------- | |
| str | |
| Content with normalized capitalization. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "## THERMAL COMFORT\n\nThe PMV MODEL PREDICTS comfort." | |
| >>> clean = converter.normalize_capitalization(raw) | |
| >>> print(clean) | |
| ## Thermal Comfort | |
| <BLANKLINE> | |
| The PMV model predicts comfort. | |
| Note: | |
| ---- | |
| Acronyms defined in _PRESERVE_ACRONYMS are always kept uppercase. | |
| This includes PMV, PPD, ASHRAE, ISO, HVAC, and other technical terms. | |
| """ | |
| if not content: | |
| return "" | |
| # --------------------------------------------------------------------- | |
| # Extract and preserve code blocks before processing | |
| # --------------------------------------------------------------------- | |
| content, code_blocks = self._extract_code_blocks(content) | |
| # --------------------------------------------------------------------- | |
| # Step 1: Convert ALL CAPS headings to Title Case | |
| # --------------------------------------------------------------------- | |
| # Pattern matches: ## ALL CAPS HEADING | |
| # Preserves acronyms within the heading | |
| # --------------------------------------------------------------------- | |
| def fix_heading_caps(match: re.Match[str]) -> str: | |
| """Convert all-caps heading to title case, preserving acronyms.""" | |
| prefix = match.group(1) # "## " | |
| heading_text = match.group(2) # "ALL CAPS HEADING" | |
| # Split into words and process each | |
| words = heading_text.split() | |
| result_words: list[str] = [] | |
| for word in words: | |
| # Check if it's a known acronym (should stay uppercase) | |
| if word.upper() in _PRESERVE_ACRONYMS: | |
| result_words.append(word.upper()) | |
| else: | |
| # Convert to title case | |
| result_words.append(word.capitalize()) | |
| return prefix + " ".join(result_words) | |
| content = _ALL_CAPS_HEADING_PATTERN.sub(fix_heading_caps, content) | |
| # --------------------------------------------------------------------- | |
| # Step 2: Fix mid-sentence all caps sequences (non-heading text) | |
| # --------------------------------------------------------------------- | |
| # Only process lines that are NOT headings | |
| # Pattern matches: 3+ uppercase letters that aren't acronyms | |
| # --------------------------------------------------------------------- | |
| lines = content.split("\n") | |
| result_lines: list[str] = [] | |
| for line in lines: | |
| # Skip heading lines (already processed) | |
| if line.strip().startswith("#"): | |
| result_lines.append(line) | |
| continue | |
| # Skip lines that look like they're in a code block placeholder | |
| if "\x00CODE_BLOCK" in line: | |
| result_lines.append(line) | |
| continue | |
| # Process the line for mid-sentence all caps | |
| def fix_mid_sentence_caps(match: re.Match[str]) -> str: | |
| """Convert mid-sentence all caps to lowercase, preserving acronyms.""" | |
| text = match.group(1) | |
| words = text.split() | |
| result_words: list[str] = [] | |
| for word in words: | |
| # Check if it's a known acronym | |
| if word.upper() in _PRESERVE_ACRONYMS: | |
| result_words.append(word.upper()) | |
| else: | |
| # Convert to lowercase (mid-sentence) | |
| result_words.append(word.lower()) | |
| return " ".join(result_words) | |
| # Only apply to sequences that are clearly all-caps phrases | |
| # (not single words which might be intentional emphasis) | |
| processed_line = _ALL_CAPS_SEQUENCE_PATTERN.sub(fix_mid_sentence_caps, line) | |
| result_lines.append(processed_line) | |
| content = "\n".join(result_lines) | |
| # --------------------------------------------------------------------- | |
| # Restore code blocks | |
| # --------------------------------------------------------------------- | |
| content = self._restore_code_blocks(content, code_blocks) | |
| return content | |
| def normalize_extra_spaces(self, content: str) -> str: | |
| r"""Fix excessive spacing issues in content. | |
| This method corrects common spacing problems from PDF extraction: | |
| 1. Multiple spaces between words -> single space | |
| 2. Spaces before punctuation -> removed | |
| 3. Tab/space mixtures -> normalized to spaces | |
| Content inside code blocks is preserved exactly as-is. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with spacing issues. | |
| Returns: | |
| ------- | |
| str | |
| Content with normalized spacing. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "This has multiple spaces . And more ." | |
| >>> clean = converter.normalize_extra_spaces(raw) | |
| >>> print(clean) | |
| This has multiple spaces. And more. | |
| Note: | |
| ---- | |
| This is different from normalize_whitespace() which handles | |
| blank lines and trailing whitespace. This method specifically | |
| targets intra-line spacing issues. | |
| """ | |
| if not content: | |
| return "" | |
| # --------------------------------------------------------------------- | |
| # Extract and preserve code blocks before processing | |
| # --------------------------------------------------------------------- | |
| content, code_blocks = self._extract_code_blocks(content) | |
| # --------------------------------------------------------------------- | |
| # Process line by line to preserve leading indentation | |
| # --------------------------------------------------------------------- | |
| # We need to handle each line separately to preserve leading whitespace | |
| # while normalizing internal spacing issues | |
| # --------------------------------------------------------------------- | |
| lines = content.split("\n") | |
| normalized_lines: list[str] = [] | |
| for line in lines: | |
| # Find leading whitespace | |
| stripped = line.lstrip() | |
| if not stripped: | |
| # Empty or whitespace-only line | |
| normalized_lines.append("") | |
| continue | |
| # Get the leading whitespace (preserve structure) | |
| leading = line[: len(line) - len(stripped)] | |
| # Step 1: Replace tabs in the content part (after leading whitespace) | |
| content_part = stripped.replace("\t", " ") | |
| # Step 2: Reduce multiple consecutive spaces to single space | |
| content_part = _MULTIPLE_SPACES_PATTERN.sub(" ", content_part) | |
| # Step 3: Remove spaces before punctuation | |
| # "text ." -> "text." | |
| # "word ," -> "word," | |
| content_part = _SPACE_BEFORE_PUNCT_PATTERN.sub(r"\1", content_part) | |
| normalized_lines.append(leading + content_part) | |
| content = "\n".join(normalized_lines) | |
| # --------------------------------------------------------------------- | |
| # Restore code blocks | |
| # --------------------------------------------------------------------- | |
| content = self._restore_code_blocks(content, code_blocks) | |
| return content | |
| def normalize_sentence_structure(self, content: str) -> str: | |
| r"""Fix common sentence structure issues. | |
| This method corrects punctuation and spacing issues that affect | |
| sentence structure: | |
| 1. Missing space after period: "text.Another" -> "text. Another" | |
| 2. Missing space after comma: "first,second" -> "first, second" | |
| 3. Proper capitalization after period within a line | |
| Content inside code blocks and URLs is preserved. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content with sentence structure issues. | |
| Returns: | |
| ------- | |
| str | |
| Content with corrected sentence structure. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> raw = "First sentence.Second sentence.third should be capitalized." | |
| >>> clean = converter.normalize_sentence_structure(raw) | |
| >>> print(clean) | |
| First sentence. Second sentence. Third should be capitalized. | |
| Note: | |
| ---- | |
| URLs (http://, https://) are detected and preserved. | |
| File extensions (e.g., "file.txt") are handled carefully. | |
| Abbreviations like "Dr." or "Mr." followed by names are preserved. | |
| """ | |
| if not content: | |
| return "" | |
| # --------------------------------------------------------------------- | |
| # Extract and preserve code blocks before processing | |
| # --------------------------------------------------------------------- | |
| content, code_blocks = self._extract_code_blocks(content) | |
| # --------------------------------------------------------------------- | |
| # Step 1: Add missing space after period followed by uppercase | |
| # --------------------------------------------------------------------- | |
| # Pattern: "text.Another" -> "text. Another" | |
| # Skip URLs and file extensions | |
| # --------------------------------------------------------------------- | |
| def fix_period_spacing(match: re.Match[str]) -> str: | |
| """Add space after period if missing before uppercase letter.""" | |
| punct = match.group(1) | |
| next_char = match.group(2) | |
| return f"{punct} {next_char}" | |
| # Apply the fix, but we need to be careful with URLs | |
| # First, let's protect URLs by temporarily replacing them | |
| url_pattern = re.compile(r"(https?://[^\s\)]+)") | |
| urls: list[str] = [] | |
| def protect_url(match: re.Match[str]) -> str: | |
| """Temporarily replace URLs with placeholders.""" | |
| urls.append(match.group(0)) | |
| return f"\x00URL_{len(urls) - 1}\x00" | |
| content = url_pattern.sub(protect_url, content) | |
| # Now apply the period spacing fix | |
| content = _MISSING_SPACE_AFTER_PERIOD_PATTERN.sub(fix_period_spacing, content) | |
| # Restore URLs | |
| for i, url in enumerate(urls): | |
| content = content.replace(f"\x00URL_{i}\x00", url) | |
| # --------------------------------------------------------------------- | |
| # Step 2: Add missing space after comma | |
| # --------------------------------------------------------------------- | |
| # Pattern: "first,second" -> "first, second" | |
| # Be careful with numbers like "1,000" - don't add space there | |
| # --------------------------------------------------------------------- | |
| def fix_comma_spacing(match: re.Match[str]) -> str: | |
| """Add space after comma if missing before letter.""" | |
| next_char = match.group(1) | |
| return f", {next_char}" | |
| content = _MISSING_SPACE_AFTER_COMMA_PATTERN.sub(fix_comma_spacing, content) | |
| # --------------------------------------------------------------------- | |
| # Restore code blocks | |
| # --------------------------------------------------------------------- | |
| content = self._restore_code_blocks(content, code_blocks) | |
| return content | |
| # ------------------------------------------------------------------------- | |
| # Private Methods - Internal Helpers | |
| # ------------------------------------------------------------------------- | |
| def _normalize_line_endings(self, content: str) -> str: | |
| """Normalize all line endings to Unix-style LF. | |
| Converts: | |
| - CRLF (Windows) -> LF | |
| - CR (old Mac) -> LF | |
| Args: | |
| ---- | |
| content: Content with potentially mixed line endings. | |
| Returns: | |
| ------- | |
| Content with consistent LF line endings. | |
| """ | |
| # First convert CRLF to LF | |
| content = _WINDOWS_LINE_ENDING_PATTERN.sub("\n", content) | |
| # Then convert any remaining CR to LF | |
| content = _MAC_LINE_ENDING_PATTERN.sub("\n", content) | |
| return content | |
| def _extract_code_blocks(self, content: str) -> tuple[str, list[str]]: | |
| """Extract code blocks and replace with placeholders. | |
| This preserves code block content during other processing steps. | |
| Code blocks are replaced with unique placeholders that won't be | |
| affected by other normalization steps. | |
| Args: | |
| ---- | |
| content: Markdown content with code blocks. | |
| Returns: | |
| ------- | |
| Tuple of (content_with_placeholders, list_of_code_blocks). | |
| The code blocks list contains the full code block text | |
| including fences. | |
| """ | |
| code_blocks: list[str] = [] | |
| placeholder_template = "\x00CODE_BLOCK_{}\x00" | |
| def replace_code_block(match: re.Match[str]) -> str: | |
| """Replace code block with placeholder.""" | |
| full_match = match.group(0) | |
| index = len(code_blocks) | |
| code_blocks.append(full_match) | |
| return placeholder_template.format(index) | |
| content = _CODE_BLOCK_PATTERN.sub(replace_code_block, content) | |
| return content, code_blocks | |
| def _restore_code_blocks( | |
| self, | |
| content: str, | |
| code_blocks: list[str], | |
| ) -> str: | |
| """Restore code blocks from placeholders. | |
| Args: | |
| ---- | |
| content: Content with code block placeholders. | |
| code_blocks: List of original code block text. | |
| Returns: | |
| ------- | |
| Content with code blocks restored. | |
| """ | |
| for i, code_block in enumerate(code_blocks): | |
| placeholder = f"\x00CODE_BLOCK_{i}\x00" | |
| content = content.replace(placeholder, code_block) | |
| return content | |
    def _normalize_single_table(self, table_text: str) -> str:
        """Normalize a single markdown table.

        Parses the table into rows, regenerates the separator row with
        the header's column count, and pads/truncates every data row to
        that same column count.

        Args:
        ----
        table_text: The raw table text to normalize.

        Returns:
        -------
        Normalized table text, or ``table_text`` unchanged when it has
        too few lines or contains no separator row at all.

        Note:
        ----
        If multiple separator rows are present, only the LAST one is
        regenerated as a separator (the index is overwritten on each
        match); earlier ones were stored as empty placeholder rows and
        are emitted as blank data rows.
        """
        lines = table_text.strip().split("\n")
        if len(lines) < _MIN_TABLE_LINES:
            # Not a valid table - return as-is
            return table_text
        # -------------------------------------------------------------------------
        # Parse table structure
        # -------------------------------------------------------------------------
        rows: list[list[str]] = []
        separator_row_idx: int | None = None  # Index in the rows list
        for raw_line in lines:
            stripped_line = raw_line.strip()
            if not stripped_line:
                continue
            # Check if this is a separator row
            if _TABLE_SEPARATOR_PATTERN.match(stripped_line):
                separator_row_idx = len(rows)  # Track index in rows list
                rows.append([])  # Placeholder for separator
            else:
                # Parse cells from the row
                # Remove leading and trailing |
                cell_content = stripped_line
                if cell_content.startswith("|"):
                    cell_content = cell_content[1:]
                if cell_content.endswith("|"):
                    cell_content = cell_content[:-1]
                cells = [cell.strip() for cell in cell_content.split("|")]
                rows.append(cells)
        if not rows or separator_row_idx is None:
            # No valid table structure found
            return table_text
        # -------------------------------------------------------------------------
        # Determine column count (from header row)
        # -------------------------------------------------------------------------
        # NOTE(review): if the separator is the FIRST line, rows[0] is the
        # empty placeholder and col_count falls back to 1 - confirm intended.
        header_row = rows[0] if rows else []
        col_count = len(header_row) if header_row else 1
        # -------------------------------------------------------------------------
        # Normalize all rows to have the same column count
        # -------------------------------------------------------------------------
        normalized_rows: list[str] = []
        for idx, row_cells in enumerate(rows):
            if idx == separator_row_idx:
                # Generate separator row with correct column count
                sep_row = "| " + " | ".join(["---"] * col_count) + " |"
                normalized_rows.append(sep_row)
            else:
                # Normalize data row
                # Make a copy to avoid modifying the original
                normalized_cells = list(row_cells)
                # Pad with empty cells if needed
                while len(normalized_cells) < col_count:
                    normalized_cells.append("")
                # Truncate if too many cells
                normalized_cells = normalized_cells[:col_count]
                # Format the row
                formatted_row = "| " + " | ".join(normalized_cells) + " |"
                normalized_rows.append(formatted_row)
        return "\n".join(normalized_rows)
| def _fix_incomplete_tables(self, content: str) -> str: | |
| """Fix tables that might be missing separator rows. | |
| This method looks for consecutive lines starting with | that | |
| look like tables but don't have proper separator rows. | |
| Args: | |
| ---- | |
| content: Markdown content. | |
| Returns: | |
| ------- | |
| Content with fixed tables. | |
| """ | |
| lines = content.split("\n") | |
| result_lines: list[str] = [] | |
| i = 0 | |
| while i < len(lines): | |
| line = lines[i].strip() | |
| # Check if this looks like a table header row (but not a separator row) | |
| if ( | |
| line.startswith("|") | |
| and line.endswith("|") | |
| and not _TABLE_SEPARATOR_PATTERN.match(line) | |
| ): | |
| # Count cells in this row | |
| cells = [c.strip() for c in line[1:-1].split("|")] | |
| cell_count = len(cells) | |
| # Look at next line | |
| if i + 1 < len(lines): | |
| next_line = lines[i + 1].strip() | |
| # Check if next line is a separator | |
| if _TABLE_SEPARATOR_PATTERN.match(next_line): | |
| # Table already has separator - keep as-is | |
| result_lines.append(lines[i]) | |
| i += 1 | |
| continue | |
| # Check if next line is another table row (missing separator) | |
| if ( | |
| next_line.startswith("|") | |
| and next_line.endswith("|") | |
| and not _TABLE_SEPARATOR_PATTERN.match(next_line) | |
| ): | |
| # Add header row | |
| result_lines.append(lines[i]) | |
| # Add separator row | |
| separator = "| " + " | ".join(["---"] * cell_count) + " |" | |
| result_lines.append(separator) | |
| i += 1 | |
| continue | |
| result_lines.append(lines[i]) | |
| i += 1 | |
| return "\n".join(result_lines) | |
    def _fix_fragmented_words(self, content: str) -> str:
        """Fix words that were incorrectly split by OCR.

        Scans the content token-by-token (split on single spaces) looking
        for sequences of short alphabetic fragments that, when joined,
        form a known thermal comfort term. Unlike regex-based approaches,
        this method is more targeted and avoids incorrectly matching
        valid short words.

        Args:
        ----
        content: Content with potential word fragmentation.

        Returns:
        -------
        Content with fragmented words merged where appropriate.

        Note:
        ----
        Splitting on " " keeps newlines inside tokens; such tokens fail
        the ``isalpha`` check and pass through unchanged, so merging
        never crosses line boundaries.
        """
        # Common short valid English words to NOT treat as fragments
        common_short_words = frozenset(
            {
                "the",
                "and",
                "for",
                "are",
                "was",
                "has",
                "had",
                "not",
                "but",
                "can",
                "all",
                "her",
                "his",
                "its",
                "may",
                "new",
                "now",
                "old",
                "one",
                "our",
                "out",
                "own",
                "say",
                "she",
                "two",
                "use",
                "way",
                "who",
                "you",
                "how",
                "man",
                "get",
                "see",
                "set",
                "met",
                "air",
            }
        )
        words = content.split(" ")
        result: list[str] = []
        i = 0
        while i < len(words):
            word = words[i]
            # Check if this could be the start of a fragmented word
            # Skip if it's a common valid word or has punctuation
            stripped_word = word.strip().lower()
            # Only consider short words as potential fragments
            if (
                _MIN_FRAGMENT_LEN <= len(stripped_word) <= _MAX_FRAGMENT_LEN
                and stripped_word.isalpha()
                and stripped_word not in common_short_words
            ):
                # Try combining with next 1-4 words
                found_match = False
                for num_extra in range(1, 5):
                    if i + num_extra >= len(words):
                        break
                    # Build the combined word from fragments
                    fragment_words = words[i : i + num_extra + 1]
                    combined = "".join(w.strip() for w in fragment_words).lower()
                    # Check if all fragments are short and alphabetic
                    # (the first word was already validated above)
                    all_valid_fragments = all(
                        _MIN_FRAGMENT_LEN <= len(w.strip()) <= _MAX_FRAGMENT_LEN
                        and w.strip().isalpha()
                        and w.strip().lower() not in common_short_words
                        for w in fragment_words[1:]  # Check all but first
                    )
                    if not all_valid_fragments:
                        continue
                    # Check if combined word is in our dictionary
                    if combined in _THERMAL_COMFORT_TERMS:
                        # Preserve original case if first char was uppercase
                        if words[i] and words[i][0].isupper():
                            result.append(combined.capitalize())
                        else:
                            result.append(combined)
                        # Skip past every token consumed by the merge
                        i += num_extra + 1
                        found_match = True
                        break
                # No valid combination found - append word and move on
                if not found_match:
                    result.append(word)
                    i += 1
            else:
                result.append(word)
                i += 1
        return " ".join(result)
| # ------------------------------------------------------------------------- | |
| # Public Methods - Content Quality Detection | |
| # ------------------------------------------------------------------------- | |
| def detect_jumbled_content(self, content: str) -> list[str]: | |
| r"""Detect potentially jumbled sentences from multi-column extraction. | |
| This method analyzes content to identify patterns that may indicate | |
| jumbled text from incorrect multi-column PDF extraction. When PDFs | |
| with multiple columns are extracted incorrectly, text from different | |
| columns can get mixed together, producing nonsensical sentences. | |
| Common indicators of jumbled multi-column extraction: | |
| 1. Abrupt topic changes within a sentence | |
| 2. Repeated similar sentence starters close together | |
| 3. Incomplete sentences followed by unrelated text | |
| 4. Technical terms from different contexts mixed together | |
| This method uses heuristics to flag potentially problematic content | |
| for manual review. It does NOT attempt to fix jumbled content, as | |
| automatic repair is error-prone and could introduce new errors. | |
| Args: | |
| ---- | |
| content : str | |
| Markdown content to analyze for jumbled text patterns. | |
| Returns: | |
| ------- | |
| list[str] | |
| List of suspicious patterns or text snippets that may | |
| indicate jumbled content. Empty list if no issues detected. | |
| Each item includes context about why it was flagged. | |
| Example: | |
| ------- | |
| >>> converter = MarkdownConverter() | |
| >>> content = ''' | |
| ... The PMV model is The SET model is designed for predicting | |
| ... thermal comfort in controlled environments based on clothing. | |
| ... ''' | |
| >>> issues = converter.detect_jumbled_content(content) | |
| >>> for issue in issues: | |
| ... print(f"Potential issue: {issue}") | |
| Potential issue: Repeated sentence starter 'The ... is The ... is' | |
| Note: | |
| ---- | |
| This method is designed for awareness and logging purposes. | |
| False positives are possible, especially with legitimate | |
| repetitive content. Manual review is recommended for flagged | |
| content. | |
| The method preserves code blocks - content inside fenced code | |
| blocks is not analyzed for jumbling. | |
| """ | |
| if not content: | |
| return [] | |
| issues: list[str] = [] | |
| # --------------------------------------------------------------------- | |
| # Extract and preserve code blocks before analysis | |
| # --------------------------------------------------------------------- | |
| content_to_analyze, _ = self._extract_code_blocks(content) | |
| # --------------------------------------------------------------------- | |
| # Pattern 1: Repeated sentence starters close together | |
| # --------------------------------------------------------------------- | |
| # Look for patterns like "The X is The Y is" which indicate | |
| # text from adjacent columns getting mixed | |
| # --------------------------------------------------------------------- | |
| repeated_starter_pattern = re.compile( | |
| r"\b(The|A|An|This|That|These|Those|It|There)\s+" | |
| r"(\w+(?:\s+\w+){0,3})\s+" | |
| r"(is|are|was|were|has|have|can|will|would|should)\s+" | |
| r"\1\s+", | |
| re.IGNORECASE, | |
| ) | |
| for match in repeated_starter_pattern.finditer(content_to_analyze): | |
| start_idx = max(0, match.start() - 20) | |
| end_idx = min(len(content_to_analyze), match.end() + 20) | |
| snippet = content_to_analyze[start_idx:end_idx] | |
| issues.append( | |
| f"Repeated sentence starter pattern: '{match.group(0).strip()}' " | |
| f"(context: ...{snippet.strip()}...)" | |
| ) | |
| # --------------------------------------------------------------------- | |
| # Pattern 2: Sentence fragments - period followed by lowercase | |
| # (excluding common abbreviations) | |
| # --------------------------------------------------------------------- | |
| # This can indicate mid-sentence breaks from column mixing | |
| # --------------------------------------------------------------------- | |
| abbreviations = { | |
| "Dr.", | |
| "Mr.", | |
| "Mrs.", | |
| "Ms.", | |
| "Prof.", | |
| "vs.", | |
| "etc.", | |
| "e.g.", | |
| "i.e.", | |
| } | |
| fragment_pattern = re.compile(r"([A-Za-z]{3,})\.\s+([a-z]{2,})") | |
| for match in fragment_pattern.finditer(content_to_analyze): | |
| # Check if this looks like an abbreviation | |
| potential_abbrev = match.group(1) + "." | |
| if potential_abbrev not in abbreviations: | |
| # Check if the following word could reasonably start a sentence | |
| following_word = match.group(2) | |
| # Common sentence-starting words in lowercase is suspicious | |
| if following_word in { | |
| "the", | |
| "a", | |
| "an", | |
| "this", | |
| "that", | |
| "these", | |
| "it", | |
| "there", | |
| "we", | |
| "they", | |
| }: | |
| snippet = content_to_analyze[ | |
| max(0, match.start() - 10) : min( | |
| len(content_to_analyze), match.end() + 30 | |
| ) | |
| ] | |
| issues.append( | |
| f"Possible sentence fragment (period followed by lowercase): " | |
| f"...{snippet.strip()}..." | |
| ) | |
| # --------------------------------------------------------------------- | |
| # Pattern 3: Very short sentences between longer content | |
| # --------------------------------------------------------------------- | |
| # Single-word or very short sentences between substantive content | |
| # can indicate column boundary artifacts | |
| # --------------------------------------------------------------------- | |
| sentences = re.split(r"(?<=[.!?])\s+", content_to_analyze) | |
| for i, sentence in enumerate(sentences): | |
| words_in_sentence = len(sentence.split()) | |
| if 1 <= words_in_sentence <= _MAX_SHORT_SENTENCE_WORDS: | |
| # Check if surrounded by longer sentences | |
| prev_len = len(sentences[i - 1].split()) if i > 0 else 0 | |
| next_len = ( | |
| len(sentences[i + 1].split()) if i < len(sentences) - 1 else 0 | |
| ) | |
| if ( | |
| prev_len > _MIN_LONG_SENTENCE_WORDS | |
| and next_len > _MIN_LONG_SENTENCE_WORDS | |
| ): | |
| issues.append( | |
| f"Isolated short sentence between longer content: " | |
| f"'{sentence.strip()}'" | |
| ) | |
| # --------------------------------------------------------------------- | |
| # Pattern 4: Check for thermal comfort domain-specific mixing | |
| # --------------------------------------------------------------------- | |
| # Look for incompatible thermal comfort terms appearing together | |
| # in ways that suggest column mixing | |
| # --------------------------------------------------------------------- | |
| model_pairs = [ | |
| ("PMV", "SET"), | |
| ("PMV", "PET"), | |
| ("SET", "UTCI"), | |
| ("adaptive", "steady-state"), | |
| ] | |
| for term1, term2 in model_pairs: | |
| # Look for both terms appearing very close together in a suspicious way | |
| pattern = re.compile( | |
| rf"\b{term1}\b[^.!?]{{0,30}}\b{term2}\b[^.!?]{{0,30}}\b{term1}\b", | |
| re.IGNORECASE, | |
| ) | |
| for match in pattern.finditer(content_to_analyze): | |
| issues.append( | |
| f"Possible model name mixing ({term1}/{term2}): " | |
| f"'{match.group(0).strip()}'" | |
| ) | |
| return issues | |