"""Exploratory plots and movie recommenders built on ratings/movies CSV data."""

from collections import Counter
from functools import cached_property

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import NearestNeighbors

plt.style.use("fivethirtyeight")


class DataAnalysis:
    """Summary tables and plots over the ``ratings`` and ``movies`` tables.

    The methods read the columns ``userId``, ``movieId`` and ``rating`` from
    the ratings table and ``movieId``, ``title`` and ``genres`` from the
    movies table.
    """

    def __init__(self) -> None:
        """Load ``./data/movies.csv`` and ``./data/ratings.csv``."""
        self.movies: pd.DataFrame = pd.read_csv('./data/movies.csv')
        self.ratings: pd.DataFrame = pd.read_csv('./data/ratings.csv')

    def _rating_distribution_plot(self, plot_fn):
        """Draw ``plot_fn`` over the ``rating`` column on a fresh figure."""
        fig, ax = plt.subplots(nrows=1, ncols=1)
        plot_fn(data=self.ratings, x='rating', ax=ax)
        return fig

    def ratings_countplot(self,):
        """Return a figure with a count plot of the rating values."""
        return self._rating_distribution_plot(sns.countplot)

    def ratings_kdeplot(self,):
        """Return a figure with a kernel density estimate of the ratings."""
        return self._rating_distribution_plot(sns.kdeplot)

    def ratings_ecdfplot(self,):
        """Return a figure with the empirical CDF of the ratings."""
        return self._rating_distribution_plot(sns.ecdfplot)

    def rating_scatterplot(self,):
        """Return a figure of each user's mean rating plus the overall mean.

        One marker is drawn per user; the red horizontal line is the mean of
        the per-user means.
        """
        fig, ax = plt.subplots(nrows=1, ncols=1)
        # Computed once and reused for both the markers and the reference line.
        user_means = (self.ratings[['userId', 'rating']]
                      .groupby('userId').mean())
        user_means.plot(ls='', marker='.', ax=ax)
        ax.axhline(y=user_means.mean().values.item(), color='red', alpha=0.5)
        ax.legend(['Mean user rating', 'Mean rating across users'])
        return fig

    def most_rated_movie(self, top_k=10):
        """Return the ``top_k`` movies with the largest number of ratings.

        Returns:
            DataFrame with columns ``Movie Title`` and ``Number of Ratings``,
            ordered from most to least rated.
        """
        # rename_axis + reset_index(name=...) pins the column names to
        # ('movieId', 'count') regardless of how the installed pandas version
        # labels the output of value_counts().
        counts = (self.ratings['movieId'].value_counts()
                  .rename_axis('movieId')
                  .reset_index(name='count'))
        titled = counts.merge(right=self.movies[['movieId', 'title']],
                              on='movieId')[['title', 'count']]
        titled = titled.rename(
            columns={'count': 'Number of Ratings', 'title': 'Movie Title'})
        return titled.head(top_k)

    def rating_stats(self,):
        """Return the mean rating and rating count of every rated movie.

        Returns:
            DataFrame with columns ``Average Rating``, ``Number of Rating``
            and ``Movie Title``.
        """
        stats = (self.ratings.groupby('movieId')['rating']
                 .agg(['mean', 'count'])
                 .reset_index())
        stats = stats.merge(self.movies[['movieId', 'title']], on='movieId')
        stats = stats.rename(columns={'mean': 'Average Rating',
                                      'count': "Number of Rating",
                                      'title': 'Movie Title'})
        return stats.drop(columns='movieId')

    def bayesian_avg(self, C: float, m: float):
        """Return an aggregator computing ``(C*m + sum) / (C + count)``.

        Args:
            C: weight given to the prior.
            m: prior mean.

        Returns:
            A callable taking a Series of ratings and returning its
            Bayesian average.
        """
        return lambda rating: (C*m + rating.sum()) / (C + rating.count())

    def ratings_bayesian_avg(self,):
        """Return the Bayesian-average rating of every rated movie.

        The prior weight is the mean number of ratings per movie and the
        prior mean is the mean of the per-movie mean ratings.

        Returns:
            DataFrame with columns ``Bayesian Average``, ``Number of ratings``
            and ``Movie Title``.
        """
        ratings_by_movie = self.ratings.groupby('movieId')['rating']
        per_movie = ratings_by_movie.agg(['mean', 'count'])
        C = per_movie['count'].mean()
        m = per_movie['mean'].mean()
        bay_avg_fn = self.bayesian_avg(C=C, m=m)
        # Named aggregation fixes the output column labels directly; a lambda
        # passed in a list is labelled '<lambda>' and is easy to mis-rename.
        rating_bay_avg = ratings_by_movie.agg(
            **{'Bayesian Average': bay_avg_fn,
               'Number of ratings': 'count'}).reset_index()
        rating_bay_avg = rating_bay_avg.merge(
            self.movies[['title', 'movieId']], on='movieId')
        rating_bay_avg = rating_bay_avg.rename(
            columns={'title': 'Movie Title'})
        return rating_bay_avg.drop(columns=['movieId'])

    def genres_count(self,):
        """Return a bar-plot figure of how many movies carry each genre.

        The ``genres`` column is split on ``|``; bars are ordered from the
        most to the least common genre.
        """
        genre_lists = self.movies.genres.str.split(pat='|')
        genre_counter = Counter(
            genre for genres in genre_lists for genre in genres)
        genre_counter_df = pd.DataFrame(
            genre_counter.most_common(), columns=['Genres', 'Count'])
        fig, ax = plt.subplots(nrows=1, ncols=1)
        sns.barplot(data=genre_counter_df, x='Count', y='Genres', ax=ax)
        return fig


