unitree-g1-mujoco / run_sim.py
nepyope's picture
Update run_sim.py
e9c7386 verified
#!/usr/bin/env python3
import sys
from pathlib import Path
import time
# Add sim module to path
sys.path.insert(0, str(Path(__file__).parent))
import yaml
from sim.simulator_factory import SimulatorFactory, init_channel
def main(n_envs=1, use_async_envs: bool = False,
publish_images=True, camera_port=5555, cameras=None, **kwargs):
# Load config
config_path = Path(__file__).parent / "config.yaml"
with open(config_path) as f:
config = yaml.safe_load(f)
# Override config with default values
enable_offscreen = publish_images or config.get("ENABLE_OFFSCREEN", False)
# Configure cameras if requested
camera_configs = {}
if enable_offscreen:
camera_list = cameras or ["head_camera"]
for cam_name in camera_list:
camera_configs[cam_name] = {"height": 480, "width": 640}
print(f"📷 Cameras: {', '.join(camera_list)} → ZMQ port {camera_port}")
print("="*60)
# Initialize DDS channel
init_channel(config=config)
# Create simulator
sim = SimulatorFactory.create_simulator(
config=config,
env_name="default",
onscreen=config.get("ENABLE_ONSCREEN", True),
offscreen=enable_offscreen,
camera_configs=camera_configs,
)
# Start simulator
print("\nSimulator running. Press Ctrl+C to exit.")
if enable_offscreen and publish_images:
print(f"Camera images publishing on tcp://localhost:{camera_port}")
try:
if publish_images:
sim.start_image_publish_subprocess(
start_method="spawn",
camera_port=camera_port,
)
time.sleep(1)
sim.start()
except KeyboardInterrupt:
print("+++++Simulator interrupted by user.")
except Exception as e:
print(f"++++error in simulator: {e} ++++")
finally:
print("++++closing simulator ++++")
sim.close()
if __name__ == "__main__":
main()