from __future__ import annotations
import importlib
import inspect
from pathlib import Path
import numpy as np
from experiments.run_paper_image_pipeline import load_config, stage_planning, stage_prediction, stage_probe, stage_train
from experiments.shared.src.methods import METHODS, PAPER_LEARNED_METHODS, TRADITIONAL_METHODS
from experiments.shared.src.utils.parameter_count import parameter_count_by_component
from experiments.shared.src.vision.clean_renderer import render_clean_boat_array
from experiments.shared.src.vision.pose_from_image import estimate_pose_from_clean_image
from experiments.evaluate_flowmo_latent_probes import fit_ridge, predict_ridge, regression_metrics
def angle_error(a: float, b: float) -> float:
    """Return the magnitude of the wrapped difference between two angles.

    The raw difference ``a - b`` is passed through ``arctan2(sin, cos)``
    before the absolute value is taken, so angles that differ by a whole
    number of turns compare as (numerically) equal.

    Args:
        a: First angle in radians.
        b: Second angle in radians.

    Returns:
        The non-negative wrapped difference as a plain Python ``float``.
    """
    delta = a - b
    wrapped = np.arctan2(np.sin(delta), np.cos(delta))
    return float(abs(wrapped))
def test_formal_method_registry_contains_only_paper_methods() -> None:
    """Pin the registry to exactly the expected learned and traditional methods.

    Checks the two ordered name lists, that ``METHODS`` has exactly those
    names as keys, and that each entry carries the category string matching
    the list it belongs to.
    """
    expected_learned = ["flowmo", "leworldmodel", "planet", "tdmpc2"]
    expected_traditional = [
        "pid_los_controller",
        "no_flow_los_controller",
        "current_estimator_los_controller",
        "oracle_flow_los_controller",
    ]

    assert PAPER_LEARNED_METHODS == expected_learned
    # The registry keys must be the union of both lists and nothing else.
    assert set(METHODS) == {*expected_learned, *expected_traditional}
    assert TRADITIONAL_METHODS == expected_traditional

    for learned_name in PAPER_LEARNED_METHODS:
        assert METHODS[learned_name].category == "A_learned_world_model"
    for controller_name in TRADITIONAL_METHODS:
        assert METHODS[controller_name].category == "B_traditional_controller"
def test_paper_method_directories_follow_public_layout() -> None:
    """Check that every method has the expected directory skeleton.

    Paths are relative, so they resolve against the current working
    directory. Every method needs ``src/``, ``result/`` and ``README.md``;
    the learned methods additionally need ``checkpoint/``.
    """
    experiments_root = Path("experiments")

    for method in [*PAPER_LEARNED_METHODS, *TRADITIONAL_METHODS]:
        method_root = experiments_root / method
        assert method_root.is_dir()
        for required_dir in ("src", "result"):
            assert (method_root / required_dir).is_dir()
        assert (method_root / "README.md").is_file()

    # Only the learned methods are required to have a checkpoint directory.
    for method in PAPER_LEARNED_METHODS:
        assert (experiments_root / method / "checkpoint").is_dir()
def test_public_experiment_docs_only_describe_ab_categories() -> None:
    """Fail if any public experiment doc contains a forbidden phrase.

    Each listed markdown file is read in full and searched for every
    forbidden token with a plain substring test. Paths are relative, so
    they resolve against the current working directory.

    Raises:
        AssertionError: If a forbidden token occurs in one of the docs; the
            message names the file and the token.
        FileNotFoundError: If one of the listed docs does not exist.
    """
    docs = [
        Path("experiments/README.md"),
        Path("experiments/BASELINES.md"),
        Path("experiments/EXPERIMENT_MATRIX.md"),
        Path("experiments/METHOD_AUDIT.md"),
        Path("experiments/TASK_PLAN.md"),
    ]
    # The first two tokens are built by concatenation -- presumably so that
    # this file does not itself contain the literal phrases it forbids.
    forbidden = [
        "Category " + "C",
        "C " + "类",
        "full agent",
        "full-agent",
        "paper_table",
        "related_work_full_agent",
    ]
    for path in docs:
        # Decode explicitly as UTF-8: the token list contains non-ASCII text,
        # and the locale default encoding is not UTF-8 on every platform.
        text = path.read_text(encoding="utf-8")
        for token in forbidden:
            assert token not in text, f"{path} contains forbidden token {token!r}"
