File size: 3,398 Bytes
06c11b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-
"""
test_eepose_error_handling.py
=============================
Heavy test: Verifies that in the `ee_pose` action space, sending an unreachable
target position to the environment properly triggers an error caught by the 
DemonstrationWrapper, returning info["status"] = "error" instead of crashing.

This test iterates over all 16 tasks defined in the benchmark.

Run with:
    cd /data/hongzefu/robomme_benchmark
    uv run python -m pytest tests/dataset/test_eepose_error_handling.py -v -s
"""

from __future__ import annotations

import sys
from pathlib import Path

import pytest

from tests._shared.repo_paths import find_repo_root

# Ensure the src directory is on the path
pytestmark = pytest.mark.dataset

_PROJECT_ROOT = find_repo_root(__file__)
sys.path.insert(0, str(_PROJECT_ROOT / "src"))

from robomme.env_record_wrapper import BenchmarkEnvBuilder
from robomme.robomme_env import *  # noqa: F401,F403

# All 16 benchmark tasks
ALL_TASKS = [
    "BinFill", 
    "PickXtimes", 
    "SwingXtimes", 
    "StopCube",
    "VideoUnmask", 
    "VideoUnmaskSwap", 
    "ButtonUnmask", 
    "ButtonUnmaskSwap",
    "PickHighlight", 
    "VideoRepick", 
    "VideoPlaceButton", 
    "VideoPlaceOrder",
    "MoveCube", 
    "InsertPeg", 
    "PatternLock", 
    "RouteStick"
]


@pytest.mark.parametrize("task_id", ALL_TASKS)
def test_eepose_unreachable_action_status_error(task_id: str) -> None:
    """
    Instantiate the environment for the given task with 'ee_pose' action space.
    Send a completely unreachable position to provoke an IK/physics failure.
    Assert that DemonstrationWrapper catches it and sets info["status"] = "error".
    """
    print(f"\n[{task_id}] Building environment for ee_pose error handling test...")
    
    env_builder = BenchmarkEnvBuilder(
        env_id=task_id,
        dataset="train",
        action_space="ee_pose",
        gui_render=False,
    )
    
    # We don't need all the observations for this test, keeping it lightweight
    env = env_builder.make_env_for_episode(
        episode_idx=0,
        include_maniskill_obs=False,
    )
    
    try:
        # 1. Reset the environment
        env.reset()
        
        # 2. Define an unreachable action
        # The ee_pose space is (x, y, z, r, p, y, gripper)
        # Coordinates (100.0, 100.0, 100.0) are far out of reach for the robot.
        unreachable_action = [100.0, 100.0, 100.0, 0.0, 0.0, 0.0, 1.0]

        # 3. Step the environment
        obs, reward, terminated, truncated, info = env.step(unreachable_action)

        # 4. Assertions
        assert info is not None, "info dict should not be None"
        
        status = info.get("status")
        assert status == "error", f"Expected info['status'] == 'error', got {status!r}"
        
        error_msg = info.get("error_message")
        assert error_msg is not None, "Expected 'error_message' to be present in info"
        assert isinstance(error_msg, str) and len(error_msg) > 0, (
            f"Expected 'error_message' to be a non-empty string, got {error_msg!r}"
        )

        print(f"[{task_id}] ✓ Successfully caught error: {error_msg}")

    finally:
        env.close()


def main() -> None:
    print("Run this test with pytest:")
    print("uv run python -m pytest tests/dataset/test_eepose_error_handling.py -v -s")
    sys.exit(2)


if __name__ == "__main__":
    main()