|
|
"""
|
|
|
DataSynthis_ML_JobTask - Movie Recommendation Model
|
|
|
A movie recommendation system using collaborative filtering and matrix factorization.
|
|
|
"""
|
|
|
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
from sklearn.decomposition import TruncatedSVD
|
|
|
import os
|
|
|
import urllib.request
|
|
|
import zipfile
|
|
|
import pickle
|
|
|
from typing import List, Dict, Optional, Union
|
|
|
|
|
|
|
|
|
class MovieRecommender:
    """Movie recommendation model using collaborative filtering and SVD.

    Two recommenders are trained over the MovieLens 100k dataset:

    * ``"cf"``:  item-item collaborative filtering via cosine similarity.
    * ``"svd"``: low-rank reconstruction of the user-item rating matrix
      with truncated SVD.
    """

    def __init__(self):
        # Raw data (filled by load_data / load_model).
        self.ratings = None             # DataFrame: user_id, movie_id, rating
        self.movies = None              # DataFrame: movie_id, title, genre flags
        # Trained artifacts (filled by train / load_model).
        self.user_item_matrix = None    # users x movies ratings (NaN = unrated)
        self.item_similarity = None     # raw item-item cosine-similarity ndarray
        self.item_similarity_df = None  # same matrix labeled with movie_ids
        self.svd_model = None           # fitted TruncatedSVD instance
        self.pred_svd_df = None         # SVD-reconstructed predicted ratings
        self.is_trained = False

    def load_data(self):
        """Load the MovieLens 100k dataset, downloading/extracting if needed.

        Populates ``self.ratings`` (user_id, movie_id, rating) and
        ``self.movies`` (movie metadata).

        Raises:
            RuntimeError: if the dataset cannot be downloaded.
        """
        dataset_url = "http://files.grouplens.org/datasets/movielens/ml-100k.zip"
        dataset_path = "ml-100k"

        if not os.path.exists(dataset_path):
            if os.path.exists("ml-100k.zip"):
                print("Extracting existing MovieLens 100k dataset...")
                with zipfile.ZipFile("ml-100k.zip", "r") as zip_ref:
                    zip_ref.extractall(".")
                print("Extraction complete.")
            else:
                print("Downloading MovieLens 100k dataset...")
                try:
                    urllib.request.urlretrieve(dataset_url, "ml-100k.zip")
                    with zipfile.ZipFile("ml-100k.zip", "r") as zip_ref:
                        zip_ref.extractall(".")
                    print("Download complete.")
                except Exception as e:
                    print(f"Download failed: {e}")
                    # Chain the original error so the root cause is not lost
                    # (the original raised a bare Exception with no cause).
                    raise RuntimeError("Could not download dataset") from e

        self.ratings = pd.read_csv(
            "ml-100k/u.data",
            sep="\t",
            names=["user_id", "movie_id", "rating", "timestamp"]
        )

        self.movies = pd.read_csv(
            "ml-100k/u.item",
            sep="|",
            encoding="ISO-8859-1",
            names=["movie_id", "title", "release_date", "video_release_date", "IMDb_URL",
                   "unknown", "Action", "Adventure", "Animation", "Children", "Comedy",
                   "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
                   "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]
        )

        # Timestamps are never used by either recommender.
        self.ratings.drop("timestamp", axis=1, inplace=True)

        print(f"Loaded {len(self.ratings)} ratings from {len(self.ratings['user_id'].unique())} users")
        print(f"Loaded {len(self.movies)} movies")

    def train(self, n_components: int = 20):
        """Train both recommenders (loads data first if necessary).

        Args:
            n_components: Desired number of SVD latent factors (default 20,
                the original hard-coded value).  The value is clamped to the
                valid range so that small datasets no longer crash
                TruncatedSVD, which requires ``n_components < n_features``.
        """
        if self.ratings is None:
            self.load_data()

        # Users x movies rating matrix; NaN marks "not rated".
        self.user_item_matrix = self.ratings.pivot(
            index='user_id', columns='movie_id', values='rating'
        )

        # Item-item cosine similarity (missing ratings treated as 0).
        self.item_similarity = cosine_similarity(self.user_item_matrix.T.fillna(0))
        self.item_similarity_df = pd.DataFrame(
            self.item_similarity,
            index=self.user_item_matrix.columns,
            columns=self.user_item_matrix.columns
        )

        # Low-rank SVD reconstruction: R ~= U * Sigma * Vt.
        R = self.user_item_matrix.fillna(0)
        # Clamp so the factorization is always valid (fixes a crash for
        # datasets with <= n_components users or movies).
        k = max(1, min(n_components, min(R.shape) - 1))
        self.svd_model = TruncatedSVD(n_components=k, random_state=42)
        U = self.svd_model.fit_transform(R)
        Sigma = np.diag(self.svd_model.singular_values_)
        Vt = self.svd_model.components_
        pred_svd = np.dot(np.dot(U, Sigma), Vt)
        self.pred_svd_df = pd.DataFrame(pred_svd, index=R.index, columns=R.columns)

        self.is_trained = True
        print("Model training completed!")

    def predict_ratings_cf(self, user_id: int) -> pd.Series:
        """Predict ratings for every movie via item-based collaborative filtering.

        Args:
            user_id: ID of an existing user.

        Returns:
            Series of predicted ratings indexed by movie_id.

        Raises:
            ValueError: if the model is untrained or the user is unknown.
        """
        if not self.is_trained:
            raise ValueError("Model must be trained first")

        if user_id not in self.user_item_matrix.index:
            raise ValueError(f"User {user_id} not found in dataset")

        user_ratings = self.user_item_matrix.loc[user_id]
        # Similarity-weighted sum of the user's ratings ...
        weighted_sum = self.item_similarity_df.dot(user_ratings.fillna(0))
        # ... normalized by the total similarity mass of the movies actually rated.
        sim_sum = np.abs(self.item_similarity_df).dot(user_ratings.notna().astype(int))
        # The 1e-9 floor avoids division by zero when no rated movie is similar.
        pred = weighted_sum / np.maximum(sim_sum, 1e-9)
        return pred

    def recommend_movies(self, user_id: int, n_recommendations: int = 10,
                         method: str = "svd") -> List[Dict]:
        """
        Get movie recommendations for a user.

        Args:
            user_id: User ID to get recommendations for
            n_recommendations: Number of recommendations to return
            method: "svd" or "cf" (collaborative filtering); any value other
                than "svd" falls through to "cf", matching original behavior

        Returns:
            List of dictionaries with movie recommendations, ordered from
            highest to lowest predicted rating.
        """
        if not self.is_trained:
            self.train()

        if user_id not in self.user_item_matrix.index:
            available_users = sorted(self.user_item_matrix.index.tolist())
            return [{
                "error": f"User {user_id} not found",
                "available_users": f"Available user IDs: {available_users[:10]}... (showing first 10)"
            }]

        if method == "svd":
            preds = self.pred_svd_df.loc[user_id]
        else:
            preds = self.predict_ratings_cf(user_id)

        # Never recommend something the user has already rated.
        watched = self.ratings[self.ratings.user_id == user_id].movie_id.values
        preds = preds.drop(watched, errors='ignore')

        top_movies = preds.sort_values(ascending=False).head(n_recommendations)

        # Build the result in ranked order.  (The original filtered the movies
        # table with isin(), which returned rows in movie-table order and
        # silently lost the ranking by predicted rating.)
        title_by_id = self.movies.set_index("movie_id")["title"]
        result = []
        for movie_id, score in top_movies.items():
            if movie_id not in title_by_id.index:
                # Rated movie with no metadata row; the original skipped these too.
                continue
            result.append({
                "movie_id": int(movie_id),
                "title": title_by_id[movie_id],
                "predicted_rating": float(score)
            })

        return result

    def get_user_stats(self, user_id: int) -> Dict:
        """Return rating statistics for a user (trains lazily if needed).

        Returns:
            Dict with user_id, total_ratings, average_rating and a
            rating_distribution histogram, or an ``{"error": ...}`` dict
            when the user is unknown.
        """
        if not self.is_trained:
            self.train()

        if user_id not in self.user_item_matrix.index:
            return {"error": f"User {user_id} not found"}

        user_ratings = self.ratings[self.ratings.user_id == user_id]

        return {
            "user_id": user_id,
            "total_ratings": len(user_ratings),
            "average_rating": float(user_ratings["rating"].mean()),
            "rating_distribution": user_ratings["rating"].value_counts().to_dict()
        }

    def get_available_users(self) -> List[int]:
        """Return the sorted list of user IDs known to the model."""
        if not self.is_trained:
            self.train()
        return sorted(self.user_item_matrix.index.tolist())

    def save_model(self, path: str):
        """Pickle the trained model state to *path*.

        Raises:
            ValueError: if the model has not been trained.
        """
        if not self.is_trained:
            raise ValueError("Model must be trained first")

        model_data = {
            'ratings': self.ratings,
            'movies': self.movies,
            'user_item_matrix': self.user_item_matrix,
            # The raw similarity array was previously the only trained
            # artifact that was silently dropped from the snapshot.
            'item_similarity': self.item_similarity,
            'item_similarity_df': self.item_similarity_df,
            'svd_model': self.svd_model,
            'pred_svd_df': self.pred_svd_df,
            'is_trained': self.is_trained
        }

        with open(path, 'wb') as f:
            pickle.dump(model_data, f)

        print(f"Model saved to {path}")

    def load_model(self, path: str):
        """Load model state previously written by :meth:`save_model`.

        SECURITY NOTE: ``pickle.load`` can execute arbitrary code from the
        file — only load model files from trusted sources.
        """
        with open(path, 'rb') as f:
            model_data = pickle.load(f)

        self.ratings = model_data['ratings']
        self.movies = model_data['movies']
        self.user_item_matrix = model_data['user_item_matrix']
        # .get(): snapshots written before item_similarity was persisted
        # lack this key; keep them loadable.
        self.item_similarity = model_data.get('item_similarity')
        self.item_similarity_df = model_data['item_similarity_df']
        self.svd_model = model_data['svd_model']
        self.pred_svd_df = model_data['pred_svd_df']
        self.is_trained = model_data['is_trained']

        print(f"Model loaded from {path}")
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton used by predict(); it loads data and trains lazily
# on first use rather than at import time.
model = MovieRecommender()
|
|
|
|
|
|
def predict(user_id: int, n_recommendations: int = 10, method: str = "svd") -> List[Dict]:
    """Inference entry point for the Hugging Face model.

    Delegates to the module-level recommender singleton, which trains
    itself lazily on the first call.

    Args:
        user_id: ID of the user to recommend movies for.
        n_recommendations: How many recommendations to return (default: 10).
        method: "svd" for matrix factorization or "cf" for collaborative
            filtering (default: "svd").

    Returns:
        A list of movie-recommendation dictionaries.
    """
    return model.recommend_movies(
        user_id,
        n_recommendations=n_recommendations,
        method=method,
    )