| | |
| | """ |
| | Unary model loader for Qwen3-4B-Thinking. |
| | Loads converted weights and runs inference via unary_engine_v2.so |
| | (c) 2026 OpenTransformers Ltd / Scott Bisset |
| | """ |
| | import ctypes, numpy as np, os, sys, json, time |
| |
|
# ctypes pointer types shared by the engine signature table and the weight loaders.
_U16P = ctypes.POINTER(ctypes.c_uint16)
_F32P = ctypes.POINTER(ctypes.c_float)
_U64P = ctypes.POINTER(ctypes.c_uint64)

# Per-layer unary projections, in the order layer_set_linears receives them.
_LINEAR_PROJECTIONS = (
    "self_attn.q_proj",
    "self_attn.k_proj",
    "self_attn.v_proj",
    "self_attn.o_proj",
    "mlp.gate_proj",
    "mlp.up_proj",
    "mlp.down_proj",
)


def _read_json(path):
    """Parse the JSON file at *path*, closing the handle deterministically."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def _declare_engine_signatures(lib):
    """Attach argtypes/restype declarations to every engine symbol the loader binds."""
    c_int, c_float, c_void_p = ctypes.c_int, ctypes.c_float, ctypes.c_void_p
    int_ptr = ctypes.POINTER(c_int)

    # model_alloc(n_planes, hidden, inter, n_heads, n_kv_heads, head_dim,
    #             n_layers, vocab, rope_theta, has_attn_bias, tie_embeddings)
    lib.model_alloc.restype = c_void_p
    lib.model_alloc.argtypes = [c_int] * 8 + [c_float] + [c_int] * 2

    lib.forward_token.restype = _F32P
    lib.forward_token.argtypes = [c_void_p, c_int, c_int]

    lib.generate.restype = c_int
    lib.generate.argtypes = [
        c_void_p,
        int_ptr, c_int,
        int_ptr, c_int,
        c_float, c_float, c_int,
    ]

    lib.model_set_embed.argtypes = [c_void_p, _U16P]
    lib.model_set_final_norm.argtypes = [c_void_p, _F32P]
    lib.model_set_lm_head.argtypes = [c_void_p, _U16P, c_int, c_int]
    lib.layer_set_norms.argtypes = [c_void_p, c_int, _F32P, _F32P]
    lib.layer_set_bias.argtypes = [c_void_p, c_int, _F32P, _F32P, _F32P]
    lib.layer_set_qk_norm.argtypes = [c_void_p, c_int, _F32P, _F32P]

    # (sign, planes, scales, shape[0], shape[1]) repeated once per projection,
    # followed by the trailing n_planes argument.
    per_linear = [_U64P, _U64P, _F32P, c_int, c_int]
    lib.layer_set_linears.argtypes = (
        [c_void_p, c_int] + per_linear * len(_LINEAR_PROJECTIONS) + [c_int]
    )
    lib.model_reset_cache.argtypes = [c_void_p]


def _load_weights(lib, model, model_dir, manifest, n_layers, n_planes):
    """Read every weight file from *model_dir* and hand the buffers to the engine.

    Returns the list of numpy arrays backing the pointers that were passed to
    the engine. The caller must keep that list referenced for as long as the
    engine may read the weights, otherwise the buffers are freed.
    """
    keepalive = []

    def weight_path(name, suffix):
        # Weight files are named after the tensor with dots turned into underscores.
        return os.path.join(model_dir, name.replace(".", "_") + suffix)

    def load_fp16(name):
        raw = np.fromfile(weight_path(name, ".fp16"), dtype=np.uint16)
        keepalive.append(raw)
        return raw.ctypes.data_as(_U16P)

    def load_f32_from_fp16(name):
        raw = np.fromfile(weight_path(name, ".fp16"), dtype=np.uint16)
        # Reinterpret the raw 16-bit words as float16, then widen to float32.
        widened = raw.view(np.float16).astype(np.float32)
        keepalive.append(widened)
        return widened.ctypes.data_as(_F32P)

    def load_unary(name):
        sign = np.fromfile(weight_path(name, ".sign"), dtype=np.uint64)
        planes = np.fromfile(weight_path(name, ".planes"), dtype=np.uint64)
        scales = np.fromfile(weight_path(name, ".scales"), dtype=np.float32)
        keepalive.extend([sign, planes, scales])
        return (
            sign.ctypes.data_as(_U64P),
            planes.ctypes.data_as(_U64P),
            scales.ctypes.data_as(_F32P),
        )

    print("Loading embeddings...")
    lib.model_set_embed(model, load_fp16("model.embed_tokens.weight"))

    print("Loading final norm...")
    lib.model_set_final_norm(model, load_f32_from_fp16("model.norm.weight"))

    # NOTE(review): layer_set_bias and model_set_lm_head are declared but never
    # called here, so attention biases and an untied LM head are not loaded --
    # confirm that is intended for the configurations this loader supports.
    print(f"Loading {n_layers} layers...")
    shapes = manifest["unary"]
    for layer_idx in range(n_layers):
        prefix = f"model.layers.{layer_idx}"

        lib.layer_set_norms(
            model, layer_idx,
            load_f32_from_fp16(f"{prefix}.input_layernorm.weight"),
            load_f32_from_fp16(f"{prefix}.post_attention_layernorm.weight"),
        )
        lib.layer_set_qk_norm(
            model, layer_idx,
            load_f32_from_fp16(f"{prefix}.self_attn.q_norm.weight"),
            load_f32_from_fp16(f"{prefix}.self_attn.k_norm.weight"),
        )

        linear_args = []
        for projection in _LINEAR_PROJECTIONS:
            key = f"{prefix}.{projection}.weight"
            sign_ptr, planes_ptr, scales_ptr = load_unary(key)
            shape = shapes[key]
            linear_args += [sign_ptr, planes_ptr, scales_ptr, shape[0], shape[1]]
        lib.layer_set_linears(model, layer_idx, *linear_args, n_planes)

        if (layer_idx + 1) % 6 == 0 or layer_idx == n_layers - 1:
            print(f" Loaded layer {layer_idx+1}/{n_layers}")

    return keepalive


def load_and_run(model_dir: str, prompt: str, max_tokens: int = 128,
                 temperature: float = 0.0, top_p: float = 0.9) -> str:
    """Load a converted unary model, generate from *prompt*, and return the text.

    Reads ``config.json`` and ``manifest.json`` from *model_dir*, loads
    ``unary_engine_v2.so`` from the directory that contains *model_dir*, passes
    all weight buffers to the engine, tokenizes the prompt with the Hugging
    Face tokenizer found in *model_dir*, and calls the engine's ``generate``.
    Progress and timing statistics are printed to stdout.

    Args:
        model_dir: Directory holding the config, manifest, tokenizer and
            converted weight files.
        prompt: Text to tokenize and feed to the engine.
        max_tokens: Size of the output token buffer passed to ``generate``.
        temperature: Forwarded unchanged to ``generate``.
        top_p: Forwarded unchanged to ``generate``.

    Returns:
        The generated token ids decoded with ``skip_special_tokens=True``.

    Raises:
        ValueError: If *max_tokens* is negative (checked before any I/O).
        OSError: If a required file or the engine library cannot be opened.
        KeyError: If a required config or manifest entry is missing.
        RuntimeError: If the engine's ``model_alloc`` returns NULL.
    """
    # Fail fast: ctypes would reject a negative array length anyway, but only
    # after every weight file had already been read.
    if max_tokens < 0:
        raise ValueError(f"max_tokens must be >= 0, got {max_tokens}")

    config = _read_json(os.path.join(model_dir, "config.json"))
    manifest = _read_json(os.path.join(model_dir, "manifest.json"))
    n_planes = manifest["n_planes"]
    n_layers = config["num_hidden_layers"]
    hidden = config["hidden_size"]
    inter = config["intermediate_size"]
    n_heads = config["num_attention_heads"]
    n_kv_heads = config["num_key_value_heads"]
    head_dim = config.get("head_dim", hidden // n_heads)
    vocab = config["vocab_size"]
    rope_theta = config.get("rope_theta", 10000.0)
    has_attn_bias = 1 if config.get("attention_bias", False) else 0
    tie_embeddings = 1 if config.get("tie_word_embeddings", False) else 0

    print(f"Config: {n_layers}L, hidden={hidden}, inter={inter}, heads={n_heads}/{n_kv_heads}, vocab={vocab}")
    print(f"QK-Norm: yes, Tied embeddings: {'yes' if tie_embeddings else 'no'}, n_planes={n_planes}")

    # The engine library is expected next to (not inside) the model directory.
    engine_path = os.path.join(os.path.dirname(os.path.abspath(model_dir)), "unary_engine_v2.so")
    lib = ctypes.CDLL(engine_path)
    _declare_engine_signatures(lib)

    print("Allocating model...")
    model = lib.model_alloc(
        n_planes, hidden, inter, n_heads, n_kv_heads,
        head_dim, n_layers, vocab, rope_theta,
        has_attn_bias, tie_embeddings,
    )
    # A c_void_p restype maps a NULL return to None; do not pass it on to the engine.
    if not model:
        raise RuntimeError("model_alloc returned NULL")

    # Held in a local so the numpy buffers outlive every engine call below.
    weight_buffers = _load_weights(lib, model, model_dir, manifest, n_layers, n_planes)

    print("Tokenizing prompt...")
    # Imported lazily so the dependency is only needed once weights are loaded.
    from transformers import AutoTokenizer
    # SECURITY: trust_remote_code=True lets the tokenizer run Python code shipped
    # in model_dir; only point this loader at directories you trust.
    tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
    input_ids = tokenizer.encode(prompt)
    print(f"Prompt: {len(input_ids)} tokens")

    # Fallback id used when config.json does not define eos_token_id.
    eos_token = config.get("eos_token_id", 151645)

    prompt_arr = (ctypes.c_int * len(input_ids))(*input_ids)
    out_arr = (ctypes.c_int * max_tokens)()

    print(f"\nGenerating (temp={temperature}, top_p={top_p})...")
    t0 = time.perf_counter()
    n_generated = lib.generate(
        model, prompt_arr, len(input_ids),
        out_arr, max_tokens,
        ctypes.c_float(temperature), ctypes.c_float(top_p),
        eos_token,
    )
    # Monotonic timer, floored so the throughput divisions can never hit zero.
    dt = max(time.perf_counter() - t0, 1e-9)

    # Clamp so a negative return value yields an empty list instead of a
    # slice counted from the end of the buffer.
    out_ids = out_arr[:max(n_generated, 0)]
    text = tokenizer.decode(out_ids, skip_special_tokens=True)

    total_tokens = len(input_ids) + n_generated
    print(f"\n=== Output ({n_generated} tokens in {dt:.1f}s = {n_generated/dt:.1f} tok/s) ===")
    print(text)
    print(f"\nPrefill: {len(input_ids)} tokens, Decode: {n_generated} tokens")
    print(f"Total time: {dt:.1f}s, Speed: {total_tokens/dt:.1f} tok/s total, {n_generated/dt:.1f} tok/s decode")

    del weight_buffers  # engine calls are finished; buffers may be released now
    return text
| |
|
if __name__ == "__main__":
    # Positional CLI arguments: [model_dir] [prompt] [max_tokens].
    # Any argument that is omitted falls back to its default.
    cli_args = sys.argv[1:]
    model_dir = cli_args[0] if cli_args else "qwen3-4b-thinking-unary"
    prompt = cli_args[1] if len(cli_args) >= 2 else "What is 2+2? Think step by step."
    max_tokens = 64 if len(cli_args) < 3 else int(cli_args[2])
    load_and_run(model_dir, prompt, max_tokens=max_tokens)
| |
|