"""
MLE System - Intégration complète du Morpho-Logic Engine
Orchestre les modules :
- memory (SparseAddressTable)
- routing (HammingRouter)
- binding (CircularBinder)
- energy (EnergyLandscape)
- inference (InferenceEngine)
Ajoute :
- Pile sémantique pour traitement hiérarchique
- Méta-apprentissage sur la structure même du système
- Métriques et monitoring
- Stabilisation globale
"""
import numpy as np
from typing import List, Dict, Tuple, Optional, Callable, Any
import logging
import time
import json
from .memory import SparseAddressTable, VECTOR_SIZE
from .routing import HammingRouter
from .binding import CircularBinder
from .energy import EnergyLandscape
from .inference import InferenceEngine, InferenceResult
logger = logging.getLogger(__name__)
class SemanticStack:
"""
Pile sémantique pour traitement hiérarchique.
Permet de représenter des structures imbriquées :
- Niveau 0 : tokens/bruts
- Niveau 1 : chunks/groupes
- Niveau 2 : phrases/propositions
- Niveau 3+: concepts abstraits
"""
def __init__(self, max_depth: int = 4):
self.max_depth = max_depth
self.levels: List[List[int]] = [[] for _ in range(max_depth)]
self.level_bindings: Dict[int, Dict[Tuple[int, int], np.ndarray]] = {}
def push(self, vector_id: int, level: int = 0):
"""Ajoute un vecteur à un niveau."""
if 0 <= level < self.max_depth:
self.levels[level].append(vector_id)
def pop(self, level: int = 0) -> Optional[int]:
"""Retire le dernier vecteur d'un niveau."""
if 0 <= level < self.max_depth and self.levels[level]:
return self.levels[level].pop()
return None
def bind_level(self, level: int, binder: CircularBinder, memory: SparseAddressTable):
"""
Combine les vecteurs d'un niveau en un vecteur composite,
puis le pousse au niveau supérieur.
"""
if level >= self.max_depth - 1:
return None
ids = self.levels[level]
if len(ids) < 2:
return None
        # Fetch the stored vectors for the ids at this level
vectors = []
for vid in ids:
for idx, meta in memory.metadata.items():
if meta.id == vid and memory.active_mask[idx]:
vectors.append(memory.vectors[idx])
break
if len(vectors) < 2:
return None
        # Bind all vectors at this level into one composite
composite = binder.bind_multiple(vectors)
        # Record the composite for every pair of ids at this level
self.level_bindings[level] = {}
for i, vid in enumerate(ids):
for j, vid2 in enumerate(ids[i+1:], i+1):
self.level_bindings[level][(vid, vid2)] = composite
        # Create a new vector for the composite and push it one level up
new_id = memory.create_vector(context=composite, abstraction_level=level+1)
self.levels[level] = []
self.push(new_id, level=level+1)
return new_id
def get_level_state(self, level: int, memory: SparseAddressTable) -> np.ndarray:
"""Retourne l'état composite d'un niveau."""
if level >= self.max_depth:
return np.zeros(VECTOR_SIZE, dtype=np.uint8)
ids = self.levels[level]
if not ids:
return np.zeros(VECTOR_SIZE, dtype=np.uint8)
vectors = []
for vid in ids:
for idx, meta in memory.metadata.items():
if meta.id == vid and memory.active_mask[idx]:
vectors.append(memory.vectors[idx])
break
if not vectors:
return np.zeros(VECTOR_SIZE, dtype=np.uint8)
        # Bitwise majority vote: mean thresholded at 0.5
mean_vec = np.mean(vectors, axis=0)
return (mean_vec > 0.5).astype(np.uint8)
def clear(self):
"""Vide toute la pile."""
self.levels = [[] for _ in range(self.max_depth)]
self.level_bindings = {}
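# Usage sketch (illustrative, not part of the module API): building one level
# of hierarchy. Assumes a populated SparseAddressTable `memory` and a
# CircularBinder `binder` from the sibling modules; `t1`..`t3` are hypothetical
# ids of token vectors already stored in memory.
#
#     stack = SemanticStack(max_depth=4)
#     for vid in (t1, t2, t3):
#         stack.push(vid, level=0)
#     chunk_id = stack.bind_level(0, binder, memory)
#     # level 0 is now emptied; the composite id sits at level 1
#     assert chunk_id in stack.levels[1]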
class MLEMetrics:
"""Collecte et agrège les métriques de performance du système."""
def __init__(self):
self.inference_times: List[float] = []
self.energy_trajectories: List[List[float]] = []
self.memory_sizes: List[int] = []
self.associations_counts: List[int] = []
self.creation_rates: List[float] = []
self.convergence_rates: List[float] = []
        # Semantic coherence metrics
self.semantic_coherence_scores: List[float] = []
self.clustering_coefficients: List[float] = []
        # Improvement tracking
self.baseline_energy: Optional[float] = None
self.energy_improvement: List[float] = []
def record_inference(self, result: InferenceResult, memory: SparseAddressTable,
energy: EnergyLandscape):
self.inference_times.append(result.execution_time_ms)
self.energy_trajectories.append(result.energy_trajectory)
self.memory_sizes.append(memory.size)
self.associations_counts.append(len(energy.associations))
if result.energy_trajectory:
final_energy = result.energy_trajectory[-1]
if self.baseline_energy is None:
self.baseline_energy = final_energy
else:
improvement = (self.baseline_energy - final_energy) / max(abs(self.baseline_energy), 1.0)
self.energy_improvement.append(improvement)
self.convergence_rates.append(1.0 if result.converged else 0.0)
def compute_coherence(self, memory: SparseAddressTable) -> float:
"""
Calcule un score de cohérence sémantique :
les vecteurs proches en distance de Hamming doivent avoir des usages similaires.
"""
if memory.size < 10:
return 0.0
        active = memory.active_vectors
        # Abstraction levels aligned row-for-row with `active` (metadata is
        # keyed by storage index, not id, so keep a parallel list instead).
        levels = [meta.abstraction_level
                  for idx, meta in memory.metadata.items() if memory.active_mask[idx]]
if len(active) < 10:
return 0.0
        # Sample a subset of active vectors
n_sample = min(50, len(active))
sample_idx = np.random.choice(len(active), size=n_sample, replace=False)
coherence_scores = []
        for i in sample_idx:
            dists = np.sum(active != active[i], axis=1)
            nearest = np.argsort(dists)[1:6]  # 5 nearest neighbors (skip self)
            # Compare abstraction levels via the list aligned with `active`;
            # the original index-by-id lookup would miss, since metadata keys
            # are storage indices.
            my_level = levels[i]
            neighbor_levels = [levels[j] for j in nearest]
            # Coherence = low variance of levels within the neighborhood
            level_variance = np.var(neighbor_levels + [my_level])
            coherence_scores.append(1.0 / (1.0 + level_variance))
return float(np.mean(coherence_scores)) if coherence_scores else 0.0
def get_summary(self) -> Dict:
if not self.inference_times:
return {}
recent_energies = [
traj[-1] for traj in self.energy_trajectories[-50:]
if traj
]
return {
'avg_inference_time_ms': float(np.mean(self.inference_times[-100:])),
'avg_final_energy': float(np.mean(recent_energies)) if recent_energies else 0.0,
'memory_size': self.memory_sizes[-1] if self.memory_sizes else 0,
'n_associations': self.associations_counts[-1] if self.associations_counts else 0,
'convergence_rate': float(np.mean(self.convergence_rates[-100:])),
'energy_improvement_trend': float(np.mean(self.energy_improvement[-50:])) if self.energy_improvement else 0.0,
'semantic_coherence': float(np.mean(self.semantic_coherence_scores[-50:])) if self.semantic_coherence_scores else 0.0,
}
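# Reading the metrics (sketch): get_summary() aggregates over recent windows
# (last 100 inferences for timing/convergence, last 50 trajectories for
# energy). Assumes an MLESystem instance `mle` (defined below) created with
# enable_metrics=True, the default, that has already processed some inputs.
#
#     summary = mle.metrics.get_summary()
#     summary['convergence_rate']       # fraction of recent runs that converged
#     summary['avg_inference_time_ms']  # mean over the last 100 inferences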
class MLESystem:
"""
Système MLE complet intégrant tous les modules avec apprentissage organique.
Usage:
mle = MLESystem()
result = mle.process(input_vector)
metrics = mle.get_metrics()
"""
def __init__(
self,
memory_capacity: int = 10000,
k_neighbors: int = 10,
temperature: float = 0.5,
online_learning: bool = True,
enable_stack: bool = True,
enable_metrics: bool = True,
):
self.k_neighbors = k_neighbors
self.enable_stack = enable_stack
self.enable_metrics = enable_metrics
# Modules
self.memory = SparseAddressTable(
initial_capacity=memory_capacity,
max_capacity=memory_capacity * 5,
)
self.router = HammingRouter(
use_index=True,
learn_routes=True,
)
self.binder = CircularBinder()
self.energy = EnergyLandscape()
self.inference = InferenceEngine(
temperature=temperature,
online_learning=online_learning,
)
        # Semantic stack
self.stack = SemanticStack() if enable_stack else None
        # Metrics
self.metrics = MLEMetrics() if enable_metrics else None
        # Experience history
self.experience_buffer: List[Dict] = []
self.experience_buffer_size = 1000
        # Initialization: seed a few base vectors
self._initialize_base_vectors()
logger.info(f"MLE System initialized with capacity {memory_capacity}")
    def _initialize_base_vectors(self, n_base: int = 10):
        """Creates base vectors to bootstrap the system."""
        for _ in range(n_base):
            vid = self.memory.create_vector()
            # Find the storage index and register the *stored* vector with the
            # router; registering a separately generated vector (as before)
            # would desynchronize router and memory.
            for idx, meta in self.memory.metadata.items():
                if meta.id == vid:
                    self.router.add_vector(idx, self.memory.vectors[idx])
                    break
def process(
self,
input_vector: np.ndarray,
stack_level: int = 0,
external_callback: Optional[Callable] = None,
) -> InferenceResult:
"""
Traite un vecteur d'entrée par inférence + apprentissage.
Args:
input_vector: (4096,) uint8
stack_level: niveau de la pile sémantique
external_callback: callback par itération
Returns:
InferenceResult
"""
        # Memory maintenance
self.memory.tick()
        # Look up or create the input vector
input_id, input_idx, created = self.memory.query_or_create(input_vector)
if created and input_idx >= 0:
            # New vector: register it with the router
self.router.add_vector(input_idx, input_vector)
        # Push onto the semantic stack
if self.stack:
self.stack.push(input_id, level=stack_level)
        # Inference
result = self.inference.infer(
initial_state=input_vector,
memory_table=self.memory,
router=self.router,
energy_landscape=self.energy,
binder=self.binder,
k_neighbors=self.k_neighbors,
external_callback=external_callback,
)
        # Record the experience
experience = {
'input_id': input_id,
'created': created,
'final_state': result.final_state.copy() if result.final_state is not None else None,
'energy_trajectory': result.energy_trajectory.copy(),
'converged': result.converged,
'learning_events': result.learning_events.copy(),
}
self.experience_buffer.append(experience)
if len(self.experience_buffer) > self.experience_buffer_size:
self.experience_buffer.pop(0)
        # Metrics
if self.metrics:
self.metrics.record_inference(result, self.memory, self.energy)
            # Periodic coherence check
if self.inference.total_inferences % 50 == 0:
coherence = self.metrics.compute_coherence(self.memory)
self.metrics.semantic_coherence_scores.append(coherence)
        # Update the router for the final state
if result.final_state is not None:
            # Look up or create the final state
final_id, final_idx, final_created = self.memory.query_or_create(result.final_state)
if final_created and final_idx >= 0:
self.router.add_vector(final_idx, result.final_state)
            # Reinforce the input -> final association (only between
            # pre-existing vectors)
if not created and not final_created:
pair = tuple(sorted((input_id, final_id)))
current = self.energy.associations.get(pair, 0.0)
self.energy.associations[pair] = min(1.0, current + 0.05)
return result
def process_sequence(
self,
vectors: List[np.ndarray],
bind_levels: bool = False,
) -> List[InferenceResult]:
"""
Traite une séquence de vecteurs.
Args:
vectors: liste de (4096,) uint8
bind_levels: si True, bind les niveaux de la pile périodiquement
Returns:
Liste de InferenceResult
"""
results = []
for i, vec in enumerate(vectors):
result = self.process(vec, stack_level=0)
results.append(result)
            # Periodic binding of stack levels
if bind_levels and self.stack and i > 0 and i % 3 == 0:
self.stack.bind_level(0, self.binder, self.memory)
return results
def query(
self,
query_vector: np.ndarray,
k: int = 5,
) -> List[Tuple[int, float, int]]:
"""
Requête simple (sans inférence) pour retrouver les voisins.
Returns:
[(vector_id, distance, index)]
"""
return self.memory.find_nearest(query_vector, k=k)
def bind_vectors(self, ids: List[int]) -> Optional[np.ndarray]:
"""
Binding explicite de vecteurs par ID.
Returns:
Vecteur composé ou None
"""
vectors = []
for vid in ids:
for idx, meta in self.memory.metadata.items():
if meta.id == vid and self.memory.active_mask[idx]:
vectors.append(self.memory.vectors[idx])
break
if len(vectors) < 2:
return None
return self.binder.bind_multiple(vectors)
def get_vector(self, vector_id: int) -> Optional[np.ndarray]:
"""Retourne un vecteur par son ID."""
for idx, meta in self.memory.metadata.items():
if meta.id == vector_id and self.memory.active_mask[idx]:
return self.memory.vectors[idx].copy()
return None
def get_semantic_clusters(self, n_clusters: int = 5) -> Dict[int, List[int]]:
"""
Retourne des clusters sémantiques basés sur la distance de Hamming.
"""
if self.memory.size < n_clusters * 2:
return {}
active = self.memory.active_vectors
ids = [meta.id for idx, meta in self.memory.metadata.items() if self.memory.active_mask[idx]]
        # Simple distance-based clustering
        # 1. Pick random seeds
seeds = np.random.choice(len(active), size=min(n_clusters, len(active)), replace=False)
clusters: Dict[int, List[int]] = {ids[s]: [] for s in seeds}
        # 2. Assign each vector to its nearest seed
for i, vec in enumerate(active):
dists = [np.sum(vec != active[s]) for s in seeds]
nearest_seed = seeds[np.argmin(dists)]
clusters[ids[nearest_seed]].append(ids[i])
return clusters
def get_metrics_summary(self) -> Dict:
"""Résumé des métriques."""
summary = {}
if self.metrics:
summary['performance'] = self.metrics.get_summary()
summary['memory'] = self.memory.get_stats()
summary['routing'] = self.router.get_stats()
summary['energy'] = self.energy.get_stats()
summary['inference'] = self.inference.get_stats()
return summary
def print_summary(self):
"""Affiche un résumé lisible."""
summary = self.get_metrics_summary()
print("\n" + "="*60)
print("MLE SYSTEM SUMMARY")
print("="*60)
for section, data in summary.items():
print(f"\n--- {section.upper()} ---")
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, float):
print(f" {key}: {value:.4f}")
else:
print(f" {key}: {value}")
else:
print(f" {data}")
print("\n" + "="*60)
def save_state(self, filepath: str):
"""Sauvegarde l'état du système."""
state = {
'memory_stats': self.memory.get_stats(),
'energy_stats': self.energy.get_stats(),
'inference_stats': self.inference.get_stats(),
'router_stats': self.router.get_stats(),
}
with open(filepath, 'w') as f:
json.dump(state, f, indent=2)
def reset_metrics(self):
"""Réinitialise les métriques."""
if self.metrics:
self.metrics = MLEMetrics()
self.inference.total_inferences = 0
self.inference.total_iterations = 0
self.inference.total_converged = 0
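# Minimal end-to-end smoke test, kept as a hedged sketch: the ~5% bit density
# of the random inputs is an illustrative guess, not a value taken from the
# mle package. Because of the relative imports above, run this as a module:
#     python -m mle.mle_system
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    rng = np.random.default_rng(0)
    mle = MLESystem(memory_capacity=1000)
    # A short sequence of sparse binary vectors, processed with periodic
    # binding of stack level 0.
    inputs = [
        (rng.random(VECTOR_SIZE) < 0.05).astype(np.uint8)
        for _ in range(12)
    ]
    results = mle.process_sequence(inputs, bind_levels=True)
    converged = sum(1 for r in results if r.converged)
    print(f"{converged}/{len(results)} inferences converged")
    mle.print_summary()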