| import warnings |
| from abc import ABCMeta |
| from copy import deepcopy |
| from enum import Enum |
| from functools import partial |
| from pathlib import Path |
| from types import FunctionType, prepare_class, resolve_bases |
| from typing import ( |
| TYPE_CHECKING, |
| AbstractSet, |
| Any, |
| Callable, |
| ClassVar, |
| Dict, |
| List, |
| Mapping, |
| Optional, |
| Tuple, |
| Type, |
| TypeVar, |
| Union, |
| cast, |
| no_type_check, |
| overload, |
| ) |
|
|
| from typing_extensions import dataclass_transform |
|
|
| from pydantic.v1.class_validators import ValidatorGroup, extract_root_validators, extract_validators, inherit_validators |
| from pydantic.v1.config import BaseConfig, Extra, inherit_config, prepare_config |
| from pydantic.v1.error_wrappers import ErrorWrapper, ValidationError |
| from pydantic.v1.errors import ConfigError, DictError, ExtraError, MissingError |
| from pydantic.v1.fields import ( |
| MAPPING_LIKE_SHAPES, |
| Field, |
| ModelField, |
| ModelPrivateAttr, |
| PrivateAttr, |
| Undefined, |
| is_finalvar_with_default_val, |
| ) |
| from pydantic.v1.json import custom_pydantic_encoder, pydantic_encoder |
| from pydantic.v1.parse import Protocol, load_file, load_str_bytes |
| from pydantic.v1.schema import default_ref_template, model_schema |
| from pydantic.v1.types import PyObject, StrBytes |
| from pydantic.v1.typing import ( |
| AnyCallable, |
| get_args, |
| get_origin, |
| is_classvar, |
| is_namedtuple, |
| is_union, |
| resolve_annotations, |
| update_model_forward_refs, |
| ) |
| from pydantic.v1.utils import ( |
| DUNDER_ATTRIBUTES, |
| ROOT_KEY, |
| ClassAttribute, |
| GetterDict, |
| Representation, |
| ValueItems, |
| generate_model_signature, |
| is_valid_field, |
| is_valid_private_name, |
| lenient_issubclass, |
| sequence_like, |
| smart_deepcopy, |
| unique_list, |
| validate_field_name, |
| ) |
|
|
| if TYPE_CHECKING: |
| from inspect import Signature |
|
|
| from pydantic.v1.class_validators import ValidatorListDict |
| from pydantic.v1.types import ModelOrDc |
| from pydantic.v1.typing import ( |
| AbstractSetIntStr, |
| AnyClassMethod, |
| CallableGenerator, |
| DictAny, |
| DictStrAny, |
| MappingIntStrAny, |
| ReprArgs, |
| SetStr, |
| TupleGenerator, |
| ) |
|
|
# Type variable bound to BaseModel; used below to annotate classmethods and helpers so that they
# return the concrete subclass they were called on (e.g. `cls: Type['Model']) -> 'Model'`).
| Model = TypeVar('Model', bound='BaseModel') |
|
|
# Public names exported by this module.
| __all__ = 'BaseModel', 'create_model', 'validate_model' |
|
|
# Unbound generic type variable. NOTE(review): not referenced in the visible part of this file.
| _T = TypeVar('_T') |
|
|
|
|
def validate_custom_root_type(fields: Dict[str, ModelField]) -> None:
    """
    Check that a model using a custom root type declares no additional fields.

    :param fields: the complete field mapping collected for the model
    :raises ValueError: if ``fields`` contains more than one entry
    """
    field_count = len(fields)
    if field_count <= 1:
        return
    raise ValueError(f'{ROOT_KEY} cannot be mixed with other fields')
|
|
|
|
def generate_hash_function(frozen: bool) -> Optional[Callable[[Any], int]]:
    """
    Build the ``__hash__`` implementation used for frozen models.

    :param frozen: whether the model is frozen
    :return: ``None`` when ``frozen`` is falsy; otherwise a function that hashes an instance
        from its class and the values stored in its ``__dict__``
    """
    if not frozen:
        return None

    def hash_function(self_: Any) -> int:
        # Combine the identity of the class with the (ordered) instance values.
        instance_values = tuple(self_.__dict__.values())
        return hash(self_.__class__) + hash(instance_values)

    return hash_function
|
|
|
|
| |
# Value types that ModelMetaclass leaves alone when they are assigned to an *annotated* name.
ANNOTATED_FIELD_UNTOUCHED_TYPES: Tuple[Any, ...] = (
    property,
    type,
    classmethod,
    staticmethod,
)

# Value types left alone for the remaining namespace entries: plain functions plus all of the above.
UNTOUCHED_TYPES: Tuple[Any, ...] = (FunctionType, *ANNOTATED_FIELD_UNTOUCHED_TYPES)

