| from __future__ import annotations |
|
|
| import importlib |
| import inspect |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
| from experiments.run_paper_image_pipeline import load_config, stage_planning, stage_prediction, stage_probe, stage_train |
| from experiments.shared.src.methods import METHODS, PAPER_LEARNED_METHODS, TRADITIONAL_METHODS |
| from experiments.shared.src.utils.parameter_count import parameter_count_by_component |
| from experiments.shared.src.vision.clean_renderer import render_clean_boat_array |
| from experiments.shared.src.vision.pose_from_image import estimate_pose_from_clean_image |
| from experiments.evaluate_flowmo_latent_probes import fit_ridge, predict_ridge, regression_metrics |
|
|
|
|
def angle_error(a: float, b: float) -> float:
    """Return the absolute wrapped difference between two angles in radians.

    The raw difference ``a - b`` is passed through ``arctan2(sin, cos)`` so
    that angles differing by whole turns compare as equal; the result is the
    magnitude of that wrapped difference as a plain Python ``float``.
    """
    delta = a - b
    # arctan2(sin, cos) folds the difference back into the principal range.
    wrapped = np.arctan2(np.sin(delta), np.cos(delta))
    return float(abs(wrapped))
|
|
|
|
def test_formal_method_registry_contains_only_paper_methods() -> None:
    """Pin the method registry to the four learned and four traditional methods.

    Checks the exact contents (and order) of both name lists, that ``METHODS``
    has exactly those keys, and that every entry carries the category string
    matching the list it belongs to.
    """
    expected_learned = ["flowmo", "leworldmodel", "planet", "tdmpc2"]
    expected_traditional = [
        "pid_los_controller",
        "no_flow_los_controller",
        "current_estimator_los_controller",
        "oracle_flow_los_controller",
    ]

    assert PAPER_LEARNED_METHODS == expected_learned
    # The registry must hold the union of both lists and nothing else.
    assert set(METHODS) == {*expected_learned, *expected_traditional}
    assert TRADITIONAL_METHODS == expected_traditional

    for name in PAPER_LEARNED_METHODS:
        assert METHODS[name].category == "A_learned_world_model"
    for name in TRADITIONAL_METHODS:
        assert METHODS[name].category == "B_traditional_controller"
|
|
|
|
def test_paper_method_directories_follow_public_layout() -> None:
    """Check the on-disk layout of every registered method directory.

    Each method under ``experiments/`` must provide ``src/``, ``result/`` and a
    ``README.md``; learned methods must additionally provide ``checkpoint/``.
    Paths are relative, so the check depends on the current working directory.
    """
    experiments_root = Path("experiments")

    for method in (*PAPER_LEARNED_METHODS, *TRADITIONAL_METHODS):
        method_root = experiments_root / method
        assert method_root.is_dir()
        for required_dir in ("src", "result"):
            assert (method_root / required_dir).is_dir()
        assert (method_root / "README.md").is_file()

    # Only the learned methods are required to have a checkpoint directory.
    for method in PAPER_LEARNED_METHODS:
        assert (experiments_root / method / "checkpoint").is_dir()
|
|
|
|
def test_public_experiment_docs_only_describe_ab_categories() -> None:
    """Ensure the public experiment docs contain none of the forbidden tokens.

    Every listed markdown file is read and scanned for each forbidden
    substring; the first hit fails the test with a message naming the file
    and the offending token. Paths are relative to the working directory.
    """
    docs = [
        Path("experiments/README.md"),
        Path("experiments/BASELINES.md"),
        Path("experiments/EXPERIMENT_MATRIX.md"),
        Path("experiments/METHOD_AUDIT.md"),
        Path("experiments/TASK_PLAN.md"),
    ]
    # NOTE(review): the first two tokens are built by concatenation, presumably
    # so this file does not itself contain the literal forbidden strings.
    forbidden = [
        "Category " + "C",
        "C " + "类",
        "full agent",
        "full-agent",
        "paper_table",
        "related_work_full_agent",
    ]
    for path in docs:
        # Decode explicitly as UTF-8 (assumed encoding of the docs): the
        # forbidden list contains a non-ASCII token, and the locale default
        # encoding could raise or mis-decode and silently miss it.
        text = path.read_text(encoding="utf-8")
        for token in forbidden:
            assert token not in text, f"{path} contains forbidden token {token!r}"
