| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| import logging |
| import os |
|
|
| import ray |
| import yaml |
|
|
| from verl.workers.config.rollout import PrometheusConfig |
|
|
# Module-level logger, named after this file's path. Its threshold is read from
# the VERL_LOGGING_LEVEL environment variable and falls back to "WARN" when the
# variable is not set.
logger = logging.getLogger(__file__)
logger.setLevel(os.environ.get("VERL_LOGGING_LEVEL", "WARN"))
|
|
|
|
def _dispatch_to_nodes(remote_fn, node_ips, *args):
    """Run ``remote_fn(*args)`` once per address in ``node_ips`` and wait for all results."""
    refs = [
        # Requesting a sliver of the per-node "node:<ip>" resource pins the task to that node.
        remote_fn.options(resources={f"node:{ip}": 0.001}).remote(*args)
        for ip in node_ips
    ]
    return ray.get(refs)


def update_prometheus_config(config: PrometheusConfig, server_addresses: list[str], rollout_name: str | None = None):
    """
    Write a Prometheus configuration listing the rollout servers and ask Prometheus to reload it.

    The configuration file is written to ``config.file`` on every alive Ray node, and a
    reload request (``POST /-/reload`` on ``config.port``) is then sent on every alive node.
    The whole operation is best-effort: any failure is logged and never raised to the caller.

    Args:
        config: Prometheus settings; ``config.file`` is the path of the configuration file
            to write and ``config.port`` is the port used for the reload request.
        server_addresses: vllm or sglang server addresses used as scrape targets of the
            "rollout" job. When empty, a warning is logged and nothing is written.
        rollout_name: name of the rollout backend (e.g., "vllm", "sglang"); only used in
            the status message.
    """
    if not server_addresses:
        logger.warning("No server addresses available to update Prometheus config")
        return

    try:
        prometheus_config_json = {
            "global": {"scrape_interval": "10s", "evaluation_interval": "10s"},
            "scrape_configs": [
                {
                    "job_name": "ray",
                    "file_sd_configs": [{"files": ["/tmp/ray/prom_metrics_service_discovery.json"]}],
                },
                # Copy into a plain list so the YAML output is a simple sequence.
                {"job_name": "rollout", "static_configs": [{"targets": list(server_addresses)}]},
            ],
        }

        @ray.remote(num_cpus=0)
        def write_config_file(config_data, config_path):
            """Dump ``config_data`` as YAML to ``config_path`` on the node running this task."""
            config_dir = os.path.dirname(config_path)
            # A bare file name has an empty dirname, and os.makedirs("") would raise.
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                yaml.dump(config_data, f, default_flow_style=False, indent=2)
            return True

        @ray.remote(num_cpus=0)
        def reload_prometheus(port):
            """POST to the local ``/-/reload`` endpoint; return True only if curl succeeded."""
            import socket
            import subprocess

            ip_address = socket.gethostbyname(socket.gethostname())
            reload_url = f"http://{ip_address}:{port}/-/reload"

            # Messages use print (as the original task did) rather than the module logger.
            try:
                result = subprocess.run(
                    # --fail turns HTTP error responses into a non-zero exit code.
                    ["curl", "--fail", "-X", "POST", reload_url],
                    capture_output=True,
                    text=True,
                    timeout=10,
                )
            except (OSError, subprocess.SubprocessError) as exc:
                # curl missing or timed out: the reload stays best-effort, but is reported.
                print(f"Could not send Prometheus reload request to {reload_url}: {exc}")
                return False

            if result.returncode != 0:
                print(
                    f"Prometheus reload request to {reload_url} did not succeed "
                    f"(curl exit code {result.returncode}): {result.stderr.strip()}"
                )
                return False

            print(f"Reloading Prometheus on node: {reload_url}")
            return True

        node_ips = [node["NodeManagerAddress"] for node in ray.nodes() if node["Alive"]]

        # Every alive node gets its own copy of the configuration file.
        _dispatch_to_nodes(write_config_file, node_ips, prometheus_config_json, config.file)

        server_type = rollout_name.upper() if rollout_name else "rollout"
        print(f"Updated Prometheus configuration at {config.file} with {len(server_addresses)} {server_type} servers")

        # The reload is attempted on every alive node; failed attempts are reported
        # by the task itself and do not abort the update.
        _dispatch_to_nodes(reload_prometheus, node_ips, config.port)

    except Exception as e:
        # Best-effort boundary: metrics setup must not break the caller, but keep the traceback.
        logger.exception("Failed to update Prometheus configuration: %s", e)
|
|