"""Tests for GPU-accelerated LIF simulator.

Validates that GpuSimulator produces identical results to the CPU Simulator
across all features: single neuron, chains, inhibition, graded spikes,
dendritic compartments, noise, dual traces, axon delays, STDP, 3-factor.
"""

import pytest
import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

import neurocore as nc
from neurocore.constants import (
    DEFAULT_THRESHOLD, DEFAULT_LEAK, DEFAULT_REFRAC,
    NEURONS_PER_CORE, TRACE_MAX, DEFAULT_TAU1, DEFAULT_TAU2,
)

# Skip all tests if PyTorch/CUDA unavailable
torch = pytest.importorskip("torch")
pytestmark = pytest.mark.skipif(
    not torch.cuda.is_available(),
    reason="CUDA not available",
)


def _get_gpu_device():
    """Return the CUDA device the GPU tests run on.

    Uses ``cuda:1`` when more than one device is visible, else ``cuda:0``.
    NOTE(review): the reason for preferring the second device is not visible
    in this file -- confirm it is still intended.
    """
    if torch.cuda.device_count() > 1:
        return torch.device("cuda:1")
    return torch.device("cuda:0")


def _gid(placement, pop, neuron_idx=0):
    """Helper: population neuron index -> global ID.

    Looks up ``(core, nid)`` in ``placement.neuron_map`` and flattens it as
    ``core * NEURONS_PER_CORE + nid``.
    """
    core, nid = placement.neuron_map[(pop.id, neuron_idx)]
    return core * NEURONS_PER_CORE + nid


def _run_cpu(net, stimulus_fn, timesteps, learn_cfg=None):
    """Run network on CPU simulator with given stimulus pattern.

    ``learn_cfg``, when non-empty, is forwarded as keyword arguments to
    ``set_learning`` before the run.
    """
    sim = nc.Simulator()
    sim.deploy(net)
    if learn_cfg:
        sim.set_learning(**learn_cfg)
    return _run_sim(sim, stimulus_fn, timesteps)


def _run_gpu(net, stimulus_fn, timesteps, learn_cfg=None):
    """Run network on GPU simulator with given stimulus pattern.

    ``learn_cfg``, when non-empty, is forwarded as keyword arguments to
    ``set_learning`` before the run.
    """
    sim = nc.GpuSimulator(device=_get_gpu_device())
    sim.deploy(net)
    if learn_cfg:
        sim.set_learning(**learn_cfg)
    return _run_sim(sim, stimulus_fn, timesteps)


def _run_sim(sim, stimulus_fn, timesteps):
    """Run stimulus pattern then collect results.

    With ``stimulus_fn is None`` the simulator's own result of
    ``sim.run(timesteps)`` is returned. Otherwise ``stimulus_fn(sim, t)`` is
    called before each single-step run and the per-step results are merged
    into a ``_CombinedResult``; each step's spike times are offset by ``t``.
    ``placement`` is taken from the last step, or is ``None`` when
    ``timesteps`` is 0.
    """
    if stimulus_fn is None:
        return sim.run(timesteps)

    # stimulus_fn(sim, t) called per timestep
    all_trains = {}
    total = 0
    # Tracked inside the loop so a zero-step run does not hit an unbound
    # `result` after the loop.
    placement = None
    for t in range(timesteps):
        stimulus_fn(sim, t)
        result = sim.run(1)
        total += result.total_spikes
        placement = result.placement
        for gid, times in result.spike_trains.items():
            all_trains.setdefault(gid, []).extend(t_ + t for t_ in times)

    # Return a combined result-like object
    return _CombinedResult(total, timesteps, all_trains, placement)


class _CombinedResult:
    """Lightweight result aggregator for multi-run tests."""

    def __init__(self, total_spikes, timesteps, spike_trains, placement):
        self.total_spikes = total_spikes
        self.timesteps = timesteps
        self.spike_trains = spike_trains
        self.placement = placement


