| """Ternary quantizer v2 — multi-config single-pass with per-matrix checkpointing. |
| |
| Key improvements over v1: |
| 1. Multi-config: --configs d3scale-sens002,d3scale-sens003,uniform-d2,uniform-d3 |
| Computes per-group MSE-best scales (over a fixed 4-candidate set) ONCE per |
| matrix, derives all configs. ~3x faster than running v1 four times. |
| 2. Per-matrix checkpoint: each matrix's quantized output saved to .checkpoint/ |
| dir as soon as it's done. Crash-resume picks up where it left off. |
| 3. Durable atomic writes (write to .tmp, fsync, rename) — no half-written or |
| post-power-loss-truncated checkpoints. |
| 4. Streaming progress.json — monitors can poll without parsing logs. |
| 5. Per-config HF model assembled at the end from checkpoints. |
| 6. Resume validation: a fingerprint of (model id, revision, codec version, |
| group size, depth-power mapping) is stored in each checkpoint together |
| with the tensor shape, and both are re-checked on resume. A mismatch |
| causes the stale checkpoint to be discarded and re-quantized rather than |
| silently mixed. |
| |
| What this codec quantizes (and what it does not): |
| - Quantized: every 2D linear weight matrix in the model. |
| - Kept in the load dtype (FP16 by default, BF16 with --dtype bfloat16): |
| token embeddings, all *_norm layers, and lm_head. |
| This matches the convention used by GPTQ/AWQ/NF4 and is what the paper's |
| bits-per-weight figures account for. |
| |
| Usage: |
| python quantize_model_v2.py --model Qwen/Qwen2.5-7B \ |
| --configs uniform-d2,uniform-d3 \ |
| --output /path/to/output_root \ |
| --revision <git-sha-of-source-model> \ |
| --workers 8 --dtype float16 |
| |
| Output structure: |
| output_root/ |
| .checkpoint/ |
| matrix_00000__model_layers_0_self_attn_q_proj.npz # all configs in one file |
| matrix_00001__model_layers_0_self_attn_k_proj.npz # ('.' in names becomes '_') |
| ... |
| progress.json # live status |
| <config>/ |
| model/ # HF-format output |
| config.json |
| """ |
| import os, sys, time, json, gc, argparse, tempfile |
| from multiprocessing import Pool |
| import numpy as np |
|
|
| |
| |
| |
# Group size: number of consecutive weights (along a row) that share one scale.
GS = 16

# Exponent handed to build_levels() for each trit depth. 1.0 keeps the level
# grid uniform; any other value warps the grid with a power law.
DEPTH_POWERS = {
    1: 1.0,
    2: 1.5,
    3: 1.2,
    4: 1.0,
}
|
|
def build_levels(half, power):
    """Return the 2*half+1 reconstruction levels spanning [-half, +half].

    With ``power == 1.0`` the levels are the plain integers. Otherwise each
    level is normalised to [-1, 1], its magnitude is raised to ``power`` and
    the result is rescaled, so the endpoints stay at +/-half while the
    interior levels move.
    """
    span = max(half, 1)  # guards the normalisation when half == 0
    grid = np.arange(-half, half + 1).astype(np.float64)
    if power == 1.0:
        return grid
    unit = grid / span
    return np.sign(unit) * np.abs(unit) ** power * span
|
|
def make_boundaries(level_map, zero_boundary=None):
    """Return the decision thresholds that separate adjacent levels.

    Thresholds default to the midpoints between neighbouring levels. When
    ``zero_boundary`` is given, the two thresholds on either side of the level
    closest to zero are replaced by ``-|zero_boundary|`` and ``+|zero_boundary|``
    (used for d1 with a custom zero-zone width). ``level_map`` is not modified.
    """
    cuts = (level_map[:-1] + level_map[1:]) / 2
    if zero_boundary is None:
        return cuts

    width = abs(zero_boundary)
    centre = int(np.argmin(np.abs(level_map)))
    last = len(level_map) - 1
    if centre > 0:
        cuts[centre - 1] = -width
    if centre < last:
        cuts[centre] = width
    return cuts
