Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import streamlit as st | |
| from langchain_groq import ChatGroq | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_community.document_loaders import TextLoader, PyMuPDFLoader, Docx2txtLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from typing import List | |
| from langchain_core.documents import Document | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_core.runnables import RunnablePassthrough | |
| from langchain_community.retrievers import BM25Retriever | |
| from langchain.retrievers import EnsembleRetriever | |
| from langchain_chroma import Chroma | |
| import shutil | |
| import uuid | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# --- Page configuration and static UI chrome ---
st.set_page_config(page_title="Document Analyzer", layout="wide")
st.title("📚 Document Analyzer")

# Usage instructions, collapsed by default so they don't crowd the page.
with st.expander("ℹ️ Click here to view instructions"):
    st.markdown("""
    - Upload files by clicking on "Browse Files"
    - Avoid interrupting when file/files are under processing, this interrupts the execution and you would have to refresh the page to run the webapp again
    - You can add more files anytime, just avoid adding/removing files when it's processing the uploaded documents
    - The processing will trigger whenever you make any changes to the files
    """)

# --- Session-state bootstrap ---
# Streamlit reruns this script on every interaction; these guards make the
# initialisation run only once per browser session.
if 'initialized' not in st.session_state:
    st.session_state.initialized = False  # first-run setup flag (see below)
if 'processing' not in st.session_state:
    st.session_state.processing = False   # True while documents are being embedded
if 'chat_enabled' not in st.session_state:
    st.session_state.chat_enabled = False # True once a vector store is ready
if 'session_id' not in st.session_state:
    # Generate a unique session ID using UUID; used to give each session
    # its own ChromaDB directory under vectorstores/.
    st.session_state.session_id = str(uuid.uuid4())[:8]
def get_chroma_directory():
    """Return the unique ChromaDB directory path for the current session.

    The path is ``vectorstores/chroma_db_<session_id>``, where the session id
    is the 8-character UUID prefix stored in ``st.session_state``. The base
    directory is created on demand.

    Returns:
        str: path to this session's ChromaDB persistence directory (the leaf
        directory itself is not created here — Chroma creates it on persist).
    """
    base_dir = "vectorstores"
    # exist_ok avoids the check-then-create race of the previous
    # `if not os.path.exists(...): os.makedirs(...)` pattern under reruns.
    os.makedirs(base_dir, exist_ok=True)
    return os.path.join(base_dir, f"chroma_db_{st.session_state.session_id}")
def cleanup_chroma_db():
    """Delete the current session's ChromaDB directory, if one exists.

    Best-effort: any failure is logged to stdout and swallowed so cleanup
    never takes down the app.
    """
    try:
        target = get_chroma_directory()
        if os.path.exists(target):
            shutil.rmtree(target)
    except Exception as exc:
        print(f"Error cleaning up ChromaDB: {str(exc)}")  # Log error internally
def cleanup_old_vectorstores():
    """Remove session vector stores whose directories are over 24 hours old.

    Scans ``vectorstores/`` and deletes any subdirectory whose mtime is more
    than a day in the past. Best-effort: errors are logged and swallowed.
    """
    base_dir = "vectorstores"
    try:
        if not os.path.exists(base_dir):
            return
        # Anything last touched before this instant is considered stale.
        cutoff = time.time() - 24 * 60 * 60
        for entry in os.listdir(base_dir):
            path = os.path.join(base_dir, entry)
            # Only directories are session stores; skip stray files.
            if os.path.isdir(path) and os.path.getmtime(path) < cutoff:
                shutil.rmtree(path)
    except Exception as exc:
        print(f"Error cleaning up old vector stores: {str(exc)}")  # Log error internally
# --- First-run / page-refresh initialisation ---
if not st.session_state.initialized:
    # Clean up vector stores older than 24 hours left by previous sessions.
    cleanup_old_vectorstores()
    # Start each session with an empty data directory.
    if os.path.exists("data"):
        shutil.rmtree("data")
    os.makedirs("data")
    # Ensure the vectorstores base directory exists. (The original guarded
    # makedirs with `if os.path.exists(...)`, which only created the directory
    # when it already existed — a no-op; this creates it unconditionally.)
    os.makedirs("vectorstores", exist_ok=True)
    # Per-session bookkeeping: filename -> {"path", "type"} of saved uploads,
    # and the set of filenames seen on the previous rerun (change detection).
    st.session_state.uploaded_files = {}
    st.session_state.previous_files = set()
    st.session_state.initialized = True
def save_uploaded_file(uploaded_file):
    """Persist a Streamlit upload into the local ``data`` directory.

    Args:
        uploaded_file: object exposing ``.name`` and ``.getvalue()`` (a
            Streamlit ``UploadedFile``).

    Returns:
        str | None: the saved file path, or None if the write failed.
    """
    try:
        destination = os.path.join("data", uploaded_file.name)
        # Write the raw bytes of the upload to disk.
        with open(destination, "wb") as out:
            out.write(uploaded_file.getvalue())
        # Confirm the write actually landed before reporting success.
        if not os.path.exists(destination):
            print(f"File not saved: {destination}")  # Log error internally
            return None
        return destination
    except Exception as exc:
        print(f"Error saving file: {str(exc)}")  # Log error internally
        return None
