ContourFuse / genneuron.py
CompressedGemma's picture
Generate functional neurons from source neuron
eb88431 verified
Raw
History Blame Contribute Delete
16.9 kB
#!/usr/bin/env python3
"""
Generate new neurons by sampling in functional parameter space.
Each neuron is a piecewise-linear function fully described by 6 values:
(boundary_x1, boundary_x2, left_slope, mid_slope, right_slope, y_boundary2)
We extract these from your existing neurons, fit a distribution over them,
sample new combinations, and reconstruct valid W1/b1/W2/b2 for each.
"""
import numpy as np
import torch
from safetensors.torch import load_file, save_file
from pathlib import Path
import json
import argparse
import os
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Where load_neurons() reads source neurons from:
#   "single" -> the one checkpoint at SINGLE_FILE
#   "multi"  -> every neuron_*.safetensors file inside MULTI_DIR
NEURON_SOURCE = "multi" # "single" | "multi"
SINGLE_FILE = "test_mlp_hf/model.safetensors"
MULTI_DIR = "source_llm_neurons"
# When True the main script builds each neuron with functional_to_weights_single()
# (carrier + one transition unit) from boundary_x1, left_slope, right_slope and
# y_boundary2; when False it uses functional_to_weights() (carrier + two
# transition units) with all six parameters.
SINGLE_BOUNDARY_MODE = True # Generate single-boundary neurons (2 active) instead of double-boundary (3 active)
# Number of parameter sets requested from EACH strategy that is run.
N_GENERATE = 500 # generate 500 neurons
# Root output directory; the main script creates one sub-directory per strategy
# plus generation_meta.json inside it.
OUTPUT_DIR = "generated_neurons"
# Seed for the single numpy Generator shared by all strategies.
RANDOM_SEED = 42
# Generation strategy:
# "gaussian" — fit mean/cov to existing neurons, sample from N(mu, sigma)
# "interpolate" — convex combinations of pairs of existing neurons
# "grid" — systematic grid over the observed parameter ranges
# "all" — produce all three sets
STRATEGY = "all"
# ---------------------------------------------------------------------------
# 1. Load existing neurons
# ---------------------------------------------------------------------------
def load_neurons(source, single_file, multi_dir):
    """Load source neurons as dicts of float32 numpy arrays.

    Args:
        source: "single" to read one checkpoint from ``single_file``, or
            "multi" to read every ``neuron_*.safetensors`` file found in
            ``multi_dir`` (in sorted filename order).
        single_file: Path of the checkpoint used when ``source == "single"``.
        multi_dir: Directory scanned when ``source == "multi"``.

    Returns:
        A list of dicts with keys "W1", "b1", "W2", "b2", taken from the
        checkpoint tensors "layer1.weight", "layer1.bias", "layer2.weight"
        and "layer2.bias". The list is empty when ``multi_dir`` holds no
        matching files.

    Raises:
        ValueError: If ``source`` is neither "single" nor "multi".
        KeyError: If a checkpoint lacks one of the expected tensors.
    """
    tensor_names = {
        "W1": "layer1.weight",
        "b1": "layer1.bias",
        "W2": "layer2.weight",
        "b2": "layer2.bias",
    }

    def read_neuron(path):
        # One shared reader keeps the "single" and "multi" branches in sync.
        weights = load_file(str(path))
        return {
            short: weights[full].float().numpy()
            for short, full in tensor_names.items()
        }

    if source == "single":
        return [read_neuron(single_file)]
    if source == "multi":
        return [
            read_neuron(path)
            for path in sorted(Path(multi_dir).glob("neuron_*.safetensors"))
        ]
    # A typo in the source name used to yield an empty list that only failed
    # much later; fail here with a message that names the problem.
    raise ValueError(
        f"Unknown neuron source: {source!r} (expected 'single' or 'multi')"
    )
