"""
MLE SIMD-Optimized Bitwise Operations
=====================================
Hardware-accelerated Hamming distance, popcount, and batch XOR operations.
Uses ctypes to call GCC-compiled C with -march=native for automatic SIMD
vectorization (AVX-512 VPOPCNTQ / AVX2 POPCNT / SSE4.2 POPCNT).
Fallback: pure NumPy LUT-based popcount for portability.
"""
import numpy as np
import ctypes
import tempfile
import subprocess
import os
import logging
from pathlib import Path
# Module-level logger; used below to report whether the native library was
# built or the NumPy fallback is in effect.
logger = logging.getLogger(__name__)
# ── Constants ─────────────────────────────────────────────────────────────────
# Width of one binary vector, in bits.
N_BITS = 4096
# The same vector expressed as packed 64-bit words and as bytes.
N_WORDS = N_BITS // 64 # 64 uint64 words = 512 bytes per vector
N_BYTES = N_BITS // 8 # 512 bytes
# ── Compile native SIMD library ───────────────────────────────────────────────
# C translation unit handed to gcc at runtime. It exports hamming_single,
# hamming_batch, hamming_topk, popcount_vec, xor_vectors and majority_vote;
# every routine takes an explicit word count, so nothing in the C code is tied
# to a particular vector width.
_NATIVE_C_SOURCE = r"""
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Single-pair Hamming distance: XOR + popcount over N uint64 words */
int hamming_single(const uint64_t *a, const uint64_t *b, int n_words) {
    int cnt = 0;
    for (int i = 0; i < n_words; i++)
        cnt += __builtin_popcountll(a[i] ^ b[i]);
    return cnt;
}

/* Batch Hamming: query (1 x n_words) vs corpus (n_vecs x n_words)
   Results written to out[n_vecs]. Layout: corpus is row-major contiguous. */
void hamming_batch(const uint64_t *query, const uint64_t *corpus,
                   int n_words, int n_vecs, int *out) {
    for (int v = 0; v < n_vecs; v++) {
        int cnt = 0;
        const uint64_t *row = corpus + (long)v * n_words;
        for (int w = 0; w < n_words; w++)
            cnt += __builtin_popcountll(query[w] ^ row[w]);
        out[v] = cnt;
    }
}

/* Batch Hamming with top-K selection (partial sort).
   Writes the indices and distances of the top_k smallest distances into
   out_indices / out_dists (heap order, i.e. NOT sorted).
   Uses a simple max-heap of size top_k for O(N log K).
   Does nothing when top_k <= 0 or n_vecs <= 0. */
static void swap_int(int *a, int *b) { int t = *a; *a = *b; *b = t; }

static void sift_down_max(int *heap_dist, int *heap_idx, int size, int i) {
    while (1) {
        int largest = i, l = 2*i+1, r = 2*i+2;
        if (l < size && heap_dist[l] > heap_dist[largest]) largest = l;
        if (r < size && heap_dist[r] > heap_dist[largest]) largest = r;
        if (largest == i) break;
        swap_int(&heap_dist[i], &heap_dist[largest]);
        swap_int(&heap_idx[i], &heap_idx[largest]);
        i = largest;
    }
}

void hamming_topk(const uint64_t *query, const uint64_t *corpus,
                  int n_words, int n_vecs, int top_k,
                  int *out_indices, int *out_dists) {
    int heap_size = (top_k < n_vecs) ? top_k : n_vecs;
    /* Nothing to select: without this guard the scan below would compare
       against out_dists[0] of an empty heap (or start at a negative row). */
    if (heap_size <= 0)
        return;
    /* Initialize heap with first top_k elements */
    for (int v = 0; v < heap_size; v++) {
        int cnt = 0;
        const uint64_t *row = corpus + (long)v * n_words;
        for (int w = 0; w < n_words; w++)
            cnt += __builtin_popcountll(query[w] ^ row[w]);
        out_dists[v] = cnt;
        out_indices[v] = v;
    }
    /* Build max-heap */
    for (int i = heap_size/2 - 1; i >= 0; i--)
        sift_down_max(out_dists, out_indices, heap_size, i);
    /* Process remaining vectors: replace the current worst (heap root)
       whenever a closer vector is found. */
    for (int v = heap_size; v < n_vecs; v++) {
        int cnt = 0;
        const uint64_t *row = corpus + (long)v * n_words;
        for (int w = 0; w < n_words; w++)
            cnt += __builtin_popcountll(query[w] ^ row[w]);
        if (cnt < out_dists[0]) {
            out_dists[0] = cnt;
            out_indices[0] = v;
            sift_down_max(out_dists, out_indices, heap_size, 0);
        }
    }
}

/* Popcount of a single vector (count of 1-bits) */
int popcount_vec(const uint64_t *a, int n_words) {
    int cnt = 0;
    for (int i = 0; i < n_words; i++)
        cnt += __builtin_popcountll(a[i]);
    return cnt;
}

/* Batch XOR: out[i] = a[i] ^ b[i] for vectors of n_words */
void xor_vectors(const uint64_t *a, const uint64_t *b, uint64_t *out, int n_words) {
    for (int i = 0; i < n_words; i++)
        out[i] = a[i] ^ b[i];
}

/* Batch majority vote: given n_vecs vectors of n_words uint64,
   compute per-bit majority. Result in out[n_words].
   A bit is set only when strictly more than n_vecs / 2 (integer division)
   of the inputs have it set. */
void majority_vote(const uint64_t *vecs, int n_vecs, int n_words, uint64_t *out) {
    int threshold = n_vecs / 2;
    /* Count per-bit using word-level iteration */
    for (int w = 0; w < n_words; w++) {
        uint64_t result = 0;
        for (int b = 0; b < 64; b++) {
            int count = 0;
            uint64_t mask = (uint64_t)1 << b;
            for (int v = 0; v < n_vecs; v++)
                count += ((vecs[(long)v * n_words + w] & mask) != 0);
            if (count > threshold)
                result |= mask;
        }
        out[w] = result;
    }
}
"""
# Handle of the loaded native library (None until a build succeeds).
_lib = None
# Path of the shared object produced by the most recent build attempt.
_lib_path = None
# Set after a failed build/load so later calls go straight to the NumPy
# fallback instead of re-running gcc on every single operation.
_lib_unavailable = False


