# rolv-primitive / benchmark.py
# Uploaded by: rolvai
# Commit message: Upload benchmark.py
# Commit: 44d2fe2 (verified)
# ROLV Primitive(c) Universal Benchmark Harness
# Copyright (c) 2025-2026 ROLV LLC. All rights reserved. 3 Patents Pending.
# ROLV Primitive(c) - RSMT(TM) - ROLVswitch(TM)
# https://rolv.ai | DOI: 10.5281/zenodo.19221455
#
# Conforms to: ROLV Benchmark Harness Prerequisites & Standards v2.0
#
# Usage:
# python benchmark.py --model deepseek-shapes
# python benchmark.py --model olmoe
# python benchmark.py --model mixtral-8x7b
# python benchmark.py --model YOUR_HF_MODEL_ID
# python benchmark.py --model olmoe --iterations 2000 --batch 2000
#
# For gated models: run 'hf auth login' first
#
# Rolv Eitrem Heggenhougen - ROLV LLC - 445 NE 12th Ave - Fort Lauderdale FL 33301
# rolv@rolv.ai - https://rolv.ai
# ============================================================
# FLASH ATTN STUB - must run before any other import
# Writes a real package to site-packages so all transformers
# import-time checks find it immediately. Never actually called
# because all benchmarks use attn_implementation='eager'.
# ============================================================
import sys
import os
import types
import importlib.util
import site
import pathlib
def _install_flash_attn_stub(site_dir=None):
    """Install a do-nothing ``flash_attn`` stand-in on disk and in ``sys.modules``.

    The stub exists only so that import-time availability checks succeed; its
    callables all return ``None``.

    Args:
        site_dir: Directory that should receive the ``flash_attn`` package.
            Defaults to the first entry of ``site.getsitepackages()``, falling
            back to the user site-packages directory.

    The on-disk stub is written only when it would not clobber something else:
    if ``flash_attn/__init__.py`` already exists with content different from
    the stub, it is treated as a genuine install and left untouched. A
    read-only target directory produces a warning instead of aborting the
    script. The in-memory ``sys.modules`` entries are registered either way.
    """
    if site_dir is None:
        try:
            site_dir = site.getsitepackages()[0]
        except Exception:
            site_dir = site.getusersitepackages()
    stub_dir = pathlib.Path(site_dir) / "flash_attn"
    init_path = stub_dir / "__init__.py"
    init_src = (
        '__version__ = "2.6.0"\n'
        'flash_attn_func = lambda *a, **kw: None\n'
        'flash_attn_varlen_func = lambda *a, **kw: None\n'
        'flash_attn_varlen_qkvpacked_func = lambda *a, **kw: None\n'
        'flash_attn_with_kvcache = lambda *a, **kw: None\n'
        'flash_attn_varlen_kvpacked_func = lambda *a, **kw: None\n'
        'flash_attn_qkvpacked_func = lambda *a, **kw: None\n'
        'FlashAttention = type("FlashAttention", (), {})\n'
        'FlashAttention2 = type("FlashAttention2", (), {})\n'
        'def __getattr__(name): return lambda *a, **kw: None\n'
    )
    sub_src = "flash_attn_func = lambda *a, **kw: None\n"

    # An existing __init__.py that is not our stub belongs to someone else
    # (most likely a real flash-attn install): never overwrite it.
    try:
        foreign_install = init_path.is_file() and init_path.read_text() != init_src
    except (OSError, ValueError):
        foreign_install = True

    if not foreign_install:
        try:
            stub_dir.mkdir(parents=True, exist_ok=True)
            init_path.write_text(init_src)
            for sub in ["flash_attn_interface", "bert_padding",
                        "flash_attn_triton", "flash_attn_cuda"]:
                (stub_dir / (sub + ".py")).write_text(sub_src)
            mha_dir = stub_dir / "modules"
            mha_dir.mkdir(exist_ok=True)
            (mha_dir / "__init__.py").write_text("")
            (mha_dir / "mha.py").write_text("class MHA: pass\n")
        except OSError as exc:
            # Read-only site-packages: the sys.modules entries below still apply.
            print(" [warn] could not write flash_attn stub to %s: %s"
                  % (stub_dir, exc))

    # Also inject into sys.modules
    for name in [
        "flash_attn",
        "flash_attn.flash_attn_interface",
        "flash_attn.bert_padding",
        "flash_attn.modules",
        "flash_attn.modules.mha",
        "flash_attn.flash_attn_triton",
        "flash_attn.flash_attn_cuda",
    ]:
        if name not in sys.modules:
            m = types.ModuleType(name)
            try:
                m.__spec__ = importlib.util.spec_from_loader(name, loader=None)
            except Exception:
                pass
            m.__version__ = "2.6.0"
            m.flash_attn_func = lambda *a, **kw: None
            m.flash_attn_varlen_func = lambda *a, **kw: None
            m.flash_attn_varlen_qkvpacked_func = lambda *a, **kw: None
            m.flash_attn_with_kvcache = lambda *a, **kw: None
            sys.modules[name] = m
_install_flash_attn_stub()

# Best-effort early patching of transformers' import utilities. Any failure
# (including transformers not being installed yet) is deliberately ignored.
try:
    import transformers.utils.import_utils as _early_tiu
    if hasattr(_early_tiu, "PACKAGE_DISTRIBUTION_MAPPING"):
        _early_tiu.PACKAGE_DISTRIBUTION_MAPPING["flash_attn"] = ["flash-attn-stub"]
    for _attr in ("is_flash_attn_2_available", "is_flash_attn_3_available",
                  "is_flash_attn_4_available", "is_flash_attn_available",
                  "flash_attn_supports_top_left_mask"):
        if not hasattr(_early_tiu, _attr):
            continue
        # Replace each availability probe with a callable that reports False.
        setattr(_early_tiu, _attr, lambda *a, **kw: False)
except Exception:
    pass

# Same idea for the flash-attention utils module, when it can be imported.
try:
    import transformers.modeling_flash_attention_utils as _mfau
    _mfau.flash_attn_supports_top_left_mask = lambda: False
    _mfau._use_top_left_mask = False
except Exception:
    pass

