Prajwal3009's picture
Upload 30 files
ccd7971 verified
import streamlit as st
import tempfile
from pinecone import ServerlessSpec
import qanda
from langchain_community.document_loaders import UnstructuredFileLoader
from documentchat import get_text_chunks,get_pdf_text
from vector_search import encodeaddData,find_k_best_match1,delete,ensure_index_exists
from utils import *
from dotenv import load_dotenv
from io import StringIO
import os
from pinecone.grpc import PineconeGRPC as pinecone
import cv2
def fasto():
ensure_index_exists()
def get_loader(file_path):
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension in ['.pdf', '.txt', '.html', '.htm', '.docx', '.pptx', '.jpg', '.jpeg', '.png', '.gif','.xlsx']:
return UnstructuredFileLoader(file_path, mode="elements")
else:
raise ValueError(f"Unsupported file type: {file_extension}")
_ , col2,_ = st.columns([1,7,1])
with col2:
col2 = st.header="Simplchat: Chat with your data"
url = False
query = False
pdf = False
pdf2 = False
data = False
uns2 = None
options = st.selectbox("Select the type of data source",
options=['Web URL','PDF','Unstructured Data','Existing data source'])
if options == 'Web URL':
url = st.text_input("Enter the URL of the data source")
query = st.text_input("Enter your query")
button = st.button("Submit")
elif options == 'PDF':
pdf = st.text_input("Enter your PDF link here")
st.write("Or choose .pdf from your local machine")
pdf2 = st.file_uploader("Choose pdf file:", type="pdf",accept_multiple_files=True)
query = st.text_input("Enter your query")
button = st.button("Submit")
elif options == 'Unstructured Data':
# uns = st.text_input("Enter your File link here")
st.write("choose .* from your local machine")
uns2 = st.file_uploader("Enter any file", accept_multiple_files=True)
query = st.text_input("Enter your query")
button = st.button("Submit")
elif options == 'Existing data source':
data= True
query = st.text_input("Enter your query")
button = st.button("Submit")
if button and url:
with st.spinner("Updating the database..."):
corpusData = scrape_text(url)
encodeaddData(corpusData, url=url, pdf=False, pdf2=None,uns2 = None)
st.success("Database Updated")
with st.spinner("Finding an answer..."):
res = find_k_best_match1(query)
context = "\n\n".join([doc.page_content for doc in res])
st.expander("Context").write(context)
prompt = qanda.prompt(context,query)
answer = qanda.get_answer(prompt)
st.success("Answer: "+ answer)
if button and pdf:
with st.spinner("Updating the database..."):
corpusData = pdf_text(pdf=pdf)
encodeaddData(corpusData, pdf=pdf, url=False, pdf2=None,uns2 = None)
st.success("Database Updated")
with st.spinner("Finding an answer..."):
res = find_k_best_match1(query)
context = "\n\n".join([doc.page_content for doc in res])
st.expander("Context").write(context)
prompt = qanda.prompt(context,query)
answer = qanda.get_answer(prompt)
st.success("Answer: "+ answer)
if button and pdf2:
with st.spinner("Updating the database..."):
text = get_pdf_text(pdf2)
corpusData = get_text_chunks(text)
# corpusData = extract_data(feed=pdf2)
encodeaddData(corpusData, pdf2=pdf2, url=False, pdf=False,uns2 = None)
st.success("Database Updated")
with st.spinner("Finding an answer..."):
res = find_k_best_match1(query)
context = "\n\n".join([doc.page_content for doc in res])
st.expander("Context").write(context)
prompt = qanda.prompt(context,query)
answer = qanda.get_answer(prompt)
st.success("Answer: "+ answer)
if button and uns2:
with st.spinner("Updating the database..."):
page_content = "" # Initialize as string
metadata = {} # Initialize an empty dictionary for metadata
for uploaded_file in uns2:
# Create a temporary file to save the uploaded file
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as temp_file:
temp_file.write(uploaded_file.getvalue())
temp_file_path = temp_file.name
try:
# Get the appropriate loader based on file type
loader = get_loader(temp_file_path)
docs = loader.load()
# Extract and concatenate the loaded documents' content
for doc in docs:
if hasattr(doc, 'page_content'):
page_content += doc.page_content + "\n" # Concatenate page_content
else:
st.warning(f"Document object has no 'page_content' attribute: {doc}")
# Example of setting metadata (adjust as needed)
metadata['uploaded_files'] = uns2 # Store the uploaded files information
metadata['loader_used'] = str(loader) # Store the loader information
except ValueError as e:
st.error(str(e))
finally:
# Delete the temporary file
os.remove(temp_file_path)
# Create document data with page_content and metadata
document_data = {'page_content': page_content, 'metadata': metadata}
metadata = document_data['metadata']
corpusData = document_data['page_content']
encodeaddData(corpusData, pdf=False, url=False, pdf2=None,uns2=metadata['uploaded_files'])
st.success("Database Updated")
with st.spinner("Finding an answer..."):
res = find_k_best_match1(query)
context = "\n\n".join([doc.page_content for doc in res])
st.expander("Context").write(context)
prompt = qanda.prompt(context,query)
answer = qanda.get_answer(prompt)
st.success("Answer: "+ answer)
if button and data:
with st.spinner("Finding an answer..."):
res = find_k_best_match1(query)
context = "\n\n".join([doc.page_content for doc in res])
st.expander("Context").write(context)
prompt = qanda.prompt(context,query)
answer = qanda.get_answer(prompt)
st.success("Answer: "+ answer)
st.expander("Delete the indexes from the database")
button1 = st.button("Delete the current vectors")
if button1:
delete()