# TasteEngine/recommender/collaborative.py
# Last commit 60c3ccb (Abdallah4z):
#   Enhance evaluation API and frontend for collaborative filtering methods
import numpy as np
import pandas as pd
from utils.similarity import cosine_similarity, pearson_similarity, adjusted_cosine_similarity
from utils.helpers import build_user_item_matrix
class CollaborativeFiltering:
    """Collaborative-filtering recommenders sharing one user-item matrix.

    The matrix comes from ``build_user_item_matrix(ratings_df)``; this class
    reads its ``.values``, ``.index`` (user ids) and ``.columns`` (item ids).
    Missing ratings are detected with ``np.isnan`` throughout, so the matrix
    is presumably a float pivot table with NaN for "not rated" -- confirm
    against the helper.

    Every recommender returns a list of ``(item_id, predicted_rating)`` tuples
    for items the user has not rated, sorted by predicted rating (descending)
    and truncated to ``n_recommendations``. Unknown users get ``[]``.
    """

    # Public method name -> attribute implementing it. Single source of truth
    # for both recommend() and get_all_methods().
    _METHOD_TABLE = {
        "user_based": "user_based_cf",
        "item_based": "item_based_cf",
        "svd": "svd",
        "knn": "knn_cf",
        "slope_one": "slope_one",
    }

    def __init__(self, ratings_df):
        """Build the user-item matrix and the statistics shared by all methods.

        Args:
            ratings_df: Ratings table forwarded to ``build_user_item_matrix``.
        """
        self.ratings = ratings_df
        self.matrix = build_user_item_matrix(ratings_df)
        self.user_item_matrix = self.matrix.values
        self.n_users, self.n_items = self.user_item_matrix.shape
        self.user_ids = self.matrix.index.values
        self.item_ids = self.matrix.columns.values
        # NaN-aware means: unrated cells do not pull the averages down.
        self.user_means = np.nanmean(self.user_item_matrix, axis=1)
        self.global_mean = np.nanmean(self.user_item_matrix)
        # Lazily filled by train_svd_generator / compute_slope_one_dev_generator.
        self._svd_cache = None
        self._slope_one_dev = None

    def _get_user_index(self, user_id):
        """Return the matrix row of ``user_id``, or ``None`` if it is unknown."""
        indices = np.where(self.user_ids == user_id)[0]
        return indices[0] if len(indices) > 0 else None

    def _get_item_index(self, item_id):
        """Return the matrix column of ``item_id``, or ``None`` if it is unknown."""
        indices = np.where(self.item_ids == item_id)[0]
        return indices[0] if len(indices) > 0 else None

    @staticmethod
    def _top_positive(similarities, k):
        """Return indices of the (at most) ``k`` largest strictly positive values.

        NaN entries are ranked last. A plain ``argsort(...)[::-1]`` would put
        them first, where they could fill the top-k and then be discarded,
        hiding usable neighbours.
        """
        ranked = np.where(np.isnan(similarities), -np.inf, similarities)
        top = np.argsort(ranked)[::-1][:k]
        return top[ranked[top] > 0]

    def user_based_cf(self, user_id, n_recommendations=10, k=20):
        """Recommend items from the ratings of the ``k`` most similar users.

        Similarity is ``cosine_similarity`` over the matrix with missing
        ratings replaced by the global mean. Each unseen item is scored as the
        similarity-weighted average of the ratings given by those neighbours
        that actually rated it; items none of them rated are skipped.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            k: Neighbourhood size.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        matrix_filled = np.nan_to_num(self.user_item_matrix, nan=self.global_mean)
        sim_matrix = cosine_similarity(matrix_filled)
        # Copy the row so zeroing the self-similarity cannot write back into
        # whatever the similarity helper returned.
        user_sim = np.array(sim_matrix[u_idx])
        user_sim[u_idx] = 0

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        if unseen.size == 0:
            return []

        # The neighbourhood does not depend on the item: compute it once
        # instead of re-sorting all similarities for every unseen item.
        neighbors = self._top_positive(user_sim, k)
        neighbor_sims = user_sim[neighbors]
        neighbor_ratings = self.user_item_matrix[neighbors]

        predictions = []
        for i_idx in unseen:
            column = neighbor_ratings[:, i_idx]
            has_rating = ~np.isnan(column)
            if not has_rating.any():
                continue
            pred = np.average(column[has_rating], weights=neighbor_sims[has_rating])
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n_recommendations]

    def item_based_cf(self, user_id, n_recommendations=10, k=15):
        """Recommend items similar to the ones the user already rated.

        Item-item similarity comes from ``adjusted_cosine_similarity`` on the
        raw (NaN-containing) matrix. Each unseen item is scored as the
        similarity-weighted average of the user's ratings on its ``k`` most
        similar rated items, using strictly positive similarities only.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            k: Number of rated items consulted per candidate.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        item_sim = adjusted_cosine_similarity(self.user_item_matrix)
        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        rated = np.where(~np.isnan(user_ratings))[0]
        if rated.size == 0:
            return []
        rated_values = user_ratings[rated]

        predictions = []
        for i_idx in unseen:
            sim_to_rated = item_sim[i_idx, rated]
            # Positions are relative to `rated`, which is aligned with
            # `rated_values`.
            best = self._top_positive(sim_to_rated, k)
            if best.size == 0:
                continue
            pred = np.average(rated_values[best], weights=sim_to_rated[best])
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n_recommendations]

    def train_svd_generator(self, n_factors=20, n_epochs=100, lr=0.01, reg=0.02):
        """Train a biased matrix factorization with SGD, yielding progress.

        The model is ``global_mean + bu[u] + bi[i] + P[u] . Q[i]``, fitted on
        observed ratings only. The generator must be exhausted for the result
        to be stored in ``self._svd_cache`` as ``(P, Q, bu, bi)``.

        NOTE: once a model is cached this generator yields nothing, even when
        called with different hyperparameters; reset ``self._svd_cache`` to
        ``None`` to retrain.

        Args:
            n_factors: Number of latent factors.
            n_epochs: Number of passes over the observed ratings.
            lr: SGD learning rate.
            reg: L2 regularization strength.

        Yields:
            ``(epochs_done, n_epochs)`` after each epoch.
        """
        if self._svd_cache is not None:
            return

        matrix = self.user_item_matrix
        n_u, n_i = matrix.shape

        # Private generator: same stream as np.random.seed(42) on the global
        # RNG, but without reseeding the process-wide state as a side effect.
        rng = np.random.RandomState(42)
        P = rng.normal(0, 0.1, (n_u, n_factors))
        Q = rng.normal(0, 0.1, (n_i, n_factors))
        bu = np.zeros(n_u)
        bi = np.zeros(n_i)

        # (user, item) index pairs of observed ratings, in row-major order.
        observed = [(u, i) for u, i in np.argwhere(~np.isnan(matrix)).tolist()]

        for epoch in range(n_epochs):
            rng.shuffle(observed)
            for u, i in observed:
                pred = self.global_mean + bu[u] + bi[i] + np.dot(P[u], Q[i])
                err = matrix[u, i] - pred
                bu[u] += lr * (err - reg * bu[u])
                bi[i] += lr * (err - reg * bi[i])
                # Both factor gradients must use the pre-update P[u];
                # snapshot it so the Q step does not see the new value.
                p_old = P[u].copy()
                P[u] += lr * (err * Q[i] - reg * p_old)
                Q[i] += lr * (err * p_old - reg * Q[i])
            yield epoch + 1, n_epochs

        self._svd_cache = (P, Q, bu, bi)

    def svd(self, user_id, n_recommendations=10, n_factors=20, n_epochs=100, lr=0.01, reg=0.02):
        """Recommend items using the matrix-factorization model.

        Trains the model on first use (see ``train_svd_generator``). The
        hyperparameters only take effect if no model is cached yet.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            n_factors: Number of latent factors (first training only).
            n_epochs: Number of SGD epochs (first training only).
            lr: SGD learning rate (first training only).
            reg: L2 regularization strength (first training only).

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        # Drain the generator: training is only committed once it finishes.
        for _ in self.train_svd_generator(n_factors, n_epochs, lr, reg):
            pass
        P, Q, bu, bi = self._svd_cache

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        base = self.global_mean + bu[u_idx]
        predictions = [
            (int(self.item_ids[i_idx]), float(base + bi[i_idx] + np.dot(P[u_idx], Q[i_idx])))
            for i_idx in unseen
        ]

        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n_recommendations]

    def knn_cf(self, user_id, n_recommendations=10, k=10):
        """Recommend items from the ``k`` nearest users by cosine distance.

        Neighbours are found with scikit-learn's ``NearestNeighbors`` on the
        matrix with missing ratings replaced by the global mean. Each unseen
        item is scored as the inverse-distance-weighted average of the ratings
        of the neighbours that rated it.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            k: Number of neighbours.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        from sklearn.neighbors import NearestNeighbors

        matrix_imputed = np.nan_to_num(self.user_item_matrix, nan=self.global_mean)
        # Ask for one extra neighbour because the query user is in the index.
        nn = NearestNeighbors(n_neighbors=min(k + 1, self.n_users), metric="cosine")
        nn.fit(matrix_imputed)
        distances, indices = nn.kneighbors(matrix_imputed[u_idx].reshape(1, -1))

        # Drop the query user explicitly instead of assuming it is returned
        # first: with identical rows another user can take position 0.
        not_self = indices[0] != u_idx
        neighbor_indices = indices[0][not_self][:k]
        # Epsilon keeps the inverse-distance weight finite for exact matches.
        neighbor_weights = 1.0 / (distances[0][not_self][:k] + 1e-6)
        neighbor_ratings = self.user_item_matrix[neighbor_indices]

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]

        predictions = []
        for i_idx in unseen:
            column = neighbor_ratings[:, i_idx]
            has_rating = ~np.isnan(column)
            if not has_rating.any():
                continue
            pred = np.average(column[has_rating], weights=neighbor_weights[has_rating])
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n_recommendations]

    def compute_slope_one_dev_generator(self):
        """Compute the Slope One deviation tables, yielding progress.

        ``dev[i, j]`` is the mean of ``rating_i - rating_j`` over users who
        rated both items and ``cnt[i, j]`` is how many such users exist; both
        are 0 on the diagonal and where no user rated both. The generator must
        be exhausted for ``self._slope_one_dev = (dev, cnt)`` to be stored.
        Yields nothing if the tables are already cached.

        Yields:
            ``(items_done, n_items)`` after each item row.
        """
        if self._slope_one_dev is not None:
            return

        matrix = self.user_item_matrix
        observed = ~np.isnan(matrix)
        # Zero-filled copy so differences never produce NaN; the co-rating
        # mask below decides which differences actually count.
        filled = np.where(observed, matrix, 0.0)

        dev = np.zeros((self.n_items, self.n_items))
        cnt = np.zeros((self.n_items, self.n_items), dtype=int)

        for i in range(self.n_items):
            # Users who rated item i AND item j, for every j at once.
            both = observed & observed[:, [i]]
            counts = both.sum(axis=0)
            counts[i] = 0  # an item is never compared with itself
            diff_sum = np.where(both, filled[:, [i]] - filled, 0.0).sum(axis=0)
            comparable = counts > 0
            dev[i, comparable] = diff_sum[comparable] / counts[comparable]
            cnt[i] = counts
            yield i + 1, self.n_items

        self._slope_one_dev = (dev, cnt)

    def slope_one(self, user_id, n_recommendations=10):
        """Recommend items with the (unweighted) Slope One scheme.

        For an unseen item ``i`` the prediction is the mean of
        ``rating_j + dev[i, j]`` over the user's rated items ``j`` that share
        at least one co-rater with ``i``. Items with no such ``j`` are skipped.
        The deviation tables are computed on first use.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        # Drain the generator: the tables are only committed once it finishes.
        for _ in self.compute_slope_one_dev_generator():
            pass
        dev, cnt = self._slope_one_dev

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        rated = np.where(~np.isnan(user_ratings))[0]
        if rated.size == 0:
            return []
        rated_values = user_ratings[rated]

        predictions = []
        for i_idx in unseen:
            usable = cnt[i_idx, rated] > 0
            n_usable = int(usable.sum())
            if n_usable == 0:
                continue
            total = np.sum(rated_values[usable] + dev[i_idx, rated][usable])
            predictions.append((int(self.item_ids[i_idx]), float(total / n_usable)))

        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n_recommendations]

    def recommend(self, method, user_id, n_recommendations=10, **kwargs):
        """Dispatch to one of the recommenders by name.

        Args:
            method: One of the names returned by ``get_all_methods``.
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            **kwargs: Extra keyword arguments forwarded to the chosen method.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.

        Raises:
            ValueError: If ``method`` is not a known method name.
        """
        handler_name = self._METHOD_TABLE.get(method)
        if handler_name is None:
            raise ValueError(f"Unknown method: {method}")
        handler = getattr(self, handler_name)
        return handler(user_id, n_recommendations=n_recommendations, **kwargs)

    def get_all_methods(self):
        """Return the method names accepted by ``recommend``."""
        return list(self._METHOD_TABLE)