Spaces:
Paused
Paused
| # mypy: allow-untyped-defs | |
| """Python version compatibility code.""" | |
| from __future__ import annotations | |
| import dataclasses | |
| import enum | |
| import functools | |
| import inspect | |
| from inspect import Parameter | |
| from inspect import signature | |
| import os | |
| from pathlib import Path | |
| import sys | |
| from typing import Any | |
| from typing import Callable | |
| from typing import Final | |
| from typing import NoReturn | |
| import py | |
| #: constant to prepare valuing pylib path replacements/lazy proxies later on | |
| # intended for removal in pytest 8.0 or 9.0 | |
| # fmt: off | |
| # intentional space to create a fake difference for the verification | |
| LEGACY_PATH = py.path. local | |
| # fmt: on | |
| def legacy_path(path: str | os.PathLike[str]) -> LEGACY_PATH: | |
| """Internal wrapper to prepare lazy proxies for legacy_path instances""" | |
| return LEGACY_PATH(path) | |
| # fmt: off | |
| # Singleton type for NOTSET, as described in: | |
| # https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions | |
| class NotSetType(enum.Enum): | |
| token = 0 | |
| NOTSET: Final = NotSetType.token | |
| # fmt: on | |
| def is_generator(func: object) -> bool: | |
| genfunc = inspect.isgeneratorfunction(func) | |
| return genfunc and not iscoroutinefunction(func) | |
| def iscoroutinefunction(func: object) -> bool: | |
| """Return True if func is a coroutine function (a function defined with async | |
| def syntax, and doesn't contain yield), or a function decorated with | |
| @asyncio.coroutine. | |
| Note: copied and modified from Python 3.5's builtin coroutines.py to avoid | |
| importing asyncio directly, which in turns also initializes the "logging" | |
| module as a side-effect (see issue #8). | |
| """ | |
| return inspect.iscoroutinefunction(func) or getattr(func, "_is_coroutine", False) | |
| def is_async_function(func: object) -> bool: | |
| """Return True if the given function seems to be an async function or | |
| an async generator.""" | |
| return iscoroutinefunction(func) or inspect.isasyncgenfunction(func) | |
| def getlocation(function, curdir: str | os.PathLike[str] | None = None) -> str: | |
| function = get_real_func(function) | |
| fn = Path(inspect.getfile(function)) | |
| lineno = function.__code__.co_firstlineno | |
| if curdir is not None: | |
| try: | |
| relfn = fn.relative_to(curdir) | |
| except ValueError: | |
| pass | |
| else: | |
| return "%s:%d" % (relfn, lineno + 1) | |
| return "%s:%d" % (fn, lineno + 1) | |
| def num_mock_patch_args(function) -> int: | |
| """Return number of arguments used up by mock arguments (if any).""" | |
| patchings = getattr(function, "patchings", None) | |
| if not patchings: | |
| return 0 | |
| mock_sentinel = getattr(sys.modules.get("mock"), "DEFAULT", object()) | |
| ut_mock_sentinel = getattr(sys.modules.get("unittest.mock"), "DEFAULT", object()) | |
| return len( | |
| [ | |
| p | |
| for p in patchings | |
| if not p.attribute_name | |
| and (p.new is mock_sentinel or p.new is ut_mock_sentinel) | |
| ] | |
| ) | |
| def getfuncargnames( | |
| function: Callable[..., object], | |
| *, | |
| name: str = "", | |
| cls: type | None = None, | |
| ) -> tuple[str, ...]: | |
| """Return the names of a function's mandatory arguments. | |
| Should return the names of all function arguments that: | |
| * Aren't bound to an instance or type as in instance or class methods. | |
| * Don't have default values. | |
| * Aren't bound with functools.partial. | |
| * Aren't replaced with mocks. | |
| The cls arguments indicate that the function should be treated as a bound | |
| method even though it's not unless the function is a static method. | |
| The name parameter should be the original name in which the function was collected. | |
| """ | |
| # TODO(RonnyPfannschmidt): This function should be refactored when we | |
| # revisit fixtures. The fixture mechanism should ask the node for | |
| # the fixture names, and not try to obtain directly from the | |
| # function object well after collection has occurred. | |
| # The parameters attribute of a Signature object contains an | |
| # ordered mapping of parameter names to Parameter instances. This | |
| # creates a tuple of the names of the parameters that don't have | |
| # defaults. | |
| try: | |
| parameters = signature(function).parameters | |
| except (ValueError, TypeError) as e: | |
| from _pytest.outcomes import fail | |
| fail( | |
| f"Could not determine arguments of {function!r}: {e}", | |
| pytrace=False, | |
| ) | |
| arg_names = tuple( | |
| p.name | |
| for p in parameters.values() | |
| if ( | |
| p.kind is Parameter.POSITIONAL_OR_KEYWORD | |
| or p.kind is Parameter.KEYWORD_ONLY | |
| ) | |
| and p.default is Parameter.empty | |
| ) | |
| if not name: | |
| name = function.__name__ | |
| # If this function should be treated as a bound method even though | |
| # it's passed as an unbound method or function, remove the first | |
| # parameter name. | |
| if ( | |
| # Not using `getattr` because we don't want to resolve the staticmethod. | |
| # Not using `cls.__dict__` because we want to check the entire MRO. | |
| cls | |
| and not isinstance( | |
| inspect.getattr_static(cls, name, default=None), staticmethod | |
| ) | |
| ): | |
| arg_names = arg_names[1:] | |
| # Remove any names that will be replaced with mocks. | |
| if hasattr(function, "__wrapped__"): | |
| arg_names = arg_names[num_mock_patch_args(function) :] | |
| return arg_names | |
| def get_default_arg_names(function: Callable[..., Any]) -> tuple[str, ...]: | |
| # Note: this code intentionally mirrors the code at the beginning of | |
| # getfuncargnames, to get the arguments which were excluded from its result | |
| # because they had default values. | |
| return tuple( | |
| p.name | |
| for p in signature(function).parameters.values() | |
| if p.kind in (Parameter.POSITIONAL_OR_KEYWORD, Parameter.KEYWORD_ONLY) | |
| and p.default is not Parameter.empty | |
| ) | |
| _non_printable_ascii_translate_table = { | |
| i: f"\\x{i:02x}" for i in range(128) if i not in range(32, 127) | |
| } | |
| _non_printable_ascii_translate_table.update( | |
| {ord("\t"): "\\t", ord("\r"): "\\r", ord("\n"): "\\n"} | |
| ) | |
| def ascii_escaped(val: bytes | str) -> str: | |
| r"""If val is pure ASCII, return it as an str, otherwise, escape | |
| bytes objects into a sequence of escaped bytes: | |
| b'\xc3\xb4\xc5\xd6' -> r'\xc3\xb4\xc5\xd6' | |
| and escapes strings into a sequence of escaped unicode ids, e.g.: | |
| r'4\nV\U00043efa\x0eMXWB\x1e\u3028\u15fd\xcd\U0007d944' | |
| Note: | |
| The obvious "v.decode('unicode-escape')" will return | |
| valid UTF-8 unicode if it finds them in bytes, but we | |
| want to return escaped bytes for any byte, even if they match | |
| a UTF-8 string. | |
| """ | |
| if isinstance(val, bytes): | |
| ret = val.decode("ascii", "backslashreplace") | |
| else: | |
| ret = val.encode("unicode_escape").decode("ascii") | |
| return ret.translate(_non_printable_ascii_translate_table) | |
| class _PytestWrapper: | |
| """Dummy wrapper around a function object for internal use only. | |
| Used to correctly unwrap the underlying function object when we are | |
| creating fixtures, because we wrap the function object ourselves with a | |
| decorator to issue warnings when the fixture function is called directly. | |
| """ | |
| obj: Any | |
| def get_real_func(obj): | |
| """Get the real function object of the (possibly) wrapped object by | |
| functools.wraps or functools.partial.""" | |
| start_obj = obj | |
| for i in range(100): | |
| # __pytest_wrapped__ is set by @pytest.fixture when wrapping the fixture function | |
| # to trigger a warning if it gets called directly instead of by pytest: we don't | |
| # want to unwrap further than this otherwise we lose useful wrappings like @mock.patch (#3774) | |
| new_obj = getattr(obj, "__pytest_wrapped__", None) | |
| if isinstance(new_obj, _PytestWrapper): | |
| obj = new_obj.obj | |
| break | |
| new_obj = getattr(obj, "__wrapped__", None) | |
| if new_obj is None: | |
| break | |
| obj = new_obj | |
| else: | |
| from _pytest._io.saferepr import saferepr | |
| raise ValueError( | |
| f"could not find real function of {saferepr(start_obj)}\nstopped at {saferepr(obj)}" | |
| ) | |
| if isinstance(obj, functools.partial): | |
| obj = obj.func | |
| return obj | |
| def get_real_method(obj, holder): | |
| """Attempt to obtain the real function object that might be wrapping | |
| ``obj``, while at the same time returning a bound method to ``holder`` if | |
| the original object was a bound method.""" | |
| try: | |
| is_method = hasattr(obj, "__func__") | |
| obj = get_real_func(obj) | |
| except Exception: # pragma: no cover | |
| return obj | |
| if is_method and hasattr(obj, "__get__") and callable(obj.__get__): | |
| obj = obj.__get__(holder) | |
| return obj | |
| def getimfunc(func): | |
| try: | |
| return func.__func__ | |
| except AttributeError: | |
| return func | |
| def safe_getattr(object: Any, name: str, default: Any) -> Any: | |
| """Like getattr but return default upon any Exception or any OutcomeException. | |
| Attribute access can potentially fail for 'evil' Python objects. | |
| See issue #214. | |
| It catches OutcomeException because of #2490 (issue #580), new outcomes | |
| are derived from BaseException instead of Exception (for more details | |
| check #2707). | |
| """ | |
| from _pytest.outcomes import TEST_OUTCOME | |
| try: | |
| return getattr(object, name, default) | |
| except TEST_OUTCOME: | |
| return default | |
| def safe_isclass(obj: object) -> bool: | |
| """Ignore any exception via isinstance on Python 3.""" | |
| try: | |
| return inspect.isclass(obj) | |
| except Exception: | |
| return False | |
| def get_user_id() -> int | None: | |
| """Return the current process's real user id or None if it could not be | |
| determined. | |
| :return: The user id or None if it could not be determined. | |
| """ | |
| # mypy follows the version and platform checking expectation of PEP 484: | |
| # https://mypy.readthedocs.io/en/stable/common_issues.html?highlight=platform#python-version-and-system-platform-checks | |
| # Containment checks are too complex for mypy v1.5.0 and cause failure. | |
| if sys.platform == "win32" or sys.platform == "emscripten": | |
| # win32 does not have a getuid() function. | |
| # Emscripten has a return 0 stub. | |
| return None | |
| else: | |
| # On other platforms, a return value of -1 is assumed to indicate that | |
| # the current process's real user id could not be determined. | |
| ERROR = -1 | |
| uid = os.getuid() | |
| return uid if uid != ERROR else None | |
| # Perform exhaustiveness checking. | |
| # | |
| # Consider this example: | |
| # | |
| # MyUnion = Union[int, str] | |
| # | |
| # def handle(x: MyUnion) -> int { | |
| # if isinstance(x, int): | |
| # return 1 | |
| # elif isinstance(x, str): | |
| # return 2 | |
| # else: | |
| # raise Exception('unreachable') | |
| # | |
| # Now suppose we add a new variant: | |
| # | |
| # MyUnion = Union[int, str, bytes] | |
| # | |
| # After doing this, we must remember ourselves to go and update the handle | |
| # function to handle the new variant. | |
| # | |
| # With `assert_never` we can do better: | |
| # | |
| # // raise Exception('unreachable') | |
| # return assert_never(x) | |
| # | |
| # Now, if we forget to handle the new variant, the type-checker will emit a | |
| # compile-time error, instead of the runtime error we would have gotten | |
| # previously. | |
| # | |
| # This also work for Enums (if you use `is` to compare) and Literals. | |
| def assert_never(value: NoReturn) -> NoReturn: | |
| assert False, f"Unhandled value: {value} ({type(value).__name__})" | |