Spaces:
Sleeping
Sleeping
| from gensim.models import KeyedVectors | |
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import nltk | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from nltk.corpus import stopwords | |
| import ollama | |
| from course_project import data_persistence | |
# Folder holding the raw GloVe embedding file and the cached gensim model.
DATA_FOLDER_LOCATION = "data/"
# Raw GloVe vectors in plain-text format (loaded with no_header=True below).
WORD2VEC_FILE_LOCATION = DATA_FOLDER_LOCATION + "glove.6B.100d.txt"
# Saved KeyedVectors model, reused on later runs to skip re-parsing the text file.
WORD2VEC_MODEL_FILE_LOCATION = DATA_FOLDER_LOCATION + "glove_model"
# Pickled movie database and the destination for its transformed variant.
MOVIES_BK_FILE_LOCATION = "course_project/database/provisional_database_simulation.pkl"
MOVIES_BK_FILE_TRANSFORMED_LOCATION = "course_project/database/provisional_database_transformed.pkl"
# Similarity-mode flags accepted by get_winning_choice_per_person / find_recommendations.
COSINE_SIMILARITY = 0
EUCLIDEAN_SIMILARITY = 1
# The Ollama embedding model must be pulled beforehand: ollama pull gemma:7b
def initialize_word2vec_model():
    """Load the GloVe KeyedVectors model, building it from the raw text file on first use.

    On subsequent runs the saved gensim model at WORD2VEC_MODEL_FILE_LOCATION
    is loaded directly, which is much faster than re-parsing the GloVe text.

    Returns:
        A gensim KeyedVectors instance, or None if the model could not be
        generated from the raw file.
    """
    model = None
    if os.path.exists(WORD2VEC_MODEL_FILE_LOCATION):
        model = KeyedVectors.load(WORD2VEC_MODEL_FILE_LOCATION)
        # Print *after* the load so the message is only shown on success.
        print("Word2Vec Model successfully loaded.")
    else:
        try:
            print("Generating Word2Vec Model")
            # GloVe text files carry no word2vec header line, hence no_header=True.
            model = KeyedVectors.load_word2vec_format(
                WORD2VEC_FILE_LOCATION, binary=False, no_header=True
            )
            model.save(WORD2VEC_MODEL_FILE_LOCATION)
            print("Word2Vec Model successfully saved.")
        except Exception:
            # Narrowed from a bare `except:`, which also swallowed
            # KeyboardInterrupt and SystemExit.
            print('Could not generate Word2Vec Model')
    return model
# Lazily-initialized cache of the NLTK English stopword set.
_STOP_WORDS = None


def _english_stopwords():
    """Download (once) and cache the NLTK English stopword set.

    The original implementation called nltk.download('stopwords') on every
    sentence, adding network/IO overhead per call; the cache makes repeated
    encodings cheap.
    """
    global _STOP_WORDS
    if _STOP_WORDS is None:
        nltk.download('stopwords', quiet=True)
        _STOP_WORDS = set(stopwords.words('english'))
    return _STOP_WORDS


def sentence_mean_vector(sentence, model):
    """Encode a sentence as the mean of its in-vocabulary word vectors.

    Args:
        sentence: raw text; lowercased and whitespace-split before lookup.
        model: gensim KeyedVectors (indexable by word, exposing vector_size).

    Returns:
        1-D numpy array of length model.vector_size. A zero vector is
        returned when no non-stopword of the sentence is in the vocabulary.
    """
    stop_words = _english_stopwords()
    vectors = []
    for word in sentence.lower().split():
        if word in stop_words:
            continue  # filter stopwords
        try:
            vectors.append(model[word])
        except KeyError:
            continue  # out-of-vocabulary words are simply skipped
    if not vectors:
        # No usable word: fall back to the zero vector.
        return np.zeros(model.vector_size)
    # Mean of all collected word vectors.
    return np.mean(vectors, axis=0)
