Spaces:
Sleeping
Sleeping
| # Xinghao Chen, Weichen Chen | |
| # Xinghao Chen is responsible for the algorithm part. Weichen Chen is responsible for the visualization part. | |
| # Project's url: https://huggingface.co/spaces/AveMujica/Semantic_Search_Demo | |
| import streamlit as st | |
| import numpy as np | |
| import numpy.linalg as la | |
| import pickle | |
| import os | |
| import gdown | |
| from sentence_transformers import SentenceTransformer | |
| import matplotlib.pyplot as plt | |
| import math | |
| from huggingface_hub import hf_hub_download | |
def download_embeddings(model_type):
    """Download the embedding matrix and word-index files for ``model_type``.

    ``"100d"`` is fetched from the ``AveMujica/glove-twitter-100d`` Hugging
    Face repository into the current directory. Any other value is resolved
    to Google Drive file ids via ``get_model_id_gdrive`` and fetched with
    ``gdown`` into ``word_index_dict_<model_type>_temp.pkl`` and
    ``embeddings_<model_type>_temp.npy``.

    Args:
        model_type: Dimension tag such as ``"25d"``, ``"50d"`` or ``"100d"``.

    Raises:
        Exception: Whatever the Hugging Face download raised; the error is
            shown with ``st.error`` first and then re-raised.
    """
    if model_type != "100d":
        # 25d and 50d are hosted on Google Drive.
        index_file_id, matrix_file_id = get_model_id_gdrive(model_type)
        print("Downloading word index dictionary....\n")
        gdown.download(
            id=index_file_id,
            output=f"word_index_dict_{model_type}_temp.pkl",
            quiet=False,
        )
        print("Downloading embeddings...\n\n")
        gdown.download(
            id=matrix_file_id,
            output=f"embeddings_{model_type}_temp.npy",
            quiet=False,
        )
        return

    # 100d is hosted in a Hugging Face model repository.
    print("Downloading 100d embeddings from Hugging Face...\n")
    try:
        for remote_name in (
            "embeddings_100d_temp.npy",
            "word_index_dict_100d_temp.pkl",
        ):
            hf_hub_download(
                repo_id="AveMujica/glove-twitter-100d",
                filename=remote_name,
                local_dir=".",
                local_dir_use_symlinks=False,
            )
    except Exception as e:
        st.error(f"Error downloading from Hugging Face: {str(e)}")
        raise e
def load_glove_embeddings(glove_path="Data/embeddings.pkl"):
    """Unpickle and return the object stored at ``glove_path``.

    The file is read in binary mode and decoded with ``encoding="latin1"``.

    Args:
        glove_path: Path to the pickle file.

    Returns:
        Whatever object the pickle file contains.
    """
    # NOTE(review): unpickling can execute arbitrary code; only point this at
    # files from a trusted source.
    with open(glove_path, "rb") as pickle_file:
        return pickle.load(pickle_file, encoding="latin1")
def get_model_id_gdrive(model_type):
    """Return the Google Drive file ids for a GloVe model hosted on Drive.

    Args:
        model_type: ``"25d"`` or ``"50d"``.

    Returns:
        A ``(word_index_id, embeddings_id)`` tuple of Drive file ids.

    Raises:
        ValueError: If ``model_type`` has no Google Drive entry. Previously
            this fell through to the ``return`` with unbound locals and
            surfaced as an ``UnboundLocalError``.
    """
    # model_type -> (word_index_id, embeddings_id)
    drive_ids = {
        "25d": (
            "13qMXs3-oB9C6kfSRMwbAtzda9xuAUtt8",
            "1-RXcfBvWyE-Av3ZHLcyJVsps0RYRRr_2",
        ),
        "50d": (
            "1rB4ksHyHZ9skes-fJHMa2Z8J1Qa7awQ9",
            "1DBaVpJsitQ1qxtUvV1Kz7ThDc3az16kZ",
        ),
    }
    try:
        return drive_ids[model_type]
    except (KeyError, TypeError):
        supported = ", ".join(sorted(drive_ids))
        raise ValueError(
            f"No Google Drive files registered for model type {model_type!r}; "
            f"expected one of: {supported}"
        ) from None
