Spaces:
Sleeping
Sleeping
| """Document parsing pipeline. | |
| Format support: | |
| PDF — pdfplumber (digital text), pytesseract OCR fallback (scanned/image PDFs) | |
| DOCX — python-docx | |
| TXT/MD — charset auto-detection (UTF-8 → UTF-16 → Latin-1) | |
| Returns ParsedDocument with char-level page map for span-to-page resolution in the UI. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import BinaryIO | |
| logger = logging.getLogger(__name__) | |
@dataclass
class PageSpan:
    """Character range of the combined document text attributed to one page.

    The parsers in this module build these while concatenating page text, so
    ``start`` is the offset of the page's first character and ``end`` is the
    offset just past its last one (half-open ``[start, end)``).

    The class must be a dataclass: every parser constructs it with keyword
    arguments (``PageSpan(page=..., start=..., end=...)``), which a plain
    class with bare annotations would reject.
    """

    page: int  # page number; the parsers here number pages starting at 1
    start: int  # inclusive character offset into the combined text
    end: int  # exclusive character offset into the combined text
@dataclass
class ParsedDocument:
    """Result of parsing one uploaded file.

    Only ``text`` is required; the remaining fields have defaults so the
    parsers can return a minimal instance and let the caller fill in the
    counters afterwards. The instance is intentionally mutable for that
    reason.

    The class must be a dataclass: ``field(default_factory=list)`` is only
    meaningful under ``@dataclass``, and the parsers construct instances with
    keyword arguments.
    """

    text: str  # full extracted text
    pages: list[PageSpan] = field(default_factory=list)  # per-page char ranges
    source_format: str = "unknown"  # label set by whichever parser ran
    char_count: int = 0  # filled in by the caller after parsing
    page_count: int = 0  # filled in by the caller after parsing
    ocr_used: bool = False  # True when the text came from the OCR path
class UnsupportedFormat(Exception):
    """Raised when the pipeline cannot handle the given file type."""
class EmptyDocument(Exception):
    """Raised when a parsed document yields no non-whitespace text."""
def parse(filename: str, raw: bytes) -> ParsedDocument:
    """Parse an uploaded file into a ``ParsedDocument``.

    The parser is chosen from the text after the last ``.`` in *filename*
    (case-insensitive). A name without any dot is treated as plain text.

    Args:
        filename: Name of the file; only its extension is inspected.
        raw: Complete file contents.

    Returns:
        The parsed document with ``char_count`` and ``page_count`` filled in.
        ``page_count`` is the number of page spans, but never less than 1.

    Raises:
        UnsupportedFormat: The extension is not pdf, docx, txt, md or empty.
        EmptyDocument: Parsing produced only whitespace (or nothing).
    """
    _, dot, extension = filename.rpartition(".")
    suffix = extension.lower() if dot else ""

    if suffix in ("txt", "md", ""):
        doc = _parse_text(raw)
    elif suffix == "docx":
        doc = _parse_docx(io.BytesIO(raw))
    elif suffix == "pdf":
        doc = _parse_pdf(io.BytesIO(raw))
    else:
        raise UnsupportedFormat(f"unsupported file type: .{suffix}")

    if doc.text.strip() == "":
        raise EmptyDocument("document contains no extractable text")

    doc.char_count = len(doc.text)
    # A document with text but no page map still counts as one page.
    doc.page_count = max(len(doc.pages), 1)
    return doc
def _parse_pdf(stream: BinaryIO) -> ParsedDocument:
    """Extract text from a PDF, falling back to OCR when none is found.

    Each page's text is stripped, and pages that yield no text are skipped.
    The remaining pages are joined with a blank line (``"\\n\\n"``). A
    ``PageSpan`` records where each kept page sits in the joined text, using
    the page's 1-based position in the PDF.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        A document with ``source_format="pdf"`` when digital extraction found
        text; otherwise whatever ``_ocr_pdf`` returns for the same bytes.
    """
    import pdfplumber

    # Keep the bytes around: the OCR fallback needs a fresh stream over them.
    payload = stream.read()
    chunks: list[str] = []
    spans: list[PageSpan] = []
    offset = 0

    with pdfplumber.open(io.BytesIO(payload)) as pdf:
        for number, page in enumerate(pdf.pages, start=1):
            extracted = page.extract_text()
            page_text = extracted.strip() if extracted else ""
            if page_text:
                if chunks:
                    # Separator between pages; it belongs to no span.
                    chunks.append("\n\n")
                    offset += 2
                begin = offset
                offset = begin + len(page_text)
                chunks.append(page_text)
                spans.append(PageSpan(page=number, start=begin, end=offset))

    combined = "".join(chunks)
    if not combined.strip():
        # No digital text at all, so treat the file as a scanned/image PDF.
        logger.info("PDF has no extractable text — attempting OCR pipeline")
        return _ocr_pdf(io.BytesIO(payload))
    return ParsedDocument(text=combined, pages=spans, source_format="pdf")
def _ocr_pdf(stream: BinaryIO) -> ParsedDocument:
    """Run OCR over a PDF and return the recognised text.

    This path is best-effort and never raises for a missing dependency or an
    OCR failure. In both cases it logs and returns a document with empty
    text, leaving the caller to decide what an empty result means.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        * ``source_format="pdf_ocr"`` with ``ocr_used=True`` when OCR ran;
          page images with no recognised text are skipped.
        * ``source_format="pdf_scanned_no_ocr"`` with empty text when an OCR
          dependency cannot be imported.
        * ``source_format="pdf_ocr_failed"`` with empty text when rendering
          or recognition raised.
    """
    try:
        import pytesseract
        from pdf2image import convert_from_bytes  # type: ignore[import]
        # Imported only to confirm Pillow is installed; not used directly.
        from PIL import Image  # type: ignore[import]
    except ImportError as exc:
        logger.warning(
            "OCR dependencies unavailable (%s). Install pytesseract + pdf2image + Pillow "
            "for scanned PDF support.",
            exc,
        )
        return ParsedDocument(text="", source_format="pdf_scanned_no_ocr")

    try:
        rendered = convert_from_bytes(stream.read(), dpi=200)
        chunks: list[str] = []
        spans: list[PageSpan] = []
        offset = 0
        for number, image in enumerate(rendered, start=1):
            recognised = pytesseract.image_to_string(image, lang="eng").strip()
            if recognised:
                if chunks:
                    # Separator between pages; it belongs to no span.
                    chunks.append("\n\n")
                    offset += 2
                begin = offset
                offset = begin + len(recognised)
                chunks.append(recognised)
                spans.append(PageSpan(page=number, start=begin, end=offset))
        combined = "".join(chunks)
        logger.info("OCR extracted %d chars from %d page(s)", len(combined), len(rendered))
        return ParsedDocument(text=combined, pages=spans, source_format="pdf_ocr", ocr_used=True)
    except Exception as exc:
        # Deliberately broad: any rendering/OCR failure degrades to "no text".
        logger.exception("OCR pipeline failed: %s", exc)
        return ParsedDocument(text="", source_format="pdf_ocr_failed")
def _parse_docx(stream: BinaryIO) -> ParsedDocument:
    """Extract paragraph and table text from a DOCX file.

    Non-empty paragraphs are stripped and joined with a blank line. Table
    rows follow after all paragraphs, one row per line, with the row's
    non-empty cells joined by ``" | "``. No per-page information is produced
    here, so the whole text is reported as a single span on page 1.

    Args:
        stream: Binary stream positioned at the start of the DOCX data.

    Returns:
        A document with ``source_format="docx"``; ``pages`` is empty when no
        text was found.
    """
    from docx import Document

    document = Document(stream)

    stripped_paragraphs = (para.text.strip() for para in document.paragraphs)
    paragraphs = [text for text in stripped_paragraphs if text]
    parts: list[str] = ["\n\n".join(paragraphs)] if paragraphs else []

    for table in document.tables:
        for row in table.rows:
            # Read each cell's text once, then drop the empty ones.
            stripped_cells = (cell.text.strip() for cell in row.cells)
            cells = [text for text in stripped_cells if text]
            if not cells:
                continue
            if parts:
                parts.append("\n")
            parts.append(" | ".join(cells))

    text = "".join(parts)
    pages = [PageSpan(page=1, start=0, end=len(text))] if text else []
    return ParsedDocument(text=text, pages=pages, source_format="docx")
def _parse_text(raw: bytes) -> ParsedDocument:
    """Decode a plain-text file and wrap it as a single-page document.

    Decoding order:

    1. UTF-8, dropping a leading byte-order mark if present, so the BOM does
       not end up as an invisible first character of the text.
    2. UTF-16, but only when the data starts with a UTF-16 BOM. Without a BOM
       almost any even-length byte string "decodes" as UTF-16, which would
       turn ordinary Latin-1 text into unrelated characters.
    3. Latin-1, which maps every byte to a character and so always succeeds.

    Args:
        raw: Complete file contents.

    Returns:
        A document with ``source_format="text"`` whose stripped text is
        covered by one span on page 1.
    """
    text: str | None
    try:
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        text = None

    if text is None and raw.startswith((b"\xff\xfe", b"\xfe\xff")):
        try:
            text = raw.decode("utf-16")
        except UnicodeDecodeError:
            # Truncated or malformed UTF-16: fall through to Latin-1.
            text = None

    if text is None:
        text = raw.decode("latin-1")

    text = text.strip()
    return ParsedDocument(
        text=text,
        pages=[PageSpan(page=1, start=0, end=len(text))],
        source_format="text",
    )
def page_for_offset(pages: list[PageSpan], offset: int) -> int | None:
    """Return the page number that a character offset belongs to.

    An offset inside a span (``start <= offset < end``) resolves to that
    span's page. Offsets that fall in no span are resolved as follows:

    * An offset in the separator between two pages, equal to a span's
      exclusive ``end``, or past the end of the text resolves to the page of
      the closest span starting at or before it (the preceding page). It
      does not jump to the last page of the document.
    * An offset before every span resolves to the first span's page.

    Args:
        pages: Page spans of a parsed document.
        offset: Character offset into the document text.

    Returns:
        The resolved page number, or ``None`` when *pages* is empty.
    """
    if not pages:
        return None

    preceding = None
    for span in pages:
        if span.start > offset:
            continue
        if offset < span.end:
            return span.page
        # Remember the latest-starting span that begins at or before offset.
        if preceding is None or span.start >= preceding.start:
            preceding = span

    if preceding is None:
        return pages[0].page
    return preceding.page