def encode_text_series_mean_method(text_series, file_backup_location=None):
    """Encode every text in a pandas Series as its mean word2vec vector.

    Args:
        text_series: pandas Series of strings.
        file_backup_location: optional pickle path; when given, the resulting
            Series is persisted there via data_persistence.

    Returns:
        pandas Series of numpy arrays (one mean vector per input text).
    """
    model = initialize_word2vec_model()
    result = text_series.apply(lambda text: sentence_mean_vector(text, model))
    # `is not None` rather than `!= None` (PEP 8 identity comparison).
    if file_backup_location is not None:
        data_persistence.save_pickle(result, file_backup_location)
    return result
def ollama_text_embedding(sentence):
    """Embed a sentence with the local Ollama gemma:7b model.

    Args:
        sentence: the text to embed.

    Returns:
        The embedding as a 1-D numpy array.
    """
    response = ollama.embeddings(model="gemma:7b", prompt=sentence)
    return np.array(response['embedding'])
def encode_text_series_ollama_method(original_series, file_backup_location=None, batch_size=None):
    """Embed every text in a Series with Ollama, with resumable batched backups.

    If a backup file exists at file_backup_location, the run resumes after the
    last valid index already embedded. Progress is printed per item and the
    partial result is checkpointed every batch_size items.

    Args:
        original_series: pandas Series (or positional-indexable sequence) of texts.
        file_backup_location: optional pickle path used both to resume from and
            to checkpoint/persist results.
        batch_size: items between checkpoints; defaults to 500.

    Returns:
        pandas Series mapping position -> embedding (numpy array).
    """
    if batch_size is None:
        batch_size = 500
    if file_backup_location is not None and os.path.exists(file_backup_location):
        # Resume from a previous partial run.
        embeddings = data_persistence.read_pickle(file_backup_location)
        count = embeddings.last_valid_index() + 1
    else:
        # Explicit dtype avoids the pandas empty-Series dtype deprecation warning.
        embeddings = pd.Series(dtype=object)
        count = 0
    while count < len(original_series):
        embeddings[count] = ollama_text_embedding(original_series[count])
        print(f"Embedding process: {count+1}/{len(original_series)}. {(count+1)*100/len(original_series):.2f}% completed")
        # Bug fix: the original attempted a batch save even when no backup
        # location was given, which would crash in save_pickle(…, None).
        if file_backup_location is not None and count % batch_size == 0 and count != 0:
            data_persistence.save_pickle(embeddings, file_backup_location)
            print(f"Batch completed. {count+1}/{len(original_series)} embedding records saved.")
        count += 1
    if file_backup_location is not None:
        data_persistence.save_pickle(embeddings, file_backup_location)
    return embeddings
def find_most_similar_coincidences_indexes(n_results, objective_vector, movies_embeddings):
    """Return the indexes of the n_results embeddings most similar to
    objective_vector by cosine similarity, best match first.

    Args:
        n_results: how many indexes to return.
        objective_vector: 1-D numpy array (query vector).
        movies_embeddings: collection of candidate vectors (supports .tolist()).

    Returns:
        List of integer positions into movies_embeddings, descending similarity.
    """
    similarities = cosine_similarity(
        [objective_vector.tolist()], movies_embeddings.tolist()
    )[0].tolist()
    # argsort is ascending: take the last n_results and reverse for best-first.
    ranking = np.argsort(similarities)
    top_indexes = ranking[-n_results:][::-1]
    return top_indexes.tolist()
