| """ |
| 🌿 Haven Kitchen OS - Reachy Mini App |
| Main entry point that integrates with the Reachy Mini daemon. |
| |
| This module provides the ReachyMiniApp wrapper that: |
| 1. Receives the robot instance from the daemon |
| 2. Runs a demo showcasing Olivia and Brie personas |
| 3. Handles graceful shutdown |
| """ |
|
|
| import threading |
| import time |
| import random |
|
|
| from reachy_mini import ReachyMini, ReachyMiniApp |
| from reachy_mini.utils import create_head_pose |
|
|
| |
# Robot instance currently published for other modules. It is None until
# set_shared_robot() stores an instance and again after it is cleared.
_shared_robot: ReachyMini | None = None
# Guards every read and write of _shared_robot.
_robot_lock = threading.Lock()
|
|
|
|
def get_shared_robot() -> ReachyMini | None:
    """Return the robot currently published by the app, or ``None``.

    The module-level reference is read while holding ``_robot_lock``.
    The original docstring notes this accessor is used by animations.py.
    """
    with _robot_lock:
        current = _shared_robot
    return current
|
|
|
|
def set_shared_robot(robot: ReachyMini | None) -> None:
    """Publish ``robot`` as the shared instance (``None`` clears it).

    Called by ``HavenKitchenOs.run()`` when the app starts and again, with
    ``None``, when it shuts down. The assignment happens under
    ``_robot_lock``.
    """
    global _shared_robot
    _robot_lock.acquire()
    try:
        _shared_robot = robot
    finally:
        _robot_lock.release()
|
|
|
|
| |
| |
| |
# Named eye colors looked up by set_eye_color(), stored as [R, G, B] lists
# (presumably 0-255 per channel - confirm against the eye LED API).
COLORS = {
    "Olivia": [0, 180, 80],  # Olivia persona (described as sage green)
    "Brie": [255, 160, 0],  # Brie persona (described as amber)
    "neutral": [100, 100, 100],  # fallback for unknown names; also used at shutdown
    "celebration": [255, 220, 0],  # shown while Brie announces the dinner idea
}
|
|
|
|
def set_eye_color(robot, color_name: str) -> None:
    """Set both eyes to a named color from ``COLORS`` (best effort).

    Unknown names fall back to ``COLORS["neutral"]``. The call is a no-op
    for any part that is missing: no ``head`` attribute, no ``l_eye`` /
    ``r_eye`` on the head, or no ``set_color`` on an eye.

    This function never raises. A failure is logged at debug level rather
    than silently discarded, and each eye is handled independently so a
    failure on one does not prevent the other from being set.

    Args:
        robot: Object that may expose ``head.l_eye`` / ``head.r_eye``.
        color_name: Key into ``COLORS``.
    """
    import logging  # local import keeps this block independent of the file header

    log = logging.getLogger(__name__)

    try:
        rgb = COLORS.get(color_name, COLORS["neutral"])
        head = getattr(robot, "head", None)
    except Exception:
        log.debug("Could not resolve eye color %r", color_name, exc_info=True)
        return

    for eye_name in ("l_eye", "r_eye"):
        try:
            # getattr(None, ..., None) is None, so a missing head or eye
            # simply yields no setter.
            set_color = getattr(getattr(head, eye_name, None), "set_color", None)
            if set_color is not None:
                set_color(rgb)
        except Exception:
            log.debug(
                "Could not set %s to %r", eye_name, color_name, exc_info=True
            )
