Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import torch | |
# Get Hugging Face token from environment (set in Spaces Secrets)
# NOTE(review): may be None if the secret is unset — from_pretrained then
# only works for public repos; confirm the secret exists in this Space.
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")
# Model repository details
HF_USERNAME = "khysam2022"
HF_MODEL_NAME = "RAG-DSE-PAST-PAPER-2012-ICT"
# Full Hub repo id ("user/model") passed to from_pretrained().
MODEL_REPO = f"{HF_USERNAME}/{HF_MODEL_NAME}"
# Global variables for model and vectorstore
# model/tokenizer: lazily populated by load_model(); vectorstore: FAISS
# index built by process_pdf(). All three are module-level caches shared
# by the Gradio callbacks below.
model = None
tokenizer = None
vectorstore = None
def load_model():
    """Lazily load and cache the tokenizer and causal LM from the Hub.

    Populates the module-level ``model`` and ``tokenizer`` caches on first
    call; subsequent calls are no-ops that return the cached pair.

    Returns:
        tuple: ``(model, tokenizer)``.
    """
    global model, tokenizer
    # Single lazy-init branch: if either cache slot is empty, (re)load both.
    if model is None or tokenizer is None:
        print(f"Loading model {MODEL_REPO}...")
        tokenizer = AutoTokenizer.from_pretrained(MODEL_REPO, token=HF_TOKEN)
        # fp16 weights + device_map="auto" so accelerate places layers on
        # whatever hardware the Space provides.
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_REPO,
            token=HF_TOKEN,
            torch_dtype=torch.float16,
            device_map="auto",
        )
    return model, tokenizer
def process_pdf(pdf_file):
    """Build the RAG vectorstore from an uploaded PDF.

    Loads the PDF, splits it into overlapping chunks, embeds the chunks with
    a MiniLM sentence-transformer, and stores them in the module-level FAISS
    ``vectorstore`` used by ``generate_answer``.

    Args:
        pdf_file: The value delivered by the ``gr.File`` component. Depending
            on Gradio version/config this is a filepath ``str``, a tempfile
            wrapper exposing ``.name``, or raw ``bytes``.

    Returns:
        str: A human-readable success or error message for the status box.
    """
    global vectorstore
    if pdf_file is None:
        return "β Error processing PDF: no file was uploaded."
    try:
        # BUG FIX: the original did `open(path,'wb').write(pdf_file)`, which
        # assumes raw bytes; gr.File passes a filepath (or tempfile object)
        # by default, so every upload raised TypeError. Handle all shapes.
        wrote_copy = False
        if isinstance(pdf_file, (bytes, bytearray)):
            pdf_path = "uploaded_document.pdf"
            with open(pdf_path, "wb") as f:
                f.write(pdf_file)
            wrote_copy = True
        elif isinstance(pdf_file, str):
            pdf_path = pdf_file  # already a path on disk
        elif hasattr(pdf_file, "name"):
            pdf_path = pdf_file.name  # tempfile-style wrapper
        else:
            return "β Error processing PDF: unsupported upload type."
        # Load and split the PDF
        loader = PyPDFLoader(pdf_path)
        documents = loader.load()
        # Split documents into overlapping chunks; the overlap keeps
        # sentences that straddle a boundary retrievable from either side.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", " ", ""],
        )
        chunks = text_splitter.split_documents(documents)
        # Create embeddings and vectorstore
        embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        vectorstore = FAISS.from_documents(chunks, embeddings)
        # Only delete the copy we made ourselves — never Gradio's temp file.
        if wrote_copy and os.path.exists(pdf_path):
            os.remove(pdf_path)
        return f"β PDF processed successfully! Found {len(chunks)} text chunks."
    except Exception as e:
        return f"β Error processing PDF: {str(e)}"
def generate_answer(query):
    """Answer a question via RAG over the processed document.

    Retrieves the top-3 similar chunks from the FAISS index, stuffs them
    into a grounded prompt, and samples a completion from the model.

    Args:
        query: The user's question about the uploaded document.

    Returns:
        str: The generated answer, or an error/status message.
    """
    # Guard: lazily initialize the model caches on first use.
    if model is None or tokenizer is None:
        try:
            load_model()
        except Exception as e:
            return f"β Error loading model: {str(e)}"
    # Guard: RAG is meaningless until a document has been indexed.
    if vectorstore is None:
        return "Please upload a PDF document first."
    try:
        # Retrieve the three most similar chunks and join them for context.
        hits = vectorstore.similarity_search(query, k=3)
        joined_context = "\n\n".join(doc.page_content for doc in hits)
        # Grounded prompt instructing the model to stick to the context.
        prompt = f"""
You are a helpful assistant analyzing a document. Using only the provided context, answer the question.
Context:
{joined_context}
Question: {query}
Answer:
"""
        # Tokenize on the model's device, then sample a completion.
        encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
        generated = model.generate(
            **encoded,
            max_new_tokens=300,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
        )
        # Slice off the prompt tokens so only the new completion is decoded.
        prompt_len = encoded.input_ids.shape[1]
        return tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)
    except Exception as e:
        return f"β Error generating answer: {str(e)}"
def direct_query(message):
    """Chat with the model directly, without document retrieval.

    Args:
        message: The user's free-form chat message.

    Returns:
        str: The model's sampled reply, or an error message.
    """
    # Guard: lazily initialize the model caches on first use.
    if model is None or tokenizer is None:
        try:
            load_model()
        except Exception as e:
            return f"β Error loading model: {str(e)}"
    try:
        # Minimal two-turn chat template.
        prompt = f"User: {message}\nAssistant: "
        # Tokenize on the model's device, then sample a completion.
        encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
        generated = model.generate(
            **encoded,
            max_new_tokens=300,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
        )
        # Decode only the tokens produced after the prompt.
        prompt_len = encoded.input_ids.shape[1]
        return tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)
    except Exception as e:
        return f"β Error generating answer: {str(e)}"
# Define Gradio interface: two tabs sharing the lazily-loaded model —
# "RAG Query" (upload + ask about a PDF) and "Direct Chat" (plain chat).
with gr.Blocks() as demo:
    gr.Markdown("# RAG-DSE-PAST-PAPER-2012-ICT")
    gr.Markdown("This demo allows you to chat with the model and ask questions about uploaded documents.")
    with gr.Tab("RAG Query"):
        with gr.Row():
            # Left column: upload and index a PDF (process_pdf builds the
            # module-level FAISS vectorstore and reports status here).
            with gr.Column():
                pdf_upload = gr.File(label="Upload PDF Document")
                process_button = gr.Button("Process Document")
                status_text = gr.Textbox(label="Processing Status", interactive=False)
                process_button.click(process_pdf, inputs=[pdf_upload], outputs=[status_text])
            # Right column: ask questions answered from the indexed PDF.
            with gr.Column():
                query_input = gr.Textbox(label="Your Question", placeholder="Ask a question about the document...")
                query_button = gr.Button("Ask Question")
                answer_output = gr.Textbox(label="Answer", interactive=False)
                query_button.click(generate_answer, inputs=[query_input], outputs=[answer_output])
    with gr.Tab("Direct Chat"):
        # No retrieval here — direct_query prompts the model as-is.
        chat_input = gr.Textbox(label="Your Message", placeholder="Type your message here...")
        chat_button = gr.Button("Send Message")
        chat_output = gr.Textbox(label="Response", interactive=False)
        chat_button.click(direct_query, inputs=[chat_input], outputs=[chat_output])
# Launch the app (blocking call; Spaces serves this as the entry point).
demo.launch()