#!/usr/bin/env python3
"""Test Nova Jogger WebSocket connection and commands."""

import os
import sys
import time
import json
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

# Seconds to wait for a server message before reporting "no response".
RECV_TIMEOUT_S = 2.0

# Seconds to wait for the WebSocket opening handshake.
OPEN_TIMEOUT_S = 10


def _strip_matching_quotes(value):
    """Return *value* without one pair of matching surrounding quotes."""
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        return value[1:-1]
    return value


def load_env_file(filepath):
    """Simple .env file loader.

    Reads ``KEY=VALUE`` lines from *filepath*, skipping blank lines, lines
    starting with ``#``, lines without ``=`` and lines whose key is empty.
    Keys and values are whitespace-stripped, and one pair of matching
    surrounding quotes is removed from the value. Every parsed pair is also
    written to ``os.environ``, overriding any existing value.

    Args:
        filepath: Path to the env file (``str`` or ``pathlib.Path``).

    Returns:
        dict mapping each parsed key to its value; empty if the file does
        not exist.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        return {}

    env_vars = {}
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            key = key.strip()
            if not key:
                # os.environ rejects an empty variable name; skip the line
                # instead of aborting the whole load.
                continue
            value = _strip_matching_quotes(value.strip())
            env_vars[key] = value
            os.environ[key] = value
    return env_vars


def _joint_commands(motion_group_id):
    """Return the (start, stop) joint-velocity payloads for the jog test."""
    start_cmd = {
        "motion_group": motion_group_id,
        "joint_velocities": [0.1, 0.0, 0.0, 0.0, 0.0, 0.0]
    }
    stop_cmd = {
        "motion_group": motion_group_id,
        "joint_velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
    }
    return start_cmd, stop_cmd


def _tcp_commands(motion_group_id):
    """Return the (start, stop) TCP-translation payloads for the jog test."""
    start_cmd = {
        "motion_group": motion_group_id,
        "position_direction": {"x": 1, "y": 0, "z": 0},
        "rotation_direction": {"x": 0, "y": 0, "z": 0},
        "position_velocity": 10.0,
        "rotation_velocity": 0.0,
        "tcp": "Flange",
        "coordinate_system": "world"
    }
    stop_cmd = {
        "motion_group": motion_group_id,
        "position_direction": {"x": 0, "y": 0, "z": 0},
        "rotation_direction": {"x": 0, "y": 0, "z": 0},
        "position_velocity": 0,
        "rotation_velocity": 0,
        "tcp": "Flange",
        "coordinate_system": "world"
    }
    return start_cmd, stop_cmd


def _send_command(ws, command):
    """Send *command* as JSON over *ws* and print the reply, if any arrives."""
    payload = json.dumps(command)
    ws.send(payload)
    print(f" Sent: {payload}")

    # Try to receive response with timeout
    try:
        response = ws.recv(timeout=RECV_TIMEOUT_S)
        print(f" Response: {response}")
    except TimeoutError:
        print(f" No response received (timeout after {RECV_TIMEOUT_S:g}s)")


def main():
    """Connect to the Nova jogger WebSocket and run a start/stop jog test.

    Loads ``.env.local`` from the parent of this script's directory, connects
    to the joint endpoint (falling back to the TCP endpoint), sends one jog
    command, waits one second, sends the matching zero-velocity command and
    closes the connection.

    Returns:
        0 if every step completed, 1 on any configuration, import,
        connection or test failure.
    """
    # Load .env.local
    env_path = Path(__file__).parent.parent / ".env.local"
    if not env_path.exists():
        print(f"Error: {env_path} not found")
        return 1

    print(f"Loading environment from {env_path}")
    load_env_file(env_path)

    # Try to import websockets
    try:
        from websockets.sync.client import connect
        print("✓ websockets module imported successfully")
    except ImportError as e:
        print(f"✗ Failed to import websockets: {e}")
        print("Install with: pip install websockets")
        return 1

    # Get config
    instance_url = os.getenv("NOVA_INSTANCE_URL")
    access_token = os.getenv("NOVA_ACCESS_TOKEN")
    cell_id = os.getenv("NOVA_CELL_ID", "cell")
    motion_group_id = os.getenv("NOVA_MOTION_GROUP_ID")

    if not all([instance_url, access_token, motion_group_id]):
        print("✗ Missing required environment variables")
        return 1

    # Convert HTTP to WebSocket URL. The trailing slash is dropped so the
    # endpoint paths below do not start with a double slash.
    ws_url = (
        instance_url.rstrip("/")
        .replace("https://", "wss://")
        .replace("http://", "ws://")
    )

    # Try nova-js style endpoints (continuous velocity)
    url_joint = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-joint"
    url_tcp = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-tcp"

    print("\nTesting Nova Jogger WebSocket connection (nova-js style)...")
    print(f"Joint endpoint: {url_joint}")
    print(f"TCP endpoint: {url_tcp}")
    print()

    # Try to connect to joint endpoint, falling back to the TCP endpoint
    headers = [("Authorization", f"Bearer {access_token}")]
    try:
        print("Connecting to joint endpoint...")
        ws = connect(
            url_joint, open_timeout=OPEN_TIMEOUT_S, additional_headers=headers
        )
        joint_mode = True
        print("✓ WebSocket connected to joint endpoint successfully!")
    except Exception as e:
        print(f"✗ Joint endpoint failed: {e}")
        print("\nTrying TCP endpoint...")
        try:
            ws = connect(
                url_tcp, open_timeout=OPEN_TIMEOUT_S, additional_headers=headers
            )
            joint_mode = False
            print("✓ WebSocket connected to TCP endpoint successfully!")
        except Exception as e2:
            print(f"✗ Both endpoints failed. TCP error: {e2}")
            return 1

    closed = False
    try:
        # Check if server sends an initial message
        print("\nWaiting for initial server message...")
        try:
            initial_msg = ws.recv(timeout=RECV_TIMEOUT_S)
            print(f" Initial message: {initial_msg}")
        except TimeoutError:
            print(" No initial message (timeout)")

        if joint_mode:
            start_banner = "\n[Test 1] Starting joint jog (J0+)..."
            start_cmd, stop_cmd = _joint_commands(motion_group_id)
        else:
            start_banner = "\n[Test 1] Starting TCP translation (X+)..."
            start_cmd, stop_cmd = _tcp_commands(motion_group_id)

        # Test 1: Start jogging
        print(start_banner)
        _send_command(ws, start_cmd)

        # Let it jog briefly
        print(" Jogging for 1 second...")
        time.sleep(1.0)

        # Test 2: Stop jogging (send zero velocities)
        print("\n[Test 2] Stopping jog (zero velocities)...")
        _send_command(ws, stop_cmd)

        # Close connection
        print("\n[Test 3] Closing connection...")
        ws.close()
        closed = True
        print("✓ Connection closed successfully")

        print("\n✓ All jogger tests completed!")
        return 0

    except Exception as e:
        print(f"✗ Test failed: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        return 1

    finally:
        # Never leave the socket open after a failure or an interrupt
        # (for example Ctrl-C while the jog command is still active).
        if not closed:
            try:
                ws.close()
            except Exception as close_err:
                print(f"✗ Failed to close connection: {close_err}")


if __name__ == "__main__":
    sys.exit(main())