def _declare_native_signatures(lib):
    """Attach ctypes return/argument types to every exported C function."""
    u64_p = ctypes.POINTER(ctypes.c_uint64)
    int_p = ctypes.POINTER(ctypes.c_int)
    c_int = ctypes.c_int
    signatures = {
        "hamming_single": (c_int, [u64_p, u64_p, c_int]),
        "hamming_batch": (None, [u64_p, u64_p, c_int, c_int, int_p]),
        "hamming_topk": (
            None, [u64_p, u64_p, c_int, c_int, c_int, int_p, int_p]
        ),
        "popcount_vec": (c_int, [u64_p, c_int]),
        "xor_vectors": (None, [u64_p, u64_p, u64_p, c_int]),
        "majority_vote": (None, [u64_p, c_int, c_int, u64_p]),
    }
    for name, (restype, argtypes) in signatures.items():
        func = getattr(lib, name)
        func.restype = restype
        func.argtypes = argtypes


def _compile_native():
    """Compile the C library with native SIMD optimization.

    Returns the loaded ``ctypes.CDLL`` on success, or ``None`` when the build
    or load fails (callers then use the NumPy fallback). Both outcomes are
    remembered, so gcc is invoked at most once per process.

    The build happens in a private directory created with
    ``tempfile.mkdtemp`` rather than at a fixed, predictable path in the
    shared temp directory, so concurrent processes cannot overwrite each
    other's output and another user cannot pre-plant the file that gets
    loaded. The directory is removed at interpreter exit.
    """
    import atexit
    import shutil

    global _lib, _lib_path, _lib_unavailable
    if _lib is not None:
        return _lib
    if _lib_unavailable:
        return None
    try:
        build_dir = tempfile.mkdtemp(prefix="mle_simd_ops_")
        atexit.register(shutil.rmtree, build_dir, ignore_errors=True)
        src_path = os.path.join(build_dir, "mle_simd_ops.c")
        lib_path = os.path.join(build_dir, "mle_simd_ops.so")
        _lib_path = lib_path
        with open(src_path, "w") as f:
            f.write(_NATIVE_C_SOURCE)
        subprocess.run(
            ["gcc", "-O3", "-march=native", "-shared", "-fPIC",
             "-o", lib_path, src_path],
            check=True, capture_output=True, text=True
        )
        lib = ctypes.CDLL(lib_path)
        _declare_native_signatures(lib)
    except Exception as e:
        # Deliberate best-effort boundary: any failure (gcc missing, compile
        # error, unwritable temp dir, unloadable library) selects the fallback.
        _lib_unavailable = True
        logger.warning(
            "Failed to compile native SIMD library: %s. Using NumPy fallback.", e
        )
        compiler_output = getattr(e, "stderr", None)
        if compiler_output:
            logger.debug("gcc stderr:\n%s", compiler_output)
        return None
    _lib = lib
    logger.info("Native SIMD library compiled successfully with -march=native")
    return lib
