Georg commited on
Commit ·
59beae4
1
Parent(s): f6d1b5e
Refactor environment configuration and mujoco_server.py for improved Nova API integration
Browse files- Removed the NOVA_USE_STATE_STREAM variable from .env.example as it is no longer needed for digital twin mode.
- Introduced a new .env.off.local file for comprehensive Nova API configuration, including instance URL, access token, and robot settings.
- Enhanced mujoco_server.py to support bidirectional control and state synchronization with the Nova API, allowing for real-time interaction while maintaining physics simulation.
- Updated UR5 environment to manage joint positions from Nova, ensuring accurate control and interaction with simulated objects.
- .env.example +0 -5
- mujoco_server.py +8 -19
- robots/ur5/ur5_env.py +14 -2
.env.example
CHANGED
|
@@ -25,8 +25,3 @@ NOVA_TCP_NAME=Flange
|
|
| 25 |
|
| 26 |
# Optional: Response rate in milliseconds (default: 200)
|
| 27 |
NOVA_RESPONSE_RATE_MS=200
|
| 28 |
-
|
| 29 |
-
# Optional: Enable digital twin mode (mirror real robot state)
|
| 30 |
-
# When true: simulation displays real robot pose (no physics simulation)
|
| 31 |
-
# When false (default): local simulation runs with Nova jogging control
|
| 32 |
-
NOVA_USE_STATE_STREAM=false
|
|
|
|
| 25 |
|
| 26 |
# Optional: Response rate in milliseconds (default: 200)
|
| 27 |
NOVA_RESPONSE_RATE_MS=200
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mujoco_server.py
CHANGED
|
@@ -170,21 +170,15 @@ def init_ur5(scene_name="scene"):
|
|
| 170 |
# Check if Nova API is configured via environment variables
|
| 171 |
nova_config = None
|
| 172 |
if os.getenv('NOVA_INSTANCE_URL') and os.getenv('NOVA_ACCESS_TOKEN'):
|
| 173 |
-
# Nova API is configured -
|
| 174 |
-
# State streaming
|
| 175 |
-
#
|
| 176 |
-
#
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
if use_state_stream:
|
| 180 |
-
print("Nova API: Digital Twin Mode - mirroring real robot state (no local physics)")
|
| 181 |
-
else:
|
| 182 |
-
print("Nova API: Jogging Mode - local simulation with Nova jog control")
|
| 183 |
-
|
| 184 |
nova_config = {
|
| 185 |
-
"use_state_stream":
|
| 186 |
-
"use_ik": False, #
|
| 187 |
-
"use_jogging": True #
|
| 188 |
}
|
| 189 |
|
| 190 |
if scene_name == "scene_t_push":
|
|
@@ -1041,11 +1035,6 @@ def index():
|
|
| 1041 |
<span id="nova_mode_text">Digital Twin Mode</span> - Read Only
|
| 1042 |
</div>
|
| 1043 |
</div>
|
| 1044 |
-
<strong style="font-size: 0.9em;">Controllers:</strong><br>
|
| 1045 |
-
<span style="font-size: 0.85em;">
|
| 1046 |
-
State: <span id="nova_state_controller" style="color: var(--wb-highlight);">Internal</span><br>
|
| 1047 |
-
IK: <span id="nova_ik_controller" style="color: var(--wb-highlight);">Internal</span>
|
| 1048 |
-
</span>
|
| 1049 |
</div>
|
| 1050 |
</div>
|
| 1051 |
</div>
|
|
|
|
| 170 |
# Check if Nova API is configured via environment variables
|
| 171 |
nova_config = None
|
| 172 |
if os.getenv('NOVA_INSTANCE_URL') and os.getenv('NOVA_ACCESS_TOKEN'):
|
| 173 |
+
# Nova API is configured - attempt bidirectional control
|
| 174 |
+
# State streaming: Mirror real robot pose while still running physics for objects (T-shape)
|
| 175 |
+
# Jogging: UI can send directional jog commands to control the real robot
|
| 176 |
+
# Note: If Nova connection fails, UR5Env will automatically fall back to internal IK
|
| 177 |
+
print("Nova API credentials detected - attempting bidirectional control")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 178 |
nova_config = {
|
| 179 |
+
"use_state_stream": True, # Attempt to mirror real robot pose
|
| 180 |
+
"use_ik": False, # Use jogging API instead of Nova IK (falls back to internal IK if connection fails)
|
| 181 |
+
"use_jogging": True # Attempt bidirectional robot control
|
| 182 |
}
|
| 183 |
|
| 184 |
if scene_name == "scene_t_push":
|
|
|
|
| 1035 |
<span id="nova_mode_text">Digital Twin Mode</span> - Read Only
|
| 1036 |
</div>
|
| 1037 |
</div>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1038 |
</div>
|
| 1039 |
</div>
|
| 1040 |
</div>
|
robots/ur5/ur5_env.py
CHANGED
|
@@ -139,6 +139,7 @@ class UR5Env(gym.Env):
|
|
| 139 |
self._use_nova_ik = False
|
| 140 |
self._use_nova_jogging = False
|
| 141 |
self._last_nova_sequence = None
|
|
|
|
| 142 |
|
| 143 |
# IK controller for end-effector control
|
| 144 |
self.controller = IKController(
|
|
@@ -561,8 +562,17 @@ class UR5Env(gym.Env):
|
|
| 561 |
def step_with_controller(self, dt: float = 0.002):
|
| 562 |
"""Step using controller (IK or direct joint control)."""
|
| 563 |
if self._use_nova_state_stream:
|
| 564 |
-
# In digital twin mode,
|
|
|
|
| 565 |
self._sync_nova_state()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 566 |
return self._get_obs()
|
| 567 |
|
| 568 |
if self._control_mode == 'ik':
|
|
@@ -645,7 +655,9 @@ class UR5Env(gym.Env):
|
|
| 645 |
if not joint_positions or len(joint_positions) < 6:
|
| 646 |
return False
|
| 647 |
|
| 648 |
-
|
|
|
|
|
|
|
| 649 |
self.data.qvel[:6] = 0.0
|
| 650 |
self.data.ctrl[:6] = self.data.qpos[:6]
|
| 651 |
mujoco.mj_forward(self.model, self.data)
|
|
|
|
| 139 |
self._use_nova_ik = False
|
| 140 |
self._use_nova_jogging = False
|
| 141 |
self._last_nova_sequence = None
|
| 142 |
+
self._latest_nova_joints = None
|
| 143 |
|
| 144 |
# IK controller for end-effector control
|
| 145 |
self.controller = IKController(
|
|
|
|
| 562 |
def step_with_controller(self, dt: float = 0.002):
|
| 563 |
"""Step using controller (IK or direct joint control)."""
|
| 564 |
if self._use_nova_state_stream:
|
| 565 |
+
# In digital twin mode, sync robot pose from Nova but still run physics
|
| 566 |
+
# This allows simulated objects (like T-shape) to interact with the real robot
|
| 567 |
self._sync_nova_state()
|
| 568 |
+
|
| 569 |
+
# Set robot joint positions from Nova (override qpos)
|
| 570 |
+
if hasattr(self, '_latest_nova_joints') and self._latest_nova_joints is not None:
|
| 571 |
+
self.data.qpos[:6] = self._latest_nova_joints
|
| 572 |
+
|
| 573 |
+
# Still run physics step for other objects (T-shape, etc.)
|
| 574 |
+
mujoco.mj_step(self.model, self.data)
|
| 575 |
+
self.steps += 1
|
| 576 |
return self._get_obs()
|
| 577 |
|
| 578 |
if self._control_mode == 'ik':
|
|
|
|
| 655 |
if not joint_positions or len(joint_positions) < 6:
|
| 656 |
return False
|
| 657 |
|
| 658 |
+
# Store latest joints for use in step_with_controller
|
| 659 |
+
self._latest_nova_joints = np.array(joint_positions[:6], dtype=np.float32)
|
| 660 |
+
self.data.qpos[:6] = self._latest_nova_joints
|
| 661 |
self.data.qvel[:6] = 0.0
|
| 662 |
self.data.ctrl[:6] = self.data.qpos[:6]
|
| 663 |
mujoco.mj_forward(self.model, self.data)
|