import ast
import gc
import os
import subprocess
import sys

# Pin scikit-learn before anything imports it.
# NOTE(review): presumably the saved model was produced with this exact
# version -- confirm before changing the pin.
_pip_result = subprocess.run(
    [sys.executable, "-m", "pip", "install", "--upgrade", "scikit-learn==1.6.1"]
)
if _pip_result.returncode != 0:
    # Best effort: keep starting up, but do not hide the failure.
    print(
        "Warning: 'pip install --upgrade scikit-learn==1.6.1' exited with "
        f"code {_pip_result.returncode}; continuing with the installed version."
    )

import gradio as gr
import numpy as np
import pandas as pd

# The star imports provide the UI helpers (header, h2, p, code_cell, Dataset,
# ...) and GameRecommendationEnsemble used throughout this file. They stay
# ahead of the explicit imports below so name resolution is unchanged.
from component import *
from GameRecommender import *

from datasets import load_dataset
from huggingface_hub import snapshot_download
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer, LabelEncoder, MinMaxScaler

DATASETS = {
    "converted": "converted.csv",
    "Cleaned_games": "Cleaned_games.csv",
    "MergedFragmentData_SAMPLE": "MergedFragmentData_SAMPLE.csv",
    "Trimmed_Dataset": "Trimmed_Dataset.csv",
    "UserPreferenceDF": "UserPreferenceDF.csv",
}


def load_hf_csv_dataset(repo_name, filename):
    """Download a Hugging Face dataset repo and read one CSV from its data folder.

    Args:
        repo_name: Repository name under the ``VJyzCELERY`` account.
        filename: Name of the CSV file inside the repo's ``data`` directory.

    Returns:
        The CSV content as a ``pandas.DataFrame``.
    """
    # Only the 'data' folder of the snapshot is fetched.
    local_path = snapshot_download(
        repo_id=f"VJyzCELERY/{repo_name}",
        repo_type="dataset",
        allow_patterns=["data/*"],
    )
    csv_path = os.path.join(local_path, "data", filename)
    print(f"Loading {csv_path} ...")
    return pd.read_csv(csv_path, index_col=False)


DATA_BASE_PATH = 'data'
MODEL_BASE_PATH = snapshot_download(
    repo_id="VJyzCELERY/SteamGameRecommender",
    repo_type="model",
    allow_patterns=["GameRecommender/*"]
)
SEED = 42
# These paths are passed as the ``source`` of the Dataset() widgets further
# down; the frames themselves are loaded from the Hugging Face hub below.
RAW_GAMES_DATAPATH = os.path.join(DATA_BASE_PATH, 'converted.csv')
GAMES_DATAPATH = os.path.join(DATA_BASE_PATH, 'Cleaned_games.csv')
REVIEWS_DATAPATH = os.path.join(DATA_BASE_PATH, 'MergedFragmentData_SAMPLE.csv')
TRIMMED_REVIEW_DATAPATH = os.path.join(DATA_BASE_PATH, 'Trimmed_Dataset.csv')
USER_PREFERENCE_DATAPATH = os.path.join(DATA_BASE_PATH, 'UserPreferenceDF.csv')
MODEL_PATH = os.path.join(MODEL_BASE_PATH, 'GameRecommender')

RAW_GAMES_DS = load_dataset("VJyzCELERY/converted")
GAMES_DS = load_dataset("VJyzCELERY/Cleaned_games")
REVIEWS_DS = load_dataset("VJyzCELERY/MergedFragmentData_SAMPLE")
TRIMMED_REVIEWS_DS = load_dataset("VJyzCELERY/Trimmed_Dataset")
USER_PREF_DS = load_dataset("VJyzCELERY/UserPreferenceDF")

# Load the saved ensemble and expose the pieces the UI needs.
model = GameRecommendationEnsemble.load(MODEL_PATH)
vectorizer = model.text_based_recommender.vectorizer
review_app_id_encoder = model.text_based_recommender.app_id_encoder
genres = model.game_content_recommeder.genre_encoder.classes_.tolist()
genres = [genre for genre in genres if genre != 'Unknown']
categories = model.game_content_recommeder.category_encoder.classes_.tolist()
categories = [cat for cat in categories if cat != 'Unknown']
price_ranges = model.game_content_recommeder.price_range_encoder.classes_.tolist()
selectable_app_ids = list(model.collaborative_recommender.item_to_index.keys())

# Materialise the hub datasets as DataFrames; id columns are kept as strings.
df_games = GAMES_DS['train'].to_pandas()
df_games['app_id'] = df_games['app_id'].astype(str)
df_games_raw = RAW_GAMES_DS['train'].to_pandas()
df_games_raw['AppID'] = df_games_raw['AppID'].astype(str)
df_review_raw = REVIEWS_DS['train'].to_pandas()
df_review_raw['steamid'] = df_review_raw['steamid'].astype(str)
df_review_raw['appid'] = df_review_raw['appid'].astype(str)
df_review_trimmed = TRIMMED_REVIEWS_DS['train'].to_pandas()
df_user_pref = USER_PREF_DS['train'].to_pandas()

# Names of the games known to the collaborative recommender.
available_names = df_games[df_games['app_id'].astype(str).isin(selectable_app_ids)]['Name'].tolist()

# Keep only reviews with at least ``min_word`` whitespace-separated tokens.
min_word = 20
df_review_trimmed_filtered = df_review_trimmed[
    df_review_trimmed['cleaned_review'].apply(lambda x: len(str(x).split()) >= min_word)
].reset_index(drop=True)


def extract_year(date_str):
    """Return the trailing four-digit year of ``date_str`` as an int.

    Returns ``None`` when the value is not a string, is shorter than four
    characters, or its last four characters are not all digits.
    """
    if not isinstance(date_str, str) or len(date_str) < 4:
        return None
    year_str = date_str[-4:]
    return int(year_str) if year_str.isdigit() else None


def col_to_list(df, col='Genres'):
    """Turn a column of list literals into real lists of stripped strings.

    String cells are parsed with ``ast.literal_eval``. Cells that are (or
    parse to) a list have every item stripped; anything else -- including a
    string that is not a valid Python literal -- becomes ``['Unknown']``.

    Args:
        df: DataFrame to update. It is modified in place.
        col: Name of the column to convert.

    Returns:
        The same ``df`` object, for chaining.
    """
    def _as_clean_list(value):
        if isinstance(value, str):
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                # A malformed cell should not abort start-up.
                return ['Unknown']
        if isinstance(value, list):
            return [item.strip() for item in value]
        return ['Unknown']

    df[col] = df[col].apply(_as_clean_list)
    return df


