import streamlit as st
import fitz  # PyMuPDF
import os
import time
import tempfile
import faiss
import numpy as np
import json
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from keybert import KeyBERT
from textblob import TextBlob
from groq import Groq, RateLimitError
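# Assumed dependencies (not listed in the original file): streamlit, pymupdf, faiss-cpu,
# python-dotenv, langchain, langchain-community, sentence-transformers, keybert, textblob,
# and groq. tempfile, faiss, and numpy are imported but never used directly; faiss-cpu must
# still be installed for the LangChain FAISS vector store to work.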
# Load environment
load_dotenv()
client = Groq(api_key=os.environ.get("wbm1"))
GROQ_MODEL = "llama3-8b-8192"
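# The Groq API key is read from an environment variable (or Hugging Face Space secret)
# named "wbm1"; GROQ_MODEL selects Groq's hosted Llama 3 8B model.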
# Streamlit setup
st.set_page_config(page_title="Smart PDF ChatBot", layout="centered")
st.title("Smart PDF ChatBot")
st.markdown("""
Upload one or more PDFs, get summaries and insights, and chat with the AI about the content using a persistent-memory chat.
""")

uploaded_files = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True)
# Utilities
def extract_text_from_pdf(file):
    # Read the uploaded file into PyMuPDF and concatenate the text of every page.
    doc = fitz.open(stream=file.read(), filetype="pdf")
    text = ""
    for page in doc:
        text += page.get_text()
    doc.close()
    return text
def split_text(text):
    splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=150)
    return splitter.split_text(text)
def create_vector_store(chunks):
    # Embed each chunk with a local sentence-transformers model and index it in FAISS.
    documents = [Document(page_content=c) for c in chunks]
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    return FAISS.from_documents(documents, embeddings)
def summarize_chunks(chunks):
    chunk_summaries = []
    for i, chunk in enumerate(chunks):
        while True:
            try:
                response = client.chat.completions.create(
                    model=GROQ_MODEL,
                    messages=[
                        {"role": "system", "content": "You are an AI that summarizes documents."},
                        {"role": "user", "content": f"Summarize this chunk:\n{chunk}"}
                    ]
                )
                chunk_summaries.append(response.choices[0].message.content)
                break
            except RateLimitError as e:
                # Assumes the rate-limit error string carries a JSON payload after " - "
                # whose "message" field ends with the suggested wait time (e.g. "... in 2.5s").
                error_data = json.loads(str(e).split(" - ", 1)[-1])
                wait_time = float(error_data["error"]["message"].split("in ")[-1].split("s")[0])
                st.warning(f"Rate limit hit while summarizing. Retrying in {wait_time:.2f} seconds...")
                time.sleep(wait_time)
            except Exception as e:
                chunk_summaries.append(f"[Error summarizing chunk {i}]: {str(e)}")
                break
    return "\n".join(chunk_summaries)
def ask_question(vectorstore, question):
    # Retrieve the three most similar chunks and use them as context for the answer.
    docs = vectorstore.similarity_search(question, k=3)
    context = "\n".join([d.page_content for d in docs])
    while True:
        try:
            response = client.chat.completions.create(
                model=GROQ_MODEL,
                messages=[
                    {"role": "system", "content": "You answer questions based on document context."},
                    {"role": "user", "content": f"Context:\n{context}\n\nQuestion:\n{question}"}
                ]
            )
            return response.choices[0].message.content
        except RateLimitError as e:
            # Same retry strategy as summarize_chunks: parse the suggested wait time
            # out of the error message, sleep, and try again.
            error_data = json.loads(str(e).split(" - ", 1)[-1])
            wait_time = float(error_data["error"]["message"].split("in ")[-1].split("s")[0])
            st.warning(f"Rate limit hit. Retrying in {wait_time:.2f} seconds...")
            time.sleep(wait_time)
        except Exception as e:
            return f"[Error answering question]: {str(e)}"
def extract_keywords(text, top_n=10):
    kw_model = KeyBERT()
    keywords = kw_model.extract_keywords(text, top_n=top_n, stop_words='english')
    return [kw[0] for kw in keywords]
def get_sentiment(text):
    blob = TextBlob(text)
    polarity = blob.sentiment.polarity
    if polarity > 0.2:
        return "Positive"
    elif polarity < -0.2:
        return "Negative"
    else:
        return "Neutral"
def make_download_button(text, filename="summary.txt"):
    st.download_button("Download Summary", data=text, file_name=filename, mime="text/plain")
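# Optional sketch (not part of the original app): everything under "App logic" below re-runs
# on every Streamlit rerun, so each new chat message re-extracts, re-embeds, and re-summarizes
# the PDFs. Caching the expensive steps avoids that; the helper names here are assumptions.
@st.cache_resource(show_spinner=False)
def build_vector_store_cached(all_text: str):
    return create_vector_store(split_text(all_text))

@st.cache_data(show_spinner=False)
def summarize_text_cached(all_text: str):
    return summarize_chunks(split_text(all_text))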
# App logic
if uploaded_files:
    all_text = ""
    for file in uploaded_files:
        st.write(f"Processing {file.name}...")
        text = extract_text_from_pdf(file)
        all_text += f"\n\n{text}"

    st.subheader("Extracting Insights...")
    chunks = split_text(all_text)
    vectorstore = create_vector_store(chunks)

    st.write("Generating summary...")
    summary = summarize_chunks(chunks)
    st.success(summary)
    make_download_button(summary)

    st.subheader("Keywords")
    keywords = extract_keywords(summary)
    st.write(", ".join(keywords))

    st.subheader("Sentiment")
    sentiment = get_sentiment(summary)
    st.write(sentiment)

    st.markdown("---")
    st.subheader("Ask a question about the documents")

    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []

    user_question = st.text_input("Type your question")
    if user_question:
        with st.spinner("Thinking..."):
            answer = ask_question(vectorstore, user_question)
            st.session_state.chat_history.append((user_question, answer))

    for q, a in st.session_state.chat_history:
        st.markdown(f"**You:** {q}")
        st.markdown(f"**AI:** {a}")
else:
    st.info("Upload one or more PDF files to get started.")
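# To run locally (assuming this file is saved as app.py and a .env file defines "wbm1"):
#   streamlit run app.py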