| """ |
| 评估模块 |
| 三组消融对比(RecoWorld论文核心卖点): |
| A. 离线NDCG baseline(无仿真用户,静态推荐) |
| B. 有仿真用户 + 规则推荐(无RL) |
| C. 完整RecoWorld(仿真用户 + GRPO训练的MLP head) |
| |
| 核心指标: |
| - Session留存率(avg session length / max_session_steps) |
| - 指令跟随率(instruction follow rate) |
| - NDCG@10(离线评估) |
| - ILD(Intra-List Diversity,多样性) |
| - 累计奖励 |
| """ |
| import json |
| import numpy as np |
| from typing import List, Dict, Optional |
| from tqdm import tqdm |
|
|
| from config import cfg |
| from env import RecoWorldEnv, MDPState, KuaiRecEnvData |
| from rec_agent import RecAgent, RankingHead |
| from user_sim import UserSimulator |
|
|
|
|
| |
| |
| |
def ndcg_at_k(recommended: List[int], relevant: set, k: int = 10) -> float:
    """Compute binary-relevance NDCG@k for a single recommendation list.

    Args:
        recommended: Item ids in ranked order (best first).
        relevant: Ids of the items that count as hits.
        k: Cut-off rank; only the first ``k`` recommendations are scored.

    Returns:
        DCG of the top-``k`` list divided by the ideal DCG, as a builtin
        float in ``[0, 1]``. Returns 0.0 when the ideal DCG is zero, i.e.
        when ``relevant`` is empty or ``k`` is not positive.
    """
    # The ideal ranking places min(|relevant|, k) hits at the top positions.
    idcg = sum(1.0 / np.log2(rank + 2) for rank in range(min(len(relevant), k)))
    if idcg <= 0:
        return 0.0

    dcg = 0.0
    credited = set()
    for rank, iid in enumerate(recommended[:k]):
        # Credit each relevant item only once: a list that repeats the same
        # hit would otherwise collect gain at every occurrence and push the
        # score above 1.
        if iid in relevant and iid not in credited:
            credited.add(iid)
            dcg += 1.0 / np.log2(rank + 2)
    return float(dcg / idcg)
|
|
|
|
def intra_list_diversity(rec_list: List[int], item_embeddings: np.ndarray) -> float:
    """Intra-List Diversity: mean pairwise cosine distance of a list.

    Higher values mean the recommended items are more diverse.

    Args:
        rec_list: Recommended item ids, used as row indices into
            ``item_embeddings``.
        item_embeddings: Per-item embedding table indexed by item id, or
            ``None`` when no embeddings are available.

    Returns:
        ``1 - mean(cosine similarity)`` over all unordered pairs of the
        in-range items, or 0.0 when fewer than two items can be embedded.
    """
    if item_embeddings is None or len(rec_list) < 2:
        return 0.0

    n_items = len(item_embeddings)
    # Require 0 <= iid: a negative id would otherwise wrap around and
    # silently pick an unrelated row from the end of the table.
    embs = np.array(
        [item_embeddings[iid] for iid in rec_list if 0 <= iid < n_items],
        dtype=np.float32,
    )
    if len(embs) < 2:
        return 0.0

    # L2-normalise rows so the Gram matrix holds cosine similarities; the
    # epsilon keeps all-zero embeddings from dividing by zero.
    norms = np.linalg.norm(embs, axis=1, keepdims=True)
    normed = embs / (norms + 1e-9)
    sim = normed @ normed.T

    # Strict upper triangle = each unordered pair once, diagonal excluded.
    pair_sims = sim[np.triu_indices(len(normed), k=1)]
    return float(1 - np.mean(pair_sims))
|
|
|
|
def instruction_follow_rate(trajectory: List[Dict], env: RecoWorldEnv) -> float:
    """Return the share of instructed steps whose instruction was followed.

    A step counts as instructed when its ``"state"`` entry carries a truthy
    ``last_instruction``. It counts as followed when its ``"info"`` dict has
    a truthy ``"instruction_followed"`` value.

    Args:
        trajectory: Per-step records with at least ``"state"`` and ``"info"``.
        env: Not used by this function; kept for the caller-facing signature.

    Returns:
        ``followed / instructed``, or 0.0 when no step was instructed.
    """
    instructed_steps = [
        step for step in trajectory if step["state"].last_instruction
    ]
    if not instructed_steps:
        return 0.0

    followed_count = sum(
        1
        for step in instructed_steps
        if step["info"].get("instruction_followed", False)
    )
    return followed_count / len(instructed_steps)
