"""
Unit tests for the 'remain static' option in the StopCube task.

Tests the increment and saturation logic of the 'remain static' option built by vqa_options._options_stopcube:
- Each time 'remain static' is selected, solve_hold_obj_absTimestep(absTimestep) is called internally;
- absTimestep increases as 100 -> 200 -> ... and never exceeds final_target (computed from steps_press and interval);
- If env.elapsed_steps goes backward, the internal state should reset so that the next call starts from 100 again.
"""
import importlib.util
import logging
import sys
import types
from pathlib import Path

from tests._shared.repo_paths import find_repo_root
|
|
|
|
# Every one of these names is installed as an attribute on the stub planner
# module before vqa_options.py is executed against it.
PLANNER_SYMBOLS = [
    "grasp_and_lift_peg_side",
    "insert_peg",
    "solve_button",
    "solve_button_ready",
    "solve_hold_obj",
    "solve_hold_obj_absTimestep",
    "solve_pickup",
    "solve_pickup_bin",
    "solve_push_to_target",
    "solve_push_to_target_with_peg",
    "solve_putdown_whenhold",
    "solve_putonto_whenhold",
    "solve_putonto_whenhold_binspecial",
    "solve_swingonto",
    "solve_swingonto_withDirection",
    "solve_swingonto_whenhold",
    "solve_strong_reset",
]
|
|
|
|
def _load_vqa_options_module():
    """Load ``vqa_options.py`` from source with the planner module stubbed out.

    Stand-in modules for ``robomme``, ``robomme.robomme_env``,
    ``robomme.robomme_env.utils``,
    ``robomme.robomme_env.utils.subgoal_planner_func`` and
    ``robomme.logging_utils`` are placed in ``sys.modules`` only while the
    target file executes. Afterwards each of those entries is put back to its
    previous value, or removed if it did not exist before.

    Each call executes the file again and returns a fresh module object with
    its own ``hold_calls`` list.

    Returns:
        tuple: ``(module, hold_calls)``. ``module`` is the executed
        ``vqa_options`` module; ``hold_calls`` is the list to which the
        ``solve_hold_obj_absTimestep`` spy appends every ``absTimestep`` it
        receives, converted to ``int``.

    Raises:
        ImportError: If no import spec or loader can be created for the
            ``vqa_options.py`` path.
    """
    hold_calls = []

    def _noop(*args, **kwargs):
        """Accept any call signature and do nothing."""
        return None

    def _hold_spy(env, planner, absTimestep):
        """Record the absTimestep passed on each call, for later assertions."""
        hold_calls.append(int(absTimestep))
        return None

    # Stub planner: every known symbol is a no-op, except the one under test,
    # which is replaced by the recording spy.
    planner_stub = types.ModuleType("robomme.robomme_env.utils.subgoal_planner_func")
    for symbol in PLANNER_SYMBOLS:
        setattr(planner_stub, symbol, _noop)
    planner_stub.solve_hold_obj_absTimestep = _hold_spy

    # An (empty) __path__ marks these stand-ins as packages so that dotted
    # imports can resolve through them.
    robomme_pkg = types.ModuleType("robomme")
    robomme_pkg.__path__ = []
    robomme_env_pkg = types.ModuleType("robomme.robomme_env")
    robomme_env_pkg.__path__ = []
    utils_pkg = types.ModuleType("robomme.robomme_env.utils")
    utils_pkg.__path__ = []

    logging_utils_pkg = types.ModuleType("robomme.logging_utils")
    logging_utils_pkg.logger = logging.getLogger("dummy")

    # Wire children onto parents so attribute access works as well as
    # sys.modules lookup.
    robomme_pkg.robomme_env = robomme_env_pkg
    robomme_pkg.logging_utils = logging_utils_pkg
    robomme_env_pkg.utils = utils_pkg
    utils_pkg.subgoal_planner_func = planner_stub

    injected = {
        "robomme": robomme_pkg,
        "robomme.robomme_env": robomme_env_pkg,
        "robomme.robomme_env.utils": utils_pkg,
        "robomme.robomme_env.utils.subgoal_planner_func": planner_stub,
        "robomme.logging_utils": logging_utils_pkg,
    }
    # Remember what was there before so only these keys are rolled back;
    # anything else imported while the target executes stays untouched.
    previous = {key: sys.modules.get(key) for key in injected}
    sys.modules.update(injected)

    try:
        repo_root = find_repo_root(__file__)
        module_path = repo_root / "src" / "robomme" / "robomme_env" / "utils" / "vqa_options.py"
        spec = importlib.util.spec_from_file_location(
            "robomme.robomme_env.utils.vqa_options", module_path
        )
        # Fail with a clear message instead of an AttributeError on None.
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot create an import spec for {module_path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    finally:
        for key, old_module in previous.items():
            if old_module is None:
                sys.modules.pop(key, None)
            else:
                sys.modules[key] = old_module

    return module, hold_calls
|
|
|
|
| class _DummyEnv: |
| """Mock environment exposing only elapsed_steps for _options_stopcube.""" |
| def __init__(self, elapsed_steps=0): |
| self.elapsed_steps = elapsed_steps |
|
|
|
|
| class _DummyBase: |
| """Mock StopCube base providing steps_press and interval to compute final_target.""" |
| def __init__(self, steps_press, interval=30): |
| self.steps_press = steps_press |
| self.interval = interval |
| self.button = object() |
|
|
|
|
| def _get_remain_static_solver(options): |
| """Get the solve function whose label is 'remain static' from StopCube options.""" |
| for option in options: |
| if option.get("action") == "remain static": |
| return option["solve"] |
| raise AssertionError("Missing 'remain static' option") |
|
|
|
|
def test_stopcube_remain_static_increment_and_saturation():
    """Repeated 'remain static' selections step 100 -> 200 and then saturate.

    With steps_press=270 and interval=30 the expected cap (final_target) is
    240, so four consecutive calls must pass 100, 200, 240, 240.
    """
    module, hold_calls = _load_vqa_options_module()
    env = _DummyEnv(elapsed_steps=0)
    base = _DummyBase(steps_press=270, interval=30)
    options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
    solve_remain_static = _get_remain_static_solver(options)

    for _ in range(4):
        solve_remain_static()

    assert hold_calls == [100, 200, 240, 240]
|
|
|
|
def test_stopcube_remain_static_small_final_target():
    """A small final_target caps the very first call and every later one.

    With steps_press=90 and interval=30 the expected cap is 60, which is
    below the first 100-step increment, so both calls must pass 60.
    """
    module, hold_calls = _load_vqa_options_module()
    env = _DummyEnv(elapsed_steps=0)
    base = _DummyBase(steps_press=90, interval=30)
    options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
    solve_remain_static = _get_remain_static_solver(options)

    solve_remain_static()
    solve_remain_static()

    assert hold_calls == [60, 60]
|
|
|
|
def test_stopcube_remain_static_resets_when_elapsed_steps_go_back():
    """When elapsed_steps decreases, the step state resets and restarts at 100.

    The env counter is moved 0 -> 150 -> 0 between calls; the third call,
    made after the counter went backward, must start over from 100.
    """
    module, hold_calls = _load_vqa_options_module()
    env = _DummyEnv(elapsed_steps=0)
    base = _DummyBase(steps_press=270, interval=30)
    options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
    solve_remain_static = _get_remain_static_solver(options)

    solve_remain_static()
    env.elapsed_steps = 150
    solve_remain_static()
    env.elapsed_steps = 0  # counter goes backward
    solve_remain_static()

    assert hold_calls == [100, 200, 100]
|
|
|
|
def test_stopcube_option_label_order_stays_stable():
    """StopCube options keep a fixed order: prepare, remain static, press."""
    module, _ = _load_vqa_options_module()
    env = _DummyEnv(elapsed_steps=0)
    base = _DummyBase(steps_press=270, interval=30)
    options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)

    actions = [option.get("action") for option in options]
    assert actions == [
        "move to the top of the button to prepare",
        "remain static",
        "press button to stop the cube",
    ]
|
|