| import os | |
| from io import BytesIO | |
| from PIL import Image | |
| import google.generativeai as genai | |
| import google.ai.generativelanguage as glm | |
| from langchain.vectorstores import Chroma | |
| from PyPDF2 import PdfReader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| import streamlit as st | |
| st.title("DocsGPT") | |
| genai.configure(api_key=os.environ['GOOGLE_API_KEY']) | |
| st.markdown( | |
| """ | |
| <style> | |
| .css-1jc7ptx, .e1ewe7hr3, .viewerBadge_container__1QSob, | |
| .styles_viewerBadge__1yB5_, .viewerBadge_link__1S137, | |
| .viewerBadge_text__1JaDK { | |
| display: none; | |
| } | |
| </style> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| rag = glm.Tool( | |
| function_declarations=[ | |
| glm.FunctionDeclaration( | |
| name='vector_search', | |
| description="Returns the content of the document user attached. Make sure that your not passing query as a question use like **keywords** instead. Use this function to search for contents in the user attached or uploaded documents to you. Try not to completly paste the user question as query, instead use keywords.", | |
| parameters=glm.Schema( | |
| type=glm.Type.OBJECT, | |
| properties={ | |
| 'query': glm.Schema(type=glm.Type.STRING), | |
| }, | |
| required=['query'] | |
| ) | |
| ) | |
| ] | |
| ) | |
| gemini = genai.GenerativeModel('gemini-pro', tools=[rag]) | |
| gemini_vision = genai.GenerativeModel('gemini-pro-vision') | |
| class rawkn: | |
| def __init__(self, text): | |
| self.text = text | |
| def get_relevant_documents(self, query): | |
| return self.text | |
| def loader_data(files, include_getting_real): | |
| file_type = files[0].type if len(files) > 0 else "application/pdf" | |
| total_content = '' | |
| num_pages = 0 | |
| if include_getting_real: | |
| files.append("./getting_real_basecamp.pdf") | |
| for file in files: | |
| if file_type == "application/pdf": | |
| pdf_reader = PdfReader(file) | |
| content = '' | |
| for page in pdf_reader.pages: | |
| num_pages += 1 | |
| content += page.extract_text() | |
| for img in page.images: | |
| try: | |
| image_stream = BytesIO(img.data) | |
| img = Image.open(image_stream) | |
| img_desc = gemini_vision.generate_content(["Generate a detailed description of the image. If it is a flow chart, please create a flowchart that exactly as it is. If it is table, try to create a table exactly like in the image. write all the text in the image it it contains any text. Clearly explain the image in more detailed.\nAlso make sure give a nice heading to the image contant.", img]).candidates[0].content.parts[0].text | |
| print("***************************") | |
| print(img_desc) | |
| print("***************************") | |
| content += "Image content:\n" + img_desc | |
| except: | |
| print("cannot extract image") | |
| if file_type == "text/plain": | |
| content = file.read() | |
| content = content.decode("utf-8") | |
| total_content += content | |
| if num_pages <= 2: | |
| chunk_size = 500 | |
| elif num_pages <= 3: | |
| chunk_size = 1000 | |
| elif num_pages <= 5: | |
| chunk_size = 2000 | |
| elif num_pages <= 10: | |
| chunk_size = 3000 | |
| else: | |
| chunk_size = 4000 | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=0) | |
| texts = text_splitter.split_text(total_content) | |
| try: | |
| embeddings = GoogleGenerativeAIEmbeddings(model = "models/embedding-001") | |
| vector_store = Chroma.from_texts(texts, embeddings).as_retriever() | |
| st.session_state.knowledge = vector_store | |
| st.session_state.chat.history.append(glm.Content( | |
| parts=[glm.Part( | |
| text=f"Now i've uploaded some files.\nHere are the list of documents you have access to:\n{[i.name if type(i) != str else i for i in files]}" | |
| )], | |
| role="user" | |
| ) | |
| ) | |
| st.session_state.chat.history.append(glm.Content( | |
| parts=[glm.Part( | |
| text=f"Sure! Ask me anything about the documents you have uploaded. I can help you with that." | |
| )], | |
| role="model" | |
| ) | |
| ) | |
| except: | |
| st.session_state.knowledge = rawkn(total_content) | |
| if "history" not in st.session_state: | |
| st.session_state.history = [] | |
| if "knowledge" not in st.session_state: | |
| st.session_state.knowledge = None | |
| if "chat" not in st.session_state: | |
| st.session_state.chat = gemini.start_chat(history=[glm.Content( | |
| parts=[glm.Part( | |
| text="Your name is DocsGPT. You are very helpful and can assist with documents uploaded by the user. Use the vector_search tool/function to search for contents in the user attached or uploaded documents to you.\nYou have access to all documents uploaded by the user." | |
| )], | |
| role="user" | |
| ), | |
| glm.Content( | |
| parts=[glm.Part( | |
| text="Sure, i can do that for you." | |
| )], | |
| role="model" | |
| )]) | |
| for history in st.session_state.history: | |
| with st.chat_message(history["role"]): | |
| st.markdown(history["text"]) | |
| with st.sidebar: | |
| st.title("Knowledge") | |
| st.markdown("""### Tips to use DocsGPT: | |
| - Upload your documents [pdf, txt] to DocsGPT and make sure to click on the process button. | |
| - wait for a second and then start chatting with DocsGPT. | |
| - While asking questions to DocsGPT about your uploaded files, please refer your uploaded files as *Document*, *Docs*, *attached or uploaded docs*, so the model can easily understands what you are referring to.""") | |
| files = st.file_uploader("Upload a file", accept_multiple_files=True, type=["pdf", "txt"]) | |
| include_getting_real = st.checkbox("Include getting-real?") | |
| process = st.button("Process") | |
| if process and files: | |
| with st.spinner('loading your file. This may take a while...'): | |
| loader_data(files, include_getting_real) | |
| elif process and include_getting_real: | |
| with st.spinner('loading your file. This may take a while...'): | |
| loader_data([], include_getting_real) | |
| if prompt := st.chat_input("Enter your message..."): | |
| st.session_state.history.append({"role": "user", "text": prompt}) | |
| with st.chat_message("user"): | |
| st.markdown(prompt) | |
| with st.chat_message("assistant"): | |
| message_placeholder = st.empty() | |
| response = st.session_state.chat.send_message(prompt) | |
| if response.candidates[0].content.parts[0].text == '': | |
| args = response.candidates[0].content.parts[0].function_call.args['query'] | |
| if st.session_state.knowledge is not None: | |
| print("searching for ", args) | |
| related_docs = str(st.session_state.knowledge.get_relevant_documents(args)) | |
| print(related_docs) | |
| else: | |
| related_docs = 'No knowledge documents loaded' | |
| response = st.session_state.chat.send_message( | |
| glm.Content( | |
| parts=[glm.Part( | |
| function_response = glm.FunctionResponse( | |
| name='vector_search', | |
| response={'rag': related_docs}, | |
| ) | |
| )] | |
| ) | |
| ).candidates[0].content.parts[0].text | |
| else: | |
| response = response.candidates[0].content.parts[0].text | |
| print(st.session_state.chat.history) | |
| message_placeholder.markdown(response) | |
| st.session_state.history.append({"role": "assistant", "text": response}) | |