| | |
| | """ |
| | Memory-efficient unary converter for Qwen3. |
| | Processes one safetensors shard at a time to avoid OOM. |
| | (c) 2026 OpenTransformers Ltd / Scott Bisset |
| | """ |
| | import numpy as np |
| | import os, sys, json, time, gc |
| |
|
def quantize_unary(w_fp32, n_planes):
    """Quantize a 2-D weight matrix into sign bits plus unary (thermometer) bitplanes.

    Each row is scaled by its own max-abs value so the rounded integer
    magnitudes land in [0, n_planes]; a magnitude m is then encoded by
    setting the first m bitplanes ("unary" code), with the sign kept in a
    separate bit per weight.

    Args:
        w_fp32:   float array of shape (out_dim, in_dim).
        n_planes: number of bitplanes, i.e. the largest representable magnitude.

    Returns:
        sign_u64:   uint64 array (out_dim, ceil(in_dim/64)); bit j of a word
                    is 1 iff the corresponding quantized weight is negative
                    (LSB-first bit order within each 64-bit word).
        plane_bits: uint64 array (n_planes, out_dim, ceil(in_dim/64)); plane p
                    has bit j set iff the quantized magnitude exceeds p.
        scales:     float32 array (out_dim,) of per-row dequantization scales.
    """
    rows, cols = w_fp32.shape

    # Per-row symmetric quantization to integers in [-n_planes, n_planes].
    row_max = np.abs(w_fp32).max(axis=1, keepdims=True)
    row_max = np.where(row_max == 0, 1.0, row_max)  # guard all-zero rows
    q = np.clip(np.round(w_fp32 / row_max * n_planes), -n_planes, n_planes).astype(np.int32)

    scales = (row_max.flatten() / n_planes).astype(np.float32)
    neg = q < 0
    mag = np.abs(q)

    # Pad the column dimension to a multiple of 64 so the packed bytes view
    # cleanly as whole uint64 words.
    n_words = (cols + 63) // 64
    pad = n_words * 64 - cols
    if pad:
        neg = np.pad(neg, ((0, 0), (0, pad)), constant_values=False)
        mag = np.pad(mag, ((0, 0), (0, pad)), constant_values=0)

    def pack64(bits):
        # Pack boolean rows LSB-first into bytes, then reinterpret each run
        # of 8 bytes as one little-endian uint64 word.
        packed = np.packbits(bits.astype(np.uint8), axis=1, bitorder='little')
        return packed.view(np.uint64)[:, :n_words]

    sign_u64 = pack64(neg)
    plane_bits = np.empty((n_planes, rows, n_words), dtype=np.uint64)
    for p in range(n_planes):
        plane_bits[p] = pack64(mag > p)

    return sign_u64, plane_bits, scales
| |
|
def _load_json(path):
    """Parse a JSON file, closing the file handle deterministically."""
    with open(path) as fh:
        return json.load(fh)


