|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import abc |
|
|
import sys |
|
|
from Crypto.Util.py3compat import byte_string |
|
|
from Crypto.Util._file_system import pycryptodome_filename |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Collect the filename suffixes used by compiled extension modules.
if sys.version_info[0] >= 3:
    # Python 3: importlib publishes the list directly.
    from importlib import machinery
    extension_suffixes = machinery.EXTENSION_SUFFIXES
else:
    # Python 2: keep only the suffixes that imp reports for C extensions.
    import imp
    extension_suffixes = [ext for ext, mod, typ in imp.get_suffixes()
                          if typ == imp.C_EXTENSION]
|
|
|
|
|
|
|
|
# Types that the c_uint8_ptr() helpers treat as buffer objects.
_buffer_type = (bytearray, memoryview)
|
|
|
|
|
|
|
|
class _VoidPointer(object):
    """Interface shared by the backend-specific void pointer wrappers.

    Both methods are placeholders that return None; the concrete
    backend classes override them.
    """

    @abc.abstractmethod
    def get(self):
        """Return the memory location we point to"""
        return None

    @abc.abstractmethod
    def address_of(self):
        """Return a raw pointer to this pointer"""
        return None
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Refuse the CFFI backend when running a non-PyPy interpreter with -OO;
# the ImportError selects the fallback backend of the enclosing try block.
_on_pypy = '__pypy__' in sys.builtin_module_names
if not _on_pypy and sys.flags.optimize == 2:
    raise ImportError("CFFI with optimize=2 fails due to pycparser bug.")

from cffi import FFI

# Single FFI instance shared by every helper of this backend.
ffi = FFI()
null_pointer = ffi.NULL
# C type object for "const uint8_t*", used as a cast target.
uint8_t_type = ffi.typeof(ffi.new("const uint8_t*"))

# Base classes of a CFFI array instance, used for isinstance() checks.
_Array = ffi.new("uint8_t[1]").__class__.__bases__
|
|
|
|
|
def load_lib(name, cdecl):
    """Open a shared library with CFFI and register its declarations.

    @name, either an absolute path or the name of a library
           in the system search path.

    @cdecl, the C function declarations.

    RTLD_DEEPBIND is passed to dlopen when the FFI object exposes it and
    the environment variable PYCRYPTODOME_DISABLE_DEEPBIND is unset or empty.
    """

    use_deepbind = (hasattr(ffi, "RTLD_DEEPBIND") and
                    not os.getenv('PYCRYPTODOME_DISABLE_DEEPBIND'))
    if use_deepbind:
        dlopen_args = (name, ffi.RTLD_DEEPBIND)
    else:
        dlopen_args = (name,)
    lib = ffi.dlopen(*dlopen_args)
    ffi.cdef(cdecl)
    return lib
|
|
|
|
|
def c_ulong(x):
    """Convert a Python integer to unsigned long.

    In this backend the value is handed back unchanged.
    """
    return x


# The other integer converters are the same pass-through function.
c_ubyte = c_ulong
c_uint = c_ulong
c_ulonglong = c_ulong
|
|
|
|
|
def c_size_t(x):
    """Convert a Python integer to size_t.

    In this backend the value is handed back unchanged.
    """
    value = x
    return value
|
|
|
|
|
def create_string_buffer(init_or_size, size=None):
    """Allocate the given amount of bytes (initially set to 0).

    @init_or_size, either a byte string whose content is copied to the
                   start of the new buffer, or the number of bytes
                   to allocate.

    @size, optional minimum length of the buffer; only meaningful when
           init_or_size is a byte string.

    Return a CFFI "uint8_t[]" array. With a byte string, the array is at
    least one byte longer than the string. Raise ValueError when a size
    is passed both as init_or_size and as size.
    """

    if isinstance(init_or_size, bytes):
        init_len = len(init_or_size)
        length = init_len + 1
        # size defaults to None, which cannot be compared to an int
        # on Python 3, so only take it into account when it was given.
        if size is not None:
            length = max(length, size)
        result = ffi.new("uint8_t[]", length)
        # Copy exactly init_len bytes; the rest of the array is left
        # as allocated by ffi.new().
        ffi.buffer(result)[0:init_len] = init_or_size
    else:
        if size:
            raise ValueError("Size must be specified once only")
        result = ffi.new("uint8_t[]", init_or_size)
    return result
|
|
|
|
|
def get_c_string(c_string):
    """Convert a C string into a Python byte sequence.

    The conversion is delegated to ffi.string().
    """
    as_bytes = ffi.string(c_string)
    return as_bytes
