File size: 13,179 Bytes
f143208
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
"""
Comprehensive verification test for LiquidFlow.
Tests: syntax, imports, forward pass, backward pass, dimension correctness,
gradient flow, training step, and performance.

Run: python test_verify.py
"""
import sys
import os
import time
import traceback

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

import torch
import torch.nn as nn
import torch.nn.functional as F

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
DEVICE = 'cpu'
if torch.cuda.is_available():
    DEVICE = 'cuda'

# Environment banner printed once, before any check runs.
print(f"Device: {DEVICE}")
print(f"PyTorch: {torch.__version__}")
if DEVICE == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
print("=" * 70)

# Bookkeeping shared with test(): number of passing checks and the
# formatted messages of the failing ones.
passed = 0
errors = []

def test(name, fn):
    """Run a single check and record whether it passed.

    Args:
        name: Label printed next to the pass/fail marker.
        fn: Zero-argument callable; any ``Exception`` it raises (including
            ``AssertionError``) counts as a failure.

    On success the module-level ``passed`` counter is incremented. On failure
    the formatted message is printed, the traceback is dumped, and the message
    is appended to the module-level ``errors`` list. Nothing is re-raised, so
    later checks still run.
    """
    # Only ``passed`` is rebound; ``errors`` is mutated in place.
    global passed
    try:
        fn()
        print(f"  ✓ {name}")
        passed += 1
    except Exception as exc:
        failure = f"  ✗ {name}: {exc}"
        print(failure)
        traceback.print_exc()
        errors.append(failure)

# ============================================================
# TEST 1: CfC Cell
# ============================================================
# Section header printed before the CfC checks defined below are run.
print("\n=== 1. CfC Cell ===")

def test_cfc_forward():
    """Check that ``CfCCell(dim=64)`` returns a (2, 256, 64) output for a (2, 256, 64) input."""
    from liquid_flow.cfc_cell import CfCCell
    shape = (2, 256, 64)
    cell = CfCCell(dim=shape[-1]).to(DEVICE)
    out = cell(torch.randn(*shape, device=DEVICE))
    assert out.shape == shape, f"Expected (2,256,64), got {out.shape}"

def test_cfc_backward():
    """Backpropagate the summed ``CfCCell`` output and require a NaN-free input gradient."""
    from liquid_flow.cfc_cell import CfCCell
    cell = CfCCell(dim=64).to(DEVICE)
    x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True)
    cell(x).sum().backward()
    grad = x.grad
    assert grad is not None, "No gradient on input"
    assert not torch.isnan(grad).any(), "NaN in gradients"

def test_cfc_block_2d():
    """Check that ``CfCBlock(dim=64)`` preserves the shape of a (2, 64, 16, 16) input."""
    from liquid_flow.cfc_cell import CfCBlock
    shape = (2, 64, 16, 16)
    block = CfCBlock(dim=shape[1]).to(DEVICE)
    out = block(torch.randn(*shape, device=DEVICE))
    assert out.shape == shape, f"Expected (2,64,16,16), got {out.shape}"

def test_cfc_block_backward():
    """Backpropagate the summed ``CfCBlock`` output and validate the input gradient.

    Mirrors the other backward checks in this file: the gradient must exist
    and must not contain NaN, so a numerically broken backward pass is
    reported rather than silently passing.
    """
    from liquid_flow.cfc_cell import CfCBlock
    block = CfCBlock(dim=64).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True)
    out = block(x)
    loss = out.sum()
    loss.backward()
    assert x.grad is not None, "No gradient on input"
    assert not torch.isnan(x.grad).any(), "NaN in gradients"

# Register and run the CfC checks in a fixed order.
for _label, _check in (
    ("CfC forward [B,L,D]", test_cfc_forward),
    ("CfC backward (grad flow)", test_cfc_backward),
    ("CfC Block 2D [B,C,H,W]", test_cfc_block_2d),
    ("CfC Block backward", test_cfc_block_backward),
):
    test(_label, _check)

# ============================================================
# TEST 2: Mamba-2 SSD
# ============================================================
print("\n=== 2. Mamba-2 SSD ===")

def test_mamba2_forward():
    """Check that ``Mamba2SSD`` returns a (2, 256, 64) output for a (2, 256, 64) input."""
    from liquid_flow.mamba2_ssd import Mamba2SSD
    shape = (2, 256, 64)
    ssd = Mamba2SSD(dim=shape[-1], d_state=8, expand=2).to(DEVICE)
    out = ssd(torch.randn(*shape, device=DEVICE))
    assert out.shape == shape, f"Expected (2,256,64), got {out.shape}"

