| | import ctypes, ctypes.util, operator, sys |
| | from . import model |
| |
|
# Python 2/3 compatibility: on Python 3 recreate the Python 2 names used in
# this module; `bytechr` turns an integer into a one-byte string.
if sys.version_info >= (3,):
    unicode = str
    long = int
    xrange = range

    def bytechr(num):
        return bytes([num])
else:
    bytechr = chr
| |
|
class CTypesType(type):
    """Type object declared as the ``__metaclass__`` of ``CTypesData``."""
| |
|
class CTypesData(object):
    """Base class of every cdata class produced by the ctypes backend.

    The methods defined here are fallbacks: most of them raise
    ``TypeError`` (or ``NotImplementedError``) and are overridden by the
    subclasses that support the corresponding operation.  Subclasses are
    expected to provide ``_reftypename`` (the C type name, with ``' &'``
    marking where a declarator is inserted) and usually ``_ctype``.
    """
    __metaclass__ = CTypesType   # only takes effect on Python 2
    __slots__ = ['__weakref__']
    __name__ = '<cdata>'

    def __init__(self, *args):
        """Always fail: the generic base cannot be instantiated."""
        raise TypeError("cannot instantiate %r" % (self.__class__,))

    @classmethod
    def _newp(cls, init):
        """Fallback for allocation: only pointer/array classes override it."""
        raise TypeError("expected a pointer or array ctype, got '%s'"
                        % (cls._get_c_name(),))

    @staticmethod
    def _to_ctypes(value):
        """Convert a Python-level value to its ctypes form (not supported here)."""
        raise TypeError

    @classmethod
    def _arg_to_ctypes(cls, *value):
        """Return an instance of ``cls._ctype`` built from at most one value.

        With no argument a default-constructed ctypes object is returned.
        Raises ``TypeError`` if the class has no ``_ctype``.
        """
        try:
            ctype = cls._ctype
        except AttributeError:
            raise TypeError("cannot create an instance of %r" % (cls,))
        if value:
            res = cls._to_ctypes(*value)
            # _to_ctypes() may return a plain Python value; wrap it if so.
            if not isinstance(res, ctype):
                res = cls._ctype(res)
        else:
            res = cls._ctype()
        return res

    @classmethod
    def _create_ctype_obj(cls, init):
        """Create the ctypes object for ``init`` (``None`` means default)."""
        if init is None:
            return cls._arg_to_ctypes()
        else:
            return cls._arg_to_ctypes(init)

    @staticmethod
    def _from_ctypes(ctypes_value):
        """Convert a ctypes value to its Python-level form (not supported here)."""
        raise TypeError

    @classmethod
    def _get_c_name(cls, replace_with=''):
        """Return the C type name with the ``' &'`` marker replaced."""
        return cls._reftypename.replace(' &', replace_with)

    @classmethod
    def _fix_class(cls):
        """Give the class a readable name, ``CData<c type>``, in module 'ffi'."""
        cls.__name__ = 'CData<%s>' % (cls._get_c_name(),)
        cls.__qualname__ = 'CData<%s>' % (cls._get_c_name(),)
        cls.__module__ = 'ffi'

    def _get_own_repr(self):
        """Return the instance-specific part of the repr (subclass hook)."""
        raise NotImplementedError

    def _addr_repr(self, address):
        """Format an address as ``'NULL'`` or unsigned hexadecimal."""
        if address == 0:
            return 'NULL'
        else:
            if address < 0:
                # wrap a negative value to the unsigned pointer range
                address += 1 << (8*ctypes.sizeof(ctypes.c_void_p))
            return '0x%x' % address

    def __repr__(self, c_name=None):
        own = self._get_own_repr()
        return '<cdata %r %s>' % (c_name or self._get_c_name(), own)

    def _convert_to_address(self, BClass):
        """Fallback: this cdata cannot be converted to an address.

        ``BClass`` is the target pointer class, or ``None`` when a raw
        address is requested; it only selects the error message here.
        """
        if BClass is None:
            raise TypeError("cannot convert %r to an address" % (
                self._get_c_name(),))
        else:
            raise TypeError("cannot convert %r to %r" % (
                self._get_c_name(), BClass._get_c_name()))

    @classmethod
    def _get_size(cls):
        """Return ``ctypes.sizeof`` of the class's ctypes type."""
        return ctypes.sizeof(cls._ctype)

    def _get_size_of_instance(self):
        """Return ``ctypes.sizeof`` of this instance's ctypes type."""
        return ctypes.sizeof(self._ctype)

    @classmethod
    def _cast_from(cls, source):
        """Fallback for casts: not supported by the generic base."""
        raise TypeError("cannot cast to %r" % (cls._get_c_name(),))

    def _cast_to_integer(self):
        """Return the integer value of this cdata (its address by default)."""
        return self._convert_to_address(None)

    @classmethod
    def _alignment(cls):
        """Return ``ctypes.alignment`` of the class's ctypes type."""
        return ctypes.alignment(cls._ctype)

    def __iter__(self):
        # Defined explicitly so that iteration fails with a clear message.
        raise TypeError("cdata %r does not support iteration" % (
            self._get_c_name(),))

    def _make_cmp(name):
        """Build the rich-comparison method called ``name``.

        Two non-primitive cdata are compared by address; mixing a
        primitive with a non-primitive yields ``NotImplemented``;
        otherwise primitive cdata are unwrapped to ``_value`` first.
        """
        cmpfunc = getattr(operator, name)
        def cmp(self, other):
            v_is_ptr = not isinstance(self, CTypesGenericPrimitive)
            w_is_ptr = (isinstance(other, CTypesData) and
                           not isinstance(other, CTypesGenericPrimitive))
            if v_is_ptr and w_is_ptr:
                return cmpfunc(self._convert_to_address(None),
                               other._convert_to_address(None))
            elif v_is_ptr or w_is_ptr:
                return NotImplemented
            else:
                if isinstance(self, CTypesGenericPrimitive):
                    self = self._value
                if isinstance(other, CTypesGenericPrimitive):
                    other = other._value
                return cmpfunc(self, other)
        # `__name__` works on both Python 2 and 3 (`func_name` is 2-only).
        cmp.__name__ = name
        return cmp

    __eq__ = _make_cmp('__eq__')
    __ne__ = _make_cmp('__ne__')
    __lt__ = _make_cmp('__lt__')
    __le__ = _make_cmp('__le__')
    __gt__ = _make_cmp('__gt__')
    __ge__ = _make_cmp('__ge__')

    def __hash__(self):
        return hash(self._convert_to_address(None))

    def _to_string(self, maxlen):
        """Fallback for string(): not supported by the generic base."""
        raise TypeError("string(): %r" % (self,))
