|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from sklearn.neighbors import KNeighborsClassifier
|
|
|
from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler
|
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
|
import joblib
|
|
|
from sklearn.decomposition import TruncatedSVD
|
|
|
from sklearn.metrics import classification_report
|
|
|
from xgboost import XGBClassifier
|
|
|
import nltk
|
|
|
from nltk.tokenize import word_tokenize
|
|
|
from nltk.corpus import stopwords
|
|
|
from nltk.stem import WordNetLemmatizer
|
|
|
from nltk.tag import pos_tag
|
|
|
import string
|
|
|
import re
|
|
|
import os
|
|
|
# Fetch every NLTK resource used by TextBasedRecommendation.preprocess
# (tokenizer models, POS taggers, WordNet and the stop-word list).
for _nltk_resource in (
    'punkt',
    'averaged_perceptron_tagger_eng',
    'wordnet',
    'stopwords',
    'averaged_perceptron_tagger',
):
    nltk.download(_nltk_resource)
|
|
|
|
|
|
class CollaborativeRecommender:
    """Item-item collaborative-filtering recommender backed by a truncated-SVD
    latent-factor matrix.

    Similarity between items is the cosine similarity of their rows in the
    latent matrix; multi-item queries use the centroid of the query vectors.
    """

    def __init__(self, svd_matrix, item_to_index, index_to_item):
        """
        svd_matrix: 2D numpy array (items x latent features)
        item_to_index: dict mapping app_id to row index in svd_matrix
        index_to_item: dict mapping row index to app_id
        """
        # This is the factorized item matrix (np.ndarray), not the fitted
        # TruncatedSVD transformer — the original annotation was wrong and
        # contradicted the row-indexing done in _get_item_vector.
        self.svd_matrix: np.ndarray = svd_matrix
        self.item_to_index = item_to_index
        self.index_to_item = index_to_item

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file."""
        return joblib.load(path)

    def _get_item_vector(self, app_id):
        """Return the latent vector for app_id; raise ValueError if unknown."""
        idx = self.item_to_index.get(app_id)
        if idx is None:
            raise ValueError(f"app_id {app_id} not found in the model.")
        return self.svd_matrix[idx]

    def _cosine_similarity(self, vec, matrix):
        """Cosine similarity of `vec` against every row of `matrix`.

        The 1e-10 epsilon guards against division by zero for all-zero rows.
        """
        vec_norm = np.linalg.norm(vec)
        matrix_norms = np.linalg.norm(matrix, axis=1)
        similarity = (matrix @ vec) / (matrix_norms * vec_norm + 1e-10)
        return similarity

    def get_similarities(self, app_ids, top_n=None):
        """
        Input: app_ids - single app_id or list of app_ids
        Output: DataFrame with columns ['app_id', 'collaborative_similarity']
        sorted by similarity descending (the original docstring said
        'similarity', which did not match the actual column name).

        The query ids themselves are excluded from the result. Unknown ids
        are skipped with a printed warning; raises ValueError if none of the
        given ids exist in the model.
        """
        if isinstance(app_ids, (str, int)):
            app_ids = [app_ids]
        elif not isinstance(app_ids, (list, tuple, np.ndarray)):
            raise TypeError("app_ids must be a string/int or a list of such")

        valid_vectors = []
        missing_ids = []
        for app_id in app_ids:
            try:
                valid_vectors.append(self._get_item_vector(app_id))
            except ValueError:
                missing_ids.append(app_id)

        if len(valid_vectors) == 0:
            raise ValueError("None of the input app_ids were found in the model.")

        # Represent a multi-item query by the centroid of its latent vectors.
        aggregated_vec = np.mean(valid_vectors, axis=0)

        similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix)

        result_df = pd.DataFrame({
            'app_id': [self.index_to_item[i] for i in range(len(similarities))],
            'collaborative_similarity': similarities
        })

        # Never recommend the items the caller asked about.
        result_df = result_df[~result_df['app_id'].isin(app_ids)]

        result_df = result_df.sort_values('collaborative_similarity', ascending=False).reset_index(drop=True)

        if missing_ids:
            print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}")
        if top_n:
            return result_df.head(top_n)
        else:
            return result_df
|
|
|
|
|
|
class GameContentRecommender:
    """Content-based recommender: encodes a game's metadata (numeric features,
    price range, genres, categories) into the training feature layout and
    returns the nearest neighbours from a fitted KNN model."""

    def __init__(self, model, genre_encoder, category_encoder, price_range_encoder, scaler, app_id_encoder):
        # Fitted estimators/transformers produced at training time.
        self.model: KNeighborsClassifier = model
        self.genre_encoder: MultiLabelBinarizer = genre_encoder
        self.category_encoder: MultiLabelBinarizer = category_encoder
        self.price_range_encoder: LabelEncoder = price_range_encoder
        self.scaler: MinMaxScaler = scaler
        self.app_id_encoder: LabelEncoder = app_id_encoder

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file."""
        return joblib.load(path)

    def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None):
        """Return a DataFrame ['app_id', 'content_probability'] of the games
        most similar to the described one, ordered by the KNN neighbour order.

        genres/categories: iterables of label strings; labels unseen at
        training time (or the 'Unknown' placeholder) are silently ignored.
        top_n: optionally truncate to the first top_n neighbours.
        """
        # One-hot dicts over the full training vocabulary; dict insertion
        # order preserves the column order the model was trained with.
        genre_dict = {g: 0 for g in self.genre_encoder.classes_}
        categories_dict = {c: 0 for c in self.category_encoder.classes_}

        for genre in genres:
            if genre != 'Unknown' and genre in genre_dict:
                genre_dict[genre] = 1

        for category in categories:
            if category != 'Unknown' and category in categories_dict:
                categories_dict[category] = 1

        # LabelEncoder.transform expects a 1-D array; ravel() accepts either a
        # scalar or a sequence. (The original passed a 2-D (n, 1) array via
        # reshape(-1, 1), which LabelEncoder only tolerated with a
        # DataConversionWarning and rejects in newer sklearn.)
        price_range = self.price_range_encoder.transform(np.asarray(price_range).ravel())
        scaled_features = self.scaler.transform(np.array([[year_release, average_playtime, game_score, dlc_count]]))[0]

        user_vector = list(scaled_features) + list(price_range) + list(genre_dict.values()) + list(categories_dict.values())

        user_df = pd.DataFrame([user_vector])

        distances, indices = self.model.kneighbors(user_df)
        distances = distances.flatten()
        indices = indices.flatten()

        # Map distance to a (0, 1] similarity score: 0 distance -> 1.0.
        similarity = 1 / (1 + distances)

        app_ids = self.app_id_encoder.inverse_transform(indices)

        prediction = pd.DataFrame({
            'app_id': app_ids,
            'content_probability': similarity
        })

        if top_n:
            prediction = prediction.head(top_n)

        return prediction