def test_mamba2_backward():
    """Backpropagate the summed ``Mamba2SSD`` output and require a NaN-free input gradient."""
    from liquid_flow.mamba2_ssd import Mamba2SSD
    ssd = Mamba2SSD(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 256, 64, device=DEVICE, requires_grad=True)
    ssd(x).sum().backward()
    grad = x.grad
    assert grad is not None, "No gradient on input"
    assert not torch.isnan(grad).any(), "NaN in gradients"

def test_mamba2_block_2d():
    """Check that ``Mamba2Block`` preserves the shape of a (2, 64, 16, 16) input."""
    from liquid_flow.mamba2_ssd import Mamba2Block
    shape = (2, 64, 16, 16)
    block = Mamba2Block(dim=shape[1], d_state=8, expand=2).to(DEVICE)
    out = block(torch.randn(*shape, device=DEVICE))
    assert out.shape == shape, f"Expected (2,64,16,16), got {out.shape}"

def test_mamba2_block_backward():
    """Backpropagate the summed ``Mamba2Block`` output and validate the input gradient.

    Mirrors the other backward checks in this file: the gradient must exist
    and must not contain NaN, so a numerically broken backward pass is
    reported rather than silently passing.
    """
    from liquid_flow.mamba2_ssd import Mamba2Block
    block = Mamba2Block(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True)
    out = block(x)
    loss = out.sum()
    loss.backward()
    assert x.grad is not None, "No gradient on input"
    assert not torch.isnan(x.grad).any(), "NaN in gradients"

def test_mamba2_odd_length():
    """Run ``Mamba2SSD`` on a length-253 sequence, which is not a multiple of ``chunk_size=16``."""
    from liquid_flow.mamba2_ssd import Mamba2SSD
    shape = (2, 253, 64)  # odd sequence length
    ssd = Mamba2SSD(dim=shape[-1], d_state=8, expand=2, chunk_size=16).to(DEVICE)
    out = ssd(torch.randn(*shape, device=DEVICE))
    assert out.shape == shape, f"Expected (2,253,64), got {out.shape}"

# Register and run the Mamba-2 checks in a fixed order.
for _label, _check in (
    ("Mamba2 SSD forward", test_mamba2_forward),
    ("Mamba2 SSD backward (no in-place crash)", test_mamba2_backward),
    ("Mamba2 Block 2D", test_mamba2_block_2d),
    ("Mamba2 Block backward", test_mamba2_block_backward),
    ("Mamba2 odd sequence length", test_mamba2_odd_length),
):
    test(_label, _check)

# ============================================================
# TEST 3: LiquidMamba Block
# ============================================================
print("\n=== 3. LiquidMamba Block ===")

def test_liquid_mamba_forward():
    """Check that ``LiquidMambaBlock`` preserves the shape of a (2, 64, 16, 16) input."""
    from liquid_flow.liquid_flow_block import LiquidMambaBlock
    shape = (2, 64, 16, 16)
    block = LiquidMambaBlock(dim=shape[1], d_state=8, expand=2).to(DEVICE)
    out = block(torch.randn(*shape, device=DEVICE))
    assert out.shape == shape, f"Expected (2,64,16,16), got {out.shape}"

def test_liquid_mamba_backward():
    """Backpropagate the mean ``LiquidMambaBlock`` output and require a NaN-free input gradient."""
    from liquid_flow.liquid_flow_block import LiquidMambaBlock
    block = LiquidMambaBlock(dim=64, d_state=8, expand=2).to(DEVICE)
    x = torch.randn(2, 64, 16, 16, device=DEVICE, requires_grad=True)
    block(x).mean().backward()
    grad = x.grad
    assert grad is not None
    assert not torch.isnan(grad).any()

# Register and run the LiquidMamba checks in a fixed order.
for _label, _check in (
    ("LiquidMamba forward", test_liquid_mamba_forward),
    ("LiquidMamba backward", test_liquid_mamba_backward),
):
    test(_label, _check)

# ============================================================
# TEST 4: Full Backbone
# ============================================================
print("\n=== 4. LiquidFlow Backbone ===")

def test_backbone_forward():
    """Check that ``LiquidFlowBackbone(x, t)`` returns a tensor shaped like ``x`` (2, 4, 16, 16)."""
    from liquid_flow.liquid_flow_block import LiquidFlowBackbone
    config = dict(
        in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8
    )
    model = LiquidFlowBackbone(**config).to(DEVICE)
    x = torch.randn(2, 4, 16, 16, device=DEVICE)  # 128px latent
    t = torch.tensor([100, 500], device=DEVICE)
    out = model(x, t)
    assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}"