| |
|
| |
|
class CTypesGenericPrimitive(CTypesData):
    """Base of primitive cdata; the methods here read ``self._value``."""
    __slots__ = []

    def _get_own_repr(self):
        converted = self._from_ctypes(self._value)
        return repr(converted)

    def __hash__(self):
        return hash(self._value)
| |
|
| |
|
class CTypesGenericArray(CTypesData):
    """Base of array cdata; subclasses supply ``_blob``, ``__len__`` and
    ``__getitem__``."""
    __slots__ = []

    @classmethod
    def _newp(cls, init):
        # Allocating an array is just instantiating the array class.
        return cls(init)

    def _get_own_repr(self):
        blob_address = ctypes.addressof(self._blob)
        return self._addr_repr(blob_address)

    def __iter__(self):
        # Yield items by index, from 0 up to the length.
        for index in xrange(len(self)):
            yield self[index]
| |
|
| |
|
class CTypesGenericPtr(CTypesData):
    """Base of pointer-like cdata: an integer ``_address`` plus the
    corresponding ctypes pointer object ``_as_ctype_ptr``."""
    __slots__ = ['_address', '_as_ctype_ptr']
    _automatic_casts = False
    kind = "pointer"

    @classmethod
    def _newp(cls, init):
        return cls(init)

    @classmethod
    def _new_pointer_at(cls, address):
        """Build an instance pointing at ``address`` without calling __init__."""
        ptr = cls.__new__(cls)
        ptr._address = address
        ptr._as_ctype_ptr = ctypes.cast(address, cls._ctype)
        return ptr

    @classmethod
    def _cast_from(cls, source):
        """Cast ``None``, another cdata or an integer to this pointer type."""
        if source is None:
            return cls._new_pointer_at(0)
        if isinstance(source, CTypesData):
            return cls._new_pointer_at(source._cast_to_integer())
        if isinstance(source, (int, long)):
            return cls._new_pointer_at(source)
        raise TypeError("bad type for cast to %r: %r" %
                        (cls, type(source).__name__))

    def _get_own_repr(self):
        try:
            return self._addr_repr(self._address)
        except AttributeError:
            # e.g. `_address` was never assigned on this instance
            return '???'

    def _cast_to_integer(self):
        return self._address

    def __nonzero__(self):
        return bool(self._address)
    __bool__ = __nonzero__

    @classmethod
    def _to_ctypes(cls, value):
        """Convert a cdata to a ctypes pointer of this class's ``_ctype``."""
        if isinstance(value, CTypesData):
            target = value._convert_to_address(cls)
            return ctypes.cast(target, cls._ctype)
        raise TypeError("unexpected %s object" % type(value).__name__)

    @classmethod
    def _from_ctypes(cls, ctypes_ptr):
        """Wrap a ctypes pointer; a NULL pointer becomes address 0."""
        as_void_p = ctypes.cast(ctypes_ptr, ctypes.c_void_p)
        return cls._new_pointer_at(as_void_p.value or 0)

    @classmethod
    def _initialize(cls, ctypes_ptr, value):
        # A false value leaves the pointer untouched.
        if value:
            ctypes_ptr.contents = cls._to_ctypes(value).contents

    def _convert_to_address(self, BClass):
        """Return the address if conversion to ``BClass`` is allowed.

        Allowed when ``BClass`` is this very class or ``None``, or when
        either side has ``_automatic_casts`` set; otherwise defer to the
        base class, which raises ``TypeError``.
        """
        same_or_untyped = BClass in (self.__class__, None)
        if (same_or_untyped or BClass._automatic_casts
                or self._automatic_casts):
            return self._address
        return CTypesData._convert_to_address(self, BClass)
| |
|
| |
|
class CTypesBaseStructOrUnion(CTypesData):
    """Base of struct/union cdata; the ctypes object lives in ``_blob``."""
    __slots__ = ['_blob']

    @classmethod
    def _create_ctype_obj(cls, init):
        # Default implementation, used as long as nothing replaces it.
        raise TypeError("cannot instantiate opaque type %s" % (cls,))

    @classmethod
    def _from_ctypes(cls, ctypes_struct_or_union):
        """Wrap an existing ctypes struct/union without copying it."""
        wrapper = cls.__new__(cls)
        wrapper._blob = ctypes_struct_or_union
        return wrapper

    @classmethod
    def _to_ctypes(cls, value):
        return value._blob

    @classmethod
    def _offsetof(cls, fieldname):
        field_descr = getattr(cls._ctype, fieldname)
        return field_descr.offset

    def _convert_to_address(self, BClass):
        """Return the blob's address when ``BClass._BItem`` is this class."""
        item_class = getattr(BClass, '_BItem', None)
        if item_class is not self.__class__:
            return CTypesData._convert_to_address(self, BClass)
        return ctypes.addressof(self._blob)

    def _get_own_repr(self):
        blob_address = ctypes.addressof(self._blob)
        return self._addr_repr(blob_address)

    def __repr__(self, c_name=None):
        shown_name = c_name or self._get_c_name(' &')
        return CTypesData.__repr__(self, shown_name)
