tomrikert commited on
Commit
375b02a
·
1 Parent(s): 1607ad8

Switch OpenClaw bridge from HTTP to WebSocket protocol

Browse files

The OpenClaw gateway speaks WebSocket natively, not HTTP REST. The old
HTTP-based bridge (httpx POST to /v1/chat/completions) would connect but
hang forever waiting for a response.

Rewrite openclaw_bridge.py to use the gateway's WebSocket protocol:
- Connect handshake with challenge/auth/scopes
- Persistent connection with background listener task
- chat.send with streaming event collection (agent deltas, lifecycle)
- Proper disconnect cleanup on shutdown

Also includes: room-scanning when no face detected, improved face
tracking EMA tuning, and thinking animation offsets in movement system.

pyproject.toml CHANGED
@@ -43,9 +43,7 @@ dependencies = [
43
  "numpy",
44
  "scipy",
45
 
46
- # OpenClaw gateway client
47
- "httpx>=0.27.0",
48
- "httpx-sse>=0.4.0",
49
  "websockets>=12.0",
50
 
51
  # Gradio UI
 
43
  "numpy",
44
  "scipy",
45
 
46
+ # OpenClaw gateway client (WebSocket protocol)
 
 
47
  "websockets>=12.0",
48
 
49
  # Gradio UI
src/reachy_mini_openclaw/camera_worker.py CHANGED
@@ -3,6 +3,7 @@
3
  Provides:
4
  - 30Hz+ camera polling with thread-safe frame buffering
5
  - Face tracking integration with smooth interpolation
 
6
  - Latest frame always available for tools
7
  - Smooth return to neutral when face is lost
8
 
@@ -26,7 +27,14 @@ logger = logging.getLogger(__name__)
26
 
27
 
28
  class CameraWorker:
29
- """Thread-safe camera worker with frame buffering and face tracking."""
 
 
 
 
 
 
 
30
 
31
  def __init__(self, reachy_mini: ReachyMini, head_tracker: Any = None) -> None:
32
  """Initialize camera worker.
@@ -61,15 +69,29 @@ class CameraWorker:
61
  # Track state changes
62
  self.previous_head_tracking_state = self.is_head_tracking_enabled
63
 
64
- # Tracking scale factor (adjust responsiveness)
65
- self.tracking_scale = 0.6 # Scale down movements for smoother tracking
 
 
66
 
67
  # Smoothing factor for exponential moving average (0.0-1.0)
68
- # Lower = smoother but slower response, Higher = faster but more jitter
69
- self.smoothing_alpha = 0.15 # Smooth out jitter from detection noise
 
70
 
71
  # Previous smoothed offsets for EMA calculation
72
  self._smoothed_offsets: List[float] = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
 
 
 
 
 
 
 
 
 
 
 
73
 
74
  def get_latest_frame(self) -> Optional[NDArray[np.uint8]]:
75
  """Get the latest frame (thread-safe).
@@ -100,8 +122,13 @@ class CameraWorker:
100
  Args:
101
  enabled: Whether to enable face tracking
102
  """
 
 
 
 
 
103
  self.is_head_tracking_enabled = enabled
104
- logger.info(f"Head tracking {'enabled' if enabled else 'disabled'}")
105
 
106
  def start(self) -> None:
107
  """Start the camera worker loop in a thread."""
@@ -117,6 +144,41 @@ class CameraWorker:
117
  self._thread.join(timeout=2.0)
118
  logger.info("Camera worker stopped")
119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  def _working_loop(self) -> None:
121
  """Main camera worker loop.
122
 
@@ -127,6 +189,10 @@ class CameraWorker:
127
  # Neutral pose for interpolation target
128
  neutral_pose = np.eye(4, dtype=np.float32)
129
  self.previous_head_tracking_state = self.is_head_tracking_enabled
 
 
 
 
130
 
131
  while not self._stop_event.is_set():
132
  try:
@@ -146,6 +212,7 @@ class CameraWorker:
146
  self.last_face_detected_time = current_time
147
  self.interpolation_start_time = None
148
  self.interpolation_start_pose = None
 
149
 
150
  # Update tracking state
151
  self.previous_head_tracking_state = self.is_head_tracking_enabled
@@ -161,7 +228,7 @@ class CameraWorker:
161
  time.sleep(0.04)
162
 
163
  except Exception as e:
164
- logger.error(f"Camera worker error: {e}")
165
  time.sleep(0.1)
166
 
167
  logger.debug("Camera worker thread exited")
@@ -182,7 +249,18 @@ class CameraWorker:
182
  eye_center, _ = self.head_tracker.get_head_position(frame)
183
 
184
  if eye_center is not None:
185
- # Face detected - immediately switch to tracking
 
 
 
 
 
 
 
 
 
 
 
186
  self.last_face_detected_time = current_time
187
  self.interpolation_start_time = None # Stop any interpolation
188
 
@@ -206,7 +284,7 @@ class CameraWorker:
206
  translation = target_pose[:3, 3]
207
  rotation = R.from_matrix(target_pose[:3, :3]).as_euler("xyz", degrees=False)
208
 
209
- # Scale down for smoother tracking
210
  translation *= self.tracking_scale
211
  rotation *= self.tracking_scale
212
 
@@ -229,8 +307,13 @@ class CameraWorker:
229
  self.face_tracking_offsets = smoothed
230
 
231
  else:
232
- # No face detected - handle smooth interpolation back to neutral
233
- self._interpolate_to_neutral(current_time, neutral_pose)
 
 
 
 
 
234
 
235
  def _interpolate_to_neutral(
236
  self,
@@ -239,11 +322,15 @@ class CameraWorker:
239
  ) -> None:
240
  """Interpolate face tracking offsets back to neutral when face is lost.
241
 
 
 
242
  Args:
243
  current_time: Current timestamp
244
  neutral_pose: Target neutral pose matrix
245
  """
246
  if self.last_face_detected_time is None:
 
 
247
  return
248
 
249
  time_since_face_lost = current_time - self.last_face_detected_time
@@ -286,8 +373,10 @@ class CameraWorker:
286
  rotation[0], rotation[1], rotation[2],
287
  ]
288
 
289
- # If interpolation is complete, reset timing
290
  if t >= 1.0:
291
  self.last_face_detected_time = None
292
  self.interpolation_start_time = None
293
  self.interpolation_start_pose = None
 
 
 
3
  Provides:
4
  - 30Hz+ camera polling with thread-safe frame buffering
5
  - Face tracking integration with smooth interpolation
6
+ - Room scanning when no face is detected
7
  - Latest frame always available for tools
8
  - Smooth return to neutral when face is lost
9
 
 
27
 
28
 
29
  class CameraWorker:
30
+ """Thread-safe camera worker with frame buffering and face tracking.
31
+
32
+ State machine for face tracking:
33
+ SCANNING -- no face known, sweeping the room to find one
34
+ TRACKING -- face detected, following it with head offsets
35
+ WAITING -- face just lost, holding position briefly
36
+ RETURNING -- interpolating back to neutral before scanning again
37
+ """
38
 
39
  def __init__(self, reachy_mini: ReachyMini, head_tracker: Any = None) -> None:
40
  """Initialize camera worker.
 
69
  # Track state changes
70
  self.previous_head_tracking_state = self.is_head_tracking_enabled
71
 
72
+ # Tracking scale factor (proportional gain for the camera-head servo loop).
73
+ # 0.85 provides accurate convergence via closed-loop feedback while
74
+ # avoiding single-frame overshoot that causes jitter.
75
+ self.tracking_scale = 0.85
76
 
77
  # Smoothing factor for exponential moving average (0.0-1.0)
78
+ # At 25Hz with alpha=0.25, 95% convergence ~0.5s -- smooth enough to
79
+ # filter detection noise, responsive enough to feel like eye contact.
80
+ self.smoothing_alpha = 0.25
81
 
82
  # Previous smoothed offsets for EMA calculation
83
  self._smoothed_offsets: List[float] = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
84
+
85
+ # --- Room scanning state ---
86
+ # When no face is visible, the robot periodically sweeps the room.
87
+ self._scanning = False
88
+ self._scanning_start_time = 0.0
89
+ # Scanning pattern: sinusoidal yaw sweep
90
+ self._scan_yaw_amplitude = np.deg2rad(35) # ±35 degrees
91
+ self._scan_period = 8.0 # seconds for a full left-right-left cycle
92
+ self._scan_pitch_offset = np.deg2rad(3) # slight upward tilt while scanning
93
+ # Start scanning immediately at boot (before any face has ever been seen)
94
+ self._ever_seen_face = False
95
 
96
  def get_latest_frame(self) -> Optional[NDArray[np.uint8]]:
97
  """Get the latest frame (thread-safe).
 
