---
sidebar_position: 4
---

# Chapter 3: Bridging Python Agents to ROS Controllers (rclpy)

## Learning Objectives

- Understand how to create Python nodes that interface with ROS 2
- Learn to use rclpy to communicate with ROS 2 systems
- Implement bridges between Python AI agents and ROS controllers
- Create robust communication patterns between AI and control systems
- Handle errors and exceptions in AI-control bridges

## Introduction to rclpy

rclpy is the Python client library for ROS 2, providing Python bindings for the ROS 2 middleware. It allows Python developers to create ROS 2 nodes and interact with the ROS 2 ecosystem. This is particularly important for humanoid robotics, where Python is widely used for AI and machine learning applications.

### Why Python for AI in Robotics

Python is the dominant language for AI and machine learning development due to:

- **Rich ecosystem**: Libraries like TensorFlow, PyTorch, scikit-learn, and OpenAI
- **Rapid prototyping**: Easy to develop and test AI algorithms
- **Community support**: Large community of AI researchers and practitioners
- **Integration capabilities**: Easy to integrate different systems and libraries

## Understanding the AI-Control Bridge

In humanoid robotics, there is often a need to bridge AI systems (running in Python) with robotic control systems (often built on ROS 2). The bridge typically involves:

1. **Receiving sensor data** from ROS 2 topics
2. **Processing data** through AI algorithms
3. **Generating commands** based on AI decisions
4. **Sending commands** to robot controllers via ROS 2

### Architecture of AI-Control Bridge

```
[ROS 2 Sensors] → [Python Bridge Node] → [AI Agent] → [Python Bridge Node] → [ROS 2 Controllers]
```

## Setting up rclpy Nodes

### Basic Node Structure

```python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from sensor_msgs.msg import JointState


class AIBridgeNode(Node):
    def __init__(self):
        super().__init__('ai_bridge_node')

        # Publishers for sending commands to the robot
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', 10)

        # Subscribers for receiving sensor data
        self.sensor_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_state_callback,
            10
        )
        self.imu_subscriber = self.create_subscription(
            String,  # In practice, this would be sensor_msgs/Imu
            '/imu_data',
            self.imu_callback,
            10
        )

        # Store state data that will be processed by the AI
        self.current_joint_states = JointState()
        self.imu_data = None

        # Timer for the AI processing loop
        self.processing_timer = self.create_timer(0.1, self.process_ai_step)  # 10 Hz

        self.get_logger().info('AI Bridge Node initialized')

    def joint_state_callback(self, msg):
        """Process joint state messages"""
        self.current_joint_states = msg
        self.get_logger().debug(f'Received joint states for {len(msg.name)} joints')

    def imu_callback(self, msg):
        """Process IMU data"""
        self.imu_data = msg.data
        self.get_logger().debug(f'Received IMU data: {msg.data}')

    def process_ai_step(self):
        """Process one step of the AI algorithm"""
        # In a real system, this would call your AI agent.
        # For now, we implement a simple balance controller.
        if self.imu_data is not None:
            # Simple example: if the robot is tilting, send a corrective command
            try:
                # Convert string IMU data to a numerical value
                tilt_angle = float(self.imu_data)
                if abs(tilt_angle) > 0.5:  # If tilting more than 0.5 radians
                    # Send corrective joint commands
                    cmd_msg = JointState()
                    cmd_msg.header.stamp = self.get_clock().now().to_msg()
                    cmd_msg.name = ['left_ankle_pitch', 'right_ankle_pitch']
                    cmd_msg.position = [-tilt_angle * 0.5, -tilt_angle * 0.5]  # Corrective offset
                    self.joint_cmd_publisher.publish(cmd_msg)
                    self.get_logger().info(f'Sent corrective commands for tilt: {tilt_angle}')
            except ValueError:
                self.get_logger().error(f'Could not parse IMU data: {self.imu_data}')


def main(args=None):
    rclpy.init(args=args)
    ai_bridge_node = AIBridgeNode()
    try:
        rclpy.spin(ai_bridge_node)
    except KeyboardInterrupt:
        pass
    finally:
        ai_bridge_node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
```

## Implementing AI Agent Integration

### Simple AI Agent Example

```python
import numpy as np
from sklearn.linear_model import LinearRegression


class SimpleAIAgent:
    def __init__(self):
        # In a real system, this might be a neural network or other ML model
        self.model = LinearRegression()
        self.is_trained = False

        # For humanoid balance control
        self.balance_history = []
        self.max_history = 100  # Store the last 100 samples

    def predict_control(self, sensor_data):
        """
        Given sensor data, predict appropriate control actions.

        sensor_data: dict containing sensor readings
        """
        if not self.is_trained:
            # For an untrained model, fall back to simple proportional control
            tilt = sensor_data.get('tilt', 0.0)
            return {'left_ankle_torque': -tilt * 0.5,
                    'right_ankle_torque': -tilt * 0.5}

        # Use the trained model to predict control outputs.
        # This is a simplified example.
        features = np.array([sensor_data['tilt'],
                             sensor_data['angular_velocity']]).reshape(1, -1)
        control_output = self.model.predict(features)[0]  # predict returns shape (1, 2)
        return {
            'left_ankle_torque': float(control_output[0]),
            'right_ankle_torque': float(control_output[1])
        }

    def add_training_data(self, sensor_data, control_output):
        """Add training data for future learning"""
        self.balance_history.append({
            'sensor': sensor_data.copy(),
            'control': control_output.copy()
        })
        # Keep only recent history
        if len(self.balance_history) > self.max_history:
            self.balance_history.pop(0)

    def train_model(self):
        """Train the model with collected data"""
        if len(self.balance_history) < 10:  # Need a minimum amount of data
            return False

        # Prepare training data
        X = []  # Sensor inputs
        y = []  # Control outputs
        for sample in self.balance_history:
            sensor_data = sample['sensor']
            control_data = sample['control']
            X.append([sensor_data['tilt'], sensor_data['angular_velocity']])
            y.append([control_data['left_ankle_torque'],
                      control_data['right_ankle_torque']])

        X = np.array(X)
        y = np.array(y)

        # Train the model
        self.model.fit(X, y)
        self.is_trained = True
        return True
```

## Advanced Bridge Node with AI Integration

```python
import time

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState, Imu
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32, String

# SimpleAIAgent is the class defined in the previous example


class AdvancedAIBridgeNode(Node):
    def __init__(self):
        super().__init__('advanced_ai_bridge_node')

        # Publishers
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', 10)
        self.ai_feedback_publisher = self.create_publisher(Float32, '/ai_control_effort', 10)

        # Subscribers
        self.joint_state_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_state_callback,
            10
        )
        self.imu_subscriber = self.create_subscription(
            Imu,
            '/imu/data',
            self.imu_callback,
            10
        )
        self.force_torque_subscriber = self.create_subscription(
            String,  # In practice, this would use WrenchStamped or similar
            '/ft_sensors',
            self.ft_callback,
            10
        )

        # Initialize the AI agent
        self.ai_agent = SimpleAIAgent()

        # State variables
        self.current_joint_states = JointState()
        self.current_imu = None
        self.ft_data = None
        self.last_control_time = time.time()

        # Processing timer
        self.processing_timer = self.create_timer(0.05, self.process_ai_step)  # 20 Hz

        # Training timer (slow, for continuous learning)
        self.training_timer = self.create_timer(5.0, self.train_model_if_needed)

        self.get_logger().info('Advanced AI Bridge Node initialized')

    def joint_state_callback(self, msg):
        """Handle joint state updates"""
        self.current_joint_states = msg

    def imu_callback(self, msg):
        """Handle IMU data updates"""
        self.current_imu = msg

    def ft_callback(self, msg):
        """Handle force/torque sensor updates"""
        self.ft_data = msg.data  # Simplified string representation

    def process_ai_step(self):
        """Main AI processing step"""
        # Gather current sensor data
        sensor_data = self.get_sensor_data()
        if sensor_data is None:
            # Insufficient data to proceed
            return

        try:
            # Get the AI prediction
            control_output = self.ai_agent.predict_control(sensor_data)

            # Execute control commands
            self.execute_control_commands(control_output)

            # Calculate and publish control effort feedback
            effort = self.calculate_control_effort(control_output)
            effort_msg = Float32()
            effort_msg.data = effort
            self.ai_feedback_publisher.publish(effort_msg)

            # Optionally store training data
            self.store_training_data(sensor_data, control_output)

            # Update timing for the next control step
            self.last_control_time = time.time()
        except Exception as e:
            self.get_logger().error(f'Error in AI processing: {str(e)}')

    def get_sensor_data(self):
        """Extract relevant sensor data for the AI agent"""
        if self.current_imu is None:
            return None

        # Extract relevant information from the sensors
        sensor_data = {
            'tilt': self.current_imu.orientation.z,  # Simplified - real systems would use proper orientation
            'angular_velocity': self.current_imu.angular_velocity.z,
            'linear_acceleration': self.current_imu.linear_acceleration.x,
            'joint_positions': dict(zip(self.current_joint_states.name,
                                        self.current_joint_states.position)),
            'joint_velocities': dict(zip(self.current_joint_states.name,
                                         self.current_joint_states.velocity))
        }

        # Add time-based features
        dt = time.time() - self.last_control_time
        sensor_data['dt'] = dt

        return sensor_data

    def execute_control_commands(self, control_output):
        """Execute the control commands from the AI agent"""
        # Create the joint command message
        joint_cmd_msg = JointState()
        joint_cmd_msg.header.stamp = self.get_clock().now().to_msg()

        # Add commanded joint positions/torques
        for joint_name, command_value in control_output.items():
            if 'torque' in joint_name:
                # This is a torque command
                joint_name_clean = joint_name.replace('_torque', '')
                joint_cmd_msg.name.append(joint_name_clean)
                joint_cmd_msg.effort.append(command_value)
            elif 'position' in joint_name:
                # This is a position command
                joint_name_clean = joint_name.replace('_position', '')
                joint_cmd_msg.name.append(joint_name_clean)
                joint_cmd_msg.position.append(command_value)

        # Publish the joint commands
        if len(joint_cmd_msg.name) > 0:
            self.joint_cmd_publisher.publish(joint_cmd_msg)

    def calculate_control_effort(self, control_output):
        """Calculate a measure of control effort"""
        effort = 0.0
        for value in control_output.values():
            effort += abs(value)
        return effort

    def store_training_data(self, sensor_data, control_output):
        """Store data for future training"""
        self.ai_agent.add_training_data(sensor_data, control_output)

    def train_model_if_needed(self):
        """Periodically attempt to train the model if enough data is available"""
        success = self.ai_agent.train_model()
        if success:
            self.get_logger().info('AI model retrained successfully')
        else:
            self.get_logger().debug('Not enough data to retrain AI model')


def main(args=None):
    rclpy.init(args=args)
    node = AdvancedAIBridgeNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
```

## Real-World Considerations

### Performance Optimization

When bridging Python AI agents with ROS 2 controllers, performance is critical:

```python
import threading
import time
from queue import Queue, Empty, Full

import numpy as np
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import JointState


class OptimizedAIBridgeNode(Node):
    def __init__(self):
        super().__init__('optimized_ai_bridge_node')

        # Publishers and subscribers (similar to the previous example)
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', 10)
        self.joint_state_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_state_callback,
            10
        )

        # Separate thread for AI processing
        self.ai_input_queue = Queue(maxsize=10)  # Limit queue size
        self.ai_output_queue = Queue(maxsize=10)

        # Start the AI processing thread
        self.ai_thread = threading.Thread(target=self.ai_processing_loop, daemon=True)
        self.ai_thread.start()

        # Timer for sensor data processing
        self.sensor_timer = self.create_timer(0.02, self.process_sensor_data)  # 50 Hz

        self.get_logger().info('Optimized AI Bridge Node initialized')

    def joint_state_callback(self, msg):
        """Non-blocking sensor data processing"""
        try:
            # Convert to numpy arrays for efficient processing
            joint_pos = np.array(msg.position)
            joint_vel = np.array(msg.velocity)

            # Prepare the sensor data packet
            sensor_data = {
                'timestamp': time.time(),
                'joint_positions': joint_pos,
                'joint_velocities': joint_vel,
                'joint_names': msg.name
            }

            # Add to the AI input queue if there is space
            try:
                self.ai_input_queue.put_nowait(sensor_data)
            except Full:
                # Queue is full; drop the oldest sample to make room
                try:
                    self.ai_input_queue.get_nowait()
                    self.ai_input_queue.put_nowait(sensor_data)
                except (Empty, Full):
                    pass  # Still contended; drop this sample
        except Exception as e:
            self.get_logger().error(f'Error in joint callback: {str(e)}')

    def ai_processing_loop(self):
        """Dedicated thread for AI processing"""
        while rclpy.ok():
            try:
                # Drain the queue, keeping only the most recent sensor data
                sensor_data = None
                while True:
                    try:
                        sensor_data = self.ai_input_queue.get_nowait()
                    except Empty:
                        break  # No more queued data

                if sensor_data is not None:
                    # Process with the AI model (this can be computationally expensive)
                    control_output = self.process_with_ai(sensor_data)

                    # Add to the output queue
                    try:
                        self.ai_output_queue.put_nowait(control_output)
                    except Full:
                        # Output queue full; drop the result
                        pass
            except Exception as e:
                self.get_logger().error(f'Error in AI thread: {str(e)}')

            # Brief sleep to prevent busy waiting
            time.sleep(0.001)

    def process_with_ai(self, sensor_data):
        """AI processing function (runs in a separate thread)"""
        # This is where the heavy AI computation happens,
        # for example running a neural network.
        return None  # Placeholder: run the model and return a control_output dict

    def process_sensor_data(self):
        """Process sensor data and send commands"""
        try:
            # Drain the queue, keeping only the most recent AI output
            ai_output = None
            while True:
                try:
                    ai_output = self.ai_output_queue.get_nowait()
                except Empty:
                    break  # No more queued outputs

            if ai_output is not None:
                # Execute the commands produced by the AI
                # (execute_control_commands as defined in AdvancedAIBridgeNode)
                self.execute_control_commands(ai_output)
        except Exception as e:
            self.get_logger().error(f'Error in sensor timer: {str(e)}')


def main(args=None):
    rclpy.init(args=args)
    node = OptimizedAIBridgeNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
```

## Error Handling and Recovery

Robust AI-ROS bridges must handle errors gracefully:

```python
import traceback

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import JointState


class RobustAIBridgeNode(Node):
    def __init__(self):
        super().__init__('robust_ai_bridge_node')

        # Publishers with custom QoS for reliability
        qos_profile = QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE)
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', qos_profile)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', qos_profile)

        # Subscribers (with error handling)
        self.joint_state_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.safe_joint_state_callback,
            qos_profile
        )

        # State tracking
        self.current_joint_states = JointState()
        self.system_mode = 'normal'  # normal, degraded, emergency
        self.error_count = 0
        self.max_errors_before_recovery = 5

        # Recovery timer
        self.recovery_timer = self.create_timer(1.0, self.monitor_system_health)

        self.get_logger().info('Robust AI Bridge Node initialized')

    def joint_state_callback(self, msg):
        """Normal joint state processing (may raise on malformed data)"""
        self.current_joint_states = msg

    def safe_joint_state_callback(self, msg):
        """Safe callback with error handling"""
        try:
            self.joint_state_callback(msg)
        except Exception as e:
            self.error_count += 1
            error_msg = f'Joint state callback error: {str(e)}\n{traceback.format_exc()}'
            self.get_logger().error(error_msg)
            self.trigger_error_handling()

    def trigger_error_handling(self):
        """Handle errors appropriately"""
        if self.error_count >= self.max_errors_before_recovery:
            self.get_logger().error('Too many errors, entering emergency mode')
            self.system_mode = 'emergency'
            self.emergency_stop()
        elif self.error_count >= self.max_errors_before_recovery // 2:
            self.get_logger().warn('High error rate, entering degraded mode')
            self.system_mode = 'degraded'

    def emergency_stop(self):
        """Stop all robot motion"""
        # Publish zero velocity commands
        stop_cmd = Twist()
        self.cmd_vel_publisher.publish(stop_cmd)

        # Reset joint commands to safe positions
        safe_joint_cmd = JointState()
        safe_joint_cmd.header.stamp = self.get_clock().now().to_msg()
        # Add safe joint positions here
        self.joint_cmd_publisher.publish(safe_joint_cmd)

    def monitor_system_health(self):
        """Monitor system health and attempt recovery"""
        if self.system_mode == 'emergency':
            self.get_logger().info('Attempting system recovery...')
            # Reset the error count to allow recovery
            self.error_count = 0
            self.system_mode = 'normal'
            self.get_logger().info('System recovery attempted')
```

## Best Practices for AI-ROS Integration

### 1. Keep AI Processing Separate

Use separate threads or processes for AI computation to avoid blocking ROS communication.

### 2. Use Appropriate QoS Settings

For control commands, use reliable delivery. For sensor data, best-effort may be sufficient.

### 3. Implement Proper Error Handling

Always wrap AI processing in try/except blocks and implement recovery strategies.

### 4. Monitor Performance

Track processing times and system load to ensure real-time performance.

### 5. Log Thoroughly

Keep detailed logs to debug issues that arise from the AI-ROS interaction.

## Summary

Bridging Python AI agents with ROS 2 controllers is a critical capability for modern humanoid robots.
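One habit worth carrying out of this chapter: the prediction step at the heart of each bridge node can be exercised entirely offline, without a ROS installation. As a minimal sketch, `predict_balance_control` below is a hypothetical standalone version of the proportional fallback used by the untrained `SimpleAIAgent`; the 0.5 gain matches that example:

```python
def predict_balance_control(tilt: float, gain: float = 0.5) -> dict:
    """Proportional ankle-torque fallback, mirroring the untrained SimpleAIAgent."""
    correction = -gain * tilt  # torque opposes the measured tilt
    return {'left_ankle_torque': correction, 'right_ankle_torque': correction}


# Quick offline sanity checks: a positive tilt must yield negative
# (opposing) torques, and a level robot must yield zero torque.
assert predict_balance_control(0.8)['left_ankle_torque'] < 0
assert predict_balance_control(-0.8)['right_ankle_torque'] > 0
assert predict_balance_control(0.0)['left_ankle_torque'] == 0.0
```

Testing the decision logic in isolation like this, before wiring it into callbacks and timers, makes the ROS-side debugging in later chapters much easier.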
This chapter covered the fundamental concepts:

- Using rclpy to create nodes that interface between AI systems and ROS 2
- Implementing proper communication patterns between AI and control systems
- Optimizing performance with threading and queuing
- Implementing error handling and recovery strategies
- Following best practices for robust integration

The bridge between AI algorithms and real robot control is where the intelligence of the robot meets the physical world. Proper design of this interface is critical for safe, reliable, and effective humanoid robot operation.

## Exercises

1. Create a simple bridge node that reads joint states and computes a simple control policy
2. Implement error handling in your bridge to handle sensor failures gracefully
3. Add a feedback loop that adjusts AI behavior based on control performance

## Next Steps

In the next chapter, we'll explore how to model humanoid robots using URDF (Unified Robot Description Format), which is essential for simulation and visualization in ROS 2. We'll see how the joint structures we've been discussing connect to the physical model of the robot.