# False while BaseModel itself is still being created; ModelMetaclass consults this flag so that
# it does not reference the BaseModel name before that class exists.
_is_base_model_class_defined = False
|
|
|
|
@dataclass_transform(kw_only_default=True, field_specifiers=(Field,))
class ModelMetaclass(ABCMeta):
    # Metaclass used by BaseModel. At class-creation time it gathers fields, validators, config,
    # private attributes and class vars (inherited and newly declared) and stores them on the
    # new class under dunder attributes such as `__fields__` and `__config__`.

    @no_type_check
    def __new__(mcs, name, bases, namespace, **kwargs):
        """
        Create a model class.

        :param name: name of the class being created
        :param bases: base classes of the class being created
        :param namespace: class body namespace
        :param kwargs: class keyword arguments; `__resolve_forward_refs__` and any key matching a
            non-dunder attribute of the config are consumed here, the rest go to `super().__new__`
        :raises TypeError: if config is given both as a `Config` attribute and as class kwargs, or
            if an un-annotated default changes the type of an inherited field
        :raises NameError: if a `ModelPrivateAttr` is assigned to a name that is not a valid private name
        """
        model_fields: Dict[str, ModelField] = {}
        config = BaseConfig
        validators: 'ValidatorListDict' = {}

        pre_root_validators = []
        post_root_validators = []
        own_private_attrs: Dict[str, ModelPrivateAttr] = {}
        inherited_private_attrs: Dict[str, ModelPrivateAttr] = {}
        declared_slots = namespace.get('__slots__', ())
        slots: SetStr = {declared_slots} if isinstance(declared_slots, str) else set(declared_slots)
        class_vars: SetStr = set()
        hash_func: Optional[Callable[[Any], int]] = None

        # Walk the bases right-to-left so that values from earlier bases overwrite later ones.
        for base in reversed(bases):
            is_model_base = _is_base_model_class_defined and issubclass(base, BaseModel) and base != BaseModel
            if not is_model_base:
                continue
            model_fields.update(smart_deepcopy(base.__fields__))
            config = inherit_config(base.__config__, config)
            validators = inherit_validators(base.__validators__, validators)
            pre_root_validators += base.__pre_root_validators__
            post_root_validators += base.__post_root_validators__
            inherited_private_attrs.update(base.__private_attributes__)
            class_vars.update(base.__class_vars__)
            hash_func = base.__hash__

        resolve_forward_refs = kwargs.pop('__resolve_forward_refs__', True)

        # Class kwargs whose name matches a non-dunder config attribute are treated as config.
        allowed_config_kwargs: SetStr = {
            attr for attr in dir(config) if not (attr.startswith('__') and attr.endswith('__'))
        }
        config_kwargs = {key: kwargs.pop(key) for key in kwargs.keys() & allowed_config_kwargs}
        config_from_namespace = namespace.get('Config')
        if config_kwargs and config_from_namespace:
            raise TypeError('Specifying config in two places is ambiguous, use either Config attribute or class kwargs')
        config = inherit_config(config_from_namespace, config, **config_kwargs)

        validators = inherit_validators(extract_validators(namespace), validators)
        validator_group = ValidatorGroup(validators)

        # Inherited fields pick up the final config and any validators declared on this class.
        for inherited_field in model_fields.values():
            inherited_field.set_config(config)
            new_validators = validator_group.get_validators(inherited_field.name)
            if new_validators:
                inherited_field.class_validators.update(new_validators)
                # Rebuild the field's validators so the additions take effect.
                inherited_field.populate_validators()

        prepare_config(config, name)

        # `is_untouched` reads this variable at call time; it is rebound further down.
        untouched_types = ANNOTATED_FIELD_UNTOUCHED_TYPES

        def is_untouched(v: Any) -> bool:
            if isinstance(v, untouched_types):
                return True
            return v.__class__.__name__ == 'cython_function_or_method'

        class_identity = (namespace.get('__module__'), namespace.get('__qualname__'))
        if class_identity != ('pydantic.main', 'BaseModel'):
            annotations = resolve_annotations(namespace.get('__annotations__', {}), namespace.get('__module__', None))

            # First pass: annotated names, so that they are inserted into the fields mapping first.
            for ann_name, ann_type in annotations.items():
                if is_classvar(ann_type) or is_finalvar_with_default_val(
                    ann_type, namespace.get(ann_name, Undefined)
                ):
                    class_vars.add(ann_name)
                elif is_valid_field(ann_name):
                    validate_field_name(bases, ann_name)
                    value = namespace.get(ann_name, Undefined)
                    if is_union(get_origin(ann_type)):
                        allowed_types = get_args(ann_type)
                    else:
                        allowed_types = (ann_type,)
                    if (
                        is_untouched(value)
                        and ann_type != PyObject
                        and not any(
                            lenient_issubclass(get_origin(allowed_type), Type) for allowed_type in allowed_types
                        )
                    ):
                        continue
                    model_fields[ann_name] = ModelField.infer(
                        name=ann_name,
                        value=value,
                        annotation=ann_type,
                        class_validators=validator_group.get_validators(ann_name),
                        config=config,
                    )
                elif ann_name not in namespace and config.underscore_attrs_are_private:
                    own_private_attrs[ann_name] = PrivateAttr()

            # Second pass: every name assigned in the class body.
            untouched_types = UNTOUCHED_TYPES + config.keep_untouched
            for var_name, value in namespace.items():
                can_be_changed = var_name not in class_vars and not is_untouched(value)
                if isinstance(value, ModelPrivateAttr):
                    if not is_valid_private_name(var_name):
                        raise NameError(
                            f'Private attributes "{var_name}" must not be a valid field name; '
                            f'Use sunder or dunder names, e. g. "_{var_name}" or "__{var_name}__"'
                        )
                    own_private_attrs[var_name] = value
                elif config.underscore_attrs_are_private and is_valid_private_name(var_name) and can_be_changed:
                    own_private_attrs[var_name] = PrivateAttr(default=value)
                elif is_valid_field(var_name) and var_name not in annotations and can_be_changed:
                    validate_field_name(bases, var_name)
                    inferred = ModelField.infer(
                        name=var_name,
                        value=value,
                        annotation=annotations.get(var_name, Undefined),
                        class_validators=validator_group.get_validators(var_name),
                        config=config,
                    )
                    if var_name in model_fields:
                        # An un-annotated override may only narrow the inherited field's type.
                        if not lenient_issubclass(inferred.type_, model_fields[var_name].type_):
                            raise TypeError(
                                f'The type of {name}.{var_name} differs from the new default value; '
                                f'if you wish to change the type of this field, please use a type annotation'
                            )
                        inferred.type_ = model_fields[var_name].type_
                    model_fields[var_name] = inferred

        _custom_root_type = ROOT_KEY in model_fields
        if _custom_root_type:
            validate_custom_root_type(model_fields)
        validator_group.check_for_unused()

        json_encoder = (
            partial(custom_pydantic_encoder, config.json_encoders) if config.json_encoders else pydantic_encoder
        )
        pre_rv_new, post_rv_new = extract_root_validators(namespace)

        if hash_func is None:
            hash_func = generate_hash_function(config.frozen)

        # Field defaults, private attribute declarations and `__slots__` are not copied verbatim.
        exclude_from_namespace = model_fields | own_private_attrs.keys() | {'__slots__'}
        exclude_fields = {
            f_name: f.field_info.exclude for f_name, f in model_fields.items() if f.field_info.exclude is not None
        }
        include_fields = {
            f_name: f.field_info.include for f_name, f in model_fields.items() if f.field_info.include is not None
        }
        new_namespace = {
            '__config__': config,
            '__fields__': model_fields,
            '__exclude_fields__': exclude_fields or None,
            '__include_fields__': include_fields or None,
            '__validators__': validator_group.validators,
            '__pre_root_validators__': unique_list(
                pre_root_validators + pre_rv_new,
                name_factory=lambda v: v.__name__,
            ),
            '__post_root_validators__': unique_list(
                post_root_validators + post_rv_new,
                name_factory=lambda skip_on_failure_and_v: skip_on_failure_and_v[1].__name__,
            ),
            '__schema_cache__': {},
            '__json_encoder__': staticmethod(json_encoder),
            '__custom_root_type__': _custom_root_type,
            '__private_attributes__': {**inherited_private_attrs, **own_private_attrs},
            '__slots__': slots | own_private_attrs.keys(),
            '__hash__': hash_func,
            '__class_vars__': class_vars,
            # Entries from the class body come last, so they override the generated ones above.
            **{n: v for n, v in namespace.items() if n not in exclude_from_namespace},
        }

        cls = super().__new__(mcs, name, bases, new_namespace, **kwargs)
        # Stored via ClassAttribute (project helper) rather than as a plain attribute.
        cls.__signature__ = ClassAttribute('__signature__', generate_model_signature(cls.__init__, model_fields, config))

        if not _is_base_model_class_defined:
            # Only reached while BaseModel itself is being created: its annotations are emptied.
            # NOTE(review): BaseModel declares annotations only under `if TYPE_CHECKING:`;
            # presumably this guards against builds where that branch is still evaluated - confirm.
            getattr(cls, '__annotations__', {}).clear()

        if resolve_forward_refs:
            cls.__try_update_forward_refs__()

        # Objects dropped from the final namespace did not get `__set_name__` called by
        # class creation, so invoke it explicitly for them.
        for attr_name, attr_value in namespace.items():
            if attr_name in new_namespace:
                continue
            set_name = getattr(attr_value, '__set_name__', None)
            if callable(set_name):
                set_name(cls, attr_name)

        return cls

    def __instancecheck__(self, instance: Any) -> bool:
        """
        Cheap pre-check before delegating to the ABC instance check.

        Objects without a `__post_root_validators__` attribute are rejected immediately, so the
        ABC `_abc_subclasscheck` machinery is only invoked when we're pretty sure.

        See #3829 and python/cpython#92810
        """
        if not hasattr(instance, '__post_root_validators__'):
            return False
        return super().__instancecheck__(instance)
