# Scraped page header -- Spaces: Sleeping; File size: 7,283 Bytes; ccd7971
import streamlit as st
import tempfile
from pinecone import ServerlessSpec
import qanda
from langchain_community.document_loaders import UnstructuredFileLoader
from documentchat import get_text_chunks,get_pdf_text
from vector_search import encodeaddData,find_k_best_match1,delete,ensure_index_exists
from utils import *
from dotenv import load_dotenv
from io import StringIO
import os
from pinecone.grpc import PineconeGRPC as pinecone
import cv2
def fasto() -> None:
    """Invoke ``ensure_index_exists`` from ``vector_search``.

    NOTE(review): presumably this creates the vector index when it is
    missing -- confirm against ``vector_search.ensure_index_exists``.
    """
    ensure_index_exists()
# File extensions that are handed to UnstructuredFileLoader; every other
# extension is rejected by get_loader().
_SUPPORTED_EXTENSIONS = frozenset({
    '.pdf', '.txt', '.html', '.htm', '.docx', '.pptx',
    '.jpg', '.jpeg', '.png', '.gif', '.xlsx',
})


def get_loader(file_path: str) -> "UnstructuredFileLoader":
    """Return an ``UnstructuredFileLoader`` for *file_path*.

    The decision is based solely on the file extension, compared
    case-insensitively against the supported set.

    Args:
        file_path: Path of the file to load.

    Returns:
        An ``UnstructuredFileLoader`` built with ``mode="elements"``.

    Raises:
        ValueError: If the extension is missing or not supported.
    """
    file_extension = os.path.splitext(file_path)[1].lower()
    if file_extension not in _SUPPORTED_EXTENSIONS:
        # Name the problem explicitly when the path has no extension at all,
        # instead of ending the message with an empty string.
        raise ValueError(f"Unsupported file type: {file_extension or '<no extension>'}")
    return UnstructuredFileLoader(file_path, mode="elements")
def _extract_uploaded_text(uploaded_files):
    """Return the concatenated text of every uploaded file that can be loaded.

    Each upload is written to a temporary file (keeping its original
    extension so ``get_loader`` can inspect it), loaded, and the temporary
    file is removed again. Unsupported files are reported with ``st.error``
    and skipped.

    Args:
        uploaded_files: Iterable of Streamlit uploaded-file objects; only
            their ``name`` attribute and ``getvalue()`` method are used.

    Returns:
        The ``page_content`` of all loaded documents, each followed by a
        newline, joined into one string. Empty if nothing could be loaded.
    """
    pieces = []
    for uploaded_file in uploaded_files:
        suffix = os.path.splitext(uploaded_file.name)[1]
        # delete=False: the file has to outlive the handle so the loader can
        # reopen it by path; it is removed explicitly in the finally clause.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            with temp_file:
                temp_file.write(uploaded_file.getvalue())
            loader = get_loader(temp_file.name)
            for doc in loader.load():
                if hasattr(doc, 'page_content'):
                    pieces.append(doc.page_content + "\n")
                else:
                    st.warning(f"Document object has no 'page_content' attribute: {doc}")
        except ValueError as e:
            # Raised by get_loader for unsupported extensions.
            st.error(str(e))
        finally:
            os.remove(temp_file.name)
    return "".join(pieces)


def _show_answer(query):
    """Look up context for *query*, display it, and display the answer.

    Args:
        query: The user's question. When empty, a warning is shown and no
            lookup is performed.
    """
    if not query:
        st.warning("Please enter a query.")
        return
    with st.spinner("Finding an answer..."):
        res = find_k_best_match1(query)
        context = "\n\n".join([doc.page_content for doc in res])
        st.expander("Context").write(context)
        prompt = qanda.prompt(context, query)
        answer = qanda.get_answer(prompt)
        st.success("Answer: " + answer)


# --- Page header -----------------------------------------------------------
_, col2, _ = st.columns([1, 7, 1])
with col2:
    # Call st.header; assigning to it would overwrite the function and
    # render nothing.
    st.header("Simplchat: Chat with your data")

# --- Data-source selection -------------------------------------------------
# Defaults are falsy so that only the chosen source triggers an ingest below.
url = False
query = False
pdf = False
pdf2 = False
data = False
uns2 = None

options = st.selectbox(
    "Select the type of data source",
    options=['Web URL', 'PDF', 'Unstructured Data', 'Existing data source'],
)
if options == 'Web URL':
    url = st.text_input("Enter the URL of the data source")
elif options == 'PDF':
    pdf = st.text_input("Enter your PDF link here")
    st.write("Or choose .pdf from your local machine")
    pdf2 = st.file_uploader("Choose pdf file:", type="pdf", accept_multiple_files=True)
elif options == 'Unstructured Data':
    st.write("choose .* from your local machine")
    uns2 = st.file_uploader("Enter any file", accept_multiple_files=True)
elif options == 'Existing data source':
    data = True

# Every source shares the same query box and submit button, rendered after
# the source-specific widgets.
query = st.text_input("Enter your query")
button = st.button("Submit")

# --- Ingest the selected source(s), then answer once ------------------------
if button:
    if url:
        with st.spinner("Updating the database..."):
            corpusData = scrape_text(url)
            encodeaddData(corpusData, url=url, pdf=False, pdf2=None, uns2=None)
            st.success("Database Updated")

    if pdf:
        with st.spinner("Updating the database..."):
            corpusData = pdf_text(pdf=pdf)
            encodeaddData(corpusData, pdf=pdf, url=False, pdf2=None, uns2=None)
            st.success("Database Updated")

    if pdf2:
        with st.spinner("Updating the database..."):
            text = get_pdf_text(pdf2)
            corpusData = get_text_chunks(text)
            encodeaddData(corpusData, pdf2=pdf2, url=False, pdf=False, uns2=None)
            st.success("Database Updated")

    if uns2:
        with st.spinner("Updating the database..."):
            corpusData = _extract_uploaded_text(uns2)
            if corpusData.strip():
                # Pass the uploads directly; they no longer travel through a
                # metadata dict that is only filled when a file loads.
                encodeaddData(corpusData, pdf=False, url=False, pdf2=None, uns2=uns2)
                st.success("Database Updated")
            else:
                st.warning("No text could be extracted from the uploaded files; database not updated.")

    # Answer only when a source was actually provided (or the existing
    # database was explicitly selected).
    if url or pdf or pdf2 or uns2 or data:
        _show_answer(query)

# --- Maintenance -----------------------------------------------------------
# Use the expander as a context manager so the delete button is rendered
# inside it rather than below an empty expander.
with st.expander("Delete the indexes from the database"):
    button1 = st.button("Delete the current vectors")
    if button1:
        delete()
# End of scraped source listing.