def _assert_trains_match(cpu_result, gpu_result, msg=""):
    """Assert spike trains from CPU and GPU match exactly.

    Compares every global ID present in either result (a missing ID counts
    as an empty train), then the total spike counts. ``msg`` prefixes the
    failure text.
    """
    cpu_trains = cpu_result.spike_trains
    gpu_trains = gpu_result.spike_trains

    all_gids = set(cpu_trains.keys()) | set(gpu_trains.keys())
    for gid in sorted(all_gids):
        cpu_times = cpu_trains.get(gid, [])
        gpu_times = gpu_trains.get(gid, [])
        assert cpu_times == gpu_times, (
            f"{msg}GID {gid}: CPU spikes={cpu_times}, GPU spikes={gpu_times}"
        )

    assert cpu_result.total_spikes == gpu_result.total_spikes, (
        f"{msg}Total: CPU={cpu_result.total_spikes}, GPU={gpu_result.total_spikes}"
    )


def _last_weight(adjacency):
    """Return ``entry[1]`` of the last entry in ``adjacency``, or ``None``.

    ``adjacency`` maps to iterables of entries whose index 1 is the weight
    the tests compare. The networks using this helper have one synapse, so
    the last entry is the only one.
    """
    weight = None
    for targets in adjacency.values():
        for entry in targets:
            weight = entry[1]
    return weight


class TestSingleNeuronGPU:
    def test_constant_input_spike_timing(self):
        """CPU vs GPU: single neuron with constant input, same spike times."""
        net = nc.Network()
        pop = net.population(1, params={"threshold": 1000, "leak": 3})

        def stim(sim, t):
            sim.inject(pop, current=200)

        cpu = _run_cpu(net, stim, 20)
        gpu = _run_gpu(net, stim, 20)
        _assert_trains_match(cpu, gpu, "SingleNeuron constant input: ")

    def test_refractory_period(self):
        """CPU vs GPU: refractory timing matches."""
        net = nc.Network()
        pop = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 3})

        def stim(sim, t):
            sim.inject(pop, current=200)

        cpu = _run_cpu(net, stim, 20)
        gpu = _run_gpu(net, stim, 20)
        _assert_trains_match(cpu, gpu, "Refractory: ")

    def test_subthreshold_no_spikes(self):
        """Below-threshold input produces no spikes on either backend."""
        net = nc.Network()
        pop = net.population(1, params={"threshold": 1000, "leak": 100, "resting": 0})

        def stim(sim, t):
            sim.inject(pop, current=50)

        cpu = _run_cpu(net, stim, 10)
        gpu = _run_gpu(net, stim, 10)
        assert cpu.total_spikes == 0
        assert gpu.total_spikes == 0


class TestChainPropagationGPU:
    def test_spike_chain_4_neurons(self):
        """CPU vs GPU: 4-neuron chain propagation matches exactly."""
        net = nc.Network()
        n0 = net.population(1, label="n0")
        n1 = net.population(1, label="n1")
        n2 = net.population(1, label="n2")
        n3 = net.population(1, label="n3")
        net.connect(n0, n1, topology="all_to_all", weight=1200)
        net.connect(n1, n2, topology="all_to_all", weight=1200)
        net.connect(n2, n3, topology="all_to_all", weight=1200)

        def stim(sim, t):
            if t == 0:
                sim.inject(n0, current=1200)

        cpu = _run_cpu(net, stim, 10)
        gpu = _run_gpu(net, stim, 10)
        _assert_trains_match(cpu, gpu, "Chain: ")

        # Verify chain timing
        p = cpu.placement
        assert 0 in cpu.spike_trains.get(_gid(p, n0), [])
        assert 1 in cpu.spike_trains.get(_gid(p, n1), [])
        assert 2 in cpu.spike_trains.get(_gid(p, n2), [])
        assert 3 in cpu.spike_trains.get(_gid(p, n3), [])


