#!/usr/bin/env python3
"""
Q-TensorFormer v4 — Comprehensive Benchmark Suite

Trains and evaluates three small transformer variants that differ only in
their feed-forward block:
1. dense        (4x expansion, GELU)
2. dense_small  (2x expansion, GELU)
3. qkan_v4      (4x expansion, DARUAN learnable activation)

The tensor-train (TT-FFN) and quantum configurations are not constructed by
this script.

Metrics:
- Parameters, Perplexity, Latency, Energy, Carbon

Usage:
    python benchmark_v4.py [--epochs N] [--d-model D] [--n-layers L]
                           [--device DEV] [--output results.json]
"""

import argparse
import json
import math
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW
from torch.utils.data import DataLoader

# Upper bound on the mean negative log-likelihood before exponentiation, so a
# diverged model reports a large finite perplexity instead of overflowing.
_MAX_LOG_PPL = 20

# Constants of the rough energy/carbon estimate.
# NOTE(review): the original inline comments label the results "μJ (GPU
# approximate)" and "ng CO2"; the physical units of these factors are not
# derivable from this file — confirm before quoting the numbers.
_ENERGY_PER_FLOP = 1.3e-9
_CARBON_INTENSITY = 400


# ─── DARUAN ──────────────────────────────────────────────────────────────

class DARUAN(nn.Module):
    """Learnable elementwise activation built from repeated SiLU terms.

    Computes ``w0 * x + sum_r w[r + 1] * SiLU(a[r] * x + b[r])`` where every
    ``w``, ``a`` and ``b`` is a single learnable scalar shared by all elements
    of ``x``.

    Args:
        n_repeats: Number of SiLU terms added to the linear term.
    """

    def __init__(self, n_repeats=3):
        super().__init__()
        self.n_repeats = n_repeats
        self.activation = nn.SiLU()
        # Inner scale/shift applied to x before each SiLU.
        self.pre_weights = nn.ParameterList([
            nn.Parameter(torch.ones(1) * 0.1) for _ in range(n_repeats)
        ])
        self.pre_biases = nn.ParameterList([
            nn.Parameter(torch.zeros(1)) for _ in range(n_repeats)
        ])
        # Index 0 weights the linear term; indices 1..n weight the SiLU terms.
        self.post_weights = nn.ParameterList([
            nn.Parameter(torch.ones(1) * 0.5) for _ in range(n_repeats + 1)
        ])

    def forward(self, x):
        """Apply the activation elementwise; the output has the shape of ``x``."""
        out = self.post_weights[0] * x
        for r in range(self.n_repeats):
            z = self.pre_weights[r] * x + self.pre_biases[r]
            out = out + self.post_weights[r + 1] * self.activation(z)
        return out


# ─── Model Builders ───────────────────────────────────────────────────────

