| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from collections import defaultdict |
| from typing import Any |
|
|
| import torch |
|
|
| from verl import DataProto |
| from verl.workers.reward_manager import register |
| from verl.workers.reward_manager.abstract import AbstractRewardManager, RawRewardFn |
|
|
|
|
| @register("batch") |
| class BatchRewardManager(AbstractRewardManager): |
| """ |
| A batch reward manager that computes rewards for a batch of data. |
| |
| Args: |
| tokenizer (Tokenizer): The tokenizer to use for decoding the responses. |
| num_examine (int): The number of responses to examine. |
| compute_score (callable): The function to compute the rewards. |
| reward_fn_key (str): The key to use for the reward function. |
| reward_kwargs (dict): The keyword arguments to pass to the reward function. |
| """ |
|
|
| def __init__( |
| self, tokenizer, num_examine, compute_score: RawRewardFn, reward_fn_key="data_source", **reward_kwargs |
| ): |
| self.tokenizer = tokenizer |
| self.num_examine = num_examine |
| self.compute_score = compute_score |
| self.reward_fn_key = reward_fn_key |
| self.reward_kwargs = reward_kwargs |
|
|
| def verify(self, data): |
| prompt_ids = data.batch["prompts"] |
| response_ids = data.batch["responses"] |
| attention_mask = data.batch["attention_mask"] |
|
|
| prompt_len = prompt_ids.shape[-1] |
| valid_response_lengths = attention_mask[:, prompt_len:].sum(dim=-1) |
|
|
| responses_str = [] |
| for i in range(len(data)): |
| valid_len = valid_response_lengths[i] |
| valid_response_ids = response_ids[i][:valid_len] |
| response_str = self.tokenizer.decode(valid_response_ids, skip_special_tokens=True) |
| responses_str.append(response_str) |
|
|
| ground_truths = [item.non_tensor_batch["reward_model"].get("ground_truth", None) for item in data] |
| data_sources = data.non_tensor_batch[self.reward_fn_key] |
| rollout_reward_scores = data.non_tensor_batch.get("reward_scores", [{} for _ in range(len(data))]) |
| extras = data.non_tensor_batch.get("extra_info", [{} for _ in range(len(data))]) |
|
|
| for i in range(len(data)): |
| extras[i]["rollout_reward_scores"] = rollout_reward_scores[i] |
|
|
| scores = self.compute_score( |
| data_sources=data_sources, |
| solution_strs=responses_str, |
| ground_truths=ground_truths, |
| extra_infos=extras, |
| **self.reward_kwargs, |
| ) |
|
|
| return scores |
|
|
| def __call__(self, data: DataProto, return_dict: bool = False) -> torch.Tensor | dict[str, Any]: |
| |
| reward_from_rm_scores = self._extract_reward_from_rm_scores(data, return_dict) |
| if reward_from_rm_scores is not None: |
| return reward_from_rm_scores |
|
|
| reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32) |
| reward_extra_info = defaultdict(list) |
| prompt_ids = data.batch["prompts"] |
| prompt_len = prompt_ids.shape[-1] |
| attention_mask = data.batch["attention_mask"] |
| valid_response_lengths = attention_mask[:, prompt_len:].sum(dim=-1) |
| data_sources = data.non_tensor_batch[self.reward_fn_key] |
|
|
| scores = self.verify(data) |
| rewards = [] |
| already_printed: dict[str, Any] = {} |
|
|
| for i in range(len(data)): |
| length = valid_response_lengths[i].item() |
| score = scores[i] |
|
|
| if isinstance(score, dict): |
| reward = score["score"] |
| for key, value in score.items(): |
| reward_extra_info[key].append(value) |
| else: |
| reward = score |
|
|
| rewards.append(reward) |
| reward_tensor[i, length - 1] = reward |
|
|
| data_source = data_sources[i] |
| if already_printed.get(data_source, 0) < self.num_examine: |
| response_str = self.tokenizer.decode(data.batch["responses"][i][:length], skip_special_tokens=True) |
| prompt_str = self.tokenizer.decode(data.batch["prompts"][i], skip_special_tokens=True) |
| ground_truth = data[i].non_tensor_batch["reward_model"].get("ground_truth", None) |
| print("[prompt]", prompt_str) |
| print("[response]", response_str) |
| print("[ground_truth]", ground_truth) |
| print("[score]", scores[i]) |
| already_printed[data_source] = already_printed.get(data_source, 0) + 1 |
|
|
| data.batch["acc"] = torch.tensor(rewards, dtype=torch.float32, device=prompt_ids.device) |
|
|
| if return_dict: |
| return {"reward_tensor": reward_tensor, "reward_extra_info": reward_extra_info} |
| else: |
| return reward_tensor |
|
|