|
|
def compute_best_scale_4cand(groups, depth, power, zero_boundary=None):
    """Choose, per group, the lowest-MSE scale out of four fixed candidates.

    The candidates are order statistics of each group's sorted absolute
    weights at indices [gs-6, gs-4, gs-2, gs-1] (clipped into range), i.e.
    roughly the 69th/81st/94th/100th percentiles for gs=16. Each candidate is
    divided by ``half`` so that the chosen statistic maps onto the outermost
    level.

    This is a deliberately small candidate set, not an exhaustive sweep.
    Empirically <1% PPL gap from a dense sweep on Qwen2.5-7B; in exchange
    quantization is ~50x faster than evaluating every percentile.

    Returns ``(best_scale, best_mse)``, two arrays with one entry per group.
    """
    half = (3 ** depth) // 2
    denom = max(half, 1)
    n_groups = len(groups)
    gs = groups.shape[1]

    levels = build_levels(half, power)
    cuts = make_boundaries(levels, zero_boundary)
    top_code = len(levels) - 1

    sorted_abs = np.sort(np.abs(groups), axis=1)
    candidates = np.clip(np.array([gs - 6, gs - 4, gs - 2, gs - 1]), 0, gs - 1)

    best_scale = np.zeros(n_groups)
    best_mse = np.full(n_groups, np.inf)
    for col in candidates:
        # Floor at 1e-30 so all-zero groups do not divide by zero.
        trial = np.maximum(sorted_abs[:, col] / denom, 1e-30)
        trial_col = trial[:, None]
        codes = np.searchsorted(cuts, (groups / trial_col).ravel())
        codes = np.clip(codes, 0, top_code)
        approx = levels[codes].reshape(n_groups, gs) * trial_col
        err = np.mean((groups - approx) ** 2, axis=1)
        # Strict '<' keeps the earliest candidate on ties.
        improved = err < best_mse
        best_mse[improved] = err[improved]
        best_scale[improved] = trial[improved]
    return best_scale, best_mse


# Name used by the rest of the module for the scale search.
compute_optimal_scale = compute_best_scale_4cand
|
|
def trit_quantize_scales(scales, sd):
    """Snap each scale to the nearest entry of a log-spaced codebook.

    The codebook has ``2 * (3**sd // 2) + 1`` entries, evenly spaced in log
    space between the 0.1th percentile and the maximum of ``log(scales)``.
    Scales below the lower end therefore map to the smallest entry.
    Returns an array of the same length as ``scales``.
    """
    logs = np.log(np.maximum(scales, 1e-30))
    n_levels = 2 * ((3 ** sd) // 2) + 1

    lo = np.percentile(logs, 0.1)
    hi = np.max(logs)
    if hi - lo < 1e-9:
        # Degenerate (near-constant) input: keep the codebook strictly increasing.
        hi = lo + 1e-9

    codebook = np.linspace(lo, hi, n_levels)
    distance = np.abs(logs[:, None] - codebook[None, :])
    nearest = np.argmin(distance, axis=1)
    return np.exp(codebook[nearest])
|
|
def quantize_with_scale(groups, scale, depth, power, zero_boundary=None):
    """Quantize ``groups`` with the given per-group ``scale`` and dequantize.

    Every row of ``groups`` is divided by its scale (floored at 1e-30), each
    value is mapped to a level of the depth/power grid via the decision
    boundaries, and the chosen levels are multiplied back by the scale.
    Returns an array shaped like ``groups``.
    """
    half = (3 ** depth) // 2
    levels = build_levels(half, power)
    cuts = make_boundaries(levels, zero_boundary)

    safe_scale = np.maximum(scale, 1e-30)[:, None]
    codes = np.searchsorted(cuts, (groups / safe_scale).ravel())
    codes = np.clip(codes, 0, len(levels) - 1)
    return levels[codes].reshape(groups.shape) * safe_scale
|
|
| |
| |
| |
# Codec registry: name -> parameters.
#   mode == 'adaptive': each group gets the smallest depth in (2, 3, 4) whose
#                       relative error is under a limit derived from 'threshold'.
#   mode == 'uniform':  every group uses the fixed 'depth'.
# 'scale_depth' is the trit depth of the per-group scale codebook.
CODECS = {
    'd3scale-sens002': {
        'mode': 'adaptive',
        'scale_depth': 3,
        'threshold': 0.002,
    },
    'd3scale-sens003': {
        'mode': 'adaptive',
        'scale_depth': 3,
        'threshold': 0.003,
    },
    'uniform-d1': {
        'mode': 'uniform',
        'scale_depth': 3,
        'depth': 1,
        'zero_boundary': 0.25,
    },
    'uniform-d2': {
        'mode': 'uniform',
        'scale_depth': 3,
        'depth': 2,
    },
    'uniform-d3': {
        'mode': 'uniform',
        'scale_depth': 3,
        'depth': 3,
    },
    'uniform-d4': {
        'mode': 'uniform',
        'scale_depth': 3,
        'depth': 4,
    },
}
|
|
| |
| |
| |
def quantize_matrix_multi(args):
    """Quantize one matrix for ALL requested configs in a single pass.

    Args:
        args: tuple ``(w_flat, rows, cols, config_names)``. It is a single
            tuple so the function can be used directly with ``Pool.imap``.
            ``w_flat`` is the flattened weight array, ``rows``/``cols`` its 2D
            shape, ``config_names`` a list of keys into ``CODECS``.

    Returns:
        dict mapping each config name to a dict with keys:
            'recon_w'      float32 array (rows, cols), the dequantized weights
            'depth_counts' dict depth -> number of groups coded at that depth
            'weight_bits'  float, bits spent on weight trits
            'scale_bits'   float, bits spent on per-group scales
            'n_groups'     int, number of groups (including padded tails)

    The expensive scale search runs once per (depth, zero_boundary). The scale
    codebook is then applied per 'scale_depth', so the reconstruction always
    uses the same scale depth that the bit accounting reports.
    """
    w_flat, rows, cols, config_names = args
    w = w_flat.reshape(rows, cols)
    # Zero-pad each row so it splits into whole groups; the padding is
    # cropped again before the result is returned.
    pad = (GS - cols % GS) % GS
    if pad > 0:
        w = np.pad(w, ((0, 0), (0, pad)))
    groups = w.reshape(-1, GS).astype(np.float64)
    N = len(groups)
    group_var = np.maximum(np.var(groups, axis=1), 1e-30)
    bits_per_trit = np.log2(3)

    # Collect every (depth, zero_boundary, scale_depth) any config needs.
    needed_keys = set()
    for cn in config_names:
        cfg = CODECS[cn]
        sd = cfg['scale_depth']
        if cfg['mode'] == 'adaptive':
            needed_keys.update((d, None, sd) for d in (2, 3, 4))
        else:
            needed_keys.add((cfg['depth'], cfg.get('zero_boundary'), sd))

    opt_scale_cache = {}  # (depth, zb) -> unquantized best scale per group
    mse_per_key = {}
    recon_per_key = {}
    for d, zb, sd in sorted(needed_keys, key=lambda k: (k[0], k[1] or 0, k[2])):
        power = DEPTH_POWERS[d]
        if (d, zb) not in opt_scale_cache:
            best_s, _ = compute_optimal_scale(groups, d, power, zero_boundary=zb)
            opt_scale_cache[(d, zb)] = best_s
        use_s = trit_quantize_scales(opt_scale_cache[(d, zb)], sd)
        r = quantize_with_scale(groups, use_s, d, power, zero_boundary=zb)
        # MSE is measured after the scale has been snapped to the codebook.
        mse_per_key[(d, zb, sd)] = np.mean((groups - r) ** 2, axis=1)
        recon_per_key[(d, zb, sd)] = r

    out = {}
    for cn in config_names:
        cfg = CODECS[cn]
        sd = cfg['scale_depth']
        depth_counts = {1: 0, 2: 0, 3: 0, 4: 0}
        if cfg['mode'] == 'uniform':
            d = cfg['depth']
            recon = recon_per_key[(d, cfg.get('zero_boundary'), sd)]
            depth_counts[d] = N
            wb = N * GS * d * bits_per_trit
            sb = N * sd * bits_per_trit
        else:
            eff_thresh = cfg['threshold'] * 5.5
            recon = np.zeros_like(groups)
            assigned = np.zeros(N, dtype=bool)
            wb = 0.0
            sb = 0.0
            for d in [2, 3, 4]:
                unassigned = ~assigned
                if not np.any(unassigned):
                    break
                if d == 4:
                    # Deepest level: takes every group still unassigned.
                    recon[unassigned] = recon_per_key[(4, None, sd)][unassigned]
                    n_d = int(np.sum(unassigned))
                    depth_counts[d] = n_d
                    wb += n_d * GS * d * bits_per_trit
                    sb += n_d * sd * bits_per_trit
                    break
                # Accept depth d where MSE relative to group variance is small enough.
                mse_d = mse_per_key[(d, None, sd)][unassigned]
                meets = (mse_d / group_var[unassigned]) < eff_thresh
                midx = np.where(unassigned)[0][meets]
                recon[midx] = recon_per_key[(d, None, sd)][midx]
                assigned[midx] = True
                n_d = int(np.sum(meets))
                depth_counts[d] = n_d
                wb += n_d * GS * d * bits_per_trit
                sb += n_d * sd * bits_per_trit

        recon_w = recon.reshape(rows, -1)[:, :cols].astype(np.float32)
        out[cn] = {
            'recon_w': recon_w,
            'depth_counts': depth_counts,
            'weight_bits': float(wb),
            'scale_bits': float(sb),
            'n_groups': N,
        }
    return out
|
|
| |
| |
| |
# '/' -> '__' and '.' -> '_' so a parameter name becomes a flat file name.
_CKPT_NAME_TABLE = str.maketrans({'/': '__', '.': '_'})


def matrix_ckpt_path(ckpt_dir, idx, name):
    """Return the checkpoint file path for matrix number ``idx`` named ``name``.

    The file name is ``matrix_<idx, 5 digits>__<sanitised name>.npz``, where
    every '/' in ``name`` becomes '__' and every '.' becomes '_'.
    """
    filename = 'matrix_{:05d}__{}.npz'.format(idx, name.translate(_CKPT_NAME_TABLE))
    return os.path.join(ckpt_dir, filename)
|
|
def atomic_save_npz(path, data):
    """Write ``data`` to ``path`` atomically and durably.

    The arrays are written to a temp file in the destination directory, the
    file is fsynced, and only then renamed over ``path``, so readers never see
    a half-written checkpoint. The directory entry is fsynced afterwards on a
    best-effort basis so the rename itself survives power loss.

    Args:
        path: destination ``.npz`` file.
        data: mapping of array name -> array, forwarded to
            ``np.savez_compressed``.

    Raises:
        Whatever serialization or the rename raises. The temp file is removed
        before the exception propagates, so failed writes leave no ``.tmp_*``
        debris in the checkpoint directory.
    """
    target_dir = os.path.dirname(path) or '.'
    fd, tmp = tempfile.mkstemp(prefix='.tmp_', suffix='.npz', dir=target_dir)
    try:
        # Write through the descriptor mkstemp returned so the same open file
        # is flushed and fsynced, rather than closing and reopening it.
        with os.fdopen(fd, 'wb') as f:
            np.savez_compressed(f, **data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        try:
            os.remove(tmp)
        except FileNotFoundError:
            pass
        raise

    # Persist the rename. Some platforms cannot open or fsync a directory;
    # that only weakens durability, so failures here are ignored.
    try:
        dir_fd = os.open(target_dir, os.O_RDONLY)
    except OSError:
        return
    try:
        os.fsync(dir_fd)
    except OSError:
        pass
    finally:
        os.close(dir_fd)
|
|
|
|
| |
| |
| |
# Version tag baked into every checkpoint fingerprint.
CODEC_VERSION = 'v2.0'


def codec_fingerprint(model_id, revision, depth_powers, group_size, codec_version):
    """Build a stable string identifying the settings behind a checkpoint.

    Checkpoints with equal fingerprints can be safely interleaved; checkpoints
    with different fingerprints must not be mixed, so a mismatch on resume
    makes the caller discard and re-quantize the stale one.

    The string has the form
    ``codec=..|model=..|revision=..|gs=..|powers=d:p,d:p,...`` with the
    depth/power pairs sorted by depth. A missing revision is recorded as
    "unspecified".

    NOTE(review): the load dtype and the per-codec parameters in CODECS are
    not part of this string — confirm whether changing them should also
    invalidate checkpoints.
    """
    powers = ','.join(
        f'{depth}:{depth_powers[depth]}' for depth in sorted(depth_powers)
    )
    fields = (
        ('codec', codec_version),
        ('model', model_id),
        ('revision', revision or "unspecified"),
        ('gs', group_size),
        ('powers', powers),
    )
    return '|'.join(f'{key}={value}' for key, value in fields)
|
|
def load_ckpt(path):
    """Load every array of the ``.npz`` at ``path`` into a plain dict.

    The archive is closed before returning; the arrays are read eagerly.

    NOTE(review): ``allow_pickle=True`` lets a crafted checkpoint execute code
    on load. The checkpoint dir may be shared between machines, so confirm
    whether pickle support is really needed here.
    """
    loaded = {}
    with np.load(path, allow_pickle=True) as archive:
        for key in archive.files:
            loaded[key] = archive[key]
    return loaded
|
|
def write_progress(out_root, state):
    """Atomically replace ``<out_root>/progress.json`` with ``state``.

    The JSON is written to a temp file in ``out_root`` and renamed into place,
    so a monitor polling the file never reads a partial document.

    Raises:
        TypeError/ValueError if ``state`` is not JSON-serializable, OSError on
        I/O failure. The temp file is removed before the exception propagates
        and an existing progress.json is left untouched.
    """
    path = os.path.join(out_root, 'progress.json')
    fd, tmp = tempfile.mkstemp(prefix='.tmp_', dir=out_root)
    try:
        with os.fdopen(fd, 'w') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)
    except BaseException:
        # Do not leave .tmp_* debris behind when serialization or rename fails.
        try:
            os.remove(tmp)
        except FileNotFoundError:
            pass
        raise
|
|
| |
| |
| |
def main():
    """CLI entry point: quantize a model's matrices, checkpoint, then assemble.

    Steps:
      1. Parse and validate arguments (before the expensive model load).
      2. Load the model on CPU and collect every 2D parameter whose name does
         not contain 'norm', 'embed' or 'lm_head'.
      3. Scan existing checkpoints; reuse those whose fingerprint, shape and
         config set match, delete the rest.
      4. Quantize the remaining matrices (serially or via a process pool),
         writing one checkpoint per matrix and updating progress.json.
      5. Unless --matrix-range or --skip-assembly is given, build one HF model
         directory per config from the checkpoints.

    Exits with status 2 on invalid --configs or --matrix-range.
    """
    parser = argparse.ArgumentParser(description='Multi-config ternary quantizer with checkpointing')
    parser.add_argument('--model', required=True)
    parser.add_argument('--configs', required=True,
                        help='Comma-separated codec names: ' + ','.join(CODECS.keys()))
    parser.add_argument('--output', required=True, help='Output root dir')
    parser.add_argument('--workers', type=int, default=1)
    parser.add_argument('--dtype', default='float16', choices=['float16', 'bfloat16'])
    parser.add_argument('--skip-assembly', action='store_true',
                        help='Quantize matrices and checkpoint only; skip final HF model assembly.')
    parser.add_argument('--matrix-range', default=None,
                        help='Slice of matrices to process: "start:end" (0-indexed, end exclusive). '
                             'Use to manually parallelize across processes/machines via shared checkpoint dir.')
    parser.add_argument('--revision', default=None,
                        help='HuggingFace revision (commit SHA or tag) to pin the source model. '
                             'Recommended for reproducibility — without it, the upstream repo can move under you.')
    args = parser.parse_args()

    config_names = [c.strip() for c in args.configs.split(',') if c.strip()]
    if not config_names:
        print('ERROR: --configs did not name any codec', file=sys.stderr)
        sys.exit(2)
    for cn in config_names:
        if cn not in CODECS:
            print(f'ERROR: unknown codec {cn}', file=sys.stderr); sys.exit(2)

    # Parse --matrix-range now so a malformed value fails before the model
    # load; the end bound is resolved once the matrix count is known.
    range_bounds = None
    if args.matrix_range:
        try:
            s, e = args.matrix_range.split(':')
            range_bounds = (int(s) if s else 0, int(e) if e else None)
        except ValueError:
            print(f'ERROR: --matrix-range must look like "start:end", got {args.matrix_range!r}',
                  file=sys.stderr)
            sys.exit(2)

    os.makedirs(args.output, exist_ok=True)
    ckpt_dir = os.path.join(args.output, '.checkpoint')
    os.makedirs(ckpt_dir, exist_ok=True)

    print(f'=== Quantizing {args.model} ===', flush=True)
    print(f' configs: {config_names}', flush=True)
    print(f' workers: {args.workers}', flush=True)

    # Imported here so --help and argument errors do not pay the import cost.
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig, AutoModel

    dtype = torch.bfloat16 if args.dtype == 'bfloat16' else torch.float16
    print(' loading model (CPU)...', flush=True)
    t_load = time.time()
    _cfg = AutoConfig.from_pretrained(args.model, revision=args.revision, trust_remote_code=True)
    _arch = ((getattr(_cfg, 'architectures', None) or [''])[0] or '').lower()
    if 't5' in _arch or 'encoder' in _arch:
        from transformers import T5EncoderModel
        print(' loading as T5EncoderModel (encoder-only)', flush=True)
        model = T5EncoderModel.from_pretrained(args.model, revision=args.revision, torch_dtype=dtype,
                                               device_map='cpu', trust_remote_code=True,
                                               low_cpu_mem_usage=True)
    else:
        try:
            model = AutoModelForCausalLM.from_pretrained(args.model, revision=args.revision, torch_dtype=dtype,
                                                         device_map='cpu', trust_remote_code=True,
                                                         low_cpu_mem_usage=True)
        except ValueError:
            print(' fallback to generic AutoModel', flush=True)
            model = AutoModel.from_pretrained(args.model, revision=args.revision, torch_dtype=dtype,
                                              device_map='cpu', trust_remote_code=True,
                                              low_cpu_mem_usage=True)
    try:
        tokenizer = AutoTokenizer.from_pretrained(args.model, revision=args.revision, trust_remote_code=True)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
    except Exception as e:
        print(f' tokenizer load failed (ok for encoder-only): {e}', flush=True)
        tokenizer = None
    print(f' loaded in {time.time()-t_load:.0f}s', flush=True)

    # Everything 2D except norms, embeddings and lm_head gets quantized.
    matrices = []
    for pn, p in model.named_parameters():
        if p.dim() != 2 or 'norm' in pn or 'embed' in pn or 'lm_head' in pn:
            continue
        matrices.append((pn, p))
    print(f' {len(matrices)} matrices to quantize', flush=True)

    range_start, range_end = 0, len(matrices)
    if range_bounds is not None:
        range_start, raw_end = range_bounds
        range_end = len(matrices) if raw_end is None else min(raw_end, len(matrices))
        print(f' matrix-range: [{range_start}:{range_end})', flush=True)

    expected_fp = codec_fingerprint(args.model, args.revision, DEPTH_POWERS, GS, CODEC_VERSION)

    # Resume scan: decide per matrix whether its checkpoint can be reused.
    todo = []
    done_count = 0
    discarded_count = 0
    for idx, (pn, p) in enumerate(matrices):
        if idx < range_start or idx >= range_end:
            continue
        cp = matrix_ckpt_path(ckpt_dir, idx, pn)
        if os.path.exists(cp):
            try:
                # NOTE(review): allow_pickle=True can execute code from a
                # crafted file; confirm it is needed for this metadata.
                # The context manager closes the archive before any removal.
                with np.load(cp, allow_pickle=True) as z:
                    meta = json.loads(str(z['_meta'][()]))
                have_configs = set(meta.get('configs', []))
                ckpt_fp = meta.get('fingerprint')
                ckpt_shape = tuple(meta.get('shape', ()))
                cur_shape = tuple(p.shape)
                if all(cn in have_configs for cn in config_names) \
                        and ckpt_fp == expected_fp \
                        and ckpt_shape == cur_shape:
                    done_count += 1
                    continue
                if ckpt_fp != expected_fp:
                    print(f' fingerprint mismatch on {cp}: stale={ckpt_fp!r} expected={expected_fp!r} — discarding', flush=True)
                elif ckpt_shape != cur_shape:
                    print(f' shape mismatch on {cp}: stale={ckpt_shape} current={cur_shape} — discarding', flush=True)
                else:
                    print(f' missing configs in {cp}: have={have_configs}, need={config_names} — redoing', flush=True)
                discarded_count += 1
                os.remove(cp)
            except Exception as e:
                print(f' bad checkpoint {cp}: {e}, will redo', flush=True)
                # The file may already be gone (removed above, or by another
                # process sharing the checkpoint dir).
                try:
                    os.remove(cp)
                except FileNotFoundError:
                    pass
        todo.append((idx, pn, p))
    if discarded_count:
        print(f' discarded {discarded_count} stale checkpoint(s)', flush=True)
    print(f' {done_count} matrices already checkpointed, {len(todo)} to do', flush=True)

    t0 = time.time()
    state = {
        'model': args.model, 'configs': config_names,
        'total_matrices': len(matrices),
        'done_matrices': done_count,
        'started_at': t0, 'updated_at': t0,
    }
    write_progress(args.output, state)

    def save_checkpoint(idx, pn, shape, result):
        """Write one matrix's results for all configs to its checkpoint file."""
        save_data = {'_meta': np.array(json.dumps({
            'name': pn, 'idx': idx, 'shape': shape,
            'configs': config_names,
            'fingerprint': expected_fp,
        }))}
        for cn, info in result.items():
            save_data[f'{cn}__w'] = info['recon_w']
            save_data[f'{cn}__stats'] = np.array(json.dumps({
                'depth_counts': info['depth_counts'],
                'weight_bits': info['weight_bits'],
                'scale_bits': info['scale_bits'],
                'n_groups': info['n_groups'],
            }))
        atomic_save_npz(matrix_ckpt_path(ckpt_dir, idx, pn), save_data)

    def process_one(idx, pn, p):
        """Quantize and checkpoint one matrix in this process."""
        w = p.data.float().numpy()
        result = quantize_matrix_multi(
            (w.ravel(), w.shape[0], w.shape[1], config_names))
        save_checkpoint(idx, pn, list(w.shape), result)
        return idx

    def report_progress(i):
        """Record completion of the i-th todo item; log every 5th and the last."""
        nonlocal done_count
        done_count += 1
        state['done_matrices'] = done_count
        state['updated_at'] = time.time()
        state['elapsed_s'] = time.time() - t0
        if (i+1) % 5 == 0 or (i+1) == len(todo):
            write_progress(args.output, state)
            eta = (len(todo) - (i+1)) * (time.time() - t0) / max(i+1, 1)
            print(f' {done_count}/{len(matrices)} ({time.time()-t0:.0f}s, ETA {eta:.0f}s)', flush=True)

    if args.workers > 1 and len(todo) > 1:
        # Shapes are captured up front because gen() shrinks each parameter
        # once its data has been handed to the pool.
        idx_name = [(idx, pn, list(p.shape)) for idx, pn, p in todo]

        def gen():
            for idx, pn, p in todo:
                w = p.data.float().numpy()
                yield (w.ravel(), w.shape[0], w.shape[1], config_names)
                # Drop the original weights to lower peak memory; assembly
                # reloads every matrix from its checkpoint anyway.
                p.data = torch.zeros(1, dtype=p.dtype)

        with Pool(args.workers) as pool:
            # imap preserves order, so result i belongs to idx_name[i].
            for i, result in enumerate(pool.imap(quantize_matrix_multi, gen(), chunksize=1)):
                idx, pn, shape = idx_name[i]
                save_checkpoint(idx, pn, shape, result)
                report_progress(i)
    else:
        for i, (idx, pn, p) in enumerate(todo):
            process_one(idx, pn, p)
            report_progress(i)

    print(f' Quantization complete in {time.time()-t0:.0f}s', flush=True)

    if args.matrix_range:
        # A slice run only fills the shared checkpoint dir; it never assembles.
        slice_done = sum(1 for idx, (pn, p) in enumerate(matrices)
                         if range_start <= idx < range_end
                         and os.path.exists(matrix_ckpt_path(ckpt_dir, idx, pn)))
        print(f' slice [{range_start}:{range_end}): {slice_done} checkpointed', flush=True)
        return

    if args.skip_assembly:
        print(' --skip-assembly: not building HF model dirs', flush=True)
        return

    print(' Assembling HF models per config...', flush=True)
    for cn in config_names:
        cfg_dir = os.path.join(args.output, cn)
        os.makedirs(cfg_dir, exist_ok=True)
        model_dir = os.path.join(cfg_dir, 'model')

        total_groups = 0
        total_depth = {1: 0, 2: 0, 3: 0, 4: 0}
        total_wb = 0.0
        total_sb = 0.0

        # Overwrite each quantized parameter in place with this config's
        # reconstruction, accumulating the bit statistics as we go.
        for idx, (pn, p) in enumerate(matrices):
            cp = matrix_ckpt_path(ckpt_dir, idx, pn)
            with np.load(cp, allow_pickle=True) as z:
                recon_w = z[f'{cn}__w']
                stats = json.loads(str(z[f'{cn}__stats'][()]))
            p.data = torch.from_numpy(recon_w).to(p.dtype)
            total_groups += stats['n_groups']
            for d in [1, 2, 3, 4]:
                # JSON turns the integer depth keys into strings.
                total_depth[d] += stats['depth_counts'].get(str(d), stats['depth_counts'].get(d, 0))
            total_wb += stats['weight_bits']
            total_sb += stats['scale_bits']

        tg = max(total_groups, 1)
        trit_bpw = total_wb / (tg * GS)
        scale_bpw = total_sb / (tg * GS)
        total_bpw = trit_bpw + scale_bpw

        print(f' [{cn}] BPW={total_bpw:.3f} (trit={trit_bpw:.3f}+scale={scale_bpw:.3f})', flush=True)
        print(f' [{cn}] Saving to {model_dir}...', flush=True)
        model.save_pretrained(model_dir, safe_serialization=True)
        if tokenizer is not None:
            tokenizer.save_pretrained(model_dir)

        config = {
            'model': os.path.basename(args.model.rstrip('/')),
            'model_revision': args.revision,
            'codec_version': CODEC_VERSION,
            'codec_fingerprint': expected_fp,
            'codec': cn,
            'bpw': total_bpw, 'trit_bpw': trit_bpw, 'scale_bpw': scale_bpw,
            'depth_pcts': {str(d): total_depth[d]/tg for d in [1, 2, 3, 4]},
            'n_matrices': len(matrices),
            'group_size': GS,
            'fp16_layers': ['lm_head', 'embed_tokens', '*_norm'],
            'codec_params': CODECS[cn],
        }
        with open(os.path.join(cfg_dir, 'config.json'), 'w') as f:
            json.dump(config, f, indent=2)
        print(f' [{cn}] DONE: {cfg_dir}', flush=True)

    print(f' ALL CONFIGS COMPLETE in {time.time()-t0:.0f}s total', flush=True)


if __name__ == '__main__':
    main()
|
|