Spaces:
Sleeping
Sleeping
| # data_utils.py | |
| import numpy as np | |
| from datasets import load_dataset | |
| # --------- Hashing vectorizer (no sklearn) ---------- | |
def hash_vectorize(texts, n_features=4096, seed=1234):
    """Embed texts as L2-normalised hashed bags of unigrams and bigrams.

    Each text is lower-cased and split on whitespace. Every token, and every
    pair of adjacent tokens joined as ``"left_right"``, is hashed into one of
    ``n_features`` buckets and that bucket's count is incremented. Each row
    is then divided by its L2 norm (plus a small epsilon).

    Bucket indices are derived from a keyed BLAKE2b digest instead of the
    built-in ``hash()``. String hashing via ``hash()`` is salted per
    interpreter process, so features built with it do not line up between
    separate runs; the digest used here is stable across processes and
    machines, and ``seed`` selects the hash function.

    Args:
        texts: Sequence of strings. ``None`` or empty entries produce an
            all-zero row.
        n_features: Number of hash buckets (output columns); must be >= 1.
        seed: Keys the hash. The same seed always yields the same
            token-to-bucket mapping; different seeds yield different ones.

    Returns:
        ``np.float32`` array of shape ``(len(texts), n_features)``.

    Raises:
        ValueError: If ``n_features`` is smaller than 1.
    """
    import hashlib  # function-scope import keeps this block self-contained

    if n_features < 1:
        raise ValueError(f"n_features must be >= 1, got {n_features!r}")

    # BLAKE2b accepts keys of at most 64 bytes.
    key = str(seed).encode("utf-8")[:64]

    def bucket(term):
        """Map a token or bigram string to a stable column index."""
        digest = hashlib.blake2b(
            term.encode("utf-8", "surrogatepass"), digest_size=8, key=key
        ).digest()
        return int.from_bytes(digest, "big") % n_features

    feats = np.zeros((len(texts), n_features), dtype=np.float32)
    for i, text in enumerate(texts):
        tokens = (text or "").lower().split()
        row = feats[i]  # view into feats; in-place updates write through
        for tok in tokens:
            row[bucket(tok)] += 1.0
        for left, right in zip(tokens, tokens[1:]):
            row[bucket(left + "_" + right)] += 1.0
        # Epsilon keeps all-zero rows (empty text) from dividing by zero.
        row /= np.linalg.norm(row) + 1e-8
    return feats
| # --------- PIQA loader (tiny subsets) ---------- | |
def load_piqa(subset=800, seed=42):
    """Load a small random sample of PIQA as binary text-classification rows.

    ``subset`` examples are drawn without replacement from the train split
    and ``max(subset // 4, 200)`` from the validation split, each capped at
    the split's size. Every sampled example becomes two rows, one per
    candidate solution: the text is ``goal + " " + solution`` and the target
    is 1 for the solution the example's label points at, 0 otherwise.

    Args:
        subset: Number of train examples to sample.
        seed: Seed for the ``np.random.RandomState`` that picks the indices.

    Returns:
        ``(train_texts, train_targets, val_texts, val_targets)`` where the
        texts are lists of strings and the targets are ``np.int64`` arrays.
    """
    splits = load_dataset("piqa")
    train_rows, val_rows = splits["train"], splits["validation"]

    n_train = min(subset, len(train_rows))
    n_val = min(max(subset // 4, 200), len(val_rows))

    # Draw order matters for reproducibility: train first, then validation.
    sampler = np.random.RandomState(seed)
    train_ids = sampler.choice(len(train_rows), size=n_train, replace=False)
    val_ids = sampler.choice(len(val_rows), size=n_val, replace=False)

    def pack(rows, idxs):
        """Expand each selected example into one row per candidate solution."""
        texts, targets = [], []
        for k in idxs:
            example = rows[k]
            goal = example["goal"] or ""
            for option, field in enumerate(("sol1", "sol2")):
                texts.append(goal + " " + (example[field] or ""))
                targets.append(1 if example["label"] == option else 0)
        return texts, np.array(targets, dtype=np.int64)

    train_texts, train_targets = pack(train_rows, train_ids)
    val_texts, val_targets = pack(val_rows, val_ids)
    return train_texts, train_targets, val_texts, val_targets
| # --------- HellaSwag loader (tiny subsets) ---------- | |
def load_hellaswag(subset=800, seed=42):
    """Load a small random sample of HellaSwag as binary classification rows.

    ``subset`` examples are drawn without replacement from the train split
    and ``max(subset // 4, 200)`` from the validation split, each capped at
    the split's size. Every sampled example is expanded into one row per
    candidate ending: the text is ``context + " " + ending`` and the target
    is 1 for the ending whose position equals the example's label, 0
    otherwise (one-vs-all supervision).

    Args:
        subset: Number of train examples to sample.
        seed: Seed for the ``np.random.RandomState`` that picks the indices.

    Returns:
        ``(Xtr_txt, ytr, Xva_txt, yva)`` where the texts are lists of strings
        and the targets are ``np.int64`` arrays.
    """
    ds = load_dataset("hellaswag")
    tr = ds["train"]
    va = ds["validation"]

    # Draw order matters for reproducibility: train first, then validation.
    rng = np.random.RandomState(seed)
    idx_tr = rng.choice(len(tr), size=min(subset, len(tr)), replace=False)
    idx_va = rng.choice(
        len(va), size=min(max(subset // 4, 200), len(va)), replace=False
    )

    def pack(rows, idxs):
        """Expand each selected example into one row per candidate ending."""
        X_text, y = [], []
        for k in idxs:
            # Plain int index: NumPy integer scalars are not accepted as keys
            # by every row container.
            p = rows[int(k)]
            # NOTE(review): per the HellaSwag dataset card, `ctx` is already
            # the full context (ctx_a + " " + ctx_b). Appending `ctx_a` again
            # duplicated the first sentence and added a spurious bigram
            # across the seam, so use `ctx` alone and fall back to `ctx_a`
            # only when `ctx` is empty. Confirm against the dataset schema.
            ctx = p["ctx"] or p["ctx_a"] or ""
            label = int(p["label"])
            for i, ending in enumerate(p["endings"]):
                X_text.append(ctx + " " + ending)
                y.append(1 if i == label else 0)
        return X_text, np.array(y, dtype=np.int64)

    Xtr_txt, ytr = pack(tr, idx_tr)
    Xva_txt, yva = pack(va, idx_va)
    return Xtr_txt, ytr, Xva_txt, yva