""" MLE Binding Module: Semantic Binding Operations ================================================= Implements circular convolution-based binding for composing and decomposing semantic relations between hyperdimensional vectors. Two implementations: 1. FFT-based (HRR): High precision, O(N log N), works on real-valued vectors 2. Binary (BSC): O(N/64) via XOR, works directly on packed uint64 vectors The binding operation creates a new vector C = bind(A, B) such that: - C is quasi-orthogonal to both A and B - unbind(C, B) ≈ A (recoverable) - bind is commutative and associative This enables representing structured relations: - "cat IS_A animal" → bind(cat, IS_A) stores a trace recoverable with animal - Analogies: unbind(bind(king, male), bind(queen, female)) ≈ identity """ import numpy as np from typing import Optional, Tuple, List import logging from ..utils.simd_ops import ( N_BITS, N_WORDS, xor_vectors, random_binary_vector, random_binary_vectors, hamming_distance, hamming_similarity, majority_vote, popcount ) logger = logging.getLogger(__name__) # ══════════════════════════════════════════════════════════════════════════════ # FFT-based Circular Convolution (Holographic Reduced Representations) # ══════════════════════════════════════════════════════════════════════════════ class HRRBinding: """ Holographic Reduced Representations via circular convolution. Operates on real-valued vectors of dimension N. Uses numpy.fft for O(N log N) binding/unbinding. Properties: - bind(A, B) = circular_conv(A, B) via IFFT(FFT(A) * FFT(B)) - unbind(C, B) = circular_corr(C, B) via IFFT(FFT(C) * conj(FFT(B))) - Similarity-preserving: cos(A, B) relates to cos(bind(A,X), bind(B,X)) """ def __init__(self, dim: int = N_BITS): self.dim = dim # Pre-allocate FFT workspace self._fft_len = dim # Use full-length FFT @staticmethod def random_vector(dim: int = N_BITS) -> np.ndarray: """Generate a random unit-length HRR vector.""" v = np.random.randn(dim).astype(np.float32) v /= np.linalg.norm(v) return v @staticmethod def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray: """Circular convolution: bind two vectors. C = IFFT(FFT(A) ⊙ FFT(B)) """ A = np.fft.rfft(a) B = np.fft.rfft(b) return np.fft.irfft(A * B, n=len(a)).astype(np.float32) @staticmethod def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray: """Circular correlation: recover A from C = bind(A, B). A ≈ IFFT(FFT(C) ⊙ conj(FFT(B))) """ C = np.fft.rfft(c) B = np.fft.rfft(b) return np.fft.irfft(C * np.conj(B), n=len(c)).astype(np.float32) @staticmethod def bundle(*vectors: np.ndarray) -> np.ndarray: """Superposition (sum + normalize) of multiple HRR vectors.""" s = np.sum(vectors, axis=0) norm = np.linalg.norm(s) if norm > 1e-8: s /= norm return s.astype(np.float32) @staticmethod def similarity(a: np.ndarray, b: np.ndarray) -> float: """Cosine similarity between HRR vectors.""" dot = np.dot(a, b) na = np.linalg.norm(a) nb = np.linalg.norm(b) if na < 1e-8 or nb < 1e-8: return 0.0 return float(dot / (na * nb)) @staticmethod def permute(v: np.ndarray, shift: int = 1) -> np.ndarray: """Cyclic permutation (for positional encoding / sequence ordering).""" return np.roll(v, shift).astype(np.float32) @staticmethod def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray: """Inverse cyclic permutation.""" return np.roll(v, -shift).astype(np.float32) @classmethod def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray: """Bind a sequence with positional encoding via permutation. S = Σ_i permute(V_i, i) Preserves order information. """ result = np.zeros_like(vectors[0]) for i, v in enumerate(vectors): result += cls.permute(v, i) norm = np.linalg.norm(result) if norm > 1e-8: result /= norm return result.astype(np.float32) @classmethod def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray: """Encode a role-filler pair: bind(role, filler).""" return cls.bind(role, filler) @classmethod def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray: """Extract filler from structure given role: unbind(structure, role).""" return cls.unbind(structure, role) @classmethod def encode_triple(cls, subject: np.ndarray, relation: np.ndarray, obj: np.ndarray) -> np.ndarray: """Encode a knowledge triple (s, r, o). T = bind(bind(subject, relation), object) """ return cls.bind(cls.bind(subject, relation), obj) # ══════════════════════════════════════════════════════════════════════════════ # Binary Binding (BSC - Binary Spatter Codes) # ══════════════════════════════════════════════════════════════════════════════ class BinaryBinding: """ Binary Spatter Code binding via XOR. Operates directly on packed uint64 vectors (512 bytes for 4096 bits). Extremely fast on CPU: single XOR instruction per 64-bit word. Properties: - bind(A, B) = A ⊕ B (XOR) - unbind(C, B) = C ⊕ B = A (XOR is self-inverse → exact recovery!) - bundle = majority vote - similarity = normalized Hamming distance """ @staticmethod def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray: """Binary binding via XOR. Self-inverse: bind(bind(a,b), b) = a.""" return xor_vectors(a, b) @staticmethod def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray: """Binary unbinding = XOR (since XOR is its own inverse).""" return xor_vectors(c, b) @staticmethod def bundle(*vectors: np.ndarray) -> np.ndarray: """Majority-vote bundling. Requires odd number of vectors for tie-breaking.""" if len(vectors) == 1: return vectors[0].copy() vecs = np.stack(vectors) return majority_vote(np.ascontiguousarray(vecs)) @staticmethod def similarity(a: np.ndarray, b: np.ndarray) -> float: """Normalized Hamming similarity [0, 1].""" return hamming_similarity(a, b) @staticmethod def permute(v: np.ndarray, shift: int = 1) -> np.ndarray: """Bit-level cyclic permutation for sequence encoding. Shifts all bits by `shift` positions cyclically. """ bits = np.unpackbits(v.view(np.uint8)) shifted = np.roll(bits, shift) return np.packbits(shifted).view(np.uint64).copy() @staticmethod def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray: """Inverse bit-level cyclic permutation.""" bits = np.unpackbits(v.view(np.uint8)) shifted = np.roll(bits, -shift) return np.packbits(shifted).view(np.uint64).copy() @classmethod def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray: """Bind a sequence with positional encoding. S = bundle(permute(V_0, 0), permute(V_1, 1), ..., permute(V_n, n)) """ positioned = [cls.permute(v, i) for i, v in enumerate(vectors)] return cls.bundle(*positioned) @classmethod def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray: """Encode role-filler: bind(role, filler).""" return cls.bind(role, filler) @classmethod def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray: """Decode filler from structure given role.""" return cls.unbind(structure, role) @classmethod def encode_triple(cls, subject: np.ndarray, relation: np.ndarray, obj: np.ndarray) -> np.ndarray: """Encode knowledge triple (s, r, o) = bind(bind(s, r), o).""" return cls.bind(cls.bind(subject, relation), obj) @classmethod def create_analogy_query(cls, a: np.ndarray, b: np.ndarray, c: np.ndarray) -> np.ndarray: """Create analogy query: a:b :: c:? Relation R = bind(a, b) [XOR extracts the difference] Query = bind(c, R) [apply same relation to c] """ relation = cls.bind(a, b) return cls.bind(c, relation) # ══════════════════════════════════════════════════════════════════════════════ # Hybrid Binding Engine # ══════════════════════════════════════════════════════════════════════════════ class BindingEngine: """ Unified binding engine that supports both binary and real-valued operations. The engine maintains a concept codebook (binary vectors for fast routing) and can convert between binary and real domains for FFT operations. """ def __init__(self, dim: int = N_BITS, use_binary: bool = True): self.dim = dim self.use_binary = use_binary self.binary = BinaryBinding() self.hrr = HRRBinding(dim) # Concept codebook: name → binary vector self._codebook: dict = {} def register_concept(self, name: str, vector: Optional[np.ndarray] = None) -> np.ndarray: """Register a named concept with a binary vector.""" if vector is None: vector = random_binary_vector() self._codebook[name] = vector.copy() return vector def get_concept(self, name: str) -> Optional[np.ndarray]: """Get binary vector for a named concept.""" return self._codebook.get(name) def bind(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: """Bind two vectors using the configured method.""" if self.use_binary and a.dtype == np.uint64: return self.binary.bind(a, b) return self.hrr.bind(a, b) def unbind(self, c: np.ndarray, b: np.ndarray) -> np.ndarray: """Unbind: recover A from C = bind(A, B) given B.""" if self.use_binary and c.dtype == np.uint64: return self.binary.unbind(c, b) return self.hrr.unbind(c, b) def bundle(self, *vectors: np.ndarray) -> np.ndarray: """Bundle multiple vectors.""" if self.use_binary and vectors[0].dtype == np.uint64: return self.binary.bundle(*vectors) return self.hrr.bundle(*vectors) def similarity(self, a: np.ndarray, b: np.ndarray) -> float: """Compute similarity between vectors.""" if self.use_binary and a.dtype == np.uint64: return self.binary.similarity(a, b) return self.hrr.similarity(a, b) def encode_relation(self, subject: str, relation: str, obj: str) -> np.ndarray: """Encode a semantic relation between named concepts. Auto-registers unknown concepts. """ for name in [subject, relation, obj]: if name not in self._codebook: self.register_concept(name) s = self._codebook[subject] r = self._codebook[relation] o = self._codebook[obj] if self.use_binary: return self.binary.encode_triple(s, r, o) return self.hrr.encode_triple( self._to_real(s), self._to_real(r), self._to_real(o) ) def solve_analogy(self, a: str, b: str, c: str, candidates: Optional[List[str]] = None) -> List[Tuple[str, float]]: """Solve analogy a:b :: c:? Returns ranked candidates with similarity scores. """ va = self._codebook.get(a) vb = self._codebook.get(b) vc = self._codebook.get(c) if va is None or vb is None or vc is None: raise ValueError(f"Unknown concept(s): {a}, {b}, {c}") if self.use_binary: query = self.binary.create_analogy_query(va, vb, vc) else: query = self.hrr.unbind( self.hrr.bind(self._to_real(vb), self._to_real(vc)), self._to_real(va) ) # Search candidates search_names = candidates or list(self._codebook.keys()) results = [] for name in search_names: vec = self._codebook[name] if self.use_binary: sim = self.binary.similarity(query, vec) else: sim = self.hrr.similarity(query, self._to_real(vec)) results.append((name, sim)) results.sort(key=lambda x: x[1], reverse=True) return results def _to_real(self, binary_vec: np.ndarray) -> np.ndarray: """Convert packed binary vector to real-valued ±1 vector.""" bits = np.unpackbits(binary_vec.view(np.uint8)).astype(np.float32) return (2.0 * bits - 1.0) # {0,1} → {-1, +1} def _to_binary(self, real_vec: np.ndarray) -> np.ndarray: """Convert real-valued vector to packed binary (threshold at 0).""" bits = (real_vec > 0).astype(np.uint8) return np.packbits(bits).view(np.uint64).copy()