|
|
|
|
class HavenKitchenOs(ReachyMiniApp):
    """Haven Kitchen OS - AI home management demo app for Reachy Mini.

    The demo alternates between two personas, each with its own eye color:

    - Olivia: professional household manager (sage green eyes)
    - Brie: enthusiastic personal chef (amber eyes)

    NOTE(review): the emoji in the printed messages look mis-encoded (for
    example "πΏ"); they are kept as found - confirm the file encoding.
    NOTE(review): ``z`` is passed to ``create_head_pose`` with values such as
    15 and 25 and no explicit unit argument - confirm the unit it expects.
    """

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
        """Run the demo until ``stop_event`` is set, then reset the robot.

        Main entry point called by the Reachy Mini daemon.

        Args:
            reachy_mini: Robot to drive. It is published through
                ``set_shared_robot`` while the demo runs and cleared again
                on the way out.
            stop_event: Event signalling that the demo should stop.
        """
        import logging  # local import: the file's import block stays untouched

        set_shared_robot(reachy_mini)

        print("πΏ Haven Kitchen OS started!")
        print(" π©π»βπΌ Olivia and π©π»βπ³ Brie are ready to help.")

        try:
            self._run_demo(reachy_mini, stop_event)
        finally:
            print("π Shutting down Haven Kitchen OS...")
            set_shared_robot(None)

            # Best-effort return to a neutral look: shutdown must not fail
            # because the robot cannot be driven. Catching ``Exception``
            # instead of using a bare ``except`` lets KeyboardInterrupt and
            # SystemExit propagate.
            try:
                set_eye_color(reachy_mini, "neutral")
                reachy_mini.goto_target(
                    head=create_head_pose(),
                    antennas=[0, 0],
                    duration=0.5,
                )
            except Exception as exc:
                logging.getLogger(__name__).warning(
                    "Could not reset robot to neutral pose: %s", exc
                )

            print("π Haven Kitchen OS stopped. Goodbye!")

    def _run_demo(self, robot: ReachyMini, stop_event: threading.Event) -> None:
        """Play the demo scenes in a loop until ``stop_event`` is set.

        Each scene runs to completion. The pause after a scene uses
        ``stop_event.wait`` so a stop request ends the pause immediately
        instead of sleeping it out.

        Args:
            robot: Robot to drive.
            stop_event: Event signalling that the demo should stop.
        """
        # (scene, pause in seconds after the scene)
        scenes = (
            (self._olivia_greeting, 1.0),
            (self._olivia_checks_schedule, 1.5),
            (self._handoff_to_brie, 1.0),
            (self._brie_excited, 1.0),
            (self._brie_thinking, 0.5),
            (self._brie_idea, 2.0),
            (self._handoff_to_olivia, 3.0),
        )

        while not stop_event.is_set():
            for play_scene, pause in scenes:
                play_scene(robot)
                # wait() returns True once the event is set, including when
                # it was already set before the call.
                if stop_event.wait(pause):
                    return

            print("\n--- Demo loop restarting ---\n")

    @staticmethod
    def _wiggle_antennas(robot: ReachyMini, cycles: int, beat: float) -> None:
        """Swing the antennas in opposite directions ``cycles`` times."""
        for _ in range(cycles):
            robot.goto_target(antennas=[0.5, -0.5], duration=beat)
            time.sleep(beat)
            robot.goto_target(antennas=[-0.5, 0.5], duration=beat)
            time.sleep(beat)

    def _olivia_greeting(self, robot: ReachyMini) -> None:
        """Scene 1: Olivia's eye color and her greeting head moves."""
        print("\nπ©π»βπΌ Olivia: Good morning! Let me help organize your day...")

        set_eye_color(robot, "Olivia")
        robot.goto_target(head=create_head_pose(roll=8, z=5), duration=0.4)
        time.sleep(0.3)

        robot.goto_target(head=create_head_pose(z=15), duration=0.2)
        time.sleep(0.15)
        robot.goto_target(head=create_head_pose(z=-5), duration=0.15)
        time.sleep(0.1)
        robot.goto_target(head=create_head_pose(z=0), duration=0.2)

    def _olivia_checks_schedule(self, robot: ReachyMini) -> None:
        """Scene 2: Olivia holds an off-center pose, then re-centers."""
        print("π©π»βπΌ Olivia: Let me check your schedule...")
        robot.goto_target(head=create_head_pose(yaw=15, z=20, roll=5), duration=0.5)
        time.sleep(1.0)
        robot.goto_target(head=create_head_pose(yaw=0, z=0, roll=0), duration=0.4)

    def _handoff_to_brie(self, robot: ReachyMini) -> None:
        """Scene 3: Olivia turns aside, then the eyes switch to Brie's color."""
        print("\nπ©π»βπΌ Olivia: Time for dinner planning! Let me get Brie...")

        robot.goto_target(head=create_head_pose(yaw=30, z=-5), duration=0.4)
        time.sleep(0.3)

        robot.goto_target(head=create_head_pose(z=10), duration=0.2)
        time.sleep(0.15)
        robot.goto_target(head=create_head_pose(z=0), duration=0.2)

        set_eye_color(robot, "Brie")
        robot.goto_target(head=create_head_pose(yaw=0, z=0), duration=0.4)

    def _brie_excited(self, robot: ReachyMini) -> None:
        """Scene 4: Brie raises and wiggles the antennas, then resets them."""
        print("\nπ©π»βπ³ Brie: Ooh, dinner time! I'm SO excited to cook with you!")

        robot.goto_target(antennas=[0.4, 0.4], duration=0.2)
        time.sleep(0.3)
        robot.goto_target(head=create_head_pose(z=15), duration=0.2)

        self._wiggle_antennas(robot, cycles=2, beat=0.12)
        robot.goto_target(antennas=[0, 0], duration=0.2)
        robot.goto_target(head=create_head_pose(z=0), duration=0.2)

    def _brie_thinking(self, robot: ReachyMini) -> None:
        """Scene 5: Brie pauses, then turns to one side and the other."""
        print("π©π»βπ³ Brie: Hmm, what shall we make tonight?")
        robot.goto_target(
            head=create_head_pose(z=25, yaw=10),
            antennas=[0.2, -0.1],
            duration=0.4,
        )
        time.sleep(1.0)

        robot.goto_target(head=create_head_pose(yaw=25, z=10), duration=0.4)
        time.sleep(0.3)
        robot.goto_target(head=create_head_pose(yaw=-25, z=5), duration=0.5)
        time.sleep(0.3)
        robot.goto_target(head=create_head_pose(yaw=0, z=0), duration=0.3)

    def _brie_idea(self, robot: ReachyMini) -> None:
        """Scene 6: celebration eye color and wiggle, then Brie's color."""
        print("π©π»βπ³ Brie: I know! Let's make pasta carbonara! π")
        set_eye_color(robot, "celebration")
        robot.goto_target(
            head=create_head_pose(z=25),
            antennas=[0.7, 0.7],
            duration=0.2,
        )
        time.sleep(0.15)

        self._wiggle_antennas(robot, cycles=3, beat=0.1)

        robot.goto_target(head=create_head_pose(z=0), antennas=[0, 0], duration=0.3)
        set_eye_color(robot, "Brie")

    def _handoff_to_olivia(self, robot: ReachyMini) -> None:
        """Scene 7: eyes switch to Olivia's color and the pose is reset."""
        print("\nπ©π»βπ³ Brie: Great! I'll hand you back to Olivia now.")
        set_eye_color(robot, "Olivia")

        robot.goto_target(head=create_head_pose(), antennas=[0, 0], duration=0.5)
|
|
|
|
| |
def _main() -> None:
    """Run the app directly as a script instead of through the daemon."""
    demo_app = HavenKitchenOs()
    try:
        demo_app.wrapped_run()
    except KeyboardInterrupt:
        # Ctrl+C: ask the app to stop instead of letting the interrupt escape.
        demo_app.stop()


if __name__ == "__main__":
    _main()
|
|