| """ |
| MLE System - Intégration complète du Morpho-Logic Engine |
| |
| Orchestre les modules : |
| - memory (SparseAddressTable) |
| - routing (HammingRouter) |
| - binding (CircularBinder) |
| - energy (EnergyLandscape) |
| - inference (InferenceEngine) |
| |
| Ajoute : |
| - Pile sémantique pour traitement hiérarchique |
| - Méta-apprentissage sur la structure même du système |
| - Métriques et monitoring |
| - Stabilisation globale |
| """ |
|
|
| import numpy as np |
| from typing import List, Dict, Tuple, Optional, Callable, Any |
| import logging |
| import time |
| import json |
|
|
| from .memory import SparseAddressTable, VECTOR_SIZE |
| from .routing import HammingRouter |
| from .binding import CircularBinder |
| from .energy import EnergyLandscape |
| from .inference import InferenceEngine, InferenceResult |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SemanticStack: |
| """ |
| Pile sémantique pour traitement hiérarchique. |
| |
| Permet de représenter des structures imbriquées : |
| - Niveau 0 : tokens/bruts |
| - Niveau 1 : chunks/groupes |
| - Niveau 2 : phrases/propositions |
| - Niveau 3+: concepts abstraits |
| """ |
| |
| def __init__(self, max_depth: int = 4): |
| self.max_depth = max_depth |
| self.levels: List[List[int]] = [[] for _ in range(max_depth)] |
| self.level_bindings: Dict[int, Dict[Tuple[int, int], np.ndarray]] = {} |
| |
| def push(self, vector_id: int, level: int = 0): |
| """Ajoute un vecteur à un niveau.""" |
| if 0 <= level < self.max_depth: |
| self.levels[level].append(vector_id) |
| |
| def pop(self, level: int = 0) -> Optional[int]: |
| """Retire le dernier vecteur d'un niveau.""" |
| if 0 <= level < self.max_depth and self.levels[level]: |
| return self.levels[level].pop() |
| return None |
| |
| def bind_level(self, level: int, binder: CircularBinder, memory: SparseAddressTable): |
| """ |
| Combine les vecteurs d'un niveau en un vecteur composite, |
| puis le pousse au niveau supérieur. |
| """ |
| if level >= self.max_depth - 1: |
| return None |
| |
| ids = self.levels[level] |
| if len(ids) < 2: |
| return None |
| |
| |
| vectors = [] |
| for vid in ids: |
| for idx, meta in memory.metadata.items(): |
| if meta.id == vid and memory.active_mask[idx]: |
| vectors.append(memory.vectors[idx]) |
| break |
| |
| if len(vectors) < 2: |
| return None |
| |
| |
| composite = binder.bind_multiple(vectors) |
| |
| |
| self.level_bindings[level] = {} |
| for i, vid in enumerate(ids): |
| for j, vid2 in enumerate(ids[i+1:], i+1): |
| self.level_bindings[level][(vid, vid2)] = composite |
| |
| |
| new_id = memory.create_vector(context=composite, abstraction_level=level+1) |
| self.levels[level] = [] |
| self.push(new_id, level=level+1) |
| |
| return new_id |
| |
| def get_level_state(self, level: int, memory: SparseAddressTable) -> np.ndarray: |
| """Retourne l'état composite d'un niveau.""" |
| if level >= self.max_depth: |
| return np.zeros(VECTOR_SIZE, dtype=np.uint8) |
| |
| ids = self.levels[level] |
| if not ids: |
| return np.zeros(VECTOR_SIZE, dtype=np.uint8) |
| |
| vectors = [] |
| for vid in ids: |
| for idx, meta in memory.metadata.items(): |
| if meta.id == vid and memory.active_mask[idx]: |
| vectors.append(memory.vectors[idx]) |
| break |
| |
| if not vectors: |
| return np.zeros(VECTOR_SIZE, dtype=np.uint8) |
| |
| |
| mean_vec = np.mean(vectors, axis=0) |
| return (mean_vec > 0.5).astype(np.uint8) |
| |
| def clear(self): |
| """Vide toute la pile.""" |
| self.levels = [[] for _ in range(self.max_depth)] |
| self.level_bindings = {} |
|
|
|
|
| class MLEMetrics: |
| """Collecte et agrège les métriques de performance du système.""" |
| |
| def __init__(self): |
| self.inference_times: List[float] = [] |
| self.energy_trajectories: List[List[float]] = [] |
| self.memory_sizes: List[int] = [] |
| self.associations_counts: List[int] = [] |
| self.creation_rates: List[float] = [] |
| self.convergence_rates: List[float] = [] |
| |
| |
| self.semantic_coherence_scores: List[float] = [] |
| self.clustering_coefficients: List[float] = [] |
| |
| |
| self.baseline_energy: Optional[float] = None |
| self.energy_improvement: List[float] = [] |
| |
| def record_inference(self, result: InferenceResult, memory: SparseAddressTable, |
| energy: EnergyLandscape): |
| self.inference_times.append(result.execution_time_ms) |
| self.energy_trajectories.append(result.energy_trajectory) |
| self.memory_sizes.append(memory.size) |
| self.associations_counts.append(len(energy.associations)) |
| |
| if result.energy_trajectory: |
| final_energy = result.energy_trajectory[-1] |
| if self.baseline_energy is None: |
| self.baseline_energy = final_energy |
| else: |
| improvement = (self.baseline_energy - final_energy) / max(abs(self.baseline_energy), 1.0) |
| self.energy_improvement.append(improvement) |
| |
| self.convergence_rates.append(1.0 if result.converged else 0.0) |
| |
| def compute_coherence(self, memory: SparseAddressTable) -> float: |
| """ |
| Calcule un score de cohérence sémantique : |
| les vecteurs proches en distance de Hamming doivent avoir des usages similaires. |
| """ |
| if memory.size < 10: |
| return 0.0 |
| |
| active = memory.active_vectors |
| ids = [meta.id for idx, meta in memory.metadata.items() if memory.active_mask[idx]] |
| |
| if len(active) < 10: |
| return 0.0 |
| |
| |
| n_sample = min(50, len(active)) |
| sample_idx = np.random.choice(len(active), size=n_sample, replace=False) |
| |
| coherence_scores = [] |
| for i in sample_idx: |
| dists = np.sum(active != active[i], axis=1) |
| nearest = np.argsort(dists)[1:6] |
| |
| |
| my_level = memory.metadata[i].abstraction_level if i in memory.metadata else 0 |
| neighbor_levels = [ |
| memory.metadata[ids[j]].abstraction_level |
| for j in nearest |
| ] |
| |
| |
| level_variance = np.var(neighbor_levels + [my_level]) |
| coherence_scores.append(1.0 / (1.0 + level_variance)) |
| |
| return float(np.mean(coherence_scores)) if coherence_scores else 0.0 |
| |
| def get_summary(self) -> Dict: |
| if not self.inference_times: |
| return {} |
| |
| recent_energies = [ |
| traj[-1] for traj in self.energy_trajectories[-50:] |
| if traj |
| ] |
| |
| return { |
| 'avg_inference_time_ms': float(np.mean(self.inference_times[-100:])), |
| 'avg_final_energy': float(np.mean(recent_energies)) if recent_energies else 0.0, |
| 'memory_size': self.memory_sizes[-1] if self.memory_sizes else 0, |
| 'n_associations': self.associations_counts[-1] if self.associations_counts else 0, |
| 'convergence_rate': float(np.mean(self.convergence_rates[-100:])), |
| 'energy_improvement_trend': float(np.mean(self.energy_improvement[-50:])) if self.energy_improvement else 0.0, |
| 'semantic_coherence': float(np.mean(self.semantic_coherence_scores[-50:])) if self.semantic_coherence_scores else 0.0, |
| } |
|
|
|
|
| class MLESystem: |
| """ |
| Système MLE complet intégrant tous les modules avec apprentissage organique. |
| |
| Usage: |
| mle = MLESystem() |
| result = mle.process(input_vector) |
| metrics = mle.get_metrics() |
| """ |
| |
| def __init__( |
| self, |
| memory_capacity: int = 10000, |
| k_neighbors: int = 10, |
| temperature: float = 0.5, |
| online_learning: bool = True, |
| enable_stack: bool = True, |
| enable_metrics: bool = True, |
| ): |
| self.k_neighbors = k_neighbors |
| self.enable_stack = enable_stack |
| self.enable_metrics = enable_metrics |
| |
| |
| self.memory = SparseAddressTable( |
| initial_capacity=memory_capacity, |
| max_capacity=memory_capacity * 5, |
| ) |
| self.router = HammingRouter( |
| use_index=True, |
| learn_routes=True, |
| ) |
| self.binder = CircularBinder() |
| self.energy = EnergyLandscape() |
| self.inference = InferenceEngine( |
| temperature=temperature, |
| online_learning=online_learning, |
| ) |
| |
| |
| self.stack = SemanticStack() if enable_stack else None |
| |
| |
| self.metrics = MLEMetrics() if enable_metrics else None |
| |
| |
| self.experience_buffer: List[Dict] = [] |
| self.experience_buffer_size = 1000 |
| |
| |
| self._initialize_base_vectors() |
| |
| logger.info(f"MLE System initialized with capacity {memory_capacity}") |
| |
| def _initialize_base_vectors(self, n_base: int = 10): |
| """Crée des vecteurs de base pour démarrer le système.""" |
| for i in range(n_base): |
| vec = self.memory._create_sparse_vector() |
| vid = self.memory.create_vector() |
| |
| for idx, meta in self.memory.metadata.items(): |
| if meta.id == vid: |
| self.router.add_vector(idx, vec) |
| break |
| |
| def process( |
| self, |
| input_vector: np.ndarray, |
| stack_level: int = 0, |
| external_callback: Optional[Callable] = None, |
| ) -> InferenceResult: |
| """ |
| Traite un vecteur d'entrée par inférence + apprentissage. |
| |
| Args: |
| input_vector: (4096,) uint8 |
| stack_level: niveau de la pile sémantique |
| external_callback: callback par itération |
| |
| Returns: |
| InferenceResult |
| """ |
| |
| self.memory.tick() |
| |
| |
| input_id, input_idx, created = self.memory.query_or_create(input_vector) |
| |
| if created and input_idx >= 0: |
| |
| self.router.add_vector(input_idx, input_vector) |
| |
| |
| if self.stack: |
| self.stack.push(input_id, level=stack_level) |
| |
| |
| result = self.inference.infer( |
| initial_state=input_vector, |
| memory_table=self.memory, |
| router=self.router, |
| energy_landscape=self.energy, |
| binder=self.binder, |
| k_neighbors=self.k_neighbors, |
| external_callback=external_callback, |
| ) |
| |
| |
| experience = { |
| 'input_id': input_id, |
| 'created': created, |
| 'final_state': result.final_state.copy() if result.final_state is not None else None, |
| 'energy_trajectory': result.energy_trajectory.copy(), |
| 'converged': result.converged, |
| 'learning_events': result.learning_events.copy(), |
| } |
| self.experience_buffer.append(experience) |
| if len(self.experience_buffer) > self.experience_buffer_size: |
| self.experience_buffer.pop(0) |
| |
| |
| if self.metrics: |
| self.metrics.record_inference(result, self.memory, self.energy) |
| |
| if self.inference.total_inferences % 50 == 0: |
| coherence = self.metrics.compute_coherence(self.memory) |
| self.metrics.semantic_coherence_scores.append(coherence) |
| |
| |
| if result.final_state is not None: |
| |
| final_id, final_idx, final_created = self.memory.query_or_create(result.final_state) |
| if final_created and final_idx >= 0: |
| self.router.add_vector(final_idx, result.final_state) |
| |
| |
| if not created and not final_created: |
| pair = tuple(sorted((input_id, final_id))) |
| current = self.energy.associations.get(pair, 0.0) |
| self.energy.associations[pair] = min(1.0, current + 0.05) |
| |
| return result |
| |
| def process_sequence( |
| self, |
| vectors: List[np.ndarray], |
| bind_levels: bool = False, |
| ) -> List[InferenceResult]: |
| """ |
| Traite une séquence de vecteurs. |
| |
| Args: |
| vectors: liste de (4096,) uint8 |
| bind_levels: si True, bind les niveaux de la pile périodiquement |
| |
| Returns: |
| Liste de InferenceResult |
| """ |
| results = [] |
| for i, vec in enumerate(vectors): |
| result = self.process(vec, stack_level=0) |
| results.append(result) |
| |
| |
| if bind_levels and self.stack and i > 0 and i % 3 == 0: |
| self.stack.bind_level(0, self.binder, self.memory) |
| |
| return results |
| |
| def query( |
| self, |
| query_vector: np.ndarray, |
| k: int = 5, |
| ) -> List[Tuple[int, float, int]]: |
| """ |
| Requête simple (sans inférence) pour retrouver les voisins. |
| |
| Returns: |
| [(vector_id, distance, index)] |
| """ |
| return self.memory.find_nearest(query_vector, k=k) |
| |
| def bind_vectors(self, ids: List[int]) -> Optional[np.ndarray]: |
| """ |
| Binding explicite de vecteurs par ID. |
| |
| Returns: |
| Vecteur composé ou None |
| """ |
| vectors = [] |
| for vid in ids: |
| for idx, meta in self.memory.metadata.items(): |
| if meta.id == vid and self.memory.active_mask[idx]: |
| vectors.append(self.memory.vectors[idx]) |
| break |
| |
| if len(vectors) < 2: |
| return None |
| |
| return self.binder.bind_multiple(vectors) |
| |
| def get_vector(self, vector_id: int) -> Optional[np.ndarray]: |
| """Retourne un vecteur par son ID.""" |
| for idx, meta in self.memory.metadata.items(): |
| if meta.id == vector_id and self.memory.active_mask[idx]: |
| return self.memory.vectors[idx].copy() |
| return None |
| |
| def get_semantic_clusters(self, n_clusters: int = 5) -> Dict[int, List[int]]: |
| """ |
| Retourne des clusters sémantiques basés sur la distance de Hamming. |
| """ |
| if self.memory.size < n_clusters * 2: |
| return {} |
| |
| active = self.memory.active_vectors |
| ids = [meta.id for idx, meta in self.memory.metadata.items() if self.memory.active_mask[idx]] |
| |
| |
| |
| seeds = np.random.choice(len(active), size=min(n_clusters, len(active)), replace=False) |
| clusters: Dict[int, List[int]] = {ids[s]: [] for s in seeds} |
| |
| |
| for i, vec in enumerate(active): |
| dists = [np.sum(vec != active[s]) for s in seeds] |
| nearest_seed = seeds[np.argmin(dists)] |
| clusters[ids[nearest_seed]].append(ids[i]) |
| |
| return clusters |
| |
| def get_metrics_summary(self) -> Dict: |
| """Résumé des métriques.""" |
| summary = {} |
| |
| if self.metrics: |
| summary['performance'] = self.metrics.get_summary() |
| |
| summary['memory'] = self.memory.get_stats() |
| summary['routing'] = self.router.get_stats() |
| summary['energy'] = self.energy.get_stats() |
| summary['inference'] = self.inference.get_stats() |
| |
| return summary |
| |
| def print_summary(self): |
| """Affiche un résumé lisible.""" |
| summary = self.get_metrics_summary() |
| print("\n" + "="*60) |
| print("MLE SYSTEM SUMMARY") |
| print("="*60) |
| |
| for section, data in summary.items(): |
| print(f"\n--- {section.upper()} ---") |
| if isinstance(data, dict): |
| for key, value in data.items(): |
| if isinstance(value, float): |
| print(f" {key}: {value:.4f}") |
| else: |
| print(f" {key}: {value}") |
| else: |
| print(f" {data}") |
| |
| print("\n" + "="*60) |
| |
| def save_state(self, filepath: str): |
| """Sauvegarde l'état du système.""" |
| state = { |
| 'memory_stats': self.memory.get_stats(), |
| 'energy_stats': self.energy.get_stats(), |
| 'inference_stats': self.inference.get_stats(), |
| 'router_stats': self.router.get_stats(), |
| } |
| with open(filepath, 'w') as f: |
| json.dump(state, f, indent=2) |
| |
| def reset_metrics(self): |
| """Réinitialise les métriques.""" |
| if self.metrics: |
| self.metrics = MLEMetrics() |
| self.inference.total_inferences = 0 |
| self.inference.total_iterations = 0 |
| self.inference.total_converged = 0 |