LvMAC's picture
Update app.py
6a8179a verified
raw
history blame
16.5 kB
import pandas as pd
import numpy as np
from surprise import SVD, SVDpp, NMF, KNNBasic, Dataset, Reader
from surprise.model_selection import train_test_split, GridSearchCV
from collections import defaultdict
import gradio as gr
import pickle
import os
class MovieRecommenderEnsemble:
    """Trains several collaborative-filtering models on MovieLens-style CSVs
    and serves top-N movie recommendations from any single model or from a
    weighted ensemble of all of them."""

    def __init__(self, ratings_path, movies_path):
        """Load the data, build the Surprise dataset, and train all models.

        Args:
            ratings_path: CSV with at least userId, movieId, rating columns.
            movies_path: CSV with at least movieId, title (optionally genres).
        """
        print("Loading data...")
        self.ratings = pd.read_csv(ratings_path)
        self.movies = pd.read_csv(movies_path)

        # Surprise requires the rating scale declared up front (MovieLens: 0.5-5.0).
        reader = Reader(rating_scale=(0.5, 5.0))
        self.data = Dataset.load_from_df(
            self.ratings[['userId', 'movieId', 'rating']],
            reader
        )

        # Hold out 20% for evaluation. Fixed seed so the reported metrics are
        # reproducible across restarts (previously the split changed every run).
        self.trainset, self.testset = train_test_split(
            self.data, test_size=0.2, random_state=42
        )

        # name -> fitted Surprise algorithm; populated by train_all_models().
        self.models = {}
        self.train_all_models()
def train_all_models(self):
    """Train all five models with hyperparameters tuned for MovieLens 1M.

    Populates self.models with fitted Surprise algorithms keyed by
    'user_based_cf', 'item_based_cf', 'svd', 'svdpp' and 'nmf'.
    """

    def banner(msg):
        # Uniform section header for training progress (was repeated inline 6x).
        print("\n" + "=" * 50)
        print(msg)
        print("=" * 50)

    banner("Training User-Based Collaborative Filtering...")
    # min_support=5: ignore similarities backed by fewer than 5 common ratings.
    self.models['user_based_cf'] = KNNBasic(
        k=50,
        sim_options={'name': 'cosine', 'user_based': True, 'min_support': 5}
    )
    self.models['user_based_cf'].fit(self.trainset)
    print("✓ User-Based CF trained")

    banner("Training Item-Based Collaborative Filtering...")
    self.models['item_based_cf'] = KNNBasic(
        k=40,
        sim_options={'name': 'cosine', 'user_based': False, 'min_support': 5}
    )
    self.models['item_based_cf'].fit(self.trainset)
    print("✓ Item-Based CF trained")

    banner("Training SVD (Matrix Factorization)...")
    self.models['svd'] = SVD(
        n_factors=150,
        n_epochs=30,
        lr_all=0.007,
        reg_all=0.05,
        random_state=42,
        verbose=True
    )
    self.models['svd'].fit(self.trainset)
    print("✓ SVD trained")

    banner("Training SVD++ (Enhanced Matrix Factorization)...")
    # SVD++ additionally models implicit feedback (which items were rated at all).
    self.models['svdpp'] = SVDpp(
        n_factors=100,
        n_epochs=20,
        lr_all=0.007,
        reg_all=0.05,
        random_state=42,
        verbose=True
    )
    self.models['svdpp'].fit(self.trainset)
    print("✓ SVD++ trained")

    banner("Training NMF (Non-negative Matrix Factorization)...")
    self.models['nmf'] = NMF(
        n_factors=50,
        n_epochs=50,
        random_state=42,
        verbose=True
    )
    self.models['nmf'].fit(self.trainset)
    print("✓ NMF trained")

    banner("All models trained successfully!")
