"""Post-processing functions and regex patterns for markdown cleanup.""" import re _STANDALONE_DATE = re.compile( r"^\s*(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+" r"(?:January|February|March|April|May|June|July|August|September|" r"October|November|December)\s+\d{1,2},\s+\d{4}\s*$", re.MULTILINE, ) _STANDALONE_TIME = re.compile(r"^\s*\d{1,2}:\d{2}\s*(?:AM|PM)\s*$", re.MULTILINE) _PAGE_FOOTER = re.compile( r"^\s*\d{1,3}\s*\|?\s*\d{2,5}\s+\w.*(?:Rd|St|Ave|Blvd|Dr|Ln|Way|Ct)\b.*\d{5}.*$", re.MULTILINE, ) _STANDALONE_PAGE_NUM = re.compile(r"^\s*\d{1,3}\s*$", re.MULTILINE) _BRANDING_FOOTER = re.compile(r"^\s*[A-Za-z][^|]{5,}\|[^|]+\|?\s*\d{1,3}\s*$", re.MULTILINE) _SHORT_LOCATION_LINE = re.compile(r"^\s*[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*,\s*[A-Z]{2}\s*$", re.MULTILINE) _NUMBERED_SECTION = re.compile(r"^(\d{1,2})\.\s+([A-Z][A-Z\s\-/&,]+(?:\.\s*)?)") _EMPTY_TABLE_ROW = re.compile(r"^\|(?:\s*\|)+\s*$", re.MULTILINE) _TRAILING_EMPTY_CELLS = re.compile(r"(?:\s*\|\s*){2,}\s*$") _TABLE_SEP_ROW = re.compile(r"^\|[\s\-:]+(?:\|[\s\-:]+)+\|?\s*$") _LATEX_MATHRM = re.compile(r"\$\s*\\mathrm\{([^}]*)\}\s*\$") _LATEX_SUPERSCRIPT = re.compile(r"\$\s*\^?\{([^}]*)\}\s*\$") _LATEX_SUBSCRIPT = re.compile(r"\$\s*_\{([^}]*)\}\s*\$") _LATEX_PLUSMINUS = re.compile(r"\$\s*\\pm\s*([^$]*?)\s*\$") _LATEX_INLINE = re.compile(r"\$\s*([^$]{1,60}?[\\^_{}][^$]{0,60}?)\s*\$") _LATEX_ESCAPED_CHARS = re.compile(r"\\([%$&_#])") _TINY_IMAGE_DIV = re.compile(r']*>\s*]*width="(\d+)%"[^>]*/>\s*', re.IGNORECASE) def _post_process_merged_markdown(content: str) -> str: """Post-process merged markdown to fix extraction artifacts.""" content = _strip_latex_artifacts(content) content = _remove_tiny_image_tags(content) content = _deduplicate_headings(content) content = _deduplicate_short_blocks(content) content = _remove_page_boundary_artifacts(content) content = _normalize_numbered_headings(content) content = _clean_table_artifacts(content) content = _merge_split_tables(content) content = re.sub(r"\n{4,}", "\n\n\n", content) return content.strip() def _strip_latex_artifacts(content: str) -> str: def _clean_mathrm(match: re.Match) -> str: return match.group(1).replace("~", "").strip() content = _LATEX_MATHRM.sub(_clean_mathrm, content) content = _LATEX_PLUSMINUS.sub(lambda m: "±" + m.group(1), content) content = _LATEX_SUPERSCRIPT.sub(lambda m: m.group(1), content) content = _LATEX_SUBSCRIPT.sub(lambda m: m.group(1), content) content = _LATEX_ESCAPED_CHARS.sub(lambda m: m.group(1), content) content = _LATEX_INLINE.sub(lambda m: m.group(1).strip(), content) return content def _remove_tiny_image_tags(content: str) -> str: def _check_size(match: re.Match) -> str: return "" if int(match.group(1)) <= 10 else match.group(0) return _TINY_IMAGE_DIV.sub(_check_size, content) def _deduplicate_headings(content: str) -> str: lines = content.split("\n") seen_headings: set[str] = set() result: list[str] = [] for line in lines: stripped = line.strip() if stripped.startswith("#"): key = stripped.lstrip("#").strip().lower() if key and key in seen_headings: continue if key: seen_headings.add(key) result.append(line) return "\n".join(result) def _deduplicate_short_blocks(content: str) -> str: blocks = content.split("\n\n") seen: set[str] = set() result: list[str] = [] for block in blocks: stripped = block.strip() if not stripped: result.append(block) continue is_table = stripped.startswith("|") and "|" in stripped[1:] is_heading = stripped.startswith("#") if is_table or is_heading or len(stripped) > 120: result.append(block) continue key = stripped.lower() if key in seen: continue seen.add(key) result.append(block) return "\n\n".join(result) def _remove_page_boundary_artifacts(content: str) -> str: content = _STANDALONE_DATE.sub("", content) content = _STANDALONE_TIME.sub("", content) content = _PAGE_FOOTER.sub("", content) content = _STANDALONE_PAGE_NUM.sub("", content) content = _remove_repeated_lines(content, _BRANDING_FOOTER, min_repeats=3) content = _remove_repeated_lines(content, _SHORT_LOCATION_LINE, min_repeats=3) return content def _remove_repeated_lines(content: str, pattern: re.Pattern, min_repeats: int = 3) -> str: counts: dict[str, int] = {} for match in pattern.finditer(content): key = match.group(0).strip().lower() counts[key] = counts.get(key, 0) + 1 repeated = {k for k, v in counts.items() if v >= min_repeats} if not repeated: return content result: list[str] = [] for line in content.split("\n"): if line.strip().lower() in repeated: continue result.append(line) return "\n".join(result) def _normalize_numbered_headings(content: str) -> str: lines = content.split("\n") result: list[str] = [] sections_with_heading: set[int] = set() sections_without_heading: set[int] = set() for line in lines: stripped = line.strip() heading_match = re.match(r"^#{1,3}\s+(\d{1,2})\.\s+[A-Z]", stripped) if heading_match: sections_with_heading.add(int(heading_match.group(1))) continue plain_match = _NUMBERED_SECTION.match(stripped) if plain_match: sections_without_heading.add(int(plain_match.group(1))) if sections_with_heading and sections_without_heading: for line in lines: stripped = line.strip() plain_match = _NUMBERED_SECTION.match(stripped) if plain_match: title_end = plain_match.end() title = stripped[:title_end].rstrip(".") body = stripped[title_end:].strip() result.append(f"## {title}") if body: result.append(body) continue result.append(line) return "\n".join(result) return content def _clean_table_artifacts(content: str) -> str: lines = content.split("\n") result: list[str] = [] for line in lines: stripped = line.strip() if _EMPTY_TABLE_ROW.match(stripped): continue if stripped.startswith("|") and "|" in stripped[1:] and not _TABLE_SEP_ROW.match(stripped): result.append(_TRAILING_EMPTY_CELLS.sub(" |", stripped)) continue result.append(line) return "\n".join(result) def _is_table_line(line: str) -> bool: s = line.strip() return bool(s.startswith("|") and s.endswith("|") and s.count("|") >= 3) def _count_columns(line: str) -> int: s = line.strip() if not s.startswith("|"): return 0 return max(0, len(s.split("|")) - 2) def _merge_split_tables(content: str) -> str: lines = content.split("\n") result: list[str] = [] i = 0 while i < len(lines): result.append(lines[i]) i += 1 if not _is_table_line(result[-1]): continue last_table_cols = _count_columns(result[-1]) if last_table_cols < 2: continue j = i while j < len(lines) and lines[j].strip() == "": j += 1 if j >= len(lines) or not _is_table_line(lines[j]): continue next_table_cols = _count_columns(lines[j]) if next_table_cols < 2: continue ratio = min(last_table_cols, next_table_cols) / max(last_table_cols, next_table_cols) if ratio < 0.7: continue has_new_header = False for k in range(j + 1, min(j + 3, len(lines))): if _TABLE_SEP_ROW.match(lines[k].strip()): has_new_header = True break if has_new_header: skip_to = j while skip_to < len(lines): if _TABLE_SEP_ROW.match(lines[skip_to].strip()): skip_to += 1 break skip_to += 1 i = skip_to else: i = j return "\n".join(result)