--- sidebar_position: 3 --- # Chapter 2: ROS 2 Nodes, Topics, and Services ## Learning Objectives - Understand the fundamental communication patterns in ROS 2 - Create and implement ROS 2 nodes for specific functionality - Master the publish/subscribe communication model using topics - Implement request/response communication using services - Apply communication patterns to humanoid robot systems ## Nodes in ROS 2 A node is an executable that uses ROS 2 to communicate with other nodes. Nodes are the fundamental building blocks of a ROS 2 program. A single system might have many nodes running at once, each performing a specific task. ### Creating a Node In Python, a node is created by extending the `Node` class from `rclpy`: ```python import rclpy from rclpy.node import Node class MinimalPublisher(Node): def __init__(self): super().__init__('minimal_publisher') self.publisher = self.create_publisher(String, 'topic', 10) timer_period = 0.5 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = 'Hello World: %d' % self.i self.publisher.publish(msg) self.get_logger().info('Publishing: "%s"' % msg.data) self.i += 1 def main(args=None): rclpy.init(args=args) minimal_publisher = MinimalPublisher() rclpy.spin(minimal_publisher) minimal_publisher.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ### Node Lifecycle ROS 2 nodes have a well-defined lifecycle that includes: - **Unconfigured**: Initial state after creation - **Inactive**: Configured but not active - **Active**: Fully operational and running - **Finalized**: Cleanup phase before deletion ## Topics and Messages Topics are named buses over which nodes exchange messages. Messages are the data packets sent from publisher nodes to subscriber nodes over topics. The publish/subscribe paradigm is a core communication pattern in ROS 2. ### Message Types ROS 2 provides a rich set of standard message types in the `std_msgs` package: - `String`: Text data - `Int32`, `Float32`: Numeric data - `Bool`: Boolean values - `Header`: Timestamp and frame information Additionally, there are more specialized message types in packages like `sensor_msgs`, `geometry_msgs`, and `nav_msgs`. ### Publishing to a Topic ```python import rclpy from rclpy.node import Node from std_msgs.msg import String class Talker(Node): def __init__(self): super().__init__('talker') self.publisher = self.create_publisher(String, 'chatter', 10) timer_period = 0.5 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = 'Hello World: %d' % self.i self.publisher.publish(msg) self.get_logger().info('Publishing: "%s"' % msg.data) self.i += 1 def main(args=None): rclpy.init(args=args) talker = Talker() rclpy.spin(talker) talker.destroy_node() rclpy.shutdown() ``` ### Subscribing to a Topic ```python import rclpy from rclpy.node import Node from std_msgs.msg import String class Listener(Node): def __init__(self): super().__init__('listener') self.subscription = self.create_subscription( String, 'chatter', self.listener_callback, 10) self.subscription # prevent unused variable warning def listener_callback(self, msg): self.get_logger().info('I heard: "%s"' % msg.data) def main(args=None): rclpy.init(args=args) listener = Listener() rclpy.spin(listener) listener.destroy_node() rclpy.shutdown() ``` ## Services in ROS 2 Services provide a request/reply communication pattern in ROS 2. A service client sends a request to a service server, which processes the request and returns a response. ### Service Types Service types are defined using `.srv` files, which specify the request and response messages: ``` # Request message string name int32 age --- # Response message bool success string message ``` ### Creating a Service Server ```python from rclpy.node import Node from example_interfaces.srv import AddTwoInts class MinimalService(Node): def __init__(self): super().__init__('minimal_service') self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback) def add_two_ints_callback(self, request, response): response.sum = request.a + request.b self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b)) return response def main(): rclpy.init() minimal_service = MinimalService() rclpy.spin(minimal_service) rclpy.shutdown() ``` ### Creating a Service Client ```python import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class MinimalClient(Node): def __init__(self): super().__init__('minimal_client') self.cli = self.create_client(AddTwoInts, 'add_two_ints') while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info('service not available, waiting again...') self.req = AddTwoInts.Request() def send_request(self, a, b): self.req.a = a self.req.b = b self.future = self.cli.call_async(self.req) rclpy.spin_until_future_complete(self, self.future) return self.future.result() def main(): rclpy.init() minimal_client = MinimalClient() response = minimal_client.send_request(1, 2) minimal_client.get_logger().info( 'Result of add_two_ints: for %d + %d = %d' % (1, 2, response.sum)) minimal_client.destroy_node() rclpy.shutdown() ``` ## Quality of Service (QoS) in ROS 2 QoS profiles allow you to configure how messages are delivered between publishers and subscribers. This is important for real-time systems and reliable communication: ```python from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy # Create a QoS profile for real-time performance qos_profile = QoSProfile( depth=10, history=QoSHistoryPolicy.RMW_QOS_HISTORY_POLICY_KEEP_LAST, reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE ) publisher = self.create_publisher(String, 'topic', qos_profile) ``` ## Application to Humanoid Robotics In humanoid robotics, ROS 2 communication patterns are used extensively: ### Joint Control - **Topics**: Joint states published at high frequency - **Services**: Calibration routines, mode switching - **Actions**: Complex movements that take time to complete ### Sensor Integration - **Topics**: Camera images, IMU data, force/torque sensors - **Services**: Sensor configuration, calibration - **Actions**: Long-running sensor tasks like mapping ### Navigation - **Topics**: Odometry, laser scans, costmaps - **Services**: Global planning, costmap updates - **Actions**: Path following, navigation goals ## ROS 2 Tools for Communication ### ros2 topic ```bash # List all topics ros2 topic list # Echo messages on a topic ros2 topic echo /chatter std_msgs/msg/String # Publish a message to a topic ros2 topic pub /chatter std_msgs/msg/String "data: Hello" ``` ### ros2 service ```bash # List all services ros2 service list # Call a service ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 1, b: 2}" ``` ## Example: Simple Humanoid Control Node ```python import rclpy from rclpy.node import Node from std_msgs.msg import String from sensor_msgs.msg import JointState from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint class HumanoidController(Node): def __init__(self): super().__init__('humanoid_controller') # Publishers self.joint_cmd_publisher = self.create_publisher( JointTrajectory, '/joint_trajectory_controller/joint_trajectory', 10 ) # Subscribers self.joint_state_subscriber = self.create_subscription( JointState, '/joint_states', self.joint_state_callback, 10 ) # Command subscriber self.command_subscriber = self.create_subscription( String, '/humanoid_commands', self.command_callback, 10 ) # Store current joint states self.current_joint_states = JointState() self.get_logger().info('Humanoid Controller initialized') def joint_state_callback(self, msg): self.current_joint_states = msg def command_callback(self, msg): command = msg.data self.get_logger().info(f'Received command: {command}') if command == 'wave': self.execute_wave_motion() elif command == 'stand': self.move_to_standing_position() def execute_wave_motion(self): # Create a joint trajectory for waving trajectory = JointTrajectory() trajectory.joint_names = ['right_shoulder_roll', 'right_elbow_pitch'] # Create trajectory points point1 = JointTrajectoryPoint() point1.positions = [0.0, 0.0] # neutral position point1.time_from_start.sec = 1 trajectory.points.append(point1) point2 = JointTrajectoryPoint() point2.positions = [0.5, -0.5] # wave position point2.time_from_start.sec = 2 trajectory.points.append(point2) point3 = JointTrajectoryPoint() point3.positions = [0.0, 0.0] # return to neutral point3.time_from_start.sec = 3 trajectory.points.append(point3) # Publish the trajectory self.joint_cmd_publisher.publish(trajectory) def move_to_standing_position(self): # Move to a predefined standing position trajectory = JointTrajectory() trajectory.joint_names = [ 'left_hip_pitch', 'left_knee_pitch', 'left_ankle_pitch', 'right_hip_pitch', 'right_knee_pitch', 'right_ankle_pitch' ] point = JointTrajectoryPoint() point.positions = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # standing position point.time_from_start.sec = 2 trajectory.points.append(point) self.joint_cmd_publisher.publish(trajectory) def main(args=None): rclpy.init(args=args) controller = HumanoidController() rclpy.spin(controller) controller.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ## Summary Nodes, topics, and services form the foundation of communication in ROS 2 systems. Understanding these concepts is essential for developing complex robotic systems like humanoid robots. The publish/subscribe model is ideal for sensor data and state updates, while services are perfect for request/response interactions. Proper use of Quality of Service settings ensures reliable communication for time-critical applications. ## Exercises 1. Create a publisher node that publishes joint commands at 50Hz 2. Create a subscriber node that listens to IMU data and logs orientation 3. Implement a service that takes a target position and plans a trajectory ## Next Steps In the next chapter, we'll explore how to bridge Python AI agents with ROS controllers using rclpy, connecting the AI systems we'll develop with the physical robot control.