File size: 7,643 Bytes
24c2665 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
from verl.utils.dataset.rl_dataset import RLHFDataset
from verl.utils.model import compute_position_id_with_mask
import verl.utils.torch_functional as verl_F
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from omegaconf import ListConfig
import os
from typing import List, Union
import copy
import pandas as pd
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, PreTrainedTokenizer
from verl.utils.fs import copy_local_path_from_hdfs
from verl.utils.model import compute_position_id_with_mask
import verl.utils.torch_functional as verl_F
def collate_fn(data_list: list[dict]) -> dict:
    """Merge a list of per-sample dicts into one batch dict.

    Values that are ``torch.Tensor`` instances are grouped per key and stacked
    along a new leading dimension; every other value is grouped per key into a
    NumPy array with ``dtype=object``.

    Args:
        data_list: The samples to combine, one dict per sample.

    Returns:
        A dict holding the stacked tensor entries first, followed by the
        object-array entries. If a key was seen with both tensor and
        non-tensor values, the object array is the one that is kept.
    """
    tensor_columns = {}
    object_columns = {}

    # Route every value into the bucket matching its type, keyed by field name.
    for sample in data_list:
        for name, value in sample.items():
            bucket = tensor_columns if isinstance(value, torch.Tensor) else object_columns
            bucket.setdefault(name, []).append(value)

    batch = {name: torch.stack(values, dim=0) for name, values in tensor_columns.items()}
    # Object arrays are merged last so they override a tensor entry of the same name.
    batch.update({name: np.array(values, dtype=object) for name, values in object_columns.items()})
    return batch
