Spaces:
Sleeping
Sleeping
| """ | |
| Gradio interface for the RAG-based response system. | |
| This will be deployed to your Hugging Face Space. | |
| """ | |
# Standard library
import json
import os
import shutil
import tempfile
from pathlib import Path

# Third-party
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForSeq2SeqLM, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
| # Define paths | |
| DOCUMENTS_DIR = Path("documents") | |
| DOCUMENTS_DIR.mkdir(exist_ok=True) | |
| VECTOR_DB_PATH = Path("vector_db") | |
| # Initialize models | |
| model_name = "sentence-transformers/all-MiniLM-L6-v2" | |
| embeddings = HuggingFaceEmbeddings(model_name=model_name) | |
| # Initialize vector store | |
| if VECTOR_DB_PATH.exists(): | |
| try: | |
| vector_db = FAISS.load_local(str(VECTOR_DB_PATH), embeddings) | |
| print("Loaded existing vector database.") | |
| except Exception as e: | |
| print(f"Error loading vector database: {e}") | |
| vector_db = None | |
| else: | |
| vector_db = None | |
| # Define possible intents | |
| POSSIBLE_INTENTS = [ | |
| "product_inquiry", | |
| "technical_support", | |
| "billing_question", | |
| "general_information", | |
| "appointment_scheduling", | |
| "complaint", | |
| "other" | |
| ] | |
| # Default responses for when RAG fails or no documents are available | |
| DEFAULT_RESPONSES = { | |
| "product_inquiry": "Thank you for your interest in our products. I'll gather the information and have someone contact you with more details.", | |
| "technical_support": "I understand you're experiencing technical issues. Let me find the right person to help you resolve this.", | |
| "billing_question": "Thank you for your billing inquiry. I'll connect you with our billing department for assistance.", | |
| "general_information": "Thank you for reaching out. I'll make sure you get the information you need.", | |
| "appointment_scheduling": "I'd be happy to help schedule an appointment for you. Let me find the next available slot.", | |
| "complaint": "I'm sorry to hear about your experience. Your feedback is important to us, and we'll address this promptly.", | |
| "other": "Thank you for your call. I'll make sure your message gets to the right person." | |
| } | |
| def load_pdf(file): | |
| """Load a PDF document into the vector store""" | |
| global vector_db | |
| try: | |
| # Save the uploaded file temporarily | |
| temp_dir = tempfile.mkdtemp() | |
| temp_path = os.path.join(temp_dir, file.name) | |
| with open(temp_path, "wb") as f: | |
| f.write(file.read()) | |
| # Save a copy to the documents directory | |
| target_path = os.path.join(DOCUMENTS_DIR, file.name) | |
| with open(target_path, "wb") as f: | |
| with open(temp_path, "rb") as src: | |
| f.write(src.read()) | |
| # Load and process the PDF | |
| loader = PyPDFLoader(temp_path) | |
| documents = loader.load() | |
| # Split the documents | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=1000, | |
| chunk_overlap=200 | |
| ) | |
| chunks = text_splitter.split_documents(documents) | |
| # Update or create vector store | |
| if vector_db is None: | |
| vector_db = FAISS.from_documents(chunks, embeddings) | |
| vector_db.save_local(str(VECTOR_DB_PATH)) | |
| else: | |
| vector_db.add_documents(chunks) | |
| vector_db.save_local(str(VECTOR_DB_PATH)) | |
| return f"Successfully added {file.name} to the knowledge base with {len(chunks)} chunks." | |
| except Exception as e: | |
| return f"Error processing PDF: {str(e)}" | |
| def load_website(url): | |
| """Load a website into the vector store""" | |
| global vector_db | |
| try: | |
| # Load content from website | |
| loader = WebBaseLoader(url) | |
| documents = loader.load() | |
| # Save the URL reference | |
| with open(os.path.join(DOCUMENTS_DIR, "websites.txt"), "a") as f: | |
| f.write(f"{url}\n") | |
| # Split the documents | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=1000, | |
| chunk_overlap=200 | |
| ) | |
| chunks = text_splitter.split_documents(documents) | |
| # Update or create vector store | |
| if vector_db is None: | |
| vector_db = FAISS.from_documents(chunks, embeddings) | |
| vector_db.save_local(str(VECTOR_DB_PATH)) | |
| else: | |
| vector_db.add_documents(chunks) | |
| vector_db.save_local(str(VECTOR_DB_PATH)) | |
| return f"Successfully added {url} to the knowledge base with {len(chunks)} chunks." | |
| except Exception as e: | |
| return f"Error processing website: {str(e)}" | |
| def generate_response(query, intent=None): | |
| """Generate a response based on the query and intent""" | |
| global vector_db | |
| # If no intent provided, use a default | |
| if not intent or intent not in POSSIBLE_INTENTS: | |
| intent = "general_information" | |
| # If no vector database, return default response | |
| if vector_db is None: | |
| return DEFAULT_RESPONSES.get(intent, DEFAULT_RESPONSES["other"]) | |
| try: | |
| # Query the vector database | |
| retrieved_docs = vector_db.similarity_search(query, k=3) | |
| if not retrieved_docs: | |
| return DEFAULT_RESPONSES.get(intent, DEFAULT_RESPONSES["other"]) | |
| # Combine retrieved document chunks | |
| context = "\n\n".join([doc.page_content for doc in retrieved_docs]) | |
| # Create prompt for the model | |
| prompt = f""" | |
| Based on the following context information and the user's query, | |
| provide a helpful, professional, and concise response. | |
| Context: | |
| {context} | |
| User query: {query} | |
| Intent: {intent} | |
| Response: | |
| """ | |
| # Generate response using a pre-trained model | |
| tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base") | |
| model = AutoModel.from_pretrained("google/flan-t5-base") | |
| inputs = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True) | |
| outputs = model.generate( | |
| inputs["input_ids"], | |
| max_length=200, | |
| num_beams=4, | |
| early_stopping=True | |
| ) | |
| response_text = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # If model output is empty or too short, use retrieved text with a prefix | |
| if len(response_text) < 20: | |
| response_text = f"Based on the information I have: {context[:200]}..." | |
| return response_text | |
| except Exception as e: | |
| print(f"Error generating response: {e}") | |
| return DEFAULT_RESPONSES.get(intent, DEFAULT_RESPONSES["other"]) | |
| def list_documents(): | |
| """List all documents in the knowledge base""" | |
| files = list(DOCUMENTS_DIR.glob("*.pdf")) | |
| # Add websites if available | |
| website_file = DOCUMENTS_DIR / "websites.txt" | |
| websites = [] | |
| if website_file.exists(): | |
| with open(website_file, "r") as f: | |
| websites = [line.strip() for line in f if line.strip()] | |
| return { | |
| "PDFs": [f.name for f in files], | |
| "Websites": websites | |
| } | |
| # Create the Gradio interface | |
| with gr.Blocks(title="Call Assistant RAG System") as demo: | |
| gr.Markdown("# Call Assistant RAG System") | |
| gr.Markdown("Add documents and websites to the knowledge base, and test the response generation.") | |
| with gr.Tab("Add Knowledge"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| pdf_input = gr.File(label="Upload PDF Document") | |
| pdf_button = gr.Button("Add PDF to Knowledge Base") | |
| pdf_output = gr.Textbox(label="PDF Upload Status") | |
| pdf_button.click( | |
| load_pdf, | |
| inputs=[pdf_input], | |
| outputs=[pdf_output] | |
| ) | |
| with gr.Column(): | |
| url_input = gr.Textbox(label="Website URL") | |
| url_button = gr.Button("Add Website to Knowledge Base") | |
| url_output = gr.Textbox(label="Website Status") | |
| url_button.click( | |
| load_website, | |
| inputs=[url_input], | |
| outputs=[url_output] | |
| ) | |
| with gr.Tab("Knowledge Base"): | |
| list_button = gr.Button("List Documents in Knowledge Base") | |
| knowledge_output = gr.JSON(label="Documents") | |
| list_button.click( | |
| list_documents, | |
| inputs=[], | |
| outputs=[knowledge_output] | |
| ) | |
| with gr.Tab("Test Response Generation"): | |
| with gr.Row(): | |
| query_input = gr.Textbox(label="Query / Transcription") | |
| intent_input = gr.Dropdown( | |
| choices=POSSIBLE_INTENTS, | |
| label="Intent", | |
| value="general_information" | |
| ) | |
| test_button = gr.Button("Generate Response") | |
| response_output = gr.Textbox(label="Generated Response") | |
| test_button.click( | |
| generate_response, | |
| inputs=[query_input, intent_input], | |
| outputs=[response_output] | |
| ) | |
| with gr.Tab("API"): | |
| gr.Markdown(""" | |
| ## API Documentation | |
| This Gradio app exposes an API endpoint that can be used by your Twilio application. | |
| ### Endpoint: `/api/predict` | |
| **Method:** POST | |
| **Input:** | |
| ```json | |
| { | |
| "data": [ | |
| "user's transcribed query", | |
| "detected intent" | |
| ] | |
| } | |
| ``` | |
| **Output:** | |
| ```json | |
| { | |
| "data": [ | |
| "generated response" | |
| ], | |
| "duration": 1.2345 | |
| } | |
| ``` | |
| **Example Python code to call the API:** | |
| ```python | |
| import requests | |
| response = requests.post( | |
| "https://huggingface.co/spaces/iajitpanday/vBot-1.5/api/predict", | |
| json={"data": ["How do I reset my password?", "technical_support"]} | |
| ) | |
| result = response.json() | |
| generated_response = result["data"][0] | |
| ``` | |
| """) | |
| # Define API function for Gradio Spaces | |
| def api_response(query, intent=None): | |
| """API function for Gradio Spaces""" | |
| response = generate_response(query, intent) | |
| return [response] | |
| # Define the API | |
| demo.queue() | |
| demo.launch() |