| |
| """ |
| Q-TensorFormer v4 — Comprehensive Benchmark Suite |
| |
| Compares: |
| 1. Dense Baseline (standard transformer) |
| 2. Tensor-Only (TT-FFN, no quantum) |
| 3. Full v3 (TT-FFN + quantum + adaptive rank) |
| 4. Full v4 (v3 + QKAN DARUAN + energy-aware) |
| |
| Metrics: |
| - Parameters, Perplexity, Latency, Energy, Carbon |
| |
| Usage: |
| python benchmark_v4.py [--epochs N] [--d-model D] [--n-layers L] [--device DEVICE] [--output results.json] |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.optim import AdamW |
| from torch.utils.data import DataLoader |
| import math |
| import json |
| import time |
| import os |
| import argparse |
| from pathlib import Path |
| from typing import Dict, List, Tuple |
|
|
|
|
| |
|
|
class DARUAN(nn.Module):
    """Learnable element-wise activation built from repeated SiLU branches.

    The module holds only scalar parameters. For an input ``x`` it returns::

        post[0] * x + sum_r post[r + 1] * SiLU(pre_w[r] * x + pre_b[r])

    with ``r`` ranging over ``n_repeats`` branches. Initial values are
    ``pre_w = 0.1``, ``pre_b = 0`` and ``post = 0.5``.
    """

    def __init__(self, n_repeats=3):
        super().__init__()
        self.n_repeats = n_repeats
        self.activation = nn.SiLU()

        def scalar_bank(count, value):
            # ``count`` independent one-element parameters, each set to ``value``.
            return nn.ParameterList(
                [nn.Parameter(torch.ones(1) * value) for _ in range(count)]
            )

        # Registration order (pre_weights, pre_biases, post_weights) fixes the
        # order of ``parameters()`` and the ``state_dict`` keys.
        self.pre_weights = scalar_bank(n_repeats, 0.1)
        self.pre_biases = scalar_bank(n_repeats, 0.0)
        # One extra output scale: index 0 weights the untouched input.
        self.post_weights = scalar_bank(n_repeats + 1, 0.5)

    def forward(self, x):
        """Apply the activation element-wise; the output has the shape of ``x``."""
        identity_scale, *branch_scales = self.post_weights
        result = identity_scale * x
        branches = zip(self.pre_weights, self.pre_biases, branch_scales)
        for in_scale, in_shift, out_scale in branches:
            result = result + out_scale * self.activation(in_scale * x + in_shift)
        return result
|
|
|
|
| |
|
|
class TransformerBase(nn.Module):
    """Pre-LayerNorm transformer stack with a selectable feed-forward variant.

    Each block applies LayerNorm -> self-attention -> residual, then
    LayerNorm -> feed-forward -> residual. The output projection shares its
    weight matrix with the token embedding.

    Args:
        vocab_size: Number of rows in the embedding / size of the output logits.
        d_model: Model width. Must be divisible by ``n_heads``.
        n_layers: Number of transformer blocks.
        n_heads: Attention heads per block.
        max_seq_len: Longest sequence the positional table supports.
        dropout: Dropout probability (attention weights and sub-layer outputs).
        ffn_type: ``"qkan"`` (Linear -> DARUAN -> Linear, 4x width),
            ``"dense_small"`` (GELU, 2x width); any other value builds the
            standard GELU feed-forward at 4x width.
        qkan_repeats: Number of DARUAN branches, used only for ``"qkan"``.

    NOTE(review): ``forward`` passes no attention mask, so every position
    attends to the full sequence (including later positions). Confirm this is
    intended before interpreting the logits as next-token predictions.
    """

    def __init__(self, vocab_size, d_model=128, n_layers=3, n_heads=4,
                 max_seq_len=128, dropout=0.1, ffn_type="dense",
                 qkan_repeats=3):
        super().__init__()
        self.d_model = d_model
        self.ffn_type = ffn_type

        self.embedding = nn.Embedding(vocab_size, d_model)
        # Fixed (non-trainable) sinusoidal table, stored as (1, max_seq_len, d_model).
        self.register_buffer(
            "pos_encoding",
            self._sinusoidal_table(max_seq_len, d_model).unsqueeze(0),
        )

        self.blocks = nn.ModuleList()
        for _ in range(n_layers):
            block = nn.ModuleDict({
                "ln1": nn.LayerNorm(d_model),
                "attn": nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True),
                "ln2": nn.LayerNorm(d_model),
                "ffn": self._build_ffn(d_model, ffn_type, qkan_repeats),
                # One dropout module is reused for both sub-layer outputs.
                "dropout": nn.Dropout(dropout),
            })
            self.blocks.append(block)

        self.ln_f = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
        # Weight tying: the output projection reuses the embedding matrix.
        self.lm_head.weight = self.embedding.weight

        # Xavier-initialise every matrix-shaped parameter whose name contains
        # "weight" (embedding, attention projections, linear layers).
        for name, p in self.named_parameters():
            if "weight" in name and p.dim() >= 2:
                nn.init.xavier_uniform_(p)

    @staticmethod
    def _sinusoidal_table(max_seq_len, d_model):
        """Return a ``(max_seq_len, d_model)`` sin/cos positional table.

        Even columns hold sines and odd columns hold cosines. For an odd
        ``d_model`` there is one fewer cosine column than sine columns, so the
        cosine angles are truncated to fit instead of raising a shape error.
        """
        pe = torch.zeros(max_seq_len, d_model)
        pos = torch.arange(0, max_seq_len).float().unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000) / d_model))
        angles = pos * div
        pe[:, 0::2] = torch.sin(angles)
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        return pe

    def _build_ffn(self, d_model, ffn_type, qkan_repeats):
        """Build the feed-forward sub-layer for one block.

        Unrecognised ``ffn_type`` values fall back to the dense 4x GELU variant.
        """
        expanded = d_model * 4
        if ffn_type == "qkan":
            return nn.Sequential(
                nn.Linear(d_model, expanded),
                DARUAN(n_repeats=qkan_repeats),
                nn.Linear(expanded, d_model),
            )
        elif ffn_type == "dense_small":
            return nn.Sequential(
                nn.Linear(d_model, d_model * 2),
                nn.GELU(),
                nn.Linear(d_model * 2, d_model),
            )
        else:
            return nn.Sequential(
                nn.Linear(d_model, expanded),
                nn.GELU(),
                nn.Linear(expanded, d_model),
            )

    def forward(self, input_ids):
        """Map token ids to logits over the vocabulary.

        Args:
            input_ids: Integer tensor of token ids; assumed to be
                ``(batch, seq)`` since attention is built with
                ``batch_first=True`` (TODO confirm against callers).

        Returns:
            Tensor with a trailing dimension of ``vocab_size``.

        Raises:
            ValueError: If the sequence is longer than ``max_seq_len``.
        """
        x = self.embedding(input_ids)
        seq_len = x.size(1)
        max_len = self.pos_encoding.size(1)
        if seq_len > max_len:
            # Without this check the addition below fails with an opaque
            # broadcasting error.
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {max_len}"
            )
        x = x + self.pos_encoding[:, :seq_len, :]
        for block in self.blocks:
            r = x
            xn = block["ln1"](x)
            ao, _ = block["attn"](xn, xn, xn, need_weights=False)
            x = r + block["dropout"](ao)
            r = x
            fo = block["ffn"](block["ln2"](x))
            x = r + block["dropout"](fo)
        return self.lm_head(self.ln_f(x))

    @property
    def total_params(self):
        """Total number of parameter elements (tied weights are counted once)."""
        return sum(p.numel() for p in self.parameters())
|
|
|
|
| |
|
|
def create_synthetic_data(vocab_size=10000, seq_len=128, n_train=5000, n_val=500, n_test=500,
                          batch_size=16, seed=42):
    """Create uniformly random token data for quick benchmarks.

    Args:
        vocab_size: Token ids are drawn uniformly from ``[0, vocab_size)``.
        seq_len: Tokens per sequence.
        n_train: Number of training sequences.
        n_val: Number of validation sequences.
        n_test: Number of test sequences.
        batch_size: Batch size used by all three loaders.
        seed: Value passed to ``torch.manual_seed`` before sampling. This
            reseeds the *global* torch RNG, so it also affects any random
            initialisation performed afterwards. Pass ``None`` to leave the
            RNG state untouched.

    Returns:
        ``(train_loader, val_loader, test_loader)``. Only the training loader
        shuffles. Every batch is an ``(inputs, targets)`` pair.

    NOTE(review): each dataset pairs a tensor with itself, so ``targets`` are
    identical to ``inputs`` (no one-token shift). Token id 0 can occur in the
    data.
    """
    if seed is not None:
        torch.manual_seed(seed)

    # Draw all splits first so the sampling order from the RNG is fixed:
    # train, then val, then test.
    splits = [
        torch.randint(0, vocab_size, (n_rows, seq_len))
        for n_rows in (n_train, n_val, n_test)
    ]

    loaders = []
    for index, tokens in enumerate(splits):
        dataset = torch.utils.data.TensorDataset(tokens, tokens)
        # Only the first (training) split is shuffled.
        loaders.append(DataLoader(dataset, batch_size=batch_size, shuffle=(index == 0)))
    return tuple(loaders)
|
|
|
|
| |
|
|
def _masked_nll_sum(logits, targets, pad_id):
    """Return (summed cross-entropy over non-pad targets, count of non-pad targets)."""
    loss_sum = F.cross_entropy(
        logits.reshape(-1, logits.size(-1)),
        targets.reshape(-1),
        ignore_index=pad_id,
        reduction="sum",
    )
    n_tokens = int((targets != pad_id).sum().item())
    return loss_sum, n_tokens


def _perplexity(loss_sum, n_tokens):
    """Return exp(mean loss); the exponent is capped at 20 to avoid overflow."""
    return math.exp(min(loss_sum / max(n_tokens, 1), 20))


def _evaluate(model, loader, device, pad_id, latencies_ms=None):
    """Sum the loss over ``loader`` without gradients.

    When ``latencies_ms`` is a list, the per-sample forward time of every
    batch (milliseconds) is appended to it.
    """
    on_cuda = torch.device(device).type == "cuda"
    total_loss = 0.0
    total_tokens = 0
    model.eval()
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            if latencies_ms is not None and on_cuda:
                torch.cuda.synchronize()
            start = time.perf_counter()
            logits = model(inputs)
            if latencies_ms is not None:
                if on_cuda:
                    # CUDA kernels run asynchronously; wait for them so the
                    # timer covers the actual forward pass.
                    torch.cuda.synchronize()
                elapsed_ms = (time.perf_counter() - start) * 1000
                latencies_ms.append(elapsed_ms / inputs.size(0))
            loss_sum, n_tokens = _masked_nll_sum(logits, targets, pad_id)
            total_loss += loss_sum.item()
            total_tokens += n_tokens
    return total_loss, total_tokens


def benchmark_model(model, train_loader, val_loader, test_loader,
                    epochs=3, lr=3e-4, device="cpu", label=""):
    """Train and evaluate a model. Returns metrics dict.

    Args:
        model: Module exposing ``total_params``, ``ffn_type``, ``d_model`` and
            ``blocks`` in addition to ``forward(input_ids) -> logits``.
        train_loader: Iterable of ``(inputs, targets)`` batches for training.
        val_loader: Iterable of batches evaluated after every epoch.
        test_loader: Iterable of batches for the final evaluation and the
            latency measurement.
        epochs: Number of training epochs; ``0`` evaluates the model as-is.
        lr: AdamW learning rate (weight decay is fixed at 0.01).
        device: Device the model and batches are moved to.
        label: Name used in log lines and stored under ``"model"``.

    Returns:
        Dict with parameter count, test / best validation perplexity, average
        per-sample forward latency, the energy and carbon estimates, training
        times and basic model configuration.

    Targets equal to ``pad_id`` (0) are excluded from the loss, and the
    perplexity denominators count only the tokens that contributed to it.
    """
    model = model.to(device)
    optimizer = AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    pad_id = 0

    best_val_ppl = float("inf")
    train_times = []

    for epoch in range(epochs):
        model.train()
        epoch_start = time.perf_counter()
        train_loss = 0.0
        train_tokens = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            loss_sum, n_tokens = _masked_nll_sum(model(inputs), targets, pad_id)
            # Mean over non-pad tokens; the guard keeps the loss finite (zero)
            # when a batch holds only pad targets.
            loss = loss_sum / max(n_tokens, 1)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss_sum.item()
            train_tokens += n_tokens

        train_time = time.perf_counter() - epoch_start
        train_times.append(train_time)
        train_ppl = _perplexity(train_loss, train_tokens)

        val_loss, val_tokens = _evaluate(model, val_loader, device, pad_id)
        val_ppl = _perplexity(val_loss, val_tokens)
        best_val_ppl = min(best_val_ppl, val_ppl)

        print(f"  [{label}] E{epoch+1}: train_ppl={train_ppl:.1f} val_ppl={val_ppl:.1f} time={train_time:.1f}s")

    latency_samples = []
    test_loss, test_tokens = _evaluate(
        model, test_loader, device, pad_id, latencies_ms=latency_samples
    )
    test_ppl = _perplexity(test_loss, test_tokens)
    # Empty loaders / zero epochs yield 0.0 rather than a ZeroDivisionError.
    avg_latency = sum(latency_samples) / len(latency_samples) if latency_samples else 0.0
    avg_train_time = sum(train_times) / len(train_times) if train_times else 0.0
    params = model.total_params

    # Rough analytic estimate, not a measurement.
    # NOTE(review): 1.3e-9 (energy per FLOP), 128 (presumably the sequence
    # length) and 400 * 1e-6 (presumably a carbon-intensity conversion) are
    # hard-coded; their units are not established by this file — confirm.
    flops_per_token = 2 * params
    energy_uj = flops_per_token * 1.3e-9 * 128
    carbon_ng = energy_uj * 400 * 1e-6

    return {
        "model": label,
        "params": params,
        "test_ppl": round(test_ppl, 2),
        "best_val_ppl": round(best_val_ppl, 2),
        "avg_latency_ms": round(avg_latency, 3),
        "energy_uj": round(energy_uj, 2),
        "carbon_ng": round(carbon_ng, 4),
        "avg_train_time_s": round(avg_train_time, 1),
        "total_train_time_s": round(sum(train_times), 1),
        "ffn_type": model.ffn_type,
        "d_model": model.d_model,
        "n_layers": len(model.blocks),
    }
|
|
|
|
def main(argv=None):
    """Run the benchmark suite from the command line.

    Builds three model variants (``dense``, ``dense_small``, ``qkan_v4``),
    trains and evaluates each on the same synthetic data, prints a summary
    table plus metrics relative to the dense baseline, and writes all results
    to a JSON file.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv``.

    Returns:
        List of per-model metric dicts, in benchmark order.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--d-model", type=int, default=128)
    parser.add_argument("--n-layers", type=int, default=3)
    parser.add_argument("--output", type=str, default="benchmark_v4_results.json")
    parser.add_argument("--device", type=str, default="cpu")
    args = parser.parse_args(argv)

    n_heads = 4
    # Fail early with a usage message instead of a traceback deep inside
    # model construction or the metrics code.
    if args.epochs < 1:
        parser.error("--epochs must be at least 1")
    if args.d_model < n_heads or args.d_model % n_heads != 0:
        parser.error(f"--d-model must be a positive multiple of {n_heads}")

    print("=" * 60)
    print("Q-TensorFormer v4 — Benchmark Suite")
    print(f"Config: d={args.d_model}, layers={args.n_layers}, epochs={args.epochs}")
    print("=" * 60)

    # Data: the same loaders are shared by every model.
    vocab = 10000
    seq_len = 128
    train_loader, val_loader, test_loader = create_synthetic_data(
        vocab_size=vocab, seq_len=seq_len,
        n_train=3000, n_val=300, n_test=300,
    )

    # Models: all are built up front, before any training starts.
    models = {
        "dense": TransformerBase(vocab, args.d_model, args.n_layers, n_heads, seq_len, ffn_type="dense"),
        "dense_small": TransformerBase(vocab, args.d_model, args.n_layers, n_heads, seq_len, ffn_type="dense_small"),
        "qkan_v4": TransformerBase(vocab, args.d_model, args.n_layers, n_heads, seq_len, ffn_type="qkan", qkan_repeats=3),
    }

    results = []
    for name, model in models.items():
        print(f"\n{'─' * 40}")
        print(f"Benchmarking: {name}")
        print(f"Parameters: {model.total_params:,}")
        print(f"{'─' * 40}")

        result = benchmark_model(
            model, train_loader, val_loader, test_loader,
            epochs=args.epochs, device=args.device, label=name,
        )
        results.append(result)

    # Summary table.
    print(f"\n{'=' * 80}")
    print(f"{'Model':<15} {'Params':>10} {'Test PPL':>10} {'Latency':>10} {'Energy':>10} {'CO2':>10}")
    print(f"{'─' * 80}")
    for r in results:
        print(f"{r['model']:<15} {r['params']:>10,} {r['test_ppl']:>10.2f} {r['avg_latency_ms']:>8.2f}ms {r['energy_uj']:>8.2f}μJ {r['carbon_ng']:>8.4f}ng")

    # Metrics relative to the dense baseline; stored on the result dicts so
    # they are also written to the JSON file.
    dense = next(r for r in results if r["model"] == "dense")
    others = [r for r in results if r["model"] != "dense"]
    for r in others:
        r["compression_ratio"] = round(dense["params"] / r["params"], 2)
        r["ppl_delta"] = round(r["test_ppl"] - dense["test_ppl"], 2)
        r["energy_reduction_pct"] = round((1 - r["energy_uj"] / dense["energy_uj"]) * 100, 1)

    print(f"\n{'─' * 80}")
    print("Relative to Dense Baseline:")
    print(f"{'Model':<15} {'Compression':>12} {'PPL Δ':>10} {'Energy ↓':>10}")
    print(f"{'─' * 50}")
    for r in others:
        print(f"{r['model']:<15} {r['compression_ratio']:>9.1f}x {r['ppl_delta']:>+10.2f} {r['energy_reduction_pct']:>8.1f}%")

    # Save: create the target directory if needed and pin the encoding.
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\n✅ Results saved to {args.output}")

    return results
|
|
|
|
# Run the benchmark suite only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|