suhail
add: course book markdown files for RAG ingestion
cc303f4
metadata
sidebar_position: 4

Chapter 3: Bridging Python Agents to ROS Controllers (rclpy)

Learning Objectives

  • Understand how to create Python nodes that interface with ROS 2
  • Learn to use rclpy to communicate with ROS 2 systems
  • Implement bridges between Python AI agents and ROS controllers
  • Create robust communication patterns between AI and control systems
  • Handle errors and exceptions in AI-control bridges

Introduction to rclpy

rclpy is the Python client library for ROS 2, providing Python bindings for the ROS 2 middleware. It allows Python developers to create ROS 2 nodes and interact with the ROS 2 ecosystem. This is particularly important for humanoid robotics, where Python is widely used for AI and machine learning applications.

Why Python for AI in Robotics

Python is the dominant language for AI and machine learning development due to:

  • Rich ecosystem: Libraries like TensorFlow, PyTorch, scikit-learn, and OpenAI
  • Rapid prototyping: Easy to develop and test AI algorithms
  • Community support: Large community of AI researchers and practitioners
  • Integration capabilities: Easy to integrate different systems and libraries

Understanding the AI-Control Bridge

In humanoid robotics, there's often a need to bridge AI systems (running in Python) with robotic control systems (often using ROS 2). The bridge typically involves:

  1. Receiving sensor data from ROS 2 topics
  2. Processing data through AI algorithms
  3. Generating commands based on AI decisions
  4. Sending commands to robot controllers via ROS 2

Architecture of AI-Control Bridge

[ROS 2 Sensors] → [Python Bridge Node] → [AI Agent] → [Python Bridge Node] → [ROS 2 Controllers]

Setting up rclpy Nodes

Basic Node Structure

import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32
from geometry_msgs.msg import Twist
from sensor_msgs.msg import JointState
import numpy as np

class AIBridgeNode(Node):
    def __init__(self):
        super().__init__('ai_bridge_node')
        
        # Publishers for sending commands to robot
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', 10)
        
        # Subscribers for receiving sensor data
        self.sensor_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_state_callback,
            10
        )
        
        self.imu_subscriber = self.create_subscription(
            String,  # In practice, this would be sensor_msgs/Imu
            '/imu_data',
            self.imu_callback,
            10
        )
        
        # Store state data that will be processed by AI
        self.current_joint_states = JointState()
        self.imu_data = None
        
        # Timer for AI processing loop
        self.processing_timer = self.create_timer(0.1, self.process_ai_step)  # 10 Hz
        
        self.get_logger().info('AI Bridge Node initialized')
    
    def joint_state_callback(self, msg):
        """Process joint state messages"""
        self.current_joint_states = msg
        self.get_logger().debug(f'Received joint states for {len(msg.name)} joints')
    
    def imu_callback(self, msg):
        """Process IMU data"""
        self.imu_data = msg.data
        self.get_logger().debug(f'Received IMU data: {msg.data}')
    
    def process_ai_step(self):
        """Process one step of AI algorithm"""
        # In a real system, this would call your AI agent
        # For now, we'll implement a simple balance controller
        
        if self.imu_data is not None:
            # Simple example: if robot is tilting, send corrective command
            try:
                # Convert string IMU data to numerical values
                tilt_angle = float(self.imu_data)
                
                if abs(tilt_angle) > 0.5:  # If tilting more than 0.5 radians
                    # Send corrective joint commands
                    cmd_msg = JointState()
                    cmd_msg.header.stamp = self.get_clock().now().to_msg()
                    cmd_msg.name = ['left_ankle_pitch', 'right_ankle_pitch']
                    cmd_msg.position = [-tilt_angle * 0.5, -tilt_angle * 0.5]  # Correcting torque
                    
                    self.joint_cmd_publisher.publish(cmd_msg)
                    self.get_logger().info(f'Sent corrective commands for tilt: {tilt_angle}')
            except ValueError:
                self.get_logger().error(f'Could not parse IMU data: {self.imu_data}')

def main(args=None):
    rclpy.init(args=args)
    ai_bridge_node = AIBridgeNode()
    
    try:
        rclpy.spin(ai_bridge_node)
    except KeyboardInterrupt:
        pass
    finally:
        ai_bridge_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Implementing AI Agent Integration

Simple AI Agent Example

import numpy as np
from sklearn.linear_model import LinearRegression

