--- sidebar_position: 5 --- # Chapter 4: Integrating Gazebo and Unity ## Learning Objectives - Understand how to combine Gazebo and Unity for comprehensive robot simulation - Learn architectural approaches for multi-platform simulation - Implement data synchronization between Gazebo and Unity - Create seamless workflows that leverage the strengths of both platforms - Design hybrid simulation environments for humanoid robots ## Introduction to Multi-Platform Simulation Using both Gazebo and Unity in a single robotics pipeline allows us to leverage the strengths of each platform: - **Gazebo**: Excellent physics simulation, established ROS integration, sensor modeling - **Unity**: High-fidelity rendering, realistic visualization, advanced graphics capabilities The integration of both platforms enables a complete simulation pipeline where physics-correct simulation occurs in Gazebo while high-fidelity visualization and human interaction happen in Unity. ## Approaches to Integration ### 1. Parallel Simulation with Synchronization Run both simulators independently but keep their states synchronized: ``` [Robot Controller] → [Gazebo (Physics)] ↔ [Synchronization Layer] ↔ [Unity (Rendering)] ↓ ↓ ↓ [Physics Results] [State Update] [Visualization] ``` ### 2. Specialized Task Allocation - **Gazebo**: Physics simulation, sensor data generation, navigation planning - **Unity**: High-fidelity rendering, human-robot interaction, realistic scene visualization ### 3. Hybrid Architecture Use a middleware layer to manage data flow between both simulators: ``` [ROS 2 Nodes] | [Middleware Layer] |---------|---------| | | | [Gazebo] [Unity] [Other Tools] (Physics) (Rendering) ``` ## Implementation Architecture ### Synchronization Layer The synchronization layer is critical for maintaining consistency between both simulators: ```python import rclpy from rclpy.node import Node from std_msgs.msg import String from sensor_msgs.msg import JointState from geometry_msgs.msg import PoseStamped, Twist import time import threading class SimulationSynchronizer(Node): def __init__(self): super().__init__('simulation_synchronizer') # Publishers for both simulators self.gazebo_joint_pub = self.create_publisher(JointState, '/gazebo/joint_commands', 10) self.unity_joint_pub = self.create_publisher(JointState, '/unity/joint_commands', 10) # Subscribers from both simulators self.gazebo_state_sub = self.create_subscription( JointState, '/gazebo/joint_states', self.gazebo_state_callback, 10) self.unity_state_sub = self.create_subscription( JointState, '/unity/joint_states', self.unity_state_callback, 10) # Sync timer self.sync_timer = self.create_timer(0.01, self.sync_states) # 100 Hz sync # State storage self.gazebo_state = JointState() self.unity_state = JointState() self.last_sync_time = time.time() self.get_logger().info('Simulation Synchronizer initialized') def gazebo_state_callback(self, msg): """Handle joint states from Gazebo""" self.gazebo_state = msg # Forward to Unity for visualization self.unity_joint_pub.publish(msg) def unity_state_callback(self, msg): """Handle joint states from Unity visualization""" self.unity_state = msg # Could forward to Gazebo if needed # In typical setup, Unity is "slave" to physics simulation def sync_states(self): """Synchronize states between simulators""" # In this example, Gazebo is authoritative for physics # Unity follows Gazebo's state for visualization # Check for significant desync (e.g., due to network delay) if self.gazebo_state.name and self.unity_state.name: if len(self.gazebo_state.position) == len(self.unity_state.position): max_diff = 0 for i in range(len(self.gazebo_state.position)): diff = abs(self.gazebo_state.position[i] - self.unity_state.position[i]) max_diff = max(max_diff, diff) if max_diff > 0.1: # Threshold for significant desync self.get_logger().warn(f'Significant desync detected: {max_diff} rad') # Keep Unity in sync with Gazebo state if self.gazebo_state.name and self.gazebo_state.position: sync_msg = JointState() sync_msg.header.stamp = self.get_clock().now().to_msg() sync_msg.name = self.gazebo_state.name sync_msg.position = self.gazebo_state.position sync_msg.velocity = self.gazebo_state.velocity sync_msg.effort = self.gazebo_state.effort # Send to Unity to update visualization self.unity_joint_pub.publish(sync_msg) def main(args=None): rclpy.init(args=args) synchronizer = SimulationSynchronizer() try: rclpy.spin(synchronizer) except KeyboardInterrupt: pass finally: synchronizer.destroy_node() rclpy.shutdown() ``` ## Advanced Synchronization Techniques ### State Prediction for Network Delays When there are network delays between simulators, we can use prediction: ```python import numpy as np from collections import deque class PredictiveSynchronizer: def __init__(self): self.state_history = deque(maxlen=10) # Store last 10 states self.network_delay_estimate = 0.05 # 50ms delay estimate def add_state(self, state, timestamp): """Add state to history for prediction""" self.state_history.append({ 'state': state, 'timestamp': timestamp }) def predict_state(self, target_time): """Predict state at a future time""" if len(self.state_history) < 2: return None # Simple linear extrapolation between last two states recent_states = list(self.state_history)[-2:] if len(recent_states) < 2: return recent_states[0]['state'] state1 = recent_states[0] state2 = recent_states[1] dt = state2['timestamp'] - state1['timestamp'] if dt <= 0: return state2['state'] # Calculate velocity (simplified for joint positions) dt_target = target_time - state2['timestamp'] predicted_state = JointState() predicted_state.name = state2['state'].name predicted_state.position = [] for i in range(len(state2['state'].position)): if i < len(state1['state'].position): velocity = (state2['state'].position[i] - state1['state'].position[i]) / dt predicted_pos = state2['state'].position[i] + velocity * dt_target predicted_state.position.append(predicted_pos) else: predicted_state.position.append(state2['state'].position[i]) return predicted_state ``` ## Data Exchange Patterns ### 1. Sensor Data Exchange ```python from sensor_msgs.msg import LaserScan, Image, Imu from nav_msgs.msg import Odometry class SensorDataExchange: def __init__(self, node): self.node = node # Publishers to both simulators self.gazebo_sensor_pub = node.create_publisher(LaserScan, '/gazebo/scanner', 10) self.unity_sensor_pub = node.create_publisher(Image, '/unity/camera', 10) # Subscribers from both simulators self.gazebo_odom_sub = node.create_subscription( Odometry, '/gazebo/odom', self.gazebo_odom_callback, 10) self.unity_imu_sub = node.create_subscription( Imu, '/unity/imu', self.unity_imu_callback, 10) # Synchronization flag self.odom_sync_enabled = True self.imu_sync_enabled = True def gazebo_odom_callback(self, msg): """Forward Gazebo odometry to Unity for visualization""" if self.odom_sync_enabled: # Convert odometry to transform for Unity # This would typically send pose data to Unity visualization pass def unity_imu_callback(self, msg): """Forward Unity IMU to Gazebo for simulation""" if self.imu_sync_enabled: # In a real implementation, Unity might send IMU data back to Gazebo # for consistency between visual and physical simulation pass ``` ### 2. Control Command Distribution ```python class ControlDistribution: def __init__(self, node): self.node = node # Subscriber for high-level commands self.cmd_sub = node.create_subscription( Twist, '/cmd_vel', self.command_callback, 10) # Publishers to both simulators self.gazebo_cmd_pub = node.create_publisher(Twist, '/gazebo/cmd_vel', 10) self.unity_cmd_pub = node.create_publisher(Twist, '/unity/cmd_vel', 10) # Control parameters self.distribution_mode = "duplicate" # duplicate or split by function def command_callback(self, msg): """Distribute commands to both simulators""" if self.distribution_mode == "duplicate": # Send the same command to both simulators self.gazebo_cmd_pub.publish(msg) self.unity_cmd_pub.publish(msg) elif self.distribution_mode == "function_split": # Different parts of the command go to different simulators # e.g., locomotion to Gazebo, head movement to Unity self.distribute_by_function(msg) def distribute_by_function(self, cmd): """Distribute command based on function""" # Send translational movement to Gazebo (physics) gazebo_cmd = Twist() gazebo_cmd.linear.x = cmd.linear.x gazebo_cmd.linear.y = cmd.linear.y gazebo_cmd.angular.z = cmd.angular.z # Turning self.gazebo_cmd_pub.publish(gazebo_cmd) # Send head movement to Unity (visualization) unity_cmd = Twist() unity_cmd.angular.x = cmd.angular.x # Neck pitch unity_cmd.angular.y = cmd.angular.y # Neck yaw self.unity_cmd_pub.publish(unity_cmd) ``` ## Real-World Implementation Example ### Launch File for Integrated Simulation ```xml from launch import LaunchDescription from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument from launch.launch_description_sources import PythonLaunchDescriptionSource from launch.substitutions import PathJoinSubstitution, TextSubstitution from launch_ros.actions import Node from launch_ros.substitutions import FindPackageShare def generate_launch_description(): # Launch Gazebo simulation gazebo_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource([ PathJoinSubstitution([ FindPackageShare('gazebo_ros'), 'launch', 'gazebo.launch.py' ]) ]), launch_arguments={ 'world': PathJoinSubstitution([ FindPackageShare('humanoid_simulation'), 'worlds', 'humanoid_world.world' ]) }.items() ) # Launch Unity simulation (this would connect via TCP) unity_node = Node( package='unity_simulation', executable='unity_bridge', name='unity_bridge', parameters=[ {'unity_ip': '127.0.0.1'}, {'unity_port': 10000} ] ) # Launch synchronization node sync_node = Node( package='simulation_integration', executable='simulation_synchronizer', name='simulation_synchronizer' ) # Launch sensor data exchange sensor_exchange_node = Node( package='simulation_integration', executable='sensor_exchange', name='sensor_exchange' ) # Launch control distribution control_dist_node = Node( package='simulation_integration', executable='control_distribution', name='control_distribution' ) return LaunchDescription([ gazebo_launch, unity_node, sync_node, sensor_exchange_node, control_dist_node ]) ``` ## Visualization Data Pipeline ### Converting Gazebo Data for Unity ```csharp // Unity C# script to receive data from Gazebo via ROS-TCP-Connector using UnityEngine; using Unity.Robotics.ROSTCPConnector; using Unity.Robotics.ROSTCPConnector.MessageTypes.Sensor_msgs; using Unity.Robotics.ROSTCPConnector.MessageTypes.Geometry_msgs; public class GazeboDataReceiver : MonoBehaviour { ROSConnection ros; public Transform robotRoot; public Transform[] jointTransforms; public string[] jointNames; void Start() { ros = ROSConnection.instance; ros.Subscribe("/gazebo/joint_states", JointStateCallback); } void JointStateCallback(JointStateMsg jointState) { for (int i = 0; i < jointState.name.Length; i++) { string jointName = jointState.name[i]; double position = jointState.position[i]; // Find corresponding joint in Unity model for (int j = 0; j < jointNames.Length; j++) { if (jointNames[j] == jointName && j < jointTransforms.Length) { // Apply position to Unity joint (convert from radians to degrees) jointTransforms[j].localEulerAngles = new Vector3( 0, (float)(position * Mathf.Rad2Deg), 0 ); break; } } } // Update robot position and orientation // This would come from a separate pose topic in a complete implementation } } ``` ## Performance Considerations ### Optimization Strategies 1. **Selective Synchronization**: Only sync states that are critical for visual consistency 2. **Data Compression**: Reduce data size through quantization or sampling 3. **Asynchronous Processing**: Use threading to prevent blocking operations 4. **Caching**: Cache frequently accessed transformations ```python class OptimizedSynchronizer: def __init__(self): self.last_sync_values = {} # Cache last synced values self.sync_threshold = 0.01 # Only sync if change exceeds threshold self.compression_factor = 4 # Reduce update frequency def should_sync_joint(self, joint_name, new_value): """Determine if a joint needs synchronization""" if joint_name not in self.last_sync_values: return True old_value = self.last_sync_values[joint_name] return abs(new_value - old_value) > self.sync_threshold def compress_state(self, joint_state): """Compress joint state data""" compressed = JointState() compressed.header = joint_state.header # Only include joints that have changed significantly for i, name in enumerate(joint_state.name): if i < len(joint_state.position): if self.should_sync_joint(name, joint_state.position[i]): compressed.name.append(name) compressed.position.append(joint_state.position[i]) self.last_sync_values[name] = joint_state.position[i] return compressed ``` ## Debugging Multi-Platform Simulations ### Diagnostic Tools ```python class SimulationDiagnostics(Node): def __init__(self): super().__init__('simulation_diagnostics') # Publishers for diagnostic data self.diag_pub = self.create_publisher(String, '/simulation_diagnostics', 10) # Diagnostic timer self.diag_timer = self.create_timer(5.0, self.publish_diagnostics) # State tracking self.gazebo_active = True self.unity_active = True self.sync_deviation = 0.0 self.network_latency = 0.0 def publish_diagnostics(self): """Publish diagnostic information""" diag_msg = String() diag_msg.data = f""" Simulation Diagnostics: - Gazebo Active: {self.gazebo_active} - Unity Active: {self.unity_active} - Sync Deviation: {self.sync_deviation:.3f} rad - Network Latency: {self.network_latency:.3f} ms - Last Sync: {time.time()} """ self.diag_pub.publish(diag_msg) ``` ## Use Cases for Integrated Simulation ### 1. Human-Robot Interaction Studies - Physics-correct robot behavior in Gazebo - Realistic human avatars and environments in Unity - Accurate sensor simulation in Gazebo for perception - High-fidelity rendering for human participants ### 2. Training AI Models - Physics-accurate simulation for control training (Gazebo) - High-fidelity rendering for perception training (Unity) - Synthetic data generation with ground truth from both platforms ### 3. System Integration Testing - Complete robot software stack with both simulators - Validation of perception-action loops - Human-in-the-loop testing with realistic interfaces ## Troubleshooting Common Issues 1. **Desynchronization**: Implement state prediction and correction 2. **Network Latency**: Optimize data transmission and add buffering 3. **Performance**: Use selective synchronization and data compression 4. **Data Format Mismatch**: Ensure consistent units and coordinate frames ## Summary This chapter covered the integration of Gazebo and Unity for comprehensive humanoid robot simulation: - Architectural approaches for multi-platform simulation - Implementation of synchronization layers between simulators - Advanced techniques for data exchange and performance optimization - Real-world implementation examples and use cases - Debugging and troubleshooting strategies for integrated simulations The integration of Gazebo and Unity provides a powerful platform for humanoid robot development, combining accurate physics simulation with high-fidelity visualization. This approach enables more comprehensive testing and development of humanoid robots in a realistic virtual environment. ## Exercises 1. Set up a simple synchronization node between two simulated robots 2. Create a launch file that starts both Gazebo and Unity with synchronization 3. Implement a compression algorithm for joint state data ## Next Steps With a complete understanding of physics simulation, sensor modeling, high-fidelity rendering, and simulation integration, you're now ready to explore the NVIDIA Isaac ecosystem in the next module. Isaac provides GPU-accelerated perception and AI capabilities that complement the simulation foundation you've built.