122
  Args:
123
  enabled: Whether to enable face tracking
124
  """
125
+ if enabled and not self.is_head_tracking_enabled:
126
+ # Reset smoothed offsets so tracking converges quickly from scratch
127
+ self._smoothed_offsets = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
128
+ # Start scanning immediately when re-enabled
129
+ self._start_scanning()
130
  self.is_head_tracking_enabled = enabled
131
+ logger.info("Head tracking %s", "enabled" if enabled else "disabled")
132
 
133
  def start(self) -> None:
134
  """Start the camera worker loop in a thread."""
 
144
  self._thread.join(timeout=2.0)
145
  logger.info("Camera worker stopped")
146
 
147
+ # ------------------------------------------------------------------
148
+ # Scanning helpers
149
+ # ------------------------------------------------------------------
150
+
151
+ def _start_scanning(self) -> None:
152
+ """Begin the room-scanning sweep."""
153
+ if not self._scanning:
154
+ self._scanning = True
155
+ self._scanning_start_time = time.time()
156
+ logger.debug("Started room scanning")
157
+
158
+ def _stop_scanning(self) -> None:
159
+ """Stop the room-scanning sweep."""
160
+ if self._scanning:
161
+ self._scanning = False
162
+ logger.debug("Stopped room scanning")
163
+
164
+ def _update_scanning_offsets(self, current_time: float) -> None:
165
+ """Compute scanning offsets -- a slow yaw sweep with slight pitch up.
166
+
167
+ The sweep is sinusoidal so the head slows at the extremes (more natural)
168
+ and the face detector gets a chance to catch faces at the edges.
169
+ """
170
+ t = current_time - self._scanning_start_time
171
+
172
+ yaw = float(self._scan_yaw_amplitude * np.sin(2 * np.pi * t / self._scan_period))
173
+ pitch = float(self._scan_pitch_offset)
174
+
175
+ with self.face_tracking_lock:
176
+ self.face_tracking_offsets = [0.0, 0.0, 0.0, 0.0, pitch, yaw]
177
+
178
+ # ------------------------------------------------------------------
179
+ # Main loop
180
+ # ------------------------------------------------------------------
181
+
182
  def _working_loop(self) -> None:
183
  """Main camera worker loop.
184
 
 
189
  # Neutral pose for interpolation target
190
  neutral_pose = np.eye(4, dtype=np.float32)
191
  self.previous_head_tracking_state = self.is_head_tracking_enabled
192
+
193
+ # Begin scanning right away so the robot looks for a face on startup
194
+ if self.is_head_tracking_enabled and self.head_tracker is not None:
195
+ self._start_scanning()
196
 
197
  while not self._stop_event.is_set():
198
  try:
 
212
  self.last_face_detected_time = current_time
213
  self.interpolation_start_time = None
214
  self.interpolation_start_pose = None
215
+ self._stop_scanning()
216
 
217
  # Update tracking state
218
  self.previous_head_tracking_state = self.is_head_tracking_enabled
 
228
  time.sleep(0.04)
229
 
230
  except Exception as e:
231
+ logger.error("Camera worker error: %s", e)
232
  time.sleep(0.1)
233
 
234
  logger.debug("Camera worker thread exited")
 
249
  eye_center, _ = self.head_tracker.get_head_position(frame)
250
 
251
  if eye_center is not None:
252
+ # Face detected!
253
+ if not self._ever_seen_face:
254
+ self._ever_seen_face = True
255
+ logger.info("Face detected for the first time")
256
+
257
+ # Stop scanning if we were scanning
258
+ if self._scanning:
259
+ self._stop_scanning()
260
+ # Seed the EMA from current scanning offsets for smooth transition
261
+ with self.face_tracking_lock:
262
+ self._smoothed_offsets = list(self.face_tracking_offsets)
263
+
264
  self.last_face_detected_time = current_time
265
  self.interpolation_start_time = None # Stop any interpolation
266
 
 
284
  translation = target_pose[:3, 3]
285
  rotation = R.from_matrix(target_pose[:3, :3]).as_euler("xyz", degrees=False)
286
 
287
+ # Scale for smoother closed-loop convergence
288
  translation *= self.tracking_scale
289
  rotation *= self.tracking_scale
290
 
 
307
  self.face_tracking_offsets = smoothed
308
 
309
  else:
310
+ # No face detected
311
+ if self._scanning:
312
+ # Already scanning -- keep sweeping the room
313
+ self._update_scanning_offsets(current_time)
314
+ else:
315
+ # Not scanning yet -- go through the wait/return/scan sequence
316
+ self._interpolate_to_neutral(current_time, neutral_pose)
317
 
318
  def _interpolate_to_neutral(
319
  self,
 
322
  ) -> None:
323
  """Interpolate face tracking offsets back to neutral when face is lost.
324
 
325
+ Once interpolation completes, automatically starts room scanning.
326
+
327
  Args:
328
  current_time: Current timestamp
329
  neutral_pose: Target neutral pose matrix
330
  """
331
  if self.last_face_detected_time is None:
332
+ # Never seen a face -- go straight to scanning
333
+ self._start_scanning()
334
  return
335
 
336
  time_since_face_lost = current_time - self.last_face_detected_time
 
373
  rotation[0], rotation[1], rotation[2],
374
  ]
375
 
376
+ # If interpolation is complete, start scanning the room
377
  if t >= 1.0:
378
  self.last_face_detected_time = None
379
  self.interpolation_start_time = None
380
  self.interpolation_start_pose = None
381
+ self._smoothed_offsets = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
382
+ self._start_scanning()
src/reachy_mini_openclaw/config.py CHANGED
@@ -25,7 +25,7 @@ class Config:
25
  OPENAI_VOICE: str = field(default_factory=lambda: os.getenv("OPENAI_VOICE", "cedar"))
26
 
27
  # OpenClaw Gateway Configuration
28
- OPENCLAW_GATEWAY_URL: str = field(default_factory=lambda: os.getenv("OPENCLAW_GATEWAY_URL", "http://localhost:18789"))
29
  OPENCLAW_TOKEN: Optional[str] = field(default_factory=lambda: os.getenv("OPENCLAW_TOKEN"))
30
  OPENCLAW_AGENT_ID: str = field(default_factory=lambda: os.getenv("OPENCLAW_AGENT_ID", "main"))
31
  # Session key for OpenClaw - uses "main" to share context with WhatsApp and other channels
 
25
  OPENAI_VOICE: str = field(default_factory=lambda: os.getenv("OPENAI_VOICE", "cedar"))
26
 
27
  # OpenClaw Gateway Configuration
28
+ OPENCLAW_GATEWAY_URL: str = field(default_factory=lambda: os.getenv("OPENCLAW_GATEWAY_URL", "ws://localhost:18789"))
29
  OPENCLAW_TOKEN: Optional[str] = field(default_factory=lambda: os.getenv("OPENCLAW_TOKEN"))
30
  OPENCLAW_AGENT_ID: str = field(default_factory=lambda: os.getenv("OPENCLAW_AGENT_ID", "main"))
31
  # Session key for OpenClaw - uses "main" to share context with WhatsApp and other channels
src/reachy_mini_openclaw/gradio_app.py CHANGED
@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
17
 
18
 
