Spaces:
Runtime error
Runtime error
import gc
import json
import os
import queue
import threading

import gradio as gr
import numpy as np
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
# Markdown heading passed to gr.Markdown at the top of the page.
title = """
# 👋🏻Welcome to 🙋🏻♂️Tonic's 🐣e5-mistral🛌🏻Embeddings """
# Markdown/HTML body passed to gr.Markdown right below the heading.
description = """
You can use this ZeroGPU Space to test out the current model [intfloat/e5-mistral-7b-instruct](https://huggingface.co/intfloat/e5-mistral-7b-instruct). 🐣e5-mistral🛌🏻 has a larger context🪟window, a different prompting/return🛠️mechanism and generally better results than other embedding models. use it via API to create embeddings or try out the sentence similarity to see how various optimization parameters affect performance.
You can also use 🐣e5-mistral🛌🏻 by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/Tonic/e5?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a></h3>
Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community 👻 [](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Tonic-AI](https://github.com/tonic-ai) & contribute to 🌟 [DataTonic](https://github.com/Tonic-AI/DataTonic) 🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗
"""
# CUDA allocator setting, exported before the model is loaded further down.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:30'
# Run on the GPU when PyTorch reports one, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Task name -> instruction text that is prepended to every query before embedding.
tasks = {
    'ArguAna': 'Given a claim, find documents that refute the claim',
    'ClimateFEVER': 'Given a claim about climate change, retrieve documents that support or refute the claim',
    'DBPedia': 'Given a query, retrieve relevant entity descriptions from DBPedia',
    'FEVER': 'Given a claim, retrieve documents that support or refute the claim',
    'FiQA2018': 'Given a financial question, retrieve user replies that best answer the question',
    'HotpotQA': 'Given a multi-hop question, retrieve documents that can help answer the question',
    'MSMARCO': 'Given a web search query, retrieve relevant passages that answer the query',
    'NFCorpus': 'Given a question, retrieve relevant documents that best answer the question',
    'NQ': 'Given a question, retrieve Wikipedia passages that answer the question',
    'QuoraRetrieval': 'Given a question, retrieve questions that are semantically equivalent to the given question',
    'SCIDOCS': 'Given a scientific paper title, retrieve paper abstracts that are cited by the given paper',
    'SciFact': 'Given a scientific claim, retrieve documents that support or refute the claim',
    'Touche2020': 'Given a question, retrieve detailed and persuasive arguments that answer the question',
    'TRECCOVID': 'Given a query on COVID-19, retrieve documents that answer the query',
}
# Global queue for embedding requests
embedding_request_queue = queue.Queue()
# Queue on which the worker thread posts the formatted responses.
embedding_response_queue = queue.Queue()
# Tokenizer and float16 model weights for intfloat/e5-mistral-7b-instruct, placed on `device`.
tokenizer = AutoTokenizer.from_pretrained('intfloat/e5-mistral-7b-instruct')
model = AutoModel.from_pretrained('intfloat/e5-mistral-7b-instruct', torch_dtype=torch.float16, device_map=device)
def last_token_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
    """Pick, for every sequence, the hidden state of its last attended token.

    Args:
        last_hidden_states: Hidden states indexed as ``[batch, position, ...]``.
        attention_mask: Mask indexed as ``[batch, position]``; summed per row
            to find the number of attended positions.

    Returns:
        One hidden-state vector per batch row.
    """
    num_rows = attention_mask.shape[0]
    # If every row attends to the final position, the batch is left-padded
    # and the final position already holds each sequence's last token.
    if attention_mask[:, -1].sum() == num_rows:
        return last_hidden_states[:, -1]

    # Right-padded batch: the last real token sits at (attended count - 1).
    last_positions = attention_mask.sum(dim=1) - 1
    row_index = torch.arange(
        last_hidden_states.shape[0], device=last_hidden_states.device
    )
    return last_hidden_states[row_index, last_positions]
def clear_cuda_cache():
    """Call ``torch.cuda.empty_cache()``; returns ``None``."""
    cuda_backend = torch.cuda
    cuda_backend.empty_cache()
