| |
| """ |
| Ultron Comprehensive Evaluation — Standard LM + Security Benchmarks |
| |
| Evaluates both the general pretrained model and the cybersecurity CPT model |
| side-by-side on: |
| 1. Standard LM benchmarks: HellaSwag, ARC-Easy, ARC-Challenge, PIQA, WinoGrande, BoolQ |
| 2. Security benchmarks: MMLU computer_security, SecBench English MCQ, CyberMetric |
| 3. Depth extrapolation: Same model at different loop counts (1, 2, 4, 8, 12, 16) |
| |
| All results are uploaded to the respective HF Hub model repos. |
| |
| Usage: |
| # Full eval (both models, all benchmarks) |
| python eval_ultron.py |
| |
| # Quick test (50 samples per task) |
| python eval_ultron.py --limit 50 |
| |
| # Single model only |
| python eval_ultron.py --models trojan0x/ultron-sec-cpt |
| |
| # Skip slow parts |
| python eval_ultron.py --skip_depth --skip_security |
| |
| # Standard + security benchmarks without the depth sweep (200 samples per task) |
| python eval_ultron.py --skip_depth --limit 200 |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import argparse |
| import types |
| import traceback |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from huggingface_hub import hf_hub_download, snapshot_download, HfApi |
| from transformers import GPT2TokenizerFast |
|
|
|
|
| |
def setup_ultron():
    """Fetch the Ultron model sources from the Hub and make them importable.

    Only the ``ultron/*.py`` files of the ``trojan0x/ultron`` repository are
    requested. The local snapshot directory is put at the front of
    ``sys.path`` and returned.
    """
    snapshot_dir = snapshot_download(
        "trojan0x/ultron",
        allow_patterns=["ultron/*.py"],
    )
    sys.path.insert(0, snapshot_dir)
    print(f"[setup] Ultron code loaded from: {snapshot_dir}")
    return snapshot_dir


# The import below resolves through the sys.path entry added by
# setup_ultron(), so the call has to run before it.
ULTRON_PATH = setup_ultron()
from ultron.model import Ultron, UltronConfig
|
|
|
|
| |
| |
| |
|
|
def load_model(model_id, device="cuda"):
    """Load a trained Ultron checkpoint from the HF Hub and prepare it for eval.

    Args:
        model_id: Hub repository id. The checkpoint file name is picked from
            substrings of this id: "sec-cpt"/"sec_cpt", then "moe", otherwise
            the base checkpoint.
        device: Device the float32 model is moved to.

    Returns:
        A ``(model, cfg, meta)`` tuple. ``meta`` holds the checkpoint's
        ``step`` and ``tokens_seen`` (``"unknown"`` when absent) and
        ``rho_A``, the result of ``model.get_spectral_radius()`` as a float.
    """
    print(f"\n{'='*60}")
    print(f"Loading model: {model_id}")
    print(f"{'='*60}")

    if "sec-cpt" in model_id or "sec_cpt" in model_id:
        ckpt_name = "ultron_sec_cpt_final.pt"
    elif "moe" in model_id:
        ckpt_name = "ultron_moe_final.pt"
    else:
        ckpt_name = "ultron_final.pt"

    ckpt_path = hf_hub_download(model_id, ckpt_name)
    # NOTE(security): weights_only=False runs the full pickle machinery on a
    # downloaded file, which can execute arbitrary code. Only point this at
    # repositories you trust. Left as-is because the checkpoint also carries
    # a "config" entry whose contents are not visible from here.
    ckpt = torch.load(ckpt_path, map_location="cpu", weights_only=False)

    cfg = UltronConfig(**ckpt["config"])

    model = Ultron(cfg)
    model.load_state_dict(ckpt["model_state_dict"])
    # Evaluate in float32 regardless of the dtype the weights were saved in.
    model = model.float().to(device)
    model.eval()

    step = ckpt.get("step", "unknown")
    tokens = ckpt.get("tokens_seen", "unknown")
    # Coerce to a plain float so the metadata serialises as a JSON number
    # even if the model hands back a 0-dim tensor.
    rho = float(model.get_spectral_radius())

    print(f" Checkpoint: {ckpt_name}")
    print(f" Params: {model.get_num_params(False):,} total, {model.get_num_params(True):,} non-embedding")
    print(f" Trained: {step} steps, {tokens} tokens")
    print(f" rho(A): {rho:.6f} {'OK' if rho < 1 else 'UNSTABLE!'}")
    print(f" Config: dim={cfg.dim}, heads={cfg.n_heads}, kv_heads={cfg.n_kv_heads}")
    print(f" Architecture: {cfg.prelude_layers}P + {cfg.recurrent_layers}R x {cfg.max_loop_iters}L + {cfg.coda_layers}C")
    print(f" Effective depth: {cfg.prelude_layers + cfg.recurrent_layers * cfg.max_loop_iters + cfg.coda_layers} layers")
    print(f" max_seq_len={cfg.max_seq_len}, vocab_size={cfg.vocab_size}")

    return model, cfg, {"step": step, "tokens_seen": tokens, "rho_A": rho}
|
|
|
|
| |
| |
| |
|
|
class UltronHFWrapper(nn.Module):
    """Adapter that lets lm-eval's ``HFLM`` drive an Ultron model.

    What the adapter provides:
      1. ``tie_weights()``: a no-op, because HFLM calls it during init.
      2. Left truncation: inputs longer than ``max_seq_len`` keep only their
         last ``max_seq_len`` tokens.
      3. ``config`` / ``generation_config``: namespaces with the attributes
         HFLM reads (model_type, n_positions, ...).
      4. ``device``: the device of the wrapped model's parameters, as a
         HuggingFace model would expose it.

    ``parameters``, ``named_parameters``, ``to``, ``eval`` and ``train`` all
    delegate to the wrapped model.
    """

    def __init__(self, model, cfg, n_loops=None):
        """Wrap ``model``; ``n_loops`` falls back to ``cfg.max_loop_iters``."""
        super().__init__()
        self.model = model
        self.n_loops = n_loops or cfg.max_loop_iters
        self.max_seq_len = cfg.max_seq_len

        self.config = types.SimpleNamespace(
            model_type="gpt2",
            vocab_size=cfg.vocab_size,
            n_positions=cfg.max_seq_len,
            max_position_embeddings=cfg.max_seq_len,
            n_embd=cfg.dim,
            hidden_size=cfg.dim,
            is_encoder_decoder=False,
            pad_token_id=None,
        )
        self.generation_config = types.SimpleNamespace(
            do_sample=False,
            temperature=1.0,
        )

    @property
    def device(self):
        """Device of the wrapped model's first parameter (CPU if it has none)."""
        first = next(self.model.parameters(), None)
        return first.device if first is not None else torch.device("cpu")

    def tie_weights(self):
        """No-op — HFLM calls this unconditionally during init."""
        pass

    def forward(self, input_ids=None, attention_mask=None, **kwargs):
        """Run the wrapped model and return an object with a ``logits`` field.

        ``attention_mask`` and any extra keyword arguments are accepted for
        signature compatibility only; the wrapped model is called with the
        token ids and the loop count, nothing else.
        """
        # Keep the most recent tokens when the input exceeds the context size.
        if input_ids.shape[1] > self.max_seq_len:
            input_ids = input_ids[:, -self.max_seq_len:]

        logits = self.model(input_ids, n_loops=self.n_loops)
        return types.SimpleNamespace(logits=logits)

    def parameters(self, recurse=True):
        """Parameters of the wrapped model (same signature as ``nn.Module``)."""
        return self.model.parameters(recurse)

    def named_parameters(self, *args, **kwargs):
        """Named parameters of the wrapped model."""
        return self.model.named_parameters(*args, **kwargs)

    def to(self, *args, **kwargs):
        """Move/cast the wrapped model and return the wrapper itself."""
        self.model = self.model.to(*args, **kwargs)
        return self

    def eval(self):
        """Put the wrapped model in eval mode and return the wrapper."""
        self.model.eval()
        return self

    def train(self, mode=True):
        """Set the wrapped model's training mode and return the wrapper."""
        self.model.train(mode)
        return self
|
|
|
|
| |
| |
| |
|
|
def mcq_answer_logprob(model, tokenizer, prompt, answer_text, device, max_seq_len, n_loops):
    """Mean per-token log-probability of the answer that ends ``prompt``.

    ``prompt`` is tokenized as a whole and left-truncated to ``max_seq_len``.
    The number of answer tokens is taken from tokenizing ``answer_text`` on
    its own, and the last that-many tokens of the prompt are scored against
    the model's next-token distribution at the preceding positions.

    Args:
        model: Callable as ``model(tokens, n_loops=...)`` returning logits
            indexed as ``[batch, position, vocab]``.
        tokenizer: Object with ``encode(text)`` and
            ``encode(text, return_tensors="pt")``.
        prompt: Full text, ending with the answer.
        answer_text: The answer portion, used only to count its tokens.
        device: Device the token tensor is moved to.
        max_seq_len: Maximum number of tokens fed to the model.
        n_loops: Loop count forwarded to the model.

    Returns:
        The average log-probability as a float, or ``-inf`` when there is no
        answer token that has a preceding position to predict it from.
    """
    tokens = tokenizer.encode(prompt, return_tensors="pt").to(device)
    if tokens.shape[1] > max_seq_len:
        tokens = tokens[:, -max_seq_len:]

    # After truncation the answer may be longer than what is left; cap it so
    # every scored token still has a predecessor position in the input.
    n_answer = min(len(tokenizer.encode(answer_text)), tokens.shape[1] - 1)
    if n_answer <= 0:
        return float("-inf")

    with torch.no_grad():
        logits = model(tokens, n_loops=n_loops)

    # Position p predicts token p + 1, hence the one-step shift.
    log_probs = F.log_softmax(logits[0, -(n_answer + 1):-1, :], dim=-1)
    targets = tokens[0, -n_answer:].unsqueeze(1)
    # One gather instead of a Python loop of .item() calls; summed in float64.
    total = log_probs.gather(1, targets).double().sum().item()
    return total / n_answer


def eval_secbench(model, cfg, tokenizer, device, n_loops=None, limit=None):
    """Evaluate on SecBench English MCQs (log-likelihood over answer choices).

    Each of the four choices is appended to the question and scored with
    ``mcq_answer_logprob``; the highest-scoring choice is the prediction.
    Rows whose label is not one of A-D are ignored.

    Args:
        model: Ultron model, called as ``model(tokens, n_loops=...)``.
        cfg: Config providing ``max_loop_iters`` and ``max_seq_len``.
        tokenizer: Tokenizer used to encode the prompts.
        device: Device for the token tensors.
        n_loops: Loop count; defaults to ``cfg.max_loop_iters``.
        limit: If truthy, evaluate only the first ``limit`` rows.

    Returns:
        Dict with ``acc``, ``correct`` and ``total``.
    """
    from datasets import load_dataset

    print("\n[SecBench] Loading dataset...")
    ds = load_dataset("RISys-Lab/Benchmarks_CyberSec_SecBench", "MCQs_English", split="test")
    if limit:
        ds = ds.select(range(min(limit, len(ds))))
    print(f"[SecBench] Evaluating {len(ds)} questions")

    n_loops = n_loops or cfg.max_loop_iters
    model.eval()
    correct = 0
    total = 0
    choices = ["A", "B", "C", "D"]
    label_map = {"A": 0, "B": 1, "C": 2, "D": 3}

    for i, row in enumerate(ds):
        question = row["question"]
        answers = row["answers"]
        gt_idx = label_map.get(row["label"], -1)
        if gt_idx == -1:
            continue

        log_probs = [
            mcq_answer_logprob(
                model, tokenizer,
                f"Question: {question}\nAnswer: {ch}. {answers[j]}",
                f" {ch}. {answers[j]}",
                device, cfg.max_seq_len, n_loops,
            )
            for j, ch in enumerate(choices)
        ]

        pred = max(range(len(log_probs)), key=log_probs.__getitem__)
        if pred == gt_idx:
            correct += 1
        total += 1

        if (i + 1) % 100 == 0:
            print(f" [{i+1}/{len(ds)}] acc = {correct / max(total, 1):.4f}")

    acc = correct / max(total, 1)
    print(f"[SecBench] Final: {correct}/{total} = {acc:.4f}")
    return {"acc": acc, "correct": correct, "total": total}


def eval_cybermetric(model, cfg, tokenizer, device, n_loops=None, limit=None):
    """Evaluate on CyberMetric MCQs (nested JSON format).

    Each row's ``questions`` entry is read for ``question``, ``answers``
    (keyed by letter) and ``correct_answer`` (falling back to ``answer``).
    Rows without a usable A-D ground truth are counted as skipped. Scoring
    works as in ``eval_secbench``.

    Args:
        model: Ultron model, called as ``model(tokens, n_loops=...)``.
        cfg: Config providing ``max_loop_iters`` and ``max_seq_len``.
        tokenizer: Tokenizer used to encode the prompts.
        device: Device for the token tensors.
        n_loops: Loop count; defaults to ``cfg.max_loop_iters``.
        limit: If truthy, evaluate only the first ``limit`` rows.

    Returns:
        Dict with ``acc``, ``correct``, ``total`` and ``skipped``.
    """
    from datasets import load_dataset

    print("\n[CyberMetric] Loading dataset...")
    ds = load_dataset("tihanyin/CyberMetric", split="train")
    if limit:
        ds = ds.select(range(min(limit, len(ds))))
    print(f"[CyberMetric] Evaluating {len(ds)} questions")

    n_loops = n_loops or cfg.max_loop_iters
    model.eval()
    correct = 0
    total = 0
    skipped = 0
    choices = ["A", "B", "C", "D"]
    label_map = {"A": 0, "B": 1, "C": 2, "D": 3}

    for i, row in enumerate(ds):
        q_data = row["questions"]
        question = q_data.get("question", "")
        answers_dict = q_data.get("answers", {})
        gt_letter = q_data.get("correct_answer", q_data.get("answer", None))
        if gt_letter is None:
            skipped += 1
            continue
        gt_idx = label_map.get(str(gt_letter).strip().upper(), -1)
        if gt_idx == -1:
            skipped += 1
            continue

        log_probs = []
        for ch in choices:
            ans_text = answers_dict.get(ch, "")
            log_probs.append(
                mcq_answer_logprob(
                    model, tokenizer,
                    f"Question: {question}\nAnswer: {ch}. {ans_text}",
                    f" {ch}. {ans_text}",
                    device, cfg.max_seq_len, n_loops,
                )
            )

        pred = max(range(len(log_probs)), key=log_probs.__getitem__)
        if pred == gt_idx:
            correct += 1
        total += 1

        if (i + 1) % 500 == 0:
            print(f" [{i+1}/{len(ds)}] acc = {correct / max(total, 1):.4f} (skipped {skipped})")

    acc = correct / max(total, 1)
    print(f"[CyberMetric] Final: {correct}/{total} = {acc:.4f} (skipped {skipped})")
    return {"acc": acc, "correct": correct, "total": total, "skipped": skipped}
|
|
|
|
| |
| |
| |
|
|
def evaluate_standard(model, cfg, tokenizer, tasks, device, n_loops=None, limit=None,
                      batch_size=4, num_fewshot=0):
    """Run lm-evaluation-harness tasks against an Ultron model.

    The model is wrapped in ``UltronHFWrapper`` and handed to ``HFLM`` as a
    pre-built model, then scored with ``lm_eval.simple_evaluate``.

    Args:
        model: Ultron model to evaluate.
        cfg: Config providing ``max_seq_len`` and ``max_loop_iters``.
        tokenizer: Tokenizer passed to ``HFLM``.
        tasks: List of lm-eval task names.
        device: Device the wrapper is moved to.
        n_loops: Loop count for the wrapper; ``None`` uses the config default.
        limit: Per-task sample limit; ``None`` leaves it unset.
        batch_size: Evaluation batch size.
        num_fewshot: Number of few-shot examples (default 0, as before).

    Returns:
        The ``"results"`` entry of ``simple_evaluate``'s return value.
    """
    import lm_eval
    from lm_eval.models.huggingface import HFLM

    wrapper = UltronHFWrapper(model, cfg, n_loops=n_loops)
    wrapper = wrapper.to(device).eval()

    lm = HFLM(
        pretrained=wrapper,
        tokenizer=tokenizer,
        max_length=cfg.max_seq_len,
        dtype="float32",
        batch_size=batch_size,
        device=str(device),
        trust_remote_code=False,
    )

    eval_kwargs = {
        "model": lm,
        "tasks": tasks,
        "num_fewshot": num_fewshot,
        "log_samples": False,
    }
    # Only forward a limit when one was requested.
    if limit is not None:
        eval_kwargs["limit"] = limit

    print(f"\n[lm-eval] Tasks: {tasks}, n_loops={n_loops or cfg.max_loop_iters}, limit={limit}, bs={batch_size}")
    results = lm_eval.simple_evaluate(**eval_kwargs)
    return results["results"]
|
|
|
|
| |
| |
| |
|
|
def test_depth_extrapolation(model, cfg, tokenizer, device, limit=200, batch_size=4):
    """Test the same model at different loop depths — Ultron's key feature.

    Runs hellaswag, arc_easy and piqa through ``evaluate_standard`` once per
    loop count. The loop counts are 1, 2, 4, 12, 16 plus
    ``cfg.max_loop_iters``, de-duplicated and sorted so a configured depth
    that matches one of the fixed values is not evaluated twice.

    Args:
        model: Ultron model to evaluate.
        cfg: Config providing ``max_loop_iters``.
        tokenizer: Tokenizer forwarded to ``evaluate_standard``.
        device: Evaluation device.
        limit: Per-task sample limit.
        batch_size: Evaluation batch size.

    Returns:
        Dict mapping each loop count to that run's per-task results.
    """
    loop_counts = sorted({1, 2, 4, cfg.max_loop_iters, 12, 16})
    tasks = ["hellaswag", "arc_easy", "piqa"]

    print(f"\n{'='*60}")
    print("DEPTH EXTRAPOLATION TEST")
    print(f"{'='*60}")
    print(f"Loop counts: {loop_counts}")
    print(f"Tasks: {tasks}, limit={limit}")

    all_results = {}
    for n_loops in loop_counts:
        print(f"\n--- n_loops = {n_loops} ---")
        results = evaluate_standard(
            model, cfg, tokenizer, tasks, device,
            n_loops=n_loops, limit=limit, batch_size=batch_size
        )
        all_results[n_loops] = results
        for task, scores in results.items():
            # Report normalised accuracy when the task has it, else plain.
            for m in ["acc_norm,none", "acc,none"]:
                if m in scores:
                    print(f" {task}: {scores[m]:.4f}")
                    break
    return all_results
|
|
|
|
| |
| |
| |
|
|
def format_results_table(results, label=""):
    """Render per-task scores as a Markdown table.

    ``results`` maps task name to a score dict. Tasks are listed in sorted
    order, each with the first available metric of ``acc_norm,none`` then
    ``acc,none``; tasks with neither are left out. A non-empty ``label``
    becomes a ``##`` heading above the table.
    """
    preferred = ("acc_norm,none", "acc,none")

    table = [f"\n## {label}\n"] if label else []
    table.append(f"| {'Task':<25} | {'Metric':<15} | {'Score':>8} |")
    table.append(f"|{'-'*27}|{'-'*17}|{'-'*10}|")

    for task in sorted(results):
        scores = results[task]
        chosen = next((name for name in preferred if name in scores), None)
        if chosen is None:
            continue
        shown = chosen.replace(',none', '')
        table.append(f"| {task:<25} | {shown:<15} | {scores[chosen]:>8.4f} |")

    return "\n".join(table)
|
|
|
|
def format_depth_table(all_results, tasks):
    """Render the depth sweep as a Markdown table.

    One row per loop count (sorted) and one column per entry of ``tasks``.
    A cell shows the first available metric of ``acc_norm,none`` then
    ``acc,none``, or ``N/A`` when the task or both metrics are missing.
    """
    metric_order = ("acc_norm,none", "acc,none")

    def cell(per_task, task):
        # Format one table cell for `task` from a single run's results.
        if task in per_task:
            scores = per_task[task]
            for key in metric_order:
                if key in scores:
                    return f" {scores[key]:<15.4f} |"
        return f" {'N/A':<15} |"

    out = ["\n## Depth Extrapolation\n"]
    out.append(f"| {'n_loops':<10} |" + "".join(f" {t:<15} |" for t in tasks))
    out.append("|" + "-" * 12 + "|" + ("-" * 17 + "|") * len(tasks))

    for n_loops in sorted(all_results):
        per_task = all_results[n_loops]
        out.append(f"| {n_loops:<10} |" + "".join(cell(per_task, t) for t in tasks))

    return "\n".join(out)
|
|
|
|
| |
| |
| |
|
|
def main():
    """Evaluate each requested model and report, save and upload the results.

    Per model, in order: standard lm-eval tasks (0-shot), MMLU computer
    security (5-shot), the SecBench and CyberMetric MCQ benchmarks (unless
    ``--skip_security``) and the depth sweep (unless ``--skip_depth``). A
    failing phase is logged with its traceback and the run carries on with
    the next one. When at least two models were evaluated, the first two are
    compared side by side. All results are written to
    ``eval_results_full.json`` and, unless ``--no_upload`` is given, uploaded
    to every evaluated model's Hub repo as ``eval_results.json``.
    """
    parser = argparse.ArgumentParser(description="Ultron Comprehensive Evaluation")
    parser.add_argument("--models", type=str, nargs="+",
                        default=["trojan0x/ultron-small-baseline", "trojan0x/ultron-sec-cpt"],
                        help="HF model IDs to evaluate")
    parser.add_argument("--limit", type=int, default=None,
                        help="Limit samples per task (None = full eval)")
    parser.add_argument("--batch_size", type=int, default=4,
                        help="Eval batch size (lower if OOM)")
    parser.add_argument("--skip_security", action="store_true",
                        help="Skip SecBench + CyberMetric")
    parser.add_argument("--skip_depth", action="store_true",
                        help="Skip depth extrapolation test")
    parser.add_argument("--upload", action="store_true", default=True,
                        help="Upload results to HF Hub")
    parser.add_argument("--no_upload", action="store_true",
                        help="Disable upload to HF Hub")
    args = parser.parse_args()

    # --upload defaults to True, so --no_upload is the only way to turn it off.
    if args.no_upload:
        args.upload = False

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[main] Device: {device}")
    if device.type == "cuda":
        print(f" GPU: {torch.cuda.get_device_name()}")
        # The device-properties attribute is `total_memory` (bytes).
        mem_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f" VRAM: {mem_gb:.1f} GB")
    else:
        print(" WARNING: Running on CPU — will be very slow!")

    # GPT-2 has no pad token; reuse EOS and pad on the left.
    tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    standard_tasks = ["hellaswag", "arc_easy", "arc_challenge", "piqa", "winogrande", "boolq"]
    mmlu_tasks = ["mmlu_computer_security"]

    all_model_results = {}

    for model_id in args.models:
        print(f"\n{'#'*70}")
        print(f"# EVALUATING: {model_id}")
        print(f"{'#'*70}")

        try:
            model, cfg, meta = load_model(model_id, device)
        except Exception as e:
            print(f"[ERROR] Failed to load {model_id}: {e}")
            traceback.print_exc()
            continue

        model_results = {"meta": meta, "standard": {}, "security": {}, "depth": {}}

        # Phase 1: standard benchmarks.
        print("\n" + "="*60)
        print("PHASE 1: Standard LM Benchmarks (0-shot)")
        print("="*60)
        try:
            std_results = evaluate_standard(
                model, cfg, tokenizer, standard_tasks, device,
                limit=args.limit, batch_size=args.batch_size
            )
            model_results["standard"] = std_results
            print(format_results_table(std_results, f"Standard — {model_id}"))
        except Exception as e:
            print(f"[ERROR] Standard eval failed: {e}")
            traceback.print_exc()

        # Phase 2: MMLU computer security, run regardless of --skip_security.
        print("\n" + "="*60)
        print("PHASE 2: MMLU Computer Security (5-shot)")
        print("="*60)
        wrapper = lm = None
        try:
            import lm_eval
            from lm_eval.models.huggingface import HFLM
            wrapper = UltronHFWrapper(model, cfg)
            wrapper = wrapper.to(device).eval()
            lm = HFLM(
                pretrained=wrapper, tokenizer=tokenizer,
                max_length=cfg.max_seq_len, dtype="float32",
                batch_size=args.batch_size, device=str(device),
            )
            mmlu_results = lm_eval.simple_evaluate(
                model=lm, tasks=mmlu_tasks, num_fewshot=5,
                log_samples=False, limit=args.limit,
            )["results"]
            model_results["security"]["mmlu_computer_security"] = mmlu_results
            print(format_results_table(mmlu_results, "MMLU Computer Security"))
        except Exception as e:
            print(f"[ERROR] MMLU eval failed: {e}")
            traceback.print_exc()
        finally:
            # The wrapper holds a reference to the model; drop it so that
            # `del model` at the end of the iteration can release the weights.
            wrapper = lm = None

        # Phase 3: direct MCQ security benchmarks.
        if not args.skip_security:
            print("\n" + "="*60)
            print("PHASE 3: Security Benchmarks (Direct MCQ)")
            print("="*60)
            try:
                secbench = eval_secbench(model, cfg, tokenizer, device,
                                         limit=args.limit)
                model_results["security"]["secbench_english"] = secbench
            except Exception as e:
                print(f"[ERROR] SecBench failed: {e}")
                traceback.print_exc()

            try:
                # Without an explicit limit CyberMetric is capped at 2000 rows.
                cm_limit = args.limit if args.limit else 2000
                cybermetric = eval_cybermetric(model, cfg, tokenizer, device,
                                               limit=cm_limit)
                model_results["security"]["cybermetric"] = cybermetric
            except Exception as e:
                print(f"[ERROR] CyberMetric failed: {e}")
                traceback.print_exc()

        # Phase 4: depth sweep.
        if not args.skip_depth:
            print("\n" + "="*60)
            print("PHASE 4: Depth Extrapolation")
            print("="*60)
            try:
                # Without an explicit limit the sweep uses 200 samples per task.
                depth_limit = args.limit if args.limit else 200
                depth_results = test_depth_extrapolation(
                    model, cfg, tokenizer, device,
                    limit=depth_limit, batch_size=args.batch_size
                )
                # JSON object keys must be strings.
                model_results["depth"] = {str(k): v for k, v in depth_results.items()}
                print(format_depth_table(depth_results, ["hellaswag", "arc_easy", "piqa"]))
            except Exception as e:
                print(f"[ERROR] Depth extrapolation failed: {e}")
                traceback.print_exc()

        all_model_results[model_id] = model_results
        del model
        torch.cuda.empty_cache()

    print("\n" + "#"*70)
    print("# FINAL COMPARISON")
    print("#"*70)

    # Side-by-side table for the first two evaluated models; delta is
    # second minus first.
    if len(all_model_results) >= 2:
        model_ids = list(all_model_results.keys())
        names = [m.split("/")[-1] for m in model_ids]

        print(f"\n{'Task':<25} {names[0]:>22} {names[1]:>22} {'Delta':>10}")
        print("-" * 82)

        for task in standard_tasks:
            print(f"{task:<25}", end="")
            scores = []
            for mid in model_ids:
                td = all_model_results[mid].get("standard", {}).get(task, {})
                for m in ["acc_norm,none", "acc,none"]:
                    if m in td:
                        scores.append(td[m])
                        print(f" {td[m]:>21.4f}", end="")
                        break
                else:
                    scores.append(None)
                    print(f" {'N/A':>21}", end="")
            if len(scores) >= 2 and all(s is not None for s in scores[:2]):
                d = scores[1] - scores[0]
                print(f" {'+' if d>0 else ''}{d:>9.4f}", end="")
            print()

        print(f"\n{'Security Task':<25} {names[0]:>22} {names[1]:>22} {'Delta':>10}")
        print("-" * 82)
        for st in ["secbench_english", "cybermetric"]:
            print(f"{st:<25}", end="")
            scores = []
            for mid in model_ids:
                sd = all_model_results[mid].get("security", {}).get(st, {})
                if "acc" in sd:
                    scores.append(sd["acc"])
                    print(f" {sd['acc']:>21.4f}", end="")
                else:
                    scores.append(None)
                    print(f" {'N/A':>21}", end="")
            if len(scores) >= 2 and all(s is not None for s in scores[:2]):
                d = scores[1] - scores[0]
                print(f" {'+' if d>0 else ''}{d:>9.4f}", end="")
            print()

    results_path = "eval_results_full.json"
    # default=str stringifies anything json cannot serialise natively.
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(all_model_results, f, indent=2, default=str)
    print(f"\n[save] Results saved to {results_path}")

    if args.upload:
        api = HfApi()
        for model_id in all_model_results:
            # Best effort: a failed upload for one repo does not stop the rest.
            try:
                api.upload_file(
                    path_or_fileobj=results_path,
                    path_in_repo="eval_results.json",
                    repo_id=model_id,
                )
                print(f"[upload] Results uploaded to {model_id}")
            except Exception as e:
                print(f"[upload] Failed for {model_id}: {e}")

    print("\n[done] Evaluation complete!")


if __name__ == "__main__":
    main()
|
|