# Captured from a Hugging Face Space page (Space status at capture time: Sleeping).
| import streamlit as st | |
| import os | |
| import time | |
| import logging | |
| import io | |
| import requests | |
| from bs4 import BeautifulSoup | |
| #from PyPDF2 import PdfReader | |
| from dotenv import load_dotenv | |
| import pdfplumber | |
| import docx | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.vectorstores import FAISS | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.llms import OpenAI | |
def fetch_and_process_pdf(url, timeout=30):
    """Download the PDF at *url* and return its extracted text.

    Args:
        url: Address of the PDF document to download.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the app indefinitely.

    Returns:
        The text extracted by ``process_pdf``, or ``""`` when the download
        fails for any network or HTTP reason (the failure is logged).
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        # RequestException is the base class of HTTPError, ConnectionError
        # and Timeout, so every download failure degrades to an empty string
        # instead of only the HTTP-status ones.
        logging.error("Failed to fetch PDF from %s. Error: %s", url, e)
        return ""
    # Keep parsing outside the try block: only the network call is expected
    # to raise the exceptions handled above.
    return process_pdf(io.BytesIO(response.content))
def process_pdf(pdf):
    """Extract the text of every page of *pdf* and return it as one string.

    *pdf* is handed to ``pdfplumber.open`` unchanged. Pages for which
    ``extract_text()`` yields nothing contribute an empty string. The elapsed
    wall-clock time is reported through ``logging.info``.
    """
    started = time.time()
    with pdfplumber.open(pdf) as document:
        extracted = "".join(
            page.extract_text() or "" for page in document.pages
        )
    elapsed = time.time() - started
    logging.info(f"Processed PDF in {elapsed} seconds")
    return extracted
def display_chat_history():
    """Render the stored question/answer history in a Streamlit text area.

    Creates an empty ``chat_history`` list in the session state when none
    exists yet, then shows every entry as a ``Q:``/``A:``/timestamp group
    followed by a ``---`` divider line.
    """
    state = st.session_state
    if 'chat_history' not in state:
        state.chat_history = []
    rendered = "".join(
        f"Q: {entry['question']}\nA: {entry['answer']}\n{entry['time']}\n---\n"
        for entry in state.chat_history
    )
    st.text_area("Chat History", rendered, height=300)
def update_chat_history(question, answer):
    """Append one question/answer exchange to the session's chat history.

    The ``chat_history`` list is created in the session state on first use.
    Each entry is a dict with the keys ``question``, ``answer`` and ``time``
    (the current time formatted as ``YYYY-MM-DD HH:MM:SS``).
    """
    state = st.session_state
    if 'chat_history' not in state:
        state.chat_history = []
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = {"question": question, "answer": answer, "time": stamp}
    state.chat_history.append(entry)
def read_pdf(file_path):
    """Return the concatenated text of every page of the PDF at *file_path*.

    The PDF is read with pdfplumber, which this module imports. The previous
    implementation called ``PdfReader``, a name that is never imported here,
    so every call failed with ``NameError``.

    Args:
        file_path: Path of the PDF file on disk.

    Returns:
        The text of all pages joined together, with no separator added
        between pages.
    """
    with pdfplumber.open(file_path) as pdf_reader:
        # extract_text() can return None for a page without extractable
        # text; treat that as empty so one blank page cannot raise TypeError.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)
def read_word(file_path):
    """Return the text of the Word document at *file_path*.

    Every paragraph of the document is emitted in order, each followed by a
    newline character.
    """
    document = docx.Document(file_path)
    return "".join(para.text + "\n" for para in document.paragraphs)
def read_documents_from_directory(directory):
    """Concatenate the text of all ``.pdf`` and ``.docx`` files in *directory*.

    Entries are visited in the order ``os.listdir`` reports them. Files whose
    names end in ``.pdf`` go through ``read_pdf``, those ending in ``.docx``
    through ``read_word``; anything else is ignored. Sub-directories are not
    descended into.
    """
    pieces = []
    for entry in os.listdir(directory):
        full_path = os.path.join(directory, entry)
        if entry.endswith(".pdf"):
            pieces.append(read_pdf(full_path))
            continue
        if entry.endswith(".docx"):
            pieces.append(read_word(full_path))
    return "".join(pieces)
def get_pdf_links_from_dataset(url, timeout=30):
    """Collect absolute URLs of the PDF links found on a dataset page.

    Args:
        url: Address of the Hugging Face dataset page to scan.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A list of absolute URLs, one per ``<a>`` element whose ``href``
        contains ``.pdf``. The list is empty when the page cannot be fetched
        (the failure is logged).
    """
    # Local import keeps this function self-contained; urljoin is stdlib.
    from urllib.parse import urljoin

    base_url = "https://huggingface.co"
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        # Covers HTTP status errors as well as connection errors and timeouts.
        logging.error("Failed to get PDF links from dataset. Error: %s", e)
        return []

    soup = BeautifulSoup(response.text, 'html.parser')
    hrefs = (anchor.get('href') for anchor in soup.find_all('a'))
    # Anchors without an href yield None and are skipped instead of raising
    # TypeError on the substring test. urljoin leaves absolute hrefs intact
    # and resolves relative ones against the site root.
    # NOTE(review): links on a dataset "tree" page may point at the HTML file
    # viewer rather than the raw file - confirm the returned URLs actually
    # download PDF bytes.
    return [urljoin(base_url, href) for href in hrefs if href and '.pdf' in href]
# Local directory path, currently disabled (kept for reference only).
#train_directory = r'C:\Users\writa\Downloads\Crypto'
# Hugging Face dataset page that lists the source documents.
# NOTE(review): nothing in the visible code reads this name; main() builds
# its own dataset_url with the same value - confirm whether this is still needed.
url = "https://huggingface.co/datasets/Writo/realestate_data/tree/main"
def main():
    """Run the EstateSphere Streamlit app.

    Loads environment variables, builds a FAISS index over the text of the
    PDFs listed on the dataset page, and answers the user's question with a
    LangChain question-answering chain over the most similar chunks. Each
    answered question is appended to the chat history and the history is
    displayed.
    """
    load_dotenv()
    st.set_page_config(page_title="EstateSphere")
    st.header("🏢 EstateSphere")

    # Render the sidebar first so it is shown even when the function returns
    # early because no documents could be loaded.
    st.sidebar.header("Help & Support")
    st.sidebar.write("Need assistance? Reach out to our support team.")
    st.sidebar.text("© 2024 MosaicAI")

    # Streamlit re-executes the script on every interaction. Keeping the
    # index in the session state avoids re-downloading and re-embedding the
    # whole corpus each time the user submits a question.
    if "docsearch" not in st.session_state:
        dataset_url = 'https://huggingface.co/datasets/Writo/realestate_data/tree/main'
        pdf_links = get_pdf_links_from_dataset(dataset_url)
        if not pdf_links:
            st.error("No PDF documents could be found in the dataset.")
            return

        with st.spinner("Processing PDFs, please wait..."):
            text = "".join(fetch_and_process_pdf(link) for link in pdf_links)
            char_text_splitter = CharacterTextSplitter(
                separator="\n",
                chunk_size=1000,
                chunk_overlap=200,
                length_function=len,
            )
            text_chunks = char_text_splitter.split_text(text)
            if not text_chunks:
                # An index cannot be built from zero chunks; stop with a
                # readable message instead of an exception from FAISS.
                st.error("No text could be extracted from the dataset PDFs.")
                return
            embeddings = OpenAIEmbeddings()
            st.session_state.docsearch = FAISS.from_texts(text_chunks, embeddings)

    docsearch = st.session_state.docsearch

    # "stuff" places all retrieved chunks into a single prompt; the previous
    # placeholder value "appropriate_type" is not a chain type LangChain knows.
    chain = load_qa_chain(OpenAI(), chain_type="stuff")

    # Chat interface
    query = st.text_input("Type your question:", key="query")
    if query:
        with st.spinner("Finding your answer..."):
            try:
                docs = docsearch.similarity_search(query)
                response = chain.run(input_documents=docs, question=query)
                update_chat_history(query, response)
                display_chat_history()
            except Exception as e:
                # UI boundary: show the failure to the user rather than
                # letting the app crash.
                st.error(f"An error occurred: {e}")


if __name__ == "__main__":
    main()