Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import numpy as np | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.chains import LLMChain | |
| from langchain import PromptTemplate | |
| import re | |
| import pandas as pd | |
| from langchain.vectorstores import FAISS | |
| import requests | |
| from typing import List | |
| from langchain.schema import ( | |
| SystemMessage, | |
| HumanMessage, | |
| AIMessage | |
| ) | |
| import os | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain.chat_models import ChatOpenAI | |
| from fuzzywuzzy import process | |
| from langchain.llms.base import LLM | |
| from typing import Optional, List, Mapping, Any | |
| import anthropic | |
| import ast | |
| CHARACTER_CUT_OFF = 20000 | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100) | |
| embeddings = HuggingFaceEmbeddings(model_name = "msmarco-distilbert-base-dot-prod-v3") | |
| db = FAISS.load_local('retrieval_db', embeddings) | |
| mp_docs = {} | |
| llm = ChatOpenAI( | |
| temperature=0, | |
| model='gpt-3.5-turbo-16k' | |
| ) | |
| # List of product names | |
| products = [] | |
| for id in db.index_to_docstore_id.values(): | |
| prod_name = db.docstore.search(id).metadata['Product Name'] | |
| products.append(prod_name) | |
| def add_text(history, text): | |
| print(history) | |
| history = history + [(text, None)] | |
| return history, "" | |
| # Claude Custom Class | |
| class ClaudeLLM(LLM): | |
| def _llm_type(self) -> str: | |
| return "custom" | |
| def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: | |
| client = anthropic.Client(os.environ['ANTHROPIC_KEY']) | |
| # How about the formatted prompt? | |
| prompt_formatted = ( | |
| f"{anthropic.HUMAN_PROMPT}{prompt}\n{anthropic.AI_PROMPT}" | |
| ) | |
| response = client.completion( | |
| prompt=prompt_formatted, | |
| stop_sequences=[anthropic.HUMAN_PROMPT], | |
| model="claude-instant-v1-100k", | |
| max_tokens_to_sample=100000, | |
| temperature=0.3, | |
| ) | |
| return response["completion"] | |
| # return Bard().get_answer(prompt)['content'] | |
| # response = requests.post( | |
| # "http://127.0.0.1:8000/prompt", | |
| # json={ | |
| # "prompt": prompt, | |
| # "temperature": 0, | |
| # "max_new_tokens": 256, | |
| # "stop": stop + ["Observation:"] | |
| # } | |
| # ) | |
| # response.raise_for_status() | |
| # return response.json()["response"] | |
| def _identifying_params(self) -> Mapping[str, Any]: | |
| """Get the identifying parameters.""" | |
| return { | |
| } | |
| def identify_products(query): | |
| llm = ClaudeLLM() | |
| prompt = PromptTemplate( | |
| input_variables=["query",], | |
| template=""" | |
| Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request. | |
| Instruction: | |
| Extract Product Names from the following query in a list like the following example. | |
| Example: | |
| Query: What is the difference between A71 and A81 products? | |
| Response: ['A71', 'A81'] | |
| End of Example. | |
| Query: {query} | |
| Response: | |
| """, | |
| ) | |
| chain = LLMChain(llm=llm, prompt=prompt) | |
| response = chain.run(query=query,) | |
| return ast.literal_eval(response.strip()) | |
| def correct_product_name(user_input, products, threshold=50): | |
| best_match = process.extractOne(user_input, products) | |
| # If the match percentage is above the threshold, return the match, otherwise return "No suggestions found" | |
| if best_match[1] >= threshold: | |
| return best_match[0] | |
| else: | |
| return 0 | |
| def retrieve_products(query): | |
| idx_prods = identify_products(query) | |
| correct_prods = [] | |
| print(idx_prods) | |
| for prod in idx_prods: | |
| adj_prod = correct_product_name(prod, products) | |
| if (adj_prod): | |
| correct_prods.append(adj_prod) | |
| print(correct_prods) | |
| if correct_prods: | |
| prods_mp = {} | |
| for prod in correct_prods: | |
| # Retrieve from db, each product 4-chunks | |
| docs = db.similarity_search(query = "", filter = {'Product Name': prod}, k = 4, fetch_k = 49190) | |
| prods_mp[prod] = "\n".join([doc.page_content for doc in docs]) | |
| return prods_mp | |
| return False | |
| # How to access the file? Where is it saved? | |
| def add_file(history, files): | |
| history = [] | |
| files = files[0] | |
| docs = [] | |
| for file in files: | |
| loader = UnstructuredPDFLoader(file.name) | |
| text = loader.load() | |
| # pdf_content = pdf2text(file.name) | |
| docs += text_splitter.split_documents(text) | |
| # print(docs[0]) | |
| global db | |
| if(type(db) != str): | |
| # local_db = | |
| docs += list(db.docstore._dict.values()) | |
| print(docs) | |
| history = history + [(f"{len(files)} PDF(s) Uploaded", None),] | |
| db = FAISS.from_documents(docs, embeddings) | |
| print(f"History in add file: {history}") | |
| # print(db.docstore()) | |
| print(type(history), type(history[0])) | |
| return ([history,],) | |
| def qa_retrieve(chatlog,): | |
| print(f"Chatlog qa: {chatlog}") | |
| query = chatlog[-1][0] | |
| docs = "" | |
| global db | |
| print(db) | |
| global mp_docs | |
| mp_products = retrieve_products(query) | |
| if not(mp_products): | |
| if mp_docs: | |
| mp_products = mp_docs | |
| else: | |
| mp_docs = mp_products | |
| mp_products = [f"Product: {product}\n Information: {information}" for product, information in mp_products.items()] | |
| print(f"DOCS RETRIEVED: {mp_docs.values}") | |
| prompt = PromptTemplate( | |
| input_variables=["query", "products"], | |
| template=""" | |
| You are a Physics specialist that that can identify main products and their characteristics from the products you have been given. | |
| You will help the user with questions related to different electronic products, retrieving advanced physics equations, tables and many other valuable information between products. | |
| Once you have reviewed the documents, you will be able to offer detailed analysis and guidance based on the user's question. | |
| Answer the following question: {query} | |
| Use the following documents for each product: | |
| {products} | |
| Only use the factual information from the documents to answer the question. You are very careful in the products or theory name and will not invent or imagine names. | |
| Make sure to always answer the question with the same language the question is in. If it's in chinese make sure to answer in chinese. If it's in english answer in english. | |
| If you feel like you don't have enough information to answer the question, say "I don't know". | |
| """, | |
| ) | |
| # llm = BardLLM() | |
| chain = LLMChain(llm=llm, prompt = prompt) | |
| response = chain.run(query=query, products="\n".join(mp_products)) | |
| chatlog[-1][1] = response | |
| return chatlog | |
| def flush(): | |
| return None | |
| with gr.Blocks() as demo: | |
| chatbot = gr.Chatbot([], elem_id="chatbot").style(height=750) | |
| with gr.Row(): | |
| with gr.Column(scale=0.65): | |
| txt = gr.components.Textbox( | |
| placeholder="Ask me anything",show_label=False | |
| ) | |
| with gr.Column(scale=0.15, min_width=0): | |
| btn = gr.UploadButton("📁", file_types=["text"], file_count = 'multiple') | |
| # with gr.Row(): | |
| # with gr.Column(scale=0.85): | |
| # url = gr.components.Textbox( | |
| # label="Website URLs", | |
| # placeholder="https://www.example.org/ https://www.example.com/", | |
| # ) | |
| with gr.Column(scale=0.15, min_width = 0): | |
| send_btn = gr.Button("📨") | |
| with gr.Row(): | |
| with gr.Column(): | |
| clear = gr.Button("Clear") | |
| pdf_content = gr.Textbox("", visible = False) | |
| txt.submit(add_text, [chatbot, txt], [chatbot, txt]).then( | |
| qa_retrieve, [chatbot], chatbot | |
| ).then(lambda : (None), outputs = [ pdf_content]) | |
| btn.upload(add_file, [chatbot, btn], [chatbot,], batch = True).then(qa_retrieve, [chatbot], chatbot) | |
| send_btn.click(add_text, [chatbot, txt, ], [chatbot, txt]).then( | |
| qa_retrieve, [chatbot, ], chatbot).then(lambda : None, outputs = [ pdf_content]) | |
| clear.click(flush, None, outputs = chatbot, queue=False) | |
| demo.queue(concurrency_count = 4) | |
| demo.launch() | |