import re from collections import Counter from pathlib import Path _ARXIV_STAMP = re.compile(r"arXiv:\d{4}\.\d{4,5}", re.I) # --------------------------------------------------------------------------- # Picture-text block filtering (pymupdf4llm 1.27.x emits these) # --------------------------------------------------------------------------- _PICTURE_TEXT_MARKER = re.compile( r"-{3,}\s*(start|begin|end)\s+of\s+picture\s+text\s*-{3,}", re.I ) _PICTURE_TEXT_START = re.compile( r"-{3,}\s*(start|begin)\s+of\s+picture\s+text\s*-{3,}", re.I ) _PICTURE_TEXT_END = re.compile( r"-{3,}\s*end\s+of\s+picture\s+text\s*-{3,}", re.I ) def strip_picture_text(lines: list[str]) -> list[str]: """Remove lines between (and including) picture-text markers. Handles: - Multi-line blocks: start marker on one line, end on another. - Inline end markers: content and end marker on the same line (with
). - Both 'Start/End' (pymupdf4llm 1.27.x) and 'Begin/End' variants. """ result: list[str] = [] in_block = False for line in lines: if in_block: # Check if this line contains an end marker (possibly with content before it) if _PICTURE_TEXT_END.search(line): in_block = False # Either way, drop the line (it's inside a picture block) continue # Check if this line starts a picture block if _PICTURE_TEXT_START.search(line): in_block = True # If the end marker is ALSO on this same line (rare but possible) if _PICTURE_TEXT_END.search(line): in_block = False continue result.append(line) return result # --------------------------------------------------------------------------- # De-markdown: strip emphasis/backtick and repair math notation # --------------------------------------------------------------------------- _SUPERSCRIPTS = str.maketrans("0123456789", "⁰¹²³⁴⁵⁶⁷⁸⁹") def demarkdown(text: str) -> str: """Strip markdown emphasis markers and repair math notation debris. Applied to paragraph text and figure captions. Does NOT touch headings or pattern-filter lines (those run on raw blocks before this is called). Steps (order matters): 1. Strip bold ``**...**`` 2. Strip italic/emphasis ``_..._`` 3. Strip backticks 4. Repair digit-dot-digit spacing left by ``_._`` stripping: ``5 . 7`` → ``5.7`` 5. Repair digit-×-digit spacing left by ``_×_`` stripping: ``5 × 7`` → ``5×7`` 6. Convert bracketed exponents after digit: ``10[5]`` → ``10⁵`` """ # 1. Strip bold t = re.sub(r"\*\*(.+?)\*\*", r"\1", text) # 2. Strip emphasis (non-greedy; multiple passes handle nested/adjacent) t = re.sub(r"_(.+?)_", r"\1", t) # 3. Strip backticks t = t.replace("`", "") # 4. Repair spacing around decimal points (artifact of stripping _._) t = re.sub(r"(\d) \. (\d)", r"\1.\2", t) # 5. Repair spacing around × symbol (artifact of stripping _×_) t = re.sub(r"(\d) × (\d)", r"\1×\2", t) # 6. Bracketed exponents after a digit: 10[5] → 10⁵, 10[11] → 10¹¹ t = re.sub( r"(?<=\d)\[(\d+)\]", lambda m: m.group(1).translate(_SUPERSCRIPTS), t, ) # 7. Negative bracketed exponents: 10[-5] → 10⁻⁵ t = re.sub( r"(?<=\d)\[-(\d+)\]", lambda m: "⁻" + m.group(1).translate(_SUPERSCRIPTS), t, ) # 8. Strip (cid:N) font-encoding debris t = re.sub(r"\(cid:\d+\)", "", t) return t # --------------------------------------------------------------------------- # Open-sentence detection: words that strongly imply an incomplete sentence # --------------------------------------------------------------------------- _OPEN_FUNCTION_WORDS = frozenset({ # Coordinating conjunctions 'and', 'or', 'but', 'nor', 'yet', 'so', # Articles 'the', 'a', 'an', # Prepositions and subordinating conjunctions 'in', 'on', 'at', 'of', 'to', 'for', 'with', 'by', 'from', 'as', 'into', 'that', 'which', 'who', 'whose', 'where', 'when', 'between', 'among', 'including', 'such', 'both', 'either', 'than', 'then', 'also', 'only', 'even', 'about', 'over', 'under', }) def _prev_para_is_open(text: str) -> bool: """Return True if the paragraph text ends mid-sentence. Uses a word-level heuristic: the last alphabetic token is a function word (conjunction, preposition, article) that cannot end a sentence. Also rejects blocks that end with a digit (line-number artifact) or a hyphen (word-break artifact), both common in 2-column paper extractions. """ stripped = text.rstrip() # Digit at end → line number artifact, not a real sentence break if re.search(r'\d\s*$', stripped): return False # Hyphen at end → word-break artifact if stripped.endswith('-'): return False last_words = re.findall(r'[a-zA-Z]+', stripped) last_word = last_words[-1].lower() if last_words else '' return last_word in _OPEN_FUNCTION_WORDS # Pages whose raw content stream exceeds this size (bytes) are "pathological": # pymupdf4llm's layout analysis takes minutes on them (e.g. JWST vector-drawing # plates with 500k+ path elements), while plain get_text() stays fast. _MAX_PAGE_STREAM_BYTES = 8_000_000 def _pathological_pages(doc) -> set[int]: """Return indices of pages whose raw content stream is enormous. Such pages (e.g. JWST images drawn as 563 k vector paths) cause pymupdf4llm's layout/table analysis to run for 60+ seconds regardless of ``graphics_limit``/``table_strategy``. Plain ``page.get_text()`` on the same pages takes < 1 s and recovers all text. Detected by content-stream size (threshold: _MAX_PAGE_STREAM_BYTES). """ out: set[int] = set() for i, page in enumerate(doc): try: if len(page.read_contents()) > _MAX_PAGE_STREAM_BYTES: out.add(i) except Exception: # noqa: BLE001 pass return out _DOI_STAMP = re.compile(r"^(doi:|https?://doi\.org)", re.I) _PAGE_NUM = re.compile(r"^[–—\-\s]*\d+[–—\-\s]*$") # Running journal headers: all-uppercase, ≥5 chars, no lowercase letters. # Catches things like "RENAUD ET AL.", "STARBURSTS HIDING IN THE MAIN SEQUENCE" # that survive remove_repeating_lines when they vary slightly across pages. _RUNNING_HEADER = re.compile(r"^[A-Z][A-Z\s\.\,\-\–\—\(\)\[\]\:\;\'\"]{4,}$") _MIN_PARA = 200 _HEADING = re.compile(r"^#{1,4}\s+(.*)") _CAPTION = re.compile(r"^(figure|fig\.|table)\s*\d+\s*[.:|]", re.I) _PICTURE = re.compile(r"^==>|^\*\*==>") _TABLE_ROW = re.compile(r"^\|") _REFERENCES = re.compile(r"^(references|bibliography|acknowledg)", re.I) _AFFILIATION = re.compile(r"^>\s*\d+\s") _LIST_HEADING = re.compile( r"^[-*]\s+(?:[_*]{0,2})(\d+(?:\.\d+)+\.?\s+\S[^\n]{0,100})(?:[_*]{0,2})$" ) _HEADING_JUNK = re.compile( r"^\d{1,4}\s+\S.*\bet\s+al\.?\s*$" r"|^(Received|Accepted|Submitted|Published)\b", re.I, ) # Author/affiliation block detector thresholds _AUTHOR_BLOCK_MAX_PROSE = 0.25 # prose fraction below this → not real prose _AUTHOR_BLOCK_MIN_COMMAS = 3 # must have at least this many commas _AUTHOR_BLOCK_FRONT_LIMIT = 3 # only apply detector to first N accepted paragraphs def _norm(line: str) -> str: return re.sub(r"\d+", "#", line.strip().lower()) def _prose_fraction(text: str) -> float: """Return fraction of alphabetic tokens that are lowercase-only, ≥3 chars. Real prose runs ~0.6–0.8; author name lists ~0.0–0.1; affiliation lists ~0.1–0.2. Strip markdown emphasis, bracket groups, and digits first. """ cleaned = re.sub(r"\[[^\]]*\]", "", text) # drop [...] cleaned = re.sub(r"[_*]", "", cleaned) # drop emphasis chars cleaned = re.sub(r"\d+", "", cleaned) # drop digits tokens = re.findall(r"[a-zA-Z]+", cleaned) if not tokens: return 0.0 prose_tokens = [t for t in tokens if t == t.lower() and len(t) >= 3] return len(prose_tokens) / len(tokens) def _is_author_affiliation_block(text: str) -> bool: """Return True if the block looks like an author/affiliation block. Criteria (applied only during front-matter scanning): - Contains at least _AUTHOR_BLOCK_MIN_COMMAS commas, AND - Prose fraction is below _AUTHOR_BLOCK_MAX_PROSE. """ return ( text.count(",") >= _AUTHOR_BLOCK_MIN_COMMAS and _prose_fraction(text) < _AUTHOR_BLOCK_MAX_PROSE ) def _clean_authors(raw: str) -> str: """Return a cleaned, truncated author string. Strips bracket groups, parenthetical groups, and markdown emphasis; splits on commas; keeps tokens with ≥2 letters; joins first 3 with ', ' and appends ' et al.' when more than 3 remain. """ cleaned = re.sub(r"\[[^\]]*\]", "", raw) # drop [...] cleaned = re.sub(r"\([^)]*\)", "", cleaned) # drop (...) cleaned = re.sub(r"[_*]", "", cleaned) # drop emphasis chars cleaned = re.sub(r"\s+", " ", cleaned).strip() parts = [p.strip() for p in cleaned.split(",")] # Keep only tokens that contain at least 2 letters authors = [p for p in parts if len(re.findall(r"[a-zA-Z]", p)) >= 2] if len(authors) > 3: return ", ".join(authors[:3]) + " et al." return ", ".join(authors) def remove_repeating_lines(pages: list[list[str]]) -> list[list[str]]: """Drop lines whose digit-normalized form appears on >= half the pages.""" counts = Counter() for page in pages: for n in {_norm(l) for l in page if l.strip()}: counts[n] += 1 threshold = max(2, len(pages) // 2) repeating = {n for n, c in counts.items() if c >= threshold} return [[l for l in page if _norm(l) not in repeating] for page in pages] def repair_hyphenation(text: str) -> str: return re.sub(r"(\w)-\n(\w)", r"\1\2", text) def strip_page_artifacts(lines: list[str]) -> list[str]: return [ l for l in lines if not l.strip() or (not _PAGE_NUM.match(l.strip()) and not _ARXIV_STAMP.search(l) and not _DOI_STAMP.match(l.strip()) and not _RUNNING_HEADER.match(l.strip())) ] def segment_markdown(md: str) -> tuple[list[dict], str]: """Segment markdown into paragraphs and capture raw references text. Returns (paras, raw_refs) where raw_refs is the text of the References section (empty string if absent), consumed by the reference parser. """ paras: list[dict] = [] section = "" first_pending = True pending: list[str] = [] # short blocks buffered as section-head (fix 0.1) pending_headings: list[str] = [] # heading names since last paragraph (fix 0.3) pending_heading_levels: list[int] = [] # parallel list of heading levels (fix 0.3b) raw_refs = "" _state = "normal" # "normal" | "refs_capture" | "refs_skip" def _flush_pending_as_para(): nonlocal first_pending if not pending: return text = " ".join(pending) pending.clear() headings_snap = list(pending_headings) pending_headings.clear() pending_heading_levels.clear() if len(text) >= _MIN_PARA: paras.append({ "section": section, "firstOfSection": first_pending, "text": text, "headings": headings_snap, }) first_pending = False for block in re.split(r"\n\s*\n", md): block = repair_hyphenation(block).strip() if not block: continue m = _HEADING.match(block) lm = _LIST_HEADING.match(block) if not m else None if m or lm: name_raw = m.group(1).strip().strip("*") if m else lm.group(1).strip().strip("*") name = demarkdown(name_raw) # Heading level: count leading '#' for _HEADING; treat _LIST_HEADING as level 3 hlevel = len(re.match(r"^(#+)", block).group(1)) if m else 3 if _HEADING_JUNK.match(name): continue _flush_pending_as_para() # Prune orphaned sibling/ancestor headings that accumulated with no prose. # Keep parent headings (lower level number) so consecutive sub-headings chain. if not pending: # Remove any pending_headings at same or deeper level than current heading keep_up_to = next( (i for i, lvl in enumerate(pending_heading_levels) if lvl >= hlevel), len(pending_heading_levels), ) del pending_headings[keep_up_to:] del pending_heading_levels[keep_up_to:] if _REFERENCES.match(name): _state = ( "refs_capture" if re.match(r"^(references|bibliography)", name, re.I) else "refs_skip" ) section = name first_pending = True continue if _state in ("refs_capture", "refs_skip"): _state = "normal" section = name first_pending = True pending_headings.append(name) pending_heading_levels.append(hlevel) continue if _state == "refs_capture": raw_refs += block + "\n\n" continue if _state == "refs_skip": continue filtered_lines = strip_picture_text([ l for l in block.splitlines() if not _AFFILIATION.match(l) ]) block = "\n".join(filtered_lines).strip() if not block: continue # Fix 0.6: demarkdown first, measure once on cleaned text text = demarkdown(re.sub(r"\s+", " ", block)) if _CAPTION.match(text) or _PICTURE.match(text) or _TABLE_ROW.match(text): continue if len(paras) < _AUTHOR_BLOCK_FRONT_LIMIT and _is_author_affiliation_block(text): continue # Prepend pending buffer if pending: text = " ".join(pending) + " " + text pending.clear() starts_lower = text[:1].islower() prev_open = bool(paras and _prev_para_is_open(paras[-1]["text"])) should_merge = starts_lower or len(text) < _MIN_PARA or prev_open if paras and should_merge and paras[-1]["section"] == section: paras[-1]["text"] += " " + text elif len(text) >= _MIN_PARA: headings_chain = list(pending_headings) pending_headings.clear() pending_heading_levels.clear() paras.append({ "section": section, "firstOfSection": first_pending, "text": text, "headings": headings_chain, }) first_pending = False else: # Fix 0.1: buffer short non-mergeable block instead of dropping pending.append(text) _flush_pending_as_para() # Fix 0.6: removed final re-filter (length already measured post-demarkdown) for i, p in enumerate(paras): p["id"] = f"p{i + 1}" return paras, raw_refs def _fallback_chunk(page) -> dict: """Build a substitute chunk for a pathological page using plain text extraction. Uses ``page.get_text("blocks")`` to preserve paragraph boundaries: text blocks (block type 0) are joined with double newlines so that the downstream ``segment_markdown`` call sees intact paragraph structure. """ blocks = page.get_text("blocks") text_parts = [b[4] for b in blocks if b[6] == 0 and b[4].strip()] return {"text": "\n\n".join(text_parts)} def parse_text(raw: bytes, file_name: str) -> tuple["Paper", str]: from .schemas import Paper, Paragraph text = raw.decode("utf-8", errors="replace") title = Path(file_name).stem for line in text.splitlines(): m = _HEADING.match(line.strip()) if m: title = m.group(1).strip().strip("*") break arxiv = "" arxiv_from_name = re.search(r"(\d{4}\.\d{4,5})", file_name) if arxiv_from_name: arxiv = arxiv_from_name.group(1) paras, raw_refs = segment_markdown(text) return Paper( title=title, authors="", arxivId=arxiv, pages=0, paragraphs=[Paragraph(**p) for p in paras], ), raw_refs def parse_pdf(pdf_bytes: bytes, file_name: str) -> tuple["Paper", str]: import pymupdf4llm import pymupdf from .schemas import Paper, Paragraph doc = pymupdf.open(stream=pdf_bytes, filetype="pdf") n_pages = doc.page_count bad = _pathological_pages(doc) if bad: # Request only the normal pages from pymupdf4llm normal_pages = [i for i in range(n_pages) if i not in bad] md_chunks: list[dict] = pymupdf4llm.to_markdown( doc, page_chunks=True, pages=normal_pages, force_text=False ) # pymupdf4llm returns chunks only for requested pages, in order — # interleave fallback chunks for bad pages to restore document order. normal_iter = iter(md_chunks) chunks: list[dict] = [] for i in range(n_pages): if i in bad: chunks.append(_fallback_chunk(doc[i])) else: chunks.append(next(normal_iter)) assert len(chunks) == n_pages, ( f"Chunk count mismatch: {len(chunks)} != {n_pages}" ) else: chunks = pymupdf4llm.to_markdown(doc, page_chunks=True, force_text=False) pages_lines = [c["text"].splitlines() for c in chunks] pages_lines = remove_repeating_lines(pages_lines) pages_lines = [strip_page_artifacts(p) for p in pages_lines] # Belt-and-suspenders: strip any remaining picture-text marker lines pages_lines = [strip_picture_text(p) for p in pages_lines] md = "\n".join("\n".join(p) for p in pages_lines) paras, raw_refs = segment_markdown(md) # Title: first markdown heading in the document; authors: first non-heading line after it. title, authors = file_name, "" lines = [l for l in pages_lines[0] if l.strip()] for i, l in enumerate(lines): m = _HEADING.match(l) if m: title = m.group(1).strip().strip("*") for nxt in lines[i + 1:]: if not _HEADING.match(nxt): raw_authors = re.sub(r"[*#]", "", nxt).strip() authors = _clean_authors(raw_authors) break break # arXiv id: try to extract from the file_name (e.g. "2402.08696" pattern) arxiv = "" arxiv_from_name = re.search(r"(\d{4}\.\d{4,5})", file_name) if arxiv_from_name: arxiv = arxiv_from_name.group(1) else: # Fall back: search the raw first-page chunk text before cleaning m = _ARXIV_STAMP.search(chunks[0]["text"]) if m: arxiv = m.group(0).split(":")[1] return Paper( title=title, authors=authors, arxivId=arxiv, pages=n_pages, paragraphs=[Paragraph(**p) for p in paras], ), raw_refs