class SimpleAIAgent:
    def __init__(self):
        # In a real system, this might be a neural network or other ML model
        self.model = LinearRegression()
        self.is_trained = False
        
        # For humanoid balance control
        self.balance_history = []
        self.max_history = 100  # Store last 100 samples
    
    def predict_control(self, sensor_data):
        """
        Given sensor data, predict appropriate control actions
        sensor_data: dict containing sensor readings
        """
        if not self.is_trained:
            # For untrained model, return simple proportional control
            tilt = sensor_data.get('tilt', 0.0)
            return {'left_ankle_torque': -tilt * 0.5, 'right_ankle_torque': -tilt * 0.5}
        
        # Use trained model to predict control
        # This is a simplified example
        features = np.array([sensor_data['tilt'], sensor_data['angular_velocity']]).reshape(1, -1)
        control_output = self.model.predict(features)
        
        return {
            'left_ankle_torque': float(control_output[0]),
            'right_ankle_torque': float(control_output[1])
        }
    
    def add_training_data(self, sensor_data, control_output):
        """Add training data for future learning"""
        self.balance_history.append({
            'sensor': sensor_data.copy(),
            'control': control_output.copy()
        })
        
        # Keep only recent history
        if len(self.balance_history) > self.max_history:
            self.balance_history.pop(0)
    
    def train_model(self):
        """Train the model with collected data"""
        if len(self.balance_history) < 10:  # Need minimum data
            return False
        
        # Prepare training data
        X = []  # Sensor inputs
        y = []  # Control outputs
        
        for sample in self.balance_history:
            sensor_data = sample['sensor']
            control_data = sample['control']
            
            X.append([sensor_data['tilt'], sensor_data['angular_velocity']])
            y.append([control_data['left_ankle_torque'], control_data['right_ankle_torque']])
        
        X = np.array(X)
        y = np.array(y)
        
        # Train the model
        self.model.fit(X, y)
        self.is_trained = True
        
        return True

Advanced Bridge Node with AI Integration

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState, Imu
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
from builtin_interfaces.msg import Time
import numpy as np
import time

class AdvancedAIBridgeNode(Node):
    def __init__(self):
        super().__init__('advanced_ai_bridge_node')
        
        # Publishers
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', 10)
        self.ai_feedback_publisher = self.create_publisher(Float32, '/ai_control_effort', 10)
        
        # Subscribers
        self.joint_state_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_state_callback,
            10
        )
        
        self.imu_subscriber = self.create_subscription(
            Imu,
            '/imu/data',
            self.imu_callback,
            10
        )
        
        self.force_torque_subscriber = self.create_subscription(
            String,  # In practice, this would use WrenchStamped or similar
            '/ft_sensors',
            self.ft_callback,
            10
        )
        
        # Initialize AI agent
        self.ai_agent = SimpleAIAgent()
        
        # State variables
        self.current_joint_states = JointState()
        self.current_imu = None
        self.ft_data = None
        self.last_control_time = time.time()
        
        # Processing timer
        self.processing_timer = self.create_timer(0.05, self.process_ai_step)  # 20 Hz
        
        # Training timer (slow, for continuous learning)
        self.training_timer = self.create_timer(5.0, self.train_model_if_needed)
        
        self.get_logger().info('Advanced AI Bridge Node initialized')
    
    def joint_state_callback(self, msg):
        """Handle joint state updates"""
        self.current_joint_states = msg
    
    def imu_callback(self, msg):
        """Handle IMU data updates"""
        self.current_imu = msg
    
    def ft_callback(self, msg):
        """Handle force/torque sensor updates"""
        self.ft_data = msg.data  # Simplified string representation
    
    def process_ai_step(self):
        """Main AI processing step"""
        # Gather current sensor data
        sensor_data = self.get_sensor_data()
        
        if sensor_data is None:
            # Insufficient data to proceed
            return
        
        try:
            # Get AI prediction
            control_output = self.ai_agent.predict_control(sensor_data)
            
            # Execute control commands
            self.execute_control_commands(control_output)
            
            # Calculate and publish control effort feedback
            effort = self.calculate_control_effort(control_output)
            effort_msg = Float32()
            effort_msg.data = effort
            self.ai_feedback_publisher.publish(effort_msg)
            
            # Optionally store training data
            self.store_training_data(sensor_data, control_output)
            
            # Update timing for next control step
            self.last_control_time = time.time()
            
        except Exception as e:
            self.get_logger().error(f'Error in AI processing: {str(e)}')
    
    def get_sensor_data(self):
        """Extract relevant sensor data for the AI agent"""
        if self.current_imu is None:
            return None
        
        # Extract relevant information from sensors
        sensor_data = {
            'tilt': self.current_imu.orientation.z,  # Simplified - in real systems, would use proper orientation
            'angular_velocity': self.current_imu.angular_velocity.z,
            'linear_acceleration': self.current_imu.linear_acceleration.x,
            'joint_positions': dict(zip(self.current_joint_states.name, self.current_joint_states.position)),
            'joint_velocities': dict(zip(self.current_joint_states.name, self.current_joint_states.velocity))
        }
        
        # Add time-based features
        dt = time.time() - self.last_control_time
        sensor_data['dt'] = dt
        
        return sensor_data
    
    def execute_control_commands(self, control_output):
        """Execute the control commands from the AI agent"""
        # Create joint command message
        joint_cmd_msg = JointState()
        joint_cmd_msg.header.stamp = self.get_clock().now().to_msg()
        
        # Add commanded joint positions/torques
        for joint_name, torque_value in control_output.items():
            if 'torque' in joint_name:
                # This is a torque command
                joint_name_clean = joint_name.replace('_torque', '')
                joint_cmd_msg.name.append(joint_name_clean)
                joint_cmd_msg.effort.append(torque_value)
            elif 'position' in joint_name:
                # This is a position command
                joint_name_clean = joint_name.replace('_position', '')
                joint_cmd_msg.name.append(joint_name_clean)
                joint_cmd_msg.position.append(torque_value)
        
        # Publish joint commands
        if len(joint_cmd_msg.name) > 0:
            self.joint_cmd_publisher.publish(joint_cmd_msg)
    
    def calculate_control_effort(self, control_output):
        """Calculate a measure of control effort"""
        effort = 0.0
        for value in control_output.values():
            effort += abs(value)
        return effort
    
    def store_training_data(self, sensor_data, control_output):
        """Store data for future training"""
        self.ai_agent.add_training_data(sensor_data, control_output)
    
    def train_model_if_needed(self):
        """Periodically attempt to train the model if enough data is available"""
        success = self.ai_agent.train_model()
        if success:
            self.get_logger().info('AI model retrained successfully')
        else:
            self.get_logger().debug('Not enough data to retrain AI model')

