import subprocess
import sys
# Pin scikit-learn so the joblib-serialized model artifacts load with the same version they were trained with
subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "scikit-learn==1.6.1"])
import gradio as gr
import pandas as pd
import os
from component import *
from GameRecommender import *
import gc
from sklearn.model_selection import train_test_split
from huggingface_hub import snapshot_download
from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler
from sklearn.metrics import classification_report
import numpy as np
import matplotlib.pyplot as plt
DATASETS = {
"converted": "converted.csv",
"Cleaned_games": "Cleaned_games.csv",
"MergedFragmentData_SAMPLE": "MergedFragmentData_SAMPLE.csv",
"Trimmed_Dataset": "Trimmed_Dataset.csv",
"UserPreferenceDF": "UserPreferenceDF.csv",
}
def load_hf_csv_dataset(repo_name, filename):
# Download the dataset repo snapshot locally, only for 'data' folder
local_path = snapshot_download(
repo_id=f"VJyzCELERY/{repo_name}",
repo_type="dataset",
allow_patterns=["data/*"],
)
csv_path = os.path.join(local_path, "data", filename)
print(f"Loading {csv_path} ...")
return pd.read_csv(csv_path, index_col=False)
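# Example usage (not executed; the app loads these datasets via `datasets.load_dataset` below):
# df_example = load_hf_csv_dataset("Cleaned_games", DATASETS["Cleaned_games"])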
DATA_BASE_PATH = 'data'
MODEL_BASE_PATH = snapshot_download(
repo_id="VJyzCELERY/SteamGameRecommender",
repo_type="model",
allow_patterns=["GameRecommender/*"]
)
SEED = 42
RAW_GAMES_DATAPATH = os.path.join(DATA_BASE_PATH,'converted.csv')
GAMES_DATAPATH = os.path.join(DATA_BASE_PATH,'Cleaned_games.csv')
REVIEWS_DATAPATH = os.path.join(DATA_BASE_PATH,'MergedFragmentData_SAMPLE.csv')
TRIMMED_REVIEW_DATAPATH = os.path.join(DATA_BASE_PATH,'Trimmed_Dataset.csv')
USER_PREFERENCE_DATAPATH = os.path.join(DATA_BASE_PATH,'UserPreferenceDF.csv')
MODEL_PATH = os.path.join(MODEL_BASE_PATH,'GameRecommender')
from datasets import load_dataset
RAW_GAMES_DS = load_dataset("VJyzCELERY/converted")
GAMES_DS = load_dataset("VJyzCELERY/Cleaned_games")
REVIEWS_DS = load_dataset("VJyzCELERY/MergedFragmentData_SAMPLE")
TRIMMED_REVIEWS_DS = load_dataset("VJyzCELERY/Trimmed_Dataset")
USER_PREF_DS = load_dataset("VJyzCELERY/UserPreferenceDF")
# load the trained ensemble model
model = GameRecommendationEnsemble.load(MODEL_PATH)
vectorizer=model.text_based_recommender.vectorizer
review_app_id_encoder=model.text_based_recommender.app_id_encoder
genres = model.game_content_recommeder.genre_encoder.classes_.tolist()
genres = [genre for genre in genres if genre != 'Unknown']
categories = model.game_content_recommeder.category_encoder.classes_.tolist()
categories = [cat for cat in categories if cat != 'Unknown']
price_ranges = model.game_content_recommeder.price_range_encoder.classes_.tolist()
selectable_app_ids = list(model.collaborative_recommender.item_to_index.keys())
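# Only games known to the collaborative model can be selected by name in the UI below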
# df_games = pd.read_csv(GAMES_DATAPATH,index_col=False)
# df_games_raw = pd.read_csv(RAW_GAMES_DATAPATH,index_col=False)
# df_review_raw = pd.read_csv(REVIEWS_DATAPATH,index_col=False)
# df_review_trimmed = pd.read_csv(TRIMMED_REVIEW_DATAPATH,index_col=False)
# df_user_pref = pd.read_csv(USER_PREFERENCE_DATAPATH,index_col=False)
df_games = GAMES_DS['train'].to_pandas()
df_games['app_id'] = df_games['app_id'].astype(str)
df_games_raw = RAW_GAMES_DS['train'].to_pandas()
df_games_raw['AppID'] = df_games_raw['AppID'].astype(str)
df_review_raw = REVIEWS_DS['train'].to_pandas()
df_review_raw['steamid'] = df_review_raw['steamid'].astype(str)
df_review_raw['appid'] = df_review_raw['appid'].astype(str)
df_review_trimmed = TRIMMED_REVIEWS_DS['train'].to_pandas()
df_user_pref = USER_PREF_DS['train'].to_pandas()
available_names = df_games[df_games['app_id'].astype(str).isin(selectable_app_ids)]['Name'].tolist()
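# Keep only reviews with at least min_word words, mirroring the filtering step shown in the Preprocessing section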
min_word=20
df_review_trimmed_filtered = df_review_trimmed[df_review_trimmed['cleaned_review'].apply(lambda x: len(str(x).split()) >=min_word)].reset_index(drop=True)
def extract_year(date_str):
if isinstance(date_str, str) and len(date_str) >= 4:
year_str = date_str[-4:]
if year_str.isdigit():
return int(year_str)
return None
def col_to_list(df,col='Genres'):
import ast
df[col]=df[col].apply(
lambda x: ast.literal_eval(x) if isinstance(x, str) else x
)
df[col]=df[col].apply(
lambda genres: [g.strip() for g in genres] if isinstance(genres, list) else ['Unknown']
)
return df
def apply_price_range_labels(df,labels,bins, price_col='Price', range_col='Price_range'):
df[range_col] = pd.cut(df[price_col], bins=bins, labels=labels, right=True)
return df
price_bins = [-0.01, 0, 5, 10, 20, 30, 40, 50, float('inf')]
price_ranges_labels = [
"Free",
"Less than $5",
"$5 - $9.99",
"$10 - $19.99",
"$20 - $29.99",
"$30 - $39.99",
"$40 - $49.99",
"$50+"
]
df_games_temp = df_games.copy()
df_games_temp = col_to_list(df_games_temp,'Genres')
df_games_temp = col_to_list(df_games_temp,'Categories')
df_games_temp = apply_price_range_labels(df_games_temp,price_ranges_labels,price_bins)
df_games_temp['Year_Release'] = df_games_temp['Release date'].apply(extract_year)
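# Game score: share of positive reviews as a percentage, defined as 0 for games with no reviews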
df_games_temp['Game score'] = np.where(
(df_games_temp['Positive'] + df_games_temp['Negative']) == 0,
0,
(df_games_temp['Positive'] / (df_games_temp['Positive'] + df_games_temp['Negative'])) * 100
)
genre_mlb = MultiLabelBinarizer()
genre_mlb = genre_mlb.fit(df_games_temp['Genres'])
categories_mlb = MultiLabelBinarizer()
categories_mlb = categories_mlb.fit(df_games_temp['Categories'])
price_range_le = model.game_content_recommeder.price_range_encoder
scaler = MinMaxScaler()
scaler = scaler.fit(df_games_temp[['Year_Release','Average playtime forever','Game score','DLC count']].values)
numerical_col =['Year_Release','Average playtime forever','Game score','DLC count']
genre_matrix = genre_mlb.transform(df_games_temp['Genres'])
genre_df = pd.DataFrame(genre_matrix, columns=genre_mlb.classes_, index=df_games_temp.index)
categories_matrix = categories_mlb.transform(df_games_temp['Categories'])
categories_df = pd.DataFrame(categories_matrix,columns=categories_mlb.classes_,index=df_games_temp.index)
game_df = pd.concat([df_games_temp[['app_id','Price_range']+numerical_col],genre_df,categories_df],axis=1)
game_df['Price_range'] = price_range_le.transform(game_df['Price_range'])
game_df[numerical_col] = scaler.transform(game_df[numerical_col].values)
del categories_matrix,genre_matrix,categories_df,genre_df,scaler,price_range_le,categories_mlb,genre_mlb
gc.collect()
def recommend_game(description=None, app_name=None, price_range=None, year_release=None,
expected_playtime=None, game_score=None, dlc_count=None,
genres=None, categories=None, top_n=5, weight_text=1.0, weight_collab=1.0, weight_content=1.0):
if app_name:
if isinstance(app_name, (str)):
app_name = [app_name]
app_ids = df_games[df_games['Name'].isin(app_name)]['app_id'].astype(str).tolist()
else:
app_ids = None
prediction = model.predict(description=description,app_ids=app_ids,price_range=price_range,year_release=year_release,average_playtime=expected_playtime,game_score=game_score,
dlc_count=dlc_count,genres=genres,categories=categories,top_n=top_n,weight_text=weight_text,weight_collab=weight_collab,weight_content=weight_content)
app_ids = prediction['app_id'].tolist()
output = df_games.loc[df_games['app_id'].astype(str).isin(app_ids)].reset_index()
return gr.DataFrame(value=output)
# Load external CSS file
with open('style.css', 'r') as f:
custom_css = f.read()
# for nav
def set_active_section(btn_id):
"""
button active function and handle visibility section
"""
# First set all sections to invisible
updates = [gr.update(visible=False) for _ in sections]
# Then set the selected section to visible
if btn_id in sections:
index = list(sections.keys()).index(btn_id)
updates[index] = gr.update(visible=True)
# Also update button active states
button_states = []
for btn in nav_buttons:
state = ("active" if btn.elem_id == btn_id else "")
button_states.append(gr.update(elem_classes=f"nav-btn {state}"))
return updates + button_states
"""
MAIN DEMO
"""
with gr.Blocks(css = custom_css) as demo:
# container
with gr.Row(elem_classes="container"):
# navbar
with gr.Sidebar(elem_classes="navbar"):
# nav header
with gr.Column(elem_classes="nav-header"):
gr.Markdown("# Game Recommendation by Your Preference")
# nav button container
with gr.Column(elem_classes="nav-buttons"):
# nav button list
nav_buttons = []
nav_items = [
('Home', 'home'),
("Dataset", "dataset"),
("Exploratory Data Analysis", "eda"),
("Preprocessing Data", "preprocess"),
("Training Result", "training"),
("Our System", "system")
]
# create button
for label, section_id in nav_items:
button = gr.Button(label, elem_classes="nav-btn", elem_id=f"btn-{section_id}")
nav_buttons.append(button)
# main content
with gr.Column(elem_classes="main-content"):
# Home Section
"""
Introduction section. Using header, h2, p for text formating
"""
with gr.Column(elem_id="home", elem_classes="content-section", visible=True) as home_section:
header('About This System')
with gr.Column(elem_classes='content'):
h2("Background and Problem")
p('''
When we look for something we want, we usually start from an abstract description of it.
The same problem shows up when looking for games: when we ask a friend for a recommendation, we describe the game, and only later narrow it down by genres and, if possible, price.
However, most systems only support searching games by categories and tags such as genres or price.
So we wanted to build a game recommender driven by description, where users describe the game they are looking for in free text and then narrow the results down by content attributes like genres and price ranges.
''')
h2("The Model")
p("""The system consists of three model :The first one is the Language Model that will learn users review for a game and use that as a way to describe a game.
The Language Model will be a classifier based on a Gradient Boosting model called XGBClassifier.
The second model and third model will be the filter model.
The second model is a collaborative filter model where it will recommend the user a game based on a game that they have liked in the past or a game that they specify similar to the game they are looking for.
This model will learn based on other user who have reviewed a game and a similar game is the game that said user liked other than the input game. This model will use utility matrix and cosine similarity.
The third model is a content based model where it will recommend user a game based on their content such as Genres, Categories, Price range, Year Release, etc.
This third model will be a KNeighborsClassifier.""")
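p("""To combine the three outputs, each model's scores are min-max scaled, merged on app_id, and blended with user-adjustable weights. The snippet below is a minimal sketch of that weighting step (illustrative only; the full logic is GameRecommendationEnsemble.predict, shown in the Training Result section).""")
code_cell("""
# Minimal sketch of the ensemble's weighted scoring (see GameRecommendationEnsemble.predict)
# `merged` is the outer join of the three models' per-game scores; the weights come from the UI
w_text, w_collab, w_content = 1.0, 1.0, 1.0
total = w_text + w_collab + w_content
merged['final_score'] = (
merged.get('text_probability', 0) * (w_text / total) +
merged.get('collaborative_similarity', 0) * (w_collab / total) +
merged.get('content_probability', 0) * (w_content / total)
)
merged = merged.sort_values('final_score', ascending=False)
""")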
with gr.Column(elem_id="dataset", elem_classes="content-section", visible=False) as dataset_section:
"""
Dataset Display section. use Dataset()
will displaying dataframe.
key attribute is optional
"""
header('DATASET')
with gr.Column(elem_classes='datasets-container'):
Dataset(
df=df_games_raw.head(20),
title="1. Games Dataset",
source=RAW_GAMES_DATAPATH,
key="game_data"
)
h2(f'Shape : {df_games_raw.shape}')
Dataset(
df=df_review_raw.head(20),
title="2. Steam Review Dataset",
source=REVIEWS_DATAPATH,
key="reviews"
)
h2(f'Shape : {df_review_raw.shape}')
# eda section
with gr.Column(elem_id="eda", elem_classes="content-section", visible=False) as eda_section:
header('Exploratory Data Analysis')
h2('1. Game Dataset')
code_cell('df.head(5)')
gr.Dataframe(df_games_raw.head(5))
p(f'Dataset shape : {df_games_raw.shape}')
h2('2. Description of data')
code_cell('df.describe()')
gr.Dataframe(df_games_raw.describe().reset_index())
h2('3. Missing values')
gr.Dataframe(show_missing_values(df_games_raw))
h2('4. Distribution of data')
dropdown = gr.Dropdown(choices=list(df_games_raw.columns), label="Select Column for Distribution",value=list(df_games_raw.columns)[0] if len(df_games_raw.columns) > 0 else None,allow_custom_value=True)
plot_output = gr.Plot(format='png')
dropdown.change(plot_distribution, inputs=[gr.State(df_games_raw), dropdown], outputs=plot_output)
h2('1. Review Dataset')
code_cell('df.head(5)')
gr.Dataframe(df_review_raw.head(5))
p(f'Dataset shape : {df_review_raw.shape}')
h2('2. Description of data')
code_cell('df.describe()')
gr.Dataframe(df_review_raw.describe().reset_index())
h2('3. Missing values')
gr.Dataframe(show_missing_values(df_review_raw))
h2('4. Distribution of data')
dropdown = gr.Dropdown(choices=list(df_review_raw.columns), label="Select Column for Distribution",value=list(df_review_raw.columns)[0] if len(df_review_raw.columns) > 0 else None,allow_custom_value=True)
plot_output = gr.Plot(format='png')
dropdown.change(plot_distribution, inputs=[gr.State(df_review_raw), dropdown], outputs=plot_output)
# preprocess section
with gr.Column(elem_id="preprocess", elem_classes="content-section", visible=False) as preprocess_section:
header('Preprocessing Data')
h2("1. Review Dataset initial merging")
code_cell("""
import pandas as pd
import glob
import os
from langdetect import detect
from joblib import Parallel, delayed
from tqdm import tqdm
folder_path = 'Fragmented_Dataset'
csv_files = glob.glob(os.path.join(folder_path, '*.csv'))
df_list = [pd.read_csv(file) for file in csv_files]
df = pd.concat(df_list, ignore_index=True)
min_word = 20
print(f'shape before filtering : {df.shape}')
df = df[df['review'].apply(lambda x: len(str(x).split()) >=min_word)].reset_index(drop=True)
print(f'shape after filtering : {df.shape}')
def detect_lang(text):
try:
return detect(str(text))
except Exception:
return 'error'
results = Parallel(n_jobs=6)(
delayed(detect_lang)(text) for text in tqdm(df['review'], desc='Detecting Language')
)
df['lang'] = results
# Filter English reviews only
df_english = df[df['lang'] == 'en'].drop(columns=['lang'])
df_english.to_csv('english_reviews.csv', index=False)
print("Finished filtering English reviews!")
""")
h2("Output : ")
code_cell("""
>> shape before filtering : (15437471, 13)
>> shape after filtering : (6531410, 13)
>> Finished filtering English reviews!
""")
h2("2. Data Preprocessing")
h2("2.1. Games Data Cleaning")
code_cell("""
game_datapath = 'converted.csv'
df_games_raw = pd.read_csv(game_datapath,index_col=False)
df_games_raw.rename(columns={"AppID": "app_id"}, inplace=True)
df_games_raw["Genres"] = df_games_raw["Genres"].apply(lambda x: x.split(",") if isinstance(x, str) else ['NONE'])
df_games_raw["Tags"] = df_games_raw["Tags"].apply(lambda x: x.split(",") if isinstance(x, str) else ['NONE'])
df_games_raw['Genres'] = df_games_raw['Genres']+df_games_raw['Tags']
def make_set(row):
data = [d for d in row if d != 'NONE']
return set(data)
df_games_raw['Genres'] = df_games_raw['Genres'].apply(make_set)
genres_to_keep = [
'Action', 'Adventure', 'RPG', 'Strategy', 'Simulation',
'Casual', 'Indie', 'Sports', 'Racing', 'Fighting',
'Puzzle', 'Shooter', 'Platformer', 'MMO', 'Horror',
'Survival', 'Open World', 'Visual Novel', 'Point & Click',
'Sandbox', 'Metroidvania', 'Tactical', 'Rhythm',
'Stealth', 'Rogue-like', 'Rogue-lite'
]
df_games_raw['Genres'] = df_games_raw['Genres'].apply(lambda genre_list: [g for g in genre_list if g in genres_to_keep])
df_games_raw = df_games_raw[['app_id','Name','Release date','DLC count','Positive','Negative','Average playtime forever','Price','Developers','Publishers','Detailed description','About the game','Short description','Categories','Genres','Achievements','Windows','Mac','Linux']]
df_games_raw["Categories"] = df_games_raw["Categories"].apply(lambda x: x.split(",") if isinstance(x, str) else ['Unknown'])
df_games_raw['Detailed description'] = df_games_raw['Detailed description'].fillna('')
df_games_raw['About the game'] = df_games_raw['About the game'].fillna('')
df_games_raw['Short description'] = df_games_raw['Short description'].fillna('')
df_games_raw['Developers'] = df_games_raw['Developers'].fillna('')
df_games_raw['Publishers'] = df_games_raw['Publishers'].fillna('')
df_games_raw.to_csv('Cleaned_games.csv',index=False)
""")
# h2('Games Data Cleaned')
# gr.Dataframe(df_games.head(20))
h2('2.2. Review Preprocessing')
# Dataset(df_review_raw,'Review Data Raw',REVIEWS_DATAPATH)
code_cell("""
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
import string
from joblib import Parallel, delayed
import multiprocessing
from tqdm import tqdm
import re
import nltk
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('wordnet')
datapath = 'english_reviews.csv'
df = pd.read_csv(datapath)
stopword = stopwords.words('english')
lemmatizer = WordNetLemmatizer()
def convert_postag(postag:str):
if postag.startswith('V'):
return 'v'
elif postag.startswith('R'):
return 'r'
elif postag.startswith('J'):
return 'a'
return 'n'
def clean_space(text : str):
if not isinstance(text, str):
return ''
# Replace newlines with space, collapse multiple spaces, strip
cleaned = re.sub(r'\s+', ' ', text.replace('\\n', ' ')).strip()
return cleaned
def tokenize(text : str):
text = text.lower() # lower sentencees
text = clean_space(text)
token = word_tokenize(text) # tokenize
# remove stopword punctuation and numeric
token = [word for word in token if word not in
string.punctuation and word not in stopword and word.isalpha()]
return token
def lemmatizing(token : list):
postag = pos_tag(token)
lemmatized = [lemmatizer.lemmatize(word,convert_postag(tag)) for word,tag in postag]
return lemmatized
def preprocess(text : str):
token = tokenize(text)
token = lemmatizing(token)
return " ".join(token)
num_cores = int(multiprocessing.cpu_count()*0.75)
print("Cleaning Data . . .")
df["cleaned_review"] = Parallel(n_jobs=num_cores)(
delayed(preprocess)(text) for text in tqdm(df["review"], desc="Processing reviews")
)
gc.collect()
df = df[['steamid','app_id','voted_up','cleaned_review']]
df.to_csv('Cleaned_Dataframe.csv',index=False)
""")
Dataset(df_review_trimmed,'Cleaned Review',source=TRIMMED_REVIEW_DATAPATH,key='trimmed_review')
code_cell("""
min_word = 20
df = df[df['cleaned_review'].apply(lambda x: len(str(x).split()) >=min_word)].reset_index(drop=True)
""")
code_cell(f"""
>>> shape before filtering : {df_review_trimmed.shape}
>>> shape after filtering : {df_review_trimmed_filtered.shape}
>>> number of unique app_ids : {len(set(df_review_trimmed_filtered['app_id']))}
""")
fig, ax = plt.subplots()
df_review_trimmed_filtered['app_id'].value_counts().plot(kind='bar',ax=ax)
ax.set_xlabel('app_id')
ax.set_ylabel('Count')
ax.set_title('Value Counts of app_id')
gr.Plot(fig,format='png')
class_counts = df_review_trimmed_filtered['app_id'].value_counts()
gr.Dataframe(describe_value_counts(class_counts))
code_cell("""
min_row = 4500
max_row = 5000
def sample_group(g):
if len(g) > max_row:
return g.sample(n=max_row, random_state=SEED)
else:
return g
# Filter categories with at least min_row rows
filtered = df.groupby('app_id').filter(lambda x: len(x) >= min_row)
# For each app_id, keep only max_row rows (if more, trim to max_row)
df = filtered.groupby('app_id', group_keys=False).apply(sample_group).reset_index(drop=True)""")
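# Reproduce the filtering and sampling above on the loaded data so the plots below reflect it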
min_row = 4500
max_row = 5000
def sample_group(g):
if len(g) > max_row:
return g.sample(n=max_row, random_state=SEED)
else:
return g
sampled=df_review_trimmed_filtered.groupby('app_id').filter(lambda x:len(x)>= min_row)
sampled = sampled.groupby('app_id',group_keys=False).apply(sample_group).reset_index(drop=True)
code_cell(f"""
Num of class after sampling : {len(set(sampled['app_id']))}
Shape of the sampled df : {sampled.shape}
""")
fig,ax = plt.subplots()
sampled_class_dist = sampled['app_id'].value_counts()
sampled_class_dist.plot(kind='bar',ax=ax)
ax.set_xlabel('app_id')
ax.set_ylabel('Count')
ax.set_title('Value Counts of app_id')
code_cell("""
df['app_id'].value_counts().plot(kind='bar')
plt.xlabel('app_id')
plt.ylabel('Count')
plt.title('Value Counts of app_id')
plt.show()
df.to_csv('Cleaned_Trimmed_Dataset.csv',index=False)""")
gr.Plot(fig,format='png')
code_cell("""class_counts = df['app_id'].value_counts()""")
gr.DataFrame(describe_value_counts(sampled_class_dist))
h2('Review Preprocessed!')
h2('2.3. User Preference Data')
Dataset(df_review_raw,'User Review Dataset',REVIEWS_DATAPATH)
code_cell("""
df_review = df_review[['steamid','appid','voted_up']]
df_review.to_csv('UserPreferenceDF.csv',index=False)
""")
Dataset(df_user_pref,'User Preference Dataset',USER_PREFERENCE_DATAPATH)
p(f"Dataset Shape : {df_user_pref.shape}")
df_liked=df_user_pref[df_user_pref['voted_up']==1].copy()
df_liked.rename(columns={'appid':'app_id'},inplace=True)
df_liked['voted_up'] = df_liked['voted_up'].astype(int)
df_liked['steamid'] = df_liked['steamid'].astype(str)
df_liked['app_id'] = df_liked['app_id'].astype(str)
df_liked = df_liked.drop_duplicates(subset=['steamid', 'app_id'])
code_cell("""
df_liked=df_users[df_users['voted_up']==1].copy()
df_liked.rename(columns={'appid':'app_id'},inplace=True)
df_liked['voted_up'] = df_liked['voted_up'].astype(int)
df_liked['steamid'] = df_liked['steamid'].astype(str)
df_liked['app_id'] = df_liked['app_id'].astype(str)
df_liked = df_liked.drop_duplicates(subset=['steamid', 'app_id'])
""")
h2(f"Dataset Shape : {df_liked.shape}")
code_cell("""
# Keep users who liked at least 5 games
user_counts = df_liked['steamid'].value_counts()
df_liked = df_liked[df_liked['steamid'].isin(user_counts[user_counts >= 5].index)]
# Keep games liked by at least 10 users
game_counts = df_liked['app_id'].value_counts()
df_liked = df_liked[df_liked['app_id'].isin(game_counts[game_counts >= 10].index)]
df_liked = df_liked.drop_duplicates(subset=['steamid', 'app_id'])
""")
p(f"Unique steamids: {df_liked['steamid'].nunique()}")
p(f"Unique app_ids: {df_liked['app_id'].nunique()}")
p(f"Total rows: {len(df_liked)}")
h2("We're done here, next stop is Training!")
# training section
with gr.Column(elem_id="training", elem_classes="content-section", visible=False) as training_section:
header('Training Result')
h2("Language Model Training")
h2('Dataset')
gr.Dataframe(sampled.head(15))
code_cell("""
vectorizer = TfidfVectorizer(max_df=0.7,min_df=3,stop_words=None,ngram_range=(1,2))
review_app_id_encoder = LabelEncoder()""")
train_df,df_temp = train_test_split(sampled,test_size=0.2,random_state=SEED,stratify=sampled['app_id'])
test_df,val_df = train_test_split(df_temp,test_size=0.5,random_state=SEED,stratify=df_temp['app_id'])
del df_temp
gc.collect()
code_cell("""
train_df,df_temp = train_test_split(sampled,test_size=0.2,random_state=SEED,stratify=sampled['app_id'])
test_df,val_df = train_test_split(df_temp,test_size=0.5,random_state=SEED,stratify=df_temp['app_id'])
""")
p(f"""
Training : {train_df.shape}
Testing : {test_df.shape}
Validation : {val_df.shape}
""")
del train_df,val_df
gc.collect()
code_cell("""
X_train = vectorizer.fit_transform(train_df['cleaned_review'])
y_train = review_app_id_encoder.fit_transform(train_df['app_id'])
X_test = vectorizer.transform(test_df['cleaned_review'])
y_test = review_app_id_encoder.transform(test_df['app_id'])
X_val = vectorizer.transform(val_df['cleaned_review'])
y_val = review_app_id_encoder.transform(val_df['app_id'])""")
p("""The shape of X_train : (656396, 1795889)}""")
code_cell("""
classifier = XGBClassifier(
objective='multi:softprob',
max_depth=4,
learning_rate=0.2,
n_estimators=18,
subsample=0.7,
colsample_bytree=0.7,
reg_alpha=1.0,
reg_lambda=1.0,
tree_method='hist',
eval_metric=['mlogloss', 'merror'],
early_stopping_rounds=10
)""")
code_cell("""
classifier.fit(
X_train,y_train,
eval_set=[(X_train, y_train), (X_val, y_val)],
verbose=True
)
""")
history = model.text_based_recommender.history
n_estimator = np.arange(len(history['validation_0']['merror']))
h2('Training vs Validation log loss')
results = {
"merror": history['validation_0']['merror'],
"mlogloss": history['validation_0']['mlogloss']
}
plot_output = gr.Plot(format='png')
btn = gr.Button("Generate Plot")
btn.click(fn=lambda:plot_training_results(n_estimator,history['validation_0']['mlogloss'],history['validation_1']['mlogloss'],'Training Log Loss','Validation Log Loss','Log Loss','N Estimator'), inputs=[], outputs=plot_output, preprocess=False)
h2('Training vs Validation error')
plot_outputval = gr.Plot(format='png')
btnval = gr.Button("Generate Plot")
btnval.click(fn=lambda:plot_training_results(n_estimator,history['validation_0']['merror'],history['validation_1']['merror'],'Training error','Validation error','merror','N Estimator'), inputs=[], outputs=plot_outputval, preprocess=False)
y_pred = model.text_based_recommender.classifier.predict(vectorizer.transform(test_df['cleaned_review']))
y_test = model.text_based_recommender.app_id_encoder.transform(test_df['app_id'])
class_report = classification_report(y_test,y_pred)
h2("Classification Report")
code_cell(f"""
{class_report}
""")
h2("Language Model Class")
code_cell("""
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
import string
import re
import os
import joblib
import numpy as np
import pandas as pd
from xgboost import XGBClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('wordnet')
class TextBasedRecommendation():
def __init__(self,classifier,vectorizer,app_id_encoder,history):
self.classifier : XGBClassifier = classifier
self.vectorizer : TfidfVectorizer = vectorizer
self.app_id_encoder : LabelEncoder = app_id_encoder
self.history = history
def updateModel(self):
self.classifier.save_model('xgb_model.json')
self.classifier.load_model('xgb_model.json')
def save(self, path_prefix: str):
self.classifier.save_model(f"{path_prefix}_xgb.json")
classifier_backup = self.classifier
self.classifier = None
joblib.dump(self, f"{path_prefix}_preprocessor.joblib")
self.classifier = classifier_backup
@staticmethod
def load(path_prefix: str):
obj = joblib.load(f"{path_prefix}_preprocessor.joblib")
xgb = XGBClassifier()
xgb.load_model(f"{path_prefix}_xgb.json")
obj.classifier = xgb
return obj
def preprocess(self,text : str):
stopword = stopwords.words('english')
lemmatizer = WordNetLemmatizer()
def convert_postag(postag:str):
if postag.startswith('V'):
return 'v'
elif postag.startswith('R'):
return 'r'
elif postag.startswith('J'):
return 'a'
return 'n'
def clean_space(text : str):
if not isinstance(text, str):
return ''
cleaned = re.sub(r'\s+', ' ', text.replace('\\n', ' ')).strip()
return cleaned
def tokenize(text : str):
text = text.lower()
text = clean_space(text)
token = word_tokenize(text)
token = [word for word in token if word not in
string.punctuation and word not in stopword and word.isalpha()]
return token
# lemmatize
def lemmatizing(token : list):
postag = pos_tag(token)
lemmatized = [lemmatizer.lemmatize(word,convert_postag(tag)) for word,tag in postag]
return lemmatized
token = tokenize(text)
token = lemmatizing(token)
return " ".join(token)
def get_accuracy(self,X_test,y_test):
y_pred = self.classifier.predict(self.vectorizer.transform(X_test))
y_test = self.app_id_encoder.transform(y_test)
print(classification_report(y_test,y_pred))
def predict(self,text,top_n=None):
cleaned_text = self.preprocess(text)
vectorized_text = self.vectorizer.transform([cleaned_text])
proba = self.classifier.predict_proba(vectorized_text)[0]
class_indices = np.argsort(proba)[::-1]
if top_n is not None:
class_indices = class_indices[:top_n]
class_labels = self.app_id_encoder.inverse_transform(class_indices)
class_probs = proba[class_indices]
return pd.DataFrame({
'app_id': class_labels,
'text_probability': class_probs
})""")
h2("Collaborative Filter Training")
h2("Dataset of User Preference")
gr.DataFrame(df_liked.head(10))
p(f"Unique steamids: {df_liked['steamid'].nunique()}")
p(f"Unique app_ids: {df_liked['app_id'].nunique()}")
p(f"Total rows: {len(df_liked)}")
p(f"Unique (steamid, app_id) pairs: {df_liked.drop_duplicates(subset=['steamid', 'app_id']).shape[0]}")
top_n=3001
# Top n users with most reviews
top_users = df_liked['steamid'].value_counts().head(top_n).index
# Top n games with most reviews
top_games = df_liked['app_id'].value_counts().head(top_n).index
df_liked = df_liked[df_liked['steamid'].isin(top_users) & df_liked['app_id'].isin(top_games)]
user_item_matrix = df_liked.pivot_table(
index='steamid',
columns='app_id',
values='voted_up',
aggfunc='max',
fill_value=0
)
user_item_matrix = user_item_matrix.reset_index().head(10)
code_cell("""
top_n=3001
# Top n users with most reviews
top_users = df_liked['steamid'].value_counts().head(top_n).index
# Top n games with most reviews
top_games = df_liked['app_id'].value_counts().head(top_n).index
df_liked = df_liked[df_liked['steamid'].isin(top_users) & df_liked['app_id'].isin(top_games)]
user_item_matrix = df_liked.pivot_table(
index='steamid',
columns='app_id',
values='voted_up',
aggfunc='max',
fill_value=0
)
""")
gr.Dataframe(user_item_matrix)
del user_item_matrix
gc.collect()
code_cell("""
from sklearn.decomposition import TruncatedSVD
X = user_item_matrix.T
n_components = 100
svd = TruncatedSVD(n_components=n_components, random_state=42)
item_embeddings = svd.fit_transform(X)
item_list = list(user_item_matrix.columns)
unique_items =df_liked['app_id'].unique()
item_to_index = {item: idx for idx, item in enumerate(unique_items)}
""")
h2("Model")
code_cell("""
import numpy as np
import joblib
import pandas as pd
class CollaborativeRecommender:
def __init__(self, svd_matrix, item_to_index, index_to_item):
\"""
svd_matrix: 2D numpy array (items x latent features)
item_to_index: dict mapping app_id to row index in svd_matrix
index_to_item: dict mapping row index to app_id
\"""
self.svd_matrix : np.ndarray = svd_matrix
self.item_to_index = item_to_index
self.index_to_item = index_to_item
def save(self, path: str):
\"""Save the entire model as a single file using joblib.\"""
joblib.dump(self, path)
@staticmethod
def load(path: str):
\"""Load the entire model from a joblib file.\"""
return joblib.load(path)
def _get_item_vector(self, app_id):
idx = self.item_to_index.get(app_id)
if idx is None:
raise ValueError(f"app_id {app_id} not found in the model.")
return self.svd_matrix[idx]
def _cosine_similarity(self, vec, matrix):
# Cosine similarity between vec and all rows in matrix
vec_norm = np.linalg.norm(vec)
matrix_norms = np.linalg.norm(matrix, axis=1)
similarity = (matrix @ vec) / (matrix_norms * vec_norm + 1e-10)
return similarity
def get_similarities(self, app_ids,top_n=None):
\"""
Input: app_ids - single app_id or list of app_ids
Output: DataFrame with columns ['app_id', 'similarity'] sorted by similarity descending
\"""
if isinstance(app_ids, (str, int)):
app_ids = [app_ids]
elif not isinstance(app_ids, (list, tuple, np.ndarray)):
raise TypeError("app_ids must be a string/int or a list of such")
valid_vectors = []
missing_ids = []
for app_id in app_ids:
try:
vec = self._get_item_vector(app_id)
valid_vectors.append(vec)
except ValueError:
missing_ids.append(app_id)
if len(valid_vectors) == 0:
raise ValueError("None of the input app_ids were found in the model.")
# Aggregate vectors by averaging if multiple inputs
aggregated_vec = np.mean(valid_vectors, axis=0)
# Compute similarity with all items
similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix)
# Build DataFrame of results
result_df = pd.DataFrame({
'app_id': [self.index_to_item[i] for i in range(len(similarities))],
'collaborative_similarity': similarities
})
# Exclude the input app_ids themselves from results
result_df = result_df[~result_df['app_id'].isin(app_ids)]
# Sort descending by similarity
result_df = result_df.sort_values('collaborative_similarity', ascending=False).reset_index(drop=True)
# If any input app_ids were missing, notify user (optional)
if missing_ids:
print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}")
if top_n:
return result_df.head(top_n)
else:
return result_df""")
h2("Content Based Model")
code_cell("""
def col_to_list(df,col='Genres'):
import ast
df[col]=df[col].apply(
lambda x: ast.literal_eval(x) if isinstance(x, str) else x
)
df[col]=df[col].apply(
lambda genres: [g.strip() for g in genres] if isinstance(genres, list) else ['Unknown']
)
return df
def apply_price_range_labels(df,labels,bins, price_col='Price', range_col='Price_range'):
df[range_col] = pd.cut(df[price_col], bins=bins, labels=labels, right=True)
return df
price_bins = [-0.01, 0, 5, 10, 20, 30, 40, 50, float('inf')]
price_labels = [
"Free",
"Less than $5",
"$5 - $9.99",
"$10 - $19.99",
"$20 - $29.99",
"$30 - $39.99",
"$40 - $49.99",
"$50+"
]
df = pd.read_csv("Cleaned_games.csv",index_col=False)
df = col_to_list(df,'Genres')
df = col_to_list(df,'Categories')
df = apply_price_range_labels(df,price_labels,price_bins)
""")
# Dataset(df_games,"The game dataset",GAMES_DATAPATH)
code_cell("""
def extract_year(date_str):
if isinstance(date_str, str) and len(date_str) >= 4:
year_str = date_str[-4:]
if year_str.isdigit():
return int(year_str)
return None
df['Year_Release'] = df['Release date'].apply(extract_year)
df['Game score'] = np.where(
(df['Positive'] + df['Negative']) == 0,
0,
(df['Positive'] / (df['Positive'] + df['Negative'])) * 100
)""")
code_cell("""
from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler
genre_mlb = MultiLabelBinarizer()
genre_mlb = genre_mlb.fit(df['Genres'])
categories_mlb = MultiLabelBinarizer()
categories_mlb = categories_mlb.fit(df['Categories'])
price_range_le = LabelEncoder()
price_range_le = price_range_le.fit(price_labels)
scaler = MinMaxScaler()
scaler = scaler.fit(df[['Year_Release','Average playtime forever','Game score','DLC count']].values)
app_id_le = LabelEncoder()
app_id_le = app_id_le.fit(df['app_id'])
numerical_col =['Year_Release','Average playtime forever','Game score','DLC count']""")
code_cell("""
genre_matrix = genre_mlb.transform(df['Genres'])
genre_df = pd.DataFrame(genre_matrix, columns=genre_mlb.classes_, index=df.index)
categories_matrix = categories_mlb.transform(df['Categories'])
categories_df = pd.DataFrame(categories_matrix,columns=categories_mlb.classes_,index=df.index)
game_df = pd.concat([df[['app_id','Price_range']+numerical_col],genre_df,categories_df],axis=1)""")
gr.Dataframe(df_games_temp.head(10))
del df_games_temp
gc.collect()
code_cell("""
from sklearn.neighbors import KNeighborsClassifier
X = game_df.loc[:,['Year_Release','Average playtime forever','Game score','DLC count','Price_range']+ list(genre_mlb.classes_) + list(categories_mlb.classes_)]
y = app_id_le.transform(game_df['app_id'])
model = KNeighborsClassifier(n_neighbors=len(y), metric='cosine')
model.fit(X.values,y)
""")
h2("Content Based Recommender Class")
code_cell("""
import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler
class GameContentRecommender:
def __init__(self,model,genre_encoder,category_encoder,price_range_encoder,scaler,app_id_encoder):
self.model : KNeighborsClassifier = model
self.genre_encoder : MultiLabelBinarizer = genre_encoder
self.category_encoder : MultiLabelBinarizer = category_encoder
self.price_range_encoder : LabelEncoder = price_range_encoder
self.scaler : MinMaxScaler = scaler
self.app_id_encoder : LabelEncoder = app_id_encoder
def save(self, path: str):
\"""Save the entire model as a single file using joblib.\"""
joblib.dump(self, path)
@staticmethod
def load(path: str):
\"""Load the entire model from a joblib file.\"""
return joblib.load(path)
def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None):
# Create one-hot encoded genre and category dicts
genre_dict = {g: 0 for g in self.genre_encoder.classes_}
categories_dict = {c: 0 for c in self.category_encoder.classes_}
for genre in genres:
if genre != 'Unknown' and genre in genre_dict:
genre_dict[genre] = 1
for category in categories:
if category != 'Unknown' and category in categories_dict:
categories_dict[category] = 1
# Encode and normalize numeric features
price_range = self.price_range_encoder.transform(np.array(price_range).reshape(-1, 1))
scaled_features = self.scaler.transform(np.array([[year_release, average_playtime, game_score, dlc_count]]))[0]
user_vector = list(scaled_features) + list(price_range) + list(genre_dict.values()) + list(categories_dict.values())
# Prepare DataFrame for KNN
user_df = pd.DataFrame([user_vector])
# Get KNN results
distances, indices = self.model.kneighbors(user_df)
distances = distances.flatten()
indices = indices.flatten()
# Convert distances to similarity scores
similarity = 1 / (1 + distances)
# Decode app_ids
app_ids = self.app_id_encoder.inverse_transform(indices)
prediction = pd.DataFrame({
'app_id': app_ids,
'content_probability': similarity
})
if top_n:
prediction = prediction.head(top_n)
return prediction
""")
h2("After finishing with individual model we finally ensemble them together")
code_cell("""
import numpy as np
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import joblib
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics import classification_report
from xgboost import XGBClassifier
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
import string
import re
import os
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('wordnet')
class CollaborativeRecommender:
def __init__(self, svd_matrix, item_to_index, index_to_item):
\"""
svd_matrix: 2D numpy array (items x latent features)
item_to_index: dict mapping app_id to row index in svd_matrix
index_to_item: dict mapping row index to app_id
\"""
self.svd_matrix : np.ndarray = svd_matrix
self.item_to_index = item_to_index
self.index_to_item = index_to_item
def save(self, path: str):
\"""Save the entire model as a single file using joblib.\"""
joblib.dump(self, path)
@staticmethod
def load(path: str):
\"""Load the entire model from a joblib file.\"""
return joblib.load(path)
def _get_item_vector(self, app_id):
idx = self.item_to_index.get(app_id)
if idx is None:
raise ValueError(f"app_id {app_id} not found in the model.")
return self.svd_matrix[idx]
def _cosine_similarity(self, vec, matrix):
# Cosine similarity between vec and all rows in matrix
vec_norm = np.linalg.norm(vec)
matrix_norms = np.linalg.norm(matrix, axis=1)
similarity = (matrix @ vec) / (matrix_norms * vec_norm + 1e-10)
return similarity
def get_similarities(self, app_ids,top_n=None):
\"""
Input: app_ids - single app_id or list of app_ids
Output: DataFrame with columns ['app_id', 'similarity'] sorted by similarity descending
\"""
if isinstance(app_ids, (str, int)):
app_ids = [app_ids]
elif not isinstance(app_ids, (list, tuple, np.ndarray)):
raise TypeError("app_ids must be a string/int or a list of such")
valid_vectors = []
missing_ids = []
for app_id in app_ids:
try:
vec = self._get_item_vector(app_id)
valid_vectors.append(vec)
except ValueError:
missing_ids.append(app_id)
if len(valid_vectors) == 0:
raise ValueError("None of the input app_ids were found in the model.")
# Aggregate vectors by averaging if multiple inputs
aggregated_vec = np.mean(valid_vectors, axis=0)
# Compute similarity with all items
similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix)
# Build DataFrame of results
result_df = pd.DataFrame({
'app_id': [self.index_to_item[i] for i in range(len(similarities))],
'collaborative_similarity': similarities
})
# Exclude the input app_ids themselves from results
result_df = result_df[~result_df['app_id'].isin(app_ids)]
# Sort descending by similarity
result_df = result_df.sort_values('collaborative_similarity', ascending=False).reset_index(drop=True)
# If any input app_ids were missing, notify user (optional)
if missing_ids:
print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}")
if top_n:
return result_df.head(top_n)
else:
return result_df
class GameContentRecommender:
def __init__(self,model,genre_encoder,category_encoder,price_range_encoder,scaler,app_id_encoder):
self.model : KNeighborsClassifier = model
self.genre_encoder : MultiLabelBinarizer = genre_encoder
self.category_encoder : MultiLabelBinarizer = category_encoder
self.price_range_encoder : LabelEncoder = price_range_encoder
self.scaler : MinMaxScaler = scaler
self.app_id_encoder : LabelEncoder = app_id_encoder
def save(self, path: str):
\"""Save the entire model as a single file using joblib.\"""
joblib.dump(self, path)
@staticmethod
def load(path: str):
\"""Load the entire model from a joblib file.\"""
return joblib.load(path)
def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None):
genre_dict = {g: 0 for g in self.genre_encoder.classes_}
categories_dict = {c: 0 for c in self.category_encoder.classes_}
for genre in genres:
if genre != 'Unknown' and genre in genre_dict:
genre_dict[genre] = 1
for category in categories:
if category != 'Unknown' and category in categories_dict:
categories_dict[category] = 1
price_range = self.price_range_encoder.transform(np.array(price_range).reshape(-1, 1))
scaled_features = self.scaler.transform(np.array([[year_release, average_playtime, game_score, dlc_count]]))[0]
user_vector = list(scaled_features) + list(price_range) + list(genre_dict.values()) + list(categories_dict.values())
user_df = pd.DataFrame([user_vector])
distances, indices = self.model.kneighbors(user_df)
distances = distances.flatten()
indices = indices.flatten()
similarity = 1 / (1 + distances)
app_ids = self.app_id_encoder.inverse_transform(indices)
prediction = pd.DataFrame({
'app_id': app_ids,
'content_probability': similarity
})
if top_n:
prediction = prediction.head(top_n)
return prediction
class TextBasedRecommendation():
def __init__(self,classifier,vectorizer,app_id_encoder,history):
self.classifier : XGBClassifier = classifier
self.vectorizer : TfidfVectorizer = vectorizer
self.app_id_encoder : LabelEncoder = app_id_encoder
self.history = history
def save(self, path_prefix: str):
self.classifier.save_model(f"{path_prefix}_xgb.json")
classifier_backup = self.classifier
self.classifier = None
joblib.dump(self, f"{path_prefix}_preprocessor.joblib")
self.classifier = classifier_backup
@staticmethod
def load(path_prefix: str):
obj = joblib.load(f"{path_prefix}_preprocessor.joblib")
xgb = XGBClassifier()
xgb.load_model(f"{path_prefix}_xgb.json")
obj.classifier = xgb
return obj
def preprocess(self,text : str):
stopword = stopwords.words('english')
lemmatizer = WordNetLemmatizer()
def convert_postag(postag:str):
if postag.startswith('V'):
return 'v'
elif postag.startswith('R'):
return 'r'
elif postag.startswith('J'):
return 'a'
return 'n'
def clean_space(text : str):
if not isinstance(text, str):
return ''
cleaned = re.sub(r'\s+', ' ', text.replace('\\n', ' ')).strip()
return cleaned
def tokenize(text : str):
text = text.lower()
text = clean_space(text)
token = word_tokenize(text)
token = [word for word in token if word not in
string.punctuation and word not in stopword and word.isalpha()]
return token
# lemmatize
def lemmatizing(token : list):
postag = pos_tag(token)
lemmatized = [lemmatizer.lemmatize(word,convert_postag(tag)) for word,tag in postag]
return lemmatized
token = tokenize(text)
token = lemmatizing(token)
return " ".join(token)
def get_accuracy(self,X_test,y_test):
y_pred = self.classifier.predict(self.vectorizer.transform(X_test))
y_test = self.app_id_encoder.transform(y_test)
print(classification_report(y_test,y_pred))
def predict(self,text,top_n=None):
cleaned_text = self.preprocess(text)
vectorized_text = self.vectorizer.transform([cleaned_text])
proba = self.classifier.predict_proba(vectorized_text)[0]
class_indices = np.argsort(proba)[::-1]
if top_n is not None:
class_indices = class_indices[:top_n]
class_labels = self.app_id_encoder.inverse_transform(class_indices)
class_probs = proba[class_indices]
return pd.DataFrame({
'app_id': class_labels,
'text_probability': class_probs
})
class GameRecommendationEnsemble:
def __init__(self,game_content_recommeder,collaborative_recommender,text_based_recommender):
self.game_content_recommeder : GameContentRecommender=game_content_recommeder
self.collaborative_recommender : CollaborativeRecommender=collaborative_recommender
self.text_based_recommender : TextBasedRecommendation = text_based_recommender
def save(self, dir_path: str):
os.makedirs(dir_path, exist_ok=True)
self.game_content_recommeder.save(os.path.join(dir_path, "game_content_recommender.joblib"))
self.collaborative_recommender.save(os.path.join(dir_path, "collaborative_recommender.joblib"))
self.text_based_recommender.save(os.path.join(dir_path, "text_based_recommender"))
@staticmethod
def load(dir_path: str):
game_content_recommender = GameContentRecommender.load(os.path.join(dir_path, "game_content_recommender.joblib"))
collaborative_recommender = CollaborativeRecommender.load(os.path.join(dir_path, "collaborative_recommender.joblib"))
text_based_recommender = TextBasedRecommendation.load(os.path.join(dir_path, "text_based_recommender"))
return GameRecommendationEnsemble(
game_content_recommender,
collaborative_recommender,
text_based_recommender
)
def scale_proba(self,series):
if len(series)<=1:
return pd.Series([1.0] * len(series), index=series.index)
scaler = MinMaxScaler()
scaled = scaler.fit_transform(series.values.reshape(-1, 1)).flatten()
return pd.Series(scaled, index=series.index)
def predict(self, description=None, app_ids=None, price_range=None, year_release=None,
average_playtime=None, game_score=None, dlc_count=None,
genres=None, categories=None, top_n=None,
weight_text=1.0, weight_collab=1.0, weight_content=1.0):
merge_dfs = []
if description is not None:
text_proba = self.text_based_recommender.predict(description)
text_proba['app_id'] = text_proba['app_id'].astype(str)
text_proba['text_probability'] = self.scale_proba(text_proba['text_probability'])
merge_dfs.append(text_proba)
else:
weight_text=0
# Collaborative similarity (only if app_ids is provided)
if app_ids is not None:
similar_app = self.collaborative_recommender.get_similarities(app_ids)
similar_app['app_id'] = similar_app['app_id'].astype(str)
similar_app['collaborative_similarity'] = self.scale_proba(similar_app['collaborative_similarity'])
merge_dfs.append(similar_app)
else:
weight_collab = 0 # No weight if not used
if None in (price_range, year_release,average_playtime,game_score,dlc_count, genres, categories):
weight_content=0
else:
similar_content = self.game_content_recommeder.predict(price_range, year_release,average_playtime,game_score,dlc_count, genres, categories)
similar_content['app_id'] = similar_content['app_id'].astype(str)
similar_content['content_probability'] = self.scale_proba(similar_content['content_probability'])
merge_dfs.append(similar_content)
if not merge_dfs:
return None
from functools import reduce
merged = reduce(lambda left, right: pd.merge(left, right, on='app_id', how='outer'), merge_dfs)
# Fill missing values
merged = merged.fillna(0)
# Final score calculation
def compute_aggregated_score(df, w_text, w_collab, w_content):
# Normalize weights (prevent divide-by-zero if one or more weights are 0)
total_weight = w_text + w_collab + w_content
if total_weight == 0:
raise ValueError("All weights are zero. At least one weight must be positive.")
w_text /= total_weight
w_collab /= total_weight
w_content /= total_weight
df['final_score'] = (
df.get('text_probability', 0) * w_text +
df.get('collaborative_similarity', 0) * w_collab +
df.get('content_probability', 0) * w_content
)
return df.sort_values(by='final_score', ascending=False).reset_index(drop=True)
final_df = compute_aggregated_score(merged, weight_text, weight_collab, weight_content)
if top_n:
return final_df.head(top_n)
else:
return final_df
""")
# Recommendation system
with gr.Column(elem_id="system", elem_classes='content-section', visible=False) as system_section:
# special for this section
gr.HTML('<h1 class="header-title">Game Recommendation System</h1>', elem_id='system-title')
with gr.Row():
with gr.Column(min_width=500, elem_classes='input-column'):
app_name = input_choice(
Label='Select games that you liked',
Choices=available_names,
Multiselect=True
)
year = input_number(
Label='Year Release',
Precision=0,
minimum=0
)
expected_playtime = input_number(
Label='Expected Playtime (Hours)',
Precision=2,
minimum=0
)
expected_score = input_number(
Label='Expected Score (% Positive)',
Precision=2,
minimum=0
)
dlc_count = input_number(
Label='DLC Count',
Precision=0,
minimum=0
)
description = input_paragaph_textbox('Description', 'Describe the game (max 1200 characters)...')
genre = input_choice(
Label="Select Your Genre (Multiple Choice)",
Choices=genres,
Multiselect=True
)
categories = input_choice(
Label="Select Your Categories (Multiple Choice)",
Choices=categories,
Multiselect=True
)
# single selection (multiselect=False)
price_range = input_choice(
Label="Select Your Price Range (Only Single Choice)",
Choices=price_ranges,
Multiselect=False
)
top_n= input_number(
Label='Output amount',
Precision=0,
minimum=0,
value=10
)
weight_text = input_number(
Label='Weight Text',
Precision=2,
minimum=0,
maximum=1,
value=0.5,
step=0.01
)
weight_collab = input_number(
Label='Weight Of Collaborative Model',
Precision=2,
minimum=0,
maximum=1,
value=0.5,
step=0.01
)
weight_content = input_number(
Label='Weight Of Content Based Model',
Precision=2,
minimum=0,
maximum=1,
value=0.5,
step=0.01
)
submit_btn = gr.Button("Get Recommendations", variant="primary", elem_id="submit-btn")
# Results column
with gr.Column(min_width=500, elem_classes='results-column'):
h2('Result')
with gr.Column(elem_id='Output'):
# Results column using the modular component
h2('Recommended Game')
recommended_game = gr.DataFrame()
# click button logic
submit_btn.click(
fn=recommend_game,
inputs=[description,app_name,price_range,year,expected_playtime,expected_score,dlc_count, genre, categories,top_n,weight_text,weight_collab,weight_content],
outputs=recommended_game
)
# Navigation logic
sections = {
"btn-home": home_section,
"btn-dataset": dataset_section,
"btn-eda": eda_section,
"btn-preprocess": preprocess_section,
"btn-training": training_section,
"btn-system": system_section
}
# Set click events for navigation buttons
for btn in nav_buttons:
btn.click(
set_active_section,
inputs=gr.State(btn.elem_id),
outputs=list(sections.values()) + nav_buttons
)
demo.launch()