# ---------------------------------------------------------------------------
# 2. Extract functional parameters from raw weights
# ---------------------------------------------------------------------------
def weights_to_functional(W1, b1, W2, b2, x_probe_range=(-2.0, 2.0), n_probe=200000):
    """Estimate the six functional parameters of a one-hidden-layer ReLU neuron.

    The network ``y = relu(x @ W1.T + b1) @ W2.T + b2`` is sampled on a uniform
    grid, and its two strongest slope changes are taken as the boundaries.

    Args:
        W1, b1, W2, b2: Layer weights and biases. The code feeds a single
            input column and expects one output value per probe point.
        x_probe_range: ``(low, high)`` interval that is sampled.
        n_probe: Number of evenly spaced probe points.

    Returns:
        Dict with keys "boundary_x1", "boundary_x2" (sorted ascending),
        "left_slope", "mid_slope", "right_slope" and "y_boundary2"
        (the sampled output at the second boundary), all Python floats.
    """
    xs = np.linspace(x_probe_range[0], x_probe_range[1], n_probe)

    # Evaluate every probe point in one batched pass instead of one tiny
    # matmul per point. Inputs are cast to float32 as before.
    x_col = xs.astype(np.float32).reshape(-1, 1)
    hidden = np.maximum(0, x_col @ W1.T + b1)
    ys = (hidden @ W2.T + b2).reshape(n_probe).astype(np.float64)

    slopes = np.gradient(ys, xs)
    slope_changes = np.abs(np.gradient(slopes, xs))

    # Strongest kink first; then blank out a window around it so the second
    # argmax lands on a different kink rather than on a neighbouring sample.
    peak_window = int(n_probe * 0.1)
    idx1 = int(np.argmax(slope_changes))
    masked_changes = slope_changes.copy()
    l_mask = max(0, idx1 - peak_window)
    r_mask = min(n_probe, idx1 + peak_window)
    masked_changes[l_mask:r_mask] = 0.0
    idx2 = int(np.argmax(masked_changes))
    if idx1 > idx2:
        idx1, idx2 = idx2, idx1
    boundary_x1 = float(xs[idx1])
    boundary_x2 = float(xs[idx2])

    # Average slopes away from the kinks: samples within `margin` of a
    # boundary are excluded because the numerical gradient is smeared there.
    margin = int(n_probe * 0.03)
    idx_l = max(0, idx1 - margin)
    idx_m1 = min(n_probe - 1, idx1 + margin)
    idx_m2 = max(0, idx2 - margin)
    idx_r = min(n_probe - 1, idx2 + margin)
    left_slope = float(np.mean(slopes[:idx_l])) if idx_l > 0 else float(slopes[0])
    if idx_m2 > idx_m1:
        mid_slope = float(np.mean(slopes[idx_m1:idx_m2]))
    else:
        # Boundaries too close for a clean window: fall back to the midpoint.
        mid_slope = float(slopes[(idx1 + idx2) // 2])
    right_slope = float(np.mean(slopes[idx_r:])) if idx_r < n_probe - 1 else float(slopes[-1])
    y_boundary2 = float(ys[idx2])
    return {
        "boundary_x1": boundary_x1,
        "boundary_x2": boundary_x2,
        "left_slope": left_slope,
        "mid_slope": mid_slope,
        "right_slope": right_slope,
        "y_boundary2": y_boundary2,
    }
# ---------------------------------------------------------------------------
# 3. Reconstruct weights from functional parameters
# ---------------------------------------------------------------------------
def functional_to_weights(boundary_x1, boundary_x2, left_slope, mid_slope, right_slope, y_boundary2,
                          n_hidden=8):
    """Build W1/b1/W2/b2 for a two-boundary piecewise-linear neuron.

    Three hidden units are used; the remaining ``n_hidden - 3`` stay zero:
      * unit 0: ``relu(x + 100)`` weighted by ``right_slope`` (the carrier,
        active for every x above -100),
      * unit 1: ``relu(boundary_x1 - x)``, active left of the first boundary,
      * unit 2: ``relu(boundary_x2 - x)``, active left of the second boundary.
    The output bias is chosen so that the output at the second boundary
    equals ``y_boundary2``. Boundaries given in descending order are swapped.

    Returns:
        Tuple ``(W1, b1, W2, b2)`` of float32 arrays with shapes
        ``(n_hidden, 1)``, ``(n_hidden,)``, ``(1, n_hidden)`` and ``(1,)``.
    """
    if boundary_x1 > boundary_x2:
        boundary_x1, boundary_x2 = boundary_x2, boundary_x1

    W1 = np.zeros((n_hidden, 1), dtype=np.float32)
    b1 = np.zeros(n_hidden, dtype=np.float32)
    W2 = np.zeros((1, n_hidden), dtype=np.float32)
    b2 = np.zeros(1, dtype=np.float32)

    # (hidden index, input weight, hidden bias, output weight)
    units = (
        # Always-on carrier; the large bias keeps it active for extreme
        # negative inputs.
        (0, 1.0, 100.0, right_slope),
        # Active left of boundary_x1.
        (1, -1.0, boundary_x1, -(left_slope - mid_slope)),
        # Active left of boundary_x2.
        (2, -1.0, boundary_x2, -(mid_slope - right_slope)),
    )
    for row, in_weight, bias, out_weight in units:
        W1[row, 0] = in_weight
        b1[row] = bias
        W2[0, row] = out_weight

    # At x == boundary_x2 only the carrier contributes, so the output bias is
    # whatever is left to reach the target value there.
    carrier_at_boundary = W2[0, 0] * (W1[0, 0] * boundary_x2 + b1[0])
    b2[0] = y_boundary2 - carrier_at_boundary
    return W1, b1, W2, b2
def functional_to_weights_single(boundary_x, left_slope, right_slope, y_at_boundary,
                                 n_hidden=8):
    """Build W1/b1/W2/b2 for a single-boundary piecewise-linear neuron.

    Only two hidden units are populated (the rest stay zero): an always-on
    carrier ``relu(x + 100)`` weighted by ``right_slope``, and a transition
    unit ``relu(boundary_x - x)`` that is active left of the boundary. The
    output bias makes the output at ``boundary_x`` equal ``y_at_boundary``.

    Returns:
        Tuple ``(W1, b1, W2, b2)`` of float32 arrays with shapes
        ``(n_hidden, 1)``, ``(n_hidden,)``, ``(1, n_hidden)`` and ``(1,)``.
    """
    shapes = ((n_hidden, 1), n_hidden, (1, n_hidden), 1)
    W1, b1, W2, b2 = (np.zeros(shape, dtype=np.float32) for shape in shapes)

    carrier, transition = 0, 1

    # Carrier: contributes right_slope everywhere in the working range.
    W1[carrier, 0] = 1.0
    b1[carrier] = 100.0
    W2[0, carrier] = right_slope

    # Transition: switches on for x < boundary_x.
    W1[transition, 0] = -1.0
    b1[transition] = boundary_x
    W2[0, transition] = -(left_slope - right_slope)

    # The transition unit is zero at the boundary, so only the carrier has to
    # be compensated to hit the requested output there.
    carrier_at_boundary = W2[0, carrier] * (W1[carrier, 0] * boundary_x + b1[carrier])
    b2[0] = y_at_boundary - carrier_at_boundary
    return W1, b1, W2, b2
# ---------------------------------------------------------------------------
# 4. Validate a generated neuron (local finite-difference probes beside each boundary)
# ---------------------------------------------------------------------------
def _mlp_forward(x_scalar, W1, b1, W2, b2):
    """Evaluate ``relu(x @ W1.T + b1) @ W2.T + b2`` at one scalar x.

    The input is cast to float32 before the forward pass; the single output
    value is returned as a Python float.
    """
    x = np.array([[x_scalar]], dtype=np.float32)
    h = np.maximum(0.0, x @ W1.T + b1)
    return float((h @ W2.T + b2).squeeze())


def validate_neuron(W1, b1, W2, b2, params, tol=0.05):
    """Check that a two-boundary neuron reproduces its functional parameters.

    Slopes are measured with forward differences just left of the lower
    boundary, at the midpoint between the boundaries, and just right of the
    upper boundary; the output is also sampled at the upper boundary.

    Args:
        W1, b1, W2, b2: Weights to validate.
        params: Dict with "boundary_x1", "boundary_x2", "left_slope",
            "mid_slope", "right_slope" and "y_boundary2".
        tol: Absolute slope tolerance; the output value is allowed ``5 * tol``.

    Returns:
        ``(all_ok, checks, recovered)`` where ``checks`` maps each parameter
        name to a pass/fail flag and ``recovered`` holds the measured values
        together with the (ascending) boundaries that were probed.
    """
    # functional_to_weights() swaps the boundaries when they arrive in
    # descending order, so the probes must use the same ordering. Otherwise
    # every such sample is measured at the wrong places and rejected.
    bx1, bx2 = sorted((params["boundary_x1"], params["boundary_x2"]))

    # Dynamically scale probes so we don't accidentally step over boundaries
    # when random generation places bx1 and bx2 extremely close together.
    dist = max(abs(bx2 - bx1), 1e-6)
    eps = min(1e-3, dist / 10.0)
    gap = min(0.05, dist / 4.0)

    y_at_bx2 = _mlp_forward(bx2, W1, b1, W2, b2)
    slope_left = (_mlp_forward(bx1 - gap, W1, b1, W2, b2) -
                  _mlp_forward(bx1 - gap - eps, W1, b1, W2, b2)) / eps
    x_mid = (bx1 + bx2) / 2
    slope_mid = (_mlp_forward(x_mid + eps, W1, b1, W2, b2) -
                 _mlp_forward(x_mid, W1, b1, W2, b2)) / eps
    slope_right = (_mlp_forward(bx2 + gap + eps, W1, b1, W2, b2) -
                   _mlp_forward(bx2 + gap, W1, b1, W2, b2)) / eps

    recovered = {
        "boundary_x1": bx1,
        "boundary_x2": bx2,
        "left_slope": slope_left,
        "mid_slope": slope_mid,
        "right_slope": slope_right,
        "y_boundary2": y_at_bx2,
    }
    checks = {
        "left_slope": abs(slope_left - params["left_slope"]) < tol,
        "mid_slope": abs(slope_mid - params["mid_slope"]) < tol,
        "right_slope": abs(slope_right - params["right_slope"]) < tol,
        "y_boundary2": abs(y_at_bx2 - params["y_boundary2"]) < tol * 5,
    }
    return all(checks.values()), checks, recovered
def validate_neuron_single(W1, b1, W2, b2, params, tol=0.05):
    """Check that a single-boundary neuron reproduces its two slopes and value.

    Forward differences with a fixed step are taken a fixed offset to the
    left and to the right of ``params["boundary_x"]``; the output is sampled
    at the boundary itself.

    Returns:
        ``(all_ok, checks, recovered)``; ``checks`` has one pass/fail flag
        per parameter ("left_slope", "right_slope", "y_at_boundary"), with
        the output value allowed ``5 * tol``.
    """
    boundary = params["boundary_x"]
    step = 1e-3      # finite-difference step
    offset = 0.05    # distance kept from the boundary when probing

    def evaluate(x):
        return _mlp_forward(x, W1, b1, W2, b2)

    boundary_y = evaluate(boundary)
    measured_left = (evaluate(boundary - offset) -
                     evaluate(boundary - offset - step)) / step
    measured_right = (evaluate(boundary + offset + step) -
                      evaluate(boundary + offset)) / step

    recovered = {
        "boundary_x": boundary,
        "left_slope": measured_left,
        "right_slope": measured_right,
        "y_at_boundary": boundary_y,
    }
    checks = {
        "left_slope": abs(measured_left - params["left_slope"]) < tol,
        "right_slope": abs(measured_right - params["right_slope"]) < tol,
        "y_at_boundary": abs(boundary_y - params["y_at_boundary"]) < tol * 5,
    }
    passed = all(checks.values())
    return passed, checks, recovered
# ---------------------------------------------------------------------------
# 5. Generation strategies
# ---------------------------------------------------------------------------
def strategy_gaussian(functional_params, n, rng):
    """Sample ``n`` parameter sets from a Gaussian fitted to the source neurons.

    The mean is the per-parameter average. The covariance is the sample
    covariance when more than one source neuron is given, otherwise
    ``0.1 * I``; in both cases ``1e-4 * I`` is added before sampling.

    Returns:
        List of ``n`` dicts keyed by the six functional parameter names.
    """
    fields = ("boundary_x1", "boundary_x2", "left_slope",
              "mid_slope", "right_slope", "y_boundary2")
    observed = np.array([[p[name] for name in fields] for p in functional_params])

    center = observed.mean(axis=0)
    if len(observed) > 1:
        spread = np.cov(observed.T)
    else:
        # A single neuron has no spread of its own; use a fixed isotropic one.
        spread = np.eye(6) * 0.1
    # Small diagonal term keeps the covariance usable when columns are
    # (nearly) constant across the source neurons.
    spread += np.eye(6) * 1e-4

    draws = rng.multivariate_normal(center, spread, size=n)
    return [dict(zip(fields, draw)) for draw in draws]
def strategy_interpolate(functional_params, n, rng):
    """Produce ``n`` parameter sets as convex blends of two source neurons.

    For every output, two source indices are drawn with replacement (so a
    neuron may be blended with itself) followed by a mixing weight ``t`` in
    [0, 1); each parameter becomes ``(1 - t) * a + t * b``.

    Returns:
        List of ``n`` dicts with the same keys as the source dicts.
    """
    pool = functional_params

    def blend_once():
        # Draw order (indices first, then weight) determines the RNG stream.
        first, second = rng.choice(len(pool), size=2, replace=True)
        t = rng.uniform(0, 1)
        a, b = pool[first], pool[second]
        return {key: (1 - t) * a[key] + t * b[key] for key in a}

    return [blend_once() for _ in range(n)]
def strategy_grid(functional_params, n, rng):
    """Return ``n`` parameter sets taken from a shuffled regular 6-D grid.

    Each parameter axis spans the min..max observed over the source neurons
    (or a +/-20% band, +/-0.2 around zero, when all observations coincide).
    Every axis gets ``max(2, int(n ** (1/6)) + 1)`` points; the full grid is
    shuffled with ``rng`` and its first ``n`` entries are returned.
    """
    names = ("boundary_x1", "boundary_x2", "left_slope",
             "mid_slope", "right_slope", "y_boundary2")

    def span(values, margin=0.2):
        lo, hi = min(values), max(values)
        if lo != hi:
            return lo, hi
        # Prevent 0-variance collapse by injecting a spread for single neurons
        pad = margin if lo == 0 else abs(lo) * margin
        return lo - pad, hi + pad

    bounds = [span([p[name] for p in functional_params]) for name in names]
    per_axis = max(2, int(n ** (1.0 / 6.0)) + 1)
    axes = [np.linspace(lo, hi, per_axis) for lo, hi in bounds]

    # "ij" indexing + C-order ravel enumerates points with the first axis
    # varying slowest and the last fastest, like six nested for-loops.
    mesh = np.meshgrid(*axes, indexing="ij")
    table = np.stack([axis_values.ravel() for axis_values in mesh], axis=1)
    grid = [dict(zip(names, row)) for row in table]

    rng.shuffle(grid)
    while len(grid) < n:
        grid = grid * 2
    return grid[:n]
# ---------------------------------------------------------------------------
# 6. Main
# ---------------------------------------------------------------------------
def main():
    """Run the generation pipeline configured by the module-level constants.

    Steps: load the source neurons, extract their functional parameters,
    sample ``N_GENERATE`` new parameter sets per selected strategy, rebuild
    weights for each, keep the ones that pass validation as
    ``<OUTPUT_DIR>/<strategy>/neuron_XXXXXX.safetensors``, and finally write
    ``<OUTPUT_DIR>/generation_meta.json``.

    Raises:
        SystemExit: If no source neurons could be loaded.
        ValueError: If ``STRATEGY`` names an unknown strategy.
    """
    rng = np.random.default_rng(RANDOM_SEED)
    out = Path(OUTPUT_DIR)
    # parents=True so a nested OUTPUT_DIR does not fail on a fresh checkout.
    out.mkdir(parents=True, exist_ok=True)

    # The banner used to say "Multi-Boundary" even in single-boundary mode.
    mode_label = "Single-Boundary" if SINGLE_BOUNDARY_MODE else "Multi-Boundary"
    print("=" * 60)
    print(f"Generating new neurons from existing ones ({mode_label})")
    print("=" * 60)

    print("\n[1] Loading existing neurons...")
    neurons = load_neurons(NEURON_SOURCE, SINGLE_FILE, MULTI_DIR)
    print(f" {len(neurons)} source neuron(s)")
    if not neurons:
        # Every strategy needs at least one source neuron; stop with a clear
        # message instead of failing deep inside the sampling code.
        raise SystemExit(
            f"No source neurons found (NEURON_SOURCE={NEURON_SOURCE!r}); "
            "nothing to generate from."
        )

    print("\n[2] Extracting functional parameters...")
    functional_params = []
    for k, neuron in enumerate(neurons):
        p = weights_to_functional(neuron["W1"], neuron["b1"], neuron["W2"], neuron["b2"])
        functional_params.append(p)
        print(f" Neuron {k}: boundary1={p['boundary_x1']:+.4f} "
              f"boundary2={p['boundary_x2']:+.4f} "
              f"left_slope={p['left_slope']:+.4f} "
              f"mid_slope={p['mid_slope']:+.4f} "
              f"right_slope={p['right_slope']:+.4f} "
              f"y@boundary2={p['y_boundary2']:+.4f}")

    strategies = (
        ["gaussian", "interpolate", "grid"] if STRATEGY == "all"
        else [STRATEGY]
    )
    total_saved = 0
    summary = {}
    for strat in strategies:
        print(f"\n[3] Generating {N_GENERATE} neurons via '{strat}'...")
        if strat == "gaussian":
            new_params = strategy_gaussian(functional_params, N_GENERATE, rng)
        elif strat == "interpolate":
            new_params = strategy_interpolate(functional_params, N_GENERATE, rng)
        elif strat == "grid":
            new_params = strategy_grid(functional_params, N_GENERATE, rng)
        else:
            raise ValueError(f"Unknown strategy: {strat}")

        strat_dir = out / strat
        strat_dir.mkdir(exist_ok=True)
        n_valid = 0
        for idx, p in enumerate(new_params):
            if SINGLE_BOUNDARY_MODE:
                # Convert double-boundary params to single-boundary
                # Use boundary_x1 as the single boundary, ignore boundary_x2
                # Use left_slope and right_slope, ignore mid_slope
                # Estimate y_at_boundary from y_boundary2
                W1, b1, W2, b2 = functional_to_weights_single(
                    p["boundary_x1"], p["left_slope"], p["right_slope"],
                    p["y_boundary2"],
                )
                # Create single-boundary params for validation
                p_single = {
                    "boundary_x": p["boundary_x1"],
                    "left_slope": p["left_slope"],
                    "right_slope": p["right_slope"],
                    "y_at_boundary": p["y_boundary2"],
                }
                valid, checks, recovered = validate_neuron_single(W1, b1, W2, b2, p_single)
            else:
                W1, b1, W2, b2 = functional_to_weights(
                    p["boundary_x1"], p["boundary_x2"], p["left_slope"],
                    p["mid_slope"], p["right_slope"], p["y_boundary2"],
                )
                valid, checks, recovered = validate_neuron(W1, b1, W2, b2, p)

            if valid:
                save_file(
                    {
                        "layer1.weight": torch.tensor(W1),
                        "layer1.bias": torch.tensor(b1),
                        "layer2.weight": torch.tensor(W2),
                        "layer2.bias": torch.tensor(b2),
                    },
                    # Padded to 6 digits (06d) so lexicographic filename
                    # order matches numeric order for downstream loaders.
                    str(strat_dir / f"neuron_{idx:06d}.safetensors"),
                )
                n_valid += 1
            else:
                failed = [k for k, v in checks.items() if not v]
                # Only report the first few failures (plus a sparse sample)
                # to keep the log readable.
                if idx < 10 or idx % 50000 == 0:
                    print(f" [skip] neuron_{idx:06d}: failed checks {failed}")

        # Guard the percentage against N_GENERATE == 0.
        pct = 100 * n_valid / N_GENERATE if N_GENERATE else 0.0
        print(f" Saved {n_valid}/{N_GENERATE} valid neurons ({pct:.0f}%) to {strat_dir}/")
        summary[strat] = {"generated": N_GENERATE, "valid": n_valid, "path": str(strat_dir)}
        total_saved += n_valid

    meta = {
        "source_neurons": len(neurons),
        "source_functional_params": functional_params,
        "strategies": summary,
        "total_saved": total_saved,
    }
    with open(out / "generation_meta.json", "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)

    print(f"\n{'=' * 60}")
    print(f"Total neurons generated: {total_saved}")
    print(f"Metadata saved to {out}/generation_meta.json")
    print(f"\nTo use generated neurons in append_neurons_to_t5.py:")
    print(f" NEURON_SOURCE = 'multi'")
    print(f" MULTI_DIR = '{out}/gaussian' # or interpolate / grid")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    main()