|
|
|
|
# Alias for the plain `object.__setattr__`. BaseModel overrides `__setattr__` with its own checks,
# so the model code below uses this alias to write `__dict__`, `__fields_set__` and private
# attributes directly without going through that override.
| object_setattr = object.__setattr__ |
|
|
|
|
class BaseModel(Representation, metaclass=ModelMetaclass):
    # Base class for models. Field values live in the instance `__dict__`; the names of the
    # fields that were explicitly provided live in `__fields_set__`. Class-level metadata
    # (`__fields__`, `__config__`, validators, ...) is installed by ModelMetaclass.
    if TYPE_CHECKING:
        # Declarations for static type checkers only; this branch does not run.
        __fields__: ClassVar[Dict[str, ModelField]] = {}
        __include_fields__: ClassVar[Optional[Mapping[str, Any]]] = None
        __exclude_fields__: ClassVar[Optional[Mapping[str, Any]]] = None
        __validators__: ClassVar[Dict[str, AnyCallable]] = {}
        __pre_root_validators__: ClassVar[List[AnyCallable]]
        __post_root_validators__: ClassVar[List[Tuple[bool, AnyCallable]]]
        __config__: ClassVar[Type[BaseConfig]] = BaseConfig
        __json_encoder__: ClassVar[Callable[[Any], Any]] = lambda x: x
        __schema_cache__: ClassVar['DictAny'] = {}
        __custom_root_type__: ClassVar[bool] = False
        __signature__: ClassVar['Signature']
        __private_attributes__: ClassVar[Dict[str, ModelPrivateAttr]]
        __class_vars__: ClassVar[SetStr]
        __fields_set__: ClassVar[SetStr] = set()

    Config = BaseConfig
    __slots__ = ('__dict__', '__fields_set__')
    # NOTE(review): presumably set so the model does not expose an inherited docstring - confirm.
    __doc__ = ''

    def __init__(__pydantic_self__, **data: Any) -> None:
        """
        Create a new model by parsing and validating input data from keyword arguments.

        Raises ValidationError if the input data cannot be parsed to form a valid model.
        Raises TypeError if validation produced something that cannot be used as `__dict__`.
        """
        # The first parameter is not named `self`, so `self` stays available as a key in `data`.
        values, fields_set, validation_error = validate_model(__pydantic_self__.__class__, data)
        if validation_error:
            raise validation_error
        try:
            object_setattr(__pydantic_self__, '__dict__', values)
        except TypeError as e:
            raise TypeError(
                'Model values must be a dict; you may not have returned a dictionary from a root validator'
            ) from e
        object_setattr(__pydantic_self__, '__fields_set__', fields_set)
        __pydantic_self__._init_private_attributes()

    @no_type_check
    def __setattr__(self, name, value):
        """
        Assign `value` to attribute `name`, enforcing the model's config.

        Private attributes and names in `DUNDER_ATTRIBUTES` are set directly. Otherwise:

        * ValueError is raised for unknown names unless `extra` is `Extra.allow`;
        * TypeError is raised if the model is immutable/frozen, the field is final, or (with
          `validate_assignment`) the field has `allow_mutation` disabled;
        * with `validate_assignment`, pre root validators, the field's own validation and post
          root validators are run, raising ValidationError on failure.

        On success the name is recorded in `__fields_set__`.
        """
        if name in self.__private_attributes__ or name in DUNDER_ATTRIBUTES:
            return object_setattr(self, name, value)

        if self.__config__.extra is not Extra.allow and name not in self.__fields__:
            raise ValueError(f'"{self.__class__.__name__}" object has no field "{name}"')
        elif not self.__config__.allow_mutation or self.__config__.frozen:
            raise TypeError(f'"{self.__class__.__name__}" is immutable and does not support item assignment')
        elif name in self.__fields__ and self.__fields__[name].final:
            raise TypeError(
                f'"{self.__class__.__name__}" object "{name}" field is final and does not support reassignment'
            )
        elif self.__config__.validate_assignment:
            new_values = {**self.__dict__, name: value}

            for validator in self.__pre_root_validators__:
                try:
                    new_values = validator(self.__class__, new_values)
                except (ValueError, TypeError, AssertionError) as exc:
                    # Chain explicitly so the validator's own error is kept as the cause.
                    raise ValidationError([ErrorWrapper(exc, loc=ROOT_KEY)], self.__class__) from exc

            known_field = self.__fields__.get(name, None)
            if known_field:
                # Checked here rather than at the top of the method, so it only applies when
                # `validate_assignment` is enabled.
                if not known_field.field_info.allow_mutation:
                    raise TypeError(f'"{known_field.name}" has allow_mutation set to False and cannot be assigned')
                # The field is validated against the other current values, without its old value.
                dict_without_original_value = {k: v for k, v in self.__dict__.items() if k != name}
                value, error_ = known_field.validate(value, dict_without_original_value, loc=name, cls=self.__class__)
                if error_:
                    raise ValidationError([error_], self.__class__)
                else:
                    new_values[name] = value

            errors = []
            for skip_on_failure, validator in self.__post_root_validators__:
                if skip_on_failure and errors:
                    continue
                try:
                    new_values = validator(self.__class__, new_values)
                except (ValueError, TypeError, AssertionError) as exc:
                    errors.append(ErrorWrapper(exc, loc=ROOT_KEY))
            if errors:
                raise ValidationError(errors, self.__class__)

            # Root validators may return a new dict, so the whole `__dict__` is replaced
            # instead of updating a single key.
            object_setattr(self, '__dict__', new_values)
        else:
            self.__dict__[name] = value

        self.__fields_set__.add(name)

    def __getstate__(self) -> 'DictAny':
        """Return the pickle state: `__dict__`, `__fields_set__` and the private attributes that are set."""
        private_attrs = ((k, getattr(self, k, Undefined)) for k in self.__private_attributes__)
        return {
            '__dict__': self.__dict__,
            '__fields_set__': self.__fields_set__,
            '__private_attribute_values__': {k: v for k, v in private_attrs if v is not Undefined},
        }

    def __setstate__(self, state: 'DictAny') -> None:
        """Restore an instance from a state produced by `__getstate__` (private values are optional)."""
        object_setattr(self, '__dict__', state['__dict__'])
        object_setattr(self, '__fields_set__', state['__fields_set__'])
        for name, value in state.get('__private_attribute_values__', {}).items():
            object_setattr(self, name, value)

    def _init_private_attributes(self) -> None:
        """Set every private attribute whose `get_default()` is not `Undefined` to that default."""
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not Undefined:
                object_setattr(self, name, default)

    def dict(
        self,
        *,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        by_alias: bool = False,
        skip_defaults: Optional[bool] = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
    ) -> 'DictStrAny':
        """
        Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

        `skip_defaults` is deprecated: when given, a DeprecationWarning is emitted and its value
        replaces `exclude_unset`.
        """
        if skip_defaults is not None:
            warnings.warn(
                f'{self.__class__.__name__}.dict(): "skip_defaults" is deprecated and replaced by "exclude_unset"',
                DeprecationWarning,
            )
            exclude_unset = skip_defaults

        return dict(
            self._iter(
                to_dict=True,
                by_alias=by_alias,
                include=include,
                exclude=exclude,
                exclude_unset=exclude_unset,
                exclude_defaults=exclude_defaults,
                exclude_none=exclude_none,
            )
        )

    def json(
        self,
        *,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        by_alias: bool = False,
        skip_defaults: Optional[bool] = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        encoder: Optional[Callable[[Any], Any]] = None,
        models_as_dict: bool = True,
        **dumps_kwargs: Any,
    ) -> str:
        """
        Generate a JSON representation of the model, `include` and `exclude` arguments as per `dict()`.

        `encoder` is an optional function to supply as `default` to json.dumps(), other arguments as per `json.dumps()`.
        When `encoder` is not given, the class's `__json_encoder__` is used. `skip_defaults` is
        deprecated in favour of `exclude_unset`.
        """
        if skip_defaults is not None:
            warnings.warn(
                f'{self.__class__.__name__}.json(): "skip_defaults" is deprecated and replaced by "exclude_unset"',
                DeprecationWarning,
            )
            exclude_unset = skip_defaults
        encoder = cast(Callable[[Any], Any], encoder or self.__json_encoder__)

        # `_iter` is used directly instead of `dict()`: with `models_as_dict=False` nested models
        # are kept as model objects and left to `encoder`.
        data = dict(
            self._iter(
                to_dict=models_as_dict,
                by_alias=by_alias,
                include=include,
                exclude=exclude,
                exclude_unset=exclude_unset,
                exclude_defaults=exclude_defaults,
                exclude_none=exclude_none,
            )
        )
        if self.__custom_root_type__:
            # Custom root models serialise as the bare root value.
            data = data[ROOT_KEY]
        return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)

    @classmethod
    def _enforce_dict_if_root(cls, obj: Any) -> Any:
        """
        Wrap `obj` as `{ROOT_KEY: obj}` when this model has a custom root type and `obj` is not
        already in root form (a dict or model whose only key is ROOT_KEY), or when the root field
        has a mapping-like shape. Otherwise return `obj` unchanged.
        """
        if cls.__custom_root_type__ and (
            not (isinstance(obj, dict) and obj.keys() == {ROOT_KEY})
            and not (isinstance(obj, BaseModel) and obj.__fields__.keys() == {ROOT_KEY})
            or cls.__fields__[ROOT_KEY].shape in MAPPING_LIKE_SHAPES
        ):
            return {ROOT_KEY: obj}
        else:
            return obj

    @classmethod
    def parse_obj(cls: Type['Model'], obj: Any) -> 'Model':
        """
        Build a model from `obj`, which must be a dict or convertible with `dict(obj)`.

        Raises ValidationError if `obj` cannot be converted to a dict or fails validation.
        """
        obj = cls._enforce_dict_if_root(obj)
        if not isinstance(obj, dict):
            try:
                obj = dict(obj)
            except (TypeError, ValueError) as e:
                exc = TypeError(f'{cls.__name__} expected dict not {obj.__class__.__name__}')
                raise ValidationError([ErrorWrapper(exc, loc=ROOT_KEY)], cls) from e
        return cls(**obj)

    @classmethod
    def parse_raw(
        cls: Type['Model'],
        b: StrBytes,
        *,
        content_type: Optional[str] = None,
        encoding: str = 'utf8',
        proto: Optional[Protocol] = None,
        allow_pickle: bool = False,
    ) -> 'Model':
        """
        Decode `b` with `load_str_bytes` (using the config's `json_loads`) and validate the result
        via `parse_obj`.

        Raises ValidationError if decoding raises ValueError, TypeError or UnicodeDecodeError
        (the original error is kept as `__cause__`), or if validation fails.
        """
        try:
            obj = load_str_bytes(
                b,
                proto=proto,
                content_type=content_type,
                encoding=encoding,
                allow_pickle=allow_pickle,
                json_loads=cls.__config__.json_loads,
            )
        except (ValueError, TypeError, UnicodeDecodeError) as e:
            # Chained explicitly, consistent with `parse_obj`.
            raise ValidationError([ErrorWrapper(e, loc=ROOT_KEY)], cls) from e
        return cls.parse_obj(obj)

    @classmethod
    def parse_file(
        cls: Type['Model'],
        path: Union[str, Path],
        *,
        content_type: Optional[str] = None,
        encoding: str = 'utf8',
        proto: Optional[Protocol] = None,
        allow_pickle: bool = False,
    ) -> 'Model':
        """
        Load `path` with `load_file` (using the config's `json_loads`) and validate the result via
        `parse_obj`. Errors raised by `load_file` propagate unchanged.
        """
        obj = load_file(
            path,
            proto=proto,
            content_type=content_type,
            encoding=encoding,
            allow_pickle=allow_pickle,
            json_loads=cls.__config__.json_loads,
        )
        return cls.parse_obj(obj)

    @classmethod
    def from_orm(cls: Type['Model'], obj: Any) -> 'Model':
        """
        Build a model from an arbitrary object; requires `orm_mode` in the config.

        The object is wrapped with the config's `getter_dict` (or placed under ROOT_KEY for
        custom root models) and validated. `__init__` is not called.

        Raises ConfigError if `orm_mode` is not enabled, ValidationError if validation fails.
        """
        if not cls.__config__.orm_mode:
            raise ConfigError('You must have the config attribute orm_mode=True to use from_orm')
        obj = {ROOT_KEY: obj} if cls.__custom_root_type__ else cls._decompose_class(obj)
        m = cls.__new__(cls)
        values, fields_set, validation_error = validate_model(cls, obj)
        if validation_error:
            raise validation_error
        object_setattr(m, '__dict__', values)
        object_setattr(m, '__fields_set__', fields_set)
        m._init_private_attributes()
        return m

    @classmethod
    def construct(cls: Type['Model'], _fields_set: Optional['SetStr'] = None, **values: Any) -> 'Model':
        """
        Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data.
        Default values are respected, but no other validation is performed.
        Behaves as if `Config.extra = 'allow'` was set since it adds all passed values

        When `_fields_set` is omitted, it defaults to the keys of `values`.
        """
        m = cls.__new__(cls)
        fields_values: Dict[str, Any] = {}
        for name, field in cls.__fields__.items():
            if field.alt_alias and field.alias in values:
                fields_values[name] = values[field.alias]
            elif name in values:
                fields_values[name] = values[name]
            elif not field.required:
                fields_values[name] = field.get_default()
        # All passed values are stored as given, including names that are not fields.
        fields_values.update(values)
        object_setattr(m, '__dict__', fields_values)
        if _fields_set is None:
            _fields_set = set(values.keys())
        object_setattr(m, '__fields_set__', _fields_set)
        m._init_private_attributes()
        return m

    def _copy_and_set_values(self: 'Model', values: 'DictStrAny', fields_set: 'SetStr', *, deep: bool) -> 'Model':
        """
        Create a new instance of this class (without calling `__init__`) holding `values` and
        `fields_set`, and carry over the private attributes that are set on `self`.

        With `deep=True`, `values` and the private attribute values are deep-copied first.
        """
        if deep:
            # Copy before building the instance so the new model does not share nested objects.
            values = deepcopy(values)

        cls = self.__class__
        m = cls.__new__(cls)
        object_setattr(m, '__dict__', values)
        object_setattr(m, '__fields_set__', fields_set)
        for name in self.__private_attributes__:
            value = getattr(self, name, Undefined)
            if value is not Undefined:
                if deep:
                    value = deepcopy(value)
                object_setattr(m, name, value)

        return m

    def copy(
        self: 'Model',
        *,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        update: Optional['DictStrAny'] = None,
        deep: bool = False,
    ) -> 'Model':
        """
        Duplicate a model, optionally choose which fields to include, exclude and change.

        :param include: fields to include in new model
        :param exclude: fields to exclude from new model, as with values this takes precedence over include
        :param update: values to change/add in the new model. Note: the data is not validated before creating
            the new model: you should trust this data
        :param deep: set to `True` to make a deep copy of the model
        :return: new model instance
        """
        values = dict(
            self._iter(to_dict=False, by_alias=False, include=include, exclude=exclude, exclude_unset=False),
            **(update or {}),
        )

        # Keys supplied through `update` count as explicitly set on the copy.
        if update:
            fields_set = self.__fields_set__ | update.keys()
        else:
            fields_set = set(self.__fields_set__)

        return self._copy_and_set_values(values, fields_set, deep=deep)

    @classmethod
    def schema(cls, by_alias: bool = True, ref_template: str = default_ref_template) -> 'DictStrAny':
        """
        Return the schema produced by `model_schema` for this class.

        Results are cached per `(by_alias, ref_template)` in `__schema_cache__`; the cached dict
        itself is returned on later calls.
        """
        cached = cls.__schema_cache__.get((by_alias, ref_template))
        if cached is not None:
            return cached
        s = model_schema(cls, by_alias=by_alias, ref_template=ref_template)
        cls.__schema_cache__[(by_alias, ref_template)] = s
        return s

    @classmethod
    def schema_json(
        cls, *, by_alias: bool = True, ref_template: str = default_ref_template, **dumps_kwargs: Any
    ) -> str:
        """Return `schema()` serialised with the config's `json_dumps`, using `pydantic_encoder` as `default`."""
        from pydantic.v1.json import pydantic_encoder

        return cls.__config__.json_dumps(
            cls.schema(by_alias=by_alias, ref_template=ref_template), default=pydantic_encoder, **dumps_kwargs
        )

    @classmethod
    def __get_validators__(cls) -> 'CallableGenerator':
        """Yield the validators for this class when used as a field type: just `cls.validate`."""
        yield cls.validate

    @classmethod
    def validate(cls: Type['Model'], value: Any) -> 'Model':
        """
        Coerce `value` into an instance of this model.

        An existing instance is returned as is, shallow-copied or deep-copied according to
        `Config.copy_on_model_validation`. Dicts are passed to the constructor; with `orm_mode`
        other objects go through `from_orm`; anything else must be convertible with `dict(value)`.

        Raises DictError if `value` cannot be converted to a dict.
        """
        if isinstance(value, cls):
            copy_on_model_validation = cls.__config__.copy_on_model_validation
            # True -> deep copy, False -> shallow copy, None -> return the instance itself
            deep_copy: Optional[bool] = None
            if copy_on_model_validation not in {'deep', 'shallow', 'none'}:
                # Non-string settings still work but are deprecated: any truthy value means shallow.
                warnings.warn(
                    "`copy_on_model_validation` should be a string: 'deep', 'shallow' or 'none'", DeprecationWarning
                )
                if copy_on_model_validation:
                    deep_copy = False

            if copy_on_model_validation == 'shallow':
                deep_copy = False
            elif copy_on_model_validation == 'deep':
                deep_copy = True

            if deep_copy is None:
                return value
            else:
                return value._copy_and_set_values(value.__dict__, value.__fields_set__, deep=deep_copy)

        value = cls._enforce_dict_if_root(value)

        if isinstance(value, dict):
            return cls(**value)
        elif cls.__config__.orm_mode:
            return cls.from_orm(value)
        else:
            try:
                value_as_dict = dict(value)
            except (TypeError, ValueError) as e:
                raise DictError() from e
            return cls(**value_as_dict)

    @classmethod
    def _decompose_class(cls: Type['Model'], obj: Any) -> GetterDict:
        """Return `obj` if it already is a GetterDict, else wrap it with the config's `getter_dict`."""
        if isinstance(obj, GetterDict):
            return obj
        return cls.__config__.getter_dict(obj)

    @classmethod
    @no_type_check
    def _get_value(
        cls,
        v: Any,
        to_dict: bool,
        by_alias: bool,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']],
        exclude: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']],
        exclude_unset: bool,
        exclude_defaults: bool,
        exclude_none: bool,
    ) -> Any:
        """
        Recursively convert a single value for `_iter`.

        Nested models become dicts (`to_dict=True`, unwrapping custom roots) or copies; dicts and
        sequence-like values are rebuilt element by element with `include`/`exclude` applied;
        enums become their `.value` when `use_enum_values` is set; other values pass through.
        """
        if isinstance(v, BaseModel):
            if to_dict:
                v_dict = v.dict(
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    exclude_defaults=exclude_defaults,
                    include=include,
                    exclude=exclude,
                    exclude_none=exclude_none,
                )
                if ROOT_KEY in v_dict:
                    return v_dict[ROOT_KEY]
                return v_dict
            else:
                return v.copy(include=include, exclude=exclude)

        value_exclude = ValueItems(v, exclude) if exclude else None
        value_include = ValueItems(v, include) if include else None

        if isinstance(v, dict):
            return {
                k_: cls._get_value(
                    v_,
                    to_dict=to_dict,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    exclude_defaults=exclude_defaults,
                    include=value_include and value_include.for_element(k_),
                    exclude=value_exclude and value_exclude.for_element(k_),
                    exclude_none=exclude_none,
                )
                for k_, v_ in v.items()
                if (not value_exclude or not value_exclude.is_excluded(k_))
                and (not value_include or value_include.is_included(k_))
            }

        elif sequence_like(v):
            seq_args = (
                cls._get_value(
                    v_,
                    to_dict=to_dict,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    exclude_defaults=exclude_defaults,
                    include=value_include and value_include.for_element(i),
                    exclude=value_exclude and value_exclude.for_element(i),
                    exclude_none=exclude_none,
                )
                for i, v_ in enumerate(v)
                if (not value_exclude or not value_exclude.is_excluded(i))
                and (not value_include or value_include.is_included(i))
            )

            # Named tuples take their items as positional arguments; other sequences take one iterable.
            return v.__class__(*seq_args) if is_namedtuple(v.__class__) else v.__class__(seq_args)

        # NOTE(review): this reads `cls.Config`, not `cls.__config__` - confirm that is intended.
        elif isinstance(v, Enum) and getattr(cls.Config, 'use_enum_values', False):
            return v.value

        else:
            return v

    @classmethod
    def __try_update_forward_refs__(cls, **localns: Any) -> None:
        """
        Same as update_forward_refs but will not raise exception
        when forward references are not defined.
        """
        update_model_forward_refs(cls, cls.__fields__.values(), cls.__config__.json_encoders, localns, (NameError,))

    @classmethod
    def update_forward_refs(cls, **localns: Any) -> None:
        """
        Try to update ForwardRefs on fields based on this Model, globalns and localns.
        """
        update_model_forward_refs(cls, cls.__fields__.values(), cls.__config__.json_encoders, localns)

    def __iter__(self) -> 'TupleGenerator':
        """
        so `dict(model)` works
        """
        yield from self.__dict__.items()

    def _iter(
        self,
        to_dict: bool = False,
        by_alias: bool = False,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
    ) -> 'TupleGenerator':
        """
        Yield `(key, value)` pairs for the model, applying include/exclude rules, aliasing and
        the `exclude_*` filters. Shared by `dict()`, `json()` and `copy()`.
        """
        # Combine the per-field include/exclude settings with the arguments. The `is not None`
        # checks skip the merge entirely when neither side specifies anything.
        if exclude is not None or self.__exclude_fields__ is not None:
            exclude = ValueItems.merge(self.__exclude_fields__, exclude)

        if include is not None or self.__include_fields__ is not None:
            include = ValueItems.merge(self.__include_fields__, include, intersect=True)

        allowed_keys = self._calculate_keys(
            include=include, exclude=exclude, exclude_unset=exclude_unset
        )
        if allowed_keys is None and not (to_dict or by_alias or exclude_unset or exclude_defaults or exclude_none):
            # Nothing to filter or convert: yield the stored items directly.
            yield from self.__dict__.items()
            return

        value_exclude = ValueItems(self, exclude) if exclude is not None else None
        value_include = ValueItems(self, include) if include is not None else None

        for field_key, v in self.__dict__.items():
            if (allowed_keys is not None and field_key not in allowed_keys) or (exclude_none and v is None):
                continue

            if exclude_defaults:
                model_field = self.__fields__.get(field_key)
                # `_missing` is a sentinel expected at module level elsewhere in this file; keys
                # without a field (model_field is None) are never dropped here.
                if not getattr(model_field, 'required', True) and getattr(model_field, 'default', _missing) == v:
                    continue

            if by_alias and field_key in self.__fields__:
                dict_key = self.__fields__[field_key].alias
            else:
                dict_key = field_key

            if to_dict or value_include or value_exclude:
                v = self._get_value(
                    v,
                    to_dict=to_dict,
                    by_alias=by_alias,
                    include=value_include and value_include.for_element(field_key),
                    exclude=value_exclude and value_exclude.for_element(field_key),
                    exclude_unset=exclude_unset,
                    exclude_defaults=exclude_defaults,
                    exclude_none=exclude_none,
                )
            yield dict_key, v

    def _calculate_keys(
        self,
        include: Optional['MappingIntStrAny'],
        exclude: Optional['MappingIntStrAny'],
        exclude_unset: bool,
        update: Optional['DictStrAny'] = None,
    ) -> Optional[AbstractSet[str]]:
        """
        Work out which top-level keys may be emitted.

        :return: `None` when no filtering applies (no include, no exclude, `exclude_unset` False);
            otherwise the set of allowed keys
        """
        if include is None and exclude is None and exclude_unset is False:
            return None

        keys: AbstractSet[str]
        if exclude_unset:
            keys = self.__fields_set__.copy()
        else:
            keys = self.__dict__.keys()

        if include is not None:
            keys &= include.keys()

        if update:
            keys -= update.keys()

        if exclude:
            # Only entries excluded as a whole drop the key; nested excludes are handled per value.
            keys -= {k for k, v in exclude.items() if ValueItems.is_true(v)}

        return keys

    def __eq__(self, other: Any) -> bool:
        """Compare by `dict()` output; a non-model `other` is compared against this model's dict."""
        if isinstance(other, BaseModel):
            return self.dict() == other.dict()
        else:
            return self.dict() == other

    def __repr_args__(self) -> 'ReprArgs':
        """Return the `(name, value)` pairs for the repr: skips dunder attributes and fields with `repr` disabled."""
        return [
            (k, v)
            for k, v in self.__dict__.items()
            if k not in DUNDER_ATTRIBUTES and (k not in self.__fields__ or self.__fields__[k].field_info.repr)
        ]
