# Harry00's picture
# feat: complete MLE engine implementation
# ebaf2ce verified
"""
MLE SIMD-Optimized Bitwise Operations
=====================================
Hardware-accelerated Hamming distance, popcount, and batch XOR operations.
Uses ctypes to call GCC-compiled C with -march=native for automatic SIMD
vectorization (AVX-512 VPOPCNTQ / AVX2 POPCNT / SSE4.2 POPCNT).
Fallback: pure NumPy LUT-based popcount for portability.
"""
import numpy as np
import ctypes
import tempfile
import subprocess
import os
import logging
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ── Constants ──────────────────────────────────────────────────────────────────
# Width, in bits, of the binary vectors this module is built around.
N_BITS = 4096
# The same width expressed in packed uint64 words (4096 / 64 = 64).
N_WORDS = N_BITS // 64 # 64 uint64 words = 512 bytes per vector
# The same width expressed in bytes (4096 / 8 = 512).
N_BYTES = N_BITS // 8 # 512 bytes
# ── Compile native SIMD library ───────────────────────────────────────────────
# C implementation of the bit kernels. The text is written to disk and compiled
# at runtime by _compile_native(); the exported function signatures must stay
# in sync with the ctypes declarations made there.
_NATIVE_C_SOURCE = r"""
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hamming distance of one pair: XOR + popcount over n_words uint64 words.
   Shared by every exported distance routine below. */
static inline int hamming_words(const uint64_t *a, const uint64_t *b,
                                int n_words) {
    int cnt = 0;
    for (int i = 0; i < n_words; i++)
        cnt += __builtin_popcountll(a[i] ^ b[i]);
    return cnt;
}

/* Single-pair Hamming distance: XOR + popcount over N uint64 words */
int hamming_single(const uint64_t *a, const uint64_t *b, int n_words) {
    return hamming_words(a, b, n_words);
}

/* Batch Hamming: query (1 x n_words) vs corpus (n_vecs x n_words)
   Results written to out[n_vecs]. Layout: corpus is row-major contiguous. */
void hamming_batch(const uint64_t *query, const uint64_t *corpus,
                   int n_words, int n_vecs, int *out) {
    for (int v = 0; v < n_vecs; v++)
        out[v] = hamming_words(query, corpus + (size_t)v * n_words, n_words);
}

/* Batch Hamming with top-K selection (partial sort).
   Returns indices of top_k smallest distances.
   Uses a simple max-heap of size top_k for O(N log K). */
static void swap_int(int *a, int *b) { int t = *a; *a = *b; *b = t; }

static void sift_down_max(int *heap_dist, int *heap_idx, int size, int i) {
    while (1) {
        int largest = i, l = 2*i+1, r = 2*i+2;
        if (l < size && heap_dist[l] > heap_dist[largest]) largest = l;
        if (r < size && heap_dist[r] > heap_dist[largest]) largest = r;
        if (largest == i) break;
        swap_int(&heap_dist[i], &heap_dist[largest]);
        swap_int(&heap_idx[i], &heap_idx[largest]);
        i = largest;
    }
}

void hamming_topk(const uint64_t *query, const uint64_t *corpus,
                  int n_words, int n_vecs, int top_k,
                  int *out_indices, int *out_dists) {
    int heap_size = (top_k < n_vecs) ? top_k : n_vecs;

    /* Nothing requested or nothing to search: the output buffers may be
       zero-length, so they must not be read or written at all. */
    if (heap_size <= 0)
        return;

    /* Initialize heap with first top_k elements */
    for (int v = 0; v < heap_size; v++) {
        out_dists[v] = hamming_words(query, corpus + (size_t)v * n_words, n_words);
        out_indices[v] = v;
    }

    /* Build max-heap */
    for (int i = heap_size/2 - 1; i >= 0; i--)
        sift_down_max(out_dists, out_indices, heap_size, i);

    /* Process remaining vectors: replace the current worst (heap root)
       whenever a strictly closer vector is found. */
    for (int v = heap_size; v < n_vecs; v++) {
        int cnt = hamming_words(query, corpus + (size_t)v * n_words, n_words);
        if (cnt < out_dists[0]) {
            out_dists[0] = cnt;
            out_indices[0] = v;
            sift_down_max(out_dists, out_indices, heap_size, 0);
        }
    }
}

/* Popcount of a single vector (count of 1-bits) */
int popcount_vec(const uint64_t *a, int n_words) {
    int cnt = 0;
    for (int i = 0; i < n_words; i++)
        cnt += __builtin_popcountll(a[i]);
    return cnt;
}

/* Batch XOR: out[i] = a[i] ^ b[i] for vectors of n_words */
void xor_vectors(const uint64_t *a, const uint64_t *b, uint64_t *out, int n_words) {
    for (int i = 0; i < n_words; i++)
        out[i] = a[i] ^ b[i];
}

/* Batch majority vote: given n_vecs vectors of n_words uint64,
   compute per-bit majority. Result in out[n_words].
   A bit is set only when strictly more than n_vecs / 2 (integer division)
   inputs have it set, so an exact tie yields 0. */
void majority_vote(const uint64_t *vecs, int n_vecs, int n_words, uint64_t *out) {
    int threshold = n_vecs / 2;
    /* Count per-bit using word-level iteration */
    for (int w = 0; w < n_words; w++) {
        uint64_t result = 0;
        for (int b = 0; b < 64; b++) {
            int count = 0;
            uint64_t mask = (uint64_t)1 << b;
            for (int v = 0; v < n_vecs; v++)
                count += ((vecs[(size_t)v * n_words + w] & mask) != 0);
            if (count > threshold)
                result |= mask;
        }
        out[w] = result;
    }
}
"""
# Handle of the loaded native library, or None while unavailable.
_lib = None
# Path of the shared object produced by the most recent build attempt.
_lib_path = None
# True once a build has been attempted, so that a failed build is not retried
# (and gcc not respawned) on every single API call.
_compile_attempted = False


