Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from sklearn.decomposition import TruncatedSVD | |
| from sklearn.neighbors import NearestNeighbors | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from scipy.sparse import csr_matrix | |
| from collections import Counter | |
| from functools import cached_property | |
# Module-level side effect: select matplotlib's "fivethirtyeight" style sheet
# once at import time, so figures created after this point use it.
| plt.style.use("fivethirtyeight") | |
class DataAnalysis:
    """Exploratory plots and summary tables for the movie-ratings data.

    Two DataFrames are loaded from CSV on construction:

    * ``movies``  -- the methods below read its ``movieId``, ``title`` and
      ``genres`` columns;
    * ``ratings`` -- the methods below read its ``userId``, ``movieId`` and
      ``rating`` columns.

    Plotting methods create a new matplotlib figure on every call and return
    it; table methods return a new DataFrame.
    """

    def __init__(self, *, movies_path: str = './data/movies.csv',
                 ratings_path: str = './data/ratings.csv') -> None:
        """Load both CSV files.

        Args:
            movies_path: Location of the movies CSV. Defaults to the path
                that was previously hard-coded.
            ratings_path: Location of the ratings CSV. Defaults to the path
                that was previously hard-coded.
        """
        self.movies: pd.DataFrame = pd.read_csv(movies_path)
        self.ratings: pd.DataFrame = pd.read_csv(ratings_path)

    def _rating_distribution_plot(self, plot_fn):
        """Draw ``plot_fn`` over the ``rating`` column on a fresh figure."""
        fig, ax = plt.subplots(nrows=1, ncols=1)
        plot_fn(data=self.ratings, x='rating', ax=ax)
        return fig

    def ratings_countplot(self):
        """Return a figure with a seaborn count plot of ``rating``."""
        return self._rating_distribution_plot(sns.countplot)

    def ratings_kdeplot(self):
        """Return a figure with a seaborn KDE plot of ``rating``."""
        return self._rating_distribution_plot(sns.kdeplot)

    def ratings_ecdfplot(self):
        """Return a figure with a seaborn ECDF plot of ``rating``."""
        return self._rating_distribution_plot(sns.ecdfplot)

    def rating_scatterplot(self):
        """Return a figure of each user's mean rating plus the overall mean.

        One marker is drawn per ``userId`` (its mean rating); a horizontal
        red line marks the mean of those per-user means.
        """
        fig, ax = plt.subplots(nrows=1, ncols=1)
        # Computed once and reused for both the markers and the reference line.
        user_means = (self.ratings[['userId', 'rating']]
                      .groupby('userId').mean())
        user_means.plot(ls='', marker='.', ax=ax)
        ax.axhline(y=float(user_means['rating'].mean()),
                   color='red', alpha=0.5)
        ax.legend(['Mean user rating', 'Mean rating across users'])
        return fig

    def most_rated_movie(self, top_k=10):
        """Return the ``top_k`` movies with the most ratings.

        Args:
            top_k: Number of rows to keep.

        Returns:
            DataFrame with columns ``'Movie Title'`` and
            ``'Number of Ratings'``, ordered by rating count (descending).
            Movies whose ``movieId`` is absent from ``movies`` are dropped
            by the inner merge.
        """
        # Name the index and the counts explicitly: the column names produced
        # by value_counts().reset_index() differ between pandas 1.x and 2.x.
        counts = (self.ratings['movieId'].value_counts()
                  .rename_axis('movieId')
                  .reset_index(name='count'))
        data = (counts
                .merge(right=self.movies[['movieId', 'title']],
                       on='movieId')[['title', 'count']]
                .rename({'count': 'Number of Ratings',
                         'title': 'Movie Title'}, axis=1))
        return data.head(top_k)

    def rating_stats(self):
        """Return mean rating and rating count per movie.

        Returns:
            DataFrame with columns ``'Average Rating'``,
            ``'Number of Rating'`` and ``'Movie Title'``.
        """
        per_movie = (self.ratings.groupby('movieId')['rating']
                     .agg(['mean', 'count'])
                     .reset_index())
        per_movie = per_movie.merge(
            self.movies[['movieId', 'title']], on='movieId')
        per_movie = per_movie.rename(
            axis=1,
            mapper={'mean': 'Average Rating',
                    'count': "Number of Rating",
                    'title': 'Movie Title',
                    # No 'genres' column is merged in above, so this entry
                    # never matches; it is kept as a harmless no-op.
                    'genres': 'Genres'})
        return per_movie.drop(columns='movieId')

    def bayesian_avg(self, C: float, m: float):
        """Build a Bayesian-average aggregator.

        Args:
            C: Weight of the prior (acts like ``C`` pseudo-ratings).
            m: Prior mean rating.

        Returns:
            A callable taking a Series of ratings and returning
            ``(C * m + ratings.sum()) / (C + ratings.count())``.
        """
        def _bayesian_average(rating):
            return (C * m + rating.sum()) / (C + rating.count())

        return _bayesian_average

    def ratings_bayesian_avg(self):
        """Return the Bayesian-average rating of every movie.

        The prior weight ``C`` is the mean number of ratings per movie and
        the prior mean ``m`` is the mean of the per-movie mean ratings.

        Returns:
            DataFrame with columns ``'Bayesian Average'``,
            ``'Number of ratings'`` and ``'Movie Title'``.
        """
        grouped = self.ratings.groupby('movieId')['rating']
        per_movie = grouped.agg(['mean', 'count'])
        bay_avg_fn = self.bayesian_avg(C=per_movie['count'].mean(),
                                       m=per_movie['mean'].mean())
        # Named aggregation labels the columns directly instead of relying on
        # pandas' auto-generated '<lambda_0>' name for anonymous functions.
        rating_bay_avg = grouped.agg(**{
            'Bayesian Average': bay_avg_fn,
            'Number of ratings': 'count',
        }).reset_index()
        rating_bay_avg = rating_bay_avg.merge(
            self.movies[['title', 'movieId']], on='movieId')
        rating_bay_avg = rating_bay_avg.rename(
            {'title': 'Movie Title'}, axis=1)
        return rating_bay_avg.drop(columns=['movieId'])

    def genres_count(self):
        """Return a bar-plot figure of how many movies carry each genre.

        ``genres`` is split on ``'|'``; a movie contributes one count to each
        of its genres. Bars are ordered from most to least common.
        """
        # Split the column directly; no copy of the whole frame is needed.
        genre_lists = self.movies['genres'].str.split(pat='|')
        genre_counter = Counter(
            genre for genres in genre_lists for genre in genres)
        genre_counter_df = pd.DataFrame(
            genre_counter.most_common(), columns=['Genres', 'Count'])
        fig, ax = plt.subplots(nrows=1, ncols=1)
        sns.barplot(data=genre_counter_df, x='Count', y='Genres', ax=ax)
        return fig
