# morpho-logic-engine / run_benchmark.py
# (Upload-page residue preserved as comments: uploaded by Harry00,
#  "Upload run_benchmark.py", commit aafc205 verified.)
"""Benchmark rapide et efficace du système MLE."""
import sys
sys.path.insert(0, '.')
import numpy as np
import json
import time
from mle.mle_system import MLESystem
from mle.memory import VECTOR_SIZE
np.random.seed(42)
def generate_related_vectors(n: int, base_sparsity: float = 0.05, relatedness: float = 0.7):
    """Create ``n`` sparse binary vectors that share a common set of active bits.

    Each vector has roughly ``VECTOR_SIZE * base_sparsity`` active bits.
    A fraction ``relatedness`` of those bits is drawn once and shared by
    every vector; the remainder (at least one bit) is sampled
    independently per vector.

    Returns a list of ``np.uint8`` arrays of length ``VECTOR_SIZE``.
    """
    active_total = int(VECTOR_SIZE * base_sparsity)
    shared_count = int(active_total * relatedness)
    # Always keep at least one vector-specific bit, even at relatedness=1.0.
    unique_count = max(1, active_total - shared_count)
    common = np.random.choice(VECTOR_SIZE, size=shared_count, replace=False)
    out = []
    for _ in range(n):
        v = np.zeros(VECTOR_SIZE, dtype=np.uint8)
        v[common] = 1
        candidates = np.setdiff1d(np.arange(VECTOR_SIZE), common)
        if len(candidates) >= unique_count:
            picked = np.random.choice(candidates, size=unique_count, replace=False)
            v[picked] = 1
        out.append(v)
    return out
def generate_unrelated_vectors(n: int, base_sparsity: float = 0.05):
    """Create ``n`` independent sparse binary vectors.

    Each vector has ``int(VECTOR_SIZE * base_sparsity)`` active bits
    chosen uniformly at random, with no overlap enforced between vectors.
    Returns a list of ``np.uint8`` arrays of length ``VECTOR_SIZE``.
    """
    active_count = int(VECTOR_SIZE * base_sparsity)
    result = []
    for _ in range(n):
        on = np.random.choice(VECTOR_SIZE, size=active_count, replace=False)
        v = np.zeros(VECTOR_SIZE, dtype=np.uint8)
        v[on] = 1
        result.append(v)
    return result
def generate_query_from_base(base: np.ndarray, noise: float = 0.1):
    """Return a noisy copy of ``base``.

    Roughly a ``noise`` fraction of the active bits (at least one) is
    turned off, and the same number of currently-inactive bits is turned
    on, so the total number of active bits is approximately preserved.
    ``base`` itself is not modified.
    """
    noisy = base.copy()
    on_idx = np.where(noisy)[0]
    flips = max(1, int(len(on_idx) * noise))
    if flips > 0 and len(on_idx) > 0:
        turn_off = np.random.choice(on_idx, size=min(flips, len(on_idx)), replace=False)
        noisy[turn_off] = 0
        off_idx = np.where(noisy == 0)[0]
        if len(off_idx) > 0:
            turn_on = np.random.choice(off_idx, size=min(flips, len(off_idx)), replace=False)
            noisy[turn_on] = 1
    return noisy
