# ROLV Primitive(c) Universal Benchmark Harness
# Copyright (c) 2025-2026 ROLV LLC. All rights reserved. 3 Patents Pending.
# ROLV Primitive(c) - RSMT(TM) - ROLVswitch(TM)
# https://rolv.ai | DOI: 10.5281/zenodo.19221455
#
# Conforms to: ROLV Benchmark Harness Prerequisites & Standards v2.0
#
# Usage:
#   python benchmark.py --model deepseek-shapes
#   python benchmark.py --model olmoe
#   python benchmark.py --model mixtral-8x7b
#   python benchmark.py --model YOUR_HF_MODEL_ID
#   python benchmark.py --model olmoe --iterations 2000 --batch 2000
#
# For gated models: run 'hf auth login' first
#
# Rolv Eitrem Heggenhougen - ROLV LLC - 445 NE 12th Ave - Fort Lauderdale FL 33301
# rolv@rolv.ai - https://rolv.ai

# ============================================================
# FLASH ATTN STUB - must run before any other import
# Writes a real package to site-packages so all transformers
# import-time checks find it immediately. Never actually called
# because all benchmarks use attn_implementation='eager'.
# ============================================================
import sys
import os
import types
import importlib.util
import site
import pathlib

# NOTE: whitespace inside string literals below is reproduced exactly as it
# appears in the file as received; any column padding that was lost when the
# file's line breaks were collapsed has not been guessed back in.

# Line that every stub __init__.py written by this harness contains. It is
# used to tell "our stub from an earlier run" apart from a real flash-attn
# installation, which must never be overwritten.
_FLASH_ATTN_STUB_MARKER = "def __getattr__(name): return lambda *a, **kw: None"


def _write_flash_attn_stub(site_dir):
    """Write the on-disk ``flash_attn`` stub package under *site_dir*.

    Returns True when the stub files were written and False when an existing
    ``flash_attn/__init__.py`` that is NOT one of our stubs was found (a real
    installation) and therefore left untouched.

    Raises OSError when the directory is not writable.
    """
    stub_dir = pathlib.Path(site_dir) / "flash_attn"
    init_path = stub_dir / "__init__.py"

    if init_path.exists():
        try:
            existing = init_path.read_text()
        except (OSError, UnicodeDecodeError):
            existing = ""
        if _FLASH_ATTN_STUB_MARKER not in existing:
            # Unknown content: assume a genuine package and do not clobber it.
            return False

    stub_dir.mkdir(parents=True, exist_ok=True)
    init_src = (
        '__version__ = "2.6.0"\n'
        'flash_attn_func = lambda *a, **kw: None\n'
        'flash_attn_varlen_func = lambda *a, **kw: None\n'
        'flash_attn_varlen_qkvpacked_func = lambda *a, **kw: None\n'
        'flash_attn_with_kvcache = lambda *a, **kw: None\n'
        'flash_attn_varlen_kvpacked_func = lambda *a, **kw: None\n'
        'flash_attn_qkvpacked_func = lambda *a, **kw: None\n'
        'FlashAttention = type("FlashAttention", (), {})\n'
        'FlashAttention2 = type("FlashAttention2", (), {})\n'
        'def __getattr__(name): return lambda *a, **kw: None\n'
    )
    init_path.write_text(init_src)

    sub_src = "flash_attn_func = lambda *a, **kw: None\n"
    for sub in ["flash_attn_interface", "bert_padding",
                "flash_attn_triton", "flash_attn_cuda"]:
        (stub_dir / (sub + ".py")).write_text(sub_src)

    mha_dir = stub_dir / "modules"
    mha_dir.mkdir(exist_ok=True)
    (mha_dir / "__init__.py").write_text("")
    (mha_dir / "mha.py").write_text("class MHA: pass\n")
    return True


def _install_flash_attn_stub():
    """Make ``import flash_attn`` succeed without the real package.

    Two mechanisms are used:

    * a stub package is written to site-packages (skipped when a real
      flash-attn is already installed there, and downgraded to a warning when
      the directory is read-only);
    * stub module objects are injected into ``sys.modules`` for this process.
    """
    try:
        sp = site.getsitepackages()[0]
    except Exception:
        # Some environments do not provide getsitepackages().
        sp = site.getusersitepackages()

    try:
        _write_flash_attn_stub(sp)
    except OSError as e:
        # Read-only site-packages: the sys.modules injection below still
        # covers this process, so do not abort the whole harness.
        print(" [warn] could not write flash_attn stub to %s: %s" % (sp, e))

    # Also inject into sys.modules
    for name in [
        "flash_attn",
        "flash_attn.flash_attn_interface",
        "flash_attn.bert_padding",
        "flash_attn.modules",
        "flash_attn.modules.mha",
        "flash_attn.flash_attn_triton",
        "flash_attn.flash_attn_cuda",
    ]:
        if name not in sys.modules:
            m = types.ModuleType(name)
            try:
                m.__spec__ = importlib.util.spec_from_loader(name, loader=None)
            except Exception:
                pass
            m.__version__ = "2.6.0"
            m.flash_attn_func = lambda *a, **kw: None
            m.flash_attn_varlen_func = lambda *a, **kw: None
            m.flash_attn_varlen_qkvpacked_func = lambda *a, **kw: None
            m.flash_attn_with_kvcache = lambda *a, **kw: None
            sys.modules[name] = m


_install_flash_attn_stub()

# Pre-patch PACKAGE_DISTRIBUTION_MAPPING before transformers loads
# NOTE(review): this imports transformers *before* the pip upgrade further
# down, so an upgrade performed in this run only takes effect on the next run.
try:
    import transformers.utils.import_utils as _early_tiu
    if hasattr(_early_tiu, "PACKAGE_DISTRIBUTION_MAPPING"):
        _early_tiu.PACKAGE_DISTRIBUTION_MAPPING["flash_attn"] = ["flash-attn-stub"]
    for _attr in ["is_flash_attn_2_available", "is_flash_attn_3_available",
                  "is_flash_attn_4_available", "is_flash_attn_available",
                  "flash_attn_supports_top_left_mask"]:
        if hasattr(_early_tiu, _attr):
            setattr(_early_tiu, _attr, lambda *a, **kw: False)
