cduss's picture
init
f0bd6e2
raw
history blame
17.9 kB
"""
Reachy Mini Simple Control Panel - Gradio App
A simple web-based control panel for Reachy Mini robot using HTTP API.
Connects to localhost:8000 by default.
"""
import gradio as gr
import requests
import json
import time
from typing import Optional, Dict, Any
class ReachyMiniClient:
"""HTTP client for Reachy Mini robot."""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.timeout = 5
self.connected = False
def test_connection(self) -> bool:
"""Test if the robot is reachable."""
try:
response = self.session.get(f"{self.base_url}/api/state/full", timeout=2)
return response.status_code == 200
except Exception:
return False
def get_full_state(self) -> Optional[Dict[str, Any]]:
"""Get the full robot state."""
try:
response = self.session.get(
f"{self.base_url}/api/state/full",
params={
"with_control_mode": True,
"with_head_pose": True,
"with_body_yaw": True,
"with_antenna_positions": True,
}
)
if response.status_code == 200:
return response.json()
return None
except Exception as e:
print(f"Error getting state: {e}")
return None
def set_motor_mode(self, mode: str) -> bool:
"""Set motor control mode (enabled, disabled, gravity_compensation)."""
try:
response = self.session.post(f"{self.base_url}/api/motors/set_mode/{mode}")
return response.status_code == 200
except Exception as e:
print(f"Error setting motor mode: {e}")
return False
def wake_up(self) -> tuple[bool, str]:
"""Send wake up command."""
try:
# First check motor status
motor_status = self.session.get(f"{self.base_url}/api/motors/status")
if motor_status.status_code == 200:
mode = motor_status.json().get("mode", "unknown")
if mode == "disabled":
return False, "Cannot wake up: motors are disabled. Enable motors first!"
response = self.session.post(f"{self.base_url}/api/move/play/wake_up")
if response.status_code == 200:
data = response.json()
move_uuid = data.get("uuid", "unknown")
return True, f"Wake up animation started (ID: {move_uuid[:8]}...)"
else:
return False, f"Failed: HTTP {response.status_code} - {response.text}"
except Exception as e:
print(f"Error waking up: {e}")
return False, f"Error: {str(e)}"
def goto_sleep(self) -> tuple[bool, str]:
"""Send go to sleep command."""
try:
# First check motor status
motor_status = self.session.get(f"{self.base_url}/api/motors/status")
if motor_status.status_code == 200:
mode = motor_status.json().get("mode", "unknown")
if mode == "disabled":
return False, "Cannot sleep: motors are disabled. Enable motors first!"
response = self.session.post(f"{self.base_url}/api/move/play/goto_sleep")
if response.status_code == 200:
data = response.json()
move_uuid = data.get("uuid", "unknown")
return True, f"Sleep animation started (ID: {move_uuid[:8]}...)"
else:
return False, f"Failed: HTTP {response.status_code} - {response.text}"
except Exception as e:
print(f"Error going to sleep: {e}")
return False, f"Error: {str(e)}"
def get_running_moves(self) -> Optional[list]:
"""Get list of running moves."""
try:
response = self.session.get(f"{self.base_url}/api/move/running")
if response.status_code == 200:
return response.json()
return None
except Exception as e:
print(f"Error getting running moves: {e}")
return None
def get_daemon_status(self) -> Optional[Dict[str, Any]]:
"""Get daemon status."""
try:
response = self.session.get(f"{self.base_url}/api/daemon/status")
if response.status_code == 200:
return response.json()
return None
except Exception as e:
print(f"Error getting daemon status: {e}")
return None
def set_target(self, head_pose: Optional[Dict[str, float]] = None,
body_yaw: Optional[float] = None,
antennas: Optional[tuple] = None) -> bool:
"""Set target pose for the robot."""
try:
payload = {}
if head_pose is not None:
payload["target_head_pose"] = head_pose
if body_yaw is not None:
payload["target_body_yaw"] = body_yaw
if antennas is not None:
payload["target_antennas"] = list(antennas)
response = self.session.post(
f"{self.base_url}/api/move/set_target",
json=payload
)
return response.status_code == 200
except Exception as e:
print(f"Error setting target: {e}")
return False
# Global client instance
client: Optional[ReachyMiniClient] = None
user_is_controlling = False # Flag to prevent polling from updating sliders during user control
def connect_to_robot():
"""Connect to the robot (localhost:8000 by default)."""
global client
robot_url = "http://localhost:8000"
try:
client = ReachyMiniClient(robot_url)
if client.test_connection():
# Check daemon status
daemon_status = client.get_daemon_status()
daemon_state = "unknown"
if daemon_status:
daemon_state = daemon_status.get("state", "unknown")
# Get initial state
state = client.get_full_state()
if state:
client.connected = True
head_pose = state.get("head_pose", {})
body_yaw = state.get("body_yaw", 0.0)
antennas = state.get("antennas_position", [0.0, 0.0])
control_mode = state.get("control_mode", "unknown")
status_msg = f"Connected | Daemon: {daemon_state}"
if daemon_state != "running":
status_msg += " ⚠️ DAEMON NOT RUNNING"
return (
status_msg,
f"Motor Mode: {control_mode}",
head_pose.get("x", 0.0),
head_pose.get("y", 0.0),
head_pose.get("z", 0.0),
head_pose.get("roll", 0.0),
head_pose.get("pitch", 0.0),
head_pose.get("yaw", 0.0),
body_yaw,
antennas[0] if len(antennas) > 0 else 0.0,
antennas[1] if len(antennas) > 1 else 0.0,
)
client = None
return ("Connection failed - robot not reachable", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0)
except Exception as e:
client = None
return (f"Connection error: {str(e)}", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0)
def poll_robot_state():
"""Poll the robot state at 5Hz (called by Gradio timer)."""
global client, user_is_controlling
if client is None or not client.connected:
return ("Disconnected", "N/A", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update())
try:
state = client.get_full_state()
if state:
control_mode = state.get("control_mode", "unknown")
# If user is actively controlling, don't update sliders (prevents laggy feedback)
if user_is_controlling:
return (
f"Connected to {client.base_url}",
f"Motor Mode: {control_mode}",
gr.update(), # Don't update sliders during user control
gr.update(),
gr.update(),
gr.update(),
gr.update(),
gr.update(),
gr.update(),
gr.update(),
gr.update(),
)
# Update sliders with robot state when user is not controlling
head_pose = state.get("head_pose", {})
body_yaw = state.get("body_yaw", 0.0)
antennas = state.get("antennas_position", [0.0, 0.0])
return (
f"Connected to {client.base_url}",
f"Motor Mode: {control_mode}",
head_pose.get("x", 0.0),
head_pose.get("y", 0.0),
head_pose.get("z", 0.0),
head_pose.get("roll", 0.0),
head_pose.get("pitch", 0.0),
head_pose.get("yaw", 0.0),
body_yaw,
antennas[0] if len(antennas) > 0 else 0.0,
antennas[1] if len(antennas) > 1 else 0.0,
)
else:
client.connected = False
return ("Connection lost", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0)
except Exception as e:
client.connected = False
return (f"Polling error: {str(e)}", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0)
def enable_motors():
"""Enable motors (torque on)."""
global client
if client and client.connected:
success = client.set_motor_mode("enabled")
return "Motors enabled" if success else "Failed to enable motors"
return "Not connected to robot"
def disable_motors():
"""Disable motors (torque off)."""
global client
if client and client.connected:
success = client.set_motor_mode("disabled")
return "Motors disabled" if success else "Failed to disable motors"
return "Not connected to robot"
def send_wake_up():
"""Send wake up command."""
global client
if client and client.connected:
# First, stop any running moves
running = client.get_running_moves()
if running and len(running) > 0:
return f"Wait: {len(running)} move(s) still running. Try again in a moment."
success, message = client.wake_up()
return message
return "Not connected to robot"
def send_goto_sleep():
"""Send go to sleep command."""
global client
if client and client.connected:
# First, stop any running moves
running = client.get_running_moves()
if running and len(running) > 0:
return f"Wait: {len(running)} move(s) still running. Try again in a moment."
success, message = client.goto_sleep()
return message
return "Not connected to robot"
def test_basic_movement():
"""Test if basic movements work at all."""
global client
if client and client.connected:
try:
# Try a simple goto movement - move head slightly forward
response = client.session.post(
f"{client.base_url}/api/move/goto",
json={
"head_pose": {
"x": 0.02, # 2cm forward
"y": 0.0,
"z": 0.0,
"roll": 0.0,
"pitch": 0.0,
"yaw": 0.0
},
"duration": 1.0,
"interpolation": "minjerk"
}
)
if response.status_code == 200:
data = response.json()
return f"Test movement started: {data.get('uuid', 'unknown')[:8]}..."
else:
return f"Test failed: {response.status_code} - {response.text}"
except Exception as e:
return f"Test error: {str(e)}"
return "Not connected to robot"
def update_head_pose(x, y, z, roll, pitch, yaw, body_yaw, ant_left, ant_right):
"""Update robot target pose (called when sliders change)."""
global client, user_is_controlling
# Set flag to prevent polling from overwriting slider values
user_is_controlling = True
if client and client.connected:
head_pose = {
"x": x,
"y": y,
"z": z,
"roll": roll,
"pitch": pitch,
"yaw": yaw,
}
client.set_target(
head_pose=head_pose,
body_yaw=body_yaw,
antennas=(ant_left, ant_right)
)
def stop_controlling():
"""Reset the controlling flag after user stops interacting."""
global user_is_controlling
user_is_controlling = False
return None
# Create Gradio UI
def create_app():
"""Create the Gradio application."""
with gr.Blocks(title="Reachy Mini Simple Control Panel") as app:
gr.Markdown("# Reachy Mini Simple Control Panel")
gr.Markdown("Control your Reachy Mini robot via HTTP API")
# Connection Section
with gr.Group():
gr.Markdown("### Connection")
connect_btn = gr.Button("Connect to Robot", variant="primary")
status_text = gr.Textbox(label="Connection Status", value="Disconnected", interactive=False)
motor_status = gr.Textbox(label="Motor Status", value="N/A", interactive=False)
# Motor Control Section
with gr.Group():
gr.Markdown("### Motor Control")
with gr.Row():
enable_btn = gr.Button("Torque On", variant="primary")
disable_btn = gr.Button("Torque Off", variant="stop")
log_output = gr.Textbox(label="Log", lines=2, interactive=False)
# Movement Commands Section
with gr.Group():
gr.Markdown("### Movement Commands")
with gr.Row():
wakeup_btn = gr.Button("Wake Up", variant="primary")
sleep_btn = gr.Button("Go to Sleep")
test_btn = gr.Button("Test Movement", variant="secondary")
# Task Space Control Section
with gr.Group():
gr.Markdown("### Task Space Control - Head Pose")
head_x = gr.Slider(
minimum=-0.1, maximum=0.1, value=0, step=0.001,
label="X (forward/backward) [m]"
)
head_y = gr.Slider(
minimum=-0.1, maximum=0.1, value=0, step=0.001,
label="Y (left/right) [m]"
)
head_z = gr.Slider(
minimum=-0.1, maximum=0.1, value=0, step=0.001,
label="Z (up/down) [m]"
)
head_roll = gr.Slider(
minimum=-0.5, maximum=0.5, value=0, step=0.001,
label="Roll [rad]"
)
head_pitch = gr.Slider(
minimum=-0.8, maximum=0.8, value=0, step=0.001,
label="Pitch [rad]"
)
head_yaw = gr.Slider(
minimum=-1.0, maximum=1.0, value=0, step=0.001,
label="Yaw [rad]"
)
# Body & Antennas Section
with gr.Group():
gr.Markdown("### Body & Antennas")
body_yaw = gr.Slider(
minimum=-1.5, maximum=1.5, value=0, step=0.001,
label="Body Yaw [rad]"
)
antenna_left = gr.Slider(
minimum=-1.0, maximum=1.0, value=0, step=0.001,
label="Left Antenna [rad]"
)
antenna_right = gr.Slider(
minimum=-1.0, maximum=1.0, value=0, step=0.001,
label="Right Antenna [rad]"
)
# Connect button handler
connect_btn.click(
fn=connect_to_robot,
inputs=[],
outputs=[
status_text, motor_status,
head_x, head_y, head_z, head_roll, head_pitch, head_yaw,
body_yaw, antenna_left, antenna_right
]
)
# Motor control handlers
enable_btn.click(fn=enable_motors, inputs=[], outputs=[log_output])
disable_btn.click(fn=disable_motors, inputs=[], outputs=[log_output])
# Movement command handlers
wakeup_btn.click(fn=send_wake_up, inputs=[], outputs=[log_output])
sleep_btn.click(fn=send_goto_sleep, inputs=[], outputs=[log_output])
test_btn.click(fn=test_basic_movement, inputs=[], outputs=[log_output])
# Slider change handlers - update target when any slider changes
all_sliders = [head_x, head_y, head_z, head_roll, head_pitch, head_yaw,
body_yaw, antenna_left, antenna_right]
# Use .change for real-time control updates
for slider in all_sliders:
slider.change(
fn=update_head_pose,
inputs=all_sliders,
outputs=[]
)
# Reset controlling flag when user releases slider
slider.release(
fn=stop_controlling,
inputs=[],
outputs=[]
)
# Polling timer - updates state at 5Hz (every 200ms)
timer = gr.Timer(value=0.2)
timer.tick(
fn=poll_robot_state,
inputs=[],
outputs=[
status_text, motor_status,
head_x, head_y, head_z, head_roll, head_pitch, head_yaw,
body_yaw, antenna_left, antenna_right
]
)
return app
# Create and launch the app
app = create_app()
if __name__ == "__main__":
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)