| import base64
|
| import binascii
|
| from typing import TYPE_CHECKING
|
|
|
| from .exceptions import DecodeError
|
|
|
| if TYPE_CHECKING:
|
| from typing import Protocol, TypeVar
|
|
|
| _T_contra = TypeVar("_T_contra", contravariant=True)
|
|
|
| class SupportsWrite(Protocol[_T_contra]):
|
| def write(self, __b: _T_contra) -> object: ...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| class Base64Decoder:
|
| """This object provides an interface to decode a stream of Base64 data. It
|
| is instantiated with an "underlying object", and whenever a write()
|
| operation is performed, it will decode the incoming data as Base64, and
|
| call write() on the underlying object. This is primarily used for decoding
|
| form data encoded as Base64, but can be used for other purposes::
|
|
|
| from python_multipart.decoders import Base64Decoder
|
| fd = open("notb64.txt", "wb")
|
| decoder = Base64Decoder(fd)
|
| try:
|
| decoder.write("Zm9vYmFy") # "foobar" in Base64
|
| decoder.finalize()
|
| finally:
|
| decoder.close()
|
|
|
| # The contents of "notb64.txt" should be "foobar".
|
|
|
| This object will also pass all finalize() and close() calls to the
|
| underlying object, if the underlying object supports them.
|
|
|
| Note that this class maintains a cache of base64 chunks, so that a write of
|
| arbitrary size can be performed. You must call :meth:`finalize` on this
|
| object after all writes are completed to ensure that all data is flushed
|
| to the underlying object.
|
|
|
| :param underlying: the underlying object to pass writes to
|
| """
|
|
|
| def __init__(self, underlying: "SupportsWrite[bytes]") -> None:
|
| self.cache = bytearray()
|
| self.underlying = underlying
|
|
|
| def write(self, data: bytes) -> int:
|
| """Takes any input data provided, decodes it as base64, and passes it
|
| on to the underlying object. If the data provided is invalid base64
|
| data, then this method will raise
|
| a :class:`python_multipart.exceptions.DecodeError`
|
|
|
| :param data: base64 data to decode
|
| """
|
|
|
|
|
| if len(self.cache) > 0:
|
| data = bytes(self.cache) + data
|
|
|
|
|
| decode_len = (len(data) // 4) * 4
|
| val = data[:decode_len]
|
|
|
|
|
| if len(val) > 0:
|
| try:
|
| decoded = base64.b64decode(val)
|
| except binascii.Error:
|
| raise DecodeError("There was an error raised while decoding base64-encoded data.")
|
|
|
| self.underlying.write(decoded)
|
|
|
|
|
| remaining_len = len(data) % 4
|
| if remaining_len > 0:
|
| self.cache[:] = data[-remaining_len:]
|
| else:
|
| self.cache[:] = b""
|
|
|
|
|
| return len(data)
|
|
|
| def close(self) -> None:
|
| """Close this decoder. If the underlying object has a `close()`
|
| method, this function will call it.
|
| """
|
| if hasattr(self.underlying, "close"):
|
| self.underlying.close()
|
|
|
| def finalize(self) -> None:
|
| """Finalize this object. This should be called when no more data
|
| should be written to the stream. This function can raise a
|
| :class:`python_multipart.exceptions.DecodeError` if there is some remaining
|
| data in the cache.
|
|
|
| If the underlying object has a `finalize()` method, this function will
|
| call it.
|
| """
|
| if len(self.cache) > 0:
|
| raise DecodeError(
|
| "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache)
|
| )
|
|
|
| if hasattr(self.underlying, "finalize"):
|
| self.underlying.finalize()
|
|
|
| def __repr__(self) -> str:
|
| return f"{self.__class__.__name__}(underlying={self.underlying!r})"
|
|
|
|
|
| class QuotedPrintableDecoder:
|
| """This object provides an interface to decode a stream of quoted-printable
|
| data. It is instantiated with an "underlying object", in the same manner
|
| as the :class:`python_multipart.decoders.Base64Decoder` class. This class behaves
|
| in exactly the same way, including maintaining a cache of quoted-printable
|
| chunks.
|
|
|
| :param underlying: the underlying object to pass writes to
|
| """
|
|
|
| def __init__(self, underlying: "SupportsWrite[bytes]") -> None:
|
| self.cache = b""
|
| self.underlying = underlying
|
|
|
| def write(self, data: bytes) -> int:
|
| """Takes any input data provided, decodes it as quoted-printable, and
|
| passes it on to the underlying object.
|
|
|
| :param data: quoted-printable data to decode
|
| """
|
|
|
| if len(self.cache) > 0:
|
| data = self.cache + data
|
|
|
|
|
|
|
|
|
| if data[-2:].find(b"=") != -1:
|
| enc, rest = data[:-2], data[-2:]
|
| else:
|
| enc = data
|
| rest = b""
|
|
|
|
|
| if len(enc) > 0:
|
| self.underlying.write(binascii.a2b_qp(enc))
|
|
|
|
|
| self.cache = rest
|
| return len(data)
|
|
|
| def close(self) -> None:
|
| """Close this decoder. If the underlying object has a `close()`
|
| method, this function will call it.
|
| """
|
| if hasattr(self.underlying, "close"):
|
| self.underlying.close()
|
|
|
| def finalize(self) -> None:
|
| """Finalize this object. This should be called when no more data
|
| should be written to the stream. This function will not raise any
|
| exceptions, but it may write more data to the underlying object if
|
| there is data remaining in the cache.
|
|
|
| If the underlying object has a `finalize()` method, this function will
|
| call it.
|
| """
|
|
|
| if len(self.cache) > 0:
|
| self.underlying.write(binascii.a2b_qp(self.cache))
|
| self.cache = b""
|
|
|
|
|
| if hasattr(self.underlying, "finalize"):
|
| self.underlying.finalize()
|
|
|
| def __repr__(self) -> str:
|
| return f"{self.__class__.__name__}(underlying={self.underlying!r})"
|
|
|