# app.py — Hugging Face Space file (author: ash2203, commit 0590ae6 "Update app.py")
import os
import time
import streamlit as st
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader, Docx2txtLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing import List
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnablePassthrough
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain_chroma import Chroma
import shutil
import uuid
from dotenv import load_dotenv
# Load API keys and other settings from a local .env file.
load_dotenv()

# ---- Page chrome ----
st.set_page_config(page_title="Document Analyzer", layout="wide")
st.title("📚 Document Analyzer")

# Usage notes live in an expander so the main page stays uncluttered.
with st.expander("ℹ️ Click here to view instructions"):
    st.markdown("""
- Upload files by clicking on "Browse Files"
- Avoid interrupting when file/files are under processing, this interrupts the execution and you would have to refresh the page to run the webapp again
- You can add more files anytime, just avoid adding/removing files when it's processing the uploaded documents
- The processing will trigger whenever you make any changes to the files
""")

# ---- Session-state defaults (only set on first run of this session) ----
for _flag in ('initialized', 'processing', 'chat_enabled'):
    if _flag not in st.session_state:
        st.session_state[_flag] = False
if 'session_id' not in st.session_state:
    # Short random ID that namespaces this browser session's vector store on disk.
    st.session_state.session_id = str(uuid.uuid4())[:8]
def get_chroma_directory():
    """Return the session-scoped ChromaDB persistence directory.

    Returns:
        Path string ``vectorstores/chroma_db_<session_id>``. The base
        directory is created if missing; the per-session subdirectory is
        created lazily by Chroma itself.
    """
    base_dir = "vectorstores"
    # exist_ok avoids the race between an exists() check and makedirs()
    # when several Streamlit sessions start at the same time.
    os.makedirs(base_dir, exist_ok=True)
    return os.path.join(base_dir, f"chroma_db_{st.session_state.session_id}")
def cleanup_chroma_db():
    """Delete the on-disk ChromaDB belonging to the current session, if any.

    Best-effort: failures are logged and swallowed so the UI keeps running.
    """
    try:
        session_dir = get_chroma_directory()
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
    except Exception as e:
        print(f"Error cleaning up ChromaDB: {str(e)}")  # Log error internally
def cleanup_old_vectorstores():
    """Delete per-session vector stores untouched for more than 24 hours.

    Walks the ``vectorstores`` base directory and removes any subdirectory
    whose modification time is older than one day. Best-effort: any error
    aborts the sweep and is only logged.
    """
    base_dir = "vectorstores"
    max_age_seconds = 24 * 60 * 60  # one day
    try:
        if not os.path.exists(base_dir):
            return
        now = time.time()
        for entry in os.listdir(base_dir):
            candidate = os.path.join(base_dir, entry)
            # Only session directories are swept; stray files are ignored.
            if not os.path.isdir(candidate):
                continue
            if now - os.path.getmtime(candidate) > max_age_seconds:
                shutil.rmtree(candidate)
    except Exception as e:
        print(f"Error cleaning up old vector stores: {str(e)}")  # Log error internally
# First run (or full page refresh): reset working directories and file tracking.
if not st.session_state.initialized:
    # Sweep vector stores left behind by sessions older than 24 hours.
    cleanup_old_vectorstores()
    # Start with an empty upload area: wipe any leftover data files.
    if os.path.exists("data"):
        shutil.rmtree("data")
    os.makedirs("data")
    # Bug fix: the original guarded makedirs(exist_ok=True) behind
    # os.path.exists("vectorstores"), which never created the directory
    # when it was actually missing. Create it unconditionally.
    os.makedirs("vectorstores", exist_ok=True)
    st.session_state.uploaded_files = {}   # filename -> {"path", "type"}
    st.session_state.previous_files = set()  # snapshot used to detect changes
    st.session_state.initialized = True
def save_uploaded_file(uploaded_file):
    """Persist an uploaded file into the local ``data`` directory.

    Args:
        uploaded_file: object exposing ``name`` (str) and ``getvalue()``
            returning the file's bytes (Streamlit's UploadedFile fits).

    Returns:
        The saved file path on success, or ``None`` on any failure
        (errors are logged, never raised to the UI).
    """
    try:
        # Robustness: guarantee the target directory exists even if the
        # first-run initialization did not execute on this code path.
        os.makedirs("data", exist_ok=True)
        file_path = os.path.join("data", uploaded_file.name)
        # Save the file
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getvalue())
        # Verify file was saved
        if os.path.exists(file_path):
            return file_path
        print(f"File not saved: {file_path}")  # Log error internally
        return None
    except Exception as e:
        print(f"Error saving file: {str(e)}")  # Log error internally
        return None
# Extension -> LangChain loader class; keeps the ingestion dispatch in one place
# instead of three duplicated endswith() branches.
_DOC_LOADERS = {
    ".pdf": PyMuPDFLoader,
    ".txt": TextLoader,
    ".docx": Docx2txtLoader,
}