def main(args=None):
    rclpy.init(args=args)
    node = AdvancedAIBridgeNode()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Real-World Considerations

Performance Optimization

When bridging Python AI agents with ROS 2 controllers, performance is critical:

import threading
from queue import Queue, Empty
import numpy as np

class OptimizedAIBridgeNode(Node):
    def __init__(self):
        super().__init__('optimized_ai_bridge_node')
        
        # Publishers and subscribers (similar to previous example)
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
        self.joint_cmd_publisher = self.create_publisher(JointState, '/joint_commands', 10)
        
        self.joint_state_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.joint_state_callback,
            10
        )
        
        # Separate thread for AI processing
        self.ai_input_queue = Queue(maxsize=10)  # Limit queue size
        self.ai_output_queue = Queue(maxsize=10)
        
        # Start AI processing thread
        self.ai_thread = threading.Thread(target=self.ai_processing_loop, daemon=True)
        self.ai_thread.start()
        
        # Timer for sensor data processing
        self.sensor_timer = self.create_timer(0.02, self.process_sensor_data)  # 50 Hz
        self.get_logger().info('Optimized AI Bridge Node initialized')
    
    def joint_state_callback(self, msg):
        """Non-blocking sensor data processing"""
        try:
            # Convert to numpy array for efficient processing
            joint_pos = np.array(msg.position)
            joint_vel = np.array(msg.velocity)
            
            # Prepare sensor data packet
            sensor_data = {
                'timestamp': time.time(),
                'joint_positions': joint_pos,
                'joint_velocities': joint_vel,
                'joint_names': msg.name
            }
            
            # Add to AI input queue if there's space
            try:
                self.ai_input_queue.put_nowait(sensor_data)
            except:
                # Queue is full, drop the oldest data
                try:
                    self.ai_input_queue.get_nowait()
                    self.ai_input_queue.put_nowait(sensor_data)
                except:
                    pass  # Still full, drop this data point
        except Exception as e:
            self.get_logger().error(f'Error in joint callback: {str(e)}')
    
    def ai_processing_loop(self):
        """Dedicated thread for AI processing"""
        while rclpy.ok():
            try:
                # Get the most recent sensor data
                sensor_data = None
                while True:
                    try:
                        sensor_data = self.ai_input_queue.get_nowait()
                    except Empty:
                        break  # No more recent data
                
                if sensor_data is not None:
                    # Process with AI model (this can be computationally expensive)
                    control_output = self.process_with_ai(sensor_data)
                    
                    # Add to output queue
                    try:
                        self.ai_output_queue.put_nowait(control_output)
                    except:
                        # Output queue full, drop the result
                        pass
                        
            except Exception as e:
                self.get_logger().error(f'Error in AI thread: {str(e)}')
            
            # Brief sleep to prevent busy waiting
            time.sleep(0.001)
    
    def process_with_ai(self, sensor_data):
        """AI processing function (runs in separate thread)"""
        # This is where the heavy AI computation happens
        # For example, running a neural network
        pass
    
    def process_sensor_data(self):
        """Process sensor data and send commands"""
        try:
            # Get the most recent AI output
            ai_output = None
            while True:
                try:
                    ai_output = self.ai_output_queue.get_nowait()
                except Empty:
                    break  # No more recent outputs
            
            if ai_output is not None:
                # Execute the commands produced by the AI
                self.execute_control_commands(ai_output)
        except Exception as e:
            self.get_logger().error(f'Error in sensor timer: {str(e)}')

