#!/usr/bin/env python3
"""
Ultron Comprehensive Evaluation — Standard LM + Security Benchmarks

Evaluates both the general pretrained model and the cybersecurity CPT model
side-by-side on:
  1. Standard LM benchmarks: HellaSwag, ARC-Easy, ARC-Challenge, PIQA,
     WinoGrande, BoolQ
  2. Security benchmarks: MMLU computer_security, SecBench English MCQ,
     CyberMetric
  3. Depth extrapolation: Same model at different loop counts
     (1, 2, 4, the trained loop count, 12, 16)

All results are uploaded to the respective HF Hub model repos unless
--no_upload is given.

Usage:
    # Full eval (both models, all benchmarks)
    python eval_ultron.py

    # Quick test (50 samples per task)
    python eval_ultron.py --limit 50

    # Single model only
    python eval_ultron.py --models trojan0x/ultron-sec-cpt

    # Skip slow parts
    python eval_ultron.py --skip_depth --skip_security

    # Just security benchmarks
    python eval_ultron.py --skip_depth --limit 200
"""

import os
import sys
import json
import time
import argparse
import types
import traceback

import torch
import torch.nn as nn
import torch.nn.functional as F
from huggingface_hub import hf_hub_download, snapshot_download, HfApi
from transformers import GPT2TokenizerFast


# ---- Setup Ultron model code ----
def setup_ultron():
    """Download Ultron model code from Hub and add to path.

    Returns:
        The local snapshot directory that was prepended to ``sys.path``.
    """
    repo_path = snapshot_download("trojan0x/ultron", allow_patterns=["ultron/*.py"])
    sys.path.insert(0, repo_path)
    print(f"[setup] Ultron code loaded from: {repo_path}")
    return repo_path


# NOTE: this runs at import time because the `ultron` package imported on the
# next line only exists once the snapshot has been downloaded.
ULTRON_PATH = setup_ultron()
from ultron.model import Ultron, UltronConfig


# Metric keys looked up in lm-eval result dicts, in order of preference.
_METRIC_PREFERENCE = ("acc_norm,none", "acc,none")
# Answer letters for the direct MCQ benchmarks and their choice indices.
_MCQ_LETTERS = ("A", "B", "C", "D")
_MCQ_LABELS = {letter: idx for idx, letter in enumerate(_MCQ_LETTERS)}
# Tasks used by the depth-extrapolation sweep and its summary table.
_DEPTH_TASKS = ["hellaswag", "arc_easy", "piqa"]


def _primary_metric(scores):
    """Return ``(metric_name, value)`` for the first preferred metric present.

    Returns ``None`` when ``scores`` contains none of the preferred metrics.
    """
    for metric in _METRIC_PREFERENCE:
        if metric in scores:
            return metric, scores[metric]
    return None


# ===========================================================================
# Model Loading
# ===========================================================================
def load_model(model_id, device="cuda"):
    """Load trained Ultron model from HF Hub.

    Args:
        model_id: HF Hub repo id; the checkpoint filename is derived from it.
        device: Device the model is moved to after loading.

    Returns:
        ``(model, cfg, meta)`` where ``meta`` holds the checkpoint's ``step``,
        ``tokens_seen`` and the model's spectral radius ``rho_A``.
    """
    print(f"\n{'='*60}")
    print(f"Loading model: {model_id}")
    print(f"{'='*60}")

    # Determine checkpoint filename based on model repo name
    if "sec-cpt" in model_id or "sec_cpt" in model_id:
        ckpt_name = "ultron_sec_cpt_final.pt"
    elif "moe" in model_id:
        ckpt_name = "ultron_moe_final.pt"
    else:
        ckpt_name = "ultron_final.pt"

    ckpt_path = hf_hub_download(model_id, ckpt_name)
    # SECURITY: weights_only=False unpickles arbitrary objects from the
    # checkpoint. Only point --models at repos you trust.
    ckpt = torch.load(ckpt_path, map_location="cpu", weights_only=False)

    # Reconstruct config from saved dict
    cfg_dict = ckpt["config"]
    cfg = UltronConfig(**cfg_dict)

    # Build model and load weights
    model = Ultron(cfg)
    model.load_state_dict(ckpt["model_state_dict"])

    # float32 for stable eval — 89M fits easily on any GPU
    model = model.float().to(device)
    model.eval()

    step = ckpt.get("step", "unknown")
    tokens = ckpt.get("tokens_seen", "unknown")
    rho = model.get_spectral_radius()
    effective_depth = (
        cfg.prelude_layers + cfg.recurrent_layers * cfg.max_loop_iters + cfg.coda_layers
    )

    print(f"  Checkpoint: {ckpt_name}")
    print(f"  Params: {model.get_num_params(False):,} total, {model.get_num_params(True):,} non-embedding")
    print(f"  Trained: {step} steps, {tokens} tokens")
    print(f"  rho(A): {rho:.6f} {'OK' if rho < 1 else 'UNSTABLE!'}")
    print(f"  Config: dim={cfg.dim}, heads={cfg.n_heads}, kv_heads={cfg.n_kv_heads}")
    print(f"  Architecture: {cfg.prelude_layers}P + {cfg.recurrent_layers}R x {cfg.max_loop_iters}L + {cfg.coda_layers}C")
    print(f"  Effective depth: {effective_depth} layers")
    print(f"  max_seq_len={cfg.max_seq_len}, vocab_size={cfg.vocab_size}")

    return model, cfg, {"step": step, "tokens_seen": tokens, "rho_A": rho}


