Spaces:
Running
Running
File size: 9,377 Bytes
ee6161a ec1a56c ee6161a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
"""RMScript App - Web IDE for Reachy Mini scripting."""
import asyncio
import logging
import threading
import time
from typing import Any

import numpy as np
from pydantic import BaseModel
from reachy_mini import ReachyMini, ReachyMiniApp
from rmscript import compile_script
from rmscript.ir import IRAction, IRWaitAction, IRPictureAction, IRPlaySoundAction
# Configure the root logger at import time with an INFO threshold, so the
# logger.info(...) calls in this module are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ScriptInput(BaseModel):
    """Request body holding rmscript source text.

    Used as the body model of the ``/api/verify`` and ``/api/compile`` routes.
    """

    # Raw rmscript program text; handed to compile_script() as-is.
    source: str
class IRActionInput(BaseModel):
    """One IR step as submitted by a client to ``/api/execute``.

    Only the fields relevant to the given ``type`` are read by the executor;
    the rest keep their defaults.
    """

    # Step discriminator. The execute route handles "action", "wait",
    # "picture" and "sound".
    type: str
    # Length of the step in seconds (used for movement time and for sleeping).
    duration: float = 0.0
    # Head target as a nested list; converted with np.array() before being
    # passed to goto_target(). NOTE(review): presumably a 4x4 pose matrix -
    # confirm against the reachy_mini SDK.
    head_pose: list[list[float]] | None = None
    # Antenna targets; converted to a tuple before being passed to goto_target().
    antennas: list[float] | None = None
    # Body yaw target, forwarded unchanged to goto_target().
    body_yaw: float | None = None
    # Name handed to media.play_sound() for "sound" steps.
    sound_name: str | None = None
    # For "sound" steps: when true (and duration > 0) the executor waits
    # `duration` seconds after starting playback.
    blocking: bool = False
    # Carried through from the compiler output; not read by the execute route.
    loop: bool = False
class ExecuteInput(BaseModel):
    """Request body of ``/api/execute``: the IR program to run."""

    # IR steps, executed in list order.
    ir: list[IRActionInput]
class VerifyResponse(BaseModel):
    """Response of ``/api/verify``: compiler diagnostics without the IR."""

    # True when the compiler reported success.
    success: bool
    # Diagnostics as dicts with "line", "column", "message" and "severity" keys.
    errors: list[dict[str, Any]]
    warnings: list[dict[str, Any]]
    # Script name and description as reported by the compile result.
    name: str = ""
    description: str = ""
class IRActionOutput(BaseModel):
    """One IR step as returned to the client by ``/api/compile``.

    Carries the same fields as ``IRActionInput`` plus ``source_line``.
    """

    # Step discriminator; the compile route emits "action", "wait",
    # "picture" or "sound".
    type: str
    # Step duration copied from the compiled IR (0.0 when the IR has none).
    duration: float = 0.0
    # Head target serialised from the IR's array via .tolist().
    head_pose: list[list[float]] | None = None
    # Antenna targets copied into a plain list.
    antennas: list[float] | None = None
    # Body yaw copied from the IR action.
    body_yaw: float | None = None
    # Sound parameters, populated only for "sound" steps.
    sound_name: str | None = None
    blocking: bool = False
    loop: bool = False
    # Source line number taken from the IR node that produced this step.
    source_line: int = 0
class CompileResponse(BaseModel):
    """Response of ``/api/compile``: diagnostics plus the serialised IR."""

    # True when the compiler reported success.
    success: bool
    # Diagnostics as dicts with "line", "column", "message" and "severity" keys.
    errors: list[dict[str, Any]]
    warnings: list[dict[str, Any]]
    # Script name and description as reported by the compile result.
    name: str = ""
    description: str = ""
    # Compiled steps in program order (empty when compilation raised).
    ir: list[IRActionOutput]
class ExecuteResponse(BaseModel):
    """Response of ``/api/execute``: outcome of running an IR program."""

    # False when a run is already in progress or a step raised.
    success: bool
    # Human-readable status, or the exception text on failure.
    message: str = ""
    # Number of IR steps processed before the route returned.
    actions_executed: int = 0