def download_glove_embeddings_gdrive(model_type):
    """Download the word-index pickle and embeddings array from Google Drive.

    File ids come from ``get_model_id_gdrive``. The files are written to
    ``word_index_dict_<model_type>_temp.pkl`` and
    ``embeddings_<model_type>_temp.npy``, word index first.

    Args:
        model_type: Dimension tag understood by ``get_model_id_gdrive``.
    """
    index_file_id, matrix_file_id = get_model_id_gdrive(model_type)

    # (progress message, Drive file id, local output name), in download order.
    downloads = (
        (
            "Downloading word index dictionary....\n",
            index_file_id,
            f"word_index_dict_{model_type}_temp.pkl",
        ),
        (
            "Downloading embeddings...\n\n",
            matrix_file_id,
            f"embeddings_{model_type}_temp.npy",
        ),
    )
    for banner, file_id, target in downloads:
        print(banner)
        gdown.download(id=file_id, output=target, quiet=False)
| # @st.cache_data() | |
def load_glove_embeddings_gdrive(model_type):
    """Load the word-index dictionary and embedding matrix from local files.

    Reads ``word_index_dict_<model_type>_temp.pkl`` and
    ``embeddings_<model_type>_temp.npy`` from the current directory. The
    pickle is first loaded with ``encoding="latin1"`` and, if that fails,
    retried with pickle's default encoding.

    Args:
        model_type: Dimension tag used to build the two file names.

    Returns:
        A ``(word_index_dict, embeddings)`` tuple.

    Raises:
        Exception: Whatever the second pickle attempt or ``np.load`` raised;
            the error is shown with ``st.error`` and then re-raised.
    """
    word_index_path = f"word_index_dict_{model_type}_temp.pkl"
    embeddings_path = f"embeddings_{model_type}_temp.npy"

    # NOTE(review): unpickling can execute arbitrary code; these files are
    # downloaded, so they must come from a trusted source.
    try:
        # First attempt: latin1 encoding (used for the 25d and 50d files).
        with open(word_index_path, "rb") as f:
            word_index_dict = pickle.load(f, encoding="latin1")
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt and SystemExit
        # are no longer swallowed by the retry.
        try:
            # Second attempt: default encoding (used for the 100d file).
            with open(word_index_path, "rb") as f:
                word_index_dict = pickle.load(f)
        except Exception as e:
            st.error(f"Error loading word index dictionary: {str(e)}")
            raise

    try:
        embeddings = np.load(embeddings_path)
    except Exception as e:
        st.error(f"Error loading embeddings: {str(e)}")
        raise

    return word_index_dict, embeddings
# Models already constructed, keyed by model name. Without this cache a new
# SentenceTransformer was built on every call, i.e. once per category and once
# per query sentence.
_SENTENCE_TRANSFORMER_CACHE = {}


def load_sentence_transformer_model(model_name):
    """Return a ``SentenceTransformer`` for ``model_name``, reusing prior loads.

    The first call for a given name constructs the model; later calls with
    the same name return that same instance from a module-level cache.

    Args:
        model_name: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The (possibly cached) ``SentenceTransformer`` instance.
    """
    model = _SENTENCE_TRANSFORMER_CACHE.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _SENTENCE_TRANSFORMER_CACHE[model_name] = model
    return model
def get_sentence_transformer_embeddings(sentence, model_name="all-MiniLM-L6-v2"):
    """Encode ``sentence`` with a sentence-transformer model.

    Args:
        sentence: Text to embed.
        model_name: Model identifier handed to
            ``load_sentence_transformer_model``. Default model:
            https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2

    Returns:
        The embedding returned by the model's ``encode``. If encoding fails,
        a zero vector is returned instead (best effort), sized to the model's
        reported embedding dimension.
    """
    sentenceTransformer = load_sentence_transformer_model(model_name)
    try:
        return sentenceTransformer.encode(sentence)
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
        # propagate. The old fallback hard-coded 384 or 512, which may not
        # match the selected model's real output size; a mismatched zero
        # vector would make the later dot product against real embeddings
        # fail on shape. Ask the model for its dimension first.
        try:
            dimension = sentenceTransformer.get_sentence_embedding_dimension()
        except Exception:
            dimension = None
        if not dimension:
            # Last resort: the original hard-coded guesses.
            dimension = 384 if model_name == "all-MiniLM-L6-v2" else 512
        return np.zeros(dimension)
def get_glove_embeddings(word, word_index_dict, embeddings, model_type):
    """Look up the GloVe vector for a single word.

    The word is lower-cased before the lookup.

    Args:
        word: Word to look up.
        word_index_dict: Mapping from lower-cased word to a row index.
        embeddings: Indexable collection of vectors addressed by that index.
        model_type: Dimension tag such as ``"50d"``; the number before the
            ``"d"`` sets the length of the zero vector for unknown words.

    Returns:
        The stored vector, or a zero vector if the word is not in the mapping.
    """
    key = word.lower()
    if key not in word_index_dict:
        dimension = int(model_type.split("d")[0])
        return np.zeros(dimension)
    row = word_index_dict[key]
    return embeddings[row]
