File size: 6,020 Bytes
#!/usr/bin/env python3
"""
Unary converter for Qwen3 models.
Converts safetensors to unary bitplane format.
(c) 2026 OpenTransformers Ltd / Scott Bisset
"""
import numpy as np
import os, sys, json, time
def load_safetensors_torch(model_dir):
    """Read every ``*.safetensors`` file in *model_dir* into NumPy arrays.

    Shards are visited in sorted filename order and opened with the torch
    backend of ``safetensors``. Each tensor is upcast with ``.float()``
    (e.g. BF16 -> FP32) before being handed to NumPy.

    Args:
        model_dir: Directory that is scanned (non-recursively) for shards.

    Returns:
        Dict mapping tensor name to its float32 ``numpy.ndarray``. If the same
        name appears in several shards, the last shard read wins.
    """
    # Imported lazily so the rest of the module works without these packages.
    import torch
    from safetensors import safe_open

    shard_names = [name for name in os.listdir(model_dir) if name.endswith('.safetensors')]
    shard_names.sort()
    print(f"Loading {len(shard_names)} shard(s)...")

    tensors = {}
    for shard_name in shard_names:
        shard_path = os.path.join(model_dir, shard_name)
        print(f" {shard_name}...")
        with safe_open(shard_path, framework="pt") as shard:
            for tensor_name in shard.keys():
                tensor = shard.get_tensor(tensor_name)
                # Upcast first, then convert to a NumPy array.
                tensors[tensor_name] = tensor.float().numpy()
    return tensors
def quantize_unary_vectorized(w_fp32, n_planes):
    """Quantize a 2-D weight matrix into sign + unary magnitude bitplanes.

    Every row is scaled by its own absolute maximum so that values land on the
    integer grid ``[-n_planes, n_planes]``. The sign and the magnitude of each
    quantized value are then stored as bits, 64 columns per ``uint64`` word
    (columns are zero-padded up to a multiple of 64).

    Args:
        w_fp32: Array of shape ``(out_dim, in_dim)``.
        n_planes: Number of magnitude planes; also the largest magnitude.

    Returns:
        Tuple ``(sign_u64, plane_bits, scales)``:

        * ``sign_u64`` -- ``uint64`` array ``(out_dim, chunks)``; a set bit
          marks a negative quantized value.
        * ``plane_bits`` -- ``uint64`` array ``(n_planes, out_dim, chunks)``;
          in plane ``p`` a bit is set when the magnitude is greater than ``p``,
          so the magnitude equals the number of planes with that bit set.
        * ``scales`` -- ``float32`` array ``(out_dim,)`` holding
          ``row_abs_max / n_planes``.

        ``chunks`` is ``ceil(in_dim / 64)``. Bytes are packed little-bit-first
        and reinterpreted as native-endian ``uint64`` words.
    """
    n_rows, n_cols = w_fp32.shape
    levels = n_planes  # quantized values span -levels .. +levels

    # Row-wise peak; all-zero rows use 1.0 so the division below is safe.
    row_peak = np.abs(w_fp32).max(axis=1, keepdims=True)
    row_peak = np.where(row_peak == 0, 1.0, row_peak)

    quantized = np.clip(np.round(w_fp32 / row_peak * levels), -levels, levels).astype(np.int32)
    scales = (row_peak.flatten() / levels).astype(np.float32)

    negative = quantized < 0       # True where the quantized value is negative
    magnitude = np.abs(quantized)  # 0 .. n_planes

    # Each output word carries 64 columns; pad the tail with zeros.
    n_words = (n_cols + 63) // 64
    pad_cols = n_words * 64 - n_cols
    if pad_cols > 0:
        col_pad = ((0, 0), (0, pad_cols))
        negative = np.pad(negative, col_pad, constant_values=False)
        magnitude = np.pad(magnitude, col_pad, constant_values=0)

    def pack_rows(bits):
        # Boolean (n_rows, padded) -> uint64 (n_rows, n_words).
        as_bytes = np.packbits(bits.astype(np.uint8), axis=1, bitorder='little')
        return as_bytes.view(np.uint64)[:, :n_words]

    sign_u64 = pack_rows(negative)

    # Thermometer code: plane p is set wherever magnitude > p.
    plane_bits = np.zeros((n_planes, n_rows, n_words), dtype=np.uint64)
    for level in range(n_planes):
        plane_bits[level] = pack_rows(magnitude > level)

    return sign_u64, plane_bits, scales
