# WebChatter / app.py
import streamlit as st
from dotenv import load_dotenv
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores.faiss import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
import os
from langchain_groq import ChatGroq
from langchain.chains.qa_with_sources.retrieval import RetrievalQAWithSourcesChain
from langchain.prompts import PromptTemplate
from bs4 import SoupStrainer
import PyPDF2
# Load environment variables
load_dotenv()
# Get Groq API key from environment variable (recommended) or use hardcoded fallback
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
GROQ_API_KEY = "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"
# Custom CSS
st.markdown("""
<style>
body {
    background: linear-gradient(135deg, #1e3c72, #2a5298);
    color: #ffffff;
    font-family: 'Arial', sans-serif;
}
.stSidebar, .main .block-container {
    background: rgba(255, 255, 255, 0.1);
    border-radius: 15px;
    backdrop-filter: blur(10px);
    -webkit-backdrop-filter: blur(10px);
    border: 1px solid rgba(255, 255, 255, 0.2);
    box-shadow: 0 8px 32px rgba(0, 0, 0, 0.2);
    padding: 20px;
}
.stTextInput > div > input {
    background: rgba(255, 255, 255, 0.15);
    color: #ffffff;
    border: 1px solid rgba(255, 255, 255, 0.3);
    border-radius: 10px;
    padding: 10px;
    box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
}
.stButton > button {
    background: linear-gradient(45deg, #6b48ff, #00ddeb);
    color: #ffffff;
    border: none;
    border-radius: 10px;
    padding: 10px 20px;
    font-weight: bold;
    box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2);
    transition: transform 0.2s;
}
.stButton > button:hover {
    transform: translateY(-2px);
    box-shadow: 0 6px 16px rgba(0, 0, 0, 0.3);
}
h1, h2, h3 {
    color: #ffffff;
    text-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
}
.stText {
    color: #e0e0e0;
    font-weight: bold;
}
.stAlert {
    background: rgba(255, 50, 50, 0.2);
    border: 1px solid rgba(255, 50, 50, 0.5);
    border-radius: 10px;
    color: #ffcccc;
}
.stAlert[role="alert"] > div {
    background: rgba(255, 200, 0, 0.2);
    border: 1px solid rgba(255, 200, 0, 0.5);
    color: #fff5cc;
}
.stSpinner > div {
    color: #00ddeb;
}
.footer {
    display: flex;
    align-items: center;
    justify-content: center;
    padding: 10px;
    background: rgba(255, 255, 255, 0.1);
    border-top: 1px solid rgba(255, 255, 255, 0.2);
    position: fixed;
    bottom: 0;
    width: 100%;
    color: #e0e0e0;
    font-size: 14px;
}
.footer img {
    margin-right: 10px;
}
</style>
""", unsafe_allow_html=True)
# Display logo as the title
st.image("https://i.postimg.cc/2j0QWF3Z/Removal-575.png", width=390)
# Initialize session state
if "url_content" not in st.session_state:
st.session_state.url_content = None
if "summary" not in st.session_state:
st.session_state.summary = None
if "vectorstore" not in st.session_state:
st.session_state.vectorstore = None
if "index_created" not in st.session_state:
st.session_state.index_created = False
if "content_type" not in st.session_state:
st.session_state.content_type = None
if "token_count" not in st.session_state:
st.session_state.token_count = 0
# Initialize LLM once at the start
if "llm" not in st.session_state:
    st.session_state.llm = ChatGroq(
        api_key=GROQ_API_KEY,
        model="llama3-70b-8192",
        max_tokens=512  # Keep reduced to minimize resource usage
    )
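# Streamlit reruns this script on every user interaction, so keeping the LLM,
# embeddings, and vector store in st.session_state avoids rebuilding them on
# each rerun.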
# Sidebar for URL and PDF input
with st.sidebar:
st.header("Enter Web URL")
url = st.text_input("URL", placeholder="e.g., https://mahatirtusher.com/astronomy-mythology/")
process_url_clicked = st.button("Process URL")
st.header("Upload PDF File")
pdf_file = st.file_uploader("Upload a PDF", type=["pdf"], help="Upload a text-based PDF for best results. Please remember, if the uploaded pdf is too large, you are requested not to summarize it. Rather keep asking question")
process_pdf_clicked = st.button("Process PDF")
# Main content container
main_container = st.container()
# Custom prompt for detailed answers
qa_prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""You are an expert assistant tasked with providing detailed, extensive, and comprehensive answers. Use the provided context to answer the question thoroughly, including explanations, examples, and additional relevant information. If the context is limited, draw on your own knowledge to give a complete response. When explaining something, break the topic down and explain it step by step, using your own reasoning and knowledge where helpful. If the user asks a question in Bengali, answer in clear, detailed Bengali.
Context: {context}
Question: {question}
Answer with sources: """
)
# Function to estimate token count (approximation: 1 token ≈ 4 characters for English text)
def estimate_token_count(text):
    if not text:
        return 0
    # Approximate token count: 1 token ≈ 4 characters (including spaces and punctuation)
    return len(text) // 4
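# Rough illustration of the heuristic: a 20,000-character article comes out at
# about 5,000 estimated tokens, which is exactly the summarization threshold
# applied further below.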
# Function to summarize content
def summarize_content(content, llm):
    # Shorter summary for web URLs and PDFs (5-10 sentences)
    summary_prompt = f"""Summarize the following content in 5-10 sentences, capturing the main points and key details in plain language:
{content}
Summary: """
    summary = llm.invoke(summary_prompt).content
    return summary
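# Note: llm.invoke() returns a chat message object; summarize_content reads its
# .content attribute to get the plain-text summary.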
# Function to extract text from PDF
def extract_text_from_pdf(pdf_file):
    try:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        text = ""
        for page in pdf_reader.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
        if not text.strip():
            st.error("No text could be extracted from the PDF. This may be a scanned or image-based PDF. Please upload a text-based PDF.")
            return None
        return text
    except Exception as e:
        st.error(f"Error extracting text from PDF: {str(e)}")
        return None
# Function to process and chunk text
def process_content(text, embeddings, source):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n\n", "\n", ".", " ", ""]
    )
    docs = text_splitter.create_documents([text], metadatas=[{"source": source}])
    if not docs:
        st.error("No documents created from the content.")
        return None
    vectorstore = FAISS.from_documents(docs, embeddings)
    return vectorstore
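# Note: the 200-character chunk_overlap in process_content keeps neighbouring
# chunks sharing some context, so answers that span a chunk boundary are less
# likely to be cut off; the FAISS index is held in memory for this session.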
# Function to create QA chain
def create_qa_chain(vectorstore, llm):
    if vectorstore is None:
        st.error("Vector store is not initialized. Cannot create QA chain.")
        return None
    retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
    qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={
            "prompt": qa_prompt,
            "document_variable_name": "context"
        }
    )
    return qa_chain
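# Note: RetrievalQAWithSourcesChain returns a dict with "answer" and "sources"
# keys, and the k=2 retriever passes only the two most similar chunks into the
# prompt's {context} slot, which keeps each Groq request small.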
# Reset session state when switching content types
def reset_session_state():
    st.session_state.url_content = None
    st.session_state.summary = None
    st.session_state.vectorstore = None
    st.session_state.index_created = False
    st.session_state.content_type = None
    st.session_state.token_count = 0
    if "qa_chain" in st.session_state:
        st.session_state.qa_chain = None
# Process Web URL
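# The SoupStrainer below limits BeautifulSoup parsing to title, paragraph, and
# heading tags, which keeps navigation and other page boilerplate out of the index.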
if process_url_clicked:
    with main_container:
        if not url.strip():
            st.error("Please provide a valid URL.")
        else:
            with st.spinner("Processing URL..."):
                try:
                    # Reset session state to avoid stale data
                    reset_session_state()
                    st.text("Data Loading...Started...✅✅✅")
                    parse_only = SoupStrainer(['title', 'p', 'h1', 'h2', 'h3'])
                    loader = WebBaseLoader(
                        web_paths=[url.strip()],
                        bs_kwargs={"parse_only": parse_only},
                        requests_kwargs={"headers": {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}})
                    data = loader.load()
                    if not data or all(len(doc.page_content.strip()) == 0 for doc in data):
                        st.error("No content loaded from URL. Try a different URL (e.g., https://www.bbc.com/news/science-environment-67299122).")
                        st.stop()
                    # Initialize embeddings only when needed
                    if "embeddings" not in st.session_state:
                        st.session_state.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
                    st.session_state.url_content = "\n".join([doc.page_content for doc in data])
                    embeddings = st.session_state.embeddings
                    st.session_state.vectorstore = process_content(st.session_state.url_content, embeddings, source=url.strip())
                    st.session_state.index_created = True
                    st.session_state.content_type = "web"
                    st.session_state.token_count = estimate_token_count(st.session_state.url_content)
                    st.text(f"Estimated token count: {st.session_state.token_count}")
                    st.text("Content processed successfully! ✅✅✅")
                except Exception as e:
                    st.error(f"Error processing URL: {str(e)}")
                    st.stop()
# Process PDF File
if process_pdf_clicked:
    with main_container:
        if not pdf_file:
            st.error("Please upload a PDF file.")
        else:
            with st.spinner("Processing PDF..."):
                try:
                    # Reset session state to avoid stale data
                    reset_session_state()
                    st.text("Extracting Text from PDF...Started...✅✅✅")
                    pdf_text = extract_text_from_pdf(pdf_file)
                    if not pdf_text:
                        st.stop()
                    # Initialize embeddings only when needed
                    if "embeddings" not in st.session_state:
                        st.session_state.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
                    st.session_state.url_content = pdf_text
                    embeddings = st.session_state.embeddings
                    st.session_state.vectorstore = process_content(st.session_state.url_content, embeddings, source=pdf_file.name)
                    st.session_state.index_created = True
                    st.session_state.content_type = "pdf"
                    st.session_state.token_count = estimate_token_count(st.session_state.url_content)
                    st.text(f"Estimated token count: {st.session_state.token_count}")
                    st.text("PDF processed successfully! ✅✅✅")
                except Exception as e:
                    st.error(f"Error processing PDF: {str(e)}")
                    st.stop()
# Summary button with token limit check
with main_container:
    if st.session_state.url_content:
        # Check if content is too large for summarization (threshold: 5,000 tokens to stay under the 6,000 TPM limit)
        if st.session_state.token_count > 5000 and st.session_state.content_type == "pdf":
            st.warning(f"This PDF is large (estimated tokens: {st.session_state.token_count}). Please skip the summary and keep asking questions instead.")
        elif st.session_state.token_count > 5000 and st.session_state.content_type == "web":
            st.warning(f"The web content is too large to summarize (estimated tokens: {st.session_state.token_count}). Please ask questions instead.")
        else:
            if st.button("Generate Summary"):
                with st.spinner("Generating summary..."):
                    try:
                        st.session_state.summary = summarize_content(st.session_state.url_content, st.session_state.llm)
                    except Exception as e:
                        st.error(f"Error generating summary: {str(e)}")
                        if "rate_limit_exceeded" in str(e):
                            st.warning("The content is too large to summarize due to API rate limits. Please ask questions instead or try a smaller document.")
                        st.stop()
# Display summary if generated
if st.session_state.summary:
    with main_container:
        st.header("Summary of the Content")
        st.write(st.session_state.summary)
# Query input with Ask button
if st.session_state.url_content and st.session_state.index_created:
    with main_container:
        st.header("Ask a Question")
        query = st.text_input("Question", placeholder="e.g., What is the article about?")
        ask_clicked = st.button("Ask")
        if ask_clicked and query:
            with st.spinner("Processing your question..."):
                try:
                    if "qa_chain" not in st.session_state or st.session_state.qa_chain is None:
                        st.session_state.qa_chain = create_qa_chain(st.session_state.vectorstore, st.session_state.llm)
                    if st.session_state.qa_chain is None:
                        st.error("Failed to create QA chain.")
                        st.stop()
                    result = st.session_state.qa_chain({"question": query}, return_only_outputs=True)
                    if not result.get("answer"):
                        st.warning("No answer generated. Try a different question or content.")
                        st.stop()
                    st.header("Answer")
                    st.write(result["answer"])
                    sources = result.get("sources", "")
                    if sources:
                        st.subheader("Sources:")
                        sources_list = sources.split("\n")
                        for source in sources_list:
                            st.write(source)
                    else:
                        st.write("No sources found.")
                except Exception as e:
                    st.error(f"Error answering query: {str(e)}")
                    st.stop()
# Footer with tiny logo and text
st.markdown(
"""
<div class="footer">
<img src="https://i.postimg.cc/2j0QWF3Z/Removal-575.png" width="120">
WebChatter Β© 2025 | Developed by Mahatir Ahmed Tusher
</div>
""",
unsafe_allow_html=True
)