# ===========================================================================
# HFLM-Compatible Wrapper
# ===========================================================================
class UltronHFWrapper(nn.Module):
    """Wraps Ultron to look like a HuggingFace CausalLM for lm-eval-harness.

    Fixes:
    1. tie_weights() — HFLM calls this unconditionally
    2. Left truncation — sequences > max_seq_len get trimmed from left
    3. float32 — avoids bf16 softmax NaN in attention
    4. config attributes — HFLM reads model_type, n_positions, etc.
    5. device property — see the note on ``device`` below
    """

    def __init__(self, model, cfg, n_loops=None):
        super().__init__()
        self.model = model
        self.n_loops = n_loops or cfg.max_loop_iters
        self.max_seq_len = cfg.max_seq_len

        # HFLM reads these attributes
        self.config = types.SimpleNamespace(
            model_type="gpt2",
            vocab_size=cfg.vocab_size,
            n_positions=cfg.max_seq_len,
            max_position_embeddings=cfg.max_seq_len,
            n_embd=cfg.dim,
            hidden_size=cfg.dim,
            is_encoder_decoder=False,
            pad_token_id=None,
        )
        self.generation_config = types.SimpleNamespace(
            do_sample=False,
            temperature=1.0,
        )

    @property
    def device(self):
        """Device of the wrapped model's parameters.

        NOTE(review): lm-eval's HFLM appears to read ``pretrained.device``
        when handed a pre-built model; plain ``nn.Module`` has no such
        attribute — confirm against the installed lm-eval version.
        """
        return next(self.model.parameters()).device

    def tie_weights(self):
        """No-op — HFLM calls this unconditionally during init."""
        pass

    def forward(self, input_ids=None, attention_mask=None, **kwargs):
        """Forward pass with left-truncation safety.

        ``attention_mask`` is truncated alongside ``input_ids`` but is not
        passed to the wrapped model. Returns a namespace exposing ``logits``.
        """
        if input_ids.shape[1] > self.max_seq_len:
            input_ids = input_ids[:, -self.max_seq_len:]
            if attention_mask is not None:
                attention_mask = attention_mask[:, -self.max_seq_len:]
        logits = self.model(input_ids, n_loops=self.n_loops)
        return types.SimpleNamespace(logits=logits)

    def parameters(self, recurse=True):
        # Delegates to the wrapped model so parameters are reported exactly as
        # the bare model reports them; `recurse` keeps the nn.Module signature.
        return self.model.parameters(recurse)

    def named_parameters(self, *args, **kwargs):
        return self.model.named_parameters(*args, **kwargs)

    def to(self, *args, **kwargs):
        self.model = self.model.to(*args, **kwargs)
        return self

    def eval(self):
        self.model.eval()
        return self

    def train(self, mode=True):
        self.model.train(mode)
        return self


