"""PDF-to-page-images rendering and image preprocessing."""

import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

import cv2
from pdf2image import convert_from_path

from config import RENDER_DPI, logger


def _preprocess_image_for_ocr(image_path: str) -> str:
    """Enhance image quality for better OCR accuracy.

    Applies CLAHE (clipLimit=2.0, 8x8 tiles) to the L channel of the image in
    LAB colour space and overwrites the file at ``image_path`` with the result.

    Args:
        image_path: Path of the image file to enhance in place.

    Returns:
        ``image_path``, unchanged. If ``cv2.imread`` returns ``None`` for the
        path, the file is left untouched.
    """
    img = cv2.imread(image_path)
    if img is None:
        return image_path

    # Equalise only the lightness channel; the a/b channels are merged back
    # unmodified.
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    l_channel = clahe.apply(l_channel)
    lab = cv2.merge([l_channel, a_channel, b_channel])
    img = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

    cv2.imwrite(image_path, img)
    return image_path


def _render_single_page(
    input_path: Path, page_idx: int, dpi: int
) -> tuple[int, Optional[bytes]]:
    """Render a single PDF page to PNG bytes with preprocessing.

    Args:
        input_path: Path of the PDF to render.
        page_idx: Zero-based page index (passed to the renderer as
            ``page_idx + 1``).
        dpi: Rendering resolution passed to ``convert_from_path``.

    Returns:
        ``(page_idx, png_bytes)`` on success, or ``(page_idx, None)`` when the
        renderer yields no image or any step raises. Failures are logged as
        warnings and never propagate to the caller.
    """
    try:
        images = convert_from_path(
            str(input_path),
            dpi=dpi,
            first_page=page_idx + 1,
            last_page=page_idx + 1,
        )
        if not images:
            return page_idx, None

        # Only reserve a unique path here; the handle is closed before
        # anything is written to the file by name.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            tmp_path = tmp.name

        # Everything that touches the temp file runs under the same finally,
        # so a failed save or preprocess cannot leave the file behind.
        try:
            images[0].save(tmp_path, format="PNG")
            _preprocess_image_for_ocr(tmp_path)
            with open(tmp_path, "rb") as f:
                return page_idx, f.read()
        finally:
            os.unlink(tmp_path)
    except Exception as e:
        logger.warning(f"Failed to render page {page_idx + 1}: {e}")
        return page_idx, None


def _pdf_to_page_images(
    input_path: Path,
    request_id: str,
    start_page: int = 0,
    end_page: Optional[int] = None,
) -> list[tuple[int, bytes]]:
    """Convert PDF pages to PNG image bytes using parallel rendering.

    Args:
        input_path: Path of the PDF to render.
        request_id: Identifier used as a prefix in log messages.
        start_page: Zero-based index of the first page to render.
        end_page: Zero-based index of the last page to render (inclusive), or
            ``None`` to render through the final page.

    Returns:
        ``(page_idx, png_bytes)`` tuples sorted by page index. Pages that fail
        to render are omitted. An empty list is returned when the PDF's page
        count cannot be read or the requested range contains no pages.
    """
    try:
        from pdf2image.pdf2image import pdfinfo_from_path

        info = pdfinfo_from_path(str(input_path))
        total_pages = info["Pages"]
        # end_page is inclusive, so the exclusive bound is end_page + 1,
        # capped at the document length.
        last_page = (
            min(end_page + 1, total_pages) if end_page is not None else total_pages
        )
    except Exception as e:
        logger.warning(f"[{request_id}] Could not get PDF info: {e}")
        return []

    # Negative indices never name a real page; skip them instead of
    # submitting render jobs for them.
    page_indices = list(range(max(start_page, 0), last_page))
    if not page_indices:
        return []

    start_time = time.time()
    page_images: list[tuple[int, bytes]] = []

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(_render_single_page, input_path, idx, RENDER_DPI): idx
            for idx in page_indices
        }
        # Results arrive in completion order; they are re-sorted below.
        for future in as_completed(futures):
            page_idx, png_bytes = future.result()
            if png_bytes is not None:
                page_images.append((page_idx, png_bytes))

    page_images.sort(key=lambda x: x[0])

    render_time = time.time() - start_time
    logger.info(
        f"[{request_id}] Rendered {len(page_images)} pages in {render_time:.2f}s "
        f"({render_time / max(len(page_images), 1):.1f}s/page, DPI={RENDER_DPI})"
    )
    return page_images