Georg commited on
Commit
07bb707
·
1 Parent(s): d428055

Add environment loading utility and integrate into Nova API and Jogger

Browse files

- Introduced a new utility module for loading environment variables from `.env.local` to streamline configuration management.
- Integrated the environment loading functionality into the Nova API and Jogger modules to ensure proper configuration during runtime.
- Updated the UR5 environment to utilize the loaded home pose for joint initialization, enhancing simulation consistency.

robots/ur5/env_loader.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility helpers for loading Nova-related environment files.
3
+ """
4
+
5
+ import os
6
+ from pathlib import Path
7
+
8
+
9
+ def load_local_env() -> None:
10
+ """Load `.env.local` from the nova-sim root so configs pick up saved credentials."""
11
+ root = Path(__file__).resolve().parents[2]
12
+ env_path = root / ".env.local"
13
+ if not env_path.exists():
14
+ return
15
+
16
+ with env_path.open("r") as fp:
17
+ for raw in fp:
18
+ line = raw.strip()
19
+ if not line or line.startswith("#") or "=" not in line:
20
+ continue
21
+ key, value = line.split("=", 1)
22
+ os.environ.setdefault(key.strip(), value.strip())
robots/ur5/nova_api.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import json
2
  import math
3
  import os
@@ -6,6 +7,7 @@ import time
6
  import urllib.error
7
  import urllib.request
8
  from dataclasses import dataclass
 
9
  from typing import Any, Dict, Optional, Tuple
10
 
11
  try:
@@ -16,6 +18,21 @@ except Exception as e: # pragma: no cover - optional dependency
16
  connect = None
17
 
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  @dataclass
20
  class NovaApiConfig:
21
  instance_url: str
@@ -103,6 +120,7 @@ class NovaApiClient:
103
  self._state_thread: Optional[threading.Thread] = None
104
  self._state_ws = None
105
  self._motion_group_description: Optional[Dict[str, Any]] = None
 
106
 
107
  def start_state_stream(self) -> None:
108
  if connect is None:
@@ -187,6 +205,20 @@ class NovaApiClient:
187
  continue
188
  with self._state_lock:
189
  self._latest_state = data
 
 
 
 
 
 
 
 
 
 
 
 
 
 
190
  ws.close()
191
  except Exception as exc:
192
  print(f"Nova state stream error: {exc}")
 
1
+ import importlib.util
2
  import json
3
  import math
4
  import os
 
7
  import urllib.error
8
  import urllib.request
9
  from dataclasses import dataclass
10
+ from pathlib import Path
11
  from typing import Any, Dict, Optional, Tuple
12
 
13
  try:
 
18
  connect = None
19
 
20
 
21
+ def _load_local_env_module():
22
+ try:
23
+ from robots.ur5 import env_loader
24
+ except ImportError:
25
+ env_path = Path(__file__).resolve().parent / "env_loader.py"
26
+ spec = importlib.util.spec_from_file_location("robots.ur5.env_loader", env_path)
27
+ module = importlib.util.module_from_spec(spec)
28
+ spec.loader.exec_module(module)
29
+ env_loader = module
30
+ env_loader.load_local_env()
31
+
32
+
33
+ _load_local_env_module()
34
+
35
+
36
  @dataclass
37
  class NovaApiConfig:
38
  instance_url: str
 
120
  self._state_thread: Optional[threading.Thread] = None
121
  self._state_ws = None
122
  self._motion_group_description: Optional[Dict[str, Any]] = None
123
+ self._last_state_log_time = 0.0
124
 
125
  def start_state_stream(self) -> None:
126
  if connect is None:
 
205
  continue
206
  with self._state_lock:
207
  self._latest_state = data
208
+ now = time.time()
209
+ if now - self._last_state_log_time >= 10.0:
210
+ self._last_state_log_time = now
211
+ result = data.get("result", data)
212
+ seq = result.get("sequence_number", "N/A")
213
+ joints = result.get("joint_position")
214
+ joint_info = f"{len(joints)} joints" if isinstance(joints, list) else "no joints"
215
+ payload = {
216
+ "seq": seq,
217
+ "joint_position": joints,
218
+ "timestamp": result.get("timestamp")
219
+ }
220
+ payload_str = json.dumps(payload, default=str)
221
+ print(f"[Nova] State stream update (seq={seq}, {joint_info}) payload={payload_str}")
222
  ws.close()
223
  except Exception as exc:
224
  print(f"Nova state stream error: {exc}")
robots/ur5/nova_jogger.py CHANGED
@@ -3,11 +3,13 @@ Nova Jogger Client
3
  Handles bidirectional jogging control for Nova API motion groups.
4
  """
