# miniOneRec-kuairec / evaluate.py
# Uploaded by hiiamkik with huggingface_hub (commit c21fd3a).
"""
评估模块
三组消融对比(RecoWorld论文核心卖点):
A. 离线NDCG baseline(无仿真用户,静态推荐)
B. 有仿真用户 + 规则推荐(无RL)
C. 完整RecoWorld(仿真用户 + GRPO训练的MLP head)
核心指标:
- Session留存率(avg session length / max_session_steps)
- 指令跟随率(instruction follow rate)
- NDCG@10(离线评估)
- ILD(Intra-List Diversity,多样性)
- 累计奖励
"""
import json
import os
from typing import List, Dict, Optional

import numpy as np
from tqdm import tqdm

from config import cfg
from env import RecoWorldEnv, MDPState, KuaiRecEnvData
from rec_agent import RecAgent, RankingHead
from user_sim import UserSimulator
# ─────────────────────────────────────────────
# 指标计算
# ─────────────────────────────────────────────
def ndcg_at_k(recommended: List[int], relevant: set, k: int = 10) -> float:
    """Binary-relevance NDCG@k of a ranked recommendation list.

    Args:
        recommended: Ranked item ids, best first; only the first ``k`` count.
        relevant: Item ids with gain 1; every other id has gain 0.
        k: Cut-off rank.

    Returns:
        DCG@k divided by the ideal DCG@k, where a hit at 0-based rank ``r``
        contributes ``1 / log2(r + 2)``. Returns 0.0 when the ideal DCG is
        zero (``relevant`` empty or ``k`` <= 0).

    Note:
        A relevant id repeated in ``recommended`` is counted each time, so
        callers are expected to pass de-duplicated lists.
    """
    dcg = 0.0
    for rank, item_id in enumerate(recommended[:k]):
        if item_id in relevant:
            dcg += 1.0 / np.log2(rank + 2)

    # The ideal list places every relevant item (up to k of them) at the top.
    n_ideal_hits = min(len(relevant), k)
    if n_ideal_hits <= 0:
        return 0.0
    idcg = 0.0
    for rank in range(n_ideal_hits):
        idcg += 1.0 / np.log2(rank + 2)
    return dcg / idcg
def intra_list_diversity(rec_list: List[int], item_embeddings: np.ndarray) -> float:
    """Intra-List Diversity: 1 minus the mean pairwise cosine similarity.

    Higher values mean a more diverse list.

    Args:
        rec_list: Item ids in the recommendation list.
        item_embeddings: Indexed by item id, one vector per row (presumably
            shaped ``(n_items, dim)`` -- confirm against the data loader).
            ``None`` disables the metric.

    Returns:
        The ILD score as a Python float. Returns 0.0 when embeddings are
        missing or fewer than two ids fall inside the embedding table.
        Ids ``>= len(item_embeddings)`` are silently dropped.
    """
    if item_embeddings is None or len(rec_list) < 2:
        return 0.0

    n_rows = len(item_embeddings)
    kept = np.array(
        [item_embeddings[iid] for iid in rec_list if iid < n_rows],
        dtype=np.float32,
    )
    count = len(kept)
    if count < 2:
        return 0.0

    # The 1e-9 keeps a zero vector from producing a division by zero.
    unit = kept / (np.linalg.norm(kept, axis=1, keepdims=True) + 1e-9)
    cosine = unit @ unit.T
    # Strict upper triangle: each unordered pair once, no self-similarity.
    rows, cols = np.triu_indices(count, k=1)
    return float(1 - np.mean(cosine[rows, cols]))
def instruction_follow_rate(trajectory: List[Dict], env: RecoWorldEnv) -> float:
    """Fraction of instructed steps whose outcome followed the instruction.

    A step counts as instructed when its ``state.last_instruction`` is truthy.
    It counts as followed when ``info["instruction_followed"]`` is truthy
    (a missing key means not followed).

    Args:
        trajectory: Step dicts carrying at least ``"state"`` and ``"info"``.
        env: Unused; kept so existing callers keep working.

    Returns:
        followed / instructed, or 0.0 when no step carried an instruction.
    """
    instructed = [step for step in trajectory if step["state"].last_instruction]
    n_followed = sum(
        1 for step in instructed
        if step["info"].get("instruction_followed", False)
    )
    return n_followed / max(len(instructed), 1)
