Georg commited on
Commit
c1ce8c4
·
1 Parent(s): 8dfd85d

Enhance episode control and jogging management in mujoco_server.py and nova_jogger.py

Browse files

- Added episode duration tracking and watchdog functionality to automatically truncate episodes exceeding the maximum duration.
- Introduced timestamps for teleoperation commands to improve debugging and state management.
- Updated jogging state management to ensure proper handling of active jogging commands and WebSocket connection states in nova_jogger.py.
- Refactored episode control state initialization and reset logic for better clarity and functionality.

mujoco_server.py CHANGED
@@ -99,6 +99,8 @@ needs_robot_switch = None # Robot to switch to
99
  episode_control_state = {
100
  "terminate": False,
101
  "truncate": False,
 
 
102
  }
103
 
104
  # Perception state - stores latest detected object poses
@@ -130,6 +132,8 @@ last_teleop_action: dict[str, Any] = {
130
  "j6": 0.0,
131
  }
132
  teleop_lock = threading.Lock()
 
 
133
 
134
  # Homing state
135
  homing_state = {
@@ -191,6 +195,11 @@ def _reset_active_environment() -> None:
191
  if env is not None:
192
  env.reset()
193
  _reset_camera_for_current_robot()
 
 
 
 
 
194
 
195
 
196
  def _schedule_robot_switch(robot: str, scene: Optional[str]) -> None:
@@ -696,6 +705,12 @@ def switch_robot(robot_type, scene_name=None):
696
  current_scene = active_scene
697
  env.reset()
698
 
 
 
 
 
 
 
699
  # Initialize gripper value in teleop_action for UR5
700
  with teleop_lock:
701
  if robot_type in ("ur5", "ur5_t_push"):
@@ -1052,6 +1067,7 @@ def _signal_episode_control(action: str):
1052
 
1053
  with episode_control_lock:
1054
  episode_control_state[action] = True
 
1055
  status_label = "terminated" if action == "terminate" else "truncated"
1056
  payload = {
1057
  "action": action,
@@ -1064,6 +1080,11 @@ def _signal_episode_control(action: str):
1064
  with mujoco_lock:
1065
  if env is not None:
1066
  env.reset()
 
 
 
 
 
1067
 
1068
 
1069
  def _consume_episode_control_flags():
@@ -1193,7 +1214,7 @@ def simulation_loop():
1193
 
1194
  def _run_iteration():
1195
  nonlocal broadcast_counter
1196
- global needs_robot_switch, latest_frame
1197
 
1198
  if needs_robot_switch is not None:
1199
  with mujoco_lock:
@@ -1203,6 +1224,28 @@ def simulation_loop():
1203
  )
1204
  needs_robot_switch = None
1205
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1206
  with mujoco_lock:
1207
  if env is None:
1208
  return
@@ -1528,7 +1571,7 @@ def _step_multi_axis_homing_locked(tolerance: float = 0.01) -> dict:
1528
 
1529
  def handle_ws_message(ws, data):
1530
  """Handle incoming WebSocket message."""
1531
- global needs_robot_switch, camera_follow, t_target_visible, last_teleop_action
1532
 
1533
  msg_type = data.get('type')
1534
 
@@ -1584,9 +1627,13 @@ def handle_ws_message(ws, data):
1584
  any_joint = any(abs(v) > 0.001 for v in joint_velocities)
1585
  any_cartesian = any(abs(v) > 0.001 for v in cartesian_translation + cartesian_rotation)
1586
 
 
 
 
1587
  # Apply the appropriate jogging mode
1588
  if any_joint:
1589
  env.start_multi_joint_jog(joint_velocities)
 
1590
  elif any_cartesian:
1591
  # Convert m/s to mm/s for translations
1592
  translation_mm_s = [v * 1000.0 for v in cartesian_translation]
@@ -1597,9 +1644,11 @@ def handle_ws_message(ws, data):
1597
  tcp_id='Flange',
1598
  coord_system_id='world'
1599
  )
 
