"""Post-processing functions and regex patterns for markdown cleanup."""
import re
# --- Page-boundary artifacts -------------------------------------------------
# A line that contains nothing but a long-form date such as
# "Monday, January 5, 2024" (full weekday and month names, case-sensitive).
_STANDALONE_DATE = re.compile(
r"^\s*(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+"
r"(?:January|February|March|April|May|June|July|August|September|"
r"October|November|December)\s+\d{1,2},\s+\d{4}\s*$",
re.MULTILINE,
)
# A line that contains nothing but a 12-hour clock time such as "9:41 AM".
_STANDALONE_TIME = re.compile(r"^\s*\d{1,2}:\d{2}\s*(?:AM|PM)\s*$", re.MULTILINE)
# A line starting with a 1-3 digit number (optionally followed by "|"), then a
# 2-5 digit number, text containing a street suffix (Rd, St, Ave, ...) and,
# later on the line, a 5-digit number.
# NOTE(review): presumably "page number | street address ... ZIP" — confirm.
_PAGE_FOOTER = re.compile(
r"^\s*\d{1,3}\s*\|?\s*\d{2,5}\s+\w.*(?:Rd|St|Ave|Blvd|Dr|Ln|Way|Ct)\b.*\d{5}.*$",
re.MULTILINE,
)
# A line that contains nothing but a 1-3 digit number.
_STANDALONE_PAGE_NUM = re.compile(r"^\s*\d{1,3}\s*$", re.MULTILINE)
# A line of the form "<text> | <text> [|] <1-3 digit number>", where the first
# segment starts with a letter and is at least six characters long.
# NOTE(review): the trailing number is part of the match, so lines that differ
# only in that number count as different lines in _remove_repeated_lines —
# confirm this is intended.
_BRANDING_FOOTER = re.compile(r"^\s*[A-Za-z][^|]{5,}\|[^|]+\|?\s*\d{1,3}\s*$", re.MULTILINE)
# A line of capitalized words followed by a comma and two uppercase letters,
# e.g. "Salt Lake City, UT".
_SHORT_LOCATION_LINE = re.compile(r"^\s*[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*,\s*[A-Z]{2}\s*$", re.MULTILINE)
# --- Headings ------------------------------------------------------------------
# "N. UPPERCASE TITLE" at the start of the string (no MULTILINE flag): group 1
# is the 1-2 digit section number, group 2 the run of uppercase letters,
# whitespace and "-/&," plus an optional trailing period and whitespace.
_NUMBERED_SECTION = re.compile(r"^(\d{1,2})\.\s+([A-Z][A-Z\s\-/&,]+(?:\.\s*)?)")
# --- Tables ----------------------------------------------------------------------
# A row made up only of pipes and whitespace, e.g. "| | |".
_EMPTY_TABLE_ROW = re.compile(r"^\|(?:\s*\|)+\s*$", re.MULTILINE)
# Two or more pipes, separated only by whitespace, at the end of the string.
_TRAILING_EMPTY_CELLS = re.compile(r"(?:\s*\|\s*){2,}\s*$")
# A row whose cells contain only whitespace, dashes and colons, e.g.
# "|---|:--:|". Whitespace-only cells satisfy this pattern as well.
_TABLE_SEP_ROW = re.compile(r"^\|[\s\-:]+(?:\|[\s\-:]+)+\|?\s*$")
# --- LaTeX remnants --------------------------------------------------------------
# "$\mathrm{...}$"; group 1 is the text inside the braces.
_LATEX_MATHRM = re.compile(r"\$\s*\\mathrm\{([^}]*)\}\s*\$")
# "$^{...}$"; the caret is optional, so "${...}$" matches too.
_LATEX_SUPERSCRIPT = re.compile(r"\$\s*\^?\{([^}]*)\}\s*\$")
# "$_{...}$"; group 1 is the text inside the braces.
_LATEX_SUBSCRIPT = re.compile(r"\$\s*_\{([^}]*)\}\s*\$")
# "$\pm ...$"; group 1 is whatever follows "\pm" up to the closing "$".
_LATEX_PLUSMINUS = re.compile(r"\$\s*\\pm\s*([^$]*?)\s*\$")
# A short "$...$" span that contains at least one of: backslash, "^", "_",
# "{", "}". Group 1 is the span without the surrounding dollars/whitespace.
_LATEX_INLINE = re.compile(r"\$\s*([^$]{1,60}?[\\^_{}][^$]{0,60}?)\s*\$")
# A backslash-escaped "%", "$", "&", "_" or "#"; group 1 is the bare character.
_LATEX_ESCAPED_CHARS = re.compile(r"\\([%$&_#])")
_TINY_IMAGE_DIV = re.compile(r'
]*>\s*
![]()
]*width="(\d+)%"[^>]*/>\s*
', re.IGNORECASE)
def _post_process_merged_markdown(content: str) -> str:
    """Run every cleanup pass over merged markdown and return the tidied text.

    The passes are applied one after another in the order listed below, each
    receiving the output of the previous one. Afterwards any run of four or
    more newlines is collapsed to three and the result is stripped of leading
    and trailing whitespace.
    """
    cleanup_passes = (
        _strip_latex_artifacts,
        _remove_tiny_image_tags,
        _deduplicate_headings,
        _deduplicate_short_blocks,
        _remove_page_boundary_artifacts,
        _normalize_numbered_headings,
        _clean_table_artifacts,
        _merge_split_tables,
    )
    for cleanup in cleanup_passes:
        content = cleanup(content)

    # Earlier passes delete whole lines, which can leave long blank runs.
    collapsed = re.sub(r"\n{4,}", "\n\n\n", content)
    return collapsed.strip()