def process_documents(uploaded_files_dict):
    """Load, chunk, embed and persist the uploaded documents into ChromaDB.

    Args:
        uploaded_files_dict: mapping of filename -> {"path": str, "type": str}
            as stored in ``st.session_state.uploaded_files``.

    Returns:
        bool: True when the vector store was rebuilt successfully, False on
        any failure (a user-facing warning is shown and chat is disabled).
    """
    warning_placeholder = st.empty()
    warning_placeholder.warning("⚠️ Document processing in progress. Please wait before adding or removing files.")
    success_placeholder = st.empty()
    # Extension -> loader dispatch, matched case-insensitively so ".PDF",
    # ".Txt" etc. are handled (the previous endswith() checks skipped them).
    loaders = {
        ".pdf": PyMuPDFLoader,
        ".txt": TextLoader,
        ".docx": Docx2txtLoader,
    }
    try:
        with st.spinner('Processing documents...'):
            # Drop any previous vector store for this session before rebuilding.
            cleanup_chroma_db()
            docs = []
            for filename, file_info in uploaded_files_dict.items():
                file_path = file_info["path"]
                if not os.path.exists(file_path):
                    print(f"File not found: {file_path}")  # Log error internally
                    continue
                loader_cls = loaders.get(os.path.splitext(filename)[1].lower())
                if loader_cls is not None:
                    docs.extend(loader_cls(file_path).load())
            if not docs:
                st.warning("Unable to process the documents. Please try again.")
                return False
            # Split into overlapping chunks sized for retrieval.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=400,
                length_function=len
            )
            chunks = text_splitter.split_documents(docs)
            embed_func = OpenAIEmbeddings(model='text-embedding-3-small', dimensions=512)
            try:
                # Build and persist the vector store for this session.
                Chroma.from_documents(
                    collection_name="collection",
                    documents=chunks,
                    embedding=embed_func,
                    persist_directory=get_chroma_directory()
                )
                st.session_state.chat_enabled = True
                success_placeholder.success('Documents processed successfully!')
                time.sleep(2)  # Show success message for 2 seconds
                success_placeholder.empty()  # Clear the success message
                return True
            except Exception as e:
                print(f"ChromaDB error: {str(e)}")  # Log error internally
                st.warning("Unable to process documents at the moment. Please try again.")
                st.session_state.chat_enabled = False
                return False
    except Exception as e:
        print(f"Processing error: {str(e)}")  # Log error internally
        st.warning("Unable to process documents at the moment. Please try again.")
        st.session_state.chat_enabled = False
        return False
    finally:
        # Always clear the "processing" banner, success or failure.
        warning_placeholder.empty()
