catalyst-n1 / sdk /tests /test_microcode.py
mrwabbit's picture
Initial upload: Catalyst N1 open source neuromorphic processor RTL
e4cdd5f verified
"""Tests for P19 microcode learning engine."""
import pytest
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import neurocore as nc
from neurocore.microcode import (
encode_instruction, decode_instruction, execute_program,
LearningRule, _assemble,
OP_NOP, OP_ADD, OP_SUB, OP_MUL, OP_SHR, OP_SHL,
OP_MAX, OP_MIN, OP_LOADI, OP_STORE_W, OP_STORE_E,
OP_SKIP_Z, OP_SKIP_NZ, OP_HALT,
R_TRACE1, R_TRACE2, R_WEIGHT, R_ELIG, R_CONST,
R_TEMP0, R_TEMP1, R_REWARD,
LTD_START, LTD_END, LTP_START, LTP_END,
MICROCODE_DEPTH,
)
from neurocore.constants import NEURONS_PER_CORE, WEIGHT_MAX_STDP, WEIGHT_MIN_STDP
class TestEncoding:
    """Round-trip and validation checks for instruction encoding/decoding."""

    def test_encode_decode_roundtrip(self):
        """Decoding an encoded ADD must give back every field that went in."""
        decoded = decode_instruction(
            encode_instruction(OP_ADD, dst=R_WEIGHT, src_a=R_TRACE1, src_b=R_TEMP0)
        )
        expected_fields = (
            ("op", OP_ADD),
            ("dst", R_WEIGHT),
            ("src_a", R_TRACE1),
            ("src_b", R_TEMP0),
            ("op_name", "ADD"),
        )
        for field, expected in expected_fields:
            assert decoded[field] == expected

    def test_all_opcodes_valid(self):
        """Each of the 14 opcodes encodes to a 32-bit word that decodes to itself."""
        for opcode in range(14):
            encoded = encode_instruction(opcode)
            assert 0 <= encoded <= 0xFFFFFFFF
            assert decode_instruction(encoded)["op"] == opcode

    def test_shift_encoding(self):
        """Shift amounts 0..7 survive an encode/decode round trip."""
        for amount in range(8):
            encoded = encode_instruction(
                OP_SHR, dst=R_TEMP0, src_a=R_TRACE1, shift=amount
            )
            assert decode_instruction(encoded)["shift"] == amount

    def test_immediate_encoding(self):
        """Signed immediates, including both 16-bit extremes, round-trip."""
        samples = (0, 1, -1, 32767, -32768, 100, -100)
        for value in samples:
            encoded = encode_instruction(OP_LOADI, dst=R_CONST, imm=value)
            assert decode_instruction(encoded)["imm"] == value

    def test_invalid_opcode_raises(self):
        """Opcodes just outside the valid range are rejected with ValueError."""
        for bad_opcode in (14, -1):
            with pytest.raises(ValueError):
                encode_instruction(bad_opcode)

    def test_invalid_register_raises(self):
        """Register index 8 is rejected with ValueError."""
        with pytest.raises(ValueError):
            encode_instruction(OP_ADD, dst=8)
