| """PDF file processor with OCR support for scanned PDFs.""" |
|
|
| import os |
| import logging |
| import tempfile |
| from typing import Dict, Any, List, Tuple |
|
|
| from .base import BaseProcessor |
| from .image_processor import ImageProcessor |
| from ..result import ConversionResult |
| from ..exceptions import ConversionError, FileNotFoundError |
| from ..config import InternalConfig |
| from ..pipeline.ocr_service import OCRServiceFactory, NeuralOCRService |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
|
|
class PDFProcessor(BaseProcessor):
    """Processor for PDF files using PDF-to-image conversion with OCR.

    Every page of the PDF is rendered to an image with ``pdf2image``, written
    to a temporary PNG file and handed to an internal ``ImageProcessor``.
    The non-empty per-page results are joined into a single document in which
    each page is introduced by a ``## Page N`` heading.
    """

    def __init__(
        self,
        preserve_layout: bool = True,
        include_images: bool = False,
        ocr_enabled: bool = True,
        use_markdownify: bool = None,
    ):
        """Initialise the processor and the image processor it delegates to.

        All four options are forwarded unchanged both to ``BaseProcessor`` and
        to the internal ``ImageProcessor``.

        Args:
            preserve_layout: Forwarded to the base class and ImageProcessor.
            include_images: Forwarded to the base class and ImageProcessor.
            ocr_enabled: Forwarded to the base class and ImageProcessor.
            use_markdownify: Forwarded to the base class and ImageProcessor;
                defaults to ``None``.
        """
        super().__init__(preserve_layout, include_images, ocr_enabled, use_markdownify)

        # One OCR service instance is created here and injected into the
        # image processor that handles every page of every PDF.
        shared_ocr_service = NeuralOCRService()
        self._image_processor = ImageProcessor(
            preserve_layout=preserve_layout,
            include_images=include_images,
            ocr_enabled=ocr_enabled,
            use_markdownify=use_markdownify,
            ocr_service=shared_ocr_service,
        )

    def can_process(self, file_path: str) -> bool:
        """Check if this processor can handle the given file.

        Args:
            file_path: Path to the file to check

        Returns:
            True if the path exists and its extension is ``.pdf``
            (compared case-insensitively), otherwise False.
        """
        if not os.path.exists(file_path):
            return False

        # str() allows path-like objects; the whole path is lower-cased so
        # that ".PDF" and ".Pdf" are accepted as well.
        _, ext = os.path.splitext(str(file_path).lower())
        return ext == '.pdf'

    def process(self, file_path: str) -> ConversionResult:
        """Process PDF file with OCR capabilities.

        Args:
            file_path: Path to the PDF file

        Returns:
            ConversionResult with extracted content

        Raises:
            ConversionError: If the file does not exist or any step of the
                conversion fails. The original exception is attached as
                ``__cause__``.
        """
        try:
            # Looked up at call time, exactly as before, so the value reflects
            # the config module as it is when process() runs.
            from ..config import InternalConfig
            pdf_to_image_enabled = InternalConfig.pdf_to_image_enabled
        except (ImportError, AttributeError):
            pdf_to_image_enabled = True
            logger.warning("InternalConfig not available, defaulting to pdf_to_image_enabled = True")

        try:
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"PDF file not found: {file_path}")

            logger.info("Processing PDF file: %s", file_path)
            # NOTE(review): the flag is only logged; the OCR path below is
            # taken regardless of its value - confirm this is intended.
            logger.info("pdf_to_image_enabled = %s", pdf_to_image_enabled)

            logger.info("Using OCR-based PDF processing with pdf2image")
            return self._process_with_ocr(file_path)

        except Exception as e:
            logger.error("Failed to process PDF file %s: %s", file_path, e)
            # Chain the cause so the original traceback is not lost.
            raise ConversionError(f"PDF processing failed: {e}") from e

    def _process_with_ocr(self, file_path: str) -> ConversionResult:
        """Process PDF using OCR after converting pages to images.

        Args:
            file_path: Path to the PDF file

        Returns:
            ConversionResult whose content holds one ``## Page N`` section per
            page that produced non-blank text (or a placeholder message when
            no page did) and whose metadata records the file path, file type,
            page count and extraction method.

        Raises:
            ConversionError: If ``pdf2image`` cannot be imported, or if page
                rendering or OCR fails.
        """
        # Only the pdf2image import is guarded by the ImportError handler, so
        # an ImportError raised later (e.g. inside the image processor) is no
        # longer misreported as "pdf2image is required".
        try:
            from pdf2image import convert_from_path
        except ImportError as e:
            logger.error("pdf2image not available. Please install it: pip install pdf2image")
            raise ConversionError("pdf2image is required for PDF processing") from e

        try:
            from ..config import InternalConfig

            # Rendering resolution; falls back to 300 when not configured.
            dpi = getattr(InternalConfig, 'pdf_image_dpi', 300)

            images = convert_from_path(file_path, dpi=dpi)
            page_count = len(images)
            all_content = []

            for page_num, image in enumerate(images, start=1):
                page_content = self._ocr_page(image)
                # Pages that yield only whitespace are left out entirely.
                if page_content.strip():
                    all_content.append(f"## Page {page_num}\n\n{page_content}")

            content = "\n\n".join(all_content) if all_content else "No content extracted from PDF"

            return ConversionResult(
                content=content,
                metadata={
                    'file_path': file_path,
                    'file_type': 'pdf',
                    'pages': page_count,
                    'extraction_method': 'ocr',
                },
            )

        except Exception as e:
            logger.error("OCR-based PDF processing failed: %s", e)
            raise ConversionError(f"OCR-based PDF processing failed: {e}") from e

    def _ocr_page(self, image: Any) -> str:
        """Run the image processor on one rendered page and return its text.

        The page is written to a temporary PNG file, processed, and the file
        is removed again whether or not saving or processing succeeded.

        Args:
            image: A rendered page as returned by ``convert_from_path``; it
                must provide ``save(path, format)``.

        Returns:
            The ``content`` of the image processor's result for this page.
        """
        # Reserve a unique path and close the handle right away; the image is
        # written by name afterwards.
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
            temp_image_path = tmp.name

        try:
            # Saving happens inside the guarded region so a failed save no
            # longer leaves the temporary file behind.
            image.save(temp_image_path, 'PNG')
            return self._image_processor.process(temp_image_path).content
        finally:
            # Best-effort cleanup: a failure to delete must not replace the
            # page result or hide the exception that is already propagating.
            try:
                os.unlink(temp_image_path)
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove temporary page image %s: %s",
                    temp_image_path,
                    cleanup_error,
                )

    @staticmethod
    def predownload_ocr_models():
        """Pre-download OCR models by delegating to ImageProcessor.

        Failures are logged and swallowed, so this call never raises.
        """
        try:
            ImageProcessor.predownload_ocr_models()
        except Exception as e:
            logger.error("Failed to pre-download OCR models: %s", e)