except Exception:
    # Best effort: transformers may not be installed yet at this point.
    pass

# Pre-patch flash_attention_utils module if already loaded
try:
    import transformers.modeling_flash_attention_utils as _mfau
    _mfau.flash_attn_supports_top_left_mask = lambda: False
    _mfau._use_top_left_mask = False
except Exception:
    pass

os.environ["TRANSFORMERS_NO_FLASH_ATTN"] = "1"
os.environ["FLASH_ATTENTION_SKIP_CUDA_BUILD"] = "1"

# ============================================================
# Standard library
# ============================================================
import argparse
import csv
import gc
import hashlib
import shutil
import subprocess
import time
import traceback


# ============================================================
# Dependency auto-installer
# ============================================================
def pip_install(*pkgs, upgrade=False):
    """Run ``pip install -q [--upgrade] pkgs...`` with the current interpreter.

    Failures are reported as a warning and never raised, so a missing optional
    package does not stop the harness.
    """
    cmd = [sys.executable, "-m", "pip", "install", "-q"]
    if upgrade:
        cmd.append("--upgrade")
    cmd.extend(pkgs)
    try:
        subprocess.check_call(cmd)
    except (subprocess.CalledProcessError, OSError) as e:
        print(" [warn] pip install failed for %s: %s" % (pkgs, e))


print(" Installing / upgrading required packages ...")
pip_install("torch", "numpy", "scipy", "psutil")
pip_install("transformers", "accelerate", "huggingface_hub", upgrade=True)
pip_install("einops", "tqdm")
try:
    pip_install("pynvml")
except Exception:
    pass
try:
    pip_install("pyrsmi")
except Exception:
    pass

import numpy as np
import psutil
import platform
import torch


# ============================================================
# Transformers patches (prereq S4) - applied before any load
# ============================================================
def apply_transformers_patches():
    """Monkey-patch transformers so model loading never touches flash-attn.

    Forces every flash-attn availability probe to report False, restores
    ``is_torch_fx_available`` when it is missing, and registers empty mock
    modules for ``mamba_ssm`` / ``causal_conv1d``.
    """
    import transformers.utils.import_utils as _tiu

    # Patch 1: is_torch_fx_available removed in >=4.50
    if not hasattr(_tiu, "is_torch_fx_available"):
        _tiu.is_torch_fx_available = lambda: False

    # Patch 2: flash_attn availability - force False everywhere
    for attr in [
        "is_flash_attn_2_available",
        "is_flash_attn_greater_or_equal_2_10",
        "is_flash_attn_greater_or_equal",
        "is_flash_attn_available",
    ]:
        if hasattr(_tiu, attr):
            setattr(_tiu, attr, lambda *a, **kw: False)

    if hasattr(_tiu, "PACKAGE_DISTRIBUTION_MAPPING"):
        # Keep the key but point to a dummy package name so lookups succeed
        _tiu.PACKAGE_DISTRIBUTION_MAPPING["flash_attn"] = ["flash-attn-stub"]

    # Patch is_flash_attn_4_available which is new in transformers >=4.50
    for attr in ["is_flash_attn_4_available", "is_flash_attn_3_available",
                 "flash_attn_supports_top_left_mask"]:
        if hasattr(_tiu, attr):
            setattr(_tiu, attr, lambda *a, **kw: False)

    # Patch modeling_flash_attention_utils directly
    try:
        import transformers.modeling_flash_attention_utils as _mfau
        _mfau.flash_attn_supports_top_left_mask = lambda: False
        _mfau._use_top_left_mask = False
    except Exception:
        pass

    # Patch hub_kernels / flash_attention integration
    try:
        import transformers.integrations.flash_attention as _fa
        _fa.flash_attention_forward = lambda *a, **kw: None
    except Exception:
        pass

    # Patch all loaded transformers modules
    for mod_name in list(sys.modules):
        if "transformers" not in mod_name:
            continue
        mod = sys.modules.get(mod_name)
        for fa_attr in [
            "is_flash_attn_2_available",
            "is_flash_attn_available",
            "is_flash_attn_greater_or_equal_2_10",
            "is_flash_attn_greater_or_equal",
        ]:
            try:
                if hasattr(mod, fa_attr):
                    setattr(mod, fa_attr, lambda *a, **kw: False)
            except Exception:
                # Attribute access on a module can itself raise during import.
                pass

    # Patch 3: mamba_ssm / causal_conv1d mock for Jamba
    for pkg in [
        "mamba_ssm",
        "causal_conv1d",
        "mamba_ssm.ops",
        "mamba_ssm.ops.selective_scan_interface",
        "causal_conv1d.causal_conv1d_interface",
    ]:
        if pkg not in sys.modules:
            m = types.ModuleType(pkg)
            m.__spec__ = importlib.util.spec_from_loader(pkg, loader=None)
            m.__version__ = "1.0.0"
            sys.modules[pkg] = m

    if hasattr(_tiu, "is_causal_conv1d_available"):
        _tiu.is_causal_conv1d_available = lambda: False


apply_transformers_patches()

from transformers import AutoConfig, AutoModelForCausalLM

# ============================================================
# Argument parsing
# ============================================================
# Short alias -> Hugging Face repo id. A value of None marks a pseudo-model
# that is handled specially in run_selected_model().
KNOWN_MODELS = {
    "olmoe": "allenai/OLMoE-1B-7B-0924",
    "mixtral-8x7b": "mistralai/Mixtral-8x7B-v0.1",
    "mixtral-8x22b": "mistralai/Mixtral-8x22B-v0.1",
    "phi35moe": "microsoft/Phi-3.5-MoE-instruct",
    "deepseek-moe": "deepseek-ai/deepseek-moe-16b-base",
    "jamba": "ai21labs/Jamba-1.5-Mini",
    "qwen2moe": "Qwen/Qwen1.5-MoE-A2.7B",
    "deepseek-shapes": None,
    "auto": None,
}