def get_native_lib():
    """Return the native library handle, building it on first use.

    Thin public alias for ``_compile_native``; yields ``None`` when that
    function reports that the library could not be built.
    """
    native = _compile_native()
    return native
# ── NumPy Fallback Operations ─────────────────────────────────────────────────
# Byte-level popcount table: entry i is the number of 1-bits in i (0..255).
_POPCOUNT_LUT = np.fromiter(
    (format(byte_value, "b").count("1") for byte_value in range(256)),
    dtype=np.int32,
    count=256,
)
def _np_hamming_single(a: np.ndarray, b: np.ndarray) -> int:
    """Hamming distance of two packed vectors, computed with NumPy only.

    XORs the inputs, reinterprets the result as bytes and adds up the
    per-byte bit counts taken from ``_POPCOUNT_LUT``.
    """
    differing = np.bitwise_xor(a, b)
    per_byte_counts = _POPCOUNT_LUT[differing.view(np.uint8)]
    return int(per_byte_counts.sum())
def _np_hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray:
    """Pure NumPy batch Hamming distance.

    query: (W,) packed uint64 words.
    corpus: (M, W) packed uint64 words; M may be 0.
    Returns: (M,) integer array, one distance per corpus row.
    """
    xor = np.bitwise_xor(query[np.newaxis, :], corpus)  # (M, W)
    xor_bytes = xor.view(np.uint8)  # (M, 8 * W)
    # The byte view is already two-dimensional, so the per-row sum needs no
    # reshape; reshape(M, -1) cannot infer the second axis when M == 0 and
    # used to raise on an empty corpus.
    return _POPCOUNT_LUT[xor_bytes].sum(axis=1)
# ── Public API (auto-selects native or fallback) ──────────────────────────────
def _as_ptr64(arr: np.ndarray):
    """Expose the array's data buffer as a ctypes ``uint64*``.

    No dtype or layout check is performed here; the pointer simply aliases
    whatever memory ``arr`` owns.
    """
    uint64_pointer = ctypes.POINTER(ctypes.c_uint64)
    return arr.ctypes.data_as(uint64_pointer)
def _as_ptr32(arr: np.ndarray):
    """Expose the array's data buffer as a ctypes ``int*``.

    No dtype or layout check is performed here; the pointer simply aliases
    whatever memory ``arr`` owns.
    """
    int_pointer = ctypes.POINTER(ctypes.c_int)
    return arr.ctypes.data_as(int_pointer)
def hamming_distance(a: np.ndarray, b: np.ndarray) -> int:
    """Compute the Hamming distance between two packed binary vectors.

    a, b: integer arrays holding the same number of packed words (normally
        shape (N_WORDS,), dtype=uint64). Inputs are converted to C-contiguous
        uint64 before use, so non-contiguous or other integer-typed arrays
        are copied instead of being misread by the native code.
    Returns: number of differing bits as a Python int.
    Raises: ValueError if the two vectors differ in length.
    """
    a64 = np.ascontiguousarray(a, dtype=np.uint64).ravel()
    b64 = np.ascontiguousarray(b, dtype=np.uint64).ravel()
    if a64.size != b64.size:
        raise ValueError(
            f"vectors must have the same length, got {a64.size} and {b64.size} words"
        )
    lib = get_native_lib()
    if lib is not None:
        # Pass the real word count: the C routine trusts it blindly, so a
        # hard-coded width would read past the end of shorter buffers.
        return int(lib.hamming_single(_as_ptr64(a64), _as_ptr64(b64), a64.size))
    return _np_hamming_single(a64, b64)
