# NOTE: the three lines that preceded this file ("Spaces:" / "Sleeping" / "Sleeping")
# were Hugging Face Spaces UI status text captured by the scrape, not source code.
| # app.py - Main application file for Hugging Face Space | |
| import gradio as gr | |
| import os | |
| from typing import List, Tuple | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| import PyPDF2 | |
| import docx | |
| import openai | |
| import tempfile | |
class RAGChatbot:
    """RAG chatbot: Sentence-Transformer embeddings + FAISS retrieval + OpenAI generation.

    Uploaded documents are chunked, embedded with all-MiniLM-L6-v2, and stored
    in an in-memory exact-L2 FAISS index. At query time the top-k chunks are
    retrieved and passed as context to the OpenAI chat completions API.
    """

    def __init__(self):
        """Initialize the embedding model, FAISS index, chunk store and splitter."""
        # Initialize embedding model (downloads on first run; takes a few seconds).
        print("Loading embedding model...")
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        # Initialize vector store (FAISS, exact L2 search — no training needed).
        self.dimension = 384  # Embedding dimension produced by all-MiniLM-L6-v2
        self.index = faiss.IndexFlatL2(self.dimension)
        # Parallel store: self.documents[i] is the text behind FAISS row i.
        self.documents = []
        # Text splitter for chunking documents (character-based, 10% overlap).
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            length_function=len,
            separators=["\n\n", "\n", " ", ""],
        )
        # Get OpenAI API key from Hugging Face Secrets. May be None here;
        # generate_response() re-checks it before every API call.
        openai.api_key = os.getenv("OPENAI_API_KEY")

    def read_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file; returns "" (or partial text) on error."""
        text = ""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # extract_text() can return None for image-only pages.
                for page in pdf_reader.pages:
                    text += page.extract_text() or ""
        except Exception as e:
            print(f"Error reading PDF: {e}")
        return text

    def read_docx(self, file_path: str) -> str:
        """Extract paragraph text from a DOCX file; returns "" on error."""
        text = ""
        try:
            doc = docx.Document(file_path)
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
        except Exception as e:
            print(f"Error reading DOCX: {e}")
        return text

    def read_txt(self, file_path: str) -> str:
        """Read a UTF-8 text file; returns "" on any error."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except Exception as e:
            print(f"Error reading TXT: {e}")
            return ""

    def process_documents(self, files) -> str:
        """Process uploaded documents and add their chunks to the vector store.

        Args:
            files: iterable of Gradio file objects, each exposing a ``.name``
                path attribute (assumed — standard gr.File behavior).

        Returns:
            A human-readable status string for the UI.
        """
        if not files:
            return "No files uploaded."
        all_text = ""
        processed_files = 0
        for file in files:
            try:
                file_path = file.name
                # Dispatch on extension; unsupported types are skipped silently.
                if file_path.endswith('.pdf'):
                    text = self.read_pdf(file_path)
                elif file_path.endswith('.docx'):
                    text = self.read_docx(file_path)
                elif file_path.endswith('.txt'):
                    text = self.read_txt(file_path)
                else:
                    continue
                all_text += text + "\n"
                processed_files += 1
            except Exception as e:
                print(f"Error processing file {file.name}: {e}")
                continue
        if not all_text.strip():
            return "No text content found in the uploaded documents."
        # Split text into chunks
        chunks = self.text_splitter.split_text(all_text)
        if not chunks:
            return "No text chunks created from documents."
        # Create embeddings for all chunks in one batched encode() call.
        embeddings = self.embedding_model.encode(chunks)
        # FIX: add all vectors in a single batched call (the original looped
        # one row at a time through Python), and force float32 — the dtype
        # FAISS requires for input matrices.
        self.index.add(np.asarray(embeddings, dtype=np.float32))
        self.documents.extend(chunks)
        return f"✅ Successfully processed {len(chunks)} text chunks from {processed_files} documents."

    def retrieve_relevant_chunks(self, query: str, k: int = 3) -> List[str]:
        """Return up to ``k`` stored chunks most similar to ``query`` (L2 distance)."""
        if len(self.documents) == 0:
            return []
        # Create embedding for the query; FAISS expects a float32 matrix.
        query_embedding = np.asarray(
            self.embedding_model.encode([query]), dtype=np.float32
        )
        # Never ask for more neighbors than vectors stored.
        distances, indices = self.index.search(query_embedding, min(k, len(self.documents)))
        # FIX: FAISS pads missing results with -1; the original `idx < len(...)`
        # check let -1 through, which silently wrapped to the LAST chunk.
        return [self.documents[idx] for idx in indices[0] if 0 <= idx < len(self.documents)]

    def generate_response(self, query: str, context: List[str]) -> str:
        """Generate an answer with the OpenAI API using the retrieved context.

        Returns a user-facing error string (never raises) when the API key is
        missing, no context is available, or the API call fails.
        """
        if not openai.api_key:
            return "⚠️ OpenAI API key not configured. Please add OPENAI_API_KEY to the Space secrets."
        if not context:
            return "No relevant documents found. Please upload documents first."
        # Prepare context string; cap at 3 chunks to stay within token limits.
        context_str = "\n\n".join(context[:3])
        # Create prompt
        prompt = f"""You are a helpful assistant. Use the following context to answer the question.
If you cannot answer the question based on the context, say so.
Context:
{context_str}
Question: {query}
Answer:"""
        try:
            # Call OpenAI API (v1 client-style API).
            from openai import OpenAI
            client = OpenAI(api_key=openai.api_key)
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=500,
                temperature=0.7
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating response: {str(e)}"

    def chat(self, message: str, history: List[Tuple[str, str]]) -> Tuple[str, List[Tuple[str, str]]]:
        """Handle one chat turn: retrieve context, generate, append to history.

        Returns ("", updated_history) — the empty string clears the Gradio
        textbox; history is mutated in place and also returned.
        """
        if not message.strip():
            return "", history
        # Retrieve relevant chunks, then generate a grounded answer.
        relevant_chunks = self.retrieve_relevant_chunks(message)
        response = self.generate_response(message, relevant_chunks)
        # Update history
        history.append((message, response))
        return "", history