def _compile_native():
    """Compile the C library with native SIMD optimization and load it.

    The build runs at most once per process: both a successful handle and a
    failure are cached. The source and shared object are written into a
    freshly created private temporary directory (removed at interpreter exit)
    rather than a fixed path in the shared temp dir, so another user cannot
    pre-plant or swap the files and concurrent processes cannot overwrite each
    other's library.

    Returns:
        The ``ctypes.CDLL`` handle with ``restype``/``argtypes`` declared for
        every exported function, or ``None`` if the source could not be
        written, compiled or loaded (callers then use the NumPy fallback).
    """
    global _lib, _lib_path, _compile_attempted
    if _lib is not None or _compile_attempted:
        return _lib
    _compile_attempted = True

    # Local imports: only needed for the one-off build.
    import atexit
    import shutil

    try:
        build_dir = tempfile.mkdtemp(prefix="mle_simd_ops_")
        atexit.register(shutil.rmtree, build_dir, ignore_errors=True)

        src_path = os.path.join(build_dir, "mle_simd_ops.c")
        lib_path = os.path.join(build_dir, "mle_simd_ops.so")
        _lib_path = lib_path

        with open(src_path, "w") as f:
            f.write(_NATIVE_C_SOURCE)

        subprocess.run(
            ["gcc", "-O3", "-march=native", "-shared", "-fPIC",
             "-o", lib_path, src_path],
            check=True, capture_output=True, text=True,
        )
        lib = ctypes.CDLL(lib_path)

        c_int = ctypes.c_int
        u64_p = ctypes.POINTER(ctypes.c_uint64)
        int_p = ctypes.POINTER(ctypes.c_int)
        # name -> (restype, argtypes); mirrors the C prototypes exactly.
        signatures = {
            "hamming_single": (c_int, [u64_p, u64_p, c_int]),
            "hamming_batch": (None, [u64_p, u64_p, c_int, c_int, int_p]),
            "hamming_topk": (
                None, [u64_p, u64_p, c_int, c_int, c_int, int_p, int_p]
            ),
            "popcount_vec": (c_int, [u64_p, c_int]),
            "xor_vectors": (None, [u64_p, u64_p, u64_p, c_int]),
            "majority_vote": (None, [u64_p, c_int, c_int, u64_p]),
        }
        for name, (restype, argtypes) in signatures.items():
            func = getattr(lib, name)
            func.restype = restype
            func.argtypes = argtypes
    except Exception as e:
        # Deliberate best-effort boundary: any failure (no gcc, compile error,
        # unwritable temp dir, missing symbol) degrades to the NumPy fallback.
        logger.warning(
            "Failed to compile native SIMD library: %s. Using NumPy fallback.", e
        )
        compiler_output = getattr(e, "stderr", None)
        if compiler_output:
            logger.debug("gcc output:\n%s", compiler_output)
        return None

    _lib = lib
    logger.info("Native SIMD library compiled successfully with -march=native")
    return lib
