| """ |
| MLE SIMD-Optimized Bitwise Operations |
| ===================================== |
| Hardware-accelerated Hamming distance, popcount, and batch XOR operations. |
| Uses ctypes to call GCC-compiled C with -march=native for automatic SIMD |
| vectorization (AVX-512 VPOPCNTQ / AVX2 POPCNT / SSE4.2 POPCNT). |
| |
| Fallback: pure NumPy LUT-based popcount for portability. |
| """ |
|
|
import atexit
import ctypes
import logging
import os
import shutil
import subprocess
import tempfile
from pathlib import Path

import numpy as np
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Vector geometry: every vector is N_BITS wide and stored packed.
N_BITS = 4096
N_BYTES = N_BITS // 8   # bytes per packed vector
N_WORDS = N_BYTES // 8  # 64-bit words per packed vector
|
|
| |
|
|
# C source for the native kernels. It is written to disk and compiled with
# gcc by _compile_native(); the function names and signatures here must stay
# in sync with the ctypes prototypes declared there.
_NATIVE_C_SOURCE = r"""
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Single-pair Hamming distance: XOR + popcount over N uint64 words */
int hamming_single(const uint64_t *a, const uint64_t *b, int n_words) {
    int cnt = 0;
    for (int i = 0; i < n_words; i++)
        cnt += __builtin_popcountll(a[i] ^ b[i]);
    return cnt;
}

/* Batch Hamming: query (1 x n_words) vs corpus (n_vecs x n_words)
   Results written to out[n_vecs]. Layout: corpus is row-major contiguous. */
void hamming_batch(const uint64_t *query, const uint64_t *corpus,
                   int n_words, int n_vecs, int *out) {
    for (int v = 0; v < n_vecs; v++) {
        int cnt = 0;
        /* size_t keeps the row offset 64-bit even where long is 32-bit */
        const uint64_t *row = corpus + (size_t)v * n_words;
        for (int w = 0; w < n_words; w++)
            cnt += __builtin_popcountll(query[w] ^ row[w]);
        out[v] = cnt;
    }
}

/* Batch Hamming with top-K selection (partial sort).
   Returns indices of top_k smallest distances.
   Uses a simple max-heap of size top_k for O(N log K). */
static void swap_int(int *a, int *b) { int t = *a; *a = *b; *b = t; }

static void sift_down_max(int *heap_dist, int *heap_idx, int size, int i) {
    while (1) {
        int largest = i, l = 2*i+1, r = 2*i+2;
        if (l < size && heap_dist[l] > heap_dist[largest]) largest = l;
        if (r < size && heap_dist[r] > heap_dist[largest]) largest = r;
        if (largest == i) break;
        swap_int(&heap_dist[i], &heap_dist[largest]);
        swap_int(&heap_idx[i], &heap_idx[largest]);
        i = largest;
    }
}

void hamming_topk(const uint64_t *query, const uint64_t *corpus,
                  int n_words, int n_vecs, int top_k,
                  int *out_indices, int *out_dists) {
    /* Initialize heap with first top_k elements */
    int heap_size = (top_k < n_vecs) ? top_k : n_vecs;
    /* Nothing to select: the output buffers may be zero-length, so the
       heap root out_dists[0] must never be read or written. */
    if (heap_size <= 0)
        return;
    for (int v = 0; v < heap_size; v++) {
        int cnt = 0;
        const uint64_t *row = corpus + (size_t)v * n_words;
        for (int w = 0; w < n_words; w++)
            cnt += __builtin_popcountll(query[w] ^ row[w]);
        out_dists[v] = cnt;
        out_indices[v] = v;
    }
    /* Build max-heap */
    for (int i = heap_size/2 - 1; i >= 0; i--)
        sift_down_max(out_dists, out_indices, heap_size, i);

    /* Process remaining vectors */
    for (int v = heap_size; v < n_vecs; v++) {
        int cnt = 0;
        const uint64_t *row = corpus + (size_t)v * n_words;
        for (int w = 0; w < n_words; w++)
            cnt += __builtin_popcountll(query[w] ^ row[w]);
        if (cnt < out_dists[0]) {
            out_dists[0] = cnt;
            out_indices[0] = v;
            sift_down_max(out_dists, out_indices, heap_size, 0);
        }
    }
}

/* Popcount of a single vector (count of 1-bits) */
int popcount_vec(const uint64_t *a, int n_words) {
    int cnt = 0;
    for (int i = 0; i < n_words; i++)
        cnt += __builtin_popcountll(a[i]);
    return cnt;
}

/* Batch XOR: out[i] = a[i] ^ b[i] for vectors of n_words */
void xor_vectors(const uint64_t *a, const uint64_t *b, uint64_t *out, int n_words) {
    for (int i = 0; i < n_words; i++)
        out[i] = a[i] ^ b[i];
}

/* Batch majority vote: given n_vecs vectors of n_words uint64,
   compute per-bit majority. Result in out[n_words].
   A bit is set only on a strict majority (count > n_vecs / 2). */
void majority_vote(const uint64_t *vecs, int n_vecs, int n_words, uint64_t *out) {
    int threshold = n_vecs / 2;
    /* Count per-bit using word-level iteration */
    for (int w = 0; w < n_words; w++) {
        uint64_t result = 0;
        for (int b = 0; b < 64; b++) {
            int count = 0;
            uint64_t mask = (uint64_t)1 << b;
            for (int v = 0; v < n_vecs; v++)
                count += ((vecs[(size_t)v * n_words + w] & mask) != 0);
            if (count > threshold)
                result |= mask;
        }
        out[w] = result;
    }
}
"""
|
|
# Handle to the loaded native library; stays None until a build succeeds.
_lib = None
# Path of the shared object backing ``_lib``; None until a build succeeds.
_lib_path = None
# Set once a build/load attempt has failed so later calls skip gcc entirely.
_native_failed = False