parser = argparse.ArgumentParser(
    description="ROLV Primitive(c) Universal Benchmark - rolv.ai")
parser.add_argument("--model", default="olmoe",
                    help="Model to benchmark. Options: %s or any HF model ID"
                    % ", ".join(KNOWN_MODELS))
parser.add_argument("--device", default="auto",
                    help="Device: auto | cpu | cuda | cuda:0 (default: auto)")
parser.add_argument("--iterations", type=int, default=1000)
parser.add_argument("--batch", type=int, default=1000)
parser.add_argument("--warmup", type=int, default=20)
parser.add_argument("--layers", nargs="+",
                    default=["gate_proj", "up_proj", "down_proj"])
parser.add_argument("--sparsity", type=float, default=None)
parser.add_argument("--cache-dir", default=None)
parser.add_argument("--no-cleanup", action="store_true")
parser.add_argument("--output-csv", default="rolv_results.csv")
args = parser.parse_args()

# Per-iteration timings divide by these values; reject zero/negative early
# instead of failing later with ZeroDivisionError.
if args.iterations < 1:
    parser.error("--iterations must be >= 1")
if args.batch < 1:
    parser.error("--batch must be >= 1")

# At least 20 warm-up iterations are always run.
args.warmup = max(20, args.warmup)

# ============================================================
# Device setup
# ============================================================
if args.device == "auto":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
    device = torch.device(args.device)

# ============================================================
# Energy measurement
# ============================================================
_nvml_handle = None
_energy_source = "proxy"


def _init_energy():
    """Select the power source: NVML when usable on CUDA, else a fixed proxy."""
    global _nvml_handle, _energy_source
    if device.type == "cuda":
        try:
            import pynvml
            pynvml.nvmlInit()
            # Honour an explicit --device cuda:N instead of always reading the
            # current default device.
            # NOTE(review): NVML and CUDA indices can differ when
            # CUDA_VISIBLE_DEVICES reorders devices - confirm if that matters.
            if device.index is not None:
                gpu_index = device.index
            else:
                gpu_index = torch.cuda.current_device()
            _nvml_handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
            _energy_source = "pynvml"
            return
        except Exception:
            pass
    _energy_source = "proxy"


_init_energy()


def _read_power_watts():
    """Return the current power draw in watts.

    Uses NVML (which reports milliwatts) when available; otherwise returns a
    fixed proxy value of 300 W on CUDA or 65 W on CPU.
    """
    if _energy_source == "pynvml":
        try:
            import pynvml
            return pynvml.nvmlDeviceGetPowerUsage(_nvml_handle) / 1000.0
        except Exception:
            pass
    return 300.0 if device.type == "cuda" else 65.0


def measure_joules(fn, iterations):
    """Time *iterations* calls of *fn* and estimate the energy used.

    Returns ``(ms_per_iteration, joules, watts)``. Power is sampled once,
    after the timed loop, and multiplied by the elapsed wall time.
    """
    if device.type == "cuda":
        torch.cuda.synchronize()
    t0 = time.perf_counter()
    for _ in range(iterations):
        fn()
    if device.type == "cuda":
        torch.cuda.synchronize()
    elapsed_s = time.perf_counter() - t0
    watts = _read_power_watts()
    return (elapsed_s / iterations) * 1000, watts * elapsed_s, watts


# ============================================================
# Hardware detection banner (prereq S2)
# ============================================================
def print_hardware_banner():
    """Print a boxed summary of the host CPU, RAM, GPU and precision modes."""
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    cpu_name = platform.processor() or platform.machine()
    cores_phys = psutil.cpu_count(logical=False)
    ram_gb = psutil.virtual_memory().total / 1e9

    if device.type == "cuda":
        # Query the device that is actually benchmarked, not always GPU 0.
        p = torch.cuda.get_device_properties(device)
        gpu_name = p.name
        vram_gb = p.total_memory / 1e9
        sm_count = p.multi_processor_count
        backend = "ROCm" if torch.version.hip else "CUDA"
    else:
        gpu_name = "N/A"
        vram_gb = 0.0
        sm_count = 0
        backend = "CPU"

    if device.type == "cuda":
        lp = "BF16" if torch.cuda.is_bf16_supported() else "FP16"
        tf32 = "ON" if torch.backends.cuda.matmul.allow_tf32 else "OFF"
    else:
        lp, tf32 = "FP32", "N/A"

    w = 74
    sep = "+" + "=" * (w - 2) + "+"

    def row(label, value):
        line = "| %-12s: %-*s |" % (label, w - 18, str(value)[:w - 18])
        return line

    print(sep)
    print("| %-*s |" % (w - 4, "ROLV Primitive(c) Universal Benchmark Harness"))
    print("| %-*s |" % (w - 4, "Copyright (c) 2025-2026 ROLV LLC - 3 Patents Pending"))
    print("| %-*s |" % (w - 4, "ROLV Primitive(c) - RSMT(TM) - ROLVswitch(TM)"))
    print("| %-*s |" % (w - 4, "https://rolv.ai | DOI: 10.5281/zenodo.19221455"))
    print(sep)
    print(row("Date/Time", now))
    print(row("CPU", cpu_name[:55]))
    print(row("Cores", cores_phys))
    print(row("RAM", "%.1f GB" % ram_gb))
    print(row("GPU", gpu_name[:55]))
    print(row("VRAM", "%.1f GB" % vram_gb))
    print(row("SM Count", sm_count))
    print(row("Backend", backend))
    print(row("Low Prec", lp))
    print(row("TF32", tf32))
    print(row("Energy src", _energy_source))
    print(sep)
    print()