|
|
|
|
|
def get_raw_buffer(buf):
    """Convert a C buffer into a Python byte sequence.

    The whole content of ffi.buffer(buf) is returned.
    """
    view = ffi.buffer(buf)
    return view[:]
|
|
|
|
|
def c_uint8_ptr(data):
    """Return an object that can be passed to C code as a uint8_t pointer.

    bytearray and memoryview objects are wrapped with ffi.from_buffer()
    and cast to uint8_t_type; byte strings and CFFI arrays are returned
    unchanged. Any other type raises TypeError.
    """
    if isinstance(data, _buffer_type):
        return ffi.cast(uint8_t_type, ffi.from_buffer(data))
    if byte_string(data) or isinstance(data, _Array):
        return data
    raise TypeError("Object type %s cannot be passed to C code" % type(data))
|
|
|
|
|
class VoidPointer_cffi(_VoidPointer):
    """Model a newly allocated pointer to void"""

    def __init__(self):
        # One-element array of "void *": it is both the storage of the
        # pointer and the address handed out by address_of().
        self._pp = ffi.new("void *[1]")

    def get(self):
        """Return the pointer currently stored in the slot."""
        slot = self._pp
        return slot[0]

    def address_of(self):
        """Return the one-element array holding the pointer."""
        return self._pp
|
|
|
|
|
def VoidPointer():
    """Create a new void pointer wrapper for the CFFI backend."""
    pointer = VoidPointer_cffi()
    return pointer


# Name of the backend in use.
backend = "cffi"
|
|
|
|
|
except ImportError: |
|
|
|
|
|
import ctypes |
|
|
from ctypes import (CDLL, c_void_p, byref, c_ulong, c_ulonglong, c_size_t, |
|
|
create_string_buffer, c_ubyte, c_uint) |
|
|
from ctypes.util import find_library |
|
|
from ctypes import Array as _Array |
|
|
|
|
|
# Value used for a NULL pointer in this backend.
null_pointer = None
# Filled by load_lib() on first use with the result of platform.architecture().
cached_architecture = []
|
|
|
|
|
def c_ubyte(c):
    """Convert a Python integer to a ctypes unsigned byte.

    Raise OverflowError unless 0 <= c < 256.
    """
    fits_in_byte = 0 <= c < 256
    if not fits_in_byte:
        raise OverflowError()
    return ctypes.c_ubyte(c)
|
|
|
|
|
def load_lib(name, cdecl):
    """Load a shared library with ctypes and return its handle.

    @name, either a file name/path (anything containing a dot) or a bare
           library name to be resolved with find_library().

    @cdecl, the C function declarations; not used by this backend.

    Raise OSError when a bare library name cannot be resolved.
    """
    if not cached_architecture:
        # Query the platform only once; the module-level list keeps the answer.
        import platform
        cached_architecture[:] = platform.architecture()
    _, linkage = cached_architecture

    # Bare names are looked up first, except when linkage starts with "Win".
    needs_lookup = "." not in name and not linkage.startswith("Win")
    if not needs_lookup:
        return CDLL(name)

    resolved = find_library(name)
    if resolved is None:
        raise OSError("Cannot load library '%s'" % name)
    return CDLL(resolved)
|
|
|
|
|
def get_c_string(c_string):
    """Return the .value attribute of the given ctypes string buffer."""
    value = c_string.value
    return value
|
|
|
|
|
def get_raw_buffer(buf):
    """Return the .raw attribute of the given ctypes buffer."""
    raw = buf.raw
    return raw
|
|
|
|
|
|
|
|
|
|
|
# Short aliases for the ctypes types used by the buffer-protocol glue.
_py_object = ctypes.py_object
_c_ssize_t = ctypes.c_ssize_t
_c_ssize_p = ctypes.POINTER(_c_ssize_t)

# Flag value passed to PyObject_GetBuffer by c_uint8_ptr().
_PyBUF_SIMPLE = 0

# Entry points of the C-level buffer API, reached through ctypes.pythonapi.
_PyObject_GetBuffer = ctypes.pythonapi.PyObject_GetBuffer
_PyBuffer_Release = ctypes.pythonapi.PyBuffer_Release
|
|
|
|
|
|
|
|
|
|
|
class _Py_buffer(ctypes.Structure):
    """ctypes structure filled in by PyObject_GetBuffer.

    The field order is significant. On Python 2 an extra 'smalltable'
    member is placed just before 'internal'.
    """

    _fields_ = [
        ('buf', c_void_p),
        ('obj', ctypes.py_object),
        ('len', _c_ssize_t),
        ('itemsize', _c_ssize_t),
        ('readonly', ctypes.c_int),
        ('ndim', ctypes.c_int),
        ('format', ctypes.c_char_p),
        ('shape', _c_ssize_p),
        ('strides', _c_ssize_p),
        ('suboffsets', _c_ssize_p),
    ]

    # Python 2 only: additional member ahead of the trailing pointer.
    if sys.version_info[0] == 2:
        _fields_.append(('smalltable', _c_ssize_t * 2))

    _fields_.append(('internal', c_void_p))
