Spaces:
Sleeping
Sleeping
File size: 7,072 Bytes
cd7638d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
import gradio as gr
import uuid
import sqlite3
import json
import re
import PyPDF2
import numpy as np
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
from sklearn.metrics.pairwise import cosine_similarity
# Local imports
from database1 import create_db
from first1 import pdf_query
from ans_generator1 import AnswerGenerator
import sqlite3, json
from q_generator1 import QGenerator
from transformers import pipeline
# Initialize models (project-local question/answer generators)
qgen = QGenerator()
ansgen = AnswerGenerator()
# Load FLAN-T5 model
# use_fast=False forces the slow (SentencePiece-based) tokenizer implementation
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base", use_fast=False)
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base")
# text2text-generation pipeline used to answer the generated questions
qa_model = pipeline("text2text-generation", model=model, tokenizer=tokenizer)
# ✅ Upload and process PDF — supports multiple PDF files
def upload_pdf(files):
    """Extract text from each uploaded PDF, chunk it, and store it under a fresh token.

    Args:
        files: list of file objects from the Gradio ``File`` component
            (``file_count="multiple"``).

    Returns:
        A status string with one line per stored file (including its token),
        or an error message.
    """
    try:
        # Robustness: Gradio passes None / [] when nothing is selected.
        if not files:
            return "⚠️ Please upload at least one PDF file."
        messages = []
        for file in files:
            filename = file.name
            token = str(uuid.uuid4())
            pdf_reader = PyPDF2.PdfReader(file)
            # extract_text() may return None for image-only pages
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
            # Fixed-size 500-character chunks for downstream embedding / QG
            chunks = [text[i:i + 500] for i in range(0, len(text), 500)]
            create_db(token, chunks, filename, text)
            # Bug fix: report the actual filename instead of the literal
            # "(unknown)" placeholder (the computed `filename` was unused).
            messages.append(f"✅ Uploaded and stored: {filename} (Token: {token})")
        return "\n".join(messages)
    except Exception as e:
        return f"❌ Error: {str(e)}"
# NOTE(review): the duplicate `qgen = QGenerator()` and
# `qa_model = pipeline("text2text-generation", model="google/flan-t5-base")`
# that used to live here have been removed — both names are already
# initialized above, and the duplicate pipeline call loaded FLAN-T5 a second
# time from the model name instead of reusing the already-loaded
# model/tokenizer objects.
def generate_qa(token):
    """Generate question/answer pairs from the PDF chunks stored under *token*.

    Looks up ``chunk_data`` in ``my_database.db``, runs the question generator
    on each chunk, then answers up to two questions per chunk with the
    module-level FLAN-T5 ``qa_model`` pipeline.

    Args:
        token: the UUID token returned by ``upload_pdf``.

    Returns:
        A string of "Q: ...\\nA: ..." pairs joined by blank lines, or a
        status/error message.
    """
    try:
        if not token:
            return "⚠️ Please provide a token."
        print("📥 Received Token:", token)
        # Load chunk_data using token
        with sqlite3.connect("my_database.db") as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT chunk_data FROM token_data WHERE token_id = ?", (token,))
            row = cursor.fetchone()
        if not row:
            print("❌ No data found for token in DB.")
            return "❌ No data found for this token."
        chunks = json.loads(row[0])
        if not chunks:
            print("⚠️ Chunk data is empty.")
            return "⚠️ No content available in database for this PDF."
        qa_pairs = []
        for i, chunk in enumerate(chunks):
            print(f"\n🔹 Processing chunk {i+1}/{len(chunks)}")
            questions = qgen.generate(chunk)
            print(f"🧠 Questions generated: {questions}")
            if not questions:
                print("⚠️ No questions generated for this chunk.")
                continue
            for question in questions[:2]:  # Max 2 Qs per chunk
                prompt = f"Context: {chunk}\n\nQuestion: {question}\n\nAnswer:"
                print(f"➡️ Prompt:\n{prompt}")
                try:
                    result = qa_model(prompt, max_length=256, do_sample=False)
                    print(f"⬅️ Raw model output: {result}")
                    # Pipelines return a list of dicts; be defensive about shape.
                    # Bug fix: also check the list is non-empty before result[0].
                    if isinstance(result, list) and result and "generated_text" in result[0]:
                        answer = result[0]["generated_text"].strip()
                    elif isinstance(result, dict) and "answer" in result:
                        answer = result["answer"].strip()
                    else:
                        answer = "N/A"
                    print(f"✅ Final Answer: {answer}")
                    qa_pairs.append(f"Q: {question}\nA: {answer}")
                except Exception as e:
                    print(f"❌ QA model failed: {e}")
                    continue
        if not qa_pairs:
            print("⚠️ No Q&A pairs generated.")
            return "⚠️ No Q&A pairs generated."
        print("✅ Final Q&A generated successfully.")
        return "\n\n".join(qa_pairs)
    except Exception as e:
        print(f"🔥 Exception in generate_qa(): {e}")
        return f"❌ Error: {str(e)}"
