# Page header captured with the file (not part of the program):
# SimranShaikh's picture
# commit 52bcdc8 (verified)
# app.py - Main Hugging Face Spaces Application
import gradio as gr
import PyPDF2
import pdfplumber
import fitz # PyMuPDF
import pandas as pd
import re
import logging
import os
import tempfile
from typing import Dict, List, Tuple, Optional
from pathlib import Path
import json
# Set up logging: configure the root logger at INFO level, then create the
# module-level logger used by the functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PDFProcessorError(Exception):
    """Custom exception for PDF processing errors.

    Raised by the extraction helpers in this file when a backend cannot
    read a document; it adds no behaviour on top of ``Exception``.
    """
def enhanced_pdf_processor(file_path: str) -> Dict:
    """
    Run the PDF extraction pipeline on ``file_path`` and report the outcome.

    Backends are tried in a fixed order (PyMuPDF, pdfplumber, PyPDF2). The
    first backend whose text is longer than 10 characters after stripping
    surrounding whitespace wins and the remaining backends are skipped.

    Args:
        file_path: Path of the PDF file to process.

    Returns:
        A dict with the keys ``text``, ``tables``, ``metadata``,
        ``extraction_method``, ``success``, ``error``, ``file_info`` and
        ``summary``. Failures are reported through ``success``/``error``
        rather than by raising.
    """
    results = {
        'text': '',
        'tables': [],
        'metadata': {},
        'extraction_method': 'unknown',
        'success': False,
        'error': None,
        'file_info': {},
        'summary': ''
    }
    # Backends listed here return a (text, extra) pair; the value names the
    # result key that receives ``extra``. Any other backend returns text only.
    secondary_slot = {'PyMuPDF': 'metadata', 'pdfplumber': 'tables'}

    try:
        # Validate file
        if not os.path.exists(file_path):
            results['error'] = f"File does not exist: {file_path}"
            return results

        # Get file info
        results['file_info'] = get_file_info(file_path)

        # Try different extraction methods, in priority order
        backends = [
            ('PyMuPDF', extract_with_pymupdf),
            ('pdfplumber', extract_with_pdfplumber),
            ('PyPDF2', extract_with_pypdf2)
        ]
        for backend_name, backend in backends:
            try:
                logger.info(f"Trying extraction method: {backend_name}")
                outcome = backend(file_path)
                if backend_name in secondary_slot:
                    extracted, extra = outcome
                else:
                    extracted, extra = outcome, None

                # Too little text counts as a miss: fall through to the next backend.
                if not extracted or len(extracted.strip()) <= 10:
                    continue

                results['text'] = extracted
                if backend_name in secondary_slot:
                    results[secondary_slot[backend_name]] = extra
                results['extraction_method'] = backend_name
                results['success'] = True
                break
            except Exception as e:
                logger.warning(f"{backend_name} failed: {str(e)}")
                continue

        # Generate summary if successful
        if results['success']:
            results['summary'] = generate_document_summary(results['text'])
        else:
            results['error'] = "All extraction methods failed"
    except Exception as e:
        results['error'] = f"Processing error: {str(e)}"
        logger.error(f"PDF processing error: {e}")
    return results
def extract_with_pypdf2(file_path: str) -> str:
    """Extract text using PyPDF2.

    Each page that yields text is emitted as a ``--- Page N ---`` section;
    pages that fail to extract are logged and skipped.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The concatenated page text after ``clean_text``.

    Raises:
        PDFProcessorError: If the file cannot be opened or parsed, or if it
            is encrypted and cannot be unlocked with an empty password.
    """
    page_chunks = []
    try:
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            if reader.is_encrypted:
                # decrypt() signals a wrong password through a falsy return
                # value rather than an exception, so both must be checked.
                try:
                    unlocked = reader.decrypt("")
                except Exception as exc:
                    raise PDFProcessorError("PDF is encrypted") from exc
                if not unlocked:
                    raise PDFProcessorError("PDF is encrypted")

            for page_num, page in enumerate(reader.pages, start=1):
                try:
                    page_text = page.extract_text()
                except Exception as e:
                    # Best effort: one bad page must not lose the rest.
                    logger.warning(f"Failed to extract page {page_num}: {e}")
                    continue
                if page_text:
                    page_chunks.append(f"\n--- Page {page_num} ---\n{page_text}\n")
        return clean_text("".join(page_chunks))
    except Exception as e:
        raise PDFProcessorError(f"PyPDF2 extraction failed: {e}") from e
