|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
Offline evaluation of generated sequences using a reward model and a ground-truth verifier.
|
|
|
The input is a parquet file that contains N generated sequences and, optionally, the ground truth.
|
|
|
|
|
|
"""
|
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
import hydra
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
import ray
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
from verl.utils.fs import copy_to_local
|
|
|
|
|
|
|
|
|
def get_custom_reward_fn(config):
    """Load a user-supplied reward function described by ``config``.

    Reads ``config["custom_reward_function"]`` (a mapping with keys ``path``,
    ``name`` and optional ``reward_kwargs``), imports the Python file at
    ``path``, and returns a wrapper around the named function that injects
    ``reward_kwargs`` into every call.

    Args:
        config: Mapping-like config; only the ``custom_reward_function``
            section is consulted.

    Returns:
        A callable ``wrapped_fn(*args, **kwargs)`` delegating to the custom
        reward function, or ``None`` when no custom function is configured.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        ValueError: ``path`` is set but ``name`` is missing/empty.
        RuntimeError: the module at ``path`` failed to import.
        AttributeError: the module does not define ``name``.
    """
    import importlib.util
    import os
    import sys

    reward_fn_config = config.get("custom_reward_function") or {}
    file_path = reward_fn_config.get("path")
    if not file_path:
        return None

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Reward function file '{file_path}' not found.")

    function_name = reward_fn_config.get("name")
    # Validate before executing the module: otherwise a missing "name" only
    # surfaces later as an opaque TypeError from hasattr(module, None).
    if not function_name:
        raise ValueError(f"custom_reward_function.name must be set when custom_reward_function.path ('{file_path}') is provided.")

    spec = importlib.util.spec_from_file_location("custom_module", file_path)
    module = importlib.util.module_from_spec(spec)
    try:
        sys.modules["custom_module"] = module
        spec.loader.exec_module(module)
    except Exception as e:
        # Don't leave a half-initialized module cached under "custom_module";
        # a later retry would otherwise import the broken leftover.
        sys.modules.pop("custom_module", None)
        raise RuntimeError(f"Error loading module from '{file_path}'") from e

    if not hasattr(module, function_name):
        raise AttributeError(f"Reward function '{function_name}' not found in '{file_path}'.")

    print(f"using customized reward function '{function_name}' from '{file_path}'")
    raw_fn = getattr(module, function_name)

    reward_kwargs = dict(reward_fn_config.get("reward_kwargs", {}))

    def wrapped_fn(*args, **kwargs):
        # Merge the statically-configured kwargs into every invocation.
        return raw_fn(*args, **kwargs, **reward_kwargs)

    return wrapped_fn
|
|
|
|
|
|
|
|
|
@ray.remote
def process_item(reward_fn, data_source, response_lst, reward_data):
    """Score all responses of one row against its ground truth.

    Returns a ``(data_source, mean_score)`` pair so the caller can group
    results by originating dataset.
    """
    gt = reward_data["ground_truth"]
    scores = []
    for response in response_lst:
        scores.append(reward_fn(data_source, response, gt))
    return data_source, np.mean(scores)
|
|
|
|
|
|
|
|
|
@hydra.main(config_path="config", config_name="evaluation", version_base=None)
def main(config):
    """Score a parquet file of generated responses and print per-data-source
    mean rewards (keyed as ``test_score/<data_source>``)."""
    dataset_path = copy_to_local(config.data.path)
    dataset = pd.read_parquet(dataset_path)

    responses = dataset[config.data.response_key]
    data_sources = dataset[config.data.data_source_key]
    reward_model_data = dataset[config.data.reward_model_key]
    num_rows = len(dataset)

    if not ray.is_initialized():
        ray.init(num_cpus=config.ray_init.num_cpus)

    rewards_by_source = defaultdict(list)
    compute_score = get_custom_reward_fn(config)

    # One ray task per row; each task averages the scores of that row's responses.
    pending = [
        process_item.remote(compute_score, data_sources[idx], responses[idx], reward_model_data[idx])
        for idx in range(num_rows)
    ]

    with tqdm(total=num_rows) as progress:
        while pending:
            finished, pending = ray.wait(pending)
            for ref in finished:
                source, mean_score = ray.get(ref)
                rewards_by_source[source].append(mean_score)
                progress.update(1)

    metric_dict = {f"test_score/{source}": np.mean(scores) for source, scores in rewards_by_source.items()}

    print(metric_dict)
|
|
|
|
|
|
|
|
|
# Hydra-managed CLI entry point; configuration is resolved from
# config/evaluation.yaml plus command-line overrides.
if __name__ == "__main__":
    main()
|
|
|
|