def _compile_native():
    """Compile and load the C kernels, caching the outcome for the process.

    The C source is written into a freshly created private directory
    (``tempfile.mkdtemp``) rather than a fixed name in the shared temp dir,
    so another user or process cannot pre-plant or overwrite the shared
    object that gets loaded. The directory is removed at interpreter exit.

    Returns:
        The ``ctypes.CDLL`` handle with prototypes declared for every
        exported kernel, or ``None`` when the build or load failed. A
        failure is remembered, so gcc is not re-run on every call.
    """
    global _lib, _lib_path, _native_failed
    if _lib is not None:
        return _lib
    if _native_failed:
        return None

    build_dir = None
    try:
        build_dir = tempfile.mkdtemp(prefix="mle_simd_")
        src_path = os.path.join(build_dir, "mle_simd_ops.c")
        lib_path = os.path.join(build_dir, "mle_simd_ops.so")

        with open(src_path, "w") as f:
            f.write(_NATIVE_C_SOURCE)

        subprocess.run(
            ["gcc", "-O3", "-march=native", "-shared", "-fPIC",
             "-o", lib_path, src_path],
            check=True, capture_output=True, text=True,
        )
        lib = ctypes.CDLL(lib_path)

        u64_ptr = ctypes.POINTER(ctypes.c_uint64)
        int_ptr = ctypes.POINTER(ctypes.c_int)
        # name -> (restype, argtypes); mirrors the C prototypes one-to-one.
        signatures = {
            "hamming_single": (ctypes.c_int,
                               [u64_ptr, u64_ptr, ctypes.c_int]),
            "hamming_batch": (None,
                              [u64_ptr, u64_ptr,
                               ctypes.c_int, ctypes.c_int, int_ptr]),
            "hamming_topk": (None,
                             [u64_ptr, u64_ptr,
                              ctypes.c_int, ctypes.c_int, ctypes.c_int,
                              int_ptr, int_ptr]),
            "popcount_vec": (ctypes.c_int, [u64_ptr, ctypes.c_int]),
            "xor_vectors": (None,
                            [u64_ptr, u64_ptr, u64_ptr, ctypes.c_int]),
            "majority_vote": (None,
                              [u64_ptr, ctypes.c_int, ctypes.c_int, u64_ptr]),
        }
        for name, (restype, argtypes) in signatures.items():
            func = getattr(lib, name)
            func.restype = restype
            func.argtypes = argtypes
    except Exception as e:
        # Deliberately broad: any failure here (gcc missing, compile error,
        # unwritable temp dir, load error) must degrade to the NumPy path.
        _native_failed = True
        if build_dir is not None:
            shutil.rmtree(build_dir, ignore_errors=True)
        # CalledProcessError carries gcc's diagnostics in .stderr; prefer
        # them over the bare "returned non-zero exit status" message.
        detail = str(getattr(e, "stderr", None) or e).strip()
        logger.warning(
            "Failed to compile native SIMD library: %s. Using NumPy fallback.",
            detail,
        )
        return None

    atexit.register(shutil.rmtree, build_dir, ignore_errors=True)
    _lib = lib
    _lib_path = lib_path
    logger.info("Native SIMD library compiled successfully with -march=native")
    return lib