print_hardware_banner()

# ============================================================
# ROLV Primitive(c) import
# ============================================================
try:
    from rolvprimitive import ROLVHybrid
    print(" ROLV Primitive(c) loaded OK\n")
except ImportError:
    print("""
 ERROR: ROLV Primitive(c) not found.
 Install with:
 pip install rolvprimitive-1.0.0-cp313-none-win_amd64.whl # Windows 3.13
 pip install rolvprimitive-1.0.0-cp311-none-win_amd64.whl # Windows 3.11
 pip install rolvprimitive-1.0.0-cp312-cp312-linux_x86_64.whl # Linux
 Download: https://github.com/rolv-ai/rolv-primitive/releases
 Free for research use. Commercial: rolv@rolv.ai
""")
    sys.exit(1)


# ============================================================
# Utility functions
# ============================================================
def sha256_first4mb(tensor):
    """Return the SHA-256 hex digest of the first 4 MiB of *tensor* as float32.

    Elements are taken in row-major order. Only the leading 1,048,576 elements
    are moved to the CPU and serialised, which yields the same bytes as
    serialising the whole tensor and truncating, without the full-size copy.
    """
    n_elems = (4 * 1024 * 1024) // 4  # float32 elements that fit in 4 MiB
    head = tensor.detach().reshape(-1)[:n_elems]
    arr = head.cpu().to(torch.float32).numpy()
    return hashlib.sha256(arr.tobytes()).hexdigest()


def error_metrics(Y_dense, Y_rolv):
    """Return ``(max_abs, mean_abs, max_rel_pct, mean_rel_pct)`` error.

    Relative errors are computed against ``|Y_dense|`` clamped to at least
    1e-8 and expressed in percent.
    """
    diff = (Y_dense - Y_rolv).abs()
    denom = Y_dense.abs().clamp(min=1e-8)
    return (diff.max().item(),
            diff.mean().item(),
            (diff / denom).max().item() * 100,
            (diff / denom).mean().item() * 100)


def atol_check(Y_dense, Y_rolv, threshold=0.05):
    """Compare outputs after scaling each column by the dense column norm.

    Returns ``"PASS"`` when the largest scaled difference is below
    *threshold*, otherwise ``"FAIL(max=...)"``.
    """
    col_norms = Y_dense.norm(dim=0, keepdim=True).clamp(min=1e-8)
    max_diff = ((Y_dense / col_norms) - (Y_rolv / col_norms)).abs().max().item()
    return "PASS" if max_diff < threshold else ("FAIL(max=%.4f)" % max_diff)


def perturbation_test(W, X, rolv_op):
    """Check that changing one non-zero weight changes the ROLV output hash.

    The first non-zero entry of *W* is bumped by 1e-3, a fresh operator is
    built from the perturbed matrix, and the output hashes are compared. *W*
    is restored before returning.
    """
    nz = W.nonzero(as_tuple=True)
    if len(nz[0]) == 0:
        return "SKIP(fully-dense)"
    i, j = nz[0][0].item(), nz[1][0].item()
    h_before = sha256_first4mb(rolv_op(X.T).T)
    W[i, j] += 1e-3
    try:
        op2 = ROLVHybrid(W, args.batch)
        h_after = sha256_first4mb(op2(X.T).T)
    finally:
        W[i, j] -= 1e-3
    return "PASS" if h_before != h_after else "FAIL"


def rsmt_threshold(dtype_bytes=4, index_bytes=8):
    """Return ``1 - dtype_bytes / (dtype_bytes + index_bytes)``."""
    return 1.0 - dtype_bytes / (dtype_bytes + index_bytes)


def disk_free_gb(path="/"):
    """Return free disk space in GB for the filesystem holding *path*.

    *path* may no longer exist (the harness calls this right after deleting
    its cache directory), so the nearest existing ancestor is probed instead
    of letting ``shutil.disk_usage`` raise.
    """
    probe = os.path.abspath(path)
    while not os.path.exists(probe):
        parent = os.path.dirname(probe)
        if parent == probe:
            break
        probe = parent
    return shutil.disk_usage(probe).free / 1e9


def clear_model_cache(tag, cache_dir):
    """Delete the harness's own download/cache directories and free memory.

    Only ``/tmp/rolv_gpu/<tag>`` and *cache_dir* are removed. The user-wide
    Hugging Face cache is deliberately left alone: it also holds the login
    token and models unrelated to this benchmark.
    """
    targets = ["/tmp/rolv_gpu/%s" % tag]
    if cache_dir:
        targets.append(cache_dir)
    for p in targets:
        if os.path.exists(p):
            try:
                shutil.rmtree(p)
            except Exception:
                # Best-effort cleanup; a leftover directory is not fatal.
                pass
    gc.collect()
    if device.type == "cuda":
        torch.cuda.empty_cache()


def sync():
    """Wait for queued CUDA work to finish; no-op on CPU."""
    if device.type == "cuda":
        torch.cuda.synchronize()


# ============================================================
# Vendor baselines
# ============================================================
def run_cusparse(W_sparse, X):
    """Benchmark ``torch.sparse.mm`` on a CSR copy of *W_sparse*.

    Returns ``(Y, ms_per_iter, joules, watts)`` on success, or
    ``(None, error_message, 0, 0)`` when not on CUDA or on failure.
    """
    if device.type != "cuda":
        return None, "not CUDA", 0, 0
    try:
        W_csr = W_sparse.to_sparse_csr()

        def fn():
            return torch.sparse.mm(W_csr, X.T).T

        # Same warm-up budget as the dense and ROLV paths.
        for _ in range(args.warmup):
            fn()
        sync()
        ms, j, w = measure_joules(fn, args.iterations)
        return fn(), ms, j, w
    except Exception as e:
        err = str(e)
        low = err.lower()
        # A bare "int" substring also matches words such as "pointer", so
        # look for overflow-specific markers and keep the raw error text.
        if "int_max" in low or "overflow" in low or "numeric_limits" in low:
            return (None,
                    "INT_MAX overflow - matrix too large for cuSPARSE (%s)" % err,
                    0, 0)
        return None, err, 0, 0


