| |
| """Feather-specific capability scan for durable checkpoints. |
| |
| This intentionally avoids transformer scale-law claims. It measures this model's own |
| readiness curve from checkpoints: continuation BPB, forced-choice cloze accuracy, |
| factual rank, exact-ish BLEU/ROUGE, and generation hygiene. |
| |
| Non-invasive: reads a local checkpoint or downloads one from the Hub; never touches a |
| running HF Job pod. |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import math |
| import os |
| import re |
| import sys |
| import time |
| from collections import Counter |
| from pathlib import Path |
| from typing import Iterable |
|
|
| import torch |
|
|
# Best effort: switch stdout to line buffering so progress lines reach the log
# as soon as they are printed. If the current stdout object cannot be
# reconfigured, carry on with whatever buffering it already has.
try:
    sys.stdout.reconfigure(line_buffering=True)
except Exception:
    pass

# ROOT is the parent of the directory that contains this file. It is placed
# first on sys.path so the project-local modules imported lazily inside
# load_model() can be resolved.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
|
|
|
|
def _tokenize_words(text: str) -> list[str]:
    """Split *text* into lowercase word and punctuation tokens.

    A token is either a run of ASCII letters, digits, and apostrophes, or a
    single character that is neither a word character nor whitespace.
    """
    lowered = text.lower()
    token_pattern = r"[A-Za-z0-9']+|[^\w\s]"
    return re.findall(token_pattern, lowered)
|
|
|
|
def rouge_l(pred: str, ref: str) -> float:
    """Return the ROUGE-L F1 score between *pred* and *ref*.

    Both strings are tokenized with ``_tokenize_words``. The score is the
    harmonic mean of ``LCS / len(pred tokens)`` and ``LCS / len(ref tokens)``,
    where LCS is the longest-common-subsequence length of the two token lists.
    Returns 0.0 when either side has no tokens or no token is shared.
    """
    hyp_tokens = _tokenize_words(pred)
    ref_tokens = _tokenize_words(ref)
    if not (hyp_tokens and ref_tokens):
        return 0.0

    # Rolling-row LCS: row[j] is the LCS length of the hypothesis tokens seen so
    # far against ref_tokens[:j]; only the previous row is ever needed.
    row = [0] * (len(ref_tokens) + 1)
    for hyp_tok in hyp_tokens:
        nxt = [0]
        for j, ref_tok in enumerate(ref_tokens, start=1):
            if hyp_tok == ref_tok:
                nxt.append(row[j - 1] + 1)
            else:
                nxt.append(max(row[j], nxt[-1]))
        row = nxt

    lcs_len = row[-1]
    precision = lcs_len / len(hyp_tokens)
    recall = lcs_len / len(ref_tokens)
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)
|
|
|
|
def bleu12(pred: str, ref: str) -> float:
    """Return a BLEU-style score over unigrams and bigrams.

    The result is the brevity penalty times the geometric mean of the clipped
    1-gram and 2-gram precisions of *pred* against *ref*. Returns 0.0 when
    either string has no tokens.
    """
    hyp = _tokenize_words(pred)
    gold = _tokenize_words(ref)
    if not hyp or not gold:
        return 0.0

    def ngram_counts(tokens, n):
        # Multiset of all contiguous n-grams; empty when len(tokens) < n.
        return Counter(tuple(tokens[i:i + n]) for i in range(max(0, len(tokens) - n + 1)))

    precisions = []
    for n in (1, 2):
        hyp_counts = ngram_counts(hyp, n)
        gold_counts = ngram_counts(gold, n)
        total = max(1, sum(hyp_counts.values()))
        # Clip each n-gram's credit to the number of times it occurs in the reference.
        clipped = sum(min(count, gold_counts[gram]) for gram, count in hyp_counts.items())
        # The small epsilon keeps a zero-match precision from being exactly 0.
        precisions.append((clipped + 1e-9) / total)

    # Brevity penalty: only predictions no longer than the reference are penalized.
    if len(hyp) > len(gold):
        brevity = 1.0
    else:
        brevity = math.exp(1 - len(gold) / max(1, len(hyp)))
    return brevity * math.sqrt(precisions[0] * precisions[1])
|
|
|
|
# Short passages scored by score_text_bpb() in main(); the mean over these is
# reported as "heldout_bpb_mean".
HELDOUT_TEXTS = [
    (
        "The capital of France is Paris, a city on the Seine known for art, "
        "science, and political history."
    ),
    "Water boils at one hundred degrees Celsius at standard atmospheric pressure.",
    (
        "Photosynthesis allows plants to convert light energy, carbon dioxide, "
        "and water into sugars and oxygen."
    ),
    "William Shakespeare wrote plays including Hamlet, Macbeth, and Romeo and Juliet.",
    (
        "The theory of evolution by natural selection is associated with "
        "Charles Darwin and Alfred Russel Wallace."
    ),
    (
        "In computer science, a hash table stores key value pairs and uses "
        "a hash function to choose a bucket."
    ),
]


# Cloze items as (prompt, candidate continuations, index of the gold candidate).
# Each candidate is scored with continuation_nll() in main(). Every candidate
# starts with a space, and the gold answer is listed first (index 0) in every row.
FORCED_CHOICE = [
    (
        "The capital of France is",
        [" Paris", " London", " Berlin", " Rome"],
        0,
    ),
    (
        "Water boils at",
        [
            " 100 degrees Celsius",
            " 20 degrees Celsius",
            " minus 10 degrees Celsius",
            " 1000 degrees Celsius",
        ],
        0,
    ),
    (
        "Shakespeare wrote",
        [" Hamlet", " The Origin of Species", " The Republic", " War and Peace"],
        0,
    ),
    (
        "The theory of evolution was proposed by",
        [" Charles Darwin", " Isaac Newton", " Albert Einstein", " Marie Curie"],
        0,
    ),
    (
        "Photosynthesis produces",
        [" oxygen", " iron", " salt", " plastic"],
        0,
    ),
    (
        "A triangle has",
        [" three sides", " five sides", " seven sides", " no sides"],
        0,
    ),
]


# Generation probes as (prompt, reference continuation). main() generates from
# each prompt in every sampling mode and compares the continuation with the
# reference using rouge_l() and bleu12().
GEN_PROBES = [
    ("The capital of France is", "Paris."),
    ("Water boils at", "100 degrees Celsius."),
    ("Once upon a time", "there was"),
    ("Photosynthesis is", "the process"),
    ("In computer science, a hash table", "stores key value pairs."),
]
|
|
|
|
def resolve_checkpoint(args: argparse.Namespace) -> Path:
    """Return a local path to the checkpoint selected by the CLI arguments.

    Precedence: ``args.ckpt`` (a local path, user-expanded and resolved) wins.
    Otherwise the file is downloaded from ``args.repo_id`` at
    ``jobs/<job_id>/<ckpt_name>`` when ``args.job_id`` is set, or at
    ``args.repo_path`` when that is set. ``HF_TOKEN`` from the environment is
    passed to the download call.

    Raises:
        SystemExit: if none of the above combinations is provided.
    """
    if args.ckpt:
        return Path(args.ckpt).expanduser().resolve()

    # Work out which file inside the repo to fetch; job id takes priority.
    remote_file = None
    if args.repo_id and args.job_id:
        remote_file = f"jobs/{args.job_id}/{args.ckpt_name}"
    elif args.repo_id and args.repo_path:
        remote_file = args.repo_path
    if remote_file is None:
        raise SystemExit("provide --ckpt or --repo-id with --job-id/--repo-path")

    # Imported lazily so a purely local scan does not need huggingface_hub.
    from huggingface_hub import hf_hub_download

    print(f"[scan] downloading {args.repo_id}/{remote_file}")
    local_file = hf_hub_download(
        args.repo_id,
        remote_file,
        repo_type="model",
        token=os.environ.get("HF_TOKEN"),
    )
    return Path(local_file)
|
|
|
|
def load_model(ckpt_path: Path, device: torch.device):
    """Build the model on *device* and load its weights from *ckpt_path*.

    The model config is taken from the checkpoint's ``"config"`` entry when
    that is a dict; otherwise a default config is built from ``HYDRA_SEQ_LEN``
    (default 2048) and the tokenizer's vocabulary size. The model is created on
    the meta device, materialized empty on *device*, and then filled from the
    checkpoint's ``"model_state_dict"`` entry (or from the checkpoint itself
    when that key is absent) with ``strict=False``.

    Returns:
        ``(model, tokenizer, meta)`` where ``meta`` records the checkpoint
        path, its ``step`` and ``val_bpb`` fields (``None`` when unavailable),
        the number of missing and unexpected state-dict keys, and the config's
        ``__dict__``.
    """
    if os.environ.get("HYDRA_USE_NEMOTRON", "0") == "1":
        import prepare_nemotron as _p_nemo
        _p_nemo.ensure_tokenizer()
        # NOTE(review): the flattened source does not show whether this
        # try/except sat inside or after the HYDRA_USE_NEMOTRON branch; it is
        # kept inside the branch here — confirm against the original layout.
        try:
            import subsystems.sdr_retina as _sdr_retina
            _sdr_retina.build_retina()
        except Exception as e:
            # Best effort: a retina failure is reported but does not abort the scan.
            print(f"[scan] retina build/hydrate warning: {type(e).__name__}: {e}", flush=True)
    from prepare import Tokenizer
    from hydra.config import PostSemClawConfig
    from hydra.model import PostSemClawModel
    from hydra.training import config_from_dict

    tokenizer = Tokenizer.from_directory()
    # SECURITY: weights_only=False unpickles arbitrary Python objects. Only use
    # this on checkpoints from a trusted source (the file may have been
    # downloaded from the Hub by resolve_checkpoint()).
    ckpt = torch.load(str(ckpt_path), map_location="cpu", weights_only=False)
    ckpt_is_dict = isinstance(ckpt, dict)

    cfg_payload = ckpt.get("config") if ckpt_is_dict else None
    if isinstance(cfg_payload, dict):
        config = config_from_dict(cfg_payload)
    else:
        config = PostSemClawConfig(
            sequence_len=int(os.environ.get("HYDRA_SEQ_LEN", "2048")),
            vocab_size=tokenizer.get_vocab_size(),
        )

    # Build on the meta device (no storage), then allocate uninitialized
    # storage on the target device before loading the real weights.
    with torch.device("meta"):
        model = PostSemClawModel(config)
    model.to_empty(device=device)

    # Guarded like every other checkpoint access in this function, so a
    # non-dict checkpoint object is handed to load_state_dict as-is.
    state = ckpt.get("model_state_dict", ckpt) if ckpt_is_dict else ckpt
    missing, unexpected = model.load_state_dict(state, strict=False)
    if missing:
        # to_empty() leaves tensors uninitialized, so anything not supplied by
        # the checkpoint holds arbitrary memory; make that impossible to miss.
        preview = ", ".join(list(missing)[:8]) + (", ..." if len(missing) > 8 else "")
        print(
            f"[scan] WARNING: {len(missing)} state-dict key(s) missing from checkpoint "
            f"remain uninitialized after to_empty(): {preview}",
            flush=True,
        )
    model.eval()
    if hasattr(model, "set_bos_token_id"):
        model.set_bos_token_id(tokenizer.get_bos_token_id())

    meta = {
        "ckpt_path": str(ckpt_path),
        "step": ckpt.get("step") if ckpt_is_dict else None,
        "val_bpb": ckpt.get("val_bpb") if ckpt_is_dict else None,
        "missing": len(missing),
        "unexpected": len(unexpected),
        "config": getattr(config, "__dict__", {}),
    }
    return model, tokenizer, meta
|
|
|
|
def ids_for(tokenizer, text: str) -> list[int]:
    """Encode *text*, falling back to a lone BOS id when encoding yields nothing."""
    encoded = tokenizer.encode(text)
    return encoded if encoded else [tokenizer.get_bos_token_id()]
|
|
|
|
@torch.no_grad()
def score_text_bpb(model, tokenizer, text: str, device: torch.device) -> float:
    """Return the model's loss on *text* in bits per UTF-8 byte.

    The text is encoded once, the model is asked for the per-position loss of
    predicting each token from the ones before it, and the summed loss is
    divided by ``ln(2)`` times the UTF-8 byte length of *text*. Returns NaN
    when the text encodes to fewer than two tokens.
    """
    token_ids = ids_for(tokenizer, text)
    if len(token_ids) < 2:
        return float("nan")

    inputs = torch.tensor([token_ids[:-1]], dtype=torch.long, device=device)
    targets = torch.tensor([token_ids[1:]], dtype=torch.long, device=device)

    # bf16 autocast is only switched on for CUDA devices.
    use_amp = device.type == "cuda"
    with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16, enabled=use_amp):
        # assumes reduction="none" yields one natural-log loss per target token — TODO confirm
        per_token = model(inputs, targets, reduction="none")
        total_loss = per_token.reshape(-1).float().sum().item()

    n_bytes = max(1, len(text.encode("utf-8")))
    return total_loss / (math.log(2) * n_bytes)
|
|
|
|
@torch.no_grad()
def continuation_nll(model, tokenizer, prompt: str, continuation: str, device: torch.device) -> float:
    """Return the mean per-token loss of *continuation* given *prompt*.

    Prompt and continuation are encoded separately and concatenated. Only the
    loss positions whose target is a continuation token are averaged. Returns
    ``inf`` when the combined sequence is shorter than two tokens or no
    continuation position is available.
    """
    prompt_ids = ids_for(tokenizer, prompt)
    cont_ids = ids_for(tokenizer, continuation)
    full = prompt_ids + cont_ids
    if len(full) < 2:
        return float("inf")

    inputs = torch.tensor([full[:-1]], dtype=torch.long, device=device)
    targets = torch.tensor([full[1:]], dtype=torch.long, device=device)
    use_amp = device.type == "cuda"
    with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16, enabled=use_amp):
        losses = model(inputs, targets, reduction="none").reshape(-1).float()

    # losses[i] scores the prediction of full[i + 1], so the first continuation
    # token (at index len(prompt_ids)) is scored at position len(prompt_ids) - 1.
    first = max(0, len(prompt_ids) - 1)
    cont_losses = losses[first:first + len(cont_ids)]
    if cont_losses.numel() == 0:
        return float("inf")
    return float(cont_losses.mean().item())
|
|
|
|
@torch.no_grad()
def _sample_next(logits: torch.Tensor, mode: str, state: dict) -> int:
    """Pick the next token id from *logits* using the given sampling *mode*.

    Modes:
        ``greedy``   – argmax.
        ``top_k``    – sample among the 64 highest logits at temperature 0.8.
        ``top_p``    – at temperature 0.8, sample among the sorted tokens whose
                       cumulative probability stays within 0.92 (the top token
                       is always kept).
        ``mirostat`` – sample among the top ``k`` tokens, where ``k`` is derived
                       from the running ``state["mu"]``; ``mu`` is then nudged
                       by ``eta`` toward the target surprise ``tau``. *state*
                       is read and updated in place.

    Raises:
        ValueError: for an unknown *mode*.
    """
    # assumes logits is a 1-D vector over the vocabulary — TODO confirm
    scores = logits.float().detach().cpu()

    if mode == "greedy":
        return int(scores.argmax().item())

    if mode == "top_k":
        k = min(64, scores.numel())
        top_vals, top_ids = torch.topk(scores / 0.8, k)
        pick = torch.multinomial(torch.softmax(top_vals, dim=-1), 1).item()
        return int(top_ids[pick].item())

    if mode == "top_p":
        probs = torch.softmax(scores / 0.8, dim=-1)
        sorted_p, sorted_ids = torch.sort(probs, descending=True)
        nucleus = torch.cumsum(sorted_p, dim=-1) <= 0.92
        nucleus[0] = True  # never end up with an empty candidate set
        sorted_p, sorted_ids = sorted_p[nucleus], sorted_ids[nucleus]
        sorted_p = sorted_p / sorted_p.sum()
        pick = torch.multinomial(sorted_p, 1).item()
        return int(sorted_ids[pick].item())

    if mode == "mirostat":
        tau = float(state.setdefault("tau", 5.0))
        eta = float(state.setdefault("eta", 0.10))
        mu = float(state.setdefault("mu", 2.0 * tau))
        probs = torch.softmax(scores, dim=-1)
        sorted_p, sorted_ids = torch.sort(probs, descending=True)
        # Candidate count is 2**mu with mu clamped to [1, 8], then kept in [8, 256].
        clamped_mu = max(1.0, min(8.0, mu))
        k = max(8, min(256, int(2 ** clamped_mu)))
        sorted_p, sorted_ids = sorted_p[:k], sorted_ids[:k]
        sorted_p = sorted_p / sorted_p.sum()
        pick = int(torch.multinomial(sorted_p, 1).item())
        chosen_p = max(float(sorted_p[pick].item()), 1e-12)
        surprise = -math.log2(chosen_p)
        state["mu"] = mu - eta * (surprise - tau)
        return int(sorted_ids[pick].item())

    raise ValueError(mode)
|
|
|
|
@torch.no_grad()
def generate_sample(model, tokenizer, prompt: str, device: torch.device, max_new: int, mode: str) -> str:
    """Generate *max_new* tokens after *prompt* and return the decoded text.

    The prompt is encoded, then one token at a time is sampled with
    ``_sample_next`` in the given *mode* from the model's logits at the last
    position. The context fed to the model is limited to the last
    ``model.config.sequence_len`` tokens (falling back to ``HYDRA_SEQ_LEN``,
    default 2048). The returned string is the decoding of prompt plus
    generated tokens.

    Side effect: reseeds the global torch RNG with a value derived from
    ``(prompt, mode)`` so the same probe samples identically on every run.
    """
    # Local import keeps the block self-contained; zlib is standard library.
    import zlib

    ids = ids_for(tokenizer, prompt)
    max_ctx = int(getattr(getattr(model, "config", None), "sequence_len", os.environ.get("HYDRA_SEQ_LEN", "2048")))
    state: dict = {}

    # The built-in hash() of strings is randomized per process, which made the
    # seed (and therefore every sampled generation) differ between runs. CRC32
    # of the encoded key is stable across processes and platforms.
    seed_key = f"{prompt}\x00{mode}".encode("utf-8")
    torch.manual_seed(1234 + zlib.crc32(seed_key) % 100000)

    use_amp = device.type == "cuda"
    for _ in range(max_new):
        ctx = ids[-max_ctx:]
        x = torch.tensor([ctx], dtype=torch.long, device=device)
        with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16, enabled=use_amp):
            logits = model(x)
        # Sample from the logits of the final position of the single batch row.
        ids.append(_sample_next(logits[0, -1], mode, state))
    return tokenizer.decode(ids)
|
|
|
|
def generation_hygiene(text: str) -> dict[str, float]:
    """Compute surface-quality ratios over the last 512 characters of *text*.

    Returns a dict with:
        ``printable``   – share of characters that are printable, newline, or tab.
        ``alpha_space`` – share that are letters, whitespace, or one of
                          ``.,;:'"!?-()``.
        ``repeat4``     – ``1 - unique/total`` over word 4-grams; 0.0 when the
                          tail has fewer than 8 tokens.
    """
    tail = text[-512:]
    n_chars = max(1, len(tail))
    punctuation = ".,;:'\"!?-()"

    printable_hits = 0
    readable_hits = 0
    for ch in tail:
        if ch.isprintable() or ch in "\n\t":
            printable_hits += 1
        if ch.isalpha() or ch.isspace() or ch in punctuation:
            readable_hits += 1

    words = _tokenize_words(tail)
    repeat4 = 0.0
    if len(words) >= 8:
        windows = [tuple(words[i:i + 4]) for i in range(len(words) - 3)]
        repeat4 = 1.0 - len(set(windows)) / max(1, len(windows))

    return {
        "printable": printable_hits / n_chars,
        "alpha_space": readable_hits / n_chars,
        "repeat4": repeat4,
    }
|
|
|
|
def verdict(metrics: dict) -> dict[str, object]:
    """Turn the aggregate metrics into named readiness flags.

    Reads ``heldout_bpb_mean``, ``forced_choice_acc``, ``rouge_l_mean`` and
    ``hygiene_mean`` up front; ``repeat4_mean`` and ``bleu12_mean`` are looked
    up only when the check before them in the same flag already passed.
    """
    heldout_bpb = metrics["heldout_bpb_mean"]
    cloze_acc = metrics["forced_choice_acc"]
    rouge_score = metrics["rouge_l_mean"]
    hygiene_score = metrics["hygiene_mean"]

    flags: dict[str, object] = {}
    flags["english_substrate"] = heldout_bpb <= 1.35 and hygiene_score >= 0.80
    flags["readable_generation"] = hygiene_score >= 0.88 and metrics["repeat4_mean"] <= 0.35
    flags["factual_cloze_emerging"] = cloze_acc >= 0.50
    flags["bleu_rouge_emerging"] = rouge_score >= 0.20 and metrics["bleu12_mean"] >= 0.08
    flags["recall_ready"] = cloze_acc >= 0.66 and rouge_score >= 0.30 and heldout_bpb <= 1.15
    return flags
|
|
|
|
def _pick_lowest_finite(scores: list[float]) -> int:
    """Return the index of the unique lowest finite score, or -1 if there is none.

    NaN and infinite scores are never selectable, and an exact tie for the
    lowest score counts as "no prediction".
    """
    finite = [(score, idx) for idx, score in enumerate(scores) if math.isfinite(score)]
    if not finite:
        return -1
    best_score, best_idx = min(finite)
    if sum(1 for score, _ in finite if score == best_score) > 1:
        return -1
    return best_idx


def main() -> int:
    """Run the capability scan end to end.

    Parses the CLI arguments, loads the checkpoint, scores the held-out texts,
    runs the forced-choice and generation probes, derives the verdict, prints
    a machine-readable ``[CAPABILITY_SCAN_JSON]`` line followed by a human
    summary, and writes the metrics to ``--json-out`` when given.

    Returns:
        0 on completion.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--ckpt")
    ap.add_argument("--repo-id", default=os.environ.get("HF_REPO_ID", "GAInTech/feather-pretrain-checkpoints"))
    ap.add_argument("--job-id")
    ap.add_argument("--repo-path")
    ap.add_argument("--ckpt-name", default="latest.pt")
    ap.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
    ap.add_argument("--max-new", type=int, default=32)
    ap.add_argument("--json-out")
    args = ap.parse_args()

    t0 = time.time()
    # Fall back to CPU when "cuda" was requested but is not available.
    device = torch.device(args.device if args.device != "cuda" or torch.cuda.is_available() else "cpu")
    ckpt_path = resolve_checkpoint(args)
    print(f"[scan] checkpoint={ckpt_path} device={device}")
    model, tokenizer, meta = load_model(ckpt_path, device)
    print(f"[scan] loaded step={meta['step']} missing={meta['missing']} unexpected={meta['unexpected']}")

    heldout = [score_text_bpb(model, tokenizer, t, device) for t in HELDOUT_TEXTS]

    forced_rows = []
    for prompt, opts, gold in FORCED_CHOICE:
        scores = [continuation_nll(model, tokenizer, prompt, opt, device) for opt in opts]
        # A plain min() keeps the first index when scores are NaN or tied, and
        # the gold answer is always index 0, so a diverged checkpoint used to
        # score 100%. Only a unique, finite lowest score counts as a prediction.
        pred = _pick_lowest_finite(scores)
        forced_rows.append({"prompt": prompt, "pred": pred, "gold": gold, "ok": pred == gold, "scores": scores, "options": opts})

    gen_rows = []
    for mode in ("greedy", "top_k", "top_p", "mirostat"):
        for prompt, ref in GEN_PROBES:
            out = generate_sample(model, tokenizer, prompt, device, args.max_new, mode)
            # Compare only the generated part when the decoded text starts with the prompt.
            cont = out[len(prompt):] if out.startswith(prompt) else out
            h = generation_hygiene(out)
            gen_rows.append({"mode": mode, "prompt": prompt, "reference": ref, "output": out, "continuation": cont, "rouge_l": rouge_l(cont, ref), "bleu12": bleu12(cont, ref), **h})

    mode_stats = {}
    for mode in sorted({r["mode"] for r in gen_rows}):
        rows = [r for r in gen_rows if r["mode"] == mode]
        mode_stats[mode] = {
            "rouge_l_mean": sum(r["rouge_l"] for r in rows) / len(rows),
            "bleu12_mean": sum(r["bleu12"] for r in rows) / len(rows),
            # "hygiene" is the alpha_space ratio from generation_hygiene().
            "hygiene_mean": sum(r["alpha_space"] for r in rows) / len(rows),
            "repeat4_mean": sum(r["repeat4"] for r in rows) / len(rows),
        }
    # The headline generation metrics come from the mode with the best overlap
    # scores, lightly penalized for repetition.
    best_mode = max(
        mode_stats,
        key=lambda m: (mode_stats[m]["rouge_l_mean"] + mode_stats[m]["bleu12_mean"] - 0.25 * mode_stats[m]["repeat4_mean"]),
    )
    metrics = {
        "meta": {k: v for k, v in meta.items() if k != "config"},
        "heldout_bpb": heldout,
        "heldout_bpb_mean": float(sum(heldout) / len(heldout)),
        "forced_choice": forced_rows,
        "forced_choice_acc": sum(r["ok"] for r in forced_rows) / len(forced_rows),
        "generations": gen_rows,
        "mode_stats": mode_stats,
        "best_generation_mode": best_mode,
        "rouge_l_mean": mode_stats[best_mode]["rouge_l_mean"],
        "bleu12_mean": mode_stats[best_mode]["bleu12_mean"],
        "hygiene_mean": mode_stats[best_mode]["hygiene_mean"],
        "repeat4_mean": mode_stats[best_mode]["repeat4_mean"],
        "seconds": round(time.time() - t0, 3),
    }
    metrics["verdict"] = verdict(metrics)

    print("[CAPABILITY_SCAN_JSON] " + json.dumps(metrics, sort_keys=True))
    print("\n=== SUMMARY ===")
    print(
        f"step={meta['step']} heldout_bpb={metrics['heldout_bpb_mean']:.4f} "
        f"forced_choice={metrics['forced_choice_acc']:.3f} "
        f"best_mode={metrics['best_generation_mode']} "
        f"rougeL={metrics['rouge_l_mean']:.3f} bleu12={metrics['bleu12_mean']:.3f} "
        f"hygiene={metrics['hygiene_mean']:.3f} repeat4={metrics['repeat4_mean']:.3f}"
    )
    print("mode_stats=" + json.dumps(metrics["mode_stats"], sort_keys=True))
    print("verdict=" + json.dumps(metrics["verdict"], sort_keys=True))
    print("\n=== GENERATIONS ===")
    for r in gen_rows:
        safe = r["output"].replace("\n", "\\n")
        print(f"PROMPT [{r['mode']}] {r['prompt']!r} -> {safe!r}")

    if args.json_out:
        Path(args.json_out).write_text(json.dumps(metrics, indent=2, sort_keys=True), encoding="utf-8")
    return 0
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|
|