# RAGtimeSearch / app.py
# Hugging Face Space by abakerdp — commit 8d6dc42 (verified)
import gradio as gr
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import PyPDF2
import io
import os
from tqdm import tqdm
# --- Model initialization ----------------------------------------------------
# Sentence embedder produces 384-dim vectors (must match the Pinecone index
# dimension below). FLAN-T5 is used for answer generation in search_documents.
embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base")

# --- Pinecone initialization -------------------------------------------------
# BUG FIX: os.getenv() takes the *name* of an environment variable; the
# original passed what looks like the secret key itself (which both leaks the
# credential in source and makes getenv return None). Read the key from the
# PINECONE_API_KEY environment variable / Space secret instead.
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create the index on first run only.
if 'pdf-index' not in pc.list_indexes().names():
    pc.create_index(
        name='pdf-index',
        dimension=384,  # output dimension of 'all-MiniLM-L6-v2'
        metric='cosine',
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )

# Handle to the (possibly just-created) index.
index = pc.Index('pdf-index')
# Function to extract text from the PDF file using PyPDF2
def process_pdf(file):
    """Extract text from one PDF, chunk it, embed each chunk, and upsert to Pinecone.

    Args:
        file: Gradio file object; ``file.name`` is the path of the temp file
            Gradio wrote the upload to.

    Returns:
        A status string reporting how many chunks were indexed successfully
        (consumed by process_multiple_pdfs, which joins these per-file lines).

    BUG FIX: the original returned the raw extracted text immediately after
    reading the PDF, which (a) made the entire embed/upsert pipeline below it
    unreachable dead code and (b) handed the caller page text where it expects
    a status message. The duplicated second extraction pass is removed and the
    indexing pipeline is now actually executed.
    """
    # Gradio passes the upload as a temp file on disk; read it in binary mode.
    with open(file.name, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        # extract_text() can return None for image-only pages — guard with "".
        pdf_content = "".join(page.extract_text() or "" for page in pdf_reader.pages)

    # Split into ~1000-character chunks so each embedding input stays small.
    text_chunks = [pdf_content[i:i + 1000] for i in range(0, len(pdf_content), 1000)]

    # Create embeddings and upload to Pinecone, one chunk at a time.
    processed_chunks = 0
    for i, chunk in enumerate(text_chunks):
        try:
            embedding = embeddings_model.encode(chunk)
            index.upsert(
                vectors=[(
                    f"{file.name}_chunk_{i}",
                    embedding.tolist(),
                    {
                        'file_name': file.name,
                        'chunk_num': i,
                        'text': chunk
                    }
                )]
            )
            processed_chunks += 1
        except Exception as e:
            # Best-effort: log and continue so one bad chunk doesn't abort the file.
            print(f"Error processing chunk {i}: {str(e)}")
    return f"Successfully processed {processed_chunks} chunks from {file.name}"
def process_multiple_pdfs(files):
    """Index every uploaded PDF via process_pdf and join the per-file status lines."""
    return "\n".join(process_pdf(uploaded) for uploaded in files)
def search_documents(query):
    """Answer a question using the top retrieved PDF chunks as context.

    Embeds *query*, fetches the 3 nearest chunks from Pinecone, and prompts
    FLAN-T5 with the retrieved context to generate an answer.

    Args:
        query: The user's natural-language question.

    Returns:
        Tuple of (answer, sources) strings for the two Gradio output boxes.
    """
    # Embed the query with the same model used for the document chunks.
    query_embedding = embeddings_model.encode(query)

    # Retrieve the 3 most similar chunks, with their stored metadata.
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=3,
        include_metadata=True
    )
    matches = results['matches']

    # Guard: with an empty index the original prompted the model with empty
    # context and produced a hallucinated answer — fail clearly instead.
    if not matches:
        return "No relevant documents found. Please upload and process PDFs first.", ""

    # Generate the answer with FLAN-T5 over the retrieved context.
    context = "\n".join(match['metadata']['text'] for match in matches)
    prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
    inputs = tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True)
    # BUG FIX: temperature/top_p are only honored when do_sample=True; under
    # beam search they were silently ignored (and emit warnings), so they are
    # removed rather than accidentally enabling sampling.
    outputs = model.generate(
        **inputs,
        max_length=512,
        num_beams=4
    )
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # One "Source: <file>" line per retrieved chunk.
    sources = [f"Source: {match['metadata']['file_name']}" for match in matches]
    return answer, "\n".join(sources)
# Build the two-tab Gradio UI: one tab indexes PDFs, the other asks questions.
with gr.Blocks() as demo:
    gr.Markdown("# PDF Document Search and Q&A")

    with gr.Tab("Upload Documents"):
        file_output = gr.File(file_count="multiple", label="Upload PDF Files")
        upload_button = gr.Button("Process PDFs")
        upload_output = gr.Textbox(label="Processing Results")

    with gr.Tab("Search and Ask"):
        query_input = gr.Textbox(label="Enter your question")
        search_button = gr.Button("Search")
        answer_output = gr.Textbox(label="Answer")
        sources_output = gr.Textbox(label="Sources")

    # Wire the buttons to their handlers.
    upload_button.click(
        process_multiple_pdfs,
        inputs=[file_output],
        outputs=[upload_output],
    )
    search_button.click(
        search_documents,
        inputs=[query_input],
        outputs=[answer_output, sources_output],
    )

demo.launch()