class TestExecution:
    """Run small microcode programs and check their register/result effects."""

    @staticmethod
    def _run(regs, *instructions):
        """Execute *instructions* against *regs* and return the result.

        The instruction words are placed at the start of a program that is
        otherwise filled with NOPs up to MICROCODE_DEPTH. ``regs`` is handed
        to ``execute_program`` as-is (not copied), because the tests below
        inspect the same list after the call.

        Args:
            regs: register list passed straight to ``execute_program``.
            *instructions: encoded instruction words, in program order.

        Returns:
            Whatever ``execute_program`` returns.
        """
        program = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
        # Same-length slice assignment: overwrite the leading NOPs in place.
        program[:len(instructions)] = instructions
        # 0 and 16 are the bounds every test in this class uses; presumably
        # the start/end program addresses -- confirm against execute_program.
        return execute_program(program, 0, 16, regs)

    def test_add(self):
        """ADD R5, R0, R2 with R0=10, R2=20 -> R5=30."""
        regs = [10, 0, 20, 0, 0, 0, 0, 0]
        self._run(
            regs,
            encode_instruction(OP_ADD, dst=R_TEMP0, src_a=R_TRACE1, src_b=R_WEIGHT),
            encode_instruction(OP_HALT),
        )
        assert regs[R_TEMP0] == 30

    def test_sub(self):
        """SUB R5, R2, R0 with R2=100, R0=30 -> R5=70."""
        regs = [30, 0, 100, 0, 0, 0, 0, 0]
        self._run(
            regs,
            encode_instruction(OP_SUB, dst=R_TEMP0, src_a=R_WEIGHT, src_b=R_TRACE1),
            encode_instruction(OP_HALT),
        )
        assert regs[R_TEMP0] == 70

    def test_shr(self):
        """SHR R5, R0, 3 with R0=100 -> R5=12."""
        regs = [100, 0, 0, 0, 0, 0, 0, 0]
        self._run(
            regs,
            encode_instruction(OP_SHR, dst=R_TEMP0, src_a=R_TRACE1, shift=3),
            encode_instruction(OP_HALT),
        )
        assert regs[R_TEMP0] == 12

    def test_shl(self):
        """SHL R5, R0, 2 with R0=5 -> R5=20."""
        regs = [5, 0, 0, 0, 0, 0, 0, 0]
        self._run(
            regs,
            encode_instruction(OP_SHL, dst=R_TEMP0, src_a=R_TRACE1, shift=2),
            encode_instruction(OP_HALT),
        )
        assert regs[R_TEMP0] == 20

    def test_max_min(self):
        """MAX and MIN opcodes."""
        regs = [30, 0, 100, 0, 0, 0, 0, 0]
        self._run(
            regs,
            encode_instruction(OP_MAX, dst=R_TEMP0, src_a=R_TRACE1, src_b=R_WEIGHT),
            encode_instruction(OP_MIN, dst=R_TEMP1, src_a=R_TRACE1, src_b=R_WEIGHT),
            encode_instruction(OP_HALT),
        )
        assert regs[R_TEMP0] == 100  # max(30, 100)
        assert regs[R_TEMP1] == 30  # min(30, 100)

    def test_loadi(self):
        """LOADI R4, 42 -> R4=42."""
        regs = [0] * 8
        self._run(
            regs,
            encode_instruction(OP_LOADI, dst=R_CONST, imm=42),
            encode_instruction(OP_HALT),
        )
        assert regs[R_CONST] == 42

    def test_skip_z(self):
        """SKIP_Z should skip next instruction when src_a == 0."""
        regs = [0] * 8
        self._run(
            regs,
            encode_instruction(OP_SKIP_Z, src_a=R_TRACE1),  # R0=0, skip
            encode_instruction(OP_LOADI, dst=R_TEMP0, imm=99),  # skipped
            encode_instruction(OP_LOADI, dst=R_TEMP1, imm=42),  # executed
            encode_instruction(OP_HALT),
        )
        assert regs[R_TEMP0] == 0  # skipped
        assert regs[R_TEMP1] == 42  # executed

    def test_store_w(self):
        """STORE_W should report weight written."""
        regs = [0, 0, 500, 0, 0, 0, 0, 0]
        result = self._run(
            regs,
            encode_instruction(OP_LOADI, dst=R_WEIGHT, imm=999),
            encode_instruction(OP_STORE_W, src_a=R_WEIGHT),
            encode_instruction(OP_HALT),
        )
        assert result["weight_written"] is True
        assert result["weight"] == 999

    def test_store_e(self):
        """STORE_E should report eligibility written."""
        regs = [0] * 8
        result = self._run(
            regs,
            encode_instruction(OP_LOADI, dst=R_ELIG, imm=-50),
            encode_instruction(OP_STORE_E, src_a=R_ELIG),
            encode_instruction(OP_HALT),
        )
        assert result["elig_written"] is True
        assert result["elig"] == -50