def get_native_lib():
    """Return the native library handle, building it on first use.

    Thin public entry point over ``_compile_native``; yields ``None`` when the
    native build is not available.
    """
    native_lib = _compile_native()
    return native_lib
# ── NumPy Fallback Operations ─────────────────────────────────────────────────
# Byte-level popcount table: entry i is the number of 1-bits in byte value i.
_POPCOUNT_LUT = np.fromiter(
    (bin(byte_value).count("1") for byte_value in range(256)),
    dtype=np.int32,
    count=256,
)
def _np_hamming_single(a: np.ndarray, b: np.ndarray) -> int:
    """Hamming distance of two packed vectors using only NumPy.

    XORs the inputs, reinterprets the result as bytes and sums the per-byte
    bit counts looked up in ``_POPCOUNT_LUT``.
    """
    differing = np.bitwise_xor(a, b)
    per_byte_counts = _POPCOUNT_LUT[differing.view(np.uint8)]
    return int(per_byte_counts.sum())
def _np_hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray:
    """Pure NumPy batch Hamming distance.

    Args:
        query: packed vector of shape (N_WORDS,).
        corpus: packed vectors of shape (M, N_WORDS); M may be 0.

    Returns:
        Array of shape (M,) holding the distance from ``query`` to each row.
    """
    xor = np.bitwise_xor(query[np.newaxis, :], corpus)  # (M, N_WORDS)
    per_byte = _POPCOUNT_LUT[xor.view(np.uint8)]  # (M, N_BYTES)
    # Reduce every axis but the first. Unlike reshape(len(corpus), -1), this
    # is also well-defined for an empty corpus (M == 0).
    return per_byte.sum(axis=tuple(range(1, per_byte.ndim)))
# ── Public API (auto-selects native or fallback) ─────────────────────────────
def _as_ptr64(arr: np.ndarray):
    """Return a ctypes ``uint64*`` aimed at the start of ``arr``'s buffer."""
    uint64_pointer = ctypes.POINTER(ctypes.c_uint64)
    return arr.ctypes.data_as(uint64_pointer)
def _as_ptr32(arr: np.ndarray):
    """Return a ctypes ``int*`` aimed at the start of ``arr``'s buffer."""
    int_pointer = ctypes.POINTER(ctypes.c_int)
    return arr.ctypes.data_as(int_pointer)
def hamming_distance(a: np.ndarray, b: np.ndarray) -> int:
    """Compute the Hamming distance between two packed bit vectors.

    Args:
        a, b: arrays of dtype uint64 and identical shape, normally
            (N_WORDS,) for 4096-bit vectors.

    Returns:
        Number of bit positions in which ``a`` and ``b`` differ.

    The native kernel is handed raw pointers, so it is used only when both
    inputs really are uint64 arrays of equal shape. They are made contiguous
    first and the actual word count is passed instead of assuming N_WORDS.
    Anything else goes through the NumPy implementation.
    """
    a = np.asarray(a)
    b = np.asarray(b)
    lib = get_native_lib()
    native_ok = (
        lib is not None
        and a.dtype == np.uint64
        and b.dtype == np.uint64
        and a.shape == b.shape
    )
    if native_ok:
        a_c = np.ascontiguousarray(a)
        b_c = np.ascontiguousarray(b)
        return int(lib.hamming_single(_as_ptr64(a_c), _as_ptr64(b_c), a_c.size))
    return _np_hamming_single(a, b)
def hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray:
    """Compute Hamming distances from query to all corpus vectors.

    Args:
        query: (N_WORDS,) uint64.
        corpus: (M, N_WORDS) uint64. A non-contiguous corpus is copied into
            C order first; pass a C-contiguous array to avoid that copy.

    Returns:
        (M,) int32 array of distances.

    The contiguity requirement used to be an ``assert``, which disappears
    under ``python -O`` and would let the native kernel walk a wrongly laid
    out buffer. The native kernel is used only when the buffers match what it
    expects (2-D uint64 corpus, uint64 query as wide as a corpus row).
    """
    query = np.asarray(query)
    corpus = np.ascontiguousarray(corpus)
    n_vecs = corpus.shape[0]
    lib = get_native_lib()
    native_ok = (
        lib is not None
        and corpus.ndim == 2
        and corpus.dtype == np.uint64
        and query.dtype == np.uint64
        and query.shape == corpus.shape[1:]
    )
    if native_ok:
        query_c = np.ascontiguousarray(query)
        out = np.empty(n_vecs, dtype=np.int32)
        lib.hamming_batch(
            _as_ptr64(query_c), _as_ptr64(corpus),
            corpus.shape[1], n_vecs, _as_ptr32(out)
        )
        return out
    return _np_hamming_batch(query, corpus).astype(np.int32)