|
|
|
|
def get_native_lib():
    """Return the native library handle, building it on first use.

    Thin accessor over ``_compile_native``; yields whatever that returns,
    including ``None`` when no native library is available.
    """
    native = _compile_native()
    return native
|
|
|
|
| |
|
|
| |
# Lookup table: _POPCOUNT_LUT[v] is the number of set bits in byte value v.
_POPCOUNT_LUT = np.array([bin(i).count('1') for i in range(256)], dtype=np.int32)


def _np_hamming_single(a: np.ndarray, b: np.ndarray) -> int:
    """Pure NumPy Hamming distance between two packed uint64 vectors.

    XORs the inputs, reinterprets the result as bytes and sums the per-byte
    bit counts from ``_POPCOUNT_LUT``.
    """
    xor_bytes = np.bitwise_xor(a, b).view(np.uint8)
    return int(_POPCOUNT_LUT[xor_bytes].sum())


def _np_hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray:
    """Pure NumPy batch Hamming distance.

    Args:
        query: packed vector of shape ``(n_words,)``.
        corpus: packed vectors of shape ``(M, n_words)``.

    Returns:
        Integer array of shape ``(M,)`` holding the distance from ``query``
        to each corpus row. An empty corpus yields an empty array.
    """
    xor_bytes = np.bitwise_xor(query[np.newaxis, :], corpus).view(np.uint8)
    counts = _POPCOUNT_LUT[xor_bytes]
    if counts.size == 0:
        # reshape(0, -1) cannot infer the row width of an empty array and
        # would raise; with nothing to count every distance is zero.
        return np.zeros(len(corpus), dtype=np.int_)
    return counts.reshape(len(corpus), -1).sum(axis=1)
|
|
|
|
| |
|
|
def _as_ptr64(arr: np.ndarray):
    """Return a ``POINTER(c_uint64)`` aimed at the start of ``arr``'s data.

    No dtype or contiguity check is made; the caller is responsible for
    passing a buffer the native code can walk as uint64 words.
    """
    pointer_type = ctypes.POINTER(ctypes.c_uint64)
    return arr.ctypes.data_as(pointer_type)
|
|
|
|
def _as_ptr32(arr: np.ndarray):
    """Return a ``POINTER(c_int)`` aimed at the start of ``arr``'s data.

    Intended for int32 buffers (assumes the platform's C ``int`` is 32 bits
    wide); no dtype or contiguity check is made.
    """
    pointer_type = ctypes.POINTER(ctypes.c_int)
    return arr.ctypes.data_as(pointer_type)
