| | import faiss |
| | import numpy as np |
| | from sentence_transformers import SentenceTransformer |
| | import fitz |
| | from docx import Document |
| | from pptx import Presentation |
| | import gradio as gr |
| |
|
| | |
# Sentence-embedding model used to encode both documents and queries
# (shared so document and query vectors live in the same space).
retrieve = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
| |
|
| | |
# Module-level in-memory store: raw document texts, one embedding per
# document, and the FAISS index over those embeddings (None until the
# first document is added by add_to_index).
documents = []
doc_embeddings = []
index = None
| |
|
| | |
def process_pdf(file_path):
    """Extract the concatenated text of every page of a PDF.

    Returns the extracted text, or an "Error reading PDF: ..." string on
    failure (the module's error-string convention — callers look for the
    substring "Error").
    """
    try:
        # Context manager guarantees the underlying file handle is closed
        # even if extraction raises; the original never closed the document.
        with fitz.open(file_path) as doc:
            return "".join(page.get_text() for page in doc)
    except Exception as e:
        return f"Error reading PDF: {e}"
| |
|
| | |
def process_docx(file_path):
    """Extract text from a .docx file, one line per paragraph.

    Returns an "Error reading DOCX: ..." string on failure, matching the
    module's error-string convention.
    """
    try:
        paragraphs = Document(file_path).paragraphs
        return "\n".join(para.text for para in paragraphs)
    except Exception as e:
        return f"Error reading DOCX: {e}"
| |
|
| | |
def process_pptx(file_path):
    """Extract text from every text-bearing shape on every slide.

    Each shape's text is followed by a newline (same output as the
    original += accumulation). Returns an "Error reading PPTX: ..."
    string on failure.
    """
    try:
        deck = Presentation(file_path)
        chunks = [
            shape.text + "\n"
            for slide in deck.slides
            for shape in slide.shapes
            if hasattr(shape, "text")
        ]
        return "".join(chunks)
    except Exception as e:
        return f"Error reading PPTX: {e}"
| |
|
| | |
def add_to_index(text):
    """Embed *text* and add it to the global FAISS index.

    Whitespace-only text is ignored. The original rebuilt the whole index
    from scratch on every call (re-adding all n embeddings per insert,
    O(n^2) overall); here the index is created once, sized from the first
    embedding, and each later call adds only the new vector.
    """
    global index, doc_embeddings, documents
    if not text.strip():
        return

    # FAISS expects a 2-D float32 array of row vectors.
    embedding = np.asarray(retrieve.encode([text]), dtype=np.float32)
    doc_embeddings.append(embedding[0])
    documents.append(text)

    if index is None:
        index = faiss.IndexFlatL2(embedding.shape[1])
    index.add(embedding)
| |
|
| | |
def load_document(file_path):
    """Dispatch *file_path* to the matching extractor and index its text.

    Extension matching is case-insensitive (the original rejected .PDF,
    .Docx, etc. as unsupported). Returns a status string: the success
    message, "Unsupported file format", or the extractor's
    "Error reading ..." message.
    """
    lowered = file_path.lower()
    if lowered.endswith('.pdf'):
        text = process_pdf(file_path)
    elif lowered.endswith('.docx'):
        text = process_docx(file_path)
    elif lowered.endswith('.pptx'):
        text = process_pptx(file_path)
    else:
        return "Unsupported file format"

    # NOTE(review): substring check misclassifies any document whose body
    # contains the word "Error"; kept for interface compatibility.
    if isinstance(text, str) and "Error" not in text:
        add_to_index(text)
        return "Document loaded and indexed successfully."
    return text
| |
|
| | |
def retrieve_docs(query, k=2):
    """Return up to *k* indexed documents most similar to *query* (L2 distance).

    Clamps k to the number of indexed documents and skips the -1
    placeholder indices FAISS returns when fewer than k neighbors exist
    (the original let -1 alias to documents[-1], duplicating the last
    document in the results).
    """
    if index is None:
        return ["Index not initialized. Please upload and process a document first."]
    query_embedding = np.asarray(retrieve.encode([query]), dtype=np.float32)
    k = min(k, len(documents))
    distances, indices = index.search(query_embedding, k)
    # FAISS pads missing neighbors with -1; drop them.
    return [documents[i] for i in indices[0] if i >= 0]
| |
|
| | |
def generate_response(retrieved_docs):
    """Produce a placeholder "generated" answer from retrieved documents.

    Joins the documents with single spaces and echoes the first 500
    characters of that context (a real system would call an LLM here).
    Returns a fallback message when nothing was retrieved.
    """
    if not retrieved_docs:
        return "No relevant documents found to generate a response."
    context = " ".join(retrieved_docs)
    return f"Generated response based on retrieved docs:\n\n{context[:500]}..."
| |
|
| | |
def rag_application(query, file):
    """Gradio entry point: optionally index an uploaded file, then answer *query*.

    Returns a (retrieved-docs text, generated response) pair. Any indexing
    failure — including "Unsupported file format", which the original's
    '"Error" in load_result' check let slip through to retrieval — is
    surfaced in the first output and short-circuits the pipeline.
    """
    if file:
        load_result = load_document(file.name)
        # Anything other than the success message is a failure to report.
        if load_result != "Document loaded and indexed successfully.":
            return load_result, ""

    retrieved_docs = retrieve_docs(query)
    # Show a 200-character excerpt of each retrieved document.
    docs_output = "\n".join(f"- {doc[:200]}..." for doc in retrieved_docs)

    response = generate_response(retrieved_docs)
    return docs_output, response
| |
|
| | |
# Gradio UI wiring: a free-text query plus a single file upload in,
# two text panes out (retrieved excerpts, generated response).
iface = gr.Interface(
    fn=rag_application,
    inputs=[
        "text",  # user query
        "file"   # uploaded PDF/DOCX/PPTX document
    ],
    outputs=[
        "text",  # retrieved document excerpts
        "text"   # generated response
    ],
    title="RAG Application with Single File Upload",
    description="Upload a PDF, DOCX, or PPTX file and ask questions. The RAG application retrieves relevant documents and generates a response."
)

# Start the local Gradio server (blocks until shut down).
iface.launch()
| |
|