"""Full-catalog (or sampled) ranking evaluator. Default strategy `full_catalog`: for each user, rank the held-out positive against all items in the catalog. Items the user saw during training (and, for the test split, during validation) are masked to -inf before ranking. We prefer full-catalog over the legacy 1+100 sampled-negatives protocol because sampled metrics are inconsistent with full-catalog metrics — a model that is worse under full-catalog can look better under sampled (Krichene & Rendle, KDD 2020). Sampled is still exposed via config for reproducing NCF-era benchmark numbers, but the evaluator logs a warning. Tie-breaking: if multiple items have the same score, we count items STRICTLY greater than the positive's score. This gives the positive its most favorable possible rank among ties — matches the convention used in most papers. """ from __future__ import annotations import numpy as np import torch from torch import Tensor, nn from ..config import Config from ..logging_utils import get_logger from ..models.base import BaseRecommender from . import metrics _logger = get_logger(__name__) def _unwrap(model: nn.Module) -> nn.Module: """DataParallel only forwards `forward()`. For methods like score_all_items we need the underlying module.""" return model.module if isinstance(model, nn.DataParallel) else model class Evaluator: def __init__( self, *, cfg: Config, val_pairs: np.ndarray, test_pairs: np.ndarray, user_train_positives: list[set[int]], user_val_positives: list[set[int]] | None, num_items: int, device: torch.device, ) -> None: self.cfg = cfg self.k = cfg.evaluation.k self.strategy = cfg.evaluation.strategy self.val_pairs = val_pairs.astype(np.int64, copy=False) self.test_pairs = test_pairs.astype(np.int64, copy=False) self.user_train_positives = user_train_positives self.user_val_positives = user_val_positives self.num_items = int(num_items) self.device = device if self.strategy == "sampled": _logger.warning( "evaluation.strategy='sampled' — results are NOT comparable to " "full_catalog metrics. Use full_catalog for modern benchmarks." ) @torch.no_grad() def evaluate(self, model: BaseRecommender, split: str) -> dict[str, float]: """Compute HR/NDCG/Recall/MAP @ K on the given split.""" if split == "val": pairs = self.val_pairs mask_sources: list[list[set[int]]] = [self.user_train_positives] elif split == "test": pairs = self.test_pairs mask_sources = [self.user_train_positives] if self.user_val_positives is not None: mask_sources.append(self.user_val_positives) else: raise ValueError(f"split must be 'val' or 'test', got {split!r}") was_training = model.training model.eval() try: ranks = self._compute_ranks(model, pairs, mask_sources) finally: if was_training: model.train() k = self.k return { f"hr@{k}": metrics.hit_rate_at_k(ranks, k), f"ndcg@{k}": metrics.ndcg_at_k(ranks, k), f"recall@{k}": metrics.recall_at_k(ranks, k), f"map@{k}": metrics.map_at_k(ranks, k), } # ---------- internals ---------- def _compute_ranks( self, model: BaseRecommender, pairs: np.ndarray, mask_sources: list[list[set[int]]], ) -> np.ndarray: batch_size = self.cfg.evaluation.eval_batch_size num = pairs.shape[0] ranks = np.empty(num, dtype=np.int64) for start in range(0, num, batch_size): end = min(start + batch_size, num) batch = pairs[start:end] users_t = torch.from_numpy(batch[:, 0]).to(self.device) pos_items_t = torch.from_numpy(batch[:, 1]).to(self.device) if self.strategy == "full_catalog": batch_ranks = self._rank_full_catalog( model, users_t, pos_items_t, batch[:, 0], mask_sources ) else: batch_ranks = self._rank_sampled( model, users_t, pos_items_t, batch[:, 0], mask_sources ) ranks[start:end] = batch_ranks return ranks def _rank_full_catalog( self, model: BaseRecommender, users_t: Tensor, pos_items_t: Tensor, users_np: np.ndarray, mask_sources: list[list[set[int]]], ) -> np.ndarray: scores = _unwrap(model).score_all_items(users_t) # [B, N] # Mask items this user has already seen in the relevant splits. The # target positive itself must NOT be masked — we're computing its rank # within the catalog, which requires knowing its score. self._apply_seen_mask(scores, users_np, pos_items_t.cpu().numpy(), mask_sources) pos_scores = scores.gather(1, pos_items_t.unsqueeze(1)) # [B, 1] # Rank = 1 + number of items scoring strictly higher than the positive. higher = (scores > pos_scores).sum(dim=1) + 1 # [B] return higher.cpu().numpy().astype(np.int64) def _rank_sampled( self, model: BaseRecommender, users_t: Tensor, pos_items_t: Tensor, users_np: np.ndarray, mask_sources: list[list[set[int]]], ) -> np.ndarray: num_neg = self.cfg.evaluation.num_sampled_negatives rng = np.random.default_rng(self.cfg.seed) # deterministic per call B = users_np.shape[0] # For each user, sample num_neg items they have NOT seen (and that # aren't the positive we're ranking). neg_arr = np.empty((B, num_neg), dtype=np.int64) for b in range(B): u = int(users_np[b]) forbidden = set() for src in mask_sources: forbidden |= src[u] forbidden.add(int(pos_items_t[b].item())) count = 0 while count < num_neg: j = int(rng.integers(0, self.num_items)) if j in forbidden: continue neg_arr[b, count] = j count += 1 neg_t = torch.from_numpy(neg_arr).to(self.device) # [B, num_neg] m = _unwrap(model) neg_scores = m.score(users_t.unsqueeze(-1).expand_as(neg_t), neg_t) pos_scores = m.score(users_t, pos_items_t).unsqueeze(1) # [B, 1] higher = (neg_scores > pos_scores).sum(dim=1) + 1 return higher.cpu().numpy().astype(np.int64) def _apply_seen_mask( self, scores: Tensor, users_np: np.ndarray, pos_items_np: np.ndarray, mask_sources: list[list[set[int]]], ) -> None: """In-place mask: set scores[b, i] = -inf for i in user_b's seen set (excluding the positive item we're trying to rank).""" neg_inf = float("-inf") for b, u in enumerate(users_np): u = int(u) pos_i = int(pos_items_np[b]) for src in mask_sources: seen = src[u] if not seen: continue # Exclude the target positive from the mask — we need its score. idxs = [i for i in seen if i != pos_i] if idxs: scores[b, idxs] = neg_inf