|
|
|
|
# BaseModel now exists. From this point on ModelMetaclass.__new__ treats BaseModel subclasses
# among the bases as sources of inherited fields/config/validators, and no longer clears the
# `__annotations__` of the classes it creates. Must stay directly after the BaseModel definition.
| _is_base_model_class_defined = True |
|
|
|
|
# Typing overload: no `__base__` given, so the created class is typed as a plain BaseModel subclass.
@overload
def create_model(
    __model_name: str,
    *,
    __config__: Optional[Type[BaseConfig]] = None,
    __base__: None = None,
    __module__: str = __name__,
    # `None` defaults are annotated as Optional explicitly rather than relying on implicit Optional.
    __validators__: Optional[Dict[str, 'AnyClassMethod']] = None,
    __cls_kwargs__: Optional[Dict[str, Any]] = None,
    **field_definitions: Any,
) -> Type['BaseModel']:
    ...
|
|
|
|
# Typing overload: `__base__` is a model class (or tuple of them), so the created class is typed
# as that model type.
@overload
def create_model(
    __model_name: str,
    *,
    __config__: Optional[Type[BaseConfig]] = None,
    __base__: Union[Type['Model'], Tuple[Type['Model'], ...]],
    __module__: str = __name__,
    # `None` defaults are annotated as Optional explicitly rather than relying on implicit Optional.
    __validators__: Optional[Dict[str, 'AnyClassMethod']] = None,
    __cls_kwargs__: Optional[Dict[str, Any]] = None,
    **field_definitions: Any,
) -> Type['Model']:
    ...
