ultron / eval_ultron.py
trojan0x's picture
Add comprehensive eval script (standard LM + security benchmarks + depth extrapolation)
6c27edb verified
#!/usr/bin/env python3
"""
Ultron Comprehensive Evaluation — Standard LM + Security Benchmarks
Evaluates both the general pretrained model and the cybersecurity CPT model
side-by-side on:
1. Standard LM benchmarks: HellaSwag, ARC-Easy, ARC-Challenge, PIQA, WinoGrande, BoolQ
2. Security benchmarks: MMLU computer_security, SecBench English MCQ, CyberMetric
3. Depth extrapolation: Same model at different loop counts (1, 2, 4, 8, 12, 16)
All results are uploaded to the respective HF Hub model repos.
Usage:
# Full eval (both models, all benchmarks)
python eval_ultron.py
# Quick test (50 samples per task)
python eval_ultron.py --limit 50
# Single model only
python eval_ultron.py --models trojan0x/ultron-sec-cpt
# Skip slow parts
python eval_ultron.py --skip_depth --skip_security
# Everything except depth extrapolation, capped at 200 samples per task
python eval_ultron.py --skip_depth --limit 200
"""
import os
import sys
import json
import time
import argparse
import types
import traceback
import torch
import torch.nn as nn
import torch.nn.functional as F
from huggingface_hub import hf_hub_download, snapshot_download, HfApi
from transformers import GPT2TokenizerFast
# ---- Setup Ultron model code ----
def setup_ultron():
    """Fetch the Ultron model sources from the Hub and make them importable.

    Only files matching ``ultron/*.py`` are requested from the
    ``trojan0x/ultron`` repo. The resulting snapshot directory is placed at
    the front of ``sys.path``.

    Returns:
        The local snapshot directory reported by ``snapshot_download``.
    """
    code_root = snapshot_download(
        "trojan0x/ultron",
        allow_patterns=["ultron/*.py"],
    )
    # Index 0 means this directory is searched before every other path entry.
    sys.path.insert(0, code_root)
    print(f"[setup] Ultron code loaded from: {code_root}")
    return code_root
