# morpho-logic-engine — mle/memory.py (upload 949842e)
"""
Sparse Address Table (SAT) - Mémoire adaptative à haute dimension
Vecteurs binaires de 4096 bits stockés sous forme de tableaux booléens uint8.
Supporte :
- Création dynamique de vecteurs pour configurations récurrentes
- Fusion/spécialisation de vecteurs
- Réorganisation locale pour cohérence sémantique
- Pruning contrôlé et normalisation
"""
import numpy as np
from numba import njit, prange
from typing import List, Dict, Tuple, Optional, Set
import logging
logger = logging.getLogger(__name__)
# Dimensionality (in bits) of every binary address vector in the table.
VECTOR_SIZE = 4096
SLICE_SIZE = 64 # For SIMD-friendly operations
NUM_SLICES = VECTOR_SIZE // SLICE_SIZE  # number of 64-bit slices per vector
@njit(parallel=True, cache=True)
def hamming_distance_batch(query: np.ndarray, table: np.ndarray) -> np.ndarray:
    """
    Compute the Hamming distance between a query vector and every row of a table.

    Rows are processed in parallel via numba's prange.

    Args:
        query: (D,) uint8 binary vector (entries are 0/1).
        table: (N, D) uint8 binary matrix.
    Returns:
        distances: (N,) int32 Hamming distances, one per table row.
    """
    n_rows = table.shape[0]
    # Generalized: read the width from the table itself instead of the
    # module-level VECTOR_SIZE constant, so any (N, D) table works.
    n_bits = table.shape[1]
    distances = np.empty(n_rows, dtype=np.int32)
    for i in prange(n_rows):
        dist = 0
        for j in range(n_bits):
            # XOR of 0/1 entries counts differing bit positions.
            dist += query[j] ^ table[i, j]
        distances[i] = dist
    return distances
@njit(cache=True)
def bitwise_and_popcount(a: np.ndarray, b: np.ndarray) -> int:
    """Count the positions where both binary vectors have a set bit."""
    total = 0
    for pos in range(a.shape[0]):
        total += a[pos] & b[pos]
    return total
class VectorMetadata:
    """Bookkeeping attached to one vector stored in the SAT.

    Tracks access recency/frequency, a bounded energy history, coactivation
    weights toward other vectors, and provenance (fusion / specialization).
    """

    def __init__(self, vector_id: int, creation_context: Optional[np.ndarray] = None):
        self.id = vector_id
        self.creation_time = 0   # time step at creation (set by the SAT)
        self.last_access = 0     # time step of the most recent access
        self.access_count = 0
        self.energy_history: List[float] = []  # bounded to the last 100 entries
        self.coactivation_neighbors: Dict[int, float] = {}  # neighbor id -> EMA weight
        self.abstraction_level = 0  # 0 = concrete, >0 = abstract
        self.merged_from: Optional[List[int]] = None  # ids fused into this vector
        self.specialized_from: Optional[int] = None   # parent id if specialized
        self.creation_context = creation_context      # context snapshot at creation
        self.stability_score = 1.0

    def record_access(self, time_step: int, energy: float):
        """Record one access at *time_step* with the observed *energy*."""
        self.last_access = time_step
        self.access_count += 1
        self.energy_history.append(energy)
        # Keep the history bounded so long-lived vectors stay cheap.
        if len(self.energy_history) > 100:
            self.energy_history = self.energy_history[-100:]

    def update_coactivation(self, neighbor_id: int, strength: float, decay: float = 0.99):
        """Update the coactivation weight with another vector (EMA on repeats)."""
        if neighbor_id in self.coactivation_neighbors:
            self.coactivation_neighbors[neighbor_id] = (
                decay * self.coactivation_neighbors[neighbor_id] + (1 - decay) * strength
            )
        else:
            # First observation: take the raw strength as the initial weight.
            self.coactivation_neighbors[neighbor_id] = strength

    @property
    def average_energy(self) -> float:
        """Mean of the 20 most recent energies; 0.0 when no history exists."""
        if not self.energy_history:
            return 0.0
        # Cast: np.mean returns np.float64, but the annotation promises float.
        return float(np.mean(self.energy_history[-20:]))

    @property
    def usage_score(self) -> float:
        """Combined access/recency/stability score used to decide pruning/fusion."""
        recency = 1.0 / (1.0 + 0.001 * (self.last_access - self.creation_time))
        # Cast for the same float-vs-np.float64 reason as average_energy.
        return float(np.log1p(self.access_count) * recency * self.stability_score)
