# PDF-Processor / app.py
import os
import pandas as pd
from pdf2image import convert_from_path
import pytesseract
import tempfile
import io
import gradio as gr
import google.generativeai as genai
from typing import List, Tuple
import time
import csv
# Configure Gemini API
def configure_gemini_api(api_key: str):
    """Register the user's key with the google.generativeai SDK.

    Returns a short confirmation string suitable for display in the UI.
    """
    confirmation = "βœ… API Key configured successfully!"
    genai.configure(api_key=api_key)
    return confirmation
def extract_text_from_pdf(pdf_file_path: str) -> str:
    """OCR every page of the PDF at *pdf_file_path* and return the combined text.

    On failure this returns a human-readable string beginning with "Error"
    instead of raising — callers check for that prefix.
    """
    try:
        # Render each PDF page to an image, then OCR each one.
        page_images = convert_from_path(pdf_file_path)
        page_texts = [pytesseract.image_to_string(img) for img in page_images]
        return "".join(text + "\n" for text in page_texts)
    except Exception as exc:
        return f"Error extracting text: {str(exc)}"
def chunk_text(text: str, chunk_size: int = 500) -> List[str]:  # Changed default to 500 for more chunks/MCQs
    """Split *text* into chunks of at most *chunk_size* whitespace-delimited words.

    Returns an empty list when *text* contains no words.
    """
    words = text.split()
    return [
        ' '.join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]
def _first_successful_response(prompt: str) -> str:
    """Try each known Gemini model in order; return the first non-empty response text.

    Returns "" when every model fails. Accessing ``response.text`` can raise
    (e.g. when the response was blocked), so the access happens inside the
    per-model try block — a failing model never leaks a half-usable response
    object to the caller.
    """
    models_to_try = [
        'gemini-2.0-flash-exp',
        'gemini-1.5-flash',
        'gemini-1.5-pro'
    ]
    for model_name in models_to_try:
        try:
            print(f"Trying model: {model_name}")
            model = genai.GenerativeModel(model_name)
            text = model.generate_content(prompt).text
            if text:
                print(f"βœ… Successfully used model: {model_name}")
                return text
        except Exception as e:
            print(f"❌ Error with {model_name}: {e}")
    return ""


def _parse_mcq_line(line: str) -> List[str]:
    """Parse one candidate response line into [Question, A, B, C, D, Answer].

    Returns [] for header echoes, malformed lines, or lines whose correct
    answer is not one of A/B/C/D.
    """
    # Skip header lines the model sometimes emits despite the prompt rules.
    if ('Question' in line and 'OptionA' in line and 'OptionB' in line) or line.startswith('Question,'):
        print(f"❌ Skipped header line: {line[:50]}...")
        return []
    # A valid row needs at least 5 separating commas (6 fields).
    if not line or line.count(',') < 5:
        print(f"❌ Skipped invalid line (comma count: {line.count(',')}): {line[:50]}...")
        return []
    try:
        # Proper CSV parsing handles quoted fields containing commas.
        parts = next(csv.reader([line]))
        print(f"Parsed parts: {len(parts)} fields")
        cleaned_parts = [part.strip() for part in parts[:6]]
    except csv.Error as e:
        # Fallback: naive split (won't honor quoted commas, but salvages
        # lines the csv module rejects outright).
        print(f"❌ CSV parsing error: {e}")
        parts = line.split(',')
        cleaned_parts = [part.strip().strip('"') for part in parts[:6]]
    if len(parts) < 6 or not parts[0].strip() or parts[0].lower().startswith('question'):
        print(f"❌ Invalid parts count or empty question. Parts: {len(parts)}, First part: '{parts[0] if parts else 'N/A'}'")
        return []
    if cleaned_parts[5].upper() not in ('A', 'B', 'C', 'D'):
        print(f"❌ Invalid answer format: {cleaned_parts[5]}")
        return []
    return cleaned_parts