def get_category_embeddings(embeddings_metadata):
    """Rebuild the per-model category embedding cache in session state.

    The cache lives at ``st.session_state["cat_embed_" + model_name]`` and is
    reset to an empty dict on every call. Each space-separated entry of
    ``st.session_state.categories`` is then embedded once with
    ``get_sentence_transformer_embeddings``.

    Args:
        embeddings_metadata: Dict with a ``"model_name"`` entry. A falsy name
            makes the embedding helper use its default model.
    """
    model_name = embeddings_metadata["model_name"]
    state_key = "cat_embed_" + model_name
    st.session_state[state_key] = {}

    # Only forward the model name when one was actually given.
    encode_kwargs = {"model_name": model_name} if model_name else {}

    for category in st.session_state.categories.split(" "):
        if category in st.session_state[state_key]:
            # Repeated category: already embedded during this rebuild.
            continue
        st.session_state[state_key][category] = get_sentence_transformer_embeddings(
            category, **encode_kwargs
        )
def update_category_embeddings(embeddings_metadata):
    """Refresh the cached category embeddings.

    Thin wrapper that delegates to ``get_category_embeddings``.

    Args:
        embeddings_metadata: Forwarded unchanged to ``get_category_embeddings``.
    """
    get_category_embeddings(embeddings_metadata)
| ### Plotting utility functions | |
def plot_piechart(sorted_cosine_scores_items):
    """Render one pie chart of category scores in the Streamlit app.

    Args:
        sorted_cosine_scores_items: Sequence of ``(category_index, score)``
            pairs; each index refers to a position in the space-separated
            ``st.session_state.categories`` string.
    """
    scores = np.array([item[1] for item in sorted_cosine_scores_items])
    categories = st.session_state.categories.split(" ")
    labels = [categories[item[0]] for item in sorted_cosine_scores_items]

    fig, ax = plt.subplots()
    try:
        ax.pie(scores, labels=labels, autopct="%1.1f%%")
        st.pyplot(fig)  # Display figure
    finally:
        # pyplot keeps every figure it creates alive until it is closed;
        # close it once rendered so figures do not accumulate.
        plt.close(fig)
def plot_piechart_helper(sorted_cosine_scores_items):
    """Build a donut-style pie chart of category scores.

    The first wedge is pulled out slightly, percentages are drawn in bold
    black, and a white centre circle turns the pie into a ring.

    Args:
        sorted_cosine_scores_items: Sequence of ``(category_index, score)``
            pairs; each index refers to a position in the space-separated
            ``st.session_state.categories`` string.

    Returns:
        The matplotlib figure. It is not rendered or closed here.
    """
    palette = plt.cm.Pastel1.colors
    category_names = st.session_state.categories.split(" ")
    fig, ax = plt.subplots(figsize=(6, 6))

    labels = []
    sizes = []
    for category_index, score in sorted_cosine_scores_items:
        labels.append(category_names[category_index])
        sizes.append(score)

    # Offset only the first wedge.
    pull_out = np.zeros(len(labels))
    pull_out[0] = 0.1

    def format_percent(p):
        return f'{p:.1f}%'

    _wedges, _texts, percent_texts = ax.pie(
        sizes,
        explode=pull_out,
        labels=labels,
        colors=palette,
        autopct=format_percent,
        startangle=90,
        shadow=True,
        pctdistance=0.85,
        wedgeprops={'edgecolor': 'white', 'linewidth': 1},
        textprops={'fontsize': 10},
    )

    for percent_text in percent_texts:
        percent_text.set_color('black')
        percent_text.set_fontsize(10)
        percent_text.set_fontweight('bold')

    # White disc over the middle gives the donut look.
    ax.add_artist(plt.Circle((0, 0), 0.42, fc='white'))

    ax.set_title('Category Distribution', fontsize=14, pad=20)
    ax.axis('equal')
    return fig
