#!/usr/bin/env python3
"""
Generate new neurons by sampling in functional parameter space.

Each neuron is a piecewise-linear function fully described by 6 values:
    (boundary_x1, boundary_x2, left_slope, mid_slope, right_slope, y_boundary2)

We extract these from your existing neurons, fit a distribution over them,
sample new combinations, and reconstruct valid W1/b1/W2/b2 for each.
"""

import argparse
import itertools
import json
import os
from pathlib import Path

import numpy as np
import torch
from safetensors.torch import load_file, save_file

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
NEURON_SOURCE = "multi"  # "single" | "multi"
SINGLE_FILE = "test_mlp_hf/model.safetensors"
MULTI_DIR = "source_llm_neurons"
SINGLE_BOUNDARY_MODE = True  # Generate single-boundary neurons (2 active) instead of double-boundary (3 active)
N_GENERATE = 500  # generate 500 neurons
OUTPUT_DIR = "generated_neurons"
RANDOM_SEED = 42

# Generation strategy:
#   "gaussian"    — fit mean/cov to existing neurons, sample from N(mu, sigma)
#   "interpolate" — convex combinations of pairs of existing neurons
#   "grid"        — systematic grid over the observed parameter ranges
#   "all"         — produce all three sets
STRATEGY = "all"

# ---------------------------------------------------------------------------
# 1. Load existing neurons
# ---------------------------------------------------------------------------

# Short names used throughout this script -> tensor names inside a checkpoint.
_TENSOR_KEYS = {
    "W1": "layer1.weight",
    "b1": "layer1.bias",
    "W2": "layer2.weight",
    "b2": "layer2.bias",
}

# Canonical order of the six functional parameters when packed into a row.
_PARAM_KEYS = (
    "boundary_x1",
    "boundary_x2",
    "left_slope",
    "mid_slope",
    "right_slope",
    "y_boundary2",
)

# Bias of the always-on "carrier" hidden unit relu(x + bias). The carrier is
# only active for x > -bias, so reconstructed neurons are piecewise-linear as
# intended only on that side.
_CARRIER_BIAS = 100.0


def _read_neuron_file(path):
    """Load one safetensors checkpoint as a dict of float32 numpy arrays."""
    tensors = load_file(str(path))
    return {
        name: tensors[key].float().numpy()
        for name, key in _TENSOR_KEYS.items()
    }


def load_neurons(source, single_file, multi_dir):
    """Load the source neurons that generation is based on.

    Args:
        source: "single" to read exactly ``single_file``; "multi" to read every
            ``neuron_*.safetensors`` file in ``multi_dir`` (sorted by path).
        single_file: Checkpoint path used when ``source == "single"``.
        multi_dir: Directory scanned when ``source == "multi"``.

    Returns:
        A list of dicts with keys "W1", "b1", "W2", "b2" (numpy arrays). The
        list is empty when ``multi_dir`` contains no matching file.

    Raises:
        ValueError: If ``source`` is neither "single" nor "multi".
    """
    if source == "single":
        return [_read_neuron_file(single_file)]
    if source == "multi":
        paths = sorted(Path(multi_dir).glob("neuron_*.safetensors"))
        return [_read_neuron_file(path) for path in paths]
    raise ValueError(f"Unknown neuron source: {source!r}")


# ---------------------------------------------------------------------------
# 2. Extract functional parameters from raw weights
# ---------------------------------------------------------------------------

def _mlp_forward_batch(xs, W1, b1, W2, b2, chunk=16384):
    """Evaluate the scalar-in/scalar-out MLP ``relu(x W1^T + b1) W2^T + b2``.

    Everything is computed in float64: the finite differences taken on these
    outputs use steps of 1e-3 and smaller, which float32 cannot resolve once
    a bias of magnitude ~100 has been added to x.

    Args:
        xs: 1-D sequence of input values.
        W1, b1, W2, b2: Layer weights; the products below require W1 to have
            a single input column and W2 a single output row.
        chunk: Number of inputs evaluated per step, which bounds the size of
            the temporary hidden-activation matrix.

    Returns:
        A float64 array with one output per element of ``xs``.
    """
    W1 = np.asarray(W1, dtype=np.float64)
    b1 = np.asarray(b1, dtype=np.float64)
    W2 = np.asarray(W2, dtype=np.float64)
    b2 = np.asarray(b2, dtype=np.float64)
    column = np.asarray(xs, dtype=np.float64).reshape(-1, 1)

    ys = np.empty(column.shape[0], dtype=np.float64)
    for start in range(0, column.shape[0], chunk):
        stop = start + chunk
        hidden = np.maximum(0.0, column[start:stop] @ W1.T + b1)
        ys[start:stop] = (hidden @ W2.T + b2).reshape(-1)
    return ys


