File size: 2,000 Bytes
ef6a683 e9c7386 ef6a683 66e65e2 68cf2b8 ef6a683 66e65e2 68cf2b8 ef6a683 66e65e2 ef6a683 66e65e2 ef6a683 e9c7386 ef6a683 66e65e2 ef6a683 e9c7386 ef6a683 e9c7386 ef6a683 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
#!/usr/bin/env python3
import sys
from pathlib import Path
import time
# Add sim module to path
sys.path.insert(0, str(Path(__file__).parent))
import yaml
from sim.simulator_factory import SimulatorFactory, init_channel
def main(n_envs=1, use_async_envs: bool = False,
publish_images=True, camera_port=5555, cameras=None, **kwargs):
# Load config
config_path = Path(__file__).parent / "config.yaml"
with open(config_path) as f:
config = yaml.safe_load(f)
# Override config with default values
enable_offscreen = publish_images or config.get("ENABLE_OFFSCREEN", False)
# Configure cameras if requested
camera_configs = {}
if enable_offscreen:
camera_list = cameras or ["head_camera"]
for cam_name in camera_list:
camera_configs[cam_name] = {"height": 480, "width": 640}
print(f"📷 Cameras: {', '.join(camera_list)} → ZMQ port {camera_port}")
print("="*60)
# Initialize DDS channel
init_channel(config=config)
# Create simulator
sim = SimulatorFactory.create_simulator(
config=config,
env_name="default",
onscreen=config.get("ENABLE_ONSCREEN", True),
offscreen=enable_offscreen,
camera_configs=camera_configs,
)
# Start simulator
print("\nSimulator running. Press Ctrl+C to exit.")
if enable_offscreen and publish_images:
print(f"Camera images publishing on tcp://localhost:{camera_port}")
try:
if publish_images:
sim.start_image_publish_subprocess(
start_method="spawn",
camera_port=camera_port,
)
time.sleep(1)
sim.start()
except KeyboardInterrupt:
print("+++++Simulator interrupted by user.")
except Exception as e:
print(f"++++error in simulator: {e} ++++")
finally:
print("++++closing simulator ++++")
sim.close()
if __name__ == "__main__":
main()
|