from gensim.models import KeyedVectors
import os
import pandas as pd
import numpy as np
import nltk
from sklearn.metrics.pairwise import cosine_similarity
from nltk.corpus import stopwords
import ollama
from course_project import data_persistence
DATA_FOLDER_LOCATION = "data/"
WORD2VEC_FILE_LOCATION = DATA_FOLDER_LOCATION + "glove.6B.100d.txt"
WORD2VEC_MODEL_FILE_LOCATION = DATA_FOLDER_LOCATION + "glove_model"
MOVIES_BK_FILE_LOCATION = "course_project/database/provisional_database_simulation.pkl"
MOVIES_BK_FILE_TRANSFORMED_LOCATION = "course_project/database/provisional_database_transformed.pkl"
COSINE_SIMILARITY = 0
EUCLIDEAN_SIMILARITY = 1
# ollama pull gemma:7b
def initialize_word2vec_model():
    """Load the GloVe KeyedVectors model, building it from the raw text file on first use.

    Loads the pre-saved model from WORD2VEC_MODEL_FILE_LOCATION when it exists;
    otherwise parses the raw GloVe file at WORD2VEC_FILE_LOCATION and caches the
    parsed model to disk for subsequent runs.

    Returns:
        gensim KeyedVectors instance, or None if generation failed.
    """
    model = None
    if os.path.exists(WORD2VEC_MODEL_FILE_LOCATION):
        model = KeyedVectors.load(WORD2VEC_MODEL_FILE_LOCATION)
        print("Word2Vec Model successfully loaded.")
    else:
        try:
            print("Generating Word2Vec Model")
            # GloVe text files carry no word2vec header line, hence no_header=True.
            model = KeyedVectors.load_word2vec_format(
                WORD2VEC_FILE_LOCATION, binary=False, no_header=True
            )
            model.save(WORD2VEC_MODEL_FILE_LOCATION)
            print("Word2Vec Model successfully saved.")
        except (OSError, ValueError) as error:
            # Narrowed from a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit and hid the failure cause.
            print(f'Could not generate Word2Vec Model ({error})')
    return model
def sentence_mean_vector(sentence, model):
    """Return the mean embedding vector of the non-stopword words of *sentence*.

    The sentence is lowercased and whitespace-split; English stopwords and
    words absent from the model vocabulary are skipped.

    Args:
        sentence: raw text to embed.
        model: gensim KeyedVectors (supports `word in model`, `model[word]`,
            and `model.vector_size`).

    Returns:
        np.ndarray of length model.vector_size — the mean of the word vectors,
        or a zero vector when no word survives filtering.
    """
    # quiet=True: this is a no-op after the first download and otherwise
    # printed a status banner on every single call.
    nltk.download('stopwords', quiet=True)
    stop_words = set(stopwords.words('english'))
    words = sentence.lower().split()
    # Membership test replaces the per-word try/except KeyError of the
    # original — same filtering, no exception-driven control flow.
    vectors = [model[word] for word in words
               if word not in stop_words and word in model]
    # Zeros vector is returned if there is no valid value.
    if not vectors:
        return np.zeros(model.vector_size)
    return np.mean(vectors, axis=0)
def encode_text_series_mean_method(text_series, file_backup_location=None):
    """Embed every text in *text_series* as the mean of its GloVe word vectors.

    Args:
        text_series: pandas Series of raw text strings.
        file_backup_location: optional path; when given, the resulting Series
            is pickled there via data_persistence.

    Returns:
        pandas Series of np.ndarray embeddings, aligned with text_series.
    """
    model = initialize_word2vec_model()
    result = text_series.apply(lambda text: sentence_mean_vector(text, model))
    # `is not None` instead of `!= None` (PEP 8 identity comparison).
    if file_backup_location is not None:
        data_persistence.save_pickle(result, file_backup_location)
    return result
def ollama_text_embedding(sentence):
    """Embed *sentence* with the local Ollama gemma:7b model.

    Returns:
        np.ndarray with the embedding vector produced by the model.
    """
    response = ollama.embeddings(model="gemma:7b", prompt=sentence)
    return np.array(response['embedding'])