def test_backbone_backward():
    """Backpropagate an MSE loss through ``LiquidFlowBackbone`` and validate gradients.

    The input must receive a gradient, and the number of parameters holding a
    non-``None``, NaN-free gradient must equal the number of parameters with
    ``requires_grad`` set.
    """
    from liquid_flow.liquid_flow_block import LiquidFlowBackbone
    config = dict(
        in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=2, d_state=8
    )
    model = LiquidFlowBackbone(**config).to(DEVICE)
    x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True)
    t = torch.tensor([100, 500], device=DEVICE)
    out = model(x, t)
    # Regress against random noise; only the gradient flow matters here.
    F.mse_loss(out, torch.randn_like(out)).backward()
    assert x.grad is not None

    # Tally trainable parameters and parameters with a usable gradient.
    grads_ok = 0
    total_params = 0
    for p in model.parameters():
        if p.requires_grad:
            total_params += 1
        if p.grad is not None and not torch.isnan(p.grad).any():
            grads_ok += 1
    assert grads_ok == total_params, f"Only {grads_ok}/{total_params} params have valid gradients"

def test_backbone_512():
    """Forward a single (1, 4, 64, 64) latent (labelled as 512px) and check the output shape."""
    from liquid_flow.liquid_flow_block import LiquidFlowBackbone
    config = dict(
        in_channels=4, hidden_dim=64, num_stages=2, blocks_per_stage=1, d_state=8
    )
    model = LiquidFlowBackbone(**config).to(DEVICE)
    x = torch.randn(1, 4, 64, 64, device=DEVICE)  # 512px latent
    t = torch.tensor([500], device=DEVICE)
    out = model(x, t)
    assert out.shape == x.shape, f"Expected {x.shape}, got {out.shape}"

# Register and run the backbone checks in a fixed order.
for _label, _check in (
    ("Backbone forward (128px)", test_backbone_forward),
    ("Backbone backward (all grads valid)", test_backbone_backward),
    ("Backbone 512px (64×64 latent)", test_backbone_512),
):
    test(_label, _check)

# ============================================================
# TEST 5: Full Generator + Training Step
# ============================================================
print("\n=== 5. Generator + Training ===")

def test_generator_forward():
    """Check that the 'tiny' generator returns a tensor shaped like its (2, 4, 16, 16) input."""
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    x = torch.randn(2, 4, 16, 16, device=DEVICE)
    t = torch.tensor([100, 500], device=DEVICE)
    prediction = model(x, t)
    assert prediction.shape == x.shape

def test_training_step():
    """Run one ``model.training_step`` call and sanity-check the returned loss dict.

    The dict must contain the keys 'total', 'diffusion' and 'physics', the
    total must be positive, and no value may be NaN.
    """
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    x0 = torch.randn(4, 4, 16, 16, device=DEVICE)
    loss_dict = model.training_step(x0, optimizer, scaler=None, use_amp=False)

    for key in ('total', 'diffusion', 'physics'):
        assert key in loss_dict
    assert loss_dict['total'] > 0
    # NaN is the only value that compares unequal to itself.
    assert not any(v != v for v in loss_dict.values()), "NaN in losses"  # NaN check

def test_training_step_multiple():
    """Run five consecutive training steps on fresh random batches.

    Each step's total loss must not be NaN, and every recorded total must
    stay below 100.
    """
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    losses = []
    for _ in range(5):
        x0 = torch.randn(4, 4, 16, 16, device=DEVICE)
        step_total = model.training_step(
            x0, optimizer, scaler=None, use_amp=False
        )['total']
        losses.append(step_total)
        # NaN is the only value that compares unequal to itself.
        assert not (step_total != step_total), "NaN loss"

    # Losses should not explode
    assert all(value < 100 for value in losses), f"Loss explosion: {losses}"

def test_sampling():
    """Draw two samples with ``model.sample(..., ddim=True)`` in 5 steps and validate them.

    The result must have shape (2, 4, 16, 16) and contain no NaN.
    """
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    model.eval()

    # Sampling is inference only, so autograd tracking is switched off.
    with torch.no_grad():
        samples = model.sample(batch_size=2, steps=5, ddim=True, progress=False)

    expected_shape = (2, 4, 16, 16)
    assert samples.shape == expected_shape, f"Expected (2,4,16,16), got {samples.shape}"
    assert not torch.isnan(samples).any(), "NaN in samples"

# Register and run the generator/training checks in a fixed order.
for _label, _check in (
    ("Generator forward", test_generator_forward),
    ("Full training step (fwd+bwd+optim)", test_training_step),
    ("5 training steps (no explosion)", test_training_step_multiple),
    ("DDIM sampling", test_sampling),
):
    test(_label, _check)