| |
|
| |
|
| | class CTypesBackend(object): |
| |
|
    # Maps C primitive type names to the ctypes type implementing them.
    PRIMITIVE_TYPES = {
        'char': ctypes.c_char,
        'short': ctypes.c_short,
        'int': ctypes.c_int,
        'long': ctypes.c_long,
        'long long': ctypes.c_longlong,
        'signed char': ctypes.c_byte,
        'unsigned char': ctypes.c_ubyte,
        'unsigned short': ctypes.c_ushort,
        'unsigned int': ctypes.c_uint,
        'unsigned long': ctypes.c_ulong,
        'unsigned long long': ctypes.c_ulonglong,
        'float': ctypes.c_float,
        'double': ctypes.c_double,
        '_Bool': ctypes.c_bool,
        }

    # Add the fixed-width unsigned aliases (uintN_t, uintptr_t, size_t),
    # keyed by the size ctypes reports on this platform.  The list runs from
    # the widest to the narrowest name, so when two types have the same size
    # the one listed later overwrites the alias.
    # NOTE: `_name` and `_size` remain as class attributes after the loops.
    for _name in ['unsigned long long', 'unsigned long',
                  'unsigned int', 'unsigned short', 'unsigned char']:
        _size = ctypes.sizeof(PRIMITIVE_TYPES[_name])
        PRIMITIVE_TYPES['uint%d_t' % (8*_size)] = PRIMITIVE_TYPES[_name]
        if _size == ctypes.sizeof(ctypes.c_void_p):
            PRIMITIVE_TYPES['uintptr_t'] = PRIMITIVE_TYPES[_name]
        if _size == ctypes.sizeof(ctypes.c_size_t):
            PRIMITIVE_TYPES['size_t'] = PRIMITIVE_TYPES[_name]

    # Same for the signed aliases (intN_t, intptr_t, ptrdiff_t, ssize_t).
    for _name in ['long long', 'long', 'int', 'short', 'signed char']:
        _size = ctypes.sizeof(PRIMITIVE_TYPES[_name])
        PRIMITIVE_TYPES['int%d_t' % (8*_size)] = PRIMITIVE_TYPES[_name]
        if _size == ctypes.sizeof(ctypes.c_void_p):
            PRIMITIVE_TYPES['intptr_t'] = PRIMITIVE_TYPES[_name]
            PRIMITIVE_TYPES['ptrdiff_t'] = PRIMITIVE_TYPES[_name]
        if _size == ctypes.sizeof(ctypes.c_size_t):
            PRIMITIVE_TYPES['ssize_t'] = PRIMITIVE_TYPES[_name]
