""" Sparse Address Table (SAT) - Mémoire adaptative à haute dimension Vecteurs binaires de 4096 bits stockés sous forme de tableaux booléens uint8. Supporte : - Création dynamique de vecteurs pour configurations récurrentes - Fusion/spécialisation de vecteurs - Réorganisation locale pour cohérence sémantique - Pruning contrôlé et normalisation """ import numpy as np from numba import njit, prange from typing import List, Dict, Tuple, Optional, Set import logging logger = logging.getLogger(__name__) VECTOR_SIZE = 4096 SLICE_SIZE = 64 # Pour opérations SIMD-friendly NUM_SLICES = VECTOR_SIZE // SLICE_SIZE @njit(parallel=True, cache=True) def hamming_distance_batch(query: np.ndarray, table: np.ndarray) -> np.ndarray: """ Calcule les distances de Hamming entre un vecteur requête et tous les vecteurs de la table. Optimisé avec parallélisation numba. Args: query: (VECTOR_SIZE,) uint8 binaire table: (N, VECTOR_SIZE) uint8 binaire Returns: distances: (N,) int32 Hamming distances """ N = table.shape[0] distances = np.empty(N, dtype=np.int32) for i in prange(N): dist = 0 for j in range(VECTOR_SIZE): dist += query[j] ^ table[i, j] distances[i] = dist return distances @njit(cache=True) def bitwise_and_popcount(a: np.ndarray, b: np.ndarray) -> int: """Compte les bits communs entre deux vecteurs binaires.""" count = 0 for i in range(len(a)): count += a[i] & b[i] return count class VectorMetadata: """Métadonnées associées à un vecteur de la SAT.""" def __init__(self, vector_id: int, creation_context: Optional[np.ndarray] = None): self.id = vector_id self.creation_time = 0 self.last_access = 0 self.access_count = 0 self.energy_history: List[float] = [] self.coactivation_neighbors: Dict[int, float] = {} # id -> weight self.abstraction_level = 0 # 0 = concret, >0 = abstrait self.merged_from: Optional[List[int]] = None self.specialized_from: Optional[int] = None self.creation_context = creation_context # snapshot du contexte à la création self.stability_score = 1.0 def record_access(self, time_step: int, energy: float): self.last_access = time_step self.access_count += 1 self.energy_history.append(energy) if len(self.energy_history) > 100: self.energy_history = self.energy_history[-100:] def update_coactivation(self, neighbor_id: int, strength: float, decay: float = 0.99): """Met à jour le poids de coactivation avec un autre vecteur.""" if neighbor_id in self.coactivation_neighbors: self.coactivation_neighbors[neighbor_id] = ( decay * self.coactivation_neighbors[neighbor_id] + (1 - decay) * strength ) else: self.coactivation_neighbors[neighbor_id] = strength @property def average_energy(self) -> float: if not self.energy_history: return 0.0 return np.mean(self.energy_history[-20:]) @property def usage_score(self) -> float: """Score combiné accès et stabilité pour décider pruning/fusion.""" recency = 1.0 / (1.0 + 0.001 * (self.last_access - self.creation_time)) return np.log1p(self.access_count) * recency * self.stability_score class SparseAddressTable: """ Table d'adresses sparse adaptative. Stocke N vecteurs binaires de 4096 bits avec métadonnées dynamiques. Supporte création, fusion, spécialisation, réorganisation locale. """ def __init__( self, initial_capacity: int = 1000, max_capacity: int = 50000, sparsity_target: float = 0.05, # ~200 bits actifs sur 4096 creation_threshold: float = 0.3, # Distance relative pour création fusion_threshold: float = 0.05, # Distance relative pour fusion pruning_threshold: float = 0.01, # Score usage minimum ): self.vector_size = VECTOR_SIZE self.sparsity_target = sparsity_target self.target_active = int(VECTOR_SIZE * sparsity_target) self.creation_threshold = int(VECTOR_SIZE * creation_threshold) self.fusion_threshold = int(VECTOR_SIZE * fusion_threshold) self.pruning_threshold = pruning_threshold self.max_capacity = max_capacity # Données self.vectors: np.ndarray = np.zeros((initial_capacity, VECTOR_SIZE), dtype=np.uint8) self.metadata: Dict[int, VectorMetadata] = {} self.active_mask: np.ndarray = np.zeros(initial_capacity, dtype=bool) self.next_id = 0 self.time_step = 0 # Stats self.stats = { 'creations': 0, 'fusions': 0, 'specializations': 0, 'prunings': 0, 'reorganizations': 0, } @property def size(self) -> int: return int(np.sum(self.active_mask)) @property def active_vectors(self) -> np.ndarray: """Retourne uniquement les vecteurs actifs.""" return self.vectors[self.active_mask] @property def active_ids(self) -> List[int]: return [i for i, m in self.metadata.items() if self.active_mask[i]] def _expand_if_needed(self): """Agrandit le stockage si nécessaire.""" if self.size >= self.vectors.shape[0] - 10: new_size = min(int(self.vectors.shape[0] * 1.5), self.max_capacity) if new_size > self.vectors.shape[0]: new_vectors = np.zeros((new_size, VECTOR_SIZE), dtype=np.uint8) new_vectors[:self.vectors.shape[0]] = self.vectors self.vectors = new_vectors new_mask = np.zeros(new_size, dtype=bool) new_mask[:self.active_mask.shape[0]] = self.active_mask self.active_mask = new_mask def _create_sparse_vector(self, seed_context: Optional[np.ndarray] = None) -> np.ndarray: """Crée un nouveau vecteur sparse aléatoire avec sparsité cible.""" vec = np.zeros(VECTOR_SIZE, dtype=np.uint8) if seed_context is not None: # Biaisé par le contexte n_from_context = min(self.target_active // 2, np.sum(seed_context)) if n_from_context > 0: active_indices = np.where(seed_context)[0] chosen = np.random.choice(active_indices, size=n_from_context, replace=False) vec[chosen] = 1 # Complète aléatoirement remaining = self.target_active - n_from_context if remaining > 0: zero_indices = np.where(vec == 0)[0] if len(zero_indices) >= remaining: chosen = np.random.choice(zero_indices, size=remaining, replace=False) vec[chosen] = 1 else: # Purement aléatoire indices = np.random.choice(VECTOR_SIZE, size=self.target_active, replace=False) vec[indices] = 1 return vec def create_vector( self, context: Optional[np.ndarray] = None, abstraction_level: int = 0, metadata_override: Optional[Dict] = None ) -> int: """ Crée un nouveau vecteur dans la table. Returns: vector_id """ self._expand_if_needed() # Trouve le premier slot libre free_slots = np.where(~self.active_mask)[0] if len(free_slots) == 0: # Forced pruning self.prune_weakest(0.1) free_slots = np.where(~self.active_mask)[0] if len(free_slots) == 0: raise RuntimeError("SAT pleine, impossible de créer un vecteur") idx = free_slots[0] vec = self._create_sparse_vector(context) self.vectors[idx] = vec self.active_mask[idx] = True meta = VectorMetadata(self.next_id, creation_context=context.copy() if context is not None else None) meta.creation_time = self.time_step meta.abstraction_level = abstraction_level if metadata_override: for k, v in metadata_override.items(): setattr(meta, k, v) self.metadata[idx] = meta vector_id = self.next_id self.next_id += 1 self.stats['creations'] += 1 logger.debug(f"Created vector {vector_id} at index {idx}") return vector_id def find_nearest( self, query: np.ndarray, k: int = 5, exclude_id: Optional[int] = None ) -> List[Tuple[int, float, int]]: """ Trouve les k vecteurs les plus proches par distance de Hamming. Returns: List of (vector_id, distance, index) """ active = self.active_vectors if len(active) == 0: return [] distances = hamming_distance_batch(query, active) active_indices = np.where(self.active_mask)[0] # Trie sorted_idx = np.argsort(distances)[:k] results = [] for si in sorted_idx: idx = active_indices[si] meta = self.metadata[idx] if exclude_id is not None and meta.id == exclude_id: continue results.append((meta.id, float(distances[si]), idx)) return results def query_or_create( self, pattern: np.ndarray, min_distance_threshold: Optional[float] = None ) -> Tuple[int, int, bool]: """ Requête : si un vecteur proche existe, le retourne. Sinon, crée un nouveau vecteur. Returns: (vector_id, index, created) """ threshold = min_distance_threshold or self.creation_threshold nearest = self.find_nearest(pattern, k=1) if nearest and nearest[0][1] < threshold: vid, dist, idx = nearest[0] meta = self.metadata[idx] meta.record_access(self.time_step, energy=dist) return vid, idx, False # Crée un nouveau vecteur vid = self.create_vector(context=pattern) # Trouve son index for idx, meta in self.metadata.items(): if meta.id == vid: return vid, idx, True return vid, -1, True def fuse_vectors(self, id1: int, id2: int) -> Optional[int]: """ Fusionne deux vecteurs proches en un nouveau vecteur. Retourne l'ID du vecteur fusionné. """ # Trouve les indices idx1 = idx2 = -1 for idx, meta in self.metadata.items(): if meta.id == id1: idx1 = idx elif meta.id == id2: idx2 = idx if idx1 == -1 or idx2 == -1: return None v1 = self.vectors[idx1] v2 = self.vectors[idx2] # Distance de Hamming dist = np.sum(v1 != v2) if dist > self.fusion_threshold * 3: logger.debug(f"Vectors {id1} and {id2} too far ({dist}), skip fusion") return None # Fusion : intersection majoritaire merged = (v1 & v2) | (np.random.random(VECTOR_SIZE) < 0.5) & (v1 | v2) merged = merged.astype(np.uint8) # Ajuste la sparsité active_count = np.sum(merged) if active_count > self.target_active * 1.2: excess = active_count - self.target_active ones = np.where(merged)[0] to_remove = np.random.choice(ones, size=excess, replace=False) merged[to_remove] = 0 elif active_count < self.target_active * 0.8: deficit = self.target_active - active_count zeros = np.where(merged == 0)[0] to_add = np.random.choice(zeros, size=deficit, replace=False) merged[to_add] = 1 # Crée le nouveau vecteur new_vid = self.create_vector(context=merged) new_idx = -1 for idx, meta in self.metadata.items(): if meta.id == new_vid: new_idx = idx meta.merged_from = [id1, id2] meta.stability_score = 0.5 # Nouveau, pas encore stabilisé break # Marque les anciens comme inactifs self.active_mask[idx1] = False self.active_mask[idx2] = False del self.metadata[idx1] del self.metadata[idx2] self.stats['fusions'] += 1 logger.info(f"Fused {id1} + {id2} -> {new_vid}") return new_vid def specialize_vector(self, vector_id: int, context: np.ndarray, strength: float = 0.3) -> Optional[int]: """ Crée une spécialisation d'un vecteur existant dans un contexte spécifique. """ idx = -1 for i, meta in self.metadata.items(): if meta.id == vector_id: idx = i break if idx == -1: return None original = self.vectors[idx] # Mélange avec le contexte specialized = original.copy() context_active = np.where(context)[0] n_to_flip = int(len(context_active) * strength) if n_to_flip > 0: to_flip = np.random.choice(context_active, size=n_to_flip, replace=False) specialized[to_flip] = 1 # Ajuste sparsité active = np.sum(specialized) if active > self.target_active * 1.2: ones = np.where(specialized)[0] excess = active - self.target_active to_remove = np.random.choice(ones, size=excess, replace=False) specialized[to_remove] = 0 new_vid = self.create_vector(context=specialized) for i, meta in self.metadata.items(): if meta.id == new_vid: meta.specialized_from = vector_id meta.abstraction_level = self.metadata[idx].abstraction_level + 1 break self.stats['specializations'] += 1 return new_vid def local_reorganization(self, center_idx: int, radius: int = 5): """ Réorganise localement l'espace autour d'un vecteur central. Déplace les vecteurs sémantiquement proches plus près dans l'espace en ajustant légèrement leurs patterns. """ if not self.active_mask[center_idx]: return center_vec = self.vectors[center_idx] active_indices = np.where(self.active_mask)[0] # Distance aux autres vecteurs actifs distances = [] for idx in active_indices: if idx == center_idx: continue dist = np.sum(center_vec != self.vectors[idx]) distances.append((idx, dist)) # Trie par distance distances.sort(key=lambda x: x[1]) # Pour les vecteurs dans le voisinage proche, ajuste légèrement n_neighbors = min(radius, len(distances)) for i in range(n_neighbors): idx, dist = distances[i] neighbor_vec = self.vectors[idx] # Différence diff = center_vec != neighbor_vec diff_indices = np.where(diff)[0] if len(diff_indices) > 0: # Fait converger légèrement vers le centre n_to_converge = max(1, len(diff_indices) // 10) to_converge = np.random.choice(diff_indices, size=n_to_converge, replace=False) self.vectors[idx, to_converge] = center_vec[to_converge] # Met à jour les métadonnées self.metadata[idx].stability_score *= 0.95 self.stats['reorganizations'] += 1 def prune_weakest(self, fraction: float = 0.05): """ Supprime les vecteurs les moins utilisés. """ if self.size == 0: return scores = [] for idx, meta in self.metadata.items(): scores.append((idx, meta.usage_score)) scores.sort(key=lambda x: x[1]) n_to_prune = max(1, int(len(scores) * fraction)) for idx, _ in scores[:n_to_prune]: if self.active_mask[idx]: self.active_mask[idx] = False del self.metadata[idx] self.stats['prunings'] += 1 logger.info(f"Pruned {n_to_prune} weak vectors") def detect_frequent_patterns(self, trajectory: List[np.ndarray], min_frequency: int = 3) -> List[np.ndarray]: """ Détecte les motifs fréquents dans une trajectoire de vecteurs. Retourne des patterns candidats pour abstraction. """ if len(trajectory) < min_frequency: return [] # Cherche les sous-ensembles qui apparaissent fréquemment # Simplifié : cherche les bits qui sont souvent actifs ensemble trajectory_array = np.array(trajectory) frequency = np.mean(trajectory_array, axis=0) # Bits fréquemment actifs (>70% de la trajectoire) common_bits = np.where(frequency > 0.7)[0] if len(common_bits) < self.target_active // 4: return [] # Crée un pattern abstrait pattern = np.zeros(VECTOR_SIZE, dtype=np.uint8) # Sélectionne un sous-ensemble des bits communs n_select = min(self.target_active, len(common_bits)) selected = np.random.choice(common_bits, size=n_select, replace=False) pattern[selected] = 1 return [pattern] def tick(self): """Incrémente le compteur de temps et effectue maintenance périodique.""" self.time_step += 1 # Maintenance périodique if self.time_step % 1000 == 0: self.prune_weakest(0.02) if self.time_step % 500 == 0 and self.size > 100: # Fusion périodique des paires très proches self._periodic_fusion() def _periodic_fusion(self): """Fusionne automatiquement les paires de vecteurs très proches.""" active = np.where(self.active_mask)[0] if len(active) < 2: return # Échantillonne aléatoirement pour efficacité O(N) au lieu de O(N²) sample_size = min(50, len(active)) sampled = np.random.choice(active, size=sample_size, replace=False) fused = set() for i, idx1 in enumerate(sampled): if idx1 in fused: continue for idx2 in sampled[i+1:]: if idx2 in fused: continue dist = np.sum(self.vectors[idx1] != self.vectors[idx2]) if dist < self.fusion_threshold: id1 = self.metadata[idx1].id id2 = self.metadata[idx2].id new_id = self.fuse_vectors(id1, id2) if new_id is not None: fused.add(idx1) fused.add(idx2) break def get_stats(self) -> Dict: """Retourne les statistiques de la mémoire.""" stats = self.stats.copy() stats['size'] = self.size stats['capacity'] = self.vectors.shape[0] stats['time_step'] = self.time_step if self.size > 0: active = self.active_vectors stats['avg_sparsity'] = float(np.mean(np.sum(active, axis=1)) / VECTOR_SIZE) stats['avg_usage'] = float(np.mean([m.usage_score for m in self.metadata.values()])) return stats