|
|
|
|
def hamming_distance(a: np.ndarray, b: np.ndarray) -> int:
    """Compute the Hamming distance between two packed bit vectors.

    Args:
        a: packed uint64 vector, ``(N_WORDS,)`` for the default 4096-bit
            width; other lengths are accepted.
        b: packed uint64 vector of the same length as ``a``.

    Returns:
        Number of bit positions in which ``a`` and ``b`` differ.
    """
    lib = get_native_lib()
    if lib is not None:
        # The native kernel walks raw memory, so give it contiguous buffers
        # and the real word count instead of assuming N_WORDS.
        a_buf = np.ascontiguousarray(a)
        b_buf = np.ascontiguousarray(b)
        if a_buf.nbytes == b_buf.nbytes and a_buf.nbytes % 8 == 0:
            return int(lib.hamming_single(
                _as_ptr64(a_buf), _as_ptr64(b_buf), a_buf.nbytes // 8
            ))
    # No native library, or buffers the kernel cannot safely walk: let NumPy
    # handle (or reject) the inputs.
    return _np_hamming_single(a, b)
|
|
|
|
def hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray:
    """Compute Hamming distances from a query to every corpus vector.

    Args:
        query: packed uint64 vector of shape ``(n_words,)``.
        corpus: packed uint64 vectors of shape ``(M, n_words)``,
            C-contiguous. ``n_words`` is ``N_WORDS`` for the default width.

    Returns:
        ``(M,)`` int32 array of distances.

    Raises:
        ValueError: if ``corpus`` is not 2-D or not C-contiguous, or if the
            query and the corpus rows do not hold the same whole number of
            64-bit words.
    """
    # Explicit raises instead of assert: the native kernel trusts this
    # layout, and asserts disappear under ``python -O``.
    if corpus.ndim != 2:
        raise ValueError("Corpus must be a 2-D (M, n_words) array")
    if not corpus.flags['C_CONTIGUOUS']:
        raise ValueError("Corpus must be C-contiguous for SIMD")
    n_vecs = corpus.shape[0]
    row_bytes = corpus.shape[1] * corpus.itemsize
    query_buf = np.ascontiguousarray(query)
    if row_bytes % 8 or query_buf.nbytes != row_bytes:
        raise ValueError(
            "query and each corpus row must hold the same whole number "
            "of uint64 words"
        )

    lib = get_native_lib()
    if lib is None:
        return _np_hamming_batch(query, corpus).astype(np.int32)

    out = np.empty(n_vecs, dtype=np.int32)
    # Pass the real row width rather than N_WORDS so narrower or wider
    # vectors are never read past their end.
    lib.hamming_batch(
        _as_ptr64(query_buf), _as_ptr64(corpus),
        row_bytes // 8, n_vecs, _as_ptr32(out)
    )
    return out
