liquid-diffusion / validate.py
krystv's picture
Upload validate.py
8589a61 verified
#!/usr/bin/env python3
"""
LiquidDiffusion — Self-Contained Validation Script

Run this to verify everything works before training:
    python validate.py

Tests:
    1. Model construction at all scales
    2. Forward pass at multiple resolutions
    3. Backward pass and gradient flow
    4. 20-step training stability with random data
    5. Sampling with Euler ODE
    6. Timestep sensitivity
    7. Full trainer pipeline
    8. Architecture properties
    9. VRAM estimation
"""
import sys
import math
import time
import copy
# Print the banner, then verify the third-party dependencies. A missing
# package prints an install hint and exits with status 1. The imports stay at
# module scope because later sections use `torch` and `F` directly.
print("=" * 70)
print("LiquidDiffusion Validation Suite")
print("=" * 70)

# Check imports
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
except ImportError:
    print("✗ PyTorch not installed. Run: pip install torch torchvision")
    sys.exit(1)
else:
    print(f"✓ PyTorch {torch.__version__}")

try:
    # This import doubles as the availability check for torchvision.
    from torchvision.utils import save_image
except ImportError:
    print("✗ torchvision not installed. Run: pip install torchvision")
    sys.exit(1)
else:
    print("✓ torchvision")
# Import our modules
# Each project module is imported separately so the failure message names the
# module that could not be loaded; any failure exits with status 1.
try:
    from liquid_diffusion.model import (
        LiquidDiffusionUNet,
        liquid_diffusion_tiny,
        liquid_diffusion_small,
        liquid_diffusion_base,
        SinusoidalTimeEmbedding,
        ParallelCfCBlock,
        AdaLN,
    )
except ImportError as e:
    print(f"✗ Failed to import model: {e}")
    print(" Make sure you're in the liquid-diffusion directory")
    sys.exit(1)
else:
    print("✓ liquid_diffusion.model")

try:
    from liquid_diffusion.trainer import (
        RectifiedFlowTrainer,
        get_cosine_schedule_with_warmup,
    )
except ImportError as e:
    print(f"✗ Failed to import trainer: {e}")
    sys.exit(1)
else:
    print("✓ liquid_diffusion.trainer")
# Pick the compute device and, when CUDA is available, report the GPU found.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"\nDevice: {device}")
if device == 'cuda':
    gpu_props = torch.cuda.get_device_properties(0)
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    # The properties object exposes the memory size as `total_memory`;
    # the previous `total_mem` spelling raised AttributeError on GPU machines.
    print(f"VRAM: {gpu_props.total_memory / 1e9:.1f} GB")
# Harness state shared by the helpers below: `all_passed` is cleared by
# fail(), `test_num` is incremented by test() to number section headers.
all_passed, test_num = True, 0
def test(name):
    """Open a new test section: bump the global counter and print its header.

    Args:
        name: Human-readable title shown in the section header.
    """
    global test_num
    test_num = test_num + 1
    header = f"\n--- Test {test_num}: {name} ---"
    print(header)
def fail(msg):
    """Report a failed check and mark the whole run as failed.

    Sets the module-level `all_passed` flag to False, which the final summary
    reads to decide which verdict to print.

    Args:
        msg: Description of what went wrong.
    """
    global all_passed
    all_passed = False
    # The marker was mojibake ("βœ—") for the UTF-8 ballot X.
    print(f" ✗ FAIL: {msg}")
def ok(msg):
    """Report a passed check (or an informational line) with a check mark.

    Args:
        msg: Text printed after the marker.
    """
    # The marker was mojibake ("βœ“") for the UTF-8 check mark.
    print(f" ✓ {msg}")
# =========================================================================
test("Model Construction & Parameter Count")
# =========================================================================
# Build each preset once and report its parameter count. A preset whose
# factory (or count_params) raises is recorded as a failure and the loop
# moves on to the next preset.
model_factories = {
    "tiny": liquid_diffusion_tiny,
    "small": liquid_diffusion_small,
    "base": liquid_diffusion_base,
}
for name, factory in model_factories.items():
    try:
        m = factory()
        total, trainable = m.count_params()
        millions = total / 1e6
        ok(f"{name:8s}: {total:>12,} params ({millions:.1f}M)")
        del m
    except Exception as e:
        fail(f"{name}: {e}")
