Spaces:
Running on Zero
Running on Zero
| """ | |
| mm_grad.py -- pure-numpy forward + backward (REINFORCE gradient) for the Modular | |
| Mind policy, so the boss can be **finetuned from real player data on a CPU** with | |
| no torch at runtime. | |
| The math is identical to mm_torch.ModularMindPolicy, hand-differentiated so a | |
| gradient step is a few thousand FLOPs (microseconds). Verified against torch | |
| autograd in test_grad() to <1e-6. | |
| Pipeline: | |
| player plays a fight -> browser logs (state, action, bossHP, playerHP) per boss | |
| decision + who died -> /learn -> we rebuild the per-step rewards (damage dealt | |
| - taken, + kill/- death), compute REINFORCE returns, and take one Adam step that | |
| nudges the policy toward what worked against real humans. A frozen copy of the | |
| sim-trained weights is kept as an anchor (small pull-back) so it can't drift far. | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| from features import ACTIONS, NF, extract_features, legal_mask | |
| from modular_mind import SPEC_DEFS, D_LATENT, H | |
| NA = len(ACTIONS) | |
| EPS = 1e-5 | |
| def _ln_fwd(x, w, b): | |
| mu = x.mean() | |
| var = ((x - mu) ** 2).mean() | |
| std = np.sqrt(var + EPS) | |
| xhat = (x - mu) / std | |
| return xhat * w + b, (xhat, std, w) | |
| def _ln_bwd(gy, cache): | |
| xhat, std, w = cache | |
| n = xhat.shape[0] | |
| gw = gy * xhat | |
| gb = gy.copy() | |
| gxhat = gy * w | |
| gx = (gxhat - gxhat.mean() - xhat * (gxhat * xhat).mean()) / std | |
| return gx, gw, gb | |
| def _relu(x): | |
| return np.maximum(x, 0.0) | |
| class OnlineLearner: | |
| """Holds the live weights + Adam state; updates them from player trajectories.""" | |
| def __init__(self, weights, lr=5e-3, gamma=0.97, anchor_pull=0.02, | |
| w_deal=6.0, w_take=5.0, time_pen=0.01, entropy_coef=0.01): | |
| self.W = {k: v.astype(np.float64).copy() for k, v in weights.items()} | |
| self.anchor = {k: v.copy() for k, v in self.W.items()} # sim-trained anchor | |
| self.lr, self.gamma, self.anchor_pull = lr, gamma, anchor_pull | |
| self.w_deal, self.w_take, self.time_pen = w_deal, w_take, time_pen | |
| self.entropy_coef = entropy_coef | |
| self.owns = [ACTIONS.index(o) if o else None for _, o, _ in SPEC_DEFS] | |
| self.m = {k: np.zeros_like(v) for k, v in self.W.items()} | |
| self.v = {k: np.zeros_like(v) for k, v in self.W.items()} | |
| self.t = 0 | |
| # ---- forward with cached intermediates ------------------------------- | |
| def _forward(self, f): | |
| W = self.W | |
| hs, lats, drives = [], [], np.zeros(NA) | |
| for i, owns in enumerate(self.owns): | |
| pre = W[f"s{i}_fc1_w"] @ f + W[f"s{i}_fc1_b"] | |
| h = np.tanh(pre) | |
| hs.append(h) | |
| lat = W[f"s{i}_lat_w"] @ h + W[f"s{i}_lat_b"] | |
| lats.append(lat) | |
| if owns is not None: | |
| drives[owns] += W[f"s{i}_drv_w"][0] @ h + W[f"s{i}_drv_b"][0] | |
| z = np.sum(lats, axis=0) | |
| zn, ln_in_c = _ln_fwd(z, W["link_ni_w"], W["link_ni_b"]) | |
| pre_g = W["link_g"] @ zn | |
| g_act = _relu(pre_g) | |
| v_act = W["link_v"] @ zn | |
| reglu = g_act * v_act | |
| out = W["link_d"] @ reglu | |
| shared, ln_out_c = _ln_fwd(out + z, W["link_no_w"], W["link_no_b"]) | |
| modulation = W["coord_w"] @ shared + W["coord_b"] | |
| logits = drives + modulation | |
| cache = dict(f=f, hs=hs, lats=lats, z=z, zn=zn, ln_in_c=ln_in_c, pre_g=pre_g, | |
| g_act=g_act, v_act=v_act, reglu=reglu, out=out, shared=shared, | |
| ln_out_c=ln_out_c) | |
| return logits, cache | |
| # ---- backward: accumulate grads of (advantage * -logpi - H) ---------- | |
| def _backward(self, cache, g_logits, grads): | |
| W = self.W | |
| # coordinator | |
| grads["coord_w"] += np.outer(g_logits, cache["shared"]) | |
| grads["coord_b"] += g_logits | |
| g_shared = W["coord_w"].T @ g_logits | |
| # owned-action drives | |
| g_drive = {} | |
| for i, owns in enumerate(self.owns): | |
| if owns is not None: | |
| g_drive[i] = g_logits[owns] | |
| # out + z layernorm | |
| g_outz, gw, gb = _ln_bwd(g_shared, cache["ln_out_c"]) | |
| grads["link_no_w"] += gw | |
| grads["link_no_b"] += gb | |
| g_out = g_outz | |
| g_z = g_outz.copy() | |
| # out = Wd @ reglu | |
| grads["link_d"] += np.outer(g_out, cache["reglu"]) | |
| g_reglu = W["link_d"].T @ g_out | |
| # reglu = relu(Wg@zn) * (Wv@zn) | |
| g_g_act = g_reglu * cache["v_act"] | |
| g_v_act = g_reglu * cache["g_act"] | |
| g_pre_g = g_g_act * (cache["pre_g"] > 0) | |
| grads["link_g"] += np.outer(g_pre_g, cache["zn"]) | |
| grads["link_v"] += np.outer(g_v_act, cache["zn"]) | |
| g_zn = W["link_g"].T @ g_pre_g + W["link_v"].T @ g_v_act | |
| # zn = layernorm(z) | |
| g_z_ln, gw, gb = _ln_bwd(g_zn, cache["ln_in_c"]) | |
| grads["link_ni_w"] += gw | |
| grads["link_ni_b"] += gb | |
| g_z += g_z_ln | |
| # z = sum(lat_i) -> each specialist | |
| for i, owns in enumerate(self.owns): | |
| h = cache["hs"][i] | |
| g_lat = g_z | |
| grads[f"s{i}_lat_w"] += np.outer(g_lat, h) | |
| grads[f"s{i}_lat_b"] += g_lat | |
| g_h = W[f"s{i}_lat_w"].T @ g_lat | |
| if owns is not None: | |
| grads[f"s{i}_drv_w"][0] += g_drive[i] * h | |
| grads[f"s{i}_drv_b"][0] += g_drive[i] | |
| g_h = g_h + W[f"s{i}_drv_w"][0] * g_drive[i] | |
| g_pre = g_h * (1.0 - h * h) | |
| grads[f"s{i}_fc1_w"] += np.outer(g_pre, cache["f"]) | |
| grads[f"s{i}_fc1_b"] += g_pre | |
| def logpi_grad(self, f, action, advantage, mask): | |
| """Grad of advantage * -log pi(action|state) (+ entropy bonus), accumulated.""" | |
| logits, cache = self._forward(f) | |
| masked = np.where(mask > 0.5, logits, -1e9) | |
| p = np.exp(masked - masked.max()) | |
| p = p / p.sum() | |
| onehot = np.zeros(NA) | |
| onehot[action] = 1.0 | |
| # d(-adv*logpi)/dlogits = adv*(p - onehot); entropy bonus grad = ent_coef*(p*(logp+H_)...) | |
| g_logits = advantage * (p - onehot) | |
| # entropy regularizer (encourage exploration): d(-ent_coef*H)/dlogits | |
| with np.errstate(divide="ignore"): | |
| logp = np.where(p > 1e-12, np.log(p), 0.0) | |
| ent_term = self.entropy_coef * p * (logp + (p * (-logp)).sum()) | |
| g_logits = g_logits + np.where(mask > 0.5, ent_term, 0.0) | |
| grads = {k: np.zeros_like(v) for k, v in self.W.items()} | |
| self._backward(cache, g_logits, grads) | |
| return grads | |
| def _trajectory_rewards(self, steps, result): | |
| """Rebuild per-decision rewards from logged HP (damage dealt - taken).""" | |
| n = len(steps) | |
| rews = np.zeros(n) | |
| for t in range(n): | |
| nb = steps[t + 1]["bossHP"] if t + 1 < n else (0.0 if result.get("bossDied") else steps[t]["bossHP"]) | |
| npl = steps[t + 1]["playerHP"] if t + 1 < n else (0.0 if result.get("playerDied") else steps[t]["playerHP"]) | |
| dealt = max(0.0, steps[t]["playerHP"] - npl) | |
| taken = max(0.0, steps[t]["bossHP"] - nb) | |
| rews[t] = dealt * self.w_deal - taken * self.w_take - self.time_pen | |
| if result.get("playerDied"): | |
| rews[-1] += 8.0 | |
| elif result.get("bossDied"): | |
| rews[-1] -= 5.0 | |
| return rews | |
| def update(self, trajectories): | |
| """trajectories: list of {steps:[{state,action,bossHP,playerHP}], result:{}}. | |
| Returns dict of stats. Mutates self.W in place (one Adam step).""" | |
| grads = {k: np.zeros_like(v) for k, v in self.W.items()} | |
| all_returns, nsteps = [], 0 | |
| # first pass: gather returns for a baseline | |
| per_traj = [] | |
| for tr in trajectories: | |
| steps = tr.get("steps", []) | |
| if len(steps) < 2: | |
| continue | |
| rews = self._trajectory_rewards(steps, tr.get("result", {})) | |
| G = np.zeros(len(rews)) | |
| acc = 0.0 | |
| for t in reversed(range(len(rews))): | |
| acc = rews[t] + self.gamma * acc | |
| G[t] = acc | |
| per_traj.append((steps, G)) | |
| all_returns.extend(G.tolist()) | |
| if not per_traj: | |
| return {"updated": False, "reason": "not enough data"} | |
| baseline = float(np.mean(all_returns)) | |
| adv_std = float(np.std(all_returns)) + 1e-6 | |
| # second pass: accumulate gradient | |
| for steps, G in per_traj: | |
| for t, st in enumerate(steps): | |
| s = st["state"] | |
| f = extract_features(s).astype(np.float64) | |
| mask = legal_mask(s) | |
| action = ACTIONS.index(st["action"]) if isinstance(st["action"], str) else int(st["action"]) | |
| adv = (G[t] - baseline) / adv_std | |
| g = self.logpi_grad(f, action, adv, mask) | |
| for k in grads: | |
| grads[k] += g[k] | |
| nsteps += 1 | |
| # average + anchor pull-back (stay near the sim-trained policy) | |
| self.t += 1 | |
| b1, b2 = 0.9, 0.999 | |
| for k in self.W: | |
| gk = grads[k] / max(1, nsteps) + self.anchor_pull * (self.W[k] - self.anchor[k]) | |
| self.m[k] = b1 * self.m[k] + (1 - b1) * gk | |
| self.v[k] = b2 * self.v[k] + (1 - b2) * (gk * gk) | |
| mhat = self.m[k] / (1 - b1 ** self.t) | |
| vhat = self.v[k] / (1 - b2 ** self.t) | |
| self.W[k] -= self.lr * mhat / (np.sqrt(vhat) + 1e-8) | |
| return {"updated": True, "steps": nsteps, "trajectories": len(per_traj), | |
| "avg_return": round(baseline, 3)} | |
| def test_grad(): | |
| """Verify the numpy logpi-gradient matches torch autograd.""" | |
| import torch | |
| from mm_torch import ModularMindPolicy | |
| m = ModularMindPolicy().double() | |
| m.export_npz("_gradchk.npz") | |
| W = {k: v for k, v in np.load("_gradchk.npz").items()} | |
| learner = OnlineLearner(W, entropy_coef=0.0) | |
| rng = np.random.default_rng(0) | |
| maxrel = 0.0 | |
| for _ in range(5): | |
| f = rng.normal(size=NF) | |
| action = int(rng.integers(NA)) | |
| mask = np.ones(NA) | |
| # numpy grad of -logpi(action) (advantage=1) | |
| gnp = learner.logpi_grad(f, action, 1.0, mask) | |
| # torch grad | |
| m.zero_grad() | |
| x = torch.tensor(f, dtype=torch.float64).unsqueeze(0) | |
| logits, _ = m(x) | |
| logp = torch.log_softmax(logits, dim=-1)[0, action] | |
| (-logp).backward() | |
| # compare coordinator weight grad as a representative | |
| gt = m.coordinator.weight.grad.numpy() | |
| rel = np.abs(gnp["coord_w"] - gt).max() / (np.abs(gt).max() + 1e-9) | |
| maxrel = max(maxrel, rel) | |
| import os | |
| os.remove("_gradchk.npz") | |
| print(f"max relative grad error (coord_w) vs torch: {maxrel:.2e}") | |
| return maxrel | |
| if __name__ == "__main__": | |
| test_grad() | |