|
|
|
|
| |
| |
| |
class RuleBasedAgent:
    """Non-RL baseline recommender: retrieval only, no re-ranking.

    Candidates come straight from ``rec_agent.retriever`` queried with the
    state's ``mindset`` (described as FAISS recall by the original author).
    No ranking model is applied to the retrieved candidates.
    """

    def __init__(self, data: KuaiRecEnvData, rec_agent: RecAgent):
        self.rec_agent = rec_agent
        self.data = data

    def recommend(self, state: MDPState, env=None) -> List[int]:
        """Return up to ``cfg.rec_list_size`` item ids for ``state``.

        With a retriever available, the top ``cfg.recall_topk`` candidates
        are fetched, items among the last 50 entries of
        ``state.history_iids`` are dropped, and the head of what remains is
        returned (possibly fewer than ``cfg.rec_list_size`` items). Without
        a retriever, distinct item ids are sampled uniformly at random.

        ``env`` is accepted but not used by this method.
        """
        retriever = self.rec_agent.retriever
        if retriever is not None:
            recalled = retriever.retrieve(state.mindset, cfg.recall_topk)
            recently_seen = set(state.history_iids[-50:])
            unseen = [iid for iid in recalled if iid not in recently_seen]
            return unseen[:cfg.rec_list_size]

        # No retriever: fall back to a random list without repeats.
        random_ids = np.random.choice(
            self.data.n_items, cfg.rec_list_size, replace=False
        )
        return list(random_ids)
|
|
|
|
| |
| |
| |
def _logged_user_actions(env: RecoWorldEnv, uid: int,
                         rec_list: List[int]) -> List[str]:
    """Derive click/skip actions for ``rec_list`` from the watch-ratio matrix."""
    wr_matrix = env._wr_matrix
    actions = []
    for iid in rec_list:
        # Without a matrix every item is treated as having watch ratio 0.0.
        wr = float(wr_matrix[uid, iid]) if wr_matrix is not None else 0.0
        actions.append("click" if wr >= cfg.watch_ratio_threshold else "skip")
    return actions


def evaluate_episode(uid: int, env: RecoWorldEnv, agent,
                     user_sim: UserSimulator,
                     use_user_sim: bool = True) -> Dict:
    """Roll out one session for ``uid`` and summarise its metrics.

    Args:
        uid: User id passed to ``env.reset``.
        env: Environment that is reset and stepped until ``state.done``.
        agent: Any object exposing ``recommend(state, env)``.
        user_sim: Simulator used for feedback when ``use_user_sim`` is true.
        use_user_sim: When false, feedback is derived from the environment's
            watch-ratio matrix (click if the ratio reaches
            ``cfg.watch_ratio_threshold``, otherwise skip) and the
            instruction is always the empty string.

    Returns:
        Dict with ``uid``, ``total_reward``, ``session_length``,
        ``retention_rate`` (``session_length / cfg.max_session_steps``),
        ``avg_ndcg``, ``avg_ild``, ``instruction_follow_rate`` and
        ``trajectory_len``. The two averages are 0.0 when the session ends
        before any step is taken.
    """
    state = env.reset(uid)
    trajectory = []
    total_reward = 0.0

    # The last 20% of the user's history is the relevance set for NDCG.
    # NOTE(review): assumes the history is in chronological order - confirm.
    hist = env.data.user_histories.get(uid, [])
    cutoff = int(len(hist) * 0.8)
    test_items = set(hist[cutoff:])

    all_ndcg = []
    all_ild = []

    while not state.done:
        rec_list = agent.recommend(state, env)

        if use_user_sim:
            user_actions, instruction = user_sim.evaluate_recommendations(
                state, rec_list
            )
        else:
            user_actions = _logged_user_actions(env, uid, rec_list)
            instruction = ""

        result = env.step(state, rec_list, user_actions, instruction)

        all_ndcg.append(ndcg_at_k(rec_list, test_items, cfg.ndcg_k))
        all_ild.append(intra_list_diversity(rec_list, env.data.item_embeddings))

        trajectory.append({
            "state": state,
            "rec_list": rec_list,
            "user_actions": user_actions,
            "instruction": instruction,
            "reward": result.reward,
            "next_state": result.next_state,
            "done": result.done,
            "info": result.info,
        })

        total_reward += result.reward
        state = result.next_state

    # Exactly one trajectory record is appended per step taken.
    session_length = len(trajectory)

    return {
        "uid": uid,
        "total_reward": total_reward,
        "session_length": session_length,
        "retention_rate": session_length / cfg.max_session_steps,
        # np.mean([]) is NaN (with a RuntimeWarning) and would poison every
        # downstream average, so report 0.0 for a zero-step session, the same
        # empty-input convention the metric helpers in this module use.
        "avg_ndcg": float(np.mean(all_ndcg)) if all_ndcg else 0.0,
        "avg_ild": float(np.mean(all_ild)) if all_ild else 0.0,
        "instruction_follow_rate": instruction_follow_rate(trajectory, env),
        "trajectory_len": len(trajectory),
    }
