"""
Document Parser

Main orchestrator for the document parsing pipeline.
Coordinates OCR, layout detection, and chunk generation.
"""
| |
|
| | import logging |
| | import time |
| | from dataclasses import dataclass, field |
| | from pathlib import Path |
| | from typing import Any, Dict, Iterator, List, Optional, Tuple, Union |
| |
|
| | import numpy as np |
| |
|
| | from ..chunks.models import ( |
| | BoundingBox, |
| | ChunkType, |
| | DocumentChunk, |
| | PageResult, |
| | ParseResult, |
| | TableChunk, |
| | ChartChunk, |
| | ) |
| | from ..io import ( |
| | DocumentFormat, |
| | DocumentInfo, |
| | RenderOptions, |
| | load_document, |
| | get_document_cache, |
| | ) |
| | from ..models import ( |
| | OCRModel, |
| | OCRResult, |
| | LayoutModel, |
| | LayoutResult, |
| | LayoutRegion, |
| | LayoutRegionType, |
| | TableModel, |
| | TableStructure, |
| | ChartModel, |
| | ChartStructure, |
| | ) |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
@dataclass
class ParserConfig:
    """Configuration for document parser.

    Field order is part of the positional constructor signature and must not
    be changed.

    Raises:
        ValueError: If ``render_dpi`` is not positive, ``max_pages`` is
            negative, ``min_chunk_chars`` is negative, or ``max_chunk_chars``
            is not positive.
    """

    # --- Rendering ---
    # DPI handed to RenderOptions when pages are rasterized.
    render_dpi: int = 200
    # Upper bound on the number of pages parsed, counted from the first
    # requested page. The parser tests this for truthiness, so both None and 0
    # mean "no limit".
    max_pages: Optional[int] = None

    # --- OCR ---
    # OCR runs only when this is True and an OCR model was supplied.
    ocr_enabled: bool = True
    # NOTE(review): not read by the parser code in this module; presumably
    # consumed by the OCR model setup elsewhere - confirm.
    ocr_languages: List[str] = field(default_factory=lambda: ["en"])
    # NOTE(review): not read by the parser code in this module - confirm where
    # (or whether) this threshold is applied.
    ocr_min_confidence: float = 0.5

    # --- Layout ---
    # Layout detection runs only when this is True and a layout model was
    # supplied.
    layout_enabled: bool = True
    # NOTE(review): not read by the parser code in this module - confirm.
    reading_order_enabled: bool = True

    # --- Table / chart extraction ---
    # Each extractor runs only when its flag is True and its model was supplied.
    table_extraction_enabled: bool = True
    chart_extraction_enabled: bool = True

    # --- Chunking ---
    # Merge vertically adjacent text/paragraph chunks on the same page.
    merge_adjacent_text: bool = True
    # Text regions whose stripped text is shorter than this are dropped.
    min_chunk_chars: int = 10
    # A merge is skipped when the combined text would exceed this length.
    max_chunk_chars: int = 4000

    # --- Caching ---
    # When True the parser obtains the shared document cache at construction.
    cache_enabled: bool = True

    # --- Output ---
    # When True, per-page and full-document markdown are generated.
    include_markdown: bool = True
    # NOTE(review): not read by the parser code in this module - confirm.
    include_raw_ocr: bool = False

    def __post_init__(self) -> None:
        """Reject values that cannot produce a meaningful parse."""
        if self.render_dpi <= 0:
            raise ValueError(
                f"render_dpi must be positive, got {self.render_dpi!r}"
            )
        if self.max_pages is not None and self.max_pages < 0:
            raise ValueError(
                f"max_pages must be None or >= 0, got {self.max_pages!r}"
            )
        if self.min_chunk_chars < 0:
            raise ValueError(
                f"min_chunk_chars must be >= 0, got {self.min_chunk_chars!r}"
            )
        if self.max_chunk_chars <= 0:
            raise ValueError(
                f"max_chunk_chars must be positive, got {self.max_chunk_chars!r}"
            )