def _strip_latex_artifacts(content: str) -> str:
    """Replace small LaTeX fragments with their plain-text content.

    Substitutions run in a fixed order: ``\\mathrm`` wrappers (with ``~``
    removed), plus-minus spans (rewritten with a leading "±"), superscript and
    subscript wrappers, backslash-escaped characters, and finally any remaining
    short inline ``$...$`` span.
    """

    def _first_group(found: re.Match) -> str:
        # Keep only the captured text, dropping the surrounding markup.
        return found.group(1)

    text = _LATEX_MATHRM.sub(
        lambda found: found.group(1).replace("~", "").strip(), content
    )
    text = _LATEX_PLUSMINUS.sub(lambda found: "±" + found.group(1), text)
    for wrapper in (_LATEX_SUPERSCRIPT, _LATEX_SUBSCRIPT, _LATEX_ESCAPED_CHARS):
        text = wrapper.sub(_first_group, text)
    return _LATEX_INLINE.sub(lambda found: found.group(1).strip(), text)
def _remove_tiny_image_tags(content: str) -> str:
    """Delete ``_TINY_IMAGE_DIV`` matches whose captured width is 10 or less.

    Matches with a larger captured width are left exactly as they were.
    """
    widest_removed = 10

    def _keep_or_drop(found: re.Match) -> str:
        width_percent = int(found.group(1))
        if width_percent > widest_removed:
            return found.group(0)
        return ""

    return _TINY_IMAGE_DIV.sub(_keep_or_drop, content)
def _deduplicate_headings(content: str) -> str:
lines = content.split("\n")
seen_headings: set[str] = set()
result: list[str] = []
for line in lines:
stripped = line.strip()
if stripped.startswith("#"):
key = stripped.lstrip("#").strip().lower()
if key and key in seen_headings:
continue
if key:
seen_headings.add(key)
result.append(line)
return "\n".join(result)
def _deduplicate_short_blocks(content: str) -> str:
blocks = content.split("\n\n")
seen: set[str] = set()
result: list[str] = []
for block in blocks:
stripped = block.strip()
if not stripped:
result.append(block)
continue
is_table = stripped.startswith("|") and "|" in stripped[1:]
is_heading = stripped.startswith("#")
if is_table or is_heading or len(stripped) > 120:
result.append(block)
continue
key = stripped.lower()
if key in seen:
continue
seen.add(key)
result.append(block)
return "\n\n".join(result)
def _remove_page_boundary_artifacts(content: str) -> str:
    """Strip page-boundary artifacts matched by the module's line patterns.

    Every match of the date, time, page-footer and page-number patterns is
    replaced with an empty string. Branding-footer and location lines are
    removed only when the same line occurs at least three times.
    """
    always_removed = (
        _STANDALONE_DATE,
        _STANDALONE_TIME,
        _PAGE_FOOTER,
        _STANDALONE_PAGE_NUM,
    )
    for artifact in always_removed:
        content = artifact.sub("", content)

    removed_when_repeated = (_BRANDING_FOOTER, _SHORT_LOCATION_LINE)
    for artifact in removed_when_repeated:
        content = _remove_repeated_lines(content, artifact, min_repeats=3)
    return content
