""" MLE SIMD-Optimized Bitwise Operations ===================================== Hardware-accelerated Hamming distance, popcount, and batch XOR operations. Uses ctypes to call GCC-compiled C with -march=native for automatic SIMD vectorization (AVX-512 VPOPCNTQ / AVX2 POPCNT / SSE4.2 POPCNT). Fallback: pure NumPy LUT-based popcount for portability. """ import numpy as np import ctypes import tempfile import subprocess import os import logging from pathlib import Path logger = logging.getLogger(__name__) # ── Constants ────────────────────────────────────────────────────────────────── N_BITS = 4096 N_WORDS = N_BITS // 64 # 64 uint64 words = 512 bytes per vector N_BYTES = N_BITS // 8 # 512 bytes # ── Compile native SIMD library ─────────────────────────────────────────────── _NATIVE_C_SOURCE = r""" #include #include #include /* Single-pair Hamming distance: XOR + popcount over N uint64 words */ int hamming_single(const uint64_t *a, const uint64_t *b, int n_words) { int cnt = 0; for (int i = 0; i < n_words; i++) cnt += __builtin_popcountll(a[i] ^ b[i]); return cnt; } /* Batch Hamming: query (1 x n_words) vs corpus (n_vecs x n_words) Results written to out[n_vecs]. Layout: corpus is row-major contiguous. */ void hamming_batch(const uint64_t *query, const uint64_t *corpus, int n_words, int n_vecs, int *out) { for (int v = 0; v < n_vecs; v++) { int cnt = 0; const uint64_t *row = corpus + (long)v * n_words; for (int w = 0; w < n_words; w++) cnt += __builtin_popcountll(query[w] ^ row[w]); out[v] = cnt; } } /* Batch Hamming with top-K selection (partial sort). Returns indices of top_k smallest distances. Uses a simple max-heap of size top_k for O(N log K). */ static void swap_int(int *a, int *b) { int t = *a; *a = *b; *b = t; } static void sift_down_max(int *heap_dist, int *heap_idx, int size, int i) { while (1) { int largest = i, l = 2*i+1, r = 2*i+2; if (l < size && heap_dist[l] > heap_dist[largest]) largest = l; if (r < size && heap_dist[r] > heap_dist[largest]) largest = r; if (largest == i) break; swap_int(&heap_dist[i], &heap_dist[largest]); swap_int(&heap_idx[i], &heap_idx[largest]); i = largest; } } void hamming_topk(const uint64_t *query, const uint64_t *corpus, int n_words, int n_vecs, int top_k, int *out_indices, int *out_dists) { /* Initialize heap with first top_k elements */ int heap_size = (top_k < n_vecs) ? top_k : n_vecs; for (int v = 0; v < heap_size; v++) { int cnt = 0; const uint64_t *row = corpus + (long)v * n_words; for (int w = 0; w < n_words; w++) cnt += __builtin_popcountll(query[w] ^ row[w]); out_dists[v] = cnt; out_indices[v] = v; } /* Build max-heap */ for (int i = heap_size/2 - 1; i >= 0; i--) sift_down_max(out_dists, out_indices, heap_size, i); /* Process remaining vectors */ for (int v = heap_size; v < n_vecs; v++) { int cnt = 0; const uint64_t *row = corpus + (long)v * n_words; for (int w = 0; w < n_words; w++) cnt += __builtin_popcountll(query[w] ^ row[w]); if (cnt < out_dists[0]) { out_dists[0] = cnt; out_indices[0] = v; sift_down_max(out_dists, out_indices, heap_size, 0); } } } /* Popcount of a single vector (count of 1-bits) */ int popcount_vec(const uint64_t *a, int n_words) { int cnt = 0; for (int i = 0; i < n_words; i++) cnt += __builtin_popcountll(a[i]); return cnt; } /* Batch XOR: out[i] = a[i] ^ b[i] for vectors of n_words */ void xor_vectors(const uint64_t *a, const uint64_t *b, uint64_t *out, int n_words) { for (int i = 0; i < n_words; i++) out[i] = a[i] ^ b[i]; } /* Batch majority vote: given n_vecs vectors of n_words uint64, compute per-bit majority. Result in out[n_words]. */ void majority_vote(const uint64_t *vecs, int n_vecs, int n_words, uint64_t *out) { int n_bits = n_words * 64; int threshold = n_vecs / 2; /* Count per-bit using word-level iteration */ for (int w = 0; w < n_words; w++) { uint64_t result = 0; for (int b = 0; b < 64; b++) { int count = 0; uint64_t mask = (uint64_t)1 << b; for (int v = 0; v < n_vecs; v++) count += ((vecs[(long)v * n_words + w] & mask) != 0); if (count > threshold) result |= mask; } out[w] = result; } } """ _lib = None _lib_path = None def _compile_native(): """Compile the C library with native SIMD optimization.""" global _lib, _lib_path if _lib is not None: return _lib src_path = os.path.join(tempfile.gettempdir(), "mle_simd_ops.c") lib_path = os.path.join(tempfile.gettempdir(), "mle_simd_ops.so") _lib_path = lib_path with open(src_path, "w") as f: f.write(_NATIVE_C_SOURCE) try: subprocess.run( ["gcc", "-O3", "-march=native", "-shared", "-fPIC", "-o", lib_path, src_path], check=True, capture_output=True, text=True ) lib = ctypes.CDLL(lib_path) # hamming_single lib.hamming_single.restype = ctypes.c_int lib.hamming_single.argtypes = [ ctypes.POINTER(ctypes.c_uint64), ctypes.POINTER(ctypes.c_uint64), ctypes.c_int ] # hamming_batch lib.hamming_batch.restype = None lib.hamming_batch.argtypes = [ ctypes.POINTER(ctypes.c_uint64), ctypes.POINTER(ctypes.c_uint64), ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int) ] # hamming_topk lib.hamming_topk.restype = None lib.hamming_topk.argtypes = [ ctypes.POINTER(ctypes.c_uint64), ctypes.POINTER(ctypes.c_uint64), ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int) ] # popcount_vec lib.popcount_vec.restype = ctypes.c_int lib.popcount_vec.argtypes = [ ctypes.POINTER(ctypes.c_uint64), ctypes.c_int ] # xor_vectors lib.xor_vectors.restype = None lib.xor_vectors.argtypes = [ ctypes.POINTER(ctypes.c_uint64), ctypes.POINTER(ctypes.c_uint64), ctypes.POINTER(ctypes.c_uint64), ctypes.c_int ] # majority_vote lib.majority_vote.restype = None lib.majority_vote.argtypes = [ ctypes.POINTER(ctypes.c_uint64), ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_uint64) ] _lib = lib logger.info("Native SIMD library compiled successfully with -march=native") return lib except Exception as e: logger.warning(f"Failed to compile native SIMD library: {e}. Using NumPy fallback.") return None def get_native_lib(): """Get the compiled native library (lazy initialization).""" return _compile_native() # ── NumPy Fallback Operations ───────────────────────────────────────────────── # LUT for byte-level popcount (256 entries) _POPCOUNT_LUT = np.array([bin(i).count('1') for i in range(256)], dtype=np.int32) def _np_hamming_single(a: np.ndarray, b: np.ndarray) -> int: """Pure NumPy Hamming distance between two packed uint64 vectors.""" xor = np.bitwise_xor(a, b).view(np.uint8) return int(_POPCOUNT_LUT[xor].sum()) def _np_hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray: """Pure NumPy batch Hamming distance. query: (N_WORDS,), corpus: (M, N_WORDS).""" xor = np.bitwise_xor(query[np.newaxis, :], corpus) # (M, N_WORDS) xor_bytes = xor.view(np.uint8) # (M, N_BYTES) return _POPCOUNT_LUT[xor_bytes].reshape(len(corpus), -1).sum(axis=1) # ── Public API (auto-selects native or fallback) ───────────────────────────── def _as_ptr64(arr: np.ndarray): """Get ctypes pointer to uint64 array.""" return arr.ctypes.data_as(ctypes.POINTER(ctypes.c_uint64)) def _as_ptr32(arr: np.ndarray): """Get ctypes pointer to int32 array.""" return arr.ctypes.data_as(ctypes.POINTER(ctypes.c_int)) def hamming_distance(a: np.ndarray, b: np.ndarray) -> int: """Compute Hamming distance between two 4096-bit packed vectors. a, b: np.ndarray of shape (N_WORDS,) dtype=uint64. """ lib = get_native_lib() if lib is not None: return lib.hamming_single(_as_ptr64(a), _as_ptr64(b), N_WORDS) return _np_hamming_single(a, b) def hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray: """Compute Hamming distances from query to all corpus vectors. query: (N_WORDS,) uint64 corpus: (M, N_WORDS) uint64, C-contiguous Returns: (M,) int32 array of distances. """ assert corpus.flags['C_CONTIGUOUS'], "Corpus must be C-contiguous for SIMD" n_vecs = corpus.shape[0] lib = get_native_lib() if lib is not None: out = np.empty(n_vecs, dtype=np.int32) lib.hamming_batch( _as_ptr64(query), _as_ptr64(corpus), N_WORDS, n_vecs, _as_ptr32(out) ) return out return _np_hamming_batch(query, corpus).astype(np.int32) def hamming_topk(query: np.ndarray, corpus: np.ndarray, k: int = 500): """Find top-k nearest vectors by Hamming distance. Returns: (indices, distances) each of shape (k,), sorted ascending by distance. Uses O(N log K) max-heap in native code. """ assert corpus.flags['C_CONTIGUOUS'], "Corpus must be C-contiguous" n_vecs = corpus.shape[0] actual_k = min(k, n_vecs) lib = get_native_lib() if lib is not None: out_idx = np.empty(actual_k, dtype=np.int32) out_dist = np.empty(actual_k, dtype=np.int32) lib.hamming_topk( _as_ptr64(query), _as_ptr64(corpus), N_WORDS, n_vecs, actual_k, _as_ptr32(out_idx), _as_ptr32(out_dist) ) # Sort by distance (heap output is unordered) order = np.argsort(out_dist) return out_idx[order], out_dist[order] else: dists = _np_hamming_batch(query, corpus) if actual_k < n_vecs: idx = np.argpartition(dists, actual_k)[:actual_k] else: idx = np.arange(n_vecs) order = np.argsort(dists[idx]) sorted_idx = idx[order] return sorted_idx.astype(np.int32), dists[sorted_idx].astype(np.int32) def xor_vectors(a: np.ndarray, b: np.ndarray) -> np.ndarray: """Bitwise XOR of two packed uint64 vectors.""" lib = get_native_lib() if lib is not None: out = np.empty(N_WORDS, dtype=np.uint64) lib.xor_vectors(_as_ptr64(a), _as_ptr64(b), _as_ptr64(out), N_WORDS) return out return np.bitwise_xor(a, b) def popcount(a: np.ndarray) -> int: """Count number of 1-bits in packed uint64 vector.""" lib = get_native_lib() if lib is not None: return lib.popcount_vec(_as_ptr64(a), N_WORDS) return int(_POPCOUNT_LUT[a.view(np.uint8)].sum()) def majority_vote(vectors: np.ndarray) -> np.ndarray: """Bitwise majority vote across multiple packed uint64 vectors. vectors: (M, N_WORDS) uint64, C-contiguous. Returns: (N_WORDS,) uint64. """ assert vectors.flags['C_CONTIGUOUS'] n_vecs = vectors.shape[0] lib = get_native_lib() if lib is not None: out = np.empty(N_WORDS, dtype=np.uint64) lib.majority_vote(_as_ptr64(vectors), n_vecs, N_WORDS, _as_ptr64(out)) return out # NumPy fallback: unpack, sum, threshold bits = np.unpackbits(vectors.view(np.uint8), axis=1) # (M, N_BITS) summed = bits.astype(np.int32).sum(axis=0) majority = (summed > n_vecs / 2).astype(np.uint8) return np.packbits(majority).view(np.uint64) # ── Vector Generation ───────────────────────────────────────────────────────── def random_binary_vector(n_words: int = N_WORDS) -> np.ndarray: """Generate a random 4096-bit vector, stored as packed uint64. Each bit is iid Bernoulli(0.5) → balanced density. """ return np.random.randint( 0, np.iinfo(np.uint64).max + 1, size=n_words, dtype=np.uint64 ) def random_binary_vectors(n: int, n_words: int = N_WORDS) -> np.ndarray: """Generate n random 4096-bit vectors. Shape: (n, N_WORDS), C-contiguous.""" return np.ascontiguousarray( np.random.randint( 0, np.iinfo(np.uint64).max + 1, size=(n, n_words), dtype=np.uint64 ) ) def normalize_density(v: np.ndarray, target_density: float = 0.5) -> np.ndarray: """Normalize a binary vector to target bit density. Randomly flips bits to reach the desired proportion of 1-bits. """ bits = np.unpackbits(v.view(np.uint8)) current = bits.sum() / len(bits) target_ones = int(target_density * len(bits)) current_ones = int(bits.sum()) if current_ones == target_ones: return v.copy() if current_ones > target_ones: # Flip some 1s to 0s one_positions = np.where(bits == 1)[0] to_flip = np.random.choice(one_positions, current_ones - target_ones, replace=False) bits[to_flip] = 0 else: # Flip some 0s to 1s zero_positions = np.where(bits == 0)[0] to_flip = np.random.choice(zero_positions, target_ones - current_ones, replace=False) bits[to_flip] = 1 return np.packbits(bits).view(np.uint64).copy() def hamming_similarity(a: np.ndarray, b: np.ndarray) -> float: """Normalized Hamming similarity in [0, 1]. 1.0 = identical.""" return 1.0 - hamming_distance(a, b) / N_BITS