# Environment switches set for the remainder of the process.
os.environ.update({
    "TRANSFORMERS_NO_FLASH_ATTN": "1",
    "FLASH_ATTENTION_SKIP_CUDA_BUILD": "1",
})
# ============================================================
# Standard library
# ============================================================
import argparse
import csv
import gc
import hashlib
import shutil
import subprocess
import time
import traceback
# ============================================================
# Dependency auto-installer
# ============================================================
def pip_install(*pkgs, upgrade=False):
    """Run ``pip install -q`` for *pkgs* using the current interpreter.

    Args:
        *pkgs: Requirement strings handed to pip unchanged.
        upgrade: When true, ``--upgrade`` is added to the command line.

    A non-zero pip exit status is reported as a warning; it is not raised.
    """
    extra_flags = ["--upgrade"] if upgrade else []
    cmd = [sys.executable, "-m", "pip", "install", "-q", *extra_flags, *pkgs]
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError as e:
        print(" [warn] pip install failed for %s: %s" % (pkgs, e))
print(" Installing / upgrading required packages ...")
pip_install("torch", "numpy", "scipy", "psutil")
pip_install("transformers", "accelerate", "huggingface_hub", upgrade=True)
pip_install("einops", "tqdm")
# Optional power-monitoring bindings: any failure here is ignored.
for _optional_pkg in ("pynvml", "pyrsmi"):
    try:
        pip_install(_optional_pkg)
    except Exception:
        pass
import numpy as np
import psutil
import platform
import torch
# ============================================================
# Transformers patches (prereq S4) - applied before any load
# ============================================================
def apply_transformers_patches():
    """Monkey-patch transformers so flash-attn / mamba probes report "absent".

    Works on ``transformers.utils.import_utils``, on every module in
    ``sys.modules`` whose name contains "transformers", and registers empty
    placeholder modules for ``mamba_ssm`` / ``causal_conv1d``.
    """
    import transformers.utils.import_utils as _tiu

    def _force_false(names):
        # Swap each existing probe on _tiu for a callable returning False.
        for probe in names:
            if hasattr(_tiu, probe):
                setattr(_tiu, probe, lambda *a, **kw: False)

    # Patch 1: is_torch_fx_available removed in >=4.50
    if not hasattr(_tiu, "is_torch_fx_available"):
        _tiu.is_torch_fx_available = lambda: False

    # Patch 2: flash_attn availability - force False everywhere
    _force_false((
        "is_flash_attn_2_available",
        "is_flash_attn_greater_or_equal_2_10",
        "is_flash_attn_greater_or_equal",
        "is_flash_attn_available",
    ))
    if hasattr(_tiu, "PACKAGE_DISTRIBUTION_MAPPING"):
        # Keep the key but point to a dummy package name so lookups succeed
        _tiu.PACKAGE_DISTRIBUTION_MAPPING["flash_attn"] = ["flash-attn-stub"]
    # Patch is_flash_attn_4_available which is new in transformers >=4.50
    _force_false((
        "is_flash_attn_4_available",
        "is_flash_attn_3_available",
        "flash_attn_supports_top_left_mask",
    ))

    # Patch modeling_flash_attention_utils directly
    try:
        import transformers.modeling_flash_attention_utils as _mfau
        _mfau.flash_attn_supports_top_left_mask = lambda: False
        _mfau._use_top_left_mask = False
    except Exception:
        pass

    # Patch hub_kernels / flash_attention integration
    try:
        import transformers.integrations.flash_attention as _fa
        _fa.flash_attention_forward = lambda *a, **kw: None
    except Exception:
        pass

    # Patch all loaded transformers modules
    loaded_probes = (
        "is_flash_attn_2_available",
        "is_flash_attn_available",
        "is_flash_attn_greater_or_equal_2_10",
        "is_flash_attn_greater_or_equal",
    )
    for mod_name in list(sys.modules.keys()):
        if "transformers" not in mod_name:
            continue
        mod = sys.modules[mod_name]
        for fa_attr in loaded_probes:
            # Each attribute is guarded on its own; a failure skips only that one.
            try:
                if hasattr(mod, fa_attr):
                    setattr(mod, fa_attr, lambda *a, **kw: False)
            except Exception:
                pass

    # Patch 3: mamba_ssm / causal_conv1d mock for Jamba
    placeholder_pkgs = (
        "mamba_ssm", "causal_conv1d",
        "mamba_ssm.ops",
        "mamba_ssm.ops.selective_scan_interface",
        "causal_conv1d.causal_conv1d_interface",
    )
    for pkg in placeholder_pkgs:
        if pkg in sys.modules:
            continue
        placeholder = types.ModuleType(pkg)
        placeholder.__spec__ = importlib.util.spec_from_loader(pkg, loader=None)
        placeholder.__version__ = "1.0.0"
        sys.modules[pkg] = placeholder
    if hasattr(_tiu, "is_causal_conv1d_available"):
        _tiu.is_causal_conv1d_available = lambda: False


apply_transformers_patches()
from transformers import AutoConfig, AutoModelForCausalLM
# ============================================================
# Argument parsing
# ============================================================
# Short aliases accepted by --model. A value of None marks an alias that is
# handled specially by the model-selection code, not loaded by HF ID.
KNOWN_MODELS = {
    "olmoe": "allenai/OLMoE-1B-7B-0924",
    "mixtral-8x7b": "mistralai/Mixtral-8x7B-v0.1",
    "mixtral-8x22b": "mistralai/Mixtral-8x22B-v0.1",
    "phi35moe": "microsoft/Phi-3.5-MoE-instruct",
    "deepseek-moe": "deepseek-ai/deepseek-moe-16b-base",
    "jamba": "ai21labs/Jamba-1.5-Mini",
    "qwen2moe": "Qwen/Qwen1.5-MoE-A2.7B",
    "deepseek-shapes": None,
    "auto": None,
}
parser = argparse.ArgumentParser(
    description="ROLV Primitive(c) Universal Benchmark - rolv.ai")
parser.add_argument("--model", default="olmoe",
                    help="Model to benchmark. Options: %s or any HF model ID" %
                    ", ".join(KNOWN_MODELS))
parser.add_argument("--device", default="auto",
                    help="Device: auto | cpu | cuda | cuda:0 (default: auto)")
parser.add_argument("--iterations", type=int, default=1000,
                    help="Timed iterations per measurement, >= 1 (default: 1000)")
parser.add_argument("--batch", type=int, default=1000,
                    help="Rows of the random input matrix, >= 1 (default: 1000)")
