"""Fast, lightweight benchmark of the MLE system.

Runs three short experiments against a single ``MLESystem`` instance
(learning curve, stability under repeated noisy queries, role/filler
binding) and writes the collected metrics to ``benchmark_results.json``.
"""
import json
import sys
import time

# Make the project package importable when the script is run from the
# repository root; this must happen before the ``mle`` imports below.
sys.path.insert(0, '.')

import numpy as np

from mle.mle_system import MLESystem
from mle.memory import VECTOR_SIZE

# Fixed seed so that successive benchmark runs draw the same vectors.
np.random.seed(42)


def generate_related_vectors(n: int, base_sparsity: float = 0.05, relatedness: float = 0.7) -> list:
    """Return ``n`` binary vectors that share a common set of active bits.

    Args:
        n: Number of vectors to generate.
        base_sparsity: Target fraction of ``VECTOR_SIZE`` that is active.
        relatedness: Fraction of the active bits shared by every vector.

    Returns:
        A list of ``n`` ``uint8`` arrays of length ``VECTOR_SIZE``. Each has
        the shared bits set plus its own randomly drawn unique bits. At least
        one unique bit is always requested, so with ``relatedness=1.0`` a
        vector carries one bit more than the target count.
    """
    target_active = int(VECTOR_SIZE * base_sparsity)
    n_shared = int(target_active * relatedness)
    n_unique = max(1, target_active - n_shared)

    shared_indices = np.random.choice(VECTOR_SIZE, size=n_shared, replace=False)
    # The pool of non-shared positions does not depend on the loop variable,
    # so it is computed once instead of once per vector.
    remaining = np.setdiff1d(np.arange(VECTOR_SIZE), shared_indices)

    vectors = []
    for _ in range(n):
        vec = np.zeros(VECTOR_SIZE, dtype=np.uint8)
        vec[shared_indices] = 1
        if len(remaining) >= n_unique:
            unique_indices = np.random.choice(remaining, size=n_unique, replace=False)
            vec[unique_indices] = 1
        vectors.append(vec)
    return vectors


def generate_unrelated_vectors(n: int, base_sparsity: float = 0.05) -> list:
    """Return ``n`` independently drawn sparse binary vectors.

    Args:
        n: Number of vectors to generate.
        base_sparsity: Fraction of ``VECTOR_SIZE`` that is active in each
            vector.

    Returns:
        A list of ``n`` ``uint8`` arrays of length ``VECTOR_SIZE``.
    """
    target_active = int(VECTOR_SIZE * base_sparsity)
    vectors = []
    for _ in range(n):
        indices = np.random.choice(VECTOR_SIZE, size=target_active, replace=False)
        vec = np.zeros(VECTOR_SIZE, dtype=np.uint8)
        vec[indices] = 1
        vectors.append(vec)
    return vectors


def generate_query_from_base(base: np.ndarray, noise: float = 0.1) -> np.ndarray:
    """Return a noisy copy of ``base`` with some bits moved.

    Up to ``max(1, int(n_active * noise))`` active bits are switched off and
    the same number of bits that were inactive in ``base`` are switched on.

    Args:
        base: Binary vector to perturb; it is not modified.
        noise: Fraction of the active bits to move.

    Returns:
        A new array of the same shape and dtype as ``base``.
    """
    vec = base.copy()
    active = np.where(vec)[0]
    # Snapshot the inactive positions BEFORE switching anything off, so a bit
    # that was just disabled cannot be re-enabled (which would cancel part of
    # the requested noise).
    inactive = np.where(vec == 0)[0]
    n_flip = max(1, int(len(active) * noise))

    if len(active) > 0:
        to_off = np.random.choice(active, size=min(n_flip, len(active)), replace=False)
        vec[to_off] = 0

    if len(inactive) > 0:
        to_on = np.random.choice(inactive, size=min(n_flip, len(inactive)), replace=False)
        vec[to_on] = 1

    return vec