# ─────────────────────────────────────────────
# 规则推荐(Baseline B对照)
# ─────────────────────────────────────────────
class RuleBasedAgent:
    """Non-RL baseline: recall from the user's mindset vector, no re-ranking.

    Recall goes through ``rec_agent.retriever`` (described as FAISS recall in
    the original notes). The rec agent's ranking head is never used.
    """

    def __init__(self, data: KuaiRecEnvData, rec_agent: RecAgent):
        # Only ``data.n_items`` is read, in the random fallback of recommend().
        self.data = data
        # Borrowed solely for its ``retriever``.
        self.rec_agent = rec_agent

    def recommend(self, state: MDPState, env=None) -> List[int]:
        """Return up to ``cfg.rec_list_size`` recalled items the user has not just seen.

        Without a retriever, falls back to ``cfg.rec_list_size`` distinct
        random ids drawn with the global NumPy RNG. Otherwise retrieves
        ``cfg.recall_topk`` candidates for ``state.mindset``. It drops any that
        appear in the last 50 entries of ``state.history_iids`` and keeps the
        first ``cfg.rec_list_size``. The result may be shorter if too few
        candidates survive. ``env`` is accepted for interface parity and ignored.
        """
        retriever = self.rec_agent.retriever
        if retriever is None:
            random_ids = np.random.choice(self.data.n_items,
                                          cfg.rec_list_size, replace=False)
            return list(random_ids)

        recalled = retriever.retrieve(state.mindset, cfg.recall_topk)
        recently_seen = set(state.history_iids[-50:])
        fresh = [iid for iid in recalled if iid not in recently_seen]
        return fresh[:cfg.rec_list_size]
# ─────────────────────────────────────────────
# 单episode评估
# ─────────────────────────────────────────────
def evaluate_episode(uid: int, env: RecoWorldEnv, agent,
                     user_sim: UserSimulator,
                     use_user_sim: bool = True) -> Dict:
    """Roll out one evaluation session for user ``uid`` and summarise it.

    Each step does four things:

    - asks ``agent.recommend(state, env)`` for a list;
    - obtains user feedback;
    - advances the environment with ``env.step``;
    - records NDCG and ILD for that list.

    Feedback comes from ``user_sim.evaluate_recommendations`` when
    ``use_user_sim`` is true. Otherwise (offline baseline) each item is
    labelled from the logged watch ratio and no instruction is issued.

    Args:
        uid: User id passed to ``env.reset`` and used to look up history.
        env: Environment providing ``reset``/``step`` and ``data``.
        agent: Any object exposing ``recommend(state, env)``.
        user_sim: Simulated user; only consulted when ``use_user_sim``.
        use_user_sim: Simulated feedback (True) or offline watch-ratio rule (False).

    Returns:
        A dict with these keys:

        - ``uid``;
        - ``total_reward``;
        - ``session_length`` (number of steps taken);
        - ``retention_rate`` (``session_length / cfg.max_session_steps``);
        - ``avg_ndcg`` and ``avg_ild`` (per-step means);
        - ``instruction_follow_rate``;
        - ``trajectory_len``.

        If the session is already done right after ``reset``, the per-step
        means are 0.0 instead of NaN. Previously ``np.mean([])`` warned and
        returned NaN, which then poisoned the cross-episode averages.
    """
    state = env.reset(uid)
    trajectory: List[Dict] = []
    total_reward = 0.0

    # Offline relevance set: the last 20% of the user's logged history.
    hist = env.data.user_histories.get(uid, [])
    test_items = set(hist[int(len(hist) * 0.8):])

    step_ndcg: List[float] = []
    step_ild: List[float] = []
    while not state.done:
        rec_list = agent.recommend(state, env)
        if use_user_sim:
            user_actions, instruction = user_sim.evaluate_recommendations(state, rec_list)
        else:
            user_actions = _offline_user_actions(env, uid, rec_list)
            instruction = ""

        result = env.step(state, rec_list, user_actions, instruction)
        step_ndcg.append(ndcg_at_k(rec_list, test_items, cfg.ndcg_k))
        step_ild.append(intra_list_diversity(rec_list, env.data.item_embeddings))
        trajectory.append({
            "state": state,
            "rec_list": rec_list,
            "user_actions": user_actions,
            "instruction": instruction,
            "reward": result.reward,
            "next_state": result.next_state,
            "done": result.done,
            "info": result.info,
        })
        total_reward += result.reward
        state = result.next_state

    session_length = len(trajectory)
    return {
        "uid": uid,
        "total_reward": total_reward,
        "session_length": session_length,
        "retention_rate": session_length / cfg.max_session_steps,
        "avg_ndcg": float(np.mean(step_ndcg)) if step_ndcg else 0.0,
        "avg_ild": float(np.mean(step_ild)) if step_ild else 0.0,
        "instruction_follow_rate": instruction_follow_rate(trajectory, env),
        "trajectory_len": session_length,
    }


