# Hosting-page residue captured with this file (not part of the module):
# Dar4devil's picture
# LexGuard backend
# c34b339
# Raw
# History Blame Contribute Delete
# 5.56 kB
"""Document parsing pipeline.
Format support:
PDF — pdfplumber (digital text), pytesseract OCR fallback (scanned/image PDFs)
DOCX — python-docx
TXT/MD — charset auto-detection (UTF-8 → UTF-16 → Latin-1)
Returns ParsedDocument with char-level page map for span-to-page resolution in the UI.
"""
from __future__ import annotations
import io
import logging
from dataclasses import dataclass, field
from typing import BinaryIO
logger = logging.getLogger(__name__)
@dataclass
class PageSpan:
    """Character range of the combined document text contributed by one page.

    The range is half-open: ``start`` is the first character offset that
    belongs to the page and ``end`` is one past the last, which is how
    ``page_for_offset`` tests membership (``start <= offset < end``).
    """

    # Page number; the PDF and OCR parsers number pages from 1.
    page: int
    # Inclusive start offset into ``ParsedDocument.text``.
    start: int
    # Exclusive end offset into ``ParsedDocument.text``.
    end: int
@dataclass
class ParsedDocument:
    """Result of parsing one uploaded document.

    Only ``text`` is required; every other field has a default so the
    individual parsers can fill in just what they know.
    """

    # Full extracted text of the document.
    text: str
    # Character spans mapping ranges of ``text`` back to page numbers.
    pages: list[PageSpan] = field(default_factory=list)
    # Short tag naming the parser path that produced the text.
    source_format: str = "unknown"
    # Length of ``text``; filled in by ``parse``.
    char_count: int = 0
    # Page count; filled in by ``parse``.
    page_count: int = 0
    # True when the text came from OCR rather than embedded text.
    ocr_used: bool = False
class UnsupportedFormat(Exception):
    """Raised when a document's type cannot be handled (e.g. an unrecognized file extension)."""
class EmptyDocument(Exception):
    """Raised when a document parses but yields no non-whitespace text."""
def parse(filename: str, raw: bytes) -> ParsedDocument:
    """Parse an uploaded file into a ``ParsedDocument``.

    The parser is chosen from the text after the last ``.`` in ``filename``
    (case-insensitive): ``pdf``, ``docx``, or ``txt``/``md``. A name with no
    dot at all is treated as plain text.

    Args:
        filename: Name of the uploaded file; only its extension is used.
        raw: Complete file contents.

    Returns:
        The parsed document with ``char_count`` set to the text length and
        ``page_count`` set to the number of page spans (at least 1). Because
        the parsers only record spans for pages that yielded text,
        ``page_count`` counts those pages only.

    Raises:
        UnsupportedFormat: The extension is not one of the supported types.
        EmptyDocument: The extracted text is empty or whitespace-only.
    """
    _, dot, extension = filename.rpartition(".")
    suffix = extension.lower() if dot else ""

    if suffix == "pdf":
        parsed = _parse_pdf(io.BytesIO(raw))
    elif suffix == "docx":
        parsed = _parse_docx(io.BytesIO(raw))
    elif suffix in {"txt", "md", ""}:
        parsed = _parse_text(raw)
    else:
        raise UnsupportedFormat(f"unsupported file type: .{suffix}")

    if parsed.text.strip() == "":
        raise EmptyDocument("document contains no extractable text")

    parsed.char_count = len(parsed.text)
    parsed.page_count = len(parsed.pages) if parsed.pages else 1
    return parsed
def _parse_pdf(stream: BinaryIO) -> ParsedDocument:
    """Extract embedded text from a PDF, falling back to OCR when there is none.

    Each page's text is stripped; pages that yield no text are skipped. The
    remaining page texts are joined with a blank line, and a ``PageSpan``
    records where each page's text sits in the combined string.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        A document with ``source_format="pdf"`` when any text was extracted,
        otherwise whatever ``_ocr_pdf`` returns for the same bytes.
    """
    import pdfplumber

    # Keep the bytes so the OCR fallback can re-read the same document.
    payload = stream.read()
    separator = "\n\n"
    chunks: list[str] = []
    spans: list[PageSpan] = []
    offset = 0

    with pdfplumber.open(io.BytesIO(payload)) as pdf:
        for number, page in enumerate(pdf.pages, start=1):
            page_text = (page.extract_text() or "").strip()
            if page_text:
                # Every page after the first is preceded by the separator.
                begin = offset + len(separator) if spans else offset
                offset = begin + len(page_text)
                spans.append(PageSpan(page=number, start=begin, end=offset))
                chunks.append(page_text)

    combined = separator.join(chunks)
    if combined.strip():
        return ParsedDocument(text=combined, pages=spans, source_format="pdf")

    # Digital extraction yielded nothing — attempt OCR on image-based PDF
    logger.info("PDF has no extractable text — attempting OCR pipeline")
    return _ocr_pdf(io.BytesIO(payload))
