# TasteEngine/recommender/evaluation.py
# Ahmed694200 - commit cabd6cc:
# Fix SVD convergence and approach comparison data leakage
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error
class Evaluator:
    """Compute rating-error and top-k ranking metrics for recommenders.

    The evaluator keeps the full ratings table and, optionally, a held-out
    test split. When a test split has been registered (see
    ``set_test_ratings``) it is the only source used to decide which items
    are "relevant" for a user; otherwise the full ratings table is used.

    Ratings tables are expected to expose the columns ``user_id``,
    ``product_id`` and ``rating``.
    """

    def __init__(self, ratings_df, predictions_df=None):
        """Store the ratings table and an optional predictions table.

        Args:
            ratings_df: Ratings table with ``user_id``, ``product_id`` and
                ``rating`` columns.
            predictions_df: Optional predictions table. It is stored on the
                instance but not read by any method of this class.
        """
        self.ratings = ratings_df
        self.predictions = predictions_df
        self._test_ratings = None

    def set_test_ratings(self, test_ratings):
        """Register the held-out split used to look up relevant items."""
        self._test_ratings = test_ratings

    def _get_relevant_for_user(self, user_id, rating_threshold=3.5):
        """Return the product ids a user rated at or above the threshold.

        The registered test split is used when present; otherwise the
        lookup falls back to the full ratings table.
        """
        if self._test_ratings is not None:
            source = self._test_ratings
        else:
            source = self.ratings
        mask = (source["user_id"] == user_id) & (source["rating"] >= rating_threshold)
        return source.loc[mask, "product_id"].tolist()

    def rmse(self, y_true, y_pred):
        """Return the root mean squared error as a plain ``float``."""
        return float(np.sqrt(mean_squared_error(y_true, y_pred)))

    def mae(self, y_true, y_pred):
        """Return the mean absolute error as a plain ``float``."""
        return float(mean_absolute_error(y_true, y_pred))

    def precision_at_k(self, recommended_items, relevant_items, k=5):
        """Return the fraction of the top-``k`` slots filled by relevant items.

        The denominator is always ``k`` (not the number of items actually
        recommended). Returns 0 when ``k`` is not positive.
        """
        top_k = recommended_items[:k]
        hits = len(set(top_k) & set(relevant_items))
        return hits / k if k > 0 else 0

    def recall_at_k(self, recommended_items, relevant_items, k=5):
        """Return the fraction of relevant items found in the top ``k``.

        Returns 0 when ``relevant_items`` is empty.
        """
        top_k = recommended_items[:k]
        hits = len(set(top_k) & set(relevant_items))
        return hits / len(relevant_items) if len(relevant_items) > 0 else 0

    def f1_at_k(self, recommended_items, relevant_items, k=5):
        """Return the harmonic mean of precision@k and recall@k (0 if both are 0)."""
        p = self.precision_at_k(recommended_items, relevant_items, k)
        r = self.recall_at_k(recommended_items, relevant_items, k)
        return 2 * p * r / (p + r) if (p + r) > 0 else 0

    def coverage(self, recommended_items_list, total_items):
        """Return the share of the catalogue that was recommended at least once.

        Args:
            recommended_items_list: Iterable of per-user recommendation lists.
            total_items: Catalogue size; a non-positive value yields 0.
        """
        recommended_set = set()
        for items in recommended_items_list:
            recommended_set.update(items)
        return len(recommended_set) / total_items if total_items > 0 else 0

    def evaluate_cf_method(self, method_name, cf_instance, test_ratings, k=5,
                           rating_threshold=3.5, max_users=20,
                           default_prediction=2.5):
        """Evaluate one collaborative-filtering method on a test split.

        Args:
            method_name: Name passed through to ``cf_instance.recommend``.
            cf_instance: Object exposing
                ``recommend(method_name, user_id, n_recommendations=...)``
                that returns ``(product_id, predicted_rating)`` pairs. If it
                has an ``item_ids`` attribute, its length is used as the
                catalogue size for coverage.
            test_ratings: Held-out ratings table.
            k: Cut-off for the ranking metrics.
            rating_threshold: Minimum rating for an item to count as relevant.
            max_users: Only the first ``max_users`` distinct users are scored.
            default_prediction: Rating assumed for a test item that does not
                appear in the user's recommendation list.

        Returns:
            dict with ``method``, ``RMSE``, ``MAE``, ``Precision@k``,
            ``Recall@k``, ``F1@k`` and ``Coverage``.

        Raises:
            ValueError: If no test rows are available to score (empty
                ``test_ratings`` or ``max_users`` of 0).
        """
        y_true = []
        y_pred = []
        all_recommended = []
        user_precisions = []
        user_recalls = []
        user_f1s = []
        # Never request fewer recommendations than the cut-off, otherwise
        # Precision@k / Recall@k would be silently truncated for k > 20.
        n_requested = max(k, 20)

        for user_id in test_ratings["user_id"].unique()[:max_users]:
            user_test = test_ratings[test_ratings["user_id"] == user_id]
            try:
                recs = cf_instance.recommend(
                    method_name, user_id, n_recommendations=n_requested
                )
            except Exception:
                # Best effort: a user the model cannot score simply gets
                # no recommendations instead of aborting the evaluation.
                recs = []
            rec_items = [rec[0] for rec in recs]
            all_recommended.append(rec_items)

            # First occurrence wins, matching a front-to-back scan of recs.
            predicted = {}
            for rec_id, pred_rating in recs:
                predicted.setdefault(rec_id, pred_rating)
            for product_id, rating in zip(
                user_test["product_id"].tolist(), user_test["rating"].tolist()
            ):
                y_true.append(rating)
                y_pred.append(predicted.get(product_id, default_prediction))

            relevant = user_test.loc[
                user_test["rating"] >= rating_threshold, "product_id"
            ].tolist()
            if not relevant:
                # Ranking metrics are undefined without relevant items.
                continue
            user_precisions.append(self.precision_at_k(rec_items, relevant, k))
            user_recalls.append(self.recall_at_k(rec_items, relevant, k))
            user_f1s.append(self.f1_at_k(rec_items, relevant, k))

        if not y_true:
            raise ValueError(
                f"no test ratings to evaluate for method {method_name!r}"
            )

        total_items = len(cf_instance.item_ids) if hasattr(cf_instance, "item_ids") else None
        return {
            "method": method_name,
            "RMSE": self.rmse(y_true, y_pred),
            "MAE": self.mae(y_true, y_pred),
            f"Precision@{k}": round(np.mean(user_precisions), 4) if user_precisions else 0,
            f"Recall@{k}": round(np.mean(user_recalls), 4) if user_recalls else 0,
            f"F1@{k}": round(np.mean(user_f1s), 4) if user_f1s else 0,
            "Coverage": self.coverage(all_recommended, total_items) if total_items else 0,
        }

    def compare_cf_methods(self, cf_instance, test_ratings, k=5,
                           rating_threshold=3.5, max_users=20):
        """Evaluate every method reported by ``cf_instance.get_all_methods()``.

        A method whose evaluation raises is reported as
        ``{"method": name, "error": message}`` so one failure does not hide
        the results of the others.

        Returns:
            list of per-method result dicts, in the order the methods were
            reported.
        """
        results = []
        for method in cf_instance.get_all_methods():
            try:
                result = self.evaluate_cf_method(
                    method, cf_instance, test_ratings, k,
                    rating_threshold=rating_threshold, max_users=max_users,
                )
            except Exception as e:
                results.append({"method": method, "error": str(e)})
            else:
                results.append(result)
        return results

    def evaluate_approach(self, approach_name, recommender_fn, test_users,
                          products_df, k=5, rating_threshold=3.5):
        """Score an arbitrary recommender callable with ranking metrics.

        Args:
            approach_name: Label copied into the result.
            recommender_fn: Callable taking a user id and returning
                ``(product_id, score)`` pairs.
            test_users: Iterable of user ids to score.
            products_df: Product table; its length is the catalogue size
                used for coverage.
            k: Cut-off for the ranking metrics.
            rating_threshold: Minimum rating for an item to count as relevant.

        Returns:
            dict with ``approach``, ``Precision@k``, ``Recall@k`` and
            ``Coverage``.
        """
        all_recommended = []
        precisions = []
        recalls = []
        for user_id in test_users:
            try:
                recs = recommender_fn(user_id)
            except Exception:
                # Best effort: treat a failing user as "no recommendations".
                recs = []
            rec_items = [rec[0] for rec in recs]
            all_recommended.append(rec_items)

            relevant = self._get_relevant_for_user(user_id, rating_threshold)
            if not relevant:
                # Same rule as evaluate_cf_method: users without relevant
                # items do not contribute to the precision/recall averages.
                continue
            precisions.append(self.precision_at_k(rec_items, relevant, k))
            recalls.append(self.recall_at_k(rec_items, relevant, k))
        return {
            "approach": approach_name,
            f"Precision@{k}": round(np.mean(precisions), 4) if precisions else 0,
            f"Recall@{k}": round(np.mean(recalls), 4) if recalls else 0,
            "Coverage": self.coverage(all_recommended, len(products_df)),
        }

    def compare_approaches(self, cf_instance, cb_instance, kb_instance,
                           test_ratings, products_df, k=5, max_users=20):
        """Evaluate the recommendation approaches on a shared test split.

        Registers ``test_ratings`` on the evaluator (this persists after the
        call) so relevance is judged on held-out data only, then scores the
        ``"item_based"`` collaborative-filtering method for the first
        ``max_users`` distinct test users.

        ``cb_instance`` and ``kb_instance`` are accepted for interface
        compatibility but are not used by the code in this method.

        Returns:
            list of result dicts; a failing approach is reported as
            ``{"approach": name, "error": message}``.
        """
        self.set_test_ratings(test_ratings)
        test_users = test_ratings["user_id"].unique()[:max_users]
        # Ask for at least k items so the top-k metrics are not truncated.
        n_requested = max(k, 10)

        def cf_recommender(uid):
            return cf_instance.recommend(
                "item_based", uid, n_recommendations=n_requested
            )

        results = []
        try:
            cf_result = self.evaluate_approach(
                "Collaborative Filtering", cf_recommender, test_users, products_df, k
            )
        except Exception as e:
            results.append({"approach": "Collaborative Filtering", "error": str(e)})
        else:
            results.append(cf_result)
        return results