# Runs at import time: setup_ultron() puts the downloaded sources on sys.path,
# which must happen before the `ultron.model` import below can resolve.
ULTRON_PATH = setup_ultron()
from ultron.model import Ultron, UltronConfig
# ===========================================================================
# Model Loading
# ===========================================================================
def load_model(model_id, device="cuda"):
    """Download an Ultron checkpoint from the HF Hub and prepare it for eval.

    Args:
        model_id: Hub repo id. Substrings of it select the checkpoint file.
        device: Device the float32 model is moved to.

    Returns:
        A ``(model, cfg, meta)`` tuple. ``meta`` carries the checkpoint's
        ``step`` and ``tokens_seen`` (``"unknown"`` when absent) plus the
        model's spectral radius under ``rho_A``.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Loading model: {model_id}")
    print(banner)

    # The first entry whose marker occurs in the repo id wins; anything else
    # falls back to the general pretraining checkpoint.
    checkpoint_by_marker = (
        (("sec-cpt", "sec_cpt"), "ultron_sec_cpt_final.pt"),
        (("moe",), "ultron_moe_final.pt"),
    )
    ckpt_name = "ultron_final.pt"
    for markers, filename in checkpoint_by_marker:
        if any(marker in model_id for marker in markers):
            ckpt_name = filename
            break

    local_file = hf_hub_download(model_id, ckpt_name)
    # NOTE(review): weights_only=False runs the full pickle loader, which can
    # execute arbitrary code. Only point this at checkpoint repos you trust.
    ckpt = torch.load(local_file, map_location="cpu", weights_only=False)

    # The checkpoint stores the config as a plain dict of constructor kwargs.
    cfg = UltronConfig(**ckpt["config"])
    model = Ultron(cfg)
    model.load_state_dict(ckpt["model_state_dict"])
    # Evaluate in float32 for numerical stability.
    model = model.float().to(device)
    model.eval()

    meta = {
        "step": ckpt.get("step", "unknown"),
        "tokens_seen": ckpt.get("tokens_seen", "unknown"),
        "rho_A": model.get_spectral_radius(),
    }
    rho = meta["rho_A"]
    stability = "OK" if rho < 1 else "UNSTABLE!"
    effective_depth = (
        cfg.prelude_layers
        + cfg.recurrent_layers * cfg.max_loop_iters
        + cfg.coda_layers
    )

    print(f" Checkpoint: {ckpt_name}")
    print(f" Params: {model.get_num_params(False):,} total, {model.get_num_params(True):,} non-embedding")
    print(f" Trained: {meta['step']} steps, {meta['tokens_seen']} tokens")
    print(f" rho(A): {rho:.6f} {stability}")
    print(f" Config: dim={cfg.dim}, heads={cfg.n_heads}, kv_heads={cfg.n_kv_heads}")
    print(f" Architecture: {cfg.prelude_layers}P + {cfg.recurrent_layers}R x {cfg.max_loop_iters}L + {cfg.coda_layers}C")
    print(f" Effective depth: {effective_depth} layers")
    print(f" max_seq_len={cfg.max_seq_len}, vocab_size={cfg.vocab_size}")
    return model, cfg, meta
# ===========================================================================
# HFLM-Compatible Wrapper
# ===========================================================================
class UltronHFWrapper(nn.Module):
    """Make an Ultron model look like a HuggingFace CausalLM for lm-eval-harness.

    What the wrapper provides:
      1. ``tie_weights()`` - a no-op, because HFLM calls it unconditionally.
      2. Left truncation - inputs longer than ``max_seq_len`` keep only their
         rightmost ``max_seq_len`` tokens.
      3. ``config`` / ``generation_config`` namespaces with the attributes
         HFLM reads (``model_type``, ``n_positions``, ...).
      4. ``device`` - the device of the wrapped model's parameters.
         NOTE(review): lm-eval's HFLM appears to read ``.device`` from a
         pre-initialized model; confirm against the installed version.

    Args:
        model: The Ultron model; called as ``model(input_ids, n_loops=...)``.
        cfg: Config exposing ``max_loop_iters``, ``max_seq_len``,
            ``vocab_size`` and ``dim``.
        n_loops: Loop count passed to every forward call. Falls back to
            ``cfg.max_loop_iters`` when ``None`` (or 0).
    """

    def __init__(self, model, cfg, n_loops=None):
        super().__init__()
        self.model = model
        self.n_loops = n_loops or cfg.max_loop_iters
        self.max_seq_len = cfg.max_seq_len
        # HFLM reads these attributes from the model's config.
        self.config = types.SimpleNamespace(
            model_type="gpt2",
            vocab_size=cfg.vocab_size,
            n_positions=cfg.max_seq_len,
            max_position_embeddings=cfg.max_seq_len,
            n_embd=cfg.dim,
            hidden_size=cfg.dim,
            is_encoder_decoder=False,
            pad_token_id=None,
        )
        self.generation_config = types.SimpleNamespace(
            do_sample=False,
            temperature=1.0,
        )

    @property
    def device(self):
        """Device of the wrapped model's first parameter (CPU if it has none)."""
        try:
            return next(self.model.parameters()).device
        except StopIteration:
            return torch.device("cpu")

    def tie_weights(self):
        """No-op - HFLM calls this unconditionally during init."""
        pass

    def forward(self, input_ids=None, attention_mask=None, **kwargs):
        """Run the wrapped model and return an object with a ``logits`` field.

        ``attention_mask`` and any extra keyword arguments are accepted for
        interface compatibility but are not passed to the Ultron model.

        Raises:
            ValueError: If ``input_ids`` is not supplied.
        """
        if input_ids is None:
            raise ValueError("UltronHFWrapper.forward requires input_ids")
        # Keep the rightmost tokens so the most recent context survives.
        if input_ids.shape[1] > self.max_seq_len:
            input_ids = input_ids[:, -self.max_seq_len:]
        logits = self.model(input_ids, n_loops=self.n_loops)
        return types.SimpleNamespace(logits=logits)

    def parameters(self, recurse=True):
        """Expose the wrapped model's parameters directly."""
        return self.model.parameters(recurse)

    def named_parameters(self, *args, **kwargs):
        """Expose the wrapped model's parameter names without a wrapper prefix."""
        return self.model.named_parameters(*args, **kwargs)

    def to(self, *args, **kwargs):
        """Move the wrapped model and return the wrapper itself."""
        self.model = self.model.to(*args, **kwargs)
        return self

    def eval(self):
        """Switch the wrapped model to eval mode and return the wrapper."""
        self.model.eval()
        return self

    def train(self, mode=True):
        """Set the wrapped model's training mode and return the wrapper."""
        self.model.train(mode)
        return self