| |
|
| |
|
class DocumentParser:
    """
    Main document parsing orchestrator.

    Coordinates the full pipeline:
    1. Load document and render pages
    2. Run OCR on each page
    3. Detect layout regions
    4. Extract tables and charts
    5. Generate semantic chunks
    6. Build reading order
    7. Produce final ParseResult

    All four models are optional. A stage whose model is missing, or whose
    config flag is off, is skipped rather than treated as an error.
    """

    def __init__(
        self,
        config: Optional[ParserConfig] = None,
        ocr_model: Optional[OCRModel] = None,
        layout_model: Optional[LayoutModel] = None,
        table_model: Optional[TableModel] = None,
        chart_model: Optional[ChartModel] = None,
    ):
        """
        Store the configuration and the optional per-stage models.

        Args:
            config: Parser settings; a default ParserConfig is used when omitted.
            ocr_model: Model used for page OCR, if any.
            layout_model: Model used for layout detection, if any.
            table_model: Model used for table structure extraction, if any.
            chart_model: Model used for chart extraction, if any.
        """
        self.config = config or ParserConfig()
        self.ocr_model = ocr_model
        self.layout_model = layout_model
        self.table_model = table_model
        self.chart_model = chart_model

        # Obtained once at construction; no method of this class reads it.
        self._cache = get_document_cache() if self.config.cache_enabled else None

    def parse(
        self,
        path: Union[str, Path],
        page_range: Optional[Tuple[int, int]] = None,
    ) -> ParseResult:
        """
        Parse a document and return structured results.

        The loader is always closed, including when rendering or page
        processing raises.

        Args:
            path: Path to document file
            page_range: Optional (start, end) page range (1-indexed, inclusive).
                The range is handed to the renderer as given; it is not
                validated against the document's page count here.

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        # Monotonic clock: the elapsed time cannot be skewed by wall-clock
        # adjustments while the document is being processed.
        start_time = time.perf_counter()

        logger.info("Parsing document: %s", path)

        loader, renderer = load_document(path)
        try:
            doc_info = loader.info
            doc_id = doc_info.doc_id

            # Resolve the inclusive page window.
            start_page = page_range[0] if page_range else 1
            end_page = page_range[1] if page_range else doc_info.num_pages

            # A falsy max_pages (None or 0) means "no limit".
            if self.config.max_pages:
                end_page = min(end_page, start_page + self.config.max_pages - 1)

            page_numbers = list(range(start_page, end_page + 1))

            logger.info(
                "Processing pages %s-%s of %s",
                start_page,
                end_page,
                doc_info.num_pages,
            )

            page_results: List[PageResult] = []
            all_chunks: List[DocumentChunk] = []
            markdown_by_page: Dict[int, str] = {}
            sequence_index = 0

            render_options = RenderOptions(dpi=self.config.render_dpi)

            for page_num, page_image in renderer.render_pages(
                page_numbers, render_options
            ):
                logger.debug("Processing page %s", page_num)

                page_result, page_chunks = self._process_page(
                    page_image=page_image,
                    page_number=page_num,
                    doc_id=doc_id,
                    sequence_start=sequence_index,
                )

                page_results.append(page_result)
                all_chunks.extend(page_chunks)

                # Merging adjacent chunks shrinks the list but keeps the
                # surviving chunks' original indices, so the chunk count is
                # not a safe increment: it would hand the next page indices
                # that this page already used. Continue after the highest
                # index actually assigned instead.
                if page_chunks:
                    highest = max(chunk.sequence_index for chunk in page_chunks)
                    sequence_index = max(sequence_index, highest + 1)

                if self.config.include_markdown:
                    markdown_by_page[page_num] = self._generate_page_markdown(
                        page_chunks
                    )
        finally:
            # Release the document handle even when a page fails.
            loader.close()

        # Full-document markdown: pages in ascending order, separated by rules.
        # Empty when include_markdown is off.
        markdown_full = "\n\n---\n\n".join(
            f"## Page {p}\n\n{md}"
            for p, md in sorted(markdown_by_page.items())
        )

        processing_time = time.perf_counter() - start_time
        logger.info("Parsed %d chunks in %.2fs", len(all_chunks), processing_time)

        return ParseResult(
            doc_id=doc_id,
            source_path=str(path.absolute()),
            filename=path.name,
            num_pages=doc_info.num_pages,
            pages=page_results,
            chunks=all_chunks,
            markdown_full=markdown_full,
            markdown_by_page=markdown_by_page,
            processing_time_ms=processing_time * 1000,
            metadata={
                "format": doc_info.format.value,
                "has_text_layer": doc_info.has_text_layer,
                "is_scanned": doc_info.is_scanned,
                "render_dpi": self.config.render_dpi,
            },
        )

    def _process_page(
        self,
        page_image: np.ndarray,
        page_number: int,
        doc_id: str,
        sequence_start: int,
    ) -> Tuple[PageResult, List[DocumentChunk]]:
        """
        Process a single page.

        Layout regions drive chunking when a layout result with regions is
        available; otherwise each OCR block becomes a paragraph chunk.

        Args:
            page_image: Rendered page; the first two axes are read as
                (height, width).
            page_number: Page number recorded on the produced chunks.
            doc_id: Document identifier recorded on the produced chunks.
            sequence_start: Sequence index given to the first chunk of the page.

        Returns:
            The page summary and the page's chunks (after optional merging).
        """
        height, width = page_image.shape[:2]
        chunks: List[DocumentChunk] = []
        sequence_index = sequence_start

        # OCR stage (skipped when disabled or no model was supplied).
        ocr_result: Optional[OCRResult] = None
        if self.config.ocr_enabled and self.ocr_model:
            ocr_result = self.ocr_model.recognize(page_image)

        # Layout stage (skipped when disabled or no model was supplied).
        layout_result: Optional[LayoutResult] = None
        if self.config.layout_enabled and self.layout_model:
            layout_result = self.layout_model.detect(page_image)

        if layout_result and layout_result.regions:
            # NOTE(review): config.reading_order_enabled is not consulted;
            # regions are always taken in get_ordered_regions() order.
            for region in layout_result.get_ordered_regions():
                region_chunks = self._process_region(
                    page_image=page_image,
                    region=region,
                    ocr_result=ocr_result,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    image_size=(width, height),
                )
                chunks.extend(region_chunks)
                sequence_index += len(region_chunks)

        elif ocr_result and ocr_result.blocks:
            # No usable layout: fall back to one paragraph chunk per OCR block.
            for block in ocr_result.blocks:
                # Normalize exactly as the layout path does, so every chunk
                # carries the same kind of coordinates and the adjacency
                # threshold used for merging compares like with like.
                bbox = block.bbox
                if not bbox.normalized:
                    bbox = bbox.to_normalized(width, height)

                chunk = self._create_text_chunk(
                    text=block.text,
                    bbox=bbox,
                    confidence=block.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.PARAGRAPH,
                )
                chunks.append(chunk)
                sequence_index += 1

        if self.config.merge_adjacent_text:
            chunks = self._merge_adjacent_chunks(chunks)

        page_result = PageResult(
            page_number=page_number,
            width=width,
            height=height,
            chunks=[c.chunk_id for c in chunks],
            ocr_confidence=ocr_result.confidence if ocr_result else None,
        )

        return page_result, chunks

    def _process_region(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
        page_number: int,
        doc_id: str,
        sequence_index: int,
        image_size: Tuple[int, int],
    ) -> List[DocumentChunk]:
        """
        Process a single layout region.

        Tables and charts/figures go to their extractors first. When an
        extractor yields nothing, the region's OCR text is used instead so the
        content is not lost. Every other region becomes a text chunk if its
        text is long enough.

        Args:
            page_image: Rendered page the region belongs to.
            region: Layout region to convert.
            ocr_result: OCR output for the page, or None when OCR did not run.
            page_number: Page number recorded on the produced chunk.
            doc_id: Document identifier recorded on the produced chunk.
            sequence_index: Sequence index for the produced chunk.
            image_size: (width, height) of the page image.

        Returns:
            Zero or one chunk for the region.
        """
        chunks: List[DocumentChunk] = []
        width, height = image_size

        # Text chunks always carry a normalized bounding box.
        bbox = region.bbox
        if not bbox.normalized:
            bbox = bbox.to_normalized(width, height)

        if region.region_type == LayoutRegionType.TABLE:
            table_chunk = self._extract_table(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if table_chunk is not None:
                chunks.append(table_chunk)
            else:
                # Extraction disabled, unavailable, failed, or empty: keep the
                # table's OCR text as a plain TABLE chunk, mirroring the figure
                # fallback below. Nothing is emitted when there is no text.
                text = self._get_region_text(region, ocr_result)
                if text and text.strip():
                    chunks.append(
                        self._create_text_chunk(
                            text=text,
                            bbox=bbox,
                            confidence=region.confidence,
                            page_number=page_number,
                            doc_id=doc_id,
                            sequence_index=sequence_index,
                            chunk_type=ChunkType.TABLE,
                        )
                    )

        elif region.region_type in {LayoutRegionType.CHART, LayoutRegionType.FIGURE}:
            chart_chunk = self._extract_chart(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if chart_chunk is not None:
                chunks.append(chart_chunk)
            else:
                # Fall back to a figure chunk, using a placeholder when the
                # region has no OCR text.
                text = self._get_region_text(region, ocr_result) or "[Figure]"
                chunks.append(
                    self._create_text_chunk(
                        text=text,
                        bbox=bbox,
                        confidence=region.confidence,
                        page_number=page_number,
                        doc_id=doc_id,
                        sequence_index=sequence_index,
                        chunk_type=ChunkType.FIGURE,
                    )
                )

        else:
            # Ordinary text region: drop it when the text is too short.
            text = self._get_region_text(region, ocr_result)
            if text and len(text.strip()) >= self.config.min_chunk_chars:
                chunks.append(
                    self._create_text_chunk(
                        text=text,
                        bbox=bbox,
                        confidence=region.confidence,
                        page_number=page_number,
                        doc_id=doc_id,
                        sequence_index=sequence_index,
                        chunk_type=region.region_type.to_chunk_type(),
                    )
                )

        return chunks

    def _get_region_text(
        self,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
    ) -> str:
        """
        Get text for a region from OCR result.

        Returns an empty string when OCR did not run. The region's own
        (un-normalized) bounding box is what gets passed to the OCR result.
        """
        if not ocr_result:
            return ""

        return ocr_result.get_text_in_region(region.bbox, threshold=0.3)

    def _extract_table(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[TableChunk]:
        """
        Extract table structure from a region.

        Best effort: returns None when table extraction is disabled, no table
        model was supplied, the model raises (logged as a warning), or the
        extracted structure has no rows.
        """
        if not self.config.table_extraction_enabled or not self.table_model:
            return None

        try:
            table_structure = self.table_model.extract_structure(
                page_image,
                region.bbox,
            )

            if table_structure.num_rows > 0:
                return table_structure.to_table_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            # Deliberately broad: one bad table must not abort the whole parse.
            logger.warning("Table extraction failed: %s", e)

        return None

    def _extract_chart(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[ChartChunk]:
        """
        Extract chart data from a region.

        Best effort: returns None when chart extraction is disabled, no chart
        model was supplied, the model raises (logged as a warning), or the
        detected chart type is "unknown".
        """
        if not self.config.chart_extraction_enabled or not self.chart_model:
            return None

        try:
            chart_structure = self.chart_model.extract_chart(
                page_image,
                region.bbox,
            )

            if chart_structure.chart_type.value != "unknown":
                return chart_structure.to_chart_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            # Deliberately broad: one bad chart must not abort the whole parse.
            logger.warning("Chart extraction failed: %s", e)

        return None

    def _create_text_chunk(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float,
        page_number: int,
        doc_id: str,
        sequence_index: int,
        chunk_type: ChunkType,
    ) -> DocumentChunk:
        """
        Create a text chunk.

        The chunk id is derived from the document id, page, bounding box and
        chunk type via DocumentChunk.generate_chunk_id.
        """
        chunk_id = DocumentChunk.generate_chunk_id(
            doc_id=doc_id,
            page=page_number,
            bbox=bbox,
            chunk_type_str=chunk_type.value,
        )

        return DocumentChunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            chunk_type=chunk_type,
            text=text,
            page=page_number,
            bbox=bbox,
            confidence=confidence,
            sequence_index=sequence_index,
        )

    def _merge_adjacent_chunks(
        self,
        chunks: List[DocumentChunk],
    ) -> List[DocumentChunk]:
        """
        Merge adjacent text chunks of the same type.

        Only TEXT and PARAGRAPH chunks are merged, and only when both chunks
        share a type and page, are vertically adjacent, and the combined text
        fits within config.max_chunk_chars. A merged chunk keeps the first
        chunk's id and sequence index, takes the union of both boxes, and uses
        the lower of the two confidences.

        Args:
            chunks: Chunks of one page, in output order.

        Returns:
            The input list itself when it has at most one element, otherwise a
            new list with mergeable neighbours combined.
        """
        if len(chunks) <= 1:
            return chunks

        mergeable_types = {
            ChunkType.TEXT,
            ChunkType.PARAGRAPH,
        }

        merged: List[DocumentChunk] = []
        # `current` accumulates the run being built; it is flushed to `merged`
        # whenever the next chunk cannot be appended to it.
        current = chunks[0]

        for chunk in chunks[1:]:
            can_merge = (
                current.chunk_type == chunk.chunk_type
                and chunk.chunk_type in mergeable_types
                and current.page == chunk.page
                and self._chunks_adjacent(current, chunk)
            )

            if can_merge:
                merged_text = current.text + "\n" + chunk.text
                if len(merged_text) <= self.config.max_chunk_chars:
                    current = DocumentChunk(
                        chunk_id=current.chunk_id,
                        doc_id=current.doc_id,
                        chunk_type=current.chunk_type,
                        text=merged_text,
                        page=current.page,
                        bbox=self._merge_bboxes(current.bbox, chunk.bbox),
                        confidence=min(current.confidence, chunk.confidence),
                        sequence_index=current.sequence_index,
                    )
                    continue

            # Not mergeable, or the merge would be too long: start a new run.
            merged.append(current)
            current = chunk

        merged.append(current)
        return merged

    def _chunks_adjacent(
        self,
        chunk1: DocumentChunk,
        chunk2: DocumentChunk,
        gap_threshold: float = 0.05,
    ) -> bool:
        """
        Check if two chunks are vertically adjacent.

        True when chunk2 starts at or below the bottom edge of chunk1 and the
        vertical gap is at most gap_threshold. Overlapping boxes (negative
        gap) are not considered adjacent. The gap is measured in the boxes'
        own coordinate units; the 0.05 default presumably assumes normalized
        coordinates.
        """
        gap = chunk2.bbox.y_min - chunk1.bbox.y_max
        return 0 <= gap <= gap_threshold

    def _merge_bboxes(
        self,
        bbox1: BoundingBox,
        bbox2: BoundingBox,
    ) -> BoundingBox:
        """
        Merge two bounding boxes.

        Returns the smallest box covering both inputs; the `normalized` flag
        is copied from bbox1.
        """
        return BoundingBox(
            x_min=min(bbox1.x_min, bbox2.x_min),
            y_min=min(bbox1.y_min, bbox2.y_min),
            x_max=max(bbox1.x_max, bbox2.x_max),
            y_max=max(bbox1.y_max, bbox2.y_max),
            normalized=bbox1.normalized,
        )

    def _generate_page_markdown(
        self,
        chunks: List[DocumentChunk],
    ) -> str:
        """
        Generate markdown for page chunks.

        Each chunk is preceded by an HTML comment carrying its chunk id and
        followed by a blank line. Rendering depends on the chunk type; types
        without a dedicated rule are emitted as plain text.
        """
        lines: List[str] = []

        for chunk in chunks:
            # Anchor so the markdown can be traced back to its chunk.
            lines.append(f"<!-- chunk:{chunk.chunk_id} -->")

            if chunk.chunk_type == ChunkType.TITLE:
                lines.append(f"# {chunk.text}")
            elif chunk.chunk_type == ChunkType.HEADING:
                lines.append(f"## {chunk.text}")
            elif chunk.chunk_type == ChunkType.TABLE:
                # Structured tables render themselves; plain TABLE chunks only
                # have their text.
                if isinstance(chunk, TableChunk):
                    lines.append(chunk.to_markdown())
                else:
                    lines.append(chunk.text)
            elif chunk.chunk_type == ChunkType.LIST:
                # One bullet per non-blank line.
                for item in chunk.text.split("\n"):
                    if item.strip():
                        lines.append(f"- {item.strip()}")
            elif chunk.chunk_type == ChunkType.CODE:
                lines.append(f"```\n{chunk.text}\n```")
            elif chunk.chunk_type == ChunkType.FIGURE:
                lines.append(f"[Figure: {chunk.text}]")
            elif chunk.chunk_type == ChunkType.CHART:
                if isinstance(chunk, ChartChunk):
                    lines.append(f"[Chart: {chunk.title or chunk.chart_type}]")
                    lines.append(chunk.text)
                else:
                    lines.append(f"[Chart: {chunk.text}]")
            else:
                lines.append(chunk.text)

            lines.append("")

        return "\n".join(lines)
| |
|
| |
|
def parse_document(
    path: Union[str, Path],
    config: Optional[ParserConfig] = None,
    *,
    page_range: Optional[Tuple[int, int]] = None,
    ocr_model: Optional[OCRModel] = None,
    layout_model: Optional[LayoutModel] = None,
    table_model: Optional[TableModel] = None,
    chart_model: Optional[ChartModel] = None,
) -> ParseResult:
    """
    Convenience function to parse a document.

    Builds a one-off DocumentParser and runs it on ``path``. The keyword-only
    arguments are forwarded unchanged, so existing ``parse_document(path)``
    and ``parse_document(path, config)`` calls behave as before.

    Note that the parser skips every stage whose model is missing: with
    neither an OCR model nor a layout model, pages are returned without
    chunks.

    Args:
        path: Path to document
        config: Optional parser configuration
        page_range: Optional (start, end) page range (1-indexed, inclusive)
        ocr_model: Optional OCR model forwarded to the parser
        layout_model: Optional layout model forwarded to the parser
        table_model: Optional table model forwarded to the parser
        chart_model: Optional chart model forwarded to the parser

    Returns:
        ParseResult with extracted chunks
    """
    parser = DocumentParser(
        config=config,
        ocr_model=ocr_model,
        layout_model=layout_model,
        table_model=table_model,
        chart_model=chart_model,
    )
    return parser.parse(path, page_range=page_range)
| |
|