import logging
import re
from collections import Counter
from pathlib import Path
|
|
# arXiv identifier stamp such as "arXiv:2401.01234", matched case-insensitively.
# The pattern ends after the 4-5 digit sequence number, so a trailing version
# suffix ("v2") is not part of the match.
_ARXIV_STAMP = re.compile(
    r"arXiv:\d{4}\.\d{4,5}",
    flags=re.IGNORECASE,
)
|
|
| |
| |
| |
# Marker lines that delimit a "picture text" region, for example
# "----- Start of picture text -----".  Matching is case-insensitive and
# accepts "start", "begin" or "end" as the leading keyword.
_PICTURE_TEXT_MARKER = re.compile(
    r"-{3,}\s*(start|begin|end)\s+of\s+picture\s+text\s*-{3,}",
    re.IGNORECASE,
)
# Opening marker only ("start" / "begin").
_PICTURE_TEXT_START = re.compile(
    r"-{3,}\s*(start|begin)\s+of\s+picture\s+text\s*-{3,}",
    re.IGNORECASE,
)
# Closing marker only ("end").
_PICTURE_TEXT_END = re.compile(
    r"-{3,}\s*end\s+of\s+picture\s+text\s*-{3,}",
    re.IGNORECASE,
)


def strip_picture_text(lines: list[str]) -> list[str]:
    """Drop every line that belongs to a picture-text region.

    A region opens on a line containing a start/begin marker and closes on
    the first line (possibly the same one) containing an end marker.  Marker
    lines are dropped in full, including any other content they carry, e.g.
    ``legend<br>----- End of picture text -----``.

    Notes on edge cases, all inherited from the scanning rule above:
      - A region that is never closed swallows the rest of ``lines``.
      - An end marker seen while no region is open is kept as ordinary text.

    Args:
        lines: The lines to filter, in document order.

    Returns:
        A new list with the surviving lines in their original order.
    """
    kept: list[str] = []
    inside = False
    for row in lines:
        if not inside and not _PICTURE_TEXT_START.search(row):
            # Ordinary line outside any region.
            kept.append(row)
            continue
        # The row either opens a region or lies within one, so it is dropped.
        # The region remains open unless this very row carries the end marker.
        inside = _PICTURE_TEXT_END.search(row) is None
    return kept
|
|
|
|
| |
| |
| |
# Translation table from ASCII digits to their Unicode superscript forms.
_SUPERSCRIPTS = str.maketrans("0123456789", "⁰¹²³⁴⁵⁶⁷⁸⁹")


def demarkdown(text: str) -> str:
    """Remove markdown emphasis and tidy up math-notation leftovers.

    Meant for paragraph text and figure captions; headings and the
    pattern-based line filters operate on raw blocks before this runs.

    The rewrites are applied strictly in this order (later steps rely on the
    output of earlier ones):
      1. ``**bold**`` -> ``bold``
      2. ``_emphasis_`` -> ``emphasis`` (any ``_..._`` pair, shortest match)
      3. every backtick is deleted
      4. ``5 . 7`` -> ``5.7`` (spacing left behind when ``_._`` was stripped)
      5. ``5 × 7`` -> ``5×7`` (spacing left behind when ``_×_`` was stripped)
      6. ``10[5]`` -> ``10⁵`` (bracketed digits right after a digit)
      7. ``10[-5]`` -> ``10⁻⁵`` (same, with a leading minus sign)
      8. ``(cid:123)`` glyph placeholders are deleted

    Args:
        text: The text to clean.

    Returns:
        The cleaned text.
    """

    def _raised(match: re.Match) -> str:
        # Exponent digits rendered as superscripts.
        return match.group(1).translate(_SUPERSCRIPTS)

    def _raised_negative(match: re.Match) -> str:
        # Negative exponent: superscript minus followed by superscript digits.
        return "⁻" + match.group(1).translate(_SUPERSCRIPTS)

    rewrites = (
        (r"\*\*(.+?)\*\*", r"\1"),
        (r"_(.+?)_", r"\1"),
        (r"`", ""),
        (r"(\d) \. (\d)", r"\1.\2"),
        (r"(\d) × (\d)", r"\1×\2"),
        (r"(?<=\d)\[(\d+)\]", _raised),
        (r"(?<=\d)\[-(\d+)\]", _raised_negative),
        (r"\(cid:\d+\)", ""),
    )
    cleaned = text
    for pattern, replacement in rewrites:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned
|
|
|
|
| |
| |
| |
# Words treated as unable to end a sentence: if one of them is the last
# alphabetic token of a paragraph, the paragraph is taken to continue.
_OPEN_FUNCTION_WORDS = frozenset({
    # coordinating conjunctions
    "and", "or", "but", "nor", "yet", "so",
    # articles
    "the", "a", "an",
    # prepositions, relatives and other connectives
    "in", "on", "at", "of", "to", "for", "with", "by", "from", "as", "into",
    "that", "which", "who", "whose", "where", "when",
    "between", "among", "including", "such", "both", "either",
    "than", "then", "also", "only", "even", "about", "over", "under",
})


def _prev_para_is_open(text: str) -> bool:
    """Tell whether a paragraph appears to stop in the middle of a sentence.

    The decision is word-based: the paragraph is "open" when its last
    alphabetic token (case-insensitive) is listed in ``_OPEN_FUNCTION_WORDS``.
    Two endings are rejected up front, because they are typical extraction
    artifacts rather than real sentence breaks: a trailing digit (line-number
    residue) and a trailing hyphen (word-break residue).

    Args:
        text: The paragraph text; trailing whitespace is ignored.

    Returns:
        True if the paragraph looks unfinished, otherwise False.  Text
        without any alphabetic token is never considered open.
    """
    tail = text.rstrip()
    if re.search(r'\d\s*$', tail) or tail.endswith('-'):
        return False
    words = re.findall(r'[a-zA-Z]+', tail)
    if not words:
        return False
    return words[-1].lower() in _OPEN_FUNCTION_WORDS
|
|
|
|
| |
| |
| |
# Content-stream size (in bytes) above which a page is treated as pathological.
_MAX_PAGE_STREAM_BYTES = 8_000_000


def _pathological_pages(doc) -> set[int]:
    """Return indices of pages whose raw content stream is enormous.

    Such pages (e.g. JWST images drawn as 563 k vector paths) cause
    pymupdf4llm's layout/table analysis to run for 60+ seconds regardless of
    ``graphics_limit``/``table_strategy``. Plain ``page.get_text()`` on the
    same pages takes < 1 s and recovers all text. Detected by content-stream
    size (threshold: _MAX_PAGE_STREAM_BYTES).

    The probe is best-effort: a page whose content stream cannot be read is
    treated as a normal page, and the failure is logged at debug level
    instead of being dropped silently.

    Args:
        doc: An iterable of page objects, each exposing ``read_contents()``
            that returns a sized bytes-like object.

    Returns:
        The zero-based indices of the oversized pages (possibly empty).
    """
    oversized: set[int] = set()
    for index, page in enumerate(doc):
        try:
            stream_size = len(page.read_contents())
        except Exception:
            # Deliberately non-fatal: an unreadable stream must not abort the
            # whole parse, but leave a trace for debugging.
            logging.getLogger(__name__).debug(
                "Could not read content stream of page %d; treating it as normal",
                index,
                exc_info=True,
            )
            continue
        if stream_size > _MAX_PAGE_STREAM_BYTES:
            oversized.add(index)
    return oversized
|
|
|
|
# --- Single-line page artifacts ---------------------------------------------
# Line starting with "doi:" or a doi.org URL (case-insensitive).
_DOI_STAMP = re.compile(r"^(doi:|https?://doi\.org)", re.IGNORECASE)
# Line made only of a number, optionally surrounded by dashes/whitespace.
_PAGE_NUM = re.compile(r"^[–—\-\s]*\d+[–—\-\s]*$")
# Line of at least five characters that starts with a capital letter and
# contains only capitals, whitespace and the listed punctuation (no lowercase
# letters, no digits).
_RUNNING_HEADER = re.compile(
    r"^[A-Z][A-Z\s\.\,\-\–\—\(\)\[\]\:\;\'\"]{4,}$"
)