5
 
 
6
  import json
7
  import threading
8
  import time
9
  import urllib.error
10
  import urllib.request
 
11
  from typing import Optional, Literal, Dict, Any
12
 
13
  try:
@@ -17,6 +19,21 @@ except Exception as e:
17
  connect = None
18
 
19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  JoggingMode = Literal["joint", "cartesian"]
21
  JogDirection = Literal["positive", "negative", "+", "-"] # Accept both formats, normalize to positive/negative
22
  CartesianAxis = Literal["x", "y", "z"]
 
3
  Handles bidirectional jogging control for Nova API motion groups.
4
  """
5
 
6
+ import importlib.util
7
  import json
8
  import threading
9
  import time
10
  import urllib.error
11
  import urllib.request
12
+ from pathlib import Path
13
  from typing import Optional, Literal, Dict, Any
14
 
15
  try:
 
19
  connect = None
20
 
21
 
22
+ def _load_local_env_module():
23
+ try:
24
+ from robots.ur5 import env_loader
25
+ except ImportError:
26
+ env_path = Path(__file__).resolve().parent / "env_loader.py"
27
+ spec = importlib.util.spec_from_file_location("robots.ur5.env_loader", env_path)
28
+ module = importlib.util.module_from_spec(spec)
29
+ spec.loader.exec_module(module)
30
+ env_loader = module
31
+ env_loader.load_local_env()
32
+
33
+
34
+ _load_local_env_module()
35
+
36
+
37
  JoggingMode = Literal["joint", "cartesian"]
38
  JogDirection = Literal["positive", "negative", "+", "-"] # Accept both formats, normalize to positive/negative
39
  CartesianAxis = Literal["x", "y", "z"]
robots/ur5/ur5_env.py CHANGED
@@ -8,6 +8,7 @@ import mujoco
8
  import os
9
  import importlib.util
10
  import math
 
11
  import random
12
 
13
  # Import IK controller
@@ -32,7 +33,7 @@ TABLE_PAD_TOP = 0.35 # meters: upper boundary (positive Y) for randomized targe
32
  TABLE_SURFACE_Z = 0.53 # meters: actual table-top surface height from the XML scene
33
  TABLE_OBJECT_HEIGHT = 0.03 # meters: height of the movable T-object along Z (from scene)
34
  TABLE_MIN_DISTANCE = 0.12 # meters: minimum planar distance between target and object at reset
35
- TARGET_HEIGHT_OFFSET = 0.3 # meters: raise IK target above the goal/object when initializing
36
  TABLE_OBJECT_SPAWN_Z = TABLE_SURFACE_Z + TABLE_OBJECT_HEIGHT # center height so object rests on table
37
 
38
 
@@ -163,8 +164,10 @@ class UR5Env(gym.Env):
163
  )
164
 
165
  # Set initial joint positions to home pose and compute FK
166
- self.data.qpos[:6] = self.DEFAULT_HOME_POSE.copy()
167
- self.data.ctrl[:6] = self.DEFAULT_HOME_POSE.copy()
 
 
168
  mujoco.mj_forward(self.model, self.data)
169
 
170
  # Target position/orientation from FK of home pose
@@ -185,7 +188,7 @@ class UR5Env(gym.Env):
185
  # Control mode: 'ik' (end-effector target) or 'joint' (direct joint positions)
186
  self._control_mode = 'ik'
187
  # Direct joint targets (used when control_mode is 'joint')
188
- self._joint_targets = self.DEFAULT_HOME_POSE.copy()
189
 
190
  # Configure Nova API if requested (must be after Nova attributes are initialized)
191
  if nova_api_config:
@@ -287,6 +290,25 @@ class UR5Env(gym.Env):
287
  self.data.qpos[qpos_adr:qpos_adr + 3] = pos
288
  self.data.qpos[qpos_adr + 3:qpos_adr + 7] = quat
289
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
290
  @staticmethod
291
  def _sample_table_position():
292
  x = random.uniform(TABLE_PAD_LEFT, TABLE_PAD_RIGHT)
@@ -325,6 +347,9 @@ class UR5Env(gym.Env):
325
  x, y, z: Target position in meters
326
  update_joint_targets: If True, compute IK and update joint targets
327
  """
 
 
 
