| from __future__ import annotations |
|
|
| import re |
| import struct |
| import sys |
| from codecs import getincrementaldecoder |
| from collections.abc import Callable, Generator, Mapping, Sequence |
| from contextlib import contextmanager |
| from datetime import date, datetime, timedelta, timezone |
| from io import BytesIO |
| from typing import IO, TYPE_CHECKING, Any, TypeVar, cast, overload |
|
|
| from ._types import ( |
| CBORDecodeEOF, |
| CBORDecodeValueError, |
| CBORSimpleValue, |
| CBORTag, |
| FrozenDict, |
| break_marker, |
| undefined, |
| ) |
|
|
| if TYPE_CHECKING: |
| from decimal import Decimal |
| from email.message import Message |
| from fractions import Fraction |
| from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network |
| from typing import Literal |
| from uuid import UUID |
|
|
| T = TypeVar("T") |
|
|
| timestamp_re = re.compile( |
| r"^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)" r"(?:\.(\d{1,6})\d*)?(?:Z|([+-])(\d\d):(\d\d))$" |
| ) |
| incremental_utf8_decoder = getincrementaldecoder("utf-8") |
|
|
|
|
| class CBORDecoder: |
| """ |
| The CBORDecoder class implements a fully featured `CBOR`_ decoder with |
| several extensions for handling shared references, big integers, rational |
| numbers and so on. Typically the class is not used directly, but the |
| :func:`load` and :func:`loads` functions are called to indirectly construct |
| and use the class. |
| |
| When the class is constructed manually, the main entry points are |
| :meth:`decode` and :meth:`decode_from_bytes`. |
| |
| .. _CBOR: https://cbor.io/ |
| """ |
|
|
| __slots__ = ( |
| "_tag_hook", |
| "_object_hook", |
| "_share_index", |
| "_shareables", |
| "_fp", |
| "_fp_read", |
| "_immutable", |
| "_str_errors", |
| "_stringref_namespace", |
| "_decode_depth", |
| ) |
|
|
| _fp: IO[bytes] |
| _fp_read: Callable[[int], bytes] |
|
|
| def __init__( |
| self, |
| fp: IO[bytes], |
| tag_hook: Callable[[CBORDecoder, CBORTag], Any] | None = None, |
| object_hook: Callable[[CBORDecoder, dict[Any, Any]], Any] | None = None, |
| str_errors: Literal["strict", "error", "replace"] = "strict", |
| ): |
| """ |
| :param fp: |
| the file to read from (any file-like object opened for reading in binary |
| mode) |
| :param tag_hook: |
| callable that takes 2 arguments: the decoder instance, and the |
| :class:`.CBORTag` to be decoded. This callback is invoked for any tags |
| for which there is no built-in decoder. The return value is substituted |
| for the :class:`.CBORTag` object in the deserialized output |
| :param object_hook: |
| callable that takes 2 arguments: the decoder instance, and a |
| dictionary. This callback is invoked for each deserialized |
| :class:`dict` object. The return value is substituted for the dict in |
| the deserialized output. |
| :param str_errors: |
| determines how to handle unicode decoding errors (see the `Error Handlers`_ |
| section in the standard library documentation for details) |
| |
| .. _Error Handlers: https://docs.python.org/3/library/codecs.html#error-handlers |
| |
| """ |
| self.fp = fp |
| self.tag_hook = tag_hook |
| self.object_hook = object_hook |
| self.str_errors = str_errors |
| self._share_index: int | None = None |
| self._shareables: list[object] = [] |
| self._stringref_namespace: list[str | bytes] | None = None |
| self._immutable = False |
| self._decode_depth = 0 |
|
|
| @property |
| def immutable(self) -> bool: |
| """ |
| Used by decoders to check if the calling context requires an immutable |
| type. Object_hook or tag_hook should raise an exception if this flag |
| is set unless the result can be safely used as a dict key. |
| """ |
| return self._immutable |
|
|
| @property |
| def fp(self) -> IO[bytes]: |
| return self._fp |
|
|
| @fp.setter |
| def fp(self, value: IO[bytes]) -> None: |
| try: |
| if not callable(value.read): |
| raise ValueError("fp.read is not callable") |
| except AttributeError: |
| raise ValueError("fp object has no read method") |
| else: |
| self._fp = value |
| self._fp_read = value.read |
|
|
| @property |
| def tag_hook(self) -> Callable[[CBORDecoder, CBORTag], Any] | None: |
| return self._tag_hook |
|
|
| @tag_hook.setter |
| def tag_hook(self, value: Callable[[CBORDecoder, CBORTag], Any] | None) -> None: |
| if value is None or callable(value): |
| self._tag_hook = value |
| else: |
| raise ValueError("tag_hook must be None or a callable") |
|
|
| @property |
| def object_hook(self) -> Callable[[CBORDecoder, dict[Any, Any]], Any] | None: |
| return self._object_hook |
|
|
| @object_hook.setter |
| def object_hook(self, value: Callable[[CBORDecoder, dict[Any, Any]], Any] | None) -> None: |
| if value is None or callable(value): |
| self._object_hook = value |
| else: |
| raise ValueError("object_hook must be None or a callable") |
|
|
| @property |
| def str_errors(self) -> Literal["strict", "error", "replace"]: |
| return self._str_errors |
|
|
| @str_errors.setter |
| def str_errors(self, value: Literal["strict", "error", "replace"]) -> None: |
| if value in ("strict", "error", "replace"): |
| self._str_errors = value |
| else: |
| raise ValueError( |
| f"invalid str_errors value {value!r} (must be one of 'strict', " |
| "'error', or 'replace')" |
| ) |
|
|
| def set_shareable(self, value: T) -> T: |
| """ |
| Set the shareable value for the last encountered shared value marker, |
| if any. If the current shared index is ``None``, nothing is done. |
| |
| :param value: the shared value |
| :returns: the shared value to permit chaining |
| """ |
| if self._share_index is not None: |
| self._shareables[self._share_index] = value |
|
|
| return value |
|
|
| def _stringref_namespace_add(self, string: str | bytes, length: int) -> None: |
| if self._stringref_namespace is not None: |
| next_index = len(self._stringref_namespace) |
| if next_index < 24: |
| is_referenced = length >= 3 |
| elif next_index < 256: |
| is_referenced = length >= 4 |
| elif next_index < 65536: |
| is_referenced = length >= 5 |
| elif next_index < 4294967296: |
| is_referenced = length >= 7 |
| else: |
| is_referenced = length >= 11 |
|
|
| if is_referenced: |
| self._stringref_namespace.append(string) |
|
|
| def read(self, amount: int) -> bytes: |
| """ |
| Read bytes from the data stream. |
| |
| :param int amount: the number of bytes to read |
| """ |
| data = self._fp_read(amount) |
| if len(data) < amount: |
| raise CBORDecodeEOF( |
| f"premature end of stream (expected to read {amount} bytes, got {len(data)} " |
| "instead)" |
| ) |
|
|
| return data |
|
|
| def _decode(self, immutable: bool = False, unshared: bool = False) -> Any: |
| if immutable: |
| old_immutable = self._immutable |
| self._immutable = True |
| if unshared: |
| old_index = self._share_index |
| self._share_index = None |
| try: |
| initial_byte = self.read(1)[0] |
| major_type = initial_byte >> 5 |
| subtype = initial_byte & 31 |
| decoder = major_decoders[major_type] |
| return decoder(self, subtype) |
| finally: |
| if immutable: |
| self._immutable = old_immutable |
| if unshared: |
| self._share_index = old_index |
|
|
| @contextmanager |
| def _decoding_context(self) -> Generator[None]: |
| """ |
| Context manager for tracking decode depth and clearing shared state. |
| |
| Shared state is cleared at the end of each top-level decode to prevent |
| shared references from leaking between independent decode operations. |
| Nested calls (from hooks) must preserve the state. |
| """ |
| self._decode_depth += 1 |
| try: |
| yield |
| finally: |
| self._decode_depth -= 1 |
| assert self._decode_depth >= 0 |
| if self._decode_depth == 0: |
| self._shareables.clear() |
| self._share_index = None |
|
|
| def decode(self) -> object: |
| """ |
| Decode the next value from the stream. |
| |
| :raises CBORDecodeError: if there is any problem decoding the stream |
| """ |
| with self._decoding_context(): |
| return self._decode() |
|
|
| def decode_from_bytes(self, buf: bytes) -> object: |
| """ |
| Wrap the given bytestring as a file and call :meth:`decode` with it as |
| the argument. |
| |
| This method was intended to be used from the ``tag_hook`` hook when an |
| object needs to be decoded separately from the rest but while still |
| taking advantage of the shared value registry. |
| """ |
| with self._decoding_context(): |
| with BytesIO(buf) as fp: |
| old_fp = self.fp |
| self.fp = fp |
| retval = self._decode() |
| self.fp = old_fp |
| return retval |
|
|
| @overload |
| def _decode_length(self, subtype: int) -> int: ... |
|
|
| @overload |
| def _decode_length(self, subtype: int, allow_indefinite: Literal[True]) -> int | None: ... |
|
|
| def _decode_length(self, subtype: int, allow_indefinite: bool = False) -> int | None: |
| if subtype < 24: |
| return subtype |
| elif subtype == 24: |
| return self.read(1)[0] |
| elif subtype == 25: |
| return cast(int, struct.unpack(">H", self.read(2))[0]) |
| elif subtype == 26: |
| return cast(int, struct.unpack(">L", self.read(4))[0]) |
| elif subtype == 27: |
| return cast(int, struct.unpack(">Q", self.read(8))[0]) |
| elif subtype == 31 and allow_indefinite: |
| return None |
| else: |
| raise CBORDecodeValueError(f"unknown unsigned integer subtype 0x{subtype:x}") |
|
|
| def decode_uint(self, subtype: int) -> int: |
| |
| return self.set_shareable(self._decode_length(subtype)) |
|
|
| def decode_negint(self, subtype: int) -> int: |
| |
| return self.set_shareable(-self._decode_length(subtype) - 1) |
|
|
| def decode_bytestring(self, subtype: int) -> bytes: |
| |
| length = self._decode_length(subtype, allow_indefinite=True) |
| if length is None: |
| |
| buf: list[bytes] = [] |
| while True: |
| initial_byte = self.read(1)[0] |
| if initial_byte == 0xFF: |
| result = b"".join(buf) |
| break |
| elif initial_byte >> 5 == 2: |
| length = self._decode_length(initial_byte & 0x1F) |
| if length is None or length > sys.maxsize: |
| raise CBORDecodeValueError( |
| f"invalid length for indefinite bytestring chunk 0x{length:x}" |
| ) |
| value = self.read(length) |
| buf.append(value) |
| else: |
| raise CBORDecodeValueError( |
| "non-bytestring found in indefinite length bytestring" |
| ) |
| else: |
| if length > sys.maxsize: |
| raise CBORDecodeValueError(f"invalid length for bytestring 0x{length:x}") |
| elif length <= 65536: |
| result = self.read(length) |
| else: |
| |
| left = length |
| buffer = bytearray() |
| while left: |
| chunk_size = min(left, 65536) |
| buffer.extend(self.read(chunk_size)) |
| left -= chunk_size |
|
|
| result = bytes(buffer) |
|
|
| self._stringref_namespace_add(result, length) |
|
|
| return self.set_shareable(result) |
|
|
| def decode_string(self, subtype: int) -> str: |
| |
| length = self._decode_length(subtype, allow_indefinite=True) |
| if length is None: |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| buf: list[str] = [] |
| while True: |
| initial_byte = self.read(1)[0] |
| if initial_byte == 0xFF: |
| result = "".join(buf) |
| break |
| elif initial_byte >> 5 == 3: |
| length = self._decode_length(initial_byte & 0x1F) |
| if length is None or length > sys.maxsize: |
| raise CBORDecodeValueError( |
| f"invalid length for indefinite string chunk 0x{length:x}" |
| ) |
|
|
| try: |
| value = self.read(length).decode("utf-8", self._str_errors) |
| except UnicodeDecodeError as exc: |
| raise CBORDecodeValueError("error decoding unicode string") from exc |
|
|
| buf.append(value) |
| else: |
| raise CBORDecodeValueError("non-string found in indefinite length string") |
| else: |
| if length > sys.maxsize: |
| raise CBORDecodeValueError(f"invalid length for string 0x{length:x}") |
|
|
| if length <= 65536: |
| try: |
| result = self.read(length).decode("utf-8", self._str_errors) |
| except UnicodeDecodeError as exc: |
| raise CBORDecodeValueError("error decoding unicode string") from exc |
| else: |
| |
| codec = incremental_utf8_decoder(self._str_errors) |
| left = length |
| result = "" |
| while left: |
| chunk_size = min(left, 65536) |
| final = left <= chunk_size |
| try: |
| result += codec.decode(self.read(chunk_size), final) |
| except UnicodeDecodeError as exc: |
| raise CBORDecodeValueError("error decoding unicode string") from exc |
|
|
| left -= chunk_size |
|
|
| self._stringref_namespace_add(result, length) |
|
|
| return self.set_shareable(result) |
|
|
| def decode_array(self, subtype: int) -> Sequence[Any]: |
| |
| length = self._decode_length(subtype, allow_indefinite=True) |
| if length is None: |
| |
| items: list[Any] = [] |
| if not self._immutable: |
| self.set_shareable(items) |
| while True: |
| value = self._decode(unshared=True) |
| if value is break_marker: |
| break |
| else: |
| items.append(value) |
| else: |
| if length > sys.maxsize: |
| raise CBORDecodeValueError(f"invalid length for array 0x{length:x}") |
|
|
| items = [] |
| if not self._immutable: |
| self.set_shareable(items) |
|
|
| for index in range(length): |
| items.append(self._decode(unshared=True)) |
|
|
| if self._immutable: |
| items_tuple = tuple(items) |
| self.set_shareable(items_tuple) |
| return items_tuple |
|
|
| return items |
|
|
| def decode_map(self, subtype: int) -> Mapping[Any, Any]: |
| |
| length = self._decode_length(subtype, allow_indefinite=True) |
| if length is None: |
| |
| dictionary: dict[Any, Any] = {} |
| self.set_shareable(dictionary) |
| while True: |
| key = self._decode(immutable=True, unshared=True) |
| if key is break_marker: |
| break |
| else: |
| dictionary[key] = self._decode(unshared=True) |
| else: |
| dictionary = {} |
| self.set_shareable(dictionary) |
| for _ in range(length): |
| key = self._decode(immutable=True, unshared=True) |
| dictionary[key] = self._decode(unshared=True) |
|
|
| if self._object_hook: |
| dictionary = self._object_hook(self, dictionary) |
| self.set_shareable(dictionary) |
| elif self._immutable: |
| frozen_dict = FrozenDict(dictionary) |
| self.set_shareable(dictionary) |
| return frozen_dict |
|
|
| return dictionary |
|
|
| def decode_semantic(self, subtype: int) -> Any: |
| |
| tagnum = self._decode_length(subtype) |
| if semantic_decoder := semantic_decoders.get(tagnum): |
| return semantic_decoder(self) |
|
|
| tag = CBORTag(tagnum, None) |
| self.set_shareable(tag) |
| tag.value = self._decode(unshared=True) |
| if self._tag_hook: |
| tag = self._tag_hook(self, tag) |
|
|
| return self.set_shareable(tag) |
|
|
| def decode_special(self, subtype: int) -> Any: |
| |
| if subtype < 20: |
| |
| return CBORSimpleValue(subtype) |
|
|
| |
| try: |
| return special_decoders[subtype](self) |
| except KeyError as e: |
| raise CBORDecodeValueError( |
| f"Undefined Reserved major type 7 subtype 0x{subtype:x}" |
| ) from e |
|
|
| |
| |
| |
| def decode_epoch_date(self) -> date: |
| |
| value = self._decode() |
| return self.set_shareable(date.fromordinal(value + 719163)) |
|
|
| def decode_date_string(self) -> date: |
| |
| value = self._decode() |
| return self.set_shareable(date.fromisoformat(value)) |
|
|
| def decode_datetime_string(self) -> datetime: |
| |
| value = self._decode() |
| match = timestamp_re.match(value) |
| if match: |
| ( |
| year, |
| month, |
| day, |
| hour, |
| minute, |
| second, |
| secfrac, |
| offset_sign, |
| offset_h, |
| offset_m, |
| ) = match.groups() |
| if secfrac is None: |
| microsecond = 0 |
| else: |
| microsecond = int(f"{secfrac:<06}") |
|
|
| if offset_h: |
| if offset_sign == "-": |
| sign = -1 |
| else: |
| sign = 1 |
| hours = int(offset_h) * sign |
| minutes = int(offset_m) * sign |
| tz = timezone(timedelta(hours=hours, minutes=minutes)) |
| else: |
| tz = timezone.utc |
|
|
| return self.set_shareable( |
| datetime( |
| int(year), |
| int(month), |
| int(day), |
| int(hour), |
| int(minute), |
| int(second), |
| microsecond, |
| tz, |
| ) |
| ) |
| else: |
| raise CBORDecodeValueError(f"invalid datetime string: {value!r}") |
|
|
| def decode_epoch_datetime(self) -> datetime: |
| |
| value = self._decode() |
|
|
| try: |
| tmp = datetime.fromtimestamp(value, timezone.utc) |
| except (OverflowError, OSError, ValueError) as exc: |
| raise CBORDecodeValueError("error decoding datetime from epoch") from exc |
|
|
| return self.set_shareable(tmp) |
|
|
| def decode_positive_bignum(self) -> int: |
| |
| from binascii import hexlify |
|
|
| value = self._decode() |
| if not isinstance(value, bytes): |
| raise CBORDecodeValueError("invalid bignum value " + str(value)) |
|
|
| return self.set_shareable(int(hexlify(value), 16)) |
|
|
| def decode_negative_bignum(self) -> int: |
| |
| return self.set_shareable(-self.decode_positive_bignum() - 1) |
|
|
| def decode_fraction(self) -> Decimal: |
| |
| from decimal import Decimal |
|
|
| try: |
| exp, sig = self._decode() |
| except (TypeError, ValueError) as e: |
| raise CBORDecodeValueError("Incorrect tag 4 payload") from e |
| tmp = Decimal(sig).as_tuple() |
| return self.set_shareable(Decimal((tmp.sign, tmp.digits, exp))) |
|
|
| def decode_bigfloat(self) -> Decimal: |
| |
| from decimal import Decimal |
|
|
| try: |
| exp, sig = self._decode() |
| except (TypeError, ValueError) as e: |
| raise CBORDecodeValueError("Incorrect tag 5 payload") from e |
|
|
| return self.set_shareable(Decimal(sig) * (2 ** Decimal(exp))) |
|
|
| def decode_stringref(self) -> str | bytes: |
| |
| if self._stringref_namespace is None: |
| raise CBORDecodeValueError("string reference outside of namespace") |
|
|
| index: int = self._decode() |
| try: |
| value = self._stringref_namespace[index] |
| except IndexError: |
| raise CBORDecodeValueError(f"string reference {index} not found") |
|
|
| return value |
|
|
| def decode_shareable(self) -> object: |
| |
| old_index = self._share_index |
| self._share_index = len(self._shareables) |
| self._shareables.append(None) |
| try: |
| return self._decode() |
| finally: |
| self._share_index = old_index |
|
|
| def decode_sharedref(self) -> Any: |
| |
| value = self._decode(unshared=True) |
| try: |
| shared = self._shareables[value] |
| except IndexError: |
| raise CBORDecodeValueError(f"shared reference {value} not found") |
|
|
| if shared is None: |
| raise CBORDecodeValueError(f"shared value {value} has not been initialized") |
| else: |
| return shared |
|
|
| def decode_complex(self) -> complex: |
| |
| inputval = self._decode(immutable=True, unshared=True) |
| try: |
| value = complex(*inputval) |
| except TypeError as exc: |
| if not isinstance(inputval, tuple): |
| raise CBORDecodeValueError( |
| "error decoding complex: input value was not a tuple" |
| ) from None |
|
|
| raise CBORDecodeValueError("error decoding complex") from exc |
|
|
| return self.set_shareable(value) |
|
|
| def decode_rational(self) -> Fraction: |
| |
| from fractions import Fraction |
|
|
| inputval = self._decode(immutable=True, unshared=True) |
| try: |
| value = Fraction(*inputval) |
| except (TypeError, ZeroDivisionError) as exc: |
| if not isinstance(inputval, tuple): |
| raise CBORDecodeValueError( |
| "error decoding rational: input value was not a tuple" |
| ) from None |
|
|
| raise CBORDecodeValueError("error decoding rational") from exc |
|
|
| return self.set_shareable(value) |
|
|
| def decode_regexp(self) -> re.Pattern[str]: |
| |
| try: |
| value = re.compile(self._decode()) |
| except re.error as exc: |
| raise CBORDecodeValueError("error decoding regular expression") from exc |
|
|
| return self.set_shareable(value) |
|
|
| def decode_mime(self) -> Message: |
| |
| from email.parser import Parser |
|
|
| try: |
| value = Parser().parsestr(self._decode()) |
| except TypeError as exc: |
| raise CBORDecodeValueError("error decoding MIME message") from exc |
|
|
| return self.set_shareable(value) |
|
|
| def decode_uuid(self) -> UUID: |
| |
| from uuid import UUID |
|
|
| try: |
| value = UUID(bytes=self._decode()) |
| except (TypeError, ValueError) as exc: |
| raise CBORDecodeValueError("error decoding UUID value") from exc |
|
|
| return self.set_shareable(value) |
|
|
| def decode_stringref_namespace(self) -> Any: |
| |
| old_namespace = self._stringref_namespace |
| self._stringref_namespace = [] |
| value = self._decode() |
| self._stringref_namespace = old_namespace |
| return value |
|
|
| def decode_set(self) -> set[Any] | frozenset[Any]: |
| |
| if self._immutable: |
| return self.set_shareable(frozenset(self._decode(immutable=True))) |
| else: |
| return self.set_shareable(set(self._decode(immutable=True))) |
|
|
| def decode_ipaddress(self) -> IPv4Address | IPv6Address | CBORTag: |
| |
| from ipaddress import ip_address |
|
|
| buf = self.decode() |
| if not isinstance(buf, bytes) or len(buf) not in (4, 6, 16): |
| raise CBORDecodeValueError(f"invalid ipaddress value {buf!r}") |
| elif len(buf) in (4, 16): |
| return self.set_shareable(ip_address(buf)) |
| elif len(buf) == 6: |
| |
| return self.set_shareable(CBORTag(260, buf)) |
|
|
| raise CBORDecodeValueError(f"invalid ipaddress value {buf!r}") |
|
|
| def decode_ipnetwork(self) -> IPv4Network | IPv6Network: |
| |
| from ipaddress import ip_network |
|
|
| net_map = self.decode() |
| if isinstance(net_map, Mapping) and len(net_map) == 1: |
| for net in net_map.items(): |
| try: |
| return self.set_shareable(ip_network(net, strict=False)) |
| except (TypeError, ValueError): |
| break |
|
|
| raise CBORDecodeValueError(f"invalid ipnetwork value {net_map!r}") |
|
|
| def decode_self_describe_cbor(self) -> Any: |
| |
| return self._decode() |
|
|
| |
| |
| |
|
|
| def decode_simple_value(self) -> CBORSimpleValue: |
| |
| return CBORSimpleValue(self.read(1)[0]) |
|
|
| def decode_float16(self) -> float: |
| return self.set_shareable(cast(float, struct.unpack(">e", self.read(2))[0])) |
|
|
| def decode_float32(self) -> float: |
| return self.set_shareable(cast(float, struct.unpack(">f", self.read(4))[0])) |
|
|
| def decode_float64(self) -> float: |
| return self.set_shareable(cast(float, struct.unpack(">d", self.read(8))[0])) |
|
|
|
|
| major_decoders: dict[int, Callable[[CBORDecoder, int], Any]] = { |
| 0: CBORDecoder.decode_uint, |
| 1: CBORDecoder.decode_negint, |
| 2: CBORDecoder.decode_bytestring, |
| 3: CBORDecoder.decode_string, |
| 4: CBORDecoder.decode_array, |
| 5: CBORDecoder.decode_map, |
| 6: CBORDecoder.decode_semantic, |
| 7: CBORDecoder.decode_special, |
| } |
|
|
| special_decoders: dict[int, Callable[[CBORDecoder], Any]] = { |
| 20: lambda self: False, |
| 21: lambda self: True, |
| 22: lambda self: None, |
| 23: lambda self: undefined, |
| 24: CBORDecoder.decode_simple_value, |
| 25: CBORDecoder.decode_float16, |
| 26: CBORDecoder.decode_float32, |
| 27: CBORDecoder.decode_float64, |
| 31: lambda self: break_marker, |
| } |
|
|
| semantic_decoders: dict[int, Callable[[CBORDecoder], Any]] = { |
| 0: CBORDecoder.decode_datetime_string, |
| 1: CBORDecoder.decode_epoch_datetime, |
| 2: CBORDecoder.decode_positive_bignum, |
| 3: CBORDecoder.decode_negative_bignum, |
| 4: CBORDecoder.decode_fraction, |
| 5: CBORDecoder.decode_bigfloat, |
| 25: CBORDecoder.decode_stringref, |
| 28: CBORDecoder.decode_shareable, |
| 29: CBORDecoder.decode_sharedref, |
| 30: CBORDecoder.decode_rational, |
| 35: CBORDecoder.decode_regexp, |
| 36: CBORDecoder.decode_mime, |
| 37: CBORDecoder.decode_uuid, |
| 100: CBORDecoder.decode_epoch_date, |
| 256: CBORDecoder.decode_stringref_namespace, |
| 258: CBORDecoder.decode_set, |
| 260: CBORDecoder.decode_ipaddress, |
| 261: CBORDecoder.decode_ipnetwork, |
| 1004: CBORDecoder.decode_date_string, |
| 43000: CBORDecoder.decode_complex, |
| 55799: CBORDecoder.decode_self_describe_cbor, |
| } |
|
|
|
|
| def loads( |
| s: bytes | bytearray | memoryview, |
| tag_hook: Callable[[CBORDecoder, CBORTag], Any] | None = None, |
| object_hook: Callable[[CBORDecoder, dict[Any, Any]], Any] | None = None, |
| str_errors: Literal["strict", "error", "replace"] = "strict", |
| ) -> Any: |
| """ |
| Deserialize an object from a bytestring. |
| |
| :param bytes s: |
| the bytestring to deserialize |
| :param tag_hook: |
| callable that takes 2 arguments: the decoder instance, and the :class:`.CBORTag` |
| to be decoded. This callback is invoked for any tags for which there is no |
| built-in decoder. The return value is substituted for the :class:`.CBORTag` |
| object in the deserialized output |
| :param object_hook: |
| callable that takes 2 arguments: the decoder instance, and a dictionary. This |
| callback is invoked for each deserialized :class:`dict` object. The return value |
| is substituted for the dict in the deserialized output. |
| :param str_errors: |
| determines how to handle unicode decoding errors (see the `Error Handlers`_ |
| section in the standard library documentation for details) |
| :return: |
| the deserialized object |
| |
| .. _Error Handlers: https://docs.python.org/3/library/codecs.html#error-handlers |
| |
| """ |
| with BytesIO(s) as fp: |
| return CBORDecoder( |
| fp, tag_hook=tag_hook, object_hook=object_hook, str_errors=str_errors |
| ).decode() |
|
|
|
|
| def load( |
| fp: IO[bytes], |
| tag_hook: Callable[[CBORDecoder, CBORTag], Any] | None = None, |
| object_hook: Callable[[CBORDecoder, dict[Any, Any]], Any] | None = None, |
| str_errors: Literal["strict", "error", "replace"] = "strict", |
| ) -> Any: |
| """ |
| Deserialize an object from an open file. |
| |
| :param fp: |
| the file to read from (any file-like object opened for reading in binary mode) |
| :param tag_hook: |
| callable that takes 2 arguments: the decoder instance, and the :class:`.CBORTag` |
| to be decoded. This callback is invoked for any tags for which there is no |
| built-in decoder. The return value is substituted for the :class:`.CBORTag` |
| object in the deserialized output |
| :param object_hook: |
| callable that takes 2 arguments: the decoder instance, and a dictionary. This |
| callback is invoked for each deserialized :class:`dict` object. The return value |
| is substituted for the dict in the deserialized output. |
| :param str_errors: |
| determines how to handle unicode decoding errors (see the `Error Handlers`_ |
| section in the standard library documentation for details) |
| :return: |
| the deserialized object |
| |
| .. _Error Handlers: https://docs.python.org/3/library/codecs.html#error-handlers |
| |
| """ |
| return CBORDecoder( |
| fp, tag_hook=tag_hook, object_hook=object_hook, str_errors=str_errors |
| ).decode() |
|
|