RemiFabre commited on
Commit
d9dc0b8
·
1 Parent(s): e993db5

Adding debug mode

Browse files
Files changed (2) hide show
  1. README.md +4 -0
  2. Theremini/main.py +104 -6
README.md CHANGED
@@ -19,6 +19,10 @@ Theremini turns Reachy Mini into a music instrument. The robot's head roll sets
19
 
20
  When the app starts, Reachy performs a short guided motion (center the head, oscillate the roll, sweep the antenna) so musicians immediately hear how gestures map to sound before the motors are relaxed.
21
 
 
 
 
 
22
  ## Controls
23
 
24
  | Control | Range | Target |
 
19
 
20
  When the app starts, Reachy performs a short guided motion (center the head, oscillate the roll, sweep the antenna) so musicians immediately hear how gestures map to sound before the motors are relaxed.
21
 
22
+ ## Debug mode
23
+
24
+ For latency benchmarking without manual gestures, launch `python main.py --debug`. The robot will loop through a small roll sine sweep followed by a two-second pause, while the console prints motion commands and corresponding sound-trigger times so you can compare movement-to-audio delays on the CM4.
25
+
26
  ## Controls
27
 
28
  | Control | Range | Target |
Theremini/main.py CHANGED
@@ -1,5 +1,6 @@
1
  from __future__ import annotations
2
 
 
3
  import contextlib
4
  import io
5
  import math
@@ -51,7 +52,7 @@ class Theremini(ReachyMiniApp):
51
  custom_app_url: str | None = f"http://localhost:{DASHBOARD_PORT}/index.html"
52
  dont_start_webserver = True
53
 
54
- def __init__(self) -> None:
55
  super().__init__()
56
  self._session = Session(max_threads=1024)
57
  self._parts_cache: dict[str, Any] = {}
@@ -62,6 +63,10 @@ class Theremini(ReachyMiniApp):
62
  self._current_prog: int | None = None
63
  self._dashboard_port = DASHBOARD_PORT
64
  self.custom_app_url = f"http://localhost:{self._dashboard_port}/index.html"
 
 
 
 
65
 
66
  def _get_part_for_prog(self, prog: int):
67
  name = self._active_parts[prog]
@@ -177,15 +182,20 @@ class Theremini(ReachyMiniApp):
177
  self._stop_note()
178
  return
179
 
 
180
  if self._note_handle is None:
181
  self._note_handle = self._theremin.start_note(target_pitch, int(amp * 100))
182
  self._current_pitch = target_pitch
 
183
  elif target_pitch != self._current_pitch:
184
  self._stop_note()
185
  self._note_handle = self._theremin.start_note(target_pitch, int(amp * 100))
186
  self._current_pitch = target_pitch
 
187
  else:
188
  _send_cc(self._theremin, 11, int(amp * 127))
 
 
189
 
190
  def _tick_from_robot(self, reachy_mini: ReachyMini) -> None:
191
  self._ensure_active_parts_fresh()
@@ -325,6 +335,82 @@ class Theremini(ReachyMiniApp):
325
  break
326
  time.sleep(0.02)
327
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
328
  def _angle_for_instrument_index(self, index: int) -> float:
329
  if len(self._active_parts) <= 1:
330
  return 0.0
@@ -395,14 +481,26 @@ class Theremini(ReachyMiniApp):
395
 
396
 
397
  if __name__ == "__main__":
 
 
 
 
 
 
 
 
398
  with ReachyMini(media_backend="no_media") as mini:
399
- app = Theremini()
400
  stop = threading.Event()
401
  try:
402
- print("Running 'Theremini' a ReachyMiniApp...")
403
- print("Press Ctrl+C to stop the app.")
404
- app.run(mini, stop)
405
- print("App has stopped.")
 
 
 
 
406
  except KeyboardInterrupt:
407
  print("Stopping the app...")
408
  stop.set()
 
1
  from __future__ import annotations
2
 
3
+ import argparse
4
  import contextlib
5
  import io
6
  import math
 
52
  custom_app_url: str | None = f"http://localhost:{DASHBOARD_PORT}/index.html"
53
  dont_start_webserver = True
54
 
55
+ def __init__(self, debug_mode: bool = False) -> None:
56
  super().__init__()
57
  self._session = Session(max_threads=1024)
58
  self._parts_cache: dict[str, Any] = {}
 
63
  self._current_prog: int | None = None
64
  self._dashboard_port = DASHBOARD_PORT
65
  self.custom_app_url = f"http://localhost:{self._dashboard_port}/index.html"
66
+ self._debug_mode = debug_mode
67
+ self._debug_cycle_id = 0
68
+ self._debug_last_motion_command: float | None = None
69
+ self._debug_last_motion_log: float | None = None
70
 
71
  def _get_part_for_prog(self, prog: int):
72
  name = self._active_parts[prog]
 
182
  self._stop_note()
183
  return
184
 
185
+ debug_event: str | None = None
186
  if self._note_handle is None:
187
  self._note_handle = self._theremin.start_note(target_pitch, int(amp * 100))
188
  self._current_pitch = target_pitch
189
+ debug_event = "start"
190
  elif target_pitch != self._current_pitch:
191
  self._stop_note()
192
  self._note_handle = self._theremin.start_note(target_pitch, int(amp * 100))
