""" MLE System - Intégration complète du Morpho-Logic Engine Orchestre les modules : - memory (SparseAddressTable) - routing (HammingRouter) - binding (CircularBinder) - energy (EnergyLandscape) - inference (InferenceEngine) Ajoute : - Pile sémantique pour traitement hiérarchique - Méta-apprentissage sur la structure même du système - Métriques et monitoring - Stabilisation globale """ import numpy as np from typing import List, Dict, Tuple, Optional, Callable, Any import logging import time import json from .memory import SparseAddressTable, VECTOR_SIZE from .routing import HammingRouter from .binding import CircularBinder from .energy import EnergyLandscape from .inference import InferenceEngine, InferenceResult logger = logging.getLogger(__name__) class SemanticStack: """ Pile sémantique pour traitement hiérarchique. Permet de représenter des structures imbriquées : - Niveau 0 : tokens/bruts - Niveau 1 : chunks/groupes - Niveau 2 : phrases/propositions - Niveau 3+: concepts abstraits """ def __init__(self, max_depth: int = 4): self.max_depth = max_depth self.levels: List[List[int]] = [[] for _ in range(max_depth)] self.level_bindings: Dict[int, Dict[Tuple[int, int], np.ndarray]] = {} def push(self, vector_id: int, level: int = 0): """Ajoute un vecteur à un niveau.""" if 0 <= level < self.max_depth: self.levels[level].append(vector_id) def pop(self, level: int = 0) -> Optional[int]: """Retire le dernier vecteur d'un niveau.""" if 0 <= level < self.max_depth and self.levels[level]: return self.levels[level].pop() return None def bind_level(self, level: int, binder: CircularBinder, memory: SparseAddressTable): """ Combine les vecteurs d'un niveau en un vecteur composite, puis le pousse au niveau supérieur. """ if level >= self.max_depth - 1: return None ids = self.levels[level] if len(ids) < 2: return None # Récupère les vecteurs vectors = [] for vid in ids: for idx, meta in memory.metadata.items(): if meta.id == vid and memory.active_mask[idx]: vectors.append(memory.vectors[idx]) break if len(vectors) < 2: return None # Binding de tous les vecteurs du niveau composite = binder.bind_multiple(vectors) # Stocke le composite self.level_bindings[level] = {} for i, vid in enumerate(ids): for j, vid2 in enumerate(ids[i+1:], i+1): self.level_bindings[level][(vid, vid2)] = composite # Crée un nouveau vecteur pour le composite et le pousse au niveau supérieur new_id = memory.create_vector(context=composite, abstraction_level=level+1) self.levels[level] = [] self.push(new_id, level=level+1) return new_id def get_level_state(self, level: int, memory: SparseAddressTable) -> np.ndarray: """Retourne l'état composite d'un niveau.""" if level >= self.max_depth: return np.zeros(VECTOR_SIZE, dtype=np.uint8) ids = self.levels[level] if not ids: return np.zeros(VECTOR_SIZE, dtype=np.uint8) vectors = [] for vid in ids: for idx, meta in memory.metadata.items(): if meta.id == vid and memory.active_mask[idx]: vectors.append(memory.vectors[idx]) break if not vectors: return np.zeros(VECTOR_SIZE, dtype=np.uint8) # Moyenne binaire mean_vec = np.mean(vectors, axis=0) return (mean_vec > 0.5).astype(np.uint8) def clear(self): """Vide toute la pile.""" self.levels = [[] for _ in range(self.max_depth)] self.level_bindings = {} class MLEMetrics: """Collecte et agrège les métriques de performance du système.""" def __init__(self): self.inference_times: List[float] = [] self.energy_trajectories: List[List[float]] = [] self.memory_sizes: List[int] = [] self.associations_counts: List[int] = [] self.creation_rates: List[float] = [] self.convergence_rates: List[float] = [] # Métriques de cohérence sémantique self.semantic_coherence_scores: List[float] = [] self.clustering_coefficients: List[float] = [] # Suivi des améliorations self.baseline_energy: Optional[float] = None self.energy_improvement: List[float] = [] def record_inference(self, result: InferenceResult, memory: SparseAddressTable, energy: EnergyLandscape): self.inference_times.append(result.execution_time_ms) self.energy_trajectories.append(result.energy_trajectory) self.memory_sizes.append(memory.size) self.associations_counts.append(len(energy.associations)) if result.energy_trajectory: final_energy = result.energy_trajectory[-1] if self.baseline_energy is None: self.baseline_energy = final_energy else: improvement = (self.baseline_energy - final_energy) / max(abs(self.baseline_energy), 1.0) self.energy_improvement.append(improvement) self.convergence_rates.append(1.0 if result.converged else 0.0) def compute_coherence(self, memory: SparseAddressTable) -> float: """ Calcule un score de cohérence sémantique : les vecteurs proches en distance de Hamming doivent avoir des usages similaires. """ if memory.size < 10: return 0.0 active = memory.active_vectors ids = [meta.id for idx, meta in memory.metadata.items() if memory.active_mask[idx]] if len(active) < 10: return 0.0 # Échantillonne n_sample = min(50, len(active)) sample_idx = np.random.choice(len(active), size=n_sample, replace=False) coherence_scores = [] for i in sample_idx: dists = np.sum(active != active[i], axis=1) nearest = np.argsort(dists)[1:6] # 5 plus proches # Compare les niveaux d'abstraction my_level = memory.metadata[i].abstraction_level if i in memory.metadata else 0 neighbor_levels = [ memory.metadata[ids[j]].abstraction_level for j in nearest ] # Cohérence = variance faible des niveaux dans le voisinage level_variance = np.var(neighbor_levels + [my_level]) coherence_scores.append(1.0 / (1.0 + level_variance)) return float(np.mean(coherence_scores)) if coherence_scores else 0.0 def get_summary(self) -> Dict: if not self.inference_times: return {} recent_energies = [ traj[-1] for traj in self.energy_trajectories[-50:] if traj ] return { 'avg_inference_time_ms': float(np.mean(self.inference_times[-100:])), 'avg_final_energy': float(np.mean(recent_energies)) if recent_energies else 0.0, 'memory_size': self.memory_sizes[-1] if self.memory_sizes else 0, 'n_associations': self.associations_counts[-1] if self.associations_counts else 0, 'convergence_rate': float(np.mean(self.convergence_rates[-100:])), 'energy_improvement_trend': float(np.mean(self.energy_improvement[-50:])) if self.energy_improvement else 0.0, 'semantic_coherence': float(np.mean(self.semantic_coherence_scores[-50:])) if self.semantic_coherence_scores else 0.0, } class MLESystem: """ Système MLE complet intégrant tous les modules avec apprentissage organique. Usage: mle = MLESystem() result = mle.process(input_vector) metrics = mle.get_metrics() """ def __init__( self, memory_capacity: int = 10000, k_neighbors: int = 10, temperature: float = 0.5, online_learning: bool = True, enable_stack: bool = True, enable_metrics: bool = True, ): self.k_neighbors = k_neighbors self.enable_stack = enable_stack self.enable_metrics = enable_metrics # Modules self.memory = SparseAddressTable( initial_capacity=memory_capacity, max_capacity=memory_capacity * 5, ) self.router = HammingRouter( use_index=True, learn_routes=True, ) self.binder = CircularBinder() self.energy = EnergyLandscape() self.inference = InferenceEngine( temperature=temperature, online_learning=online_learning, ) # Stack sémantique self.stack = SemanticStack() if enable_stack else None # Métriques self.metrics = MLEMetrics() if enable_metrics else None # Historique d'expérience self.experience_buffer: List[Dict] = [] self.experience_buffer_size = 1000 # Initialisation : crée quelques vecteurs de base self._initialize_base_vectors() logger.info(f"MLE System initialized with capacity {memory_capacity}") def _initialize_base_vectors(self, n_base: int = 10): """Crée des vecteurs de base pour démarrer le système.""" for i in range(n_base): vec = self.memory._create_sparse_vector() vid = self.memory.create_vector() # Trouve l'index for idx, meta in self.memory.metadata.items(): if meta.id == vid: self.router.add_vector(idx, vec) break def process( self, input_vector: np.ndarray, stack_level: int = 0, external_callback: Optional[Callable] = None, ) -> InferenceResult: """ Traite un vecteur d'entrée par inférence + apprentissage. Args: input_vector: (4096,) uint8 stack_level: niveau de la pile sémantique external_callback: callback par itération Returns: InferenceResult """ # Maintenance de la mémoire self.memory.tick() # Requête ou création du vecteur d'entrée input_id, input_idx, created = self.memory.query_or_create(input_vector) if created and input_idx >= 0: # Nouveau vecteur : ajoute au routeur self.router.add_vector(input_idx, input_vector) # Ajoute à la pile sémantique if self.stack: self.stack.push(input_id, level=stack_level) # Inférence result = self.inference.infer( initial_state=input_vector, memory_table=self.memory, router=self.router, energy_landscape=self.energy, binder=self.binder, k_neighbors=self.k_neighbors, external_callback=external_callback, ) # Stocke l'expérience experience = { 'input_id': input_id, 'created': created, 'final_state': result.final_state.copy() if result.final_state is not None else None, 'energy_trajectory': result.energy_trajectory.copy(), 'converged': result.converged, 'learning_events': result.learning_events.copy(), } self.experience_buffer.append(experience) if len(self.experience_buffer) > self.experience_buffer_size: self.experience_buffer.pop(0) # Métriques if self.metrics: self.metrics.record_inference(result, self.memory, self.energy) # Coherence périodique if self.inference.total_inferences % 50 == 0: coherence = self.metrics.compute_coherence(self.memory) self.metrics.semantic_coherence_scores.append(coherence) # Met à jour le routeur pour le vecteur final if result.final_state is not None: # Requête ou création de l'état final final_id, final_idx, final_created = self.memory.query_or_create(result.final_state) if final_created and final_idx >= 0: self.router.add_vector(final_idx, result.final_state) # Renforce la route input -> final if not created and not final_created: pair = tuple(sorted((input_id, final_id))) current = self.energy.associations.get(pair, 0.0) self.energy.associations[pair] = min(1.0, current + 0.05) return result def process_sequence( self, vectors: List[np.ndarray], bind_levels: bool = False, ) -> List[InferenceResult]: """ Traite une séquence de vecteurs. Args: vectors: liste de (4096,) uint8 bind_levels: si True, bind les niveaux de la pile périodiquement Returns: Liste de InferenceResult """ results = [] for i, vec in enumerate(vectors): result = self.process(vec, stack_level=0) results.append(result) # Bind périodique des niveaux if bind_levels and self.stack and i > 0 and i % 3 == 0: self.stack.bind_level(0, self.binder, self.memory) return results def query( self, query_vector: np.ndarray, k: int = 5, ) -> List[Tuple[int, float, int]]: """ Requête simple (sans inférence) pour retrouver les voisins. Returns: [(vector_id, distance, index)] """ return self.memory.find_nearest(query_vector, k=k) def bind_vectors(self, ids: List[int]) -> Optional[np.ndarray]: """ Binding explicite de vecteurs par ID. Returns: Vecteur composé ou None """ vectors = [] for vid in ids: for idx, meta in self.memory.metadata.items(): if meta.id == vid and self.memory.active_mask[idx]: vectors.append(self.memory.vectors[idx]) break if len(vectors) < 2: return None return self.binder.bind_multiple(vectors) def get_vector(self, vector_id: int) -> Optional[np.ndarray]: """Retourne un vecteur par son ID.""" for idx, meta in self.memory.metadata.items(): if meta.id == vector_id and self.memory.active_mask[idx]: return self.memory.vectors[idx].copy() return None def get_semantic_clusters(self, n_clusters: int = 5) -> Dict[int, List[int]]: """ Retourne des clusters sémantiques basés sur la distance de Hamming. """ if self.memory.size < n_clusters * 2: return {} active = self.memory.active_vectors ids = [meta.id for idx, meta in self.memory.metadata.items() if self.memory.active_mask[idx]] # Clustering simple par distance # 1. Choix des graines aléatoires seeds = np.random.choice(len(active), size=min(n_clusters, len(active)), replace=False) clusters: Dict[int, List[int]] = {ids[s]: [] for s in seeds} # 2. Assignation par plus proche graine for i, vec in enumerate(active): dists = [np.sum(vec != active[s]) for s in seeds] nearest_seed = seeds[np.argmin(dists)] clusters[ids[nearest_seed]].append(ids[i]) return clusters def get_metrics_summary(self) -> Dict: """Résumé des métriques.""" summary = {} if self.metrics: summary['performance'] = self.metrics.get_summary() summary['memory'] = self.memory.get_stats() summary['routing'] = self.router.get_stats() summary['energy'] = self.energy.get_stats() summary['inference'] = self.inference.get_stats() return summary def print_summary(self): """Affiche un résumé lisible.""" summary = self.get_metrics_summary() print("\n" + "="*60) print("MLE SYSTEM SUMMARY") print("="*60) for section, data in summary.items(): print(f"\n--- {section.upper()} ---") if isinstance(data, dict): for key, value in data.items(): if isinstance(value, float): print(f" {key}: {value:.4f}") else: print(f" {key}: {value}") else: print(f" {data}") print("\n" + "="*60) def save_state(self, filepath: str): """Sauvegarde l'état du système.""" state = { 'memory_stats': self.memory.get_stats(), 'energy_stats': self.energy.get_stats(), 'inference_stats': self.inference.get_stats(), 'router_stats': self.router.get_stats(), } with open(filepath, 'w') as f: json.dump(state, f, indent=2) def reset_metrics(self): """Réinitialise les métriques.""" if self.metrics: self.metrics = MLEMetrics() self.inference.total_inferences = 0 self.inference.total_iterations = 0 self.inference.total_converged = 0