#!/usr/bin/env python3 """ HexState GGUF Re-Quantizer — GGUF-to-GGUF Q2_K quantization. Reads a source GGUF (F16/BF16/F32), copies all metadata verbatim, and re-quantizes eligible weight tensors to Q2_K using numpy. This bypasses the tokenizer parsing problem entirely — the source GGUF (from llama.cpp's convert_hf_to_gguf.py) has correct metadata. Usage: python3 hexstate_requantize.py input.gguf output.gguf """ import struct import sys import time import os import io import ctypes import numpy as np # ─── HExState C Library (HPC-optimized Q2_K quantization) ────────────────── _HEXSTATE_LIB = None def _load_hexstate_lib(): """Try to load the HExState C shared library for HPC-optimized quantization.""" global _HEXSTATE_LIB if _HEXSTATE_LIB is not None: return _HEXSTATE_LIB lib_dir = os.path.dirname(os.path.abspath(__file__)) lib_path = os.path.join(lib_dir, "libhexstate_q2k.so") if not os.path.exists(lib_path): return None try: lib = ctypes.CDLL(lib_path) # void hexstate_init(void) lib.hexstate_init.restype = None lib.hexstate_init.argtypes = [] # void hexstate_quantize_tensor_q2k(const float*, int64_t, void*, float*, int, int) lib.hexstate_quantize_tensor_q2k.restype = None lib.hexstate_quantize_tensor_q2k.argtypes = [ ctypes.POINTER(ctypes.c_float), # weights ctypes.c_int64, # n_elements ctypes.c_void_p, # output ctypes.POINTER(ctypes.c_float), # out_error ctypes.c_int, # opt_mode (0=HPC, 1=MSE, 2=Hybrid) ctypes.c_int, # verbose ] lib.hexstate_q2k_block_bytes.restype = ctypes.c_int lib.hexstate_q2k_block_bytes.argtypes = [] lib.hexstate_q2k_block_elements.restype = ctypes.c_int lib.hexstate_q2k_block_elements.argtypes = [] # imatrix-aware version lib.hexstate_quantize_tensor_q2k_imat.restype = None lib.hexstate_quantize_tensor_q2k_imat.argtypes = [ ctypes.POINTER(ctypes.c_float), # weights ctypes.c_int64, # n_elements ctypes.c_void_p, # output ctypes.POINTER(ctypes.c_float), # out_error ctypes.c_int, # opt_mode ctypes.POINTER(ctypes.c_float), # imat_importance (can be NULL) ctypes.c_int, # verbose ] # Q8_0 HPC quantizer (Shor pipeline; tied embeddings / LM head) if hasattr(lib, 'hexstate_quantize_tensor_q8_0_hpc'): lib.hexstate_quantize_tensor_q8_0_hpc.restype = None lib.hexstate_quantize_tensor_q8_0_hpc.argtypes = [ ctypes.POINTER(ctypes.c_float), # weights ctypes.c_int64, # n_elements ctypes.c_void_p, # output ctypes.POINTER(ctypes.c_float), # out_error ctypes.POINTER(ctypes.c_float), # imat_importance (can be NULL) ctypes.c_int, # verbose ] # Q4_0 HPC quantizer (for attention tensors) if hasattr(lib, 'hexstate_quantize_tensor_q4_0_hpc'): lib.hexstate_quantize_tensor_q4_0_hpc.restype = None lib.hexstate_quantize_tensor_q4_0_hpc.argtypes = [ ctypes.POINTER(ctypes.c_float), # weights ctypes.c_int64, # n_elements ctypes.c_void_p, # output ctypes.POINTER(ctypes.c_float), # out_error ctypes.POINTER(ctypes.c_float), # imat_importance (can be NULL) ctypes.c_int, # verbose ] lib.hexstate_init() _HEXSTATE_LIB = lib return lib except Exception as e: print(f" WARNING: Failed to load HexState library: {e}") return None def _skip_gguf_kv_value(f, vtype): """Skip a GGUF KV value of the given type.""" import struct as st size_map = {0:1, 1:1, 2:2, 3:2, 4:4, 5:4, 6:4, 7:1, 10:8, 11:8, 12:8} if vtype == 8: # string slen = st.unpack(' normalized importance array (float32) """ import struct as st imat = {} with open(path, 'rb') as f: magic = st.unpack(' 0: importance = np.sqrt(in_sum2 / count) else: importance = np.ones_like(in_sum2) mean = importance.mean() if mean > 1e-30: imat[base_name] = importance / mean else: imat[base_name] = np.ones_like(importance) else: # Legacy format: first 4 bytes were n_entries f.seek(0) n_entries = st.unpack(' 1e-30: imat[name] = values / mean else: imat[name] = np.ones_like(values) return imat def quantize_tensor_q2k_hpc(f32_data, opt_mode=2, importance=None): """Quantize tensor using HexState HPC-optimized C implementation. opt_mode: 0=HPC (BP only), 1=MSE (grid search), 2=Hybrid (recommended) importance: optional per-element importance weights (from imatrix) Returns: (bytes, n_blocks) same as quantize_tensor_q2k() """ lib = _load_hexstate_lib() if lib is None: raise RuntimeError("HexState library not available") n_elements = len(f32_data) if n_elements % QK_K != 0: pad_len = QK_K - (n_elements % QK_K) f32_data = np.concatenate([f32_data, np.zeros(pad_len, dtype=np.float32)]) if importance is not None: importance = np.concatenate([importance, np.ones(pad_len, dtype=np.float32)]) n_elements = len(f32_data) n_blocks = n_elements // QK_K block_bytes = lib.hexstate_q2k_block_bytes() # 84 # Allocate output buffer output = np.zeros(n_blocks * block_bytes, dtype=np.uint8) error = ctypes.c_float(0.0) # Call C quantizer with or without importance weights f32_contiguous = np.ascontiguousarray(f32_data, dtype=np.float32) if importance is not None: imat_contiguous = np.ascontiguousarray(importance, dtype=np.float32) imat_ptr = imat_contiguous.ctypes.data_as(ctypes.POINTER(ctypes.c_float)) else: imat_ptr = None lib.hexstate_quantize_tensor_q2k_imat( f32_contiguous.ctypes.data_as(ctypes.POINTER(ctypes.c_float)), ctypes.c_int64(n_elements), output.ctypes.data_as(ctypes.c_void_p), ctypes.byref(error), ctypes.c_int(opt_mode), imat_ptr, ctypes.c_int(1), # verbose ) return output.tobytes(), n_blocks # ─── Constants ────────────────────────────────────────────────────────────── GGUF_MAGIC = 0x46554747 GGUF_VERSION = 3 ALIGNMENT = 32 QK_K = 256 GGML_TYPE_F32 = 0 GGML_TYPE_F16 = 1 GGML_TYPE_Q4_0 = 2 GGML_TYPE_Q8_0 = 8 GGML_TYPE_Q2_K = 10 GGML_TYPE_BF16 = 30 TYPE_NAME = { 0: "F32", 1: "F16", 2: "Q4_0", 3: "Q4_1", 6: "Q5_0", 7: "Q5_1", 8: "Q8_0", 9: "Q8_1", 10: "Q2_K", 11: "Q3_K", 12: "Q4_K", 13: "Q5_K", 14: "Q6_K", 15: "Q8_K", 30: "BF16", } # Block sizes and byte sizes for each type TYPE_BLOCK_SIZE = { 0: 1, 1: 1, 2: 32, 3: 32, 6: 32, 7: 32, 8: 32, 9: 32, 10: 256, 11: 256, 12: 256, 13: 256, 14: 256, 15: 256, 30: 1, } TYPE_BLOCK_BYTES = { 0: 4, 1: 2, 2: 18, 3: 20, 6: 20, 7: 22, 8: 34, 9: 36, 10: 84, 11: 110, 12: 144, 13: 176, 14: 210, 15: 292, 30: 2, } def align_offset(offset, alignment=ALIGNMENT): return (offset + alignment - 1) & ~(alignment - 1) def read_string(f): slen = struct.unpack('> 16).astype(np.uint16) return bf16.tobytes() # ─── Q2_K quantization — faithful port of ggml quantize_row_q2_K_ref ─────── # Vectorized with numpy for performance. Uses make_qkx2_quants algorithm: # - Weighted MAD error with weights[i] = |x[i]| # - Joint scale+min least-squares solve # - 16-step grid search for initial iscale def quantize_tensor_q8_0(f32_data): """Vectorized ggml-faithful Q8_0 (fallback when the HPC lib is absent). Block: 32 weights -> fp16 d + 32 x int8 = 34 bytes; y = q * d. d = amax/127 (float), q = round(x/d), d stored as fp16 -- matches ggml quantize_row_q8_0_ref. Returns (bytes, n_blocks, sse).""" n = len(f32_data) if n % 32 != 0: f32_data = np.concatenate( [f32_data, np.zeros(32 - n % 32, dtype=np.float32)]) n = len(f32_data) blocks = f32_data.reshape(-1, 32).astype(np.float32) nb = blocks.shape[0] amax = np.max(np.abs(blocks), axis=1) d = amax / 127.0 id_ = np.where(d > 0, 1.0 / np.where(d > 0, d, 1.0), 0.0) qs = np.clip(np.rint(blocks * id_[:, None]), -127, 127).astype(np.int8) d16 = d.astype(' 0 this_scale = np.where(valid_D, (sum_w * sum_xl - sum_x * sum_l) / np.maximum(D, 1e-30), 0.0) this_min = np.where(valid_D, (sum_l2 * sum_x - sum_l * sum_xl) / np.maximum(D, 1e-30), 0.0) # If this_min > 0, clamp to 0 and recompute scale pos_min = this_min > 0 this_min = np.where(pos_min, 0.0, this_min) this_scale = np.where(pos_min & (sum_l2 > 0), sum_xl / np.maximum(sum_l2, 1e-30), this_scale) # Compute error for this trial recon = this_scale[:, :, None] * Laux + this_min[:, :, None] cur_error = (weights * np.abs(recon - data)).sum(axis=2) # Update where this trial is better better = valid_D & (cur_error < best_error) & ~degenerate if better.any(): # Expand mask to weight dimension for L update better3d = better[:, :, None] best_L = np.where(better3d, Laux, best_L) best_error = np.where(better, cur_error, best_error) best_scale = np.where(better, this_scale, best_scale) best_min = np.where(better, this_min, best_min) # the_min = -best_min (make positive) sb_scale = np.maximum(best_scale, 0.0).astype(np.float32) # [n_blocks, 16] sb_the_min = np.maximum(-best_min, 0.0).astype(np.float32) # [n_blocks, 16] # Handle degenerate sub-blocks sb_scale[degenerate] = 0.0 sb_the_min[degenerate] = np.maximum(-sb_min[degenerate], 0.0).astype(np.float32) # ── Phase 2: quantize scales/mins to 4-bit ── max_scale = sb_scale.max(axis=1) # [n_blocks] max_min = sb_the_min.max(axis=1) # [n_blocks] # Quantize sub-block scales to 4-bit has_scale = max_scale > 0 iscale_s = np.where(has_scale, q4scale / np.maximum(max_scale, 1e-30), 0.0) scales_q = np.where(has_scale[:, None], np.clip(np.round(iscale_s[:, None] * sb_scale), 0, 15), 0.0).astype(np.uint8) # Quantize sub-block mins to 4-bit has_min = max_min > 0 iscale_m = np.where(has_min, q4scale / np.maximum(max_min, 1e-30), 0.0) mins_q = np.where(has_min[:, None], np.clip(np.round(iscale_m[:, None] * sb_the_min), 0, 15), 0.0).astype(np.uint8) d_fp16 = np.where(has_scale, max_scale / q4scale, 0.0).astype(np.float16) dmin_fp16 = np.where(has_min, max_min / q4scale, 0.0).astype(np.float16) # ── Phase 3: requantize using fp16-truncated d/dmin ── scales_packed = scales_q | (mins_q << 4) # [n_blocks, 16] d_f32 = d_fp16.astype(np.float32) dmin_f32 = dmin_fp16.astype(np.float32) d_sub = d_f32[:, None] * (scales_packed & 0xF).astype(np.float32) dm_sub = dmin_f32[:, None] * (scales_packed >> 4).astype(np.float32) # l = nearest_int((x + dm) / d), clamp [0,3] valid_d = d_sub > 0 inv_d = np.where(valid_d, 1.0 / np.maximum(d_sub, 1e-30), 0.0) q_vals = np.where(valid_d[:, :, None], np.clip(np.round( (f32_data.reshape(n_blocks, 16, 16) + dm_sub[:, :, None]) * inv_d[:, :, None] ), 0, 3), 0).astype(np.uint8) # ── Phase 4: pack ── q_flat = q_vals.reshape(n_blocks, QK_K) q_groups = q_flat.reshape(n_blocks, 2, 4, 32) qs_packed = (q_groups[:, :, 0, :] | (q_groups[:, :, 1, :] << 2) | (q_groups[:, :, 2, :] << 4) | (q_groups[:, :, 3, :] << 6)).astype(np.uint8) qs_packed = qs_packed.reshape(n_blocks, 64) # Build output: [n_blocks, 84] bytes # Layout matches ggml block_q2_K: scales[16] | qs[64] | d(fp16) | dmin(fp16) result = np.zeros((n_blocks, 84), dtype=np.uint8) result[:, 0:16] = scales_packed result[:, 16:80] = qs_packed result[:, 80:82] = d_fp16.view(np.uint8).reshape(n_blocks, 2) result[:, 82:84] = dmin_fp16.view(np.uint8).reshape(n_blocks, 2) return result.tobytes(), n_blocks def dequant_q2k_fast(q2k_bytes, n_blocks): """Vectorized Q2_K dequantization for RMSE computation. Block layout (84 bytes) — same for both C struct and Python writer: scales[16] (bytes 0-15) | qs[64] (bytes 16-79) | d(fp16, bytes 80-81) | dmin(fp16, bytes 82-83) The C struct BlockQ2K in gguf_format.h is: { uint8_t scales[16]; uint8_t qs[64]; uint16_t d; uint16_t dmin; } Dequantization follows gguf_dequantize_q2_k_block() exactly: For each half (0..1), qs_half = qs[half*32 : half*32+32] For each shift j (0..3): scale_idx = half*8 + j*2 elements [0..15] use scales[scale_idx], from qs_half[0..15] >> (j*2) elements [16..31] use scales[scale_idx+1], from qs_half[16..31] >> (j*2) """ data = np.frombuffer(q2k_bytes, dtype=np.uint8).reshape(n_blocks, 84) # Extract fields scales_packed = data[:, 0:16] # [n_blocks, 16] qs = data[:, 16:80] # [n_blocks, 64] d_fp16 = data[:, 80:82].copy().view(np.float16).astype(np.float32).reshape(n_blocks) dmin_fp16 = data[:, 82:84].copy().view(np.float16).astype(np.float32).reshape(n_blocks) # Extract scale (low 4 bits) and min (high 4 bits) per sub-block sc = (scales_packed & 0xF).astype(np.float32) # [n_blocks, 16] mn = (scales_packed >> 4).astype(np.float32) # [n_blocks, 16] # Compute per-sub-block d_sub and m_sub d_sub = d_fp16[:, np.newaxis] * sc # [n_blocks, 16] m_sub = dmin_fp16[:, np.newaxis] * mn # [n_blocks, 16] # Unpack 2-bit quants from qs[64] into 256 values per block. # Matches C reference: two scales per 32-byte extraction (16 elements each). # half=0: qs[0..31], half=1: qs[32..63] # shift j=0..3: scale_idx = half*8 + j*2 (first 16), +1 (second 16) result = np.zeros((n_blocks, QK_K), dtype=np.float32) for half in range(2): qs_half = qs[:, half * 32:(half + 1) * 32] # [n_blocks, 32] for sub in range(4): # Extract 2-bit quants at this shift position q_vals = ((qs_half >> (sub * 2)) & 3).astype(np.float32) # [n_blocks, 32] base_idx = half * 128 + sub * 32 # First 16 elements: qs_half[0..15], scale index = half*8 + sub*2 si_0 = half * 8 + sub * 2 result[:, base_idx:base_idx + 16] = ( d_sub[:, si_0:si_0+1] * q_vals[:, :16] - m_sub[:, si_0:si_0+1] ) # Second 16 elements: qs_half[16..31], scale index = si_0 + 1 si_1 = si_0 + 1 result[:, base_idx + 16:base_idx + 32] = ( d_sub[:, si_1:si_1+1] * q_vals[:, 16:] - m_sub[:, si_1:si_1+1] ) return result.reshape(-1) def is_attention_tensor(name): """Detect attention Q/K/V/O projection tensors. These are the most sensitive to quantization and get promoted to Q4_0.""" attn_patterns = [ 'attn_q.weight', 'attn_k.weight', 'attn_v.weight', 'attn_output.weight', 'attn_qkv.weight', 'attn_gate.weight', 'self_attn.q_proj.weight', 'self_attn.k_proj.weight', 'self_attn.v_proj.weight', 'self_attn.o_proj.weight', # Qwen 3.6 DeltaNet SSM projections — treat as attention-class 'ssm_in_qkv.weight', 'ssm_in_z.weight', 'ssm_out.weight', 'linear_attn.in_proj_qkv.weight', 'linear_attn.in_proj_z.weight', 'linear_attn.out_proj.weight', ] for pat in attn_patterns: if pat in name: return True return False def should_quantize(name, n_dims, dims, tied_embeddings=False): """Should this tensor be quantized to Q2_K? With iMatrix importance weighting, Q2_K is applied to ALL eligible tensors including embeddings for maximum compression. Tensors kept as-is: - 1D tensors (norms, biases) — always kept - _norm, .bias — normalization layers - ffn_gate_inp — MoE routing gate - layer_output_scale — per-layer scaling factor (scalar) - altup, laurel — small Gemma-specific tensors - token_embd.weight / output.weight — always excluded here. When embeddings are TIED, main() routes token_embd.weight to Q8_0 (HPC Shor pipeline) instead: the same tensor serves as both embedding lookup AND LM head, and Q2_K/Q4_0 there destroys logit precision → looping / repetitive generation. --keep-embd keeps it at source precision instead. """ n_elements = 1 for d in dims: n_elements *= d if n_dims < 2: return False if 'norm' in name: return False if '.bias' in name: return False if 'ffn_gate_inp' in name: return False if 'altup' in name or 'laurel' in name: return False if 'layer_output_scale' in name: return False # Embedding table — this is a lookup, not a matmul; Q2_K destroys # token distinctions. Keep at source precision (F16/BF16). if 'token_embd' in name: return False # LM head output projection — logit precision is critical for generation. # (When tied with embeddings, this is the same tensor and also skipped above.) if name == 'output.weight': return False # DeltaNet state-space parameters — keep at full precision if 'ssm_a' in name or 'A_log' in name: return False if 'ssm_dt' in name or 'dt_bias' in name: return False if 'ssm_conv1d' in name or 'conv1d.weight' in name: return False # When embeddings are tied, token_embd.weight doubles as the output # projection (LM head). It gets routed to Q4_0 in the quant plan # instead of Q2_K — handled in main(), not here. # Skip vision/audio encoder tensors if 'v.' in name and name.startswith('v.'): return False if name.startswith('mm.') or name.startswith('a.'): return False # Small tensors are not worth quantizing if n_elements < QK_K: return False # Must be divisible by QK_K if n_elements % QK_K != 0: return False return True def main(): if len(sys.argv) < 3: print("Usage: python3 hexstate_requantize.py " " [--keep-metadata] [--imatrix FILE] [--keep-embd] [--q2all]") sys.exit(1) input_path = sys.argv[1] output_path = sys.argv[2] keep_metadata = '--keep-metadata' in sys.argv quantize_none = '--quantize-none' in sys.argv q2all = '--q2all' in sys.argv keep_embd = '--keep-embd' in sys.argv # keep tied embedding at source precision instead of Q8_0 # Check for imatrix imatrix_data = None for i, arg in enumerate(sys.argv): if arg == '--imatrix' and i + 1 < len(sys.argv): imat_path = sys.argv[i + 1] if os.path.exists(imat_path): imatrix_data = read_imatrix(imat_path) print(f" Loaded imatrix: {len(imatrix_data)} tensors from {imat_path}") else: print(f" WARNING: imatrix file not found: {imat_path}") break # Check for HPC C library use_hpc = _load_hexstate_lib() is not None print() print(" ╔════════════════════════════════════════════════════════════════╗") print(" ║ HExState GGUF Re-Quantizer ║") print(" ║ GGUF → Q2_K GGUF with metadata passthrough ║") if q2all: print(" ║ Mode: --q2all ALL eligible tensors → Q2_K (test mode) ║") if use_hpc and imatrix_data: print(" ║ Engine: HPC + iMatrix (calibrated sensitivity propagation) ║") elif use_hpc: print(" ║ Engine: HPC (BP + MSE Grid + Sensitivity Propagation) ║") else: print(" ║ Engine: Python (numpy vectorized) ║") print(" ╚════════════════════════════════════════════════════════════════╝") print() start_time = time.time() file_size = os.path.getsize(input_path) print(f" Input: {input_path}") print(f" Size: {file_size / 1024**3:.2f} GB") print(f" Output: {output_path}") print() with open(input_path, 'rb') as fin: # ── Read Header ── magic = struct.unpack('= 2 else ti['n_elements'] if quant_plan[i] == 'EMBD_Q8': # Tied embedding / LM head → Q8_0 (8.5 bpw, 34 B / 32 w) out_type = GGML_TYPE_Q8_0 n_blocks = ti['n_elements'] // 32 out_size = n_blocks * 34 print(f" [EMBD→Q8_0·Shor] {ti['name']} ({ti['n_elements']:,} elements)") elif quant_plan[i] == 'ATTN_Q4': # Attention tensor → Q4_0 HPC (4.5 bpw) out_type = GGML_TYPE_Q4_0 n_blocks = (ti['n_elements'] + 31) // 32 out_size = n_blocks * 18 print(f" [ATTN→Q4_0·HPC] {ti['name']} ({ti['n_elements']} elements)") elif dim0 % QK_K == 0 or q2all: # Q2_K (2.6 bpw, block_size=256) # --q2all forces Q2_K even when dim0 isn't a clean multiple; # the quantizer pads internally to the next QK_K boundary. out_type = GGML_TYPE_Q2_K n_blocks = (ti['n_elements'] + QK_K - 1) // QK_K out_size = n_blocks * 84 if q2all and dim0 % QK_K != 0: print(f" [Q2_K·PADDED] {ti['name']} (dim0={dim0}, padded to QK_K boundary)") elif dim0 % 32 == 0: # Q4_0 fallback (4.5 bpw, block_size=32) out_type = GGML_TYPE_Q4_0 n_blocks = ti['n_elements'] // 32 out_size = n_blocks * 18 quant_plan[i] = 'Q4_0' print(f" Q4_0: {ti['name']} (dims[0]={dim0})") else: out_type = ti['type'] out_size = ti['data_size'] quant_plan[i] = False print(f" Keep: {ti['name']} (dims[0]={dim0})") else: out_type = ti['type'] out_size = ti['data_size'] out_dims = list(ti['dims']) out_tensor_infos.append({ 'name': ti['name'], 'n_dims': ti['n_dims'], 'dims': out_dims, 'type': out_type, 'offset': out_data_offset, 'data_size': out_size, }) out_data_offset += out_size out_data_offset = align_offset(out_data_offset) # ── Update KV pairs ── updated_kv = [] if keep_metadata: print(" --keep-metadata: passing through ALL KV pairs unchanged") updated_kv = list(kv_pairs) else: for key, vtype, raw_value in kv_pairs: if key == 'general.file_type' and vtype == 4: # UINT32 # file_type=10 means Q2_K in llama.cpp updated_kv.append((key, vtype, struct.pack(' # spam). Fix: read the tokens array to find control-looking # tokens, then patch their types to CONTROL (3). # See: https://github.com/ggml-org/llama.cpp/issues/21321 tokens_kv = next((v for k, vt, v in kv_pairs if k == 'tokenizer.ggml.tokens' and vt == 9), None) token_names = [] if tokens_kv: bio = io.BytesIO(tokens_kv) arr_type = struct.unpack(', , , , — sentence markers # - <|turn>, , <|tool_*|> etc — delimiters # NOTE: do NOT mark as CONTROL — Gemma 4 uses # these tokens internally for thinking/channel markers # (e.g. = <|channel>). The llama.cpp parser # handles them via the peg-gemma4 format instead. is_control = False if tname in ('', '', '', '', '', '', ''): is_control = True elif re.match(r'^<\|.*\|?>$', tname) or re.match(r'^<.*\|>$', tname): is_control = True if is_control and ttypes[i] != CONTROL_TYPE: ttypes[i] = CONTROL_TYPE n_fixed += 1 print(f" Fixed {n_fixed} token types to CONTROL (Gemma 4 fix)") # Rebuild the raw value new_raw = struct.pack(' tokens. The fixed template from # llama.cpp PR #21418 pre-fills an empty thought block when # thinking is disabled: <|channel>thought\n # See: https://github.com/ggml-org/llama.cpp/pull/21418 script_dir = os.path.dirname(os.path.abspath(__file__)) workspace_dir = os.path.dirname(script_dir) template_path = os.path.join(workspace_dir, 'llama-cpp-latest', 'models', 'templates', 'google-gemma-4-31B-it.jinja') if os.path.exists(template_path): with open(template_path, 'r') as tf: new_template = tf.read() new_raw = struct.pack(' pos: fout.write(b'\x00' * (aligned - pos)) # ── Write tensor data ── quant_count = 0 total_quant_bytes = 0 total_keep_bytes = 0 total_rmse = 0.0 q2k_rmse_sum = 0.0 q2k_tensor_count = 0 for i, ti in enumerate(tensor_infos): # Progress bar pct = (i + 1) / n_tensors * 100 bar_width = 40 filled = int(bar_width * (i + 1) / n_tensors) bar = '█' * filled + '░' * (bar_width - filled) elapsed = time.time() - start_time eta = elapsed / max(i + 1, 1) * (n_tensors - i - 1) sys.stdout.write(f"\r [{bar}] {pct:5.1f}% ({i+1}/{n_tensors}) {elapsed:.0f}s ETA:{eta:.0f}s {ti['name'][:50]}") sys.stdout.flush() # Read source tensor data abs_offset = data_section_start + ti['offset'] fin.seek(abs_offset) raw_data = fin.read(ti['data_size']) if quant_plan[i] == 'EMBD_Q8': # ── Tied embedding → Q8_0 via the HPC Shor pipeline ── if ti['type'] == GGML_TYPE_BF16: f32 = bf16_to_f32(raw_data, ti['n_elements']) elif ti['type'] == GGML_TYPE_F16: f32 = f16_to_f32(raw_data, ti['n_elements']) elif ti['type'] == GGML_TYPE_F32: f32 = np.frombuffer(raw_data, dtype=np.float32).copy() else: # Can't re-quantize from quantized source — keep fout.write(raw_data) pad = align_offset(fout.tell()) - fout.tell() if pad > 0: fout.write(b'\x00' * pad) continue n_el = ti['n_elements'] n_blocks_q8 = n_el // 32 if use_hpc and hasattr(_HEXSTATE_LIB, 'hexstate_quantize_tensor_q8_0_hpc'): output_buf = np.zeros(n_blocks_q8 * 34, dtype=np.uint8) error = ctypes.c_float(0.0) f32_c = np.ascontiguousarray(f32, dtype=np.float32) imat_ptr = None if imatrix_data and ti['name'] in imatrix_data: iw = imatrix_data[ti['name']] n_cols = iw.shape[0] n_rows = n_el // n_cols if n_cols > 0 else 1 imat_full = np.tile(iw, n_rows)[:n_el].astype(np.float32) imat_c = np.ascontiguousarray(imat_full) imat_ptr = imat_c.ctypes.data_as(ctypes.POINTER(ctypes.c_float)) _HEXSTATE_LIB.hexstate_quantize_tensor_q8_0_hpc( f32_c.ctypes.data_as(ctypes.POINTER(ctypes.c_float)), ctypes.c_int64(n_el), output_buf.ctypes.data_as(ctypes.c_void_p), ctypes.byref(error), imat_ptr, ctypes.c_int(0), ) fout.write(output_buf.tobytes()) rmse8 = float(np.sqrt(error.value / max(n_el, 1))) print(f"\n [Q8_0·Shor] {ti['name']} RMSE={rmse8:.6e}") else: q8_bytes, n_blocks_q8, sse8 = quantize_tensor_q8_0(f32) fout.write(q8_bytes) rmse8 = float(np.sqrt(sse8 / max(n_el, 1))) print(f"\n [Q8_0] {ti['name']} RMSE={rmse8:.6e} (numpy fallback)") quant_count += 1 total_quant_bytes += n_blocks_q8 * 34 elif quant_plan[i] in ('Q4_0', 'ATTN_Q4'): # ── Q4_0 quantization (fallback or attention HPC) ── if ti['type'] == GGML_TYPE_BF16: f32 = bf16_to_f32(raw_data, ti['n_elements']) elif ti['type'] == GGML_TYPE_F16: f32 = f16_to_f32(raw_data, ti['n_elements']) elif ti['type'] == GGML_TYPE_F32: f32 = np.frombuffer(raw_data, dtype=np.float32).copy() else: fout.write(raw_data) pad = align_offset(fout.tell()) - fout.tell() if pad > 0: fout.write(b'\x00' * pad) continue # Pad to 32-element boundary n_el = len(f32) pad_to = ((n_el + 31) // 32) * 32 if pad_to > n_el: f32 = np.concatenate([f32, np.zeros(pad_to - n_el, dtype=np.float32)]) n_el = pad_to n_blocks_q4 = n_el // 32 # Use HPC for attention tensors if available if quant_plan[i] == 'ATTN_Q4' and use_hpc and hasattr(_HEXSTATE_LIB, 'hexstate_quantize_tensor_q4_0_hpc'): output_buf = np.zeros(n_blocks_q4 * 18, dtype=np.uint8) error = ctypes.c_float(0.0) f32_c = np.ascontiguousarray(f32, dtype=np.float32) # Look up imatrix importance imat_ptr = None if imatrix_data and ti['name'] in imatrix_data: iw = imatrix_data[ti['name']] n_cols = iw.shape[0] n_rows = n_el // n_cols if n_cols > 0 else 1 imat_full = np.tile(iw, n_rows)[:n_el].astype(np.float32) imat_c = np.ascontiguousarray(imat_full) imat_ptr = imat_c.ctypes.data_as(ctypes.POINTER(ctypes.c_float)) _HEXSTATE_LIB.hexstate_quantize_tensor_q4_0_hpc( f32_c.ctypes.data_as(ctypes.POINTER(ctypes.c_float)), ctypes.c_int64(n_el), output_buf.ctypes.data_as(ctypes.c_void_p), ctypes.byref(error), imat_ptr, ctypes.c_int(1), # verbose ) fout.write(output_buf.tobytes()) print(f"\n [Q4_0·HPC] {ti['name']} RMSE={np.sqrt(error.value / ti['n_elements']):.6e}") else: # Vectorized Q4_0: process all blocks at once blocks = f32.reshape(-1, 32) amax = np.max(np.abs(blocks), axis=1) d = amax / 7.0 d[d == 0] = 1.0 # avoid div by zero qs = np.clip(np.round(blocks / d[:, None]) + 8, 0, 15).astype(np.uint8) d_orig = amax / 7.0 # restore zeros d_fp16 = d_orig.astype(np.float16) out_buf = bytearray(n_blocks_q4 * 18) for b in range(n_blocks_q4): off = b * 18 struct.pack_into(' 0: fout.write(b'\x00' * pad) continue # Quantize to Q2_K — always use HPC with chunked processing # Each chunk gets full HPC treatment (no size threshold) HPC_CHUNK = 50_000_000 # 50M elements per HPC chunk HPC_CHUNK = (HPC_CHUNK // QK_K) * QK_K # align to QK_K # Look up imatrix importance for this tensor imat_full = None if imatrix_data and ti['name'] in imatrix_data: iw = imatrix_data[ti['name']] n_cols = iw.shape[0] n_rows = ti['n_elements'] // n_cols if n_cols > 0 else 1 imat_full = np.tile(iw, n_rows)[:ti['n_elements']] n_el = ti['n_elements'] if use_hpc and n_el <= HPC_CHUNK: # Small tensor — single HPC pass q2k_data, n_blocks = quantize_tensor_q2k_hpc(f32, opt_mode=2, importance=imat_full) elif use_hpc: # Large tensor — chunked HPC (each chunk gets BP) chunks = [] processed = 0 while processed < n_el: end = min(processed + HPC_CHUNK, n_el) chunk_f32 = f32[processed:end] if len(chunk_f32) % QK_K != 0: pad_len = QK_K - (len(chunk_f32) % QK_K) chunk_f32 = np.concatenate([chunk_f32, np.zeros(pad_len, dtype=np.float32)]) chunk_imp = imat_full[processed:end] if imat_full is not None else None if chunk_imp is not None and len(chunk_imp) < len(chunk_f32): chunk_imp = np.concatenate([chunk_imp, np.ones(len(chunk_f32) - len(chunk_imp), dtype=np.float32)]) chunk_data, _ = quantize_tensor_q2k_hpc(chunk_f32, opt_mode=2, importance=chunk_imp) actual_blocks = (end - processed + QK_K - 1) // QK_K chunks.append(chunk_data[:actual_blocks * 84]) processed = end pct = 100.0 * processed / n_el print(f"\r → {processed/1e6:.0f}M/{n_el/1e6:.0f}M ({pct:.0f}%)", end='', flush=True) print() q2k_data = b''.join(chunks) n_blocks = n_el // QK_K else: # No HPC available — python fallback CHUNK_SIZE = 10_000_000 CHUNK_SIZE = (CHUNK_SIZE // QK_K) * QK_K chunks = [] processed = 0 while processed < n_el: end = min(processed + CHUNK_SIZE, n_el) chunk_data, _ = quantize_tensor_q2k(f32[processed:end]) chunks.append(chunk_data) processed = end pct = 100.0 * processed / n_el print(f"\r → {processed/1e6:.0f}M/{n_el/1e6:.0f}M ({pct:.0f}%)", end='', flush=True) print() q2k_data = b''.join(chunks) n_blocks = n_el // QK_K fout.write(q2k_data) # ── Compute and report exact per-tensor RMSE ── try: CHUNK_BLK = 100_000 # blocks per chunk to bound memory total_se = 0.0 total_n = 0 for ci in range(0, n_blocks, CHUNK_BLK): ce = min(ci + CHUNK_BLK, n_blocks) chunk_q = q2k_data[ci*84:ce*84] deq_chunk = dequant_q2k_fast(chunk_q, ce - ci) orig_chunk = f32[ci*QK_K:ce*QK_K] n_valid = min(len(orig_chunk), len(deq_chunk)) diff = orig_chunk[:n_valid] - deq_chunk[:n_valid] total_se += np.sum(diff ** 2) total_n += n_valid tensor_rmse = np.sqrt(total_se / max(total_n, 1)) q2k_rmse_sum += tensor_rmse q2k_tensor_count += 1 print(f"\n [Q2_K] {ti['name'][:55]} RMSE={tensor_rmse:.6e}") except Exception as e: print(f"\n [Q2_K] {ti['name'][:55]} RMSE=err({e})") quant_count += 1 total_quant_bytes += len(q2k_data) else: # Keep as-is (passthrough) fout.write(raw_data) total_keep_bytes += len(raw_data) # Alignment padding pad = align_offset(fout.tell()) - fout.tell() if pad > 0: fout.write(b'\x00' * pad) final_size = fout.tell() elapsed = time.time() - start_time print(f"\r {'█' * 40} 100.0% ({n_tensors}/{n_tensors}) {elapsed:.0f}s" + " " * 60) print() # ── Summary ── original_bytes = sum(ti['data_size'] for ti in tensor_infos) compression = original_bytes / max(final_size, 1) print(" ╔════════════════════════════════════════════════════════════════╗") print(" ║ RE-QUANTIZATION SUMMARY ║") print(" ╠════════════════════════════════════════════════════════════════╣") print(f" ║ Tensors quantized (Q2_K): {quant_count:<33d} ║") print(f" ║ Tensors kept as-is: {total_keep:<33d} ║") print(f" ║ Q2_K data: {total_quant_bytes:>12,} bytes ({total_quant_bytes/1024**2:>7.1f} MB) ║") print(f" ║ Kept data: {total_keep_bytes:>12,} bytes ({total_keep_bytes/1024**2:>7.1f} MB) ║") print(f" ║ Original size: {file_size:>12,} bytes ({file_size/1024**3:>7.2f} GB) ║") print(f" ║ Output size: {final_size:>12,} bytes ({final_size/1024**3:>7.2f} GB) ║") print(f" ║ Compression: {compression:>42.1f}x ║") if q2k_tensor_count > 0: mean_rmse = q2k_rmse_sum / q2k_tensor_count print(f" ║ Mean Q2_K RMSE: {mean_rmse:>12.6e} ║") print(f" ║ Total time: {elapsed:>39.1f} sec ║") print(" ╚════════════════════════════════════════════════════════════════╝") print() print(f" Output: {output_path}") print() if __name__ == '__main__': main()