def convert_model(model_dir, output_dir, n_planes=7):
    """Convert a safetensors checkpoint to unary bitplane format, shard by shard.

    2-D projection weights (keys ending in ".weight" whose name contains
    "proj") are quantized with quantize_unary and written as raw
    .sign/.planes/.scales files; every other tensor is stored as raw
    little-endian fp16.  Shards are processed one at a time and tensors whose
    output files already exist are skipped, so an interrupted run can be
    resumed.  Tokenizer/config files are copied alongside a manifest.json
    describing every converted tensor.

    Args:
        model_dir:  HF-style model directory (config.json + *.safetensors,
                    optionally with model.safetensors.index.json).
        output_dir: destination directory, created if missing.
        n_planes:   number of unary bitplanes per weight (default 7).
    """
    os.makedirs(output_dir, exist_ok=True)

    config = _load_json(os.path.join(model_dir, "config.json"))
    n_layers = config["num_hidden_layers"]
    hidden = config["hidden_size"]

    # Prefer the shard index (multi-file checkpoints); fall back to globbing
    # the directory for single-file or index-less checkpoints.
    index_file = os.path.join(model_dir, "model.safetensors.index.json")
    if os.path.exists(index_file):
        weight_map = _load_json(index_file)["weight_map"]
        shards = sorted(set(weight_map.values()))
    else:
        shards = [f for f in os.listdir(model_dir) if f.endswith('.safetensors')]
        weight_map = None

    print(f"Model: {n_layers} layers, hidden={hidden}, n_planes={n_planes}")
    print(f"Shards: {len(shards)}")

    manifest = {"unary": {}, "fp16": {}, "n_planes": n_planes, "n_layers": n_layers, "config": config}
    total_converted = 0
    total_linear = 0

    if weight_map:
        total_linear = sum(1 for k in weight_map if k.endswith(".weight") and "proj" in k)
        print(f"Total linear layers to convert: {total_linear}")

    import torch  # noqa: F401 -- required by safetensors' framework="pt" backend
    from safetensors import safe_open

    for shard_idx, shard in enumerate(shards):
        shard_path = os.path.join(model_dir, shard)
        print(f"\n=== Shard {shard_idx+1}/{len(shards)}: {shard} ===")

        with safe_open(shard_path, framework="pt") as f:
            keys = list(f.keys())
            print(f" {len(keys)} tensors in shard")

            for key in sorted(keys):
                fname = key.replace(".", "_")
                # Read the shape from the shard header via get_slice instead
                # of materializing the tensor: the original loaded every full
                # tensor just to test .dim(), defeating the point of
                # shard-at-a-time memory efficiency.
                shape = list(f.get_slice(key).get_shape())
                is_linear = key.endswith(".weight") and "proj" in key and len(shape) == 2

                if is_linear:
                    # Resumability: skip if output already on disk.
                    sign_path = os.path.join(output_dir, f"{fname}.sign")
                    if os.path.exists(sign_path):
                        manifest["unary"][key] = shape
                        total_converted += 1
                        print(f" [SKIP] {key} already converted")
                        continue

                    w = f.get_tensor(key).float().numpy()
                    t0 = time.time()
                    sign, planes, scales = quantize_unary(w, n_planes)
                    dt = time.time() - t0

                    # tofile requires contiguous data; sign is a view of the
                    # packed buffer, planes/scales are already owned arrays
                    # (no need for the extra np.array copies).
                    np.ascontiguousarray(sign).tofile(sign_path)
                    planes.tofile(os.path.join(output_dir, f"{fname}.planes"))
                    scales.tofile(os.path.join(output_dir, f"{fname}.scales"))

                    orig_mb = w.nbytes / 1e6
                    comp_mb = (sign.nbytes + planes.nbytes + scales.nbytes) / 1e6
                    total_converted += 1
                    manifest["unary"][key] = shape
                    print(f" [{total_converted}/{total_linear}] {key}: {shape} -> {comp_mb:.1f}MB ({orig_mb/comp_mb:.1f}x) [{dt:.1f}s]")

                    # Release large intermediates before the next tensor to
                    # keep peak RSS near one-tensor size.
                    del w, sign, planes, scales
                else:
                    # Non-linear tensors (embeddings, norms, ...) pass through as fp16.
                    fp16_path = os.path.join(output_dir, f"{fname}.fp16")
                    if os.path.exists(fp16_path):
                        manifest["fp16"][key] = shape
                        print(f" [SKIP] {key} already saved")
                        continue

                    w_fp16 = f.get_tensor(key).float().numpy().astype(np.float16)
                    # Write raw little-endian fp16 payload (viewed as uint16).
                    w_fp16.view(np.uint16).tofile(fp16_path)
                    manifest["fp16"][key] = shape
                    print(f" [FP16] {key}: {shape} ({w_fp16.nbytes/1e6:.1f}MB)")
                    del w_fp16

        gc.collect()
        print(f" Shard done, memory freed")

    with open(os.path.join(output_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=2)

    # Copy tokenizer/config sidecar files so the output dir is self-contained.
    import shutil
    for cf in ["config.json", "tokenizer.json", "tokenizer_config.json", "special_tokens_map.json"]:
        src = os.path.join(model_dir, cf)
        if os.path.exists(src):
            shutil.copy(src, os.path.join(output_dir, cf))

    # Summarize on-disk footprint of both formats.
    total_unary = sum(os.path.getsize(os.path.join(output_dir, f))
                      for f in os.listdir(output_dir)
                      if f.endswith((".sign", ".planes", ".scales")))
    total_fp16 = sum(os.path.getsize(os.path.join(output_dir, f))
                     for f in os.listdir(output_dir)
                     if f.endswith(".fp16"))

    print("\n=== CONVERSION COMPLETE ===")
    print(f"Unary linear: {total_unary/1e9:.2f} GB")
    print(f"FP16 other: {total_fp16/1e9:.2f} GB")
    print(f"Total: {(total_unary+total_fp16)/1e9:.2f} GB")
|
if __name__ == "__main__":
    # Positional CLI arguments, all optional: model_dir output_dir n_planes.
    defaults = ("qwen3-4b-thinking-hf", "qwen3-4b-thinking-unary", "7")
    argv = sys.argv[1:]
    src, dst, planes = (
        argv[i] if i < len(argv) else d for i, d in enumerate(defaults)
    )
    convert_model(src, dst, int(planes))
| |
|