catalyst-cloud / app.py
Henry Barnes
fix: replace deprecated gr.update() with gr.Dropdown() for Gradio 5.x
488732d
"""Catalyst Neuromorphic — Interactive Processor Configurator & Simulator.
Explore the N1 and N2 neuromorphic processors, configure spiking neural
networks with hardware-accurate constraints, and run local simulations
with live spike raster visualisation.
"""
import gradio as gr
import numpy as np
# ── Processor specs ──────────────────────────────────────────────────────────
PROCESSORS = {
"N1": {
"cores": 128,
"neurons_per_core": 1024,
"synapses_per_core": 131072,
"total_neurons": 131072,
"dendrites": 4,
"graded_spike_bits": 8,
"learning_opcodes": 14,
"max_axon_delay": 63,
"parity": "Intel Loihi 1",
"neuron_models": ["LIF"],
"features": [
"Dendritic compartments (4 per neuron)",
"Graded spikes (8-bit payload)",
"On-chip learning (14-opcode ISA, STDP)",
"Axonal delays (up to 63 timesteps)",
"Programmable synaptic plasticity",
],
},
"N2": {
"cores": 128,
"neurons_per_core": 1024,
"synapses_per_core": 131072,
"total_neurons": 131072,
"dendrites": 4,
"graded_spike_bits": 8,
"learning_opcodes": 14,
"max_axon_delay": 63,
"parity": "Intel Loihi 2",
"neuron_models": ["LIF", "CUBA", "ALIF", "Izhikevich", "Custom"],
"features": [
"Programmable neuron models (microcode engine)",
"CUBA, LIF, ALIF, Izhikevich built-in",
"Custom neuron models via microcode",
"Graded spikes (8-bit payload)",
"On-chip learning (14-opcode ISA)",
"Dendritic compartments (4 per neuron)",
"Axonal delays (up to 63 timesteps)",
"Multi-chip scalability",
"Three-factor learning rules",
],
},
}
# ── Local LIF simulator ─────────────────────────────────────────────────────
def simulate_lif(populations, connections, timesteps, dt=1.0):
"""Run a simple LIF simulation. Returns spike times per population."""
# Build neuron arrays
pop_offsets = {}
total = 0
for p in populations:
pop_offsets[p["label"]] = total
total += p["size"]
voltage = np.zeros(total)
threshold = np.array([1000.0] * total)
leak = np.array([50.0] * total)
refrac = np.zeros(total, dtype=int)
# Apply per-population params
for p in populations:
off = pop_offsets[p["label"]]
sz = p["size"]
threshold[off : off + sz] = p.get("threshold", 1000)
leak[off : off + sz] = p.get("leak", 50)
# Build weight matrix
W = np.zeros((total, total))
for c in connections:
src_off = pop_offsets[c["source"]]
src_sz = next(p["size"] for p in populations if p["label"] == c["source"])
tgt_off = pop_offsets[c["target"]]
tgt_sz = next(p["size"] for p in populations if p["label"] == c["target"])
topo = c.get("topology", "random_sparse")
w = c.get("weight", 500)
prob = c.get("probability", 0.3)
if topo == "all_to_all":
W[tgt_off : tgt_off + tgt_sz, src_off : src_off + src_sz] = w
elif topo == "one_to_one":
n = min(src_sz, tgt_sz)
for i in range(n):
W[tgt_off + i, src_off + i] = w
else: # random_sparse
mask = np.random.random((tgt_sz, src_sz)) < prob
W[tgt_off : tgt_off + tgt_sz, src_off : src_off + src_sz] = mask * w
# Stimulus: constant current to first population
stim_pop = populations[0]
stim_off = pop_offsets[stim_pop["label"]]
stim_sz = stim_pop["size"]
stim_current = np.zeros(total)
stim_current[stim_off : stim_off + stim_sz] = populations[0].get("input_current", 800)
# Run
spike_times = {p["label"]: {} for p in populations}
for t in range(timesteps):
# Refractory
active = refrac <= 0
# Leak
voltage = voltage * (1.0 - leak / 4096.0)
# Synaptic input from previous spikes
spikes_vec = np.zeros(total)
for p in populations:
off = pop_offsets[p["label"]]
sz = p["size"]
for nid_str, times in spike_times[p["label"]].items():
nid = int(nid_str)
if times and times[-1] == t - 1:
spikes_vec[off + nid] = 1.0
synaptic = W @ spikes_vec
# Update voltage
voltage += (stim_current + synaptic) * active
# Noise (small)
voltage += np.random.randn(total) * 20 * active
# Spike detection
fired = (voltage >= threshold) & active
indices = np.where(fired)[0]
for idx in indices:
# Find which population
for p in populations:
off = pop_offsets[p["label"]]
sz = p["size"]
if off <= idx < off + sz:
nid = idx - off
key = str(nid)
if key not in spike_times[p["label"]]:
spike_times[p["label"]][key] = []
spike_times[p["label"]][key].append(t)
break
# Reset
voltage[fired] = 0.0
refrac[fired] = 3 # refractory period
refrac -= 1
refrac = np.maximum(refrac, 0)
return spike_times, pop_offsets
def make_raster(populations, spike_times, timesteps):
"""Create a dark-themed spike raster plot."""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(12, 5), dpi=100)
fig.patch.set_facecolor("#0d1117")
ax.set_facecolor("#0d1117")
colors = ["#4A9EFF", "#FF6B6B", "#50C878", "#FFD93D", "#C084FC"]
neuron_offset = 0
total_spikes = 0
for i, pop in enumerate(populations):
color = colors[i % len(colors)]
pop_spikes = spike_times.get(pop["label"], {})
for nid_str, times in pop_spikes.items():
nid = int(nid_str)
y = neuron_offset + nid
ax.scatter(times, [y] * len(times), s=1.5, c=color, marker="|", linewidths=0.6)
total_spikes += len(times)
mid = neuron_offset + pop["size"] // 2
ax.annotate(
f'{pop["label"]}\n({pop["size"]})',
xy=(-0.01, mid),
xycoords=("axes fraction", "data"),
fontsize=8,
color=color,
ha="right",
va="center",
)
neuron_offset += pop["size"]
ax.set_xlabel("Timestep", color="#8b949e", fontsize=10)
ax.set_title("Spike Raster", color="white", fontsize=12, fontweight="bold", pad=10)
ax.tick_params(colors="#8b949e", labelsize=8)
ax.spines["bottom"].set_color("#30363d")
ax.spines["left"].set_color("#30363d")
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.set_xlim(-1, timesteps + 1)
ax.set_ylim(-1, neuron_offset)
ax.set_yticks([])
plt.tight_layout()
return fig, total_spikes
# ── Hardware constraint validation ───────────────────────────────────────────
def validate_hardware(processor, populations, connections):
"""Check if the network fits on the selected processor."""
spec = PROCESSORS[processor]
total_neurons = sum(p["size"] for p in populations)
max_neurons = spec["total_neurons"]
cores_needed = max(1, -(-total_neurons // spec["neurons_per_core"])) # ceil div
cores_available = spec["cores"]
total_synapses = 0
for c in connections:
src_sz = next(p["size"] for p in populations if p["label"] == c["source"])
tgt_sz = next(p["size"] for p in populations if p["label"] == c["target"])
topo = c.get("topology", "random_sparse")
prob = c.get("probability", 0.3)
if topo == "all_to_all":
total_synapses += src_sz * tgt_sz
elif topo == "one_to_one":
total_synapses += min(src_sz, tgt_sz)
else:
total_synapses += int(src_sz * tgt_sz * prob)
fits = cores_needed <= cores_available
utilisation = (cores_needed / cores_available) * 100
report = f"### Hardware Mapping: {processor}\n\n"
report += f"| Resource | Used | Available | Status |\n"
report += f"|----------|------|-----------|--------|\n"
report += f"| Neurons | {total_neurons:,} | {max_neurons:,} | {'OK' if total_neurons <= max_neurons else 'OVER'} |\n"
report += f"| Cores | {cores_needed} | {cores_available} | {'OK' if fits else 'OVER'} |\n"
report += f"| Synapses | {total_synapses:,} | {spec['synapses_per_core'] * cores_available:,} | est. |\n"
report += f"| Utilisation | {utilisation:.1f}% | | |\n\n"
if fits:
report += f"**Network fits on {processor}.** Using {cores_needed}/{cores_available} cores ({utilisation:.1f}%)."
else:
report += f"**Network does NOT fit on {processor}.** Needs {cores_needed} cores but only {cores_available} available. Reduce neuron count."
return report, fits
# ── Main interface ───────────────────────────────────────────────────────────
def run_demo(processor, num_cores, neurons_per_core, neuron_model,
hidden_size, output_size, topology, weight, probability,
timesteps, input_current):
"""Configure, validate, and simulate."""
input_size = num_cores * neurons_per_core
if input_size > 2048:
input_size = 2048 # cap for demo performance
populations = [
{"label": "input", "size": min(input_size, 512), "threshold": 1000,
"leak": 50, "input_current": input_current},
]
connections = []
if hidden_size > 0:
populations.append({"label": "hidden", "size": hidden_size,
"threshold": 1000, "leak": 50})
connections.append({
"source": "input", "target": "hidden",
"topology": topology, "weight": weight, "probability": probability,
})
if output_size > 0:
src = "hidden" if hidden_size > 0 else "input"
populations.append({"label": "output", "size": output_size,
"threshold": 1000, "leak": 50})
connections.append({
"source": src, "target": "output",
"topology": topology, "weight": weight, "probability": probability,
})
# Validate
hw_report, fits = validate_hardware(processor, populations, connections)
# Cap simulation size for responsiveness
total = sum(p["size"] for p in populations)
if total > 2000:
return hw_report + "\n\n*Demo capped at 2,000 neurons for browser performance. Full scale available via Cloud API.*", None
# Simulate
spike_times, _ = simulate_lif(populations, connections, timesteps)
fig, total_spikes = make_raster(populations, spike_times, timesteps)
# Stats
stats = f"\n\n---\n### Simulation Results\n"
stats += f"- **Total spikes**: {total_spikes:,}\n"
stats += f"- **Timesteps**: {timesteps}\n"
stats += f"- **Neuron model**: {neuron_model}\n"
for p in populations:
pop_spikes = sum(len(t) for t in spike_times[p["label"]].values())
rate = pop_spikes / (p["size"] * timesteps) if p["size"] * timesteps > 0 else 0
stats += f"- **{p['label']}**: {pop_spikes:,} spikes ({rate:.3f} spikes/neuron/step)\n"
return hw_report + stats, fig
def get_neuron_models(processor):
"""Return available neuron models for selected processor."""
models = PROCESSORS[processor]["neuron_models"]
return gr.Dropdown(choices=models, value=models[0])
def get_processor_info(processor):
"""Return markdown specs for selected processor."""
spec = PROCESSORS[processor]
md = f"## Catalyst {processor}\n\n"
md += f"**Parity**: {spec['parity']}\n\n"
md += f"| Spec | Value |\n|------|-------|\n"
md += f"| Cores | {spec['cores']} |\n"
md += f"| Neurons/core | {spec['neurons_per_core']:,} |\n"
md += f"| Total neurons | {spec['total_neurons']:,} |\n"
md += f"| Synapses/core | {spec['synapses_per_core']:,} |\n"
md += f"| Dendrites | {spec['dendrites']} compartments |\n"
md += f"| Graded spikes | {spec['graded_spike_bits']}-bit |\n"
md += f"| Learning opcodes | {spec['learning_opcodes']} |\n"
md += f"| Max axon delay | {spec['max_axon_delay']} timesteps |\n"
md += f"| Neuron models | {', '.join(spec['neuron_models'])} |\n\n"
md += "### Key Features\n\n"
for f in spec["features"]:
md += f"- {f}\n"
return md
# ── Gradio app ───────────────────────────────────────────────────────────────
with gr.Blocks(
title="Catalyst Neuromorphic — Processor Configurator",
theme=gr.themes.Base(
primary_hue="blue",
neutral_hue="slate",
font=gr.themes.GoogleFont("Inter"),
),
css="""
.gradio-container { max-width: 1100px !important; }
.dark { background: #0d1117 !important; }
""",
) as demo:
gr.Markdown("""
# Catalyst Neuromorphic — Processor Configurator
Explore the **N1** and **N2** spiking neuromorphic processors.
Configure networks, validate hardware constraints, and run simulations — all in the browser.
""")
with gr.Tab("Processors"):
gr.Markdown("""
### Compare the N1 and N2 neuromorphic processors
| | **N1** | **N2** |
|---|---|---|
| **Parity** | Intel Loihi 1 | Intel Loihi 2 |
| **Cores** | 128 | 128 |
| **Neurons/core** | 1,024 | 1,024 |
| **Total neurons** | 131,072 | 131,072 |
| **Neuron models** | CUBA LIF | CUBA, Izhikevich, Adaptive LIF, Sigma-Delta, Resonate-and-Fire |
| **Learning** | 14-opcode ISA, STDP | 14-opcode ISA, three-factor |
| **Dendrites** | 4 compartments | 4 compartments |
| **Graded spikes** | 8-bit | 8-bit |
| **Max axon delay** | 63 timesteps | 63 timesteps |
| **Key advance** | Foundation | Programmable neuron microcode engine |
The **N1** is a complete neuromorphic processor with full Loihi 1 parity — 128 cores, on-chip learning, dendritic computation.
The **N2** adds a **programmable microcode engine** for custom neuron models. Instead of hardwired LIF, you can program arbitrary neuron dynamics — CUBA, ALIF, Izhikevich, or anything you design.
Both have been validated on FPGA. Both are fully open-design.
**Papers**: [N1 Paper (Zenodo)](https://zenodo.org/records/18727094) | [N2 Paper (Zenodo)](https://zenodo.org/records/18728256)
**Website**: [catalyst-neuromorphic.com](https://catalyst-neuromorphic.com)
""")
with gr.Tab("Configure & Simulate"):
with gr.Row():
with gr.Column(scale=1):
processor = gr.Radio(
["N1", "N2"], value="N2", label="Processor",
info="Select which processor to target",
)
neuron_model = gr.Dropdown(
["LIF", "CUBA", "ALIF", "Izhikevich", "Custom"],
value="LIF", label="Neuron Model",
info="N2 supports programmable models",
)
num_cores = gr.Slider(1, 128, value=4, step=1,
label="Cores", info="How many cores to use")
neurons_per_core = gr.Slider(1, 1024, value=64, step=1,
label="Neurons per core")
with gr.Column(scale=1):
hidden_size = gr.Slider(0, 512, value=128, step=1,
label="Hidden neurons", info="0 = direct input→output")
output_size = gr.Slider(0, 256, value=64, step=1,
label="Output neurons", info="0 = no output layer")
topology = gr.Dropdown(
["all_to_all", "one_to_one", "random_sparse"],
value="random_sparse", label="Connection Topology",
)
weight = gr.Slider(100, 3000, value=800, step=50,
label="Synaptic Weight")
probability = gr.Slider(0.01, 1.0, value=0.3, step=0.01,
label="Connection Probability",
info="For random_sparse topology")
with gr.Row():
timesteps = gr.Slider(10, 500, value=200, step=10, label="Timesteps")
input_current = gr.Slider(100, 5000, value=800, step=100,
label="Input Current")
run_btn = gr.Button("Simulate", variant="primary", size="lg")
with gr.Row():
hw_report = gr.Markdown(label="Hardware Report")
raster_plot = gr.Plot(label="Spike Raster")
# Events
processor.change(get_neuron_models, inputs=[processor], outputs=[neuron_model])
run_btn.click(
run_demo,
inputs=[processor, num_cores, neurons_per_core, neuron_model,
hidden_size, output_size, topology, weight, probability,
timesteps, input_current],
outputs=[hw_report, raster_plot],
)
with gr.Tab("Cloud API"):
gr.Markdown("""
### Run at scale with the Catalyst Cloud API
The simulator above runs locally in the browser for small networks.
For **full-scale simulations** (131K+ neurons, hardware-accurate timing, on-chip learning),
use the Catalyst Cloud API.
**Install the Python SDK:**
```bash
pip install catalyst-cloud
```
**Quick start:**
```python
from catalyst_cloud import Client
client = Client("cn_live_your_key_here")
# Create a network
net = client.create_network(
populations=[
{"label": "input", "size": 700},
{"label": "hidden", "size": 512, "params": {"threshold": 1000}},
],
connections=[
{"source": "input", "target": "hidden",
"topology": "random_sparse", "weight": 500, "p": 0.3},
],
)
# Run simulation
result = client.simulate(
network_id=net["network_id"],
timesteps=250,
stimuli=[{"population": "input", "current": 5000}],
)
print(f"Total spikes: {result['total_spikes']}")
```
### Links
- [Sign up for free](https://catalyst-neuromorphic.com/cloud)
- [API Documentation](https://catalyst-neuromorphic.com/cloud/docs)
- [Pricing](https://catalyst-neuromorphic.com/cloud/pricing)
- [PyPI: catalyst-cloud](https://pypi.org/project/catalyst-cloud/)
- [GitHub: catalyst-cloud-python](https://github.com/catalyst-neuromorphic/catalyst-cloud-python)
""")
gr.Markdown("""
---
[Website](https://catalyst-neuromorphic.com) |
[Research](https://catalyst-neuromorphic.com/research) |
[Cloud API](https://catalyst-neuromorphic.com/cloud) |
[GitHub](https://github.com/catalyst-neuromorphic)
""")
demo.launch()