import io
import os
import tempfile
import textwrap
import time

import gradio as gr
import PyPDF2
from together import Together


def extract_text_from_pdf(pdf_file):
    """Extract the text of every page of a PDF.

    Args:
        pdf_file: Either the raw PDF bytes, or a binary file-like object
            exposing ``read()``. If the object also exposes ``seek()`` it is
            rewound to the start after reading so the caller can reuse it.

    Returns:
        str: The text of all pages, each followed by a blank line. A page
        that yields no text is represented by a
        ``[Page N - No extractable text found]`` placeholder (1-based N).
        If no page yields any visible text, a message starting with
        ``"No text could be extracted"`` is returned instead. On any
        exception a message starting with
        ``"Error extracting text from PDF"`` is returned rather than raised.
    """
    try:
        if hasattr(pdf_file, "read"):
            pdf_content = pdf_file.read()
            if hasattr(pdf_file, "seek"):
                pdf_file.seek(0)
        else:
            # Anything without read() is treated as the raw PDF bytes.
            pdf_content = pdf_file

        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))

        chunks = []
        found_text = False
        for page_number, page in enumerate(pdf_reader.pages, start=1):
            page_text = page.extract_text()
            if page_text:
                chunks.append(page_text)
                found_text = found_text or bool(page_text.strip())
            else:
                chunks.append(f"[Page {page_number} - No extractable text found]")

        # Decide on the real page text only: the placeholders above are never
        # empty, so testing the joined string would hide scanned/image PDFs.
        if not found_text:
            return "No text could be extracted from the PDF. The document may be scanned or image-based."

        return "".join(chunk + "\n\n" for chunk in chunks)
    except Exception as e:
        # Failures are reported in-band as a message; this function does not raise.
        return f"Error extracting text from PDF: {str(e)}"


def format_chat_history(history):
    """Return the chat history as a new list of ``(user, bot)`` tuples.

    Args:
        history: Iterable of two-item ``(user_message, bot_message)`` pairs
            (tuples or lists), or ``None``.

    Returns:
        list: A fresh list holding one ``(user_message, bot_message)`` tuple
        per input pair, in the same order. ``None`` or an empty history
        gives ``[]``.

    Raises:
        ValueError: If an entry does not unpack into exactly two items.
    """
    return [(user_msg, bot_msg) for user_msg, bot_msg in history or []]


# Prefixes of the in-band failure messages that PDF extraction can produce.
# Matching the full prefixes (rather than just "Error" / "No text") keeps
# genuine documents that happen to start with those words usable.
_EXTRACTION_FAILURE_PREFIXES = (
    "Error extracting text from PDF",
    "No text could be extracted",
)
_CHAT_MODEL = "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free"
_MAX_CONTEXT_CHARS = 10000
_TRUNCATION_MARKER = "\n\n[...Content truncated due to length...]\n\n"


def _build_pdf_context(pdf_text, max_chars=_MAX_CONTEXT_CHARS):
    """Return *pdf_text*, keeping only its head and tail if longer than *max_chars*."""
    if len(pdf_text) <= max_chars:
        return pdf_text
    half = max_chars // 2
    return pdf_text[:half] + _TRUNCATION_MARKER + pdf_text[-half:]


def _build_messages(pdf_context, history, user_question):
    """Assemble the system prompt, prior turns and the new question as chat messages."""
    system_message = (
        "You are an intelligent assistant designed to read, understand, and extract information from PDF documents.\n"
        "Based on any question or query the user asks—whether it's about content, summaries, data extraction, definitions, insights, or interpretation—you will\n"
        "analyze the following PDF content and provide an accurate, helpful response grounded in the document. Always respond with clear, concise, and context-aware information.\n"
        "PDF CONTENT:\n"
        f"{pdf_context}\n"
        "Answer the user's questions only based on the PDF content above. If the answer cannot be found in the PDF, politely state that the information is not available in the provided document."
    )
    messages = [{"role": "system", "content": system_message}]
    for past_user, past_bot in history:
        # Skip missing halves of a turn so no empty/None content is sent.
        if past_user:
            messages.append({"role": "user", "content": past_user})
        if past_bot:
            messages.append({"role": "assistant", "content": past_bot})
    messages.append({"role": "user", "content": user_question})
    return messages