class TestInhibitionGPU:
    def test_inhibitory_weight_prevents_spike(self):
        """CPU vs GPU: inhibition suppresses target spike on both."""
        net = nc.Network()
        exc = net.population(1, label="exc")
        inh = net.population(1, label="inh")
        target = net.population(1, label="target")
        net.connect(exc, target, topology="all_to_all", weight=500)
        net.connect(inh, target, topology="all_to_all", weight=-600)

        def stim(sim, t):
            if t == 0:
                sim.inject(exc, current=1200)
                sim.inject(inh, current=1200)

        cpu = _run_cpu(net, stim, 5)
        gpu = _run_gpu(net, stim, 5)
        _assert_trains_match(cpu, gpu, "Inhibition: ")

        # Target should not spike at t=1 (net input = 500-600 = -100)
        p = cpu.placement
        tgt_gid = _gid(p, target)
        assert 1 not in cpu.spike_trains.get(tgt_gid, [])
        assert 1 not in gpu.spike_trains.get(tgt_gid, [])


class TestGradedSpikesGPU:
    def test_graded_payload_scaling(self):
        """CPU vs GPU: graded spike delivery matches."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0})
        tgt = net.population(1, params={"threshold": 1000, "leak": 0})
        net.connect(src, tgt, topology="all_to_all", weight=200)

        def stim(sim, t):
            if t == 0:
                sim.inject(src, current=500)

        cfg = {"graded": True}
        cpu = _run_cpu(net, stim, 5, learn_cfg=cfg)
        gpu = _run_gpu(net, stim, 5, learn_cfg=cfg)
        _assert_trains_match(cpu, gpu, "Graded: ")


class TestDendriticCompartmentsGPU:
    def test_dendritic_threshold_suppression(self):
        """CPU vs GPU: dendritic threshold suppresses sub-threshold input."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0})
        tgt = net.population(1, params={
            "threshold": 1000, "leak": 0, "dend_threshold": 500
        })
        net.connect(src, tgt, topology="all_to_all", weight=200, compartment=1)

        def stim(sim, t):
            if t == 0:
                sim.inject(src, current=200)

        cfg = {"dendritic": True}
        cpu = _run_cpu(net, stim, 5, learn_cfg=cfg)
        gpu = _run_gpu(net, stim, 5, learn_cfg=cfg)
        _assert_trains_match(cpu, gpu, "Dendritic: ")

        # Target should not spike (200 weight < 500 dendrite threshold)
        assert cpu.total_spikes == 1  # only src
        assert gpu.total_spikes == 1


class TestNoiseGPU:
    def test_noise_disabled_deterministic(self):
        """Without noise, CPU and GPU produce identical results."""
        net = nc.Network()
        pop = net.population(4, params={"threshold": 500, "leak": 3})

        def stim(sim, t):
            sim.inject(pop, current=100)

        cpu = _run_cpu(net, stim, 20)
        gpu = _run_gpu(net, stim, 20)
        _assert_trains_match(cpu, gpu, "NoNoise: ")

    def test_noise_enabled_matches_cpu(self):
        """With noise enabled, GPU LFSR sequence matches CPU."""
        net = nc.Network()
        pop = net.population(4, params={
            "threshold": 500, "leak": 3,
            "noise_config": 0x34,  # mantissa=4, exponent=3
        })

        def stim(sim, t):
            sim.inject(pop, current=100)

        cfg = {"noise": True}
        cpu = _run_cpu(net, stim, 20, learn_cfg=cfg)
        gpu = _run_gpu(net, stim, 20, learn_cfg=cfg)
        _assert_trains_match(cpu, gpu, "Noise: ")


