# Source: MLE-Morpho-Logic-Engine / mle/tests/test_full_system.py
# Uploaded by: Harry00
# Commit: "feat: complete MLE engine implementation"
# Revision: ebaf2ce (verified)
"""
MLE Comprehensive Test Suite
===============================
Tests covering:
1. SIMD operations correctness & performance
2. Memory storage & retrieval
3. LSH indexing quality
4. Routing latency & scalability
5. Binding operations (binary & HRR)
6. Energy convergence
7. Reasoning capabilities (association, analogy, composition)
8. End-to-end integration
"""
import numpy as np
import time
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from mle.utils.simd_ops import (
N_BITS, N_WORDS,
hamming_distance, hamming_batch, hamming_topk,
hamming_similarity, xor_vectors, popcount,
majority_vote, random_binary_vector, random_binary_vectors,
normalize_density, get_native_lib
)
from mle.memory.sparse_address_table import SparseAddressTable, HammingLSH
from mle.routing.recursive_jit_router import RecursiveJITRouter
from mle.binding.semantic_binding import HRRBinding, BinaryBinding, BindingEngine
from mle.energy.energy_model import EnergyFunction, RelaxationDynamics, HopfieldDynamics, EnergyModel
from mle.inference.reasoning_engine import ReasoningEngine
def header(title, width=70):
    """Print a section banner: a blank line, a rule, the title, then a rule.

    Args:
        title: Text printed between the two rules.
        width: Number of '=' characters in each rule. Defaults to 70, so
            existing single-argument calls print exactly what they did before.
    """
    # Build the rule once so both lines are guaranteed to match.
    rule = "=" * width
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
def check(condition, message):
    """Print a one-line pass/fail report and return the outcome.

    Args:
        condition: Any value; its truthiness decides pass or fail.
        message: Description printed after the status mark.

    Returns:
        bool: True when ``condition`` is truthy, False otherwise.
    """
    # Callers accumulate results with the bitwise ``all_pass &= check(...)``.
    # Returning the raw condition would let a truthy non-bool such as 2 clear
    # the flag (True & 2 == 0), so normalize to a real bool first.
    passed = bool(condition)
    status = "✓" if passed else "✗"
    print(f" [{status}] {message}")
    return passed
