Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_groq import ChatGroq | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_core.runnables import RunnableLambda | |
# --- Module-wide state (populated by process_pdf, reset by clear_all) ---
vectorstore = None     # FAISS index built from the current PDF, or None
qa_chain = None        # RAG chain wired against vectorstore, or None
retrieved_docs = {}    # stash of the last retrieval, under key "docs"

# --- Groq API key, injected as a Hugging Face Space secret --------------
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")

# --- Sentence-embedding model, loaded a single time at startup ----------
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True},
)
| # ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def format_docs(docs):
    """Render retrieved documents as a single context string.

    Each document becomes a ``[Page N]`` header (1-based page number from
    ``doc.metadata["page"]``) followed by its text, joined by blank lines.

    Fix: the original wrote ``doc.metadata.get('page', '?') + 1``, which
    raises ``TypeError`` ('?' + 1) whenever a chunk has no "page" entry.
    Such chunks are now labeled ``[Page ?]`` instead of crashing.
    """
    sections = []
    for doc in docs:
        page = doc.metadata.get("page")
        label = "?" if page is None else page + 1
        sections.append(f"[Page {label}]\n{doc.page_content}")
    return "\n\n".join(sections)
def build_chain():
    """Assemble the RAG pipeline: retrieve -> prompt -> Groq LLM -> str.

    Reads the module-level ``vectorstore`` (must already be built) and
    records each retrieval's documents in ``retrieved_docs["docs"]`` so
    the caller can cite source pages afterwards.
    """
    llm = ChatGroq(
        api_key=GROQ_API_KEY,
        model="llama-3.3-70b-versatile",
        temperature=0.2,
        max_tokens=1024,
    )

    prompt = PromptTemplate(
        template="""You are a helpful assistant. Use the context below to answer the question.
If the answer is not in the context, say "I don't have enough information to answer that."
Context:
{context}
Question: {question}
Answer:""",
        input_variables=["context", "question"],
    )

    retriever = vectorstore.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 4},
    )

    def _fetch_context(payload):
        """Retrieve chunks for the question and format them for the prompt."""
        query = payload["question"]
        docs = retriever.invoke(query)
        # Side channel: remember what was retrieved for source citation.
        retrieved_docs["docs"] = docs
        return {"context": format_docs(docs), "question": query}

    return RunnableLambda(_fetch_context) | prompt | llm | StrOutputParser()
| # ββ Core functions ββββββββββββββββββββββββββββββββββββββββββββ | |
def process_pdf(pdf_file, progress=gr.Progress()):
    """Load a PDF, chunk it, build the FAISS index, and wire the QA chain.

    Updates the module globals ``vectorstore`` and ``qa_chain`` on success.
    Returns a Markdown status string for the UI (never raises: failures are
    reported as an error string at this UI boundary).

    Fix: depending on the Gradio version, ``gr.File`` delivers either a
    tempfile-like wrapper (with ``.name``) or a plain filepath string
    (the default in Gradio 4). The original unconditionally accessed
    ``pdf_file.name`` and broke on string inputs; both are accepted now.
    """
    global vectorstore, qa_chain
    if pdf_file is None:
        return "β οΈ Please upload a PDF file."
    if not GROQ_API_KEY:
        return "β GROQ_API_KEY secret is not set in HF Space settings."
    try:
        progress(0.1, desc="Loading PDF...")
        # Accept both a filepath string and a tempfile wrapper.
        pdf_path = pdf_file if isinstance(pdf_file, str) else pdf_file.name
        loader = PyPDFLoader(pdf_path)
        pages = loader.load()

        progress(0.3, desc="Splitting into chunks...")
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=100,
            separators=["\n\n", "\n", ".", " "],
        )
        chunks = splitter.split_documents(pages)

        progress(0.6, desc="Building FAISS index...")
        vectorstore = FAISS.from_documents(chunks, embeddings)

        progress(0.9, desc="Setting up RAG chain...")
        qa_chain = build_chain()

        progress(1.0, desc="Done!")
        return f"β Ready! Loaded **{len(pages)} pages** β **{len(chunks)} chunks**."
    except Exception as e:
        # UI boundary: surface the error as text rather than crashing the app.
        return f"β Error: {str(e)}"
def answer_question(question, history):
    """Answer ``question`` via the RAG chain and append the exchange.

    Returns ``("", history)`` so the textbox clears and the chat updates.
    Blank questions are ignored; a missing index yields a warning message
    instead of an error.
    """
    # No index yet -> tell the user rather than crash.
    if vectorstore is None or qa_chain is None:
        history += [
            {"role": "user", "content": question},
            {"role": "assistant", "content": "β οΈ Please upload a PDF first."},
        ]
        return "", history

    # Ignore empty / whitespace-only input.
    if not question.strip():
        return "", history

    try:
        reply = qa_chain.invoke({"question": question})
        source_docs = retrieved_docs.get("docs", [])
        if source_docs:
            # Deduplicated, sorted, 1-based page numbers for citation.
            cited = {d.metadata.get("page", 0) + 1 for d in source_docs}
            pages = sorted(cited)
            reply += f"\n\nπ *Sources: pages {pages}*"
    except Exception as e:
        reply = f"β Error: {str(e)}"

    history += [
        {"role": "user", "content": question},
        {"role": "assistant", "content": reply},
    ]
    return "", history
def clear_all():
    """Drop the index, chain, and cached retrievals; reset the UI widgets.

    Returns (empty chat history, empty textbox value, status message).
    """
    global vectorstore, qa_chain, retrieved_docs
    vectorstore, qa_chain, retrieved_docs = None, None, {}
    return [], "", "ποΈ Cleared. Upload a new PDF to start again."
# -- Gradio UI ----------------------------------------------------------
# Fixes: (1) `theme=` belongs on the gr.Blocks constructor, not on
# demo.launch() (launch() has no `theme` parameter and raises TypeError);
# (2) the event handlers build {"role": ..., "content": ...} dicts, so the
# Chatbot must declare type="messages" -- the default tuple format does
# not understand dict messages.
with gr.Blocks(title="PDF RAG Chatbot", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π PDF RAG Chatbot\nUpload a PDF and ask questions about it.")
    with gr.Row():
        # -- Left panel: upload & processing ----------------------------
        with gr.Column(scale=1):
            gr.Markdown("### π Upload Document")
            pdf_upload = gr.File(label="Choose PDF", file_types=[".pdf"])
            process_btn = gr.Button("π₯ Process PDF", variant="primary")
            status_box = gr.Markdown("*Upload a PDF to begin.*")
        # -- Right panel: chat ------------------------------------------
        with gr.Column(scale=2):
            gr.Markdown("### π¬ Chat")
            # type="messages" matches the dict-style history produced by
            # answer_question / clear_all.
            chatbot = gr.Chatbot(height=500, type="messages")
            with gr.Row():
                question_box = gr.Textbox(
                    placeholder="Ask a question about your PDF...",
                    show_label=False,
                    scale=4,
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)
            clear_btn = gr.Button("ποΈ Clear Chat & Reset")

    # -- Event wiring ---------------------------------------------------
    process_btn.click(
        process_pdf,
        inputs=[pdf_upload],
        outputs=[status_box],
    )
    submit_btn.click(
        answer_question,
        inputs=[question_box, chatbot],
        outputs=[question_box, chatbot],
    )
    question_box.submit(
        answer_question,
        inputs=[question_box, chatbot],
        outputs=[question_box, chatbot],
    )
    clear_btn.click(
        clear_all,
        outputs=[chatbot, question_box, status_box],
    )

demo.launch()