Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Measure laptop-to-collision delay using antenna collisions + laptop mic. | |
| Combines the collision test (robot-side position recording) with laptop mic | |
| recording to measure the full end-to-end delay from the laptop issuing a | |
| command to the collision sound being captured. | |
| Delay chain measured: | |
| laptop SSH command β robot receives β daemon β motor β physical collision | |
| β sound through air β laptop mic β detected | |
| Clock sync: Robot prints "MARK_START" to stdout. The laptop notes the time | |
| it receives this line. The robot's t=0 maps to that laptop timestamp (with | |
| ~few ms SSH stdout latency). | |
| Usage: | |
| python tests/test_collision_mic.py [--host reachy-mini.local] | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| import sounddevice as sd | |
| import soundfile as sf | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| from audio_analysis import detect_transient_onsets | |
| # Collision parameters (same as test_antenna_collision.py) | |
| RIGHT_REST = -0.68 | |
| LEFT_REST = -0.1745 | |
| LEFT_COLLISION = 0.70 | |
| HOLD_DURATION = 0.2 | |
| COLLISION_TIMES = [1.0, 2.3, 4.0, 6.3, 9.4] | |
| ROBOT_USER = "pollen" | |
| ROBOT_PYTHON = "/venvs/apps_venv/bin/python" | |
| REMOTE_RESULTS = "/tmp/collision_positions.json" | |
| LAPTOP_SR = 48000 | |
| MIC_DURATION = 30.0 # seconds β covers ~8s SSH startup + 10.5s sequence + buffer | |
| # Robot script: same as test_antenna_collision.py | |
| ROBOT_COLLISION_SCRIPT = """\ | |
| import numpy as np | |
| import os, sys, time, json | |
| collision_times = json.loads(sys.argv[1]) | |
| right_rest = float(sys.argv[2]) | |
| left_rest = float(sys.argv[3]) | |
| left_collision = float(sys.argv[4]) | |
| hold_duration = float(sys.argv[5]) | |
| results_path = sys.argv[6] | |
| print(f"robot: connecting to ReachyMini", flush=True) | |
| from reachy_mini import ReachyMini | |
| from reachy_mini.utils import create_head_pose | |
| r = ReachyMini(media_backend="no_media") | |
| r.goto_target(create_head_pose(), antennas=[left_rest, right_rest], duration=1.0) | |
| time.sleep(1.5) | |
| print(f"robot: rest position β left={left_rest}, right={right_rest}", flush=True) | |
| print(f"robot: will collide at times: {collision_times}", flush=True) | |
| DT = 0.02 | |
| total_duration = max(collision_times) + hold_duration + 1.0 | |
| n_steps = int(total_duration / DT) | |
| left_targets = np.full(n_steps, left_rest, dtype=np.float64) | |
| for ct in collision_times: | |
| start_step = int(ct / DT) | |
| end_step = int((ct + hold_duration) / DT) | |
| end_step = min(end_step, n_steps) | |
| left_targets[start_step:end_step] = left_collision | |
| timestamps = [] | |
| left_present = [] | |
| right_present = [] | |
| left_target_log = [] | |
| print("robot: MARK_START", flush=True) | |
| t0 = time.monotonic() | |
| for i in range(n_steps): | |
| left = float(left_targets[i]) | |
| r.set_target( | |
| head=np.eye(4), | |
| body_yaw=0.0, | |
| antennas=np.array([left, right_rest]), | |
| ) | |
| pos = r.get_present_antenna_joint_positions() | |
| elapsed = time.monotonic() - t0 | |
| timestamps.append(elapsed) | |
| left_present.append(pos[0]) | |
| right_present.append(pos[1]) | |
| left_target_log.append(left) | |
| target_time = (i + 1) * DT | |
| now = time.monotonic() - t0 | |
| if target_time > now: | |
| time.sleep(target_time - now) | |
| elapsed = time.monotonic() - t0 | |
| print(f"robot: finished {n_steps} steps in {elapsed:.3f}s", flush=True) | |
| results = { | |
| "collision_times": collision_times, | |
| "left_collision_target": left_collision, | |
| "right_rest": right_rest, | |
| "hold_duration": hold_duration, | |
| "timestamps": timestamps, | |
| "left_present": left_present, | |
| "right_present": right_present, | |
| "left_target": left_target_log, | |
| } | |
| with open(results_path, "w") as f: | |
| json.dump(results, f) | |
| print(f"robot: saved {len(timestamps)} samples to {results_path}", flush=True) | |
| r.goto_target(create_head_pose(), antennas=[left_rest, right_rest], duration=1.0) | |
| time.sleep(1.5) | |
| print("robot: done", flush=True) | |
| os._exit(0) | |
| """ | |
| def scp_to_robot(local_path: Path, remote_path: str, host: str) -> None: | |
| target = f"{ROBOT_USER}@{host}:{remote_path}" | |
| result = subprocess.run( | |
| ["scp", "-o", "ConnectTimeout=5", str(local_path), target], | |
| capture_output=True, text=True, timeout=15, | |
| ) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"SCP failed: {result.stderr}") | |
| print(f" Copied to {target}") | |
| def scp_from_robot(remote_path: str, local_path: Path, host: str) -> None: | |
| source = f"{ROBOT_USER}@{host}:{remote_path}" | |
| result = subprocess.run( | |
| ["scp", "-o", "ConnectTimeout=5", source, str(local_path)], | |
| capture_output=True, text=True, timeout=15, | |
| ) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"SCP failed: {result.stderr}") | |
| print(f" Copied from {source}") | |
| def start_robot(host: str) -> subprocess.Popen: | |
| with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: | |
| f.write(ROBOT_COLLISION_SCRIPT) | |
| local_script = Path(f.name) | |
| remote_script = "/tmp/collision_mic_test.py" | |
| try: | |
| scp_to_robot(local_script, remote_script, host) | |
| finally: | |
| local_script.unlink() | |
| args_str = ( | |
| f"{ROBOT_PYTHON} {remote_script} " | |
| f"'{json.dumps(COLLISION_TIMES)}' " | |
| f"{RIGHT_REST} {LEFT_REST} {LEFT_COLLISION} {HOLD_DURATION} " | |
| f"{REMOTE_RESULTS}" | |
| ) | |
| proc = subprocess.Popen( | |
| ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{host}", args_str], | |
| stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, | |
| ) | |
| return proc | |
| def plot_combined( | |
| mic_audio: np.ndarray, | |
| mic_sr: int, | |
| mic_start: float, | |
| mark_start: float, | |
| robot_data: dict, | |
| detected_mic_times: list[float], | |
| output_path: Path, | |
| ) -> None: | |
| """Plot mic waveform + robot trajectory on aligned time axes.""" | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| # Time axis for mic (in "mic seconds since mic_start") | |
| mic_t = np.arange(len(mic_audio)) / mic_sr | |
| # Offset to align robot clock with mic clock: | |
| # robot t=0 happened at laptop time mark_start | |
| # mic t=0 happened at laptop time mic_start | |
| # So robot t=X is at mic_t = (mark_start - mic_start) + X | |
| robot_offset = mark_start - mic_start | |
| robot_ts = np.array(robot_data["timestamps"]) | |
| left_pos = np.array(robot_data["left_present"]) | |
| right_pos = np.array(robot_data["right_present"]) | |
| left_tgt = np.array(robot_data["left_target"]) | |
| fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), sharex=True) | |
| # --- Top: Mic waveform --- | |
| ax1.plot(mic_t, mic_audio, "k-", linewidth=0.3, alpha=0.6) | |
| ax1.set_ylabel("Mic amplitude") | |
| ax1.set_title("Laptop Mic Recording + Detected Collision Transients") | |
| ax1.grid(True, alpha=0.3) | |
| # Expected collision times in mic coordinates | |
| for i, ct in enumerate(COLLISION_TIMES): | |
| mic_expected = robot_offset + ct | |
| label = "Expected (cmd time)" if i == 0 else None | |
| ax1.axvline(mic_expected, color="green", linestyle="--", linewidth=1.2, alpha=0.7, label=label) | |
| # Detected transients | |
| for i, dt in enumerate(detected_mic_times): | |
| label = "Detected transient" if i == 0 else None | |
| ax1.axvline(dt, color="red", linestyle="-", linewidth=1.2, alpha=0.7, label=label) | |
| ax1.legend(loc="upper right", fontsize=9) | |
| # --- Bottom: Robot trajectory --- | |
| # Plot in mic time coordinates | |
| ax2.plot(robot_ts + robot_offset, left_pos, "b-", linewidth=1.5, label="Left antenna (present)") | |
| ax2.plot(robot_ts + robot_offset, right_pos, "r-", linewidth=1.5, label="Right antenna (present)") | |
| ax2.plot(robot_ts + robot_offset, left_tgt, "b--", linewidth=0.8, alpha=0.4, label="Left antenna (target)") | |
| for i, ct in enumerate(COLLISION_TIMES): | |
| mic_expected = robot_offset + ct | |
| label = "Expected (cmd time)" if i == 0 else None | |
| ax2.axvline(mic_expected, color="green", linestyle="--", linewidth=1.2, alpha=0.7, label=label) | |
| for i, dt in enumerate(detected_mic_times): | |
| label = "Detected transient" if i == 0 else None | |
| ax2.axvline(dt, color="red", linestyle="-", linewidth=1.2, alpha=0.7, label=label) | |
| ax2.set_xlabel("Time since mic start (s)") | |
| ax2.set_ylabel("Position (rad)") | |
| ax2.set_title("Robot Antenna Trajectory (aligned to mic clock)") | |
| ax2.legend(loc="upper right", fontsize=9) | |
| ax2.grid(True, alpha=0.3) | |
| # Zoom to active region | |
| active_start = robot_offset - 0.5 | |
| active_end = robot_offset + max(COLLISION_TIMES) + 2.0 | |
| ax1.set_xlim(active_start, active_end) | |
| fig.tight_layout() | |
| fig.savefig(str(output_path), dpi=150) | |
| plt.close(fig) | |
| print(f" Plot saved to {output_path}") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Collision + mic delay measurement") | |
| parser.add_argument("--host", default="reachy-mini.local") | |
| args = parser.parse_args() | |
| print(f"\n{'='*60}") | |
| print("Collision + Mic Delay Measurement") | |
| print(f"{'='*60}") | |
| print(f" Collision times: {COLLISION_TIMES}") | |
| print(f" Mic duration: {MIC_DURATION}s at {LAPTOP_SR}Hz\n") | |
| # Step 1: Stop running apps | |
| print("[1/5] Stopping any running app...") | |
| subprocess.run( | |
| ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}", | |
| "curl -sf -X POST http://127.0.0.1:8000/api/apps/stop-current-app >/dev/null 2>&1 || true"], | |
| capture_output=True, timeout=10, | |
| ) | |
| time.sleep(1) | |
| # Step 2: Start mic recording | |
| print(f"[2/5] Starting mic recording ({MIC_DURATION}s)...") | |
| mic_start = time.monotonic() | |
| mic_data = sd.rec( | |
| int(MIC_DURATION * LAPTOP_SR), | |
| samplerate=LAPTOP_SR, channels=1, dtype="float32", | |
| ) | |
| # Step 3: Start robot collision sequence | |
| time.sleep(0.3) # let mic stabilize | |
| print("[3/5] Starting collision sequence on robot...") | |
| proc = start_robot(args.host) | |
| # Read stdout, capture MARK_START timestamp | |
| mark_start = None | |
| print("\n--- Robot output ---") | |
| for line in iter(proc.stdout.readline, ""): | |
| line = line.rstrip() | |
| if not line: | |
| continue | |
| laptop_time = time.monotonic() | |
| print(f" {line}") | |
| if "MARK_START" in line: | |
| mark_start = laptop_time | |
| proc.wait() | |
| print("--- End robot output ---") | |
| # Wait for mic recording to finish | |
| sd.wait() | |
| captured = mic_data.flatten() | |
| mic_end = time.monotonic() | |
| print(f"\n Mic recording done ({mic_end - mic_start:.1f}s)") | |
| if mark_start is None: | |
| print("\nFAILED: Never received MARK_START from robot") | |
| return 1 | |
| robot_offset = mark_start - mic_start | |
| print(f" MARK_START received at mic_t={robot_offset:.3f}s") | |
| # Save mic audio | |
| mic_path = Path("tests/collision_mic.wav") | |
| sf.write(str(mic_path), captured, LAPTOP_SR) | |
| print(f" Saved mic audio to {mic_path}") | |
| # Step 4: Fetch robot position data | |
| print("\n[4/5] Fetching position data from robot...") | |
| local_results = Path("tests/collision_positions.json") | |
| scp_from_robot(REMOTE_RESULTS, local_results, args.host) | |
| with open(local_results) as f: | |
| robot_data = json.load(f) | |
| # Step 5: Analyze | |
| print("\n[5/5] Analyzing...") | |
| # Detect transient sounds in mic recording | |
| detected_mic = detect_transient_onsets(captured, LAPTOP_SR, highpass_freq=2000.0) | |
| print(f" Detected {len(detected_mic)} transients in mic at: " | |
| f"{[f'{t:.3f}' for t in detected_mic]}") | |
| # Expected collision times in mic coordinates | |
| expected_mic = [robot_offset + ct for ct in COLLISION_TIMES] | |
| print(f" Expected collision sounds at mic_t: " | |
| f"{[f'{t:.3f}' for t in expected_mic]}") | |
| # Match detected transients to expected collision times | |
| print(f"\n{'='*60}") | |
| print("Delay Analysis: Command β Audible Collision") | |
| print(f"{'='*60}") | |
| matched = 0 | |
| delays = [] | |
| for i, (et_mic, ct_robot) in enumerate(zip(expected_mic, COLLISION_TIMES)): | |
| if not detected_mic: | |
| print(f" Collision {i+1} (cmd t={ct_robot:.1f}s): NO TRANSIENT DETECTED") | |
| continue | |
| nearest = min(detected_mic, key=lambda d: abs(d - et_mic)) | |
| delay_ms = (nearest - et_mic) * 1000 | |
| ok = abs(nearest - et_mic) < 0.5 # 500ms tolerance | |
| if ok: | |
| matched += 1 | |
| delays.append(delay_ms) | |
| status = "OK" if ok else "MISS" | |
| print(f" Collision {i+1} (cmd t={ct_robot:.1f}s): " | |
| f"expected mic_t={et_mic:.3f}s, detected={nearest:.3f}s, " | |
| f"delay={delay_ms:+.0f}ms [{status}]") | |
| if delays: | |
| print(f"\n Matched: {matched}/{len(COLLISION_TIMES)}") | |
| print(f" Mean delay: {np.mean(delays):+.0f}ms") | |
| print(f" Std delay: {np.std(delays):.0f}ms") | |
| print(f" Min/Max: {min(delays):+.0f}ms / {max(delays):+.0f}ms") | |
| print(f"\n (Positive = sound arrived AFTER command)") | |
| else: | |
| print(f"\n No delays measured β no transients matched") | |
| # Generate combined plot | |
| plot_path = Path("tests/collision_mic_plot.png") | |
| plot_combined( | |
| captured, LAPTOP_SR, mic_start, mark_start, | |
| robot_data, detected_mic, plot_path, | |
| ) | |
| success = matched >= len(COLLISION_TIMES) - 1 | |
| print(f"\n{'='*60}") | |
| if success: | |
| print("RESULT: PASS β Collision sounds detected and matched") | |
| else: | |
| print("RESULT: FAIL β Could not reliably detect collision sounds") | |
| print(f"{'='*60}\n") | |
| return 0 if success else 1 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |