# Source: Hugging Face file view, uploaded by dlouapre (HF Staff); commit ee6161a ("Initial"), 8.82 kB.
"""RMScript App - Web IDE for Reachy Mini scripting."""
import logging
import threading
import time
from typing import Any
import numpy as np
from pydantic import BaseModel
from reachy_mini import ReachyMini, ReachyMiniApp
from rmscript import compile_script
from rmscript.ir import IRAction, IRWaitAction, IRPictureAction, IRPlaySoundAction
# Configure logging at INFO level as a side effect of importing this module.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the app and all of its API routes.
logger = logging.getLogger(__name__)
class ScriptInput(BaseModel):
    """Request body carrying rmscript source for the verify and compile routes."""

    # Raw rmscript text, handed unchanged to ``compile_script``.
    source: str
class IRActionInput(BaseModel):
    """One IR action as submitted by a client to the execute route.

    A single flat model covers every action kind; fields that do not apply to
    a given ``type`` simply keep their defaults.
    """

    # Action kind. The execute route handles "action", "wait", "picture"
    # and "sound".
    type: str
    # Seconds: movement time for "action", pause length for "wait", and the
    # sleep after a blocking "sound".
    duration: float = 0.0
    # Head pose as nested rows; converted to a NumPy array before being sent
    # to the robot.
    # NOTE(review): presumably a 4x4 pose matrix - confirm against rmscript.
    head_pose: list[list[float]] | None = None
    # Antenna targets; converted to a tuple before being sent to the robot.
    antennas: list[float] | None = None
    # Body yaw target, forwarded to the robot as-is.
    body_yaw: float | None = None
    # Name of the sound to play for "sound" actions.
    sound_name: str | None = None
    # When true, a "sound" action sleeps for ``duration`` after starting playback.
    blocking: bool = False
    # Accepted in the payload, but the execute route does not read it.
    loop: bool = False
class ExecuteInput(BaseModel):
    """Request body for the execute route."""

    # IR actions to run, in order.
    ir: list[IRActionInput]
class VerifyResponse(BaseModel):
    """Result of the verify route: diagnostics and script metadata, no IR."""

    # Success flag reported by the compiler; False when the route itself failed.
    success: bool
    # Diagnostics as dicts with "line", "column", "message" and "severity" keys.
    errors: list[dict[str, Any]]
    warnings: list[dict[str, Any]]
    # Script name and description as reported by the compiler.
    name: str = ""
    description: str = ""
class IRActionOutput(BaseModel):
    """One compiled IR action as returned by the compile route.

    Carries the same fields as ``IRActionInput`` plus the source line the
    action was compiled from.
    """

    # Action kind. The compile route emits "action", "wait", "picture"
    # or "sound".
    type: str
    # Duration copied from the compiled action (0.0 when it has none).
    duration: float = 0.0
    # Head pose as nested lists, produced with ``ndarray.tolist()``.
    head_pose: list[list[float]] | None = None
    # Antenna targets as a plain list.
    antennas: list[float] | None = None
    # Body yaw target copied from the compiled action.
    body_yaw: float | None = None
    # Sound attributes; only filled in for "sound" actions.
    sound_name: str | None = None
    blocking: bool = False
    loop: bool = False
    # Line in the rmscript source that produced this action.
    source_line: int = 0
class CompileResponse(BaseModel):
    """Result of the compile route: diagnostics, metadata and the IR."""

    # Success flag reported by the compiler; False when the route itself failed.
    success: bool
    # Diagnostics as dicts with "line", "column", "message" and "severity" keys.
    errors: list[dict[str, Any]]
    warnings: list[dict[str, Any]]
    # Script name and description as reported by the compiler.
    name: str = ""
    description: str = ""
    # Compiled actions in program order; empty when the route itself failed.
    ir: list[IRActionOutput]
class ExecuteResponse(BaseModel):
    """Outcome of the execute route."""

    # True only when every submitted action ran without raising.
    success: bool
    # Human-readable status, or the exception text on failure.
    message: str = ""
    # Number of actions completed before the route returned.
    actions_executed: int = 0