1600
  else:
1601
  # No active velocity - stop jogging
1602
  env.stop_jog()
 
1603
  else:
1604
  # For locomotion robots: use vx, vy, vyaw
1605
  env.set_command(vx, vy, vyaw)
@@ -1628,6 +1677,11 @@ def handle_ws_message(ws, data):
1628
  with mujoco_lock:
1629
  if env is not None:
1630
  obs, info = env.reset(seed=seed)
 
 
 
 
 
1631
 
1632
  elif msg_type == 'switch_robot':
1633
  payload = data.get('data', {})
 
99
  episode_control_state = {
100
  "terminate": False,
101
  "truncate": False,
102
+ "start_time": 0.0, # Timestamp when episode started
103
+ "max_duration": 120.0, # Max episode duration in seconds (safety limit)
104
  }
105
 
106
  # Perception state - stores latest detected object poses
 
132
  "j6": 0.0,
133
  }
134
  teleop_lock = threading.Lock()
135
+ last_teleop_command_time = 0.0 # Timestamp of last teleop command (for debugging)
136
+ jogging_active = False # Track if jogging is currently active
137
 
138
  # Homing state
139
  homing_state = {
 
195
  if env is not None:
196
  env.reset()
197
  _reset_camera_for_current_robot()
198
+ # Track episode start time for duration watchdog
199
+ with episode_control_lock:
200
+ episode_control_state["start_time"] = time.time()
201
+ episode_control_state["terminate"] = False
202
+ episode_control_state["truncate"] = False
203
 
204
 
205
  def _schedule_robot_switch(robot: str, scene: Optional[str]) -> None:
 
705
  current_scene = active_scene
706
  env.reset()
707
 
708
+ # Track episode start time for duration watchdog
709
+ with episode_control_lock:
710
+ episode_control_state["start_time"] = time.time()
711
+ episode_control_state["terminate"] = False
712
+ episode_control_state["truncate"] = False
713
+
714
  # Initialize gripper value in teleop_action for UR5
715
  with teleop_lock:
716
  if robot_type in ("ur5", "ur5_t_push"):
 
1067
 
1068
  with episode_control_lock:
1069
  episode_control_state[action] = True
1070
+ episode_control_state["start_time"] = 0.0 # Reset timer
1071
  status_label = "terminated" if action == "terminate" else "truncated"
1072
  payload = {
1073
  "action": action,
 
1080
  with mujoco_lock:
1081
  if env is not None:
1082
  env.reset()
1083
+ # Track new episode start time
1084
+ with episode_control_lock:
1085
+ episode_control_state["start_time"] = time.time()
1086
+ episode_control_state["terminate"] = False
1087
+ episode_control_state["truncate"] = False
1088
 
1089
 
1090
  def _consume_episode_control_flags():
 
1214
 
1215
  def _run_iteration():
1216
  nonlocal broadcast_counter
1217
+ global needs_robot_switch, latest_frame, jogging_active, last_teleop_command_time
1218
 
1219
  if needs_robot_switch is not None:
1220
  with mujoco_lock:
 
1224
  )
1225
  needs_robot_switch = None
1226
 
1227
+ # Check episode duration watchdog
1228
+ current_time = time.time()
1229
+ with episode_control_lock:
1230
+ episode_start = episode_control_state.get("start_time", 0.0)
1231
+ max_duration = episode_control_state.get("max_duration", 120.0)
1232
+ if episode_start > 0 and (current_time - episode_start) > max_duration:
1233
+ # Episode has exceeded max duration - automatically truncate
1234
+ print(f"[Watchdog] Episode exceeded max duration ({max_duration}s) - truncating episode")
1235
+ episode_control_state["truncate"] = True
1236
+ episode_control_state["start_time"] = 0.0 # Reset timer
1237
+ # Stop any active jogging
1238
+ if jogging_active:
1239
+ with mujoco_lock:
1240
+ if env is not None and current_robot in ("ur5", "ur5_t_push"):
1241
+ stop_jog_fn = getattr(env, "stop_jog", None)
1242
+ if callable(stop_jog_fn):
1243
+ try:
1244
+ stop_jog_fn()
1245
+ jogging_active = False
1246
+ except Exception as e:
1247
+ print(f"[Watchdog] Error stopping jogging during truncation: {e}")
1248
+
1249
  with mujoco_lock:
1250
  if env is None:
1251
  return
 
1571
 
1572
  def handle_ws_message(ws, data):
1573
  """Handle incoming WebSocket message."""
1574
+ global needs_robot_switch, camera_follow, t_target_visible, last_teleop_action, last_teleop_command_time, jogging_active
1575
 
1576
  msg_type = data.get('type')
1577
 
 
1627
  any_joint = any(abs(v) > 0.001 for v in joint_velocities)
1628
  any_cartesian = any(abs(v) > 0.001 for v in cartesian_translation + cartesian_rotation)
1629
 
1630
+ # Update teleop command timestamp
1631
+ last_teleop_command_time = time.time()
1632
+
1633
  # Apply the appropriate jogging mode
1634
  if any_joint:
1635
  env.start_multi_joint_jog(joint_velocities)
1636
+ jogging_active = True
1637
  elif any_cartesian:
1638
  # Convert m/s to mm/s for translations
1639
  translation_mm_s = [v * 1000.0 for v in cartesian_translation]
 
1644
  tcp_id='Flange',
1645
  coord_system_id='world'
1646
  )
1647
+ jogging_active = True
1648
  else:
1649
  # No active velocity - stop jogging
1650
  env.stop_jog()
1651
+ jogging_active = False
1652
  else:
1653
  # For locomotion robots: use vx, vy, vyaw
1654
  env.set_command(vx, vy, vyaw)
 
1677
  with mujoco_lock:
1678
  if env is not None:
1679
  obs, info = env.reset(seed=seed)
1680
+ # Track episode start time for duration watchdog
1681
+ with episode_control_lock:
1682
+ episode_control_state["start_time"] = time.time()
1683
+ episode_control_state["terminate"] = False
1684
+ episode_control_state["truncate"] = False
1685
 
1686
  elif msg_type == 'switch_robot':
1687
  payload = data.get('data', {})
robots/ur5/model/scene_t_push.xml CHANGED
@@ -113,7 +113,7 @@
113
  </body>
114
 
115
  <!-- UR5e robot mounted on table edge -->
116
- <body name="base" pos="0 0 0.53" quat="0 0 0 -1" childclass="ur5e">
117
  <inertial mass="4.0" pos="0 0 0" diaginertia="0.00443333156 0.00443333156 0.0072"/>
118
  <geom mesh="base_0" material="black" class="visual"/>
119
  <geom mesh="base_1" material="jointgray" class="visual"/>
 
113
  </body>
114
 
115
  <!-- UR5e robot mounted on table edge -->
116
+ <body name="base" pos="0 0 0.54" quat="0 0 0 -1" childclass="ur5e">
117
  <inertial mass="4.0" pos="0 0 0" diaginertia="0.00443333156 0.00443333156 0.0072"/>
118
  <geom mesh="base_0" material="black" class="visual"/>
119
  <geom mesh="base_1" material="jointgray" class="visual"/>
robots/ur5/nova_jogger.py CHANGED
@@ -427,7 +427,18 @@ class NovaJogger:
427
  self._current_command = None
428
 
429
  # Send one final stop command using v2 API
 
430
  if self._jogger_ws:
 
 
 
 
 
 
 
 
 
 
431
  # Option 1: Use PauseJoggingRequest (v2 API preferred method)
432
  # Option 2: Send zero velocities (backward compatible)
433
  # Using zero velocities for better compatibility with existing code
