| """ |
| Formula Engine: Discovers mathematical patterns in Qwen 0.5B weights |
| and creates compact formula representations for reconstruction. |
| |
| Architecture of Qwen2.5-0.5B-Instruct: |
| - hidden_size: 896 |
| - intermediate_size: 4864 |
| - num_attention_heads: 14 |
| - num_key_value_heads: 2 |
| - num_hidden_layers: 24 |
| - vocab_size: 151936 |
| |
| Weight layers per transformer block: |
| - self_attn.q_proj: (896, 896) - 14 heads |
| - self_attn.k_proj: (128, 896) - 2 KV heads |
| - self_attn.v_proj: (128, 896) - 2 KV heads |
| - self_attn.o_proj: (896, 896) |
| - mlp.gate_proj: (4864, 896) |
| - mlp.up_proj: (4864, 896) |
| - mlp.down_proj: (896, 4864) |
| - input_layernorm: (896,) |
| - post_attention_layernorm: (896,) |
| |
| Plus: |
| - embed_tokens: (151936, 896) - shared with lm_head (tied) |
| |
| Compression Strategy (chosen per tensor; the smallest candidate that meets the |
| quality threshold is kept): |
| - 2-D matrices with aspect ratio > 2 (MLP gate/up/down, attention K/V, |
| embeddings): truncated-SVD candidates at several ranks, plus 4-bit and 8-bit |
| per-row quantization |
| - Square matrices (attention Q/O): 4-bit or 8-bit per-row quantization + scale factors |
| - 1-D tensors (norms - 896 floats each): stored as-is in float16 |
| """ |
|
|
| import torch |
| import numpy as np |
| import json |
| import os |
| from pathlib import Path |
| from typing import Dict, Tuple, Optional |
| import struct |
| import io |
|
|
|
|
class FormulaEngine:
    """
    Discovers compact formula representations for neural network weights.

    A "formula" here means: instead of storing the full weight matrix W
    (shape m×n), we store a factorized/compressed representation that can
    reconstruct W.

    Methods:
    1. SVD: W ≈ U_r @ diag(S_r) @ Vh_r (stores U_r, S_r, Vh_r in float16)
    2. Quantized: W ≈ scale * W_q + w_min (per-row scale/offset for 2-D input)
    3. Raw: 1-D tensors are kept unchanged in float16
    """

    # Fractions of the break-even rank that are tried as SVD candidates.
    _SVD_RANK_RATIOS = (0.3, 0.4, 0.5, 0.6)
    # Bit widths that are tried as quantization candidates.
    _QUANT_BITS = (4, 8)

    def __init__(self, compression_ratio: float = 0.4, quality_threshold: float = 0.99):
        """
        Args:
            compression_ratio: Target ratio of compressed/original size
                (0.4 = 60% smaller). Used to derive ``target_rank`` in
                :meth:`analyze_weight`.
            quality_threshold: Minimum cosine similarity a candidate must
                reach to be eligible in :meth:`find_best_formula`.
        """
        self.compression_ratio = compression_ratio
        self.quality_threshold = quality_threshold
        self.formulas = {}
        self.metadata = {}

    def analyze_weight(self, name: str, weight: torch.Tensor) -> dict:
        """
        Collect basic statistics for a tensor and recommend a strategy.

        Args:
            name: Parameter name, copied into the result.
            weight: Tensor to analyze.

        Returns:
            A dict with ``name``, ``shape``, ``numel``, ``bytes_fp16``,
            ``mean``, ``std``, ``min``, ``max`` and ``recommended``
            (``"svd"``, ``"quantize"`` or ``"store_raw"``). 2-D tensors also
            get ``breakeven_rank``, ``target_rank`` and ``svd_viable``.
        """
        shape = weight.shape
        numel = weight.numel()
        # Convert once; the statistics below all operate on float32.
        w32 = weight.float()

        stats = {
            "name": name,
            "shape": list(shape),
            "numel": numel,
            "bytes_fp16": numel * 2,
            "mean": float(w32.mean()),
            "std": float(w32.std()),
            "min": float(w32.min()),
            "max": float(w32.max()),
        }

        if len(shape) == 2:
            m, n = shape
            # Rank at which U (m×r) + S (r) + Vh (r×n) costs as much as W.
            breakeven_rank = (m * n) / (m + n + 1)
            target_rank = int(breakeven_rank * self.compression_ratio)

            stats["breakeven_rank"] = int(breakeven_rank)
            stats["target_rank"] = target_rank
            stats["svd_viable"] = 0 < target_rank < min(m, n)

            if m == n:
                stats["recommended"] = "quantize"
            elif max(m, n) / min(m, n) > 2:
                stats["recommended"] = "svd"
            else:
                stats["recommended"] = "svd" if stats["svd_viable"] else "quantize"
        elif len(shape) == 1:
            stats["recommended"] = "store_raw"
        else:
            stats["recommended"] = "quantize"

        return stats

    def svd_compress(self, weight: torch.Tensor, rank: int) -> dict:
        """
        Compress a 2-D weight via truncated SVD.

        W ≈ U[:, :r] @ diag(S[:r]) @ Vh[:r, :]

        Storage: U (m×r) + S (r) + Vh (r×n) in float16, versus the original
        W (m×n) in float16. This saves space when r * (m + n + 1) < m * n.

        Args:
            weight: 2-D tensor to factorize.
            rank: Number of singular triplets to keep. Values larger than
                ``min(m, n)`` are clamped to ``min(m, n)``.

        Returns:
            A dict with the float16 factors (``U``, ``S``, ``Vh``), the rank
            actually kept, byte counts, ``mse``, ``cosine_similarity`` and
            ``energy_captured``.

        Raises:
            ValueError: If ``rank`` is smaller than 1.
        """
        if rank < 1:
            raise ValueError(f"rank must be >= 1, got {rank}")
        W = weight.float()
        U, S, Vh = torch.linalg.svd(W, full_matrices=False)
        return self._truncate_svd(W, U, S, Vh, rank)

    @staticmethod
    def _truncate_svd(W: torch.Tensor, U: torch.Tensor, S: torch.Tensor,
                      Vh: torch.Tensor, rank: int) -> dict:
        """Build the rank-``rank`` SVD formula from an already computed SVD of ``W``."""
        # Slicing silently stops at the available rank; report what is kept.
        rank = min(rank, S.shape[-1])

        U_r = U[:, :rank].half()
        S_r = S[:rank].half()
        Vh_r = Vh[:rank, :].half()

        original_size = W.numel() * 2
        compressed_size = (U_r.numel() + S_r.numel() + Vh_r.numel()) * 2

        # Rebuild from the float16 factors so the metrics describe what is
        # actually stored. Scaling U's columns by S equals U @ diag(S)
        # without materializing the r×r diagonal matrix.
        W_reconstructed = (U_r.float() * S_r.float()) @ Vh_r.float()
        mse = float(((W - W_reconstructed) ** 2).mean())
        cos_sim = float(torch.nn.functional.cosine_similarity(
            W.reshape(1, -1), W_reconstructed.reshape(1, -1)
        ))

        # Share of the squared singular values retained by the truncation.
        energy = float((S[:rank] ** 2).sum() / (S ** 2).sum())

        return {
            "type": "svd",
            "rank": rank,
            "original_shape": list(W.shape),
            "original_bytes": original_size,
            "compressed_bytes": compressed_size,
            "compression_ratio": compressed_size / original_size,
            "mse": mse,
            "cosine_similarity": cos_sim,
            "energy_captured": energy,
            "U": U_r,
            "S": S_r,
            "Vh": Vh_r,
        }

    def quantize_compress(self, weight: torch.Tensor, bits: int = 4) -> dict:
        """
        Compress a weight via asymmetric uniform quantization.

        W ≈ scale * W_q + w_min, with one (scale, w_min) pair per row for
        2-D input and a single pair otherwise.

        Args:
            weight: Tensor to quantize.
            bits: Bit width of the integer codes (1..8).

        Returns:
            A dict with the codes ``W_q`` (uint8), ``scale``, ``zero_point``
            and ``w_min`` (float16), byte counts, ``mse`` and
            ``cosine_similarity``.

            ``compressed_bytes`` is the size the codes would occupy when
            packed at ``bits`` bits per value, plus the float16 ``scale``
            and ``zero_point``. ``W_q`` itself is returned unpacked, one
            uint8 per value.

        Raises:
            ValueError: If ``bits`` is outside 1..8; the codes are held in
                uint8 and wider values would wrap around.
        """
        if not 1 <= bits <= 8:
            raise ValueError(f"bits must be between 1 and 8, got {bits}")

        W = weight.float()

        if len(W.shape) == 2:
            # Per-row range.
            w_min = W.min(dim=1, keepdim=True).values
            w_max = W.max(dim=1, keepdim=True).values
        else:
            w_min = W.min()
            w_max = W.max()

        qmax = (2 ** bits) - 1
        scale = (w_max - w_min) / qmax
        # Avoid division by zero for constant rows/tensors.
        scale = scale.clamp(min=1e-8)
        zero_point = (-w_min / scale).round().clamp(0, qmax)

        W_q = ((W - w_min) / scale).round().clamp(0, qmax).to(torch.uint8)

        # scale and w_min are returned in float16, so measure quality with
        # the rounded values a consumer of this dict will dequantize with.
        scale_h = scale.half()
        w_min_h = w_min.half()
        W_reconstructed = W_q.float() * scale_h.float() + w_min_h.float()
        mse = float(((W - W_reconstructed) ** 2).mean())
        cos_sim = float(torch.nn.functional.cosine_similarity(
            W.reshape(1, -1), W_reconstructed.reshape(1, -1)
        ))

        original_size = W.numel() * 2
        quant_size = W.numel() * bits / 8
        meta_size = (scale.numel() + zero_point.numel()) * 2
        compressed_size = int(quant_size + meta_size)

        return {
            "type": "quantize",
            "bits": bits,
            "original_shape": list(weight.shape),
            "original_bytes": original_size,
            "compressed_bytes": compressed_size,
            "compression_ratio": compressed_size / original_size,
            "mse": mse,
            "cosine_similarity": cos_sim,
            "W_q": W_q,
            "scale": scale_h,
            "zero_point": zero_point.half(),
            "w_min": w_min_h,
        }

    def find_best_formula(self, name: str, weight: torch.Tensor) -> dict:
        """
        Find the best compression formula for a given weight tensor.

        1-D tensors are returned as a ``"raw"`` formula. Otherwise SVD
        candidates (only when :meth:`analyze_weight` recommends SVD) and
        4-bit/8-bit quantization candidates are built. The smallest
        candidate whose cosine similarity reaches ``quality_threshold`` is
        returned; if none qualifies, the candidate with the highest cosine
        similarity is returned instead.

        Args:
            name: Parameter name (forwarded to :meth:`analyze_weight`).
            weight: Tensor to compress.

        Returns:
            The chosen formula dict, including its tensors and, for
            non-raw formulas, an ``approach`` label.
        """
        analysis = self.analyze_weight(name, weight)

        if analysis["recommended"] == "store_raw":
            return {
                "type": "raw",
                "original_shape": list(weight.shape),
                "original_bytes": weight.numel() * 2,
                "compressed_bytes": weight.numel() * 2,
                "compression_ratio": 1.0,
                "mse": 0.0,
                "cosine_similarity": 1.0,
                "data": weight.half(),
            }

        results = []

        if analysis["recommended"] == "svd" and len(weight.shape) == 2:
            m, n = weight.shape
            breakeven = analysis["breakeven_rank"]
            # Stay below full rank, but never drop to rank 0.
            max_rank = max(1, min(m, n) - 1)

            ranks = []
            for ratio in self._SVD_RANK_RATIOS:
                rank = min(max(1, int(breakeven * ratio)), max_rank)
                if rank not in ranks:  # small matrices map several ratios to one rank
                    ranks.append(rank)

            # The decomposition does not depend on the rank: compute it once
            # and truncate it for every candidate.
            W = weight.float()
            U, S, Vh = torch.linalg.svd(W, full_matrices=False)
            for rank in ranks:
                result = self._truncate_svd(W, U, S, Vh, rank)
                result["approach"] = f"svd_rank{rank}"
                results.append(result)
            del U, S, Vh

        for bits in self._QUANT_BITS:
            result = self.quantize_compress(weight, bits)
            result["approach"] = f"quant_{bits}bit"
            results.append(result)

        valid = [r for r in results if r["cosine_similarity"] >= self.quality_threshold]

        if valid:
            # Smallest representation among those that are good enough.
            return min(valid, key=lambda r: r["compressed_bytes"])

        # Nothing meets the threshold: fall back to the most faithful one.
        return max(results, key=lambda r: r["cosine_similarity"])

    @staticmethod
    def _payload_for(formula: dict) -> dict:
        """Select the fields of ``formula`` that are written to its ``.pt`` file."""
        kind = formula["type"]
        if kind == "svd":
            return {
                "type": "svd",
                "rank": formula["rank"],
                "shape": formula["original_shape"],
                "U": formula["U"],
                "S": formula["S"],
                "Vh": formula["Vh"],
            }
        if kind == "quantize":
            return {
                "type": "quantize",
                "bits": formula["bits"],
                "shape": formula["original_shape"],
                "W_q": formula["W_q"],
                "scale": formula["scale"],
                "zero_point": formula["zero_point"],
                "w_min": formula["w_min"],
            }
        if kind == "raw":
            return {
                "type": "raw",
                "shape": formula["original_shape"],
                "data": formula["data"],
            }
        return {}

    @staticmethod
    def _index_entry(formula_file: str, formula: dict) -> dict:
        """Build the JSON-serializable index record for one formula."""
        entry = {
            "file": formula_file,
            "type": formula["type"],
            "shape": formula["original_shape"],
            "compression_ratio": formula["compression_ratio"],
            "cosine_similarity": formula["cosine_similarity"],
            "mse": formula["mse"],
        }
        if formula["type"] == "svd":
            entry["rank"] = formula["rank"]
        elif formula["type"] == "quantize":
            entry["bits"] = formula["bits"]
        return entry

    def compress_model(self, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct",
                       output_dir: str = "./formula_weights"):
        """
        Compress every named parameter of a model and save the formulas.

        Writes one ``.pt`` file per parameter plus ``formula_index.json`` and
        ``metadata.json`` into ``output_dir``, prints a per-layer report and
        a summary, and stores the summary in ``self.metadata``.

        Args:
            model_name: Identifier passed to
                ``AutoModelForCausalLM.from_pretrained``.
            output_dir: Directory that receives the output files (created
                if missing).

        Returns:
            The formula index: a dict mapping parameter name to its index
            record (file name, type, shape and quality figures).
        """
        from transformers import AutoModelForCausalLM

        print(f"Loading model: {model_name}")
        # NOTE(review): `dtype=` is presumably the keyword of recent
        # transformers releases (older ones use `torch_dtype=`) - confirm
        # against the installed version.
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            dtype=torch.float16,
        )

        os.makedirs(output_dir, exist_ok=True)

        total_original = 0
        total_compressed = 0
        formula_index = {}

        print("\n" + "="*80)
        print("FORMULA DISCOVERY ENGINE - Analyzing all model weights")
        print("="*80)

        for name, param in model.named_parameters():
            weight = param.data
            print(f"\n{'─'*60}")
            print(f"Layer: {name}")
            print(f" Shape: {list(weight.shape)} | Elements: {weight.numel():,} | Size: {weight.numel()*2/1024/1024:.2f} MB")

            formula = self.find_best_formula(name, weight)

            print(f" Formula: {formula['type']} | Compression: {formula['compression_ratio']:.3f}")
            print(f" Quality: cos_sim={formula['cosine_similarity']:.6f} | MSE={formula['mse']:.2e}")
            print(f" Size: {formula['original_bytes']/1024/1024:.2f} MB → {formula['compressed_bytes']/1024/1024:.2f} MB")

            total_original += formula["original_bytes"]
            total_compressed += formula["compressed_bytes"]

            # One file per parameter, named after the parameter path.
            formula_file = name.replace(".", "_") + ".pt"
            torch.save(self._payload_for(formula), os.path.join(output_dir, formula_file))

            formula_index[name] = self._index_entry(formula_file, formula)

        with open(os.path.join(output_dir, "formula_index.json"), "w", encoding="utf-8") as f:
            json.dump(formula_index, f, indent=2)

        # A model without parameters would otherwise divide by zero.
        overall = total_compressed / total_original if total_original else 0.0

        print("\n" + "="*80)
        print("COMPRESSION SUMMARY")
        print("="*80)
        print(f"Original model size: {total_original/1024/1024:.1f} MB")
        print(f"Compressed formula size: {total_compressed/1024/1024:.1f} MB")
        print(f"Overall compression: {overall:.3f} ({(1-overall)*100:.1f}% smaller)")
        print(f"Formula files saved to: {output_dir}/")

        self.metadata = {
            "model_name": model_name,
            "original_size_mb": total_original / 1024 / 1024,
            "compressed_size_mb": total_compressed / 1024 / 1024,
            "compression_ratio": overall,
            "num_layers": len(formula_index),
        }

        with open(os.path.join(output_dir, "metadata.json"), "w", encoding="utf-8") as f:
            json.dump(self.metadata, f, indent=2)

        return formula_index
|
|
|
|
class FormulaReconstructor:
    """
    Reconstructs model weights from formula files.

    Reads ``formula_index.json`` from a formula directory and rebuilds each
    tensor from its ``.pt`` file. This is what runs at inference time
    instead of loading full weights.
    """

    def __init__(self, formula_dir: str):
        """
        Args:
            formula_dir: Directory containing ``formula_index.json`` and the
                per-parameter ``.pt`` files it refers to.
        """
        self.formula_dir = formula_dir
        with open(os.path.join(formula_dir, "formula_index.json"), "r", encoding="utf-8") as f:
            self.index = json.load(f)

    def reconstruct_weight(self, name: str) -> torch.Tensor:
        """
        Reconstruct a single weight from its formula.

        Args:
            name: Parameter name; must be a key of the index.

        Returns:
            The rebuilt tensor. SVD and quantized formulas are computed in
            float32 and returned as float16; raw formulas are returned as
            stored.

        Raises:
            KeyError: If ``name`` is not in the index.
            ValueError: If the file declares an unknown formula type.
        """
        info = self.index[name]
        data = torch.load(
            os.path.join(self.formula_dir, info["file"]),
            map_location="cpu",
            weights_only=True
        )

        kind = data["type"]

        if kind == "svd":
            U = data["U"].float()
            S = data["S"].float()
            Vh = data["Vh"].float()
            # Scaling the columns of U by S equals U @ diag(S) but skips
            # building the dense r×r matrix and one matmul.
            W = (U * S) @ Vh
            return W.half()

        if kind == "quantize":
            W_q = data["W_q"].float()
            scale = data["scale"].float()
            w_min = data["w_min"].float()
            W = W_q * scale + w_min
            return W.half()

        if kind == "raw":
            return data["data"]

        raise ValueError(f"Unknown formula type: {data['type']}")

    def reconstruct_all(self) -> Dict[str, torch.Tensor]:
        """Reconstruct every weight listed in the index, keyed by parameter name."""
        return {name: self.reconstruct_weight(name) for name in self.index}

    def load_into_model(self, model):
        """
        Load reconstructed weights into a model.

        Uses ``load_state_dict(..., strict=False)``, so parameters that have
        no formula keep their current values. Reconstructed tensors whose
        names the model does not know are reported on stdout instead of
        being dropped silently.

        Args:
            model: Object exposing ``load_state_dict(state_dict, strict=...)``.

        Returns:
            The same ``model`` instance.
        """
        result = model.load_state_dict(self.reconstruct_all(), strict=False)

        unexpected = getattr(result, "unexpected_keys", None)
        if unexpected:
            print(f"Warning: {len(unexpected)} reconstructed weight(s) not used by the model: {list(unexpected)}")

        return model
|
|
|
|
if __name__ == "__main__":
    # Script entry point: compress the default Qwen checkpoint with a
    # 0.995 cosine-similarity floor and write the formulas next to the
    # current working directory.
    engine_settings = {
        "compression_ratio": 0.4,
        "quality_threshold": 0.995,
    }
    run_settings = {
        "model_name": "Qwen/Qwen2.5-0.5B-Instruct",
        "output_dir": "./formula_weights",
    }

    engine = FormulaEngine(**engine_settings)
    formula_index = engine.compress_model(**run_settings)

    print("\n\nFormula discovery complete!")
    print("Results saved to ./formula_weights/")
|
|