class Recommender:
    """Shared data loading and id/index look-up tables for recommenders."""

    def __init__(self) -> None:
        """Load the CSV tables and build the id <-> index mappings."""
        self.ratings: pd.DataFrame = pd.read_csv('./data/ratings.csv')
        self.movies: pd.DataFrame = pd.read_csv('./data/movies.csv')

        # M users and N movies that appear in the ratings table.
        self.M: int = self.ratings.userId.nunique()
        self.N: int = self.ratings.movieId.nunique()

        # Ids in order of first appearance; that order defines the row and
        # column indices of the user-item matrix.
        user_ids = self.ratings.userId.unique()
        movie_ids = self.ratings.movieId.unique()

        self.ratings_userid_index_map = dict(zip(user_ids, range(self.M)))
        self.ratings_movieid_index_map = dict(zip(movie_ids, range(self.N)))
        self.ratings_userid_index_invmap = dict(zip(range(self.M), user_ids))
        self.ratings_movieid_index_invmap = dict(
            zip(range(self.N), movie_ids))

        # Look-ups against the movies table. Titles that occur more than once
        # keep only the last movieId in the inverse map.
        self.movie_id_title_map = dict(
            zip(self.movies.movieId, self.movies.title))
        self.movie_id_title_invmap = dict(
            zip(self.movies.title, self.movies.movieId))
        self.movie_id_index_map = dict(
            zip(self.movies.movieId, self.movies.index))
        self.movie_id_index_invmap = dict(
            zip(self.movies.index, self.movies.movieId))

    def _movie_id_for_title(self, title: str) -> int:
        """Return the movieId for ``title``.

        Raises:
            KeyError: if the title is not present in the movies table.
        """
        try:
            return self.movie_id_title_invmap[title]
        except KeyError:
            raise KeyError(f"Unknown movie title: {title!r}") from None

    def nearest_neighbors(self, matrix: np.ndarray | csr_matrix):
        """Return a brute-force cosine ``NearestNeighbors`` fitted on ``matrix``."""
        knn = NearestNeighbors(
            n_neighbors=10, algorithm="brute", metric="cosine")
        knn.fit(matrix)
        return knn

    def output_recommendation(self, search_id: int,
                              similar_movies: np.ndarray,
                              mapper_index_id: dict):
        """Translate neighbour indices into movie titles.

        Args:
            search_id: movieId of the query movie; it is left out of the
                result. Pass an id that matches no movie to keep everything.
            similar_movies: iterable of indices into ``mapper_index_id``.
            mapper_index_id: mapping from index to movieId.

        Returns:
            List of titles in the order of ``similar_movies``.
        """
        movie_ids = (mapper_index_id[index] for index in similar_movies)
        return [self.movie_id_title_map[movie_id]
                for movie_id in movie_ids
                if movie_id != search_id]


