""" Neuromorphic Chip Host Controller ================================== Python script to communicate with the neuromorphic FPGA over UART. v1.0 Loihi parity: CSR pool, multicast routing, noise, dual traces, axon delays, synapse formats, microcode learning, hierarchical routing. Usage: python fpga/host.py --port COM3 # Windows python fpga/host.py --port /dev/ttyUSB1 # Linux Commands: python fpga/host.py --port COM3 --demo # Run demo (program chain, stimulate, run) python fpga/host.py --port COM3 --status # Query chip status """ import serial import struct import time import argparse import sys class NeuromorphicChip: """Interface to the neuromorphic FPGA over UART.""" # Command opcodes (Phase 13a protocol) CMD_PROG_POOL = 0x01 CMD_PROG_ROUTE = 0x02 CMD_STIMULUS = 0x03 CMD_RUN = 0x04 CMD_STATUS = 0x05 CMD_LEARN_CFG = 0x06 CMD_PROG_NEURON = 0x07 CMD_PROG_INDEX = 0x08 CMD_REWARD = 0x09 CMD_PROG_DELAY = 0x0A CMD_PROG_LEARN = 0x0C CMD_PROG_GLOBAL_ROUTE = 0x10 # Parameter IDs for CMD_PROG_NEURON PARAM_THRESHOLD = 0 PARAM_LEAK = 1 PARAM_RESTING = 2 PARAM_REFRAC = 3 PARAM_DEND_THRESHOLD = 4 # Response codes RESP_ACK = 0xAA RESP_DONE = 0xDD def __init__(self, port, baud=115200, timeout=10): self.ser = serial.Serial(port, baud, timeout=timeout) time.sleep(0.1) self.ser.reset_input_buffer() self._pool_alloc = {} # per-core pool bump allocator: core -> next_addr print(f"Connected to {port} @ {baud} baud") def close(self): self.ser.close() def _send(self, data): """Send raw bytes.""" self.ser.write(bytes(data)) def _recv(self, n): """Receive exactly n bytes.""" data = self.ser.read(n) if len(data) != n: raise TimeoutError(f"Expected {n} bytes, got {len(data)}") return data def _wait_ack(self): """Wait for ACK (0xAA) response.""" resp = self._recv(1) if resp[0] != self.RESP_ACK: raise ValueError(f"Expected ACK (0xAA), got 0x{resp[0]:02X}") def _alloc_pool(self, core, count=1): """Allocate pool entries for a core (bump allocator).""" if core not in self._pool_alloc: self._pool_alloc[core] = 0 addr = self._pool_alloc[core] self._pool_alloc[core] += count return addr def prog_pool(self, core, pool_addr, src, target, weight, comp=0): """Program a connection pool entry. Args: core: Core ID pool_addr: Pool address (0 to POOL_DEPTH-1) src: Source neuron (for reverse table, 0-1023) target: Target neuron (0-1023) weight: Signed 16-bit weight comp: Compartment ID (0=soma, 1-3=dendrites) """ w = weight & 0xFFFF # Pack flags: {comp[1:0], src[9:8], target[9:8], 2'b00} flags = ((comp & 0x3) << 6) | (((src >> 8) & 0x3) << 4) | (((target >> 8) & 0x3) << 2) self._send([ self.CMD_PROG_POOL, core & 0xFF, (pool_addr >> 8) & 0xFF, pool_addr & 0xFF, flags, src & 0xFF, target & 0xFF, (w >> 8) & 0xFF, w & 0xFF ]) self._wait_ack() def prog_index(self, core, neuron, base_addr, count, format=0, base_target=0): """Program a CSR index entry (base_addr + count for a neuron). Args: core: Core ID neuron: Neuron ID (0-1023) base_addr: Pool base address count: Number of connections format: Synapse format (0=sparse, 1=dense, 2=pop) base_target: Base target neuron for dense/pop formats """ self._send([ self.CMD_PROG_INDEX, core & 0xFF, (neuron >> 8) & 0xFF, neuron & 0xFF, (base_addr >> 8) & 0xFF, base_addr & 0xFF, (count >> 8) & 0xFF, count & 0xFF, ((format & 0x3) << 6) | ((base_target >> 8) & 0x3), base_target & 0xFF, ]) self._wait_ack() def prog_conn(self, core, src, targets_weights, comp=0): """High-level: program connections for a source neuron using pool allocator. Args: core: Core ID src: Source neuron targets_weights: List of (target, weight) tuples comp: Compartment ID (default 0=soma) """ if not targets_weights: return base = self._alloc_pool(core, len(targets_weights)) for i, (target, weight) in enumerate(targets_weights): self.prog_pool(core, base + i, src, target, weight, comp) self.prog_index(core, src, base, len(targets_weights)) def prog_route(self, src_core, src_neuron, dest_core, dest_neuron, weight, slot=0): """Program an inter-core route (multicast slot). Args: src_core: Source core ID src_neuron: Source neuron (0-1023) dest_core: Destination core ID dest_neuron: Destination neuron (0-1023) weight: Signed 16-bit weight slot: Route slot (0-7) for multicast fanout """ w = weight & 0xFFFF self._send([ self.CMD_PROG_ROUTE, src_core & 0xFF, (src_neuron >> 8) & 0xFF, src_neuron & 0xFF, slot & 0xFF, dest_core & 0xFF, (dest_neuron >> 8) & 0xFF, dest_neuron & 0xFF, (w >> 8) & 0xFF, w & 0xFF ]) self._wait_ack() def stimulus(self, core, neuron, current): """Set external stimulus current for next RUN. Args: core: Target core ID neuron: Target neuron (0-1023) current: Signed 16-bit current value """ c = current & 0xFFFF self._send([ self.CMD_STIMULUS, core & 0xFF, (neuron >> 8) & 0xFF, neuron & 0xFF, (c >> 8) & 0xFF, c & 0xFF ]) self._wait_ack() def run(self, timesteps): """Run the mesh for N timesteps. Args: timesteps: Number of timesteps (1-65535) Returns: Number of spikes that occurred during the run. """ ts = timesteps & 0xFFFF self._send([ self.CMD_RUN, (ts >> 8) & 0xFF, ts & 0xFF ]) resp = self._recv(5) if resp[0] != self.RESP_DONE: raise ValueError(f"Expected DONE (0xDD), got 0x{resp[0]:02X}") spikes = struct.unpack('>I', resp[1:5])[0] return spikes def reward(self, value): """Set reward value for 3-factor learning. Args: value: Signed 16-bit reward (0 = no reward) """ v = value & 0xFFFF self._send([ self.CMD_REWARD, (v >> 8) & 0xFF, v & 0xFF ]) self._wait_ack() def set_learning(self, learn_enable, graded_enable=False, dendritic_enable=False, async_enable=False, threefactor_enable=False, noise_enable=False): """Configure learning mode flags.""" flags = ((int(learn_enable) & 1) | ((int(graded_enable) & 1) << 1) | ((int(dendritic_enable) & 1) << 2) | ((int(async_enable) & 1) << 3) | ((int(threefactor_enable) & 1) << 4) | ((int(noise_enable) & 1) << 5)) self._send([self.CMD_LEARN_CFG, flags]) self._wait_ack() def prog_delay(self, core, pool_addr, delay): """Program an axon delay for a pool entry (P17). Args: core: Core ID pool_addr: Pool address of the connection delay: Delay in timesteps (0-63) """ self._send([ self.CMD_PROG_DELAY, core & 0xFF, (pool_addr >> 8) & 0xFF, pool_addr & 0xFF, delay & 0x3F, ]) self._wait_ack() def prog_learn(self, core, addr, instr): """Program a microcode learning instruction (P19). Args: core: Core ID addr: Instruction address (0-63) instr: 32-bit instruction word """ self._send([ self.CMD_PROG_LEARN, core & 0xFF, addr & 0x3F, (instr >> 24) & 0xFF, (instr >> 16) & 0xFF, (instr >> 8) & 0xFF, instr & 0xFF, ]) self._wait_ack() def prog_global_route(self, src_core, src_neuron, dest_core, dest_neuron, weight, slot=0): """Program an inter-cluster global route (P20). Args: src_core: Source core ID src_neuron: Source neuron (0-1023) dest_core: Destination core ID dest_neuron: Destination neuron (0-1023) weight: Signed 16-bit weight slot: Route slot (0-3) """ w = weight & 0xFFFF self._send([ self.CMD_PROG_GLOBAL_ROUTE, src_core & 0xFF, (src_neuron >> 8) & 0xFF, src_neuron & 0xFF, slot & 0xFF, dest_core & 0xFF, (dest_neuron >> 8) & 0xFF, dest_neuron & 0xFF, (w >> 8) & 0xFF, w & 0xFF, ]) self._wait_ack() def async_mode(self, enable=True): """Enable or disable async event-driven mode.""" self.set_learning(False, False, False, async_enable=enable) def prog_neuron(self, core, neuron, param_id, value): """Program a per-neuron parameter. Args: core: Core ID neuron: Neuron ID (0-1023) param_id: Parameter (PARAM_THRESHOLD=0, PARAM_LEAK=1, etc.) value: Signed 16-bit value """ v = value & 0xFFFF self._send([ self.CMD_PROG_NEURON, core & 0xFF, (neuron >> 8) & 0xFF, neuron & 0xFF, param_id & 0xFF, (v >> 8) & 0xFF, v & 0xFF ]) self._wait_ack() def status(self): """Query chip status. Returns: Tuple of (state, timestep_count) """ self._send([self.CMD_STATUS]) resp = self._recv(5) state = resp[0] ts_count = struct.unpack('>I', resp[1:5])[0] return state, ts_count def demo(chip): """Run a demonstration: program a spike chain and observe propagation.""" print("\n" + "=" * 60) print(" Neuromorphic Chip Demo (Phase 13b: CSR + Multicast)") print("=" * 60) state, ts = chip.status() print(f"\nInitial status: state={state}, timesteps={ts}") # Program a spike chain: Core 0, N0→N1→N2→N3 print("\nProgramming spike chain: Core 0, N0 -> N1 -> N2 -> N3") chip.prog_conn(0, 0, [(1, 1200)]) print(" N0 -> N1 (w=1200) OK") chip.prog_conn(0, 1, [(2, 1200)]) print(" N1 -> N2 (w=1200) OK") chip.prog_conn(0, 2, [(3, 1200)]) print(" N2 -> N3 (w=1200) OK") # Program cross-core route: Core 0 N3 → Core 1 N0 print("\nProgramming cross-core route: C0:N3 -> C1:N0") chip.prog_route(src_core=0, src_neuron=3, dest_core=1, dest_neuron=0, weight=1200) print(" Route OK") # Core 1 chain print("Programming Core 1 chain: N0 -> N1 -> N2") chip.prog_conn(1, 0, [(1, 1200)]) chip.prog_conn(1, 1, [(2, 1200)]) print(" Core 1 chain OK") # Stimulate and run print("\nApplying stimulus: Core 0, N0, current=1200") chip.stimulus(core=0, neuron=0, current=1200) print("Running 20 timesteps...") t_start = time.time() spikes = chip.run(20) elapsed = time.time() - t_start print(f" Done! {spikes} spikes in {elapsed:.3f}s") # Run more without stimulus print("\nRunning 10 more timesteps (no stimulus)...") spikes2 = chip.run(10) print(f" {spikes2} spikes (should be 0 - no input)") # Final status state, ts = chip.status() print(f"\nFinal status: state={state}, timesteps={ts}") print("\n" + "=" * 60) print(" Demo complete! The chip is alive.") print("=" * 60) def main(): parser = argparse.ArgumentParser(description="Neuromorphic Chip Host Controller") parser.add_argument("--port", required=True, help="Serial port (e.g., COM3 or /dev/ttyUSB1)") parser.add_argument("--baud", type=int, default=115200, help="Baud rate (default: 115200)") parser.add_argument("--demo", action="store_true", help="Run demo program") parser.add_argument("--status", action="store_true", help="Query chip status") args = parser.parse_args() chip = NeuromorphicChip(args.port, args.baud) try: if args.status: state, ts = chip.status() print(f"State: {state} ({'idle' if state == 0 else 'busy'})") print(f"Timestep count: {ts}") elif args.demo: demo(chip) else: print("No command specified. Use --demo or --status") print("Or import NeuromorphicChip in Python for programmatic access:") print("") print(" from host import NeuromorphicChip") print(" chip = NeuromorphicChip('COM3')") print(" chip.prog_conn(0, 0, [(1, 1200), (2, 800)]) # N0 -> N1(w=1200), N2(w=800)") print(" chip.prog_index(0, 0, 0, 2) # Or use prog_conn() which handles this") print(" chip.stimulus(core=0, neuron=0, current=1200)") print(" spikes = chip.run(100)") finally: chip.close() if __name__ == "__main__": main()