nova-sim / scripts /test_jogger.py
Georg
Implement Nova API jogging support and enhance UI controls
a75f112
#!/usr/bin/env python3
"""Test Nova Jogger WebSocket connection and commands."""
import os
import sys
import time
import json
from pathlib import Path
# Put the directory one level above this script's folder at the front of
# sys.path so that modules located there are found first when imported.
sys.path.insert(0, str(Path(__file__).parent.parent))
# Load .env.local
def load_env_file(filepath):
    """Load ``KEY=VALUE`` pairs from a .env-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Only the first ``=`` separates key from value, so values may
    themselves contain ``=``. Whitespace around key and value is stripped,
    and one pair of matching surrounding quotes (single or double) is
    removed from the value.

    Args:
        filepath: ``pathlib.Path`` of the file to read.

    Returns:
        dict mapping every loaded key to its value; an empty dict when the
        file does not exist. Each pair is also written to ``os.environ``,
        overriding any value already set there.
    """
    if not filepath.exists():
        return {}

    env_vars = {}
    # Explicit encoding so the result does not depend on the platform locale.
    with open(filepath, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue

            key, value = (part.strip() for part in line.split("=", 1))
            if not key:
                # os.environ rejects an empty variable name; skip the line
                # instead of aborting the whole load.
                continue

            # `KEY="value"` / `KEY='value'` should yield the bare value.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]

            env_vars[key] = value
            os.environ[key] = value
    return env_vars
def _send_and_report(ws, command):
    """Send *command* as JSON over *ws* and print the reply, waiting up to 2 s."""
    payload = json.dumps(command)
    ws.send(payload)
    print(f" Sent: {payload}")
    try:
        response = ws.recv(timeout=2.0)
        print(f" Response: {response}")
    except TimeoutError:
        print(" No response received (timeout after 2s)")


def _jog_then_stop(ws, start_banner, start_cmd, stop_cmd):
    """Send *start_cmd*, wait one second, then send *stop_cmd*."""
    print(start_banner)
    _send_and_report(ws, start_cmd)
    # Let it jog briefly
    print(" Jogging for 1 second...")
    time.sleep(1.0)
    print("\n[Test 2] Stopping jog (zero velocities)...")
    _send_and_report(ws, stop_cmd)


def main():
    """Exercise the Nova jogging WebSocket endpoints and report the results.

    Loads ``.env.local`` from the directory above this script's folder, then
    reads ``NOVA_INSTANCE_URL``, ``NOVA_ACCESS_TOKEN``, ``NOVA_CELL_ID``
    (default ``"cell"``) and ``NOVA_MOTION_GROUP_ID`` from the environment.
    It connects to the ``move-joint`` endpoint, falling back to ``move-tcp``
    if that fails, sends a short jog command followed by a zero-velocity
    command, and closes the connection.

    Returns:
        0 when every step completed, 1 on any failure (missing env file,
        missing ``websockets`` package, missing variables, connection or
        protocol error).
    """
    # Load .env.local
    env_path = Path(__file__).parent.parent / ".env.local"
    if not env_path.exists():
        print(f"Error: {env_path} not found")
        return 1
    print(f"Loading environment from {env_path}")
    load_env_file(env_path)

    # Imported lazily so a missing package produces a friendly hint instead
    # of a traceback at module import time.
    try:
        from websockets.sync.client import connect
    except ImportError as e:
        print(f"✗ Failed to import websockets: {e}")
        print("Install with: pip install websockets")
        return 1
    print("✓ websockets module imported successfully")

    # Get config
    instance_url = os.getenv("NOVA_INSTANCE_URL")
    access_token = os.getenv("NOVA_ACCESS_TOKEN")
    cell_id = os.getenv("NOVA_CELL_ID", "cell")
    motion_group_id = os.getenv("NOVA_MOTION_GROUP_ID")
    required = {
        "NOVA_INSTANCE_URL": instance_url,
        "NOVA_ACCESS_TOKEN": access_token,
        "NOVA_MOTION_GROUP_ID": motion_group_id,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        print("✗ Missing required environment variables")
        # Names only - never echo the values, one of them is a secret.
        print(f" Missing: {', '.join(missing)}")
        return 1

    # Convert HTTP to WebSocket URL. A trailing slash is dropped first so the
    # endpoint paths below do not start with a double slash.
    ws_url = (
        instance_url.rstrip("/")
        .replace("https://", "wss://")
        .replace("http://", "ws://")
    )

    # Try nova-js style endpoints (continuous velocity)
    url_joint = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-joint"
    url_tcp = f"{ws_url}/api/v1/cells/{cell_id}/motion-groups/move-tcp"
    print("\nTesting Nova Jogger WebSocket connection (nova-js style)...")
    print(f"Joint endpoint: {url_joint}")
    print(f"TCP endpoint: {url_tcp}")
    print()

    # Try the joint endpoint first, then fall back to the TCP endpoint.
    headers = [("Authorization", f"Bearer {access_token}")]
    use_joint = True
    try:
        print("Connecting to joint endpoint...")
        ws = connect(url_joint, open_timeout=10, additional_headers=headers)
        print("✓ WebSocket connected to joint endpoint successfully!")
    except Exception as e:
        print(f"✗ Joint endpoint failed: {e}")
        print("\nTrying TCP endpoint...")
        use_joint = False
        try:
            ws = connect(url_tcp, open_timeout=10, additional_headers=headers)
            print("✓ WebSocket connected to TCP endpoint successfully!")
        except Exception as e2:
            print(f"✗ Both endpoints failed. TCP error: {e2}")
            return 1

    close_attempted = False
    try:
        # Check if server sends an initial message
        print("\nWaiting for initial server message...")
        try:
            initial_msg = ws.recv(timeout=2.0)
            print(f" Initial message: {initial_msg}")
        except TimeoutError:
            print(" No initial message (timeout)")

        if use_joint:
            # Tests 1 + 2: joint jogging (nova-js style)
            _jog_then_stop(
                ws,
                "\n[Test 1] Starting joint jog (J0+)...",
                {
                    "motion_group": motion_group_id,
                    "joint_velocities": [0.1, 0.0, 0.0, 0.0, 0.0, 0.0],
                },
                {
                    "motion_group": motion_group_id,
                    "joint_velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
                },
            )
        else:
            # Tests 1 + 2: TCP (Cartesian) jogging
            _jog_then_stop(
                ws,
                "\n[Test 1] Starting TCP translation (X+)...",
                {
                    "motion_group": motion_group_id,
                    "position_direction": {"x": 1, "y": 0, "z": 0},
                    "rotation_direction": {"x": 0, "y": 0, "z": 0},
                    "position_velocity": 10.0,
                    "rotation_velocity": 0.0,
                    "tcp": "Flange",
                    "coordinate_system": "world",
                },
                {
                    "motion_group": motion_group_id,
                    "position_direction": {"x": 0, "y": 0, "z": 0},
                    "rotation_direction": {"x": 0, "y": 0, "z": 0},
                    "position_velocity": 0,
                    "rotation_velocity": 0,
                    "tcp": "Flange",
                    "coordinate_system": "world",
                },
            )

        # Close connection
        print("\n[Test 3] Closing connection...")
        close_attempted = True
        ws.close()
        print("✓ Connection closed successfully")
        print("\n✓ All jogger tests completed!")
        return 0
    except Exception as e:
        print(f"✗ Test failed: {type(e).__name__}: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        # Never leave the socket open after a failure or an interrupt.
        # NOTE(review): dropping the connection presumably also ends a jog
        # that was started but not yet stopped - confirm against the Nova API.
        if not close_attempted:
            try:
                ws.close()
            except Exception as close_err:
                print(f"✗ Failed to close WebSocket cleanly: {close_err}")
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())