Alina commited on
Commit
ee69683
·
unverified ·
2 Parent(s): 46f3109 e278a6c

Merge pull request #34 from pollen-robotics/25-external-threads-should-raise-flags-and-never-mute-states-themselves-this-would-remove-the-need-for-most-locks

Browse files
pyproject.toml CHANGED
@@ -17,6 +17,7 @@ dependencies = [
17
  "mediapipe>=0.10.14",
18
  "num2words",
19
  "onnxruntime",
 
20
  "openai>=2.1",
21
  "PyGObject>=3.42.2,<=3.46.0",
22
  "python-dotenv",
 
17
  "mediapipe>=0.10.14",
18
  "num2words",
19
  "onnxruntime",
20
+ "opencv-python>=4.12.0.88",
21
  "openai>=2.1",
22
  "PyGObject>=3.42.2,<=3.46.0",
23
  "python-dotenv",
src/reachy_mini_conversation_demo/audio/head_wobbler.py CHANGED
@@ -5,7 +5,7 @@ import queue
5
  import base64
6
  import logging
7
  import threading
8
- from typing import Optional
9
 
10
  import numpy as np
11
 
@@ -20,22 +20,29 @@ logger = logging.getLogger(__name__)
20
  class HeadWobbler:
21
  """Converts audio deltas (base64) into head movement offsets."""
22
 
23
- def __init__(self, set_offsets):
24
  """Initialize the head wobbler."""
25
- self._apply_offsets = set_offsets
26
  self._base_ts: Optional[float] = None
27
  self._hops_done: int = 0
28
 
29
- self.audio_queue: queue.Queue = queue.Queue()
30
  self.sway = SwayRollRT()
31
 
 
 
 
 
 
32
  self._stop_event = threading.Event()
33
  self._thread: Optional[threading.Thread] = None
34
 
35
  def feed(self, delta_b64: str) -> None:
36
  """Thread-safe: push audio into the consumer queue."""
37
  buf = np.frombuffer(base64.b64decode(delta_b64), dtype=np.int16).reshape(1, -1)
38
- self.audio_queue.put((SAMPLE_RATE, buf))
 
 
39
 
40
  def start(self) -> None:
41
  """Start the head wobbler loop in a thread."""
@@ -57,54 +64,84 @@ class HeadWobbler:
57
 
58
  logger.debug("Head wobbler thread started")
59
  while not self._stop_event.is_set():
 
60
  try:
61
- sr, chunk = self.audio_queue.get_nowait() # (1,N) int16
62
  except queue.Empty:
63
  # avoid while to never exit
64
  time.sleep(MOVEMENT_LATENCY_S)
65
  continue
66
 
67
- pcm = np.asarray(chunk).squeeze(0)
68
- results = self.sway.feed(pcm, sr)
69
- self.audio_queue.task_done()
 
 
70
 
71
- if self._base_ts is None:
72
- self._base_ts = time.time()
 
73
 
74
- i = 0
75
- while i < len(results):
76
  if self._base_ts is None:
77
- self._base_ts = time.time()
78
- continue
79
-
80
- target = self._base_ts + MOVEMENT_LATENCY_S + self._hops_done * hop_dt
81
- now = time.time()
82
-
83
- if now - target >= hop_dt:
84
- lag_hops = int((now - target) / hop_dt)
85
- drop = min(lag_hops, len(results) - i - 1)
86
- if drop > 0:
87
- self._hops_done += drop
88
- i += drop
89
- continue
90
-
91
- if target > now:
92
- time.sleep(target - now)
93
-
94
- r = results[i]
95
- offsets = (
96
- r["x_mm"] / 1000.0,
97
- r["y_mm"] / 1000.0,
98
- r["z_mm"] / 1000.0,
99
- r["roll_rad"],
100
- r["pitch_rad"],
101
- r["yaw_rad"],
102
- )
103
-
104
- self._apply_offsets(offsets)
105
-
106
- self._hops_done += 1
107
- i += 1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
  logger.debug("Head wobbler thread exited")
109
 
110
  '''
@@ -119,8 +156,24 @@ class HeadWobbler:
119
 
120
  def reset(self) -> None:
121
  """Reset the internal state."""
122
- # self.drain_audio_queue()
123
- self.audio_queue = queue.Queue()
124
- self._base_ts = None
125
- self._hops_done = 0
126
- self.sway.reset()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
  import base64
6
  import logging
7
  import threading
8
+ from typing import Tuple, Optional
9
 
10
  import numpy as np
11
 
 
20
  class HeadWobbler:
21
  """Converts audio deltas (base64) into head movement offsets."""
22
 
23
+ def __init__(self, set_speech_offsets):
24
  """Initialize the head wobbler."""
25
+ self._apply_offsets = set_speech_offsets
26
  self._base_ts: Optional[float] = None
27
  self._hops_done: int = 0
28
 
29
+ self.audio_queue: queue.Queue[Tuple[int, int, np.ndarray]] = queue.Queue()
30
  self.sway = SwayRollRT()
31
 
32
+ # Synchronization primitives
33
+ self._state_lock = threading.Lock()
34
+ self._sway_lock = threading.Lock()
35
+ self._generation = 0
36
+
37
  self._stop_event = threading.Event()
38
  self._thread: Optional[threading.Thread] = None
39
 
40
  def feed(self, delta_b64: str) -> None:
41
  """Thread-safe: push audio into the consumer queue."""
42
  buf = np.frombuffer(base64.b64decode(delta_b64), dtype=np.int16).reshape(1, -1)
43
+ with self._state_lock:
44
+ generation = self._generation
45
+ self.audio_queue.put((generation, SAMPLE_RATE, buf))
46
 
47
  def start(self) -> None:
48
  """Start the head wobbler loop in a thread."""
 
64
 
65
  logger.debug("Head wobbler thread started")
66
  while not self._stop_event.is_set():
67
+ queue_ref = self.audio_queue
68
  try:
69
+ chunk_generation, sr, chunk = queue_ref.get_nowait() # (gen, sr, data)
70
  except queue.Empty:
71
  # avoid while to never exit
72
  time.sleep(MOVEMENT_LATENCY_S)
73
  continue
74
 
75
+ try:
76
+ with self._state_lock:
77
+ current_generation = self._generation
78
+ if chunk_generation != current_generation:
79
+ continue
80
 
81
+ pcm = np.asarray(chunk).squeeze(0)
82
+ with self._sway_lock:
83
+ results = self.sway.feed(pcm, sr)
84
 
 
 
85
  if self._base_ts is None:
86
+ with self._state_lock:
87
+ if self._base_ts is None:
88
+ self._base_ts = time.time()
89
+
90
+ i = 0
91
+ while i < len(results):
92
+ with self._state_lock:
93
+ if self._generation != current_generation:
94
+ break
95
+ base_ts = self._base_ts
96
+ hops_done = self._hops_done
97
+
98
+ if base_ts is None:
99
+ base_ts = time.time()
100
+ with self._state_lock:
101
+ if self._base_ts is None:
102
+ self._base_ts = base_ts
103
+ hops_done = self._hops_done
104
+
105
+ target = base_ts + MOVEMENT_LATENCY_S + hops_done * hop_dt
106
+ now = time.time()
107
+
108
+ if now - target >= hop_dt:
109
+ lag_hops = int((now - target) / hop_dt)
110
+ drop = min(lag_hops, len(results) - i - 1)
111
+ if drop > 0:
112
+ with self._state_lock:
113
+ self._hops_done += drop
114
+ hops_done = self._hops_done
115
+ i += drop
116
+ continue
117
+
118
+ if target > now:
119
+ time.sleep(target - now)
120
+ with self._state_lock:
121
+ if self._generation != current_generation:
122
+ break
123
+
124
+ r = results[i]
125
+ offsets = (
126
+ r["x_mm"] / 1000.0,
127
+ r["y_mm"] / 1000.0,
128
+ r["z_mm"] / 1000.0,
129
+ r["roll_rad"],
130
+ r["pitch_rad"],
131
+ r["yaw_rad"],
132
+ )
133
+
134
+ with self._state_lock:
135
+ if self._generation != current_generation:
136
+ break
137
+
138
+ self._apply_offsets(offsets)
139
+
140
+ with self._state_lock:
141
+ self._hops_done += 1
142
+ i += 1
143
+ finally:
144
+ queue_ref.task_done()
145
  logger.debug("Head wobbler thread exited")
146
 
147
  '''
 
156
 
157
  def reset(self) -> None:
158
  """Reset the internal state."""
159
+ with self._state_lock:
160
+ self._generation += 1
161
+ self._base_ts = None
162
+ self._hops_done = 0
163
+
164
+ # Drain any queued audio chunks from previous generations
165
+ drained_any = False
166
+ while True:
167
+ try:
168
+ _, _, _ = self.audio_queue.get_nowait()
169
+ except queue.Empty:
170
+ break
171
+ else:
172
+ drained_any = True
173
+ self.audio_queue.task_done()
174
+
175
+ with self._sway_lock:
176
+ self.sway.reset()
177
+
178
+ if drained_any:
179
+ logger.debug("Head wobbler queue drained during reset")
src/reachy_mini_conversation_demo/main.py CHANGED
@@ -31,17 +31,16 @@ def main():
31
  logger = setup_logger(args.debug)
32
  logger.info("Starting Reachy Mini Conversation Demo")
33
 
34
- robot = ReachyMini()
35
 
36
- camera_worker, head_tracker, vision_manager = handle_vision_stuff(args, robot)
37
 
38
  movement_manager = MovementManager(
39
  current_robot=robot,
40
- head_tracker=head_tracker,
41
  camera_worker=camera_worker,
42
  )
43
 
44
- head_wobbler = HeadWobbler(set_offsets=movement_manager.set_offsets)
45
 
46
  deps = ToolDependencies(
47
  reachy_mini=robot,
 
31
  logger = setup_logger(args.debug)
32
  logger.info("Starting Reachy Mini Conversation Demo")
33
 
34
+ robot = ReachyMini(use_sim=False)
35
 
36
+ camera_worker, _, vision_manager = handle_vision_stuff(args, robot)
37
 
38
  movement_manager = MovementManager(
39
  current_robot=robot,
 
40
  camera_worker=camera_worker,
41
  )
42
 
43
+ head_wobbler = HeadWobbler(set_speech_offsets=movement_manager.set_speech_offsets)
44
 
45
  deps = ToolDependencies(
46
  reachy_mini=robot,
src/reachy_mini_conversation_demo/moves.py CHANGED
@@ -1,16 +1,42 @@
1
  """Movement system with sequential primary moves and additive secondary moves.
2
 
3
- This module implements the movement architecture from main_works.py:
4
- - Primary moves (sequential): emotions, dances, goto, breathing
5
- - Secondary moves (additive): speech offsets + face tracking
6
- - Single set_target() control point with pose fusion
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7
  """
8
 
9
  from __future__ import annotations
10
  import time
11
  import logging
12
  import threading
13
- from typing import Tuple, Optional
 
14
  from collections import deque
15
  from dataclasses import dataclass
16
 
@@ -27,6 +53,9 @@ from reachy_mini.utils.interpolation import (
27
 
28
  logger = logging.getLogger(__name__)
29
 
 
 
 
30
  # Type definitions
31
  FullBodyPose = Tuple[np.ndarray, Tuple[float, float], float] # (head_pose_4x4, antennas, body_yaw)
32
 
@@ -113,8 +142,9 @@ def combine_full_body(primary_pose: FullBodyPose, secondary_pose: FullBodyPose)
113
  primary_head, primary_antennas, primary_body_yaw = primary_pose
114
  secondary_head, secondary_antennas, secondary_body_yaw = secondary_pose
115
 
116
- # Combine head poses using compose_world_offset
117
- # primary_head is T_abs, secondary_head is T_off_world
 
118
  combined_head = compose_world_offset(primary_head, secondary_head, reorthonormalize=True)
119
 
120
  # Sum antennas and body_yaw
@@ -171,32 +201,65 @@ class MovementState:
171
 
172
  def update_activity(self) -> None:
173
  """Update the last activity time."""
174
- self.last_activity_time = time.time()
175
 
176
 
177
- class MovementManager:
178
- """Enhanced movement manager that reproduces main_works.py behavior.
 
 
 
 
 
 
 
 
179
 
180
- - Sequential primary moves via queue
181
- - Additive secondary moves (speech + face tracking)
182
- - Single set_target control loop with pose fusion
183
- - Automatic breathing after inactivity
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
184
  """
185
 
186
  def __init__(
187
  self,
188
  current_robot: ReachyMini,
189
- head_tracker=None,
190
  camera_worker=None,
191
  ):
192
  """Initialize movement manager."""
193
  self.current_robot = current_robot
194
- self.head_tracker = head_tracker
195
  self.camera_worker = camera_worker
196
 
 
 
 
197
  # Movement state
198
  self.state = MovementState()
199
- self.state.last_activity_time = time.time()
200
  neutral_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
201
  self.state.last_primary_pose = (neutral_pose, (0.0, 0.0), 0.0)
202
 
@@ -204,186 +267,328 @@ class MovementManager:
204
  self.move_queue = deque()
205
 
206
  # Configuration
207
- self.idle_inactivity_delay = 5.0 # seconds
208
- self.target_frequency = 50.0 # Hz
209
  self.target_period = 1.0 / self.target_frequency
210
 
211
  self._stop_event = threading.Event()
212
  self._thread: Optional[threading.Thread] = None
213
- self._state_lock = threading.RLock()
214
  self._is_listening = False
215
  self._last_commanded_pose: FullBodyPose = clone_full_body_pose(self.state.last_primary_pose)
216
  self._listening_antennas: Tuple[float, float] = self._last_commanded_pose[1]
217
  self._antenna_unfreeze_blend = 1.0
218
  self._antenna_blend_duration = 0.4 # seconds to blend back after listening
219
- self._last_listening_blend_time = time.monotonic()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
 
221
  def queue_move(self, move: Move) -> None:
222
- """Add a move to the primary move queue."""
223
- with self._state_lock:
224
- self.move_queue.append(move)
225
- self.state.update_activity()
226
- logger.info(f"Queued move with duration {move.duration}s, queue size: {len(self.move_queue)}")
 
227
 
228
  def clear_queue(self) -> None:
229
- """Clear all queued moves and stop current move."""
230
- with self._state_lock:
231
- self.move_queue.clear()
232
- self.state.current_move = None
233
- self.state.move_start_time = None
234
- self.state.is_playing_move = False
235
- logger.info("Cleared move queue and stopped current move")
236
 
237
- def set_speech_offsets(self, offsets: Tuple[float, float, float, float, float, float]) -> None:
238
- """Set speech head offsets (secondary move)."""
239
- with self._state_lock:
240
- self.state.speech_offsets = offsets
241
 
242
- def set_offsets(self, offsets: Tuple[float, float, float, float, float, float]) -> None:
243
- """Compatibility alias for set_speech_offsets."""
244
- self.set_speech_offsets(offsets)
245
 
246
- def set_face_tracking_offsets(self, offsets: Tuple[float, float, float, float, float, float]) -> None:
247
- """Set face tracking offsets (secondary move)."""
248
- with self._state_lock:
249
- self.state.face_tracking_offsets = offsets
 
 
250
 
251
  def set_moving_state(self, duration: float) -> None:
252
- """Set legacy moving state for goto moves."""
253
- with self._state_lock:
254
- self.state.moving_start = time.time()
255
- self.state.moving_for = duration
256
- self.state.update_activity()
257
 
258
- def is_idle(self):
259
- """Check if the robot is idle based on inactivity delay."""
260
- with self._state_lock:
261
- if self._is_listening:
262
- return False
263
- current_time = time.time()
264
- time_since_activity = current_time - self.state.last_activity_time
265
- return time_since_activity >= self.idle_inactivity_delay
 
 
 
 
 
 
 
266
 
267
  def mark_user_activity(self) -> None:
268
- """Record recent user activity to delay idle behaviours."""
269
- with self._state_lock:
270
- self.state.update_activity()
271
 
272
  def set_listening(self, listening: bool) -> None:
273
- """Toggle listening mode, freezing antennas when active."""
274
- with self._state_lock:
275
- if self._is_listening == listening:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
276
  return
277
- self._is_listening = listening
278
- self._last_listening_blend_time = time.monotonic()
279
- if listening:
280
- # Capture the last antenna command so we keep that pose during listening
 
 
 
 
 
281
  self._listening_antennas = (
282
  float(self._last_commanded_pose[1][0]),
283
  float(self._last_commanded_pose[1][1]),
284
  )
285
  self._antenna_unfreeze_blend = 0.0
 
 
 
286
  self.state.update_activity()
 
 
 
 
 
 
 
 
287
 
288
  def _manage_move_queue(self, current_time: float) -> None:
289
  """Manage the primary move queue (sequential execution)."""
290
- with self._state_lock:
291
- if self.state.current_move is None or (
292
- self.state.move_start_time is not None
293
- and current_time - self.state.move_start_time >= self.state.current_move.duration
294
- ):
295
- self.state.current_move = None
296
- self.state.move_start_time = None
297
-
298
- if self.move_queue:
299
- self.state.current_move = self.move_queue.popleft()
300
- self.state.move_start_time = current_time
301
- logger.debug(f"Starting new move, duration: {self.state.current_move.duration}s")
 
302
 
303
  def _manage_breathing(self, current_time: float) -> None:
304
  """Manage automatic breathing when idle."""
305
- # Start breathing after inactivity delay if no moves in queue
306
- with self._state_lock:
307
- if self.state.current_move is None and not self.move_queue:
308
- time_since_activity = current_time - self.state.last_activity_time
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
 
310
- if self.is_idle():
311
- try:
312
- _, current_antennas = self.current_robot.get_current_joint_positions()
313
- current_head_pose = self.current_robot.get_current_head_pose()
314
-
315
- breathing_move = BreathingMove(
316
- interpolation_start_pose=current_head_pose,
317
- interpolation_start_antennas=current_antennas,
318
- interpolation_duration=1.0,
319
- )
320
- self.move_queue.append(breathing_move)
321
- self.state.update_activity()
322
- logger.debug(f"Started breathing after {time_since_activity:.1f}s of inactivity")
323
- except Exception as e:
324
- logger.error(f"Failed to start breathing: {e}")
325
-
326
- if (
327
- self.state.current_move is not None
328
- and isinstance(self.state.current_move, BreathingMove)
329
- and self.move_queue
330
- ):
331
- self.state.current_move = None
332
- self.state.move_start_time = None
333
- logger.info("Stopping breathing due to new move activity")
334
 
335
  def _get_primary_pose(self, current_time: float) -> FullBodyPose:
336
  """Get the primary full body pose from current move or neutral."""
337
- with self._state_lock:
338
- # When a primary move is playing, sample it and cache the resulting pose
339
- if self.state.current_move is not None and self.state.move_start_time is not None:
340
- move_time = current_time - self.state.move_start_time
341
- head, antennas, body_yaw = self.state.current_move.evaluate(move_time)
342
-
343
- if head is None:
344
- head = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
345
- if antennas is None:
346
- antennas = np.array([0.0, 0.0])
347
- if body_yaw is None:
348
- body_yaw = 0.0
349
-
350
- antennas_tuple = (float(antennas[0]), float(antennas[1]))
351
- head_copy = head.copy()
352
- primary_full_body_pose = (
353
- head_copy,
354
- antennas_tuple,
355
- float(body_yaw),
356
- )
357
 
358
- self.state.is_playing_move = True
359
- self.state.is_moving = True
360
- self.state.last_primary_pose = clone_full_body_pose(primary_full_body_pose)
361
- else:
362
- # Otherwise reuse the last primary pose so we avoid jumps between moves
363
- self.state.is_playing_move = False
364
- self.state.is_moving = time.time() - self.state.moving_start < self.state.moving_for
365
 
366
- if self.state.last_primary_pose is not None:
367
- primary_full_body_pose = clone_full_body_pose(self.state.last_primary_pose)
368
- else:
369
- neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
370
- primary_full_body_pose = (neutral_head_pose, (0.0, 0.0), 0.0)
371
- self.state.last_primary_pose = clone_full_body_pose(primary_full_body_pose)
372
 
373
  return primary_full_body_pose
374
 
375
  def _get_secondary_pose(self) -> FullBodyPose:
376
  """Get the secondary full body pose from speech and face tracking offsets."""
377
  # Combine speech sway offsets + face tracking offsets for secondary pose
378
- with self._state_lock:
379
- secondary_offsets = [
380
- self.state.speech_offsets[0] + self.state.face_tracking_offsets[0],
381
- self.state.speech_offsets[1] + self.state.face_tracking_offsets[1],
382
- self.state.speech_offsets[2] + self.state.face_tracking_offsets[2],
383
- self.state.speech_offsets[3] + self.state.face_tracking_offsets[3],
384
- self.state.speech_offsets[4] + self.state.face_tracking_offsets[4],
385
- self.state.speech_offsets[5] + self.state.face_tracking_offsets[5],
386
- ]
387
 
388
  secondary_head_pose = create_head_pose(
389
  x=secondary_offsets[0],
@@ -395,127 +600,236 @@ class MovementManager:
395
  degrees=False,
396
  mm=False,
397
  )
398
- return (secondary_head_pose, (0, 0), 0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
399
 
400
  def _update_face_tracking(self, current_time: float) -> None:
401
  """Get face tracking offsets from camera worker thread."""
402
  if self.camera_worker is not None:
403
  # Get face tracking offsets from camera worker thread
404
  offsets = self.camera_worker.get_face_tracking_offsets()
405
- with self._state_lock:
406
- self.state.face_tracking_offsets = offsets
407
  else:
408
  # No camera worker, use neutral offsets
409
- with self._state_lock:
410
- self.state.face_tracking_offsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
411
 
412
  def start(self) -> None:
413
- """Start the move worker loop in a thread."""
 
 
 
414
  self._stop_event.clear()
415
  self._thread = threading.Thread(target=self.working_loop, daemon=True)
416
  self._thread.start()
417
  logger.debug("Move worker started")
418
 
419
  def stop(self) -> None:
420
- """Stop the move worker loop."""
421
  self._stop_event.set()
422
  if self._thread is not None:
423
  self._thread.join()
 
 
424
  logger.debug("Move worker stopped")
425
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426
  def working_loop(self) -> None:
427
  """Control loop main movements - reproduces main_works.py control architecture.
428
 
429
  Single set_target() call with pose fusion.
430
  """
431
- logger.debug("Starting enhanced movement control loop (50Hz)")
432
 
433
  loop_count = 0
434
- last_print_time = time.time()
 
 
435
 
436
  while not self._stop_event.is_set():
437
- loop_start_time = time.time()
438
  loop_count += 1
439
- current_time = time.time()
440
 
441
- # 1. Manage move queue (sequential primary moves)
442
- self._manage_move_queue(current_time)
 
443
 
444
- # 2. Manage breathing (automatic idle behavior)
445
- self._manage_breathing(current_time)
446
 
447
- # 3. Update face tracking offsets
448
- self._update_face_tracking(current_time)
449
 
450
- # 4. Get primary pose from current move or neutral
451
- primary_full_body_pose = self._get_primary_pose(current_time)
452
 
453
- # 5. Get secondary pose from speech and face tracking offsets
454
- secondary_full_body_pose = self._get_secondary_pose()
455
 
456
- # 6. Combine primary and secondary poses
457
- global_full_body_pose = combine_full_body(primary_full_body_pose, secondary_full_body_pose)
458
 
459
- # 7. Extract pose components
460
- head, antennas, body_yaw = global_full_body_pose
461
- now_monotonic = time.monotonic()
462
- with self._state_lock:
463
- listening = self._is_listening
464
- listening_antennas = self._listening_antennas
465
- blend = self._antenna_unfreeze_blend
466
- blend_duration = self._antenna_blend_duration
467
- last_update = self._last_listening_blend_time
468
- self._last_listening_blend_time = now_monotonic
469
 
470
- # Blend antenna outputs back to the live motion when leaving listening mode
471
- if listening:
472
- antennas_cmd = listening_antennas
473
- new_blend = 0.0
474
- else:
475
- dt = max(0.0, now_monotonic - last_update)
476
- if blend_duration <= 0:
477
- new_blend = 1.0
478
- else:
479
- new_blend = min(1.0, blend + dt / blend_duration)
480
- antennas_cmd = (
481
- listening_antennas[0] * (1.0 - new_blend) + antennas[0] * new_blend,
482
- listening_antennas[1] * (1.0 - new_blend) + antennas[1] * new_blend,
483
- )
484
 
485
- with self._state_lock:
486
- if listening:
487
- self._antenna_unfreeze_blend = 0.0
488
- else:
489
- self._antenna_unfreeze_blend = new_blend
490
- if new_blend >= 1.0:
491
- self._listening_antennas = (
492
- float(antennas[0]),
493
- float(antennas[1]),
494
- )
495
-
496
- # 8. Single set_target call - the one and only place we control the robot
497
- try:
498
- self.current_robot.set_target(head=head, antennas=antennas_cmd, body_yaw=body_yaw)
499
- except Exception as e:
500
- logger.error(f"Failed to set robot target: {e}")
501
- else:
502
- with self._state_lock:
503
- self._last_commanded_pose = clone_full_body_pose((head, antennas_cmd, body_yaw))
504
-
505
- # 9. Calculate computation time and adjust sleep for 50Hz
506
- computation_time = time.time() - loop_start_time
507
- sleep_time = max(0, self.target_period - computation_time)
508
-
509
- # 10. Print frequency info every 100 loops (~2 seconds)
510
- if loop_count % 100 == 0:
511
- elapsed = current_time - last_print_time
512
- actual_freq = 100.0 / elapsed if elapsed > 0 else 0
513
- potential_freq = 1.0 / computation_time if computation_time > 0 else float("inf")
514
- logger.debug(
515
- f"Loop freq - Actual: {actual_freq:.1f}Hz, Potential: {potential_freq:.1f}Hz, Target: {self.target_frequency:.1f}Hz"
516
- )
517
- last_print_time = current_time
518
 
519
- time.sleep(sleep_time)
 
520
 
521
  logger.debug("Movement control loop stopped")
 
1
  """Movement system with sequential primary moves and additive secondary moves.
2
 
3
+ Design overview
4
+ - Primary moves (emotions, dances, goto, breathing) are mutually exclusive and run
5
+ sequentially.
6
+ - Secondary moves (speech sway, face tracking) are additive offsets applied on top
7
+ of the current primary pose.
8
+ - There is a single control point to the robot: `ReachyMini.set_target`.
9
+ - The control loop runs near 100 Hz and is phase-aligned via a monotonic clock.
10
+ - Idle behaviour starts an infinite `BreathingMove` after a short inactivity delay
11
+ unless listening is active.
12
+
13
+ Threading model
14
+ - A dedicated worker thread owns all real-time state and issues `set_target`
15
+ commands.
16
+ - Other threads communicate via a command queue (enqueue moves, mark activity,
17
+ toggle listening).
18
+ - Secondary offset producers set pending values guarded by locks; the worker
19
+ snaps them atomically.
20
+
21
+ Units and frames
22
+ - Secondary offsets are interpreted as metres for x/y/z and radians for
23
+ roll/pitch/yaw in the world frame (unless noted by `compose_world_offset`).
24
+ - Antennas and `body_yaw` are in radians.
25
+ - Head pose composition uses `compose_world_offset(primary_head, secondary_head)`;
26
+ the secondary offset must therefore be expressed in the world frame.
27
+
28
+ Safety
29
+ - Listening freezes antennas, then blends them back on unfreeze.
30
+ - Interpolations and blends are used to avoid jumps at all times.
31
+ - `set_target` errors are rate-limited in logs.
32
  """
33
 
34
  from __future__ import annotations
35
  import time
36
  import logging
37
  import threading
38
+ from queue import Empty, Queue
39
+ from typing import Any, Tuple, Optional
40
  from collections import deque
41
  from dataclasses import dataclass
42
 
 
53
 
54
  logger = logging.getLogger(__name__)
55
 
56
+ # Configuration constants
57
+ CONTROL_LOOP_FREQUENCY_HZ = 100.0 # Hz - Target frequency for the movement control loop
58
+
59
  # Type definitions
60
  FullBodyPose = Tuple[np.ndarray, Tuple[float, float], float] # (head_pose_4x4, antennas, body_yaw)
61
 
 
142
  primary_head, primary_antennas, primary_body_yaw = primary_pose
143
  secondary_head, secondary_antennas, secondary_body_yaw = secondary_pose
144
 
145
+ # Combine head poses using compose_world_offset; the secondary pose must be an
146
+ # offset expressed in the world frame (T_off_world) applied to the absolute
147
+ # primary transform (T_abs).
148
  combined_head = compose_world_offset(primary_head, secondary_head, reorthonormalize=True)
149
 
150
  # Sum antennas and body_yaw
 
201
 
202
  def update_activity(self) -> None:
203
  """Update the last activity time."""
204
+ self.last_activity_time = time.monotonic()
205
 
206
 
207
+ @dataclass
208
+ class LoopFrequencyStats:
209
+ """Track rolling loop frequency statistics."""
210
+
211
+ mean: float = 0.0
212
+ m2: float = 0.0
213
+ min_freq: float = float("inf")
214
+ count: int = 0
215
+ last_freq: float = 0.0
216
+ potential_freq: float = 0.0
217
 
218
+ def reset(self) -> None:
219
+ """Reset accumulators while keeping the last potential frequency."""
220
+ self.mean = 0.0
221
+ self.m2 = 0.0
222
+ self.min_freq = float("inf")
223
+ self.count = 0
224
+
225
+
226
+ class MovementManager:
227
+ """Coordinate sequential moves, additive offsets, and robot output at 100 Hz.
228
+
229
+ Responsibilities:
230
+ - Own a real-time loop that samples the current primary move (if any), fuses
231
+ secondary offsets, and calls `set_target` exactly once per tick.
232
+ - Start an idle `BreathingMove` after `idle_inactivity_delay` when not
233
+ listening and no moves are queued.
234
+ - Expose thread-safe APIs so other threads can enqueue moves, mark activity,
235
+ or feed secondary offsets without touching internal state.
236
+
237
+ Timing:
238
+ - All elapsed-time calculations rely on `time.monotonic()` through `self._now`
239
+ to avoid wall-clock jumps.
240
+ - The loop attempts 100 Hz
241
+
242
+ Concurrency:
243
+ - External threads communicate via `_command_queue` messages.
244
+ - Secondary offsets are staged via dirty flags guarded by locks and consumed
245
+ atomically inside the worker loop.
246
  """
247
 
248
  def __init__(
249
  self,
250
  current_robot: ReachyMini,
 
251
  camera_worker=None,
252
  ):
253
  """Initialize movement manager."""
254
  self.current_robot = current_robot
 
255
  self.camera_worker = camera_worker
256
 
257
+ # Single timing source for durations
258
+ self._now = time.monotonic
259
+
260
  # Movement state
261
  self.state = MovementState()
262
+ self.state.last_activity_time = self._now()
263
  neutral_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
264
  self.state.last_primary_pose = (neutral_pose, (0.0, 0.0), 0.0)
265
 
 
267
  self.move_queue = deque()
268
 
269
  # Configuration
270
+ self.idle_inactivity_delay = 0.3 # seconds
271
+ self.target_frequency = CONTROL_LOOP_FREQUENCY_HZ
272
  self.target_period = 1.0 / self.target_frequency
273
 
274
  self._stop_event = threading.Event()
275
  self._thread: Optional[threading.Thread] = None
 
276
  self._is_listening = False
277
  self._last_commanded_pose: FullBodyPose = clone_full_body_pose(self.state.last_primary_pose)
278
  self._listening_antennas: Tuple[float, float] = self._last_commanded_pose[1]
279
  self._antenna_unfreeze_blend = 1.0
280
  self._antenna_blend_duration = 0.4 # seconds to blend back after listening
281
+ self._last_listening_blend_time = self._now()
282
+ self._breathing_active = False # true when breathing move is running or queued
283
+ self._listening_debounce_s = 0.15
284
+ self._last_listening_toggle_time = self._now()
285
+ self._last_set_target_err = 0.0
286
+ self._set_target_err_interval = 1.0 # seconds between error logs
287
+ self._set_target_err_suppressed = 0
288
+
289
+ # Cross-thread signalling
290
+ self._command_queue: Queue[tuple[str, Any]] = Queue()
291
+ self._speech_offsets_lock = threading.Lock()
292
+ self._pending_speech_offsets: Tuple[float, float, float, float, float, float] = (
293
+ 0.0,
294
+ 0.0,
295
+ 0.0,
296
+ 0.0,
297
+ 0.0,
298
+ 0.0,
299
+ )
300
+ self._speech_offsets_dirty = False
301
+
302
+ self._face_offsets_lock = threading.Lock()
303
+ self._pending_face_offsets: Tuple[float, float, float, float, float, float] = (
304
+ 0.0,
305
+ 0.0,
306
+ 0.0,
307
+ 0.0,
308
+ 0.0,
309
+ 0.0,
310
+ )
311
+ self._face_offsets_dirty = False
312
+
313
+ self._shared_state_lock = threading.Lock()
314
+ self._shared_last_activity_time = self.state.last_activity_time
315
+ self._shared_is_listening = self._is_listening
316
+ self._status_lock = threading.Lock()
317
+ self._freq_stats = LoopFrequencyStats()
318
+ self._freq_snapshot = LoopFrequencyStats()
319
 
320
  def queue_move(self, move: Move) -> None:
321
+ """Queue a primary move to run after the currently executing one.
322
+
323
+ Thread-safe: the move is enqueued via the worker command queue so the
324
+ control loop remains the sole mutator of movement state.
325
+ """
326
+ self._command_queue.put(("queue_move", move))
327
 
328
  def clear_queue(self) -> None:
329
+ """Stop the active move and discard any queued primary moves.
 
 
 
 
 
 
330
 
331
+ Thread-safe: executed by the worker thread via the command queue.
332
+ """
333
+ self._command_queue.put(("clear_queue", None))
 
334
 
335
+ def set_speech_offsets(self, offsets: Tuple[float, float, float, float, float, float]) -> None:
336
+ """Update speech-induced secondary offsets (x, y, z, roll, pitch, yaw).
 
337
 
338
+ Offsets are interpreted as metres for translation and radians for
339
+ rotation in the world frame. Thread-safe via a pending snapshot.
340
+ """
341
+ with self._speech_offsets_lock:
342
+ self._pending_speech_offsets = offsets
343
+ self._speech_offsets_dirty = True
344
 
345
  def set_moving_state(self, duration: float) -> None:
346
+ """Mark the robot as actively moving for the provided duration.
 
 
 
 
347
 
348
+ Legacy hook used by goto helpers to keep inactivity and breathing logic
349
+ aware of manual motions. Thread-safe via the command queue.
350
+ """
351
+ self._command_queue.put(("set_moving_state", duration))
352
+
353
+ def is_idle(self) -> bool:
354
+ """Return True when the robot has been inactive longer than the idle delay."""
355
+ with self._shared_state_lock:
356
+ last_activity = self._shared_last_activity_time
357
+ listening = self._shared_is_listening
358
+
359
+ if listening:
360
+ return False
361
+
362
+ return self._now() - last_activity >= self.idle_inactivity_delay
363
 
364
  def mark_user_activity(self) -> None:
365
+ """Record external activity and postpone idle behaviours (thread-safe)."""
366
+ self._command_queue.put(("mark_activity", None))
 
367
 
368
  def set_listening(self, listening: bool) -> None:
369
+ """Enable or disable listening mode without touching shared state directly.
370
+
371
+ While listening:
372
+ - Antenna positions are frozen at the last commanded values.
373
+ - Blending is reset so that upon unfreezing the antennas return smoothly.
374
+ - Idle breathing is suppressed.
375
+
376
+ Thread-safe: the change is posted to the worker command queue.
377
+ """
378
+ with self._shared_state_lock:
379
+ if self._shared_is_listening == listening:
380
+ return
381
+ self._command_queue.put(("set_listening", listening))
382
+
383
+ def _poll_signals(self, current_time: float) -> None:
384
+ """Apply queued commands and pending offset updates."""
385
+ self._apply_pending_offsets()
386
+
387
+ while True:
388
+ try:
389
+ command, payload = self._command_queue.get_nowait()
390
+ except Empty:
391
+ break
392
+ self._handle_command(command, payload, current_time)
393
+
394
+ def _apply_pending_offsets(self) -> None:
395
+ """Apply the most recent speech/face offset updates."""
396
+ speech_offsets: Optional[Tuple[float, float, float, float, float, float]] = None
397
+ with self._speech_offsets_lock:
398
+ if self._speech_offsets_dirty:
399
+ speech_offsets = self._pending_speech_offsets
400
+ self._speech_offsets_dirty = False
401
+
402
+ if speech_offsets is not None:
403
+ self.state.speech_offsets = speech_offsets
404
+ self.state.update_activity()
405
+
406
+ face_offsets: Optional[Tuple[float, float, float, float, float, float]] = None
407
+ with self._face_offsets_lock:
408
+ if self._face_offsets_dirty:
409
+ face_offsets = self._pending_face_offsets
410
+ self._face_offsets_dirty = False
411
+
412
+ if face_offsets is not None:
413
+ self.state.face_tracking_offsets = face_offsets
414
+ self.state.update_activity()
415
+
416
+ def _handle_command(self, command: str, payload: Any, current_time: float) -> None:
417
+ """Handle a single cross-thread command."""
418
+ if command == "queue_move":
419
+ if isinstance(payload, Move):
420
+ self.move_queue.append(payload)
421
+ self.state.update_activity()
422
+ duration = getattr(payload, "duration", None)
423
+ if duration is not None:
424
+ try:
425
+ duration_str = f"{float(duration):.2f}"
426
+ except (TypeError, ValueError):
427
+ duration_str = str(duration)
428
+ else:
429
+ duration_str = "?"
430
+ logger.info(
431
+ "Queued move with duration %ss, queue size: %s",
432
+ duration_str,
433
+ len(self.move_queue),
434
+ )
435
+ else:
436
+ logger.warning("Ignored queue_move command with invalid payload: %s", payload)
437
+ elif command == "clear_queue":
438
+ self.move_queue.clear()
439
+ self.state.current_move = None
440
+ self.state.move_start_time = None
441
+ self.state.is_playing_move = False
442
+ self._breathing_active = False
443
+ logger.info("Cleared move queue and stopped current move")
444
+ elif command == "set_moving_state":
445
+ try:
446
+ duration = float(payload)
447
+ except (TypeError, ValueError):
448
+ logger.warning("Invalid moving state duration: %s", payload)
449
+ return
450
+ self.state.moving_start = current_time
451
+ self.state.moving_for = max(0.0, duration)
452
+ self.state.update_activity()
453
+ elif command == "mark_activity":
454
+ self.state.update_activity()
455
+ elif command == "set_listening":
456
+ desired_state = bool(payload)
457
+ now = self._now()
458
+ if now - self._last_listening_toggle_time < self._listening_debounce_s:
459
  return
460
+ self._last_listening_toggle_time = now
461
+
462
+ if self._is_listening == desired_state:
463
+ return
464
+
465
+ self._is_listening = desired_state
466
+ self._last_listening_blend_time = now
467
+ if desired_state:
468
+ # Freeze: snapshot current commanded antennas and reset blend
469
  self._listening_antennas = (
470
  float(self._last_commanded_pose[1][0]),
471
  float(self._last_commanded_pose[1][1]),
472
  )
473
  self._antenna_unfreeze_blend = 0.0
474
+ else:
475
+ # Unfreeze: restart blending from frozen pose
476
+ self._antenna_unfreeze_blend = 0.0
477
  self.state.update_activity()
478
+ else:
479
+ logger.warning("Unknown command received by MovementManager: %s", command)
480
+
481
+ def _publish_shared_state(self) -> None:
482
+ """Expose idle-related state for external threads."""
483
+ with self._shared_state_lock:
484
+ self._shared_last_activity_time = self.state.last_activity_time
485
+ self._shared_is_listening = self._is_listening
486
 
487
  def _manage_move_queue(self, current_time: float) -> None:
488
  """Manage the primary move queue (sequential execution)."""
489
+ if self.state.current_move is None or (
490
+ self.state.move_start_time is not None
491
+ and current_time - self.state.move_start_time >= self.state.current_move.duration
492
+ ):
493
+ self.state.current_move = None
494
+ self.state.move_start_time = None
495
+
496
+ if self.move_queue:
497
+ self.state.current_move = self.move_queue.popleft()
498
+ self.state.move_start_time = current_time
499
+ # Any real move cancels breathing mode flag
500
+ self._breathing_active = isinstance(self.state.current_move, BreathingMove)
501
+ logger.info(f"Starting new move, duration: {self.state.current_move.duration}s")
502
 
503
  def _manage_breathing(self, current_time: float) -> None:
504
  """Manage automatic breathing when idle."""
505
+ if (
506
+ self.state.current_move is None
507
+ and not self.move_queue
508
+ and not self._is_listening
509
+ and not self._breathing_active
510
+ ):
511
+ idle_for = current_time - self.state.last_activity_time
512
+ if idle_for >= self.idle_inactivity_delay:
513
+ try:
514
+ # These 2 functions return the latest available sensor data from the robot, but don't perform I/O synchronously.
515
+ # Therefore, we accept calling them inside the control loop.
516
+ _, current_antennas = self.current_robot.get_current_joint_positions()
517
+ current_head_pose = self.current_robot.get_current_head_pose()
518
+
519
+ self._breathing_active = True
520
+ self.state.update_activity()
521
+
522
+ breathing_move = BreathingMove(
523
+ interpolation_start_pose=current_head_pose,
524
+ interpolation_start_antennas=current_antennas,
525
+ interpolation_duration=1.0,
526
+ )
527
+ self.move_queue.append(breathing_move)
528
+ logger.info("Started breathing after %.1fs of inactivity", idle_for)
529
+ except Exception as e:
530
+ self._breathing_active = False
531
+ logger.error("Failed to start breathing: %s", e)
532
+
533
+ if isinstance(self.state.current_move, BreathingMove) and self.move_queue:
534
+ self.state.current_move = None
535
+ self.state.move_start_time = None
536
+ self._breathing_active = False
537
+ logger.info("Stopping breathing due to new move activity")
538
 
539
+ if self.state.current_move is not None and not isinstance(self.state.current_move, BreathingMove):
540
+ self._breathing_active = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
541
 
542
  def _get_primary_pose(self, current_time: float) -> FullBodyPose:
543
  """Get the primary full body pose from current move or neutral."""
544
+ # When a primary move is playing, sample it and cache the resulting pose
545
+ if self.state.current_move is not None and self.state.move_start_time is not None:
546
+ move_time = current_time - self.state.move_start_time
547
+ head, antennas, body_yaw = self.state.current_move.evaluate(move_time)
548
+
549
+ if head is None:
550
+ head = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
551
+ if antennas is None:
552
+ antennas = np.array([0.0, 0.0])
553
+ if body_yaw is None:
554
+ body_yaw = 0.0
555
+
556
+ antennas_tuple = (float(antennas[0]), float(antennas[1]))
557
+ head_copy = head.copy()
558
+ primary_full_body_pose = (
559
+ head_copy,
560
+ antennas_tuple,
561
+ float(body_yaw),
562
+ )
 
563
 
564
+ self.state.is_playing_move = True
565
+ self.state.is_moving = True
566
+ self.state.last_primary_pose = clone_full_body_pose(primary_full_body_pose)
567
+ else:
568
+ # Otherwise reuse the last primary pose so we avoid jumps between moves
569
+ self.state.is_playing_move = False
570
+ self.state.is_moving = current_time - self.state.moving_start < self.state.moving_for
571
 
572
+ if self.state.last_primary_pose is not None:
573
+ primary_full_body_pose = clone_full_body_pose(self.state.last_primary_pose)
574
+ else:
575
+ neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
576
+ primary_full_body_pose = (neutral_head_pose, (0.0, 0.0), 0.0)
577
+ self.state.last_primary_pose = clone_full_body_pose(primary_full_body_pose)
578
 
579
  return primary_full_body_pose
580
 
581
  def _get_secondary_pose(self) -> FullBodyPose:
582
  """Get the secondary full body pose from speech and face tracking offsets."""
583
  # Combine speech sway offsets + face tracking offsets for secondary pose
584
+ secondary_offsets = [
585
+ self.state.speech_offsets[0] + self.state.face_tracking_offsets[0],
586
+ self.state.speech_offsets[1] + self.state.face_tracking_offsets[1],
587
+ self.state.speech_offsets[2] + self.state.face_tracking_offsets[2],
588
+ self.state.speech_offsets[3] + self.state.face_tracking_offsets[3],
589
+ self.state.speech_offsets[4] + self.state.face_tracking_offsets[4],
590
+ self.state.speech_offsets[5] + self.state.face_tracking_offsets[5],
591
+ ]
 
592
 
593
  secondary_head_pose = create_head_pose(
594
  x=secondary_offsets[0],
 
600
  degrees=False,
601
  mm=False,
602
  )
603
+ return (secondary_head_pose, (0.0, 0.0), 0.0)
604
+
605
+ def _compose_full_body_pose(self, current_time: float) -> FullBodyPose:
606
+ """Compose primary and secondary poses into a single command pose."""
607
+ primary = self._get_primary_pose(current_time)
608
+ secondary = self._get_secondary_pose()
609
+ return combine_full_body(primary, secondary)
610
+
611
+ def _update_primary_motion(self, current_time: float) -> None:
612
+ """Advance queue state and idle behaviours for this tick."""
613
+ self._manage_move_queue(current_time)
614
+ self._manage_breathing(current_time)
615
+
616
+ def _calculate_blended_antennas(self, target_antennas: Tuple[float, float]) -> Tuple[float, float]:
617
+ """Blend target antennas with listening freeze state and update blending."""
618
+ now = self._now()
619
+ listening = self._is_listening
620
+ listening_antennas = self._listening_antennas
621
+ blend = self._antenna_unfreeze_blend
622
+ blend_duration = self._antenna_blend_duration
623
+ last_update = self._last_listening_blend_time
624
+ self._last_listening_blend_time = now
625
+
626
+ if listening:
627
+ antennas_cmd = listening_antennas
628
+ new_blend = 0.0
629
+ else:
630
+ dt = max(0.0, now - last_update)
631
+ if blend_duration <= 0:
632
+ new_blend = 1.0
633
+ else:
634
+ new_blend = min(1.0, blend + dt / blend_duration)
635
+ antennas_cmd = (
636
+ listening_antennas[0] * (1.0 - new_blend) + target_antennas[0] * new_blend,
637
+ listening_antennas[1] * (1.0 - new_blend) + target_antennas[1] * new_blend,
638
+ )
639
+
640
+ if listening:
641
+ self._antenna_unfreeze_blend = 0.0
642
+ else:
643
+ self._antenna_unfreeze_blend = new_blend
644
+ if new_blend >= 1.0:
645
+ self._listening_antennas = (
646
+ float(target_antennas[0]),
647
+ float(target_antennas[1]),
648
+ )
649
+
650
+ return antennas_cmd
651
+
652
+ def _issue_control_command(self, head: np.ndarray, antennas: Tuple[float, float], body_yaw: float) -> None:
653
+ """Send the fused pose to the robot with throttled error logging."""
654
+ try:
655
+ self.current_robot.set_target(head=head, antennas=antennas, body_yaw=body_yaw)
656
+ except Exception as e:
657
+ now = self._now()
658
+ if now - self._last_set_target_err >= self._set_target_err_interval:
659
+ msg = f"Failed to set robot target: {e}"
660
+ if self._set_target_err_suppressed:
661
+ msg += f" (suppressed {self._set_target_err_suppressed} repeats)"
662
+ self._set_target_err_suppressed = 0
663
+ logger.error(msg)
664
+ self._last_set_target_err = now
665
+ else:
666
+ self._set_target_err_suppressed += 1
667
+ else:
668
+ with self._status_lock:
669
+ self._last_commanded_pose = clone_full_body_pose((head, antennas, body_yaw))
670
+
671
+ def _update_frequency_stats(
672
+ self, loop_start: float, prev_loop_start: float, stats: LoopFrequencyStats
673
+ ) -> LoopFrequencyStats:
674
+ """Update frequency statistics based on the current loop start time."""
675
+ period = loop_start - prev_loop_start
676
+ if period > 0:
677
+ stats.last_freq = 1.0 / period
678
+ stats.count += 1
679
+ delta = stats.last_freq - stats.mean
680
+ stats.mean += delta / stats.count
681
+ stats.m2 += delta * (stats.last_freq - stats.mean)
682
+ stats.min_freq = min(stats.min_freq, stats.last_freq)
683
+ return stats
684
+
685
+ def _schedule_next_tick(self, loop_start: float, stats: LoopFrequencyStats) -> tuple[float, LoopFrequencyStats]:
686
+ """Compute sleep time to maintain target frequency and update potential freq."""
687
+ computation_time = self._now() - loop_start
688
+ stats.potential_freq = 1.0 / computation_time if computation_time > 0 else float("inf")
689
+ sleep_time = max(0.0, self.target_period - computation_time)
690
+ return sleep_time, stats
691
+
692
+ def _record_frequency_snapshot(self, stats: LoopFrequencyStats) -> None:
693
+ """Store a thread-safe snapshot of current frequency statistics."""
694
+ with self._status_lock:
695
+ self._freq_snapshot = LoopFrequencyStats(
696
+ mean=stats.mean,
697
+ m2=stats.m2,
698
+ min_freq=stats.min_freq,
699
+ count=stats.count,
700
+ last_freq=stats.last_freq,
701
+ potential_freq=stats.potential_freq,
702
+ )
703
+
704
+ def _maybe_log_frequency(self, loop_count: int, print_interval_loops: int, stats: LoopFrequencyStats) -> None:
705
+ """Emit frequency telemetry when enough loops have elapsed."""
706
+ if loop_count % print_interval_loops != 0 or stats.count == 0:
707
+ return
708
+
709
+ variance = stats.m2 / stats.count if stats.count > 0 else 0.0
710
+ lowest = stats.min_freq if stats.min_freq != float("inf") else 0.0
711
+ logger.debug(
712
+ "Loop freq - avg: %.2fHz, variance: %.4f, min: %.2fHz, last: %.2fHz, potential: %.2fHz, target: %.1fHz",
713
+ stats.mean,
714
+ variance,
715
+ lowest,
716
+ stats.last_freq,
717
+ stats.potential_freq,
718
+ self.target_frequency,
719
+ )
720
+ stats.reset()
721
 
722
  def _update_face_tracking(self, current_time: float) -> None:
723
  """Get face tracking offsets from camera worker thread."""
724
  if self.camera_worker is not None:
725
  # Get face tracking offsets from camera worker thread
726
  offsets = self.camera_worker.get_face_tracking_offsets()
727
+ self.state.face_tracking_offsets = offsets
 
728
  else:
729
  # No camera worker, use neutral offsets
730
+ self.state.face_tracking_offsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
 
731
 
732
  def start(self) -> None:
733
+ """Start the worker thread that drives the 100 Hz control loop."""
734
+ if self._thread is not None and self._thread.is_alive():
735
+ logger.warning("Move worker already running; start() ignored")
736
+ return
737
  self._stop_event.clear()
738
  self._thread = threading.Thread(target=self.working_loop, daemon=True)
739
  self._thread.start()
740
  logger.debug("Move worker started")
741
 
742
  def stop(self) -> None:
743
+ """Request the worker thread to stop and wait for it to exit."""
744
  self._stop_event.set()
745
  if self._thread is not None:
746
  self._thread.join()
747
+ self._thread = None
748
+ logger.debug("Move worker stopped")
749
  logger.debug("Move worker stopped")
750
 
751
+ def get_status(self) -> dict[str, Any]:
752
+ """Return a lightweight status snapshot for observability."""
753
+ with self._status_lock:
754
+ pose_snapshot = clone_full_body_pose(self._last_commanded_pose)
755
+ freq_snapshot = LoopFrequencyStats(
756
+ mean=self._freq_snapshot.mean,
757
+ m2=self._freq_snapshot.m2,
758
+ min_freq=self._freq_snapshot.min_freq,
759
+ count=self._freq_snapshot.count,
760
+ last_freq=self._freq_snapshot.last_freq,
761
+ potential_freq=self._freq_snapshot.potential_freq,
762
+ )
763
+
764
+ head_matrix = pose_snapshot[0].tolist() if pose_snapshot else None
765
+ antennas = pose_snapshot[1] if pose_snapshot else None
766
+ body_yaw = pose_snapshot[2] if pose_snapshot else None
767
+
768
+ return {
769
+ "queue_size": len(self.move_queue),
770
+ "is_listening": self._is_listening,
771
+ "breathing_active": self._breathing_active,
772
+ "last_commanded_pose": {
773
+ "head": head_matrix,
774
+ "antennas": antennas,
775
+ "body_yaw": body_yaw,
776
+ },
777
+ "loop_frequency": {
778
+ "last": freq_snapshot.last_freq,
779
+ "mean": freq_snapshot.mean,
780
+ "min": freq_snapshot.min_freq,
781
+ "potential": freq_snapshot.potential_freq,
782
+ "samples": freq_snapshot.count,
783
+ },
784
+ }
785
+
786
  def working_loop(self) -> None:
787
  """Control loop main movements - reproduces main_works.py control architecture.
788
 
789
  Single set_target() call with pose fusion.
790
  """
791
+ logger.debug("Starting enhanced movement control loop (100Hz)")
792
 
793
  loop_count = 0
794
+ prev_loop_start = self._now()
795
+ print_interval_loops = max(1, int(self.target_frequency * 2))
796
+ freq_stats = self._freq_stats
797
 
798
  while not self._stop_event.is_set():
799
+ loop_start = self._now()
800
  loop_count += 1
 
801
 
802
+ if loop_count > 1:
803
+ freq_stats = self._update_frequency_stats(loop_start, prev_loop_start, freq_stats)
804
+ prev_loop_start = loop_start
805
 
806
+ # 1) Poll external commands and apply pending offsets (atomic snapshot)
807
+ self._poll_signals(loop_start)
808
 
809
+ # 2) Manage the primary move queue (start new move, end finished move, breathing)
810
+ self._update_primary_motion(loop_start)
811
 
812
+ # 3) Update vision-based secondary offsets
813
+ self._update_face_tracking(loop_start)
814
 
815
+ # 4) Build primary and secondary full-body poses, then fuse them
816
+ head, antennas, body_yaw = self._compose_full_body_pose(loop_start)
817
 
818
+ # 5) Apply listening antenna freeze or blend-back
819
+ antennas_cmd = self._calculate_blended_antennas(antennas)
820
 
821
+ # 6) Single set_target call - the only control point
822
+ self._issue_control_command(head, antennas_cmd, body_yaw)
 
 
 
 
 
 
 
 
823
 
824
+ # 7) Adaptive sleep to align to next tick, then publish shared state
825
+ sleep_time, freq_stats = self._schedule_next_tick(loop_start, freq_stats)
826
+ self._publish_shared_state()
827
+ self._record_frequency_snapshot(freq_stats)
 
 
 
 
 
 
 
 
 
 
828
 
829
+ # 8) Periodic telemetry on loop frequency
830
+ self._maybe_log_frequency(loop_count, print_interval_loops, freq_stats)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
831
 
832
+ if sleep_time > 0:
833
+ time.sleep(sleep_time)
834
 
835
  logger.debug("Movement control loop stopped")
tests/audio/test_head_wobbler.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Regression tests for the audio-driven head wobble behaviour."""
2
+
3
+ import math
4
+ import time
5
+ import base64
6
+ import threading
7
+ from typing import List, Tuple, Callable
8
+
9
+ import numpy as np
10
+
11
+ from reachy_mini_conversation_demo.audio.head_wobbler import HeadWobbler
12
+
13
+
14
+ def _make_audio_chunk(duration_s: float = 0.3, frequency_hz: float = 220.0) -> str:
15
+ """Generate a base64-encoded mono PCM16 sine wave."""
16
+ sample_rate = 24000
17
+ sample_count = int(sample_rate * duration_s)
18
+ t = np.linspace(0, duration_s, sample_count, endpoint=False)
19
+ wave = 0.6 * np.sin(2 * math.pi * frequency_hz * t)
20
+ pcm = np.clip(wave * np.iinfo(np.int16).max, -32768, 32767).astype(np.int16)
21
+ return base64.b64encode(pcm.tobytes()).decode("ascii")
22
+
23
+
24
+ def _wait_for(predicate: Callable[[], bool], timeout: float = 0.6) -> bool:
25
+ """Poll `predicate` until true or timeout."""
26
+ end_time = time.time() + timeout
27
+ while time.time() < end_time:
28
+ if predicate():
29
+ return True
30
+ time.sleep(0.01)
31
+ return False
32
+
33
+
34
+ def _start_wobbler() -> Tuple[HeadWobbler, List[Tuple[float, Tuple[float, float, float, float, float, float]]]]:
35
+ captured: List[Tuple[float, Tuple[float, float, float, float, float, float]]] = []
36
+
37
+ def capture(offsets: Tuple[float, float, float, float, float, float]) -> None:
38
+ captured.append((time.time(), offsets))
39
+
40
+ wobbler = HeadWobbler(set_speech_offsets=capture)
41
+ wobbler.start()
42
+ return wobbler, captured
43
+
44
+
45
+ def test_reset_drops_pending_offsets() -> None:
46
+ """Reset should stop wobble output derived from pre-reset audio."""
47
+ wobbler, captured = _start_wobbler()
48
+ try:
49
+ wobbler.feed(_make_audio_chunk(duration_s=0.35))
50
+ assert _wait_for(lambda: len(captured) > 0), "wobbler did not emit initial offsets"
51
+
52
+ pre_reset_count = len(captured)
53
+ wobbler.reset()
54
+ time.sleep(0.3)
55
+ assert len(captured) == pre_reset_count, "offsets continued after reset without new audio"
56
+ finally:
57
+ wobbler.stop()
58
+
59
+
60
+ def test_reset_allows_future_offsets() -> None:
61
+ """After reset, fresh audio must still produce wobble offsets."""
62
+ wobbler, captured = _start_wobbler()
63
+ try:
64
+ wobbler.feed(_make_audio_chunk(duration_s=0.35))
65
+ assert _wait_for(lambda: len(captured) > 0), "wobbler did not emit initial offsets"
66
+
67
+ wobbler.reset()
68
+ pre_second_count = len(captured)
69
+
70
+ wobbler.feed(_make_audio_chunk(duration_s=0.35, frequency_hz=440.0))
71
+ assert _wait_for(lambda: len(captured) > pre_second_count), "no offsets after reset"
72
+ assert wobbler._thread is not None and wobbler._thread.is_alive()
73
+ finally:
74
+ wobbler.stop()
75
+
76
+
77
+ def test_reset_during_inflight_chunk_keeps_worker(monkeypatch) -> None:
78
+ """Simulate reset during chunk processing to ensure the worker survives."""
79
+ wobbler, captured = _start_wobbler()
80
+ ready = threading.Event()
81
+ release = threading.Event()
82
+
83
+ original_feed = wobbler.sway.feed
84
+
85
+ def blocking_feed(pcm, sr): # type: ignore[no-untyped-def]
86
+ ready.set()
87
+ release.wait(timeout=2.0)
88
+ return original_feed(pcm, sr)
89
+
90
+ monkeypatch.setattr(wobbler.sway, "feed", blocking_feed)
91
+
92
+ try:
93
+ wobbler.feed(_make_audio_chunk(duration_s=0.35))
94
+ assert ready.wait(timeout=1.0), "worker thread did not dequeue audio"
95
+
96
+ wobbler.reset()
97
+ release.set()
98
+
99
+ # Allow the worker to finish processing the first chunk (which should be discarded)
100
+ time.sleep(0.1)
101
+
102
+ assert wobbler._thread is not None and wobbler._thread.is_alive(), "worker thread died after reset"
103
+
104
+ pre_second = len(captured)
105
+ wobbler.feed(_make_audio_chunk(duration_s=0.35, frequency_hz=440.0))
106
+ assert _wait_for(lambda: len(captured) > pre_second), "no offsets emitted after in-flight reset"
107
+ assert wobbler._thread.is_alive()
108
+ finally:
109
+ wobbler.stop()
tests/conftest.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ """Pytest configuration for path setup."""
2
+
3
+ import sys
4
+ from pathlib import Path
5
+
6
+
7
+ PROJECT_ROOT = Path(__file__).resolve().parents[1]
8
+ SRC_PATH = PROJECT_ROOT / "src"
9
+ if str(SRC_PATH) not in sys.path:
10
+ sys.path.insert(0, str(SRC_PATH))