def _ocr_pdf(stream: BinaryIO) -> ParsedDocument:
    """Run OCR over every page of an image-based PDF (best effort).

    Pages are rasterized at 200 DPI and recognized as English text. Page
    texts are stripped, empty pages are skipped, and the rest are joined with
    a blank line, with a ``PageSpan`` recorded per page.

    This function does not raise for missing dependencies or OCR failures.
    Instead it returns a document with empty text whose ``source_format``
    says what went wrong (``"pdf_scanned_no_ocr"`` or ``"pdf_ocr_failed"``).

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        A document with ``source_format="pdf_ocr"`` and ``ocr_used=True`` on
        success, or an empty-text document as described above.
    """
    try:
        import pytesseract
        from pdf2image import convert_from_bytes  # type: ignore[import]
        # Image is not referenced below; the import only confirms Pillow is present.
        from PIL import Image  # type: ignore[import]
    except ImportError as exc:
        logger.warning(
            "OCR dependencies unavailable (%s). Install pytesseract + pdf2image + Pillow "
            "for scanned PDF support.",
            exc,
        )
        return ParsedDocument(text="", source_format="pdf_scanned_no_ocr")

    try:
        rendered = convert_from_bytes(stream.read(), dpi=200)
        separator = "\n\n"
        chunks: list[str] = []
        spans: list[PageSpan] = []
        offset = 0

        for number, image in enumerate(rendered, start=1):
            recognized = pytesseract.image_to_string(image, lang="eng").strip()
            if recognized:
                # Every page after the first is preceded by the separator.
                begin = offset + len(separator) if spans else offset
                offset = begin + len(recognized)
                spans.append(PageSpan(page=number, start=begin, end=offset))
                chunks.append(recognized)

        ocr_text = separator.join(chunks)
        logger.info("OCR extracted %d chars from %d page(s)", len(ocr_text), len(rendered))
        return ParsedDocument(text=ocr_text, pages=spans, source_format="pdf_ocr", ocr_used=True)
    except Exception as exc:
        # Deliberate catch-all: OCR is a fallback, so a failure becomes an empty result.
        logger.exception("OCR pipeline failed: %s", exc)
        return ParsedDocument(text="", source_format="pdf_ocr_failed")
def _parse_docx(stream: BinaryIO) -> ParsedDocument:
    """Extract text from a DOCX file's body paragraphs and tables.

    Non-empty paragraphs come first, separated by blank lines. Table rows
    follow (after all paragraphs, regardless of where the tables sit in the
    document), one row per line with non-empty cells joined by ``" | "``.
    The whole text is reported as a single page, because no page boundaries
    are read from the file.

    Args:
        stream: Binary stream positioned at the start of the DOCX data.

    Returns:
        A document with ``source_format="docx"``. ``pages`` holds one span
        covering the whole text, or is empty when no text was found.
    """
    from docx import Document

    document = Document(stream)
    parts: list[str] = []

    for para in document.paragraphs:
        text = para.text.strip()
        if not text:
            continue
        if parts:
            parts.append("\n\n")
        parts.append(text)

    for table in document.tables:
        for row in table.rows:
            # NOTE(review): python-docx appears to report a merged cell once per
            # grid column it spans, so merged text may repeat here — confirm
            # whether de-duplication is wanted.
            stripped_cells = (cell.text.strip() for cell in row.cells)
            cells = [cell_text for cell_text in stripped_cells if cell_text]
            if not cells:
                continue
            if parts:
                parts.append("\n")
            parts.append(" | ".join(cells))

    text = "".join(parts)
    pages = [PageSpan(page=1, start=0, end=len(text))] if text else []
    return ParsedDocument(text=text, pages=pages, source_format="docx")
def _parse_text(raw: bytes) -> ParsedDocument:
    """Decode a plain-text file and wrap it as a single-page document.

    Encodings are tried in order:

    1. UTF-8, using ``utf-8-sig`` so a leading byte-order mark is dropped
       instead of surviving as ``"\\ufeff"`` at the start of the text.
    2. UTF-16, but only when the payload starts with a UTF-16 BOM or contains
       NUL bytes. Without that guard almost any even-length Latin-1 payload
       decodes "successfully" as UTF-16 and comes out as garbage.
    3. Latin-1, which maps every byte and therefore cannot fail.

    Args:
        raw: Complete file contents.

    Returns:
        A document with ``source_format="text"`` whose stripped text is
        covered by a single page span.
    """
    import codecs

    text: str | None
    try:
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        text = None

    if text is None:
        has_utf16_bom = raw.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE))
        if has_utf16_bom or b"\x00" in raw:
            try:
                text = raw.decode("utf-16")
            except UnicodeDecodeError:
                text = None

    if text is None:
        # Last resort: Latin-1 accepts every byte sequence.
        text = raw.decode("latin-1")

    text = text.strip()
    return ParsedDocument(
        text=text,
        pages=[PageSpan(page=1, start=0, end=len(text))],
        source_format="text",
    )
def page_for_offset(pages: list[PageSpan], offset: int) -> int | None:
    """Return the page number that a character offset belongs to.

    An offset inside a span (``start <= offset < end``) resolves to that
    span's page. Offsets that fall outside every span resolve to the nearest
    sensible page rather than always to the last one:

    * in the separator gap between two pages, or past the end of the text,
      the page of the closest span starting at or before the offset;
    * before every span (e.g. a negative offset), the page of the earliest
      span.

    Args:
        pages: Page spans of a parsed document.
        offset: Character offset into the document text.

    Returns:
        The resolved page number, or ``None`` when ``pages`` is empty.
    """
    if not pages:
        return None

    preceding = None
    for span in pages:
        if span.start <= offset < span.end:
            return span.page
        # Remember the latest span that begins at or before the offset.
        if span.start <= offset and (preceding is None or span.start >= preceding.start):
            preceding = span

    if preceding is not None:
        return preceding.page
    # The offset lies before every span.
    return min(pages, key=lambda span: span.start).page