def generate_mcqs_from_chunk(chunk: str, api_key: str, chunk_number: int = 1, mcqs_per_chunk: int = 20) -> List[List[str]]:
    """Generate up to *mcqs_per_chunk* MCQs from a text chunk using the Gemini API.

    Returns a list of 6-field rows: [Question, OptionA, OptionB, OptionC,
    OptionD, CorrectAnswer]. Returns an empty list when every model fails or
    no line of the response parses as a valid MCQ.
    """
    print(f"\n=== PROCESSING CHUNK {chunk_number} ===")
    print(f"Chunk length: {len(chunk)} characters")
    print(f"Chunk preview: {chunk[:200]}...")
    prompt = f"""
Generate exactly {mcqs_per_chunk} multiple choice questions from the following text.
Each question must have:
- A clear, specific question
- 4 options labeled A, B, C, D
- One correct answer (A, B, C, or D)
IMPORTANT: Do NOT include any headers or column names in your response.
Format each question as: Question,OptionA,OptionB,OptionC,OptionD,CorrectAnswer
Rules:
- Start directly with the first question, no headers
- Use commas only as field separators
- If any field contains a comma, wrap it in double quotes
- Each question should be on a new line
- Make questions specific and clear based on the text content
- Ensure all 4 options are plausible but only one is correct
- The correct answer should be A, B, C, or D only
Text to analyze:
{chunk}
"""
    # Configure API
    genai.configure(api_key=api_key)
    mcq_data: List[List[str]] = []
    output = _first_successful_response(prompt).strip()
    if output:
        print(f"\n--- RAW AI RESPONSE FOR CHUNK {chunk_number} ---")
        print(output)
        print("--- END RAW RESPONSE ---\n")
        lines = [line.strip() for line in output.splitlines() if line.strip()]
        print(f"Total non-empty lines in response: {len(lines)}")
        for idx, line in enumerate(lines):
            print(f"Processing line {idx + 1}: {line[:100]}...")
            parsed = _parse_mcq_line(line)
            if parsed:
                mcq_data.append(parsed)
                print(f"βœ… Added MCQ: {parsed[0][:50]}... (Answer: {parsed[5]})")
    else:
        print(f"❌ No response received for chunk {chunk_number}")
    print(f"Generated {len(mcq_data)} MCQs from chunk {chunk_number}")
    return mcq_data
def process_pdf_to_mcqs(pdf_file, api_key: str, chunk_size: int = 500, mcqs_per_chunk: int = 20, progress=gr.Progress()) -> Tuple[str, str]:  # Added mcqs_per_chunk param, default 20
    """Main pipeline: OCR the PDF, chunk the text, generate MCQs, write Excel.

    Returns (status_message, path_to_xlsx). The path is None on any failure;
    the status message always starts with ❌ in that case.
    """
    if not api_key:
        return "❌ Please provide your Gemini API key", None
    if not pdf_file:
        return "❌ Please upload a PDF file", None
    try:
        # Extract text from PDF via OCR.
        progress(0.1, desc="Extracting text from PDF...")
        extracted_text = extract_text_from_pdf(pdf_file.name)
        # extract_text_from_pdf signals failure with an "Error..." string.
        if extracted_text.startswith("Error"):
            return extracted_text, None
        # Chunk the text so each API call stays within prompt limits.
        progress(0.2, desc="Chunking text...")
        chunks = chunk_text(extracted_text, chunk_size)
        if not chunks:
            return "❌ No text could be extracted from the PDF", None
        # Generate MCQs from each chunk.
        all_mcq_data = []
        total_chunks = len(chunks)
        for i, chunk in enumerate(chunks):
            progress((0.2 + (i / total_chunks) * 0.7), desc=f"Processing chunk {i+1}/{total_chunks}...")
            chunk_mcqs = generate_mcqs_from_chunk(chunk, api_key, i + 1, mcqs_per_chunk)
            all_mcq_data.extend(chunk_mcqs)
            # Throttle between API calls to respect rate limits; the wait is
            # skipped after the final chunk (no further call follows).
            if i < total_chunks - 1:
                time.sleep(0.5)
        progress(0.95, desc="Creating Excel file...")
        if not all_mcq_data:
            return "❌ No MCQs could be generated from the PDF content", None
        # Drop duplicate questions (case-insensitive), keeping first occurrence.
        seen_questions = set()
        unique_mcq_data = []
        for mcq in all_mcq_data:
            question_text = mcq[0].lower().strip()
            if question_text not in seen_questions:
                seen_questions.add(question_text)
                unique_mcq_data.append(mcq)
        df = pd.DataFrame(unique_mcq_data, columns=['Question', 'OptionA', 'OptionB', 'OptionC', 'OptionD', 'CorrectAnswer'])
        # delete=False: the file must survive this function so Gradio can
        # serve it for download. Closed immediately so pandas can reopen it.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx', mode='wb')
        temp_file.close()
        with pd.ExcelWriter(temp_file.name, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='MCQs')
        progress(1.0, desc="Complete!")
        success_message = f"βœ… Successfully generated {len(unique_mcq_data)} unique MCQs from {total_chunks} text chunks ({mcqs_per_chunk} targeted per chunk)!"
        return success_message, temp_file.name
    except Exception as e:
        return f"❌ Error processing PDF: {str(e)}", None