def run_scipy_csr(W_np, X_np):
    """Benchmark ``scipy.sparse.csr_matrix.dot`` as the CPU sparse baseline.

    Returns ``(Y, ms_per_iter, joules, watts)`` on success, or
    ``(None, error_message, 0, 0)`` on failure.
    """
    try:
        from scipy.sparse import csr_matrix
        W_csr = csr_matrix(W_np)

        def fn():
            return W_csr.dot(X_np.T).T

        for _ in range(args.warmup):
            fn()
        t0 = time.perf_counter()
        for _ in range(args.iterations):
            fn()
        ms = (time.perf_counter() - t0) / args.iterations * 1000
        watts = _read_power_watts()
        joules = watts * (ms / 1000) * args.iterations
        return torch.tensor(fn(), dtype=torch.float32), ms, joules, watts
    except Exception as e:
        return None, str(e), 0, 0


# ============================================================
# Results collection
# ============================================================
all_results = []


# ============================================================
# Core benchmark function
# ============================================================
def benchmark_layer(model_name, layer_name, W_orig, weight_source="REAL"):
    """Benchmark one weight matrix: dense vs. vendor sparse vs. ROLV.

    Prints a per-layer report, appends a result dict to ``all_results`` and
    returns that dict.

    Args:
        model_name: label used in the report and CSV.
        layer_name: label of the projection being benchmarked.
        W_orig: 2-D weight tensor; it is cloned, the caller's tensor is not
            modified.
        weight_source: free-form tag such as "REAL" or "SYNTHETIC".
    """
    W = W_orig.clone().to(device)
    rows, cols = W.shape
    batch = args.batch
    X = torch.randn(batch, cols, dtype=torch.float32, device=device)

    actual_sp = (W == 0).float().mean().item()
    if args.sparsity is not None:
        mask = torch.rand_like(W) < args.sparsity
        W[mask] = 0.0
        actual_sp = (W == 0).float().mean().item()

    active_rows = int((W.abs().sum(dim=1) != 0).sum().item())
    active_cols = int((W.abs().sum(dim=0) != 0).sum().item())
    flops_dense = 2 * rows * cols * batch
    flops_rolv = 2 * active_rows * active_cols * batch
    flops_pct = (1 - flops_rolv / max(flops_dense, 1)) * 100

    rsmt = rsmt_threshold()
    baseline_label = "cuSPARSE/CSR" if actual_sp >= rsmt else "cuBLAS/MKL"
    disk_gb = disk_free_gb()

    print(" +-- %s [%s] [%s]" % (model_name, layer_name, weight_source))
    print(" | Shape: %dx%d batch=%d sparsity=%.3f%%"
          % (rows, cols, batch, actual_sp * 100))
    print(" | Active rows: %d/%d FLOPs down: %.1f%%"
          % (active_rows, rows, flops_pct))
    print(" | RSMT(TM) threshold: %.1f%% -> Baseline: %s"
          % (rsmt * 100, baseline_label))
    print(" | ROLVswitch(TM): strategy selection active")
    print(" | [disk free: %.1f GB]" % disk_gb)

    hash_A = sha256_first4mb(W)
    hash_V = sha256_first4mb(X)
    print(" | hash_A (W): %s" % hash_A)
    print(" | hash_V (X): %s" % hash_V)

    # Dense baseline
    def dense_fn():
        return torch.mm(X, W.T)

    for _ in range(args.warmup):
        dense_fn()
    sync()
    dense_ms, dense_j, dense_w = measure_joules(dense_fn, args.iterations)
    Y_dense = dense_fn()
    hash_base = sha256_first4mb(Y_dense)
    gflops_vendor = flops_dense / (max(dense_ms, 1e-9) / 1000) / 1e9
    tok_vendor = batch * 1000 / max(dense_ms, 1e-9)

    # Sparse vendor baseline
    sp_out, sp_ms, sp_j, sp_w = None, None, None, None
    sp_label = "N/A"
    if device.type == "cuda":
        sp_result = run_cusparse(W, X)
        if sp_result[0] is not None:
            sp_out, sp_ms, sp_j, sp_w = sp_result
            sp_label = "cuSPARSE"
        else:
            sp_label = "cuSPARSE FAIL: %s" % sp_result[1]
            print(" | WARNING: %s" % sp_label)
    else:
        W_np = W.cpu().numpy()
        X_np = X.cpu().numpy()
        sp_result = run_scipy_csr(W_np, X_np)
        if sp_result[0] is not None:
            sp_out, sp_ms, sp_j, sp_w = sp_result
            sp_label = "scipy CSR"

    # ROLV Primitive(c)
    t_build0 = time.perf_counter()
    rolv_op = ROLVHybrid(W, batch)
    build_ms = (time.perf_counter() - t_build0) * 1000
    strategy = getattr(rolv_op, "_strategy", "auto")
    print(" | ROLVswitch(TM) selected strategy: %s" % strategy)
    print(" | build_ms: %.2f ms (one-time cost at model load - not per inference)"
          % build_ms)

    def rolv_fn():
        return rolv_op(X.T).T

    for _ in range(args.warmup):
        rolv_fn()
    sync()
    rolv_ms, rolv_j, rolv_w = measure_joules(rolv_fn, args.iterations)
    Y_rolv = rolv_fn()
    hash_rolv = sha256_first4mb(Y_rolv)

    # Total cost over full benchmark run
    # Dense: no build cost, just iterations
    # ROLV: build once + iterations
    rolv_total_ms = build_ms + rolv_ms * args.iterations
    dense_total_ms = dense_ms * args.iterations

    gflops_rolv = flops_rolv / (max(rolv_ms, 1e-9) / 1000) / 1e9
    tok_rolv = batch * 1000 / max(rolv_ms, 1e-9)
    tok_pct = (tok_rolv / max(tok_vendor, 1e-9) - 1) * 100
    ttft_pct = (1 - rolv_ms / max(dense_ms, 1e-9)) * 100
    energy_pct = (1 - rolv_j / max(dense_j, 1e-9)) * 100
    speedup_iter = dense_ms / max(rolv_ms, 1e-9)
    speedup_total = dense_total_ms / max(rolv_total_ms, 1e-9)
    speedup_pct = (speedup_iter - 1) * 100
    have_sparse = sp_ms is not None
    speedup_vs_sp = (sp_ms / max(rolv_ms, 1e-9)) if have_sparse else None

    # Hashes
    print(" | hash_baseline: %s" % hash_base)
    print(" | hash_ROLV: %s" % hash_rolv)

    # Error metrics (prereq S5.2)
    max_abs, mean_abs, max_rel, mean_rel = error_metrics(Y_dense, Y_rolv)
    atol = atol_check(Y_dense, Y_rolv)
    pert = perturbation_test(W.clone(), X, rolv_op)

    print(" |")
    print(" | Correctness:")
    print(" | max_abs_err : %.6f" % max_abs)
    print(" | mean_abs_err : %.6f" % mean_abs)
    print(" | max_rel_err%% : %.4f%%" % max_rel)
    print(" | mean_rel_err%% : %.4f%%" % mean_rel)
    print(" | ATOL check : %s" % atol)
    print(" | Perturbation : %s" % pert)
    print(" |")

    sp_ms_str = ("%.3f ms" % sp_ms) if have_sparse else "N/A"
    # The "total" row must show the whole-run time for the sparse baseline
    # as well, not its per-iteration time.
    sp_total_str = ("%.3f ms" % (sp_ms * args.iterations)) if have_sparse else "N/A"
    print(" | Speed | %-26s| %-26s| %-24s"
          % ("Dense (cuBLAS/MKL)", sp_label, "ROLV Primitive(c)"))
    print(" | ---------+---------------------------+---------------------------+-------------------------")
    print(" | ms/iter | %-26.3f| %-26s| %.3f" % (dense_ms, sp_ms_str, rolv_ms))
    print(" | total | %-26.3f| %-26s| %.3f"
          % (dense_total_ms, sp_total_str, rolv_total_ms))
    print(" | GFLOPs | %-26.2f| %-26s| %.2f" % (gflops_vendor, "N/A", gflops_rolv))
    print(" | tok/s | %-26.1f| %-26s| %.1f" % (tok_vendor, "N/A", tok_rolv))
    print(" | watts | %-26.1f| %-26s| %.1f" % (dense_w, "N/A", rolv_w))
    print(" |")
    print(" | ROLV vs Dense:")
    print(" | Speedup (iter) : %.2fx (%.1f%%)" % (speedup_iter, speedup_pct))
    print(" | Speedup (total) : %.2fx (build amortized: %.1f ms / %d iters)"
          % (speedup_total, build_ms, args.iterations))
    print(" | Energy saved : %.1f%%" % energy_pct)
    print(" | FLOPs saved : %.1f%%" % flops_pct)
    print(" | Tok/s gain : %.1f%%" % tok_pct)
    print(" | TTFT reduction : %.1f%%" % ttft_pct)
    if have_sparse:
        sp_e_pct = (1 - rolv_j / max(sp_j, 1e-9)) * 100
        print(" |")
        print(" | ROLV vs %s:" % sp_label)
        print(" | Speedup (iter) : %.2fx" % speedup_vs_sp)
        print(" | Energy saved : %.1f%%" % sp_e_pct)
    print(" +------------------------------------------------------------------")
    print(" NOTE: tok/s = single-layer throughput proxy")
    print(" Full model tok/s is lower by ~1/(layers x ops per layer)")
    print()

    result = {
        "model": model_name,
        "layer": layer_name,
        "source": weight_source,
        "shape": "%dx%d" % (rows, cols),
        "sparsity_pct": "%.1f%%" % (actual_sp * 100),
        "strategy": str(strategy),
        "dense_ms": "%.3f" % dense_ms,
        "sparse_ms": ("%.3f" % sp_ms) if have_sparse else "N/A",
        "rolv_ms": "%.3f" % rolv_ms,
        "build_ms": "%.2f" % build_ms,
        "speedup_iter": "%.2f" % speedup_iter,
        "speedup_pct": "%.1f" % speedup_pct,
        "speedup_vs_sp": ("%.2f" % speedup_vs_sp) if have_sparse else "N/A",
        "energy_pct": "%.1f" % energy_pct,
        "flops_pct": "%.1f" % flops_pct,
        "tok_pct": "%.1f" % tok_pct,
        "ttft_pct": "%.1f" % ttft_pct,
        "atol": atol,
        "perturbation": pert,
        "hash_A": hash_A,
        "hash_V": hash_V,
        "hash_baseline": hash_base,
        "hash_ROLV": hash_rolv,
        "max_abs_err": "%.6f" % max_abs,
        "mean_abs_err": "%.6f" % mean_abs,
        "max_rel_err_pct": "%.4f" % max_rel,
        "mean_rel_err_pct": "%.4f" % mean_rel,
    }
    all_results.append(result)
    return result