# =========================================================================
test("Forward Pass (multiple resolutions)")
# =========================================================================
# Run the tiny model on a batch of 2 random images at each resolution and
# check that the output keeps the input shape and contains no NaN/Inf.
model = liquid_diffusion_tiny()
for res in [32, 64, 128]:
    try:
        x = torch.randn(2, 3, res, res)
        t = torch.rand(2)
        # Only the output values are inspected, so skip autograd bookkeeping.
        with torch.no_grad():
            out = model(x, t)
        # Explicit raises instead of `assert`: asserts are removed under
        # `python -O`, which would silently turn these checks into no-ops.
        if out.shape != x.shape:
            raise ValueError(f"Shape mismatch: {out.shape} vs {x.shape}")
        if torch.isnan(out).any():
            raise ValueError("NaN in output")
        if torch.isinf(out).any():
            raise ValueError("Inf in output")
        ok(f"{res}x{res}: output shape {out.shape}, range [{out.min():.4f}, {out.max():.4f}]")
    except Exception as e:
        fail(f"{res}x{res}: {e}")
# =========================================================================
test("Backward Pass (gradient flow)")
# =========================================================================
# Backpropagate the mean of one forward pass through a fresh tiny model and
# inspect the gradient every parameter received.
model = liquid_diffusion_tiny()
x = torch.randn(2, 3, 64, 64)
t = torch.rand(2)
out = model(x, t)
loss = out.mean()
loss.backward()

total_params = 0
nan_grads = 0
zero_grads = 0
grad_maxes = []  # largest |grad| of every parameter that received a gradient
for name_p, p in model.named_parameters():
    total_params += 1
    if p.grad is None:
        continue
    if torch.isnan(p.grad).any():
        nan_grads += 1
    # Computed once and reused for both the zero check and the range report.
    peak = p.grad.abs().max().item()
    if peak == 0:
        zero_grads += 1
    grad_maxes.append(peak)
params_with_grad = len(grad_maxes)

if nan_grads > 0:
    fail(f"NaN gradients in {nan_grads}/{total_params} parameters")
elif params_with_grad == 0:
    fail("No parameters received gradients")
else:
    ok(f"{params_with_grad}/{total_params} params have gradients, {nan_grads} NaN, {zero_grads} zero-only")

# Check gradient magnitude distribution
# Guarded: min()/max() raise ValueError on an empty list, which would crash
# the script right after the "no gradients" failure was reported.
if grad_maxes:
    ok(f"Gradient |max| range: [{min(grad_maxes):.2e}, {max(grad_maxes):.2e}]")
# =========================================================================
test("Training Stability (20 steps, random data)")
# =========================================================================
# Twenty AdamW steps on random tensors. Each step regresses the model output
# at the interpolated point x_t = (1 - t) * x0 + t * x1 onto x1 - x0.
# NOTE: `model` is reused by the sampling and timestep sections that follow.
model = liquid_diffusion_tiny()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
losses = []
for step in range(20):
    model.train()

    # Draw order (x0, x1, t) is kept fixed so the RNG stream is unchanged.
    x0 = torch.randn(4, 3, 64, 64)
    x1 = torch.randn_like(x0)
    t_val = torch.rand(4)

    t_expand = t_val[:, None, None, None]  # broadcast t over C, H, W
    v_target = x1 - x0
    x_t = (1 - t_expand) * x0 + t_expand * x1

    loss = F.mse_loss(model(x_t, t_val), v_target)

    optimizer.zero_grad()
    loss.backward()
    gn = torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    loss_value = loss.item()
    losses.append(loss_value)
    if step % 5 == 0:
        print(f" Step {step:3d}: loss={loss_value:.4f}, grad_norm={gn.item():.4f}")

# Two independent verdicts: every loss is finite, and none reached 100.
stable = all(math.isfinite(value) for value in losses)
not_exploding = max(losses) < 100