def encode_text_series_ollama_method(original_series, file_backup_location=None, batch_size=None):
    """Embed every text in *original_series* via Ollama, with resumable batching.

    When *file_backup_location* exists, previously computed embeddings are
    loaded and the process resumes after the last stored index; partial
    results are re-saved every *batch_size* items so a crash loses at most
    one batch.

    Args:
        original_series: pandas Series of texts, indexed 0..len-1.
        file_backup_location: optional pickle path for checkpoints/results.
        batch_size: items between checkpoint saves (default 500).

    Returns:
        pandas Series mapping position -> np.ndarray embedding.
    """
    if batch_size is None:
        batch_size = 500
    if file_backup_location is not None and os.path.exists(file_backup_location):
        embeddings = data_persistence.read_pickle(file_backup_location)
        last_index = embeddings.last_valid_index()
        # An empty backup returns None here; the original crashed on `None + 1`.
        count = 0 if last_index is None else last_index + 1
    else:
        # Explicit dtype: a bare pd.Series() is deprecated and the values
        # are ndarrays anyway.
        embeddings = pd.Series(dtype=object)
        count = 0
    total = len(original_series)
    while count < total:
        embeddings[count] = ollama_text_embedding(original_series[count])
        print(f"Embedding process: {count+1}/{total}. {(count+1)*100/total:.2f}% completed")
        # Guard added: the original attempted a batch save even when no
        # backup path was given, passing None into save_pickle.
        if file_backup_location is not None and count % batch_size == 0 and count != 0:
            data_persistence.save_pickle(embeddings, file_backup_location)
            print(f"Batch completed. {count+1}/{total} embedding records saved.")
        count += 1
    if file_backup_location is not None:
        data_persistence.save_pickle(embeddings, file_backup_location)
    return embeddings
def find_most_similar_coincidences_indexes(n_results, objective_vector, movies_embeddings):
    """Return the positions of the *n_results* embeddings most similar to *objective_vector*.

    Similarity is cosine similarity; the result is ordered best match first.

    Args:
        n_results: how many positions to return.
        objective_vector: query embedding (array-like with .tolist()).
        movies_embeddings: collection of candidate embeddings (with .tolist()).

    Returns:
        list[int] of positional indexes, most similar first.
    """
    similarities = cosine_similarity(
        [objective_vector.tolist()], movies_embeddings.tolist()
    )[0].tolist()
    # argsort is ascending, so take the tail and reverse it for best-first order.
    ranked_positions = np.argsort(similarities)[-n_results:][::-1]
    return ranked_positions.tolist()
def get_winning_choice_per_person(movie_tested, suggestions, similarity=COSINE_SIMILARITY):
    """Pick the suggestion most similar to *movie_tested* under the chosen metric.

    Args:
        movie_tested: candidate movie embedding (array-like with .tolist()).
        suggestions: embeddings of one person's suggested movies.
        similarity: COSINE_SIMILARITY (cosine scores) or EUCLIDEAN_SIMILARITY
            (distances mapped to (0, 1] via exp(-0.005 * distance)).

    Returns:
        (winning_suggestion_index, winning_score) — positional index of the
        best suggestion and its similarity score.

    Raises:
        ValueError: for an unknown *similarity* value.
    """
    if similarity == COSINE_SIMILARITY:
        scores = np.array(
            cosine_similarity([movie_tested.tolist()], suggestions.tolist())[0].tolist()
        )
    elif similarity == EUCLIDEAN_SIMILARITY:
        distances = np.linalg.norm(np.array(suggestions.tolist()) - movie_tested, axis=1)
        scores = np.exp(-0.005 * distances)
    else:
        raise ValueError('Wrong similarity value')
    # Shared tail (was duplicated in both branches): zero out near-perfect
    # scores so the movie being tested cannot "win" against itself.
    scores[scores >= 0.99999] = 0
    winning_suggestion_index = np.argmax(scores)
    winning_score = scores[winning_suggestion_index]
    return winning_suggestion_index, winning_score
