unary-quantization-research / convert_qwen3_v2.py
OpenTransformer's picture
Add files using upload-large-folder tool
19ed98b verified
#!/usr/bin/env python3
"""
Memory-efficient unary converter for Qwen3.
Processes one safetensors shard at a time to avoid OOM.
(c) 2026 OpenTransformers Ltd / Scott Bisset
"""
import numpy as np
import os, sys, json, time, gc
def _pack_rows_u64(mask, chunks):
    """Pack each row of a 2-D boolean mask into ``chunks`` uint64 words.

    Bits are packed little-bit-first into bytes, the byte rows are zero-padded
    up to ``chunks * 8`` bytes, and the result is reinterpreted as uint64 in
    the host's native byte order.
    """
    packed = np.packbits(mask, axis=1, bitorder='little')
    missing = chunks * 8 - packed.shape[1]
    if missing:
        # Padding the packed bytes is equivalent to padding the mask with
        # False up to a multiple of 64 columns, but touches 8x less data.
        packed = np.pad(packed, ((0, 0), (0, missing)), constant_values=0)
    return packed.view(np.uint64)


def quantize_unary(w_fp32, n_planes):
    """Quantize a 2-D weight matrix to the unary bitplane format.

    Every row is divided by its own absolute maximum and multiplied by
    ``n_planes`` so values land in ``[-n_planes, n_planes]``, then rounded
    with ``np.round``.  The integer result is stored as one sign bit per
    weight plus ``n_planes`` magnitude planes: the bit for a weight is set in
    plane ``p`` when its magnitude is greater than ``p``, so a magnitude ``m``
    sets planes ``0 .. m-1``.

    Args:
        w_fp32: NumPy array of shape ``(out_dim, in_dim)``.
        n_planes: Number of magnitude planes, which is also the largest
            representable integer magnitude.  Must be at least 1.

    Returns:
        A tuple ``(sign_u64, plane_bits, scales)``:

        * ``sign_u64`` -- uint64 array ``(out_dim, chunks)`` with
          ``chunks = ceil(in_dim / 64)``; a bit is set where the rounded
          value is negative.
        * ``plane_bits`` -- uint64 array ``(n_planes, out_dim, chunks)``.
        * ``scales`` -- float32 array ``(out_dim,)`` holding
          ``row_abs_max / n_planes`` (all-zero rows use an abs-max of 1.0).

        The uint64 words are a native-byte-order view of the packed bytes; on
        a little-endian host column ``j`` is bit ``j % 64`` of word
        ``j // 64``.  Columns beyond ``in_dim`` are zero.

    Raises:
        ValueError: If ``w_fp32`` is not 2-D or ``n_planes`` is less than 1.
    """
    if w_fp32.ndim != 2:
        raise ValueError(
            f"quantize_unary expects a 2-D matrix, got shape {tuple(w_fp32.shape)}"
        )
    if n_planes < 1:
        # n_planes == 0 would divide by zero below and yield infinite scales.
        raise ValueError(f"n_planes must be >= 1, got {n_planes}")

    out_dim, in_dim = w_fp32.shape
    max_val = n_planes

    abs_max = np.abs(w_fp32).max(axis=1, keepdims=True)
    # All-zero rows would otherwise divide by zero; any non-zero scale works
    # because every quantized value in such a row is 0.
    abs_max = np.where(abs_max == 0, 1.0, abs_max)

    scaled = w_fp32 / abs_max * max_val
    rounded = np.clip(np.round(scaled), -max_val, max_val).astype(np.int32)
    scales = (abs_max.flatten() / max_val).astype(np.float32)

    magnitudes = np.abs(rounded)
    chunks = (in_dim + 63) // 64

    sign_u64 = _pack_rows_u64(rounded < 0, chunks)

    plane_bits = np.zeros((n_planes, out_dim, chunks), dtype=np.uint64)
    for p in range(n_planes):
        plane_bits[p] = _pack_rows_u64(magnitudes > p, chunks)

    return sign_u64, plane_bits, scales
