| | """Tests for cycle-accurate LIF simulator.
|
| |
|
| | These tests verify the simulator matches the RTL behavior in scalable_core_v2.v.
|
| | P20 update: noise, dual traces, delays, formats, microcode, hierarchical routing.
|
| | """
|
| |
|
| | import pytest
|
| | import sys, os
|
| | sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
| |
|
| | import neurocore as nc
|
| | from neurocore.constants import (
|
| | DEFAULT_THRESHOLD, DEFAULT_LEAK, DEFAULT_REFRAC, NEURONS_PER_CORE,
|
| | TRACE_MAX, DEFAULT_TAU1, DEFAULT_TAU2,
|
| | )
|
| |
|
| |
|
| | class TestSingleNeuron:
|
| | def test_constant_input_spike_timing(self):
|
| | """With threshold=1000, leak=3, constant input=200:
|
| | Each timestep adds (200 - 3) = 197 to potential.
|
| | Spike at timestep where cumulative >= 1000.
|
| | ceil(1000 / 197) = 6 timesteps.
|
| |
|
| | t0: 0 + 200 - 3 = 197
|
| | t1: 197 + 200 - 3 = 394
|
| | t2: 394 + 200 - 3 = 591
|
| | t3: 591 + 200 - 3 = 788
|
| | t4: 788 + 200 - 3 = 985 (< 1000)
|
| | t5: 985 + 200 - 3 = 1182 >= 1000 -> SPIKE at t5
|
| | """
|
| | net = nc.Network()
|
| | pop = net.population(1, params={"threshold": 1000, "leak": 3})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | spike_times = []
|
| | for t in range(20):
|
| | sim.inject(pop, current=200)
|
| | result = sim.run(1)
|
| | if result.total_spikes > 0:
|
| | spike_times.append(t)
|
| |
|
| | assert spike_times[0] == 5
|
| |
|
| | def test_refractory_period(self):
|
| | """After spiking, neuron should be silent for refrac_period timesteps."""
|
| | net = nc.Network()
|
| | pop = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 3})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | spike_times = []
|
| | for t in range(20):
|
| | sim.inject(pop, current=200)
|
| | result = sim.run(1)
|
| | if result.total_spikes > 0:
|
| | spike_times.append(t)
|
| |
|
| | assert spike_times[0] == 0
|
| | assert spike_times[1] == 4
|
| |
|
| | def test_subthreshold_decay_to_resting(self):
|
| | """If input is less than leak, potential should floor to resting."""
|
| | net = nc.Network()
|
| | pop = net.population(1, params={"threshold": 1000, "leak": 100, "resting": 0})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(pop, current=50)
|
| | result = sim.run(1)
|
| | assert result.total_spikes == 0
|
| | assert int(sim._potential[0]) == 0
|
| |
|
| |
|
| | class TestChainPropagation:
|
| | def test_spike_chain(self, chain_network_manual):
|
| | """N0 -> N1 -> N2 -> N3 with weight=1200, stimulus N0."""
|
| | net, n0, n1, n2, n3 = chain_network_manual
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(n0, current=1200)
|
| | result = sim.run(10)
|
| |
|
| | assert result.total_spikes >= 4
|
| |
|
| | p = result.placement
|
| | gid0 = p.neuron_map[(n0.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n0.id, 0)][1]
|
| | gid1 = p.neuron_map[(n1.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n1.id, 0)][1]
|
| | gid2 = p.neuron_map[(n2.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n2.id, 0)][1]
|
| | gid3 = p.neuron_map[(n3.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n3.id, 0)][1]
|
| |
|
| | assert 0 in result.spike_trains.get(gid0, [])
|
| | assert 1 in result.spike_trains.get(gid1, [])
|
| | assert 2 in result.spike_trains.get(gid2, [])
|
| | assert 3 in result.spike_trains.get(gid3, [])
|
| |
|
| |
|
| | class TestInhibition:
|
| | def test_inhibitory_weight_prevents_spike(self):
|
| | """Negative weight should reduce potential."""
|
| | net = nc.Network()
|
| | exc = net.population(1, label="exc")
|
| | inh = net.population(1, label="inh")
|
| | target = net.population(1, label="target")
|
| |
|
| | net.connect(exc, target, topology="all_to_all", weight=500)
|
| | net.connect(inh, target, topology="all_to_all", weight=-600)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(exc, current=1200)
|
| | sim.inject(inh, current=1200)
|
| | result = sim.run(5)
|
| |
|
| | p = result.placement
|
| | tgt_gid = p.neuron_map[(target.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(target.id, 0)][1]
|
| | tgt_spikes = result.spike_trains.get(tgt_gid, [])
|
| | assert 1 not in tgt_spikes
|
| |
|
| |
|
| | class TestGradedSpikes:
|
| | def test_graded_payload_scaling(self):
|
| | """With graded enabled, spike payload should scale delivered current."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(1, params={"threshold": 1000, "leak": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=200)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(graded=True)
|
| |
|
| | sim.inject(src, current=500)
|
| | result = sim.run(3)
|
| |
|
| | p = result.placement
|
| | tgt_gid = p.neuron_map[(tgt.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(tgt.id, 0)][1]
|
| | assert 1 not in result.spike_trains.get(tgt_gid, [])
|
| |
|
| |
|
| | class TestDendriticCompartments:
|
| | def test_dendritic_threshold(self):
|
| | """Dendritic input below threshold should be suppressed."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(1, params={
|
| | "threshold": 1000, "leak": 0, "dend_threshold": 500
|
| | })
|
| | net.connect(src, tgt, topology="all_to_all", weight=200, compartment=1)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(src, current=200)
|
| | result = sim.run(5)
|
| |
|
| | p = result.placement
|
| | tgt_gid = p.neuron_map[(tgt.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(tgt.id, 0)][1]
|
| | assert len(result.spike_trains.get(tgt_gid, [])) == 0
|
| |
|
| |
|
| | class TestAsyncMode:
|
| | """Tests for P12 GALS async event-driven simulation."""
|
| |
|
| | def test_basic_async_propagation(self, chain_network_manual):
|
| | """Chain N0->N1->N2->N3 should propagate in async mode."""
|
| | net, n0, n1, n2, n3 = chain_network_manual
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(async_mode=True)
|
| |
|
| | sim.inject(n0, current=1200)
|
| | result = sim.run(1)
|
| |
|
| | assert result.total_spikes == 4
|
| |
|
| | p = result.placement
|
| | gid0 = p.neuron_map[(n0.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n0.id, 0)][1]
|
| | gid1 = p.neuron_map[(n1.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n1.id, 0)][1]
|
| | gid2 = p.neuron_map[(n2.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n2.id, 0)][1]
|
| | gid3 = p.neuron_map[(n3.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n3.id, 0)][1]
|
| |
|
| | assert 0 in result.spike_trains.get(gid0, [])
|
| | assert 0 in result.spike_trains.get(gid1, [])
|
| | assert 0 in result.spike_trains.get(gid2, [])
|
| | assert 0 in result.spike_trains.get(gid3, [])
|
| |
|
| | def test_quiescence_single_neuron(self):
|
| | """Isolated neuron with no connections — activity dies immediately."""
|
| | net = nc.Network()
|
| | pop = net.population(1, params={"threshold": 100, "leak": 0})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(async_mode=True)
|
| |
|
| | sim.inject(pop, current=200)
|
| | result = sim.run(1)
|
| | assert result.total_spikes == 1
|
| |
|
| | def test_async_sync_equivalence(self):
|
| | """Critical test: async mode must produce identical spike counts
|
| | to sync mode for accumulation-dominated workloads."""
|
| | def build_and_run(async_mode):
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 1000, "leak": 3, "refrac": 3})
|
| | tgt = net.population(1, params={"threshold": 1000, "leak": 3, "refrac": 3})
|
| | net.connect(src, tgt, topology="all_to_all", weight=1200)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(async_mode=async_mode)
|
| |
|
| | total = 0
|
| | for _ in range(10):
|
| | sim.inject(src, current=200)
|
| | result = sim.run(1)
|
| | total += result.total_spikes
|
| | return total
|
| |
|
| | sync_spikes = build_and_run(async_mode=False)
|
| | async_spikes = build_and_run(async_mode=True)
|
| |
|
| | assert sync_spikes == async_spikes, (
|
| | f"Sync ({sync_spikes}) != Async ({async_spikes}) — equivalence broken!")
|
| |
|
| | def test_async_chain_collapses_to_one_timestep(self):
|
| | """In async mode, a spike chain propagates within a single timestep."""
|
| | net = nc.Network()
|
| | n0 = net.population(1, params={"threshold": 100, "leak": 0}, label="n0")
|
| | n1 = net.population(1, params={"threshold": 100, "leak": 0}, label="n1")
|
| | n2 = net.population(1, params={"threshold": 100, "leak": 0}, label="n2")
|
| | n3 = net.population(1, params={"threshold": 100, "leak": 0}, label="n3")
|
| | net.connect(n0, n1, topology="all_to_all", weight=200)
|
| | net.connect(n1, n2, topology="all_to_all", weight=200)
|
| | net.connect(n2, n3, topology="all_to_all", weight=200)
|
| |
|
| |
|
| | sim_sync = nc.Simulator()
|
| | sim_sync.deploy(net)
|
| | sim_sync.inject(n0, current=200)
|
| | result_sync = sim_sync.run(1)
|
| | assert result_sync.total_spikes == 1
|
| |
|
| |
|
| | sim_async = nc.Simulator()
|
| | sim_async.deploy(net)
|
| | sim_async.set_learning(async_mode=True)
|
| | sim_async.inject(n0, current=200)
|
| | result_async = sim_async.run(1)
|
| | assert result_async.total_spikes == 4
|
| |
|
| | def test_async_multi_population(self):
|
| | """E/I network should work in async mode."""
|
| | net = nc.Network()
|
| | exc = net.population(8, params={"threshold": 500, "leak": 2, "refrac": 2})
|
| | inh = net.population(4, params={"threshold": 400, "leak": 2, "refrac": 2})
|
| | net.connect(exc, inh, topology="fixed_fan_out", fan_out=4, weight=250, seed=42)
|
| | net.connect(inh, exc, topology="fixed_fan_out", fan_out=8, weight=-200, seed=42)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(async_mode=True)
|
| |
|
| | sim.inject(exc[:4], current=600)
|
| | result = sim.run(5)
|
| |
|
| | assert result.total_spikes > 0
|
| | assert result.timesteps == 5
|
| |
|
| | def test_async_no_input_no_spikes(self):
|
| | """No stimulus -> no activity in async mode."""
|
| | net = nc.Network()
|
| | net.population(16, params={"threshold": 500, "leak": 2})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(async_mode=True)
|
| |
|
| | result = sim.run(10)
|
| | assert result.total_spikes == 0
|
| |
|
| | def test_async_inter_core_routing(self):
|
| | """Spikes should propagate across cores in async mode."""
|
| | net = nc.Network()
|
| | a = net.population(NEURONS_PER_CORE, label="core0")
|
| | b = net.population(1, params={"threshold": 100, "leak": 0}, label="core1")
|
| | net.connect(a, b, topology="all_to_all", weight=200)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(async_mode=True)
|
| |
|
| | sim.inject(a[0], current=1200)
|
| | result = sim.run(1)
|
| |
|
| | p = result.placement
|
| | b_gid = p.neuron_map[(b.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(b.id, 0)][1]
|
| | assert 0 in result.spike_trains.get(b_gid, []), \
|
| | "Inter-core spike failed to propagate in async mode"
|
| |
|
| |
|
| | class TestThreeFactorLearning:
|
| | """Tests for P13c 3-factor learning with eligibility traces."""
|
| |
|
| | def test_eligibility_accumulation_no_weight_change(self):
|
| | """Without reward, STDP correlation accumulates eligibility but
|
| | doesn't change weights."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=500)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(learn=True, three_factor=True)
|
| |
|
| |
|
| | sim.inject(src, current=200)
|
| | sim.inject(tgt, current=200)
|
| | sim.run(5)
|
| |
|
| |
|
| | assert len(sim._eligibility) > 0, "Eligibility should accumulate"
|
| |
|
| |
|
| | adj = sim._adjacency
|
| | for targets in adj.values():
|
| | for entry in targets:
|
| | w = entry[1]
|
| | assert w == 500, f"Weight changed without reward: {w}"
|
| |
|
| | def test_reward_changes_weights(self):
|
| | """Positive reward should change weights when eligibility exists."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=500)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(learn=True, three_factor=True)
|
| |
|
| |
|
| | for _ in range(3):
|
| | sim.inject(src, current=200)
|
| | sim.inject(tgt, current=200)
|
| | sim.run(1)
|
| |
|
| |
|
| | sim.reward(500)
|
| | sim.run(1)
|
| |
|
| |
|
| | weight_changed = False
|
| | for targets in sim._adjacency.values():
|
| | for entry in targets:
|
| | w = entry[1]
|
| | if w != 500:
|
| | weight_changed = True
|
| | assert weight_changed, "Reward should modify weights via eligibility"
|
| |
|
| | def test_negative_reward_weakens(self):
|
| | """Negative reward should decrease weights for positive eligibility."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=500)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(learn=True, three_factor=True)
|
| |
|
| |
|
| | for _ in range(3):
|
| | sim.inject(src, current=200)
|
| | sim.run(1)
|
| |
|
| |
|
| | sim.reward(-500)
|
| | sim.run(1)
|
| |
|
| |
|
| | for targets in sim._adjacency.values():
|
| | for entry in targets:
|
| | w = entry[1]
|
| | if w != 500:
|
| |
|
| | assert w < 500, f"Expected weight < 500, got {w}"
|
| |
|
| | def test_eligibility_decays(self):
|
| | """Eligibility should decay over time without reward."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 1})
|
| | tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 1})
|
| | net.connect(src, tgt, topology="all_to_all", weight=500)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(learn=True, three_factor=True)
|
| |
|
| |
|
| |
|
| |
|
| | sim.inject(src, current=200)
|
| | sim.run(1)
|
| |
|
| |
|
| | sim.run(1)
|
| |
|
| | assert len(sim._eligibility) > 0, \
|
| | "Eligibility should accumulate from temporal correlation"
|
| |
|
| |
|
| | for _ in range(100):
|
| | sim.run(1)
|
| |
|
| | assert len(sim._eligibility) == 0, \
|
| | "Eligibility should fully decay without reinforcement"
|
| |
|
| | def test_delayed_reward(self):
|
| | """Reward arriving after delay should still modify weights
|
| | (eligibility hasn't fully decayed)."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=500)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(learn=True, three_factor=True)
|
| |
|
| |
|
| | sim.inject(src, current=200)
|
| | sim.inject(tgt, current=200)
|
| | sim.run(1)
|
| |
|
| |
|
| | sim.run(3)
|
| | assert len(sim._eligibility) > 0, "Eligibility should persist briefly"
|
| |
|
| |
|
| | sim.reward(500)
|
| | sim.run(1)
|
| |
|
| |
|
| | weight_changed = False
|
| | for targets in sim._adjacency.values():
|
| | for entry in targets:
|
| | w = entry[1]
|
| | if w != 500:
|
| | weight_changed = True
|
| | assert weight_changed, "Delayed reward should still modify weights"
|
| |
|
| | def test_three_factor_implies_learn(self):
|
| | """Setting three_factor=True should auto-enable learn."""
|
| | sim = nc.Simulator()
|
| | net = nc.Network()
|
| | net.population(1)
|
| | sim.deploy(net)
|
| | sim.set_learning(three_factor=True)
|
| | assert sim._learn_enable is True
|
| | assert sim._three_factor_enable is True
|
| |
|
| |
|
| | class TestRunResult:
|
| | def test_result_fields(self, chain_network_manual):
|
| | net, n0, _, _, _ = chain_network_manual
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(n0, current=1200)
|
| | result = sim.run(10)
|
| | assert result.backend == "simulator"
|
| | assert result.timesteps == 10
|
| | assert isinstance(result.spike_trains, dict)
|
| |
|
| | def test_firing_rates(self, chain_network_manual):
|
| | net, n0, _, _, _ = chain_network_manual
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(n0, current=1200)
|
| | result = sim.run(10)
|
| | rates = result.firing_rates()
|
| | assert isinstance(rates, dict)
|
| | assert all(r >= 0 for r in rates.values())
|
| |
|
| | def test_spike_count_timeseries(self, chain_network_manual):
|
| | net, n0, _, _, _ = chain_network_manual
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(n0, current=1200)
|
| | result = sim.run(10)
|
| | ts = result.spike_count_timeseries()
|
| | assert len(ts) == 10
|
| |
|
| |
|
| | class TestStochasticNoise:
|
| | """Tests for P14 stochastic noise injection."""
|
| |
|
| | def test_noise_disabled_deterministic(self):
|
| | """With noise_enable=False, identical runs produce identical results."""
|
| | def run_once():
|
| | net = nc.Network()
|
| | pop = net.population(4, params={"threshold": 500, "leak": 3})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | total = 0
|
| | for _ in range(20):
|
| | sim.inject(pop, current=100)
|
| | result = sim.run(1)
|
| | total += result.total_spikes
|
| | return total
|
| |
|
| | assert run_once() == run_once()
|
| |
|
| | def test_noise_enabled_variability(self):
|
| | """With noise_enable=True and non-zero config, results vary due to
|
| | different LFSR evolution per neuron (different noise sequences for
|
| | neurons near threshold)."""
|
| | net = nc.Network()
|
| |
|
| | pop = net.population(16, params={
|
| | "threshold": 200, "leak": 0, "refrac": 0,
|
| | "noise_config": 0x34
|
| | })
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(noise=True)
|
| |
|
| |
|
| | sim.inject(pop, current=200)
|
| | result = sim.run(20)
|
| |
|
| |
|
| |
|
| | trains = result.spike_trains
|
| | spike_sets = [set(trains.get(i, [])) for i in range(16)]
|
| |
|
| |
|
| | unique_patterns = len(set(frozenset(s) for s in spike_sets))
|
| | assert unique_patterns > 1, \
|
| | "All neurons had identical spike patterns despite noise"
|
| |
|
| | def test_zero_config_still_deterministic(self):
|
| | """noise_enable=True but noise_config=0 means no actual noise."""
|
| | def run_once():
|
| | net = nc.Network()
|
| | pop = net.population(4, params={"threshold": 500, "leak": 3})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(noise=True)
|
| | total = 0
|
| | for _ in range(20):
|
| | sim.inject(pop, current=100)
|
| | result = sim.run(1)
|
| | total += result.total_spikes
|
| | return total
|
| |
|
| | assert run_once() == run_once()
|
| |
|
| | def test_noise_config_generates_commands(self):
|
| | """Non-default noise_config should generate PROG_NEURON param_id=5."""
|
| | net = nc.Network()
|
| | net.population(2, params={"noise_config": 0x45})
|
| | from neurocore.compiler import Compiler
|
| | compiled = Compiler().compile(net)
|
| | noise_cmds = [c for c in compiled.prog_neuron_cmds if c["param_id"] == 5]
|
| | assert len(noise_cmds) == 2
|
| | assert noise_cmds[0]["value"] == 0x45
|
| |
|
| |
|
| | class TestDualTraces:
|
| | """Tests for P15 dual spike traces with exponential decay."""
|
| |
|
| | def test_both_traces_set_on_spike(self):
|
| | """After spiking, both trace and trace2 should be TRACE_MAX."""
|
| | net = nc.Network()
|
| | pop = net.population(1, params={"threshold": 100, "leak": 0})
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(pop, current=200)
|
| | sim.run(1)
|
| |
|
| | assert int(sim._trace[0]) == TRACE_MAX
|
| | assert int(sim._trace2[0]) == TRACE_MAX
|
| |
|
| | def test_different_decay_rates(self):
|
| | """tau1=2 should decay faster than tau2=6."""
|
| | net = nc.Network()
|
| | pop = net.population(1, params={
|
| | "threshold": 100, "leak": 0, "refrac": 0,
|
| | "tau1": 2, "tau2": 6
|
| | })
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(pop, current=200)
|
| | sim.run(1)
|
| |
|
| |
|
| | sim.run(5)
|
| |
|
| | trace1 = int(sim._trace[0])
|
| | trace2 = int(sim._trace2[0])
|
| | assert trace1 < trace2, \
|
| | f"trace1 ({trace1}) should be < trace2 ({trace2}) with faster decay"
|
| |
|
| | def test_min_step_1_convergence(self):
|
| | """Traces should reach 0 (no stuck values) via min-step-1."""
|
| | net = nc.Network()
|
| | pop = net.population(1, params={
|
| | "threshold": 100, "leak": 0, "refrac": 0,
|
| | "tau1": 8, "tau2": 8
|
| | })
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| | sim.inject(pop, current=200)
|
| | sim.run(1)
|
| |
|
| |
|
| | sim.run(200)
|
| | assert int(sim._trace[0]) == 0
|
| | assert int(sim._trace2[0]) == 0
|
| |
|
| | def test_stdp_uses_trace1(self):
|
| | """STDP weight updates should use trace1 only (backward compat)."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=500)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.set_learning(learn=True)
|
| |
|
| |
|
| | sim.inject(src, current=200)
|
| | sim.run(1)
|
| | sim.run(1)
|
| |
|
| |
|
| | adj = sim._adjacency
|
| | for targets in adj.values():
|
| | for entry in targets:
|
| | w = entry[1]
|
| | assert w > 500, f"Expected LTP weight increase, got {w}"
|
| |
|
| | def test_default_tau_values(self):
|
| | """Default tau1=3, tau2=4 should be set."""
|
| | net = nc.Network()
|
| | pop = net.population(1)
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | assert int(sim._tau1[0]) == DEFAULT_TAU1
|
| | assert int(sim._tau2[0]) == DEFAULT_TAU2
|
| |
|
| | def test_tau_generates_commands(self):
|
| | """Non-default tau values should generate PROG_NEURON commands."""
|
| | net = nc.Network()
|
| | net.population(2, params={"tau1": 5, "tau2": 7})
|
| | from neurocore.compiler import Compiler
|
| | compiled = Compiler().compile(net)
|
| | tau1_cmds = [c for c in compiled.prog_neuron_cmds if c["param_id"] == 6]
|
| | tau2_cmds = [c for c in compiled.prog_neuron_cmds if c["param_id"] == 7]
|
| | assert len(tau1_cmds) == 2
|
| | assert len(tau2_cmds) == 2
|
| | assert tau1_cmds[0]["value"] == 5
|
| | assert tau2_cmds[0]["value"] == 7
|
| |
|
| |
|
| | class TestAxonDelays:
|
| | """Tests for P17 axon delays."""
|
| |
|
| | def test_delay_zero_backward_compat(self):
|
| | """Chain with delay=0 should behave identically to original."""
|
| | net = nc.Network()
|
| | n0 = net.population(1, params={"threshold": 100, "leak": 0}, label="n0")
|
| | n1 = net.population(1, params={"threshold": 100, "leak": 0}, label="n1")
|
| | net.connect(n0, n1, topology="all_to_all", weight=200, delay=0)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(n0, current=200)
|
| | result = sim.run(5)
|
| |
|
| | p = result.placement
|
| | gid1 = p.neuron_map[(n1.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n1.id, 0)][1]
|
| | assert 1 in result.spike_trains.get(gid1, []), \
|
| | "N1 should spike at t=1 with delay=0"
|
| |
|
| | def test_delay_3_shifts_spike(self):
|
| | """With delay=3, target should spike 3 timesteps later than delay=0."""
|
| | net = nc.Network()
|
| | n0 = net.population(1, params={"threshold": 100, "leak": 0}, label="n0")
|
| | n1 = net.population(1, params={"threshold": 100, "leak": 0}, label="n1")
|
| | net.connect(n0, n1, topology="all_to_all", weight=200, delay=3)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(n0, current=200)
|
| | result = sim.run(10)
|
| |
|
| | p = result.placement
|
| | gid1 = p.neuron_map[(n1.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(n1.id, 0)][1]
|
| | spikes_n1 = result.spike_trains.get(gid1, [])
|
| |
|
| |
|
| |
|
| | assert len(spikes_n1) > 0, "N1 should eventually spike"
|
| | assert spikes_n1[0] > 1, \
|
| | f"N1 first spike at t={spikes_n1[0]}, should be delayed past t=1"
|
| |
|
| | def test_mixed_delays(self):
|
| | """Two targets with different delays should spike at different times."""
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0}, label="src")
|
| | fast = net.population(1, params={"threshold": 100, "leak": 0}, label="fast")
|
| | slow = net.population(1, params={"threshold": 100, "leak": 0}, label="slow")
|
| | net.connect(src, fast, topology="all_to_all", weight=200, delay=1)
|
| | net.connect(src, slow, topology="all_to_all", weight=200, delay=5)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(src, current=200)
|
| | result = sim.run(10)
|
| |
|
| | p = result.placement
|
| | gid_fast = p.neuron_map[(fast.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(fast.id, 0)][1]
|
| | gid_slow = p.neuron_map[(slow.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(slow.id, 0)][1]
|
| | fast_spikes = result.spike_trains.get(gid_fast, [])
|
| | slow_spikes = result.spike_trains.get(gid_slow, [])
|
| | assert len(fast_spikes) > 0 and len(slow_spikes) > 0
|
| | assert fast_spikes[0] < slow_spikes[0], \
|
| | f"Fast ({fast_spikes[0]}) should spike before slow ({slow_spikes[0]})"
|
| |
|
| | def test_delay_validation(self):
|
| | """Invalid delay values should raise ValueError."""
|
| | net = nc.Network()
|
| | src = net.population(1)
|
| | tgt = net.population(1)
|
| | with pytest.raises(ValueError):
|
| | net.connect(src, tgt, weight=200, delay=-1)
|
| | with pytest.raises(ValueError):
|
| | net.connect(src, tgt, weight=200, delay=64)
|
| |
|
| | def test_delay_generates_commands(self):
|
| | """delay>0 should generate PROG_DELAY commands in compiler."""
|
| | net = nc.Network()
|
| | src = net.population(2)
|
| | tgt = net.population(2)
|
| | net.connect(src, tgt, topology="all_to_all", weight=200, delay=5)
|
| | from neurocore.compiler import Compiler
|
| | compiled = Compiler().compile(net)
|
| | assert len(compiled.prog_delay_cmds) == 4
|
| | assert all(c["delay"] == 5 for c in compiled.prog_delay_cmds)
|
| |
|
| |
|
| | class TestSynapseFormats:
|
| | """Tests for P18 synapse formats (sparse, dense, pop)."""
|
| |
|
| | def test_sparse_backward_compat(self):
|
| | """Default format='sparse' should behave identically to pre-P18."""
|
| | net = nc.Network()
|
| | src = net.population(2, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(2, params={"threshold": 100, "leak": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=200, format='sparse')
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(src, current=200)
|
| | result = sim.run(5)
|
| |
|
| |
|
| | p = result.placement
|
| | gid_t0 = p.neuron_map[(tgt.id, 0)][0] * NEURONS_PER_CORE + p.neuron_map[(tgt.id, 0)][1]
|
| | gid_t1 = p.neuron_map[(tgt.id, 1)][0] * NEURONS_PER_CORE + p.neuron_map[(tgt.id, 1)][1]
|
| | assert 1 in result.spike_trains.get(gid_t0, [])
|
| | assert 1 in result.spike_trains.get(gid_t1, [])
|
| |
|
| | def test_dense_all_to_all(self):
|
| | """Dense format with all_to_all should produce same spikes as sparse."""
|
| | def run_with_format(fmt):
|
| | net = nc.Network()
|
| | src = net.population(2, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(2, params={"threshold": 100, "leak": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=200, format=fmt)
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(src, current=200)
|
| | result = sim.run(5)
|
| | return result.total_spikes
|
| |
|
| | sparse_spikes = run_with_format('sparse')
|
| | dense_spikes = run_with_format('dense')
|
| | assert sparse_spikes == dense_spikes, \
|
| | f"Dense ({dense_spikes}) should match sparse ({sparse_spikes})"
|
| |
|
| | def test_pop_shared_weight(self):
|
| | """Pop format should produce same spikes as sparse with uniform weights."""
|
| | def run_with_format(fmt):
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(4, params={"threshold": 100, "leak": 0})
|
| | net.connect(src, tgt, topology="all_to_all", weight=300, format=fmt)
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(src, current=200)
|
| | result = sim.run(5)
|
| | return result.total_spikes
|
| |
|
| | sparse_spikes = run_with_format('sparse')
|
| | pop_spikes = run_with_format('pop')
|
| | assert sparse_spikes == pop_spikes, \
|
| | f"Pop ({pop_spikes}) should match sparse ({sparse_spikes})"
|
| |
|
| | def test_compiler_format_in_index(self):
|
| | """Compiler should include format field in index commands."""
|
| | from neurocore.compiler import Compiler
|
| | from neurocore.constants import FMT_DENSE, FMT_POP
|
| |
|
| |
|
| | net = nc.Network()
|
| | src = net.population(1)
|
| | tgt = net.population(3)
|
| | net.connect(src, tgt, topology="all_to_all", weight=200, format='dense')
|
| | compiled = Compiler().compile(net)
|
| | assert len(compiled.prog_index_cmds) > 0
|
| | idx = compiled.prog_index_cmds[0]
|
| | assert idx["format"] == FMT_DENSE
|
| | assert "base_target" in idx
|
| |
|
| | def test_pop_format_single_pool_entry(self):
|
| | """Pop format should generate only 1 pool entry regardless of target count."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| | src = net.population(1)
|
| | tgt = net.population(4)
|
| | net.connect(src, tgt, topology="all_to_all", weight=200, format='pop')
|
| | compiled = Compiler().compile(net)
|
| |
|
| |
|
| | assert len(compiled.prog_pool_cmds) == 1
|
| |
|
| | assert compiled.prog_index_cmds[0]["count"] == 4
|
| |
|
| | def test_invalid_format_raises(self):
|
| | """Invalid format string should raise ValueError."""
|
| | net = nc.Network()
|
| | src = net.population(1)
|
| | tgt = net.population(1)
|
| | with pytest.raises(ValueError, match="Unknown format"):
|
| | net.connect(src, tgt, weight=200, format='invalid')
|
| |
|
| | def test_mixed_formats_same_network(self):
|
| | """Different connections can use different formats in one network."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0})
|
| | tgt_sparse = net.population(2, params={"threshold": 100, "leak": 0})
|
| | tgt_dense = net.population(2, params={"threshold": 100, "leak": 0})
|
| | net.connect(src, tgt_sparse, topology="all_to_all", weight=200, format='sparse')
|
| | net.connect(src, tgt_dense, topology="all_to_all", weight=200, format='dense')
|
| |
|
| | compiled = Compiler().compile(net)
|
| |
|
| | formats_used = set(idx["format"] for idx in compiled.prog_index_cmds)
|
| | assert len(formats_used) >= 1
|
| |
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| | sim.inject(src, current=200)
|
| | result = sim.run(5)
|
| | assert result.total_spikes > 0
|
| |
|
| |
|
| | class TestHierarchicalRouting:
|
| | """Tests for P20 hierarchical routing (local vs global routes)."""
|
| |
|
| | def test_intra_cluster_uses_local_routes(self):
|
| | """Routes within a cluster should use prog_route_cmds (local)."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| |
|
| | a = net.population(NEURONS_PER_CORE, label="core0")
|
| | b = net.population(1, params={"threshold": 100, "leak": 0}, label="core1")
|
| | net.connect(a, b, topology="all_to_all", weight=200)
|
| |
|
| | compiled = Compiler(cluster_size=4).compile(net)
|
| |
|
| | assert len(compiled.prog_route_cmds) > 0
|
| | assert len(compiled.prog_global_route_cmds) == 0
|
| |
|
| | def test_inter_cluster_uses_global_routes(self):
|
| | """Routes across clusters should use prog_global_route_cmds."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| |
|
| | b = net.population(NEURONS_PER_CORE, label="filler1")
|
| | c = net.population(NEURONS_PER_CORE, label="filler2")
|
| | d = net.population(NEURONS_PER_CORE, label="filler3")
|
| | net.connect(b, b, topology="one_to_one", weight=100)
|
| | net.connect(c, c, topology="one_to_one", weight=100)
|
| | net.connect(d, d, topology="one_to_one", weight=100)
|
| |
|
| | a = net.population(NEURONS_PER_CORE, label="src")
|
| | e = net.population(1, params={"threshold": 100, "leak": 0}, label="tgt")
|
| | net.connect(a, e, topology="all_to_all", weight=200)
|
| |
|
| | compiled = Compiler(cluster_size=4).compile(net)
|
| |
|
| | assert len(compiled.prog_global_route_cmds) > 0, \
|
| | f"Expected global routes, got local: {len(compiled.prog_route_cmds)}"
|
| |
|
| | def test_mixed_local_and_global(self):
|
| | """Source pop can have both local and global route targets."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| |
|
| |
|
| |
|
| | a = net.population(NEURONS_PER_CORE, label="src")
|
| | b = net.population(NEURONS_PER_CORE, label="local_tgt")
|
| | e = net.population(1, params={"threshold": 100, "leak": 0}, label="global_tgt")
|
| |
|
| | net.connect(a, a, topology="one_to_one", weight=50)
|
| | net.connect(b, b, topology="one_to_one", weight=50)
|
| | net.connect(a, b, topology="one_to_one", weight=200)
|
| | net.connect(a, e, topology="all_to_all", weight=200)
|
| |
|
| |
|
| |
|
| |
|
| | compiled = Compiler(cluster_size=2).compile(net)
|
| | assert len(compiled.prog_route_cmds) > 0, "Should have local routes (a->b)"
|
| | assert len(compiled.prog_global_route_cmds) > 0, "Should have global routes (a->e)"
|
| |
|
| | def test_global_route_overflow(self):
|
| | """Exceeding GLOBAL_ROUTE_SLOTS should raise RouteOverflowError."""
|
| | from neurocore.compiler import Compiler
|
| | from neurocore.exceptions import RouteOverflowError
|
| | from neurocore.constants import GLOBAL_ROUTE_SLOTS
|
| |
|
| | net = nc.Network()
|
| |
|
| | pops = [net.population(NEURONS_PER_CORE) for _ in range(GLOBAL_ROUTE_SLOTS + 2)]
|
| |
|
| | for tgt in pops[1:]:
|
| | net.connect(pops[0], tgt, topology="one_to_one", weight=200)
|
| |
|
| | with pytest.raises(RouteOverflowError):
|
| | Compiler(cluster_size=1).compile(net)
|
| |
|
| | def test_small_network_zero_global_routes(self):
|
| | """A network fitting in one cluster should have zero global routes."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| | a = net.population(4, params={"threshold": 100, "leak": 0})
|
| | b = net.population(4, params={"threshold": 100, "leak": 0})
|
| | net.connect(a, b, topology="all_to_all", weight=200)
|
| |
|
| | compiled = Compiler(cluster_size=4).compile(net)
|
| |
|
| | assert len(compiled.prog_global_route_cmds) == 0
|
| |
|
| | def test_custom_cluster_size(self):
|
| | """Changing cluster_size should change routing classification."""
|
| | from neurocore.compiler import Compiler
|
| |
|
| | net = nc.Network()
|
| | a = net.population(NEURONS_PER_CORE, label="core0")
|
| | b = net.population(1, params={"threshold": 100, "leak": 0}, label="core1")
|
| | net.connect(a, b, topology="all_to_all", weight=200)
|
| |
|
| |
|
| | compiled_4 = Compiler(cluster_size=4).compile(net)
|
| | assert len(compiled_4.prog_global_route_cmds) == 0
|
| |
|
| |
|
| | compiled_1 = Compiler(cluster_size=1).compile(net)
|
| | assert len(compiled_1.prog_global_route_cmds) > 0
|
| |
|
| |
|
| | class TestWeightMatrix:
|
| | """Test per-synapse weight_matrix connections."""
|
| |
|
| | def test_weight_matrix_basic(self):
|
| | """A 2x2 weight matrix should create per-synapse connections."""
|
| | import numpy as np
|
| |
|
| | net = nc.Network()
|
| | src = net.population(2, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(2, params={"threshold": 100, "leak": 0})
|
| |
|
| | wm = np.array([[500, 0], [0, 300]], dtype=np.int32)
|
| | net.connect(src, tgt, weight_matrix=wm)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| |
|
| | adj = sim._compiled.adjacency
|
| |
|
| | src0_gid = 0 * 1024 + 0
|
| | found_weights = {entry[1] for entry in adj.get(src0_gid, [])}
|
| | assert 500 in found_weights, f"Expected weight 500 in {found_weights}"
|
| |
|
| | def test_weight_matrix_shape_mismatch(self):
|
| | """Shape mismatch should raise ValueError."""
|
| | import numpy as np
|
| | from neurocore.exceptions import WeightOutOfRangeError
|
| |
|
| | net = nc.Network()
|
| | src = net.population(3)
|
| | tgt = net.population(2)
|
| |
|
| | wm = np.array([[1, 2]], dtype=np.int32)
|
| | with pytest.raises(ValueError, match="weight_matrix shape"):
|
| | net.connect(src, tgt, weight_matrix=wm)
|
| |
|
| | def test_weight_matrix_range_check(self):
|
| | """Weights outside int16 range should raise."""
|
| | import numpy as np
|
| | from neurocore.exceptions import WeightOutOfRangeError
|
| |
|
| | net = nc.Network()
|
| | src = net.population(2)
|
| | tgt = net.population(2)
|
| |
|
| | wm = np.array([[40000, 0], [0, 0]], dtype=np.int32)
|
| | with pytest.raises(WeightOutOfRangeError):
|
| | net.connect(src, tgt, weight_matrix=wm)
|
| |
|
| | def test_weight_matrix_zeros_skipped(self):
|
| | """Zero entries in weight_matrix should not create connections."""
|
| | import numpy as np
|
| |
|
| | net = nc.Network()
|
| | src = net.population(3, params={"threshold": 100, "leak": 0})
|
| | tgt = net.population(3, params={"threshold": 100, "leak": 0})
|
| |
|
| |
|
| | wm = np.diag([100, 200, 300]).astype(np.int32)
|
| | net.connect(src, tgt, weight_matrix=wm)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| |
|
| | total_conns = sum(len(v) for v in sim._compiled.adjacency.values())
|
| | assert total_conns == 3, f"Expected 3 connections, got {total_conns}"
|
| |
|
| | def test_weight_matrix_simulation(self):
|
| | """End-to-end: specific weight_matrix drives correct spike behavior."""
|
| | import numpy as np
|
| |
|
| | net = nc.Network()
|
| | src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| | tgt = net.population(2, params={"threshold": 500, "leak": 0, "refrac": 0})
|
| |
|
| |
|
| | wm = np.array([[600, 200]], dtype=np.int32)
|
| | net.connect(src, tgt, weight_matrix=wm)
|
| |
|
| | sim = nc.Simulator()
|
| | sim.deploy(net)
|
| |
|
| |
|
| | sim.inject(src, current=200)
|
| | sim.run(1)
|
| | result = sim.run(1)
|
| |
|
| |
|
| |
|
| | assert result.total_spikes >= 1
|
| |
|