File size: 5,370 Bytes
38d8dc2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import re
from multiprocessing import Pool, cpu_count
from typing import Any, Dict, List, Optional, Union

import numpy as np
import torch
from sentence_transformers import SentenceTransformer, util
# Initialize the SentenceTransformer model once at module level so every call
# (and each forked worker) reuses the same instance. Fall back to CPU when no
# CUDA device is available so the module can still be imported on GPU-less
# machines; with CUDA present this behaves exactly as before.
_EMBEDDER_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
embedder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2').to(_EMBEDDER_DEVICE)
def compute_rewards(
    completions: List[str],
    min_len: Union[int, List[int]] = 100,
    max_len: Union[int, List[int]] = 300,
    weights: Union[tuple, List[tuple]] = (0.25, 0.25, 0.25, 0.25),
    return_components: bool = False,
    **kwargs
) -> Union[List[float], Dict[str, List[float]]]:
    """Compute per-sample rewards as a weighted sum of four sub-rewards.

    The length, format and language rewards are CPU-bound and run in a
    process pool; the keyword reward is batched through the shared
    embedding model.

    Args:
        completions: Generated texts to score.
        min_len: Lower length bound, scalar or per-sample list.
        max_len: Upper length bound, scalar or per-sample list.
        weights: Weights for (length, format, keyword, language), a single
            tuple or a per-sample list of tuples.
        return_components: When True, also return each sub-reward list.
        **kwargs: Must contain ``keywords``, a per-sample list of keyword
            lists.

    Returns:
        A list of total rewards, or — when ``return_components`` is True —
        a dict with ``rewards`` plus each component list.

    Raises:
        KeyError: If the required ``keywords`` kwarg is missing.
    """
    if "keywords" not in kwargs:
        raise KeyError(
            "compute_rewards requires a 'keywords' kwarg: one keyword list per completion"
        )
    keywords = kwargs["keywords"]
    n_samples = len(completions)
    # Broadcast scalar configuration to one entry per sample.
    min_len = _to_list(min_len, n_samples)
    max_len = _to_list(max_len, n_samples)
    weights = _to_list(weights, n_samples)
    # CPU-bound sub-rewards run in parallel worker processes.
    # NOTE(review): under the 'spawn' start method each worker re-imports this
    # module and reloads the embedder onto the GPU — confirm 'fork' is in use.
    with Pool(cpu_count()) as pool:
        length_rewards = pool.starmap(_length_reward, zip(completions, min_len, max_len))
        format_rewards = pool.map(_format_reward, completions)
        keyword_rewards = _batch_keyword_reward(completions, keywords)  # batched on the embedder
        language_rewards = pool.map(_language_reward, completions)
    # Weighted sum of the four components, per sample.
    total_rewards = [
        w[0] * lr + w[1] * fr + w[2] * kr + w[3] * lang_r
        for w, lr, fr, kr, lang_r in zip(weights, length_rewards, format_rewards, keyword_rewards, language_rewards)
    ]
    if return_components:
        return {
            "rewards": total_rewards,
            "length_rewards": length_rewards,
            "format_rewards": format_rewards,
            "keyword_rewards": keyword_rewards,
            "language_rewards": language_rewards,
        }
    return total_rewards
# -------------- Parallel sub-reward helpers --------------
def _to_list(val: Union[any, List[any]], n: int) -> List[any]:
"""转换为样本级列表"""
return val if isinstance(val, list) else [val] * n
def _length_reward(text: str, min_len: int, max_len: int) -> float:
"""单样本长度奖励"""
original = text.split("</think>:", 1)[1].strip() if "</think>:" in text else text.strip()
length = len(original)
if length < min_len:
return length / min_len + 1 # 1~2线性增长
elif length > max_len:
return max_len / length + 1 # 2~1线性衰减
return 2.0
def _format_reward(text: str) -> float:
"""单样本格式奖励"""
if "<think>" not in text or "</think>:" not in text:
return -2.0
think_content = text.split("<think>")[1].split("</think>")[0].strip()
return 2.0 if think_content else -2.0
def _batch_keyword_reward(texts: List[str], keywords_list: List[List[str]]) -> List[float]:
    """Batched keyword-relevance reward via sentence embeddings.

    For each sample with a non-empty answer and a non-empty keyword list,
    the reward is tiered by the mean cosine similarity between the answer
    embedding and its keyword embeddings: >=0.6 -> 2.0, >=0.4 -> 1.2,
    otherwise 0.8. Samples with no keywords default to 0.8; samples that
    have keywords but an empty answer score -2.0.
    """
    # Score only the answer part that follows the "</think>:" marker, if any.
    originals = [
        text.split("</think>:", 1)[1].strip() if "</think>:" in text else text.strip()
        for text in texts
    ]
    valid_indices = [i for i, orig in enumerate(originals) if orig and keywords_list[i]]
    if not valid_indices:
        # Nothing to embed: 0.8 when no keywords, -2.0 when keywords exist but the answer is empty.
        return [0.8 if not kw else -2.0 for kw in keywords_list]
    valid_originals = [originals[i] for i in valid_indices]
    valid_keywords = [keywords_list[i] for i in valid_indices]
    # Batch-encode on the shared model (runs on its configured device).
    original_embs = embedder.encode(valid_originals, convert_to_tensor=True)
    keyword_embs = [embedder.encode(kw, convert_to_tensor=True) for kw in valid_keywords]
    # Map sample index -> mean cosine similarity. Fix: the original tested
    # `i in valid_indices` (an O(n) list scan per sample) while threading a
    # manual `sim_idx` counter; a dict gives O(1) lookup and no counter.
    sim_by_index = {
        i: util.pytorch_cos_sim(orig_emb, kw_emb).mean().item()
        for i, orig_emb, kw_emb in zip(valid_indices, original_embs, keyword_embs)
    }
    rewards = []
    for i, kw in enumerate(keywords_list):
        if i in sim_by_index:
            sim = sim_by_index[i]
            rewards.append(2.0 if sim >= 0.6 else (1.2 if sim >= 0.4 else 0.8))
        else:
            rewards.append(0.8 if not kw else -2.0)
    return rewards
def _language_reward(text: str) -> float:
"""单样本语言奖励"""
chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
ratio = chinese_chars / max(1, len(text))
if ratio >= 0.9:
return 2.0
elif ratio >= 0.7:
return 1.4
return 0.7
# ------------ Usage example ------------
if __name__ == "__main__":
    # NOTE(review): running this demo loads the module-level embedder onto
    # "cuda" at import time — it will fail on a GPU-less machine; confirm.
    samples = [
        "科技<think>技术创新是关键</think>:人工智能在医疗领域的应用正在改变诊断方式。",
        "无效样本<think></think>:无意义内容",
        "经济<think>宏观经济分析</think>:全球供应链重构对发展中国家影响深远。"
    ]
    keywords = [
        ["科技", "人工智能"],
        [], # empty keyword list — falls back to the default keyword reward
        ["经济", "供应链"]
    ]
    # Compute all rewards (sub-rewards run in a process pool) and request
    # the per-component breakdown.
    rewards = compute_rewards(
        completions=samples,
        keywords=keywords,
        min_len=[50, 10, 80],
        return_components=True
    )
    print("总奖励:", rewards["rewards"])
    print("长度奖励:", rewards["length_rewards"])
    print("格式奖励:", rewards["format_rewards"])
    print("关键词奖励:", rewards["keyword_rewards"])
    print("语言奖励:", rewards["language_rewards"])
|