#!/usr/bin/env python3
"""
Log-unary converter.

Instead of thermometer (plane p = mag > p), uses binary decomposition
(plane p = bit p of magnitude). Fewer planes, wider dynamic range.

With n log-planes the magnitude is an n-bit integer, so the largest level is
2^n - 1:

  3 log-planes: 15 levels (-7 to +7),   storage = 3 bitplanes
     vs 7 linear planes: 15 levels (-7 to +7), storage = 7 bitplanes
  4 log-planes: 31 levels (-15 to +15), storage = 4 bitplanes  <-- sweet spot
  5 log-planes: 63 levels (-31 to +31), storage = 5 bitplanes

(c) 2026 OpenTransformers Ltd / Scott Bisset
"""

import gc
import json
import os
import shutil
import sys
import time

import numpy as np

# File extensions written for every quantized (log-unary) weight matrix.
_UNARY_EXTS = (".sign", ".planes", ".scales")


def _check_n_planes(n_planes):
    """Raise ValueError unless at least one magnitude plane is requested."""
    if n_planes < 1:
        raise ValueError(f"n_planes must be >= 1, got {n_planes}")


def _load_json(path):
    """Read and parse a JSON file, closing the handle afterwards."""
    with open(path) as fh:
        return json.load(fh)


def quantize_log_unary(w_fp32, n_planes):
    """Quantize weight matrix to log-unary format (binary magnitude planes).

    Each row is scaled by its own absolute maximum, rounded to an integer in
    [-(2^n_planes - 1), 2^n_planes - 1], and split into a sign bit plus
    ``n_planes`` magnitude bitplanes. Bits are packed little-endian-first along
    the input dimension into 64-bit words; the input dimension is zero-padded
    up to a multiple of 64.

    Args:
        w_fp32: 2-D array of shape (out_dim, in_dim).
        n_planes: number of magnitude bitplanes (must be >= 1).

    Returns:
        Tuple ``(sign_u64, plane_bits, scales)``:
          * sign_u64: uint64 array (out_dim, chunks); bit set = negative value.
          * plane_bits: uint64 array (n_planes, out_dim, chunks); plane p holds
            bit p of each magnitude.
          * scales: float32 array (out_dim,), equal to row_abs_max / max_level
            (all-zero rows use an abs-max of 1.0).
        where ``chunks = ceil(in_dim / 64)``.

    Raises:
        ValueError: if ``n_planes`` < 1 (a zero max level would make the
            scales divide by zero).
    """
    _check_n_planes(n_planes)
    out_dim, in_dim = w_fp32.shape
    max_level = (1 << n_planes) - 1  # 2^n - 1

    # Per-row scale; all-zero rows get abs_max 1.0 to avoid dividing by zero.
    abs_max = np.abs(w_fp32).max(axis=1, keepdims=True)
    abs_max = np.where(abs_max == 0, 1.0, abs_max)
    scales = (abs_max.flatten() / max_level).astype(np.float32)

    # Quantize to signed integer levels, then separate sign and magnitude.
    scaled = w_fp32 / abs_max * max_level
    rounded = np.clip(np.round(scaled), -max_level, max_level).astype(np.int32)
    signs = rounded < 0
    magnitudes = np.abs(rounded)

    # Pad the input dimension to whole 64-bit chunks so the packed bytes can
    # be reinterpreted as uint64 words.
    chunks = (in_dim + 63) // 64
    padded = chunks * 64
    if padded > in_dim:
        pad_spec = ((0, 0), (0, padded - in_dim))
        signs = np.pad(signs, pad_spec, constant_values=False)
        magnitudes = np.pad(magnitudes, pad_spec, constant_values=0)

    # Pack sign bits.
    sign_bits = np.packbits(signs.astype(np.uint8), axis=1, bitorder='little')
    sign_u64 = sign_bits.view(np.uint64)[:, :chunks]

    # Pack log-planes: plane p = bit p of magnitude.
    plane_bits = np.zeros((n_planes, out_dim, chunks), dtype=np.uint64)
    for p in range(n_planes):
        bit_mask = (magnitudes >> p) & 1  # extract bit p
        packed = np.packbits(bit_mask.astype(np.uint8), axis=1, bitorder='little')
        plane_bits[p] = packed.view(np.uint64)[:, :chunks]

    return sign_u64, plane_bits, scales


