| """ |
| 🎭 Haven Animation System |
| Brings Olivia & Brie to life through expressive Reachy Mini movements! |
| |
| Modified to use the shared robot instance from the Reachy Mini daemon |
| instead of creating its own connection. |
| """ |
|
|
| import threading |
| import time |
| import random |
| import math |
| import warnings |
| import sys |
| import os |
|
|
| |
# Quiet known-noisy warnings process-wide: every UserWarning, plus any warning
# whose message matches one of the two patterns below.
warnings.filterwarnings("ignore", category=UserWarning)
for _ignored_message in (".*Unknown task UUID.*", ".*callback error.*"):
    warnings.filterwarnings("ignore", message=_ignored_message)
del _ignored_message  # loop variable only; keep the module namespace clean
|
|
| |
class SuppressReachyErrors:
    """Context manager that silences ``sys.stderr`` while its block runs.

    Intended to hide the error text reachy_mini prints from its internal
    callbacks.  On entry ``sys.stderr`` is pointed at ``os.devnull``; on exit
    the previous stream is restored and the devnull handle is closed.
    Exceptions raised inside the block are not suppressed.

    Note: ``sys.stderr`` is process-global, so anything written to stderr by
    any thread is discarded while the block is active.
    """

    def __enter__(self):
        self._original_stderr = sys.stderr
        # Keep our own reference to the handle: if other code rebinds
        # ``sys.stderr`` while we are active, we must still close *this* file
        # and not whatever stream happens to be installed at exit time.
        self._devnull = open(os.devnull, "w")
        sys.stderr = self._devnull
        return self

    def __exit__(self, *args):
        # Restore first so stderr is never left pointing at a closed file.
        sys.stderr = self._original_stderr
        self._devnull.close()
        # Implicit ``None`` return: exceptions from the block propagate.
|
|
| |
# reachy_mini is an optional dependency.  When it cannot be imported the module
# still loads: REACHY_AVAILABLE is False, ReachyMini is None and
# create_head_pose() degrades to a stub that returns None.
try:
    from reachy_mini import ReachyMini
    from reachy_mini.utils import create_head_pose
except ImportError:
    REACHY_AVAILABLE = False
    ReachyMini = None

    def create_head_pose(**kwargs):
        """Stand-in used when reachy_mini is missing; ignores its arguments."""
        return None
else:
    REACHY_AVAILABLE = True
|
|
| |
# Shared state for the background animation worker.
# _stop_animation is the flag looping animations poll to learn they should end;
# _animation_thread holds the worker most recently started by animate()
# (None until the first one is launched).
_stop_animation = threading.Event()
_animation_thread = None
|
|
| |
| |
| |
# Named eye colours as [red, green, blue] triples: one per persona plus a few
# conversational states.
COLORS = dict(
    Olivia=[0, 180, 80],
    Brie=[255, 160, 0],
    neutral=[100, 100, 100],
    listening=[80, 80, 200],
    thinking=[150, 100, 200],
    celebration=[255, 220, 0],
)

# Alias: EYES is the very same dict object as COLORS, not a copy.
EYES = COLORS
|
|
def set_eye_color(robot, color_name_or_rgb):
    """Apply one colour to both of the robot's eyes, best effort.

    ``color_name_or_rgb`` is either a key of ``COLORS`` (unknown names fall
    back to ``COLORS["neutral"]``) or an RGB value passed to the eye as-is.
    A robot lacking ``head``, an eye, or an eye's ``set_color`` is skipped
    silently, and any exception raised along the way is swallowed.
    """
    try:
        rgb = color_name_or_rgb
        if isinstance(rgb, str):
            rgb = COLORS.get(rgb, COLORS["neutral"])

        if not hasattr(robot, 'head'):
            return

        # Left eye first, then right.
        for eye_attr in ('l_eye', 'r_eye'):
            if hasattr(robot.head, eye_attr):
                eye = getattr(robot.head, eye_attr)
                if hasattr(eye, 'set_color'):
                    eye.set_color(rgb)
    except Exception:
        pass