|
|
|
|
|
|
|
|
|
|
|
|
class TextBasedRecommendation:
    """Recommends games from free-text descriptions: text is normalised with
    NLTK (tokenize, stop-word removal, POS-aware lemmatization), vectorised
    with TF-IDF, then classified by an XGBoost model whose classes are
    label-encoded app_ids."""

    def __init__(self, classifier, vectorizer, app_id_encoder, history):
        self.classifier: XGBClassifier = classifier
        self.vectorizer: TfidfVectorizer = vectorizer
        self.app_id_encoder: LabelEncoder = app_id_encoder
        # Training history kept for inspection; not used at predict time.
        self.history = history

    def save(self, path_prefix: str):
        """Persist as two files: the XGBoost booster as native JSON and the
        rest of the object (vectorizer, encoder, history) via joblib.

        The booster is saved separately because XGBoost's native format is
        more portable across library versions than pickling the classifier.
        """
        self.classifier.save_model(f"{path_prefix}_xgb.json")

        # Temporarily detach the classifier so joblib does not pickle it too,
        # then restore it so this instance stays usable after save().
        classifier_backup = self.classifier
        self.classifier = None

        joblib.dump(self, f"{path_prefix}_preprocessor.joblib")

        self.classifier = classifier_backup

    @staticmethod
    def load(path_prefix: str):
        """Rebuild an instance from the two files written by `save`."""
        obj = joblib.load(f"{path_prefix}_preprocessor.joblib")
        xgb = XGBClassifier()
        xgb.load_model(f"{path_prefix}_xgb.json")
        obj.classifier = xgb

        return obj

    def preprocess(self, text: str):
        """Lower-case, collapse whitespace, tokenize, drop stop words and
        non-alphabetic tokens, POS-aware lemmatize, and return the surviving
        tokens joined by single spaces."""
        # A set gives O(1) membership per token; the original used the list
        # returned by stopwords.words(), an O(len(stopwords)) scan per token.
        stopword = set(stopwords.words('english'))
        lemmatizer = WordNetLemmatizer()

        def convert_postag(postag: str):
            # Map Penn Treebank tags to the WordNet POS codes the lemmatizer
            # expects; anything unrecognised defaults to noun.
            if postag.startswith('V'):
                return 'v'
            elif postag.startswith('R'):
                return 'r'
            elif postag.startswith('J'):
                return 'a'
            return 'n'

        def clean_space(text: str):
            # Collapse newlines and runs of whitespace into single spaces.
            if not isinstance(text, str):
                return ''
            cleaned = re.sub(r'\s+', ' ', text.replace('\n', ' ')).strip()
            return cleaned

        def tokenize(text: str):
            text = text.lower()
            text = clean_space(text)
            token = word_tokenize(text)
            # isalpha() already excludes punctuation/digits; the explicit
            # punctuation check is kept from the original for safety.
            token = [word for word in token if word not in
                     string.punctuation and word not in stopword and word.isalpha()]
            return token

        def lemmatizing(token):
            postag = pos_tag(token)
            lemmatized = [lemmatizer.lemmatize(word, convert_postag(tag)) for word, tag in postag]
            return lemmatized

        token = tokenize(text)
        token = lemmatizing(token)
        return " ".join(token)

    def get_accuracy(self, X_test, y_test):
        """Print a classification report for the given raw-text test split.

        X_test: iterable of already-preprocessed text documents.
        y_test: iterable of raw app_ids (encoded here before comparison).
        """
        y_pred = self.classifier.predict(self.vectorizer.transform(X_test))
        y_test = self.app_id_encoder.transform(y_test)
        print(classification_report(y_test, y_pred))

    def predict(self, text, top_n=None):
        """Return a DataFrame ['app_id', 'text_probability'] ranked by the
        classifier's predicted probability for the given description."""
        cleaned_text = self.preprocess(text)
        vectorized_text = self.vectorizer.transform([cleaned_text])
        proba = self.classifier.predict_proba(vectorized_text)[0]
        # Class indices sorted by descending probability.
        class_indices = np.argsort(proba)[::-1]
        if top_n is not None:
            class_indices = class_indices[:top_n]
        class_labels = self.app_id_encoder.inverse_transform(class_indices)
        class_probs = proba[class_indices]
        return pd.DataFrame({
            'app_id': class_labels,
            'text_probability': class_probs
        })
