"""Document parsing pipeline. Format support: PDF — pdfplumber (digital text), pytesseract OCR fallback (scanned/image PDFs) DOCX — python-docx TXT/MD — charset auto-detection (UTF-8 → UTF-16 → Latin-1) Returns ParsedDocument with char-level page map for span-to-page resolution in the UI. """ from __future__ import annotations import io import logging from dataclasses import dataclass, field from typing import BinaryIO logger = logging.getLogger(__name__) @dataclass class PageSpan: page: int start: int end: int @dataclass class ParsedDocument: text: str pages: list[PageSpan] = field(default_factory=list) source_format: str = "unknown" char_count: int = 0 page_count: int = 0 ocr_used: bool = False class UnsupportedFormat(Exception): pass class EmptyDocument(Exception): pass def parse(filename: str, raw: bytes) -> ParsedDocument: suffix = filename.rsplit(".", 1)[-1].lower() if "." in filename else "" if suffix == "pdf": doc = _parse_pdf(io.BytesIO(raw)) elif suffix in ("docx",): doc = _parse_docx(io.BytesIO(raw)) elif suffix in ("txt", "md", ""): doc = _parse_text(raw) else: raise UnsupportedFormat(f"unsupported file type: .{suffix}") if not doc.text.strip(): raise EmptyDocument("document contains no extractable text") doc.char_count = len(doc.text) doc.page_count = len(doc.pages) or 1 return doc def _parse_pdf(stream: BinaryIO) -> ParsedDocument: import pdfplumber parts: list[str] = [] pages: list[PageSpan] = [] cursor = 0 raw_bytes = stream.read() with pdfplumber.open(io.BytesIO(raw_bytes)) as pdf: for idx, page in enumerate(pdf.pages, start=1): text = (page.extract_text() or "").strip() if not text: continue if parts: parts.append("\n\n") cursor += 2 start = cursor parts.append(text) cursor += len(text) pages.append(PageSpan(page=idx, start=start, end=cursor)) text = "".join(parts) if text.strip(): return ParsedDocument(text=text, pages=pages, source_format="pdf") # Digital extraction yielded nothing — attempt OCR on image-based PDF logger.info("PDF has no extractable text — attempting OCR pipeline") return _ocr_pdf(io.BytesIO(raw_bytes)) def _ocr_pdf(stream: BinaryIO) -> ParsedDocument: try: import pytesseract from pdf2image import convert_from_bytes # type: ignore[import] from PIL import Image # type: ignore[import] except ImportError as exc: logger.warning( "OCR dependencies unavailable (%s). Install pytesseract + pdf2image + Pillow " "for scanned PDF support.", exc, ) return ParsedDocument(text="", source_format="pdf_scanned_no_ocr") try: images = convert_from_bytes(stream.read(), dpi=200) parts: list[str] = [] pages: list[PageSpan] = [] cursor = 0 for idx, img in enumerate(images, start=1): text = pytesseract.image_to_string(img, lang="eng").strip() if not text: continue if parts: parts.append("\n\n") cursor += 2 start = cursor parts.append(text) cursor += len(text) pages.append(PageSpan(page=idx, start=start, end=cursor)) ocr_text = "".join(parts) logger.info("OCR extracted %d chars from %d page(s)", len(ocr_text), len(images)) return ParsedDocument(text=ocr_text, pages=pages, source_format="pdf_ocr", ocr_used=True) except Exception as exc: logger.exception("OCR pipeline failed: %s", exc) return ParsedDocument(text="", source_format="pdf_ocr_failed") def _parse_docx(stream: BinaryIO) -> ParsedDocument: from docx import Document document = Document(stream) parts: list[str] = [] cursor = 0 pages: list[PageSpan] = [] for para in document.paragraphs: text = para.text.strip() if not text: continue if parts: parts.append("\n\n") cursor += 2 parts.append(text) cursor += len(text) for table in document.tables: for row in table.rows: cells = [cell.text.strip() for cell in row.cells if cell.text.strip()] if not cells: continue line = " | ".join(cells) if parts: parts.append("\n") cursor += 1 parts.append(line) cursor += len(line) text = "".join(parts) if text: pages.append(PageSpan(page=1, start=0, end=len(text))) return ParsedDocument(text=text, pages=pages, source_format="docx") def _parse_text(raw: bytes) -> ParsedDocument: for encoding in ("utf-8", "utf-16", "latin-1"): try: text = raw.decode(encoding).strip() break except UnicodeDecodeError: continue else: raise UnsupportedFormat("could not decode text file") return ParsedDocument( text=text, pages=[PageSpan(page=1, start=0, end=len(text))], source_format="text", ) def page_for_offset(pages: list[PageSpan], offset: int) -> int | None: for span in pages: if span.start <= offset < span.end: return span.page return pages[-1].page if pages else None