# qwen-formula-engine / formula_engine.py
# Hugging Face Hub page residue: "arudradey's picture",
# "Upload formula_engine.py with huggingface_hub", commit 9e1709d (verified).
"""
Formula Engine: Discovers mathematical patterns in Qwen 0.5B weights
and creates compact formula representations for reconstruction.
Architecture of Qwen2.5-0.5B-Instruct:
- hidden_size: 896
- intermediate_size: 4864
- num_attention_heads: 14
- num_key_value_heads: 2
- num_hidden_layers: 24
- vocab_size: 151936
Weight layers per transformer block:
- self_attn.q_proj: (896, 896) - 14 heads
- self_attn.k_proj: (128, 896) - 2 KV heads
- self_attn.v_proj: (128, 896) - 2 KV heads
- self_attn.o_proj: (896, 896)
- mlp.gate_proj: (4864, 896)
- mlp.up_proj: (4864, 896)
- mlp.down_proj: (896, 4864)
- input_layernorm: (896,)
- post_attention_layernorm: (896,)
Plus:
- embed_tokens: (151936, 896) - shared with lm_head (tied)
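Compression Strategy (as implemented by FormulaEngine.find_best_formula):
- 1-D tensors (norms): stored as-is in fp16 (tiny - 896 floats each)
- Square 2-D matrices (attention Q/O): per-row quantization; 4-bit and 8-bit
  candidates are tried
- 2-D matrices with aspect ratio > 2 (MLP gate/up/down, attention K/V,
  embeddings): truncated-SVD candidates at several ranks, plus the same
  4-bit and 8-bit quantization candidates
- For each tensor, the smallest candidate whose cosine similarity meets the
  quality threshold is kept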
"""
import torch
import numpy as np
import json
import os
from pathlib import Path
from typing import Dict, Tuple, Optional
import struct
import io
class FormulaEngine:
    """
    Discovers compact formula representations for neural network weights.

    A "formula" here means: instead of storing the full weight matrix W
    (shape m×n), we store a factorized/compressed representation that can
    reconstruct W.

    Methods:
    1. SVD: W ≈ U_r @ diag(S_r) @ Vh_r (stores U_r, S_r, Vh_r in fp16)
    2. Quantized: W ≈ W_q * scale + w_min (unsigned integer codes plus
       fp16 scale/offset, per row for 2-D tensors)
    3. Raw: 1-D tensors are stored unchanged in fp16
    """

    def __init__(self, compression_ratio: float = 0.4, quality_threshold: float = 0.99):
        """
        Args:
            compression_ratio: Target ratio of compressed/original size
                (0.4 = 60% smaller). Used by ``analyze_weight`` to derive
                ``target_rank``.
            quality_threshold: Minimum cosine similarity a candidate must
                reach to be selected by ``find_best_formula``.
        """
        self.compression_ratio = compression_ratio
        self.quality_threshold = quality_threshold
        self.formulas = {}  # name -> formula dict
        self.metadata = {}

    def analyze_weight(self, name: str, weight: torch.Tensor) -> dict:
        """
        Analyze a weight tensor and recommend a compression strategy.

        Returns a dict with basic statistics (shape, numel, fp16 byte size,
        mean/std/min/max) and a ``"recommended"`` key that is one of
        ``"svd"``, ``"quantize"`` or ``"store_raw"``. For 2-D tensors the
        dict also carries ``breakeven_rank``, ``target_rank`` and
        ``svd_viable``.
        """
        shape = weight.shape
        numel = weight.numel()
        # Convert once; the original tensor may be fp16.
        values = weight.float()

        stats = {
            "name": name,
            "shape": list(shape),
            "numel": numel,
            "bytes_fp16": numel * 2,
            "mean": float(values.mean()),
            "std": float(values.std()),
            "min": float(values.min()),
            "max": float(values.max()),
        }

        if len(shape) == 2:
            m, n = shape
            # SVD break-even: rank r saves space when r < m*n/(m+n+1)
            breakeven_rank = (m * n) / (m + n + 1)
            target_rank = int(breakeven_rank * self.compression_ratio)
            stats["breakeven_rank"] = int(breakeven_rank)
            stats["target_rank"] = target_rank
            stats["svd_viable"] = 0 < target_rank < min(m, n)

            # For square matrices, SVD savings are minimal -> quantize.
            if m == n:
                stats["recommended"] = "quantize"
            elif max(m, n) / min(m, n) > 2:
                stats["recommended"] = "svd"
            else:
                stats["recommended"] = "svd" if stats["svd_viable"] else "quantize"
        elif len(shape) == 1:
            stats["recommended"] = "store_raw"  # Norms are tiny
        else:
            stats["recommended"] = "quantize"
        return stats

    def svd_compress(
        self,
        weight: torch.Tensor,
        rank: int,
        factors: Optional[Tuple[torch.Tensor, torch.Tensor, torch.Tensor]] = None,
    ) -> dict:
        """
        Compress a 2-D weight via truncated SVD.

        W ≈ U[:, :r] @ diag(S[:r]) @ Vh[:r, :]
        Storage: U (m×r) + S (r) + Vh (r×n) in float16
        vs original: W (m×n) in float16
        Savings when: r * (m + n + 1) < m * n

        Args:
            weight: 2-D tensor to compress.
            rank: Number of singular triplets to keep. Values above
                ``min(m, n)`` are clamped; the returned ``"rank"`` is the
                rank actually used.
            factors: Optional precomputed ``(U, S, Vh)`` from
                ``torch.linalg.svd(weight.float(), full_matrices=False)``.
                Lets callers that try several ranks pay for the
                decomposition only once.

        Returns:
            Dict with size/quality statistics and the fp16 factors under
            ``"U"``, ``"S"`` and ``"Vh"``.

        Raises:
            ValueError: If ``weight`` is not 2-D or ``rank`` is below 1.
        """
        W = weight.float()
        if W.dim() != 2:
            raise ValueError(f"svd_compress expects a 2-D tensor, got shape {list(weight.shape)}")
        rank = int(rank)
        if rank < 1:
            raise ValueError(f"rank must be >= 1, got {rank}")
        # Slicing past min(m, n) would silently truncate; make it explicit
        # so the reported rank matches the stored factors.
        rank = min(rank, min(W.shape))

        if factors is None:
            factors = torch.linalg.svd(W, full_matrices=False)
        U, S, Vh = factors

        # Truncate to rank r
        U_r = U[:, :rank].half()
        S_r = S[:rank].half()
        Vh_r = Vh[:rank, :].half()

        # Calculate compression stats
        original_size = W.numel() * 2  # fp16
        compressed_size = (U_r.numel() + S_r.numel() + Vh_r.numel()) * 2

        # Reconstruction error, measured on the fp16 factors that get stored.
        # Scaling U's columns by S equals U @ diag(S) without an r×r matrix.
        W_reconstructed = (U_r.float() * S_r.float()) @ Vh_r.float()
        mse = float(((W - W_reconstructed) ** 2).mean())
        cos_sim = float(torch.nn.functional.cosine_similarity(
            W.reshape(1, -1), W_reconstructed.reshape(1, -1)
        ))

        # Energy captured (fraction of total singular value energy).
        # An all-zero matrix has no energy to lose, so report 1.0, not NaN.
        total_energy = float((S ** 2).sum())
        if total_energy > 0:
            energy = float((S[:rank] ** 2).sum()) / total_energy
        else:
            energy = 1.0

        return {
            "type": "svd",
            "rank": rank,
            "original_shape": list(weight.shape),
            "original_bytes": original_size,
            "compressed_bytes": compressed_size,
            "compression_ratio": compressed_size / original_size,
            "mse": mse,
            "cosine_similarity": cos_sim,
            "energy_captured": energy,
            "U": U_r,
            "S": S_r,
            "Vh": Vh_r,
        }

    def quantize_compress(self, weight: torch.Tensor, bits: int = 4) -> dict:
        """
        Compress a weight via uniform affine quantization.

        W ≈ W_q * scale + w_min, with per-row scale/offset for 2-D tensors
        and a single scale/offset otherwise.

        ``compressed_bytes`` is the size with ``bits`` bits per value
        (e.g. two 4-bit values per byte) plus two fp16 values per channel.
        The returned ``W_q`` tensor itself is NOT bit-packed: it holds one
        code per ``uint8`` element.

        The scale and offset are rounded to fp16 before quantizing, so the
        reported ``mse`` / ``cosine_similarity`` describe exactly what a
        reconstruction from the returned tensors yields.

        Raises:
            ValueError: If ``bits`` is outside 1..8 (codes are held in uint8).
        """
        if not 1 <= bits <= 8:
            raise ValueError(f"bits must be between 1 and 8, got {bits}")

        W = weight.float()
        if len(W.shape) == 2:
            # Per-channel (per-row) quantization
            w_min = W.min(dim=1, keepdim=True).values
            w_max = W.max(dim=1, keepdim=True).values
        else:
            w_min = W.min()
            w_max = W.max()

        qmax = (2 ** bits) - 1

        # Round the parameters to fp16 up front because that is the
        # precision they are returned (and later saved) in.
        w_min = w_min.half().float()
        # Clamp to the smallest normal fp16 value so the scale neither
        # underflows to zero in fp16 nor causes a division by zero below.
        min_scale = torch.finfo(torch.float16).tiny
        scale = ((w_max - w_min) / qmax).clamp(min=min_scale).half().float()
        zero_point = (-w_min / scale).round().clamp(0, qmax)

        # Quantize
        W_q = ((W - w_min) / scale).round().clamp(0, qmax).to(torch.uint8)

        # Dequantize for error measurement
        W_reconstructed = W_q.float() * scale + w_min
        mse = float(((W - W_reconstructed) ** 2).mean())
        cos_sim = float(torch.nn.functional.cosine_similarity(
            W.reshape(1, -1), W_reconstructed.reshape(1, -1)
        ))

        # Storage calculation
        original_size = W.numel() * 2  # fp16
        quant_size = W.numel() * bits / 8  # quantized weights
        meta_size = (scale.numel() + zero_point.numel()) * 2  # fp16 metadata
        compressed_size = int(quant_size + meta_size)

        return {
            "type": "quantize",
            "bits": bits,
            "original_shape": list(weight.shape),
            "original_bytes": original_size,
            "compressed_bytes": compressed_size,
            "compression_ratio": compressed_size / original_size,
            "mse": mse,
            "cosine_similarity": cos_sim,
            "W_q": W_q,
            "scale": scale.half(),
            "zero_point": zero_point.half(),
            "w_min": w_min.half(),
        }

    def find_best_formula(self, name: str, weight: torch.Tensor) -> dict:
        """
        Find the best compression formula for a given weight tensor.

        1-D tensors are returned as ``"raw"`` fp16 copies. Otherwise a set
        of candidates is built (truncated SVD at several ranks when
        ``analyze_weight`` recommends SVD, plus 4-bit and 8-bit
        quantization) and the smallest candidate whose cosine similarity
        reaches ``self.quality_threshold`` is returned. If no candidate
        reaches the threshold, the candidate with the highest cosine
        similarity is returned instead.

        The returned dict includes the tensors needed for reconstruction
        and an ``"approach"`` label (except for ``"raw"``).
        """
        analysis = self.analyze_weight(name, weight)

        if analysis["recommended"] == "store_raw":
            # Tiny tensors (layer norms) - just store as-is
            return {
                "type": "raw",
                "original_shape": list(weight.shape),
                "original_bytes": weight.numel() * 2,
                "compressed_bytes": weight.numel() * 2,
                "compression_ratio": 1.0,
                "mse": 0.0,
                "cosine_similarity": 1.0,
                "data": weight.half(),
            }

        results = []

        if analysis["recommended"] == "svd" and len(weight.shape) == 2:
            m, n = weight.shape
            breakeven = analysis["breakeven_rank"]
            # Decompose once and reuse the factors for every rank tried.
            factors = torch.linalg.svd(weight.float(), full_matrices=False)
            tried_ranks = set()
            for ratio in (0.3, 0.4, 0.5, 0.6):
                rank = min(int(breakeven * ratio), min(m, n) - 1)
                # Apply the lower bound last so rank never drops to 0
                # (which happens when the smaller dimension is 1).
                rank = max(1, rank)
                if rank in tried_ranks:
                    continue
                tried_ranks.add(rank)
                result = self.svd_compress(weight, rank, factors)
                result["approach"] = f"svd_rank{rank}"
                results.append(result)

        # Always try quantization
        for bits in (4, 8):
            result = self.quantize_compress(weight, bits)
            result["approach"] = f"quant_{bits}bit"
            results.append(result)

        valid = [r for r in results if r["cosine_similarity"] >= self.quality_threshold]
        if valid:
            # Among candidates that meet the quality bar, pick the smallest.
            return min(valid, key=lambda r: r["compressed_bytes"])

        # Nothing meets the bar: pick the best quality regardless of size,
        # breaking ties in favour of the smaller candidate.
        return max(
            results,
            key=lambda r: (r["cosine_similarity"], -r["compressed_bytes"]),
        )

    @staticmethod
    def _formula_payload(formula: dict) -> dict:
        """Select the fields of a formula that are written to its .pt file."""
        kind = formula["type"]
        if kind == "svd":
            return {
                "type": "svd",
                "rank": formula["rank"],
                "shape": formula["original_shape"],
                "U": formula["U"],
                "S": formula["S"],
                "Vh": formula["Vh"],
            }
        if kind == "quantize":
            return {
                "type": "quantize",
                "bits": formula["bits"],
                "shape": formula["original_shape"],
                "W_q": formula["W_q"],
                "scale": formula["scale"],
                "zero_point": formula["zero_point"],
                "w_min": formula["w_min"],
            }
        if kind == "raw":
            return {
                "type": "raw",
                "shape": formula["original_shape"],
                "data": formula["data"],
            }
        return {}

    @staticmethod
    def _index_entry(formula_file: str, formula: dict) -> dict:
        """Build the JSON-serializable index record for one saved formula."""
        entry = {
            "file": formula_file,
            "type": formula["type"],
            "shape": formula["original_shape"],
            "compression_ratio": formula["compression_ratio"],
            "cosine_similarity": formula["cosine_similarity"],
            "mse": formula["mse"],
        }
        if formula["type"] == "svd":
            entry["rank"] = formula["rank"]
        elif formula["type"] == "quantize":
            entry["bits"] = formula["bits"]
        return entry

    def compress_model(self, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct",
                       output_dir: str = "./formula_weights"):
        """
        Compress an entire model and save one formula file per parameter.

        Loads ``model_name`` with ``transformers``, runs
        ``find_best_formula`` on every entry of ``model.named_parameters()``,
        writes each result to ``<output_dir>/<name with dots as underscores>.pt``
        and writes ``formula_index.json`` and ``metadata.json`` alongside.
        Progress and a summary are printed to stdout.

        Returns:
            The formula index: parameter name -> dict with file name, type,
            shape and quality statistics.
        """
        from transformers import AutoModelForCausalLM

        print(f"Loading model: {model_name}")
        # NOTE(review): `dtype=` is the keyword used by recent transformers
        # releases; older releases expect `torch_dtype=` - confirm against
        # the installed version.
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            dtype=torch.float16,
        )
        os.makedirs(output_dir, exist_ok=True)

        total_original = 0
        total_compressed = 0
        formula_index = {}

        print("\n" + "="*80)
        print("FORMULA DISCOVERY ENGINE - Analyzing all model weights")
        print("="*80)

        for name, param in model.named_parameters():
            weight = param.data
            print(f"\n{'─'*60}")
            print(f"Layer: {name}")
            print(f" Shape: {list(weight.shape)} | Elements: {weight.numel():,} | Size: {weight.numel()*2/1024/1024:.2f} MB")

            # Find best formula
            formula = self.find_best_formula(name, weight)
            print(f" Formula: {formula['type']} | Compression: {formula['compression_ratio']:.3f}")
            print(f" Quality: cos_sim={formula['cosine_similarity']:.6f} | MSE={formula['mse']:.2e}")
            print(f" Size: {formula['original_bytes']/1024/1024:.2f} MB → {formula['compressed_bytes']/1024/1024:.2f} MB")

            total_original += formula["original_bytes"]
            total_compressed += formula["compressed_bytes"]

            # Save formula tensors
            formula_file = name.replace(".", "_") + ".pt"
            torch.save(self._formula_payload(formula), os.path.join(output_dir, formula_file))

            # Index entry
            formula_index[name] = self._index_entry(formula_file, formula)

        # Save index
        with open(os.path.join(output_dir, "formula_index.json"), "w", encoding="utf-8") as f:
            json.dump(formula_index, f, indent=2)

        # A model without parameters would otherwise divide by zero.
        overall_ratio = total_compressed / total_original if total_original else 1.0

        # Summary
        print("\n" + "="*80)
        print("COMPRESSION SUMMARY")
        print("="*80)
        print(f"Original model size: {total_original/1024/1024:.1f} MB")
        print(f"Compressed formula size: {total_compressed/1024/1024:.1f} MB")
        print(f"Overall compression: {overall_ratio:.3f} ({(1-overall_ratio)*100:.1f}% smaller)")
        print(f"Formula files saved to: {output_dir}/")

        self.metadata = {
            "model_name": model_name,
            "original_size_mb": total_original / 1024 / 1024,
            "compressed_size_mb": total_compressed / 1024 / 1024,
            "compression_ratio": overall_ratio,
            "num_layers": len(formula_index),
        }
        with open(os.path.join(output_dir, "metadata.json"), "w", encoding="utf-8") as f:
            json.dump(self.metadata, f, indent=2)

        return formula_index
class FormulaReconstructor:
    """
    Rebuilds weight tensors from the formula files in a directory.

    The directory must contain ``formula_index.json`` (parameter name ->
    record with at least a ``"file"`` entry) and the ``.pt`` files it
    refers to.
    """

    def __init__(self, formula_dir: str):
        """Remember ``formula_dir`` and load its ``formula_index.json``."""
        self.formula_dir = formula_dir
        index_path = os.path.join(formula_dir, "formula_index.json")
        with open(index_path, "r") as index_file:
            self.index = json.load(index_file)

    def reconstruct_weight(self, name: str) -> torch.Tensor:
        """
        Rebuild the tensor stored under ``name``.

        ``"svd"`` and ``"quantize"`` formulas are evaluated in float32 and
        returned as float16; ``"raw"`` formulas return the stored tensor
        unchanged.

        Raises:
            KeyError: If ``name`` is not in the index.
            ValueError: If the file's ``"type"`` is not recognized.
        """
        entry = self.index[name]
        payload_path = os.path.join(self.formula_dir, entry["file"])
        payload = torch.load(payload_path, map_location="cpu", weights_only=True)

        kind = payload["type"]
        if kind == "svd":
            # W = U @ diag(S) @ Vh
            left = payload["U"].float()
            sigma = payload["S"].float()
            right = payload["Vh"].float()
            rebuilt = left @ torch.diag(sigma) @ right
            return rebuilt.half()
        if kind == "quantize":
            # W = W_q * scale + w_min
            codes = payload["W_q"].float()
            step = payload["scale"].float()
            offset = payload["w_min"].float()
            rebuilt = codes * step + offset
            return rebuilt.half()
        if kind == "raw":
            return payload["data"]
        raise ValueError(f"Unknown formula type: {payload['type']}")

    def reconstruct_all(self) -> Dict[str, torch.Tensor]:
        """Rebuild every tensor listed in the index, keyed by parameter name."""
        return {name: self.reconstruct_weight(name) for name in self.index}

    def load_into_model(self, model):
        """
        Rebuild every indexed tensor and load the result into ``model``.

        Calls ``model.load_state_dict(..., strict=False)`` and returns the
        same ``model`` object.
        """
        rebuilt_state = {name: self.reconstruct_weight(name) for name in self.index}
        model.load_state_dict(rebuilt_state, strict=False)
        return model
if __name__ == "__main__":
    # Script entry point: build an engine and compress the default model.
    engine = FormulaEngine(
        quality_threshold=0.995,  # Minimum cosine similarity
        compression_ratio=0.4,  # Target 60% compression
    )

    # Writes one formula file per parameter plus the JSON index/metadata.
    formula_index = engine.compress_model(
        output_dir="./formula_weights",
        model_name="Qwen/Qwen2.5-0.5B-Instruct",
    )

    print("\n\nFormula discovery complete!")
    print("Results saved to ./formula_weights/")