# ===========================================================================
# Security Benchmarks: Direct MCQ Evaluation
# ===========================================================================
def _mean_answer_logprob(model, cfg, tokenizer, device, prompt, answer_text, n_loops):
    """Return the mean per-token log-prob of ``answer_text`` at the end of ``prompt``.

    ``prompt`` is expected to end with ``answer_text``; the answer's token
    count is taken from tokenizing ``answer_text`` on its own, and that many
    trailing prompt tokens are scored.
    """
    tokens = tokenizer.encode(prompt, return_tensors="pt").to(device)
    if tokens.shape[1] > cfg.max_seq_len:
        tokens = tokens[:, -cfg.max_seq_len:]

    with torch.no_grad():
        logits = model(tokens, n_loops=n_loops)

    # Each answer token is predicted from the position before it, so at most
    # seq_len - 1 tokens can be scored. Clamp so a truncated prompt cannot
    # index past the available logits.
    n_answer = min(len(tokenizer.encode(answer_text)), tokens.shape[1] - 1)
    if n_answer <= 0:
        return float("-inf")

    lp_all = F.log_softmax(logits[0, -(n_answer + 1):-1, :], dim=-1)
    answer_ids = tokens[0, -n_answer:]
    # One gather instead of a per-token .item() loop; accumulate in float64 to
    # match the precision of summing Python floats.
    token_lps = lp_all.gather(1, answer_ids.unsqueeze(1)).squeeze(1)
    return token_lps.double().sum().item() / n_answer


def _predict_choice(model, cfg, tokenizer, device, question, answer_texts, n_loops):
    """Return the index of the highest-scoring choice (first one wins ties)."""
    log_probs = []
    for letter, ans_text in zip(_MCQ_LETTERS, answer_texts):
        prompt = f"Question: {question}\nAnswer: {letter}. {ans_text}"
        answer_text = f" {letter}. {ans_text}"
        log_probs.append(
            _mean_answer_logprob(model, cfg, tokenizer, device, prompt, answer_text, n_loops)
        )
    return max(range(len(log_probs)), key=lambda k: log_probs[k])


def eval_secbench(model, cfg, tokenizer, device, n_loops=None, limit=None):
    """Evaluate on SecBench English MCQs (log-likelihood over answer choices).

    Args:
        n_loops: Loop count passed to the model; defaults to cfg.max_loop_iters.
        limit: If truthy, evaluate only the first ``limit`` questions.

    Returns:
        Dict with ``acc``, ``correct``, ``total`` and ``skipped`` (rows whose
        label is not one of A-D).
    """
    from datasets import load_dataset

    print("\n[SecBench] Loading dataset...")
    ds = load_dataset("RISys-Lab/Benchmarks_CyberSec_SecBench", "MCQs_English", split="test")
    if limit:
        ds = ds.select(range(min(limit, len(ds))))
    print(f"[SecBench] Evaluating {len(ds)} questions")

    n_loops = n_loops or cfg.max_loop_iters
    model.eval()
    correct = 0
    total = 0
    skipped = 0

    for i, row in enumerate(ds):
        question = row["question"]
        answers = row["answers"]
        # Normalise the label the same way eval_cybermetric does.
        gt_idx = _MCQ_LABELS.get(str(row["label"]).strip().upper(), -1)
        if gt_idx == -1:
            skipped += 1
            continue

        answer_texts = [answers[j] for j in range(len(_MCQ_LETTERS))]
        pred = _predict_choice(model, cfg, tokenizer, device, question, answer_texts, n_loops)
        if pred == gt_idx:
            correct += 1
        total += 1

        if (i + 1) % 100 == 0:
            print(f"  [{i+1}/{len(ds)}] acc = {correct/total:.4f}")

    acc = correct / max(total, 1)
    print(f"[SecBench] Final: {correct}/{total} = {acc:.4f}")
    return {"acc": acc, "correct": correct, "total": total, "skipped": skipped}


def eval_cybermetric(model, cfg, tokenizer, device, n_loops=None, limit=None):
    """Evaluate on CyberMetric MCQs (nested JSON format).

    Each row's ``questions`` entry is read for ``question``, ``answers`` and
    ``correct_answer`` (falling back to ``answer``). Rows without a usable
    A-D ground truth are counted as skipped.

    Returns:
        Dict with ``acc``, ``correct``, ``total`` and ``skipped``.
    """
    from datasets import load_dataset

    print("\n[CyberMetric] Loading dataset...")
    ds = load_dataset("tihanyin/CyberMetric", split="train")
    if limit:
        ds = ds.select(range(min(limit, len(ds))))
    print(f"[CyberMetric] Evaluating {len(ds)} questions")

    n_loops = n_loops or cfg.max_loop_iters
    model.eval()
    correct = 0
    total = 0
    skipped = 0

    for i, row in enumerate(ds):
        q_data = row["questions"]
        question = q_data.get("question", "")
        answers_dict = q_data.get("answers", {})
        gt_letter = q_data.get("correct_answer", q_data.get("answer", None))
        if gt_letter is None:
            skipped += 1
            continue

        gt_idx = _MCQ_LABELS.get(str(gt_letter).strip().upper(), -1)
        if gt_idx == -1:
            skipped += 1
            continue

        answer_texts = [answers_dict.get(letter, "") for letter in _MCQ_LETTERS]
        pred = _predict_choice(model, cfg, tokenizer, device, question, answer_texts, n_loops)
        if pred == gt_idx:
            correct += 1
        total += 1

        if (i + 1) % 500 == 0:
            print(f"  [{i+1}/{len(ds)}] acc = {correct/total:.4f} (skipped {skipped})")

    acc = correct / max(total, 1)
    print(f"[CyberMetric] Final: {correct}/{total} = {acc:.4f} (skipped {skipped})")
    return {"acc": acc, "correct": correct, "total": total, "skipped": skipped}


