Spaces:
Sleeping
Sleeping
File size: 5,556 Bytes
c34b339 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | """Document parsing pipeline.
Format support:
PDF — pdfplumber (digital text), pytesseract OCR fallback (scanned/image PDFs)
DOCX — python-docx
TXT/MD — charset auto-detection (UTF-8 → UTF-16 → Latin-1)
Returns ParsedDocument with char-level page map for span-to-page resolution in the UI.
"""
from __future__ import annotations
import io
import logging
from dataclasses import dataclass, field
from typing import BinaryIO
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class PageSpan:
    """Character range of the combined document text that belongs to one page.

    The range is half-open: ``start`` is the offset of the page's first
    character and ``end`` is one past its last character.
    """

    # Page number recorded by the parser that produced the span.
    page: int
    # Offset (inclusive) where this page's text begins.
    start: int
    # Offset (exclusive) where this page's text stops.
    end: int
@dataclass
class ParsedDocument:
    """Extracted text of one document together with parsing metadata."""

    # Full extracted text.
    text: str
    # Per-page character ranges into ``text``; a fresh list per instance.
    pages: list[PageSpan] = field(default_factory=list)
    # Label describing which parser/path produced the text.
    source_format: str = "unknown"
    # Length of ``text``; defaults to 0 until a caller fills it in.
    char_count: int = 0
    # Number of page spans; defaults to 0 until a caller fills it in.
    page_count: int = 0
    # True when the text was obtained through OCR.
    ocr_used: bool = False
class UnsupportedFormat(Exception):
    """Raised when a file cannot be handled by any of the available parsers."""
class EmptyDocument(Exception):
    """Raised when a document yields no extractable text."""
def parse(filename: str, raw: bytes) -> ParsedDocument:
    """Parse an uploaded file into a ``ParsedDocument``.

    The parser is selected from the lower-cased text after the last ``.`` in
    *filename*; a name containing no dot is handled as plain text.

    Args:
        filename: Name of the uploaded file; only its extension is inspected.
        raw: Complete file contents.

    Returns:
        The parsed document with ``char_count`` and ``page_count`` filled in.

    Raises:
        UnsupportedFormat: The extension is not pdf, docx, txt, md or empty.
        EmptyDocument: The selected parser produced only whitespace.
    """
    _, dot, tail = filename.rpartition(".")
    extension = tail.lower() if dot else ""

    if extension == "pdf":
        parsed = _parse_pdf(io.BytesIO(raw))
    elif extension == "docx":
        parsed = _parse_docx(io.BytesIO(raw))
    elif extension in {"txt", "md", ""}:
        parsed = _parse_text(raw)
    else:
        raise UnsupportedFormat(f"unsupported file type: .{extension}")

    has_content = bool(parsed.text.strip())
    if not has_content:
        raise EmptyDocument("document contains no extractable text")

    parsed.char_count = len(parsed.text)
    # A document with text but no recorded spans still counts as one page.
    parsed.page_count = max(len(parsed.pages), 1)
    return parsed
def _parse_pdf(stream: BinaryIO) -> ParsedDocument:
    """Extract text from a PDF, falling back to OCR when no text is found.

    Each page's text is stripped; pages with no text are skipped. The
    remaining pages are joined with a blank line and a ``PageSpan`` records
    where each page's text sits in the combined string.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        A document with ``source_format="pdf"`` when digital extraction found
        text; otherwise whatever ``_ocr_pdf`` returns for the same bytes.
    """
    import pdfplumber

    separator = "\n\n"
    # Keep the bytes so the OCR fallback can re-read the same PDF.
    payload = stream.read()
    chunks: list[str] = []
    spans: list[PageSpan] = []
    offset = 0

    with pdfplumber.open(io.BytesIO(payload)) as pdf:
        for number, page in enumerate(pdf.pages, start=1):
            extracted = page.extract_text()
            page_text = extracted.strip() if extracted else ""
            if page_text:
                if chunks:
                    chunks.append(separator)
                    offset += len(separator)
                span_end = offset + len(page_text)
                chunks.append(page_text)
                spans.append(PageSpan(page=number, start=offset, end=span_end))
                offset = span_end

    combined = "".join(chunks)
    if not combined.strip():
        # Digital extraction yielded nothing — attempt OCR on image-based PDF
        logger.info("PDF has no extractable text — attempting OCR pipeline")
        return _ocr_pdf(io.BytesIO(payload))
    return ParsedDocument(text=combined, pages=spans, source_format="pdf")
