| """ |
| Binding via Convolution Circulaire |
| |
| Le binding est une opération fondamentale pour composer et décomposer des |
| représentations distribuées. On utilise la convolution circulaire qui : |
| - Est commutative dans le domaine fréquentiel (FFT) |
| - Permet binding/unbinding par multiplication/déconvolution |
| - Supporte la composition de structures |
| |
| Optimisé avec numpy FFT et support pour binding chaîné. |
| """ |
|
|
| import numpy as np |
| from numba import njit, prange, complex128, float64 |
| from typing import List, Tuple, Optional, Dict |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| VECTOR_SIZE = 4096 |
|
|
|
|
| class CircularBinder: |
| """ |
| Binder circulaire pour composition/décomposition de vecteurs. |
| |
| Le binding utilise la convolution circulaire qui peut être calculée |
| efficacement via FFT : |
| a ⊗ b = IFFT(FFT(a) * FFT(b)) |
| |
| Avantages : |
| - Commutatif : a ⊗ b = b ⊗ a |
| - Associatif pour composition chaînée |
| - Unbinding par déconvolution (division dans le domaine fréquentiel) |
| """ |
| |
| def __init__(self, noise_tolerance: float = 0.1): |
| self.noise_tolerance = noise_tolerance |
| |
| |
| self.fft_cache: Dict[int, np.ndarray] = {} |
| self.cache_limit = 1000 |
| |
| |
| self.bind_count = 0 |
| self.unbind_count = 0 |
| |
| def _normalize(self, vec: np.ndarray) -> np.ndarray: |
| """ |
| Normalise un vecteur pour le binding. |
| Convertit binaire en {-1, 1} pour la convolution. |
| """ |
| |
| return 2.0 * vec.astype(np.float64) - 1.0 |
| |
| def _denormalize(self, vec: np.ndarray, threshold: float = 0.0) -> np.ndarray: |
| """Reconvertit en binaire.""" |
| return (vec > threshold).astype(np.uint8) |
| |
| def bind(self, a: np.ndarray, b: np.ndarray, use_fft: bool = True) -> np.ndarray: |
| """ |
| Binding de deux vecteurs : a ⊗ b. |
| |
| Args: |
| a, b: vecteurs binaires (4096,) uint8 |
| use_fft: utilise FFT pour efficacité |
| |
| Returns: |
| vecteur binaire résultant |
| """ |
| if use_fft: |
| |
| a_norm = self._normalize(a) |
| b_norm = self._normalize(b) |
| |
| fft_a = np.fft.fft(a_norm) |
| fft_b = np.fft.fft(b_norm) |
| |
| fft_result = fft_a * fft_b |
| result = np.fft.ifft(fft_result).real |
| |
| |
| result = result / (np.std(result) + 1e-8) |
| return self._denormalize(result) |
| else: |
| |
| a_norm = self._normalize(a).astype(np.float64) |
| b_norm = self._normalize(b).astype(np.float64) |
| result = np.convolve(a_norm, b_norm, mode='same') |
| result = result / (np.std(result) + 1e-8) |
| return self._denormalize(result) |
| |
| def bind_multiple(self, vectors: List[np.ndarray], use_cache: bool = True) -> np.ndarray: |
| """ |
| Binding de plusieurs vecteurs : a ⊗ b ⊗ c ⊗ ... |
| Utilise le cache FFT pour éviter de recalculer les FFT. |
| """ |
| if len(vectors) == 0: |
| return np.ones(VECTOR_SIZE, dtype=np.uint8) |
| |
| if len(vectors) == 1: |
| return vectors[0].copy() |
| |
| |
| if use_cache: |
| ids = [id(v) for v in vectors] |
| fft_vec = np.ones(VECTOR_SIZE, dtype=np.complex128) |
| |
| for vid, v in zip(ids, vectors): |
| if vid in self.fft_cache: |
| fft_v = self.fft_cache[vid] |
| else: |
| v_norm = self._normalize(v) |
| fft_v = np.fft.fft(v_norm) |
| if len(self.fft_cache) < self.cache_limit: |
| self.fft_cache[vid] = fft_v |
| fft_vec *= fft_v |
| |
| result = np.fft.ifft(fft_vec).real |
| result = result / (np.std(result) + 1e-8) |
| return self._denormalize(result) |
| else: |
| |
| result = vectors[0].copy() |
| for v in vectors[1:]: |
| result = self.bind(result, v) |
| return result |
| |
| def unbind(self, bound: np.ndarray, a: np.ndarray, use_fft: bool = True) -> np.ndarray: |
| """ |
| Déconvolution : résout bound = a ⊗ b pour trouver b. |
| |
| Dans le domaine fréquentiel : b = IFFT(FFT(bound) / FFT(a)) |
| """ |
| if use_fft: |
| bound_norm = self._normalize(bound) |
| a_norm = self._normalize(a) |
| |
| fft_bound = np.fft.fft(bound_norm) |
| fft_a = np.fft.fft(a_norm) |
| |
| |
| fft_a = np.where(np.abs(fft_a) < 1e-8, 1e-8, fft_a) |
| fft_result = fft_bound / fft_a |
| |
| result = np.fft.ifft(fft_result).real |
| result = result / (np.std(result) + 1e-8) |
| return self._denormalize(result) |
| else: |
| |
| bound_norm = self._normalize(bound).astype(np.float64) |
| a_norm = self._normalize(a).astype(np.float64) |
| |
| |
| result = np.correlate(bound_norm, a_norm, mode='same') |
| result = result / (np.std(result) + 1e-8) |
| return self._denormalize(result) |
| |
| def bind_role_filler(self, role: np.ndarray, filler: np.ndarray) -> np.ndarray: |
| """ |
| Binding spécialisé role-filler (structure propositionnelle). |
| Utile pour représenter "sujet-agent", "objet-patient", etc. |
| """ |
| |
| shifted_filler = np.roll(filler, VECTOR_SIZE // 4) |
| return self.bind(role, shifted_filler) |
| |
| def unbind_role_filler(self, bound: np.ndarray, role: np.ndarray) -> np.ndarray: |
| """Extrait le filler d'un binding role-filler.""" |
| unshifted = self.unbind(bound, role) |
| return np.roll(unshifted, -VECTOR_SIZE // 4) |
| |
| def extract_similar( |
| self, |
| bound: np.ndarray, |
| candidates: List[np.ndarray], |
| top_k: int = 3 |
| ) -> List[Tuple[np.ndarray, float]]: |
| """ |
| Extrait les candidats les plus similaires à partir d'un vecteur bound. |
| Utile pour "décoder" une structure composée. |
| """ |
| results = [] |
| for cand in candidates: |
| |
| decoded = self.unbind(bound, cand) |
| |
| best_sim = 0.0 |
| for other in candidates: |
| sim = np.mean(decoded == other) |
| if sim > best_sim: |
| best_sim = sim |
| results.append((cand, best_sim)) |
| |
| results.sort(key=lambda x: -x[1]) |
| return results[:top_k] |
| |
| def compose_structure( |
| self, |
| role_filler_pairs: List[Tuple[np.ndarray, np.ndarray]] |
| ) -> np.ndarray: |
| """ |
| Compose une structure complète à partir de paires role-filler. |
| Ex: [(agent, john), (action, run), (patient, ball)] |
| """ |
| bound_pairs = [] |
| for role, filler in role_filler_pairs: |
| bound_pairs.append(self.bind_role_filler(role, filler)) |
| |
| |
| result = np.zeros(VECTOR_SIZE, dtype=np.float64) |
| for bp in bound_pairs: |
| result += self._normalize(bp) |
| |
| result = result / (np.std(result) + 1e-8) |
| return self._denormalize(result) |
| |
| def decompose_structure( |
| self, |
| composite: np.ndarray, |
| roles: List[np.ndarray] |
| ) -> List[np.ndarray]: |
| """ |
| Décompose une structure en ses fillers. |
| """ |
| fillers = [] |
| for role in roles: |
| filler = self.unbind_role_filler(composite, role) |
| fillers.append(filler) |
| return fillers |
| |
| def get_stats(self) -> Dict: |
| return { |
| 'bind_count': self.bind_count, |
| 'unbind_count': self.unbind_count, |
| 'fft_cache_size': len(self.fft_cache), |
| } |