| |
|
| |
|
| | def __init__(self): |
| | self.RTLD_LAZY = 0 |
| | self.RTLD_NOW = 0 |
| | self.RTLD_GLOBAL = ctypes.RTLD_GLOBAL |
| | self.RTLD_LOCAL = ctypes.RTLD_LOCAL |
| |
|
| | def set_ffi(self, ffi): |
| | self.ffi = ffi |
| |
|
| | def _get_types(self): |
| | return CTypesData, CTypesType |
| |
|
| | def load_library(self, path, flags=0): |
| | cdll = ctypes.CDLL(path, flags) |
| | return CTypesLibrary(self, cdll) |
| |
|
| | def new_void_type(self): |
| | class CTypesVoid(CTypesData): |
| | __slots__ = [] |
| | _reftypename = 'void &' |
| | @staticmethod |
| | def _from_ctypes(novalue): |
| | return None |
| | @staticmethod |
| | def _to_ctypes(novalue): |
| | if novalue is not None: |
| | raise TypeError("None expected, got %s object" % |
| | (type(novalue).__name__,)) |
| | return None |
| | CTypesVoid._fix_class() |
| | return CTypesVoid |
| |
|
| | def new_primitive_type(self, name): |
| | if name == 'wchar_t': |
| | raise NotImplementedError(name) |
| | ctype = self.PRIMITIVE_TYPES[name] |
| | if name == 'char': |
| | kind = 'char' |
| | elif name in ('float', 'double'): |
| | kind = 'float' |
| | else: |
| | if name in ('signed char', 'unsigned char'): |
| | kind = 'byte' |
| | elif name == '_Bool': |
| | kind = 'bool' |
| | else: |
| | kind = 'int' |
| | is_signed = (ctype(-1).value == -1) |
| | |
| | def _cast_source_to_int(source): |
| | if isinstance(source, (int, long, float)): |
| | source = int(source) |
| | elif isinstance(source, CTypesData): |
| | source = source._cast_to_integer() |
| | elif isinstance(source, bytes): |
| | source = ord(source) |
| | elif source is None: |
| | source = 0 |
| | else: |
| | raise TypeError("bad type for cast to %r: %r" % |
| | (CTypesPrimitive, type(source).__name__)) |
| | return source |
| | |
| | kind1 = kind |
| | class CTypesPrimitive(CTypesGenericPrimitive): |
| | __slots__ = ['_value'] |
| | _ctype = ctype |
| | _reftypename = '%s &' % name |
| | kind = kind1 |
| |
|
| | def __init__(self, value): |
| | self._value = value |
| |
|
| | @staticmethod |
| | def _create_ctype_obj(init): |
| | if init is None: |
| | return ctype() |
| | return ctype(CTypesPrimitive._to_ctypes(init)) |
| |
|
| | if kind == 'int' or kind == 'byte': |
| | @classmethod |
| | def _cast_from(cls, source): |
| | source = _cast_source_to_int(source) |
| | source = ctype(source).value |
| | return cls(source) |
| | def __int__(self): |
| | return self._value |
| |
|
| | if kind == 'bool': |
| | @classmethod |
| | def _cast_from(cls, source): |
| | if not isinstance(source, (int, long, float)): |
| | source = _cast_source_to_int(source) |
| | return cls(bool(source)) |
| | def __int__(self): |
| | return int(self._value) |
| |
|
| | if kind == 'char': |
| | @classmethod |
| | def _cast_from(cls, source): |
| | source = _cast_source_to_int(source) |
| | source = bytechr(source & 0xFF) |
| | return cls(source) |
| | def __int__(self): |
| | return ord(self._value) |
| |
|
| | if kind == 'float': |
| | @classmethod |
| | def _cast_from(cls, source): |
| | if isinstance(source, float): |
| | pass |
| | elif isinstance(source, CTypesGenericPrimitive): |
| | if hasattr(source, '__float__'): |
| | source = float(source) |
| | else: |
| | source = int(source) |
| | else: |
| | source = _cast_source_to_int(source) |
| | source = ctype(source).value |
| | return cls(source) |
| | def __int__(self): |
| | return int(self._value) |
| | def __float__(self): |
| | return self._value |
| |
|
| | _cast_to_integer = __int__ |
| |
|
| | if kind == 'int' or kind == 'byte' or kind == 'bool': |
| | @staticmethod |
| | def _to_ctypes(x): |
| | if not isinstance(x, (int, long)): |
| | if isinstance(x, CTypesData): |
| | x = int(x) |
| | else: |
| | raise TypeError("integer expected, got %s" % |
| | type(x).__name__) |
| | if ctype(x).value != x: |
| | if not is_signed and x < 0: |
| | raise OverflowError("%s: negative integer" % name) |
| | else: |
| | raise OverflowError("%s: integer out of bounds" |
| | % name) |
| | return x |
| |
|
| | if kind == 'char': |
| | @staticmethod |
| | def _to_ctypes(x): |
| | if isinstance(x, bytes) and len(x) == 1: |
| | return x |
| | if isinstance(x, CTypesPrimitive): |
| | return x._value |
| | raise TypeError("character expected, got %s" % |
| | type(x).__name__) |
| | def __nonzero__(self): |
| | return ord(self._value) != 0 |
| | else: |
| | def __nonzero__(self): |
| | return self._value != 0 |
| | __bool__ = __nonzero__ |
| |
|
| | if kind == 'float': |
| | @staticmethod |
| | def _to_ctypes(x): |
| | if not isinstance(x, (int, long, float, CTypesData)): |
| | raise TypeError("float expected, got %s" % |
| | type(x).__name__) |
| | return ctype(x).value |
| |
|
| | @staticmethod |
| | def _from_ctypes(value): |
| | return getattr(value, 'value', value) |
| |
|
| | @staticmethod |
| | def _initialize(blob, init): |
| | blob.value = CTypesPrimitive._to_ctypes(init) |
| |
|
| | if kind == 'char': |
| | def _to_string(self, maxlen): |
| | return self._value |
| | if kind == 'byte': |
| | def _to_string(self, maxlen): |
| | return chr(self._value & 0xff) |
| | |
| | CTypesPrimitive._fix_class() |
| | return CTypesPrimitive |
| |
|
    def new_pointer_type(self, BItem):
        """Build and return the cdata class for "pointer to ``BItem``".

        The class is specialised by ``kind``: ``'charp'`` (item is char),
        ``'bytep'`` (signed/unsigned char), ``'voidp'`` (void) or
        ``'generic'``.  Some methods only exist for some kinds because
        they are defined conditionally in the class body.
        """
        getbtype = self.ffi._get_cached_btype
        if BItem is getbtype(model.PrimitiveType('char')):
            kind = 'charp'
        elif BItem in (getbtype(model.PrimitiveType('signed char')),
                       getbtype(model.PrimitiveType('unsigned char'))):
            kind = 'bytep'
        elif BItem is getbtype(model.void_type):
            kind = 'voidp'
        else:
            kind = 'generic'
        #
        class CTypesPtr(CTypesGenericPtr):
            __slots__ = ['_own']
            if kind == 'charp':
                # extra slot holding the string buffer created in __init__
                __slots__ += ['__as_strbuf']
            _BItem = BItem
            if hasattr(BItem, '_ctype'):
                _ctype = ctypes.POINTER(BItem._ctype)
                _bitem_size = ctypes.sizeof(BItem._ctype)
            else:
                # item type without a ctypes equivalent: fall back to void*
                # (no `_bitem_size`, so pointer arithmetic is unavailable)
                _ctype = ctypes.c_void_p
            if issubclass(BItem, CTypesGenericArray):
                _reftypename = BItem._get_c_name('(* &)')
            else:
                _reftypename = BItem._get_c_name(' * &')

            def __init__(self, init):
                # Allocate one item initialised from `init`, owned by self.
                ctypeobj = BItem._create_ctype_obj(init)
                if kind == 'charp':
                    # store the char in a buffer followed by a NUL byte
                    self.__as_strbuf = ctypes.create_string_buffer(
                        ctypeobj.value + b'\x00')
                    self._as_ctype_ptr = ctypes.cast(
                        self.__as_strbuf, self._ctype)
                else:
                    self._as_ctype_ptr = ctypes.pointer(ctypeobj)
                self._address = ctypes.cast(self._as_ctype_ptr,
                                            ctypes.c_void_p).value
                self._own = True

            def __add__(self, other):
                # pointer + integer, scaled by the item size
                if isinstance(other, (int, long)):
                    return self._new_pointer_at(self._address +
                                                other * self._bitem_size)
                else:
                    return NotImplemented

            def __sub__(self, other):
                # pointer - integer, or the item distance between two
                # pointers of exactly the same class
                if isinstance(other, (int, long)):
                    return self._new_pointer_at(self._address -
                                                other * self._bitem_size)
                elif type(self) is type(other):
                    return (self._address - other._address) // self._bitem_size
                else:
                    return NotImplemented

            def __getitem__(self, index):
                # an owning pointer holds exactly one item
                if getattr(self, '_own', False) and index != 0:
                    raise IndexError
                return BItem._from_ctypes(self._as_ctype_ptr[index])

            def __setitem__(self, index, value):
                self._as_ctype_ptr[index] = BItem._to_ctypes(value)

            if kind == 'charp' or kind == 'voidp':
                @classmethod
                def _arg_to_ctypes(cls, *value):
                    # a byte string argument is passed directly as char*
                    if value and isinstance(value[0], bytes):
                        return ctypes.c_char_p(value[0])
                    else:
                        return super(CTypesPtr, cls)._arg_to_ctypes(*value)

            if kind == 'charp' or kind == 'bytep':
                def _to_string(self, maxlen):
                    # Collect bytes up to the first NUL or `maxlen` bytes;
                    # a negative `maxlen` means no limit.
                    if maxlen < 0:
                        maxlen = sys.maxsize
                    p = ctypes.cast(self._as_ctype_ptr,
                                    ctypes.POINTER(ctypes.c_char))
                    n = 0
                    while n < maxlen and p[n] != b'\x00':
                        n += 1
                    return b''.join([p[i] for i in range(n)])

            def _get_own_repr(self):
                if getattr(self, '_own', False):
                    return 'owning %d bytes' % (
                        ctypes.sizeof(self._as_ctype_ptr.contents),)
                return super(CTypesPtr, self)._get_own_repr()
        #
        # void* and char* take part in automatic pointer conversions
        if (BItem is self.ffi._get_cached_btype(model.void_type) or
            BItem is self.ffi._get_cached_btype(model.PrimitiveType('char'))):
            CTypesPtr._automatic_casts = True
        #
        CTypesPtr._fix_class()
        return CTypesPtr