# ============================================================
# Model weight extractors
# ============================================================
def extract_moe_weights(model, layer_names):
    """
    MoE sparsity comes from ROUTING not individual weight zeros.
    OLMoE: 64 experts stacked, top-8 routing = 87.5% row sparsity.
    We build the full stacked matrix and zero inactive expert rows.

    Expert weights are taken from the first place they appear in
    ``model.named_parameters()``:

    * fused 3-D tensors ``[num_experts, rows, cols]`` are used directly
      (a ``gate_up_proj`` tensor is split in half along ``rows``);
    * per-expert 2-D tensors are collected for the first MoE block only and
      stacked into one 3-D tensor.

    The ``num_experts - top_k`` experts with the smallest norm are then zeroed
    to simulate routing.

    Returns a list of ``(display_name, layer_name, W_stack)`` tuples, one per
    requested layer name that was found; empty when nothing matched.
    """
    fused = {}       # layer name -> 3-D tensor
    per_expert = {}  # layer name -> (MoE block prefix, [2-D tensors])

    def _taken(lname):
        return lname in fused or lname in per_expert

    for full_name, param in model.named_parameters():
        parts = full_name.split(".")
        expert_pos = next(
            (i for i, part in enumerate(parts) if part in ("experts", "expert")),
            None)
        if expert_pos is None:
            continue
        w = param.data.float().cpu()
        leaf = parts[-1]
        if w.dim() == 3:
            rows = w.shape[1]
            if "gate_up_proj" in leaf:
                half = rows // 2
                if not _taken("gate_proj"):
                    fused["gate_proj"] = w[:, :half, :].clone()
                if not _taken("up_proj"):
                    fused["up_proj"] = w[:, half:, :].clone()
            else:
                for lname in layer_names:
                    if lname in leaf and not _taken(lname):
                        fused[lname] = w.clone()
                        break
        elif w.dim() == 2:
            # Everything before the "experts" component identifies the MoE
            # block; only experts of the first block seen are stacked.
            block_prefix = ".".join(parts[:expert_pos])
            for lname in layer_names:
                if lname not in full_name or lname in fused:
                    continue
                if lname not in per_expert:
                    per_expert[lname] = (block_prefix, [w])
                    break
                if per_expert[lname][0] == block_prefix:
                    per_expert[lname][1].append(w)
                    break

    stacked = dict(fused)
    for lname, (_prefix, mats) in per_expert.items():
        if all(m.shape == mats[0].shape for m in mats):
            stacked[lname] = torch.stack(mats)  # stack() copies the data
        else:
            # Unexpected mixed shapes: fall back to the first expert only.
            stacked[lname] = mats[0].unsqueeze(0).clone()

    if not stacked:
        return []

    # Determine top-k routing
    top_k = 8
    try:
        cfg = model.config
        top_k = int(getattr(cfg, "num_experts_per_tok",
                            getattr(cfg, "top_k",
                                    getattr(cfg, "num_selected_experts", 8))))
    except (AttributeError, TypeError, ValueError):
        pass

    weights = []
    for lname in layer_names:
        if lname not in stacked:
            continue
        w3d = stacked[lname]
        num_experts, expert_rows, cols = w3d.shape
        # Stack all experts into one matrix [num_experts*expert_rows, cols]
        W_stack = w3d.reshape(num_experts * expert_rows, cols).clone()
        # Zero out inactive expert rows (simulate routing sparsity)
        norms = w3d.norm(dim=(1, 2))
        # Clamp at zero: a negative count would be read as a from-the-end
        # slice and zero the wrong experts when top_k > num_experts.
        n_inactive = max(num_experts - top_k, 0)
        inactive = norms.argsort()[:n_inactive]
        for idx in inactive.tolist():
            W_stack[idx * expert_rows:(idx + 1) * expert_rows, :] = 0.0
        disp = "%s [%d experts top-%d]" % (
            model.__class__.__name__, num_experts, top_k)
        weights.append((disp, lname, W_stack))
    return weights


