| """ |
| V4 integration tests for Q-TensorFormer. |
| |
| Tests QKAN DARUAN activations, energy-aware training, |
| and the combined v4 pipeline. |
| """ |
|
|
| import torch |
| import sys |
| import os |
|
|
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from src.qkan import DARUAN, QKANLayer, HQKANFFN, create_qkan_ffn |
| from src.energy_v4 import ( |
| EnergyEstimatorV4, ParetoTracker, HARDWARE_PROFILES, |
| estimate_model_energy, HardwareProfile |
| ) |
| from src.config import ModelConfig, TrainingConfig, BudgetConfig |
|
|
|
|
| def test_daruan_basic(): |
| """Test DARUAN activation on scalar input.""" |
| daruan = DARUAN(n_repeats=3, base_activation="silu") |
| x = torch.randn(10) |
| out = daruan(x) |
| assert out.shape == (10,), f"Expected (10,), got {out.shape}" |
| assert not torch.isnan(out).any(), "NaN in DARUAN output" |
| print("✓ DARUAN basic: PASSED") |
|
|
|
|
| def test_daruan_batched(): |
| """Test DARUAN on batched tensor.""" |
| daruan = DARUAN(n_repeats=5, base_activation="gelu") |
| x = torch.randn(32, 128) |
| out = daruan(x) |
| assert out.shape == (32, 128), f"Expected (32, 128), got {out.shape}" |
| assert not torch.isnan(out).any(), "NaN in DARUAN output" |
| print("✓ DARUAN batched: PASSED") |
|
|
|
|
| def test_qkan_layer(): |
| """Test QKANLayer as drop-in for Linear + Activation.""" |
| layer = QKANLayer(128, 256, n_repeats=3) |
| x = torch.randn(16, 128) |
| out = layer(x) |
| assert out.shape == (16, 256), f"Expected (16, 256), got {out.shape}" |
|
|
| params = layer.parameter_count() |
| dense_params = 128 * 256 + 256 |
| print(f" QKAN params: {params} vs dense: {dense_params} ({(1 - params/dense_params)*100:.1f}% reduction)") |
| print("✓ QKANLayer: PASSED") |
|
|
|
|
| def test_hqkan_ffn(): |
| """Test HQKAN FFN as drop-in for transformer FFN.""" |
| ffn = HQKANFFN(hidden_dim=128, ff_multiplier=4, n_repeats=3) |
| x = torch.randn(8, 64, 128) |
| out = ffn(x) |
| assert out.shape == (8, 64, 128), f"Expected (8, 64, 128), got {out.shape}" |
| print(f" HQKAN FFN params: {ffn.total_params}") |
| print("✓ HQKAN FFN: PASSED") |
|
|
|
|
| def test_create_qkan_ffn(): |
| """Test factory function for all QKAN FFN variants.""" |
| |
| ffn_std = create_qkan_ffn(128, 4, n_repeats=3) |
| x = torch.randn(4, 32, 128) |
| out = ffn_std(x) |
| assert out.shape == (4, 32, 128) |
| print("✓ create_qkan_ffn (standard): PASSED") |
|
|
| |
| ffn_tt = create_qkan_ffn(128, 4, n_repeats=3, use_tt=True, tt_rank=4) |
| out = ffn_tt(x) |
| assert out.shape == (4, 32, 128), f"Expected (4, 32, 128), got {out.shape}" |
| print("✓ create_qkan_ffn (TT-hybrid): PASSED") |
|
|
|
|
| def test_energy_estimator(): |
| """Test hardware-aware energy estimator.""" |
| est = EnergyEstimatorV4("cpu_intel_xeon") |
|
|
| |
| flops = 1e9 |
| energy = est.compute_energy(flops, batch_size=16, memory_gb=0.5) |
| assert energy > 0, f"Energy should be positive, got {energy}" |
| print(f" Energy for 1 GFLOP on CPU: {energy:.2f} μJ") |
|
|
| |
| carbon = est.carbon_footprint(energy) |
| assert carbon > 0, f"Carbon should be positive" |
| print(f" Carbon: {carbon:.6f} g CO2") |
| print("✓ EnergyEstimator: PASSED") |
|
|
|
|
| def test_energy_all_hardware(): |
| """Test energy estimation across all hardware targets.""" |
| est = EnergyEstimatorV4() |
| flops = 1e9 |
|
|
| print(" Hardware comparison (1 GFLOP):") |
| for hw_name in ["cpu_intel_xeon", "cpu_apple_m2", "gpu_a100", "edge_tpu", "edge_mobile"]: |
| est.set_hardware(hw_name) |
| energy = est.compute_energy(flops, batch_size=16) |
| print(f" {HARDWARE_PROFILES[hw_name].name}: {energy:.4f} μJ") |
| print("✓ All hardware targets: PASSED") |
|
|
|
|
| def test_quantum_energy(): |
| """Test quantum circuit energy estimation.""" |
| est = EnergyEstimatorV4("cpu_intel_xeon") |
| energy = est.quantum_energy(n_qubits=4, n_layers=2, n_tokens=100) |
| assert energy > 0 |
| print(f" Quantum energy (4 qubits, 2 layers, 100 tokens): {energy:.2f} μJ") |
| print("✓ Quantum energy estimation: PASSED") |
|
|
|
|
| def test_training_energy(): |
| """Test total training energy estimate.""" |
| est = EnergyEstimatorV4("gpu_a100") |
| result = est.training_energy_estimate( |
| total_flops=1e9, |
| n_epochs=10, |
| batch_size=16, |
| dataset_size=10000, |
| quantum_tokens_per_batch=128, |
| n_qubits=4, |
| n_qlayers=2, |
| ) |
| assert "total_energy_uj" in result |
| print(f" Total training energy: {result['total_energy_j']:.4f} J") |
| print(f" Carbon: {result['carbon_g']:.4f} g CO2") |
| print(f" Equivalent smartphone charges: {result['equivalent_smartphone_charges']:.4f}") |
| print("✓ Training energy estimate: PASSED") |
|
|
|
|
| def test_pareto_tracker(): |
| """Test Pareto frontier tracking.""" |
| tracker = ParetoTracker() |
|
|
| |
| assert tracker.record(ppl=100, energy_uj=1000, step=0) |
| assert tracker.record(ppl=80, energy_uj=900, step=1) |
| assert not tracker.record(ppl=90, energy_uj=950, step=2) |
| assert tracker.record(ppl=75, energy_uj=1200, step=3) |
|
|
| summary = tracker.summary() |
| assert summary["points"] == 3, f"Expected 3 Pareto points, got {summary['points']}" |
| print(f" Pareto frontier: {summary['frontier']}") |
| print("✓ ParetoTracker: PASSED") |
|
|
|
|
| def test_budget_integration(): |
| """Test budget constraints with energy-aware optimization.""" |
| config = ModelConfig( |
| d_model=64, n_layers=2, n_heads=4, tt_rank=4, |
| vocab_size=5000, use_quantum=False, |
| ) |
| budget = BudgetConfig( |
| max_params=500000, |
| max_latency_ms=50.0, |
| max_energy_per_query=100.0, |
| ) |
|
|
| |
| config.validate() |
| budget.validate() |
|
|
| print(f" Model config: d={config.d_model}, layers={config.n_layers}") |
| print(f" Budget: params≤{budget.max_params}, latency≤{budget.max_latency_ms}ms, energy≤{budget.max_energy_per_query}μJ") |
| print("✓ Budget integration: PASSED") |
|
|
|
|
| def test_e2e_v4_pipeline(): |
| """End-to-end v4 pipeline test.""" |
| from src.models import create_model |
| from src.config import ModelConfig |
| from src.energy_v4 import estimate_model_energy, EnergyEstimatorV4 |
|
|
| config = ModelConfig( |
| vocab_size=1000, |
| d_model=64, |
| n_layers=2, |
| n_heads=4, |
| tt_rank=4, |
| max_seq_len=64, |
| n_qubits=4, |
| use_quantum=False, |
| ) |
|
|
| model = create_model(config, model_type="qtensor") |
|
|
| |
| x = torch.randint(0, 1000, (2, 16)) |
| logits = model(x) |
| assert logits.shape == (2, 16, 1000), f"Expected (2, 16, 1000), got {logits.shape}" |
|
|
| |
| est = EnergyEstimatorV4("cpu_apple_m2") |
| est_result = estimate_model_energy(model, est, seq_len=64, batch_size=2) |
| print(f" E2E energy: {est_result['energy_uj']:.2f} μJ") |
| print(f" E2E carbon: {est_result['carbon_per_query_ug']:.4f} μg CO2") |
| print(f" E2E params: {est_result['params']}") |
| print("✓ End-to-end v4 pipeline: PASSED") |
|
|
|
|
| if __name__ == "__main__": |
| print("=" * 60) |
| print("Q-TensorFormer v4 — Integration Tests") |
| print("=" * 60) |
|
|
| tests = [ |
| ("DARUAN basic", test_daruan_basic), |
| ("DARUAN batched", test_daruan_batched), |
| ("QKANLayer", test_qkan_layer), |
| ("HQKAN FFN", test_hqkan_ffn), |
| ("create_qkan_ffn", test_create_qkan_ffn), |
| ("EnergyEstimator", test_energy_estimator), |
| ("All Hardware", test_energy_all_hardware), |
| ("Quantum Energy", test_quantum_energy), |
| ("Training Energy", test_training_energy), |
| ("ParetoTracker", test_pareto_tracker), |
| ("Budget Integration", test_budget_integration), |
| ("E2E v4 Pipeline", test_e2e_v4_pipeline), |
| ] |
|
|
| passed = 0 |
| failed = 0 |
| for name, test_fn in tests: |
| try: |
| test_fn() |
| passed += 1 |
| except Exception as e: |
| print(f"✗ {name}: FAILED — {e}") |
| failed += 1 |
|
|
| print(f"\n{'=' * 60}") |
| print(f"Results: {passed}/{passed + failed} tests passed") |
| if failed: |
| print(f"FAILED: {failed} test(s)") |
| else: |
| print("✅ ALL TESTS PASSED") |
|
|