# Create Gradio interface
def create_interface():
    """Build and return the Gradio Blocks UI for the PDF→MCQ generator."""
    with gr.Blocks(title="PDF to MCQ Generator", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
# πŸ“š PDF to MCQ Generator
Upload a PDF document and generate multiple choice questions automatically using Google's Gemini AI.
## How to use:
1. Get your Gemini API key from [Google AI Studio](https://aistudio.google.com/app/apikey)
2. Enter your API key below
3. Upload your PDF file
4. Adjust chunk size if needed (smaller = more chunks/MCQs, but slower; default 500 for max MCQs)
5. Adjust MCQs per chunk (higher = more MCQs per chunk, but may hit API limits; default 20 for max)
6. Click "Generate MCQs" and wait for processing
7. Download the generated Excel file with your MCQs
"""
        )
        with gr.Row():
            with gr.Column(scale=2):
                api_key_input = gr.Textbox(
                    label="πŸ”‘ Gemini API Key",
                    placeholder="Enter your Gemini API key here...",
                    type="password"
                )
                pdf_input = gr.File(
                    label="πŸ“„ Upload PDF File",
                    file_types=[".pdf"]
                )
                chunk_size_input = gr.Slider(
                    minimum=300,  # Lowered min to allow even smaller chunks
                    maximum=3000,
                    value=500,  # Changed default to 500 for more chunks
                    step=100,
                    label="πŸ“ Chunk Size (words per processing batch)"
                )
                mcqs_per_chunk_input = gr.Slider(
                    minimum=5,
                    maximum=50,  # Increased max for more MCQs per chunk
                    value=20,  # New slider for MCQs per chunk, default 20
                    step=5,
                    label="πŸ”’ MCQs per Chunk (higher = more MCQs, but may increase failures)"
                )
                generate_btn = gr.Button(
                    "πŸš€ Generate MCQs",
                    variant="primary",
                    size="lg"
                )
            with gr.Column(scale=1):
                status_output = gr.Textbox(
                    label="πŸ“Š Status",
                    placeholder="Status updates will appear here...",
                    lines=10
                )
                # Hidden until a file is actually produced (see .then below).
                download_file = gr.File(
                    label="⬇️ Download MCQs Excel File",
                    visible=False
                )
        # Event handlers: run the pipeline, then reveal the download component
        # only when a file path was produced. (The previous conditional lambda
        # was redundant — visible=bool(file_path) covers both branches.)
        generate_btn.click(
            fn=process_pdf_to_mcqs,
            inputs=[pdf_input, api_key_input, chunk_size_input, mcqs_per_chunk_input],
            outputs=[status_output, download_file],
            show_progress=True
        ).then(
            fn=lambda file_path: gr.update(visible=bool(file_path)),
            inputs=[download_file],
            outputs=[download_file]
        )
        gr.Markdown(
            """
## πŸ“‹ Features:
- **OCR Text Extraction**: Converts PDF pages to images and extracts text
- **Smart Chunking**: Breaks large documents into manageable pieces (smaller chunks = more MCQs)
- **Configurable MCQs per Chunk**: Now adjustable up to 50 for maximum generation
- **Multiple AI Models**: Automatically tries different Gemini models for best results
- **Excel Output**: Download MCQs in a formatted Excel file
- **Progress Tracking**: Real-time updates on processing status
## ⚠️ Notes:
- To maximize MCQs: Use small chunk size (e.g., 300-500) and high MCQs per chunk (e.g., 20-50)
- Processing time depends on PDF length and settings (more MCQs = longer time)
- Large PDFs are processed in chunks to avoid timeouts
- Make sure your PDF contains readable text (not just images)
- API key is not stored and only used for your session
- Reduced delay between API calls for faster processing, but monitor for rate limits
"""
        )
    return demo
# Launch the app
if __name__ == "__main__":
    # Bind to all interfaces so the app is reachable inside a container/Space.
    create_interface().launch(server_name="0.0.0.0", server_port=7860)