| """ |
| MLE Comprehensive Test Suite |
| =============================== |
| Tests covering: |
| 1. SIMD operations correctness & performance |
| 2. Memory storage & retrieval |
| 3. LSH indexing quality |
| 4. Routing latency & scalability |
| 5. Binding operations (binary & HRR) |
| 6. Energy convergence |
| 7. Reasoning capabilities (association, analogy, composition) |
| 8. End-to-end integration |
| """ |
|
|
| import numpy as np |
| import time |
| import sys |
| import os |
|
|
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) |
|
|
| from mle.utils.simd_ops import ( |
| N_BITS, N_WORDS, |
| hamming_distance, hamming_batch, hamming_topk, |
| hamming_similarity, xor_vectors, popcount, |
| majority_vote, random_binary_vector, random_binary_vectors, |
| normalize_density, get_native_lib |
| ) |
| from mle.memory.sparse_address_table import SparseAddressTable, HammingLSH |
| from mle.routing.recursive_jit_router import RecursiveJITRouter |
| from mle.binding.semantic_binding import HRRBinding, BinaryBinding, BindingEngine |
| from mle.energy.energy_model import EnergyFunction, RelaxationDynamics, HopfieldDynamics, EnergyModel |
| from mle.inference.reasoning_engine import ReasoningEngine |
|
|
|
|
def header(title, width=70):
    """Print a section banner: a blank line, a rule, the title, a rule.

    Args:
        title: Text shown between the two rules.
        width: Number of ``=`` characters in each rule. Defaults to 70,
            the width every existing caller relies on.
    """
    rule = "=" * width
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
|
|
|
|
def check(condition, message):
    """Print a one-line pass/fail report and hand the condition back.

    Args:
        condition: Truthy when the checked property holds.
        message: Human-readable description printed after the marker.

    Returns:
        ``condition`` itself (not coerced), so callers can fold it into a
        running flag with ``all_pass &= check(...)``.
    """
    # The two markers must differ, otherwise the report cannot tell a
    # passing check from a failing one.
    status = "✓" if condition else "✗"
    print(f" [{status}] {message}")
    return condition