| |
|
    def new_array_type(self, CTypesPtr, length):
        """Build and return the cdata class for an array type.

        ``CTypesPtr`` is the pointer class whose ``_BItem`` is the item
        type.  ``length`` is the declared length, or ``None`` for an
        array whose length is fixed per instance by the initializer.
        """
        if length is None:
            brackets = ' &[]'
        else:
            brackets = ' &[%d]' % length
        BItem = CTypesPtr._BItem
        getbtype = self.ffi._get_cached_btype
        if BItem is getbtype(model.PrimitiveType('char')):
            kind = 'char'
        elif BItem in (getbtype(model.PrimitiveType('signed char')),
                       getbtype(model.PrimitiveType('unsigned char'))):
            kind = 'byte'
        else:
            kind = 'generic'
        #
        class CTypesArray(CTypesGenericArray):
            __slots__ = ['_blob', '_own']
            if length is not None:
                _ctype = BItem._ctype * length
            else:
                # the ctypes array type is chosen per instance in __init__
                __slots__.append('_ctype')
            _reftypename = BItem._get_c_name(brackets)
            _declared_length = length
            _CTPtr = CTypesPtr

            def __init__(self, init):
                if length is None:
                    # Derive the length from the initializer: an integer
                    # is the length itself, a byte string gets one extra
                    # item, anything else is turned into a tuple.
                    if isinstance(init, (int, long)):
                        len1 = init
                        init = None
                    elif kind == 'char' and isinstance(init, bytes):
                        len1 = len(init) + 1
                    else:
                        init = tuple(init)
                        len1 = len(init)
                    self._ctype = BItem._ctype * len1
                self._blob = self._ctype()
                self._own = True
                if init is not None:
                    self._initialize(self._blob, init)

            @staticmethod
            def _initialize(blob, init):
                """Fill ``blob`` item by item from ``init``."""
                if isinstance(init, bytes):
                    # split into one-byte strings
                    init = [init[i:i+1] for i in range(len(init))]
                else:
                    if isinstance(init, CTypesGenericArray):
                        if (len(init) != len(blob) or
                            not isinstance(init, CTypesArray)):
                            raise TypeError("length/type mismatch: %s" % (init,))
                    init = tuple(init)
                if len(init) > len(blob):
                    raise IndexError("too many initializers")
                addr = ctypes.cast(blob, ctypes.c_void_p).value
                PTR = ctypes.POINTER(BItem._ctype)
                itemsize = ctypes.sizeof(BItem._ctype)
                for i, value in enumerate(init):
                    # address of item i, viewed as a pointer to the item type
                    p = ctypes.cast(addr + i * itemsize, PTR)
                    BItem._initialize(p.contents, value)

            def __len__(self):
                return len(self._blob)

            def __getitem__(self, index):
                # negative indexes are rejected as well
                if not (0 <= index < len(self._blob)):
                    raise IndexError
                return BItem._from_ctypes(self._blob[index])

            def __setitem__(self, index, value):
                if not (0 <= index < len(self._blob)):
                    raise IndexError
                self._blob[index] = BItem._to_ctypes(value)

            if kind == 'char' or kind == 'byte':
                def _to_string(self, maxlen):
                    # Collect bytes up to the first NUL or `maxlen` bytes;
                    # a negative `maxlen` means the whole array.
                    if maxlen < 0:
                        maxlen = len(self._blob)
                    p = ctypes.cast(self._blob,
                                    ctypes.POINTER(ctypes.c_char))
                    n = 0
                    while n < maxlen and p[n] != b'\x00':
                        n += 1
                    return b''.join([p[i] for i in range(n)])

            def _get_own_repr(self):
                if getattr(self, '_own', False):
                    return 'owning %d bytes' % (ctypes.sizeof(self._blob),)
                return super(CTypesArray, self)._get_own_repr()

            def _convert_to_address(self, BClass):
                # an array converts to its own pointer type, to a raw
                # address, or to any class with automatic casts
                if BClass in (CTypesPtr, None) or BClass._automatic_casts:
                    return ctypes.addressof(self._blob)
                else:
                    return CTypesData._convert_to_address(self, BClass)

            @staticmethod
            def _from_ctypes(ctypes_array):
                # wrap an existing ctypes array; `_own` is left unset
                self = CTypesArray.__new__(CTypesArray)
                self._blob = ctypes_array
                return self

            @staticmethod
            def _arg_to_ctypes(value):
                # arrays are passed to functions as pointers
                return CTypesPtr._arg_to_ctypes(value)

            def __add__(self, other):
                # array + integer gives a pointer into the array
                if isinstance(other, (int, long)):
                    return CTypesPtr._new_pointer_at(
                        ctypes.addressof(self._blob) +
                        other * ctypes.sizeof(BItem._ctype))
                else:
                    return NotImplemented

            @classmethod
            def _cast_from(cls, source):
                raise NotImplementedError("casting to %r" % (
                    cls._get_c_name(),))
        #
        CTypesArray._fix_class()
        return CTypesArray
