Spaces:
Sleeping
Sleeping
| """Document parsing — PyMuPDF for digital PDFs; MiniCPM-V OCR on Modal; LiteParse for layout only.""" | |
| from __future__ import annotations | |
| import base64 | |
| import io | |
| import logging | |
| import os | |
| import re | |
| import tempfile | |
| from functools import lru_cache | |
| from typing import TYPE_CHECKING, List, Optional, Tuple | |
| import fitz | |
| from liteparse import LiteParse, ParseResult, ParsedPage | |
| from utils.pdf_parser import ( | |
| extract_pdf_spatial_pages, | |
| render_page_image, | |
| render_page_png_base64, | |
| ) | |
| if TYPE_CHECKING: | |
| from models.ocr import MiniCPMVOCR | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Matches one angle-bracketed tag ("<" ... ">"); used to strip leftover markup from text.
_HTML_TAG = re.compile(r"<[^>]+>")
# Lower-case file extensions (with leading dot) that are handled as images rather than PDFs.
_IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp", ".gif"}
def _get_layout_parser() -> LiteParse:
    """Build a new LiteParse instance configured for layout/format detection.

    OCR is switched off on this parser; page text comes from MiniCPM-V instead.
    """
    parser_options = {
        "ocr_enabled": False,
        "dpi": 300,
        "quiet": True,
    }
    return LiteParse(**parser_options)
| def _suffix_from_filename(filename: Optional[str]) -> str: | |
| if filename and "." in filename: | |
| ext = os.path.splitext(filename)[1].lower() | |
| if ext: | |
| return ext | |
| return ".pdf" | |
def _is_image_suffix(suffix: str) -> bool:
    """Report whether *suffix* (compared case-insensitively) is in ``_IMAGE_SUFFIXES``."""
    normalized = suffix.lower()
    return normalized in _IMAGE_SUFFIXES
| def _clean_spatial_text(text: str) -> str: | |
| if not text: | |
| return "" | |
| cleaned = text.replace("\r\n", "\n").replace("\r", "\n") | |
| if "<" in cleaned and ">" in cleaned: | |
| cleaned = re.sub(r"<br\s*/?>", "\n", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"</tr>", "\n", cleaned, flags=re.IGNORECASE) | |
| cleaned = re.sub(r"</t[dh]>", " ", cleaned, flags=re.IGNORECASE) | |
| cleaned = _HTML_TAG.sub("", cleaned) | |
| cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) | |
| return cleaned.rstrip() | |
def _image_to_png_bytes(image_bytes: bytes) -> bytes:
    """Decode encoded image bytes with Pillow, convert to RGB, and re-encode as PNG."""
    from PIL import Image

    source = io.BytesIO(image_bytes)
    rgb_image = Image.open(source).convert("RGB")

    sink = io.BytesIO()
    rgb_image.save(sink, format="PNG")
    png_bytes = sink.getvalue()
    return png_bytes
def _page_image_to_png_bytes(file_bytes: bytes, page_num: int) -> bytes:
    """Render one page via ``render_page_image`` and return it encoded as PNG bytes.

    ``page_num`` is forwarded unchanged; callers in this module pass 1-based numbers.
    """
    rendered = render_page_image(file_bytes, page_num)
    sink = io.BytesIO()
    rendered.save(sink, format="PNG")
    png_bytes = sink.getvalue()
    return png_bytes
def _modal_ocr_page(file_bytes: bytes, page_num: int, ocr: MiniCPMVOCR) -> str:
    """Render page *page_num* to PNG and return ``ocr.extract_text`` for that image."""
    return ocr.extract_text(_page_image_to_png_bytes(file_bytes, page_num))
def _build_parse_result(pages: List[Tuple[int, str]]) -> ParseResult:
    """Assemble a ``ParseResult`` from ``(page_number, text)`` pairs.

    Pages whose text is empty or whitespace-only are dropped. Each kept page is
    stored with zero width/height and no text items, and the document-level
    text is the kept pages' text joined by blank lines.
    """
    kept: List[ParsedPage] = []
    for number, body in pages:
        if not body.strip():
            continue
        kept.append(
            ParsedPage(
                page_num=number,
                width=0.0,
                height=0.0,
                text=body,
                text_items=[],
            )
        )
    combined = "\n\n".join(page.text for page in kept)
    return ParseResult(pages=kept, text=combined)