|
|
|
|
|
|
class GameRecommendationEnsemble:
    """Blends three recommenders (content-based KNN, collaborative SVD,
    text-based XGBoost) into a single weighted score per app_id."""

    def __init__(self, game_content_recommeder, collaborative_recommender, text_based_recommender):
        # NOTE(review): 'recommeder' (sic) is kept — renaming the parameter or
        # attribute would break callers passing it as a keyword argument.
        self.game_content_recommeder: GameContentRecommender = game_content_recommeder
        self.collaborative_recommender: CollaborativeRecommender = collaborative_recommender
        self.text_based_recommender: TextBasedRecommendation = text_based_recommender

    def save(self, dir_path: str):
        """Save each sub-recommender into `dir_path` (created if missing)."""
        os.makedirs(dir_path, exist_ok=True)
        self.game_content_recommeder.save(os.path.join(dir_path, "game_content_recommender.joblib"))
        self.collaborative_recommender.save(os.path.join(dir_path, "collaborative_recommender.joblib"))
        # Prefix only: TextBasedRecommendation.save appends its own suffixes.
        self.text_based_recommender.save(os.path.join(dir_path, "text_based_recommender"))

    @staticmethod
    def load(dir_path: str):
        """Load an ensemble previously written by `save`."""
        game_content_recommender = GameContentRecommender.load(os.path.join(dir_path, "game_content_recommender.joblib"))
        collaborative_recommender = CollaborativeRecommender.load(os.path.join(dir_path, "collaborative_recommender.joblib"))
        text_based_recommender = TextBasedRecommendation.load(os.path.join(dir_path, "text_based_recommender"))

        return GameRecommendationEnsemble(
            game_content_recommender,
            collaborative_recommender,
            text_based_recommender
        )

    def scale_proba(self, series):
        """Min-max scale a score series into [0, 1].

        A constant series (including a single element) maps to all 1.0. The
        previous MinMaxScaler-based version returned 1.0 for one element but
        all 0.0 for a constant multi-element series (sklearn's zero-range
        handling), silently zeroing that recommender's contribution — the two
        degenerate cases are now treated consistently.
        """
        if len(series) <= 1:
            return pd.Series([1.0] * len(series), index=series.index)
        lo = series.min()
        hi = series.max()
        if hi == lo:
            return pd.Series([1.0] * len(series), index=series.index)
        # Equivalent to sklearn MinMaxScaler with the default (0, 1) range.
        return (series - lo) / (hi - lo)

    def predict(self, description=None, app_ids=None, price_range=None, year_release=None,
                average_playtime=None, game_score=None, dlc_count=None,
                genres=None, categories=None, top_n=None,
                weight_text=1.0, weight_collab=1.0, weight_content=1.0):
        """Blend whichever recommenders have usable inputs into one ranking.

        A recommender whose inputs are missing is skipped and its weight set
        to zero; the remaining weights are renormalised to sum to 1. Returns
        a DataFrame with 'app_id', the per-source scores and 'final_score',
        sorted descending — or None when no input was usable. Raises
        ValueError if every effective weight ends up zero.
        """
        merge_dfs = []
        if description is not None:
            text_proba = self.text_based_recommender.predict(description)
            text_proba['app_id'] = text_proba['app_id'].astype(str)
            text_proba['text_probability'] = self.scale_proba(text_proba['text_probability'])
            merge_dfs.append(text_proba)
        else:
            weight_text = 0

        if app_ids is not None:
            similar_app = self.collaborative_recommender.get_similarities(app_ids)
            similar_app['app_id'] = similar_app['app_id'].astype(str)
            similar_app['collaborative_similarity'] = self.scale_proba(similar_app['collaborative_similarity'])
            merge_dfs.append(similar_app)
        else:
            weight_collab = 0

        # The content recommender needs every metadata field; skip it (and
        # drop its weight) if any is missing.
        if None in (price_range, year_release, average_playtime, game_score, dlc_count, genres, categories):
            weight_content = 0
        else:
            similar_content = self.game_content_recommeder.predict(price_range, year_release, average_playtime, game_score, dlc_count, genres, categories)
            similar_content['app_id'] = similar_content['app_id'].astype(str)
            similar_content['content_probability'] = self.scale_proba(similar_content['content_probability'])
            merge_dfs.append(similar_content)

        if not merge_dfs:
            return None

        # Outer-join on app_id so an app scored by any source is kept; scores
        # missing from a source default to 0 below.
        from functools import reduce
        merged = reduce(lambda left, right: pd.merge(left, right, on='app_id', how='outer'), merge_dfs)
        merged = merged.fillna(0)

        def compute_aggregated_score(df, w_text, w_collab, w_content):
            # Normalise the active weights so final_score stays in [0, 1].
            total_weight = w_text + w_collab + w_content
            if total_weight == 0:
                raise ValueError("All weights are zero. At least one weight must be positive.")

            w_text /= total_weight
            w_collab /= total_weight
            w_content /= total_weight

            # df.get() tolerates a source column that was never merged in.
            df['final_score'] = (
                df.get('text_probability', 0) * w_text +
                df.get('collaborative_similarity', 0) * w_collab +
                df.get('content_probability', 0) * w_content
            )

            return df.sort_values(by='final_score', ascending=False).reset_index(drop=True)

        final_df = compute_aggregated_score(merged, weight_text, weight_collab, weight_content)
        if top_n:
            return final_df.head(top_n)
        else:
            return final_df