File size: 10,247 Bytes
c4ef1cf
 
 
 
 
 
 
 
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
 
 
 
9513cca
c4ef1cf
 
 
 
 
 
 
 
 
9513cca
 
 
 
 
c4ef1cf
9513cca
 
 
 
 
 
 
c4ef1cf
9513cca
 
c4ef1cf
9513cca
c4ef1cf
 
 
 
 
 
 
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
 
 
 
 
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
9513cca
 
 
 
 
c4ef1cf
 
9513cca
c4ef1cf
 
 
 
 
 
 
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
 
 
 
 
9513cca
c4ef1cf
 
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
 
9513cca
 
 
 
c4ef1cf
9513cca
c4ef1cf
 
 
 
 
 
 
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
9513cca
 
c4ef1cf
 
 
 
 
 
 
9513cca
c4ef1cf
9513cca
c4ef1cf
 
 
9513cca
c4ef1cf
 
 
 
 
 
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
9513cca
c4ef1cf
 
 
 
9513cca
c4ef1cf
 
 
 
 
 
9513cca
c4ef1cf
 
 
 
 
 
 
 
 
 
 
9513cca
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
"""
PDF Processor - Convert PDFs to images and extract text.

This module works INDEPENDENTLY of embedding and vector storage.
Use it if you just need PDF β†’ images conversion.

Features:
- Batch processing to save memory
- Text extraction with surrogate character handling
- Configurable DPI and quality settings
"""

import gc
import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

from PIL import Image

logger = logging.getLogger(__name__)


