| |
| """Test Nova Jogger WebSocket connection and commands.""" |
|
|
| import os |
| import sys |
| import time |
| import json |
| from pathlib import Path |
|
|
| |
# Put the directory two levels above this file at the front of the import
# search path so modules located there take precedence when imported.
sys.path.insert(0, os.fspath(Path(__file__).parent.parent))
|
|
| |
def load_env_file(filepath):
    """Load ``KEY=VALUE`` pairs from a .env-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored.  Each remaining line is split on the first ``=`` only, so a
    value may itself contain ``=``.  Keys and values are stripped of
    surrounding whitespace, and one pair of matching single or double
    quotes around a value is removed.  Lines whose key is empty are skipped.

    Args:
        filepath: Location of the file, as a ``str`` or path-like object.

    Returns:
        A dict of the variables that were loaded; empty when the file does
        not exist.  Every returned pair is also written to ``os.environ``,
        overwriting any value already present.
    """
    path = Path(filepath)
    if not path.exists():
        return {}

    env_vars = {}
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = (part.strip() for part in line.split("=", 1))
            if not key:
                # "=value" carries no variable name, so there is nothing
                # meaningful to export.
                continue
            # KEY="value" / KEY='value': drop the quotes so they do not end
            # up inside tokens or URLs read back from the environment.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            env_vars[key] = value
            os.environ[key] = value
    return env_vars
|
|
|
|
def _print_reply(ws, prefix, timeout_message, timeout=2.0):
    """Print the next message from *ws*, or *timeout_message* if none arrives.

    Args:
        ws: Open connection exposing ``recv(timeout=...)``.
        prefix: Text printed immediately before the received message.
        timeout_message: Text printed when ``recv`` raises ``TimeoutError``.
        timeout: Seconds to wait for a message.
    """
    try:
        message = ws.recv(timeout=timeout)
    except TimeoutError:
        print(timeout_message)
    else:
        print(f"{prefix}{message}")


def _send_and_report(ws, command):
    """Send *command* as JSON over *ws*, echo it, and print the reply if any."""
    payload = json.dumps(command)
    ws.send(payload)
    print(f" Sent: {payload}")
    _print_reply(ws, " Response: ", " No response received (timeout after 2s)")


def main():
    """Run the jogger WebSocket smoke test and return a process exit code.

    Steps:
      1. Load ``.env.local`` from two directories above this file.
      2. Import the ``websockets`` sync client.
      3. Read ``NOVA_INSTANCE_URL``, ``NOVA_ACCESS_TOKEN``, ``NOVA_CELL_ID``
         (default ``"cell"``) and ``NOVA_MOTION_GROUP_ID`` from the
         environment.
      4. Connect to the move-joint endpoint, falling back to move-tcp.
      5. Send a jog command, wait one second, send a zero-velocity command,
         then close the connection.

    Returns:
        ``0`` when every step completed, ``1`` on any failure (missing env
        file, missing package, missing variables, connection or test error).
    """
    env_path = Path(__file__).parent.parent / ".env.local"
    if not env_path.exists():
        print(f"Error: {env_path} not found")
        return 1

    print(f"Loading environment from {env_path}")
    # Called for its side effect of populating os.environ.
    load_env_file(env_path)

    # Imported here rather than at module level so a missing package yields
    # an install hint instead of a bare traceback.
    try:
        from websockets.sync.client import connect
    except ImportError as e:
        print(f"✗ Failed to import websockets: {e}")
        print("Install with: pip install websockets")
        return 1
    print("✓ websockets module imported successfully")

    instance_url = os.getenv("NOVA_INSTANCE_URL")
    access_token = os.getenv("NOVA_ACCESS_TOKEN")
    cell_id = os.getenv("NOVA_CELL_ID", "cell")
    motion_group_id = os.getenv("NOVA_MOTION_GROUP_ID")

    required = {
        "NOVA_INSTANCE_URL": instance_url,
        "NOVA_ACCESS_TOKEN": access_token,
        "NOVA_MOTION_GROUP_ID": motion_group_id,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        print("✗ Missing required environment variables")
        # Names only; never echo the values (one of them is a credential).
        print(f" Missing: {', '.join(missing)}")
        return 1

    # Swap the HTTP scheme for its WebSocket counterpart.  A trailing slash
    # is dropped first so the joined path does not contain "//api".
    ws_url = (
        instance_url.rstrip("/")
        .replace("https://", "wss://")
        .replace("http://", "ws://")
    )

    url_joint = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-joint"
    url_tcp = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-tcp"

    print("\nTesting Nova Jogger WebSocket connection (nova-js style)...")
    print(f"Joint endpoint: {url_joint}")
    print(f"TCP endpoint: {url_tcp}")
    print()

    headers = [("Authorization", f"Bearer {access_token}")]

    # Track which endpoint accepted the connection explicitly instead of
    # searching the URL text, which could also match part of the host name.
    try:
        print("Connecting to joint endpoint...")
        ws = connect(url_joint, open_timeout=10, additional_headers=headers)
        use_joint = True
        print("✓ WebSocket connected to joint endpoint successfully!")
    except Exception as e:
        print(f"✗ Joint endpoint failed: {e}")
        print("\nTrying TCP endpoint...")
        try:
            ws = connect(url_tcp, open_timeout=10, additional_headers=headers)
            use_joint = False
            print("✓ WebSocket connected to TCP endpoint successfully!")
        except Exception as e2:
            print(f"✗ Both endpoints failed. TCP error: {e2}")
            return 1

    if use_joint:
        start_banner = "\n[Test 1] Starting joint jog (J0+)..."
        jog_cmd = {
            "motion_group": motion_group_id,
            "joint_velocities": [0.1, 0.0, 0.0, 0.0, 0.0, 0.0],
        }
        stop_cmd = {
            "motion_group": motion_group_id,
            "joint_velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
        }
    else:
        start_banner = "\n[Test 1] Starting TCP translation (X+)..."
        jog_cmd = {
            "motion_group": motion_group_id,
            "position_direction": {"x": 1, "y": 0, "z": 0},
            "rotation_direction": {"x": 0, "y": 0, "z": 0},
            "position_velocity": 10.0,
            "rotation_velocity": 0.0,
            "tcp": "Flange",
            "coordinate_system": "world",
        }
        stop_cmd = {
            "motion_group": motion_group_id,
            "position_direction": {"x": 0, "y": 0, "z": 0},
            "rotation_direction": {"x": 0, "y": 0, "z": 0},
            "position_velocity": 0,
            "rotation_velocity": 0,
            "tcp": "Flange",
            "coordinate_system": "world",
        }

    try:
        print("\nWaiting for initial server message...")
        _print_reply(ws, " Initial message: ", " No initial message (timeout)")

        print(start_banner)
        _send_and_report(ws, jog_cmd)

        print(" Jogging for 1 second...")
        time.sleep(1.0)

        print("\n[Test 2] Stopping jog (zero velocities)...")
        _send_and_report(ws, stop_cmd)

        print("\n[Test 3] Closing connection...")
        ws.close()
        print("✓ Connection closed successfully")

        print("\n✓ All jogger tests completed!")
        return 0

    except Exception as e:
        print(f"✗ Test failed: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        return 1

    finally:
        # Runs on every exit path (including KeyboardInterrupt) so the
        # socket is never left open after a failure.  Per the websockets
        # documentation close() is idempotent, so the second call after a
        # successful run is a no-op.
        ws.close()
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|
|