Spaces:
Running
Running
| import streamlit as st | |
| from dotenv import load_dotenv | |
| from langchain_community.document_loaders import WebBaseLoader | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores.faiss import FAISS | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_core.documents import Document | |
| import os | |
| from langchain_groq import ChatGroq | |
| from langchain.chains.qa_with_sources.retrieval import RetrievalQAWithSourcesChain | |
| from langchain.prompts import PromptTemplate | |
| from bs4 import SoupStrainer | |
| import PyPDF2 | |
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# The Groq API key must be supplied through the environment (a .env file or
# the hosting platform's secret store). Source code is readable by anyone with
# access to the repository, so no key is embedded here as a fallback.
# NOTE: any key that was previously committed in this file should be treated
# as compromised and rotated.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    # Without a key the chat model cannot be created, so halt with a clear
    # message instead of failing later with an opaque authentication error.
    st.error(
        "GROQ_API_KEY is not set. Add it to the environment or to a .env file "
        "and restart the app."
    )
    st.stop()
# Custom CSS: the stylesheet lives in a named constant so the injection call
# below stays short. The string content is unchanged.
_CUSTOM_CSS = """
<style>
body {
background: linear-gradient(135deg, #1e3c72, #2a5298);
color: #ffffff;
font-family: 'Arial', sans-serif;
}
.stSidebar, .main .block-container {
background: rgba(255, 255, 255, 0.1);
border-radius: 15px;
backdrop-filter: blur(10px);
-webkit-backdrop-filter: blur(10px);
border: 1px solid rgba(255, 255, 255, 0.2);
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.2);
padding: 20px;
}
.stTextInput > div > input {
background: rgba(255, 255, 255, 0.15);
color: #ffffff;
border: 1px solid rgba(255, 255, 255, 0.3);
border-radius: 10px;
padding: 10px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
}
.stButton > button {
background: linear-gradient(45deg, #6b48ff, #00ddeb);
color: #ffffff;
border: none;
border-radius: 10px;
padding: 10px 20px;
font-weight: bold;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2);
transition: transform 0.2s;
}
.stButton > button:hover {
transform: translateY(-2px);
box-shadow: 0 6px 16px rgba(0, 0, 0, 0.3);
}
h1, h2, h3 {
color: #ffffff;
text-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
}
.stText {
color: #e0e0e0;
font-weight: bold;
}
.stAlert {
background: rgba(255, 50, 50, 0.2);
border: 1px solid rgba(255, 50, 50, 0.5);
border-radius: 10px;
color: #ffcccc;
}
.stAlert[role="alert"] > div {
background: rgba(255, 200, 0, 0.2);
border: 1px solid rgba(255, 200, 0, 0.5);
color: #fff5cc;
}
.stSpinner > div {
color: #00ddeb;
}
.footer {
display: flex;
align-items: center;
justify-content: center;
padding: 10px;
background: rgba(255, 255, 255, 0.1);
border-top: 1px solid rgba(255, 255, 255, 0.2);
position: fixed;
bottom: 0;
width: 100%;
color: #e0e0e0;
font-size: 14px;
}
.footer img {
margin-right: 10px;
}
</style>
"""

# Raw HTML must be explicitly allowed for the <style> tag to be emitted.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# Display logo as the title
st.image("https://i.postimg.cc/2j0QWF3Z/Removal-575.png", width=390)