class TransformerBase(nn.Module):
    """Shared base with configurable FFN.

    A pre-LayerNorm transformer stack with sinusoidal positional encodings
    and an output projection whose weight is tied to the token embedding.

    NOTE(review): ``forward`` passes no attention mask, so every position
    attends to the whole sequence (no causal masking).

    Args:
        vocab_size: Number of token ids.
        d_model: Embedding / hidden width. Must be divisible by ``n_heads``.
        n_layers: Number of transformer blocks.
        n_heads: Attention heads per block.
        max_seq_len: Longest sequence the positional table covers.
        dropout: Dropout probability used in attention and on residual branches.
        ffn_type: ``"qkan"``, ``"dense_small"``; any other value selects the
            dense 4x GELU FFN.
        qkan_repeats: ``n_repeats`` for DARUAN when ``ffn_type == "qkan"``.
    """

    def __init__(self, vocab_size, d_model=128, n_layers=3, n_heads=4,
                 max_seq_len=128, dropout=0.1, ffn_type="dense", qkan_repeats=3):
        super().__init__()
        self.d_model = d_model
        self.ffn_type = ffn_type

        self.embedding = nn.Embedding(vocab_size, d_model)

        pe = torch.zeros(max_seq_len, d_model)
        pos = torch.arange(0, max_seq_len).float().unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000) / d_model))
        angles = pos * div
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the cosine table to the number of odd columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer("pos_encoding", pe.unsqueeze(0))

        self.blocks = nn.ModuleList()
        for _ in range(n_layers):
            block = nn.ModuleDict({
                "ln1": nn.LayerNorm(d_model),
                "attn": nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True),
                "ln2": nn.LayerNorm(d_model),
                "ffn": self._build_ffn(d_model, ffn_type, qkan_repeats),
                "dropout": nn.Dropout(dropout),
            })
            self.blocks.append(block)

        self.ln_f = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
        # Weight tying: the output projection shares the embedding matrix.
        self.lm_head.weight = self.embedding.weight

        # Xavier-initialise every matrix-shaped parameter whose name contains
        # "weight"; 1-D parameters (LayerNorm, DARUAN scalars) are left alone.
        for name, p in self.named_parameters():
            if "weight" in name and p.dim() >= 2:
                nn.init.xavier_uniform_(p)

    def _build_ffn(self, d_model, ffn_type, qkan_repeats):
        """Return the feed-forward sub-network selected by ``ffn_type``."""
        expanded = d_model * 4
        if ffn_type == "qkan":
            return nn.Sequential(
                nn.Linear(d_model, expanded),
                DARUAN(n_repeats=qkan_repeats),
                nn.Linear(expanded, d_model),
            )
        elif ffn_type == "dense_small":
            return nn.Sequential(
                nn.Linear(d_model, d_model * 2),
                nn.GELU(),
                nn.Linear(d_model * 2, d_model),
            )
        else:  # dense
            return nn.Sequential(
                nn.Linear(d_model, expanded),
                nn.GELU(),
                nn.Linear(expanded, d_model),
            )

    def forward(self, input_ids):
        """Map token ids ``(batch, seq)`` to logits ``(batch, seq, vocab_size)``.

        ``seq`` must not exceed the ``max_seq_len`` given at construction,
        because the positional table is sliced to the input length.
        """
        x = self.embedding(input_ids)
        x = x + self.pos_encoding[:, :x.size(1), :]

        for block in self.blocks:
            # Pre-norm self-attention with residual connection.
            r = x
            xn = block["ln1"](x)
            ao, _ = block["attn"](xn, xn, xn, need_weights=False)
            x = r + block["dropout"](ao)

            # Pre-norm feed-forward with residual connection.
            r = x
            fo = block["ffn"](block["ln2"](x))
            x = r + block["dropout"](fo)

        return self.lm_head(self.ln_f(x))

    @property
    def total_params(self):
        """Number of parameter elements; the tied embedding is counted once."""
        return sum(p.numel() for p in self.parameters())


# ─── Synthetic Data ───────────────────────────────────────────────────────

def create_synthetic_data(vocab_size=10000, seq_len=128, n_train=5000, n_val=500,
                          n_test=500, batch_size=16):
    """Create synthetic language modeling data for quick benchmarks.

    Token ids are drawn uniformly from ``[0, vocab_size)``. Each dataset pairs
    a tensor with itself, so the targets are identical to the inputs (there is
    no next-token shift).

    Side effect: reseeds the global torch RNG with 42, which also makes any
    model constructed afterwards reproducible.

    Args:
        vocab_size: Exclusive upper bound of the sampled token ids.
        seq_len: Tokens per sequence.
        n_train, n_val, n_test: Number of sequences in each split.
        batch_size: Batch size of all three loaders.

    Returns:
        ``(train_loader, val_loader, test_loader)``; only the train loader
        shuffles.
    """
    torch.manual_seed(42)
    train = torch.randint(0, vocab_size, (n_train, seq_len))
    val = torch.randint(0, vocab_size, (n_val, seq_len))
    test = torch.randint(0, vocab_size, (n_test, seq_len))

    train_ds = torch.utils.data.TensorDataset(train, train)
    val_ds = torch.utils.data.TensorDataset(val, val)
    test_ds = torch.utils.data.TensorDataset(test, test)

    return (
        DataLoader(train_ds, batch_size=batch_size, shuffle=True),
        DataLoader(val_ds, batch_size=batch_size),
        DataLoader(test_ds, batch_size=batch_size),
    )


# ─── Benchmark Runner ─────────────────────────────────────────────────────

def _perplexity(loss_sum: float, n_tokens: int) -> float:
    """Convert a summed token loss into a clamped perplexity."""
    return math.exp(min(loss_sum / max(n_tokens, 1), _MAX_LOG_PPL))