def benchmark_learning(mle: MLESystem, n_concepts: int = 5, n_batches: int = 3):
    """Benchmark learning and generalization — fast version.

    Builds ``n_concepts`` synthetic concepts (a fully-shared prototype
    plus partially-overlapping variants), trains the system batch by
    batch on half of the material and probes it on the held-out half,
    recording the final relaxation energy after each batch.

    Returns a list of per-batch dicts with train/test average energies,
    memory size and association count.
    """
    print("\n" + "="*70)
    print("BENCHMARK: Learning Curve & Generalization")
    print("="*70)
    # Each concept: one prototype (relatedness=1.0) + 4 related variants.
    concepts = []
    for _ in range(n_concepts):
        base = generate_related_vectors(1, relatedness=1.0)[0]
        variants = generate_related_vectors(4, relatedness=0.7)
        concepts.append((base, variants))
    train_data = []
    test_data = []
    for base, variants in concepts:
        # First two variants train, last two test.
        train_data.extend(variants[:2])
        test_data.extend(variants[2:])
        # Noisy prototypes: mild noise for training, stronger for testing.
        for _ in range(2):
            train_data.append(generate_query_from_base(base, noise=0.15))
        for _ in range(2):
            test_data.append(generate_query_from_base(base, noise=0.30))
    np.random.shuffle(train_data)
    np.random.shuffle(test_data)
    results = []
    batch_size = max(1, len(train_data) // n_batches)
    for batch_idx in range(n_batches):
        start = batch_idx * batch_size
        end = min(start + batch_size, len(train_data))
        batch = train_data[start:end]
        print(f"\n--- Batch {batch_idx + 1}/{n_batches} ({len(batch)} vectors) ---")
        energies = []
        # Fixed: the enumerate index was unused in the original loop.
        for vec in batch:
            result = mle.process(vec)
            if result.energy_trajectory:
                energies.append(result.energy_trajectory[-1])
        avg_train = np.mean(energies) if energies else 0
        print(f" Train energy: {avg_train:.0f} (n={len(energies)})")
        # Quick probe on a fixed subset of the held-out data.
        test_energies = []
        for vec in test_data[:5]:
            result = mle.process(vec)
            if result.energy_trajectory:
                test_energies.append(result.energy_trajectory[-1])
        avg_test = np.mean(test_energies) if test_energies else 0
        print(f" Test energy: {avg_test:.0f} (n={len(test_energies)})")
        print(f" Memory size: {mle.memory.size}")
        results.append({
            'batch': batch_idx + 1,
            'train_avg_energy': float(avg_train),
            'test_avg_energy': float(avg_test),
            'memory_size': mle.memory.size,
            'n_associations': len(mle.energy.associations),
        })
    return results
def benchmark_stability(mle: MLESystem, n_iterations: int = 50):
    """Stability test — fast version.

    Cycles noisy queries derived from 5 fixed base vectors through the
    system and tracks the final relaxation energy, printing a running
    average every 10 iterations and an early-vs-late comparison at the
    end.

    Returns a dict with the mean energy of the first and last 10
    collected energies (0 when 10 or fewer were collected).
    """
    print("\n" + "="*70)
    print("BENCHMARK: Stability Test")
    print("="*70)
    base_vectors = generate_unrelated_vectors(5)
    energies = []
    for i in range(n_iterations):
        base = base_vectors[i % len(base_vectors)]
        vec = generate_query_from_base(base, noise=0.20)
        result = mle.process(vec)
        if result.energy_trajectory:
            energies.append(result.energy_trajectory[-1])
        if i % 10 == 0:
            recent = np.mean(energies[-10:]) if len(energies) >= 10 else (np.mean(energies) if energies else 0)
            print(f" [{i:3d}] energy={recent:.0f} memory={mle.memory.size}")
    # Fixed: early/late were previously computed twice with different
    # thresholds (>20 for the printout, >10 for the return value).
    # Compute them once; print/return thresholds are kept as before.
    early = float(np.mean(energies[:10])) if len(energies) > 10 else 0
    late = float(np.mean(energies[-10:])) if len(energies) > 10 else 0
    if len(energies) > 20:
        print(f"\n Early energy: {early:.0f}")
        print(f" Late energy: {late:.0f}")
        if late < early * 0.9:
            print(" ✓ Energy DECREASED with experience")
        elif late < early * 1.1:
            print(" ✓ Energy STABLE")
        else:
            print(" ⚠ Energy INCREASED")
    return {'early_energy': early,
            'late_energy': late}
def benchmark_binding(mle: MLESystem, n_trials: int = 10):
    """Binding/unbinding round-trip test — fast version.

    For each trial, binds a role vector with a filler vector, unbinds
    with the same role, and counts the trial as a success when more
    than 60% of the recovered bits match the original filler.

    Returns ``{'binding_accuracy': successes / n_trials}``.
    """
    print("\n" + "="*70)
    print("BENCHMARK: Binding & Composition")
    print("="*70)
    roles = generate_unrelated_vectors(3)
    fillers = generate_unrelated_vectors(3)
    successes = 0
    for trial in range(n_trials):
        role = roles[trial % 3]
        filler = fillers[(trial + 1) % 3]
        bound = mle.binder.bind_role_filler(role, filler)
        recovered = mle.binder.unbind_role_filler(bound, role)
        # Fraction of positions where the recovered vector agrees bitwise.
        if np.mean(recovered == filler) > 0.6:
            successes += 1
    accuracy = successes / n_trials
    print(f" Binding accuracy: {successes}/{n_trials} ({accuracy:.1%})")
    return {'binding_accuracy': accuracy}
def main():
    """Run every benchmark, dump the results to ``benchmark_results.json``,
    and return the combined results dict."""
    banner = "="*70
    print(banner)
    print("MLE SYSTEM COMPREHENSIVE BENCHMARK")
    print(banner)
    mle = MLESystem(
        memory_capacity=2000,
        online_learning=True,
        temperature=0.5,
    )
    # Dict literal evaluates left-to-right, so the benchmarks run in the
    # same order as before: learning, stability, binding.
    all_results = {
        'learning_curve': benchmark_learning(mle),
        'stability': benchmark_stability(mle),
        'binding': benchmark_binding(mle),
    }
    print("\n" + banner)
    print("FINAL SUMMARY")
    print(banner)
    mle.print_summary()
    # default=float coerces numpy scalars into JSON-serializable numbers.
    with open("benchmark_results.json", "w") as fh:
        json.dump(all_results, fh, indent=2, default=float)
    print("\n✓ Benchmark complete!")
    return all_results
if __name__ == "__main__":
results = main()