def convert_model(model_dir, output_dir, n_planes=4):
    """Convert a safetensors checkpoint directory to log-unary files.

    Every 2-D tensor whose key ends with ".weight" and contains "proj" is
    quantized with :func:`quantize_log_unary` and written as three raw files
    (``.sign``, ``.planes``, ``.scales``); every other tensor is written as
    raw float16 (``.fp16``). Output file names are the tensor key with "."
    replaced by "_". A ``manifest.json`` recording the shapes is written at
    the end, and the config/tokenizer JSON files present in ``model_dir`` are
    copied across.

    The conversion is resumable: a tensor is skipped when all of its output
    files already exist in ``output_dir``.

    Args:
        model_dir: directory holding ``config.json`` and the safetensors
            shard(s), optionally with ``model.safetensors.index.json``.
        output_dir: destination directory (created if missing).
        n_planes: number of magnitude bitplanes (must be >= 1).

    Raises:
        ValueError: if ``n_planes`` < 1.
    """
    _check_n_planes(n_planes)
    os.makedirs(output_dir, exist_ok=True)

    config = _load_json(os.path.join(model_dir, "config.json"))
    n_layers = config["num_hidden_layers"]
    hidden = config["hidden_size"]
    max_level = (1 << n_planes) - 1

    index_file = os.path.join(model_dir, "model.safetensors.index.json")
    if os.path.exists(index_file):
        weight_map = _load_json(index_file)["weight_map"]
        shards = sorted(set(weight_map.values()))
    else:
        # Unsharded checkpoint: sort for a deterministic processing order.
        shards = sorted(f for f in os.listdir(model_dir) if f.endswith('.safetensors'))
        weight_map = None

    print(f"LOG-UNARY CONVERSION")
    print(f" Model: {n_layers} layers, hidden={hidden}")
    print(f" Log-planes: {n_planes} -> {2*max_level+1} levels (range -{max_level}..+{max_level})")
    print(f" Shards: {len(shards)}")

    manifest = {"unary": {}, "fp16": {}, "n_planes": n_planes,
                "n_layers": n_layers, "encoding": "log_unary", "config": config}

    total_linear = sum(1 for k in (weight_map or {})
                       if k.endswith(".weight") and "proj" in k)
    # Without an index file the total is unknown; show "?" rather than "/0".
    total_label = total_linear if total_linear else "?"
    converted = 0

    # Imported lazily so quantize_log_unary stays usable without these packages.
    import torch  # noqa: F401  (required by safe_open's framework="pt")
    from safetensors import safe_open

    for si, shard in enumerate(shards):
        path = os.path.join(model_dir, shard)
        print(f"\n=== Shard {si+1}/{len(shards)}: {shard} ===")

        with safe_open(path, framework="pt") as f:
            for key in sorted(f.keys()):
                fname = key.replace(".", "_")
                # Load each tensor exactly once; it serves the rank check,
                # the manifest shape and the actual data.
                tensor = f.get_tensor(key)
                shape = list(tensor.shape)
                is_linear = (key.endswith(".weight") and "proj" in key
                             and tensor.dim() == 2)

                if is_linear:
                    sign_path, planes_path, scales_path = (
                        os.path.join(output_dir, f"{fname}{ext}")
                        for ext in _UNARY_EXTS
                    )
                    # Skip only when ALL outputs exist, so a run interrupted
                    # between the three writes is redone on resume.
                    if all(os.path.exists(p)
                           for p in (sign_path, planes_path, scales_path)):
                        manifest["unary"][key] = shape
                        converted += 1
                        print(f" [SKIP] {key}")
                        continue

                    w = tensor.float().numpy()
                    del tensor
                    t0 = time.time()
                    sign, planes, scales = quantize_log_unary(w, n_planes)
                    dt = time.time() - t0

                    sign.tofile(sign_path)
                    planes.tofile(planes_path)
                    scales.tofile(scales_path)

                    manifest["unary"][key] = list(w.shape)
                    converted += 1
                    orig_mb = w.nbytes / 1e6
                    comp_mb = (sign.nbytes + planes.nbytes + scales.nbytes) / 1e6
                    print(f" [{converted}/{total_label}] {key}: {list(w.shape)} "
                          f"-> {comp_mb:.1f}MB ({orig_mb/comp_mb:.1f}x) [{dt:.1f}s]")
                    del w, sign, planes, scales
                else:
                    fp16_path = os.path.join(output_dir, f"{fname}.fp16")
                    if os.path.exists(fp16_path):
                        manifest["fp16"][key] = shape
                        print(f" [SKIP] {key}")
                        continue

                    w = tensor.float().numpy()
                    del tensor
                    w_fp16 = w.astype(np.float16)
                    # Written as raw 16-bit words (bit pattern of the halves).
                    w_fp16.view(np.uint16).tofile(fp16_path)
                    manifest["fp16"][key] = list(w.shape)
                    print(f" [FP16] {key}: {list(w.shape)} ({w_fp16.nbytes/1e6:.1f}MB)")
                    del w, w_fp16

        # Release anything still held from this shard before opening the next.
        gc.collect()

    with open(os.path.join(output_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=2)

    for cf in ["config.json", "tokenizer.json", "tokenizer_config.json",
               "special_tokens_map.json"]:
        src = os.path.join(model_dir, cf)
        if os.path.exists(src):
            shutil.copy(src, os.path.join(output_dir, cf))

    total_unary = sum(os.path.getsize(os.path.join(output_dir, f))
                      for f in os.listdir(output_dir)
                      if f.endswith((".sign", ".planes", ".scales")))
    total_fp16 = sum(os.path.getsize(os.path.join(output_dir, f))
                     for f in os.listdir(output_dir)
                     if f.endswith(".fp16"))

    print(f"\n=== LOG-UNARY CONVERSION COMPLETE ===")
    print(f" Encoding: {n_planes} log-planes (binary magnitude)")
    print(f" Unary: {total_unary/1e9:.2f} GB")
    print(f" FP16: {total_fp16/1e9:.2f} GB")
    print(f" Total: {(total_unary+total_fp16)/1e9:.2f} GB")


if __name__ == "__main__":
    model_dir = sys.argv[1] if len(sys.argv) > 1 else "qwen3-4b-thinking-hf"
    output_dir = sys.argv[2] if len(sys.argv) > 2 else "qwen3-4b-log-unary"
    n_planes = int(sys.argv[3]) if len(sys.argv) > 3 else 4
    convert_model(model_dir, output_dir, n_planes)