def _summed_loss(logits, targets, pad_id) -> Tuple[torch.Tensor, int]:
    """Return the summed cross-entropy and the number of tokens it covers."""
    loss_sum = F.cross_entropy(
        logits.reshape(-1, logits.size(-1)),
        targets.reshape(-1),
        ignore_index=pad_id,
        reduction="sum",
    )
    # Only tokens that are not ignored contribute to the sum, so only those
    # may appear in the denominator of the mean.
    n_valid = int((targets != pad_id).sum().item())
    return loss_sum, n_valid


def _synchronize(device) -> None:
    """Wait for queued CUDA work so wall-clock timings cover the real compute."""
    if torch.device(device).type == "cuda":
        torch.cuda.synchronize()


def _evaluate(model, loader, device, pad_id, measure_latency=False) -> Tuple[float, int, List[float]]:
    """Run the model over a loader without gradients.

    Returns the summed loss, the number of scored tokens, and (when
    ``measure_latency`` is set) the per-sequence forward latency in ms of
    every batch.
    """
    model.eval()
    loss_total = 0.0
    token_total = 0
    latencies: List[float] = []
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            if measure_latency:
                _synchronize(device)
                started = time.perf_counter()
                logits = model(inputs)
                _synchronize(device)
                elapsed = time.perf_counter() - started
                latencies.append(elapsed * 1000 / inputs.size(0))
            else:
                logits = model(inputs)
            loss_sum, n_valid = _summed_loss(logits, targets, pad_id)
            loss_total += loss_sum.item()
            token_total += n_valid
    return loss_total, token_total, latencies


def benchmark_model(model, train_loader, val_loader, test_loader,
                    epochs=3, lr=3e-4, device="cpu", label="", seq_len=128):
    """Train and evaluate a model. Returns metrics dict.

    Token id 0 is treated as padding: it is excluded from the loss and from
    the token counts used for perplexity.

    Args:
        model: Module exposing ``total_params``, ``ffn_type``, ``d_model``
            and ``blocks`` (as ``TransformerBase`` does).
        train_loader, val_loader, test_loader: Loaders yielding
            ``(inputs, targets)`` batches of token ids.
        epochs: Training epochs; 0 skips training and validation.
        lr: AdamW learning rate.
        device: Device the model and batches are moved to.
        label: Name used in progress output and in the result.
        seq_len: Sequence length assumed by the energy estimate.

    Returns:
        Dict with parameter count, test/validation perplexity, mean
        per-sequence latency, energy and carbon estimates, training times
        and model configuration. ``best_val_ppl`` is ``None`` when no epoch
        ran.
    """
    model = model.to(device)
    optimizer = AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    pad_id = 0

    best_val_ppl = float("inf")
    train_times: List[float] = []

    for epoch in range(epochs):
        model.train()
        t0 = time.perf_counter()
        total_loss = 0.0
        tokens = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            logits = model(inputs)
            loss_sum, n_valid = _summed_loss(logits, targets, pad_id)
            # Optimise the mean over scored tokens; max() keeps an all-padding
            # batch from producing a NaN loss.
            loss = loss_sum / max(n_valid, 1)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            total_loss += loss_sum.item()
            tokens += n_valid

        train_time = time.perf_counter() - t0
        train_times.append(train_time)
        train_ppl = _perplexity(total_loss, tokens)

        # Validation
        val_loss, val_tokens, _ = _evaluate(model, val_loader, device, pad_id)
        val_ppl = _perplexity(val_loss, val_tokens)
        best_val_ppl = min(best_val_ppl, val_ppl)

        print(f" [{label}] E{epoch+1}: train_ppl={train_ppl:.1f} val_ppl={val_ppl:.1f} time={train_time:.1f}s")

    # Test
    test_loss, test_tokens, latency_samples = _evaluate(
        model, test_loader, device, pad_id, measure_latency=True,
    )
    test_ppl = _perplexity(test_loss, test_tokens)
    avg_latency = sum(latency_samples) / len(latency_samples) if latency_samples else 0.0
    params = model.total_params

    # Energy estimate
    flops_per_token = 2 * params
    energy_uj = flops_per_token * _ENERGY_PER_FLOP * seq_len
    carbon_ng = energy_uj * _CARBON_INTENSITY * 1e-6

    avg_train_time = sum(train_times) / len(train_times) if train_times else 0.0

    return {
        "model": label,
        "params": params,
        "test_ppl": round(test_ppl, 2),
        # None (JSON null) rather than infinity when no validation pass ran.
        "best_val_ppl": round(best_val_ppl, 2) if math.isfinite(best_val_ppl) else None,
        "avg_latency_ms": round(avg_latency, 3),
        "energy_uj": round(energy_uj, 2),
        "carbon_ng": round(carbon_ng, 4),
        "avg_train_time_s": round(avg_train_time, 1),
        "total_train_time_s": round(sum(train_times), 1),
        "ffn_type": model.ffn_type,
        "d_model": model.d_model,
        "n_layers": len(model.blocks),
    }