| |
|
| | def _new_struct_or_union(self, kind, name, base_ctypes_class): |
| | |
| | class struct_or_union(base_ctypes_class): |
| | pass |
| | struct_or_union.__name__ = '%s_%s' % (kind, name) |
| | kind1 = kind |
| | |
| | class CTypesStructOrUnion(CTypesBaseStructOrUnion): |
| | __slots__ = ['_blob'] |
| | _ctype = struct_or_union |
| | _reftypename = '%s &' % (name,) |
| | _kind = kind = kind1 |
| | |
| | CTypesStructOrUnion._fix_class() |
| | return CTypesStructOrUnion |
| |
|
| | def new_struct_type(self, name): |
| | return self._new_struct_or_union('struct', name, ctypes.Structure) |
| |
|
| | def new_union_type(self, name): |
| | return self._new_struct_or_union('union', name, ctypes.Union) |
| |
|
    def complete_struct_or_union(self, CTypesStructOrUnion, fields, tp,
                                 totalsize=-1, totalalignment=-1, sflags=0,
                                 pack=0):
        """Install the fields on a struct/union class made earlier.

        ``fields`` is a sequence of ``(fname, BField, bitsize)`` triples;
        ``bitsize < 0`` means "not a bitfield".  This sets ``_fields_`` on
        the ctypes class, replaces ``_create_ctype_obj`` on the cdata
        class, and adds one property per field both to the cdata class
        and to the pointer class obtained from ``model.PointerType(tp)``.

        Raises ``NotImplementedError`` if ``totalsize`` or
        ``totalalignment`` is given (>= 0) or a field name is empty, and
        ``ValueError`` if a field name clashes with an existing attribute.
        """
        if totalsize >= 0 or totalalignment >= 0:
            raise NotImplementedError("the ctypes backend of CFFI does not support "
                                      "structures completed by verify(); please "
                                      "compile and install the _cffi_backend module.")
        struct_or_union = CTypesStructOrUnion._ctype
        fnames = [fname for (fname, BField, bitsize) in fields]
        btypes = [BField for (fname, BField, bitsize) in fields]
        bitfields = [bitsize for (fname, BField, bitsize) in fields]
        #
        # bfield_types maps each name to its type, or to Ellipsis for
        # bitfields; cfields is the list handed to ctypes as `_fields_`.
        bfield_types = {}
        cfields = []
        for (fname, BField, bitsize) in fields:
            if bitsize < 0:
                cfields.append((fname, BField._ctype))
                bfield_types[fname] = BField
            else:
                cfields.append((fname, BField._ctype, bitsize))
                bfield_types[fname] = Ellipsis
        # `_pack_` has to be set before `_fields_` is assigned.
        # NOTE(review): bit 8 of sflags presumably means "packed" - confirm.
        if sflags & 8:
            struct_or_union._pack_ = 1
        elif pack:
            struct_or_union._pack_ = pack
        struct_or_union._fields_ = cfields
        CTypesStructOrUnion._bfield_types = bfield_types
        #
        @staticmethod
        def _create_ctype_obj(init):
            result = struct_or_union()
            if init is not None:
                initialize(result, init)
            return result
        CTypesStructOrUnion._create_ctype_obj = _create_ctype_obj
        #
        # `initialize` refers to `is_union` and `name2fieldtype`, which are
        # assigned just after its definition (resolved when it is called).
        def initialize(blob, init):
            if is_union:
                if len(init) > 1:
                    raise ValueError("union initializer: %d items given, but "
                                    "only one supported (use a dict if needed)"
                                     % (len(init),))
            if not isinstance(init, dict):
                # positional initializer: pair the values with field names
                if isinstance(init, (bytes, unicode)):
                    raise TypeError("union initializer: got a str")
                init = tuple(init)
                if len(init) > len(fnames):
                    raise ValueError("too many values for %s initializer" %
                                     CTypesStructOrUnion._get_c_name())
                init = dict(zip(fnames, init))
            addr = ctypes.addressof(blob)
            for fname, value in init.items():
                BField, bitsize = name2fieldtype[fname]
                assert bitsize < 0, \
                       "not implemented: initializer with bit fields"
                # write through a pointer to the field inside the blob
                offset = CTypesStructOrUnion._offsetof(fname)
                PTR = ctypes.POINTER(BField._ctype)
                p = ctypes.cast(addr + offset, PTR)
                BField._initialize(p.contents, value)
        is_union = CTypesStructOrUnion._kind == 'union'
        name2fieldtype = dict(zip(fnames, zip(btypes, bitfields)))
        #
        # One property per field on the cdata class.  Loop values are bound
        # as default arguments so each closure keeps its own field.
        for fname, BField, bitsize in fields:
            if fname == '':
                raise NotImplementedError("nested anonymous structs/unions")
            if hasattr(CTypesStructOrUnion, fname):
                raise ValueError("the field name %r conflicts in "
                                 "the ctypes backend" % fname)
            if bitsize < 0:
                def getter(self, fname=fname, BField=BField,
                           offset=CTypesStructOrUnion._offsetof(fname),
                           PTR=ctypes.POINTER(BField._ctype)):
                    addr = ctypes.addressof(self._blob)
                    p = ctypes.cast(addr + offset, PTR)
                    return BField._from_ctypes(p.contents)
                def setter(self, value, fname=fname, BField=BField):
                    setattr(self._blob, fname, BField._to_ctypes(value))
                #
                if issubclass(BField, CTypesGenericArray):
                    # array fields are read-only
                    setter = None
                    if BField._declared_length == 0:
                        # zero-length array field: return a pointer to its
                        # start instead of an array object
                        def getter(self, fname=fname, BFieldPtr=BField._CTPtr,
                                   offset=CTypesStructOrUnion._offsetof(fname),
                                   PTR=ctypes.POINTER(BField._ctype)):
                            addr = ctypes.addressof(self._blob)
                            p = ctypes.cast(addr + offset, PTR)
                            return BFieldPtr._from_ctypes(p)
                #
            else:
                def getter(self, fname=fname, BField=BField):
                    return BField._from_ctypes(getattr(self._blob, fname))
                def setter(self, value, fname=fname, BField=BField):
                    # Store the value, read it back, and if it does not
                    # match restore the old value and report an overflow.
                    value = BField._to_ctypes(value)
                    oldvalue = getattr(self._blob, fname)
                    setattr(self._blob, fname, value)
                    if value != getattr(self._blob, fname):
                        setattr(self._blob, fname, oldvalue)
                        raise OverflowError("value too large for bitfield")
            setattr(CTypesStructOrUnion, fname, property(getter, setter))
        #
        # Mirror the fields on the pointer-to-struct class: `p.field`
        # is forwarded to `p[0].field`.
        CTypesPtr = self.ffi._get_cached_btype(model.PointerType(tp))
        for fname in fnames:
            if hasattr(CTypesPtr, fname):
                raise ValueError("the field name %r conflicts in "
                                 "the ctypes backend" % fname)
            def getter(self, fname=fname):
                return getattr(self[0], fname)
            def setter(self, value, fname=fname):
                setattr(self[0], fname, value)
            setattr(CTypesPtr, fname, property(getter, setter))