def _final_energies(mle: MLESystem, vectors) -> list:
    """Process each vector and collect the last value of its energy trajectory.

    Vectors whose result has an empty ``energy_trajectory`` are skipped.
    """
    energies = []
    for vec in vectors:
        result = mle.process(vec)
        if result.energy_trajectory:
            energies.append(result.energy_trajectory[-1])
    return energies


def benchmark_learning(mle: MLESystem, n_concepts: int = 5, n_batches: int = 3) -> list:
    """Benchmark learning and generalization (fast version).

    Builds a train and a test set from ``n_concepts`` synthetic concepts,
    feeds the training set to ``mle`` in ``n_batches`` batches and, after
    each batch, processes the first five test vectors.

    Args:
        mle: System under test; it is mutated by the processing calls.
        n_concepts: Number of synthetic concepts to generate.
        n_batches: Number of training batches.

    Returns:
        One dict per batch with the keys ``batch``, ``train_avg_energy``,
        ``test_avg_energy``, ``memory_size`` and ``n_associations``.
    """
    print("\n" + "="*70)
    print("BENCHMARK: Learning Curve & Generalization")
    print("="*70)

    # NOTE(review): each call to generate_related_vectors draws a fresh set of
    # shared indices, so ``variants`` are related to each other but are not
    # derived from ``base`` - confirm this is the intended notion of "concept".
    concepts = []
    for _ in range(n_concepts):
        base = generate_related_vectors(1, relatedness=1.0)[0]
        variants = generate_related_vectors(4, relatedness=0.7)
        concepts.append((base, variants))

    train_data = []
    test_data = []
    for base, variants in concepts:
        train_data.extend(variants[:2])
        test_data.extend(variants[2:])
        # Lightly perturbed copies of the base for training, more heavily
        # perturbed ones for testing.
        train_data.extend(generate_query_from_base(base, noise=0.15) for _ in range(2))
        test_data.extend(generate_query_from_base(base, noise=0.30) for _ in range(2))

    np.random.shuffle(train_data)
    np.random.shuffle(test_data)

    results = []
    n_train = len(train_data)
    # Clamp the divisor so that n_batches <= 0 yields no batches instead of
    # raising ZeroDivisionError.
    batch_size = max(1, n_train // max(1, n_batches))

    for batch_idx in range(n_batches):
        start = batch_idx * batch_size
        if batch_idx == n_batches - 1:
            # The last batch absorbs the remainder so that no training vector
            # is silently dropped when the split is uneven.
            end = n_train
        else:
            end = min(start + batch_size, n_train)
        batch = train_data[start:end]

        print(f"\n--- Batch {batch_idx + 1}/{n_batches} ({len(batch)} vectors) ---")

        energies = _final_energies(mle, batch)
        avg_train = np.mean(energies) if energies else 0
        print(f" Train energy: {avg_train:.0f} (n={len(energies)})")

        # Quick test on a small, fixed prefix of the test set.
        test_energies = _final_energies(mle, test_data[:5])
        avg_test = np.mean(test_energies) if test_energies else 0
        print(f" Test energy: {avg_test:.0f} (n={len(test_energies)})")
        print(f" Memory size: {mle.memory.size}")

        results.append({
            'batch': batch_idx + 1,
            'train_avg_energy': float(avg_train),
            'test_avg_energy': float(avg_test),
            'memory_size': mle.memory.size,
            'n_associations': len(mle.energy.associations),
        })

    return results


def benchmark_stability(mle: MLESystem, n_iterations: int = 50) -> dict:
    """Stability test (fast version).

    Repeatedly processes noisy queries cycling over five unrelated base
    vectors and compares the mean final energy of the first ten recorded
    iterations with that of the last ten.

    Args:
        mle: System under test; it is mutated by the processing calls.
        n_iterations: Number of queries to process.

    Returns:
        A dict with ``early_energy`` and ``late_energy``; both are ``0`` when
        ten or fewer energies were recorded.
    """
    print("\n" + "="*70)
    print("BENCHMARK: Stability Test")
    print("="*70)

    base_vectors = generate_unrelated_vectors(5)
    energies = []

    for i in range(n_iterations):
        base = base_vectors[i % len(base_vectors)]
        vec = generate_query_from_base(base, noise=0.20)
        energies.extend(_final_energies(mle, [vec]))

        if i % 10 == 0:
            # Slicing with [-10:] already falls back to the whole list when
            # fewer than ten energies have been recorded.
            recent = np.mean(energies[-10:]) if energies else 0
            print(f" [{i:3d}] energy={recent:.0f} memory={mle.memory.size}")

    has_summary = len(energies) > 10
    early = float(np.mean(energies[:10])) if has_summary else 0
    late = float(np.mean(energies[-10:])) if has_summary else 0

    # The verdict is only printed when the two ten-sample windows cannot
    # overlap.
    if len(energies) > 20:
        print(f"\n Early energy: {early:.0f}")
        print(f" Late energy: {late:.0f}")

        # Use a +/-10% band around ``early`` based on its magnitude, so the
        # classification is also correct when energies are negative.
        delta = late - early
        tolerance = 0.1 * abs(early)
        if delta < -tolerance:
            print(" ✓ Energy DECREASED with experience")
        elif delta < tolerance:
            print(" ✓ Energy STABLE")
        else:
            print(" ⚠ Energy INCREASED")

    return {'early_energy': early, 'late_energy': late}


def benchmark_binding(mle: MLESystem, n_trials: int = 10) -> dict:
    """Binding/unbinding test (fast version).

    Binds a role vector to a filler vector, unbinds with the same role and
    counts the trial as a success when more than 60% of the positions of the
    recovered vector equal the original filler.

    Args:
        mle: System whose ``binder`` is exercised.
        n_trials: Number of bind/unbind round trips.

    Returns:
        A dict with the key ``binding_accuracy`` (``0.0`` when ``n_trials``
        is not positive).
    """
    print("\n" + "="*70)
    print("BENCHMARK: Binding & Composition")
    print("="*70)

    roles = generate_unrelated_vectors(3)
    fillers = generate_unrelated_vectors(3)

    successes = 0
    for trial in range(n_trials):
        role = roles[trial % len(roles)]
        filler = fillers[(trial + 1) % len(fillers)]

        bound = mle.binder.bind_role_filler(role, filler)
        recovered = mle.binder.unbind_role_filler(bound, role)

        # NOTE(review): element-wise agreement also counts matching zeros.
        # The fillers generated here are 5% active, so an all-zero result of
        # the same shape would already agree on ~95% of positions - consider
        # an overlap measure restricted to active bits.
        similarity = np.mean(recovered == filler)
        if similarity > 0.6:
            successes += 1

    # Avoid ZeroDivisionError when no trial was requested.
    accuracy = successes / n_trials if n_trials > 0 else 0.0
    print(f" Binding accuracy: {successes}/{n_trials} ({accuracy:.1%})")

    return {'binding_accuracy': accuracy}


def main() -> dict:
    """Run every benchmark on one shared system and save the results.

    Returns:
        A dict with the keys ``learning_curve``, ``stability`` and
        ``binding``; the same data is written to ``benchmark_results.json``.
    """
    print("="*70)
    print("MLE SYSTEM COMPREHENSIVE BENCHMARK")
    print("="*70)

    mle = MLESystem(
        memory_capacity=2000,
        online_learning=True,
        temperature=0.5,
    )

    # The same instance is reused on purpose: each benchmark runs on the
    # state left behind by the previous one.
    learning_results = benchmark_learning(mle)
    stability_results = benchmark_stability(mle)
    binding_results = benchmark_binding(mle)

    print("\n" + "="*70)
    print("FINAL SUMMARY")
    print("="*70)
    mle.print_summary()

    all_results = {
        'learning_curve': learning_results,
        'stability': stability_results,
        'binding': binding_results,
    }

    # ``default=float`` converts values the json module cannot encode
    # natively (e.g. NumPy scalars) to plain floats.
    with open("benchmark_results.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, default=float)

    print("\n✓ Benchmark complete!")
    return all_results


if __name__ == "__main__":
    results = main()