Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from sklearn.neighbors import KNeighborsClassifier | |
| from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| import joblib | |
| from sklearn.decomposition import TruncatedSVD | |
| from sklearn.metrics import classification_report | |
| from xgboost import XGBClassifier | |
| import nltk | |
| from nltk.tokenize import word_tokenize | |
| from nltk.corpus import stopwords | |
| from nltk.stem import WordNetLemmatizer | |
| from nltk.tag import pos_tag | |
| import string | |
| import re | |
| import os | |
# NLTK resources needed by TextBasedRecommendation.preprocess (tokenizer,
# POS tagger, lemmatizer and stop-word list).
# Both the legacy and the newer resource names are requested so the module
# works across NLTK releases: recent versions load the tokenizer from
# "punkt_tab" and the tagger from "averaged_perceptron_tagger_eng", older ones
# from "punkt" and "averaged_perceptron_tagger".
for _nltk_resource in (
    'punkt',
    'punkt_tab',
    'averaged_perceptron_tagger_eng',
    'wordnet',
    'stopwords',
    'averaged_perceptron_tagger',
):
    nltk.download(_nltk_resource)
class CollaborativeRecommender:
    """Item-to-item recommender based on cosine similarity of latent vectors.

    Each row of ``svd_matrix`` is the latent representation of one app; the
    two dictionaries translate between app ids and row positions.
    """

    def __init__(self, svd_matrix, item_to_index, index_to_item):
        """
        svd_matrix: 2D numpy array (items x latent features)
        item_to_index: dict mapping app_id to row index in svd_matrix
        index_to_item: dict mapping row index to app_id
        """
        # Annotated as an array: the object is indexed by row and used in
        # matrix products below, so it holds the data, not an estimator.
        self.svd_matrix: np.ndarray = svd_matrix
        self.item_to_index = item_to_index
        self.index_to_item = index_to_item

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file.

        NOTE: joblib files are pickle-based, so loading one can execute
        arbitrary code. Only load files that come from a trusted source.
        """
        return joblib.load(path)

    def _get_item_vector(self, app_id):
        """Return the latent vector of ``app_id``.

        Raises:
            ValueError: if ``app_id`` is not a key of ``item_to_index``.
        """
        idx = self.item_to_index.get(app_id)
        if idx is None:
            raise ValueError(f"app_id {app_id} not found in the model.")
        return self.svd_matrix[idx]

    def _cosine_similarity(self, vec, matrix):
        """Return the cosine similarity between ``vec`` and every row of ``matrix``."""
        vec_norm = np.linalg.norm(vec)
        matrix_norms = np.linalg.norm(matrix, axis=1)
        # The small epsilon keeps the division defined for all-zero vectors.
        return (matrix @ vec) / (matrix_norms * vec_norm + 1e-10)

    def get_similarities(self, app_ids, top_n=None):
        """Rank all known apps by similarity to the given app(s).

        Input: app_ids - single app_id (str, int or numpy integer) or a
            list/tuple/ndarray of app_ids. Several ids are combined by
            averaging their latent vectors.
            top_n - optional number of rows to keep; a falsy value
            (None or 0) returns every row.
        Output: DataFrame with columns ['app_id', 'collaborative_similarity']
            sorted by similarity descending, with the input app_ids removed.
        Raises:
            TypeError: if app_ids is neither a scalar id nor a list-like.
            ValueError: if none of the app_ids is known to the model.
        """
        # np.integer is accepted as well: ids read from a DataFrame or an
        # array are numpy scalars, which are not instances of int.
        if isinstance(app_ids, (str, int, np.integer)):
            app_ids = [app_ids]
        elif not isinstance(app_ids, (list, tuple, np.ndarray)):
            raise TypeError("app_ids must be a string/int or a list of such")

        valid_vectors = []
        missing_ids = []
        for app_id in app_ids:
            idx = self.item_to_index.get(app_id)
            if idx is None:
                missing_ids.append(app_id)
            else:
                valid_vectors.append(self.svd_matrix[idx])

        if not valid_vectors:
            raise ValueError("None of the input app_ids were found in the model.")

        # Aggregate vectors by averaging if multiple inputs
        aggregated_vec = np.mean(valid_vectors, axis=0)
        similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix)

        result_df = pd.DataFrame({
            'app_id': [self.index_to_item[i] for i in range(len(similarities))],
            'collaborative_similarity': similarities,
        })
        # Exclude the input app_ids themselves from results
        result_df = result_df[~result_df['app_id'].isin(app_ids)]
        result_df = result_df.sort_values(
            'collaborative_similarity', ascending=False
        ).reset_index(drop=True)

        if missing_ids:
            print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}")

        return result_df.head(top_n) if top_n else result_df
class GameContentRecommender:
    """Content-based recommender: finds the nearest neighbours of a feature
    vector built from price range, numeric attributes, genres and categories.
    """

    def __init__(self, model, genre_encoder, category_encoder, price_range_encoder, scaler, app_id_encoder):
        """
        model: fitted neighbours model queried through ``kneighbors``
        genre_encoder / category_encoder: encoders whose ``classes_`` define
            the order of the genre / category indicator columns
        price_range_encoder: encoder turning a price-range label into a number
        scaler: scaler applied to [year_release, average_playtime, game_score, dlc_count]
        app_id_encoder: encoder used to map neighbour indices back to app ids
        """
        self.model: KNeighborsClassifier = model
        self.genre_encoder: MultiLabelBinarizer = genre_encoder
        self.category_encoder: MultiLabelBinarizer = category_encoder
        self.price_range_encoder: LabelEncoder = price_range_encoder
        self.scaler: MinMaxScaler = scaler
        self.app_id_encoder: LabelEncoder = app_id_encoder

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file.

        NOTE: joblib files are pickle-based, so loading one can execute
        arbitrary code. Only load files that come from a trusted source.
        """
        return joblib.load(path)

    def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None):
        """Return the apps closest to the described game.

        The feature vector is laid out as: scaled numeric features, encoded
        price range, one 0/1 flag per genre class, one 0/1 flag per category
        class. 'Unknown' and labels not present in the encoder are ignored.

        Returns:
            DataFrame with columns ['app_id', 'content_probability'], in the
            order returned by ``kneighbors``; ``content_probability`` is
            ``1 / (1 + distance)``. A truthy ``top_n`` keeps only the first
            ``top_n`` rows.
        """
        wanted_genres = set(genres) - {'Unknown'}
        wanted_categories = set(categories) - {'Unknown'}
        genre_flags = [int(g in wanted_genres) for g in self.genre_encoder.classes_]
        category_flags = [int(c in wanted_categories) for c in self.category_encoder.classes_]

        # LabelEncoder works on 1-D label arrays; a column vector would only
        # be flattened again (with a DataConversionWarning).
        encoded_price = self.price_range_encoder.transform(np.atleast_1d(price_range).ravel())
        scaled_features = self.scaler.transform(
            np.array([[year_release, average_playtime, game_score, dlc_count]])
        )[0]

        user_vector = list(scaled_features) + list(encoded_price) + genre_flags + category_flags
        user_df = pd.DataFrame([user_vector])

        distances, indices = self.model.kneighbors(user_df)
        distances = distances.flatten()
        indices = indices.flatten()

        # Map distance in [0, inf) to a score in (0, 1]; identical items score 1.
        similarity = 1 / (1 + distances)
        # NOTE(review): assumes neighbour row indices coincide with the
        # app_id_encoder's encoded labels - confirm against the training code.
        app_ids = self.app_id_encoder.inverse_transform(indices)

        prediction = pd.DataFrame({
            'app_id': app_ids,
            'content_probability': similarity,
        })
        if top_n:
            prediction = prediction.head(top_n)
        return prediction
