Spaces:
Paused
Paused
| from __future__ import annotations | |
| from contextlib import suppress | |
| from datetime import date, datetime | |
| from uuid import UUID | |
| import ipaddress | |
| import re | |
| import typing | |
| import warnings | |
| from jsonschema.exceptions import FormatError | |
| _FormatCheckCallable = typing.Callable[[object], bool] | |
| #: A format checker callable. | |
| _F = typing.TypeVar("_F", bound=_FormatCheckCallable) | |
| _RaisesType = typing.Union[ | |
| typing.Type[Exception], typing.Tuple[typing.Type[Exception], ...], | |
| ] | |
| _RE_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$", re.ASCII) | |
| class FormatChecker: | |
| """ | |
| A ``format`` property checker. | |
| JSON Schema does not mandate that the ``format`` property actually do any | |
| validation. If validation is desired however, instances of this class can | |
| be hooked into validators to enable format validation. | |
| `FormatChecker` objects always return ``True`` when asked about | |
| formats that they do not know how to validate. | |
| To add a check for a custom format use the `FormatChecker.checks` | |
| decorator. | |
| Arguments: | |
| formats: | |
| The known formats to validate. This argument can be used to | |
| limit which formats will be used during validation. | |
| """ | |
| checkers: dict[ | |
| str, | |
| tuple[_FormatCheckCallable, _RaisesType], | |
| ] = {} # noqa: RUF012 | |
| def __init__(self, formats: typing.Iterable[str] | None = None): | |
| if formats is None: | |
| formats = self.checkers.keys() | |
| self.checkers = {k: self.checkers[k] for k in formats} | |
| def __repr__(self): | |
| return f"<FormatChecker checkers={sorted(self.checkers)}>" | |
| def checks( | |
| self, format: str, raises: _RaisesType = (), | |
| ) -> typing.Callable[[_F], _F]: | |
| """ | |
| Register a decorated function as validating a new format. | |
| Arguments: | |
| format: | |
| The format that the decorated function will check. | |
| raises: | |
| The exception(s) raised by the decorated function when an | |
| invalid instance is found. | |
| The exception object will be accessible as the | |
| `jsonschema.exceptions.ValidationError.cause` attribute of the | |
| resulting validation error. | |
| """ | |
| def _checks(func: _F) -> _F: | |
| self.checkers[format] = (func, raises) | |
| return func | |
| return _checks | |
| def cls_checks( | |
| cls, format: str, raises: _RaisesType = (), | |
| ) -> typing.Callable[[_F], _F]: | |
| warnings.warn( | |
| ( | |
| "FormatChecker.cls_checks is deprecated. Call " | |
| "FormatChecker.checks on a specific FormatChecker instance " | |
| "instead." | |
| ), | |
| DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| return cls._cls_checks(format=format, raises=raises) | |
| def _cls_checks( | |
| cls, format: str, raises: _RaisesType = (), | |
| ) -> typing.Callable[[_F], _F]: | |
| def _checks(func: _F) -> _F: | |
| cls.checkers[format] = (func, raises) | |
| return func | |
| return _checks | |
| def check(self, instance: object, format: str) -> None: | |
| """ | |
| Check whether the instance conforms to the given format. | |
| Arguments: | |
| instance (*any primitive type*, i.e. str, number, bool): | |
| The instance to check | |
| format: | |
| The format that instance should conform to | |
| Raises: | |
| FormatError: | |
| if the instance does not conform to ``format`` | |
| """ | |
| if format not in self.checkers: | |
| return | |
| func, raises = self.checkers[format] | |
| result, cause = None, None | |
| try: | |
| result = func(instance) | |
| except raises as e: | |
| cause = e | |
| if not result: | |
| raise FormatError(f"{instance!r} is not a {format!r}", cause=cause) | |
| def conforms(self, instance: object, format: str) -> bool: | |
| """ | |
| Check whether the instance conforms to the given format. | |
| Arguments: | |
| instance (*any primitive type*, i.e. str, number, bool): | |
| The instance to check | |
| format: | |
| The format that instance should conform to | |
| Returns: | |
| bool: whether it conformed | |
| """ | |
| try: | |
| self.check(instance, format) | |
| except FormatError: | |
| return False | |
| else: | |
| return True | |
| draft3_format_checker = FormatChecker() | |
| draft4_format_checker = FormatChecker() | |
| draft6_format_checker = FormatChecker() | |
| draft7_format_checker = FormatChecker() | |
| draft201909_format_checker = FormatChecker() | |
| draft202012_format_checker = FormatChecker() | |
| _draft_checkers: dict[str, FormatChecker] = dict( | |
| draft3=draft3_format_checker, | |
| draft4=draft4_format_checker, | |
| draft6=draft6_format_checker, | |
| draft7=draft7_format_checker, | |
| draft201909=draft201909_format_checker, | |
| draft202012=draft202012_format_checker, | |
| ) | |
| def _checks_drafts( | |
| name=None, | |
| draft3=None, | |
| draft4=None, | |
| draft6=None, | |
| draft7=None, | |
| draft201909=None, | |
| draft202012=None, | |
| raises=(), | |
| ) -> typing.Callable[[_F], _F]: | |
| draft3 = draft3 or name | |
| draft4 = draft4 or name | |
| draft6 = draft6 or name | |
| draft7 = draft7 or name | |
| draft201909 = draft201909 or name | |
| draft202012 = draft202012 or name | |
| def wrap(func: _F) -> _F: | |
| if draft3: | |
| func = _draft_checkers["draft3"].checks(draft3, raises)(func) | |
| if draft4: | |
| func = _draft_checkers["draft4"].checks(draft4, raises)(func) | |
| if draft6: | |
| func = _draft_checkers["draft6"].checks(draft6, raises)(func) | |
| if draft7: | |
| func = _draft_checkers["draft7"].checks(draft7, raises)(func) | |
| if draft201909: | |
| func = _draft_checkers["draft201909"].checks(draft201909, raises)( | |
| func, | |
| ) | |
| if draft202012: | |
| func = _draft_checkers["draft202012"].checks(draft202012, raises)( | |
| func, | |
| ) | |
| # Oy. This is bad global state, but relied upon for now, until | |
| # deprecation. See #519 and test_format_checkers_come_with_defaults | |
| FormatChecker._cls_checks( | |
| draft202012 or draft201909 or draft7 or draft6 or draft4 or draft3, | |
| raises, | |
| )(func) | |
| return func | |
| return wrap | |
| def is_email(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return "@" in instance | |
| def is_ipv4(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return bool(ipaddress.IPv4Address(instance)) | |
| def is_ipv6(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| address = ipaddress.IPv6Address(instance) | |
| return not getattr(address, "scope_id", "") | |
| with suppress(ImportError): | |
| from fqdn import FQDN | |
| def is_host_name(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return FQDN(instance, min_labels=1).is_valid | |
| with suppress(ImportError): | |
| # The built-in `idna` codec only implements RFC 3890, so we go elsewhere. | |
| import idna | |
| def is_idn_host_name(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| idna.encode(instance) | |
| return True | |
| try: | |
| import rfc3987 | |
| except ImportError: | |
| with suppress(ImportError): | |
| from rfc3986_validator import validate_rfc3986 | |
| def is_uri(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return validate_rfc3986(instance, rule="URI") | |
| def is_uri_reference(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return validate_rfc3986(instance, rule="URI_reference") | |
| else: | |
| def is_iri(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return rfc3987.parse(instance, rule="IRI") | |
| def is_iri_reference(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return rfc3987.parse(instance, rule="IRI_reference") | |
| def is_uri(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return rfc3987.parse(instance, rule="URI") | |
| def is_uri_reference(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return rfc3987.parse(instance, rule="URI_reference") | |
| with suppress(ImportError): | |
| from rfc3339_validator import validate_rfc3339 | |
| def is_datetime(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return validate_rfc3339(instance.upper()) | |
| def is_time(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return is_datetime("1970-01-01T" + instance) | |
| def is_regex(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return bool(re.compile(instance)) | |
| def is_date(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return bool(_RE_DATE.fullmatch(instance) and date.fromisoformat(instance)) | |
| def is_draft3_time(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return bool(datetime.strptime(instance, "%H:%M:%S")) # noqa: DTZ007 | |
| with suppress(ImportError): | |
| from webcolors import CSS21_NAMES_TO_HEX | |
| import webcolors | |
| def is_css_color_code(instance: object) -> bool: | |
| return webcolors.normalize_hex(instance) | |
| def is_css21_color(instance: object) -> bool: | |
| if ( | |
| not isinstance(instance, str) | |
| or instance.lower() in CSS21_NAMES_TO_HEX | |
| ): | |
| return True | |
| return is_css_color_code(instance) | |
| with suppress(ImportError): | |
| import jsonpointer | |
| def is_json_pointer(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return bool(jsonpointer.JsonPointer(instance)) | |
| # TODO: I don't want to maintain this, so it | |
| # needs to go either into jsonpointer (pending | |
| # https://github.com/stefankoegl/python-json-pointer/issues/34) or | |
| # into a new external library. | |
| def is_relative_json_pointer(instance: object) -> bool: | |
| # Definition taken from: | |
| # https://tools.ietf.org/html/draft-handrews-relative-json-pointer-01#section-3 | |
| if not isinstance(instance, str): | |
| return True | |
| if not instance: | |
| return False | |
| non_negative_integer, rest = [], "" | |
| for i, character in enumerate(instance): | |
| if character.isdigit(): | |
| # digits with a leading "0" are not allowed | |
| if i > 0 and int(instance[i - 1]) == 0: | |
| return False | |
| non_negative_integer.append(character) | |
| continue | |
| if not non_negative_integer: | |
| return False | |
| rest = instance[i:] | |
| break | |
| return (rest == "#") or bool(jsonpointer.JsonPointer(rest)) | |
| with suppress(ImportError): | |
| import uri_template | |
| def is_uri_template(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| return uri_template.validate(instance) | |
| with suppress(ImportError): | |
| import isoduration | |
| def is_duration(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| isoduration.parse_duration(instance) | |
| # FIXME: See bolsote/isoduration#25 and bolsote/isoduration#21 | |
| return instance.endswith(tuple("DMYWHMS")) | |
| def is_uuid(instance: object) -> bool: | |
| if not isinstance(instance, str): | |
| return True | |
| UUID(instance) | |
| return all(instance[position] == "-" for position in (8, 13, 18, 23)) | |