def evaluate_models(self):
    """Score every trained model on the held-out test set.

    Returns:
        (results, best_name): per-model dict of RMSE/MAE/Precision@10/
        Recall@10/NDCG@10, and the key of the model with the highest
        Precision@10.
    """
    print("\n" + "=" * 50)
    print("EVALUATING ALL MODELS")
    print("=" * 50)

    results = {}
    for name, model in self.models.items():
        print(f"\nEvaluating {name.upper()}...")
        preds = model.test(self.testset)

        # Accuracy metrics first, then top-10 ranking metrics.
        metrics = {
            'RMSE': self.calculate_rmse(preds),
            'MAE': self.calculate_mae(preds),
        }
        (metrics['Precision@10'],
         metrics['Recall@10'],
         metrics['NDCG@10']) = self.calculate_ranking_metrics(preds, k=10)
        results[name] = metrics

        print(f" RMSE: {metrics['RMSE']:.4f}")
        print(f" MAE: {metrics['MAE']:.4f}")
        print(f" Precision@10: {metrics['Precision@10']:.4f}")
        print(f" Recall@10: {metrics['Recall@10']:.4f}")
        print(f" NDCG@10: {metrics['NDCG@10']:.4f}")

    # Pick the winner by Precision@10.
    best_name, best_metrics = max(results.items(),
                                  key=lambda kv: kv[1]['Precision@10'])
    print(f"\n{'='*50}")
    print(f"BEST MODEL: {best_name.upper()}")
    print(f"Precision@10: {best_metrics['Precision@10']:.4f}")
    print(f"{'='*50}\n")
    return results, best_name
def calculate_rmse(self, predictions):
    """Root mean squared error over a list of Surprise predictions."""
    squared = np.asarray([p.est - p.r_ui for p in predictions]) ** 2
    return np.sqrt(squared.mean())
def calculate_mae(self, predictions):
    """Mean absolute error over a list of Surprise predictions."""
    deviations = [abs(p.est - p.r_ui) for p in predictions]
    return np.mean(deviations)
def calculate_ranking_metrics(self, predictions, k=10, threshold=4.0):
    """Compute mean Precision@K, Recall@K and NDCG@K across users.

    Args:
        predictions: iterable of Surprise prediction 5-tuples
            (uid, iid, true_rating, estimated_rating, details).
        k: size of the recommendation slate.
        threshold: ratings >= threshold count as "relevant".

    Returns:
        (mean_precision, mean_recall, mean_ndcg) over all users.

    Notes:
        Precision@K here divides by the full slate size k (not by the number
        of items predicted relevant). NDCG uses exponential gain (2^r - 1)
        and only relevant items contribute to DCG/IDCG.
    """
    # Group (estimate, truth) pairs per user.
    user_est_true = defaultdict(list)
    for uid, _, true_r, est, _ in predictions:
        user_est_true[uid].append((est, true_r))

    precisions, recalls, ndcgs = [], [], []
    for user_ratings in user_est_true.values():
        # Rank this user's items by predicted rating, best first.
        user_ratings.sort(key=lambda x: x[0], reverse=True)
        top_k = user_ratings[:k]

        n_rel = sum(1 for _, true_r in user_ratings if true_r >= threshold)
        # Hits: recommended (high estimate) AND actually relevant.
        # (A dead `n_rec_k` counter in the original was removed.)
        n_rel_and_rec_k = sum(
            1 for est, true_r in top_k
            if true_r >= threshold and est >= threshold
        )

        precisions.append(n_rel_and_rec_k / k if k > 0 else 0)
        recalls.append(n_rel_and_rec_k / n_rel if n_rel > 0 else 0)

        # DCG over the predicted order; IDCG over the ideal (truth-sorted) order.
        dcg = sum((2 ** true_r - 1) / np.log2(i + 2)
                  for i, (_, true_r) in enumerate(top_k) if true_r >= threshold)
        ideal = sorted((true_r for _, true_r in user_ratings), reverse=True)[:k]
        idcg = sum((2 ** true_r - 1) / np.log2(i + 2)
                   for i, true_r in enumerate(ideal) if true_r >= threshold)
        ndcgs.append(dcg / idcg if idcg > 0 else 0)

    return np.mean(precisions), np.mean(recalls), np.mean(ndcgs)