if not stable:
    fail("NaN or Inf detected in loss")
else:
    ok(f"No NaN/Inf in any of {len(losses)} steps")

if not not_exploding:
    fail(f"Loss exploded: max={max(losses):.4f}")
else:
    ok(f"Loss range: [{min(losses):.4f}, {max(losses):.4f}]")
# =========================================================================
test("Sampling (Euler ODE, 10 steps)")
# =========================================================================
# Fixed-step Euler integration with the model left over from the training
# section: start from Gaussian noise at t = 1 and step down to t = 1/num_steps,
# subtracting the predicted velocity times dt at every step.
model.eval()
with torch.no_grad():
    z = torch.randn(2, 3, 64, 64)
    num_steps = 10
    dt = 1.0 / num_steps
    for i in reversed(range(1, num_steps + 1)):
        t_s = torch.full((2,), i / num_steps)
        v = model(z, t_s)
        z = z - dt * v
    z = torch.clamp(z, -1, 1)

has_nan = torch.isnan(z).any()
has_inf = torch.isinf(z).any()
if has_nan:
    fail("NaN in generated samples")
elif has_inf:
    fail("Inf in generated samples")
else:
    ok(f"Shape: {z.shape}, range: [{z.min():.3f}, {z.max():.3f}], "
       f"mean: {z.mean():.4f}, std: {z.std():.4f}")
# =========================================================================
test("Timestep Sensitivity")
# =========================================================================
# Feed one fixed input at several timesteps and confirm that the output
# actually depends on t (compares the two extreme timesteps).
model.eval()
x = torch.randn(1, 3, 64, 64)
outputs = {}
with torch.no_grad():
    for t_val in (0.01, 0.25, 0.5, 0.75, 0.99):
        out = model(x, torch.tensor([t_val]))
        outputs[t_val] = out
        print(f" t={t_val:.2f}: mean={out.mean():.6f}, std={out.std():.6f}, |max|={out.abs().max():.6f}")

# Check that different timesteps give different outputs
diff_01_099 = torch.mean(torch.abs(outputs[0.01] - outputs[0.99])).item()
# Kept as a `>` comparison so a NaN difference still counts as a failure.
timestep_matters = diff_01_099 > 1e-6
if timestep_matters:
    ok(f"Timestep affects output (mean diff t=0.01 vs t=0.99: {diff_01_099:.6f})")
else:
    fail(f"Timestep has no effect on output (diff={diff_01_099:.10f})")
# =========================================================================
test("Full Trainer Pipeline (CPU, 5 steps)")
# =========================================================================
# End-to-end smoke test of RectifiedFlowTrainer on CPU: five train steps,
# one sampling call, then a checkpoint round trip into a second trainer.
model = liquid_diffusion_tiny()
trainer = RectifiedFlowTrainer(
    model=model,
    lr=1e-4,
    device='cpu',
    use_amp=False,  # AMP is switched off for this CPU run
    time_sampling='logit_normal',
)
for step in range(5):
    x0 = torch.randn(2, 3, 64, 64)
    metrics = trainer.train_step(x0)
    if step == 0:
        print(f" Step {step}: loss={metrics['loss']:.4f}, grad_norm={metrics['grad_norm']:.4f}")

# Only the loss of the last step is checked.
if math.isnan(metrics['loss']):
    fail("Trainer produced NaN loss")
else:
    ok(f"Trainer works: final loss={metrics['loss']:.4f}, step={trainer.step}")

# Test sampling
try:
    samples = trainer.sample(batch_size=1, image_size=64, num_steps=5, use_ema=True)
    if torch.isnan(samples).any():
        fail("Trainer sampling produced NaN")
    else:
        ok(f"Sampling works: shape={samples.shape}, range=[{samples.min():.3f}, {samples.max():.3f}]")
except Exception as e:
    fail(f"Sampling failed: {e}")