class Collaborative_filtering(Recommender):
    """Item-based collaborative filtering over the user-item rating matrix."""

    def __init__(self) -> None:
        super().__init__()

    @cached_property
    def user_item_matrix(self,) -> csr_matrix:
        """Sparse ``(M, N)`` matrix of ratings, users as rows, movies as columns."""
        user_index = self.ratings.userId.map(
            self.ratings_userid_index_map).to_numpy()
        movie_index = self.ratings.movieId.map(
            self.ratings_movieid_index_map).to_numpy()
        return csr_matrix(
            (self.ratings.rating, (user_index, movie_index)),
            shape=(self.M, self.N))

    @cached_property
    def matrix_factorization(self,) -> np.ndarray:
        """20-component truncated-SVD embedding of the item-user matrix.

        Returns:
            Array with one row per rated movie.
        """
        svd = TruncatedSVD(n_components=20, n_iter=10, random_state=42)
        return svd.fit_transform(self.user_item_matrix.T)

    def find_similar_movies(self, title: str, k: int = 11,
                            use_matrix_factorization=False) -> np.ndarray:
        """Return titles of the movies rated most similarly to ``title``.

        Args:
            title: exact title as it appears in the movies table.
            k: number of neighbours to query. The query movie is removed
                from the result when it is among them, which is why the
                default is 11.
            use_matrix_factorization: compare SVD embeddings instead of the
                raw rating vectors.

        Returns:
            List of movie titles, nearest first.

        Raises:
            KeyError: if the title is unknown or the movie has no ratings.
        """
        search_id: int = self._movie_id_for_title(title)
        try:
            movie_index: int = self.ratings_movieid_index_map[search_id]
        except KeyError:
            raise KeyError(
                f"Movie {title!r} has no ratings, so it has no "
                "collaborative-filtering neighbours") from None

        if use_matrix_factorization:
            matrix = self.matrix_factorization
        else:
            matrix = self.user_item_matrix.T

        movie_vector = matrix[movie_index]
        # Dense rows come back 1-D; kneighbors expects a 2-D query.
        if isinstance(movie_vector, np.ndarray):
            movie_vector = movie_vector.reshape((1, -1))

        knn = self.nearest_neighbors(matrix=matrix)
        neighbors: np.ndarray = knn.kneighbors(
            movie_vector, n_neighbors=k, return_distance=False)
        return self.output_recommendation(
            search_id=search_id,
            similar_movies=neighbors[0],
            mapper_index_id=self.ratings_movieid_index_invmap)


class Content_based_filtering(Recommender):
    """Genre-based recommender over one-hot encoded movie genres."""

    def __init__(self) -> None:
        super().__init__()

    @cached_property
    def user_feature_matrix(self,):
        """One row per movie, one 0/1 column per genre.

        Genre columns are created in sorted order so that the column layout,
        and therefore the meaning of a positional feedback vector, is the
        same on every run.
        """
        movie_genres = self.movies.copy()
        movie_genres['genres'] = self.movies.genres.str.split(pat='|')
        # Iterating the bare set would follow string-hash order, which
        # changes between interpreter runs.
        all_genres = sorted(
            {genre for genres in movie_genres.genres for genre in genres})
        for genre in all_genres:
            movie_genres[genre] = movie_genres.genres.apply(
                lambda tags, genre=genre: int(genre in tags))
        return movie_genres.drop(columns=['movieId', 'title', 'genres'])

    @cached_property
    def cosine_similarity(self):
        """Pairwise cosine similarity between all rows of the feature matrix."""
        features = self.user_feature_matrix
        return cosine_similarity(features, features)

    def find_similar_movies(self, title: str, k: int = 11):
        """Return titles of the movies whose genres best match ``title``.

        Args:
            title: exact title as it appears in the movies table.
            k: number of top-scoring movies to consider; the query movie is
                removed from the result when it is among them.

        Returns:
            List of movie titles, most similar first.

        Raises:
            KeyError: if the title is not present in the movies table.
        """
        search_id: int = self._movie_id_for_title(title)
        search_index: int = self.movie_id_index_map[search_id]
        scores: np.ndarray = self.cosine_similarity[search_index]
        # A stable sort on the negated scores gives descending order while
        # keeping tied movies in their original order.
        top_positions = np.argsort(-scores, kind='stable')[:k]
        neighbors: list[int] = self.movies.index[top_positions].tolist()
        return self.output_recommendation(
            search_id=search_id,
            similar_movies=neighbors,
            mapper_index_id=self.movie_id_index_invmap)

    def find_similar_movies_based_on_feedback(self, vector: list[bool],
                                              k: int = 11):
        """Return titles of the ``k`` movies closest to a genre preference vector.

        Args:
            vector: one flag per column of ``user_feature_matrix``, in the
                same column order.
            k: number of neighbours to return.

        Returns:
            List of movie titles, nearest first.
        """
        feedback_vector = np.array(vector, dtype=int).reshape((1, -1))
        # Fit on the bare array: the query is a bare array too, and mixing
        # it with a DataFrame makes scikit-learn warn about feature names.
        knn = self.nearest_neighbors(
            matrix=self.user_feature_matrix.to_numpy())
        neighbors: np.ndarray = knn.kneighbors(
            feedback_vector, n_neighbors=k, return_distance=False)
        # -1 matches no movieId, so nothing is filtered out.
        return self.output_recommendation(
            search_id=-1,
            similar_movies=neighbors[0],
            mapper_index_id=self.movie_id_index_invmap)