| """ |
| Comprehensive verification test for LiquidFlow. |
| Tests: syntax, imports, forward pass, backward pass, dimension correctness, |
| gradient flow, training step, and performance. |
| |
| Run: python test_verify.py |
| """ |
| import sys |
| import os |
| import time |
| import traceback |
|
|
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
| DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' |
| print(f"Device: {DEVICE}") |
| print(f"PyTorch: {torch.__version__}") |
| if DEVICE == 'cuda': |
| print(f"GPU: {torch.cuda.get_device_name(0)}") |
| print("=" * 70) |
|
|
| errors = [] |
| passed = 0 |
|
|
| def test(name, fn): |
| global passed, errors |
| try: |
| fn() |
| print(f" ✓ {name}") |
| passed += 1 |
| except Exception as e: |
| msg = f" ✗ {name}: {e}" |
| print(msg) |
| traceback.print_exc() |
| errors.append(msg) |
|
|
| |
| |
| |
| print("\n=== 1. CfC Cell ===") |
|
|
| def test_cfc_forward(): |
| from liquid_flow.cfc_cell import CfCCell |
| cell = CfCCell(dim=64).to(DEVICE) |
| x = torch.randn(2, 256, 64, device=DEVICE) |
| out = cell(x) |
| assert out.shape == (2, 256, 64), f"Expected (2,256,64), got {out.shape}" |
|
|
| def test_cfc_backward(): |
| from liquid_flow.cfc_cell import CfCCell |
| cell = CfCCell(dim=64).to(DEVICE) |
| x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True) |
| out = cell(x) |
| loss = out.sum() |
| loss.backward() |
| assert x.grad is not None, "No gradient on input" |
| assert not torch.isnan(x.grad).any(), "NaN in gradients" |
|
|
| def test_cfc_block_2d(): |
| from liquid_flow.cfc_cell import CfCBlock |
| block = CfCBlock(dim=64).to(DEVICE) |
| x = torch.randn(2, 64, 16, 16, device=DEVICE) |
| out = block(x) |
| assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}" |
|
|
| def test_cfc_block_backward(): |
| from liquid_flow.cfc_cell import CfCBlock |
| block = CfCBlock(dim=64).to(DEVICE) |
| x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True) |
| out = block(x) |
| loss = out.sum() |
| loss.backward() |
| assert x.grad is not None |
|
|
| test("CfC forward [B,L,D]", test_cfc_forward) |
| test("CfC backward (grad flow)", test_cfc_backward) |
| test("CfC Block 2D [B,C,H,W]", test_cfc_block_2d) |
| test("CfC Block backward", test_cfc_block_backward) |
|
|
| |
| |
| |
| print("\n=== 2. Mamba-2 SSD ===") |
|
|
| def test_mamba2_forward(): |
| from liquid_flow.mamba2_ssd import Mamba2SSD |
| ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE) |
| x = torch.randn(2, 256, 64, device=DEVICE) |
| out = ssd(x) |
| assert out.shape == (2, 256, 64), f"Expected (2,256,64), got {out.shape}" |
|
|
| def test_mamba2_backward(): |
| from liquid_flow.mamba2_ssd import Mamba2SSD |
| ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE) |
| x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True) |
| out = ssd(x) |
| loss = out.sum() |
| loss.backward() |
| assert x.grad is not None, "No gradient on input" |
| assert not torch.isnan(x.grad).any(), "NaN in gradients" |
|
|
| def test_mamba2_block_2d(): |
| from liquid_flow.mamba2_ssd import Mamba2Block |
| block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE) |
| x = torch.randn(2, 64, 16, 16, device=DEVICE) |
| out = block(x) |
| assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}" |
|
|
| def test_mamba2_block_backward(): |
| from liquid_flow.mamba2_ssd import Mamba2Block |
| block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE) |
| x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True) |
| out = block(x) |
| loss = out.sum() |
| loss.backward() |
| assert x.grad is not None |
|
|
| def test_mamba2_odd_length(): |
| """Test with non-power-of-2 sequence length.""" |
| from liquid_flow.mamba2_ssd import Mamba2SSD |
| ssd = Mamba2SSD(dim=64, d_state=8, expand=2, chunk_size=16).to(DEVICE) |
| x = torch.randn(2, 253, 64, device=DEVICE) |
| out = ssd(x) |
| assert out.shape == (2, 253, 64), f"Expected (2,253,64), got {out.shape}" |
|
|
| test("Mamba2 SSD forward", test_mamba2_forward) |
| test("Mamba2 SSD backward (no in-place crash)", test_mamba2_backward) |
| test("Mamba2 Block 2D", test_mamba2_block_2d) |
| test("Mamba2 Block backward", test_mamba2_block_backward) |
| test("Mamba2 odd sequence length", test_mamba2_odd_length) |
|
|
| |
| |
| |
| print("\n=== 3. LiquidMamba Block ===") |
|
|
| def test_liquid_mamba_forward(): |
| from liquid_flow.liquid_flow_block import LiquidMambaBlock |
| block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE) |
| x = torch.randn(2, 64, 16, 16, device=DEVICE) |
| out = block(x) |
| assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}" |
|
|
| def test_liquid_mamba_backward(): |
| from liquid_flow.liquid_flow_block import LiquidMambaBlock |
| block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE) |
| x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True) |
| out = block(x) |
| loss = out.mean() |
| loss.backward() |
| assert x.grad is not None |
| assert not torch.isnan(x.grad).any() |
|
|
| test("LiquidMamba forward", test_liquid_mamba_forward) |
| test("LiquidMamba backward", test_liquid_mamba_backward) |
|
|
| |
| |
| |
| print("\n=== 4. LiquidFlow Backbone ===") |
|
|
| def test_backbone_forward(): |
| from liquid_flow.liquid_flow_block import LiquidFlowBackbone |
| model = LiquidFlowBackbone( |
| in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8 |
| ).to(DEVICE) |
| x = torch.randn(2, 4, 16, 16, device=DEVICE) |
| t = torch.tensor([100, 500], device=DEVICE) |
| out = model(x, t) |
| assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}" |
|
|
| def test_backbone_backward(): |
| from liquid_flow.liquid_flow_block import LiquidFlowBackbone |
| model = LiquidFlowBackbone( |
| in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8 |
| ).to(DEVICE) |
| x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True) |
| t = torch.tensor([100, 500], device=DEVICE) |
| out = model(x, t) |
| loss = F.mse_loss(out, torch.randn_like(out)) |
| loss.backward() |
| assert x.grad is not None |
| |
| grads_ok = sum(1 for p in model.parameters() if p.grad is not None and not torch.isnan(p.grad).any()) |
| total_params = sum(1 for p in model.parameters() if p.requires_grad) |
| assert grads_ok == total_params, f"Only {grads_ok}/{total_params} params have valid gradients" |
|
|
| def test_backbone_512(): |
| """Test with 512px image (latent = 64×64).""" |
| from liquid_flow.liquid_flow_block import LiquidFlowBackbone |
| model = LiquidFlowBackbone( |
| in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=1, d_state=8 |
| ).to(DEVICE) |
| x = torch.randn(1, 4, 64, 64, device=DEVICE) |
| t = torch.tensor([500], device=DEVICE) |
| out = model(x, t) |
| assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}" |
|
|
| test("Backbone forward (128px)", test_backbone_forward) |
| test("Backbone backward (all grads valid)", test_backbone_backward) |
| test("Backbone 512px (64×64 latent)", test_backbone_512) |
|
|
| |
| |
| |
| print("\n=== 5. Generator + Training ===") |
|
|
| def test_generator_forward(): |
| from liquid_flow.generator import create_liquidflow |
| model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) |
| x = torch.randn(2, 4, 16, 16, device=DEVICE) |
| t = torch.tensor([100, 500], device=DEVICE) |
| out = model(x, t) |
| assert out.shape == x.shape |
|
|
| def test_training_step(): |
| """Full training step: forward + loss + backward + optimizer step.""" |
| from liquid_flow.generator import create_liquidflow |
| model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) |
| optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) |
| |
| x0 = torch.randn(4, 4, 16, 16, device=DEVICE) |
| loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False) |
| |
| assert 'total' in loss_dict |
| assert 'diffusion' in loss_dict |
| assert 'physics' in loss_dict |
| assert loss_dict['total'] > 0 |
| assert not any(v != v for v in loss_dict.values()), "NaN in losses" |
|
|
| def test_training_step_multiple(): |
| """Multiple training steps to verify no accumulation/state bugs.""" |
| from liquid_flow.generator import create_liquidflow |
| model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) |
| optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) |
| |
| losses = [] |
| for _ in range(5): |
| x0 = torch.randn(4, 4, 16, 16, device=DEVICE) |
| loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False) |
| losses.append(loss_dict['total']) |
| assert not (loss_dict['total'] != loss_dict['total']), "NaN loss" |
| |
| |
| assert all(l < 100 for l in losses), f"Loss explosion: {losses}" |
|
|
| def test_sampling(): |
| """Test DDIM sampling produces correct output.""" |
| from liquid_flow.generator import create_liquidflow |
| model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) |
| model.eval() |
| |
| with torch.no_grad(): |
| samples = model.sample(batch_size=2, steps=5, ddim=True, progress=False) |
| |
| assert samples.shape == (2, 4, 16, 16), f"Expected (2,4,16,16), got {samples.shape}" |
| assert not torch.isnan(samples).any(), "NaN in samples" |
|
|
| test("Generator forward", test_generator_forward) |
| test("Full training step (fwd+bwd+optim)", test_training_step) |
| test("5 training steps (no explosion)", test_training_step_multiple) |
| test("DDIM sampling", test_sampling) |
|
|
| |
| |
| |
| print("\n=== 6. Physics Loss ===") |
|
|
| def test_physics_loss(): |
| from liquid_flow.physics_loss import PhysicsRegularizer |
| phys = PhysicsRegularizer().to(DEVICE) |
| phys.train() |
| x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True) |
| total, losses = phys(x) |
| assert total.requires_grad, "Physics loss not differentiable" |
| total.backward() |
| assert x.grad is not None |
|
|
| def test_ddim_estimator(): |
| from liquid_flow.physics_loss import DDIMEstimator |
| x_t = torch.randn(2, 4, 16, 16, device=DEVICE) |
| eps = torch.randn(2, 4, 16, 16, device=DEVICE) |
| alpha_bar = torch.tensor([0.9, 0.5], device=DEVICE) |
| x0 = DDIMEstimator.estimate_x0(x_t, eps, alpha_bar) |
| assert x0.shape == x_t.shape |
| assert not torch.isnan(x0).any() |
|
|
| test("Physics loss (differentiable)", test_physics_loss) |
| test("DDIM estimator", test_ddim_estimator) |
|
|
| |
| |
| |
| print("\n=== 7. Performance ===") |
|
|
| def test_speed(): |
| """Measure forward+backward time for one batch.""" |
| from liquid_flow.generator import create_liquidflow |
| model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) |
| model.train() |
| |
| x = torch.randn(4, 4, 16, 16, device=DEVICE, requires_grad=True) |
| t = torch.randint(0, 1000, (4,), device=DEVICE) |
| |
| |
| out = model(x, t) |
| loss = out.sum() |
| loss.backward() |
| |
| if DEVICE == 'cuda': |
| torch.cuda.synchronize() |
| |
| |
| start = time.time() |
| for _ in range(5): |
| out = model(x, t) |
| loss = out.sum() |
| loss.backward() |
| if DEVICE == 'cuda': |
| torch.cuda.synchronize() |
| elapsed = (time.time() - start) / 5 |
| |
| print(f" → Forward+backward: {elapsed*1000:.1f} ms/batch (tiny, bs=4, 16×16)") |
| assert elapsed < 60, f"Too slow: {elapsed:.1f}s per step" |
|
|
| def test_param_count(): |
| from liquid_flow.generator import create_liquidflow |
| for variant in ['tiny', 'small', 'base']: |
| model = create_liquidflow(variant=variant, image_size=128) |
| n = sum(p.numel() for p in model.parameters()) |
| print(f" → {variant}: {n:,} params ({n/1e6:.1f}M)") |
|
|
| test("Speed (< 60s per step)", test_speed) |
| test("Param counts", test_param_count) |
|
|
| |
| |
| |
| print("\n" + "=" * 70) |
| total = passed + len(errors) |
| print(f"Results: {passed}/{total} tests passed") |
|
|
| if errors: |
| print(f"\n{'='*70}") |
| print("FAILURES:") |
| for e in errors: |
| print(f" {e}") |
| print(f"{'='*70}") |
| sys.exit(1) |
| else: |
| print("ALL TESTS PASSED ✓") |
| print("Model is GPU-trainable, no sequential bottlenecks, gradients flow correctly.") |
| print("=" * 70) |
|
|