class TestAssembler:
    """Checks that the text assembler emits the expected instruction words."""

    def test_basic_assembly(self):
        """A five-line LTD listing assembles to five words; the first is SHR."""
        listing = (
            "\n"
            "SHR R5, R0, 3\n"
            "SKIP_Z R5\n"
            "SUB R2, R2, R5\n"
            "STORE_W R2\n"
            "HALT\n"
        )
        words = _assemble(listing)
        assert len(words) == 5
        first = decode_instruction(words[0])
        assert first["op_name"] == "SHR"
        assert first["dst"] == R_TEMP0
        assert first["src_a"] == R_TRACE1
        assert first["shift"] == 3

    def test_comments_stripped(self):
        """Lines beginning with ';' or '#' produce no instruction words."""
        listing = (
            "\n"
            "; This is a comment\n"
            "NOP\n"
            "# Another comment\n"
            "HALT\n"
        )
        assert len(_assemble(listing)) == 2

    def test_loadi_assembly(self):
        """A hexadecimal immediate on LOADI is parsed to its integer value."""
        words = _assemble("LOADI R4, 0xFF")
        loadi = decode_instruction(words[0])
        assert loadi["op"] == OP_LOADI
        assert loadi["imm"] == 255
class TestLearningRule:
    """Checks on LearningRule factories and the programs they lay out."""

    def test_stdp_factory(self):
        """LearningRule.stdp() gives a MICROCODE_DEPTH-word program with a non-empty LTD."""
        stdp_rule = LearningRule.stdp()
        assert len(stdp_rule.get_program()) == MICROCODE_DEPTH
        # The LTD region must not consist solely of NOPs.
        ltd_words = stdp_rule.get_ltd()
        assert not all(
            decode_instruction(word)["op"] == OP_NOP for word in ltd_words
        )

    def test_three_factor_factory(self):
        """The three-factor LTD region stores eligibility and never stores weight."""
        three_factor_rule = LearningRule.three_factor()
        ltd_ops = {
            decode_instruction(word)["op"] for word in three_factor_rule.get_ltd()
        }
        assert OP_STORE_E in ltd_ops
        assert OP_STORE_W not in ltd_ops

    def test_from_instructions(self):
        """Raw LTD/LTP word lists land at program addresses 0 and 16."""
        ltd_words = [encode_instruction(OP_HALT)]
        ltp_words = [encode_instruction(OP_HALT)]
        program = LearningRule.from_instructions(ltd_words, ltp_words).get_program()
        for address in (0, 16):
            assert decode_instruction(program[address])["op"] == OP_HALT

    def test_assemble_ltd_ltp(self):
        """Assembled LTD and LTP listings land at program addresses 0 and 16."""
        ltd_listing = "\n".join(
            ["SHR R5, R0, 3", "SKIP_Z R5", "SUB R2, R2, R5", "STORE_W R2", "HALT"]
        )
        ltp_listing = "\n".join(
            ["SHR R5, R0, 3", "SKIP_Z R5", "ADD R2, R2, R5", "STORE_W R2", "HALT"]
        )
        custom_rule = LearningRule()
        custom_rule.assemble_ltd(ltd_listing)
        custom_rule.assemble_ltp(ltp_listing)
        program = custom_rule.get_program()
        # First word of the LTD region (address 0).
        assert decode_instruction(program[0])["op"] == OP_SHR
        # First word of the LTP region (address 16).
        assert decode_instruction(program[16])["op"] == OP_SHR