def extract_with_pdfplumber(file_path: str) -> Tuple[str, List[Dict]]:
    """Extract text and tables using pdfplumber.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        A ``(text, tables)`` pair. ``text`` is the cleaned concatenation of
        per-page sections; ``tables`` holds one dict per table with more than
        one row, with the keys ``page``, ``table_number``, ``data`` and
        ``text_representation``.

    Raises:
        PDFProcessorError: If the document cannot be opened or iterated.
    """
    page_chunks = []
    found_tables = []
    try:
        with pdfplumber.open(file_path) as pdf:
            for page_index, page in enumerate(pdf.pages):
                page_no = page_index + 1
                try:
                    # Extract text
                    content = page.extract_text()
                    if content:
                        page_chunks.append(f"\n--- Page {page_no} ---\n{content}\n")

                    # Extract tables
                    for table_index, rows in enumerate(page.extract_tables()):
                        # Skip empty results and single-row tables.
                        if not rows or len(rows) <= 1:
                            continue
                        found_tables.append({
                            'page': page_no,
                            'table_number': table_index + 1,
                            'data': rows,
                            'text_representation': table_to_text(rows)
                        })
                except Exception as e:
                    logger.warning(f"Failed to process page {page_no}: {e}")
        return clean_text("".join(page_chunks)), found_tables
    except Exception as e:
        raise PDFProcessorError(f"pdfplumber extraction failed: {e}")
def extract_with_pymupdf(file_path: str) -> Tuple[str, Dict]:
    """Extract text using PyMuPDF.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        A ``(text, metadata)`` pair. ``text`` is the cleaned concatenation of
        per-page sections. ``metadata`` contains ``page_count`` and, when the
        document metadata can be read, ``title``, ``author``, ``subject``,
        ``creator`` and ``creation_date``.

    Raises:
        PDFProcessorError: If the document cannot be opened or processed.
    """
    page_chunks = []
    metadata = {}
    try:
        doc = fitz.open(file_path)
        try:
            # Extract metadata
            try:
                doc_metadata = doc.metadata or {}
                metadata = {
                    'page_count': doc.page_count,
                    'title': doc_metadata.get('title', ''),
                    'author': doc_metadata.get('author', ''),
                    'subject': doc_metadata.get('subject', ''),
                    'creator': doc_metadata.get('creator', ''),
                    'creation_date': doc_metadata.get('creationDate', '')
                }
            except Exception as e:
                # Metadata is optional; fall back to the page count alone.
                logger.warning(f"Failed to read PDF metadata: {e}")
                metadata = {'page_count': doc.page_count}

            # Extract text
            for page_num in range(doc.page_count):
                try:
                    page_text = doc[page_num].get_text()
                except Exception as e:
                    # Best effort: one bad page must not lose the rest.
                    logger.warning(f"Failed to extract page {page_num + 1}: {e}")
                    continue
                if page_text:
                    page_chunks.append(f"\n--- Page {page_num + 1} ---\n{page_text}\n")
        finally:
            # Release the document handle on every path, including errors.
            doc.close()
        return clean_text("".join(page_chunks)), metadata
    except Exception as e:
        raise PDFProcessorError(f"PyMuPDF extraction failed: {e}") from e