def doc2str(docs):
    """Join the page contents of *docs* into one string, blank-line separated."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)
def run_chatbot(retriever, llm):
    """Render the chat UI and answer questions via a retrieval-augmented chain.

    Args:
        retriever: retriever whose ``invoke(question)`` returns context documents.
        llm: chat model that generates the final answer.

    Chat history lives in ``st.session_state.messages`` so it survives
    Streamlit reruns; each rerun replays the history, then handles at most
    one new question from the chat input.
    """
    # Prompt: retrieved context is injected between <context> tags; the model
    # is instructed to use it only when the question actually requires it.
    prompt = ChatPromptTemplate.from_template("""
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
<context>
{context}
</context>
<important>
Don't start revealing context in your responses until its asked. First look at the question and then think if the context is needed to answer this or its a normal question, once you have judged then only answer the question.
When there is no context, just respond on your own knowledge as a normal assistant.
</important>
Answer the following question:
{question}""")
    # QA chain: fetch context for the question -> fill prompt -> LLM -> str.
    qa_chain = (
        RunnablePassthrough.assign(context=lambda input: doc2str(retriever.invoke(input["question"])))
        | prompt
        | llm
        | StrOutputParser()
    )
    # Initialize chat history in session state on first use.
    if "messages" not in st.session_state:
        st.session_state.messages = []
    # Replay the conversation so history is visible after each rerun.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
    # Handle a new question, if the user submitted one this rerun.
    if question := st.chat_input("Ask a question about your documents"):
        # Record and echo the user's message.
        st.session_state.messages.append({"role": "user", "content": question})
        with st.chat_message("user"):
            st.markdown(question)
        # Spinner sits outside the chat message so the bubble appears complete.
        with st.spinner("Thinking..."):
            try:
                # Generate the answer (retrieval + LLM happen inside the chain).
                response = qa_chain.invoke({"question": question})
                # Display the response only after generation finishes.
                with st.chat_message("assistant"):
                    st.markdown(response)
                # Persist the assistant turn in history.
                st.session_state.messages.append({"role": "assistant", "content": response})
            except Exception as e:
                print(f"Chat error: {str(e)}")  # Log error internally
                # Show a friendly fallback instead of crashing the chat.
                with st.chat_message("assistant"):
                    error_msg = "I'm having trouble processing your question. Please try asking something else."
                    st.markdown(error_msg)
                    st.session_state.messages.append({"role": "assistant", "content": error_msg})
def process_and_chat():
    """Top-level UI flow: upload files, detect changes, (re)process, then chat.

    Runs on every Streamlit rerun. Compares the uploader's current file set
    against ``st.session_state`` to detect additions/removals, rebuilds the
    vector store when the set changes, and renders the chat interface once a
    store is ready.
    """
    # File uploader section
    with st.container():
        uploaded_files = st.file_uploader(
            "Upload your documents",
            type=["pdf", "txt", "docx"],
            accept_multiple_files=True,
            key="file_uploader",
            # Hide the label while processing to discourage mid-run changes.
            label_visibility="collapsed" if st.session_state.processing else "visible"
        )
    # Names currently present in the uploader widget.
    current_uploaded_filenames = {file.name for file in uploaded_files} if uploaded_files else set()
    # Files tracked in session state but no longer in the uploader = removed.
    files_to_remove = set(st.session_state.uploaded_files.keys()) - current_uploaded_filenames
    if files_to_remove:
        # Removal invalidates the store: flag processing, disable chat,
        # and drop the conversation history.
        st.session_state.processing = True
        st.session_state.chat_enabled = False
        if "messages" in st.session_state:
            del st.session_state.messages
        # Clean up ChromaDB when files are removed
        cleanup_chroma_db()
        for file_name in files_to_remove:
            if file_name in st.session_state.uploaded_files:
                # Delete the saved copy from the data directory.
                file_path = st.session_state.uploaded_files[file_name]["path"]
                if os.path.exists(file_path):
                    os.remove(file_path)
                # Remove from session state
                del st.session_state.uploaded_files[file_name]
    # Process newly uploaded files
    if uploaded_files:
        files_added = False
        for file in uploaded_files:
            # Only process files that haven't been uploaded before
            if file.name not in st.session_state.uploaded_files:
                # New file invalidates the store the same way a removal does.
                st.session_state.processing = True
                st.session_state.chat_enabled = False
                if "messages" in st.session_state:
                    del st.session_state.messages
                file_path = save_uploaded_file(file)
                if file_path:  # Only track the file if it was saved successfully
                    st.session_state.uploaded_files[file.name] = {
                        "path": file_path,
                        "type": file.type
                    }
                    files_added = True
    # Current tracked set after applying additions/removals.
    current_files = set(st.session_state.uploaded_files.keys())
    # If the set changed since the last rerun, rebuild the vector store.
    if current_files != st.session_state.previous_files or files_to_remove:
        st.session_state.previous_files = current_files
        if current_files:
            # Process documents and enable chat if successful
            if process_documents(st.session_state.uploaded_files):
                st.session_state.chat_enabled = True
            st.session_state.processing = False
        else:
            st.warning('Please upload a file to continue')
            st.session_state.processing = False
    # If files exist and chat is enabled, show chat interface
    if current_files and st.session_state.chat_enabled:
        try:
            # LLM used for answer generation (Groq-hosted Llama 3.3 70B).
            llm = ChatGroq(temperature=0, model_name="llama-3.3-70b-versatile", groq_api_key=os.getenv("GROQ_API_KEY"), max_tokens=8000)
            # Reopen this session's persisted vector store; the embedding
            # settings must match those used in process_documents().
            embed_func = OpenAIEmbeddings(model='text-embedding-3-small', dimensions=512)
            vectorstore = Chroma(
                collection_name="collection",
                embedding_function=embed_func,
                persist_directory=get_chroma_directory()
            )
            # Dense retriever over the persisted embeddings.
            vectorstore_retriever = vectorstore.as_retriever(
                search_kwargs={"k": 3}
            )
            # BM25 keyword retriever is rebuilt from the raw files each rerun
            # (same chunking parameters as the embedding pipeline).
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=400,
                length_function=len
            )
            docs = []
            for file_info in st.session_state.uploaded_files.values():
                if file_info["path"].endswith(".pdf"):
                    docs.extend(PyMuPDFLoader(file_info["path"]).load())
                elif file_info["path"].endswith(".txt"):
                    docs.extend(TextLoader(file_info["path"]).load())
                elif file_info["path"].endswith(".docx"):
                    docs.extend(Docx2txtLoader(file_info["path"]).load())
            chunks = text_splitter.split_documents(docs)
            keyword_retriever = BM25Retriever.from_documents(chunks)
            keyword_retriever.k = 3
            # Hybrid retrieval: equal-weight blend of dense + keyword results.
            ensemble_retriever = EnsembleRetriever(
                retrievers=[vectorstore_retriever, keyword_retriever],
                weights=[0.5, 0.5]
            )
            # Run chatbot with fresh components
            run_chatbot(ensemble_retriever, llm)
        except Exception as e:
            print(f"Chat interface error: {str(e)}")  # Log error internally
            st.warning("Please try uploading your documents again.")
            st.session_state.chat_enabled = False
            # Clear the previous files to force reprocessing on the next rerun.
            st.session_state.previous_files = set()
            if "messages" in st.session_state:
                del st.session_state.messages

# Script entry point: Streamlit executes this on every rerun.
process_and_chat()