| """ |
| Validation Test Script for Nova API v2 Jogging |
| |
| This script validates that v2 jogging message formats work correctly with a real Nova instance |
| BEFORE migrating the nova_jogger.py module. This is Phase 0 and must pass before implementation. |
| |
| Prerequisites: |
| - .env.local configured with Nova credentials |
| - Robot in safe position |
| - Robot controller accessible |
| |
| Usage: |
| python scripts/validate_v2_jogging.py |
| """ |
|
|
| import json |
| import os |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Any, Dict, Optional |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| try: |
| from websockets.sync.client import connect |
| except ImportError: |
| print("ERROR: websockets package required") |
| print("Install with: pip install websockets") |
| sys.exit(1) |
|
|
| |
| def load_env_file(filepath: Path): |
| """Load environment variables from .env file.""" |
| if not filepath.exists(): |
| return |
| with open(filepath, 'r') as f: |
| for line in f: |
| line = line.strip() |
| if line and not line.startswith('#') and '=' in line: |
| key, value = line.split('=', 1) |
| os.environ[key.strip()] = value.strip() |
|
|
| env_local_path = Path(__file__).parent.parent / '.env.local' |
| load_env_file(env_local_path) |
|
|
| |
| INSTANCE_URL = os.getenv('NOVA_INSTANCE_URL') or os.getenv('NOVA_API') |
| ACCESS_TOKEN = os.getenv('NOVA_ACCESS_TOKEN') |
| CELL_ID = os.getenv('NOVA_CELL_ID', 'cell') |
| CONTROLLER_ID = os.getenv('NOVA_CONTROLLER_ID') |
| MOTION_GROUP_ID = os.getenv('NOVA_MOTION_GROUP_ID') |
| TCP_NAME = os.getenv('NOVA_TCP_NAME', 'Flange') |
|
|
| |
| test_results = [] |
|
|
| def check_config(): |
| """Verify required configuration is present.""" |
| print("=" * 70) |
| print("Nova API v2 Jogging Validation") |
| print("=" * 70) |
| print() |
|
|
| missing = [] |
| if not INSTANCE_URL: |
| missing.append("NOVA_INSTANCE_URL (or NOVA_API)") |
| if not ACCESS_TOKEN: |
| missing.append("NOVA_ACCESS_TOKEN") |
| if not CONTROLLER_ID: |
| missing.append("NOVA_CONTROLLER_ID") |
| if not MOTION_GROUP_ID: |
| missing.append("NOVA_MOTION_GROUP_ID") |
|
|
| if missing: |
| print("ERROR: Missing required environment variables:") |
| for var in missing: |
| print(f" - {var}") |
| print() |
| print("Please configure .env.local with Nova credentials") |
| sys.exit(1) |
|
|
| print("Configuration:") |
| print(f" Instance URL: {INSTANCE_URL}") |
| print(f" Cell ID: {CELL_ID}") |
| print(f" Controller ID: {CONTROLLER_ID}") |
| print(f" Motion Group ID: {MOTION_GROUP_ID}") |
| print(f" TCP Name: {TCP_NAME}") |
| print() |
|
|
| def create_websocket_url() -> str: |
| """Create the v2 jogging WebSocket URL.""" |
| ws_url = INSTANCE_URL.replace("https://", "wss://").replace("http://", "ws://") |
| url = f"{ws_url}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/execution/jogging" |
| |
| url += f"?token={ACCESS_TOKEN}" |
| return url |
|
|
| def log_test(name: str, passed: bool, message: str = "", details: Any = None): |
| """Log test result.""" |
| status = "✓ PASS" if passed else "✗ FAIL" |
| print(f"[{status}] {name}") |
| if message: |
| print(f" {message}") |
| if details: |
| print(f" Details: {json.dumps(details, indent=8)}") |
| print() |
|
|
| test_results.append({ |
| "test": name, |
| "passed": passed, |
| "message": message, |
| "details": details |
| }) |
|
|
| def test_1_connect_and_initialize(): |
| """Test 1: Connect to WebSocket and send InitializeJoggingRequest.""" |
| print("-" * 70) |
| print("Test 1: Connect and Initialize") |
| print("-" * 70) |
|
|
| try: |
| url = create_websocket_url() |
| print(f"Connecting to: {url[:80]}...") |
|
|
| ws = connect(url, open_timeout=10) |
| log_test("WebSocket Connection", True, "Successfully connected") |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME, |
| "response_coordinate_system": "world" |
| } |
| print(f"Sending: {json.dumps(init_msg, indent=2)}") |
| ws.send(json.dumps(init_msg)) |
|
|
| |
| print("Waiting for response...") |
| response = ws.recv(timeout=5.0) |
| response_data = json.loads(response) |
| print(f"Received: {json.dumps(response_data, indent=2)}") |
|
|
| |
| result = response_data.get("result", response_data) |
| if "kind" in result: |
| if result["kind"] == "INITIALIZE_RECEIVED": |
| log_test("InitializeJoggingRequest", True, "Received INITIALIZE_RECEIVED", result) |
| ws.close() |
| return True, ws |
| else: |
| log_test("InitializeJoggingRequest", False, f"Unexpected kind: {result['kind']}", result) |
| ws.close() |
| return False, None |
| else: |
| log_test("InitializeJoggingRequest", False, "No 'kind' field in response", response_data) |
| ws.close() |
| return False, None |
|
|
| except Exception as e: |
| log_test("Test 1", False, f"Exception: {type(e).__name__}: {e}") |
| return False, None |
|
|
| def test_2_joint_velocity(): |
| """Test 2: Send JointVelocityRequest.""" |
| print("-" * 70) |
| print("Test 2: Joint Velocity Command") |
| print("-" * 70) |
|
|
| try: |
| url = create_websocket_url() |
| ws = connect(url, open_timeout=10) |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME |
| } |
| ws.send(json.dumps(init_msg)) |
| response = ws.recv(timeout=5.0) |
| init_data = json.loads(response) |
| result = init_data.get('result', init_data) |
| print(f"Init response: {result.get('kind')}") |
|
|
| |
| initial_pose = init_data.get('current_pose', {}) |
| initial_joints = initial_pose.get('joint_position', {}).get('joints', []) |
| if initial_joints: |
| print(f"Initial joint 0 position: {initial_joints[0]:.4f} rad") |
|
|
| |
| jog_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0.05, 0, 0, 0, 0, 0] |
| } |
| print(f"Sending: {json.dumps(jog_msg, indent=2)}") |
|
|
| |
| moved = False |
| for i in range(5): |
| ws.send(json.dumps(jog_msg)) |
| time.sleep(0.1) |
| |
| try: |
| response = ws.recv(timeout=0.05) |
| state_data = json.loads(response) |
| if state_data.get('kind') == 'STATE_UPDATE': |
| current_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', []) |
| if current_joints and initial_joints: |
| delta = abs(current_joints[0] - initial_joints[0]) |
| if delta > 0.001: |
| moved = True |
| print(f" Robot moved! Joint 0 delta: {delta:.4f} rad") |
| except: |
| pass |
|
|
| |
| stop_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0, 0, 0, 0, 0, 0] |
| } |
| ws.send(json.dumps(stop_msg)) |
|
|
| |
| time.sleep(0.2) |
| try: |
| while True: |
| response = ws.recv(timeout=0.1) |
| state_data = json.loads(response) |
| if state_data.get('kind') == 'STATE_UPDATE': |
| final_joints = state_data.get('current_pose', {}).get('joint_position', {}).get('joints', []) |
| if final_joints and initial_joints: |
| delta = abs(final_joints[0] - initial_joints[0]) |
| if delta > 0.001: |
| moved = True |
| print(f"Final joint 0 position: {final_joints[0]:.4f} rad (moved {delta:.4f} rad)") |
| except: |
| pass |
|
|
| status_msg = "Joint velocity commands sent successfully" |
| if moved: |
| status_msg += " - ROBOT MOVED CONFIRMED" |
| else: |
| status_msg += " - WARNING: No movement detected" |
|
|
| log_test("JointVelocityRequest", True, status_msg) |
| ws.close() |
| return True |
|
|
| except Exception as e: |
| log_test("Test 2", False, f"Exception: {type(e).__name__}: {e}") |
| return False |
|
|
| def test_3_tcp_velocity(): |
| """Test 3: Send TcpVelocityRequest.""" |
| print("-" * 70) |
| print("Test 3: TCP Velocity Command") |
| print("-" * 70) |
|
|
| try: |
| url = create_websocket_url() |
| ws = connect(url, open_timeout=10) |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME |
| } |
| ws.send(json.dumps(init_msg)) |
| response = ws.recv(timeout=5.0) |
| init_data = json.loads(response) |
| result = init_data.get('result', init_data) |
| print(f"Init response: {result.get('kind')}") |
|
|
| |
| tcp_msg = { |
| "message_type": "TcpVelocityRequest", |
| "translation": [5.0, 0.0, 0.0], |
| "rotation": [0.0, 0.0, 0.0], |
| "use_tool_coordinate_system": False |
| } |
| print(f"Sending: {json.dumps(tcp_msg, indent=2)}") |
|
|
| |
| for i in range(5): |
| ws.send(json.dumps(tcp_msg)) |
| time.sleep(0.1) |
|
|
| |
| stop_msg = { |
| "message_type": "TcpVelocityRequest", |
| "translation": [0.0, 0.0, 0.0], |
| "rotation": [0.0, 0.0, 0.0], |
| "use_tool_coordinate_system": False |
| } |
| ws.send(json.dumps(stop_msg)) |
|
|
| log_test("TcpVelocityRequest", True, "TCP velocity commands sent successfully") |
| ws.close() |
| return True |
|
|
| except Exception as e: |
| log_test("Test 3", False, f"Exception: {type(e).__name__}: {e}") |
| return False |
|
|
| def test_4_pause(): |
| """Test 4: Send PauseJoggingRequest.""" |
| print("-" * 70) |
| print("Test 4: Pause Command") |
| print("-" * 70) |
|
|
| try: |
| url = create_websocket_url() |
| ws = connect(url, open_timeout=10) |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME |
| } |
| ws.send(json.dumps(init_msg)) |
| response = ws.recv(timeout=5.0) |
| init_data = json.loads(response) |
| result = init_data.get('result', init_data) |
| print(f"Init response: {result.get('kind')}") |
|
|
| |
| jog_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0.05, 0, 0, 0, 0, 0] |
| } |
| ws.send(json.dumps(jog_msg)) |
| time.sleep(0.2) |
|
|
| |
| pause_msg = {"message_type": "PauseJoggingRequest"} |
| print(f"Sending: {json.dumps(pause_msg, indent=2)}") |
| ws.send(json.dumps(pause_msg)) |
|
|
| log_test("PauseJoggingRequest", True, "Pause command sent successfully") |
| ws.close() |
| return True |
|
|
| except Exception as e: |
| log_test("Test 4", False, f"Exception: {type(e).__name__}: {e}") |
| return False |
|
|
| def test_5_continuous_sending(): |
| """Test 5: Continuous command sending at 10Hz.""" |
| print("-" * 70) |
| print("Test 5: Continuous Sending (10Hz)") |
| print("-" * 70) |
|
|
| try: |
| url = create_websocket_url() |
| ws = connect(url, open_timeout=10) |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME |
| } |
| ws.send(json.dumps(init_msg)) |
| response = ws.recv(timeout=5.0) |
| init_data = json.loads(response) |
| result = init_data.get('result', init_data) |
| print(f"Init response: {result.get('kind')}") |
|
|
| |
| jog_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0.03, 0, 0, 0, 0, 0] |
| } |
|
|
| print("Sending 50 commands at 10Hz (5 seconds)...") |
| start_time = time.time() |
| for i in range(50): |
| ws.send(json.dumps(jog_msg)) |
| time.sleep(0.1) |
| if (i + 1) % 10 == 0: |
| print(f" Sent {i + 1}/50 commands") |
| duration = time.time() - start_time |
| print(f"Completed in {duration:.2f} seconds") |
|
|
| |
| stop_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0, 0, 0, 0, 0, 0] |
| } |
| ws.send(json.dumps(stop_msg)) |
|
|
| log_test("Continuous Sending", True, f"50 commands sent at 10Hz in {duration:.2f}s") |
| ws.close() |
| return True |
|
|
| except Exception as e: |
| log_test("Test 5", False, f"Exception: {type(e).__name__}: {e}") |
| return False |
|
|
| def get_robot_state(): |
| """Get current robot state from state stream.""" |
| try: |
| import urllib.request |
| url = f"{INSTANCE_URL}/api/v2/cells/{CELL_ID}/controllers/{CONTROLLER_ID}/motion-groups/{MOTION_GROUP_ID}/state" |
| headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"} |
| req = urllib.request.Request(url, headers=headers) |
| with urllib.request.urlopen(req, timeout=5) as response: |
| data = json.loads(response.read()) |
| return data |
| except Exception as e: |
| print(f"Failed to get robot state: {e}") |
| return None |
|
|
| def test_6_multi_axis_joint(): |
| """Test 6: Move two joints simultaneously and verify movement.""" |
| print("-" * 70) |
| print("Test 6: Multi-Axis Joint Movement with Verification") |
| print("-" * 70) |
|
|
| try: |
| |
| initial_state = get_robot_state() |
| if not initial_state: |
| log_test("Multi-Axis Joint Movement", False, "Could not get initial robot state") |
| return False |
|
|
| initial_joints = initial_state.get('joint_position', []) |
| if not initial_joints or len(initial_joints) < 2: |
| log_test("Multi-Axis Joint Movement", False, "Invalid initial joint data") |
| return False |
|
|
| print(f"Initial joints [0,1]: [{initial_joints[0]:.4f}, {initial_joints[1]:.4f}] rad") |
|
|
| |
| url = create_websocket_url() |
| ws = connect(url, open_timeout=10) |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME |
| } |
| ws.send(json.dumps(init_msg)) |
| response = ws.recv(timeout=5.0) |
| init_data = json.loads(response) |
| result = init_data.get('result', init_data) |
| print(f"Init response: {result.get('kind')}") |
|
|
| |
| jog_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0.05, 0.05, 0, 0, 0, 0] |
| } |
| print(f"Sending: {json.dumps(jog_msg, indent=2)}") |
|
|
| |
| print("Jogging for 1 second...") |
| for i in range(10): |
| ws.send(json.dumps(jog_msg)) |
| time.sleep(0.1) |
|
|
| |
| stop_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0, 0, 0, 0, 0, 0] |
| } |
| ws.send(json.dumps(stop_msg)) |
| ws.close() |
|
|
| |
| time.sleep(0.5) |
|
|
| |
| final_state = get_robot_state() |
| if not final_state: |
| log_test("Multi-Axis Joint Movement", False, "Could not get final robot state") |
| return False |
|
|
| final_joints = final_state.get('joint_position', []) |
| if not final_joints or len(final_joints) < 2: |
| log_test("Multi-Axis Joint Movement", False, "Invalid final joint data") |
| return False |
|
|
| print(f"Final joints [0,1]: [{final_joints[0]:.4f}, {final_joints[1]:.4f}] rad") |
|
|
| |
| delta_j0 = abs(final_joints[0] - initial_joints[0]) |
| delta_j1 = abs(final_joints[1] - initial_joints[1]) |
|
|
| print(f"Delta joint 0: {delta_j0:.4f} rad") |
| print(f"Delta joint 1: {delta_j1:.4f} rad") |
|
|
| |
| movement_threshold = 0.001 |
| j0_moved = delta_j0 > movement_threshold |
| j1_moved = delta_j1 > movement_threshold |
|
|
| if j0_moved and j1_moved: |
| log_test("Multi-Axis Joint Movement", True, |
| f"Both joints moved as expected (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})", |
| {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) |
| elif j0_moved or j1_moved: |
| log_test("Multi-Axis Joint Movement", False, |
| f"Only one joint moved (Δj0={delta_j0:.4f}, Δj1={delta_j1:.4f})", |
| {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) |
| else: |
| log_test("Multi-Axis Joint Movement", False, |
| "No movement detected in either joint", |
| {"delta_joint_0": delta_j0, "delta_joint_1": delta_j1}) |
|
|
| return j0_moved and j1_moved |
|
|
| except Exception as e: |
| log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}") |
| import traceback |
| traceback.print_exc() |
| return False |
|
|
| def test_7_mode_switching(): |
| """Test 6: Dynamic mode switching between joint and TCP.""" |
| print("-" * 70) |
| print("Test 6: Mode Switching (Joint ↔ TCP)") |
| print("-" * 70) |
|
|
| try: |
| url = create_websocket_url() |
| ws = connect(url, open_timeout=10) |
|
|
| |
| init_msg = { |
| "message_type": "InitializeJoggingRequest", |
| "motion_group": MOTION_GROUP_ID, |
| "tcp": TCP_NAME |
| } |
| ws.send(json.dumps(init_msg)) |
| response = ws.recv(timeout=5.0) |
| init_data = json.loads(response) |
| result = init_data.get('result', init_data) |
| print(f"Init response: {result.get('kind')}") |
|
|
| |
| print("Phase 1: Joint jogging...") |
| joint_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0.05, 0, 0, 0, 0, 0] |
| } |
| for i in range(10): |
| ws.send(json.dumps(joint_msg)) |
| time.sleep(0.1) |
|
|
| |
| print("Phase 2: Switching to TCP jogging...") |
| tcp_msg = { |
| "message_type": "TcpVelocityRequest", |
| "translation": [5.0, 0.0, 0.0], |
| "rotation": [0.0, 0.0, 0.0], |
| "use_tool_coordinate_system": False |
| } |
| for i in range(10): |
| ws.send(json.dumps(tcp_msg)) |
| time.sleep(0.1) |
|
|
| |
| print("Phase 3: Switching back to joint jogging...") |
| joint_msg2 = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0, 0, 0, 0, 0, 0.05] |
| } |
| for i in range(10): |
| ws.send(json.dumps(joint_msg2)) |
| time.sleep(0.1) |
|
|
| |
| stop_msg = { |
| "message_type": "JointVelocityRequest", |
| "velocity": [0, 0, 0, 0, 0, 0] |
| } |
| ws.send(json.dumps(stop_msg)) |
|
|
| log_test("Mode Switching", True, "Successfully switched between joint and TCP modes") |
| ws.close() |
| return True |
|
|
| except Exception as e: |
| log_test("Test 6", False, f"Exception: {type(e).__name__}: {e}") |
| return False |
|
|
| def print_summary(): |
| """Print test summary.""" |
| print() |
| print("=" * 70) |
| print("VALIDATION SUMMARY") |
| print("=" * 70) |
| print() |
|
|
| passed = sum(1 for r in test_results if r["passed"]) |
| total = len(test_results) |
|
|
| print(f"Tests passed: {passed}/{total}") |
| print() |
|
|
| if passed == total: |
| print("✓ ALL TESTS PASSED") |
| print() |
| print("The v2 API message formats have been validated.") |
| print("You can now proceed with Phase 1 (implementation).") |
| else: |
| print("✗ SOME TESTS FAILED") |
| print() |
| print("Please review the failures above before proceeding.") |
| print("Do NOT proceed to Phase 1 until all tests pass!") |
|
|
| print() |
| print("=" * 70) |
|
|
| |
| results_file = Path(__file__).parent / "v2_jogging_validation_results.json" |
| with open(results_file, 'w') as f: |
| json.dump({ |
| "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), |
| "passed": passed, |
| "total": total, |
| "all_passed": passed == total, |
| "results": test_results |
| }, f, indent=2) |
| print(f"Results saved to: {results_file}") |
|
|
| def main(): |
| """Run all validation tests.""" |
| import sys |
| check_config() |
|
|
| |
| auto_mode = "--auto" in sys.argv |
|
|
| if not auto_mode: |
| print("WARNING: This will move the robot!") |
| print("Ensure the robot is in a safe position before proceeding.") |
| print() |
| response = input("Continue? [y/N]: ") |
| if response.lower() != 'y': |
| print("Aborted.") |
| return |
| else: |
| print("Running in AUTO mode (skipping safety confirmation)") |
|
|
| print() |
|
|
| |
| test_1_connect_and_initialize() |
| test_2_joint_velocity() |
| test_3_tcp_velocity() |
| test_4_pause() |
| test_5_continuous_sending() |
| test_6_multi_axis_joint() |
| test_7_mode_switching() |
|
|
| |
| print_summary() |
|
|
| if __name__ == "__main__": |
| main() |
|
|