Spaces:
Build error
Build error
| """Entrypoint for the Reachy Mini conversation app.""" | |
| import os | |
| import sys | |
| import time | |
| import asyncio | |
| import argparse | |
| import threading | |
| from typing import Any, Dict, List, Optional | |
| import gradio as gr | |
| from fastapi import FastAPI | |
| from fastrtc import Stream | |
| from gradio.utils import get_space | |
| from reachy_mini import ReachyMini, ReachyMiniApp | |
| from reachy_mini_conversation_app.utils import ( | |
| parse_args, | |
| setup_logger, | |
| handle_vision_stuff, | |
| log_connection_troubleshooting, | |
| ) | |
| def update_chatbot(chatbot: List[Dict[str, Any]], response: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Update the chatbot with AdditionalOutputs.""" | |
| chatbot.append(response) | |
| return chatbot | |
| def main() -> None: | |
| """Entrypoint for the Reachy Mini conversation app.""" | |
| args, _ = parse_args() | |
| run(args) | |
| def run( | |
| args: argparse.Namespace, | |
| robot: ReachyMini = None, | |
| app_stop_event: Optional[threading.Event] = None, | |
| settings_app: Optional[FastAPI] = None, | |
| instance_path: Optional[str] = None, | |
| ) -> None: | |
| """Run the Reachy Mini conversation app.""" | |
| # Putting these dependencies here makes the dashboard faster to load when the conversation app is installed | |
| from reachy_mini_conversation_app.moves import MovementManager | |
| from reachy_mini_conversation_app.console import LocalStream | |
| from reachy_mini_conversation_app.ollama_handler import OllamaHandler | |
| from reachy_mini_conversation_app.tools.core_tools import ToolDependencies | |
| from reachy_mini_conversation_app.audio.head_wobbler import HeadWobbler | |
| logger = setup_logger(args.debug) | |
| logger.info("Starting Reachy Mini Conversation App") | |
| if args.no_camera and args.head_tracker is not None: | |
| logger.warning( | |
| "Head tracking disabled: --no-camera flag is set. " | |
| "Remove --no-camera to enable head tracking." | |
| ) | |
| if robot is None: | |
| try: | |
| robot_kwargs = {} | |
| if args.robot_name is not None: | |
| robot_kwargs["robot_name"] = args.robot_name | |
| logger.info("Initializing ReachyMini (SDK will auto-detect appropriate backend)") | |
| robot = ReachyMini(**robot_kwargs) | |
| except TimeoutError as e: | |
| logger.error( | |
| "Connection timeout: Failed to connect to Reachy Mini daemon. " | |
| f"Details: {e}" | |
| ) | |
| log_connection_troubleshooting(logger, args.robot_name) | |
| sys.exit(1) | |
| except ConnectionError as e: | |
| logger.error( | |
| "Connection failed: Unable to establish connection to Reachy Mini. " | |
| f"Details: {e}" | |
| ) | |
| log_connection_troubleshooting(logger, args.robot_name) | |
| sys.exit(1) | |
| except Exception as e: | |
| logger.error( | |
| f"Unexpected error during robot initialization: {type(e).__name__}: {e}" | |
| ) | |
| logger.error("Please check your configuration and try again.") | |
| sys.exit(1) | |
| # Auto-enable Gradio in simulation mode (both MuJoCo for deamon and mockup-sim for desktop app) | |
| status = robot.client.get_status() | |
| is_simulation = status.get("simulation_enabled", False) or status.get("mockup_sim_enabled", False) | |
| if is_simulation and not args.gradio: | |
| logger.info("Simulation mode detected. Automatically enabling gradio flag.") | |
| args.gradio = True | |
| camera_worker, _, vision_manager = handle_vision_stuff(args, robot) | |
| movement_manager = MovementManager( | |
| current_robot=robot, | |
| camera_worker=camera_worker, | |
| ) | |
| head_wobbler = HeadWobbler(set_speech_offsets=movement_manager.set_speech_offsets) | |
| deps = ToolDependencies( | |
| reachy_mini=robot, | |
| movement_manager=movement_manager, | |
| camera_worker=camera_worker, | |
| vision_manager=vision_manager, | |
| head_wobbler=head_wobbler, | |
| ) | |
| current_file_path = os.path.dirname(os.path.abspath(__file__)) | |
| logger.debug(f"Current file absolute path: {current_file_path}") | |
| chatbot = gr.Chatbot( | |
| type="messages", | |
| resizable=True, | |
| avatar_images=( | |
| os.path.join(current_file_path, "images", "user_avatar.png"), | |
| os.path.join(current_file_path, "images", "reachymini_avatar.png"), | |
| ), | |
| ) | |
| logger.debug(f"Chatbot avatar images: {chatbot.avatar_images}") | |
| handler = OllamaHandler(deps, gradio_mode=args.gradio, instance_path=instance_path) | |
| stream_manager: gr.Blocks | LocalStream | None = None | |
| if args.gradio: | |
| from reachy_mini_conversation_app.gradio_personality import PersonalityUI | |
| personality_ui = PersonalityUI() | |
| personality_ui.create_components() | |
| stream = Stream( | |
| handler=handler, | |
| mode="send-receive", | |
| modality="audio", | |
| additional_inputs=[ | |
| chatbot, | |
| *personality_ui.additional_inputs_ordered(), | |
| ], | |
| additional_outputs=[chatbot], | |
| additional_outputs_handler=update_chatbot, | |
| ui_args={"title": "Talk with Reachy Mini"}, | |
| ) | |
| stream_manager = stream.ui | |
| if not settings_app: | |
| app = FastAPI() | |
| else: | |
| app = settings_app | |
| personality_ui.wire_events(handler, stream_manager) | |
| app = gr.mount_gradio_app(app, stream.ui, path="/") | |
| else: | |
| # In headless mode, wire settings_app + instance_path to console LocalStream | |
| stream_manager = LocalStream( | |
| handler, | |
| robot, | |
| settings_app=settings_app, | |
| instance_path=instance_path, | |
| ) | |
| # Each async service → its own thread/loop | |
| movement_manager.start() | |
| head_wobbler.start() | |
| if camera_worker: | |
| camera_worker.start() | |
| if vision_manager: | |
| vision_manager.start() | |
| def poll_stop_event() -> None: | |
| """Poll the stop event to allow graceful shutdown.""" | |
| if app_stop_event is not None: | |
| app_stop_event.wait() | |
| logger.info("App stop event detected, shutting down...") | |
| try: | |
| stream_manager.close() | |
| except Exception as e: | |
| logger.error(f"Error while closing stream manager: {e}") | |
| if app_stop_event: | |
| threading.Thread(target=poll_stop_event, daemon=True).start() | |
| try: | |
| stream_manager.launch() | |
| except KeyboardInterrupt: | |
| logger.info("Keyboard interruption in main thread... closing server.") | |
| finally: | |
| movement_manager.stop() | |
| head_wobbler.stop() | |
| if camera_worker: | |
| camera_worker.stop() | |
| if vision_manager: | |
| vision_manager.stop() | |
| # Ensure media is explicitly closed before disconnecting | |
| try: | |
| robot.media.close() | |
| except Exception as e: | |
| logger.debug(f"Error closing media during shutdown: {e}") | |
| # prevent connection to keep alive some threads | |
| robot.client.disconnect() | |
| time.sleep(1) | |
| logger.info("Shutdown complete.") | |
| class ReachyMiniConversationApp(ReachyMiniApp): # type: ignore[misc] | |
| """Reachy Mini Apps entry point for the conversation app.""" | |
| custom_app_url = "http://0.0.0.0:7860/" | |
| dont_start_webserver = False | |
| def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None: | |
| """Run the Reachy Mini conversation app.""" | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| args, _ = parse_args() | |
| # is_wireless = reachy_mini.client.get_status()["wireless_version"] | |
| # args.head_tracker = None if is_wireless else "mediapipe" | |
| instance_path = self._get_instance_path().parent | |
| run( | |
| args, | |
| robot=reachy_mini, | |
| app_stop_event=stop_event, | |
| settings_app=self.settings_app, | |
| instance_path=instance_path, | |
| ) | |
| if __name__ == "__main__": | |
| app = ReachyMiniConversationApp() | |
| try: | |
| app.wrapped_run() | |
| except KeyboardInterrupt: | |
| app.stop() | |