193
  self._current_pitch = target_pitch
194
+ debug_event = "pitch-change"
195
  else:
196
  _send_cc(self._theremin, 11, int(amp * 127))
197
+ if self._debug_mode and debug_event:
198
+ self._log_debug_sound_event(debug_event, target_pitch, amp)
199
 
200
  def _tick_from_robot(self, reachy_mini: ReachyMini) -> None:
201
  self._ensure_active_parts_fresh()
 
335
  break
336
  time.sleep(0.02)
337
 
338
+ def _log_debug_sound_event(self, event: str, pitch: int, amp: float) -> None:
339
+ if not self._debug_mode:
340
+ return
341
+ now = time.time()
342
+ since_motion = (
343
+ now - self._debug_last_motion_command
344
+ if self._debug_last_motion_command is not None
345
+ else float("nan")
346
+ )
347
+ print(
348
+ f"[debug][sound] cycle={self._debug_cycle_id} event={event} pitch={pitch}"
349
+ f" amp={amp:.2f} dt_since_motion={since_motion:.3f}s"
350
+ )
351
+
352
+ def _debug_motion_cycle(
353
+ self,
354
+ reachy_mini: ReachyMini,
355
+ stop_event: threading.Event,
356
+ amplitude_deg: float = 15.0,
357
+ sweep_duration: float = 1.0,
358
+ pause_duration: float = 2.0,
359
+ ) -> None:
360
+ self._debug_cycle_id += 1
361
+ cycle_id = self._debug_cycle_id
362
+ print(f"[debug] Starting sweep cycle {cycle_id}")
363
+ start = time.time()
364
+ self._debug_last_motion_log = None
365
+ while not stop_event.is_set():
366
+ elapsed = time.time() - start
367
+ if elapsed >= sweep_duration:
368
+ break
369
+ roll = amplitude_deg * math.sin(2 * math.pi * elapsed / sweep_duration)
370
+ pose = create_head_pose(z=0.0, roll=roll, degrees=True)
371
+ reachy_mini.set_target(head=pose)
372
+ now = time.time()
373
+ self._debug_last_motion_command = now
374
+ if self._debug_mode:
375
+ if self._debug_last_motion_log is None or now - self._debug_last_motion_log >= 0.2:
376
+ print(
377
+ f"[debug][motion] cycle={cycle_id} roll={roll:.2f}°"
378
+ f" t={now:.3f}"
379
+ )
380
+ self._debug_last_motion_log = now
381
+ self._tick_from_robot(reachy_mini)
382
+ time.sleep(0.02)
383
+ print(f"[debug] Cycle {cycle_id} pause {pause_duration}s")
384
+ pause_end = time.time() + pause_duration
385
+ while not stop_event.is_set() and time.time() < pause_end:
386
+ self._tick_from_robot(reachy_mini)
387
+ time.sleep(0.02)
388
+
389
+ def run_debug(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
390
+ print("\nDebug mode: scripted sine sweep with 2s gaps to benchmark latency.\n")
391
+ self._debug_mode = True
392
+
393
+ dashboard_server = DashboardServer(port=self._dashboard_port)
394
+ dashboard_server.start()
395
+ self._dashboard_port = dashboard_server.port_in_use
396
+ self.custom_app_url = f"http://localhost:{self._dashboard_port}/index.html"
397
+ if self._dashboard_port != DASHBOARD_PORT:
398
+ print(
399
+ f"Dashboard port {DASHBOARD_PORT} busy, now serving on http://localhost:{self._dashboard_port}/index.html"
400
+ )
401
+ set_status(self._build_status_payload(0.0, 0.0, 0.0, 0.0, None, None, False))
402
+
403
+ reachy_mini.enable_motors()
404
+ try:
405
+ while not stop_event.is_set():
406
+ self._debug_motion_cycle(reachy_mini, stop_event)
407
+ finally:
408
+ reachy_mini.disable_motors()
409
+ self._stop_note()
410
+ set_status({"playing": False})
411
+ dashboard_server.stop()
412
+ print("\nDebug Theremin loop stopped.")
413
+
414
  def _angle_for_instrument_index(self, index: int) -> float:
415
  if len(self._active_parts) <= 1:
416
  return 0.0
 
481
 
482
 
483
  if __name__ == "__main__":
484
+ parser = argparse.ArgumentParser(description="Reachy Mini Theremin controller")
485
+ parser.add_argument(
486
+ "--debug",
487
+ action="store_true",
488
+ help="Run a scripted sine-sweep loop and log timings instead of the interactive mode",
489
+ )
490
+ args = parser.parse_args()
491
+
492
  with ReachyMini(media_backend="no_media") as mini:
493
+ app = Theremini(debug_mode=args.debug)
494
  stop = threading.Event()
495
  try:
496
+ if args.debug:
497
+ print("Press Ctrl+C to stop the debug loop.")
498
+ app.run_debug(mini, stop)
499
+ else:
500
+ print("Running 'Theremini' a ReachyMiniApp...")
501
+ print("Press Ctrl+C to stop the app.")
502
+ app.run(mini, stop)
503
+ print("App has stopped.")
504
  except KeyboardInterrupt:
505
  print("Stopping the app...")
506
  stop.set()