class SparseAddressTable:
    """
    Adaptive sparse address table.

    Stores up to ``max_capacity`` binary vectors of ``VECTOR_SIZE`` bits with
    dynamic per-vector metadata. Supports creation, fusion, specialization,
    local reorganization and pruning.
    """

    def __init__(
        self,
        initial_capacity: int = 1000,
        max_capacity: int = 50000,
        sparsity_target: float = 0.05,    # ~200 active bits out of 4096
        creation_threshold: float = 0.3,  # relative distance triggering creation
        fusion_threshold: float = 0.05,   # relative distance allowing fusion
        pruning_threshold: float = 0.01,  # minimum usage score kept
    ):
        self.vector_size = VECTOR_SIZE
        self.sparsity_target = sparsity_target
        self.target_active = int(VECTOR_SIZE * sparsity_target)
        # Thresholds are stored as absolute bit counts, not ratios.
        self.creation_threshold = int(VECTOR_SIZE * creation_threshold)
        self.fusion_threshold = int(VECTOR_SIZE * fusion_threshold)
        self.pruning_threshold = pruning_threshold
        self.max_capacity = max_capacity
        # Storage: rows of `vectors` are valid only where `active_mask` is True.
        self.vectors: np.ndarray = np.zeros((initial_capacity, VECTOR_SIZE), dtype=np.uint8)
        self.metadata: Dict[int, VectorMetadata] = {}  # table index -> metadata
        self.active_mask: np.ndarray = np.zeros(initial_capacity, dtype=bool)
        self.next_id = 0    # monotonically increasing vector id
        self.time_step = 0
        # Lifetime counters exposed by get_stats().
        self.stats = {
            'creations': 0,
            'fusions': 0,
            'specializations': 0,
            'prunings': 0,
            'reorganizations': 0,
        }

    @property
    def size(self) -> int:
        """Number of currently active vectors."""
        return int(np.sum(self.active_mask))

    @property
    def active_vectors(self) -> np.ndarray:
        """Only the active rows of the storage array (order follows indices)."""
        return self.vectors[self.active_mask]

    @property
    def active_ids(self) -> List[int]:
        # NOTE(review): despite the name, this returns active *table indices*
        # (the metadata keys), not VectorMetadata.id values. Kept as-is for
        # backward compatibility with existing callers.
        return [i for i in self.metadata if self.active_mask[i]]

    def _find_index_by_id(self, vector_id: int) -> int:
        """Return the table index holding *vector_id*, or -1 if absent."""
        for idx, meta in self.metadata.items():
            if meta.id == vector_id:
                return idx
        return -1

    def _expand_if_needed(self):
        """Grow the backing arrays (up to max_capacity) when nearly full."""
        if self.size >= self.vectors.shape[0] - 10:
            new_size = min(int(self.vectors.shape[0] * 1.5), self.max_capacity)
            if new_size > self.vectors.shape[0]:
                new_vectors = np.zeros((new_size, VECTOR_SIZE), dtype=np.uint8)
                new_vectors[:self.vectors.shape[0]] = self.vectors
                self.vectors = new_vectors
                new_mask = np.zeros(new_size, dtype=bool)
                new_mask[:self.active_mask.shape[0]] = self.active_mask
                self.active_mask = new_mask

    def _create_sparse_vector(self, seed_context: Optional[np.ndarray] = None) -> np.ndarray:
        """Create a new random sparse vector at the target sparsity.

        When *seed_context* is given, up to half of the active bits are drawn
        from the context's active positions and the rest are random.
        Assumes seed_context entries are 0/1 (np.sum counts active bits).
        """
        vec = np.zeros(VECTOR_SIZE, dtype=np.uint8)
        if seed_context is not None:
            # Bias toward the context: copy up to target_active/2 of its bits.
            n_from_context = min(self.target_active // 2, np.sum(seed_context))
            if n_from_context > 0:
                active_indices = np.where(seed_context)[0]
                chosen = np.random.choice(active_indices, size=n_from_context, replace=False)
                vec[chosen] = 1
            # Fill the remainder with random bits.
            remaining = self.target_active - n_from_context
            if remaining > 0:
                zero_indices = np.where(vec == 0)[0]
                if len(zero_indices) >= remaining:
                    chosen = np.random.choice(zero_indices, size=remaining, replace=False)
                    vec[chosen] = 1
        else:
            # Purely random sparse vector.
            indices = np.random.choice(VECTOR_SIZE, size=self.target_active, replace=False)
            vec[indices] = 1
        return vec

    def create_vector(
        self,
        context: Optional[np.ndarray] = None,
        abstraction_level: int = 0,
        metadata_override: Optional[Dict] = None
    ) -> int:
        """
        Create a new vector in the table.

        Args:
            context: optional binary pattern biasing the new vector's bits.
            abstraction_level: 0 = concrete, >0 = abstract.
            metadata_override: attribute overrides applied to the new metadata.
        Returns:
            The new vector's id.
        Raises:
            RuntimeError: when the table is full even after forced pruning.
        """
        self._expand_if_needed()
        # Reuse the first free slot.
        free_slots = np.where(~self.active_mask)[0]
        if len(free_slots) == 0:
            # Table full: forcibly prune the weakest 10% to make room.
            self.prune_weakest(0.1)
            free_slots = np.where(~self.active_mask)[0]
            if len(free_slots) == 0:
                raise RuntimeError("SAT pleine, impossible de créer un vecteur")
        idx = int(free_slots[0])
        self.vectors[idx] = self._create_sparse_vector(context)
        self.active_mask[idx] = True
        meta = VectorMetadata(self.next_id, creation_context=context.copy() if context is not None else None)
        meta.creation_time = self.time_step
        meta.abstraction_level = abstraction_level
        if metadata_override:
            for k, v in metadata_override.items():
                setattr(meta, k, v)
        self.metadata[idx] = meta
        vector_id = self.next_id
        self.next_id += 1
        self.stats['creations'] += 1
        logger.debug(f"Created vector {vector_id} at index {idx}")
        return vector_id

    def find_nearest(
        self,
        query: np.ndarray,
        k: int = 5,
        exclude_id: Optional[int] = None
    ) -> List[Tuple[int, float, int]]:
        """
        Find the k active vectors closest to *query* by Hamming distance.

        Args:
            query: (VECTOR_SIZE,) uint8 binary vector.
            k: number of neighbors requested.
            exclude_id: optional vector id to leave out of the results.
        Returns:
            List of (vector_id, distance, table_index), closest first.
        """
        active = self.active_vectors
        if len(active) == 0:
            return []
        distances = hamming_distance_batch(query, active)
        active_indices = np.where(self.active_mask)[0]
        results = []
        # Fixed: walk the full sorted order and filter as we go, so excluding
        # a vector still yields up to k results. (The old code sliced the
        # top-k first and could return k-1 entries when exclude_id was near.)
        for si in np.argsort(distances):
            idx = int(active_indices[si])
            meta = self.metadata[idx]
            if exclude_id is not None and meta.id == exclude_id:
                continue
            results.append((meta.id, float(distances[si]), idx))
            if len(results) == k:
                break
        return results

    def query_or_create(
        self,
        pattern: np.ndarray,
        min_distance_threshold: Optional[float] = None
    ) -> Tuple[int, int, bool]:
        """
        Return a sufficiently close existing vector, or create a new one.

        Args:
            pattern: (VECTOR_SIZE,) uint8 binary query.
            min_distance_threshold: absolute Hamming threshold; defaults to
                self.creation_threshold when None.
        Returns:
            (vector_id, table_index, created) — table_index is -1 only in the
            (unexpected) case where the freshly created id cannot be located.
        """
        # Fixed: use an explicit None check — the old `x or default` silently
        # replaced a caller-supplied threshold of 0 with the default.
        threshold = (
            self.creation_threshold
            if min_distance_threshold is None
            else min_distance_threshold
        )
        nearest = self.find_nearest(pattern, k=1)
        if nearest and nearest[0][1] < threshold:
            vid, dist, idx = nearest[0]
            meta = self.metadata[idx]
            meta.record_access(self.time_step, energy=dist)
            return vid, idx, False
        # No close match: create a fresh vector seeded by the pattern.
        vid = self.create_vector(context=pattern)
        return vid, self._find_index_by_id(vid), True

    def fuse_vectors(self, id1: int, id2: int) -> Optional[int]:
        """
        Fuse two nearby vectors into a new one and deactivate the originals.

        Returns the fused vector's id, or None when either id is unknown,
        the ids are identical, or the vectors are too far apart.
        """
        idx1 = self._find_index_by_id(id1)
        idx2 = self._find_index_by_id(id2)
        if idx1 == -1 or idx2 == -1 or idx1 == idx2:
            return None
        v1 = self.vectors[idx1]
        v2 = self.vectors[idx2]
        # Hamming distance between the two candidates.
        dist = np.sum(v1 != v2)
        if dist > self.fusion_threshold * 3:
            logger.debug(f"Vectors {id1} and {id2} too far ({dist}), skip fusion")
            return None
        # Fusion: keep the intersection; keep each differing bit with p=0.5.
        merged = (v1 & v2) | ((np.random.random(VECTOR_SIZE) < 0.5) & (v1 | v2))
        merged = merged.astype(np.uint8)
        # Re-normalize the sparsity toward target_active.
        active_count = np.sum(merged)
        if active_count > self.target_active * 1.2:
            excess = int(active_count - self.target_active)
            ones = np.where(merged)[0]
            to_remove = np.random.choice(ones, size=excess, replace=False)
            merged[to_remove] = 0
        elif active_count < self.target_active * 0.8:
            deficit = int(self.target_active - active_count)
            zeros = np.where(merged == 0)[0]
            to_add = np.random.choice(zeros, size=deficit, replace=False)
            merged[to_add] = 1
        # NOTE(review): create_vector treats `merged` as a *seed context*, so
        # the stored vector is biased by — not equal to — the merged pattern.
        # Confirm this is intended before relying on exact fused contents.
        new_vid = self.create_vector(context=merged)
        new_idx = self._find_index_by_id(new_vid)
        if new_idx != -1:
            meta = self.metadata[new_idx]
            meta.merged_from = [id1, id2]
            meta.stability_score = 0.5  # freshly fused, not yet stabilized
        # Deactivate the fused inputs. pop() is used (robustness fix): a forced
        # pruning inside create_vector may already have removed one of them,
        # and a bare `del` would raise KeyError here.
        self.active_mask[idx1] = False
        self.active_mask[idx2] = False
        self.metadata.pop(idx1, None)
        self.metadata.pop(idx2, None)
        self.stats['fusions'] += 1
        logger.info(f"Fused {id1} + {id2} -> {new_vid}")
        return new_vid

    def specialize_vector(self, vector_id: int, context: np.ndarray, strength: float = 0.3) -> Optional[int]:
        """
        Create a specialization of an existing vector for a specific context.

        Args:
            vector_id: id of the vector to specialize.
            context: binary context whose active bits are partially forced on.
            strength: fraction of the context's active bits to force on.
        Returns:
            The new vector's id, or None when vector_id is unknown.
        """
        idx = self._find_index_by_id(vector_id)
        if idx == -1:
            return None
        # Robustness fix: capture the parent's level *before* create_vector,
        # which may prune `idx` when the table is full.
        parent_level = self.metadata[idx].abstraction_level
        specialized = self.vectors[idx].copy()
        # Force on a random `strength` fraction of the context's active bits.
        context_active = np.where(context)[0]
        n_to_flip = int(len(context_active) * strength)
        if n_to_flip > 0:
            to_flip = np.random.choice(context_active, size=n_to_flip, replace=False)
            specialized[to_flip] = 1
        # Re-normalize sparsity. NOTE(review): only the excess case is handled
        # here (matching the original); a deficit is left as-is.
        active = np.sum(specialized)
        if active > self.target_active * 1.2:
            ones = np.where(specialized)[0]
            excess = int(active - self.target_active)
            to_remove = np.random.choice(ones, size=excess, replace=False)
            specialized[to_remove] = 0
        new_vid = self.create_vector(context=specialized)
        new_idx = self._find_index_by_id(new_vid)
        if new_idx != -1:
            self.metadata[new_idx].specialized_from = vector_id
            self.metadata[new_idx].abstraction_level = parent_level + 1
        self.stats['specializations'] += 1
        return new_vid

    def local_reorganization(self, center_idx: int, radius: int = 5):
        """
        Locally reorganize the space around a central vector.

        Nudges the `radius` nearest active vectors toward the center by
        copying ~10% of their differing bits, and slightly lowers their
        stability scores.
        """
        if not self.active_mask[center_idx]:
            return
        center_vec = self.vectors[center_idx]
        active_indices = np.where(self.active_mask)[0]
        # Hamming distance from the center to every other active vector.
        neighbors = []
        for idx in active_indices:
            if idx == center_idx:
                continue
            neighbors.append((idx, np.sum(center_vec != self.vectors[idx])))
        # Sort by distance, nearest first.
        neighbors.sort(key=lambda item: item[1])
        n_neighbors = min(radius, len(neighbors))
        for idx, _dist in neighbors[:n_neighbors]:
            neighbor_vec = self.vectors[idx]
            diff_indices = np.where(center_vec != neighbor_vec)[0]
            if len(diff_indices) > 0:
                # Converge ~10% of the differing bits (at least one) toward
                # the center's pattern.
                n_to_converge = max(1, len(diff_indices) // 10)
                to_converge = np.random.choice(diff_indices, size=n_to_converge, replace=False)
                self.vectors[idx, to_converge] = center_vec[to_converge]
            # Any nudged neighbor is considered slightly less stable.
            self.metadata[idx].stability_score *= 0.95
        self.stats['reorganizations'] += 1

    def prune_weakest(self, fraction: float = 0.05):
        """
        Remove the least-used *fraction* of vectors (always at least one).
        """
        if self.size == 0:
            return
        # Rank all vectors by usage score, weakest first.
        scores = sorted(
            ((idx, meta.usage_score) for idx, meta in self.metadata.items()),
            key=lambda item: item[1],
        )
        n_to_prune = max(1, int(len(scores) * fraction))
        for idx, _score in scores[:n_to_prune]:
            if self.active_mask[idx]:
                self.active_mask[idx] = False
                del self.metadata[idx]
        # Counts pruning *passes*, not pruned vectors (original semantics).
        self.stats['prunings'] += 1
        logger.info(f"Pruned {n_to_prune} weak vectors")

    def detect_frequent_patterns(self, trajectory: List[np.ndarray], min_frequency: int = 3) -> List[np.ndarray]:
        """
        Detect frequent motifs in a trajectory of binary vectors.

        Simplified heuristic: bits active in >70% of the trajectory form one
        abstract candidate pattern. Returns an empty list when the trajectory
        is too short or too few bits are common.
        """
        if len(trajectory) < min_frequency:
            return []
        trajectory_array = np.array(trajectory)
        frequency = np.mean(trajectory_array, axis=0)  # per-bit activation rate
        common_bits = np.where(frequency > 0.7)[0]
        if len(common_bits) < self.target_active // 4:
            return []
        # Build one abstract pattern from a random subset of the common bits.
        pattern = np.zeros(VECTOR_SIZE, dtype=np.uint8)
        n_select = min(self.target_active, len(common_bits))
        selected = np.random.choice(common_bits, size=n_select, replace=False)
        pattern[selected] = 1
        return [pattern]

    def tick(self):
        """Advance the time counter and run periodic maintenance."""
        self.time_step += 1
        # Periodic maintenance: light pruning every 1000 steps,
        # opportunistic fusion every 500 steps once the table is populated.
        if self.time_step % 1000 == 0:
            self.prune_weakest(0.02)
        if self.time_step % 500 == 0 and self.size > 100:
            self._periodic_fusion()

    def _periodic_fusion(self):
        """Automatically fuse sampled pairs of very close vectors."""
        active = np.where(self.active_mask)[0]
        if len(active) < 2:
            return
        # Random sampling keeps this O(sample²) instead of O(N²).
        sample_size = min(50, len(active))
        sampled = np.random.choice(active, size=sample_size, replace=False)
        fused = set()
        for i, idx1 in enumerate(sampled):
            if idx1 in fused:
                continue
            for idx2 in sampled[i + 1:]:
                if idx2 in fused:
                    continue
                dist = np.sum(self.vectors[idx1] != self.vectors[idx2])
                if dist < self.fusion_threshold:
                    id1 = self.metadata[idx1].id
                    id2 = self.metadata[idx2].id
                    new_id = self.fuse_vectors(id1, id2)
                    if new_id is not None:
                        fused.add(idx1)
                        fused.add(idx2)
                        break  # idx1 is gone; move to the next outer candidate

    def get_stats(self) -> Dict:
        """Return a snapshot of the memory's statistics."""
        stats = self.stats.copy()
        stats['size'] = self.size
        stats['capacity'] = self.vectors.shape[0]
        stats['time_step'] = self.time_step
        if self.size > 0:
            active = self.active_vectors
            stats['avg_sparsity'] = float(np.mean(np.sum(active, axis=1)) / VECTOR_SIZE)
            stats['avg_usage'] = float(np.mean([m.usage_score for m in self.metadata.values()]))
        return stats