"""
Neuromorphic Chip Host Controller
==================================
Python script to communicate with the neuromorphic FPGA over UART.

v1.0 Loihi parity: CSR pool, multicast routing, noise, dual traces,
axon delays, synapse formats, microcode learning, hierarchical routing.

Usage:
    python fpga/host.py --port COM3          # Windows
    python fpga/host.py --port /dev/ttyUSB1  # Linux

Commands:
    python fpga/host.py --port COM3 --demo    # Run demo (program chain, stimulate, run)
    python fpga/host.py --port COM3 --status  # Query chip status
"""
|
| |
|
| | import serial
|
| | import struct
|
| | import time
|
| | import argparse
|
| | import sys
|
| |
|
| |
|
class NeuromorphicChip:
    """Interface to the neuromorphic FPGA over UART.

    Every command is one opcode byte followed by fixed-width big-endian
    fields.  Programming commands are answered with a single ``RESP_ACK``
    byte; ``run`` is answered with ``RESP_DONE`` plus a 32-bit spike count
    and ``status`` with a state byte plus a 32-bit timestep count.

    Instances can be used as context managers so that the serial port is
    closed even when a command raises::

        with NeuromorphicChip("COM3") as chip:
            chip.run(100)
    """

    # Command opcodes (first byte of every frame sent to the chip).
    CMD_PROG_POOL = 0x01
    CMD_PROG_ROUTE = 0x02
    CMD_STIMULUS = 0x03
    CMD_RUN = 0x04
    CMD_STATUS = 0x05
    CMD_LEARN_CFG = 0x06
    CMD_PROG_NEURON = 0x07
    CMD_PROG_INDEX = 0x08
    CMD_REWARD = 0x09
    CMD_PROG_DELAY = 0x0A
    CMD_PROG_LEARN = 0x0C
    CMD_PROG_GLOBAL_ROUTE = 0x10

    # Parameter selectors for prog_neuron().
    PARAM_THRESHOLD = 0
    PARAM_LEAK = 1
    PARAM_RESTING = 2
    PARAM_REFRAC = 3
    PARAM_DEND_THRESHOLD = 4

    # First byte of the chip's responses.
    RESP_ACK = 0xAA
    RESP_DONE = 0xDD

    def __init__(self, port, baud=115200, timeout=10):
        """Open the serial link and reset the host-side pool allocator.

        Args:
            port: Serial device name (e.g. ``COM3`` or ``/dev/ttyUSB1``).
            baud: Baud rate.
            timeout: Read timeout handed to ``serial.Serial``; a response
                that does not arrive in time makes ``_recv`` raise
                ``TimeoutError``.
        """
        self.ser = serial.Serial(port, baud, timeout=timeout)
        # Brief pause, then drop whatever is already sitting in the receive
        # buffer so stale bytes are not mistaken for a response.
        time.sleep(0.1)
        self.ser.reset_input_buffer()
        # core id -> next free pool address (see _alloc_pool).
        self._pool_alloc = {}
        print(f"Connected to {port} @ {baud} baud")

    def __enter__(self):
        """Return the chip itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the serial port on leaving a ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    @staticmethod
    def _be16(value):
        """Return the low 16 bits of *value* as a (high, low) byte pair.

        Negative integers come out in two's-complement form, which is how
        the signed 16-bit fields (weights, currents, rewards) are sent.
        """
        return (value >> 8) & 0xFF, value & 0xFF

    def _send(self, data):
        """Send raw bytes (an iterable of integers in 0-255)."""
        self.ser.write(bytes(data))

    def _recv(self, n):
        """Receive exactly n bytes.

        Raises:
            TimeoutError: If the port returned fewer than n bytes.
        """
        data = self.ser.read(n)
        if len(data) != n:
            raise TimeoutError(f"Expected {n} bytes, got {len(data)}")
        return data

    def _wait_ack(self):
        """Wait for ACK (0xAA) response.

        Raises:
            ValueError: If the byte received is not ``RESP_ACK``.
            TimeoutError: If no byte arrives (see ``_recv``).
        """
        resp = self._recv(1)
        if resp[0] != self.RESP_ACK:
            raise ValueError(f"Expected ACK (0xAA), got 0x{resp[0]:02X}")

    def _alloc_pool(self, core, count=1):
        """Allocate pool entries for a core (bump allocator).

        Returns the first address of the newly reserved range.  The
        bookkeeping lives on the host only and entries are never freed.
        """
        addr = self._pool_alloc.get(core, 0)
        self._pool_alloc[core] = addr + count
        return addr

    def prog_pool(self, core, pool_addr, src, target, weight, comp=0):
        """Program a connection pool entry.

        Args:
            core: Core ID
            pool_addr: Pool address (0 to POOL_DEPTH-1)
            src: Source neuron (for reverse table, 0-1023)
            target: Target neuron (0-1023)
            weight: Signed 16-bit weight
            comp: Compartment ID (0=soma, 1-3=dendrites)
        """
        # Flags byte layout: comp[7:6] | src[9:8] in bits 5:4 |
        # target[9:8] in bits 3:2.  The low 8 bits of src/target follow
        # in their own bytes.
        flags = (
            ((comp & 0x3) << 6)
            | (((src >> 8) & 0x3) << 4)
            | (((target >> 8) & 0x3) << 2)
        )
        self._send([
            self.CMD_PROG_POOL,
            core & 0xFF,
            *self._be16(pool_addr),
            flags,
            src & 0xFF,
            target & 0xFF,
            *self._be16(weight),
        ])
        self._wait_ack()

    def prog_index(self, core, neuron, base_addr, count, format=0, base_target=0):
        """Program a CSR index entry (base_addr + count for a neuron).

        Args:
            core: Core ID
            neuron: Neuron ID (0-1023)
            base_addr: Pool base address
            count: Number of connections
            format: Synapse format (0=sparse, 1=dense, 2=pop)
            base_target: Base target neuron for dense/pop formats
        """
        # format occupies bits 7:6; bits 1:0 carry base_target[9:8].
        fmt_byte = ((format & 0x3) << 6) | ((base_target >> 8) & 0x3)
        self._send([
            self.CMD_PROG_INDEX,
            core & 0xFF,
            *self._be16(neuron),
            *self._be16(base_addr),
            *self._be16(count),
            fmt_byte,
            base_target & 0xFF,
        ])
        self._wait_ack()

    def prog_conn(self, core, src, targets_weights, comp=0):
        """High-level: program connections for a source neuron using pool allocator.

        Reserves one pool entry per connection, programs each entry, then
        writes the index entry for ``src`` covering the reserved range
        (default format and base target).  Does nothing for an empty list.

        Args:
            core: Core ID
            src: Source neuron
            targets_weights: List of (target, weight) tuples
            comp: Compartment ID (default 0=soma)
        """
        if not targets_weights:
            return
        fanout = len(targets_weights)
        base = self._alloc_pool(core, fanout)
        for offset, (target, weight) in enumerate(targets_weights):
            self.prog_pool(core, base + offset, src, target, weight, comp)
        self.prog_index(core, src, base, fanout)

    def prog_route(self, src_core, src_neuron, dest_core, dest_neuron, weight, slot=0):
        """Program an inter-core route (multicast slot).

        Args:
            src_core: Source core ID
            src_neuron: Source neuron (0-1023)
            dest_core: Destination core ID
            dest_neuron: Destination neuron (0-1023)
            weight: Signed 16-bit weight
            slot: Route slot (0-7) for multicast fanout
        """
        self._send([
            self.CMD_PROG_ROUTE,
            src_core & 0xFF,
            *self._be16(src_neuron),
            slot & 0xFF,
            dest_core & 0xFF,
            *self._be16(dest_neuron),
            *self._be16(weight),
        ])
        self._wait_ack()

    def stimulus(self, core, neuron, current):
        """Set external stimulus current for next RUN.

        Args:
            core: Target core ID
            neuron: Target neuron (0-1023)
            current: Signed 16-bit current value
        """
        self._send([
            self.CMD_STIMULUS,
            core & 0xFF,
            *self._be16(neuron),
            *self._be16(current),
        ])
        self._wait_ack()

    def run(self, timesteps):
        """Run the mesh for N timesteps.

        Args:
            timesteps: Number of timesteps (1-65535)

        Returns:
            Number of spikes that occurred during the run.

        Raises:
            ValueError: If ``timesteps`` does not fit the 16-bit field, or
                if the response does not start with ``RESP_DONE``.
            TimeoutError: If the 5-byte response does not arrive in time.
        """
        # The field is 16 bits wide.  Masking an out-of-range value would
        # silently run a different number of steps (65536 -> 0, -1 ->
        # 65535), so reject it before anything is sent.
        if not 0 <= timesteps <= 0xFFFF:
            raise ValueError(
                f"timesteps must fit in 16 bits (0-65535), got {timesteps}"
            )
        self._send([self.CMD_RUN, *self._be16(timesteps)])
        tag, spikes = struct.unpack('>BI', self._recv(5))
        if tag != self.RESP_DONE:
            raise ValueError(f"Expected DONE (0xDD), got 0x{tag:02X}")
        return spikes

    def reward(self, value):
        """Set reward value for 3-factor learning.

        Args:
            value: Signed 16-bit reward (0 = no reward)
        """
        self._send([self.CMD_REWARD, *self._be16(value)])
        self._wait_ack()

    def set_learning(self, learn_enable, graded_enable=False, dendritic_enable=False,
                     async_enable=False, threefactor_enable=False, noise_enable=False):
        """Configure learning mode flags.

        All six flags are packed into one byte and written together, so
        each call replaces the complete configuration.  Bit positions:
        0=learn, 1=graded, 2=dendritic, 3=async, 4=three-factor, 5=noise.
        """
        enables = (learn_enable, graded_enable, dendritic_enable,
                   async_enable, threefactor_enable, noise_enable)
        flags = 0
        for bit, enabled in enumerate(enables):
            flags |= (int(enabled) & 1) << bit
        self._send([self.CMD_LEARN_CFG, flags])
        self._wait_ack()

    def prog_delay(self, core, pool_addr, delay):
        """Program an axon delay for a pool entry (P17).

        Args:
            core: Core ID
            pool_addr: Pool address of the connection
            delay: Delay in timesteps (0-63)
        """
        self._send([
            self.CMD_PROG_DELAY,
            core & 0xFF,
            *self._be16(pool_addr),
            delay & 0x3F,
        ])
        self._wait_ack()

    def prog_learn(self, core, addr, instr):
        """Program a microcode learning instruction (P19).

        Args:
            core: Core ID
            addr: Instruction address (0-63)
            instr: 32-bit instruction word
        """
        self._send([
            self.CMD_PROG_LEARN,
            core & 0xFF,
            addr & 0x3F,
            # Instruction word, most significant byte first.
            *self._be16(instr >> 16),
            *self._be16(instr),
        ])
        self._wait_ack()

    def prog_global_route(self, src_core, src_neuron, dest_core, dest_neuron,
                          weight, slot=0):
        """Program an inter-cluster global route (P20).

        Args:
            src_core: Source core ID
            src_neuron: Source neuron (0-1023)
            dest_core: Destination core ID
            dest_neuron: Destination neuron (0-1023)
            weight: Signed 16-bit weight
            slot: Route slot (0-3)
        """
        self._send([
            self.CMD_PROG_GLOBAL_ROUTE,
            src_core & 0xFF,
            *self._be16(src_neuron),
            slot & 0xFF,
            dest_core & 0xFF,
            *self._be16(dest_neuron),
            *self._be16(weight),
        ])
        self._wait_ack()

    def async_mode(self, enable=True):
        """Enable or disable async event-driven mode.

        NOTE: this goes through ``set_learning`` with every other flag
        False, so it also clears learn, graded, dendritic, three-factor
        and noise.  Call ``set_learning`` directly to combine async mode
        with other flags.
        """
        self.set_learning(False, False, False, async_enable=enable)

    def prog_neuron(self, core, neuron, param_id, value):
        """Program a per-neuron parameter.

        Args:
            core: Core ID
            neuron: Neuron ID (0-1023)
            param_id: Parameter (PARAM_THRESHOLD=0, PARAM_LEAK=1, etc.)
            value: Signed 16-bit value
        """
        self._send([
            self.CMD_PROG_NEURON,
            core & 0xFF,
            *self._be16(neuron),
            param_id & 0xFF,
            *self._be16(value),
        ])
        self._wait_ack()

    def status(self):
        """Query chip status.

        Returns:
            Tuple of (state, timestep_count)

        Raises:
            TimeoutError: If the 5-byte response does not arrive in time.
        """
        self._send([self.CMD_STATUS])
        # One state byte followed by a big-endian 32-bit timestep counter.
        state, ts_count = struct.unpack('>BI', self._recv(5))
        return state, ts_count
|
| |
|
| |
|
def demo(chip):
    """Run a demonstration: program a spike chain and observe propagation.

    Programs a chain N0 -> N1 -> N2 -> N3 on core 0, a cross-core route
    from core 0 neuron 3 to core 1 neuron 0, and a chain N0 -> N1 -> N2 on
    core 1.  It then stimulates core 0 neuron 0, runs 20 timesteps, runs 10
    more without stimulus, and prints the chip status before and after.

    Args:
        chip: Object providing ``status``, ``prog_conn``, ``prog_route``,
            ``stimulus`` and ``run`` (a ``NeuromorphicChip``).
    """
    banner = "=" * 60
    weight = 1200

    print("\n" + banner)
    print(" Neuromorphic Chip Demo (Phase 13b: CSR + Multicast)")
    print(banner)

    state, ts = chip.status()
    print(f"\nInitial status: state={state}, timesteps={ts}")

    print("\nProgramming spike chain: Core 0, N0 -> N1 -> N2 -> N3")
    for src, dst in ((0, 1), (1, 2), (2, 3)):
        chip.prog_conn(0, src, [(dst, weight)])
        print(f" N{src} -> N{dst} (w={weight}) OK")

    print("\nProgramming cross-core route: C0:N3 -> C1:N0")
    chip.prog_route(src_core=0, src_neuron=3,
                    dest_core=1, dest_neuron=0, weight=weight)
    print(" Route OK")

    print("Programming Core 1 chain: N0 -> N1 -> N2")
    for src, dst in ((0, 1), (1, 2)):
        chip.prog_conn(1, src, [(dst, weight)])
    print(" Core 1 chain OK")

    print("\nApplying stimulus: Core 0, N0, current=1200")
    chip.stimulus(core=0, neuron=0, current=weight)

    print("Running 20 timesteps...")
    # perf_counter() is monotonic, so the measured interval cannot be
    # distorted by wall-clock adjustments the way time.time() can.
    t_start = time.perf_counter()
    spikes = chip.run(20)
    elapsed = time.perf_counter() - t_start
    print(f" Done! {spikes} spikes in {elapsed:.3f}s")

    print("\nRunning 10 more timesteps (no stimulus)...")
    spikes2 = chip.run(10)
    print(f" {spikes2} spikes (should be 0 - no input)")

    state, ts = chip.status()
    print(f"\nFinal status: state={state}, timesteps={ts}")

    print("\n" + banner)
    print(" Demo complete! The chip is alive.")
    print(banner)
|
| |
|
| |
|
def main():
    """Command-line entry point.

    Parses ``--port`` (required), ``--baud``, ``--demo`` and ``--status``.
    With ``--status`` the chip state and timestep count are printed; with
    ``--demo`` (and no ``--status``) the demo is run.  When neither is
    given, usage hints are printed and the serial port is not opened.
    """
    parser = argparse.ArgumentParser(description="Neuromorphic Chip Host Controller")
    parser.add_argument("--port", required=True, help="Serial port (e.g., COM3 or /dev/ttyUSB1)")
    parser.add_argument("--baud", type=int, default=115200, help="Baud rate (default: 115200)")
    parser.add_argument("--demo", action="store_true", help="Run demo program")
    parser.add_argument("--status", action="store_true", help="Query chip status")
    args = parser.parse_args()

    if not (args.status or args.demo):
        # Nothing to ask of the chip: show the hints without opening the
        # port, so this path works even when no hardware is attached.
        print("No command specified. Use --demo or --status")
        print("Or import NeuromorphicChip in Python for programmatic access:")
        print("")
        print(" from host import NeuromorphicChip")
        print(" chip = NeuromorphicChip('COM3')")
        print(" chip.prog_conn(0, 0, [(1, 1200), (2, 800)]) # N0 -> N1(w=1200), N2(w=800)")
        print(" chip.prog_index(0, 0, 0, 2) # Or use prog_conn() which handles this")
        print(" chip.stimulus(core=0, neuron=0, current=1200)")
        print(" spikes = chip.run(100)")
        return

    chip = NeuromorphicChip(args.port, args.baud)
    try:
        # --status takes precedence when both flags are supplied.
        if args.status:
            state, ts = chip.status()
            print(f"State: {state} ({'idle' if state == 0 else 'busy'})")
            print(f"Timestep count: {ts}")
        else:
            demo(chip)
    finally:
        # Always release the serial port, even if a command raised.
        chip.close()
|
| |
|
| |
|
# Run the command-line interface only when this file is executed as a
# script; importing it (e.g. to use NeuromorphicChip) has no side effects.
if __name__ == "__main__":
    main()
|
| |
|