nova-sim / tests /test_homing.py
Georg
Implement blocking homing endpoint and refactor homing logic in mujoco_server.py
e9e2294
"""Comprehensive homing test for nova-sim."""
import time
import math
import sys
import os
import json
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test_client import NovaSimTestClient
def test_homing():
"""Test multi-axis homing: verify robot moves all joints simultaneously to home pose."""
print("\n" + "=" * 70)
print("Multi-Axis Homing Test (Nova API v2)")
print("=" * 70)
print("\nThis test verifies:")
print(" 1. Robot position can be read from state stream")
print(" 2. Calling the blocking homing endpoint triggers multi-axis homing")
print(" 3. All joints move simultaneously toward home_pose")
print(" 4. Robot reaches home within acceptable tolerance")
print(" 5. Multi-axis jogging is faster than sequential")
# Expected home pose (from scene XML)
expected_home = [-3.22, -1.14, 1.93, -2.36, -1.57, -1.72]
# Create test client
print(f"\n[1] Connecting to Nova-Sim...")
client = NovaSimTestClient("http://localhost:3004/nova-sim/api/v1")
try:
client.connect()
print("✓ Connected to Nova-Sim")
# Get initial position
time.sleep(0.5)
initial_joints = client.get_joint_positions()
print(f"\n[2] Initial joint positions:")
print(f" [{', '.join(f'{j:7.4f}' for j in initial_joints)}]")
# Calculate initial error from home
initial_errors = [abs(actual - target) for actual, target in zip(initial_joints, expected_home)]
initial_max_error = max(initial_errors)
print(f" Max error from home: {initial_max_error:.4f} rad ({math.degrees(initial_max_error):.2f}°)")
# Check if already at home
if initial_max_error < 0.1:
print(f" ℹ Robot already near home position")
print(f" Note: In digital twin mode, robot position comes from real hardware")
# Start homing using the blocking endpoint
print(f"\n[3] Starting multi-axis homing (Nova API v2)...")
print(" All joints will move simultaneously toward home")
print(" Calling /homing with a 30 second timeout...")
home_start_time = time.time()
joints_before = initial_joints.copy()
home_result = client.home_blocking(timeout_s=30.0, tolerance=0.01, poll_interval_s=0.1)
elapsed = time.time() - home_start_time
print(f" Homing result: {home_result.get('status')} in {elapsed:.2f}s")
homing_time = time.time() - home_start_time
# Get final position
time.sleep(0.5)
final_joints = client.get_joint_positions()
print(f"\n[4] Final joint positions after {homing_time:.1f}s:")
print(f" [{', '.join(f'{j:7.4f}' for j in final_joints)}]")
# Calculate movement
total_movement = sum(abs(after - before) for before, after in zip(joints_before, final_joints))
print(f" Total joint movement: {total_movement:.3f} rad")
# Verify final position
print(f"\n[5] Verifying final position against configured home...")
print(f" Expected home: [{', '.join(f'{j:7.4f}' for j in expected_home)}]")
joint_errors = [abs(actual - target) for actual, target in zip(final_joints, expected_home)]
print(f"\n Joint errors:")
tolerance = 0.1 # 0.1 rad = 5.7 degrees
for i, (error, actual, target) in enumerate(zip(joint_errors, final_joints, expected_home), 1):
status = "✓" if error < tolerance else "⚠"
print(f" {status} Joint {i}: error={error:.4f} rad ({math.degrees(error):.2f}°), "
f"actual={actual:.4f}, target={target:.4f}")
max_error = max(joint_errors)
avg_error = sum(joint_errors) / len(joint_errors)
print(f"\n Max error: {max_error:.4f} rad ({math.degrees(max_error):.2f}°)")
print(f" Average error: {avg_error:.4f} rad ({math.degrees(avg_error):.2f}°)")
# Check scene objects
scene_objects = client.get_scene_objects()
if scene_objects:
print(f"\n[6] Scene objects:")
for obj in scene_objects:
pos = obj["position"]
print(f" - {obj['name']}: pos=({pos['x']:.3f}, {pos['y']:.3f}, {pos['z']:.3f})")
# Assessment
print("\n" + "=" * 70)
success = False
if max_error < 0.01:
print("✓ EXCELLENT - Robot at home within 0.01 rad (0.57°)")
success = True
elif max_error < 0.05:
print("✓ GOOD - Robot at home within 0.05 rad (2.9°)")
success = True
elif max_error < 0.1:
print("⚠ ACCEPTABLE - Robot within 0.1 rad (5.7°) of home")
success = True
else:
print(f"✗ FAILED - Robot not at home (max error: {math.degrees(max_error):.1f}°)")
print(f"\nNote: In digital twin mode with Nova API connected:")
print(f" - Robot position mirrors real hardware")
print(f" - Homing requires working Nova API jogging integration")
print(f" - Check that Nova Jogger is properly configured and connected")
if total_movement < 0.01:
print(f"\n⚠ WARNING: Robot did not move during homing (movement: {total_movement:.3f} rad)")
print(f" - This may indicate jogging commands are not reaching the robot")
print(f" - Check server logs for jogging errors")
print("=" * 70)
# Only assert success on movement if robot wasn't already at home
if initial_max_error > 0.1:
assert success, f"Robot did not reach home position (max error: {math.degrees(max_error):.1f}°)"
if total_movement < 0.01:
print(f"\n⚠ Robot didn't move but test passed (was it already at home?)")
else:
print(f"\n✓ Test completed (robot was already near home)")
finally:
# Cleanup
try:
client.close()
print("\n✓ Disconnected from Nova-Sim")
except Exception as e:
print(f"\n⚠ Error during disconnect: {e}")
def test_drift_prevention():
"""Test that robot doesn't drift after blocking homing completes."""
print("\n" + "=" * 70)
print("Drift Prevention Test")
print("=" * 70)
print("\nThis test verifies that the robot stops moving after blocking homing.\n")
client = NovaSimTestClient("http://localhost:3004/nova-sim/api/v1")
try:
client.connect()
print("✓ Connected to Nova-Sim")
# Get initial position
time.sleep(0.5)
initial_joints = client.get_joint_positions()
print(f"\n[1] Initial joint positions:")
print(f" [{', '.join(f'{j:7.4f}' for j in initial_joints)}]")
# Run homing
print(f"\n[2] Calling blocking homing endpoint...")
client.home_blocking(timeout_s=10.0, tolerance=0.05, poll_interval_s=0.1)
# Get position right after stopping
time.sleep(0.1)
position_after_homing = client.get_joint_positions()
print(f"\n[3] Position immediately after homing:")
print(f" [{', '.join(f'{j:7.4f}' for j in position_after_homing)}]")
# Wait 2 seconds and check if robot drifted
print(f"\n[4] Waiting 2 seconds to check for drift...")
time.sleep(2.0)
final_position = client.get_joint_positions()
print(f"\n[5] Position 2 seconds after homing:")
print(f" [{', '.join(f'{j:7.4f}' for j in final_position)}]")
# Calculate drift
max_drift = 0
drift_joint = None
for i, (before, after) in enumerate(zip(position_after_homing, final_position)):
drift = abs(after - before)
if drift > max_drift:
max_drift = drift
drift_joint = i
print(f"\n[6] Results:")
print(f" Max drift: {max_drift:.4f} rad ({math.degrees(max_drift):.2f}°)")
if drift_joint is not None:
print(f" Joint with max drift: Joint {drift_joint + 1}")
# A small amount of drift is acceptable due to gravity/dynamics
# But large drift indicates jogging didn't stop
drift_threshold = 0.005 # 0.005 rad ≈ 0.29°
print("\n" + "=" * 70)
if max_drift < drift_threshold:
print(f"✓ NO DRIFT - Robot stopped properly (drift < {drift_threshold:.4f} rad)")
success = True
else:
print(f"✗ DRIFT DETECTED - Robot kept moving after homing!")
print(f" Check server logs for jogging stop behavior.")
success = False
print("=" * 70)
client.close()
print("\n✓ Disconnected")
assert success, f"Drift detected: {max_drift:.4f} rad exceeds threshold {drift_threshold:.4f} rad"
except Exception as e:
print(f"\n✗ Test error: {e}")
import traceback
traceback.print_exc()
try:
client.close()
except:
pass
raise
def test_overshoot_detection():
"""Test that homing detects and corrects overshoots by reducing velocity."""
print("\n" + "=" * 70)
print("Overshoot Detection Test")
print("=" * 70)
print("\nThis test verifies that homing detects overshoots and reduces velocity.")
print("It will move joint 1 away from home, then home it and monitor for overshoots.\n")
client = NovaSimTestClient("http://localhost:3004/nova-sim/api/v1")
try:
client.connect()
print("✓ Connected to Nova-Sim")
# Get home position
time.sleep(0.5)
home_joints = [-3.2200, -1.1400, 1.9300, -2.3600, -1.5700, -1.7200]
print(f"\n[1] Home position:")
print(f" [{', '.join(f'{j:7.4f}' for j in home_joints)}]")
# Move joint 1 far from home
print(f"\n[2] Moving joint 1 to +1.0 rad (far from home of -3.22 rad)...")
# Send action messages to move joint 1 positive
duration = 2.0
rate_hz = 10
num_messages = int(duration * rate_hz)
for i in range(num_messages):
client.ws.send(json.dumps({
'type': 'action',
'data': {'j1': 1.0} # Move joint 1 in positive direction
}))
time.sleep(1.0 / rate_hz)
# Stop moving
client.ws.send(json.dumps({'type': 'action', 'data': {}}))
time.sleep(0.5)
# Get position after movement
moved_joints = client.get_joint_positions()
print(f"\n[3] Position after moving joint 1:")
print(f" [{', '.join(f'{j:7.4f}' for j in moved_joints)}]")
print(f" Joint 1 error from home: {abs(moved_joints[0] - home_joints[0]):.4f} rad")
# Now home using the blocking endpoint
print(f"\n[4] Starting homing...")
client.home_blocking(timeout_s=15.0, tolerance=0.01, poll_interval_s=0.1)
# Get final position
time.sleep(0.5)
final_joints = client.get_joint_positions()
print(f"\n[5] Final position:")
print(f" [{', '.join(f'{j:7.4f}' for j in final_joints)}]")
# Check all joints
print(f"\n[6] Final errors from home:")
max_error = 0
for i, (actual, target) in enumerate(zip(final_joints, home_joints)):
error = abs(actual - target)
max_error = max(max_error, error)
status = "✓" if error < 0.01 else "✗"
print(f" {status} Joint {i+1}: error={error:.4f} rad ({math.degrees(error):.2f}°)")
print(f"\n[7] Results:")
print(f" Max final error: {max_error:.4f} rad ({math.degrees(max_error):.2f}°)")
print("\n" + "=" * 70)
if max_error < 0.05:
print(f"✓ Homing succeeded (max error < 0.05 rad)")
success = True
else:
print(f"⚠ Homing incomplete (max error = {max_error:.4f} rad)")
success = False
print(f"✓ Homing completed via blocking endpoint")
print("=" * 70)
client.close()
print("\n✓ Disconnected")
assert success, f"Homing failed with max error {max_error:.4f} rad"
except Exception as e:
print(f"\n✗ Test error: {e}")
import traceback
traceback.print_exc()
try:
client.close()
except:
pass
raise
if __name__ == "__main__":
print("\n" + "=" * 70)
print("Nova-Sim Homing Test")
print("=" * 70)
print("\nThis test requires Nova-Sim to be running.")
print("Start it with:")
print(" cd /Users/georgpuschel/repos/robot-ml/nova-sim")
print(" source .venv/bin/activate")
print(" python mujoco_server.py --robot ur5_t_push")
if sys.stdin.isatty():
print("\nPress Enter when Nova-Sim is ready...")
input()
else:
print("\nRunning in non-interactive mode, proceeding in 2 seconds...\n")
time.sleep(2)
# Check if specific test requested
test_name = sys.argv[1] if len(sys.argv) > 1 else "all"
all_passed = True
try:
if test_name in ("all", "basic"):
print("\n" + "="*70)
print("Running: Basic Homing Test")
print("="*70)
try:
test_homing()
print("\n✓ Basic Homing Test PASSED")
except Exception as e:
print(f"\n✗ Basic Homing Test FAILED: {e}")
all_passed = False
if test_name in ("all", "drift"):
print("\n" + "="*70)
print("Running: Drift Prevention Test")
print("="*70)
try:
test_drift_prevention()
print("\n✓ Drift Prevention Test PASSED")
except Exception as e:
print(f"\n✗ Drift Prevention Test FAILED: {e}")
all_passed = False
if test_name in ("all", "overshoot"):
print("\n" + "="*70)
print("Running: Overshoot Detection Test")
print("="*70)
try:
test_overshoot_detection()
print("\n✓ Overshoot Detection Test PASSED")
except Exception as e:
print(f"\n✗ Overshoot Detection Test FAILED: {e}")
all_passed = False
print("\n" + "="*70)
if all_passed:
print("✓ ALL TESTS COMPLETED SUCCESSFULLY")
print("="*70)
print("\nCheck /tmp/nova_sim_server.log for detailed server logs")
sys.exit(0)
else:
print("✗ SOME TESTS FAILED")
print("="*70)
print("\nCheck /tmp/nova_sim_server.log for detailed server logs")
sys.exit(1)
except KeyboardInterrupt:
print("\n\n✗ Tests interrupted by user")
sys.exit(1)