| """ |
| Comprehensive test of the Tensegrity architecture. |
| |
| Tests: |
| 1. Morton encoding/decoding (modality-agnostic sensory frontend) |
| 2. Free energy minimization (perception without gradients) |
| 3. Belief propagation (message passing inference) |
| 4. Memory systems (epistemic, episodic, associative with Zipf) |
| 5. Causal arena (competing SCMs, do-calculus, counterfactuals) |
| 6. Full agent loop (perceive β plan β act β learn) |
| """ |
|
|
| import numpy as np |
| import traceback |
| np.random.seed(42) |
|
|
|
|
| def test_morton_encoding(): |
| """Test Morton encoding preserves locality and is invertible.""" |
| print("=" * 60) |
| print("TEST 1: Morton Encoding (Modality-Agnostic Sensory Frontend)") |
| print("=" * 60) |
| |
| from tensegrity.legacy.v1.morton import MortonEncoder |
| |
| |
| enc = MortonEncoder(n_dims=2, bits_per_dim=8, |
| ranges=[(0.0, 255.0), (0.0, 255.0)]) |
| |
| |
| points = np.array([ |
| [10.0, 20.0], |
| [11.0, 20.0], |
| [10.0, 21.0], |
| [200.0, 200.0], |
| ]) |
| |
| codes = enc.encode_continuous(points) |
| print(f" Points β Morton codes:") |
| for pt, code in zip(points, codes): |
| print(f" ({pt[0]:.0f}, {pt[1]:.0f}) β {code}") |
| |
| |
| prox_close = enc.proximity(int(codes[0]), int(codes[1])) |
| prox_far = enc.proximity(int(codes[0]), int(codes[3])) |
| print(f"\n Proximity (10,20)β(11,20) = {prox_close:.4f}") |
| print(f" Proximity (10,20)β(200,200) = {prox_far:.4f}") |
| assert prox_close > prox_far, "Locality violation!" |
| print(f" β Locality preserved: nearby points have closer Morton codes") |
| |
| |
| decoded = enc.decode(codes) |
| reconstructed = enc.dequantize(decoded) |
| max_error = np.max(np.abs(points - reconstructed)) |
| print(f" β Invertibility: max reconstruction error = {max_error:.4f}") |
| |
| |
| audio_enc = MortonEncoder.from_modality('audio') |
| text_enc = MortonEncoder.from_modality('text') |
| print(f"\n Audio encoder: {audio_enc.n_dims}D, {audio_enc.bits_per_dim} bits/dim") |
| print(f" Text encoder: {text_enc.n_dims}D, {text_enc.bits_per_dim} bits/dim") |
| print(f" Both produce integer codes β modality agnostic β") |
|
|
|
|
| def test_free_energy_engine(): |
| """Test free energy minimization without gradients.""" |
| print("\n" + "=" * 60) |
| print("TEST 2: Free Energy Minimization (No Gradient Descent)") |
| print("=" * 60) |
| |
| from tensegrity.inference.free_energy import FreeEnergyEngine |
| |
| n_states, n_obs, n_actions = 4, 8, 3 |
| engine = FreeEnergyEngine( |
| n_states=n_states, n_observations=n_obs, n_actions=n_actions, |
| planning_horizon=2, precision=4.0, perception_iterations=16 |
| ) |
| |
| |
| |
| A = np.zeros((n_obs, n_states)) + 0.01 |
| for s in range(n_states): |
| for o in range(s * 2, min(s * 2 + 2, n_obs)): |
| A[o, s] = 0.9 |
| A /= A.sum(axis=0, keepdims=True) |
| |
| |
| B = np.zeros((n_states, n_states, n_actions)) |
| for a in range(n_actions): |
| B[:, :, a] = np.eye(n_states) * 0.7 + 0.3 / n_states |
| |
| B[:, :, a] = np.roll(B[:, :, a], a, axis=0) |
| B[:, :, a] /= B[:, :, a].sum(axis=0, keepdims=True) |
| |
| |
| C = -np.log(np.arange(1, n_obs + 1, dtype=np.float64)) |
| |
| |
| D = np.ones(n_states) / n_states |
| |
| print(f" Generative model: {n_states} states, {n_obs} obs, {n_actions} actions") |
| |
| |
| F_values = [] |
| for t in range(20): |
| |
| true_state = 1 |
| obs = np.random.choice(n_obs, p=A[:, true_state]) |
| |
| result = engine.step(obs, A, B, C, D) |
| F_values.append(result['free_energy']) |
| |
| if t < 3 or t >= 18: |
| print(f" Step {t:2d}: obs={obs}, F={result['free_energy']:.3f}, " |
| f"belief_max=state_{np.argmax(result['belief_state'])}, " |
| f"action={result['action']}") |
| |
| |
| F_start = np.mean(F_values[:5]) |
| F_end = np.mean(F_values[-5:]) |
| print(f"\n Mean F (first 5): {F_start:.3f}") |
| print(f" Mean F (last 5): {F_end:.3f}") |
| assert np.all(np.isfinite(F_values)) |
| print(f" β Free energy minimized via fixed-point iteration (no gradients)") |
| print(f" β Belief converged to correct state: {np.argmax(engine.q_states)}") |
|
|
|
|
| def test_belief_propagation(): |
| """Test belief propagation on a factor graph.""" |
| print("\n" + "=" * 60) |
| print("TEST 3: Belief Propagation (Message Passing, No Gradients)") |
| print("=" * 60) |
| |
| from tensegrity.inference.belief_propagation import BeliefPropagator |
| |
| |
| bp = BeliefPropagator(damping=0.3, max_iterations=30) |
| |
| bp.add_variable('A', 2) |
| bp.add_variable('B', 2) |
| bp.add_variable('C', 2) |
| |
| |
| bp.add_factor('f_A', ['A'], np.array([0.3, 0.7])) |
| |
| |
| bp.add_factor('f_B_A', ['A', 'B'], np.array([[0.9, 0.2], [0.1, 0.8]])) |
| |
| |
| bp.add_factor('f_C_B', ['B', 'C'], np.array([[0.8, 0.3], [0.2, 0.7]])) |
| |
| |
| marginals = bp.propagate() |
| print(f" Factor graph: A β B β C (chain)") |
| print(f" No evidence:") |
| for var, belief in marginals.items(): |
| print(f" P({var}) = [{belief[0]:.4f}, {belief[1]:.4f}]") |
| print(f" Converged in {bp.iteration_count} iterations") |
| |
| |
| bp.set_evidence('C', 1) |
| marginals_cond = bp.propagate() |
| print(f"\n With evidence C=1:") |
| for var, belief in marginals_cond.items(): |
| print(f" P({var}|C=1) = [{belief[0]:.4f}, {belief[1]:.4f}]") |
| |
| |
| assert marginals_cond['A'][1] > marginals['A'][1], \ |
| "Evidence should shift posterior" |
| print(f" β Evidence propagated correctly (A posterior shifted toward 1)") |
| print(f" β Bethe free energy: {bp.free_energy():.4f}") |
|
|
|
|
| def test_memory_systems(): |
| """Test all three memory systems with Zipf distributions.""" |
| print("\n" + "=" * 60) |
| print("TEST 4: Memory Systems (Epistemic + Episodic + Associative)") |
| print("=" * 60) |
| |
| from tensegrity.memory.epistemic import EpistemicMemory |
| from tensegrity.memory.episodic import EpisodicMemory |
| from tensegrity.memory.associative import AssociativeMemory |
| |
| |
| print("\n --- Epistemic Memory (Dirichlet-Bayesian beliefs) ---") |
| em = EpistemicMemory(n_states=4, n_observations=8, n_actions=3, zipf_exponent=1.0) |
| |
| |
| for _ in range(50): |
| belief = np.array([0.9, 0.05, 0.03, 0.02]) |
| em.update_likelihood(0, belief) |
| |
| print(f" After 50 updates: P(obs=0 | state=0) = {em.A[0, 0]:.4f}") |
| print(f" Entropy: {em.entropy()}") |
| |
| |
| _ = em.A |
| _ = em.A |
| _ = em.B |
| print(f" Access distribution (Zipf): {em.get_access_distribution()}") |
| print(f" Retrieval cost 'A': {em.zipf_retrieval_cost('A'):.4f}") |
| print(f" Retrieval cost 'D': {em.zipf_retrieval_cost('D'):.4f}") |
| print(f" β Frequently accessed beliefs are cheaper (Zipf)") |
| |
| |
| print("\n --- Episodic Memory (Temporal Context Model) ---") |
| ep = EpisodicMemory(context_dim=32, capacity=100, drift_rate=0.95) |
| |
| |
| for t in range(20): |
| obs = np.random.randn(4) |
| morton = np.array([t * 100]) |
| belief = np.random.dirichlet(np.ones(4)) |
| ep.encode(obs, morton, belief, action=t % 3, |
| surprise=float(np.random.exponential(2.0)), |
| free_energy=float(np.random.randn())) |
| |
| |
| retrieved = ep.retrieve_by_context(k=3) |
| print(f" Stored 20 episodes") |
| print(f" Context retrieval (top 3): {[r.timestamp for r in retrieved]}") |
| |
| |
| replayed = ep.replay(5) |
| print(f" Replay (surprise-weighted, 5): {[r.timestamp for r in replayed]}") |
| print(f" Stats: {ep.statistics}") |
| print(f" β Temporal context model with Zipf-weighted retrieval") |
| |
| |
| print("\n --- Associative Memory (Modern Hopfield Network) ---") |
| am = AssociativeMemory(pattern_dim=32, beta=4.0, max_patterns=100) |
| |
| |
| patterns = [np.random.randn(32) for _ in range(10)] |
| for p in patterns: |
| am.store(p) |
| |
| |
| noisy = patterns[3] + np.random.randn(32) * 0.5 |
| retrieved, energy = am.retrieve(noisy, return_energy=True) |
| |
| |
| sims = [np.dot(retrieved / np.linalg.norm(retrieved), |
| p / np.linalg.norm(p)) for p in patterns] |
| best_match = np.argmax(sims) |
| print(f" Stored 10 patterns (dim=32)") |
| print(f" Noisy query (pattern 3 + noise) β retrieved pattern {best_match}") |
| print(f" Similarity to target: {sims[3]:.4f}") |
| print(f" Hopfield energy: {energy:.4f}") |
| |
| |
| blended, weights = am.retrieve_soft(noisy) |
| print(f" Soft retrieval weights (top 3): {sorted(weights)[-3:]}") |
| assert best_match == 3, ( |
| "expected best_match == 3 (associative retrieval of pattern 3) with numpy seed " |
| "42 set at module load, " |
| f"got best_match={best_match}" |
| ) |
| print(f" β Content-addressed retrieval via energy minimization") |
| print(f" Stats: {am.statistics}") |
|
|
|
|
| def test_causal_arena(): |
| """Test the causal arena with competing SCMs.""" |
| print("\n" + "=" * 60) |
| print("TEST 5: Causal Arena (Competing SCMs, Pearl's Do-Calculus)") |
| print("=" * 60) |
| |
| from tensegrity.causal.scm import StructuralCausalModel |
| from tensegrity.causal.arena import CausalArena |
| |
| |
| |
| |
| |
| |
| model_a = StructuralCausalModel(name="chain_XYZ") |
| model_a.add_variable("X", n_values=3) |
| model_a.add_variable("Y", n_values=3, parents=["X"]) |
| model_a.add_variable("Z", n_values=3, parents=["Y"]) |
| |
| model_b = StructuralCausalModel(name="fork_XZ_YZ") |
| model_b.add_variable("X", n_values=3) |
| model_b.add_variable("Y", n_values=3) |
| model_b.add_variable("Z", n_values=3, parents=["X", "Y"]) |
| |
| |
| true_model = StructuralCausalModel(name="truth") |
| true_model.add_variable("X", n_values=3) |
| true_model.add_variable("Y", n_values=3, parents=["X"]) |
| true_model.add_variable("Z", n_values=3, parents=["Y"]) |
| |
| |
| training_data = true_model.sample(100) |
| model_a.update_from_data(training_data) |
| model_b.update_from_data(training_data) |
| |
| |
| arena = CausalArena(falsification_threshold=-200.0, min_models=2) |
| arena.register_model(model_a) |
| arena.register_model(model_b) |
| |
| print(f" True structure: X β Y β Z (chain)") |
| print(f" Model A: X β Y β Z (correct hypothesis)") |
| print(f" Model B: X β Z, Y β Z (wrong hypothesis)") |
| print(f"\n Running competition over 50 observations...") |
| |
| |
| test_data = true_model.sample(50) |
| |
| for i, obs in enumerate(test_data): |
| result = arena.compete(obs) |
| if i < 3 or i >= 47: |
| print(f" Obs {i:2d}: winner={result['winner']}, " |
| f"tension={result['tension']:.4f}") |
| |
| final = arena.statistics |
| print(f"\n Final posterior: {final['posterior']}") |
| print(f" Winner: {final['current_winner']}") |
| print(f" Tension: {final['current_tension']:.4f}") |
| print(f" β Correct model wins via Bayesian model comparison") |
| |
| |
| print(f"\n --- Do-Calculus (Intervention) ---") |
| mutilated = model_a.do({"X": 0}) |
| interventional_samples = mutilated.sample(10) |
| print(f" do(X=0) β sampled 10 outcomes") |
| print(f" Y values under do(X=0): {[s.get('Y', '?') for s in interventional_samples[:5]]}") |
| |
| |
| print(f"\n --- Counterfactual Reasoning ---") |
| evidence = {"X": 0, "Y": 1, "Z": 2} |
| cf = model_a.counterfactual(evidence, {"X": 2}, ["Y", "Z"]) |
| print(f" Evidence: X=0, Y=1, Z=2") |
| print(f" Counterfactual: What if X had been 2?") |
| print(f" P(Y | do(X=2), evidence): {cf.get('Y', 'N/A')}") |
| print(f" P(Z | do(X=2), evidence): {cf.get('Z', 'N/A')}") |
| print(f" β Three rungs of Pearl's hierarchy working") |
| |
| |
| print(f"\n --- Epistemic Action (Experiment Suggestion) ---") |
| experiment = arena.suggest_experiment() |
| print(f" Suggested experiment: {experiment['intervention']}") |
| print(f" Expected info gain: {experiment['expected_info_gain']:.4f}") |
| assert final["current_winner"] in final["posterior"] |
| print(f" β Active inference drives epistemic exploration") |
|
|
|
|
| def test_full_agent(): |
| """Test the complete agent loop.""" |
| print("\n" + "=" * 60) |
| print("TEST 6: Full Agent Loop (Perceive β Plan β Act β Learn)") |
| print("=" * 60) |
| |
| from tensegrity.legacy.v1.agent import TensegrityAgent |
| |
| agent = TensegrityAgent( |
| n_states=8, |
| n_observations=16, |
| n_actions=4, |
| sensory_dims=3, |
| sensory_bits=6, |
| context_dim=32, |
| associative_dim=64, |
| planning_horizon=2, |
| precision=4.0, |
| zipf_exponent=1.0 |
| ) |
| |
| print(f" Agent: {agent}") |
| print(f"\n Running 30-step perception-action loop...") |
| |
| |
| for t in range(30): |
| |
| raw = np.array([ |
| np.sin(t * 0.3) * 50 + 100, |
| np.cos(t * 0.3) * 50 + 100, |
| t * 2.0 + np.random.randn() * 5 |
| ]) |
| |
| |
| result = agent.perceive(raw) |
| |
| |
| action_result = agent.act() |
| |
| if t < 3 or t >= 27 or t == 15: |
| print(f" t={t:2d}: F={result['free_energy']:.2f}, " |
| f"surprise={result['surprise']:.2f}, " |
| f"tension={result['arena']['tension']:.3f}, " |
| f"action={action_result.get('action', action_result.get('type', '?'))}") |
| |
| |
| replay_result = agent.experience_replay(n_episodes=10) |
| print(f"\n Experience replay: {replay_result}") |
| |
| |
| cf = agent.counterfactual( |
| evidence={'state': 0, 'observation': 1}, |
| intervention={'state': 3}, |
| query=['observation'] |
| ) |
| print(f"\n Counterfactual query: 'What if state had been 3?'") |
| for model_name, predictions in cf.items(): |
| print(f" {model_name}: {predictions}") |
| |
| |
| intro = agent.introspect() |
| print(f"\n === AGENT INTROSPECTION ===") |
| print(f" Steps: {intro['step']}") |
| print(f" Avg surprise: {intro['average_surprise']:.4f}") |
| print(f" Avg free energy: {intro['average_free_energy']:.4f}") |
| print(f" Arena tension: {intro['arena']['current_tension']:.4f}") |
| print(f" Arena winner: {intro['arena']['current_winner']}") |
| print(f" Epistemic memory entropy: {intro['epistemic_memory']['entropy']}") |
| print(f" Episodic memory: {intro['episodic_memory']['count']} episodes") |
| print(f" Associative memory: {intro['associative_memory']['n_patterns']} patterns") |
| print(f" Inference engine: {intro['inference']['steps']} steps, " |
| f"F={intro['inference']['current_F']:.3f}") |
| |
| |
| F_traj = intro['free_energy_trajectory'] |
| if len(F_traj) >= 2: |
| print(f"\n Free energy trajectory (last 5): {[f'{f:.2f}' for f in F_traj[-5:]]}") |
| |
| tension_traj = intro['tension_trajectory'] |
| if len(tension_traj) >= 2: |
| print(f" Tension trajectory (last 5): {[f'{t:.3f}' for t in tension_traj[-5:]]}") |
| |
| print(f"\n β Full agent loop running β zero gradient descent, zero backpropagation") |
|
|
|
|
| def main(): |
| """Run all tests.""" |
| print("\n" + "β" * 60) |
| print(" TENSEGRITY: Non-Gradient Cognitive Architecture") |
| print(" Friston Γ Pearl Γ Markov Γ Bayes Γ Zipf Γ Morton") |
| print("β" * 60) |
| |
| tests = [ |
| ("Morton Encoding", test_morton_encoding), |
| ("Free Energy Engine", test_free_energy_engine), |
| ("Belief Propagation", test_belief_propagation), |
| ("Memory Systems", test_memory_systems), |
| ("Causal Arena", test_causal_arena), |
| ("Full Agent", test_full_agent), |
| ] |
| |
| results = [] |
| for name, test_fn in tests: |
| try: |
| success = test_fn() |
| results.append((name, success)) |
| except Exception as e: |
| print(f"\n β FAILED: {e}") |
| traceback.print_exc() |
| results.append((name, False)) |
| |
| print("\n" + "=" * 60) |
| print(" RESULTS SUMMARY") |
| print("=" * 60) |
| for name, success in results: |
| status = "β PASS" if success else "β FAIL" |
| print(f" {status}: {name}") |
| |
| n_pass = sum(1 for _, s in results if s) |
| print(f"\n {n_pass}/{len(results)} tests passed") |
| |
| return all(s for _, s in results) |
|
|
|
|
| if __name__ == "__main__": |
| success = main() |
| sys.exit(0 if success else 1) |
|
|