def hamming_topk(query: np.ndarray, corpus: np.ndarray, k: int = 500):
    """Find the k corpus vectors nearest to ``query`` by Hamming distance.

    Args:
        query: (N_WORDS,) uint64.
        corpus: (M, N_WORDS) uint64. A non-contiguous corpus is copied into
            C order first.
        k: number of neighbours wanted; clamped to M.

    Returns:
        ``(indices, distances)``, both int32 of shape (min(k, M),), sorted
        ascending by distance. Both are empty when ``k == 0`` or the corpus
        has no rows.

    Raises:
        ValueError: if ``k`` is negative.

    The native path selects with a max-heap in C, O(N log K).
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")
    query = np.asarray(query)
    corpus = np.ascontiguousarray(corpus)
    n_vecs = corpus.shape[0]
    actual_k = min(k, n_vecs)
    if actual_k == 0:
        # Never hand zero-length output buffers to the native heap code, which
        # inspects slot 0 unconditionally once there are vectors to scan.
        return np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32)

    lib = get_native_lib()
    native_ok = (
        lib is not None
        and corpus.ndim == 2
        and corpus.dtype == np.uint64
        and query.dtype == np.uint64
        and query.shape == corpus.shape[1:]
    )
    if native_ok:
        query_c = np.ascontiguousarray(query)
        out_idx = np.empty(actual_k, dtype=np.int32)
        out_dist = np.empty(actual_k, dtype=np.int32)
        lib.hamming_topk(
            _as_ptr64(query_c), _as_ptr64(corpus),
            corpus.shape[1], n_vecs, actual_k,
            _as_ptr32(out_idx), _as_ptr32(out_dist)
        )
        # Sort by distance (heap output is unordered)
        order = np.argsort(out_dist)
        return out_idx[order], out_dist[order]

    dists = _np_hamming_batch(query, corpus)
    if actual_k < n_vecs:
        # Partial selection: the first actual_k entries are the smallest ones.
        idx = np.argpartition(dists, actual_k)[:actual_k]
    else:
        idx = np.arange(n_vecs)
    order = np.argsort(dists[idx])
    sorted_idx = idx[order]
    return sorted_idx.astype(np.int32), dists[sorted_idx].astype(np.int32)
def xor_vectors(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Bitwise XOR of two packed uint64 vectors.

    Args:
        a, b: uint64 arrays of identical shape, normally (N_WORDS,).

    Returns:
        A new array holding ``a ^ b``.

    The native kernel is used only for equal-shape uint64 inputs and is told
    the real word count, so shorter or strided arrays can no longer be read
    past their end. Everything else is handled by ``np.bitwise_xor``.
    """
    a = np.asarray(a)
    b = np.asarray(b)
    lib = get_native_lib()
    native_ok = (
        lib is not None
        and a.dtype == np.uint64
        and b.dtype == np.uint64
        and a.shape == b.shape
    )
    if native_ok:
        a_c = np.ascontiguousarray(a)
        b_c = np.ascontiguousarray(b)
        out = np.empty(a.shape, dtype=np.uint64)
        lib.xor_vectors(_as_ptr64(a_c), _as_ptr64(b_c), _as_ptr64(out), out.size)
        return out
    return np.bitwise_xor(a, b)
def popcount(a: np.ndarray) -> int:
    """Count the 1-bits in a packed uint64 vector.

    Args:
        a: uint64 array, normally of shape (N_WORDS,).

    Returns:
        Total number of set bits in ``a``.

    The array is made contiguous and its real word count is passed to the
    native kernel instead of assuming N_WORDS. Non-uint64 input is counted
    byte-wise with the lookup table.
    """
    a_c = np.ascontiguousarray(a)
    lib = get_native_lib()
    if lib is not None and a_c.dtype == np.uint64:
        return int(lib.popcount_vec(_as_ptr64(a_c), a_c.size))
    return int(_POPCOUNT_LUT[a_c.view(np.uint8)].sum())