def normalize_to_minus_one_and_one(arr):
    """Linearly rescale *arr* so its minimum maps to -1 and its maximum to 1.

    A constant array (max == min) is mapped to all zeros to avoid dividing
    by zero.

    Args:
        arr: numeric np.ndarray (any shape).

    Returns:
        np.ndarray of the same shape, with values in [-1, 1].
    """
    lo = np.min(arr)
    hi = np.max(arr)
    # Avoid division by zero when every value is identical.
    if lo == hi:
        return np.zeros_like(arr)
    span = hi - lo
    return 2 * (arr - lo) / span - 1
def linearly_transform_embeddings():
    """Normalize every stored movie embedding to [-1, 1] and save the result.

    Reads the movie database pickle, rescales each "vector" entry with
    normalize_to_minus_one_and_one, and writes the transformed database to
    MOVIES_BK_FILE_TRANSFORMED_LOCATION.

    Bug fixed: the original `for embedding in embeddings:` loop rebound the
    loop variable to the normalized vector and then discarded it, so the
    "transformed" file actually contained the unmodified embeddings. We now
    build a new collection of normalized vectors instead.
    """
    movie_data = data_persistence.read_pickle(MOVIES_BK_FILE_LOCATION)
    normalized_vectors = [
        normalize_to_minus_one_and_one(np.array(vector))
        for vector in movie_data["vector"]
    ]
    movie_data["vector"] = pd.Series(normalized_vectors)
    data_persistence.save_pickle(movie_data, MOVIES_BK_FILE_TRANSFORMED_LOCATION)
def find_recommendations(suggestions_dict, movies_embeddings, n_results=10, rating_weight=0, similarity=COSINE_SIMILARITY):
    """Rank every candidate movie by how well it matches each person's suggestions.

    For each candidate embedding, every person contributes the similarity score
    of their best-matching suggestion (via get_winning_choice_per_person); the
    per-person scores are averaged into the candidate's overall score.

    Args:
        suggestions_dict: mapping person -> list of suggestion labels; the
            labels are used with movies_embeddings.loc, so they are assumed to
            be valid Series index values — TODO confirm against callers.
        movies_embeddings: pandas Series of candidate embeddings.
        n_results: number of recommendations returned.
        rating_weight: see NOTE(review) below.
        similarity: COSINE_SIMILARITY or EUCLIDEAN_SIMILARITY, forwarded to
            get_winning_choice_per_person.

    Returns:
        list of dicts {"movie_index": positional index into movies_embeddings,
        "suggestions_considered": {person: winning suggestion label}}, ordered
        best score first.
    """
    scores = []
    suggestions_considered = []
    # Iterating the Series yields values in positional order, so `scores` and
    # `suggestions_considered` are positional, matching the argsort below.
    for movie_tested in movies_embeddings:
        movie_tested_score = 0
        movie_tested_suggestions_considered = {}
        for person in suggestions_dict:
            # Best-matching suggestion of this person for the candidate movie.
            winning_suggestion_index, winning_score = get_winning_choice_per_person(movie_tested, movies_embeddings.loc[suggestions_dict[person]], similarity=similarity)
            movie_tested_score += winning_score
            # winning_suggestion_index is positional within this person's list.
            movie_tested_suggestions_considered[person] = suggestions_dict[person][winning_suggestion_index]
        # Average across people.
        movie_tested_score /= len(suggestions_dict)
        # NOTE(review): this scales every score by the same (1 + rating_weight)
        # factor, so it cannot change the ranking — presumably it was meant to
        # weigh in a per-movie rating value; verify the intended formula.
        movie_tested_score += rating_weight * movie_tested_score
        scores.append(movie_tested_score)
        suggestions_considered.append(movie_tested_suggestions_considered)
    # argsort ascending -> take the top n_results and reverse for best-first.
    recommended_movies_indexes = np.argsort(scores)[-n_results:][::-1]
    recommendations = [
        {"movie_index": idx, "suggestions_considered": suggestions_considered[idx]}
        for idx in recommended_movies_indexes
    ]
    return recommendations
|