| class Recommender: | |
| def __init__(self) -> None: | |
| self.ratings: pd.DataFrame = pd.read_csv( | |
| './data/ratings.csv') | |
| self.movies: pd.DataFrame = pd.read_csv( | |
| './data/movies.csv') | |
| self.M: int = self.ratings.userId.nunique() | |
| self.N: int = self.ratings.movieId.nunique() | |
| self.ratings_userid_index_map = dict( | |
| zip(self.ratings.userId.unique(), range(self.M))) | |
| self.ratings_movieid_index_map = dict( | |
| zip(self.ratings.movieId.unique(), range(self.N))) | |
| self.ratings_userid_index_invmap = dict( | |
| zip(range(self.M), self.ratings.userId.unique())) | |
| self.ratings_movieid_index_invmap = dict( | |
| zip(range(self.N), self.ratings.movieId.unique())) | |
| self.movie_id_title_map = dict( | |
| zip(self.movies.movieId, self.movies.title)) | |
| self.movie_id_title_invmap = dict( | |
| zip(self.movies.title, self.movies.movieId)) | |
| self.movie_id_index_map = dict( | |
| zip(self.movies.movieId, self.movies.index)) | |
| self.movie_id_index_invmap = dict( | |
| zip(self.movies.index, self.movies.movieId)) | |
| def nearest_neighbors(self, matrix: np.ndarray | csr_matrix): | |
| knn = NearestNeighbors( | |
| n_neighbors=10, algorithm="brute", metric="cosine") | |
| knn.fit(matrix) | |
| return knn | |
| def output_recommendation(self, search_id: int, similar_movies: np.ndarray, mapper_index_id: dict): | |
| response = [] | |
| for i in similar_movies: | |
| movie_id = mapper_index_id[i] | |
| if movie_id != search_id: | |
| response.append(self.movie_id_title_map[movie_id]) | |
| return response | |
class Collaborative_filtering(Recommender):
    """Item-item recommender built on the user x movie rating matrix.

    Construction is inherited unchanged from ``Recommender`` (it loads the
    ratings and movies CSVs and builds the id/index lookup tables).
    """

    # ``user_item_matrix`` and ``matrix_factorization`` are read as attributes
    # below (``self.user_item_matrix.T``), so they must be properties, not
    # plain methods. ``cached_property`` also builds each one only once.
    @cached_property
    def user_item_matrix(self) -> csr_matrix:
        """Sparse ``(M, N)`` matrix of ratings: rows = users, columns = movies.

        Row/column positions come from ``ratings_userid_index_map`` and
        ``ratings_movieid_index_map``.
        """
        user_index = self.ratings.userId.map(
            self.ratings_userid_index_map).to_numpy()
        movie_index = self.ratings.movieId.map(
            self.ratings_movieid_index_map).to_numpy()
        return csr_matrix(
            (self.ratings.rating.to_numpy(), (user_index, movie_index)),
            shape=(self.M, self.N))

    @cached_property
    def matrix_factorization(self) -> np.ndarray:
        """Dense movie embeddings from a 20-component truncated SVD.

        The SVD is fitted on the transposed rating matrix, so the result has
        one row per movie (same row order as ``ratings_movieid_index_map``).
        ``random_state`` is fixed at 42.
        """
        svd = TruncatedSVD(n_components=20, n_iter=10, random_state=42)
        return svd.fit_transform(self.user_item_matrix.T)

    def find_similar_movies(self, title: str, k: int = 11, use_matrix_factorization=False) -> list[str]:
        """Return titles of the movies closest to ``title`` by cosine distance.

        Args:
            title: Exact movie title as it appears in ``movies``.
            k: Number of neighbours requested from the k-NN model. The query
                movie is normally its own nearest neighbour, so at most
                ``k - 1`` titles are returned.
            use_matrix_factorization: Compare SVD embeddings instead of raw
                rating vectors.

        Returns:
            Up to ``k - 1`` titles, nearest first, never including ``title``.

        Raises:
            KeyError: If ``title`` is unknown or the movie has no ratings.
        """
        search_id: int = self.movie_id_title_invmap[title]
        movie_index: int = self.ratings_movieid_index_map[search_id]
        if use_matrix_factorization:
            matrix = self.matrix_factorization
        else:
            # Movies as rows; CSR so that picking one row is cheap.
            matrix = self.user_item_matrix.T.tocsr()
        movie_vector = matrix[movie_index]
        if isinstance(movie_vector, np.ndarray):
            # A dense row comes back 1-D; kneighbors expects 2-D input.
            movie_vector = movie_vector.reshape((1, -1))
        knn = self.nearest_neighbors(matrix=matrix)
        neighbors: np.ndarray = knn.kneighbors(
            movie_vector, n_neighbors=k, return_distance=False)
        response = self.output_recommendation(
            search_id=search_id,
            similar_movies=neighbors[0],
            mapper_index_id=self.ratings_movieid_index_invmap)
        # If ties push the query movie out of its own neighbour list, nothing
        # is filtered out above; cap the length so callers always get at most
        # k - 1 titles.
        return response[:max(k - 1, 0)]