|
|
def pulse_eyes(robot, color1, color2, duration=2.0, speed=0.3):
    """Alternate the eyes between two colours until time runs out.

    Args:
        robot: Robot handle, forwarded to ``set_eye_color``.
        color1: Colour shown first (anything ``set_eye_color`` accepts).
        color2: Colour shown on the alternate beats.
        duration: Total pulsing time in seconds.
        speed: Seconds each colour is held before switching.

    The loop ends early as soon as the module's stop event is set.
    """
    # monotonic() is immune to wall-clock adjustments (e.g. a time sync), which
    # could otherwise stretch the effect or cut it short.
    deadline = time.monotonic() + duration
    show_first = True
    while time.monotonic() < deadline and not _stop_animation.is_set():
        set_eye_color(robot, color1 if show_first else color2)
        show_first = not show_first
        # Wait on the event instead of sleeping blindly: a stop request that
        # arrives mid-pause is then noticed at once, rather than being missed
        # because the flag was cleared again before the sleep finished.
        if _stop_animation.wait(speed):
            break
|
|
def fade_eyes(robot, from_color, to_color, duration=1.0, steps=10):
    """Fade the eyes from one colour to another by linear interpolation.

    Args:
        robot: Robot handle, forwarded to ``set_eye_color``.
        from_color: Start colour: a ``COLORS`` name or an RGB sequence.
        to_color: End colour: a ``COLORS`` name or an RGB sequence.
        duration: Seconds the whole fade takes.
        steps: Number of interpolation intervals (``steps + 1`` frames are
            shown).  Values below 1 are treated as 1, so the fade degrades to
            "start colour, pause, end colour" instead of doing nothing.

    Unknown colour names fall back to ``COLORS["neutral"]``.  The function is
    best effort: any exception raised while fading is swallowed.
    """
    try:
        if isinstance(from_color, str):
            from_color = COLORS.get(from_color, COLORS["neutral"])
        if isinstance(to_color, str):
            to_color = COLORS.get(to_color, COLORS["neutral"])

        # Guard against steps <= 0, which would otherwise divide by zero.
        steps = max(1, int(steps))
        pause = duration / steps

        for i in range(steps + 1):
            t = i / steps
            rgb = [
                int(from_color[channel] + (to_color[channel] - from_color[channel]) * t)
                for channel in range(3)
            ]
            set_eye_color(robot, rgb)
            # Pause between frames only: sleeping after the final frame would
            # make the fade last longer than ``duration``.
            if i < steps:
                time.sleep(pause)
    except Exception:
        # Deliberate best effort: eye effects must never break an animation.
        pass
|
|
# Connection opened by get_robot() for standalone use; created at most once
# and then reused, so repeated calls do not open a new connection each time.
_standalone_robot = None


def get_robot():
    """Return the robot instance to animate, or None if there is none.

    Priority:
        1. The shared robot exposed by ``get_shared_robot`` (imported from the
           enclosing package, or from ``main`` as a fallback).
        2. A standalone ``ReachyMini(media_backend="no_media")`` connection,
           created on first use and cached for later calls.
        3. ``None`` when reachy_mini is not installed or the standalone
           connection cannot be created.

    Returns:
        The robot object, or ``None``.
    """
    global _standalone_robot

    if not REACHY_AVAILABLE:
        return None

    # 1. Prefer the shared instance when the host application provides one.
    try:
        try:
            from . import get_shared_robot
        except ImportError:
            from main import get_shared_robot
        shared = get_shared_robot()
        if shared is not None:
            return shared
    except ImportError:
        pass

    # 2. Standalone fallback.  Cache the instance: this function runs on every
    #    animate() call, and building a new connection each time leaks them.
    if _standalone_robot is None:
        try:
            with SuppressReachyErrors():
                _standalone_robot = ReachyMini(media_backend="no_media")
        except Exception:
            # A failed attempt is not cached, so a later call may retry.
            return None
    return _standalone_robot
