"""Document Analyzer — a Streamlit app that lets users upload PDF/TXT/DOCX
files and chat with them.

Pipeline: files are saved under ``data/``, chunked, embedded with OpenAI
embeddings, and indexed into a per-session ChromaDB under ``vectorstores/``.
Questions are answered by a hybrid (vector + BM25) ensemble retriever feeding
a Groq-hosted Llama model.
"""

import os
import shutil
import time
import uuid
from typing import List

import streamlit as st
from dotenv import load_dotenv
from langchain.retrievers import EnsembleRetriever
from langchain_chroma import Chroma
from langchain_community.document_loaders import Docx2txtLoader, PyMuPDFLoader, TextLoader
from langchain_community.retrievers import BM25Retriever
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_groq import ChatGroq
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

load_dotenv()

# Chunking/retrieval parameters shared by the indexing pass and the BM25
# rebuild — kept as module constants so the two code paths cannot drift.
CHUNK_SIZE = 1500
CHUNK_OVERLAP = 400
RETRIEVER_K = 3
EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_DIMENSIONS = 512

# Set page configuration
st.set_page_config(page_title="Document Analyzer", layout="wide")
st.title("📚 Document Analyzer")

# Add instructions in an expander.
# NOTE: the leading emoji was mojibake ("â„šī¸") in the original source —
# repaired to the intended "ℹ️".
with st.expander("ℹ️ Click here to view instructions"):
    st.markdown("""
    - Upload files by clicking on "Browse Files"
    - Avoid interrupting when file/files are under processing, this interrupts the execution and you would have to refresh the page to run the webapp again
    - You can add more files anytime, just avoid adding/removing files when it's processing the uploaded documents
    - The processing will trigger whenever you make any changes to the files
    """)

# Initialize session states
if 'initialized' not in st.session_state:
    st.session_state.initialized = False
if 'processing' not in st.session_state:
    st.session_state.processing = False
if 'chat_enabled' not in st.session_state:
    st.session_state.chat_enabled = False
if 'session_id' not in st.session_state:
    # Generate a unique session ID using UUID so concurrent sessions get
    # isolated vector stores.
    st.session_state.session_id = str(uuid.uuid4())[:8]


def get_chroma_directory():
    """Return (and lazily create the parent of) this session's ChromaDB path."""
    base_dir = "vectorstores"
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    return os.path.join(base_dir, f"chroma_db_{st.session_state.session_id}")


def cleanup_chroma_db():
    """Delete the current session's ChromaDB directory, if present."""
    try:
        chroma_dir = get_chroma_directory()
        if os.path.exists(chroma_dir):
            shutil.rmtree(chroma_dir)
    except Exception as e:
        print(f"Error cleaning up ChromaDB: {str(e)}")  # Log error internally


def cleanup_old_vectorstores():
    """Remove vector stores (from any session) older than 24 hours."""
    try:
        base_dir = "vectorstores"
        if not os.path.exists(base_dir):
            return
        current_time = time.time()
        one_day_in_seconds = 24 * 60 * 60
        # Inspect every per-session directory under vectorstores.
        for dir_name in os.listdir(base_dir):
            dir_path = os.path.join(base_dir, dir_name)
            if os.path.isdir(dir_path):
                # Use the directory's last modification time as its age.
                last_modified = os.path.getmtime(dir_path)
                if current_time - last_modified > one_day_in_seconds:
                    shutil.rmtree(dir_path)
    except Exception as e:
        print(f"Error cleaning up old vector stores: {str(e)}")  # Log error internally


if not st.session_state.initialized:
    # First run (or page refresh): purge stale stores and start clean.
    cleanup_old_vectorstores()
    if os.path.exists("data"):
        shutil.rmtree("data")
    os.makedirs("data")
    # BUG FIX: the original only called makedirs when "vectorstores" already
    # existed (a no-op guard); create it unconditionally instead.
    os.makedirs("vectorstores", exist_ok=True)
    st.session_state.uploaded_files = {}
    st.session_state.previous_files = set()
    st.session_state.initialized = True


def save_uploaded_file(uploaded_file):
    """Persist an uploaded file into ``data/``.

    Returns the saved path, or None when the write failed.
    """
    try:
        file_path = os.path.join("data", uploaded_file.name)
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getvalue())
        # Verify file was saved before reporting success.
        if os.path.exists(file_path):
            return file_path
        print(f"File not saved: {file_path}")  # Log error internally
        return None
    except Exception as e:
        print(f"Error saving file: {str(e)}")  # Log error internally
        return None