def recommend_movies(self, user_id, N, model_name='svd'):
    """
    Recommend the top N movies for a user using the specified model.

    Args:
        user_id: User ID present in the ratings data.
        N: Number of recommendations to return.
        model_name: 'user_based_cf', 'item_based_cf', 'svd', 'svdpp', 'nmf',
            or 'ensemble'.

    Returns:
        List of dicts with rank, movieId, title, genres, predicted_rating —
        or an error string if model_name is unknown.
    """
    if model_name == 'ensemble':
        return self.recommend_ensemble(user_id, N)
    if model_name not in self.models:
        return f"Model '{model_name}' not found. Available: {list(self.models.keys())}"

    model = self.models[model_name]

    # Candidate pool: every catalogue movie the user has not rated yet.
    # A set makes each membership test O(1) instead of scanning an array
    # per candidate (the original was O(movies x rated)).
    rated = set(self.ratings[self.ratings['userId'] == user_id]['movieId'].values)
    unrated_movies = [m for m in self.movies['movieId'].unique() if m not in rated]

    # Score every candidate and keep the N best predicted ratings.
    predictions = [(movie_id, model.predict(user_id, movie_id).est)
                   for movie_id in unrated_movies]
    predictions.sort(key=lambda x: x[1], reverse=True)
    top_n = predictions[:N]

    results = []
    for i, (movie_id, score) in enumerate(top_n, 1):
        movie_info = self.movies[self.movies['movieId'] == movie_id]
        if len(movie_info) > 0:
            title = movie_info['title'].iloc[0]
            genres = movie_info['genres'].iloc[0] if 'genres' in movie_info else 'N/A'
            results.append({
                'rank': i,
                'movieId': int(movie_id),
                'title': title,
                'genres': genres,
                'predicted_rating': round(score, 2)
            })
    return results
def recommend_ensemble(self, user_id, N):
    """Recommend top N movies using a weighted average of all model scores.

    Args:
        user_id: User ID present in the ratings data.
        N: Number of recommendations to return.

    Returns:
        List of dicts with rank, movieId, title, genres, predicted_rating.
    """
    # Fixed blend weights (sum to 1.0; roughly reflect typical model quality).
    weights = {
        'user_based_cf': 0.20,
        'item_based_cf': 0.20,
        'svd': 0.25,
        'svdpp': 0.25,
        'nmf': 0.10
    }

    # Candidate pool: unrated movies only. A set gives O(1) membership tests
    # instead of scanning a numpy array per candidate.
    rated = set(self.ratings[self.ratings['userId'] == user_id]['movieId'].values)
    unrated_movies = [m for m in self.movies['movieId'].unique() if m not in rated]

    # Weighted average of every model's predicted rating per movie.
    movie_scores = {}
    for movie_id in unrated_movies:
        movie_scores[movie_id] = sum(
            model.predict(user_id, movie_id).est * weights[name]
            for name, model in self.models.items()
        )

    top_n = sorted(movie_scores.items(), key=lambda x: x[1], reverse=True)[:N]

    results = []
    for i, (movie_id, score) in enumerate(top_n, 1):
        movie_info = self.movies[self.movies['movieId'] == movie_id]
        if len(movie_info) > 0:
            title = movie_info['title'].iloc[0]
            genres = movie_info['genres'].iloc[0] if 'genres' in movie_info else 'N/A'
            results.append({
                'rank': i,
                'movieId': int(movie_id),
                'title': title,
                'genres': genres,
                'predicted_rating': round(score, 2)
            })
    return results
# --- Application start-up (runs at import time on the Space) ---

# Initialize recommender system
print("Initializing MovieLens Recommendation System...")
# Loads both CSVs and trains all five models; slow on first launch.
recommender = MovieRecommenderEnsemble('ratings.csv', 'movies.csv')
# Evaluate all models
# Computed once and kept in module globals for the Gradio evaluation tab.
evaluation_results, best_model_name = recommender.evaluate_models()
# Create Gradio interface
def recommend_interface(user_id, n_recommendations, model_choice):
    """Gradio callback: format the selected model's top-N list as plain text."""
    # Display label -> internal model key; unknown labels fall back to SVD.
    model_map = {
        'User-Based CF': 'user_based_cf',
        'Item-Based CF': 'item_based_cf',
        'SVD': 'svd',
        'SVD++': 'svdpp',
        'NMF': 'nmf',
        'Ensemble (All Models)': 'ensemble'
    }
    try:
        user_id = int(user_id)
        n_recommendations = int(n_recommendations)

        recommendations = recommender.recommend_movies(
            user_id, n_recommendations, model_map.get(model_choice, 'svd')
        )
        # recommend_movies signals "unknown model" with an error string.
        if isinstance(recommendations, str):
            return recommendations

        parts = [f"Top {n_recommendations} recommendations for User {user_id} using {model_choice}:\n\n"]
        for rec in recommendations:
            parts.append(f"{rec['rank']}. {rec['title']}\n")
            parts.append(f" Genres: {rec['genres']}\n")
            parts.append(f" Predicted Rating: {rec['predicted_rating']}/5.0\n\n")
        return "".join(parts)
    except ValueError:
        return "Error: Please enter a valid user ID"
    except Exception as e:
        return f"Error: {str(e)}"