class RmscriptApp(ReachyMiniApp):
    """Web IDE for writing and executing rmscript on Reachy Mini.

    The app registers three POST endpoints on ``settings_app`` (verify,
    compile and execute) and then idles until it is asked to stop.
    """

    custom_app_url: str | None = "http://0.0.0.0:8042"

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
        """Register the API routes and block until ``stop_event`` is set.

        Args:
            reachy_mini: Robot handle used by the execute endpoint.
            stop_event: Event set by the caller to request shutdown.
        """
        # Store reachy_mini for use in API routes
        self._reachy_mini = reachy_mini
        self._executing = False
        # Guards the check-and-set of ``_executing``: the execute route runs in
        # worker threads, so two requests can reach the check concurrently.
        self._execute_lock = threading.Lock()

        # Register API routes on settings_app
        self._register_routes()
        logger.info("RMScript App started - settings UI at http://localhost:8042")

        # Event.wait() returns True as soon as the event is set, so shutdown is
        # noticed immediately rather than after a full sleep interval.
        while not stop_event.wait(0.1):
            pass
        logger.info("RMScript App stopped")

    @staticmethod
    def _diagnostics_to_dicts(diagnostics: list[Any]) -> list[dict[str, Any]]:
        """Convert compiler diagnostics into JSON-friendly dicts."""
        return [
            {"line": d.line, "column": d.column, "message": d.message, "severity": d.severity}
            for d in diagnostics
        ]

    @staticmethod
    def _internal_error(exc: Exception) -> list[dict[str, Any]]:
        """Wrap an unexpected exception as a single error diagnostic at 0:0."""
        return [{"line": 0, "column": 0, "message": str(exc), "severity": "error"}]

    @staticmethod
    def _ir_to_output(action: Any) -> IRActionOutput | None:
        """Map one compiled IR object to its API model, or None if unrecognised."""
        if isinstance(action, IRAction):
            return IRActionOutput(
                type="action",
                duration=action.duration,
                head_pose=action.head_pose.tolist() if action.head_pose is not None else None,
                antennas=list(action.antennas) if action.antennas is not None else None,
                body_yaw=action.body_yaw,
                source_line=action.source_line,
            )
        if isinstance(action, IRWaitAction):
            return IRActionOutput(type="wait", duration=action.duration, source_line=action.source_line)
        if isinstance(action, IRPictureAction):
            return IRActionOutput(type="picture", source_line=action.source_line)
        if isinstance(action, IRPlaySoundAction):
            return IRActionOutput(
                type="sound",
                duration=action.duration or 0.0,
                sound_name=action.sound_name,
                blocking=action.blocking,
                loop=action.loop,
                source_line=action.source_line,
            )
        return None

    def _run_action(self, action: IRActionInput) -> bool:
        """Run a single IR action on the robot.

        Returns:
            True if the action type was recognised and run, False if it was
            skipped because its type is unknown.
        """
        robot = self._reachy_mini
        if action.type == "action":
            # Empty or missing lists are treated as "no target" for that part.
            head_pose = np.array(action.head_pose) if action.head_pose else None
            antennas = tuple(action.antennas) if action.antennas else None
            robot.goto_target(
                head=head_pose,
                antennas=antennas,
                body_yaw=action.body_yaw,
                duration=action.duration,
            )
            # Wait for movement to complete
            time.sleep(action.duration + 0.05)
        elif action.type == "wait":
            time.sleep(action.duration)
        elif action.type == "picture":
            # Take picture (could return to frontend in future)
            robot.media.get_frame()
            logger.info("Picture captured")
        elif action.type == "sound":
            if action.sound_name:
                robot.media.play_sound(action.sound_name)
                if action.blocking and action.duration > 0:
                    time.sleep(action.duration)
        else:
            logger.warning("Skipping unknown IR action type: %r", action.type)
            return False
        return True

    def _register_routes(self) -> None:
        """Register FastAPI routes for script compilation and execution."""

        @self.settings_app.post("/api/verify", response_model=VerifyResponse)
        async def verify_script_route(payload: ScriptInput) -> VerifyResponse:
            """Verify rmscript syntax without returning IR."""
            logger.info("Verifying script (%d chars)", len(payload.source))
            try:
                result = compile_script(payload.source)
                return VerifyResponse(
                    success=result.success,
                    errors=self._diagnostics_to_dicts(result.errors),
                    warnings=self._diagnostics_to_dicts(result.warnings),
                    name=result.name,
                    description=result.description,
                )
            except Exception as e:
                # Route boundary: report the failure to the client, keep the traceback in the log.
                logger.exception("Verification error: %s", e)
                return VerifyResponse(success=False, errors=self._internal_error(e), warnings=[])

        @self.settings_app.post("/api/compile", response_model=CompileResponse)
        async def compile_script_route(payload: ScriptInput) -> CompileResponse:
            """Compile rmscript to intermediate representation."""
            logger.info("Compiling script (%d chars)", len(payload.source))
            try:
                result = compile_script(payload.source)
                # IR objects of an unrecognised class are left out of the response.
                converted = (self._ir_to_output(action) for action in result.ir)
                ir_json = [item for item in converted if item is not None]
                logger.info("Compiled %d IR actions", len(ir_json))
                return CompileResponse(
                    success=result.success,
                    errors=self._diagnostics_to_dicts(result.errors),
                    warnings=self._diagnostics_to_dicts(result.warnings),
                    name=result.name,
                    description=result.description,
                    ir=ir_json,
                )
            except Exception as e:
                logger.exception("Compilation error: %s", e)
                return CompileResponse(success=False, errors=self._internal_error(e), warnings=[], ir=[])

        # Declared with plain ``def`` on purpose: the body sleeps and drives the
        # robot synchronously, so FastAPI must run it in its threadpool instead
        # of on the event loop, where it would stall every other request.
        @self.settings_app.post("/api/execute", response_model=ExecuteResponse)
        def execute_script_route(payload: ExecuteInput) -> ExecuteResponse:
            """Execute compiled IR on the robot, one script at a time."""
            with self._execute_lock:
                if self._executing:
                    return ExecuteResponse(success=False, message="Already executing a script")
                self._executing = True

            actions_executed = 0
            try:
                logger.info("Executing %d IR actions", len(payload.ir))
                for action in payload.ir:
                    if self._run_action(action):
                        actions_executed += 1
                logger.info("Executed %d actions", actions_executed)
                return ExecuteResponse(
                    success=True,
                    message="Script executed successfully",
                    actions_executed=actions_executed,
                )
            except Exception as e:
                logger.exception("Execution error: %s", e)
                return ExecuteResponse(success=False, message=str(e), actions_executed=actions_executed)
            finally:
                self._executing = False
if __name__ == "__main__":
    # Script entry point: build the app and hand control to its wrapped_run().
    RmscriptApp().wrapped_run()