"""Gradio app: upload PDFs, embed their text into Pinecone, and answer
questions over them with FLAN-T5 (retrieval-augmented generation)."""

import os

import gradio as gr
import PyPDF2
import torch  # backend required by sentence-transformers / transformers
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Models are loaded once at startup (downloads on first run).
embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base")

# SECURITY FIX: never hard-code credentials.  The original code passed the
# literal API key as the *name* of the environment variable, which both
# leaked the secret in source control and made getenv() return None.
# Export PINECONE_API_KEY before launching the app.
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
pc = Pinecone(api_key=PINECONE_API_KEY)

INDEX_NAME = 'pdf-index'
CHUNK_SIZE = 1000  # characters per chunk sent to the embedder

# Create the index on first run only.
if INDEX_NAME not in pc.list_indexes().names():
    pc.create_index(
        name=INDEX_NAME,
        dimension=384,  # output dimension of 'all-MiniLM-L6-v2'
        metric='cosine',
        spec=ServerlessSpec(cloud='aws', region='us-east-1'),
    )

# Connect to the (new or existing) index.
index = pc.Index(INDEX_NAME)


def _extract_text(pdf_path):
    """Return the concatenated text of every page of the PDF at *pdf_path*.

    ``extract_text()`` may return None for image-only pages; those are
    treated as empty strings instead of raising a TypeError.
    """
    with open(pdf_path, 'rb') as f:
        reader = PyPDF2.PdfReader(f)
        return "".join(page.extract_text() or "" for page in reader.pages)


def _chunk_text(text, size=CHUNK_SIZE):
    """Split *text* into consecutive chunks of at most *size* characters."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def process_pdf(file):
    """Extract, chunk, embed, and upsert one uploaded PDF into Pinecone.

    BUG FIX: the original function returned the raw extracted text halfway
    through, so the chunking/embedding/upsert code below the return was
    dead and no document was ever indexed.  (The dead code also re-read the
    PDF by passing a str to io.BytesIO, which would have raised TypeError.)

    Args:
        file: a Gradio file object; ``file.name`` is the temp-file path.

    Returns:
        A human-readable status string with the number of chunks indexed.
    """
    text_chunks = _chunk_text(_extract_text(file.name))

    processed_chunks = 0
    for i, chunk in enumerate(text_chunks):
        try:
            embedding = embeddings_model.encode(chunk)
            index.upsert(
                vectors=[(
                    f"{file.name}_chunk_{i}",
                    embedding.tolist(),
                    {
                        'file_name': file.name,
                        'chunk_num': i,
                        'text': chunk,
                    },
                )]
            )
            processed_chunks += 1
        except Exception as e:
            # Best-effort: report and skip a failing chunk rather than
            # aborting the whole upload.
            print(f"Error processing chunk {i}: {str(e)}")

    return f"Successfully processed {processed_chunks} chunks from {file.name}"


def process_multiple_pdfs(files):
    """Process each uploaded PDF and return the per-file status lines."""
    return "\n".join(process_pdf(file) for file in files)


def search_documents(query):
    """Retrieve the top matching chunks and generate an answer with FLAN-T5.

    Args:
        query: the user's natural-language question.

    Returns:
        (answer, sources): the generated answer string and a newline-joined
        list of source file names for the retrieved chunks.
    """
    # Embed the query the same way the documents were embedded.
    query_embedding = embeddings_model.encode(query)

    # Retrieve the 3 closest chunks, with their stored text/metadata.
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=3,
        include_metadata=True,
    )

    # Build a grounded prompt from the retrieved chunk texts.
    context = "\n".join(match['metadata']['text'] for match in results['matches'])
    prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"

    inputs = tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True)
    # NOTE(review): temperature/top_p have no effect without do_sample=True;
    # generation is deterministic beam search.  Kept as-is to preserve output.
    outputs = model.generate(
        **inputs,
        max_length=512,
        num_beams=4,
        temperature=0.7,
        top_p=0.9,
    )
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

    sources = [f"Source: {match['metadata']['file_name']}"
               for match in results['matches']]
    return answer, "\n".join(sources)


# --- Gradio UI -----------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# PDF Document Search and Q&A")

    with gr.Tab("Upload Documents"):
        file_output = gr.File(
            file_count="multiple",
            label="Upload PDF Files",
        )
        upload_button = gr.Button("Process PDFs")
        upload_output = gr.Textbox(label="Processing Results")

    with gr.Tab("Search and Ask"):
        query_input = gr.Textbox(label="Enter your question")
        search_button = gr.Button("Search")
        answer_output = gr.Textbox(label="Answer")
        sources_output = gr.Textbox(label="Sources")

    upload_button.click(
        process_multiple_pdfs,
        inputs=[file_output],
        outputs=[upload_output],
    )
    search_button.click(
        search_documents,
        inputs=[query_input],
        outputs=[answer_output, sources_output],
    )

demo.launch()