File size: 6,772 Bytes
a75f112 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | #!/usr/bin/env python3
"""Test Nova Jogger WebSocket connection and commands."""
import os
import sys
import time
import json
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Load .env.local
def load_env_file(filepath):
"""Simple .env file loader."""
if not filepath.exists():
return {}
env_vars = {}
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
env_vars[key.strip()] = value.strip()
os.environ[key.strip()] = value.strip()
return env_vars
def main():
# Load .env.local
env_path = Path(__file__).parent.parent / ".env.local"
if not env_path.exists():
print(f"Error: {env_path} not found")
return 1
print(f"Loading environment from {env_path}")
env_vars = load_env_file(env_path)
# Try to import websockets
try:
from websockets.sync.client import connect
print("✓ websockets module imported successfully")
except ImportError as e:
print(f"✗ Failed to import websockets: {e}")
print("Install with: pip install websockets")
return 1
# Get config
instance_url = os.getenv("NOVA_INSTANCE_URL")
access_token = os.getenv("NOVA_ACCESS_TOKEN")
cell_id = os.getenv("NOVA_CELL_ID", "cell")
motion_group_id = os.getenv("NOVA_MOTION_GROUP_ID")
if not all([instance_url, access_token, motion_group_id]):
print("✗ Missing required environment variables")
return 1
# Convert HTTP to WebSocket URL
ws_url = instance_url.replace("https://", "wss://").replace("http://", "ws://")
# Try nova-js style endpoints (continuous velocity)
url_joint = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-joint"
url_tcp = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-tcp"
print(f"\nTesting Nova Jogger WebSocket connection (nova-js style)...")
print(f"Joint endpoint: {url_joint}")
print(f"TCP endpoint: {url_tcp}")
print()
# Try to connect to joint endpoint
headers = [("Authorization", f"Bearer {access_token}")]
ws = None
url = None
try:
print("Connecting to joint endpoint...")
ws = connect(url_joint, open_timeout=10, additional_headers=headers)
url = url_joint
print("✓ WebSocket connected to joint endpoint successfully!")
except Exception as e:
print(f"✗ Joint endpoint failed: {e}")
print("\nTrying TCP endpoint...")
try:
ws = connect(url_tcp, open_timeout=10, additional_headers=headers)
url = url_tcp
print("✓ WebSocket connected to TCP endpoint successfully!")
except Exception as e2:
print(f"✗ Both endpoints failed. TCP error: {e2}")
return 1
try:
# Check if server sends an initial message
print("\nWaiting for initial server message...")
try:
initial_msg = ws.recv(timeout=2.0)
print(f" Initial message: {initial_msg}")
except TimeoutError:
print(" No initial message (timeout)")
# Test 1: Start joint jogging (nova-js style)
if "move-joint" in url:
print("\n[Test 1] Starting joint jog (J0+)...")
jog_cmd = {
"motion_group": motion_group_id,
"joint_velocities": [0.1, 0.0, 0.0, 0.0, 0.0, 0.0]
}
ws.send(json.dumps(jog_cmd))
print(f" Sent: {json.dumps(jog_cmd)}")
# Try to receive response with timeout
try:
response = ws.recv(timeout=2.0)
print(f" Response: {response}")
except TimeoutError:
print(" No response received (timeout after 2s)")
# Let it jog briefly
print(" Jogging for 1 second...")
time.sleep(1.0)
# Test 2: Stop jogging (send zero velocities)
print("\n[Test 2] Stopping jog (zero velocities)...")
stop_cmd = {
"motion_group": motion_group_id,
"joint_velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
}
ws.send(json.dumps(stop_cmd))
print(f" Sent: {json.dumps(stop_cmd)}")
try:
response = ws.recv(timeout=2.0)
print(f" Response: {response}")
except TimeoutError:
print(" No response received (timeout after 2s)")
else: # TCP endpoint
print("\n[Test 1] Starting TCP translation (X+)...")
jog_cmd = {
"motion_group": motion_group_id,
"position_direction": {"x": 1, "y": 0, "z": 0},
"rotation_direction": {"x": 0, "y": 0, "z": 0},
"position_velocity": 10.0,
"rotation_velocity": 0.0,
"tcp": "Flange",
"coordinate_system": "world"
}
ws.send(json.dumps(jog_cmd))
print(f" Sent: {json.dumps(jog_cmd)}")
# Try to receive response
try:
response = ws.recv(timeout=2.0)
print(f" Response: {response}")
except TimeoutError:
print(" No response received (timeout after 2s)")
# Let it jog briefly
print(" Jogging for 1 second...")
time.sleep(1.0)
# Test 2: Stop jogging
print("\n[Test 2] Stopping jog (zero velocities)...")
stop_cmd = {
"motion_group": motion_group_id,
"position_direction": {"x": 0, "y": 0, "z": 0},
"rotation_direction": {"x": 0, "y": 0, "z": 0},
"position_velocity": 0,
"rotation_velocity": 0,
"tcp": "Flange",
"coordinate_system": "world"
}
ws.send(json.dumps(stop_cmd))
print(f" Sent: {json.dumps(stop_cmd)}")
try:
response = ws.recv(timeout=2.0)
print(f" Response: {response}")
except TimeoutError:
print(" No response received (timeout after 2s)")
# Close connection
print("\n[Test 3] Closing connection...")
ws.close()
print("✓ Connection closed successfully")
print("\n✓ All jogger tests completed!")
return 0
except Exception as e:
print(f"✗ Test failed: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())
|