|
|
def stop_current_animation(timeout=0.1):
    """Ask the running animation (if any) to stop, then re-arm the stop flag.

    The stop event is raised, the current worker thread is given up to
    ``timeout`` seconds to finish, and the event is cleared again so the next
    animation can run.  When no worker is alive the call returns immediately
    instead of sleeping.

    Args:
        timeout: Maximum number of seconds to keep the stop flag raised while
            waiting for the worker to exit.
    """
    _stop_animation.set()
    try:
        worker = _animation_thread
        # Never join the calling thread (join() would raise RuntimeError), and
        # skip the wait entirely when there is nothing left to stop.
        if (
            worker is not None
            and worker is not threading.current_thread()
            and worker.is_alive()
        ):
            worker.join(timeout)
    finally:
        _stop_animation.clear()
|
|
| |
| |
| |
|
|
def head_nod(robot, intensity=1.0):
    """Nod once: head z goes to ``15 * intensity``, then -5, then back to 0."""
    # (head z, move duration, pause afterwards; None means no pause)
    keyframes = (
        (15 * intensity, 0.2, 0.15),
        (-5, 0.15, 0.1),
        (0, 0.2, None),
    )
    for z, move_time, pause in keyframes:
        robot.goto_target(head=create_head_pose(z=z), duration=move_time)
        if pause is not None:
            time.sleep(pause)
|
|
def head_shake(robot, intensity=1.0):
    """Shake the head once: yaw to ``+20 * intensity``, to the mirror, then 0."""
    swing = 20 * intensity
    # (target yaw, pause afterwards; None means no pause)
    for yaw, pause in ((swing, 0.1), (-swing, 0.1), (0, None)):
        robot.goto_target(head=create_head_pose(yaw=yaw), duration=0.2)
        if pause is not None:
            time.sleep(pause)
|
|
def head_tilt(robot, direction="right", hold=0.5):
    """Roll the head to one side, hold for ``hold`` seconds, then level it.

    ``direction == "right"`` uses roll +15; any other value uses roll -15.
    """
    roll = -15
    if direction == "right":
        roll = 15

    tilted = create_head_pose(roll=roll)
    robot.goto_target(head=tilted, duration=0.3)
    time.sleep(hold)
    level = create_head_pose(roll=0)
    robot.goto_target(head=level, duration=0.3)
|
|
def antenna_wiggle(robot, times=2, speed=0.15):
    """Swing the antennas in opposition ``times`` times, then centre them.

    Each half-swing takes ``speed`` seconds and is followed by a pause of the
    same length; the final return to [0.0, 0.0] takes 0.2 seconds.
    """
    half_swings = ((0.5, -0.5), (-0.5, 0.5))
    for _ in range(times):
        for target in half_swings:
            robot.goto_target(antennas=list(target), duration=speed)
            time.sleep(speed)
    robot.goto_target(antennas=[0.0, 0.0], duration=0.2)
|
|
def antenna_perk(robot):
    """Perk both antennas to 0.4, hold for 0.3 seconds, then return to 0.0."""
    perked, rest = [0.4, 0.4], [0.0, 0.0]
    robot.goto_target(antennas=perked, duration=0.2)
    time.sleep(0.3)  # hold the perked pose
    robot.goto_target(antennas=rest, duration=0.3)
|
|
def antenna_droop(robot):
    """Droop both antennas to -0.3, hold for 0.5 seconds, then return to 0.0."""
    # (antenna value for both sides, pause afterwards; None means no pause)
    for level, pause in ((-0.3, 0.5), (0.0, None)):
        robot.goto_target(antennas=[level, level], duration=0.4)
        if pause is not None:
            time.sleep(pause)
