| """Comprehensive tests for Nova-Sim API. |
| |
| Run these tests with Nova-Sim server running: |
| python nova-sim/mujoco_server.py |
| |
| Then in another terminal: |
| pytest nova-sim/tests/test_api.py -v |
| """ |
|
|
| import json |
| import time |
| import pytest |
| import requests |
| from websockets.sync.client import connect |
|
|
|
|
| BASE_URL = "http://localhost:3004/nova-sim/api/v1" |
| WS_URL = "ws://localhost:3004/nova-sim/api/v1/ws" |
|
|
|
|
| @pytest.fixture(scope="module") |
| def check_server(): |
| """Check if the server is running before tests.""" |
| try: |
| response = requests.get(f"{BASE_URL}/metadata", timeout=2) |
| response.raise_for_status() |
| except requests.RequestException: |
| pytest.skip("Nova-Sim server is not running at localhost:3004") |
|
|
|
|
| class TestHTTPEndpoints: |
| """Test HTTP endpoints.""" |
|
|
| def test_metadata_endpoint(self, check_server): |
| """Test /metadata endpoint returns expected structure.""" |
| response = requests.get(f"{BASE_URL}/metadata") |
| assert response.status_code == 200 |
|
|
| data = response.json() |
| assert "robots" in data |
| assert "actions" in data |
| |
| assert "camera_feeds" not in data |
| assert "overlay_camera_presets" not in data |
| assert "current_selection" not in data |
| assert "default_scene" not in data |
|
|
| def test_env_endpoint(self, check_server): |
| """Test GET /env endpoint returns static environment info.""" |
| response = requests.get(f"{BASE_URL}/env") |
| assert response.status_code == 200 |
|
|
| data = response.json() |
| |
| assert "robot" in data |
| assert "scene" in data |
| assert "has_gripper" in data |
| assert "action_space" in data |
| assert "observation_space" in data |
| assert "camera_feeds" in data |
|
|
| |
| assert isinstance(data["camera_feeds"], list) |
| if data["camera_feeds"]: |
| feed = data["camera_feeds"][0] |
| assert "name" in feed |
| assert "label" in feed |
| assert "url" in feed |
|
|
| def test_video_feed_endpoint(self, check_server): |
| """Test /video_feed endpoint is accessible.""" |
| response = requests.get(f"{BASE_URL}/video_feed", stream=True, timeout=5) |
| assert response.status_code == 200 |
| assert "multipart/x-mixed-replace" in response.headers.get("Content-Type", "") |
|
|
|
|
| class TestWebSocketMessages: |
| """Test WebSocket message functionality.""" |
|
|
| def test_websocket_connection(self, check_server): |
| """Test basic WebSocket connection.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "client_identity", |
| "data": {"client_id": "test-client"} |
| })) |
|
|
| |
| received = False |
| deadline = time.time() + 5 |
| while time.time() < deadline: |
| try: |
| msg = ws.recv(timeout=1) |
| data = json.loads(msg) |
| if data.get("type") == "state": |
| received = True |
| break |
| except TimeoutError: |
| continue |
|
|
| assert received, "Did not receive state message within timeout" |
|
|
| def test_reset_message(self, check_server): |
| """Test reset WebSocket message.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "reset", |
| "data": {"seed": 42} |
| })) |
| time.sleep(0.5) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
|
|
| def test_switch_robot_message(self, check_server): |
| """Test switch_robot WebSocket message.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| response = requests.get(f"{BASE_URL}/env") |
| data = response.json() |
| |
| assert data["robot"] in ["ur5", "ur5_t_push"] |
|
|
| def test_home_message(self, check_server): |
| """Test home WebSocket message for UR5 - verify robot actually moves.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
|
|
| |
| initial_state = None |
| deadline = time.time() + 5 |
| while time.time() < deadline: |
| try: |
| msg = ws.recv(timeout=1) |
| data = json.loads(msg) |
| if data.get("type") == "state" and data.get("data", {}).get("robot") == "ur5": |
| initial_state = data.get("data", {}) |
| break |
| except Exception: |
| continue |
|
|
| assert initial_state is not None, "Did not receive UR5 state after switching robots" |
|
|
| |
| home_pose = initial_state.get("observation", {}).get("home_pose") or initial_state.get("home_pose") |
|
|
| |
| if home_pose is None: |
| env_resp = requests.get(f"{BASE_URL}/env", timeout=5) |
| env_resp.raise_for_status() |
| home_pose = env_resp.json().get("home_pose") |
|
|
| assert home_pose is not None, "Could not find home_pose in state or /env" |
|
|
| |
| initial_obs = initial_state.get("observation", {}) |
| initial_joints = initial_obs.get("joint_positions") or initial_obs.get("joint_targets") |
| assert initial_joints is not None, "Could not find joint_positions in state" |
| print(f"\nInitial joints: {initial_joints}") |
| print(f"Home pose: {home_pose}") |
|
|
| |
| homing_resp = requests.get( |
| f"{BASE_URL}/homing", |
| params={"timeout_s": 10, "tolerance": 0.05, "poll_interval_s": 0.1}, |
| timeout=15, |
| ) |
| homing_resp.raise_for_status() |
|
|
| |
| final_state = None |
| deadline = time.time() + 2 |
| while time.time() < deadline: |
| try: |
| msg = ws.recv(timeout=0.5) |
| data = json.loads(msg) |
| if data.get("type") == "state": |
| final_state = data.get("data", {}) |
| except Exception: |
| continue |
|
|
| assert final_state is not None, "Did not receive final state" |
|
|
| |
| final_obs = final_state.get("observation", {}) |
| final_joints = final_obs.get("joint_positions") or final_obs.get("joint_targets") |
| assert final_joints is not None, "Could not find joint_positions in final state" |
| print(f"Final joints: {final_joints}") |
|
|
| |
| |
| initial_distance = sum(abs(initial_joints[i] - home_pose[i]) for i in range(min(len(initial_joints), len(home_pose)))) |
| final_distance = sum(abs(final_joints[i] - home_pose[i]) for i in range(min(len(final_joints), len(home_pose)))) |
|
|
| print(f"Initial distance from home: {initial_distance:.4f}") |
| print(f"Final distance from home: {final_distance:.4f}") |
|
|
| |
| assert final_distance < initial_distance, f"Robot did not move toward home: initial={initial_distance:.4f}, final={final_distance:.4f}" |
|
|
| |
| tolerance = 0.01 |
| assert final_distance < tolerance, f"Robot did not reach home position: distance={final_distance:.4f}, tolerance={tolerance}" |
|
|
| def test_teleop_action_message(self, check_server): |
| """Test teleop_action WebSocket message with vx/vy/vz format.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "teleop_action", |
| "data": {"vx": 0.01, "vy": 0.0, "vz": 0.0} |
| })) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action") |
|
|
| |
| assert teleop is not None |
| assert isinstance(teleop, dict) |
| assert "vx" in teleop |
| assert "vy" in teleop |
| assert "vz" in teleop |
|
|
| def test_action_message(self, check_server): |
| """Test action WebSocket message for locomotion robots.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "spot"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} |
| })) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action") |
|
|
| |
| assert teleop is not None |
| assert isinstance(teleop, dict) |
|
|
|
|
| class TestStateStructure: |
| """Test WebSocket state message structure.""" |
|
|
| def test_state_has_no_static_fields(self, check_server): |
| """Verify state stream doesn't contain robot/scene/has_gripper.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| deadline = time.time() + 5 |
| state_data = None |
| while time.time() < deadline: |
| try: |
| msg = ws.recv(timeout=1) |
| data = json.loads(msg) |
| if data.get("type") == "state": |
| state_data = data.get("data", {}) |
| break |
| except TimeoutError: |
| continue |
|
|
| assert state_data is not None, "No state message received" |
|
|
| |
| assert "robot" not in state_data, "robot should not be in state stream" |
| assert "scene" not in state_data, "scene should not be in state stream" |
| assert "has_gripper" not in state_data, "has_gripper should not be in state stream" |
|
|
| def test_state_always_has_teleop_action(self, check_server): |
| """Verify state always contains teleop_action (never null).""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| if data.get("type") == "state": |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action") |
|
|
| |
| assert teleop is not None, "teleop_action should never be null" |
| assert isinstance(teleop, dict), "teleop_action should be a dict" |
|
|
| |
| assert "vx" in teleop |
| assert "vy" in teleop |
| assert "vyaw" in teleop |
|
|
| def test_ur5_state_structure(self, check_server): |
| """Test UR5 state message structure.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
|
|
| |
| assert "observation" in state_data |
| obs = state_data["observation"] |
| assert "end_effector" in obs |
| assert "ee_orientation" in obs |
| assert "ee_target" in obs |
| assert "ee_target_orientation" in obs |
| assert "gripper" in obs |
| assert "joint_positions" in obs |
| assert "joint_targets" in obs |
| assert "control_mode" in state_data |
| assert "steps" in state_data |
| assert "reward" in state_data |
| assert "teleop_action" in state_data |
| assert "connected_clients" in state_data |
| |
| assert "target" not in state_data |
| assert "target_orientation" not in state_data |
| assert "use_orientation" not in state_data |
|
|
| def test_locomotion_state_structure(self, check_server): |
| """Test locomotion robot state message structure.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "spot"} |
| })) |
| time.sleep(6) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
|
|
| |
| assert "observation" in state_data |
| obs = state_data["observation"] |
| assert "position" in obs |
| assert "orientation" in obs |
| assert "steps" in state_data |
| assert "reward" in state_data |
| assert "teleop_action" in state_data |
| assert "connected_clients" in state_data |
| |
| assert "base_height" not in state_data |
| assert "upright" not in state_data |
| assert "vx" not in state_data |
| assert "vy" not in state_data |
| assert "vyaw" not in state_data |
|
|
|
|
| class TestBackwardCompatibility: |
| """Test backward compatibility of old API formats.""" |
|
|
| def test_teleop_old_format_accepted(self, check_server): |
| """Test that old dx/dy/dz format is still accepted.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "teleop_action", |
| "data": {"dx": 0.01, "dy": 0.0, "dz": 0.0} |
| })) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
|
|
| def test_old_command_type_accepted(self, check_server): |
| """Test that old 'command' message type is still accepted.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "spot"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "command", |
| "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} |
| })) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
|
|
| def test_old_teleop_command_type_accepted(self, check_server): |
| """Test that old 'teleop_command' message type is still accepted.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "teleop_command", |
| "data": {"vx": 0.01, "vy": 0.0, "vz": 0.0} |
| })) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
|
|
|
|
| class TestRobotMovement: |
| """Test that robots actually move when commands are sent.""" |
|
|
| def test_ur5_teleop_moves_robot(self, check_server): |
| """Test UR5 responds to teleop commands and position changes.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
| obs = state_data.get("observation", {}) |
| initial_ee = obs.get("end_effector", {}) |
| initial_x = initial_ee.get("x", 0) |
|
|
| |
| for _ in range(5): |
| ws.send(json.dumps({ |
| "type": "teleop_action", |
| "data": {"vx": 0.02, "vy": 0.0, "vz": 0.0} |
| })) |
| time.sleep(0.1) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| obs = state_data.get("observation", {}) |
| new_ee = obs.get("end_effector", {}) |
| new_x = new_ee.get("x", 0) |
|
|
| |
| |
| target = obs.get("ee_target", {}) |
| target_x = target.get("x", 0) |
| assert target_x > initial_x, f"Expected target X to increase from {initial_x}, got {target_x}" |
|
|
| def test_spot_action_moves_robot(self, check_server): |
| """Test Spot responds to action messages and position changes.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "spot"} |
| })) |
| time.sleep(4) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
| obs = state_data.get("observation", {}) |
| initial_pos = obs.get("position", {}) |
| initial_x = initial_pos.get("x", 0) |
|
|
| |
| for _ in range(10): |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} |
| })) |
| time.sleep(0.1) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| obs = state_data.get("observation", {}) |
| new_pos = obs.get("position", {}) |
| new_x = new_pos.get("x", 0) |
|
|
| |
| |
| assert abs(new_x - initial_x) > 0.001, f"Expected some X movement from {initial_x}, got {new_x}" |
|
|
| def test_g1_action_moves_robot(self, check_server): |
| """Test G1 responds to action messages and position changes.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "g1"} |
| })) |
| time.sleep(4) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
| state_data = data.get("data", {}) |
| obs = state_data.get("observation", {}) |
| initial_pos = obs.get("position", {}) |
| initial_x = initial_pos.get("x", 0) |
|
|
| |
| for _ in range(10): |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} |
| })) |
| time.sleep(0.1) |
|
|
| |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| obs = state_data.get("observation", {}) |
| new_pos = obs.get("position", {}) |
| new_x = new_pos.get("x", 0) |
|
|
| |
| assert abs(new_x - initial_x) > 0.001, f"Expected some X movement from {initial_x}, got {new_x}" |
|
|
| def test_ur5_gripper_command(self, check_server): |
| """Test UR5 gripper command is accepted.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "gripper", |
| "data": {"action": "close"} |
| })) |
|
|
| |
| time.sleep(0.2) |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
|
|
| |
| ws.send(json.dumps({ |
| "type": "gripper", |
| "data": {"action": "open"} |
| })) |
|
|
| |
| time.sleep(0.2) |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| assert data.get("type") == "state" |
|
|
| def test_ur5_action_with_joint_velocity(self, check_server): |
| """Test UR5 responds to action messages with joint velocities.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": { |
| "vx": 0.0, "vy": 0.0, "vz": 0.0, |
| "vrx": 0.0, "vry": 0.0, "vrz": 0.0, |
| "j1": 0.0, "j2": 0.0, "j3": 0.5, "j4": 0.0, "j5": 0.0, "j6": 0.0, |
| "gripper": 0.0 |
| } |
| })) |
|
|
| |
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| assert abs(teleop.get("j3", 0) - 0.5) < 0.01, f"Expected j3=0.5, got {teleop.get('j3')}" |
|
|
| |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": { |
| "vx": 0.0, "vy": 0.0, "vz": 0.0, |
| "vrx": 0.0, "vry": 0.0, "vrz": 0.0, |
| "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, |
| "gripper": 0.0 |
| } |
| })) |
|
|
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| assert abs(teleop.get("j3", 1)) < 0.01 |
|
|
| def test_ur5_joint_jog_teleop(self, check_server): |
| """Test joint jogging with start_jog updates teleop_action with joint velocities.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "set_nova_mode", |
| "data": { |
| "state_streaming": False, |
| "ik": False |
| } |
| })) |
| time.sleep(0.2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "start_jog", |
| "data": { |
| "jog_type": "joint", |
| "params": { |
| "joint": 3, |
| "direction": "+", |
| "velocity": 0.5 |
| } |
| } |
| })) |
|
|
| |
| |
| time.sleep(0.3) |
| for i in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| teleop = data.get("data", {}).get("teleop_action", {}) |
| obs = data.get("data", {}).get("observation", {}) |
| has_ee = 'end_effector' in obs |
| print(f"\nDEBUG message {i+1}: robot={'UR5' if has_ee else 'LOCO'}, teleop j3 = {teleop.get('j3', 'MISSING')}") |
|
|
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| print(f"\nDEBUG: Final teleop_action = {teleop}") |
|
|
| |
| assert teleop.get("j3", 0) == 0.5, f"Expected j3=0.5, got {teleop.get('j3')}, full teleop={teleop}" |
| assert teleop.get("j1", 1) == 0.0 |
| assert teleop.get("j2", 1) == 0.0 |
| |
| assert teleop.get("vx", 1) == 0.0 |
| assert teleop.get("vy", 1) == 0.0 |
| assert teleop.get("vz", 1) == 0.0 |
|
|
| |
| ws.send(json.dumps({"type": "stop_jog"})) |
| |
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| assert teleop.get("j3", 1) == 0.0 |
| assert teleop.get("vx", 1) == 0.0 |
|
|
| def test_ur5_action_with_cartesian_velocity(self, check_server): |
| """Test UR5 responds to action messages with Cartesian velocities.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": { |
| "vx": 0.05, "vy": 0.0, "vz": 0.0, |
| "vrx": 0.0, "vry": 0.0, "vrz": 0.0, |
| "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, |
| "gripper": 0.0 |
| } |
| })) |
|
|
| |
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| assert abs(teleop.get("vx", 0) - 0.05) < 0.01, f"Expected vx=0.05, got {teleop.get('vx')}" |
|
|
| |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": { |
| "vx": 0.0, "vy": 0.0, "vz": 0.0, |
| "vrx": 0.0, "vry": 0.0, "vrz": -0.3, |
| "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, |
| "gripper": 0.0 |
| } |
| })) |
|
|
| |
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| assert abs(teleop.get("vrz", 0) + 0.3) < 0.01, f"Expected vrz=-0.3, got {teleop.get('vrz')}" |
|
|
| |
| ws.send(json.dumps({ |
| "type": "action", |
| "data": { |
| "vx": 0.0, "vy": 0.0, "vz": 0.0, |
| "vrx": 0.0, "vry": 0.0, "vrz": 0.0, |
| "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, |
| "gripper": 0.0 |
| } |
| })) |
|
|
| def test_ur5_cartesian_jog_teleop(self, check_server): |
| """Test Cartesian jogging with start_jog updates teleop_action with Cartesian velocities.""" |
| with connect(WS_URL, timeout=10) as ws: |
| |
| ws.send(json.dumps({ |
| "type": "switch_robot", |
| "data": {"robot": "ur5"} |
| })) |
| time.sleep(2) |
|
|
| |
| ws.send(json.dumps({"type": "reset"})) |
| time.sleep(0.5) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "set_nova_mode", |
| "data": { |
| "state_streaming": False, |
| "ik": False |
| } |
| })) |
| time.sleep(0.2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "start_jog", |
| "data": { |
| "jog_type": "cartesian_translation", |
| "params": { |
| "axis": "x", |
| "direction": "+", |
| "velocity": 50.0, |
| "tcp_id": "Flange", |
| "coord_system_id": "world" |
| } |
| } |
| })) |
|
|
| |
| |
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| assert abs(teleop.get("vx", 0) - 0.05) < 0.01, f"Expected vx near 0.05, got {teleop.get('vx')}" |
| assert teleop.get("vy", 1) == 0.0 |
| assert teleop.get("vz", 1) == 0.0 |
| |
| assert teleop.get("j1", 1) == 0.0 |
|
|
| |
| ws.send(json.dumps({"type": "stop_jog"})) |
| |
| time.sleep(0.3) |
| for _ in range(3): |
| ws.recv(timeout=2) |
|
|
| |
| ws.send(json.dumps({ |
| "type": "start_jog", |
| "data": { |
| "jog_type": "cartesian_rotation", |
| "params": { |
| "axis": "z", |
| "direction": "-", |
| "velocity": 0.3, |
| "tcp_id": "Flange", |
| "coord_system_id": "world" |
| } |
| } |
| })) |
|
|
| |
| |
| time.sleep(0.3) |
| for _ in range(3): |
| msg = ws.recv(timeout=2) |
| data = json.loads(msg) |
| state_data = data.get("data", {}) |
| teleop = state_data.get("teleop_action", {}) |
|
|
| |
| assert teleop.get("vrz", 0) == -0.3, f"Expected vrz=-0.3, got {teleop.get('vrz')}" |
| assert teleop.get("vrx", 1) == 0.0 |
| assert teleop.get("vry", 1) == 0.0 |
| assert teleop.get("vx", 1) == 0.0 |
|
|
| |
| ws.send(json.dumps({"type": "stop_jog"})) |
|
|
|
|
| if __name__ == "__main__": |
| |
| pytest.main([__file__, "-v"]) |
|
|