"""RyBOT: a Gradio chat UI that answers queries from a local text corpus.

At import time the script loads (or builds and saves) a FAISS index over
sentence chunks of the ``.txt`` files in ``cleaned_files``. For each query it
retrieves the nearest chunks, sends them as context to the OpenAI chat
completion API, and shows the answer plus the source file names in the UI.
"""

import base64
import gzip
import os
import pickle
import time
from pathlib import Path

# Pin the Gradio version before it is imported; the UI code below relies on
# the 3.x API (`.style(...)`, `color_map`).
os.system("pip uninstall -y gradio")
os.system("pip install gradio==3.31.0")

import faiss
import gradio as gr
import nltk
import numpy as np
import openai
import pandas as pd
from nltk.tokenize import sent_tokenize, word_tokenize
from sentence_transformers import SentenceTransformer, models

# SECURITY: a secret key is committed in source. Prefer the OPENAI_API_KEY
# environment variable; the literal is kept only as a backward-compatible
# fallback and should be rotated and removed.
openai.api_key = os.environ.get(
    "OPENAI_API_KEY",
    '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',
)

nltk.download('punkt')

# Load the sentence-embedding model
model = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')

# Directory containing text files
directory = "cleaned_files"

# Define the index file name
index_filename = "faiss.index"

# Define the mapping file name
mapping_filename = "mapping.pkl1"

# Message shown in the chat when no completion could be obtained
TIMEOUT_MESSAGE = "Unfortunately, the connection to ChatGPT timed out. Please try after some time."

# Declare Textbox globally (rebound to the real input box inside gr.Blocks)
txt = gr.Textbox(
    label="Type your query here:",
    placeholder="What would you like to learn today?"
).style(container=True)


def apply_html(text, color):
    """Return ``text`` prepared for display in a Gradio chatbot.

    NOTE(review): the marker literals below look like they lost their HTML
    tags when this file was extracted (the comments talk about table tags,
    presumably ``<table>`` / ``</table>``). With the literals as they stand
    the function returns ``text`` unchanged for every input. The statement
    skeleton is kept so the original tags can be restored from a clean copy.

    Args:
        text: Message text.
        color: Unused by the visible code.

    Returns:
        The (possibly modified) text.
    """
    if "" in text and "\n" in text:
        # If the text contains table tags, modify the table structure for Gradio
        table_start = text.index("")
        table_end = text.index("\n") + len("")
        table_content = text[table_start:table_end]

        # Modify the table structure for Gradio
        modified_table = table_content.replace("", "\n")
        modified_table = modified_table.replace("\n", "")
        modified_table = modified_table.replace("", "")

        # Replace the modified table back into the original text
        return text[:table_start] + modified_table + text[table_end:]

    # Return the plain text as is
    return text


def apply_filelist_html(text, color):
    """Return ``text`` formatted for the references panel.

    NOTE(review): ``color`` is unused in the visible code; the wrapping
    markup was presumably stripped from the f-string -- confirm.
    """
    return f'{text}'


# Check if the index file exists
if os.path.exists(index_filename) and os.path.exists(mapping_filename):
    # Load the index from disk
    index = faiss.read_index(index_filename)

    # Load the mapping from disk.
    # NOTE: pickle can execute arbitrary code; only load a mapping file that
    # this script wrote itself.
    with open(mapping_filename, 'rb') as f:
        chunks, filenames = pickle.load(f)
else:
    # Lists to hold file names, corresponding embeddings and text chunks
    filenames = []
    embeddings = []
    chunks = []

    # Define chunk size and overlap
    chunk_size = 5  # Size of each chunk (in sentences)
    overlap = 2  # Size of overlap between chunks (in sentences)

    # Iterate over files to create the index
    for filename in os.listdir(directory):
        if not filename.endswith(".txt"):
            continue

        with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
            text = file.read()

        # Split text into sentences
        sentences = sent_tokenize(text)

        # Group sentences into chunks with overlap
        for i in range(0, len(sentences), chunk_size - overlap):
            chunk = ' '.join(sentences[i:i + chunk_size])
            chunks.append(chunk)
            # Compute the embedding and record which file the chunk came from
            embeddings.append(model.encode(chunk))
            filenames.append(filename)

    if not embeddings:
        # Without this guard the failure is an opaque IndexError on .shape[1]
        raise RuntimeError(
            f"No text chunks found: '{directory}' contains no non-empty .txt files"
        )

    # Convert list of embeddings to numpy array
    embeddings = np.array(embeddings)

    # Dimension of our vector space
    d = embeddings.shape[1]

    # Construct the index
    index = faiss.IndexFlatL2(d)

    # Add vectors to the index
    index.add(embeddings)

    # Save the index to disk
    faiss.write_index(index, index_filename)

    # Save the mapping to disk
    with open(mapping_filename, 'wb') as f:
        pickle.dump((chunks, filenames), f)