# ===========================================================================
# Security Benchmarks: Direct MCQ Evaluation
# ===========================================================================
# Choice letters scored for every multiple-choice question, in label order.
_MCQ_LETTERS = ("A", "B", "C", "D")
_MCQ_LABEL_TO_INDEX = {letter: idx for idx, letter in enumerate(_MCQ_LETTERS)}


def _mean_answer_logprob(model, tokenizer, device, max_seq_len, n_loops,
                         question, letter, answer):
    """Return the mean per-token log-prob of one answer given its question.

    The prompt is ``"Question: {question}\\nAnswer: {letter}. {answer}"``; the
    scored span is its tail ``" {letter}. {answer}"``. The prompt is
    left-truncated to ``max_seq_len`` tokens.

    Returns ``-inf`` when no answer token has a preceding position to be
    predicted from.
    """
    prompt = f"Question: {question}\nAnswer: {letter}. {answer}"
    tokens = tokenizer.encode(prompt, return_tensors="pt").to(device)
    if tokens.shape[1] > max_seq_len:
        tokens = tokens[:, -max_seq_len:]
    with torch.no_grad():
        logits = model(tokens, n_loops=n_loops)

    # The answer span is measured by tokenizing it separately.
    # NOTE(review): assumes the prompt's tail tokenizes identically - confirm
    # for the tokenizer in use.
    n_answer = len(tokenizer.encode(f" {letter}. {answer}"))
    # Each scored token needs the logits of the position before it, so at most
    # seq_len - 1 tokens can be scored (matters after truncation).
    n_answer = min(n_answer, tokens.shape[1] - 1)
    if n_answer <= 0:
        return float("-inf")

    lp_all = F.log_softmax(logits[0, -(n_answer + 1):-1, :], dim=-1)
    answer_ids = tokens[0, -n_answer:]
    total_lp = lp_all.gather(1, answer_ids.unsqueeze(1)).sum().item()
    return total_lp / n_answer


def _predict_choice(model, cfg, tokenizer, device, n_loops, question, choice_texts):
    """Return the index of the highest-scoring choice (first one wins ties)."""
    scores = [
        _mean_answer_logprob(model, tokenizer, device, cfg.max_seq_len,
                             n_loops, question, letter, text)
        for letter, text in zip(_MCQ_LETTERS, choice_texts)
    ]
    return max(range(len(scores)), key=scores.__getitem__)