# ============================================================
# TEST 6: Physics Loss
# ============================================================
print("\n=== 6. Physics Loss ===")

def test_physics_loss():
    """Check that ``PhysicsRegularizer`` yields a differentiable total in train mode.

    The first element of the returned pair must require grad, and
    backpropagating it must populate the input's gradient.
    """
    from liquid_flow.physics_loss import PhysicsRegularizer
    phys = PhysicsRegularizer().to(DEVICE)
    phys.train()
    x = torch.randn(2, 4, 16, 16, device=DEVICE, requires_grad=True)
    # The second element of the returned pair is not inspected here.
    total, _ = phys(x)
    assert total.requires_grad, "Physics loss not differentiable"
    total.backward()
    assert x.grad is not None

def test_ddim_estimator():
    """Check that ``DDIMEstimator.estimate_x0`` returns a NaN-free tensor shaped like ``x_t``."""
    from liquid_flow.physics_loss import DDIMEstimator
    latent_shape = (2, 4, 16, 16)
    x_t = torch.randn(*latent_shape, device=DEVICE)
    eps = torch.randn(*latent_shape, device=DEVICE)
    alpha_bar = torch.tensor([0.9, 0.5], device=DEVICE)
    estimate = DDIMEstimator.estimate_x0(x_t, eps, alpha_bar)
    assert estimate.shape == x_t.shape
    assert not torch.isnan(estimate).any()

# Register and run the physics-loss checks in a fixed order.
for _label, _check in (
    ("Physics loss (differentiable)", test_physics_loss),
    ("DDIM estimator", test_ddim_estimator),
):
    test(_label, _check)

# ============================================================
# TEST 7: Performance / Speed
# ============================================================
print("\n=== 7. Performance ===")

def test_speed():
    """Measure the average forward+backward time of the 'tiny' model.

    One untimed warmup step is followed by five timed steps on a fixed
    (4, 4, 16, 16) batch. The mean time per step is printed and must be
    below 60 seconds.

    Timing uses ``time.perf_counter()``, a monotonic high-resolution clock,
    so the measurement cannot be skewed by system clock adjustments the way
    ``time.time()`` can.
    """
    from liquid_flow.generator import create_liquidflow
    model = create_liquidflow(variant='tiny', image_size=128).to(DEVICE)
    model.train()

    x = torch.randn(4, 4, 16, 16, device=DEVICE, requires_grad=True)
    t = torch.randint(0, 1000, (4,), device=DEVICE)

    # Warmup
    out = model(x, t)
    loss = out.sum()
    loss.backward()

    # Make sure the warmup work has finished before the clock starts.
    if DEVICE == 'cuda':
        torch.cuda.synchronize()

    # Timed run
    start = time.perf_counter()
    for _ in range(5):
        out = model(x, t)
        loss = out.sum()
        loss.backward()
        # Wait for queued GPU work so each step is fully counted.
        if DEVICE == 'cuda':
            torch.cuda.synchronize()
    elapsed = (time.perf_counter() - start) / 5

    print(f"       → Forward+backward: {elapsed*1000:.1f} ms/batch (tiny, bs=4, 16×16)")
    assert elapsed < 60, f"Too slow: {elapsed:.1f}s per step"

def test_param_count():
    """Print the total parameter count of the 'tiny', 'small' and 'base' variants.

    Purely informational: it only fails if building a variant raises.
    """
    from liquid_flow.generator import create_liquidflow
    for variant in ('tiny', 'small', 'base'):
        model = create_liquidflow(variant=variant, image_size=128)
        n = 0
        for param in model.parameters():
            n += param.numel()
        print(f"       → {variant}: {n:,} params ({n/1e6:.1f}M)")

# Register and run the performance checks in a fixed order.
for _label, _check in (
    ("Speed (< 60s per step)", test_speed),
    ("Param counts", test_param_count),
):
    test(_label, _check)

# ============================================================
# SUMMARY
# ============================================================
# Report the overall tally; exit with status 1 when any check failed so that
# callers (shell, CI) can detect the failure.
print("\n" + "=" * 70)
total = len(errors) + passed
print(f"Results: {passed}/{total} tests passed")

if not errors:
    print("ALL TESTS PASSED ✓")
    print("Model is GPU-trainable, no sequential bottlenecks, gradients flow correctly.")
    print("=" * 70)
else:
    print(f"\n{'='*70}")
    print("FAILURES:")
    for e in errors:
        print(f"  {e}")
    print(f"{'='*70}")
    sys.exit(1)