| |
| """ |
| Generate new neurons by sampling in functional parameter space. |
| |
| Each neuron is a piecewise-linear function fully described by 6 values: |
| (boundary_x1, boundary_x2, left_slope, mid_slope, right_slope, y_boundary2) |
| |
| We extract these from your existing neurons, fit a distribution over them, |
| sample new combinations, and reconstruct valid W1/b1/W2/b2 for each. |
| """ |
|
|
| import numpy as np |
| import torch |
| from safetensors.torch import load_file, save_file |
| from pathlib import Path |
| import json |
| import argparse |
| import os |
|
|
| |
| |
| |
|
|
# ---------------------------------------------------------------------------
# Input selection
# ---------------------------------------------------------------------------
# "single" -> load one checkpoint from SINGLE_FILE.
# "multi"  -> load every "neuron_*.safetensors" file found in MULTI_DIR.
NEURON_SOURCE = "multi"
SINGLE_FILE = "test_mlp_hf/model.safetensors"
MULTI_DIR = "source_llm_neurons"

# ---------------------------------------------------------------------------
# Generation settings
# ---------------------------------------------------------------------------
# True  -> rebuild each sample with a single boundary (carrier + 1 transition).
# False -> rebuild each sample with two boundaries (carrier + 2 transitions).
SINGLE_BOUNDARY_MODE = True
# Number of parameter sets requested from each sampling strategy.
N_GENERATE = 500
# Root directory for the generated files (one sub-directory per strategy).
OUTPUT_DIR = "generated_neurons"
# Seed for the NumPy random generator, so runs are reproducible.
RANDOM_SEED = 42

# ---------------------------------------------------------------------------
# Sampling strategy
# ---------------------------------------------------------------------------
# One of "gaussian", "interpolate", "grid", or "all" to run all three in turn.
STRATEGY = "all"
|
|
|
|
| |
| |
| |
|
|
def _read_neuron_file(path):
    """Load one safetensors checkpoint and return its weights as NumPy arrays.

    The checkpoint must contain the tensors "layer1.weight", "layer1.bias",
    "layer2.weight" and "layer2.bias"; they are returned under the short
    names "W1", "b1", "W2" and "b2" after a ``.float()`` conversion.
    """
    tensors = load_file(str(path))
    key_map = (
        ("W1", "layer1.weight"),
        ("b1", "layer1.bias"),
        ("W2", "layer2.weight"),
        ("b2", "layer2.bias"),
    )
    return {short: tensors[full].float().numpy() for short, full in key_map}


def load_neurons(source, single_file, multi_dir):
    """Load the source neurons that seed the generation.

    Args:
        source: "single" to load exactly one checkpoint from ``single_file``,
            or "multi" to load every ``neuron_*.safetensors`` file found in
            ``multi_dir`` (in sorted filename order).
        single_file: Path of the checkpoint used when ``source == "single"``.
        multi_dir: Directory scanned when ``source == "multi"``.

    Returns:
        A list of dicts with keys "W1", "b1", "W2", "b2" (NumPy arrays).
        The list is empty when "multi" finds no matching file.

    Raises:
        ValueError: If ``source`` is neither "single" nor "multi".  A typo
            used to fall through and yield an empty list, which only failed
            much later inside the sampling code.
    """
    if source == "single":
        return [_read_neuron_file(single_file)]
    if source == "multi":
        paths = sorted(Path(multi_dir).glob("neuron_*.safetensors"))
        return [_read_neuron_file(p) for p in paths]
    raise ValueError(
        f"Unknown neuron source: {source!r} (expected 'single' or 'multi')"
    )