|
|
def look_around(robot):
    """Glance to one side, then the other, and return the head to centre."""
    # (head pose arguments, move duration, pause afterwards; None = no pause)
    glances = (
        ({"yaw": 25, "z": 10}, 0.4, 0.3),
        ({"yaw": -25, "z": 5}, 0.5, 0.3),
        ({"yaw": 0, "z": 0}, 0.3, None),
    )
    for pose_args, move_time, pause in glances:
        robot.goto_target(head=create_head_pose(**pose_args), duration=move_time)
        if pause is not None:
            time.sleep(pause)
|
|
|
|
| |
| |
| |
|
|
def olivia_greeting(robot):
    """Play Olivia's greeting: her eye colour, a slight tilt, a soft nod, reset."""
    set_eye_color(robot, "Olivia")

    tilted = create_head_pose(roll=8, z=5)
    robot.goto_target(head=tilted, duration=0.4)
    time.sleep(0.2)

    head_nod(robot, intensity=0.7)

    neutral = create_head_pose(roll=0, z=0)
    robot.goto_target(head=neutral, duration=0.3)
|
|
def olivia_thinking(robot):
    """Play Olivia's thinking pose: hold an offset head pose, then reset it."""
    set_eye_color(robot, "Olivia")
    # (head pose arguments, move duration, pause afterwards; None = no pause)
    phases = (
        ({"yaw": 15, "z": 20, "roll": 5}, 0.5, 0.8),
        ({"yaw": 0, "z": 0, "roll": 0}, 0.4, None),
    )
    for pose_args, move_time, pause in phases:
        robot.goto_target(head=create_head_pose(**pose_args), duration=move_time)
        if pause is not None:
            time.sleep(pause)
|
|
def olivia_speaking(robot, duration=3.0):
    """Make small random head movements while Olivia speaks.

    Args:
        robot: Robot handle; must provide ``goto_target``.
        duration: How long to keep moving, in seconds.

    The loop ends early when the module's stop event is set.  In every case
    the head is sent back to the default pose before returning.
    """
    set_eye_color(robot, "Olivia")
    # monotonic() is immune to wall-clock adjustments (e.g. a time sync).
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline and not _stop_animation.is_set():
        z = random.uniform(-3, 5)
        yaw = random.uniform(-5, 5)
        roll = random.uniform(-3, 3)
        robot.goto_target(head=create_head_pose(z=z, yaw=yaw, roll=roll), duration=0.4)
        # Wait on the stop event rather than sleeping blindly, so a stop
        # request that arrives mid-pause ends the loop at once instead of
        # being missed when the flag is cleared again shortly afterwards.
        if _stop_animation.wait(0.3 + random.uniform(0, 0.3)):
            break
    robot.goto_target(head=create_head_pose(), duration=0.3)
|
|
def olivia_handoff(robot):
    """Play Olivia's hand-off to Brie.

    Turns the head aside, nods softly, fades the eyes from Olivia's colour
    to Brie's, then recentres the head.
    """
    turned_aside = create_head_pose(yaw=30, z=-5)
    robot.goto_target(head=turned_aside, duration=0.4)
    time.sleep(0.3)

    head_nod(robot, intensity=0.5)
    time.sleep(0.2)

    start_colour = EYES["Olivia"]
    end_colour = EYES["Brie"]
    fade_eyes(robot, start_colour, end_colour, duration=0.5)

    centred = create_head_pose(yaw=0, z=0)
    robot.goto_target(head=centred, duration=0.4)
|
|
| |
| |
| |
|
|
def brie_greeting(robot):
    """Play Brie's greeting: antenna perk, head to z=15, quick wiggle, reset."""
    set_eye_color(robot, "Brie")
    antenna_perk(robot)
    time.sleep(0.1)

    lifted = create_head_pose(z=15)
    robot.goto_target(head=lifted, duration=0.2)
    antenna_wiggle(robot, times=2, speed=0.12)

    level = create_head_pose(z=0)
    robot.goto_target(head=level, duration=0.2)