# Seed every session key the app reads later. Keys that are already present
# are left untouched, so only missing ones receive their default.
_SESSION_DEFAULTS = {
    "url_content": None,
    "summary": None,
    "vectorstore": None,
    "index_created": False,
    "content_type": None,
    "token_count": 0,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Create the chat model only if the session does not already hold one.
if "llm" not in st.session_state:
    _llm_settings = {
        "api_key": GROQ_API_KEY,
        "model": "llama3-70b-8192",
        "max_tokens": 512,  # kept small to minimize resource usage
    }
    st.session_state["llm"] = ChatGroq(**_llm_settings)
# Sidebar inputs: a web URL and a PDF upload, each with its own trigger button.
_sidebar = st.sidebar

_sidebar.header("Enter Web URL")
url = _sidebar.text_input(
    "URL",
    placeholder="e.g., https://mahatirtusher.com/astronomy-mythology/",
)
process_url_clicked = _sidebar.button("Process URL")

_sidebar.header("Upload PDF File")
pdf_file = _sidebar.file_uploader(
    "Upload a PDF",
    type=["pdf"],
    help="Upload a text-based PDF for best results. Please remember, if the uploaded pdf is too large, you are requested not to summarize it. Rather keep asking question",
)
process_pdf_clicked = _sidebar.button("Process PDF")

# Main content container: the processing, summary and Q&A sections all render
# inside it.
main_container = st.container()
# Custom prompt for detailed answers. The instruction paragraph and the three
# trailing lines are assembled separately; the resulting template string is
# the same single instruction line followed by the Context / Question / Answer
# lines.
_QA_INSTRUCTIONS = (
    "You are an expert assistant tasked with providing detailed, extensive, and comprehensive answers. "
    "Use the provided context to answer the question thoroughly, including explanations, examples, and additional relevant information. "
    "If the context is limited, expand on the topic with your knowledge to ensure a complete response. "
    "In case of explaining anything, break the topic and explain step by step. "
    "Sometimes use your own reasoning and knowledge to explain anything to the users. "
    "If the users ask any question in Bengali, you too will answer it in fine and detailed Bengali."
)
_QA_TEMPLATE = "\n".join(
    [
        _QA_INSTRUCTIONS,
        "Context: {context}",
        "Question: {question}",
        "Answer with sources: ",
    ]
)
qa_prompt = PromptTemplate(template=_QA_TEMPLATE)
# Function to estimate token count (approximation: 1 token ≈ 4 characters for English text)
def estimate_token_count(text):
    """Return a rough token estimate for *text*.

    The estimate is the character count (spaces and punctuation included)
    integer-divided by four. Falsy input such as ``None`` or ``""`` yields 0.
    """
    return len(text) // 4 if text else 0
# Function to summarize content
def summarize_content(content, llm):
    """Ask *llm* for a 5-10 sentence summary of *content*.

    The content is embedded in a fixed instruction prompt, the prompt is sent
    through ``llm.invoke``, and the ``content`` attribute of the response is
    returned.
    """
    prompt_template = (
        "Summarize the following content in 5-10 sentences, capturing the main points and key details in easy expression:\n"
        "{content}\n"
        "Summary: "
    )
    # Only the template is parsed by str.format, so braces inside *content*
    # are inserted literally.
    response = llm.invoke(prompt_template.format(content=content))
    return response.content
# Function to extract text from PDF
def extract_text_from_pdf(pdf_file):
    """Extract the text of every page of *pdf_file* with PyPDF2.

    Pages that yield no text are skipped; each remaining page's text is
    followed by a newline. Returns the combined text, or ``None`` after
    showing an error in the UI when nothing could be extracted or when
    reading the PDF raises.
    """
    try:
        reader = PyPDF2.PdfReader(pdf_file)
        per_page = (page.extract_text() for page in reader.pages)
        combined = "".join(chunk + "\n" for chunk in per_page if chunk)
        if combined.strip():
            return combined
        st.error("No text could be extracted from the PDF. This may be a scanned or image-based PDF. Please upload a text-based PDF.")
        return None
    except Exception as e:
        st.error(f"Error extracting text from PDF: {str(e)}")
        return None
# Function to process and chunk text
def process_content(text, embeddings, source):
    """Split *text* into chunks and index them in a FAISS vector store.

    Chunks are 1000 characters with a 200-character overlap, and each carries
    ``{"source": source}`` as metadata. Returns the vector store, or ``None``
    (after showing an error in the UI) when splitting produced no documents.
    """
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", ".", " ", ""],
        chunk_size=1000,
        chunk_overlap=200,
    )
    chunks = splitter.create_documents([text], metadatas=[{"source": source}])
    if chunks:
        return FAISS.from_documents(chunks, embeddings)
    st.error("No documents created from the content.")
    return None