def apply_price_range_labels(df, labels, bins, price_col='Price', range_col='Price_range'):
    """Bucket ``price_col`` into labelled, right-closed bins stored in ``range_col``.

    Args:
        df: DataFrame to update. It is modified in place.
        labels: One label per bin.
        bins: Bin edges handed to ``pandas.cut``.
        price_col: Column holding the numeric price.
        range_col: Column that receives the bin label.

    Returns:
        The same ``df`` object, for chaining.
    """
    df[range_col] = pd.cut(df[price_col], bins=bins, labels=labels, right=True)
    return df


price_bins = [-0.01, 0, 5, 10, 20, 30, 40, 50, float('inf')]
price_ranges_labels = [
    "Free",
    "Less than $5",
    "$5 - $9.99",
    "$10 - $19.99",
    "$20 - $29.99",
    "$30 - $39.99",
    "$40 - $49.99",
    "$50+"
]

# NOTE(review): this binds a second name to df_games, it does not copy it, so
# every step below also changes df_games (which recommend_game reads) --
# confirm that this is intended.
df_games_temp = df_games
df_games_temp = col_to_list(df_games_temp, 'Genres')
df_games_temp = col_to_list(df_games_temp, 'Categories')
df_games_temp = apply_price_range_labels(df_games_temp, price_ranges_labels, price_bins)
df_games_temp['Year_Release'] = df_games_temp['Release date'].apply(extract_year)
# Share of positive votes in percent; games without any votes score 0.
df_games_temp['Game score'] = np.where(
    (df_games_temp['Positive'] + df_games_temp['Negative']) == 0,
    0,
    (df_games_temp['Positive'] / (df_games_temp['Positive'] + df_games_temp['Negative'])) * 100
)

numerical_col = ['Year_Release', 'Average playtime forever', 'Game score', 'DLC count']

genre_mlb = MultiLabelBinarizer()
genre_mlb = genre_mlb.fit(df_games_temp['Genres'])
categories_mlb = MultiLabelBinarizer()
categories_mlb = categories_mlb.fit(df_games_temp['Categories'])
price_range_le = model.game_content_recommeder.price_range_encoder
scaler = MinMaxScaler()
scaler = scaler.fit(df_games_temp[numerical_col].values)

# Build the encoded feature table: ids, price range, scaled numerics and the
# multi-hot genre / category columns.
genre_matrix = genre_mlb.transform(df_games_temp['Genres'])
genre_df = pd.DataFrame(genre_matrix, columns=genre_mlb.classes_, index=df_games_temp.index)
categories_matrix = categories_mlb.transform(df_games_temp['Categories'])
categories_df = pd.DataFrame(categories_matrix, columns=categories_mlb.classes_, index=df_games_temp.index)
game_df = pd.concat(
    [df_games_temp[['app_id', 'Price_range'] + numerical_col], genre_df, categories_df],
    axis=1
)
game_df['Price_range'] = price_range_le.transform(game_df['Price_range'])
game_df[numerical_col] = scaler.transform(game_df[numerical_col].values)

# Drop the intermediates that are no longer referenced.
del categories_matrix, genre_matrix, categories_df, genre_df, scaler, price_range_le, categories_mlb, genre_mlb
gc.collect()


def recommend_game(description=None, app_name=None, price_range=None, year_release=None,
                   excpected_playtime=None, game_score=None, dlc_count=None, genres=None,
                   categories=None, top_n=5, weight_text=1.0, weight_collab=1.0,
                   weight_content=1.0):
    """Run the ensemble and return the recommended games as a Gradio table.

    Args:
        description: Free-text description forwarded to the model.
        app_name: One game name or a list of names; they are resolved to
            ``app_id`` values through ``df_games``. Falsy means "none given".
        price_range, year_release, game_score, dlc_count, genres, categories:
            Forwarded unchanged to ``model.predict``.
        excpected_playtime: Forwarded as ``average_playtime``.
        top_n: Number of recommendations requested from the model.
        weight_text, weight_collab, weight_content: Forwarded unchanged to
            ``model.predict``.

    Returns:
        A ``gr.DataFrame`` holding the matching rows of ``df_games``, listed
        in the order the model returned the app ids.
    """
    if app_name:
        if isinstance(app_name, str):
            app_name = [app_name]
        app_ids = df_games.loc[df_games['Name'].isin(app_name), 'app_id'].astype(str).tolist()
    else:
        app_ids = None

    prediction = model.predict(
        description=description,
        app_ids=app_ids,
        price_range=price_range,
        year_release=year_release,
        average_playtime=excpected_playtime,
        game_score=game_score,
        dlc_count=dlc_count,
        genres=genres,
        categories=categories,
        top_n=top_n,
        weight_text=weight_text,
        weight_collab=weight_collab,
        weight_content=weight_content,
    )

    # Position of every predicted id; the first occurrence wins on duplicates.
    ranked_ids = [str(app_id) for app_id in prediction['app_id'].tolist()]
    rank_of = {}
    for position, app_id in enumerate(ranked_ids):
        rank_of.setdefault(app_id, position)

    # A plain boolean mask would list the games in catalogue order, so the
    # selected rows are re-ordered to follow the prediction.
    catalogue_ids = df_games['app_id'].astype(str)
    is_recommended = catalogue_ids.isin(ranked_ids)
    ranks = catalogue_ids[is_recommended].map(rank_of)
    output = (
        df_games.loc[is_recommended]
        .iloc[ranks.to_numpy().argsort(kind='stable')]
        .reset_index()
    )
    return gr.DataFrame(value=output)


# Load external CSS file
with open('style.css', 'r', encoding='utf-8') as f:
    custom_css = f.read()


# for nav
def set_active_section(btn_id):
    """Show the section that belongs to ``btn_id`` and mark its button active.

    Reads the module-level ``sections`` and ``nav_buttons``.

    NOTE(review): ``sections`` has to be a mapping keyed by button id for a
    section to be shown; the assignment visible next to the nav buttons is a
    list of tuples -- presumably it is rebound later in the file, confirm.

    Returns:
        One visibility update per entry of ``sections`` followed by one
        ``elem_classes`` update per entry of ``nav_buttons``.
    """
    # Hide everything, then reveal the selected section if it is known.
    updates = [gr.update(visible=False) for _ in sections]
    if btn_id in sections:
        updates[list(sections).index(btn_id)] = gr.update(visible=True)

    # Only the clicked button carries the "active" class.
    button_states = [
        gr.update(elem_classes=f"nav-btn {'active' if btn.elem_id == btn_id else ''}")
        for btn in nav_buttons
    ]
    return updates + button_states


# MAIN DEMO
with gr.Blocks(css = custom_css) as demo:
    # container
    with gr.Row(elem_classes="container"):
        # navbar
        with gr.Sidebar(elem_classes="navbar"):
            # nav header
            with gr.Column(elem_classes="nav-header"):
                gr.Markdown("# Game Recommendation by Your Preference")
            # nav