def hamming_batch(query: np.ndarray, corpus: np.ndarray) -> np.ndarray:
    """Compute Hamming distances from query to all corpus vectors.

    query: (W,) packed uint64 words.
    corpus: (M, W) packed uint64 words. A corpus that is not already
        C-contiguous uint64 is copied into that layout, so pass contiguous
        data to avoid the copy.
    Returns: (M,) int32 array of distances (empty when M == 0).
    Raises: ValueError if corpus is not 2-D or the query length differs from
        the corpus row length.
    """
    # Real validation instead of `assert`, which disappears under `python -O`
    # and would let the native code walk an arbitrary memory layout.
    corpus64 = np.ascontiguousarray(corpus, dtype=np.uint64)
    query64 = np.ascontiguousarray(query, dtype=np.uint64).ravel()
    if corpus64.ndim != 2:
        raise ValueError(f"corpus must be 2-D (M, n_words), got shape {corpus64.shape}")
    n_vecs, n_words = corpus64.shape
    if query64.size != n_words:
        raise ValueError(
            f"query has {query64.size} words but corpus rows have {n_words}"
        )
    if n_vecs == 0:
        return np.empty(0, dtype=np.int32)
    lib = get_native_lib()
    if lib is not None:
        out = np.empty(n_vecs, dtype=np.int32)
        lib.hamming_batch(
            _as_ptr64(query64), _as_ptr64(corpus64),
            n_words, n_vecs, _as_ptr32(out)
        )
        return out
    return _np_hamming_batch(query64, corpus64).astype(np.int32)
def hamming_topk(query: np.ndarray, corpus: np.ndarray, k: int = 500):
    """Find the k nearest corpus vectors by Hamming distance.

    query: (W,) packed uint64 words.
    corpus: (M, W) packed uint64 words (copied if not C-contiguous uint64).
    k: number of neighbours wanted; clipped to M.
    Returns: (indices, distances), each an int32 array of shape (min(k, M),),
        sorted ascending by distance. Both are empty when k == 0 or M == 0.
    Raises: ValueError if k is negative, corpus is not 2-D, or the query
        length differs from the corpus row length.

    The native path keeps a max-heap of size k, i.e. O(M log k).
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")
    corpus64 = np.ascontiguousarray(corpus, dtype=np.uint64)
    query64 = np.ascontiguousarray(query, dtype=np.uint64).ravel()
    if corpus64.ndim != 2:
        raise ValueError(f"corpus must be 2-D (M, n_words), got shape {corpus64.shape}")
    n_vecs, n_words = corpus64.shape
    if query64.size != n_words:
        raise ValueError(
            f"query has {query64.size} words but corpus rows have {n_words}"
        )
    actual_k = min(k, n_vecs)
    if actual_k == 0:
        # The C heap routine must never run with zero-length output buffers.
        return np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32)
    lib = get_native_lib()
    if lib is not None:
        out_idx = np.empty(actual_k, dtype=np.int32)
        out_dist = np.empty(actual_k, dtype=np.int32)
        lib.hamming_topk(
            _as_ptr64(query64), _as_ptr64(corpus64),
            n_words, n_vecs, actual_k,
            _as_ptr32(out_idx), _as_ptr32(out_dist)
        )
        # Sort by distance (heap output is unordered)
        order = np.argsort(out_dist)
        return out_idx[order], out_dist[order]
    dists = _np_hamming_batch(query64, corpus64)
    if actual_k < n_vecs:
        idx = np.argpartition(dists, actual_k)[:actual_k]
    else:
        idx = np.arange(n_vecs)
    order = np.argsort(dists[idx])
    sorted_idx = idx[order]
    return sorted_idx.astype(np.int32), dists[sorted_idx].astype(np.int32)
def xor_vectors(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Bitwise XOR of two packed uint64 vectors.

    a, b: integer arrays with the same number of words; they are converted
        to C-contiguous uint64 before use.
    Returns: uint64 array shaped like ``a``.
    Raises: ValueError if the inputs differ in length.
    """
    a64 = np.ascontiguousarray(a, dtype=np.uint64)
    b64 = np.ascontiguousarray(b, dtype=np.uint64)
    if a64.size != b64.size:
        raise ValueError(
            f"vectors must have the same length, got {a64.size} and {b64.size} words"
        )
    lib = get_native_lib()
    if lib is not None:
        # Size the output and the word count from the input; a fixed width
        # would overrun shorter buffers inside the C loop.
        out = np.empty(a64.shape, dtype=np.uint64)
        lib.xor_vectors(_as_ptr64(a64), _as_ptr64(b64), _as_ptr64(out), a64.size)
        return out
    return np.bitwise_xor(a64, b64.reshape(a64.shape))
def popcount(a: np.ndarray) -> int:
    """Count the number of 1-bits in a packed uint64 vector.

    a: integer array of packed words; converted to C-contiguous uint64
        before use.
    Returns: total number of set bits as a Python int.
    """
    a64 = np.ascontiguousarray(a, dtype=np.uint64)
    lib = get_native_lib()
    if lib is not None:
        # Use the actual word count so the native and NumPy paths agree for
        # every input length and the C loop never reads past the buffer.
        return int(lib.popcount_vec(_as_ptr64(a64), a64.size))
    return int(_POPCOUNT_LUT[a64.view(np.uint8)].sum())