@@ -451,7 +462,16 @@ class NovaJogger:
451
  "message_type": "PauseJoggingRequest"
452
  }
453
 
454
- self._jogger_ws.send(json.dumps(command))
 
 
 
 
 
 
 
 
 
455
 
456
  print("[Nova Jogger] Stopped (cleared command)")
457
  return True
@@ -462,7 +482,11 @@ class NovaJogger:
462
 
463
  def disconnect(self):
464
  """Disconnect from the jogger."""
465
- # Stop the send thread first
 
 
 
 
466
  self._stop_send_thread = True
467
  if self._send_thread is not None:
468
  self._send_thread.join(timeout=2.0)
@@ -476,14 +500,25 @@ class NovaJogger:
476
 
477
  if self._jogger_ws:
478
  try:
 
479
  self.stop()
480
- self._jogger_ws.close()
481
  except Exception as e:
482
- print(f"[Nova Jogger] Error during disconnect: {e}")
 
 
 
 
 
 
 
 
 
 
483
  finally:
484
  self._jogger_ws = None
485
  self._connected = False
486
  self._current_mode = None
 
487
 
488
  def is_connected(self) -> bool:
489
  """Check if jogger is connected."""
 
427
  self._current_command = None
428
 
429
  # Send one final stop command using v2 API
430
+ # Check if WebSocket is open before sending
431
  if self._jogger_ws:
432
+ # Check WebSocket state - only send if connection is open
433
+ try:
434
+ ws_state = getattr(self._jogger_ws, 'socket', None)
435
+ if ws_state is None or (hasattr(ws_state, 'fileno') and ws_state.fileno() == -1):
436
+ # WebSocket is closed, skip sending
437
+ print("[Nova Jogger] Stopped (WebSocket already closed)")
438
+ return True
439
+ except:
440
+ pass # If we can't check state, try sending anyway
441
+
442
  # Option 1: Use PauseJoggingRequest (v2 API preferred method)
443
  # Option 2: Send zero velocities (backward compatible)
444
  # Using zero velocities for better compatibility with existing code
 
462
  "message_type": "PauseJoggingRequest"
463
  }
464
 
465
+ try:
466
+ self._jogger_ws.send(json.dumps(command))
467
+ except Exception as send_error:
468
+ # Ignore errors related to closed connections
469
+ error_str = str(send_error).lower()
470
+ if "close frame" in error_str or "closed" in error_str:
471
+ print("[Nova Jogger] Stopped (connection closed during stop)")
472
+ return True
473
+ # Re-raise other errors
474
+ raise
475
 
476
  print("[Nova Jogger] Stopped (cleared command)")
477
  return True
 
482
 
483
  def disconnect(self):
484
  """Disconnect from the jogger."""
485
+ # Clear current command first to stop the send loop
486
+ with self._lock:
487
+ self._current_command = None
488
+
489
+ # Stop the send thread
490
  self._stop_send_thread = True
491
  if self._send_thread is not None:
492
  self._send_thread.join(timeout=2.0)
 
500
 
501
  if self._jogger_ws:
502
  try:
503
+ # Try to send a stop command before closing
504
  self.stop()
 
505
  except Exception as e:
506
+ # Ignore errors during stop - we're disconnecting anyway
507
+ error_str = str(e).lower()
508
+ if "close frame" not in error_str and "closed" not in error_str:
509
+ print(f"[Nova Jogger] Error during stop before disconnect: {e}")
510
+
511
+ try:
512
+ # Close the WebSocket connection
513
+ self._jogger_ws.close(timeout=1.0)
514
+ except Exception as e:
515
+ # Ignore close errors - connection might already be closed
516
+ pass
517
  finally:
518
  self._jogger_ws = None
519
  self._connected = False
520
  self._current_mode = None
521
+ print("[Nova Jogger] Disconnected")
522
 
523
  def is_connected(self) -> bool:
524
  """Check if jogger is connected."""