parser.add_argument("--warmup", type=int, default=20,
                    help="Untimed warm-up iterations; values below 20 are raised to 20")
parser.add_argument("--layers", nargs="+",
                    default=["gate_proj", "up_proj", "down_proj"],
                    help="Layer names to benchmark")
parser.add_argument("--sparsity", type=float, default=None,
                    help="Fraction in [0, 1] of weights to zero at random")
parser.add_argument("--cache-dir", default=None,
                    help="Directory for model downloads")
parser.add_argument("--no-cleanup", action="store_true",
                    help="Keep downloaded model files after the run")
parser.add_argument("--output-csv", default="rolv_results.csv",
                    help="Path of the CSV results file (default: rolv_results.csv)")
args = parser.parse_args()
# Reject values that would otherwise fail deep inside a benchmark run:
# per-iteration timings divide by --iterations.
if args.iterations < 1:
    parser.error("--iterations must be >= 1")
if args.batch < 1:
    parser.error("--batch must be >= 1")
if args.sparsity is not None and not 0.0 <= args.sparsity <= 1.0:
    parser.error("--sparsity must be between 0 and 1")
# Warm-up has a floor of 20 iterations regardless of what was requested.
args.warmup = max(20, args.warmup)
# ============================================================
# Device setup
# ============================================================
# An explicit --device wins; "auto" prefers CUDA when torch reports it available.
if args.device != "auto":
    device = torch.device(args.device)
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# ============================================================
# Energy measurement
# ============================================================
# NVML device handle (None when NVML is not in use) and the label of the
# power source: "pynvml" for real readings, "proxy" for fixed estimates.
_nvml_handle = None
_energy_source = "proxy"


def _init_energy():
    """Select the power source and, on CUDA, open an NVML device handle.

    The handle is requested for the GPU named by ``device`` (e.g. the ``1`` in
    ``cuda:1``); only when ``device`` carries no index is torch's current
    device used. Any NVML failure falls back to the "proxy" source.
    """
    global _nvml_handle, _energy_source
    if device.type == "cuda":
        try:
            import pynvml
            pynvml.nvmlInit()
            gpu_index = device.index
            if gpu_index is None:
                gpu_index = torch.cuda.current_device()
            # NOTE(review): NVML enumerates physical GPUs; with
            # CUDA_VISIBLE_DEVICES set this index may not match - confirm.
            _nvml_handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
            _energy_source = "pynvml"
            return
        except Exception:
            pass
    _energy_source = "proxy"


_init_energy()
def _read_power_watts():
    """Return a power figure in watts.

    With the "pynvml" source, the NVML power-usage reading divided by 1000 is
    returned. If that read fails, or the source is "proxy", a fixed figure is
    used: 300.0 for CUDA devices and 65.0 otherwise.
    """
    if _energy_source == "pynvml":
        try:
            import pynvml
            reading = pynvml.nvmlDeviceGetPowerUsage(_nvml_handle)
            return reading / 1000.0
        except Exception:
            pass
    if device.type == "cuda":
        return 300.0
    return 65.0
def measure_joules(fn, iterations):
    """Time *iterations* calls of *fn* and estimate the energy they used.

    Args:
        fn: Zero-argument callable to execute.
        iterations: Number of times *fn* is called.

    Returns:
        Tuple ``(ms_per_iteration, joules, watts)``. ``watts`` is a single
        reading taken after the loop; ``joules`` is that reading multiplied
        by the total elapsed seconds.
    """
    on_cuda = device.type == "cuda"
    if on_cuda:
        torch.cuda.synchronize()
    started = time.perf_counter()
    for _ in range(iterations):
        fn()
    if on_cuda:
        # Wait for queued GPU work so the wall-clock interval covers it.
        torch.cuda.synchronize()
    elapsed_s = time.perf_counter() - started
    watts = _read_power_watts()
    ms_per_iter = (elapsed_s / iterations) * 1000
    joules = watts * elapsed_s
    return ms_per_iter, joules, watts
# ============================================================
# Hardware detection banner (prereq S2)
# ============================================================
def print_hardware_banner():
    """Print a boxed summary of the host and the selected compute device.

    GPU properties are queried for the device chosen on the command line
    (``device``), so ``--device cuda:1`` reports GPU 1, not GPU 0.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    cpu_name = platform.processor() or platform.machine()
    cores_phys = psutil.cpu_count(logical=False)
    ram_gb = psutil.virtual_memory().total / 1e9
    if device.type == "cuda":
        p = torch.cuda.get_device_properties(device)
        gpu_name = p.name
        vram_gb = p.total_memory / 1e9
        sm_count = p.multi_processor_count
        # torch.version.hip is set on ROCm builds of torch.
        backend = "ROCm" if torch.version.hip else "CUDA"
        lp = "BF16" if torch.cuda.is_bf16_supported() else "FP16"
        tf32 = "ON" if torch.backends.cuda.matmul.allow_tf32 else "OFF"
    else:
        gpu_name = "N/A"
        vram_gb = 0.0
        sm_count = 0
        backend = "CPU"
        lp, tf32 = "FP32", "N/A"
    w = 74  # total banner width in characters
    sep = "+" + "=" * (w - 2) + "+"

    def row(label, value):
        # "| label: value |" padded (and value truncated) to exactly w columns.
        return "| %-12s: %-*s |" % (label, w - 18, str(value)[:w - 18])

    print(sep)
    for title in (
        "ROLV Primitive(c) Universal Benchmark Harness",
        "Copyright (c) 2025-2026 ROLV LLC - 3 Patents Pending",
        "ROLV Primitive(c) - RSMT(TM) - ROLVswitch(TM)",
        "https://rolv.ai | DOI: 10.5281/zenodo.19221455",
    ):
        print("| %-*s |" % (w - 4, title))
    print(sep)
    print(row("Date/Time", now))
    print(row("CPU", cpu_name[:55]))
    print(row("Cores", cores_phys))
    print(row("RAM", "%.1f GB" % ram_gb))
    print(row("GPU", gpu_name[:55]))
    print(row("VRAM", "%.1f GB" % vram_gb))
    print(row("SM Count", sm_count))
    print(row("Backend", backend))
    print(row("Low Prec", lp))
    print(row("TF32", tf32))
    print(row("Energy src", _energy_source))
    print(sep)
    print()


print_hardware_banner()
# ============================================================
# ROLV Primitive(c) import
# ============================================================
# The benchmark cannot run without the ROLV package: print install help and
# exit with status 1 when the import fails.
try:
    from rolvprimitive import ROLVHybrid
except ImportError:
    print("""