# --- Module-level wiring: build the chatbot and the Gradio UI ----------------
# Initialize the chatbot. This runs at import time, before the UI is served,
# because RAGChatbot.__init__ loads the embedding model (slow on cold start).
print("Initializing RAG Chatbot...")
chatbot = RAGChatbot()

# Create Gradio interface
with gr.Blocks(title="RAG Chatbot", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# 🤖 RAG Chatbot with Gradio
Upload your documents and start asking questions! The chatbot will retrieve relevant information from your documents to answer your queries.
**Supported formats:** PDF, DOCX, TXT | **Powered by:** Sentence-BERT + FAISS + OpenAI
        """
    )
    with gr.Tab("📄 Upload Documents"):
        file_upload = gr.File(
            label="Upload Documents",
            file_count="multiple",
            file_types=[".pdf", ".docx", ".txt"]
        )
        upload_button = gr.Button("Process Documents", variant="primary")
        upload_status = gr.Textbox(label="Status", interactive=False)
        # Button -> document ingestion; the returned status string is shown
        # in the read-only status textbox.
        upload_button.click(
            fn=chatbot.process_documents,
            inputs=[file_upload],
            outputs=[upload_status]
        )
    with gr.Tab("💬 Chat"):
        chatbot_interface = gr.Chatbot(
            label="Chat History",
            height=400,
            bubble_full_width=False
        )
        with gr.Row():
            msg = gr.Textbox(
                label="Your Question",
                placeholder="Ask a question about your documents...",
                lines=1,
                scale=4
            )
            submit_btn = gr.Button("Send", variant="primary", scale=1)
        clear = gr.Button("🗑️ Clear Chat")
        # Handle message submission. Enter key and Send button call the same
        # handler; chat() returns ("", new_history), so the textbox clears
        # after each turn.
        msg.submit(
            fn=chatbot.chat,
            inputs=[msg, chatbot_interface],
            outputs=[msg, chatbot_interface]
        )
        submit_btn.click(
            fn=chatbot.chat,
            inputs=[msg, chatbot_interface],
            outputs=[msg, chatbot_interface]
        )
        # Clear chat history (resets both the textbox and the Chatbot widget).
        clear.click(
            lambda: (None, []),
            outputs=[msg, chatbot_interface]
        )
    with gr.Tab("⚙️ Settings"):
        gr.Markdown(
            """
### Configuration
| Component | Details |
|-----------|---------|
| **Embedding Model** | all-MiniLM-L6-v2 |
| **Vector Store** | FAISS |
| **LLM** | OpenAI GPT-3.5-turbo |
| **Chunk Size** | 500 characters |
| **Chunk Overlap** | 50 characters |
| **Retrieved Chunks** | 3 |
### About
This RAG chatbot uses retrieval-augmented generation to answer questions based on your uploaded documents.
            """
        )

# Launch the app (Spaces runs app.py directly, so no __main__ guard is used here).
demo.launch()
# -----------------------------------
# requirements.txt - Dependencies file
# NOTE(review): the two triple-quoted strings below are inert module-level
# string literals used purely as inline documentation — Python evaluates and
# discards them. Copy their contents into real requirements.txt / README.md
# files in the Space repository; they have no effect while left here.
"""
gradio==4.19.2
sentence-transformers==2.3.1
faiss-cpu==1.7.4
langchain==0.1.6
openai==1.12.0
PyPDF2==3.0.1
python-docx==1.1.0
numpy==1.24.3
"""
# -----------------------------------
# README.md - Documentation for your Space
"""
---
title: RAG Chatbot
emoji: 🤖
colorFrom: blue
colorTo: green
sdk: gradio
sdk_version: 4.19.2
app_file: app.py
pinned: false
license: mit
---
# RAG Chatbot
A Retrieval-Augmented Generation chatbot built with Gradio, FAISS, and OpenAI.
## Features
- Upload PDF, DOCX, and TXT documents
- Semantic search using Sentence-BERT embeddings
- Context-aware responses using OpenAI GPT-3.5
- Interactive chat interface
## Setup
Add your OpenAI API key to the Space secrets:
1. Go to Settings → Variables and secrets
2. Add a new secret named `OPENAI_API_KEY`
3. Paste your OpenAI API key
## Usage
1. Upload your documents in the Upload Documents tab
2. Wait for processing confirmation
3. Go to the Chat tab and start asking questions!
Check out the [GitHub repository](https://github.com/yourusername/rag-chatbot) for more details.
"""