# RAG PDF Chat — Gradio application (Hugging Face Space)
import asyncio
import os
import shutil
import threading
import time
import uuid
from pathlib import Path
from typing import Optional

import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI

# Project-local modules
from custom_types import RAGSearchResult
from data_loader import load_and_chunk_pdf, embed_texts
from vector_db import QdrantStorage
| load_dotenv() | |
| # Initialize OpenAI client | |
| openai_client = OpenAI() | |
class RAGProcessor:
    """Coordinate PDF ingestion and retrieval-augmented querying.

    Saves uploaded PDFs under ``uploads/``, chunks and embeds them into a
    Qdrant vector store, and answers questions by retrieving relevant
    chunks and feeding them to the OpenAI chat API.
    """

    def __init__(self):
        self.vector_store = QdrantStorage()
        self.uploads_dir = Path("uploads")
        self.uploads_dir.mkdir(parents=True, exist_ok=True)

    def save_uploaded_pdf(self, file) -> Path:
        """Copy an uploaded PDF into the uploads dir under a unique name.

        Gradio passes uploads as a string path; a file-like object exposing
        ``.name`` and ``.getbuffer()`` is accepted as a fallback.

        Returns the path of the saved copy.
        """
        # str(uuid4())[:8] == uuid4().hex[:8] — first 8 hex chars.
        unique_id = str(uuid.uuid4())[:8]
        if isinstance(file, str):
            # File is already on disk (saved by Gradio); copy it with a
            # unique name so repeated uploads never collide.
            source_path = Path(file)
            dest = self.uploads_dir / f"{source_path.stem}_{unique_id}{source_path.suffix}"
            shutil.copy2(source_path, dest)
            return dest
        # Fallback for file-like objects (shouldn't happen with gr.File).
        src_name = Path(file.name)
        dest = self.uploads_dir / f"{src_name.stem}_{unique_id}{src_name.suffix}"
        dest.write_bytes(file.getbuffer())
        return dest

    def ingest_pdf(self, pdf_path: Path) -> str:
        """Chunk, embed, and upsert *pdf_path*; return a status message.

        Errors are reported as a message string rather than raised so the
        UI can display them directly.
        """
        try:
            chunks = load_and_chunk_pdf(str(pdf_path))
            # Guard the empty case so we don't embed/upsert nothing.
            if not chunks:
                return f"Error ingesting PDF: no text extracted from {pdf_path.name}"
            embeddings = embed_texts(chunks)
            source_id = pdf_path.stem
            # Deterministic uuid5 IDs: re-ingesting the same file overwrites
            # its previous chunks instead of duplicating them.
            ids = [
                str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source_id}:{i}"))
                for i in range(len(chunks))
            ]
            payloads = [{"source": source_id, "text": chunk} for chunk in chunks]
            self.vector_store.upsert(ids, embeddings, payloads)
            return f"Successfully ingested {len(chunks)} chunks from {pdf_path.name}"
        except Exception as e:
            return f"Error ingesting PDF: {str(e)}"

    def query_pdf(self, question: str, top_k: int = 5, source_filter: Optional[str] = None) -> dict:
        """Retrieve relevant chunks and generate an answer with the LLM.

        Returns a dict with keys ``"answer"`` (str), ``"sources"`` (list)
        and ``"contexts"`` (list); errors are folded into the answer text
        so the UI can show them.
        """
        try:
            query_embedding = embed_texts([question])[0]
            search_results = self.vector_store.search(query_embedding, top_k, source_filter)
            if not search_results["contexts"]:
                return {
                    "answer": "No relevant information found in the uploaded PDFs.",
                    "sources": [],
                    "contexts": [],
                }
            # Build a bulleted context block for the prompt.
            context_block = "\n\n".join(f"- {c}" for c in search_results["contexts"])
            user_content = (
                "Use the following context to answer the question.\n\n"
                f"Context:\n{context_block}\n\n"
                f"Question: {question}\n"
                "Answer concisely using the context above."
            )
            response = openai_client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "You answer questions using only the provided context."},
                    {"role": "user", "content": user_content},
                ],
                max_tokens=1024,
                temperature=0.2,
            )
            # The API may return None content; avoid crashing on .strip().
            answer = (response.choices[0].message.content or "").strip()
            return {
                "answer": answer,
                "sources": search_results["sources"],
                "contexts": search_results["contexts"],
            }
        except Exception as e:
            return {
                "answer": f"Error processing query: {str(e)}",
                "sources": [],
                "contexts": [],
            }

    def get_most_recent_pdf(self) -> Optional[str]:
        """Return the stem of the most recently modified uploaded PDF.

        Returns None when no PDFs have been uploaded yet.
        """
        if not self.uploads_dir.exists():
            return None
        pdf_files = list(self.uploads_dir.glob("*.pdf"))
        if not pdf_files:
            return None
        most_recent = max(pdf_files, key=lambda p: p.stat().st_mtime)
        return most_recent.stem