def clean_text(text: str) -> str:
    """Clean extracted text.

    Removes replacement characters (U+FFFD), NULs and zero-width spaces
    (U+200B), collapses blank-line runs to a single blank line, collapses
    runs of spaces to one space, and strips leading/trailing whitespace.

    Args:
        text: Raw extracted text; a falsy value (``""`` or ``None``) is accepted.

    Returns:
        The cleaned text, or ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""

    # Remove problematic characters first: dropping them after the whitespace
    # pass could glue separate runs of spaces or newlines back together.
    text = text.translate({0xFFFD: None, 0x0000: None, 0x200B: None})

    # Remove excessive whitespace
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r' +', ' ', text)
    return text.strip()
def table_to_text(table: List[List]) -> str:
    """Render a table as text: one line per row, cells joined by ``" | "``.

    Falsy cells become empty strings, other cells are stringified and
    stripped. Empty rows, and rows whose cells are all empty after that
    normalisation, are left out.

    Args:
        table: Rows of cells; a falsy table yields ``""``.

    Returns:
        The rendered rows joined by newlines.
    """
    if not table:
        return ""

    rendered = []
    for cells in table:
        if not cells:
            continue
        normalized = ["" if not value else str(value).strip() for value in cells]
        if not any(normalized):
            continue
        rendered.append(" | ".join(normalized))
    return "\n".join(rendered)
def get_file_info(file_path: str) -> Dict:
    """Describe the file at ``file_path``.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        A dict with ``name`` (final path component), ``size`` (bytes) and
        ``size_mb`` (size in MiB rounded to two decimals), or an empty dict
        if the file cannot be inspected.
    """
    try:
        target = Path(file_path)
        byte_count = target.stat().st_size
        megabytes = round(byte_count / (1024 * 1024), 2)
    except Exception:
        # Best effort: callers treat a missing entry as "unknown".
        return {}
    return {
        'name': target.name,
        'size': byte_count,
        'size_mb': megabytes
    }
def generate_document_summary(text: str) -> str:
    """Build a short plain-text report for ``text``.

    The report lists character, word and line counts, followed by a preview
    made of the first three pieces of ``text`` split on ``.``, ``!`` or
    ``?``, rejoined with ``". "`` and cut to 300 characters plus ``"..."``.

    Args:
        text: The extracted document text.

    Returns:
        The formatted report, or ``"No text extracted"`` when ``text`` is falsy.
    """
    if not text:
        return "No text extracted"

    # Basic statistics
    chars = len(text)
    words = len(text.split())
    lines = text.count('\n') + 1

    # Extract first few sentences for preview
    leading_sentences = re.split(r'[.!?]+', text)[:3]
    preview = '. '.join(leading_sentences).strip()
    if len(preview) > 300:
        preview = preview[:300] + "..."

    return f"""
Document Statistics:
- Characters: {chars:,}
- Words: {words:,}
- Lines: {lines:,}
Preview:
{preview}
"""
def process_pdf_file(file) -> Tuple[str, str, str, str]:
    """
    Process uploaded PDF file for Gradio interface

    Args:
        file: The upload handed over by the file component. Accepted forms:
            a filesystem path (``str`` or ``os.PathLike``), an object whose
            ``name`` attribute is the path of an existing file, or a
            file-like object exposing ``read()``. ``None`` means that
            nothing was uploaded.

    Returns:
        A ``(status, file information, summary, display text)`` tuple. On
        failure the status carries the error message and the other three
        entries are empty strings. The display text is cut to 5000
        characters and, when tables were found, followed by a preview of
        up to three of them.
    """
    if file is None:
        return "No file uploaded", "", "", ""

    tmp_file_path = None  # set only when this function created a copy
    try:
        if isinstance(file, (str, os.PathLike)):
            # The upload already is a path on disk.
            pdf_path = os.fspath(file)
        elif isinstance(getattr(file, 'name', None), str) and os.path.exists(file.name):
            # File wrapper backed by a real file: read it in place, which also
            # keeps the reported file name meaningful.
            pdf_path = file.name
        else:
            # Plain file-like object: spill its bytes to a temporary file.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(file.read())
            pdf_path = tmp_file_path

        # Process the PDF
        result = enhanced_pdf_processor(pdf_path)

        if not result['success']:
            error_msg = result.get('error', 'Unknown error')
            return f"❌ Processing failed: {error_msg}", "", "", ""

        # Format results for display
        status = f"✅ Successfully processed using {result['extraction_method']}"

        # File info
        file_info = result.get('file_info', {})
        info = f"""
