# Page header captured with this file: "Spaces: Sleeping"; file size: 7,851 Bytes.
# Commit hashes from the page's line gutter: 0fd5dbd, f32095c.
import os
import uuid
import json
import re
from bs4 import BeautifulSoup
import requests
import streamlit as st
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.llms import Ollama
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
from langchain_community.embeddings import HuggingFaceEmbeddings
import nltk
from urllib.parse import urljoin, urlparse
import faiss
# Load variables from a local .env file into the process environment.
# Nothing in the code visible here reads them; kept for optional API keys.
load_dotenv()
# Shared sentence-embedding model, created once at module load and used both
# when building a FAISS index and when loading one back for querying.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
# Fetch the NLTK stopword corpus at module load; it has to be available
# before stopwords.words() is called below.
nltk.download('stopwords')
from nltk.corpus import stopwords
# English stopwords held as a set so preprocess_text can test membership
# cheaply for every token.
STOPWORDS = set(stopwords.words('english'))
# Text Preprocessing Function
def preprocess_text(text):
    """Normalize raw text before it is chunked and embedded.

    Every character that is not an ASCII letter or whitespace is dropped,
    whitespace runs are collapsed to single spaces, the result is lowercased,
    and English stopwords are removed.

    Args:
        text: Raw input string.

    Returns:
        The remaining words joined by single spaces (possibly an empty string).
    """
    letters_only = re.sub(r'[^A-Za-z\s]', '', text)
    normalized = re.sub(r'\s+', ' ', letters_only).strip().lower()
    # Stopword filtering happens after lowercasing so capitalized stopwords
    # ("The", "And") are removed as well.
    kept_words = filter(lambda word: word not in STOPWORDS, normalized.split())
    return " ".join(kept_words)