|
|
|
|
def hamming_topk(query: np.ndarray, corpus: np.ndarray, k: int = 500):
    """Find the k nearest corpus vectors by Hamming distance.

    Args:
        query: packed uint64 vector of shape ``(n_words,)``.
        corpus: packed uint64 vectors of shape ``(M, n_words)``,
            C-contiguous.
        k: number of neighbours requested.

    Returns:
        ``(indices, distances)``, two int32 arrays of length ``min(k, M)``
        sorted ascending by distance. Both are empty when ``k == 0`` or the
        corpus is empty.

    Raises:
        ValueError: if ``k`` is negative, ``corpus`` is not 2-D or not
            C-contiguous, or the query and the corpus rows do not hold the
            same whole number of 64-bit words.
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")
    if corpus.ndim != 2:
        raise ValueError("Corpus must be a 2-D (M, n_words) array")
    if not corpus.flags['C_CONTIGUOUS']:
        raise ValueError("Corpus must be C-contiguous")
    n_vecs = corpus.shape[0]
    row_bytes = corpus.shape[1] * corpus.itemsize
    query_buf = np.ascontiguousarray(query)
    if row_bytes % 8 or query_buf.nbytes != row_bytes:
        raise ValueError(
            "query and each corpus row must hold the same whole number "
            "of uint64 words"
        )

    actual_k = min(k, n_vecs)
    if actual_k == 0:
        # Never hand zero-length output buffers to the native heap code,
        # which inspects the heap root out_dists[0].
        return np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32)

    lib = get_native_lib()
    if lib is not None:
        out_idx = np.empty(actual_k, dtype=np.int32)
        out_dist = np.empty(actual_k, dtype=np.int32)
        lib.hamming_topk(
            _as_ptr64(query_buf), _as_ptr64(corpus),
            row_bytes // 8, n_vecs, actual_k,
            _as_ptr32(out_idx), _as_ptr32(out_dist)
        )
        # The kernel leaves results in heap order; sort them ascending.
        order = np.argsort(out_dist)
        return out_idx[order], out_dist[order]

    dists = _np_hamming_batch(query, corpus)
    if actual_k < n_vecs:
        # Partition so the first actual_k positions hold the smallest
        # distances (in arbitrary order), then sort just those.
        idx = np.argpartition(dists, actual_k)[:actual_k]
    else:
        idx = np.arange(n_vecs)
    order = np.argsort(dists[idx])
    sorted_idx = idx[order]
    return sorted_idx.astype(np.int32), dists[sorted_idx].astype(np.int32)
|
|
|
|
def xor_vectors(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Bitwise XOR of two packed uint64 vectors.

    Args:
        a: packed uint64 vector.
        b: packed uint64 vector of the same length as ``a``.

    Returns:
        New array holding ``a ^ b``. The native path returns a flat uint64
        array with one element per 64-bit word of input.
    """
    lib = get_native_lib()
    if lib is not None:
        a_buf = np.ascontiguousarray(a)
        b_buf = np.ascontiguousarray(b)
        # Only use the kernel when both buffers cover the same whole number
        # of words; size the output from the input, not from N_WORDS.
        if a_buf.nbytes == b_buf.nbytes and a_buf.nbytes % 8 == 0:
            n_words = a_buf.nbytes // 8
            out = np.empty(n_words, dtype=np.uint64)
            lib.xor_vectors(
                _as_ptr64(a_buf), _as_ptr64(b_buf), _as_ptr64(out), n_words
            )
            return out
    return np.bitwise_xor(a, b)
