import os
import gradio as gr
import requests
from duckduckgo_search import DDGS
from typing import List
from pydantic import BaseModel, Field
from tempfile import NamedTemporaryFile
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from llama_parse import LlamaParse
from langchain_core.documents import Document


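# API credentials are read from the environment; both must be set before launching.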
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")
llama_cloud_api_key = os.environ.get("LLAMA_CLOUD_API_KEY")
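
# Fail-fast warning (a suggested addition, not in the original script): the HF
# Inference API rejects unauthenticated requests, so flag a missing token early.
if not huggingface_token:
    print("Warning: HUGGINGFACE_TOKEN is not set; generation requests will fail.")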


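# LlamaParse client used when the "llamaparse" parser is selected in the UI.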
llama_parser = LlamaParse(
    api_key=llama_cloud_api_key,
    result_type="markdown",
    num_workers=4,
    verbose=True,
    language="en",
)


def load_document(file: NamedTemporaryFile, parser: str = "pypdf") -> List[Document]:
    """Load a PDF with the selected parser and return it as a list of Documents."""
    if parser == "pypdf":
        loader = PyPDFLoader(file.name)
        return loader.load_and_split()
    elif parser == "llamaparse":
        try:
            documents = llama_parser.load_data(file.name)
            return [Document(page_content=doc.text, metadata={"source": file.name}) for doc in documents]
        except Exception as e:
            print(f"Error using LlamaParse: {str(e)}")
            print("Falling back to PyPDF parser")
            loader = PyPDFLoader(file.name)
            return loader.load_and_split()
    else:
        raise ValueError("Invalid parser specified. Use 'pypdf' or 'llamaparse'.")


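# Shared sentence-transformers model; the same embeddings must be used for
# both indexing and querying the FAISS store.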
def get_embeddings():
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")


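# Parse uploaded PDFs and add them to a FAISS index persisted under ./faiss_database.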
def update_vectors(files, parser):
    if not files:
        return "Please upload at least one PDF file."

    embed = get_embeddings()
    total_chunks = 0

    all_data = []
    for file in files:
        data = load_document(file, parser)
        all_data.extend(data)
        total_chunks += len(data)

    # Extend the existing index if one is on disk, otherwise build a fresh one.
    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
        database.add_documents(all_data)
    else:
        database = FAISS.from_documents(all_data, embed)

    database.save_local("faiss_database")

    return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files using {parser}."


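# Generate long answers by repeatedly asking the HF Inference API to continue
# from the text produced so far, up to max_chunks continuation rounds.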
def generate_chunked_response(prompt, max_tokens=1000, max_chunks=5):
    API_URL = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
    headers = {"Authorization": f"Bearer {huggingface_token}"}
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_tokens,
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repetition_penalty": 1.1
        }
    }

    full_response = ""
    for _ in range(max_chunks):
        # Feed the accumulated text back in so each round continues the answer
        # instead of regenerating it from the original prompt.
        payload["inputs"] = prompt + full_response
        response = requests.post(API_URL, headers=headers, json=payload)
        if response.status_code != 200:
            break
        result = response.json()
        if not (isinstance(result, list) and result):
            break
        chunk = result[0].get('generated_text', '')
        # The API echoes the input by default; keep only the newly generated text.
        if chunk.startswith(payload["inputs"]):
            chunk = chunk[len(payload["inputs"]):]
        if not chunk:
            break
        full_response += chunk
        if chunk.rstrip().endswith((".", "!", "?")):
            break
    return full_response.strip()


def duckduckgo_search(query):
    with DDGS() as ddgs:
        # Materialize results inside the context manager in case the client
        # returns a lazy iterator.
        results = list(ddgs.text(query, max_results=5))
    return results


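# Pydantic schema for structured citations; not currently wired into the
# response flow elsewhere in this script.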
class CitingSources(BaseModel):
    sources: List[str] = Field(
        ...,
        description="List of sources to cite; each entry should be the URL of the source."
    )


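# Web-search path: retrieve DuckDuckGo snippets and ask the model to write from them.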
def get_response_with_search(query):
    search_results = duckduckgo_search(query)
    context = "\n".join(f"{result['title']}\n{result['body']}\nSource: {result['href']}\n"
                        for result in search_results if 'body' in result)

    prompt = f"""<s>[INST] Using the following context:
{context}
Write a detailed and complete research document that fulfills the following user request: '{query}'
After writing the document, please provide a list of sources used in your response. [/INST]"""

    generated_text = generate_chunked_response(prompt)

    # Strip an echoed prompt, if any, then split the answer from its source list.
    content_start = generated_text.find("[/INST]")
    if content_start != -1:
        generated_text = generated_text[content_start + len("[/INST]"):].strip()

    parts = generated_text.split("Sources:", 1)
    main_content = parts[0].strip()
    sources = parts[1].strip() if len(parts) > 1 else ""

    return main_content, sources


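# PDF path: answer from the locally indexed documents via FAISS retrieval.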
def get_response_from_pdf(query):
    embed = get_embeddings()
    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
    else:
        return "No documents available. Please upload PDF documents to answer questions.", ""

    retriever = database.as_retriever()
    relevant_docs = retriever.get_relevant_documents(query)
    context_str = "\n".join([doc.page_content for doc in relevant_docs])

    prompt = f"""<s>[INST] Using the following context from the PDF documents:
{context_str}
Write a detailed and complete response that answers the following user question: '{query}'
After writing the response, please provide a list of sources used (document names) in your answer. [/INST]"""

    generated_text = generate_chunked_response(prompt)

    content_start = generated_text.find("[/INST]")
    if content_start != -1:
        generated_text = generated_text[content_start + len("[/INST]"):].strip()

    parts = generated_text.split("Sources:", 1)
    main_content = parts[0].strip()
    sources = parts[1].strip() if len(parts) > 1 else ""

    return main_content, sources


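# Route each chat message to web search or PDF retrieval and append the turn
# to the conversation history.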
def chatbot_interface(message, history, use_web_search):
    if use_web_search:
        main_content, sources = get_response_with_search(message)
    else:
        main_content, sources = get_response_from_pdf(message)

    formatted_response = f"{main_content}\n\nSources:\n{sources}"
    # gr.Chatbot expects the full list of (user, assistant) pairs, not a bare
    # string, so append this turn to the history and return it.
    return history + [(message, formatted_response)]


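# Gradio UI: document upload and indexing on top, chat with optional web search below.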
with gr.Blocks() as demo:
    gr.Markdown("# AI-powered Web Search and PDF Chat Assistant")

    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        parser_dropdown = gr.Dropdown(choices=["pypdf", "llamaparse"], label="Select PDF Parser", value="pypdf")
        update_button = gr.Button("Upload Document")

    update_output = gr.Textbox(label="Update Status")
    update_button.click(update_vectors, inputs=[file_input, parser_dropdown], outputs=update_output)

    with gr.Row():
        chatbot = gr.Chatbot(label="Conversation")
        with gr.Column():
            msg = gr.Textbox(label="Ask a question")
            use_web_search = gr.Checkbox(label="Use Web Search", value=False)
            submit = gr.Button("Submit")

    gr.Examples(
        examples=[
            ["What are the latest developments in AI?"],
            ["Tell me about recent updates on GitHub"],
            ["What are the best hotels in Galapagos, Ecuador?"],
            ["Summarize recent advancements in Python programming"],
        ],
        inputs=msg,
    )
    # Submitting via button or Enter appends the new turn, then clears the input box.
    submit.click(chatbot_interface, inputs=[msg, chatbot, use_web_search], outputs=chatbot).then(lambda: "", None, msg)
    msg.submit(chatbot_interface, inputs=[msg, chatbot, use_web_search], outputs=chatbot).then(lambda: "", None, msg)
    gr.Markdown(
        """
        ## How to use
        1. Upload PDF documents using the file input at the top.
        2. Select the PDF parser (pypdf or llamaparse) and click "Upload Document" to update the vector store.
        3. Ask questions in the textbox.
        4. Toggle "Use Web Search" to switch between PDF chat and web search.
        5. Click "Submit" or press Enter to get a response.
        """
    )


if __name__ == "__main__":
    demo.launch(share=True)