def get_winning_choice_per_person(movie_tested, suggestions, similarity=COSINE_SIMILARITY):
    """Pick, among one person's suggestion vectors, the one closest to movie_tested.

    Args:
        movie_tested: 1-D numpy array, the candidate movie embedding.
        suggestions: collection of suggestion embeddings (supports .tolist()).
        similarity: COSINE_SIMILARITY or EUCLIDEAN_SIMILARITY.

    Returns:
        (winning_suggestion_index, winning_score) — positional index of the
        best suggestion and its similarity score.

    Raises:
        ValueError: for an unknown similarity mode.
    """
    # Compute a per-suggestion score in [0, 1]; the clamp/argmax tail was
    # duplicated verbatim in both branches of the original — now shared.
    if similarity == COSINE_SIMILARITY:
        scores = np.array(cosine_similarity([movie_tested.tolist()], suggestions.tolist())[0].tolist())
    elif similarity == EUCLIDEAN_SIMILARITY:
        distances = np.linalg.norm(np.array(suggestions.tolist()) - movie_tested, axis=1)
        # Map distances to (0, 1] so higher is better, matching cosine scores.
        scores = np.exp(-0.005 * distances)
    else:
        raise ValueError('Wrong similarity value')
    # Discard near-perfect matches — presumably the tested movie itself.
    scores[scores >= 0.99999] = 0
    winning_suggestion_index = np.argmax(scores)
    winning_score = scores[winning_suggestion_index]
    return winning_suggestion_index, winning_score
def normalize_to_minus_one_and_one(arr):
    """Linearly rescale arr so its values span the interval [-1, 1].

    A constant array (max == min) maps to all zeros, avoiding a division
    by zero.
    """
    lo = np.min(arr)
    hi = np.max(arr)
    if lo == hi:
        return np.zeros_like(arr)
    span = hi - lo
    return 2 * (arr - lo) / span - 1
def linearly_transform_embeddings():
    """Rescale every stored movie embedding to [-1, 1] and persist the result.

    Reads the movie database pickle, normalizes each vector in the "vector"
    column, and writes the transformed database to
    MOVIES_BK_FILE_TRANSFORMED_LOCATION.

    Bug fix: the original loop rebound its loop variable
    (`embedding = normalize_to_minus_one_and_one(...)`), which never touched
    the stored data — the file was saved with the vectors unchanged.
    """
    movie_data = data_persistence.read_pickle(MOVIES_BK_FILE_LOCATION)
    transformed = [
        normalize_to_minus_one_and_one(np.array(embedding))
        for embedding in movie_data["vector"]
    ]
    movie_data["vector"] = pd.Series(transformed)
    data_persistence.save_pickle(movie_data, MOVIES_BK_FILE_TRANSFORMED_LOCATION)
def find_recommendations(suggestions_dict, movies_embeddings, n_results=10, rating_weight=0, similarity=COSINE_SIMILARITY):
    """Score every candidate movie against each person's suggestions and
    return the n_results best candidates.

    For each candidate embedding, every person contributes the similarity of
    their single closest suggestion; those contributions are averaged and
    then amplified by rating_weight.

    Args:
        suggestions_dict: mapping person -> indexable list of suggestion keys
            (valid labels for movies_embeddings.loc).
        movies_embeddings: pandas Series of candidate embeddings.
        n_results: number of recommendations to return.
        rating_weight: multiplicative boost applied to the averaged score.
        similarity: COSINE_SIMILARITY or EUCLIDEAN_SIMILARITY.

    Returns:
        List of dicts with keys "movie_index" (position in the scan order)
        and "suggestions_considered" (person -> winning suggestion), best first.
    """
    all_scores = []
    all_choices = []
    n_people = len(suggestions_dict)
    for candidate in movies_embeddings:
        total = 0
        choices = {}
        for person, person_suggestions in suggestions_dict.items():
            idx, score = get_winning_choice_per_person(
                candidate,
                movies_embeddings.loc[person_suggestions],
                similarity=similarity,
            )
            total += score
            choices[person] = person_suggestions[idx]
        # Average across people, then apply the rating boost.
        mean_score = total / n_people
        all_scores.append(mean_score + rating_weight * mean_score)
        all_choices.append(choices)
    best = np.argsort(all_scores)[-n_results:][::-1]
    return [
        {"movie_index": position, "suggestions_considered": all_choices[position]}
        for position in best
    ]