328
  self._target_pos = np.array([x, y, z], dtype=np.float32)
329
  # Update mocap body position for visualization
330
  self.data.mocap_pos[0] = self._target_pos
@@ -397,7 +422,7 @@ class UR5Env(gym.Env):
397
  try:
398
  if jog_type == 'joint':
399
  return self._nova_jogger.start_joint_jog(
400
- joint=kwargs.get('joint', 0),
401
  direction=kwargs.get('direction', '+'),
402
  velocity_rads_per_sec=kwargs.get('velocity', 0.5)
403
  )
@@ -613,8 +638,8 @@ class UR5Env(gym.Env):
613
  mujoco.mj_resetData(self.model, self.data)
614
 
615
  # Set to home pose
616
- self.data.qpos[:6] = self.DEFAULT_HOME_POSE.copy()
617
- self.data.ctrl[:6] = self.DEFAULT_HOME_POSE.copy()
618
  if self.has_gripper:
619
  self.data.ctrl[6] = 0 # Gripper open (Robotiq: 0=open, 255=closed)
620
 
@@ -645,7 +670,7 @@ class UR5Env(gym.Env):
645
  self.data.mocap_pos[0] = self._target_pos
646
  if self.has_gripper:
647
  self._gripper_target = 0.0 # Open gripper
648
- self._joint_targets = self.DEFAULT_HOME_POSE.copy()
649
 
650
  self.steps = 0
651
 
@@ -654,8 +679,19 @@ class UR5Env(gym.Env):
654
 
655
  def step(self, action):
656
  """Step with explicit action (for RL training)."""
657
- # Apply action to actuators
658
- self.data.ctrl[:] = action
 
 
 
 
 
 
 
 
 
 
 
659
 
660
  # Step simulation
661
  mujoco.mj_step(self.model, self.data)
@@ -802,5 +838,19 @@ class UR5Env(gym.Env):
802
  self.data.qvel[:6] = 0.0
803
  self.data.ctrl[:6] = self.data.qpos[:6]
804
  mujoco.mj_forward(self.model, self.data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
805
  self._last_nova_sequence = sequence
806
  return True
 
8
  import os
9
  import importlib.util
10
  import math
11
+ import xml.etree.ElementTree as ET
12
  import random
13
 
14
  # Import IK controller
 
33
  TABLE_SURFACE_Z = 0.53 # meters: actual table-top surface height from the XML scene
34
  TABLE_OBJECT_HEIGHT = 0.03 # meters: height of the movable T-object along Z (from scene)
35
  TABLE_MIN_DISTANCE = 0.12 # meters: minimum planar distance between target and object at reset
36
+ TARGET_HEIGHT_OFFSET = 0.05 # raise IK target slightly above the goal/object so the stick ends close to the table
37
  TABLE_OBJECT_SPAWN_Z = TABLE_SURFACE_Z + TABLE_OBJECT_HEIGHT # center height so object rests on table
38
 
39
 
 
164
  )
165
 
166
  # Set initial joint positions to home pose and compute FK
167
+ loaded_pose = self._load_home_pose(model_path)
168
+ self.home_pose = loaded_pose if loaded_pose is not None else self.DEFAULT_HOME_POSE.copy()
169
+ self.data.qpos[:6] = self.home_pose.copy()
170
+ self.data.ctrl[:6] = self.home_pose.copy()
171
  mujoco.mj_forward(self.model, self.data)
172
 
173
  # Target position/orientation from FK of home pose
 
188
  # Control mode: 'ik' (end-effector target) or 'joint' (direct joint positions)
189
  self._control_mode = 'ik'
190
  # Direct joint targets (used when control_mode is 'joint')
191
+ self._joint_targets = self._compute_joint_targets()
192
 
193
  # Configure Nova API if requested (must be after Nova attributes are initialized)
194
  if nova_api_config:
 
290
  self.data.qpos[qpos_adr:qpos_adr + 3] = pos
291
  self.data.qpos[qpos_adr + 3:qpos_adr + 7] = quat
292
 
293
+ def _load_home_pose(self, xml_path: str) -> np.ndarray | None:
294
+ """Load the UR5 home qpos from the scene XML keyframe named 'home'."""
295
+ try:
296
+ tree = ET.parse(xml_path)
297
+ root = tree.getroot()
298
+ except Exception:
299
+ return None
300
+ for key in root.findall(".//key"):
301
+ if key.get("name") != "home":
302
+ continue
303
+ qpos = key.get("qpos")
304
+ if not qpos:
305
+ continue
306
+ values = [float(val) for val in qpos.split()]
307
+ if len(values) < 6:
308
+ continue
309
+ return np.array(values[:6], dtype=np.float32)
310
+ return None
311
+
312
  @staticmethod
