unary-quantization-research / run_pure_unary.py
OpenTransformer's picture
Add files using upload-large-folder tool
19ed98b verified
#!/usr/bin/env python3
"""
Pure unary model loader - ALL matmuls are AND+popcount
(c) 2026 OpenTransformers Ltd / Scott Bisset
"""
import ctypes, numpy as np, os, sys, json, time
def load_and_run(model_dir, prompt, max_tokens=128, temperature=0.0, top_p=0.9, a_planes=4):
config = json.load(open(os.path.join(model_dir, "config.json")))
manifest = json.load(open(os.path.join(model_dir, "manifest.json")))
w_planes = manifest["n_planes"]
n_layers = config["num_hidden_layers"]
hidden = config["hidden_size"]
inter = config["intermediate_size"]
n_heads = config["num_attention_heads"]
n_kv_heads = config["num_key_value_heads"]
head_dim = config.get("head_dim", hidden // n_heads)
vocab = config["vocab_size"]
rope_theta = config.get("rope_theta", 10000.0)
tie_embeddings = 1 if config.get("tie_word_embeddings", False) else 0
print(f"Config: {n_layers}L, hidden={hidden}, inter={inter}, heads={n_heads}/{n_kv_heads}")
print(f"Weight planes: {w_planes}, Activation planes: {a_planes}")
print(f"Plane pairs per element: {w_planes * a_planes}")
print(f"Tied embeddings: {'yes' if tie_embeddings else 'no'}")
engine_path = os.path.join(os.path.dirname(os.path.abspath(model_dir)), "pure_unary_engine.so")
lib = ctypes.CDLL(engine_path)
lib.model_alloc.restype = ctypes.c_void_p
lib.model_alloc.argtypes = [
ctypes.c_int, ctypes.c_int, # w_planes, a_planes
ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int,
ctypes.c_int, ctypes.c_int, ctypes.c_int,
ctypes.c_float, ctypes.c_int,
]
lib.forward_token.restype = ctypes.POINTER(ctypes.c_float)
lib.forward_token.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_int]
lib.generate.restype = ctypes.c_int
lib.generate.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_int), ctypes.c_int,
ctypes.POINTER(ctypes.c_int), ctypes.c_int,
ctypes.c_float, ctypes.c_float, ctypes.c_int
]
u16p = ctypes.POINTER(ctypes.c_uint16)
f32p = ctypes.POINTER(ctypes.c_float)
u64p = ctypes.POINTER(ctypes.c_uint64)
lib.model_set_embed.argtypes = [ctypes.c_void_p, u16p]
lib.model_set_final_norm.argtypes = [ctypes.c_void_p, f32p]
lib.layer_set_norms.argtypes = [ctypes.c_void_p, ctypes.c_int, f32p, f32p]
lib.layer_set_qk_norm.argtypes = [ctypes.c_void_p, ctypes.c_int, f32p, f32p]
lib.layer_set_linears.argtypes = [
ctypes.c_void_p, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
ctypes.c_int,
]
lib.model_reset_cache.argtypes = [ctypes.c_void_p]
print("Allocating model...")
model = lib.model_alloc(
w_planes, a_planes,
hidden, inter, n_heads, n_kv_heads,
head_dim, n_layers, vocab, rope_theta, tie_embeddings
)
_refs = []
def load_fp16(name):
fname = name.replace(".", "_") + ".fp16"
data = np.fromfile(os.path.join(model_dir, fname), dtype=np.uint16)
_refs.append(data)
return data.ctypes.data_as(u16p)
def load_f32(name):
fname = name.replace(".", "_") + ".fp16"
data = np.fromfile(os.path.join(model_dir, fname), dtype=np.uint16)
f32 = data.view(np.float16).astype(np.float32)
_refs.append(f32)
return f32.ctypes.data_as(f32p)
def load_unary(name):
fname = name.replace(".", "_")
sign = np.fromfile(os.path.join(model_dir, f"{fname}.sign"), dtype=np.uint64)
planes = np.fromfile(os.path.join(model_dir, f"{fname}.planes"), dtype=np.uint64)
scales = np.fromfile(os.path.join(model_dir, f"{fname}.scales"), dtype=np.float32)
_refs.extend([sign, planes, scales])
return (sign.ctypes.data_as(u64p), planes.ctypes.data_as(u64p),
scales.ctypes.data_as(f32p))
print("Loading embeddings...")
lib.model_set_embed(model, load_fp16("model.embed_tokens.weight"))
print("Loading final norm...")
lib.model_set_final_norm(model, load_f32("model.norm.weight"))
print(f"Loading {n_layers} layers...")
for l in range(n_layers):
p = f"model.layers.{l}"
lib.layer_set_norms(model, l,
load_f32(f"{p}.input_layernorm.weight"),
load_f32(f"{p}.post_attention_layernorm.weight"))
# QK-Norm (Qwen3)
qn_path = os.path.join(model_dir, f"{p.replace('.','_')}_self_attn_q_norm_weight.fp16")
if os.path.exists(qn_path):
lib.layer_set_qk_norm(model, l,
load_f32(f"{p}.self_attn.q_norm.weight"),
load_f32(f"{p}.self_attn.k_norm.weight"))
q_s, q_p, q_sc = load_unary(f"{p}.self_attn.q_proj.weight")
k_s, k_p, k_sc = load_unary(f"{p}.self_attn.k_proj.weight")
v_s, v_p, v_sc = load_unary(f"{p}.self_attn.v_proj.weight")
o_s, o_p, o_sc = load_unary(f"{p}.self_attn.o_proj.weight")
g_s, g_p, g_sc = load_unary(f"{p}.mlp.gate_proj.weight")
u_s, u_p, u_sc = load_unary(f"{p}.mlp.up_proj.weight")
d_s, d_p, d_sc = load_unary(f"{p}.mlp.down_proj.weight")
um = manifest["unary"]
lib.layer_set_linears(model, l,
q_s, q_p, q_sc, um[f"{p}.self_attn.q_proj.weight"][0], um[f"{p}.self_attn.q_proj.weight"][1],
k_s, k_p, k_sc, um[f"{p}.self_attn.k_proj.weight"][0], um[f"{p}.self_attn.k_proj.weight"][1],
v_s, v_p, v_sc, um[f"{p}.self_attn.v_proj.weight"][0], um[f"{p}.self_attn.v_proj.weight"][1],
o_s, o_p, o_sc, um[f"{p}.self_attn.o_proj.weight"][0], um[f"{p}.self_attn.o_proj.weight"][1],
g_s, g_p, g_sc, um[f"{p}.mlp.gate_proj.weight"][0], um[f"{p}.mlp.gate_proj.weight"][1],
u_s, u_p, u_sc, um[f"{p}.mlp.up_proj.weight"][0], um[f"{p}.mlp.up_proj.weight"][1],
d_s, d_p, d_sc, um[f"{p}.mlp.down_proj.weight"][0], um[f"{p}.mlp.down_proj.weight"][1],
w_planes)
if (l + 1) % 6 == 0 or l == n_layers - 1:
print(f" Loaded layer {l+1}/{n_layers}")
# Tokenize
print("Tokenizing...")
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
input_ids = tokenizer.encode(prompt)
print(f"Prompt: {len(input_ids)} tokens -> {repr(prompt[:60])}")
eos_token = config.get("eos_token_id", 151645)
prompt_arr = (ctypes.c_int * len(input_ids))(*input_ids)
out_arr = (ctypes.c_int * max_tokens)()
print(f"\nGenerating (temp={temperature}, top_p={top_p}, a_planes={a_planes})...")
t0 = time.time()
n_gen = lib.generate(
model, prompt_arr, len(input_ids),
out_arr, max_tokens,
ctypes.c_float(temperature), ctypes.c_float(top_p), eos_token
)
dt = time.time() - t0
out_ids = [out_arr[i] for i in range(n_gen)]
text = tokenizer.decode(out_ids, skip_special_tokens=True)
print(f"\n=== PURE UNARY Output ({n_gen} tokens in {dt:.1f}s = {n_gen/dt:.2f} tok/s) ===")
print(text)
print(f"\nDecode speed: {n_gen/dt:.2f} tok/s")
return text
if __name__ == "__main__":
model_dir = sys.argv[1] if len(sys.argv) > 1 else "qwen3-4b-thinking-unary"
prompt = sys.argv[2] if len(sys.argv) > 2 else "What is 2+2? Think step by step."
max_tokens = int(sys.argv[3]) if len(sys.argv) > 3 else 32
a_planes = int(sys.argv[4]) if len(sys.argv) > 4 else 4
load_and_run(model_dir, prompt, max_tokens=max_tokens, a_planes=a_planes)