|
|
|
|
def test_paper_pipeline_uses_stage_specific_precision() -> None:
    """Check the ``--precision`` value each pipeline stage puts in its command.

    With no precision override on the argument object, the train, prediction
    and probe stage commands must carry ``bf16`` and the planning stage
    command must carry ``fp32``.
    """

    class Args:
        # Explicitly set options.
        checkpoint_name = "paper.pt"
        device = "cuda"
        prediction_out = "prediction.json"
        probe_results = "probe.json"
        planning_out = "planning"
        gif_stride = 1
        gif_duration_ms = 55

        # No precision override is supplied.
        precision = None

        # Data / training options left unset.
        methods = None
        train_episodes = None
        test_episodes = None
        train_windows = None
        test_windows = None
        batch_size = None
        steps = None
        checkpoint_interval = None
        train_workers = None
        num_workers = None

        # Planning options left unset.
        planning_episodes = None
        max_steps = None
        make_gifs = None
        planning_workers = None

        # CEM planner options left unset.
        cem_horizon = None
        cem_population = None
        cem_elites = None
        cem_iterations = None
        cem_action_std = None
        cem_knots = None
        cem_w_route = None
        cem_w_heading_goal = None
        cem_w_progress = None
        cem_w_action = None
        cem_w_smooth = None
        cem_w_boundary = None
        cem_w_goal = None
        cem_w_path = None
        cem_w_lookahead = None
        cem_w_via = None
        cem_route_horizon_distance = None
        cem_boundary_margin = None

    cfg = load_config("experiments/shared/config/paper_image.json")

    # Each entry builds one stage command lazily, paired with the precision
    # string expected right after its "--precision" flag.
    stage_expectations = [
        (lambda: stage_train(cfg, Args), "bf16"),
        (lambda: stage_prediction(cfg, Args), "bf16"),
        (lambda: stage_probe(cfg, Args), "bf16"),
        (lambda: stage_planning(cfg, Args, "reach_target", "twin", "uniform"), "fp32"),
    ]
    for build_command, expected_precision in stage_expectations:
        command = build_command()
        flag_position = command.index("--precision")
        assert command[flag_position + 1] == expected_precision
|
|
|
|
def test_train_test_and_planning_use_same_flow_families() -> None:
    """Pin the data sources, prediction report path and flow-family list.

    Loads the paper image config and compares the relevant entries against
    their expected literal values, including the order of the flow families.
    """
    expected_flow_families = [
        "noflow",
        "uniform",
        "vortex_center",
        "double_gyre",
        "source_sink",
        "source_sink_pair",
        "gradient",
        "shear",
        "turbulent_patch",
        "random_fourier",
    ]

    cfg = load_config("experiments/shared/config/paper_image.json")
    data_cfg = cfg["data"]

    assert data_cfg["train_source"] == "data/paper/train.npz"
    assert data_cfg["test_source"] == "data/paper/test.npz"
    assert cfg["prediction_eval"]["out"] == "experiments/reports/paper_prediction.json"
    assert cfg["flow_families"] == expected_flow_families
|
|
|
|
def test_flowmo_probe_stage_uses_frozen_checkpoint_and_test_split() -> None:
    """Check the command that the probe stage builds.

    The command must reference the latent-probe evaluation module, pass the
    requested checkpoint name after ``--checkpoint-name``, and mention the
    ``test:data/paper/test.npz:480`` source spec somewhere in its arguments.
    """

    class Args:
        checkpoint_name = "paper.pt"
        probe_results = "probe.json"
        device = "cuda"
        precision = None
        test_episodes = None
        num_workers = None

    cfg = load_config("experiments/shared/config/paper_image.json")
    command = stage_probe(cfg, Args)

    assert "experiments.evaluate_flowmo_latent_probes" in command

    checkpoint_flag = "--checkpoint-name"
    assert checkpoint_flag in command
    assert command[command.index(checkpoint_flag) + 1] == "paper.pt"

    # The source spec is searched in the space-joined command line.
    assert "test:data/paper/test.npz:480" in " ".join(command)