| |
|
    def new_function_type(self, BArgs, BResult, has_varargs):
        """Build and return the cdata class for a function-pointer type.

        ``BArgs`` are the cdata classes of the fixed arguments, ``BResult``
        the class of the result, and ``has_varargs`` tells whether the C
        signature ends with ``...``.
        """
        nameargs = [BArg._get_c_name() for BArg in BArgs]
        if has_varargs:
            nameargs.append('...')
        nameargs = ', '.join(nameargs)
        #
        class CTypesFunctionPtr(CTypesGenericPtr):
            __slots__ = ['_own_callback', '_name']
            # a result class without `_ctype` maps to a None restype
            _ctype = ctypes.CFUNCTYPE(getattr(BResult, '_ctype', None),
                                      *[BArg._ctype for BArg in BArgs],
                                      use_errno=True)
            _reftypename = BResult._get_c_name('(* &)(%s)' % (nameargs,))

            def __init__(self, init, error=None):
                # Create a C callback that invokes the Python callable
                # `init`; `error` is the value returned to C if it raises.
                import traceback
                assert not has_varargs, "varargs not supported for callbacks"
                if getattr(BResult, '_ctype', None) is not None:
                    error = BResult._from_ctypes(
                        BResult._create_ctype_obj(error))
                else:
                    error = None
                def callback(*args):
                    args2 = []
                    for arg, BArg in zip(args, BArgs):
                        args2.append(BArg._from_ctypes(arg))
                    try:
                        res2 = init(*args2)
                        res2 = BResult._to_ctypes(res2)
                    except:
                        # any failure is printed and replaced by `error`
                        traceback.print_exc()
                        res2 = error
                    if issubclass(BResult, CTypesGenericPtr):
                        # pointer results go back as an integer address,
                        # or None for a false (NULL) result
                        if res2:
                            res2 = ctypes.cast(res2, ctypes.c_void_p).value
                        else:
                            res2 = None
                    return res2
                if issubclass(BResult, CTypesGenericPtr):
                    # Pointer-returning callbacks are declared as
                    # returning void*.
                    # NOTE(review): presumably a ctypes limitation - confirm.
                    callback_ctype = ctypes.CFUNCTYPE(
                        ctypes.c_void_p,
                        *[BArg._ctype for BArg in BArgs],
                        use_errno=True)
                else:
                    callback_ctype = CTypesFunctionPtr._ctype
                self._as_ctype_ptr = callback_ctype(callback)
                self._address = ctypes.cast(self._as_ctype_ptr,
                                            ctypes.c_void_p).value
                self._own_callback = init

            @staticmethod
            def _initialize(ctypes_ptr, value):
                if value:
                    raise NotImplementedError("ctypes backend: not supported: "
                                          "initializers for function pointers")

            def __repr__(self):
                # When the function has a name (set by load_function),
                # show it in place of the '(* &)' declarator.
                c_name = getattr(self, '_name', None)
                if c_name:
                    i = self._reftypename.index('(* &)')
                    if self._reftypename[i-1] not in ' )*':
                        c_name = ' ' + c_name
                    c_name = self._reftypename.replace('(* &)', c_name)
                return CTypesData.__repr__(self, c_name)

            def _get_own_repr(self):
                if getattr(self, '_own_callback', None) is not None:
                    return 'calling %r' % (self._own_callback,)
                return super(CTypesFunctionPtr, self)._get_own_repr()

            def __call__(self, *args):
                # Convert the arguments, call through ctypes, convert back.
                if has_varargs:
                    assert len(args) >= len(BArgs)
                    extraargs = args[len(BArgs):]
                    args = args[:len(BArgs)]
                else:
                    assert len(args) == len(BArgs)
                ctypes_args = []
                for arg, BArg in zip(args, BArgs):
                    ctypes_args.append(BArg._arg_to_ctypes(arg))
                if has_varargs:
                    # variadic arguments must be cdata objects (or None,
                    # which is passed as a NULL void*)
                    for i, arg in enumerate(extraargs):
                        if arg is None:
                            ctypes_args.append(ctypes.c_void_p(0))  # NULL
                            continue
                        if not isinstance(arg, CTypesData):
                            raise TypeError(
                                "argument %d passed in the variadic part "
                                "needs to be a cdata object (got %s)" %
                                (1 + len(BArgs) + i, type(arg).__name__))
                        ctypes_args.append(arg._arg_to_ctypes(arg))
                result = self._as_ctype_ptr(*ctypes_args)
                return BResult._from_ctypes(result)
        #
        CTypesFunctionPtr._fix_class()
        return CTypesFunctionPtr