def _offline_user_actions(env: RecoWorldEnv, uid: int, rec_list: List[int]) -> List[str]:
    """Label each item "click" if its logged watch ratio reaches the threshold, else "skip".

    Reads ``env._wr_matrix[uid, iid]``. When that matrix is ``None`` every
    watch ratio is taken as 0.0.
    """
    wr_matrix = env._wr_matrix
    actions = []
    for iid in rec_list:
        wr = float(wr_matrix[uid, iid]) if wr_matrix is not None else 0.0
        actions.append("click" if wr >= cfg.watch_ratio_threshold else "skip")
    return actions
# ─────────────────────────────────────────────
# 消融实验主函数
# ─────────────────────────────────────────────
def run_ablation(env: RecoWorldEnv, rec_agent: RecAgent,
                 user_sim: UserSimulator, eval_users: List[int]) -> Dict:
    """Run the three-way ablation and save the aggregated results.

    The three conditions are:

    - A: offline baseline. The rule agent is scored with watch-ratio feedback
      and no simulated user.
    - B: simulated user plus rule-based recall (no RL).
    - C: simulated user plus ``rec_agent`` (the GRPO-trained MLP head; full
      RecoWorld).

    Each condition evaluates the first ``cfg.eval_episodes`` users of
    ``eval_users``. It prints a summary table and writes the results dict to
    ``{cfg.output_dir}/ablation_results.json``. The output directory is
    created if missing; previously ``open`` failed with FileNotFoundError.

    Returns:
        Mapping of condition name to its ``_aggregate`` metrics dict.
    """
    rule_agent = RuleBasedAgent(env.data, rec_agent)
    users = eval_users[:cfg.eval_episodes]
    results = {}

    # ── A: 离线baseline ──
    print("\n[Ablation A] 离线baseline(无仿真用户)")
    results["A_offline_baseline"] = _run_condition(
        users, env, rule_agent, user_sim, use_user_sim=False)
    print(f" NDCG@10: {results['A_offline_baseline']['avg_ndcg']:.4f}")

    # ── B: 仿真用户 + 规则推荐 ──
    print("\n[Ablation B] 仿真用户 + 规则推荐(无RL)")
    results["B_sim_user_rule_rec"] = _run_condition(
        users, env, rule_agent, user_sim, use_user_sim=True)
    _print_online_summary(results["B_sim_user_rule_rec"])

    # ── C: 完整RecoWorld ──
    print("\n[Ablation C] 完整RecoWorld(仿真用户 + GRPO MLP)")
    results["C_full_recoworld"] = _run_condition(
        users, env, rec_agent, user_sim, use_user_sim=True)
    _print_online_summary(results["C_full_recoworld"])

    # ── 汇总 ──
    print("\n" + "="*60)
    print("消融实验汇总")
    print(f"{'实验':35s} {'留存率':>8} {'NDCG@10':>9} {'ILD':>7} {'指令跟随率':>10} {'累计奖励':>9}")
    for name, r in results.items():
        print(f"{name:35s} {r['avg_retention']:>8.3f} {r['avg_ndcg']:>9.4f} "
              f"{r['avg_ild']:>7.3f} {r['avg_ifr']:>10.3f} {r['avg_reward']:>9.3f}")

    os.makedirs(cfg.output_dir, exist_ok=True)
    out_path = f"{cfg.output_dir}/ablation_results.json"
    # ensure_ascii=False may emit non-ASCII text, so pin the file encoding
    # rather than relying on the platform default.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n结果已保存: {out_path}")
    return results


def _run_condition(users: List[int], env: RecoWorldEnv, agent,
                   user_sim: UserSimulator, use_user_sim: bool) -> Dict:
    """Evaluate ``agent`` on every user in ``users`` and aggregate the episode metrics."""
    episodes = [
        evaluate_episode(uid, env, agent, user_sim, use_user_sim=use_user_sim)
        for uid in tqdm(users)
    ]
    return _aggregate(episodes)


def _print_online_summary(r: Dict) -> None:
    """Print the retention / NDCG@10 / IFR line shown for the simulated-user ablations."""
    print(f" Retention: {r['avg_retention']:.3f} "
          f"| NDCG@10: {r['avg_ndcg']:.4f} "
          f"| IFR: {r['avg_ifr']:.3f}")
def _aggregate(scores: List[Dict]) -> Dict:
return {
"avg_retention": np.mean([s["retention_rate"] for s in scores]),
"avg_ndcg": np.mean([s["avg_ndcg"] for s in scores]),
"avg_ild": np.mean([s["avg_ild"] for s in scores]),
"avg_ifr": np.mean([s["instruction_follow_rate"] for s in scores]),
"avg_reward": np.mean([s["total_reward"] for s in scores]),
"avg_session_len": np.mean([s["session_length"] for s in scores]),
"n_episodes": len(scores),
}