|
|
|
|
| |
| |
| |
|
|
def weights_to_functional(W1, b1, W2, b2, x_probe_range=(-2.0, 2.0), n_probe=200000):
    """Estimate the piecewise-linear description of a 1-in / 1-out ReLU MLP.

    The network ``y = relu(x @ W1.T + b1) @ W2.T + b2`` is evaluated on
    ``n_probe`` evenly spaced inputs across ``x_probe_range``.  Slopes are
    taken with ``np.gradient`` and the two boundaries are placed at the two
    largest peaks of ``|d(slope)/dx|``; the second peak is searched only
    outside a window of 10% of the grid on either side of the first one.

    Args:
        W1: First-layer weight, shape (n_hidden, 1).
        b1: First-layer bias, shape (n_hidden,).
        W2: Second-layer weight, shape (1, n_hidden).
        b2: Second-layer bias, shape (1,).
        x_probe_range: (low, high) interval that is probed.
        n_probe: Number of probe points.

    Returns:
        Dict of Python floats with keys "boundary_x1", "boundary_x2"
        (ordered so that boundary_x1 <= boundary_x2), "left_slope",
        "mid_slope", "right_slope" and "y_boundary2" (network output at the
        probe point chosen as boundary_x2).
    """
    xs = np.linspace(x_probe_range[0], x_probe_range[1], n_probe)

    # One batched forward pass instead of n_probe separate 1x1 matmuls.
    # Inputs are cast to float32 exactly as the per-sample version did.
    x_col = xs.astype(np.float32)[:, None]
    hidden = np.maximum(0, x_col @ W1.T + b1)
    # reshape(n_probe) fails loudly if the network has more than one output.
    ys = np.asarray(hidden @ W2.T + b2, dtype=np.float64).reshape(n_probe)

    slopes = np.gradient(ys, xs)
    slope_changes = np.abs(np.gradient(slopes, xs))

    # Strongest kink first, then the strongest kink outside its neighbourhood.
    peak_window = int(n_probe * 0.1)
    idx1 = int(np.argmax(slope_changes))

    masked_changes = slope_changes.copy()
    l_mask = max(0, idx1 - peak_window)
    r_mask = min(n_probe, idx1 + peak_window)
    masked_changes[l_mask:r_mask] = 0.0

    idx2 = int(np.argmax(masked_changes))

    if idx1 > idx2:
        idx1, idx2 = idx2, idx1

    boundary_x1 = float(xs[idx1])
    boundary_x2 = float(xs[idx2])

    # Stay 3% of the grid away from each kink so the finite-difference
    # smearing around it does not contaminate the averaged slopes.
    margin = int(n_probe * 0.03)

    idx_l = max(0, idx1 - margin)
    idx_m1 = min(n_probe - 1, idx1 + margin)
    idx_m2 = max(0, idx2 - margin)
    idx_r = min(n_probe - 1, idx2 + margin)

    left_slope = float(np.mean(slopes[:idx_l])) if idx_l > 0 else float(slopes[0])

    if idx_m2 > idx_m1:
        mid_slope = float(np.mean(slopes[idx_m1:idx_m2]))
    else:
        # Boundaries too close for a clean window: sample the midpoint.
        mid_slope = float(slopes[(idx1 + idx2) // 2])

    right_slope = float(np.mean(slopes[idx_r:])) if idx_r < n_probe - 1 else float(slopes[-1])
    y_boundary2 = float(ys[idx2])

    return {
        "boundary_x1": boundary_x1,
        "boundary_x2": boundary_x2,
        "left_slope": left_slope,
        "mid_slope": mid_slope,
        "right_slope": right_slope,
        "y_boundary2": y_boundary2,
    }
|
|
|
|
| |
| |
| |
|
|
def functional_to_weights(boundary_x1, boundary_x2, left_slope, mid_slope, right_slope, y_boundary2,
                          n_hidden=8):
    """Build MLP weights realising a two-boundary piecewise-linear function.

    Three hidden units are used; the remaining ``n_hidden - 3`` stay zero:

    * unit 0 is a carrier ``relu(x + 100)`` weighted by ``right_slope``;
    * unit 1 is ``relu(-x + lower boundary)`` and adds the slope difference
      between the left and middle segments;
    * unit 2 is ``relu(-x + upper boundary)`` and adds the slope difference
      between the middle and right segments.

    ``b2`` is chosen so the carrier alone yields ``y_boundary2`` at the upper
    boundary.  The two boundaries may be passed in either order.

    Returns:
        Tuple ``(W1, b1, W2, b2)`` of float32 arrays with shapes
        (n_hidden, 1), (n_hidden,), (1, n_hidden) and (1,).
    """
    if boundary_x1 > boundary_x2:
        lower, upper = boundary_x2, boundary_x1
    else:
        lower, upper = boundary_x1, boundary_x2

    W1 = np.zeros((n_hidden, 1), dtype=np.float32)
    b1 = np.zeros(n_hidden, dtype=np.float32)
    W2 = np.zeros((1, n_hidden), dtype=np.float32)
    b2 = np.zeros(1, dtype=np.float32)

    # (input weight, hidden bias, output weight) for each active unit.
    active_units = (
        (1.0, 100.0, right_slope),
        (-1.0, lower, -(left_slope - mid_slope)),
        (-1.0, upper, -(mid_slope - right_slope)),
    )
    for unit, (w_in, bias, w_out) in enumerate(active_units):
        W1[unit, 0] = w_in
        b1[unit] = bias
        W2[0, unit] = w_out

    # At the upper boundary only the carrier is active; offset its output.
    carrier_at_upper = W2[0, 0] * (W1[0, 0] * upper + b1[0])
    b2[0] = y_boundary2 - carrier_at_upper

    return W1, b1, W2, b2
|
|
|
|
def functional_to_weights_single(boundary_x, left_slope, right_slope, y_at_boundary,
                                 n_hidden=8):
    """Build MLP weights for a single-boundary piecewise-linear function.

    Only two hidden units are active (carrier + one transition); the other
    ``n_hidden - 2`` stay zero:

    * unit 0 is a carrier ``relu(x + 100)`` weighted by ``right_slope``;
    * unit 1 is ``relu(-x + boundary_x)`` and adds the slope difference
      between the left and right segments.

    ``b2`` is chosen so the carrier alone yields ``y_at_boundary`` at
    ``boundary_x``.

    Returns:
        Tuple ``(W1, b1, W2, b2)`` of float32 arrays with shapes
        (n_hidden, 1), (n_hidden,), (1, n_hidden) and (1,).
    """
    W1 = np.zeros((n_hidden, 1), dtype=np.float32)
    b1 = np.zeros(n_hidden, dtype=np.float32)
    W2 = np.zeros((1, n_hidden), dtype=np.float32)
    b2 = np.zeros(1, dtype=np.float32)

    # (input weight, hidden bias, output weight) for each active unit.
    active_units = (
        (1.0, 100.0, right_slope),
        (-1.0, boundary_x, -(left_slope - right_slope)),
    )
    for unit, (w_in, bias, w_out) in enumerate(active_units):
        W1[unit, 0] = w_in
        b1[unit] = bias
        W2[0, unit] = w_out

    # At the boundary only the carrier is active; offset its output.
    carrier_at_boundary = W2[0, 0] * (W1[0, 0] * boundary_x + b1[0])
    b2[0] = y_at_boundary - carrier_at_boundary

    return W1, b1, W2, b2
|
|
|
|
| |
| |
| |
|
|
| def _mlp_forward(x_scalar, W1, b1, W2, b2): |
| x = np.array([[x_scalar]], dtype=np.float32) |
| h = np.maximum(0.0, x @ W1.T + b1) |
| return float((h @ W2.T + b2).squeeze()) |
|
|
|
|
def validate_neuron(W1, b1, W2, b2, params, tol=0.05):
    """Check that reconstructed weights reproduce the requested parameters.

    Slopes are measured with forward finite differences on ``_mlp_forward``:
    left of the lower boundary, at the midpoint between the boundaries, and
    right of the upper boundary.  The output at the upper boundary is
    compared with ``params["y_boundary2"]``.

    Args:
        W1, b1, W2, b2: Weights of the neuron under test.
        params: Dict with keys "boundary_x1", "boundary_x2", "left_slope",
            "mid_slope", "right_slope" and "y_boundary2".
        tol: Absolute tolerance for slopes; the output value is allowed
            ``5 * tol``.

    Returns:
        Tuple ``(all_ok, checks, recovered)`` where ``checks`` maps each
        tested quantity to a bool and ``recovered`` holds the measured
        values together with the (ordered) boundaries that were probed.
    """
    bx1 = params["boundary_x1"]
    bx2 = params["boundary_x2"]
    # functional_to_weights accepts the boundaries in either order and sorts
    # them before building the weights.  Probe with the same ordering;
    # otherwise "left" and "right" are measured inside the middle segment
    # and a correctly built neuron is rejected.
    if bx1 > bx2:
        bx1, bx2 = bx2, bx1

    # Scale the probe offsets down when the boundaries are close together so
    # no finite difference straddles a kink.
    dist = max(abs(bx2 - bx1), 1e-6)
    eps = min(1e-3, dist / 10.0)
    gap = min(0.05, dist / 4.0)

    y_at_bx2 = _mlp_forward(bx2, W1, b1, W2, b2)

    slope_left = (_mlp_forward(bx1 - gap, W1, b1, W2, b2) -
                  _mlp_forward(bx1 - gap - eps, W1, b1, W2, b2)) / eps

    x_mid = (bx1 + bx2) / 2
    slope_mid = (_mlp_forward(x_mid + eps, W1, b1, W2, b2) -
                 _mlp_forward(x_mid, W1, b1, W2, b2)) / eps

    slope_right = (_mlp_forward(bx2 + gap + eps, W1, b1, W2, b2) -
                   _mlp_forward(bx2 + gap, W1, b1, W2, b2)) / eps

    recovered = {
        "boundary_x1": bx1,
        "boundary_x2": bx2,
        "left_slope": slope_left,
        "mid_slope": slope_mid,
        "right_slope": slope_right,
        "y_boundary2": y_at_bx2,
    }

    checks = {
        "left_slope": abs(slope_left - params["left_slope"]) < tol,
        "mid_slope": abs(slope_mid - params["mid_slope"]) < tol,
        "right_slope": abs(slope_right - params["right_slope"]) < tol,
        "y_boundary2": abs(y_at_bx2 - params["y_boundary2"]) < tol * 5,
    }
    return all(checks.values()), checks, recovered
|
|
|
|
def validate_neuron_single(W1, b1, W2, b2, params, tol=0.05):
    """Validate a single-boundary neuron (only two slopes).

    Slopes are measured with forward finite differences (step 1e-3) taken
    0.05 away from the boundary on each side, and the output at the boundary
    is compared with ``params["y_at_boundary"]``.

    Args:
        W1, b1, W2, b2: Weights of the neuron under test.
        params: Dict with keys "boundary_x", "left_slope", "right_slope"
            and "y_at_boundary".
        tol: Absolute tolerance for slopes; the output value is allowed
            ``5 * tol``.

    Returns:
        Tuple ``(all_ok, checks, recovered)``.
    """
    step = 1e-3
    offset = 0.05
    boundary = params["boundary_x"]

    def evaluate(x):
        return _mlp_forward(x, W1, b1, W2, b2)

    def forward_slope(start):
        return (evaluate(start + step) - evaluate(start)) / step

    y_at_bx = evaluate(boundary)

    # Left probe covers [bx - gap - eps, bx - gap]; right covers
    # [bx + gap, bx + gap + eps].
    left_hi = boundary - offset
    slope_left = (evaluate(left_hi) - evaluate(left_hi - step)) / step
    slope_right = forward_slope(boundary + offset)

    recovered = {
        "boundary_x": boundary,
        "left_slope": slope_left,
        "right_slope": slope_right,
        "y_at_boundary": y_at_bx,
    }

    checks = {
        "left_slope": abs(slope_left - params["left_slope"]) < tol,
        "right_slope": abs(slope_right - params["right_slope"]) < tol,
        "y_at_boundary": abs(y_at_bx - params["y_at_boundary"]) < tol * 5,
    }
    passed = all(checks.values())
    return passed, checks, recovered
|
|
|
|
| |
| |
| |
|
|
| def strategy_gaussian(functional_params, n, rng): |
| mat = np.array([ |
| [p["boundary_x1"], p["boundary_x2"], p["left_slope"], p["mid_slope"], p["right_slope"], p["y_boundary2"]] |
| for p in functional_params |
| ]) |
|
|
| mu = mat.mean(axis=0) |
| cov = np.cov(mat.T) if len(mat) > 1 else np.eye(6) * 0.1 |
| cov += np.eye(6) * 1e-4 |
|
|
| samples = rng.multivariate_normal(mu, cov, size=n) |
| return [ |
| {"boundary_x1": s[0], "boundary_x2": s[1], "left_slope": s[2], |
| "mid_slope": s[3], "right_slope": s[4], "y_boundary2": s[5]} |
| for s in samples |
| ] |
|
|
|
|
| def strategy_interpolate(functional_params, n, rng): |
| results = [] |
| fp = functional_params |
| for _ in range(n): |
| i, j = rng.choice(len(fp), size=2, replace=True) |
| t = rng.uniform(0, 1) |
| results.append({ |
| k: (1 - t) * fp[i][k] + t * fp[j][k] |
| for k in fp[i] |
| }) |
| return results |
|
|
|
|
| def strategy_grid(functional_params, n, rng): |
| def get_range(vals, margin=0.2): |
| v_min, v_max = min(vals), max(vals) |
| if v_min == v_max: |
| |
| offset = abs(v_min) * margin if v_min != 0 else margin |
| return v_min - offset, v_max + offset |
| return v_min, v_max |
|
|
| bx1_min, bx1_max = get_range([p["boundary_x1"] for p in functional_params]) |
| bx2_min, bx2_max = get_range([p["boundary_x2"] for p in functional_params]) |
| ls_min, ls_max = get_range([p["left_slope"] for p in functional_params]) |
| ms_min, ms_max = get_range([p["mid_slope"] for p in functional_params]) |
| rs_min, rs_max = get_range([p["right_slope"] for p in functional_params]) |
| yb_min, yb_max = get_range([p["y_boundary2"] for p in functional_params]) |
|
|
| side = max(2, int(n ** (1.0/6.0)) + 1) |
|
|
| grid = [] |
| for bx1i in np.linspace(bx1_min, bx1_max, side): |
| for bx2i in np.linspace(bx2_min, bx2_max, side): |
| for lsi in np.linspace(ls_min, ls_max, side): |
| for msi in np.linspace(ms_min, ms_max, side): |
| for rsi in np.linspace(rs_min, rs_max, side): |
| for ybi in np.linspace(yb_min, yb_max, side): |
| grid.append({ |
| "boundary_x1": bx1i, "boundary_x2": bx2i, |
| "left_slope": lsi, "mid_slope": msi, |
| "right_slope": rsi, "y_boundary2": ybi, |
| }) |
|
|
| rng.shuffle(grid) |
| while len(grid) < n: |
| grid += grid |
| return grid[:n] |
|
|
|
|
| |
| |
| |
|
|
def main():
    """Run the full pipeline: load, extract, sample, rebuild, validate, save.

    Reads the module-level configuration constants.  Valid neurons are
    written to ``OUTPUT_DIR/<strategy>/neuron_XXXXXX.safetensors`` and a
    summary is written to ``OUTPUT_DIR/generation_meta.json``.

    Raises:
        SystemExit: If no source neuron could be loaded.
        ValueError: If ``STRATEGY`` names an unknown strategy.
    """
    rng = np.random.default_rng(RANDOM_SEED)
    out = Path(OUTPUT_DIR)
    # parents=True so a nested OUTPUT_DIR does not fail on first use.
    out.mkdir(parents=True, exist_ok=True)

    # The banner used to say "Multi-Boundary" regardless of the active mode.
    mode_label = "Single-Boundary" if SINGLE_BOUNDARY_MODE else "Multi-Boundary"
    print("=" * 60)
    print(f"Generating new neurons from existing ones ({mode_label})")
    print("=" * 60)

    print("\n[1] Loading existing neurons...")
    neurons = load_neurons(NEURON_SOURCE, SINGLE_FILE, MULTI_DIR)
    print(f" {len(neurons)} source neuron(s)")
    if not neurons:
        # Every strategy needs at least one source; stop with a clear
        # message instead of failing deep inside NumPy.
        raise SystemExit(
            f"No source neurons found (NEURON_SOURCE={NEURON_SOURCE!r}, "
            f"MULTI_DIR={MULTI_DIR!r}); nothing to generate."
        )

    print("\n[2] Extracting functional parameters...")
    functional_params = []
    for k, neuron in enumerate(neurons):
        p = weights_to_functional(neuron["W1"], neuron["b1"], neuron["W2"], neuron["b2"])
        functional_params.append(p)
        print(f" Neuron {k}: boundary1={p['boundary_x1']:+.4f} "
              f"boundary2={p['boundary_x2']:+.4f} "
              f"left_slope={p['left_slope']:+.4f} "
              f"mid_slope={p['mid_slope']:+.4f} "
              f"right_slope={p['right_slope']:+.4f} "
              f"y@boundary2={p['y_boundary2']:+.4f}")

    samplers = {
        "gaussian": strategy_gaussian,
        "interpolate": strategy_interpolate,
        "grid": strategy_grid,
    }
    strategies = (
        ["gaussian", "interpolate", "grid"] if STRATEGY == "all"
        else [STRATEGY]
    )

    total_saved = 0
    summary = {}

    for strat in strategies:
        print(f"\n[3] Generating {N_GENERATE} neurons via '{strat}'...")

        if strat not in samplers:
            raise ValueError(f"Unknown strategy: {strat}")
        new_params = samplers[strat](functional_params, N_GENERATE, rng)

        strat_dir = out / strat
        strat_dir.mkdir(exist_ok=True)

        n_valid = 0
        for idx, p in enumerate(new_params):
            if SINGLE_BOUNDARY_MODE:
                # Single-boundary rebuild: only boundary_x1, left_slope and
                # right_slope are used; boundary_x2 and mid_slope are
                # ignored.  y_boundary2 is reused as the output value at
                # boundary_x1.
                # NOTE(review): y_boundary2 is extracted at boundary_x2, so
                # it is not the source's value at boundary_x1 - confirm this
                # is the intended anchor.
                W1, b1, W2, b2 = functional_to_weights_single(
                    p["boundary_x1"], p["left_slope"], p["right_slope"],
                    p["y_boundary2"],
                )
                p_single = {
                    "boundary_x": p["boundary_x1"],
                    "left_slope": p["left_slope"],
                    "right_slope": p["right_slope"],
                    "y_at_boundary": p["y_boundary2"],
                }
                valid, checks, recovered = validate_neuron_single(W1, b1, W2, b2, p_single)
            else:
                W1, b1, W2, b2 = functional_to_weights(
                    p["boundary_x1"], p["boundary_x2"], p["left_slope"],
                    p["mid_slope"], p["right_slope"], p["y_boundary2"],
                )
                valid, checks, recovered = validate_neuron(W1, b1, W2, b2, p)

            if valid:
                save_file(
                    {
                        "layer1.weight": torch.tensor(W1),
                        "layer1.bias": torch.tensor(b1),
                        "layer2.weight": torch.tensor(W2),
                        "layer2.bias": torch.tensor(b2),
                    },
                    str(strat_dir / f"neuron_{idx:06d}.safetensors"),
                )
                n_valid += 1
            else:
                failed = [name for name, ok in checks.items() if not ok]
                # Only log the first few failures (and a sparse sample after
                # that) to keep the output readable.
                if idx < 10 or idx % 50000 == 0:
                    print(f" [skip] neuron_{idx:06d}: failed checks {failed}")

        # Guard the percentage against N_GENERATE == 0.
        pct = 100 * n_valid / N_GENERATE if N_GENERATE else 0.0
        print(f" Saved {n_valid}/{N_GENERATE} valid neurons ({pct:.0f}%) to {strat_dir}/")
        summary[strat] = {"generated": N_GENERATE, "valid": n_valid, "path": str(strat_dir)}
        total_saved += n_valid

    meta = {
        "source_neurons": len(neurons),
        "source_functional_params": functional_params,
        "strategies": summary,
        "total_saved": total_saved,
    }
    with open(out / "generation_meta.json", "w") as f:
        json.dump(meta, f, indent=2)

    print(f"\n{'=' * 60}")
    print(f"Total neurons generated: {total_saved}")
    print(f"Metadata saved to {out}/generation_meta.json")
    print(f"\nTo use generated neurons in append_neurons_to_t5.py:")
    print(f" NEURON_SOURCE = 'multi'")
    print(f" MULTI_DIR = '{out}/gaussian' # or interpolate / grid")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    main()