# --- Block classification ---------------------------------------------------
# Minimum length (characters) for a block to stand as its own paragraph.
_MIN_PARA = 200
# Markdown heading of level 1-4; group 1 is the heading text.
_HEADING = re.compile(r"^#{1,4}\s+(.*)")
# Caption opener: "Figure 3:", "Fig. 2.", "Table 1 |", ...
_CAPTION = re.compile(
    r"^(figure|fig\.|table)\s*\d+\s*[.:|]",
    re.IGNORECASE,
)
# Block starting with "==>" (optionally inside bold markers).
_PICTURE = re.compile(r"^==>|^\*\*==>")
# Block starting with a pipe, i.e. a markdown table row.
_TABLE_ROW = re.compile(r"^\|")
# Heading text that opens a references / bibliography / acknowledgement part.
_REFERENCES = re.compile(
    r"^(references|bibliography|acknowledg)",
    re.IGNORECASE,
)
# Blockquote line that begins with a number followed by whitespace.
_AFFILIATION = re.compile(r"^>\s*\d+\s")
# Bullet item whose text is a dotted section number plus a title
# (e.g. "- 2.1 Data reduction"); group 1 is the numbered title.
_LIST_HEADING = re.compile(
    r"^[-*]\s+(?:[_*]{0,2})(\d+(?:\.\d+)+\.?\s+\S[^\n]{0,100})(?:[_*]{0,2})$"
)
# Heading text that is really page furniture: "<number> ... et al." or a
# line starting with Received / Accepted / Submitted / Published.
_HEADING_JUNK = re.compile(
    r"^\d{1,4}\s+\S.*\bet\s+al\.?\s*$"
    r"|^(Received|Accepted|Submitted|Published)\b",
    re.IGNORECASE,
)

# --- Author/affiliation block detection --------------------------------------
# A block counts as an author/affiliation block when its prose fraction is
# below this value ...
_AUTHOR_BLOCK_MAX_PROSE = 0.25
# ... and it contains at least this many commas.
_AUTHOR_BLOCK_MIN_COMMAS = 3
# The detection is only attempted while fewer than this many paragraphs have
# been accepted.
_AUTHOR_BLOCK_FRONT_LIMIT = 3
|
|
|
|
def _norm(line: str) -> str:
    """Build a comparison key for *line*.

    The line is trimmed and lowercased, then every run of digits is replaced
    by a single ``#`` so that lines differing only in numbers (for example
    page counters) share the same key.
    """
    folded = line.strip().lower()
    return re.sub(r"\d+", "#", folded)
|
|
|
|
def _prose_fraction(text: str) -> float:
    """Measure how "prose-like" a piece of text is.

    Bracket groups (``[...]``), emphasis characters (``_`` and ``*``) and
    digits are removed first.  The result is the share of the remaining
    alphabetic tokens that are entirely lowercase and at least three letters
    long.  Real prose runs ~0.6-0.8; author name lists ~0.0-0.1; affiliation
    lists ~0.1-0.2.

    Args:
        text: The text to score.

    Returns:
        A value in ``[0.0, 1.0]``; ``0.0`` when no alphabetic token remains.
    """
    stripped = text
    for noise in (r"\[[^\]]*\]", r"[_*]", r"\d+"):
        stripped = re.sub(noise, "", stripped)

    words = re.findall(r"[a-zA-Z]+", stripped)
    if not words:
        return 0.0

    prose_count = sum(1 for w in words if w.lower() == w and len(w) >= 3)
    return prose_count / len(words)
|
|
|
|
def _is_author_affiliation_block(text: str) -> bool:
    """Decide whether *text* looks like an author or affiliation listing.

    Both conditions must hold (the check is meant for front-matter scanning):
      - the text has at least ``_AUTHOR_BLOCK_MIN_COMMAS`` commas, and
      - its prose fraction is below ``_AUTHOR_BLOCK_MAX_PROSE``.

    The prose fraction is only computed when the comma test passes.
    """
    if text.count(",") < _AUTHOR_BLOCK_MIN_COMMAS:
        return False
    return _prose_fraction(text) < _AUTHOR_BLOCK_MAX_PROSE