def main(argv=None):
    """Run the benchmark for every model variant, print tables, save JSON.

    Args:
        argv: Argument list to parse; ``None`` uses ``sys.argv[1:]``.

    Returns:
        The list of per-model result dicts that was written to ``--output``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--d-model", type=int, default=128)
    parser.add_argument("--n-layers", type=int, default=3)
    parser.add_argument("--output", type=str, default="benchmark_v4_results.json")
    parser.add_argument("--device", type=str, default="cpu")
    args = parser.parse_args(argv)

    print("=" * 60)
    print("Q-TensorFormer v4 — Benchmark Suite")
    print(f"Config: d={args.d_model}, layers={args.n_layers}, epochs={args.epochs}")
    print("=" * 60)

    # Synthetic data
    vocab = 10000
    seq_len = 128
    train_loader, val_loader, test_loader = create_synthetic_data(
        vocab_size=vocab, seq_len=seq_len,
        n_train=3000, n_val=300, n_test=300,
    )

    # Models to compare
    models = {
        "dense": TransformerBase(vocab, args.d_model, args.n_layers, 4, seq_len, ffn_type="dense"),
        "dense_small": TransformerBase(vocab, args.d_model, args.n_layers, 4, seq_len, ffn_type="dense_small"),
        "qkan_v4": TransformerBase(vocab, args.d_model, args.n_layers, 4, seq_len, ffn_type="qkan", qkan_repeats=3),
    }

    results = []
    for name, model in models.items():
        print(f"\n{'─' * 40}")
        print(f"Benchmarking: {name}")
        print(f"Parameters: {model.total_params:,}")
        print(f"{'─' * 40}")

        result = benchmark_model(
            model, train_loader, val_loader, test_loader,
            epochs=args.epochs, device=args.device, label=name,
            seq_len=seq_len,
        )
        results.append(result)

    # Summary table
    print(f"\n{'=' * 80}")
    print(f"{'Model':<15} {'Params':>10} {'Test PPL':>10} {'Latency':>10} {'Energy':>10} {'CO2':>10}")
    print(f"{'─' * 80}")
    for r in results:
        print(f"{r['model']:<15} {r['params']:>10,} {r['test_ppl']:>10.2f} {r['avg_latency_ms']:>8.2f}ms {r['energy_uj']:>8.2f}μJ {r['carbon_ng']:>8.4f}ng")

    # Compute compression and quality tradeoffs
    dense = next(r for r in results if r["model"] == "dense")
    for r in results:
        if r["model"] != "dense":
            r["compression_ratio"] = round(dense["params"] / r["params"], 2)
            r["ppl_delta"] = round(r["test_ppl"] - dense["test_ppl"], 2)
            r["energy_reduction_pct"] = round((1 - r["energy_uj"] / dense["energy_uj"]) * 100, 1)

    print(f"\n{'─' * 80}")
    print("Relative to Dense Baseline:")
    print(f"{'Model':<15} {'Compression':>12} {'PPL Δ':>10} {'Energy ↓':>10}")
    print(f"{'─' * 50}")
    for r in results:
        if r["model"] != "dense":
            print(f"{r['model']:<15} {r['compression_ratio']:>9.1f}x {r['ppl_delta']:>+10.2f} {r['energy_reduction_pct']:>8.1f}%")

    # Save
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\n✅ Results saved to {args.output}")

    return results


if __name__ == "__main__":
    main()