ERROR: ROLV Primitive(c) not found.
Install with:
pip install rolvprimitive-1.0.0-cp313-none-win_amd64.whl # Windows 3.13
pip install rolvprimitive-1.0.0-cp311-none-win_amd64.whl # Windows 3.11
pip install rolvprimitive-1.0.0-cp312-cp312-linux_x86_64.whl # Linux
Download: https://github.com/rolv-ai/rolv-primitive/releases
Free for research use. Commercial: rolv@rolv.ai
""")
    sys.exit(1)
else:
    print(" ROLV Primitive(c) loaded OK\n")
# ============================================================
# Utility functions
# ============================================================
def sha256_first4mb(tensor):
    """Return the SHA-256 hex digest of the first 4 MiB of *tensor* as float32.

    Elements are taken in row-major order and converted to float32 before
    hashing. Only the leading 1,048,576 elements (4 MiB of float32) are moved
    to the host and serialised, so large tensors are not copied in full; the
    digest equals that of hashing the full float32 byte string truncated to
    4 MiB.
    """
    max_bytes = 4 * 1024 * 1024
    max_elems = max_bytes // 4  # float32 is 4 bytes per element
    head = tensor.detach().reshape(-1)[:max_elems]
    raw = head.cpu().to(torch.float32).contiguous().numpy().tobytes()
    return hashlib.sha256(raw).hexdigest()
def error_metrics(Y_dense, Y_rolv):
    """Compare two outputs element-wise.

    Returns:
        Tuple ``(max_abs, mean_abs, max_rel_pct, mean_rel_pct)``. Relative
        errors are absolute errors divided by ``|Y_dense|`` (clamped to at
        least 1e-8) and expressed in percent.
    """
    abs_err = (Y_dense - Y_rolv).abs()
    rel_err = abs_err / Y_dense.abs().clamp(min=1e-8)
    max_abs = abs_err.max().item()
    mean_abs = abs_err.mean().item()
    max_rel_pct = rel_err.max().item() * 100
    mean_rel_pct = rel_err.mean().item() * 100
    return (max_abs, mean_abs, max_rel_pct, mean_rel_pct)
def atol_check(Y_dense, Y_rolv, threshold=0.05):
    """Check the largest column-normalised difference against *threshold*.

    Both outputs are divided by the per-column L2 norm of ``Y_dense``
    (clamped to at least 1e-8) before being compared.

    Returns:
        ``"PASS"`` when the largest absolute difference is below *threshold*,
        otherwise ``"FAIL(max=<value>)"``.
    """
    scale = Y_dense.norm(dim=0, keepdim=True).clamp(min=1e-8)
    dense_scaled = Y_dense / scale
    rolv_scaled = Y_rolv / scale
    max_diff = (dense_scaled - rolv_scaled).abs().max().item()
    if max_diff < threshold:
        return "PASS"
    return "FAIL(max=%.4f)" % max_diff
def perturbation_test(W, X, rolv_op):
    """Check that changing one weight changes the hash of the ROLV output.

    The first non-zero entry of *W* is bumped by 1e-3, a new operator is built
    from the modified matrix, and the output hashes before and after are
    compared. *W* is modified in place for the duration of the test and the
    bump is subtracted again afterwards.

    Returns:
        ``"PASS"`` if the hashes differ, ``"FAIL"`` if they match, or
        ``"SKIP(fully-dense)"`` when *W* has no non-zero entry.
    """
    nz = W.nonzero(as_tuple=True)
    if not len(nz[0]):
        return "SKIP(fully-dense)"
    row_idx = nz[0][0].item()
    col_idx = nz[1][0].item()
    delta = 1e-3
    h_before = sha256_first4mb(rolv_op(X.T).T)
    W[row_idx, col_idx] += delta
    try:
        perturbed_op = ROLVHybrid(W, args.batch)
        h_after = sha256_first4mb(perturbed_op(X.T).T)
    finally:
        W[row_idx, col_idx] -= delta
    if h_before != h_after:
        return "PASS"
    return "FAIL"
def rsmt_threshold(dtype_bytes=4, index_bytes=8):
    """Return ``1 - dtype_bytes / (dtype_bytes + index_bytes)``.

    With the defaults (4-byte values, 8 bytes of index per entry) the result
    is 2/3.
    """
    bytes_per_entry = dtype_bytes + index_bytes
    value_share = dtype_bytes / bytes_per_entry
    return 1.0 - value_share
def disk_free_gb(path="/"):
    """Return the free space, in units of 1e9 bytes, on the volume holding *path*."""
    usage = shutil.disk_usage(path)
    return usage.free / 1e9
def clear_model_cache(tag, cache_dir):
    """Delete this run's scratch directories and release cached memory.

    Only run-specific locations are removed: ``/tmp/rolv_gpu/<tag>`` and
    *cache_dir*. The user's global Hugging Face cache
    (``~/.cache/huggingface``) is left untouched: it can hold the login token
    and unrelated models, and this script downloads into *cache_dir*.

    Args:
        tag: Name of the per-model scratch directory under ``/tmp/rolv_gpu``.
        cache_dir: Download cache to remove; falsy values are skipped.

    Removal failures are reported as warnings and do not abort the run.
    """
    import gc as _gc
    targets = ["/tmp/rolv_gpu/%s" % tag]
    if cache_dir:
        targets.append(cache_dir)
    for path in targets:
        if not os.path.exists(path):
            continue
        try:
            shutil.rmtree(path)
        except OSError as exc:
            print(" [warn] could not remove %s: %s" % (path, exc))
    _gc.collect()
    if device.type == "cuda":
        torch.cuda.empty_cache()
def sync():
    """Call ``torch.cuda.synchronize()`` when running on CUDA; otherwise do nothing."""
    if device.type != "cuda":
        return
    torch.cuda.synchronize()
# ============================================================
# Vendor baselines
# ============================================================
def run_cusparse(W_sparse, X):
    """Benchmark ``torch.sparse.mm`` on a CSR copy of *W_sparse* (CUDA only).

    Args:
        W_sparse: Weight matrix; converted with ``to_sparse_csr()``.
        X: Input matrix; the product computed is ``(W_csr @ X.T).T``.

    Returns:
        ``(output, ms_per_iter, joules, watts)`` on success, or
        ``(None, error_message, 0, 0)`` when not on CUDA or when the sparse
        path raises.

    The kernel is warmed up ``args.warmup`` times, matching the other
    benchmarked paths in this file. An error is labelled as an INT_MAX
    overflow only when its text actually mentions one, and the raw message is
    always included.
    """
    if device.type != "cuda":
        return None, "not CUDA", 0, 0
    try:
        W_csr = W_sparse.to_sparse_csr()

        def fn():
            return torch.sparse.mm(W_csr, X.T).T

        for _ in range(args.warmup):
            fn()
        ms, j, w = measure_joules(fn, args.iterations)
        return fn(), ms, j, w
    except Exception as e:
        err = str(e)
        lowered = err.lower()
        # A bare "int" substring matches unrelated text ("pointer",
        # "internal", ...), so look for overflow-specific tokens instead.
        if any(tok in lowered for tok in ("int_max", "int32", "overflow")):
            return (None,
                    "INT_MAX overflow - matrix too large for cuSPARSE (%s)" % err,
                    0, 0)
        return None, err, 0, 0
def run_scipy_csr(W_np, X_np):
    """Benchmark a SciPy CSR product ``(W_csr . X_np.T).T`` on the CPU.

    Returns:
        ``(output_tensor, ms_per_iter, joules, watts)`` on success, or
        ``(None, error_message, 0, 0)`` if anything raises (including a
        missing SciPy).
    """
    try:
        from scipy.sparse import csr_matrix
        W_csr = csr_matrix(W_np)

        def fn():
            return W_csr.dot(X_np.T).T

        n_iter = args.iterations
        for _ in range(args.warmup):
            fn()
        started = time.perf_counter()
        for _ in range(n_iter):
            fn()
        ms = (time.perf_counter() - started) / n_iter * 1000
        watts = _read_power_watts()
        # Energy = power reading x total measured time.
        joules = watts * (ms / 1000) * n_iter
        result = torch.tensor(fn(), dtype=torch.float32)
        return result, ms, joules, watts
    except Exception as e:
        return None, str(e), 0, 0
# ============================================================
# Results collection
# ============================================================
# One dict per benchmarked layer, appended by benchmark_layer().
all_results = []


# ============================================================
# Core benchmark function
# ============================================================
def benchmark_layer(model_name, layer_name, W_orig, weight_source="REAL"):
    """Benchmark one weight matrix: dense matmul, a sparse baseline, and ROLV.

    Args:
        model_name: Label printed and recorded for the model.
        layer_name: Label printed and recorded for the layer.
        W_orig: 2-D weight tensor; it is cloned, so the caller's tensor is
            not modified.
        weight_source: Free-form tag recorded with the result.

    Returns:
        The result dict, which is also appended to ``all_results``.

    Side effects: prints a per-layer report. The "total" row lists total
    time over all iterations for every column, including the sparse
    baseline, and a failed SciPy baseline is reported with a warning just
    like a failed cuSPARSE one.
    """
    W = W_orig.clone().to(device)
    rows, cols = W.shape
    batch = args.batch
    X = torch.randn(batch, cols, dtype=torch.float32, device=device)
    actual_sp = (W == 0).float().mean().item()
    # NOTE(review): synthetic matrices may already have been generated at
    # --sparsity; this mask is applied on top of that - confirm intent.
    if args.sparsity is not None:
        mask = torch.rand_like(W) < args.sparsity
        W[mask] = 0.0
        actual_sp = (W == 0).float().mean().item()
    active_rows = int((W.abs().sum(dim=1) != 0).sum().item())
    active_cols = int((W.abs().sum(dim=0) != 0).sum().item())
    flops_dense = 2 * rows * cols * batch
    flops_rolv = 2 * active_rows * active_cols * batch
    flops_pct = (1 - flops_rolv / max(flops_dense, 1)) * 100
    rsmt = rsmt_threshold()
    baseline_label = "cuSPARSE/CSR" if actual_sp >= rsmt else "cuBLAS/MKL"
    disk_gb = disk_free_gb()
    print(" +-- %s [%s] [%s]" % (model_name, layer_name, weight_source))
    print(" | Shape: %dx%d batch=%d sparsity=%.3f%%" %
          (rows, cols, batch, actual_sp * 100))
    print(" | Active rows: %d/%d FLOPs down: %.1f%%" %
          (active_rows, rows, flops_pct))
    print(" | RSMT(TM) threshold: %.1f%% -> Baseline: %s" %
          (rsmt * 100, baseline_label))
    print(" | ROLVswitch(TM): strategy selection active")
    print(" | [disk free: %.1f GB]" % disk_gb)
    hash_A = sha256_first4mb(W)
    hash_V = sha256_first4mb(X)
    print(" | hash_A (W): %s" % hash_A)
    print(" | hash_V (X): %s" % hash_V)

    # Dense baseline
    def dense_fn():
        return torch.mm(X, W.T)

    for _ in range(args.warmup):
        dense_fn()
    sync()
    dense_ms, dense_j, dense_w = measure_joules(dense_fn, args.iterations)
    Y_dense = dense_fn()
    hash_base = sha256_first4mb(Y_dense)
    gflops_vendor = flops_dense / (dense_ms / 1000) / 1e9
    tok_vendor = batch * 1000 / dense_ms

    # Sparse vendor baseline
    sp_ms, sp_j = None, None
    sp_label = "N/A"
    if device.type == "cuda":
        sp_result = run_cusparse(W, X)
        if sp_result[0] is not None:
            _, sp_ms, sp_j, _ = sp_result
            sp_label = "cuSPARSE"
        else:
            sp_label = "cuSPARSE FAIL: %s" % sp_result[1]
            print(" | WARNING: %s" % sp_label)
    else:
        W_np = W.cpu().numpy()
        X_np = X.cpu().numpy()
        sp_result = run_scipy_csr(W_np, X_np)
        if sp_result[0] is not None:
            _, sp_ms, sp_j, _ = sp_result
            sp_label = "scipy CSR"
        else:
            # Surface the failure instead of silently showing "N/A".
            sp_label = "scipy CSR FAIL: %s" % sp_result[1]
            print(" | WARNING: %s" % sp_label)

    # ROLV Primitive(c)
    t_build0 = time.perf_counter()
    rolv_op = ROLVHybrid(W, batch)
    build_ms = (time.perf_counter() - t_build0) * 1000
    strategy = getattr(rolv_op, "_strategy", "auto")
    print(" | ROLVswitch(TM) selected strategy: %s" % strategy)
    print(" | build_ms: %.2f ms (one-time cost at model load - not per inference)"
          % build_ms)

    def rolv_fn():
        return rolv_op(X.T).T

    for _ in range(args.warmup):
        rolv_fn()
    sync()
    rolv_ms, rolv_j, rolv_w = measure_joules(rolv_fn, args.iterations)
    Y_rolv = rolv_fn()
    hash_rolv = sha256_first4mb(Y_rolv)

    # Total cost over full benchmark run
    # Dense: no build cost, just iterations
    # ROLV: build once + iterations
    rolv_total_ms = build_ms + rolv_ms * args.iterations
    dense_total_ms = dense_ms * args.iterations
    gflops_rolv = flops_rolv / (rolv_ms / 1000) / 1e9
    tok_rolv = batch * 1000 / rolv_ms
    tok_pct = (tok_rolv / max(tok_vendor, 1e-9) - 1) * 100
    ttft_pct = (1 - rolv_ms / max(dense_ms, 1e-9)) * 100
    energy_pct = (1 - rolv_j / max(dense_j, 1e-9)) * 100
    speedup_iter = dense_ms / max(rolv_ms, 1e-9)
    speedup_total = dense_total_ms / max(rolv_total_ms, 1e-9)
    speedup_pct = (speedup_iter - 1) * 100
    speedup_vs_sp = (sp_ms / max(rolv_ms, 1e-9)) if sp_ms else None

    # Hashes
    print(" | hash_baseline: %s" % hash_base)
    print(" | hash_ROLV: %s" % hash_rolv)

    # Error metrics (prereq S5.2)
    max_abs, mean_abs, max_rel, mean_rel = error_metrics(Y_dense, Y_rolv)
    atol = atol_check(Y_dense, Y_rolv)
    pert = perturbation_test(W.clone(), X, rolv_op)
    print(" |")
    print(" | Correctness:")
    print(" | max_abs_err : %.6f" % max_abs)
    print(" | mean_abs_err : %.6f" % mean_abs)
    print(" | max_rel_err%% : %.4f%%" % max_rel)
    print(" | mean_rel_err%% : %.4f%%" % mean_rel)
    print(" | ATOL check : %s" % atol)
    print(" | Perturbation : %s" % pert)
    print(" |")
    sp_ms_str = ("%.3f ms" % sp_ms) if sp_ms else "N/A"
    # Sparse total = per-iteration time x iterations, like the other columns.
    sp_total_str = ("%.3f ms" % (sp_ms * args.iterations)) if sp_ms else "N/A"
    print(" | Speed | %-26s| %-26s| %-24s" %
          ("Dense (cuBLAS/MKL)", sp_label, "ROLV Primitive(c)"))
    print(" | ---------+---------------------------+---------------------------+-------------------------")
    print(" | ms/iter | %-26.3f| %-26s| %.3f" %
          (dense_ms, sp_ms_str, rolv_ms))
    print(" | total | %-26.3f| %-26s| %.3f" %
          (dense_total_ms, sp_total_str, rolv_total_ms))
    print(" | GFLOPs | %-26.2f| %-26s| %.2f" %
          (gflops_vendor, "N/A", gflops_rolv))
    print(" | tok/s | %-26.1f| %-26s| %.1f" %
          (tok_vendor, "N/A", tok_rolv))
    print(" | watts | %-26.1f| %-26s| %.1f" %
          (dense_w, "N/A", rolv_w))
    print(" |")
    print(" | ROLV vs Dense:")
    print(" | Speedup (iter) : %.2fx (%.1f%%)" % (speedup_iter, speedup_pct))
    print(" | Speedup (total) : %.2fx (build amortized: %.1f ms / %d iters)" %
          (speedup_total, build_ms, args.iterations))
    print(" | Energy saved : %.1f%%" % energy_pct)
    print(" | FLOPs saved : %.1f%%" % flops_pct)
    print(" | Tok/s gain : %.1f%%" % tok_pct)
    print(" | TTFT reduction : %.1f%%" % ttft_pct)
    if speedup_vs_sp:
        sp_e_pct = (1 - rolv_j / max(sp_j, 1e-9)) * 100
        print(" |")
        print(" | ROLV vs %s:" % sp_label)
        print(" | Speedup (iter) : %.2fx" % speedup_vs_sp)
        print(" | Energy saved : %.1f%%" % sp_e_pct)
    print(" +------------------------------------------------------------------")
    print(" NOTE: tok/s = single-layer throughput proxy")
    print(" Full model tok/s is lower by ~1/(layers x ops per layer)")
    print()
    result = {
        "model": model_name,
        "layer": layer_name,
        "source": weight_source,
        "shape": "%dx%d" % (rows, cols),
        "sparsity_pct": "%.1f%%" % (actual_sp * 100),
        "strategy": str(strategy),
        "dense_ms": "%.3f" % dense_ms,
        "sparse_ms": ("%.3f" % sp_ms) if sp_ms else "N/A",
        "rolv_ms": "%.3f" % rolv_ms,
        "build_ms": "%.2f" % build_ms,
        "speedup_iter": "%.2f" % speedup_iter,
        "speedup_pct": "%.1f" % speedup_pct,
        "speedup_vs_sp": ("%.2f" % speedup_vs_sp) if speedup_vs_sp else "N/A",
        "energy_pct": "%.1f" % energy_pct,
        "flops_pct": "%.1f" % flops_pct,
        "tok_pct": "%.1f" % tok_pct,
        "ttft_pct": "%.1f" % ttft_pct,
        "atol": atol,
        "perturbation": pert,
        "hash_A": hash_A,
        "hash_V": hash_V,
        "hash_baseline": hash_base,
        "hash_ROLV": hash_rolv,
        "max_abs_err": "%.6f" % max_abs,
        "mean_abs_err": "%.6f" % mean_abs,
        "max_rel_err_pct": "%.4f" % max_rel,
        "mean_rel_err_pct": "%.4f" % mean_rel,
    }
    all_results.append(result)
    return result
# ============================================================
# Model weight extractors
# ============================================================
def extract_moe_weights(model, layer_names):
    """Build one stacked, routing-sparsified weight matrix per requested layer.

    MoE sparsity comes from ROUTING not individual weight zeros.
    OLMoE: 64 experts stacked, top-8 routing = 87.5% row sparsity.
    We build the full stacked matrix and zero inactive expert rows.

    Two parameter layouts are handled, for parameters whose dotted name has
    an ``experts`` or ``expert`` component:

    * fused 3-D tensors ``[experts, rows, cols]`` (a ``gate_up_proj`` tensor
      is split in half along ``rows`` into ``gate_proj`` and ``up_proj``);
    * one 2-D tensor per expert: all same-shaped experts of the first
      matching expert group are stacked, instead of keeping only the first
      expert.

    Args:
        model: Object exposing ``named_parameters()`` and ``config``.
        layer_names: Layer-name substrings to look for.

    Returns:
        List of ``(display, layer_name, W_stack)`` tuples, where ``W_stack``
        is ``[experts * rows, cols]`` and the rows of the
        ``experts - top_k`` lowest-norm experts are zero. Empty list when no
        expert weights are found.
    """
    stacked = {}      # layer name -> fused [experts, rows, cols] tensor
    per_expert = {}   # layer name -> (group prefix, [2-D tensor per expert])

    def _claimed(lname):
        return lname in stacked or lname in per_expert

    for full_name, param in model.named_parameters():
        parts = full_name.split(".")
        expert_pos = [i for i, part in enumerate(parts)
                      if part in ("experts", "expert")]
        if not expert_pos:
            continue
        leaf = parts[-1]
        if param.dim() == 3:
            w = param.data.float().cpu()
            if "gate_up_proj" in leaf:
                half = w.shape[1] // 2
                if not _claimed("gate_proj"):
                    stacked["gate_proj"] = w[:, :half, :].clone()
                if not _claimed("up_proj"):
                    stacked["up_proj"] = w[:, half:, :].clone()
            else:
                for lname in layer_names:
                    if lname in leaf and not _claimed(lname):
                        stacked[lname] = w.clone()
                        break
        elif param.dim() == 2:
            # Experts of one MoE block share the name prefix up to and
            # including the "experts" component.
            group_key = tuple(parts[:expert_pos[0] + 1])
            for lname in layer_names:
                if lname not in full_name or lname in stacked:
                    continue
                if lname not in per_expert:
                    per_expert[lname] = (group_key, [param.data.float().cpu()])
                    break
                first_key, mats = per_expert[lname]
                if first_key == group_key and mats[0].shape == param.shape:
                    mats.append(param.data.float().cpu())
                    break
                # Another block (or shape): this layer name is already
                # taken, so fall through to the next candidate name.

    for lname, (_, mats) in per_expert.items():
        stacked[lname] = torch.stack(mats)  # copies; params stay untouched

    if not stacked:
        return []

    # Determine top-k routing
    top_k = 8
    try:
        cfg = model.config
        top_k = int(getattr(cfg, "num_experts_per_tok",
                            getattr(cfg, "top_k",
                                    getattr(cfg, "num_selected_experts", 8))))
    except Exception:
        pass

    weights = []
    for lname in layer_names:
        if lname not in stacked:
            continue
        w3d = stacked[lname]
        num_experts, expert_rows, cols = w3d.shape
        # Stack all experts into one matrix [num_experts*expert_rows, cols]
        W_stack = w3d.reshape(num_experts * expert_rows, cols).clone()
        # Zero out inactive expert rows (simulate routing sparsity). The
        # count is clamped at 0: a negative slice bound would otherwise zero
        # experts even though top_k >= num_experts.
        n_inactive = max(num_experts - top_k, 0)
        norms = w3d.norm(dim=(1, 2))
        inactive = norms.argsort()[:n_inactive]
        for idx in inactive.tolist():
            W_stack[idx * expert_rows:(idx + 1) * expert_rows, :] = 0.0
        disp = "%s [%d experts top-%d]" % (
            model.__class__.__name__, num_experts, top_k)
        weights.append((disp, lname, W_stack))
    return weights
def run_deepseek_shapes():
    """Benchmark synthetic random-sparse matrices for each requested layer.

    Layers not present in ``DEEPSEEK_SHAPES`` are skipped. An explicit
    ``--sparsity`` is always honoured, including ``0``; only an omitted
    option falls back to the 0.95 default.
    """
    # NOTE(review): INT_MAX_NOTE and DEEPSEEK_SHAPES are not defined in the
    # part of the file visible here - confirm they exist at module level.
    print(INT_MAX_NOTE)
    # Default 95% sparsity for synthetic - ensures positive speedup
    # Real model weights show speedup at 87.5% because MoE routing
    # zeros out entire expert rows structurally, not randomly.
    sp = args.sparsity if args.sparsity is not None else 0.95
    for lname in args.layers:
        if lname not in DEEPSEEK_SHAPES:
            continue
        rows, cols = DEEPSEEK_SHAPES[lname]
        W = torch.zeros(rows, cols)
        # Keep an entry (fill it with a normal sample) with probability 1 - sp.
        mask = torch.rand(rows, cols) > sp
        W[mask] = torch.randn(mask.sum().item())
        benchmark_layer("DeepSeek-V3 [shapes]", lname, W, "SYNTHETIC")
# ============================================================
# HuggingFace model runner
# ============================================================
def run_hf_model(hf_id, display_name):
    """Download a Hugging Face model, benchmark its expert weights, clean up.

    Args:
        hf_id: Model identifier passed to ``from_pretrained``.
        display_name: Label used in the report and for the scratch-dir tag.

    The model is loaded on CPU in float32 with eager attention. A load
    failure whose message mentions ``flash_attn`` is retried once without
    ``trust_remote_code``; any other failure is reported and the function
    returns. Unless ``--no-cleanup`` is given, the download cache is removed
    afterwards; free disk space is then measured on the nearest existing
    ancestor of the cache directory, because the directory itself may no
    longer exist.
    """
    cache_dir = args.cache_dir or "/tmp/rolv_hf_cache"
    os.makedirs(cache_dir, exist_ok=True)
    free_gb = disk_free_gb(cache_dir)
    print(" [disk free: %.1f GB before download]" % free_gb)
    print(" Downloading %s ..." % hf_id)
    is_deepseek = "deepseek" in hf_id.lower()
    try:
        cfg = AutoConfig.from_pretrained(
            hf_id, trust_remote_code=True, cache_dir=cache_dir)
        # Patch 4: DeepSeek rope_scaling fix
        if is_deepseek and hasattr(cfg, "rope_scaling"):
            cfg.rope_scaling = None
        # Force eager attention on config before loading
        try:
            cfg._attn_implementation = "eager"
            cfg._attn_implementation_autoset = False
        except Exception:
            pass
        model = AutoModelForCausalLM.from_pretrained(
            hf_id,
            config=cfg,
            torch_dtype=torch.float32,
            device_map="cpu",
            trust_remote_code=True,
            attn_implementation="eager",
            cache_dir=cache_dir,
        )
        model.eval()
    except Exception as e:
        err = str(e)
        if "flash_attn" in err:
            print(" flash_attn error detail:")
            traceback.print_exc()
            print(" Retrying without trust_remote_code ...")
            try:
                model = AutoModelForCausalLM.from_pretrained(
                    hf_id,
                    torch_dtype=torch.float32,
                    device_map="cpu",
                    trust_remote_code=False,
                    attn_implementation="eager",
                    cache_dir=cache_dir,
                )
                model.eval()
            except Exception as e2:
                print(" ERROR: %s" % e2)
                traceback.print_exc()
                return
        else:
            print(" ERROR: Failed to load %s: %s" % (hf_id, e))
            return
    weights = extract_moe_weights(model, args.layers)
    if not weights:
        print(" ERROR: No MoE expert weights found for layers: %s" %
              args.layers)
        print(" First 30 parameter names in model:")
        for i, (n, p) in enumerate(model.named_parameters()):
            if i >= 30:
                break
            print(" %s shape=%s" % (n, list(p.shape)))
        del model
        gc.collect()
        return
    print(" Found %d expert weight tensors" % len(weights))
    for _display, lname, W in weights:
        benchmark_layer(display_name, lname, W, "REAL")
    del model, weights
    gc.collect()
    if device.type == "cuda":
        torch.cuda.empty_cache()
    if not args.no_cleanup:
        print(" Cleaning up model cache ...")
        clear_model_cache(display_name.replace(" ", "_"), cache_dir)
        # cache_dir has normally just been deleted; disk_usage needs a path
        # that exists, so walk up to the nearest surviving ancestor.
        probe = os.path.abspath(cache_dir)
        while not os.path.exists(probe):
            parent = os.path.dirname(probe)
            if parent == probe:
                break
            probe = parent
        print(" [disk free after cleanup: %.1f GB]" %
              disk_free_gb(probe))
# ============================================================
# Model selection
# ============================================================
def run_selected_model(key):
    """Dispatch one ``--model`` entry to the matching benchmark runner.

    Aliases (``deepseek-shapes``, ``auto`` and the ``KNOWN_MODELS`` keys) are
    matched case-insensitively. Anything else is treated as a model
    identifier or path and passed on with its original casing, since
    lower-casing breaks case-sensitive local paths.
    """
    alias = key.lower()
    if alias == "deepseek-shapes":
        run_deepseek_shapes()
    elif alias == "auto":
        run_deepseek_shapes()
        run_hf_model("allenai/OLMoE-1B-7B-0924", "OLMoE-1B-7B")
    elif KNOWN_MODELS.get(alias):
        run_hf_model(KNOWN_MODELS[alias], alias)
    else:
        run_hf_model(key, key.split("/")[-1])
# ============================================================
# Run
# ============================================================
# --model may hold several comma-separated entries; run them in order.
model_keys = args.model.split(",")
for mk in map(str.strip, model_keys):
    run_selected_model(mk)
# ============================================================
# Final summary table (prereq S10.2)
# ============================================================
if all_results:
    # Best speed-up first; if a value cannot be parsed the order is left as is.
    try:
        all_results.sort(key=lambda r: float(r["speedup_pct"]), reverse=True)
    except Exception:
        pass
    _table_rule = "+================================+======+===========+==========+==========+======+"
    print()
    print("+========================================================================+")
    print("| FINAL SUMMARY - ROLV Primitive(c) |")
    print("| Copyright (c) 2025-2026 ROLV LLC - 3 Patents Pending |")
    print(_table_rule)
    print("| Model - Layer | sp% | Vendor ms | ROLV ms | Speedup | ATOL |")
    print(_table_rule)
    for r in all_results:
        name = "%-20s %-8s" % (r["model"][:20], r["layer"][:8])
        cells = (
            name,
            r["sparsity_pct"].replace("%", ""),
            r["dense_ms"],
            r["rolv_ms"],
            r["speedup_iter"],
            r["speedup_pct"],
            r["atol"][:4],
        )
        print("| %-32s | %4s | %9s | %8s | %4sx %4s%% | %4s |" % cells)
    print(_table_rule)
    print()
    print(" Energy% FLOPs% Tok/s% TTFT% -- all vs dense baseline")
    for r in all_results:
        label = "%s %s" % (r["model"][:18], r["layer"][:8])
        print(" %-30s energy: %6s%% flops: %6s%% tok/s: %6s%% ttft: %6s%% pert: %s" % (
            label,
            r["energy_pct"], r["flops_pct"],
            r["tok_pct"], r["ttft_pct"],
            r["perturbation"][:4],
        ))
    print()
    for footer_line in (
        " Share your results:",
        " GitHub : https://github.com/rolv-ai/rolv-primitive",
        " Reddit : r/LocalLLaMA r/MachineLearning",
        " Paper : https://doi.org/10.5281/zenodo.19221455",
        " Contact : rolv@rolv.ai",
    ):
        print(footer_line)
    print()
    for footer_line in (
        " Free for research use. Commercial: rolv@rolv.ai",
        " ROLV Primitive(c) - RSMT(TM) - ROLVswitch(TM)",
        " Copyright (c) 2025-2026 ROLV LLC. All rights reserved.",
    ):
        print(footer_line)
    print()
# ============================================================
# CSV output (prereq S12)
# ============================================================
if all_results:
    # Column order follows the key order of the first result dict.
    csv_path = args.output_csv
    fieldnames = list(all_results[0])
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for record in all_results:
            writer.writerow(record)
    print(" CSV saved: %s" % csv_path)
    print()