"""Offline evaluation metrics for a recommender system.

Provides a timestamp-based leave-one-out split, ranking metrics
(precision / recall / NDCG @ k), beyond-accuracy metrics (catalog
coverage, novelty, intra-list diversity), and a user-level bootstrap
for confidence intervals.

A "ranked list" throughout is a dict mapping user id -> list of
(item_id, estimated_rating, true_rating) tuples, sorted by estimated
rating descending. An item is considered *relevant* when its true
rating is >= 4.
"""
from collections import defaultdict

import numpy as np
import pandas as pd

try:
    # Optional dependency: used as a fast path in intra_list_diversity.
    # The numpy fallback below keeps this module importable without sklearn.
    from sklearn.metrics.pairwise import cosine_similarity
except ImportError:
    cosine_similarity = None


def leave_one_out_by_timestamp(ratings_df):
    """Split ratings into train/test, holding out each user's latest rating.

    Users with a single rating are kept entirely in *train*: holding out
    their only interaction would leave them with no training history
    (pure cold start) and waste the rating.

    Parameters
    ----------
    ratings_df : DataFrame with at least 'userId' and 'timestamp' columns.

    Returns
    -------
    (train, test) : tuple of DataFrames (views into the sorted input).
    """
    ratings_df = ratings_df.sort_values(['userId', 'timestamp'])
    train_idx, test_idx = [], []
    for _, group in ratings_df.groupby('userId'):
        if len(group) > 1:
            test_idx.append(group.index[-1])      # most recent rating
            train_idx.extend(group.index[:-1])    # everything earlier
        else:
            # Single-interaction user: nothing to train on if held out.
            train_idx.extend(group.index)
    return ratings_df.loc[train_idx], ratings_df.loc[test_idx]


def precision_at_k(ranked_lists, k=10):
    """Mean fraction of each user's top-k recommendations that are relevant.

    Returns 0.0 for an empty `ranked_lists` instead of NaN.
    """
    if not ranked_lists:
        return 0.0
    precisions = []
    for items in ranked_lists.values():
        n_relevant = sum(1 for _, _, true_r in items[:k] if true_r >= 4)
        precisions.append(n_relevant / k)
    return float(np.mean(precisions))


def recall_at_k(ranked_lists, test_truth, k=10):
    """Mean fraction of each user's relevant test items found in the top-k.

    `test_truth` may be a DataFrame with userId/movieId/rating columns,
    or an iterable of (uid, iid, rating, ...) tuples. Users with no
    relevant test items are skipped; returns 0.0 if no user qualifies.
    """
    truth = defaultdict(set)
    if isinstance(test_truth, pd.DataFrame):
        relevant_rows = test_truth[test_truth['rating'] >= 4]
        for uid, iid in zip(relevant_rows['userId'], relevant_rows['movieId']):
            truth[uid].add(iid)
    else:
        for row in test_truth:
            # row can be (uid, iid, r, ...) or (uid, iid, r)
            uid, iid, r = row[:3]
            if r >= 4:
                truth[uid].add(iid)
    recalls = []
    for uid, items in ranked_lists.items():
        relevant = truth.get(uid, set())
        if relevant:
            recommended = {iid for iid, _, _ in items[:k]}
            recalls.append(len(recommended & relevant) / len(relevant))
    return float(np.mean(recalls)) if recalls else 0.0


def ndcg_at_k(ranked_lists, k=10):
    """Mean normalized DCG over users, with binary relevance (rating >= 4).

    With binary gains, 2**rel - 1 == rel, so the gain is used directly.
    Users whose top-k has no relevant item (IDCG == 0) are skipped;
    returns 0.0 if no user qualifies.
    """
    ndcgs = []
    for items in ranked_lists.values():
        rels = [1 if true_r >= 4 else 0 for _, _, true_r in items[:k]]
        if not any(rels):
            continue  # IDCG would be 0 -> undefined ratio
        dcg = sum(rel / np.log2(pos + 2) for pos, rel in enumerate(rels))
        idcg = sum(rel / np.log2(pos + 2)
                   for pos, rel in enumerate(sorted(rels, reverse=True)))
        ndcgs.append(dcg / idcg)
    return float(np.mean(ndcgs)) if ndcgs else 0.0


def catalog_coverage(ranked_lists, all_items):
    """Fraction of the catalog appearing in at least one recommendation list.

    Returns 0.0 for an empty catalog instead of raising ZeroDivisionError.
    """
    if not all_items:
        return 0.0
    recommended = {iid for items in ranked_lists.values() for iid, _, _ in items}
    return len(recommended) / len(all_items)


