import io
import warnings
from collections import deque
from typing import Any, Iterable, List, Optional
from urllib.parse import urlencode

from multidict import MultiDict, MultiDictProxy

from . import hdrs, multipart, payload
from .helpers import guess_filename
from .payload import Payload
| |
|
# Names exported by a star-import of this module; FormData is the only one.
__all__ = ("FormData",)
| |
|
| |
|
class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected through :meth:`add_field` / :meth:`add_fields` and
    serialized when the instance is called: as a multipart writer if any
    field required multipart encoding (or ``default_to_multipart`` was set),
    otherwise as a URL-encoded bytes payload.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
        *,
        default_to_multipart: bool = False,
    ) -> None:
        """Create the form and register the initial ``fields``.

        :param fields: a dict (its items are used), a list/tuple of records,
            or a single record; each record is handed to :meth:`add_fields`.
        :param quote_fields: forwarded as ``quote_fields`` when the
            Content-Disposition of a multipart part is set.
        :param charset: encoding passed to the payload factory and to
            ``urlencode``; ``None`` means "utf-8" for the URL-encoded form.
        :param default_to_multipart: initial value of the multipart flag.
        """
        self._writer = multipart.MultipartWriter("form-data")
        self._fields: List[Any] = []
        self._is_multipart = default_to_multipart
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A lone record (file object, multidict, ...) is treated as a
            # one-element sequence.
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether the form will be serialized as multipart/form-data."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None,
    ) -> None:
        """Register a single field.

        :param name: field name, stored as the ``name`` disposition parameter.
        :param value: field value. An ``io.IOBase`` instance switches the
            form to multipart. A bytes-like value given without ``filename``
            and ``content_transfer_encoding`` emits a ``DeprecationWarning``
            and uses ``name`` as the file name.
        :param content_type: optional Content-Type header for the field;
            switches the form to multipart.
        :param filename: optional file name; switches the form to multipart.
            For ``io.IOBase`` values it defaults to
            ``guess_filename(value, name)``.
        :param content_transfer_encoding: deprecated; it is only validated,
            triggers a ``DeprecationWarning`` and switches the form to
            multipart.
        :raises TypeError: if ``filename``, ``content_type`` or
            ``content_transfer_encoding`` is given and is not a ``str``.

        All arguments are validated before the instance is modified, so a
        rejected call leaves the form untouched.
        """
        is_file_like = isinstance(value, io.IOBase)

        if not is_file_like and isinstance(value, (bytes, bytearray, memoryview)):
            if filename is None and content_transfer_encoding is None:
                msg = (
                    "In v4, passing bytes will no longer create a file field. "
                    "Please explicitly use the filename parameter or pass a BytesIO object."
                )
                # stacklevel=2 attributes the warning to the caller.
                warnings.warn(msg, DeprecationWarning, stacklevel=2)
                filename = name

        # The offending value is wrapped in a 1-tuple so that a tuple argument
        # is reported as-is instead of being unpacked by the % operator.
        if filename is not None and not isinstance(filename, str):
            raise TypeError(
                "filename must be an instance of str. Got: %s" % (filename,)
            )
        if content_type is not None and not isinstance(content_type, str):
            raise TypeError(
                "content_type must be an instance of str. Got: %s" % (content_type,)
            )
        if content_transfer_encoding is not None and not isinstance(
            content_transfer_encoding, str
        ):
            raise TypeError(
                "content_transfer_encoding must be an instance"
                " of str. Got: %s" % (content_transfer_encoding,)
            )

        if filename is None and is_file_like:
            filename = guess_filename(value, name)

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is not None:
            type_options["filename"] = filename

        headers = {}
        if content_type is not None:
            headers[hdrs.CONTENT_TYPE] = content_type

        if content_transfer_encoding is not None:
            msg = (
                "content_transfer_encoding is deprecated. "
                "To maintain compatibility with v4 please pass a BytesPayload."
            )
            warnings.warn(msg, DeprecationWarning, stacklevel=2)

        # Any of these make the field inexpressible as a plain URL-encoded
        # pair, so the whole form has to be sent as multipart.
        if (
            is_file_like
            or filename is not None
            or content_type is not None
            or content_transfer_encoding is not None
        ):
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Register several fields at once.

        Each record may be an ``io.IOBase`` object (its name is guessed,
        falling back to "unknown"), a ``MultiDict``/``MultiDictProxy`` (its
        items are queued behind the records still pending), or a 2-item
        list/tuple ``(name, value)``.

        :raises TypeError: for any other kind of record.
        """
        # deque gives O(1) pops from the front; records are handled FIFO.
        pending = deque(fields)

        while pending:
            rec = pending.popleft()

            if isinstance(rec, io.IOBase):
                self.add_field(guess_filename(rec, "unknown"), rec)

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                pending.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                field_name, field_value = rec
                self.add_field(field_name, field_value)

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> payload.BytesPayload:
        """Encode the fields as application/x-www-form-urlencoded.

        The Content-Type carries an explicit ``charset`` parameter only when
        the charset differs from "utf-8".
        """
        data = [
            (type_options["name"], value) for type_options, _, value in self._fields
        ]

        charset = self._charset if self._charset is not None else "utf-8"

        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> multipart.MultipartWriter:
        """Encode a list of fields using the multipart/form-data MIME format.

        Every pending field is converted to a payload and appended to the
        writer created in ``__init__``; the pending list is then emptied and
        that same writer is returned.

        :raises TypeError: if a value cannot be converted to a payload (the
            original exception is chained as the cause).
        """
        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # Drop any Content-Length header from the individual part.
                # NOTE(review): presumably for consumers that reject
                # per-part lengths -- confirm.
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._fields.clear()
        return self._writer

    def __call__(self) -> Payload:
        """Serialize the form, choosing the encoding from ``is_multipart``."""
        if self._is_multipart:
            return self._gen_form_data()
        return self._gen_form_urlencoded()
| |
|