class TestDualTracesGPU:
    def test_both_traces_set_on_spike(self):
        """After spiking, both traces should be TRACE_MAX on GPU."""
        net = nc.Network()
        pop = net.population(1, params={"threshold": 100, "leak": 0})

        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.inject(pop, current=200)
        sim_gpu.run(1)

        assert int(sim_gpu._trace[0].item()) == TRACE_MAX
        assert int(sim_gpu._trace2[0].item()) == TRACE_MAX

    def test_different_decay_rates(self):
        """tau1=2 decays faster than tau2=6 — identical on GPU and CPU."""
        net = nc.Network()
        pop = net.population(1, params={
            "threshold": 100, "leak": 0, "refrac": 0,
            "tau1": 2, "tau2": 6,
        })

        # CPU
        sim_cpu = nc.Simulator()
        sim_cpu.deploy(net)
        sim_cpu.inject(pop, current=200)
        sim_cpu.run(1)  # spike
        sim_cpu.run(5)  # decay
        cpu_t1 = int(sim_cpu._trace[0])
        cpu_t2 = int(sim_cpu._trace2[0])

        # GPU
        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.inject(pop, current=200)
        sim_gpu.run(1)  # spike
        sim_gpu.run(5)  # decay
        gpu_t1 = int(sim_gpu._trace[0].item())
        gpu_t2 = int(sim_gpu._trace2[0].item())

        assert cpu_t1 == gpu_t1, f"trace1: CPU={cpu_t1}, GPU={gpu_t1}"
        assert cpu_t2 == gpu_t2, f"trace2: CPU={cpu_t2}, GPU={gpu_t2}"
        assert cpu_t1 < cpu_t2  # faster decay

    def test_min_step_1_convergence(self):
        """Traces reach 0 via min-step-1 after a long decay (GPU only)."""
        net = nc.Network()
        pop = net.population(1, params={
            "threshold": 100, "leak": 0, "refrac": 0,
            "tau1": 8, "tau2": 8,
        })

        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.inject(pop, current=200)
        sim_gpu.run(1)  # spike
        sim_gpu.run(200)  # long decay

        assert int(sim_gpu._trace[0].item()) == 0
        assert int(sim_gpu._trace2[0].item()) == 0


class TestAxonDelaysGPU:
    def test_delay_zero_backward_compat(self):
        """delay=0: CPU vs GPU identical timing."""
        net = nc.Network()
        n0 = net.population(1, params={"threshold": 100, "leak": 0}, label="n0")
        n1 = net.population(1, params={"threshold": 100, "leak": 0}, label="n1")
        net.connect(n0, n1, topology="all_to_all", weight=200, delay=0)

        def stim(sim, t):
            if t == 0:
                sim.inject(n0, current=200)

        cpu = _run_cpu(net, stim, 5)
        gpu = _run_gpu(net, stim, 5)
        _assert_trains_match(cpu, gpu, "Delay0: ")

    def test_delay_3_shifts_spike(self):
        """delay=3: CPU vs GPU produce same shifted spike time."""
        net = nc.Network()
        n0 = net.population(1, params={"threshold": 100, "leak": 0}, label="n0")
        n1 = net.population(1, params={"threshold": 100, "leak": 0}, label="n1")
        net.connect(n0, n1, topology="all_to_all", weight=200, delay=3)

        def stim(sim, t):
            if t == 0:
                sim.inject(n0, current=200)

        cpu = _run_cpu(net, stim, 10)
        gpu = _run_gpu(net, stim, 10)
        _assert_trains_match(cpu, gpu, "Delay3: ")

        # n1 should spike later than t=1
        p = cpu.placement
        n1_spikes = cpu.spike_trains.get(_gid(p, n1), [])
        assert len(n1_spikes) > 0
        assert n1_spikes[0] > 1

    def test_mixed_delays(self):
        """Two targets with different delays: CPU vs GPU match."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0}, label="src")
        fast = net.population(1, params={"threshold": 100, "leak": 0}, label="fast")
        slow = net.population(1, params={"threshold": 100, "leak": 0}, label="slow")
        net.connect(src, fast, topology="all_to_all", weight=200, delay=1)
        net.connect(src, slow, topology="all_to_all", weight=200, delay=5)

        def stim(sim, t):
            if t == 0:
                sim.inject(src, current=200)

        cpu = _run_cpu(net, stim, 10)
        gpu = _run_gpu(net, stim, 10)
        _assert_trains_match(cpu, gpu, "MixedDelay: ")


class TestSynapseFormatsGPU:
    def test_dense_matches_cpu(self):
        """Dense format: CPU vs GPU identical."""
        net = nc.Network()
        src = net.population(2, params={"threshold": 100, "leak": 0})
        tgt = net.population(2, params={"threshold": 100, "leak": 0})
        net.connect(src, tgt, topology="all_to_all", weight=200, format='dense')

        def stim(sim, t):
            if t == 0:
                sim.inject(src, current=200)

        cpu = _run_cpu(net, stim, 5)
        gpu = _run_gpu(net, stim, 5)
        _assert_trains_match(cpu, gpu, "Dense: ")

    def test_pop_matches_cpu(self):
        """Pop format: CPU vs GPU identical."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0})
        tgt = net.population(4, params={"threshold": 100, "leak": 0})
        net.connect(src, tgt, topology="all_to_all", weight=300, format='pop')

        def stim(sim, t):
            if t == 0:
                sim.inject(src, current=200)

        cpu = _run_cpu(net, stim, 5)
        gpu = _run_gpu(net, stim, 5)
        _assert_trains_match(cpu, gpu, "Pop: ")