# Function to create QA chain
def create_qa_chain(vectorstore, llm):
    """Build a "stuff" retrieval-QA-with-sources chain over *vectorstore*.

    The retriever is configured with ``k=2`` and the chain uses the
    module-level ``qa_prompt``, with retrieved documents bound to its
    ``context`` variable. Returns ``None`` (after showing an error in the UI)
    when *vectorstore* is ``None``.
    """
    if vectorstore is None:
        st.error("Vector store is not initialized. Cannot create QA chain.")
        return None

    prompt_settings = dict(prompt=qa_prompt, document_variable_name="context")
    return RetrievalQAWithSourcesChain.from_chain_type(
        llm=llm,
        retriever=vectorstore.as_retriever(search_kwargs={"k": 2}),
        chain_type="stuff",
        chain_type_kwargs=prompt_settings,
    )
# Reset session state when switching content types
def reset_session_state():
    """Return the content-related session keys to their initial values.

    ``qa_chain`` is cleared only if it already exists in the session; the
    function never creates that key.
    """
    cleared_values = {
        "url_content": None,
        "summary": None,
        "vectorstore": None,
        "index_created": False,
        "content_type": None,
        "token_count": 0,
    }
    for state_key, cleared in cleared_values.items():
        st.session_state[state_key] = cleared
    if "qa_chain" in st.session_state:
        st.session_state["qa_chain"] = None
# Process Web URL: fetch the page, index its text, and publish the result to
# the session. The session is marked as indexed only when a vector store was
# actually built, so the question UI never appears without one.
if process_url_clicked:
    with main_container:
        target_url = url.strip()
        if not target_url:
            st.error("Please provide a valid URL.")
        else:
            with st.spinner("Processing URL..."):
                try:
                    # Reset session state to avoid stale data
                    reset_session_state()
                    st.text("Data Loading...Started...✅✅✅")

                    # Parse only the title, paragraph and heading tags.
                    parse_only = SoupStrainer(['title', 'p', 'h1', 'h2', 'h3'])
                    loader = WebBaseLoader(
                        web_paths=[target_url],
                        bs_kwargs={"parse_only": parse_only},
                        requests_kwargs={"headers": {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}})
                    data = loader.load()
                    if not data or all(not doc.page_content.strip() for doc in data):
                        st.error("No content loaded from URL. Try a different URL (e.g., https://www.bbc.com/news/science-environment-67299122).")
                        st.stop()

                    # Initialize embeddings only when needed
                    if "embeddings" not in st.session_state:
                        st.session_state.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

                    page_text = "\n".join(doc.page_content for doc in data)
                    vectorstore = process_content(page_text, st.session_state.embeddings, source=target_url)
                    if vectorstore is None:
                        # process_content has already shown the error. Stop
                        # here so the (reset) session is not marked as indexed.
                        st.stop()

                    st.session_state.url_content = page_text
                    st.session_state.vectorstore = vectorstore
                    st.session_state.index_created = True
                    st.session_state.content_type = "web"
                    st.session_state.token_count = estimate_token_count(page_text)
                    st.text(f"Estimated token count: {st.session_state.token_count}")
                    st.text("Content processed successfully! ✅✅✅")
                except Exception as e:
                    st.error(f"Error processing URL: {str(e)}")
                    st.stop()
# Process PDF File: extract the text, index it, and publish the result to the
# session. The session is marked as indexed only when a vector store was
# actually built, so the question UI never appears without one.
if process_pdf_clicked:
    with main_container:
        if not pdf_file:
            st.error("Please upload a PDF file.")
        else:
            with st.spinner("Processing PDF..."):
                try:
                    # Reset session state to avoid stale data
                    reset_session_state()
                    st.text("Extracting Text from PDF...Started...✅✅✅")
                    pdf_text = extract_text_from_pdf(pdf_file)
                    if not pdf_text:
                        # extract_text_from_pdf has already shown the error.
                        st.stop()

                    # Initialize embeddings only when needed
                    if "embeddings" not in st.session_state:
                        st.session_state.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

                    vectorstore = process_content(pdf_text, st.session_state.embeddings, source=pdf_file.name)
                    if vectorstore is None:
                        # process_content has already shown the error. Stop
                        # here so the (reset) session is not marked as indexed.
                        st.stop()

                    # The extracted text is stored under the same
                    # "url_content" key the summary and Q&A sections read.
                    st.session_state.url_content = pdf_text
                    st.session_state.vectorstore = vectorstore
                    st.session_state.index_created = True
                    st.session_state.content_type = "pdf"
                    st.session_state.token_count = estimate_token_count(pdf_text)
                    st.text(f"Estimated token count: {st.session_state.token_count}")
                    st.text("PDF processed successfully! ✅✅✅")
                except Exception as e:
                    st.error(f"Error processing PDF: {str(e)}")
                    st.stop()