def eval_secbench(model, cfg, tokenizer, device, n_loops=None, limit=None):
    """Evaluate on SecBench English MCQs (log-likelihood over answer choices).

    Args:
        model: Callable as ``model(tokens, n_loops=...)`` returning logits.
        cfg: Config exposing ``max_loop_iters`` and ``max_seq_len``.
        tokenizer: Tokenizer with an HF-style ``encode`` method.
        device: Device the token tensors are moved to.
        n_loops: Loop count; defaults to ``cfg.max_loop_iters``.
        limit: If truthy, evaluate only the first ``limit`` rows.

    Returns:
        ``{"acc", "correct", "total"}``. Rows whose label is not A-D, or whose
        label points past the available answers, are not counted.
    """
    from datasets import load_dataset

    print("\n[SecBench] Loading dataset...")
    ds = load_dataset("RISys-Lab/Benchmarks_CyberSec_SecBench", "MCQs_English", split="test")
    if limit:
        ds = ds.select(range(min(limit, len(ds))))
    print(f"[SecBench] Evaluating {len(ds)} questions")

    n_loops = n_loops or cfg.max_loop_iters
    model.eval()
    correct = 0
    total = 0
    for i, row in enumerate(ds):
        answers = row["answers"]
        gt_idx = _MCQ_LABEL_TO_INDEX.get(row["label"], -1)
        # Score only the choices that exist instead of indexing past the end.
        n_choices = min(len(answers), len(_MCQ_LETTERS))
        if gt_idx == -1 or gt_idx >= n_choices:
            continue

        choice_texts = [answers[j] for j in range(n_choices)]
        pred = _predict_choice(model, cfg, tokenizer, device, n_loops,
                               row["question"], choice_texts)
        if pred == gt_idx:
            correct += 1
        total += 1
        if (i + 1) % 100 == 0:
            print(f" [{i+1}/{len(ds)}] acc = {correct/total:.4f}")

    acc = correct / max(total, 1)
    print(f"[SecBench] Final: {correct}/{total} = {acc:.4f}")
    return {"acc": acc, "correct": correct, "total": total}


def eval_cybermetric(model, cfg, tokenizer, device, n_loops=None, limit=None):
    """Evaluate on CyberMetric MCQs (nested JSON format).

    Each row is expected to hold a ``questions`` mapping with ``question``,
    ``answers`` (letter -> text) and the ground-truth letter. The letter is
    read from the first of ``correct_answer``, ``answer`` or ``solution`` that
    holds a value.
    NOTE(review): ``solution`` is believed to be the key used by CyberMetric's
    published JSON - confirm against the dataset.

    Args:
        model: Callable as ``model(tokens, n_loops=...)`` returning logits.
        cfg: Config exposing ``max_loop_iters`` and ``max_seq_len``.
        tokenizer: Tokenizer with an HF-style ``encode`` method.
        device: Device the token tensors are moved to.
        n_loops: Loop count; defaults to ``cfg.max_loop_iters``.
        limit: If truthy, evaluate only the first ``limit`` rows.

    Returns:
        ``{"acc", "correct", "total", "skipped"}``. ``skipped`` counts rows
        with no usable A-D ground-truth letter.
    """
    from datasets import load_dataset

    print("\n[CyberMetric] Loading dataset...")
    ds = load_dataset("tihanyin/CyberMetric", split="train")
    if limit:
        ds = ds.select(range(min(limit, len(ds))))
    print(f"[CyberMetric] Evaluating {len(ds)} questions")

    n_loops = n_loops or cfg.max_loop_iters
    model.eval()
    correct = 0
    total = 0
    skipped = 0
    for i, row in enumerate(ds):
        q_data = row["questions"]
        question = q_data.get("question") or ""
        answers_dict = q_data.get("answers") or {}
        gt_letter = next(
            (q_data[key] for key in ("correct_answer", "answer", "solution")
             if q_data.get(key) is not None),
            None,
        )
        if gt_letter is None:
            skipped += 1
            continue
        gt_idx = _MCQ_LABEL_TO_INDEX.get(str(gt_letter).strip().upper(), -1)
        if gt_idx == -1:
            skipped += 1
            continue

        # A choice without text is still scored, using an empty string.
        choice_texts = [answers_dict.get(letter) or "" for letter in _MCQ_LETTERS]
        pred = _predict_choice(model, cfg, tokenizer, device, n_loops,
                               question, choice_texts)
        if pred == gt_idx:
            correct += 1
        total += 1
        if (i + 1) % 500 == 0:
            print(f" [{i+1}/{len(ds)}] acc = {correct/total:.4f} (skipped {skipped})")

    acc = correct / max(total, 1)
    print(f"[CyberMetric] Final: {correct}/{total} = {acc:.4f} (skipped {skipped})")
    return {"acc": acc, "correct": correct, "total": total, "skipped": skipped}
