| import gradio as gr |
| from langchain_community.document_loaders import PyPDFLoader |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain_community.vectorstores import FAISS |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain.chains import RetrievalQA |
| from langchain.prompts import PromptTemplate |
| from langchain_community.chat_models import ChatOpenAI |
| import os |
| from tempfile import NamedTemporaryFile |
|
|
| |
def load_api_key():
    """Return the OpenRouter API key from the environment.

    Returns:
        str: the value of the ``OPENROUTER_API_KEY`` environment variable.

    Raises:
        ValueError: if the variable is not set.
    """
    # EAFP: one dict lookup instead of a membership test followed by getenv
    # (the original checked `in os.environ` and then looked the key up again).
    try:
        return os.environ["OPENROUTER_API_KEY"]
    except KeyError:
        raise ValueError("API key not found in environment variables") from None
| OPENROUTER_API_KEY=load_api_key() |
| |
def process_pdfs(files):
    """Build a FAISS retriever over the text of the uploaded PDF files.

    Args:
        files: iterable of uploaded file objects; each must expose a ``.name``
            attribute holding a readable path (Gradio upload objects do).

    Returns:
        A retriever over the chunked documents (top-3 similarity search).

    Raises:
        ValueError: if no text could be extracted from any of the files.
    """
    # The splitter is stateless configuration — build it once, not per file
    # as the original did inside the loop.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )

    all_chunks = []
    for file_info in files:
        # Copy the upload to a named temp file so PyPDFLoader gets a stable path.
        with NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            with open(file_info.name, "rb") as f:
                tmp_file.write(f.read())
            tmp_file_path = tmp_file.name

        try:
            loader = PyPDFLoader(tmp_file_path)
            pages = loader.load()
            all_chunks.extend(text_splitter.split_documents(pages))
        finally:
            # Always remove the temp copy, even if PDF parsing fails.
            os.unlink(tmp_file_path)

    if not all_chunks:
        raise ValueError("No content was loaded from the files")

    # Multilingual embedding model so Arabic and English questions both work.
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
    )
    vectorstore = FAISS.from_documents(all_chunks, embeddings)
    return vectorstore.as_retriever(search_kwargs={"k": 3})
|
|
| |
def load_model():
    """Construct the OpenRouter-backed chat model used to answer questions."""
    settings = {
        "base_url": "https://openrouter.ai/api/v1",
        "api_key": OPENROUTER_API_KEY,
        "model": "mistralai/mistral-7b-instruct",
        "temperature": 0.3,
    }
    return ChatOpenAI(**settings)
|
|
| |
# Prompt for the "stuff" QA chain: {context} receives the retrieved document
# chunks and {question} the user's query; the model is asked to answer in the
# language of the question (Arabic or English).
template = """
You are an intelligent assistant specialized in document analysis.
Use the following information from PDF files to answer the question:
answer dependent on the language question arabic or english
{context}
Question: {question}
Answer (in detail and in clear language):
"""


# Reusable PromptTemplate wrapping the raw template string above.
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template=template
)
|
|
| |
# Module-level QA chain; stays None until PDFs are uploaded (see handle_upload).
qa_chain = None
# NOTE(review): this global is never read — respond() shadows it with its
# chat_history parameter. Candidate for removal; kept as-is here.
chat_history = []
|
|
| |
def respond(message, chat_history):
    """Answer *message* via the global QA chain and extend the chat history.

    Returns the history with one (message, reply) pair appended. Never
    raises: errors are reported back to the user as the reply text.
    """
    global qa_chain

    if qa_chain is None:
        reply = "Please upload PDF files first"
    else:
        try:
            reply = qa_chain({"query": message})["result"]
        except Exception as e:
            reply = f"An error occurred: {str(e)}"

    return chat_history + [(message, reply)]
|
|
| |
def handle_upload(files):
    """Build the global QA chain from newly uploaded PDF files.

    Args:
        files: uploaded file objects from the Gradio Files component.

    Returns:
        str: a human-readable success or error status for the UI; never
        raises — any failure is folded into the returned message.
    """
    global qa_chain
    try:
        retriever = process_pdfs(files)
        llm = load_model()

        # Reuse the module-level `prompt` instead of rebuilding an identical
        # PromptTemplate inline (the original duplicated the construction).
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            retriever=retriever,
            chain_type="stuff",
            chain_type_kwargs={"prompt": prompt},
            return_source_documents=False
        )
        return "Files uploaded successfully!"
    except Exception as e:
        return f"Error uploading files: {str(e)}"
|
|
| |
# --- Gradio UI -------------------------------------------------------------
# Layout: chat display on top, question input + Send below, then the PDF
# upload row with its status box, and finally a Clear button.
with gr.Blocks(title="Smart Document Assistant", theme=gr.themes.Default()) as demo:
    gr.Markdown("# 📄 Smart Document Assistant")
    gr.Markdown("Upload PDF files then start chatting")

    # Conversation display.
    chatbot = gr.Chatbot(height=500)

    # Question input plus send button on a single row.
    with gr.Row():
        msg = gr.Textbox(
            placeholder="Type your question here...",
            show_label=False,
            scale=4
        )
        submit_btn = gr.Button("Send", scale=1)

    # PDF upload controls with a read-only status box.
    with gr.Row():
        file_upload = gr.Files(
            label="Upload PDF files",
            file_types=[".pdf"],
            file_count="multiple"
        )
        upload_status = gr.Textbox(label="Upload Status", interactive=False)

    clear_btn = gr.Button("Clear Chat")

    # Rebuild the QA chain whenever the upload selection changes.
    file_upload.change(
        handle_upload,
        inputs=file_upload,
        outputs=upload_status
    )

    # Send button: answer, then clear the input textbox.
    submit_btn.click(
        respond,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(
        lambda: "",
        None,
        [msg]
    )

    # Pressing Enter in the textbox behaves the same as clicking Send.
    msg.submit(
        respond,
        inputs=[msg, chatbot],
        outputs=[chatbot]
    ).then(
        lambda: "",
        None,
        [msg]
    )

    # Reset the conversation display.
    clear_btn.click(
        lambda: [],
        None,
        [chatbot]
    )
|
|
if __name__ == "__main__":
    # Bind on all interfaces so the app is reachable from outside a container.
    demo.launch(server_name="0.0.0.0", server_port=7860)