def _load_documents(uploaded_files_dict) -> List[Document]:
    """Load every tracked file from disk using the loader for its extension.

    Files that have vanished from disk are skipped (logged), and unknown
    extensions are ignored — the uploader only admits pdf/txt/docx anyway.
    """
    loader_by_ext = {".pdf": PyMuPDFLoader, ".txt": TextLoader, ".docx": Docx2txtLoader}
    docs: List[Document] = []
    for filename, file_info in uploaded_files_dict.items():
        file_path = file_info["path"]
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")  # Log error internally
            continue
        loader_cls = loader_by_ext.get(os.path.splitext(filename)[1])
        if loader_cls is not None:
            docs.extend(loader_cls(file_path).load())
    return docs


def process_documents(uploaded_files_dict):
    """Chunk, embed, and index the uploaded documents into this session's ChromaDB.

    Shows progress/success/warning banners in the UI and toggles
    ``st.session_state.chat_enabled``. Returns True on success, False otherwise.
    """
    warning_placeholder = st.empty()
    # NOTE: the emoji below was mojibake ("âš ī¸") in the original — repaired.
    warning_placeholder.warning("⚠️ Document processing in progress. Please wait before adding or removing files.")
    success_placeholder = st.empty()
    try:
        with st.spinner('Processing documents...'):
            # Drop any previous index before rebuilding from scratch.
            cleanup_chroma_db()

            docs = _load_documents(uploaded_files_dict)
            if not docs:
                st.warning("Unable to process the documents. Please try again.")
                return False

            # Split documents into overlapping chunks for retrieval.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=CHUNK_SIZE,
                chunk_overlap=CHUNK_OVERLAP,
                length_function=len,
            )
            chunks = text_splitter.split_documents(docs)

            embed_func = OpenAIEmbeddings(model=EMBEDDING_MODEL, dimensions=EMBEDDING_DIMENSIONS)
            try:
                # Build and persist the vector store; only the on-disk side
                # effect matters, so the return value is deliberately unused.
                Chroma.from_documents(
                    collection_name="collection",
                    documents=chunks,
                    embedding=embed_func,
                    persist_directory=get_chroma_directory(),
                )
                st.session_state.chat_enabled = True
                success_placeholder.success('Documents processed successfully!')
                time.sleep(2)  # Show success message for 2 seconds
                success_placeholder.empty()
                return True
            except Exception as e:
                print(f"ChromaDB error: {str(e)}")  # Log error internally
                st.warning("Unable to process documents at the moment. Please try again.")
                st.session_state.chat_enabled = False
                return False
    except Exception as e:
        print(f"Processing error: {str(e)}")  # Log error internally
        st.warning("Unable to process documents at the moment. Please try again.")
        st.session_state.chat_enabled = False
        return False
    finally:
        warning_placeholder.empty()


def doc2str(docs):
    """Join retrieved documents' text into one context string for the prompt."""
    return "\n\n".join(doc.page_content for doc in docs)


def run_chatbot(retriever, llm):
    """Render the chat UI and answer questions with the retriever + LLM chain."""
    prompt = ChatPromptTemplate.from_template("""
    You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question.
    If you don't know the answer, just say that you don't know.
    {context}
    Don't start revealing context in your responses until its asked.
    First look at the question and then think if the context is needed to answer this or its a normal question, once you have judged then only answer the question.
    When there is no context, just respond on your own knowledge as a normal assistant.
    Answer the following question: {question}""")

    # Retrieval-augmented QA chain: question -> context lookup -> prompt -> LLM.
    qa_chain = (
        RunnablePassthrough.assign(
            context=lambda input: doc2str(retriever.invoke(input["question"]))
        )
        | prompt
        | llm
        | StrOutputParser()
    )

    # Initialize chat history on first use.
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Handle a new question, if any.
    if question := st.chat_input("Ask a question about your documents"):
        st.session_state.messages.append({"role": "user", "content": question})
        with st.chat_message("user"):
            st.markdown(question)

        # Spinner lives outside the assistant chat bubble.
        with st.spinner("Thinking..."):
            try:
                response = qa_chain.invoke({"question": question})
                with st.chat_message("assistant"):
                    st.markdown(response)
                st.session_state.messages.append({"role": "assistant", "content": response})
            except Exception as e:
                print(f"Chat error: {str(e)}")  # Log error internally
                with st.chat_message("assistant"):
                    error_msg = "I'm having trouble processing your question. Please try asking something else."
                    st.markdown(error_msg)
                    st.session_state.messages.append({"role": "assistant", "content": error_msg})