|
|
|
|
| |
| |
| |
def run_ablation(env: RecoWorldEnv, rec_agent: RecAgent,
                 user_sim: UserSimulator, eval_users: List[int]) -> Dict:
    """Run the three-way ablation, print a summary table and save it as JSON.

    Groups:
        A: rule-based agent, no simulated user (offline baseline).
        B: rule-based agent with the simulated user (no RL).
        C: ``rec_agent`` with the simulated user (full RecoWorld).

    Args:
        env: Environment shared by all three groups.
        rec_agent: The trained agent; also supplies the retriever used by
            the rule-based baseline.
        user_sim: User simulator handed to every episode.
        eval_users: Candidate user ids; only the first
            ``cfg.eval_episodes`` are evaluated.

    Returns:
        Mapping from group name to its aggregated metrics. The same mapping
        is written to ``{cfg.output_dir}/ablation_results.json``.
    """
    import os  # function-scope import: only needed to prepare the output dir

    rule_agent = RuleBasedAgent(env.data, rec_agent)
    users = eval_users[:cfg.eval_episodes]
    results = {}

    def _run_group(agent, use_user_sim: bool) -> Dict:
        """Evaluate ``agent`` on every selected user and aggregate."""
        episodes = [
            evaluate_episode(uid, env, agent, user_sim, use_user_sim=use_user_sim)
            for uid in tqdm(users)
        ]
        return _aggregate(episodes)

    def _print_sim_line(group: str) -> None:
        """Print the one-line retention / NDCG / IFR recap for ``group``."""
        print(f" Retention: {results[group]['avg_retention']:.3f} "
              f"| NDCG@10: {results[group]['avg_ndcg']:.4f} "
              f"| IFR: {results[group]['avg_ifr']:.3f}")

    print("\n[Ablation A] 离线baseline(无仿真用户)")
    results["A_offline_baseline"] = _run_group(rule_agent, use_user_sim=False)
    print(f" NDCG@10: {results['A_offline_baseline']['avg_ndcg']:.4f}")

    print("\n[Ablation B] 仿真用户 + 规则推荐(无RL)")
    results["B_sim_user_rule_rec"] = _run_group(rule_agent, use_user_sim=True)
    _print_sim_line("B_sim_user_rule_rec")

    print("\n[Ablation C] 完整RecoWorld(仿真用户 + GRPO MLP)")
    results["C_full_recoworld"] = _run_group(rec_agent, use_user_sim=True)
    _print_sim_line("C_full_recoworld")

    print("\n" + "="*60)
    print("消融实验汇总")
    print(f"{'实验':35s} {'留存率':>8} {'NDCG@10':>9} {'ILD':>7} {'指令跟随率':>10} {'累计奖励':>9}")
    for name, r in results.items():
        print(f"{name:35s} {r['avg_retention']:>8.3f} {r['avg_ndcg']:>9.4f} "
              f"{r['avg_ild']:>7.3f} {r['avg_ifr']:>10.3f} {r['avg_reward']:>9.3f}")

    out_path = f"{cfg.output_dir}/ablation_results.json"
    # Create the directory first so a missing folder cannot discard the
    # results of the whole evaluation run at the final step.
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII text, so pin the file encoding
    # instead of relying on the platform default.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n结果已保存: {out_path}")
    return results
|
|
|
|
def _aggregate(scores: List[Dict]) -> Dict:
    """Average per-episode metric dicts into one summary row.

    Args:
        scores: Per-episode result dicts containing ``retention_rate``,
            ``avg_ndcg``, ``avg_ild``, ``instruction_follow_rate``,
            ``total_reward`` and ``session_length``.

    Returns:
        Dict with the mean of each metric as a builtin float, plus
        ``n_episodes``. Every mean is 0.0 when ``scores`` is empty.
    """
    # Output key -> per-episode key it averages.
    metric_sources = {
        "avg_retention": "retention_rate",
        "avg_ndcg": "avg_ndcg",
        "avg_ild": "avg_ild",
        "avg_ifr": "instruction_follow_rate",
        "avg_reward": "total_reward",
        "avg_session_len": "session_length",
    }
    # np.mean([]) is NaN (with a RuntimeWarning), which json.dump would emit
    # as the non-standard token NaN; report 0.0 for an empty run instead.
    summary = {
        out_key: float(np.mean([s[src_key] for s in scores])) if scores else 0.0
        for out_key, src_key in metric_sources.items()
    }
    summary["n_episodes"] = len(scores)
    return summary
|
|