RemiFabre committed on
Commit
1bd30d1
·
1 Parent(s): 4dc3463

Tentative fix for the long-standing HeadWobbling desync issue

Browse files
src/reachy_mini_conversation_demo/audio/head_wobbler.py CHANGED
@@ -5,7 +5,7 @@ import logging
5
  import queue
6
  import threading
7
  import time
8
- from typing import Optional
9
 
10
  import numpy as np
11
 
@@ -25,16 +25,25 @@ class HeadWobbler:
25
  self._base_ts: Optional[float] = None
26
  self._hops_done: int = 0
27
 
28
- self.audio_queue: queue.Queue = queue.Queue()
 
 
29
  self.sway = SwayRollRT()
30
 
 
 
 
 
 
31
  self._stop_event = threading.Event()
32
  self._thread: Optional[threading.Thread] = None
33
 
34
  def feed(self, delta_b64: str) -> None:
35
  """Thread-safe: push audio into the consumer queue."""
36
  buf = np.frombuffer(base64.b64decode(delta_b64), dtype=np.int16).reshape(1, -1)
37
- self.audio_queue.put((SAMPLE_RATE, buf))
 
 
38
 
39
  def start(self) -> None:
40
  """Start the head wobbler loop in a thread."""
@@ -56,54 +65,84 @@ class HeadWobbler:
56
 
57
  logger.debug("Head wobbler thread started")
58
  while not self._stop_event.is_set():
 
59
  try:
60
- sr, chunk = self.audio_queue.get_nowait() # (1,N) int16
61
  except queue.Empty:
62
  # queue is empty: sleep briefly instead of blocking, so the while loop can re-check the stop event and exit
63
  time.sleep(MOVEMENT_LATENCY_S)
64
  continue
65
 
66
- pcm = np.asarray(chunk).squeeze(0)
67
- results = self.sway.feed(pcm, sr)
68
- self.audio_queue.task_done()
 
 
69
 
70
- if self._base_ts is None:
71
- self._base_ts = time.time()
 
72
 
73
- i = 0
74
- while i < len(results):
75
  if self._base_ts is None:
76
- self._base_ts = time.time()
77
- continue
78
-
79
- target = self._base_ts + MOVEMENT_LATENCY_S + self._hops_done * hop_dt
80
- now = time.time()
81
-
82
- if now - target >= hop_dt:
83
- lag_hops = int((now - target) / hop_dt)
84
- drop = min(lag_hops, len(results) - i - 1)
85
- if drop > 0:
86
- self._hops_done += drop
87
- i += drop
88
- continue
89
-
90
- if target > now:
91
- time.sleep(target - now)
92
-
93
- r = results[i]
94
- offsets = (
95
- r["x_mm"] / 1000.0,
96
- r["y_mm"] / 1000.0,
97
- r["z_mm"] / 1000.0,
98
- r["roll_rad"],
99
- r["pitch_rad"],
100
- r["yaw_rad"],
101
- )
102
-
103
- self._apply_offsets(offsets)
104
-
105
- self._hops_done += 1
106
- i += 1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  logger.debug("Head wobbler thread exited")
108
 