def majority_vote(vectors: np.ndarray) -> np.ndarray:
    """Bitwise majority vote across multiple packed uint64 vectors.

    vectors: (M, W) packed uint64 words (copied if not C-contiguous uint64).
    Returns: (W,) uint64 array in which a bit is set only when strictly more
        than half of the M inputs have that bit set.
    Raises: ValueError if ``vectors`` is not 2-D.
    """
    # Real validation instead of `assert`, which disappears under `python -O`.
    vecs64 = np.ascontiguousarray(vectors, dtype=np.uint64)
    if vecs64.ndim != 2:
        raise ValueError(f"vectors must be 2-D (M, n_words), got shape {vecs64.shape}")
    n_vecs, n_words = vecs64.shape
    lib = get_native_lib()
    if lib is not None:
        out = np.empty(n_words, dtype=np.uint64)
        lib.majority_vote(_as_ptr64(vecs64), n_vecs, n_words, _as_ptr64(out))
        return out
    # NumPy fallback: unpack, sum, threshold. unpackbits/packbits use the same
    # bit order, so every bit returns to the position it came from.
    bits = np.unpackbits(vecs64.view(np.uint8), axis=1)  # (M, 64 * W)
    # Accumulate directly in int32 rather than materialising an int32 copy.
    ones_per_bit = bits.sum(axis=0, dtype=np.int32)
    majority = (ones_per_bit > n_vecs / 2).astype(np.uint8)
    return np.packbits(majority).view(np.uint64)
# ── Vector Generation ─────────────────────────────────────────────────────────
def random_binary_vector(n_words: int = N_WORDS) -> np.ndarray:
    """Draw one random binary vector packed into ``n_words`` uint64 words.

    Every word is sampled uniformly over the full uint64 range with NumPy's
    global random state, so each bit is iid Bernoulli(0.5) — balanced
    density. With the default ``n_words`` this is a 4096-bit vector.
    """
    upper_exclusive = np.iinfo(np.uint64).max + 1
    return np.random.randint(0, upper_exclusive, size=n_words, dtype=np.uint64)
def random_binary_vectors(n: int, n_words: int = N_WORDS) -> np.ndarray:
    """Draw ``n`` random packed binary vectors.

    Returns a C-contiguous uint64 array of shape (n, n_words), with every
    word sampled uniformly over the full uint64 range using NumPy's global
    random state.
    """
    upper_exclusive = np.iinfo(np.uint64).max + 1
    words = np.random.randint(
        0, upper_exclusive, size=(n, n_words), dtype=np.uint64
    )
    return np.ascontiguousarray(words)
def normalize_density(v: np.ndarray, target_density: float = 0.5) -> np.ndarray:
    """Normalize a binary vector to a target bit density.

    Randomly chosen bits are flipped (using NumPy's global random state)
    until the vector holds ``int(target_density * n_bits)`` one-bits.

    v: packed binary vector (uint64 words).
    target_density: desired fraction of 1-bits, within [0, 1].
    Returns: a new packed uint64 array; ``v`` itself is never modified.
    Raises: ValueError if ``target_density`` lies outside [0, 1].
    """
    if not 0.0 <= target_density <= 1.0:
        raise ValueError(
            f"target_density must be within [0, 1], got {target_density!r}"
        )
    # A contiguous buffer is required for the byte-level reinterpretation.
    bits = np.unpackbits(np.ascontiguousarray(v).view(np.uint8))
    target_ones = int(target_density * len(bits))
    current_ones = int(bits.sum())
    if current_ones == target_ones:
        return v.copy()
    if current_ones > target_ones:
        # Flip some 1s to 0s
        one_positions = np.where(bits == 1)[0]
        to_flip = np.random.choice(one_positions, current_ones - target_ones, replace=False)
        bits[to_flip] = 0
    else:
        # Flip some 0s to 1s
        zero_positions = np.where(bits == 0)[0]
        to_flip = np.random.choice(zero_positions, target_ones - current_ones, replace=False)
        bits[to_flip] = 1
    return np.packbits(bits).view(np.uint64).copy()
def hamming_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return 1 minus the fraction of the N_BITS positions where a and b differ.

    The result is 1.0 for identical vectors.
    """
    differing_fraction = hamming_distance(a, b) / N_BITS
    return 1.0 - differing_fraction
|