#!/usr/bin/env python3
"""
LiquidDiffusion — Self-Contained Validation Script

Run this to verify everything works before training:
    python validate.py

Tests:
1. Model construction at all scales
2. Forward pass at multiple resolutions
3. Backward pass and gradient flow
4. 20-step training stability with random data
5. Sampling with Euler ODE
6. Timestep sensitivity
7. Full trainer pipeline (train step, sampling, checkpointing)
8. Architecture properties
9. VRAM estimation for a Colab T4 (16 GB)
"""

import sys
import math

print("=" * 70)
print("LiquidDiffusion Validation Suite")
print("=" * 70)

# Check imports
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    print(f"✓ PyTorch {torch.__version__}")
except ImportError:
    print("✗ PyTorch not installed. Run: pip install torch torchvision")
    sys.exit(1)

try:
    from torchvision.utils import save_image
    print("✓ torchvision")
except ImportError:
    print("✗ torchvision not installed. Run: pip install torchvision")
    sys.exit(1)

# Import our modules
try:
    from liquid_diffusion.model import (
        LiquidDiffusionUNet, liquid_diffusion_tiny,
        liquid_diffusion_small, liquid_diffusion_base,
        SinusoidalTimeEmbedding, ParallelCfCBlock, AdaLN,
    )
    print("βœ“ liquid_diffusion.model")
except ImportError as e:
    print(f"βœ— Failed to import model: {e}")
    print("  Make sure you're in the liquid-diffusion directory")
    sys.exit(1)

try:
    from liquid_diffusion.trainer import RectifiedFlowTrainer, get_cosine_schedule_with_warmup
    print("βœ“ liquid_diffusion.trainer")
except ImportError as e:
    print(f"βœ— Failed to import trainer: {e}")
    sys.exit(1)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"\nDevice: {device}")
if device == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"VRAM: {torch.cuda.get_device_properties(0).total_mem / 1e9:.1f} GB")

all_passed = True
test_num = 0

def test(name):
    global test_num
    test_num += 1
    print(f"\n--- Test {test_num}: {name} ---")

def fail(msg):
    global all_passed
    all_passed = False
    print(f"  ✗ FAIL: {msg}")

def ok(msg):
    print(f"  ✓ {msg}")


# =========================================================================
test("Model Construction & Parameter Count")
# =========================================================================
for name, factory in [("tiny", liquid_diffusion_tiny), ("small", liquid_diffusion_small), ("base", liquid_diffusion_base)]:
    try:
        m = factory()
        total, trainable = m.count_params()
        ok(f"{name:8s}: {total:>12,} params ({total/1e6:.1f}M)")
        del m
    except Exception as e:
        fail(f"{name}: {e}")

# =========================================================================
test("Forward Pass (multiple resolutions)")
# =========================================================================
model = liquid_diffusion_tiny()
for res in [32, 64, 128]:
    try:
        x = torch.randn(2, 3, res, res)
        t = torch.rand(2)
        out = model(x, t)
        assert out.shape == x.shape, f"Shape mismatch: {out.shape} vs {x.shape}"
        assert not torch.isnan(out).any(), "NaN in output"
        assert not torch.isinf(out).any(), "Inf in output"
        ok(f"{res}x{res}: output shape {out.shape}, range [{out.min():.4f}, {out.max():.4f}]")
    except Exception as e:
        fail(f"{res}x{res}: {e}")

# =========================================================================
test("Backward Pass (gradient flow)")
# =========================================================================
model = liquid_diffusion_tiny()
x = torch.randn(2, 3, 64, 64, requires_grad=False)
t = torch.rand(2)
out = model(x, t)
loss = out.mean()
loss.backward()

total_params = 0
params_with_grad = 0
nan_grads = 0
zero_grads = 0
for name_p, p in model.named_parameters():
    total_params += 1
    if p.grad is not None:
        params_with_grad += 1
        if torch.isnan(p.grad).any():
            nan_grads += 1
        if p.grad.abs().max() == 0:
            zero_grads += 1

if nan_grads > 0:
    fail(f"NaN gradients in {nan_grads}/{total_params} parameters")
elif params_with_grad == 0:
    fail("No parameters received gradients")
else:
    ok(f"{params_with_grad}/{total_params} params have gradients, {nan_grads} NaN, {zero_grads} zero-only")

# Check gradient magnitude distribution
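# (rough heuristic, not enforced: uniformly tiny maxima across many tensors
# hint at dead branches; very large maxima hint at missing normalization)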
grad_maxes = [p.grad.abs().max().item() for p in model.parameters() if p.grad is not None]
ok(f"Gradient |max| range: [{min(grad_maxes):.2e}, {max(grad_maxes):.2e}]")

# =========================================================================
test("Training Stability (20 steps, random data)")
# =========================================================================
model = liquid_diffusion_tiny()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)

losses = []
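# Rectified-flow objective: pick a point on the straight path
# x_t = (1 - t) * x0 + t * x1 between "data" x0 and noise x1, and regress
# the constant path velocity v = x1 - x0. Both endpoints are random here,
# since this test only checks optimization stability, not sample quality.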
for step in range(20):
    model.train()
    x0 = torch.randn(4, 3, 64, 64)
    x1 = torch.randn_like(x0)
    t_val = torch.rand(4)
    t_expand = t_val[:, None, None, None]
    x_t = (1 - t_expand) * x0 + t_expand * x1
    v_target = x1 - x0
    
    v_pred = model(x_t, t_val)
    loss = F.mse_loss(v_pred, v_target)
    
    optimizer.zero_grad()
    loss.backward()
    gn = torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()
    
    losses.append(loss.item())
    if step % 5 == 0:
        print(f"    Step {step:3d}: loss={loss.item():.4f}, grad_norm={gn.item():.4f}")

stable = all(not math.isnan(l) and not math.isinf(l) for l in losses)
not_exploding = max(losses) < 100

if stable:
    ok(f"No NaN/Inf in any of {len(losses)} steps")
else:
    fail("NaN or Inf detected in loss")

if not_exploding:
    ok(f"Loss range: [{min(losses):.4f}, {max(losses):.4f}]")
else:
    fail(f"Loss exploded: max={max(losses):.4f}")

# =========================================================================
test("Sampling (Euler ODE, 10 steps)")
# =========================================================================
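# Explicit Euler integration of the learned ODE: start from z ~ N(0, I) at
# t = 1 and step backwards to t = 0 with z <- z - v(z, t) * dt. More steps
# mean more model evaluations but lower integration error.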
model.eval()
with torch.no_grad():
    z = torch.randn(2, 3, 64, 64)
    num_steps = 10
    dt = 1.0 / num_steps
    for i in range(num_steps, 0, -1):
        t_s = torch.full((2,), i / num_steps)
        v = model(z, t_s)
        z = z - v * dt
    z = z.clamp(-1, 1)
    
    if torch.isnan(z).any():
        fail("NaN in generated samples")
    elif torch.isinf(z).any():
        fail("Inf in generated samples")
    else:
        ok(f"Shape: {z.shape}, range: [{z.min():.3f}, {z.max():.3f}], "
           f"mean: {z.mean():.4f}, std: {z.std():.4f}")

# =========================================================================
test("Timestep Sensitivity")
# =========================================================================
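# With a fixed input, the predicted velocity should vary with t: the time
# embedding modulates activations (via AdaLN), so near-identical outputs at
# t = 0.01 and t = 0.99 would indicate a broken conditioning path.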
model.eval()
x = torch.randn(1, 3, 64, 64)
outputs = {}
for t_val in [0.01, 0.25, 0.5, 0.75, 0.99]:
    with torch.no_grad():
        out = model(x, torch.tensor([t_val]))
    outputs[t_val] = out
    print(f"    t={t_val:.2f}: mean={out.mean():.6f}, std={out.std():.6f}, |max|={out.abs().max():.6f}")

# Check that different timesteps give different outputs
diff_01_099 = (outputs[0.01] - outputs[0.99]).abs().mean().item()
if diff_01_099 > 1e-6:
    ok(f"Timestep affects output (mean diff t=0.01 vs t=0.99: {diff_01_099:.6f})")
else:
    fail(f"Timestep has no effect on output (diff={diff_01_099:.10f})")

# =========================================================================
test("Full Trainer Pipeline (CPU, 5 steps)")
# =========================================================================
model = liquid_diffusion_tiny()

trainer = RectifiedFlowTrainer(
    model=model,
    lr=1e-4,
    device='cpu',
    use_amp=False,  # torch.cuda.amp requires a CUDA device
    time_sampling='logit_normal',
)
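# 'logit_normal' concentrates sampled timesteps near the middle of [0, 1],
# where the velocity target is typically hardest to fit (SD3-style).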

for step in range(5):
    x0 = torch.randn(2, 3, 64, 64)
    metrics = trainer.train_step(x0)
    if step == 0:
        print(f"    Step {step}: loss={metrics['loss']:.4f}, grad_norm={metrics['grad_norm']:.4f}")

if math.isnan(metrics['loss']):
    fail("Trainer produced NaN loss")
else:
    ok(f"Trainer works: final loss={metrics['loss']:.4f}, step={trainer.step}")

# Test sampling
try:
    samples = trainer.sample(batch_size=1, image_size=64, num_steps=5, use_ema=True)
    if torch.isnan(samples).any():
        fail("Trainer sampling produced NaN")
    else:
        ok(f"Sampling works: shape={samples.shape}, range=[{samples.min():.3f}, {samples.max():.3f}]")
except Exception as e:
    fail(f"Sampling failed: {e}")

# Test checkpoint save/load
try:
    import tempfile, os
    with tempfile.TemporaryDirectory() as tmpdir:
        ckpt_path = os.path.join(tmpdir, 'test_ckpt.pt')
        trainer.save_checkpoint(ckpt_path)
        
        # Create new trainer and load
        model2 = liquid_diffusion_tiny()
        trainer2 = RectifiedFlowTrainer(model2, lr=1e-4, device='cpu', use_amp=False)
        trainer2.load_checkpoint(ckpt_path)
        
        assert trainer2.step == trainer.step, f"Step mismatch: {trainer2.step} vs {trainer.step}"
        ok(f"Checkpoint save/load works (step={trainer2.step})")
except Exception as e:
    fail(f"Checkpoint save/load failed: {e}")

# =========================================================================
test("Architecture Properties")
# =========================================================================
m = liquid_diffusion_tiny()
total_blocks = (sum(len(s) for s in m.encoder_blocks) + 
                len(m.bottleneck) + 
                sum(len(s) for s in m.decoder_blocks))

# Count attention layers (should be 0)
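# (the model is attention-free by design: spatial mixing comes from CfC
# blocks and conditioning from AdaLN, so any 'attn' module is a regression)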
attention_count = 0
for name_m, module in m.named_modules():
    if 'attention' in name_m.lower() or 'attn' in name_m.lower():
        attention_count += 1

ok(f"Attention layers: {attention_count} (should be 0)")
ok(f"LiquidCfC blocks: {total_blocks}")
ok(f"Training: Rectified Flow (MSE velocity)")
ok(f"Sampling: Euler ODE (configurable steps)")

# =========================================================================
test("VRAM Estimation for Colab T4 (16GB)")
# =========================================================================
for name, factory, res, bs in [
    ("tiny @256px bs=8", liquid_diffusion_tiny, 256, 8),
    ("tiny @256px bs=4", liquid_diffusion_tiny, 256, 4),
    ("small @256px bs=4", liquid_diffusion_small, 256, 4),
    ("base @512px bs=2", liquid_diffusion_base, 512, 2),
    ("tiny @512px bs=4", liquid_diffusion_tiny, 512, 4),
]:
    m = factory()
    tp = sum(p.numel() for p in m.parameters())
    # Conservative VRAM estimate:
    # params (fp16) + gradients (fp32) + Adam states (2×fp32) + activations
    param_gb = tp * 2 / 1e9   # fp16
    grad_gb = tp * 4 / 1e9    # fp32
    optim_gb = tp * 8 / 1e9   # Adam: 2× fp32
    # Activation estimate: ~4 bytes per element, scaled with resolution and batch
    act_gb = bs * res * res * max(m.channels) * 4 * len(m.channels) * 2 / 1e9
    total_gb = param_gb + grad_gb + optim_gb + act_gb
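    # Rough upper bound: it ignores AMP activation savings and any gradient
    # checkpointing, so measured usage should come in below this figure.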
    fits = "✓ fits T4" if total_gb < 15 else "✗ too large"
    print(f"    {name:25s}: {tp/1e6:5.1f}M params, ~{total_gb:5.1f}GB  {fits}")
    del m


# =========================================================================
# FINAL SUMMARY
# =========================================================================
print("\n" + "=" * 70)
if all_passed:
    print("βœ… ALL TESTS PASSED")
    print("\nReady for training! Open the Colab notebook:")
    print("  LiquidDiffusion_Training.ipynb")
else:
    print("❌ SOME TESTS FAILED β€” check output above")
print("=" * 70)