unary-quantization-research / run_qwen3_4b.py
OpenTransformer's picture
Add files using upload-large-folder tool
19ed98b verified
#!/usr/bin/env python3
"""
Unary model loader for Qwen3-4B-Thinking.
Loads converted weights and runs inference via unary_engine_v2.so
(c) 2026 OpenTransformers Ltd / Scott Bisset
"""
import ctypes, numpy as np, os, sys, json, time
def load_and_run(model_dir, prompt, max_tokens=128, temperature=0.0, top_p=0.9):
# Load config
config = json.load(open(os.path.join(model_dir, "config.json")))
manifest = json.load(open(os.path.join(model_dir, "manifest.json")))
n_planes = manifest["n_planes"]
n_layers = config["num_hidden_layers"]
hidden = config["hidden_size"]
inter = config["intermediate_size"]
n_heads = config["num_attention_heads"]
n_kv_heads = config["num_key_value_heads"]
head_dim = config.get("head_dim", hidden // n_heads)
vocab = config["vocab_size"]
rope_theta = config.get("rope_theta", 10000.0)
has_attn_bias = 1 if config.get("attention_bias", False) else 0
tie_embeddings = 1 if config.get("tie_word_embeddings", False) else 0
print(f"Config: {n_layers}L, hidden={hidden}, inter={inter}, heads={n_heads}/{n_kv_heads}, vocab={vocab}")
print(f"QK-Norm: yes, Tied embeddings: {'yes' if tie_embeddings else 'no'}, n_planes={n_planes}")
# Load C engine
engine_path = os.path.join(os.path.dirname(os.path.abspath(model_dir)), "unary_engine_v2.so")
lib = ctypes.CDLL(engine_path)
# Configure function signatures
lib.model_alloc.restype = ctypes.c_void_p
lib.model_alloc.argtypes = [
ctypes.c_int, # n_planes
ctypes.c_int, # hidden
ctypes.c_int, # inter
ctypes.c_int, # n_heads
ctypes.c_int, # n_kv_heads
ctypes.c_int, # head_dim
ctypes.c_int, # n_layers
ctypes.c_int, # vocab
ctypes.c_float, # rope_theta
ctypes.c_int, # has_attn_bias
ctypes.c_int, # tie_embeddings
]
lib.forward_token.restype = ctypes.POINTER(ctypes.c_float)
lib.forward_token.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_int]
lib.generate.restype = ctypes.c_int
lib.generate.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_int), ctypes.c_int,
ctypes.POINTER(ctypes.c_int), ctypes.c_int,
ctypes.c_float, ctypes.c_float, ctypes.c_int
]
u16p = ctypes.POINTER(ctypes.c_uint16)
f32p = ctypes.POINTER(ctypes.c_float)
u64p = ctypes.POINTER(ctypes.c_uint64)
lib.model_set_embed.argtypes = [ctypes.c_void_p, u16p]
lib.model_set_final_norm.argtypes = [ctypes.c_void_p, f32p]
lib.model_set_lm_head.argtypes = [ctypes.c_void_p, u16p, ctypes.c_int, ctypes.c_int]
lib.layer_set_norms.argtypes = [ctypes.c_void_p, ctypes.c_int, f32p, f32p]
lib.layer_set_bias.argtypes = [ctypes.c_void_p, ctypes.c_int, f32p, f32p, f32p]
lib.layer_set_qk_norm.argtypes = [ctypes.c_void_p, ctypes.c_int, f32p, f32p]
lib.layer_set_linears.argtypes = [
ctypes.c_void_p, ctypes.c_int,
# q: sign, planes, scales, out, in
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
# k
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
# v
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
# o
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
# gate
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
# up
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
# down
u64p, u64p, f32p, ctypes.c_int, ctypes.c_int,
ctypes.c_int, # n_planes
]
lib.model_reset_cache.argtypes = [ctypes.c_void_p]
# Allocate model
print("Allocating model...")
model = lib.model_alloc(
n_planes, hidden, inter, n_heads, n_kv_heads,
head_dim, n_layers, vocab, rope_theta,
has_attn_bias, tie_embeddings
)
# Keep references to prevent GC
_refs = []
def load_fp16(name):
fname = name.replace(".", "_") + ".fp16"
path = os.path.join(model_dir, fname)
data = np.fromfile(path, dtype=np.uint16)
_refs.append(data)
return data.ctypes.data_as(u16p)
def load_f32_from_fp16(name):
fname = name.replace(".", "_") + ".fp16"
path = os.path.join(model_dir, fname)
data = np.fromfile(path, dtype=np.uint16)
# Convert FP16 -> FP32
f32 = data.view(np.float16).astype(np.float32)
_refs.append(f32)
return f32.ctypes.data_as(f32p)
def load_unary(name):
fname = name.replace(".", "_")
sign = np.fromfile(os.path.join(model_dir, f"{fname}.sign"), dtype=np.uint64)
planes = np.fromfile(os.path.join(model_dir, f"{fname}.planes"), dtype=np.uint64)
scales = np.fromfile(os.path.join(model_dir, f"{fname}.scales"), dtype=np.float32)
_refs.extend([sign, planes, scales])
return (sign.ctypes.data_as(u64p), planes.ctypes.data_as(u64p),
scales.ctypes.data_as(f32p))
# Load embeddings
print("Loading embeddings...")
embed_ptr = load_fp16("model.embed_tokens.weight")
lib.model_set_embed(model, embed_ptr)
# Load final norm
print("Loading final norm...")
fnorm_ptr = load_f32_from_fp16("model.norm.weight")
lib.model_set_final_norm(model, fnorm_ptr)
# Load layers
print(f"Loading {n_layers} layers...")
for l in range(n_layers):
prefix = f"model.layers.{l}"
# Norms
in_norm = load_f32_from_fp16(f"{prefix}.input_layernorm.weight")
post_norm = load_f32_from_fp16(f"{prefix}.post_attention_layernorm.weight")
lib.layer_set_norms(model, l, in_norm, post_norm)
# QK-Norm
q_norm = load_f32_from_fp16(f"{prefix}.self_attn.q_norm.weight")
k_norm = load_f32_from_fp16(f"{prefix}.self_attn.k_norm.weight")
lib.layer_set_qk_norm(model, l, q_norm, k_norm)
# Linear layers
q_s, q_p, q_sc = load_unary(f"{prefix}.self_attn.q_proj.weight")
k_s, k_p, k_sc = load_unary(f"{prefix}.self_attn.k_proj.weight")
v_s, v_p, v_sc = load_unary(f"{prefix}.self_attn.v_proj.weight")
o_s, o_p, o_sc = load_unary(f"{prefix}.self_attn.o_proj.weight")
g_s, g_p, g_sc = load_unary(f"{prefix}.mlp.gate_proj.weight")
u_s, u_p, u_sc = load_unary(f"{prefix}.mlp.up_proj.weight")
d_s, d_p, d_sc = load_unary(f"{prefix}.mlp.down_proj.weight")
# Dims from manifest
q_shape = manifest["unary"][f"{prefix}.self_attn.q_proj.weight"]
k_shape = manifest["unary"][f"{prefix}.self_attn.k_proj.weight"]
v_shape = manifest["unary"][f"{prefix}.self_attn.v_proj.weight"]
o_shape = manifest["unary"][f"{prefix}.self_attn.o_proj.weight"]
g_shape = manifest["unary"][f"{prefix}.mlp.gate_proj.weight"]
u_shape = manifest["unary"][f"{prefix}.mlp.up_proj.weight"]
d_shape = manifest["unary"][f"{prefix}.mlp.down_proj.weight"]
lib.layer_set_linears(
model, l,
q_s, q_p, q_sc, q_shape[0], q_shape[1],
k_s, k_p, k_sc, k_shape[0], k_shape[1],
v_s, v_p, v_sc, v_shape[0], v_shape[1],
o_s, o_p, o_sc, o_shape[0], o_shape[1],
g_s, g_p, g_sc, g_shape[0], g_shape[1],
u_s, u_p, u_sc, u_shape[0], u_shape[1],
d_s, d_p, d_sc, d_shape[0], d_shape[1],
n_planes
)
if (l + 1) % 6 == 0 or l == n_layers - 1:
print(f" Loaded layer {l+1}/{n_layers}")
# Tokenize
print("Tokenizing prompt...")
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
input_ids = tokenizer.encode(prompt)
print(f"Prompt: {len(input_ids)} tokens")
eos_token = config.get("eos_token_id", 151645)
# Generate
prompt_arr = (ctypes.c_int * len(input_ids))(*input_ids)
out_arr = (ctypes.c_int * max_tokens)()
print(f"\nGenerating (temp={temperature}, top_p={top_p})...")
t0 = time.time()
n_generated = lib.generate(
model, prompt_arr, len(input_ids),
out_arr, max_tokens,
ctypes.c_float(temperature), ctypes.c_float(top_p),
eos_token
)
dt = time.time() - t0
out_ids = [out_arr[i] for i in range(n_generated)]
text = tokenizer.decode(out_ids, skip_special_tokens=True)
total_tokens = len(input_ids) + n_generated
print(f"\n=== Output ({n_generated} tokens in {dt:.1f}s = {n_generated/dt:.1f} tok/s) ===")
print(text)
print(f"\nPrefill: {len(input_ids)} tokens, Decode: {n_generated} tokens")
print(f"Total time: {dt:.1f}s, Speed: {total_tokens/dt:.1f} tok/s total, {n_generated/dt:.1f} tok/s decode")
return text
if __name__ == "__main__":
model_dir = sys.argv[1] if len(sys.argv) > 1 else "qwen3-4b-thinking-unary"
prompt = sys.argv[2] if len(sys.argv) > 2 else "What is 2+2? Think step by step."
max_tokens = int(sys.argv[3]) if len(sys.argv) > 3 else 64
load_and_run(model_dir, prompt, max_tokens=max_tokens)