Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import tempfile | |
| import base64 | |
| import os | |
| import nltk | |
| nltk.download('punkt') | |
| from src.utils.ingest_text import create_vector_database | |
| from src.utils.ingest_image import extract_and_store_images | |
| from src.utils.text_qa import qa_bot | |
| from src.utils.image_qa import query_and_print_results | |
| import nest_asyncio | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain_community.chat_message_histories import StreamlitChatMessageHistory | |
| from dotenv import load_dotenv | |
| # Setup | |
| nest_asyncio.apply() | |
| load_dotenv() | |
| st.set_page_config(layout='wide', page_title="InsightFusion Chat") | |
| # ---- Session State Initialization ---- | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "chain" not in st.session_state: | |
| st.session_state.chain = None | |
| if "image_vdb" not in st.session_state: | |
| st.session_state.image_vdb = None | |
| # ---- Chat Memory Setup ---- | |
| memory_storage = StreamlitChatMessageHistory(key="chat_messages") | |
| memory = ConversationBufferWindowMemory( | |
| memory_key="chat_history", | |
| human_prefix="User", | |
| chat_memory=memory_storage, | |
| k=3 | |
| ) | |
| # ---- Background Image Setup ---- | |
| image_bg = r"data/pexels-andreea-ch-371539-1166644.jpg" | |
| def add_bg_from_local(image_file): | |
| with open(image_file, "rb") as img: | |
| encoded_string = base64.b64encode(img.read()) | |
| st.markdown(f"""<style>.stApp {{ | |
| background-image: url(data:image/png;base64,{encoded_string.decode()}); | |
| background-size: cover; | |
| }}</style>""", unsafe_allow_html=True) | |
| add_bg_from_local(image_bg) | |
| # ---- Title ---- | |
| st.markdown(""" | |
| <svg width="600" height="100"> | |
| <text x="50%" y="50%" font-family="San serif" font-size="42px" fill="Black" text-anchor="middle" stroke="white" | |
| stroke-width="0.3" stroke-linejoin="round">InsightFusion Chat | |
| </text> | |
| </svg> | |
| """, unsafe_allow_html=True) | |
| # ---- Utility ---- | |
| def get_answer(query, chain): | |
| try: | |
| response = chain.invoke(query) | |
| return response['result'] | |
| except Exception as e: | |
| st.error(f"Error in get_answer: {e}") | |
| return None | |
| # ---- File Upload ---- | |
| path = None | |
| uploaded_file = st.file_uploader("Upload a PDF file", type="pdf") | |
| if uploaded_file is not None: | |
| temp_file_path = os.path.join("temp", uploaded_file.name) | |
| os.makedirs("temp", exist_ok=True) | |
| with open(temp_file_path, "wb") as f: | |
| f.write(uploaded_file.getbuffer()) | |
| path = os.path.abspath(temp_file_path) | |
| st.success("Document uploaded successfully!") | |
| st.write(f"File saved to: {path}") | |
| # ---- Demo PDF Option ---- | |
| st.markdown("### Or use a demo file:") | |
| if st.button("Use Demo PDF"): | |
| demo_file_path = os.path.join("pdf_resource", "sample.pdf") # Replace with actual demo name | |
| if os.path.exists(demo_file_path): | |
| path = os.path.abspath(demo_file_path) | |
| st.success(f"Using demo file: {path}") | |
| with st.spinner("Processing demo file..."): | |
| try: | |
| client = create_vector_database(path) | |
| image_vdb = extract_and_store_images(path) | |
| chain = qa_bot(client) | |
| st.session_state.chain = chain | |
| st.session_state.image_vdb = image_vdb | |
| st.success("Demo file processed successfully.") | |
| except Exception as e: | |
| st.error(f"Error processing demo PDF: {e}") | |
| else: | |
| st.error("Demo PDF not found in pdf_resource/.") | |
| # ---- Processing Button ---- | |
| if st.button("Start Processing"): | |
| if path is not None: | |
| with st.spinner("Processing file..."): | |
| try: | |
| client = create_vector_database(path) | |
| image_vdb = extract_and_store_images(path) | |
| chain = qa_bot(client) | |
| st.session_state.chain = chain | |
| st.session_state.image_vdb = image_vdb | |
| st.success("File processed successfully.") | |
| except Exception as e: | |
| st.error(f"Error during processing: {e}") | |
| else: | |
| st.error("Please upload a file or select a demo.") | |
| # ---- Style Customization ---- | |
| st.markdown(""" | |
| <style> | |
| .stChatInputContainer > div { | |
| background-color: #000000; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # ---- Chat Interface ---- | |
| if user_input := st.chat_input("User Input"): | |
| if st.session_state.chain and st.session_state.image_vdb: | |
| with st.chat_message("user"): | |
| st.markdown(user_input) | |
| with st.spinner("Generating Response..."): | |
| response = get_answer(user_input, st.session_state.chain) | |
| if response: | |
| with st.chat_message("assistant"): | |
| st.markdown(response) | |
| memory.save_context({"input": user_input}, {"output": response}) | |
| st.session_state.messages.append({"role": "user", "content": user_input}) | |
| st.session_state.messages.append({"role": "assistant", "content": response}) | |
| try: | |
| query_and_print_results(st.session_state.image_vdb, user_input) | |
| except Exception as e: | |
| st.error(f"Error querying image database: {e}") | |
| else: | |
| st.error("Failed to generate a response.") | |
| else: | |
| st.error("Please process a file before chatting.") | |
| # ---- Display Chat History ---- | |
| for msg in st.session_state.messages: | |
| with st.chat_message(msg["role"]): | |
| st.write(msg["content"]) | |
| # ---- Display Memory History (LangChain) ---- | |
| for i, msg in enumerate(memory_storage.messages): | |
| role = "user" if i % 2 == 0 else "assistant" | |
| st.chat_message(role).markdown(msg.content) | |