| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import os |
| import platform |
| from functools import wraps |
|
|
| import pytest |
| import torch |
|
|
| from lerobot import available_cameras, available_motors, available_robots |
| from lerobot.utils.device_utils import auto_select_torch_device |
| from lerobot.utils.import_utils import is_package_available |
|
|
| DEVICE = os.environ.get("LEROBOT_TEST_DEVICE", str(auto_select_torch_device())) |
|
|
| TEST_ROBOT_TYPES = [] |
| for robot_type in available_robots: |
| TEST_ROBOT_TYPES += [(robot_type, True), (robot_type, False)] |
|
|
| TEST_CAMERA_TYPES = [] |
| for camera_type in available_cameras: |
| TEST_CAMERA_TYPES += [(camera_type, True), (camera_type, False)] |
|
|
| TEST_MOTOR_TYPES = [] |
| for motor_type in available_motors: |
| TEST_MOTOR_TYPES += [(motor_type, True), (motor_type, False)] |
|
|
| |
| OPENCV_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_OPENCV_CAMERA_INDEX", 0)) |
| INTELREALSENSE_SERIAL_NUMBER = int(os.environ.get("LEROBOT_TEST_INTELREALSENSE_SERIAL_NUMBER", 128422271614)) |
|
|
| DYNAMIXEL_PORT = os.environ.get("LEROBOT_TEST_DYNAMIXEL_PORT", "/dev/tty.usbmodem575E0032081") |
| DYNAMIXEL_MOTORS = { |
| "shoulder_pan": [1, "xl430-w250"], |
| "shoulder_lift": [2, "xl430-w250"], |
| "elbow_flex": [3, "xl330-m288"], |
| "wrist_flex": [4, "xl330-m288"], |
| "wrist_roll": [5, "xl330-m288"], |
| "gripper": [6, "xl330-m288"], |
| } |
|
|
| FEETECH_PORT = os.environ.get("LEROBOT_TEST_FEETECH_PORT", "/dev/tty.usbmodem585A0080971") |
| FEETECH_MOTORS = { |
| "shoulder_pan": [1, "sts3215"], |
| "shoulder_lift": [2, "sts3215"], |
| "elbow_flex": [3, "sts3215"], |
| "wrist_flex": [4, "sts3215"], |
| "wrist_roll": [5, "sts3215"], |
| "gripper": [6, "sts3215"], |
| } |
|
|
|
|
| def require_x86_64_kernel(func): |
| """ |
| Decorator that skips the test if plateform device is not an x86_64 cpu. |
| """ |
| from functools import wraps |
|
|
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| if platform.machine() != "x86_64": |
| pytest.skip("requires x86_64 plateform") |
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def require_cpu(func): |
| """ |
| Decorator that skips the test if device is not cpu. |
| """ |
| from functools import wraps |
|
|
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| if DEVICE != "cpu": |
| pytest.skip("requires cpu") |
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def require_cuda(func): |
| """ |
| Decorator that skips the test if cuda is not available. |
| """ |
| from functools import wraps |
|
|
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| if not torch.cuda.is_available(): |
| pytest.skip("requires cuda") |
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def require_hf_token(func): |
| """ |
| Decorator that skips the test if no Hugging Face Hub token is available. |
| """ |
|
|
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| from huggingface_hub import get_token |
|
|
| if get_token() is None: |
| pytest.skip("requires HF token for gated model access") |
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def require_env(func): |
| """ |
| Decorator that skips the test if the required environment package is not installed. |
| As it need 'env_name' in args, it also checks whether it is provided as an argument. |
| If 'env_name' is None, this check is skipped. |
| """ |
|
|
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| |
| arg_names = func.__code__.co_varnames[: func.__code__.co_argcount] |
| if "env_name" in arg_names: |
| |
| index = arg_names.index("env_name") |
| env_name = args[index] if len(args) > index else kwargs.get("env_name") |
| else: |
| raise ValueError("Function does not have 'env_name' as an argument.") |
|
|
| |
| package_name = f"gym_{env_name}" |
| if env_name is not None and not is_package_available(package_name): |
| pytest.skip(f"gym-{env_name} not installed") |
|
|
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def require_package_arg(func): |
| """ |
| Decorator that skips the test if the required package is not installed. |
| This is similar to `require_env` but more general in that it can check any package (not just environments). |
| As it need 'required_packages' in args, it also checks whether it is provided as an argument. |
| If 'required_packages' is None, this check is skipped. |
| """ |
|
|
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| |
| arg_names = func.__code__.co_varnames[: func.__code__.co_argcount] |
| if "required_packages" in arg_names: |
| |
| index = arg_names.index("required_packages") |
| required_packages = args[index] if len(args) > index else kwargs.get("required_packages") |
| else: |
| raise ValueError("Function does not have 'required_packages' as an argument.") |
|
|
| if required_packages is None: |
| return func(*args, **kwargs) |
|
|
| |
| for package in required_packages: |
| if not is_package_available(package): |
| pytest.skip(f"{package} not installed") |
|
|
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def require_package(package_name, import_name=None): |
| """ |
| Decorator that skips the test if the specified package is not installed. |
| """ |
|
|
| def decorator(func): |
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| if not is_package_available(pkg_name=package_name, import_name=import_name): |
| pytest.skip(f"{package_name} not installed") |
| return func(*args, **kwargs) |
|
|
| return wrapper |
|
|
| return decorator |
|
|