|
|
|
|
| |
| |
| |
|
|
def test_simd_operations():
    """Exercise the bit-vector primitives imported from ``mle.utils.simd_ops``.

    Covers Hamming distance, XOR, batch distance, top-k selection, majority
    vote and density normalization, then prints timings for batch distance
    and top-500 selection on corpora of 1k, 10k and 100k vectors.

    Returns:
        Truthy when every ``check`` in this group passed.
    """
    header("1. SIMD Operations")
    all_pass = True
    # Fixed seed. Statement order below is kept stable in case the vector
    # helpers draw from NumPy's global RNG (not visible from this file).
    np.random.seed(42)

    # Native backend availability.
    native_ok = get_native_lib() is not None
    all_pass &= check(native_ok, f"Native SIMD library compiled: {native_ok}")

    # Two independent random vectors should differ in about half their bits.
    a = random_binary_vector()
    b = random_binary_vector()
    dist = hamming_distance(a, b)
    all_pass &= check(
        1800 < dist < 2200,
        f"Random vector Hamming distance ≈ N/2: {dist} (expected ~2048)"
    )

    # A vector is at distance zero from itself.
    all_pass &= check(
        hamming_distance(a, a) == 0,
        "Self-distance = 0"
    )

    # a XOR (a XOR b) == b, so the distance from a to a⊕b is popcount(b).
    xor_ab = xor_vectors(a, b)
    d1 = hamming_distance(a, xor_ab)
    d2 = popcount(b)
    all_pass &= check(
        d1 == d2,
        f"XOR identity: dist(a, a⊕b) = popcount(b): {d1} == {d2}"
    )

    # Batch distance: one result per corpus row, each within [0, N_BITS].
    corpus = random_binary_vectors(1000)
    dists = hamming_batch(a, corpus)
    all_pass &= check(
        dists.shape == (1000,),
        f"Batch Hamming shape: {dists.shape}"
    )
    all_pass &= check(
        np.all(dists >= 0) and np.all(dists <= N_BITS),
        f"Batch Hamming range: [{dists.min()}, {dists.max()}]"
    )

    # Top-k: right count and ascending order.
    indices, distances = hamming_topk(a, corpus, k=10)
    all_pass &= check(
        len(indices) == 10,
        f"Top-10 returned: {len(indices)}"
    )
    all_pass &= check(
        np.all(np.diff(distances) >= 0),
        f"Top-K sorted ascending: {distances[:5]}..."
    )

    # Top-k must agree with a full sort. Distances are compared rather than
    # indices so that ties cannot cause a spurious mismatch.
    full_sort_idx = np.argsort(dists)[:10]
    full_sort_dist = dists[full_sort_idx]
    topk_matches = np.array_equal(distances, full_sort_dist)
    all_pass &= check(
        topk_matches,
        f"Top-K matches full sort: {topk_matches}"
    )

    # Majority vote returns a single packed vector.
    vecs = random_binary_vectors(5)
    mv = majority_vote(np.ascontiguousarray(vecs))
    all_pass &= check(
        mv.shape == (N_WORDS,) and mv.dtype == np.uint64,
        f"Majority vote shape/dtype: {mv.shape}, {mv.dtype}"
    )

    # Density normalization should land within 1% of the requested density.
    v = random_binary_vector()
    v_norm = normalize_density(v, 0.5)
    actual_density = popcount(v_norm) / N_BITS
    all_pass &= check(
        abs(actual_density - 0.5) < 0.01,
        f"Density normalization: {actual_density:.4f} (target 0.5)"
    )

    # Timings (informational only; they do not affect the result).
    print()
    repeats = 10
    corpus_sizes = [1_000, 10_000, 100_000]
    for n in corpus_sizes:
        corpus = random_binary_vectors(n)
        query = random_binary_vector()

        t0 = time.perf_counter()
        for _ in range(repeats):
            hamming_batch(query, corpus)
        elapsed = (time.perf_counter() - t0) / repeats * 1000  # ms per call
        throughput = n / elapsed * 1000  # vectors per second
        print(f" ⏱ Batch Hamming ({n:>7d} vecs): {elapsed:>7.2f} ms"
              f" ({throughput/1e6:.1f}M vecs/s)")

        t0 = time.perf_counter()
        for _ in range(repeats):
            hamming_topk(query, corpus, k=500)
        elapsed = (time.perf_counter() - t0) / repeats * 1000  # ms per call
        print(f" ⏱ Top-500 ({n:>7d} vecs): {elapsed:>7.2f} ms")

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def test_memory_and_lsh():
    """Exercise ``SparseAddressTable`` storage, lookup, activation and LSH recall.

    Stores 5000 random address/content pairs, verifies exact and LSH
    lookups, near-duplicate retrieval, named concepts, activation and decay,
    and the ``stats()`` size. It then builds a second table of 100 clusters
    (one base vector plus five 100-bit-flip variants each) to measure LSH
    recall.

    Returns:
        Truthy when every ``check`` in this group passed.
    """
    header("2. Memory & LSH Indexing")
    all_pass = True
    np.random.seed(42)

    # A fresh table starts empty.
    mem = SparseAddressTable(capacity=10000, lsh_tables=16, lsh_projections=24)
    all_pass &= check(mem.size == 0, f"Empty memory: size={mem.size}")

    # Bulk store.
    n_concepts = 5000
    addresses = random_binary_vectors(n_concepts)
    contents = random_binary_vectors(n_concepts)

    t0 = time.perf_counter()
    for i in range(n_concepts):
        mem.store(addresses[i], contents[i],
                  metadata={'name': f'concept_{i}', 'index': i})
    store_time = (time.perf_counter() - t0) * 1000

    all_pass &= check(
        mem.size == n_concepts,
        f"Stored {n_concepts} concepts in {store_time:.1f}ms"
    )

    # Exact lookup without LSH. The length guard turns an empty result into
    # a failed check instead of an IndexError that would abort the group.
    query = addresses[42].copy()
    results = mem.query_nearest(query, k=5, use_lsh=False)
    all_pass &= check(
        len(results) > 0 and results[0][0] == 42 and results[0][1] == 0,
        "Exact retrieval: found correct entry (dist=0)"
    )

    # The same lookup through the LSH index.
    results_lsh = mem.query_nearest(query, k=5, use_lsh=True)
    found_exact = any(idx == 42 for idx, _ in results_lsh)
    all_pass &= check(
        found_exact,
        "LSH retrieval: found exact match in top-5"
    )

    # Near-duplicate: flip 50 distinct bits of a stored address.
    near = addresses[42].copy()
    bits = np.unpackbits(near.view(np.uint8))
    flip_pos = np.random.choice(N_BITS, 50, replace=False)
    bits[flip_pos] ^= 1
    near_modified = np.packbits(bits).view(np.uint64).copy()

    results_near = mem.query_nearest(near_modified, k=10, use_lsh=True)
    best_near = results_near[0][1] if len(results_near) > 0 else None
    all_pass &= check(
        best_near is not None and best_near <= 100,
        f"Near-duplicate found: best distance = {best_near} (flipped 50 bits)"
    )

    # Named concept round trip (the returned index is not needed here).
    mem.store_concept("cat", metadata={'category': 'animal'})
    retrieved = mem.get_by_name("cat")
    all_pass &= check(
        retrieved is not None,
        "Named concept 'cat' stored and retrieved"
    )

    # Activation: two of the three levels (0.9, 0.5) exceed the threshold.
    mem.activate(np.array([0, 1, 2]), np.array([0.9, 0.5, 0.3]))
    active = mem.get_active(threshold=0.4)
    all_pass &= check(
        len(active) == 2,
        f"Activation: {len(active)} entries above threshold 0.4"
    )

    # After decay_activations(0.5) exactly one entry is expected to remain
    # above the same threshold.
    mem.decay_activations(0.5)
    active_after = mem.get_active(threshold=0.4)
    all_pass &= check(
        len(active_after) == 1,
        f"After decay: {len(active_after)} entries above threshold 0.4"
    )

    # Size reported by stats(): the bulk entries plus the named concept.
    stats = mem.stats()
    all_pass &= check(
        stats['size'] == n_concepts + 1,
        f"Memory stats: {stats}"
    )

    # Clustered data set for the recall measurement.
    print()
    n_clusters = 100
    variants_per_cluster = 5
    cluster_size = variants_per_cluster + 1  # base vector + its variants
    top_k = 10

    mem2 = SparseAddressTable(capacity=2000, lsh_tables=32, lsh_projections=8)
    base_vecs = random_binary_vectors(n_clusters)
    # Maps insertion order to cluster id; assumes query_nearest reports
    # entries by insertion index, as the exact-retrieval check above does.
    cluster_map = {}
    next_idx = 0
    for cid in range(n_clusters):
        mem2.store(base_vecs[cid], base_vecs[cid])
        cluster_map[next_idx] = cid
        next_idx += 1
        for _ in range(variants_per_cluster):
            bits = np.unpackbits(base_vecs[cid].view(np.uint8)).copy()
            flips = np.random.choice(N_BITS, 100, replace=False)
            bits[flips] ^= 1
            variant = np.packbits(bits).view(np.uint64).copy()
            mem2.store(variant, variant)
            cluster_map[next_idx] = cid
            next_idx += 1

    # Recall@k: fraction of a cluster's members found in the top-k results.
    recall_tests = 100
    total_recall = 0
    for cid in range(recall_tests):
        lsh_results = mem2.query_nearest(base_vecs[cid], k=top_k, use_lsh=True)
        same_cluster = sum(
            1 for idx, _ in lsh_results if cluster_map.get(idx) == cid
        )
        total_recall += same_cluster / min(cluster_size, top_k)
    avg_recall = total_recall / recall_tests
    all_pass &= check(
        avg_recall > 0.3,
        f"LSH Cluster Recall@10: {avg_recall:.2%} (near-duplicates, 100 clusters)"
    )

    # Each base vector must find itself (distance 0) as the top-1 hit.
    exact_recall = 0
    for cid in range(recall_tests):
        lsh_results = mem2.query_nearest(base_vecs[cid], k=1, use_lsh=True)
        if lsh_results and lsh_results[0][1] == 0:
            exact_recall += 1
    all_pass &= check(
        exact_recall == recall_tests,
        f"LSH Exact self-lookup: {exact_recall}/{recall_tests}"
    )

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def test_routing():
    """Exercise ``RecursiveJITRouter`` over a 10,000-entry memory.

    Verifies exact-match routing, beam width, result ordering, beam history,
    route-and-activate and multi-hop routing. It then prints average and p99
    latency for memories of 1k, 10k and 50k entries.

    Returns:
        Truthy when every ``check`` in this group passed.
    """
    header("3. Recursive JIT Routing")
    all_pass = True
    np.random.seed(42)

    # Populate the memory the router searches.
    mem = SparseAddressTable(capacity=20000)
    n = 10000
    addresses = random_binary_vectors(n)
    contents = random_binary_vectors(n)
    for i in range(n):
        mem.store(addresses[i], contents[i], metadata={'name': f'v_{i}'})

    router = RecursiveJITRouter(
        memory=mem,
        beam_width=500,
        max_depth=3,
        expansion_factor=5,
    )

    # Routing a stored address must return it at distance 0.
    query = addresses[100].copy()
    result = router.route(query)
    all_pass &= check(
        len(result.indices) > 0,
        f"Routing returned {len(result.indices)} results"
    )
    # Guarded so an empty result fails the check instead of raising.
    all_pass &= check(
        len(result.distances) > 0 and result.distances[0] == 0,
        "Exact match found at distance 0"
    )
    all_pass &= check(
        result.latency_ms < 1000,
        f"Routing latency: {result.latency_ms:.1f}ms (target < 1000ms)"
    )

    # A random query should fill the beam, sorted by distance.
    random_q = random_binary_vector()
    result_rnd = router.route(random_q)
    all_pass &= check(
        len(result_rnd.indices) == 500,
        f"Beam width respected: {len(result_rnd.indices)} (target 500)"
    )
    all_pass &= check(
        np.all(np.diff(result_rnd.distances) >= 0),
        "Results sorted by distance"
    )

    # Beam history entries are printed as rounded numbers.
    history_means = [f'{m:.0f}' for m in result_rnd.beam_history]
    all_pass &= check(
        len(result_rnd.beam_history) > 0,
        f"Beam history recorded: {len(result_rnd.beam_history)} depths, "
        f"means={history_means}"
    )

    # Called for its effect on the memory's activations; the returned value
    # is not inspected.
    router.route_and_activate(random_q)
    active = mem.get_active(threshold=0.001)
    all_pass &= check(
        len(active) > 0,
        f"Route-and-activate: {len(active)} entries activated"
    )

    # Multi-hop routing yields one result per hop.
    results_multi = router.multi_hop_route(random_q, hops=2)
    all_pass &= check(
        len(results_multi) == 2,
        f"Multi-hop routing: {len(results_multi)} hops completed"
    )

    # Scalability timings (informational only).
    print()
    for n_test in [1_000, 10_000, 50_000]:
        mem_test = SparseAddressTable(capacity=n_test + 1000)
        addrs = random_binary_vectors(n_test)
        conts = random_binary_vectors(n_test)
        for i in range(n_test):
            mem_test.store(addrs[i], conts[i])
        r_test = RecursiveJITRouter(mem_test, beam_width=500, max_depth=3)

        latencies = []
        for _ in range(10):
            q = random_binary_vector()
            res = r_test.route(q)
            latencies.append(res.latency_ms)

        avg_lat = np.mean(latencies)
        p99_lat = np.percentile(latencies, 99)
        # "explored" reports the last of the ten queries only.
        print(f" ⏱ Routing ({n_test:>6d} entries): "
              f"avg={avg_lat:.1f}ms, p99={p99_lat:.1f}ms, "
              f"explored={res.candidates_explored}")

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def test_binding():
    """Exercise ``BinaryBinding``, ``HRRBinding`` and ``BindingEngine``.

    Binary section: XOR bind/unbind round trip, quasi-orthogonality of the
    bound vector, bundling, permutation and triple encode/decode.
    HRR section: bind/unbind similarity, quasi-orthogonality and bundling.
    Engine section: self-similarity and similarity of two registered
    concepts. Ends with bind throughput figures for both schemes.

    Returns:
        Truthy when every ``check`` in this group passed.
    """
    header("4. Binding Operations")
    all_pass = True
    np.random.seed(42)

    print(" --- Binary Binding (BSC/XOR) ---")
    a = random_binary_vector()
    b = random_binary_vector()

    # Unbinding with the same key must recover the operand exactly.
    bound = BinaryBinding.bind(a, b)
    recovered = BinaryBinding.unbind(bound, b)
    all_pass &= check(
        hamming_distance(a, recovered) == 0,
        "XOR bind+unbind = exact recovery"
    )

    # The bound vector should look unrelated (similarity ~0.5) to both inputs.
    sim_ab = hamming_similarity(bound, a)
    sim_bb = hamming_similarity(bound, b)
    all_pass &= check(
        abs(sim_ab - 0.5) < 0.05 and abs(sim_bb - 0.5) < 0.05,
        f"Bound quasi-orthogonal to inputs: sim(C,A)={sim_ab:.3f}, sim(C,B)={sim_bb:.3f}"
    )

    # A bundle stays measurably similar to each of its components.
    c = random_binary_vector()
    bundled = BinaryBinding.bundle(a, b, c)
    sim_a = hamming_similarity(bundled, a)
    sim_b = hamming_similarity(bundled, b)
    sim_c = hamming_similarity(bundled, c)
    all_pass &= check(
        sim_a > 0.55 and sim_b > 0.55 and sim_c > 0.55,
        f"Bundle preserves similarity: {sim_a:.3f}, {sim_b:.3f}, {sim_c:.3f}"
    )

    # Permutation is invertible and decorrelates the vector.
    perm_a = BinaryBinding.permute(a, 1)
    inv_perm_a = BinaryBinding.inverse_permute(perm_a, 1)
    all_pass &= check(
        hamming_distance(a, inv_perm_a) == 0,
        "Permutation + inverse = identity"
    )
    perm_sim = hamming_similarity(a, perm_a)
    all_pass &= check(
        perm_sim < 0.55,
        f"Permuted is dissimilar: sim={perm_sim:.3f}"
    )

    # Triple encoding: unbinding subject and relation must yield the object.
    s, r, o = random_binary_vector(), random_binary_vector(), random_binary_vector()
    triple = BinaryBinding.encode_triple(s, r, o)
    decoded_o = BinaryBinding.unbind(BinaryBinding.unbind(triple, s), r)
    all_pass &= check(
        hamming_distance(o, decoded_o) == 0,
        "Triple encode/decode: exact recovery of object"
    )

    print(" --- HRR Binding (Circular Convolution) ---")
    dim = 4096
    ha = HRRBinding.random_vector(dim)
    hb = HRRBinding.random_vector(dim)

    # HRR recovery is approximate, so only a similarity floor is required.
    hbound = HRRBinding.bind(ha, hb)
    hrecovered = HRRBinding.unbind(hbound, hb)
    hrr_sim = HRRBinding.similarity(ha, hrecovered)
    all_pass &= check(
        hrr_sim > 0.3,
        f"HRR bind+unbind similarity: {hrr_sim:.3f} (should be >> 0, indicating recovery)"
    )

    hrr_orth = HRRBinding.similarity(hbound, ha)
    all_pass &= check(
        abs(hrr_orth) < 0.2,
        f"HRR bound quasi-orthogonal: sim={hrr_orth:.3f}"
    )

    hc = HRRBinding.random_vector(dim)
    hbundled = HRRBinding.bundle(ha, hb, hc)
    bundle_sim = HRRBinding.similarity(hbundled, ha)
    all_pass &= check(
        bundle_sim > 0.2,
        f"HRR bundle preserves components: sim={bundle_sim:.3f}"
    )

    print(" --- Binding Engine ---")
    engine = BindingEngine(use_binary=True)
    for concept in ("king", "queen", "man", "woman"):
        engine.register_concept(concept)

    sim_kk = engine.similarity(engine.get_concept("king"), engine.get_concept("king"))
    sim_kq = engine.similarity(engine.get_concept("king"), engine.get_concept("queen"))
    all_pass &= check(
        sim_kk == 1.0,
        f"Self-similarity = 1.0: {sim_kk}"
    )
    all_pass &= check(
        abs(sim_kq - 0.5) < 0.05,
        f"Random concept similarity ≈ 0.5: {sim_kq:.3f}"
    )

    # Throughput figures (informational only).
    print()
    n_ops = 10000
    t0 = time.perf_counter()
    for _ in range(n_ops):
        BinaryBinding.bind(a, b)
    elapsed = (time.perf_counter() - t0) * 1000
    print(f" ⏱ Binary bind: {n_ops} ops in {elapsed:.1f}ms "
          f"({n_ops/elapsed*1000:.0f} ops/s)")

    t0 = time.perf_counter()
    for _ in range(n_ops):
        HRRBinding.bind(ha, hb)
    elapsed = (time.perf_counter() - t0) * 1000
    print(f" ⏱ HRR bind: {n_ops} ops in {elapsed:.1f}ms "
          f"({n_ops/elapsed*1000:.0f} ops/s)")

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def test_energy_convergence():
    """Exercise the energy function and its three minimization front ends.

    Builds a context of 20 random vectors with Dirichlet-sampled
    activations, then checks ``EnergyFunction.total_energy``,
    ``RelaxationDynamics.relax``, ``HopfieldDynamics.relax`` and
    ``EnergyModel(mode='hybrid').minimize``. Each minimizer is expected not
    to end at a higher energy than it started (within a small tolerance).

    Returns:
        Truthy when every ``check`` in this group passed.
    """
    header("5. Energy Model & Convergence")
    all_pass = True
    np.random.seed(42)

    # Context vectors and a probability-like weighting over them.
    n_context = 20
    context = random_binary_vectors(n_context)
    activations = np.random.dirichlet(np.ones(n_context))

    efn = EnergyFunction(alpha=1.0, beta=0.5, gamma=0.1, delta=0.05)

    # The energy breakdown is a mapping with at least these two keys.
    state = random_binary_vector()
    e = efn.total_energy(state, context, activations)
    all_pass &= check(
        'total' in e and 'compatibility' in e,
        f"Energy components computed: {list(e.keys())}"
    )
    all_pass &= check(
        isinstance(e['total'], float),
        f"Total energy: {e['total']:.4f}"
    )

    print(" --- Binary Relaxation ---")
    dynamics = RelaxationDynamics(
        efn, max_iterations=30, n_candidates=16, flip_fraction=0.05
    )
    result = dynamics.relax(state, context, activations)

    # Final energy may not exceed the first trajectory entry by more than 0.01.
    initial_e = result['trajectory'][0]['total']
    final_e = result['final_energy']
    all_pass &= check(
        final_e <= initial_e + 0.01,
        f"Energy decreased: {initial_e:.4f} → {final_e:.4f} "
        f"(Δ = {initial_e - final_e:.4f})"
    )
    all_pass &= check(
        result['iterations'] > 0,
        f"Iterations: {result['iterations']}"
    )

    # More than half of the individual steps must be non-increasing.
    traj_energies = [t['total'] for t in result['trajectory']]
    decreasing_steps = sum(
        1
        for prev, cur in zip(traj_energies, traj_energies[1:])
        if cur <= prev + 0.001
    )
    # max(..., 1) avoids dividing by zero for a single-entry trajectory.
    pct_decreasing = decreasing_steps / max(len(traj_energies) - 1, 1)
    all_pass &= check(
        pct_decreasing > 0.5,
        f"Trajectory mostly decreasing: {pct_decreasing:.0%}"
    )

    print(" --- Hopfield Dynamics ---")
    hopfield = HopfieldDynamics(beta=8.0, max_iterations=20)
    h_result = hopfield.relax(state, context, activations)

    h_traj = h_result['energy_trajectory']
    all_pass &= check(
        len(h_traj) > 1,
        f"Hopfield trajectory: {len(h_traj)} steps"
    )
    all_pass &= check(
        h_traj[-1] <= h_traj[0] + 0.01,
        f"Hopfield energy decreased: {h_traj[0]:.4f} → {h_traj[-1]:.4f}"
    )

    # Attention weights are optional; when present, the largest one must
    # exceed the uniform share 1/n_context.
    att = h_result.get('attention_weights')
    if att is not None:
        max_att = att.max()
        all_pass &= check(
            max_att > 1.0 / n_context,
            f"Hopfield attention concentrated: max={max_att:.4f} (uniform={1/n_context:.4f})"
        )

    print(" --- Hybrid Energy Model ---")
    model = EnergyModel(mode='hybrid')
    hybrid_result = model.minimize(state, context, activations)
    all_pass &= check(
        'final_state' in hybrid_result,
        "Hybrid model produced final state"
    )
    all_pass &= check(
        hybrid_result['converged'] or hybrid_result['total_iterations'] > 0,
        f"Hybrid: {hybrid_result['total_iterations']} total iterations, "
        f"converged={hybrid_result['converged']}"
    )

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def test_reasoning():
    """Exercise ``ReasoningEngine`` on a small hand-written knowledge base.

    Registers 32 concepts and 22 (subject, relation, object) triples, then
    runs association, a plain concept query, an analogy, a composition, a
    structured (role-based) query and a five-step query. Most assertions
    only require that a result was produced; detailed rankings are printed
    for inspection rather than asserted.

    Returns:
        Truthy when every ``check`` in this group passed.
    """

    def latency_text(res):
        # Format the latency only when a result exists, so that a None
        # result reaches its check instead of raising while the message
        # is being built.
        return f"{res['latency_ms']:.1f}ms" if res is not None else "n/a"

    header("6. Reasoning Capabilities")
    all_pass = True
    np.random.seed(42)

    engine = ReasoningEngine(
        beam_width=200,
        max_routing_depth=2,
        max_reasoning_steps=3,
        energy_mode='hybrid',
        relaxation_iterations=20,
    )

    print(" --- Building Knowledge Base ---")
    concepts = [
        "cat", "dog", "animal", "pet",
        "fish", "water", "ocean", "river",
        "bird", "sky", "wing", "fly",
        "car", "road", "wheel", "engine",
        "tree", "leaf", "forest", "green",
        "sun", "moon", "star", "night",
        "king", "queen", "prince", "princess",
        "man", "woman", "child", "person",
    ]

    for c in concepts:
        engine.add_concept(c)

    relations = [
        ("cat", "is_a", "animal"),
        ("dog", "is_a", "animal"),
        ("cat", "is_a", "pet"),
        ("dog", "is_a", "pet"),
        ("fish", "lives_in", "water"),
        ("fish", "is_a", "animal"),
        ("bird", "has", "wing"),
        ("bird", "can", "fly"),
        ("bird", "is_a", "animal"),
        ("car", "has", "wheel"),
        ("car", "on", "road"),
        ("tree", "has", "leaf"),
        ("tree", "in", "forest"),
        ("leaf", "is", "green"),
        ("king", "is_a", "man"),
        ("queen", "is_a", "woman"),
        ("prince", "is_a", "man"),
        ("princess", "is_a", "woman"),
        ("king", "married_to", "queen"),
        ("sun", "in", "sky"),
        ("moon", "in", "sky"),
        ("star", "in", "sky"),
    ]

    for s, r, o in relations:
        engine.add_relation(s, r, o)

    stats = engine.stats()
    all_pass &= check(
        stats['codebook_size'] >= len(concepts),
        f"Knowledge base: {stats['codebook_size']} concepts, "
        f"{stats['memory']['size']} memory entries"
    )

    print(" --- Association ---")
    assoc_cat = engine.associate("cat", top_k=10)
    all_pass &= check(
        len(assoc_cat) > 0,
        f"Association for 'cat': {len(assoc_cat)} results"
    )
    if assoc_cat:
        print(f" Top associations: {assoc_cat[:5]}")

    print(" --- Concept Query ---")
    result = engine.reason("cat", max_steps=2)
    response = result['response']
    all_pass &= check(
        response is not None,
        f"Reasoning on 'cat': {result['num_steps']} steps, "
        f"{result['latency_ms']:.1f}ms"
    )
    # Guarded: a missing response was already reported by the check above.
    if response is not None and response['nearest_concepts']:
        top_concept = response['nearest_concepts'][0]
        print(f" Nearest concept: {top_concept[0]} (sim={top_concept[1]:.3f})")

    # Steps whose energy is +inf are left out of the comparison.
    print(" --- Energy Convergence ---")
    energies = [s.energy for s in result['reasoning_chain'] if s.energy != float('inf')]
    if len(energies) >= 2:
        all_pass &= check(
            energies[-1] <= energies[0] + 0.01,
            f"Energy decreased during reasoning: {energies[0]:.4f} → {energies[-1]:.4f}"
        )
        print(f" Energy trajectory: {[f'{e:.4f}' for e in energies]}")

    print(" --- Analogy ---")
    analogy_result = engine.solve_analogy("king", "man", "queen")
    all_pass &= check(
        analogy_result is not None,
        f"Analogy 'king:man :: queen:?': completed in {latency_text(analogy_result)}"
    )
    if analogy_result is not None and analogy_result['codebook_ranking']:
        ranking = analogy_result['codebook_ranking']
        top_answer = ranking[0]
        print(f" Top answer: {top_answer[0]} (sim={top_answer[1]:.3f})")
        top_5 = [(n, f"{s:.3f}") for n, s in ranking[:5]]
        print(f" Top-5: {top_5}")

    print(" --- Composition ---")
    comp_result = engine.compose("water", "animal")
    all_pass &= check(
        comp_result is not None,
        f"Composition 'water + animal': {latency_text(comp_result)}"
    )
    if comp_result is not None and comp_result['response']['nearest_concepts']:
        top = comp_result['response']['nearest_concepts'][:5]
        nearest = [(n, f'{s:.3f}') for n, s in top]
        print(f" Nearest to 'water+animal': {nearest}")

    print(" --- Structured Query ---")
    struct_result = engine.reason(
        {"subject": "bird", "relation": "can"},
        max_steps=2,
        roles=["subject", "relation"]
    )
    all_pass &= check(
        struct_result is not None,
        f"Structured query completed: {latency_text(struct_result)}"
    )
    if struct_result is not None and struct_result['response'].get('role_fillers'):
        for role, fillers in struct_result['response']['role_fillers'].items():
            print(f" Role '{role}': {fillers[:3]}")

    print(" --- Multi-step Convergence ---")
    deep_result = engine.reason("forest", max_steps=5)
    chain = deep_result['reasoning_chain']
    all_pass &= check(
        len(chain) > 0,
        f"Multi-step reasoning: {len(chain)} steps, {deep_result['latency_ms']:.1f}ms"
    )
    step_energies = [s.energy for s in chain if s.energy != float('inf')]
    if step_energies:
        print(f" Step energies: {[f'{e:.4f}' for e in step_energies]}")

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def test_integration():
    """Run the full reasoning pipeline on a synthetic 500-concept knowledge base.

    Registers five category concepts and 500 ``concept_<i>`` entries, each
    linked to a category via ``is_a``. It then times one three-step query,
    benchmarks 50 random two-step queries, and checks the per-entry memory
    footprint reported by ``engine.memory.stats()``.

    Returns:
        Truthy when every ``check`` in this group passed.
    """
    header("7. End-to-End Integration")
    all_pass = True
    np.random.seed(42)

    engine = ReasoningEngine(
        beam_width=500,
        max_routing_depth=3,
        max_reasoning_steps=3,
        energy_mode='hybrid',
    )

    # Synthetic knowledge base: categories are assigned round-robin.
    n_base = 500
    categories = ["animal", "plant", "vehicle", "tool", "place"]
    for cat in categories:
        engine.add_concept(cat)

    for i in range(n_base):
        name = f"concept_{i}"
        engine.add_concept(name)
        engine.add_relation(name, "is_a", categories[i % len(categories)])

    stats = engine.stats()
    print(f" Knowledge base: {stats}")

    # Single end-to-end query, timed from the caller's side.
    t0 = time.perf_counter()
    result = engine.reason("concept_42", max_steps=3)
    total_ms = (time.perf_counter() - t0) * 1000

    all_pass &= check(
        result['response'] is not None,
        f"Full pipeline completed in {total_ms:.1f}ms"
    )

    print()
    print(" --- Batch Query Benchmark ---")
    latencies = []
    for _ in range(50):
        query = f"concept_{np.random.randint(n_base)}"
        t0 = time.perf_counter()
        engine.reason(query, max_steps=2)
        latencies.append((time.perf_counter() - t0) * 1000)

    avg_lat = np.mean(latencies)
    p50_lat = np.percentile(latencies, 50)
    p99_lat = np.percentile(latencies, 99)
    # Informational only: routed through check() for uniform formatting,
    # but no latency threshold is enforced.
    all_pass &= check(
        True,
        f"50 queries: avg={avg_lat:.1f}ms, p50={p50_lat:.1f}ms, p99={p99_lat:.1f}ms"
    )

    mem_stats = engine.memory.stats()
    print(f" Memory usage: {mem_stats['memory_mb']:.2f} MB "
          f"for {mem_stats['size']} entries")
    # max(..., 1) avoids dividing by zero on an empty memory.
    bytes_per_entry = mem_stats['memory_mb'] * 1024 * 1024 / max(mem_stats['size'], 1)
    # The enforced limit (2048) is looser than the stated target (1024);
    # the message now reports both so the output matches the condition.
    all_pass &= check(
        bytes_per_entry < 2048,
        f"Memory efficiency: {bytes_per_entry:.0f} bytes/entry "
        f"(target ≤ 1024, hard limit < 2048)"
    )

    return all_pass