# ===========================================================================
# Standard Evaluation via lm-eval-harness
# ===========================================================================
def evaluate_standard(model, cfg, tokenizer, tasks, device,
                      n_loops=None, limit=None, batch_size=4, num_fewshot=0):
    """Run lm-evaluation-harness benchmarks.

    Args:
        tasks: lm-eval task names.
        n_loops: Loop count for the wrapper; defaults to cfg.max_loop_iters.
        limit: Per-task sample limit; omitted from the call when None.
        batch_size: Eval batch size.
        num_fewshot: Number of few-shot examples (0 = zero-shot).

    Returns:
        The ``"results"`` mapping of ``lm_eval.simple_evaluate``.
    """
    import lm_eval
    from lm_eval.models.huggingface import HFLM

    wrapper = UltronHFWrapper(model, cfg, n_loops=n_loops)
    wrapper = wrapper.to(device).eval()

    lm = HFLM(
        pretrained=wrapper,
        tokenizer=tokenizer,
        max_length=cfg.max_seq_len,
        dtype="float32",
        batch_size=batch_size,
        device=str(device),
        trust_remote_code=False,
    )

    eval_kwargs = {
        "model": lm,
        "tasks": tasks,
        "num_fewshot": num_fewshot,
        "log_samples": False,
    }
    if limit is not None:
        eval_kwargs["limit"] = limit

    print(f"\n[lm-eval] Tasks: {tasks}, n_loops={n_loops or cfg.max_loop_iters}, limit={limit}, bs={batch_size}")
    results = lm_eval.simple_evaluate(**eval_kwargs)
    return results["results"]


# ===========================================================================
# Depth Extrapolation
# ===========================================================================
def test_depth_extrapolation(model, cfg, tokenizer, device, limit=200, batch_size=4):
    """Test the same model at different loop depths — Ultron's key feature.

    Returns:
        Dict mapping each loop count to its lm-eval results mapping.
    """
    # Deduplicate: cfg.max_loop_iters may coincide with one of the fixed
    # depths, which would otherwise run the same evaluation twice.
    loop_counts = sorted({1, 2, 4, cfg.max_loop_iters, 12, 16})
    tasks = list(_DEPTH_TASKS)

    print(f"\n{'='*60}")
    print("DEPTH EXTRAPOLATION TEST")
    print(f"{'='*60}")
    print(f"Loop counts: {loop_counts}")
    print(f"Tasks: {tasks}, limit={limit}")

    all_results = {}
    for n_loops in loop_counts:
        print(f"\n--- n_loops = {n_loops} ---")
        results = evaluate_standard(
            model, cfg, tokenizer, tasks, device,
            n_loops=n_loops, limit=limit, batch_size=batch_size
        )
        all_results[n_loops] = results
        for task, scores in results.items():
            found = _primary_metric(scores)
            if found is not None:
                print(f"  {task}: {found[1]:.4f}")

    return all_results


# ===========================================================================
# Formatting
# ===========================================================================
def format_results_table(results, label=""):
    """Render lm-eval results as a markdown table, one row per task.

    Tasks that report none of the preferred accuracy metrics are omitted.
    """
    lines = []
    if label:
        lines.append(f"\n## {label}\n")
    lines.append(f"| {'Task':<25} | {'Metric':<15} | {'Score':>8} |")
    lines.append(f"|{'-'*27}|{'-'*17}|{'-'*10}|")

    for task, scores in sorted(results.items()):
        found = _primary_metric(scores)
        if found is None:
            continue
        metric, val = found
        lines.append(f"| {task:<25} | {metric.replace(',none',''):<15} | {val:>8.4f} |")
    return "\n".join(lines)


