Spaces:
Sleeping
Sleeping
| from openai import OpenAI | |
| import numpy as np | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate | |
| from langchain.schema.runnable import RunnablePassthrough | |
| from langchain.schema.output_parser import StrOutputParser | |
| from scipy.spatial.distance import cosine | |
def find_first_with_docket(items):
    """Return the first item containing "docket" (case-insensitive).

    Args:
        items: iterable of strings to scan in order.

    Returns:
        The first matching string, or None when nothing matches.
        (The original returned 0 here, contradicting its own comment;
        None is the documented and idiomatic "not found" value, and is
        equally falsy for callers doing truthiness checks.)
    """
    for item in items:
        if "docket" in item.lower():
            return item
    return None
def escape_markdownold(text):
    """Backslash-escape Markdown special characters in *text* (regex version).

    NOTE(review): superseded by escape_markdown(); kept for compatibility.

    The original built the character class by joining escapes with "|",
    which made the pattern unreadable, listed "#" twice, and escaped "|"
    only by accident (a literal "|" inside a class). re.escape builds the
    same class — *, |, _, #, {}, [], (), +, -, ., !, \\ — explicitly.

    Args:
        text: string to escape.

    Returns:
        *text* with each special character prefixed by a backslash.
    """
    pattern = re.compile("[" + re.escape("*|_#{}[]()+-.!\\") + "]")
    # \g<0> re-emits the matched character after the added backslash.
    return pattern.sub(r"\\\g<0>", text)
def escape_markdown(text):
    """Return *text* with every Markdown-significant character escaped.

    Each special character (including backslash itself) is prefixed with a
    single backslash; all other characters pass through unchanged.
    """
    # Characters that carry meaning in Markdown and must be neutralized.
    specials = set("\\`*_{}[]()#+-.!|>$")
    return "".join("\\" + ch if ch in specials else ch for ch in text)
# --- One-time startup: load the precomputed document store and embeddings ---
# NOTE(review): `started` is not defined in this chunk — presumably set earlier
# in the file or seeded via st.session_state; confirm before relying on it.
# NOTE(review): `st` (streamlit) is imported further down in this file; verify
# the actual execution order, since top-to-bottom this would NameError.
if not started:
    print("------------starting------------")
    import pickle
    # Path to the pickle file where you want to save your data
    pickle_file_path = 'vectorstore.pkl'
    # SECURITY: pickle.load executes arbitrary code from the file —
    # only ever load a vectorstore.pkl you produced yourself.
    with open(pickle_file_path, 'rb') as file:
        st.session_state.docs = pickle.load(file)
    # Embedding matrix aligned with docs: presumably one row per chunk — TODO confirm.
    st.session_state.embeddings = np.load('embeddings.npy')
def strip_repeated_dots_and_blanks(text):
    """Normalize noisy extracted text.

    Collapses runs of dots to a single dot, runs of spaces to a single
    space, then turns a lone-space line (newline, space, newline) into a
    true blank line. Applied in that order.
    """
    substitutions = (
        (r"\.{2,}", "."),   # "...." -> "."
        (r" {2,}", " "),    # "    " -> " "
        (r"\n \n", "\n\n"), # line holding one stray space -> blank line
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text
# Function to get embeddings from OpenAI API
def get_embeddings(texts):
    """Embed each text with the OpenAI embeddings API.

    Args:
        texts: iterable of strings to embed (one API call per string).

    Returns:
        A list of embedding vectors (list[float]), one per input text,
        in input order.

    Note:
        Performs network I/O; requires OPENAI_API_KEY in the environment.
        The original accumulated with ``embeddings = embeddings + [...]``,
        which copies the whole list on every iteration (quadratic);
        ``append`` is the linear, idiomatic form.
    """
    client = OpenAI()
    embeddings = []
    for text in texts:
        response = client.embeddings.create(
            input=text,
            model="text-embedding-3-small",
        )
        embeddings.append(response.data[0].embedding)
    return embeddings
def cosine_similarity(vec_a, vec_b):
    """Cosine similarity between two vectors.

    scipy's `cosine` returns the cosine *distance*, so the similarity is
    its complement: 1 - distance.
    """
    distance = cosine(vec_a, vec_b)
    return 1 - distance
def askq(query):
    """Answer *query* via retrieval over the session's precomputed embeddings.

    Embeds the query, ranks all stored chunks by cosine similarity, keeps
    the top 5 that score above 0.5, stuffs their text into a prompt, and
    asks the chat model. If nothing relevant is found the prompt instructs
    the model to apologize and tell a lawyer joke instead.

    Args:
        query: the user's question or request.

    Returns:
        (answer, selected_items, selected_sources, selected_chunks,
         highest_similarities) — answer text, the retrieved chunk texts,
        their 'source' and 'chunk' metadata, and their similarity scores,
        all ordered best-first.

    Fixes vs. original:
      * Removed a duplicated, broken line that rebuilt ``selected_chunks``
        by iterating ``similarities_array`` (indexing ``docs`` with floats
        — a TypeError at runtime).
      * ``np.argpartition(a, -5)`` raises when fewer than 5 documents
        exist; ``argsort`` + slice handles any corpus size.
      * Corrected the prompt's typos ("Anwser", "ontent", "applogice"),
        which degraded the instruction the LLM receives.
    """
    embeddings = st.session_state.embeddings
    docs = st.session_state.docs
    question = strip_repeated_dots_and_blanks(query)
    query_embedding = get_embeddings([query])[0]
    # Similarity of every stored chunk to the query, best-first.
    similarities_array = np.array(
        [cosine_similarity(embedding, query_embedding) for embedding in embeddings]
    )
    top_indices = np.argsort(similarities_array)[::-1][:5]
    # Keep only results above the relevance threshold.
    filtered = [(i, similarities_array[i]) for i in top_indices
                if similarities_array[i] > 0.5]
    highest_indices = [i for i, _ in filtered]
    highest_similarities = [s for _, s in filtered]
    selected_items = [docs[i].page_content for i in highest_indices]
    selected_sources = [docs[i].metadata['source'] for i in highest_indices]
    selected_chunks = [docs[i].metadata['chunk'] for i in highest_indices]
    content = ' '.join(selected_items)
    # Combine content and question into a single prompt.
    prompt = f"""Answer the question or request using the provided content. If an answer can't be found in the provided content, respond that you could not find
the answer to the question, apologize and say that you will instead tell a lawyer joke, followed by the joke.
Content: {content}\n\nQuestion: {question}\nAnswer:"""
    # Define LLM
    llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.2)
    answer = llm.invoke(prompt).content
    return answer, selected_items, selected_sources, selected_chunks, highest_similarities
