"""Game recommenders: collaborative, content-based, text-based, and an ensemble of the three."""

import os
import re
import string
from functools import reduce
from typing import Optional

import joblib
import nltk
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MultiLabelBinarizer, LabelEncoder, MinMaxScaler
from xgboost import XGBClassifier

# NLTK resources needed by TextBasedRecommendation.preprocess (tokenizer,
# POS tagger, lemmatizer data and stopword list). Both the legacy and the
# current resource names are requested so the module works across NLTK
# releases: 'punkt_tab' / 'averaged_perceptron_tagger_eng' are the names used
# by NLTK 3.9+, 'punkt' / 'averaged_perceptron_tagger' by older releases.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('wordnet')
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')


class CollaborativeRecommender:
    """Item-to-item recommender using cosine similarity between latent item vectors."""

    def __init__(self, svd_matrix, item_to_index, index_to_item):
        """
        svd_matrix: 2D numpy array (items x latent features)
        item_to_index: dict mapping app_id to row index in svd_matrix
        index_to_item: dict mapping row index to app_id
        """
        self.svd_matrix: np.ndarray = svd_matrix
        self.item_to_index = item_to_index
        self.index_to_item = index_to_item

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file.

        NOTE: joblib.load unpickles arbitrary objects; only load files that
        come from a trusted source.
        """
        return joblib.load(path)

    def _get_item_vector(self, app_id):
        """Return the latent vector of ``app_id``; raise ValueError if it is unknown."""
        idx = self.item_to_index.get(app_id)
        if idx is None:
            raise ValueError(f"app_id {app_id} not found in the model.")
        return self.svd_matrix[idx]

    def _cosine_similarity(self, vec, matrix):
        """Return the cosine similarity between ``vec`` and every row of ``matrix``."""
        vec_norm = np.linalg.norm(vec)
        matrix_norms = np.linalg.norm(matrix, axis=1)
        # The small epsilon keeps the division defined for all-zero vectors.
        return (matrix @ vec) / (matrix_norms * vec_norm + 1e-10)

    def get_similarities(self, app_ids, top_n: Optional[int] = None) -> pd.DataFrame:
        """Rank all known items by similarity to the given item(s).

        Args:
            app_ids: a single app_id (str, int or numpy integer) or a
                list/tuple/ndarray of app_ids. Unknown ids are skipped and
                reported on stdout.
            top_n: if not None, only the first ``top_n`` rows are returned.

        Returns:
            DataFrame with columns ['app_id', 'collaborative_similarity'],
            sorted by similarity descending, excluding the input app_ids.

        Raises:
            TypeError: if ``app_ids`` is neither a scalar id nor a supported sequence.
            ValueError: if none of the input app_ids is known to the model.
        """
        # np.integer is accepted so ids taken from numpy/pandas data work too.
        if isinstance(app_ids, (str, int, np.integer)):
            app_ids = [app_ids]
        elif not isinstance(app_ids, (list, tuple, np.ndarray)):
            raise TypeError("app_ids must be a string/int or a list of such")

        valid_vectors = []
        missing_ids = []
        for app_id in app_ids:
            try:
                valid_vectors.append(self._get_item_vector(app_id))
            except ValueError:
                missing_ids.append(app_id)

        if not valid_vectors:
            raise ValueError("None of the input app_ids were found in the model.")

        # Several inputs are collapsed into one query vector by averaging.
        aggregated_vec = np.mean(valid_vectors, axis=0)
        similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix)

        result_df = pd.DataFrame({
            'app_id': [self.index_to_item[i] for i in range(len(similarities))],
            'collaborative_similarity': similarities,
        })

        # The queried items themselves are not useful recommendations.
        result_df = result_df[~result_df['app_id'].isin(app_ids)]
        result_df = result_df.sort_values(
            'collaborative_similarity', ascending=False
        ).reset_index(drop=True)

        if missing_ids:
            print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}")

        if top_n is not None:
            return result_df.head(top_n)
        return result_df


class GameContentRecommender:
    """Content-based recommender that looks up nearest neighbours of a feature vector."""

    def __init__(self, model, genre_encoder, category_encoder, price_range_encoder, scaler, app_id_encoder):
        """Store the fitted neighbour model and the fitted encoders/scaler used to build its input."""
        self.model: KNeighborsClassifier = model
        self.genre_encoder: MultiLabelBinarizer = genre_encoder
        self.category_encoder: MultiLabelBinarizer = category_encoder
        self.price_range_encoder: LabelEncoder = price_range_encoder
        self.scaler: MinMaxScaler = scaler
        self.app_id_encoder: LabelEncoder = app_id_encoder

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file.

        NOTE: joblib.load unpickles arbitrary objects; only load files that
        come from a trusted source.
        """
        return joblib.load(path)

    @staticmethod
    def _multi_hot(classes, selected):
        """Return a 0/1 list aligned with ``classes``; 'Unknown' entries in ``selected`` are ignored."""
        chosen = {label for label in selected if label != 'Unknown'}
        return [int(label in chosen) for label in classes]

    def predict(self, price_range, year_release, average_playtime, game_score,
                dlc_count, genres, categories, top_n: Optional[int] = None) -> pd.DataFrame:
        """Return the games nearest to the described game.

        The feature vector is laid out as: scaled
        [year_release, average_playtime, game_score, dlc_count], encoded
        price range, one flag per known genre, one flag per known category.

        Args:
            price_range: label understood by ``price_range_encoder``.
            year_release, average_playtime, game_score, dlc_count: numeric
                features passed through ``scaler``.
            genres: iterable of genre names; names unknown to the encoder
                and the literal 'Unknown' are ignored.
            categories: iterable of category names, handled like ``genres``.
            top_n: if not None, only the first ``top_n`` rows are returned.

        Returns:
            DataFrame with columns ['app_id', 'content_probability'], where
            the score is ``1 / (1 + distance)`` for each neighbour.
        """
        genre_flags = self._multi_hot(self.genre_encoder.classes_, genres)
        category_flags = self._multi_hot(self.category_encoder.classes_, categories)

        # LabelEncoder.transform expects a 1D array of labels, not a column vector.
        encoded_price = self.price_range_encoder.transform(np.asarray(price_range).reshape(-1))
        scaled_features = self.scaler.transform(
            np.array([[year_release, average_playtime, game_score, dlc_count]])
        )[0]

        user_vector = list(scaled_features) + list(encoded_price) + genre_flags + category_flags
        user_df = pd.DataFrame([user_vector])

        distances, indices = self.model.kneighbors(user_df)
        distances = distances.flatten()
        indices = indices.flatten()

        # Map distance in [0, inf) to a score in (0, 1]; closer means higher.
        similarity = 1 / (1 + distances)
        # NOTE(review): assumes the row positions returned by kneighbors are
        # the integer codes of app_id_encoder - confirm against training code.
        app_ids = self.app_id_encoder.inverse_transform(indices)

        prediction = pd.DataFrame({
            'app_id': app_ids,
            'content_probability': similarity,
        })

        if top_n is not None:
            prediction = prediction.head(top_n)
        return prediction


