| | """ |
| | ane_bridge_py.py — Python ctypes wrapper for libane_bridge.dylib |
| | |
| | Provides a Pythonic interface to Apple Neural Engine private APIs |
| | via the maderix/ANE C bridge library. Enables compiling and executing |
| | MIL programs on ANE hardware from Python. |
| | |
| | Usage: |
| | from ane_bridge_py import ANEBridge |
| | ane = ANEBridge() |
    kernel = ane.compile_kernel(mil_text, input_sizes, output_sizes, weight_data=weights)
| | ane.write_input(kernel, 0, my_numpy_array) |
| | ane.eval(kernel) |
| | result = ane.read_output(kernel, 0, output_shape, dtype=np.float16) |
| | ane.free_kernel(kernel) |
| | """ |
| |
|
| | import ctypes |
| | import ctypes.util |
| | import os |
| | import numpy as np |
| | from pathlib import Path |
| | from typing import Optional |
| |
|
| | |
# Location of the compiled C bridge library, expected next to this module.
_BRIDGE_DIR = Path(__file__).parent / "bridge"
_LIB_PATH = str(_BRIDGE_DIR / "libane_bridge.dylib")

# Maximum number of ANE compilations allowed in a single process before a
# restart is required (see ANEBridge.needs_restart / compile_budget_remaining).
# NOTE(review): the specific limit of 110 is presumably imposed by the private
# ANE runtime — confirm against the C bridge / maderix/ANE documentation.
MAX_COMPILE_BUDGET = 110
| |
|
| |
|
class ANEBridgeError(Exception):
    """Raised when an ANE bridge operation (init, compile, eval, I/O) fails."""