button container with gr.Column(elem_classes="nav-buttons"): # nav button list nav_buttons = [] sections = [ ('Home', 'home'), ("Dataset", "dataset"), ("Exploratory Data Analysis", "eda"), ("Preprocessing Data", "preprocess"), ("Training Result", "training"), ("Our System", "system") ] # create button for label, section_id in sections: button = gr.Button(label, elem_classes="nav-btn", elem_id=f"btn-{section_id}") nav_buttons.append(button) # main content with gr.Column(elem_classes="main-content"): # Home Section """ Introduction section. Using header, h2, p for text formating """ with gr.Column(elem_id="home", elem_classes="content-section", visible=True) as home_section: header('About This System') with gr.Column(elem_classes='content'): h2("Background and Problem") p(''' One of the problem when we are looking for something that we want usually we use an abstract description of what we wanted. This issue is also prevalent when it comes to finding games. When we ask our friend for a game we usually describe them then later on narrow them down by Genres if possible and Price. However, most system only supports the ability to search games by their category and tags such as genres or prices. With that, we wanted to try and make a game recommendation based on description where user can describe the game they are looking for with text and later narrow it down with classification based on their content like genres and price ranges. ''') h2("The Model") p("""The system consists of three model :The first one is the Language Model that will learn users review for a game and use that as a way to describe a game. The Language Model will be a classifier based on a Gradient Boosting model called XGBClassifier. The second model and third model will be the filter model. The second model is a collaborative filter model where it will recommend the user a game based on a game that they have liked in the past or a game that they specify similar to the game they are looking for. 
This model will learn based on other user who have reviewed a game and a similar game is the game that said user liked other than the input game. This model will use utility matrix and cosine similarity. The third model is a content based model where it will recommend user a game based on their content such as Genres, Categories, Price range, Year Release, etc. This third model will be a KNeighborsClassifier.""") with gr.Column(elem_id="dataset", elem_classes="content-section", visible=False) as dataset_section: """ Dataset Display section. use Dataset() will displaying dataframe. key attribute is optional """ header('DATASET') with gr.Column(elem_classes='datasets-container'): Dataset( df=df_games_raw.head(20), title="1. Games Dataset", source=GAMES_DATAPATH, key="game_data" ) h2(f'Shape : {df_games_raw.shape}') Dataset( df=df_review_raw.head(20), title="2. Steam Review Dataset", source=REVIEWS_DATAPATH, key="reviews" ) h2(f'Shape : {df_review_raw.shape}') # eda section with gr.Column(elem_id="eda", elem_classes="content-section", visible=False) as eda_section: header('EDA System') h2('1. Game Dataset') code_cell('df.head(5)') gr.Dataframe(df_games_raw.head(5)) p(f'Dataset shape : {df_games_raw.shape}') h2('2. Description of data') code_cell('df.describe()') gr.Dataframe(df_games_raw.describe().reset_index()) h2('3. Missing values') gr.Dataframe(show_missing_values(df_games_raw)) h2('4. Distribution of data') dropdown = gr.Dropdown(choices=list(df_games_raw.columns), label="Select Column for Distribution",value=list(df_games_raw.columns)[0] if len(df_games_raw.columns) > 0 else None,allow_custom_value=True) plot_output = gr.Plot(format='png') dropdown.change(plot_distribution, inputs=[gr.State(df_games_raw), dropdown], outputs=plot_output) h2('1. Review Dataset') code_cell('df.head(5)') gr.Dataframe(df_review_raw.head(5)) p(f'Dataset shape : {df_review_raw.shape}') h2('2. 
Description of data') code_cell('df.describe()') gr.Dataframe(df_review_raw.describe().reset_index()) h2('3. Missing values') gr.Dataframe(show_missing_values(df_review_raw)) h2('4. Distribution of data') dropdown = gr.Dropdown(choices=list(df_review_raw.columns), label="Select Column for Distribution",value=list(df_review_raw.columns)[0] if len(df_review_raw.columns) > 0 else None,allow_custom_value=True) plot_output = gr.Plot(format='png') dropdown.change(plot_distribution, inputs=[gr.State(df_review_raw), dropdown], outputs=plot_output) # preprocess section with gr.Column(elem_id="preprocess", elem_classes="content-section", visible=False) as preprocess_section: header('Preprocess System') h2("1. Review Dataset initial merging") code_cell(""" import pandas as pd import glob import os from langdetect import detect from joblib import Parallel, delayed from tqdm import tqdm folder_path = 'Fragmented_Dataset' csv_files = glob.glob(os.path.join(folder_path, '*.csv')) df_list = [pd.read_csv(file) for file in csv_files] df = pd.concat(df_list, ignore_index=True) min_word = 20 print(f'shape before filtering : {df.shape}') df = df[df['review'].apply(lambda x: len(str(x).split()) >=min_word)].reset_index(drop=True) print(f'shape after filtering : {df.shape}') def detect_lang(text): try: return detect(str(text)) except: return 'error' results = Parallel(n_jobs=6)( delayed(detect_lang)(text) for text in tqdm(df['review'], desc='Detecting Language') ) df['lang'] = results # Filter English reviews only df_english = df[df['lang'] == 'en'].drop(columns=['lang']) df_english.to_csv('english_reviews.csv', index=False) print("Finished filtering English reviews!") """) h2("Output : ") code_cell(""" >> shape before filtering : (15437471, 13) >> shape after filtering : (6531410, 13) >> Finished filtering English reviews! """) h2("2. Data Preprocessing") h2("2.1. 
Games Data Cleaning") code_cell(""" game_datapath = 'converted.csv' df_games_raw = pd.read_csv('converted.csv',index_col=False) df_games_raw.rename(columns={"AppID": "app_id"}, inplace=True) df_games_raw["Genres"] = df_games_raw["Genres"].apply(lambda x: x.split(",") if isinstance(x, str) else ['NONE']) df_games_raw["Tags"] = df_games_raw["Tags"].apply(lambda x: x.split(",") if isinstance(x, str) else ['NONE']) df_games_raw['Genres'] = df_games_raw['Genres']+df_games_raw['Tags'] def make_set(row): data = [d for d in row if d != 'NONE'] return set(data) df_games_raw['Genres'] = df_games_raw['Genres'].apply(make_set) genres_to_keep = [ 'Action', 'Adventure', 'RPG', 'Strategy', 'Simulation', 'Casual', 'Indie', 'Sports', 'Racing', 'Fighting', 'Puzzle', 'Shooter', 'Platformer', 'MMO', 'Horror', 'Survival', 'Open World', 'Visual Novel', 'Point & Click', 'Sandbox', 'Metroidvania', 'Tactical', 'Rhythm', 'Stealth', 'Rogue-like', 'Rogue-lite' ] df_games_raw['Genres'] = df_games_raw['Genres'].apply(lambda genre_list: [g for g in genre_list if g in genres_to_keep]) df_games_raw = df_games_raw[['app_id','Name','Release date','DLC count','Positive','Negative','Average playtime forever','Price','Developers','Publishers','Detailed description','About the game','Short description','Categories','Genres','Achievements','Windows','Mac','Linux']] df_games_raw["Categories"] = df_games_raw["Categories"].apply(lambda x: x.split(",") if isinstance(x, str) else ['Unknown']) df_games_raw['Detailed description'] = df_games_raw['Detailed description'].fillna('') df_games_raw['About the game'] = df_games_raw['About the game'].fillna('') df_games_raw['Short description'] = df_games_raw['About the game'].fillna('') df_games_raw['Developers'] = df_games_raw['Developers'].fillna('') df_games_raw['Publishers'] = df_games_raw['Publishers'].fillna('') df_games_raw.to_csv('Cleaned_games.csv',index=False) """) # h2('Games Data Cleaned') # gr.Dataframe(df_games.head(20)) h2('2.2. 
Review Preprocessing') # Dataset(df_review_raw,'Review Data Raw',REVIEWS_DATAPATH) code_cell(""" from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from nltk.tag import pos_tag import string from joblib import Parallel, delayed import multiprocessing from tqdm import tqdm import re import nltk nltk.download('punkt') nltk.download('averaged_perceptron_tagger_eng') nltk.download('wordnet') datapath = 'english_reviews.csv' df = pd.read_csv(datapath) stopword = stopwords.words('english') lemmatizer = WordNetLemmatizer() def convert_postag(postag:str): if postag.startswith('V'): return 'v' elif postag.startswith('R'): return 'r' elif postag.startswith('J'): return 'a' return 'n' def clean_space(text : str): if not isinstance(text, str): return '' # Replace newlines with space, collapse multiple spaces, strip cleaned = re.sub(r'\s+', ' ', text.replace('\\n', ' ')).strip() return cleaned def tokenize(text : str): text = text.lower() # lower sentencees text = clean_space(text) token = word_tokenize(text) # tokenize # remove stopword punctuation and numeric token = [word for word in token if word not in string.punctuation and word not in stopword and word.isalpha()] return token def lemmatizing(token : str): postag = pos_tag(token) lemmatized = [lemmatizer.lemmatize(word,convert_postag(tag)) for word,tag in postag] return lemmatized def preprocess(text : str): token = tokenize(text) token = lemmatizing(token) return " ".join(token) num_cores = int(multiprocessing.cpu_count()*0.75) print("Cleaning Data . . 
.") df["cleaned_review"] = Parallel(n_jobs=num_cores)( delayed(preprocess)(text) for text in tqdm(df["review"], desc="Processing reviews") ) gc.collect() df = df[['steamid','app_id','voted_up','cleaned_review']] df.to_csv('Cleaned_Dataframe.csv',index=False) """) Dataset(df_review_trimmed,'Cleaned Review',source=TRIMMED_REVIEW_DATAPATH,key='trimmed_review') code_cell(""" min_word = 20 df = df[df['cleaned_review'].apply(lambda x: len(str(x).split()) >=min_word)].reset_index(drop=True) """) code_cell(f""" >>> shape before filtering : {df_review_trimmed.shape} >>> shape after filtering : {df_review_trimmed_filtered.shape} >>> number of unique app_ids : {len(set(df_review_trimmed_filtered['app_id']))} """) fig, ax = plt.subplots() df_review_trimmed_filtered['app_id'].value_counts().plot(kind='bar',ax=ax) ax.set_xlabel('app_id') ax.set_ylabel('Count') ax.set_title('Value Counts of app_id') gr.Plot(fig,format='png') class_counts = df_review_trimmed_filtered['app_id'].value_counts() gr.Dataframe(describe_value_counts(class_counts)) code_cell(""" min_row = 4500 max_row = 5000 def sample_group(g): if len(g) > max_row: return g.sample(n=max_row, random_state=SEED) else: return g # Filter categories with at least min_row rows filtered = df.groupby('app_id').filter(lambda x: len(x) >= min_row) # For each app_id, keep only max_row rows (if more, trim to max_row) df = filtered.groupby('app_id', group_keys=False).apply(sample_group).reset_index(drop=True)""") min_row = 4500 max_row = 5000 def sample_group(g): if len(g) > max_row: return g.sample(n=max_row, random_state=SEED) else: return g sampled=df_review_trimmed_filtered.groupby('app_id').filter(lambda x:len(x)>= min_row) sampled = sampled.groupby('app_id',group_keys=False).apply(sample_group).reset_index(drop=True) code_cell(f""" Num of class after sampling : {len(set(sampled['app_id']))} Shape of the sampled df : {sampled.shape} """) fig,ax = plt.subplots() sampled_class_dist = sampled['app_id'].value_counts() 
sampled_class_dist.plot(kind='bar',ax=ax) ax.set_xlabel('app_id') ax.set_ylabel('Count') ax.set_title('Value Counts of app_id') code_cell(""" df['app_id'].value_counts().plot(kind='bar') plt.xlabel('app_id') plt.ylabel('Count') plt.title('Value Counts of app_id') plt.show() df.to_csv('Cleaned_Trimmed_Dataset.csv',index=False)""") gr.Plot(fig,format='png') code_cell("""class_counts = df['app_id'].value_counts()""") gr.DataFrame(describe_value_counts(sampled_class_dist)) h2('Review Preprocessed!') h2('2.3. User Preference Data') Dataset(df_review_raw,'User Review Dataset',REVIEWS_DATAPATH) code_cell(""" df_review = df_review[['steamid','appid','voted_up']] df_review.to_csv('UserPreferenceDF.csv',index=False) """) Dataset(df_user_pref,'User Preference Dataset',USER_PREFERENCE_DATAPATH) p(f"Dataset Shape : {df_user_pref.shape}") df_liked=df_user_pref[df_user_pref['voted_up']==1] df_liked.rename(columns={'appid':'app_id'},inplace=True) df_liked['voted_up'] = df_liked['voted_up'].astype(int) df_liked['steamid'] = df_liked['steamid'].astype(str) df_liked['app_id'] = df_liked['app_id'].astype(str) df_liked = df_liked.drop_duplicates(subset=['steamid', 'app_id']) code_cell(""" df_liked=df_users[df_users['voted_up']==1] df_liked.rename(columns={'appid':'app_id'},inplace=True) df_liked['voted_up'] = df_liked['voted_up'].astype(int) df_liked['steamid'] = df_liked['steamid'].astype(str) df_liked['app_id'] = df_liked['app_id'].astype(str) df_liked = df_liked.drop_duplicates(subset=['steamid', 'app_id']) """) h2(f"Dataset Shape : {df_liked.shape}") code_cell(""" # Keep users who liked at least 5 games user_counts = df_liked['steamid'].value_counts() df_liked = df_liked[df_liked['steamid'].isin(user_counts[user_counts >= 5].index)] # Keep games liked by at least 10 users game_counts = df_liked['app_id'].value_counts() df_liked = df_liked[df_liked['app_id'].isin(game_counts[game_counts >= 10].index)] df_liked = df_liked.drop_duplicates(subset=['steamid', 'app_id']) """) p(f"Unique 
steamids: {df_liked['steamid'].nunique()}") p(f"Unique app_ids: {df_liked['app_id'].nunique()}") p(f"Total rows: {len(df_liked)}") h2("We're done here, next stop is Training!") # training section with gr.Column(elem_id="training", elem_classes="content-section", visible=False) as training_section: header('Training Result') h2("Language Model Training") h2('Dataset') gr.Dataframe(sampled.head(15)) code_cell(""" vectorizer = TfidfVectorizer(max_df=0.7,min_df=3,stop_words=None,ngram_range=(1,2)) review_app_id_encoder = LabelEncoder()""") train_df,df_temp = train_test_split(sampled,test_size=0.2,random_state=SEED,stratify=sampled['app_id']) test_df,val_df = train_test_split(df_temp,test_size=0.5,random_state=SEED,stratify=df_temp['app_id']) del df_temp gc.collect() code_cell(""" train_df,df_temp = train_test_split(sampled,test_size=0.2,random_state=SEED,stratify=sampled['app_id']) test_df,val_df = train_test_split(df_temp,test_size=0.5,random_state=SEED,stratify=df_temp['app_id']) """) p(f""" Training : {train_df.shape} Testing : {test_df.shape} Validation : {val_df.shape} """) del train_df,val_df gc.collect() code_cell(""" X_train = vectorizer.fit_transform(train_df['cleaned_review']) y_train = review_app_id_encoder.fit_transform(train_df['app_id']) X_test = vectorizer.transform(test_df['cleaned_review']) y_test = review_app_id_encoder.transform(test_df['app_id']) X_val = vectorizer.transform(val_df['cleaned_review']) y_val = review_app_id_encoder.transform(val_df['app_id'])""") p("""The shape of X_train : (656396, 1795889)}""") code_cell(""" classifier = XGBClassifier( objective='multi:softprob', max_depth=4, learning_rate=0.2, n_estimators=18, subsample=0.7, colsample_bytree=0.7, reg_alpha=1.0, reg_lambda=1.0, tree_method='hist', eval_metric=['mlogloss', 'merror'], early_stopping_rounds=10 )""") code_cell(""" classifier.fit( X_train,y_train, eval_set=[(X_train, y_train), (X_val, y_val)], verbose=True ) """) history = model.text_based_recommender.history n_estimator 
= np.arange(len(history['validation_0']['merror'])) h2('Training vs Validation log loss') results = { "merror": history['validation_0']['merror'], "mlogloss": history['validation_0']['mlogloss'] } plot_output = gr.Plot(format='png') btn = gr.Button("Generate Plot") btn.click(fn=lambda:plot_training_results(n_estimator,history['validation_0']['mlogloss'],history['validation_1']['mlogloss'],'Training Log Loss','Validation Log Loss','Log Loss','N Estimator'), inputs=[], outputs=plot_output, preprocess=False) h2('Training vs Validation error') plot_outputval = gr.Plot(format='png') btnval = gr.Button("Generate Plot") btnval.click(fn=lambda:plot_training_results(n_estimator,history['validation_0']['merror'],history['validation_1']['merror'],'Training error','Validation error','merror','N Estimator'), inputs=[], outputs=plot_outputval, preprocess=False) y_pred = model.text_based_recommender.classifier.predict(vectorizer.transform(test_df['cleaned_review'])) y_test = model.text_based_recommender.app_id_encoder.transform(test_df['app_id']) class_report = classification_report(y_test,y_pred) h2("Classification Report") code_cell(f""" {class_report} """) h2("Language Model Class") code_cell(""" import nltk from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from nltk.tag import pos_tag import string import re import os nltk.download('punkt') nltk.download('averaged_perceptron_tagger_eng') nltk.download('wordnet') class TextBasedRecommendation(): def __init__(self,classifier,vectorizer,app_id_encoder,history): self.classifier : XGBClassifier = classifier self.vectorizer : TfidfVectorizer = vectorizer self.app_id_encoder : LabelEncoder = app_id_encoder self.history = history def updateModel(self): self.classifier.save_model('xgb_model.json') self.classifier.load_model('xgb_model.json') def save(self, path_prefix: str): self.classifier.save_model(f"{path_prefix}_xgb.json") classifier_backup = self.classifier 
self.classifier = None joblib.dump(self, f"{path_prefix}_preprocessor.joblib") self.classifier = classifier_backup @staticmethod def load(path_prefix: str): obj = joblib.load(f"{path_prefix}_preprocessor.joblib") xgb = XGBClassifier() xgb.load_model(f"{path_prefix}_xgb.json") obj.classifier = xgb return obj def preprocess(self,text : str): stopword = stopwords.words('english') lemmatizer = WordNetLemmatizer() def convert_postag(postag:str): if postag.startswith('V'): return 'v' elif postag.startswith('R'): return 'r' elif postag.startswith('J'): return 'a' return 'n' def clean_space(text : str): if not isinstance(text, str): return '' cleaned = re.sub(r'\s+', ' ', text.replace('\\n', ' ')).strip() return cleaned def tokenize(text : str): text = text.lower() text = clean_space(text) token = word_tokenize(text) token = [word for word in token if word not in string.punctuation and word not in stopword and word.isalpha()] return token # lemmatize def lemmatizing(token : str): postag = pos_tag(token) lemmatized = [lemmatizer.lemmatize(word,convert_postag(tag)) for word,tag in postag] return lemmatized token = tokenize(text) token = lemmatizing(token) return " ".join(token) def get_accuracy(self,X_test,y_test): y_pred = self.classifier.predict(self.vectorizer.transform(X_test)) y_test = self.app_id_encoder.transform(y_test) print(classification_report(y_test,y_pred)) def predict(self,text,top_n=None): cleaned_text = self.preprocess(text) vectorized_text = self.vectorizer.transform([cleaned_text]) proba = self.classifier.predict_proba(vectorized_text)[0] class_indices = np.argsort(proba)[::-1] if top_n is not None: class_indices = class_indices[:top_n] class_labels = self.app_id_encoder.inverse_transform(class_indices) class_probs = proba[class_indices] return pd.DataFrame({ 'app_id': class_labels, 'text_probability': class_probs })""") h2("Collaborative Filter Training") h2("Dataset of User Preference") gr.DataFrame(df_liked.head(10)) p(f"Unique steamids: 
{df_liked['steamid'].nunique()}") p(f"Unique app_ids: {df_liked['app_id'].nunique()}") p(f"Total rows: {len(df_liked)}") p(f"Unique (steamid, app_id) pairs: {df_liked.drop_duplicates(subset=['steamid', 'app_id']).shape[0]}") top_n=3001 # Top n users with most reviews top_users = df_liked['steamid'].value_counts().head(top_n).index # Top n games with most reviews top_games = df_liked['app_id'].value_counts().head(top_n).index df_liked = df_liked[df_liked['steamid'].isin(top_users) & df_liked['app_id'].isin(top_games)] user_item_matrix = df_liked.pivot_table( index='steamid', columns='app_id', values='voted_up', aggfunc='max', fill_value=0 ) user_item_matrix = user_item_matrix.reset_index().head(10) code_cell(""" top_n=3001 # Top n users with most reviews top_users = df_liked['steamid'].value_counts().head(top_n).index # Top n games with most reviews top_games = df_liked['app_id'].value_counts().head(top_n).index df_liked = df_liked[df_liked['steamid'].isin(top_users) & df_liked['app_id'].isin(top_games)] user_item_matrix = df_liked.pivot_table( index='steamid', columns='app_id', values='voted_up', aggfunc='max', fill_value=0 ) """) gr.Dataframe(user_item_matrix) del user_item_matrix gc.collect() code_cell(""" from sklearn.decomposition import TruncatedSVD X = user_item_matrix.T n_components = 100 svd = TruncatedSVD(n_components=n_components, random_state=42) item_embeddings = svd.fit_transform(X) item_list = list(user_item_matrix.columns) unique_items =df_liked['app_id'].unique() item_to_index = {item: idx for idx, item in enumerate(unique_items)} """) h2("Model") code_cell(""" import numpy as np import joblib class CollaborativeRecommender: def __init__(self, svd_matrix, item_to_index, index_to_item): \""" svd_matrix: 2D numpy array (items x latent features) item_to_index: dict mapping app_id to row index in svd_matrix index_to_item: dict mapping row index to app_id \""" self.svd_matrix : TruncatedSVD = svd_matrix self.item_to_index = item_to_index 
self.index_to_item = index_to_item def save(self, path: str): \"""Save the entire model as a single file using joblib.\""" joblib.dump(self, path) @staticmethod def load(path: str): \"""Load the entire model from a joblib file.\""" return joblib.load(path) def _get_item_vector(self, app_id): idx = self.item_to_index.get(app_id) if idx is None: raise ValueError(f"app_id {app_id} not found in the model.") return self.svd_matrix[idx] def _cosine_similarity(self, vec, matrix): # Cosine similarity between vec and all rows in matrix vec_norm = np.linalg.norm(vec) matrix_norms = np.linalg.norm(matrix, axis=1) similarity = (matrix @ vec) / (matrix_norms * vec_norm + 1e-10) return similarity def get_similarities(self, app_ids,top_n=None): \""" Input: app_ids - single app_id or list of app_ids Output: DataFrame with columns ['app_id', 'similarity'] sorted by similarity descending \""" if isinstance(app_ids, (str, int)): app_ids = [app_ids] elif not isinstance(app_ids, (list, tuple, np.ndarray)): raise TypeError("app_ids must be a string/int or a list of such") valid_vectors = [] missing_ids = [] for app_id in app_ids: try: vec = self._get_item_vector(app_id) valid_vectors.append(vec) except ValueError: missing_ids.append(app_id) if len(valid_vectors) == 0: raise ValueError("None of the input app_ids were found in the model.") # Aggregate vectors by averaging if multiple inputs aggregated_vec = np.mean(valid_vectors, axis=0) # Compute similarity with all items similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix) # Build DataFrame of results result_df = pd.DataFrame({ 'app_id': [self.index_to_item[i] for i in range(len(similarities))], 'collaborative_similarity': similarities }) # Exclude the input app_ids themselves from results result_df = result_df[~result_df['app_id'].isin(app_ids)] # Sort descending by similarity result_df = result_df.sort_values('collaborative_similarity', ascending=False).reset_index(drop=True) # If any input app_ids were missing, 
notify user (optional) if missing_ids: print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}") if top_n: return result_df.head(top_n) else: return result_df""") h2("Content Based Model") code_cell(""" def col_to_list(df,col='Genres'): import ast df[col]=df[col].apply( lambda x: ast.literal_eval(x) if isinstance(x, str) else x ) df[col]=df[col].apply( lambda genres: [g.strip() for g in genres] if isinstance(genres, list) else ['Unknown'] ) return df def apply_price_range_labels(df,labels,bins, price_col='Price', range_col='Price_range'): df[range_col] = pd.cut(df[price_col], bins=bins, labels=labels, right=True) return df price_bins = [-0.01, 0, 5, 10, 20, 30, 40, 50, float('inf')] price_labels = [ "Free", "Less than $5", "$5 - $9.99", "$10 - $19.99", "$20 - $29.99", "$30 - $39.99", "$40 - $49.99", "$50+" ] df = pd.read_csv("Cleaned_games.csv",index_col=False) df = col_to_list(df,'Genres') df = col_to_list(df,'Categories') df = apply_price_range_labels(df,price_labels,price_bins) """) # Dataset(df_games,"The game dataset",GAMES_DATAPATH) code_cell(""" def extract_year(date_str): if isinstance(date_str, str) and len(date_str) >= 4: year_str = date_str[-4:] if year_str.isdigit(): return int(year_str) return None df['Year_Release'] = df['Release date'].apply(extract_year) df['Game score'] = np.where( (df['Positive'] + df['Negative']) == 0, 0, (df['Positive'] / (df['Positive'] + df['Negative'])) * 100 )""") code_cell(""" from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler genre_mlb = MultiLabelBinarizer() genre_mlb = genre_mlb.fit(df['Genres']) categories_mlb = MultiLabelBinarizer() categories_mlb = categories_mlb.fit(df['Categories']) price_range_le = LabelEncoder() price_range_le = price_range_le.fit(price_labels) scaler = MinMaxScaler() scaler = scaler.fit(df[['Year_Release','Average playtime forever','Game score','DLC count']].values) app_id_le = LabelEncoder() app_id_le = app_id_le.fit(df['app_id']) 
numerical_col =['Year_Release','Average playtime forever','Game score','DLC count']""") code_cell(""" genre_matrix = genre_mlb.transform(df['Genres']) genre_df = pd.DataFrame(genre_matrix, columns=genre_mlb.classes_, index=df.index) categories_matrix = categories_mlb.transform(df['Categories']) categories_df = pd.DataFrame(categories_matrix,columns=categories_mlb.classes_,index=df.index) game_df = pd.concat([df[['app_id','Price_range']+numerical_col],genre_df,categories_df],axis=1)""") gr.Dataframe(df_games_temp.head(10)) del df_games_temp gc.collect() code_cell(""" from sklearn.neighbors import KNeighborsClassifier X = game_df.loc[:,['Year_Release','Average playtime forever','Game score','DLC count','Price_range']+ list(genre_mlb.classes_) + list(categories_mlb.classes_)] y = app_id_le.transform(game_df['app_id']) model = KNeighborsClassifier(n_neighbors=len(y), metric='cosine') model.fit(X.values,y) """) h2("Content Based Recommender Class") code_cell(""" class GameContentRecommender: def __init__(self,model,genre_encoder,category_encoder,price_range_encoder,scaler,app_id_encoder): self.model : KNeighborsClassifier = model self.genre_encoder : MultiLabelBinarizer = genre_encoder self.category_encoder : MultiLabelBinarizer = category_encoder self.price_range_encoder : LabelEncoder = price_range_encoder self.scaler : MinMaxScaler = scaler self.app_id_encoder : LabelEncoder = app_id_encoder def save(self, path: str): \"""Save the entire model as a single file using joblib.\""" joblib.dump(self, path) @staticmethod def load(path: str): \"""Load the entire model from a joblib file.\""" return joblib.load(path) def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None): # Create one-hot encoded genre and category dicts genre_dict = {g: 0 for g in self.genre_encoder.classes_} categories_dict = {c: 0 for c in self.category_encoder.classes_} for genre in genres: if genre != 'Unknown' and genre in genre_dict: 
genre_dict[genre] = 1 for category in categories: if category != 'Unknown' and category in categories_dict: categories_dict[category] = 1 # Encode and normalize numeric features price_range = self.price_range_encoder.transform(np.array(price_range).reshape(-1, 1)) scaled_features = self.scaler.transform(np.array([[year_release, average_playtime, game_score, dlc_count]]))[0] user_vector = list(scaled_features) + list(price_range) + list(genre_dict.values()) + list(categories_dict.values()) # Prepare DataFrame for KNN user_df = pd.DataFrame([user_vector]) # Get KNN results distances, indices = self.model.kneighbors(user_df) distances = distances.flatten() indices = indices.flatten() # Convert distances to similarity scores similarity = 1 / (1 + distances) # Decode app_ids app_ids = self.app_id_encoder.inverse_transform(indices) prediction = pd.DataFrame({ 'app_id': app_ids, 'content_probability': similarity }) if top_n: prediction = prediction.head(top_n) return prediction """) h2("After finishing with individual model we finally ensemble them together") code_cell(""" import numpy as np import pandas as pd from sklearn.neighbors import KNeighborsClassifier from sklearn.preprocessing import MultiLabelBinarizer,LabelEncoder,MinMaxScaler from sklearn.feature_extraction.text import TfidfVectorizer import joblib from sklearn.decomposition import TruncatedSVD from sklearn.metrics import classification_report from xgboost import XGBClassifier import nltk from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from nltk.tag import pos_tag import string import re import os nltk.download('punkt') nltk.download('averaged_perceptron_tagger_eng') nltk.download('wordnet') class CollaborativeRecommender: def __init__(self, svd_matrix, item_to_index, index_to_item): \""" svd_matrix: 2D numpy array (items x latent features) item_to_index: dict mapping app_id to row index in svd_matrix index_to_item: dict mapping row index to 
app_id \""" self.svd_matrix : TruncatedSVD = svd_matrix self.item_to_index = item_to_index self.index_to_item = index_to_item def save(self, path: str): \"""Save the entire model as a single file using joblib.\""" joblib.dump(self, path) @staticmethod def load(path: str): \"""Load the entire model from a joblib file.\""" return joblib.load(path) def _get_item_vector(self, app_id): idx = self.item_to_index.get(app_id) if idx is None: raise ValueError(f"app_id {app_id} not found in the model.") return self.svd_matrix[idx] def _cosine_similarity(self, vec, matrix): # Cosine similarity between vec and all rows in matrix vec_norm = np.linalg.norm(vec) matrix_norms = np.linalg.norm(matrix, axis=1) similarity = (matrix @ vec) / (matrix_norms * vec_norm + 1e-10) return similarity def get_similarities(self, app_ids,top_n=None): \""" Input: app_ids - single app_id or list of app_ids Output: DataFrame with columns ['app_id', 'similarity'] sorted by similarity descending \""" if isinstance(app_ids, (str, int)): app_ids = [app_ids] elif not isinstance(app_ids, (list, tuple, np.ndarray)): raise TypeError("app_ids must be a string/int or a list of such") valid_vectors = [] missing_ids = [] for app_id in app_ids: try: vec = self._get_item_vector(app_id) valid_vectors.append(vec) except ValueError: missing_ids.append(app_id) if len(valid_vectors) == 0: raise ValueError("None of the input app_ids were found in the model.") # Aggregate vectors by averaging if multiple inputs aggregated_vec = np.mean(valid_vectors, axis=0) # Compute similarity with all items similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix) # Build DataFrame of results result_df = pd.DataFrame({ 'app_id': [self.index_to_item[i] for i in range(len(similarities))], 'collaborative_similarity': similarities }) # Exclude the input app_ids themselves from results result_df = result_df[~result_df['app_id'].isin(app_ids)] # Sort descending by similarity result_df = 
result_df.sort_values('collaborative_similarity', ascending=False).reset_index(drop=True) # If any input app_ids were missing, notify user (optional) if missing_ids: print(f"Warning: These app_ids were not found in the model and ignored: {missing_ids}") if top_n: return result_df.head(top_n) else: return result_df class GameContentRecommender: def __init__(self,model,genre_encoder,category_encoder,price_range_encoder,scaler,app_id_encoder): self.model : KNeighborsClassifier = model self.genre_encoder : MultiLabelBinarizer = genre_encoder self.category_encoder : MultiLabelBinarizer = category_encoder self.price_range_encoder : LabelEncoder = price_range_encoder self.scaler : MinMaxScaler = scaler self.app_id_encoder : LabelEncoder = app_id_encoder def save(self, path: str): \"""Save the entire model as a single file using joblib.\""" joblib.dump(self, path) @staticmethod def load(path: str): \"""Load the entire model from a joblib file.\""" return joblib.load(path) def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None): genre_dict = {g: 0 for g in self.genre_encoder.classes_} categories_dict = {c: 0 for c in self.category_encoder.classes_} for genre in genres: if genre != 'Unknown' and genre in genre_dict: genre_dict[genre] = 1 for category in categories: if category != 'Unknown' and category in categories_dict: categories_dict[category] = 1 price_range = self.price_range_encoder.transform(np.array(price_range).reshape(-1, 1)) scaled_features = self.scaler.transform(np.array([[year_release, average_playtime, game_score, dlc_count]]))[0] user_vector = list(scaled_features) + list(price_range) + list(genre_dict.values()) + list(categories_dict.values()) user_df = pd.DataFrame([user_vector]) distances, indices = self.model.kneighbors(user_df) distances = distances.flatten() indices = indices.flatten() similarity = 1 / (1 + distances) app_ids = self.app_id_encoder.inverse_transform(indices) prediction = 
pd.DataFrame({ 'app_id': app_ids, 'content_probability': similarity }) if top_n: prediction = prediction.head(top_n) return prediction class TextBasedRecommendation(): def __init__(self,classifier,vectorizer,app_id_encoder,history): self.classifier : XGBClassifier = classifier self.vectorizer : TfidfVectorizer = vectorizer self.app_id_encoder : LabelEncoder = app_id_encoder self.history = history def save(self, path_prefix: str): self.classifier.save_model(f"{path_prefix}_xgb.json") classifier_backup = self.classifier self.classifier = None joblib.dump(self, f"{path_prefix}_preprocessor.joblib") self.classifier = classifier_backup @staticmethod def load(path_prefix: str): obj = joblib.load(f"{path_prefix}_preprocessor.joblib") xgb = XGBClassifier() xgb.load_model(f"{path_prefix}_xgb.json") obj.classifier = xgb return obj def preprocess(self,text : str): stopword = stopwords.words('english') lemmatizer = WordNetLemmatizer() def convert_postag(postag:str): if postag.startswith('V'): return 'v' elif postag.startswith('R'): return 'r' elif postag.startswith('J'): return 'a' return 'n' def clean_space(text : str): if not isinstance(text, str): return '' cleaned = re.sub(r'\s+', ' ', text.replace('\\n', ' ')).strip() return cleaned def tokenize(text : str): text = text.lower() text = clean_space(text) token = word_tokenize(text) token = [word for word in token if word not in string.punctuation and word not in stopword and word.isalpha()] return token # lemmatize def lemmatizing(token : str): postag = pos_tag(token) lemmatized = [lemmatizer.lemmatize(word,convert_postag(tag)) for word,tag in postag] return lemmatized token = tokenize(text) token = lemmatizing(token) return " ".join(token) def get_accuracy(self,X_test,y_test): y_pred = self.classifier.predict(self.vectorizer.transform(X_test)) y_test = self.app_id_encoder.transform(y_test) print(classification_report(y_test,y_pred)) def predict(self,text,top_n=None): cleaned_text = self.preprocess(text) vectorized_text = 
self.vectorizer.transform([cleaned_text]) proba = self.classifier.predict_proba(vectorized_text)[0] class_indices = np.argsort(proba)[::-1] if top_n is not None: class_indices = class_indices[:top_n] class_labels = self.app_id_encoder.inverse_transform(class_indices) class_probs = proba[class_indices] return pd.DataFrame({ 'app_id': class_labels, 'text_probability': class_probs }) class GameRecommendationEnsemble: def __init__(self,game_content_recommeder,collaborative_recommender,text_based_recommender): self.game_content_recommeder : GameContentRecommender=game_content_recommeder self.collaborative_recommender : CollaborativeRecommender=collaborative_recommender self.text_based_recommender : TextBasedRecommendation = text_based_recommender def save(self, dir_path: str): os.makedirs(dir_path, exist_ok=True) self.game_content_recommeder.save(os.path.join(dir_path, "game_content_recommender.joblib")) self.collaborative_recommender.save(os.path.join(dir_path, "collaborative_recommender.joblib")) self.text_based_recommender.save(os.path.join(dir_path, "text_based_recommender")) @staticmethod def load(dir_path: str): game_content_recommender = GameContentRecommender.load(os.path.join(dir_path, "game_content_recommender.joblib")) collaborative_recommender = CollaborativeRecommender.load(os.path.join(dir_path, "collaborative_recommender.joblib")) text_based_recommender = TextBasedRecommendation.load(os.path.join(dir_path, "text_based_recommender")) return GameRecommendationEnsemble( game_content_recommender, collaborative_recommender, text_based_recommender ) def scale_proba(self,series): if len(series)<=1: return pd.Series([1.0] * len(series), index=series.index) scaler = MinMaxScaler() scaled = scaler.fit_transform(series.values.reshape(-1, 1)).flatten() return pd.Series(scaled, index=series.index) def predict(self, description=None, app_ids=None, price_range=None, year_release=None, average_playtime=None, game_score=None, dlc_count=None, genres=None, categories=None, 
top_n=None, weight_text=1.0, weight_collab=1.0, weight_content=1.0): merge_dfs = [] if description is not None: text_proba = self.text_based_recommender.predict(description) text_proba['app_id'] = text_proba['app_id'].astype(str) text_proba['text_probability'] = self.scale_proba(text_proba['text_probability']) merge_dfs.append(text_proba) else: weight_text=0 # Collaborative similarity (only if app_ids is provided) if app_ids is not None: similar_app = self.collaborative_recommender.get_similarities(app_ids) similar_app['app_id'] = similar_app['app_id'].astype(str) similar_app['collaborative_similarity'] = self.scale_proba(similar_app['collaborative_similarity']) merge_dfs.append(similar_app) else: weight_collab = 0 # No weight if not used if None in (price_range, year_release,average_playtime,game_score,dlc_count, genres, categories): weight_content=0 else: similar_content = self.game_content_recommeder.predict(price_range, year_release,average_playtime,game_score,dlc_count, genres, categories) similar_content['app_id'] = similar_content['app_id'].astype(str) similar_content['content_probability'] = self.scale_proba(similar_content['content_probability']) merge_dfs.append(similar_content) if not merge_dfs: return None from functools import reduce merged = reduce(lambda left, right: pd.merge(left, right, on='app_id', how='outer'), merge_dfs) # Fill missing values merged = merged.fillna(0) # Final score calculation def compute_aggregated_score(df, w_text, w_collab, w_content): # Normalize weights (prevent divide-by-zero if one or more weights are 0) total_weight = w_text + w_collab + w_content if total_weight == 0: raise ValueError("All weights are zero. 
At least one weight must be positive.") w_text /= total_weight w_collab /= total_weight w_content /= total_weight df['final_score'] = ( df.get('text_probability', 0) * w_text + df.get('collaborative_similarity', 0) * w_collab + df.get('content_probability', 0) * w_content ) return df.sort_values(by='final_score', ascending=False).reset_index(drop=True) final_df = compute_aggregated_score(merged, weight_text, weight_collab, weight_content) if top_n: return final_df.head(top_n) else: return final_df """) # Recommendation system with gr.Column(elem_id="system", elem_classes='content-section', visible=False) as system_section: # special for this section gr.HTML('