19
  def launch_gradio(
20
- gateway_url: str = "http://localhost:18789",
21
  robot_name: Optional[str] = None,
22
  enable_camera: bool = True,
23
  enable_openclaw: bool = True,
 
17
 
18
 
19
  def launch_gradio(
20
+ gateway_url: str = "ws://localhost:18789",
21
  robot_name: Optional[str] = None,
22
  enable_camera: bool = True,
23
  enable_openclaw: bool = True,
src/reachy_mini_openclaw/main.py CHANGED
@@ -98,7 +98,7 @@ Examples:
98
  parser.add_argument(
99
  "--gateway-url",
100
  type=str,
101
- default=os.getenv("OPENCLAW_GATEWAY_URL", "http://localhost:18789"),
102
  help="OpenClaw gateway URL (from OPENCLAW_GATEWAY_URL env or default)"
103
  )
104
  parser.add_argument(
@@ -142,7 +142,7 @@ class ClawBodyCore:
142
 
143
  def __init__(
144
  self,
145
- gateway_url: str = "http://localhost:18789",
146
  robot_name: Optional[str] = None,
147
  enable_camera: bool = True,
148
  enable_openclaw: bool = True,
@@ -401,6 +401,28 @@ class ClawBodyCore:
401
  else:
402
  logger.warning("OpenClaw gateway not available - some features disabled")
403
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
404
  # Start movement system
405
  logger.info("Starting movement system...")
406
  self.movement_manager.start()
@@ -461,6 +483,15 @@ class ClawBodyCore:
461
  if self.camera_worker is not None:
462
  self.camera_worker.stop()
463
 
 
 
 
 
 
 
 
 
 
464
  # Close resources if we own them
465
  if self._owns_robot:
466
  try:
@@ -492,7 +523,7 @@ class ClawBodyApp:
492
  loop = asyncio.new_event_loop()
493
  asyncio.set_event_loop(loop)
494
 
495
- gateway_url = os.getenv("OPENCLAW_GATEWAY_URL", "http://localhost:18789")
496
 
497
  app = ClawBodyCore(
498
  gateway_url=gateway_url,
 
98
  parser.add_argument(
99
  "--gateway-url",
100
  type=str,
101
+ default=os.getenv("OPENCLAW_GATEWAY_URL", "ws://localhost:18789"),
102
  help="OpenClaw gateway URL (from OPENCLAW_GATEWAY_URL env or default)"
103
  )
104
  parser.add_argument(
 
142
 
143
  def __init__(
144
  self,
145
+ gateway_url: str = "ws://localhost:18789",
146
  robot_name: Optional[str] = None,
147
  enable_camera: bool = True,
148
  enable_openclaw: bool = True,
 
401
  else:
402
  logger.warning("OpenClaw gateway not available - some features disabled")
403
 
404
+ # Enable motors and move to neutral pose
405
+ logger.info("Enabling motors and moving to neutral position...")
406
+ try:
407
+ self.robot.enable_motors()
408
+ from reachy_mini.utils import create_head_pose
409
+ neutral = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
410
+ self.robot.goto_target(
411
+ head=neutral,
412
+ antennas=[0.0, 0.0],
413
+ duration=2.0,
414
+ body_yaw=0.0,
415
+ )
416
+ time.sleep(2) # Wait for goto to complete
417
+ logger.info("Robot at neutral position with motors enabled")
418
+ except Exception as e:
419
+ logger.error("Failed to initialize robot pose: %s", e)
420
+
421
+ # Wire up camera worker to movement manager for face tracking
422
+ if self.camera_worker is not None:
423
+ self.movement_manager.camera_worker = self.camera_worker
424
+ logger.info("Face tracking connected to movement system")
425
+
426
  # Start movement system
427
  logger.info("Starting movement system...")
428
  self.movement_manager.start()
 
483
  if self.camera_worker is not None:
484
  self.camera_worker.stop()
485
 
486
+ # Disconnect OpenClaw bridge
487
+ if self.openclaw_bridge is not None:
488
+ try:
489
+ asyncio.get_event_loop().run_until_complete(
490
+ self.openclaw_bridge.disconnect()
491
+ )
492
+ except Exception as e:
493
+ logger.debug("OpenClaw disconnect: %s", e)
494
+
495
  # Close resources if we own them
496
  if self._owns_robot:
497
  try:
 
523
  loop = asyncio.new_event_loop()
524
  asyncio.set_event_loop(loop)
525
 
526
+ gateway_url = os.getenv("OPENCLAW_GATEWAY_URL", "ws://localhost:18789")
527
 
528
  app = ClawBodyCore(
529
  gateway_url=gateway_url,
src/reachy_mini_openclaw/moves.py CHANGED
@@ -199,6 +199,7 @@ class MovementState:
199
  last_activity_time: float = 0.0
200
  speech_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
201
  face_tracking_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
 
202
  last_primary_pose: Optional[FullBodyPose] = None
203
 
204
  def update_activity(self) -> None:
@@ -277,6 +278,12 @@ class MovementManager:
277
  self._pending_speech_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
278
  self._speech_dirty = False
279
 
 
 
 
 
 
 
280
  # Shared state lock
281
  self._shared_lock = threading.Lock()
282
  self._shared_last_activity = self.state.last_activity_time
@@ -300,6 +307,15 @@ class MovementManager:
300
  """Set listening state (freezes antennas). Thread-safe."""
301
  self._command_queue.put(("set_listening", listening))
302
 
 
 
 
 
 
 
 
 
 
303
  def is_idle(self) -> bool:
304
  """Check if robot has been idle. Thread-safe."""
305
  with self._shared_lock:
@@ -333,6 +349,50 @@ class MovementManager:
333
  # No camera worker, use neutral offsets
334
  self.state.face_tracking_offsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
335
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
336
  def _handle_command(self, cmd: str, payload: Any, current_time: float) -> None:
337
  """Handle a single command."""
338
  if cmd == "queue_move":
@@ -356,6 +416,23 @@ class MovementManager:
356
  else:
357
  self._antenna_unfreeze_blend = 0.0
358
  self.state.update_activity()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
359
 
360
  def _manage_move_queue(self, current_time: float) -> None:
361
  """Advance the move queue."""
@@ -380,6 +457,7 @@ class MovementManager:
380
  and not self.move_queue
381
  and not self._is_listening
382
  and not self._breathing_active
 
383
  ):
384
  idle_for = current_time - self.state.last_activity_time
385
  if idle_for >= self.idle_inactivity_delay:
@@ -429,9 +507,11 @@ class MovementManager:
429
  return (neutral, (0.0, 0.0), 0.0)
430
 
431
  def _get_secondary_pose(self) -> FullBodyPose:
432
- """Get secondary offsets."""
433
  offsets = [
434
- self.state.speech_offsets[i] + self.state.face_tracking_offsets[i]
 
 
435
  for i in range(6)
436
  ]
437
 
@@ -440,7 +520,7 @@ class MovementManager:
440
  roll=offsets[3], pitch=offsets[4], yaw=offsets[5],
441
  degrees=False, mm=False
442
  )
443
- return (secondary_head, (0.0, 0.0), 0.0)
444
 
445
  def _compose_pose(self, current_time: float) -> FullBodyPose:
446
  """Compose final pose from primary and secondary."""
@@ -529,6 +609,9 @@ class MovementManager:
529
  # Update face tracking offsets from camera worker
530
  self._update_face_tracking(loop_start)
531
 
 
 
 
532
  # Compose pose
533
  head, antennas, body_yaw = self._compose_pose(loop_start)
534
 
@@ -555,6 +638,8 @@ class MovementManager:
555
  "queue_size": len(self.move_queue),
556
  "is_listening": self._is_listening,
557
  "breathing_active": self._breathing_active,
 
 
558
  "last_commanded_pose": {
559
  "head": self._last_commanded_pose[0].tolist(),
560
  "antennas": self._last_commanded_pose[1],
 
199
  last_activity_time: float = 0.0
200
  speech_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
201
  face_tracking_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
202
+ thinking_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
203
  last_primary_pose: Optional[FullBodyPose] = None
204
 
205
  def update_activity(self) -> None:
 
278
  self._pending_speech_offsets: SpeechOffsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
279
  self._speech_dirty = False
280
 
281
+ # Processing/thinking animation state
282
+ self._processing = False
283
+ self._processing_start_time = 0.0
284
+ self._thinking_amplitude = 0.0 # 0..1 envelope for smooth fade in/out
285
+ self._thinking_antenna_offsets: Tuple[float, float] = (0.0, 0.0)
286
+
287
  # Shared state lock
288
  self._shared_lock = threading.Lock()
289
  self._shared_last_activity = self.state.last_activity_time
 
307
  """Set listening state (freezes antennas). Thread-safe."""
308
  self._command_queue.put(("set_listening", listening))
309
 
310
+ def set_processing(self, processing: bool) -> None:
311
+ """Set processing state (triggers thinking animation). Thread-safe.
312
+
313
+ When True, the robot shows a continuous 'thinking' animation as
314
+ secondary offsets -- gentle head sway and asymmetric antenna scanning.
315
+ Face tracking continues underneath since this is additive.
316
+ """
317
+ self._command_queue.put(("set_processing", processing))
318
+
319
  def is_idle(self) -> bool:
320
  """Check if robot has been idle. Thread-safe."""
321
  with self._shared_lock:
 
349
  # No camera worker, use neutral offsets
350
  self.state.face_tracking_offsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
351
 
352
+ def _update_thinking_offsets(self, current_time: float) -> None:
353
+ """Compute thinking animation as secondary offsets.
354
+
355
+ Produces a gentle head sway (yaw drift, slight upward pitch, z bob)
356
+ and asymmetric antenna scanning pattern. The amplitude envelope
357
+ smoothly ramps up over 0.5s and decays over 0.5s for organic feel.
358
+ """
359
+ # Update amplitude envelope
360
+ if self._processing:
361
+ # Ramp up over 0.5s
362
+ elapsed = current_time - self._processing_start_time
363
+ self._thinking_amplitude = min(1.0, elapsed / 0.5)
364
+ elif self._thinking_amplitude > 0:
365
+ # Smooth decay at 2.0/s (full decay in 0.5s)
366
+ self._thinking_amplitude = max(
367
+ 0.0, self._thinking_amplitude - 2.0 * self.target_period
368
+ )
369
+
370
+ # If fully decayed, zero everything and bail
371
+ if self._thinking_amplitude < 0.001:
372
+ self._thinking_amplitude = 0.0
373
+ self.state.thinking_offsets = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
374
+ self._thinking_antenna_offsets = (0.0, 0.0)
375
+ return
376
+
377
+ amp = self._thinking_amplitude
378
+ t = current_time - self._processing_start_time
379
+
380
+ # Head offsets (radians / metres -- degrees=False, mm=False)
381
+ # Slow yaw drift: ±12° at 0.15 Hz
382
+ yaw = amp * np.deg2rad(12) * np.sin(2 * np.pi * 0.15 * t)
383
+ # Slight upward pitch: 6° base + 3° oscillation at 0.2 Hz
384
+ pitch = amp * (np.deg2rad(6) + np.deg2rad(3) * np.sin(2 * np.pi * 0.2 * t))
385
+ # Gentle z bob: 3 mm at 0.12 Hz
386
+ z = amp * 0.003 * np.sin(2 * np.pi * 0.12 * t)
387
+
388
+ self.state.thinking_offsets = (0.0, 0.0, z, 0.0, pitch, yaw)
389
+
390
+ # Antenna offsets: asymmetric scan (phase offset creates "searching" feel)
391
+ # ±20° at 0.4 Hz, right antenna lags left by ~70° of phase
392
+ left_ant = amp * np.deg2rad(20) * np.sin(2 * np.pi * 0.4 * t)
393
+ right_ant = amp * np.deg2rad(20) * np.sin(2 * np.pi * 0.4 * t + 1.2)
394
+ self._thinking_antenna_offsets = (left_ant, right_ant)
395
+
396
  def _handle_command(self, cmd: str, payload: Any, current_time: float) -> None:
397
  """Handle a single command."""
398
  if cmd == "queue_move":
 
416
  else:
417
  self._antenna_unfreeze_blend = 0.0
418
  self.state.update_activity()
419
+ elif cmd == "set_processing":
420
+ desired = bool(payload)
421
+ if desired and not self._processing:
422
+ self._processing = True
423
+ self._processing_start_time = self._now()
424
+ # Interrupt breathing so thinking animation is clean
425
+ if self._breathing_active and isinstance(self.state.current_move, BreathingMove):
426
+ self.state.current_move = None
427
+ self.state.move_start_time = None
428
+ self._breathing_active = False
429
+ self.state.update_activity()
430
+ logger.debug("Processing started - thinking animation active")
431
+ elif not desired and self._processing:
432
+ self._processing = False
433
+ # Amplitude will decay smoothly in _update_thinking_offsets
434
+ self.state.update_activity()
435
+ logger.debug("Processing ended - thinking animation decaying")
436
 
437
  def _manage_move_queue(self, current_time: float) -> None:
438
  """Advance the move queue."""
 
457
  and not self.move_queue
458
  and not self._is_listening
459
  and not self._breathing_active
460
+ and not self._processing
461
  ):
462
  idle_for = current_time - self.state.last_activity_time
463
  if idle_for >= self.idle_inactivity_delay:
 
507
  return (neutral, (0.0, 0.0), 0.0)
508
 
509
  def _get_secondary_pose(self) -> FullBodyPose:
510
+ """Get secondary offsets (speech + face tracking + thinking)."""
511
  offsets = [
512
+ self.state.speech_offsets[i]
513
+ + self.state.face_tracking_offsets[i]
514
+ + self.state.thinking_offsets[i]
515
  for i in range(6)
516
  ]
517
 
 
520
  roll=offsets[3], pitch=offsets[4], yaw=offsets[5],
521
  degrees=False, mm=False
522
  )
523
+ return (secondary_head, self._thinking_antenna_offsets, 0.0)
524
 
525
  def _compose_pose(self, current_time: float) -> FullBodyPose:
526
  """Compose final pose from primary and secondary."""
 
609
  # Update face tracking offsets from camera worker
610
  self._update_face_tracking(loop_start)
611
 
612
+ # Update thinking animation offsets
613
+ self._update_thinking_offsets(loop_start)
614
+
615
  # Compose pose
616
  head, antennas, body_yaw = self._compose_pose(loop_start)
617
 
 
638
  "queue_size": len(self.move_queue),
639
  "is_listening": self._is_listening,
640
  "breathing_active": self._breathing_active,
641
+ "processing": self._processing,
642
+ "thinking_amplitude": round(self._thinking_amplitude, 3),
643
  "last_commanded_pose": {
644
  "head": self._last_commanded_pose[0].tolist(),
645
  "antennas": self._last_commanded_pose[1],
src/reachy_mini_openclaw/openai_realtime.py CHANGED
@@ -277,6 +277,7 @@ OpenClaw has access to many capabilities you don't have directly.""",
277
  if event_type == "input_audio_buffer.speech_started":
278
  # User started speaking - stop any current output
279
  self._speaking = False
 
280
  while not self.output_queue.empty():
281
  try:
282
  self.output_queue.get_nowait()
@@ -308,6 +309,9 @@ OpenClaw has access to many capabilities you don't have directly.""",
308
 
309
  # Audio output from TTS
310
  if event_type == "response.audio.delta":
 
 
 
311
  # Feed to head wobbler for expressive movement
312
  if self.deps.head_wobbler is not None:
313
  self.deps.head_wobbler.feed(event.delta)
@@ -337,6 +341,7 @@ OpenClaw has access to many capabilities you don't have directly.""",
337
  # Response completed - sync conversation to OpenClaw
338
  if event_type == "response.done":
339
  self._speaking = False
 
340
  if self.deps.head_wobbler is not None:
341
  self.deps.head_wobbler.reset()
342
  logger.debug("Response completed")
@@ -366,6 +371,10 @@ OpenClaw has access to many capabilities you don't have directly.""",
366
 
367
  logger.info("Tool call: %s(%s)", tool_name, args_json[:50] if len(args_json) > 50 else args_json)
368
 
 
 
 
 
369
  try:
370
  if tool_name == "ask_openclaw":
371
  result = await self._handle_openclaw_query(args_json)
 
277
  if event_type == "input_audio_buffer.speech_started":
278
  # User started speaking - stop any current output
279
  self._speaking = False
280
+ self.deps.movement_manager.set_processing(False)
281
  while not self.output_queue.empty():
282
  try:
283
  self.output_queue.get_nowait()
 
309
 
310
  # Audio output from TTS
311
  if event_type == "response.audio.delta":
312
+ # Audio arriving means we have a response - stop thinking animation
313
+ self.deps.movement_manager.set_processing(False)
314
+
315
  # Feed to head wobbler for expressive movement
316
  if self.deps.head_wobbler is not None:
317
  self.deps.head_wobbler.feed(event.delta)
 
341
  # Response completed - sync conversation to OpenClaw
342
  if event_type == "response.done":
343
  self._speaking = False
344
+ self.deps.movement_manager.set_processing(False)
345
  if self.deps.head_wobbler is not None:
346
  self.deps.head_wobbler.reset()
347
  logger.debug("Response completed")
 
371
 
372
  logger.info("Tool call: %s(%s)", tool_name, args_json[:50] if len(args_json) > 50 else args_json)
373
 
374
+ # Start thinking animation while we process the tool call.
375
+ # It will stop when the next audio delta arrives or response completes.
376
+ self.deps.movement_manager.set_processing(True)
377
+
378
  try:
379
  if tool_name == "ask_openclaw":
380
  result = await self._handle_openclaw_query(args_json)
src/reachy_mini_openclaw/openclaw_bridge.py CHANGED
@@ -1,7 +1,7 @@
1
  """ClawBody - Bridge to OpenClaw Gateway for AI responses.
2
 
3
  This module provides ClawBody's integration with the OpenClaw gateway
4
- using the OpenAI-compatible Chat Completions HTTP API.
5
 
6
  ClawBody uses OpenAI Realtime API for voice I/O (speech recognition + TTS)
7
  but routes all responses through OpenClaw (Clawson) for intelligence.
@@ -10,16 +10,19 @@ but routes all responses through OpenClaw (Clawson) for intelligence.
10
  import json
11
  import asyncio
12
  import logging
 
13
  from typing import Optional, Any, AsyncIterator
14
  from dataclasses import dataclass
15
 
16
- import httpx
17
- from httpx_sse import aconnect_sse
18
 
19
  from reachy_mini_openclaw.config import config
20
 
21
  logger = logging.getLogger(__name__)
22
 
 
 
 
23
 
24
  @dataclass
25
  class OpenClawResponse:
@@ -29,23 +32,21 @@ class OpenClawResponse:
29
 
30
 
31
  class OpenClawBridge:
32
- """Bridge to OpenClaw Gateway using HTTP Chat Completions API.
33
-
34
- This class sends user messages to OpenClaw and receives AI responses.
35
- The robot maintains conversation context and can include images.
36
-
 
37
  Example:
38
  bridge = OpenClawBridge()
39
  await bridge.connect()
40
-
41
  # Simple query
42
  response = await bridge.chat("Hello!")
43
  print(response.content)
44
-
45
- # With image
46
- response = await bridge.chat("What do you see?", image_b64="...")
47
  """
48
-
49
  def __init__(
50
  self,
51
  gateway_url: Optional[str] = None,
@@ -54,304 +55,541 @@ class OpenClawBridge:
54
  timeout: float = 120.0,
55
  ):
56
  """Initialize the OpenClaw bridge.
57
-
58
  Args:
59
- gateway_url: URL of the OpenClaw gateway (default: from env/config)
 
60
  gateway_token: Authentication token (default: from env/config)
61
  agent_id: OpenClaw agent ID to use (default: from env/config)
62
  timeout: Request timeout in seconds
63
  """
64
  import os
65
- # Read from env directly as fallback (config may have been loaded before .env)
66
- self.gateway_url = gateway_url or os.getenv("OPENCLAW_GATEWAY_URL") or config.OPENCLAW_GATEWAY_URL
67
- self.gateway_token = gateway_token or os.getenv("OPENCLAW_TOKEN") or config.OPENCLAW_TOKEN
68
- self.agent_id = agent_id or os.getenv("OPENCLAW_AGENT_ID") or config.OPENCLAW_AGENT_ID
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  self.timeout = timeout
70
-
71
- # Session key - use "main" to share context with WhatsApp and other channels
72
- # The full session key is: agent:<agent_id>:<session_key>
73
- self.session_key = os.getenv("OPENCLAW_SESSION_KEY") or config.OPENCLAW_SESSION_KEY or "main"
74
-
75
- # Connection state
 
 
 
 
 
76
  self._connected = False
77
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  async def connect(self) -> bool:
79
- """Test connection to the OpenClaw gateway.
80
-
81
  Returns:
82
  True if connection successful, False otherwise
83
  """
84
- logger.info("Attempting to connect to OpenClaw at %s (token: %s)",
85
- self.gateway_url, "set" if self.gateway_token else "not set")
 
 
 
86
  try:
87
- # Use longer timeout for first connection (OpenClaw may need to initialize)
88
- async with httpx.AsyncClient(timeout=60.0) as client:
89
- # Test the chat completions endpoint with a simple request
90
- url = f"{self.gateway_url}/v1/chat/completions"
91
- logger.info("Testing endpoint: %s", url)
92
- response = await client.post(
93
- url,
94
- json={
95
- "model": f"openclaw:{self.agent_id}",
96
- "messages": [{"role": "user", "content": "ping"}],
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  },
98
- headers=self._get_headers(),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
  )
100
- logger.info("Response status: %d", response.status_code)
101
- if response.status_code == 200:
102
- self._connected = True
103
- logger.info("Connected to OpenClaw gateway at %s", self.gateway_url)
104
- return True
105
- else:
106
- logger.warning("OpenClaw gateway returned %d: %s",
107
- response.status_code, response.text[:100])
108
- self._connected = False
109
- return False
 
 
 
 
 
110
  except Exception as e:
111
- logger.error("Failed to connect to OpenClaw gateway: %s (type: %s)", e, type(e).__name__)
112
- self._connected = False
 
 
 
 
113
  return False
114
-
115
- def _get_headers(self) -> dict[str, str]:
116
- """Get headers for OpenClaw API requests."""
117
- headers = {
118
- "Content-Type": "application/json",
119
- # Use session key header to share context with WhatsApp and other channels
120
- # Format: agent:<agent_id>:<session_key> - default "main" shares with all DMs
121
- "x-openclaw-session-key": f"agent:{self.agent_id}:{self.session_key}",
122
- }
123
- if self.gateway_token:
124
- headers["Authorization"] = f"Bearer {self.gateway_token}"
125
- return headers
126
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
  async def chat(
128
- self,
129
- message: str,
130
  image_b64: Optional[str] = None,
131
  system_context: Optional[str] = None,
132
  ) -> OpenClawResponse:
133
  """Send a message to OpenClaw and get a response.
134
-
135
  OpenClaw maintains conversation memory on its end, so it will be aware
136
  of conversations from other channels (WhatsApp, web, etc.). We only send
137
  the current message and let OpenClaw handle the context.
138
-
139
  Args:
140
  message: The user's message (transcribed speech)
141
- image_b64: Optional base64-encoded image from robot camera
142
- system_context: Optional additional system context
143
-
 
144
  Returns:
145
  OpenClawResponse with the AI's response
146
  """
147
- # Build user message content
148
- if image_b64:
149
- content = [
150
- {"type": "text", "text": message},
151
- {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
152
- ]
153
- else:
154
- content = message
155
-
156
- # Build request messages - just the current message
157
- # OpenClaw maintains conversation memory on its end
158
- request_messages = []
159
-
160
- # Add system context if provided (e.g., "User is speaking to you through the robot")
161
  if system_context:
162
- request_messages.append({"role": "system", "content": system_context})
163
-
164
- # Add the current user message
165
- request_messages.append({"role": "user", "content": content})
166
-
 
 
 
 
 
 
 
 
 
 
 
 
 
167
  try:
168
- async with httpx.AsyncClient(timeout=httpx.Timeout(self.timeout)) as client:
169
- response = await client.post(
170
- f"{self.gateway_url}/v1/chat/completions",
171
- json={
172
- "model": f"openclaw:{self.agent_id}",
173
- "messages": request_messages,
174
- "stream": False,
175
- },
176
- headers=self._get_headers(),
177
- )
178
- response.raise_for_status()
179
-
180
- data = response.json()
181
- choices = data.get("choices", [])
182
- if choices:
183
- assistant_content = choices[0].get("message", {}).get("content", "")
184
- return OpenClawResponse(content=assistant_content)
185
- return OpenClawResponse(content="", error="No response from OpenClaw")
186
-
187
- except httpx.HTTPStatusError as e:
188
- logger.error("OpenClaw HTTP error: %d - %s", e.response.status_code, e.response.text[:200])
189
- return OpenClawResponse(content="", error=f"HTTP {e.response.status_code}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
190
  except Exception as e:
191
  logger.error("OpenClaw chat error: %s", e)
192
  return OpenClawResponse(content="", error=str(e))
193
-
194
  async def stream_chat(
195
- self,
196
- message: str,
197
  image_b64: Optional[str] = None,
198
  ) -> AsyncIterator[str]:
199
  """Stream a response from OpenClaw.
200
-
201
- OpenClaw maintains conversation memory on its end, so it will be aware
202
- of conversations from other channels (WhatsApp, web, etc.).
203
-
204
  Args:
205
  message: The user's message
206
  image_b64: Optional base64-encoded image
207
-
208
  Yields:
209
  String chunks of the response as they arrive
210
  """
211
- # Build user message content
 
 
 
 
212
  if image_b64:
213
- content = [
214
- {"type": "text", "text": message},
215
- {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
216
- ]
217
- else:
218
- content = message
219
-
220
- # Only send current message - OpenClaw handles memory
221
- request_messages = [{"role": "user", "content": content}]
222
-
223
- async with httpx.AsyncClient(timeout=httpx.Timeout(self.timeout)) as client:
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  try:
225
- async with aconnect_sse(
226
- client,
227
- "POST",
228
- f"{self.gateway_url}/v1/chat/completions",
229
- json={
230
- "model": f"openclaw:{self.agent_id}",
231
- "messages": request_messages,
232
- "stream": True,
233
- },
234
- headers=self._get_headers(),
235
- ) as event_source:
236
- event_source.response.raise_for_status()
237
-
238
- async for sse in event_source.aiter_sse():
239
- if sse.data == "[DONE]":
 
 
 
 
 
 
 
240
  break
241
-
242
- try:
243
- data = json.loads(sse.data)
244
- choices = data.get("choices", [])
245
- if choices:
246
- delta = choices[0].get("delta", {})
247
- chunk = delta.get("content", "")
248
- if chunk:
249
- yield chunk
250
- except json.JSONDecodeError:
251
- continue
252
-
253
- except httpx.HTTPStatusError as e:
254
- logger.error("OpenClaw streaming error: %d", e.response.status_code)
255
- yield f"[Error: HTTP {e.response.status_code}]"
256
- except Exception as e:
257
- logger.error("OpenClaw streaming error: %s", e)
258
- yield f"[Error: {e}]"
259
-
260
  @property
261
  def is_connected(self) -> bool:
262
  """Check if bridge is connected to gateway."""
263
  return self._connected
264
-
265
  async def get_agent_context(self) -> Optional[str]:
266
  """Fetch the agent's current context, personality, and memory summary.
267
-
268
  This asks OpenClaw to provide a summary of:
269
  - The agent's personality and identity
270
  - Recent conversation context
271
  - Important memories about the user
272
- - Current state (time, location awareness, etc.)
273
-
274
  Returns:
275
  A context string to use as system instructions, or None if failed
276
  """
277
  try:
278
- async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
279
- # Ask OpenClaw to summarize its context for the robot body
280
- response = await client.post(
281
- f"{self.gateway_url}/v1/chat/completions",
282
- json={
283
- "model": f"openclaw:{self.agent_id}",
284
- "messages": [
285
- {
286
- "role": "system",
287
- "content": """You are being asked to provide your current context for your robot body.
288
- Output a comprehensive context summary that another AI can use to embody you. Include:
289
-
290
- 1. YOUR IDENTITY: Who you are, your name, your personality traits, how you speak
291
- 2. USER CONTEXT: What you know about the user you're talking to (name, preferences, relationship)
292
- 3. RECENT CONTEXT: Summary of recent conversations or important ongoing topics
293
- 4. MEMORIES: Key things you remember that are relevant to interactions
294
- 5. CURRENT STATE: Any relevant time/date awareness, ongoing tasks, or situational context
295
-
296
- Be specific and personal. This context will be used by your robot body to speak and act AS YOU.
297
- Output ONLY the context summary, no preamble."""
298
- },
299
- {
300
- "role": "user",
301
- "content": "Provide your current context summary for the robot body."
302
- }
303
- ],
304
- "stream": False,
305
- },
306
- headers=self._get_headers(),
307
- )
308
- response.raise_for_status()
309
-
310
- data = response.json()
311
- choices = data.get("choices", [])
312
- if choices:
313
- context = choices[0].get("message", {}).get("content", "")
314
- if context:
315
- logger.info("Retrieved agent context from OpenClaw (%d chars)", len(context))
316
- return context
317
-
318
- logger.warning("No context returned from OpenClaw")
319
  return None
320
-
 
 
 
 
 
 
 
 
 
 
321
  except Exception as e:
322
  logger.error("Failed to get agent context: %s", e)
323
  return None
324
-
325
- async def sync_conversation(self, user_message: str, assistant_response: str) -> None:
 
 
326
  """Sync a conversation turn back to OpenClaw for memory continuity.
327
-
328
- This ensures OpenClaw's memory stays in sync with robot conversations.
329
-
330
  Args:
331
  user_message: What the user said
332
  assistant_response: What the robot/AI responded
333
  """
334
  try:
335
- async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
336
- # Send the conversation to OpenClaw with a special system message
337
- # indicating this is a sync from the robot body
338
- await client.post(
339
- f"{self.gateway_url}/v1/chat/completions",
340
- json={
341
- "model": f"openclaw:{self.agent_id}",
342
- "messages": [
343
- {
344
- "role": "system",
345
- "content": "[ROBOT BODY SYNC] The following conversation happened through your Reachy Mini robot body. Remember it as part of your ongoing conversation with the user."
346
- },
347
- {"role": "user", "content": user_message},
348
- {"role": "assistant", "content": assistant_response}
349
- ],
350
- "stream": False,
351
- },
352
- headers=self._get_headers(),
353
- )
354
- logger.debug("Synced conversation to OpenClaw")
355
  except Exception as e:
356
  logger.debug("Failed to sync conversation: %s", e)
357
 
 
1
  """ClawBody - Bridge to OpenClaw Gateway for AI responses.
2
 
3
  This module provides ClawBody's integration with the OpenClaw gateway
4
+ using the WebSocket protocol (the gateway's native transport).
5
 
6
  ClawBody uses OpenAI Realtime API for voice I/O (speech recognition + TTS)
7
  but routes all responses through OpenClaw (Clawson) for intelligence.
 
10
  import json
11
  import asyncio
12
  import logging
13
+ import uuid
14
  from typing import Optional, Any, AsyncIterator
15
  from dataclasses import dataclass
16
 
17
+ import websockets
 
18
 
19
  from reachy_mini_openclaw.config import config
20
 
21
  logger = logging.getLogger(__name__)
22
 
23
+ # Protocol version supported by this client
24
+ PROTOCOL_VERSION = 3
25
+
26
 
27
  @dataclass
28
  class OpenClawResponse:
 
32
 
33
 
34
  class OpenClawBridge:
35
+ """Bridge to OpenClaw Gateway using WebSocket protocol.
36
+
37
+ The OpenClaw gateway speaks WebSocket with a JSON frame protocol.
38
+ This class handles the connect handshake, authentication, and
39
+ chat operations.
40
+
41
  Example:
42
  bridge = OpenClawBridge()
43
  await bridge.connect()
44
+
45
  # Simple query
46
  response = await bridge.chat("Hello!")
47
  print(response.content)
 
 
 
48
  """
49
+
50
    def __init__(
        self,
        gateway_url: Optional[str] = None,
        gateway_token: Optional[str] = None,
        agent_id: Optional[str] = None,
        timeout: float = 120.0,
    ):
        """Initialize the OpenClaw bridge.

        Args:
            gateway_url: URL of the OpenClaw gateway (default: from env/config).
                Accepts http:// or ws:// schemes; http is converted to ws.
            gateway_token: Authentication token (default: from env/config)
            agent_id: OpenClaw agent ID to use (default: from env/config)
            timeout: Request timeout in seconds
        """
        import os

        # Env vars are read directly as a fallback because `config` may have
        # been imported before the .env file was loaded.
        raw_url = (
            gateway_url
            or os.getenv("OPENCLAW_GATEWAY_URL")
            or config.OPENCLAW_GATEWAY_URL
        )
        # Normalise to ws:// (the gateway listens on the same port for both)
        self.gateway_url = self._normalise_ws_url(raw_url)

        self.gateway_token = (
            gateway_token
            or os.getenv("OPENCLAW_TOKEN")
            or config.OPENCLAW_TOKEN
        )
        self.agent_id = (
            agent_id
            or os.getenv("OPENCLAW_AGENT_ID")
            or config.OPENCLAW_AGENT_ID
        )
        # Per-request timeout in seconds, used by _send_request and the
        # streaming event loops in chat()/stream_chat().
        self.timeout = timeout

        # Session key "main" shares context with WhatsApp and other channels.
        # Full key format: agent:<agent_id>:<session_key>
        self.session_key = (
            os.getenv("OPENCLAW_SESSION_KEY")
            or config.OPENCLAW_SESSION_KEY
            or "main"
        )

        # Persistent WebSocket state
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._connected = False
        # Connection id assigned by the gateway in the connect handshake.
        self._conn_id: Optional[str] = None

        # Background listener task & pending request futures.
        # _pending maps request id -> Future resolved by _dispatch when the
        # matching "res" frame arrives.
        self._listener_task: Optional[asyncio.Task] = None
        self._pending: dict[str, asyncio.Future] = {}
        # Streamed events keyed by runId -> queue consumed by chat()/stream_chat()
        self._run_events: dict[str, asyncio.Queue] = {}
106
+
107
+ # ------------------------------------------------------------------
108
+ # URL helpers
109
+ # ------------------------------------------------------------------
110
+
111
+ @staticmethod
112
+ def _normalise_ws_url(url: str) -> str:
113
+ """Convert http(s) URL to ws(s)."""
114
+ if url.startswith("http://"):
115
+ return "ws://" + url[7:]
116
+ if url.startswith("https://"):
117
+ return "wss://" + url[8:]
118
+ if not url.startswith("ws://") and not url.startswith("wss://"):
119
+ return "ws://" + url
120
+ return url
121
+
122
+ # ------------------------------------------------------------------
123
+ # Connection lifecycle
124
+ # ------------------------------------------------------------------
125
+
126
    async def connect(self) -> bool:
        """Connect to the OpenClaw gateway and authenticate.

        Performs the three-step handshake: receive the gateway's
        connect.challenge frame, send a "connect" request with protocol
        version / auth / client info / scopes, then read the hello
        response. On success a background listener task is started to
        dispatch all subsequent frames.

        Returns:
            True if connection successful, False otherwise
        """
        logger.info(
            "Connecting to OpenClaw at %s (token: %s)",
            self.gateway_url,
            "set" if self.gateway_token else "not set",
        )
        try:
            self._ws = await websockets.connect(
                self.gateway_url,
                ping_interval=20,
                ping_timeout=30,
                close_timeout=5,
            )

            # 1. Receive challenge.
            # NOTE(review): the challenge payload is not echoed back or used
            # to compute a response — token auth alone is assumed sufficient;
            # confirm against the gateway's auth configuration.
            raw = await asyncio.wait_for(self._ws.recv(), timeout=10)
            challenge = json.loads(raw)
            if challenge.get("event") != "connect.challenge":
                logger.warning("Unexpected first frame: %s", challenge.get("event"))

            # 2. Send connect request
            req_id = str(uuid.uuid4())
            connect_req = {
                "type": "req",
                "id": req_id,
                "method": "connect",
                "params": {
                    "minProtocol": PROTOCOL_VERSION,
                    "maxProtocol": PROTOCOL_VERSION,
                    "auth": {"token": self.gateway_token} if self.gateway_token else {},
                    "client": {
                        "id": "webchat",
                        "version": "1.0.0",
                        "platform": "linux",
                        "mode": "webchat",
                    },
                    "role": "operator",
                    "scopes": ["chat", "operator.write", "operator.read"],
                },
            }
            await self._ws.send(json.dumps(connect_req))

            # 3. Read hello response (still synchronous — the listener task
            # is only started once the handshake has succeeded).
            raw = await asyncio.wait_for(self._ws.recv(), timeout=10)
            hello = json.loads(raw)

            if hello.get("ok"):
                self._connected = True
                payload = hello.get("payload", {})
                server = payload.get("server", {})
                self._conn_id = server.get("connId")
                logger.info(
                    "Connected to OpenClaw gateway (server=%s, connId=%s)",
                    server.get("host", "?"),
                    self._conn_id,
                )
                # Start background listener that routes all further frames
                # (responses and streamed events).
                self._listener_task = asyncio.create_task(
                    self._listen_loop(), name="openclaw-ws-listener"
                )
                return True
            else:
                err = hello.get("error", {})
                logger.error(
                    "OpenClaw connect failed: %s - %s",
                    err.get("code"),
                    err.get("message"),
                )
                await self._close_ws()
                return False

        except Exception as e:
            # Covers DNS/socket failures, handshake timeouts and malformed
            # frames; the socket is closed so a retry starts from scratch.
            logger.error(
                "Failed to connect to OpenClaw gateway: %s (%s)",
                e,
                type(e).__name__,
            )
            await self._close_ws()
            return False
210
+
211
+ async def disconnect(self) -> None:
212
+ """Disconnect from the gateway."""
213
+ self._connected = False
214
+ if self._listener_task and not self._listener_task.done():
215
+ self._listener_task.cancel()
216
+ try:
217
+ await self._listener_task
218
+ except (asyncio.CancelledError, Exception):
219
+ pass
220
+ await self._close_ws()
221
+
222
+ async def _close_ws(self) -> None:
223
+ self._connected = False
224
+ if self._ws:
225
+ try:
226
+ await self._ws.close()
227
+ except Exception:
228
+ pass
229
+ self._ws = None
230
+
231
+ # ------------------------------------------------------------------
232
+ # Background listener
233
+ # ------------------------------------------------------------------
234
+
235
+ async def _listen_loop(self) -> None:
236
+ """Background task that reads all frames from the WebSocket."""
237
+ try:
238
+ async for raw in self._ws:
239
+ try:
240
+ msg = json.loads(raw)
241
+ except json.JSONDecodeError:
242
+ continue
243
+ await self._dispatch(msg)
244
+ except websockets.ConnectionClosed as e:
245
+ logger.warning("OpenClaw WebSocket closed: %s", e)
246
+ except asyncio.CancelledError:
247
+ return
248
+ except Exception as e:
249
+ logger.error("OpenClaw listener error: %s", e)
250
+ finally:
251
+ self._connected = False
252
+
253
+ async def _dispatch(self, msg: dict) -> None:
254
+ """Route an incoming frame to the right handler."""
255
+ msg_type = msg.get("type")
256
+
257
+ if msg_type == "res":
258
+ # Response to a request we sent
259
+ req_id = msg.get("id")
260
+ fut = self._pending.pop(req_id, None)
261
+ if fut and not fut.done():
262
+ fut.set_result(msg)
263
+
264
+ elif msg_type == "event":
265
+ event_name = msg.get("event", "")
266
+ payload = msg.get("payload", {})
267
+
268
+ # Route agent / chat events to the correct run queue
269
+ run_id = payload.get("runId")
270
+ if run_id and run_id in self._run_events:
271
+ await self._run_events[run_id].put(msg)
272
+
273
+ # Ignore noisy events silently
274
+ if event_name in ("health", "tick"):
275
+ return
276
+
277
+ logger.debug("Event: %s (runId=%s)", event_name, run_id)
278
+
279
+ # ------------------------------------------------------------------
280
+ # Request helpers
281
+ # ------------------------------------------------------------------
282
+
283
+ async def _send_request(
284
+ self, method: str, params: dict, timeout: Optional[float] = None
285
+ ) -> dict:
286
+ """Send a request and wait for the response.
287
+
288
+ Args:
289
+ method: The RPC method name
290
+ params: The params dict
291
+ timeout: Override timeout (defaults to self.timeout)
292
+
293
+ Returns:
294
+ The full response message dict
295
+ """
296
+ if not self._ws or not self._connected:
297
+ return {"ok": False, "error": {"code": "NOT_CONNECTED", "message": "Not connected"}}
298
+
299
+ req_id = str(uuid.uuid4())
300
+ req = {"type": "req", "id": req_id, "method": method, "params": params}
301
+
302
+ fut: asyncio.Future = asyncio.get_event_loop().create_future()
303
+ self._pending[req_id] = fut
304
+
305
+ try:
306
+ await self._ws.send(json.dumps(req))
307
+ result = await asyncio.wait_for(fut, timeout=timeout or self.timeout)
308
+ return result
309
+ except asyncio.TimeoutError:
310
+ self._pending.pop(req_id, None)
311
+ return {"ok": False, "error": {"code": "TIMEOUT", "message": "Request timed out"}}
312
+ except Exception as e:
313
+ self._pending.pop(req_id, None)
314
+ return {"ok": False, "error": {"code": "ERROR", "message": str(e)}}
315
+
316
+ def _full_session_key(self) -> str:
317
+ """Build the full session key: agent:<agentId>:<sessionKey>."""
318
+ return f"agent:{self.agent_id}:{self.session_key}"
319
+
320
+ # ------------------------------------------------------------------
321
+ # Chat API
322
+ # ------------------------------------------------------------------
323
+
324
    async def chat(
        self,
        message: str,
        image_b64: Optional[str] = None,
        system_context: Optional[str] = None,
    ) -> OpenClawResponse:
        """Send a message to OpenClaw and get a response.

        OpenClaw maintains conversation memory on its end, so it will be aware
        of conversations from other channels (WhatsApp, web, etc.). We only send
        the current message and let OpenClaw handle the context.

        Args:
            message: The user's message (transcribed speech)
            image_b64: Optional base64-encoded image from robot camera (not yet
                supported over WebSocket chat.send – reserved for future)
            system_context: Optional additional system context (prepended to message)

        Returns:
            OpenClawResponse with the AI's response
        """
        if not self._connected:
            return OpenClawResponse(content="", error="Not connected to OpenClaw")

        # Prefix system context if provided
        final_message = message
        if system_context:
            final_message = f"[System: {system_context}]\n\n{message}"

        # If image provided, mention it (WebSocket protocol uses string messages;
        # image passing would require a separate mechanism)
        if image_b64:
            final_message = f"[Image attached]\n{final_message}"

        idempotency_key = str(uuid.uuid4())
        session_key = self._full_session_key()

        params = {
            "idempotencyKey": idempotency_key,
            "sessionKey": session_key,
            "message": final_message,
        }

        try:
            # Send the request; the runId in the ack identifies the stream
            # of events that follows.
            resp = await self._send_request("chat.send", params, timeout=30)

            if not resp.get("ok"):
                err = resp.get("error", {})
                error_msg = f"{err.get('code', 'UNKNOWN')}: {err.get('message', 'Unknown error')}"
                logger.error("chat.send failed: %s", error_msg)
                return OpenClawResponse(content="", error=error_msg)

            run_id = resp.get("payload", {}).get("runId")
            if not run_id:
                return OpenClawResponse(content="", error="No runId in response")

            # Register a queue to receive events for this run.
            # NOTE(review): registration happens only after chat.send has been
            # acked, so events the gateway emits before this point are dropped
            # by _dispatch — confirm the gateway never streams before the ack.
            event_queue: asyncio.Queue = asyncio.Queue()
            self._run_events[run_id] = event_queue

            try:
                # Collect the streamed response until the run ends.
                full_text = ""
                while True:
                    try:
                        event = await asyncio.wait_for(
                            event_queue.get(), timeout=self.timeout
                        )
                        payload = event.get("payload", {})
                        event_name = event.get("event", "")

                        if event_name == "agent":
                            stream = payload.get("stream")
                            data = payload.get("data", {})

                            if stream == "assistant":
                                # Each assistant event carries the accumulated
                                # text so far; keep the latest snapshot.
                                full_text = data.get("text", full_text)

                            elif stream == "lifecycle" and data.get("phase") == "end":
                                # Run completed
                                break

                        elif event_name == "chat":
                            state = payload.get("state")
                            if state == "final":
                                # Extract final text from the chat message,
                                # which may be a list of typed parts or a
                                # plain string.
                                msg_payload = payload.get("message", {})
                                content_parts = msg_payload.get("content", [])
                                if isinstance(content_parts, list):
                                    for part in content_parts:
                                        if isinstance(part, dict) and part.get("type") == "text":
                                            full_text = part.get("text", full_text)
                                elif isinstance(content_parts, str):
                                    full_text = content_parts
                                break

                    except asyncio.TimeoutError:
                        logger.warning("Timeout waiting for chat response (runId=%s)", run_id)
                        # Return whatever partial text we have; only error out
                        # if nothing arrived at all.
                        if full_text:
                            break
                        return OpenClawResponse(content="", error="Response timeout")

                return OpenClawResponse(content=full_text)

            finally:
                # Always unregister the run queue so _run_events doesn't grow.
                self._run_events.pop(run_id, None)

        except Exception as e:
            logger.error("OpenClaw chat error: %s", e)
            return OpenClawResponse(content="", error=str(e))
438
+
439
  async def stream_chat(
440
+ self,
441
+ message: str,
442
  image_b64: Optional[str] = None,
443
  ) -> AsyncIterator[str]:
444
  """Stream a response from OpenClaw.
445
+
 
 
 
446
  Args:
447
  message: The user's message
448
  image_b64: Optional base64-encoded image
449
+
450
  Yields:
451
  String chunks of the response as they arrive
452
  """
453
+ if not self._connected:
454
+ yield "[Error: Not connected to OpenClaw]"
455
+ return
456
+
457
+ final_message = message
458
  if image_b64:
459
+ final_message = f"[Image attached]\n{message}"
460
+
461
+ params = {
462
+ "idempotencyKey": str(uuid.uuid4()),
463
+ "sessionKey": self._full_session_key(),
464
+ "message": final_message,
465
+ }
466
+
467
+ try:
468
+ resp = await self._send_request("chat.send", params, timeout=30)
469
+
470
+ if not resp.get("ok"):
471
+ err = resp.get("error", {})
472
+ yield f"[Error: {err.get('message', 'Unknown error')}]"
473
+ return
474
+
475
+ run_id = resp.get("payload", {}).get("runId")
476
+ if not run_id:
477
+ yield "[Error: No runId]"
478
+ return
479
+
480
+ event_queue: asyncio.Queue = asyncio.Queue()
481
+ self._run_events[run_id] = event_queue
482
+
483
  try:
484
+ prev_text = ""
485
+ while True:
486
+ try:
487
+ event = await asyncio.wait_for(
488
+ event_queue.get(), timeout=self.timeout
489
+ )
490
+ payload = event.get("payload", {})
491
+ event_name = event.get("event", "")
492
+
493
+ if event_name == "agent":
494
+ stream = payload.get("stream")
495
+ data = payload.get("data", {})
496
+
497
+ if stream == "assistant":
498
+ delta = data.get("delta", "")
499
+ if delta:
500
+ yield delta
501
+
502
+ elif stream == "lifecycle" and data.get("phase") == "end":
503
+ break
504
+
505
+ elif event_name == "chat" and payload.get("state") == "final":
506
  break
507
+
508
+ except asyncio.TimeoutError:
509
+ yield "[Error: timeout]"
510
+ break
511
+ finally:
512
+ self._run_events.pop(run_id, None)
513
+
514
+ except Exception as e:
515
+ logger.error("OpenClaw streaming error: %s", e)
516
+ yield f"[Error: {e}]"
517
+
 
 
 
 
 
 
 
 
518
  @property
519
  def is_connected(self) -> bool:
520
  """Check if bridge is connected to gateway."""
521
  return self._connected
522
+
523
  async def get_agent_context(self) -> Optional[str]:
524
  """Fetch the agent's current context, personality, and memory summary.
525
+
526
  This asks OpenClaw to provide a summary of:
527
  - The agent's personality and identity
528
  - Recent conversation context
529
  - Important memories about the user
530
+ - Current state
531
+
532
  Returns:
533
  A context string to use as system instructions, or None if failed
534
  """
535
  try:
536
+ response = await self.chat(
537
+ message="Provide your current context summary for the robot body.",
538
+ system_context=(
539
+ "You are being asked to provide your current context for your robot body. "
540
+ "Output a comprehensive context summary that another AI can use to embody you. Include: "
541
+ "1. YOUR IDENTITY: Who you are, your name, your personality traits, how you speak. "
542
+ "2. USER CONTEXT: What you know about the user (name, preferences, relationship). "
543
+ "3. RECENT CONTEXT: Summary of recent conversations or important ongoing topics. "
544
+ "4. MEMORIES: Key things you remember that are relevant to interactions. "
545
+ "5. CURRENT STATE: Any relevant time/date awareness, ongoing tasks. "
546
+ "Be specific and personal. This context will be used by your robot body to speak and act AS YOU. "
547
+ "Output ONLY the context summary, no preamble."
548
+ ),
549
+ )
550
+
551
+ if response.error:
552
+ logger.warning("Failed to get agent context: %s", response.error)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
553
  return None
554
+
555
+ if response.content:
556
+ logger.info(
557
+ "Retrieved agent context from OpenClaw (%d chars)",
558
+ len(response.content),
559
+ )
560
+ return response.content
561
+
562
+ logger.warning("No context returned from OpenClaw")
563
+ return None
564
+
565
  except Exception as e:
566
  logger.error("Failed to get agent context: %s", e)
567
  return None
568
+
569
+ async def sync_conversation(
570
+ self, user_message: str, assistant_response: str
571
+ ) -> None:
572
  """Sync a conversation turn back to OpenClaw for memory continuity.
573
+
 
 
574
  Args:
575
  user_message: What the user said
576
  assistant_response: What the robot/AI responded
577
  """
578
  try:
579
+ await self.chat(
580
+ message=(
581
+ f"[ROBOT BODY SYNC] The following happened through the Reachy Mini robot:\n"
582
+ f"User said: {user_message}\n"
583
+ f"You responded: {assistant_response}\n"
584
+ f"Remember this as part of your ongoing conversation."
585
+ ),
586
+ system_context=(
587
+ "[ROBOT BODY SYNC] The following conversation happened through your "
588
+ "Reachy Mini robot body. Remember it as part of your ongoing conversation "
589
+ "with the user."
590
+ ),
591
+ )
592
+ logger.debug("Synced conversation to OpenClaw")
 
 
 
 
 
 
593
  except Exception as e:
594
  logger.debug("Failed to sync conversation: %s", e)
595