# NOTE(review): DEEPSEEK_SHAPES and INT_MAX_NOTE are used by
# run_deepseek_shapes() but were not defined anywhere in the file as received,
# so "--model deepseek-shapes" failed with NameError. The values below are a
# reconstruction and must be confirmed against the intended configuration:
# the shapes are single-expert (rows, cols) = (out_features, in_features)
# taken from the public DeepSeek-V3 config (hidden_size=7168,
# moe_intermediate_size=2048).
DEEPSEEK_SHAPES = {
    "gate_proj": (2048, 7168),
    "up_proj": (2048, 7168),
    "down_proj": (7168, 2048),
}

INT_MAX_NOTE = (
    " NOTE: cuSPARSE can fail with an INT_MAX overflow on very large\n"
    " matrices. Such failures are reported as a WARNING and the sparse\n"
    " baseline is skipped for that layer.\n"
)


def run_deepseek_shapes():
    """Benchmark random sparse matrices with the shapes in DEEPSEEK_SHAPES."""
    print(INT_MAX_NOTE)
    # Default 95% sparsity for synthetic - ensures positive speedup
    # Real model weights show speedup at 87.5% because MoE routing
    # zeros out entire expert rows structurally, not randomly.
    # "is not None" so that an explicit --sparsity 0.0 is honoured.
    sp = args.sparsity if args.sparsity is not None else 0.95
    for lname in args.layers:
        if lname not in DEEPSEEK_SHAPES:
            continue
        rows, cols = DEEPSEEK_SHAPES[lname]
        W = torch.zeros(rows, cols)
        mask = torch.rand(rows, cols) > sp
        W[mask] = torch.randn(mask.sum().item())
        benchmark_layer("DeepSeek-V3 [shapes]", lname, W, "SYNTHETIC")


# ============================================================
# HuggingFace model runner
# ============================================================
def run_hf_model(hf_id, display_name):
    """Download *hf_id*, benchmark its expert weights, then clean up.

    Load failures are printed and cause an early return rather than an
    exception. Unless ``--no-cleanup`` is given, the download cache is removed
    afterwards.
    """
    cache_dir = args.cache_dir or "/tmp/rolv_hf_cache"
    os.makedirs(cache_dir, exist_ok=True)
    free_gb = disk_free_gb(cache_dir)
    print(" [disk free: %.1f GB before download]" % free_gb)
    print(" Downloading %s ..." % hf_id)

    is_deepseek = "deepseek" in hf_id.lower()
    try:
        cfg = AutoConfig.from_pretrained(
            hf_id, trust_remote_code=True, cache_dir=cache_dir)
        # Patch 4: DeepSeek rope_scaling fix
        if is_deepseek and hasattr(cfg, "rope_scaling"):
            cfg.rope_scaling = None
        # Force eager attention on config before loading
        try:
            cfg._attn_implementation = "eager"
            cfg._attn_implementation_autoset = False
        except Exception:
            pass
        model = AutoModelForCausalLM.from_pretrained(
            hf_id,
            config=cfg,
            torch_dtype=torch.float32,
            device_map="cpu",
            trust_remote_code=True,
            attn_implementation="eager",
            cache_dir=cache_dir,
        )
        model.eval()
    except Exception as e:
        err = str(e)
        if "flash_attn" in err:
            print(" flash_attn error detail:")
            traceback.print_exc()
            print(" Retrying without trust_remote_code ...")
            try:
                model = AutoModelForCausalLM.from_pretrained(
                    hf_id,
                    torch_dtype=torch.float32,
                    device_map="cpu",
                    trust_remote_code=False,
                    attn_implementation="eager",
                    cache_dir=cache_dir,
                )
                model.eval()
            except Exception as e2:
                print(" ERROR: %s" % e2)
                traceback.print_exc()
                return
        else:
            print(" ERROR: Failed to load %s: %s" % (hf_id, e))
            return

    weights = extract_moe_weights(model, args.layers)
    if not weights:
        print(" ERROR: No MoE expert weights found for layers: %s" % args.layers)
        print(" First 30 parameter names in model:")
        for i, (n, p) in enumerate(model.named_parameters()):
            if i >= 30:
                break
            print(" %s shape=%s" % (n, list(p.shape)))
        del model
        gc.collect()
        return

    print(" Found %d expert weight tensors" % len(weights))
    for display, lname, W in weights:
        benchmark_layer(display_name, lname, W, "REAL")

    del model, weights
    gc.collect()
    if device.type == "cuda":
        torch.cuda.empty_cache()

    if not args.no_cleanup:
        print(" Cleaning up model cache ...")
        clear_model_cache(display_name.replace(" ", "_"), cache_dir)
        print(" [disk free after cleanup: %.1f GB]" % disk_free_gb(cache_dir))


# ============================================================
# Model selection
# ============================================================
def run_selected_model(key):
    """Dispatch one ``--model`` entry: a known alias or a raw HF id / path."""
    # Lower-case only for alias matching. Repo ids and local paths are passed
    # on exactly as typed.
    alias = key.lower()
    if alias == "deepseek-shapes":
        run_deepseek_shapes()
    elif alias == "auto":
        run_deepseek_shapes()
        run_hf_model("allenai/OLMoE-1B-7B-0924", "OLMoE-1B-7B")
    elif alias in KNOWN_MODELS and KNOWN_MODELS[alias]:
        run_hf_model(KNOWN_MODELS[alias], alias)
    else:
        run_hf_model(key, key.split("/")[-1])


# ============================================================
# Run
# ============================================================
model_keys = args.model.split(",")
failed_models = []
for mk in model_keys:
    mk = mk.strip()
    if not mk:
        continue
    try:
        run_selected_model(mk)
    except Exception:
        # Keep going so results already collected still reach the summary and
        # the CSV; the failure is reported through the exit status at the end.
        print(" ERROR: benchmark for %s aborted:" % mk)
        traceback.print_exc()
        failed_models.append(mk)

# ============================================================
# Final summary table (prereq S10.2)
# ============================================================
if all_results:
    try:
        all_results.sort(key=lambda r: float(r["speedup_pct"]), reverse=True)
    except (KeyError, ValueError):
        pass

    print()
    print("+========================================================================+")
    print("| FINAL SUMMARY - ROLV Primitive(c) |")
    print("| Copyright (c) 2025-2026 ROLV LLC - 3 Patents Pending |")
    print("+================================+======+===========+==========+==========+======+")
    print("| Model - Layer | sp% | Vendor ms | ROLV ms | Speedup | ATOL |")
    print("+================================+======+===========+==========+==========+======+")
    for r in all_results:
        name = ("%-20s %-8s" % (r["model"][:20], r["layer"][:8]))
        print("| %-32s | %4s | %9s | %8s | %4sx %4s%% | %4s |" % (
            name,
            r["sparsity_pct"].replace("%", ""),
            r["dense_ms"],
            r["rolv_ms"],
            r["speedup_iter"],
            r["speedup_pct"],
            r["atol"][:4],
        ))
    print("+================================+======+===========+==========+==========+======+")
    print()
    print(" Energy% FLOPs% Tok/s% TTFT% -- all vs dense baseline")
    for r in all_results:
        print(" %-30s energy: %6s%% flops: %6s%% tok/s: %6s%% ttft: %6s%% pert: %s" % (
            ("%s %s" % (r["model"][:18], r["layer"][:8])),
            r["energy_pct"],
            r["flops_pct"],
            r["tok_pct"],
            r["ttft_pct"],
            r["perturbation"][:4],
        ))
    print()
    print(" Share your results:")
    print(" GitHub : https://github.com/rolv-ai/rolv-primitive")
    print(" Reddit : r/LocalLLaMA r/MachineLearning")
    print(" Paper : https://doi.org/10.5281/zenodo.19221455")
    print(" Contact : rolv@rolv.ai")
    print()
    print(" Free for research use. Commercial: rolv@rolv.ai")
    print(" ROLV Primitive(c) - RSMT(TM) - ROLVswitch(TM)")
    print(" Copyright (c) 2025-2026 ROLV LLC. All rights reserved.")
    print()

# ============================================================
# CSV output (prereq S12)
# ============================================================
if all_results:
    csv_path = args.output_csv
    fieldnames = list(all_results[0].keys())
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(all_results)
    print(" CSV saved: %s" % csv_path)
    print()

# Preserve a non-zero exit status when any requested model aborted.
if failed_models:
    sys.exit(1)