# ===========================================================================
# Standard Evaluation via lm-eval-harness
# ===========================================================================
def evaluate_standard(model, cfg, tokenizer, tasks, device, n_loops=None,
                      limit=None, batch_size=4, num_fewshot=0):
    """Run lm-evaluation-harness benchmarks on an Ultron model.

    Args:
        model: The Ultron model to wrap.
        cfg: Config exposing ``max_seq_len`` and ``max_loop_iters``.
        tokenizer: Tokenizer handed to the harness.
        tasks: List of lm-eval task names.
        device: Device to evaluate on.
        n_loops: Loop count for the wrapper; ``None`` uses the config default.
        limit: Optional per-task sample cap; omitted from the harness call
            when ``None``.
        batch_size: Harness batch size.
        num_fewshot: Number of few-shot examples (0 keeps the previous
            zero-shot behavior).

    Returns:
        The ``"results"`` mapping from ``lm_eval.simple_evaluate``.
    """
    import lm_eval
    from lm_eval.models.huggingface import HFLM

    wrapper = UltronHFWrapper(model, cfg, n_loops=n_loops)
    wrapper = wrapper.to(device).eval()
    lm = HFLM(
        pretrained=wrapper,
        tokenizer=tokenizer,
        max_length=cfg.max_seq_len,
        dtype="float32",
        batch_size=batch_size,
        device=str(device),
        trust_remote_code=False,
    )
    eval_kwargs = {
        "model": lm,
        "tasks": tasks,
        "num_fewshot": num_fewshot,
        "log_samples": False,
    }
    # Only forward a limit when one was requested.
    if limit is not None:
        eval_kwargs["limit"] = limit
    print(f"\n[lm-eval] Tasks: {tasks}, n_loops={n_loops or cfg.max_loop_iters}, limit={limit}, bs={batch_size}")
    results = lm_eval.simple_evaluate(**eval_kwargs)
    return results["results"]
# ===========================================================================
# Depth Extrapolation
# ===========================================================================
def test_depth_extrapolation(model, cfg, tokenizer, device, limit=200,
                             batch_size=4, loop_counts=None, tasks=None):
    """Evaluate the same model at several loop depths.

    Args:
        model: The Ultron model.
        cfg: Config exposing ``max_loop_iters``.
        tokenizer: Tokenizer handed to the harness.
        device: Device to evaluate on.
        limit: Per-task sample cap for every depth.
        batch_size: Harness batch size.
        loop_counts: Depths to test; defaults to 1, 2, 4, 12, 16 plus
            ``cfg.max_loop_iters``.
        tasks: lm-eval task names; defaults to hellaswag, arc_easy, piqa.

    Returns:
        Dict mapping each loop count to its lm-eval results mapping.
    """
    if loop_counts is None:
        loop_counts = [1, 2, 4, cfg.max_loop_iters, 12, 16]
    # De-duplicate so a max_loop_iters already in the list is not run twice.
    loop_counts = sorted(set(loop_counts))
    if tasks is None:
        tasks = ["hellaswag", "arc_easy", "piqa"]

    print(f"\n{'='*60}")
    print("DEPTH EXTRAPOLATION TEST")
    print(f"{'='*60}")
    print(f"Loop counts: {loop_counts}")
    print(f"Tasks: {tasks}, limit={limit}")

    all_results = {}
    for n_loops in loop_counts:
        print(f"\n--- n_loops = {n_loops} ---")
        results = evaluate_standard(
            model, cfg, tokenizer, tasks, device,
            n_loops=n_loops, limit=limit, batch_size=batch_size
        )
        all_results[n_loops] = results
        for task, scores in results.items():
            # Report acc_norm when available, plain acc otherwise.
            for m in ["acc_norm,none", "acc,none"]:
                if m in scores:
                    print(f" {task}: {scores[m]:.4f}")
                    break
    return all_results
