""" DataSynthis_ML_JobTask - Movie Recommendation Model A movie recommendation system using collaborative filtering and matrix factorization. """ import pandas as pd import numpy as np from sklearn.metrics.pairwise import cosine_similarity from sklearn.decomposition import TruncatedSVD import os import urllib.request import zipfile import pickle from typing import List, Dict, Optional, Union class MovieRecommender: """ Movie Recommendation Model using collaborative filtering and SVD. """ def __init__(self): self.ratings = None self.movies = None self.user_item_matrix = None self.item_similarity = None self.item_similarity_df = None self.svd_model = None self.pred_svd_df = None self.is_trained = False def load_data(self): """Load MovieLens 100k dataset.""" dataset_url = "http://files.grouplens.org/datasets/movielens/ml-100k.zip" dataset_path = "ml-100k" if not os.path.exists(dataset_path): if os.path.exists("ml-100k.zip"): print("Extracting existing MovieLens 100k dataset...") with zipfile.ZipFile("ml-100k.zip", "r") as zip_ref: zip_ref.extractall(".") print("Extraction complete.") else: print("Downloading MovieLens 100k dataset...") try: urllib.request.urlretrieve(dataset_url, "ml-100k.zip") with zipfile.ZipFile("ml-100k.zip", "r") as zip_ref: zip_ref.extractall(".") print("Download complete.") except Exception as e: print(f"Download failed: {e}") raise Exception("Could not download dataset") # Load ratings self.ratings = pd.read_csv( "ml-100k/u.data", sep="\t", names=["user_id", "movie_id", "rating", "timestamp"] ) # Load movies self.movies = pd.read_csv( "ml-100k/u.item", sep="|", encoding="ISO-8859-1", names=["movie_id", "title", "release_date", "video_release_date", "IMDb_URL", "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"] ) # Remove timestamp column self.ratings.drop("timestamp", axis=1, inplace=True) print(f"Loaded {len(self.ratings)} ratings from {len(self.ratings['user_id'].unique())} users") print(f"Loaded {len(self.movies)} movies") def train(self): """Train the recommendation models.""" if self.ratings is None: self.load_data() # Create user-item matrix self.user_item_matrix = self.ratings.pivot( index='user_id', columns='movie_id', values='rating' ) # Collaborative Filtering - Item-based similarity self.item_similarity = cosine_similarity(self.user_item_matrix.T.fillna(0)) self.item_similarity_df = pd.DataFrame( self.item_similarity, index=self.user_item_matrix.columns, columns=self.user_item_matrix.columns ) # SVD - Matrix Factorization R = self.user_item_matrix.fillna(0) self.svd_model = TruncatedSVD(n_components=20, random_state=42) U = self.svd_model.fit_transform(R) Sigma = np.diag(self.svd_model.singular_values_) Vt = self.svd_model.components_ pred_svd = np.dot(np.dot(U, Sigma), Vt) self.pred_svd_df = pd.DataFrame(pred_svd, index=R.index, columns=R.columns) self.is_trained = True print("Model training completed!") def predict_ratings_cf(self, user_id: int) -> pd.Series: """Predict ratings using collaborative filtering.""" if not self.is_trained: raise ValueError("Model must be trained first") if user_id not in self.user_item_matrix.index: raise ValueError(f"User {user_id} not found in dataset") user_ratings = self.user_item_matrix.loc[user_id] weighted_sum = self.item_similarity_df.dot(user_ratings.fillna(0)) sim_sum = np.abs(self.item_similarity_df).dot(user_ratings.notna().astype(int)) pred = weighted_sum / np.maximum(sim_sum, 1e-9) return pred def recommend_movies(self, user_id: int, n_recommendations: int = 10, method: str = "svd") -> List[Dict]: """ Get movie recommendations for a user. Args: user_id: User ID to get recommendations for n_recommendations: Number of recommendations to return method: "svd" or "cf" (collaborative filtering) Returns: List of dictionaries with movie recommendations """ if not self.is_trained: self.train() # Check if user exists if user_id not in self.user_item_matrix.index: available_users = sorted(self.user_item_matrix.index.tolist()) return [{ "error": f"User {user_id} not found", "available_users": f"Available user IDs: {available_users[:10]}... (showing first 10)" }] # Get predictions if method == "svd": preds = self.pred_svd_df.loc[user_id] else: # collaborative filtering preds = self.predict_ratings_cf(user_id) # Remove already watched movies watched = self.ratings[self.ratings.user_id == user_id].movie_id.values preds = preds.drop(watched, errors='ignore') # Get top recommendations top_movies = preds.sort_values(ascending=False).head(n_recommendations).index recommendations = self.movies[self.movies.movie_id.isin(top_movies)][["movie_id", "title"]] # Convert to list of dictionaries result = [] for _, row in recommendations.iterrows(): result.append({ "movie_id": int(row["movie_id"]), "title": row["title"], "predicted_rating": float(preds[row["movie_id"]]) }) return result def get_user_stats(self, user_id: int) -> Dict: """Get statistics for a user.""" if not self.is_trained: self.train() if user_id not in self.user_item_matrix.index: return {"error": f"User {user_id} not found"} user_ratings = self.ratings[self.ratings.user_id == user_id] return { "user_id": user_id, "total_ratings": len(user_ratings), "average_rating": float(user_ratings["rating"].mean()), "rating_distribution": user_ratings["rating"].value_counts().to_dict() } def get_available_users(self) -> List[int]: """Get list of available user IDs.""" if not self.is_trained: self.train() return sorted(self.user_item_matrix.index.tolist()) def save_model(self, path: str): """Save the trained model.""" if not self.is_trained: raise ValueError("Model must be trained first") model_data = { 'ratings': self.ratings, 'movies': self.movies, 'user_item_matrix': self.user_item_matrix, 'item_similarity_df': self.item_similarity_df, 'svd_model': self.svd_model, 'pred_svd_df': self.pred_svd_df, 'is_trained': self.is_trained } with open(path, 'wb') as f: pickle.dump(model_data, f) print(f"Model saved to {path}") def load_model(self, path: str): """Load a trained model.""" with open(path, 'rb') as f: model_data = pickle.load(f) self.ratings = model_data['ratings'] self.movies = model_data['movies'] self.user_item_matrix = model_data['user_item_matrix'] self.item_similarity_df = model_data['item_similarity_df'] self.svd_model = model_data['svd_model'] self.pred_svd_df = model_data['pred_svd_df'] self.is_trained = model_data['is_trained'] print(f"Model loaded from {path}") # Create a global model instance for inference model = MovieRecommender() def predict(user_id: int, n_recommendations: int = 10, method: str = "svd") -> List[Dict]: """ Inference function for Hugging Face model. Args: user_id: User ID to get recommendations for n_recommendations: Number of recommendations (default: 10) method: Recommendation method - "svd" or "cf" (default: "svd") Returns: List of movie recommendations """ return model.recommend_movies(user_id, n_recommendations, method)