"""Comprehensive tests for Nova-Sim API. Run these tests with Nova-Sim server running: python nova-sim/mujoco_server.py Then in another terminal: pytest nova-sim/tests/test_api.py -v """ import json import time import pytest import requests from websockets.sync.client import connect BASE_URL = "http://localhost:3004/nova-sim/api/v1" WS_URL = "ws://localhost:3004/nova-sim/api/v1/ws" @pytest.fixture(scope="module") def check_server(): """Check if the server is running before tests.""" try: response = requests.get(f"{BASE_URL}/metadata", timeout=2) response.raise_for_status() except requests.RequestException: pytest.skip("Nova-Sim server is not running at localhost:3004") class TestHTTPEndpoints: """Test HTTP endpoints.""" def test_metadata_endpoint(self, check_server): """Test /metadata endpoint returns expected structure.""" response = requests.get(f"{BASE_URL}/metadata") assert response.status_code == 200 data = response.json() assert "robots" in data assert "actions" in data # camera_feeds, current_selection moved to /env endpoint assert "camera_feeds" not in data assert "overlay_camera_presets" not in data assert "current_selection" not in data assert "default_scene" not in data def test_env_endpoint(self, check_server): """Test GET /env endpoint returns static environment info.""" response = requests.get(f"{BASE_URL}/env") assert response.status_code == 200 data = response.json() # Check required fields assert "robot" in data assert "scene" in data assert "has_gripper" in data assert "action_space" in data assert "observation_space" in data assert "camera_feeds" in data # Validate camera_feeds structure assert isinstance(data["camera_feeds"], list) if data["camera_feeds"]: feed = data["camera_feeds"][0] assert "name" in feed assert "label" in feed assert "url" in feed def test_video_feed_endpoint(self, check_server): """Test /video_feed endpoint is accessible.""" response = requests.get(f"{BASE_URL}/video_feed", stream=True, timeout=5) assert response.status_code == 200 assert "multipart/x-mixed-replace" in response.headers.get("Content-Type", "") class TestWebSocketMessages: """Test WebSocket message functionality.""" def test_websocket_connection(self, check_server): """Test basic WebSocket connection.""" with connect(WS_URL, timeout=10) as ws: # Send client identity (old trainer_identity still works for backward compat) ws.send(json.dumps({ "type": "client_identity", "data": {"client_id": "test-client"} })) # Should receive state messages received = False deadline = time.time() + 5 while time.time() < deadline: try: msg = ws.recv(timeout=1) data = json.loads(msg) if data.get("type") == "state": received = True break except TimeoutError: continue assert received, "Did not receive state message within timeout" def test_reset_message(self, check_server): """Test reset WebSocket message.""" with connect(WS_URL, timeout=10) as ws: # Send reset without seed ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Send reset with seed ws.send(json.dumps({ "type": "reset", "data": {"seed": 42} })) time.sleep(0.5) # Should receive state messages after reset msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" def test_switch_robot_message(self, check_server): """Test switch_robot WebSocket message.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 (may default to ur5_t_push) ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Wait for robot switch # Check /env to verify switch response = requests.get(f"{BASE_URL}/env") data = response.json() # Accept either ur5 or ur5_t_push (default may vary) assert data["robot"] in ["ur5", "ur5_t_push"] def test_home_message(self, check_server): """Test home WebSocket message for UR5 - verify robot actually moves.""" with connect(WS_URL, timeout=10) as ws: # Ensure we're on UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) # Wait for robot switch and get initial state from state stream initial_state = None deadline = time.time() + 5 while time.time() < deadline: try: msg = ws.recv(timeout=1) data = json.loads(msg) if data.get("type") == "state" and data.get("data", {}).get("robot") == "ur5": initial_state = data.get("data", {}) break except Exception: continue assert initial_state is not None, "Did not receive UR5 state after switching robots" # Get home pose from state (should be in observation or root level) home_pose = initial_state.get("observation", {}).get("home_pose") or initial_state.get("home_pose") # If not in state, get from /env as fallback (only once) if home_pose is None: env_resp = requests.get(f"{BASE_URL}/env", timeout=5) env_resp.raise_for_status() home_pose = env_resp.json().get("home_pose") assert home_pose is not None, "Could not find home_pose in state or /env" # Get initial joint positions from observation initial_obs = initial_state.get("observation", {}) initial_joints = initial_obs.get("joint_positions") or initial_obs.get("joint_targets") assert initial_joints is not None, "Could not find joint_positions in state" print(f"\nInitial joints: {initial_joints}") print(f"Home pose: {home_pose}") # Call blocking homing endpoint homing_resp = requests.get( f"{BASE_URL}/homing", params={"timeout_s": 10, "tolerance": 0.05, "poll_interval_s": 0.1}, timeout=15, ) homing_resp.raise_for_status() # Wait for final state messages final_state = None deadline = time.time() + 2 while time.time() < deadline: try: msg = ws.recv(timeout=0.5) data = json.loads(msg) if data.get("type") == "state": final_state = data.get("data", {}) except Exception: continue assert final_state is not None, "Did not receive final state" # Get final joint positions from observation final_obs = final_state.get("observation", {}) final_joints = final_obs.get("joint_positions") or final_obs.get("joint_targets") assert final_joints is not None, "Could not find joint_positions in final state" print(f"Final joints: {final_joints}") # Verify joints have moved toward home # Calculate distance from home for initial and final positions initial_distance = sum(abs(initial_joints[i] - home_pose[i]) for i in range(min(len(initial_joints), len(home_pose)))) final_distance = sum(abs(final_joints[i] - home_pose[i]) for i in range(min(len(final_joints), len(home_pose)))) print(f"Initial distance from home: {initial_distance:.4f}") print(f"Final distance from home: {final_distance:.4f}") # Verify robot moved closer to home (final distance should be much smaller) assert final_distance < initial_distance, f"Robot did not move toward home: initial={initial_distance:.4f}, final={final_distance:.4f}" # Verify robot reached home within tolerance (0.01 rad ~= 0.57 degrees) tolerance = 0.01 assert final_distance < tolerance, f"Robot did not reach home position: distance={final_distance:.4f}, tolerance={tolerance}" def test_teleop_action_message(self, check_server): """Test teleop_action WebSocket message with vx/vy/vz format.""" with connect(WS_URL, timeout=10) as ws: # Ensure we're on UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Send teleop action with new format ws.send(json.dumps({ "type": "teleop_action", "data": {"vx": 0.01, "vy": 0.0, "vz": 0.0} })) # Receive state and check teleop_action msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) teleop = state_data.get("teleop_action") # Should always have teleop_action (never null) assert teleop is not None assert isinstance(teleop, dict) assert "vx" in teleop assert "vy" in teleop assert "vz" in teleop def test_action_message(self, check_server): """Test action WebSocket message for locomotion robots.""" with connect(WS_URL, timeout=10) as ws: # Switch to Spot ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "spot"} })) time.sleep(2) # Send action ws.send(json.dumps({ "type": "action", "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} })) # Receive state msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) teleop = state_data.get("teleop_action") # Should have teleop_action with values assert teleop is not None assert isinstance(teleop, dict) class TestStateStructure: """Test WebSocket state message structure.""" def test_state_has_no_static_fields(self, check_server): """Verify state stream doesn't contain robot/scene/has_gripper.""" with connect(WS_URL, timeout=10) as ws: # Get a state message deadline = time.time() + 5 state_data = None while time.time() < deadline: try: msg = ws.recv(timeout=1) data = json.loads(msg) if data.get("type") == "state": state_data = data.get("data", {}) break except TimeoutError: continue assert state_data is not None, "No state message received" # Verify static fields are NOT in state assert "robot" not in state_data, "robot should not be in state stream" assert "scene" not in state_data, "scene should not be in state stream" assert "has_gripper" not in state_data, "has_gripper should not be in state stream" def test_state_always_has_teleop_action(self, check_server): """Verify state always contains teleop_action (never null).""" with connect(WS_URL, timeout=10) as ws: # Get multiple state messages for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) if data.get("type") == "state": state_data = data.get("data", {}) teleop = state_data.get("teleop_action") # Should never be None assert teleop is not None, "teleop_action should never be null" assert isinstance(teleop, dict), "teleop_action should be a dict" # Should have expected keys assert "vx" in teleop assert "vy" in teleop assert "vyaw" in teleop def test_ur5_state_structure(self, check_server): """Test UR5 state message structure.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Get state msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) # Check UR5-specific fields assert "observation" in state_data obs = state_data["observation"] assert "end_effector" in obs assert "ee_orientation" in obs assert "ee_target" in obs assert "ee_target_orientation" in obs assert "gripper" in obs assert "joint_positions" in obs assert "joint_targets" in obs assert "control_mode" in state_data assert "steps" in state_data assert "reward" in state_data assert "teleop_action" in state_data assert "connected_clients" in state_data # Verify old fields are not in state root assert "target" not in state_data assert "target_orientation" not in state_data assert "use_orientation" not in state_data def test_locomotion_state_structure(self, check_server): """Test locomotion robot state message structure.""" with connect(WS_URL, timeout=10) as ws: # Switch to Spot ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "spot"} })) time.sleep(6) # Increased wait time for robot switch # Get state msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) # Check locomotion-specific fields assert "observation" in state_data obs = state_data["observation"] assert "position" in obs assert "orientation" in obs assert "steps" in state_data assert "reward" in state_data assert "teleop_action" in state_data assert "connected_clients" in state_data # Verify old fields are removed assert "base_height" not in state_data assert "upright" not in state_data assert "vx" not in state_data assert "vy" not in state_data assert "vyaw" not in state_data class TestBackwardCompatibility: """Test backward compatibility of old API formats.""" def test_teleop_old_format_accepted(self, check_server): """Test that old dx/dy/dz format is still accepted.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Send teleop action with old format (dx/dy/dz) ws.send(json.dumps({ "type": "teleop_action", "data": {"dx": 0.01, "dy": 0.0, "dz": 0.0} })) # Should still work - receive state msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" def test_old_command_type_accepted(self, check_server): """Test that old 'command' message type is still accepted.""" with connect(WS_URL, timeout=10) as ws: # Switch to Spot ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "spot"} })) time.sleep(2) # Send command with old message type ws.send(json.dumps({ "type": "command", "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} })) # Should still work - receive state msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" def test_old_teleop_command_type_accepted(self, check_server): """Test that old 'teleop_command' message type is still accepted.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Send teleop with old message type ws.send(json.dumps({ "type": "teleop_command", "data": {"vx": 0.01, "vy": 0.0, "vz": 0.0} })) # Should still work - receive state msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" class TestRobotMovement: """Test that robots actually move when commands are sent.""" def test_ur5_teleop_moves_robot(self, check_server): """Test UR5 responds to teleop commands and position changes.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Get initial position msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) obs = state_data.get("observation", {}) initial_ee = obs.get("end_effector", {}) initial_x = initial_ee.get("x", 0) # Send multiple teleop commands to move in +X direction for _ in range(5): ws.send(json.dumps({ "type": "teleop_action", "data": {"vx": 0.02, "vy": 0.0, "vz": 0.0} })) time.sleep(0.1) # Get new position after movement msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) obs = state_data.get("observation", {}) new_ee = obs.get("end_effector", {}) new_x = new_ee.get("x", 0) # Verify position changed (moved in +X direction) # Target should have moved, even if actual position hasn't caught up yet target = obs.get("ee_target", {}) target_x = target.get("x", 0) assert target_x > initial_x, f"Expected target X to increase from {initial_x}, got {target_x}" def test_spot_action_moves_robot(self, check_server): """Test Spot responds to action messages and position changes.""" with connect(WS_URL, timeout=10) as ws: # Switch to Spot ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "spot"} })) time.sleep(4) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Get initial position msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) obs = state_data.get("observation", {}) initial_pos = obs.get("position", {}) initial_x = initial_pos.get("x", 0) # Send multiple actions to move forward for _ in range(10): ws.send(json.dumps({ "type": "action", "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} })) time.sleep(0.1) # Get new position after movement msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) obs = state_data.get("observation", {}) new_pos = obs.get("position", {}) new_x = new_pos.get("x", 0) # Verify position changed (moved forward in X or fell backward) # Locomotion robots may move slowly or fall, just verify some movement occurred assert abs(new_x - initial_x) > 0.001, f"Expected some X movement from {initial_x}, got {new_x}" def test_g1_action_moves_robot(self, check_server): """Test G1 responds to action messages and position changes.""" with connect(WS_URL, timeout=10) as ws: # Switch to G1 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "g1"} })) time.sleep(4) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Get initial position msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" state_data = data.get("data", {}) obs = state_data.get("observation", {}) initial_pos = obs.get("position", {}) initial_x = initial_pos.get("x", 0) # Send multiple actions to move forward for _ in range(10): ws.send(json.dumps({ "type": "action", "data": {"vx": 0.5, "vy": 0.0, "vyaw": 0.0} })) time.sleep(0.1) # Get new position after movement msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) obs = state_data.get("observation", {}) new_pos = obs.get("position", {}) new_x = new_pos.get("x", 0) # Verify position changed - G1 may fall or move erratically, just verify some movement assert abs(new_x - initial_x) > 0.001, f"Expected some X movement from {initial_x}, got {new_x}" def test_ur5_gripper_command(self, check_server): """Test UR5 gripper command is accepted.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Send gripper close command ws.send(json.dumps({ "type": "gripper", "data": {"action": "close"} })) # Wait and verify state message received time.sleep(0.2) msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" # Send gripper open command ws.send(json.dumps({ "type": "gripper", "data": {"action": "open"} })) # Wait and verify state message received time.sleep(0.2) msg = ws.recv(timeout=2) data = json.loads(msg) assert data.get("type") == "state" def test_ur5_action_with_joint_velocity(self, check_server): """Test UR5 responds to action messages with joint velocities.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Send action with joint 3 velocity set ws.send(json.dumps({ "type": "action", "data": { "vx": 0.0, "vy": 0.0, "vz": 0.0, "vrx": 0.0, "vry": 0.0, "vrz": 0.0, "j1": 0.0, "j2": 0.0, "j3": 0.5, "j4": 0.0, "j5": 0.0, "j6": 0.0, "gripper": 0.0 } })) # Get state and verify teleop_action has joint velocity time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # Joint 3 should have velocity 0.5 assert abs(teleop.get("j3", 0) - 0.5) < 0.01, f"Expected j3=0.5, got {teleop.get('j3')}" # Stop by sending zero velocities ws.send(json.dumps({ "type": "action", "data": { "vx": 0.0, "vy": 0.0, "vz": 0.0, "vrx": 0.0, "vry": 0.0, "vrz": 0.0, "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, "gripper": 0.0 } })) time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # All velocities should be zero assert abs(teleop.get("j3", 1)) < 0.01 def test_ur5_joint_jog_teleop(self, check_server): """Test joint jogging with start_jog updates teleop_action with joint velocities.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Disable Nova mode to test local teleop tracking ws.send(json.dumps({ "type": "set_nova_mode", "data": { "state_streaming": False, "ik": False } })) time.sleep(0.2) # Start joint jog on joint 3 in positive direction (legacy start_jog still supported) ws.send(json.dumps({ "type": "start_jog", "data": { "jog_type": "joint", "params": { "joint": 3, "direction": "+", "velocity": 0.5 } } })) # Get state and verify teleop_action has joint velocities # Receive multiple messages to ensure we get a fresh state after the jog command time.sleep(0.3) for i in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) teleop = data.get("data", {}).get("teleop_action", {}) obs = data.get("data", {}).get("observation", {}) has_ee = 'end_effector' in obs # UR5 has end_effector, G1/Spot don't print(f"\nDEBUG message {i+1}: robot={'UR5' if has_ee else 'LOCO'}, teleop j3 = {teleop.get('j3', 'MISSING')}") state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # Debug: print actual values received print(f"\nDEBUG: Final teleop_action = {teleop}") # Joint 3 should have velocity 0.5, others should be 0 assert teleop.get("j3", 0) == 0.5, f"Expected j3=0.5, got {teleop.get('j3')}, full teleop={teleop}" assert teleop.get("j1", 1) == 0.0 assert teleop.get("j2", 1) == 0.0 # Cartesian velocities should be 0 assert teleop.get("vx", 1) == 0.0 assert teleop.get("vy", 1) == 0.0 assert teleop.get("vz", 1) == 0.0 # Stop jog ws.send(json.dumps({"type": "stop_jog"})) # Receive multiple messages to ensure we get a fresh state after stop time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) assert teleop.get("j3", 1) == 0.0 assert teleop.get("vx", 1) == 0.0 def test_ur5_action_with_cartesian_velocity(self, check_server): """Test UR5 responds to action messages with Cartesian velocities.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Send action with vx velocity set (50 mm/s = 0.05 m/s) ws.send(json.dumps({ "type": "action", "data": { "vx": 0.05, "vy": 0.0, "vz": 0.0, "vrx": 0.0, "vry": 0.0, "vrz": 0.0, "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, "gripper": 0.0 } })) # Get state and verify teleop_action has vx velocity time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # vx should be 0.05 m/s assert abs(teleop.get("vx", 0) - 0.05) < 0.01, f"Expected vx=0.05, got {teleop.get('vx')}" # Send action with rotation velocity ws.send(json.dumps({ "type": "action", "data": { "vx": 0.0, "vy": 0.0, "vz": 0.0, "vrx": 0.0, "vry": 0.0, "vrz": -0.3, "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, "gripper": 0.0 } })) # Get state and verify teleop_action has vrz velocity time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # vrz should be -0.3 rad/s assert abs(teleop.get("vrz", 0) + 0.3) < 0.01, f"Expected vrz=-0.3, got {teleop.get('vrz')}" # Stop by sending zero velocities ws.send(json.dumps({ "type": "action", "data": { "vx": 0.0, "vy": 0.0, "vz": 0.0, "vrx": 0.0, "vry": 0.0, "vrz": 0.0, "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, "gripper": 0.0 } })) def test_ur5_cartesian_jog_teleop(self, check_server): """Test Cartesian jogging with start_jog updates teleop_action with Cartesian velocities.""" with connect(WS_URL, timeout=10) as ws: # Switch to UR5 ws.send(json.dumps({ "type": "switch_robot", "data": {"robot": "ur5"} })) time.sleep(2) # Reset to clear any previous state ws.send(json.dumps({"type": "reset"})) time.sleep(0.5) # Disable Nova mode to test local teleop tracking ws.send(json.dumps({ "type": "set_nova_mode", "data": { "state_streaming": False, "ik": False } })) time.sleep(0.2) # Start Cartesian translation jog on X axis ws.send(json.dumps({ "type": "start_jog", "data": { "jog_type": "cartesian_translation", "params": { "axis": "x", "direction": "+", "velocity": 50.0, # mm/s "tcp_id": "Flange", "coord_system_id": "world" } } })) # Get state and verify teleop_action has vx velocity # Receive multiple messages to ensure we get a fresh state after the jog command time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # vx should have velocity 0.05 m/s (50 mm/s), others should be 0 assert abs(teleop.get("vx", 0) - 0.05) < 0.01, f"Expected vx near 0.05, got {teleop.get('vx')}" assert teleop.get("vy", 1) == 0.0 assert teleop.get("vz", 1) == 0.0 # Joint velocities should be 0 assert teleop.get("j1", 1) == 0.0 # Stop jog ws.send(json.dumps({"type": "stop_jog"})) # Receive multiple messages to ensure stop takes effect time.sleep(0.3) for _ in range(3): ws.recv(timeout=2) # Start Cartesian rotation jog on Z axis ws.send(json.dumps({ "type": "start_jog", "data": { "jog_type": "cartesian_rotation", "params": { "axis": "z", "direction": "-", "velocity": 0.3, # rad/s "tcp_id": "Flange", "coord_system_id": "world" } } })) # Get state and verify teleop_action has vrz velocity # Receive multiple messages to ensure we get a fresh state after the jog command time.sleep(0.3) for _ in range(3): msg = ws.recv(timeout=2) data = json.loads(msg) state_data = data.get("data", {}) teleop = state_data.get("teleop_action", {}) # vrz should have velocity -0.3 rad/s, others should be 0 assert teleop.get("vrz", 0) == -0.3, f"Expected vrz=-0.3, got {teleop.get('vrz')}" assert teleop.get("vrx", 1) == 0.0 assert teleop.get("vry", 1) == 0.0 assert teleop.get("vx", 1) == 0.0 # Stop jog ws.send(json.dumps({"type": "stop_jog"})) if __name__ == "__main__": # Run with: python -m pytest nova-sim/tests/test_api.py -v pytest.main([__file__, "-v"])