| |
| """ |
| FAST proper unary converter — vectorized bitpacking via numpy. |
| |
| Instead of iterating columns one at a time, processes plane-by-plane |
| with vectorized comparisons, then packs to uint64 using np.packbits. |
| |
| (c) 2026 OpenTransformers Ltd / Scott Bisset |
| """ |
|
|
| import torch, json, os, sys, gc, shutil |
| from safetensors import safe_open |
| import numpy as np |
|
|
|
|
| def pack_bits_to_uint64(bool_matrix): |
| """ |
| Pack [rows, cols] boolean → [rows, chunks] uint64 |
| where chunks = ceil(cols/64). |
| |
| Bit j of element (r, c) corresponds to column c*64+j. |
| Uses little-endian bit ordering within each uint64. |
| """ |
| rows, cols = bool_matrix.shape |
| chunks = (cols + 63) // 64 |
| |
| |
| if cols % 64: |
| padded = np.zeros((rows, chunks * 64), dtype=np.uint8) |
| padded[:, :cols] = bool_matrix.astype(np.uint8) |
| else: |
| padded = bool_matrix.astype(np.uint8) |
| |
| |
| reshaped = padded.reshape(rows, chunks, 64) |
| |
| |
| |
| result = np.zeros((rows, chunks), dtype=np.uint64) |
| for bit in range(64): |
| result |= reshaped[:, :, bit].astype(np.uint64) << np.uint64(bit) |
| |
| return result |
|
|
|
|
| def encode_fast(weight_f32_np, quantum, K): |
| """ |
| Fast vectorized proper unary encoding. |
| weight_f32_np: [rows, cols] numpy float32 |
| Returns: sign [rows, chunks] uint64, slots [K, rows, chunks] uint64, clip_count |
| """ |
| rows, cols = weight_f32_np.shape |
| chunks = (cols + 63) // 64 |
| |
| inv_q = 1.0 / quantum |
| magnitudes = np.round(np.abs(weight_f32_np) * inv_q).astype(np.int32) |
| clip_count = int(np.sum(magnitudes > K)) |
| magnitudes = np.clip(magnitudes, 0, K) |
| |
| |
| signs_bool = weight_f32_np < 0 |
| sign_packed = pack_bits_to_uint64(signs_bool) |
| |
| |
| |
| slots_packed = np.zeros((K, rows, chunks), dtype=np.uint64) |
| |
| for p in range(K): |
| active = magnitudes > p |
| slots_packed[p] = pack_bits_to_uint64(active) |
| |
| if (p + 1) % 8 == 0 or p == K - 1: |
| print(f" plane {p+1}/{K}", end="\r", flush=True) |
| |
| print(f" {K}/{K} planes done, {clip_count} clipped") |
| return sign_packed, slots_packed, clip_count |
|
|
|
|
| def convert(model_dir, output_dir, K=32, clip_pct=99.9): |
| os.makedirs(output_dir, exist_ok=True) |
| |
| config = json.load(open(os.path.join(model_dir, "config.json"))) |
| print(f"Model: {config.get('model_type', '?')}") |
| print(f" Layers={config['num_hidden_layers']} Hidden={config['hidden_size']} Inter={config['intermediate_size']}") |
| |
| |
| index_path = os.path.join(model_dir, "model.safetensors.index.json") |
| if os.path.exists(index_path): |
| index = json.load(open(index_path)) |
| shards = sorted(set(index["weight_map"].values())) |
| weight_map = index["weight_map"] |
| else: |
| shards = ["model.safetensors"] |
| weight_map = None |
| |
| |
| print("\nScanning weights...") |
| all_abs = [] |
| linear_names = [] |
| global_max = 0.0 |
| |
| for shard in shards: |
| path = os.path.join(model_dir, shard) |
| print(f" {shard}...") |
| with safe_open(path, framework="pt") as f: |
| for name in f.keys(): |
| t = f.get_tensor(name).float() |
| if t.dim() == 2 and "norm" not in name and "embed" not in name: |
| linear_names.append(name) |
| am = t.abs().max().item() |
| if am > global_max: global_max = am |
| idx = torch.randint(0, t.numel(), (2000,)) |
| all_abs.append(t.flatten()[idx].abs()) |
| |
| all_abs_t = torch.cat(all_abs) |
| clip_val = torch.quantile(all_abs_t, clip_pct / 100.0).item() |
| quantum = clip_val / K |
| |
| print(f"\n Absmax={global_max:.6f} P{clip_pct}={clip_val:.6f}") |
| print(f" K={K} quantum={quantum:.8f}") |
| |
| mags = (all_abs_t / quantum).round().clamp(0, K) |
| print(f" Mean mag={mags.mean():.1f} Median={mags.median():.1f} Zero={100*(mags==0).float().mean():.1f}% Clipped={100*(mags==K).float().mean():.1f}%") |
| |
| del all_abs, all_abs_t, mags |
| gc.collect() |
| |
| manifest = { |
| "format": "proper_unary", |
| "quantum": float(quantum), |
| "K": K, |
| "clip_pct": clip_pct, |
| "clip_val": float(clip_val), |
| "global_absmax": float(global_max), |
| "unary": {}, |
| "fp16": [], |
| } |
| |
| total_unary = 0 |
| total_fp16 = 0 |
| total_clip = 0 |
| done = 0 |
| |
| for shard in shards: |
| path = os.path.join(model_dir, shard) |
| |
| |
| shard_lins = [n for n in linear_names if (weight_map or {}).get(n, "model.safetensors") == shard] |
| print(f"\n{shard}: {len(shard_lins)} linear layers") |
| |
| with safe_open(path, framework="pt") as f: |
| |
| for name in f.keys(): |
| if name in linear_names: |
| continue |
| fname = name.replace(".", "_") + ".fp16" |
| out_path = os.path.join(output_dir, fname) |
| if not os.path.exists(out_path): |
| t = f.get_tensor(name).half().numpy() |
| t.view(np.uint16).tofile(out_path) |
| total_fp16 += os.path.getsize(out_path) |
| manifest["fp16"].append(name) |
| print(f" FP16: {name} {t.shape}") |
| |
| |
| for name in shard_lins: |
| fname = name.replace(".", "_") |
| sign_path = os.path.join(output_dir, f"{fname}.usign") |
| slots_path = os.path.join(output_dir, f"{fname}.uslots") |
| |
| if os.path.exists(sign_path) and os.path.exists(slots_path): |
| t_shape = list(f.get_tensor(name).shape) |
| manifest["unary"][name] = t_shape |
| total_unary += os.path.getsize(sign_path) + os.path.getsize(slots_path) |
| done += 1 |
| print(f" Skip: {name}") |
| continue |
| |
| t = f.get_tensor(name).float().numpy() |
| rows, cols = t.shape |
| print(f" {name} [{rows}x{cols}]", flush=True) |
| |
| sign_p, slots_p, clip_c = encode_fast(t, quantum, K) |
| total_clip += clip_c |
| |
| sign_p.tofile(sign_path) |
| slots_p.tofile(slots_path) |
| |
| s_sz = os.path.getsize(sign_path) |
| sl_sz = os.path.getsize(slots_path) |
| total_unary += s_sz + sl_sz |
| |
| manifest["unary"][name] = [rows, cols] |
| done += 1 |
| mb = (s_sz + sl_sz) / 1e6 |
| print(f" → {mb:.1f} MB ({s_sz//1024}KB sign + {sl_sz//1024}KB slots)") |
| |
| del t, sign_p, slots_p |
| gc.collect() |
| |
| |
| for fname in os.listdir(model_dir): |
| if fname.endswith(('.json', '.txt', '.model')) and not fname.startswith('model.safetensors'): |
| src = os.path.join(model_dir, fname) |
| dst = os.path.join(output_dir, fname) |
| if not os.path.exists(dst): |
| shutil.copy2(src, dst) |
| |
| json.dump(manifest, open(os.path.join(output_dir, "manifest.json"), "w"), indent=2) |
| |
| total = total_unary + total_fp16 |
| print(f"\n{'='*60}") |
| print(f"DONE: {done} layers, quantum={quantum:.8f}, K={K}") |
| print(f" Unary: {total_unary/1e9:.2f} GB") |
| print(f" FP16: {total_fp16/1e6:.1f} MB") |
| print(f" Total: {total/1e9:.2f} GB (vs ~7.6 GB BF16 = {total/7.6e9:.1f}x)") |
| print(f" Clipped: {total_clip} values") |
| print(f"{'='*60}") |
|
|
|
|
| if __name__ == "__main__": |
| model_dir = sys.argv[1] if len(sys.argv) > 1 else "qwen3-4b-thinking-hf" |
| output_dir = sys.argv[2] if len(sys.argv) > 2 else "qwen3-4b-proper-unary" |
| K = int(sys.argv[3]) if len(sys.argv) > 3 else 32 |
| clip = float(sys.argv[4]) if len(sys.argv) > 4 else 99.9 |
| convert(model_dir, output_dir, K=K, clip_pct=clip) |
|
|