def convert_model(model_dir, output_dir, n_planes=7):
    """Convert a Qwen3 model to unary format.

    Reads ``config.json`` and all safetensors shards from *model_dir* and
    writes the following into *output_dir* (created if missing):

    * ``<name>.sign``, ``<name>.planes``, ``<name>.scales`` for every 2-D
      tensor whose key ends in ``.weight`` and contains ``"proj"``, produced
      by ``quantize_unary_vectorized``;
    * ``<name>.fp16`` for every other tensor, cast to float16 and written as
      raw 16-bit words;
    * ``manifest.json`` with the shape of each tensor (under ``"unary"`` or
      ``"fp16"``), ``n_planes``, ``n_layers`` and the full config;
    * a copy of ``config.json``.

    ``<name>`` is the tensor key with every ``.`` replaced by ``_``.

    Args:
        model_dir: Directory holding ``config.json`` and ``*.safetensors``.
        output_dir: Destination directory for the converted files.
        n_planes: Number of magnitude bitplanes per quantized weight.

    Raises:
        KeyError: If the config lacks ``num_hidden_layers`` or ``hidden_size``.
    """
    import shutil

    os.makedirs(output_dir, exist_ok=True)

    # Load config (context manager so the handle is closed deterministically).
    config_path = os.path.join(model_dir, "config.json")
    with open(config_path, encoding="utf-8") as config_file:
        config = json.load(config_file)
    n_layers = config["num_hidden_layers"]
    hidden = config["hidden_size"]
    print(f"Model: {n_layers} layers, hidden={hidden}, n_planes={n_planes}")

    # Load weights
    weights = load_safetensors_torch(model_dir)
    print(f"Loaded {len(weights)} tensors")

    # Identify linear layers (2D weight matrices in attn/mlp)
    linear_keys = sorted(
        k for k, w in weights.items()
        if k.endswith(".weight") and w.ndim == 2 and "proj" in k
    )
    # Set gives O(1) membership when selecting the remaining tensors below.
    linear_key_set = set(linear_keys)

    manifest = {"unary": {}, "fp16": {}}

    # Convert linear layers to unary
    total = len(linear_keys)
    for idx, key in enumerate(linear_keys):
        w = weights[key]
        t0 = time.time()
        sign, planes, scales = quantize_unary_vectorized(w, n_planes)
        dt = time.time() - t0

        # Flatten name for filesystem
        fname = key.replace(".", "_")
        # The quantizer already returns ndarrays; write them without copying.
        sign.tofile(os.path.join(output_dir, f"{fname}.sign"))
        planes.tofile(os.path.join(output_dir, f"{fname}.planes"))
        scales.tofile(os.path.join(output_dir, f"{fname}.scales"))
        manifest["unary"][key] = list(w.shape)

        orig_mb = w.nbytes / 1e6
        comp_mb = (sign.nbytes + planes.nbytes + scales.nbytes) / 1e6
        print(f" [{idx+1}/{total}] {key}: {list(w.shape)} -> {comp_mb:.1f}MB ({orig_mb/comp_mb:.1f}x) [{dt:.1f}s]")

    # Save FP16 weights (norms, embeddings, QK-norms)
    fp16_keys = sorted(k for k in weights if k not in linear_key_set)
    for key in fp16_keys:
        w = weights[key]
        fname = key.replace(".", "_")
        w_fp16 = w.astype(np.float16)
        w_fp16.view(np.uint16).tofile(os.path.join(output_dir, f"{fname}.fp16"))
        manifest["fp16"][key] = list(w.shape)
        print(f" [FP16] {key}: {list(w.shape)} ({w_fp16.nbytes/1e6:.1f}MB)")

    # Save manifest and config
    manifest["n_planes"] = n_planes
    manifest["n_layers"] = n_layers
    manifest["config"] = config
    with open(os.path.join(output_dir, "manifest.json"), "w", encoding="utf-8") as manifest_file:
        json.dump(manifest, manifest_file, indent=2)

    # Copy config
    shutil.copy(config_path, os.path.join(output_dir, "config.json"))

    # Size summary, measured from what is actually on disk in output_dir.
    out_names = os.listdir(output_dir)
    total_unary = sum(
        os.path.getsize(os.path.join(output_dir, name))
        for name in out_names
        if name.endswith((".sign", ".planes", ".scales"))
    )
    total_fp16 = sum(
        os.path.getsize(os.path.join(output_dir, name))
        for name in out_names
        if name.endswith(".fp16")
    )
    orig_total = sum(w.nbytes for w in weights.values())
    total_out = total_unary + total_fp16
    # Avoid ZeroDivisionError when no tensor files were produced.
    ratio = f"{orig_total/total_out:.1f}x" if total_out else "n/a"

    print("\n=== CONVERSION COMPLETE ===")
    print(f"Original FP32: {orig_total/1e9:.2f} GB")
    print(f"Unary linear: {total_unary/1e9:.2f} GB")
    print(f"FP16 other: {total_fp16/1e9:.2f} GB")
    print(f"Total: {total_out/1e9:.2f} GB")
    print(f"Compression: {ratio}")
if __name__ == "__main__":
    # Usage: script [model_dir] [output_dir] [n_planes]
    # Every positional argument is optional and falls back to a default.
    argc = len(sys.argv)

    if argc > 1:
        model_dir = sys.argv[1]
    else:
        model_dir = "qwen3-4b-thinking-hf"

    if argc > 2:
        output_dir = sys.argv[2]
    else:
        output_dir = "qwen3-4b-thinking-unary"

    if argc > 3:
        n_planes = int(sys.argv[3])
    else:
        n_planes = 7

    convert_model(model_dir, output_dir, n_planes)
|