"""
Comprehensive verification test for LiquidFlow.
Tests: syntax, imports, forward pass, backward pass, dimension correctness,
gradient flow, training step, and performance.
Run: python test_verify.py
"""
import sys
import os
import time
import traceback
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import torch
import torch.nn as nn
import torch.nn.functional as F
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Device: {DEVICE}")
print(f"PyTorch: {torch.__version__}")
if DEVICE == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
print("=" * 70)
errors = []
passed = 0
def test(name, fn):
    global passed, errors
    try:
        fn()
        print(f" ✓ {name}")
        passed += 1
    except Exception as e:
        msg = f" ✗ {name}: {e}"
        print(msg)
        traceback.print_exc()
        errors.append(msg)
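# Usage: test("name", fn). fn should raise (e.g. via a failed assert) on
# error; passes and failures are tallied into passed/errors for the summary
# at the bottom of the file.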
# ============================================================
# TEST 1: CfC Cell
# ============================================================
print("\n=== 1. CfC Cell ===")
def test_cfc_forward():
    from liquid_flow.cfc_cell import CfCCell
    cell = CfCCell(dim=64).to(DEVICE)
    x = torch.randn(2, 256, 64, device=DEVICE)
    out = cell(x)
    assert out.shape == (2, 256, 64), f"Expected (2,256,64), got {out.shape}"

def test_cfc_backward():
    from liquid_flow.cfc_cell import CfCCell
    cell = CfCCell(dim=64).to(DEVICE)
    x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True)
    out = cell(x)
    loss = out.sum()
    loss.backward()
    assert x.grad is not None, "No gradient on input"
    assert not torch.isnan(x.grad).any(), "NaN in gradients"

def test_cfc_block_2d():
    from liquid_flow.cfc_cell import CfCBlock
    block = CfCBlock(dim=64).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE)
    out = block(x)
    assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}"

def test_cfc_block_backward():
    from liquid_flow.cfc_cell import CfCBlock
    block = CfCBlock(dim=64).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True)
    out = block(x)
    loss = out.sum()
    loss.backward()
    assert x.grad is not None
test("CfC forward [B,L,D]", test_cfc_forward)
test("CfC backward (grad flow)", test_cfc_backward)
test("CfC Block 2D [B,C,H,W]", test_cfc_block_2d)
test("CfC Block backward", test_cfc_block_backward)
# ============================================================
# TEST 2: Mamba-2 SSD
# ============================================================
print("\n=== 2. Mamba-2 SSD ===")
def test_mamba2_forward():
    from liquid_flow.mamba2_ssd import Mamba2SSD
    ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 256, 64, device=DEVICE)
    out = ssd(x)
    assert out.shape == (2, 256, 64), f"Expected (2,256,64), got {out.shape}"

def test_mamba2_backward():
    from liquid_flow.mamba2_ssd import Mamba2SSD
    ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True)
    out = ssd(x)
    loss = out.sum()
    loss.backward()
    assert x.grad is not None, "No gradient on input"
    assert not torch.isnan(x.grad).any(), "NaN in gradients"

def test_mamba2_block_2d():
    from liquid_flow.mamba2_ssd import Mamba2Block
    block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE)
    out = block(x)
    assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}"

def test_mamba2_block_backward():
    from liquid_flow.mamba2_ssd import Mamba2Block
    block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True)
    out = block(x)
    loss = out.sum()
    loss.backward()
    assert x.grad is not None

def test_mamba2_odd_length():
    """Test with non-power-of-2 sequence length."""
    from liquid_flow.mamba2_ssd import Mamba2SSD
    ssd = Mamba2SSD(dim=64, d_state=8, expand=2, chunk_size=16).to(DEVICE)
    x = torch.randn(2, 253, 64, device=DEVICE)  # Odd length
    out = ssd(x)
    assert out.shape == (2, 253, 64), f"Expected (2,253,64), got {out.shape}"
test("Mamba2 SSD forward", test_mamba2_forward)
test("Mamba2 SSD backward (no in-place crash)", test_mamba2_backward)
test("Mamba2 Block 2D", test_mamba2_block_2d)
test("Mamba2 Block backward", test_mamba2_block_backward)
test("Mamba2 odd sequence length", test_mamba2_odd_length)
# ============================================================
# TEST 3: LiquidMamba Block
# ============================================================
print("\n=== 3. LiquidMamba Block ===")
def test_liquid_mamba_forward():
    from liquid_flow.liquid_flow_block import LiquidMambaBlock
    block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE)
    out = block(x)
    assert out.shape == (2, 64, 16, 16), f"Expected (2,64,16,16), got {out.shape}"

def test_liquid_mamba_backward():
    from liquid_flow.liquid_flow_block import LiquidMambaBlock
    block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True)
    out = block(x)
    loss = out.mean()
    loss.backward()
    assert x.grad is not None
    assert not torch.isnan(x.grad).any()
test("LiquidMamba forward", test_liquid_mamba_forward)
test("LiquidMamba backward", test_liquid_mamba_backward)
# ============================================================
# TEST 4: Full Backbone
# ============================================================
print("\n=== 4. LiquidFlow Backbone ===")
def test_backbone_forward():
    from liquid_flow.liquid_flow_block import LiquidFlowBackbone
    model = LiquidFlowBackbone(
        in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8
    ).to(DEVICE)
    x = torch.randn(2, 4, 16, 16, device=DEVICE)  # latent for a 128px image
    t = torch.tensor([100, 500], device=DEVICE)
    out = model(x, t)
    assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}"

def test_backbone_backward():
    from liquid_flow.liquid_flow_block import LiquidFlowBackbone
    model = LiquidFlowBackbone(
        in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8
    ).to(DEVICE)
    x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True)
    t = torch.tensor([100, 500], device=DEVICE)
    out = model(x, t)
    loss = F.mse_loss(out, torch.randn_like(out))
    loss.backward()
    assert x.grad is not None
    # Check that every trainable parameter received a finite gradient
    grads_ok = sum(1 for p in model.parameters()
                   if p.grad is not None and not torch.isnan(p.grad).any())
    total_params = sum(1 for p in model.parameters() if p.requires_grad)
    assert grads_ok == total_params, f"Only {grads_ok}/{total_params} params have valid gradients"

def test_backbone_512():
    """Test with a 512px image (latent = 64×64)."""
    from liquid_flow.liquid_flow_block import LiquidFlowBackbone
    model = LiquidFlowBackbone(
        in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=1, d_state=8
    ).to(DEVICE)
    x = torch.randn(1, 4, 64, 64, device=DEVICE)  # latent for a 512px image
    t = torch.tensor([500], device=DEVICE)
    out = model(x, t)
    assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}"
test("Backbone forward (128px)", test_backbone_forward)
test("Backbone backward (all grads valid)", test_backbone_backward)
test("Backbone 512px (64×64 latent)", test_backbone_512)
# ============================================================
# TEST 5: Full Generator + Training Step
# ============================================================
print("\n=== 5. Generator + Training ===")
def test_generator_forward():
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    x = torch.randn(2, 4, 16, 16, device=DEVICE)
    t = torch.tensor([100, 500], device=DEVICE)
    out = model(x, t)
    assert out.shape == x.shape

def test_training_step():
    """Full training step: forward + loss + backward + optimizer step."""
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    x0 = torch.randn(4, 4, 16, 16, device=DEVICE)
    loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False)
    assert 'total' in loss_dict
    assert 'diffusion' in loss_dict
    assert 'physics' in loss_dict
    assert loss_dict['total'] > 0
    # NaN is the only value for which v != v, so this flags NaN losses
    assert not any(v != v for v in loss_dict.values()), "NaN in losses"

def test_training_step_multiple():
    """Multiple training steps to verify no accumulation/state bugs."""
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    losses = []
    for _ in range(5):
        x0 = torch.randn(4, 4, 16, 16, device=DEVICE)
        loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False)
        losses.append(loss_dict['total'])
        assert not (loss_dict['total'] != loss_dict['total']), "NaN loss"
    # Losses should not explode over successive steps
    assert all(l < 100 for l in losses), f"Loss explosion: {losses}"

def test_sampling():
    """Test that DDIM sampling produces correctly shaped, finite output."""
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    model.eval()
    with torch.no_grad():
        samples = model.sample(batch_size=2, steps=5, ddim=True, progress=False)
    assert samples.shape == (2, 4, 16, 16), f"Expected (2,4,16,16), got {samples.shape}"
    assert not torch.isnan(samples).any(), "NaN in samples"
test("Generator forward", test_generator_forward)
test("Full training step (fwd+bwd+optim)", test_training_step)
test("5 training steps (no explosion)", test_training_step_multiple)
test("DDIM sampling", test_sampling)
# ============================================================
# TEST 6: Physics Loss
# ============================================================
print("\n=== 6. Physics Loss ===")
def test_physics_loss():
    from liquid_flow.physics_loss import PhysicsRegularizer
    phys = PhysicsRegularizer().to(DEVICE)
    phys.train()
    x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True)
    total, losses = phys(x)
    assert total.requires_grad, "Physics loss not differentiable"
    total.backward()
    assert x.grad is not None

def test_ddim_estimator():
    from liquid_flow.physics_loss import DDIMEstimator
    x_t = torch.randn(2, 4, 16, 16, device=DEVICE)
    eps = torch.randn(2, 4, 16, 16, device=DEVICE)
    alpha_bar = torch.tensor([0.9, 0.5], device=DEVICE)
    x0 = DDIMEstimator.estimate_x0(x_t, eps, alpha_bar)
    assert x0.shape == x_t.shape
    assert not torch.isnan(x0).any()
test("Physics loss (differentiable)", test_physics_loss)
test("DDIM estimator", test_ddim_estimator)
# ============================================================
# TEST 7: Performance / Speed
# ============================================================
print("\n=== 7. Performance ===")
def test_speed():
    """Measure forward+backward time for one batch."""
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    model.train()
    x = torch.randn(4, 4, 16, 16, device=DEVICE, requires_grad=True)
    t = torch.randint(0, 1000, (4,), device=DEVICE)
    # Warmup (triggers kernel compilation / autotuning before timing)
    out = model(x, t)
    loss = out.sum()
    loss.backward()
    if DEVICE == 'cuda':
        torch.cuda.synchronize()
    # Timed run
    start = time.perf_counter()
    for _ in range(5):
        out = model(x, t)
        loss = out.sum()
        loss.backward()
    if DEVICE == 'cuda':
        torch.cuda.synchronize()
    elapsed = (time.perf_counter() - start) / 5
    print(f" → Forward+backward: {elapsed*1000:.1f} ms/batch (tiny, bs=4, 16×16)")
    assert elapsed < 60, f"Too slow: {elapsed:.1f}s per step"

def test_param_count():
    from liquid_flow.generator import create_liquidflow
    for variant in ['tiny', 'small', 'base']:
        model = create_liquidflow(variant=variant, image_size=128)
        n = sum(p.numel() for p in model.parameters())
        print(f" → {variant}: {n:,} params ({n/1e6:.1f}M)")
test("Speed (< 60s per step)", test_speed)
test("Param counts", test_param_count)
# ============================================================
# SUMMARY
# ============================================================
print("\n" + "=" * 70)
total = passed + len(errors)
print(f"Results: {passed}/{total} tests passed")
if errors:
    print(f"\n{'='*70}")
    print("FAILURES:")
    for e in errors:
        print(f" {e}")
    print(f"{'='*70}")
    sys.exit(1)
else:
    print("ALL TESTS PASSED ✓")
    print("Model is GPU-trainable with no sequential bottlenecks, and gradients flow correctly.")
print("=" * 70)