|
|
|
|
def test_paper_learned_world_models_are_parameter_matched() -> None:
    """Require the learned models' total parameter counts to be within 3%.

    For each learned method the default config is built, a model is created
    from it, and its ``"total"`` parameter count is collected; the ratio of
    the largest to the smallest total must stay below 1.03.
    """

    def total_parameters(method: str):
        # Config first, then the model built from it, mirroring package layout.
        config = importlib.import_module(f"experiments.{method}.src.config").default_config()
        model = importlib.import_module(f"experiments.{method}.src.model").build_model(config)
        return parameter_count_by_component(model)["total"]

    totals = [total_parameters(method) for method in PAPER_LEARNED_METHODS]
    largest, smallest = max(totals), min(totals)
    assert largest / smallest < 1.03
|
|
|
|
def test_learned_world_models_do_not_use_pose_extractor() -> None:
    """Scan learned-model source code for references to the pose extractor.

    The source text of every learned method's model module, followed by the
    shared training / evaluation / planning scripts, must not contain either
    banned identifier.
    """
    banned_identifiers = ("pose_from_image", "estimate_pose_from_clean_image")

    module_names = [f"experiments.{method}.src.model" for method in PAPER_LEARNED_METHODS]
    module_names += [
        "experiments.train_image_world_models",
        "experiments.evaluate_image_world_models",
        "experiments.evaluate_image_planning",
    ]

    for module_name in module_names:
        module_source = inspect.getsource(importlib.import_module(module_name))
        for identifier in banned_identifiers:
            assert identifier not in module_source
|
|
|
|
def test_clean_image_pose_extractor_is_accurate_for_traditional_controllers() -> None:
    """Round-trip a rendered boat image through the pose extractor.

    For both boat types and several headings, a clean image is rendered from a
    known state and the pose recovered from it must match the state's first
    two components to within 0.05 and its heading to within 0.09 rad.
    """
    visual_scale = 2.5
    headings = (-2.4, -1.0, 0.2, 1.6, 2.9)

    for boat in ("twin", "triangle"):
        for theta in headings:
            state = np.array([5.0, 5.0, theta, 0.0, 0.0, 0.0], dtype=np.float32)
            image = render_clean_boat_array(state, boat, image_size=160, visual_scale=visual_scale)
            pose = estimate_pose_from_clean_image(image, visual_scale=visual_scale)

            # NOTE(review): pose[2] / pose[3] look like cos / sin of the
            # heading, given how arctan2 is applied here - confirm.
            recovered_theta = float(np.arctan2(pose[3], pose[2]))
            position_error = np.linalg.norm(pose[:2] - state[:2])

            assert position_error < 0.05
            assert angle_error(recovered_theta, theta) < 0.09
|
|
|
|
def test_linear_probe_regression_recovers_linear_signal() -> None:
    """Fit the ridge probe on noisy linear data and require a near-perfect fit.

    Synthetic targets are a random linear map of random features plus small
    Gaussian noise; the probe is fitted on the first 96 rows and the mean R^2
    reported for the remaining rows must exceed 0.99.
    """
    n_samples, n_train = 128, 96
    rng = np.random.default_rng(123)

    # Draw order (features, weights, noise) is fixed so the seed reproduces.
    features = rng.normal(size=(n_samples, 5)).astype(np.float32)
    weights = rng.normal(size=(5, 2)).astype(np.float32)
    noise = rng.normal(size=(n_samples, 2)).astype(np.float32)
    targets = features @ weights + 0.01 * noise

    probe = fit_ridge(features[:n_train], targets[:n_train], alpha=1.0e-4)
    held_out_pred = predict_ridge(probe, features[n_train:])
    metrics = regression_metrics(held_out_pred, targets[n_train:], ["a", "b"])

    assert metrics["r2_mean"] > 0.99
|
|