# ✅ Ask question using token (semantic similarity)
def ask_question(token, question):
    """Answer *question* by returning the most similar stored chunk for *token*.

    Embeds the question and every stored chunk with the ``pdf_query`` sentence
    model and ranks chunks by cosine similarity; the best-matching chunk is
    returned verbatim as the "answer" along with its similarity score.

    Args:
        token: the UUID token returned by ``upload_pdf``.
        question: free-text user question.

    Returns:
        A formatted "Q/A/Score" string, or an error message.
    """
    try:
        with sqlite3.connect("my_database.db") as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT chunk_data FROM token_data WHERE token_id = ?", (token,))
            row = cursor.fetchone()
        if not row:
            return "❌ Token not found."
        chunks = json.loads(row[0])
        processor = pdf_query()
        # Renamed from `model`: the original shadowed the module-level
        # FLAN-T5 `model` with this sentence-embedding model.
        embedder = processor.model
        # Normalize whitespace and drop empty chunks before embedding
        clean_chunks = [re.sub(r'\s+', ' ', c.strip()) for c in chunks if c.strip()]
        if not clean_chunks:
            return "⚠️ No valid content found in PDF."
        chunk_embeddings = embedder.encode(clean_chunks)
        q_embedding = embedder.encode([question])
        scores = cosine_similarity(q_embedding, chunk_embeddings)[0]
        top_index = int(np.argmax(scores))
        top_score = float(scores[top_index])
        best_text = clean_chunks[top_index]
        return f"Q: {question}\nA: {best_text}\nScore: {round(top_score, 3)}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
# ✅ Gradio UI
# Bug fix: the original opened TWO `gr.Blocks()` contexts and rebound `demo`
# at the second one, so the first Blocks (containing the Upload tab) was
# discarded and never launched. All three tabs now live in a single app.
with gr.Blocks(theme="default", title="PDF Q&A Generator") as demo:
    gr.Markdown(
        """
        <div style='text-align: center; padding: 1rem;'>
        <h1 style='color: #3b82f6;'>📄 AI-Powered PDF Q&A System</h1>
        <p style='font-size: 1.1rem;'>Upload your PDFs, generate smart questions, and get intelligent answers.</p>
        </div>
        """
    )
    with gr.Tab("📤 1. Upload PDF"):
        gr.Markdown("### 🗂 Upload a PDF File")
        file = gr.File(label="Upload one or more PDFs", file_types=[".pdf"], file_count="multiple")
        upload_out = gr.Textbox(label="Upload Result", interactive=False)
        file.change(fn=upload_pdf, inputs=file, outputs=upload_out)
    with gr.Tab("🧠 2. Generate Questions & Answers"):
        gr.Markdown("### 🤖 Generate Questions and Answers from Uploaded PDF")
        # Bug fix: generate_qa() looks rows up by token_id, not by filename,
        # so this input must be the token shown by the upload step.
        qa_token = gr.Textbox(label="🔑 Enter Token ID", placeholder="e.g., 123e4567-e89b-12d3-a456...")
        output_box = gr.Textbox(label="📝 Generated Q&A", lines=15, interactive=False)
        gr.Button("🚀 Generate Q&A").click(fn=generate_qa, inputs=qa_token, outputs=output_box)
    with gr.Tab("❓ 3. Ask a Question"):
        gr.Markdown("### 💬 Ask a question based on uploaded PDF")
        token_box = gr.Textbox(label="Token ID", placeholder="e.g., 123e4567-e89b-12d3-a456...")
        question_box = gr.Textbox(label="Type your question", placeholder="What is the main topic discussed?")
        answer_result = gr.Textbox(label="Answer Output", lines=6, interactive=False)
        gr.Button("🎯 Get Answer").click(fn=ask_question, inputs=[token_box, question_box], outputs=answer_result)

if __name__ == "__main__":
    # Bind on all interfaces for containerized (Spaces) deployment.
    demo.launch(server_name="0.0.0.0", server_port=7860)
|