| | import os |
| | import sys |
| | import warnings |
| | from typing import Any, Callable, NoReturn, Type, Union |
| |
|
| | from cryptography.hazmat.bindings.openssl.binding import Binding |
| |
|
| | StrOrBytesPath = Union[str, bytes, os.PathLike] |
| |
|
| | binding = Binding() |
| | ffi = binding.ffi |
| | lib = binding.lib |
| |
|
| |
|
| | |
| | |
| | |
| | no_zero_allocator = ffi.new_allocator(should_clear_after_alloc=False) |
| |
|
| |
|
| | def text(charp: Any) -> str: |
| | """ |
| | Get a native string type representing of the given CFFI ``char*`` object. |
| | |
| | :param charp: A C-style string represented using CFFI. |
| | |
| | :return: :class:`str` |
| | """ |
| | if not charp: |
| | return "" |
| | return ffi.string(charp).decode("utf-8") |
| |
|
| |
|
| | def exception_from_error_queue(exception_type: Type[Exception]) -> NoReturn: |
| | """ |
| | Convert an OpenSSL library failure into a Python exception. |
| | |
| | When a call to the native OpenSSL library fails, this is usually signalled |
| | by the return value, and an error code is stored in an error queue |
| | associated with the current thread. The err library provides functions to |
| | obtain these error codes and textual error messages. |
| | """ |
| | errors = [] |
| |
|
| | while True: |
| | error = lib.ERR_get_error() |
| | if error == 0: |
| | break |
| | errors.append( |
| | ( |
| | text(lib.ERR_lib_error_string(error)), |
| | text(lib.ERR_func_error_string(error)), |
| | text(lib.ERR_reason_error_string(error)), |
| | ) |
| | ) |
| |
|
| | raise exception_type(errors) |
| |
|
| |
|
| | def make_assert(error: Type[Exception]) -> Callable[[bool], Any]: |
| | """ |
| | Create an assert function that uses :func:`exception_from_error_queue` to |
| | raise an exception wrapped by *error*. |
| | """ |
| |
|
| | def openssl_assert(ok: bool) -> None: |
| | """ |
| | If *ok* is not True, retrieve the error from OpenSSL and raise it. |
| | """ |
| | if ok is not True: |
| | exception_from_error_queue(error) |
| |
|
| | return openssl_assert |
| |
|
| |
|
| | def path_bytes(s: StrOrBytesPath) -> bytes: |
| | """ |
| | Convert a Python path to a :py:class:`bytes` for the path which can be |
| | passed into an OpenSSL API accepting a filename. |
| | |
| | :param s: A path (valid for os.fspath). |
| | |
| | :return: An instance of :py:class:`bytes`. |
| | """ |
| | b = os.fspath(s) |
| |
|
| | if isinstance(b, str): |
| | return b.encode(sys.getfilesystemencoding()) |
| | else: |
| | return b |
| |
|
| |
|
| | def byte_string(s: str) -> bytes: |
| | return s.encode("charmap") |
| |
|
| |
|
| | |
| | |
| | UNSPECIFIED = object() |
| |
|
| | _TEXT_WARNING = "str for {0} is no longer accepted, use bytes" |
| |
|
| |
|
| | def text_to_bytes_and_warn(label: str, obj: Any) -> Any: |
| | """ |
| | If ``obj`` is text, emit a warning that it should be bytes instead and try |
| | to convert it to bytes automatically. |
| | |
| | :param str label: The name of the parameter from which ``obj`` was taken |
| | (so a developer can easily find the source of the problem and correct |
| | it). |
| | |
| | :return: If ``obj`` is the text string type, a ``bytes`` object giving the |
| | UTF-8 encoding of that text is returned. Otherwise, ``obj`` itself is |
| | returned. |
| | """ |
| | if isinstance(obj, str): |
| | warnings.warn( |
| | _TEXT_WARNING.format(label), |
| | category=DeprecationWarning, |
| | stacklevel=3, |
| | ) |
| | return obj.encode("utf-8") |
| | return obj |
| |
|