# Summary button with token limit check
with main_container:
    if st.session_state.url_content:
        # Content above 5,000 estimated tokens is not offered for
        # summarization (the threshold keeps requests under the 6,000 TPM limit).
        over_limit = st.session_state.token_count > 5000
        loaded_kind = st.session_state.content_type
        if over_limit and loaded_kind == "pdf":
            st.warning("If the PDF is large, users are requested not to summarize it, rather they can keep asking questions.")
        elif over_limit and loaded_kind == "web":
            st.warning(f"The web content is too large to summarize (estimated tokens: {st.session_state.token_count}). Please ask questions instead.")
        elif st.button("Generate Summary"):
            with st.spinner("Generating summary..."):
                try:
                    st.session_state.summary = summarize_content(
                        st.session_state.url_content,
                        st.session_state.llm,
                    )
                except Exception as e:
                    failure_text = str(e)
                    st.error(f"Error generating summary: {failure_text}")
                    # Rate-limit failures get an additional hint before halting.
                    if "rate_limit_exceeded" in failure_text:
                        st.warning("The content is too large for summarization due to API rate limits. Please ask questions instead or try a smaller document.")
                    st.stop()
# Display summary if generated
_stored_summary = st.session_state.summary
if _stored_summary:
    with main_container:
        st.header("Summary of the Content")
        st.write(_stored_summary)
# Query input with Ask button. Shown only once content has been loaded and
# indexed.
if st.session_state.url_content and st.session_state.index_created:
    with main_container:
        st.header("Ask a Question")
        query = st.text_input("Question", placeholder="e.g., What is the article about?")
        ask_clicked = st.button("Ask")
        # Strip so a whitespace-only entry is treated as empty.
        question = query.strip()
        if ask_clicked and not question:
            st.warning("Please enter a question.")
        elif ask_clicked:
            with st.spinner("Processing your question..."):
                try:
                    # Build the chain on first use; it is cached in the session
                    # until the content is reset.
                    if "qa_chain" not in st.session_state or st.session_state.qa_chain is None:
                        st.session_state.qa_chain = create_qa_chain(st.session_state.vectorstore, st.session_state.llm)
                    if st.session_state.qa_chain is None:
                        st.error("Failed to create QA chain.")
                        st.stop()

                    # Use .invoke, matching how the LLM is called elsewhere in
                    # this file, instead of calling the chain object directly.
                    result = st.session_state.qa_chain.invoke({"question": question}, return_only_outputs=True)
                    if not result.get("answer"):
                        st.warning("No answer generated. Try a different question or content.")
                        st.stop()

                    st.header("Answer")
                    st.write(result["answer"])

                    # Drop blank lines so empty entries are not rendered.
                    sources = result.get("sources", "") or ""
                    source_lines = [line.strip() for line in sources.split("\n") if line.strip()]
                    if source_lines:
                        st.subheader("Sources:")
                        for source in source_lines:
                            st.write(source)
                    else:
                        st.write("No sources found.")
                except Exception as e:
                    st.error(f"Error answering query: {str(e)}")
                    st.stop()
# Footer with tiny logo and text. The copyright sign is written as the HTML
# entity &copy; so it renders correctly regardless of how this file is
# encoded or decoded.
st.markdown(
    """
<div class="footer">
<img src="https://i.postimg.cc/2j0QWF3Z/Removal-575.png" width="120">
WebChatter &copy; 2025 | Developed by Mahatir Ahmed Tusher
</div>
""",
    unsafe_allow_html=True
)