| | import inspect |
| | import sys |
| | from datetime import datetime, timezone |
| | from collections import Counter |
| | from dataclasses import is_dataclass |
| | from typing import (Collection, Mapping, Optional, TypeVar, Any, Type, Tuple, |
| | Union, cast) |
| |
|
| |
|
| | def _get_type_cons(type_): |
| | """More spaghetti logic for 3.6 vs. 3.7""" |
| | if sys.version_info.minor == 6: |
| | try: |
| | cons = type_.__extra__ |
| | except AttributeError: |
| | try: |
| | cons = type_.__origin__ |
| | except AttributeError: |
| | cons = type_ |
| | else: |
| | cons = type_ if cons is None else cons |
| | else: |
| | try: |
| | cons = type_.__origin__ if cons is None else cons |
| | except AttributeError: |
| | cons = type_ |
| | else: |
| | cons = type_.__origin__ |
| | return cons |
| |
|
| |
|
| | _NO_TYPE_ORIGIN = object() |
| |
|
| |
|
| | def _get_type_origin(type_): |
| | """Some spaghetti logic to accommodate differences between 3.6 and 3.7 in |
| | the typing api""" |
| | try: |
| | origin = type_.__origin__ |
| | except AttributeError: |
| | |
| | |
| | origin = _NO_TYPE_ORIGIN |
| |
|
| | if sys.version_info.minor == 6: |
| | try: |
| | origin = type_.__extra__ |
| | except AttributeError: |
| | origin = type_ |
| | else: |
| | origin = type_ if origin in (None, _NO_TYPE_ORIGIN) else origin |
| | elif origin is _NO_TYPE_ORIGIN: |
| | origin = type_ |
| | return origin |
| |
|
| |
|
| | def _hasargs(type_, *args): |
| | try: |
| | res = all(arg in type_.__args__ for arg in args) |
| | except AttributeError: |
| | return False |
| | except TypeError: |
| | if (type_.__args__ is None): |
| | return False |
| | else: |
| | raise |
| | else: |
| | return res |
| |
|
| |
|
| | class _NoArgs(object): |
| | def __bool__(self): |
| | return False |
| |
|
| | def __len__(self): |
| | return 0 |
| |
|
| | def __iter__(self): |
| | return self |
| |
|
| | def __next__(self): |
| | raise StopIteration |
| |
|
| |
|
| | _NO_ARGS = _NoArgs() |
| |
|
| |
|
| | def _get_type_args(tp: Type, default: Union[Tuple[Type, ...], _NoArgs] = _NO_ARGS) -> \ |
| | Union[Tuple[Type, ...], _NoArgs]: |
| | if hasattr(tp, '__args__'): |
| | if tp.__args__ is not None: |
| | return tp.__args__ |
| | return default |
| |
|
| |
|
| | def _get_type_arg_param(tp: Type, index: int) -> Union[Type, _NoArgs]: |
| | _args = _get_type_args(tp) |
| | if _args is not _NO_ARGS: |
| | try: |
| | return cast(Tuple[Type, ...], _args)[index] |
| | except (TypeError, IndexError, NotImplementedError): |
| | pass |
| |
|
| | return _NO_ARGS |
| |
|
| |
|
| | def _isinstance_safe(o, t): |
| | try: |
| | result = isinstance(o, t) |
| | except Exception: |
| | return False |
| | else: |
| | return result |
| |
|
| |
|
| | def _issubclass_safe(cls, classinfo): |
| | try: |
| | return issubclass(cls, classinfo) |
| | except Exception: |
| | return (_is_new_type_subclass_safe(cls, classinfo) |
| | if _is_new_type(cls) |
| | else False) |
| |
|
| |
|
| | def _is_new_type_subclass_safe(cls, classinfo): |
| | super_type = getattr(cls, "__supertype__", None) |
| |
|
| | if super_type: |
| | return _is_new_type_subclass_safe(super_type, classinfo) |
| |
|
| | try: |
| | return issubclass(cls, classinfo) |
| | except Exception: |
| | return False |
| |
|
| |
|
| | def _is_new_type(type_): |
| | return inspect.isfunction(type_) and hasattr(type_, "__supertype__") |
| |
|
| |
|
| | def _is_optional(type_): |
| | return (_issubclass_safe(type_, Optional) or |
| | _hasargs(type_, type(None)) or |
| | type_ is Any) |
| |
|
| |
|
| | def _is_counter(type_): |
| | return _issubclass_safe(_get_type_origin(type_), Counter) |
| |
|
| |
|
| | def _is_mapping(type_): |
| | return _issubclass_safe(_get_type_origin(type_), Mapping) |
| |
|
| |
|
| | def _is_collection(type_): |
| | return _issubclass_safe(_get_type_origin(type_), Collection) |
| |
|
| |
|
| | def _is_tuple(type_): |
| | return _issubclass_safe(_get_type_origin(type_), Tuple) |
| |
|
| |
|
| | def _is_nonstr_collection(type_): |
| | return (_issubclass_safe(_get_type_origin(type_), Collection) |
| | and not _issubclass_safe(type_, str)) |
| |
|
| |
|
| | def _is_generic_dataclass(type_): |
| | return is_dataclass(_get_type_origin(type_)) |
| |
|
| |
|
| | def _timestamp_to_dt_aware(timestamp: float): |
| | tz = datetime.now(timezone.utc).astimezone().tzinfo |
| | dt = datetime.fromtimestamp(timestamp, tz=tz) |
| | return dt |
| |
|
| |
|
| | def _undefined_parameter_action_safe(cls): |
| | try: |
| | if cls.dataclass_json_config is None: |
| | return |
| | action_enum = cls.dataclass_json_config['undefined'] |
| | except (AttributeError, KeyError): |
| | return |
| |
|
| | if action_enum is None or action_enum.value is None: |
| | return |
| |
|
| | return action_enum |
| |
|
| |
|
| | def _handle_undefined_parameters_safe(cls, kvs, usage: str): |
| | """ |
| | Checks if an undefined parameters action is defined and performs the |
| | according action. |
| | """ |
| | undefined_parameter_action = _undefined_parameter_action_safe(cls) |
| | usage = usage.lower() |
| | if undefined_parameter_action is None: |
| | return kvs if usage != "init" else cls.__init__ |
| | if usage == "from": |
| | return undefined_parameter_action.value.handle_from_dict(cls=cls, |
| | kvs=kvs) |
| | elif usage == "to": |
| | return undefined_parameter_action.value.handle_to_dict(obj=cls, |
| | kvs=kvs) |
| | elif usage == "dump": |
| | return undefined_parameter_action.value.handle_dump(obj=cls) |
| | elif usage == "init": |
| | return undefined_parameter_action.value.create_init(obj=cls) |
| | else: |
| | raise ValueError( |
| | f"usage must be one of ['to', 'from', 'dump', 'init'], " |
| | f"but is '{usage}'") |
| |
|
| |
|
| | |
| | |
| | CatchAllVar = TypeVar("CatchAllVar", bound=Mapping) |
| |
|