File size: 5,556 Bytes
c34b339
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""Document parsing pipeline.

Format support:
  PDF     — pdfplumber (digital text), pytesseract OCR fallback (scanned/image PDFs)
  DOCX    — python-docx
  TXT/MD  — charset auto-detection (UTF-8 → UTF-16 → Latin-1)

Returns ParsedDocument with char-level page map for span-to-page resolution in the UI.
"""

from __future__ import annotations

import io
import logging
from dataclasses import dataclass, field
from typing import BinaryIO

logger = logging.getLogger(__name__)


@dataclass
class PageSpan:
    """Character range of the combined document text that belongs to one page.

    The range is half-open: ``start`` is inclusive and ``end`` is exclusive,
    which is how ``page_for_offset`` tests membership.
    """

    page: int  # page number; the PDF/OCR parsers number pages from 1
    start: int  # offset of the page's first character in the combined text
    end: int  # offset one past the page's last character


@dataclass
class ParsedDocument:
    """Result of parsing one uploaded file.

    The format-specific parsers fill in ``text``, ``pages``, ``source_format``
    and ``ocr_used``; ``parse`` then derives ``char_count`` and ``page_count``.
    """

    text: str  # extracted text, pages/paragraphs joined by the parser
    pages: list[PageSpan] = field(default_factory=list)  # offset ranges per page
    # Parser tag: "pdf", "pdf_ocr", "pdf_scanned_no_ocr", "pdf_ocr_failed",
    # "docx" or "text"; "unknown" until a parser sets it.
    source_format: str = "unknown"
    char_count: int = 0  # len(text); set by parse()
    page_count: int = 0  # len(pages), or 1 when pages is empty; set by parse()
    ocr_used: bool = False  # True only when the text came from the OCR path


class UnsupportedFormat(Exception):
    """Raised when a file's type cannot be handled by any parser in this module."""


class EmptyDocument(Exception):
    """Raised when parsing succeeds but yields no non-whitespace text."""


def parse(filename: str, raw: bytes) -> ParsedDocument:
    """Parse an uploaded file into a ``ParsedDocument``.

    The parser is chosen from the text after the last ``.`` in *filename*
    (case-insensitive); a name without any dot is treated as plain text.

    Args:
        filename: Name of the file; only its extension is inspected.
        raw: Complete file contents.

    Returns:
        The parsed document with ``char_count`` and ``page_count`` filled in.

    Raises:
        UnsupportedFormat: The extension is not pdf, docx, txt, md or empty.
        EmptyDocument: The parser produced only whitespace (or nothing).
    """
    _, dot, tail = filename.rpartition(".")
    extension = tail.lower() if dot else ""

    if extension == "pdf":
        parsed = _parse_pdf(io.BytesIO(raw))
    elif extension == "docx":
        parsed = _parse_docx(io.BytesIO(raw))
    elif extension in {"txt", "md", ""}:
        parsed = _parse_text(raw)
    else:
        raise UnsupportedFormat(f"unsupported file type: .{extension}")

    if parsed.text.strip() == "":
        raise EmptyDocument("document contains no extractable text")

    parsed.char_count = len(parsed.text)
    # A document without any page span still counts as a single page.
    span_total = len(parsed.pages)
    parsed.page_count = span_total if span_total else 1
    return parsed


def _parse_pdf(stream: BinaryIO) -> ParsedDocument:
    """Extract the text layer of a PDF, falling back to OCR when there is none.

    Each page's text is stripped; pages with no text are skipped and the rest
    are joined with a blank line. One ``PageSpan`` is recorded per kept page,
    carrying the page's original 1-based number.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        A ``ParsedDocument`` tagged ``"pdf"``, or whatever ``_ocr_pdf`` returns
        when no page yields any text.
    """
    import pdfplumber

    separator = "\n\n"
    # Keep the bytes around so the OCR fallback can re-read the same file.
    payload = stream.read()

    chunks: list[str] = []
    spans: list[PageSpan] = []
    offset = 0

    with pdfplumber.open(io.BytesIO(payload)) as pdf:
        for number, pdf_page in enumerate(pdf.pages, start=1):
            extracted = pdf_page.extract_text()
            body = extracted.strip() if extracted else ""
            if body:
                if chunks:
                    # Account for the separator that precedes this page.
                    offset += len(separator)
                finish = offset + len(body)
                spans.append(PageSpan(page=number, start=offset, end=finish))
                chunks.append(body)
                offset = finish

    combined = separator.join(chunks)
    if not combined.strip():
        # Digital extraction yielded nothing — attempt OCR on image-based PDF
        logger.info("PDF has no extractable text — attempting OCR pipeline")
        return _ocr_pdf(io.BytesIO(payload))

    return ParsedDocument(text=combined, pages=spans, source_format="pdf")


