""" Comprehensive verification test for LiquidFlow. Tests: syntax, imports, forward pass, backward pass, dimension correctness, gradient flow, training step, and performance. Run: python test_verify.py """ import sys import os import time import traceback sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import torch import torch.nn as nn import torch.nn.functional as F DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' print(f"Device: {DEVICE}") print(f"PyTorch: {torch.__version__}") if DEVICE == 'cuda': print(f"GPU: {torch.cuda.get_device_name(0)}") print("=" * 70) errors = [] passed = 0 def test(name, fn): global passed, errors try: fn() print(f" ✓ {name}") passed += 1 except Exception as e: msg = f" ✗ {name}: {e}" print(msg) traceback.print_exc() errors.append(msg) # ============================================================ # TEST 1: CfC Cell # ============================================================ print("\n=== 1. CfC Cell ===") def test_cfc_forward(): from liquid_flow.cfc_cell import CfCCell cell = CfCCell(dim=64).to(DEVICE) x = torch.randn(2, 256, 64, device=DEVICE) out = cell(x) assert out.shape == (2, 256, 64), f"Expected (2,256,64), got {out.shape}" def test_cfc_backward(): from liquid_flow.cfc_cell import CfCCell cell = CfCCell(dim=64).to(DEVICE) x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True) out = cell(x) loss = out.sum() loss.backward() assert x.grad is not None, "No gradient on input" assert not torch.isnan(x.grad).any(), "NaN in gradients" def test_cfc_block_2d(): from liquid_flow.cfc_cell import CfCBlock block = CfCBlock(dim=64).to(DEVICE) x = torch.randn(2, 64, 16, 16, device=DEVICE) out = block(x) assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}" def test_cfc_block_backward(): from liquid_flow.cfc_cell import CfCBlock block = CfCBlock(dim=64).to(DEVICE) x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True) out = block(x) loss = out.sum() loss.backward() assert x.grad is not None test("CfC forward [B,L,D]", test_cfc_forward) test("CfC backward (grad flow)", test_cfc_backward) test("CfC Block 2D [B,C,H,W]", test_cfc_block_2d) test("CfC Block backward", test_cfc_block_backward) # ============================================================ # TEST 2: Mamba-2 SSD # ============================================================ print("\n=== 2. Mamba-2 SSD ===") def test_mamba2_forward(): from liquid_flow.mamba2_ssd import Mamba2SSD ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE) x = torch.randn(2, 256, 64, device=DEVICE) out = ssd(x) assert out.shape == (2, 256, 64), f"Expected (2,256,64), got {out.shape}" def test_mamba2_backward(): from liquid_flow.mamba2_ssd import Mamba2SSD ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE) x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True) out = ssd(x) loss = out.sum() loss.backward() assert x.grad is not None, "No gradient on input" assert not torch.isnan(x.grad).any(), "NaN in gradients" def test_mamba2_block_2d(): from liquid_flow.mamba2_ssd import Mamba2Block block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE) x = torch.randn(2, 64, 16, 16, device=DEVICE) out = block(x) assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}" def test_mamba2_block_backward(): from liquid_flow.mamba2_ssd import Mamba2Block block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE) x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True) out = block(x) loss = out.sum() loss.backward() assert x.grad is not None def test_mamba2_odd_length(): """Test with non-power-of-2 sequence length.""" from liquid_flow.mamba2_ssd import Mamba2SSD ssd = Mamba2SSD(dim=64, d_state=8, expand=2, chunk_size=16).to(DEVICE) x = torch.randn(2, 253, 64, device=DEVICE) # Odd length out = ssd(x) assert out.shape == (2, 253, 64), f"Expected (2,253,64), got {out.shape}" test("Mamba2 SSD forward", test_mamba2_forward) test("Mamba2 SSD backward (no in-place crash)", test_mamba2_backward) test("Mamba2 Block 2D", test_mamba2_block_2d) test("Mamba2 Block backward", test_mamba2_block_backward) test("Mamba2 odd sequence length", test_mamba2_odd_length) # ============================================================ # TEST 3: LiquidMamba Block # ============================================================ print("\n=== 3. LiquidMamba Block ===") def test_liquid_mamba_forward(): from liquid_flow.liquid_flow_block import LiquidMambaBlock block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE) x = torch.randn(2, 64, 16, 16, device=DEVICE) out = block(x) assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}" def test_liquid_mamba_backward(): from liquid_flow.liquid_flow_block import LiquidMambaBlock block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE) x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True) out = block(x) loss = out.mean() loss.backward() assert x.grad is not None assert not torch.isnan(x.grad).any() test("LiquidMamba forward", test_liquid_mamba_forward) test("LiquidMamba backward", test_liquid_mamba_backward) # ============================================================ # TEST 4: Full Backbone # ============================================================ print("\n=== 4. LiquidFlow Backbone ===") def test_backbone_forward(): from liquid_flow.liquid_flow_block import LiquidFlowBackbone model = LiquidFlowBackbone( in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8 ).to(DEVICE) x = torch.randn(2, 4, 16, 16, device=DEVICE) # 128px latent t = torch.tensor([100, 500], device=DEVICE) out = model(x, t) assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}" def test_backbone_backward(): from liquid_flow.liquid_flow_block import LiquidFlowBackbone model = LiquidFlowBackbone( in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8 ).to(DEVICE) x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True) t = torch.tensor([100, 500], device=DEVICE) out = model(x, t) loss = F.mse_loss(out, torch.randn_like(out)) loss.backward() assert x.grad is not None # Check model params have gradients grads_ok = sum(1 for p in model.parameters() if p.grad is not None and not torch.isnan(p.grad).any()) total_params = sum(1 for p in model.parameters() if p.requires_grad) assert grads_ok == total_params, f"Only {grads_ok}/{total_params} params have valid gradients" def test_backbone_512(): """Test with 512px image (latent = 64×64).""" from liquid_flow.liquid_flow_block import LiquidFlowBackbone model = LiquidFlowBackbone( in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=1, d_state=8 ).to(DEVICE) x = torch.randn(1, 4, 64, 64, device=DEVICE) # 512px latent t = torch.tensor([500], device=DEVICE) out = model(x, t) assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}" test("Backbone forward (128px)", test_backbone_forward) test("Backbone backward (all grads valid)", test_backbone_backward) test("Backbone 512px (64×64 latent)", test_backbone_512) # ============================================================ # TEST 5: Full Generator + Training Step # ============================================================ print("\n=== 5. Generator + Training ===") def test_generator_forward(): from liquid_flow.generator import create_liquidflow model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) x = torch.randn(2, 4, 16, 16, device=DEVICE) t = torch.tensor([100, 500], device=DEVICE) out = model(x, t) assert out.shape == x.shape def test_training_step(): """Full training step: forward + loss + backward + optimizer step.""" from liquid_flow.generator import create_liquidflow model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) x0 = torch.randn(4, 4, 16, 16, device=DEVICE) loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False) assert 'total' in loss_dict assert 'diffusion' in loss_dict assert 'physics' in loss_dict assert loss_dict['total'] > 0 assert not any(v != v for v in loss_dict.values()), "NaN in losses" # NaN check def test_training_step_multiple(): """Multiple training steps to verify no accumulation/state bugs.""" from liquid_flow.generator import create_liquidflow model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) losses = [] for _ in range(5): x0 = torch.randn(4, 4, 16, 16, device=DEVICE) loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False) losses.append(loss_dict['total']) assert not (loss_dict['total'] != loss_dict['total']), "NaN loss" # Losses should not explode assert all(l < 100 for l in losses), f"Loss explosion: {losses}" def test_sampling(): """Test DDIM sampling produces correct output.""" from liquid_flow.generator import create_liquidflow model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) model.eval() with torch.no_grad(): samples = model.sample(batch_size=2, steps=5, ddim=True, progress=False) assert samples.shape == (2, 4, 16, 16), f"Expected (2,4,16,16), got {samples.shape}" assert not torch.isnan(samples).any(), "NaN in samples" test("Generator forward", test_generator_forward) test("Full training step (fwd+bwd+optim)", test_training_step) test("5 training steps (no explosion)", test_training_step_multiple) test("DDIM sampling", test_sampling) # ============================================================ # TEST 6: Physics Loss # ============================================================ print("\n=== 6. Physics Loss ===") def test_physics_loss(): from liquid_flow.physics_loss import PhysicsRegularizer phys = PhysicsRegularizer().to(DEVICE) phys.train() x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True) total, losses = phys(x) assert total.requires_grad, "Physics loss not differentiable" total.backward() assert x.grad is not None def test_ddim_estimator(): from liquid_flow.physics_loss import DDIMEstimator x_t = torch.randn(2, 4, 16, 16, device=DEVICE) eps = torch.randn(2, 4, 16, 16, device=DEVICE) alpha_bar = torch.tensor([0.9, 0.5], device=DEVICE) x0 = DDIMEstimator.estimate_x0(x_t, eps, alpha_bar) assert x0.shape == x_t.shape assert not torch.isnan(x0).any() test("Physics loss (differentiable)", test_physics_loss) test("DDIM estimator", test_ddim_estimator) # ============================================================ # TEST 7: Performance / Speed # ============================================================ print("\n=== 7. Performance ===") def test_speed(): """Measure forward+backward time for one batch.""" from liquid_flow.generator import create_liquidflow model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE) model.train() x = torch.randn(4, 4, 16, 16, device=DEVICE, requires_grad=True) t = torch.randint(0, 1000, (4,), device=DEVICE) # Warmup out = model(x, t) loss = out.sum() loss.backward() if DEVICE == 'cuda': torch.cuda.synchronize() # Timed run start = time.time() for _ in range(5): out = model(x, t) loss = out.sum() loss.backward() if DEVICE == 'cuda': torch.cuda.synchronize() elapsed = (time.time() - start) / 5 print(f" → Forward+backward: {elapsed*1000:.1f} ms/batch (tiny, bs=4, 16×16)") assert elapsed < 60, f"Too slow: {elapsed:.1f}s per step" def test_param_count(): from liquid_flow.generator import create_liquidflow for variant in ['tiny', 'small', 'base']: model = create_liquidflow(variant=variant, image_size=128) n = sum(p.numel() for p in model.parameters()) print(f" → {variant}: {n:,} params ({n/1e6:.1f}M)") test("Speed (< 60s per step)", test_speed) test("Param counts", test_param_count) # ============================================================ # SUMMARY # ============================================================ print("\n" + "=" * 70) total = passed + len(errors) print(f"Results: {passed}/{total} tests passed") if errors: print(f"\n{'='*70}") print("FAILURES:") for e in errors: print(f" {e}") print(f"{'='*70}") sys.exit(1) else: print("ALL TESTS PASSED ✓") print("Model is GPU-trainable, no sequential bottlenecks, gradients flow correctly.") print("=" * 70)