| """ |
| MLE Binding Module: Semantic Binding Operations |
| ================================================= |
| Implements circular convolution-based binding for composing and |
| decomposing semantic relations between hyperdimensional vectors. |
| |
| Two implementations: |
| 1. FFT-based (HRR): High precision, O(N log N), works on real-valued vectors |
| 2. Binary (BSC): O(N/64) via XOR, works directly on packed uint64 vectors |
| |
| The binding operation creates a new vector C = bind(A, B) such that: |
| - C is quasi-orthogonal to both A and B |
| - unbind(C, B) β A (recoverable) |
| - bind is commutative and associative |
| |
| This enables representing structured relations: |
| - "cat IS_A animal" β bind(cat, IS_A) stores a trace recoverable with animal |
| - Analogies: unbind(bind(king, male), bind(queen, female)) β identity |
| """ |
|
|
| import numpy as np |
| from typing import Optional, Tuple, List |
| import logging |
|
|
| from ..utils.simd_ops import ( |
| N_BITS, N_WORDS, |
| xor_vectors, random_binary_vector, random_binary_vectors, |
| hamming_distance, hamming_similarity, majority_vote, popcount |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
| class HRRBinding: |
| """ |
| Holographic Reduced Representations via circular convolution. |
| |
| Operates on real-valued vectors of dimension N. |
| Uses numpy.fft for O(N log N) binding/unbinding. |
| |
| Properties: |
| - bind(A, B) = circular_conv(A, B) via IFFT(FFT(A) * FFT(B)) |
| - unbind(C, B) = circular_corr(C, B) via IFFT(FFT(C) * conj(FFT(B))) |
| - Similarity-preserving: cos(A, B) relates to cos(bind(A,X), bind(B,X)) |
| """ |
|
|
| def __init__(self, dim: int = N_BITS): |
| self.dim = dim |
| |
| self._fft_len = dim |
|
|
| @staticmethod |
| def random_vector(dim: int = N_BITS) -> np.ndarray: |
| """Generate a random unit-length HRR vector.""" |
| v = np.random.randn(dim).astype(np.float32) |
| v /= np.linalg.norm(v) |
| return v |
|
|
| @staticmethod |
| def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Circular convolution: bind two vectors. |
| C = IFFT(FFT(A) β FFT(B)) |
| """ |
| A = np.fft.rfft(a) |
| B = np.fft.rfft(b) |
| return np.fft.irfft(A * B, n=len(a)).astype(np.float32) |
|
|
| @staticmethod |
| def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Circular correlation: recover A from C = bind(A, B). |
| A β IFFT(FFT(C) β conj(FFT(B))) |
| """ |
| C = np.fft.rfft(c) |
| B = np.fft.rfft(b) |
| return np.fft.irfft(C * np.conj(B), n=len(c)).astype(np.float32) |
|
|
| @staticmethod |
| def bundle(*vectors: np.ndarray) -> np.ndarray: |
| """Superposition (sum + normalize) of multiple HRR vectors.""" |
| s = np.sum(vectors, axis=0) |
| norm = np.linalg.norm(s) |
| if norm > 1e-8: |
| s /= norm |
| return s.astype(np.float32) |
|
|
| @staticmethod |
| def similarity(a: np.ndarray, b: np.ndarray) -> float: |
| """Cosine similarity between HRR vectors.""" |
| dot = np.dot(a, b) |
| na = np.linalg.norm(a) |
| nb = np.linalg.norm(b) |
| if na < 1e-8 or nb < 1e-8: |
| return 0.0 |
| return float(dot / (na * nb)) |
|
|
| @staticmethod |
| def permute(v: np.ndarray, shift: int = 1) -> np.ndarray: |
| """Cyclic permutation (for positional encoding / sequence ordering).""" |
| return np.roll(v, shift).astype(np.float32) |
|
|
| @staticmethod |
| def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray: |
| """Inverse cyclic permutation.""" |
| return np.roll(v, -shift).astype(np.float32) |
|
|
| @classmethod |
| def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray: |
| """Bind a sequence with positional encoding via permutation. |
| S = Ξ£_i permute(V_i, i) |
| Preserves order information. |
| """ |
| result = np.zeros_like(vectors[0]) |
| for i, v in enumerate(vectors): |
| result += cls.permute(v, i) |
| norm = np.linalg.norm(result) |
| if norm > 1e-8: |
| result /= norm |
| return result.astype(np.float32) |
|
|
| @classmethod |
| def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray: |
| """Encode a role-filler pair: bind(role, filler).""" |
| return cls.bind(role, filler) |
|
|
| @classmethod |
| def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray: |
| """Extract filler from structure given role: unbind(structure, role).""" |
| return cls.unbind(structure, role) |
|
|
| @classmethod |
| def encode_triple(cls, subject: np.ndarray, relation: np.ndarray, |
| obj: np.ndarray) -> np.ndarray: |
| """Encode a knowledge triple (s, r, o). |
| T = bind(bind(subject, relation), object) |
| """ |
| return cls.bind(cls.bind(subject, relation), obj) |
|
|
|
|
| |
| |
| |
|
|
| class BinaryBinding: |
| """ |
| Binary Spatter Code binding via XOR. |
| |
| Operates directly on packed uint64 vectors (512 bytes for 4096 bits). |
| Extremely fast on CPU: single XOR instruction per 64-bit word. |
| |
| Properties: |
| - bind(A, B) = A β B (XOR) |
| - unbind(C, B) = C β B = A (XOR is self-inverse β exact recovery!) |
| - bundle = majority vote |
| - similarity = normalized Hamming distance |
| """ |
|
|
| @staticmethod |
| def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Binary binding via XOR. Self-inverse: bind(bind(a,b), b) = a.""" |
| return xor_vectors(a, b) |
|
|
| @staticmethod |
| def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Binary unbinding = XOR (since XOR is its own inverse).""" |
| return xor_vectors(c, b) |
|
|
| @staticmethod |
| def bundle(*vectors: np.ndarray) -> np.ndarray: |
| """Majority-vote bundling. Requires odd number of vectors for tie-breaking.""" |
| if len(vectors) == 1: |
| return vectors[0].copy() |
| vecs = np.stack(vectors) |
| return majority_vote(np.ascontiguousarray(vecs)) |
|
|
| @staticmethod |
| def similarity(a: np.ndarray, b: np.ndarray) -> float: |
| """Normalized Hamming similarity [0, 1].""" |
| return hamming_similarity(a, b) |
|
|
| @staticmethod |
| def permute(v: np.ndarray, shift: int = 1) -> np.ndarray: |
| """Bit-level cyclic permutation for sequence encoding. |
| Shifts all bits by `shift` positions cyclically. |
| """ |
| bits = np.unpackbits(v.view(np.uint8)) |
| shifted = np.roll(bits, shift) |
| return np.packbits(shifted).view(np.uint64).copy() |
|
|
| @staticmethod |
| def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray: |
| """Inverse bit-level cyclic permutation.""" |
| bits = np.unpackbits(v.view(np.uint8)) |
| shifted = np.roll(bits, -shift) |
| return np.packbits(shifted).view(np.uint64).copy() |
|
|
| @classmethod |
| def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray: |
| """Bind a sequence with positional encoding. |
| S = bundle(permute(V_0, 0), permute(V_1, 1), ..., permute(V_n, n)) |
| """ |
| positioned = [cls.permute(v, i) for i, v in enumerate(vectors)] |
| return cls.bundle(*positioned) |
|
|
| @classmethod |
| def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray: |
| """Encode role-filler: bind(role, filler).""" |
| return cls.bind(role, filler) |
|
|
| @classmethod |
| def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray: |
| """Decode filler from structure given role.""" |
| return cls.unbind(structure, role) |
|
|
| @classmethod |
| def encode_triple(cls, subject: np.ndarray, relation: np.ndarray, |
| obj: np.ndarray) -> np.ndarray: |
| """Encode knowledge triple (s, r, o) = bind(bind(s, r), o).""" |
| return cls.bind(cls.bind(subject, relation), obj) |
|
|
| @classmethod |
| def create_analogy_query(cls, a: np.ndarray, b: np.ndarray, |
| c: np.ndarray) -> np.ndarray: |
| """Create analogy query: a:b :: c:? |
| Relation R = bind(a, b) [XOR extracts the difference] |
| Query = bind(c, R) [apply same relation to c] |
| """ |
| relation = cls.bind(a, b) |
| return cls.bind(c, relation) |
|
|
|
|
| |
| |
| |
|
|
| class BindingEngine: |
| """ |
| Unified binding engine that supports both binary and real-valued operations. |
| |
| The engine maintains a concept codebook (binary vectors for fast routing) |
| and can convert between binary and real domains for FFT operations. |
| """ |
|
|
| def __init__(self, dim: int = N_BITS, use_binary: bool = True): |
| self.dim = dim |
| self.use_binary = use_binary |
| self.binary = BinaryBinding() |
| self.hrr = HRRBinding(dim) |
|
|
| |
| self._codebook: dict = {} |
|
|
| def register_concept(self, name: str, vector: Optional[np.ndarray] = None) -> np.ndarray: |
| """Register a named concept with a binary vector.""" |
| if vector is None: |
| vector = random_binary_vector() |
| self._codebook[name] = vector.copy() |
| return vector |
|
|
| def get_concept(self, name: str) -> Optional[np.ndarray]: |
| """Get binary vector for a named concept.""" |
| return self._codebook.get(name) |
|
|
| def bind(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Bind two vectors using the configured method.""" |
| if self.use_binary and a.dtype == np.uint64: |
| return self.binary.bind(a, b) |
| return self.hrr.bind(a, b) |
|
|
| def unbind(self, c: np.ndarray, b: np.ndarray) -> np.ndarray: |
| """Unbind: recover A from C = bind(A, B) given B.""" |
| if self.use_binary and c.dtype == np.uint64: |
| return self.binary.unbind(c, b) |
| return self.hrr.unbind(c, b) |
|
|
| def bundle(self, *vectors: np.ndarray) -> np.ndarray: |
| """Bundle multiple vectors.""" |
| if self.use_binary and vectors[0].dtype == np.uint64: |
| return self.binary.bundle(*vectors) |
| return self.hrr.bundle(*vectors) |
|
|
| def similarity(self, a: np.ndarray, b: np.ndarray) -> float: |
| """Compute similarity between vectors.""" |
| if self.use_binary and a.dtype == np.uint64: |
| return self.binary.similarity(a, b) |
| return self.hrr.similarity(a, b) |
|
|
| def encode_relation(self, subject: str, relation: str, obj: str) -> np.ndarray: |
| """Encode a semantic relation between named concepts. |
| Auto-registers unknown concepts. |
| """ |
| for name in [subject, relation, obj]: |
| if name not in self._codebook: |
| self.register_concept(name) |
|
|
| s = self._codebook[subject] |
| r = self._codebook[relation] |
| o = self._codebook[obj] |
|
|
| if self.use_binary: |
| return self.binary.encode_triple(s, r, o) |
| return self.hrr.encode_triple( |
| self._to_real(s), self._to_real(r), self._to_real(o) |
| ) |
|
|
| def solve_analogy(self, a: str, b: str, c: str, |
| candidates: Optional[List[str]] = None) -> List[Tuple[str, float]]: |
| """Solve analogy a:b :: c:? |
| Returns ranked candidates with similarity scores. |
| """ |
| va = self._codebook.get(a) |
| vb = self._codebook.get(b) |
| vc = self._codebook.get(c) |
| if va is None or vb is None or vc is None: |
| raise ValueError(f"Unknown concept(s): {a}, {b}, {c}") |
|
|
| if self.use_binary: |
| query = self.binary.create_analogy_query(va, vb, vc) |
| else: |
| query = self.hrr.unbind( |
| self.hrr.bind(self._to_real(vb), self._to_real(vc)), |
| self._to_real(va) |
| ) |
|
|
| |
| search_names = candidates or list(self._codebook.keys()) |
| results = [] |
| for name in search_names: |
| vec = self._codebook[name] |
| if self.use_binary: |
| sim = self.binary.similarity(query, vec) |
| else: |
| sim = self.hrr.similarity(query, self._to_real(vec)) |
| results.append((name, sim)) |
|
|
| results.sort(key=lambda x: x[1], reverse=True) |
| return results |
|
|
| def _to_real(self, binary_vec: np.ndarray) -> np.ndarray: |
| """Convert packed binary vector to real-valued Β±1 vector.""" |
| bits = np.unpackbits(binary_vec.view(np.uint8)).astype(np.float32) |
| return (2.0 * bits - 1.0) |
|
|
| def _to_binary(self, real_vec: np.ndarray) -> np.ndarray: |
| """Convert real-valued vector to packed binary (threshold at 0).""" |
| bits = (real_vec > 0).astype(np.uint8) |
| return np.packbits(bits).view(np.uint64).copy() |
|
|