ask-ASH / app.py
elia-waefler's picture
licence
3b72722
"""
incomplete, but functional RAG App
stores vectors in session state, or locally.
add function to display retrieved documents
"""
import time
from datetime import datetime
# import openai
# import tiktoken
import streamlit as st
from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceInstructEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from html_templates import css, bot_template, user_template
from langchain.llms import HuggingFaceHub
import os
import numpy as np
import faiss_utils
from langchain_community.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
def merge_faiss_indices(index1, index2):
"""
Merge two FAISS indices into a new index, assuming both are of the same type and dimensionality.
Args:
index1 (faiss.Index): The first FAISS index.
index2 (faiss.Index): The second FAISS index.
Returns:
faiss.Index: A new FAISS index containing all vectors from index1 and index2.
"""
# Check if both indices are the same type
if type(index1) != type(index2):
raise ValueError("Indices are of different types")
# Check dimensionality
if index1.d != index2.d:
raise ValueError("Indices have different dimensionality")
# Determine type of indices
if isinstance(index1, FAISS.IndexFlatL2):
# Handle simple flat indices
d = index1.d
# Extract vectors from both indices
xb1 = FAISS.rev_swig_ptr(index1.xb.data(), index1.ntotal * d)
xb2 = FAISS.rev_swig_ptr(index2.xb.data(), index2.ntotal * d)
# Combine vectors
xb_combined = np.vstack((xb1, xb2))
# Create a new index and add combined vectors
new_index = FAISS.IndexFlatL2(d)
new_index.add(xb_combined)
return new_index
elif isinstance(index1, FAISS.IndexIVFFlat):
# Handle quantized indices (IndexIVFFlat)
d = index1.d
nlist = index1.nlist
quantizer = FAISS.IndexFlatL2(d) # Re-create the appropriate quantizer
# Create a new index with the same configuration
new_index = FAISS.IndexIVFFlat(quantizer, d, nlist, FAISS.METRIC_L2)
# If the indices are already trained, you can directly add the vectors
# Otherwise, you may need to train new_index using a representative subset of vectors
vecs1 = FAISS.rev_swig_ptr(index1.xb.data(), index1.ntotal * d)
vecs2 = FAISS.rev_swig_ptr(index2.xb.data(), index2.ntotal * d)
new_index.add(vecs1)
new_index.add(vecs2)
return new_index
else:
raise TypeError("Index type not supported for merging in this function")
def get_pdf_text(pdf_docs):
text = ""
for pdf in pdf_docs:
pdf_reader = PdfReader(pdf)
for page in pdf_reader.pages:
text += page.extract_text()
return text
def get_text_chunks(text):
text_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
chunks = text_splitter.split_text(text)
return chunks
def get_faiss_vectorstore(text_chunks):
if sst.openai:
my_embeddings = OpenAIEmbeddings()
else:
my_embeddings = HuggingFaceInstructEmbeddings(model_name="hkunlp/instructor-xl")
vectorstore = FAISS.from_texts(texts=text_chunks, embedding=my_embeddings)
return vectorstore
def get_conversation_chain(vectorstore):
if sst.openai:
llm = ChatOpenAI()
else:
llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs={"temperature": 0.5, "max_length": 512})
memory = ConversationBufferMemory(
memory_key='chat_history', return_messages=True)
conversation_chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=vectorstore.as_retriever(),
memory=memory
)
return conversation_chain
def handle_userinput(user_question):
response = sst.conversation({'question': user_question})
sst.chat_history = response['chat_history']
for i, message in enumerate(sst.chat_history):
# Display user message
if i % 2 == 0:
st.write(user_template.replace("{{MSG}}", message.content), unsafe_allow_html=True)
else:
print(message)
# Display AI response
st.write(bot_template.replace("{{MSG}}", message.content), unsafe_allow_html=True)
# Display source document information if available in the message
def save_uploaded_file(uploaded_file):
try:
# Create a static folder if it doesn't exist
if not os.path.exists('static'):
os.makedirs('static')
# Write the uploaded file to a new file in the static directory
with open(os.path.join('static', uploaded_file.name), "wb") as f:
f.write(uploaded_file.getbuffer())
return True
except Exception as e:
print(e)
return False
if True:
BASE_URL = "https://api.vectara.io/v1"
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
OPENAI_ORG_ID = os.environ["OPENAI_ORG_ID"]
PINECONE_API_KEY = os.environ["PINECONE_API_KEY_LCBIM"]
HUGGINGFACEHUB_API_TOKEN = os.environ["HUGGINGFACEHUB_API_TOKEN"]
VECTARA_API_KEY = os.environ["VECTARA_API_KEY"]
VECTARA_CUSTOMER_ID = os.environ["VECTARA_CUSTOMER_ID"]
headers = {"Authorization": f"Bearer {VECTARA_API_KEY}", "Content-Type": "application/json"}
def main():
st.set_page_config(page_title="Anna Seiler Haus KI-Assistent", page_icon=":hospital:")
st.write(css, unsafe_allow_html=True)
if True:
if "conversation" not in sst:
sst.conversation = None
if "chat_history" not in sst:
sst.chat_history = None
if "page" not in sst:
sst.page = "home"
if "openai" not in sst:
sst.openai = True
if "login" not in sst:
sst.login = False
if 'submitted_user_query' not in sst:
sst.submitted_user_query = ''
if 'submitted_user_safe' not in sst:
sst.submitted_user_safe = ''
if 'submitted_user_load' not in sst:
sst.submitted_user_load = ''
if 'widget_user_load' not in sst:
sst.widget_user_load = 'U3_alle' # Init the vectorstore
if 'vectorstore' not in sst:
sst.vectorstore = None
def submit_user_query():
sst.submitted_user_query = sst.widget_user_query
sst.widget_user_query = ''
def submit_user_safe():
sst.submitted_user_safe = sst.widget_user_safe
sst.widget_user_safe = ''
if sst.vectorstore is not None:
# faiss_name = str(datetime.now().strftime("%Y%m%d%H%M%S")) + "faiss_index"
faiss_utils.save_local(sst.vectorstore, path=sst.submitted_user_safe)
st.sidebar.success("saved")
else:
st.sidebar.warning("No embeddings to save. Please process documents first.")
def submit_user_load():
sst.submitted_user_load = sst.widget_user_load
sst.widget_user_load = ''
if os.path.exists(sst.submitted_user_load):
new_db = faiss_utils.load_vectorstore(f"{sst.submitted_user_load}/faiss_index.index")
if sst.vectorstore is not None:
if new_db is not None: # Check if this is working
sst.vectorstore.merge_from(new_db)
sst.conversation = get_conversation_chain(sst.vectorstore)
st.sidebar.success("faiss loaded")
else:
if new_db is not None: # Check if this is working
sst.vectorstore = new_db
sst.conversation = get_conversation_chain(new_db)
st.sidebar.success("faiss loaded")
else:
st.sidebar.warning("Couldn't load/find embeddings")
st.header("Anna Seiler Haus KI-Assistent ASH :hospital:")
if st.toggle("show README"):
st.subheader("Funktion: ")
st.write("dieses proof-of-concept von Elia Wäfler demonstriert das Potential von RAG (Retrival Augmented Generation) für BIM2FM Dokumentenablagen am Beispiel Dokumente U3 ASH (Anna Seiler Haus, Inselspital Bern). chatte mit den Dokumenten, oder lade selber ein oder mehrere PDF-Dokumente hoch, um RAG auszuprobieren. die vektoren werden lokal oder im st.session_state gespeichert. Feedback und Bugs gerne an elia.waefler@insel.ch")
st.write("Vielen Dank.")
st.write("")
st.subheader("Licence and credits")
st.write("THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.")
st.write("special thanks to OpenAI, STREAMLIT, HUGGINGFACE, LANGCHAIN and alejandro-ao")
l, r = st.columns(2)
with l:
st.subheader("Limitationen: ")
st.write("bisher nur Text aus PDFs")
st.write("macht Fehler, kann falsche Informationen geben")
st.write("prompts werden bisher nicht geprüft")
st.write("")
with r:
st.subheader("geplante Erweiterungen:")
st.write("Tabellen, Bilder werden auch vektorisiert, um die retrival qualität zu verbessern")
st.write("on permise anwendung mit mixtral 7b oder vergleichbar")
st.write("Ecodomus API einbinden, um alle Dokumente einzubinden.")
st.write("")
if sst.login:
#user_question = st.text_input("Ask a question about your documents:", key="user_query", on_change=handle_query)
st.text_input('Ask a question about your documents:', key='widget_user_query', on_change=submit_user_query)
#sst.openai = st.toggle(label="use openai?")
if sst.submitted_user_query:
if sst.vectorstore is not None:
handle_userinput(sst.submitted_user_query)
sst.submitted_user_query = False
else:
st.warning("no vectorstore loaded.")
with st.sidebar:
st.subheader("Your documents")
pdf_docs = st.file_uploader("Upload your PDFs here and click on 'Process'", accept_multiple_files=True)
if st.button("Process"):
with st.spinner("Processing"):
vec = get_faiss_vectorstore(get_text_chunks(get_pdf_text(pdf_docs)))
sst.vectorstore = vec
sst.conversation = get_conversation_chain(vec)
st.success("embedding complete")
st.text_input('Safe Embeddings to: (copy path of folder)', key='widget_user_safe',
on_change=submit_user_safe)
st.text_input('Load Embeddings from: (copy path of folder)', key='widget_user_load',
on_change=submit_user_load)
if st.toggle("reset vectorstore?"):
if st.button("Yes, reset"):
sst.vectorstore = None
st.warning("vectorstore reset complete")
else:
st.warning("unsaved embeddings will be lost.")
else:
user_pw = st.text_input("ASK_ASH_PASSWORD: ", type="password")
if st.button("check"):
time.sleep(0.5)
if user_pw == ASK_ASH_PASSWORD:
sst.login = True
if "first_load" not in sst:
submit_user_load()
sst.first_load = True
st.rerun()
if __name__ == '__main__':
sst = st.session_state
ASK_ASH_PASSWORD = os.environ["ASK_ASH_PASSWORD"]
main()