|
|
|
|
def create_model(
    __model_name: str,
    *,
    __config__: Optional[Type[BaseConfig]] = None,
    __base__: Union[None, Type['Model'], Tuple[Type['Model'], ...]] = None,
    __module__: str = __name__,
    __validators__: Optional[Dict[str, 'AnyClassMethod']] = None,
    __cls_kwargs__: Optional[Dict[str, Any]] = None,
    __slots__: Optional[Tuple[str, ...]] = None,
    **field_definitions: Any,
) -> Type['Model']:
    """
    Dynamically create a model.
    :param __model_name: name of the created model
    :param __config__: config class to use for the new model
    :param __base__: base class (or tuple of base classes) for the new model to inherit from;
        `BaseModel` is used when omitted
    :param __module__: module of the created model
    :param __validators__: a dict of method names and @validator class methods
    :param __cls_kwargs__: a dict of keyword arguments for class creation
    :param __slots__: Deprecated, `__slots__` should not be passed to `create_model`
    :param field_definitions: fields of the model (or extra fields if a base is supplied)
        in the format `<name>=(<type>, <default value>)` or `<name>=<default value>`, e.g.
        `foobar=(str, ...)` or `foobar=123`, or, for complex use-cases, in the format
        `<name>=<Field>` or `<name>=(<type>, <FieldInfo>)`, e.g.
        `foo=Field(datetime, default_factory=datetime.utcnow, alias='bar')` or
        `foo=(str, FieldInfo(title='Foo'))`
    :raises ConfigError: if both `__config__` and `__base__` are given, or if a tuple field
        definition does not contain exactly two items
    :return: the newly created model class

    A `RuntimeWarning` is emitted when `__slots__` is passed and for every field name
    rejected by `is_valid_field`.
    """
    if __slots__ is not None:
        # `__slots__` is accepted but never forwarded to the class namespace below;
        # the only effect of passing it is this warning, attributed to the caller.
        warnings.warn('__slots__ should not be passed to create_model', RuntimeWarning, stacklevel=2)

    if __base__ is not None:
        if __config__ is not None:
            raise ConfigError('to avoid confusion __config__ and __base__ cannot be used together')
        if not isinstance(__base__, tuple):
            # normalise a single base class to a 1-tuple
            __base__ = (__base__,)
    else:
        __base__ = (cast(Type['Model'], BaseModel),)

    __cls_kwargs__ = __cls_kwargs__ or {}

    fields: Dict[str, Any] = {}
    annotations: Dict[str, Any] = {}

    for f_name, f_def in field_definitions.items():
        if not is_valid_field(f_name):
            # Only a warning is issued here: the definition is still added to the
            # namespace below, exactly as before.
            warnings.warn(
                f'fields may not start with an underscore, ignoring "{f_name}"', RuntimeWarning, stacklevel=2
            )
        if isinstance(f_def, tuple):
            # a tuple is always read as (<type>, <default>), so it must have two items
            try:
                f_annotation, f_value = f_def
            except ValueError as e:
                raise ConfigError(
                    'field definitions should either be a tuple of (<type>, <default>) or just a '
                    'default value, unfortunately this means tuples as '
                    'default values are not allowed'
                ) from e
        else:
            f_annotation, f_value = None, f_def

        if f_annotation:
            annotations[f_name] = f_annotation
        fields[f_name] = f_value

    namespace: 'DictStrAny' = {'__annotations__': annotations, '__module__': __module__}
    if __validators__:
        namespace.update(__validators__)
    # field values are applied after the validators, so a field with the same name wins
    namespace.update(fields)
    if __config__:
        namespace['Config'] = inherit_config(__config__, BaseConfig)
    resolved_bases = resolve_bases(__base__)
    meta, ns, kwds = prepare_class(__model_name, resolved_bases, kwds=__cls_kwargs__)
    if resolved_bases is not __base__:
        # resolve_bases() returned a different tuple, so record the bases as passed
        ns['__orig_bases__'] = __base__
    namespace.update(ns)
    return meta(__model_name, resolved_bases, namespace, **kwds)