class TextBasedRecommendation:
    """Text recommender: TF-IDF features fed to an XGBoost classifier over app ids."""

    def __init__(self, classifier, vectorizer, app_id_encoder, history):
        """Store the fitted classifier, vectorizer and label encoder."""
        self.classifier: XGBClassifier = classifier
        self.vectorizer: TfidfVectorizer = vectorizer
        self.app_id_encoder: LabelEncoder = app_id_encoder
        # Stored as given; not read by any method of this class.
        self.history = history

    def save(self, path_prefix: str):
        """Save the classifier to ``<prefix>_xgb.json`` and the rest to ``<prefix>_preprocessor.joblib``."""
        self.classifier.save_model(f"{path_prefix}_xgb.json")
        # The classifier is detached so it is not pickled a second time by
        # joblib; ``finally`` guarantees it is re-attached even if dump fails.
        classifier_backup = self.classifier
        self.classifier = None
        try:
            joblib.dump(self, f"{path_prefix}_preprocessor.joblib")
        finally:
            self.classifier = classifier_backup

    @staticmethod
    def load(path_prefix: str):
        """Load an instance previously written by ``save`` with the same prefix.

        NOTE: joblib.load unpickles arbitrary objects; only load files that
        come from a trusted source.
        """
        obj = joblib.load(f"{path_prefix}_preprocessor.joblib")
        xgb = XGBClassifier()
        xgb.load_model(f"{path_prefix}_xgb.json")
        obj.classifier = xgb
        return obj

    @staticmethod
    def _wordnet_pos(postag: str) -> str:
        """Map a POS tag to a WordNet part of speech by its first letter ('n' by default)."""
        if postag.startswith('V'):
            return 'v'
        if postag.startswith('R'):
            return 'r'
        if postag.startswith('J'):
            return 'a'
        return 'n'

    def preprocess(self, text: str) -> str:
        """Lower-case, tokenize, filter and lemmatize ``text``.

        Tokens that are not purely alphabetic or that are English stopwords
        are dropped. Non-string input (e.g. a missing value) yields ''.

        Returns:
            The lemmatized tokens joined by single spaces.
        """
        # Guard first: calling .lower() on a non-string would raise.
        if not isinstance(text, str):
            return ''

        # A set gives O(1) membership tests for every token.
        stopword_set = set(stopwords.words('english'))
        lemmatizer = WordNetLemmatizer()

        # \s+ also matches newlines, so all whitespace runs become one space.
        normalized = re.sub(r'\s+', ' ', text.lower()).strip()

        # isalpha() already excludes punctuation-only tokens.
        tokens = [
            word for word in word_tokenize(normalized)
            if word.isalpha() and word not in stopword_set
        ]
        lemmas = [
            lemmatizer.lemmatize(word, self._wordnet_pos(tag))
            for word, tag in pos_tag(tokens)
        ]
        return " ".join(lemmas)

    def get_accuracy(self, X_test, y_test):
        """Print a classification report for ``X_test`` / ``y_test``.

        ``X_test`` is passed to the vectorizer as-is (``preprocess`` is not
        applied here); ``y_test`` is encoded with ``app_id_encoder``.
        """
        y_pred = self.classifier.predict(self.vectorizer.transform(X_test))
        y_test = self.app_id_encoder.transform(y_test)
        print(classification_report(y_test, y_pred))

    def predict(self, text, top_n: Optional[int] = None) -> pd.DataFrame:
        """Return app ids ranked by predicted probability for ``text``.

        Args:
            text: raw description; it is run through ``preprocess`` first.
            top_n: if not None, only the ``top_n`` most probable classes are returned.

        Returns:
            DataFrame with columns ['app_id', 'text_probability'], sorted by
            probability descending.
        """
        cleaned_text = self.preprocess(text)
        vectorized_text = self.vectorizer.transform([cleaned_text])
        proba = self.classifier.predict_proba(vectorized_text)[0]

        class_indices = np.argsort(proba)[::-1]
        if top_n is not None:
            class_indices = class_indices[:top_n]

        return pd.DataFrame({
            'app_id': self.app_id_encoder.inverse_transform(class_indices),
            'text_probability': proba[class_indices],
        })


