| |
| """Definitive RLT encoder gate: do SUCCESS vs FAILURE z_rl separate? |
| |
| The RL Token paper validates the token by showing success frames form a smooth |
| manifold while failures (failed grasps / tower collapses) cluster apart. This is |
| the real test (stronger than the success-only smoothness gate in tsne_gate.py). |
| |
| success = the 44 teleop demos -> encoder_cache_prefix/ |
| failure = frozen-baseline rollouts -> encoder_cache_fail/ (SR~0) |
| |
| Reports: linear separability (LogisticRegression 5-fold CV accuracy on z_rl) and |
| silhouette score, plus a t-SNE colored by class. High CV-acc + clear visual |
| split = z_rl encodes task-success information => good RL state. |
| |
| CPU-only (won't fight the GPU policy server). Run AFTER collecting failures: |
| ./lerobot/.venv/bin/python gate_success_fail.py --ckpt checkpoints/rl_token_encoder_FINAL.pt |
| """ |
| from __future__ import annotations |
| import argparse, glob, os |
| import numpy as np, torch |
| import matplotlib; matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
| from sklearn.decomposition import PCA |
| from sklearn.manifold import TSNE |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.model_selection import cross_val_score |
| from sklearn.metrics import silhouette_score |
| from rl_token_encoder import RLTokenAutoencoder, RLTokenConfig |
|
|
|
|
| def encode_dir(ae, d, cap): |
| fs = sorted(glob.glob(os.path.join(d, "*.npz"))) |
| if not fs: |
| return np.zeros((0, 2560), np.float32) |
| if len(fs) > cap: |
| fs = fs[:: len(fs) // cap] |
| Z = [] |
| with torch.no_grad(): |
| for f in fs: |
| e = torch.from_numpy(np.load(f)["embeddings"].astype(np.float32))[None] |
| m = torch.ones(1, e.shape[1], dtype=torch.bool) |
| Z.append(ae.encode(e, m)[0].numpy()) |
| return np.stack(Z) |
|
|
|
|
| def main(): |
| p = argparse.ArgumentParser() |
| p.add_argument("--ckpt", default="checkpoints/rl_token_encoder_FINAL.pt") |
| p.add_argument("--success-dir", default="./encoder_cache_prefix") |
| p.add_argument("--fail-dir", default="./encoder_cache_fail") |
| p.add_argument("--weights", default="ema", choices=["ema", "model"]) |
| p.add_argument("--cap", type=int, default=800, help="max points per class (balanced)") |
| p.add_argument("--out", default="outputs/gate_success_fail.png") |
| args = p.parse_args() |
|
|
| ck = torch.load(args.ckpt, map_location="cpu") |
| ae = RLTokenAutoencoder(RLTokenConfig(dim=2560)); ae.load_state_dict(ck[args.weights]); ae.eval() |
| print(f"loaded {args.ckpt} ({args.weights})") |
|
|
| Zs = encode_dir(ae, args.success_dir, args.cap) |
| Zf = encode_dir(ae, args.fail_dir, args.cap) |
| print(f"success z_rl: {len(Zs)} failure z_rl: {len(Zf)}") |
| if len(Zf) < 10: |
| print("⚠️ <10 failure shards — collect baseline rollouts into", args.fail_dir, "first."); return |
|
|
| n = min(len(Zs), len(Zf)) |
| Zs, Zf = Zs[:n], Zf[:n] |
| X = np.concatenate([Zs, Zf]); y = np.array([0] * n + [1] * n) |
|
|
| |
| clf = LogisticRegression(max_iter=2000, C=1.0) |
| acc = cross_val_score(clf, X, y, cv=5, scoring="accuracy").mean() |
| sil = silhouette_score(X, y) |
| print(f"\nSEPARATION:") |
| print(f" LogReg 5-fold CV accuracy = {acc:.1%} (50%=indistinguishable, >80%=clearly separable)") |
| print(f" silhouette (by class) = {sil:.3f} (>0 = classes form distinct groups)") |
|
|
| |
| Xp = PCA(n_components=min(50, X.shape[0])).fit_transform(X) |
| emb = TSNE(n_components=2, perplexity=30, init="pca", random_state=0).fit_transform(Xp) |
| plt.figure(figsize=(7, 6)) |
| plt.scatter(emb[:n, 0], emb[:n, 1], c="tab:green", s=10, label="success (demos)", alpha=0.6) |
| plt.scatter(emb[n:, 0], emb[n:, 1], c="tab:red", s=10, label="failure (baseline)", alpha=0.6) |
| plt.legend(); plt.title(f"z_rl: success vs failure (LogReg CV acc {acc:.0%}, silhouette {sil:.2f})") |
| os.makedirs(os.path.dirname(args.out), exist_ok=True) |
| plt.tight_layout(); plt.savefig(args.out, dpi=120) |
| print(f"saved {args.out}") |
| print("\nGATE:", "✅ z_rl separates success from failure — good RL state" if acc > 0.8 |
| else "⚠️ weak separation — z_rl may not capture task success cleanly") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|