def _load_uploaded_docs(uploaded_files_dict):
    """Load every tracked upload into Documents; missing files are skipped."""
    docs = []
    for filename, file_info in uploaded_files_dict.items():
        file_path = file_info["path"]
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")  # Log error internally
            continue
        loader_cls = _DOC_LOADERS.get(os.path.splitext(filename)[1])
        if loader_cls is not None:
            docs.extend(loader_cls(file_path).load())
    return docs

def process_documents(uploaded_files_dict):
    """Load, chunk, embed, and persist the uploaded documents in ChromaDB.

    Args:
        uploaded_files_dict: mapping filename -> {"path": ..., "type": ...}.

    Returns:
        True when the vector store was (re)built successfully, else False.

    Side effects:
        Recreates the session's ChromaDB directory and sets
        ``st.session_state.chat_enabled``.
    """
    warning_placeholder = st.empty()
    warning_placeholder.warning("⚠️ Document processing in progress. Please wait before adding or removing files.")
    success_placeholder = st.empty()
    try:
        with st.spinner('Processing documents...'):
            # Rebuild from scratch so stale chunks never leak between runs.
            cleanup_chroma_db()
            docs = _load_uploaded_docs(uploaded_files_dict)
            if not docs:
                st.warning("Unable to process the documents. Please try again.")
                return False
            # Split into overlapping chunks sized for retrieval.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=400,
                length_function=len
            )
            chunks = text_splitter.split_documents(docs)
            # Initialize embeddings
            embed_func = OpenAIEmbeddings(model='text-embedding-3-small', dimensions=512)
            try:
                # Persisting to disk lets the chat step reopen the store later;
                # the returned handle is not needed here (was an unused local).
                Chroma.from_documents(
                    collection_name="collection",
                    documents=chunks,
                    embedding=embed_func,
                    persist_directory=get_chroma_directory()
                )
                st.session_state.chat_enabled = True
                success_placeholder.success('Documents processed successfully!')
                time.sleep(2)  # Show success message for 2 seconds
                success_placeholder.empty()  # Clear the success message
                return True
            except Exception as e:
                print(f"ChromaDB error: {str(e)}")  # Log error internally
                st.warning("Unable to process documents at the moment. Please try again.")
                st.session_state.chat_enabled = False
                return False
    except Exception as e:
        print(f"Processing error: {str(e)}")  # Log error internally
        st.warning("Unable to process documents at the moment. Please try again.")
        st.session_state.chat_enabled = False
        return False
    finally:
        # Always clear the in-progress banner, success or failure.
        warning_placeholder.empty()
def doc2str(docs):
    """Join the ``page_content`` of each document with blank-line separators.

    Used to flatten retrieved documents into the prompt's context string.
    """
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)
def run_chatbot(retriever, llm):
    """Render the chat UI and answer questions with a retrieval QA chain.

    Args:
        retriever: LangChain retriever exposing ``invoke(query) -> documents``.
        llm: chat model invoked with the filled prompt.

    Chat history lives in ``st.session_state.messages`` and is replayed on
    every Streamlit rerun.
    """
    # Prompt: retrieved context plus instructions to use it only when relevant.
    prompt = ChatPromptTemplate.from_template("""
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
<context>
{context}
</context>
<important>
Don't start revealing context in your responses until its asked. First look at the question and then think if the context is needed to answer this or its a normal question, once you have judged then only answer the question.
When there is no context, just respond on your own knowledge as a normal assistant.
</important>
Answer the following question:
{question}""")
    # Chain: fetch context for the question, fill the prompt, call the LLM,
    # then parse the model output down to a plain string.
    qa_chain = (
        RunnablePassthrough.assign(context=lambda input: doc2str(retriever.invoke(input["question"])))
        | prompt
        | llm
        | StrOutputParser()
    )
    # Initialize messages in session state if not exists
    if "messages" not in st.session_state:
        st.session_state.messages = []
    # Replay the conversation so far (the script reruns on every interaction).
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
    # Chat input
    if question := st.chat_input("Ask a question about your documents"):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": question})
        with st.chat_message("user"):
            st.markdown(question)
        # Create a spinner outside the chat message
        with st.spinner("Thinking..."):
            try:
                # Generate response
                response = qa_chain.invoke({"question": question})
                # Display response in chat message after generation
                with st.chat_message("assistant"):
                    st.markdown(response)
                # Add assistant response to chat history
                st.session_state.messages.append({"role": "assistant", "content": response})
            except Exception as e:
                # Failures are logged internally and surfaced as a friendly reply.
                print(f"Chat error: {str(e)}")  # Log error internally
                with st.chat_message("assistant"):
                    error_msg = "I'm having trouble processing your question. Please try asking something else."
                    st.markdown(error_msg)
                    st.session_state.messages.append({"role": "assistant", "content": error_msg})