class GameRecommendationEnsemble:
    """Weighted combination of the content, collaborative and text recommenders."""

    def __init__(self, game_content_recommeder, collaborative_recommender, text_based_recommender):
        """Store the three component recommenders.

        The spelling ``game_content_recommeder`` is kept for both the
        parameter and the attribute so existing callers keep working.
        """
        self.game_content_recommeder: GameContentRecommender = game_content_recommeder
        self.collaborative_recommender: CollaborativeRecommender = collaborative_recommender
        self.text_based_recommender: TextBasedRecommendation = text_based_recommender

    def save(self, dir_path: str):
        """Save each component recommender into ``dir_path`` (created if missing)."""
        os.makedirs(dir_path, exist_ok=True)
        self.game_content_recommeder.save(os.path.join(dir_path, "game_content_recommender.joblib"))
        self.collaborative_recommender.save(os.path.join(dir_path, "collaborative_recommender.joblib"))
        self.text_based_recommender.save(os.path.join(dir_path, "text_based_recommender"))

    @staticmethod
    def load(dir_path: str):
        """Load an ensemble from a directory previously written by ``save``."""
        game_content_recommender = GameContentRecommender.load(
            os.path.join(dir_path, "game_content_recommender.joblib"))
        collaborative_recommender = CollaborativeRecommender.load(
            os.path.join(dir_path, "collaborative_recommender.joblib"))
        text_based_recommender = TextBasedRecommendation.load(
            os.path.join(dir_path, "text_based_recommender"))
        return GameRecommendationEnsemble(
            game_content_recommender,
            collaborative_recommender,
            text_based_recommender,
        )

    def scale_proba(self, series):
        """Min-max scale ``series`` to [0, 1], keeping its index.

        A series with at most one row, or whose values are all equal, has no
        range to scale over; every row then gets 1.0 so that such items are
        not scored like items the recommender did not return at all.
        """
        if len(series) <= 1 or series.min() == series.max():
            return pd.Series([1.0] * len(series), index=series.index)
        scaler = MinMaxScaler()
        scaled = scaler.fit_transform(series.values.reshape(-1, 1)).flatten()
        return pd.Series(scaled, index=series.index)

    @staticmethod
    def _aggregate_scores(df, w_text, w_collab, w_content):
        """Add a weighted 'final_score' column to ``df`` and return it sorted descending.

        Weights are normalised to sum to 1. Score columns missing from ``df``
        contribute 0.

        Raises:
            ValueError: if the weights sum to zero.
        """
        total_weight = w_text + w_collab + w_content
        if total_weight == 0:
            raise ValueError("All weights are zero. At least one weight must be positive.")

        df['final_score'] = (
            df.get('text_probability', 0) * (w_text / total_weight)
            + df.get('collaborative_similarity', 0) * (w_collab / total_weight)
            + df.get('content_probability', 0) * (w_content / total_weight)
        )
        return df.sort_values(by='final_score', ascending=False).reset_index(drop=True)

    def predict(self, description=None, app_ids=None,
                price_range=None, year_release=None, average_playtime=None,
                game_score=None, dlc_count=None, genres=None, categories=None,
                top_n=None, weight_text=1.0, weight_collab=1.0, weight_content=1.0):
        """Combine the scores of every recommender for which inputs were supplied.

        Args:
            description: free text for the text recommender; skipped if None.
            app_ids: id or ids for the collaborative recommender; skipped if None.
            price_range, year_release, average_playtime, game_score,
                dlc_count, genres, categories: inputs of the content
                recommender; it is skipped unless all of them are given.
            top_n: if not None, only the first ``top_n`` rows are returned.
            weight_text, weight_collab, weight_content: relative weights of
                the three scores; the weight of a skipped recommender is
                forced to 0 and the rest are normalised.

        Returns:
            DataFrame with 'app_id' (as str), one min-max scaled score column
            per recommender used, and 'final_score', sorted by 'final_score'
            descending; or None if no recommender could be used.

        Raises:
            ValueError: if the weights of the recommenders used sum to zero.
        """
        merge_dfs = []

        if description is not None:
            text_proba = self.text_based_recommender.predict(description)
            text_proba['app_id'] = text_proba['app_id'].astype(str)
            text_proba['text_probability'] = self.scale_proba(text_proba['text_probability'])
            merge_dfs.append(text_proba)
        else:
            weight_text = 0

        if app_ids is not None:
            similar_app = self.collaborative_recommender.get_similarities(app_ids)
            similar_app['app_id'] = similar_app['app_id'].astype(str)
            similar_app['collaborative_similarity'] = self.scale_proba(
                similar_app['collaborative_similarity'])
            merge_dfs.append(similar_app)
        else:
            weight_collab = 0

        content_inputs = (price_range, year_release, average_playtime,
                          game_score, dlc_count, genres, categories)
        # Identity test per value: ``None in (...)`` would compare with ``==``
        # and fail on array-like genres/categories.
        if any(value is None for value in content_inputs):
            weight_content = 0
        else:
            similar_content = self.game_content_recommeder.predict(*content_inputs)
            similar_content['app_id'] = similar_content['app_id'].astype(str)
            similar_content['content_probability'] = self.scale_proba(
                similar_content['content_probability'])
            merge_dfs.append(similar_content)

        if not merge_dfs:
            return None

        # Outer join keeps items proposed by only some recommenders; their
        # missing scores count as 0.
        merged = reduce(
            lambda left, right: pd.merge(left, right, on='app_id', how='outer'),
            merge_dfs,
        )
        merged = merged.fillna(0)

        final_df = self._aggregate_scores(merged, weight_text, weight_collab, weight_content)

        if top_n is not None:
            return final_df.head(top_n)
        return final_df