# Offline evaluation utilities for recommender systems:
# leave-one-out splitting, ranking metrics (Precision/Recall/NDCG@k),
# catalog coverage, novelty, intra-list diversity, and bootstrap CIs.
from collections import defaultdict

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
def leave_one_out_by_timestamp(ratings_df):
    """Split ratings into train/test by holding out each user's latest rating.

    Parameters
    ----------
    ratings_df : pd.DataFrame
        Must contain 'userId' and 'timestamp' columns.

    Returns
    -------
    (train, test) : tuple of pd.DataFrame
        `test` holds the chronologically last rating of every user with at
        least two ratings; every other row goes to `train`.
    """
    ratings_df = ratings_df.sort_values(['userId', 'timestamp'])
    train_idx, test_idx = [], []
    for _, group in ratings_df.groupby('userId'):
        if len(group) > 1:
            test_idx.append(group.index[-1])
            train_idx.extend(group.index[:-1])
        else:
            # A user with a single rating cannot be evaluated (no training
            # history would remain), so keep that rating in train instead of
            # creating a cold-start test user the model has never seen.
            train_idx.extend(group.index)
    train = ratings_df.loc[train_idx]
    test = ratings_df.loc[test_idx]
    return train, test
def precision_at_k(ranked_lists, k=10):
    """Mean Precision@k over users.

    An item counts as relevant when its held-out true rating is >= 4.
    Divides by k (the standard definition), so users with fewer than k
    recommendations are implicitly penalised.

    Parameters
    ----------
    ranked_lists : dict
        {uid: [(item_id, est, true_rating), ...]} sorted by est descending.
    k : int
        Cutoff rank.

    Returns
    -------
    float
        Mean precision; 0.0 for empty input (instead of NaN).
    """
    if not ranked_lists:
        return 0.0  # avoid np.mean([]) -> NaN + RuntimeWarning
    precisions = []
    for items in ranked_lists.values():
        hits = sum(1 for _, _, true_r in items[:k] if true_r >= 4)
        precisions.append(hits / k)
    return np.mean(precisions)
def recall_at_k(ranked_lists, test_truth, k=10):
    """Mean Recall@k over users who have at least one relevant held-out item.

    Parameters
    ----------
    ranked_lists : dict
        {uid: [(item_id, est, true_rating), ...]} sorted by est descending.
    test_truth : pd.DataFrame or iterable of tuples
        DataFrame with 'userId'/'movieId'/'rating' columns, or an iterable of
        (uid, iid, rating, ...) rows. Ratings >= 4 define relevance.
    k : int
        Cutoff rank.

    Returns
    -------
    float
        Mean recall; 0.0 when no user has relevant truth (instead of NaN).
    """
    truth = defaultdict(set)
    if isinstance(test_truth, pd.DataFrame):
        # Column-wise zip is much faster than DataFrame.iterrows().
        for uid, iid, r in zip(test_truth['userId'],
                               test_truth['movieId'],
                               test_truth['rating']):
            if r >= 4:
                truth[uid].add(iid)
    else:
        for row in test_truth:
            # row can be (uid, iid, r) or (uid, iid, r, ...)
            uid, iid, r = row[:3]
            if r >= 4:
                truth[uid].add(iid)
    recalls = []
    for uid, items in ranked_lists.items():
        relevant = truth.get(uid, set())
        if relevant:
            recommended = {iid for iid, _, _ in items[:k]}
            recalls.append(len(recommended & relevant) / len(relevant))
    return np.mean(recalls) if recalls else 0.0
def ndcg_at_k(ranked_lists, k=10):
    """Mean NDCG@k with binary gains (true rating >= 4 -> gain 1, else 0).

    Users whose top-k contains no relevant item (idcg == 0) are skipped,
    matching the usual convention of averaging only over scorable users.

    Parameters
    ----------
    ranked_lists : dict
        {uid: [(item_id, est, true_rating), ...]} sorted by est descending.
    k : int
        Cutoff rank.

    Returns
    -------
    float
        Mean NDCG; 0.0 when no user is scorable (instead of NaN).
    """
    ndcgs = []
    for items in ranked_lists.values():
        rels = [1 if true_r >= 4 else 0 for _, _, true_r in items[:k]]
        dcg = sum((2 ** rel - 1) / np.log2(i + 2) for i, rel in enumerate(rels))
        idcg = sum((2 ** rel - 1) / np.log2(i + 2)
                   for i, rel in enumerate(sorted(rels, reverse=True)))
        if idcg > 0:
            ndcgs.append(dcg / idcg)
    return np.mean(ndcgs) if ndcgs else 0.0
def catalog_coverage(ranked_lists, all_items):
    """Fraction of the catalogue recommended to at least one user."""
    seen = set()
    for user_items in ranked_lists.values():
        for item_id, _, _ in user_items:
            seen.add(item_id)
    return len(seen) / len(all_items)
