File size: 6,861 Bytes
bbfde3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import base64
import binascii
from typing import TYPE_CHECKING

from .exceptions import DecodeError

if TYPE_CHECKING:  # pragma: no cover
    from typing import Protocol, TypeVar

    _T_contra = TypeVar("_T_contra", contravariant=True)

    class SupportsWrite(Protocol[_T_contra]):
        def write(self, __b: _T_contra) -> object: ...

        # No way to specify optional methods. See
        # https://github.com/python/typing/issues/601
        # close() [Optional]
        # finalize() [Optional]


class Base64Decoder:
    """This object provides an interface to decode a stream of Base64 data.  It

    is instantiated with an "underlying object", and whenever a write()

    operation is performed, it will decode the incoming data as Base64, and

    call write() on the underlying object.  This is primarily used for decoding

    form data encoded as Base64, but can be used for other purposes::



        from python_multipart.decoders import Base64Decoder

        fd = open("notb64.txt", "wb")

        decoder = Base64Decoder(fd)

        try:

            decoder.write("Zm9vYmFy")       # "foobar" in Base64

            decoder.finalize()

        finally:

            decoder.close()



        # The contents of "notb64.txt" should be "foobar".



    This object will also pass all finalize() and close() calls to the

    underlying object, if the underlying object supports them.



    Note that this class maintains a cache of base64 chunks, so that a write of

    arbitrary size can be performed.  You must call :meth:`finalize` on this

    object after all writes are completed to ensure that all data is flushed

    to the underlying object.



    :param underlying: the underlying object to pass writes to

    """

    def __init__(self, underlying: "SupportsWrite[bytes]") -> None:
        self.cache = bytearray()
        self.underlying = underlying

    def write(self, data: bytes) -> int:
        """Takes any input data provided, decodes it as base64, and passes it

        on to the underlying object.  If the data provided is invalid base64

        data, then this method will raise

        a :class:`python_multipart.exceptions.DecodeError`



        :param data: base64 data to decode

        """

        # Prepend any cache info to our data.
        if len(self.cache) > 0:
            data = bytes(self.cache) + data

        # Slice off a string that's a multiple of 4.
        decode_len = (len(data) // 4) * 4
        val = data[:decode_len]

        # Decode and write, if we have any.
        if len(val) > 0:
            try:
                decoded = base64.b64decode(val)
            except binascii.Error:
                raise DecodeError("There was an error raised while decoding base64-encoded data.")

            self.underlying.write(decoded)

        # Get the remaining bytes and save in our cache.
        remaining_len = len(data) % 4
        if remaining_len > 0:
            self.cache[:] = data[-remaining_len:]
        else:
            self.cache[:] = b""

        # Return the length of the data to indicate no error.
        return len(data)

    def close(self) -> None:
        """Close this decoder.  If the underlying object has a `close()`

        method, this function will call it.

        """
        if hasattr(self.underlying, "close"):
            self.underlying.close()

    def finalize(self) -> None:
        """Finalize this object.  This should be called when no more data

        should be written to the stream.  This function can raise a

        :class:`python_multipart.exceptions.DecodeError` if there is some remaining

        data in the cache.



        If the underlying object has a `finalize()` method, this function will

        call it.

        """
        if len(self.cache) > 0:
            raise DecodeError(
                "There are %d bytes remaining in the Base64Decoder cache when finalize() is called" % len(self.cache)
            )

        if hasattr(self.underlying, "finalize"):
            self.underlying.finalize()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"


class QuotedPrintableDecoder:
    """This object provides an interface to decode a stream of quoted-printable

    data.  It is instantiated with an "underlying object", in the same manner

    as the :class:`python_multipart.decoders.Base64Decoder` class.  This class behaves

    in exactly the same way, including maintaining a cache of quoted-printable

    chunks.



    :param underlying: the underlying object to pass writes to

    """

    def __init__(self, underlying: "SupportsWrite[bytes]") -> None:
        self.cache = b""
        self.underlying = underlying

    def write(self, data: bytes) -> int:
        """Takes any input data provided, decodes it as quoted-printable, and

        passes it on to the underlying object.



        :param data: quoted-printable data to decode

        """
        # Prepend any cache info to our data.
        if len(self.cache) > 0:
            data = self.cache + data

        # If the last 2 characters have an '=' sign in it, then we won't be
        # able to decode the encoded value and we'll need to save it for the
        # next decoding step.
        if data[-2:].find(b"=") != -1:
            enc, rest = data[:-2], data[-2:]
        else:
            enc = data
            rest = b""

        # Encode and write, if we have data.
        if len(enc) > 0:
            self.underlying.write(binascii.a2b_qp(enc))

        # Save remaining in cache.
        self.cache = rest
        return len(data)

    def close(self) -> None:
        """Close this decoder.  If the underlying object has a `close()`

        method, this function will call it.

        """
        if hasattr(self.underlying, "close"):
            self.underlying.close()

    def finalize(self) -> None:
        """Finalize this object.  This should be called when no more data

        should be written to the stream.  This function will not raise any

        exceptions, but it may write more data to the underlying object if

        there is data remaining in the cache.



        If the underlying object has a `finalize()` method, this function will

        call it.

        """
        # If we have a cache, write and then remove it.
        if len(self.cache) > 0:  # pragma: no cover
            self.underlying.write(binascii.a2b_qp(self.cache))
            self.cache = b""

        # Finalize our underlying stream.
        if hasattr(self.underlying, "finalize"):
            self.underlying.finalize()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(underlying={self.underlying!r})"