|
|
def brie_excited(robot):
    """Play Brie's excited burst: eye pulse, two head/antenna bounces, reset."""
    set_eye_color(robot, "Brie")
    pulse_eyes(robot, COLORS["Brie"], COLORS["celebration"], duration=0.6, speed=0.15)

    # One bounce = head to z=20 with antennas one way, then head back to z=0
    # with the antennas mirrored.
    bounce = ((20, (0.6, -0.6)), (0, (-0.6, 0.6)))
    for _ in range(2):
        for z, antennas in bounce:
            robot.goto_target(
                head=create_head_pose(z=z),
                antennas=list(antennas),
                duration=0.15,
            )
            time.sleep(0.1)

    robot.goto_target(head=create_head_pose(), antennas=[0, 0], duration=0.2)
    set_eye_color(robot, "Brie")
|
|
def brie_speaking(robot, duration=3.0):
    """Make lively random head and antenna movements while Brie speaks.

    Args:
        robot: Robot handle; must provide ``goto_target``.
        duration: How long to keep moving, in seconds.

    The loop ends early when the module's stop event is set.  In every case
    the head and antennas are sent back to their default pose before
    returning.
    """
    set_eye_color(robot, "Brie")
    # monotonic() is immune to wall-clock adjustments (e.g. a time sync).
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline and not _stop_animation.is_set():
        z = random.uniform(-5, 15)
        yaw = random.uniform(-10, 10)
        roll = random.uniform(-8, 8)
        ant_l = random.uniform(-0.2, 0.3)
        ant_r = random.uniform(-0.2, 0.3)
        robot.goto_target(
            head=create_head_pose(z=z, yaw=yaw, roll=roll),
            antennas=[ant_l, ant_r],
            duration=0.35,
        )
        # Wait on the stop event rather than sleeping blindly, so a stop
        # request that arrives mid-pause ends the loop at once instead of
        # being missed when the flag is cleared again shortly afterwards.
        if _stop_animation.wait(0.25 + random.uniform(0, 0.2)):
            break
    robot.goto_target(head=create_head_pose(), antennas=[0, 0], duration=0.3)
|
|
def brie_step_complete(robot):
    """Celebrate a completed cooking step.

    Switches the eyes to the celebration colour, lifts the head and antennas,
    wiggles the antennas three times, lowers the head, and finally restores
    Brie's eye colour.
    """
    # Go straight to the celebration colour; setting Brie's colour first would
    # be overwritten immediately and only cost an extra hardware write.
    set_eye_color(robot, "celebration")
    robot.goto_target(head=create_head_pose(z=25), antennas=[0.7, 0.7], duration=0.2)
    time.sleep(0.15)
    antenna_wiggle(robot, times=3, speed=0.1)
    robot.goto_target(head=create_head_pose(z=0), duration=0.3)
    set_eye_color(robot, "Brie")
|
|
def brie_thinking_recipe(robot):
    """Play Brie's thinking routine.

    The head looks to one side and then the other, the eyes flash the
    celebration colour with the antennas perked, and everything resets.
    """
    set_eye_color(robot, "Brie")

    # Two pondering poses: (head pose arguments, antennas, pause afterwards).
    pondering = (
        ({"z": 25, "yaw": 10}, (0.2, -0.1), 0.5),
        ({"z": 20, "yaw": -10}, (-0.1, 0.2), 0.4),
    )
    for pose_args, antennas, pause in pondering:
        robot.goto_target(
            head=create_head_pose(**pose_args),
            antennas=list(antennas),
            duration=0.4,
        )
        time.sleep(pause)

    # Brief celebration-coloured flash with both antennas perked.
    set_eye_color(robot, "celebration")
    robot.goto_target(head=create_head_pose(z=15), antennas=[0.4, 0.4], duration=0.2)
    time.sleep(0.2)

    # Back to Brie's colour and the default pose.
    set_eye_color(robot, "Brie")
    robot.goto_target(head=create_head_pose(), antennas=[0, 0], duration=0.3)