|
|
|
|
def popcount(a: np.ndarray) -> int:
    """Count the 1-bits in a packed uint64 vector.

    Args:
        a: packed uint64 vector of any length.

    Returns:
        Total number of set bits in ``a``.
    """
    # Both paths reinterpret the raw bytes, which needs a contiguous buffer.
    buf = np.ascontiguousarray(a)
    lib = get_native_lib()
    if lib is not None and buf.nbytes % 8 == 0:
        # Pass the real word count rather than assuming N_WORDS.
        return int(lib.popcount_vec(_as_ptr64(buf), buf.nbytes // 8))
    return int(_POPCOUNT_LUT[buf.view(np.uint8)].sum())
|
|
|
|
def majority_vote(vectors: np.ndarray) -> np.ndarray:
    """Bitwise majority vote across multiple packed uint64 vectors.

    A result bit is set only when strictly more than half of the input
    vectors have that bit set, so ties resolve to 0.

    Args:
        vectors: packed uint64 vectors of shape ``(M, n_words)``,
            C-contiguous.

    Returns:
        ``(n_words,)`` uint64 array holding the per-bit majority.

    Raises:
        ValueError: if ``vectors`` is not 2-D, not C-contiguous, or its rows
            do not hold a whole number of 64-bit words.
    """
    # Explicit raises instead of assert: the native kernel trusts this
    # layout, and asserts disappear under ``python -O``.
    if vectors.ndim != 2:
        raise ValueError("vectors must be a 2-D (M, n_words) array")
    if not vectors.flags['C_CONTIGUOUS']:
        raise ValueError("vectors must be C-contiguous")
    n_vecs = vectors.shape[0]
    row_bytes = vectors.shape[1] * vectors.itemsize
    if row_bytes % 8:
        raise ValueError("each row must hold a whole number of uint64 words")
    n_words = row_bytes // 8

    lib = get_native_lib()
    if lib is not None:
        out = np.empty(n_words, dtype=np.uint64)
        lib.majority_vote(_as_ptr64(vectors), n_vecs, n_words, _as_ptr64(out))
        return out

    # NumPy fallback: expand to one element per bit, count per column, and
    # pack the strict-majority mask back into words.
    bits = np.unpackbits(vectors.view(np.uint8), axis=1)
    summed = bits.sum(axis=0, dtype=np.int32)
    majority = (summed > n_vecs / 2).astype(np.uint8)
    return np.packbits(majority).view(np.uint64)
|
|
|
|
| |
|
|
def random_binary_vector(n_words: int = N_WORDS) -> np.ndarray:
    """Generate one random bit vector stored as packed uint64 words.

    Every word is drawn uniformly from the full uint64 range, so each bit
    is iid Bernoulli(0.5), i.e. balanced density. The default ``n_words``
    gives a 4096-bit vector.

    Returns:
        ``(n_words,)`` uint64 array.
    """
    upper_exclusive = np.iinfo(np.uint64).max + 1
    words = np.random.randint(0, upper_exclusive, size=n_words, dtype=np.uint64)
    return words
|
|
|
|
def random_binary_vectors(n: int, n_words: int = N_WORDS) -> np.ndarray:
    """Generate ``n`` random packed bit vectors.

    Every word is drawn uniformly from the full uint64 range. The default
    ``n_words`` gives 4096-bit vectors.

    Returns:
        C-contiguous uint64 array of shape ``(n, n_words)``.
    """
    upper_exclusive = np.iinfo(np.uint64).max + 1
    shape = (n, n_words)
    words = np.random.randint(0, upper_exclusive, size=shape, dtype=np.uint64)
    return np.ascontiguousarray(words)
|
|
|
|
def normalize_density(v: np.ndarray, target_density: float = 0.5) -> np.ndarray:
    """Normalize a packed binary vector to a target bit density.

    Randomly chosen bits are flipped (1->0 or 0->1, whichever is needed)
    until the number of 1-bits equals ``int(target_density * n_bits)``.

    Args:
        v: packed uint64 vector; need not be contiguous.
        target_density: desired fraction of 1-bits, within ``[0, 1]``.

    Returns:
        New packed uint64 array; ``v`` itself is never modified. When ``v``
        already has the target number of 1-bits a plain copy is returned.

    Raises:
        ValueError: if ``target_density`` lies outside ``[0, 1]``.
    """
    if not 0.0 <= target_density <= 1.0:
        raise ValueError(
            f"target_density must be within [0, 1], got {target_density!r}"
        )

    # Reinterpreting as bytes needs a contiguous buffer (a no-op copy-wise
    # when v is already contiguous).
    bits = np.unpackbits(np.ascontiguousarray(v).view(np.uint8))
    current_ones = int(bits.sum())
    target_ones = int(target_density * bits.size)

    if current_ones == target_ones:
        return v.copy()

    surplus = current_ones - target_ones
    if surplus > 0:
        # Too dense: clear a random subset of the set bits.
        candidates = np.flatnonzero(bits == 1)
        new_value = 0
    else:
        # Too sparse: set a random subset of the clear bits.
        candidates = np.flatnonzero(bits == 0)
        new_value = 1
    to_flip = np.random.choice(candidates, abs(surplus), replace=False)
    bits[to_flip] = new_value

    return np.packbits(bits).view(np.uint64).copy()
|
|
|
|
def hamming_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Normalized Hamming similarity in [0, 1]; 1.0 means identical.

    The distance is normalized by the actual bit width of ``a`` rather than
    the module default, so vectors of non-default length still map onto
    [0, 1]. Zero-width vectors are reported as identical.
    """
    n_bits = a.nbytes * 8
    if n_bits == 0:
        return 1.0
    return 1.0 - hamming_distance(a, b) / n_bits
|
|