|
|
|
|
|
def c_uint8_ptr(data):
    """Return an object that ctypes can pass to C code as a byte pointer.

    Byte strings and ctypes arrays are returned unchanged. For bytearray
    and memoryview objects, a ctypes array of unsigned bytes is created
    at the address reported by PyObject_GetBuffer. Any other type raises
    TypeError.
    """
    if byte_string(data) or isinstance(data, _Array):
        return data
    if not isinstance(data, _buffer_type):
        raise TypeError("Object type %s cannot be passed to C code" % type(data))

    wrapped = _py_object(data)
    view = _Py_buffer()
    _PyObject_GetBuffer(wrapped, byref(view), _PyBUF_SIMPLE)
    try:
        # Array type as long as the exported buffer, mapped onto its memory.
        array_type = ctypes.c_ubyte * view.len
        return array_type.from_address(view.buf)
    finally:
        # The view is always released, including on the successful return.
        _PyBuffer_Release(byref(view))
|
|
|
|
|
|
|
|
|
|
|
class VoidPointer_ctypes(_VoidPointer):
    """Model a newly allocated pointer to void"""

    def __init__(self):
        # The c_void_p instance is created without an initial value.
        self._p = c_void_p()

    def get(self):
        """Return the wrapped c_void_p object."""
        pointer = self._p
        return pointer

    def address_of(self):
        """Return a byref() reference to the wrapped c_void_p object."""
        return byref(self._p)
|
|
|
|
|
def VoidPointer():
    """Create a new void pointer wrapper for the ctypes backend."""
    pointer = VoidPointer_ctypes()
    return pointer


# Name of the backend in use.
backend = "ctypes"
|
|
|
|
|
|
|
|
class SmartPointer(object):
    """Class to hold a non-managed piece of memory"""

    def __init__(self, raw_pointer, destructor):
        # destructor is called with the raw pointer when the object is
        # finalized, unless the pointer was released before.
        self._destructor = destructor
        self._raw_pointer = raw_pointer

    def get(self):
        """Return the raw pointer (None once released or destroyed)."""
        return self._raw_pointer

    def release(self):
        """Give up ownership: return the raw pointer and forget it."""
        pointer = self._raw_pointer
        self._raw_pointer = None
        return pointer

    def __del__(self):
        # AttributeError is ignored, whether it comes from a missing
        # attribute on self or from the destructor call itself.
        try:
            pointer = self._raw_pointer
            if pointer is None:
                return
            self._destructor(pointer)
            self._raw_pointer = None
        except AttributeError:
            pass
|
|
|
|
|
|
|
|
def load_pycryptodome_raw_lib(name, cdecl):
    """Locate and load the native library behind a PyCryptodome module.

    @name, the name of the library expressed as a PyCryptodome module,
           for instance Crypto.Cipher._raw_cbc.

    @cdecl, the C function declarations.

    Every suffix in extension_suffixes is tried in order and the first
    file that exists and loads is returned. If none succeeds, OSError is
    raised with a message that lists each failed attempt.
    """

    components = name.split(".")
    basename = components[-1]
    dir_comps = components[:-1]

    attempts = []
    for ext in extension_suffixes:
        filename = basename + ext
        try:
            full_name = pycryptodome_filename(dir_comps, filename)
            if os.path.isfile(full_name):
                return load_lib(full_name, cdecl)
            attempts.append("Not found '%s'" % filename)
        except OSError as exp:
            attempts.append("Cannot load '%s': %s" % (filename, str(exp)))
    raise OSError("Cannot load native module '%s': %s" % (name, ", ".join(attempts)))
|
|
|
|
|
|
|
|
def is_buffer(x):
    """Return True if object x supports the buffer interface.

    Only bytes, bytearray and memoryview instances are recognized.
    """
    accepted = (bytes, bytearray, memoryview)
    return isinstance(x, accepted)
|
|
|
|
|
|
|
|
def is_writeable_buffer(x):
    """Return True if object x is a buffer that can be written to.

    That is the case for a bytearray and for a memoryview that is not
    read-only.
    """
    if isinstance(x, bytearray):
        return True
    if isinstance(x, memoryview):
        return not x.readonly
    return False
|
|
|