Spaces:
Running on T4
Running on T4
| """PDF-to-page-images rendering and image preprocessing.""" | |
| import os | |
| import tempfile | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from typing import Optional | |
| import cv2 | |
| from pdf2image import convert_from_path | |
| from config import RENDER_DPI, logger | |
def _preprocess_image_for_ocr(image_path: str) -> str:
    """Boost the local contrast of an image file in place for OCR.

    The image is loaded with OpenCV, converted to LAB, CLAHE is applied to
    the lightness channel only (both colour channels are passed through
    unchanged), and the result is converted back to BGR and written over
    the original file.

    Args:
        image_path: Path of the image file to enhance; it is overwritten.

    Returns:
        ``image_path`` unchanged, whether or not the image could be loaded.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        # imread gave us nothing to work with; leave the file as it is.
        return image_path

    lightness, chroma_a, chroma_b = cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB))

    # Equalise only the lightness channel so contrast changes, not colour.
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_lab = cv2.merge([equalizer.apply(lightness), chroma_a, chroma_b])

    cv2.imwrite(image_path, cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR))
    return image_path
def _render_single_page(input_path: Path, page_idx: int, dpi: int) -> tuple[int, Optional[bytes]]:
    """Render a single PDF page to preprocessed PNG bytes.

    The page is rasterised with ``convert_from_path``, saved to a temporary
    PNG file, enhanced in place by ``_preprocess_image_for_ocr`` and read
    back as bytes. The temporary file is removed on every path once it has
    been created, including when saving or preprocessing fails.

    Args:
        input_path: Path of the PDF to render.
        page_idx: Zero-based page index; ``page_idx + 1`` is passed as both
            ``first_page`` and ``last_page``.
        dpi: Resolution passed to ``convert_from_path``.

    Returns:
        ``(page_idx, png_bytes)`` on success, or ``(page_idx, None)`` when
        no image was produced or any step raised. Failures are logged as
        warnings and never propagated to the caller.
    """
    try:
        page_number = page_idx + 1
        images = convert_from_path(
            str(input_path), dpi=dpi, first_page=page_number, last_page=page_number
        )
        if not images:
            return page_idx, None

        # delete=False keeps the file on disk after the handle is closed so
        # it can be reopened by path; from here on we own its removal.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            tmp_path = tmp.name

        try:
            # Saving happens inside the try so a failed save cannot leave
            # the temporary file behind.
            images[0].save(tmp_path, format="PNG")
            _preprocess_image_for_ocr(tmp_path)
            with open(tmp_path, "rb") as f:
                png_bytes = f.read()
        finally:
            try:
                os.unlink(tmp_path)
            except OSError as cleanup_error:
                # Cleanup is best-effort: a failed delete must not discard
                # bytes that were rendered successfully.
                logger.warning(f"Failed to remove temp file {tmp_path}: {cleanup_error}")

        return page_idx, png_bytes
    except Exception as e:
        logger.warning(f"Failed to render page {page_idx + 1}: {e}")
        return page_idx, None
def _pdf_to_page_images(
    input_path: Path,
    request_id: str,
    start_page: int = 0,
    end_page: Optional[int] = None,
) -> list[tuple[int, bytes]]:
    """Render a range of PDF pages to PNG bytes on a small thread pool.

    Args:
        input_path: Path of the PDF to render.
        request_id: Identifier prefixed to this function's log messages.
        start_page: Zero-based index of the first page to render.
        end_page: Zero-based index of the last page to render (inclusive),
            clamped to the document's page count; ``None`` renders through
            the final page.

    Returns:
        ``(page_index, png_bytes)`` tuples sorted by page index. Pages whose
        rendering yielded no bytes are left out. An empty list is returned
        when the PDF info cannot be read or the page range is empty.
    """
    try:
        from pdf2image.pdf2image import pdfinfo_from_path

        total_pages = pdfinfo_from_path(str(input_path))["Pages"]
        if end_page is None:
            last_page = total_pages
        else:
            # end_page is inclusive; +1 turns it into an exclusive bound.
            last_page = min(end_page + 1, total_pages)
    except Exception as e:
        logger.warning(f"[{request_id}] Could not get PDF info: {e}")
        return []

    page_range = range(start_page, last_page)
    if not page_range:
        return []

    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = [
            pool.submit(_render_single_page, input_path, idx, RENDER_DPI)
            for idx in page_range
        ]
        outcomes = [job.result() for job in as_completed(pending)]

    # Jobs finish in arbitrary order; drop failed pages and restore page order.
    page_images: list[tuple[int, bytes]] = sorted(
        [(idx, data) for idx, data in outcomes if data is not None],
        key=lambda item: item[0],
    )

    render_time = time.time() - start_time
    logger.info(
        f"[{request_id}] Rendered {len(page_images)} pages in {render_time:.2f}s "
        f"({render_time / max(len(page_images), 1):.1f}s/page, DPI={RENDER_DPI})"
    )
    return page_images