Q-TensorFormer / benchmark_v4.py
Premchan369's picture
Upload benchmark_v4.py
e398782 verified
Raw
History Blame Contribute Delete
12.1 kB
#!/usr/bin/env python3
"""
Q-TensorFormer v4 — Comprehensive Benchmark Suite
Compares:
1. Dense Baseline (standard transformer)
2. Tensor-Only (TT-FFN, no quantum)
3. Full v3 (TT-FFN + quantum + adaptive rank)
4. Full v4 (v3 + QKAN DARUAN + energy-aware)
Metrics:
- Parameters, Perplexity, Latency, Energy, Carbon
Usage:
python benchmark_v4.py [--epochs N] [--use-qkan] [--output results.json]
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from torch.utils.data import DataLoader
import math
import json
import time
import os
import argparse
from pathlib import Path
from typing import Dict, List, Tuple
# ─── DARUAN ──────────────────────────────────────────────────────────────
class DARUAN(nn.Module):
def __init__(self, n_repeats=3):
super().__init__()
self.n_repeats = n_repeats
self.activation = nn.SiLU()
self.pre_weights = nn.ParameterList([
nn.Parameter(torch.ones(1) * 0.1) for _ in range(n_repeats)
])
self.pre_biases = nn.ParameterList([
nn.Parameter(torch.zeros(1)) for _ in range(n_repeats)
])
self.post_weights = nn.ParameterList([
nn.Parameter(torch.ones(1) * 0.5) for _ in range(n_repeats + 1)
])
def forward(self, x):
out = self.post_weights[0] * x
for r in range(self.n_repeats):
z = self.pre_weights[r] * x + self.pre_biases[r]
out = out + self.post_weights[r + 1] * self.activation(z)
return out
# ─── Model Builders ───────────────────────────────────────────────────────
class TransformerBase(nn.Module):
"""Shared base with configurable FFN."""
def __init__(self, vocab_size, d_model=128, n_layers=3, n_heads=4,
max_seq_len=128, dropout=0.1, ffn_type="dense",
qkan_repeats=3):
super().__init__()
self.d_model = d_model
self.ffn_type = ffn_type
self.embedding = nn.Embedding(vocab_size, d_model)
pe = torch.zeros(max_seq_len, d_model)
pos = torch.arange(0, max_seq_len).float().unsqueeze(1)
div = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000) / d_model))
pe[:, 0::2] = torch.sin(pos * div)
pe[:, 1::2] = torch.cos(pos * div)
self.register_buffer("pos_encoding", pe.unsqueeze(0))
self.blocks = nn.ModuleList()
for _ in range(n_layers):
block = nn.ModuleDict({
"ln1": nn.LayerNorm(d_model),
"attn": nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True),
"ln2": nn.LayerNorm(d_model),
"ffn": self._build_ffn(d_model, ffn_type, qkan_repeats),
"dropout": nn.Dropout(dropout),
})
self.blocks.append(block)
self.ln_f = nn.LayerNorm(d_model)
self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
self.lm_head.weight = self.embedding.weight
for name, p in self.named_parameters():
if "weight" in name and p.dim() >= 2:
nn.init.xavier_uniform_(p)
def _build_ffn(self, d_model, ffn_type, qkan_repeats):
expanded = d_model * 4
if ffn_type == "qkan":
return nn.Sequential(
nn.Linear(d_model, expanded),
DARUAN(n_repeats=qkan_repeats),
nn.Linear(expanded, d_model),
)
elif ffn_type == "dense_small":
return nn.Sequential(
nn.Linear(d_model, d_model * 2),
nn.GELU(),
nn.Linear(d_model * 2, d_model),
)
else: # dense
return nn.Sequential(
nn.Linear(d_model, expanded),
nn.GELU(),
nn.Linear(expanded, d_model),
)
def forward(self, input_ids):
x = self.embedding(input_ids)
x = x + self.pos_encoding[:, :x.size(1), :]
for block in self.blocks:
r = x
xn = block["ln1"](x)
ao, _ = block["attn"](xn, xn, xn, need_weights=False)
x = r + block["dropout"](ao)
r = x
fo = block["ffn"](block["ln2"](x))
x = r + block["dropout"](fo)
return self.lm_head(self.ln_f(x))
@property
def total_params(self):
return sum(p.numel() for p in self.parameters())
# ─── Synthetic Data ───────────────────────────────────────────────────────
def create_synthetic_data(vocab_size=10000, seq_len=128, n_train=5000, n_val=500, n_test=500):
"""Create synthetic language modeling data for quick benchmarks."""
torch.manual_seed(42)
train = torch.randint(0, vocab_size, (n_train, seq_len))
val = torch.randint(0, vocab_size, (n_val, seq_len))
test = torch.randint(0, vocab_size, (n_test, seq_len))
train_ds = torch.utils.data.TensorDataset(train, train)
val_ds = torch.utils.data.TensorDataset(val, val)
test_ds = torch.utils.data.TensorDataset(test, test)
return (
DataLoader(train_ds, batch_size=16, shuffle=True),
DataLoader(val_ds, batch_size=16),
DataLoader(test_ds, batch_size=16),
)
# ─── Benchmark Runner ─────────────────────────────────────────────────────
def benchmark_model(model, train_loader, val_loader, test_loader,
epochs=3, lr=3e-4, device="cpu", label=""):
"""Train and evaluate a model. Returns metrics dict."""
model = model.to(device)
optimizer = AdamW(model.parameters(), lr=lr, weight_decay=0.01)
pad_id = 0
best_val_ppl = float("inf")
train_times = []
for epoch in range(epochs):
model.train()
t0 = time.time()
total_loss = 0.0
tokens = 0
for inputs, targets in train_loader:
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
logits = model(inputs)
loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.reshape(-1), ignore_index=pad_id)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item() * inputs.numel()
tokens += inputs.numel()
train_time = time.time() - t0
train_times.append(train_time)
train_ppl = math.exp(min(total_loss / max(tokens, 1), 20))
# Validation
model.eval()
val_loss = 0.0
val_tokens = 0
with torch.no_grad():
for inputs, targets in val_loader:
inputs, targets = inputs.to(device), targets.to(device)
logits = model(inputs)
loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.reshape(-1), ignore_index=pad_id, reduction="sum")
val_loss += loss.item()
val_tokens += inputs.numel()
val_ppl = math.exp(min(val_loss / max(val_tokens, 1), 20))
best_val_ppl = min(best_val_ppl, val_ppl)
print(f" [{label}] E{epoch+1}: train_ppl={train_ppl:.1f} val_ppl={val_ppl:.1f} time={train_time:.1f}s")
# Test
model.eval()
test_loss = 0.0
test_tokens = 0
latency_samples = []
with torch.no_grad():
for inputs, targets in test_loader:
inputs, targets = inputs.to(device), targets.to(device)
t0 = time.time()
logits = model(inputs)
t1 = time.time()
latency_samples.append((t1 - t0) * 1000 / inputs.size(0))
loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.reshape(-1), ignore_index=pad_id, reduction="sum")
test_loss += loss.item()
test_tokens += inputs.numel()
test_ppl = math.exp(min(test_loss / max(test_tokens, 1), 20))
avg_latency = sum(latency_samples) / len(latency_samples)
params = model.total_params
# Energy estimate
flops_per_token = 2 * params
energy_uj = flops_per_token * 1.3e-9 * 128 # μJ (GPU approximate)
carbon_ng = energy_uj * 400 * 1e-6 # ng CO2
return {
"model": label,
"params": params,
"test_ppl": round(test_ppl, 2),
"best_val_ppl": round(best_val_ppl, 2),
"avg_latency_ms": round(avg_latency, 3),
"energy_uj": round(energy_uj, 2),
"carbon_ng": round(carbon_ng, 4),
"avg_train_time_s": round(sum(train_times) / len(train_times), 1),
"total_train_time_s": round(sum(train_times), 1),
"ffn_type": model.ffn_type,
"d_model": model.d_model,
"n_layers": len(model.blocks),
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=3)
parser.add_argument("--d-model", type=int, default=128)
parser.add_argument("--n-layers", type=int, default=3)
parser.add_argument("--output", type=str, default="benchmark_v4_results.json")
parser.add_argument("--device", type=str, default="cpu")
args = parser.parse_args()
print("=" * 60)
print("Q-TensorFormer v4 — Benchmark Suite")
print(f"Config: d={args.d_model}, layers={args.n_layers}, epochs={args.epochs}")
print("=" * 60)
# Synthetic data
vocab = 10000
seq_len = 128
train_loader, val_loader, test_loader = create_synthetic_data(
vocab_size=vocab, seq_len=seq_len,
n_train=3000, n_val=300, n_test=300,
)
# Models to compare
models = {
"dense": TransformerBase(vocab, args.d_model, args.n_layers, 4, seq_len, ffn_type="dense"),
"dense_small": TransformerBase(vocab, args.d_model, args.n_layers, 4, seq_len, ffn_type="dense_small"),
"qkan_v4": TransformerBase(vocab, args.d_model, args.n_layers, 4, seq_len, ffn_type="qkan", qkan_repeats=3),
}
results = []
for name, model in models.items():
print(f"\n{'─' * 40}")
print(f"Benchmarking: {name}")
print(f"Parameters: {model.total_params:,}")
print(f"{'─' * 40}")
result = benchmark_model(
model, train_loader, val_loader, test_loader,
epochs=args.epochs, device=args.device, label=name,
)
results.append(result)
# Summary table
print(f"\n{'=' * 80}")
print(f"{'Model':<15} {'Params':>10} {'Test PPL':>10} {'Latency':>10} {'Energy':>10} {'CO2':>10}")
print(f"{'─' * 80}")
for r in results:
print(f"{r['model']:<15} {r['params']:>10,} {r['test_ppl']:>10.2f} {r['avg_latency_ms']:>8.2f}ms {r['energy_uj']:>8.2f}μJ {r['carbon_ng']:>8.4f}ng")
# Compute compression and quality tradeoffs
dense = next(r for r in results if r["model"] == "dense")
for r in results:
if r["model"] != "dense":
r["compression_ratio"] = round(dense["params"] / r["params"], 2)
r["ppl_delta"] = round(r["test_ppl"] - dense["test_ppl"], 2)
r["energy_reduction_pct"] = round((1 - r["energy_uj"] / dense["energy_uj"]) * 100, 1)
print(f"\n{'─' * 80}")
print(f"Relative to Dense Baseline:")
print(f"{'Model':<15} {'Compression':>12} {'PPL Δ':>10} {'Energy ↓':>10}")
print(f"{'─' * 50}")
for r in results:
if r["model"] != "dense":
print(f"{r['model']:<15} {r['compression_ratio']:>9.1f}x {r['ppl_delta']:>+10.2f} {r['energy_reduction_pct']:>8.1f}%")
# Save
with open(args.output, "w") as f:
json.dump(results, f, indent=2)
print(f"\n✅ Results saved to {args.output}")
return results
if __name__ == "__main__":
main()