def novelty(ranked_lists, item_popularity):
    """Mean self-information -log2(p) of recommended items.

    p is the item's share of total interaction counts; items absent from
    `item_popularity` get count 1 so they read as highly novel rather
    than crashing. Returns 0.0 when there is nothing to average.
    """
    total = sum(item_popularity.values())
    if not total:
        return 0.0
    novelties = []
    for items in ranked_lists.values():
        for iid, _, _ in items:
            p = item_popularity.get(iid, 1) / total
            novelties.append(-np.log2(p + 1e-9))  # epsilon guards log2(0)
    return float(np.mean(novelties)) if novelties else 0.0


def _pairwise_cosine(mat):
    """Cosine-similarity matrix for the rows of `mat` (numpy fallback).

    Zero-norm rows are left as all-zero similarity rows, matching
    sklearn's handling — TODO confirm against the installed version.
    """
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # avoid divide-by-zero for zero vectors
    unit = mat / norms
    return unit @ unit.T


def intra_list_diversity(ranked_lists, item_features):
    """Mean (1 - average pairwise cosine similarity) within each user's list.

    Items missing from `item_features` are skipped; lists with fewer than
    two featured items contribute nothing. Returns 0.0 if no list qualifies.
    """
    diversities = []
    for items in ranked_lists.values():
        feats = [item_features[iid] for iid, _, _ in items if iid in item_features]
        if len(feats) < 2:
            continue
        if cosine_similarity is not None:
            sims = cosine_similarity(feats)
        else:
            sims = _pairwise_cosine(np.asarray(feats, dtype=float))
        upper = sims[np.triu_indices_from(sims, k=1)]
        diversities.append(1.0 - float(np.mean(upper)))
    return float(np.mean(diversities)) if diversities else 0.0


def predictions_to_ranked_lists(predictions, k=20):
    """Group (uid, iid, true_r, est, _) predictions by user; keep the top-k
    per user, sorted by estimated rating descending."""
    per_user = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        per_user[uid].append((iid, est, true_r))
    return {uid: sorted(items, key=lambda t: t[1], reverse=True)[:k]
            for uid, items in per_user.items()}


def evaluate_all(predictions, testset, all_items, item_popularity,
                 item_features, k_list=(10, 20)):
    """Compute all ranking and beyond-accuracy metrics at each cutoff.

    The default cutoffs are a tuple rather than a list to avoid a shared
    mutable default argument; any iterable of ints is accepted.
    Returns a flat dict of metric name -> score.
    """
    ranked_lists = predictions_to_ranked_lists(predictions, k=max(k_list))
    results = {}
    for k in k_list:
        results[f'Precision@{k}'] = precision_at_k(ranked_lists, k)
        results[f'Recall@{k}'] = recall_at_k(ranked_lists, testset, k)
        results[f'NDCG@{k}'] = ndcg_at_k(ranked_lists, k)
    results['Coverage'] = catalog_coverage(ranked_lists, all_items)
    results['Novelty'] = novelty(ranked_lists, item_popularity)
    results['Diversity'] = intra_list_diversity(ranked_lists, item_features)
    return results


def summarize_results(results_dict):
    """Return a DataFrame with one row per model/run and one column per metric."""
    return pd.DataFrame(results_dict).T


def bootstrap_metric(metric_func, predictions, testset, all_items,
                     item_popularity, item_features, n_bootstrap=100, k=10):
    """95% bootstrap confidence interval for a ranking metric, resampling
    users with replacement.

    Fixes vs the naive membership-filter approach: a user drawn m times now
    contributes m copies of their list (duplicates are re-keyed with fresh
    ids so they are counted with multiplicity), and the per-user lists are
    built once up front instead of re-scanning all predictions per iteration.

    `metric_func` must have the signature metric_func(ranked_lists, k),
    e.g. `precision_at_k` or `ndcg_at_k`.
    Returns np.ndarray [lower, upper] at the 2.5th / 97.5th percentiles.
    """
    ranked_by_uid = predictions_to_ranked_lists(predictions, k)
    uids = list(ranked_by_uid)
    scores = []
    for _ in range(n_bootstrap):
        sampled = np.random.choice(uids, size=len(uids), replace=True)
        # Re-key on the sample position so repeated users keep multiplicity.
        sample_lists = {pos: ranked_by_uid[uid] for pos, uid in enumerate(sampled)}
        scores.append(metric_func(sample_lists, k))
    return np.percentile(scores, [2.5, 97.5])