def _reset_chat_state():
    """Mark the app busy and discard chat history ahead of a re-index."""
    st.session_state.processing = True
    st.session_state.chat_enabled = False
    st.session_state.pop("messages", None)


def process_and_chat():
    """Main UI flow: sync uploads with disk, (re)index on change, then chat."""
    with st.container():
        uploaded_files = st.file_uploader(
            "Upload your documents",
            type=["pdf", "txt", "docx"],
            accept_multiple_files=True,
            key="file_uploader",
            label_visibility="collapsed" if st.session_state.processing else "visible",
        )

    current_uploaded_filenames = {file.name for file in uploaded_files} if uploaded_files else set()

    # Handle files the user removed from the uploader.
    files_to_remove = set(st.session_state.uploaded_files.keys()) - current_uploaded_filenames
    if files_to_remove:
        _reset_chat_state()
        # The index is stale once any file is gone.
        cleanup_chroma_db()
        for file_name in files_to_remove:
            if file_name in st.session_state.uploaded_files:
                # Delete the on-disk copy, then forget the entry.
                file_path = st.session_state.uploaded_files[file_name]["path"]
                if os.path.exists(file_path):
                    os.remove(file_path)
                del st.session_state.uploaded_files[file_name]

    # Handle newly uploaded files (the original also tracked an unused
    # ``files_added`` flag here — change detection below relies solely on
    # comparing filename sets, so the flag has been dropped).
    if uploaded_files:
        for file in uploaded_files:
            if file.name not in st.session_state.uploaded_files:
                _reset_chat_state()
                file_path = save_uploaded_file(file)
                if file_path:
                    # Track only files that were actually written to disk.
                    st.session_state.uploaded_files[file.name] = {
                        "path": file_path,
                        "type": file.type,
                    }

    # If the file set changed (added or removed), reset chat and reprocess.
    current_files = set(st.session_state.uploaded_files.keys())
    if current_files != st.session_state.previous_files or files_to_remove:
        st.session_state.previous_files = current_files
        if current_files:
            if process_documents(st.session_state.uploaded_files):
                st.session_state.chat_enabled = True
            st.session_state.processing = False
        else:
            st.warning('Please upload a file to continue')
            st.session_state.processing = False

    # With files indexed and chat enabled, build the retrieval stack and chat.
    if current_files and st.session_state.chat_enabled:
        try:
            llm = ChatGroq(
                temperature=0,
                model_name="llama-3.3-70b-versatile",
                groq_api_key=os.getenv("GROQ_API_KEY"),
                max_tokens=8000,
            )

            # Reopen the persisted vector store for this session.
            embed_func = OpenAIEmbeddings(model=EMBEDDING_MODEL, dimensions=EMBEDDING_DIMENSIONS)
            vectorstore = Chroma(
                collection_name="collection",
                embedding_function=embed_func,
                persist_directory=get_chroma_directory(),
            )
            vectorstore_retriever = vectorstore.as_retriever(
                search_kwargs={"k": RETRIEVER_K}
            )

            # BM25 has no persistence, so rebuild the keyword index from the
            # same chunking parameters used at embed time.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=CHUNK_SIZE,
                chunk_overlap=CHUNK_OVERLAP,
                length_function=len,
            )
            chunks = text_splitter.split_documents(_load_documents(st.session_state.uploaded_files))
            keyword_retriever = BM25Retriever.from_documents(chunks)
            keyword_retriever.k = RETRIEVER_K

            # Hybrid retrieval: equal-weight blend of semantic and keyword hits.
            ensemble_retriever = EnsembleRetriever(
                retrievers=[vectorstore_retriever, keyword_retriever],
                weights=[0.5, 0.5],
            )

            run_chatbot(ensemble_retriever, llm)
        except Exception as e:
            print(f"Chat interface error: {str(e)}")  # Log error internally
            st.warning("Please try uploading your documents again.")
            st.session_state.chat_enabled = False
            # Clear the previous files to force reprocessing on the next run.
            st.session_state.previous_files = set()
            st.session_state.pop("messages", None)


# Call the main function
process_and_chat()