def test_paper_pipeline_uses_stage_specific_precision() -> None:
    """Check the ``--precision`` value each pipeline stage puts in its command.

    ``Args`` stands in for the parsed command-line namespace. Its
    ``precision`` attribute is ``None``, so the value asserted below is not
    supplied through ``Args``. Train, prediction and probe commands must
    carry ``bf16``; the planning command must carry ``fp32``.
    """

    class Args:
        # Output names and the few options that carry concrete values.
        checkpoint_name = "paper.pt"
        device = "cuda"
        prediction_out = "prediction.json"
        probe_results = "probe.json"
        planning_out = "planning"
        gif_stride = 1
        gif_duration_ms = 55

        # Left unset so no precision is requested through the arguments.
        precision = None

        # Remaining non-cem options, all unset.
        methods = None
        train_episodes = None
        test_episodes = None
        train_windows = None
        test_windows = None
        batch_size = None
        steps = None
        checkpoint_interval = None
        train_workers = None
        num_workers = None
        planning_episodes = None
        planning_workers = None
        max_steps = None
        make_gifs = None

        # cem_* options, all unset.
        cem_horizon = None
        cem_population = None
        cem_elites = None
        cem_iterations = None
        cem_action_std = None
        cem_knots = None
        cem_w_route = None
        cem_w_heading_goal = None
        cem_w_progress = None
        cem_w_action = None
        cem_w_smooth = None
        cem_w_boundary = None
        cem_w_goal = None
        cem_w_path = None
        cem_w_lookahead = None
        cem_w_via = None
        cem_route_horizon_distance = None
        cem_boundary_margin = None

    cfg = load_config("experiments/shared/config/paper_image.json")

    def precision_from(cmd: list[str]) -> str:
        # The precision value is the token right after the flag.
        flag_position = cmd.index("--precision")
        return cmd[flag_position + 1]

    # Builders are wrapped in lambdas so each command is created only when
    # its own assertion runs, in the listed order.
    stage_expectations = [
        (lambda: stage_train(cfg, Args), "bf16"),
        (lambda: stage_prediction(cfg, Args), "bf16"),
        (lambda: stage_probe(cfg, Args), "bf16"),
        (lambda: stage_planning(cfg, Args, "reach_target", "twin", "uniform"), "fp32"),
    ]
    for build_command, expected_precision in stage_expectations:
        assert precision_from(build_command()) == expected_precision
def test_train_test_and_planning_use_same_flow_families() -> None:
    """Pin the data sources, prediction report path and flow-family list.

    Loads the paper image config and compares the relevant entries against
    fixed expected values; the flow-family list is compared in order.
    """
    cfg = load_config("experiments/shared/config/paper_image.json")

    data_section = cfg["data"]
    assert data_section["train_source"] == "data/paper/train.npz"
    assert data_section["test_source"] == "data/paper/test.npz"

    report_path = cfg["prediction_eval"]["out"]
    assert report_path == "experiments/reports/paper_prediction.json"

    expected_families = [
        "noflow",
        "uniform",
        "vortex_center",
        "double_gyre",
        "source_sink",
        "source_sink_pair",
        "gradient",
        "shear",
        "turbulent_patch",
        "random_fourier",
    ]
    assert cfg["flow_families"] == expected_families
def test_flowmo_probe_stage_uses_frozen_checkpoint_and_test_split() -> None:
    """Check the probe-stage command names the probe module, checkpoint and split.

    The command returned by ``stage_probe`` must contain the latent-probe
    evaluation module, pass ``paper.pt`` after ``--checkpoint-name``, and
    include the ``test:data/paper/test.npz:480`` spec somewhere in its tokens.
    """

    class Args:
        # Minimal stand-in for the parsed arguments passed to stage_probe.
        checkpoint_name = "paper.pt"
        probe_results = "probe.json"
        device = "cuda"
        precision = None
        test_episodes = None
        num_workers = None

    cfg = load_config("experiments/shared/config/paper_image.json")
    cmd = stage_probe(cfg, Args)

    assert "experiments.evaluate_flowmo_latent_probes" in cmd
    assert "--checkpoint-name" in cmd
    value_position = cmd.index("--checkpoint-name") + 1
    assert cmd[value_position] == "paper.pt"

    # The split spec is searched for in the space-joined command line.
    command_line = " ".join(cmd)
    assert "test:data/paper/test.npz:480" in command_line
