| """Private logic related to fields (the `Field()` function and `FieldInfo` class), and arguments to `Annotated`.""" |
|
|
| from __future__ import annotations as _annotations |
|
|
| import dataclasses |
| import warnings |
| from collections.abc import Mapping |
| from functools import cache |
| from inspect import Parameter, ismethoddescriptor, signature |
| from re import Pattern |
| from typing import TYPE_CHECKING, Any, Callable, TypeVar |
|
|
| from pydantic_core import PydanticUndefined |
| from typing_extensions import TypeIs |
| from typing_inspection.introspection import AnnotationSource |
|
|
| from pydantic import PydanticDeprecatedSince211 |
| from pydantic.errors import PydanticUserError |
|
|
| from ..aliases import AliasGenerator |
| from . import _generics, _typing_extra |
| from ._config import ConfigWrapper |
| from ._docs_extraction import extract_docstrings_from_cls |
| from ._import_utils import import_cached_base_model, import_cached_field_info |
| from ._namespace_utils import NsResolver |
| from ._repr import Representation |
| from ._utils import can_be_positional, get_first_not_none |
|
|
| if TYPE_CHECKING: |
| from annotated_types import BaseMetadata |
|
|
| from ..fields import FieldInfo |
| from ..main import BaseModel |
| from ._dataclasses import PydanticDataclass, StandardDataclass |
| from ._decorators import DecoratorInfos |
|
|
|
|
| class PydanticMetadata(Representation): |
| """Base class for annotation markers like `Strict`.""" |
|
|
| __slots__ = () |
|
|
|
|
| def pydantic_general_metadata(**metadata: Any) -> BaseMetadata: |
| """Create a new `_PydanticGeneralMetadata` class with the given metadata. |
| |
| Args: |
| **metadata: The metadata to add. |
| |
| Returns: |
| The new `_PydanticGeneralMetadata` class. |
| """ |
| return _general_metadata_cls()(metadata) |
|
|
|
|
| @cache |
| def _general_metadata_cls() -> type[BaseMetadata]: |
| """Do it this way to avoid importing `annotated_types` at import time.""" |
| from annotated_types import BaseMetadata |
|
|
| class _PydanticGeneralMetadata(PydanticMetadata, BaseMetadata): |
| """Pydantic general metadata like `max_digits`.""" |
|
|
| def __init__(self, metadata: Any): |
| self.__dict__ = metadata |
|
|
| return _PydanticGeneralMetadata |
|
|
|
|
| def _check_protected_namespaces( |
| protected_namespaces: tuple[str | Pattern[str], ...], |
| ann_name: str, |
| bases: tuple[type[Any], ...], |
| cls_name: str, |
| ) -> None: |
| BaseModel = import_cached_base_model() |
|
|
| for protected_namespace in protected_namespaces: |
| ns_violation = False |
| if isinstance(protected_namespace, Pattern): |
| ns_violation = protected_namespace.match(ann_name) is not None |
| elif isinstance(protected_namespace, str): |
| ns_violation = ann_name.startswith(protected_namespace) |
|
|
| if ns_violation: |
| for b in bases: |
| if hasattr(b, ann_name): |
| if not (issubclass(b, BaseModel) and ann_name in getattr(b, '__pydantic_fields__', {})): |
| raise ValueError( |
| f'Field {ann_name!r} conflicts with member {getattr(b, ann_name)}' |
| f' of protected namespace {protected_namespace!r}.' |
| ) |
| else: |
| valid_namespaces: list[str] = [] |
| for pn in protected_namespaces: |
| if isinstance(pn, Pattern): |
| if not pn.match(ann_name): |
| valid_namespaces.append(f're.compile({pn.pattern!r})') |
| else: |
| if not ann_name.startswith(pn): |
| valid_namespaces.append(f"'{pn}'") |
|
|
| valid_namespaces_str = f'({", ".join(valid_namespaces)}{",)" if len(valid_namespaces) == 1 else ")"}' |
|
|
| warnings.warn( |
| f'Field {ann_name!r} in {cls_name!r} conflicts with protected namespace {protected_namespace!r}.\n\n' |
| f"You may be able to solve this by setting the 'protected_namespaces' configuration to {valid_namespaces_str}.", |
| UserWarning, |
| stacklevel=5, |
| ) |
|
|
|
|
| def _update_fields_from_docstrings(cls: type[Any], fields: dict[str, FieldInfo], use_inspect: bool = False) -> None: |
| fields_docs = extract_docstrings_from_cls(cls, use_inspect=use_inspect) |
| for ann_name, field_info in fields.items(): |
| if field_info.description is None and ann_name in fields_docs: |
| field_info.description = fields_docs[ann_name] |
|
|
|
|
| def _apply_field_title_generator_to_field_info( |
| title_generator: Callable[[str, FieldInfo], str], |
| field_name: str, |
| field_info: FieldInfo, |
| ): |
| if field_info.title is None: |
| title = title_generator(field_name, field_info) |
| if not isinstance(title, str): |
| raise TypeError(f'field_title_generator {title_generator} must return str, not {title.__class__}') |
|
|
| field_info.title = title |
|
|
|
|
| def _apply_alias_generator_to_field_info( |
| alias_generator: Callable[[str], str] | AliasGenerator, field_name: str, field_info: FieldInfo |
| ): |
| """Apply an alias generator to aliases on a `FieldInfo` instance if appropriate. |
| |
| Args: |
| alias_generator: A callable that takes a string and returns a string, or an `AliasGenerator` instance. |
| field_name: The name of the field from which to generate the alias. |
| field_info: The `FieldInfo` instance to which the alias generator is (maybe) applied. |
| """ |
| |
| |
| |
| if ( |
| field_info.alias_priority is None |
| or field_info.alias_priority <= 1 |
| or field_info.alias is None |
| or field_info.validation_alias is None |
| or field_info.serialization_alias is None |
| ): |
| alias, validation_alias, serialization_alias = None, None, None |
|
|
| if isinstance(alias_generator, AliasGenerator): |
| alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name) |
| elif callable(alias_generator): |
| alias = alias_generator(field_name) |
| if not isinstance(alias, str): |
| raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}') |
|
|
| |
| |
| |
| if field_info.alias_priority is None or field_info.alias_priority <= 1: |
| field_info.alias_priority = 1 |
|
|
| |
| if field_info.alias_priority == 1: |
| field_info.serialization_alias = get_first_not_none(serialization_alias, alias) |
| field_info.validation_alias = get_first_not_none(validation_alias, alias) |
| field_info.alias = alias |
|
|
| |
| if field_info.alias is None: |
| field_info.alias = alias |
| if field_info.serialization_alias is None: |
| field_info.serialization_alias = get_first_not_none(serialization_alias, alias) |
| if field_info.validation_alias is None: |
| field_info.validation_alias = get_first_not_none(validation_alias, alias) |
|
|
|
|
| def update_field_from_config(config_wrapper: ConfigWrapper, field_name: str, field_info: FieldInfo) -> None: |
| """Update the `FieldInfo` instance from the configuration set on the model it belongs to. |
| |
| This will apply the title and alias generators from the configuration. |
| |
| Args: |
| config_wrapper: The configuration from the model. |
| field_name: The field name the `FieldInfo` instance is attached to. |
| field_info: The `FieldInfo` instance to update. |
| """ |
| field_title_generator = field_info.field_title_generator or config_wrapper.field_title_generator |
| if field_title_generator is not None: |
| _apply_field_title_generator_to_field_info(field_title_generator, field_name, field_info) |
| if config_wrapper.alias_generator is not None: |
| _apply_alias_generator_to_field_info(config_wrapper.alias_generator, field_name, field_info) |
|
|
|
|
| _deprecated_method_names = {'dict', 'json', 'copy', '_iter', '_copy_and_set_values', '_calculate_keys'} |
|
|
| _deprecated_classmethod_names = { |
| 'parse_obj', |
| 'parse_raw', |
| 'parse_file', |
| 'from_orm', |
| 'construct', |
| 'schema', |
| 'schema_json', |
| 'validate', |
| 'update_forward_refs', |
| '_get_value', |
| } |
|
|
|
|
| def collect_model_fields( |
| cls: type[BaseModel], |
| config_wrapper: ConfigWrapper, |
| ns_resolver: NsResolver | None, |
| *, |
| typevars_map: Mapping[TypeVar, Any] | None = None, |
| ) -> tuple[dict[str, FieldInfo], set[str]]: |
| """Collect the fields and class variables names of a nascent Pydantic model. |
| |
| The fields collection process is *lenient*, meaning it won't error if string annotations |
| fail to evaluate. If this happens, the original annotation (and assigned value, if any) |
| is stored on the created `FieldInfo` instance. |
| |
| The `rebuild_model_fields()` should be called at a later point (e.g. when rebuilding the model), |
| and will make use of these stored attributes. |
| |
| Args: |
| cls: BaseModel or dataclass. |
| config_wrapper: The config wrapper instance. |
| ns_resolver: Namespace resolver to use when getting model annotations. |
| typevars_map: A dictionary mapping type variables to their concrete types. |
| |
| Returns: |
| A two-tuple containing model fields and class variables names. |
| |
| Raises: |
| NameError: |
| - If there is a conflict between a field name and protected namespaces. |
| - If there is a field other than `root` in `RootModel`. |
| - If a field shadows an attribute in the parent model. |
| """ |
| FieldInfo_ = import_cached_field_info() |
| BaseModel_ = import_cached_base_model() |
|
|
| bases = cls.__bases__ |
| parent_fields_lookup: dict[str, FieldInfo] = {} |
| for base in reversed(bases): |
| if model_fields := getattr(base, '__pydantic_fields__', None): |
| parent_fields_lookup.update(model_fields) |
|
|
| type_hints = _typing_extra.get_model_type_hints(cls, ns_resolver=ns_resolver) |
|
|
| |
| |
| annotations = _typing_extra.safe_get_annotations(cls) |
|
|
| fields: dict[str, FieldInfo] = {} |
|
|
| class_vars: set[str] = set() |
| for ann_name, (ann_type, evaluated) in type_hints.items(): |
| if ann_name == 'model_config': |
| |
| |
| |
| continue |
|
|
| _check_protected_namespaces( |
| protected_namespaces=config_wrapper.protected_namespaces, |
| ann_name=ann_name, |
| bases=bases, |
| cls_name=cls.__name__, |
| ) |
|
|
| if _typing_extra.is_classvar_annotation(ann_type): |
| class_vars.add(ann_name) |
| continue |
|
|
| assigned_value = getattr(cls, ann_name, PydanticUndefined) |
| if assigned_value is not PydanticUndefined and ( |
| |
| any(getattr(BaseModel_, depr_name, None) is assigned_value for depr_name in _deprecated_method_names) |
| |
| or ( |
| hasattr(assigned_value, '__func__') |
| and any( |
| getattr(getattr(BaseModel_, depr_name, None), '__func__', None) is assigned_value.__func__ |
| for depr_name in _deprecated_classmethod_names |
| ) |
| ) |
| ): |
| |
| assigned_value = PydanticUndefined |
|
|
| if not is_valid_field_name(ann_name): |
| continue |
| if cls.__pydantic_root_model__ and ann_name != 'root': |
| raise NameError( |
| f"Unexpected field with name {ann_name!r}; only 'root' is allowed as a field of a `RootModel`" |
| ) |
|
|
| |
| |
| generic_origin = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin') |
| for base in bases: |
| dataclass_fields = { |
| field.name for field in (dataclasses.fields(base) if dataclasses.is_dataclass(base) else ()) |
| } |
| if hasattr(base, ann_name): |
| if base is generic_origin: |
| |
| continue |
|
|
| if ann_name in dataclass_fields: |
| |
| |
| continue |
|
|
| if ann_name not in annotations: |
| |
| continue |
|
|
| warnings.warn( |
| f'Field name "{ann_name}" in "{cls.__qualname__}" shadows an attribute in parent ' |
| f'"{base.__qualname__}"', |
| UserWarning, |
| stacklevel=4, |
| ) |
|
|
| if assigned_value is PydanticUndefined: |
| if ann_name in annotations or ann_name not in parent_fields_lookup: |
| |
| |
| |
| |
| |
| field_info = FieldInfo_.from_annotation(ann_type, _source=AnnotationSource.CLASS) |
| if not evaluated: |
| field_info._complete = False |
| |
| |
| field_info._original_annotation = ann_type |
| else: |
| |
| |
| field_info = parent_fields_lookup[ann_name]._copy() |
|
|
| else: |
| if isinstance(assigned_value, FieldInfo_) and ismethoddescriptor(assigned_value.default): |
| |
| |
| |
| |
| |
| default = assigned_value.default.__get__(None, cls) |
| assigned_value.default = default |
| assigned_value._attributes_set['default'] = default |
|
|
| field_info = FieldInfo_.from_annotated_attribute(ann_type, assigned_value, _source=AnnotationSource.CLASS) |
| |
| |
| |
| field_info._original_assignment = assigned_value |
| if not evaluated: |
| field_info._complete = False |
| field_info._original_annotation = ann_type |
| elif 'final' in field_info._qualifiers and not field_info.is_required(): |
| warnings.warn( |
| f'Annotation {ann_name!r} is marked as final and has a default value. Pydantic treats {ann_name!r} as a ' |
| 'class variable, but it will be considered as a normal field in V3 to be aligned with dataclasses. If you ' |
| f'still want {ann_name!r} to be considered as a class variable, annotate it as: `ClassVar[<type>] = <default>.`', |
| category=PydanticDeprecatedSince211, |
| |
| stacklevel=4, |
| ) |
| class_vars.add(ann_name) |
| continue |
|
|
| |
| |
| |
| try: |
| delattr(cls, ann_name) |
| except AttributeError: |
| pass |
|
|
| |
| |
| decorators: DecoratorInfos = cls.__dict__['__pydantic_decorators__'] |
| if ann_name in decorators.computed_fields: |
| raise TypeError( |
| f'Field {ann_name!r} of class {cls.__name__!r} overrides symbol of same name in a parent class. ' |
| 'This override with a computed_field is incompatible.' |
| ) |
| fields[ann_name] = field_info |
|
|
| if field_info._complete: |
| |
| update_field_from_config(config_wrapper, ann_name, field_info) |
|
|
| if typevars_map: |
| for field in fields.values(): |
| if field._complete: |
| field.apply_typevars_map(typevars_map) |
|
|
| if config_wrapper.use_attribute_docstrings: |
| _update_fields_from_docstrings(cls, fields) |
| return fields, class_vars |
|
|
|
|
| def rebuild_model_fields( |
| cls: type[BaseModel], |
| *, |
| config_wrapper: ConfigWrapper, |
| ns_resolver: NsResolver, |
| typevars_map: Mapping[TypeVar, Any], |
| ) -> dict[str, FieldInfo]: |
| """Rebuild the (already present) model fields by trying to reevaluate annotations. |
| |
| This function should be called whenever a model with incomplete fields is encountered. |
| |
| Raises: |
| NameError: If one of the annotations failed to evaluate. |
| |
| Note: |
| This function *doesn't* mutate the model fields in place, as it can be called during |
| schema generation, where you don't want to mutate other model's fields. |
| """ |
| FieldInfo_ = import_cached_field_info() |
|
|
| rebuilt_fields: dict[str, FieldInfo] = {} |
| with ns_resolver.push(cls): |
| for f_name, field_info in cls.__pydantic_fields__.items(): |
| if field_info._complete: |
| rebuilt_fields[f_name] = field_info |
| else: |
| existing_desc = field_info.description |
| ann = _typing_extra.eval_type( |
| field_info._original_annotation, |
| *ns_resolver.types_namespace, |
| ) |
| ann = _generics.replace_types(ann, typevars_map) |
|
|
| if (assign := field_info._original_assignment) is PydanticUndefined: |
| new_field = FieldInfo_.from_annotation(ann, _source=AnnotationSource.CLASS) |
| else: |
| new_field = FieldInfo_.from_annotated_attribute(ann, assign, _source=AnnotationSource.CLASS) |
| |
| new_field.description = new_field.description if new_field.description is not None else existing_desc |
| update_field_from_config(config_wrapper, f_name, new_field) |
| rebuilt_fields[f_name] = new_field |
|
|
| return rebuilt_fields |
|
|
|
|
| def collect_dataclass_fields( |
| cls: type[StandardDataclass], |
| *, |
| config_wrapper: ConfigWrapper, |
| ns_resolver: NsResolver | None = None, |
| typevars_map: dict[Any, Any] | None = None, |
| ) -> dict[str, FieldInfo]: |
| """Collect the fields of a dataclass. |
| |
| Args: |
| cls: dataclass. |
| config_wrapper: The config wrapper instance. |
| ns_resolver: Namespace resolver to use when getting dataclass annotations. |
| Defaults to an empty instance. |
| typevars_map: A dictionary mapping type variables to their concrete types. |
| |
| Returns: |
| The dataclass fields. |
| """ |
| FieldInfo_ = import_cached_field_info() |
|
|
| fields: dict[str, FieldInfo] = {} |
| ns_resolver = ns_resolver or NsResolver() |
| dataclass_fields = cls.__dataclass_fields__ |
|
|
| |
| |
| |
| for base in reversed(cls.__mro__): |
| if not dataclasses.is_dataclass(base): |
| continue |
|
|
| with ns_resolver.push(base): |
| for ann_name, dataclass_field in dataclass_fields.items(): |
| base_anns = _typing_extra.safe_get_annotations(base) |
|
|
| if ann_name not in base_anns: |
| |
| |
| continue |
|
|
| globalns, localns = ns_resolver.types_namespace |
| ann_type, evaluated = _typing_extra.try_eval_type(dataclass_field.type, globalns, localns) |
|
|
| if _typing_extra.is_classvar_annotation(ann_type): |
| continue |
|
|
| if ( |
| not dataclass_field.init |
| and dataclass_field.default is dataclasses.MISSING |
| and dataclass_field.default_factory is dataclasses.MISSING |
| ): |
| |
| |
| continue |
|
|
| if isinstance(dataclass_field.default, FieldInfo_): |
| if dataclass_field.default.init_var: |
| if dataclass_field.default.init is False: |
| raise PydanticUserError( |
| f'Dataclass field {ann_name} has init=False and init_var=True, but these are mutually exclusive.', |
| code='clashing-init-and-init-var', |
| ) |
|
|
| |
| continue |
| field_info = FieldInfo_.from_annotated_attribute( |
| ann_type, dataclass_field.default, _source=AnnotationSource.DATACLASS |
| ) |
| field_info._original_assignment = dataclass_field.default |
| else: |
| field_info = FieldInfo_.from_annotated_attribute( |
| ann_type, dataclass_field, _source=AnnotationSource.DATACLASS |
| ) |
| field_info._original_assignment = dataclass_field |
|
|
| if not evaluated: |
| field_info._complete = False |
| field_info._original_annotation = ann_type |
|
|
| fields[ann_name] = field_info |
| update_field_from_config(config_wrapper, ann_name, field_info) |
|
|
| if field_info.default is not PydanticUndefined and isinstance( |
| getattr(cls, ann_name, field_info), FieldInfo_ |
| ): |
| |
| setattr(cls, ann_name, field_info.default) |
|
|
| if typevars_map: |
| for field in fields.values(): |
| |
| |
| |
| field.apply_typevars_map(typevars_map) |
|
|
| if config_wrapper.use_attribute_docstrings: |
| _update_fields_from_docstrings( |
| cls, |
| fields, |
| |
| |
| use_inspect=not hasattr(cls, '__is_pydantic_dataclass__'), |
| ) |
|
|
| return fields |
|
|
|
|
| def rebuild_dataclass_fields( |
| cls: type[PydanticDataclass], |
| *, |
| config_wrapper: ConfigWrapper, |
| ns_resolver: NsResolver, |
| typevars_map: Mapping[TypeVar, Any], |
| ) -> dict[str, FieldInfo]: |
| """Rebuild the (already present) dataclass fields by trying to reevaluate annotations. |
| |
| This function should be called whenever a dataclass with incomplete fields is encountered. |
| |
| Raises: |
| NameError: If one of the annotations failed to evaluate. |
| |
| Note: |
| This function *doesn't* mutate the dataclass fields in place, as it can be called during |
| schema generation, where you don't want to mutate other dataclass's fields. |
| """ |
| FieldInfo_ = import_cached_field_info() |
|
|
| rebuilt_fields: dict[str, FieldInfo] = {} |
| with ns_resolver.push(cls): |
| for f_name, field_info in cls.__pydantic_fields__.items(): |
| if field_info._complete: |
| rebuilt_fields[f_name] = field_info |
| else: |
| existing_desc = field_info.description |
| ann = _typing_extra.eval_type( |
| field_info._original_annotation, |
| *ns_resolver.types_namespace, |
| ) |
| ann = _generics.replace_types(ann, typevars_map) |
| new_field = FieldInfo_.from_annotated_attribute( |
| ann, |
| field_info._original_assignment, |
| _source=AnnotationSource.DATACLASS, |
| ) |
|
|
| |
| new_field.description = new_field.description if new_field.description is not None else existing_desc |
| update_field_from_config(config_wrapper, f_name, new_field) |
| rebuilt_fields[f_name] = new_field |
|
|
| return rebuilt_fields |
|
|
|
|
| def is_valid_field_name(name: str) -> bool: |
| return not name.startswith('_') |
|
|
|
|
| def is_valid_privateattr_name(name: str) -> bool: |
| return name.startswith('_') and not name.startswith('__') |
|
|
|
|
| def takes_validated_data_argument( |
| default_factory: Callable[[], Any] | Callable[[dict[str, Any]], Any], |
| ) -> TypeIs[Callable[[dict[str, Any]], Any]]: |
| """Whether the provided default factory callable has a validated data parameter.""" |
| try: |
| sig = signature(default_factory) |
| except (ValueError, TypeError): |
| |
| |
| return False |
|
|
| parameters = list(sig.parameters.values()) |
|
|
| return len(parameters) == 1 and can_be_positional(parameters[0]) and parameters[0].default is Parameter.empty |
|
|