RoboMME / tests /lightweight /test_StopcubeIncrement.py
HongzeFu's picture
HF Space: code-only (no binary assets)
06c11b0
"""
Unit tests for the 'remain static' option in the StopCube task.
Test increment and saturation logic of the 'remain static' option in vqa_options._options_stopcube:
- Each time 'remain static' is selected, solve_hold_obj_absTimestep(absTimestep) is called internally;
- absTimestep increases as 100 -> 200 -> ... and does not exceed final_target (computed from steps_press and interval);
- If env.elapsed_steps goes backward, internal state should reset and start from 100 next time.
"""
from pathlib import Path
import importlib.util
import sys
import types
from tests._shared.repo_paths import find_repo_root
# Keep symbols consistent with planner module dependencies in vqa_options, used to build stubs
PLANNER_SYMBOLS = [
"grasp_and_lift_peg_side",
"insert_peg",
"solve_button",
"solve_button_ready",
"solve_hold_obj",
"solve_hold_obj_absTimestep",
"solve_pickup",
"solve_pickup_bin",
"solve_push_to_target",
"solve_push_to_target_with_peg",
"solve_putdown_whenhold",
"solve_putonto_whenhold",
"solve_putonto_whenhold_binspecial",
"solve_swingonto",
"solve_swingonto_withDirection",
"solve_swingonto_whenhold",
"solve_strong_reset",
]
def _load_vqa_options_module():
"""Inject a stub planner module and load vqa_options, returning (module, hold_calls list)."""
hold_calls = []
planner_stub = types.ModuleType("robomme.robomme_env.utils.subgoal_planner_func")
def _noop(*args, **kwargs):
return None
for symbol in PLANNER_SYMBOLS:
setattr(planner_stub, symbol, _noop)
def _hold_spy(env, planner, absTimestep):
"""Record the actual absTimestep passed for each 'remain static' call for assertions."""
hold_calls.append(int(absTimestep))
return None
planner_stub.solve_hold_obj_absTimestep = _hold_spy
robomme_pkg = types.ModuleType("robomme")
robomme_pkg.__path__ = []
robomme_env_pkg = types.ModuleType("robomme.robomme_env")
robomme_env_pkg.__path__ = []
utils_pkg = types.ModuleType("robomme.robomme_env.utils")
utils_pkg.__path__ = []
logging_utils_pkg = types.ModuleType("robomme.logging_utils")
import logging
logging_utils_pkg.logger = logging.getLogger("dummy")
robomme_pkg.robomme_env = robomme_env_pkg
robomme_pkg.logging_utils = logging_utils_pkg
robomme_env_pkg.utils = utils_pkg
utils_pkg.subgoal_planner_func = planner_stub
injected = {
"robomme": robomme_pkg,
"robomme.robomme_env": robomme_env_pkg,
"robomme.robomme_env.utils": utils_pkg,
"robomme.robomme_env.utils.subgoal_planner_func": planner_stub,
"robomme.logging_utils": logging_utils_pkg,
}
previous = {key: sys.modules.get(key) for key in injected}
sys.modules.update(injected)
try:
repo_root = find_repo_root(__file__)
module_path = repo_root / "src" / "robomme" / "robomme_env" / "utils" / "vqa_options.py"
spec = importlib.util.spec_from_file_location("robomme.robomme_env.utils.vqa_options", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
finally:
for key, old_module in previous.items():
if old_module is None:
sys.modules.pop(key, None)
else:
sys.modules[key] = old_module
return module, hold_calls
class _DummyEnv:
"""Mock environment exposing only elapsed_steps for _options_stopcube."""
def __init__(self, elapsed_steps=0):
self.elapsed_steps = elapsed_steps
class _DummyBase:
"""Mock StopCube base providing steps_press and interval to compute final_target."""
def __init__(self, steps_press, interval=30):
self.steps_press = steps_press
self.interval = interval
self.button = object()
def _get_remain_static_solver(options):
"""Get the solve function whose label is 'remain static' from StopCube options."""
for option in options:
if option.get("action") == "remain static":
return option["solve"]
raise AssertionError("Missing 'remain static' option")
def test_stopcube_remain_static_increment_and_saturation():
"""Test: when selecting 'remain static' multiple times, absTimestep increments 100->200->... and saturates at final_target."""
module, hold_calls = _load_vqa_options_module()
env = _DummyEnv(elapsed_steps=0)
base = _DummyBase(steps_press=270, interval=30) # final_target = 240
options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
solve_remain_static = _get_remain_static_solver(options)
for _ in range(4):
solve_remain_static()
assert hold_calls == [100, 200, 240, 240]
def test_stopcube_remain_static_small_final_target():
"""Test: when final_target is small (e.g., 60), first call hits the cap and later calls stay at 60 (saturation)."""
module, hold_calls = _load_vqa_options_module()
env = _DummyEnv(elapsed_steps=0)
base = _DummyBase(steps_press=90, interval=30) # final_target = 60
options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
solve_remain_static = _get_remain_static_solver(options)
solve_remain_static()
solve_remain_static()
assert hold_calls == [60, 60]
def test_stopcube_remain_static_resets_when_elapsed_steps_go_back():
"""Test: when elapsed_steps is reduced (e.g., 150 back to 0), internal step state should reset and restart from 100."""
module, hold_calls = _load_vqa_options_module()
env = _DummyEnv(elapsed_steps=0)
base = _DummyBase(steps_press=270, interval=30) # final_target = 240
options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
solve_remain_static = _get_remain_static_solver(options)
solve_remain_static() # 100
env.elapsed_steps = 150
solve_remain_static() # 200
env.elapsed_steps = 0
solve_remain_static() # reset -> 100
assert hold_calls == [100, 200, 100]
def test_stopcube_option_label_order_stays_stable():
"""Test: StopCube option label order must stay fixed: prepare first, then remain static, then press button."""
module, _ = _load_vqa_options_module()
env = _DummyEnv(elapsed_steps=0)
base = _DummyBase(steps_press=270, interval=30)
options = module._options_stopcube(env, planner=None, require_target=lambda: None, base=base)
actions = [option.get("action") for option in options]
assert actions == [
"move to the top of the button to prepare",
"remain static",
"press button to stop the cube",
]