import numpy as np
import pandas as pd

from utils.similarity import (
    cosine_similarity,
    pearson_similarity,
    adjusted_cosine_similarity,
)
from utils.helpers import build_user_item_matrix


class CollaborativeFiltering:
    """Collection of collaborative-filtering recommenders over one ratings table.

    The user-item matrix is obtained from ``build_user_item_matrix`` and is
    treated throughout as a 2-D float array (rows = users, columns = items)
    in which ``NaN`` marks a rating that does not exist.

    Every recommender returns a list of ``(item_id, predicted_rating)``
    tuples sorted by predicted rating, best first, restricted to items the
    user has not rated. Item ids are converted with ``int()``, so they must
    be int-convertible. An unknown ``user_id`` yields an empty list.
    """

    # Single source of truth for the public method keys: maps the key accepted
    # by ``recommend`` to the name of the method implementing it. Insertion
    # order is the order reported by ``get_all_methods``.
    _METHODS = {
        "user_based": "user_based_cf",
        "item_based": "item_based_cf",
        "svd": "svd",
        "knn": "knn_cf",
        "slope_one": "slope_one",
    }

    def __init__(self, ratings_df):
        """Build the user-item matrix and the statistics shared by all methods.

        Args:
            ratings_df: Ratings table handed to ``build_user_item_matrix``.
                Its schema is defined by that helper (not visible here); the
                result must expose ``.values``, ``.index`` and ``.columns``.
        """
        self.ratings = ratings_df
        self.matrix = build_user_item_matrix(ratings_df)
        self.user_item_matrix = self.matrix.values
        self.n_users, self.n_items = self.user_item_matrix.shape
        self.user_ids = self.matrix.index.values
        self.item_ids = self.matrix.columns.values
        # NaN-aware means: missing ratings do not contribute.
        self.user_means = np.nanmean(self.user_item_matrix, axis=1)
        self.global_mean = np.nanmean(self.user_item_matrix)
        # Lazily filled by train_svd_generator / compute_slope_one_dev_generator.
        self._svd_cache = None
        self._slope_one_dev = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_user_index(self, user_id):
        """Return the matrix row of ``user_id``, or ``None`` if it is unknown."""
        indices = np.where(self.user_ids == user_id)[0]
        return indices[0] if len(indices) > 0 else None

    def _get_item_index(self, item_id):
        """Return the matrix column of ``item_id``, or ``None`` if it is unknown."""
        indices = np.where(self.item_ids == item_id)[0]
        return indices[0] if len(indices) > 0 else None

    @staticmethod
    def _rank(predictions, n_recommendations):
        """Sort ``(item_id, score)`` pairs by score, best first, and truncate.

        The sort is stable, so equal scores keep their input order.
        """
        ranked = sorted(predictions, key=lambda pair: pair[1], reverse=True)
        return ranked[:n_recommendations]

    # ------------------------------------------------------------------
    # Neighbourhood methods
    # ------------------------------------------------------------------

    def user_based_cf(self, user_id, n_recommendations=10, k=20):
        """Recommend items from the ratings of the ``k`` most similar users.

        Missing ratings are replaced by the global mean before computing
        user-user cosine similarity. The ``k`` most similar users are chosen
        once; for each unseen item the prediction is the similarity-weighted
        average of the ratings given by those of them who rated the item and
        whose similarity is strictly positive. Items with no such neighbour
        are skipped.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            k: Neighbourhood size.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        if len(unseen) == 0:
            return []

        filled = np.nan_to_num(self.user_item_matrix, nan=self.global_mean)
        sim_matrix = cosine_similarity(filled)
        # Work on a copy so zeroing the self-similarity never writes into the
        # array returned by the similarity helper.
        user_sim = np.array(sim_matrix[u_idx], dtype=float)
        user_sim[u_idx] = 0

        # The neighbourhood does not depend on the item: compute it once.
        neighbours = np.argsort(user_sim)[::-1][:k]
        neighbours = neighbours[user_sim[neighbours] > 0]
        neighbour_sims = user_sim[neighbours]

        predictions = []
        for i_idx in unseen:
            neighbour_ratings = self.user_item_matrix[neighbours, i_idx]
            has_rating = ~np.isnan(neighbour_ratings)
            if not has_rating.any():
                continue
            pred = np.average(
                neighbour_ratings[has_rating],
                weights=neighbour_sims[has_rating],
            )
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        return self._rank(predictions, n_recommendations)

    def item_based_cf(self, user_id, n_recommendations=10, k=15):
        """Recommend items similar to the ones the user already rated.

        Item-item similarity comes from ``adjusted_cosine_similarity`` applied
        to the raw matrix (NaNs included; how they are handled is up to that
        helper). For each unseen item, the prediction is the
        similarity-weighted average of the user's own ratings on the ``k``
        most similar rated items with strictly positive similarity.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            k: Maximum number of rated items used per prediction.

        Returns:
            List of ``(item_id, predicted_rating)``, best first. Empty when
            the user has no ratings.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        item_sim = adjusted_cosine_similarity(self.user_item_matrix)
        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        rated = np.where(~np.isnan(user_ratings))[0]
        if len(rated) == 0:
            return []

        predictions = []
        for i_idx in unseen:
            sim_to_rated = item_sim[i_idx, rated]
            order = np.argsort(sim_to_rated)[::-1]
            # Filter *before* truncating to k: argsort places NaN last, so
            # after reversal NaN similarities would otherwise fill the top-k
            # slots and hide valid neighbours. NaN > 0 is False, which drops
            # them here.
            best = order[sim_to_rated[order] > 0][:k]
            if len(best) == 0:
                continue
            pred = np.average(user_ratings[rated[best]], weights=sim_to_rated[best])
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        return self._rank(predictions, n_recommendations)

    # ------------------------------------------------------------------
    # Matrix factorisation
    # ------------------------------------------------------------------

    def train_svd_generator(self, n_factors=20, n_epochs=100, lr=0.01, reg=0.02):
        """Train a biased matrix-factorisation model with SGD, yielding progress.

        The model is ``global_mean + bu[u] + bi[i] + P[u] . Q[i]`` and is
        fitted on observed (non-NaN) ratings only. The result is stored in
        ``self._svd_cache`` as ``(P, Q, bu, bi)`` once the generator has been
        fully consumed.

        If a model is already cached the generator finishes immediately
        without yielding; the arguments of that later call are ignored.

        Args:
            n_factors: Number of latent factors.
            n_epochs: Number of passes over the observed ratings.
            lr: SGD learning rate.
            reg: L2 regularisation strength.

        Yields:
            ``(epochs_done, n_epochs)`` after each epoch.
        """
        if self._svd_cache is not None:
            return

        n_u, n_i = self.user_item_matrix.shape

        # Private generator with a fixed seed: reproducible training without
        # reseeding the process-wide NumPy RNG.
        rng = np.random.RandomState(42)
        P = rng.normal(0, 0.1, (n_u, n_factors))
        Q = rng.normal(0, 0.1, (n_i, n_factors))
        bu = np.zeros(n_u)
        bi = np.zeros(n_i)

        # Observed cells in row-major order.
        rows, cols = np.nonzero(~np.isnan(self.user_item_matrix))
        observed = list(zip(rows.tolist(), cols.tolist()))

        for epoch in range(n_epochs):
            rng.shuffle(observed)
            for u, i in observed:
                r = self.user_item_matrix[u, i]
                pred = self.global_mean + bu[u] + bi[i] + np.dot(P[u], Q[i])
                err = r - pred
                bu[u] += lr * (err - reg * bu[u])
                bi[i] += lr * (err - reg * bi[i])
                # Snapshot P[u]: both factor updates must be computed from the
                # pre-update values, otherwise Q[i]'s step uses a gradient
                # taken at a different point than P[u]'s.
                p_old = P[u].copy()
                P[u] += lr * (err * Q[i] - reg * p_old)
                Q[i] += lr * (err * p_old - reg * Q[i])
            yield epoch + 1, n_epochs

        self._svd_cache = (P, Q, bu, bi)

    def svd(self, user_id, n_recommendations=10, n_factors=20, n_epochs=100,
            lr=0.01, reg=0.02):
        """Recommend items using the matrix-factorisation model.

        Trains the model on first use (see ``train_svd_generator``); if a
        model is already cached it is reused and the hyperparameters passed
        here have no effect.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            n_factors, n_epochs, lr, reg: Training hyperparameters, used only
                when no model is cached yet.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        # Drain the generator; progress reporting is not needed here.
        for _ in self.train_svd_generator(n_factors, n_epochs, lr, reg):
            pass
        P, Q, bu, bi = self._svd_cache

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]

        scores = self.global_mean + bu[u_idx] + bi[unseen] + Q[unseen] @ P[u_idx]
        predictions = [
            (int(self.item_ids[i_idx]), float(score))
            for i_idx, score in zip(unseen, scores)
        ]
        return self._rank(predictions, n_recommendations)

    # ------------------------------------------------------------------
    # k-nearest neighbours (scikit-learn)
    # ------------------------------------------------------------------

    def knn_cf(self, user_id, n_recommendations=10, k=10):
        """Recommend items from the user's ``k`` nearest neighbours.

        Neighbours are found with scikit-learn's ``NearestNeighbors`` using
        cosine distance on the global-mean-imputed matrix. Each prediction is
        the average of the neighbours' actual ratings for the item, weighted
        by ``1 / (distance + 1e-6)``.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            k: Number of neighbours.

        Returns:
            List of ``(item_id, predicted_rating)``, best first.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        # Imported lazily so the other recommenders work without scikit-learn.
        from sklearn.neighbors import NearestNeighbors

        filled = np.nan_to_num(self.user_item_matrix, nan=self.global_mean)
        nn = NearestNeighbors(n_neighbors=min(k + 1, self.n_users), metric="cosine")
        nn.fit(filled)
        distances, indices = nn.kneighbors(filled[u_idx].reshape(1, -1))

        # One extra neighbour is requested because the query row is part of
        # the fitted data. Exclude it by index rather than by position: when
        # other users have an identical rating row, the query user is not
        # guaranteed to come back first.
        neighbours = [
            (ni, dist)
            for ni, dist in zip(indices[0], distances[0])
            if ni != u_idx
        ][:k]
        neighbour_idx = np.array([ni for ni, _ in neighbours], dtype=int)
        # The epsilon keeps the weight finite for zero-distance neighbours.
        neighbour_weights = 1.0 / (
            np.array([dist for _, dist in neighbours], dtype=float) + 1e-6
        )

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]

        predictions = []
        for i_idx in unseen:
            neighbour_ratings = self.user_item_matrix[neighbour_idx, i_idx]
            has_rating = ~np.isnan(neighbour_ratings)
            if not has_rating.any():
                continue
            pred = np.average(
                neighbour_ratings[has_rating],
                weights=neighbour_weights[has_rating],
            )
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        return self._rank(predictions, n_recommendations)

    # ------------------------------------------------------------------
    # Slope One
    # ------------------------------------------------------------------

    def compute_slope_one_dev_generator(self):
        """Compute the Slope One deviation tables, yielding progress.

        ``dev[i, j]`` is the mean of ``rating(i) - rating(j)`` over users who
        rated both items, and ``cnt[i, j]`` is the number of such users. The
        diagonal and pairs without common raters stay at zero. The tables are
        stored in ``self._slope_one_dev`` as ``(dev, cnt)`` once the generator
        has been fully consumed.

        If the tables are already cached the generator finishes immediately
        without yielding.

        Yields:
            ``(items_done, n_items)`` after each item row.
        """
        if self._slope_one_dev is not None:
            return

        observed = ~np.isnan(self.user_item_matrix)
        # Zero-filled copy so arithmetic never propagates NaN; cells outside
        # the co-rated mask are discarded below.
        filled = np.where(observed, self.user_item_matrix, 0.0)

        dev = np.zeros((self.n_items, self.n_items))
        cnt = np.zeros((self.n_items, self.n_items), dtype=int)

        for i in range(self.n_items):
            # co_rated[u, j] is True when user u rated both item i and item j.
            co_rated = observed & observed[:, [i]]
            counts = co_rated.sum(axis=0)
            counts[i] = 0  # an item is never compared with itself
            diff_sums = np.where(co_rated, filled[:, [i]] - filled, 0.0).sum(axis=0)

            has_overlap = counts > 0
            dev[i, has_overlap] = diff_sums[has_overlap] / counts[has_overlap]
            cnt[i] = counts
            yield i + 1, self.n_items

        self._slope_one_dev = (dev, cnt)

    def slope_one(self, user_id, n_recommendations=10):
        """Recommend items with the (unweighted) Slope One predictor.

        For each unseen item ``i`` the prediction is the mean of
        ``rating(j) + dev[i, j]`` over the user's rated items ``j`` that share
        at least one common rater with ``i``. Items with no such ``j`` are
        skipped. The deviation tables are computed on first use.

        Args:
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.

        Returns:
            List of ``(item_id, predicted_rating)``, best first. Empty when
            the user has no ratings.
        """
        u_idx = self._get_user_index(user_id)
        if u_idx is None:
            return []

        # Drain the generator; progress reporting is not needed here.
        for _ in self.compute_slope_one_dev_generator():
            pass
        dev, cnt = self._slope_one_dev

        user_ratings = self.user_item_matrix[u_idx]
        unseen = np.where(np.isnan(user_ratings))[0]
        rated = np.where(~np.isnan(user_ratings))[0]
        if len(rated) == 0:
            return []

        predictions = []
        for i_idx in unseen:
            usable = rated[cnt[i_idx, rated] > 0]
            if len(usable) == 0:
                continue
            pred = np.mean(user_ratings[usable] + dev[i_idx, usable])
            predictions.append((int(self.item_ids[i_idx]), float(pred)))

        return self._rank(predictions, n_recommendations)

    # ------------------------------------------------------------------
    # Dispatch
    # ------------------------------------------------------------------

    def recommend(self, method, user_id, n_recommendations=10, **kwargs):
        """Run the recommender registered under ``method``.

        Args:
            method: One of the keys returned by ``get_all_methods``.
            user_id: Id of the target user.
            n_recommendations: Maximum number of results.
            **kwargs: Forwarded unchanged to the selected recommender.

        Returns:
            Whatever the selected recommender returns.

        Raises:
            ValueError: If ``method`` is not a known key.
        """
        attr_name = self._METHODS.get(method)
        if attr_name is None:
            raise ValueError(f"Unknown method: {method}")
        func = getattr(self, attr_name)
        return func(user_id, n_recommendations=n_recommendations, **kwargs)

    def get_all_methods(self):
        """Return the method keys accepted by ``recommend``."""
        return list(self._METHODS)