# NOTE(review): "Spaces: / Sleeping / Sleeping" was HuggingFace Spaces page
# residue from a copy-paste, not code; kept as a comment so the file parses.
| import gradio as gr | |
| from pinecone import Pinecone, ServerlessSpec | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
| import torch | |
| import PyPDF2 | |
| import io | |
| import os | |
| from tqdm import tqdm | |
| # Initialize models | |
| embeddings_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base") | |
| model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base") | |
| # Initialize Pinecone with environment variable | |
| PINECONE_API_KEY = os.getenv('a32ec76b-bb29-447c-8acf-72934513d1cd') | |
| pc = Pinecone(api_key=PINECONE_API_KEY) | |
| # Create index if it doesn't exist | |
| if 'pdf-index' not in pc.list_indexes().names(): | |
| pc.create_index( | |
| name='pdf-index', | |
| dimension=384, # dimension for 'all-MiniLM-L6-v2' | |
| metric='cosine', | |
| spec=ServerlessSpec( | |
| cloud='aws', | |
| region='us-east-1' | |
| ) | |
| ) | |
| # Connect to index | |
| index = pc.Index('pdf-index') | |
| # Function to extract text from the PDF file using PyPDF2 | |
| def process_pdf(file): | |
| # Get the file path from the 'file' attribute (Gradio passes file as a temporary file) | |
| pdf_path = file.name | |
| # Open the PDF file in read-binary mode | |
| with open(pdf_path, 'rb') as f: | |
| # Create a PdfReader object | |
| pdf_reader = PyPDF2.PdfReader(f) | |
| # Initialize an empty string to hold the extracted text | |
| pdf_content = "" | |
| # Loop through all pages in the PDF and extract text | |
| for page_num in range(len(pdf_reader.pages)): | |
| page = pdf_reader.pages[page_num] | |
| pdf_content += page.extract_text() # Extract text from each page | |
| return pdf_content | |
| pdf_file = io.BytesIO(pdf_content) | |
| reader = PyPDF2.PdfReader(pdf_file) | |
| # Extract text from PDF | |
| text_chunks = [] | |
| for page in reader.pages: | |
| text = page.extract_text() | |
| # Split into smaller chunks (roughly 1000 characters each) | |
| chunks = [text[i:i+1000] for i in range(0, len(text), 1000)] | |
| text_chunks.extend(chunks) | |
| # Create embeddings and upload to Pinecone | |
| processed_chunks = 0 | |
| for i, chunk in enumerate(text_chunks): | |
| try: | |
| # Create embedding | |
| embedding = embeddings_model.encode(chunk) | |
| # Upload to Pinecone | |
| index.upsert( | |
| vectors=[( | |
| f"{file.name}_chunk_{i}", | |
| embedding.tolist(), | |
| { | |
| 'file_name': file.name, | |
| 'chunk_num': i, | |
| 'text': chunk | |
| } | |
| )] | |
| ) | |
| processed_chunks += 1 | |
| except Exception as e: | |
| print(f"Error processing chunk {i}: {str(e)}") | |
| return f"Successfully processed {processed_chunks} chunks from {file.name}" | |
| def process_multiple_pdfs(files): | |
| results = [] | |
| for file in files: | |
| result = process_pdf(file) | |
| results.append(result) | |
| return "\n".join(results) | |
| def search_documents(query): | |
| # Create embedding for the query | |
| query_embedding = embeddings_model.encode(query) | |
| # Search Pinecone | |
| results = index.query( | |
| vector=query_embedding.tolist(), | |
| top_k=3, | |
| include_metadata=True | |
| ) | |
| # Generate answer using FLAN-T5 | |
| context = "\n".join([match['metadata']['text'] for match in results['matches']]) | |
| prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:" | |
| inputs = tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True) | |
| outputs = model.generate( | |
| **inputs, | |
| max_length=512, | |
| num_beams=4, | |
| temperature=0.7, | |
| top_p=0.9 | |
| ) | |
| answer = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # Format sources | |
| sources = [f"Source: {match['metadata']['file_name']}" for match in results['matches']] | |
| return answer, "\n".join(sources) | |
| # Create Gradio interface | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# PDF Document Search and Q&A") | |
| with gr.Tab("Upload Documents"): | |
| file_output = gr.File( | |
| file_count="multiple", | |
| label="Upload PDF Files" | |
| ) | |
| upload_button = gr.Button("Process PDFs") | |
| upload_output = gr.Textbox(label="Processing Results") | |
| with gr.Tab("Search and Ask"): | |
| query_input = gr.Textbox(label="Enter your question") | |
| search_button = gr.Button("Search") | |
| answer_output = gr.Textbox(label="Answer") | |
| sources_output = gr.Textbox(label="Sources") | |
| upload_button.click( | |
| process_multiple_pdfs, | |
| inputs=[file_output], | |
| outputs=[upload_output] | |
| ) | |
| search_button.click( | |
| search_documents, | |
| inputs=[query_input], | |
| outputs=[answer_output, sources_output] | |
| ) | |
| demo.launch() |