Spaces:
Sleeping
Sleeping
| from typing import Any | |
| import gradio as gr | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.vectorstores import Chroma | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.document_loaders import PyPDFLoader | |
| import fitz | |
| from PIL import Image | |
| import chromadb | |
| import re | |
| import uuid | |
| enable_box = gr.Textbox.update(value = None, placeholder = 'Upload your OpenAI API key',interactive = True) | |
| disable_box = gr.Textbox.update(value = 'OpenAI API key is Set', interactive = False) | |
| def set_apikey(api_key: str): | |
| app.OPENAI_API_KEY = api_key | |
| return disable_box | |
| def enable_api_box(): | |
| return enable_box | |
| def add_text(history, text: str): | |
| if not text: | |
| raise gr.Error('enter text') | |
| history = history + [(text,'')] | |
| return history | |
| class my_app: | |
| def __init__(self, OPENAI_API_KEY: str = None ) -> None: | |
| self.OPENAI_API_KEY: str = OPENAI_API_KEY | |
| self.chain = None | |
| self.chat_history: list = [] | |
| self.N: int = 0 | |
| self.count: int = 0 | |
| def __call__(self, file: str) -> Any: | |
| if self.count==0: | |
| self.chain = self.build_chain(file) | |
| self.count+=1 | |
| return self.chain | |
| def chroma_client(self): | |
| #create a chroma client | |
| client = chromadb.Client() | |
| #create a collecyion | |
| collection = client.get_or_create_collection(name="my-collection") | |
| return client | |
| def process_file(self,file: str): | |
| loader = PyPDFLoader(file.name) | |
| documents = loader.load() | |
| pattern = r"/([^/]+)$" | |
| match = re.search(pattern, file.name) | |
| file_name = match.group(1) | |
| return documents, file_name | |
| def build_chain(self, file: str): | |
| documents, file_name = self.process_file(file) | |
| #Load embeddings model | |
| embeddings = OpenAIEmbeddings(openai_api_key=self.OPENAI_API_KEY) | |
| pdfsearch = Chroma.from_documents(documents, embeddings, collection_name= file_name,) | |
| chain = ConversationalRetrievalChain.from_llm( | |
| ChatOpenAI(temperature=0.0, openai_api_key=self.OPENAI_API_KEY), | |
| retriever=pdfsearch.as_retriever(search_kwargs={"k": 1}), | |
| return_source_documents=True,) | |
| return chain | |
| def get_response(history, query, file): | |
| if not file: | |
| raise gr.Error(message='Upload a PDF') | |
| chain = app(file) | |
| result = chain({"question": query, 'chat_history':app.chat_history},return_only_outputs=True) | |
| app.chat_history += [(query, result["answer"])] | |
| app.N = list(result['source_documents'][0])[1][1]['page'] | |
| for char in result['answer']: | |
| history[-1][-1] += char | |
| yield history,'' | |
| def render_file(file): | |
| doc = fitz.open(file.name) | |
| page = doc[app.N] | |
| #Render the page as a PNG image with a resolution of 300 DPI | |
| pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) | |
| image = Image.frombytes('RGB', [pix.width, pix.height], pix.samples) | |
| return image | |
| def render_first(file): | |
| doc = fitz.open(file.name) | |
| page = doc[0] | |
| #Render the page as a PNG image with a resolution of 300 DPI | |
| pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) | |
| image = Image.frombytes('RGB', [pix.width, pix.height], pix.samples) | |
| return image,[] | |
| app = my_app() | |
| with gr.Blocks() as demo: | |
| with gr.Column(): | |
| with gr.Row(): | |
| with gr.Column(scale=0.8): | |
| api_key = gr.Textbox(placeholder='Enter OpenAI API key', show_label=False, interactive=True).style(container=False) | |
| with gr.Column(scale=0.2): | |
| change_api_key = gr.Button('Change Key') | |
| with gr.Row(): | |
| chatbot = gr.Chatbot(value=[], elem_id='chatbot').style(height=650) | |
| show_img = gr.Image(label='Upload PDF', tool='select' ).style(height=680) | |
| with gr.Row(): | |
| with gr.Column(scale=0.60): | |
| txt = gr.Textbox( | |
| show_label=False, | |
| placeholder="Enter text and press enter", | |
| ).style(container=False) | |
| with gr.Column(scale=0.20): | |
| submit_btn = gr.Button('submit') | |
| with gr.Column(scale=0.20): | |
| btn = gr.UploadButton("📁 upload a PDF", file_types=[".pdf"]).style() | |
| api_key.submit( | |
| fn=set_apikey, | |
| inputs=[api_key], | |
| outputs=[api_key,]) | |
| change_api_key.click( | |
| fn= enable_api_box, | |
| outputs=[api_key]) | |
| btn.upload( | |
| fn=render_first, | |
| inputs=[btn], | |
| outputs=[show_img,chatbot],) | |
| submit_btn.click( | |
| fn=add_text, | |
| inputs=[chatbot,txt], | |
| outputs=[chatbot, ], | |
| queue=False).success( | |
| fn=get_response, | |
| inputs = [chatbot, txt, btn], | |
| outputs = [chatbot,txt]).success( | |
| fn=render_file, | |
| inputs = [btn], | |
| outputs=[show_img] | |
| ) | |
| demo.queue() | |
| demo.launch() |