"""
MLE Binding Module: Semantic Binding Operations
=================================================
Implements circular convolution-based binding for composing and
decomposing semantic relations between hyperdimensional vectors.
Two implementations:
1. FFT-based (HRR): High precision, O(N log N), works on real-valued vectors
2. Binary (BSC): O(N/64) via XOR, works directly on packed uint64 vectors
The binding operation creates a new vector C = bind(A, B) such that:
- C is quasi-orthogonal to both A and B
- unbind(C, B) ≈ A (recoverable)
- bind is commutative and associative
This enables representing structured relations:
- "cat IS_A animal" → bind(cat, IS_A) stores a trace recoverable with animal
- Analogies: unbind(bind(king, male), bind(queen, female)) ≈ identity
"""
import numpy as np
from typing import Optional, Tuple, List
import logging
from ..utils.simd_ops import (
N_BITS, N_WORDS,
xor_vectors, random_binary_vector, random_binary_vectors,
hamming_distance, hamming_similarity, majority_vote, popcount
)
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# FFT-based Circular Convolution (Holographic Reduced Representations)
# ──────────────────────────────────────────────────────────────────────────────
class HRRBinding:
    """
    Holographic Reduced Representations via circular convolution.

    Operates on real-valued 1-D vectors; every vector-producing method
    returns a float32 array. Binding and unbinding are evaluated in the
    frequency domain with numpy.fft (rfft / irfft).

    Properties:
    - bind(A, B) = circular_conv(A, B) via IFFT(FFT(A) * FFT(B))
    - unbind(C, B) = circular_corr(C, B) via IFFT(FFT(C) * conj(FFT(B)))
    - unbind is an approximate inverse of bind, not an exact one
    """

    def __init__(self, dim: int = N_BITS):
        """Remember the vector dimensionality.

        Args:
            dim: Number of components per vector.
        """
        self.dim = dim
        # Full-length transform size. No method of this class reads it;
        # it is kept so the instance attributes stay unchanged.
        self._fft_len = dim

    @staticmethod
    def random_vector(dim: int = N_BITS) -> np.ndarray:
        """Generate a random unit-length HRR vector.

        Args:
            dim: Number of components to draw.

        Returns:
            float32 array of length ``dim`` drawn from a standard normal
            distribution (numpy's global RNG) and scaled to unit L2 norm.
        """
        v = np.random.randn(dim).astype(np.float32)
        v /= np.linalg.norm(v)
        return v

    @staticmethod
    def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Bind two vectors by circular convolution.

        C = IFFT(FFT(A) * FFT(B)), with an element-wise product.

        Args:
            a: First real-valued vector.
            b: Second real-valued vector; expected to have the same
                length as ``a``.

        Returns:
            float32 array of length ``len(a)``.
        """
        spec_a = np.fft.rfft(a)
        spec_b = np.fft.rfft(b)
        # n=len(a) makes irfft restore the original length; without it
        # odd-length inputs would come back one element short.
        return np.fft.irfft(spec_a * spec_b, n=len(a)).astype(np.float32)

    @staticmethod
    def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Approximately recover A from C = bind(A, B) by circular correlation.

        A ~= IFFT(FFT(C) * conj(FFT(B))), with an element-wise product.

        Args:
            c: Bound vector.
            b: The vector that was bound with the unknown one; expected
                to have the same length as ``c``.

        Returns:
            float32 array of length ``len(c)``.
        """
        spec_c = np.fft.rfft(c)
        spec_b = np.fft.rfft(b)
        return np.fft.irfft(spec_c * np.conj(spec_b), n=len(c)).astype(np.float32)

    @staticmethod
    def bundle(*vectors: np.ndarray) -> np.ndarray:
        """Superpose vectors: element-wise sum followed by L2 normalisation.

        Args:
            *vectors: One or more equally shaped real-valued vectors.

        Returns:
            float32 array. The sum is returned unnormalised when its norm
            does not exceed 1e-8.

        Raises:
            ValueError: If no vectors are given.
        """
        if not vectors:
            raise ValueError("bundle() requires at least one vector")
        # Accumulate in float64 so integer-valued inputs (e.g. +/-1
        # vectors) can be normalised; an in-place divide on an integer
        # sum would raise a casting error.
        total = np.asarray(vectors, dtype=np.float64).sum(axis=0)
        norm = np.linalg.norm(total)
        if norm > 1e-8:
            total = total / norm
        return total.astype(np.float32)

    @staticmethod
    def similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity between two HRR vectors.

        Returns:
            dot(a, b) / (|a| * |b|) as a Python float, or 0.0 when either
            norm is below 1e-8.
        """
        dot = np.dot(a, b)
        na = np.linalg.norm(a)
        nb = np.linalg.norm(b)
        if na < 1e-8 or nb < 1e-8:
            return 0.0
        return float(dot / (na * nb))

    @staticmethod
    def permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
        """Cyclically roll ``v`` by ``shift`` positions (positional encoding).

        Returns:
            float32 copy of the rolled vector.
        """
        return np.roll(v, shift).astype(np.float32)

    @staticmethod
    def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
        """Undo :meth:`permute` by rolling ``v`` by ``-shift`` positions."""
        return np.roll(v, -shift).astype(np.float32)

    @classmethod
    def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray:
        """Encode an ordered sequence as a sum of position-shifted vectors.

        S = sum_i permute(V_i, i), then L2-normalised. The element at
        index i is rolled by i positions, which is how order is kept.

        Args:
            vectors: Non-empty sequence of equally shaped vectors.

        Returns:
            float32 array shaped like ``vectors[0]``. The sum is returned
            unnormalised when its norm does not exceed 1e-8.

        Raises:
            ValueError: If ``vectors`` is empty.
        """
        if len(vectors) == 0:
            raise ValueError("bind_sequence() requires at least one vector")
        # A float accumulator is used regardless of the input dtype: adding
        # float32 terms in place into an integer buffer would raise.
        acc = np.zeros(np.shape(vectors[0]), dtype=np.float64)
        for position, vec in enumerate(vectors):
            acc += cls.permute(vec, position)
        norm = np.linalg.norm(acc)
        if norm > 1e-8:
            acc /= norm
        return acc.astype(np.float32)

    @classmethod
    def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray:
        """Encode a role-filler pair: bind(role, filler)."""
        return cls.bind(role, filler)

    @classmethod
    def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray:
        """Extract the filler from a structure: unbind(structure, role)."""
        return cls.unbind(structure, role)

    @classmethod
    def encode_triple(cls, subject: np.ndarray, relation: np.ndarray,
                      obj: np.ndarray) -> np.ndarray:
        """Encode a knowledge triple (s, r, o).

        T = bind(bind(subject, relation), object)
        """
        return cls.bind(cls.bind(subject, relation), obj)
# ──────────────────────────────────────────────────────────────────────────────
# Binary Binding (BSC - Binary Spatter Codes)
# ──────────────────────────────────────────────────────────────────────────────
class BinaryBinding:
    """
    Binary Spatter Code binding via XOR.

    Operates on bit vectors packed into uint64 words. The heavy lifting
    (XOR, majority vote, Hamming similarity) is delegated to helpers
    imported from utils.simd_ops.

    Properties:
    - bind(A, B) = A XOR B
    - unbind(C, B) = C XOR B = A (XOR is self-inverse, so recovery is exact)
    - bundle = majority vote
    - similarity = normalized Hamming similarity
    """

    @staticmethod
    def bind(a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Binary binding via XOR. Self-inverse: bind(bind(a, b), b) = a."""
        return xor_vectors(a, b)

    @staticmethod
    def unbind(c: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Binary unbinding; identical to bind because XOR is its own inverse."""
        return xor_vectors(c, b)

    @staticmethod
    def bundle(*vectors: np.ndarray) -> np.ndarray:
        """Bundle packed vectors by majority vote.

        A single vector is returned as a copy without voting. For two or
        more, the vectors are stacked into one contiguous 2-D array and
        handed to ``majority_vote``.

        NOTE(review): an odd number of vectors is recommended so ties
        cannot occur; how ``majority_vote`` resolves ties is not visible
        from this module - confirm before relying on even counts.

        Raises:
            ValueError: If no vectors are given.
        """
        if not vectors:
            raise ValueError("bundle() requires at least one vector")
        if len(vectors) == 1:
            return vectors[0].copy()
        stacked = np.stack(vectors)
        return majority_vote(np.ascontiguousarray(stacked))

    @staticmethod
    def similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Normalized Hamming similarity, as computed by ``hamming_similarity``."""
        return hamming_similarity(a, b)

    @staticmethod
    def permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
        """Bit-level cyclic permutation for sequence encoding.

        The vector is unpacked to individual bits, rolled by ``shift``
        positions and packed again. Bit positions follow
        ``np.unpackbits`` order: bytes in memory order, most significant
        bit of each byte first.

        Args:
            v: Packed uint64 vector. May be non-contiguous (e.g. a strided
                slice); it is made contiguous before reinterpretation.
            shift: Number of bit positions to rotate; negative values
                rotate the other way.

        Returns:
            New 1-D uint64 array holding the rotated bits.
        """
        # ndarray.view() to a smaller itemsize requires a contiguous last
        # axis, so normalise the layout first (no copy if already contiguous).
        packed = np.ascontiguousarray(v)
        bits = np.unpackbits(packed.view(np.uint8))
        rotated = np.roll(bits, shift)
        return np.packbits(rotated).view(np.uint64).copy()

    @staticmethod
    def inverse_permute(v: np.ndarray, shift: int = 1) -> np.ndarray:
        """Inverse bit-level cyclic permutation: rotate by ``-shift`` bits."""
        return BinaryBinding.permute(v, -shift)

    @classmethod
    def bind_sequence(cls, vectors: List[np.ndarray]) -> np.ndarray:
        """Encode an ordered sequence with positional permutation.

        S = bundle(permute(V_0, 0), permute(V_1, 1), ..., permute(V_n, n))

        Raises:
            ValueError: If ``vectors`` is empty (raised by ``bundle``).
        """
        positioned = [cls.permute(vec, index) for index, vec in enumerate(vectors)]
        return cls.bundle(*positioned)

    @classmethod
    def encode_pair(cls, role: np.ndarray, filler: np.ndarray) -> np.ndarray:
        """Encode role-filler: bind(role, filler)."""
        return cls.bind(role, filler)

    @classmethod
    def decode_filler(cls, structure: np.ndarray, role: np.ndarray) -> np.ndarray:
        """Decode the filler from a structure: unbind(structure, role)."""
        return cls.unbind(structure, role)

    @classmethod
    def encode_triple(cls, subject: np.ndarray, relation: np.ndarray,
                      obj: np.ndarray) -> np.ndarray:
        """Encode knowledge triple (s, r, o) = bind(bind(s, r), o)."""
        return cls.bind(cls.bind(subject, relation), obj)

    @classmethod
    def create_analogy_query(cls, a: np.ndarray, b: np.ndarray,
                             c: np.ndarray) -> np.ndarray:
        """Create analogy query: a:b :: c:?

        Relation R = bind(a, b)   [XOR extracts the difference]
        Query     = bind(c, R)    [apply the same relation to c]
        """
        relation = cls.bind(a, b)
        return cls.bind(c, relation)
# ──────────────────────────────────────────────────────────────────────────────
# Hybrid Binding Engine
# ──────────────────────────────────────────────────────────────────────────────
class BindingEngine:
    """
    Unified binding engine that supports both binary and real-valued operations.

    The engine keeps a concept codebook (name -> packed binary vector) and
    can convert codebook entries to +/-1 real vectors so the FFT-based HRR
    operations can be applied to them.
    """

    def __init__(self, dim: int = N_BITS, use_binary: bool = True):
        """Create an engine.

        Args:
            dim: Vector dimensionality handed to the HRR backend.
            use_binary: When True, uint64 inputs are routed to the XOR
                backend and named-concept operations stay binary; when
                False everything goes through HRR.
        """
        self.dim = dim
        self.use_binary = use_binary
        self.binary = BinaryBinding()
        self.hrr = HRRBinding(dim)
        # Concept codebook: name -> binary vector
        self._codebook: dict = {}

    def register_concept(self, name: str, vector: Optional[np.ndarray] = None) -> np.ndarray:
        """Register a named concept with a binary vector.

        Args:
            name: Codebook key; an existing entry is overwritten.
            vector: Vector to store. When None a fresh one is drawn with
                ``random_binary_vector()``.
                NOTE(review): that call takes no arguments here, so
                ``self.dim`` is not forwarded - confirm the helper's
                default size matches the engine's ``dim``.

        Returns:
            The vector passed in (or the freshly generated one). The
            codebook keeps its own copy, so mutating the returned array
            does not alter the stored entry.
        """
        if vector is None:
            vector = random_binary_vector()
        self._codebook[name] = vector.copy()
        return vector

    def get_concept(self, name: str) -> Optional[np.ndarray]:
        """Return the stored binary vector for ``name``, or None if unknown."""
        return self._codebook.get(name)

    def bind(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Bind two vectors: XOR when binary mode is on and ``a`` is uint64, else HRR."""
        if self.use_binary and a.dtype == np.uint64:
            return self.binary.bind(a, b)
        return self.hrr.bind(a, b)

    def unbind(self, c: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Unbind: recover A from C = bind(A, B) given B.

        Uses XOR when binary mode is on and ``c`` is uint64, else HRR.
        """
        if self.use_binary and c.dtype == np.uint64:
            return self.binary.unbind(c, b)
        return self.hrr.unbind(c, b)

    def bundle(self, *vectors: np.ndarray) -> np.ndarray:
        """Bundle multiple vectors.

        The backend is chosen from the dtype of the first vector:
        majority vote for uint64 in binary mode, HRR superposition
        otherwise.

        Raises:
            ValueError: If no vectors are given.
        """
        if not vectors:
            raise ValueError("bundle() requires at least one vector")
        if self.use_binary and vectors[0].dtype == np.uint64:
            return self.binary.bundle(*vectors)
        return self.hrr.bundle(*vectors)

    def similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Compute similarity: Hamming-based for uint64 in binary mode, cosine otherwise."""
        if self.use_binary and a.dtype == np.uint64:
            return self.binary.similarity(a, b)
        return self.hrr.similarity(a, b)

    def encode_relation(self, subject: str, relation: str, obj: str) -> np.ndarray:
        """Encode a semantic relation between named concepts.

        Names missing from the codebook are registered with fresh random
        vectors first.

        Returns:
            bind(bind(subject, relation), obj): computed with XOR on the
            stored binary vectors in binary mode, or with HRR on their
            +/-1 real conversions otherwise.
        """
        for name in (subject, relation, obj):
            if name not in self._codebook:
                self.register_concept(name)
        s = self._codebook[subject]
        r = self._codebook[relation]
        o = self._codebook[obj]
        if self.use_binary:
            return self.binary.encode_triple(s, r, o)
        return self.hrr.encode_triple(
            self._to_real(s), self._to_real(r), self._to_real(o)
        )

    def solve_analogy(self, a: str, b: str, c: str,
                      candidates: Optional[List[str]] = None) -> List[Tuple[str, float]]:
        """Solve the analogy a:b :: c:?

        Args:
            a: Name of the first concept of the known pair.
            b: Name of the second concept of the known pair.
            c: Name of the concept the relation is applied to.
            candidates: Names to score. None means every codebook entry;
                an explicit empty list yields an empty result.

        Returns:
            (name, similarity) tuples sorted by descending similarity to
            the analogy query.

        Raises:
            ValueError: If any of ``a``, ``b``, ``c`` is not registered;
                the message lists only the unknown names.
            KeyError: If a candidate name is not in the codebook.
        """
        va = self._codebook.get(a)
        vb = self._codebook.get(b)
        vc = self._codebook.get(c)
        missing = [
            str(name) for name, vec in ((a, va), (b, vb), (c, vc)) if vec is None
        ]
        if missing:
            raise ValueError(f"Unknown concept(s): {', '.join(missing)}")

        if self.use_binary:
            query = self.binary.create_analogy_query(va, vb, vc)
        else:
            # HRR counterpart of the binary query: bind b with c, then
            # unbind a.
            query = self.hrr.unbind(
                self.hrr.bind(self._to_real(vb), self._to_real(vc)),
                self._to_real(va)
            )

        # `is None` rather than truthiness: an explicitly empty candidate
        # list must not silently widen the search to the whole codebook.
        search_names = list(self._codebook) if candidates is None else candidates
        results = []
        for name in search_names:
            vec = self._codebook[name]
            if self.use_binary:
                sim = self.binary.similarity(query, vec)
            else:
                sim = self.hrr.similarity(query, self._to_real(vec))
            results.append((name, sim))
        results.sort(key=lambda item: item[1], reverse=True)
        return results

    def _to_real(self, binary_vec: np.ndarray) -> np.ndarray:
        """Convert a packed binary vector to a real-valued +/-1 vector.

        Returns:
            float32 array with one element per bit: bit 0 -> -1.0,
            bit 1 -> +1.0.
        """
        bits = np.unpackbits(binary_vec.view(np.uint8)).astype(np.float32)
        return (2.0 * bits - 1.0)  # {0,1} -> {-1, +1}

    def _to_binary(self, real_vec: np.ndarray) -> np.ndarray:
        """Convert a real-valued vector to packed binary (threshold at 0).

        Elements strictly greater than 0 become bit 1, all others bit 0.
        The element count must be a multiple of 64 for the final uint64
        reinterpretation to succeed.
        """
        bits = (real_vec > 0).astype(np.uint8)
        return np.packbits(bits).view(np.uint64).copy()
|