"""
LiquidDiffusion - Self-Contained Validation Script

Run this to verify everything works before training:
    python validate.py

Tests:
1. Model construction at all scales
2. Forward pass at multiple resolutions
3. Backward pass and gradient flow
4. 20-step training stability with random data
5. Sampling with Euler ODE
6. Timestep sensitivity
7. Full trainer pipeline (train step, sampling, checkpointing)
8. Architecture properties
9. VRAM estimation
"""

import sys
import math

print("=" * 70)
print("LiquidDiffusion Validation Suite")
print("=" * 70)

try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    print(f"✓ PyTorch {torch.__version__}")
except ImportError:
    print("✗ PyTorch not installed. Run: pip install torch torchvision")
    sys.exit(1)

try:
    from torchvision.utils import save_image
    print("✓ torchvision")
except ImportError:
    print("✗ torchvision not installed. Run: pip install torchvision")
    sys.exit(1)

try:
    from liquid_diffusion.model import (
        LiquidDiffusionUNet, liquid_diffusion_tiny,
        liquid_diffusion_small, liquid_diffusion_base,
        SinusoidalTimeEmbedding, ParallelCfCBlock, AdaLN,
    )
    print("✓ liquid_diffusion.model")
except ImportError as e:
    print(f"✗ Failed to import model: {e}")
    print("  Make sure you're in the liquid-diffusion directory")
    sys.exit(1)

try:
    from liquid_diffusion.trainer import RectifiedFlowTrainer, get_cosine_schedule_with_warmup
    print("✓ liquid_diffusion.trainer")
except ImportError as e:
    print(f"✗ Failed to import trainer: {e}")
    sys.exit(1)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"\nDevice: {device}")
if device == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

all_passed = True
test_num = 0

def test(name):
    global test_num
    test_num += 1
    print(f"\n--- Test {test_num}: {name} ---")


def fail(msg):
    global all_passed
    all_passed = False
    print(f"  ✗ FAIL: {msg}")


def ok(msg):
    print(f"  ✓ {msg}")

test("Model Construction & Parameter Count")

for name, factory in [("tiny", liquid_diffusion_tiny),
                      ("small", liquid_diffusion_small),
                      ("base", liquid_diffusion_base)]:
    try:
        m = factory()
        total, trainable = m.count_params()
        ok(f"{name:8s}: {total:>12,} params ({total/1e6:.1f}M)")
        del m
    except Exception as e:
        fail(f"{name}: {e}")

test("Forward Pass (multiple resolutions)")

model = liquid_diffusion_tiny()
for res in [32, 64, 128]:
    try:
        x = torch.randn(2, 3, res, res)
        t = torch.rand(2)
        out = model(x, t)
        assert out.shape == x.shape, f"Shape mismatch: {out.shape} vs {x.shape}"
        assert not torch.isnan(out).any(), "NaN in output"
        assert not torch.isinf(out).any(), "Inf in output"
        ok(f"{res}x{res}: output shape {out.shape}, range [{out.min():.4f}, {out.max():.4f}]")
    except Exception as e:
        fail(f"{res}x{res}: {e}")

test("Backward Pass (gradient flow)")

model = liquid_diffusion_tiny()
x = torch.randn(2, 3, 64, 64, requires_grad=False)
t = torch.rand(2)
out = model(x, t)
loss = out.mean()
loss.backward()

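# Walk every parameter and confirm it received a finite gradient. Counting
# zero-only gradients as well makes dead or detached branches in the graph
# visible here, long before they would show up as bad samples.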
total_params = 0
params_with_grad = 0
nan_grads = 0
zero_grads = 0
for name_p, p in model.named_parameters():
    total_params += 1
    if p.grad is not None:
        params_with_grad += 1
        if torch.isnan(p.grad).any():
            nan_grads += 1
        if p.grad.abs().max() == 0:
            zero_grads += 1

if nan_grads > 0:
    fail(f"NaN gradients in {nan_grads}/{total_params} parameters")
elif params_with_grad == 0:
    fail("No parameters received gradients")
else:
    ok(f"{params_with_grad}/{total_params} params have gradients, {nan_grads} NaN, {zero_grads} zero-only")

grad_maxes = [p.grad.abs().max().item() for p in model.parameters() if p.grad is not None]
ok(f"Gradient |max| range: [{min(grad_maxes):.2e}, {max(grad_maxes):.2e}]")

test("Training Stability (20 steps, random data)")

model = liquid_diffusion_tiny()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)

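# Rectified flow objective (Liu et al., 2022): draw t ~ U(0, 1), form the
# straight-line interpolation x_t = (1 - t) * x0 + t * x1, and regress the
# model output onto the constant velocity v = x1 - x0 with plain MSE.
# Random tensors stand in for data; this only checks numerical stability.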
losses = []
for step in range(20):
    model.train()
    x0 = torch.randn(4, 3, 64, 64)
    x1 = torch.randn_like(x0)
    t_val = torch.rand(4)
    t_expand = t_val[:, None, None, None]
    x_t = (1 - t_expand) * x0 + t_expand * x1
    v_target = x1 - x0

    v_pred = model(x_t, t_val)
    loss = F.mse_loss(v_pred, v_target)

    optimizer.zero_grad()
    loss.backward()
    gn = torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    losses.append(loss.item())
    if step % 5 == 0:
        print(f"  Step {step:3d}: loss={loss.item():.4f}, grad_norm={gn.item():.4f}")

stable = all(not math.isnan(l) and not math.isinf(l) for l in losses)
not_exploding = max(losses) < 100

if stable:
    ok(f"No NaN/Inf in any of {len(losses)} steps")
else:
    fail("NaN or Inf detected in loss")

if not_exploding:
    ok(f"Loss range: [{min(losses):.4f}, {max(losses):.4f}]")
else:
    fail(f"Loss exploded: max={max(losses):.4f}")

test("Sampling (Euler ODE, 10 steps)")

model.eval()
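# Euler integration of dz/dt = v(z, t) from t=1 (pure noise) down to t=0,
# in num_steps uniform steps of size dt; the final clamp keeps samples in
# the [-1, 1] range images are assumed to be normalized to.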
with torch.no_grad():
    z = torch.randn(2, 3, 64, 64)
    num_steps = 10
    dt = 1.0 / num_steps
    for i in range(num_steps, 0, -1):
        t_s = torch.full((2,), i / num_steps)
        v = model(z, t_s)
        z = z - v * dt
    z = z.clamp(-1, 1)

if torch.isnan(z).any():
    fail("NaN in generated samples")
elif torch.isinf(z).any():
    fail("Inf in generated samples")
else:
    ok(f"Shape: {z.shape}, range: [{z.min():.3f}, {z.max():.3f}], "
       f"mean: {z.mean():.4f}, std: {z.std():.4f}")

test("Timestep Sensitivity")

model.eval()
x = torch.randn(1, 3, 64, 64)
outputs = {}
for t_val in [0.01, 0.25, 0.5, 0.75, 0.99]:
    with torch.no_grad():
        out = model(x, torch.tensor([t_val]))
    outputs[t_val] = out
    print(f"  t={t_val:.2f}: mean={out.mean():.6f}, std={out.std():.6f}, |max|={out.abs().max():.6f}")

# If time conditioning works, outputs at the extreme timesteps must differ.
diff_01_099 = (outputs[0.01] - outputs[0.99]).abs().mean().item()
if diff_01_099 > 1e-6:
    ok(f"Timestep affects output (mean diff t=0.01 vs t=0.99: {diff_01_099:.6f})")
else:
    fail(f"Timestep has no effect on output (diff={diff_01_099:.10f})")

test("Full Trainer Pipeline (CPU, 5 steps)")

model = liquid_diffusion_tiny()
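# 'logit_normal' time sampling concentrates training timesteps around the
# middle of [0, 1], where the velocity target is hardest to fit (the scheme
# used by SD3-style rectified flow training).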
trainer = RectifiedFlowTrainer(
    model=model,
    lr=1e-4,
    device='cpu',
    use_amp=False,
    time_sampling='logit_normal',
)

for step in range(5):
    x0 = torch.randn(2, 3, 64, 64)
    metrics = trainer.train_step(x0)
    if step == 0:
        print(f"  Step {step}: loss={metrics['loss']:.4f}, grad_norm={metrics['grad_norm']:.4f}")

if math.isnan(metrics['loss']):
    fail("Trainer produced NaN loss")
else:
    ok(f"Trainer works: final loss={metrics['loss']:.4f}, step={trainer.step}")

try:
    samples = trainer.sample(batch_size=1, image_size=64, num_steps=5, use_ema=True)
    if torch.isnan(samples).any():
        fail("Trainer sampling produced NaN")
    else:
        ok(f"Sampling works: shape={samples.shape}, range=[{samples.min():.3f}, {samples.max():.3f}]")
except Exception as e:
    fail(f"Sampling failed: {e}")

try:
    import tempfile, os
    with tempfile.TemporaryDirectory() as tmpdir:
        ckpt_path = os.path.join(tmpdir, 'test_ckpt.pt')
        trainer.save_checkpoint(ckpt_path)

        # Load into a fresh model/trainer and confirm state round-trips.
        model2 = liquid_diffusion_tiny()
        trainer2 = RectifiedFlowTrainer(model2, lr=1e-4, device='cpu', use_amp=False)
        trainer2.load_checkpoint(ckpt_path)

        assert trainer2.step == trainer.step, f"Step mismatch: {trainer2.step} vs {trainer.step}"
        ok(f"Checkpoint save/load works (step={trainer2.step})")
except Exception as e:
    fail(f"Checkpoint save/load failed: {e}")

test("Architecture Properties")

m = liquid_diffusion_tiny()
total_blocks = (sum(len(s) for s in m.encoder_blocks) +
                len(m.bottleneck) +
                sum(len(s) for s in m.decoder_blocks))

# The design claim is an attention-free UNet: count any module whose name
# suggests attention and expect zero.
attention_count = 0
for name_m, module in m.named_modules():
    if 'attention' in name_m.lower() or 'attn' in name_m.lower():
        attention_count += 1

ok(f"Attention layers: {attention_count} (should be 0)")
ok(f"LiquidCfC blocks: {total_blocks}")
ok("Training: Rectified Flow (MSE velocity)")
ok("Sampling: Euler ODE (configurable steps)")

test("VRAM Estimation for Colab T4 (16GB)")

for name, factory, res, bs in [
    ("tiny @256px bs=8", liquid_diffusion_tiny, 256, 8),
    ("tiny @256px bs=4", liquid_diffusion_tiny, 256, 4),
    ("small @256px bs=4", liquid_diffusion_small, 256, 4),
    ("base @512px bs=2", liquid_diffusion_base, 512, 2),
    ("tiny @512px bs=4", liquid_diffusion_tiny, 512, 4),
]:
    m = factory()
    tp = sum(p.numel() for p in m.parameters())

    # Assumed mixed-precision budget: fp16 params (2 B each), fp32 grads
    # (4 B each), AdamW moments (2 x fp32 = 8 B each), plus a crude
    # activation estimate.
    param_gb = tp * 2 / 1e9
    grad_gb = tp * 4 / 1e9
    optim_gb = tp * 8 / 1e9

    act_gb = bs * res * res * max(m.channels) * 4 * len(m.channels) * 2 / 1e9
    total_gb = param_gb + grad_gb + optim_gb + act_gb
    fits = "✓ fits T4" if total_gb < 15 else "✗ too large"
    print(f"  {name:25s}: {tp/1e6:5.1f}M params, ~{total_gb:5.1f}GB {fits}")
    del m
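# NOTE: these are rough upper bounds; they ignore the CUDA context, cuDNN
# workspaces, and allocator fragmentation, so keep 1-2 GB of headroom.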

print("\n" + "=" * 70)
if all_passed:
    print("✅ ALL TESTS PASSED")
    print("\nReady for training! Open the Colab notebook:")
    print("    LiquidDiffusion_Training.ipynb")
else:
    print("❌ SOME TESTS FAILED - check output above")
print("=" * 70)