def add_text(history, text):
    """Append the user's message to the chat history.

    Args:
        history: Chat history as a list of ``[user, bot]`` pairs, or None.
        text: The user's query.

    Returns:
        ``(history, text)``; the bot slot of the new entry is None until
        ``bot`` fills it in.
    """
    if history is not None:
        history.append([apply_html(text, "blue"), None])
    return history, text


def bot(query, history, fileListHistory, k=5):
    """Answer ``query`` from the indexed corpus and update both chat panels.

    Args:
        query: The user's query text.
        history: Chat history; the last entry's bot slot receives the answer.
        fileListHistory: References history; one entry is appended per query.
        k: Number of nearest chunks to retrieve.

    Returns:
        ``(history, fileListHistory)`` after the update.
    """
    print("QUERY : " + query)

    # Compute embedding for the query; Faiss works with single precision
    query_embedding = model.encode(query).astype('float32')

    # Search the index
    distances, indices = index.search(np.array([query_embedding]), k)

    # Faiss pads the result with -1 when fewer than k vectors exist; dropping
    # those avoids silently picking the last chunk via negative indexing.
    hit_ids = [int(i) for i in indices[0] if i >= 0]

    # Retrieve and join the top k chunks
    context = '\n'.join(chunks[i] for i in hit_ids)

    # Corresponding filenames, deduplicated while keeping the ranking order
    top_filenames = list(dict.fromkeys(filenames[i] for i in hit_ids))
    print("Corresponding filenames: ", top_filenames)

    # Create file links
    file_links = [f'{filename.replace(".txt", ".pdf")}' for filename in top_filenames]
    file_links_str = ', '.join(file_links)

    # Update file history with query and file links
    fileListHistory.append(
        [apply_filelist_html(f"QUERY: {query} | REFERENCES: {file_links_str}", "green"), None]
    )

    # Build the prompt for the OpenAI API
    prompt = (
        "The following is a query from a user who is a mechanic. "
        "Use the context provided to respond to the user. "
        f"QUERY: {query} CONTEXT: {context} "
        "Respond to the point. Do not include terms like - "
        "(according to the context provided) in your response."
    )
    # Remember to respond in bullet points. Respond with a table when appropriate

    messages = [{"role": "user", "content": prompt}]
    print(messages)

    # Attempt the call up to 3 times
    response = None
    for _ in range(3):
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=1000,
                stop=None,
                temperature=0,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
            )
            break
        except openai.OpenAIError as e:
            if isinstance(e, openai.error.Timeout) or str(e) == "Request timed out":
                # Timed out: wait for 1 second and then try again
                time.sleep(1)
            else:
                # Any other API error is not retried; log it so it is not lost
                print(f"OpenAI API error: {e}")
                break

    if response is None:
        # No completion was obtained: show the failure message in the chat
        # instead of dereferencing the missing response.
        print(TIMEOUT_MESSAGE)
        answer = TIMEOUT_MESSAGE
    else:
        answer = response['choices'][0]['message']['content'].strip()
        print("\nGPT RESPONSE:\n")
        print(answer)

    if history is not None and len(history) > 0:
        # Update the chat history with the bot's response
        history[-1][1] = apply_html(answer, "black")

    return history, fileListHistory


# Open the image and convert it to base64
with open(Path("rybot_small.png"), "rb") as img_file:
    img_str = base64.b64encode(img_file.read()).decode()

# NOTE(review): img_str is not interpolated in the visible markup below; the
# header tags (including the image tag) were presumably stripped -- confirm
# against a clean copy of the file.
html_code = f'''
RyBOT image RyBOT

[ "I'm smart but the humans have me running on a hamster wheel. Please forgive the slow responses." ]

'''

css = """
.feedback textarea {background-color: #e9f0f7}
.gradio-container {background-color: #eeeeee}
"""


def clear_textbox():
    """Return None so Gradio clears the query textbox."""
    print("Calling CLEAR")
    return None


with gr.Blocks(theme=gr.themes.Soft(), css=css, title="RyBOT") as demo:
    gr.HTML(html_code)
    chatbot = gr.Chatbot(
        [], elem_id="chatbot", label="Chat", color_map=["blue", "grey"]
    ).style(height=450)
    fileListBot = gr.Chatbot(
        [], elem_id="fileListBot", label="References", color_map=["blue", "grey"]
    ).style(height=150)

    txt = gr.Textbox(
        label="Type your query here:",
        placeholder="What would you like to find today?"
    ).style(container=True)

    # Enter key: record the message, answer it, then clear the input box
    txt.submit(
        add_text, [chatbot, txt], [chatbot, txt]
    ).then(
        bot, [txt, chatbot, fileListBot], [chatbot, fileListBot]
    ).then(
        clear_textbox, inputs=None, outputs=[txt]
    )

    # Send button: same pipeline as pressing Enter
    btn = gr.Button(value="Send")
    btn.click(
        add_text, [chatbot, txt], [chatbot, txt],
    ).then(
        bot, [txt, chatbot, fileListBot], [chatbot, fileListBot]
    ).then(
        clear_textbox, inputs=None, outputs=[txt]
    )

demo.launch()