class TextBasedRecommendation:
    """Text recommender: a TF-IDF vectorizer feeding an XGBoost classifier
    whose classes are encoded app ids.
    """

    def __init__(self, classifier, vectorizer, app_id_encoder, history):
        """
        classifier: classifier exposing ``predict`` / ``predict_proba``
        vectorizer: fitted text vectorizer applied before the classifier
        app_id_encoder: encoder mapping class indices back to app ids
        history: stored as-is; not read by any method of this class
        """
        self.classifier: XGBClassifier = classifier
        self.vectorizer: TfidfVectorizer = vectorizer
        self.app_id_encoder: LabelEncoder = app_id_encoder
        self.history = history

    def save(self, path_prefix: str):
        """Persist the model as two files sharing ``path_prefix``.

        The classifier is written with its own ``save_model`` to
        ``<prefix>_xgb.json``; everything else is dumped with joblib to
        ``<prefix>_preprocessor.joblib`` with the classifier detached.
        """
        self.classifier.save_model(f"{path_prefix}_xgb.json")
        classifier_backup = self.classifier
        self.classifier = None
        try:
            joblib.dump(self, f"{path_prefix}_preprocessor.joblib")
        finally:
            # Always re-attach the classifier, even when the dump fails, so
            # the in-memory object stays usable.
            self.classifier = classifier_backup

    @staticmethod
    def load(path_prefix: str):
        """Rebuild a model from the two files written by ``save``.

        NOTE: joblib files are pickle-based, so loading one can execute
        arbitrary code. Only load files that come from a trusted source.
        """
        obj = joblib.load(f"{path_prefix}_preprocessor.joblib")
        xgb = XGBClassifier()
        xgb.load_model(f"{path_prefix}_xgb.json")
        obj.classifier = xgb
        return obj

    def preprocess(self, text: str):
        """Normalise ``text`` into a space-joined string of lemmas.

        Steps: lowercase, collapse whitespace, tokenize, keep alphabetic
        non-stopword tokens, POS-tag, lemmatize. Non-string input yields ''.
        """
        # Guard first: calling .lower() on a non-string would raise before
        # any later check could run.
        if not isinstance(text, str):
            return ''

        # A set gives O(1) membership tests for every token.
        stopword = set(stopwords.words('english'))
        lemmatizer = WordNetLemmatizer()
        # First letter of the POS tag -> WordNet POS; anything else is a noun.
        wordnet_pos = {'V': 'v', 'R': 'r', 'J': 'a'}

        cleaned = re.sub(r'\s+', ' ', text.lower().replace('\n', ' ')).strip()
        # isalpha() already rejects punctuation tokens, so no separate
        # punctuation check is needed.
        tokens = [
            word for word in word_tokenize(cleaned)
            if word.isalpha() and word not in stopword
        ]
        lemmas = [
            lemmatizer.lemmatize(word, wordnet_pos.get(tag[:1], 'n'))
            for word, tag in pos_tag(tokens)
        ]
        return " ".join(lemmas)

    def get_accuracy(self, X_test, y_test):
        """Print a classification report for ``X_test`` / ``y_test``.

        ``X_test`` is vectorized as given (``preprocess`` is not applied here);
        ``y_test`` holds raw app ids and is encoded before comparison.
        """
        y_pred = self.classifier.predict(self.vectorizer.transform(X_test))
        y_test = self.app_id_encoder.transform(y_test)
        print(classification_report(y_test, y_pred))

    def predict(self, text, top_n=None):
        """Return app ids ranked by predicted probability for ``text``.

        Returns:
            DataFrame with columns ['app_id', 'text_probability'] sorted by
            probability descending; limited to ``top_n`` rows when ``top_n``
            is not None.
        """
        cleaned_text = self.preprocess(text)
        vectorized_text = self.vectorizer.transform([cleaned_text])
        proba = self.classifier.predict_proba(vectorized_text)[0]

        class_indices = np.argsort(proba)[::-1]
        if top_n is not None:
            class_indices = class_indices[:top_n]

        return pd.DataFrame({
            'app_id': self.app_id_encoder.inverse_transform(class_indices),
            'text_probability': proba[class_indices],
        })
