File size: 4,267 Bytes
3a8e55e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/env python3
"""Definitive RLT encoder gate: do SUCCESS vs FAILURE z_rl separate?

The RL Token paper validates the token by showing success frames form a smooth
manifold while failures (failed grasps / tower collapses) cluster apart. This is
the real test (stronger than the success-only smoothness gate in tsne_gate.py).

  success = the 44 teleop demos        -> encoder_cache_prefix/
  failure = frozen-baseline rollouts   -> encoder_cache_fail/   (success rate ~0)

Reports: linear separability (LogisticRegression 5-fold CV accuracy on z_rl) and
silhouette score, plus a t-SNE colored by class. High CV-acc + clear visual
split = z_rl encodes task-success information => good RL state.

CPU-only (won't fight the GPU policy server). Run AFTER collecting failures:
  ./lerobot/.venv/bin/python gate_success_fail.py --ckpt checkpoints/rl_token_encoder_FINAL.pt
"""
from __future__ import annotations
import argparse, glob, os
import numpy as np, torch
import matplotlib; matplotlib.use("Agg")
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import silhouette_score
from rl_token_encoder import RLTokenAutoencoder, RLTokenConfig


def encode_dir(ae, d, cap):
    """Encode the cached embedding shards of one directory into z_rl vectors.

    Args:
        ae: Autoencoder exposing ``encode(embeddings, mask)``. Index 0 of its
            result is kept for each shard (presumably the single batch item --
            confirm against the encoder implementation).
        d: Directory scanned (non-recursively) for ``*.npz`` shards; each shard
            must contain an ``"embeddings"`` array.
        cap: Maximum number of shards to encode. When the directory holds more,
            exactly ``cap`` shards are kept, evenly spaced over the sorted
            file list.

    Returns:
        The per-shard encodings stacked along a new leading axis, or an empty
        ``(0, 2560)`` float32 array when the directory holds no shards.

    Raises:
        ValueError: If shards must be dropped but ``cap`` is smaller than 1.
    """
    paths = sorted(glob.glob(os.path.join(d, "*.npz")))
    if not paths:
        return np.zeros((0, 2560), np.float32)
    if len(paths) > cap:
        if cap < 1:
            raise ValueError(f"cap must be >= 1, got {cap}")
        # Integer-spaced indices give exactly `cap` distinct shards. A plain
        # stride of len // cap degenerates to 1 whenever cap < len < 2 * cap
        # and would silently keep every shard.
        total = len(paths)
        paths = [paths[i * total // cap] for i in range(cap)]
    encoded = []
    with torch.no_grad():
        for path in paths:
            # Context manager closes the underlying zip handle for each shard.
            with np.load(path) as shard:
                emb = shard["embeddings"].astype(np.float32)
            e = torch.from_numpy(emb)[None]  # add a leading batch axis of 1
            # All-True mask sized to the shard's second axis (presumably marks
            # every token as valid -- confirm against the encoder).
            m = torch.ones(1, e.shape[1], dtype=torch.bool)
            encoded.append(ae.encode(e, m)[0].numpy())
    return np.stack(encoded)


def main():
    """Run the success-vs-failure separation gate on z_rl and save a t-SNE plot.

    Loads the autoencoder checkpoint on CPU, encodes the success and failure
    shard directories, balances the two classes, prints the 5-fold
    cross-validated LogisticRegression accuracy and the silhouette score, and
    writes a class-coloured t-SNE scatter plot to ``--out``.

    Returns early (after printing a warning) when either class has fewer than
    10 encoded shards, since the stratified 5-fold split and t-SNE both need a
    minimum number of points per class.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--ckpt", default="checkpoints/rl_token_encoder_FINAL.pt")
    p.add_argument("--success-dir", default="./encoder_cache_prefix")
    p.add_argument("--fail-dir", default="./encoder_cache_fail")
    p.add_argument("--weights", default="ema", choices=["ema", "model"])
    p.add_argument("--cap", type=int, default=800, help="max points per class (balanced)")
    p.add_argument("--out", default="outputs/gate_success_fail.png")
    args = p.parse_args()

    ck = torch.load(args.ckpt, map_location="cpu")
    ae = RLTokenAutoencoder(RLTokenConfig(dim=2560))
    ae.load_state_dict(ck[args.weights])
    ae.eval()
    print(f"loaded {args.ckpt} ({args.weights})")

    Zs = encode_dir(ae, args.success_dir, args.cap)
    Zf = encode_dir(ae, args.fail_dir, args.cap)
    print(f"success z_rl: {len(Zs)}   failure z_rl: {len(Zf)}")
    if len(Zf) < 10:
        print("⚠️ <10 failure shards — collect baseline rollouts into", args.fail_dir, "first."); return
    if len(Zs) < 10:
        # Without this guard an empty/tiny success set crashes inside the CV split.
        print("⚠️ <10 success shards — check that", args.success_dir, "holds the encoded demos."); return

    # Balance the classes by sampling the larger one evenly across its sorted
    # shards; taking only the head would keep just the earliest shards.
    n = min(len(Zs), len(Zf))
    Zs = Zs[np.arange(n) * len(Zs) // n]
    Zf = Zf[np.arange(n) * len(Zf) // n]
    X = np.concatenate([Zs, Zf])
    y = np.array([0] * n + [1] * n)  # 0 = success, 1 = failure

    # 1) linear separability (the quantitative verdict)
    # NOTE(review): if several shards come from the same episode, folds may
    # share episodes and the CV accuracy would be optimistic -- confirm.
    clf = LogisticRegression(max_iter=2000, C=1.0)
    acc = cross_val_score(clf, X, y, cv=5, scoring="accuracy").mean()
    sil = silhouette_score(X, y)
    print("\nSEPARATION:")
    print(f"  LogReg 5-fold CV accuracy = {acc:.1%}   (50%=indistinguishable, >80%=clearly separable)")
    print(f"  silhouette (by class)     = {sil:.3f}   (>0 = classes form distinct groups)")

    # 2) t-SNE colored by class
    # PCA cannot return more components than min(n_samples, n_features).
    Xp = PCA(n_components=min(50, *X.shape)).fit_transform(X)
    # t-SNE requires perplexity < n_samples; shrink it for small sets so the
    # gate still runs right at the 10-per-class minimum.
    perplexity = min(30.0, (len(X) - 1) / 3.0)
    emb = TSNE(n_components=2, perplexity=perplexity, init="pca", random_state=0).fit_transform(Xp)
    plt.figure(figsize=(7, 6))
    plt.scatter(emb[:n, 0], emb[:n, 1], c="tab:green", s=10, label="success (demos)", alpha=0.6)
    plt.scatter(emb[n:, 0], emb[n:, 1], c="tab:red", s=10, label="failure (baseline)", alpha=0.6)
    plt.legend()
    plt.title(f"z_rl: success vs failure  (LogReg CV acc {acc:.0%}, silhouette {sil:.2f})")
    # A bare filename has an empty dirname, which os.makedirs rejects.
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    plt.tight_layout()
    plt.savefig(args.out, dpi=120)
    plt.close()
    print(f"saved {args.out}")
    print("\nGATE:", "✅ z_rl separates success from failure — good RL state" if acc > 0.8
          else "⚠️ weak separation — z_rl may not capture task success cleanly")


# Run the gate only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()