def _liteparse_layout_pages(file_bytes: bytes) -> List[Tuple[int, str]]:
    """Best-effort layout pass with the OCR-disabled LiteParse parser.

    Returns ``(page_num, text)`` for every page with non-blank text. Any
    exception is logged at debug level and yields an empty list, so callers
    can treat the pass as optional.
    """
    try:
        parsed = _get_layout_parser().parse(file_bytes)
        collected: List[Tuple[int, str]] = []
        for page in parsed.pages:
            if page.text.strip():
                collected.append((page.page_num, page.text))
        return collected
    except Exception as exc:
        logger.debug("LiteParse layout pass skipped: %s", exc)
        return []
def _parse_pdf_hybrid(file_bytes: bytes, ocr: MiniCPMVOCR) -> ParseResult:
    """Parse a PDF using extracted text, falling back to OCR on sparse pages.

    For each ``(page_num, text, is_sparse)`` from ``extract_pdf_spatial_pages``,
    sparse pages are re-read through ``_modal_ocr_page``. If that OCR call
    fails, a warning is logged and the originally extracted text is kept.
    All page text is passed through ``_clean_spatial_text``.
    """

    def resolve_text(number: int, extracted: str, sparse: bool) -> str:
        # Non-sparse pages keep their extracted text untouched.
        if not sparse:
            return extracted
        try:
            logger.info("MiniCPM-V OCR on PDF page %d", number)
            return _modal_ocr_page(file_bytes, number, ocr)
        except Exception as exc:
            logger.warning("Modal OCR failed on page %d: %s", number, exc)
            return extracted

    cleaned_pages: List[Tuple[int, str]] = [
        (number, _clean_spatial_text(resolve_text(number, extracted, sparse)))
        for number, extracted, sparse in extract_pdf_spatial_pages(file_bytes)
    ]
    return _build_parse_result(cleaned_pages)
def parse_document(
    file_bytes: bytes,
    filename: Optional[str],
    ocr: MiniCPMVOCR,
) -> ParseResult:
    """Parse an uploaded document into a ``ParseResult``, dispatching on extension.

    Args:
        file_bytes: Raw contents of the uploaded file.
        filename: Original file name; only its extension is used. A missing
            name or extension is treated as ``.pdf``.
        ocr: OCR client exposing ``extract_text(png_bytes)``.

    Returns:
        * ``.pdf``: result of the hybrid text/OCR path (``_parse_pdf_hybrid``).
        * image extensions: a single-page result holding the cleaned OCR text.
        * anything else: the LiteParse layout pass on the in-memory bytes, or,
          if that produced no pages, LiteParse run on a temporary file that
          carries the original extension.

    Raises:
        Whatever the OCR client, Pillow, or LiteParse raise on the image and
        temp-file paths; those errors are not swallowed here.
    """
    suffix = _suffix_from_filename(filename)

    if suffix == ".pdf":
        return _parse_pdf_hybrid(file_bytes, ocr)

    if _is_image_suffix(suffix):
        logger.info("MiniCPM-V OCR on image %s", filename or "upload")
        text = ocr.extract_text(_image_to_png_bytes(file_bytes))
        cleaned = _clean_spatial_text(text)
        return ParseResult(
            pages=[
                ParsedPage(
                    page_num=1,
                    width=0.0,
                    height=0.0,
                    text=cleaned,
                    text_items=[],
                )
            ],
            text=cleaned,
        )

    # Try the in-memory layout pass first: it never touches the disk, so there
    # is no reason to create a temp file unless this pass comes back empty.
    layout_pages = _liteparse_layout_pages(file_bytes)
    if layout_pages:
        return _build_parse_result(
            [(num, _clean_spatial_text(text)) for num, text in layout_pages]
        )

    tmp_path: Optional[str] = None
    try:
        # delete=False so the parser can reopen the file by path after we close
        # it; the path is recorded before writing so a failed write is still
        # cleaned up by the finally clause below.
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(file_bytes)
        result = _get_layout_parser().parse(tmp_path)
        return ParseResult(
            pages=result.pages,
            text=_clean_spatial_text(result.text),
        )
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as exc:
                # A cleanup failure must not mask the parse outcome (or the
                # original exception); report it and move on.
                logger.warning("Could not remove temp file %s: %s", tmp_path, exc)
def file_to_ocr_image_bytes(
    file_bytes: bytes,
    filename: Optional[str] = None,
    page_num: int = 1,
) -> bytes:
    """Return PNG bytes suitable for OCR from either an image or a document page.

    Image uploads (by extension) are re-encoded as PNG and ``page_num`` is
    ignored; every other file is rendered at ``page_num`` via the page renderer.
    """
    if not _is_image_suffix(_suffix_from_filename(filename)):
        return _page_image_to_png_bytes(file_bytes, page_num)
    return _image_to_png_bytes(file_bytes)
