"""Camera worker thread with frame buffering and face tracking. Ported from main_works.py camera_worker() function to provide: - 30Hz+ camera polling with thread-safe frame buffering - Face tracking integration with smooth interpolation - Latest frame always available for tools """ import time import logging import threading from typing import Any, List, Tuple import numpy as np from numpy.typing import NDArray from scipy.spatial.transform import Rotation as R from reachy_mini import ReachyMini from reachy_mini.utils.interpolation import linear_pose_interpolation logger = logging.getLogger(__name__) class CameraWorker: """Thread-safe camera worker with frame buffering and face tracking.""" def __init__(self, reachy_mini: ReachyMini, head_tracker: Any = None) -> None: """Initialize.""" self.reachy_mini = reachy_mini self.head_tracker = head_tracker # Thread-safe frame storage self.latest_frame: NDArray[np.uint8] | None = None self.frame_lock = threading.Lock() self._stop_event = threading.Event() self._thread: threading.Thread | None = None # Face tracking state self.is_head_tracking_enabled = True self.face_tracking_offsets: List[float] = [ 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ] # x, y, z, roll, pitch, yaw self.face_tracking_lock = threading.Lock() # Face tracking timing variables (same as main_works.py) self.last_face_detected_time: float | None = None self.interpolation_start_time: float | None = None self.interpolation_start_pose: NDArray[np.float32] | None = None self.face_lost_delay = 2.0 # seconds to wait before starting interpolation self.interpolation_duration = 1.0 # seconds to interpolate back to neutral # Track state changes self.previous_head_tracking_state = self.is_head_tracking_enabled def get_latest_frame(self) -> NDArray[np.uint8] | None: """Get the latest frame (thread-safe).""" with self.frame_lock: if self.latest_frame is None: return None # Return a copy in original BGR format (OpenCV native) return self.latest_frame.copy() def get_face_tracking_offsets( self, ) -> Tuple[float, float, float, float, float, float]: """Get current face tracking offsets (thread-safe).""" with self.face_tracking_lock: offsets = self.face_tracking_offsets return (offsets[0], offsets[1], offsets[2], offsets[3], offsets[4], offsets[5]) def set_head_tracking_enabled(self, enabled: bool) -> None: """Enable/disable head tracking.""" self.is_head_tracking_enabled = enabled logger.info(f"Head tracking {'enabled' if enabled else 'disabled'}") def start(self) -> None: """Start the camera worker loop in a thread.""" self._stop_event.clear() self._thread = threading.Thread(target=self.working_loop, daemon=True) self._thread.start() logger.debug("Camera worker started") def stop(self) -> None: """Stop the camera worker loop.""" self._stop_event.set() if self._thread is not None: self._thread.join() logger.debug("Camera worker stopped") def working_loop(self) -> None: """Enable the camera worker loop. Ported from main_works.py camera_worker() with same logic. """ logger.debug("Starting camera working loop") # Initialize head tracker if available neutral_pose = np.eye(4) # Neutral pose (identity matrix) self.previous_head_tracking_state = self.is_head_tracking_enabled while not self._stop_event.is_set(): try: current_time = time.time() # Get frame from robot frame = self.reachy_mini.media.get_frame() if frame is not None: # Thread-safe frame storage with self.frame_lock: self.latest_frame = frame # .copy() # Check if face tracking was just disabled if self.previous_head_tracking_state and not self.is_head_tracking_enabled: # Face tracking was just disabled - start interpolation to neutral self.last_face_detected_time = current_time # Trigger the face-lost logic self.interpolation_start_time = None # Will be set by the face-lost interpolation self.interpolation_start_pose = None # Update tracking state self.previous_head_tracking_state = self.is_head_tracking_enabled # Handle face tracking if enabled and head tracker available if self.is_head_tracking_enabled and self.head_tracker is not None: eye_center, _ = self.head_tracker.get_head_position(frame) if eye_center is not None: # Face detected - immediately switch to tracking self.last_face_detected_time = current_time self.interpolation_start_time = None # Stop any interpolation # Convert normalized coordinates to pixel coordinates h, w, _ = frame.shape eye_center_norm = (eye_center + 1) / 2 eye_center_pixels = [ eye_center_norm[0] * w, eye_center_norm[1] * h, ] # Get the head pose needed to look at the target, but don't perform movement target_pose = self.reachy_mini.look_at_image( eye_center_pixels[0], eye_center_pixels[1], duration=0.0, perform_movement=False, ) # Extract translation and rotation from the target pose directly translation = target_pose[:3, 3] rotation = R.from_matrix(target_pose[:3, :3]).as_euler("xyz", degrees=False) # Scale down translation and rotation because smaller FOV translation *= 0.6 rotation *= 0.6 # Thread-safe update of face tracking offsets (use pose as-is) with self.face_tracking_lock: self.face_tracking_offsets = [ translation[0], translation[1], translation[2], # x, y, z rotation[0], rotation[1], rotation[2], # roll, pitch, yaw ] # No face detected while tracking enabled - set face lost timestamp elif self.last_face_detected_time is None or self.last_face_detected_time == current_time: # Only update if we haven't already set a face lost time # (current_time check prevents overriding the disable-triggered timestamp) pass # Handle smooth interpolation (works for both face-lost and tracking-disabled cases) if self.last_face_detected_time is not None: time_since_face_lost = current_time - self.last_face_detected_time if time_since_face_lost >= self.face_lost_delay: # Start interpolation if not already started if self.interpolation_start_time is None: self.interpolation_start_time = current_time # Capture current pose as start of interpolation with self.face_tracking_lock: current_translation = self.face_tracking_offsets[:3] current_rotation_euler = self.face_tracking_offsets[3:] # Convert to 4x4 pose matrix pose_matrix = np.eye(4, dtype=np.float32) pose_matrix[:3, 3] = current_translation pose_matrix[:3, :3] = R.from_euler( "xyz", current_rotation_euler, ).as_matrix() self.interpolation_start_pose = pose_matrix # Calculate interpolation progress (t from 0 to 1) elapsed_interpolation = current_time - self.interpolation_start_time t = min(1.0, elapsed_interpolation / self.interpolation_duration) # Interpolate between current pose and neutral pose interpolated_pose = linear_pose_interpolation( self.interpolation_start_pose, neutral_pose, t, ) # Extract translation and rotation from interpolated pose translation = interpolated_pose[:3, 3] rotation = R.from_matrix(interpolated_pose[:3, :3]).as_euler("xyz", degrees=False) # Thread-safe update of face tracking offsets with self.face_tracking_lock: self.face_tracking_offsets = [ translation[0], translation[1], translation[2], # x, y, z rotation[0], rotation[1], rotation[2], # roll, pitch, yaw ] # If interpolation is complete, reset timing if t >= 1.0: self.last_face_detected_time = None self.interpolation_start_time = None self.interpolation_start_pose = None # else: Keep current offsets (within 2s delay period) # Small sleep to prevent excessive CPU usage (same as main_works.py) time.sleep(0.04) except Exception as e: logger.error(f"Camera worker error: {e}") time.sleep(0.1) # Longer sleep on error logger.debug("Camera worker thread exited")