Spaces:
Sleeping
Sleeping
import logging

import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error
_logger = logging.getLogger(__name__)


class Evaluator:
    """Compute rating-accuracy, ranking, and coverage metrics for recommenders.

    Ratings frames handled here are read through the columns ``user_id``,
    ``product_id`` and ``rating``. Recommenders are expected to return a
    sequence of records whose first element is the item id.
    """

    def __init__(self, ratings_df, predictions_df=None):
        """Store the ratings frame and an optional predictions frame.

        Args:
            ratings_df: Ratings used as the fallback source of relevant items.
            predictions_df: Stored on the instance; not read by any method of
                this class.
        """
        self.ratings = ratings_df
        self.predictions = predictions_df
        self._test_ratings = None

    def set_test_ratings(self, test_ratings):
        """Use *test_ratings* (instead of ``self.ratings``) as ground truth."""
        self._test_ratings = test_ratings

    def _get_relevant_for_user(self, user_id, rating_threshold=3.5):
        """Return product ids the user rated at or above *rating_threshold*.

        The held-out test ratings are used when they have been set; otherwise
        the ratings frame given to the constructor is used.
        """
        source = self._test_ratings if self._test_ratings is not None else self.ratings
        is_relevant = (source["user_id"] == user_id) & (
            source["rating"] >= rating_threshold
        )
        return source.loc[is_relevant, "product_id"].tolist()

    @staticmethod
    def _rounded_mean(values):
        """Return the mean of *values* rounded to 4 decimals, or 0 if empty."""
        return round(float(np.mean(values)), 4) if values else 0

    @staticmethod
    def _recommend_or_empty(label, user_id, recommend_fn):
        """Return ``recommend_fn(user_id)``, or ``[]`` if the call raises."""
        try:
            return recommend_fn(user_id)
        except Exception:
            # Best-effort on purpose: one failing user must not abort the run.
            # The failure is logged so all-zero metrics can be traced back to
            # a broken recommender rather than mistaken for poor quality.
            _logger.warning(
                "%s: recommendation failed for user %r; using no recommendations",
                label,
                user_id,
                exc_info=True,
            )
            return []

    def rmse(self, y_true, y_pred):
        """Return the root mean squared error between the two sequences."""
        return float(np.sqrt(mean_squared_error(y_true, y_pred)))

    def mae(self, y_true, y_pred):
        """Return the mean absolute error between the two sequences."""
        return float(mean_absolute_error(y_true, y_pred))

    def precision_at_k(self, recommended_items, relevant_items, k=5):
        """Return the fraction of the top-*k* recommendations that are relevant.

        The denominator is always *k*, even when fewer than *k* items were
        recommended. Returns 0 when *k* is not positive.
        """
        if k <= 0:
            return 0
        hits = len(set(recommended_items[:k]) & set(relevant_items))
        return hits / k

    def recall_at_k(self, recommended_items, relevant_items, k=5):
        """Return the fraction of relevant items found in the top *k*.

        Relevant items are de-duplicated so repeated ids do not deflate the
        score. Returns 0 when there are no relevant items or *k* is not
        positive.
        """
        relevant = set(relevant_items)
        if k <= 0 or not relevant:
            return 0
        hits = len(relevant.intersection(recommended_items[:k]))
        return hits / len(relevant)

    def f1_at_k(self, recommended_items, relevant_items, k=5):
        """Return the harmonic mean of precision@k and recall@k (0 if both 0)."""
        p = self.precision_at_k(recommended_items, relevant_items, k)
        r = self.recall_at_k(recommended_items, relevant_items, k)
        return 2 * p * r / (p + r) if (p + r) > 0 else 0

    def coverage(self, recommended_items_list, total_items):
        """Return distinct recommended items divided by *total_items*.

        Args:
            recommended_items_list: Iterable of per-user item-id iterables.
            total_items: Catalogue size; a non-positive value yields 0.
        """
        if total_items <= 0:
            return 0
        distinct = {item for items in recommended_items_list for item in items}
        return len(distinct) / total_items

    def evaluate_cf_method(
        self,
        method_name,
        cf_instance,
        test_ratings,
        k=5,
        rating_threshold=3.5,
        max_users=20,
        n_recommendations=20,
        default_rating=2.5,
    ):
        """Evaluate one collaborative-filtering method on held-out ratings.

        Args:
            method_name: Passed to ``cf_instance.recommend`` and echoed back.
            cf_instance: Object with ``recommend(method, user_id,
                n_recommendations=...)`` returning ``(item_id, predicted)``
                pairs; an optional ``item_ids`` attribute sizes the catalogue.
            test_ratings: Frame with ``user_id``, ``product_id``, ``rating``.
            k: Cut-off for the ranking metrics.
            rating_threshold: Minimum rating for a test item to be relevant.
            max_users: Only the first *max_users* distinct users are evaluated.
            n_recommendations: Number of recommendations requested per user.
            default_rating: Prediction used for test items that were not
                recommended (2.5 is presumably the scale midpoint - confirm).

        Returns:
            Dict with ``method``, ``RMSE``, ``MAE``, ``Precision@k``,
            ``Recall@k``, ``F1@k`` and ``Coverage``.

        Raises:
            Whatever ``rmse``/``mae`` raise when no test rows were collected.
        """
        y_true, y_pred = [], []
        all_recommended = []
        precisions, recalls, f1s = [], [], []

        def recommend(uid):
            return cf_instance.recommend(
                method_name, uid, n_recommendations=n_recommendations
            )

        for user_id in test_ratings["user_id"].unique()[:max_users]:
            user_test = test_ratings[test_ratings["user_id"] == user_id]
            # Materialise once so a generator result can be read twice.
            recs = list(self._recommend_or_empty(method_name, user_id, recommend))
            rec_items = [rec[0] for rec in recs]
            all_recommended.append(rec_items)

            # First occurrence wins, matching a front-to-back scan of recs.
            predicted = {}
            for rec_id, pred_rating in recs:
                predicted.setdefault(rec_id, pred_rating)

            for product_id, rating in zip(user_test["product_id"], user_test["rating"]):
                y_true.append(rating)
                y_pred.append(predicted.get(product_id, default_rating))

            relevant = user_test.loc[
                user_test["rating"] >= rating_threshold, "product_id"
            ].tolist()
            if not relevant:
                # Ranking metrics are undefined without ground-truth positives.
                continue
            precisions.append(self.precision_at_k(rec_items, relevant, k))
            recalls.append(self.recall_at_k(rec_items, relevant, k))
            f1s.append(self.f1_at_k(rec_items, relevant, k))

        item_ids = getattr(cf_instance, "item_ids", None)
        total_items = len(item_ids) if item_ids is not None else 0
        return {
            "method": method_name,
            "RMSE": self.rmse(y_true, y_pred),
            "MAE": self.mae(y_true, y_pred),
            f"Precision@{k}": self._rounded_mean(precisions),
            f"Recall@{k}": self._rounded_mean(recalls),
            f"F1@{k}": self._rounded_mean(f1s),
            "Coverage": self.coverage(all_recommended, total_items),
        }

    def compare_cf_methods(self, cf_instance, test_ratings, k=5):
        """Evaluate every method reported by ``cf_instance.get_all_methods()``.

        A method whose evaluation raises contributes
        ``{"method": ..., "error": ...}`` instead of a metrics dict.
        """
        results = []
        for method in cf_instance.get_all_methods():
            try:
                outcome = self.evaluate_cf_method(method, cf_instance, test_ratings, k)
            except Exception as exc:
                # Boundary of the comparison: keep going with other methods.
                outcome = {"method": method, "error": str(exc)}
            results.append(outcome)
        return results

    def evaluate_approach(
        self,
        approach_name,
        recommender_fn,
        test_users,
        products_df,
        k=5,
        rating_threshold=3.5,
    ):
        """Evaluate an arbitrary recommender callable with ranking metrics.

        Args:
            approach_name: Label echoed back under ``approach``.
            recommender_fn: ``recommender_fn(user_id)`` returning records
                whose first element is the item id.
            test_users: Iterable of user ids to evaluate.
            products_df: Catalogue frame; its length is the coverage
                denominator.
            k: Cut-off for the ranking metrics.
            rating_threshold: Minimum rating for an item to be relevant.

        Returns:
            Dict with ``approach``, ``Precision@k``, ``Recall@k``, ``Coverage``.
            Users with no relevant items are left out of the precision and
            recall averages (as in ``evaluate_cf_method``) but still count
            towards coverage.
        """
        all_recommended = []
        precisions, recalls = [], []
        for user_id in test_users:
            recs = self._recommend_or_empty(approach_name, user_id, recommender_fn)
            rec_items = [rec[0] for rec in recs]
            all_recommended.append(rec_items)

            relevant = self._get_relevant_for_user(user_id, rating_threshold)
            if not relevant:
                continue
            precisions.append(self.precision_at_k(rec_items, relevant, k))
            recalls.append(self.recall_at_k(rec_items, relevant, k))

        return {
            "approach": approach_name,
            f"Precision@{k}": self._rounded_mean(precisions),
            f"Recall@{k}": self._rounded_mean(recalls),
            "Coverage": self.coverage(all_recommended, len(products_df)),
        }

    def compare_approaches(
        self,
        cf_instance,
        cb_instance,
        kb_instance,
        test_ratings,
        products_df,
        k=5,
        max_users=20,
    ):
        """Evaluate the recommendation approaches against *test_ratings*.

        Only the collaborative-filtering approach (``"item_based"`` method,
        10 recommendations per user) is evaluated; *cb_instance* and
        *kb_instance* are accepted but not used here.

        Side effect: *test_ratings* becomes the instance's ground truth via
        ``set_test_ratings`` and stays set after the call.

        Returns:
            A list with one metrics dict, or one
            ``{"approach": ..., "error": ...}`` dict if evaluation raised.
        """
        self.set_test_ratings(test_ratings)
        test_users = test_ratings["user_id"].unique()[:max_users]

        def cf_recommender(uid):
            return cf_instance.recommend("item_based", uid, n_recommendations=10)

        results = []
        try:
            results.append(
                self.evaluate_approach(
                    "Collaborative Filtering", cf_recommender, test_users, products_df, k
                )
            )
        except Exception as exc:
            results.append({"approach": "Collaborative Filtering", "error": str(exc)})
        return results