def free_memory(*args):
    """Drop this function's references to ``args`` and run a garbage collection.

    A callee cannot delete the caller's variables: the previous
    ``for arg in args: del arg`` loop only unbound its own loop variable and
    released nothing. Callers that want an object freed must drop their own
    references (``del name`` or leaving scope); this function then forces a
    collection pass so that unreachable reference cycles are reclaimed.

    Args:
        *args: Objects the caller is finished with. They are not modified.

    Returns:
        None.
    """
    del args
    gc.collect()
def load_corpus_from_json(file_path):
    """Read and parse a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The parsed JSON document (whatever ``json.load`` produces).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)
def embedding_worker():
    """Serve embedding requests from ``embedding_request_queue`` until stopped.

    Each queue item is a ``(selected_task, input_text)`` tuple; ``None`` is
    the shutdown sentinel. For every request exactly one object is put on
    ``embedding_response_queue``: the formatted embedding response, or an
    ``{"error": ...}`` dict when the computation fails. Answering on failure
    keeps the worker alive and prevents the requester from blocking forever.
    """
    while True:
        # Wait for an item in the queue
        item = embedding_request_queue.get()
        try:
            if item is None:
                break
            try:
                selected_task, input_text = item
                embeddings = compute_embeddings(selected_task, input_text)
                response = format_response(embeddings)
            except Exception as exc:
                # Boundary of the worker thread: report instead of dying.
                print(f"Embedding request failed: {exc}")
                response = {"error": f"Embedding request failed: {exc}"}
            embedding_response_queue.put(response)
        finally:
            # Always balance the get() above, even on failure or shutdown.
            embedding_request_queue.task_done()
            clear_cuda_cache()


threading.Thread(target=embedding_worker, daemon=True).start()
def compute_embeddings(selected_task, input_text):
    """Embed one text with the instruction belonging to ``selected_task``.

    Args:
        selected_task: Key into the module-level ``tasks`` mapping.
        input_text: The query text to embed.

    Returns:
        A nested list holding one L2-normalised embedding row, or an error
        message string when ``selected_task`` is not a known task.
    """
    try:
        task_description = tasks[selected_task]
    except KeyError:
        print(f"Selected task not found: {selected_task}")
        return f"Error: Task '{selected_task}' not found. Please select a valid task."

    max_length = 2048
    processed_texts = [f'Instruct: {task_description}\nQuery: {input_text}']
    # Truncate to max_length - 1 so the EOS token appended below still fits.
    batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True)
    batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']]
    batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt')
    batch_dict = {k: v.to(device) for k, v in batch_dict.items()}

    # Inference only: without no_grad the forward pass records an autograd
    # graph and keeps activations alive for no benefit.
    with torch.no_grad():
        outputs = model(**batch_dict)
        embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
        embeddings = F.normalize(embeddings, p=2, dim=1)

    embeddings_list = embeddings.cpu().tolist()
    # Drop the device tensors before clearing the cache so their memory can
    # actually be returned.
    del outputs, embeddings, batch_dict
    clear_cuda_cache()
    return embeddings_list
def decode_embedding(embedding_str):
    """Attempt to decode a comma-separated embedding string with the tokenizer.

    Any exception raised along the way is caught and returned as an
    ``"Error in decoding: ..."`` string.

    NOTE(review): only the first element of the parsed tensor is passed to
    ``tokenizer.decode``, and ``.cpu()`` is then called on the decode result.
    ``tokenizer.decode`` presumably returns a ``str`` (confirm), in which case
    this function can only ever return the error string. The UI tab that used
    it is commented out in ``app_interface``.
    """
    try:
        # Parse the comma-separated floats into a float16 tensor on `device`.
        embedding = [float(num) for num in embedding_str.split(',')]
        embedding_tensor = torch.tensor(embedding, dtype=torch.float16, device=device)
        decoded_embedding = tokenizer.decode(embedding_tensor[0], skip_special_tokens=True)
        return decoded_embedding.cpu().numpy().tolist()
    except Exception as e:
        # Failures are reported to the caller as text rather than raised.
        return f"Error in decoding: {str(e)}"
def compute_similarity(selected_task, sentence1, sentence2, extra_sentence1, extra_sentence2):
    """Score three sentences against a focus sentence by cosine similarity.

    Args:
        selected_task: Key into the module-level ``tasks`` mapping.
        sentence1: The focus sentence every other sentence is compared with.
        sentence2: First comparison sentence.
        extra_sentence1: Second comparison sentence.
        extra_sentence2: Third comparison sentence.

    Returns:
        A dict with the keys ``"Similarity 1-2"``, ``"Similarity 1-3"`` and
        ``"Similarity 1-4"``, or an error message string when
        ``selected_task`` is not a known task.
    """
    if selected_task not in tasks:
        print(f"Selected task not found: {selected_task}")
        return f"Error: Task '{selected_task}' not found. Please select a valid task."

    # Compute embeddings for each sentence
    focus_embedding = compute_embeddings(selected_task, sentence1)
    labelled_sentences = (
        ("Similarity 1-2", sentence2),
        ("Similarity 1-3", extra_sentence1),
        ("Similarity 1-4", extra_sentence2),
    )

    # The embeddings are handed over as plain lists; compute_cosine_similarity
    # builds the tensors it needs, so no extra device copies are made here.
    similarity_scores = {
        label: compute_cosine_similarity(
            focus_embedding, compute_embeddings(selected_task, sentence)
        )
        for label, sentence in labelled_sentences
    }
    clear_cuda_cache()
    return similarity_scores
def compute_cosine_similarity(emb1, emb2):
    """Return the cosine similarity of two single-row embeddings as a float.

    Args:
        emb1: Embedding shaped ``[1, dim]`` (nested list or tensor).
        emb2: Embedding shaped ``[1, dim]`` (nested list or tensor).

    Returns:
        The cosine similarity as a Python ``float``.

    Raises:
        RuntimeError: If the inputs hold more than one row, because the
            result is then not a single value.
    """
    # Two small vectors: float32 on the CPU is cheap, avoids the rounding of
    # half precision, and needs no GPU allocation or cache clearing.
    tensor1 = torch.as_tensor(emb1, dtype=torch.float32, device="cpu")
    tensor2 = torch.as_tensor(emb2, dtype=torch.float32, device="cpu")
    return F.cosine_similarity(tensor1, tensor2, dim=1).item()
def compute_embeddings_batch(input_texts, selected_task=None):
    """Embed several texts in a single forward pass.

    Args:
        input_texts: List of texts to embed.
        selected_task: Key into the module-level ``tasks`` mapping whose
            instruction is prepended to every text. Defaults to the first
            task in ``tasks``. (The previous version read an undefined
            ``task_description`` name and raised ``NameError``.)

    Returns:
        A numpy array with one L2-normalised embedding row per input text.

    Raises:
        KeyError: If ``selected_task`` is not a known task.
    """
    if selected_task is None:
        selected_task = next(iter(tasks))
    task_description = tasks[selected_task]

    # NOTE(review): 2042 differs from the 2048 used in compute_embeddings;
    # kept as found - confirm whether the difference is intentional.
    max_length = 2042
    processed_texts = [f'Instruct: {task_description}\nQuery: {text}' for text in input_texts]
    # Truncate to max_length - 1 so the EOS token appended below still fits.
    batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True)
    batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']]
    batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt')
    batch_dict = {k: v.to(device) for k, v in batch_dict.items()}

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        outputs = model(**batch_dict)
        embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
        embeddings = F.normalize(embeddings, p=2, dim=1)

    result = embeddings.cpu().numpy()
    # Drop the device tensors before clearing the cache so their memory can
    # actually be returned.
    del outputs, embeddings, batch_dict
    clear_cuda_cache()
    return result
def semantic_search(query_embedding, corpus_embeddings, top_k=5):
    """Rank corpus rows by dot-product score against a query embedding.

    Args:
        query_embedding: Query vector (array or list), shaped ``[dim]`` or
            ``[1, dim]``.
        corpus_embeddings: Corpus matrix (array or nested list), one
            embedding per row.
        top_k: Maximum number of results to return.

    Returns:
        A tuple ``(indices, scores)``: the row indices of the best matches in
        descending score order, and their scores in the same order.
    """
    # Accept plain lists as well as arrays; `.T` does not exist on lists.
    query = np.asarray(query_embedding)
    corpus = np.asarray(corpus_embeddings)
    scores = np.dot(corpus, query.T).flatten()
    top_k_indices = np.argsort(scores)[::-1][:top_k]
    return top_k_indices, scores[top_k_indices]
def search_similar_sentences(input_question, corpus_sentences, corpus_embeddings):
    """Return the corpus sentences most similar to ``input_question``.

    Args:
        input_question: The question text to search for.
        corpus_sentences: The corpus texts, aligned with ``corpus_embeddings``.
        corpus_embeddings: One embedding row per corpus sentence.

    Returns:
        A list of ``(sentence, score)`` tuples, best match first.
    """
    question_embedding = compute_embeddings_batch([input_question])[0]
    top_k_indices, top_k_scores = semantic_search(question_embedding, corpus_embeddings)
    # top_k_scores is already ordered like top_k_indices, so pair them by
    # position. Indexing the scores with a corpus index (as before) picked the
    # wrong score or raised IndexError.
    return [
        (corpus_sentences[index], score)
        for index, score in zip(top_k_indices, top_k_scores)
    ]
# openai response object formatting
def format_response(embeddings):
    """Wrap ``embeddings`` in an OpenAI-style embeddings response dict.

    The value is stored unchanged under ``data[0]["embedding"]``. The model
    name and the token usage figures are fixed constants, not measurements.
    """
    entry = {"embedding": embeddings, "index": 0, "object": "embedding"}
    usage = {"prompt_tokens": 17, "total_tokens": 17}
    return {
        "data": [entry],
        "model": "e5-mistral",
        "object": "list",
        "usage": usage,
    }
def generate_and_format_embeddings(selected_task, input_text):
    """Send one embedding request to the worker thread and wait for its reply.

    The ``(selected_task, input_text)`` pair is put on
    ``embedding_request_queue``; the call then blocks until an item is
    available on ``embedding_response_queue`` and returns that item.

    NOTE(review): all callers share one response queue, so concurrent calls
    appear able to receive each other's responses - confirm.
    """
    request = (selected_task, input_text)
    embedding_request_queue.put(request)
    formatted = embedding_response_queue.get()
    embedding_response_queue.task_done()
    clear_cuda_cache()
    return formatted
def app_interface():
    """Build the Gradio Blocks UI and return it (not yet launched).

    The UI has a task dropdown and five tabs: embedding generation, sentence
    similarity, corpus loading, semantic search, and an OpenAI-style
    "connector" endpoint. The loaded corpus is kept in closure variables
    shared by the "Load Corpus" and "Semantic Search" handlers.

    Returns:
        The ``gr.Blocks`` instance.
    """
    corpus_sentences = []
    corpus_embeddings = []
    task_names = list(tasks.keys())

    with gr.Blocks() as demo:
        gr.Markdown(title)
        gr.Markdown(description)
        with gr.Row():
            task_dropdown = gr.Dropdown(task_names, label="Select a Task", value=task_names[0])

        with gr.Tab("Embedding Generation"):
            input_text_box = gr.Textbox(label="📖Input Text")
            compute_button = gr.Button("Try🐣🛌🏻e5")
            output_display = gr.Textbox(label="🐣e5-mistral🛌🏻 Embeddings")
            compute_button.click(
                fn=compute_embeddings,
                inputs=[task_dropdown, input_text_box],
                outputs=output_display
            )

        with gr.Tab("Sentence Similarity"):
            sentence1_box = gr.Textbox(label="'Focus Sentence' - The 'Subject'")
            sentence2_box = gr.Textbox(label="'Input Sentence' - 1")
            extra_sentence1_box = gr.Textbox(label="'Input Sentence' - 2")
            extra_sentence2_box = gr.Textbox(label="'Input Sentence' - 3")
            similarity_button = gr.Button("Compute Similarity")
            similarity_output = gr.Textbox(label="🐣e5-mistral🛌🏻 Similarity Scores")
            similarity_button.click(
                fn=compute_similarity,
                inputs=[task_dropdown, sentence1_box, sentence2_box, extra_sentence1_box, extra_sentence2_box],
                outputs=similarity_output
            )

        with gr.Tab("Load Corpus"):
            json_uploader = gr.File(label="Upload JSON File")
            load_corpus_button = gr.Button("Load Corpus")
            corpus_status = gr.Textbox(label="Corpus Status", value="Corpus not loaded")

            def load_corpus(file_info):
                """Load the uploaded JSON corpus and embed it."""
                # The corpus lives in app_interface's scope; `global` would
                # write to module variables that perform_search never reads.
                nonlocal corpus_sentences, corpus_embeddings
                if file_info is None:
                    return "No file uploaded. Please upload a JSON file."
                try:
                    # The upload may arrive as a dict with a 'name' key, an
                    # object with a `.name` attribute, or a plain path.
                    if isinstance(file_info, dict):
                        file_path = file_info['name']
                    else:
                        file_path = getattr(file_info, 'name', file_info)
                    sentences = load_corpus_from_json(file_path)
                    embeddings = compute_embeddings_batch(sentences)
                    # Publish both together so a failed load keeps the old corpus.
                    corpus_sentences, corpus_embeddings = sentences, embeddings
                    return "Corpus loaded successfully with {} sentences.".format(len(corpus_sentences))
                except Exception as e:
                    return "Error loading corpus: {}".format(e)

            load_corpus_button.click(
                fn=load_corpus,
                inputs=json_uploader,
                outputs=corpus_status
            )

        with gr.Tab("Semantic Search"):
            input_question_box = gr.Textbox(label="Enter your question")
            search_button = gr.Button("Search")
            search_results_output = gr.Textbox(label="Search Results")

            def perform_search(input_question):
                """Search the loaded corpus for sentences similar to the question."""
                # len() instead of truthiness: the embeddings may be a numpy
                # array, whose truth value is ambiguous and raises ValueError.
                if len(corpus_sentences) == 0 or len(corpus_embeddings) == 0:
                    return "Corpus is not loaded. Please load a corpus first."
                return search_similar_sentences(input_question, corpus_sentences, corpus_embeddings)

            search_button.click(
                fn=perform_search,
                inputs=input_question_box,
                outputs=search_results_output
            )

        with gr.Tab("Connector-like Embeddings"):
            with gr.Row():
                input_text_box_connector = gr.Textbox(label="Input Text", placeholder="Enter text or array of texts")
                # The selected value is passed on as the task name, so the
                # default must be one of the choices (it used to be
                # "text-embedding-ada-002", which is not a task).
                model_dropdown_connector = gr.Dropdown(label="Model", choices=task_names, value=task_names[0])
                encoding_format_connector = gr.Radio(label="Encoding Format", choices=["float", "base64"], value="float")
                user_connector = gr.Textbox(label="User", placeholder="Enter user identifier (optional)")
                submit_button_connector = gr.Button("Generate Embeddings")
            output_display_connector = gr.JSON(label="Embeddings Output")
            submit_button_connector.click(
                fn=generate_and_format_embeddings,
                inputs=[model_dropdown_connector, input_text_box_connector],
                outputs=output_display_connector
            )

        # with gr.Tab("Decode Embedding"):
        #     embedding_input = gr.Textbox(label="Enter Embedding (comma-separated floats)")
        #     decode_button = gr.Button("Decode")
        #     decoded_output = gr.Textbox(label="Decoded Embedding")
        #
        #     decode_button.click(
        #         fn=decode_embedding,
        #         inputs=embedding_input,
        #         outputs=decoded_output
        #     )

    return demo
# Build the UI once: calling app_interface() twice enabled the queue on an
# instance that was thrown away and launched a second one without it.
demo = app_interface()
demo.queue()
demo.launch(share=True)