# Test checkpoint save/load
try:
    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        ckpt_path = os.path.join(tmpdir, 'test_ckpt.pt')
        trainer.save_checkpoint(ckpt_path)
        # Create new trainer and load
        model2 = liquid_diffusion_tiny()
        trainer2 = RectifiedFlowTrainer(model2, lr=1e-4, device='cpu', use_amp=False)
        trainer2.load_checkpoint(ckpt_path)
        # Explicit raise instead of `assert`: asserts are removed under
        # `python -O`, which would make a step mismatch pass silently.
        if trainer2.step != trainer.step:
            raise ValueError(f"Step mismatch: {trainer2.step} vs {trainer.step}")
        ok(f"Checkpoint save/load works (step={trainer2.step})")
except Exception as e:
    fail(f"Checkpoint save/load failed: {e}")
# =========================================================================
test("Architecture Properties")
# =========================================================================
# Structural checks on a fresh tiny model: count the blocks held in the
# encoder, bottleneck, and decoder containers, and verify that no submodule
# name mentions attention.
m = liquid_diffusion_tiny()
total_blocks = (
    sum(len(stage) for stage in m.encoder_blocks)
    + len(m.bottleneck)
    + sum(len(stage) for stage in m.decoder_blocks)
)

# Count attention layers (should be 0)
# Detection is by substring match on the qualified module names.
attention_count = sum(
    1
    for name_m, _module in m.named_modules()
    if 'attention' in name_m.lower() or 'attn' in name_m.lower()
)
# Previously this was always reported with ok(), so a non-zero count could
# never fail the run despite the "should be 0" expectation.
if attention_count == 0:
    ok(f"Attention layers: {attention_count} (should be 0)")
else:
    fail(f"Attention layers: {attention_count} (should be 0)")
ok(f"LiquidCfC blocks: {total_blocks}")
ok("Training: Rectified Flow (MSE velocity)")
ok("Sampling: Euler ODE (configurable steps)")
# =========================================================================
test("VRAM Estimation for Colab T4 (16GB)")
# =========================================================================
# Back-of-the-envelope memory estimate per configuration. Nothing is run on a
# GPU here; the numbers come from parameter counts and a simple activation
# formula, and this section is informational only (it never calls fail()).
for name, factory, res, bs in [
    ("tiny @256px bs=8", liquid_diffusion_tiny, 256, 8),
    ("tiny @256px bs=4", liquid_diffusion_tiny, 256, 4),
    ("small @256px bs=4", liquid_diffusion_small, 256, 4),
    ("base @512px bs=2", liquid_diffusion_base, 512, 2),
    ("tiny @512px bs=4", liquid_diffusion_tiny, 512, 4),
]:
    m = factory()
    tp = sum(p.numel() for p in m.parameters())
    # Conservative VRAM estimate:
    # params (fp16) + gradients (fp32) + Adam states (2×fp32) + activations
    param_gb = tp * 2 / 1e9  # fp16
    grad_gb = tp * 4 / 1e9   # fp32
    optim_gb = tp * 8 / 1e9  # Adam: 2× fp32
    # Activation estimate: ~4 bytes per element, scale with resolution and batch
    act_gb = bs * res * res * max(m.channels) * 4 * len(m.channels) * 2 / 1e9
    total_gb = param_gb + grad_gb + optim_gb + act_gb
    # 15 GB cut-off, just under the 16 GB named in the section title
    # (presumably headroom; confirm if the threshold matters).
    fits = "✓ fits T4" if total_gb < 15 else "✗ too large"
    print(f" {name:25s}: {tp/1e6:5.1f}M params, ~{total_gb:5.1f}GB {fits}")
    del m
# =========================================================================
# FINAL SUMMARY
# =========================================================================
# Print the overall verdict from the `all_passed` flag maintained by fail().
print("\n" + "=" * 70)
if all_passed:
    print("✅ ALL TESTS PASSED")
    print("\nReady for training! Open the Colab notebook:")
    print(" LiquidDiffusion_Training.ipynb")
else:
    print("❌ SOME TESTS FAILED — check output above")
print("=" * 70)

# Exit non-zero on failure so shells, CI jobs, and `&&` chains can detect it;
# previously the script always exited with status 0.
if not all_passed:
    sys.exit(1)