def chat_with_pdf(api_key, pdf_text, user_question, history):
    """Answer a question about the extracted PDF text using the Together API.

    Args:
        api_key: Together API key; surrounding whitespace is ignored.
        pdf_text: Text previously extracted from the PDF.
        user_question: The question to ask about the document.
        history: Earlier turns as ``(user, assistant)`` pairs. ``None`` is
            treated as an empty history.

    Returns:
        tuple: ``(display_history, stored_history)``.
        On success both are the history with the new
        ``(user_question, answer)`` pair appended. On a validation failure or
        any exception while calling the API, ``display_history`` has
        ``(user_question, "Error: ...")`` appended while ``stored_history``
        is the history without that pair, so error text is not replayed to
        the model on the next turn.
    """
    history = list(history or [])

    def _reject(reason):
        return history + [(user_question, reason)], history

    if not (api_key or "").strip():
        return _reject("Error: Please enter your Together API key.")

    document = pdf_text or ""
    if not document.strip() or document.startswith(_EXTRACTION_FAILURE_PREFIXES):
        return _reject("Error: Please upload a valid PDF file with extractable text first.")

    if not (user_question or "").strip():
        return _reject("Error: Please enter a question.")

    try:
        client = Together(api_key=api_key.strip())
        messages = _build_messages(_build_pdf_context(document), history, user_question)
        response = client.chat.completions.create(
            model=_CHAT_MODEL,
            messages=messages,
            max_tokens=5000,
            temperature=0.7,
        )
        assistant_response = response.choices[0].message.content
    except Exception as e:
        # Surface the failure in the chat window instead of crashing the UI.
        return _reject(f"Error: {str(e)}")

    new_history = history + [(user_question, assistant_response)]
    return new_history, new_history


def process_pdf(pdf_file, api_key_input):
    """Extract the text of an uploaded PDF and build a status line for the UI.

    Args:
        pdf_file: The uploaded PDF (raw bytes or a file-like object), or
            ``None`` when nothing was uploaded.
        api_key_input: Accepted for the UI wiring; not used by this function.

    Returns:
        tuple: ``(status_message, pdf_text, chat_history)``.
        ``chat_history`` is always ``[]``. ``pdf_text`` is the extracted text
        on success and ``""`` on every failure path, so downstream steps
        never receive an error message as document text.
    """
    if pdf_file is None:
        return "Please upload a PDF file.", "", []

    try:
        # Only a non-empty string name can be turned into a display name;
        # raw bytes have no name at all.
        raw_name = getattr(pdf_file, "name", None)
        if isinstance(raw_name, str) and raw_name:
            file_name = os.path.basename(raw_name)
        else:
            file_name = "Uploaded PDF"

        pdf_text = extract_text_from_pdf(pdf_file)

        if pdf_text.startswith("Error extracting text from PDF"):
            return f"❌ {pdf_text}", "", []

        if not pdf_text.strip():
            # Give blank output a real explanation instead of a bare icon.
            return "⚠️ No text could be extracted from the PDF.", "", []

        if pdf_text.startswith("No text could be extracted"):
            return f"⚠️ {pdf_text}", "", []

        word_count = len(pdf_text.split())
        status_message = f"✅ Successfully processed PDF: {file_name} ({word_count} words extracted)"
        return status_message, pdf_text, []
    except Exception as e:
        return f"❌ Error processing PDF: {str(e)}", "", []


# Keys shorter than this (after trimming whitespace) are reported as too short.
_MIN_API_KEY_LENGTH = 10