def _ocr_pdf(stream: BinaryIO) -> ParsedDocument:
    """Run OCR over every page of a PDF and return the recognised text.

    This is a best-effort path: missing OCR packages or any failure during
    rendering/recognition is logged and reported as a document with empty
    text rather than raised.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        ``source_format="pdf_ocr"`` with ``ocr_used=True`` on success,
        ``"pdf_scanned_no_ocr"`` when the OCR imports fail, or
        ``"pdf_ocr_failed"`` when the OCR run raises.
    """
    try:
        import pytesseract
        from pdf2image import convert_from_bytes  # type: ignore[import]
        # Imported only to confirm Pillow is installed.
        from PIL import Image  # type: ignore[import]
    except ImportError as missing:
        logger.warning(
            "OCR dependencies unavailable (%s). Install pytesseract + pdf2image + Pillow "
            "for scanned PDF support.",
            missing,
        )
        return ParsedDocument(text="", source_format="pdf_scanned_no_ocr")

    separator = "\n\n"
    try:
        rendered = convert_from_bytes(stream.read(), dpi=200)
        chunks: list[str] = []
        spans: list[PageSpan] = []
        offset = 0
        for number, image in enumerate(rendered, start=1):
            recognised = pytesseract.image_to_string(image, lang="eng").strip()
            if recognised:
                if chunks:
                    chunks.append(separator)
                    offset += len(separator)
                span_end = offset + len(recognised)
                chunks.append(recognised)
                spans.append(PageSpan(page=number, start=offset, end=span_end))
                offset = span_end
        combined = "".join(chunks)
        logger.info("OCR extracted %d chars from %d page(s)", len(combined), len(rendered))
        return ParsedDocument(text=combined, pages=spans, source_format="pdf_ocr", ocr_used=True)
    except Exception as failure:
        logger.exception("OCR pipeline failed: %s", failure)
        return ParsedDocument(text="", source_format="pdf_ocr_failed")
def _parse_docx(stream: BinaryIO) -> ParsedDocument:
    """Extract paragraph and table text from a DOCX file.

    Non-empty paragraphs come first, separated by blank lines. After them,
    every table row that has at least one non-empty cell is added on its own
    line, with its non-empty cells joined by ``" | "``. The whole text is
    reported as a single page.

    Args:
        stream: Binary stream containing the DOCX data.

    Returns:
        A document with ``source_format="docx"``; ``pages`` is empty when no
        text was found, otherwise one span covering all of the text.
    """
    from docx import Document

    document = Document(stream)

    stripped_paragraphs = (para.text.strip() for para in document.paragraphs)
    body = "\n\n".join(chunk for chunk in stripped_paragraphs if chunk)

    row_lines: list[str] = []
    for table in document.tables:
        for row in table.rows:
            stripped_cells = (cell.text.strip() for cell in row.cells)
            filled = [value for value in stripped_cells if value]
            if filled:
                row_lines.append(" | ".join(filled))

    # Rows follow the paragraph body, each preceded by a single newline.
    segments = ([body] if body else []) + row_lines
    text = "\n".join(segments)

    pages: list[PageSpan] = []
    if text:
        pages.append(PageSpan(page=1, start=0, end=len(text)))
    return ParsedDocument(text=text, pages=pages, source_format="docx")
def _parse_text(raw: bytes) -> ParsedDocument:
    """Decode a plain-text file and wrap it as a single-page document.

    Decoding is attempted in this order:

    1. UTF-8 via ``utf-8-sig``, so a leading byte-order mark is dropped
       rather than surviving as a U+FEFF character (which ``str.strip`` does
       not treat as whitespace).
    2. UTF-16, but only when the data starts with a UTF-16 BOM or contains
       NUL bytes. BOM-less UTF-16 decoding accepts almost any even-length
       byte string, so trying it unconditionally turns Latin-1 files into
       mojibake instead of reaching step 3.
    3. Latin-1, which maps every byte to a character and cannot fail.

    Because step 3 always succeeds, this function never raises for
    undecodable input.

    Args:
        raw: Complete file contents.

    Returns:
        A document with ``source_format="text"`` whose stripped text is
        covered by one ``PageSpan`` for page 1.
    """
    text: str | None = None
    try:
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        has_utf16_bom = raw.startswith((b"\xff\xfe", b"\xfe\xff"))
        if has_utf16_bom or b"\x00" in raw:
            try:
                text = raw.decode("utf-16")
            except UnicodeDecodeError:
                # e.g. truncated data; fall through to Latin-1.
                text = None

    if text is None:
        text = raw.decode("latin-1")

    text = text.strip()
    return ParsedDocument(
        text=text,
        pages=[PageSpan(page=1, start=0, end=len(text))],
        source_format="text",
    )
def page_for_offset(pages: list[PageSpan], offset: int) -> int | None:
    """Return the page number that a character offset belongs to.

    Spans are half-open (``start <= offset < end``). An offset that lies in
    no span — for example inside the separator between two pages, or exactly
    at a span's exclusive end — resolves to the nearest span ending at or
    before it. Previously such offsets were always reported as the last page
    of the document.

    Args:
        pages: Page spans to search; they need not be sorted.
        offset: Character offset into the document text.

    Returns:
        The matching page number; the closest preceding page for offsets
        between or after spans; the earliest page for offsets before every
        span; ``None`` when *pages* is empty.
    """
    if not pages:
        return None

    for span in pages:
        if span.start <= offset < span.end:
            return span.page

    # No span contains the offset: attribute it to the page that ends closest
    # before it, so a separator is credited to the page it follows.
    preceding = [span for span in pages if span.end <= offset]
    if preceding:
        return max(preceding, key=lambda span: span.end).page

    # Offset is before every span (e.g. negative): clamp to the earliest page.
    return min(pages, key=lambda span: span.start).page
|