class PDFProcessor:
    """
    Process PDFs into images and text for visual retrieval.

    Works independently - no embedding or storage dependencies.

    Args:
        dpi: DPI for image conversion (higher = better quality)
        output_format: Image format (RGB, L, etc.)
        page_batch_size: Pages per batch for memory efficiency

    Example:
        >>> processor = PDFProcessor(dpi=140)
        >>>
        >>> # Convert single PDF
        >>> images, texts = processor.process_pdf(Path("report.pdf"))
        >>>
        >>> # Stream large PDFs (yields 3-tuples: images, texts, start page)
        >>> for images, texts, start_page in processor.stream_pdf(Path("large.pdf"), batch_size=10):
        ...     # Process each batch
        ...     pass
    """

    def __init__(
        self,
        dpi: int = 140,
        output_format: str = "RGB",
        page_batch_size: int = 50,
    ):
        self.dpi = dpi
        self.output_format = output_format
        self.page_batch_size = page_batch_size

        # PDF deps are optional: we only require them when calling PDF-specific methods.
        # This keeps the class usable for helper utilities like `resize_for_colpali()`
        # even in minimal installs.
        self._pdf_deps_available = True
        try:
            import pdf2image  # noqa: F401
            import pypdf  # noqa: F401
        except Exception:
            # Broad on purpose: any import failure (missing package, broken
            # native deps) means "PDF features unavailable", not a crash here.
            self._pdf_deps_available = False

    def _require_pdf_deps(self) -> None:
        """Raise a helpful ImportError if the optional PDF stack is missing."""
        if not self._pdf_deps_available:
            raise ImportError(
                "PDF processing requires `pdf2image` and `pypdf`.\n"
                'Install with: pip install "visual-rag-toolkit[pdf]"'
            )

    def process_pdf(
        self,
        pdf_path: Path,
        dpi: Optional[int] = None,
    ) -> Tuple[List[Image.Image], List[str]]:
        """
        Convert PDF to images and extract text.

        Args:
            pdf_path: Path to PDF file
            dpi: Override default DPI

        Returns:
            Tuple of (list of images, list of page texts)

        Raises:
            ImportError: If optional PDF dependencies are missing.
            ValueError: If the number of rendered images does not match the
                number of extracted page texts.
        """
        self._require_pdf_deps()
        from pdf2image import convert_from_path
        from pypdf import PdfReader

        # `is None` (not `or`): don't silently replace a falsy explicit value.
        dpi = self.dpi if dpi is None else dpi
        pdf_path = Path(pdf_path)

        logger.info("πŸ“„ Processing PDF: %s", pdf_path.name)

        # Extract text (with surrogate-character sanitization) for every page.
        reader = PdfReader(str(pdf_path))
        total_pages = len(reader.pages)

        page_texts = [
            self._sanitize_text(page.extract_text() or "") for page in reader.pages
        ]

        # Convert to images in batches to bound peak memory.
        all_images: List[Image.Image] = []
        for start_page in range(1, total_pages + 1, self.page_batch_size):
            end_page = min(start_page + self.page_batch_size - 1, total_pages)

            batch_images = convert_from_path(
                str(pdf_path),
                dpi=dpi,
                fmt=self.output_format.lower(),
                first_page=start_page,  # pdf2image pages are 1-indexed
                last_page=end_page,
            )

            all_images.extend(batch_images)

            # Drop the batch reference and collect promptly; rendered pages
            # are large.
            del batch_images
            gc.collect()

        # Explicit raise (was an `assert`, which is stripped under `python -O`).
        if len(all_images) != len(page_texts):
            raise ValueError(
                f"Mismatch: {len(all_images)} images vs {len(page_texts)} texts"
            )

        logger.info("βœ… Processed %d pages", len(all_images))
        return all_images, page_texts

    def stream_pdf(
        self,
        pdf_path: Path,
        batch_size: int = 10,
        dpi: Optional[int] = None,
    ) -> Generator[Tuple[List[Image.Image], List[str], int], None, None]:
        """
        Stream PDF processing for large files.

        Yields batches of (images, texts, start_page) without loading
        entire PDF into memory.

        Args:
            pdf_path: Path to PDF file
            batch_size: Pages per batch
            dpi: Override default DPI

        Yields:
            Tuple of (batch_images, batch_texts, start_page_number)

        Raises:
            ImportError: If optional PDF dependencies are missing.
        """
        self._require_pdf_deps()
        from pdf2image import convert_from_path
        from pypdf import PdfReader

        # `is None` (not `or`): don't silently replace a falsy explicit value.
        dpi = self.dpi if dpi is None else dpi
        pdf_path = Path(pdf_path)

        reader = PdfReader(str(pdf_path))
        total_pages = len(reader.pages)

        logger.info("πŸ“„ Streaming PDF: %s (%d pages)", pdf_path.name, total_pages)

        for start_idx in range(0, total_pages, batch_size):
            end_idx = min(start_idx + batch_size, total_pages)

            # Extract text for this batch only.
            batch_texts = [
                self._sanitize_text(reader.pages[page_idx].extract_text() or "")
                for page_idx in range(start_idx, end_idx)
            ]

            # Convert images for this batch only (pdf2image is 1-indexed).
            batch_images = convert_from_path(
                str(pdf_path),
                dpi=dpi,
                fmt=self.output_format.lower(),
                first_page=start_idx + 1,
                last_page=end_idx,
            )

            yield batch_images, batch_texts, start_idx + 1

            # Free the batch before rendering the next one.
            del batch_images
            gc.collect()

    def get_page_count(self, pdf_path: Path) -> int:
        """Get number of pages in PDF without loading images."""
        self._require_pdf_deps()
        from pypdf import PdfReader

        reader = PdfReader(str(pdf_path))
        return len(reader.pages)

    def resize_for_colpali(
        self,
        image: Image.Image,
        max_edge: int = 2048,
        tile_size: int = 512,
    ) -> Tuple[Image.Image, int, int]:
        """
        Resize image following ColPali/Idefics3 processor logic.

        Resizes to fit within tile grid without black padding.

        Args:
            image: PIL Image
            max_edge: Maximum edge length
            tile_size: Size of each tile

        Returns:
            Tuple of (resized_image, tile_rows, tile_cols)
        """
        # Ensure consistent mode for downstream processors (and predictable tests)
        if image.mode != "RGB":
            image = image.convert("RGB")

        w, h = image.size

        # Step 1: Resize so longest edge = max_edge
        if w > h:
            new_w = max_edge
            new_h = int(h * (max_edge / w))
        else:
            new_h = max_edge
            new_w = int(w * (max_edge / h))

        # Step 2: Calculate tile grid (ceiling division)
        tile_cols = (new_w + tile_size - 1) // tile_size
        tile_rows = (new_h + tile_size - 1) // tile_size

        # Step 3: Calculate exact dimensions for tiles
        final_w = tile_cols * tile_size
        final_h = tile_rows * tile_size

        # Step 4: Scale to fit within tile grid
        scale_w = final_w / w
        scale_h = final_h / h
        scale = min(scale_w, scale_h)

        # Clamp to >= 1 px: extreme aspect ratios could truncate to 0, which
        # Image.resize rejects.
        scaled_w = max(1, int(w * scale))
        scaled_h = max(1, int(h * scale))

        resized = image.resize((scaled_w, scaled_h), Image.LANCZOS)

        # Center on white canvas if needed
        if scaled_w != final_w or scaled_h != final_h:
            canvas = Image.new("RGB", (final_w, final_h), (255, 255, 255))
            offset_x = (final_w - scaled_w) // 2
            offset_y = (final_h - scaled_h) // 2
            canvas.paste(resized, (offset_x, offset_y))
            resized = canvas

        return resized, tile_rows, tile_cols

    def _sanitize_text(self, text: str) -> str:
        """Remove invalid Unicode characters (surrogates) from text."""
        if not text:
            return ""

        # Lone surrogates (U+D800-U+DFFF) are not encodable as UTF-8; dropping
        # them at encode time removes them in a single pass.
        return text.encode("utf-8", errors="ignore").decode("utf-8")

    def extract_metadata_from_filename(
        self,
        filename: str,
        mapping: Optional[Dict[str, Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """
        Extract metadata from PDF filename.

        Uses mapping if provided, otherwise falls back to pattern matching.

        Args:
            filename: PDF filename (with or without .pdf extension)
            mapping: Optional mapping dict {filename: metadata}

        Returns:
            Metadata dict with year, source, district, etc.
        """
        # Remove extension
        stem = Path(filename).stem
        stem_lower = stem.lower().strip()

        # Try mapping first (copies so callers can't mutate the mapping entry).
        if mapping:
            if stem_lower in mapping:
                return mapping[stem_lower].copy()

            # Try without .pdf (covers double-extension names like "x.pdf.pdf")
            stem_no_ext = stem_lower.replace(".pdf", "")
            if stem_no_ext in mapping:
                return mapping[stem_no_ext].copy()

        # Fallback: pattern matching
        metadata: Dict[str, Any] = {"filename": filename}

        # Extract a 4-digit year of the form 20xx
        year_match = re.search(r"(20\d{2})", stem)
        if year_match:
            metadata["year"] = int(year_match.group(1))

        # Detect source type
        if "consolidated" in stem_lower or ("annual" in stem_lower and "oag" in stem_lower):
            metadata["source"] = "Consolidated"
        elif "dlg" in stem_lower or "district local government" in stem_lower:
            metadata["source"] = "Local Government"
            # Try to extract district name preceding the DLG marker
            district_match = re.search(r"([a-z]+)\s+(?:dlg|district local government)", stem_lower)
            if district_match:
                metadata["district"] = district_match.group(1).title()
        elif "hospital" in stem_lower or "referral" in stem_lower:
            metadata["source"] = "Hospital"
        elif "ministry" in stem_lower:
            metadata["source"] = "Ministry"
        elif "project" in stem_lower:
            metadata["source"] = "Project"
        else:
            metadata["source"] = "Unknown"

        return metadata