def validate_api_key(api_key) -> str:
    """Run a local sanity check on the API key and describe the result.

    Only presence and length are checked; the key is not sent anywhere.

    Args:
        api_key: The key as typed by the user; may be ``None`` or blank.

    Returns:
        str: A status message: "required" when the key is missing or blank,
        "too short" when the trimmed key has fewer than 10 characters,
        otherwise a "looks valid" message.
    """
    key = (api_key or "").strip()
    if not key:
        return "❌ API Key is required"
    if len(key) < _MIN_API_KEY_LENGTH:
        return "❌ API Key appears to be too short"
    return "✓ API Key format looks valid (not verified with server)"


with gr.Blocks(title="ChatPDF with Together AI", theme=gr.themes.Ocean()) as app:
    # Layout: left column holds the API key, upload and status widgets;
    # right column holds the chat window and the question box.
    gr.Markdown("# 📄 ChatPDF with Together AI")
    gr.Markdown("Upload a PDF and chat with it using the Llama-3.3-70B model.")

    # chat_with_pdf returns (display_history, stored_history). The two differ
    # when an error is shown, so the stored history gets its own component
    # instead of sharing the Chatbot with the display value.
    chat_history = gr.State([])

    with gr.Row():
        with gr.Column(scale=1):
            api_key_input = gr.Textbox(
                label="Together API Key",
                placeholder="Enter your Together API key here...",
                type="password",
            )

            api_key_status = gr.Textbox(
                label="API Key Status",
                interactive=False,
            )

            pdf_file = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="binary",
            )

            process_button = gr.Button("Process PDF")

            status_message = gr.Textbox(
                label="Status",
                interactive=False,
            )

            # Hidden holder for the full extracted text used by the chat.
            pdf_text = gr.Textbox(visible=False)

            with gr.Accordion("PDF Content Preview", open=False):
                pdf_preview = gr.Textbox(
                    label="Extracted Text Preview",
                    interactive=False,
                    max_lines=10,
                    show_copy_button=True,
                )

        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                label="Chat with PDF",
                height=500,
                show_copy_button=True,
            )

            question = gr.Textbox(
                label="Ask a question about the PDF",
                placeholder="What is the main topic of this document?",
                lines=2,
            )

            submit_button = gr.Button("Submit Question")

    def update_preview(text):
        """Return at most the first 500 characters of *text* for the preview box.

        Empty text and extraction failure messages are returned unchanged.
        """
        failure_prefixes = (
            "Error extracting text from PDF",
            "No text could be extracted",
        )
        if not text or text.startswith(failure_prefixes):
            return text
        if len(text) <= 500:
            return text
        return text[:500] + "...\n[Text truncated for preview. Full text will be used for chat.]"

    api_key_input.change(
        fn=validate_api_key,
        inputs=[api_key_input],
        outputs=[api_key_status],
    )

    # Processing a PDF clears the chat window (third output of process_pdf),
    # refreshes the preview, and clears the stored history to match.
    process_button.click(
        fn=process_pdf,
        inputs=[pdf_file, api_key_input],
        outputs=[status_message, pdf_text, chatbot],
    ).then(
        fn=update_preview,
        inputs=[pdf_text],
        outputs=[pdf_preview],
    ).then(
        fn=lambda: [],
        outputs=[chat_history],
    )

    # The button click and pressing Enter in the question box behave the
    # same: ask the model, then clear the question box.
    for ask_trigger in (submit_button.click, question.submit):
        ask_trigger(
            fn=chat_with_pdf,
            inputs=[api_key_input, pdf_text, question, chat_history],
            outputs=[chatbot, chat_history],
        ).then(
            fn=lambda: "",
            outputs=question,
        )


if __name__ == "__main__":
    # NOTE(review): share=True asks Gradio for a public share link; confirm
    # that exposing the app outside localhost is intended.
    app.launch(share=True)