marionette / tests /test_collision_mic.py
RemiFabre
Use new antenna neutral position [-0.1745, 0.1745] to reduce shaking
263b53b
#!/usr/bin/env python3
"""Measure laptop-to-collision delay using antenna collisions + laptop mic.
Combines the collision test (robot-side position recording) with laptop mic
recording to measure the full end-to-end delay from the laptop issuing a
command to the collision sound being captured.
Delay chain measured:
laptop SSH command β†’ robot receives β†’ daemon β†’ motor β†’ physical collision
β†’ sound through air β†’ laptop mic β†’ detected
Clock sync: Robot prints "MARK_START" to stdout. The laptop notes the time
it receives this line. The robot's t=0 maps to that laptop timestamp (with
~few ms SSH stdout latency).
Usage:
python tests/test_collision_mic.py [--host reachy-mini.local]
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import numpy as np
import sounddevice as sd
import soundfile as sf
sys.path.insert(0, str(Path(__file__).parent))
from audio_analysis import detect_transient_onsets
# Collision parameters (same as test_antenna_collision.py)
RIGHT_REST = -0.68
LEFT_REST = -0.1745
LEFT_COLLISION = 0.70
HOLD_DURATION = 0.2
COLLISION_TIMES = [1.0, 2.3, 4.0, 6.3, 9.4]
ROBOT_USER = "pollen"
ROBOT_PYTHON = "/venvs/apps_venv/bin/python"
REMOTE_RESULTS = "/tmp/collision_positions.json"
LAPTOP_SR = 48000
MIC_DURATION = 30.0 # seconds β€” covers ~8s SSH startup + 10.5s sequence + buffer
# Robot script: same as test_antenna_collision.py
ROBOT_COLLISION_SCRIPT = """\
import numpy as np
import os, sys, time, json
collision_times = json.loads(sys.argv[1])
right_rest = float(sys.argv[2])
left_rest = float(sys.argv[3])
left_collision = float(sys.argv[4])
hold_duration = float(sys.argv[5])
results_path = sys.argv[6]
print(f"robot: connecting to ReachyMini", flush=True)
from reachy_mini import ReachyMini
from reachy_mini.utils import create_head_pose
r = ReachyMini(media_backend="no_media")
r.goto_target(create_head_pose(), antennas=[left_rest, right_rest], duration=1.0)
time.sleep(1.5)
print(f"robot: rest position β€” left={left_rest}, right={right_rest}", flush=True)
print(f"robot: will collide at times: {collision_times}", flush=True)
DT = 0.02
total_duration = max(collision_times) + hold_duration + 1.0
n_steps = int(total_duration / DT)
left_targets = np.full(n_steps, left_rest, dtype=np.float64)
for ct in collision_times:
start_step = int(ct / DT)
end_step = int((ct + hold_duration) / DT)
end_step = min(end_step, n_steps)
left_targets[start_step:end_step] = left_collision
timestamps = []
left_present = []
right_present = []
left_target_log = []
print("robot: MARK_START", flush=True)
t0 = time.monotonic()
for i in range(n_steps):
left = float(left_targets[i])
r.set_target(
head=np.eye(4),
body_yaw=0.0,
antennas=np.array([left, right_rest]),
)
pos = r.get_present_antenna_joint_positions()
elapsed = time.monotonic() - t0
timestamps.append(elapsed)
left_present.append(pos[0])
right_present.append(pos[1])
left_target_log.append(left)
target_time = (i + 1) * DT
now = time.monotonic() - t0
if target_time > now:
time.sleep(target_time - now)
elapsed = time.monotonic() - t0
print(f"robot: finished {n_steps} steps in {elapsed:.3f}s", flush=True)
results = {
"collision_times": collision_times,
"left_collision_target": left_collision,
"right_rest": right_rest,
"hold_duration": hold_duration,
"timestamps": timestamps,
"left_present": left_present,
"right_present": right_present,
"left_target": left_target_log,
}
with open(results_path, "w") as f:
json.dump(results, f)
print(f"robot: saved {len(timestamps)} samples to {results_path}", flush=True)
r.goto_target(create_head_pose(), antennas=[left_rest, right_rest], duration=1.0)
time.sleep(1.5)
print("robot: done", flush=True)
os._exit(0)
"""
def scp_to_robot(local_path: Path, remote_path: str, host: str) -> None:
target = f"{ROBOT_USER}@{host}:{remote_path}"
result = subprocess.run(
["scp", "-o", "ConnectTimeout=5", str(local_path), target],
capture_output=True, text=True, timeout=15,
)
if result.returncode != 0:
raise RuntimeError(f"SCP failed: {result.stderr}")
print(f" Copied to {target}")
def scp_from_robot(remote_path: str, local_path: Path, host: str) -> None:
source = f"{ROBOT_USER}@{host}:{remote_path}"
result = subprocess.run(
["scp", "-o", "ConnectTimeout=5", source, str(local_path)],
capture_output=True, text=True, timeout=15,
)
if result.returncode != 0:
raise RuntimeError(f"SCP failed: {result.stderr}")
print(f" Copied from {source}")
def start_robot(host: str) -> subprocess.Popen:
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write(ROBOT_COLLISION_SCRIPT)
local_script = Path(f.name)
remote_script = "/tmp/collision_mic_test.py"
try:
scp_to_robot(local_script, remote_script, host)
finally:
local_script.unlink()
args_str = (
f"{ROBOT_PYTHON} {remote_script} "
f"'{json.dumps(COLLISION_TIMES)}' "
f"{RIGHT_REST} {LEFT_REST} {LEFT_COLLISION} {HOLD_DURATION} "
f"{REMOTE_RESULTS}"
)
proc = subprocess.Popen(
["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{host}", args_str],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
)
return proc
def plot_combined(
mic_audio: np.ndarray,
mic_sr: int,
mic_start: float,
mark_start: float,
robot_data: dict,
detected_mic_times: list[float],
output_path: Path,
) -> None:
"""Plot mic waveform + robot trajectory on aligned time axes."""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
# Time axis for mic (in "mic seconds since mic_start")
mic_t = np.arange(len(mic_audio)) / mic_sr
# Offset to align robot clock with mic clock:
# robot t=0 happened at laptop time mark_start
# mic t=0 happened at laptop time mic_start
# So robot t=X is at mic_t = (mark_start - mic_start) + X
robot_offset = mark_start - mic_start
robot_ts = np.array(robot_data["timestamps"])
left_pos = np.array(robot_data["left_present"])
right_pos = np.array(robot_data["right_present"])
left_tgt = np.array(robot_data["left_target"])
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), sharex=True)
# --- Top: Mic waveform ---
ax1.plot(mic_t, mic_audio, "k-", linewidth=0.3, alpha=0.6)
ax1.set_ylabel("Mic amplitude")
ax1.set_title("Laptop Mic Recording + Detected Collision Transients")
ax1.grid(True, alpha=0.3)
# Expected collision times in mic coordinates
for i, ct in enumerate(COLLISION_TIMES):
mic_expected = robot_offset + ct
label = "Expected (cmd time)" if i == 0 else None
ax1.axvline(mic_expected, color="green", linestyle="--", linewidth=1.2, alpha=0.7, label=label)
# Detected transients
for i, dt in enumerate(detected_mic_times):
label = "Detected transient" if i == 0 else None
ax1.axvline(dt, color="red", linestyle="-", linewidth=1.2, alpha=0.7, label=label)
ax1.legend(loc="upper right", fontsize=9)
# --- Bottom: Robot trajectory ---
# Plot in mic time coordinates
ax2.plot(robot_ts + robot_offset, left_pos, "b-", linewidth=1.5, label="Left antenna (present)")
ax2.plot(robot_ts + robot_offset, right_pos, "r-", linewidth=1.5, label="Right antenna (present)")
ax2.plot(robot_ts + robot_offset, left_tgt, "b--", linewidth=0.8, alpha=0.4, label="Left antenna (target)")
for i, ct in enumerate(COLLISION_TIMES):
mic_expected = robot_offset + ct
label = "Expected (cmd time)" if i == 0 else None
ax2.axvline(mic_expected, color="green", linestyle="--", linewidth=1.2, alpha=0.7, label=label)
for i, dt in enumerate(detected_mic_times):
label = "Detected transient" if i == 0 else None
ax2.axvline(dt, color="red", linestyle="-", linewidth=1.2, alpha=0.7, label=label)
ax2.set_xlabel("Time since mic start (s)")
ax2.set_ylabel("Position (rad)")
ax2.set_title("Robot Antenna Trajectory (aligned to mic clock)")
ax2.legend(loc="upper right", fontsize=9)
ax2.grid(True, alpha=0.3)
# Zoom to active region
active_start = robot_offset - 0.5
active_end = robot_offset + max(COLLISION_TIMES) + 2.0
ax1.set_xlim(active_start, active_end)
fig.tight_layout()
fig.savefig(str(output_path), dpi=150)
plt.close(fig)
print(f" Plot saved to {output_path}")
def main():
parser = argparse.ArgumentParser(description="Collision + mic delay measurement")
parser.add_argument("--host", default="reachy-mini.local")
args = parser.parse_args()
print(f"\n{'='*60}")
print("Collision + Mic Delay Measurement")
print(f"{'='*60}")
print(f" Collision times: {COLLISION_TIMES}")
print(f" Mic duration: {MIC_DURATION}s at {LAPTOP_SR}Hz\n")
# Step 1: Stop running apps
print("[1/5] Stopping any running app...")
subprocess.run(
["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}",
"curl -sf -X POST http://127.0.0.1:8000/api/apps/stop-current-app >/dev/null 2>&1 || true"],
capture_output=True, timeout=10,
)
time.sleep(1)
# Step 2: Start mic recording
print(f"[2/5] Starting mic recording ({MIC_DURATION}s)...")
mic_start = time.monotonic()
mic_data = sd.rec(
int(MIC_DURATION * LAPTOP_SR),
samplerate=LAPTOP_SR, channels=1, dtype="float32",
)
# Step 3: Start robot collision sequence
time.sleep(0.3) # let mic stabilize
print("[3/5] Starting collision sequence on robot...")
proc = start_robot(args.host)
# Read stdout, capture MARK_START timestamp
mark_start = None
print("\n--- Robot output ---")
for line in iter(proc.stdout.readline, ""):
line = line.rstrip()
if not line:
continue
laptop_time = time.monotonic()
print(f" {line}")
if "MARK_START" in line:
mark_start = laptop_time
proc.wait()
print("--- End robot output ---")
# Wait for mic recording to finish
sd.wait()
captured = mic_data.flatten()
mic_end = time.monotonic()
print(f"\n Mic recording done ({mic_end - mic_start:.1f}s)")
if mark_start is None:
print("\nFAILED: Never received MARK_START from robot")
return 1
robot_offset = mark_start - mic_start
print(f" MARK_START received at mic_t={robot_offset:.3f}s")
# Save mic audio
mic_path = Path("tests/collision_mic.wav")
sf.write(str(mic_path), captured, LAPTOP_SR)
print(f" Saved mic audio to {mic_path}")
# Step 4: Fetch robot position data
print("\n[4/5] Fetching position data from robot...")
local_results = Path("tests/collision_positions.json")
scp_from_robot(REMOTE_RESULTS, local_results, args.host)
with open(local_results) as f:
robot_data = json.load(f)
# Step 5: Analyze
print("\n[5/5] Analyzing...")
# Detect transient sounds in mic recording
detected_mic = detect_transient_onsets(captured, LAPTOP_SR, highpass_freq=2000.0)
print(f" Detected {len(detected_mic)} transients in mic at: "
f"{[f'{t:.3f}' for t in detected_mic]}")
# Expected collision times in mic coordinates
expected_mic = [robot_offset + ct for ct in COLLISION_TIMES]
print(f" Expected collision sounds at mic_t: "
f"{[f'{t:.3f}' for t in expected_mic]}")
# Match detected transients to expected collision times
print(f"\n{'='*60}")
print("Delay Analysis: Command β†’ Audible Collision")
print(f"{'='*60}")
matched = 0
delays = []
for i, (et_mic, ct_robot) in enumerate(zip(expected_mic, COLLISION_TIMES)):
if not detected_mic:
print(f" Collision {i+1} (cmd t={ct_robot:.1f}s): NO TRANSIENT DETECTED")
continue
nearest = min(detected_mic, key=lambda d: abs(d - et_mic))
delay_ms = (nearest - et_mic) * 1000
ok = abs(nearest - et_mic) < 0.5 # 500ms tolerance
if ok:
matched += 1
delays.append(delay_ms)
status = "OK" if ok else "MISS"
print(f" Collision {i+1} (cmd t={ct_robot:.1f}s): "
f"expected mic_t={et_mic:.3f}s, detected={nearest:.3f}s, "
f"delay={delay_ms:+.0f}ms [{status}]")
if delays:
print(f"\n Matched: {matched}/{len(COLLISION_TIMES)}")
print(f" Mean delay: {np.mean(delays):+.0f}ms")
print(f" Std delay: {np.std(delays):.0f}ms")
print(f" Min/Max: {min(delays):+.0f}ms / {max(delays):+.0f}ms")
print(f"\n (Positive = sound arrived AFTER command)")
else:
print(f"\n No delays measured β€” no transients matched")
# Generate combined plot
plot_path = Path("tests/collision_mic_plot.png")
plot_combined(
captured, LAPTOP_SR, mic_start, mark_start,
robot_data, detected_mic, plot_path,
)
success = matched >= len(COLLISION_TIMES) - 1
print(f"\n{'='*60}")
if success:
print("RESULT: PASS β€” Collision sounds detected and matched")
else:
print("RESULT: FAIL β€” Could not reliably detect collision sounds")
print(f"{'='*60}\n")
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())