def _file_has_size(path, n_bytes):
    """Return True when *path* is an existing file of exactly *n_bytes* bytes."""
    return os.path.isfile(path) and os.path.getsize(path) == n_bytes


def convert_model(model_dir, output_dir, n_planes=7):
    """Convert a safetensors checkpoint in *model_dir* to unary bitplane files.

    Shards are opened one at a time.  Every tensor whose key ends in
    ``.weight``, contains ``proj`` and is 2-D is quantized with
    ``quantize_unary`` and written as ``<name>.sign`` / ``<name>.planes`` /
    ``<name>.scales``; every other tensor is written as raw float16 in
    ``<name>.fp16``.  ``<name>`` is the tensor key with ``.`` replaced by
    ``_``.  A ``manifest.json`` listing the shapes, plus copies of the config
    and tokenizer files that exist, are written to *output_dir*.

    The conversion is resumable: a tensor is skipped when all of its output
    files already exist with exactly the byte size this run would produce.
    Comparing sizes (rather than only checking that ``.sign`` exists) means a
    run interrupted between writes, or outputs made with a different
    ``n_planes``, are redone instead of being silently reused.

    Args:
        model_dir: Directory containing ``config.json`` and the
            ``.safetensors`` shard(s).
        output_dir: Destination directory; created if missing.
        n_planes: Number of magnitude planes passed to ``quantize_unary``.
    """
    os.makedirs(output_dir, exist_ok=True)

    with open(os.path.join(model_dir, "config.json")) as cfg_file:
        config = json.load(cfg_file)
    n_layers = config["num_hidden_layers"]
    hidden = config["hidden_size"]

    # Load index to know which keys are in which shard
    index_file = os.path.join(model_dir, "model.safetensors.index.json")
    if os.path.exists(index_file):
        with open(index_file) as idx_file:
            weight_map = json.load(idx_file)["weight_map"]
        shards = sorted(set(weight_map.values()))
    else:
        # No index file: every .safetensors file in the directory is a shard.
        # Sorted so the processing order does not depend on os.listdir.
        shards = sorted(f for f in os.listdir(model_dir) if f.endswith('.safetensors'))
        weight_map = None

    print(f"Model: {n_layers} layers, hidden={hidden}, n_planes={n_planes}")
    print(f"Shards: {len(shards)}")

    manifest = {"unary": {}, "fp16": {}, "n_planes": n_planes, "n_layers": n_layers, "config": config}
    total_converted = 0
    total_linear = 0

    # Count total linear layers (only known up front when an index exists;
    # this count is by key name only and is used purely for progress output).
    if weight_map:
        total_linear = sum(1 for k in weight_map if k.endswith(".weight") and "proj" in k)
        print(f"Total linear layers to convert: {total_linear}")

    import torch  # noqa: F401  (kept from the original; tensors are loaded with framework="pt")
    from safetensors import safe_open

    for shard_idx, shard in enumerate(shards):
        shard_path = os.path.join(model_dir, shard)
        print(f"\n=== Shard {shard_idx+1}/{len(shards)}: {shard} ===")

        with safe_open(shard_path, framework="pt") as f:
            keys = list(f.keys())
            print(f" {len(keys)} tensors in shard")

            for key in sorted(keys):
                fname = key.replace(".", "_")
                # The shape is read from the shard header, so classifying a
                # tensor (or skipping a finished one) loads no tensor data.
                shape = list(f.get_slice(key).get_shape())
                is_linear = key.endswith(".weight") and "proj" in key and len(shape) == 2

                if is_linear:
                    sign_path = os.path.join(output_dir, f"{fname}.sign")
                    planes_path = os.path.join(output_dir, f"{fname}.planes")
                    scales_path = os.path.join(output_dir, f"{fname}.scales")

                    # Expected sizes mirror quantize_unary's outputs: uint64
                    # words of 64 columns each, and one float32 scale per row.
                    out_dim, in_dim = shape
                    n_words = out_dim * ((in_dim + 63) // 64)
                    expected = (
                        (sign_path, n_words * 8),
                        (planes_path, n_planes * n_words * 8),
                        (scales_path, out_dim * 4),
                    )
                    if all(_file_has_size(p, n) for p, n in expected):
                        manifest["unary"][key] = shape
                        total_converted += 1
                        print(f" [SKIP] {key} already converted")
                        continue

                    w = f.get_tensor(key).float().numpy()
                    t0 = time.time()
                    sign, planes, scales = quantize_unary(w, n_planes)
                    dt = time.time() - t0

                    # tofile writes the arrays directly; wrapping them in
                    # np.array() first would duplicate the plane stack in RAM.
                    sign.tofile(sign_path)
                    planes.tofile(planes_path)
                    scales.tofile(scales_path)

                    orig_mb = w.nbytes / 1e6
                    comp_mb = (sign.nbytes + planes.nbytes + scales.nbytes) / 1e6
                    total_converted += 1
                    manifest["unary"][key] = shape
                    print(f" [{total_converted}/{total_linear}] {key}: {shape} -> {comp_mb:.1f}MB ({orig_mb/comp_mb:.1f}x) [{dt:.1f}s]")
                    del w, sign, planes, scales
                else:
                    # FP16 weight (norms, embeddings, etc)
                    fp16_path = os.path.join(output_dir, f"{fname}.fp16")
                    n_elems = int(np.prod(shape, dtype=np.int64))
                    if _file_has_size(fp16_path, n_elems * 2):
                        manifest["fp16"][key] = shape
                        print(f" [SKIP] {key} already saved")
                        continue

                    w = f.get_tensor(key).float().numpy()
                    w_fp16 = w.astype(np.float16)
                    w_fp16.view(np.uint16).tofile(fp16_path)
                    manifest["fp16"][key] = shape
                    print(f" [FP16] {key}: {shape} ({w_fp16.nbytes/1e6:.1f}MB)")
                    del w, w_fp16

        # Force GC between shards
        gc.collect()
        print(" Shard done, memory freed")

    # Save manifest
    with open(os.path.join(output_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=2)

    # Copy config
    import shutil
    for cf in ["config.json", "tokenizer.json", "tokenizer_config.json", "special_tokens_map.json"]:
        src = os.path.join(model_dir, cf)
        if os.path.exists(src):
            shutil.copy(src, os.path.join(output_dir, cf))

    # Summary: total on-disk size of each output family.
    total_unary = 0
    total_fp16 = 0
    for out_name in os.listdir(output_dir):
        if out_name.endswith((".sign", ".planes", ".scales")):
            total_unary += os.path.getsize(os.path.join(output_dir, out_name))
        elif out_name.endswith(".fp16"):
            total_fp16 += os.path.getsize(os.path.join(output_dir, out_name))

    print("\n=== CONVERSION COMPLETE ===")
    print(f"Unary linear: {total_unary/1e9:.2f} GB")
    print(f"FP16 other: {total_fp16/1e9:.2f} GB")
    print(f"Total: {(total_unary+total_fp16)/1e9:.2f} GB")
if __name__ == "__main__":
    # Usage: convert_qwen3_v2.py [model_dir] [output_dir] [n_planes]
    # Every positional argument is optional; start from the defaults and
    # override whichever ones were supplied on the command line.
    model_dir = "qwen3-4b-thinking-hf"
    output_dir = "qwen3-4b-thinking-unary"
    n_planes = 7

    if len(sys.argv) > 1:
        model_dir = sys.argv[1]
    if len(sys.argv) > 2:
        output_dir = sys.argv[2]
    if len(sys.argv) > 3:
        n_planes = int(sys.argv[3])

    convert_model(model_dir, output_dir, n_planes)