| | """ |
| | Unit tests for the 'remain static' option in the StopCube task. |
| | |
| | Test increment and saturation logic of the 'remain static' option in vqa_options._options_stopcube: |
| | - Each time 'remain static' is selected, solve_hold_obj_absTimestep(absTimestep) is called internally; |
| | - absTimestep increases as 100 -> 200 -> ... and does not exceed final_target (computed from steps_press and interval); |
| | - If env.elapsed_steps goes backward, internal state should reset and start from 100 next time. |
| | """ |
| | from pathlib import Path |
| | import importlib.util |
| | import sys |
| | import types |
| |
|
| | from tests._shared.repo_paths import find_repo_root |
| |
|
| |
|
| | |
| | PLANNER_SYMBOLS = [ |
| | "grasp_and_lift_peg_side", |
| | "insert_peg", |
| | "solve_button", |
| | "solve_button_ready", |
| | "solve_hold_obj", |
| | "solve_hold_obj_absTimestep", |
| | "solve_pickup", |
| | "solve_pickup_bin", |
| | "solve_push_to_target", |
| | "solve_push_to_target_with_peg", |
| | "solve_putdown_whenhold", |
| | "solve_putonto_whenhold", |
| | "solve_putonto_whenhold_binspecial", |
| | "solve_swingonto", |
| | "solve_swingonto_withDirection", |
| | "solve_swingonto_whenhold", |
| | "solve_strong_reset", |
| | ] |
| |
|
| |
|
| | def _load_vqa_options_module(): |
| | """Inject a stub planner module and load vqa_options, returning (module, hold_calls list).""" |
| | hold_calls = [] |
| |
|
| | planner_stub = types.ModuleType("robomme.robomme_env.utils.subgoal_planner_func") |
| |
|
| | def _noop(*args, **kwargs): |
| | return None |
| |
|
| | for symbol in PLANNER_SYMBOLS: |
| | setattr(planner_stub, symbol, _noop) |
| |
|
| | def _hold_spy(env, planner, absTimestep): |
| | """Record the actual absTimestep passed for each 'remain static' call for assertions.""" |
| | hold_calls.append(int(absTimestep)) |
| | return None |
| |
|
| | planner_stub.solve_hold_obj_absTimestep = _hold_spy |
| |
|
| | robomme_pkg = types.ModuleType("robomme") |
| | robomme_pkg.__path__ = [] |
| | robomme_env_pkg = types.ModuleType("robomme.robomme_env") |
| | robomme_env_pkg.__path__ = [] |
| | utils_pkg = types.ModuleType("robomme.robomme_env.utils") |
| | utils_pkg.__path__ = [] |
| |
|
| | logging_utils_pkg = types.ModuleType("robomme.logging_utils") |
| | import logging |
| | logging_utils_pkg.logger = logging.getLogger("dummy") |
| |
|
| | robomme_pkg.robomme_env = robomme_env_pkg |
| | robomme_pkg.logging_utils = logging_utils_pkg |
| | robomme_env_pkg.utils = utils_pkg |
| | utils_pkg.subgoal_planner_func = planner_stub |
| |
|
| | injected = { |
| | "robomme": robomme_pkg, |
| | "robomme.robomme_env": robomme_env_pkg, |
| | "robomme.robomme_env.utils": utils_pkg, |
| | "robomme.robomme_env.utils.subgoal_planner_func": planner_stub, |
| | "robomme.logging_utils": logging_utils_pkg, |
| | } |
| | previous = {key: sys.modules.get(key) for key in injected} |
| | sys.modules.update(injected) |
| |
|
| | try: |
| | repo_root = find_repo_root(__file__) |
| | module_path = repo_root / "src" / "robomme" / "robomme_env" / "utils" / "vqa_options.py" |
| | spec = importlib.util.spec_from_file_location("robomme.robomme_env.utils.vqa_options", module_path) |
| | module = importlib.util.module_from_spec(spec) |
| | spec.loader.exec_module(module) |
| | finally: |
| | for key, old_module in previous.items(): |
| | if old_module is None: |
| | sys.modules.pop(key, None) |
| | else: |
| | sys.modules[key] = old_module |
| |
|
| | return module, hold_calls |
| |
|
| |
|
| | class _DummyEnv: |
| | """Mock environment exposing only elapsed_steps for _options_stopcube.""" |
| | def __init__(self, elapsed_steps=0): |
| | self.elapsed_steps = elapsed_steps |
| |
|
| |
|
| | class _DummyBase: |
| | """Mock StopCube base providing steps_press and interval to compute final_target.""" |
| | def __init__(self, steps_press, interval=30): |
| | self.steps_press = steps_press |
| | self.interval = interval |
| | self.button = object() |
| |
|
| |
|
| | def _get_remain_static_solver(options): |
| | """Get the solve function whose label is 'remain static' from StopCube options.""" |
| | for option in options: |
| | if option.get("action") == "remain static": |
| | return option["solve"] |
| | raise AssertionError("Missing 'remain static' option") |
| |
|
| |
|
| | def test_stopcube_remain_static_increment_and_saturation(): |
| | """Test: when selecting 'remain static' multiple times, absTimestep increments 100->200->... and saturates at final_target.""" |
| | module, hold_calls = _load_vqa_options_module() |
| | env = _DummyEnv(elapsed_steps=0) |
| | base = _DummyBase(steps_press=270, interval=30) |
| | options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base) |
| | solve_remain_static = _get_remain_static_solver(options) |
| |
|
| | for _ in range(4): |
| | solve_remain_static() |
| |
|
| | assert hold_calls == [100, 200, 240, 240] |
| |
|
| |
|
| | def test_stopcube_remain_static_small_final_target(): |
| | """Test: when final_target is small (e.g., 60), first call hits the cap and later calls stay at 60 (saturation).""" |
| | module, hold_calls = _load_vqa_options_module() |
| | env = _DummyEnv(elapsed_steps=0) |
| | base = _DummyBase(steps_press=90, interval=30) |
| | options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base) |
| | solve_remain_static = _get_remain_static_solver(options) |
| |
|
| | solve_remain_static() |
| | solve_remain_static() |
| |
|
| | assert hold_calls == [60, 60] |
| |
|
| |
|
| | def test_stopcube_remain_static_resets_when_elapsed_steps_go_back(): |
| | """Test: when elapsed_steps is reduced (e.g., 150 back to 0), internal step state should reset and restart from 100.""" |
| | module, hold_calls = _load_vqa_options_module() |
| | env = _DummyEnv(elapsed_steps=0) |
| | base = _DummyBase(steps_press=270, interval=30) |
| | options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base) |
| | solve_remain_static = _get_remain_static_solver(options) |
| |
|
| | solve_remain_static() |
| | env.elapsed_steps = 150 |
| | solve_remain_static() |
| | env.elapsed_steps = 0 |
| | solve_remain_static() |
| |
|
| | assert hold_calls == [100, 200, 100] |
| |
|
| |
|
| | def test_stopcube_option_label_order_stays_stable(): |
| | """Test: StopCube option label order must stay fixed: prepare first, then remain static, then press button.""" |
| | module, _ = _load_vqa_options_module() |
| | env = _DummyEnv(elapsed_steps=0) |
| | base = _DummyBase(steps_press=270, interval=30) |
| | options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base) |
| |
|
| | actions = [option.get("action") for option in options] |
| | assert actions == [ |
| | "move to the top of the button to prepare", |
| | "remain static", |
| | "press button to stop the cube", |
| | ] |
| |
|