class GameRecommendationEnsemble:
    """Combines the content, collaborative and text recommenders into one
    weighted ranking.
    """

    def __init__(self, game_content_recommeder, collaborative_recommender, text_based_recommender):
        # The parameter/attribute spelling "recommeder" is kept on purpose:
        # it is part of the public interface and of already saved objects.
        self.game_content_recommeder: GameContentRecommender = game_content_recommeder
        self.collaborative_recommender: CollaborativeRecommender = collaborative_recommender
        self.text_based_recommender: TextBasedRecommendation = text_based_recommender

    def save(self, dir_path: str):
        """Save the three sub-models into ``dir_path`` (created if missing)."""
        os.makedirs(dir_path, exist_ok=True)
        self.game_content_recommeder.save(os.path.join(dir_path, "game_content_recommender.joblib"))
        self.collaborative_recommender.save(os.path.join(dir_path, "collaborative_recommender.joblib"))
        self.text_based_recommender.save(os.path.join(dir_path, "text_based_recommender"))

    @staticmethod
    def load(dir_path: str):
        """Load an ensemble from the files written by ``save``."""
        game_content_recommender = GameContentRecommender.load(os.path.join(dir_path, "game_content_recommender.joblib"))
        collaborative_recommender = CollaborativeRecommender.load(os.path.join(dir_path, "collaborative_recommender.joblib"))
        text_based_recommender = TextBasedRecommendation.load(os.path.join(dir_path, "text_based_recommender"))
        return GameRecommendationEnsemble(
            game_content_recommender,
            collaborative_recommender,
            text_based_recommender,
        )

    def scale_proba(self, series):
        """Min-max scale ``series`` and return it with the same index.

        A series with at most one element is mapped to 1.0, since there is
        no range to scale against.
        """
        if len(series) <= 1:
            return pd.Series([1.0] * len(series), index=series.index)
        scaler = MinMaxScaler()
        scaled = scaler.fit_transform(series.values.reshape(-1, 1)).flatten()
        return pd.Series(scaled, index=series.index)

    def predict(self, description=None, app_ids=None, price_range=None, year_release=None,
                average_playtime=None, game_score=None, dlc_count=None,
                genres=None, categories=None, top_n=None,
                weight_text=1.0, weight_collab=1.0, weight_content=1.0):
        """Rank apps by a weighted mix of the available recommenders.

        A recommender takes part only when its inputs are supplied:
        ``description`` for the text model, ``app_ids`` for the collaborative
        model, and all seven content fields for the content model. The weight
        of a recommender that does not take part is forced to 0; the
        remaining weights are normalised to sum to 1.

        Returns:
            None when no recommender can run; otherwise a DataFrame with
            'app_id' (as str), one min-max scaled score column per
            recommender used, and 'final_score', sorted by 'final_score'
            descending. A truthy ``top_n`` keeps only the first rows.
        Raises:
            ValueError: if the weights of the recommenders in use sum to 0.
        """
        merge_dfs = []

        if description is not None:
            text_proba = self.text_based_recommender.predict(description)
            text_proba['app_id'] = text_proba['app_id'].astype(str)
            text_proba['text_probability'] = self.scale_proba(text_proba['text_probability'])
            merge_dfs.append(text_proba)
        else:
            weight_text = 0

        # Collaborative similarity (only if app_ids is provided)
        if app_ids is not None:
            similar_app = self.collaborative_recommender.get_similarities(app_ids)
            similar_app['app_id'] = similar_app['app_id'].astype(str)
            similar_app['collaborative_similarity'] = self.scale_proba(similar_app['collaborative_similarity'])
            merge_dfs.append(similar_app)
        else:
            weight_collab = 0

        content_inputs = (price_range, year_release, average_playtime,
                          game_score, dlc_count, genres, categories)
        # Identity checks: `None in (...)` compares with ==, which raises for
        # numpy-array inputs because the comparison is element-wise.
        if any(value is None for value in content_inputs):
            weight_content = 0
        else:
            similar_content = self.game_content_recommeder.predict(*content_inputs)
            similar_content['app_id'] = similar_content['app_id'].astype(str)
            similar_content['content_probability'] = self.scale_proba(similar_content['content_probability'])
            merge_dfs.append(similar_content)

        if not merge_dfs:
            return None

        # Outer-join all score tables; an app missing from one source gets 0 there.
        merged = merge_dfs[0]
        for frame in merge_dfs[1:]:
            merged = pd.merge(merged, frame, on='app_id', how='outer')
        merged = merged.fillna(0)

        total_weight = weight_text + weight_collab + weight_content
        if total_weight == 0:
            raise ValueError("All weights are zero. At least one weight must be positive.")

        # Columns of unused recommenders are absent; .get falls back to 0.
        merged['final_score'] = (
            merged.get('text_probability', 0) * (weight_text / total_weight) +
            merged.get('collaborative_similarity', 0) * (weight_collab / total_weight) +
            merged.get('content_probability', 0) * (weight_content / total_weight)
        )
        final_df = merged.sort_values(by='final_score', ascending=False).reset_index(drop=True)

        return final_df.head(top_n) if top_n else final_df