| """!!! abstract "Usage Documentation" |
| [JSON Schema](../concepts/json_schema.md) |
| |
| The `json_schema` module contains classes and functions to allow the way [JSON Schema](https://json-schema.org/) |
| is generated to be customized. |
| |
| In general you shouldn't need to use this module directly; instead, you can use |
| [`BaseModel.model_json_schema`][pydantic.BaseModel.model_json_schema] and |
| [`TypeAdapter.json_schema`][pydantic.TypeAdapter.json_schema]. |
| """ |
|
|
| from __future__ import annotations as _annotations |
|
|
| import dataclasses |
| import inspect |
| import math |
| import os |
| import re |
| import warnings |
| from collections import Counter, defaultdict |
| from collections.abc import Hashable, Iterable, Sequence |
| from copy import deepcopy |
| from enum import Enum |
| from re import Pattern |
| from typing import ( |
| TYPE_CHECKING, |
| Annotated, |
| Any, |
| Callable, |
| Literal, |
| NewType, |
| TypeVar, |
| Union, |
| cast, |
| overload, |
| ) |
|
|
| import pydantic_core |
| from pydantic_core import MISSING, CoreSchema, PydanticOmit, core_schema, to_jsonable_python |
| from pydantic_core.core_schema import ComputedField |
| from typing_extensions import TypeAlias, assert_never, deprecated, final |
| from typing_inspection.introspection import get_literal_values |
|
|
| from pydantic.warnings import PydanticDeprecatedSince26, PydanticDeprecatedSince29 |
|
|
| from ._internal import ( |
| _config, |
| _core_metadata, |
| _core_utils, |
| _decorators, |
| _internal_dataclass, |
| _mock_val_ser, |
| _schema_generation_shared, |
| ) |
| from .annotated_handlers import GetJsonSchemaHandler |
| from .config import JsonDict, JsonValue |
| from .errors import PydanticInvalidForJsonSchema, PydanticSchemaGenerationError, PydanticUserError |
|
|
| if TYPE_CHECKING: |
| from . import ConfigDict |
| from ._internal._core_utils import CoreSchemaField, CoreSchemaOrField |
| from ._internal._dataclasses import PydanticDataclass |
| from ._internal._schema_generation_shared import GetJsonSchemaFunction |
| from .main import BaseModel |
|
|
|
|
| CoreSchemaOrFieldType = Literal[core_schema.CoreSchemaType, core_schema.CoreSchemaFieldType] |
| """ |
| A type alias for defined schema types that represents a union of |
| `core_schema.CoreSchemaType` and |
| `core_schema.CoreSchemaFieldType`. |
| """ |
|
|
| JsonSchemaValue = dict[str, Any] |
| """ |
| A type alias for a JSON schema value. This is a dictionary of string keys to arbitrary JSON values. |
| """ |
|
|
| JsonSchemaMode = Literal['validation', 'serialization'] |
| """ |
| A type alias that represents the mode of a JSON schema; either 'validation' or 'serialization'. |
| |
| For some types, the inputs to validation differ from the outputs of serialization. For example, |
| computed fields will only be present when serializing, and should not be provided when |
| validating. This flag provides a way to indicate whether you want the JSON schema required |
| for validation inputs, or that will be matched by serialization outputs. |
| """ |
|
|
| _MODE_TITLE_MAPPING: dict[JsonSchemaMode, str] = {'validation': 'Input', 'serialization': 'Output'} |
|
|
|
|
| JsonSchemaWarningKind = Literal['skipped-choice', 'non-serializable-default', 'skipped-discriminator'] |
| """ |
| A type alias representing the kinds of warnings that can be emitted during JSON schema generation. |
| |
| See [`GenerateJsonSchema.render_warning_message`][pydantic.json_schema.GenerateJsonSchema.render_warning_message] |
| for more details. |
| """ |
|
|
|
|
| class PydanticJsonSchemaWarning(UserWarning): |
| """This class is used to emit warnings produced during JSON schema generation. |
| See the [`GenerateJsonSchema.emit_warning`][pydantic.json_schema.GenerateJsonSchema.emit_warning] and |
| [`GenerateJsonSchema.render_warning_message`][pydantic.json_schema.GenerateJsonSchema.render_warning_message] |
| methods for more details; these can be overridden to control warning behavior. |
| """ |
|
|
|
|
| NoDefault = object() |
| """A sentinel value used to indicate that no default value should be used when generating a JSON Schema |
| for a core schema with a default value. |
| """ |
|
|
|
|
| |
| DEFAULT_REF_TEMPLATE = '#/$defs/{model}' |
| """The default format string used to generate reference names.""" |
|
|
| |
| |
| |
| CoreRef = NewType('CoreRef', str) |
| |
| |
| |
| DefsRef = NewType('DefsRef', str) |
| |
| |
| JsonRef = NewType('JsonRef', str) |
|
|
| CoreModeRef = tuple[CoreRef, JsonSchemaMode] |
| JsonSchemaKeyT = TypeVar('JsonSchemaKeyT', bound=Hashable) |
|
|
| _PRIMITIVE_JSON_SCHEMA_TYPES = ('string', 'boolean', 'null', 'integer', 'number') |
|
|
|
|
| @dataclasses.dataclass(**_internal_dataclass.slots_true) |
| class _DefinitionsRemapping: |
| defs_remapping: dict[DefsRef, DefsRef] |
| json_remapping: dict[JsonRef, JsonRef] |
|
|
| @staticmethod |
| def from_prioritized_choices( |
| prioritized_choices: dict[DefsRef, list[DefsRef]], |
| defs_to_json: dict[DefsRef, JsonRef], |
| definitions: dict[DefsRef, JsonSchemaValue], |
| ) -> _DefinitionsRemapping: |
| """ |
| This function should produce a remapping that replaces complex DefsRef with the simpler ones from the |
| prioritized_choices such that applying the name remapping would result in an equivalent JSON schema. |
| """ |
| |
| |
| |
| |
| copied_definitions = deepcopy(definitions) |
| definitions_schema = {'$defs': copied_definitions} |
| for _iter in range(100): |
| |
| schemas_for_alternatives: dict[DefsRef, list[JsonSchemaValue]] = defaultdict(list) |
| for defs_ref in copied_definitions: |
| alternatives = prioritized_choices[defs_ref] |
| for alternative in alternatives: |
| schemas_for_alternatives[alternative].append(copied_definitions[defs_ref]) |
|
|
| |
| |
| for defs_ref in schemas_for_alternatives: |
| schemas_for_alternatives[defs_ref] = _deduplicate_schemas(schemas_for_alternatives[defs_ref]) |
|
|
| |
| defs_remapping: dict[DefsRef, DefsRef] = {} |
| json_remapping: dict[JsonRef, JsonRef] = {} |
| for original_defs_ref in definitions: |
| alternatives = prioritized_choices[original_defs_ref] |
| |
| remapped_defs_ref = next(x for x in alternatives if len(schemas_for_alternatives[x]) == 1) |
| defs_remapping[original_defs_ref] = remapped_defs_ref |
| json_remapping[defs_to_json[original_defs_ref]] = defs_to_json[remapped_defs_ref] |
| remapping = _DefinitionsRemapping(defs_remapping, json_remapping) |
| new_definitions_schema = remapping.remap_json_schema({'$defs': copied_definitions}) |
| if definitions_schema == new_definitions_schema: |
| |
| return remapping |
| definitions_schema = new_definitions_schema |
|
|
| raise PydanticInvalidForJsonSchema('Failed to simplify the JSON schema definitions') |
|
|
| def remap_defs_ref(self, ref: DefsRef) -> DefsRef: |
| return self.defs_remapping.get(ref, ref) |
|
|
| def remap_json_ref(self, ref: JsonRef) -> JsonRef: |
| return self.json_remapping.get(ref, ref) |
|
|
| def remap_json_schema(self, schema: Any) -> Any: |
| """ |
| Recursively update the JSON schema replacing all $refs |
| """ |
| if isinstance(schema, str): |
| |
| return self.remap_json_ref(JsonRef(schema)) |
| elif isinstance(schema, list): |
| return [self.remap_json_schema(item) for item in schema] |
| elif isinstance(schema, dict): |
| for key, value in schema.items(): |
| if key == '$ref' and isinstance(value, str): |
| schema['$ref'] = self.remap_json_ref(JsonRef(value)) |
| elif key == '$defs': |
| schema['$defs'] = { |
| self.remap_defs_ref(DefsRef(key)): self.remap_json_schema(value) |
| for key, value in schema['$defs'].items() |
| } |
| else: |
| schema[key] = self.remap_json_schema(value) |
| return schema |
|
|
|
|
| class GenerateJsonSchema: |
| """!!! abstract "Usage Documentation" |
| [Customizing the JSON Schema Generation Process](../concepts/json_schema.md#customizing-the-json-schema-generation-process) |
| |
| A class for generating JSON schemas. |
| |
| This class generates JSON schemas based on configured parameters. The default schema dialect |
| is [https://json-schema.org/draft/2020-12/schema](https://json-schema.org/draft/2020-12/schema). |
| The class uses `by_alias` to configure how fields with |
| multiple names are handled and `ref_template` to format reference names. |
| |
| Attributes: |
| schema_dialect: The JSON schema dialect used to generate the schema. See |
| [Declaring a Dialect](https://json-schema.org/understanding-json-schema/reference/schema.html#id4) |
| in the JSON Schema documentation for more information about dialects. |
| ignored_warning_kinds: Warnings to ignore when generating the schema. `self.render_warning_message` will |
| do nothing if its argument `kind` is in `ignored_warning_kinds`; |
| this value can be modified on subclasses to easily control which warnings are emitted. |
| by_alias: Whether to use field aliases when generating the schema. |
| ref_template: The format string used when generating reference names. |
| core_to_json_refs: A mapping of core refs to JSON refs. |
| core_to_defs_refs: A mapping of core refs to definition refs. |
| defs_to_core_refs: A mapping of definition refs to core refs. |
| json_to_defs_refs: A mapping of JSON refs to definition refs. |
| definitions: Definitions in the schema. |
| |
| Args: |
| by_alias: Whether to use field aliases in the generated schemas. |
| ref_template: The format string to use when generating reference names. |
| union_format: The format to use when combining schemas from unions together. Can be one of: |
| |
| - `'any_of'`: Use the [`anyOf`](https://json-schema.org/understanding-json-schema/reference/combining#anyOf) |
| keyword to combine schemas (the default). |
| - `'primitive_type_array'`: Use the [`type`](https://json-schema.org/understanding-json-schema/reference/type) |
| keyword as an array of strings, containing each type of the combination. If any of the schemas is not a primitive |
| type (`string`, `boolean`, `null`, `integer` or `number`) or contains constraints/metadata, falls back to |
| `any_of`. |
| |
| Raises: |
| JsonSchemaError: If the instance of the class is inadvertently reused after generating a schema. |
| """ |
|
|
| schema_dialect = 'https://json-schema.org/draft/2020-12/schema' |
|
|
| |
| |
| ignored_warning_kinds: set[JsonSchemaWarningKind] = {'skipped-choice'} |
|
|
| def __init__( |
| self, |
| by_alias: bool = True, |
| ref_template: str = DEFAULT_REF_TEMPLATE, |
| union_format: Literal['any_of', 'primitive_type_array'] = 'any_of', |
| ) -> None: |
| self.by_alias = by_alias |
| self.ref_template = ref_template |
| self.union_format: Literal['any_of', 'primitive_type_array'] = union_format |
|
|
| self.core_to_json_refs: dict[CoreModeRef, JsonRef] = {} |
| self.core_to_defs_refs: dict[CoreModeRef, DefsRef] = {} |
| self.defs_to_core_refs: dict[DefsRef, CoreModeRef] = {} |
| self.json_to_defs_refs: dict[JsonRef, DefsRef] = {} |
|
|
| self.definitions: dict[DefsRef, JsonSchemaValue] = {} |
| self._config_wrapper_stack = _config.ConfigWrapperStack(_config.ConfigWrapper({})) |
|
|
| self._mode: JsonSchemaMode = 'validation' |
|
|
| |
| |
| |
| |
| self._prioritized_defsref_choices: dict[DefsRef, list[DefsRef]] = {} |
| self._collision_counter: dict[str, int] = defaultdict(int) |
| self._collision_index: dict[str, int] = {} |
|
|
| self._schema_type_to_method = self.build_schema_type_to_method() |
|
|
| |
| |
| |
| |
| |
| |
| self._core_defs_invalid_for_json_schema: dict[DefsRef, PydanticInvalidForJsonSchema] = {} |
|
|
| |
| |
| self._used = False |
|
|
| @property |
| def _config(self) -> _config.ConfigWrapper: |
| return self._config_wrapper_stack.tail |
|
|
| @property |
| def mode(self) -> JsonSchemaMode: |
| if self._config.json_schema_mode_override is not None: |
| return self._config.json_schema_mode_override |
| else: |
| return self._mode |
|
|
| def build_schema_type_to_method( |
| self, |
| ) -> dict[CoreSchemaOrFieldType, Callable[[CoreSchemaOrField], JsonSchemaValue]]: |
| """Builds a dictionary mapping fields to methods for generating JSON schemas. |
| |
| Returns: |
| A dictionary containing the mapping of `CoreSchemaOrFieldType` to a handler method. |
| |
| Raises: |
| TypeError: If no method has been defined for generating a JSON schema for a given pydantic core schema type. |
| """ |
| mapping: dict[CoreSchemaOrFieldType, Callable[[CoreSchemaOrField], JsonSchemaValue]] = {} |
| core_schema_types: list[CoreSchemaOrFieldType] = list(get_literal_values(CoreSchemaOrFieldType)) |
| for key in core_schema_types: |
| method_name = f'{key.replace("-", "_")}_schema' |
| try: |
| mapping[key] = getattr(self, method_name) |
| except AttributeError as e: |
| if os.getenv('PYDANTIC_PRIVATE_ALLOW_UNHANDLED_SCHEMA_TYPES'): |
| continue |
| raise TypeError( |
| f'No method for generating JsonSchema for core_schema.type={key!r} ' |
| f'(expected: {type(self).__name__}.{method_name})' |
| ) from e |
| return mapping |
|
|
| def generate_definitions( |
| self, inputs: Sequence[tuple[JsonSchemaKeyT, JsonSchemaMode, core_schema.CoreSchema]] |
| ) -> tuple[dict[tuple[JsonSchemaKeyT, JsonSchemaMode], JsonSchemaValue], dict[DefsRef, JsonSchemaValue]]: |
| """Generates JSON schema definitions from a list of core schemas, pairing the generated definitions with a |
| mapping that links the input keys to the definition references. |
| |
| Args: |
| inputs: A sequence of tuples, where: |
| |
| - The first element is a JSON schema key type. |
| - The second element is the JSON mode: either 'validation' or 'serialization'. |
| - The third element is a core schema. |
| |
| Returns: |
| A tuple where: |
| |
| - The first element is a dictionary whose keys are tuples of JSON schema key type and JSON mode, and |
| whose values are the JSON schema corresponding to that pair of inputs. (These schemas may have |
| JsonRef references to definitions that are defined in the second returned element.) |
| - The second element is a dictionary whose keys are definition references for the JSON schemas |
| from the first returned element, and whose values are the actual JSON schema definitions. |
| |
| Raises: |
| PydanticUserError: Raised if the JSON schema generator has already been used to generate a JSON schema. |
| """ |
| if self._used: |
| raise PydanticUserError( |
| 'This JSON schema generator has already been used to generate a JSON schema. ' |
| f'You must create a new instance of {type(self).__name__} to generate a new JSON schema.', |
| code='json-schema-already-used', |
| ) |
|
|
| for _, mode, schema in inputs: |
| self._mode = mode |
| self.generate_inner(schema) |
|
|
| definitions_remapping = self._build_definitions_remapping() |
|
|
| json_schemas_map: dict[tuple[JsonSchemaKeyT, JsonSchemaMode], DefsRef] = {} |
| for key, mode, schema in inputs: |
| self._mode = mode |
| json_schema = self.generate_inner(schema) |
| json_schemas_map[(key, mode)] = definitions_remapping.remap_json_schema(json_schema) |
|
|
| json_schema = {'$defs': self.definitions} |
| json_schema = definitions_remapping.remap_json_schema(json_schema) |
| self._used = True |
| return json_schemas_map, self.sort(json_schema['$defs']) |
|
|
| def generate(self, schema: CoreSchema, mode: JsonSchemaMode = 'validation') -> JsonSchemaValue: |
| """Generates a JSON schema for a specified schema in a specified mode. |
| |
| Args: |
| schema: A Pydantic model. |
| mode: The mode in which to generate the schema. Defaults to 'validation'. |
| |
| Returns: |
| A JSON schema representing the specified schema. |
| |
| Raises: |
| PydanticUserError: If the JSON schema generator has already been used to generate a JSON schema. |
| """ |
| self._mode = mode |
| if self._used: |
| raise PydanticUserError( |
| 'This JSON schema generator has already been used to generate a JSON schema. ' |
| f'You must create a new instance of {type(self).__name__} to generate a new JSON schema.', |
| code='json-schema-already-used', |
| ) |
|
|
| json_schema: JsonSchemaValue = self.generate_inner(schema) |
| json_ref_counts = self.get_json_ref_counts(json_schema) |
|
|
| ref = cast(JsonRef, json_schema.get('$ref')) |
| while ref is not None: |
| ref_json_schema = self.get_schema_from_definitions(ref) |
| if json_ref_counts[ref] == 1 and ref_json_schema is not None and len(json_schema) == 1: |
| |
| json_schema = ref_json_schema.copy() |
| json_ref_counts[ref] -= 1 |
| ref = cast(JsonRef, json_schema.get('$ref')) |
| ref = None |
|
|
| self._garbage_collect_definitions(json_schema) |
| definitions_remapping = self._build_definitions_remapping() |
|
|
| if self.definitions: |
| json_schema['$defs'] = self.definitions |
|
|
| json_schema = definitions_remapping.remap_json_schema(json_schema) |
|
|
| |
| |
| |
|
|
| self._used = True |
| return self.sort(json_schema) |
|
|
| def generate_inner(self, schema: CoreSchemaOrField) -> JsonSchemaValue: |
| """Generates a JSON schema for a given core schema. |
| |
| Args: |
| schema: The given core schema. |
| |
| Returns: |
| The generated JSON schema. |
| |
| TODO: the nested function definitions here seem like bad practice, I'd like to unpack these |
| in a future PR. It'd be great if we could shorten the call stack a bit for JSON schema generation, |
| and I think there's potential for that here. |
| """ |
| |
| |
| |
| if 'ref' in schema: |
| core_ref = CoreRef(schema['ref']) |
| core_mode_ref = (core_ref, self.mode) |
| if core_mode_ref in self.core_to_defs_refs and self.core_to_defs_refs[core_mode_ref] in self.definitions: |
| return {'$ref': self.core_to_json_refs[core_mode_ref]} |
|
|
| def populate_defs(core_schema: CoreSchema, json_schema: JsonSchemaValue) -> JsonSchemaValue: |
| if 'ref' in core_schema: |
| core_ref = CoreRef(core_schema['ref']) |
| defs_ref, ref_json_schema = self.get_cache_defs_ref_schema(core_ref) |
| json_ref = JsonRef(ref_json_schema['$ref']) |
| |
| |
| |
| if json_schema.get('$ref', None) != json_ref: |
| self.definitions[defs_ref] = json_schema |
| self._core_defs_invalid_for_json_schema.pop(defs_ref, None) |
| json_schema = ref_json_schema |
| return json_schema |
|
|
| def handler_func(schema_or_field: CoreSchemaOrField) -> JsonSchemaValue: |
| """Generate a JSON schema based on the input schema. |
| |
| Args: |
| schema_or_field: The core schema to generate a JSON schema from. |
| |
| Returns: |
| The generated JSON schema. |
| |
| Raises: |
| TypeError: If an unexpected schema type is encountered. |
| """ |
| |
| json_schema: JsonSchemaValue | None = None |
| if self.mode == 'serialization' and 'serialization' in schema_or_field: |
| |
| |
| |
| ser_schema = schema_or_field['serialization'] |
| json_schema = self.ser_schema(ser_schema) |
|
|
| |
| |
| if ( |
| json_schema is not None |
| and ser_schema.get('when_used') in ('unless-none', 'json-unless-none') |
| and schema_or_field['type'] == 'nullable' |
| ): |
| json_schema = self.get_union_of_schemas([{'type': 'null'}, json_schema]) |
| if json_schema is None: |
| if _core_utils.is_core_schema(schema_or_field) or _core_utils.is_core_schema_field(schema_or_field): |
| generate_for_schema_type = self._schema_type_to_method[schema_or_field['type']] |
| json_schema = generate_for_schema_type(schema_or_field) |
| else: |
| raise TypeError(f'Unexpected schema type: schema={schema_or_field}') |
| return json_schema |
|
|
| current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, handler_func) |
|
|
| metadata = cast(_core_metadata.CoreMetadata, schema.get('metadata', {})) |
|
|
| |
|
|
| if js_updates := metadata.get('pydantic_js_updates'): |
|
|
| def js_updates_handler_func( |
| schema_or_field: CoreSchemaOrField, |
| current_handler: GetJsonSchemaHandler = current_handler, |
| ) -> JsonSchemaValue: |
| json_schema = {**current_handler(schema_or_field), **js_updates} |
| return json_schema |
|
|
| current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, js_updates_handler_func) |
|
|
| if js_extra := metadata.get('pydantic_js_extra'): |
|
|
| def js_extra_handler_func( |
| schema_or_field: CoreSchemaOrField, |
| current_handler: GetJsonSchemaHandler = current_handler, |
| ) -> JsonSchemaValue: |
| json_schema = current_handler(schema_or_field) |
| if isinstance(js_extra, dict): |
| json_schema.update(to_jsonable_python(js_extra)) |
| elif callable(js_extra): |
| |
| js_extra(json_schema) |
| return json_schema |
|
|
| current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, js_extra_handler_func) |
|
|
| for js_modify_function in metadata.get('pydantic_js_functions', ()): |
|
|
| def new_handler_func( |
| schema_or_field: CoreSchemaOrField, |
| current_handler: GetJsonSchemaHandler = current_handler, |
| js_modify_function: GetJsonSchemaFunction = js_modify_function, |
| ) -> JsonSchemaValue: |
| json_schema = js_modify_function(schema_or_field, current_handler) |
| if _core_utils.is_core_schema(schema_or_field): |
| json_schema = populate_defs(schema_or_field, json_schema) |
| original_schema = current_handler.resolve_ref_schema(json_schema) |
| ref = json_schema.pop('$ref', None) |
| if ref and json_schema: |
| original_schema.update(json_schema) |
| return original_schema |
|
|
| current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, new_handler_func) |
|
|
| for js_modify_function in metadata.get('pydantic_js_annotation_functions', ()): |
|
|
| def new_handler_func( |
| schema_or_field: CoreSchemaOrField, |
| current_handler: GetJsonSchemaHandler = current_handler, |
| js_modify_function: GetJsonSchemaFunction = js_modify_function, |
| ) -> JsonSchemaValue: |
| return js_modify_function(schema_or_field, current_handler) |
|
|
| current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, new_handler_func) |
|
|
| json_schema = current_handler(schema) |
| if _core_utils.is_core_schema(schema): |
| json_schema = populate_defs(schema, json_schema) |
| return json_schema |
|
|
| def sort(self, value: JsonSchemaValue, parent_key: str | None = None) -> JsonSchemaValue: |
| """Override this method to customize the sorting of the JSON schema (e.g., don't sort at all, sort all keys unconditionally, etc.) |
| |
| By default, alphabetically sort the keys in the JSON schema, skipping the 'properties' and 'default' keys to preserve field definition order. |
| This sort is recursive, so it will sort all nested dictionaries as well. |
| """ |
| sorted_dict: dict[str, JsonSchemaValue] = {} |
| keys = value.keys() |
| if parent_key not in ('properties', 'default'): |
| keys = sorted(keys) |
| for key in keys: |
| sorted_dict[key] = self._sort_recursive(value[key], parent_key=key) |
| return sorted_dict |
|
|
| def _sort_recursive(self, value: Any, parent_key: str | None = None) -> Any: |
| """Recursively sort a JSON schema value.""" |
| if isinstance(value, dict): |
| sorted_dict: dict[str, JsonSchemaValue] = {} |
| keys = value.keys() |
| if parent_key not in ('properties', 'default'): |
| keys = sorted(keys) |
| for key in keys: |
| sorted_dict[key] = self._sort_recursive(value[key], parent_key=key) |
| return sorted_dict |
| elif isinstance(value, list): |
| sorted_list: list[JsonSchemaValue] = [self._sort_recursive(item, parent_key) for item in value] |
| return sorted_list |
| else: |
| return value |
|
|
| |
|
|
| def invalid_schema(self, schema: core_schema.InvalidSchema) -> JsonSchemaValue: |
| """Placeholder - should never be called.""" |
|
|
| raise RuntimeError('Cannot generate schema for invalid_schema. This is a bug! Please report it.') |
|
|
| def any_schema(self, schema: core_schema.AnySchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches any value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {} |
|
|
| def none_schema(self, schema: core_schema.NoneSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches `None`. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'null'} |
|
|
| def bool_schema(self, schema: core_schema.BoolSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a bool value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'boolean'} |
|
|
| def int_schema(self, schema: core_schema.IntSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches an int value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema: dict[str, Any] = {'type': 'integer'} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.numeric) |
| json_schema = {k: v for k, v in json_schema.items() if v not in {math.inf, -math.inf}} |
| return json_schema |
|
|
| def float_schema(self, schema: core_schema.FloatSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a float value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema: dict[str, Any] = {'type': 'number'} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.numeric) |
| json_schema = {k: v for k, v in json_schema.items() if v not in {math.inf, -math.inf}} |
| return json_schema |
|
|
| def decimal_schema(self, schema: core_schema.DecimalSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a decimal value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
|
|
| def get_decimal_pattern(schema: core_schema.DecimalSchema) -> str: |
| max_digits = schema.get('max_digits') |
| decimal_places = schema.get('decimal_places') |
|
|
| pattern = ( |
| r'^(?!^[-+.]*$)[+-]?0*' |
| ) |
|
|
| |
| if max_digits is not None and decimal_places is not None: |
| integer_places = max(0, max_digits - decimal_places) |
| pattern += ( |
| rf'(?:' |
| rf'\d{{0,{integer_places}}}' |
| rf'|' |
| rf'(?=[\d.]{{1,{max_digits + 1}}}0*$)' |
| rf'\d{{0,{integer_places}}}\.\d{{0,{decimal_places}}}0*$' |
| rf')' |
| ) |
|
|
| |
| elif max_digits is not None and decimal_places is None: |
| pattern += ( |
| rf'(?:' |
| rf'\d{{0,{max_digits}}}' |
| rf'|' |
| rf'(?=[\d.]{{1,{max_digits + 1}}}0*$)' |
| rf'\d*\.\d*0*$' |
| rf')' |
| ) |
|
|
| |
| elif max_digits is None and decimal_places is not None: |
| pattern += rf'\d*\.?\d{{0,{decimal_places}}}0*$' |
|
|
| |
| else: |
| pattern += r'\d*\.?\d*$' |
|
|
| return pattern |
|
|
| json_schema = self.str_schema(core_schema.str_schema(pattern=get_decimal_pattern(schema))) |
| if self.mode == 'validation': |
| multiple_of = schema.get('multiple_of') |
| le = schema.get('le') |
| ge = schema.get('ge') |
| lt = schema.get('lt') |
| gt = schema.get('gt') |
| json_schema = { |
| 'anyOf': [ |
| self.float_schema( |
| core_schema.float_schema( |
| allow_inf_nan=schema.get('allow_inf_nan'), |
| multiple_of=None if multiple_of is None else float(multiple_of), |
| le=None if le is None else float(le), |
| ge=None if ge is None else float(ge), |
| lt=None if lt is None else float(lt), |
| gt=None if gt is None else float(gt), |
| ) |
| ), |
| json_schema, |
| ], |
| } |
| return json_schema |
|
|
| def str_schema(self, schema: core_schema.StringSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a string value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema = {'type': 'string'} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.string) |
| if isinstance(json_schema.get('pattern'), Pattern): |
| |
| json_schema['pattern'] = json_schema.get('pattern').pattern |
| return json_schema |
|
|
| def bytes_schema(self, schema: core_schema.BytesSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a bytes value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema = {'type': 'string', 'format': 'base64url' if self._config.ser_json_bytes == 'base64' else 'binary'} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.bytes) |
| return json_schema |
|
|
| def date_schema(self, schema: core_schema.DateSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a date value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'string', 'format': 'date'} |
|
|
| def time_schema(self, schema: core_schema.TimeSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a time value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'string', 'format': 'time'} |
|
|
| def datetime_schema(self, schema: core_schema.DatetimeSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a datetime value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'string', 'format': 'date-time'} |
|
|
| def timedelta_schema(self, schema: core_schema.TimedeltaSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a timedelta value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| if self._config.ser_json_timedelta == 'float': |
| return {'type': 'number'} |
| return {'type': 'string', 'format': 'duration'} |
|
|
| def literal_schema(self, schema: core_schema.LiteralSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a literal value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| expected = [to_jsonable_python(v.value if isinstance(v, Enum) else v) for v in schema['expected']] |
|
|
| result: dict[str, Any] = {} |
| if len(expected) == 1: |
| result['const'] = expected[0] |
| else: |
| result['enum'] = expected |
|
|
| types = {type(e) for e in expected} |
| if types == {str}: |
| result['type'] = 'string' |
| elif types == {int}: |
| result['type'] = 'integer' |
| elif types == {float}: |
| result['type'] = 'number' |
| elif types == {bool}: |
| result['type'] = 'boolean' |
| elif types == {list}: |
| result['type'] = 'array' |
| elif types == {type(None)}: |
| result['type'] = 'null' |
| return result |
|
|
| def missing_sentinel_schema(self, schema: core_schema.MissingSentinelSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches the `MISSING` sentinel value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| raise PydanticOmit |
|
|
| def enum_schema(self, schema: core_schema.EnumSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches an Enum value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| enum_type = schema['cls'] |
| description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__) |
| if ( |
| description == 'An enumeration.' |
| ): |
| description = None |
| result: dict[str, Any] = {'title': enum_type.__name__, 'description': description} |
| result = {k: v for k, v in result.items() if v is not None} |
|
|
| expected = [to_jsonable_python(v.value) for v in schema['members']] |
|
|
| result['enum'] = expected |
|
|
| types = {type(e) for e in expected} |
| if isinstance(enum_type, str) or types == {str}: |
| result['type'] = 'string' |
| elif isinstance(enum_type, int) or types == {int}: |
| result['type'] = 'integer' |
| elif isinstance(enum_type, float) or types == {float}: |
| result['type'] = 'number' |
| elif types == {bool}: |
| result['type'] = 'boolean' |
| elif types == {list}: |
| result['type'] = 'array' |
|
|
| return result |
|
|
| def is_instance_schema(self, schema: core_schema.IsInstanceSchema) -> JsonSchemaValue: |
| """Handles JSON schema generation for a core schema that checks if a value is an instance of a class. |
| |
| Unless overridden in a subclass, this raises an error. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.handle_invalid_for_json_schema(schema, f'core_schema.IsInstanceSchema ({schema["cls"]})') |
|
|
| def is_subclass_schema(self, schema: core_schema.IsSubclassSchema) -> JsonSchemaValue: |
| """Handles JSON schema generation for a core schema that checks if a value is a subclass of a class. |
| |
| For backwards compatibility with v1, this does not raise an error, but can be overridden to change this. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| |
| return {} |
|
|
| def callable_schema(self, schema: core_schema.CallableSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a callable value. |
| |
| Unless overridden in a subclass, this raises an error. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.handle_invalid_for_json_schema(schema, 'core_schema.CallableSchema') |
|
|
| def list_schema(self, schema: core_schema.ListSchema) -> JsonSchemaValue: |
| """Returns a schema that matches a list schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema']) |
| json_schema = {'type': 'array', 'items': items_schema} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) |
| return json_schema |
|
|
| @deprecated('`tuple_positional_schema` is deprecated. Use `tuple_schema` instead.', category=None) |
| @final |
| def tuple_positional_schema(self, schema: core_schema.TupleSchema) -> JsonSchemaValue: |
| """Replaced by `tuple_schema`.""" |
| warnings.warn( |
| '`tuple_positional_schema` is deprecated. Use `tuple_schema` instead.', |
| PydanticDeprecatedSince26, |
| stacklevel=2, |
| ) |
| return self.tuple_schema(schema) |
|
|
| @deprecated('`tuple_variable_schema` is deprecated. Use `tuple_schema` instead.', category=None) |
| @final |
| def tuple_variable_schema(self, schema: core_schema.TupleSchema) -> JsonSchemaValue: |
| """Replaced by `tuple_schema`.""" |
| warnings.warn( |
| '`tuple_variable_schema` is deprecated. Use `tuple_schema` instead.', |
| PydanticDeprecatedSince26, |
| stacklevel=2, |
| ) |
| return self.tuple_schema(schema) |
|
|
| def tuple_schema(self, schema: core_schema.TupleSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a tuple schema e.g. `tuple[int, |
| str, bool]` or `tuple[int, ...]`. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema: JsonSchemaValue = {'type': 'array'} |
| if 'variadic_item_index' in schema: |
| variadic_item_index = schema['variadic_item_index'] |
| if variadic_item_index > 0: |
| json_schema['minItems'] = variadic_item_index |
| json_schema['prefixItems'] = [ |
| self.generate_inner(item) for item in schema['items_schema'][:variadic_item_index] |
| ] |
| if variadic_item_index + 1 == len(schema['items_schema']): |
| |
| json_schema['items'] = self.generate_inner(schema['items_schema'][variadic_item_index]) |
| else: |
| |
| |
| |
| json_schema['items'] = True |
| else: |
| prefixItems = [self.generate_inner(item) for item in schema['items_schema']] |
| if prefixItems: |
| json_schema['prefixItems'] = prefixItems |
| json_schema['minItems'] = len(prefixItems) |
| json_schema['maxItems'] = len(prefixItems) |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) |
| return json_schema |
|
|
| def set_schema(self, schema: core_schema.SetSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a set schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self._common_set_schema(schema) |
|
|
| def frozenset_schema(self, schema: core_schema.FrozenSetSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a frozenset schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self._common_set_schema(schema) |
|
|
| def _common_set_schema(self, schema: core_schema.SetSchema | core_schema.FrozenSetSchema) -> JsonSchemaValue: |
| items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema']) |
| json_schema = {'type': 'array', 'uniqueItems': True, 'items': items_schema} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) |
| return json_schema |
|
|
| def generator_schema(self, schema: core_schema.GeneratorSchema) -> JsonSchemaValue: |
| """Returns a JSON schema that represents the provided GeneratorSchema. |
| |
| Args: |
| schema: The schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema']) |
| json_schema = {'type': 'array', 'items': items_schema} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.array) |
| return json_schema |
|
|
| def dict_schema(self, schema: core_schema.DictSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a dict schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema: JsonSchemaValue = {'type': 'object'} |
|
|
| keys_schema = self.generate_inner(schema['keys_schema']).copy() if 'keys_schema' in schema else {} |
| if '$ref' not in keys_schema: |
| keys_pattern = keys_schema.pop('pattern', None) |
| |
| keys_schema.pop('title', None) |
| else: |
| |
| |
| |
| |
| |
| keys_pattern = None |
|
|
| values_schema = self.generate_inner(schema['values_schema']).copy() if 'values_schema' in schema else {} |
| |
| values_schema.pop('title', None) |
|
|
| if values_schema or keys_pattern is not None: |
| if keys_pattern is None: |
| json_schema['additionalProperties'] = values_schema |
| else: |
| json_schema['patternProperties'] = {keys_pattern: values_schema} |
| else: |
| json_schema['additionalProperties'] = True |
|
|
| if ( |
| |
| (keys_schema.get('type') == 'string' and len(keys_schema) > 1) |
| |
| or '$ref' in keys_schema |
| ): |
| keys_schema.pop('type', None) |
| json_schema['propertyNames'] = keys_schema |
|
|
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.object) |
| return json_schema |
|
|
| def function_before_schema(self, schema: core_schema.BeforeValidatorFunctionSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a function-before schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| if self.mode == 'validation' and (input_schema := schema.get('json_schema_input_schema')): |
| return self.generate_inner(input_schema) |
|
|
| return self.generate_inner(schema['schema']) |
|
|
| def function_after_schema(self, schema: core_schema.AfterValidatorFunctionSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a function-after schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['schema']) |
|
|
| def function_plain_schema(self, schema: core_schema.PlainValidatorFunctionSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a function-plain schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| if self.mode == 'validation' and (input_schema := schema.get('json_schema_input_schema')): |
| return self.generate_inner(input_schema) |
|
|
| return self.handle_invalid_for_json_schema( |
| schema, f'core_schema.PlainValidatorFunctionSchema ({schema["function"]})' |
| ) |
|
|
| def function_wrap_schema(self, schema: core_schema.WrapValidatorFunctionSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a function-wrap schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| if self.mode == 'validation' and (input_schema := schema.get('json_schema_input_schema')): |
| return self.generate_inner(input_schema) |
|
|
| return self.generate_inner(schema['schema']) |
|
|
| def default_schema(self, schema: core_schema.WithDefaultSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema with a default value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema = self.generate_inner(schema['schema']) |
|
|
| default = self.get_default_value(schema) |
| if default is NoDefault or default is MISSING: |
| return json_schema |
|
|
| |
| |
| |
| if self.mode == 'serialization': |
| |
| ser_schema = _get_ser_schema_for_default_value(schema['schema']) |
| if ( |
| ser_schema is not None |
| and (ser_func := ser_schema.get('function')) |
| and not (default is None and ser_schema.get('when_used') in ('unless-none', 'json-unless-none')) |
| ): |
| try: |
| default = ser_func(default) |
| except Exception: |
| |
| |
| |
| |
| |
| |
| self.emit_warning( |
| 'non-serializable-default', |
| f'Unable to serialize value {default!r} with the plain serializer; excluding default from JSON schema', |
| ) |
| return json_schema |
|
|
| try: |
| encoded_default = self.encode_default(default) |
| except pydantic_core.PydanticSerializationError: |
| self.emit_warning( |
| 'non-serializable-default', |
| f'Default value {default} is not JSON serializable; excluding default from JSON schema', |
| ) |
| |
| return json_schema |
|
|
| json_schema['default'] = encoded_default |
| return json_schema |
|
|
| def get_default_value(self, schema: core_schema.WithDefaultSchema) -> Any: |
| """Get the default value to be used when generating a JSON Schema for a core schema with a default. |
| |
| The default implementation is to use the statically defined default value. This method can be overridden |
| if you want to make use of the default factory. |
| |
| Args: |
| schema: The `'with-default'` core schema. |
| |
| Returns: |
| The default value to use, or [`NoDefault`][pydantic.json_schema.NoDefault] if no default |
| value is available. |
| """ |
| return schema.get('default', NoDefault) |
|
|
| def nullable_schema(self, schema: core_schema.NullableSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that allows null values. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| null_schema = {'type': 'null'} |
| inner_json_schema = self.generate_inner(schema['schema']) |
|
|
| if inner_json_schema == null_schema: |
| return null_schema |
| else: |
| return self.get_union_of_schemas([inner_json_schema, null_schema]) |
|
|
| def union_schema(self, schema: core_schema.UnionSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that allows values matching any of the given schemas. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| generated: list[JsonSchemaValue] = [] |
|
|
| choices = schema['choices'] |
| for choice in choices: |
| |
| choice_schema = choice[0] if isinstance(choice, tuple) else choice |
| try: |
| generated.append(self.generate_inner(choice_schema)) |
| except PydanticOmit: |
| continue |
| except PydanticInvalidForJsonSchema as exc: |
| self.emit_warning('skipped-choice', exc.message) |
| if len(generated) == 1: |
| return generated[0] |
| return self.get_union_of_schemas(generated) |
|
|
| def get_union_of_schemas(self, schemas: list[JsonSchemaValue]) -> JsonSchemaValue: |
| """Returns the JSON Schema representation for the union of the provided JSON Schemas. |
| |
| The result depends on the configured `'union_format'`. |
| |
| Args: |
| schemas: The list of JSON Schemas to be included in the union. |
| |
| Returns: |
| The JSON Schema representing the union of schemas. |
| """ |
| if self.union_format == 'primitive_type_array': |
| types: list[str] = [] |
| for schema in schemas: |
| schema_types: list[str] | str | None = schema.get('type') |
| if schema_types is None: |
| |
| break |
| if not isinstance(schema_types, list): |
| schema_types = [schema_types] |
| if not all(t in _PRIMITIVE_JSON_SCHEMA_TYPES for t in schema_types): |
| break |
| if len(schema) != 1: |
| |
| |
| |
| |
| break |
|
|
| types.extend(schema_types) |
| else: |
| |
| return {'type': list(dict.fromkeys(types))} |
|
|
| return self.get_flattened_anyof(schemas) |
|
|
| def tagged_union_schema(self, schema: core_schema.TaggedUnionSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that allows values matching any of the given schemas, where |
| the schemas are tagged with a discriminator field that indicates which schema should be used to validate |
| the value. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| generated: dict[str, JsonSchemaValue] = {} |
| for k, v in schema['choices'].items(): |
| if isinstance(k, Enum): |
| k = k.value |
| try: |
| |
| |
| generated[str(k)] = self.generate_inner(v).copy() |
| except PydanticOmit: |
| continue |
| except PydanticInvalidForJsonSchema as exc: |
| self.emit_warning('skipped-choice', exc.message) |
|
|
| one_of_choices = _deduplicate_schemas(generated.values()) |
| json_schema: JsonSchemaValue = {'oneOf': one_of_choices} |
|
|
| |
| openapi_discriminator = self._extract_discriminator(schema, one_of_choices) |
| if openapi_discriminator is not None: |
| json_schema['discriminator'] = { |
| 'propertyName': openapi_discriminator, |
| 'mapping': {k: v.get('$ref', v) for k, v in generated.items()}, |
| } |
|
|
| return json_schema |
|
|
| def _extract_discriminator( |
| self, schema: core_schema.TaggedUnionSchema, one_of_choices: list[JsonDict] |
| ) -> str | None: |
| """Extract a compatible OpenAPI discriminator from the schema and one_of choices that end up in the final |
| schema.""" |
| openapi_discriminator: str | None = None |
|
|
| if isinstance(schema['discriminator'], str): |
| return schema['discriminator'] |
|
|
| if isinstance(schema['discriminator'], list): |
| |
| if len(schema['discriminator']) == 1 and isinstance(schema['discriminator'][0], str): |
| return schema['discriminator'][0] |
| |
| |
| |
| |
| for alias_path in schema['discriminator']: |
| if not isinstance(alias_path, list): |
| break |
| if len(alias_path) != 1: |
| continue |
| alias = alias_path[0] |
| if not isinstance(alias, str): |
| continue |
| alias_is_present_on_all_choices = True |
| for choice in one_of_choices: |
| try: |
| choice = self.resolve_ref_schema(choice) |
| except RuntimeError as exc: |
| |
| |
| |
| self.emit_warning('skipped-discriminator', str(exc)) |
| choice = {} |
| properties = choice.get('properties', {}) |
| if not isinstance(properties, dict) or alias not in properties: |
| alias_is_present_on_all_choices = False |
| break |
| if alias_is_present_on_all_choices: |
| openapi_discriminator = alias |
| break |
| return openapi_discriminator |
|
|
| def chain_schema(self, schema: core_schema.ChainSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a core_schema.ChainSchema. |
| |
| When generating a schema for validation, we return the validation JSON schema for the first step in the chain. |
| For serialization, we return the serialization JSON schema for the last step in the chain. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| step_index = 0 if self.mode == 'validation' else -1 |
| return self.generate_inner(schema['steps'][step_index]) |
|
|
| def lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that allows values matching either the lax schema or the |
| strict schema. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| |
| use_strict = schema.get('strict', False) |
| |
| |
| if use_strict: |
| return self.generate_inner(schema['strict_schema']) |
| else: |
| return self.generate_inner(schema['lax_schema']) |
|
|
| def json_or_python_schema(self, schema: core_schema.JsonOrPythonSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that allows values matching either the JSON schema or the |
| Python schema. |
| |
| The JSON schema is used instead of the Python schema. If you want to use the Python schema, you should override |
| this method. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['json_schema']) |
|
|
| def typed_dict_schema(self, schema: core_schema.TypedDictSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a typed dict. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| total = schema.get('total', True) |
| named_required_fields: list[tuple[str, bool, CoreSchemaField]] = [ |
| (name, self.field_is_required(field, total), field) |
| for name, field in schema['fields'].items() |
| if self.field_is_present(field) |
| ] |
| if self.mode == 'serialization': |
| named_required_fields.extend(self._name_required_computed_fields(schema.get('computed_fields', []))) |
| cls = schema.get('cls') |
| config = _get_typed_dict_config(cls) |
| with self._config_wrapper_stack.push(config): |
| json_schema = self._named_required_fields_schema(named_required_fields) |
|
|
| |
| |
| |
| |
| |
| |
| if schema.get('extra_behavior') == 'forbid': |
| json_schema['additionalProperties'] = False |
| elif schema.get('extra_behavior') == 'allow': |
| if 'extras_schema' in schema and schema['extras_schema'] != {'type': 'any'}: |
| json_schema['additionalProperties'] = self.generate_inner(schema['extras_schema']) |
| else: |
| json_schema['additionalProperties'] = True |
|
|
| if cls is not None: |
| |
| |
| self._update_class_schema(json_schema, cls, config) |
| elif 'additionalProperties' not in json_schema: |
| extra = schema.get('config', {}).get('extra_fields_behavior') |
| if extra == 'forbid': |
| json_schema['additionalProperties'] = False |
| elif extra == 'allow': |
| json_schema['additionalProperties'] = True |
|
|
| return json_schema |
|
|
| @staticmethod |
| def _name_required_computed_fields( |
| computed_fields: list[ComputedField], |
| ) -> list[tuple[str, bool, core_schema.ComputedField]]: |
| return [(field['property_name'], True, field) for field in computed_fields] |
|
|
| def _named_required_fields_schema( |
| self, named_required_fields: Sequence[tuple[str, bool, CoreSchemaField]] |
| ) -> JsonSchemaValue: |
| properties: dict[str, JsonSchemaValue] = {} |
| required_fields: list[str] = [] |
| for name, required, field in named_required_fields: |
| if self.by_alias: |
| name = self._get_alias_name(field, name) |
| try: |
| field_json_schema = self.generate_inner(field).copy() |
| except PydanticOmit: |
| continue |
| if 'title' not in field_json_schema and self.field_title_should_be_set(field): |
| title = self.get_title_from_name(name) |
| field_json_schema['title'] = title |
| field_json_schema = self.handle_ref_overrides(field_json_schema) |
| properties[name] = field_json_schema |
| if required: |
| required_fields.append(name) |
|
|
| json_schema = {'type': 'object', 'properties': properties} |
| if required_fields: |
| json_schema['required'] = required_fields |
| return json_schema |
|
|
| def _get_alias_name(self, field: CoreSchemaField, name: str) -> str: |
| if field['type'] == 'computed-field': |
| alias: Any = field.get('alias', name) |
| elif self.mode == 'validation': |
| alias = field.get('validation_alias', name) |
| else: |
| alias = field.get('serialization_alias', name) |
| if isinstance(alias, str): |
| name = alias |
| elif isinstance(alias, list): |
| alias = cast('list[str] | str', alias) |
| for path in alias: |
| if isinstance(path, list) and len(path) == 1 and isinstance(path[0], str): |
| |
| |
| name = path[0] |
| break |
| else: |
| assert_never(alias) |
| return name |
|
|
| def typed_dict_field_schema(self, schema: core_schema.TypedDictField) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a typed dict field. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['schema']) |
|
|
| def dataclass_field_schema(self, schema: core_schema.DataclassField) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a dataclass field. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['schema']) |
|
|
| def model_field_schema(self, schema: core_schema.ModelField) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a model field. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['schema']) |
|
|
| def computed_field_schema(self, schema: core_schema.ComputedField) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a computed field. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['return_schema']) |
|
|
| def model_schema(self, schema: core_schema.ModelSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a model. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| |
| |
| cls = cast('type[BaseModel]', schema['cls']) |
| config = cls.model_config |
|
|
| with self._config_wrapper_stack.push(config): |
| json_schema = self.generate_inner(schema['schema']) |
|
|
| self._update_class_schema(json_schema, cls, config) |
|
|
| return json_schema |
|
|
| def _update_class_schema(self, json_schema: JsonSchemaValue, cls: type[Any], config: ConfigDict) -> None: |
| """Update json_schema with the following, extracted from `config` and `cls`: |
| |
| * title |
| * description |
| * additional properties |
| * json_schema_extra |
| * deprecated |
| |
| Done in place, hence there's no return value as the original json_schema is mutated. |
| No ref resolving is involved here, as that's not appropriate for simple updates. |
| """ |
| from .main import BaseModel |
| from .root_model import RootModel |
|
|
| if (config_title := config.get('title')) is not None: |
| json_schema.setdefault('title', config_title) |
| elif model_title_generator := config.get('model_title_generator'): |
| title = model_title_generator(cls) |
| if not isinstance(title, str): |
| raise TypeError(f'model_title_generator {model_title_generator} must return str, not {title.__class__}') |
| json_schema.setdefault('title', title) |
| if 'title' not in json_schema: |
| json_schema['title'] = cls.__name__ |
|
|
| |
| docstring = None if cls is BaseModel or dataclasses.is_dataclass(cls) else cls.__doc__ |
|
|
| if docstring: |
| json_schema.setdefault('description', inspect.cleandoc(docstring)) |
| elif issubclass(cls, RootModel) and (root_description := cls.__pydantic_fields__['root'].description): |
| json_schema.setdefault('description', root_description) |
|
|
| extra = config.get('extra') |
| if 'additionalProperties' not in json_schema: |
| if extra == 'allow': |
| json_schema['additionalProperties'] = True |
| elif extra == 'forbid': |
| json_schema['additionalProperties'] = False |
|
|
| json_schema_extra = config.get('json_schema_extra') |
| if issubclass(cls, BaseModel) and cls.__pydantic_root_model__: |
| root_json_schema_extra = cls.model_fields['root'].json_schema_extra |
| if json_schema_extra and root_json_schema_extra: |
| raise ValueError( |
| '"model_config[\'json_schema_extra\']" and "Field.json_schema_extra" on "RootModel.root"' |
| ' field must not be set simultaneously' |
| ) |
| if root_json_schema_extra: |
| json_schema_extra = root_json_schema_extra |
|
|
| if isinstance(json_schema_extra, (staticmethod, classmethod)): |
| |
| json_schema_extra = json_schema_extra.__get__(cls) |
|
|
| if isinstance(json_schema_extra, dict): |
| json_schema.update(json_schema_extra) |
| elif callable(json_schema_extra): |
| |
| if len(inspect.signature(json_schema_extra).parameters) > 1: |
| json_schema_extra(json_schema, cls) |
| else: |
| json_schema_extra(json_schema) |
| elif json_schema_extra is not None: |
| raise ValueError( |
| f"model_config['json_schema_extra']={json_schema_extra} should be a dict, callable, or None" |
| ) |
|
|
| if hasattr(cls, '__deprecated__'): |
| json_schema['deprecated'] = True |
|
|
| def resolve_ref_schema(self, json_schema: JsonSchemaValue) -> JsonSchemaValue: |
| """Resolve a JsonSchemaValue to the non-ref schema if it is a $ref schema. |
| |
| Args: |
| json_schema: The schema to resolve. |
| |
| Returns: |
| The resolved schema. |
| |
| Raises: |
| RuntimeError: If the schema reference can't be found in definitions. |
| """ |
| while '$ref' in json_schema: |
| ref = json_schema['$ref'] |
| schema_to_update = self.get_schema_from_definitions(JsonRef(ref)) |
| if schema_to_update is None: |
| raise RuntimeError(f'Cannot update undefined schema for $ref={ref}') |
| json_schema = schema_to_update |
| return json_schema |
|
|
| def model_fields_schema(self, schema: core_schema.ModelFieldsSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a model's fields. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| named_required_fields: list[tuple[str, bool, CoreSchemaField]] = [ |
| (name, self.field_is_required(field, total=True), field) |
| for name, field in schema['fields'].items() |
| if self.field_is_present(field) |
| ] |
| if self.mode == 'serialization': |
| named_required_fields.extend(self._name_required_computed_fields(schema.get('computed_fields', []))) |
| json_schema = self._named_required_fields_schema(named_required_fields) |
| extras_schema = schema.get('extras_schema', None) |
| if extras_schema is not None: |
| schema_to_update = self.resolve_ref_schema(json_schema) |
| schema_to_update['additionalProperties'] = self.generate_inner(extras_schema) |
| return json_schema |
|
|
| def field_is_present(self, field: CoreSchemaField) -> bool: |
| """Whether the field should be included in the generated JSON schema. |
| |
| Args: |
| field: The schema for the field itself. |
| |
| Returns: |
| `True` if the field should be included in the generated JSON schema, `False` otherwise. |
| """ |
| if self.mode == 'serialization': |
| |
| |
| return not field.get('serialization_exclude') |
| elif self.mode == 'validation': |
| return True |
| else: |
| assert_never(self.mode) |
|
|
| def field_is_required( |
| self, |
| field: core_schema.ModelField | core_schema.DataclassField | core_schema.TypedDictField, |
| total: bool, |
| ) -> bool: |
| """Whether the field should be marked as required in the generated JSON schema. |
| (Note that this is irrelevant if the field is not present in the JSON schema.). |
| |
| Args: |
| field: The schema for the field itself. |
| total: Only applies to `TypedDictField`s. |
| Indicates if the `TypedDict` this field belongs to is total, in which case any fields that don't |
| explicitly specify `required=False` are required. |
| |
| Returns: |
| `True` if the field should be marked as required in the generated JSON schema, `False` otherwise. |
| """ |
| if field['type'] == 'typed-dict-field': |
| required = field.get('required', total) |
| else: |
| required = field['schema']['type'] != 'default' |
|
|
| if self.mode == 'serialization': |
| has_exclude_if = field.get('serialization_exclude_if') is not None |
| if self._config.json_schema_serialization_defaults_required: |
| return not has_exclude_if |
| else: |
| return required and not has_exclude_if |
| else: |
| return required |
|
|
| def dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a dataclass's constructor arguments. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| named_required_fields: list[tuple[str, bool, CoreSchemaField]] = [ |
| (field['name'], self.field_is_required(field, total=True), field) |
| for field in schema['fields'] |
| if self.field_is_present(field) |
| ] |
| if self.mode == 'serialization': |
| named_required_fields.extend(self._name_required_computed_fields(schema.get('computed_fields', []))) |
| return self._named_required_fields_schema(named_required_fields) |
|
|
| def dataclass_schema(self, schema: core_schema.DataclassSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a dataclass. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| from ._internal._dataclasses import is_stdlib_dataclass |
|
|
| cls = schema['cls'] |
| config: ConfigDict = getattr(cls, '__pydantic_config__', cast('ConfigDict', {})) |
|
|
| with self._config_wrapper_stack.push(config): |
| json_schema = self.generate_inner(schema['schema']).copy() |
|
|
| self._update_class_schema(json_schema, cls, config) |
|
|
| |
| if is_stdlib_dataclass(cls): |
| |
| description = None |
| else: |
| description = None if cls.__doc__ is None else inspect.cleandoc(cls.__doc__) |
| if description: |
| json_schema['description'] = description |
|
|
| return json_schema |
|
|
| def arguments_schema(self, schema: core_schema.ArgumentsSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a function's arguments. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| prefer_positional = schema.get('metadata', {}).get('pydantic_js_prefer_positional_arguments') |
|
|
| arguments = schema['arguments_schema'] |
| kw_only_arguments = [a for a in arguments if a.get('mode') == 'keyword_only'] |
| kw_or_p_arguments = [a for a in arguments if a.get('mode') in {'positional_or_keyword', None}] |
| p_only_arguments = [a for a in arguments if a.get('mode') == 'positional_only'] |
| var_args_schema = schema.get('var_args_schema') |
| var_kwargs_schema = schema.get('var_kwargs_schema') |
|
|
| if prefer_positional: |
| positional_possible = not kw_only_arguments and not var_kwargs_schema |
| if positional_possible: |
| return self.p_arguments_schema(p_only_arguments + kw_or_p_arguments, var_args_schema) |
|
|
| keyword_possible = not p_only_arguments and not var_args_schema |
| if keyword_possible: |
| return self.kw_arguments_schema(kw_or_p_arguments + kw_only_arguments, var_kwargs_schema) |
|
|
| if not prefer_positional: |
| positional_possible = not kw_only_arguments and not var_kwargs_schema |
| if positional_possible: |
| return self.p_arguments_schema(p_only_arguments + kw_or_p_arguments, var_args_schema) |
|
|
| raise PydanticInvalidForJsonSchema( |
| 'Unable to generate JSON schema for arguments validator with positional-only and keyword-only arguments' |
| ) |
|
|
| def kw_arguments_schema( |
| self, arguments: list[core_schema.ArgumentsParameter], var_kwargs_schema: CoreSchema | None |
| ) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a function's keyword arguments. |
| |
| Args: |
| arguments: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| properties: dict[str, JsonSchemaValue] = {} |
| required: list[str] = [] |
| for argument in arguments: |
| name = self.get_argument_name(argument) |
| argument_schema = self.generate_inner(argument['schema']).copy() |
| if 'title' not in argument_schema and self.field_title_should_be_set(argument['schema']): |
| argument_schema['title'] = self.get_title_from_name(name) |
| properties[name] = argument_schema |
|
|
| if argument['schema']['type'] != 'default': |
| |
| |
| |
| required.append(name) |
|
|
| json_schema: JsonSchemaValue = {'type': 'object', 'properties': properties} |
| if required: |
| json_schema['required'] = required |
|
|
| if var_kwargs_schema: |
| additional_properties_schema = self.generate_inner(var_kwargs_schema) |
| if additional_properties_schema: |
| json_schema['additionalProperties'] = additional_properties_schema |
| else: |
| json_schema['additionalProperties'] = False |
| return json_schema |
|
|
| def p_arguments_schema( |
| self, arguments: list[core_schema.ArgumentsParameter], var_args_schema: CoreSchema | None |
| ) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a function's positional arguments. |
| |
| Args: |
| arguments: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| prefix_items: list[JsonSchemaValue] = [] |
| min_items = 0 |
|
|
| for argument in arguments: |
| name = self.get_argument_name(argument) |
|
|
| argument_schema = self.generate_inner(argument['schema']).copy() |
| if 'title' not in argument_schema and self.field_title_should_be_set(argument['schema']): |
| argument_schema['title'] = self.get_title_from_name(name) |
| prefix_items.append(argument_schema) |
|
|
| if argument['schema']['type'] != 'default': |
| |
| |
| |
| min_items += 1 |
|
|
| json_schema: JsonSchemaValue = {'type': 'array'} |
| if prefix_items: |
| json_schema['prefixItems'] = prefix_items |
| if min_items: |
| json_schema['minItems'] = min_items |
|
|
| if var_args_schema: |
| items_schema = self.generate_inner(var_args_schema) |
| if items_schema: |
| json_schema['items'] = items_schema |
| else: |
| json_schema['maxItems'] = len(prefix_items) |
|
|
| return json_schema |
|
|
| def get_argument_name(self, argument: core_schema.ArgumentsParameter | core_schema.ArgumentsV3Parameter) -> str: |
| """Retrieves the name of an argument. |
| |
| Args: |
| argument: The core schema. |
| |
| Returns: |
| The name of the argument. |
| """ |
| name = argument['name'] |
| if self.by_alias: |
| alias = argument.get('alias') |
| if isinstance(alias, str): |
| name = alias |
| else: |
| pass |
| return name |
|
|
| def arguments_v3_schema(self, schema: core_schema.ArgumentsV3Schema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a function's arguments. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| arguments = schema['arguments_schema'] |
| properties: dict[str, JsonSchemaValue] = {} |
| required: list[str] = [] |
| for argument in arguments: |
| mode = argument.get('mode', 'positional_or_keyword') |
| name = self.get_argument_name(argument) |
| argument_schema = self.generate_inner(argument['schema']).copy() |
| if mode == 'var_args': |
| argument_schema = {'type': 'array', 'items': argument_schema} |
| elif mode == 'var_kwargs_uniform': |
| argument_schema = {'type': 'object', 'additionalProperties': argument_schema} |
|
|
| argument_schema.setdefault('title', self.get_title_from_name(name)) |
| properties[name] = argument_schema |
|
|
| if ( |
| (mode == 'var_kwargs_unpacked_typed_dict' and 'required' in argument_schema) |
| or mode not in {'var_args', 'var_kwargs_uniform', 'var_kwargs_unpacked_typed_dict'} |
| and argument['schema']['type'] != 'default' |
| ): |
| |
| |
| |
| required.append(name) |
|
|
| json_schema: JsonSchemaValue = {'type': 'object', 'properties': properties} |
| if required: |
| json_schema['required'] = required |
| return json_schema |
|
|
| def call_schema(self, schema: core_schema.CallSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a function call. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['arguments_schema']) |
|
|
| def custom_error_schema(self, schema: core_schema.CustomErrorSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a custom error. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return self.generate_inner(schema['schema']) |
|
|
| def json_schema(self, schema: core_schema.JsonSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a JSON object. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| content_core_schema = schema.get('schema') or core_schema.any_schema() |
| content_json_schema = self.generate_inner(content_core_schema) |
| if self.mode == 'validation': |
| return {'type': 'string', 'contentMediaType': 'application/json', 'contentSchema': content_json_schema} |
| else: |
| |
| return content_json_schema |
|
|
| def url_schema(self, schema: core_schema.UrlSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a URL. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| json_schema = {'type': 'string', 'format': 'uri', 'minLength': 1} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.string) |
| return json_schema |
|
|
| def multi_host_url_schema(self, schema: core_schema.MultiHostUrlSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a URL that can be used with multiple hosts. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| |
| json_schema = {'type': 'string', 'format': 'multi-host-uri', 'minLength': 1} |
| self.update_with_validations(json_schema, schema, self.ValidationsMapping.string) |
| return json_schema |
|
|
| def uuid_schema(self, schema: core_schema.UuidSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a UUID. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'string', 'format': 'uuid'} |
|
|
| def definitions_schema(self, schema: core_schema.DefinitionsSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that defines a JSON object with definitions. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| for definition in schema['definitions']: |
| try: |
| self.generate_inner(definition) |
| except PydanticInvalidForJsonSchema as e: |
| core_ref: CoreRef = CoreRef(definition['ref']) |
| self._core_defs_invalid_for_json_schema[self.get_defs_ref((core_ref, self.mode))] = e |
| continue |
| return self.generate_inner(schema['schema']) |
|
|
| def definition_ref_schema(self, schema: core_schema.DefinitionReferenceSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a schema that references a definition. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| core_ref = CoreRef(schema['schema_ref']) |
| _, ref_json_schema = self.get_cache_defs_ref_schema(core_ref) |
| return ref_json_schema |
|
|
| def ser_schema( |
| self, schema: core_schema.SerSchema | core_schema.IncExSeqSerSchema | core_schema.IncExDictSerSchema |
| ) -> JsonSchemaValue | None: |
| """Generates a JSON schema that matches a schema that defines a serialized object. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| schema_type = schema['type'] |
| if schema_type == 'function-plain' or schema_type == 'function-wrap': |
| |
| return_schema = schema.get('return_schema') |
| if return_schema is not None: |
| return self.generate_inner(return_schema) |
| elif schema_type == 'format' or schema_type == 'to-string': |
| |
| return self.str_schema(core_schema.str_schema()) |
| elif schema['type'] == 'model': |
| |
| return self.generate_inner(schema['schema']) |
| return None |
|
|
| def complex_schema(self, schema: core_schema.ComplexSchema) -> JsonSchemaValue: |
| """Generates a JSON schema that matches a complex number. |
| |
| JSON has no standard way to represent complex numbers. Complex number is not a numeric |
| type. Here we represent complex number as strings following the rule defined by Python. |
| For instance, '1+2j' is an accepted complex string. Details can be found in |
| [Python's `complex` documentation][complex]. |
| |
| Args: |
| schema: The core schema. |
| |
| Returns: |
| The generated JSON schema. |
| """ |
| return {'type': 'string'} |
|
|
| |
|
|
| def get_title_from_name(self, name: str) -> str: |
| """Retrieves a title from a name. |
| |
| Args: |
| name: The name to retrieve a title from. |
| |
| Returns: |
| The title. |
| """ |
| return name.title().replace('_', ' ').strip() |
|
|
| def field_title_should_be_set(self, schema: CoreSchemaOrField) -> bool: |
| """Returns true if a field with the given schema should have a title set based on the field name. |
| |
| Intuitively, we want this to return true for schemas that wouldn't otherwise provide their own title |
| (e.g., int, float, str), and false for those that would (e.g., BaseModel subclasses). |
| |
| Args: |
| schema: The schema to check. |
| |
| Returns: |
| `True` if the field should have a title set, `False` otherwise. |
| """ |
| if _core_utils.is_core_schema_field(schema): |
| if schema['type'] == 'computed-field': |
| field_schema = schema['return_schema'] |
| else: |
| field_schema = schema['schema'] |
| return self.field_title_should_be_set(field_schema) |
|
|
| elif _core_utils.is_core_schema(schema): |
| if schema.get('ref'): |
| return False |
| if schema['type'] in {'default', 'nullable', 'definitions'}: |
| return self.field_title_should_be_set(schema['schema']) |
| if _core_utils.is_function_with_inner_schema(schema): |
| return self.field_title_should_be_set(schema['schema']) |
| if schema['type'] == 'definition-ref': |
| |
| |
| return False |
| return True |
|
|
| else: |
| raise PydanticInvalidForJsonSchema(f'Unexpected schema type: schema={schema}') |
|
|
| def normalize_name(self, name: str) -> str: |
| """Normalizes a name to be used as a key in a dictionary. |
| |
| Args: |
| name: The name to normalize. |
| |
| Returns: |
| The normalized name. |
| """ |
| return re.sub(r'[^a-zA-Z0-9.\-_]', '_', name).replace('.', '__') |
|
|
| def get_defs_ref(self, core_mode_ref: CoreModeRef) -> DefsRef: |
| """Override this method to change the way that definitions keys are generated from a core reference. |
| |
| Args: |
| core_mode_ref: The core reference. |
| |
| Returns: |
| The definitions key. |
| """ |
| |
| core_ref, mode = core_mode_ref |
| components = re.split(r'([\][,])', core_ref) |
| |
| components = [x.rsplit(':', 1)[0] for x in components] |
| core_ref_no_id = ''.join(components) |
| |
| components = [re.sub(r'(?:[^.[\]]+\.)+((?:[^.[\]]+))', r'\1', x) for x in components] |
| short_ref = ''.join(components) |
|
|
| mode_title = _MODE_TITLE_MAPPING[mode] |
|
|
| |
| |
| |
| name = DefsRef(self.normalize_name(short_ref)) |
| name_mode = DefsRef(self.normalize_name(short_ref) + f'-{mode_title}') |
| module_qualname = DefsRef(self.normalize_name(core_ref_no_id)) |
| module_qualname_mode = DefsRef(f'{module_qualname}-{mode_title}') |
| module_qualname_id = DefsRef(self.normalize_name(core_ref)) |
| occurrence_index = self._collision_index.get(module_qualname_id) |
| if occurrence_index is None: |
| self._collision_counter[module_qualname] += 1 |
| occurrence_index = self._collision_index[module_qualname_id] = self._collision_counter[module_qualname] |
|
|
| module_qualname_occurrence = DefsRef(f'{module_qualname}__{occurrence_index}') |
| module_qualname_occurrence_mode = DefsRef(f'{module_qualname_mode}__{occurrence_index}') |
|
|
| self._prioritized_defsref_choices[module_qualname_occurrence_mode] = [ |
| name, |
| name_mode, |
| module_qualname, |
| module_qualname_mode, |
| module_qualname_occurrence, |
| module_qualname_occurrence_mode, |
| ] |
|
|
| return module_qualname_occurrence_mode |
|
|
| def get_cache_defs_ref_schema(self, core_ref: CoreRef) -> tuple[DefsRef, JsonSchemaValue]: |
| """This method wraps the get_defs_ref method with some cache-lookup/population logic, |
| and returns both the produced defs_ref and the JSON schema that will refer to the right definition. |
| |
| Args: |
| core_ref: The core reference to get the definitions reference for. |
| |
| Returns: |
| A tuple of the definitions reference and the JSON schema that will refer to it. |
| """ |
| core_mode_ref = (core_ref, self.mode) |
| maybe_defs_ref = self.core_to_defs_refs.get(core_mode_ref) |
| if maybe_defs_ref is not None: |
| json_ref = self.core_to_json_refs[core_mode_ref] |
| return maybe_defs_ref, {'$ref': json_ref} |
|
|
| defs_ref = self.get_defs_ref(core_mode_ref) |
|
|
| |
| self.core_to_defs_refs[core_mode_ref] = defs_ref |
| self.defs_to_core_refs[defs_ref] = core_mode_ref |
|
|
| json_ref = JsonRef(self.ref_template.format(model=defs_ref)) |
| self.core_to_json_refs[core_mode_ref] = json_ref |
| self.json_to_defs_refs[json_ref] = defs_ref |
| ref_json_schema = {'$ref': json_ref} |
| return defs_ref, ref_json_schema |
|
|
| def handle_ref_overrides(self, json_schema: JsonSchemaValue) -> JsonSchemaValue: |
| """Remove any sibling keys that are redundant with the referenced schema. |
| |
| Args: |
| json_schema: The schema to remove redundant sibling keys from. |
| |
| Returns: |
| The schema with redundant sibling keys removed. |
| """ |
| if '$ref' in json_schema: |
| |
| json_schema = json_schema.copy() |
|
|
| referenced_json_schema = self.get_schema_from_definitions(JsonRef(json_schema['$ref'])) |
| if referenced_json_schema is None: |
| |
| |
| |
| return json_schema |
| for k, v in list(json_schema.items()): |
| if k == '$ref': |
| continue |
| if k in referenced_json_schema and referenced_json_schema[k] == v: |
| del json_schema[k] |
|
|
| return json_schema |
|
|
| def get_schema_from_definitions(self, json_ref: JsonRef) -> JsonSchemaValue | None: |
| try: |
| def_ref = self.json_to_defs_refs[json_ref] |
| if def_ref in self._core_defs_invalid_for_json_schema: |
| raise self._core_defs_invalid_for_json_schema[def_ref] |
| return self.definitions.get(def_ref, None) |
| except KeyError: |
| if json_ref.startswith(('http://', 'https://')): |
| return None |
| raise |
|
|
| def encode_default(self, dft: Any) -> Any: |
| """Encode a default value to a JSON-serializable value. |
| |
| This is used to encode default values for fields in the generated JSON schema. |
| |
| Args: |
| dft: The default value to encode. |
| |
| Returns: |
| The encoded default value. |
| """ |
| from .type_adapter import TypeAdapter, _type_has_config |
|
|
| config = self._config |
| try: |
| default = ( |
| dft |
| if _type_has_config(type(dft)) |
| else TypeAdapter(type(dft), config=config.config_dict).dump_python( |
| dft, by_alias=self.by_alias, mode='json' |
| ) |
| ) |
| except PydanticSchemaGenerationError: |
| raise pydantic_core.PydanticSerializationError(f'Unable to encode default value {dft}') |
|
|
| return pydantic_core.to_jsonable_python( |
| default, timedelta_mode=config.ser_json_timedelta, bytes_mode=config.ser_json_bytes, by_alias=self.by_alias |
| ) |
|
|
| def update_with_validations( |
| self, json_schema: JsonSchemaValue, core_schema: CoreSchema, mapping: dict[str, str] |
| ) -> None: |
| """Update the json_schema with the corresponding validations specified in the core_schema, |
| using the provided mapping to translate keys in core_schema to the appropriate keys for a JSON schema. |
| |
| Args: |
| json_schema: The JSON schema to update. |
| core_schema: The core schema to get the validations from. |
| mapping: A mapping from core_schema attribute names to the corresponding JSON schema attribute names. |
| """ |
| for core_key, json_schema_key in mapping.items(): |
| if core_key in core_schema: |
| json_schema[json_schema_key] = core_schema[core_key] |
|
|
| class ValidationsMapping: |
| """This class just contains mappings from core_schema attribute names to the corresponding |
| JSON schema attribute names. While I suspect it is unlikely to be necessary, you can in |
| principle override this class in a subclass of GenerateJsonSchema (by inheriting from |
| GenerateJsonSchema.ValidationsMapping) to change these mappings. |
| """ |
|
|
| numeric = { |
| 'multiple_of': 'multipleOf', |
| 'le': 'maximum', |
| 'ge': 'minimum', |
| 'lt': 'exclusiveMaximum', |
| 'gt': 'exclusiveMinimum', |
| } |
| bytes = { |
| 'min_length': 'minLength', |
| 'max_length': 'maxLength', |
| } |
| string = { |
| 'min_length': 'minLength', |
| 'max_length': 'maxLength', |
| 'pattern': 'pattern', |
| } |
| array = { |
| 'min_length': 'minItems', |
| 'max_length': 'maxItems', |
| } |
| object = { |
| 'min_length': 'minProperties', |
| 'max_length': 'maxProperties', |
| } |
|
|
| def get_flattened_anyof(self, schemas: list[JsonSchemaValue]) -> JsonSchemaValue: |
| members = [] |
| for schema in schemas: |
| if len(schema) == 1 and 'anyOf' in schema: |
| members.extend(schema['anyOf']) |
| else: |
| members.append(schema) |
| members = _deduplicate_schemas(members) |
| if len(members) == 1: |
| return members[0] |
| return {'anyOf': members} |
|
|
| def get_json_ref_counts(self, json_schema: JsonSchemaValue) -> dict[JsonRef, int]: |
| """Get all values corresponding to the key '$ref' anywhere in the json_schema.""" |
| json_refs: dict[JsonRef, int] = Counter() |
|
|
| def _add_json_refs(schema: Any) -> None: |
| if isinstance(schema, dict): |
| if '$ref' in schema: |
| json_ref = JsonRef(schema['$ref']) |
| if not isinstance(json_ref, str): |
| return |
| already_visited = json_ref in json_refs |
| json_refs[json_ref] += 1 |
| if already_visited: |
| return |
| try: |
| defs_ref = self.json_to_defs_refs[json_ref] |
| if defs_ref in self._core_defs_invalid_for_json_schema: |
| raise self._core_defs_invalid_for_json_schema[defs_ref] |
| _add_json_refs(self.definitions[defs_ref]) |
| except KeyError: |
| if not json_ref.startswith(('http://', 'https://')): |
| raise |
|
|
| for k, v in schema.items(): |
| if k == 'examples' and isinstance(v, list): |
| |
| |
| continue |
| _add_json_refs(v) |
| elif isinstance(schema, list): |
| for v in schema: |
| _add_json_refs(v) |
|
|
| _add_json_refs(json_schema) |
| return json_refs |
|
|
| def handle_invalid_for_json_schema(self, schema: CoreSchemaOrField, error_info: str) -> JsonSchemaValue: |
| raise PydanticInvalidForJsonSchema(f'Cannot generate a JsonSchema for {error_info}') |
|
|
| def emit_warning(self, kind: JsonSchemaWarningKind, detail: str) -> None: |
| """This method simply emits PydanticJsonSchemaWarnings based on handling in the `warning_message` method.""" |
| message = self.render_warning_message(kind, detail) |
| if message is not None: |
| warnings.warn(message, PydanticJsonSchemaWarning) |
|
|
| def render_warning_message(self, kind: JsonSchemaWarningKind, detail: str) -> str | None: |
| """This method is responsible for ignoring warnings as desired, and for formatting the warning messages. |
| |
| You can override the value of `ignored_warning_kinds` in a subclass of GenerateJsonSchema |
| to modify what warnings are generated. If you want more control, you can override this method; |
| just return None in situations where you don't want warnings to be emitted. |
| |
| Args: |
| kind: The kind of warning to render. It can be one of the following: |
| |
| - 'skipped-choice': A choice field was skipped because it had no valid choices. |
| - 'non-serializable-default': A default value was skipped because it was not JSON-serializable. |
| detail: A string with additional details about the warning. |
| |
| Returns: |
| The formatted warning message, or `None` if no warning should be emitted. |
| """ |
| if kind in self.ignored_warning_kinds: |
| return None |
| return f'{detail} [{kind}]' |
|
|
| def _build_definitions_remapping(self) -> _DefinitionsRemapping: |
| defs_to_json: dict[DefsRef, JsonRef] = {} |
| for defs_refs in self._prioritized_defsref_choices.values(): |
| for defs_ref in defs_refs: |
| json_ref = JsonRef(self.ref_template.format(model=defs_ref)) |
| defs_to_json[defs_ref] = json_ref |
|
|
| return _DefinitionsRemapping.from_prioritized_choices( |
| self._prioritized_defsref_choices, defs_to_json, self.definitions |
| ) |
|
|
| def _garbage_collect_definitions(self, schema: JsonSchemaValue) -> None: |
| visited_defs_refs: set[DefsRef] = set() |
| unvisited_json_refs = _get_all_json_refs(schema) |
| while unvisited_json_refs: |
| next_json_ref = unvisited_json_refs.pop() |
| try: |
| next_defs_ref = self.json_to_defs_refs[next_json_ref] |
| if next_defs_ref in visited_defs_refs: |
| continue |
| visited_defs_refs.add(next_defs_ref) |
| unvisited_json_refs.update(_get_all_json_refs(self.definitions[next_defs_ref])) |
| except KeyError: |
| if not next_json_ref.startswith(('http://', 'https://')): |
| raise |
|
|
| self.definitions = {k: v for k, v in self.definitions.items() if k in visited_defs_refs} |
|
|
|
|
| |
|
|
|
|
| def model_json_schema( |
| cls: type[BaseModel] | type[PydanticDataclass], |
| by_alias: bool = True, |
| ref_template: str = DEFAULT_REF_TEMPLATE, |
| union_format: Literal['any_of', 'primitive_type_array'] = 'any_of', |
| schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, |
| mode: JsonSchemaMode = 'validation', |
| ) -> dict[str, Any]: |
| """Utility function to generate a JSON Schema for a model. |
| |
| Args: |
| cls: The model class to generate a JSON Schema for. |
| by_alias: If `True` (the default), fields will be serialized according to their alias. |
| If `False`, fields will be serialized according to their attribute name. |
| ref_template: The template to use for generating JSON Schema references. |
| union_format: The format to use when combining schemas from unions together. Can be one of: |
| |
| - `'any_of'`: Use the [`anyOf`](https://json-schema.org/understanding-json-schema/reference/combining#anyOf) |
| keyword to combine schemas (the default). |
| - `'primitive_type_array'`: Use the [`type`](https://json-schema.org/understanding-json-schema/reference/type) |
| keyword as an array of strings, containing each type of the combination. If any of the schemas is not a primitive |
| type (`string`, `boolean`, `null`, `integer` or `number`) or contains constraints/metadata, falls back to |
| `any_of`. |
| schema_generator: The class to use for generating the JSON Schema. |
| mode: The mode to use for generating the JSON Schema. It can be one of the following: |
| |
| - 'validation': Generate a JSON Schema for validating data. |
| - 'serialization': Generate a JSON Schema for serializing data. |
| |
| Returns: |
| The generated JSON Schema. |
| """ |
| from .main import BaseModel |
|
|
| schema_generator_instance = schema_generator( |
| by_alias=by_alias, ref_template=ref_template, union_format=union_format |
| ) |
|
|
| if isinstance(cls.__pydantic_core_schema__, _mock_val_ser.MockCoreSchema): |
| cls.__pydantic_core_schema__.rebuild() |
|
|
| if cls is BaseModel: |
| raise AttributeError('model_json_schema() must be called on a subclass of BaseModel, not BaseModel itself.') |
|
|
| assert not isinstance(cls.__pydantic_core_schema__, _mock_val_ser.MockCoreSchema), 'this is a bug! please report it' |
| return schema_generator_instance.generate(cls.__pydantic_core_schema__, mode=mode) |
|
|
|
|
| def models_json_schema( |
| models: Sequence[tuple[type[BaseModel] | type[PydanticDataclass], JsonSchemaMode]], |
| *, |
| by_alias: bool = True, |
| title: str | None = None, |
| description: str | None = None, |
| ref_template: str = DEFAULT_REF_TEMPLATE, |
| union_format: Literal['any_of', 'primitive_type_array'] = 'any_of', |
| schema_generator: type[GenerateJsonSchema] = GenerateJsonSchema, |
| ) -> tuple[dict[tuple[type[BaseModel] | type[PydanticDataclass], JsonSchemaMode], JsonSchemaValue], JsonSchemaValue]: |
| """Utility function to generate a JSON Schema for multiple models. |
| |
| Args: |
| models: A sequence of tuples of the form (model, mode). |
| by_alias: Whether field aliases should be used as keys in the generated JSON Schema. |
| title: The title of the generated JSON Schema. |
| description: The description of the generated JSON Schema. |
| ref_template: The reference template to use for generating JSON Schema references. |
| union_format: The format to use when combining schemas from unions together. Can be one of: |
| |
| - `'any_of'`: Use the [`anyOf`](https://json-schema.org/understanding-json-schema/reference/combining#anyOf) |
| keyword to combine schemas (the default). |
| - `'primitive_type_array'`: Use the [`type`](https://json-schema.org/understanding-json-schema/reference/type) |
| keyword as an array of strings, containing each type of the combination. If any of the schemas is not a primitive |
| type (`string`, `boolean`, `null`, `integer` or `number`) or contains constraints/metadata, falls back to |
| `any_of`. |
| schema_generator: The schema generator to use for generating the JSON Schema. |
| |
| Returns: |
| A tuple where: |
| - The first element is a dictionary whose keys are tuples of JSON schema key type and JSON mode, and |
| whose values are the JSON schema corresponding to that pair of inputs. (These schemas may have |
| JsonRef references to definitions that are defined in the second returned element.) |
| - The second element is a JSON schema containing all definitions referenced in the first returned |
| element, along with the optional title and description keys. |
| """ |
| for cls, _ in models: |
| if isinstance(cls.__pydantic_core_schema__, _mock_val_ser.MockCoreSchema): |
| cls.__pydantic_core_schema__.rebuild() |
|
|
| instance = schema_generator(by_alias=by_alias, ref_template=ref_template, union_format=union_format) |
| inputs: list[tuple[type[BaseModel] | type[PydanticDataclass], JsonSchemaMode, CoreSchema]] = [ |
| (m, mode, m.__pydantic_core_schema__) for m, mode in models |
| ] |
| json_schemas_map, definitions = instance.generate_definitions(inputs) |
|
|
| json_schema: dict[str, Any] = {} |
| if definitions: |
| json_schema['$defs'] = definitions |
| if title: |
| json_schema['title'] = title |
| if description: |
| json_schema['description'] = description |
|
|
| return json_schemas_map, json_schema |
|
|
|
|
| |
|
|
|
|
| _HashableJsonValue: TypeAlias = Union[ |
| int, float, str, bool, None, tuple['_HashableJsonValue', ...], tuple[tuple[str, '_HashableJsonValue'], ...] |
| ] |
|
|
|
|
| def _deduplicate_schemas(schemas: Iterable[JsonDict]) -> list[JsonDict]: |
| return list({_make_json_hashable(schema): schema for schema in schemas}.values()) |
|
|
|
|
| def _make_json_hashable(value: JsonValue) -> _HashableJsonValue: |
| if isinstance(value, dict): |
| return tuple(sorted((k, _make_json_hashable(v)) for k, v in value.items())) |
| elif isinstance(value, list): |
| return tuple(_make_json_hashable(v) for v in value) |
| else: |
| return value |
|
|
|
|
| @dataclasses.dataclass(**_internal_dataclass.slots_true) |
| class WithJsonSchema: |
| """!!! abstract "Usage Documentation" |
| [`WithJsonSchema` Annotation](../concepts/json_schema.md#withjsonschema-annotation) |
| |
| Add this as an annotation on a field to override the (base) JSON schema that would be generated for that field. |
| This provides a way to set a JSON schema for types that would otherwise raise errors when producing a JSON schema, |
| such as Callable, or types that have an is-instance core schema, without needing to go so far as creating a |
| custom subclass of pydantic.json_schema.GenerateJsonSchema. |
| Note that any _modifications_ to the schema that would normally be made (such as setting the title for model fields) |
| will still be performed. |
| |
| If `mode` is set this will only apply to that schema generation mode, allowing you |
| to set different json schemas for validation and serialization. |
| """ |
|
|
| json_schema: JsonSchemaValue | None |
| mode: Literal['validation', 'serialization'] | None = None |
|
|
| def __get_pydantic_json_schema__( |
| self, core_schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler |
| ) -> JsonSchemaValue: |
| mode = self.mode or handler.mode |
| if mode != handler.mode: |
| return handler(core_schema) |
| if self.json_schema is None: |
| |
| raise PydanticOmit |
| else: |
| return self.json_schema.copy() |
|
|
| def __hash__(self) -> int: |
| return hash(type(self.mode)) |
|
|
|
|
| class Examples: |
| """Add examples to a JSON schema. |
| |
| If the JSON Schema already contains examples, the provided examples |
| will be appended. |
| |
| If `mode` is set this will only apply to that schema generation mode, |
| allowing you to add different examples for validation and serialization. |
| """ |
|
|
| @overload |
| @deprecated('Using a dict for `examples` is deprecated since v2.9 and will be removed in v3.0. Use a list instead.') |
| def __init__( |
| self, examples: dict[str, Any], mode: Literal['validation', 'serialization'] | None = None |
| ) -> None: ... |
|
|
| @overload |
| def __init__(self, examples: list[Any], mode: Literal['validation', 'serialization'] | None = None) -> None: ... |
|
|
| def __init__( |
| self, examples: dict[str, Any] | list[Any], mode: Literal['validation', 'serialization'] | None = None |
| ) -> None: |
| if isinstance(examples, dict): |
| warnings.warn( |
| 'Using a dict for `examples` is deprecated, use a list instead.', |
| PydanticDeprecatedSince29, |
| stacklevel=2, |
| ) |
| self.examples = examples |
| self.mode = mode |
|
|
| def __get_pydantic_json_schema__( |
| self, core_schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler |
| ) -> JsonSchemaValue: |
| mode = self.mode or handler.mode |
| json_schema = handler(core_schema) |
| if mode != handler.mode: |
| return json_schema |
| examples = json_schema.get('examples') |
| if examples is None: |
| json_schema['examples'] = to_jsonable_python(self.examples) |
| if isinstance(examples, dict): |
| if isinstance(self.examples, list): |
| warnings.warn( |
| 'Updating existing JSON Schema examples of type dict with examples of type list. ' |
| 'Only the existing examples values will be retained. Note that dict support for ' |
| 'examples is deprecated and will be removed in v3.0.', |
| UserWarning, |
| ) |
| json_schema['examples'] = to_jsonable_python( |
| [ex for value in examples.values() for ex in value] + self.examples |
| ) |
| else: |
| json_schema['examples'] = to_jsonable_python({**examples, **self.examples}) |
| if isinstance(examples, list): |
| if isinstance(self.examples, list): |
| json_schema['examples'] = to_jsonable_python(examples + self.examples) |
| elif isinstance(self.examples, dict): |
| warnings.warn( |
| 'Updating existing JSON Schema examples of type list with examples of type dict. ' |
| 'Only the examples values will be retained. Note that dict support for ' |
| 'examples is deprecated and will be removed in v3.0.', |
| UserWarning, |
| ) |
| json_schema['examples'] = to_jsonable_python( |
| examples + [ex for value in self.examples.values() for ex in value] |
| ) |
|
|
| return json_schema |
|
|
| def __hash__(self) -> int: |
| return hash(type(self.mode)) |
|
|
|
|
| def _get_all_json_refs(item: Any) -> set[JsonRef]: |
| """Get all the definitions references from a JSON schema.""" |
| refs: set[JsonRef] = set() |
| stack = [item] |
|
|
| while stack: |
| current = stack.pop() |
| if isinstance(current, dict): |
| for key, value in current.items(): |
| if key == 'examples' and isinstance(value, list): |
| |
| |
| |
| |
| |
| continue |
| if key == '$ref' and isinstance(value, str): |
| refs.add(JsonRef(value)) |
| elif isinstance(value, dict): |
| stack.append(value) |
| elif isinstance(value, list): |
| stack.extend(value) |
| elif isinstance(current, list): |
| stack.extend(current) |
|
|
| return refs |
|
|
|
|
| AnyType = TypeVar('AnyType') |
|
|
| if TYPE_CHECKING: |
| SkipJsonSchema = Annotated[AnyType, ...] |
| else: |
|
|
| @dataclasses.dataclass(**_internal_dataclass.slots_true) |
| class SkipJsonSchema: |
| """!!! abstract "Usage Documentation" |
| [`SkipJsonSchema` Annotation](../concepts/json_schema.md#skipjsonschema-annotation) |
| |
| Add this as an annotation on a field to skip generating a JSON schema for that field. |
| |
| Example: |
| ```python |
| from pprint import pprint |
| from typing import Union |
| |
| from pydantic import BaseModel |
| from pydantic.json_schema import SkipJsonSchema |
| |
| class Model(BaseModel): |
| a: Union[int, None] = None # (1)! |
| b: Union[int, SkipJsonSchema[None]] = None # (2)! |
| c: SkipJsonSchema[Union[int, None]] = None # (3)! |
| |
| pprint(Model.model_json_schema()) |
| ''' |
| { |
| 'properties': { |
| 'a': { |
| 'anyOf': [ |
| {'type': 'integer'}, |
| {'type': 'null'} |
| ], |
| 'default': None, |
| 'title': 'A' |
| }, |
| 'b': { |
| 'default': None, |
| 'title': 'B', |
| 'type': 'integer' |
| } |
| }, |
| 'title': 'Model', |
| 'type': 'object' |
| } |
| ''' |
| ``` |
| |
| 1. The integer and null types are both included in the schema for `a`. |
| 2. The integer type is the only type included in the schema for `b`. |
| 3. The entirety of the `c` field is omitted from the schema. |
| """ |
|
|
| def __class_getitem__(cls, item: AnyType) -> AnyType: |
| return Annotated[item, cls()] |
|
|
| def __get_pydantic_json_schema__( |
| self, core_schema: CoreSchema, handler: GetJsonSchemaHandler |
| ) -> JsonSchemaValue: |
| raise PydanticOmit |
|
|
| def __hash__(self) -> int: |
| return hash(type(self)) |
|
|
|
|
| def _get_typed_dict_config(cls: type[Any] | None) -> ConfigDict: |
| if cls is not None: |
| try: |
| return _decorators.get_attribute_from_bases(cls, '__pydantic_config__') |
| except AttributeError: |
| pass |
| return {} |
|
|
|
|
| def _get_ser_schema_for_default_value(schema: CoreSchema) -> core_schema.PlainSerializerFunctionSerSchema | None: |
| """Get a `'function-plain'` serialization schema that can be used to serialize a default value. |
| |
| This takes into account having the serialization schema nested under validation schema(s). |
| """ |
| if ( |
| (ser_schema := schema.get('serialization')) |
| and ser_schema['type'] == 'function-plain' |
| and not ser_schema.get('info_arg') |
| ): |
| return ser_schema |
| if _core_utils.is_function_with_inner_schema(schema): |
| return _get_ser_schema_for_default_value(schema['schema']) |
|
|