class TestSTDPGPU:
    def test_ltp_weight_increase(self):
        """Pre-before-post should increase weight on both backends."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        net.connect(src, tgt, topology="all_to_all", weight=500)

        cfg = {"learn": True}

        # CPU
        sim_cpu = nc.Simulator()
        sim_cpu.deploy(net)
        sim_cpu.set_learning(**cfg)
        sim_cpu.inject(src, current=200)
        sim_cpu.run(1)  # src spikes t=0
        sim_cpu.run(1)  # tgt gets 500 >= threshold, spikes t=1 -> LTP
        cpu_w = _last_weight(sim_cpu._adjacency)

        # GPU
        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.set_learning(**cfg)
        sim_gpu.inject(src, current=200)
        sim_gpu.run(1)
        sim_gpu.run(1)
        # Sync weights back
        gpu_w = _last_weight(sim_gpu.get_weights())

        assert cpu_w is not None and cpu_w > 500, f"CPU LTP failed: w={cpu_w}"
        assert gpu_w is not None and gpu_w > 500, f"GPU LTP failed: w={gpu_w}"
        assert cpu_w == gpu_w, f"Weight mismatch: CPU={cpu_w}, GPU={gpu_w}"

    def test_stdp_weight_evolution_100_steps(self):
        """Run 100 timesteps of STDP, CPU vs GPU weights match."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 1})
        tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 1})
        net.connect(src, tgt, topology="all_to_all", weight=500)

        cfg = {"learn": True}

        # CPU
        sim_cpu = nc.Simulator()
        sim_cpu.deploy(net)
        sim_cpu.set_learning(**cfg)
        for _ in range(100):
            sim_cpu.inject(src, current=200)
            sim_cpu.run(1)
        cpu_w = _last_weight(sim_cpu._adjacency)

        # GPU
        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.set_learning(**cfg)
        for _ in range(100):
            sim_gpu.inject(src, current=200)
            sim_gpu.run(1)
        gpu_w = _last_weight(sim_gpu.get_weights())

        assert cpu_w == gpu_w, f"100-step STDP: CPU={cpu_w}, GPU={gpu_w}"