class Content_based_filtering(Recommender):
    """Genre-based recommender: movies are compared by their genre vectors.

    Construction is inherited unchanged from ``Recommender`` (it loads the
    ratings and movies CSVs and builds the id/index lookup tables).
    """

    # The three methods below read ``user_feature_matrix`` and
    # ``cosine_similarity`` as attributes, so both must be properties;
    # ``cached_property`` also computes each of them only once.
    @cached_property
    def user_feature_matrix(self) -> pd.DataFrame:
        """One row per movie, one 0/1 column per genre.

        ``genres`` is split on ``'|'``. Columns are in sorted order, so the
        layout is identical on every run (iterating a ``set`` of genre names,
        as before, gave an order that could change between processes and
        misalign feedback vectors).
        """
        return self.movies.genres.str.get_dummies(sep='|')

    @cached_property
    def cosine_similarity(self) -> np.ndarray:
        """Pairwise cosine similarity between all rows of the genre matrix."""
        # Inside a method body the bare name resolves to the module-level
        # function, not to this property.
        features = self.user_feature_matrix
        return cosine_similarity(features, features)

    def find_similar_movies(self, title: str, k: int = 11) -> list[str]:
        """Return titles of the movies whose genres are most similar.

        Args:
            title: Exact movie title as it appears in ``movies``.
            k: Number of top-scoring rows inspected. The query movie itself
                is excluded, so at most ``k - 1`` titles are returned.

        Returns:
            Up to ``k - 1`` titles, most similar first; ties keep the row
            order of ``movies``.

        Raises:
            KeyError: If ``title`` is unknown.
        """
        search_id: int = self.movie_id_title_invmap[title]
        search_index: int = self.movie_id_index_map[search_id]
        scores: np.ndarray = self.cosine_similarity[search_index]
        ranked: list[tuple[int, float]] = sorted(
            zip(self.movies.index, scores),
            key=lambda pair: pair[1], reverse=True)
        neighbors: list[int] = [index for index, _ in ranked[:k]]
        response = self.output_recommendation(
            search_id=search_id,
            similar_movies=neighbors,
            mapper_index_id=self.movie_id_index_invmap)
        # Many movies share an identical genre set, so the query movie is not
        # guaranteed to be inside its own top k; cap the length so the result
        # size does not depend on that.
        return response[:max(k - 1, 0)]

    def find_similar_movies_based_on_feedback(self, vector: list[bool], k: int = 11) -> list[str]:
        """Return titles of the movies closest to a genre-preference vector.

        Args:
            vector: One flag per genre, in the same order as
                ``user_feature_matrix.columns``.
            k: Number of titles to return.

        Returns:
            ``k`` titles, nearest first (nothing is filtered out because
            ``-1`` is used as the query id).
        """
        feedback_vector = np.array(vector, dtype=int).reshape((1, -1))
        # Fit on the plain array so the array-valued query is consistent with
        # what the model was fitted on.
        knn = self.nearest_neighbors(
            matrix=self.user_feature_matrix.to_numpy())
        neighbors: np.ndarray = knn.kneighbors(
            feedback_vector, n_neighbors=k, return_distance=False)
        return self.output_recommendation(
            search_id=-1,
            similar_movies=neighbors[0],
            mapper_index_id=self.movie_id_index_invmap)