File size: 2,963 Bytes
195056b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import torch

from mapgs.data import collate_samples
from mapgs.train import Trainer, compute_step_losses
from mapgs.train.trainer import _move_batch
from mapgs.eval import Evaluator, psnr, d_rmse
from .conftest import requires_cuda


@requires_cuda
def test_sample_shapes(cfg, dataset):
    """Check the tensor shapes of the first dataset sample against the config.

    Verifies the context image stack, the context ids, the anchor positions
    and that supervision depth and supervision images share a leading size.
    """
    from mapgs.data.synthetic import _context_frames

    sample = dataset[0]

    # Context frames clamp+dedup within num_frames, so the expected count is
    # taken from the helper rather than read off the config directly.
    # NOTE(review): the factor 3 is presumably views/cameras per frame — confirm.
    n_ctx = 3 * len(_context_frames(cfg))

    ctx_shape = sample.ctx_images.shape
    assert ctx_shape == (n_ctx, 3, cfg.data.height, cfg.data.width)

    n_tids = sample.ctx_tids.shape[0]
    assert n_tids == n_ctx

    anchor_shape = sample.anchor_pos.shape
    assert anchor_shape == (cfg.model.tokens.n_map, 3)

    # Depth and image supervision must describe the same number of views.
    n_depth = sample.sup_depth.shape[0]
    n_images = sample.sup_images.shape[0]
    assert n_depth == n_images


@requires_cuda
def test_collate_and_dynamic_padding(cfg, dataset):
    """Collate the first two samples and check every leading dimension is 2.

    The ``"dynamic"`` entry may be ``None``; its fields are only checked when
    it is present.
    """
    n_samples = 2
    collated = collate_samples([dataset[idx] for idx in range(n_samples)])
    assert collated["ctx_images"].shape[0] == n_samples

    dyn = collated["dynamic"]
    if dyn is None:
        return  # nothing dynamic in this batch, nothing further to verify

    for field in ("box_centers", "valid"):
        assert dyn[field].shape[0] == n_samples


@requires_cuda
def test_step_losses_finite_and_backward(cfg, dataset):
    """One loss computation must be finite, log every term, and yield gradients."""
    device = "cuda"
    trainer = Trainer(cfg)

    pair = [dataset[0], dataset[1]]
    batch = _move_batch(collate_samples(pair), device)

    # NOTE(review): the positional 100 is presumably the training step index — confirm.
    total, log = compute_step_losses(
        trainer.model,
        trainer.ras,
        batch,
        trainer.criterion,
        100,
        cfg,
        device,
    )
    assert torch.isfinite(total)

    # Every named loss term has to appear in the returned log.
    expected_terms = ("rgb", "mapdepth", "vis", "vert", "free_space")
    missing = [term for term in expected_terms if term not in log]
    assert not missing

    total.backward()

    # Sum of |grad| over all parameters that received a gradient; it must be
    # strictly positive, i.e. the backward pass reached the model.
    grad_mass = sum(
        param.grad.abs().sum()
        for param in trainer.model.parameters()
        if param.grad is not None
    )
    assert grad_mass > 0


@requires_cuda
def test_amp_train_step_runs(cfg, dataset):
    """Run a single train step with AMP enabled and check the logged scalars.

    ``cfg.train.amp`` is forced to ``True`` for the duration of the test and
    restored in ``finally`` so the shared ``cfg`` fixture is left unchanged
    even when an assertion fails.
    """
    import math

    # guards the autocast (bf16) -> fp32 rendering boundary
    prev = cfg.train.amp
    cfg.train.amp = True
    try:
        trainer = Trainer(cfg)
        log = trainer.train_step(collate_samples([dataset[0], dataset[1]]))
        # A self-equality check only rules out NaN; isfinite also rejects
        # +/-inf, which is what an overflowing loss would produce.
        assert math.isfinite(float(log["total"]))
        assert log["grad_norm"] >= 0
    finally:
        cfg.train.amp = prev


@requires_cuda
def test_training_decreases_loss(cfg, dataset):
    """Train for 20 steps on the same two samples and expect the loss to drop.

    Passes when the best total loss among the last ten steps is lower than
    the total loss of the very first step.
    """
    n_steps = 20
    trainer = Trainer(cfg)

    def run_step():
        # The batch is rebuilt on every step, exactly one collate per train step.
        return trainer.train_step(collate_samples([dataset[0], dataset[1]]))["total"]

    history = [run_step() for _ in range(n_steps)]

    first_loss = history[0]
    late_losses = history[n_steps // 2:]
    assert min(late_losses) < first_loss  # improved over the first step


@requires_cuda
def test_metrics_sanity():
    """Identity checks: PSNR of an image with itself is high, D-RMSE of equal depths is ~0."""
    image = torch.rand(3, 16, 16, device="cuda")
    self_psnr = psnr(image, image)
    assert self_psnr > 50

    # Shift the random depths by 1 so every value is at least 1.
    depth = 1 + torch.rand(1, 8, 8, device="cuda")
    self_rmse = d_rmse(depth, depth.clone())
    assert self_rmse < 1e-5


@requires_cuda
def test_evaluator_runs(cfg, dataset):
    """Smoke-test the three Evaluator entry points on a freshly built model.

    Only the presence of the expected result keys is checked, not their values.
    """
    trainer = Trainer(cfg)
    evaluator = Evaluator(trainer.model, cfg, device="cuda")

    interp_metrics = evaluator.interpolation(dataset, max_scenes=2)
    for metric in ("PSNR", "D-RMSE"):
        assert metric in interp_metrics

    # A single shift value; the sweep result must be keyed by it.
    shift = 2.0
    sweep_metrics = evaluator.extrapolation_sweep(
        dataset,
        shifts=[shift],
        max_scenes=2,
        frame=cfg.data.num_frames // 2,
    )
    assert shift in sweep_metrics

    lane_metrics = evaluator.lane_consistency(
        dataset,
        max_scenes=2,
        frame=cfg.data.num_frames // 2,
    )
    assert "lane_mIoU" in lane_metrics