|
|
|
|
| |
| |
| |
|
|
def react_happy(robot):
    """Play the generic happy reaction and leave the eyes neutral."""
    # Gestures run strictly in this order.
    gestures = (
        lambda: set_eye_color(robot, "celebration"),
        lambda: antenna_perk(robot),
        lambda: head_nod(robot, intensity=0.8),
        lambda: antenna_wiggle(robot, times=1),
        lambda: set_eye_color(robot, "neutral"),
    )
    for gesture in gestures:
        gesture()
|
|
def react_sympathetic(robot):
    """Play the sympathetic reaction: tilt right, droop antennas, soft nod.

    The eyes are set to the RGB value [100, 150, 200] and are not changed
    back afterwards.
    """
    # Gestures run strictly in this order.
    gestures = (
        lambda: set_eye_color(robot, [100, 150, 200]),
        lambda: head_tilt(robot, "right", hold=0.3),
        lambda: antenna_droop(robot),
        lambda: head_nod(robot, intensity=0.5),
    )
    for gesture in gestures:
        gesture()
|
|
def react_curious(robot):
    """Play the curious reaction and leave the eyes neutral.

    Sets the "thinking" eye colour, perks the antennas, tilts the head left,
    looks around, then switches the eyes to "neutral".
    """
    # Gestures run strictly in this order.
    gestures = (
        lambda: set_eye_color(robot, "thinking"),
        lambda: antenna_perk(robot),
        lambda: head_tilt(robot, "left", hold=0.4),
        lambda: look_around(robot),
        lambda: set_eye_color(robot, "neutral"),
    )
    for gesture in gestures:
        gesture()
|
|
def react_listening(robot):
    """Adopt the listening pose: listening eye colour, slight head/antenna lift.

    The pose is held for 0.5 seconds and is not reset on return.
    """
    set_eye_color(robot, "listening")
    attentive = create_head_pose(z=8, roll=5)
    robot.goto_target(head=attentive, antennas=[0.15, 0.15], duration=0.3)
    time.sleep(0.5)
|
|
| |
| |
| |
|
|
def idle_breathing(robot, duration=10.0):
    """Play a slow sinusoidal head/antenna sway while the robot is idle.

    Args:
        robot: Robot handle; must provide ``goto_target``.
        duration: How long to keep swaying, in seconds.

    The loop ends early when the module's stop event is set.  In every case
    the head and antennas are sent back to their default pose before
    returning.
    """
    set_eye_color(robot, "neutral")
    # monotonic() is immune to wall-clock adjustments (e.g. a time sync).
    deadline = time.monotonic() + duration
    t = 0  # animation phase; advances 0.4 per frame
    while time.monotonic() < deadline and not _stop_animation.is_set():
        z = 3 * math.sin(t * 0.5)
        ant = 0.05 * math.sin(t * 0.3)
        robot.goto_target(
            head=create_head_pose(z=z),
            antennas=[ant, ant],
            duration=0.5,
        )
        # Wait on the stop event rather than sleeping blindly, so a stop
        # request that arrives mid-pause ends the loop at once instead of
        # being missed when the flag is cleared again shortly afterwards.
        if _stop_animation.wait(0.4):
            break
        t += 0.4
    robot.goto_target(head=create_head_pose(), antennas=[0, 0], duration=0.3)
|
|
def idle_look_around(robot):
    """Step the head through a fixed series of glances while idle.

    Each glance is followed by a random pause of 0.4 to 0.9 seconds.  The
    sequence is abandoned as soon as the module's stop event is set; in that
    case the head stays at the last commanded pose.
    """
    set_eye_color(robot, "neutral")
    # (yaw, z, roll) per glance; the last entry returns the head to centre.
    positions = (
        (15, 5, 0),
        (-10, 10, 5),
        (0, -5, -3),
        (20, 8, -5),
        (0, 0, 0),
    )
    for yaw, z, roll in positions:
        if _stop_animation.is_set():
            break
        robot.goto_target(head=create_head_pose(yaw=yaw, z=z, roll=roll), duration=0.6)
        # Wait on the stop event rather than sleeping blindly, so a stop
        # request that arrives mid-pause ends the sequence at once instead of
        # being missed when the flag is cleared again shortly afterwards.
        if _stop_animation.wait(0.4 + random.uniform(0, 0.5)):
            break
