Spaces:
Sleeping
Sleeping
| import faiss | |
| from annoy import AnnoyIndex | |
| # Build Annoy index | |
| def create_annoy_index(embeddings, num_trees=10): | |
| index = AnnoyIndex(embeddings.shape[1], 'angular') | |
| for i, emb in enumerate(embeddings): | |
| index.add_item(i, emb) | |
| index.build(num_trees) | |
| return index | |
| # Query Annoy index | |
| def retrieve_relevant_text(query, annoy_index, texts, top_k=3): | |
| query_embedding = embedder.encode([query])[0] | |
| indices = annoy_index.get_nns_by_vector(query_embedding, top_k) | |
| return [texts[i] for i in indices] | |
| # Function to create an Annoy index from the embeddings | |
| def create_annoy_index(embeddings, num_trees=10): | |
| index = AnnoyIndex(embeddings.shape[1], 'angular') # Using angular distance metric | |
| for i, emb in enumerate(embeddings): | |
| index.add_item(i, emb) | |
| index.build(num_trees) | |
| return index | |
| # Function to retrieve the most relevant text using Annoy | |
| def retrieve_relevant_text(query, annoy_index, texts, top_k=3): | |
| query_embedding = embedder.encode([query], convert_to_tensor=True) | |
| indices = annoy_index.get_nns_by_vector(query_embedding[0], top_k) | |
| return [texts[i] for i in indices] | |
| import os | |
| import fitz # PyMuPDF for PDF extraction | |
| import faiss # for efficient vector search | |
| import numpy as np | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline, RagTokenizer, RagRetriever, RagSequenceForGeneration | |
| from sentence_transformers import SentenceTransformer | |
| import streamlit as st | |
| # Load the pre-trained RAG model and tokenizer | |
| model_name = "facebook/rag-token-nq" # You can change this to a different open-source RAG model if needed | |
| tokenizer = RagTokenizer.from_pretrained(model_name) | |
| model = RagSequenceForGeneration.from_pretrained(model_name) | |
| # Initialize sentence transformer model for embeddings | |
| embedder = SentenceTransformer('all-MiniLM-L6-v2') | |
| # Function to extract text from a PDF file | |
| def extract_text_from_pdf(pdf_file): | |
| pdf_document = fitz.open(pdf_file) | |
| text = "" | |
| for page_num in range(pdf_document.page_count): | |
| page = pdf_document.load_page(page_num) | |
| text += page.get_text("text") | |
| return text | |
| # Function to create embeddings from text data | |
| def create_embeddings(text_data): | |
| embeddings = embedder.encode(text_data, convert_to_tensor=True) | |
| return embeddings | |
| # Function to create a FAISS index from the embeddings | |
| def create_faiss_index(embeddings): | |
| index = faiss.IndexFlatL2(embeddings.shape[1]) # Using L2 distance metric | |
| index.add(embeddings) | |
| return index | |
| # Function to retrieve the most relevant text using FAISS | |
| def retrieve_relevant_text(query, faiss_index, texts, top_k=3): | |
| query_embedding = embedder.encode([query], convert_to_tensor=True) | |
| D, I = faiss_index.search(query_embedding, top_k) # D: distances, I: indices | |
| return [texts[i] for i in I[0]] | |
| # Main function to answer questions based on uploaded PDF | |
| def get_answer_from_pdf(pdf_file, query): | |
| # Step 1: Extract text from the uploaded PDF file | |
| document_text = extract_text_from_pdf(pdf_file) | |
| # Step 2: Split the document text into chunks (optional but recommended for large docs) | |
| text_chunks = document_text.split('\n') | |
| # Step 3: Create embeddings for each chunk of text | |
| embeddings = create_embeddings(text_chunks) | |
| # Step 4: Create a FAISS index for efficient retrieval | |
| faiss_index = create_faiss_index(embeddings) | |
| # Step 5: Retrieve relevant text from the document based on the query | |
| relevant_texts = retrieve_relevant_text(query, faiss_index, text_chunks) | |
| # Step 6: Combine the relevant text and pass it to the RAG model | |
| context = " ".join(relevant_texts) | |
| inputs = tokenizer([query], return_tensors="pt", padding=True, truncation=True) | |
| context_inputs = tokenizer(context, return_tensors="pt", padding=True, truncation=True) | |
| # Generate the answer | |
| outputs = model.generate(input_ids=inputs["input_ids"], context_input_ids=context_inputs["input_ids"]) | |
| answer = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| return answer | |
| # Streamlit UI | |
| def main(): | |
| st.title("RAG Application - PDF Q&A") | |
| # Upload PDF file | |
| uploaded_file = st.file_uploader("Upload a PDF Document", type="pdf") | |
| if uploaded_file is not None: | |
| # Ask a question from the uploaded PDF | |
| question = st.text_input("Ask a question based on the document:") | |
| if question: | |
| # Get the answer from the PDF document | |
| answer = get_answer_from_pdf(uploaded_file, question) | |
| # Display the answer | |
| st.write("Answer: ", answer) | |
| if __name__ == "__main__": | |
| main() | |