| # Initialize the RAG processor | |
| rag_processor = RAGProcessor() | |
def upload_and_ingest_pdf(file):
    """Save an uploaded PDF and index its contents into the vector store.

    Returns a human-readable status string for the upload-status textbox.
    """
    if file is None:
        return "Please upload a PDF file."
    saved_path = rag_processor.save_uploaded_pdf(file)
    return rag_processor.ingest_pdf(saved_path)
def ask_question(question, top_k, use_recent_pdf):
    """Answer *question* via the RAG pipeline.

    Returns an (answer, sources_text) pair of strings for the two output
    textboxes. When *use_recent_pdf* is truthy, retrieval is restricted to
    the most recently uploaded PDF.
    """
    if not question.strip():
        # Both outputs are gr.Textbox components, so return a string for
        # the sources slot too (the original returned a list here).
        return "Please enter a question.", ""
    # Determine the optional source filter.
    source_filter = None
    if use_recent_pdf:
        recent_pdf = rag_processor.get_most_recent_pdf()
        if not recent_pdf:
            return "No recent PDF found. Please upload a PDF first.", ""
        source_filter = recent_pdf
    result = rag_processor.query_pdf(question, int(top_k), source_filter)
    # Format sources as a bulleted list for display.
    if result["sources"]:
        sources_text = "\n".join(f"• {source}" for source in result["sources"])
    else:
        sources_text = "No sources found"
    return result["answer"], sources_text
| # Create Gradio interface | |
| with gr.Blocks(title="RAG PDF Chat", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# 📄 RAG PDF Chat Application") | |
| gr.Markdown("Upload PDFs and ask questions about their content using AI-powered retrieval.") | |
| with gr.Tab("Upload PDF"): | |
| gr.Markdown("### Upload a PDF Document") | |
| pdf_upload = gr.File( | |
| label="Choose a PDF file", | |
| file_types=[".pdf"], | |
| file_count="single" | |
| ) | |
| upload_btn = gr.Button("Upload & Process PDF", variant="primary") | |
| upload_status = gr.Textbox( | |
| label="Upload Status", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| upload_btn.click( | |
| fn=upload_and_ingest_pdf, | |
| inputs=[pdf_upload], | |
| outputs=[upload_status] | |
| ) | |
| with gr.Tab("Ask Questions"): | |
| gr.Markdown("### Ask Questions About Your PDFs") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| question_input = gr.Textbox( | |
| label="Your Question", | |
| placeholder="What is the main topic of the document?", | |
| lines=3 | |
| ) | |
| with gr.Row(): | |
| top_k_slider = gr.Slider( | |
| minimum=1, | |
| maximum=20, | |
| value=5, | |
| step=1, | |
| label="Number of chunks to retrieve" | |
| ) | |
| use_recent_checkbox = gr.Checkbox( | |
| label="Search only in most recent PDF", | |
| value=True | |
| ) | |
| ask_btn = gr.Button("Ask Question", variant="primary") | |
| with gr.Column(scale=2): | |
| recent_pdf_info = gr.Markdown("") | |
| with gr.Row(): | |
| with gr.Column(): | |
| answer_output = gr.Textbox( | |
| label="Answer", | |
| interactive=False, | |
| lines=8 | |
| ) | |
| with gr.Column(): | |
| sources_output = gr.Textbox( | |
| label="Sources", | |
| interactive=False, | |
| lines=8 | |
| ) | |
| # Update recent PDF info | |
| def update_recent_pdf_info(): | |
| recent_pdf = rag_processor.get_most_recent_pdf() | |
| if recent_pdf: | |
| return f"🔍 **Most recent PDF:** {recent_pdf}" | |
| else: | |
| return "⚠️ **No PDFs uploaded yet.**" | |
| # Update the recent PDF info when the demo loads | |
| demo.load( | |
| fn=update_recent_pdf_info, | |
| outputs=[recent_pdf_info] | |
| ) | |
| ask_btn.click( | |
| fn=ask_question, | |
| inputs=[question_input, top_k_slider, use_recent_checkbox], | |
| outputs=[answer_output, sources_output] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |