""" Formula Engine: Discovers mathematical patterns in Qwen 0.5B weights and creates compact formula representations for reconstruction. Architecture of Qwen2.5-0.5B-Instruct: - hidden_size: 896 - intermediate_size: 4864 - num_attention_heads: 14 - num_key_value_heads: 2 - num_hidden_layers: 24 - vocab_size: 151936 Weight layers per transformer block: - self_attn.q_proj: (896, 896) - 14 heads - self_attn.k_proj: (128, 896) - 2 KV heads - self_attn.v_proj: (128, 896) - 2 KV heads - self_attn.o_proj: (896, 896) - mlp.gate_proj: (4864, 896) - mlp.up_proj: (4864, 896) - mlp.down_proj: (896, 4864) - input_layernorm: (896,) - post_attention_layernorm: (896,) Plus: - embed_tokens: (151936, 896) - shared with lm_head (tied) Compression Strategy: - MLP layers (gate, up, down): SVD low-rank factorization (biggest savings) - Attention K/V: SVD (small rectangular matrices) - Attention Q/O: Quantization to int8 + scale factors - Norms: Store as-is (tiny - 896 floats each) - Embeddings: SVD + quantization hybrid """ import torch import numpy as np import json import os from pathlib import Path from typing import Dict, Tuple, Optional import struct import io class FormulaEngine: """ Discovers compact formula representations for neural network weights. A "formula" here means: instead of storing the full weight matrix W (shape m×n), we store a factorized/compressed representation that can reconstruct W. Methods: 1. SVD: W ≈ U_r @ diag(S_r) @ V_r^T (stores U_r, S_r, V_r) 2. Quantized: W ≈ scale * W_int8 + zero_point 3. Hybrid: SVD on large matrices + quantization on small ones """ def __init__(self, compression_ratio: float = 0.4, quality_threshold: float = 0.99): """ Args: compression_ratio: Target ratio of compressed/original size (0.4 = 60% smaller) quality_threshold: Minimum cosine similarity for reconstruction """ self.compression_ratio = compression_ratio self.quality_threshold = quality_threshold self.formulas = {} # name -> formula dict self.metadata = {} def analyze_weight(self, name: str, weight: torch.Tensor) -> dict: """Analyze a weight tensor to find best compression strategy.""" shape = weight.shape numel = weight.numel() # Basic statistics stats = { "name": name, "shape": list(shape), "numel": numel, "bytes_fp16": numel * 2, "mean": float(weight.float().mean()), "std": float(weight.float().std()), "min": float(weight.float().min()), "max": float(weight.float().max()), } if len(shape) == 2: m, n = shape # SVD break-even: rank r saves space when r < m*n/(m+n+1) breakeven_rank = (m * n) / (m + n + 1) target_rank = int(breakeven_rank * self.compression_ratio) stats["breakeven_rank"] = int(breakeven_rank) stats["target_rank"] = target_rank stats["svd_viable"] = target_rank > 0 and target_rank < min(m, n) # Check if quantization would be better for this layer # For square matrices, SVD savings are minimal if m == n: stats["recommended"] = "quantize" elif max(m, n) / min(m, n) > 2: stats["recommended"] = "svd" else: stats["recommended"] = "svd" if stats["svd_viable"] else "quantize" elif len(shape) == 1: stats["recommended"] = "store_raw" # Norms are tiny else: stats["recommended"] = "quantize" return stats def svd_compress(self, weight: torch.Tensor, rank: int) -> dict: """ Compress weight via truncated SVD. W ≈ U[:, :r] @ diag(S[:r]) @ Vh[:r, :] Storage: U (m×r) + S (r) + Vh (r×n) in float16 vs original: W (m×n) in float16 Savings when: r * (m + n + 1) < m * n """ W = weight.float() U, S, Vh = torch.linalg.svd(W, full_matrices=False) # Truncate to rank r U_r = U[:, :rank].half() S_r = S[:rank].half() Vh_r = Vh[:rank, :].half() # Calculate compression stats original_size = W.numel() * 2 # fp16 compressed_size = (U_r.numel() + S_r.numel() + Vh_r.numel()) * 2 # Calculate reconstruction error W_reconstructed = (U_r.float() @ torch.diag(S_r.float()) @ Vh_r.float()) mse = float(((W - W_reconstructed) ** 2).mean()) cos_sim = float(torch.nn.functional.cosine_similarity( W.reshape(1, -1), W_reconstructed.reshape(1, -1) )) # Energy captured (fraction of total singular value energy) energy = float((S[:rank] ** 2).sum() / (S ** 2).sum()) return { "type": "svd", "rank": rank, "original_shape": list(weight.shape), "original_bytes": original_size, "compressed_bytes": compressed_size, "compression_ratio": compressed_size / original_size, "mse": mse, "cosine_similarity": cos_sim, "energy_captured": energy, "U": U_r, "S": S_r, "Vh": Vh_r, } def quantize_compress(self, weight: torch.Tensor, bits: int = 4) -> dict: """ Compress weight via quantization. W ≈ scale * W_int + zero_point (per-channel) For 4-bit: each value stored in 4 bits (2 values per byte) """ W = weight.float() if len(W.shape) == 2: # Per-channel (per-row) quantization w_min = W.min(dim=1, keepdim=True).values w_max = W.max(dim=1, keepdim=True).values else: w_min = W.min() w_max = W.max() # Compute scale and zero point qmax = (2 ** bits) - 1 scale = (w_max - w_min) / qmax scale = scale.clamp(min=1e-8) zero_point = (-w_min / scale).round().clamp(0, qmax) # Quantize W_q = ((W - w_min) / scale).round().clamp(0, qmax).to(torch.uint8) # Dequantize for error measurement W_reconstructed = W_q.float() * scale + w_min mse = float(((W - W_reconstructed) ** 2).mean()) cos_sim = float(torch.nn.functional.cosine_similarity( W.reshape(1, -1), W_reconstructed.reshape(1, -1) )) # Storage calculation original_size = W.numel() * 2 # fp16 quant_size = W.numel() * bits / 8 # quantized weights meta_size = (scale.numel() + zero_point.numel()) * 2 # scales in fp16 compressed_size = int(quant_size + meta_size) return { "type": "quantize", "bits": bits, "original_shape": list(weight.shape), "original_bytes": original_size, "compressed_bytes": compressed_size, "compression_ratio": compressed_size / original_size, "mse": mse, "cosine_similarity": cos_sim, "W_q": W_q, "scale": scale.half(), "zero_point": zero_point.half(), "w_min": w_min.half(), } def find_best_formula(self, name: str, weight: torch.Tensor) -> dict: """ Find the best compression formula for a given weight tensor. Tries multiple approaches and picks the one with best quality/size tradeoff. """ analysis = self.analyze_weight(name, weight) if analysis["recommended"] == "store_raw": # Tiny tensors (layer norms) - just store as-is return { "type": "raw", "original_shape": list(weight.shape), "original_bytes": weight.numel() * 2, "compressed_bytes": weight.numel() * 2, "compression_ratio": 1.0, "mse": 0.0, "cosine_similarity": 1.0, "data": weight.half(), } results = [] if analysis["recommended"] == "svd" and len(weight.shape) == 2: # Try multiple ranks m, n = weight.shape breakeven = analysis["breakeven_rank"] for ratio in [0.3, 0.4, 0.5, 0.6]: rank = max(1, int(breakeven * ratio)) rank = min(rank, min(m, n) - 1) result = self.svd_compress(weight, rank) result["approach"] = f"svd_rank{rank}" results.append(result) # Always try quantization for bits in [4, 8]: result = self.quantize_compress(weight, bits) result["approach"] = f"quant_{bits}bit" results.append(result) # Pick best: highest quality that meets compression target valid = [r for r in results if r["cosine_similarity"] >= self.quality_threshold] if not valid: # If nothing meets quality, pick best quality regardless valid = sorted(results, key=lambda x: x["cosine_similarity"], reverse=True) # Among valid, pick smallest best = min(valid, key=lambda x: x["compressed_bytes"]) # Clean up - remove tensors for reporting report = {k: v for k, v in best.items() if not isinstance(v, torch.Tensor)} return best def compress_model(self, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct", output_dir: str = "./formula_weights"): """ Compress entire model and save formulas. """ from transformers import AutoModelForCausalLM print(f"Loading model: {model_name}") model = AutoModelForCausalLM.from_pretrained( model_name, dtype=torch.float16, ) os.makedirs(output_dir, exist_ok=True) total_original = 0 total_compressed = 0 formula_index = {} print("\n" + "="*80) print("FORMULA DISCOVERY ENGINE - Analyzing all model weights") print("="*80) for name, param in model.named_parameters(): weight = param.data print(f"\n{'─'*60}") print(f"Layer: {name}") print(f" Shape: {list(weight.shape)} | Elements: {weight.numel():,} | Size: {weight.numel()*2/1024/1024:.2f} MB") # Find best formula formula = self.find_best_formula(name, weight) print(f" Formula: {formula['type']} | Compression: {formula['compression_ratio']:.3f}") print(f" Quality: cos_sim={formula['cosine_similarity']:.6f} | MSE={formula['mse']:.2e}") print(f" Size: {formula['original_bytes']/1024/1024:.2f} MB → {formula['compressed_bytes']/1024/1024:.2f} MB") total_original += formula["original_bytes"] total_compressed += formula["compressed_bytes"] # Save formula tensors formula_file = name.replace(".", "_") + ".pt" save_data = {} if formula["type"] == "svd": save_data = { "type": "svd", "rank": formula["rank"], "shape": formula["original_shape"], "U": formula["U"], "S": formula["S"], "Vh": formula["Vh"], } elif formula["type"] == "quantize": save_data = { "type": "quantize", "bits": formula["bits"], "shape": formula["original_shape"], "W_q": formula["W_q"], "scale": formula["scale"], "zero_point": formula["zero_point"], "w_min": formula["w_min"], } elif formula["type"] == "raw": save_data = { "type": "raw", "shape": formula["original_shape"], "data": formula["data"], } torch.save(save_data, os.path.join(output_dir, formula_file)) # Index entry formula_index[name] = { "file": formula_file, "type": formula["type"], "shape": formula["original_shape"], "compression_ratio": formula["compression_ratio"], "cosine_similarity": formula["cosine_similarity"], "mse": formula["mse"], } if formula["type"] == "svd": formula_index[name]["rank"] = formula["rank"] elif formula["type"] == "quantize": formula_index[name]["bits"] = formula["bits"] # Save index with open(os.path.join(output_dir, "formula_index.json"), "w") as f: json.dump(formula_index, f, indent=2) # Summary print("\n" + "="*80) print("COMPRESSION SUMMARY") print("="*80) print(f"Original model size: {total_original/1024/1024:.1f} MB") print(f"Compressed formula size: {total_compressed/1024/1024:.1f} MB") print(f"Overall compression: {total_compressed/total_original:.3f} ({(1-total_compressed/total_original)*100:.1f}% smaller)") print(f"Formula files saved to: {output_dir}/") self.metadata = { "model_name": model_name, "original_size_mb": total_original / 1024 / 1024, "compressed_size_mb": total_compressed / 1024 / 1024, "compression_ratio": total_compressed / total_original, "num_layers": len(formula_index), } with open(os.path.join(output_dir, "metadata.json"), "w") as f: json.dump(self.metadata, f, indent=2) return formula_index class FormulaReconstructor: """ Reconstructs model weights from formula files. This is what runs at inference time instead of loading full weights. """ def __init__(self, formula_dir: str): self.formula_dir = formula_dir with open(os.path.join(formula_dir, "formula_index.json"), "r") as f: self.index = json.load(f) def reconstruct_weight(self, name: str) -> torch.Tensor: """Reconstruct a single weight from its formula.""" info = self.index[name] data = torch.load( os.path.join(self.formula_dir, info["file"]), map_location="cpu", weights_only=True ) if data["type"] == "svd": # W = U @ diag(S) @ Vh U = data["U"].float() S = data["S"].float() Vh = data["Vh"].float() W = U @ torch.diag(S) @ Vh return W.half() elif data["type"] == "quantize": # W = W_q * scale + w_min W_q = data["W_q"].float() scale = data["scale"].float() w_min = data["w_min"].float() W = W_q * scale + w_min return W.half() elif data["type"] == "raw": return data["data"] raise ValueError(f"Unknown formula type: {data['type']}") def reconstruct_all(self) -> Dict[str, torch.Tensor]: """Reconstruct all weights.""" weights = {} for name in self.index: weights[name] = self.reconstruct_weight(name) return weights def load_into_model(self, model): """Load reconstructed weights into a model.""" state_dict = {} for name in self.index: state_dict[name] = self.reconstruct_weight(name) model.load_state_dict(state_dict, strict=False) return model if __name__ == "__main__": # Run the formula discovery engine engine = FormulaEngine( compression_ratio=0.4, # Target 60% compression quality_threshold=0.995 # Minimum cosine similarity ) formula_index = engine.compress_model( model_name="Qwen/Qwen2.5-0.5B-Instruct", output_dir="./formula_weights" ) print("\n\nFormula discovery complete!") print(f"Results saved to ./formula_weights/")