def process_and_chat():
    """Drive the whole app: upload handling, (re)processing, and chat.

    Flow per Streamlit rerun:
      1. Show the uploader and diff its file list against session state.
      2. Delete files the user removed (and the session's ChromaDB).
      3. Save newly added files to ``data/``.
      4. If the file set changed, rebuild the vector store.
      5. When files exist and processing succeeded, build retrievers and chat.

    NOTE(review): indentation was reconstructed from a whitespace-mangled
    source; statement order is preserved exactly.
    """
    # File uploader section
    with st.container():
        uploaded_files = st.file_uploader(
            "Upload your documents",
            type=["pdf", "txt", "docx"],
            accept_multiple_files=True,
            key="file_uploader",
            label_visibility="collapsed" if st.session_state.processing else "visible"
        )
    # Get current uploaded filenames
    current_uploaded_filenames = {file.name for file in uploaded_files} if uploaded_files else set()
    # Check for removed files
    files_to_remove = set(st.session_state.uploaded_files.keys()) - current_uploaded_filenames
    if files_to_remove:
        # Set processing state immediately and reset the chat transcript.
        st.session_state.processing = True
        st.session_state.chat_enabled = False
        if "messages" in st.session_state:
            del st.session_state.messages
        # Clean up ChromaDB when files are removed
        cleanup_chroma_db()
        for file_name in files_to_remove:
            # Remove file from session state
            if file_name in st.session_state.uploaded_files:
                # Delete the file from data directory
                file_path = st.session_state.uploaded_files[file_name]["path"]
                if os.path.exists(file_path):
                    os.remove(file_path)
                # Remove from session state
                del st.session_state.uploaded_files[file_name]
    # Process newly uploaded files
    if uploaded_files:
        # NOTE(review): files_added is written below but never read anywhere
        # in this function — likely vestigial.
        files_added = False
        for file in uploaded_files:
            # Only process files that haven't been uploaded before
            if file.name not in st.session_state.uploaded_files:
                # Set processing state immediately when new file is detected
                st.session_state.processing = True
                st.session_state.chat_enabled = False
                if "messages" in st.session_state:
                    del st.session_state.messages
                file_path = save_uploaded_file(file)
                if file_path:  # Only add to session state if file was saved successfully
                    st.session_state.uploaded_files[file.name] = {
                        "path": file_path,
                        "type": file.type
                    }
                    files_added = True
    # Check for changes in files
    current_files = set(st.session_state.uploaded_files.keys())
    # If files have changed (added or removed), reset chat and process documents
    if current_files != st.session_state.previous_files or files_to_remove:
        st.session_state.previous_files = current_files
        if current_files:
            # Process documents and enable chat if successful
            if process_documents(st.session_state.uploaded_files):
                st.session_state.chat_enabled = True
            st.session_state.processing = False
        else:
            st.warning('Please upload a file to continue')
            st.session_state.processing = False
    # If files exist and chat is enabled, show chat interface
    if current_files and st.session_state.chat_enabled:
        try:
            # Initialize components for chat (rebuilt fresh on every rerun).
            llm = ChatGroq(temperature=0, model_name="llama-3.3-70b-versatile", groq_api_key=os.getenv("GROQ_API_KEY"), max_tokens=8000)
            # Reopen the persisted vectorstore for this session.
            embed_func = OpenAIEmbeddings(model='text-embedding-3-small', dimensions=512)
            vectorstore = Chroma(
                collection_name="collection",
                embedding_function=embed_func,
                persist_directory=get_chroma_directory()
            )
            # Dense retriever over the embedded chunks.
            vectorstore_retriever = vectorstore.as_retriever(
                search_kwargs={"k": 3}
            )
            # Keyword (BM25) retriever is rebuilt from the raw files each rerun,
            # reusing the same chunking parameters as ingestion.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=400,
                length_function=len
            )
            docs = []
            for file_info in st.session_state.uploaded_files.values():
                if file_info["path"].endswith(".pdf"):
                    docs.extend(PyMuPDFLoader(file_info["path"]).load())
                elif file_info["path"].endswith(".txt"):
                    docs.extend(TextLoader(file_info["path"]).load())
                elif file_info["path"].endswith(".docx"):
                    docs.extend(Docx2txtLoader(file_info["path"]).load())
            chunks = text_splitter.split_documents(docs)
            keyword_retriever = BM25Retriever.from_documents(chunks)
            keyword_retriever.k = 3
            # Combine dense and keyword retrieval with equal weighting.
            ensemble_retriever = EnsembleRetriever(
                retrievers=[vectorstore_retriever, keyword_retriever],
                weights=[0.5, 0.5]
            )
            # Run chatbot with fresh components
            run_chatbot(ensemble_retriever, llm)
        except Exception as e:
            print(f"Chat interface error: {str(e)}")  # Log error internally
            st.warning("Please try uploading your documents again.")
            st.session_state.chat_enabled = False
            # Clear the previous files to force reprocessing
            st.session_state.previous_files = set()
            if "messages" in st.session_state:
                del st.session_state.messages
# Entry point: Streamlit re-executes this script top-to-bottom on each interaction.
process_and_chat()