File size: 4,673 Bytes
7248d39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""PyMuPDF-based PDF parsing utilities."""

from __future__ import annotations

from dataclasses import dataclass
from typing import List, Tuple

import fitz
from PIL import Image

# A page whose stripped embedded text has fewer characters than this is
# flagged as sparse.
SPARSE_TEXT_THRESHOLD = 100
# Largest difference in y0 (relative to the first block of a line) for another
# block to be grouped onto that same line.
_LINE_Y_TOLERANCE = 4.0
# Horizontal distance that is rendered as one space character when rebuilding
# line text (presumably PDF points, as the name suggests — confirm).
_SPACE_POINTS = 3.5


@dataclass
class PDFPage:
    """Text and rendered image for one PDF page, as built by ``extract_pdf_pages``."""

    page_number: int  # 1-based position of the page in the document
    embedded_text: str  # unstripped result of page.get_text("text")
    image: Image.Image  # page rendered to an RGB PIL image
    is_sparse: bool  # True when the stripped text is shorter than SPARSE_TEXT_THRESHOLD


def extract_pdf_pages(file_bytes: bytes, dpi_scale: float = 2.0) -> List[PDFPage]:
    """Collect the embedded text and a rendered image for every page of a PDF.

    Args:
        file_bytes: Raw bytes of the PDF document.
        dpi_scale: Scale factor passed to ``fitz.Matrix`` for both axes when
            rendering each page.

    Returns:
        One ``PDFPage`` per page, numbered from 1, in document order.
    """
    document = fitz.open(stream=file_bytes, filetype="pdf")
    results = []
    try:
        for number, pdf_page in enumerate(document, start=1):
            text = pdf_page.get_text("text")
            pixmap = pdf_page.get_pixmap(matrix=fitz.Matrix(dpi_scale, dpi_scale))
            rendered = Image.frombytes(
                "RGB", [pixmap.width, pixmap.height], pixmap.samples
            )
            results.append(
                PDFPage(
                    page_number=number,
                    embedded_text=text,
                    image=rendered,
                    # Sparseness is judged on the stripped text, but the
                    # stored text keeps its original whitespace.
                    is_sparse=len(text.strip()) < SPARSE_TEXT_THRESHOLD,
                )
            )
    finally:
        # Always release the document, even if a page fails to render.
        document.close()
    return results


def _group_blocks_into_lines(
    blocks: List[Tuple[float, float, float, str]],
) -> List[List[Tuple[float, float, str]]]:
    blocks.sort(key=lambda item: (round(item[0], 1), item[1]))
    lines: List[List[Tuple[float, float, str]]] = []
    current_y: float | None = None
    current_line: List[Tuple[float, float, str]] = []

    for y0, x0, x1, text in blocks:
        if current_y is None or abs(y0 - current_y) > _LINE_Y_TOLERANCE:
            if current_line:
                lines.append(current_line)
            current_line = [(x0, x1, text)]
            current_y = y0
        else:
            current_line.append((x0, x1, text))

    if current_line:
        lines.append(current_line)
    return lines


def extract_page_spatial_text(page: fitz.Page) -> str:
    """Re-create a page's text layout from its native text blocks.

    Blocks are grouped into lines and each line is rebuilt left to right,
    turning horizontal distances into runs of spaces (one space per
    ``_SPACE_POINTS`` of distance) so that columns stay visually separated.

    Args:
        page: The page to read blocks from.

    Returns:
        The rebuilt text, one output line per grouped line, stripped of
        leading/trailing whitespace. If the page yields no non-empty text
        blocks, the stripped result of ``page.get_text("text", sort=True)``
        is returned instead.
    """
    positioned: List[Tuple[float, float, float, str]] = []
    for raw in page.get_text("blocks"):
        # Only entries whose field at index 6 is 0 are used; other block
        # kinds are skipped.
        if raw[6] != 0:
            continue
        left, top, right, _bottom, content, *_ = raw
        flattened = content.replace("\n", " ").strip()
        if flattened:
            positioned.append((top, left, right, flattened))

    if not positioned:
        return page.get_text("text", sort=True).strip()

    rendered: List[str] = []
    for row in _group_blocks_into_lines(positioned):
        pieces: List[str] = []
        prev_right = 0.0
        ordered = sorted(row, key=lambda segment: segment[0])
        for index, (left, right, content) in enumerate(ordered):
            if index == 0:
                # Indent the first block according to its distance from x = 0.
                pad = max(0, int(left / _SPACE_POINTS))
            else:
                # Later blocks are always separated by at least one space.
                pad = max(1, int((left - prev_right) / _SPACE_POINTS))
            pieces.append(" " * pad + content)
            prev_right = right
        rendered.append("".join(pieces).rstrip())

    return "\n".join(rendered).strip()


def extract_pdf_spatial_pages(file_bytes: bytes) -> List[Tuple[int, str, bool]]:
    """Return ``(page_num, text, is_sparse)`` for every page of a PDF.

    Pages whose stripped embedded text is shorter than
    ``SPARSE_TEXT_THRESHOLD`` are reported with that stripped text and
    ``is_sparse=True``; all other pages carry the layout-aware text from
    ``extract_page_spatial_text`` and ``is_sparse=False``. Page numbers
    start at 1.
    """
    document = fitz.open(stream=file_bytes, filetype="pdf")
    results: List[Tuple[int, str, bool]] = []
    try:
        for number, pdf_page in enumerate(document, start=1):
            plain = pdf_page.get_text("text").strip()
            sparse = len(plain) < SPARSE_TEXT_THRESHOLD
            # Spatial reconstruction is only attempted for non-sparse pages.
            text = plain if sparse else extract_page_spatial_text(pdf_page)
            results.append((number, text, sparse))
    finally:
        document.close()
    return results


def render_page_image(
    file_bytes: bytes, page_num: int, dpi_scale: float = 2.0
) -> Image.Image:
    """Render a single PDF page — used only when chart OCR is needed.

    Args:
        file_bytes: Raw bytes of the PDF document.
        page_num: 1-based number of the page to render.
        dpi_scale: Scale factor passed to ``fitz.Matrix`` for both axes.

    Returns:
        The rendered page as an RGB PIL image.

    Raises:
        IndexError: If ``page_num`` is less than 1. Page numbers past the end
            of the document are left to the document's own index check.
    """
    # Reject non-positive page numbers up front: ``page_num - 1`` would be a
    # negative index, which PyMuPDF resolves from the end of the document, so
    # e.g. page 0 would silently render the last page.
    if page_num < 1:
        raise IndexError(f"page_num must be >= 1, got {page_num}")

    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        page = doc[page_num - 1]
        mat = fitz.Matrix(dpi_scale, dpi_scale)
        pix = page.get_pixmap(matrix=mat)
        return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    finally:
        # Always release the document, even if rendering fails.
        doc.close()


def render_page_png_base64(file_bytes: bytes, page_num: int = 1, dpi_scale: float = 2.0) -> str:
    """Render one PDF page to PNG and return it as a base64 ASCII string.

    Args:
        file_bytes: Raw bytes of the PDF document.
        page_num: 1-based number of the page to render (defaults to the
            first page).
        dpi_scale: Scale factor passed to ``fitz.Matrix`` for both axes.

    Returns:
        The PNG-encoded page image, base64-encoded and decoded to ``str``.

    Raises:
        IndexError: If ``page_num`` is less than 1. Page numbers past the end
            of the document are left to the document's own index check.
    """
    import base64

    # Reject non-positive page numbers up front: ``page_num - 1`` would be a
    # negative index, which PyMuPDF resolves from the end of the document, so
    # e.g. page 0 would silently render the last page.
    if page_num < 1:
        raise IndexError(f"page_num must be >= 1, got {page_num}")

    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        page = doc[page_num - 1]
        mat = fitz.Matrix(dpi_scale, dpi_scale)
        pix = page.get_pixmap(matrix=mat)
        return base64.b64encode(pix.tobytes("png")).decode("ascii")
    finally:
        # Always release the document, even if rendering fails.
        doc.close()