| |
|
| |
|
class ANEBridge:
    """Python wrapper for the ANE C bridge library.

    Loads ``libane_bridge.dylib`` via ctypes, declares all C function
    signatures, and initializes the ANE runtime on construction.  All
    failures surface as :class:`ANEBridgeError`.
    """

    def __init__(self, lib_path: Optional[str] = None):
        """Load the bridge dylib and initialize the ANE runtime.

        Args:
            lib_path: override path to libane_bridge.dylib; defaults to the
                bundled copy next to this module (``_LIB_PATH``).

        Raises:
            ANEBridgeError: if the dylib is missing or runtime init fails
                (the bridge requires macOS 15+ on Apple Silicon).
        """
        lib_path = lib_path or _LIB_PATH
        if not os.path.exists(lib_path):
            raise ANEBridgeError(
                f"ANE bridge library not found at {lib_path}. "
                f"Run: cd scripts/ane-engine/bridge && make"
            )

        self._lib = ctypes.CDLL(lib_path)
        # Declare argtypes/restypes BEFORE the first foreign call so ctypes
        # does not fall back to its default int-based conversions.
        self._setup_signatures()

        rc = self._lib.ane_bridge_init()
        if rc != 0:
            raise ANEBridgeError(
                "Failed to initialize ANE runtime. "
                "Requires macOS 15+ on Apple Silicon."
            )

    def _setup_signatures(self) -> None:
        """Define C function signatures for type safety."""
        lib = self._lib

        # int ane_bridge_init(void) — returns 0 on success.
        lib.ane_bridge_init.restype = ctypes.c_int
        lib.ane_bridge_init.argtypes = []

        # Kernel handle (opaque pointer, or NULL on failure) from a MIL
        # program plus an optional single weight blob.  Argument order
        # mirrors compile_kernel() below.
        lib.ane_bridge_compile.restype = ctypes.c_void_p
        lib.ane_bridge_compile.argtypes = [
            ctypes.c_char_p,                  # MIL program text (UTF-8)
            ctypes.c_size_t,                  # MIL text length in bytes
            ctypes.POINTER(ctypes.c_uint8),   # weight blob (may be NULL)
            ctypes.c_size_t,                  # weight blob length
            ctypes.c_int,                     # number of inputs
            ctypes.POINTER(ctypes.c_size_t),  # per-input byte sizes
            ctypes.c_int,                     # number of outputs
            ctypes.POINTER(ctypes.c_size_t),  # per-output byte sizes
        ]

        # Same as above, but with multiple named weight blobs (parallel
        # arrays of names / data pointers / lengths).  Argument order
        # mirrors compile_kernel_multi_weights() below.
        lib.ane_bridge_compile_multi_weights.restype = ctypes.c_void_p
        lib.ane_bridge_compile_multi_weights.argtypes = [
            ctypes.c_char_p,                                # MIL program text
            ctypes.c_size_t,                                # MIL text length
            ctypes.POINTER(ctypes.c_char_p),                # weight names
            ctypes.POINTER(ctypes.POINTER(ctypes.c_uint8)), # weight data ptrs
            ctypes.POINTER(ctypes.c_size_t),                # weight lengths
            ctypes.c_int,                                   # number of weights
            ctypes.c_int,                                   # number of inputs
            ctypes.POINTER(ctypes.c_size_t),                # input byte sizes
            ctypes.c_int,                                   # number of outputs
            ctypes.POINTER(ctypes.c_size_t),                # output byte sizes
        ]

        # bool ane_bridge_eval(handle) — true on success.
        lib.ane_bridge_eval.restype = ctypes.c_bool
        lib.ane_bridge_eval.argtypes = [ctypes.c_void_p]

        # void ane_bridge_write_input(handle, index, src, nbytes)
        lib.ane_bridge_write_input.restype = None
        lib.ane_bridge_write_input.argtypes = [
            ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t
        ]

        # void ane_bridge_read_output(handle, index, dst, nbytes)
        lib.ane_bridge_read_output.restype = None
        lib.ane_bridge_read_output.argtypes = [
            ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t
        ]

        # void ane_bridge_free(handle) — releases a compiled kernel.
        lib.ane_bridge_free.restype = None
        lib.ane_bridge_free.argtypes = [ctypes.c_void_p]

        # int ane_bridge_get_compile_count(void)
        lib.ane_bridge_get_compile_count.restype = ctypes.c_int
        lib.ane_bridge_get_compile_count.argtypes = []

        # void ane_bridge_reset_compile_count(void)
        lib.ane_bridge_reset_compile_count.restype = None
        lib.ane_bridge_reset_compile_count.argtypes = []

        # uint8_t *ane_bridge_build_weight_blob(src, rows, cols, out_len)
        # Returns a C-allocated blob; caller frees via ane_bridge_free_blob.
        lib.ane_bridge_build_weight_blob.restype = ctypes.POINTER(ctypes.c_uint8)
        lib.ane_bridge_build_weight_blob.argtypes = [
            ctypes.POINTER(ctypes.c_float), ctypes.c_int, ctypes.c_int,
            ctypes.POINTER(ctypes.c_size_t)
        ]

        # Transposed-layout variant of the above; same contract.
        lib.ane_bridge_build_weight_blob_transposed.restype = ctypes.POINTER(ctypes.c_uint8)
        lib.ane_bridge_build_weight_blob_transposed.argtypes = [
            ctypes.POINTER(ctypes.c_float), ctypes.c_int, ctypes.c_int,
            ctypes.POINTER(ctypes.c_size_t)
        ]

        # void ane_bridge_free_blob(blob)
        lib.ane_bridge_free_blob.restype = None
        lib.ane_bridge_free_blob.argtypes = [ctypes.c_void_p]

    @property
    def compile_count(self) -> int:
        """Current number of ANE compilations in this process."""
        return self._lib.ane_bridge_get_compile_count()

    @property
    def compile_budget_remaining(self) -> int:
        """Remaining compilations before process restart needed."""
        return MAX_COMPILE_BUDGET - self.compile_count

    def needs_restart(self) -> bool:
        """True if compile budget is exhausted and process needs restart."""
        return self.compile_count >= MAX_COMPILE_BUDGET

    def reset_compile_count(self) -> None:
        """Reset compile counter (call after process restart)."""
        self._lib.ane_bridge_reset_compile_count()

    def build_weight_blob(self, weights: np.ndarray, transpose: bool = False) -> tuple:
        """Convert numpy float32 weights to ANE blob format (128-byte header + fp16).

        Args:
            weights: float32 numpy array of shape (rows, cols)
            transpose: if True, store in transposed layout

        Returns:
            (blob_pointer, blob_length) — caller should free via free_blob()

        Raises:
            ANEBridgeError: if the C bridge returns a NULL blob pointer.
        """
        # The C side reads raw float32 memory, so coerce dtype and make the
        # array contiguous before taking its address.
        if weights.dtype != np.float32:
            weights = weights.astype(np.float32)
        weights = np.ascontiguousarray(weights)

        rows, cols = weights.shape
        out_len = ctypes.c_size_t()  # filled in by the C function
        src_ptr = weights.ctypes.data_as(ctypes.POINTER(ctypes.c_float))

        if transpose:
            blob = self._lib.ane_bridge_build_weight_blob_transposed(
                src_ptr, rows, cols, ctypes.byref(out_len))
        else:
            blob = self._lib.ane_bridge_build_weight_blob(
                src_ptr, rows, cols, ctypes.byref(out_len))

        # NULL pointer -> falsy ctypes pointer.
        if not blob:
            raise ANEBridgeError("Failed to build weight blob")

        return blob, out_len.value

    def free_blob(self, blob_ptr) -> None:
        """Free a weight blob allocated by build_weight_blob."""
        self._lib.ane_bridge_free_blob(blob_ptr)

    def compile_kernel(
        self,
        mil_text: str,
        input_sizes: list[int],
        output_sizes: list[int],
        weight_data: Optional[bytes] = None,
    ) -> int:
        """Compile a MIL program with optional single weight blob.

        Args:
            mil_text: UTF-8 MIL program text
            input_sizes: list of byte sizes for each input IOSurface
            output_sizes: list of byte sizes for each output IOSurface
            weight_data: optional raw weight blob bytes (empty bytes are
                treated the same as None)

        Returns:
            Opaque kernel handle (int). Use with eval(), write_input(), etc.

        Raises:
            ANEBridgeError: if the compile budget is exhausted or the C
                bridge fails to compile the program.
        """
        if self.needs_restart():
            raise ANEBridgeError(
                f"Compile budget exhausted ({self.compile_count} compiles). "
                "Process restart required."
            )

        mil_bytes = mil_text.encode('utf-8')
        n_inputs = len(input_sizes)
        n_outputs = len(output_sizes)

        # ctypes arrays; the local references keep them alive for the call.
        c_input_sizes = (ctypes.c_size_t * n_inputs)(*input_sizes)
        c_output_sizes = (ctypes.c_size_t * n_outputs)(*output_sizes)

        if weight_data:
            # Copy the bytes into a mutable ctypes buffer; c_weight must stay
            # referenced until the foreign call returns.
            c_weight = (ctypes.c_uint8 * len(weight_data)).from_buffer_copy(weight_data)
            handle = self._lib.ane_bridge_compile(
                mil_bytes, len(mil_bytes),
                c_weight, len(weight_data),
                n_inputs, c_input_sizes,
                n_outputs, c_output_sizes)
        else:
            handle = self._lib.ane_bridge_compile(
                mil_bytes, len(mil_bytes),
                None, 0,
                n_inputs, c_input_sizes,
                n_outputs, c_output_sizes)

        # NULL return (None from c_void_p restype) signals compile failure.
        if not handle:
            raise ANEBridgeError("ANE kernel compilation failed")

        return handle

    def compile_kernel_multi_weights(
        self,
        mil_text: str,
        weights: dict[str, tuple],
        input_sizes: list[int],
        output_sizes: list[int],
    ) -> int:
        """Compile a MIL program with multiple named weight blobs.

        Args:
            mil_text: UTF-8 MIL program text
            weights: dict of {name: (blob_ptr, blob_len)} from build_weight_blob()
            input_sizes: list of byte sizes for each input IOSurface
            output_sizes: list of byte sizes for each output IOSurface

        Returns:
            Opaque kernel handle

        Raises:
            ANEBridgeError: if the compile budget is exhausted or the C
                bridge fails to compile the program.
        """
        if self.needs_restart():
            raise ANEBridgeError(
                f"Compile budget exhausted ({self.compile_count} compiles). "
                "Process restart required."
            )

        mil_bytes = mil_text.encode('utf-8')
        n_inputs = len(input_sizes)
        n_outputs = len(output_sizes)
        n_weights = len(weights)

        # Parallel C arrays: names, data pointers, and lengths.
        c_names = (ctypes.c_char_p * n_weights)()
        c_datas = (ctypes.POINTER(ctypes.c_uint8) * n_weights)()
        c_lens = (ctypes.c_size_t * n_weights)()

        for i, (name, (blob_ptr, blob_len)) in enumerate(weights.items()):
            # Assigning the encoded bytes into the c_char_p array keeps them
            # referenced by the array for the duration of the call.
            c_names[i] = name.encode('utf-8')
            c_datas[i] = ctypes.cast(blob_ptr, ctypes.POINTER(ctypes.c_uint8))
            c_lens[i] = blob_len

        c_input_sizes = (ctypes.c_size_t * n_inputs)(*input_sizes)
        c_output_sizes = (ctypes.c_size_t * n_outputs)(*output_sizes)

        handle = self._lib.ane_bridge_compile_multi_weights(
            mil_bytes, len(mil_bytes),
            c_names, c_datas, c_lens, n_weights,
            n_inputs, c_input_sizes,
            n_outputs, c_output_sizes)

        if not handle:
            raise ANEBridgeError("ANE kernel compilation with multi-weights failed")

        return handle

    def eval(self, kernel_handle: int) -> bool:
        """Execute a compiled kernel on ANE hardware.

        Args:
            kernel_handle: handle from compile_kernel()

        Returns:
            True on success

        Raises:
            ANEBridgeError: if the C bridge reports evaluation failure.
        """
        result = self._lib.ane_bridge_eval(kernel_handle)
        if not result:
            raise ANEBridgeError("ANE kernel evaluation failed")
        return True

    def write_input(self, kernel_handle: int, index: int, data: np.ndarray) -> None:
        """Write numpy array to kernel input IOSurface.

        Args:
            kernel_handle: handle from compile_kernel()
            index: input tensor index (0-based)
            data: numpy array (will be made contiguous if needed)
        """
        # The C side copies data.nbytes raw bytes from the array's buffer;
        # the local `data` reference keeps the buffer alive during the call.
        data = np.ascontiguousarray(data)
        self._lib.ane_bridge_write_input(
            kernel_handle, index,
            data.ctypes.data, data.nbytes)

    def read_output(
        self,
        kernel_handle: int,
        index: int,
        shape: tuple,
        dtype=np.float16,
    ) -> np.ndarray:
        """Read kernel output IOSurface into numpy array.

        Args:
            kernel_handle: handle from compile_kernel()
            index: output tensor index (0-based)
            shape: shape of the output tensor
            dtype: numpy dtype (default float16, matching ANE native format)

        Returns:
            numpy array with output data
        """
        # Allocate the destination buffer and let the C side fill it in place.
        out = np.empty(shape, dtype=dtype)
        self._lib.ane_bridge_read_output(
            kernel_handle, index,
            out.ctypes.data, out.nbytes)
        return out

    def free_kernel(self, kernel_handle: int) -> None:
        """Free a compiled kernel and all associated resources."""
        # Guard against None/0 handles so callers can free unconditionally.
        if kernel_handle:
            self._lib.ane_bridge_free(kernel_handle)