| |
|
| | def new_enum_type(self, name, enumerators, enumvalues, CTypesInt): |
| | assert isinstance(name, str) |
| | reverse_mapping = dict(zip(reversed(enumvalues), |
| | reversed(enumerators))) |
| | |
| | class CTypesEnum(CTypesInt): |
| | __slots__ = [] |
| | _reftypename = '%s &' % name |
| |
|
| | def _get_own_repr(self): |
| | value = self._value |
| | try: |
| | return '%d: %s' % (value, reverse_mapping[value]) |
| | except KeyError: |
| | return str(value) |
| |
|
| | def _to_string(self, maxlen): |
| | value = self._value |
| | try: |
| | return reverse_mapping[value] |
| | except KeyError: |
| | return str(value) |
| | |
| | CTypesEnum._fix_class() |
| | return CTypesEnum |
| |
|
| | def get_errno(self): |
| | return ctypes.get_errno() |
| |
|
| | def set_errno(self, value): |
| | ctypes.set_errno(value) |
| |
|
| | def string(self, b, maxlen=-1): |
| | return b._to_string(maxlen) |
| |
|
| | def buffer(self, bptr, size=-1): |
| | raise NotImplementedError("buffer() with ctypes backend") |
| |
|
| | def sizeof(self, cdata_or_BType): |
| | if isinstance(cdata_or_BType, CTypesData): |
| | return cdata_or_BType._get_size_of_instance() |
| | else: |
| | assert issubclass(cdata_or_BType, CTypesData) |
| | return cdata_or_BType._get_size() |
| |
|
| | def alignof(self, BType): |
| | assert issubclass(BType, CTypesData) |
| | return BType._alignment() |
| |
|
| | def newp(self, BType, source): |
| | if not issubclass(BType, CTypesData): |
| | raise TypeError |
| | return BType._newp(source) |
| |
|
| | def cast(self, BType, source): |
| | return BType._cast_from(source) |
| |
|
| | def callback(self, BType, source, error, onerror): |
| | assert onerror is None |
| | return BType(source, error) |
| |
|
    # Lazily replaced, per instance, by a pair (cache dict, MyRef class).
    _weakref_cache_ref = None

    def gcp(self, cdata, destructor, size=0):
        """Return a new cdata that calls ``destructor(cdata)`` when collected.

        With ``destructor=None`` the registration of ``cdata`` (which must
        be an object returned by an earlier call) is removed instead, and
        ``None`` is returned; ``TypeError`` is raised if it was not
        registered.  ``size`` is accepted but not used here.
        """
        if self._weakref_cache_ref is None:
            import weakref
            class MyRef(weakref.ref):
                # Weak reference that compares by identity of its referent
                # and caches its hash, so it can serve as a dict key.
                def __eq__(self, other):
                    myref = self()
                    return self is other or (
                        myref is not None and myref is other())
                def __ne__(self, other):
                    return not (self == other)
                def __hash__(self):
                    try:
                        return self._hash
                    except AttributeError:
                        # computed on first use and then remembered
                        self._hash = hash(self())
                        return self._hash
            self._weakref_cache_ref = {}, MyRef
        weak_cache, MyRef = self._weakref_cache_ref

        if destructor is None:
            try:
                del weak_cache[MyRef(cdata)]
            except KeyError:
                raise TypeError("Can remove destructor only on a object "
                                "previously returned by ffi.gc()")
            return None

        def remove(k):
            # weakref callback: run the destructor at most once
            cdata, destructor = weak_cache.pop(k, (None, None))
            if destructor is not None:
                destructor(cdata)

        # Make a distinct cdata of the same type; the cache entry holds the
        # original `cdata` until the new object goes away.
        new_cdata = self.cast(self.typeof(cdata), cdata)
        assert new_cdata is not cdata
        weak_cache[MyRef(new_cdata, remove)] = (cdata, destructor)
        return new_cdata
| |
|
    # typeof(cdata) is the built-in type(): it returns the cdata's class.
    typeof = type
| |
|
| | def getcname(self, BType, replace_with): |
| | return BType._get_c_name(replace_with) |
| |
|
| | def typeoffsetof(self, BType, fieldname, num=0): |
| | if isinstance(fieldname, str): |
| | if num == 0 and issubclass(BType, CTypesGenericPtr): |
| | BType = BType._BItem |
| | if not issubclass(BType, CTypesBaseStructOrUnion): |
| | raise TypeError("expected a struct or union ctype") |
| | BField = BType._bfield_types[fieldname] |
| | if BField is Ellipsis: |
| | raise TypeError("not supported for bitfields") |
| | return (BField, BType._offsetof(fieldname)) |
| | elif isinstance(fieldname, (int, long)): |
| | if issubclass(BType, CTypesGenericArray): |
| | BType = BType._CTPtr |
| | if not issubclass(BType, CTypesGenericPtr): |
| | raise TypeError("expected an array or ptr ctype") |
| | BItem = BType._BItem |
| | offset = BItem._get_size() * fieldname |
| | if offset > sys.maxsize: |
| | raise OverflowError |
| | return (BItem, offset) |
| | else: |
| | raise TypeError(type(fieldname)) |
| |
|
| | def rawaddressof(self, BTypePtr, cdata, offset=None): |
| | if isinstance(cdata, CTypesBaseStructOrUnion): |
| | ptr = ctypes.pointer(type(cdata)._to_ctypes(cdata)) |
| | elif isinstance(cdata, CTypesGenericPtr): |
| | if offset is None or not issubclass(type(cdata)._BItem, |
| | CTypesBaseStructOrUnion): |
| | raise TypeError("unexpected cdata type") |
| | ptr = type(cdata)._to_ctypes(cdata) |
| | elif isinstance(cdata, CTypesGenericArray): |
| | ptr = type(cdata)._to_ctypes(cdata) |
| | else: |
| | raise TypeError("expected a <cdata 'struct-or-union'>") |
| | if offset: |
| | ptr = ctypes.cast( |
| | ctypes.c_void_p( |
| | ctypes.cast(ptr, ctypes.c_void_p).value + offset), |
| | type(ptr)) |
| | return BTypePtr._from_ctypes(ptr) |
| |
|
| |
|
class CTypesLibrary(object):
    """A loaded shared library: pairs the backend with a ctypes CDLL."""

    def __init__(self, backend, cdll):
        self.backend, self.cdll = backend, cdll

    def load_function(self, BType, name):
        """Look up function ``name`` and wrap it as a ``BType`` cdata."""
        raw_function = getattr(self.cdll, name)
        wrapped = BType._from_ctypes(raw_function)
        wrapped._name = name
        return wrapped

    def read_variable(self, BType, name):
        """Return the current value of the global variable ``name``."""
        try:
            variable = BType._ctype.in_dll(self.cdll, name)
        except AttributeError as e:
            raise NotImplementedError(e)
        return BType._from_ctypes(variable)

    def write_variable(self, BType, name, value):
        """Overwrite the global variable ``name`` with ``value``."""
        source_obj = BType._to_ctypes(value)
        target_obj = BType._ctype.in_dll(self.cdll, name)
        # raw copy of sizeof(BType._ctype) bytes over the variable
        byte_count = ctypes.sizeof(BType._ctype)
        ctypes.memmove(ctypes.addressof(target_obj),
                       ctypes.addressof(source_obj),
                       byte_count)
| |
|