# MovieLens recommendation demo — ensemble of collaborative-filtering models
# served through a Gradio UI (Hugging Face Spaces app).
| import pandas as pd | |
| import numpy as np | |
| from surprise import SVD, SVDpp, NMF, KNNBasic, Dataset, Reader | |
| from surprise.model_selection import train_test_split, GridSearchCV | |
| from collections import defaultdict | |
| import gradio as gr | |
| import pickle | |
| import os | |
class MovieRecommenderEnsemble:
    """Ensemble of collaborative-filtering recommenders for MovieLens data.

    Trains five Surprise models (user-based CF, item-based CF, SVD, SVD++,
    NMF) at construction time and can recommend movies for a user with any
    single model or with a fixed-weight blend of all of them.
    """

    def __init__(self, ratings_path, movies_path):
        """Load the CSVs, build the Surprise dataset, split, and train.

        Args:
            ratings_path: CSV with at least userId, movieId, rating columns.
            movies_path: CSV with at least movieId and title columns
                (genres is optional).
        """
        print("Loading data...")
        self.ratings = pd.read_csv(ratings_path)
        self.movies = pd.read_csv(movies_path)

        # Prepare Surprise dataset (MovieLens ratings span 0.5 .. 5.0).
        reader = Reader(rating_scale=(0.5, 5.0))
        self.data = Dataset.load_from_df(
            self.ratings[['userId', 'movieId', 'rating']],
            reader
        )

        # Train-test split for evaluation.  random_state pins the split so
        # the reported metrics are reproducible across runs (the models
        # below already fix random_state=42).
        self.trainset, self.testset = train_test_split(
            self.data, test_size=0.2, random_state=42
        )

        # Train every model eagerly; self.models maps internal model name
        # to a fitted Surprise estimator.
        self.models = {}
        self.train_all_models()

    def train_all_models(self):
        """Train all five models with hyperparameters tuned for MovieLens 1M."""
        print("\n" + "="*50)
        print("Training User-Based Collaborative Filtering...")
        print("="*50)
        # User-Based CF: cosine similarity over user rating vectors.
        user_based_options = {
            'name': 'cosine',
            'user_based': True,
            'min_support': 5
        }
        self.models['user_based_cf'] = KNNBasic(
            k=50,
            sim_options=user_based_options
        )
        self.models['user_based_cf'].fit(self.trainset)
        print("✓ User-Based CF trained")

        print("\n" + "="*50)
        print("Training Item-Based Collaborative Filtering...")
        print("="*50)
        # Item-Based CF: cosine similarity over item rating vectors.
        item_based_options = {
            'name': 'cosine',
            'user_based': False,
            'min_support': 5
        }
        self.models['item_based_cf'] = KNNBasic(
            k=40,
            sim_options=item_based_options
        )
        self.models['item_based_cf'].fit(self.trainset)
        print("✓ Item-Based CF trained")

        print("\n" + "="*50)
        print("Training SVD (Matrix Factorization)...")
        print("="*50)
        # SVD: matrix factorization, 150 latent factors.
        self.models['svd'] = SVD(
            n_factors=150,
            n_epochs=30,
            lr_all=0.007,
            reg_all=0.05,
            random_state=42,
            verbose=True
        )
        self.models['svd'].fit(self.trainset)
        print("✓ SVD trained")

        print("\n" + "="*50)
        print("Training SVD++ (Enhanced Matrix Factorization)...")
        print("="*50)
        # SVD++: factorization variant that also uses implicit feedback.
        self.models['svdpp'] = SVDpp(
            n_factors=100,
            n_epochs=20,
            lr_all=0.007,
            reg_all=0.05,
            random_state=42,
            verbose=True
        )
        self.models['svdpp'].fit(self.trainset)
        print("✓ SVD++ trained")

        print("\n" + "="*50)
        print("Training NMF (Non-negative Matrix Factorization)...")
        print("="*50)
        # NMF: alternative (non-negative) factorization.
        self.models['nmf'] = NMF(
            n_factors=50,
            n_epochs=50,
            random_state=42,
            verbose=True
        )
        self.models['nmf'].fit(self.trainset)
        print("✓ NMF trained")

        print("\n" + "="*50)
        print("All models trained successfully!")
        print("="*50)

    def evaluate_models(self):
        """Evaluate every trained model on the held-out test set.

        Returns:
            (results, best_name): per-model metric dicts and the name of the
            model with the highest Precision@10.
        """
        print("\n" + "="*50)
        print("EVALUATING ALL MODELS")
        print("="*50)
        results = {}
        for name, model in self.models.items():
            print(f"\nEvaluating {name.upper()}...")
            # Predictions on the held-out test split.
            predictions = model.test(self.testset)
            # Rating-accuracy metrics.
            rmse = self.calculate_rmse(predictions)
            mae = self.calculate_mae(predictions)
            # Top-10 ranking metrics.
            precision, recall, ndcg = self.calculate_ranking_metrics(predictions, k=10)
            results[name] = {
                'RMSE': rmse,
                'MAE': mae,
                'Precision@10': precision,
                'Recall@10': recall,
                'NDCG@10': ndcg
            }
            print(f" RMSE: {rmse:.4f}")
            print(f" MAE: {mae:.4f}")
            print(f" Precision@10: {precision:.4f}")
            print(f" Recall@10: {recall:.4f}")
            print(f" NDCG@10: {ndcg:.4f}")

        # "Best" model is chosen by Precision@10, not by RMSE.
        best_model = max(results.items(), key=lambda x: x[1]['Precision@10'])
        print(f"\n{'='*50}")
        print(f"BEST MODEL: {best_model[0].upper()}")
        print(f"Precision@10: {best_model[1]['Precision@10']:.4f}")
        print(f"{'='*50}\n")
        return results, best_model[0]

    def calculate_rmse(self, predictions):
        """Root mean square error over Surprise prediction objects."""
        mse = np.mean([(pred.est - pred.r_ui)**2 for pred in predictions])
        return np.sqrt(mse)

    def calculate_mae(self, predictions):
        """Mean absolute error over Surprise prediction objects."""
        return np.mean([abs(pred.est - pred.r_ui) for pred in predictions])

    def calculate_ranking_metrics(self, predictions, k=10, threshold=4.0):
        """Mean Precision@K, Recall@K and NDCG@K over all users.

        Args:
            predictions: iterable of (uid, iid, true_rating, est, details)
                tuples, as produced by Surprise's ``test``.
            k: cutoff for the top-K list.
            threshold: a rating >= threshold counts as "relevant".

        Returns:
            (precision, recall, ndcg) averaged across users.
        """
        # Group (estimated, true) rating pairs per user.
        user_est_true = defaultdict(list)
        for uid, _, true_r, est, _ in predictions:
            user_est_true[uid].append((est, true_r))

        precisions = []
        recalls = []
        ndcgs = []
        for uid, user_ratings in user_est_true.items():
            # Rank this user's items by estimated rating, best first.
            user_ratings.sort(key=lambda x: x[0], reverse=True)
            top_k = user_ratings[:k]

            n_rel = sum(1 for (_, true_r) in user_ratings if true_r >= threshold)
            n_rec_k = sum(1 for (est, _) in top_k if est >= threshold)
            n_rel_and_rec_k = sum(1 for (est, true_r) in top_k
                                  if true_r >= threshold and est >= threshold)

            # Precision@K (denominator is K, not the number recommended).
            precision = n_rel_and_rec_k / k if k > 0 else 0
            precisions.append(precision)

            # Recall@K; users with no relevant items contribute 0.
            recall = n_rel_and_rec_k / n_rel if n_rel > 0 else 0
            recalls.append(recall)

            # NDCG@K with exponential gain (2^r - 1); only relevant items
            # contribute to both DCG and the ideal DCG.
            dcg = sum((2**true_r - 1) / np.log2(i + 2)
                      for i, (est, true_r) in enumerate(top_k) if true_r >= threshold)
            ideal_ratings = sorted([true_r for _, true_r in user_ratings], reverse=True)[:k]
            idcg = sum((2**true_r - 1) / np.log2(i + 2)
                       for i, true_r in enumerate(ideal_ratings) if true_r >= threshold)
            ndcg = dcg / idcg if idcg > 0 else 0
            ndcgs.append(ndcg)

        return np.mean(precisions), np.mean(recalls), np.mean(ndcgs)

    def _unrated_movies(self, user_id):
        """Return every catalogue movieId the given user has not rated.

        Membership is tested against a set — O(1) per movie instead of
        scanning a numpy array for each candidate.
        """
        all_movies = self.movies['movieId'].unique()
        rated = set(self.ratings.loc[self.ratings['userId'] == user_id, 'movieId'])
        return [m for m in all_movies if m not in rated]

    def _format_recommendations(self, scored_movies):
        """Attach title/genres metadata to ranked (movieId, score) pairs.

        Movies absent from the movies table are skipped; their rank number
        is still consumed, so rank values may have gaps.
        """
        results = []
        for rank, (movie_id, score) in enumerate(scored_movies, 1):
            movie_info = self.movies[self.movies['movieId'] == movie_id]
            if len(movie_info) > 0:
                results.append({
                    'rank': rank,
                    'movieId': int(movie_id),
                    'title': movie_info['title'].iloc[0],
                    'genres': movie_info['genres'].iloc[0] if 'genres' in movie_info else 'N/A',
                    'predicted_rating': round(score, 2)
                })
        return results

    def recommend_movies(self, user_id, N, model_name='svd'):
        """Recommend the top N unseen movies for a user with one model.

        Args:
            user_id: User ID.
            N: Number of recommendations.
            model_name: 'user_based_cf', 'item_based_cf', 'svd', 'svdpp',
                'nmf', or 'ensemble'.

        Returns:
            List of result dicts (rank, movieId, title, genres,
            predicted_rating), or an error string for an unknown model name.
        """
        if model_name == 'ensemble':
            return self.recommend_ensemble(user_id, N)

        if model_name not in self.models:
            return f"Model '{model_name}' not found. Available: {list(self.models.keys())}"

        model = self.models[model_name]

        # Score every movie the user has not rated yet.
        predictions = [
            (movie_id, model.predict(user_id, movie_id).est)
            for movie_id in self._unrated_movies(user_id)
        ]

        # Highest predicted rating first; keep the top N.
        predictions.sort(key=lambda x: x[1], reverse=True)
        return self._format_recommendations(predictions[:N])

    def recommend_ensemble(self, user_id, N):
        """Recommend top N movies via a weighted average of all models."""
        unrated_movies = self._unrated_movies(user_id)

        # Model weights (based on typical performance); they sum to 1.0.
        weights = {
            'user_based_cf': 0.20,
            'item_based_cf': 0.20,
            'svd': 0.25,
            'svdpp': 0.25,
            'nmf': 0.10
        }

        # Weighted blend of every model's predicted rating per movie.
        movie_scores = {}
        for movie_id in unrated_movies:
            movie_scores[movie_id] = sum(
                model.predict(user_id, movie_id).est * weights[model_name]
                for model_name, model in self.models.items()
            )

        # Sort by blended score and keep the top N.
        top_n = sorted(movie_scores.items(), key=lambda x: x[1], reverse=True)[:N]
        return self._format_recommendations(top_n)