# ══════════════════════════════════════════════════════════════════════════════
# 1. SIMD OPERATIONS
# ══════════════════════════════════════════════════════════════════════════════
def test_simd_operations():
    """Check the bit-vector primitives from mle.utils.simd_ops and time them.

    Covers get_native_lib, hamming_distance, xor_vectors, popcount,
    hamming_batch, hamming_topk, majority_vote and normalize_density, then
    prints timings for hamming_batch / hamming_topk on corpora of 1K, 10K
    and 100K vectors (timings are informational only).

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("1. SIMD Operations")
    all_pass = True
    np.random.seed(42)

    # Check native lib
    has_native = get_native_lib() is not None
    all_pass &= check(has_native, f"Native SIMD library compiled: {has_native}")

    # Basic Hamming distance
    a = random_binary_vector()
    b = random_binary_vector()
    dist = hamming_distance(a, b)
    all_pass &= check(
        1800 < dist < 2200,
        f"Random vector Hamming distance ≈ N/2: {dist} (expected ~2048)"
    )

    # Self-distance = 0
    all_pass &= check(
        hamming_distance(a, a) == 0,
        "Self-distance = 0"
    )

    # XOR identity: dist(a, a⊕b) = popcount(a XOR (a XOR b)) = popcount(b)
    xor_ab = xor_vectors(a, b)
    d1 = hamming_distance(a, xor_ab)
    d2 = popcount(b)
    all_pass &= check(
        d1 == d2,
        f"XOR identity: dist(a, a⊕b) = popcount(b): {d1} == {d2}"
    )

    # Batch Hamming distance
    corpus = random_binary_vectors(1000)
    dists = hamming_batch(a, corpus)
    all_pass &= check(
        dists.shape == (1000,),
        f"Batch Hamming shape: {dists.shape}"
    )
    all_pass &= check(
        np.all(dists >= 0) and np.all(dists <= N_BITS),
        f"Batch Hamming range: [{dists.min()}, {dists.max()}]"
    )

    # Top-K
    indices, distances = hamming_topk(a, corpus, k=10)
    all_pass &= check(
        len(indices) == 10,
        f"Top-10 returned: {len(indices)}"
    )
    all_pass &= check(
        np.all(np.diff(distances) >= 0),
        f"Top-K sorted ascending: {distances[:5]}..."
    )

    # Verify top-K correctness against full sort. Distances (not indices) are
    # compared so that ties broken in a different order do not fail the check.
    full_sort_idx = np.argsort(dists)[:10]
    full_sort_dist = dists[full_sort_idx]
    topk_matches = np.array_equal(distances, full_sort_dist)
    all_pass &= check(
        topk_matches,
        f"Top-K matches full sort: {topk_matches}"
    )

    # Majority vote
    vecs = random_binary_vectors(5)
    mv = majority_vote(np.ascontiguousarray(vecs))
    all_pass &= check(
        mv.shape == (N_WORDS,) and mv.dtype == np.uint64,
        f"Majority vote shape/dtype: {mv.shape}, {mv.dtype}"
    )

    # Normalize density
    v = random_binary_vector()
    v_norm = normalize_density(v, 0.5)
    actual_density = popcount(v_norm) / N_BITS
    all_pass &= check(
        abs(actual_density - 0.5) < 0.01,
        f"Density normalization: {actual_density:.4f} (target 0.5)"
    )

    # ── Performance benchmark ──
    print()
    corpus_sizes = [1_000, 10_000, 100_000]
    for n in corpus_sizes:
        corpus = random_binary_vectors(n)
        query = random_binary_vector()

        # Batch Hamming: mean wall time per call in ms over 10 runs
        t0 = time.perf_counter()
        for _ in range(10):
            hamming_batch(query, corpus)
        elapsed = (time.perf_counter() - t0) / 10 * 1000
        throughput = n / elapsed * 1000  # vectors per second
        print(f" ⏱ Batch Hamming ({n:>7d} vecs): {elapsed:>7.2f} ms"
              f" ({throughput/1e6:.1f}M vecs/s)")

        # Top-500
        t0 = time.perf_counter()
        for _ in range(10):
            hamming_topk(query, corpus, k=500)
        elapsed = (time.perf_counter() - t0) / 10 * 1000
        print(f" ⏱ Top-500 ({n:>7d} vecs): {elapsed:>7.2f} ms")

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# 2. MEMORY & LSH
# ══════════════════════════════════════════════════════════════════════════════
def _flip_random_bits(vec, n_flips):
    """Return a packed copy of ``vec`` with ``n_flips`` distinct random bits inverted.

    Bit positions are drawn from range(N_BITS) with np.random.choice, so the
    result depends on NumPy's global RNG state. ``vec`` itself is not modified
    because np.unpackbits returns a new array.
    """
    bits = np.unpackbits(vec.view(np.uint8))
    flip_pos = np.random.choice(N_BITS, n_flips, replace=False)
    bits[flip_pos] ^= 1
    return np.packbits(bits).view(np.uint64).copy()


def test_memory_and_lsh():
    """Check SparseAddressTable storage, retrieval, activation and LSH recall.

    Stores 5000 random address/content pairs, then checks exact and LSH
    nearest-neighbour queries, a near-duplicate query, named concepts,
    activation/decay and stats(). A second table holding 100 clusters of
    near-duplicates is used to measure LSH recall.

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("2. Memory & LSH Indexing")
    all_pass = True
    np.random.seed(42)

    # Create memory
    mem = SparseAddressTable(capacity=10000, lsh_tables=16, lsh_projections=24)
    all_pass &= check(mem.size == 0, f"Empty memory: size={mem.size}")

    # Store concepts
    n_concepts = 5000
    addresses = random_binary_vectors(n_concepts)
    contents = random_binary_vectors(n_concepts)
    t0 = time.perf_counter()
    for i in range(n_concepts):
        mem.store(addresses[i], contents[i],
                  metadata={'name': f'concept_{i}', 'index': i})
    store_time = (time.perf_counter() - t0) * 1000
    all_pass &= check(
        mem.size == n_concepts,
        f"Stored {n_concepts} concepts in {store_time:.1f}ms"
    )

    # Exact search. An empty result is reported as a failed check rather than
    # aborting the whole group with an IndexError.
    query = addresses[42].copy()
    results = mem.query_nearest(query, k=5, use_lsh=False)
    all_pass &= check(
        bool(results) and results[0][0] == 42 and results[0][1] == 0,
        "Exact retrieval: found correct entry (dist=0)"
    )

    # LSH search
    results_lsh = mem.query_nearest(query, k=5, use_lsh=True)
    found_exact = any(idx == 42 for idx, _ in results_lsh)
    all_pass &= check(
        found_exact,
        "LSH retrieval: found exact match in top-5"
    )

    # Near-duplicate search: flip 50 random bits (~1.2% difference)
    near_modified = _flip_random_bits(addresses[42], 50)
    results_near = mem.query_nearest(near_modified, k=10, use_lsh=True)
    best_near = results_near[0][1] if results_near else None
    all_pass &= check(
        best_near is not None and best_near <= 100,
        f"Near-duplicate found: best distance = {best_near} (flipped 50 bits)"
    )

    # Named concept (the index returned by store_concept is not needed here)
    mem.store_concept("cat", metadata={'category': 'animal'})
    retrieved = mem.get_by_name("cat")
    all_pass &= check(
        retrieved is not None,
        "Named concept 'cat' stored and retrieved"
    )

    # Activation: of 0.9 / 0.5 / 0.3, two values exceed the 0.4 threshold
    mem.activate(np.array([0, 1, 2]), np.array([0.9, 0.5, 0.3]))
    active = mem.get_active(threshold=0.4)
    all_pass &= check(
        len(active) == 2,
        f"Activation: {len(active)} entries above threshold 0.4"
    )
    # NOTE(review): presumably decay is multiplicative, leaving only the 0.9
    # entry (-> 0.45) above 0.4 - confirm against decay_activations.
    mem.decay_activations(0.5)
    active_after = mem.get_active(threshold=0.4)
    all_pass &= check(
        len(active_after) == 1,
        f"After decay: {len(active_after)} entries above threshold 0.4"
    )

    # Stats: the 5000 stored vectors plus the named concept
    stats = mem.stats()
    all_pass &= check(
        stats['size'] == n_concepts + 1,
        f"Memory stats: {stats}"
    )

    # ── LSH Recall benchmark ──
    # Test with near-duplicates (meaningful LSH scenario): for each of 100 base
    # vectors, store the base plus 5 variants with 100 bits flipped each.
    print()
    n_clusters = 100
    variants_per_cluster = 5
    cluster_size = 1 + variants_per_cluster
    top_k = 10
    mem2 = SparseAddressTable(capacity=2000, lsh_tables=32, lsh_projections=8)
    base_vecs = random_binary_vectors(n_clusters)
    cluster_map = {}  # idx -> cluster_id
    # Assumes store() assigns consecutive indices starting at 0 - TODO confirm.
    next_idx = 0
    for cid in range(n_clusters):
        mem2.store(base_vecs[cid], base_vecs[cid])
        cluster_map[next_idx] = cid
        next_idx += 1
        for _ in range(variants_per_cluster):
            variant = _flip_random_bits(base_vecs[cid], 100)
            mem2.store(variant, variant)
            cluster_map[next_idx] = cid
            next_idx += 1

    # For each base vector, check if LSH finds its cluster members
    recall_tests = n_clusters
    total_recall = 0
    for cid in range(recall_tests):
        query = base_vecs[cid]
        lsh_results = mem2.query_nearest(query, k=top_k, use_lsh=True)
        # Count how many results are from the same cluster
        same_cluster = sum(
            1 for idx, _ in lsh_results if cluster_map.get(idx) == cid
        )
        # Each cluster has 6 members, so at most min(6, top_k) can be found
        total_recall += same_cluster / min(cluster_size, top_k)
    avg_recall = total_recall / recall_tests
    all_pass &= check(
        avg_recall > 0.3,
        f"LSH Cluster Recall@10: {avg_recall:.2%} (near-duplicates, 100 clusters)"
    )

    # Also verify that exact self-lookup always works via LSH
    exact_recall = 0
    for cid in range(recall_tests):
        query = base_vecs[cid]
        lsh_results = mem2.query_nearest(query, k=1, use_lsh=True)
        if lsh_results and lsh_results[0][1] == 0:
            exact_recall += 1
    all_pass &= check(
        exact_recall == recall_tests,
        f"LSH Exact self-lookup: {exact_recall}/{recall_tests}"
    )

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# 3. ROUTING
# ══════════════════════════════════════════════════════════════════════════════
def test_routing():
    """Check RecursiveJITRouter on a 10K-entry memory and print scaling timings.

    Verifies that routing an address already in memory finds it at distance
    0, that a random query returns a full, sorted beam, that beam history is
    recorded, and that route_and_activate / multi_hop_route behave as
    expected. A benchmark over 1K, 10K and 50K entries then prints latencies
    (informational only).

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("3. Recursive JIT Routing")
    all_pass = True
    np.random.seed(42)

    # Build memory with 10K entries
    mem = SparseAddressTable(capacity=20000)
    n = 10000
    addresses = random_binary_vectors(n)
    contents = random_binary_vectors(n)
    for i in range(n):
        mem.store(addresses[i], contents[i], metadata={'name': f'v_{i}'})

    router = RecursiveJITRouter(
        memory=mem,
        beam_width=500,
        max_depth=3,
        expansion_factor=5,
    )

    # Basic routing
    query = addresses[100].copy()
    result = router.route(query)
    has_results = len(result.indices) > 0
    all_pass &= check(
        has_results,
        f"Routing returned {len(result.indices)} results"
    )
    # Only look at distances[0] when something came back, so an empty result
    # is a failed check instead of an IndexError.
    all_pass &= check(
        has_results and result.distances[0] == 0,
        "Exact match found at distance 0"
    )
    all_pass &= check(
        result.latency_ms < 1000,
        f"Routing latency: {result.latency_ms:.1f}ms (target < 1000ms)"
    )

    # Random query routing
    random_q = random_binary_vector()
    result_rnd = router.route(random_q)
    all_pass &= check(
        len(result_rnd.indices) == 500,
        f"Beam width respected: {len(result_rnd.indices)} (target 500)"
    )
    all_pass &= check(
        np.all(np.diff(result_rnd.distances) >= 0),
        "Results sorted by distance"
    )

    # Beam history: only its presence is asserted; the per-depth means are
    # printed for manual inspection.
    all_pass &= check(
        len(result_rnd.beam_history) > 0,
        f"Beam history recorded: {len(result_rnd.beam_history)} depths, "
        f"means={[f'{m:.0f}' for m in result_rnd.beam_history]}"
    )

    # Route and activate (called for its effect on mem; result not needed)
    router.route_and_activate(random_q)
    active = mem.get_active(threshold=0.001)
    all_pass &= check(
        len(active) > 0,
        f"Route-and-activate: {len(active)} entries activated"
    )

    # Multi-hop routing
    results_multi = router.multi_hop_route(random_q, hops=2)
    all_pass &= check(
        len(results_multi) == 2,
        f"Multi-hop routing: {len(results_multi)} hops completed"
    )

    # ── Scalability benchmark ──
    print()
    for n_test in [1_000, 10_000, 50_000]:
        mem_test = SparseAddressTable(capacity=n_test + 1000)
        addrs = random_binary_vectors(n_test)
        conts = random_binary_vectors(n_test)
        for i in range(n_test):
            mem_test.store(addrs[i], conts[i])
        r_test = RecursiveJITRouter(mem_test, beam_width=500, max_depth=3)

        latencies = []
        explored = None
        for _ in range(10):
            q = random_binary_vector()
            res = r_test.route(q)
            latencies.append(res.latency_ms)
            # Reported value is from the last of the 10 queries.
            explored = res.candidates_explored
        avg_lat = np.mean(latencies)
        p99_lat = np.percentile(latencies, 99)
        print(f" ⏱ Routing ({n_test:>6d} entries): "
              f"avg={avg_lat:.1f}ms, p99={p99_lat:.1f}ms, "
              f"explored={explored}")

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# 4. BINDING OPERATIONS
# ══════════════════════════════════════════════════════════════════════════════
def test_binding():
    """Check BinaryBinding, HRRBinding and BindingEngine, then time bind().

    Binary (XOR) binding is checked for exact unbind recovery,
    quasi-orthogonality of the bound vector, bundling, permutation round-trip
    and triple encode/decode. HRR binding is checked for approximate recovery
    and quasi-orthogonality. BindingEngine is checked for self-similarity
    and the similarity of two independently registered concepts.

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("4. Binding Operations")
    all_pass = True
    np.random.seed(42)

    # ── Binary binding (BSC) ──
    print(" --- Binary Binding (BSC/XOR) ---")
    a = random_binary_vector()
    b = random_binary_vector()

    # Bind + unbind = identity
    bound = BinaryBinding.bind(a, b)
    recovered = BinaryBinding.unbind(bound, b)
    all_pass &= check(
        hamming_distance(a, recovered) == 0,
        "XOR bind+unbind = exact recovery"
    )

    # Bound is quasi-orthogonal to inputs
    sim_ab = hamming_similarity(bound, a)
    sim_bb = hamming_similarity(bound, b)
    all_pass &= check(
        abs(sim_ab - 0.5) < 0.05 and abs(sim_bb - 0.5) < 0.05,
        f"Bound quasi-orthogonal to inputs: sim(C,A)={sim_ab:.3f}, sim(C,B)={sim_bb:.3f}"
    )

    # Bundle (majority vote)
    c = random_binary_vector()
    bundled = BinaryBinding.bundle(a, b, c)
    sim_a = hamming_similarity(bundled, a)
    sim_b = hamming_similarity(bundled, b)
    sim_c = hamming_similarity(bundled, c)
    all_pass &= check(
        sim_a > 0.55 and sim_b > 0.55 and sim_c > 0.55,
        f"Bundle preserves similarity: {sim_a:.3f}, {sim_b:.3f}, {sim_c:.3f}"
    )

    # Permutation
    perm_a = BinaryBinding.permute(a, 1)
    inv_perm_a = BinaryBinding.inverse_permute(perm_a, 1)
    all_pass &= check(
        hamming_distance(a, inv_perm_a) == 0,
        "Permutation + inverse = identity"
    )
    sim_perm = hamming_similarity(a, perm_a)
    all_pass &= check(
        sim_perm < 0.55,
        f"Permuted is dissimilar: sim={sim_perm:.3f}"
    )

    # Triple encoding
    s, r, o = random_binary_vector(), random_binary_vector(), random_binary_vector()
    triple = BinaryBinding.encode_triple(s, r, o)
    # Decode object: unbind(unbind(triple, s), r)
    decoded_o = BinaryBinding.unbind(BinaryBinding.unbind(triple, s), r)
    all_pass &= check(
        hamming_distance(o, decoded_o) == 0,
        "Triple encode/decode: exact recovery of object"
    )

    # ── HRR binding (circular convolution) ──
    print(" --- HRR Binding (Circular Convolution) ---")
    dim = 4096
    ha = HRRBinding.random_vector(dim)
    hb = HRRBinding.random_vector(dim)

    # Bind + unbind ≈ identity (approximate for HRR)
    hbound = HRRBinding.bind(ha, hb)
    hrecovered = HRRBinding.unbind(hbound, hb)
    hrr_sim = HRRBinding.similarity(ha, hrecovered)
    all_pass &= check(
        hrr_sim > 0.3,
        f"HRR bind+unbind similarity: {hrr_sim:.3f} (should be >> 0, indicating recovery)"
    )

    # Bound is quasi-orthogonal
    hrr_orth = HRRBinding.similarity(hbound, ha)
    all_pass &= check(
        abs(hrr_orth) < 0.2,
        f"HRR bound quasi-orthogonal: sim={hrr_orth:.3f}"
    )

    # Bundle preserves components
    hc = HRRBinding.random_vector(dim)
    hbundled = HRRBinding.bundle(ha, hb, hc)
    bundle_sim = HRRBinding.similarity(hbundled, ha)
    all_pass &= check(
        bundle_sim > 0.2,
        f"HRR bundle preserves components: sim={bundle_sim:.3f}"
    )

    # ── Binding Engine ──
    print(" --- Binding Engine ---")
    engine = BindingEngine(use_binary=True)
    engine.register_concept("king")
    engine.register_concept("queen")
    engine.register_concept("man")
    engine.register_concept("woman")
    sim_kk = engine.similarity(engine.get_concept("king"), engine.get_concept("king"))
    sim_kq = engine.similarity(engine.get_concept("king"), engine.get_concept("queen"))
    all_pass &= check(
        sim_kk == 1.0,
        f"Self-similarity = 1.0: {sim_kk}"
    )
    all_pass &= check(
        abs(sim_kq - 0.5) < 0.05,
        f"Random concept similarity ≈ 0.5: {sim_kq:.3f}"
    )

    # ── Performance ── (elapsed is in ms, hence the *1000 for ops/s)
    print()
    n_ops = 10000
    t0 = time.perf_counter()
    for _ in range(n_ops):
        BinaryBinding.bind(a, b)
    elapsed = (time.perf_counter() - t0) * 1000
    print(f" ⏱ Binary bind: {n_ops} ops in {elapsed:.1f}ms "
          f"({n_ops/elapsed*1000:.0f} ops/s)")

    t0 = time.perf_counter()
    for _ in range(n_ops):
        HRRBinding.bind(ha, hb)
    elapsed = (time.perf_counter() - t0) * 1000
    print(f" ⏱ HRR bind: {n_ops} ops in {elapsed:.1f}ms "
          f"({n_ops/elapsed*1000:.0f} ops/s)")

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# 5. ENERGY & CONVERGENCE
# ══════════════════════════════════════════════════════════════════════════════
def test_energy_convergence():
    """Check that the energy model's relaxation procedures do not raise energy.

    Uses one random state and 20 random context vectors with Dirichlet-sampled
    activations to exercise EnergyFunction.total_energy,
    RelaxationDynamics.relax, HopfieldDynamics.relax and EnergyModel.minimize
    (mode='hybrid').

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("5. Energy Model & Convergence")
    all_pass = True
    np.random.seed(42)

    # Create some context vectors; a Dirichlet sample gives non-negative
    # activation weights that sum to 1.
    n_context = 20
    context = random_binary_vectors(n_context)
    activations = np.random.dirichlet(np.ones(n_context))

    # ── Energy function ──
    efn = EnergyFunction(alpha=1.0, beta=0.5, gamma=0.1, delta=0.05)
    # Random state should have moderate energy
    state = random_binary_vector()
    e = efn.total_energy(state, context, activations)
    all_pass &= check(
        'total' in e and 'compatibility' in e,
        f"Energy components computed: {list(e.keys())}"
    )
    all_pass &= check(
        isinstance(e['total'], float),
        f"Total energy: {e['total']:.4f}"
    )

    # ── Binary relaxation ──
    print(" --- Binary Relaxation ---")
    dynamics = RelaxationDynamics(
        efn, max_iterations=30, n_candidates=16, flip_fraction=0.05
    )
    result = dynamics.relax(state, context, activations)
    initial_e = result['trajectory'][0]['total']
    final_e = result['final_energy']
    all_pass &= check(
        final_e <= initial_e + 0.01,  # allow tiny float imprecision
        f"Energy decreased: {initial_e:.4f} → {final_e:.4f} "
        f"(Δ = {initial_e - final_e:.4f})"
    )
    all_pass &= check(
        result['iterations'] > 0,
        f"Iterations: {result['iterations']}"
    )

    # Check trajectory is generally decreasing: count steps that do not go up
    # by more than 0.001, as a fraction of all steps.
    traj_energies = [t['total'] for t in result['trajectory']]
    decreasing_steps = sum(
        1 for i in range(1, len(traj_energies))
        if traj_energies[i] <= traj_energies[i - 1] + 0.001
    )
    # max(..., 1) avoids dividing by zero for a single-point trajectory.
    pct_decreasing = decreasing_steps / max(len(traj_energies) - 1, 1)
    all_pass &= check(
        pct_decreasing > 0.5,
        f"Trajectory mostly decreasing: {pct_decreasing:.0%}"
    )

    # ── Hopfield relaxation ──
    print(" --- Hopfield Dynamics ---")
    hopfield = HopfieldDynamics(beta=8.0, max_iterations=20)
    h_result = hopfield.relax(state, context, activations)
    h_traj = h_result['energy_trajectory']
    all_pass &= check(
        len(h_traj) > 1,
        f"Hopfield trajectory: {len(h_traj)} steps"
    )
    all_pass &= check(
        h_traj[-1] <= h_traj[0] + 0.01,
        f"Hopfield energy decreased: {h_traj[0]:.4f} → {h_traj[-1]:.4f}"
    )

    # Attention should be concentrated, i.e. its maximum exceeds the uniform
    # weight 1/n_context. Skipped when no attention weights are returned.
    att = h_result.get('attention_weights')
    if att is not None:
        max_att = att.max()
        all_pass &= check(
            max_att > 1.0 / n_context,
            f"Hopfield attention concentrated: max={max_att:.4f} (uniform={1/n_context:.4f})"
        )

    # ── Hybrid model ──
    print(" --- Hybrid Energy Model ---")
    model = EnergyModel(mode='hybrid')
    hybrid_result = model.minimize(state, context, activations)
    all_pass &= check(
        'final_state' in hybrid_result,
        "Hybrid model produced final state"
    )
    all_pass &= check(
        hybrid_result['converged'] or hybrid_result['total_iterations'] > 0,
        f"Hybrid: {hybrid_result['total_iterations']} total iterations, "
        f"converged={hybrid_result['converged']}"
    )

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# 6. REASONING CAPABILITIES
# ══════════════════════════════════════════════════════════════════════════════
def test_reasoning():
    """Check ReasoningEngine association, querying, analogy and composition.

    Builds a knowledge base of 32 concepts and 22 relations, then exercises
    associate(), reason() (plain and structured queries), solve_analogy() and
    compose(). Most checks assert only that a result was produced; the
    retrieved concepts are printed for manual inspection.

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("6. Reasoning Capabilities")
    all_pass = True
    np.random.seed(42)
    engine = ReasoningEngine(
        beam_width=200,
        max_routing_depth=2,
        max_reasoning_steps=3,
        energy_mode='hybrid',
        relaxation_iterations=20,
    )

    # ── Build knowledge base ──
    print(" --- Building Knowledge Base ---")
    concepts = [
        "cat", "dog", "animal", "pet",
        "fish", "water", "ocean", "river",
        "bird", "sky", "wing", "fly",
        "car", "road", "wheel", "engine",
        "tree", "leaf", "forest", "green",
        "sun", "moon", "star", "night",
        "king", "queen", "prince", "princess",
        "man", "woman", "child", "person",
    ]
    for c in concepts:
        engine.add_concept(c)

    relations = [
        ("cat", "is_a", "animal"),
        ("dog", "is_a", "animal"),
        ("cat", "is_a", "pet"),
        ("dog", "is_a", "pet"),
        ("fish", "lives_in", "water"),
        ("fish", "is_a", "animal"),
        ("bird", "has", "wing"),
        ("bird", "can", "fly"),
        ("bird", "is_a", "animal"),
        ("car", "has", "wheel"),
        ("car", "on", "road"),
        ("tree", "has", "leaf"),
        ("tree", "in", "forest"),
        ("leaf", "is", "green"),
        ("king", "is_a", "man"),
        ("queen", "is_a", "woman"),
        ("prince", "is_a", "man"),
        ("princess", "is_a", "woman"),
        ("king", "married_to", "queen"),
        ("sun", "in", "sky"),
        ("moon", "in", "sky"),
        ("star", "in", "sky"),
    ]
    for s, r, o in relations:
        engine.add_relation(s, r, o)

    stats = engine.stats()
    all_pass &= check(
        stats['codebook_size'] >= len(concepts),
        f"Knowledge base: {stats['codebook_size']} concepts, "
        f"{stats['memory']['size']} memory entries"
    )

    # ── Test 1: Association ──
    print(" --- Association ---")
    assoc_cat = engine.associate("cat", top_k=10)
    all_pass &= check(
        len(assoc_cat) > 0,
        f"Association for 'cat': {len(assoc_cat)} results"
    )
    if assoc_cat:
        print(f" Top associations: {assoc_cat[:5]}")

    # ── Test 2: Concept Query ──
    print(" --- Concept Query ---")
    result = engine.reason("cat", max_steps=2)
    response = result['response']
    all_pass &= check(
        response is not None,
        f"Reasoning on 'cat': {result['num_steps']} steps, "
        f"{result['latency_ms']:.1f}ms"
    )
    # Only subscript the response once it is known to exist; otherwise the
    # failed check above would be followed by a TypeError.
    if response is not None and response['nearest_concepts']:
        top_concept = response['nearest_concepts'][0]
        print(f" Nearest concept: {top_concept[0]} (sim={top_concept[1]:.3f})")

    # ── Test 3: Energy convergence during reasoning ──
    print(" --- Energy Convergence ---")
    energies = [s.energy for s in result['reasoning_chain'] if s.energy != float('inf')]
    # With fewer than two finite energies there is nothing to compare.
    if len(energies) >= 2:
        all_pass &= check(
            energies[-1] <= energies[0] + 0.01,
            f"Energy decreased during reasoning: {energies[0]:.4f} → {energies[-1]:.4f}"
        )
        print(f" Energy trajectory: {[f'{e:.4f}' for e in energies]}")

    # ── Test 4: Analogy ──
    # The message is built only after the None test. Formatting
    # analogy_result['latency_ms'] unconditionally would raise on None before
    # check() ever ran, making the check itself pointless.
    print(" --- Analogy ---")
    analogy_result = engine.solve_analogy("king", "man", "queen")
    analogy_ok = analogy_result is not None
    if analogy_ok:
        analogy_msg = (
            f"Analogy 'king:man :: queen:?': completed in "
            f"{analogy_result['latency_ms']:.1f}ms"
        )
    else:
        analogy_msg = "Analogy 'king:man :: queen:?': no result returned"
    all_pass &= check(analogy_ok, analogy_msg)
    if analogy_ok and analogy_result['codebook_ranking']:
        top_answer = analogy_result['codebook_ranking'][0]
        print(f" Top answer: {top_answer[0]} (sim={top_answer[1]:.3f})")
        top_5 = [(n, f"{s:.3f}") for n, s in analogy_result['codebook_ranking'][:5]]
        print(f" Top-5: {top_5}")

    # ── Test 5: Composition ──
    print(" --- Composition ---")
    comp_result = engine.compose("water", "animal")
    comp_ok = comp_result is not None
    if comp_ok:
        comp_msg = f"Composition 'water + animal': {comp_result['latency_ms']:.1f}ms"
    else:
        comp_msg = "Composition 'water + animal': no result returned"
    all_pass &= check(comp_ok, comp_msg)
    if comp_ok and comp_result['response']['nearest_concepts']:
        top = comp_result['response']['nearest_concepts'][:5]
        print(f" Nearest to 'water+animal': {[(n, f'{s:.3f}') for n, s in top]}")

    # ── Test 6: Structured query ──
    print(" --- Structured Query ---")
    struct_result = engine.reason(
        {"subject": "bird", "relation": "can"},
        max_steps=2,
        roles=["subject", "relation"]
    )
    struct_ok = struct_result is not None
    if struct_ok:
        struct_msg = f"Structured query completed: {struct_result['latency_ms']:.1f}ms"
    else:
        struct_msg = "Structured query: no result returned"
    all_pass &= check(struct_ok, struct_msg)
    if struct_ok and struct_result['response'].get('role_fillers'):
        for role, fillers in struct_result['response']['role_fillers'].items():
            print(f" Role '{role}': {fillers[:3]}")

    # ── Test 7: Multi-step reasoning convergence ──
    print(" --- Multi-step Convergence ---")
    deep_result = engine.reason("forest", max_steps=5)
    chain = deep_result['reasoning_chain']
    all_pass &= check(
        len(chain) > 0,
        f"Multi-step reasoning: {len(chain)} steps, {deep_result['latency_ms']:.1f}ms"
    )
    step_energies = [s.energy for s in chain if s.energy != float('inf')]
    if step_energies:
        print(f" Step energies: {[f'{e:.4f}' for e in step_energies]}")

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# 7. END-TO-END INTEGRATION
# ══════════════════════════════════════════════════════════════════════════════
def test_integration():
    """Run the full reasoning pipeline on a 505-concept knowledge base.

    Registers 5 category concepts plus 500 generated concepts (each linked to
    a category with an "is_a" relation), runs one 3-step query, benchmarks 50
    random 2-step queries, and checks the per-entry memory footprint reported
    by engine.memory.stats().

    Returns:
        Truthy when every check passed, falsy otherwise.
    """
    header("7. End-to-End Integration")
    all_pass = True
    np.random.seed(42)

    # Build a larger knowledge base
    engine = ReasoningEngine(
        beam_width=500,
        max_routing_depth=3,
        max_reasoning_steps=3,
        energy_mode='hybrid',
    )

    # Create 500 generated concepts, assigned round-robin to 5 categories
    n_base = 500
    categories = ["animal", "plant", "vehicle", "tool", "place"]
    for cat in categories:
        engine.add_concept(cat)
    for i in range(n_base):
        name = f"concept_{i}"
        engine.add_concept(name)
        cat = categories[i % len(categories)]
        engine.add_relation(name, "is_a", cat)

    stats = engine.stats()
    print(f" Knowledge base: {stats}")

    # Test full pipeline
    t0 = time.perf_counter()
    result = engine.reason("concept_42", max_steps=3)
    total_ms = (time.perf_counter() - t0) * 1000
    all_pass &= check(
        result['response'] is not None,
        f"Full pipeline completed in {total_ms:.1f}ms"
    )

    # Test batch queries
    print()
    print(" --- Batch Query Benchmark ---")
    latencies = []
    for _ in range(50):
        query = f"concept_{np.random.randint(n_base)}"
        t0 = time.perf_counter()
        engine.reason(query, max_steps=2)
        latencies.append((time.perf_counter() - t0) * 1000)
    avg_lat = np.mean(latencies)
    p50_lat = np.percentile(latencies, 50)
    p99_lat = np.percentile(latencies, 99)
    # Informational only: check(True, ...) always passes and is used here just
    # to print the latency summary in the same format as the real checks.
    all_pass &= check(
        True,
        f"50 queries: avg={avg_lat:.1f}ms, p50={p50_lat:.1f}ms, p99={p99_lat:.1f}ms"
    )

    # Memory efficiency
    mem_stats = engine.memory.stats()
    print(f" Memory usage: {mem_stats['memory_mb']:.2f} MB "
          f"for {mem_stats['size']} entries")
    # max(..., 1) guards against dividing by zero on an empty memory.
    bytes_per_entry = mem_stats['memory_mb'] * 1024 * 1024 / max(mem_stats['size'], 1)
    all_pass &= check(
        bytes_per_entry < 2048,  # Should be ~1024 bytes (512 addr + 512 content)
        f"Memory efficiency: {bytes_per_entry:.0f} bytes/entry "
        f"(expected ~1024, limit < 2048)"
    )

    return all_pass