109
  '''
@@ -118,8 +157,24 @@ class HeadWobbler:
118
 
119
  def reset(self) -> None:
120
  """Reset the internal state."""
121
- # self.drain_audio_queue()
122
- self.audio_queue = queue.Queue()
123
- self._base_ts = None
124
- self._hops_done = 0
125
- self.sway.reset()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
  import queue
6
  import threading
7
  import time
8
+ from typing import Optional, Tuple
9
 
10
  import numpy as np
11
 
 
25
  self._base_ts: Optional[float] = None
26
  self._hops_done: int = 0
27
 
28
+ self.audio_queue: queue.Queue[
29
+ Tuple[int, int, np.ndarray]
30
+ ] = queue.Queue()
31
  self.sway = SwayRollRT()
32
 
33
+ # Synchronization primitives
34
+ self._state_lock = threading.Lock()
35
+ self._sway_lock = threading.Lock()
36
+ self._generation = 0
37
+
38
  self._stop_event = threading.Event()
39
  self._thread: Optional[threading.Thread] = None
40
 
41
  def feed(self, delta_b64: str) -> None:
42
  """Thread-safe: push audio into the consumer queue."""
43
  buf = np.frombuffer(base64.b64decode(delta_b64), dtype=np.int16).reshape(1, -1)
44
+ with self._state_lock:
45
+ generation = self._generation
46
+ self.audio_queue.put((generation, SAMPLE_RATE, buf))
47
 
48
  def start(self) -> None:
49
  """Start the head wobbler loop in a thread."""
 
65
 
66
  logger.debug("Head wobbler thread started")
67
  while not self._stop_event.is_set():
68
+ queue_ref = self.audio_queue
69
  try:
70
+ chunk_generation, sr, chunk = queue_ref.get_nowait() # (gen, sr, data)
71
  except queue.Empty:
72
  # queue is empty: sleep briefly instead of blocking, so the while loop can re-check the stop event and exit
73
  time.sleep(MOVEMENT_LATENCY_S)
74
  continue
75
 
76
+ try:
77
+ with self._state_lock:
78
+ current_generation = self._generation
79
+ if chunk_generation != current_generation:
80
+ continue
81
 
82
+ pcm = np.asarray(chunk).squeeze(0)
83
+ with self._sway_lock:
84
+ results = self.sway.feed(pcm, sr)
85
 
 
 
86
  if self._base_ts is None:
87
+ with self._state_lock:
88
+ if self._base_ts is None:
89
+ self._base_ts = time.time()
90
+
91
+ i = 0
92
+ while i < len(results):
93
+ with self._state_lock:
94
+ if self._generation != current_generation:
95
+ break
96
+ base_ts = self._base_ts
97
+ hops_done = self._hops_done
98
+
99
+ if base_ts is None:
100
+ base_ts = time.time()
101
+ with self._state_lock:
102
+ if self._base_ts is None:
103
+ self._base_ts = base_ts
104
+ hops_done = self._hops_done
105
+
106
+ target = base_ts + MOVEMENT_LATENCY_S + hops_done * hop_dt
107
+ now = time.time()
108
+
109
+ if now - target >= hop_dt:
110
+ lag_hops = int((now - target) / hop_dt)
111
+ drop = min(lag_hops, len(results) - i - 1)
112
+ if drop > 0:
113
+ with self._state_lock:
114
+ self._hops_done += drop
115
+ hops_done = self._hops_done
116
+ i += drop
117
+ continue
118
+
119
+ if target > now:
120
+ time.sleep(target - now)
121
+ with self._state_lock:
122
+ if self._generation != current_generation:
123
+ break
124
+
125
+ r = results[i]
126
+ offsets = (
127
+ r["x_mm"] / 1000.0,
128
+ r["y_mm"] / 1000.0,
129
+ r["z_mm"] / 1000.0,
130
+ r["roll_rad"],
131
+ r["pitch_rad"],
132
+ r["yaw_rad"],
133
+ )
134
+
135
+ with self._state_lock:
136
+ if self._generation != current_generation:
137
+ break
138
+
139
+ self._apply_offsets(offsets)
140
+
141
+ with self._state_lock:
142
+ self._hops_done += 1
143
+ i += 1
144
+ finally:
145
+ queue_ref.task_done()
146
  logger.debug("Head wobbler thread exited")
147
 
148
  '''
 
157
 
158
  def reset(self) -> None:
159
  """Reset the internal state."""
160
+ with self._state_lock:
161
+ self._generation += 1
162
+ self._base_ts = None
163
+ self._hops_done = 0
164
+
165
+ # Drain any queued audio chunks from previous generations
166
+ drained_any = False
167
+ while True:
168
+ try:
169
+ _, _, _ = self.audio_queue.get_nowait()
170
+ except queue.Empty:
171
+ break
172
+ else:
173
+ drained_any = True
174
+ self.audio_queue.task_done()
175
+
176
+ with self._sway_lock:
177
+ self.sway.reset()
178
+
179
+ if drained_any:
180
+ logger.debug("Head wobbler queue drained during reset")