# ---------------------------------------------------------------------------
# Script entry: build the recommender and pre-compute the evaluation results
# consumed by the Gradio UI below.
# NOTE(review): expects ratings.csv / movies.csv in the working directory;
# all five models are trained at import time, which can take a while.
# ---------------------------------------------------------------------------
# Initialize recommender system
print("Initializing MovieLens Recommendation System...")
recommender = MovieRecommenderEnsemble('ratings.csv', 'movies.csv')

# Evaluate all models on the held-out split; results feed the
# "Model Evaluation" tab.
evaluation_results, best_model_name = recommender.evaluate_models()
# Gradio callback for the "Get Recommendations" tab.
def recommend_interface(user_id, n_recommendations, model_choice):
    """Format the top-N recommendations for one user as display text.

    Returns an error message string instead of raising, so the UI always
    has something to show.
    """
    # Dropdown display names -> internal model keys.
    display_to_internal = {
        'User-Based CF': 'user_based_cf',
        'Item-Based CF': 'item_based_cf',
        'SVD': 'svd',
        'SVD++': 'svdpp',
        'NMF': 'nmf',
        'Ensemble (All Models)': 'ensemble'
    }
    try:
        uid = int(user_id)
        count = int(n_recommendations)
        internal_name = display_to_internal.get(model_choice, 'svd')
        recs = recommender.recommend_movies(uid, count, internal_name)

        # The recommender signals "unknown model" with a plain string.
        if isinstance(recs, str):
            return recs

        pieces = [f"Top {count} recommendations for User {uid} using {model_choice}:\n\n"]
        for rec in recs:
            pieces.append(f"{rec['rank']}. {rec['title']}\n")
            pieces.append(f" Genres: {rec['genres']}\n")
            pieces.append(f" Predicted Rating: {rec['predicted_rating']}/5.0\n\n")
        return "".join(pieces)
    except ValueError:
        return "Error: Please enter a valid user ID"
    except Exception as e:
        return f"Error: {str(e)}"
