|
|
import gradio as gr |
|
|
import os |
|
|
import PyPDF2 |
|
|
from llama_index.core.retrievers import VectorIndexRetriever |
|
|
from llama_index.core.query_engine import RetrieverQueryEngine |
|
|
from llama_index.core import Settings, SimpleDirectoryReader, VectorStoreIndex |
|
|
from llama_index.core.postprocessor import SimilarityPostprocessor |
|
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
|
from llama_index.core import Document |
|
|
import re |
|
|
from transformers import LlamaTokenizer, LlamaForCausalLM |
|
|
|
|
|
|
|
|
print("started...........................................................")

# Local snapshot of Mistral-7B-Instruct-v0.2. Mistral reuses the Llama
# architecture, hence the LlamaTokenizer/LlamaForCausalLM classes.
model_path = "/mnt/data1/backup/viswaz/Project_K/huggingface_cache/Mistral-7B-Instruct-v0.2"

# Module-level globals: generate_response() and respond() below read these.
tokenizer = LlamaTokenizer.from_pretrained(model_path)

# device_map="auto" lets accelerate place the weights on whatever GPU(s)/CPU
# are available; inputs must later be moved to the model's device to match.
model = LlamaForCausalLM.from_pretrained(
model_path,
device_map="auto",
cache_dir="/mnt/data1/backup/viswaz/Project_K/huggingface_cache/",
)

print("stage1 ------------------ completed")
|
|
|
|
|
def generate_response(prompt, model):
    """Generate a completion for *prompt* with the given causal LM.

    Args:
        prompt: Fully formatted instruction prompt string.
        model: Hugging Face causal language model used for decoding.

    Returns:
        Tensor of generated token ids (the prompt tokens are included,
        followed by up to 150 sampled new tokens).
    """
    encoded_input = tokenizer(prompt, return_tensors="pt", add_special_tokens=True)
    # Move inputs to wherever device_map="auto" placed the model, instead of
    # hard-coding 'cuda' (which crashes on CPU-only hosts).
    model_inputs = encoded_input.to(model.device)

    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=150,
        do_sample=True,
        # Mistral has no pad token; fall back to EOS to silence the warning.
        pad_token_id=tokenizer.eos_token_id,
    )

    return generated_ids
|
|
|
|
|
print("stage2 ------------------ completed")

# System-style instruction prepended to every prompt. (Kept byte-identical to
# the original runtime string, stray quote and all; the unused f-prefix was
# dropped since the literal contains no placeholders.)
intstructions_string = """ you are a textbot that helps in finding answers to questions in the research papers, blogs,pdf's or any text context.
,make your answers more meaningful and short"

please answer the following question
"""


def prompt_template_w_context(context, question):
    """Build a Mistral [INST] prompt from retrieved context and a question.

    Args:
        context: Retrieved passages (already prefixed/formatted by the caller).
        question: The user's question.

    Returns:
        The full instruction-formatted prompt string, ending with [/INST].
    """
    # A def instead of the original `name = lambda ...` (PEP 8); the produced
    # string is unchanged.
    return f'''[INST] {intstructions_string}

{context}

Please answer to the following question. Use the context above if it is helpful.

{question}

[/INST]'''


print("stage3 ------------------ completed")
|
|
|
|
|
# Global LlamaIndex configuration: embed locally with a small HF model and
# disable LlamaIndex's own LLM — retrieval only; generation is done by the
# transformers model loaded above.
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

Settings.llm = None

# Chunking applied when documents are split into index nodes.
Settings.chunk_size = 256

Settings.chunk_overlap = 25

# Number of chunks requested per retrieval query (read by respond()).
top_k = 3

print("stage4 ------------------ completed")
|
|
|
|
|
def _read_pdf_text(path):
    """Extract and concatenate the text of every page of the PDF at *path*."""
    # Context manager: the original opened the PDF and never closed the handle.
    with open(path, 'rb') as pdf_file_obj:
        pdf_reader = PyPDF2.PdfReader(pdf_file_obj)
        # extract_text() can return None for pages without a text layer.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)


def respond(files, question):
    """Answer newline-separated questions using the uploaded files as context.

    Args:
        files: Iterable of local file paths (Gradio "filepath" uploads);
            PDFs are parsed with PyPDF2, anything else is read as text.
        question: One or more questions, separated by newlines.

    Returns:
        All generated answers concatenated into a single string.
    """
    questions = question.split("\n")

    documents = []
    for path in files:
        # .lower() so uploads named *.PDF are also parsed as PDFs.
        if path.lower().endswith('.pdf'):
            documents.append(Document(text=_read_pdf_text(path)))
        else:
            with open(path, 'r') as f:
                documents.append(Document(text=f.read()))

    print("stage5 ------------------ completed")

    # Skip boilerplate documents (artifacts of scraped Medium articles) and
    # merge the remaining text into one corpus.
    skip_markers = ("Member-only story", "The Data Entrepreneurs", " min read")
    joined_documents = "".join(
        doc.text
        for doc in documents
        if not any(marker in doc.text for marker in skip_markers)
    )

    print("stage6 ------------------ completed")

    index = VectorStoreIndex.from_documents([Document(text=joined_documents)])

    retriever = VectorIndexRetriever(
        index=index,
        similarity_top_k=top_k,
    )

    query_engine = RetrieverQueryEngine(
        retriever=retriever,
        node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.5)],
    )

    answer_parts = []
    for each_question in questions:
        response_each = query_engine.query(each_question)

        # Iterate the nodes actually returned: the similarity cutoff can leave
        # fewer than top_k nodes, so the original `range(top_k)` indexing
        # could raise IndexError.
        context = "Context:\n" + "".join(
            node.text + "\n\n" for node in response_each.source_nodes
        )

        prompt = prompt_template_w_context(context, each_question)
        outputs = generate_response(prompt, model)
        decoded_output = tokenizer.batch_decode(outputs)[0]

        # Keep only the model completion between [/INST] and </s>.
        inst_eos_texts = re.findall(r'\[\/INST\](.*?)\<\/s\>', decoded_output, re.DOTALL)

        # Join only THIS question's extractions. The original extended one
        # list across iterations and re-joined it every time, so earlier
        # answers were duplicated in the final output.
        answer_parts.append(' '.join(inst_eos_texts))

    answers = ''.join(answer_parts)

    print("stage7 ------------------ completed")
    print(answers)
    return answers
|
|
|
|
|
print("stage8 ------------------ completed")

# Gradio input widgets: multiple PDF/text uploads plus a question box
# (several questions may be entered, one per line — see respond()).
inputs = [
    gr.File(type="filepath", label="Upload PDF/Text Files",file_count="multiple"),
    gr.Text(label="Enter your question here")
]

print("stage9 ------------------ completed")

# Single text output carrying every generated answer concatenated together.
output = gr.Text()

print("stage10 ------------------ completed")

interface = gr.Interface(respond, inputs=inputs, outputs=output, title="Question Answering System")

if __name__ == "__main__":
    interface.launch()
|
|
|