# ══════════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════════
def main():
    """Run every test group, print a summary, and return a process exit code.

    Each group runs inside a try/except so that one crashing group is
    reported, with its traceback, and counted as failed without stopping the
    remaining groups.

    Returns:
        int: 0 when all groups passed, 1 otherwise.
    """
    import traceback

    bar = "█" * 70
    print("\n" + bar)
    print(" MLE — Morpho-Logic Engine — Comprehensive Test Suite")
    print(bar)

    tests = [
        ("SIMD Operations", test_simd_operations),
        ("Memory & LSH", test_memory_and_lsh),
        ("Routing", test_routing),
        ("Binding", test_binding),
        ("Energy Convergence", test_energy_convergence),
        ("Reasoning", test_reasoning),
        ("Integration", test_integration),
    ]

    results = {}
    for name, test_fn in tests:
        try:
            results[name] = test_fn()
        except Exception as exc:
            # Top-level boundary: report the failure and keep going.
            print(f"\n ✗✗✗ {name} FAILED with exception: {exc}")
            traceback.print_exc()
            results[name] = False

    # Summary
    header("TEST SUMMARY")
    total = len(results)
    passed = sum(1 for outcome in results.values() if outcome)
    for name, outcome in results.items():
        status = "PASS ✓" if outcome else "FAIL ✗"
        print(f" [{status}] {name}")
    print(f"\n Total: {passed}/{total} test groups passed")
    print(bar)
    return 0 if passed == total else 1
if __name__ == '__main__':
    # Use sys.exit rather than the bare exit() helper: exit() is injected by
    # the site module for interactive use and is absent under `python -S`.
    sys.exit(main())