def novelty(ranked_lists, item_popularity):
    """Mean self-information of recommended items.

    Each item's popularity share p is its interaction count divided by the
    total count (unseen items default to a count of 1); the score is
    -log2(p + 1e-9), so rarer recommendations yield higher novelty.
    """
    total = sum(item_popularity.values())
    scores = [
        -np.log2(item_popularity.get(item_id, 1) / total + 1e-9)
        for user_items in ranked_lists.values()
        for item_id, _, _ in user_items
    ]
    return np.mean(scores)
def intra_list_diversity(ranked_lists, item_features):
    """Mean intra-list diversity: 1 - average pairwise cosine similarity
    between the feature vectors of each user's recommended items.

    Parameters
    ----------
    ranked_lists : dict
        {uid: [(item_id, est, true_rating), ...]}.
    item_features : dict
        {item_id: feature_vector}; items without features are ignored.

    Returns
    -------
    float
        Mean diversity over users with >= 2 featured items; 0.0 when no
        user is scorable (instead of NaN from np.mean([])).
    """
    diversities = []
    for items in ranked_lists.values():
        feats = [item_features[iid] for iid, _, _ in items if iid in item_features]
        if len(feats) > 1:
            sims = cosine_similarity(feats)
            # average only the strict upper triangle (unique unordered pairs)
            upper = sims[np.triu_indices_from(sims, k=1)]
            diversities.append(1.0 - np.mean(upper))
    return np.mean(diversities) if diversities else 0.0
def predictions_to_ranked_lists(predictions, k=20):
    """Group (uid, iid, true_r, est, details) predictions by user and keep
    each user's top-k items ordered by estimated rating, descending.

    Returns
    -------
    dict
        {uid: [(iid, est, true_r), ...]} truncated to k entries per user.
    """
    by_user = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        by_user[uid].append((iid, est, true_r))
    return {
        uid: sorted(triples, key=lambda t: t[1], reverse=True)[:k]
        for uid, triples in by_user.items()
    }
def evaluate_all(predictions, testset, all_items, item_popularity, item_features, k_list=None):
    """Compute ranking, coverage, novelty and diversity metrics in one pass.

    Parameters
    ----------
    predictions : iterable of (uid, iid, true_r, est, details) tuples.
    testset : DataFrame or iterable
        Ground truth passed to recall_at_k.
    all_items : collection
        Full item catalogue, for coverage.
    item_popularity : dict
        {item_id: interaction_count}, for novelty.
    item_features : dict
        {item_id: feature_vector}, for diversity.
    k_list : list of int, optional
        Cutoffs to evaluate; defaults to [10, 20]. (Using None avoids the
        shared-mutable-default-argument pitfall of `k_list=[10, 20]`.)

    Returns
    -------
    dict mapping metric name (e.g. 'Precision@10') to its value.
    """
    if k_list is None:
        k_list = [10, 20]
    # Build the lists once at the largest cutoff; smaller k just slice.
    ranked_lists = predictions_to_ranked_lists(predictions, k=max(k_list))
    results = {}
    for k in k_list:
        results[f'Precision@{k}'] = precision_at_k(ranked_lists, k)
        results[f'Recall@{k}'] = recall_at_k(ranked_lists, testset, k)
        results[f'NDCG@{k}'] = ndcg_at_k(ranked_lists, k)
    results['Coverage'] = catalog_coverage(ranked_lists, all_items)
    results['Novelty'] = novelty(ranked_lists, item_popularity)
    results['Diversity'] = intra_list_diversity(ranked_lists, item_features)
    return results
def summarize_results(results_dict):
    """Tabulate {model_name: {metric: value}} as a DataFrame, one row per model."""
    frame = pd.DataFrame(results_dict)
    return frame.T
def bootstrap_metric(metric_func, predictions, testset, all_items, item_popularity, item_features, n_bootstrap=100, k=10):
    """95% bootstrap confidence interval for a ranking metric, resampling users.

    Fixes two defects of the naive implementation:
    * users drawn more than once keep their multiplicity — the previous
      `p[0] in sampled_uids` filter deduplicated users, which is not a valid
      bootstrap, and cost O(len(predictions) * len(uids)) per round;
    * ranked lists are built once up front instead of once per round.

    Parameters
    ----------
    metric_func : callable
        Called as metric_func(ranked_lists, k); suitable for metrics keyed
        only on list contents (e.g. precision_at_k, ndcg_at_k).
    testset, all_items, item_popularity, item_features
        Accepted for signature compatibility with evaluate_all-style
        callers; not used by metric_func's (ranked_lists, k) call form.
    n_bootstrap : int
        Number of resampling rounds.
    k : int
        Cutoff rank.

    Returns
    -------
    np.ndarray of [2.5th, 97.5th] percentiles of the bootstrap scores.
    """
    ranked_lists = predictions_to_ranked_lists(predictions, k)
    uids = list(ranked_lists)
    scores = []
    for _ in range(n_bootstrap):
        # Sample user positions with replacement; re-key by draw position so
        # duplicate draws of the same user remain distinct dict entries.
        draw = np.random.choice(len(uids), size=len(uids), replace=True)
        boot_lists = {pos: ranked_lists[uids[j]] for pos, j in enumerate(draw)}
        scores.append(metric_func(boot_lists, k))
    return np.percentile(scores, [2.5, 97.5])