| """Text extraction from digital (native-text) PDFs.
|
|
|
| Uses PyMuPDF (fitz) for fast native text extraction and pdfplumber
|
| for table detection on text-based pages.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import logging
|
| from pathlib import Path
|
|
|
| import fitz
|
| import pdfplumber
|
|
|
| from app.schemas.extraction import (
|
| BlockType,
|
| ContentBlock,
|
| DocumentMetadata,
|
| HeadingLevel,
|
| ListItem,
|
| PageResult,
|
| TableBlock,
|
| TableCell,
|
| )
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Font size at or above which _is_heading() accepts a span as a heading
# without also requiring the bold flag.
_HEADING_MIN_SIZE = 13.0

# Characters that _is_list_line() and _strip_bullet() treat as a bullet
# when they are the first non-blank character of a line.
_LIST_BULLETS = {"•", "–", "-", "—", "○", "■", "□", "►", "▸", "●"}
|
|
|
|
|
def _is_heading(span: dict) -> bool:
    """Heuristically decide whether a text span looks like a heading.

    A span qualifies when its font size reaches ``_HEADING_MIN_SIZE``, or
    when its bold flag (bit 4 of ``flags``) is set and its size is at
    least 11.5.  A missing ``size`` is treated as 12 and missing ``flags``
    as 0.
    """
    font_size = span.get("size", 12)
    bold = (span.get("flags", 0) & 0x10) != 0
    if font_size >= _HEADING_MIN_SIZE:
        return True
    return bold and font_size >= 11.5
|
|
|
|
|
def _heading_level(size: float) -> HeadingLevel:
    """Map a font size to a heading level.

    Sizes of at least 22, 18, 15 and 13 map to H1, H2, H3 and H4
    respectively; anything smaller is H5.
    """
    # Ordered from largest to smallest so the first match wins.
    thresholds = (
        (22, HeadingLevel.H1),
        (18, HeadingLevel.H2),
        (15, HeadingLevel.H3),
        (13, HeadingLevel.H4),
    )
    for minimum, level in thresholds:
        if size >= minimum:
            return level
    return HeadingLevel.H5
|
|
|
|
|
def _split_list_marker(line: str) -> tuple[str, str] | None:
    """Split *line* into ``(marker, remainder)`` when it opens with a list marker.

    Recognised markers:

    * a single leading character from ``_LIST_BULLETS`` (text may follow
      it directly, e.g. ``"•item"``);
    * a leading whitespace-delimited token made of one to three digits, or
      a single letter, followed by ``.`` or ``)`` -- e.g. ``"1."``,
      ``"12)"``, ``"a."``.

    Requiring the enumerator to be the whole first token keeps decimals,
    section numbers and abbreviations (``"3.5 percent"``, ``"1.2 Scope"``,
    ``"e.g. this"``) from being mistaken for list items, and lets
    multi-digit ordinals such as ``"10."`` through.

    Returns ``None`` when *line* is blank or carries no such marker.
    """
    stripped = line.strip()
    if not stripped:
        return None

    if stripped[0] in _LIST_BULLETS:
        return stripped[0], stripped[1:].strip()

    # Only the first token can be an enumerator; the rest is the item text.
    pieces = stripped.split(None, 1)
    head = pieces[0]
    tail = pieces[1] if len(pieces) == 2 else ""

    label, closer = head[:-1], head[-1]
    if closer not in ".)":
        return None

    # Cap ordinals at three digits so a line that merely starts with a
    # year or another large number ("2020. The ...") stays a paragraph.
    is_ordinal = label.isdigit() and len(label) <= 3
    is_letter = len(label) == 1 and label.isalpha()
    if is_ordinal or is_letter:
        return head, tail
    return None


def _is_list_line(line: str) -> bool:
    """Return True if *line* starts with a bullet or an enumeration marker.

    See ``_split_list_marker`` for the exact marker rules.  Blank lines are
    never list lines.
    """
    return _split_list_marker(line) is not None


def _strip_bullet(line: str) -> str:
    """Return *line* without surrounding whitespace and leading list marker.

    Lines that do not start with a list marker are returned stripped of
    surrounding whitespace but otherwise untouched, so ordinary text such
    as ``"3.5 percent"`` is never truncated.
    """
    parts = _split_list_marker(line)
    if parts is None:
        return line.strip()
    return parts[1]
|
|
|
|
|
|
|
|
|
|
|
def page_has_native_text(pdf_path: str | Path, page_num: int) -> bool:
    """Tell whether a page carries enough embedded text to skip OCR.

    Returns False when ``page_num`` is at or beyond the document's page
    count.  Otherwise the page's plain text is stripped of surrounding
    whitespace and the result is True only if more than 30 characters
    remain.
    """
    with fitz.open(str(pdf_path)) as pdf:
        if len(pdf) <= page_num:
            return False
        char_count = len(pdf[page_num].get_text("text").strip())
    return char_count > 30
|
|
|
|
|
def document_has_native_text(pdf_path: str | Path) -> bool:
    """Quick check for native text anywhere in the document.

    Returns True as soon as one page's stripped plain text exceeds 30
    characters, and False if no page does.
    """
    with fitz.open(str(pdf_path)) as pdf:
        # any() stops at the first page that qualifies.
        return any(
            len(pg.get_text("text").strip()) > 30
            for pg in pdf
        )
|
|
|
|
|
|
|
|
|
|
|
def extract_metadata(pdf_path: str | Path) -> DocumentMetadata:
    """Build a ``DocumentMetadata`` record for the PDF at *pdf_path*.

    Descriptive fields come from the document's metadata dictionary, with
    missing or empty entries reported as ``""``.  The page count comes from
    the opened document, and the file name and size from the filesystem.
    """
    source = Path(pdf_path)
    with fitz.open(str(source)) as pdf:
        info = pdf.metadata or {}

        def text_field(key: str) -> str:
            # Absent keys and None/empty values all collapse to "".
            return info.get(key) or ""

        return DocumentMetadata(
            title=text_field("title"),
            author=text_field("author"),
            subject=text_field("subject"),
            creator=text_field("creator"),
            producer=text_field("producer"),
            page_count=len(pdf),
            file_name=source.name,
            file_size_bytes=source.stat().st_size,
            mime_type="application/pdf",
            creation_date=text_field("creationDate"),
            modification_date=text_field("modDate"),
        )
|
|
|
|
|
|
|
|
|
|
|
def extract_text_page(pdf_path: str | Path, page_num: int) -> PageResult:
    """Extract structured blocks from a native-text PDF page.

    Each non-empty text line is classified by its first span: heading
    (see ``_is_heading``), list item (see ``_is_list_line``) or paragraph
    text.  Consecutive paragraph lines are joined with single spaces into
    one PARAGRAPH block; every list line becomes its own single-item LIST
    block.  Tables found by pdfplumber are appended after the text blocks.

    Args:
        pdf_path: Path of the PDF file.
        page_num: Zero-based page index; the result reports it one-based.

    Returns:
        A ``PageResult`` with the page size, the blocks, and ``plain_text``
        built from heading and paragraph blocks only (list and table
        content is not included).  ``is_scanned`` is False and
        ``ocr_confidence`` is 1.0.
    """
    blocks: list[ContentBlock] = []

    # Read everything needed from PyMuPDF in a single open.  The page size
    # is copied to plain numbers so nothing refers to the document after
    # it has been closed, and the file is not opened a second time later.
    with fitz.open(str(pdf_path)) as doc:
        page = doc[page_num]
        width, height = page.rect.width, page.rect.height
        text_dict = page.get_text("dict", flags=fitz.TEXT_PRESERVE_WHITESPACE)

    pending_lines: list[str] = []

    def flush_paragraph() -> None:
        """Emit the buffered paragraph lines as one block and reset the buffer."""
        if not pending_lines:
            return
        text = " ".join(pending_lines).strip()
        pending_lines.clear()
        if not text:
            return

        # NOTE(review): the buffered lines are joined with spaces, so this
        # split yields more than one entry only if a span's own text
        # contains a newline -- confirm whether that can happen.
        lines = [ln for ln in text.split("\n") if ln.strip()]
        if all(_is_list_line(ln) for ln in lines):
            blocks.append(ContentBlock(
                block_type=BlockType.LIST,
                list_items=[ListItem(text=_strip_bullet(ln)) for ln in lines],
                source="text",
            ))
        else:
            blocks.append(ContentBlock(
                block_type=BlockType.PARAGRAPH,
                text=text,
                source="text",
            ))

    for block_dict in text_dict.get("blocks", []):
        # In PyMuPDF's dict layout, type 0 marks a text block; anything
        # else (e.g. images) carries no lines to classify.
        if block_dict.get("type") != 0:
            continue

        for line_dict in block_dict.get("lines", []):
            spans = line_dict.get("spans", [])
            if not spans:
                continue

            line_text = "".join(s.get("text", "") for s in spans).strip()
            if not line_text:
                # A whitespace-only line ends the current paragraph.
                flush_paragraph()
                continue

            # Font size and weight of the first span decide the line's role.
            first_span = spans[0]
            if _is_heading(first_span):
                flush_paragraph()
                blocks.append(ContentBlock(
                    block_type=BlockType.HEADING,
                    text=line_text,
                    heading_level=_heading_level(first_span.get("size", 12)),
                    source="text",
                ))
            elif _is_list_line(line_text):
                flush_paragraph()
                blocks.append(ContentBlock(
                    block_type=BlockType.LIST,
                    list_items=[ListItem(text=_strip_bullet(line_text))],
                    source="text",
                ))
            else:
                pending_lines.append(line_text)

    # Emit whatever paragraph text is still buffered at the end of the page.
    flush_paragraph()

    # Tables are detected separately and appended after the text blocks.
    _extract_tables_plumber(pdf_path, page_num, blocks)

    plain = "\n".join(
        b.text for b in blocks
        if b.block_type in (BlockType.HEADING, BlockType.PARAGRAPH)
    )

    return PageResult(
        page_number=page_num + 1,
        width=width,
        height=height,
        blocks=blocks,
        plain_text=plain,
        is_scanned=False,
        ocr_confidence=1.0,
    )
|
|
|
|
|
def _extract_tables_plumber(
    pdf_path: str | Path,
    page_num: int,
    blocks: list[ContentBlock],
) -> None:
    """Detect tables with pdfplumber and append them to *blocks*.

    Every non-empty table on the page becomes one TABLE ``ContentBlock``
    whose cells carry their row/column position; cells in the first row
    are flagged as headers.  Nothing is appended when ``page_num`` is at
    or beyond the page count.

    This is best-effort: any exception raised while opening the file or
    extracting tables is logged as a warning (with traceback) and
    swallowed.  Tables appended before the failure stay in *blocks*.

    Args:
        pdf_path: Path of the PDF file.
        page_num: Zero-based page index.
        blocks: List that receives the table blocks; mutated in place.
    """
    try:
        with pdfplumber.open(str(pdf_path)) as pdf:
            if page_num >= len(pdf.pages):
                return
            raw_tables = pdf.pages[page_num].extract_tables()

            for raw_table in raw_tables:
                if not raw_table:
                    continue

                # Normalise falsy rows to empty lists once, so the column
                # count and the cell loop agree and a missing row cannot
                # abort extraction of the page's remaining tables.
                rows = [row or [] for row in raw_table]

                cells: list[TableCell] = []
                for ri, row in enumerate(rows):
                    for ci, val in enumerate(row):
                        cells.append(TableCell(
                            text=(val or "").strip(),
                            row=ri,
                            col=ci,
                            is_header=(ri == 0),
                        ))

                table = TableBlock(
                    rows=len(rows),
                    # Rows may be ragged; the widest one sets the width.
                    cols=max((len(row) for row in rows), default=0),
                    cells=cells,
                )
                blocks.append(ContentBlock(
                    block_type=BlockType.TABLE,
                    table=table,
                    source="text",
                ))
    except Exception:
        # Deliberately broad: table detection must never fail the page.
        logger.warning("pdfplumber table extraction failed on page %d", page_num, exc_info=True)
|
|
|