| |
|
| |
|
def self_test():
    """Quick self-test to verify ANE bridge works on this machine.

    Exercises the full pipeline: runtime init, weight-blob construction,
    MIL compilation, input write, ANE eval, output read, and cleanup.

    Returns:
        True if every stage succeeded, False otherwise.
    """
    print("ANE Bridge Self-Test")
    print("=" * 40)

    try:
        ane = ANEBridge()
        print("[OK] ANE runtime initialized")
        print(f" Compile count: {ane.compile_count}")
        print(f" Budget remaining: {ane.compile_budget_remaining}")
    except ANEBridgeError as e:
        print(f"[FAIL] {e}")
        return False

    # Identity 1x1 conv over a (1, ch, 1, sp) fp16 tensor: with an identity
    # weight matrix the output should reproduce the input.
    ch, sp = 64, 16
    mil_text = (
        'program(1.3)\n'
        '[buildInfo = dict<string, string>({{"coremlc-component-MIL", "3510.2.1"}, '
        '{"coremlc-version", "3505.4.1"}, '
        '{"coremltools-component-milinternal", ""}, '
        '{"coremltools-version", "9.0"}})]\n'
        '{\n'
        f' func main<ios18>(tensor<fp32, [1, {ch}, 1, {sp}]> x) {{\n'
        ' string c_pad_type = const()[name = string("c_pad_type"), val = string("valid")];\n'
        ' tensor<int32, [2]> c_strides = const()[name = string("c_strides"), val = tensor<int32, [2]>([1, 1])];\n'
        ' tensor<int32, [4]> c_pad = const()[name = string("c_pad"), val = tensor<int32, [4]>([0, 0, 0, 0])];\n'
        ' tensor<int32, [2]> c_dilations = const()[name = string("c_dilations"), val = tensor<int32, [2]>([1, 1])];\n'
        ' int32 c_groups = const()[name = string("c_groups"), val = int32(1)];\n'
        ' string to_fp16 = const()[name = string("to_fp16"), val = string("fp16")];\n'
        f' tensor<fp16, [1, {ch}, 1, {sp}]> x16 = cast(dtype = to_fp16, x = x)[name = string("cast_in")];\n'
        f' tensor<fp16, [{ch}, {ch}, 1, 1]> W = const()[name = string("W"), val = tensor<fp16, [{ch}, {ch}, 1, 1]>(BLOBFILE(path = string("@model_path/weights/weight.bin"), offset = uint64(64)))];\n'
        f' tensor<fp16, [1, {ch}, 1, {sp}]> y16 = conv(dilations = c_dilations, groups = c_groups, pad = c_pad, pad_type = c_pad_type, strides = c_strides, weight = W, x = x16)[name = string("conv")];\n'
        ' string to_fp32 = const()[name = string("to_fp32"), val = string("fp32")];\n'
        f' tensor<fp32, [1, {ch}, 1, {sp}]> y = cast(dtype = to_fp32, x = y16)[name = string("cast_out")];\n'
        ' } -> (y);\n'
        '}\n'
    )

    # Identity weight matrix so the conv is a pass-through.
    W = np.eye(ch, dtype=np.float32)
    blob_ptr, blob_len = ane.build_weight_blob(W)

    # fp32 tensors on the wire: 4 bytes per element.
    tensor_bytes_in = ch * sp * 4
    tensor_bytes_out = ch * sp * 4

    try:
        # Snapshot the C-owned blob into Python bytes so the pointer can be
        # freed immediately after compilation.
        blob_bytes = bytes(ctypes.cast(blob_ptr, ctypes.POINTER(ctypes.c_uint8 * blob_len)).contents)
        kernel = ane.compile_kernel(
            mil_text,
            input_sizes=[tensor_bytes_in],
            output_sizes=[tensor_bytes_out],
            weight_data=blob_bytes,
        )
        print(f"[OK] MIL compilation succeeded (handle: 0x{kernel:x})")
        print(f" Compile count: {ane.compile_count}")
    except ANEBridgeError as e:
        print(f"[FAIL] Compilation: {e}")
        # BUG FIX: do NOT free the blob here — the finally block below runs
        # even on `return`, so freeing in both places double-freed the blob.
        return False
    finally:
        # Single point of release for the C-allocated blob, on both the
        # success and failure paths.
        ane.free_blob(blob_ptr)

    x = np.random.randn(1, ch, 1, sp).astype(np.float32)

    try:
        ane.write_input(kernel, 0, x)
        ane.eval(kernel)
        result = ane.read_output(kernel, 0, (1, ch, 1, sp), dtype=np.float32)

        # fp16 round-trip on ANE: allow a small absolute tolerance.
        if np.allclose(result, x, atol=0.05):
            print("[OK] ANE evaluation correct (identity conv)")
            print(f" Input[:4]: {x.flatten()[:4]}")
            print(f" Output[:4]: {result.flatten()[:4]}")
        else:
            max_err = np.max(np.abs(result - x))
            print(f"[WARN] Result differs (max err: {max_err:.4f})")
            print(f" Input[:4]: {x.flatten()[:4]}")
            print(f" Output[:4]: {result.flatten()[:4]}")

    except ANEBridgeError as e:
        print(f"[FAIL] Evaluation: {e}")
        ane.free_kernel(kernel)
        return False

    # Exercise the blob builder on a second, arbitrary weight matrix.
    try:
        weights = np.random.randn(4, 4).astype(np.float32)
        blob, blob_len = ane.build_weight_blob(weights)
        print(f"[OK] Weight blob built ({blob_len} bytes for 4x4 float32)")
        ane.free_blob(blob)
    except ANEBridgeError as e:
        print(f"[FAIL] Weight blob: {e}")
        ane.free_kernel(kernel)
        return False

    ane.free_kernel(kernel)
    print("\n[PASS] All ANE bridge tests passed")
    print(f" Final compile count: {ane.compile_count}")
    return True
| |
|
| |
|
if __name__ == "__main__":
    success = self_test()
    # Use SystemExit rather than the site-module exit() helper, which is
    # intended for interactive use and absent under `python -S`.
    raise SystemExit(0 if success else 1)
| |
|