|
|
|
|
# Unique sentinel object: `validate_model` passes it as the default to `input_data.get(...)`
# and tests `value is _missing`, so an absent key can be told apart from an explicit `None`.
| _missing = object() |
|
|
|
|
def validate_model(
    model: Type[BaseModel], input_data: 'DictStrAny', cls: 'ModelOrDc' = None
) -> Tuple['DictStrAny', 'SetStr', Optional[ValidationError]]:
    """
    Validate ``input_data`` against the fields and root validators of ``model``.

    Returns a 3-tuple: the validated values, the set of names that were supplied in the
    input, and a ``ValidationError`` collecting every problem found (``None`` if there
    were none). ``cls`` (falling back to ``model``) is what gets handed to the validators
    and to the ``ValidationError``.
    """
    cfg = model.__config__
    owner = cls or model
    track_extra = cfg.extra is not Extra.ignore

    validated = {}
    problems = []
    consumed_keys = set()  # input_data keys that matched a field (alias or field name)
    provided = set()  # field names found in the input, plus any extra keys

    # Pre root validators may replace the input wholesale; a failure ends validation at once.
    for pre_validator in model.__pre_root_validators__:
        try:
            input_data = pre_validator(owner, input_data)
        except (ValueError, TypeError, AssertionError) as exc:
            return {}, set(), ValidationError([ErrorWrapper(exc, loc=ROOT_KEY)], owner)

    for name, field in model.__fields__.items():
        # Look the value up by alias first, then (if allowed) by the field's own name.
        lookup_key = field.alias
        raw = input_data.get(lookup_key, _missing)
        if raw is _missing and cfg.allow_population_by_field_name and field.alt_alias:
            lookup_key = field.name
            raw = input_data.get(lookup_key, _missing)

        if raw is not _missing:
            provided.add(name)
            if track_extra:
                consumed_keys.add(lookup_key)
        elif field.required:
            problems.append(ErrorWrapper(MissingError(), loc=field.alias))
            continue
        else:
            raw = field.get_default()
            # Defaults are stored unvalidated unless validation is forced.
            if not (cfg.validate_all or field.validate_always):
                validated[name] = raw
                continue

        result, field_errors = field.validate(raw, validated, loc=field.alias, cls=owner)
        if isinstance(field_errors, ErrorWrapper):
            problems.append(field_errors)
        elif isinstance(field_errors, list):
            problems.extend(field_errors)
        else:
            validated[name] = result

    if track_extra:
        if isinstance(input_data, GetterDict):
            candidate_keys = input_data.extra_keys()
        else:
            candidate_keys = input_data.keys()
        extra = candidate_keys - consumed_keys
        if extra:
            provided |= extra
            if cfg.extra is Extra.allow:
                for key in extra:
                    validated[key] = input_data[key]
            else:
                # sorted so the extra-key errors come out in a stable order
                problems.extend(ErrorWrapper(ExtraError(), loc=key) for key in sorted(extra))

    for skip_on_failure, post_validator in model.__post_root_validators__:
        if skip_on_failure and problems:
            continue
        try:
            validated = post_validator(owner, validated)
        except (ValueError, TypeError, AssertionError) as exc:
            problems.append(ErrorWrapper(exc, loc=ROOT_KEY))

    failure = ValidationError(problems, owner) if problems else None
    return validated, provided, failure
|
|