|
|
|
|
| |
| |
| |
|
|
def main():
    """Run every test group, print a summary, and return a process exit code.

    Each group runs inside its own ``try`` so that an exception in one group
    is reported (with traceback) and counted as a failure without stopping
    the remaining groups.

    Returns:
        0 when all groups passed, 1 otherwise.
    """
    import traceback

    banner = "═" * 70
    print("\n" + banner)
    print(" MLE — Morpho-Logic Engine — Comprehensive Test Suite")
    print(banner)

    results = {}
    tests = [
        ("SIMD Operations", test_simd_operations),
        ("Memory & LSH", test_memory_and_lsh),
        ("Routing", test_routing),
        ("Binding", test_binding),
        ("Energy Convergence", test_energy_convergence),
        ("Reasoning", test_reasoning),
        ("Integration", test_integration),
    ]

    for name, test_fn in tests:
        # Deliberately broad: this is the top-level boundary, and a crashing
        # group must not prevent the others from running.
        try:
            results[name] = test_fn()
        except Exception as e:
            print(f"\n ✗✗✗ {name} FAILED with exception: {e}")
            traceback.print_exc()
            results[name] = False

    header("TEST SUMMARY")
    total = len(results)
    passed = sum(1 for outcome in results.values() if outcome)
    for name, outcome in results.items():
        status = "PASS ✓" if outcome else "FAIL ✗"
        print(f" [{status}] {name}")

    print(f"\n Total: {passed}/{total} test groups passed")
    print(banner)

    return 0 if passed == total else 1
|
|
|
|
# Script entry point. sys.exit is used instead of the bare exit() helper,
# which is injected by the site module for interactive use and is absent
# when Python runs with -S or in some embedded/frozen environments.
if __name__ == '__main__':
    sys.exit(main())
|
|