# Scrape Website with BeautifulSoup
def scrape_website(url, max_pages=None, timeout=10):
    """Crawl a site starting at *url* and collect the text of each page.

    Links whose host matches the page they were found on are followed
    depth-first, in document order. Pages that cannot be fetched (network
    error, timeout, HTTP error status) are reported with ``st.error`` and
    skipped; the crawl continues with the remaining pages.

    Args:
        url: Start URL of the crawl.
        max_pages: Optional cap on the number of pages requested. ``None``
            (the default) crawls every reachable same-host page.
        timeout: Seconds to wait for each HTTP response before giving up on
            that page.

    Returns:
        A dict mapping each successfully scraped page URL (with any
        ``#fragment`` removed) to the page's extracted text joined with
        single spaces.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        # The wildcard media range must be spelled "*/*"; a bare "/" is not
        # a valid Accept entry.
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
    }
    # NOTE(review): container tags (div, span) are collected alongside the
    # p/li/heading tags they wrap, so nested text can appear more than once
    # in the output. Kept as-is to avoid changing what gets indexed.
    relevant_tags = ['p', 'strong', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'div']

    def without_fragment(link):
        # "page#a" and "page#b" are the same document; dropping the fragment
        # keeps it from being fetched and indexed more than once.
        return urlparse(link)._replace(fragment="").geturl()

    visited_urls = set()
    scraped_data = {}
    # An explicit stack replaces per-link recursion, so a long chain of pages
    # cannot exhaust the interpreter's recursion limit.
    pending = [without_fragment(url)]

    while pending:
        if max_pages is not None and len(visited_urls) >= max_pages:
            break
        current = pending.pop()
        if current in visited_urls:
            continue
        visited_urls.add(current)

        try:
            response = requests.get(current, headers=request_headers, timeout=timeout)
            # Treat 4xx/5xx responses as failures instead of indexing the
            # error page's text.
            response.raise_for_status()
        except requests.RequestException as e:
            st.error(f"Failed to retrieve {current}: {e}")
            continue

        soup = BeautifulSoup(response.content, 'html.parser')

        content = []
        for tag in relevant_tags:
            for element in soup.find_all(tag):
                text = element.get_text(strip=True)
                if text:
                    content.append(text)
        if content:
            scraped_data[current] = " ".join(content)

        # Push links in reverse so they are popped (and therefore visited)
        # in document order, matching a recursive depth-first walk.
        current_host = urlparse(current).netloc
        for link in reversed(soup.find_all('a', href=True)):
            next_url = without_fragment(urljoin(current, link['href']))
            if urlparse(next_url).netloc == current_host and next_url not in visited_urls:
                pending.append(next_url)

    return scraped_data
# PDF Text Extraction
def get_pdf_text(pdf_docs):
    """Extract and normalize the text of every page of the given PDFs.

    Args:
        pdf_docs: Iterable of PDF sources that ``PdfReader`` can open
            (``main`` passes the files returned by the Streamlit uploader).

    Returns:
        The text of all pages of all documents, in order, after
        ``preprocess_text`` has been applied.
    """
    page_texts = []
    for pdf in pdf_docs:
        for page in PdfReader(pdf).pages:
            # extract_text() can return None; use "" for such pages.
            page_texts.append(page.extract_text() or "")
    # Join pages with a newline so the last word of one page is not fused
    # with the first word of the next; preprocess_text collapses whitespace
    # afterwards, so the separator never reaches the output as-is.
    return preprocess_text("\n".join(page_texts))
# Split Text into Manageable Chunks
def get_text_chunks(text, chunk_size=15000, chunk_overlap=1000):
    """Split *text* into overlapping chunks for embedding.

    Args:
        text: The (preprocessed) text to split.
        chunk_size: Maximum chunk length handed to the splitter; defaults to
            the previously hard-coded 15000.
        chunk_overlap: Overlap between consecutive chunks; defaults to the
            previously hard-coded 1000.

    Returns:
        The list of chunk strings produced by
        ``RecursiveCharacterTextSplitter.split_text``.
    """
    # NOTE(review): the module embeds chunks with all-mpnet-base-v2, which
    # presumably truncates input past its maximum sequence length (a few
    # hundred word pieces per its model card - confirm). If so, most of a
    # 15000-character chunk would not influence its embedding; callers that
    # care about retrieval quality may want a much smaller chunk_size.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return text_splitter.split_text(text)
# Create FAISS Vector Store with UUID
def create_faiss_with_uuid(text_chunks):
    """Embed *text_chunks* into a new FAISS index saved under a unique directory.

    Args:
        text_chunks: Non-empty list of text chunks to embed and index.

    Returns:
        A ``(unique_id, faiss_directory)`` tuple: the generated UUID string
        and the path ``./faiss_index_<unique_id>`` the index was saved to.

    Raises:
        ValueError: If *text_chunks* is empty. Without this check the failure
            would surface from inside the vector-store construction with a
            message that does not point at the real cause.
    """
    if not text_chunks:
        raise ValueError("Cannot build a FAISS index: no text chunks were provided.")

    # A fresh UUID per call keeps each processed document set in its own
    # directory, so indexes never overwrite one another.
    unique_id = str(uuid.uuid4())
    faiss_directory = f'./faiss_index_{unique_id}'

    vector_store = FAISS.from_texts(text_chunks, embeddings)
    os.makedirs(faiss_directory, exist_ok=True)
    vector_store.save_local(faiss_directory)
    return unique_id, faiss_directory
# Build Conversational Chain
def get_conversational_chain():
    """Build the "stuff" question-answering chain used to answer queries.

    The chain combines a fixed prompt (with ``context`` and ``question``
    placeholders) with the ``qwen2.5:0.5b`` model served through Ollama.

    Returns:
        The chain object produced by ``load_qa_chain``.
    """
    # The prompt is assembled from adjacent literals so it can be indented
    # with the function body without adding leading whitespace to the text
    # that is sent to the model.
    template_text = (
        "\n"
        "Answer the question as detailed as possible from the provided context. If the answer is not in\n"
        "provided context, just say, \"answer is not available in the context.\" Don't provide the wrong answer.\n\n\n"
        "Context:\n {context}\n\n"
        "Question: \n{question}\n\n"
        "Answer:\n"
    )
    llm = Ollama(model="qwen2.5:0.5b")
    qa_prompt = PromptTemplate(
        template=template_text,
        input_variables=["context", "question"],
    )
    return load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)
# Handle User Input and Process Questions with UUID-based FAISS Index
def user_input(user_question, faiss_directory):
    """Answer *user_question* from a saved FAISS index and display the reply.

    Args:
        user_question: The question text entered by the user.
        faiss_directory: Directory of a previously saved FAISS index.

    Returns:
        None. The answer is written to the Streamlit page.
    """
    # NOTE(review): allow_dangerous_deserialization opts in to loading
    # pickled data from disk; that is only acceptable while faiss_directory
    # is one this app wrote itself - confirm no external path can reach here.
    index = FAISS.load_local(
        faiss_directory,
        embeddings,
        allow_dangerous_deserialization=True,
    )
    matching_docs = index.similarity_search(user_question)
    qa_chain = get_conversational_chain()
    chain_inputs = {"input_documents": matching_docs, "question": user_question}
    answer = qa_chain(chain_inputs, return_only_outputs=True)
    st.write("Reply: ", answer["output_text"])
# Main Function for Streamlit App
def main():
    """Render the Streamlit page: question box plus a sidebar for ingestion.

    The main area takes a question and answers it from the index recorded in
    ``st.session_state['faiss_directory']`` (if one exists). The sidebar lets
    the user pick PDF upload or URL scraping; on "Submit & Process" the text
    is extracted, chunked, indexed, and the new index directory is stored in
    session state. Failures in either ingestion path are shown with
    ``st.error`` instead of surfacing as an unhandled exception.
    """
    st.set_page_config("Chat PDF & URL", layout="wide")
    st.header("Chat with PDF or URL using Ollama 💁")

    user_question = st.text_input("Ask a Question from the Processed Data")
    # Only answer once some data has been processed in this session.
    if user_question and 'faiss_directory' in st.session_state:
        user_input(user_question, st.session_state['faiss_directory'])

    with st.sidebar:
        st.title("Menu:")
        option = st.radio("Choose input type:", ("PDF", "URL"))

        if option == "PDF":
            pdf_docs = st.file_uploader("Upload PDF Files:", accept_multiple_files=True)
            if st.button("Submit & Process"):
                with st.spinner("Processing..."):
                    if pdf_docs:
                        # Same UI-boundary handling as the URL branch, so an
                        # unreadable PDF produces a message, not a traceback.
                        try:
                            raw_text = get_pdf_text(pdf_docs)
                            _index_and_store(raw_text, "PDF data is ready for queries!")
                        except Exception as e:
                            st.error(f"Failed to process PDF data: {e}")
                    else:
                        st.error("No PDF files were uploaded.")
        elif option == "URL":
            url_input = st.text_input("Enter a URL to scrape text:")
            if st.button("Submit & Process"):
                with st.spinner("Processing..."):
                    if url_input:
                        try:
                            scraped_data = scrape_website(url_input)
                            # Merge the text of all scraped pages before cleaning.
                            raw_text = preprocess_text(" ".join(scraped_data.values()))
                            _index_and_store(raw_text, "Scraped data is ready for queries!")
                        except Exception as e:
                            st.error(f"Failed to scrape or process data: {e}")
                    else:
                        st.error("No URL was provided.")


def _index_and_store(raw_text, success_message):
    """Chunk *raw_text*, index it, and record the index directory in session state."""
    text_chunks = get_text_chunks(raw_text)
    if not text_chunks:
        # Nothing to embed (e.g. no extractable text); report it rather than
        # handing an empty list to the index builder.
        st.error("No usable text was extracted, so there is nothing to index.")
        return
    _, faiss_directory = create_faiss_with_uuid(text_chunks)
    st.session_state['faiss_directory'] = faiss_directory
    st.success(success_message)
# Start the app only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()