MLE-Morpho-Logic-Engine / mle /binding /semantic_binding.py
Harry00's picture
feat: complete MLE engine implementation
ebaf2ce verified
"""
MLE Binding Module: Semantic Binding Operations
=================================================
Implements circular convolution-based binding for composing and
decomposing semantic relations between hyperdimensional vectors.
Two implementations:
1. FFT-based (HRR): High precision, O(N log N), works on real-valued vectors
2. Binary (BSC): O(N/64) via XOR, works directly on packed uint64 vectors
The binding operation creates a new vector C = bind(A, B) such that:
- C is quasi-orthogonal to both A and B
- unbind(C, B) β‰ˆ A (recoverable)
- bind is commutative and associative
This enables representing structured relations:
- "cat IS_A animal" β†’ bind(cat, IS_A) stores a trace recoverable with animal
- Analogies: unbind(bind(king, male), bind(queen, female)) β‰ˆ identity
"""
import numpy as np
from typing import Optional, Tuple, List
import logging
from ..utils.simd_ops import (
N_BITS, N_WORDS,
xor_vectors, random_binary_vector, random_binary_vectors,
hamming_distance, hamming_similarity, majority_vote, popcount
)
logger = logging.getLogger(__name__)
# ══════════════════════════════════════════════════════════════════════════════
# FFT-based Circular Convolution (Holographic Reduced Representations)
# ══════════════════════════════════════════════════════════════════════════════
class HRRBinding:
"""
Holographic Reduced Representations via circular convolution.
Operates on real-valued vectors of dimension N.
Uses numpy.fft for O(N log N) binding/unbinding.
Properties:
- bind(A, B) = circular_conv(A, B) via IFFT(FFT(A) * FFT(B))
- unbind(C, B) = circular_corr(C, B) via IFFT(FFT(C) * conj(FFT(B)))
- Similarity-preserving: cos(A, B) relates to cos(bind(A,X), bind(B,X))
"""
def __init__(self, dim: int = N_BITS):
self.dim = dim
# Pre-allocate FFT workspace
self._fft_len = dim # Use full-length FFT
@staticmethod
def random_vector(dim: int = N_BITS) -> np.ndarray:
"""Generate a random unit-length HRR vector."""
v = np.random.randn(dim).astype(np.float32)
v /= np.linalg.norm(v)
return v
@staticmethod
def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Circular convolution: bind two vectors.
C = IFFT(FFT(A) βŠ™ FFT(B))
"""
A = np.fft.rfft(a)
B = np.fft.rfft(b)
return np.fft.irfft(A * B, n=len(a)).astype(np.float32)
@staticmethod
def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Circular correlation: recover A from C = bind(A, B).
A β‰ˆ IFFT(FFT(C) βŠ™ conj(FFT(B)))
"""
C = np.fft.rfft(c)
B = np.fft.rfft(b)
return np.fft.irfft(C * np.conj(B), n=len(c)).astype(np.float32)
@staticmethod
def bundle(*vectors: np.ndarray) -> np.ndarray:
"""Superposition (sum + normalize) of multiple HRR vectors."""
s = np.sum(vectors, axis=0)
norm = np.linalg.norm(s)
if norm > 1e-8:
s /= norm
return s.astype(np.float32)
@staticmethod
def similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Cosine similarity between HRR vectors."""
dot = np.dot(a, b)
na = np.linalg.norm(a)
nb = np.linalg.norm(b)
if na < 1e-8 or nb < 1e-8:
return 0.0
return float(dot / (na * nb))
@staticmethod
def permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
"""Cyclic permutation (for positional encoding / sequence ordering)."""
return np.roll(v, shift).astype(np.float32)
@staticmethod
def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
"""Inverse cyclic permutation."""
return np.roll(v, -shift).astype(np.float32)
@classmethod
def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray:
"""Bind a sequence with positional encoding via permutation.
S = Ξ£_i permute(V_i, i)
Preserves order information.
"""
result = np.zeros_like(vectors[0])
for i, v in enumerate(vectors):
result += cls.permute(v, i)
norm = np.linalg.norm(result)
if norm > 1e-8:
result /= norm
return result.astype(np.float32)
@classmethod
def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray:
"""Encode a role-filler pair: bind(role, filler)."""
return cls.bind(role, filler)
@classmethod
def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray:
"""Extract filler from structure given role: unbind(structure, role)."""
return cls.unbind(structure, role)
@classmethod
def encode_triple(cls, subject: np.ndarray, relation: np.ndarray,
obj: np.ndarray) -> np.ndarray:
"""Encode a knowledge triple (s, r, o).
T = bind(bind(subject, relation), object)
"""
return cls.bind(cls.bind(subject, relation), obj)
# ══════════════════════════════════════════════════════════════════════════════
# Binary Binding (BSC - Binary Spatter Codes)
# ══════════════════════════════════════════════════════════════════════════════
class BinaryBinding:
"""
Binary Spatter Code binding via XOR.
Operates directly on packed uint64 vectors (512 bytes for 4096 bits).
Extremely fast on CPU: single XOR instruction per 64-bit word.
Properties:
- bind(A, B) = A βŠ• B (XOR)
- unbind(C, B) = C βŠ• B = A (XOR is self-inverse β†’ exact recovery!)
- bundle = majority vote
- similarity = normalized Hamming distance
"""
@staticmethod
def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Binary binding via XOR. Self-inverse: bind(bind(a,b), b) = a."""
return xor_vectors(a, b)
@staticmethod
def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Binary unbinding = XOR (since XOR is its own inverse)."""
return xor_vectors(c, b)
@staticmethod
def bundle(*vectors: np.ndarray) -> np.ndarray:
"""Majority-vote bundling. Requires odd number of vectors for tie-breaking."""
if len(vectors) == 1:
return vectors[0].copy()
vecs = np.stack(vectors)
return majority_vote(np.ascontiguousarray(vecs))
@staticmethod
def similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Normalized Hamming similarity [0, 1]."""
return hamming_similarity(a, b)
@staticmethod
def permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
"""Bit-level cyclic permutation for sequence encoding.
Shifts all bits by `shift` positions cyclically.
"""
bits = np.unpackbits(v.view(np.uint8))
shifted = np.roll(bits, shift)
return np.packbits(shifted).view(np.uint64).copy()
@staticmethod
def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
"""Inverse bit-level cyclic permutation."""
bits = np.unpackbits(v.view(np.uint8))
shifted = np.roll(bits, -shift)
return np.packbits(shifted).view(np.uint64).copy()
@classmethod
def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray:
"""Bind a sequence with positional encoding.
S = bundle(permute(V_0, 0), permute(V_1, 1), ..., permute(V_n, n))
"""
positioned = [cls.permute(v, i) for i, v in enumerate(vectors)]
return cls.bundle(*positioned)
@classmethod
def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray:
"""Encode role-filler: bind(role, filler)."""
return cls.bind(role, filler)
@classmethod
def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray:
"""Decode filler from structure given role."""
return cls.unbind(structure, role)
@classmethod
def encode_triple(cls, subject: np.ndarray, relation: np.ndarray,
obj: np.ndarray) -> np.ndarray:
"""Encode knowledge triple (s, r, o) = bind(bind(s, r), o)."""
return cls.bind(cls.bind(subject, relation), obj)
@classmethod
def create_analogy_query(cls, a: np.ndarray, b: np.ndarray,
c: np.ndarray) -> np.ndarray:
"""Create analogy query: a:b :: c:?
Relation R = bind(a, b) [XOR extracts the difference]
Query = bind(c, R) [apply same relation to c]
"""
relation = cls.bind(a, b)
return cls.bind(c, relation)
# ══════════════════════════════════════════════════════════════════════════════
# Hybrid Binding Engine
# ══════════════════════════════════════════════════════════════════════════════
class BindingEngine:
"""
Unified binding engine that supports both binary and real-valued operations.
The engine maintains a concept codebook (binary vectors for fast routing)
and can convert between binary and real domains for FFT operations.
"""
def __init__(self, dim: int = N_BITS, use_binary: bool = True):
self.dim = dim
self.use_binary = use_binary
self.binary = BinaryBinding()
self.hrr = HRRBinding(dim)
# Concept codebook: name β†’ binary vector
self._codebook: dict = {}
def register_concept(self, name: str, vector: Optional[np.ndarray] = None) -> np.ndarray:
"""Register a named concept with a binary vector."""
if vector is None:
vector = random_binary_vector()
self._codebook[name] = vector.copy()
return vector
def get_concept(self, name: str) -> Optional[np.ndarray]:
"""Get binary vector for a named concept."""
return self._codebook.get(name)
def bind(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Bind two vectors using the configured method."""
if self.use_binary and a.dtype == np.uint64:
return self.binary.bind(a, b)
return self.hrr.bind(a, b)
def unbind(self, c: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Unbind: recover A from C = bind(A, B) given B."""
if self.use_binary and c.dtype == np.uint64:
return self.binary.unbind(c, b)
return self.hrr.unbind(c, b)
def bundle(self, *vectors: np.ndarray) -> np.ndarray:
"""Bundle multiple vectors."""
if self.use_binary and vectors[0].dtype == np.uint64:
return self.binary.bundle(*vectors)
return self.hrr.bundle(*vectors)
def similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Compute similarity between vectors."""
if self.use_binary and a.dtype == np.uint64:
return self.binary.similarity(a, b)
return self.hrr.similarity(a, b)
def encode_relation(self, subject: str, relation: str, obj: str) -> np.ndarray:
"""Encode a semantic relation between named concepts.
Auto-registers unknown concepts.
"""
for name in [subject, relation, obj]:
if name not in self._codebook:
self.register_concept(name)
s = self._codebook[subject]
r = self._codebook[relation]
o = self._codebook[obj]
if self.use_binary:
return self.binary.encode_triple(s, r, o)
return self.hrr.encode_triple(
self._to_real(s), self._to_real(r), self._to_real(o)
)
def solve_analogy(self, a: str, b: str, c: str,
candidates: Optional[List[str]] = None) -> List[Tuple[str, float]]:
"""Solve analogy a:b :: c:?
Returns ranked candidates with similarity scores.
"""
va = self._codebook.get(a)
vb = self._codebook.get(b)
vc = self._codebook.get(c)
if va is None or vb is None or vc is None:
raise ValueError(f"Unknown concept(s): {a}, {b}, {c}")
if self.use_binary:
query = self.binary.create_analogy_query(va, vb, vc)
else:
query = self.hrr.unbind(
self.hrr.bind(self._to_real(vb), self._to_real(vc)),
self._to_real(va)
)
# Search candidates
search_names = candidates or list(self._codebook.keys())
results = []
for name in search_names:
vec = self._codebook[name]
if self.use_binary:
sim = self.binary.similarity(query, vec)
else:
sim = self.hrr.similarity(query, self._to_real(vec))
results.append((name, sim))
results.sort(key=lambda x: x[1], reverse=True)
return results
def _to_real(self, binary_vec: np.ndarray) -> np.ndarray:
"""Convert packed binary vector to real-valued Β±1 vector."""
bits = np.unpackbits(binary_vec.view(np.uint8)).astype(np.float32)
return (2.0 * bits - 1.0) # {0,1} β†’ {-1, +1}
def _to_binary(self, real_vec: np.ndarray) -> np.ndarray:
"""Convert real-valued vector to packed binary (threshold at 0)."""
bits = (real_vec > 0).astype(np.uint8)
return np.packbits(bits).view(np.uint64).copy()