nova-sim / tests /test_api.py
Georg
Implement blocking homing endpoint and refactor homing logic in mujoco_server.py
e9e2294
"""Comprehensive tests for Nova-Sim API.
Run these tests with Nova-Sim server running:
python nova-sim/mujoco_server.py
Then in another terminal:
pytest nova-sim/tests/test_api.py -v
"""
import json
import time
import pytest
import requests
from websockets.sync.client import connect
BASE_URL = "http://localhost:3004/nova-sim/api/v1"
WS_URL = "ws://localhost:3004/nova-sim/api/v1/ws"
@pytest.fixture(scope="module")
def check_server():
"""Check if the server is running before tests."""
try:
response = requests.get(f"{BASE_URL}/metadata", timeout=2)
response.raise_for_status()
except requests.RequestException:
pytest.skip("Nova-Sim server is not running at localhost:3004")
class TestHTTPEndpoints:
"""Test HTTP endpoints."""
def test_metadata_endpoint(self, check_server):
"""Test /metadata endpoint returns expected structure."""
response = requests.get(f"{BASE_URL}/metadata")
assert response.status_code == 200
data = response.json()
assert "robots" in data
assert "actions" in data
# camera_feeds, current_selection moved to /env endpoint
assert "camera_feeds" not in data
assert "overlay_camera_presets" not in data
assert "current_selection" not in data
assert "default_scene" not in data
def test_env_endpoint(self, check_server):
"""Test GET /env endpoint returns static environment info."""
response = requests.get(f"{BASE_URL}/env")
assert response.status_code == 200
data = response.json()
# Check required fields
assert "robot" in data
assert "scene" in data
assert "has_gripper" in data
assert "action_space" in data
assert "observation_space" in data
assert "camera_feeds" in data
# Validate camera_feeds structure
assert isinstance(data["camera_feeds"], list)
if data["camera_feeds"]:
feed = data["camera_feeds"][0]
assert "name" in feed
assert "label" in feed
assert "url" in feed
def test_video_feed_endpoint(self, check_server):
"""Test /video_feed endpoint is accessible."""
response = requests.get(f"{BASE_URL}/video_feed", stream=True, timeout=5)
assert response.status_code == 200
assert "multipart/x-mixed-replace" in response.headers.get("Content-Type", "")
class TestWebSocketMessages:
"""Test WebSocket message functionality."""
def test_websocket_connection(self, check_server):
"""Test basic WebSocket connection."""
with connect(WS_URL, timeout=10) as ws:
# Send client identity (old trainer_identity still works for backward compat)
ws.send(json.dumps({
"type": "client_identity",
"data": {"client_id": "test-client"}
}))
# Should receive state messages
received = False
deadline = time.time() + 5
while time.time() < deadline:
try:
msg = ws.recv(timeout=1)
data = json.loads(msg)
if data.get("type") == "state":
received = True
break
except TimeoutError:
continue
assert received, "Did not receive state message within timeout"
def test_reset_message(self, check_server):
"""Test reset WebSocket message."""
with connect(WS_URL, timeout=10) as ws:
# Send reset without seed
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Send reset with seed
ws.send(json.dumps({
"type": "reset",
"data": {"seed": 42}
}))
time.sleep(0.5)
# Should receive state messages after reset
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
def test_switch_robot_message(self, check_server):
"""Test switch_robot WebSocket message."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5 (may default to ur5_t_push)
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2) # Wait for robot switch
# Check /env to verify switch
response = requests.get(f"{BASE_URL}/env")
data = response.json()
# Accept either ur5 or ur5_t_push (default may vary)
assert data["robot"] in ["ur5", "ur5_t_push"]
def test_home_message(self, check_server):
"""Test home WebSocket message for UR5 - verify robot actually moves."""
with connect(WS_URL, timeout=10) as ws:
# Ensure we're on UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
# Wait for robot switch and get initial state from state stream
initial_state = None
deadline = time.time() + 5
while time.time() < deadline:
try:
msg = ws.recv(timeout=1)
data = json.loads(msg)
if data.get("type") == "state" and data.get("data", {}).get("robot") == "ur5":
initial_state = data.get("data", {})
break
except Exception:
continue
assert initial_state is not None, "Did not receive UR5 state after switching robots"
# Get home pose from state (should be in observation or root level)
home_pose = initial_state.get("observation", {}).get("home_pose") or initial_state.get("home_pose")
# If not in state, get from /env as fallback (only once)
if home_pose is None:
env_resp = requests.get(f"{BASE_URL}/env", timeout=5)
env_resp.raise_for_status()
home_pose = env_resp.json().get("home_pose")
assert home_pose is not None, "Could not find home_pose in state or /env"
# Get initial joint positions from observation
initial_obs = initial_state.get("observation", {})
initial_joints = initial_obs.get("joint_positions") or initial_obs.get("joint_targets")
assert initial_joints is not None, "Could not find joint_positions in state"
print(f"\nInitial joints: {initial_joints}")
print(f"Home pose: {home_pose}")
# Call blocking homing endpoint
homing_resp = requests.get(
f"{BASE_URL}/homing",
params={"timeout_s": 10, "tolerance": 0.05, "poll_interval_s": 0.1},
timeout=15,
)
homing_resp.raise_for_status()
# Wait for final state messages
final_state = None
deadline = time.time() + 2
while time.time() < deadline:
try:
msg = ws.recv(timeout=0.5)
data = json.loads(msg)
if data.get("type") == "state":
final_state = data.get("data", {})
except Exception:
continue
assert final_state is not None, "Did not receive final state"
# Get final joint positions from observation
final_obs = final_state.get("observation", {})
final_joints = final_obs.get("joint_positions") or final_obs.get("joint_targets")
assert final_joints is not None, "Could not find joint_positions in final state"
print(f"Final joints: {final_joints}")
# Verify joints have moved toward home
# Calculate distance from home for initial and final positions
initial_distance = sum(abs(initial_joints[i] - home_pose[i]) for i in range(min(len(initial_joints), len(home_pose))))
final_distance = sum(abs(final_joints[i] - home_pose[i]) for i in range(min(len(final_joints), len(home_pose))))
print(f"Initial distance from home: {initial_distance:.4f}")
print(f"Final distance from home: {final_distance:.4f}")
# Verify robot moved closer to home (final distance should be much smaller)
assert final_distance < initial_distance, f"Robot did not move toward home: initial={initial_distance:.4f}, final={final_distance:.4f}"
# Verify robot reached home within tolerance (0.01 rad ~= 0.57 degrees)
tolerance = 0.01
assert final_distance < tolerance, f"Robot did not reach home position: distance={final_distance:.4f}, tolerance={tolerance}"
def test_teleop_action_message(self, check_server):
"""Test teleop_action WebSocket message with vx/vy/vz format."""
with connect(WS_URL, timeout=10) as ws:
# Ensure we're on UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Send teleop action with new format
ws.send(json.dumps({
"type": "teleop_action",
"data": {"vx": 0.01, "vy": 0.0, "vz": 0.0}
}))
# Receive state and check teleop_action
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
teleop = state_data.get("teleop_action")
# Should always have teleop_action (never null)
assert teleop is not None
assert isinstance(teleop, dict)
assert "vx" in teleop
assert "vy" in teleop
assert "vz" in teleop
def test_action_message(self, check_server):
"""Test action WebSocket message for locomotion robots."""
with connect(WS_URL, timeout=10) as ws:
# Switch to Spot
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "spot"}
}))
time.sleep(2)
# Send action
ws.send(json.dumps({
"type": "action",
"data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0}
}))
# Receive state
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
teleop = state_data.get("teleop_action")
# Should have teleop_action with values
assert teleop is not None
assert isinstance(teleop, dict)
class TestStateStructure:
"""Test WebSocket state message structure."""
def test_state_has_no_static_fields(self, check_server):
"""Verify state stream doesn't contain robot/scene/has_gripper."""
with connect(WS_URL, timeout=10) as ws:
# Get a state message
deadline = time.time() + 5
state_data = None
while time.time() < deadline:
try:
msg = ws.recv(timeout=1)
data = json.loads(msg)
if data.get("type") == "state":
state_data = data.get("data", {})
break
except TimeoutError:
continue
assert state_data is not None, "No state message received"
# Verify static fields are NOT in state
assert "robot" not in state_data, "robot should not be in state stream"
assert "scene" not in state_data, "scene should not be in state stream"
assert "has_gripper" not in state_data, "has_gripper should not be in state stream"
def test_state_always_has_teleop_action(self, check_server):
"""Verify state always contains teleop_action (never null)."""
with connect(WS_URL, timeout=10) as ws:
# Get multiple state messages
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
if data.get("type") == "state":
state_data = data.get("data", {})
teleop = state_data.get("teleop_action")
# Should never be None
assert teleop is not None, "teleop_action should never be null"
assert isinstance(teleop, dict), "teleop_action should be a dict"
# Should have expected keys
assert "vx" in teleop
assert "vy" in teleop
assert "vyaw" in teleop
def test_ur5_state_structure(self, check_server):
"""Test UR5 state message structure."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Get state
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
# Check UR5-specific fields
assert "observation" in state_data
obs = state_data["observation"]
assert "end_effector" in obs
assert "ee_orientation" in obs
assert "ee_target" in obs
assert "ee_target_orientation" in obs
assert "gripper" in obs
assert "joint_positions" in obs
assert "joint_targets" in obs
assert "control_mode" in state_data
assert "steps" in state_data
assert "reward" in state_data
assert "teleop_action" in state_data
assert "connected_clients" in state_data
# Verify old fields are not in state root
assert "target" not in state_data
assert "target_orientation" not in state_data
assert "use_orientation" not in state_data
def test_locomotion_state_structure(self, check_server):
"""Test locomotion robot state message structure."""
with connect(WS_URL, timeout=10) as ws:
# Switch to Spot
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "spot"}
}))
time.sleep(6) # Increased wait time for robot switch
# Get state
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
# Check locomotion-specific fields
assert "observation" in state_data
obs = state_data["observation"]
assert "position" in obs
assert "orientation" in obs
assert "steps" in state_data
assert "reward" in state_data
assert "teleop_action" in state_data
assert "connected_clients" in state_data
# Verify old fields are removed
assert "base_height" not in state_data
assert "upright" not in state_data
assert "vx" not in state_data
assert "vy" not in state_data
assert "vyaw" not in state_data
class TestBackwardCompatibility:
"""Test backward compatibility of old API formats."""
def test_teleop_old_format_accepted(self, check_server):
"""Test that old dx/dy/dz format is still accepted."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Send teleop action with old format (dx/dy/dz)
ws.send(json.dumps({
"type": "teleop_action",
"data": {"dx": 0.01, "dy": 0.0, "dz": 0.0}
}))
# Should still work - receive state
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
def test_old_command_type_accepted(self, check_server):
"""Test that old 'command' message type is still accepted."""
with connect(WS_URL, timeout=10) as ws:
# Switch to Spot
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "spot"}
}))
time.sleep(2)
# Send command with old message type
ws.send(json.dumps({
"type": "command",
"data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0}
}))
# Should still work - receive state
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
def test_old_teleop_command_type_accepted(self, check_server):
"""Test that old 'teleop_command' message type is still accepted."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Send teleop with old message type
ws.send(json.dumps({
"type": "teleop_command",
"data": {"vx": 0.01, "vy": 0.0, "vz": 0.0}
}))
# Should still work - receive state
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
class TestRobotMovement:
"""Test that robots actually move when commands are sent."""
def test_ur5_teleop_moves_robot(self, check_server):
"""Test UR5 responds to teleop commands and position changes."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Get initial position
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
obs = state_data.get("observation", {})
initial_ee = obs.get("end_effector", {})
initial_x = initial_ee.get("x", 0)
# Send multiple teleop commands to move in +X direction
for _ in range(5):
ws.send(json.dumps({
"type": "teleop_action",
"data": {"vx": 0.02, "vy": 0.0, "vz": 0.0}
}))
time.sleep(0.1)
# Get new position after movement
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
obs = state_data.get("observation", {})
new_ee = obs.get("end_effector", {})
new_x = new_ee.get("x", 0)
# Verify position changed (moved in +X direction)
# Target should have moved, even if actual position hasn't caught up yet
target = obs.get("ee_target", {})
target_x = target.get("x", 0)
assert target_x > initial_x, f"Expected target X to increase from {initial_x}, got {target_x}"
def test_spot_action_moves_robot(self, check_server):
"""Test Spot responds to action messages and position changes."""
with connect(WS_URL, timeout=10) as ws:
# Switch to Spot
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "spot"}
}))
time.sleep(4)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Get initial position
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
obs = state_data.get("observation", {})
initial_pos = obs.get("position", {})
initial_x = initial_pos.get("x", 0)
# Send multiple actions to move forward
for _ in range(10):
ws.send(json.dumps({
"type": "action",
"data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0}
}))
time.sleep(0.1)
# Get new position after movement
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
obs = state_data.get("observation", {})
new_pos = obs.get("position", {})
new_x = new_pos.get("x", 0)
# Verify position changed (moved forward in X or fell backward)
# Locomotion robots may move slowly or fall, just verify some movement occurred
assert abs(new_x - initial_x) > 0.001, f"Expected some X movement from {initial_x}, got {new_x}"
def test_g1_action_moves_robot(self, check_server):
"""Test G1 responds to action messages and position changes."""
with connect(WS_URL, timeout=10) as ws:
# Switch to G1
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "g1"}
}))
time.sleep(4)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Get initial position
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
state_data = data.get("data", {})
obs = state_data.get("observation", {})
initial_pos = obs.get("position", {})
initial_x = initial_pos.get("x", 0)
# Send multiple actions to move forward
for _ in range(10):
ws.send(json.dumps({
"type": "action",
"data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0}
}))
time.sleep(0.1)
# Get new position after movement
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
obs = state_data.get("observation", {})
new_pos = obs.get("position", {})
new_x = new_pos.get("x", 0)
# Verify position changed - G1 may fall or move erratically, just verify some movement
assert abs(new_x - initial_x) > 0.001, f"Expected some X movement from {initial_x}, got {new_x}"
def test_ur5_gripper_command(self, check_server):
"""Test UR5 gripper command is accepted."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Send gripper close command
ws.send(json.dumps({
"type": "gripper",
"data": {"action": "close"}
}))
# Wait and verify state message received
time.sleep(0.2)
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
# Send gripper open command
ws.send(json.dumps({
"type": "gripper",
"data": {"action": "open"}
}))
# Wait and verify state message received
time.sleep(0.2)
msg = ws.recv(timeout=2)
data = json.loads(msg)
assert data.get("type") == "state"
def test_ur5_action_with_joint_velocity(self, check_server):
"""Test UR5 responds to action messages with joint velocities."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Send action with joint 3 velocity set
ws.send(json.dumps({
"type": "action",
"data": {
"vx": 0.0, "vy": 0.0, "vz": 0.0,
"vrx": 0.0, "vry": 0.0, "vrz": 0.0,
"j1": 0.0, "j2": 0.0, "j3": 0.5, "j4": 0.0, "j5": 0.0, "j6": 0.0,
"gripper": 0.0
}
}))
# Get state and verify teleop_action has joint velocity
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# Joint 3 should have velocity 0.5
assert abs(teleop.get("j3", 0) - 0.5) < 0.01, f"Expected j3=0.5, got {teleop.get('j3')}"
# Stop by sending zero velocities
ws.send(json.dumps({
"type": "action",
"data": {
"vx": 0.0, "vy": 0.0, "vz": 0.0,
"vrx": 0.0, "vry": 0.0, "vrz": 0.0,
"j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0,
"gripper": 0.0
}
}))
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# All velocities should be zero
assert abs(teleop.get("j3", 1)) < 0.01
def test_ur5_joint_jog_teleop(self, check_server):
"""Test joint jogging with start_jog updates teleop_action with joint velocities."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Disable Nova mode to test local teleop tracking
ws.send(json.dumps({
"type": "set_nova_mode",
"data": {
"state_streaming": False,
"ik": False
}
}))
time.sleep(0.2)
# Start joint jog on joint 3 in positive direction (legacy start_jog still supported)
ws.send(json.dumps({
"type": "start_jog",
"data": {
"jog_type": "joint",
"params": {
"joint": 3,
"direction": "+",
"velocity": 0.5
}
}
}))
# Get state and verify teleop_action has joint velocities
# Receive multiple messages to ensure we get a fresh state after the jog command
time.sleep(0.3)
for i in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
teleop = data.get("data", {}).get("teleop_action", {})
obs = data.get("data", {}).get("observation", {})
has_ee = 'end_effector' in obs # UR5 has end_effector, G1/Spot don't
print(f"\nDEBUG message {i+1}: robot={'UR5' if has_ee else 'LOCO'}, teleop j3 = {teleop.get('j3', 'MISSING')}")
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# Debug: print actual values received
print(f"\nDEBUG: Final teleop_action = {teleop}")
# Joint 3 should have velocity 0.5, others should be 0
assert teleop.get("j3", 0) == 0.5, f"Expected j3=0.5, got {teleop.get('j3')}, full teleop={teleop}"
assert teleop.get("j1", 1) == 0.0
assert teleop.get("j2", 1) == 0.0
# Cartesian velocities should be 0
assert teleop.get("vx", 1) == 0.0
assert teleop.get("vy", 1) == 0.0
assert teleop.get("vz", 1) == 0.0
# Stop jog
ws.send(json.dumps({"type": "stop_jog"}))
# Receive multiple messages to ensure we get a fresh state after stop
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
assert teleop.get("j3", 1) == 0.0
assert teleop.get("vx", 1) == 0.0
def test_ur5_action_with_cartesian_velocity(self, check_server):
"""Test UR5 responds to action messages with Cartesian velocities."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Send action with vx velocity set (50 mm/s = 0.05 m/s)
ws.send(json.dumps({
"type": "action",
"data": {
"vx": 0.05, "vy": 0.0, "vz": 0.0,
"vrx": 0.0, "vry": 0.0, "vrz": 0.0,
"j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0,
"gripper": 0.0
}
}))
# Get state and verify teleop_action has vx velocity
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# vx should be 0.05 m/s
assert abs(teleop.get("vx", 0) - 0.05) < 0.01, f"Expected vx=0.05, got {teleop.get('vx')}"
# Send action with rotation velocity
ws.send(json.dumps({
"type": "action",
"data": {
"vx": 0.0, "vy": 0.0, "vz": 0.0,
"vrx": 0.0, "vry": 0.0, "vrz": -0.3,
"j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0,
"gripper": 0.0
}
}))
# Get state and verify teleop_action has vrz velocity
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# vrz should be -0.3 rad/s
assert abs(teleop.get("vrz", 0) + 0.3) < 0.01, f"Expected vrz=-0.3, got {teleop.get('vrz')}"
# Stop by sending zero velocities
ws.send(json.dumps({
"type": "action",
"data": {
"vx": 0.0, "vy": 0.0, "vz": 0.0,
"vrx": 0.0, "vry": 0.0, "vrz": 0.0,
"j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0,
"gripper": 0.0
}
}))
def test_ur5_cartesian_jog_teleop(self, check_server):
"""Test Cartesian jogging with start_jog updates teleop_action with Cartesian velocities."""
with connect(WS_URL, timeout=10) as ws:
# Switch to UR5
ws.send(json.dumps({
"type": "switch_robot",
"data": {"robot": "ur5"}
}))
time.sleep(2)
# Reset to clear any previous state
ws.send(json.dumps({"type": "reset"}))
time.sleep(0.5)
# Disable Nova mode to test local teleop tracking
ws.send(json.dumps({
"type": "set_nova_mode",
"data": {
"state_streaming": False,
"ik": False
}
}))
time.sleep(0.2)
# Start Cartesian translation jog on X axis
ws.send(json.dumps({
"type": "start_jog",
"data": {
"jog_type": "cartesian_translation",
"params": {
"axis": "x",
"direction": "+",
"velocity": 50.0, # mm/s
"tcp_id": "Flange",
"coord_system_id": "world"
}
}
}))
# Get state and verify teleop_action has vx velocity
# Receive multiple messages to ensure we get a fresh state after the jog command
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# vx should have velocity 0.05 m/s (50 mm/s), others should be 0
assert abs(teleop.get("vx", 0) - 0.05) < 0.01, f"Expected vx near 0.05, got {teleop.get('vx')}"
assert teleop.get("vy", 1) == 0.0
assert teleop.get("vz", 1) == 0.0
# Joint velocities should be 0
assert teleop.get("j1", 1) == 0.0
# Stop jog
ws.send(json.dumps({"type": "stop_jog"}))
# Receive multiple messages to ensure stop takes effect
time.sleep(0.3)
for _ in range(3):
ws.recv(timeout=2)
# Start Cartesian rotation jog on Z axis
ws.send(json.dumps({
"type": "start_jog",
"data": {
"jog_type": "cartesian_rotation",
"params": {
"axis": "z",
"direction": "-",
"velocity": 0.3, # rad/s
"tcp_id": "Flange",
"coord_system_id": "world"
}
}
}))
# Get state and verify teleop_action has vrz velocity
# Receive multiple messages to ensure we get a fresh state after the jog command
time.sleep(0.3)
for _ in range(3):
msg = ws.recv(timeout=2)
data = json.loads(msg)
state_data = data.get("data", {})
teleop = state_data.get("teleop_action", {})
# vrz should have velocity -0.3 rad/s, others should be 0
assert teleop.get("vrz", 0) == -0.3, f"Expected vrz=-0.3, got {teleop.get('vrz')}"
assert teleop.get("vrx", 1) == 0.0
assert teleop.get("vry", 1) == 0.0
assert teleop.get("vx", 1) == 0.0
# Stop jog
ws.send(json.dumps({"type": "stop_jog"}))
if __name__ == "__main__":
# Run with: python -m pytest nova-sim/tests/test_api.py -v
pytest.main([__file__, "-v"])