| import numpy as np | |
| import streamlit as st | |
| # Assuming `strip_repeated_dots_and_blanks`, `get_embeddings`, and `cosine_similarity` are defined elsewhere correctly | |
| # Assuming `ChatOpenAI` is a correctly defined or imported class for handling OpenAI chat | |
def ask(query):
    """Answer *query* via retrieval, returning rich metadata for display.

    Embeds the query, ranks all stored chunks by cosine similarity, keeps
    the top 5 scoring above 0.4, builds a prompt from their text, and asks
    the chat model; the prompt tells the model to apologize and suggest
    where else to look when the content does not contain the answer.

    Args:
        query: the user's question or request.

    Returns:
        (answer, selected_items, selected_sources, titles, dates,
         selected_chunks, highest_similarities) — answer text plus, for
        each retrieved chunk (best-first): its text, 'source', 'title',
        'date', 'chunk' metadata, and similarity score.

    Fix vs. original: ``np.argpartition(a, -5)`` raises ValueError when
    the corpus holds fewer than 5 documents; ``argsort`` + slice works
    for any corpus size.
    """
    embeddings = st.session_state.embeddings
    docs = st.session_state.docs
    question = strip_repeated_dots_and_blanks(query)
    query_embedding = get_embeddings([query])[0]
    # Similarity of every stored chunk to the query, best-first.
    similarities_array = np.array(
        [cosine_similarity(embedding, query_embedding) for embedding in embeddings]
    )
    top_indices = np.argsort(similarities_array)[::-1][:5]
    # Keep only results above the (looser than askq's) 0.4 threshold.
    filtered = [(i, similarities_array[i]) for i in top_indices
                if similarities_array[i] > 0.4]
    highest_indices = [i for i, _ in filtered]
    highest_similarities = [s for _, s in filtered]
    # Gather the retrieved chunks and their display metadata.
    selected_items = [docs[i].page_content for i in highest_indices]
    selected_sources = [docs[i].metadata['source'] for i in highest_indices]
    selected_chunks = [docs[i].metadata['chunk'] for i in highest_indices]
    titles = [docs[i].metadata['title'] for i in highest_indices]
    dates = [docs[i].metadata['date'] for i in highest_indices]
    content = ' '.join(selected_items)
    # Prepare the prompt
    prompt = f"""Answer the question or request provided given the content. If an answer can't be found in the provided content,
respond that you could not find the answer to the question, apologize and instead provide a suggestion for where to search for more information related to the question.
\
-------------------
Content: {content}\n\nQuestion: {question}\nAnswer:
-------------------
"""
    # Initialize the LLM (assuming correct implementation or import)
    llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.1)
    answer = llm.invoke(prompt).content
    return answer, selected_items, selected_sources, titles, dates, selected_chunks, highest_similarities