class RmscriptApp(ReachyMiniApp):
    """Web IDE for writing and executing rmscript on Reachy Mini.

    The app registers four HTTP routes on ``self.settings_app`` (an attribute
    this class does not define itself, so it must come from the
    ``ReachyMiniApp`` base class):

    * ``GET  /api/debug``   - report which rmscript installation is loaded.
    * ``POST /api/verify``  - compile a script and return diagnostics only.
    * ``POST /api/compile`` - compile a script and return diagnostics + IR.
    * ``POST /api/execute`` - run a previously compiled IR on the robot.
    """

    # NOTE(review): presumably read by ReachyMiniApp to locate the settings UI;
    # the port matches the URL logged in run(). Confirm against the SDK.
    custom_app_url: str | None = "http://0.0.0.0:8042"

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
        """Register the API routes, then idle until ``stop_event`` is set.

        Args:
            reachy_mini: Robot handle; stored for use by the API routes.
            stop_event: Set by the caller to request shutdown.
        """
        # Store reachy_mini for use in API routes
        self._reachy_mini = reachy_mini
        # Guards against two scripts driving the robot at the same time.
        self._executing = False

        # Register API routes on settings_app
        self._register_routes()

        logger.info("RMScript App started - settings UI at http://localhost:8042")

        # Main loop - just wait for stop event. wait() with a timeout polls at
        # the same 0.1 s cadence as before but returns as soon as the event is set.
        while not stop_event.wait(timeout=0.1):
            pass

        logger.info("RMScript App stopped")

    @staticmethod
    def _diagnostics_to_dicts(diagnostics: Any) -> list[dict[str, Any]]:
        """Convert compiler diagnostics (errors or warnings) to JSON-ready dicts."""
        return [
            {"line": d.line, "column": d.column, "message": d.message, "severity": d.severity}
            for d in diagnostics
        ]

    @staticmethod
    def _exception_to_diagnostics(exc: Exception) -> list[dict[str, Any]]:
        """Wrap an unexpected exception as a single error diagnostic at 0:0."""
        return [{"line": 0, "column": 0, "message": str(exc), "severity": "error"}]

    @staticmethod
    def _ir_to_output(action: Any) -> IRActionOutput | None:
        """Serialise one compiled IR node, or return None for an unhandled node type."""
        if isinstance(action, IRAction):
            return IRActionOutput(
                type="action",
                duration=action.duration,
                head_pose=action.head_pose.tolist() if action.head_pose is not None else None,
                antennas=list(action.antennas) if action.antennas is not None else None,
                body_yaw=action.body_yaw,
                source_line=action.source_line,
            )
        if isinstance(action, IRWaitAction):
            return IRActionOutput(type="wait", duration=action.duration, source_line=action.source_line)
        if isinstance(action, IRPictureAction):
            return IRActionOutput(type="picture", source_line=action.source_line)
        if isinstance(action, IRPlaySoundAction):
            return IRActionOutput(
                type="sound",
                duration=action.duration or 0.0,
                sound_name=action.sound_name,
                blocking=action.blocking,
                loop=action.loop,
                source_line=action.source_line,
            )
        return None

    async def _execute_action(self, action: IRActionInput) -> None:
        """Run a single IR step on the robot.

        Waits use ``asyncio.sleep`` so the event loop keeps serving other
        requests while a script is running.

        Raises:
            ValueError: If a movement or wait step has a negative duration.
        """
        if action.type in ("action", "wait") and action.duration < 0:
            # time.sleep() used to reject negative values; asyncio.sleep() would
            # silently accept them, so keep the failure explicit (and raise
            # before the robot is asked to move).
            raise ValueError("duration must be non-negative")

        if action.type == "action":
            head_pose = np.array(action.head_pose) if action.head_pose else None
            antennas = tuple(action.antennas) if action.antennas else None
            # NOTE(review): if goto_target() itself blocks for `duration`, it
            # still holds the event loop for that long - confirm against the SDK.
            self._reachy_mini.goto_target(
                head=head_pose,
                antennas=antennas,
                body_yaw=action.body_yaw,
                duration=action.duration,
            )
            # Wait for movement to complete (small margin on top of the duration).
            await asyncio.sleep(action.duration + 0.05)
        elif action.type == "wait":
            await asyncio.sleep(action.duration)
        elif action.type == "picture":
            # Take picture (could return to frontend in future)
            self._reachy_mini.media.get_frame()
            logger.info("Picture captured")
        elif action.type == "sound":
            if action.sound_name:
                self._reachy_mini.media.play_sound(action.sound_name)
                if action.blocking and action.duration > 0:
                    await asyncio.sleep(action.duration)
        else:
            logger.warning("Ignoring unknown IR action type: %r", action.type)

    def _register_routes(self) -> None:
        """Register FastAPI routes for script compilation and execution."""
        logger.debug("Registering API routes")

        @self.settings_app.get("/api/debug")
        async def debug_route():
            """Debug endpoint to check rmscript installation."""
            # Imported lazily: only needed by this diagnostic endpoint.
            import rmscript
            from rmscript.lexer import TokenType

            return {
                "rmscript_location": rmscript.__file__,
                "has_STRING": "STRING" in dir(TokenType),
                "TokenType_members": [t for t in dir(TokenType) if not t.startswith("_")],
            }

        @self.settings_app.post("/api/verify", response_model=VerifyResponse)
        async def verify_script_route(payload: ScriptInput) -> VerifyResponse:
            """Verify rmscript syntax without generating IR."""
            logger.info("Verifying script (%d chars)", len(payload.source))
            try:
                result = compile_script(payload.source)
                return VerifyResponse(
                    success=result.success,
                    errors=self._diagnostics_to_dicts(result.errors),
                    warnings=self._diagnostics_to_dicts(result.warnings),
                    name=result.name,
                    description=result.description,
                )
            except Exception as e:
                # Top-level boundary: report the failure to the client, keep the traceback in the log.
                logger.exception("Verification error: %s", e)
                return VerifyResponse(
                    success=False,
                    errors=self._exception_to_diagnostics(e),
                    warnings=[],
                )

        @self.settings_app.post("/api/compile", response_model=CompileResponse)
        async def compile_script_route(payload: ScriptInput) -> CompileResponse:
            """Compile rmscript to intermediate representation."""
            logger.info("Compiling script (%d chars)", len(payload.source))
            try:
                result = compile_script(payload.source)
                errors = self._diagnostics_to_dicts(result.errors)
                warnings = self._diagnostics_to_dicts(result.warnings)
                # IR nodes of an unhandled type are dropped from the response.
                ir_json = [
                    converted
                    for converted in map(self._ir_to_output, result.ir)
                    if converted is not None
                ]
                logger.info("Compiled %d IR actions", len(ir_json))
                return CompileResponse(
                    success=result.success,
                    errors=errors,
                    warnings=warnings,
                    name=result.name,
                    description=result.description,
                    ir=ir_json,
                )
            except Exception as e:
                logger.exception("Compilation error: %s", e)
                return CompileResponse(
                    success=False,
                    errors=self._exception_to_diagnostics(e),
                    warnings=[],
                    ir=[],
                )

        @self.settings_app.post("/api/execute", response_model=ExecuteResponse)
        async def execute_script_route(payload: ExecuteInput) -> ExecuteResponse:
            """Execute compiled IR on the robot."""
            # No await between this check and the assignment below, so the
            # test-and-set cannot be interleaved with another request on the
            # same event loop.
            if self._executing:
                return ExecuteResponse(success=False, message="Already executing a script")
            self._executing = True
            actions_executed = 0
            try:
                logger.info("Executing %d IR actions", len(payload.ir))
                for action in payload.ir:
                    await self._execute_action(action)
                    # Every processed entry is counted, including ones whose
                    # type was not recognised (unchanged from earlier behavior).
                    actions_executed += 1
                logger.info("Executed %d actions", actions_executed)
                return ExecuteResponse(
                    success=True,
                    message="Script executed successfully",
                    actions_executed=actions_executed,
                )
            except Exception as e:
                logger.exception("Execution error: %s", e)
                return ExecuteResponse(success=False, message=str(e), actions_executed=actions_executed)
            finally:
                # Always release the guard, including on cancellation.
                self._executing = False
if __name__ == "__main__":
    # Direct-run entry point. wrapped_run() is not defined in this file, so it
    # is inherited from ReachyMiniApp.
    app = RmscriptApp()
    app.wrapped_run()
|