def majority_vote(vectors: np.ndarray) -> np.ndarray:
    """Bitwise majority vote across multiple packed uint64 vectors.

    Args:
        vectors: (M, N_WORDS) uint64. A non-contiguous array is copied into
            C order first.

    Returns:
        (N_WORDS,) uint64 in which a bit is set when strictly more than half
        of the M inputs have it set (an exact tie yields 0).

    The contiguity requirement used to be an ``assert``, which is stripped
    under ``python -O``. The native kernel now receives the real row width
    rather than assuming N_WORDS.
    """
    vectors = np.ascontiguousarray(vectors)
    n_vecs = vectors.shape[0]
    lib = get_native_lib()
    if lib is not None and vectors.ndim == 2 and vectors.dtype == np.uint64:
        n_words = vectors.shape[1]
        out = np.empty(n_words, dtype=np.uint64)
        lib.majority_vote(_as_ptr64(vectors), n_vecs, n_words, _as_ptr64(out))
        return out
    # NumPy fallback: unpack, sum, threshold. unpackbits and packbits use the
    # same bit order, so every bit returns to the position it came from.
    bits = np.unpackbits(vectors.view(np.uint8), axis=1)  # (M, N_BITS)
    # Accumulate straight into int32 instead of copying the whole bit matrix.
    summed = bits.sum(axis=0, dtype=np.int32)
    majority = (summed > n_vecs / 2).astype(np.uint8)
    return np.packbits(majority).view(np.uint64)
# ── Vector Generation ─────────────────────────────────────────────────────────
def random_binary_vector(n_words: int = N_WORDS) -> np.ndarray:
    """Draw one random bit vector packed into ``n_words`` uint64 words.

    With the default ``n_words`` this is a 4096-bit vector. Words are drawn
    uniformly from the full uint64 range via ``np.random.randint``, so each
    bit is iid Bernoulli(0.5) → balanced density.
    """
    exclusive_upper = np.iinfo(np.uint64).max + 1
    return np.random.randint(0, exclusive_upper, size=n_words, dtype=np.uint64)
def random_binary_vectors(n: int, n_words: int = N_WORDS) -> np.ndarray:
    """Draw ``n`` random packed bit vectors as one C-contiguous array.

    The result has shape (n, n_words) and dtype uint64; with the default
    ``n_words`` each row is a 4096-bit vector.
    """
    exclusive_upper = np.iinfo(np.uint64).max + 1
    batch = np.random.randint(
        0, exclusive_upper, size=(n, n_words), dtype=np.uint64
    )
    return np.ascontiguousarray(batch)
def normalize_density(v: np.ndarray, target_density: float = 0.5) -> np.ndarray:
    """Normalize a binary vector to a target bit density.

    Randomly chosen bits are flipped (1 -> 0 or 0 -> 1, whichever is needed)
    until exactly ``int(target_density * n_bits)`` bits are set. Positions are
    drawn with ``np.random.choice``, so results follow the global NumPy seed.

    Args:
        v: packed uint64 vector; it is not modified.
        target_density: desired proportion of 1-bits, within [0, 1].

    Returns:
        A new packed uint64 array with the requested number of 1-bits (a plain
        copy of ``v`` when it already matches).

    Raises:
        ValueError: if the target bit count falls outside ``[0, n_bits]``.
    """
    bits = np.unpackbits(v.view(np.uint8))
    n_bits = bits.size
    target_ones = int(target_density * n_bits)
    if not 0 <= target_ones <= n_bits:
        raise ValueError(
            f"target_density must be within [0, 1], got {target_density}"
        )

    current_ones = int(bits.sum())
    surplus = current_ones - target_ones
    if surplus == 0:
        return v.copy()

    if surplus > 0:
        # Too dense: clear some of the bits that are currently set.
        candidates = np.flatnonzero(bits == 1)
        new_value = 0
    else:
        # Too sparse: set some of the bits that are currently clear.
        candidates = np.flatnonzero(bits == 0)
        new_value = 1
    to_flip = np.random.choice(candidates, abs(surplus), replace=False)
    bits[to_flip] = new_value
    return np.packbits(bits).view(np.uint64).copy()
def hamming_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the Hamming similarity of two vectors, normalized by N_BITS.

    Computed as one minus the fraction of the N_BITS positions that differ, so
    identical vectors score 1.0.
    """
    mismatch_fraction = hamming_distance(a, b) / N_BITS
    return 1.0 - mismatch_fraction