class TestThreeFactorGPU:
    def test_no_reward_no_weight_change(self):
        """Without reward, weights stay unchanged (checked on GPU only)."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        net.connect(src, tgt, topology="all_to_all", weight=500)

        cfg = {"learn": True, "three_factor": True}

        # GPU
        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.set_learning(**cfg)
        sim_gpu.inject(src, current=200)
        sim_gpu.inject(tgt, current=200)
        sim_gpu.run(5)
        gpu_adj = sim_gpu.get_weights()
        for targets in gpu_adj.values():
            for entry in targets:
                assert entry[1] == 500, f"Weight changed without reward: {entry[1]}"

    def test_reward_changes_weight(self):
        """Positive reward should change weights on GPU."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        net.connect(src, tgt, topology="all_to_all", weight=500)

        cfg = {"learn": True, "three_factor": True}

        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.set_learning(**cfg)
        for _ in range(3):
            sim_gpu.inject(src, current=200)
            sim_gpu.inject(tgt, current=200)
            sim_gpu.run(1)
        sim_gpu.reward(500)
        sim_gpu.run(1)

        gpu_adj = sim_gpu.get_weights()
        weight_changed = any(
            entry[1] != 500
            for targets in gpu_adj.values()
            for entry in targets
        )
        assert weight_changed, "Reward should modify weights via eligibility"

    def test_three_factor_cpu_gpu_match(self):
        """Full 3-factor sequence: CPU vs GPU weight match."""
        net = nc.Network()
        src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
        net.connect(src, tgt, topology="all_to_all", weight=500)

        cfg = {"learn": True, "three_factor": True}

        # CPU
        sim_cpu = nc.Simulator()
        sim_cpu.deploy(net)
        sim_cpu.set_learning(**cfg)
        for _ in range(3):
            sim_cpu.inject(src, current=200)
            sim_cpu.inject(tgt, current=200)
            sim_cpu.run(1)
        sim_cpu.reward(500)
        sim_cpu.run(1)
        cpu_w = _last_weight(sim_cpu._adjacency)

        # GPU
        sim_gpu = nc.GpuSimulator(device=_get_gpu_device())
        sim_gpu.deploy(net)
        sim_gpu.set_learning(**cfg)
        for _ in range(3):
            sim_gpu.inject(src, current=200)
            sim_gpu.inject(tgt, current=200)
            sim_gpu.run(1)
        sim_gpu.reward(500)
        sim_gpu.run(1)
        gpu_w = _last_weight(sim_gpu.get_weights())

        assert cpu_w == gpu_w, f"3-factor: CPU={cpu_w}, GPU={gpu_w}"


class TestScalingGPU:
    @pytest.mark.parametrize("n_neurons,p", [(64, 0.1), (256, 0.05), (1024, 0.015)])
    def test_multi_neuron_match(self, n_neurons, p):
        """CPU vs GPU exact match at various scales."""
        net = nc.Network()
        pop = net.population(n_neurons, params={"threshold": 500, "leak": 3})
        net.connect(pop, pop, topology="random_sparse", p=p, weight=200, seed=42)

        def stim(sim, t):
            if t < 5:
                sim.inject(pop[:8], current=1200)

        cpu = _run_cpu(net, stim, 20)
        gpu = _run_gpu(net, stim, 20)
        _assert_trains_match(cpu, gpu, f"Scale {n_neurons}: ")

    def test_4096_neurons_runs(self):
        """4096 neurons runs on GPU without error (no CPU comparison for speed)."""
        net = nc.Network()
        pop = net.population(4096, params={"threshold": 500, "leak": 3})
        net.connect(pop, pop, topology="fixed_fan_out", fan_out=4, weight=200, seed=42)

        sim = nc.GpuSimulator(device=_get_gpu_device())
        try:
            sim.deploy(net)
            sim.inject(pop[:16], current=1200)
            result = sim.run(10)
            assert result.total_spikes > 0
            assert result.timesteps == 10
        finally:
            # Close the simulator even when an assertion above fails.
            sim.close()


class TestRunResultGPU:
    def test_backend_tag(self):
        """GPU results should report backend='gpu_simulator'."""
        net = nc.Network()
        net.population(4)
        sim = nc.GpuSimulator(device=_get_gpu_device())
        sim.deploy(net)
        result = sim.run(1)
        assert result.backend == "gpu_simulator"

    def test_status(self):
        """status() should return timestep count."""
        net = nc.Network()
        net.population(4)
        sim = nc.GpuSimulator(device=_get_gpu_device())
        sim.deploy(net)
        sim.run(5)
        s = sim.status()
        assert s["timestep_count"] == 5

    def test_async_raises(self):
        """Async mode should raise NeurocoreError on GPU."""
        net = nc.Network()
        net.population(4)
        sim = nc.GpuSimulator(device=_get_gpu_device())
        sim.deploy(net)
        with pytest.raises(nc.NeurocoreError):
            sim.set_learning(async_mode=True)