313
  def _sample_table_position():
314
  x = random.uniform(TABLE_PAD_LEFT, TABLE_PAD_RIGHT)
 
347
  x, y, z: Target position in meters
348
  update_joint_targets: If True, compute IK and update joint targets
349
  """
350
+ if self._use_nova_state_stream and self._nova_client is not None:
351
+ print("[Nova] Ignoring local target updates while Nova state stream is active")
352
+ return
353
  self._target_pos = np.array([x, y, z], dtype=np.float32)
354
  # Update mocap body position for visualization
355
  self.data.mocap_pos[0] = self._target_pos
 
422
  try:
423
  if jog_type == 'joint':
424
  return self._nova_jogger.start_joint_jog(
425
+ joint=max(0, min(5, kwargs.get('joint', 1) - 1)),
426
  direction=kwargs.get('direction', '+'),
427
  velocity_rads_per_sec=kwargs.get('velocity', 0.5)
428
  )
 
638
  mujoco.mj_resetData(self.model, self.data)
639
 
640
  # Set to home pose
641
+ self.data.qpos[:6] = self.home_pose.copy()
642
+ self.data.ctrl[:6] = self.home_pose.copy()
643
  if self.has_gripper:
644
  self.data.ctrl[6] = 0 # Gripper open (Robotiq: 0=open, 255=closed)
645
 
 
670
  self.data.mocap_pos[0] = self._target_pos
671
  if self.has_gripper:
672
  self._gripper_target = 0.0 # Open gripper
673
+ self._joint_targets = self.home_pose.copy()
674
 
675
  self.steps = 0
676
 
 
679
 
680
  def step(self, action):
681
  """Step with explicit action (for RL training)."""
682
+ if self._use_nova_state_stream and self._nova_client is not None:
683
+ synced = self._sync_nova_state()
684
+ if synced and self._latest_nova_joints is not None:
685
+ self.data.ctrl[:6] = self._latest_nova_joints
686
+ # Keep the joints aligned with Nova even though the RL action is ignored
687
+ self.data.qpos[:6] = self._latest_nova_joints
688
+ self.data.qvel[:6] = 0.0
689
+ else:
690
+ # Nova data unavailable; fall back to the provided action
691
+ self.data.ctrl[:] = action
692
+ else:
693
+ # Apply action to actuators (default RL mode)
694
+ self.data.ctrl[:] = action
695
 
696
  # Step simulation
697
  mujoco.mj_step(self.model, self.data)
 
838
  self.data.qvel[:6] = 0.0
839
  self.data.ctrl[:6] = self.data.qpos[:6]
840
  mujoco.mj_forward(self.model, self.data)
841
+ tcp_pose = result.get("tcp_pose") or {}
842
+ pos = tcp_pose.get("position")
843
+ if pos and len(pos) == 3:
844
+ self._target_pos = np.array(pos, dtype=np.float32)
845
+ self.data.mocap_pos[0] = self._target_pos
846
+ orientation = tcp_pose.get("orientation")
847
+ if orientation and len(orientation) == 4:
848
+ roll, pitch, yaw = IKController.quat_to_euler(
849
+ float(orientation[0]),
850
+ float(orientation[1]),
851
+ float(orientation[2]),
852
+ float(orientation[3]),
853
+ )
854
+ self._target_euler = np.array([roll, pitch, yaw], dtype=np.float32)
855
  self._last_nova_sequence = sequence
856
  return True
tests/test_nova_functionality.py ADDED
@@ -0,0 +1,99 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+
4
+ import pytest
5
+ from pathlib import Path
6
+
7
+ from robots.ur5 import nova_api, nova_jogger
8
+
9
+
10
+ def _load_env():
11
+ """Load `.env.local` if present so the Nova config can pull its values."""
12
+ env_path = Path(__file__).parent.parent / ".env.local"
13
+ if not env_path.exists():
14
+ return
15
+ with env_path.open("r") as fp:
16
+ for raw in fp:
17
+ line = raw.strip()
18
+ if not line or line.startswith("#") or "=" not in line:
19
+ continue
20
+ key, value = line.split("=", 1)
21
+ os.environ.setdefault(key.strip(), value.strip())
22
+
23
+
24
+ _load_env()
25
+
26
+
27
+ @pytest.fixture(scope="module")
28
+ def nova_config():
29
+ """Provide NovaApiConfig loaded from environment or skip if not configured."""
30
+ try:
31
+ return nova_api.NovaApiConfig.from_env()
32
+ except ValueError as exc:
33
+ pytest.skip(f"Nova configuration missing: {exc}")
34
+
35
+
36
+ def _create_jogger(config: nova_api.NovaApiConfig) -> nova_jogger.NovaJogger:
37
+ return nova_jogger.NovaJogger(
38
+ config.instance_url,
39
+ config.access_token,
40
+ config.cell_id,
41
+ config.controller_id,
42
+ config.motion_group_id,
43
+ )
44
+
45
+
46
+ def test_state_stream_receives_updates(nova_config):
47
+ """Ensure the state stream connects and delivers joint positions."""
48
+ if nova_api.connect is None:
49
+ pytest.skip("websockets package required for Nova state stream")
50
+
51
+ client = nova_api.NovaApiClient(nova_config)
52
+ client.start_state_stream()
53
+ try:
54
+ deadline = time.time() + 10.0
55
+ state = None
56
+ while time.time() < deadline:
57
+ state = client.get_latest_state()
58
+ if state:
59
+ break
60
+ time.sleep(0.1)
61
+ assert state, "No state updates were received"
62
+ result = state.get("result", state)
63
+ assert "joint_position" in result, "Expected joint_position field in state payload"
64
+ assert isinstance(result["joint_position"], list)
65
+ assert client.is_state_stream_connected()
66
+ finally:
67
+ client.stop()
68
+
69
+
70
+ def test_jogger_joint_connection(nova_config):
71
+ """Connect the jogger and exercise a zero-velocity joint command."""
72
+ if nova_jogger.connect is None:
73
+ pytest.skip("websockets package required for Nova jogging")
74
+
75
+ jogger = _create_jogger(nova_config)
76
+ try:
77
+ assert jogger.connect()
78
+ assert jogger.start_joint_jog(0, "+", velocity_rads_per_sec=0.0)
79
+ assert jogger.is_connected()
80
+ time.sleep(0.2)
81
+ assert jogger.stop()
82
+ finally:
83
+ jogger.disconnect()
84
+
85
+
86
+ def test_jogger_cartesian_connection(nova_config):
87
+ """Connect the jogger and exercise a zero-velocity cartesian command."""
88
+ if nova_jogger.connect is None:
89
+ pytest.skip("websockets package required for Nova jogging")
90
+
91
+ jogger = _create_jogger(nova_config)
92
+ try:
93
+ assert jogger.connect()
94
+ assert jogger.start_cartesian_translation("z", "+", velocity_mm_per_sec=0.0)
95
+ assert jogger.is_connected()
96
+ time.sleep(0.2)
97
+ assert jogger.stop()
98
+ finally:
99
+ jogger.disconnect()
tests/test_nova_state_stream_commands.py ADDED
@@ -0,0 +1,124 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import math
2
+ import time
3
+
4
+ import pytest
5
+
6
+ from robots.ur5 import env_loader, nova_api, nova_jogger
7
+
8
+
9
+ env_loader.load_local_env()
10
+
11
+
12
+ def _get_result(state: dict) -> dict:
13
+ return state.get("result", state)
14
+
15
+
16
+ def _wait_for_state_condition(client: nova_api.NovaApiClient, predicate, timeout: float = 10.0) -> dict:
17
+ deadline = time.monotonic() + timeout
18
+ last_seq = None
19
+ while time.monotonic() < deadline:
20
+ state = client.get_latest_state()
21
+ if not state:
22
+ time.sleep(0.1)
23
+ continue
24
+ result = _get_result(state)
25
+ seq = result.get("sequence_number")
26
+ if seq is not None and seq == last_seq:
27
+ time.sleep(0.1)
28
+ continue
29
+ last_seq = seq
30
+ if predicate(result):
31
+ return result
32
+ time.sleep(0.1)
33
+ pytest.fail("Timed out waiting for the requested Nova state")
34
+
35
+
36
+ def _joints_changed(result: dict, baseline: list[float], threshold: float = 0.01) -> bool:
37
+ joints = result.get("joint_position") or []
38
+ if len(joints) < len(baseline):
39
+ return False
40
+ return any(abs(joints[i] - baseline[i]) >= threshold for i in range(len(baseline)))
41
+
42
+
43
+ def _tcp_moved(result: dict, baseline_pos: list[float], threshold: float = 0.01) -> bool:
44
+ tcp_pose = result.get("tcp_pose") or {}
45
+ pos = tcp_pose.get("position")
46
+ if not pos or len(pos) < 3:
47
+ return False
48
+ return math.dist(pos[:3], baseline_pos[:3]) >= threshold
49
+
50
+
51
+ @pytest.fixture(scope="module")
52
+ def nova_config():
53
+ try:
54
+ return nova_api.NovaApiConfig.from_env()
55
+ except ValueError as exc:
56
+ pytest.skip(f"Nova configuration missing: {exc}")
57
+
58
+
59
+ def _create_jogger(config: nova_api.NovaApiConfig) -> nova_jogger.NovaJogger:
60
+ jogger = nova_jogger.NovaJogger(
61
+ config.instance_url,
62
+ config.access_token,
63
+ config.cell_id,
64
+ config.controller_id,
65
+ config.motion_group_id,
66
+ )
67
+ assert jogger.connect()
68
+ return jogger
69
+
70
+
71
+ def test_state_stream_updates_after_joint_jog(nova_config):
72
+ if nova_api.connect is None or nova_jogger.connect is None:
73
+ pytest.skip("websockets dependency required for state stream and jogging")
74
+
75
+ client = nova_api.NovaApiClient(nova_config)
76
+ client.start_state_stream()
77
+ jogger = _create_jogger(nova_config)
78
+ try:
79
+ baseline = _wait_for_state_condition(client, lambda _: True, timeout=15.0)
80
+ baseline_joints = baseline.get("joint_position") or []
81
+ assert len(baseline_joints) >= 6, "State stream must report 6 joint positions"
82
+
83
+ assert jogger.start_joint_jog(0, "+", velocity_rads_per_sec=0.2)
84
+ time.sleep(1.0)
85
+ assert jogger.stop()
86
+
87
+ updated = _wait_for_state_condition(
88
+ client,
89
+ lambda result: _joints_changed(result, baseline_joints, threshold=0.008),
90
+ timeout=15.0,
91
+ )
92
+ assert _joints_changed(updated, baseline_joints, threshold=0.008)
93
+ finally:
94
+ jogger.disconnect()
95
+ client.stop()
96
+
97
+
98
+ def test_state_stream_reflects_tcp_translation(nova_config):
99
+ if nova_api.connect is None or nova_jogger.connect is None:
100
+ pytest.skip("websockets dependency required for state stream and jogging")
101
+
102
+ client = nova_api.NovaApiClient(nova_config)
103
+ client.start_state_stream()
104
+ jogger = _create_jogger(nova_config)
105
+ try:
106
+ baseline = _wait_for_state_condition(client, lambda state: bool(state.get("tcp_pose")), timeout=15.0)
107
+ tcp_pose = baseline.get("tcp_pose") or {}
108
+ position = tcp_pose.get("position")
109
+ if not position or len(position) < 3:
110
+ pytest.skip("TCP pose not available from state stream")
111
+
112
+ assert jogger.start_cartesian_translation("z", "+", velocity_mm_per_sec=20.0)
113
+ time.sleep(1.2)
114
+ assert jogger.stop()
115
+
116
+ moved = _wait_for_state_condition(
117
+ client,
118
+ lambda result: _tcp_moved(result, position, threshold=0.005),
119
+ timeout=15.0,
120
+ )
121
+ assert _tcp_moved(moved, position, threshold=0.005)
122
+ finally:
123
+ jogger.disconnect()
124
+ client.stop()
tests/test_ur5_home_pose.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import mujoco
2
+ import numpy as np
3
+ from robots.ur5.ur5_env import UR5Env
4
+
5
+
6
+ def test_ur5_ik_targets_match_home_pose():
7
+ """Ensure the IK solution satisfies the Cartesian pose derived from the XML home qpos."""
8
+ env = UR5Env(render_mode=None, width=1, height=1, scene_name="scene_t_push")
9
+ try:
10
+ ik_solution = env._compute_joint_targets()
11
+
12
+ env.data.qpos[:6] = ik_solution
13
+ mujoco.mj_forward(env.model, env.data)
14
+
15
+ ee_pos = env.get_end_effector_pos()
16
+ assert np.allclose(ee_pos, env._target_pos, atol=1e-2)
17
+ finally:
18
+ env.close()