RemiFabre commited on
Commit
2b1805b
·
1 Parent(s): fa08634

tools: robot-clap hardware test + live-detector replay harness

Browse files

robot_clap_test.py drives the antennas on the wire schedule (local Lite
daemon), records the mic, decodes; replay_streaming.mjs feeds a saved recording
through the app's exact StreamingOnsetDetector + wire decoder for offline tuning.

tools/replay_streaming.mjs ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ /**
2
+ * Replay a recorded float32 mono buffer through the app's EXACT live decode
3
+ * path: a Web Audio BiquadFilter highpass is emulated here, then blocks are
4
+ * fed to StreamingOnsetDetector (lib/detector.js) just like mic.js does, and
5
+ * onsets are decoded with wire.js. This validates the real live path against
6
+ * real robot-clap audio (no browser, no robot).
7
+ *
8
+ * node tools/replay_streaming.mjs /tmp/robot_clap.f32 --sr 48000 --unit 150 \
9
+ * --expect SOS --highpass 2000 --factor 4
10
+ */
11
+ import { readFileSync } from "node:fs";
12
+ import { StreamingOnsetDetector, highpass } from "../lib/detector.js";
13
+ import { decodeOnsets } from "../lib/wire.js";
14
+ import { makeTiming } from "../lib/timing.js";
15
+
16
+ const args = process.argv.slice(2);
17
+ const path = args[0];
18
+ const opt = (k, d) => { const i = args.indexOf(`--${k}`); return i >= 0 ? args[i + 1] : d; };
19
+ const sr = parseInt(opt("sr", "48000"), 10);
20
+ const unit = parseInt(opt("unit", "150"), 10);
21
+ const expect = (opt("expect", "") || "").toUpperCase();
22
+ const hp = parseFloat(opt("highpass", "2000"));
23
+ const factor = parseFloat(opt("factor", "4"));
24
+ const refractory = parseFloat(opt("refractory", "120"));
25
+
26
+ const buf = readFileSync(path);
27
+ const samples = new Float32Array(buf.buffer, buf.byteOffset, Math.floor(buf.byteLength / 4));
28
+
29
+ // The app applies the highpass via a Web Audio BiquadFilterNode upstream of
30
+ // the ScriptProcessor. Emulate that by pre-filtering, then stream blocks.
31
+ const filtered = highpass(samples, sr, hp);
32
+
33
+ const onsets = [];
34
+ const det = new StreamingOnsetDetector({
35
+ sampleRate: sr,
36
+ thresholdFactor: factor,
37
+ refractoryMs: refractory,
38
+ onOnset: (t) => onsets.push(t),
39
+ });
40
+
41
+ const BLOCK = 2048;
42
+ for (let i = 0; i < filtered.length; i += BLOCK) {
43
+ const block = filtered.subarray(i, Math.min(i + BLOCK, filtered.length));
44
+ det.process(block, (i / sr) * 1000);
45
+ }
46
+
47
+ const t = makeTiming(unit);
48
+ const { morse, text } = decodeOnsets(onsets, t);
49
+ console.log(`samples=${samples.length} (${(samples.length / sr).toFixed(1)}s)`);
50
+ console.log(`onsets detected (live streaming path): ${onsets.length}`);
51
+ console.log(`onset times (ms): ${onsets.map((x) => Math.round(x)).join(", ")}`);
52
+ // inter-onset intervals — reveals rebounds (tiny gaps) vs real spacing
53
+ console.log(`IOIs (ms): ${onsets.slice(1).map((x, i) => Math.round(x - onsets[i])).join(", ")}`);
54
+ console.log(`morse: ${morse}`);
55
+ console.log(`decoded: ${JSON.stringify(text)}`);
56
+ if (expect) {
57
+ const ok = text === expect;
58
+ console.log(`expected ${JSON.stringify(expect)} -> ${ok ? "OK" : "MISMATCH"}`);
59
+ process.exit(ok ? 0 : 2);
60
+ }
tools/robot_clap_test.py ADDED
@@ -0,0 +1,165 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """Robot antenna-clap test for the Morse Code app (Lite / local daemon).
3
+
4
+ Drives the antennas to clap on the app's impulse wire-code schedule (same code
5
+ as lib/robot-tapper.js), records the laptop mic, runs the app's detector, and
6
+ decodes — confirming the clap is audible + detectable and letting us tune the
7
+ contact angle / hold / latency lead. Mirrors the proven, safe collision recipe
8
+ from marionette/tests/test_antenna_collision.py (low-PID antennas stall at
9
+ contact; gentle).
10
+
11
+ MOVES THE ROBOT and MAKES NOISE. Run only when ready.
12
+
13
+ /Users/remi/.virtualenvs/mini/bin/python tools/robot_clap_test.py --once
14
+ /Users/remi/.virtualenvs/mini/bin/python tools/robot_clap_test.py --phrase "SOS" --unit 150
15
+ """
16
+ from __future__ import annotations
17
+
18
+ import argparse
19
+ import math
20
+ import sys
21
+ import time
22
+ from pathlib import Path
23
+
24
+ import numpy as np
25
+ import requests
26
+ import sounddevice as sd
27
+
28
+ TOOLS = Path(__file__).resolve().parent
29
+ sys.path.insert(0, str(TOOLS))
30
+ from calibrate_audio import ( # noqa: E402
31
+ timing, text_to_onsets, onsets_to_text, detect_transient_onsets, SR,
32
+ )
33
+
34
+ DEG = math.pi / 180.0
35
+
36
+ # Defaults mirror lib/robot-tapper.js DEFAULT_TAP_PROFILE.
37
+ RIGHT_REST_DEG = -39.0
38
+ LEFT_REST_DEG = 0.0
39
+ COLLISION_DEG = 40.0
40
+ APPROACH_MS = 80.0
41
+ HOLD_MS = 50.0
42
+ RETURN_MS = 80.0
43
+ RATE_HZ = 50.0
44
+ LEAD_MS = 0.0
45
+
46
+
47
+ def left_angle_at(t_ms, onsets, collision, approach, hold, ret, rest):
48
+ for T in onsets:
49
+ a0, h_end, r_end = T - approach, T + hold, T + hold + ret
50
+ if t_ms < a0 or t_ms > r_end:
51
+ continue
52
+ if t_ms <= T:
53
+ f = (t_ms - a0) / approach if approach > 0 else 1.0
54
+ f = min(1.0, max(0.0, f))
55
+ return rest + (collision - rest) * (f * f)
56
+ if t_ms <= h_end:
57
+ return collision
58
+ f = (t_ms - h_end) / ret
59
+ f = min(1.0, max(0.0, f))
60
+ return collision + (rest - collision) * (1 - (1 - f) * (1 - f))
61
+ return rest
62
+
63
+
64
+ def stop_current_app():
65
+ try:
66
+ requests.post("http://127.0.0.1:8000/api/apps/stop-current-app", timeout=4)
67
+ except Exception:
68
+ pass
69
+
70
+
71
+ def run(args):
72
+ from reachy_mini import ReachyMini
73
+ from reachy_mini.utils import create_head_pose
74
+
75
+ t = timing(args.unit)
76
+ onsets = [0.0] if args.once else text_to_onsets(args.phrase, t)
77
+ onsets = [o - args.lead for o in onsets]
78
+ collision = args.collision
79
+ end_ms = (onsets[-1] if onsets else 0) + HOLD_MS + RETURN_MS + 300
80
+ rec_secs = (args.leadin_ms + end_ms + args.settle_ms + 1200) / 1000.0
81
+
82
+ print(f"Phrase {args.phrase!r} unit={args.unit}ms contact={collision}° "
83
+ f"hold={args.hold}ms lead={args.lead}ms")
84
+ print(f"Recording mic for {rec_secs:.1f}s; robot will clap {len(onsets)} time(s).")
85
+
86
+ stop_current_app()
87
+ time.sleep(0.5)
88
+
89
+ r = ReachyMini(media_backend="no_media")
90
+ try:
91
+ r.goto_target(create_head_pose(),
92
+ antennas=[LEFT_REST_DEG * DEG, RIGHT_REST_DEG * DEG], duration=1.0)
93
+ time.sleep(1.2)
94
+
95
+ rec = sd.rec(int(rec_secs * SR), samplerate=SR, channels=1, dtype="float32")
96
+ leadin = args.leadin_ms / 1000.0
97
+ time.sleep(leadin) # silence head so the first onset has a quiet window
98
+
99
+ period = max(0.01, 1.0 / RATE_HZ)
100
+ t0 = time.monotonic()
101
+ while True:
102
+ tm = (time.monotonic() - t0) * 1000.0
103
+ if tm > end_ms:
104
+ break
105
+ left = left_angle_at(tm, onsets, collision, APPROACH_MS, args.hold, RETURN_MS, LEFT_REST_DEG)
106
+ r.set_target(head=np.eye(4), body_yaw=0.0,
107
+ antennas=np.array([left * DEG, RIGHT_REST_DEG * DEG]))
108
+ nxt = t0 + (math.floor((tm / 1000.0) / period) + 1) * period
109
+ dt = nxt - time.monotonic()
110
+ if dt > 0:
111
+ time.sleep(dt)
112
+
113
+ # Let the final clap fully settle BEFORE any return motion — otherwise
114
+ # the return goto jerks the antenna and rings out extra clicks right
115
+ # after the last real onset (corrupting the last symbol).
116
+ time.sleep(args.settle_ms / 1000.0)
117
+ r.goto_target(create_head_pose(),
118
+ antennas=[LEFT_REST_DEG * DEG, RIGHT_REST_DEG * DEG], duration=0.8)
119
+ sd.wait()
120
+ finally:
121
+ try:
122
+ r.goto_target(create_head_pose(), antennas=[0.0, 0.0], duration=0.8)
123
+ time.sleep(1.0)
124
+ except Exception:
125
+ pass
126
+
127
+ audio = rec.flatten()
128
+ if args.save:
129
+ audio.astype(np.float32).tofile(args.save)
130
+ print(f"Saved raw float32 mono @{SR}Hz to {args.save} ({audio.size} samples)")
131
+ detected = detect_transient_onsets(audio, SR, highpass_freq=args.highpass,
132
+ threshold_db=20 * math.log10(args.ratio))
133
+ detected_ms = [d * 1000.0 for d in detected]
134
+ peak = float(np.max(np.abs(audio))) if audio.size else 0.0
135
+
136
+ print(f"\nMic peak amplitude: {peak:.3f} (want clearly > the room floor)")
137
+ print(f"Detected {len(detected_ms)} click(s); expected {len(onsets)}.")
138
+ if not args.once:
139
+ decoded = onsets_to_text(detected_ms, t)
140
+ ok = decoded == args.phrase.upper()
141
+ print(f"Decoded: {decoded!r} [{'OK' if ok else 'MISMATCH'}]")
142
+ return 0 if ok else 2
143
+ print("Single-clap check: a click should be clearly visible above the floor.")
144
+ return 0 if len(detected_ms) >= 1 else 2
145
+
146
+
147
+ def main():
148
+ ap = argparse.ArgumentParser(description="Robot antenna-clap test")
149
+ ap.add_argument("--once", action="store_true", help="one gentle clap (safety check)")
150
+ ap.add_argument("--phrase", default="SOS")
151
+ ap.add_argument("--unit", type=int, default=150)
152
+ ap.add_argument("--collision", type=float, default=COLLISION_DEG)
153
+ ap.add_argument("--hold", type=float, default=HOLD_MS)
154
+ ap.add_argument("--lead", type=float, default=LEAD_MS, help="advance timeline (ms)")
155
+ ap.add_argument("--leadin-ms", type=float, default=600.0)
156
+ ap.add_argument("--settle-ms", type=float, default=600.0,
157
+ help="quiet settle after the last clap before returning to rest")
158
+ ap.add_argument("--highpass", type=float, default=2000.0)
159
+ ap.add_argument("--ratio", type=float, default=0.1)
160
+ ap.add_argument("--save", default=None, help="save raw float32 mono recording to this path")
161
+ return run(ap.parse_args())
162
+
163
+
164
+ if __name__ == "__main__":
165
+ sys.exit(main())