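"""
PDF Document Loading and Rendering

Uses PyMuPDF (fitz) for all PDF operations. PyMuPDF is required: loading
raises ImportError when it is not installed.
NOTE: a pdf2image + poppler fallback is not implemented in this module
(TODO confirm whether it is provided elsewhere).
"""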
| |
|
| | import logging |
| | from pathlib import Path |
| | from typing import Iterator, List, Optional, Tuple, Union |
| |
|
| | import numpy as np |
| | from PIL import Image |
| |
|
| | from .base import ( |
| | DocumentFormat, |
| | DocumentInfo, |
| | DocumentLoader, |
| | PageInfo, |
| | PageRenderer, |
| | RenderOptions, |
| | ) |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class PDFLoader(DocumentLoader):
    """
    PDF document loader using PyMuPDF.

    Opens a PDF, scans every page once and summarises the result as a
    ``DocumentInfo``. The underlying fitz document stays open after
    ``load()`` so that renderers and extractors can use it; call
    ``close()`` to release it.
    """

    def __init__(self):
        # Open fitz document, or None when nothing is loaded.
        self._doc = None
        # Metadata produced by the most recent successful load().
        self._info: Optional[DocumentInfo] = None
        # Path handed to the most recent load() call.
        self._path: Optional[Path] = None

    def load(self, path: Union[str, Path]) -> DocumentInfo:
        """
        Load a PDF and extract its metadata.

        Any previously loaded document is closed first. If the metadata
        scan fails, the newly opened document is closed again and the
        error is re-raised, so the loader never stays half-loaded.

        Args:
            path: Location of the PDF file.

        Returns:
            DocumentInfo describing the document and each of its pages.

        Raises:
            ImportError: PyMuPDF (fitz) is not installed.
            FileNotFoundError: ``path`` does not exist.
        """
        try:
            import fitz
        except ImportError as err:
            raise ImportError(
                "PyMuPDF (fitz) is required for PDF loading. "
                "Install with: pip install pymupdf"
            ) from err

        self._path = Path(path)
        if not self._path.exists():
            raise FileNotFoundError(f"PDF file not found: {self._path}")

        # Drop the previous document and its metadata so that a failure
        # below cannot leave stale info paired with a different file.
        self.close()
        self._info = None

        self._doc = fitz.open(str(self._path))
        try:
            self._info = self._build_info()
        except Exception:
            self.close()
            raise

        return self._info

    def _build_info(self) -> DocumentInfo:
        """Scan the open document once and assemble its DocumentInfo."""
        doc = self._doc
        metadata = doc.metadata or {}

        pages = []
        has_text_layer = False
        has_images = False
        has_annotations = False

        for page_index in range(len(doc)):
            page = doc[page_index]
            rect = page.rect

            page_has_text = bool(page.get_text().strip())
            page_has_images = bool(page.get_images(full=True))

            has_text_layer = has_text_layer or page_has_text
            has_images = has_images or page_has_images
            if not has_annotations:
                has_annotations = self._page_has_annotations(page)

            pages.append(
                PageInfo(
                    page_number=page_index + 1,  # page numbers are 1-based
                    # Page rect is in points; at 72 dpi one point == one pixel.
                    width_pixels=int(rect.width),
                    height_pixels=int(rect.height),
                    width_points=rect.width,
                    height_points=rect.height,
                    dpi=72,
                    rotation=page.rotation,
                    has_text=page_has_text,
                    has_images=page_has_images,
                )
            )

        # Heuristic: images present but no extractable text anywhere.
        is_scanned = has_images and not has_text_layer

        return DocumentInfo(
            path=self._path,
            format=DocumentFormat.PDF,
            num_pages=len(doc),
            pages=pages,
            title=metadata.get("title"),
            author=metadata.get("author"),
            subject=metadata.get("subject"),
            creator=metadata.get("creator"),
            creation_date=metadata.get("creationDate"),
            modification_date=metadata.get("modDate"),
            file_size_bytes=self._path.stat().st_size,
            is_encrypted=doc.is_encrypted,
            has_text_layer=has_text_layer,
            is_scanned=is_scanned,
            # is_form_pdf may be a field count rather than a bool.
            has_forms=bool(doc.is_form_pdf),
            has_annotations=has_annotations,
        )

    @staticmethod
    def _page_has_annotations(page) -> bool:
        """Return True if the page carries at least one annotation."""
        annots = page.annots()
        if annots is None:
            return False
        # annots() is an iterator/generator, so len() cannot be used on it;
        # pulling the first item is enough to know whether any exist.
        return next(iter(annots), None) is not None

    def close(self) -> None:
        """Close the PDF document, if one is open."""
        if self._doc is not None:
            self._doc.close()
            self._doc = None

    def is_loaded(self) -> bool:
        """Check if a document is loaded."""
        return self._doc is not None

    @property
    def info(self) -> Optional[DocumentInfo]:
        """Get document info from the last successful load (None if none)."""
        return self._info

    @property
    def document(self):
        """Get the underlying fitz document (for advanced use)."""
        return self._doc
| |
|
| |
|
class PDFRenderer(PageRenderer):
    """
    PDF page renderer using PyMuPDF.

    Renders pages of the document held by a ``PDFLoader`` to numpy
    images at a requested DPI.
    """

    def __init__(self, loader: PDFLoader):
        # The loader owns the open document; the renderer only reads it.
        self._loader = loader

    def render_page(
        self,
        page_number: int,
        options: Optional[RenderOptions] = None
    ) -> np.ndarray:
        """
        Render a PDF page to an image.

        Args:
            page_number: 1-based page number.
            options: Render settings; a default ``RenderOptions()`` is used
                when omitted.

        Returns:
            uint8 array shaped (height, width) for color mode "L",
            (height, width, 4) for "RGBA" and (height, width, 3) otherwise.
            NOTE(review): the array wraps the pixmap sample buffer via
            ``np.frombuffer`` and is presumably read-only; copy it before
            modifying in place.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is outside 1..page count.
        """
        if not self._loader.is_loaded():
            raise RuntimeError("No document loaded")

        options = options or RenderOptions()
        doc = self._loader.document

        if page_number < 1 or page_number > len(doc):
            raise ValueError(f"Invalid page number: {page_number}")

        page = doc[page_number - 1]

        # PDF user space is 72 units per inch, so dpi / 72 is the scale factor.
        zoom = options.dpi / 72.0
        matrix = self._get_matrix(zoom)

        if options.color_mode == "L":
            colorspace = self._get_grayscale_colorspace()
        else:
            colorspace = self._get_rgb_colorspace()

        try:
            pixmap = page.get_pixmap(
                matrix=matrix,
                colorspace=colorspace,
                alpha=options.color_mode == "RGBA"
            )
            return self._pixmap_to_array(pixmap)
        except Exception as e:
            logger.error("Error rendering page %s: %s", page_number, e)
            raise

    @staticmethod
    def _pixmap_to_array(pixmap) -> np.ndarray:
        """Wrap a pixmap's samples as a (h, w) or (h, w, n) uint8 array."""
        samples = np.frombuffer(pixmap.samples, dtype=np.uint8)
        channels = pixmap.n  # components per pixel, alpha included
        if channels == 1:
            return samples.reshape(pixmap.height, pixmap.width)
        return samples.reshape(pixmap.height, pixmap.width, channels)

    def _get_matrix(self, zoom: float):
        """Get transformation matrix for rendering (uniform scale)."""
        import fitz
        return fitz.Matrix(zoom, zoom)

    def _get_rgb_colorspace(self):
        """Get RGB colorspace."""
        import fitz
        return fitz.csRGB

    def _get_grayscale_colorspace(self):
        """Get grayscale colorspace."""
        import fitz
        return fitz.csGRAY

    def render_pages(
        self,
        page_numbers: Optional[List[int]] = None,
        options: Optional[RenderOptions] = None
    ) -> Iterator[Tuple[int, np.ndarray]]:
        """
        Render multiple pages lazily.

        This is a generator: nothing is checked or rendered until it is
        iterated, so errors surface on the first ``next()``.

        Args:
            page_numbers: 1-based page numbers to render; all pages of the
                loaded document when omitted.
            options: Render settings passed through to ``render_page``.

        Yields:
            (page_number, image) tuples in the order requested.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: A requested page number is out of range.
        """
        if not self._loader.is_loaded():
            raise RuntimeError("No document loaded")

        if page_numbers is None:
            # Count pages on the open document itself; loader.info can be
            # None or stale while a document is open.
            page_numbers = range(1, len(self._loader.document) + 1)

        for page_num in page_numbers:
            yield page_num, self.render_page(page_num, options)
| |
|
| |
|
class PDFTextExtractor:
    """
    Extract text and text positions from PDF.

    Useful for PDFs with embedded text layer. Reads pages from the
    document held by the given loader.
    """

    def __init__(self, loader: "PDFLoader"):
        # The loader owns the open document; the extractor only reads it.
        self._loader = loader

    def _get_page(self, page_number: int):
        """
        Return the page for a 1-based page number.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is outside 1..page count. Without
                this check, page 0 would index ``doc[-1]`` and quietly
                return a different page.
        """
        if not self._loader.is_loaded():
            raise RuntimeError("No document loaded")

        doc = self._loader.document
        if page_number < 1 or page_number > len(doc):
            raise ValueError(f"Invalid page number: {page_number}")
        return doc[page_number - 1]

    def extract_text(self, page_number: int) -> str:
        """
        Extract plain text from a page.

        Args:
            page_number: 1-based page number.

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is out of range.
        """
        return self._get_page(page_number).get_text()

    def extract_text_with_positions(
        self,
        page_number: int
    ) -> List[dict]:
        """
        Extract text spans with bounding box positions.

        One dict is produced per text span (a span may contain several
        words). Non-text blocks (type != 0) are skipped.

        Returns list of dicts with:
        - text: The span's text content
        - bbox: (x0, y0, x1, y1) in page coordinates
        - block_no: Block number
        - line_no: Line index within block
        - word_no: Span index within line (key name kept for compatibility)
        - font: Font name of the span
        - size: Font size of the span
        - flags: Font flags of the span

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is out of range.
        """
        page = self._get_page(page_number)
        text_dict = page.get_text("dict")

        spans = []
        for block in text_dict.get("blocks", []):
            if block.get("type") != 0:
                continue

            block_no = block.get("number", 0)

            for line_no, line in enumerate(block.get("lines", [])):
                for span_no, span in enumerate(line.get("spans", [])):
                    spans.append({
                        "text": span.get("text", ""),
                        "bbox": span.get("bbox", (0, 0, 0, 0)),
                        "block_no": block_no,
                        "line_no": line_no,
                        "word_no": span_no,
                        "font": span.get("font", ""),
                        "size": span.get("size", 0),
                        "flags": span.get("flags", 0),
                    })

        return spans

    def get_page_dimensions(self, page_number: int) -> Tuple[float, float]:
        """
        Get page dimensions in points as (width, height).

        Raises:
            RuntimeError: No document is loaded.
            ValueError: ``page_number`` is out of range.
        """
        rect = self._get_page(page_number).rect
        return rect.width, rect.height
| |
|
| |
|
def load_pdf(path: Union[str, Path]) -> Tuple[PDFLoader, PDFRenderer]:
    """
    Open a PDF and pair its loader with a renderer in one call.

    The loader is returned with the document already loaded; call
    ``loader.close()`` when finished with it.

    Args:
        path: Location of the PDF file.

    Returns:
        Tuple of (loader, renderer)

    Example:
        loader, renderer = load_pdf("document.pdf")
        info = loader.info
        for page_num in range(1, info.num_pages + 1):
            image = renderer.render_page(page_num)
    """
    pdf = PDFLoader()
    pdf.load(path)
    return pdf, PDFRenderer(pdf)
| |
|