def _remove_repeated_lines(content: str, pattern: re.Pattern, min_repeats: int = 3) -> str:
counts: dict[str, int] = {}
for match in pattern.finditer(content):
key = match.group(0).strip().lower()
counts[key] = counts.get(key, 0) + 1
repeated = {k for k, v in counts.items() if v >= min_repeats}
if not repeated:
return content
result: list[str] = []
for line in content.split("\n"):
if line.strip().lower() in repeated:
continue
result.append(line)
return "\n".join(result)
def _split_numbered_section(stripped: str) -> "tuple[str, str] | None":
    """Split a plain ``N. TITLE body`` line into ``(title, body)``.

    Returns ``None`` when *stripped* does not match ``_NUMBERED_SECTION`` or
    when no title text is left once a partially matched word is given back to
    the body.
    """
    match = _NUMBERED_SECTION.match(stripped)
    if match is None:
        return None

    title_end = match.end()
    if title_end < len(stripped) and stripped[title_end].islower():
        # The title character class is greedy and accepts any capital letter,
        # so in "1. SCOPE The work" the match ends inside "The". Hand the
        # partially matched word back to the body.
        title_start = match.start(2)
        while title_end > title_start and not stripped[title_end - 1].isspace():
            title_end -= 1
        if title_end == title_start:
            # The only "title" word was mixed-case (e.g. "PDFs"): not a title.
            return None

    # The pattern may consume ". " after the title; trim whitespace first so
    # the trailing period is really removed.
    title = stripped[:title_end].rstrip().rstrip(".").rstrip()
    body = stripped[title_end:].strip()
    return title, body


def _normalize_numbered_headings(content: str) -> str:
    """Promote plain ``N. TITLE`` lines to ``## N. TITLE`` headings.

    The rewrite happens only when the document mixes both styles: at least one
    numbered section already written as a level 1-3 markdown heading and at
    least one written as a plain line. Text following the title on the same
    line is moved to its own line below the new heading. When the document
    uses only one style, *content* is returned unchanged.
    """
    lines = content.split("\n")

    has_markdown_heading = False
    has_plain_section = False
    for line in lines:
        stripped = line.strip()
        if re.match(r"^#{1,3}\s+(\d{1,2})\.\s+[A-Z]", stripped):
            has_markdown_heading = True
        elif _split_numbered_section(stripped) is not None:
            has_plain_section = True

    if not (has_markdown_heading and has_plain_section):
        return content

    result: list[str] = []
    for line in lines:
        parts = _split_numbered_section(line.strip())
        if parts is None:
            result.append(line)
            continue
        title, body = parts
        result.append(f"## {title}")
        if body:
            result.append(body)
    return "\n".join(result)
def _clean_table_artifacts(content: str) -> str:
    """Drop empty table rows and collapse trailing empty cells.

    Lines whose stripped text matches ``_EMPTY_TABLE_ROW`` are removed. Any
    other line that starts with ``|``, contains a second ``|`` and is not a
    separator row is emitted stripped, with a trailing run of two or more
    pipes replaced by a single `` |``. All remaining lines, separator rows
    included, pass through unchanged.
    """
    cleaned: list[str] = []
    for raw in content.split("\n"):
        row = raw.strip()
        if _EMPTY_TABLE_ROW.match(row):
            continue
        is_data_row = (
            row.startswith("|")
            and "|" in row[1:]
            and not _TABLE_SEP_ROW.match(row)
        )
        cleaned.append(_TRAILING_EMPTY_CELLS.sub(" |", row) if is_data_row else raw)
    return "\n".join(cleaned)
def _is_table_line(line: str) -> bool:
s = line.strip()
return bool(s.startswith("|") and s.endswith("|") and s.count("|") >= 3)
def _count_columns(line: str) -> int:
s = line.strip()
if not s.startswith("|"):
return 0
return max(0, len(s.split("|")) - 2)
def _merge_split_tables(content: str) -> str:
    """Join table fragments that are separated only by blank lines.

    After each emitted table row with at least two columns, the function looks
    past any blank lines. If the next non-blank line is also a table row with
    at least two columns and the smaller column count is at least 70% of the
    larger, the blank lines are dropped. If, in addition, one of the two lines
    after that row is a separator row, everything from that row through the
    first separator row is dropped as well, so the continuation's data rows
    follow the earlier rows directly.
    """
    lines = content.split("\n")
    total = len(lines)
    merged: list[str] = []
    pos = 0

    while pos < total:
        current = lines[pos]
        merged.append(current)
        pos += 1

        if not _is_table_line(current):
            continue
        prev_cols = _count_columns(current)
        if prev_cols < 2:
            continue

        # Find the next non-blank line after the row just emitted.
        nxt = pos
        while nxt < total and lines[nxt].strip() == "":
            nxt += 1
        if nxt >= total or not _is_table_line(lines[nxt]):
            continue

        next_cols = _count_columns(lines[nxt])
        if next_cols < 2:
            continue
        if min(prev_cols, next_cols) / max(prev_cols, next_cols) < 0.7:
            continue

        repeats_header = any(
            _TABLE_SEP_ROW.match(lines[k].strip())
            for k in range(nxt + 1, min(nxt + 3, total))
        )
        if not repeats_header:
            # Plain continuation: only the blank lines are skipped.
            pos = nxt
            continue

        # Skip up to and including the first separator row at or after nxt.
        resume = nxt
        while resume < total and not _TABLE_SEP_ROW.match(lines[resume].strip()):
            resume += 1
        pos = min(resume + 1, total)

    return "\n".join(merged)