# ===========================================================================
# Formatting
# ===========================================================================
def format_results_table(results, label=""):
    """Render per-task scores as a Markdown table.

    Args:
        results: Mapping of task name to a metric -> value mapping.
        label: Optional heading emitted as ``## label`` above the table.

    Returns:
        The table as one newline-joined string. Each task contributes one row
        using ``acc_norm,none`` when present, otherwise ``acc,none``; tasks
        with neither metric are left out.
    """
    preferred = ("acc_norm,none", "acc,none")

    table = [f"\n## {label}\n"] if label else []
    table.append(f"| {'Task':<25} | {'Metric':<15} | {'Score':>8} |")
    table.append("|" + "-" * 27 + "|" + "-" * 17 + "|" + "-" * 10 + "|")

    for task in sorted(results):
        task_scores = results[task]
        chosen = next((m for m in preferred if m in task_scores), None)
        if chosen is None:
            continue
        shown = chosen.replace(",none", "")
        table.append(f"| {task:<25} | {shown:<15} | {task_scores[chosen]:>8.4f} |")
    return "\n".join(table)
def format_depth_table(all_results, tasks):
    """Render depth-extrapolation scores as a Markdown table.

    Args:
        all_results: Mapping of loop count to a task -> metrics mapping.
        tasks: Task names, one column each, in display order.

    Returns:
        A newline-joined table with one row per loop count (sorted). A cell
        shows ``acc_norm,none`` when present, else ``acc,none``, else ``N/A``.
    """
    metric_order = ("acc_norm,none", "acc,none")
    missing = f" {'N/A':<15} |"

    def cell(per_task, task):
        # Missing task and missing metric both render as N/A.
        if task not in per_task:
            return missing
        task_scores = per_task[task]
        chosen = next((m for m in metric_order if m in task_scores), None)
        if chosen is None:
            return missing
        return f" {task_scores[chosen]:<15.4f} |"

    out = ["\n## Depth Extrapolation\n"]
    out.append(f"| {'n_loops':<10} |" + "".join(f" {t:<15} |" for t in tasks))
    out.append("|" + "-" * 12 + "|" + ("-" * 17 + "|") * len(tasks))
    for n_loops in sorted(all_results):
        per_task = all_results[n_loops]
        out.append(f"| {n_loops:<10} |" + "".join(cell(per_task, t) for t in tasks))
    return "\n".join(out)
# ===========================================================================
# Main
# ===========================================================================
def _headline_score(task_scores):
    """Return ``acc_norm,none`` if present, else ``acc,none``, else ``None``."""
    for key in ("acc_norm,none", "acc,none"):
        if key in task_scores:
            return task_scores[key]
    return None


def _print_comparison(title, row_names, model_ids, names, score_of):
    """Print one side-by-side table; Delta is second model minus first."""
    print(f"\n{title:<25} {names[0]:>22} {names[1]:>22} {'Delta':>10}")
    print("-" * 82)
    for row in row_names:
        print(f"{row:<25}", end="")
        scores = [score_of(mid, row) for mid in model_ids]
        for score in scores:
            if score is None:
                print(f" {'N/A':>21}", end="")
            else:
                print(f" {score:>21.4f}", end="")
        if len(scores) >= 2 and all(s is not None for s in scores[:2]):
            d = scores[1] - scores[0]
            print(f" {'+' if d>0 else ''}{d:>9.4f}", end="")
        print()