def weights_to_functional(W1, b1, W2, b2, x_probe_range=(-2.0, 2.0), n_probe=200000):
    """Estimate the six functional parameters of a neuron by probing it.

    The neuron is sampled on a uniform grid. The two boundaries are taken as
    the two largest peaks of the absolute second derivative (the second peak
    is searched outside a +/-10% window around the first), and the three
    slopes are averaged over the regions between them, keeping a 3% margin
    away from each boundary.

    Args:
        W1, b1, W2, b2: Weights of the neuron to analyse.
        x_probe_range: (low, high) interval that is probed.
        n_probe: Number of grid points in that interval.

    Returns:
        Dict of Python floats with keys "boundary_x1", "boundary_x2",
        "left_slope", "mid_slope", "right_slope" and "y_boundary2", where
        boundary_x1 <= boundary_x2 and y_boundary2 is the output sampled at
        boundary_x2.
    """
    xs = np.linspace(x_probe_range[0], x_probe_range[1], n_probe)
    ys = _mlp_forward_batch(xs, W1, b1, W2, b2)

    slopes = np.gradient(ys, xs)
    slope_changes = np.abs(np.gradient(slopes, xs))

    # Strongest kink first, then the strongest one outside its neighbourhood.
    peak_window = int(n_probe * 0.1)
    idx1 = int(np.argmax(slope_changes))
    masked_changes = slope_changes.copy()
    lo = max(0, idx1 - peak_window)
    hi = min(n_probe, idx1 + peak_window)
    masked_changes[lo:hi] = 0.0
    idx2 = int(np.argmax(masked_changes))
    if idx1 > idx2:
        idx1, idx2 = idx2, idx1

    # Average slopes away from the kinks, where the numerical gradient is
    # smeared across neighbouring grid points.
    margin = int(n_probe * 0.03)
    idx_l = max(0, idx1 - margin)
    idx_m1 = min(n_probe - 1, idx1 + margin)
    idx_m2 = max(0, idx2 - margin)
    idx_r = min(n_probe - 1, idx2 + margin)

    if idx_l > 0:
        left_slope = float(np.mean(slopes[:idx_l]))
    else:
        left_slope = float(slopes[0])

    if idx_m2 > idx_m1:
        mid_slope = float(np.mean(slopes[idx_m1:idx_m2]))
    else:
        # Boundaries too close for a margin: sample the midpoint instead.
        mid_slope = float(slopes[(idx1 + idx2) // 2])

    if idx_r < n_probe - 1:
        right_slope = float(np.mean(slopes[idx_r:]))
    else:
        right_slope = float(slopes[-1])

    return {
        "boundary_x1": float(xs[idx1]),
        "boundary_x2": float(xs[idx2]),
        "left_slope": left_slope,
        "mid_slope": mid_slope,
        "right_slope": right_slope,
        "y_boundary2": float(ys[idx2]),
    }


# ---------------------------------------------------------------------------
# 3. Reconstruct weights from functional parameters
# ---------------------------------------------------------------------------

def _assemble_weights(n_hidden, right_slope, kinks, anchor_x, anchor_y):
    """Build float32 (W1, b1, W2, b2) from a carrier unit plus kink units.

    Hidden unit 0 is the carrier relu(x + _CARRIER_BIAS) weighted by
    ``right_slope``. Each ``(boundary, out_weight)`` pair in ``kinks`` adds a
    unit relu(boundary - x), active left of ``boundary``, weighted by
    ``out_weight``. ``b2`` is chosen so the carrier alone yields ``anchor_y``
    at ``anchor_x``; remaining hidden units stay zero.
    """
    if n_hidden < len(kinks) + 1:
        raise ValueError(
            f"n_hidden={n_hidden} is too small; need at least {len(kinks) + 1}"
        )

    W1 = np.zeros((n_hidden, 1), dtype=np.float32)
    b1 = np.zeros(n_hidden, dtype=np.float32)
    W2 = np.zeros((1, n_hidden), dtype=np.float32)
    b2 = np.zeros(1, dtype=np.float32)

    W1[0, 0] = 1.0
    b1[0] = _CARRIER_BIAS
    W2[0, 0] = right_slope

    for unit, (boundary, out_weight) in enumerate(kinks, start=1):
        W1[unit, 0] = -1.0
        b1[unit] = boundary
        W2[0, unit] = out_weight

    # Use the float32-rounded carrier weight, in float64 arithmetic, so b2
    # cancels what the stored weights actually produce at the anchor.
    carrier_out = float(W2[0, 0]) * (float(anchor_x) + _CARRIER_BIAS)
    b2[0] = float(anchor_y) - carrier_out
    return W1, b1, W2, b2


def functional_to_weights(boundary_x1, boundary_x2, left_slope, mid_slope, right_slope,
                          y_boundary2, n_hidden=8):
    """Build a double-boundary neuron (carrier + 2 transition units).

    The boundaries are put in ascending order first; the slopes and
    ``y_boundary2`` are not reordered, so ``y_boundary2`` is always the
    output at the larger boundary.

    Args:
        boundary_x1, boundary_x2: Kink positions (either order).
        left_slope, mid_slope, right_slope: Slopes left of the smaller
            boundary, between the boundaries, and right of the larger one.
        y_boundary2: Output value at the larger boundary.
        n_hidden: Hidden width; must be at least 3.

    Returns:
        Tuple (W1, b1, W2, b2) of float32 arrays with shapes
        (n_hidden, 1), (n_hidden,), (1, n_hidden), (1,).

    Raises:
        ValueError: If ``n_hidden`` is smaller than 3.
    """
    if boundary_x1 > boundary_x2:
        boundary_x1, boundary_x2 = boundary_x2, boundary_x1

    kinks = [
        (boundary_x1, -(left_slope - mid_slope)),
        (boundary_x2, -(mid_slope - right_slope)),
    ]
    return _assemble_weights(n_hidden, right_slope, kinks, boundary_x2, y_boundary2)


def functional_to_weights_single(boundary_x, left_slope, right_slope, y_at_boundary, n_hidden=8):
    """Single-boundary version: only 2 active neurons (carrier + 1 transition).

    Args:
        boundary_x: Kink position.
        left_slope, right_slope: Slopes on either side of the kink.
        y_at_boundary: Output value at the kink.
        n_hidden: Hidden width; must be at least 2.

    Returns:
        Tuple (W1, b1, W2, b2) of float32 arrays, shaped as in
        ``functional_to_weights``.

    Raises:
        ValueError: If ``n_hidden`` is smaller than 2.
    """
    kinks = [(boundary_x, -(left_slope - right_slope))]
    return _assemble_weights(n_hidden, right_slope, kinks, boundary_x, y_at_boundary)


# ---------------------------------------------------------------------------
# 4. Validate a generated neuron (finite differences on the exact function)
# ---------------------------------------------------------------------------

def _mlp_forward(x_scalar, W1, b1, W2, b2):
    """Evaluate the neuron at one input and return a Python float."""
    return float(_mlp_forward_batch([x_scalar], W1, b1, W2, b2)[0])


def _forward_slope(x, eps, W1, b1, W2, b2):
    """Forward-difference slope of the neuron on [x, x + eps]."""
    rise = _mlp_forward(x + eps, W1, b1, W2, b2) - _mlp_forward(x, W1, b1, W2, b2)
    return rise / eps


def validate_neuron(W1, b1, W2, b2, params, tol=0.05):
    """Check that a double-boundary neuron realises its target parameters.

    The boundaries in ``params`` are sorted before probing, mirroring the swap
    performed by ``functional_to_weights``; without that, parameters sampled
    with boundary_x1 > boundary_x2 would be probed in the wrong regions.

    Args:
        W1, b1, W2, b2: Weights to check.
        params: Target dict with the six functional-parameter keys.
        tol: Allowed absolute slope error; the y check allows ``5 * tol``.

    Returns:
        Tuple ``(ok, checks, recovered)``: overall verdict, per-quantity
        booleans, and the measured values (boundaries reported ascending).
    """
    bx1, bx2 = sorted((params["boundary_x1"], params["boundary_x2"]))

    # Dynamically scale probes so we don't accidentally step over boundaries
    # when random generation places bx1 and bx2 extremely close together.
    dist = max(abs(bx2 - bx1), 1e-6)
    eps = min(1e-3, dist / 10.0)
    gap = min(0.05, dist / 4.0)

    y_at_bx2 = _mlp_forward(bx2, W1, b1, W2, b2)
    slope_left = _forward_slope(bx1 - gap - eps, eps, W1, b1, W2, b2)
    slope_mid = _forward_slope((bx1 + bx2) / 2, eps, W1, b1, W2, b2)
    slope_right = _forward_slope(bx2 + gap, eps, W1, b1, W2, b2)

    recovered = {
        "boundary_x1": bx1,
        "boundary_x2": bx2,
        "left_slope": slope_left,
        "mid_slope": slope_mid,
        "right_slope": slope_right,
        "y_boundary2": y_at_bx2,
    }
    checks = {
        "left_slope": abs(slope_left - params["left_slope"]) < tol,
        "mid_slope": abs(slope_mid - params["mid_slope"]) < tol,
        "right_slope": abs(slope_right - params["right_slope"]) < tol,
        "y_boundary2": abs(y_at_bx2 - params["y_boundary2"]) < tol * 5,
    }
    return all(checks.values()), checks, recovered


def validate_neuron_single(W1, b1, W2, b2, params, tol=0.05):
    """Validate single-boundary neuron (only 2 slopes).

    Args:
        W1, b1, W2, b2: Weights to check.
        params: Target dict with keys "boundary_x", "left_slope",
            "right_slope" and "y_at_boundary".
        tol: Allowed absolute slope error; the y check allows ``5 * tol``.

    Returns:
        Tuple ``(ok, checks, recovered)`` as in ``validate_neuron``.
    """
    bx = params["boundary_x"]
    eps = 1e-3
    gap = 0.05

    y_at_bx = _mlp_forward(bx, W1, b1, W2, b2)
    slope_left = _forward_slope(bx - gap - eps, eps, W1, b1, W2, b2)
    slope_right = _forward_slope(bx + gap, eps, W1, b1, W2, b2)

    recovered = {
        "boundary_x": bx,
        "left_slope": slope_left,
        "right_slope": slope_right,
        "y_at_boundary": y_at_bx,
    }
    checks = {
        "left_slope": abs(slope_left - params["left_slope"]) < tol,
        "right_slope": abs(slope_right - params["right_slope"]) < tol,
        "y_at_boundary": abs(y_at_bx - params["y_at_boundary"]) < tol * 5,
    }
    return all(checks.values()), checks, recovered


# ---------------------------------------------------------------------------
# 5. Generation strategies
# ---------------------------------------------------------------------------

def _require_params(functional_params):
    """Fail early, with a clear message, when there is nothing to sample from."""
    if not functional_params:
        raise ValueError("functional_params is empty; need at least one source neuron")


def strategy_gaussian(functional_params, n, rng):
    """Sample ``n`` parameter sets from a Gaussian fitted to the sources.

    The mean is the per-parameter mean of the sources. The covariance is the
    sample covariance (or 0.1 * I when there is a single source) plus a small
    1e-4 * I ridge.

    Args:
        functional_params: Non-empty list of six-key parameter dicts.
        n: Number of samples to draw.
        rng: ``numpy.random.Generator`` used for sampling.

    Returns:
        List of ``n`` parameter dicts.

    Raises:
        ValueError: If ``functional_params`` is empty.
    """
    _require_params(functional_params)
    mat = np.array([[p[key] for key in _PARAM_KEYS] for p in functional_params])
    dim = len(_PARAM_KEYS)

    mu = mat.mean(axis=0)
    cov = np.cov(mat.T) if len(mat) > 1 else np.eye(dim) * 0.1
    cov = cov + np.eye(dim) * 1e-4

    samples = rng.multivariate_normal(mu, cov, size=n)
    return [dict(zip(_PARAM_KEYS, map(float, row))) for row in samples]


def strategy_interpolate(functional_params, n, rng):
    """Create ``n`` convex combinations of randomly chosen source pairs.

    Pairs are drawn with replacement, so a source may be blended with itself.

    Args:
        functional_params: Non-empty list of parameter dicts sharing keys.
        n: Number of parameter sets to produce.
        rng: ``numpy.random.Generator`` used for the pair and blend draws.

    Returns:
        List of ``n`` parameter dicts.

    Raises:
        ValueError: If ``functional_params`` is empty.
    """
    _require_params(functional_params)
    results = []
    for _ in range(n):
        i, j = rng.choice(len(functional_params), size=2, replace=True)
        t = rng.uniform(0, 1)
        first, second = functional_params[i], functional_params[j]
        results.append({k: (1 - t) * first[k] + t * second[k] for k in first})
    return results


def strategy_grid(functional_params, n, rng):
    """Pick ``n`` points from a regular grid over the observed ranges.

    Each parameter gets ``side`` evenly spaced values between its observed
    minimum and maximum, with ``side`` chosen so the full 6-D grid holds at
    least ``n`` points. The grid is shuffled and truncated to ``n``.

    Args:
        functional_params: Non-empty list of six-key parameter dicts.
        n: Number of parameter sets to return.
        rng: ``numpy.random.Generator`` used for shuffling.

    Returns:
        List of ``n`` parameter dicts.

    Raises:
        ValueError: If ``functional_params`` is empty.
    """
    _require_params(functional_params)

    def get_range(vals, margin=0.2):
        v_min, v_max = min(vals), max(vals)
        if v_min == v_max:
            # Prevent 0-variance collapse by injecting a spread for single neurons
            offset = abs(v_min) * margin if v_min != 0 else margin
            return v_min - offset, v_max + offset
        return v_min, v_max

    side = max(2, int(n ** (1.0 / 6.0)) + 1)
    axes = [
        np.linspace(*get_range([p[key] for p in functional_params]), side)
        for key in _PARAM_KEYS
    ]

    # product() varies the last axis fastest, i.e. the same visiting order as
    # six nested loops written in _PARAM_KEYS order.
    grid = [dict(zip(_PARAM_KEYS, point)) for point in itertools.product(*axes)]

    rng.shuffle(grid)
    while len(grid) < n:
        grid += grid
    return grid[:n]


_STRATEGIES = {
    "gaussian": strategy_gaussian,
    "interpolate": strategy_interpolate,
    "grid": strategy_grid,
}


# ---------------------------------------------------------------------------
# 6. Main
# ---------------------------------------------------------------------------

def _build_and_validate(p):
    """Turn one sampled parameter dict into weights and validate them.

    Honours SINGLE_BOUNDARY_MODE. Returns ``(weights, valid, checks)`` where
    ``weights`` is the tuple (W1, b1, W2, b2).
    """
    if SINGLE_BOUNDARY_MODE:
        # Convert double-boundary params to single-boundary:
        # use boundary_x1 as the single boundary, ignore boundary_x2;
        # use left_slope and right_slope, ignore mid_slope;
        # estimate y_at_boundary from y_boundary2.
        p_single = {
            "boundary_x": p["boundary_x1"],
            "left_slope": p["left_slope"],
            "right_slope": p["right_slope"],
            "y_at_boundary": p["y_boundary2"],
        }
        weights = functional_to_weights_single(
            p_single["boundary_x"],
            p_single["left_slope"],
            p_single["right_slope"],
            p_single["y_at_boundary"],
        )
        valid, checks, _ = validate_neuron_single(*weights, p_single)
    else:
        weights = functional_to_weights(
            p["boundary_x1"], p["boundary_x2"],
            p["left_slope"], p["mid_slope"], p["right_slope"],
            p["y_boundary2"],
        )
        valid, checks, _ = validate_neuron(*weights, p)
    return weights, valid, checks


def main():
    """Load source neurons, generate new ones per strategy, and save them.

    Valid neurons are written to ``OUTPUT_DIR/<strategy>/neuron_NNNNNN.safetensors``
    and a summary to ``OUTPUT_DIR/generation_meta.json``.

    Raises:
        SystemExit: If no source neuron could be loaded.
        ValueError: If STRATEGY names an unknown strategy.
    """
    rng = np.random.default_rng(RANDOM_SEED)
    out = Path(OUTPUT_DIR)
    out.mkdir(parents=True, exist_ok=True)

    print("=" * 60)
    print("Generating new neurons from existing ones (Multi-Boundary)")
    print("=" * 60)

    print("\n[1] Loading existing neurons...")
    neurons = load_neurons(NEURON_SOURCE, SINGLE_FILE, MULTI_DIR)
    print(f" {len(neurons)} source neuron(s)")
    if not neurons:
        # Every strategy needs at least one source neuron to fit against.
        raise SystemExit(
            f"No source neurons found (NEURON_SOURCE={NEURON_SOURCE!r}, "
            f"MULTI_DIR={MULTI_DIR!r}); nothing to generate from."
        )

    print("\n[2] Extracting functional parameters...")
    functional_params = []
    for k, neuron in enumerate(neurons):
        p = weights_to_functional(neuron["W1"], neuron["b1"], neuron["W2"], neuron["b2"])
        functional_params.append(p)
        print(f" Neuron {k}: boundary1={p['boundary_x1']:+.4f} "
              f"boundary2={p['boundary_x2']:+.4f} "
              f"left_slope={p['left_slope']:+.4f} "
              f"mid_slope={p['mid_slope']:+.4f} "
              f"right_slope={p['right_slope']:+.4f} "
              f"y@boundary2={p['y_boundary2']:+.4f}")

    strategies = list(_STRATEGIES) if STRATEGY == "all" else [STRATEGY]

    total_saved = 0
    summary = {}

    for strat in strategies:
        print(f"\n[3] Generating {N_GENERATE} neurons via '{strat}'...")
        generate = _STRATEGIES.get(strat)
        if generate is None:
            raise ValueError(f"Unknown strategy: {strat}")
        new_params = generate(functional_params, N_GENERATE, rng)

        strat_dir = out / strat
        strat_dir.mkdir(exist_ok=True)

        n_valid = 0
        for idx, p in enumerate(new_params):
            (W1, b1, W2, b2), valid, checks = _build_and_validate(p)

            if not valid:
                failed = [k for k, v in checks.items() if not v]
                if idx < 10 or idx % 50000 == 0:
                    print(f" [skip] neuron_{idx:06d}: failed checks {failed}")
                continue

            save_file(
                {
                    "layer1.weight": torch.tensor(W1),
                    "layer1.bias": torch.tensor(b1),
                    "layer2.weight": torch.tensor(W2),
                    "layer2.bias": torch.tensor(b2),
                },
                # Padded to 6 digits (06d) to prevent python alphabetical sorting issues downstream
                str(strat_dir / f"neuron_{idx:06d}.safetensors"),
            )
            n_valid += 1

        # Guard the percentage so N_GENERATE = 0 reports 0% instead of crashing.
        pct = 100 * n_valid / N_GENERATE if N_GENERATE else 0.0
        print(f" Saved {n_valid}/{N_GENERATE} valid neurons ({pct:.0f}%) to {strat_dir}/")
        summary[strat] = {"generated": N_GENERATE, "valid": n_valid, "path": str(strat_dir)}
        total_saved += n_valid

    meta = {
        "source_neurons": len(neurons),
        "source_functional_params": functional_params,
        "strategies": summary,
        "total_saved": total_saved,
    }
    with open(out / "generation_meta.json", "w") as f:
        json.dump(meta, f, indent=2)

    print(f"\n{'=' * 60}")
    print(f"Total neurons generated: {total_saved}")
    print(f"Metadata saved to {out}/generation_meta.json")
    print(f"\nTo use generated neurons in append_neurons_to_t5.py:")
    print(f" NEURON_SOURCE = 'multi'")
    print(f" MULTI_DIR = '{out}/gaussian' # or interpolate / grid")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    main()