| | """ |
| | Document Processor Pipeline |
| | |
| | Main pipeline that orchestrates document processing: |
| | 1. Load document |
| | 2. OCR (PaddleOCR or Tesseract) |
| | 3. Layout detection |
| | 4. Reading order reconstruction |
| | 5. Semantic chunking |
| | 6. Grounding evidence |
| | |
| | Outputs ProcessedDocument with all extracted information. |
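
Example (a minimal sketch; the file path is illustrative):

    doc = process_document("sample.pdf")
    print(doc.metadata.num_pages, len(doc.chunks))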
| | """ |
| |
|
import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from datetime import datetime
from pydantic import BaseModel, Field
from loguru import logger
import numpy as np

from ..schemas.core import (
    ProcessedDocument,
    DocumentMetadata,
    DocumentChunk,
    OCRRegion,
    LayoutRegion,
)
from ..io.loader import load_document, LoadedDocument
from ..io.cache import get_document_cache
from ..ocr import get_ocr_engine, OCRConfig, OCRResult
from ..layout import get_layout_detector, LayoutConfig, LayoutResult
from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig
from ..chunking import get_document_chunker, ChunkerConfig


class PipelineConfig(BaseModel):
    """Configuration for the document processing pipeline."""

    # Per-stage configuration
    ocr: OCRConfig = Field(default_factory=OCRConfig)
    layout: LayoutConfig = Field(default_factory=LayoutConfig)
    reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig)
    chunking: ChunkerConfig = Field(default_factory=ChunkerConfig)

    # Rendering and performance
    render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering")
    enable_caching: bool = Field(default=True, description="Cache rendered pages")
    parallel_pages: bool = Field(default=False, description="Process pages in parallel")
    max_pages: Optional[int] = Field(default=None, description="Max pages to process")

    # Output options
    include_ocr_regions: bool = Field(default=True)
    include_layout_regions: bool = Field(default=True)
    generate_full_text: bool = Field(default=True)
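
# Example: overriding individual fields while keeping the stage defaults
# (a sketch; the values are illustrative, not recommended settings):
#
#     config = PipelineConfig(render_dpi=150, max_pages=5)
#     processor = DocumentProcessor(config)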


class DocumentProcessor:
    """
    Main document processing pipeline.

    Provides end-to-end document processing with:
    - Multi-format support (PDF, images)
    - Pluggable OCR engines
    - Layout detection
    - Reading order reconstruction
    - Semantic chunking
    """

    def __init__(self, config: Optional[PipelineConfig] = None):
        """
        Initialize document processor.

        Args:
            config: Pipeline configuration
        """
        self.config = config or PipelineConfig()
        self._initialized = False

        # Pipeline components, created lazily in initialize()
        self._ocr_engine = None
        self._layout_detector = None
        self._reading_order = None
        self._chunker = None

    def initialize(self):
        """Initialize all pipeline components."""
        if self._initialized:
            return

        logger.info("Initializing document processing pipeline...")

        # OCR engine
        self._ocr_engine = get_ocr_engine(
            engine_type=self.config.ocr.engine,
            config=self.config.ocr,
        )

        # Layout detector (local import: the factory lives in the detector submodule)
        from ..layout.detector import create_layout_detector
        self._layout_detector = create_layout_detector(self.config.layout, initialize=True)

        # Reading order reconstructor
        self._reading_order = get_reading_order_reconstructor(self.config.reading_order)

        # Semantic chunker
        self._chunker = get_document_chunker(self.config.chunking)

        self._initialized = True
        logger.info("Document processing pipeline initialized")

    def process(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
    ) -> ProcessedDocument:
        """
        Process a document through the full pipeline.

        Args:
            source: Path to document
            document_id: Optional document ID

        Returns:
            ProcessedDocument with all extracted information
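
        Example (the path is illustrative):
            doc = DocumentProcessor().process("scan.pdf")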
| | """ |
| | if not self._initialized: |
| | self.initialize() |
| |
|
| | start_time = time.time() |
| | source_path = str(Path(source).absolute()) |
| |
|
| | logger.info(f"Processing document: {source_path}") |
| |
|
| | try: |
| | |
| | loaded_doc = load_document(source_path, document_id) |
| | document_id = loaded_doc.document_id |
| |
|
| | |
| | num_pages = loaded_doc.num_pages |
| | if self.config.max_pages: |
| | num_pages = min(num_pages, self.config.max_pages) |
| |
|
| | logger.info(f"Document loaded: {num_pages} pages") |
| |
|
| | |
| | all_ocr_regions: List[OCRRegion] = [] |
| | all_layout_regions: List[LayoutRegion] = [] |
| | page_dimensions = [] |
| |
|
| | for page_num in range(num_pages): |
| | logger.debug(f"Processing page {page_num + 1}/{num_pages}") |
| |
|
| | |
| | page_image = self._get_page_image(loaded_doc, page_num) |
| | height, width = page_image.shape[:2] |
| | page_dimensions.append((width, height)) |
| |
|
| | |
| | ocr_result = self._ocr_engine.recognize(page_image, page_num) |
| | if ocr_result.success: |
| | all_ocr_regions.extend(ocr_result.regions) |
| |
|
| | |
| | layout_result = self._layout_detector.detect( |
| | page_image, |
| | page_num, |
| | ocr_result.regions if ocr_result.success else None, |
| | ) |
| | if layout_result.success: |
| | all_layout_regions.extend(layout_result.regions) |
| |
|
| | |
| | if all_ocr_regions: |
| | reading_result = self._reading_order.reconstruct( |
| | all_ocr_regions, |
| | all_layout_regions, |
| | page_width=page_dimensions[0][0] if page_dimensions else None, |
| | page_height=page_dimensions[0][1] if page_dimensions else None, |
| | ) |
| |
|
| | |
| | if reading_result.success and reading_result.order: |
| | all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order] |
| |
|
| | |
| | chunks = self._chunker.create_chunks( |
| | all_ocr_regions, |
| | all_layout_regions if self.config.include_layout_regions else None, |
| | document_id, |
| | source_path, |
| | ) |
| |
|
| | |
| | full_text = "" |
| | if self.config.generate_full_text and all_ocr_regions: |
| | full_text = self._generate_full_text(all_ocr_regions) |
| |
|
| | |
| | ocr_confidence_avg = None |
| | if all_ocr_regions: |
| | ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions) |
| |
|
| | layout_confidence_avg = None |
| | if all_layout_regions: |
| | layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions) |
| |
|
| | |
| | metadata = DocumentMetadata( |
| | document_id=document_id, |
| | source_path=source_path, |
| | filename=loaded_doc.filename, |
| | file_type=loaded_doc.file_type, |
| | file_size_bytes=loaded_doc.file_size_bytes, |
| | num_pages=loaded_doc.num_pages, |
| | page_dimensions=page_dimensions, |
| | processed_at=datetime.utcnow(), |
| | total_chunks=len(chunks), |
| | total_characters=len(full_text), |
| | ocr_confidence_avg=ocr_confidence_avg, |
| | layout_confidence_avg=layout_confidence_avg, |
| | ) |
| |
|
| | |
| | result = ProcessedDocument( |
| | metadata=metadata, |
| | ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [], |
| | layout_regions=all_layout_regions if self.config.include_layout_regions else [], |
| | chunks=chunks, |
| | full_text=full_text, |
| | status="completed", |
| | ) |
| |
|
| | processing_time = time.time() - start_time |
| | logger.info( |
| | f"Document processed in {processing_time:.2f}s: " |
| | f"{len(all_ocr_regions)} OCR regions, " |
| | f"{len(all_layout_regions)} layout regions, " |
| | f"{len(chunks)} chunks" |
| | ) |
| |
|
| | return result |
| |
|
| | except Exception as e: |
| | logger.error(f"Document processing failed: {e}") |
| | raise |
| |
|
| | finally: |
| | |
| | if 'loaded_doc' in locals(): |
| | loaded_doc.close() |
| |
|
    def _get_page_image(
        self,
        doc: LoadedDocument,
        page_num: int,
    ) -> np.ndarray:
        """Get page image, using cache if enabled."""
        if self.config.enable_caching:
            cache = get_document_cache()
            cached = cache.get(doc.document_id, page_num, self.config.render_dpi)
            if cached is not None:
                return cached

        # Render the page at the configured DPI
        image = doc.get_page_image(page_num, self.config.render_dpi)

        # Cache the rendered page for subsequent passes
        if self.config.enable_caching:
            cache = get_document_cache()
            cache.put(doc.document_id, page_num, self.config.render_dpi, image)

        return image

    def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str:
        """Generate full text from OCR regions in reading order."""
        # Group regions by page
        by_page: Dict[int, List[OCRRegion]] = {}
        for r in ocr_regions:
            by_page.setdefault(r.page, []).append(r)

        # Join region text within each page, then join pages with blank lines
        pages_text = []
        for page_num in sorted(by_page.keys()):
            page_regions = by_page[page_num]
            page_text = " ".join(r.text for r in page_regions)
            pages_text.append(page_text)

        return "\n\n".join(pages_text)

    def process_batch(
        self,
        sources: List[Union[str, Path]],
    ) -> List[ProcessedDocument]:
        """
        Process multiple documents.

        Args:
            sources: List of document paths

        Returns:
            List of ProcessedDocument
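
        Example (paths are illustrative):
            results = processor.process_batch(["a.pdf", "b.png"])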
| | """ |
| | results = [] |
| | for source in sources: |
| | try: |
| | result = self.process(source) |
| | results.append(result) |
| | except Exception as e: |
| | logger.error(f"Failed to process {source}: {e}") |
| | |
| |
|
| | return results |
| |
|
| |
|
| | |
| | _document_processor: Optional[DocumentProcessor] = None |
| |
|
| |
|
def get_document_processor(
    config: Optional[PipelineConfig] = None,
) -> DocumentProcessor:
    """
    Get or create the singleton document processor.

    Note that ``config`` is only applied when the singleton is first
    created; subsequent calls return the existing instance unchanged.
    """
    global _document_processor
    if _document_processor is None:
        _document_processor = DocumentProcessor(config)
        _document_processor.initialize()
    return _document_processor
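
# For per-call configuration, bypass the singleton and construct a processor
# directly (a usage sketch; the override below is illustrative):
#
#     processor = DocumentProcessor(PipelineConfig(render_dpi=150))
#     processor.initialize()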


def process_document(
    source: Union[str, Path],
    document_id: Optional[str] = None,
    config: Optional[PipelineConfig] = None,
) -> ProcessedDocument:
    """
    Convenience function to process a document.

    Args:
        source: Document path
        document_id: Optional document ID
        config: Optional pipeline configuration

    Returns:
        ProcessedDocument
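
    Example (the path and override are illustrative):
        result = process_document("invoice.pdf", config=PipelineConfig(max_pages=2))
        print(result.full_text[:200])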
| | """ |
| | processor = get_document_processor(config) |
| | return processor.process(source, document_id) |
| |
|