|
|
import re |
|
|
import sys |
|
|
import io |
|
|
from typing import List, Union, BinaryIO, Optional, Any |
|
|
from markitdown import MarkItDown as BaseMarkItDown |
|
|
from markitdown._stream_info import StreamInfo |
|
|
from markitdown._base_converter import DocumentConverterResult |
|
|
|
|
|
from markitdown._exceptions import ( |
|
|
FileConversionException, |
|
|
UnsupportedFormatException, |
|
|
FailedConversionAttempt, |
|
|
) |
|
|
|
|
|
from ._audio_converter import AudioConverter |
|
|
from ._image_converter import ImageConverter |
|
|
|
|
|
class MarkItDown(BaseMarkItDown):
    """(In preview) An extremely simple text-based document reader, suitable for LLM use.

    This reader will convert common file-types or webpages to Markdown.

    In addition to whatever ``BaseMarkItDown.__init__`` sets up, this subclass
    registers an ``AudioConverter`` and an ``ImageConverter``.
    """

    def __init__(self, **kwargs: Any) -> None:
        """Initialise the base reader, then register the extra converters.

        Args:
            **kwargs: Forwarded unchanged to ``BaseMarkItDown.__init__``.
        """
        super().__init__(**kwargs)
        self.register_converter(AudioConverter())
        self.register_converter(ImageConverter())

    def _convert(
        self, *, file_stream: BinaryIO, stream_info_guesses: List[StreamInfo], **kwargs
    ) -> DocumentConverterResult:
        """Try every registered converter against every stream-info guess.

        Each guess in ``stream_info_guesses``, followed by one final empty
        ``StreamInfo()``, is offered to the registered converters in ascending
        ``priority`` order. The first converter that accepts the stream and
        returns a non-``None`` result wins.

        Args:
            file_stream: Stream to convert. Its position on entry is recorded
                and restored after every conversion attempt.
            stream_info_guesses: Candidate descriptions of the stream, tried
                in the given order.
            **kwargs: Extra options passed to ``accepts()`` / ``convert()``.
                ``llm_client``, ``llm_model``, ``style_map`` and
                ``exiftool_path`` are filled in from the instance when the
                caller did not supply them and the instance value is not
                ``None``.

        Returns:
            The first non-``None`` converter result. When its ``text_content``
            is a ``str``, trailing whitespace is stripped from every line and
            runs of three or more newlines are collapsed to two.

        Raises:
            FileConversionException: No converter produced a result and at
                least one accepted converter raised during ``convert()``.
            UnsupportedFormatException: No converter produced a result and
                none of them raised.
        """
        res: Optional[DocumentConverterResult] = None

        # Converters that accepted the stream but raised while converting.
        failed_attempts: List[FailedConversionAttempt] = []

        # Sorted on every call so the current priorities are honoured;
        # sorted() is stable, so equal priorities keep their relative order
        # from self._converters.
        sorted_registrations = sorted(self._converters, key=lambda x: x.priority)

        # Every attempt must start from the position the stream had on entry.
        cur_pos = file_stream.tell()

        # Options that are identical for every guess and every converter are
        # assembled once here rather than once per (guess, converter) pair.
        shared_kwargs = dict(kwargs)
        if "llm_client" not in shared_kwargs and self._llm_client is not None:
            shared_kwargs["llm_client"] = self._llm_client
        if "llm_model" not in shared_kwargs and self._llm_model is not None:
            shared_kwargs["llm_model"] = self._llm_model
        if "style_map" not in shared_kwargs and self._style_map is not None:
            shared_kwargs["style_map"] = self._style_map
        if "exiftool_path" not in shared_kwargs and self._exiftool_path is not None:
            shared_kwargs["exiftool_path"] = self._exiftool_path

        # Converters receive the full converter list as well.
        shared_kwargs["_parent_converters"] = self._converters

        for stream_info in stream_info_guesses + [StreamInfo()]:
            # Per-guess overlay: the guess's extension and URL are also
            # forwarded as plain keyword arguments.
            call_kwargs = dict(shared_kwargs)
            if stream_info is not None:
                if stream_info.extension is not None:
                    call_kwargs["file_extension"] = stream_info.extension
                if stream_info.url is not None:
                    call_kwargs["url"] = stream_info.url

            for converter_registration in sorted_registrations:
                converter = converter_registration.converter

                # Sanity check (skipped when Python runs with -O).
                assert (
                    cur_pos == file_stream.tell()
                ), "File stream position should NOT change between guess iterations"

                # A converter that does not implement accepts() is treated as
                # declining the stream.
                try:
                    accepted = converter.accepts(
                        file_stream, stream_info, **call_kwargs
                    )
                except NotImplementedError:
                    accepted = False

                # accepts() is only allowed to peek at the stream.
                assert (
                    cur_pos == file_stream.tell()
                ), f"{type(converter).__name__}.accepts() should NOT change the file_stream position"

                if not accepted:
                    continue

                try:
                    res = converter.convert(file_stream, stream_info, **call_kwargs)
                except Exception:
                    # Record the failure and keep trying the other converters.
                    failed_attempts.append(
                        FailedConversionAttempt(
                            converter=converter, exc_info=sys.exc_info()
                        )
                    )
                finally:
                    # Rewind whether the attempt succeeded or failed.
                    file_stream.seek(cur_pos)

                if res is None:
                    continue

                if isinstance(res.text_content, str):
                    # Strip trailing whitespace per line (this also turns
                    # "\r\n" into "\n"), then collapse runs of blank lines.
                    res.text_content = "\n".join(
                        line.rstrip()
                        for line in re.split(r"\r?\n", res.text_content)
                    )
                    res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
                return res

        # Nothing succeeded: surface the recorded failures if there are any.
        if failed_attempts:
            raise FileConversionException(attempts=failed_attempts)

        raise UnsupportedFormatException(
            "Could not convert stream to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported."
        )

    def convert_stream(
        self,
        stream: BinaryIO,
        *,
        stream_info: Optional[StreamInfo] = None,
        file_extension: Optional[str] = None,
        url: Optional[str] = None,
        **kwargs: Any,
    ) -> DocumentConverterResult:
        """Convert a binary stream to Markdown.

        Args:
            stream: Source stream. If it has no ``seekable`` attribute or
                reports itself as not seekable, it is read to the end into an
                in-memory ``io.BytesIO`` which is used instead.
            stream_info: Optional description of the stream used as the base
                guess.
            file_extension: Optional extension, applied to the base guess via
                ``copy_and_update(extension=...)``.
            url: Optional URL, applied to the base guess via
                ``copy_and_update(url=...)``.
            **kwargs: Forwarded to ``_convert``.

        Returns:
            The result returned by ``_convert``.
        """
        # Start from the caller's StreamInfo (if any) and layer the explicit
        # extension / URL arguments on top of it.
        base_guess = stream_info
        if file_extension is not None or url is not None:
            if base_guess is None:
                base_guess = StreamInfo()
            if file_extension is not None:
                base_guess = base_guess.copy_and_update(extension=file_extension)
            if url is not None:
                base_guess = base_guess.copy_and_update(url=url)

        # _convert rewinds the stream between attempts, so a non-seekable
        # stream is first copied into memory.
        if not hasattr(stream, "seekable") or not stream.seekable():
            buffer = io.BytesIO()
            while True:
                chunk = stream.read(4096)
                if not chunk:
                    break
                buffer.write(chunk)
            buffer.seek(0)
            stream = buffer

        guesses = self._get_stream_info_guesses(
            file_stream=stream, base_guess=base_guess or StreamInfo()
        )
        return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs)
|
|
|