def show_evaluation():
    """Build the plain-text model-comparison report for the evaluation tab."""
    rule = "=" * 60
    parts = ["MODEL EVALUATION RESULTS\n", rule + "\n\n"]
    for model_name, metrics in evaluation_results.items():
        parts.append(f"{model_name.upper().replace('_', ' ')}\n")
        parts.append("-" * 40 + "\n")
        for metric, value in metrics.items():
            parts.append(f" {metric}: {value:.4f}\n")
        parts.append("\n")
    parts.append(rule + "\n")
    parts.append(f"BEST MODEL: {best_model_name.upper().replace('_', ' ')}\n")
    parts.append(rule)
    return "".join(parts)
# Create Gradio interface
with gr.Blocks(title="MovieLens Recommendation System") as demo:
    gr.Markdown("# 🎬 MovieLens Recommendation System")
    gr.Markdown("### Trained on MovieLens 1M Dataset (6,040 users, 3,706 movies)")

    # Tab 1: interactive recommendations.
    with gr.Tab("Get Recommendations"):
        with gr.Row():
            with gr.Column():
                user_input = gr.Textbox(
                    label="User ID",
                    placeholder="Enter user ID (1-6040)",
                    value="1"
                )
                n_input = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=10,
                    step=1,
                    label="Number of Recommendations"
                )
                model_input = gr.Dropdown(
                    choices=[
                        'User-Based CF',
                        'Item-Based CF',
                        'SVD',
                        'SVD++',
                        'NMF',
                        'Ensemble (All Models)'
                    ],
                    value='SVD',
                    label="Select Model"
                )
                recommend_btn = gr.Button("Get Recommendations", variant="primary")
            with gr.Column():
                output = gr.Textbox(
                    label="Recommendations",
                    lines=20,
                    max_lines=30
                )
        recommend_btn.click(
            fn=recommend_interface,
            inputs=[user_input, n_input, model_input],
            outputs=output
        )

    # Tab 2: static evaluation report, computed once at start-up.
    with gr.Tab("Model Evaluation"):
        gr.Markdown("## Performance Comparison of All Models")
        eval_output = gr.Textbox(
            label="Evaluation Metrics",
            lines=25,
            value=show_evaluation()
        )

    # Tab 3: project description.
    with gr.Tab("About"):
        gr.Markdown("""
## About This System
This recommendation system implements multiple collaborative filtering approaches:
### Models Implemented:
1. **User-Based Collaborative Filtering**
- Finds similar users based on rating patterns
- k=50 neighbors, cosine similarity
2. **Item-Based Collaborative Filtering**
- Recommends items similar to those you liked
- k=40 neighbors, cosine similarity
3. **SVD (Singular Value Decomposition)**
- Matrix factorization with 150 latent factors
- 30 epochs, optimized for MovieLens 1M
4. **SVD++ (Enhanced SVD)**
- Includes implicit feedback signals
- 100 factors, 20 epochs
5. **NMF (Non-negative Matrix Factorization)**
- Alternative factorization method
- 50 factors, 50 epochs
6. **Ensemble**
- Weighted combination of all models
- Leverages strengths of each approach
### Evaluation Metrics:
- **RMSE/MAE**: Prediction accuracy
- **Precision@10**: Relevance of top 10 recommendations
- **Recall@10**: Coverage of relevant items
- **NDCG@10**: Ranking quality
### Dataset:
MovieLens 1M - 1 million ratings from 6,040 users on 3,706 movies
""")

demo.launch()