| import logging |
| import math |
| import sys |
| import time |
| from functools import partial |
| from pathlib import Path |
| from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union |
|
|
| from pydantic import BaseModel, ConfigDict, model_validator, validate_call |
|
|
| from docling.backend.abstract_backend import AbstractDocumentBackend |
| from docling.backend.asciidoc_backend import AsciiDocBackend |
| from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend |
| from docling.backend.html_backend import HTMLDocumentBackend |
| from docling.backend.json.docling_json_backend import DoclingJSONBackend |
| from docling.backend.md_backend import MarkdownDocumentBackend |
| from docling.backend.msexcel_backend import MsExcelDocumentBackend |
| from docling.backend.mspowerpoint_backend import MsPowerpointDocumentBackend |
| from docling.backend.msword_backend import MsWordDocumentBackend |
| from docling.backend.xml.pubmed_backend import PubMedDocumentBackend |
| from docling.backend.xml.uspto_backend import PatentUsptoDocumentBackend |
| from docling.datamodel.base_models import ( |
| ConversionStatus, |
| DoclingComponentType, |
| DocumentStream, |
| ErrorItem, |
| InputFormat, |
| ) |
| from docling.datamodel.document import ( |
| ConversionResult, |
| InputDocument, |
| _DocumentConversionInput, |
| ) |
| from docling.datamodel.pipeline_options import PipelineOptions |
| from docling.datamodel.settings import ( |
| DEFAULT_PAGE_RANGE, |
| DocumentLimits, |
| PageRange, |
| settings, |
| ) |
| from docling.exceptions import ConversionError |
| from docling.pipeline.base_pipeline import BasePipeline |
| from docling.pipeline.simple_pipeline import SimplePipeline |
| from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline |
| from docling.utils.utils import chunkify |
|
|
| _log = logging.getLogger(__name__) |
|
|
|
|
| class FormatOption(BaseModel): |
| pipeline_cls: Type[BasePipeline] |
| pipeline_options: Optional[PipelineOptions] = None |
| backend: Type[AbstractDocumentBackend] |
|
|
| model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
| @model_validator(mode="after") |
| def set_optional_field_default(self) -> "FormatOption": |
| if self.pipeline_options is None: |
| self.pipeline_options = self.pipeline_cls.get_default_options() |
| return self |
|
|
|
|
| class ExcelFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = MsExcelDocumentBackend |
|
|
|
|
| class WordFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = MsWordDocumentBackend |
|
|
|
|
| class PowerpointFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = MsPowerpointDocumentBackend |
|
|
|
|
| class MarkdownFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = MarkdownDocumentBackend |
|
|
|
|
| class AsciiDocFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = AsciiDocBackend |
|
|
|
|
| class HTMLFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = HTMLDocumentBackend |
|
|
|
|
| class PatentUsptoFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[PatentUsptoDocumentBackend] = PatentUsptoDocumentBackend |
|
|
|
|
| class XMLPubMedFormatOption(FormatOption): |
| pipeline_cls: Type = SimplePipeline |
| backend: Type[AbstractDocumentBackend] = PubMedDocumentBackend |
|
|
|
|
| class ImageFormatOption(FormatOption): |
| pipeline_cls: Type = StandardPdfPipeline |
| backend: Type[AbstractDocumentBackend] = DoclingParseV2DocumentBackend |
|
|
|
|
| class PdfFormatOption(FormatOption): |
| pipeline_cls: Type = StandardPdfPipeline |
| backend: Type[AbstractDocumentBackend] = DoclingParseV2DocumentBackend |
|
|
|
|
| def _get_default_option(format: InputFormat) -> FormatOption: |
| format_to_default_options = { |
| InputFormat.XLSX: FormatOption( |
| pipeline_cls=SimplePipeline, backend=MsExcelDocumentBackend |
| ), |
| InputFormat.DOCX: FormatOption( |
| pipeline_cls=SimplePipeline, backend=MsWordDocumentBackend |
| ), |
| InputFormat.PPTX: FormatOption( |
| pipeline_cls=SimplePipeline, backend=MsPowerpointDocumentBackend |
| ), |
| InputFormat.MD: FormatOption( |
| pipeline_cls=SimplePipeline, backend=MarkdownDocumentBackend |
| ), |
| InputFormat.ASCIIDOC: FormatOption( |
| pipeline_cls=SimplePipeline, backend=AsciiDocBackend |
| ), |
| InputFormat.HTML: FormatOption( |
| pipeline_cls=SimplePipeline, backend=HTMLDocumentBackend |
| ), |
| InputFormat.XML_USPTO: FormatOption( |
| pipeline_cls=SimplePipeline, backend=PatentUsptoDocumentBackend |
| ), |
| InputFormat.XML_PUBMED: FormatOption( |
| pipeline_cls=SimplePipeline, backend=PubMedDocumentBackend |
| ), |
| InputFormat.IMAGE: FormatOption( |
| pipeline_cls=StandardPdfPipeline, backend=DoclingParseV2DocumentBackend |
| ), |
| InputFormat.PDF: FormatOption( |
| pipeline_cls=StandardPdfPipeline, backend=DoclingParseV2DocumentBackend |
| ), |
| InputFormat.JSON_DOCLING: FormatOption( |
| pipeline_cls=SimplePipeline, backend=DoclingJSONBackend |
| ), |
| } |
| if (options := format_to_default_options.get(format)) is not None: |
| return options |
| else: |
| raise RuntimeError(f"No default options configured for {format}") |
|
|
|
|
| class DocumentConverter: |
| _default_download_filename = "file" |
|
|
| def __init__( |
| self, |
| allowed_formats: Optional[List[InputFormat]] = None, |
| format_options: Optional[Dict[InputFormat, FormatOption]] = None, |
| ): |
| self.allowed_formats = ( |
| allowed_formats if allowed_formats is not None else [e for e in InputFormat] |
| ) |
| self.format_to_options = { |
| format: ( |
| _get_default_option(format=format) |
| if (custom_option := (format_options or {}).get(format)) is None |
| else custom_option |
| ) |
| for format in self.allowed_formats |
| } |
| self.initialized_pipelines: Dict[Type[BasePipeline], BasePipeline] = {} |
|
|
| def initialize_pipeline(self, format: InputFormat): |
| """Initialize the conversion pipeline for the selected format.""" |
| pipeline = self._get_pipeline(doc_format=format) |
| if pipeline is None: |
| raise ConversionError( |
| f"No pipeline could be initialized for format {format}" |
| ) |
|
|
| @validate_call(config=ConfigDict(strict=True)) |
| def convert( |
| self, |
| source: Union[Path, str, DocumentStream], |
| headers: Optional[Dict[str, str]] = None, |
| raises_on_error: bool = True, |
| max_num_pages: int = sys.maxsize, |
| max_file_size: int = sys.maxsize, |
| page_range: PageRange = DEFAULT_PAGE_RANGE, |
| ) -> ConversionResult: |
| all_res = self.convert_all( |
| source=[source], |
| raises_on_error=raises_on_error, |
| max_num_pages=max_num_pages, |
| max_file_size=max_file_size, |
| headers=headers, |
| page_range=page_range, |
| ) |
| return next(all_res) |
|
|
| @validate_call(config=ConfigDict(strict=True)) |
| def convert_all( |
| self, |
| source: Iterable[Union[Path, str, DocumentStream]], |
| headers: Optional[Dict[str, str]] = None, |
| raises_on_error: bool = True, |
| max_num_pages: int = sys.maxsize, |
| max_file_size: int = sys.maxsize, |
| page_range: PageRange = DEFAULT_PAGE_RANGE, |
| ) -> Iterator[ConversionResult]: |
| limits = DocumentLimits( |
| max_num_pages=max_num_pages, |
| max_file_size=max_file_size, |
| page_range=page_range, |
| ) |
| conv_input = _DocumentConversionInput( |
| path_or_stream_iterator=source, limits=limits, headers=headers |
| ) |
| conv_res_iter = self._convert(conv_input, raises_on_error=raises_on_error) |
|
|
| had_result = False |
| for conv_res in conv_res_iter: |
| had_result = True |
| if raises_on_error and conv_res.status not in { |
| ConversionStatus.SUCCESS, |
| ConversionStatus.PARTIAL_SUCCESS, |
| }: |
| raise ConversionError( |
| f"Conversion failed for: {conv_res.input.file} with status: {conv_res.status}" |
| ) |
| else: |
| yield conv_res |
|
|
| if not had_result and raises_on_error: |
| raise ConversionError( |
| f"Conversion failed because the provided file has no recognizable format or it wasn't in the list of allowed formats." |
| ) |
|
|
| def _convert( |
| self, conv_input: _DocumentConversionInput, raises_on_error: bool |
| ) -> Iterator[ConversionResult]: |
| start_time = time.monotonic() |
|
|
| for input_batch in chunkify( |
| conv_input.docs(self.format_to_options), |
| settings.perf.doc_batch_size, |
| ): |
| _log.info(f"Going to convert document batch...") |
|
|
| |
| |
| |
| |
| |
| |
|
|
| for item in map( |
| partial(self._process_document, raises_on_error=raises_on_error), |
| input_batch, |
| ): |
| elapsed = time.monotonic() - start_time |
| start_time = time.monotonic() |
| _log.info( |
| f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec." |
| ) |
| yield item |
|
|
| def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]: |
| fopt = self.format_to_options.get(doc_format) |
|
|
| if fopt is None: |
| return None |
| else: |
| pipeline_class = fopt.pipeline_cls |
| pipeline_options = fopt.pipeline_options |
|
|
| if pipeline_options is None: |
| return None |
| |
| if ( |
| pipeline_class not in self.initialized_pipelines |
| or self.initialized_pipelines[pipeline_class].pipeline_options |
| != pipeline_options |
| ): |
| self.initialized_pipelines[pipeline_class] = pipeline_class( |
| pipeline_options=pipeline_options |
| ) |
| return self.initialized_pipelines[pipeline_class] |
|
|
| def _process_document( |
| self, in_doc: InputDocument, raises_on_error: bool |
| ) -> ConversionResult: |
|
|
| valid = ( |
| self.allowed_formats is not None and in_doc.format in self.allowed_formats |
| ) |
| if valid: |
| conv_res = self._execute_pipeline(in_doc, raises_on_error=raises_on_error) |
| else: |
| error_message = f"File format not allowed: {in_doc.file}" |
| if raises_on_error: |
| raise ConversionError(error_message) |
| else: |
| error_item = ErrorItem( |
| component_type=DoclingComponentType.USER_INPUT, |
| module_name="", |
| error_message=error_message, |
| ) |
| conv_res = ConversionResult( |
| input=in_doc, status=ConversionStatus.SKIPPED, errors=[error_item] |
| ) |
|
|
| return conv_res |
|
|
| def _execute_pipeline( |
| self, in_doc: InputDocument, raises_on_error: bool |
| ) -> ConversionResult: |
| if in_doc.valid: |
| pipeline = self._get_pipeline(in_doc.format) |
| if pipeline is not None: |
| conv_res = pipeline.execute(in_doc, raises_on_error=raises_on_error) |
| else: |
| if raises_on_error: |
| raise ConversionError( |
| f"No pipeline could be initialized for {in_doc.file}." |
| ) |
| else: |
| conv_res = ConversionResult( |
| input=in_doc, |
| status=ConversionStatus.FAILURE, |
| ) |
| else: |
| if raises_on_error: |
| raise ConversionError(f"Input document {in_doc.file} is not valid.") |
|
|
| else: |
| |
| conv_res = ConversionResult( |
| input=in_doc, |
| status=ConversionStatus.FAILURE, |
| ) |
| |
|
|
| return conv_res |
|
|