def plot_piecharts(sorted_cosine_scores_models):
    """Render two stacked pie charts, one per model, in the Streamlit app.

    Nothing is drawn unless exactly two models are supplied.

    Args:
        sorted_cosine_scores_models: Mapping from model name to a sequence of
            ``(category_index, score)`` pairs; each index refers to a
            position in the space-separated ``st.session_state.categories``
            string.
    """
    categories = st.session_state.categories.split(" ")
    scores_list = list(sorted_cosine_scores_models.values())

    if len(scores_list) != 2:
        return

    fig, axes = plt.subplots(2)
    try:
        # One axis per model, in the mapping's iteration order.
        for ax, ranked in zip(axes, scores_list):
            labels = [categories[item[0]] for item in ranked]
            scores = np.array([item[1] for item in ranked])
            ax.pie(scores, labels=labels, autopct="%1.1f%%")
        st.pyplot(fig)
    finally:
        # pyplot keeps every figure it creates alive until it is closed;
        # close it once rendered so figures do not accumulate.
        plt.close(fig)
def plot_alatirchart(sorted_cosine_scores_models):
    """Render one pie chart per model, each in its own Streamlit tab.

    Args:
        sorted_cosine_scores_models: Mapping from model name (used as the tab
            label) to the sorted ``(category_index, score)`` pairs passed to
            ``plot_piechart_helper``.
    """
    models = list(sorted_cosine_scores_models.keys())
    tabs = st.tabs(models)

    for tab, model in zip(tabs, models):
        fig = plot_piechart_helper(sorted_cosine_scores_models[model])
        try:
            with tab:
                st.pyplot(fig)
        finally:
            # pyplot keeps every figure it creates alive until it is closed;
            # close it once rendered so figures do not accumulate.
            plt.close(fig)
| # Task I: Compute Cosine Similarity | |
def cosine_similarity(x, y):
    """Return the exponentiated cosine similarity of two vectors.

    Computes ``exp(x . y / (|x| * |y|))``.

    Args:
        x: First vector.
        y: Second vector.

    Returns:
        ``exp(cosine)`` of the two vectors, or ``0.0`` when either vector has
        zero norm (which avoids a division by zero).
    """
    inner = np.dot(x, y)
    x_length, y_length = la.norm(x), la.norm(y)
    if any(length == 0 for length in (x_length, y_length)):
        return 0.0
    return np.exp(inner / (x_length * y_length))
| # Task II: Average Glove Embedding Calculation | |
def averaged_glove_embeddings_gdrive(sentence, word_index_dict, embeddings, model_type):
    """Average the GloVe vectors of the whitespace-separated words in a sentence.

    Each word is looked up with ``get_glove_embeddings``, so words missing
    from the vocabulary contribute a zero vector but still count towards the
    divisor.

    Args:
        sentence: Input text; split on whitespace.
        word_index_dict: Mapping from word to a row index.
        embeddings: Vectors addressed by that index.
        model_type: Dimension tag such as ``"50d"``; the number before the
            ``"d"`` is the vector length.

    Returns:
        The mean vector, or a zero vector when the sentence has no words.
    """
    dimension = int(model_type.split('d')[0])
    tokens = sentence.split()
    running_total = np.zeros(dimension)

    if len(tokens) == 0:
        return running_total

    for token in tokens:
        token_vector = get_glove_embeddings(token, word_index_dict, embeddings, model_type)
        # In-place accumulation into the running total.
        np.add(running_total, token_vector, out=running_total)

    running_total /= len(tokens)
    return running_total
| # Task III: Sort the cosine similarity | |
def get_sorted_cosine_similarity(embeddings_metadata):
    """Score every category against the search text and rank the results.

    The search text is ``st.session_state.text_search`` and the categories
    are the space-separated entries of ``st.session_state.categories``.

    * GloVe (``embedding_model == "glove"``): the text is embedded with
      ``averaged_glove_embeddings_gdrive`` and each category with
      ``get_glove_embeddings``.
    * Otherwise: sentence-transformer embeddings are used. Category vectors
      are cached in ``st.session_state["cat_embed_<model_name>"]``; the cache
      is built if absent and missing categories are added on demand.

    Args:
        embeddings_metadata: Dict describing the backend. GloVe needs
            ``word_index_dict``, ``embeddings`` and ``model_type``; the
            transformer path reads ``model_name``.

    Returns:
        List of ``(category_index, score)`` tuples sorted by score,
        highest first.
    """
    category_names = st.session_state.categories.split(" ")

    if embeddings_metadata["embedding_model"] == "glove":
        vocab = embeddings_metadata["word_index_dict"]
        vectors = embeddings_metadata["embeddings"]
        dimension_tag = embeddings_metadata["model_type"]

        query_vector = averaged_glove_embeddings_gdrive(
            st.session_state.text_search, vocab, vectors, dimension_tag
        )

        def embed_category(name):
            return get_glove_embeddings(name, vocab, vectors, dimension_tag)

    else:
        model_name = embeddings_metadata.get("model_name", "")
        state_key = f"cat_embed_{model_name}"
        if state_key not in st.session_state:
            get_category_embeddings(embeddings_metadata)
        cached_vectors = st.session_state[state_key]

        query_vector = get_sentence_transformer_embeddings(
            st.session_state.text_search, model_name=model_name
        )

        def embed_category(name):
            # Fill in any category the cache does not hold yet.
            if name not in cached_vectors:
                cached_vectors[name] = get_sentence_transformer_embeddings(
                    name, model_name=model_name
                )
            return cached_vectors[name]

    scored = [
        (position, cosine_similarity(query_vector, embed_category(name)))
        for position, name in enumerate(category_names)
    ]

    # Highest score first.
    return sorted(scored, key=lambda pair: pair[1], reverse=True)
