|
|
""" |
|
|
Reachy Mini Simple Control Panel - Gradio App |
|
|
|
|
|
A simple web-based control panel for Reachy Mini robot using HTTP API. |
|
|
Connects to localhost:8000 by default. |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import requests |
|
|
import json |
|
|
import time |
|
|
from typing import Optional, Dict, Any |
|
|
|
|
|
|
|
|
class ReachyMiniClient: |
|
|
"""HTTP client for Reachy Mini robot.""" |
|
|
|
|
|
def __init__(self, base_url: str): |
|
|
self.base_url = base_url.rstrip('/') |
|
|
self.session = requests.Session() |
|
|
self.session.timeout = 5 |
|
|
self.connected = False |
|
|
|
|
|
def test_connection(self) -> bool: |
|
|
"""Test if the robot is reachable.""" |
|
|
try: |
|
|
response = self.session.get(f"{self.base_url}/api/state/full", timeout=2) |
|
|
return response.status_code == 200 |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
def get_full_state(self) -> Optional[Dict[str, Any]]: |
|
|
"""Get the full robot state.""" |
|
|
try: |
|
|
response = self.session.get( |
|
|
f"{self.base_url}/api/state/full", |
|
|
params={ |
|
|
"with_control_mode": True, |
|
|
"with_head_pose": True, |
|
|
"with_body_yaw": True, |
|
|
"with_antenna_positions": True, |
|
|
} |
|
|
) |
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
return None |
|
|
except Exception as e: |
|
|
print(f"Error getting state: {e}") |
|
|
return None |
|
|
|
|
|
def set_motor_mode(self, mode: str) -> bool: |
|
|
"""Set motor control mode (enabled, disabled, gravity_compensation).""" |
|
|
try: |
|
|
response = self.session.post(f"{self.base_url}/api/motors/set_mode/{mode}") |
|
|
return response.status_code == 200 |
|
|
except Exception as e: |
|
|
print(f"Error setting motor mode: {e}") |
|
|
return False |
|
|
|
|
|
def wake_up(self) -> tuple[bool, str]: |
|
|
"""Send wake up command.""" |
|
|
try: |
|
|
|
|
|
motor_status = self.session.get(f"{self.base_url}/api/motors/status") |
|
|
if motor_status.status_code == 200: |
|
|
mode = motor_status.json().get("mode", "unknown") |
|
|
if mode == "disabled": |
|
|
return False, "Cannot wake up: motors are disabled. Enable motors first!" |
|
|
|
|
|
response = self.session.post(f"{self.base_url}/api/move/play/wake_up") |
|
|
if response.status_code == 200: |
|
|
data = response.json() |
|
|
move_uuid = data.get("uuid", "unknown") |
|
|
return True, f"Wake up animation started (ID: {move_uuid[:8]}...)" |
|
|
else: |
|
|
return False, f"Failed: HTTP {response.status_code} - {response.text}" |
|
|
except Exception as e: |
|
|
print(f"Error waking up: {e}") |
|
|
return False, f"Error: {str(e)}" |
|
|
|
|
|
def goto_sleep(self) -> tuple[bool, str]: |
|
|
"""Send go to sleep command.""" |
|
|
try: |
|
|
|
|
|
motor_status = self.session.get(f"{self.base_url}/api/motors/status") |
|
|
if motor_status.status_code == 200: |
|
|
mode = motor_status.json().get("mode", "unknown") |
|
|
if mode == "disabled": |
|
|
return False, "Cannot sleep: motors are disabled. Enable motors first!" |
|
|
|
|
|
response = self.session.post(f"{self.base_url}/api/move/play/goto_sleep") |
|
|
if response.status_code == 200: |
|
|
data = response.json() |
|
|
move_uuid = data.get("uuid", "unknown") |
|
|
return True, f"Sleep animation started (ID: {move_uuid[:8]}...)" |
|
|
else: |
|
|
return False, f"Failed: HTTP {response.status_code} - {response.text}" |
|
|
except Exception as e: |
|
|
print(f"Error going to sleep: {e}") |
|
|
return False, f"Error: {str(e)}" |
|
|
|
|
|
def get_running_moves(self) -> Optional[list]: |
|
|
"""Get list of running moves.""" |
|
|
try: |
|
|
response = self.session.get(f"{self.base_url}/api/move/running") |
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
return None |
|
|
except Exception as e: |
|
|
print(f"Error getting running moves: {e}") |
|
|
return None |
|
|
|
|
|
def get_daemon_status(self) -> Optional[Dict[str, Any]]: |
|
|
"""Get daemon status.""" |
|
|
try: |
|
|
response = self.session.get(f"{self.base_url}/api/daemon/status") |
|
|
if response.status_code == 200: |
|
|
return response.json() |
|
|
return None |
|
|
except Exception as e: |
|
|
print(f"Error getting daemon status: {e}") |
|
|
return None |
|
|
|
|
|
def set_target(self, head_pose: Optional[Dict[str, float]] = None, |
|
|
body_yaw: Optional[float] = None, |
|
|
antennas: Optional[tuple] = None) -> bool: |
|
|
"""Set target pose for the robot.""" |
|
|
try: |
|
|
payload = {} |
|
|
if head_pose is not None: |
|
|
payload["target_head_pose"] = head_pose |
|
|
if body_yaw is not None: |
|
|
payload["target_body_yaw"] = body_yaw |
|
|
if antennas is not None: |
|
|
payload["target_antennas"] = list(antennas) |
|
|
|
|
|
response = self.session.post( |
|
|
f"{self.base_url}/api/move/set_target", |
|
|
json=payload |
|
|
) |
|
|
return response.status_code == 200 |
|
|
except Exception as e: |
|
|
print(f"Error setting target: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
client: Optional[ReachyMiniClient] = None |
|
|
user_is_controlling = False |
|
|
|
|
|
|
|
|
def connect_to_robot(): |
|
|
"""Connect to the robot (localhost:8000 by default).""" |
|
|
global client |
|
|
|
|
|
robot_url = "http://localhost:8000" |
|
|
|
|
|
try: |
|
|
client = ReachyMiniClient(robot_url) |
|
|
if client.test_connection(): |
|
|
|
|
|
daemon_status = client.get_daemon_status() |
|
|
daemon_state = "unknown" |
|
|
if daemon_status: |
|
|
daemon_state = daemon_status.get("state", "unknown") |
|
|
|
|
|
|
|
|
state = client.get_full_state() |
|
|
if state: |
|
|
client.connected = True |
|
|
head_pose = state.get("head_pose", {}) |
|
|
body_yaw = state.get("body_yaw", 0.0) |
|
|
antennas = state.get("antennas_position", [0.0, 0.0]) |
|
|
control_mode = state.get("control_mode", "unknown") |
|
|
|
|
|
status_msg = f"Connected | Daemon: {daemon_state}" |
|
|
if daemon_state != "running": |
|
|
status_msg += " ⚠️ DAEMON NOT RUNNING" |
|
|
|
|
|
return ( |
|
|
status_msg, |
|
|
f"Motor Mode: {control_mode}", |
|
|
head_pose.get("x", 0.0), |
|
|
head_pose.get("y", 0.0), |
|
|
head_pose.get("z", 0.0), |
|
|
head_pose.get("roll", 0.0), |
|
|
head_pose.get("pitch", 0.0), |
|
|
head_pose.get("yaw", 0.0), |
|
|
body_yaw, |
|
|
antennas[0] if len(antennas) > 0 else 0.0, |
|
|
antennas[1] if len(antennas) > 1 else 0.0, |
|
|
) |
|
|
|
|
|
client = None |
|
|
return ("Connection failed - robot not reachable", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0) |
|
|
|
|
|
except Exception as e: |
|
|
client = None |
|
|
return (f"Connection error: {str(e)}", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0) |
|
|
|
|
|
|
|
|
def poll_robot_state(): |
|
|
"""Poll the robot state at 5Hz (called by Gradio timer).""" |
|
|
global client, user_is_controlling |
|
|
|
|
|
if client is None or not client.connected: |
|
|
return ("Disconnected", "N/A", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()) |
|
|
|
|
|
try: |
|
|
state = client.get_full_state() |
|
|
if state: |
|
|
control_mode = state.get("control_mode", "unknown") |
|
|
|
|
|
|
|
|
if user_is_controlling: |
|
|
return ( |
|
|
f"Connected to {client.base_url}", |
|
|
f"Motor Mode: {control_mode}", |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
gr.update(), |
|
|
) |
|
|
|
|
|
|
|
|
head_pose = state.get("head_pose", {}) |
|
|
body_yaw = state.get("body_yaw", 0.0) |
|
|
antennas = state.get("antennas_position", [0.0, 0.0]) |
|
|
|
|
|
return ( |
|
|
f"Connected to {client.base_url}", |
|
|
f"Motor Mode: {control_mode}", |
|
|
head_pose.get("x", 0.0), |
|
|
head_pose.get("y", 0.0), |
|
|
head_pose.get("z", 0.0), |
|
|
head_pose.get("roll", 0.0), |
|
|
head_pose.get("pitch", 0.0), |
|
|
head_pose.get("yaw", 0.0), |
|
|
body_yaw, |
|
|
antennas[0] if len(antennas) > 0 else 0.0, |
|
|
antennas[1] if len(antennas) > 1 else 0.0, |
|
|
) |
|
|
else: |
|
|
client.connected = False |
|
|
return ("Connection lost", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0) |
|
|
|
|
|
except Exception as e: |
|
|
client.connected = False |
|
|
return (f"Polling error: {str(e)}", "Disconnected", 0, 0, 0, 0, 0, 0, 0, 0, 0) |
|
|
|
|
|
|
|
|
def enable_motors(): |
|
|
"""Enable motors (torque on).""" |
|
|
global client |
|
|
if client and client.connected: |
|
|
success = client.set_motor_mode("enabled") |
|
|
return "Motors enabled" if success else "Failed to enable motors" |
|
|
return "Not connected to robot" |
|
|
|
|
|
|
|
|
def disable_motors(): |
|
|
"""Disable motors (torque off).""" |
|
|
global client |
|
|
if client and client.connected: |
|
|
success = client.set_motor_mode("disabled") |
|
|
return "Motors disabled" if success else "Failed to disable motors" |
|
|
return "Not connected to robot" |
|
|
|
|
|
|
|
|
def send_wake_up(): |
|
|
"""Send wake up command.""" |
|
|
global client |
|
|
if client and client.connected: |
|
|
|
|
|
running = client.get_running_moves() |
|
|
if running and len(running) > 0: |
|
|
return f"Wait: {len(running)} move(s) still running. Try again in a moment." |
|
|
|
|
|
success, message = client.wake_up() |
|
|
return message |
|
|
return "Not connected to robot" |
|
|
|
|
|
|
|
|
def send_goto_sleep(): |
|
|
"""Send go to sleep command.""" |
|
|
global client |
|
|
if client and client.connected: |
|
|
|
|
|
running = client.get_running_moves() |
|
|
if running and len(running) > 0: |
|
|
return f"Wait: {len(running)} move(s) still running. Try again in a moment." |
|
|
|
|
|
success, message = client.goto_sleep() |
|
|
return message |
|
|
return "Not connected to robot" |
|
|
|
|
|
|
|
|
def test_basic_movement(): |
|
|
"""Test if basic movements work at all.""" |
|
|
global client |
|
|
if client and client.connected: |
|
|
try: |
|
|
|
|
|
response = client.session.post( |
|
|
f"{client.base_url}/api/move/goto", |
|
|
json={ |
|
|
"head_pose": { |
|
|
"x": 0.02, |
|
|
"y": 0.0, |
|
|
"z": 0.0, |
|
|
"roll": 0.0, |
|
|
"pitch": 0.0, |
|
|
"yaw": 0.0 |
|
|
}, |
|
|
"duration": 1.0, |
|
|
"interpolation": "minjerk" |
|
|
} |
|
|
) |
|
|
if response.status_code == 200: |
|
|
data = response.json() |
|
|
return f"Test movement started: {data.get('uuid', 'unknown')[:8]}..." |
|
|
else: |
|
|
return f"Test failed: {response.status_code} - {response.text}" |
|
|
except Exception as e: |
|
|
return f"Test error: {str(e)}" |
|
|
return "Not connected to robot" |
|
|
|
|
|
|
|
|
def update_head_pose(x, y, z, roll, pitch, yaw, body_yaw, ant_left, ant_right): |
|
|
"""Update robot target pose (called when sliders change).""" |
|
|
global client, user_is_controlling |
|
|
|
|
|
|
|
|
user_is_controlling = True |
|
|
|
|
|
if client and client.connected: |
|
|
head_pose = { |
|
|
"x": x, |
|
|
"y": y, |
|
|
"z": z, |
|
|
"roll": roll, |
|
|
"pitch": pitch, |
|
|
"yaw": yaw, |
|
|
} |
|
|
client.set_target( |
|
|
head_pose=head_pose, |
|
|
body_yaw=body_yaw, |
|
|
antennas=(ant_left, ant_right) |
|
|
) |
|
|
|
|
|
|
|
|
def stop_controlling(): |
|
|
"""Reset the controlling flag after user stops interacting.""" |
|
|
global user_is_controlling |
|
|
user_is_controlling = False |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def create_app(): |
|
|
"""Create the Gradio application.""" |
|
|
|
|
|
with gr.Blocks(title="Reachy Mini Simple Control Panel") as app: |
|
|
|
|
|
gr.Markdown("# Reachy Mini Simple Control Panel") |
|
|
gr.Markdown("Control your Reachy Mini robot via HTTP API") |
|
|
|
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("### Connection") |
|
|
connect_btn = gr.Button("Connect to Robot", variant="primary") |
|
|
status_text = gr.Textbox(label="Connection Status", value="Disconnected", interactive=False) |
|
|
motor_status = gr.Textbox(label="Motor Status", value="N/A", interactive=False) |
|
|
|
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("### Motor Control") |
|
|
with gr.Row(): |
|
|
enable_btn = gr.Button("Torque On", variant="primary") |
|
|
disable_btn = gr.Button("Torque Off", variant="stop") |
|
|
log_output = gr.Textbox(label="Log", lines=2, interactive=False) |
|
|
|
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("### Movement Commands") |
|
|
with gr.Row(): |
|
|
wakeup_btn = gr.Button("Wake Up", variant="primary") |
|
|
sleep_btn = gr.Button("Go to Sleep") |
|
|
test_btn = gr.Button("Test Movement", variant="secondary") |
|
|
|
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("### Task Space Control - Head Pose") |
|
|
|
|
|
head_x = gr.Slider( |
|
|
minimum=-0.1, maximum=0.1, value=0, step=0.001, |
|
|
label="X (forward/backward) [m]" |
|
|
) |
|
|
head_y = gr.Slider( |
|
|
minimum=-0.1, maximum=0.1, value=0, step=0.001, |
|
|
label="Y (left/right) [m]" |
|
|
) |
|
|
head_z = gr.Slider( |
|
|
minimum=-0.1, maximum=0.1, value=0, step=0.001, |
|
|
label="Z (up/down) [m]" |
|
|
) |
|
|
head_roll = gr.Slider( |
|
|
minimum=-0.5, maximum=0.5, value=0, step=0.001, |
|
|
label="Roll [rad]" |
|
|
) |
|
|
head_pitch = gr.Slider( |
|
|
minimum=-0.8, maximum=0.8, value=0, step=0.001, |
|
|
label="Pitch [rad]" |
|
|
) |
|
|
head_yaw = gr.Slider( |
|
|
minimum=-1.0, maximum=1.0, value=0, step=0.001, |
|
|
label="Yaw [rad]" |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Group(): |
|
|
gr.Markdown("### Body & Antennas") |
|
|
|
|
|
body_yaw = gr.Slider( |
|
|
minimum=-1.5, maximum=1.5, value=0, step=0.001, |
|
|
label="Body Yaw [rad]" |
|
|
) |
|
|
antenna_left = gr.Slider( |
|
|
minimum=-1.0, maximum=1.0, value=0, step=0.001, |
|
|
label="Left Antenna [rad]" |
|
|
) |
|
|
antenna_right = gr.Slider( |
|
|
minimum=-1.0, maximum=1.0, value=0, step=0.001, |
|
|
label="Right Antenna [rad]" |
|
|
) |
|
|
|
|
|
|
|
|
connect_btn.click( |
|
|
fn=connect_to_robot, |
|
|
inputs=[], |
|
|
outputs=[ |
|
|
status_text, motor_status, |
|
|
head_x, head_y, head_z, head_roll, head_pitch, head_yaw, |
|
|
body_yaw, antenna_left, antenna_right |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
enable_btn.click(fn=enable_motors, inputs=[], outputs=[log_output]) |
|
|
disable_btn.click(fn=disable_motors, inputs=[], outputs=[log_output]) |
|
|
|
|
|
|
|
|
wakeup_btn.click(fn=send_wake_up, inputs=[], outputs=[log_output]) |
|
|
sleep_btn.click(fn=send_goto_sleep, inputs=[], outputs=[log_output]) |
|
|
test_btn.click(fn=test_basic_movement, inputs=[], outputs=[log_output]) |
|
|
|
|
|
|
|
|
all_sliders = [head_x, head_y, head_z, head_roll, head_pitch, head_yaw, |
|
|
body_yaw, antenna_left, antenna_right] |
|
|
|
|
|
|
|
|
for slider in all_sliders: |
|
|
slider.change( |
|
|
fn=update_head_pose, |
|
|
inputs=all_sliders, |
|
|
outputs=[] |
|
|
) |
|
|
|
|
|
slider.release( |
|
|
fn=stop_controlling, |
|
|
inputs=[], |
|
|
outputs=[] |
|
|
) |
|
|
|
|
|
|
|
|
timer = gr.Timer(value=0.2) |
|
|
timer.tick( |
|
|
fn=poll_robot_state, |
|
|
inputs=[], |
|
|
outputs=[ |
|
|
status_text, motor_status, |
|
|
head_x, head_y, head_z, head_roll, head_pitch, head_yaw, |
|
|
body_yaw, antenna_left, antenna_right |
|
|
] |
|
|
) |
|
|
|
|
|
return app |
|
|
|
|
|
|
|
|
|
|
|
app = create_app() |
|
|
|
|
|
if __name__ == "__main__": |
|
|
app.launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
share=False |
|
|
) |
|
|
|