def _ocr_pdf(stream: BinaryIO) -> ParsedDocument:
    """Rasterise a PDF and run Tesseract OCR over every page.

    This is a best-effort path: it never raises. Missing OCR dependencies and
    any runtime failure are logged and reported through ``source_format``.

    Args:
        stream: Binary stream positioned at the start of the PDF data.

    Returns:
        A ``ParsedDocument`` tagged ``"pdf_ocr"`` on success (possibly with
        empty text), ``"pdf_scanned_no_ocr"`` when an OCR dependency cannot be
        imported, or ``"pdf_ocr_failed"`` when rendering/recognition raises.
    """
    try:
        import pytesseract
        from pdf2image import convert_from_bytes  # type: ignore[import]

        # Not referenced below; importing it makes a missing Pillow surface here.
        from PIL import Image  # type: ignore[import]
    except ImportError as missing:
        logger.warning(
            "OCR dependencies unavailable (%s). Install pytesseract + pdf2image + Pillow "
            "for scanned PDF support.",
            missing,
        )
        return ParsedDocument(text="", source_format="pdf_scanned_no_ocr")

    separator = "\n\n"
    try:
        rendered = convert_from_bytes(stream.read(), dpi=200)

        chunks: list[str] = []
        spans: list[PageSpan] = []
        offset = 0
        for number, image in enumerate(rendered, start=1):
            recognised = pytesseract.image_to_string(image, lang="eng").strip()
            if recognised:
                if chunks:
                    # Account for the separator that precedes this page.
                    offset += len(separator)
                finish = offset + len(recognised)
                spans.append(PageSpan(page=number, start=offset, end=finish))
                chunks.append(recognised)
                offset = finish

        combined = separator.join(chunks)
        logger.info("OCR extracted %d chars from %d page(s)", len(combined), len(rendered))
        return ParsedDocument(text=combined, pages=spans, source_format="pdf_ocr", ocr_used=True)
    except Exception as failure:
        logger.exception("OCR pipeline failed: %s", failure)
        return ParsedDocument(text="", source_format="pdf_ocr_failed")


def _parse_docx(stream: BinaryIO) -> ParsedDocument:
    """Extract paragraph and table text from a DOCX file.

    Non-empty paragraphs come first, separated by blank lines. Table rows
    follow, one per line, with the row's non-empty cells joined by ``" | "``.
    Tables are therefore emitted after all paragraphs rather than at their
    position in the document. The whole text is reported as a single page.

    Args:
        stream: Binary stream positioned at the start of the DOCX data.

    Returns:
        A ``ParsedDocument`` tagged ``"docx"``; ``pages`` is empty when no
        text was found.
    """
    from docx import Document

    document = Document(stream)

    stripped_paragraphs = (para.text.strip() for para in document.paragraphs)
    paragraphs = [content for content in stripped_paragraphs if content]

    rows: list[str] = []
    for table in document.tables:
        for row in table.rows:
            # NOTE(review): python-docx may report a horizontally merged cell
            # once per spanned column, which would repeat its text here —
            # confirm whether duplicates should be collapsed.
            stripped_cells = (cell.text.strip() for cell in row.cells)
            cells = [content for content in stripped_cells if content]
            if cells:
                rows.append(" | ".join(cells))

    body = "\n\n".join(paragraphs)
    # A single newline separates the paragraph block from the first table row
    # and each row from the next; no leading newline when there are no paragraphs.
    text = "\n".join([body, *rows] if body else rows)

    pages = [PageSpan(page=1, start=0, end=len(text))] if text else []
    return ParsedDocument(text=text, pages=pages, source_format="docx")


def _parse_text(raw: bytes) -> ParsedDocument:
    """Decode a plain-text file and wrap it as a single-page document.

    Decoding order:

    1. UTF-8, with a leading byte-order mark removed if present.
    2. UTF-16, but only when the data starts with a UTF-16 BOM or contains a
       NUL byte. Without that evidence the UTF-16 codec would accept almost
       any even-length byte string and turn Latin-1 text into mojibake.
    3. Latin-1, which maps every byte to a code point and so cannot fail.

    Args:
        raw: Complete file contents.

    Returns:
        A ``ParsedDocument`` tagged ``"text"`` whose stripped text is covered
        by one ``PageSpan`` for page 1.
    """
    candidates = ["utf-8-sig"]
    if raw.startswith((b"\xff\xfe", b"\xfe\xff")) or b"\x00" in raw:
        candidates.append("utf-16")

    for encoding in candidates:
        try:
            decoded = raw.decode(encoding)
            break
        except UnicodeDecodeError:
            continue
    else:
        # Last resort; always succeeds, so no "undecodable" error is needed.
        decoded = raw.decode("latin-1")

    text = decoded.strip()
    return ParsedDocument(
        text=text,
        pages=[PageSpan(page=1, start=0, end=len(text))],
        source_format="text",
    )


def page_for_offset(pages: list[PageSpan], offset: int) -> int | None:
    """Return the page number that a character offset belongs to.

    Lookup rules, in order:

    1. If a span contains *offset* (``start <= offset < end``), return its page.
    2. Otherwise return the page of the closest span ending at or before
       *offset*. This covers offsets in the separator between two pages and
       offsets at or past the end of the text.
    3. Otherwise (the offset precedes every span) return the last page.

    Args:
        pages: Page spans of a parsed document.
        offset: Character offset into the document text.

    Returns:
        The resolved page number, or ``None`` when *pages* is empty.
    """
    preceding = None
    for span in pages:
        if span.start <= offset < span.end:
            return span.page
        # Remember the nearest span that ends at or before the offset.
        if span.end <= offset and (preceding is None or span.end > preceding.end):
            preceding = span

    if preceding is not None:
        return preceding.page
    return pages[-1].page if pages else None