class RLHFDataset(Dataset):
    """Prompt dataset backed by one or more parquet files.

    Every row is expected to carry a chat prompt under ``prompt_key``; the
    remaining columns are passed through unchanged by ``__getitem__``.
    """

    def __init__(self,
                 parquet_files: Union[str, List[str]],
                 tokenizer: "PreTrainedTokenizer",
                 prompt_key='prompt',
                 max_prompt_length=1024,
                 filter_prompts=True,
                 cache_dir='~/.cache/verl/rlhf',
                 chat_template_func=None,
                 return_raw_chat=False,
                 truncation='error',
                 extra_source_key=None,
                 ):
        """Resolve the input files, fetch them and load the prompts.

        Args:
            parquet_files: A single path or a sequence of paths. A path that is
                a local directory is replaced by the ``.parquet`` files it
                directly contains (sorted by name).
            tokenizer: Tokenizer used to apply the chat template and tokenize.
            prompt_key: Name of the column holding the chat prompt.
            max_prompt_length: Prompts whose chat-templated token count exceeds
                this value are dropped when ``filter_prompts`` is true; it is
                also forwarded as ``max_length`` when tokenizing a sample.
            filter_prompts: Whether to drop over-long prompts while loading.
            cache_dir: Directory handed to ``copy_local_path_from_hdfs``; a
                leading ``~`` is expanded.
            chat_template_func: Stored on the instance; not used by this class.
            return_raw_chat: If true, ``__getitem__`` also returns the chat
                under ``raw_prompt``.
            truncation: Forwarded to ``tokenize_and_postprocess_data``.
            extra_source_key: Optional label appended to the size log lines.
        """
        if not isinstance(parquet_files, (list, tuple, ListConfig)):
            parquet_files = [parquet_files]

        resolved_files = self._expand_parquet_paths(parquet_files)
        self.parquet_files = copy.deepcopy(resolved_files)
        self.original_parquet_files = copy.deepcopy(resolved_files)  # use for resume
        self.cache_dir = os.path.expanduser(cache_dir)
        self.tokenizer = tokenizer
        self.extra_source_key = extra_source_key
        self.prompt_key = prompt_key
        self.max_prompt_length = max_prompt_length
        self.filter_prompts = filter_prompts
        self.return_raw_chat = return_raw_chat
        self.chat_template_func = chat_template_func
        self.truncation = truncation
        # whether to store the dataset in state_dict()
        # default not store
        self.serialize_dataset = False
        self._download()
        self._read_files_and_tokenize()

    @staticmethod
    def _expand_parquet_paths(paths):
        """Replace each local directory in ``paths`` by its ``.parquet`` files."""
        expanded = []
        for path in paths:
            if os.path.isdir(path):
                # Directory: keep only the .parquet files directly inside it.
                expanded.extend(
                    sorted(
                        os.path.join(path, name)
                        for name in os.listdir(path)
                        if name.endswith('.parquet')
                    ))
            else:
                # Anything else is treated as a file path and kept as-is.
                expanded.append(path)
        return expanded

    @staticmethod
    def _extract_index(row_dict):
        """Return ``row_dict['extra_info']['index']``, or 0 when unavailable."""
        extra_info = row_dict.get("extra_info")
        # ``extra_info`` can be present but not dict-like (e.g. None or NaN for
        # rows coming from a file without that column), so duck-type on ``get``.
        lookup = getattr(extra_info, "get", None)
        return lookup("index", 0) if callable(lookup) else 0

    def _download(self, use_origin_parquet=False):
        """Pass every file through ``copy_local_path_from_hdfs``.

        Args:
            use_origin_parquet: If true, start from the paths recorded at
                construction time instead of the current ``parquet_files``.
        """
        from verl.utils.fs import copy_local_path_from_hdfs
        sources = self.original_parquet_files if use_origin_parquet else self.parquet_files
        for i, parquet_file in enumerate(sources):
            self.parquet_files[i] = copy_local_path_from_hdfs(src=parquet_file, cache_dir=self.cache_dir)

    def _read_files_and_tokenize(self):
        """Load all parquet files into ``self.dataframe`` and filter long prompts."""
        # read parquet files and cache
        dataframes = [pd.read_parquet(parquet_file) for parquet_file in self.parquet_files]
        self.dataframe = pd.concat(dataframes)
        print(f'original dataset len: {len(self.dataframe)}{". Source: " + self.extra_source_key if self.extra_source_key else ""}')

        # Honour the constructor flag; default to filtering for instances
        # restored from a state that predates the attribute.
        if not getattr(self, 'filter_prompts', True):
            return

        # filter out too long prompts
        tokenizer = self.tokenizer
        prompt_key = self.prompt_key
        self.dataframe = self.dataframe[self.dataframe.apply(lambda doc: len(
            tokenizer.apply_chat_template(doc[prompt_key], add_generation_prompt=True)) <= self.max_prompt_length,
                                                             axis=1)]
        print(f'filter dataset len: {len(self.dataframe)}{". Source: " + self.extra_source_key if self.extra_source_key else ""}')

    def resume_dataset_state(self):
        """Rebuild ``self.dataframe`` after the dataset was restored from a checkpoint."""
        # Instances lacking the recorded original paths cannot be re-read.
        self.serialize_dataset = not hasattr(self, 'original_parquet_files')
        # resume dataframe if it's not serialized in data.pt
        if not self.serialize_dataset:
            self._download(use_origin_parquet=True)  # download and resume from original parquet files
            self._read_files_and_tokenize()
        else:
            print(r'old dataloader ckpt file is used, please train from scratch for better ckpt performance')

    def __len__(self):
        """Return the number of rows currently held in ``self.dataframe``."""
        return len(self.dataframe)

    def __getitem__(self, item):
        """Build one sample from row ``item``.

        The prompt column is removed from the row, rendered with the chat
        template and tokenized; ``input_ids``, ``attention_mask``,
        ``position_ids`` and ``index`` are added to the remaining columns.
        When ``return_raw_chat`` is set the chat is also returned under
        ``raw_prompt`` so that it can be combined with other chat templates.
        """
        row_dict = self.dataframe.iloc[item].to_dict()
        chat = row_dict.pop(self.prompt_key)
        prompt_with_chat_template = self.tokenizer.apply_chat_template(chat, add_generation_prompt=True, tokenize=False)
        input_ids, attention_mask = verl_F.tokenize_and_postprocess_data(prompt=prompt_with_chat_template,
                                                                         tokenizer=self.tokenizer,
                                                                         max_length=self.max_prompt_length,
                                                                         pad_token_id=self.tokenizer.pad_token_id,
                                                                         left_pad=True,
                                                                         truncation=self.truncation)
        position_ids = compute_position_id_with_mask(attention_mask)
        # The helpers return batched results; take the single entry.
        row_dict['input_ids'] = input_ids[0]
        row_dict['attention_mask'] = attention_mask[0]
        row_dict['position_ids'] = position_ids[0]
        # encode prompts without chat template
        if self.return_raw_chat:
            # The chat may or may not be an array-like exposing ``tolist``.
            row_dict['raw_prompt'] = chat.tolist() if hasattr(chat, 'tolist') else chat
        # add index for each prompt
        row_dict["index"] = self._extract_index(row_dict)
        return row_dict

    def __getstate__(self):
        """Return the picklable state, omitting the dataframe unless serialization is enabled."""
        state = self.__dict__.copy()
        if not self.serialize_dataset:
            state.pop('dataframe', None)
        return state
|