|
|
|
|
| |
| |
| |
|
|
def _select_animation(animation_name, persona, options):
    """Resolve an animation request to a callable taking the robot, or None.

    Lookup order: the persona's own animations, then the persona-independent
    reactions/idle animations, then the persona's default gesture.
    """
    persona_moves = {
        "Olivia": {
            "greeting": olivia_greeting,
            "thinking": olivia_thinking,
            "speaking": lambda robot: olivia_speaking(robot, options.get("duration", 3.0)),
            "handoff": olivia_handoff,
        },
        "Brie": {
            "greeting": brie_greeting,
            "thinking": brie_thinking_recipe,
            "speaking": lambda robot: brie_speaking(robot, options.get("duration", 3.0)),
            "excited": brie_excited,
            "step_complete": brie_step_complete,
        },
    }
    shared_moves = {
        "happy": react_happy,
        "sympathetic": react_sympathetic,
        "curious": react_curious,
        "listening": react_listening,
        "idle": lambda robot: idle_breathing(robot, options.get("duration", 10.0)),
        "look_around": idle_look_around,
    }
    # Gesture played when a known persona is asked for an unknown animation.
    persona_default = {"Olivia": head_nod, "Brie": antenna_wiggle}

    own_moves = persona_moves.get(persona, {})
    if animation_name in own_moves:
        return own_moves[animation_name]
    if animation_name in shared_moves:
        return shared_moves[animation_name]
    return persona_default.get(persona)


def animate(animation_name, persona="Olivia", **kwargs):
    """
    Main animation entry point. Call this from the app!

    Stops any running animation, then plays the requested one on a daemon
    thread and returns immediately.  Does nothing when no robot is available.

    Args:
        animation_name: Persona animation ("greeting", "thinking", "speaking",
            "handoff" for Olivia; "greeting", "thinking", "speaking",
            "excited", "step_complete" for Brie) or a persona-independent one
            ("happy", "sympathetic", "curious", "listening", "idle",
            "look_around").
        persona: "Olivia" or "Brie".  Persona animations take precedence;
            persona-independent names work with any persona.  An unknown name
            falls back to a nod (Olivia) or an antenna wiggle (Brie), and to
            nothing for any other persona.
        **kwargs: ``duration`` (seconds) is honoured by "speaking" and "idle".

    Examples:
        animate("greeting", persona="Olivia")
        animate("speaking", persona="Brie", duration=5.0)
        animate("step_complete", persona="Brie")
        animate("thinking")
    """
    global _animation_thread

    robot = get_robot()
    if robot is None:
        return

    stop_current_animation()

    def run_animation():
        with SuppressReachyErrors():
            try:
                move = _select_animation(animation_name, persona, kwargs)
                if move is not None:
                    move(robot)
            except Exception:
                # Deliberate best effort: an animation failure must never
                # take down the caller or surface from the worker thread.
                pass

    _animation_thread = threading.Thread(target=run_animation, daemon=True)
    _animation_thread.start()
|
|
def animate_sync(animation_name, persona="Olivia", **kwargs):
    """Start an animation via ``animate`` and block until its thread ends.

    Returns immediately when no animation thread has been recorded.
    """
    animate(animation_name, persona, **kwargs)
    worker = _animation_thread
    if not worker:
        return
    worker.join()
|
|
def idle_mode(duration=30.0):
    """Start the idle animation for use when not actively conversing.

    Args:
        duration: Length of the idle animation in seconds.
    """
    # Pass persona=None explicitly: "idle" is a persona-independent animation,
    # and relying on the default persona would route the request through that
    # persona's animation set rather than the generic idle animation.
    animate("idle", persona=None, duration=duration)
|
|