| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ |
| Generate responses given a dataset of prompts |
| """ |
|
|
| import os |
|
|
| import aiohttp |
| import hydra |
| import numpy as np |
| import ray |
|
|
# Set NCCL log verbosity and HF tokenizers parallelism before the imports that follow.
os.environ.update({"NCCL_DEBUG": "WARN", "TOKENIZERS_PARALLELISM": "true"})
| |
|
|
| import asyncio |
| from pprint import pprint |
|
|
| import pandas as pd |
| from omegaconf import OmegaConf |
| from openai.types.chat import ChatCompletion |
|
|
| from verl.utils.hdfs_io import makedirs |
| from verl.workers.rollout.replica import get_rollout_replica_class |
|
|
|
|
async def start_server(config):
    """Launch standalone rollout server replicas, one per tensor-parallel GPU group.

    Args:
        config: Hydra/OmegaConf config. Reads ``actor_rollout_ref.rollout``
            (``tensor_model_parallel_size``, ``name``), ``actor_rollout_ref.model``,
            ``trainer.n_gpus_per_node`` and ``trainer.nnodes``.

    Returns:
        ``(server_handles, server_addresses)``: two lists with one entry per replica,
        read from each replica's ``_server_handle`` / ``_server_address`` after its
        ``init_standalone()`` has completed.

    Raises:
        ValueError: if ``tensor_model_parallel_size`` exceeds the total number of
            GPUs, so not even one replica can be started.
    """
    rollout_config = config.actor_rollout_ref.rollout
    model_config = config.actor_rollout_ref.model
    tp_size = rollout_config.tensor_model_parallel_size
    total_gpus = config.trainer.n_gpus_per_node * config.trainer.nnodes

    # Integer division: GPUs left over after packing whole TP groups stay unused.
    num_replicas = total_gpus // tp_size
    if num_replicas < 1:
        # Fail early with a clear message instead of failing later when the
        # prompts are split across zero replicas.
        raise ValueError(
            f"Cannot start any rollout replica: tensor_model_parallel_size={tp_size} "
            f"exceeds the {total_gpus} GPU(s) available (n_gpus_per_node * nnodes)."
        )

    rollout_server_class = get_rollout_replica_class(rollout_config.name)
    rollout_servers = [
        rollout_server_class(
            replica_rank=replica_rank,
            config=rollout_config,
            model_config=model_config,
            gpus_per_node=config.trainer.n_gpus_per_node,
        )
        for replica_rank in range(num_replicas)
    ]
    # Bring all replicas up concurrently.
    await asyncio.gather(*[server.init_standalone() for server in rollout_servers])

    server_handles = [server._server_handle for server in rollout_servers]
    server_addresses = [server._server_address for server in rollout_servers]
    assert len(server_handles) == num_replicas
    assert len(server_addresses) == num_replicas

    return server_handles, server_addresses
|
|
|
|
async def submit_request(server_address, **chat_complete_request):
    """POST one OpenAI-style chat completion request to a rollout server.

    Args:
        server_address: ``host:port`` of the server; the request is sent to
            ``http://{server_address}/v1/chat/completions``.
        **chat_complete_request: JSON body fields (``model``, ``messages``,
            sampling parameters, ...). An optional ``extra_headers`` dict is
            removed from the body and merged into the HTTP headers.

    Returns:
        The response body parsed into a ``ChatCompletion``.

    Raises:
        RuntimeError: if the server responds with an HTTP error status (>= 400).
    """
    extra_headers = chat_complete_request.pop("extra_headers", {})
    # total=None disables aiohttp's overall request timeout.
    timeout = aiohttp.ClientTimeout(total=None)
    # ``async with`` guarantees the session is closed on every exit path,
    # including failures while the session itself is being created.
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(
            url=f"http://{server_address}/v1/chat/completions",
            headers={"Authorization": "Bearer token-abc123", **extra_headers},
            json=chat_complete_request,
        ) as resp:
            data = await resp.json()
            status = resp.status
    if status >= 400:
        # Surface the server's error payload instead of an opaque parse failure.
        raise RuntimeError(
            f"Chat completion request to {server_address} failed with HTTP {status}: {data}"
        )
    return ChatCompletion(**data)
|
|
|
|
async def generate_per_replica(server_address, model_path: str, n_samples: int, sampling_params: dict, chat_lst: list):
    """Send ``n_samples`` chat completion requests per conversation to a single server.

    Args:
        server_address: ``host:port`` of the target rollout server.
        model_path: value sent as the request's ``model`` field.
        n_samples: number of independent requests issued for each conversation.
        sampling_params: extra body fields merged into every request.
        chat_lst: conversations; each one is sent as the ``messages`` field.

    Returns:
        List of responses in request order: all ``n_samples`` responses for
        ``chat_lst[0]``, then those for ``chat_lst[1]``, and so on.
    """
    payloads = []
    for conversation in chat_lst:
        for _sample in range(n_samples):
            payloads.append({"model": model_path, "messages": conversation, **sampling_params})

    # All requests are issued concurrently; gather keeps results in payload order.
    pending = [submit_request(server_address, **payload) for payload in payloads]
    return await asyncio.gather(*pending)
|
|
|
|
async def generate(
    server_addresses: list, model_path: str, n_samples: int, sampling_params: dict, chat_numpy: np.ndarray
):
    """Shard the prompts across rollout servers and generate on all of them concurrently.

    ``chat_numpy`` is split with ``np.array_split`` into ``len(server_addresses)``
    contiguous chunks; chunk ``i`` is sent to ``server_addresses[i]``.

    Returns:
        One entry per server, in ``server_addresses`` order, each being the list
        returned by ``generate_per_replica`` for that server's chunk.
    """
    shards = [part.tolist() for part in np.array_split(chat_numpy, len(server_addresses))]
    assert len(server_addresses) == len(shards)

    jobs = [
        generate_per_replica(address, model_path, n_samples, sampling_params, shard)
        for address, shard in zip(server_addresses, shards)
    ]
    return await asyncio.gather(*jobs)
|
|
|
|
@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
def main(config):
    """Generate ``rollout.n`` responses for every prompt in the training parquet files.

    Starts standalone rollout servers, sends every prompt to them through the
    OpenAI-compatible chat completions endpoint, and writes the concatenated input
    dataset plus a ``responses`` column (one list of ``n`` strings per row) to
    ``config.data.output_path``.
    """
    import itertools

    from omegaconf import ListConfig

    ray.init(runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_USE_V1": "1"}})

    pprint(OmegaConf.to_container(config, resolve=True))
    OmegaConf.resolve(config)

    rollout_config = config.actor_rollout_ref.rollout
    n_samples = rollout_config.n

    # Greedy decoding would produce identical samples, so only one is allowed.
    if rollout_config.temperature == 0.0:
        assert n_samples == 1, "When temperature=0, n_samples must be 1."
    assert n_samples >= 1, "n_samples should always >= 1"

    sampling_params = {
        "temperature": rollout_config.temperature,
        "top_p": rollout_config.top_p,
        "max_tokens": rollout_config.response_length,
    }

    train_files = config.data.train_files
    if not isinstance(train_files, list | ListConfig):
        train_files = [train_files]

    dataset = pd.concat([pd.read_parquet(path) for path in train_files], axis=0, ignore_index=True)
    # Each prompt cell must support .tolist(); presumably a parquet list column
    # of chat messages -- verify against the dataset schema.
    chat_lst = [chat.tolist() for chat in dataset[config.data.prompt_key].tolist()]

    # Build a 1-D object array explicitly (one element per conversation):
    # np.array(chat_lst) raises when conversations have different lengths.
    chat_numpy = np.empty(len(chat_lst), dtype=object)
    for idx, chat in enumerate(chat_lst):
        chat_numpy[idx] = chat

    # server_handles is not used below; NOTE(review): presumably kept bound so the
    # Ray server actors stay referenced during generation -- confirm.
    server_handles, server_addresses = asyncio.run(start_server(config))

    gen_results = asyncio.run(
        generate(server_addresses, config.actor_rollout_ref.model.path, n_samples, sampling_params, chat_numpy)
    )

    # gen_results holds one list per replica over contiguous prompt chunks, each
    # ordered prompt-major / sample-minor, so flattening restores prompt order.
    contents = [result.choices[0].message.content for result in itertools.chain.from_iterable(gen_results)]
    # dtype=object avoids a fixed-width unicode array padded to the longest response.
    results = np.array(contents, dtype=object).reshape(-1, n_samples)
    assert results.shape == (len(chat_lst), n_samples)

    dataset["responses"] = results.tolist()

    output_path = config.data.output_path
    output_dir = os.path.dirname(output_path)
    # A bare filename has no directory component; an empty path cannot be created.
    if output_dir:
        makedirs(output_dir, exist_ok=True)
    print(f"Saving results to {output_path}")
    dataset.to_parquet(output_path)
|
|
|
|
# Script entry point: the @hydra.main decorator supplies `config`, so main() takes no arguments here.
if __name__ == "__main__":
    main()
|
|