# unary-quantization-research / convert_proper_unary.py
# Uploaded by OpenTransformer ("Add files using upload-large-folder tool", commit 19ed98b, verified)
#!/usr/bin/env python3
"""
Convert Qwen3-4B BF16 safetensors → Proper Unary.
Reads safetensors raw bytes (no framework dependency for BF16).
(c) 2026 OpenTransformers Ltd / Scott Bisset
"""
import numpy as np
import json, os, sys, gc, shutil, struct, time
class SafeTensorReader:
    """Read tensors from a .safetensors file one at a time (memory efficient).

    Parses the 8-byte little-endian header length and the JSON header up
    front; tensor bytes are read lazily, per ``get`` call, via seek+read.
    Usable as a context manager so the underlying file handle is released
    even when an error interrupts processing.
    """

    def __init__(self, path):
        """Open *path* and parse the safetensors JSON header."""
        self.f = open(path, "rb")
        # safetensors layout: <u64 header_size><JSON header><raw tensor data>
        header_size = struct.unpack("<Q", self.f.read(8))[0]
        self.header = json.loads(self.f.read(header_size).decode("utf-8"))
        self.data_start = 8 + header_size
        # Drop the optional "__metadata__" entry; every other key is a tensor.
        self._meta = {k: v for k, v in self.header.items() if k != "__metadata__"}

    def __enter__(self):
        """Context-manager support: ``with SafeTensorReader(p) as r: ...``."""
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release the file handle; never suppress the exception.
        self.close()
        return False

    def keys(self):
        """Return the list of tensor names present in the file."""
        return list(self._meta.keys())

    def get(self, name):
        """Read tensor *name* and return it as a float32 ndarray.

        Supports BF16, F16 and F32 source dtypes; raises ValueError for
        anything else.
        """
        meta = self._meta[name]
        dtype = meta["dtype"]
        shape = tuple(meta["shape"])
        start, end = meta["data_offsets"]  # offsets are relative to data_start
        self.f.seek(self.data_start + start)
        raw = self.f.read(end - start)
        if dtype == "BF16":
            # bf16 -> f32: a bf16 bit pattern is the top 16 bits of the
            # corresponding f32, so widen and shift left by 16.
            u16 = np.frombuffer(raw, dtype=np.uint16)
            u32 = u16.astype(np.uint32) << 16
            return u32.view(np.float32).reshape(shape)
        elif dtype == "F16":
            return np.frombuffer(raw, dtype=np.float16).reshape(shape).astype(np.float32)
        elif dtype == "F32":
            # .copy() detaches from the read-only frombuffer view.
            return np.frombuffer(raw, dtype=np.float32).reshape(shape).copy()
        else:
            raise ValueError(f"Unknown dtype {dtype}")

    def close(self):
        """Close the underlying file handle."""
        self.f.close()
def encode_proper_unary(weight_f32, K):
    """Encode a 2D float32 matrix into proper-unary bit planes.

    Each row is scaled so its absolute maximum maps to K, values are
    rounded to integer magnitudes in [0, K], and each magnitude m is
    stored in unary: bit j of slot plane s is set iff m > s. Columns
    are packed 64 per uint64 word ("chunk"); signs are packed the same
    way into a separate plane.

    Args:
        weight_f32: (rows, cols) float32 weight matrix.
        K: number of unary slots (quantization levels).

    Returns:
        sign_bits:   (rows, chunks) uint64, bit j set where weight < 0.
        slot_planes: (K, rows, chunks) uint64 unary magnitude planes.
        row_scales:  (rows,) float32 per-row dequantization scales.
    """
    rows, cols = weight_f32.shape
    chunks = (cols + 63) // 64
    # Per-row scale: absmax maps to magnitude K; the floor avoids a
    # divide-by-zero on all-zero rows.
    row_absmax = np.abs(weight_f32).max(axis=1).astype(np.float32)
    row_absmax = np.maximum(row_absmax, 1e-10)
    row_scales = (row_absmax / K).astype(np.float32)
    inv_scales = K / row_absmax
    magnitudes = np.clip(
        np.round(np.abs(weight_f32) * inv_scales[:, None]).astype(np.int32), 0, K)
    sign_bits = np.zeros((rows, chunks), dtype=np.uint64)
    slot_planes = np.zeros((K, rows, chunks), dtype=np.uint64)
    negative = weight_f32 < 0
    zero = np.uint64(0)
    # Pack 64 columns per uint64 chunk, vectorized across the whole chunk
    # (instead of looping column-by-column in Python).
    for c in range(chunks):
        j0 = c * 64
        j1 = min(j0 + 64, cols)
        bits = np.uint64(1) << np.arange(j1 - j0, dtype=np.uint64)
        sign_bits[:, c] = np.bitwise_or.reduce(
            np.where(negative[:, j0:j1], bits, zero), axis=1)
        mag = magnitudes[:, j0:j1]
        for s in range(K):
            active = mag > s  # unary encoding: plane s set iff magnitude > s
            if not active.any():
                # Magnitudes only shrink the "active" set as s grows, so all
                # remaining planes for this chunk are zero — stop early.
                break
            slot_planes[s, :, c] = np.bitwise_or.reduce(
                np.where(active, bits, zero), axis=1)
    return sign_bits, slot_planes, row_scales
