import streamlit as st import tempfile from pinecone import ServerlessSpec import qanda from langchain_community.document_loaders import UnstructuredFileLoader from documentchat import get_text_chunks,get_pdf_text from vector_search import encodeaddData,find_k_best_match1,delete,ensure_index_exists from utils import * from dotenv import load_dotenv from io import StringIO import os from pinecone.grpc import PineconeGRPC as pinecone import cv2 def fasto(): ensure_index_exists() def get_loader(file_path): file_extension = os.path.splitext(file_path)[1].lower() if file_extension in ['.pdf', '.txt', '.html', '.htm', '.docx', '.pptx', '.jpg', '.jpeg', '.png', '.gif','.xlsx']: return UnstructuredFileLoader(file_path, mode="elements") else: raise ValueError(f"Unsupported file type: {file_extension}") _ , col2,_ = st.columns([1,7,1]) with col2: col2 = st.header="Simplchat: Chat with your data" url = False query = False pdf = False pdf2 = False data = False uns2 = None options = st.selectbox("Select the type of data source", options=['Web URL','PDF','Unstructured Data','Existing data source']) if options == 'Web URL': url = st.text_input("Enter the URL of the data source") query = st.text_input("Enter your query") button = st.button("Submit") elif options == 'PDF': pdf = st.text_input("Enter your PDF link here") st.write("Or choose .pdf from your local machine") pdf2 = st.file_uploader("Choose pdf file:", type="pdf",accept_multiple_files=True) query = st.text_input("Enter your query") button = st.button("Submit") elif options == 'Unstructured Data': # uns = st.text_input("Enter your File link here") st.write("choose .* from your local machine") uns2 = st.file_uploader("Enter any file", accept_multiple_files=True) query = st.text_input("Enter your query") button = st.button("Submit") elif options == 'Existing data source': data= True query = st.text_input("Enter your query") button = st.button("Submit") if button and url: with st.spinner("Updating the database..."): corpusData = scrape_text(url) encodeaddData(corpusData, url=url, pdf=False, pdf2=None,uns2 = None) st.success("Database Updated") with st.spinner("Finding an answer..."): res = find_k_best_match1(query) context = "\n\n".join([doc.page_content for doc in res]) st.expander("Context").write(context) prompt = qanda.prompt(context,query) answer = qanda.get_answer(prompt) st.success("Answer: "+ answer) if button and pdf: with st.spinner("Updating the database..."): corpusData = pdf_text(pdf=pdf) encodeaddData(corpusData, pdf=pdf, url=False, pdf2=None,uns2 = None) st.success("Database Updated") with st.spinner("Finding an answer..."): res = find_k_best_match1(query) context = "\n\n".join([doc.page_content for doc in res]) st.expander("Context").write(context) prompt = qanda.prompt(context,query) answer = qanda.get_answer(prompt) st.success("Answer: "+ answer) if button and pdf2: with st.spinner("Updating the database..."): text = get_pdf_text(pdf2) corpusData = get_text_chunks(text) # corpusData = extract_data(feed=pdf2) encodeaddData(corpusData, pdf2=pdf2, url=False, pdf=False,uns2 = None) st.success("Database Updated") with st.spinner("Finding an answer..."): res = find_k_best_match1(query) context = "\n\n".join([doc.page_content for doc in res]) st.expander("Context").write(context) prompt = qanda.prompt(context,query) answer = qanda.get_answer(prompt) st.success("Answer: "+ answer) if button and uns2: with st.spinner("Updating the database..."): page_content = "" # Initialize as string metadata = {} # Initialize an empty dictionary for metadata for uploaded_file in uns2: # Create a temporary file to save the uploaded file with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as temp_file: temp_file.write(uploaded_file.getvalue()) temp_file_path = temp_file.name try: # Get the appropriate loader based on file type loader = get_loader(temp_file_path) docs = loader.load() # Extract and concatenate the loaded documents' content for doc in docs: if hasattr(doc, 'page_content'): page_content += doc.page_content + "\n" # Concatenate page_content else: st.warning(f"Document object has no 'page_content' attribute: {doc}") # Example of setting metadata (adjust as needed) metadata['uploaded_files'] = uns2 # Store the uploaded files information metadata['loader_used'] = str(loader) # Store the loader information except ValueError as e: st.error(str(e)) finally: # Delete the temporary file os.remove(temp_file_path) # Create document data with page_content and metadata document_data = {'page_content': page_content, 'metadata': metadata} metadata = document_data['metadata'] corpusData = document_data['page_content'] encodeaddData(corpusData, pdf=False, url=False, pdf2=None,uns2=metadata['uploaded_files']) st.success("Database Updated") with st.spinner("Finding an answer..."): res = find_k_best_match1(query) context = "\n\n".join([doc.page_content for doc in res]) st.expander("Context").write(context) prompt = qanda.prompt(context,query) answer = qanda.get_answer(prompt) st.success("Answer: "+ answer) if button and data: with st.spinner("Finding an answer..."): res = find_k_best_match1(query) context = "\n\n".join([doc.page_content for doc in res]) st.expander("Context").write(context) prompt = qanda.prompt(context,query) answer = qanda.get_answer(prompt) st.success("Answer: "+ answer) st.expander("Delete the indexes from the database") button1 = st.button("Delete the current vectors") if button1: delete()