Spaces:
Runtime error
Runtime error
| # =================================================================== | |
| # | |
| # Copyright (c) 2014, Legrandin <helderijs@gmail.com> | |
| # All rights reserved. | |
| # | |
| # Redistribution and use in source and binary forms, with or without | |
| # modification, are permitted provided that the following conditions | |
| # are met: | |
| # | |
| # 1. Redistributions of source code must retain the above copyright | |
| # notice, this list of conditions and the following disclaimer. | |
| # 2. Redistributions in binary form must reproduce the above copyright | |
| # notice, this list of conditions and the following disclaimer in | |
| # the documentation and/or other materials provided with the | |
| # distribution. | |
| # | |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS | |
| # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE | |
| # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
| # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, | |
| # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
| # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | |
| # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
| # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN | |
| # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
| # POSSIBILITY OF SUCH DAMAGE. | |
| # =================================================================== | |
| import os | |
| import abc | |
| import sys | |
| from Crypto.Util.py3compat import byte_string | |
| from Crypto.Util._file_system import pycryptodome_filename | |
| # | |
| # List of file suffixes for Python extensions | |
| # | |
| if sys.version_info[0] < 3: | |
| import imp | |
| extension_suffixes = [] | |
| for ext, mod, typ in imp.get_suffixes(): | |
| if typ == imp.C_EXTENSION: | |
| extension_suffixes.append(ext) | |
| else: | |
| from importlib import machinery | |
| extension_suffixes = machinery.EXTENSION_SUFFIXES | |
| # Which types with buffer interface we support (apart from byte strings) | |
| _buffer_type = (bytearray, memoryview) | |
| class _VoidPointer(object): | |
| def get(self): | |
| """Return the memory location we point to""" | |
| return | |
| def address_of(self): | |
| """Return a raw pointer to this pointer""" | |
| return | |
| try: | |
| # Starting from v2.18, pycparser (used by cffi for in-line ABI mode) | |
| # stops working correctly when PYOPTIMIZE==2 or the parameter -OO is | |
| # passed. In that case, we fall back to ctypes. | |
| # Note that PyPy ships with an old version of pycparser so we can keep | |
| # using cffi there. | |
| # See https://github.com/Legrandin/pycryptodome/issues/228 | |
| if '__pypy__' not in sys.builtin_module_names and sys.flags.optimize == 2: | |
| raise ImportError("CFFI with optimize=2 fails due to pycparser bug.") | |
| from cffi import FFI | |
| ffi = FFI() | |
| null_pointer = ffi.NULL | |
| uint8_t_type = ffi.typeof(ffi.new("const uint8_t*")) | |
| _Array = ffi.new("uint8_t[1]").__class__.__bases__ | |
| def load_lib(name, cdecl): | |
| """Load a shared library and return a handle to it. | |
| @name, either an absolute path or the name of a library | |
| in the system search path. | |
| @cdecl, the C function declarations. | |
| """ | |
| if hasattr(ffi, "RTLD_DEEPBIND") and not os.getenv('PYCRYPTODOME_DISABLE_DEEPBIND'): | |
| lib = ffi.dlopen(name, ffi.RTLD_DEEPBIND) | |
| else: | |
| lib = ffi.dlopen(name) | |
| ffi.cdef(cdecl) | |
| return lib | |
| def c_ulong(x): | |
| """Convert a Python integer to unsigned long""" | |
| return x | |
| c_ulonglong = c_ulong | |
| c_uint = c_ulong | |
| c_ubyte = c_ulong | |
| def c_size_t(x): | |
| """Convert a Python integer to size_t""" | |
| return x | |
| def create_string_buffer(init_or_size, size=None): | |
| """Allocate the given amount of bytes (initially set to 0)""" | |
| if isinstance(init_or_size, bytes): | |
| size = max(len(init_or_size) + 1, size) | |
| result = ffi.new("uint8_t[]", size) | |
| result[:] = init_or_size | |
| else: | |
| if size: | |
| raise ValueError("Size must be specified once only") | |
| result = ffi.new("uint8_t[]", init_or_size) | |
| return result | |
| def get_c_string(c_string): | |
| """Convert a C string into a Python byte sequence""" | |
| return ffi.string(c_string) | |
| def get_raw_buffer(buf): | |
| """Convert a C buffer into a Python byte sequence""" | |
| return ffi.buffer(buf)[:] | |
| def c_uint8_ptr(data): | |
| if isinstance(data, _buffer_type): | |
| # This only works for cffi >= 1.7 | |
| return ffi.cast(uint8_t_type, ffi.from_buffer(data)) | |
| elif byte_string(data) or isinstance(data, _Array): | |
| return data | |
| else: | |
| raise TypeError("Object type %s cannot be passed to C code" % type(data)) | |
| class VoidPointer_cffi(_VoidPointer): | |
| """Model a newly allocated pointer to void""" | |
| def __init__(self): | |
| self._pp = ffi.new("void *[1]") | |
| def get(self): | |
| return self._pp[0] | |
| def address_of(self): | |
| return self._pp | |
| def VoidPointer(): | |
| return VoidPointer_cffi() | |
| backend = "cffi" | |
| except ImportError: | |
| import ctypes | |
| from ctypes import (CDLL, c_void_p, byref, c_ulong, c_ulonglong, c_size_t, | |
| create_string_buffer, c_ubyte, c_uint) | |
| from ctypes.util import find_library | |
| from ctypes import Array as _Array | |
| null_pointer = None | |
| cached_architecture = [] | |
| def c_ubyte(c): | |
| if not (0 <= c < 256): | |
| raise OverflowError() | |
| return ctypes.c_ubyte(c) | |
| def load_lib(name, cdecl): | |
| if not cached_architecture: | |
| # platform.architecture() creates a subprocess, so caching the | |
| # result makes successive imports faster. | |
| import platform | |
| cached_architecture[:] = platform.architecture() | |
| bits, linkage = cached_architecture | |
| if "." not in name and not linkage.startswith("Win"): | |
| full_name = find_library(name) | |
| if full_name is None: | |
| raise OSError("Cannot load library '%s'" % name) | |
| name = full_name | |
| return CDLL(name) | |
| def get_c_string(c_string): | |
| return c_string.value | |
| def get_raw_buffer(buf): | |
| return buf.raw | |
| # ---- Get raw pointer --- | |
| _c_ssize_t = ctypes.c_ssize_t | |
| _PyBUF_SIMPLE = 0 | |
| _PyObject_GetBuffer = ctypes.pythonapi.PyObject_GetBuffer | |
| _PyBuffer_Release = ctypes.pythonapi.PyBuffer_Release | |
| _py_object = ctypes.py_object | |
| _c_ssize_p = ctypes.POINTER(_c_ssize_t) | |
| # See Include/object.h for CPython | |
| # and https://github.com/pallets/click/blob/master/src/click/_winconsole.py | |
| class _Py_buffer(ctypes.Structure): | |
| _fields_ = [ | |
| ('buf', c_void_p), | |
| ('obj', ctypes.py_object), | |
| ('len', _c_ssize_t), | |
| ('itemsize', _c_ssize_t), | |
| ('readonly', ctypes.c_int), | |
| ('ndim', ctypes.c_int), | |
| ('format', ctypes.c_char_p), | |
| ('shape', _c_ssize_p), | |
| ('strides', _c_ssize_p), | |
| ('suboffsets', _c_ssize_p), | |
| ('internal', c_void_p) | |
| ] | |
| # Extra field for CPython 2.6/2.7 | |
| if sys.version_info[0] == 2: | |
| _fields_.insert(-1, ('smalltable', _c_ssize_t * 2)) | |
| def c_uint8_ptr(data): | |
| if byte_string(data) or isinstance(data, _Array): | |
| return data | |
| elif isinstance(data, _buffer_type): | |
| obj = _py_object(data) | |
| buf = _Py_buffer() | |
| _PyObject_GetBuffer(obj, byref(buf), _PyBUF_SIMPLE) | |
| try: | |
| buffer_type = ctypes.c_ubyte * buf.len | |
| return buffer_type.from_address(buf.buf) | |
| finally: | |
| _PyBuffer_Release(byref(buf)) | |
| else: | |
| raise TypeError("Object type %s cannot be passed to C code" % type(data)) | |
| # --- | |
| class VoidPointer_ctypes(_VoidPointer): | |
| """Model a newly allocated pointer to void""" | |
| def __init__(self): | |
| self._p = c_void_p() | |
| def get(self): | |
| return self._p | |
| def address_of(self): | |
| return byref(self._p) | |
| def VoidPointer(): | |
| return VoidPointer_ctypes() | |
| backend = "ctypes" | |
| class SmartPointer(object): | |
| """Class to hold a non-managed piece of memory""" | |
| def __init__(self, raw_pointer, destructor): | |
| self._raw_pointer = raw_pointer | |
| self._destructor = destructor | |
| def get(self): | |
| return self._raw_pointer | |
| def release(self): | |
| rp, self._raw_pointer = self._raw_pointer, None | |
| return rp | |
| def __del__(self): | |
| try: | |
| if self._raw_pointer is not None: | |
| self._destructor(self._raw_pointer) | |
| self._raw_pointer = None | |
| except AttributeError: | |
| pass | |
| def load_pycryptodome_raw_lib(name, cdecl): | |
| """Load a shared library and return a handle to it. | |
| @name, the name of the library expressed as a PyCryptodome module, | |
| for instance Crypto.Cipher._raw_cbc. | |
| @cdecl, the C function declarations. | |
| """ | |
| split = name.split(".") | |
| dir_comps, basename = split[:-1], split[-1] | |
| attempts = [] | |
| for ext in extension_suffixes: | |
| try: | |
| filename = basename + ext | |
| full_name = pycryptodome_filename(dir_comps, filename) | |
| if not os.path.isfile(full_name): | |
| attempts.append("Not found '%s'" % filename) | |
| continue | |
| return load_lib(full_name, cdecl) | |
| except OSError as exp: | |
| attempts.append("Cannot load '%s': %s" % (filename, str(exp))) | |
| raise OSError("Cannot load native module '%s': %s" % (name, ", ".join(attempts))) | |
| def is_buffer(x): | |
| """Return True if object x supports the buffer interface""" | |
| return isinstance(x, (bytes, bytearray, memoryview)) | |
| def is_writeable_buffer(x): | |
| return (isinstance(x, bytearray) or | |
| (isinstance(x, memoryview) and not x.readonly)) | |