|
|
|
|
def _clean_authors(raw: str) -> str:
    """Turn a raw author line into a short, readable author string.

    Processing steps:
      1. delete ``[...]`` groups, ``(...)`` groups and the emphasis
         characters ``_`` / ``*``;
      2. collapse whitespace runs to single spaces;
      3. split on commas and keep the pieces containing at least two letters;
      4. join the first three pieces with ``", "`` and append ``" et al."``
         when more than three pieces were kept.

    Args:
        raw: The unprocessed author line.

    Returns:
        The cleaned author string (empty if nothing usable remains).
    """
    text = raw
    for pattern in (r"\[[^\]]*\]", r"\([^)]*\)", r"[_*]"):
        text = re.sub(pattern, "", text)
    text = re.sub(r"\s+", " ", text).strip()

    names: list[str] = []
    for piece in text.split(","):
        candidate = piece.strip()
        # Pieces with fewer than two letters are affiliation indices or noise.
        if len(re.findall(r"[a-zA-Z]", candidate)) >= 2:
            names.append(candidate)

    head = ", ".join(names[:3])
    return head + " et al." if len(names) > 3 else head
|
|
|
|
def remove_repeating_lines(pages: list[list[str]]) -> list[list[str]]:
    """Remove boilerplate lines that recur across pages.

    Lines are compared through their ``_norm`` key (trimmed, lowercased,
    digit runs collapsed).  A key is counted at most once per page, and
    blank lines are never counted.  Every line whose key occurs on at least
    ``max(2, len(pages) // 2)`` pages is removed from all pages.

    Args:
        pages: One list of lines per page.

    Returns:
        New per-page lists with the repeating lines removed; the input is
        not modified.
    """
    page_hits: Counter = Counter()
    for page in pages:
        # A set, so that a key repeated within one page counts only once.
        page_hits.update({_norm(line) for line in page if line.strip()})

    cutoff = max(2, len(pages) // 2)
    boilerplate = {key for key, hits in page_hits.items() if hits >= cutoff}

    cleaned: list[list[str]] = []
    for page in pages:
        cleaned.append([line for line in page if _norm(line) not in boilerplate])
    return cleaned
|
|
|
|
def repair_hyphenation(text: str) -> str:
    """Re-join words that were split with a hyphen at a line break.

    Every ``-`` + newline that sits directly between two word characters is
    removed, so ``"inter-\\nnational"`` becomes ``"international"``.  Hyphens
    that are not followed immediately by a newline, or that are not flanked
    by word characters, are left untouched.

    The neighbours are checked with lookarounds rather than consumed, so
    consecutive breaks around a single-character fragment
    (``"x-\\ny-\\nz"``) are all repaired.

    Note that a genuinely hyphenated compound broken at the hyphen
    (``"well-\\nknown"``) is joined too; the two cases cannot be told apart
    from the text alone.

    Args:
        text: Text that may contain line-break hyphenation.

    Returns:
        The text with line-break hyphens removed.
    """
    return re.sub(r"(?<=\w)-\n(?=\w)", "", text)
|
|
|
|
def strip_page_artifacts(lines: list[str]) -> list[str]:
    """Filter out single-line page furniture.

    Blank lines are always kept.  A non-blank line is removed when any of
    the following holds:
      - its stripped form is a bare page number (``_PAGE_NUM``);
      - it contains an arXiv stamp anywhere (``_ARXIV_STAMP``);
      - its stripped form starts with a DOI stamp (``_DOI_STAMP``);
      - its stripped form is an all-caps running header (``_RUNNING_HEADER``).

    Args:
        lines: The lines of one page.

    Returns:
        A new list with the artifact lines removed, order preserved.
    """

    def _is_artifact(raw: str) -> bool:
        core = raw.strip()
        if not core:
            return False
        return bool(
            _PAGE_NUM.match(core)
            or _ARXIV_STAMP.search(raw)
            or _DOI_STAMP.match(core)
            or _RUNNING_HEADER.match(core)
        )

    return [raw for raw in lines if not _is_artifact(raw)]
|
|
|
|
def segment_markdown(md: str) -> tuple[list[dict], str]:
    """Segment markdown into paragraphs and capture raw references text.

    The markdown is split into blocks on blank lines.  Heading blocks update
    the current section and the chain of pending headings; other blocks are
    cleaned, filtered and either merged into the previous paragraph, emitted
    as a new paragraph, or parked until more text arrives.

    Each paragraph is a dict with the keys ``section``, ``firstOfSection``,
    ``text``, ``headings`` and ``id`` (``"p1"``, ``"p2"``, ... assigned at the
    end in output order).

    Returns (paras, raw_refs) where raw_refs is the text of the References
    section (empty string if absent), consumed by the reference parser.
    """
    paras: list[dict] = []
    # Text of the most recent accepted heading.
    section = ""
    # True from a heading until the first paragraph emitted after it.
    first_pending = True
    # Short text (below _MIN_PARA) waiting to be prepended to the next block.
    pending: list[str] = []
    # Headings seen since the last emitted paragraph, with their levels in a
    # parallel list.
    pending_headings: list[str] = []
    pending_heading_levels: list[int] = []
    raw_refs = ""
    # "normal": blocks are segmented.
    # "refs_capture": blocks are appended verbatim to raw_refs.
    # "refs_skip": blocks are discarded.
    _state = "normal"

    def _flush_pending_as_para() -> None:
        # Resolve whatever is parked in `pending`: emit it as a paragraph if
        # it is long enough, otherwise drop it.  The heading chain is cleared
        # in both cases.
        nonlocal first_pending
        if not pending:
            return
        text = " ".join(pending)
        pending.clear()
        headings_snap = list(pending_headings)
        pending_headings.clear()
        pending_heading_levels.clear()
        # NOTE(review): `pending` only ever receives a single string shorter
        # than _MIN_PARA (see the final `else` in the loop below, and the
        # unconditional prepend-and-clear before it), so this branch looks
        # unreachable and the flush effectively discards the text -- confirm
        # that this is intended.
        if len(text) >= _MIN_PARA:
            paras.append({
                "section": section,
                "firstOfSection": first_pending,
                "text": text,
                "headings": headings_snap,
            })
            first_pending = False

    for block in re.split(r"\n\s*\n", md):
        block = repair_hyphenation(block).strip()
        if not block:
            continue

        # --- Heading blocks ("# ..." or a numbered bullet such as "- 2.1 X") ---
        # NOTE(review): only the first line of the block takes part in the
        # heading match (`.` does not cross newlines); any further lines in
        # the same block are skipped by the `continue` statements below.
        m = _HEADING.match(block)
        lm = _LIST_HEADING.match(block) if not m else None
        if m or lm:
            name_raw = m.group(1).strip().strip("*") if m else lm.group(1).strip().strip("*")
            name = demarkdown(name_raw)
            # Level = number of leading '#'; list-style headings are fixed at 3.
            hlevel = len(re.match(r"^(#+)", block).group(1)) if m else 3

            # Page furniture that was rendered as a heading is ignored entirely.
            if _HEADING_JUNK.match(name):
                continue

            _flush_pending_as_para()

            # NOTE(review): `pending` is always empty right after the flush
            # above, so this condition is always true.
            if not pending:
                # Keep the chain entries that precede the first heading whose
                # level is >= the new heading's level; drop the rest.
                keep_up_to = next(
                    (i for i, lvl in enumerate(pending_heading_levels) if lvl >= hlevel),
                    len(pending_heading_levels),
                )
                del pending_headings[keep_up_to:]
                del pending_heading_levels[keep_up_to:]

            if _REFERENCES.match(name):
                # "References"/"Bibliography" start raw capture; any other
                # match of _REFERENCES (i.e. "Acknowledg...") starts skipping.
                _state = (
                    "refs_capture"
                    if re.match(r"^(references|bibliography)", name, re.I)
                    else "refs_skip"
                )
                section = name
                first_pending = True
                continue

            # Any other heading ends a capture/skip section.
            if _state in ("refs_capture", "refs_skip"):
                _state = "normal"

            section = name
            first_pending = True
            pending_headings.append(name)
            pending_heading_levels.append(hlevel)
            continue

        # --- Non-heading blocks inside a references/acknowledgement section ---
        if _state == "refs_capture":
            raw_refs += block + "\n\n"
            continue
        if _state == "refs_skip":
            continue

        # --- Regular content blocks ---
        # Drop affiliation lines, then picture-text regions.
        filtered_lines = strip_picture_text([
            l for l in block.splitlines() if not _AFFILIATION.match(l)
        ])
        block = "\n".join(filtered_lines).strip()
        if not block:
            continue

        # Collapse all whitespace to single spaces before stripping markdown.
        text = demarkdown(re.sub(r"\s+", " ", block))

        # Captions, picture placeholders and table rows are not paragraphs.
        if _CAPTION.match(text) or _PICTURE.match(text) or _TABLE_ROW.match(text):
            continue
        # Author/affiliation blocks are only filtered while fewer than
        # _AUTHOR_BLOCK_FRONT_LIMIT paragraphs have been emitted.
        if len(paras) < _AUTHOR_BLOCK_FRONT_LIMIT and _is_author_affiliation_block(text):
            continue

        # Prepend any parked short text to this block.
        if pending:
            text = " ".join(pending) + " " + text
            pending.clear()

        # A block is a continuation candidate when it starts with a lowercase
        # letter, is shorter than _MIN_PARA, or follows a paragraph that ends
        # mid-sentence.
        starts_lower = text[:1].islower()
        prev_open = bool(paras and _prev_para_is_open(paras[-1]["text"]))
        should_merge = starts_lower or len(text) < _MIN_PARA or prev_open

        # Continuations are merged only within the same section.
        if paras and should_merge and paras[-1]["section"] == section:
            paras[-1]["text"] += " " + text
        elif len(text) >= _MIN_PARA:
            # New paragraph: it takes over (and resets) the heading chain.
            headings_chain = list(pending_headings)
            pending_headings.clear()
            pending_heading_levels.clear()
            paras.append({
                "section": section,
                "firstOfSection": first_pending,
                "text": text,
                "headings": headings_chain,
            })
            first_pending = False
        else:
            # Too short to stand alone and nothing to merge into: park it.
            pending.append(text)

    _flush_pending_as_para()

    # Sequential ids in output order: p1, p2, ...
    for i, p in enumerate(paras):
        p["id"] = f"p{i + 1}"

    return paras, raw_refs
|
|
|
|
def _fallback_chunk(page) -> dict:
    """Create a replacement chunk for a pathological page from plain text.

    The page is read with ``page.get_text("blocks")``.  Only entries whose
    block type (index 6) is ``0`` and whose text (index 4) is not blank are
    used; they are joined with double newlines so that the downstream
    ``segment_markdown`` call sees them as separate paragraphs.

    Args:
        page: A page object exposing ``get_text("blocks")``.

    Returns:
        A dict with the single key ``"text"``.
    """
    paragraphs: list[str] = []
    for entry in page.get_text("blocks"):
        body, kind = entry[4], entry[6]
        if kind == 0 and body.strip():
            paragraphs.append(body)
    return {"text": "\n\n".join(paragraphs)}
|
|
|
|
def parse_text(raw: bytes, file_name: str) -> tuple["Paper", str]:
    """Parse a markdown/plain-text document into a ``Paper``.

    - The bytes are decoded as UTF-8; undecodable bytes are replaced.
    - The title is the text of the first markdown heading line, or the file
      name without its extension when there is no heading.
    - The arXiv id is taken from the first ``NNNN.NNNNN``-style number in
      ``file_name`` (empty string if there is none).
    - ``authors`` is left empty and ``pages`` is set to 0.

    Args:
        raw: The document content.
        file_name: The original file name.

    Returns:
        ``(paper, raw_refs)`` where ``raw_refs`` is the raw references text
        returned by ``segment_markdown``.
    """
    from .schemas import Paper, Paragraph

    text = raw.decode("utf-8", errors="replace")

    # Default title first, then override with the first heading if any.
    title = Path(file_name).stem
    heading_hits = (_HEADING.match(row.strip()) for row in text.splitlines())
    first_heading = next((hit for hit in heading_hits if hit), None)
    if first_heading is not None:
        title = first_heading.group(1).strip().strip("*")

    id_match = re.search(r"(\d{4}\.\d{4,5})", file_name)
    arxiv = id_match.group(1) if id_match else ""

    paras, raw_refs = segment_markdown(text)

    paper = Paper(
        title=title,
        authors="",
        arxivId=arxiv,
        pages=0,
        paragraphs=[Paragraph(**p) for p in paras],
    )
    return paper, raw_refs
|
|
|
|
def parse_pdf(pdf_bytes: bytes, file_name: str) -> tuple["Paper", str]:
    """Parse a PDF into a ``Paper`` plus the raw text of its references.

    Pipeline:
      1. Open the PDF and find pathological pages (oversized content
         streams, see ``_pathological_pages``).
      2. Convert the normal pages with ``pymupdf4llm.to_markdown`` and the
         pathological ones with the plain-text ``_fallback_chunk``, keeping
         page order.
      3. Remove repeating lines, page artifacts and picture-text regions,
         then segment the result with ``segment_markdown``.
      4. Title = first heading on the first page (default: ``file_name``);
         authors = first non-heading line after it, cleaned.
      5. arXiv id = number found in ``file_name``, else the arXiv stamp in
         the first page's raw text.

    The document is always closed before returning, including on errors.

    Args:
        pdf_bytes: The PDF file content.
        file_name: The original file name.

    Returns:
        ``(paper, raw_refs)`` where ``raw_refs`` is the raw references text
        returned by ``segment_markdown``.

    Raises:
        RuntimeError: If the markdown converter returns fewer chunks than
            the number of pages requested.
    """
    import pymupdf4llm
    import pymupdf
    from .schemas import Paper, Paragraph

    doc = pymupdf.open(stream=pdf_bytes, filetype="pdf")
    try:
        n_pages = doc.page_count
        bad = _pathological_pages(doc)

        if bad:
            normal_pages = [i for i in range(n_pages) if i not in bad]
            # Skip the converter entirely when every page is pathological,
            # rather than handing it an empty page selection.
            md_chunks: list[dict] = (
                pymupdf4llm.to_markdown(
                    doc, page_chunks=True, pages=normal_pages, force_text=False
                )
                if normal_pages
                else []
            )
            # Explicit check (not an assert, which is stripped under -O):
            # a shortfall would otherwise surface as a bare StopIteration.
            if len(md_chunks) < len(normal_pages):
                raise RuntimeError(
                    f"Chunk count mismatch: {len(md_chunks)} markdown chunks "
                    f"for {len(normal_pages)} requested pages"
                )

            # Interleave converter output and fallback chunks in page order.
            normal_iter = iter(md_chunks)
            chunks: list[dict] = []
            for i in range(n_pages):
                if i in bad:
                    chunks.append(_fallback_chunk(doc[i]))
                else:
                    chunks.append(next(normal_iter))
        else:
            chunks = pymupdf4llm.to_markdown(doc, page_chunks=True, force_text=False)
    finally:
        # Everything needed below is plain data extracted from the document.
        doc.close()

    pages_lines = [c["text"].splitlines() for c in chunks]
    pages_lines = remove_repeating_lines(pages_lines)
    pages_lines = [strip_page_artifacts(p) for p in pages_lines]
    pages_lines = [strip_picture_text(p) for p in pages_lines]
    md = "\n".join("\n".join(p) for p in pages_lines)
    paras, raw_refs = segment_markdown(md)

    # Title and authors come from the cleaned first page (if there is one).
    title, authors = file_name, ""
    first_page = pages_lines[0] if pages_lines else []
    lines = [l for l in first_page if l.strip()]
    for i, l in enumerate(lines):
        m = _HEADING.match(l)
        if m:
            title = m.group(1).strip().strip("*")
            # Authors: the first non-heading line that follows the title.
            for nxt in lines[i + 1:]:
                if not _HEADING.match(nxt):
                    raw_authors = re.sub(r"[*#]", "", nxt).strip()
                    authors = _clean_authors(raw_authors)
                    break
            break

    # arXiv id: the file name wins; otherwise look for a stamp in the raw
    # (unfiltered) text of the first page.
    arxiv = ""
    arxiv_from_name = re.search(r"(\d{4}\.\d{4,5})", file_name)
    if arxiv_from_name:
        arxiv = arxiv_from_name.group(1)
    elif chunks:
        m = _ARXIV_STAMP.search(chunks[0]["text"])
        if m:
            arxiv = m.group(0).split(":")[1]

    return Paper(
        title=title, authors=authors, arxivId=arxiv, pages=n_pages,
        paragraphs=[Paragraph(**p) for p in paras],
    ), raw_refs
|
|