if __name__ == "__main__":
    # --- Sidebar: model configuration ---
    st.sidebar.title("Model Configuration")
    st.sidebar.markdown(
        """
        GloVe is an unsupervised learning algorithm for obtaining vector representations for words. Pretrained on
        2 billion tweets with vocabulary size of 1.2 million. Download from [Stanford NLP](http://nlp.stanford.edu/data/glove.twitter.27B.zip).
        Jeffrey Pennington, Richard Socher, and Christopher D. Manning. 2014. *GloVe: Global Vectors for Word Representation*.
        """
    )
    st_model = st.sidebar.selectbox(
        "Sentence Transformer Model",
        options=[
            "all-MiniLM-L6-v2",
            "all-mpnet-base-v2",
            "multi-qa-mpnet-base-dot-v1",
            "paraphrase-multilingual-mpnet-base-v2",
        ],
        index=0,
        help="Select pretrained sentence transformer model",
    )
    model_type = st.sidebar.selectbox(
        "GloVe Dimension",
        ("25d", "50d", "100d"),
        index=1,
        help="Select dimension for GloVe embeddings",
    )

    # --- Main page: inputs ---
    st.title("Semantic Search Demo")

    # Seed the widget state once; the keyed inputs below read from it.
    if "categories" not in st.session_state:
        st.session_state.categories = "Flowers Colors Cars Weather Food"
    if "text_search" not in st.session_state:
        st.session_state.text_search = "Roses are red, trucks are blue, and Seattle is grey right now"

    # The inputs deliberately pass no ``value=``: their keys are already set
    # through Session State above, and supplying both makes Streamlit warn on
    # the first run. The initial text comes from the session-state entries.
    st.subheader("Categories (space-separated)")
    st.text_input(
        label="Categories",
        key="categories",
    )
    st.subheader("Input Sentence")
    st.text_input(
        label="Your input",
        key="text_search",
    )

    # --- GloVe files: download on first use, then load ---
    embeddings_path = f"embeddings_{model_type}_temp.npy"
    word_index_dict_path = f"word_index_dict_{model_type}_temp.pkl"
    if not os.path.isfile(embeddings_path) or not os.path.isfile(word_index_dict_path):
        with st.spinner(f"Downloading GloVe-{model_type} embeddings..."):
            download_embeddings(model_type)
    word_index_dict, embeddings = load_glove_embeddings_gdrive(model_type)

    # --- Scoring and charts (skipped for a blank search sentence) ---
    if st.session_state.text_search.strip():
        glove_metadata = {
            "embedding_model": "glove",
            "word_index_dict": word_index_dict,
            "embeddings": embeddings,
            "model_type": model_type,
        }
        transformer_metadata = {
            "embedding_model": "transformers",
            "model_name": st_model,
        }
        col1, col2 = st.columns(2)
        with col1:
            with st.spinner(f"Processing GloVe-{model_type}..."):
                sorted_glove = get_sorted_cosine_similarity(glove_metadata)
        with col2:
            with st.spinner(f"Processing {st_model}..."):
                sorted_transformer = get_sorted_cosine_similarity(transformer_metadata)
        st.subheader(f"Results for: '{st.session_state.text_search}'")
        plot_alatirchart({
            f"Sentence Transformer ({st_model})": sorted_transformer,
            f"GloVe-{model_type}": sorted_glove,
        })

    # --- Footer ---
    st.markdown("---")
    st.caption("Developed by [Xinghao Chen](https://www.linkedin.com/in/cxh42/) and Weichen Chen")