def test_paper_learned_world_models_are_parameter_matched() -> None:
    """Check the learned models' total parameter counts are within 3%.

    Each method's default config and model are built through its own
    ``config`` and ``model`` modules; the largest total divided by the
    smallest must be below 1.03.
    """

    def total_parameters(method):
        # Build the method's default model and read its "total" count.
        config_module = importlib.import_module(f"experiments.{method}.src.config")
        default_cfg = config_module.default_config()
        model_module = importlib.import_module(f"experiments.{method}.src.model")
        built_model = model_module.build_model(default_cfg)
        return parameter_count_by_component(built_model)["total"]

    totals = [total_parameters(method) for method in PAPER_LEARNED_METHODS]
    largest, smallest = max(totals), min(totals)
    assert largest / smallest < 1.03
def test_learned_world_models_do_not_use_pose_extractor() -> None:
    """Check that learned-model code never mentions the pose extractor.

    The source text of every learned method's ``model`` module, followed by
    the three shared image train/eval/planning scripts, is searched for the
    extractor's module name and function name.
    """
    banned_names = ("pose_from_image", "estimate_pose_from_clean_image")

    # Per-method model modules first, then the shared scripts.
    modules_to_scan = [
        f"experiments.{method}.src.model" for method in PAPER_LEARNED_METHODS
    ]
    modules_to_scan += [
        "experiments.train_image_world_models",
        "experiments.evaluate_image_world_models",
        "experiments.evaluate_image_planning",
    ]

    for module_name in modules_to_scan:
        source = inspect.getsource(importlib.import_module(module_name))
        for banned in banned_names:
            assert banned not in source
def test_clean_image_pose_extractor_is_accurate_for_traditional_controllers() -> None:
    """Check the pose recovered from a clean render against the rendered state.

    For both boat types and several headings, a boat at (5, 5) is rendered
    and passed to the pose estimator. The first two pose entries must lie
    within 0.05 of the true position, and the heading rebuilt from entries
    2 and 3 must be within 0.09 rad of the true one.
    """
    headings = [-2.4, -1.0, 0.2, 1.6, 2.9]
    scale = 2.5

    for boat in ["twin", "triangle"]:
        for theta in headings:
            state = np.array([5.0, 5.0, theta, 0.0, 0.0, 0.0], dtype=np.float32)
            image = render_clean_boat_array(
                state, boat, image_size=160, visual_scale=scale
            )
            pose = estimate_pose_from_clean_image(image, visual_scale=scale)

            # pose[2] and pose[3] are used as the x and y arguments of
            # arctan2 -- presumably the cosine and sine of the heading.
            pred_theta = float(np.arctan2(pose[3], pose[2]))

            position_error = np.linalg.norm(pose[:2] - state[:2])
            assert position_error < 0.05
            assert angle_error(pred_theta, theta) < 0.09
def test_linear_probe_regression_recovers_linear_signal() -> None:
    """Check the ridge probe recovers a nearly noise-free linear map.

    Synthetic targets are a random linear function of 5 features plus small
    Gaussian noise. The probe is fitted on the first 96 samples and scored
    on the remaining 32; the reported ``r2_mean`` must exceed 0.99.
    """
    rng = np.random.default_rng(123)
    n_samples, n_features, n_targets = 128, 5, 2
    n_train = 96

    # Draw order (features, weights, noise) fixes the generated values.
    features = rng.normal(size=(n_samples, n_features)).astype(np.float32)
    weights = rng.normal(size=(n_features, n_targets)).astype(np.float32)
    noise = rng.normal(size=(n_samples, n_targets)).astype(np.float32)
    targets = features @ weights + 0.01 * noise

    model = fit_ridge(features[:n_train], targets[:n_train], alpha=1.0e-4)
    held_out_pred = predict_ridge(model, features[n_train:])
    metrics = regression_metrics(held_out_pred, targets[n_train:], ["a", "b"])

    assert metrics["r2_mean"] > 0.99
|