def convert_model(model_dir, output_dir, K=32):
    """Convert a safetensors model directory into proper-unary format.

    2D projection weights (q/k/v/o, gate/up/down) are encoded with
    `encode_proper_unary` into .sign/.slots/.scales files; every other
    tensor is stored as raw FP16 (.fp16). A manifest.json describing the
    layout is written to *output_dir*, and the tokenizer/config JSON
    files are copied over verbatim.

    Args:
        model_dir: directory holding config.json and the safetensors shard(s).
        output_dir: destination directory (created if missing).
        K: number of unary slots per weight (default 32).
    """
    os.makedirs(output_dir, exist_ok=True)
    # Sanity check that the model dir is valid: config.json must exist and
    # parse. The parsed content is not otherwise used here.
    with open(os.path.join(model_dir, "config.json")) as cfg_f:
        json.load(cfg_f)
    # Copy the auxiliary JSON files the runtime needs, when present.
    for aux in ["config.json", "tokenizer.json", "tokenizer_config.json",
                "special_tokens_map.json", "generation_config.json"]:
        src = os.path.join(model_dir, aux)
        if os.path.exists(src):
            shutil.copy2(src, output_dir)
    # Sharded models carry an index mapping tensor -> shard file; otherwise
    # everything lives in a single model.safetensors.
    index_path = os.path.join(model_dir, "model.safetensors.index.json")
    if os.path.exists(index_path):
        with open(index_path) as idx_f:
            index = json.load(idx_f)
        shard_files = sorted(set(index["weight_map"].values()))
    else:
        shard_files = ["model.safetensors"]
    linear_names = ["q_proj", "k_proj", "v_proj", "o_proj",
                    "gate_proj", "up_proj", "down_proj"]
    manifest = {"K": K, "format": "proper_unary", "unary": {}, "fp16": []}
    total_linear = 0
    total_size = 0
    for shard_name in shard_files:
        shard_path = os.path.join(model_dir, shard_name)
        print(f"\n=== {shard_name} ===", flush=True)
        reader = SafeTensorReader(shard_path)
        try:  # ensure the shard's file handle is released even on error
            print(f"  {len(reader.keys())} tensors", flush=True)
            for key in sorted(reader.keys()):
                tensor = reader.get(key)
                fname = key.replace(".", "_")  # flat on-disk name
                is_linear = any(ln + ".weight" in key for ln in linear_names)
                if is_linear and tensor.ndim == 2:
                    rows, cols = tensor.shape
                    t0 = time.time()
                    print(f"  {key}: {rows}x{cols} K={K}...", end="", flush=True)
                    sign_bits, slot_planes, row_scales = encode_proper_unary(tensor, K)
                    dt = time.time() - t0
                    sign_bits.tofile(os.path.join(output_dir, fname + ".sign"))
                    slot_planes.tofile(os.path.join(output_dir, fname + ".slots"))
                    row_scales.tofile(os.path.join(output_dir, fname + ".scales"))
                    manifest["unary"][key] = [rows, cols]
                    sz = sign_bits.nbytes + slot_planes.nbytes + row_scales.nbytes
                    total_size += sz
                    total_linear += 1
                    # Size ratio vs the BF16 original (2 bytes/element).
                    ratio = sz / (rows * cols * 2)
                    print(f" {sz/1e6:.1f}MB ({ratio:.1f}x) [{dt:.0f}s]", flush=True)
                    del sign_bits, slot_planes, row_scales
                else:
                    # Not a 2D projection weight: keep as raw FP16 bytes.
                    t_f16 = tensor.astype(np.float16)
                    out_data = t_f16.view(np.uint16)
                    out_data.tofile(os.path.join(output_dir, fname + ".fp16"))
                    manifest["fp16"].append(key)
                    sz = out_data.nbytes
                    total_size += sz
                    print(f"  {key}: {tensor.shape} -> FP16 ({sz/1e6:.1f}MB)", flush=True)
                    del t_f16, out_data
                # Free the float32 tensor before reading the next one to keep
                # peak memory at roughly one tensor.
                del tensor
        finally:
            reader.close()
        gc.collect()
    with open(os.path.join(output_dir, "manifest.json"), "w") as man_f:
        json.dump(manifest, man_f, indent=2)
    print(f"\n{'='*50}", flush=True)
    print(f"DONE: {total_linear} layers, K={K}", flush=True)
    print(f"Total: {total_size/1e9:.2f} GB (orig ~7.6 GB, ratio {total_size/7.6e9:.1f}x)", flush=True)
if __name__ == "__main__":
    # CLI: convert_proper_unary.py [model_dir] [output_dir] [K]
    # Positional args are optional; missing ones fall back to the defaults.
    defaults = ["/root/ternary_engine/qwen3-4b-thinking-hf",
                "/root/ternary_engine/qwen3-4b-proper-unary", "32"]
    cli = sys.argv[1:4]
    model_dir, output_dir, k_arg = cli + defaults[len(cli):]
    convert_model(model_dir, output_dir, int(k_arg))