Jackoatmon's picture
Update Feather a10g-large training runtime image
2a882ca verified
//! Parity tests: GPU SP vs CPU SP reference.
//!
//! With matching seeds the two should produce bit-identical active-column sets
//! when `learn=false`, and remain bit-identical over repeated `learn=true`
//! steps because the Hebbian update is deterministic (no RNG once initialised).
//!
//! Run with: cargo test --release --features gpu
#![cfg(test)]
#![cfg(feature = "gpu")]
use crate::sp::{SpatialPooler, SpatialPoolerConfig};
use crate::gpu::sp_gpu::SpatialPoolerGpu;
use crate::gpu::tm_gpu::TemporalMemoryGpu;
use crate::gpu::fused::{
launch_fused, plan_batched_grid_dim, plan_fused_launch, FusedState,
};
use cudarc::driver::CudaSlice;
use rand::{Rng, SeedableRng};
use rand_xoshiro::Xoshiro256PlusPlus;
fn make_sdr(rng: &mut Xoshiro256PlusPlus, bits: usize, sparsity: f32) -> Vec<u8> {
let on = ((sparsity * bits as f32) as usize).max(1);
let mut v = vec![0u8; bits];
let mut placed = 0;
while placed < on {
let i = rng.gen_range(0..bits);
if v[i] == 0 {
v[i] = 1;
placed += 1;
}
}
v
}
#[test]
fn gpu_sp_matches_cpu_no_learn() {
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let mut cpu = SpatialPooler::new(
SpatialPoolerConfig { ..SpatialPoolerConfig::default() },
1234,
);
let cpu_for_gpu = SpatialPooler::new(
SpatialPoolerConfig { ..SpatialPoolerConfig::default() },
1234,
);
let mut gpu = SpatialPoolerGpu::from_cpu(&cpu_for_gpu)
.expect("gpu init (CUDA device available)");
gpu.set_strict_parity(true);
let mut rng = Xoshiro256PlusPlus::seed_from_u64(99);
for step in 0..20 {
let sdr_u8 = make_sdr(&mut rng, bits, 0.02);
let sdr_bool: Vec<bool> = sdr_u8.iter().map(|&x| x != 0).collect();
let cpu_active: Vec<u32> = cpu.compute(&sdr_bool, false);
let gpu_active: Vec<u32> = gpu.compute(&sdr_u8, false).expect("gpu compute");
assert_eq!(
cpu_active, gpu_active,
"mismatch at step {step}: len cpu={} gpu={}",
cpu_active.len(), gpu_active.len()
);
}
}
#[test]
fn gpu_sp_matches_cpu_with_learn() {
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let mut cpu = SpatialPooler::new(
SpatialPoolerConfig { ..SpatialPoolerConfig::default() },
5678,
);
let cpu_for_gpu = SpatialPooler::new(
SpatialPoolerConfig { ..SpatialPoolerConfig::default() },
5678,
);
let mut gpu = SpatialPoolerGpu::from_cpu(&cpu_for_gpu).expect("gpu init");
gpu.set_strict_parity(true);
let mut rng = Xoshiro256PlusPlus::seed_from_u64(42);
for step in 0..50 {
let sdr_u8 = make_sdr(&mut rng, bits, 0.02);
let sdr_bool: Vec<bool> = sdr_u8.iter().map(|&x| x != 0).collect();
let cpu_active = cpu.compute(&sdr_bool, true);
let gpu_active = gpu.compute(&sdr_u8, true).expect("gpu compute");
assert_eq!(
cpu_active, gpu_active,
"mismatch at step {step} with learning"
);
}
}
#[test]
fn gpu_tm_anomaly_decays_on_repeating_sequence() {
// End-to-end GPU pipeline: SP feeds TM; repeating SDR sequence should drive
// anomaly down over time.
use crate::gpu::HTMRegionGpu; // not pyclass methods; use internal constructor via Rust
// Easier: replicate the pipeline directly with SP + TM.
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let n_cols = cfg.n_columns;
let cells_per_col = 32usize;
let cpu_for_gpu = SpatialPooler::new(SpatialPoolerConfig::default(), 314);
let mut sp = SpatialPoolerGpu::from_cpu(&cpu_for_gpu).expect("gpu init");
let dev = sp.dev_ref().clone();
let mut tm = TemporalMemoryGpu::new(dev.clone(), n_cols, cells_per_col)
.expect("gpu tm init");
tm.reset().expect("tm reset");
// Build 3 fixed SDRs, feed them in a repeating sequence.
let mut rng = Xoshiro256PlusPlus::seed_from_u64(7);
let make = |rng: &mut Xoshiro256PlusPlus| make_sdr(rng, bits, 0.02);
let seqs = [make(&mut rng), make(&mut rng), make(&mut rng)];
// Warm up SP so columns are stable per symbol.
for _ in 0..100 {
for s in &seqs {
let _ = sp.compute(s, true).expect("sp compute");
}
}
// Build a long input buffer: 100 repetitions of [A,B,C] = 300 steps.
let repeats = 100usize;
let t = repeats * 3;
let mut inputs_flat = vec![0u8; t * bits];
for r in 0..repeats {
for (i, s) in seqs.iter().enumerate() {
let off = (r * 3 + i) * bits;
inputs_flat[off..off + bits].copy_from_slice(s);
}
}
let inputs_dev: CudaSlice<u8> = dev.htod_sync_copy(&inputs_flat).expect("htod");
let mut cols_dev = dev.alloc_zeros::<u8>(t * n_cols).expect("alloc cols");
let mut anom_dev = dev.alloc_zeros::<f32>(t).expect("alloc anom");
sp.step_batch_with_tm(
&inputs_dev,
t,
bits,
true,
&mut cols_dev,
&mut anom_dev,
&mut tm,
).expect("step_batch_with_tm");
let anom: Vec<f32> = dev.dtoh_sync_copy(&anom_dev).expect("d2h anom");
let cols: Vec<u8> = dev.dtoh_sync_copy(&cols_dev).expect("d2h cols");
// Active column count per step must equal k for every step.
let k = ((cfg.sparsity * n_cols as f32).round() as usize).max(1);
for ti in 0..t {
let step_slice = &cols[ti * n_cols..(ti + 1) * n_cols];
let n_on = step_slice.iter().filter(|&&b| b != 0).count();
assert_eq!(n_on, k, "step {ti} has {n_on} active cols, expected {k}");
}
// First repetition: anomaly should be near 1.0 (nothing predicted).
let early_avg: f32 = anom[3..9].iter().sum::<f32>() / 6.0;
// Last repetitions: anomaly should be noticeably lower.
let late_avg: f32 = anom[(t - 9)..t].iter().sum::<f32>() / 9.0;
eprintln!("gpu tm: early anomaly = {early_avg:.3}, late = {late_avg:.3}");
assert!(
late_avg < early_avg,
"GPU TM should reduce anomaly on repeating sequence: early={early_avg:.3}, late={late_avg:.3}"
);
}
/// Cluster-sync smoke test: verifies that the fused megakernel (which relies on
/// hardware `cluster::sync()` / grid-barrier on H100/H200 Hopper) completes
/// without deadlock when called with real HTM state, and that output shapes are
/// sane (no NaN / Inf in anomaly scores, active-column count in plausible range).
///
/// This is an *integration* test, not a synthetic micro-benchmark: it exercises
/// exactly the same `launch_fused` code path used in production, so any
/// deadlock in the cooperative-grid or DLB barrier would surface here.
///
/// Skips gracefully (with an eprintln) when no GPU is available — the test
/// binary returns exit-code 0 in that case so CI still passes.
#[test]
fn cluster_sync_smoke_test() {
// Build a tiny HTM region (1024 inputs, 256 columns, 4 cells/column).
// This keeps VRAM usage minimal while still exercising all kernel paths.
let input_bits = 1024usize;
let n_columns = 256usize;
let cells_per_col = 4usize;
// Probe cooperative launch attribute before doing any real work.
// CU_DEVICE_ATTRIBUTE_CLUSTER_LAUNCH = 223 (added in CUDA 11.8 for Hopper).
// cudarc exposes raw attribute querying; we check cooperative launch (98)
// as the guard — cluster launch is a superset and not separately probed
// here since cudarc doesn't expose attribute 223 symbolically yet.
// On pre-Hopper hardware the DLB barrier path is used instead and the
// test still validates no deadlock on that path.
let make_cfg = || SpatialPoolerConfig {
input_bits,
n_columns,
sparsity: 0.04, // ~10 active cols out of 256
..SpatialPoolerConfig::default()
};
let cpu_ref = SpatialPooler::new(make_cfg(), 42);
let mut sp = match SpatialPoolerGpu::from_cpu(&cpu_ref) {
Ok(sp) => sp,
Err(e) => {
eprintln!("[cluster_sync_smoke_test] No GPU available ({e:?}) — skipping");
return;
}
};
let dev = sp.dev_ref().clone();
// Check cooperative launch support; skip with a clear message if absent.
let cooperative_ok = matches!(
dev.attribute(cudarc::driver::sys::CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COOPERATIVE_LAUNCH),
Ok(v) if v > 0
);
if !cooperative_ok {
eprintln!("[cluster_sync_smoke_test] CU_DEVICE_ATTRIBUTE_COOPERATIVE_LAUNCH=0 — DLB path only, still running test");
// We continue — the DLB path is the production fallback and must not deadlock either.
}
let mut tm = match TemporalMemoryGpu::new(dev.clone(), n_columns, cells_per_col) {
Ok(tm) => tm,
Err(e) => {
eprintln!("[cluster_sync_smoke_test] TemporalMemoryGpu::new failed ({e:?}) — skipping");
return;
}
};
tm.reset().expect("tm reset");
let mut fused_st: FusedState = match FusedState::new(
dev.clone(),
n_columns,
cells_per_col,
sp.initial_threshold_estimate(),
) {
Ok(f) => f,
Err(e) => {
eprintln!("[cluster_sync_smoke_test] FusedState::new failed ({e:?}) — skipping");
return;
}
};
fused_st.reset().expect("fused reset");
// Build T=4 timesteps of all-zero input SDRs.
let t = 4usize;
let inputs_flat = vec![0u8; t * input_bits];
let inputs_dev: CudaSlice<u8> = dev.htod_sync_copy(&inputs_flat).expect("htod inputs");
let mut cols_dev = dev.alloc_zeros::<u8>(t * n_columns).expect("alloc cols");
let mut anom_dev = dev.alloc_zeros::<f32>(t).expect("alloc anom");
// Execute with a 2-second timeout guard via a thread. If the kernel
// deadlocks, the parent test process times out and the CI job reports
// failure — we can't cancel a live CUDA kernel from Rust, but the
// launch_fused call itself must return within this window on any sane GPU.
//
// We run the kernel inline (not in a separate thread) because CUDA contexts
// are not safely shareable across threads without explicit multi-threading
// setup. The 2-second bound is enforced implicitly: if the kernel deadlocks,
// the test binary will hang and the CI timeout (typically 5 min) will kill it.
// For local dev, the deadlock would be immediately obvious.
launch_fused(
&mut sp,
&mut tm,
&mut fused_st,
&inputs_dev,
&mut cols_dev,
&mut anom_dev,
t,
input_bits,
false, // learn=false for determinism
).expect("launch_fused (cluster_sync_smoke_test): deadlock or CUDA error");
dev.synchronize().expect("device sync after launch_fused");
// --- Correctness assertions ---
let cols_host: Vec<u8> = dev.dtoh_sync_copy(&cols_dev).expect("d2h cols");
let anom_host: Vec<f32> = dev.dtoh_sync_copy(&anom_dev).expect("d2h anom");
// Output buffers must be exactly the right size.
assert_eq!(cols_host.len(), t * n_columns, "cols buffer size mismatch");
assert_eq!(anom_host.len(), t, "anom buffer size mismatch");
// Anomaly scores must be finite (NaN/Inf indicates numerical blow-up).
for (i, &a) in anom_host.iter().enumerate() {
assert!(a.is_finite(), "anomaly[{i}] is not finite: {a}");
assert!(a >= 0.0 && a <= 1.0, "anomaly[{i}] out of [0,1]: {a}");
}
// Active-column count per step: threshold-based inhibition, so 0 is
// possible on cold start (before thresholds calibrate), but we assert
// <= n_columns to catch buffer overruns or completely wrong output.
for ti in 0..t {
let n_on = cols_host[ti * n_columns..(ti + 1) * n_columns]
.iter()
.filter(|&&b| b != 0)
.count();
assert!(
n_on <= n_columns,
"step {ti}: active columns {n_on} > n_columns {n_columns} (buffer overrun?)"
);
}
eprintln!(
"[cluster_sync_smoke_test] PASSED: T={t}, n_cols={n_columns}, \
input_bits={input_bits}, cooperative_supported={cooperative_ok}, \
anom={anom_host:?}"
);
}
/// Parity check: the CAI zero-copy path (`step_many_cuda`) must produce
/// bit-identical outputs to the numpy H2D/D2H path (`step_batch_with_tm`),
/// since the kernel pipeline is the same — only the I/O wrapping changes.
/// We skip the PyO3 CAI dict plumbing here and test the underlying
/// ManuallyDrop + upgrade_device_ptr pattern directly.
#[test]
fn gpu_cuda_vs_numpy_parity() {
use std::mem::ManuallyDrop;
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let n_cols = cfg.n_columns;
let cells_per_col = 32usize;
// Build two identical (SP, TM) pairs from the same seed.
let build = || -> (SpatialPoolerGpu, TemporalMemoryGpu) {
let cpu_ref = SpatialPooler::new(SpatialPoolerConfig::default(), 271828);
let sp = SpatialPoolerGpu::from_cpu(&cpu_ref).expect("gpu init");
let dev = sp.dev_ref().clone();
let mut tm = TemporalMemoryGpu::new(dev, n_cols, cells_per_col).expect("tm init");
tm.reset().expect("tm reset");
(sp, tm)
};
// Deterministic SDR sequence.
let mut rng = Xoshiro256PlusPlus::seed_from_u64(31337);
let t = 32usize;
let mut inputs_flat = vec![0u8; t * bits];
for i in 0..t {
let sdr = make_sdr(&mut rng, bits, 0.02);
inputs_flat[i * bits..(i + 1) * bits].copy_from_slice(&sdr);
}
// ---- Path A: owned CudaSlice (numpy-equivalent path) ----
let (mut sp_a, mut tm_a) = build();
let dev_a = sp_a.dev_ref().clone();
let inputs_a: CudaSlice<u8> = dev_a.htod_sync_copy(&inputs_flat).expect("htod");
let mut cols_a = dev_a.alloc_zeros::<u8>(t * n_cols).expect("alloc cols_a");
let mut anom_a = dev_a.alloc_zeros::<f32>(t).expect("alloc anom_a");
sp_a.step_batch_with_tm(&inputs_a, t, bits, false, &mut cols_a, &mut anom_a, &mut tm_a)
.expect("owned step_batch_with_tm");
dev_a.synchronize().expect("sync a");
let cols_a_host: Vec<u8> = dev_a.dtoh_sync_copy(&cols_a).expect("d2h cols_a");
let anom_a_host: Vec<f32> = dev_a.dtoh_sync_copy(&anom_a).expect("d2h anom_a");
// ---- Path B: borrowed device pointers via upgrade_device_ptr ----
// We allocate fresh owned CudaSlices on a fresh device, then take their
// raw ptrs and re-wrap as ManuallyDrop borrowed views — mimicking what
// `step_many_cuda` does with torch-owned CUDA memory.
let (mut sp_b, mut tm_b) = build();
let dev_b = sp_b.dev_ref().clone();
let inputs_b_owned: CudaSlice<u8> = dev_b.htod_sync_copy(&inputs_flat).expect("htod");
let cols_b_owned = dev_b.alloc_zeros::<u8>(t * n_cols).expect("alloc cols_b");
let anom_b_owned = dev_b.alloc_zeros::<f32>(t).expect("alloc anom_b");
// Extract raw CUdeviceptrs (and leak the owners so their Drop doesn't free).
let inputs_ptr = inputs_b_owned.leak();
let cols_ptr = cols_b_owned.leak();
let anom_ptr = anom_b_owned.leak();
// Re-wrap as borrowed views.
let inputs_b = ManuallyDrop::new(unsafe { dev_b.upgrade_device_ptr::<u8>(inputs_ptr, t * bits) });
let mut cols_b = ManuallyDrop::new(unsafe { dev_b.upgrade_device_ptr::<u8>(cols_ptr, t * n_cols) });
let mut anom_b = ManuallyDrop::new(unsafe { dev_b.upgrade_device_ptr::<f32>(anom_ptr, t) });
sp_b.step_batch_with_tm(&inputs_b, t, bits, false, &mut cols_b, &mut anom_b, &mut tm_b)
.expect("borrowed step_batch_with_tm");
dev_b.synchronize().expect("sync b");
// `ManuallyDrop` doesn't auto-coerce to `&CudaSlice<T>` for the DevicePtr
// trait bound on `dtoh_sync_copy`; explicit deref.
let cols_b_host: Vec<u8> = dev_b.dtoh_sync_copy(&*cols_b).expect("d2h cols_b");
let anom_b_host: Vec<f32> = dev_b.dtoh_sync_copy(&*anom_b).expect("d2h anom_b");
// Re-own so Drop actually frees (we leaked above).
let _inputs_owned_again = unsafe { dev_b.upgrade_device_ptr::<u8>(inputs_ptr, t * bits) };
let _cols_owned_again = unsafe { dev_b.upgrade_device_ptr::<u8>(cols_ptr, t * n_cols) };
let _anom_owned_again = unsafe { dev_b.upgrade_device_ptr::<f32>(anom_ptr, t) };
assert_eq!(cols_a_host, cols_b_host, "active-column mask diverges between numpy and CAI paths");
assert_eq!(anom_a_host.len(), anom_b_host.len());
for (i, (a, b)) in anom_a_host.iter().zip(anom_b_host.iter()).enumerate() {
// Anomaly is a pure division of integer counts — bit-exact expected.
assert!((a - b).abs() < 1e-7, "anomaly mismatch at step {i}: a={a} b={b}");
}
}
/// Fused kernel: threshold activation should converge to near target sparsity
/// after a short warmup. Acceptance: mean activation rate per step lands in
/// [0.3*target, 2.5*target] after 500-step warmup. Because the threshold
/// starts conservative (=2.0) and the per-column adaptation rate is slow
/// (0.001), we allow a generous band — the test asserts directional
/// convergence toward the target, not tight matching.
#[test]
fn gpu_threshold_converges_to_sparsity() {
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let n_cols = cfg.n_columns;
let cells_per_col = 32usize;
let target = cfg.sparsity; // 0.02 = 40 cols expected
let cpu_ref = SpatialPooler::new(SpatialPoolerConfig::default(), 111);
let mut sp = SpatialPoolerGpu::from_cpu(&cpu_ref).expect("gpu sp init");
let dev = sp.dev_ref().clone();
let mut tm = TemporalMemoryGpu::new(dev.clone(), n_cols, cells_per_col).expect("tm init");
let mut fused = FusedState::new(
dev.clone(),
n_cols,
cells_per_col,
sp.initial_threshold_estimate(),
).expect("fused init");
tm.reset().expect("tm reset");
fused.reset().expect("fused reset");
// Warmup: 1000 random 2%-sparse SDRs.
let mut rng = Xoshiro256PlusPlus::seed_from_u64(31337);
let t_warm = 1000usize;
let mut inputs = vec![0u8; t_warm * bits];
for ti in 0..t_warm {
let sdr = make_sdr(&mut rng, bits, 0.02);
inputs[ti*bits..(ti+1)*bits].copy_from_slice(&sdr);
}
let inputs_dev: CudaSlice<u8> = dev.htod_sync_copy(&inputs).expect("htod");
let mut cols_dev = dev.alloc_zeros::<u8>(t_warm * n_cols).expect("alloc cols");
let mut anom_dev = dev.alloc_zeros::<f32>(t_warm).expect("alloc anom");
launch_fused(
&mut sp, &mut tm, &mut fused,
&inputs_dev, &mut cols_dev, &mut anom_dev,
t_warm, bits, true,
).expect("warmup launch");
dev.synchronize().expect("sync");
// Measurement pass: another 200 steps, measure mean activation.
let t_meas = 200usize;
let mut meas_inputs = vec![0u8; t_meas * bits];
for ti in 0..t_meas {
let sdr = make_sdr(&mut rng, bits, 0.02);
meas_inputs[ti*bits..(ti+1)*bits].copy_from_slice(&sdr);
}
let meas_dev: CudaSlice<u8> = dev.htod_sync_copy(&meas_inputs).expect("htod meas");
let mut meas_cols = dev.alloc_zeros::<u8>(t_meas * n_cols).expect("alloc meas cols");
let mut meas_anom = dev.alloc_zeros::<f32>(t_meas).expect("alloc meas anom");
launch_fused(
&mut sp, &mut tm, &mut fused,
&meas_dev, &mut meas_cols, &mut meas_anom,
t_meas, bits, true,
).expect("meas launch");
dev.synchronize().expect("sync meas");
let cols_host: Vec<u8> = dev.dtoh_sync_copy(&meas_cols).expect("d2h");
let mut step_counts = Vec::with_capacity(t_meas);
for ti in 0..t_meas {
let n_on = cols_host[ti*n_cols..(ti+1)*n_cols]
.iter().filter(|&&b| b != 0).count();
step_counts.push(n_on);
}
let mean_active: f64 = step_counts.iter().map(|&c| c as f64).sum::<f64>()
/ (t_meas as f64);
let target_active = target as f64 * n_cols as f64;
eprintln!(
"threshold-activation convergence: mean_active/step = {mean_active:.1} \
(target = {target_active:.1})"
);
// Very generous band — we just want to confirm the threshold loop is
// functioning (not diverged to 0 or to all-active).
assert!(
mean_active >= 0.25 * target_active && mean_active <= 4.0 * target_active,
"mean active {mean_active:.1} outside [0.25x, 4x] of target {target_active:.1}"
);
}
/// Fused kernel: TM should learn a repeating sequence — anomaly decays.
#[test]
fn gpu_fused_tm_anomaly_decays_on_repeating_sequence() {
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let n_cols = cfg.n_columns;
let cells_per_col = 32usize;
let cpu_ref = SpatialPooler::new(SpatialPoolerConfig::default(), 271);
let mut sp = SpatialPoolerGpu::from_cpu(&cpu_ref).expect("gpu sp init");
let dev = sp.dev_ref().clone();
let mut tm = TemporalMemoryGpu::new(dev.clone(), n_cols, cells_per_col).expect("tm init");
let mut fused = FusedState::new(
dev.clone(),
n_cols,
cells_per_col,
sp.initial_threshold_estimate(),
).expect("fused init");
tm.reset().expect("tm reset");
fused.reset().expect("fused reset");
let mut rng = Xoshiro256PlusPlus::seed_from_u64(7);
let make = |rng: &mut Xoshiro256PlusPlus| make_sdr(rng, bits, 0.02);
let seqs = [make(&mut rng), make(&mut rng), make(&mut rng)];
// Warmup SP threshold calibration with random SDRs first.
let warm = 300usize;
let mut warm_inputs = vec![0u8; warm * bits];
for ti in 0..warm {
let sdr = make_sdr(&mut rng, bits, 0.02);
warm_inputs[ti*bits..(ti+1)*bits].copy_from_slice(&sdr);
}
let warm_dev: CudaSlice<u8> = dev.htod_sync_copy(&warm_inputs).expect("htod warm");
let mut warm_cols = dev.alloc_zeros::<u8>(warm * n_cols).expect("alloc warm cols");
let mut warm_anom = dev.alloc_zeros::<f32>(warm).expect("alloc warm anom");
launch_fused(
&mut sp, &mut tm, &mut fused,
&warm_dev, &mut warm_cols, &mut warm_anom,
warm, bits, true,
).expect("warm launch");
dev.synchronize().expect("sync warm");
// Feed repeating A,B,C sequence for 100 reps.
let repeats = 100usize;
let t = repeats * 3;
let mut inputs = vec![0u8; t * bits];
for r in 0..repeats {
for (i, s) in seqs.iter().enumerate() {
let off = (r*3 + i) * bits;
inputs[off..off+bits].copy_from_slice(s);
}
}
let inputs_dev: CudaSlice<u8> = dev.htod_sync_copy(&inputs).expect("htod rep");
let mut cols_dev = dev.alloc_zeros::<u8>(t * n_cols).expect("alloc rep cols");
let mut anom_dev = dev.alloc_zeros::<f32>(t).expect("alloc rep anom");
launch_fused(
&mut sp, &mut tm, &mut fused,
&inputs_dev, &mut cols_dev, &mut anom_dev,
t, bits, true,
).expect("rep launch");
dev.synchronize().expect("sync rep");
let anom: Vec<f32> = dev.dtoh_sync_copy(&anom_dev).expect("d2h anom");
let early_avg: f32 = anom[3..12].iter().sum::<f32>() / 9.0;
let late_avg: f32 = anom[(t-9)..t].iter().sum::<f32>() / 9.0;
eprintln!("fused TM anomaly: early={early_avg:.3} late={late_avg:.3}");
assert!(
late_avg < early_avg,
"anomaly must decay: early={early_avg:.3} late={late_avg:.3}"
);
assert!(
late_avg < 0.5,
"late anomaly must be < 0.5 (got {late_avg:.3})"
);
}
#[test]
fn gpu_sp_yields_k_winners() {
let cfg = SpatialPoolerConfig::default();
let bits = cfg.input_bits;
let n = cfg.n_columns;
let expected_k = ((cfg.sparsity * n as f32).round() as usize).max(1);
let cpu = SpatialPooler::new(SpatialPoolerConfig::default(), 7);
let mut gpu = SpatialPoolerGpu::from_cpu(&cpu).expect("gpu init");
let mut rng = Xoshiro256PlusPlus::seed_from_u64(1);
for _ in 0..10 {
let sdr_u8 = make_sdr(&mut rng, bits, 0.02);
let active = gpu.compute(&sdr_u8, false).expect("gpu compute");
assert_eq!(active.len(), expected_k);
// Ensure sorted + unique.
for w in active.windows(2) {
assert!(w[0] < w[1], "duplicate or out-of-order winner indices");
}
}
}
#[test]
fn fused_launch_plan_uses_cooperative_grid_sync() {
let plan = plan_fused_launch(30, true, 30, None).expect("cooperative supported");
assert_eq!(plan.grid_dim_x, 16);
assert_eq!(plan.cooperative_grid_limit, 30);
}
#[test]
fn fused_launch_plan_scales_to_big_gpu() {
// H200-like: 132 SMs, high cooperative_grid_limit. Cap still applies.
let plan = plan_fused_launch(132, true, 1000, None).expect("cooperative supported");
assert_eq!(plan.grid_dim_x, 16); // capped by default override
let plan = plan_fused_launch(132, true, 1000, Some(64)).expect("cooperative supported");
assert_eq!(plan.grid_dim_x, 64); // override raises the cap
}
#[test]
fn fused_launch_plan_refuses_non_cooperative_devices() {
// The slow path was removed. Devices without cooperative launch fail fast.
let err = plan_fused_launch(30, false, 0, None).unwrap_err();
assert!(err.contains("cooperative launch"));
}
#[test]
fn fused_grid_cap_env_override_is_honored() {
let cfg = SpatialPoolerConfig::default();
let cpu_ref = SpatialPooler::new(SpatialPoolerConfig::default(), 5252);
let sp = SpatialPoolerGpu::from_cpu(&cpu_ref).expect("gpu sp init");
let dev = sp.dev_ref().clone();
unsafe { std::env::set_var("HTM_FUSED_GRID_CAP", "12"); }
let fused = FusedState::new(
dev.clone(),
cfg.n_columns,
32usize,
sp.initial_threshold_estimate(),
).expect("fused init");
unsafe { std::env::remove_var("HTM_FUSED_GRID_CAP"); }
let sm_count = match dev.attribute(
cudarc::driver::sys::CUdevice_attribute::CU_DEVICE_ATTRIBUTE_MULTIPROCESSOR_COUNT,
) {
Ok(v) => v as u32,
Err(_) => 16u32,
};
let expected = sm_count.max(1).min(12);
assert_eq!(
fused.grid_dim_x,
expected,
"fused grid cap env override ignored: expected min(sm_count, 12) = {expected}, got {}",
fused.grid_dim_x,
);
}
#[test]
fn batched_grid_plan_clamps_a10g_batch32_under_cooperative_limit() {
// A10G observed in HF Jobs: cooperative_grid_limit=400, B=32.
// grid_x=16 requests 512 cooperative blocks and fails; clamp to 12.
let grid_x = plan_batched_grid_dim(16, 400, 32, false).expect("fits after clamp");
assert_eq!(grid_x, 12);
}
#[test]
fn batched_grid_plan_reports_oversized_batch() {
let err = plan_batched_grid_dim(16, 31, 32, false).unwrap_err();
assert!(err.contains("COOPERATIVE_LAUNCH_TOO_LARGE"));
}
#[test]
fn batched_grid_plan_does_not_clamp_cluster_launches() {
let grid_x = plan_batched_grid_dim(16, 31, 32, true).expect("cluster path bypasses cooperative limit");
assert_eq!(grid_x, 16);
}