def _modal_structured_page(file_bytes: bytes, page_num: int, ocr: MiniCPMVOCR) -> str:
    """Render page *page_num* to PNG and return ``ocr.extract_structured`` for it."""
    return ocr.extract_structured(_page_image_to_png_bytes(file_bytes, page_num))
def extract_document_structured_ocr(
    file_bytes: bytes,
    filename: Optional[str],
    ocr: MiniCPMVOCR,
) -> dict:
    """Run structured MiniCPM-V OCR over a document and merge the per-page results.

    Image uploads are processed as a single page numbered 1. Every other file
    is opened as a PDF to read its page count, then each page (numbered from 1)
    is rendered, sent to ``ocr.extract_structured``, and parsed with
    ``parse_structured_page``. The parsed pages are combined by
    ``merge_structured_pages`` together with *filename*.
    """
    from utils.ocr_structure import merge_structured_pages, parse_structured_page

    structured_pages = []
    if _is_image_suffix(_suffix_from_filename(filename)):
        logger.info("MiniCPM-V structured OCR on image %s", filename or "upload")
        image_raw = ocr.extract_structured(_image_to_png_bytes(file_bytes))
        structured_pages.append(parse_structured_page(image_raw, page_number=1))
        return merge_structured_pages(structured_pages, filename)

    # Only the page count is needed from the open document; each page is
    # rendered later from the raw bytes.
    with fitz.open(stream=file_bytes, filetype="pdf") as pdf:
        total_pages = pdf.page_count

    for current in range(1, total_pages + 1):
        logger.info("MiniCPM-V structured OCR page %d/%d", current, total_pages)
        page_raw = _modal_structured_page(file_bytes, current, ocr)
        structured_pages.append(parse_structured_page(page_raw, page_number=current))
    return merge_structured_pages(structured_pages, filename)
def extract_document_ocr(
    file_bytes: bytes,
    filename: Optional[str],
    ocr: MiniCPMVOCR,
) -> str:
    """Full-document OCR via MiniCPM-V (Document OCR UI).

    Image uploads are OCR'd once. Every other file is opened as a PDF to read
    its page count, each page is OCR'd in order, blank page results are
    skipped, and the rest are joined with blank lines. The final text is
    passed through ``_clean_spatial_text``.
    """
    if _is_image_suffix(_suffix_from_filename(filename)):
        image_text = ocr.extract_text(_image_to_png_bytes(file_bytes))
        return _clean_spatial_text(image_text)

    # Only the page count is needed from the open document.
    with fitz.open(stream=file_bytes, filetype="pdf") as pdf:
        total_pages = pdf.page_count

    page_texts: List[str] = []
    for current in range(1, total_pages + 1):
        logger.info("MiniCPM-V OCR page %d/%d", current, total_pages)
        page_texts.append(_modal_ocr_page(file_bytes, current, ocr))

    non_blank = [chunk for chunk in page_texts if chunk.strip()]
    return _clean_spatial_text("\n\n".join(non_blank))
def extract_text(
    file_bytes: bytes,
    filename: Optional[str],
    ocr: MiniCPMVOCR,
) -> str:
    """Alias for ``extract_document_ocr``; forwards all arguments unchanged."""
    return extract_document_ocr(file_bytes=file_bytes, filename=filename, ocr=ocr)
def preview_page_base64(
    file_bytes: bytes,
    page_num: int = 1,
    filename: Optional[str] = None,
) -> Optional[str]:
    """Return a base64 (ASCII) preview of an upload, or ``None`` if rendering fails.

    Image uploads (by extension) are base64-encoded as-is and ``page_num`` is
    ignored. Every other file is rendered with ``render_page_png_base64``; any
    exception from that call is logged as a warning and ``None`` is returned.

    NOTE(review): image uploads keep their original encoding (e.g. JPEG/TIFF),
    while the document path presumably yields PNG data — confirm that callers
    do not assume PNG for both.
    """
    is_image = _is_image_suffix(_suffix_from_filename(filename))
    if not is_image:
        try:
            return render_page_png_base64(file_bytes, page_num=page_num)
        except Exception as exc:
            logger.warning("PDF preview render failed: %s", exc)
            return None

    encoded = base64.b64encode(file_bytes)
    return encoded.decode("ascii")