File: {file_info.get('name', 'Unknown')}
Size: {file_info.get('size_mb', 0)} MB
Pages: {result.get('metadata', {}).get('page_count', 'Unknown')}
"""

        # Summary
        summary = result.get('summary', 'No summary available')

        # Full text (truncated for display)
        full_text = result['text']
        if len(full_text) > 5000:
            display_text = full_text[:5000] + f"\n\n... (Text truncated. Total length: {len(full_text)} characters)"
        else:
            display_text = full_text

        # Tables info
        if result['tables']:
            tables_info = f"\n\nTables found: {len(result['tables'])}"
            for i, table in enumerate(result['tables'][:3]):  # Show first 3 tables
                tables_info += f"\n\nTable {i+1} (Page {table['page']}):\n"
                tables_info += table['text_representation'][:500]
                if len(table['text_representation']) > 500:
                    tables_info += "..."
            display_text += tables_info

        return status, info, summary, display_text
    except Exception as e:
        return f"❌ Error: {str(e)}", "", "", ""
    finally:
        # Clean up our own temporary copy on every path; never delete a file
        # that was handed to us.
        if tmp_file_path is not None:
            try:
                os.unlink(tmp_file_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {tmp_file_path}: {cleanup_error}")
def answer_question(text: str, question: str) -> str:
    """
    Simple keyword-based question answering

    Keywords are the words of ``question`` that are longer than three
    characters once surrounding punctuation is removed. ``text`` is split
    on ``.``, ``!`` and ``?``; each piece is scored by how many keywords it
    contains (case-insensitive substring match) and the three best-scoring
    pieces are returned, ties keeping document order.

    Args:
        text: The document text to search.
        question: The user's question.

    Returns:
        A message with the most relevant pieces, a "couldn't find" message
        when nothing matches, or a prompt when either argument is empty.
    """
    if not text or not question:
        return "Please provide both text and a question."

    # Extract keywords from question. Punctuation is trimmed from each word so
    # that e.g. "conclusion?" can match "conclusion" in the document.
    trimmed_words = (re.sub(r'^\W+|\W+$', '', word) for word in question.lower().split())
    keywords = [word for word in trimmed_words if len(word) > 3]

    # Find relevant sentences
    relevant_sentences = []
    for sentence in re.split(r'[.!?]+', text):
        sentence_lower = sentence.lower()
        score = sum(1 for keyword in keywords if keyword in sentence_lower)
        if score > 0:
            relevant_sentences.append((sentence.strip(), score))

    # Sort by relevance (stable, so ties keep document order) and take top 3
    relevant_sentences.sort(key=lambda item: item[1], reverse=True)
    top_sentences = [sentence for sentence, _ in relevant_sentences[:3]]

    if top_sentences:
        return "Based on the document, here are the most relevant sections:\n\n" + "\n\n".join(top_sentences)
    return "I couldn't find information related to your question in the document."
# Module-level holder for the text most recently passed to
# update_extracted_text(); it is one value for the whole process.
extracted_text = ""


def update_extracted_text(status, info, summary, full_text):
    """Store ``full_text`` in the module-level ``extracted_text``.

    All four arguments are returned unchanged, in the same order, so the
    function can sit between identical input and output lists.
    """
    global extracted_text
    passthrough = (status, info, summary, full_text)
    extracted_text = full_text
    return passthrough
def qa_interface(question):
    """Answer ``question`` using the text held in the module-level ``extracted_text``."""
    # Reading a module-level name needs no ``global`` declaration.
    stored_text = extracted_text
    return answer_question(stored_text, question)
# Create Gradio interface
# NOTE(review): this file reached review with its indentation stripped; the
# nesting of the layout containers below was reconstructed and should be
# confirmed against the running app.
with gr.Blocks(title="PDF Processor & Q&A System") as app:
    gr.Markdown("# 📄 PDF Processor & Question Answering System")
    gr.Markdown("Upload a PDF file to extract text and ask questions about its content.")
    # First tab: upload controls in one column, result boxes in the other.
    with gr.Tab("PDF Processing"):
        with gr.Row():
            with gr.Column():
                file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
                process_btn = gr.Button("Process PDF", variant="primary")
            with gr.Column():
                status_output = gr.Textbox(label="Status", lines=2)
                info_output = gr.Textbox(label="File Information", lines=4)
                summary_output = gr.Textbox(label="Document Summary", lines=8)
                text_output = gr.Textbox(label="Extracted Text", lines=15, max_lines=20)
    # Second tab: free-form question box, button, and answer box.
    with gr.Tab("Question & Answer"):
        gr.Markdown("Ask questions about the processed PDF content.")
        with gr.Row():
            question_input = gr.Textbox(label="Your Question", placeholder="What is this document about?")
            ask_btn = gr.Button("Ask Question", variant="primary")
        answer_output = gr.Textbox(label="Answer", lines=8)
    # Event handlers
    # Step 1 fills the four result boxes from the uploaded file; step 2 feeds
    # those same four values through update_extracted_text, which records the
    # value shown in the "Extracted Text" box for later questions.
    process_btn.click(
        fn=process_pdf_file,
        inputs=[file_input],
        outputs=[status_output, info_output, summary_output, text_output]
    ).then(
        fn=update_extracted_text,
        inputs=[status_output, info_output, summary_output, text_output],
        outputs=[status_output, info_output, summary_output, text_output]
    )
    # The question is answered from the text recorded by the step above.
    ask_btn.click(
        fn=qa_interface,
        inputs=[question_input],
        outputs=[answer_output]
    )
    # Example questions offered for the question box
    gr.Examples(
        examples=[
            ["What is the main topic of this document?"],
            ["What are the key findings?"],
            ["Who are the authors?"],
            ["What is the conclusion?"]
        ],
        inputs=[question_input]
    )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    app.launch()