def show_evaluation():
    """Render the pre-computed evaluation metrics as plain text."""
    divider = "="*60
    pieces = ["MODEL EVALUATION RESULTS\n", divider + "\n\n"]
    for model_name, metrics in evaluation_results.items():
        # Section per model: header, rule, then one line per metric.
        pieces.append(f"{model_name.upper().replace('_', ' ')}\n")
        pieces.append("-"*40 + "\n")
        for metric, value in metrics.items():
            pieces.append(f" {metric}: {value:.4f}\n")
        pieces.append("\n")
    pieces.append(divider + "\n")
    pieces.append(f"BEST MODEL: {best_model_name.upper().replace('_', ' ')}\n")
    pieces.append(divider)
    return "".join(pieces)
# ---------------------------------------------------------------------------
# Gradio UI: three tabs — interactive recommendations, the pre-computed
# model-evaluation report, and a static "About" page.
# ---------------------------------------------------------------------------
# Create Gradio interface
with gr.Blocks(title="MovieLens Recommendation System") as demo:
    gr.Markdown("# 🎬 MovieLens Recommendation System")
    gr.Markdown("### Trained on MovieLens 1M Dataset (6,040 users, 3,706 movies)")

    with gr.Tab("Get Recommendations"):
        with gr.Row():
            with gr.Column():
                # Free-text user id; validated inside recommend_interface.
                user_input = gr.Textbox(
                    label="User ID",
                    placeholder="Enter user ID (1-6040)",
                    value="1"
                )
                n_input = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=10,
                    step=1,
                    label="Number of Recommendations"
                )
                model_input = gr.Dropdown(
                    choices=[
                        'User-Based CF',
                        'Item-Based CF',
                        'SVD',
                        'SVD++',
                        'NMF',
                        'Ensemble (All Models)'
                    ],
                    value='SVD',
                    label="Select Model"
                )
                recommend_btn = gr.Button("Get Recommendations", variant="primary")
            with gr.Column():
                output = gr.Textbox(
                    label="Recommendations",
                    lines=20,
                    max_lines=30
                )
        # Wire the button to the recommendation callback.
        recommend_btn.click(
            fn=recommend_interface,
            inputs=[user_input, n_input, model_input],
            outputs=output
        )

    with gr.Tab("Model Evaluation"):
        gr.Markdown("## Performance Comparison of All Models")
        # show_evaluation() runs once at UI build time; the metrics text is
        # static for the lifetime of the app.
        eval_output = gr.Textbox(
            label="Evaluation Metrics",
            lines=25,
            value=show_evaluation()
        )

    with gr.Tab("About"):
        gr.Markdown("""
## About This System
This recommendation system implements multiple collaborative filtering approaches:
### Models Implemented:
1. **User-Based Collaborative Filtering**
- Finds similar users based on rating patterns
- k=50 neighbors, cosine similarity
2. **Item-Based Collaborative Filtering**
- Recommends items similar to those you liked
- k=40 neighbors, cosine similarity
3. **SVD (Singular Value Decomposition)**
- Matrix factorization with 150 latent factors
- 30 epochs, optimized for MovieLens 1M
4. **SVD++ (Enhanced SVD)**
- Includes implicit feedback signals
- 100 factors, 20 epochs
5. **NMF (Non-negative Matrix Factorization)**
- Alternative factorization method
- 50 factors, 50 epochs
6. **Ensemble**
- Weighted combination of all models
- Leverages strengths of each approach
### Evaluation Metrics:
- **RMSE/MAE**: Prediction accuracy
- **Precision@10**: Relevance of top 10 recommendations
- **Recall@10**: Coverage of relevant items
- **NDCG@10**: Ranking quality
### Dataset:
MovieLens 1M - 1 million ratings from 6,040 users on 3,706 movies
""")

# Launch the app (blocking call).
demo.launch()