def format_depth_table(all_results, tasks):
    """Render depth-extrapolation results as a markdown table.

    One row per loop count and one column per task; cells without a task
    result or a preferred metric show ``N/A``.
    """
    lines = ["\n## Depth Extrapolation\n"]
    header = f"| {'n_loops':<10} |"
    for t in tasks:
        header += f" {t:<15} |"
    lines.append(header)
    lines.append("|" + "-"*12 + "|" + (("-"*17 + "|") * len(tasks)))

    for n_loops, results in sorted(all_results.items()):
        row = f"| {n_loops:<10} |"
        for t in tasks:
            found = _primary_metric(results[t]) if t in results else None
            if found is None:
                row += f" {'N/A':<15} |"
            else:
                row += f" {found[1]:<15.4f} |"
        lines.append(row)
    return "\n".join(lines)


def _comparison_row(label, scores):
    """Format one comparison row: label, one cell per score, then the delta.

    The delta (second score minus first) is appended only when both of the
    first two scores are available.
    """
    row = f"{label:<25}"
    for score in scores:
        # Cells are 22 wide to line up with the 22-wide model-name headers.
        row += f" {'N/A':>22}" if score is None else f" {score:>22.4f}"
    if len(scores) >= 2 and all(s is not None for s in scores[:2]):
        # '+' flag keeps the sign attached to the number and the column aligned.
        row += f" {scores[1] - scores[0]:>+10.4f}"
    return row


def _print_comparison(all_model_results, standard_tasks):
    """Print side-by-side standard and security scores for the evaluated models."""
    model_ids = list(all_model_results.keys())
    names = [m.split("/")[-1] for m in model_ids]

    print(f"\n{'Task':<25} {names[0]:>22} {names[1]:>22} {'Delta':>10}")
    print("-" * 82)
    for task in standard_tasks:
        scores = []
        for mid in model_ids:
            td = all_model_results[mid].get("standard", {}).get(task, {})
            found = _primary_metric(td)
            scores.append(None if found is None else found[1])
        print(_comparison_row(task, scores))

    print(f"\n{'Security Task':<25} {names[0]:>22} {names[1]:>22} {'Delta':>10}")
    print("-" * 82)
    for st in ["secbench_english", "cybermetric"]:
        scores = []
        for mid in model_ids:
            sd = all_model_results[mid].get("security", {}).get(st, {})
            scores.append(sd["acc"] if "acc" in sd else None)
        print(_comparison_row(st, scores))


