"""Cycle-accurate software LIF simulator matching scalable_core_v2.v. Sync mode: Pipeline order per timestep: DELIVER -> UPDATE -> LEARN Async mode (P12 GALS): Event-driven micro-steps until quiescence. P13 update: - 1024 neurons per core (NEURONS_PER_CORE=1024) - CSR pool connectivity (variable fanout) - Multicast inter-core routing (up to 8 destinations) - 3-factor learning: eligibility traces + reward modulation """ import numpy as np from collections import defaultdict from .backend import Backend from .compiler import Compiler, CompiledNetwork from .network import Network, Population, PopulationSlice from .constants import ( MAX_CORES, NEURONS_PER_CORE, GRADE_SHIFT, TRACE_MAX, TRACE_DECAY, LEARN_SHIFT, WEIGHT_MAX_STDP, WEIGHT_MIN_STDP, REWARD_SHIFT, ELIG_DECAY_SHIFT, ELIG_MAX, DEFAULT_THRESHOLD, DEFAULT_LEAK, DEFAULT_RESTING, DEFAULT_REFRAC, DEFAULT_DEND_THRESHOLD, DEFAULT_NOISE_CONFIG, DEFAULT_TAU1, DEFAULT_TAU2, NOISE_LFSR_SEED, NOISE_LFSR_TAPS, DELAY_QUEUE_BUCKETS, ) from .microcode import ( execute_program, R_TRACE1, R_TRACE2, R_WEIGHT, R_ELIG, R_CONST, R_TEMP0, R_TEMP1, R_REWARD, LTD_START, LTD_END, LTP_START, LTP_END, ) from .exceptions import NeurocoreError # Safety limit to prevent infinite loops in async mode ASYNC_MAX_MICRO_STEPS = 10000 class Simulator(Backend): """Cycle-accurate Python LIF simulator.""" def __init__(self, num_cores=MAX_CORES): self.max_cores = num_cores self._compiled = None # Use large pool_depth for simulation (no hardware constraint) self._compiler = Compiler(max_cores=num_cores, pool_depth=2**20) self._n = 0 # total neurons # Neuron state self._potential = None self._refrac = None self._trace = None # Per-neuron parameters self._threshold = None self._leak = None self._resting = None self._refrac_period = None self._dend_threshold = None # Connection tables # Full adjacency: src_global -> [(tgt_global, weight, compartment)] self._adjacency = None # Split for async: intra-core and inter-core self._intra_core_adj = None self._inter_core_adj = None # P14 Noise state self._noise_config = None self._noise_enable = False self._lfsr = None # P15 Dual trace state self._trace2 = None self._tau1 = None self._tau2 = None # P19 microcode learning rule self._learning_rule = None # Config flags self._learn_enable = False self._graded_enable = False self._dendritic_enable = False self._async_enable = False self._three_factor_enable = False # P13c self._noise_enable = False # P14 # Stimulus buffer: neuron_global_id -> current self._ext_current = None # Pending spikes from previous timestep: [(global_id, payload)] self._pending_spikes = [] # P17 delay queue: {timestep_bucket: [(tgt_gid, delivered_current, comp)]} self._delay_queue = None # Timestep counter self._timestep_count = 0 # 3-factor learning state (P13c) # eligibility per synapse: {(src_gid, tgt_gid): elig_value} self._eligibility = None self._reward_value = 0 # current reward signal self._reward_pending = False # whether reward was set for this timestep def deploy(self, network_or_compiled): """Compile (if needed) and initialize simulator state.""" if isinstance(network_or_compiled, Network): self._compiled = self._compiler.compile(network_or_compiled) elif isinstance(network_or_compiled, CompiledNetwork): self._compiled = network_or_compiled else: raise TypeError(f"Expected Network or CompiledNetwork, got {type(network_or_compiled)}") n = self._compiled.placement.total_neurons self._n = n # Initialize neuron state arrays self._potential = np.zeros(n, dtype=np.int32) self._refrac = np.zeros(n, dtype=np.int32) self._trace = np.zeros(n, dtype=np.int32) self._ext_current = np.zeros(n, dtype=np.int32) # Per-neuron parameters from compiled network self._threshold = np.full(n, DEFAULT_THRESHOLD, dtype=np.int32) self._leak = np.full(n, DEFAULT_LEAK, dtype=np.int32) self._resting = np.full(n, DEFAULT_RESTING, dtype=np.int32) self._refrac_period = np.full(n, DEFAULT_REFRAC, dtype=np.int32) self._dend_threshold = np.full(n, DEFAULT_DEND_THRESHOLD, dtype=np.int32) self._noise_config = np.full(n, DEFAULT_NOISE_CONFIG, dtype=np.uint8) self._tau1 = np.full(n, DEFAULT_TAU1, dtype=np.int32) self._tau2 = np.full(n, DEFAULT_TAU2, dtype=np.int32) self._trace2 = np.zeros(n, dtype=np.int32) # Seed LFSRs differently per neuron (RTL uses one LFSR per core, # advanced per neuron — each neuron sees a different LFSR state) self._lfsr = np.zeros(n, dtype=np.uint16) lfsr = NOISE_LFSR_SEED for gid in range(n): self._lfsr[gid] = lfsr # Advance LFSR to give each neuron a unique starting state bit = lfsr & 1 lfsr >>= 1 if bit: lfsr ^= NOISE_LFSR_TAPS for gid, params in self._compiled.neuron_params.items(): if gid < n: self._threshold[gid] = params.threshold self._leak[gid] = params.leak self._resting[gid] = params.resting self._refrac_period[gid] = params.refrac self._dend_threshold[gid] = params.dend_threshold self._noise_config[gid] = params.noise_config self._tau1[gid] = params.tau1 self._tau2[gid] = params.tau2 # Build adjacency from compiled network self._adjacency = dict(self._compiled.adjacency) # Build split adjacency for async mode (4-tuple: tgt, weight, comp, delay) self._intra_core_adj = defaultdict(list) self._inter_core_adj = defaultdict(list) for src_gid, targets in self._adjacency.items(): src_core = src_gid // NEURONS_PER_CORE for entry in targets: tgt_gid, weight, comp = entry[0], entry[1], entry[2] delay = entry[3] if len(entry) > 3 else 0 tgt_core = tgt_gid // NEURONS_PER_CORE if src_core == tgt_core: self._intra_core_adj[src_gid].append((tgt_gid, weight, comp, delay)) else: self._inter_core_adj[src_gid].append((tgt_gid, weight, comp, delay)) # Apply learn config cfg = self._compiled.learn_config self._learn_enable = cfg.get("learn_enable", False) self._graded_enable = cfg.get("graded_enable", False) self._dendritic_enable = cfg.get("dendritic_enable", False) self._async_enable = cfg.get("async_enable", False) self._noise_enable = cfg.get("noise_enable", False) # P19: Load custom learning rule if present self._learning_rule = self._compiled.learning_rule # Initialize eligibility table (P13c) self._eligibility = defaultdict(int) self._reward_value = 0 self._reward_pending = False self._pending_spikes = [] self._delay_queue = defaultdict(list) self._timestep_count = 0 def inject(self, target, current): """Set external stimulus current for specified neurons.""" if self._compiled is None: raise NeurocoreError("No network deployed. Call deploy() first.") resolved = self._resolve_targets(target) for core, neuron in resolved: gid = core * NEURONS_PER_CORE + neuron if gid < self._n: self._ext_current[gid] = current def reward(self, value): """Set reward signal for next run() call (P13c 3-factor learning). Positive reward strengthens eligible synapses, negative weakens them. Only applied when 3-factor learning is enabled. """ self._reward_value = int(value) self._reward_pending = True def run(self, timesteps): """Execute timesteps and return RunResult with full spike trains.""" from .result import RunResult if self._compiled is None: raise NeurocoreError("No network deployed. Call deploy() first.") if self._async_enable: return self._run_async(timesteps) return self._run_sync(timesteps) def _run_sync(self, timesteps): """Synchronous execution: all cores run every timestep.""" from .result import RunResult n = self._n spike_trains = defaultdict(list) total_spikes = 0 # Mutable weight table for learning (copy from adjacency) weights = {} if self._learn_enable: for src, targets in self._adjacency.items(): weights[src] = list(targets) for t in range(timesteps): acc_soma = np.zeros(n, dtype=np.int32) acc_dend = [np.zeros(n, dtype=np.int32) for _ in range(3)] bucket = self._timestep_count % DELAY_QUEUE_BUCKETS for tgt_gid, delivered, comp in self._delay_queue.pop(bucket, []): if comp == 0: acc_soma[tgt_gid] += delivered elif 1 <= comp <= 3: acc_dend[comp - 1][tgt_gid] += delivered for spike_gid, payload in self._pending_spikes: adj = (weights if self._learn_enable else self._adjacency) targets = adj.get(spike_gid, []) for entry in targets: tgt_gid, weight, comp = entry[0], entry[1], entry[2] delay = entry[3] if len(entry) > 3 else 0 if tgt_gid >= n: continue if self._graded_enable: delivered = (weight * payload) >> GRADE_SHIFT else: delivered = weight if delay > 0: future = (self._timestep_count + delay) % DELAY_QUEUE_BUCKETS self._delay_queue[future].append((tgt_gid, delivered, comp)) elif comp == 0: acc_soma[tgt_gid] += delivered elif 1 <= comp <= 3: acc_dend[comp - 1][tgt_gid] += delivered acc_soma += self._ext_current new_spikes = self._update_neurons(range(n), acc_soma, acc_dend) total_spikes += len(new_spikes) for gid, payload in new_spikes: spike_trains[gid].append(t) if self._learn_enable: if self._three_factor_enable: # 3-factor: STDP -> eligibility, then reward -> weight self._elig_update(weights, new_spikes) if self._reward_pending: self._reward_apply(weights) self._reward_pending = False self._elig_decay() else: # 2-factor: direct STDP weight update self._stdp_update(weights, new_spikes) self._pending_spikes = new_spikes self._ext_current[:] = 0 self._timestep_count += 1 if self._learn_enable: self._adjacency = weights return RunResult( total_spikes=total_spikes, timesteps=timesteps, spike_trains=dict(spike_trains), placement=self._compiled.placement, backend="simulator", ) def _run_async(self, timesteps): """Async event-driven execution matching P12 GALS. Each timestep runs micro-steps until quiescence: 1. External stimulus -> per-core injection FIFOs (PCIFs) 2. Loop: a. Cores with non-empty PCIFs: deliver input, run UPDATE b. Inter-core spikes -> route to destination PCIFs c. Intra-core spikes -> mark core for restart (deferred restart) d. All quiet -> quiescence -> timestep done 3. Only neurons in active cores get updated """ from .result import RunResult n = self._n num_cores = self._compiled.placement.num_cores_used spike_trains = defaultdict(list) total_spikes = 0 for t in range(timesteps): # Per-core injection FIFOs: core_id -> [(neuron_gid, current)] pcif = defaultdict(list) # Buffer external stimulus into PCIFs for gid in range(n): if self._ext_current[gid] != 0: core = gid // NEURONS_PER_CORE pcif[core].append((gid, int(self._ext_current[gid]))) # Also buffer pending inter-core spikes from previous timestep for spike_gid, payload in self._pending_spikes: targets = self._inter_core_adj.get(spike_gid, []) for entry in targets: tgt_gid, weight, comp = entry[0], entry[1], entry[2] if tgt_gid >= n: continue tgt_core = tgt_gid // NEURONS_PER_CORE if self._graded_enable: delivered = (weight * payload) >> GRADE_SHIFT else: delivered = weight pcif[tgt_core].append((tgt_gid, delivered, comp)) # Buffer pending intra-core spikes core_internal_spikes = defaultdict(list) for spike_gid, payload in self._pending_spikes: src_core = spike_gid // NEURONS_PER_CORE intra_targets = self._intra_core_adj.get(spike_gid, []) for entry in intra_targets: tgt_gid, weight, comp = entry[0], entry[1], entry[2] if self._graded_enable: delivered = (weight * payload) >> GRADE_SHIFT else: delivered = weight core_internal_spikes[src_core].append((tgt_gid, delivered, comp)) core_needs_restart = set() all_new_spikes = [] micro_step = 0 while micro_step < ASYNC_MAX_MICRO_STEPS: micro_step += 1 active_cores = set() for c in range(num_cores): if pcif[c] or core_internal_spikes[c] or c in core_needs_restart: active_cores.add(c) if not active_cores: break # quiescence new_inter_core = [] core_needs_restart_next = set() for core_id in sorted(active_cores): core_start = core_id * NEURONS_PER_CORE core_end = min(core_start + NEURONS_PER_CORE, n) acc_soma = np.zeros(n, dtype=np.int32) acc_dend = [np.zeros(n, dtype=np.int32) for _ in range(3)] # Deliver PCIF entries for entry in pcif[core_id]: if len(entry) == 2: gid, current = entry acc_soma[gid] += current else: gid, current, comp = entry if comp == 0: acc_soma[gid] += current elif 1 <= comp <= 3: acc_dend[comp - 1][gid] += current pcif[core_id] = [] # Deliver internal spikes for entry in core_internal_spikes[core_id]: tgt_gid, delivered, comp = entry if comp == 0: acc_soma[tgt_gid] += delivered elif 1 <= comp <= 3: acc_dend[comp - 1][tgt_gid] += delivered core_internal_spikes[core_id] = [] core_needs_restart.discard(core_id) # Run UPDATE for ALL neurons in this core neuron_range = range(core_start, core_end) core_spikes = self._update_neurons(neuron_range, acc_soma, acc_dend) if core_spikes: core_needs_restart_next.add(core_id) for spike_gid, payload in core_spikes: all_new_spikes.append((spike_gid, payload)) spike_trains[spike_gid].append(t) # Intra-core targets -> buffer for restart intra_targets = self._intra_core_adj.get(spike_gid, []) for entry in intra_targets: tgt_gid, weight, comp = entry[0], entry[1], entry[2] if self._graded_enable: delivered = (weight * payload) >> GRADE_SHIFT else: delivered = weight core_internal_spikes[core_id].append( (tgt_gid, delivered, comp)) # Inter-core targets -> route to dest PCIF inter_targets = self._inter_core_adj.get(spike_gid, []) for entry in inter_targets: tgt_gid, weight, comp = entry[0], entry[1], entry[2] if tgt_gid >= n: continue tgt_core = tgt_gid // NEURONS_PER_CORE if self._graded_enable: delivered = (weight * payload) >> GRADE_SHIFT else: delivered = weight pcif[tgt_core].append((tgt_gid, delivered, comp)) core_needs_restart = core_needs_restart_next total_spikes += len(all_new_spikes) self._pending_spikes = [] self._ext_current[:] = 0 self._timestep_count += 1 return RunResult( total_spikes=total_spikes, timesteps=timesteps, spike_trains=dict(spike_trains), placement=self._compiled.placement, backend="simulator", ) def _decay_trace(self, trace_val, tau): """P15 exponential trace decay with min-step-1 fix.""" if trace_val <= 0: return 0 decay = trace_val >> tau if decay == 0: decay = 1 # min-step-1: always decay by at least 1 return max(0, trace_val - decay) def _advance_lfsr(self, i): """Advance per-neuron 16-bit Galois LFSR (x^16+x^14+x^13+x^11+1).""" lfsr = int(self._lfsr[i]) bit = lfsr & 1 lfsr >>= 1 if bit: lfsr ^= NOISE_LFSR_TAPS self._lfsr[i] = lfsr return lfsr def _update_neurons(self, neuron_range, acc_soma, acc_dend): """Run LIF UPDATE for a set of neurons. Returns [(gid, payload), ...].""" new_spikes = [] for i in neuron_range: total_input = int(acc_soma[i]) if self._dendritic_enable: dthr = int(self._dend_threshold[i]) for d in range(3): dval = int(acc_dend[d][i]) if dval > dthr: total_input += dval - dthr potential = int(self._potential[i]) refrac = int(self._refrac[i]) leak = int(self._leak[i]) threshold = int(self._threshold[i]) resting = int(self._resting[i]) trace = int(self._trace[i]) trace2 = int(self._trace2[i]) tau1 = int(self._tau1[i]) tau2 = int(self._tau2[i]) # P14: Apply noise to threshold if self._noise_enable: cfg = int(self._noise_config[i]) mantissa = cfg & 0x0F exponent = (cfg >> 4) & 0x0F if mantissa > 0: lfsr = self._advance_lfsr(i) noise_mask = mantissa << exponent noise_val = (lfsr & noise_mask) - (noise_mask >> 1) threshold = threshold + noise_val if refrac > 0: self._potential[i] = resting self._refrac[i] = refrac - 1 self._trace[i] = self._decay_trace(trace, tau1) self._trace2[i] = self._decay_trace(trace2, tau2) elif potential + total_input - leak >= threshold: excess = potential + total_input - leak - threshold payload = max(1, min(255, excess)) self._potential[i] = resting self._refrac[i] = int(self._refrac_period[i]) self._trace[i] = TRACE_MAX self._trace2[i] = TRACE_MAX new_spikes.append((i, payload if self._graded_enable else 128)) elif potential + total_input > leak: self._potential[i] = potential + total_input - leak self._trace[i] = self._decay_trace(trace, tau1) self._trace2[i] = self._decay_trace(trace2, tau2) else: self._potential[i] = resting self._trace[i] = self._decay_trace(trace, tau1) self._trace2[i] = self._decay_trace(trace2, tau2) return new_spikes def _stdp_update(self, weights, new_spikes): """2-factor STDP: direct weight update. If a custom learning rule is set (P19), uses the microcode interpreter. Otherwise falls back to the hardcoded P7 STDP behavior. """ if self._learning_rule is not None: self._microcode_learn(weights, new_spikes, three_factor=False) return for spike_gid, _ in new_spikes: # LTD: this neuron spiked (pre), check post-synaptic traces if spike_gid in weights: updated = [] for entry in weights[spike_gid]: tgt, w, c = entry[0], entry[1], entry[2] rest = entry[3:] if tgt < self._n: post_trace = int(self._trace[tgt]) if post_trace > 0: delta = post_trace >> LEARN_SHIFT w = max(WEIGHT_MIN_STDP, w - delta) updated.append((tgt, w, c, *rest)) weights[spike_gid] = updated # LTP: this neuron spiked (post), check pre-synaptic traces for src, targets in weights.items(): if src == spike_gid: continue updated = [] for entry in targets: tgt, w, c = entry[0], entry[1], entry[2] rest = entry[3:] if tgt == spike_gid: pre_trace = int(self._trace[src]) if pre_trace > 0: delta = pre_trace >> LEARN_SHIFT w = min(WEIGHT_MAX_STDP, w + delta) updated.append((tgt, w, c, *rest)) weights[src] = updated def _elig_update(self, weights, new_spikes): """P13c 3-factor: STDP correlation -> eligibility accumulation. If a custom learning rule is set (P19), uses the microcode interpreter. Otherwise falls back to the hardcoded behavior. """ if self._learning_rule is not None: self._microcode_learn(weights, new_spikes, three_factor=True) return for spike_gid, _ in new_spikes: # LTD direction: pre spiked, check post traces if spike_gid in weights: for entry in weights[spike_gid]: tgt = entry[0] if tgt < self._n: post_trace = int(self._trace[tgt]) if post_trace > 0: delta = post_trace >> LEARN_SHIFT key = (spike_gid, tgt) self._eligibility[key] = max( -ELIG_MAX, self._eligibility[key] - delta) # LTP direction: post spiked, check pre traces for src, targets in weights.items(): if src == spike_gid: continue for entry in targets: tgt = entry[0] if tgt == spike_gid: pre_trace = int(self._trace[src]) if pre_trace > 0: delta = pre_trace >> LEARN_SHIFT key = (src, spike_gid) self._eligibility[key] = min( ELIG_MAX, self._eligibility[key] + delta) def _reward_apply(self, weights): """P13c: Apply reward signal to weights via eligibility. weight += (eligibility * reward) >> REWARD_SHIFT """ reward = self._reward_value if reward == 0: return for src in list(weights.keys()): updated = [] for entry in weights[src]: tgt, w, c = entry[0], entry[1], entry[2] rest = entry[3:] key = (src, tgt) elig = self._eligibility.get(key, 0) if elig != 0: delta = (elig * reward) >> REWARD_SHIFT w = max(WEIGHT_MIN_STDP, min(WEIGHT_MAX_STDP, w + delta)) updated.append((tgt, w, c, *rest)) weights[src] = updated self._reward_value = 0 def _elig_decay(self): """P13c: Exponential decay of all eligibility traces. elig -= elig >> ELIG_DECAY_SHIFT (~12.5% per timestep) """ to_delete = [] for key in self._eligibility: val = self._eligibility[key] if val > 0: val -= max(1, val >> ELIG_DECAY_SHIFT) elif val < 0: val += max(1, (-val) >> ELIG_DECAY_SHIFT) if val == 0: to_delete.append(key) else: self._eligibility[key] = val for key in to_delete: del self._eligibility[key] def _microcode_learn(self, weights, new_spikes, three_factor=False): """P19: Run microcode learning programs for spiked neurons. For each pre-synaptic spike: run LTD program (PC 0-15) on each outgoing synapse. For each post-synaptic spike: run LTP program (PC 16-31) on each incoming synapse. Registers are loaded per-synapse: R0=trace1 (counterpart), R1=trace2, R2=weight, R3=eligibility, R4=constant, R5=temp0, R6=temp1, R7=reward """ program = self._learning_rule.get_program() for spike_gid, _ in new_spikes: # LTD: this neuron spiked (pre), run LTD program per outgoing synapse if spike_gid in weights: updated = [] for entry in weights[spike_gid]: tgt, w, c = entry[0], entry[1], entry[2] rest = entry[3:] if tgt < self._n: post_trace1 = int(self._trace[tgt]) post_trace2 = int(self._trace2[tgt]) elig = self._eligibility.get((spike_gid, tgt), 0) regs = [post_trace1, post_trace2, w, elig, 0, 0, 0, self._reward_value] result = execute_program( program, LTD_START, LTD_END + 1, regs) if three_factor: if result["elig_written"]: new_elig = max(-ELIG_MAX, min(ELIG_MAX, result["elig"])) self._eligibility[(spike_gid, tgt)] = new_elig else: if result["weight_written"]: w = max(WEIGHT_MIN_STDP, min(WEIGHT_MAX_STDP, result["weight"])) updated.append((tgt, w, c, *rest)) weights[spike_gid] = updated # LTP: this neuron spiked (post), run LTP program per incoming synapse for src, targets in weights.items(): if src == spike_gid: continue updated = [] for entry in targets: tgt, w, c = entry[0], entry[1], entry[2] rest = entry[3:] if tgt == spike_gid: pre_trace1 = int(self._trace[src]) pre_trace2 = int(self._trace2[src]) elig = self._eligibility.get((src, tgt), 0) regs = [pre_trace1, pre_trace2, w, elig, 0, 0, 0, self._reward_value] result = execute_program( program, LTP_START, LTP_END + 1, regs) if three_factor: if result["elig_written"]: new_elig = max(-ELIG_MAX, min(ELIG_MAX, result["elig"])) self._eligibility[(src, tgt)] = new_elig else: if result["weight_written"]: w = max(WEIGHT_MIN_STDP, min(WEIGHT_MAX_STDP, result["weight"])) updated.append((tgt, w, c, *rest)) weights[src] = updated def set_learning(self, learn=False, graded=False, dendritic=False, async_mode=False, three_factor=False, noise=False): """Configure learning and feature flags. Args: learn: Enable STDP learning graded: Enable graded spike payloads dendritic: Enable dendritic compartments async_mode: Enable P12 GALS event-driven mode three_factor: Enable P13c 3-factor learning (requires learn=True) noise: Enable P14 stochastic noise injection """ self._learn_enable = learn self._graded_enable = graded self._dendritic_enable = dendritic self._async_enable = async_mode self._three_factor_enable = three_factor self._noise_enable = noise if three_factor and not learn: self._learn_enable = True def status(self): return { "state": 0, # always idle in simulator "timestep_count": self._timestep_count, } def close(self): pass # nothing to release def _resolve_targets(self, target): """Convert Population/PopulationSlice to [(core, neuron)] pairs.""" if isinstance(target, list): return target placement = self._compiled.placement if isinstance(target, PopulationSlice): return [ placement.neuron_map[(target.population.id, i)] for i in target.indices ] if isinstance(target, Population): return [ placement.neuron_map[(target.id, i)] for i in range(target.size) ] raise TypeError(f"Cannot resolve target of type {type(target)}")