def _eval_mmlu_security(model, cfg, tokenizer, device, tasks, batch_size, limit):
    """Run the 5-shot MMLU tasks through lm-eval and return its results map.

    Kept in its own function so the wrapper and HFLM objects (which reference
    the model) go out of scope as soon as the phase finishes.
    """
    import lm_eval
    from lm_eval.models.huggingface import HFLM

    wrapper = UltronHFWrapper(model, cfg)
    wrapper = wrapper.to(device).eval()
    lm = HFLM(
        pretrained=wrapper, tokenizer=tokenizer,
        max_length=cfg.max_seq_len, dtype="float32",
        batch_size=batch_size, device=str(device),
    )
    return lm_eval.simple_evaluate(
        model=lm, tasks=tasks, num_fewshot=5,
        log_samples=False, limit=limit,
    )["results"]


def main():
    """Command-line entry point: evaluate each model and report the results.

    For every requested model this runs up to four phases (standard 0-shot
    benchmarks, 5-shot MMLU computer security, SecBench + CyberMetric, depth
    extrapolation). A failing phase is logged and skipped. Afterwards a
    comparison of the first two models is printed, everything is written to
    ``eval_results_full.json`` and, unless ``--no_upload`` is given, the file
    is uploaded to each evaluated model's Hub repo.
    """
    parser = argparse.ArgumentParser(description="Ultron Comprehensive Evaluation")
    parser.add_argument("--models", type=str, nargs="+",
                        default=["trojan0x/ultron-small-baseline", "trojan0x/ultron-sec-cpt"],
                        help="HF model IDs to evaluate")
    parser.add_argument("--limit", type=int, default=None,
                        help="Limit samples per task (None = full eval)")
    parser.add_argument("--batch_size", type=int, default=4,
                        help="Eval batch size (lower if OOM)")
    parser.add_argument("--skip_security", action="store_true",
                        help="Skip SecBench + CyberMetric")
    parser.add_argument("--skip_depth", action="store_true",
                        help="Skip depth extrapolation test")
    parser.add_argument("--upload", action="store_true", default=True,
                        help="Upload results to HF Hub")
    parser.add_argument("--no_upload", action="store_true",
                        help="Disable upload to HF Hub")
    args = parser.parse_args()
    # --upload defaults to True, so --no_upload is the only way to turn it off.
    if args.no_upload:
        args.upload = False

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[main] Device: {device}")
    if device.type == "cuda":
        print(f" GPU: {torch.cuda.get_device_name()}")
        # The device-properties attribute is `total_memory` (bytes).
        mem_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f" VRAM: {mem_gb:.1f} GB")
    else:
        print(" WARNING: Running on CPU — will be very slow!")

    # Tokenizer (GPT-2, shared by all Ultron models)
    tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    standard_tasks = ["hellaswag", "arc_easy", "arc_challenge", "piqa", "winogrande", "boolq"]
    mmlu_tasks = ["mmlu_computer_security"]
    all_model_results = {}

    for model_id in args.models:
        print(f"\n{'#'*70}")
        print(f"# EVALUATING: {model_id}")
        print(f"{'#'*70}")
        try:
            model, cfg, meta = load_model(model_id, device)
        except Exception as e:
            print(f"[ERROR] Failed to load {model_id}: {e}")
            traceback.print_exc()
            continue

        model_results = {"meta": meta, "standard": {}, "security": {}, "depth": {}}

        # ---- Phase 1: Standard LM Benchmarks (0-shot) ----
        print("\n" + "="*60)
        print("PHASE 1: Standard LM Benchmarks (0-shot)")
        print("="*60)
        try:
            std_results = evaluate_standard(
                model, cfg, tokenizer, standard_tasks, device,
                limit=args.limit, batch_size=args.batch_size
            )
            model_results["standard"] = std_results
            print(format_results_table(std_results, f"Standard — {model_id}"))
        except Exception as e:
            print(f"[ERROR] Standard eval failed: {e}")
            traceback.print_exc()

        # ---- Phase 2: MMLU Computer Security (5-shot) ----
        print("\n" + "="*60)
        print("PHASE 2: MMLU Computer Security (5-shot)")
        print("="*60)
        try:
            mmlu_results = _eval_mmlu_security(
                model, cfg, tokenizer, device, mmlu_tasks,
                args.batch_size, args.limit,
            )
            model_results["security"]["mmlu_computer_security"] = mmlu_results
            print(format_results_table(mmlu_results, "MMLU Computer Security"))
        except Exception as e:
            print(f"[ERROR] MMLU eval failed: {e}")
            traceback.print_exc()

        # ---- Phase 3: SecBench + CyberMetric ----
        if not args.skip_security:
            print("\n" + "="*60)
            print("PHASE 3: Security Benchmarks (Direct MCQ)")
            print("="*60)
            try:
                secbench = eval_secbench(model, cfg, tokenizer, device,
                                         limit=args.limit)
                model_results["security"]["secbench_english"] = secbench
            except Exception as e:
                print(f"[ERROR] SecBench failed: {e}")
                traceback.print_exc()
            try:
                # Without an explicit limit, CyberMetric is capped at 2000 rows.
                cm_limit = args.limit if args.limit else 2000
                cybermetric = eval_cybermetric(model, cfg, tokenizer, device,
                                               limit=cm_limit)
                model_results["security"]["cybermetric"] = cybermetric
            except Exception as e:
                print(f"[ERROR] CyberMetric failed: {e}")
                traceback.print_exc()

        # ---- Phase 4: Depth Extrapolation ----
        if not args.skip_depth:
            print("\n" + "="*60)
            print("PHASE 4: Depth Extrapolation")
            print("="*60)
            try:
                depth_limit = args.limit if args.limit else 200
                depth_results = test_depth_extrapolation(
                    model, cfg, tokenizer, device,
                    limit=depth_limit, batch_size=args.batch_size
                )
                # JSON object keys must be strings.
                model_results["depth"] = {str(k): v for k, v in depth_results.items()}
                print(format_depth_table(depth_results, ["hellaswag", "arc_easy", "piqa"]))
            except Exception as e:
                print(f"[ERROR] Depth extrapolation failed: {e}")
                traceback.print_exc()

        all_model_results[model_id] = model_results
        # Drop the model before loading the next one.
        del model
        torch.cuda.empty_cache()

    # ---- Final Comparison ----
    print("\n" + "#"*70)
    print("# FINAL COMPARISON")
    print("#"*70)
    if len(all_model_results) >= 2:
        model_ids = list(all_model_results)
        names = [m.split("/")[-1] for m in model_ids]
        _print_comparison(
            "Task", standard_tasks, model_ids, names,
            lambda mid, task: _headline_score(
                all_model_results[mid].get("standard", {}).get(task, {})
            ),
        )
        _print_comparison(
            "Security Task", ["secbench_english", "cybermetric"], model_ids, names,
            lambda mid, name: (
                all_model_results[mid].get("security", {}).get(name, {}).get("acc")
            ),
        )

    # Save
    results_path = "eval_results_full.json"
    with open(results_path, "w") as f:
        # default=str stringifies anything json cannot encode natively.
        json.dump(all_model_results, f, indent=2, default=str)
    print(f"\n[save] Results saved to {results_path}")

    if args.upload:
        api = HfApi()
        for model_id in all_model_results:
            try:
                api.upload_file(
                    path_or_fileobj=results_path,
                    path_in_repo="eval_results.json",
                    repo_id=model_id,
                )
                print(f"[upload] Results uploaded to {model_id}")
            except Exception as e:
                # Best effort: one failed upload must not block the others.
                print(f"[upload] Failed for {model_id}: {e}")
    print("\n[done] Evaluation complete!")


if __name__ == "__main__":
    main()