# ===========================================================================
# Main
# ===========================================================================
def main():
    """Parse CLI args, evaluate each model, print a comparison, save and upload.

    Each evaluation phase is wrapped in its own try/except so that one failing
    benchmark does not abort the remaining phases or models.
    """
    parser = argparse.ArgumentParser(description="Ultron Comprehensive Evaluation")
    parser.add_argument("--models", type=str, nargs="+",
                        default=["trojan0x/ultron-small-baseline", "trojan0x/ultron-sec-cpt"],
                        help="HF model IDs to evaluate")
    parser.add_argument("--limit", type=int, default=None,
                        help="Limit samples per task (None = full eval)")
    parser.add_argument("--batch_size", type=int, default=4,
                        help="Eval batch size (lower if OOM)")
    parser.add_argument("--skip_security", action="store_true",
                        help="Skip SecBench + CyberMetric")
    parser.add_argument("--skip_depth", action="store_true",
                        help="Skip depth extrapolation test")
    parser.add_argument("--upload", action="store_true", default=True,
                        help="Upload results to HF Hub")
    parser.add_argument("--no_upload", action="store_true",
                        help="Disable upload to HF Hub")
    args = parser.parse_args()

    if args.no_upload:
        args.upload = False

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[main] Device: {device}")
    if device.type == "cuda":
        print(f"  GPU: {torch.cuda.get_device_name()}")
        # The device-properties attribute is `total_memory` (bytes).
        mem_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f"  VRAM: {mem_gb:.1f} GB")
    else:
        print("  WARNING: Running on CPU — will be very slow!")

    # Tokenizer (GPT-2, shared by all Ultron models)
    tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    standard_tasks = ["hellaswag", "arc_easy", "arc_challenge", "piqa", "winogrande", "boolq"]
    mmlu_tasks = ["mmlu_computer_security"]

    all_model_results = {}

    for model_id in args.models:
        print(f"\n{'#'*70}")
        print(f"# EVALUATING: {model_id}")
        print(f"{'#'*70}")

        try:
            model, cfg, meta = load_model(model_id, device)
        except Exception as e:
            print(f"[ERROR] Failed to load {model_id}: {e}")
            traceback.print_exc()
            continue

        model_results = {"meta": meta, "standard": {}, "security": {}, "depth": {}}

        # ---- Phase 1: Standard LM Benchmarks (0-shot) ----
        print("\n" + "="*60)
        print("PHASE 1: Standard LM Benchmarks (0-shot)")
        print("="*60)
        try:
            std_results = evaluate_standard(
                model, cfg, tokenizer, standard_tasks, device,
                limit=args.limit, batch_size=args.batch_size
            )
            model_results["standard"] = std_results
            print(format_results_table(std_results, f"Standard — {model_id}"))
        except Exception as e:
            print(f"[ERROR] Standard eval failed: {e}")
            traceback.print_exc()

        # ---- Phase 2: MMLU Computer Security (5-shot) ----
        # Runs even with --skip_security, which only covers Phase 3.
        print("\n" + "="*60)
        print("PHASE 2: MMLU Computer Security (5-shot)")
        print("="*60)
        try:
            mmlu_results = evaluate_standard(
                model, cfg, tokenizer, mmlu_tasks, device,
                limit=args.limit, batch_size=args.batch_size, num_fewshot=5
            )
            model_results["security"]["mmlu_computer_security"] = mmlu_results
            print(format_results_table(mmlu_results, "MMLU Computer Security"))
        except Exception as e:
            print(f"[ERROR] MMLU eval failed: {e}")
            traceback.print_exc()

        # ---- Phase 3: SecBench + CyberMetric ----
        if not args.skip_security:
            print("\n" + "="*60)
            print("PHASE 3: Security Benchmarks (Direct MCQ)")
            print("="*60)

            try:
                secbench = eval_secbench(model, cfg, tokenizer, device, limit=args.limit)
                model_results["security"]["secbench_english"] = secbench
            except Exception as e:
                print(f"[ERROR] SecBench failed: {e}")
                traceback.print_exc()

            try:
                # CyberMetric is capped at 2000 questions unless --limit is given.
                cm_limit = args.limit if args.limit else 2000
                cybermetric = eval_cybermetric(model, cfg, tokenizer, device, limit=cm_limit)
                model_results["security"]["cybermetric"] = cybermetric
            except Exception as e:
                print(f"[ERROR] CyberMetric failed: {e}")
                traceback.print_exc()

        # ---- Phase 4: Depth Extrapolation ----
        if not args.skip_depth:
            print("\n" + "="*60)
            print("PHASE 4: Depth Extrapolation")
            print("="*60)
            try:
                depth_limit = args.limit if args.limit else 200
                depth_results = test_depth_extrapolation(
                    model, cfg, tokenizer, device,
                    limit=depth_limit, batch_size=args.batch_size
                )
                # JSON object keys must be strings.
                model_results["depth"] = {str(k): v for k, v in depth_results.items()}
                print(format_depth_table(depth_results, _DEPTH_TASKS))
            except Exception as e:
                print(f"[ERROR] Depth extrapolation failed: {e}")
                traceback.print_exc()

        all_model_results[model_id] = model_results

        # Release this model before loading the next one.
        del model
        if device.type == "cuda":
            torch.cuda.empty_cache()

    # ---- Final Comparison ----
    print("\n" + "#"*70)
    print("# FINAL COMPARISON")
    print("#"*70)

    if len(all_model_results) >= 2:
        _print_comparison(all_model_results, standard_tasks)

    # Save
    results_path = "eval_results_full.json"
    with open(results_path, "w") as f:
        # default=str stringifies anything json cannot encode natively.
        json.dump(all_model_results, f, indent=2, default=str)
    print(f"\n[save] Results saved to {results_path}")

    if args.upload:
        api = HfApi()
        for model_id in all_model_results:
            # Best-effort: a failed upload for one repo must not block the others.
            try:
                api.upload_file(
                    path_or_fileobj=results_path,
                    path_in_repo="eval_results.json",
                    repo_id=model_id,
                )
                print(f"[upload] Results uploaded to {model_id}")
            except Exception as e:
                print(f"[upload] Failed for {model_id}: {e}")

    print("\n[done] Evaluation complete!")


if __name__ == "__main__":
    main()