class TestMicrocodeSTDP:
    """Test that microcode STDP reproduces hardcoded STDP behavior."""

    @staticmethod
    def _deploy_pair(rule, **learning_flags):
        """Deploy a one-synapse network (src -> tgt, weight 500) using *rule*.

        Args:
            rule: the LearningRule attached to the network before deployment.
            **learning_flags: keyword arguments forwarded to
                ``Simulator.set_learning`` (e.g. ``learn=True``).

        Returns:
            ``(sim, src, tgt)``: the deployed simulator and both populations.
        """
        neuron_params = {"threshold": 100, "leak": 0, "refrac": 0}
        net = nc.Network()
        # Each population gets its own dict, as when the literals were inline.
        src = net.population(1, params=dict(neuron_params))
        tgt = net.population(1, params=dict(neuron_params))
        net.connect(src, tgt, topology="all_to_all", weight=500)
        net.set_learning_rule(rule)
        sim = nc.Simulator()
        sim.deploy(net)
        sim.set_learning(**learning_flags)
        return sim, src, tgt

    @staticmethod
    def _weights(sim):
        """Return every synaptic weight (index 1 of each adjacency entry).

        Asserts that at least one synapse exists, so the per-weight checks
        in the callers cannot pass vacuously on an empty adjacency table.
        """
        weights = [
            entry[1]
            for targets in sim._adjacency.values()
            for entry in targets
        ]
        assert weights, "simulator adjacency contains no synapses to check"
        return weights

    def test_default_microcode_stdp_weight_change(self):
        """Default microcode STDP should produce same weight changes as hardcoded."""
        sim, src, _tgt = self._deploy_pair(LearningRule.stdp(), learn=True)
        # Make src spike, then tgt spikes from synaptic input (LTP)
        sim.inject(src, current=200)
        sim.run(1)  # src spikes at t=0
        sim.run(1)  # tgt receives input, spikes at t=1 -> LTP
        # Weight should have increased
        for w in self._weights(sim):
            assert w > 500, f"Expected LTP increase, got {w}"

    def test_default_microcode_three_factor(self):
        """Default 3-factor microcode should accumulate eligibility."""
        sim, src, tgt = self._deploy_pair(
            LearningRule.three_factor(), learn=True, three_factor=True
        )
        sim.inject(src, current=200)
        sim.inject(tgt, current=200)
        sim.run(3)
        # Should have eligibility
        assert len(sim._eligibility) > 0
        # Weight unchanged without reward
        for w in self._weights(sim):
            assert w == 500

    def test_anti_stdp_custom_rule(self):
        """Custom anti-STDP: LTD becomes LTP and vice versa."""
        rule = LearningRule()
        # Anti-STDP LTD: ADD weight (instead of SUB)
        rule.assemble_ltd(
            "SHR R5, R0, 3\n"
            "SKIP_Z R5\n"
            "ADD R2, R2, R5\n"
            "STORE_W R2\n"
            "HALT"
        )
        # Anti-STDP LTP: SUB weight (instead of ADD)
        rule.assemble_ltp(
            "SHR R5, R0, 3\n"
            "SKIP_Z R5\n"
            "SUB R2, R2, R5\n"
            "STORE_W R2\n"
            "HALT"
        )
        sim, src, _tgt = self._deploy_pair(rule, learn=True)
        # src fires then tgt fires -> LTP normally increases weight
        # but anti-STDP should DECREASE it
        sim.inject(src, current=200)
        sim.run(1)
        sim.run(1)
        for w in self._weights(sim):
            assert w < 500, f"Anti-STDP should decrease weight, got {w}"

    def test_compiler_generates_learn_cmds(self):
        """Compiler should generate PROG_LEARN commands when rule is attached."""
        from neurocore.compiler import Compiler

        net = nc.Network()
        src = net.population(2)
        tgt = net.population(2)
        net.connect(src, tgt, topology="all_to_all", weight=200)
        net.set_learning_rule(LearningRule.stdp())
        compiled = Compiler().compile(net)
        assert len(compiled.prog_learn_cmds) > 0
        # Each cmd should have core, addr, instr
        for cmd in compiled.prog_learn_cmds:
            for key in ("core", "addr", "instr"):
                assert key in cmd