""" MLE Comprehensive Test Suite =============================== Tests covering: 1. SIMD operations correctness & performance 2. Memory storage & retrieval 3. LSH indexing quality 4. Routing latency & scalability 5. Binding operations (binary & HRR) 6. Energy convergence 7. Reasoning capabilities (association, analogy, composition) 8. End-to-end integration """ import numpy as np import time import sys import os # Add project root to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) from mle.utils.simd_ops import ( N_BITS, N_WORDS, hamming_distance, hamming_batch, hamming_topk, hamming_similarity, xor_vectors, popcount, majority_vote, random_binary_vector, random_binary_vectors, normalize_density, get_native_lib ) from mle.memory.sparse_address_table import SparseAddressTable, HammingLSH from mle.routing.recursive_jit_router import RecursiveJITRouter from mle.binding.semantic_binding import HRRBinding, BinaryBinding, BindingEngine from mle.energy.energy_model import EnergyFunction, RelaxationDynamics, HopfieldDynamics, EnergyModel from mle.inference.reasoning_engine import ReasoningEngine def header(title): print(f"\n{'='*70}") print(f" {title}") print(f"{'='*70}") def check(condition, message): status = "✓" if condition else "✗" print(f" [{status}] {message}") return condition # ══════════════════════════════════════════════════════════════════════════════ # 1. SIMD OPERATIONS # ══════════════════════════════════════════════════════════════════════════════ def test_simd_operations(): header("1. SIMD Operations") all_pass = True np.random.seed(42) # Check native lib lib = get_native_lib() all_pass &= check(lib is not None, f"Native SIMD library compiled: {lib is not None}") # Basic Hamming distance a = random_binary_vector() b = random_binary_vector() dist = hamming_distance(a, b) all_pass &= check( 1800 < dist < 2200, f"Random vector Hamming distance ≈ N/2: {dist} (expected ~2048)" ) # Self-distance = 0 all_pass &= check( hamming_distance(a, a) == 0, "Self-distance = 0" ) # XOR identity: dist(a, a⊕b) should relate to popcount(b) xor_ab = xor_vectors(a, b) d1 = hamming_distance(a, xor_ab) d2 = popcount(b) # d1 should equal popcount(a XOR (a XOR b)) = popcount(b) all_pass &= check( d1 == d2, f"XOR identity: dist(a, a⊕b) = popcount(b): {d1} == {d2}" ) # Batch Hamming distance corpus = random_binary_vectors(1000) dists = hamming_batch(a, corpus) all_pass &= check( dists.shape == (1000,), f"Batch Hamming shape: {dists.shape}" ) all_pass &= check( np.all(dists >= 0) and np.all(dists <= N_BITS), f"Batch Hamming range: [{dists.min()}, {dists.max()}]" ) # Top-K indices, distances = hamming_topk(a, corpus, k=10) all_pass &= check( len(indices) == 10, f"Top-10 returned: {len(indices)}" ) all_pass &= check( np.all(np.diff(distances) >= 0), f"Top-K sorted ascending: {distances[:5]}..." ) # Verify top-K correctness against full sort full_sort_idx = np.argsort(dists)[:10] full_sort_dist = dists[full_sort_idx] all_pass &= check( np.array_equal(distances, full_sort_dist), f"Top-K matches full sort: {np.array_equal(distances, full_sort_dist)}" ) # Majority vote vecs = random_binary_vectors(5) mv = majority_vote(np.ascontiguousarray(vecs)) all_pass &= check( mv.shape == (N_WORDS,) and mv.dtype == np.uint64, f"Majority vote shape/dtype: {mv.shape}, {mv.dtype}" ) # Normalize density v = random_binary_vector() v_norm = normalize_density(v, 0.5) actual_density = popcount(v_norm) / N_BITS all_pass &= check( abs(actual_density - 0.5) < 0.01, f"Density normalization: {actual_density:.4f} (target 0.5)" ) # ── Performance benchmark ── print() corpus_sizes = [1_000, 10_000, 100_000] for n in corpus_sizes: corpus = random_binary_vectors(n) query = random_binary_vector() # Batch Hamming t0 = time.perf_counter() for _ in range(10): hamming_batch(query, corpus) elapsed = (time.perf_counter() - t0) / 10 * 1000 throughput = n / elapsed * 1000 print(f" ⏱ Batch Hamming ({n:>7d} vecs): {elapsed:>7.2f} ms" f" ({throughput/1e6:.1f}M vecs/s)") # Top-500 t0 = time.perf_counter() for _ in range(10): hamming_topk(query, corpus, k=500) elapsed = (time.perf_counter() - t0) / 10 * 1000 print(f" ⏱ Top-500 ({n:>7d} vecs): {elapsed:>7.2f} ms") return all_pass # ══════════════════════════════════════════════════════════════════════════════ # 2. MEMORY & LSH # ══════════════════════════════════════════════════════════════════════════════ def test_memory_and_lsh(): header("2. Memory & LSH Indexing") all_pass = True np.random.seed(42) # Create memory mem = SparseAddressTable(capacity=10000, lsh_tables=16, lsh_projections=24) all_pass &= check(mem.size == 0, f"Empty memory: size={mem.size}") # Store concepts n_concepts = 5000 addresses = random_binary_vectors(n_concepts) contents = random_binary_vectors(n_concepts) t0 = time.perf_counter() for i in range(n_concepts): mem.store(addresses[i], contents[i], metadata={'name': f'concept_{i}', 'index': i}) store_time = (time.perf_counter() - t0) * 1000 all_pass &= check( mem.size == n_concepts, f"Stored {n_concepts} concepts in {store_time:.1f}ms" ) # Exact search query = addresses[42].copy() results = mem.query_nearest(query, k=5, use_lsh=False) all_pass &= check( results[0][0] == 42 and results[0][1] == 0, f"Exact retrieval: found correct entry (dist=0)" ) # LSH search results_lsh = mem.query_nearest(query, k=5, use_lsh=True) found_exact = any(idx == 42 for idx, _ in results_lsh) all_pass &= check( found_exact, f"LSH retrieval: found exact match in top-5" ) # Near-duplicate search near = addresses[42].copy() bits = np.unpackbits(near.view(np.uint8)) # Flip 50 random bits (~1.2% difference) flip_pos = np.random.choice(N_BITS, 50, replace=False) bits[flip_pos] ^= 1 near_modified = np.packbits(bits).view(np.uint64).copy() results_near = mem.query_nearest(near_modified, k=10, use_lsh=True) all_pass &= check( results_near[0][1] <= 100, f"Near-duplicate found: best distance = {results_near[0][1]} (flipped 50 bits)" ) # Named concept cat_idx = mem.store_concept("cat", metadata={'category': 'animal'}) retrieved = mem.get_by_name("cat") all_pass &= check( retrieved is not None, f"Named concept 'cat' stored and retrieved" ) # Activation mem.activate(np.array([0, 1, 2]), np.array([0.9, 0.5, 0.3])) active = mem.get_active(threshold=0.4) all_pass &= check( len(active) == 2, f"Activation: {len(active)} entries above threshold 0.4" ) mem.decay_activations(0.5) active_after = mem.get_active(threshold=0.4) all_pass &= check( len(active_after) == 1, f"After decay: {len(active_after)} entries above threshold 0.4" ) # Stats stats = mem.stats() all_pass &= check( stats['size'] == n_concepts + 1, f"Memory stats: {stats}" ) # ── LSH Recall benchmark ── # Test with near-duplicates (meaningful LSH scenario) # Create clusters: for 100 base vectors, create 5 near-duplicates each (50 bits flipped) print() mem2 = SparseAddressTable(capacity=2000, lsh_tables=32, lsh_projections=8) base_vecs = random_binary_vectors(100) cluster_map = {} # idx -> cluster_id next_idx = 0 for cid in range(100): mem2.store(base_vecs[cid], base_vecs[cid]) cluster_map[next_idx] = cid next_idx += 1 for _ in range(5): bits = np.unpackbits(base_vecs[cid].view(np.uint8)).copy() flips = np.random.choice(N_BITS, 100, replace=False) bits[flips] ^= 1 variant = np.packbits(bits).view(np.uint64).copy() mem2.store(variant, variant) cluster_map[next_idx] = cid next_idx += 1 # For each base vector, check if LSH finds its cluster members recall_tests = 100 total_recall = 0 for cid in range(recall_tests): query = base_vecs[cid] lsh_results = mem2.query_nearest(query, k=10, use_lsh=True) # Count how many results are from the same cluster lsh_ids = [idx for idx, _ in lsh_results] same_cluster = sum(1 for idx in lsh_ids if cluster_map.get(idx) == cid) # Each cluster has 6 members; top-10 should find most total_recall += same_cluster / min(6, 10) avg_recall = total_recall / recall_tests all_pass &= check( avg_recall > 0.3, f"LSH Cluster Recall@10: {avg_recall:.2%} (near-duplicates, 100 clusters)" ) # Also verify that exact self-lookup always works via LSH exact_recall = 0 for cid in range(recall_tests): query = base_vecs[cid] lsh_results = mem2.query_nearest(query, k=1, use_lsh=True) if lsh_results and lsh_results[0][1] == 0: exact_recall += 1 all_pass &= check( exact_recall == recall_tests, f"LSH Exact self-lookup: {exact_recall}/{recall_tests}" ) return all_pass # ══════════════════════════════════════════════════════════════════════════════ # 3. ROUTING # ══════════════════════════════════════════════════════════════════════════════ def test_routing(): header("3. Recursive JIT Routing") all_pass = True np.random.seed(42) # Build memory with 10K entries mem = SparseAddressTable(capacity=20000) n = 10000 addresses = random_binary_vectors(n) contents = random_binary_vectors(n) for i in range(n): mem.store(addresses[i], contents[i], metadata={'name': f'v_{i}'}) router = RecursiveJITRouter( memory=mem, beam_width=500, max_depth=3, expansion_factor=5, ) # Basic routing query = addresses[100].copy() result = router.route(query) all_pass &= check( len(result.indices) > 0, f"Routing returned {len(result.indices)} results" ) all_pass &= check( result.distances[0] == 0, f"Exact match found at distance 0" ) all_pass &= check( result.latency_ms < 1000, f"Routing latency: {result.latency_ms:.1f}ms (target < 1000ms)" ) # Random query routing random_q = random_binary_vector() result_rnd = router.route(random_q) all_pass &= check( len(result_rnd.indices) == 500, f"Beam width respected: {len(result_rnd.indices)} (target 500)" ) all_pass &= check( np.all(np.diff(result_rnd.distances) >= 0), "Results sorted by distance" ) # Beam convergence (distances should decrease across depth) all_pass &= check( len(result_rnd.beam_history) > 0, f"Beam history recorded: {len(result_rnd.beam_history)} depths, " f"means={[f'{m:.0f}' for m in result_rnd.beam_history]}" ) # Route and activate result_act = router.route_and_activate(random_q) active = mem.get_active(threshold=0.001) all_pass &= check( len(active) > 0, f"Route-and-activate: {len(active)} entries activated" ) # Multi-hop routing results_multi = router.multi_hop_route(random_q, hops=2) all_pass &= check( len(results_multi) == 2, f"Multi-hop routing: {len(results_multi)} hops completed" ) # ── Scalability benchmark ── print() for n_test in [1_000, 10_000, 50_000]: mem_test = SparseAddressTable(capacity=n_test + 1000) addrs = random_binary_vectors(n_test) conts = random_binary_vectors(n_test) for i in range(n_test): mem_test.store(addrs[i], conts[i]) r_test = RecursiveJITRouter(mem_test, beam_width=500, max_depth=3) latencies = [] for _ in range(10): q = random_binary_vector() res = r_test.route(q) latencies.append(res.latency_ms) avg_lat = np.mean(latencies) p99_lat = np.percentile(latencies, 99) print(f" ⏱ Routing ({n_test:>6d} entries): " f"avg={avg_lat:.1f}ms, p99={p99_lat:.1f}ms, " f"explored={res.candidates_explored}") return all_pass # ══════════════════════════════════════════════════════════════════════════════ # 4. BINDING OPERATIONS # ══════════════════════════════════════════════════════════════════════════════ def test_binding(): header("4. Binding Operations") all_pass = True np.random.seed(42) # ── Binary binding (BSC) ── print(" --- Binary Binding (BSC/XOR) ---") a = random_binary_vector() b = random_binary_vector() # Bind + unbind = identity bound = BinaryBinding.bind(a, b) recovered = BinaryBinding.unbind(bound, b) all_pass &= check( hamming_distance(a, recovered) == 0, "XOR bind+unbind = exact recovery" ) # Bound is quasi-orthogonal to inputs sim_ab = hamming_similarity(bound, a) sim_bb = hamming_similarity(bound, b) all_pass &= check( abs(sim_ab - 0.5) < 0.05 and abs(sim_bb - 0.5) < 0.05, f"Bound quasi-orthogonal to inputs: sim(C,A)={sim_ab:.3f}, sim(C,B)={sim_bb:.3f}" ) # Bundle (majority vote) c = random_binary_vector() bundled = BinaryBinding.bundle(a, b, c) sim_a = hamming_similarity(bundled, a) sim_b = hamming_similarity(bundled, b) sim_c = hamming_similarity(bundled, c) all_pass &= check( sim_a > 0.55 and sim_b > 0.55 and sim_c > 0.55, f"Bundle preserves similarity: {sim_a:.3f}, {sim_b:.3f}, {sim_c:.3f}" ) # Permutation perm_a = BinaryBinding.permute(a, 1) inv_perm_a = BinaryBinding.inverse_permute(perm_a, 1) all_pass &= check( hamming_distance(a, inv_perm_a) == 0, "Permutation + inverse = identity" ) all_pass &= check( hamming_similarity(a, perm_a) < 0.55, f"Permuted is dissimilar: sim={hamming_similarity(a, perm_a):.3f}" ) # Triple encoding s, r, o = random_binary_vector(), random_binary_vector(), random_binary_vector() triple = BinaryBinding.encode_triple(s, r, o) # Decode object: unbind(unbind(triple, s), r) decoded_o = BinaryBinding.unbind(BinaryBinding.unbind(triple, s), r) all_pass &= check( hamming_distance(o, decoded_o) == 0, "Triple encode/decode: exact recovery of object" ) # ── HRR binding (circular convolution) ── print(" --- HRR Binding (Circular Convolution) ---") dim = 4096 ha = HRRBinding.random_vector(dim) hb = HRRBinding.random_vector(dim) # Bind + unbind ≈ identity (approximate for HRR) hbound = HRRBinding.bind(ha, hb) hrecovered = HRRBinding.unbind(hbound, hb) hrr_sim = HRRBinding.similarity(ha, hrecovered) all_pass &= check( hrr_sim > 0.3, f"HRR bind+unbind similarity: {hrr_sim:.3f} (should be >> 0, indicating recovery)" ) # Bound is quasi-orthogonal hrr_orth = HRRBinding.similarity(hbound, ha) all_pass &= check( abs(hrr_orth) < 0.2, f"HRR bound quasi-orthogonal: sim={hrr_orth:.3f}" ) # Bundle preserves components hc = HRRBinding.random_vector(dim) hbundled = HRRBinding.bundle(ha, hb, hc) all_pass &= check( HRRBinding.similarity(hbundled, ha) > 0.2, f"HRR bundle preserves components: sim={HRRBinding.similarity(hbundled, ha):.3f}" ) # ── Binding Engine ── print(" --- Binding Engine ---") engine = BindingEngine(use_binary=True) engine.register_concept("king") engine.register_concept("queen") engine.register_concept("man") engine.register_concept("woman") sim_kk = engine.similarity(engine.get_concept("king"), engine.get_concept("king")) sim_kq = engine.similarity(engine.get_concept("king"), engine.get_concept("queen")) all_pass &= check( sim_kk == 1.0, f"Self-similarity = 1.0: {sim_kk}" ) all_pass &= check( abs(sim_kq - 0.5) < 0.05, f"Random concept similarity ≈ 0.5: {sim_kq:.3f}" ) # ── Performance ── print() n_ops = 10000 t0 = time.perf_counter() for _ in range(n_ops): BinaryBinding.bind(a, b) elapsed = (time.perf_counter() - t0) * 1000 print(f" ⏱ Binary bind: {n_ops} ops in {elapsed:.1f}ms " f"({n_ops/elapsed*1000:.0f} ops/s)") t0 = time.perf_counter() for _ in range(n_ops): HRRBinding.bind(ha, hb) elapsed = (time.perf_counter() - t0) * 1000 print(f" ⏱ HRR bind: {n_ops} ops in {elapsed:.1f}ms " f"({n_ops/elapsed*1000:.0f} ops/s)") return all_pass # ══════════════════════════════════════════════════════════════════════════════ # 5. ENERGY & CONVERGENCE # ══════════════════════════════════════════════════════════════════════════════ def test_energy_convergence(): header("5. Energy Model & Convergence") all_pass = True np.random.seed(42) # Create some context vectors n_context = 20 context = random_binary_vectors(n_context) activations = np.random.dirichlet(np.ones(n_context)) # ── Energy function ── efn = EnergyFunction(alpha=1.0, beta=0.5, gamma=0.1, delta=0.05) # Random state should have moderate energy state = random_binary_vector() e = efn.total_energy(state, context, activations) all_pass &= check( 'total' in e and 'compatibility' in e, f"Energy components computed: {list(e.keys())}" ) all_pass &= check( isinstance(e['total'], float), f"Total energy: {e['total']:.4f}" ) # ── Binary relaxation ── print(" --- Binary Relaxation ---") dynamics = RelaxationDynamics( efn, max_iterations=30, n_candidates=16, flip_fraction=0.05 ) result = dynamics.relax(state, context, activations) initial_e = result['trajectory'][0]['total'] final_e = result['final_energy'] all_pass &= check( final_e <= initial_e + 0.01, # allow tiny float imprecision f"Energy decreased: {initial_e:.4f} → {final_e:.4f} " f"(Δ = {initial_e - final_e:.4f})" ) all_pass &= check( result['iterations'] > 0, f"Iterations: {result['iterations']}" ) # Check trajectory is generally decreasing traj_energies = [t['total'] for t in result['trajectory']] decreasing_steps = sum(1 for i in range(1, len(traj_energies)) if traj_energies[i] <= traj_energies[i-1] + 0.001) pct_decreasing = decreasing_steps / max(len(traj_energies) - 1, 1) all_pass &= check( pct_decreasing > 0.5, f"Trajectory mostly decreasing: {pct_decreasing:.0%}" ) # ── Hopfield relaxation ── print(" --- Hopfield Dynamics ---") hopfield = HopfieldDynamics(beta=8.0, max_iterations=20) h_result = hopfield.relax(state, context, activations) h_traj = h_result['energy_trajectory'] all_pass &= check( len(h_traj) > 1, f"Hopfield trajectory: {len(h_traj)} steps" ) all_pass &= check( h_traj[-1] <= h_traj[0] + 0.01, f"Hopfield energy decreased: {h_traj[0]:.4f} → {h_traj[-1]:.4f}" ) # Attention should be concentrated att = h_result.get('attention_weights') if att is not None: max_att = att.max() all_pass &= check( max_att > 1.0 / n_context, f"Hopfield attention concentrated: max={max_att:.4f} (uniform={1/n_context:.4f})" ) # ── Hybrid model ── print(" --- Hybrid Energy Model ---") model = EnergyModel(mode='hybrid') hybrid_result = model.minimize(state, context, activations) all_pass &= check( 'final_state' in hybrid_result, f"Hybrid model produced final state" ) all_pass &= check( hybrid_result['converged'] or hybrid_result['total_iterations'] > 0, f"Hybrid: {hybrid_result['total_iterations']} total iterations, " f"converged={hybrid_result['converged']}" ) return all_pass # ══════════════════════════════════════════════════════════════════════════════ # 6. REASONING CAPABILITIES # ══════════════════════════════════════════════════════════════════════════════ def test_reasoning(): header("6. Reasoning Capabilities") all_pass = True np.random.seed(42) engine = ReasoningEngine( beam_width=200, max_routing_depth=2, max_reasoning_steps=3, energy_mode='hybrid', relaxation_iterations=20, ) # ── Build knowledge base ── print(" --- Building Knowledge Base ---") concepts = [ "cat", "dog", "animal", "pet", "fish", "water", "ocean", "river", "bird", "sky", "wing", "fly", "car", "road", "wheel", "engine", "tree", "leaf", "forest", "green", "sun", "moon", "star", "night", "king", "queen", "prince", "princess", "man", "woman", "child", "person", ] for c in concepts: engine.add_concept(c) relations = [ ("cat", "is_a", "animal"), ("dog", "is_a", "animal"), ("cat", "is_a", "pet"), ("dog", "is_a", "pet"), ("fish", "lives_in", "water"), ("fish", "is_a", "animal"), ("bird", "has", "wing"), ("bird", "can", "fly"), ("bird", "is_a", "animal"), ("car", "has", "wheel"), ("car", "on", "road"), ("tree", "has", "leaf"), ("tree", "in", "forest"), ("leaf", "is", "green"), ("king", "is_a", "man"), ("queen", "is_a", "woman"), ("prince", "is_a", "man"), ("princess", "is_a", "woman"), ("king", "married_to", "queen"), ("sun", "in", "sky"), ("moon", "in", "sky"), ("star", "in", "sky"), ] for s, r, o in relations: engine.add_relation(s, r, o) stats = engine.stats() all_pass &= check( stats['codebook_size'] >= len(concepts), f"Knowledge base: {stats['codebook_size']} concepts, " f"{stats['memory']['size']} memory entries" ) # ── Test 1: Association ── print(" --- Association ---") assoc_cat = engine.associate("cat", top_k=10) all_pass &= check( len(assoc_cat) > 0, f"Association for 'cat': {len(assoc_cat)} results" ) if assoc_cat: print(f" Top associations: {assoc_cat[:5]}") # ── Test 2: Concept Query ── print(" --- Concept Query ---") result = engine.reason("cat", max_steps=2) all_pass &= check( result['response'] is not None, f"Reasoning on 'cat': {result['num_steps']} steps, " f"{result['latency_ms']:.1f}ms" ) if result['response']['nearest_concepts']: top_concept = result['response']['nearest_concepts'][0] print(f" Nearest concept: {top_concept[0]} (sim={top_concept[1]:.3f})") # ── Test 3: Energy convergence during reasoning ── print(" --- Energy Convergence ---") energies = [s.energy for s in result['reasoning_chain'] if s.energy != float('inf')] if len(energies) >= 2: all_pass &= check( energies[-1] <= energies[0] + 0.01, f"Energy decreased during reasoning: {energies[0]:.4f} → {energies[-1]:.4f}" ) print(f" Energy trajectory: {[f'{e:.4f}' for e in energies]}") # ── Test 4: Analogy ── print(" --- Analogy ---") analogy_result = engine.solve_analogy("king", "man", "queen") all_pass &= check( analogy_result is not None, f"Analogy 'king:man :: queen:?': completed in {analogy_result['latency_ms']:.1f}ms" ) if analogy_result['codebook_ranking']: top_answer = analogy_result['codebook_ranking'][0] print(f" Top answer: {top_answer[0]} (sim={top_answer[1]:.3f})") top_5 = [(n, f"{s:.3f}") for n, s in analogy_result['codebook_ranking'][:5]] print(f" Top-5: {top_5}") # ── Test 5: Composition ── print(" --- Composition ---") comp_result = engine.compose("water", "animal") all_pass &= check( comp_result is not None, f"Composition 'water + animal': {comp_result['latency_ms']:.1f}ms" ) if comp_result['response']['nearest_concepts']: top = comp_result['response']['nearest_concepts'][:5] print(f" Nearest to 'water+animal': {[(n, f'{s:.3f}') for n, s in top]}") # ── Test 6: Structured query ── print(" --- Structured Query ---") struct_result = engine.reason( {"subject": "bird", "relation": "can"}, max_steps=2, roles=["subject", "relation"] ) all_pass &= check( struct_result is not None, f"Structured query completed: {struct_result['latency_ms']:.1f}ms" ) if struct_result['response'].get('role_fillers'): for role, fillers in struct_result['response']['role_fillers'].items(): print(f" Role '{role}': {fillers[:3]}") # ── Test 7: Multi-step reasoning convergence ── print(" --- Multi-step Convergence ---") deep_result = engine.reason("forest", max_steps=5) chain = deep_result['reasoning_chain'] all_pass &= check( len(chain) > 0, f"Multi-step reasoning: {len(chain)} steps, {deep_result['latency_ms']:.1f}ms" ) step_energies = [s.energy for s in chain if s.energy != float('inf')] if step_energies: print(f" Step energies: {[f'{e:.4f}' for e in step_energies]}") return all_pass # ══════════════════════════════════════════════════════════════════════════════ # 7. END-TO-END INTEGRATION # ══════════════════════════════════════════════════════════════════════════════ def test_integration(): header("7. End-to-End Integration") all_pass = True np.random.seed(42) # Build a larger knowledge base engine = ReasoningEngine( beam_width=500, max_routing_depth=3, max_reasoning_steps=3, energy_mode='hybrid', ) # Create 1000 random concepts with some structure n_base = 500 categories = ["animal", "plant", "vehicle", "tool", "place"] for cat in categories: engine.add_concept(cat) for i in range(n_base): name = f"concept_{i}" engine.add_concept(name) cat = categories[i % len(categories)] engine.add_relation(name, "is_a", cat) stats = engine.stats() print(f" Knowledge base: {stats}") # Test full pipeline t0 = time.perf_counter() result = engine.reason("concept_42", max_steps=3) total_ms = (time.perf_counter() - t0) * 1000 all_pass &= check( result['response'] is not None, f"Full pipeline completed in {total_ms:.1f}ms" ) # Test batch queries print() print(" --- Batch Query Benchmark ---") latencies = [] for i in range(50): query = f"concept_{np.random.randint(n_base)}" t0 = time.perf_counter() r = engine.reason(query, max_steps=2) latencies.append((time.perf_counter() - t0) * 1000) avg_lat = np.mean(latencies) p50_lat = np.percentile(latencies, 50) p99_lat = np.percentile(latencies, 99) all_pass &= check( True, f"50 queries: avg={avg_lat:.1f}ms, p50={p50_lat:.1f}ms, p99={p99_lat:.1f}ms" ) # Memory efficiency mem_stats = engine.memory.stats() print(f" Memory usage: {mem_stats['memory_mb']:.2f} MB " f"for {mem_stats['size']} entries") bytes_per_entry = mem_stats['memory_mb'] * 1024 * 1024 / max(mem_stats['size'], 1) all_pass &= check( bytes_per_entry < 2048, # Should be ~1024 bytes (512 addr + 512 content) f"Memory efficiency: {bytes_per_entry:.0f} bytes/entry (target ≤ 1024)" ) return all_pass # ══════════════════════════════════════════════════════════════════════════════ # MAIN # ══════════════════════════════════════════════════════════════════════════════ def main(): print("\n" + "█" * 70) print(" MLE — Morpho-Logic Engine — Comprehensive Test Suite") print("█" * 70) results = {} tests = [ ("SIMD Operations", test_simd_operations), ("Memory & LSH", test_memory_and_lsh), ("Routing", test_routing), ("Binding", test_binding), ("Energy Convergence", test_energy_convergence), ("Reasoning", test_reasoning), ("Integration", test_integration), ] for name, test_fn in tests: try: results[name] = test_fn() except Exception as e: print(f"\n ✗✗✗ {name} FAILED with exception: {e}") import traceback traceback.print_exc() results[name] = False # Summary header("TEST SUMMARY") total = len(results) passed = sum(1 for v in results.values() if v) for name, result in results.items(): status = "PASS ✓" if result else "FAIL ✗" print(f" [{status}] {name}") print(f"\n Total: {passed}/{total} test groups passed") print("█" * 70) return 0 if passed == total else 1 if __name__ == '__main__': exit(main())