def main(args=None):
    rclpy.init(args=args)
    node = OptimizedAIBridgeNode()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Error Handling and Recovery

Robust AI-ROS bridges must handle errors gracefully:

import traceback
from rclpy.qos import QoSProfile, ReliabilityPolicy

class RobustAIBridgeNode(Node):
    def __init__(self):
        super().__init__('robust_ai_bridge_node')
        
        # Publishers with custom QoS for reliability
        qos_profile = QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE)
        self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', qos_profile)
        
        # Subscribers (with error handling)
        self.joint_state_subscriber = self.create_subscription(
            JointState,
            '/joint_states',
            self.safe_joint_state_callback,
            qos_profile
        )
        
        # State tracking
        self.system_mode = 'normal'  # normal, degraded, emergency
        self.error_count = 0
        self.max_errors_before_recovery = 5
        
        # Recovery timer
        self.recovery_timer = self.create_timer(1.0, self.monitor_system_health)
        
        self.get_logger().info('Robust AI Bridge Node initialized')
    
    def safe_joint_state_callback(self, msg):
        """Safe callback with error handling"""
        try:
            self.joint_state_callback(msg)
        except Exception as e:
            self.error_count += 1
            error_msg = f'Joint state callback error: {str(e)}\n{traceback.format_exc()}'
            self.get_logger().error(error_msg)
            self.trigger_error_handling()
    
    def trigger_error_handling(self):
        """Handle errors appropriately"""
        if self.error_count >= self.max_errors_before_recovery:
            self.get_logger().error('Too many errors, entering emergency mode')
            self.system_mode = 'emergency'
            self.emergency_stop()
        elif self.error_count >= self.max_errors_before_recovery // 2:
            self.get_logger().warn('High error rate, entering degraded mode')
            self.system_mode = 'degraded'
    
    def emergency_stop(self):
        """Stop all robot motion"""
        # Publish zero velocity commands
        stop_cmd = Twist()
        self.cmd_vel_publisher.publish(stop_cmd)
        
        # Reset joint commands to safe positions
        safe_joint_cmd = JointState()
        safe_joint_cmd.header.stamp = self.get_clock().now().to_msg()
        # Add safe joint positions here
        self.joint_cmd_publisher.publish(safe_joint_cmd)
    
    def monitor_system_health(self):
        """Monitor system health and attempt recovery"""
        if self.system_mode == 'emergency':
            self.get_logger().info('Attempting system recovery...')
            # Reset error count to allow recovery
            self.error_count = 0
            self.system_mode = 'normal'
            self.get_logger().info('System recovery attempted')

Best Practices for AI-ROS Integration

1. Keep AI Processing Separate

Use separate threads or processes for AI computation to avoid blocking ROS communication.

2. Use Appropriate QoS Settings

For control commands, use reliable delivery. For sensor data, best-effort might be sufficient.

3. Implement Proper Error Handling

Always include try-catch blocks around AI processing and implement recovery strategies.

4. Monitor Performance

Track processing times and system load to ensure real-time performance.

5. Log Thoroughly

Keep detailed logs to debug issues that may arise from the AI-ROS interaction.

Summary

Bridging Python AI agents with ROS 2 controllers is a critical capability for modern humanoid robots. This chapter covered the fundamental concepts:

  • Using rclpy to create nodes that interface between AI systems and ROS 2
  • Implementing proper communication patterns between AI and control systems
  • Optimizing performance with threading and queuing
  • Implementing error handling and recovery strategies
  • Following best practices for robust integration

The bridge between AI algorithms and real robot control is where the intelligence of the robot meets the physical world. Proper design of this interface is critical for safe, reliable, and effective humanoid robot operation.

Exercises

  1. Create a simple bridge node that reads joint states and computes a simple control policy
  2. Implement error handling in your bridge to handle sensor failures gracefully
  3. Add a feedback loop that adjusts AI behavior based on control performance

Next Steps

In the next chapter, we'll explore how to model humanoid robots using URDF (Unified Robot Description Format), which is essential for simulation and visualization in ROS 2. We'll see how the joint structures we've been discussing connect to the physical model of the robot.